How Kafka Consumers Read Messages from Topics

Kafka consumers are the applications that read messages from topics and turn raw event streams into useful actions — updating databases, sending notifications, running analytics, or triggering downstream workflows. Understanding how consumers work, how they fetch data, and how they handle the reading lifecycle gives you control over throughput, latency, and reliability in your Kafka-powered systems.

What a Kafka Consumer Is

A Kafka consumer is a client library embedded in your application. It subscribes to one or more topics, polls the Kafka brokers for new messages, deserializes the byte arrays it receives into usable objects, and hands those records to your application code for processing. The consumer tracks its progress with offsets, which it commits to Kafka's internal __consumer_offsets topic.

Unlike traditional message brokers that push messages to consumers, Kafka uses a pull model. The consumer initiates each fetch by asking the broker: "Give me the next batch of messages from partition X starting at offset Y." This pull model lets consumers control their own read rate — a slow consumer doesn't get overwhelmed, and a fast consumer can pull as quickly as the broker can serve.
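The pull model can be illustrated with a small in-memory sketch. This is not the Kafka client API: PartitionLog and fetch() are hypothetical names standing in for a broker-side partition and a fetch request, and the batch sizes are arbitrary.

```python
# In-memory sketch of pull-based reads. PartitionLog and fetch() are
# hypothetical stand-ins, not part of any Kafka client library.
class PartitionLog:
    def __init__(self, records):
        self.records = list(records)

    def fetch(self, offset, max_records):
        """Return up to max_records starting at the given offset."""
        return self.records[offset:offset + max_records]


log = PartitionLog(f"event-{i}" for i in range(10))

# Each consumer tracks its own position and decides how much to pull.
slow_offset, fast_offset = 0, 0
slow_batch = log.fetch(slow_offset, max_records=2)
fast_batch = log.fetch(fast_offset, max_records=8)
slow_offset += len(slow_batch)
fast_offset += len(fast_batch)

print(slow_offset, fast_offset)
```

The log itself never changes when it is read; only each reader's offset moves, which is why a slow reader cannot be overwhelmed.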

The Consumer Poll Loop

Every Kafka consumer runs a poll loop — the heart of any consumer application. The loop runs continuously, polling the broker for new messages, processing each batch, and then polling again.

CONSUMER POLL LOOP STRUCTURE:

consumer.subscribe(["orders", "payments"])  ← subscribe to topics

loop forever:
  records = consumer.poll(timeout=1000ms)   ← ask broker for messages
                                              wait up to 1 second if none ready
  for each record in records:
    process(record)                          ← your business logic here
    
  consumer.commitSync()                     ← mark this batch as processed (inside the loop)

Why the timeout matters:
  poll(0ms):   Return immediately, even if no records. Low latency, high CPU.
  poll(1000ms): Wait up to 1 second for records. Efficient for low-traffic topics.
  poll(5000ms): Wait up to 5 seconds. Good for very low-volume topics.
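As a rough runnable illustration of the loop structure, the sketch below uses a hypothetical StubConsumer backed by an in-memory queue. It is not a real client: the method names poll and commit_sync, the batch size of 3, and the stop-after-one-empty-poll rule are all assumptions made so the example terminates.

```python
import queue


class StubConsumer:
    """Hypothetical in-memory stand-in for a consumer client."""

    def __init__(self, records):
        self._q = queue.Queue()
        for r in records:
            self._q.put(r)
        self._position = 0
        self.committed = 0

    def poll(self, timeout_s, max_records=3):
        batch = []
        try:
            # Block up to timeout_s for the first record only.
            batch.append(self._q.get(timeout=timeout_s))
            while len(batch) < max_records:
                batch.append(self._q.get_nowait())
        except queue.Empty:
            pass
        self._position += len(batch)
        return batch

    def commit_sync(self):
        self.committed = self._position


def run_loop(consumer, process, max_empty_polls=1):
    empty = 0
    while empty < max_empty_polls:      # a real loop runs until shutdown
        records = consumer.poll(timeout_s=0.05)
        if not records:
            empty += 1
            continue
        for record in records:
            process(record)
        consumer.commit_sync()          # commit only after the batch is processed


seen = []
consumer = StubConsumer(["a", "b", "c", "d"])
run_loop(consumer, seen.append)
```

Committing after processing means a crash mid-batch leads to reprocessing rather than data loss, which is the usual at-least-once trade-off.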

What Happens Inside poll()

The poll() call does more than fetch messages. It also drives group membership: joining the group, taking part in rebalances when consumers join or leave, and running offset commits when auto-commit is enabled. In the Java client since version 0.10.1, heartbeats are sent by a background thread rather than by poll() itself, but the consumer must still call poll() within max.poll.interval.ms to show that it is making progress. That makes poll() the single most important method in the consumer's lifecycle: call it regularly, even when there is nothing to process.

Consumer Configuration: The Key Properties

bootstrap.servers

Same as the producer — the list of broker addresses used to discover the cluster. Provide two or three for resilience.

group.id

Identifies which consumer group this consumer belongs to. All consumers with the same group.id coordinate to share the workload of reading a topic. Consumers with different group.id values read the topic independently. This is one of Kafka's most powerful features — the same topic can serve multiple independent consumer groups simultaneously.
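The independence of consumer groups comes from positions being tracked per group. A minimal sketch, assuming a single partition held in a list and a hypothetical read() helper (neither is a Kafka API):

```python
# Sketch: positions are tracked per (group, partition), so groups do not
# affect each other. All names here are illustrative only.
log = ["o1", "o2", "o3", "o4"]      # one partition of "orders"
committed = {}                       # (group_id, partition) -> next offset to read


def read(group_id, partition, max_records):
    start = committed.get((group_id, partition), 0)
    batch = log[start:start + max_records]
    committed[(group_id, partition)] = start + len(batch)
    return batch


billing_first = read("billing", 0, 3)
analytics_first = read("analytics", 0, 2)   # starts at 0, unaffected by "billing"
billing_second = read("billing", 0, 3)      # resumes where "billing" stopped
```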

key.deserializer / value.deserializer

The reverse of the producer's serializers. Convert byte arrays back into objects. Must match the serializer used by the producer that wrote the messages.

auto.offset.reset

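What to do when no committed offset exists for this consumer group, or when the committed offset is no longer valid (for example, because retention has deleted that data). Options: earliest (start from the oldest available record), latest (start from new records only and skip history; this is the default), none (throw an error).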
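The decision can be sketched as a small function. This is a simplified model, not client code: starting_offset and its parameters are hypothetical names, with log_start and log_end standing for the oldest available offset and the next offset to be written.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset):
    """Pick the first offset to read for one partition (simplified model)."""
    if committed is not None and log_start <= committed <= log_end:
        return committed                  # a valid committed offset always wins
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end                    # only records written from now on
    raise LookupError("no valid offset and auto.offset.reset=none")
```

Note that the setting has no effect once the group has a valid committed offset; it only applies to the first start or after the committed position has become invalid.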

max.poll.records

Maximum number of records returned in a single poll() call. Default 500. Reduce if your processing per record is slow and you risk exceeding max.poll.interval.ms. Increase if your processing is fast and you want higher throughput per poll.

max.poll.interval.ms

Maximum time allowed between two consecutive poll() calls. Default 5 minutes (300000 ms). If your application doesn't call poll() within this window, the consumer is considered failed: it leaves the group, and a rebalance redistributes its partitions to the other consumers. This setting matters when processing is slow. Increase it, or lower max.poll.records, if a batch can take longer than 5 minutes to process.
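The interaction between max.poll.records and max.poll.interval.ms is simple arithmetic: worst-case batch time is roughly records per poll times time per record. The helper names below and the 50% safety margin are assumptions chosen for illustration, not recommendations from the Kafka documentation.

```python
def batch_fits(max_poll_records, per_record_ms, max_poll_interval_ms=300_000):
    """True if a full batch can be processed before the poll deadline."""
    return max_poll_records * per_record_ms < max_poll_interval_ms


def max_safe_poll_records(per_record_ms, max_poll_interval_ms=300_000, safety=0.5):
    """Largest batch size that fits in a fraction of the poll interval."""
    budget_ms = max_poll_interval_ms * safety
    return max(1, int(budget_ms // per_record_ms))


# 500 records at 1 second each would take 500 s, longer than the 300 s limit.
print(batch_fits(500, per_record_ms=1000))
print(max_safe_poll_records(per_record_ms=1000))
```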

fetch.min.bytes and fetch.max.wait.ms

These two settings work together to control fetch efficiency. fetch.min.bytes tells the broker: "Don't respond until at least N bytes are available." If fewer bytes are available, the broker waits. fetch.max.wait.ms caps that wait: once this many milliseconds have passed, the broker responds with whatever it has, even if fetch.min.bytes has not accumulated. Together they implement a server-side batching wait that reduces the number of small, wasteful fetches. The defaults are fetch.min.bytes = 1 and fetch.max.wait.ms = 500.

FETCH EFFICIENCY SETTINGS:

Low-latency scenario (real-time dashboard):
  fetch.min.bytes = 1        ← send immediately, even 1 byte
  fetch.max.wait.ms = 100    ← maximum 100ms wait
  Result: Very responsive. Many small fetches. Higher broker load.

High-throughput scenario (batch processing):
  fetch.min.bytes = 1048576  ← wait for at least 1MB of data
  fetch.max.wait.ms = 500    ← but no longer than 500ms wait
  Result: Fewer, larger fetches. Higher throughput. Slightly higher latency.
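The rule the broker applies can be modeled in one line. broker_responds is a hypothetical function written for this explanation; it captures only the either-condition logic, not the broker's actual implementation.

```python
def broker_responds(available_bytes, waited_ms, fetch_min_bytes, fetch_max_wait_ms):
    """Simplified model: answer once enough data exists or the wait has expired."""
    return available_bytes >= fetch_min_bytes or waited_ms >= fetch_max_wait_ms


# Low-latency settings: a single byte is enough to answer immediately.
print(broker_responds(1, 0, fetch_min_bytes=1, fetch_max_wait_ms=100))

# High-throughput settings: 200 KB after 100 ms is not enough yet...
print(broker_responds(200_000, 100, fetch_min_bytes=1_048_576, fetch_max_wait_ms=500))

# ...but after 500 ms the broker answers with whatever it has.
print(broker_responds(200_000, 500, fetch_min_bytes=1_048_576, fetch_max_wait_ms=500))
```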

The Consumer Lifecycle

A consumer goes through distinct lifecycle phases from startup to shutdown. Understanding these phases helps you handle restarts, failures, and rebalances correctly.

Phase 1: Initialization

The consumer client starts, reads and validates its configuration, and notes the bootstrap addresses it will use to discover the cluster. In the Java client, connections to brokers are generally opened lazily, on the first call that needs them. At this point the consumer has not joined any group or been assigned any partitions.

Phase 2: Group Join

When the consumer calls subscribe() and then poll() for the first time, it looks up the group coordinator (the broker responsible for managing this consumer group) and sends it a JoinGroup request. The coordinator waits for the group's members to join, up to a timeout, then picks one as the group leader.

Phase 3: Partition Assignment

The group leader (one of the consumer instances) runs the partition assignment algorithm, deciding which partitions each consumer gets. The leader sends the assignment back to the coordinator in a SyncGroup request, and the coordinator distributes it to all group members. Each consumer now knows exactly which partitions it owns.

GROUP JOIN AND ASSIGNMENT:

3 consumers join group "analytics":
  Consumer A, Consumer B, Consumer C all call subscribe("orders")
  Topic "orders" has 6 partitions: P0, P1, P2, P3, P4, P5

Group coordinator picks Consumer A as leader.
Consumer A runs assignment:
  Consumer A → P0, P1
  Consumer B → P2, P3
  Consumer C → P4, P5

Coordinator distributes assignments.
Each consumer starts polling its assigned partitions.

Assuming a brand-new group (no committed offsets) with auto.offset.reset=earliest:
Consumer A polls P0 from offset 0, P1 from offset 0.
Consumer B polls P2 from offset 0, P3 from offset 0.
Consumer C polls P4 from offset 0, P5 from offset 0.
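The assignment in this example is a contiguous split, which can be sketched as follows. assign_range is a hypothetical helper modeled loosely on range-style assignment for a single topic; the strategies shipped with Kafka handle multiple topics and other details this sketch ignores.

```python
def assign_range(consumers, num_partitions):
    """Contiguous split of partition numbers over sorted consumer ids."""
    members = sorted(consumers)
    base, extra = divmod(num_partitions, len(members))
    assignment, start = {}, 0
    for i, member in enumerate(members):
        count = base + (1 if i < extra else 0)   # first `extra` members get one more
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


print(assign_range(["A", "B", "C"], 6))
```

When the partition count does not divide evenly, some consumers own one more partition than others; with more consumers than partitions, the surplus consumers receive nothing and sit idle.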

Phase 4: Steady-State Reading

Each consumer runs its poll loop, fetching from its assigned partitions, processing messages, committing offsets, and sending periodic heartbeats to the coordinator. This is the normal operating state.

Phase 5: Rebalance

A rebalance is triggered when a consumer in the group joins, leaves, or fails. Under the classic eager protocol, the coordinator signals all consumers to stop processing and give up their partitions. The group goes through Phases 2 and 3 again, partitions are reassigned, and consumers resume from the committed offsets of their new partitions. Cooperative (incremental) rebalancing, available since Kafka 2.4, revokes only the partitions that actually need to move.

Rebalances are disruptive: under the eager protocol, all consumers in the group pause processing during a rebalance. Minimizing rebalance frequency is a key Kafka performance objective (covered in more detail in the consumer groups topic).
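A small simulation shows what a rebalance means for ownership and resume positions. The assign function uses a simple round-robin strategy chosen for illustration, and the committed offsets are made-up numbers.

```python
def assign(members, partitions):
    """Round-robin partitions over sorted members (illustrative strategy)."""
    members = sorted(members)
    result = {m: [] for m in members}
    for i, p in enumerate(sorted(partitions)):
        result[members[i % len(members)]].append(p)
    return result


# partition -> last committed offset (hypothetical values)
committed = {0: 120, 1: 80, 2: 45, 3: 300, 4: 10, 5: 0}

before = assign(["A", "B", "C"], committed)
after = assign(["A", "B"], committed)       # consumer C left or failed

# Each new owner resumes from the committed offset, not from wherever the
# previous owner had actually read to.
resume_points = {m: {p: committed[p] for p in ps} for m, ps in after.items()}
print(before)
print(after)
```

Any records the previous owner processed but had not yet committed are read again by the new owner, which is why processing should tolerate duplicates.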

Phase 6: Shutdown

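When you call consumer.close(), the consumer commits its current offsets if auto-commit is enabled, sends a LeaveGroup request to the coordinator (triggering an immediate rebalance to redistribute its partitions), and closes all network connections cleanly. If you commit manually, commit before calling close(). A graceful shutdown minimizes rebalance impact on the rest of the group, because the group does not have to wait for a timeout to notice the consumer is gone.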
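One common shape for a graceful shutdown is a stop flag checked by the loop plus a finally block that always closes the consumer. The sketch below uses a hypothetical StubConsumer that only records which calls were made; in a real application the flag would typically be set from a signal handler rather than from the processing function.

```python
import threading


class StubConsumer:
    """Hypothetical stand-in that records which lifecycle calls were made."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.calls = []

    def poll(self, timeout_s):
        self.calls.append("poll")
        return self._batches.pop(0) if self._batches else []

    def commit_sync(self):
        self.calls.append("commit")

    def close(self):
        self.calls.append("close")


def run(consumer, stop_event, process):
    try:
        while not stop_event.is_set():
            for record in consumer.poll(timeout_s=0.1):
                process(record)
            consumer.commit_sync()      # commit finished work before looping or exiting
    finally:
        consumer.close()                # always leave the group cleanly


stop = threading.Event()
consumer = StubConsumer([["first"], ["last"]])


def process(record):
    if record == "last":                # stands in for an external shutdown request
        stop.set()


run(consumer, stop, process)
print(consumer.calls)
```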

Reading From Multiple Topics

A single consumer can subscribe to multiple topics simultaneously. Records from all subscribed topics arrive through the same poll() call. Ordering is guaranteed only within a partition, so records from different topics or partitions can be interleaved in any order.

SUBSCRIBING TO MULTIPLE TOPICS:

consumer.subscribe(["orders", "returns", "exchanges"])

poll() returns records from any of the three topics.
Each record has a topic() method to identify its source.

Processing:
  for record in records:
    if record.topic() == "orders":
      processOrder(record)
    elif record.topic() == "returns":
      processReturn(record)
    elif record.topic() == "exchanges":
      processExchange(record)
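As the number of topics grows, a lookup table of handlers is often easier to maintain than a chain of if/elif branches. This is a sketch under assumptions: Record is an illustrative record shape, and the handlers simply append to a list so the effect is visible.

```python
from collections import namedtuple

Record = namedtuple("Record", ["topic", "key", "value"])   # illustrative record shape

handled = []
handlers = {
    "orders": lambda r: handled.append(("order", r.value)),
    "returns": lambda r: handled.append(("return", r.value)),
    "exchanges": lambda r: handled.append(("exchange", r.value)),
}


def dispatch(records):
    for record in records:
        handler = handlers.get(record.topic)
        if handler is None:
            # Fail loudly rather than silently dropping an unexpected topic.
            raise KeyError(f"no handler for topic {record.topic!r}")
        handler(record)


dispatch([Record("orders", "k1", "o-1"), Record("returns", "k2", "r-1")])
print(handled)
```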

Consumers can also use pattern-based subscription: subscribe to all topics matching a regular expression. This is useful when new topics matching the pattern are created dynamically and you want the consumer to pick them up automatically without a restart.

PATTERN SUBSCRIPTION:

consumer.subscribe(Pattern.compile("orders.*"))

Matches: "orders-us", "orders-eu", "orders-apac", "orders-new-v2"
Picks up new matching topics automatically after the next metadata refresh (metadata.max.age.ms, default 5 minutes).
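It is worth checking what a pattern really matches before deploying it. The sketch below uses Python's re module and assumes the pattern must match the whole topic name; the topic names are made up for the test.

```python
import re

pattern = re.compile(r"orders.*")
topics = ["orders-us", "orders-eu", "orders", "ordersarchive", "my-orders", "payments"]

# fullmatch requires the whole topic name to match, which is the assumption
# made here about how a subscription pattern is applied.
matched = [t for t in topics if pattern.fullmatch(t)]
print(matched)

# A stricter pattern avoids accidental matches such as "ordersarchive".
strict = re.compile(r"orders-.+")
strict_matched = [t for t in topics if strict.fullmatch(t)]
print(strict_matched)
```

Because "orders.*" also matches "orders" itself and any name that merely starts with those letters, a more specific pattern such as "orders-.+" is often the safer choice.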

Manual Partition Assignment

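Instead of subscribing to topics (which triggers automatic partition assignment), consumers can manually assign specific partitions using consumer.assign(). Manual assignment bypasses consumer group coordination and rebalancing: the consumer keeps reading those partitions until you change the assignment. Kafka does not enforce exclusivity here, so it is up to you to make sure two consumers do not process the same partition. A consumer uses either subscribe() or assign(), not both.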

MANUAL PARTITION ASSIGNMENT:

# Assign specific partitions directly (no group coordination):
partitions = [TopicPartition("orders", 0), TopicPartition("orders", 1)]
consumer.assign(partitions)

# Use case: custom parallel processing where you manage assignment yourself.
# Use case: exactly-once processing frameworks that manage offsets externally.
# Downside: no automatic rebalancing. You manage partition distribution manually.
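With manual assignment, each worker needs a deterministic rule for which partitions it owns. One simple option, sketched here with a hypothetical helper, is a modulo split by worker index; the worker count and index would come from your own deployment configuration.

```python
def partitions_for_worker(worker_index, num_workers, num_partitions):
    """Static split: partition p belongs to worker p % num_workers."""
    return [p for p in range(num_partitions) if p % num_workers == worker_index]


# Three workers sharing six partitions of one topic.
plan = {w: partitions_for_worker(w, 3, 6) for w in range(3)}
print(plan)
```

The rule covers every partition exactly once as long as all workers agree on num_workers, but nothing reassigns a dead worker's partitions; that is the part you take on when you skip group coordination.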

Reading from a Specific Offset

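Consumers can seek to any offset on a partition they are currently assigned, using consumer.seek(). This is the mechanism behind event replay, bootstrapping new consumers, and recovering from corrupted processing. With subscribe(), partitions are assigned only after poll() has joined the group, so seek once the assignment is known.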

SEEKING TO A SPECIFIC POSITION:

# Seek to beginning of a partition (replay everything):
consumer.seekToBeginning([TopicPartition("orders", 0)])

# Seek to end (skip all history, only get new messages):
consumer.seekToEnd([TopicPartition("orders", 0)])

# Seek to a specific offset:
consumer.seek(TopicPartition("orders", 0), 5000)

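# Seek to a timestamp (e.g., replay last 2 hours):
two_hours_ago = System.currentTimeMillis() - 7200000   # 2 h = 7,200,000 ms
timestamps = {TopicPartition("orders", 0): two_hours_ago}
offsets = consumer.offsetsForTimes(timestamps)
for tp, offsetInfo in offsets.items():
    if offsetInfo is not None:   # null when no record exists at or after the timestamp
        consumer.seek(tp, offsetInfo.offset())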
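The timestamp lookup returns the earliest offset whose timestamp is at or after the requested time. A minimal model of that rule, assuming record timestamps never decrease within the partition (which is not guaranteed for producer-assigned timestamps); offset_for_time is a hypothetical helper:

```python
from bisect import bisect_left


def offset_for_time(timestamps_ms, target_ms):
    """Earliest offset whose timestamp is >= target_ms, or None if there is none.

    Assumes timestamps_ms[i] is the timestamp of offset i and is non-decreasing.
    """
    i = bisect_left(timestamps_ms, target_ms)
    return i if i < len(timestamps_ms) else None


timestamps = [1000, 2000, 2000, 3500]        # offsets 0..3
print(offset_for_time(timestamps, 2000))
print(offset_for_time(timestamps, 4000))
```

The None case corresponds to asking for a time later than the newest record, which is why the result has to be checked before seeking.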

Consumer Deserializers

Kafka stores raw bytes. Consumers use deserializers to convert bytes back into objects. The deserializer must match the serializer the producer used. A mismatch causes deserialization errors or, worse, values that decode without error but are wrong, and both can be difficult to debug.

COMMON DESERIALIZER PAIRS:

Producer serializer        → Consumer deserializer
StringSerializer           → StringDeserializer
IntegerSerializer          → IntegerDeserializer
ByteArraySerializer        → ByteArrayDeserializer
KafkaAvroSerializer        → KafkaAvroDeserializer
KafkaJsonSchemaSerializer  → KafkaJsonSchemaDeserializer

Configuration example (Java):
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://schema-registry:8081");
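Why a mismatch is hard to debug can be shown with plain bytes. The two functions below are simplified stand-ins written for this example: one encodes a string as UTF-8, the other decodes a 4-byte big-endian signed integer.

```python
import struct


def string_serialize(s):
    """Stand-in for a UTF-8 string serializer."""
    return s.encode("utf-8")


def integer_deserialize(data):
    """Stand-in for a 4-byte big-endian signed integer deserializer."""
    if len(data) != 4:
        raise ValueError(f"expected 4 bytes, got {len(data)}")
    return struct.unpack(">i", data)[0]


# A length mismatch fails loudly...
try:
    integer_deserialize(string_serialize("order-42"))
    failed = False
except ValueError:
    failed = True

# ...but a 4-character string decodes "successfully" into a meaningless number.
silent_garbage = integer_deserialize(string_serialize("1234"))
print(failed, silent_garbage)
```

The second case is the dangerous one: no exception is raised, so the bad value flows into downstream processing unnoticed.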

Handling Consumer Errors

Consumers encounter two types of errors: errors from the broker (network issues, authorization failures, partition leader election) and errors from your processing logic (bad data, downstream system failures, processing bugs).

For broker errors, the consumer client retries retriable errors automatically (for example, during a partition leader election) and throws exceptions for non-retriable ones (for example, an authorization failure). Your poll loop should catch these exceptions and decide whether to retry after a backoff or shut the consumer down cleanly.

For processing errors, a common pattern is the dead-letter queue — send failed messages to a separate "dead-letter" topic for later investigation and replay, rather than crashing the consumer or blocking the entire pipeline on one bad record.

DEAD-LETTER QUEUE PATTERN:

for record in consumer.poll():
  try:
    process(record)
  except ProcessingException as e:
    deadLetterProducer.send(
      topic = record.topic() + ".dead-letter",
      key = record.key(),
      value = record.value(),
      headers = {"error": str(e), "original-partition": record.partition(), "original-offset": record.offset()}
    )
    log.error("Failed to process record. Sent to dead-letter queue.", e)
    continue  ← skip bad record, continue with next

Dead-letter topic holds failed records for investigation.
Ops team inspects and replays after fixing the root cause.
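The same pattern as a runnable sketch, using in-memory stand-ins. InMemoryProducer, ProcessingError, the Record shape, and the digits-only process function are all hypothetical; a real implementation would use an actual producer and wait for the dead-letter send to succeed before committing the offset.

```python
from collections import namedtuple

Record = namedtuple("Record", ["topic", "partition", "offset", "key", "value"])


class ProcessingError(Exception):
    pass


class InMemoryProducer:
    """Hypothetical stand-in for a producer; collects what would be sent."""

    def __init__(self):
        self.sent = []

    def send(self, topic, key, value, headers):
        self.sent.append((topic, key, value, headers))


def process(record):
    if not record.value.isdigit():
        raise ProcessingError(f"not a number: {record.value!r}")
    return int(record.value)


def handle_batch(records, dlq_producer):
    results = []
    for record in records:
        try:
            results.append(process(record))
        except ProcessingError as exc:
            # Route the bad record aside and keep going with the rest.
            dlq_producer.send(
                topic=record.topic + ".dead-letter",
                key=record.key,
                value=record.value,
                headers={
                    "error": str(exc),
                    "original-partition": str(record.partition),
                    "original-offset": str(record.offset),
                },
            )
    return results


dlq = InMemoryProducer()
batch = [
    Record("orders", 0, 10, "a", "5"),
    Record("orders", 0, 11, "b", "oops"),
    Record("orders", 0, 12, "c", "7"),
]
results = handle_batch(batch, dlq)
print(results, len(dlq.sent))
```

Recording the original partition and offset in the headers lets whoever investigates find the exact source record later.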

Key Points

  • Kafka consumers use a pull model — they poll the broker for messages rather than having messages pushed to them.
  • The poll loop is the core of every consumer application. Calling poll() regularly is essential for staying in the group and taking part in partition assignment.
  • Key consumer configuration includes bootstrap.servers, group.id, auto.offset.reset, max.poll.records, and max.poll.interval.ms.
  • Consumers go through lifecycle phases: init → group join → partition assignment → steady-state reading → rebalance (on change) → shutdown.
  • consumer.seek() enables replay, bootstrapping from a timestamp, or jumping to a specific offset without external tooling.
  • Deserializers must match the serializers used by the producers that wrote the messages. A mismatch causes errors.
  • The dead-letter queue pattern handles processing failures gracefully — bad records go to a separate topic for investigation rather than blocking the pipeline.
