Kafka Connect: Integrating Kafka with External Systems
Moving data between Kafka and external systems (databases, file systems, cloud storage, search engines, data warehouses) requires integration code. Without Kafka Connect, every team writes custom producer and consumer code for each integration, duplicating effort and introducing inconsistent error handling. Kafka Connect is a framework, shipped with Apache Kafka, that standardizes these integrations. It provides a managed runtime that handles scaling, fault tolerance, and offset management automatically, plus a plugin model backed by pre-built connectors for hundreds of systems.
What Kafka Connect Is
Kafka Connect is a distributed, scalable, and fault-tolerant service for streaming data between Kafka and other systems. It runs as its own process (or cluster of processes), separate from your Kafka brokers. For most integrations you configure connectors through a REST API, with no code required. Each connector splits its work into tasks, and the tasks do the actual reading from sources and writing to sinks.
KAFKA CONNECT ARCHITECTURE:

EXTERNAL WORLD           KAFKA CONNECT             KAFKA CLUSTER
─────────────────────────────────────────────────────────────────────
[PostgreSQL DB]   ──→    [Source Connector]  ──→   [Topic: db.orders]
[MySQL DB]        ──→    [Source Connector]  ──→   [Topic: db.users]
[S3 Files]        ──→    [Source Connector]  ──→   [Topic: raw-files]
[Elasticsearch]   ←──    [Sink Connector]    ←──   [Topic: processed]
[Snowflake DW]    ←──    [Sink Connector]    ←──   [Topic: analytics]
[S3 Data Lake]    ←──    [Sink Connector]    ←──   [Topic: events]

Connect workers: Worker 1, Worker 2, Worker 3 (distributed mode for resilience)
Source Connectors vs Sink Connectors
Source connectors pull data from external systems and push it into Kafka topics. They read from databases, file systems, message queues, and APIs, and stream that data into Kafka continuously.
Sink connectors read data from Kafka topics and write it to external systems. They take event streams out of Kafka and deliver them to databases, data warehouses, search engines, cloud storage, and other destinations.
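The division of labor between the two sides can be sketched as a toy in-memory model. This is plain Python, not the Connect API: real connectors are Java classes (a source task's poll() returns a batch of records, a sink task's put() receives one), and every class and variable name below is hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Record:
    topic: str
    value: Dict[str, Any]
    # Position in the source system (here: the row id); None for non-source records.
    source_offset: Optional[Dict[str, Any]] = None


class ToyTableSource:
    """Source side: turns rows it has not seen yet into records for a Kafka topic."""

    def __init__(self, rows: List[Dict[str, Any]], topic: str):
        self.rows = rows
        self.topic = topic
        self.last_id = 0  # in real Connect, the framework persists source offsets

    def poll(self) -> List[Record]:
        new_rows = [r for r in self.rows if r["id"] > self.last_id]
        if new_rows:
            self.last_id = max(r["id"] for r in new_rows)
        return [Record(self.topic, r, {"id": r["id"]}) for r in new_rows]


class ToyListSink:
    """Sink side: receives batches of records and writes them to an external store."""

    def __init__(self) -> None:
        self.store: List[Dict[str, Any]] = []

    def put(self, records: List[Record]) -> None:
        self.store.extend(r.value for r in records)


topic_log: List[Record] = []  # in-memory stand-in for a Kafka topic
source = ToyTableSource([{"id": 1, "total": 10.0}, {"id": 2, "total": 25.5}], "db.orders")
sink = ToyListSink()

topic_log.extend(source.poll())  # source connector: external system -> Kafka
sink.put(topic_log)              # sink connector: Kafka -> external system
```

A second poll() returns nothing because the source remembers its offset, which is the role the connect-offsets topic (or local offset file) plays for real source connectors.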
CONNECTOR TYPE EXAMPLES:

SOURCE CONNECTORS (external → Kafka):
JDBC Source Connector:     Database tables → Kafka topics (MySQL, PostgreSQL, Oracle)
Debezium CDC Connector:    Database change log → Kafka (row-level changes in real time)
S3 Source Connector:       Files in S3 → Kafka topics
MongoDB Source:            MongoDB change streams (built on the oplog) → Kafka topics
Twitter/API sources:       REST API data → Kafka topics

SINK CONNECTORS (Kafka → external):
JDBC Sink Connector:       Kafka topics → database tables
Elasticsearch Connector:   Kafka topics → Elasticsearch index
S3 Sink Connector:         Kafka topics → S3 files (Parquet, JSON, Avro)
Snowflake Connector:       Kafka topics → Snowflake tables
BigQuery Connector:        Kafka topics → Google BigQuery
Redis Connector:           Kafka topics → Redis cache
HTTP Sink:                 Kafka topics → REST API endpoints
Kafka Connect Deployment Modes
Standalone Mode
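A single Connect worker process runs all connectors. Source offsets are kept in a local file on that machine (offset.storage.file.filename), and connector configurations are read from local properties files at startup. This is simple to set up and suitable for development, testing, or low-volume single-machine deployments. It is not fault-tolerant: if the machine fails, all connectors stop.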
STANDALONE MODE STARTUP:

# Start a standalone Connect worker:
bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/connectors/my-jdbc-source.properties

Everything runs in one process, and the connector config is loaded from a file. Simple, but not resilient.
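A standalone connector file such as my-jdbc-source.properties uses the same keys as the JSON "config" object sent to the REST API in distributed mode, written as key=value lines, with "name" and "connector.class" required. A minimal sketch of rendering such a file from a Python dict; the helper name to_properties is hypothetical, and it assumes plain dotted keys with no spaces, "=" or ":" in them.

```python
def to_properties(config: dict) -> str:
    """Render a flat connector config as key=value lines for a .properties file."""
    lines = []
    for key, value in config.items():
        # Backslash is the escape character in .properties files, so double it in values.
        text = str(value).replace("\\", "\\\\")
        lines.append(f"{key}={text}")
    return "\n".join(lines) + "\n"


jdbc_source = {
    "name": "jdbc-orders-source",  # required in standalone connector files
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 1,
    "connection.url": "jdbc:postgresql://db:5432/ecommerce",
    "table.whitelist": "orders,order_items",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "db.",
}
print(to_properties(jdbc_source), end="")
```

Keeping connector definitions as dicts like this makes it easy to move the same connector from a standalone file to a distributed-mode REST call later.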
Distributed Mode (Production Standard)
Multiple Connect workers form a cluster. Connectors and tasks are distributed across workers. If one worker fails, others automatically take over its tasks. Configuration is managed through a REST API and stored in internal Kafka topics — no configuration files on disk for connector definitions.
DISTRIBUTED MODE ARCHITECTURE:

Connect cluster (3 workers):
  Worker A: Connector 1 (Task 0, Task 1), Connector 3 (Task 0)
  Worker B: Connector 1 (Task 2), Connector 2 (Task 0, Task 1)
  Worker C: Connector 2 (Task 2), Connector 4 (Task 0, Task 1)

Worker A crashes:
  Worker A drops out of the Connect group (a group protocol like the one consumer groups use), triggering a rebalance
  Worker A's tasks are redistributed to B and C automatically; by default Connect first waits up to
  scheduled.rebalance.max.delay.ms (5 minutes) in case Worker A rejoins
  Processing resumes from the last committed offsets

Internal Kafka topics used by distributed mode:
  connect-configs: connector and task configurations
  connect-offsets: source connector offsets (sink connectors commit offsets to their consumer group instead)
  connect-status:  connector and task status
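The failover step can be sketched as a simplified reassignment model. This is not Connect's actual assignor: real Connect uses incremental cooperative rebalancing and, by default, waits up to scheduled.rebalance.max.delay.ms before handing a departed worker's tasks to others. The function name and the least-loaded placement rule below are illustrative assumptions.

```python
from typing import Dict, List


def reassign_lost_tasks(assignment: Dict[str, List[str]], failed: str) -> Dict[str, List[str]]:
    """Move a failed worker's tasks to the survivors, always picking the least-loaded one.

    Simplified model: ignores the rebalance delay and Connect's real assignor logic.
    """
    survivors = {w: list(tasks) for w, tasks in assignment.items() if w != failed}
    if not survivors:
        raise ValueError("no surviving workers to take over tasks")
    for task in assignment.get(failed, []):
        # Fewest tasks wins; ties go to the worker name that sorts first.
        target = min(survivors, key=lambda w: (len(survivors[w]), w))
        survivors[target].append(task)
    return survivors


before = {
    "worker-a": ["conn1-task0", "conn1-task1", "conn3-task0"],
    "worker-b": ["conn1-task2", "conn2-task0", "conn2-task1"],
    "worker-c": ["conn2-task2", "conn4-task0", "conn4-task1"],
}
after = reassign_lost_tasks(before, "worker-a")
for worker, tasks in after.items():
    print(worker, tasks)
```

Every task survives the failure; only its placement changes, and each moved task resumes from its last committed offset.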
Starting Kafka Connect in Distributed Mode
DISTRIBUTED MODE STARTUP:

# Configure connect-distributed.properties:
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
# All workers with the same group.id form one cluster
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
offset.storage.replication.factor=3
config.storage.replication.factor=3
status.storage.replication.factor=3
# REST endpoint (replaces the deprecated rest.host.name and rest.port settings)
listeners=http://0.0.0.0:8083

# Start each worker (run on each machine):
bin/connect-distributed.sh config/connect-distributed.properties
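Because every worker that shares a group.id joins the same cluster, those workers must also agree on the three internal storage topics. A minimal pre-flight check, assuming each worker's properties file is available as text; the function names are hypothetical, and the parser handles only simple key=value lines (not ":" separators or line continuations).

```python
from typing import Dict, List

# Settings every worker in one Connect cluster must agree on.
SHARED_KEYS = ["group.id", "config.storage.topic", "offset.storage.topic", "status.storage.topic"]


def parse_properties(text: str) -> Dict[str, str]:
    """Minimal parser: key=value lines only; lines starting with '#' or '!' are comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def shared_key_mismatches(worker_files: Dict[str, str]) -> List[str]:
    """List shared settings whose values differ between workers (missing counts as None)."""
    parsed = {worker: parse_properties(text) for worker, text in worker_files.items()}
    problems = []
    for key in SHARED_KEYS:
        values = {worker: props.get(key) for worker, props in parsed.items()}
        if len(set(values.values())) > 1:
            problems.append(f"{key} differs: {values}")
    return problems


worker1 = """
# All workers with the same group.id form one cluster
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
"""
worker2 = worker1.replace("connect-status", "connect-statuses")  # a typo on one machine
print(shared_key_mismatches({"worker-1": worker1, "worker-2": worker2}))
```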
Deploying Connectors via REST API
Once Connect is running, you deploy, configure, and manage connectors through the REST API. No worker restart and no file editing on the server are required; all configuration happens through API calls.
CONNECTOR MANAGEMENT REST API:
# List all deployed connectors (returns their names):
GET http://connect-worker:8083/connectors
# Deploy a JDBC source connector:
POST http://connect-worker:8083/connectors
Content-Type: application/json
{
"name": "jdbc-orders-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://db:5432/ecommerce",
"connection.user": "kafka_reader",
"connection.password": "secret",
"table.whitelist": "orders,order_items",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "db.",
"poll.interval.ms": "5000"
}
}
# Check connector status:
GET http://connect-worker:8083/connectors/jdbc-orders-source/status
# Pause a connector:
PUT http://connect-worker:8083/connectors/jdbc-orders-source/pause
# Resume a connector:
PUT http://connect-worker:8083/connectors/jdbc-orders-source/resume
# Delete a connector:
DELETE http://connect-worker:8083/connectors/jdbc-orders-source
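The REST calls above can be scripted with nothing but the Python standard library. A minimal sketch that builds the same requests; the helper names are hypothetical, the worker URL is the one used in the examples, and the connection password is deliberately left out of the payload.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Optional

CONNECT_URL = "http://connect-worker:8083"  # worker address used in the examples above


def connect_request(method: str, path: str, body: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a Kafka Connect REST API request."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return urllib.request.Request(CONNECT_URL + path, data=data, headers=headers, method=method)


def connector_path(name: str, action: str = "") -> str:
    """Path for one connector, e.g. /connectors/<name>/status."""
    path = "/connectors/" + urllib.parse.quote(name, safe="")
    return f"{path}/{action}" if action else path


def send(req: urllib.request.Request) -> Any:
    """Send a request; decode the JSON reply, or return None for an empty body."""
    with urllib.request.urlopen(req, timeout=10) as resp:
        payload = resp.read()
    return json.loads(payload) if payload else None


deploy = connect_request("POST", "/connectors", {
    "name": "jdbc-orders-source",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://db:5432/ecommerce",
        "connection.user": "kafka_reader",
        "table.whitelist": "orders,order_items",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "topic.prefix": "db.",
        "poll.interval.ms": "5000",
    },
})
status = connect_request("GET", connector_path("jdbc-orders-source", "status"))
pause = connect_request("PUT", connector_path("jdbc-orders-source", "pause"))
remove = connect_request("DELETE", connector_path("jdbc-orders-source"))
```

Against a live cluster, send(status) returns a JSON object with the connector's state and one entry per task, while the pause and delete endpoints reply with an empty body, so send() returns None for them. Error responses (for example, deploying a name that already exists) surface as urllib.error.HTTPError.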
Debezium: Change Data Capture with Kafka Connect
Debezium is a family of CDC source connectors and one of the most widely used source connector patterns in the Kafka ecosystem. It reads the database's internal change log (the binary log in MySQL, the WAL in PostgreSQL, the redo log in Oracle) and streams every row-level change (insert, update, delete) to Kafka as an event. This technique is called Change Data Capture (CDC).
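Registering a Debezium connector uses the same REST call as any other connector. A sketch that builds the registration payload for the PostgreSQL flow shown next, assuming Debezium 2.x configuration names (1.x releases used database.server.name where 2.x uses topic.prefix), a PostgreSQL server running with wal_level=logical, and a hypothetical host and replication user.

```python
import json
from typing import List


def debezium_postgres_config(name: str, host: str, dbname: str, prefix: str, tables: List[str]) -> dict:
    """Registration payload for Debezium's PostgreSQL connector (Debezium 2.x key names)."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",          # PostgreSQL's built-in logical decoding plugin
            "database.hostname": host,
            "database.port": "5432",
            "database.user": "debezium",        # hypothetical user with replication rights
            "database.password": "secret",
            "database.dbname": dbname,
            "topic.prefix": prefix,             # topics are named <prefix>.<schema>.<table>
            "table.include.list": ",".join(tables),
        },
    }


payload = debezium_postgres_config("orders-cdc", "db", "ecommerce", "postgres", ["public.orders"])
body = json.dumps(payload)  # POST this to http://connect-worker:8083/connectors
```

With prefix "postgres", changes to public.orders land on the topic postgres.public.orders.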
DEBEZIUM CDC FLOW:
Database: PostgreSQL ecommerce
Table: orders
INSERT order (id=100, total=89.99, status="pending")
↓ PostgreSQL WAL (Write-Ahead Log) records this change
↓ Debezium reads WAL
↓ Emits event to Kafka topic: postgres.public.orders (<topic prefix>.<schema>.<table>)
Event payload:
{
"op": "c", ← "c"=create, "u"=update, "d"=delete, "r"=read(snapshot)
"before": null,
"after": {
"id": 100,
"total": 89.99,
"status": "pending"
},
"source": {
"table": "orders",
"db": "ecommerce",
"lsn": 123456789 ← WAL position (log sequence number); stored as the source offset so Debezium can resume where it left off
}
}
Consumers reading postgres.public.orders:
→ Elasticsearch: indexes the new order for search
→ Analytics: updates dashboard with new sale
→ Audit service: records the change for compliance
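Downstream consumers often fold these events into a current-state view. A minimal sketch, assuming each Kafka message value has already been deserialized into the payload shown above (for example with JsonConverter and schemas.enable=false) and that "id" is the primary key; the function name is hypothetical and the Kafka consumer loop is omitted.

```python
from typing import Any, Dict, Optional


def apply_change(view: Dict[Any, dict], event: Optional[dict], key_field: str = "id") -> None:
    """Apply one Debezium change event (op/before/after envelope) to an in-memory table."""
    if event is None:
        return  # tombstone that follows a delete: nothing left to apply
    op = event["op"]
    if op in ("c", "r", "u"):  # create, snapshot read, update: upsert the new row image
        row = event["after"]
        view[row[key_field]] = row
    elif op == "d":  # delete: "before" carries at least the primary key
        view.pop(event["before"][key_field], None)
    else:
        raise ValueError(f"unexpected op: {op!r}")


events = [
    {"op": "c", "before": None, "after": {"id": 100, "total": 89.99, "status": "pending"}},
    {"op": "u", "before": None, "after": {"id": 100, "total": 89.99, "status": "shipped"}},
    {"op": "c", "before": None, "after": {"id": 101, "total": 15.00, "status": "pending"}},
    {"op": "d", "before": {"id": 101, "total": 15.00, "status": "pending"}, "after": None},
]
orders: Dict[Any, dict] = {}
for e in events:
    apply_change(orders, e)
print(orders)
```

Updates and snapshot reads are handled as upserts from "after", so the view stays correct even when "before" is null, which is the PostgreSQL default unless the table uses REPLICA IDENTITY FULL.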
Transformations: Single Message Transforms (SMT)
Kafka Connect supports Single Message Transforms (SMTs) — lightweight transformations applied to each message as it flows through the connector. SMTs run within the Connect worker, between the source and the topic (or between the topic and the sink), without needing a separate stream processing system.
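COMMON SMTs AND THEIR USES:

ReplaceField: Drop, keep, or rename fields in a record (InsertField is the SMT for adding fields)
"transforms": "dropSensitive",
"transforms.dropSensitive.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropSensitive.exclude": "password,ssn,credit_card"
(exclude replaces the older, deprecated blacklist setting)

ValueToKey + ExtractField: Promote a field from the value to become the message key
"transforms": "copyId,extractId",
"transforms.copyId.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyId.fields": "id",
"transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractId.field": "id"
(ValueToKey copies id into a key struct; ExtractField$Key then replaces that struct with the bare id value)

MaskField: Replace sensitive values with a type-appropriate empty value (0, false, "") or a fixed replacement string
"transforms": "maskPII",
"transforms.maskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskPII.fields": "email,phone",
"transforms.maskPII.replacement": "****"

TimestampRouter: Rewrite the topic name using the record timestamp (with the default format, events becomes events-20240115)
Useful for sinks that name their destination after the topic, such as daily Elasticsearch indexes.
(For S3 sinks, time-based directory layouts normally come from the connector's own partitioner rather than this SMT.)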
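To make the semantics concrete, here is a pure-Python simulation of what dropping fields, promoting a value field to the key, and masking do to a schemaless record. It is not the Connect API (real SMTs are Java classes configured with properties like those above); the function names are hypothetical, promoting a field corresponds to chaining ValueToKey with ExtractField$Key, and MaskField's handling of numeric and boolean fields is omitted.

```python
from typing import Any, Callable, Dict, Iterable, Tuple

Record = Tuple[Any, Dict[str, Any]]  # (key, value) of a schemaless record


def drop_fields(record: Record, exclude: Iterable[str]) -> Record:
    """Like ReplaceField$Value with exclude: remove the listed value fields."""
    key, value = record
    excluded = set(exclude)
    return key, {k: v for k, v in value.items() if k not in excluded}


def value_field_to_key(record: Record, field: str) -> Record:
    """Like ValueToKey(fields=field) followed by ExtractField$Key(field=field)."""
    _, value = record
    return value[field], value


def mask_fields(record: Record, fields: Iterable[str], replacement: str) -> Record:
    """Like MaskField$Value with a replacement string, for string-valued fields only."""
    key, value = record
    masked = dict(value)  # copy so the input record is left unchanged
    for name in fields:
        if name in masked:
            masked[name] = replacement
    return key, masked


def apply_chain(record: Record, transforms: Iterable[Callable[[Record], Record]]) -> Record:
    """Run transforms in order, as Connect does for the aliases listed in "transforms"."""
    for transform in transforms:
        record = transform(record)
    return record


raw: Record = (None, {"id": 7, "email": "a@example.com", "phone": "555-0100", "password": "hunter2"})
out = apply_chain(raw, [
    lambda r: drop_fields(r, ["password", "ssn", "credit_card"]),
    lambda r: value_field_to_key(r, "id"),
    lambda r: mask_fields(r, ["email", "phone"], "****"),
])
print(out)
```

Order matters: each transform sees the output of the previous one, so a field dropped early is no longer available to later transforms.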
Connector Parallelism: Tasks
A single connector can run multiple tasks in parallel to increase throughput. Tasks are the units that do the actual reading or writing, each running on a worker thread. The tasks.max setting is an upper limit per connector (a connector may create fewer tasks when there is less work to divide), and Connect distributes the tasks across workers.
CONNECTOR TASKS AND PARALLELISM:
JDBC Source Connector reading 4 tables with tasks.max=4:
Task 0: reads table "orders"
Task 1: reads table "users"
Task 2: reads table "products"
Task 3: reads table "inventory"
All 4 tables are read in parallel, with the tasks spread across the available workers.
S3 Sink Connector writing to topic with 6 partitions, tasks.max=6:
Task 0: reads partition 0 → writes to s3://bucket/p0/
Task 1: reads partition 1 → writes to s3://bucket/p1/
...
Task 5: reads partition 5 → writes to s3://bucket/p5/
6 parallel writers to S3: up to 6x the throughput of tasks.max=1, assuming S3 writes (not the brokers or network) are the bottleneck.
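Rule of thumb: for sink connectors, set tasks.max no higher than the total number of partitions being consumed; extra tasks receive no partitions and sit idle.
For source connectors, the useful maximum is the number of independent source units (tables, files); most connectors will not create more tasks than that.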
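The two rules can be expressed as a small sketch. Both function names are hypothetical; round-robin grouping is only one possible strategy (real connectors choose their own), and the sink model assumes the consumer group assigns each partition to exactly one task.

```python
from typing import List


def group_round_robin(units: List[str], tasks_max: int) -> List[List[str]]:
    """Split source units (tables, files) into at most tasks_max task groups."""
    if tasks_max < 1:
        raise ValueError("tasks_max must be at least 1")
    n = min(tasks_max, len(units))  # never create a task with nothing to read
    groups: List[List[str]] = [[] for _ in range(n)]
    for i, unit in enumerate(units):
        groups[i % n].append(unit)
    return groups


def idle_sink_tasks(total_partitions: int, tasks_max: int) -> int:
    """Each partition goes to one task in the consumer group; surplus tasks get nothing."""
    return max(0, tasks_max - total_partitions)


tables = ["orders", "users", "products", "inventory"]
for limit in (1, 2, 4, 8):
    print(limit, group_round_robin(tables, limit))
print("idle sink tasks, 6 partitions, tasks.max=8:", idle_sink_tasks(6, 8))
```

Raising the limit from 4 to 8 still yields 4 source groups, and a sink with 8 tasks on 6 partitions leaves 2 tasks idle.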
Key Points
- Kafka Connect is a managed framework for streaming data between Kafka and external systems. It eliminates the need to write custom integration code for most use cases.
- Source connectors pull data into Kafka. Sink connectors push Kafka data to external systems. Hundreds of pre-built connectors exist for common systems.
- Distributed mode runs multiple Connect workers as a cluster — fault-tolerant, scalable, and managed entirely through a REST API.
- Debezium CDC connectors read database change logs and stream row-level changes to Kafka in real time — enabling event-driven architectures without application code changes.
- Single Message Transforms (SMTs) allow lightweight per-message transformations within connectors — field masking, renaming, extraction, routing — without external stream processing.
- Set tasks.max up to the number of source tables or sink partitions to achieve full parallelism; tasks beyond that add no throughput.
