Kafka Exactly-Once Semantics (EOS)

The Delivery Guarantee Problem

Whenever a distributed system sends a message, the sender may not learn whether it arrived. The classic case is a lost acknowledgment:

Producer                     Network                      Broker
   │                            │                            │
   ├── sends message ──────────►────────────────────────────►│
   │                            │                            │ stores message
   │                            │◄─────────── ACK ───────────┤
   │                            │   ← network drop →         │
   │  never receives ACK        │                            │
   │                            │                            │
   │  What does the producer do?
   │  Option A: give up → at-most-once  (message may be lost)
   │  Option B: retry   → at-least-once (message may be duplicated)
   │  Option C: exactly-once semantics needed

Delivery guarantee comparison

| Guarantee | How it works | Risk | Use case |
|---|---|---|---|
| At-most-once | Send once, no retry on failure | Message loss possible | Metrics, telemetry where occasional loss is acceptable |
| At-least-once | Retry until ACK received | Duplicate messages possible | Most business events; handle duplicates in the consumer |
| Exactly-once | Message delivered and processed exactly one time | Neither loss nor duplicates | Payments, inventory updates, financial ledgers |

The email analogy

At-most-once:
You write an email, click Send.
If Gmail crashes mid-send, the email is gone, and you never learn that it didn't arrive.
Trade-off: fast, but you might lose messages.

At-least-once:
You write an email, click Send. No delivery confirmation?
You send it again. And again. Until you get a receipt.
Trade-off: email arrives (maybe 3 times). Recipient must handle duplicates.

Exactly-once:
Gmail uses a transaction: the email is sent AND a delivery receipt is atomically
linked. If either fails, both are rolled back. The email arrives exactly once.
Trade-off: slowest and requires coordination, but correctness is guaranteed.

Why Exactly-Once is Hard

In a distributed system, achieving exactly-once is non-trivial because failures happen between components at unpredictable moments:

Scenario: Payment service sends "charge user $100" to Kafka

Failure A (broker stored the message, but the producer never received the ACK, e.g. a request timeout):
  Broker stored the message ✅
  Producer retries → second "charge $100" sent → duplicate charge ❌

Failure B (broker stored the message; consumer processed it, then crashed before the offset commit):
  Consumer restarts → re-reads the same message → processes "charge $100" again ❌

Failure C (consumer processed, produced the result to an output topic, then crashed):
  Offset not committed → consumer restarts → re-processes input → duplicate output ❌

Failure D (network partition splits producer from broker):
  Producer thinks the write failed → retries → broker may have stored both ❌

Exactly-once requires solving all four failure scenarios simultaneously. Kafka's EOS does this through three cooperating mechanisms:

Three pillars of Kafka EOS:

1. Idempotent Producer → solves Failures A and D (deduplicates retries at the broker)
2. Transactions → solves Failures B and C (atomic multi-partition writes plus the consumer offset commit)
3. read_committed Consumer → prevents consumers from seeing uncommitted/aborted data

Pillar 1: Idempotent Producer

The problem: duplicate writes from retries

Without idempotence:

Producer sends batch [msg1, msg2]
Broker stores [msg1, msg2], sends ACK
Network drops the ACK
Producer never receives ACK → retries
Broker stores [msg1, msg2] AGAIN → now msg1 and msg2 are duplicated ❌

The solution: Producer ID + sequence numbers

When enable.idempotence=true, the broker assigns each producer a PID (Producer ID), and the producer stamps every batch with a sequence number that increases monotonically per partition. The broker deduplicates based on (PID, partition, sequence):

Producer gets PID=101 from the broker (assigned at startup)

Producer sends: PID=101, partition=0, seq=42, data=[msg1, msg2]
Broker stores: seq=42 stored, ACK sent
Network drops ACK

Producer retries: PID=101, partition=0, seq=42, data=[msg1, msg2]
Broker sees: seq=42 already stored for PID=101, partition=0
  → DUPLICATE DETECTED → ACK without storing again ✅
No duplicate in the log!
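The deduplication rule can be modelled in a few lines of plain Java. This is a simplified sketch, not broker code: `SequenceDedupSketch` and `append` are hypothetical names, each batch carries a single sequence number as in the walkthrough above, and the first batch seen from a producer is accepted at whatever sequence it carries. Real brokers also check the producer epoch, track per-record sequence ranges, and cache metadata for the last few batches per producer and partition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of broker-side deduplication keyed by (PID, partition).
// Simplifications: one sequence number per batch, no epoch, no batch-metadata cache.
final class SequenceDedupSketch {

    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private record Key(long producerId, int partition) {}

    // Last sequence number successfully appended for each (PID, partition).
    private final Map<Key, Integer> lastSequence = new HashMap<>();

    Result append(long producerId, int partition, int sequence) {
        Key key = new Key(producerId, partition);
        Integer last = lastSequence.get(key);
        if (last != null) {
            if (sequence <= last) {
                return Result.DUPLICATE;    // retry of a batch already in the log: ACK, don't store
            }
            if (sequence != last + 1) {
                return Result.OUT_OF_ORDER; // gap: an earlier batch is missing, reject
            }
        }
        lastSequence.put(key, sequence);    // accept and remember
        return Result.APPENDED;
    }

    public static void main(String[] args) {
        SequenceDedupSketch broker = new SequenceDedupSketch();
        System.out.println(broker.append(101L, 0, 42)); // APPENDED (first send)
        System.out.println(broker.append(101L, 0, 42)); // DUPLICATE (retry after lost ACK)
        System.out.println(broker.append(101L, 0, 44)); // OUT_OF_ORDER (43 never arrived)
        System.out.println(broker.append(101L, 0, 43)); // APPENDED (next in order)
    }
}
```

Rejecting gaps instead of storing them is what keeps per-partition ordering intact when several requests are in flight and some of them are retried.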

What idempotence guarantees (and doesn't)

✅ Guarantees:
- No duplicates per partition within a single producer session
- Preserves message ordering even with retries (seq numbers are in order)
- Works transparently: no consumer-side changes needed

❌ Does NOT guarantee:
- Exactly-once across multiple partitions (need transactions for that)
- Survival across producer restarts (PID resets on restart → new session)
- Exactly-once to external systems (DB, HTTP endpoints, etc.)

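Idempotent producer configuration

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // Spring Kafka's JSON serializer

// Enable idempotence: implies acks=all and retries=Integer.MAX_VALUE
// (on by default since Kafka 3.0, but explicit is clearer)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// These match what enable.idempotence=true requires; setting them explicitly documents intent:
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// Up to 5 in-flight requests are safe with idempotence: the broker rejects out-of-sequence
// batches and the producer resends them in order, so retries cannot reorder messages.
// Without idempotence, max in-flight must be 1 to preserve order on retry.

KafkaProducer<String, Order> producer = new KafkaProducer<>(props); // Order: application type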
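The constraints spelled out in the comments above can be expressed as a small pre-flight check. `IdempotenceConfigCheck.checkIdempotentConfig` is a hypothetical helper for illustration; KafkaProducer validates these settings itself when it is constructed, so treat this as a readable summary of the rules rather than a replacement. Unset keys fall back to what we assume are the Kafka 3.x producer defaults (acks=all, retries=Integer.MAX_VALUE, five in-flight requests).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check mirroring the idempotence constraints described above.
final class IdempotenceConfigCheck {

    // Reads a setting whether it was stored as a String or as an Integer via props.put(...).
    private static String value(Properties props, String key, String defaultValue) {
        Object v = props.get(key);
        return v == null ? defaultValue : v.toString();
    }

    // Returns the problems found; an empty list means the settings are compatible.
    static List<String> checkIdempotentConfig(Properties props) {
        List<String> problems = new ArrayList<>();

        // Defaults assumed to match Kafka 3.x producers when a key is unset.
        String acks = value(props, "acks", "all");
        int retries = Integer.parseInt(value(props, "retries", String.valueOf(Integer.MAX_VALUE)));
        int inFlight = Integer.parseInt(value(props, "max.in.flight.requests.per.connection", "5"));

        if (!acks.equals("all") && !acks.equals("-1")) {
            problems.add("acks must be all (or -1), got " + acks);
        }
        if (retries <= 0) {
            problems.add("retries must be greater than 0, got " + retries);
        }
        if (inFlight > 5) {
            problems.add("max.in.flight.requests.per.connection must be at most 5, got " + inFlight);
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties ok = new Properties();
        ok.put("acks", "all");
        ok.put("retries", Integer.MAX_VALUE);
        ok.put("max.in.flight.requests.per.connection", 5);

        Properties bad = new Properties();
        bad.put("acks", "1");
        bad.put("max.in.flight.requests.per.connection", "10");

        System.out.println(checkIdempotentConfig(ok));  // []
        System.out.println(checkIdempotentConfig(bad)); // two problems: acks and in-flight
    }
}
```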

Pillar 2: Kafka Transactions

Idempotent producer alone only prevents retry duplicates within one partition of one session. Transactions provide:

  • Atomic writes across multiple partitions (all succeed or all are aborted)
  • Atomic pairing of output production + consumer offset commit (the consume-transform-produce pattern)

Transaction anatomy

BEGIN TRANSACTION (local to the producer; partitions are registered with the coordinator on first send)
│
├── Write to partition orders-0:        [msg: order-1 processed]
├── Write to partition payments-0:      [msg: payment-1 charged]
├── Write to partition notifications-0: [msg: notify user-1]
├── Commit consumer offset for raw-orders-0 (via sendOffsetsToTransaction)
│
COMMIT TRANSACTION
│
Transaction coordinator writes COMMIT marker to all affected partitions
Consumers with read_committed isolation can now see all messages atomically

How the transaction coordinator works

Kafka maintains an internal topic, __transaction_state (50 partitions by default), that stores transaction state. Each transactional.id maps to exactly one partition of __transaction_state, and the broker that leads that partition acts as the producer's transaction coordinator.
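The mapping from transactional.id to coordinator partition is a hash modulo the partition count. The sketch below mirrors how we understand the broker computes it (a non-negative String.hashCode() modulo transaction.state.log.num.partitions); treat the exact formula as an assumption and `coordinatorPartition` as a hypothetical helper.

```java
// Hypothetical sketch of the transactional.id → __transaction_state partition mapping.
final class CoordinatorLookupSketch {

    // Non-negative hash; Integer.MIN_VALUE (whose Math.abs overflows) maps to 0.
    static int nonNegative(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Partition of __transaction_state that owns this transactional.id.
    // The leader of that partition is the producer's transaction coordinator.
    static int coordinatorPartition(String transactionalId, int numPartitions) {
        return nonNegative(transactionalId.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 50; // transaction.state.log.num.partitions default
        for (String id : new String[] {"payment-app-1", "payment-app-2"}) {
            System.out.println(id + " -> __transaction_state-" + coordinatorPartition(id, partitions));
        }
    }
}
```

Because the mapping is deterministic, a restarted instance that reuses its transactional.id reaches the same coordinator, which is what allows that coordinator to fence the previous incarnation.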

Transaction flow:
1. initTransactions()
   Producer contacts the transaction coordinator
   Gets epoch E for transactional.id "payment-app-1" (bumped on every call)
   The epoch fences any previous incarnation of this transactional.id (zombie fencing)

2. beginTransaction()
   Local state change only; no broker communication yet

3. send() calls
   Before the first write to each new partition, the producer registers it with the coordinator:
   "I am writing to orders-0, payments-0, notifications-0"
   (AddPartitionsToTxn; the coordinator records the partitions in __transaction_state)

4. sendOffsetsToTransaction()
   The consumer offsets are added to the transaction (AddOffsetsToTxn, then TxnOffsetCommit
   to the group coordinator); they take effect only if the transaction commits

5. commitTransaction()
   Producer sends an EndTxn (commit) request to the coordinator
   Coordinator writes PREPARE_COMMIT to __transaction_state; the commit is now guaranteed
   Coordinator sends WriteTxnMarkers to each affected partition's leader
   Each leader writes a COMMIT marker at the end of its log
   Coordinator writes COMPLETE_COMMIT to __transaction_state
   commitTransaction() succeeds once the commit decision is durable; any markers still
   in flight are completed by the coordinator

If a failure occurs during step 5 (before leaders receive markers):
   If PREPARE_COMMIT was logged, the coordinator (or its successor) finishes writing the markers
   The next initTransactions() with the same transactional.id waits for that, or aborts a
   transaction that was never prepared, before starting a new one

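Full transactional producer example

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Service;

@Service
public class PaymentTransactionService {

    private static final Logger log = LoggerFactory.getLogger(PaymentTransactionService.class);

    private final KafkaProducer<String, Object> producer;

    public PaymentTransactionService() throws UnknownHostException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-processor-" +
                InetAddress.getLocalHost().getHostName()); // unique per instance!
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        this.producer = new KafkaProducer<>(props);
        producer.initTransactions(); // registers with transaction coordinator; blocks until ready
    }

    // Process one payment: consume raw-payment → produce result + commit offset atomically.
    // The consumer must use isolation.level=read_committed and enable.auto.commit=false.
    public void processPayment(ConsumerRecord<String, RawPayment> record,
                               KafkaConsumer<String, RawPayment> consumer) {
        producer.beginTransaction();
        try {
            // 1. Business logic. chargeCard() is application code (not shown). It is NOT
            //    rolled back if the transaction aborts, so it must be idempotent itself.
            ProcessedPayment result = chargeCard(record.value());

            // 2. Produce output: part of the transaction
            producer.send(new ProducerRecord<>("processed-payments", record.key(), result));
            producer.send(new ProducerRecord<>("payment-audit-log", record.key(), result));

            // 3. Commit input offset ATOMICALLY with the output (next offset to read = offset + 1)
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            offsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)
            );
            // groupMetadata() is preferred over the bare group ID: it carries generation
            // info for accurate zombie fencing (KIP-447)
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());

            // 4. Commit: all sends and the offset commit become visible atomically
            producer.commitTransaction();

        } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException e) {
            // Fatal: cannot recover; the producer must be closed and recreated
            log.error("Fatal transaction error, shutting down producer", e);
            producer.close();
            throw new RuntimeException("Unrecoverable producer error", e);

        } catch (KafkaException e) {
            // Transient: abort, then the caller should retry this record
            // (or seek the consumer back to record.offset() before the next poll)
            log.warn("Transient error, aborting transaction", e);
            producer.abortTransaction();
            throw e;
        }
    }
}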

Pillar 3: read_committed Consumer

The problem: reading uncommitted data

Without read_committed isolation (the default isolation.level is read_uncommitted), consumers read all messages as soon as they are appended to the log, including messages from transactions that are later aborted:

t=0: Producer starts transaction, writes msg1 to orders-0 (in-flight, not committed)
t=1: Consumer reads orders-0 → sees msg1 (even though transaction not committed yet!)
t=2: Producer crashes → transaction aborted → coordinator writes ABORT marker
t=3: Consumer already processed msg1 → cannot undo the processing ❌

With read_committed:
t=0: Producer writes msg1 (in-flight)
t=1: Consumer reads orders-0 → msg1 is withheld (pending transaction)
t=2: Producer commits → COMMIT marker written
t=3: Consumer now sees msg1 → processes it safely ✅

OR:
t=2: Producer crashes → ABORT marker written
t=3: Consumer never sees msg1 → skips it entirely ✅

Last Stable Offset (LSO)

With read_committed, consumers can only read up to the Last Stable Offset (LSO): the offset of the first record that belongs to the earliest still-open transaction. Records at or beyond the LSO are withheld, even if they are non-transactional or already committed:

Partition log:
Offset 0: [msg-a, committed]
Offset 1: [msg-b, committed]
Offset 2: [msg-c, transaction TXN-1, in-flight]  ← LSO is here
Offset 3: [msg-d, committed]
Offset 4: [msg-e, transaction TXN-1, in-flight]
Offset 5: [msg-f, committed]

read_committed consumer can read: offsets 0, 1 only
Offsets 3, 5 are "hidden" behind the stalled LSO from TXN-1

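If TXN-1 takes 10 seconds to commit:
read_committed consumers are stuck for 10 seconds  ← LATENCY IMPACT
Monitor via consumer-group lag against the log-end offset (e.g. kafka-consumer-groups.sh).
The client-side records-lag-max metric is measured against the LSO for read_committed
consumers, so it can stay low while the LSO is stalled.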
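The LSO rule and the abort handling above can be modelled together. In this hypothetical sketch (`ReadCommittedSketch`, `lastStableOffset`, and `readable` are illustrative names), each log entry is either non-transactional or tagged with a transaction id, and the caller supplies which transactions are still open and which were aborted; real brokers derive this from control markers and an aborted-transaction index, and markers themselves occupy offsets, which the sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of what a read_committed consumer is allowed to see.
final class ReadCommittedSketch {

    // txnId == null means a non-transactional record.
    record Entry(long offset, String value, String txnId) {}

    // LSO: offset of the first record belonging to a still-open transaction,
    // or the log end offset if no transaction is open. Assumes the log is sorted by offset.
    static long lastStableOffset(List<Entry> log, Set<String> openTxns) {
        for (Entry e : log) {
            if (e.txnId() != null && openTxns.contains(e.txnId())) {
                return e.offset();
            }
        }
        return log.isEmpty() ? 0 : log.get(log.size() - 1).offset() + 1;
    }

    // Offsets a read_committed consumer may return: below the LSO and not from an aborted txn.
    static List<Long> readable(List<Entry> log, Set<String> openTxns, Set<String> abortedTxns) {
        long lso = lastStableOffset(log, openTxns);
        List<Long> result = new ArrayList<>();
        for (Entry e : log) {
            if (e.offset() >= lso) {
                break; // everything from the LSO onwards is withheld
            }
            if (e.txnId() != null && abortedTxns.contains(e.txnId())) {
                continue; // aborted records are skipped entirely
            }
            result.add(e.offset());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
                new Entry(0, "msg-a", null),
                new Entry(1, "msg-b", null),
                new Entry(2, "msg-c", "TXN-1"),
                new Entry(3, "msg-d", null),
                new Entry(4, "msg-e", "TXN-1"),
                new Entry(5, "msg-f", null));
        System.out.println(lastStableOffset(log, Set.of("TXN-1")));   // 2
        System.out.println(readable(log, Set.of("TXN-1"), Set.of())); // [0, 1]
        System.out.println(readable(log, Set.of(), Set.of("TXN-1"))); // [0, 1, 3, 5]
    }
}
```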

Consumer configuration

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-processor");

// Critical for EOS: only read committed transaction data
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

// Critical for EOS: manual offset commit (offset committed via sendOffsetsToTransaction)
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

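// Optional: static membership (KIP-345) reduces rebalances for long-running EOS consumers.
// The value must be unique per instance. It is not what KIP-447 fencing relies on; that comes
// from passing consumer.groupMetadata() to sendOffsetsToTransaction.
consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "payment-processor-instance-1");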

The Consume-Transform-Produce Pattern

This is the core EOS use case: read from Kafka → transform → write back to Kafka, exactly-once.

┌──────────────────────────────────────────────────────────────────┐
│  One Transaction Boundary                                        │
│                                                                  │
│  Consumer reads record from "raw-orders"                         │
│  offset 42 on partition 0                                        │
│      │                                                           │
│      ▼                                                           │
│  Business logic (transform/enrich/validate)                      │
│      │                                                           │
│      ▼                                                           │
│  Producer sends to "processed-orders" (part of transaction)      │
│  Producer sends to "order-audit"      (part of transaction)      │
│      │                                                           │
│  sendOffsetsToTransaction:                                       │
│    raw-orders-0 → offset 43 (committed IN the transaction)       │
│      │                                                           │
│  commitTransaction() ─────────────────────────────────────────►  │
│    Atomically:                                                   │
│    ✅ processed-orders message visible to consumers               │
│    ✅ order-audit message visible to consumers                    │
│    ✅ raw-orders offset advanced to 43                            │
│                                                                  │
│  On failure before commit:                                       │
│    abortTransaction()                                            │
│    ✅ processed-orders message NOT visible (rolled back)          │
│    ✅ order-audit message NOT visible (rolled back)               │
│    ✅ offset stays at 42 (record will be re-processed)            │
└──────────────────────────────────────────────────────────────────┘

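Spring Kafka EOS implementation

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class KafkaEosConfig {

    @Bean
    public ProducerFactory<String, Object> eosProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(config);
        // Enables transactions; Spring appends a unique suffix to this prefix for each producer
        factory.setTransactionIdPrefix("order-processor-");
        return factory;
    }

    @Bean
    public KafkaTemplate<String, Object> eosKafkaTemplate(
            ProducerFactory<String, Object> factory) {
        return new KafkaTemplate<>(factory);
    }

    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
            ProducerFactory<String, Object> factory) {
        // Shares the template's producer factory so template sends join the container's transaction
        return new KafkaTransactionManager<>(factory);
    }

    @Bean
    public ConsumerFactory<String, RawOrder> eosConsumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Key/value deserializers for RawOrder omitted (e.g. StringDeserializer + JsonDeserializer)
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RawOrder> eosListenerFactory(
            ConsumerFactory<String, RawOrder> consumerFactory,
            KafkaTransactionManager<String, Object> transactionManager) {

        ConcurrentKafkaListenerContainerFactory<String, RawOrder> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // V2 mode: one producer per listener thread (better than V1's per-partition producer)
        factory.getContainerProperties().setEosMode(ContainerProperties.EOSMode.V2);

        // The transaction manager makes the container begin a transaction per delivery,
        // send the consumed offsets to it, and commit or abort automatically
        factory.getContainerProperties().setTransactionManager(transactionManager);

        return factory;
    }
}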

@Service
@RequiredArgsConstructor
public class OrderProcessingService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    // Because the container has a KafkaTransactionManager, Spring Kafka wraps each delivery
    // in a Kafka transaction: no manual beginTransaction/commitTransaction needed
    @KafkaListener(
            topics = "raw-orders",
            groupId = "order-processor",
            containerFactory = "eosListenerFactory"
    )
    public void process(ConsumerRecord<String, RawOrder> record) {

        // Business logic
        ProcessedOrder processed = transformOrder(record.value());

        // Send output: automatically part of the transaction managed by Spring
        kafkaTemplate.send("processed-orders", record.key(), processed);
        kafkaTemplate.send("order-audit-log", record.key(), new AuditEvent(processed));

        // On normal return, Spring sends the consumed offset to the transaction and commits it.
        // If this method throws, Spring aborts the transaction and the record is redelivered.
    }

    private ProcessedOrder transformOrder(RawOrder raw) {
        // Business transformation logic (calculateTotal is application code, not shown)
        return new ProcessedOrder(raw.orderId(), raw.userId(),
                calculateTotal(raw.items()), "PROCESSING");
    }
}

EOSMode V1 vs V2

| | V1 (ALPHA) | V2 (BETA; default since Spring Kafka 2.6) |
|---|---|---|
| Producer per | Consumer group + topic + partition | Listener container thread |
| Performance | More producers = more overhead | Fewer producers = better |
| Supported since | Spring Kafka 2.3 | Spring Kafka 2.6 |
| Recommendation | Legacy only | ✅ Always use V2 |

Kafka Streams EOS

Kafka Streams makes exactly-once much simpler: it manages the transaction lifecycle automatically.

Enabling EOS in Kafka Streams

import java.math.BigDecimal;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "payment-stream-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
// Default key/value serdes for RawPayment and the processed type must also be configured (not shown)

// Enable exactly-once (V2 is the recommended mode; requires brokers on Kafka 2.5+)
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
        StreamsConfig.EXACTLY_ONCE_V2); // "exactly_once_v2"

// Optional: tune commit interval (default 100ms for EOS)
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

// Optional: keep one standby replica of each state store (default is 0) for faster failover
streamsProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

StreamsBuilder builder = new StreamsBuilder();

KStream<String, RawPayment> rawPayments = builder.stream("raw-payments");

rawPayments
        .filter((key, payment) -> payment.getAmount().compareTo(BigDecimal.ZERO) > 0)
        .mapValues(PaymentProcessor::process) // PaymentProcessor: application code
        .peek((key, processed) -> log.info("Processing payment {}", key))
        .to("processed-payments");

// Kafka Streams automatically:
// - Wraps each poll-process-produce cycle in a Kafka transaction
// - Commits consumer offsets via sendOffsetsToTransaction
// - Aborts the transaction and retries on failure
// - Handles producer fencing across task migrations
KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
streams.start();

What Kafka Streams EOS manages automatically

With exactly_once_v2, each stream thread uses one transactional producer for all of its tasks
(a task roughly corresponds to one input partition):

Internal producer transactional.id (generated by Streams; formats shown approximately):
  exactly_once_v2:   "{application.id}-{process id}-{thread index}"
  exactly_once (V1): "{application.id}-{task id}", e.g. "payment-stream-app-0_0"

Roughly every commit.interval.ms (100 ms by default under EOS), per thread:
1. beginTransaction() on the thread's producer
2. Process all records received in this poll cycle
3. Send all output records to output topics
4. sendOffsetsToTransaction() for all input offsets consumed
5. commitTransaction(): output becomes atomically visible to downstream consumers

On task migration (rebalance):
  The previous owner is fenced: by consumer group generation under exactly_once_v2 (KIP-447),
  or because the new owner reuses the task's transactional.id under exactly_once (V1)
  The previous owner's pending transaction is aborted
  The new owner re-processes from the last committed offset

Exactly-once V1 vs V2 in Kafka Streams

| | exactly_once (V1) | exactly_once_v2 (V2) |
|---|---|---|
| Producer per | Task | Thread |
| Internal mechanism | KIP-98 | KIP-447 (uses consumer group metadata for fencing) |
| Requires Kafka | 0.11+ | 2.5+ |
| Performance | Baseline | ~20% better throughput |
| Zombie fencing | transactional.id based | transactional.id + consumer generation |
| Recommendation | Legacy (deprecated since Kafka 3.0) | ✅ Use V2 for new systems |

Zombie Producer Fencing

One of the most subtle EOS challenges: what happens when a producer instance is presumed dead but then comes back (a "zombie")?

The zombie scenario

t=0: Producer instance A (transactional.id="payment-app-1") is healthy
t=1: A starts a transaction, writes partial output, then hangs (GC pause, network issue)
t=2: Kubernetes considers A dead (health-check timeout) and starts a replacement, A'
t=3: A' calls initTransactions() → transaction coordinator bumps the epoch to E+1
t=4: A' starts normal processing...
t=5: Old A wakes up (GC pause ended), still holding epoch E
     A tries to continue its transaction
     → Broker rejects it with ProducerFencedException (epoch E is stale)
     → A cannot write anything: the zombie is fenced ✅
t=6: A' processes and commits correctly

Epoch mechanism:
transactional.id = "payment-app-1"
  Version 1: epoch=0 → registered by A
  Version 2: epoch=1 → registered by A' (A is now fenced at epoch 0)
  Version 3: epoch=2 → registered by A'' (A' is now fenced at epoch 1)

Any producer with an outdated epoch for a given transactional.id is immediately rejected.
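The epoch rule can be modelled with a single map. `EpochFencingSketch`, `initTransactions`, and `write` are hypothetical names for a toy coordinator-plus-broker; the real coordinator also aborts the fenced producer's open transaction and bumps epochs in other situations (for example when it aborts a timed-out transaction).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of epoch-based zombie fencing for one coordinator.
final class EpochFencingSketch {

    // Current epoch per transactional.id, as the coordinator would record it.
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    // initTransactions(): start at epoch 0, or bump the existing epoch by one.
    int initTransactions(String transactionalId) {
        return currentEpoch.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    // A write is accepted only if it carries the current epoch for its transactional.id.
    boolean write(String transactionalId, int producerEpoch) {
        Integer epoch = currentEpoch.get(transactionalId);
        return epoch != null && epoch == producerEpoch;
    }

    public static void main(String[] args) {
        EpochFencingSketch coordinator = new EpochFencingSketch();
        int a = coordinator.initTransactions("payment-app-1");      // A gets epoch 0
        int aPrime = coordinator.initTransactions("payment-app-1"); // A' gets epoch 1
        System.out.println(coordinator.write("payment-app-1", a));      // false: A is fenced
        System.out.println(coordinator.write("payment-app-1", aPrime)); // true
    }
}
```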

Transactional ID uniqueness is critical

// ❌ WRONG: all instances share the same transactional.id
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-processor");
// Each initTransactions() fences whichever instance registered before it, so instances keep
// fencing one another and the fenced ones fail with ProducerFencedException

// ✅ CORRECT: unique per instance, and stable across restarts of that instance
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
        "payment-processor-" + InetAddress.getLocalHost().getHostName());
// or:
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
        "payment-processor-" + System.getenv("POD_NAME"));
// or: use a UUID stored in a durable store to survive restarts
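The two options above can be combined into one helper that prefers an orchestrator-provided identity and falls back to the hostname. `transactionalIdFor` is a hypothetical helper, and relying on POD_NAME is an assumption about the deployment (it is stable across restarts only with something like a Kubernetes StatefulSet); the environment is passed in as a map so the logic can be tested.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

// Hypothetical helper: derive a unique, restart-stable transactional.id.
final class TransactionalIdSketch {

    // Builds "<appName>-<instance>", preferring POD_NAME and falling back to the hostname.
    static String transactionalIdFor(String appName, Map<String, String> env) {
        String instance = env.get("POD_NAME");
        if (instance == null || instance.isBlank()) {
            try {
                instance = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                // Refuse to guess: a random fallback would break fencing across restarts.
                throw new IllegalStateException("No POD_NAME and hostname lookup failed", e);
            }
        }
        return appName + "-" + instance;
    }

    public static void main(String[] args) {
        System.out.println(transactionalIdFor("payment-processor", System.getenv()));
    }
}
```

Avoid a fresh random suffix on every start: the restarted instance has to reuse its predecessor's transactional.id for the coordinator to fence the old one.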

KIP-447: consumer group metadata for better fencing

The original zombie fencing (KIP-98) relied only on transactional.id. For that to be safe, each transactional.id had to be tied to a fixed set of input partitions, which in practice meant one transactional producer per input partition.

KIP-447 adds consumer group generation to fencing. sendOffsetsToTransaction(offsets, consumer.groupMetadata()) includes the consumer group's current generation. If a zombie tries to commit offsets from an old consumer generation, the group coordinator rejects the commit even if the producer epoch hasn't been bumped, so a single producer per consumer (or per thread) is enough.

// ✅ Preferred: pass full consumer group metadata (KIP-447, Kafka 2.5+)
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());

// ❌ Legacy: only passes the group ID, no generation info (deprecated since Kafka 3.0)
producer.sendOffsetsToTransaction(offsets, "my-consumer-group");
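A toy model of the generation check: the group coordinator accepts a transactional offset commit only if it carries the group's current generation, so a zombie holding an older generation is rejected even though its producer epoch may still be valid. `GenerationFencingSketch` is hypothetical and ignores member IDs; in the real protocol the rejection surfaces to the client as an exception (for example CommitFailedException).

```java
// Hypothetical toy model of KIP-447 generation-based fencing in the group coordinator.
final class GenerationFencingSketch {

    private int generation = 0;

    // Each completed rebalance starts a new generation of the consumer group.
    int rebalance() {
        return ++generation;
    }

    // Offsets sent with an older generation (taken from groupMetadata()) are rejected.
    boolean commitOffsets(int memberGeneration) {
        return memberGeneration == generation;
    }

    public static void main(String[] args) {
        GenerationFencingSketch group = new GenerationFencingSketch();
        int zombieGeneration = group.rebalance();   // C1 joins: generation 1
        int newOwnerGeneration = group.rebalance(); // C1 presumed dead, C3 joins: generation 2
        System.out.println(group.commitOffsets(zombieGeneration));   // false: zombie fenced
        System.out.println(group.commitOffsets(newOwnerGeneration)); // true
    }
}
```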

EOS Internals: Two-Phase Commit

Kafka's transaction protocol is a form of two-phase commit (2PC) adapted for a distributed log:

Phase 1 (Prepare):
  Transaction coordinator receives commitTransaction() from producer
  Coordinator writes "PREPARE_COMMIT txn=TXN-1" to __transaction_state
  This is durable: even if the coordinator crashes, the commit will complete

Phase 2 (Commit):
  Coordinator sends WriteTxnMarkers RPC to all affected partition leaders:
    → orders-0 leader: write COMMIT marker at offset 45
    → payments-0 leader: write COMMIT marker at offset 12
    → __consumer_offsets-0: write COMMIT marker for offset commit
  Each broker acknowledges
  Coordinator writes "COMPLETE_COMMIT txn=TXN-1" to __transaction_state

If coordinator crashes between Phase 1 and Phase 2:
  New coordinator reads PREPARE_COMMIT from __transaction_state
  Completes Phase 2 automatically; the transaction commits eventually

If coordinator crashes before Phase 1:
  Transaction was never prepared; it is aborted automatically after transaction.timeout.ms
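The recovery rules above reduce to a decision on the last state a (new) coordinator loads from __transaction_state. This is a hypothetical sketch: the state and action names are simplified for illustration, and the real coordinator tracks more states and timing details than these.

```java
// Hypothetical sketch of coordinator recovery after loading __transaction_state.
final class CoordinatorRecoverySketch {

    enum TxnState { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    enum Action { NOTHING, WAIT, FINISH_COMMIT, FINISH_ABORT, ABORT_TIMED_OUT }

    // Decide what to do with a transaction given its last logged state.
    static Action onLoad(TxnState state, long txnStartMs, long nowMs, long timeoutMs) {
        return switch (state) {
            // Decision already durable (Phase 1 done): resend markers and finish Phase 2.
            case PREPARE_COMMIT -> Action.FINISH_COMMIT;
            case PREPARE_ABORT -> Action.FINISH_ABORT;
            // Never prepared: the producer may still commit, unless the transaction timed out.
            case ONGOING -> nowMs - txnStartMs > timeoutMs ? Action.ABORT_TIMED_OUT : Action.WAIT;
            // Nothing in flight.
            case EMPTY, COMPLETE_COMMIT, COMPLETE_ABORT -> Action.NOTHING;
        };
    }

    public static void main(String[] args) {
        long timeout = 60_000; // transaction.timeout.ms default
        System.out.println(onLoad(TxnState.PREPARE_COMMIT, 0, 10_000, timeout)); // FINISH_COMMIT
        System.out.println(onLoad(TxnState.ONGOING, 0, 10_000, timeout));        // WAIT
        System.out.println(onLoad(TxnState.ONGOING, 0, 61_000, timeout));        // ABORT_TIMED_OUT
    }
}
```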

Transaction timeout

# Producer:
# Default 60000 (60 s): the coordinator aborts a transaction that runs longer.
# Keep it short: long transactions hold back the LSO and cause consumer lag.
transaction.timeout.ms=60000

# Broker:
# Producers requesting a longer transaction.timeout.ms than this are rejected.
transaction.max.timeout.ms=900000

EOS Limitations

EOS is Kafka-internal only

✅ EOS covers:
  Producer → Broker (idempotent writes)
  Broker → Broker (transactional replication)
  Consumer offset commit ↔ Producer write (atomic via sendOffsetsToTransaction)

❌ EOS does NOT cover:
  Kafka → External Database  ← you need idempotent writes or 2PC at the DB level
  Kafka → REST API calls     ← external calls inside a transaction are NOT rolled back
  Kafka → File system writes ← files written before transaction abort are NOT undone

// ❌ DANGEROUS: side effects outside the transaction
@KafkaListener(topics = "raw-orders")
public void process(RawOrder order) {
    kafkaTemplate.executeInTransaction(t -> {
        // This DB write is NOT part of the Kafka transaction
        // If the Kafka transaction aborts, the DB write is NOT rolled back ❌
        orderRepository.save(new Order(order));

        t.send("processed-orders", order.getId(), processOrder(order));
        return null;
    });
}

// ✅ Better: idempotent DB write keyed on order ID
@KafkaListener(topics = "raw-orders")
public void process(RawOrder order) {
    kafkaTemplate.executeInTransaction(t -> {
        // Upsert: INSERT ... ON CONFLICT DO UPDATE
        // Safe to retry because duplicate calls produce the same result
        orderRepository.upsertByExternalId(order.getId(), new Order(order));

        t.send("processed-orders", order.getId(), processOrder(order));
        return null;
    });
}

Non-idempotent operations inside transactions

// ❌ WRONG: sending an email is NOT idempotent; an abort doesn't un-send it
@KafkaListener(topics = "raw-orders")
public void process(RawOrder order) {
    kafkaTemplate.executeInTransaction(t -> {
        t.send("processed-orders", order.getId(), processOrder(order));
        emailService.sendConfirmation(order.getUserEmail()); // if txn aborts, email already sent!
        return null;
    });
}

// ✅ Correct pattern: publish an event for the email, let a separate service send it
@KafkaListener(topics = "raw-orders")
public void process(RawOrder order) {
    kafkaTemplate.executeInTransaction(t -> {
        t.send("processed-orders", order.getId(), processOrder(order));
        t.send("email-notifications", order.getUserEmail(), new EmailEvent(order));
        // Email notification is transactional: only sent if this transaction commits
        return null;
    });
}

Failure Scenarios

Scenario 1: Producer crashes mid-transaction

t=0: Producer starts transaction (epoch=3, txn.id="payment-app-1")
t=1: Producer writes to processed-payments-0
t=2: Producer writes to payment-audit-0
t=3: Producer CRASHES (no commitTransaction called)

What happens:
  Transaction coordinator has the transaction recorded as ongoing
  After transaction.timeout.ms (60s default), the coordinator aborts it (PREPARE_ABORT)
  Coordinator sends WriteTxnMarkers to all affected partitions: ABORT
  Partitions write ABORT markers
  read_committed consumers never see the partial writes ✅

When producer restarts:
  initTransactions() contacts coordinator
  If the old transaction is still open, the coordinator aborts it first
  Assigns a higher epoch (e.g. epoch=4) → producer proceeds with fresh state

Scenario 2: Consumer rebalance during transaction

t=0: Consumer group has 2 members: C1 (owns partition 0), C2 (owns partition 1)
t=1: C1 is processing partition 0, record offset=42
     C1 is in the middle of a transaction...
t=2: C2 crashes → rebalance triggers
     Partition 1 is reassigned to C1 (now C1 owns 0 and 1)
     Partition 0 is still owned by C1: no change

Impact: Kafka Streams handles this automatically.
For manual consumer/producer code: avoid long transactions across rebalances, and keep the time
between successive poll() calls (including the transaction) below max.poll.interval.ms so the
consumer is not removed from the group.

If C1 itself crashes during rebalance:
  Both partitions are assigned to C3
  C1's zombie (if it recovers) is fenced: via the consumer group generation when offsets are sent
  with groupMetadata() (KIP-447), or via the transactional.id if C3 reuses C1's
  C3 re-processes from the last committed offset

Scenario 3: Broker crash with pending transaction

t=0: Producer sends to broker-1 (leader of processed-payments-0)
t=1: Broker-1 stores the record but crashes before sending ACK
     The record is also on broker-2 and broker-3 (replicas)
t=2: Leader election: broker-2 becomes leader of processed-payments-0
t=3: Producer retries → broker-2 receives the record again
     Producer uses same sequence number → DUPLICATE DETECTED → ack without storing ✅
     Idempotent producer deduplication works across leader switches!
t=4: Producer continues transaction normally

Performance Considerations

EOS overhead analysis

| Component | Overhead | Cause |
|---|---|---|
| Idempotent producer | ~3–5% | Sequence number tracking, retry deduplication |
| Transactions (commit) | 2 extra broker round trips | PREPARE_COMMIT + WriteTxnMarkers |
| read_committed consumer | Variable LSO lag | Waiting for transaction commit before exposing records |
| Total EOS overhead | ~10–20% throughput reduction | vs at-least-once baseline |

Performance tuning

// ── Batch multiple messages in one transaction ─────────────────────
// Anti-pattern: one transaction per message
for (RawOrder order : orders) {
    producer.beginTransaction();
    producer.send(...);
    producer.commitTransaction(); // 2 round trips × 1000 orders = 2000 round trips ❌
}

// Best practice: batch within a transaction
producer.beginTransaction();
for (RawOrder order : orders) {
    producer.send(...); // sends are buffered and batched by the producer
}
producer.commitTransaction(); // 2 round trips for 1000 messages ✅
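When one transaction covers a whole batch, as in the best-practice loop above, the offset map passed to sendOffsetsToTransaction needs, for each input partition, the highest processed offset plus one (42 → 43 in the consume-transform-produce diagram). A minimal sketch of that bookkeeping, using a hypothetical `Processed` record in place of Kafka's ConsumerRecord and a "topic-partition" string key in place of TopicPartition:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: per-partition offsets to commit for a batch of processed records.
final class NextOffsetsSketch {

    // Stand-in for ConsumerRecord: only the coordinates needed for offset bookkeeping.
    record Processed(String topic, int partition, long offset) {}

    // For each topic-partition: offset to commit = highest processed offset + 1.
    static Map<String, Long> nextOffsets(List<Processed> processed) {
        Map<String, Long> next = new TreeMap<>();
        for (Processed p : processed) {
            next.merge(p.topic() + "-" + p.partition(), p.offset() + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Processed> batch = List.of(
                new Processed("raw-orders", 0, 42),
                new Processed("raw-orders", 1, 7),
                new Processed("raw-orders", 0, 43));
        System.out.println(nextOffsets(batch)); // {raw-orders-0=44, raw-orders-1=8}
    }
}
```

In real code the same loop would fill a Map<TopicPartition, OffsetAndMetadata> with new OffsetAndMetadata(offset + 1), as in the PaymentTransactionService example, and pass it to sendOffsetsToTransaction once per transaction.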

# Producer tuning for EOS
# wait up to 10 ms to fill batches (reduces per-commit overhead)
linger.ms=10
# 64 KB batches (larger batches mean fewer produce requests per transaction)
batch.size=65536
# compress batches: smaller network payload per commit
compression.type=lz4

# Kafka Streams: tune commit interval
# 100 ms is the default under EOS (about 10 commits/sec)
commit.interval.ms=100
# Increasing to 500 ms: fewer commits, higher throughput, more reprocessing on failure
# commit.interval.ms=500

When to use EOS vs at-least-once + idempotent consumer

| Scenario | Choose | Why |
|---|---|---|
| Financial: charge cards, transfer money | EOS | Duplicates cause real financial harm |
| Inventory: reserve/release stock | EOS | Stock levels must be exact |
| Analytics: count events for a dashboard | At-least-once + idempotent consumer | Slightly off count is acceptable; avoid EOS overhead |
| Log aggregation: ship logs to Elasticsearch | At-least-once | Duplicate logs are tolerable; EOS adds latency |
| Kafka to DB with unique constraints | At-least-once + idempotent writes | INSERT ... ON CONFLICT DO NOTHING is cheaper than EOS |
| Kafka Streams pure topology | EOS (exactly_once_v2) | Built-in; low overhead with Streams |
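For the at-least-once rows, the consumer side absorbs duplicates instead. A minimal sketch of an idempotent handler that skips event IDs it has already applied; the in-memory set and the `IdempotentConsumerSketch` name are illustrative assumptions. In production the processed-ID record would live in the same database transaction as the side effect (the INSERT ... ON CONFLICT DO NOTHING approach in the table), so a crash cannot separate the two.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical idempotent consumer: applies each event ID at most once.
final class IdempotentConsumerSketch {

    private final Set<String> processedIds = new HashSet<>();
    private long total = 0;

    // Returns true if the event was applied, false if it was a duplicate delivery.
    boolean handle(String eventId, long amount) {
        if (!processedIds.add(eventId)) {
            return false; // already applied: ignore the redelivery
        }
        total += amount;
        return true;
    }

    long total() {
        return total;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.handle("order-1", 100);
        consumer.handle("order-1", 100); // redelivered after a consumer crash
        consumer.handle("order-2", 50);
        System.out.println(consumer.total()); // 150
    }
}
```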

Broker & Topic Configuration

# ── Transaction coordinator topic ─────────────────────────────────
# 3 replicas for durability
transaction.state.log.replication.factor=3
# require 2 in-sync replicas for transaction state writes
transaction.state.log.min.isr=2
# default; enough for most clusters
transaction.state.log.num.partitions=50

# ── Consumer offset topic ─────────────────────────────────────────
offsets.topic.replication.factor=3
offsets.topic.num.partitions=50

# ── Per-broker ────────────────────────────────────────────────────
# never elect an out-of-sync replica as leader
unclean.leader.election.enable=false
# require 2 in-sync replicas for acks=all writes (keep below RF so one replica can fail)
min.insync.replicas=2
# all new topics default to RF=3
default.replication.factor=3

# ── Transaction timeouts ──────────────────────────────────────────
# max allowed transaction.timeout.ms (15 min)
transaction.max.timeout.ms=900000
# how often the broker checks for timed-out transactions to abort (10 s)
transaction.abort.timed.out.transaction.cleanup.interval.ms=10000

Complete EOS Configuration Cheat Sheet

```properties
# ── Producer ───────────────────────────────────────────────────
enable.idempotence=true
# unique per producer instance
transactional.id=<app-name>-<unique-suffix>
acks=all
# Integer.MAX_VALUE
retries=2147483647
# must be <= 5 when idempotence is enabled
max.in.flight.requests.per.connection=5
# 30 s: fail fast on a stalled transaction
transaction.timeout.ms=30000

# ── Consumer ───────────────────────────────────────────────────
isolation.level=read_committed
enable.auto.commit=false
group.id=<consumer-group>

# ── Kafka Streams ──────────────────────────────────────────────
processing.guarantee=exactly_once_v2
# already the default when EOS is enabled
commit.interval.ms=100
replication.factor=3

# ── Broker ─────────────────────────────────────────────────────
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
min.insync.replicas=2
unclean.leader.election.enable=false
```
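
A cheap guard against misconfiguration is to check these settings at startup. The sketch below is a hypothetical helper (EosConfigCheck is not part of kafka-clients) that flags producer and consumer properties contradicting the cheat sheet. It treats an absent isolation.level or enable.auto.commit as a problem because Kafka's defaults are read_uncommitted and true.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical startup check, not a Kafka API: returns human-readable problems,
// or an empty list if the properties match the EOS cheat sheet.
final class EosConfigCheck {
    private EosConfigCheck() {}

    static List<String> checkProducer(Properties p) {
        List<String> problems = new ArrayList<>();
        if (!"true".equals(p.getProperty("enable.idempotence"))) {
            problems.add("enable.idempotence should be set explicitly to true");
        }
        String acks = p.getProperty("acks", "");
        if (!acks.equals("all") && !acks.equals("-1")) {
            problems.add("acks must be all (or -1)");
        }
        // Kafka's default is 5; idempotence requires at most 5
        int inFlight = Integer.parseInt(p.getProperty("max.in.flight.requests.per.connection", "5").trim());
        if (inFlight > 5) {
            problems.add("max.in.flight.requests.per.connection must be <= 5");
        }
        if (p.getProperty("transactional.id", "").isBlank()) {
            problems.add("transactional.id is required for transactions");
        }
        return problems;
    }

    static List<String> checkConsumer(Properties c) {
        List<String> problems = new ArrayList<>();
        // Absent means read_uncommitted (Kafka's default), which exposes aborted data
        if (!"read_committed".equals(c.getProperty("isolation.level"))) {
            problems.add("isolation.level must be read_committed");
        }
        // Absent means true (Kafka's default), which commits offsets outside the transaction
        if (!"false".equals(c.getProperty("enable.auto.commit"))) {
            problems.add("enable.auto.commit must be false");
        }
        return problems;
    }
}
```

In an application you might run checkProducer on the same Properties you pass to KafkaProducer and refuse to start if the list is not empty.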

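Testing EOS

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;

// RawOrder, ProcessedOrder and OrderProcessingService are the application's own types.
@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"raw-orders", "processed-orders"},
    brokerProperties = {
        "transaction.state.log.replication.factor=1", // single broker for tests
        "transaction.state.log.min.isr=1"
    }
)
class EosOrderProcessingTest {

    @Autowired private KafkaTemplate<String, Object> kafkaTemplate;
    @Autowired private OrderProcessingService orderService; // ensures the processing listener is wired
    @Autowired private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void processedOrderIsVisibleOnlyAfterCommit() throws Exception {
        // Arrange + act: send a raw order and wait for the broker ACK
        kafkaTemplate.send("raw-orders", "order-1", new RawOrder("order-1", "user-1", 99.99)).get();

        // Assert: a read_committed consumer sees exactly one result for order-1
        long order1Count = consumeAll("processed-orders", Duration.ofSeconds(10)).stream()
            .filter(r -> r.orderId().equals("order-1"))
            .count();
        assertThat(order1Count).isEqualTo(1L);
    }

    @Test
    void duplicateRawOrderProducesExactlyOneProcessedOrder() throws Exception {
        // Simulate an upstream retry by sending the same raw order twice
        RawOrder order = new RawOrder("order-42", "user-5", 149.99);
        kafkaTemplate.send("raw-orders", "order-42", order).get();
        kafkaTemplate.send("raw-orders", "order-42", order).get(); // "duplicate" from retry

        // These are two distinct input records, so EOS processes both;
        // the service itself must be idempotent on order ID (upsert, not insert)
        long order42Count = consumeAll("processed-orders", Duration.ofSeconds(10)).stream()
            .filter(r -> r.orderId().equals("order-42"))
            .count();
        assertThat(order42Count).isEqualTo(1L); // exactly once, not twice
    }

    // Polls the topic with a fresh read_committed consumer until the timeout expires.
    private List<ProcessedOrder> consumeAll(String topic, Duration timeout) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-verifier-" + UUID.randomUUID());
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, ProcessedOrder.class.getName());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

        List<ProcessedOrder> results = new ArrayList<>();
        try (KafkaConsumer<String, ProcessedOrder> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of(topic));
            long deadline = System.nanoTime() + timeout.toNanos();
            while (System.nanoTime() < deadline) {
                for (ConsumerRecord<String, ProcessedOrder> record : consumer.poll(Duration.ofMillis(500))) {
                    results.add(record.value());
                }
            }
        }
        return results;
    }
}
```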

Common Mistakes

| Mistake | Problem | Fix |
|---|---|---|
| Shared transactional.id across instances | Each instance's initTransactions() bumps the epoch and fences the others, so only the most recently started instance can produce | Use a unique ID per instance (hostname, POD_NAME) |
| enable.auto.commit=true with EOS | The consumer commits offsets on its own, outside the transaction, so offsets can advance for a transaction that later aborts (records skipped) | Always set enable.auto.commit=false with EOS |
| isolation.level=read_uncommitted (the default) on an EOS consumer | The consumer reads records from aborted and still-open transactions: phantom records | Always use read_committed for EOS consumers |
| Side effects (HTTP calls, DB writes) inside a transaction | External calls are not rolled back when the transaction aborts | Keep side effects outside; publish events inside the transaction for another service to handle |
| transaction.timeout.ms too long | A stalled transaction holds the LSO → read_committed consumer lag grows until it times out | Keep it short (e.g. 30 s); fix slow processing rather than extending the timeout |
| Not handling ProducerFencedException | The zombie producer swallows the exception and carries on as if its writes succeeded → data loss | Catch it and fail hard: close the producer, since a fenced producer cannot be reused |
| exactly_once instead of exactly_once_v2 in Kafka Streams | Legacy mode (deprecated since Kafka 3.0) runs one producer per task: more producers, lower throughput | Use exactly_once_v2 (requires brokers 2.5+) |
| Calling System.gc() or other long pauses in the producer thread | A pause longer than max.poll.interval.ms or transaction.timeout.ms gets the instance fenced as a zombie | Consider a low-pause collector such as ZGC or Shenandoah; keep heavy computation off the producer thread |
| Using EOS for every topic regardless of need | Roughly 10–20% throughput overhead on every topic | Apply EOS only to topics that genuinely require it (financial data, inventory) |
| Not monitoring transaction aborts | Silent data quality issues if aborts spike | Count aborts (e.g. wherever abortTransaction() is called) and alert on unexpected spikes |
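
For the first row, one way to get a unique transactional.id is to derive it from the pod or host name. The helper below is hypothetical; it assumes POD_NAME is injected through the Kubernetes downward API and falls back to HOSTNAME.

```java
import java.util.Map;

// Hypothetical helper: builds a per-instance transactional.id. Assumes POD_NAME is
// injected via the Kubernetes downward API, with HOSTNAME as a fallback.
final class TransactionalIds {
    private TransactionalIds() {}

    static String forInstance(String appName, Map<String, String> env) {
        String instance = firstNonBlank(env.get("POD_NAME"), env.get("HOSTNAME"));
        if (instance == null) {
            // Fail fast: an empty or shared suffix would let instances fence each other
            throw new IllegalStateException("neither POD_NAME nor HOSTNAME is set");
        }
        return appName + "-" + instance;
    }

    private static String firstNonBlank(String... candidates) {
        for (String candidate : candidates) {
            if (candidate != null && !candidate.isBlank()) {
                return candidate;
            }
        }
        return null;
    }
}
```

Usage: TransactionalIds.forInstance("order-processor", System.getenv()). With a StatefulSet, pod names such as order-processor-0 survive restarts, so a restarted pod reuses its ID and fences its own predecessor rather than a sibling.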

🎯 Interview Questions

Q1. What are the three delivery guarantees in Kafka, and when do you choose each?

At-most-once: the producer sends once and never retries, so a message may be lost. Use it for metrics and telemetry where some loss is acceptable. At-least-once: the producer retries until it receives an ACK, so a message may be duplicated. Use it for most business events where the consumer can handle duplicates (idempotent writes). Exactly-once: each message is delivered and processed precisely once, with neither loss nor duplicates. Use it for financial transactions, inventory, and any case where duplicates cause real business harm. EOS carries roughly 10–20% throughput overhead, so apply it selectively.
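
The three guarantees differ only in what the producer does when it cannot tell whether a send succeeded. The toy simulation below (plain Java, not Kafka code) loses either the request or the ACK on the first attempt and counts how many copies the broker stores under each policy. IDEMPOTENT_RETRY stands for at-least-once retries plus broker-side deduplication, which covers only the producer leg of EOS.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Kafka code): one send whose first attempt may lose the request
// or the ACK, under three producer policies.
final class DeliverySim {
    enum Failure { NONE, REQUEST_LOST, ACK_LOST }

    enum Policy { AT_MOST_ONCE, AT_LEAST_ONCE, IDEMPOTENT_RETRY }

    private DeliverySim() {}

    /** Returns how many copies of the message the broker ends up storing. */
    static int copiesStored(Policy policy, Failure firstAttempt) {
        List<Long> brokerLog = new ArrayList<>();
        long seq = 0; // sequence number attached to this message, used only for dedup
        boolean acked = attempt(brokerLog, policy, seq, firstAttempt);
        if (!acked && policy != Policy.AT_MOST_ONCE) {
            attempt(brokerLog, policy, seq, Failure.NONE); // one retry, assumed to succeed
        }
        return brokerLog.size();
    }

    /** Returns true if the producer receives an ACK for this attempt. */
    private static boolean attempt(List<Long> brokerLog, Policy policy, long seq, Failure failure) {
        if (failure == Failure.REQUEST_LOST) {
            return false; // the broker never sees the request
        }
        boolean duplicate = policy == Policy.IDEMPOTENT_RETRY && brokerLog.contains(seq);
        if (!duplicate) {
            brokerLog.add(seq); // the broker stores the message
        }
        return failure != Failure.ACK_LOST; // with ACK_LOST the reply never reaches the producer
    }
}
```

A lost request under AT_MOST_ONCE stores 0 copies (loss), a lost ACK under AT_LEAST_ONCE stores 2 (duplicate), and IDEMPOTENT_RETRY stores exactly 1 in both cases.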

Q2. What are the three pillars required for end-to-end exactly-once in Kafka?

(1) Idempotent producer (enable.idempotence=true): the broker assigns each producer a producer ID (PID) and tracks sequence numbers per partition, so a retried batch is recognised and not written twice. (2) Transactions (transactional.id + beginTransaction/commitTransaction): atomic writes across multiple partitions, with output production atomically paired with consumer offset commits via sendOffsetsToTransaction. (3) read_committed consumer isolation: consumers see only records from committed transactions; records from in-progress transactions are withheld until the transaction resolves, and records from aborted transactions are filtered out.
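
The idempotent-producer pillar can be sketched as a per-(PID, partition) sequence check. This is a simplified model: each request carries a single record with one sequence number, whereas real brokers track sequence ranges for the last five batches per producer and partition. SequenceDedup is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the broker-side idempotence check (not broker code):
// one record per request, one sequence number per record, starting at 0.
final class SequenceDedup {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final Map<String, Integer> lastSeq = new HashMap<>();

    Result append(long producerId, int partition, int seq) {
        String key = producerId + "/" + partition; // state is kept per (PID, partition)
        int last = lastSeq.getOrDefault(key, -1);
        if (seq <= last) {
            return Result.DUPLICATE;    // a retry of something already written: not stored again
        }
        if (seq != last + 1) {
            return Result.OUT_OF_ORDER; // a gap means an earlier record is missing
        }
        lastSeq.put(key, seq);
        return Result.APPENDED;
    }
}
```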

Q3. What is sendOffsetsToTransaction and why is it the key to EOS?

sendOffsetsToTransaction adds the consumer's offset commits to the current Kafka transaction. This makes consuming the input and producing the output atomic: if the transaction commits, the output records become visible and the consumer offsets advance together; if it aborts, neither takes effect and the input is re-processed. Without this pairing you could commit the output but fail to advance the offset (reprocessing, so duplicate output) or advance the offset but lose the output (data loss). It is the bridge between the consumer and producer sides of the consume-transform-produce pattern.
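
The all-or-nothing pairing can be modelled without a broker. In this toy sketch (not the Kafka API) the pending output list stands in for producer.send(...), the pending offset for sendOffsetsToTransaction(...), and the two outcomes for commitTransaction() and abortTransaction().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Toy model (not the Kafka API): output records and the consumer offset are
// buffered together and take effect together, or not at all.
final class AtomicPipeline {
    final List<String> input;
    final List<String> committedOutput = new ArrayList<>(); // what read_committed consumers see
    int committedOffset = 0; // next input offset to process

    private final List<String> pendingOutput = new ArrayList<>();
    private int pendingOffset = -1;

    AtomicPipeline(List<String> input) {
        this.input = List.copyOf(input);
    }

    /** Processes the next input record in one "transaction"; crash=true simulates a failure before commit. */
    void processNext(boolean crash) {
        if (committedOffset >= input.size()) {
            return; // nothing left to consume
        }
        String record = input.get(committedOffset);
        pendingOutput.add(record.toUpperCase(Locale.ROOT)); // stands in for producer.send(...)
        pendingOffset = committedOffset + 1;                  // stands in for sendOffsetsToTransaction(...)
        if (!crash) {
            committedOutput.addAll(pendingOutput);            // commitTransaction(): both take effect
            committedOffset = pendingOffset;
        }
        // Either way the transaction is over; after a crash nothing was applied (abort)
        pendingOutput.clear();
        pendingOffset = -1;
    }
}
```

After a crash the committed offset still points at the first record, so it is re-processed and the output ends up as [A, B], with no gap and no duplicate.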

Q4. What is zombie producer fencing and why is it necessary for EOS?

A zombie producer is an old producer instance that was presumed dead (timeout, GC pause, network partition) but later recovers. Without fencing, the zombie could complete a transaction that conflicts with work already done by its replacement. Kafka fences zombies with an epoch: each time a producer with a given transactional.id calls initTransactions(), the transaction coordinator increments the epoch for that ID. Any produce or commit request carrying an older epoch is rejected with ProducerFencedException. KIP-447 strengthens this by passing the consumer group metadata (including the group generation) to sendOffsetsToTransaction, so offset commits from a consumer of an older generation are rejected even when the producer's epoch has not been bumped.
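
The epoch check can be sketched as a toy coordinator. EpochFencing and FencedException are hypothetical names for illustration; in Kafka the rejection typically surfaces as ProducerFencedException on the zombie's next send or commit.

```java
import java.util.HashMap;
import java.util.Map;

// Toy transaction coordinator (not Kafka's): initTransactions bumps the epoch for a
// transactional.id, and any request carrying an older epoch is rejected.
final class EpochFencing {
    static final class FencedException extends RuntimeException {
        FencedException(String message) {
            super(message);
        }
    }

    private final Map<String, Integer> currentEpoch = new HashMap<>();

    /** Called by a (re)starting producer; returns its new epoch (0 for the first one). */
    int initTransactions(String transactionalId) {
        return currentEpoch.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    /** Every produce or commit request must present the latest epoch. */
    void checkEpoch(String transactionalId, int epoch) {
        int latest = currentEpoch.getOrDefault(transactionalId, -1);
        if (epoch < latest) {
            throw new FencedException(transactionalId + ": epoch " + epoch + " fenced by " + latest);
        }
        if (epoch > latest) {
            throw new IllegalArgumentException("unknown epoch " + epoch);
        }
    }
}
```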

Q5. How does Kafka Streams implement EOS compared to manual consumer/producer?

Kafka Streams automatically wraps each poll-process-produce cycle in a Kafka transaction. Developers only set processing.guarantee=exactly_once_v2; there is no manual transaction management. Streams sends the outputs, adds the consumed offsets with sendOffsetsToTransaction, and commits the transaction every commit.interval.ms (default 100 ms under EOS). The legacy exactly_once mode created one transactional producer per stream task, with a transactional.id derived from the application ID and task ID, so a task's new owner fenced the old owner by reusing that ID. exactly_once_v2 uses one transactional producer per stream thread and relies on the KIP-447 consumer-group-generation check to fence a zombie after a rebalance.
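
The scalability difference between the two modes shows up in the number of transactional producers. Assuming a single sub-topology (one task per input partition), exactly_once needs one producer per task while exactly_once_v2 needs one per stream thread. The helper below is hypothetical arithmetic, not a Kafka Streams API, and the numbers are illustrative.

```java
// Hypothetical back-of-the-envelope helper (not a Kafka Streams API).
final class StreamsProducerCount {
    private StreamsProducerCount() {}

    /** exactly_once (v1): one transactional producer per task; assumes one task per input partition. */
    static int v1Producers(int inputPartitions) {
        return inputPartitions;
    }

    /** exactly_once_v2: one transactional producer per stream thread. */
    static int v2Producers(int instances, int threadsPerInstance) {
        return instances * threadsPerInstance;
    }
}
```

For example, 120 input partitions processed by 3 instances with 4 stream threads each means 120 transactional producers under exactly_once but only 12 under exactly_once_v2.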

Q6. Why doesn't Kafka EOS cover writes to external systems (like a database)?

Kafka's transaction protocol uses the Kafka broker as the coordinator: it can atomically write to multiple Kafka partitions because every participant goes through that protocol. External systems (databases, REST APIs, file systems) are not participants in the Kafka transaction. If you write to a database inside a Kafka transaction and the transaction aborts, Kafka marks the transaction's records as aborted (read_committed consumers never see them) but has no way to roll back the database write. Solutions: (1) make database writes idempotent (INSERT ... ON CONFLICT DO UPDATE) so retries produce the same result; (2) instead of writing to the database directly, publish the change to a Kafka topic inside the transaction and let a separate service consume that topic and apply it to the database idempotently; (3) use a distributed transaction coordinator (expensive and complex).
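
Option (1) works because an upsert keyed by a natural ID is idempotent: applying the same batch twice leaves the same rows. The sketch below uses an in-memory map as a stand-in for a table with a primary key on order ID; OrderTable and OrderEvent are hypothetical names, and put() plays the role of INSERT ... ON CONFLICT (order_id) DO UPDATE.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in (hypothetical) for a table keyed by order_id.
final class OrderTable {
    record OrderEvent(String orderId, String status) {}

    private final Map<String, String> statusByOrder = new HashMap<>();

    /** Insert or overwrite by key; replaying an event never creates a second row. */
    void upsert(OrderEvent event) {
        statusByOrder.put(event.orderId(), event.status());
    }

    Map<String, String> snapshot() {
        return Map.copyOf(statusByOrder);
    }
}
```

This relies on replays arriving in the same order, which Kafka guarantees within a partition when events are keyed by order ID; if updates could arrive out of order you would also need a version column to reject stale writes.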

Q7. (Senior) What happens to consumers when a long-running transaction stalls the Last Stable Offset?

The LSO (Last Stable Offset) is the first offset of the earliest still-open transaction on a partition, or the high watermark if no transaction is open. read_committed consumers can read only below the LSO, so messages at higher offsets are withheld even if they belong to committed transactions. A stalled transaction (slow producer, long processing time, GC pause, deadlock) pins the LSO in place. Consumers appear stuck: their lag grows even though committed messages are waiting behind the open transaction. This is particularly dangerous because committed messages from other producers at higher offsets are hidden too. Mitigation: keep transaction.timeout.ms short (e.g. 30 s) so the coordinator aborts a stalled transaction quickly and the LSO can advance; monitor consumer-group lag and alert on unexpected growth; use read_uncommitted for non-EOS consumers that don't need the guarantee (they read up to the high watermark, so the LSO does not affect them).
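
The LSO rule can be made concrete with a toy model of a single partition (LsoModel is hypothetical, not broker code). It treats every record as transactional, ignores control markers, and assumes the high watermark is one past the last offset.

```java
import java.util.List;

// Toy model of one partition's log (not broker code): each entry notes which
// transaction wrote it and that transaction's current state.
final class LsoModel {
    enum TxState { OPEN, COMMITTED, ABORTED }

    record Entry(long offset, String txId, TxState state) {}

    private LsoModel() {}

    /** LSO = first offset of the earliest still-open transaction, else the high watermark. */
    static long lastStableOffset(List<Entry> log) {
        long highWatermark = log.isEmpty() ? 0 : log.get(log.size() - 1).offset() + 1;
        return log.stream()
                .filter(e -> e.state() == TxState.OPEN)
                .mapToLong(Entry::offset)
                .min()
                .orElse(highWatermark);
    }

    /** Offsets a read_committed consumer can be handed: committed records below the LSO. */
    static List<Long> readCommitted(List<Entry> log) {
        long lso = lastStableOffset(log);
        return log.stream()
                .filter(e -> e.offset() < lso && e.state() == TxState.COMMITTED)
                .map(Entry::offset)
                .toList();
    }
}
```

For a log of [0: committed, 1: open, 2: committed, 3: aborted], the open transaction at offset 1 pins the LSO at 1, so a read_committed consumer gets only offset 0; the committed record at offset 2 stays hidden until that transaction commits, aborts, or times out.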


See Also