
Redis Streams

Introduced in Redis 5.0, Streams are a log-like data structure providing persistent, ordered, multi-consumer message delivery. Think of it as a lightweight alternative to Kafka built into Redis.


Core Concepts

Stream: user-events
────────────────────────────────────────────────────────────────
ID               | Fields
────────────────────────────────────────────────────────────────
1700000001000-0  | event=login, userId=123, ip=192.168.1.1
1700000002000-0  | event=purchase, userId=456, amount=29.99
1700000003000-0  | event=login, userId=789, ip=10.0.0.1
────────────────────────────────────────────────────────────────

Each entry has:

  • Stream ID: {milliseconds}-{sequence}, either auto-generated or custom
  • Fields: Key-value pairs (like a Hash)
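Because an ID is two integers joined by a hyphen, it should be compared numerically rather than as a string. The following is a minimal sketch of that idea; the helper names `parse_stream_id` and `id_to_datetime` are illustrative and not part of any Redis client.

```python
from datetime import datetime, timezone

def parse_stream_id(stream_id: str) -> tuple[int, int]:
    """Split '<milliseconds>-<sequence>' into two integers."""
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)

def id_to_datetime(stream_id: str) -> datetime:
    """Read the millisecond part as a UTC timestamp.

    This only holds for auto-generated IDs; custom IDs may carry any number.
    """
    ms, _ = parse_stream_id(stream_id)
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

ids = ["1700000002000-0", "999-5", "1700000001000-1", "1700000001000-0"]

# Numeric ordering: first by milliseconds, then by sequence number.
ordered = sorted(ids, key=parse_stream_id)
print(ordered)

# Plain string sorting would misplace "999-5" after the 13-digit IDs.
print(sorted(ids))

print(id_to_datetime("1700000001000-0").isoformat())
```

Sorting with `parse_stream_id` as the key matches the order in which Redis stores entries, which is useful when merging results from several reads on the client side.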

Writing to a Stream

# XADD: append entry, auto-generate ID with *
XADD user-events * event login userId 123 ip 192.168.1.1
# Returns: "1700000001000-0"

# XADD with explicit ID (for replication or idempotency)
# The ID must be greater than the stream's current last ID, otherwise Redis rejects it
XADD user-events 1700000001000-0 event login userId 123

# Cap stream at 1000 entries (trimming)
XADD user-events MAXLEN 1000 * event purchase userId 456 amount 29.99
XADD user-events MAXLEN ~ 1000 * ... # ~ means approximate trim (more efficient)

# XLEN: stream length
XLEN user-events
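To make the ID rules concrete, here is a simplified model of how an auto-generated ID follows from the previous one, and of the check applied to explicit IDs. It is a sketch of the documented behaviour, not Redis source code, and the function names are hypothetical.

```python
def _parse(stream_id: str) -> tuple[int, int]:
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)

def next_auto_id(last_id: str, now_ms: int) -> str:
    """Model of XADD with '*': IDs always increase, even if the clock does not."""
    last_ms, last_seq = _parse(last_id)
    if now_ms > last_ms:
        return f"{now_ms}-0"
    # Same millisecond, or the clock moved backwards: reuse the last
    # millisecond part and bump the sequence number.
    return f"{last_ms}-{last_seq + 1}"

def is_valid_explicit_id(new_id: str, last_id: str) -> bool:
    """Model of XADD with an explicit ID: it must be strictly greater than the last ID."""
    return _parse(new_id) > _parse(last_id)

print(next_auto_id("1700000001000-0", now_ms=1700000002000))
print(next_auto_id("1700000001000-0", now_ms=1700000001000))
print(is_valid_explicit_id("1700000001000-0", last_id="1700000001000-0"))
```

The last call returns `False`, which mirrors why re-sending an entry under an ID that is already at the top of the stream fails instead of creating a duplicate.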

Reading from a Stream

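# XREAD: read messages (fan-out broadcast: all readers get all messages)
XREAD COUNT 10 STREAMS user-events 0 # Read from beginning
XREAD COUNT 10 STREAMS user-events $ # "$" = only entries added after this call; without BLOCK this returns nil immediately
XREAD COUNT 10 BLOCK 0 STREAMS user-events $ # Block forever until a new message arrives
# After the first read, pass the last ID you received instead of "$" so no entries are skipped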

# XRANGE: read a range by ID
XRANGE user-events - + # All entries
XRANGE user-events 1700000001000-0 + # From ID to end
XRANGE user-events - + COUNT 10 # First 10 entries

# XREVRANGE: reverse order
XREVRANGE user-events + - COUNT 10 # Last 10 entries

Consumer Groups: Coordinated Message Processing

Consumer groups enable work distribution: multiple consumers in a group collaborate, with each message delivered to exactly one consumer in the group (like a queue).

# Create consumer group starting from the beginning
XGROUP CREATE user-events payment-processors 0
# or from "now" (ignore historical messages)
XGROUP CREATE user-events payment-processors $ MKSTREAM

# Consumer reads from group; ">" means "give me undelivered messages"
XREADGROUP GROUP payment-processors worker-1 COUNT 10 STREAMS user-events >

# Acknowledge message processed successfully
XACK user-events payment-processors 1700000001000-0

# Check pending messages (delivered but not ACKed)
XPENDING user-events payment-processors - + 10

Consumer Group Architecture

Stream: user-events
│
├── Consumer Group: payment-processors
│   ├── worker-1 (processing 1700000001000-0)
│   ├── worker-2 (processing 1700000002000-0)
│   └── worker-3 (idle, waiting)
│
└── Consumer Group: analytics-pipeline (gets ALL messages independently)
    ├── analytics-1 (at 1700000001000-0)
    └── analytics-2 (at 1700000002000-0)

Key semantics:

  • Within a group: each message → one consumer (load distribution)
  • Between groups: each group independently reads all messages (fan-out)
  • Without a group: XREAD is pure fan-out (every reader gets every message)
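The three rules above can be illustrated with a small in-memory model. This is a teaching sketch only: `ToyStream` is a hypothetical class that imitates delivery semantics and does not talk to Redis.

```python
from collections import defaultdict

class ToyStream:
    """In-memory stand-in for a stream; models delivery semantics only."""

    def __init__(self):
        self.entries = []                      # append-only list of (id, fields)
        self.group_cursor = defaultdict(int)   # group -> index of next undelivered entry
        self.pending = defaultdict(dict)       # group -> {entry id: consumer}

    def add(self, entry_id, fields):
        self.entries.append((entry_id, fields))

    def read(self, start_index=0):
        """Like XREAD: every caller sees every entry."""
        return self.entries[start_index:]

    def read_group(self, group, consumer, count=1):
        """Like XREADGROUP with '>': hand out entries new to this group."""
        start = self.group_cursor[group]
        batch = self.entries[start:start + count]
        self.group_cursor[group] = start + len(batch)
        for entry_id, _ in batch:
            self.pending[group][entry_id] = consumer   # tracked until acknowledged
        return batch

    def ack(self, group, entry_id):
        """Like XACK: drop the entry from the group's pending list."""
        self.pending[group].pop(entry_id, None)

stream = ToyStream()
for i in range(1, 4):
    stream.add(f"{i}-0", {"event": "login"})

w1 = stream.read_group("payment-processors", "worker-1")
w2 = stream.read_group("payment-processors", "worker-2")
analytics = stream.read_group("analytics-pipeline", "analytics-1", count=3)

print([e[0] for e in w1], [e[0] for e in w2])   # different entries per worker
print([e[0] for e in analytics])                 # the other group sees everything
print(len(stream.read()))                        # plain readers see everything too
```

Each group keeps its own cursor and pending list, which is why adding a second group never takes messages away from the first.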

Message Acknowledgment and Recovery

# PEL (Pending Entries List): tracks unACKed messages per consumer
XPENDING user-events payment-processors - + 10
# Returns, per entry: [id, consumer-name, idle-ms, delivery-count]

# Claim messages that have been idle >30 seconds (consumer crashed)
# XAUTOCLAIM requires Redis 6.2 or later
XAUTOCLAIM user-events payment-processors recovery-worker 30000 0-0 COUNT 10
# or manually:
XCLAIM user-events payment-processors recovery-worker 30000 1700000001000-0

# XDEL: remove specific entries
XDEL user-events 1700000001000-0

# Dead-letter: after N delivery attempts → move to dead-letter stream
XPENDING user-events payment-processors - + 10
# delivery-count > 3 → XADD to dead-letter stream, XACK the original (clears it from the PEL), then optionally XDEL it
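The recovery decision described in these comments can be written as a pure function over the rows that XPENDING returns. The sketch below assumes the thresholds used above (30 seconds idle, more than 3 deliveries); `PendingEntry` and `triage` are hypothetical names, and the function only decides what to do, leaving the actual XCLAIM, XADD and XACK calls to the caller.

```python
from dataclasses import dataclass

@dataclass
class PendingEntry:
    message_id: str
    consumer: str
    idle_ms: int
    delivery_count: int

def triage(pending, min_idle_ms=30_000, max_deliveries=3):
    """Split pending entries into IDs to re-claim and IDs to dead-letter."""
    to_claim, to_dead_letter = [], []
    for entry in pending:
        if entry.idle_ms < min_idle_ms:
            continue  # recently delivered; the owner may still be working on it
        if entry.delivery_count > max_deliveries:
            to_dead_letter.append(entry.message_id)  # retried too often
        else:
            to_claim.append(entry.message_id)        # hand to a recovery worker
    return to_claim, to_dead_letter

rows = [
    PendingEntry("1700000001000-0", "worker-1", idle_ms=45_000, delivery_count=1),
    PendingEntry("1700000002000-0", "worker-2", idle_ms=5_000, delivery_count=1),
    PendingEntry("1700000003000-0", "worker-1", idle_ms=90_000, delivery_count=4),
]
claim_ids, dead_ids = triage(rows)
print(claim_ids, dead_ids)
```

Keeping the decision separate from the Redis calls makes the retry policy easy to unit test without a running server.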

Reliable Consumer Pattern

import redis

r = redis.Redis(decode_responses=True)
STREAM, GROUP, CONSUMER = 'user-events', 'payment-processors', 'worker-1'

# Worker with at-least-once processing: a message can be redelivered after a
# failure, so process_message (supplied by the application) should be idempotent
while True:
    # First: re-read messages previously delivered to me but not ACKed.
    # Reading with ID '0' instead of '>' replays this consumer's pending entries.
    messages = r.xreadgroup(groupname=GROUP, consumername=CONSUMER,
                            streams={STREAM: '0'}, count=10)
    if not any(entries for _, entries in (messages or [])):
        # Nothing pending: get new messages
        messages = r.xreadgroup(
            groupname=GROUP,
            consumername=CONSUMER,
            streams={STREAM: '>'},
            count=10,
            block=5000  # block 5 seconds
        )

    for stream, entries in (messages or []):
        for msg_id, fields in entries:
            success = process_message(fields)
            if success:
                r.xack(STREAM, GROUP, msg_id)
            # If failed: stays in PEL for retry or recovery

Stream Trimming and Retention

# Trim to exact count (slower: must find the exact trim point)
XTRIM user-events MAXLEN 10000

# Trim approximately (uses listpack node boundaries, so it is faster)
XTRIM user-events MAXLEN ~ 10000

# Trim by minimum ID (delete messages older than a timestamp; requires Redis 6.2 or later)
XTRIM user-events MINID 1700000000000-0 # Delete all entries with an ID lower than this

# Auto-trim on XADD
XADD user-events MAXLEN ~ 100000 * event login userId 123

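Memory footprint: as a rough estimate, a stream entry with 5 fields takes ≈ 250 bytes, so 1 million entries ≈ 250 MB. Actual usage depends on field names and value sizes, so measure on your own data and plan trimming accordingly.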
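The arithmetic behind that estimate, and its inverse for picking a MAXLEN or MINID value, can be sketched as follows. The 250-byte figure is the rough estimate quoted above and should be replaced with a measured value; the function names are illustrative.

```python
BYTES_PER_ENTRY = 250  # rough figure for a 5-field entry; measure your own data

def estimated_bytes(entries: int, bytes_per_entry: int = BYTES_PER_ENTRY) -> int:
    """Approximate memory used by a stream of the given length."""
    return entries * bytes_per_entry

def maxlen_for_budget(budget_bytes: int, bytes_per_entry: int = BYTES_PER_ENTRY) -> int:
    """Largest MAXLEN that stays within a memory budget."""
    return budget_bytes // bytes_per_entry

def minid_for_retention(now_ms: int, retention_ms: int) -> str:
    """MINID argument that keeps only entries newer than the retention window."""
    return f"{now_ms - retention_ms}-0"

print(estimated_bytes(1_000_000) / 1_000_000, "MB")       # 1 million entries
print(maxlen_for_budget(100 * 1_000_000))                 # entries that fit in 100 MB
one_day_ms = 24 * 60 * 60 * 1000
print(minid_for_retention(1700086400000, one_day_ms))     # keep the last 24 hours
```

The value from `maxlen_for_budget` would go into `XTRIM ... MAXLEN ~ <n>`, and the string from `minid_for_retention` into `XTRIM ... MINID <id>`.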


Redis Streams vs Kafka

Feature | Redis Streams | Apache Kafka
────────────────────────────────────────────────────────────────
Persistence | Optional (RDB/AOF) | Durable (disk-first)
Message replay | Yes (by ID range) | Yes (by offset)
Consumer groups | Yes | Yes (Consumer Groups)
Ordering | Per key (one stream) | Per partition
Partitioning | Manual (multiple streams) | Built-in partitions
Throughput | 100K–1M msgs/sec | 1M–10M msgs/sec
Retention | Memory-limited | Disk (configurable)
Operational complexity | Low | High
Message replay history | Limited by memory | Configurable (days/weeks)
Best for | Simple event queues, internal events | High-throughput event streaming

Choose Redis Streams when:

  • Already using Redis and need simple message queuing
  • Message volume is moderate and memory-bound retention is acceptable
  • Low operational complexity is a priority
  • Messages are relatively small

Choose Kafka when:

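  • You need terabytes of message history
  • Multiple teams consume the same events
  • Sustained throughput matters more than the lowest possible latency (Kafka latencies are typically in the millisecond range, not sub-millisecond)
  • You need built-in partitioning for parallelism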
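Since partitioning in Redis Streams is manual, a producer has to choose the stream itself. One common approach, sketched here under the assumption of a fixed partition count and a `user-events:<n>` naming scheme (both hypothetical), is to hash a routing key so that all events for the same user land in the same stream and keep their order.

```python
import zlib

def stream_for(key: str, partitions: int, base: str = "user-events") -> str:
    """Map a routing key to one of `partitions` stream names, deterministically."""
    # crc32 is stable across processes, unlike Python's built-in hash() for str.
    index = zlib.crc32(key.encode("utf-8")) % partitions
    return f"{base}:{index}"

for user_id in ["123", "456", "789"]:
    print(user_id, "->", stream_for(user_id, partitions=4))
```

Each partition stream then gets its own consumer group readers. Changing the partition count later remaps keys, which is one of the operational costs that Kafka's built-in partitioning handles for you.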

XINFO: Stream Inspection

XINFO STREAM user-events # General stream info (length, first/last ID)
XINFO GROUPS user-events # List all consumer groups
XINFO CONSUMERS user-events payment-processors # Consumer details + pending count

Spring Boot Integration

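// Producer
@Service
public class EventPublisher {
    private final RedisTemplate<String, String> redisTemplate;
    private final StreamOperations<String, String, String> streamOps;

    // Constructor injection; the stream operations are derived from the template
    public EventPublisher(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.streamOps = redisTemplate.opsForStream();
    }

    public String publishEvent(String userId, String event) {
        Map<String, String> fields = Map.of(
            "userId", userId,
            "event", event,
            "timestamp", Instant.now().toString()
        );
        // XADD with an auto-generated ID
        RecordId id = streamOps.add("user-events", fields);
        return id.getValue();
    }
}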

// Consumer with Consumer Group
@Component
@Slf4j
public class EventConsumer implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    public void onMessage(MapRecord<String, String, String> record) {
        log.info("Processing: {} - {}", record.getId(), record.getValue());
        // ... process, then acknowledge: receive() below does not auto-acknowledge
    }
}

@Configuration
public class StreamConfig {
    // The consumer group "payment-processors" must already exist (XGROUP CREATE)
    @Bean
    public Subscription subscription(RedisConnectionFactory factory, EventConsumer consumer) {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();

        var container = StreamMessageListenerContainer.create(factory, options);
        var sub = container.receive(
            Consumer.from("payment-processors", "worker-1"),
            StreamOffset.create("user-events", ReadOffset.lastConsumed()),
            consumer
        );
        container.start();
        return sub;
    }
}

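Consumer using the StreamListener interface

// Spring Data Redis provides StreamListener as an interface, not as an annotation.
// The @StreamListener annotation and KafkaHeaders belong to Spring Cloud Stream
// and Spring Kafka, so they do not apply to a Redis stream listener.
@Component
@Slf4j
public class NotificationStreamListener
        implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    public void onMessage(MapRecord<String, String, String> record) {
        Map<String, String> payload = record.getValue();
        log.info("Notification triggered for user: {}", payload.get("userId"));
    }
}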

Dead Letter Pattern for Failed Messages

@Scheduled(fixedDelay = 30_000)
public void recoverDeadMessages() {
    // Inspect up to 100 entries pending for worker-1
    PendingMessages pending = redisTemplate.opsForStream()
        .pending("user-events", Consumer.from("payment-processors", "worker-1"),
                 Range.unbounded(), 100);

    pending.forEach(message -> {
        // Idle time since the last delivery, as reported by XPENDING
        Duration pendingFor = message.getElapsedTimeSinceLastDelivery();

        // Only act on messages pending for > 60 seconds
        if (pendingFor.toSeconds() > 60) {
            if (message.getTotalDeliveryCount() > 3) {
                // Send to dead letter stream
                redisTemplate.opsForStream().add(
                    StreamRecords.mapBacked(
                        Map.of("originalId", message.getId().getValue())
                    ).withStreamKey("dead-letter:user-events")
                );
                // Acknowledge the original so it leaves the PEL
                redisTemplate.opsForStream()
                    .acknowledge("user-events", "payment-processors", message.getId());
            } else {
                // Re-claim and retry: this resets the idle timer and increments the
                // delivery count; the returned records still have to be processed
                redisTemplate.opsForStream().claim(
                    "user-events", "payment-processors", "worker-1",
                    Duration.ofSeconds(0), message.getId()
                );
            }
        }
    });
}

Interview Questions (Senior Level)

  1. How do you design Redis Streams retention and consumer-group strategy for at-least-once processing under failure?
  2. When should Redis Streams be replaced with Kafka in a growing event platform?
  3. What idempotency and replay controls do you implement to avoid duplicate side effects?
  4. How do you monitor and remediate PEL growth before it becomes an outage?

Short answer guide:

  • Tune MAXLEN/MINID, acknowledgments, and claim policies around workload SLOs.
  • Move to Kafka when retention, partitioning, and consumer scale exceed Redis fit.
  • Use deterministic message keys and dedup stores in consumers.
  • Alert on pending age/count and automate stale-message recovery paths.
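As an illustration of the third answer, the sketch below shows a consumer-side dedup check that makes redelivery harmless. It keeps seen keys in an in-memory set for brevity; a real deployment would need a shared, persistent store, and the class name `IdempotentHandler` and the key format are assumptions made for this example.

```python
class IdempotentHandler:
    """Runs a side effect at most once per deterministic message key."""

    def __init__(self, side_effect):
        self._seen = set()            # stand-in for a shared dedup store
        self._side_effect = side_effect

    def handle(self, dedup_key: str, fields: dict) -> bool:
        """Return True if the side effect ran, False if it was a duplicate."""
        if dedup_key in self._seen:
            return False              # redelivered message: skip, but still ACK upstream
        self._side_effect(fields)
        # Record the key only after the side effect succeeded, so a crash
        # in between leads to a retry rather than a lost message.
        self._seen.add(dedup_key)
        return True

charges = []
handler = IdempotentHandler(charges.append)

event = {"event": "purchase", "userId": "456", "amount": "29.99"}
first = handler.handle("purchase:456:order-1", event)
second = handler.handle("purchase:456:order-1", event)   # simulated redelivery
print(first, second, len(charges))
```

The key is derived from business data rather than from the stream ID, so the same event published twice under different IDs is still recognised as a duplicate.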