Configuring Exactly-Once Semantics in Kafka
Exactly-once semantics (EOS) ensures that every message is delivered and processed exactly once — no duplicates, no data loss. This guide covers how to configure EOS across producers, consumers, Kafka Streams, and Kafka Connect.
1. Understanding Delivery Guarantees
| Guarantee | Behavior | Risk |
|---|---|---|
| At-most-once | Fire and forget; offsets committed early | Data loss |
| At-least-once | Offsets committed after processing | Duplicate processing |
| Exactly-once | Atomic writes + offset commits | None (highest cost) |
Exactly-once is achieved by combining two Kafka features:
- Idempotent producers — prevent duplicate writes at the broker level
- Transactions — atomically write to multiple partitions and commit offsets
2. Idempotent Producers
An idempotent producer ensures that retries do not create duplicate messages in a partition.
How It Works
- The broker assigns a Producer ID (PID) and tracks a sequence number per partition.
- If a producer retries a send, the broker detects the duplicate sequence number and deduplicates it.
- This guarantees exactly-once delivery for a single producer session to a single partition.
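The dedup rule above can be sketched as a small simulation. This is illustrative only, assuming a `Map` keyed by (PID, partition) holding the last accepted sequence number; it is not Kafka's actual broker code:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative simulation of broker-side deduplication: the broker remembers
// the last sequence number accepted per (producerId, partition) and rejects
// an append whose sequence number it has already seen.
class SequenceDeduplicator {
    private final Map<String, Integer> lastSeq = new HashMap<>();

    /** Returns true if the record is appended, false if it is a duplicate retry. */
    boolean tryAppend(long producerId, int partition, int sequence) {
        String key = producerId + "-" + partition;
        Integer last = lastSeq.get(key);
        if (last != null && sequence <= last) {
            return false; // duplicate: a retry of an already-acknowledged batch
        }
        lastSeq.put(key, sequence);
        return true;
    }

    public static void main(String[] args) {
        SequenceDeduplicator broker = new SequenceDeduplicator();
        System.out.println(broker.tryAppend(42L, 0, 0)); // first send: appended
        System.out.println(broker.tryAppend(42L, 0, 0)); // retry: deduplicated
        System.out.println(broker.tryAppend(42L, 0, 1)); // next batch: appended
    }
}
```

Note that the state is per partition, which is why idempotence alone cannot give cross-partition guarantees.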
Configuration
```properties
# Enable the idempotent producer
enable.idempotence=true

# Idempotence requires the following; the client enforces them
# (they are also the defaults when idempotence is enabled):
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
```
```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```
Limitations
- Idempotency alone only prevents duplicates from producer retries.
- It does not protect against application-level replays (e.g., restarting and resending).
- For cross-partition and cross-topic exactly-once, you need transactions.
3. Transactional Producer
Transactions enable atomic writes across multiple partitions and topics, including consumer offset commits.
Configuration
```properties
enable.idempotence=true
transactional.id=my-transactional-app-01
```
`transactional.id` must be unique per producer instance and stable across restarts. Kafka uses it to fence zombie producers (previous instances of the same logical producer that are still running after a restart or rebalance).
Transactional Write Pattern
```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-01");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Initialize transactions (call once per producer instance)
producer.initTransactions();

try {
    producer.beginTransaction();

    // Send to multiple topics/partitions atomically
    producer.send(new ProducerRecord<>("output-topic-1", "key1", "value1"));
    producer.send(new ProducerRecord<>("output-topic-2", "key2", "value2"));

    // Commit consumer offsets as part of the transaction
    Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(
        new TopicPartition("input-topic", 0),
        new OffsetAndMetadata(currentOffset + 1)
    );
    producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);

    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    // Fatal: the producer cannot recover; close it and restart the application
    producer.close();
} catch (KafkaException e) {
    // Transient: abort the transaction; the batch can be retried
    producer.abortTransaction();
}
```
Transaction Lifecycle
```
initTransactions()
        │
        ▼
beginTransaction()
        │
        ├── send(record1)
        ├── send(record2)
        ├── sendOffsetsToTransaction(offsets, groupMetadata)
        │
        ▼
commitTransaction() ──or── abortTransaction()
```
4. Transactional Consumer
To participate in exactly-once, the consumer must only read committed messages.
Configuration
```properties
group.id=my-consumer-group
isolation.level=read_committed
enable.auto.commit=false
```
| Property | Value | Why |
|---|---|---|
| `isolation.level` | `read_committed` | Skip uncommitted (in-flight) transactional messages |
| `enable.auto.commit` | `false` | Offsets are committed by the transactional producer |
```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```
5. Complete Consume-Transform-Produce Pattern
This is the full exactly-once pattern: consume → process → produce + commit offsets atomically.
```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

KafkaConsumer<String, String> consumer = createTransactionalConsumer();
KafkaProducer<String, String> producer = createTransactionalProducer();

producer.initTransactions();
consumer.subscribe(List.of("input-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            // Transform
            String result = process(record.value());
            // Produce to the output topic
            producer.send(new ProducerRecord<>("output-topic", record.key(), result));
            // Track the next offset to consume for this partition
            offsets.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)
            );
        }
        // Atomically commit offsets + produced records
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException e) {
        producer.close();
        break;
    } catch (KafkaException e) {
        // Abort; also seek the consumer back to the last committed offsets
        // before the next poll so the aborted batch is reprocessed
        producer.abortTransaction();
    }
}
```
6. Exactly-Once in Kafka Streams
Kafka Streams makes exactly-once straightforward to enable: it handles the transactional plumbing internally.
Configuration
```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Enable exactly-once ("exactly_once_v2" requires Kafka 3.0+)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_v2");
```
| Value | Kafka Version | Description |
|---|---|---|
| `at_least_once` | All | Default; possible duplicates |
| `exactly_once` | 0.11–2.x | Original EOS (one producer per task); deprecated |
| `exactly_once_v2` | 3.0+ (as `exactly_once_beta` in 2.6–2.8) | Optimized EOS (fewer producers, less overhead) |

Always use `exactly_once_v2` on Kafka 3.0+. The original `exactly_once` is deprecated.
What Kafka Streams Does Internally
When exactly_once_v2 is enabled, Kafka Streams:
- Uses a transactional producer per stream thread
- Atomically writes output records and commits consumer offsets
- Atomically flushes changelog (state store) updates
- Fences zombie instances using a `transactional.id` derived from the `application.id`
7. Exactly-Once in Kafka Connect
Source Connectors
Kafka Connect supports exactly-once source connectors (Kafka 3.3+):
```properties
# In connect-distributed.properties (worker config)
exactly.once.source.support=enabled
```

```properties
# Per-connector configuration
exactly.once.support=required
transaction.boundary=poll  # or "connector" or "interval"
```
| `transaction.boundary` | Behavior |
|---|---|
| `poll` | One transaction per `poll()` call (default) |
| `connector` | Transaction boundaries are defined by the connector itself |
| `interval` | Transactions committed at a configured interval |
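Putting these together, a per-connector configuration might look like the following sketch. The connector name and class are placeholders, and `transaction.boundary.interval.ms` applies only when the boundary is `interval`:

```properties
# Hypothetical source connector config (name and class are placeholders)
name=orders-source
connector.class=com.example.OrdersSourceConnector
exactly.once.support=required
transaction.boundary=interval
transaction.boundary.interval.ms=60000
```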
Sink Connectors
Sink connectors achieve exactly-once by:
- Setting `consumer.override.isolation.level=read_committed`
- Implementing idempotent writes in the sink system (e.g., upserts with primary keys)
8. Broker Configuration for EOS
Ensure your broker cluster is configured to support transactions:
```properties
# Replication for the internal transaction-state topic
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3

# Maximum allowed transaction timeout
transaction.max.timeout.ms=900000

# Keep unclean leader election disabled (the default)
unclean.leader.election.enable=false

# Minimum in-sync replicas for data topics
min.insync.replicas=2
```
Cluster requirements:
- At least 3 brokers (for replication factor 3)
- `min.insync.replicas=2` on data topics to prevent data loss with `acks=all`
- The `__transaction_state` internal topic must be healthy
9. Performance Considerations
Exactly-once introduces overhead. Understand the trade-offs:
| Aspect | Impact |
|---|---|
| Latency | Slightly higher due to transaction commit overhead |
| Throughput | ~3-20% lower (varies by workload) |
| Broker load | Additional transaction coordinator work |
| Producer memory | Transaction buffers require more memory |
Tuning Tips
```properties
# Increase batch size to amortize transaction overhead
batch.size=65536
linger.ms=10

# Kafka Streams: raise the commit interval for fewer, larger transactions
# (the default under EOS is 100 ms)
commit.interval.ms=1000

# Transaction timeout (increase if processing is slow)
transaction.timeout.ms=60000
```
10. Common Pitfalls
1. Forgetting isolation.level=read_committed
Without this, consumers read uncommitted messages — breaking the exactly-once guarantee.
2. Non-unique transactional.id
If two active producers share the same `transactional.id`, the older instance is fenced: its subsequent sends and commits fail with `ProducerFencedException`. Ensure each instance has a unique, stable ID, typically derived from its input partition assignment.
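A minimal sketch of one common derivation scheme (the application name and the partition-based scheme here are illustrative assumptions, not a Kafka API):

```java
// Sketch: derive a stable, per-instance transactional.id. How you obtain the
// instance identity (input partition, env var, StatefulSet ordinal) depends
// on your deployment; the partition-based form is one common choice.
class TransactionalIds {
    static String forPartition(String appId, int inputPartition) {
        return appId + "-" + inputPartition;
    }

    public static void main(String[] args) {
        System.out.println(forPartition("order-processor", 3));
    }
}
```

The same instance then reuses this ID after a restart, which is what lets the broker fence its zombie predecessor.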
3. Using exactly_once instead of exactly_once_v2
The v1 mode creates one producer per task, which is resource-intensive. Always use v2 on Kafka 2.6+.
4. External side effects
Exactly-once only applies to Kafka-to-Kafka processing. If your consumer writes to an external database:
- The database write is not part of the Kafka transaction.
- You need idempotent writes on the external system (e.g., upsert with a unique key, deduplication table).
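One way to sketch such deduplication is to use the record's (topic, partition, offset) as the idempotency key. The in-memory maps below stand in for real database tables; in an actual sink, the data upsert and the dedup-table insert would share one database transaction:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of offset-based idempotent writes to an external store.
class IdempotentSink {
    private final Map<String, String> store = new HashMap<>();   // stands in for the data table
    private final Set<String> processed = new HashSet<>();       // stands in for a dedup table

    /** Applies the write at most once per (topic, partition, offset). */
    boolean write(String topic, int partition, long offset, String key, String value) {
        String dedupKey = topic + "-" + partition + "-" + offset;
        if (!processed.add(dedupKey)) {
            return false; // already applied: a redelivery after an at-least-once retry
        }
        store.put(key, value); // upsert keyed by primary key
        return true;
    }

    String get(String key) { return store.get(key); }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.write("orders", 0, 41L, "order-17", "PAID");
        sink.write("orders", 0, 41L, "order-17", "PAID"); // redelivery: ignored
    }
}
```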
5. Ignoring ProducerFencedException
This is a fatal exception. Do not retry — close the producer and let the application restart.
11. Decision Flowchart
```
Do you need exactly-once?
│
├── Only preventing producer retry duplicates?
│     └── ✅ Use idempotent producer (enable.idempotence=true)
│
├── Kafka-to-Kafka processing with Kafka Streams?
│     └── ✅ Set processing.guarantee=exactly_once_v2
│
├── Kafka-to-Kafka with custom consumer/producer?
│     └── ✅ Use transactional producer + read_committed consumer
│
├── Kafka Connect source?
│     └── ✅ Set exactly.once.source.support=enabled (Kafka 3.3+)
│
└── Kafka to external system?
      └── ⚠️ EOS covers the Kafka side only; implement idempotent
            writes on the external system
```
12. Summary Configuration Cheat Sheet
Producer
```properties
enable.idempotence=true
transactional.id=<unique-per-instance>
acks=all
```
Consumer
```properties
isolation.level=read_committed
enable.auto.commit=false
```
Kafka Streams
```properties
processing.guarantee=exactly_once_v2
```
Broker
```properties
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
min.insync.replicas=2
unclean.leader.election.enable=false
```