Kafka Exactly-Once Semantics: Delivering Messages Exactly Once

Exactly-once semantics (EOS) is the hardest delivery guarantee to achieve in a distributed system. Networks drop packets, brokers crash mid-write, and retries create duplicates. Kafka's exactly-once support combines idempotent producers, transactional writes, and consumer isolation levels so that each message is processed and its effects are applied exactly one time: not zero, not two.

Why Exactly-Once Is Difficult

Consider a payment system: a consumer reads "charge user $100" from Kafka, calls the payment API, then crashes before committing its Kafka offset. On restart, the consumer re-reads the message and calls the payment API again, so the user is charged $200. At-least-once delivery permits this duplicate, and business logic cannot easily absorb it.

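The challenge is that "exactly once" must span three steps: reading from Kafka (consumer), processing (application), and writing results (to Kafka or another system). Each transition is a potential failure point. Kafka's EOS closes all three gaps when both the source and sink are Kafka topics. A side effect in an external system, such as the payment API call above, sits outside Kafka's transaction and still needs its own protection against duplicates (for example, an idempotency key).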

The Two Components of Kafka EOS

Kafka's exactly-once guarantee requires two pieces working together: the idempotent producer (which prevents broker-side duplicates) and transactions (which make read-process-write atomic across partitions).

Idempotent Producer Review

The idempotent producer assigns each producer instance a Producer ID (PID) and tags each message with a monotonically increasing sequence number per partition. The broker uses these to deduplicate retries. If the producer retries a message that the broker already stored, the broker silently discards the duplicate and resends the ACK for the original.
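
The deduplication rule can be illustrated with a toy model. This is a minimal sketch, not Kafka's implementation: the `DedupBroker` class and its method names are hypothetical, it models a single partition, and it tracks only the highest sequence number seen per producer.

```python
class DedupBroker:
    """Toy single-partition log that deduplicates by (producer_id, sequence)."""

    def __init__(self):
        self.log = []         # stored message payloads, in arrival order
        self.last_seq = {}    # producer_id -> highest sequence number stored

    def append(self, producer_id, seq, payload):
        """Return 'ack' for a new write, 'duplicate' for a retry already stored."""
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            return "duplicate"    # already stored: acknowledge again, store nothing
        if seq != last + 1:
            raise ValueError("out-of-order sequence")    # a gap means a lost message
        self.log.append(payload)
        self.last_seq[producer_id] = seq
        return "ack"


broker = DedupBroker()
first_result = broker.append(producer_id=7, seq=0, payload="charge $100")
# The ACK is lost in the network, so the producer retries the same sequence number.
retry_result = broker.append(producer_id=7, seq=0, payload="charge $100")
```

After the retry, `broker.log` still holds a single copy of the message, which is the behavior the paragraph above describes.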

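The idempotent producer alone guarantees exactly-once, in-order writes to each partition, but only within a single producer session: after a restart the producer receives a new PID, and the broker can no longer recognize its earlier messages as duplicates. It also does not make writes to multiple partitions atomic, and it does not link the writes to the offset commit of the records that were read.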

Kafka Transactions

Transactions extend exactly-once to span multiple partitions and link the output writes to the input offset commit. A transaction groups a set of writes (to any number of topics/partitions) and an offset commit into a single atomic unit. Either everything commits or everything aborts — partial states are never visible to consumers.

TRANSACTION COMPONENTS:

1. Transactional Producer: sends writes + commits inside a transaction
2. Transaction Coordinator: a broker that manages the 2-phase commit protocol
3. Transaction Log: internal Kafka topic (__transaction_state) that records transaction state
4. Read-Committed Consumer: reads only committed transaction data

TRANSACTION FLOW:

[Consumer] reads from input-topic at offset 50
[Producer] beginTransaction()
[Producer] sends result to output-topic partition 0
[Producer] sends result to output-topic partition 1
[Producer] sendOffsetsToTransaction(committed_offset=51, group="my-group")
[Producer] commitTransaction()
              ↕ (2-phase commit via Transaction Coordinator)
All writes and offset become visible simultaneously.

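If crash before commitTransaction():
  Transaction Coordinator marks transaction as ABORTED, either when a restarted
  producer with the same transactional.id calls initTransactions() or when
  transaction.timeout.ms expires.
  Writes to output-topic are invisible to read_committed consumers.
  offset for my-group stays at 50 (not 51).
  Consumer re-reads from offset 50 on restart → safe re-processing, because the
  aborted output was never visible.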
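
The all-or-nothing behavior of this flow can be sketched with a small in-memory model. Everything here is an assumption made for illustration: `ToyKafka` and its methods are hypothetical, and the model buffers writes until commit, whereas real Kafka appends them to the log immediately and hides them from read_committed consumers until the commit marker arrives.

```python
class ToyKafka:
    """Toy model: output writes and the group offset commit succeed or fail together."""

    def __init__(self):
        self.output = []          # records visible to read_committed consumers
        self.group_offset = 50    # next offset my-group reads from input-topic
        self._pending = None      # writes and offset of the open transaction

    def begin_transaction(self):
        self._pending = {"writes": [], "offset": None}

    def send(self, record):
        self._pending["writes"].append(record)

    def send_offsets_to_transaction(self, next_offset):
        self._pending["offset"] = next_offset

    def commit_transaction(self):
        # Writes and offset become visible in one step.
        self.output.extend(self._pending["writes"])
        if self._pending["offset"] is not None:
            self.group_offset = self._pending["offset"]
        self._pending = None

    def abort_transaction(self):
        self._pending = None      # discard writes; the group offset is untouched


kafka = ToyKafka()

# Attempt 1: the producer crashes before commit, so the transaction is aborted.
kafka.begin_transaction()
kafka.send("result-for-offset-50")
kafka.send_offsets_to_transaction(51)
kafka.abort_transaction()
after_abort = (list(kafka.output), kafka.group_offset)

# Attempt 2: the restarted consumer re-reads offset 50 and this time commits.
kafka.begin_transaction()
kafka.send("result-for-offset-50")
kafka.send_offsets_to_transaction(51)
kafka.commit_transaction()
after_commit = (list(kafka.output), kafka.group_offset)
```

The aborted attempt leaves no output and no offset change, so the second attempt produces exactly one result for the input record.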

Setting Up Exactly-Once in Kafka Streams

The easiest way to use Kafka EOS is through Kafka Streams, which handles the transaction lifecycle automatically. You simply set the processing guarantee to exactly-once.

KAFKA STREAMS EOS CONFIGURATION:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
          StreamsConfig.EXACTLY_ONCE_V2);   // enables EOS automatically

// Kafka Streams handles:
// - transactional producer setup
// - offset commit inside transactions
// - consumer read_committed isolation
// - transaction abort on failure
// - restart and replay from correct position

EXACTLY_ONCE_V2 vs EXACTLY_ONCE:
  EXACTLY_ONCE (EOS v1): one transactional producer per task. More overhead.
                         Deprecated since Kafka 3.0.
  EXACTLY_ONCE_V2 (EOS v2, requires brokers 2.5+): one transactional producer per
                         stream thread, shared by all of its tasks.
                         Lower overhead. Recommended.

Setting Up Exactly-Once with Raw Producer/Consumer API

For non-Streams applications (custom read-process-write loops), you must manage transactions manually.

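EXACTLY-ONCE WITH RAW API (pseudocode, Java client method names):

# Producer setup:
producerProps.put("bootstrap.servers", "localhost:9092")
producerProps.put("transactional.id", "payment-processor-1")  # unique per app instance
producerProps.put("enable.idempotence", "true")               # required for transactions

producer = KafkaProducer(producerProps)
producer.initTransactions()    # registers with transaction coordinator, fences older instances

# Consumer setup (a separate properties object):
consumerProps.put("bootstrap.servers", "localhost:9092")
consumerProps.put("group.id", "my-group")
consumerProps.put("isolation.level", "read_committed")  # skip uncommitted/aborted data
consumerProps.put("enable.auto.commit", "false")         # offsets are committed by the producer

consumer = KafkaConsumer(consumerProps)
consumer.subscribe(["input-topic"])

# Processing loop:
while True:
  records = consumer.poll(Duration.ofMillis(1000))
  if records.isEmpty(): continue

  try:
    producer.beginTransaction()

    for record in records:
      result = process(record)
      producer.send(ProducerRecord("output-topic", result))

    # Commit the consumer offset INSIDE the transaction.
    # buildOffsetMap returns, per partition, the last processed offset + 1:
    offsets = buildOffsetMap(records)
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata())

    producer.commitTransaction()   # atomic commit of all writes + offsets

  except ProducerFencedException:
    producer.close()               # a newer instance owns this transactional.id
    raise                          # fatal: do not call abortTransaction()

  except KafkaException:
    producer.abortTransaction()    # all writes rolled back, offsets unchanged
    # poll() already advanced the consumer's in-memory position, so rewind to
    # the last committed offsets to re-read the same input records:
    for tp in consumer.assignment():
      committed = consumer.committed({tp}).get(tp)
      if committed is not None: consumer.seek(tp, committed.offset())
      else: consumer.seekToBeginning([tp])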
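
The loop above calls buildOffsetMap without defining it. The sketch below shows one way such a helper could work, assuming records are represented as plain (topic, partition, offset) tuples rather than real consumer records. The key detail is that the committed offset is the position of the next record to read, so it is the last processed offset plus one.

```python
def build_offset_map(records):
    """Map each (topic, partition) to the offset to commit.

    `records` is an iterable of (topic, partition, offset) tuples, a stand-in
    for the records returned by poll(). The value committed for a partition is
    the highest processed offset + 1, i.e. where the next read should start.
    """
    offsets = {}
    for topic, partition, offset in records:
        key = (topic, partition)
        offsets[key] = max(offsets.get(key, 0), offset + 1)
    return offsets


batch = [
    ("input-topic", 0, 50),
    ("input-topic", 0, 51),
    ("input-topic", 1, 7),
]
offsets = build_offset_map(batch)
```

Committing the last processed offset itself, without the +1, would cause the final record of every batch to be read again after a restart.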

The Transaction Coordinator and Two-Phase Commit

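Behind the scenes, Kafka implements transactions using a two-phase commit protocol managed by a transaction coordinator, a broker selected by hashing the transactional.id onto a partition of __transaction_state. The producer still writes data directly to the partition leaders; the coordinator only records which partitions take part in the transaction and drives the commit or abort.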

TWO-PHASE COMMIT INTERNALS:

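Phase 1: Prepare
  Producer calls commitTransaction()
  Transaction Coordinator writes "PREPARE_COMMIT" to __transaction_state
  Once this record is durable the outcome is decided: the transaction will commit

Phase 2: Commit
  Coordinator sends commit markers to the leaders of all involved partitions
  Each partition leader appends a transaction commit marker to its log
  If a marker write fails (for example, a leader is unavailable):
    Coordinator retries until it succeeds; it does not switch to abort
  When all partition markers are written:
    Coordinator writes "COMPLETE_COMMIT" to __transaction_state
    Transaction is committed

Abort path (abortTransaction(), transaction timeout, or fencing by a new producer):
  Coordinator writes "PREPARE_ABORT"
  Coordinator sends abort markers to all involved partitions
  Coordinator writes "COMPLETE_ABORT"
  Transaction is aborted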

Crash Recovery:
  On coordinator restart: reads __transaction_state and resumes incomplete transactions
  On producer restart: initTransactions() contacts coordinator, coordinator rolls back incomplete transactions from previous session
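
Coordinator crash recovery can be sketched as replaying a durable state log and finishing whatever outcome was last recorded. This is a simplified model under stated assumptions: the `Coordinator` class, its methods, and the `crash_after` parameter are hypothetical, and the plain Python lists stand in for __transaction_state and the partition logs.

```python
class Coordinator:
    """Toy transaction coordinator that persists every state change to a log."""

    def __init__(self, state_log, partitions):
        self.state_log = state_log      # stands in for __transaction_state
        self.partitions = partitions    # partition name -> list of log entries

    def prepare_commit(self):
        self.state_log.append("PREPARE_COMMIT")    # the decision is now durable

    def write_markers(self, marker, crash_after=None):
        """Append `marker` to every partition lacking it; optionally crash midway."""
        handled = 0
        for log in self.partitions.values():
            if crash_after is not None and handled == crash_after:
                raise RuntimeError("coordinator crashed")
            if marker not in log:       # idempotent: never write a marker twice
                log.append(marker)
            handled += 1

    def recover(self):
        """On restart, finish whatever outcome the state log last recorded."""
        last = self.state_log[-1] if self.state_log else None
        if last == "PREPARE_COMMIT":
            self.write_markers("COMMIT")
            self.state_log.append("COMPLETE_COMMIT")
        elif last == "PREPARE_ABORT":
            self.write_markers("ABORT")
            self.state_log.append("COMPLETE_ABORT")


state_log = []
partitions = {"output-0": ["msg-a"], "output-1": ["msg-b"]}

first = Coordinator(state_log, partitions)
first.prepare_commit()
try:
    first.write_markers("COMMIT", crash_after=1)    # crash after one of two markers
except RuntimeError:
    pass

# A new coordinator takes over and reads the same durable logs.
Coordinator(state_log, partitions).recover()
```

Because the prepare record was written before the crash, the replacement coordinator completes the commit instead of leaving one partition with a marker and the other without.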

Transactional ID and Fencing

The transactional.id is a string you configure on the producer that uniquely identifies this producer instance's role in the application. When a new producer with the same transactional.id calls initTransactions(), the coordinator bumps the epoch (a generation number) for that ID. If the old producer instance is still running (a zombie), it is fenced: the broker rejects its writes with an invalid-producer-epoch error, which the Java client surfaces as a ProducerFencedException.

This fencing mechanism prevents zombie producers (stale instances that come back from a network partition or a long pause and try to write outdated data) from corrupting the output topics and committed offsets.

ZOMBIE FENCING:

Scenario: Producer instance A appears to crash (it is actually cut off by a network partition). Instance B starts with same transactional.id.

Broker:  Epoch for "payment-processor-1" was 5 (Instance A's epoch).
         Instance B connects → broker assigns epoch 6.

Instance A recovers from network partition (didn't actually crash):
  Instance A tries to write with epoch 5.
  Broker rejects: "InvalidProducerEpoch. Current epoch is 6."
  Instance A is fenced → cannot write.
  
Only Instance B (epoch 6) can write. Data integrity maintained.

RULE: One transactional.id = one active producer at a time.
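
The epoch check behind this rule can be sketched in a few lines. This is an illustrative model only: `FencingBroker`, `FencedError`, and the method names are hypothetical, and epochs here start at 0 rather than the 5 and 6 used in the scenario.

```python
class FencedError(Exception):
    """Raised when a producer writes with a stale epoch."""


class FencingBroker:
    """Toy broker that tracks the current epoch for each transactional.id."""

    def __init__(self):
        self.epochs = {}    # transactional.id -> current epoch
        self.log = []

    def init_transactions(self, transactional_id):
        """Register a producer instance: bump the epoch and return it."""
        epoch = self.epochs.get(transactional_id, -1) + 1
        self.epochs[transactional_id] = epoch
        return epoch

    def write(self, transactional_id, epoch, payload):
        current = self.epochs[transactional_id]
        if epoch < current:
            raise FencedError(f"stale epoch {epoch}, current epoch is {current}")
        self.log.append(payload)


broker = FencingBroker()
epoch_a = broker.init_transactions("payment-processor-1")    # instance A
epoch_b = broker.init_transactions("payment-processor-1")    # instance B replaces A

broker.write("payment-processor-1", epoch_b, "from B")
try:
    broker.write("payment-processor-1", epoch_a, "from zombie A")
    zombie_fenced = False
except FencedError:
    zombie_fenced = True
```

Only the write from instance B reaches the log; the zombie's write is rejected before it is stored.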

Consumer Isolation Levels

Exactly-once on the consumer side requires using isolation.level=read_committed. Without this, consumers can read messages from in-progress or aborted transactions, breaking the exactly-once guarantee.

ISOLATION LEVELS:

read_uncommitted (default):
  Consumer reads ALL messages, including:
  - Messages from committed transactions ✓
  - Messages from aborted transactions (should be invisible!) ✗
  - Messages from in-progress transactions (incomplete!) ✗
  Risk: Consumer processes data that gets rolled back. Inconsistency.

read_committed:
  Consumer reads ONLY:
  - Messages from committed transactions ✓
  - Non-transactional messages (produced without transactions) ✓
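  Consumer SKIPS:
  - Messages from aborted transactions
  Consumer WAITS AT:
  - The first message of the oldest in-progress transaction (the last stable
    offset). Nothing past that point, including non-transactional messages,
    is delivered until that transaction commits or aborts.
  
  Trade-off: Consumer must wait for transactions to complete before
             reading those messages. Added latency grows with how long
             transactions stay open.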
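
The difference between the two isolation levels can be sketched as a filter over a log. This is a toy model with hypothetical names: each log entry is a (txn_id, payload) pair, and a dictionary gives every transaction's status. It also models Kafka's last stable offset, under which a read_committed consumer does not read past the first record of a still-open transaction.

```python
def read(log, txn_status, isolation_level):
    """Return the payloads a consumer would see at the given isolation level.

    log: list of (txn_id, payload); txn_id is None for non-transactional records.
    txn_status: txn_id -> "committed" | "aborted" | "open".
    """
    if isolation_level == "read_uncommitted":
        return [payload for _, payload in log]

    visible = []
    for txn_id, payload in log:
        status = "committed" if txn_id is None else txn_status[txn_id]
        if status == "open":
            break                      # wait here until this transaction resolves
        if status == "committed":
            visible.append(payload)    # aborted records are silently skipped
    return visible


log = [
    (None, "plain"),
    ("t1", "committed-1"),
    ("t2", "aborted-1"),
    ("t3", "open-1"),
    ("t1", "committed-2"),
]
txn_status = {"t1": "committed", "t2": "aborted", "t3": "open"}

uncommitted_view = read(log, txn_status, "read_uncommitted")
committed_view = read(log, txn_status, "read_committed")
```

Note that "committed-2" is held back in the read_committed view even though its transaction committed, because it sits after the first record of the open transaction t3.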

Exactly-Once in Practice: Performance Considerations

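Exactly-once semantics come with overhead. Transactions add coordination steps (two-phase commit), and read_committed consumers have slightly higher read latency. Most of the cost is paid per transaction rather than per message, so the actual overhead depends heavily on how many records each transaction covers and how often you commit. As a rough planning figure, assume a 20-40% throughput reduction compared to at-least-once, then measure with your own workload. Plan your capacity accordingly if you require EOS.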

EOS PERFORMANCE TRADE-OFFS:

Higher safety → lower raw throughput (for same hardware; rough, workload-dependent figures):

at-most-once (acks=0, no retries):   ~maximum throughput, messages can be lost
at-least-once (acks=all, retries):   ~90% of maximum, duplicates possible
exactly-once (transactions):         ~60-80% of maximum, no duplicates/loss

For financial systems: throughput reduction is acceptable cost of correctness.
For high-volume analytics: at-least-once + idempotent consumers often sufficient.

Key Points

  • Exactly-once semantics guarantees each message is processed and its effect applied precisely one time — no loss, no duplicates.
  • EOS requires two components: idempotent producers (prevent broker-side duplicates) and transactions (atomic read-process-write across partitions).
  • The transactional.id uniquely identifies a producer's role. New instances fence stale zombie producers using epoch numbers.
  • sendOffsetsToTransaction() commits the consumer group offset inside the transaction, making the read and the write atomic.
  • Consumers must use isolation.level=read_committed to skip aborted and in-progress transaction messages.
  • Kafka Streams simplifies EOS with a single configuration: StreamsConfig.EXACTLY_ONCE_V2. It handles all transaction lifecycle management automatically.
  • EOS can cost roughly 20-40% of throughput, depending on transaction size and commit frequency. Use it where data correctness is non-negotiable. Use at-least-once with idempotent consumers for high-volume scenarios where duplicates can be tolerated or deduplicated.
