Managing Consumer Offsets and Commit Strategies in Apache Kafka
Offset management is the mechanism that makes Kafka consumers reliable and recoverable. Your commit strategy determines what your system promises about message delivery: whether a failure can cause some messages to be processed twice, cause some to be skipped, or neither (exactly-once processing). Choosing and implementing the right strategy therefore decides how your application behaves when things go wrong.
Why Offset Commits Matter
Kafka consumers read messages and commit offsets to record their progress. A committed offset is Kafka's answer to the question: "How far has this consumer group progressed in reading this partition?" On restart after a crash, the consumer resumes from the committed offset. Everything between the last committed offset and the crash point gets re-read.
This means the gap between when you process a message and when you commit its offset is the window of potential re-processing. Shrinking this window reduces duplicate processing; only exactly-once semantics eliminates it entirely, and that comes with its own complexity cost.
Auto-Commit: The Simple Default
When enable.auto.commit=true (the default), the consumer automatically commits offsets at a fixed interval controlled by auto.commit.interval.ms (default 5000 ms, i.e. 5 seconds). In the Java client the commit happens inside the poll() call: if enough time has passed since the last auto-commit, poll() commits the position reached by the previous poll() (the offset after the last record it returned) before fetching new messages.
AUTO-COMMIT BEHAVIOR (enable.auto.commit=true, interval=5s):
t=0s:   poll() returns records [msg@0 to msg@14]
        Process msg@0 through msg@14
t=5s:   poll() → triggers auto-commit of offset=15
        Returns records [msg@15 to msg@29]
        Process msg@15 through msg@29
t=10s:  poll() → triggers auto-commit of offset=30
        Returns [msg@30 to msg@44]
        Process...
CRASH at t=8s instead (committed up to offset=15):
        Restart → resumes from offset=15
        Re-reads msg@15 to msg@29 (already processed but not committed)
        Up to 15 messages are processed twice.
RULE: Auto-commit = at-least-once delivery (when each batch is fully processed
before the next poll()). Duplicates possible on crash.
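The timeline above can be reproduced with a small toy model. This is a sketch, not a Kafka client: `AutoCommitConsumer` and `reread_on_restart` are hypothetical names, batches are a fixed 15 records, and the current time is passed in explicitly. It assumes Java-client semantics, where the auto-commit check runs inside poll() and commits the position reached by the previous poll().

```python
from dataclasses import dataclass


@dataclass
class AutoCommitConsumer:
    """Toy model of in-poll auto-commit for one partition (not a real Kafka client)."""
    batch_size: int = 15
    interval_s: float = 5.0
    position: int = 0          # next offset poll() will hand out
    committed: int = 0         # last committed offset
    last_commit_s: float = 0.0

    def poll(self, now_s):
        # The auto-commit check runs inside poll(), before fetching: it commits
        # the position reached by the previous poll(), not the new batch.
        if now_s - self.last_commit_s >= self.interval_s:
            self.committed = self.position
            self.last_commit_s = now_s
        batch = range(self.position, self.position + self.batch_size)
        self.position = batch.stop
        return batch


def reread_on_restart(consumer, processed):
    """Offsets processed before the crash that will be read again after restart."""
    return [offset for offset in processed if offset >= consumer.committed]


consumer = AutoCommitConsumer()
processed = []
for now_s in (0.0, 5.0):            # poll at t=0s and t=5s, processing each batch fully
    processed.extend(consumer.poll(now_s))
# CRASH at t=8s, before the t=10s poll() could commit offset 30.
duplicates = reread_on_restart(consumer, processed)
print(consumer.committed, len(duplicates))  # 15 15
```

Because the commit only ever covers records returned by an earlier poll(), a crash here can cause duplicates but never skips.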
The Hidden Danger of Auto-Commit
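Auto-commit records the consumer's position, not your processing progress. In the Java client this is harmless as long as every record from one poll() is fully processed before the next poll(), because the commit fired inside poll() covers only records the previous poll() returned. It becomes dangerous when a commit can fire while records from a batch are still in flight: for example, when records are handed off to worker threads and poll() is called again before they finish, or in clients whose auto-commit runs on a background timer. If poll() returns 100 records and such a commit fires after you have processed 50 of them, it marks all 100 as done even though the last 50 are not processed yet. A crash after that commit means the unprocessed records are silently skipped.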
AUTO-COMMIT SILENT SKIP SCENARIO (a commit fires mid-batch):
t=0s:  poll() returns msg@0 to msg@99
       Processing msg@0...msg@49 (slow processing)
t=5s:  auto-commit fires while the batch is still in flight: commits offset=100
       (Kafka thinks: consumer processed everything up to 100)
       Processing continues: msg@50...
t=6s:  CRASH at msg@73
       Restart: resumes from committed offset=100
       Skips msg@73 to msg@99 silently. DATA LOSS.
In this situation auto-commit gives at-most-once delivery for the
in-flight records, not at-least-once.
To avoid this, either (1) disable auto-commit and commit manually after processing, or (2) with the Java client, make sure every record returned by one poll() is fully processed before you call poll() again, since that next call is what fires the auto-commit.
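The difference between a commit that only runs inside poll() and one that can fire mid-batch is easy to simulate. The sketch below is a toy model under stated assumptions: one batch of 100 records, a fixed 100 ms per record, a 5-second commit interval, and a crash just before offset 73. The `background_commit=True` case stands in for either a background-timer client or a hand-off to worker threads; `run_until_crash` and `after_restart` are hypothetical helpers.

```python
def run_until_crash(batch_size, ms_per_record, crash_offset, background_commit,
                    interval_ms=5000):
    """Process one polled batch (offsets 0..batch_size-1) until a crash.

    background_commit=False: commits only happen inside poll(), and no poll()
    runs mid-batch, so nothing from this batch is committed yet.
    background_commit=True: a timer commits the current position every
    interval_ms, even while the batch is still being processed.
    Returns (committed_offset, processed_offsets).
    """
    position = batch_size          # poll() at t=0 already returned the whole batch
    committed = 0
    next_commit_ms = interval_ms
    processed = []
    for offset in range(batch_size):
        now_ms = offset * ms_per_record
        if background_commit and now_ms >= next_commit_ms:
            committed = position   # marks the whole batch as done
            next_commit_ms += interval_ms
        if offset == crash_offset:
            break                  # crash before this record is processed
        processed.append(offset)
    return committed, processed


def after_restart(committed, processed):
    """Records skipped (committed but never processed) and records processed twice."""
    done = set(processed)
    skipped = [o for o in range(committed) if o not in done]
    duplicated = [o for o in processed if o >= committed]
    return skipped, duplicated


for background in (False, True):
    committed, processed = run_until_crash(100, 100, crash_offset=73,
                                           background_commit=background)
    skipped, duplicated = after_restart(committed, processed)
    print(f"background={background}: committed={committed} "
          f"skipped={len(skipped)} duplicated={len(duplicated)}")
```

With the mid-batch commit, offsets 73 through 99 are lost; with the in-poll commit, the same crash re-processes offsets 0 through 72 instead, which is the at-least-once trade-off.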
Manual Synchronous Commit
Disabling auto-commit and using commitSync() gives you precise control. The consumer only commits after your code explicitly says processing is complete.
MANUAL SYNC COMMIT PATTERN:
props.put("enable.auto.commit", "false")
loop:
    records = consumer.poll(1000)
    for record in records:
        process(record)          ← your business logic
    consumer.commitSync()        ← blocks until the broker acknowledges the commit;
                                   retries retriable errors until it succeeds or times out
# On crash between process() and commitSync():
    Restart → re-reads the whole uncommitted batch, including records already
    processed (duplicates; at-least-once)
# On crash after commitSync():
    Restart → resumes from the committed offset (nothing from that batch is re-read)
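To see the at-least-once outcome concretely, here is a toy simulation of the loop above. It is a sketch under simplifying assumptions, not Kafka code: `ToyPartition` and `consume` are hypothetical, a crash is modeled as returning early, and the commit is a plain assignment rather than a broker round-trip.

```python
class ToyPartition:
    """In-memory stand-in for one partition and its committed offset (hypothetical)."""

    def __init__(self, n_records):
        self.log = list(range(n_records))   # record value == its offset, for simplicity
        self.committed = 0


def consume(partition, batch_size, handle, crash_after=None):
    """Poll from the committed offset, process each batch, then commit (commitSync analogue).

    crash_after: if set, simulate a crash once this many records have been
    processed in this run, before the current batch is committed.
    """
    position = partition.committed
    done = 0
    while position < len(partition.log):
        batch = partition.log[position:position + batch_size]   # poll()
        for record in batch:
            if crash_after is not None and done == crash_after:
                return                      # crash: this batch is never committed
            handle(record)
            done += 1
        position += len(batch)
        partition.committed = position     # commit only after the whole batch


seen = []
partition = ToyPartition(30)
consume(partition, batch_size=10, handle=seen.append, crash_after=15)
print(partition.committed)                 # 10: batch 10..19 was half-processed, never committed
consume(partition, batch_size=10, handle=seen.append)   # restart
print(partition.committed, len(seen))      # 30 35
```

The second run re-processes offsets 10 through 14: those records were handled before the crash, but their batch was never committed.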
Commit After Each Record vs After Each Batch
COMMIT AFTER EACH RECORD (most conservative):
for record in records:
    process(record)
    consumer.commitSync({record.topic_partition: record.offset + 1})
Benefit: Minimum re-processing window (at most 1 record on crash)
Cost: Lots of commit calls → higher broker load, lower throughput

COMMIT AFTER ENTIRE BATCH (most common):
for record in records:
    process(record)
consumer.commitSync()          ← one commit per batch
Benefit: Low commit overhead
Cost: On crash, re-processes entire last batch (up to max.poll.records)
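The trade-off is simple arithmetic. The sketch below assumes every poll() returns a full batch and uses 500, the Java client's default max.poll.records, as the batch size; `commit_tradeoff` is a hypothetical helper, and "worst case" means a crash just after the last record of a commit interval is processed but before its commit.

```python
def commit_tradeoff(n_records, commit_every):
    """Commit calls needed, and worst-case records re-processed after one crash."""
    commits = (n_records + commit_every - 1) // commit_every   # ceiling division
    worst_case_replay = commit_every
    return commits, worst_case_replay


# 100,000 records: per-record commits vs. per-batch commits with max.poll.records=500
for label, every in (("per record", 1), ("per batch", 500)):
    commits, replay = commit_tradeoff(100_000, every)
    print(f"{label}: {commits} commits, up to {replay} records re-processed")
```

Intermediate values (for example committing every 50 records) sit between the two extremes on both axes.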
Manual Asynchronous Commit
commitAsync() sends the commit request without waiting for the broker's response. Processing continues immediately while the commit happens in the background. This improves throughput because the consumer doesn't block waiting for commit ACKs.
ASYNC COMMIT PATTERN:
loop:
    records = consumer.poll(1000)
    for record in records:
        process(record)
    consumer.commitAsync(callback)   ← non-blocking, continues immediately;
                                       callback fires when broker responds
def callback(offsets, exception):
    if exception:
        log.error("Commit failed for offsets: " + offsets)
        # Do NOT retry here. commitAsync intentionally does not retry.
    else:
        log.debug("Committed: " + offsets)
Why commitAsync() Does Not Auto-Retry
This surprises many developers. If commitAsync fails, why doesn't it retry?
Consider this scenario: commitAsync sends a commit for offset=100, and it fails (a network hiccup). Meanwhile, the consumer processes more records and commitAsync sends a commit for offset=150, which succeeds. Now a retry of the offset=100 commit fires, succeeds, and writes offset=100. The committed position has just moved backward from 150 to 100, so on the next restart the consumer re-reads offsets 100 through 149, which were already processed: unnecessary duplicates.
Kafka avoids this by not retrying commitAsync. When a commit must land, use commitSync(), but keep it out of the hot path: reserve it for moments such as a clean shutdown or just before partitions are revoked in a rebalance.
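A toy model makes the reordering problem concrete. It assumes, as Kafka does for a given group and partition, that a successful commit simply overwrites the previously committed offset; `ToyCoordinator` is hypothetical and ignores everything else the group coordinator does.

```python
class ToyCoordinator:
    """Holds one committed offset; each successful commit overwrites it (last write wins)."""

    def __init__(self):
        self.committed = None

    def apply(self, attempts):
        # attempts: (offset, succeeded) pairs in the order they reach the broker
        for offset, succeeded in attempts:
            if succeeded:
                self.committed = offset
        return self.committed


# commit(100) fails, commit(150) succeeds, then a naive retry of 100 lands late
naive_retry = ToyCoordinator().apply([(100, False), (150, True), (100, True)])
no_retry = ToyCoordinator().apply([(100, False), (150, True)])
print(naive_retry, no_retry)   # 100 150
```

With the late retry, a restart resumes at 100 and re-reads offsets 100 through 149; without it, the restart resumes at 150.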
BEST PRACTICE: COMBINE ASYNC + SYNC
loop:
    records = consumer.poll(1000)
    for record in records:
        process(record)
    consumer.commitAsync()          ← fast path, no blocking
on_shutdown():
    try:
        consumer.commitSync()       ← final blocking, retried commit on clean shutdown
    finally:
        consumer.close()
Async for throughput during normal operation.
Sync for correctness on shutdown. Best of both worlds.
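The same structure as a runnable Python sketch. `FakeConsumer` only records which calls were made, and `commit_async`/`commit_sync` are hypothetical snake_case stand-ins for commitAsync()/commitSync(); `should_stop` plays the role of whatever shutdown signal your application uses.

```python
class FakeConsumer:
    """Hypothetical stand-in that records calls; it is not a Kafka client."""

    def __init__(self, batches):
        self.batches = list(batches)
        self.calls = []

    def poll(self, timeout_s):
        self.calls.append("poll")
        return self.batches.pop(0) if self.batches else []

    def commit_async(self):
        self.calls.append("commit_async")

    def commit_sync(self):
        self.calls.append("commit_sync")

    def close(self):
        self.calls.append("close")


def run(consumer, process, should_stop):
    try:
        while not should_stop():
            for record in consumer.poll(timeout_s=1.0):
                process(record)
            consumer.commit_async()    # fast path: no blocking, no retry
        consumer.commit_sync()         # clean shutdown: block until the last commit lands
    finally:
        consumer.close()               # always release the consumer, even after an error


handled = []
consumer = FakeConsumer([[0, 1], [2]])
stop_flags = iter([False, False, True])        # two loop iterations, then stop
run(consumer, handled.append, lambda: next(stop_flags))
print(consumer.calls)
```

The sync commit sits on the clean-exit path only, by design: if process() raises mid-batch, a no-argument commitSync() would record the position of the whole batch, including records that were never processed.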
Committing Specific Offsets
Both commitSync() and commitAsync() accept an optional map of topic-partition → offset. This lets you commit progress for specific partitions independently — useful when processing different partitions at different rates, or when you want to commit per-record for some partitions and per-batch for others.
COMMITTING SPECIFIC OFFSETS:
records = consumer.poll(1000)
currentOffsets = {}
processed_count = 0
for record in records:
    process(record)
    processed_count += 1
    currentOffsets[TopicPartition(record.topic, record.partition)] = \
        OffsetAndMetadata(record.offset + 1)      ← next offset to read
    # Commit every 100 records regardless of batch size
    if processed_count % 100 == 0:
        consumer.commitAsync(currentOffsets, callback)
consumer.commitAsync(currentOffsets, callback)    ← commit remainder
This commits every 100 records regardless of batch boundaries, and the map
carries the latest processed offset for each partition separately.
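A runnable Python version of the pattern above, as a sketch. Assumptions: keys are `(topic, partition)` tuples and values are plain ints standing in for TopicPartition and OffsetAndMetadata, `commit_async` is any callable (for example a wrapper around the real consumer), and `Record` and `process_and_commit` are hypothetical names.

```python
from collections import namedtuple

Record = namedtuple("Record", "topic partition offset")   # hypothetical record shape


def process_and_commit(records, process, commit_async, every=100):
    """Commit the latest offset per partition every `every` records, then the remainder."""
    current_offsets = {}      # (topic, partition) -> next offset to read
    processed_count = 0
    for record in records:
        process(record)
        processed_count += 1
        # Commit the *next* offset to consume, hence offset + 1.
        current_offsets[(record.topic, record.partition)] = record.offset + 1
        if processed_count % every == 0:
            commit_async(dict(current_offsets))   # pass a snapshot, not the live dict
    if processed_count % every != 0:
        commit_async(dict(current_offsets))       # commit the remainder
    return current_offsets


# Eight records interleaved across partitions 0 and 1 of "orders", committing every 3.
records = [Record("orders", p, o) for o in range(4) for p in (0, 1)]
commits = []
process_and_commit(records, process=lambda r: None, commit_async=commits.append, every=3)
for c in commits:
    print(c)
```

Each commit carries one entry per partition seen so far, so partitions that advance at different rates are each recorded at their own position.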
External Offset Storage: Custom Commit Strategies
Kafka stores offsets in its internal __consumer_offsets topic by default. But nothing stops you from storing offsets in an external system — a database, a file, or Redis. This is useful when you want to make message processing and offset storage atomic in the same transaction.
The Database Atomic Commit Pattern
ATOMIC PROCESSING + OFFSET STORAGE IN DATABASE:
Problem: Process record → update DB → commit Kafka offset
    If crash between DB update and Kafka commit:
    → Consumer restarts from old Kafka offset
    → Re-processes records → DB gets duplicate updates
Solution: Store the Kafka offset IN the same DB transaction:
begin_db_transaction():
    db.execute("INSERT INTO orders VALUES (?)", record.value)
    # One row per (topic, partition): insert it, or overwrite it if it exists
    db.execute("INSERT INTO kafka_offsets (topic, partition, next_offset) VALUES (?, ?, ?) "
               "ON CONFLICT (topic, partition) DO UPDATE SET next_offset = excluded.next_offset",
               record.topic, record.partition, record.offset + 1)
commit_db_transaction()
# Do NOT commit to Kafka's __consumer_offsets
# Use manual partition assignment + seek on startup instead:
on_startup():
    consumer.assign(partitions)
    for partition in partitions:
        stored_offset = db.query("SELECT next_offset FROM kafka_offsets WHERE topic=? AND partition=?",
                                 partition.topic, partition.partition)
        if stored_offset is not None:
            consumer.seek(partition, stored_offset)
        else:
            consumer.seekToBeginning([partition])
Now: DB write + offset advance are atomic in one DB transaction.
Consumer crash before the DB transaction commits → the transaction rolls back, so
neither the row nor the offset is stored; on restart the consumer seeks to the
stored offset and re-reads exactly the records whose effects were never saved.
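Here is a runnable version of this pattern, using Python's built-in sqlite3 module as the database. Assumptions: the table and column names, the `handle`/`start_offset` helpers, and the `crash_midway` test hook are all hypothetical; the Kafka side (assign/seek) is left out, and `start_offset` returns the value you would pass to seek(). SQLite's `INSERT OR REPLACE` stands in for the upsert; other databases spell it differently.

```python
import sqlite3


def open_db(path=":memory:"):
    db = sqlite3.connect(path)
    db.executescript("""
        CREATE TABLE IF NOT EXISTS orders (payload TEXT NOT NULL);
        CREATE TABLE IF NOT EXISTS kafka_offsets (
            topic        TEXT    NOT NULL,
            partition_id INTEGER NOT NULL,
            next_offset  INTEGER NOT NULL,
            PRIMARY KEY (topic, partition_id)
        );
    """)
    return db


def handle(db, topic, partition_id, offset, payload, crash_midway=False):
    """Store the business row and the next offset in ONE SQLite transaction."""
    with db:  # commits if the block succeeds, rolls back if it raises
        db.execute("INSERT INTO orders (payload) VALUES (?)", (payload,))
        if crash_midway:  # test hook: simulate dying between the two writes
            raise RuntimeError("simulated crash")
        db.execute(
            "INSERT OR REPLACE INTO kafka_offsets (topic, partition_id, next_offset) "
            "VALUES (?, ?, ?)",
            (topic, partition_id, offset + 1),
        )


def start_offset(db, topic, partition_id):
    """Offset to seek() to on startup, or None if nothing is stored yet."""
    row = db.execute(
        "SELECT next_offset FROM kafka_offsets WHERE topic = ? AND partition_id = ?",
        (topic, partition_id),
    ).fetchone()
    return row[0] if row else None


db = open_db()
handle(db, "orders", 0, 41, "order-41")
try:
    handle(db, "orders", 0, 42, "order-42", crash_midway=True)
except RuntimeError:
    pass
print(start_offset(db, "orders", 0))                            # 42
print(db.execute("SELECT COUNT(*) FROM orders").fetchone()[0])  # 1
```

The failed attempt left no trace: the stored offset still points at 42 and the order row was rolled back, so re-reading offset 42 after a restart writes it exactly once.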
Offset Reset Strategies
When you need to replay events or skip ahead in a topic, reset the consumer group's offsets with the kafka-consumer-groups.sh tool. This works only while the group has no active consumers. Without --execute, the tool performs a dry run and only prints the offsets it would set.
RESETTING OFFSETS (consumer group must be stopped first):
# Reset to earliest (replay everything from the beginning):
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-group \
  --topic orders \
  --reset-offsets \
  --to-earliest \
  --execute
# Reset to a specific date/time (replay events from a given point):
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-group \
  --topic orders \
  --reset-offsets \
  --to-datetime 2024-01-01T00:00:00.000 \
  --execute
# Reset to latest (skip all historical messages):
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-group \
  --topic orders \
  --reset-offsets \
  --to-latest \
  --execute
# Shift offset by N messages (go back 1000 messages):
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-group \
  --topic orders \
  --reset-offsets \
  --shift-by -1000 \
  --execute
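It can help to model what these options compute for one partition. The sketch below is our reading of the tool's behavior, not its code: --to-earliest means the earliest offset still retained (not necessarily 0), --to-latest means the log-end offset, and a --shift-by result outside the retained range is clamped to the nearest end. `reset_target` is a hypothetical helper; run the real command with --dry-run in place of --execute to see the actual planned offsets.

```python
from typing import Optional


def reset_target(current: int, log_start: int, log_end: int, *,
                 to: Optional[str] = None, shift_by: Optional[int] = None) -> int:
    """Offset a partition would be reset to (a model, not the tool itself).

    log_start: earliest offset still retained (may be > 0 after retention deletes data)
    log_end:   offset the next produced record will get
    """
    if to == "earliest":
        target = log_start
    elif to == "latest":
        target = log_end
    elif shift_by is not None:
        target = current + shift_by
    else:
        raise ValueError("pass to='earliest', to='latest', or shift_by=N")
    # Results outside the retained range are clamped into [log_start, log_end].
    return max(log_start, min(target, log_end))


# Committed at 5,000; retention has already removed offsets below 1,200.
print(reset_target(5_000, 1_200, 9_000, shift_by=-1_000))   # 4000
print(reset_target(1_500, 1_200, 9_000, shift_by=-1_000))   # 1200 (clamped)
print(reset_target(5_000, 1_200, 9_000, to="earliest"))     # 1200, not 0
```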
Rebalance Listeners: Saving State Before Partition Revocation
During a rebalance, partitions can be taken away from your consumer before you have committed their latest offsets. Kafka provides a rebalance listener interface so you can react to partition assignment changes — commit offsets right before a partition is revoked, and seek to the right position when a new partition is assigned.
REBALANCE LISTENER PATTERN (Java pseudocode):
class SaveOffsetsOnRebalance implements ConsumerRebalanceListener:
    onPartitionsRevoked(partitions):
        # Called BEFORE partitions are taken away
        # (if offsets live in an external store, write them there instead)
        consumer.commitSync(currentOffsets)   ← save progress immediately!
        currentOffsets.clear()
    onPartitionsAssigned(partitions):
        # Called AFTER new partitions are assigned.
        # Needed only when offsets live in an external store; with offsets
        # committed to Kafka, the consumer resumes from them automatically.
        for partition in partitions:
            offset = externalStore.getOffset(partition)
            consumer.seek(partition, offset)
consumer.subscribe(["orders"], SaveOffsetsOnRebalance())
Without this listener: progress made since the last commit is not saved, so the
partition's new owner re-processes those records.
With this listener: offsets are committed before partitions move.
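The revocation hook can be exercised with a toy model. This sketch assumes offsets are committed to Kafka (so assignment needs no seek); `RecordingConsumer`, `SaveOffsetsOnRebalance`, and the snake_case method names are hypothetical Python stand-ins for the Java interface, and `group_offsets` plays the role of the group's committed offsets.

```python
class RecordingConsumer:
    """Hypothetical stand-in: commit_sync writes into a shared 'group offsets' dict."""

    def __init__(self, group_offsets):
        self.group_offsets = group_offsets

    def commit_sync(self, offsets):
        self.group_offsets.update(offsets)


class SaveOffsetsOnRebalance:
    """Python analogue of the listener above (method names are illustrative)."""

    def __init__(self, consumer, current_offsets):
        self.consumer = consumer
        self.current_offsets = current_offsets   # same dict the poll loop updates

    def on_partitions_revoked(self, partitions):
        # Runs before the partitions move: persist the progress made so far.
        self.consumer.commit_sync(dict(self.current_offsets))
        self.current_offsets.clear()

    def on_partitions_assigned(self, partitions):
        # Offsets committed to Kafka are picked up automatically; nothing to do.
        pass


group_offsets = {("orders", 0): 20}   # last commit made by the poll loop
current = {("orders", 0): 38}         # processed through offset 37, not yet committed
listener = SaveOffsetsOnRebalance(RecordingConsumer(group_offsets), current)
listener.on_partitions_revoked([("orders", 0)])
print(group_offsets)   # {('orders', 0): 38}
```

Without the revoke hook, the partition's new owner would start at offset 20 and re-process offsets 20 through 37.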
Key Points
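- Auto-commit (enabled by default) commits offsets every auto.commit.interval.ms (5 seconds by default); in the Java client the commit runs inside poll() and covers the records returned by the previous poll(). It provides at-least-once delivery when each batch is fully processed before the next poll(), but can silently skip records if a commit fires while records are still being processed.
- Manual commitSync() blocks until the broker acknowledges the commit and retries retriable failures until it succeeds or times out. Use it where a commit must land, such as on shutdown or just before partitions are revoked.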
- Manual commitAsync() is non-blocking and faster. It does not retry on failure to avoid committing stale offsets. Use it in the hot path for throughput.
- Combine commitAsync() in the main loop with commitSync() on shutdown for the best balance of throughput and reliability.
- External offset storage (in a database) makes processing and offset tracking atomic in a single transaction, giving exactly-once effects in that database without Kafka transactions.
- Use kafka-consumer-groups.sh with --reset-offsets to replay events or skip ahead. The consumer group must be stopped before resetting.
- Implement ConsumerRebalanceListener to commit offsets before partitions are revoked during rebalances, so the partitions' new owners do not re-process records you already handled.
