Database Sharding & Partitioning
- New learners → start at Why Shard? and Sharding Strategies to understand the core concepts.
- Senior engineers → jump to Consistent Hashing deep dive, Cross-Shard Problems, Rebalancing, or Real-World Database Comparison.
What is Partitioning?
Partitioning splits a large dataset into smaller, more manageable pieces. There are two fundamentally different ways to split:
Vertical partitioning
Split a table by columns. Move rarely-accessed or large fields into a separate table while keeping hot fields together.
Before (one wide table):
┌─────────┬──────────┬───────┬────────────────────┬─────────────┐
│ user_id │ username │ email │ profile_bio (TEXT) │ avatar_blob │
└─────────┴──────────┴───────┴────────────────────┴─────────────┘
After (vertical split):
┌─────────┬──────────┬───────┐   ┌─────────┬─────────────────────┐
│ user_id │ username │ email │   │ user_id │ profile_bio, avatar │
└─────────┴──────────┴───────┘   └─────────┴─────────────────────┘
Hot table (queried every login) Cold table (queried on profile view)
When to use it: when a small number of columns are accessed in 95% of queries and the rest inflate the row size, bloating cache and increasing I/O.
Horizontal partitioning (sharding)
Split a table by rows. Different subsets of rows live on entirely different database servers.
Before: one table with 100M rows on one server
After (sharded by user_id):
Shard A → rows where user_id % 3 = 0 (≈33M rows)
Shard B → rows where user_id % 3 = 1 (≈33M rows)
Shard C → rows where user_id % 3 = 2 (≈33M rows)
When we say "sharding" in system design, we almost always mean horizontal partitioning. The rest of this guide focuses on it.
Why Shard?
A single database server has hard physical limits. Understanding which limit you're hitting determines the right solution:
| Bottleneck | Symptom | First-line solution | When you need sharding |
|---|---|---|---|
| Read load | Slow SELECTs, high CPU on reads | Add read replicas | When even read replicas can't absorb the fan-out |
| Write throughput | Replication lag, high disk I/O | Tune indexes, batch writes | When write IOPS saturate the primary's disk |
| Dataset size | Disk full, slow full-table scans | Archival, compression | When data exceeds one server's storage capacity |
| RAM (working set) | High disk reads, cache miss rate rising | Bigger instance | When the working set no longer fits in memory |
Before sharding, try in this order: query tuning → indexes → vertical scaling → read replicas → caching (Redis). Sharding adds enormous operational complexity. It should be a last resort, not a first instinct.
Concrete thresholds to consider sharding
- Table rows exceeding 500M–1B and growing fast
- Write throughput exceeding ~10K writes/sec sustained on a single primary
- Dataset exceeding 2–4 TB (the point where even SSDs become expensive and slow)
- P99 write latency increasing despite hardware upgrades
Sharding Strategies
Choosing a routing algorithm is the most important decision in your sharding design. Each strategy makes a different trade-off between write distribution, range query efficiency, and rebalancing cost.
1. Range partitioning
Divide data into continuous ranges of the shard key. Each shard owns a contiguous slice.
Shard 1: user_id 1 – 10,000,000
Shard 2: user_id 10,000,001 – 20,000,000
Shard 3: user_id 20,000,001 – 30,000,000
-- PostgreSQL declarative range partition
CREATE TABLE orders (
id BIGSERIAL,
user_id BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
total NUMERIC(12,2)
) PARTITION BY RANGE (created_at);
CREATE TABLE orders_2024_q1 PARTITION OF orders
FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');
CREATE TABLE orders_2024_q2 PARTITION OF orders
FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');
Pros:
- Range queries are extremely efficient: WHERE user_id BETWEEN 100 AND 500 touches exactly one shard.
- Contiguous scans for reporting and archival are fast.
- Easy to reason about which shard holds which data.
Cons:
- High risk of hot spots (see below). Sharding by created_at or an auto-increment ID means all new writes always go to the last shard.
- Uneven data distribution if the key is not uniformly spread.
If you shard by timestamp or a monotonically increasing ID:
t=0: Shard 1 ← all writes (idle: Shard 2, 3)
t=1: Shard 2 ← all writes (idle: Shard 1, 3)
t=2: Shard 3 ← all writes (idle: Shard 1, 2)
You've bought yourself horizontal hardware but not horizontal write throughput. The active shard is always the bottleneck. Mitigations: add a random prefix to the key (range queries then become scatter-gather), shard by a different key (e.g. user_id instead of created_at), or use consistent hashing.
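A minimal Python sketch of a range router, assuming the three user_id ranges listed above (the shard names and the inclusive upper-bound list are illustrative, not any particular database's API). It also shows the hot-spot effect: a burst of consecutive new IDs resolves to a single shard.

```python
import bisect

# Inclusive upper bound of each shard's user_id range (illustrative values
# matching the three 10M-row ranges above).
RANGE_UPPER_BOUNDS = [10_000_000, 20_000_000, 30_000_000]
SHARD_NAMES = ["shard-1", "shard-2", "shard-3"]  # hypothetical names


def shard_for_range(user_id: int) -> str:
    """Return the shard whose contiguous range contains user_id."""
    idx = bisect.bisect_left(RANGE_UPPER_BOUNDS, user_id)
    if user_id < 1 or idx == len(RANGE_UPPER_BOUNDS):
        raise ValueError(f"user_id {user_id} is outside every shard range")
    return SHARD_NAMES[idx]


def shards_for_range_query(low: int, high: int) -> list[str]:
    """Shards touched by WHERE user_id BETWEEN low AND high (low <= high)."""
    first = bisect.bisect_left(RANGE_UPPER_BOUNDS, low)
    last = bisect.bisect_left(RANGE_UPPER_BOUNDS, high)
    return SHARD_NAMES[first:last + 1]


print(shards_for_range_query(100, 500))  # ['shard-1']: one shard serves the range

# Hot spot: the next 1,000 auto-increment IDs all route to the same shard.
recent_ids = range(12_000_001, 12_001_001)
print({shard_for_range(i) for i in recent_ids})  # {'shard-2'}
```

Because the router only needs sorted boundaries, splitting a range later means inserting one boundary, although the rows in the split range still have to be copied.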
2. Hash partitioning
Apply a hash function to the shard key and use modulo to assign a shard:
shard_id = hash(user_id) % num_shards
hash("user_1001") % 3 = 0 → Shard A
hash("user_1002") % 3 = 2 → Shard C
hash("user_1003") % 3 = 1 → Shard B
Pros:
- Even data distribution โ hash functions spread keys uniformly, eliminating hot spots.
- Simple to implement and reason about.
Cons:
- Range queries become scatter-gather operations. user_id BETWEEN 1 AND 1000 hits all shards because adjacent keys hash to different shards.
- Rebalancing is catastrophic. Adding a shard changes num_shards from N to N+1, so hash(key) % (N+1) gives a different result for most keys. With a uniform hash, a key keeps its shard only about 1 time in N+1, so roughly N/(N+1) of the data must migrate: 75% when going from 3 to 4 shards, and close to 100% for large N.
3 shards → 4 shards:
key "abc": hash % 3 = 1 (was Shard B) → hash % 4 = 3 (now Shard D)
key "xyz": hash % 3 = 0 (was Shard A) → hash % 4 = 1 (now Shard B)
... about 3 out of every 4 keys must move
This is the fundamental weakness that consistent hashing was designed to solve.
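The N/(N+1) figure is easy to check empirically. This Python sketch hashes synthetic keys with MD5 (an assumption: any stable, uniform hash would do; Python's built-in hash() is randomised per process for strings, so it is avoided) and counts how many keys change shard when num_shards grows by one.

```python
import hashlib


def stable_hash(key: str) -> int:
    # Deterministic 64-bit hash; MD5 is fine here since this is not for security.
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")


def fraction_moved(num_keys: int, old_n: int, new_n: int) -> float:
    """Share of keys whose shard changes when num_shards goes from old_n to new_n."""
    moved = 0
    for i in range(num_keys):
        h = stable_hash(f"user_{i}")
        if h % old_n != h % new_n:
            moved += 1
    return moved / num_keys


print(f"3 -> 4 shards:   {fraction_moved(100_000, 3, 4):.1%} of keys move")    # about 75%
print(f"10 -> 11 shards: {fraction_moved(100_000, 10, 11):.1%} of keys move")  # about 91%
```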
3. Consistent hashing
Rather than modulo arithmetic, both keys and nodes are mapped onto a virtual ring from 0 to 2³² − 1. A key is owned by the first node clockwise from its hash position.
            0
      ┌─────┴─────┐
   N3 │           │ N1
(75%) │           │ (25%)
      │   RING    │
      │           │
      └─────┬─────┘
            N2
          (50%)
Key hash=10% → clockwise → N1 ✓
Key hash=40% → clockwise → N2 ✓
Key hash=60% → clockwise → N3 ✓
Adding a node: only the keys between the new node and its predecessor on the ring need to move. About 1/N of the keys migrate instead of most of them.
Before (3 nodes):
Ring: N1(25%) → N2(50%) → N3(75%) → N1...
After adding N4 at position 60%:
Ring: N1(25%) → N2(50%) → N4(60%) → N3(75%) → N1...
Only keys between 50% and 60% move from N3 to N4.
Everything else stays put.
Removing a node (failure or decommission): only that node's keys move to its successor. All other nodes are unaffected.
Pros:
- Minimal data movement on topology changes (~1/N keys migrate).
- Naturally supports heterogeneous nodes via virtual nodes.
- No single point of failure: the ring is fully decentralised.
Cons:
- Basic consistent hashing can still produce uneven distribution if nodes land unluckily on the ring.
- More complex to implement than modulo hashing.
- Non-uniform load is solved with virtual nodes (see senior section below).
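A minimal Python sketch of the ring described above, with one point per node and MD5 as the assumed hash onto 0 to 2³² − 1. It checks the two properties claimed: when N4 joins, every key that moves goes to N4, and removing N4 hands its keys back to its successor. With a single point per node, how large a share N4 takes depends on where it happens to land, which is the unevenness listed under Cons.

```python
import bisect
import hashlib


def ring_position(label: str) -> int:
    # Stable position on the 0 .. 2^32 - 1 ring for a key or node name.
    return int.from_bytes(hashlib.md5(label.encode()).digest()[:4], "big")


class HashRing:
    def __init__(self, nodes: list[str]):
        self._points = sorted((ring_position(n), n) for n in nodes)

    def add(self, node: str) -> None:
        bisect.insort(self._points, (ring_position(node), node))

    def remove(self, node: str) -> None:
        self._points.remove((ring_position(node), node))

    def owner(self, key: str) -> str:
        # First node clockwise from the key's position, wrapping past the top to 0.
        idx = bisect.bisect_left(self._points, (ring_position(key), ""))
        return self._points[idx % len(self._points)][1]


keys = [f"user_{i}" for i in range(10_000)]
ring = HashRing(["N1", "N2", "N3"])
before = {k: ring.owner(k) for k in keys}

ring.add("N4")
after = {k: ring.owner(k) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
print(f"{len(moved) / len(keys):.1%} of keys moved, all of them to N4")

ring.remove("N4")  # N4's keys fall back to its successor, i.e. their old owners
print({k: ring.owner(k) for k in keys} == before)  # True
```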
4. Directory / lookup service
A central routing table (sometimes called a shard map) explicitly records where each key or key range lives:
email → user_id (mapping table)
user_id range → shard_id
shard_id → host:port
Pros:
- Maximum flexibility: you can manually move individual hot tenants to dedicated hardware.
- Supports complex routing logic (geo-based, tier-based).
- Shard rebalancing requires only updating the directory, not rehashing.
Cons:
- Central directory is a single point of failure (mitigate with replication + caching).
- Adds a network hop to every query.
- Cached copies of the directory can become stale, and cache invalidation is a hard problem.
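A minimal in-memory Python sketch of the three lookup layers listed above. The emails, ranges, shard names, and host:port values are hypothetical; a real directory would live in a replicated store and be cached by the routers. Moving a range to new hardware becomes a single directory update once its rows have been copied.

```python
from dataclasses import dataclass, field


@dataclass
class ShardDirectory:
    email_to_user: dict[str, int] = field(default_factory=dict)  # email -> user_id
    ranges: list[tuple[int, int, str]] = field(default_factory=list)  # (low, high, shard_id), inclusive
    shard_hosts: dict[str, str] = field(default_factory=dict)  # shard_id -> host:port

    def shard_for_user(self, user_id: int) -> str:
        for low, high, shard_id in self.ranges:
            if low <= user_id <= high:
                return shard_id
        raise KeyError(f"no shard owns user_id {user_id}")

    def host_for_email(self, email: str) -> str:
        # Lookups before the real query: email -> user_id -> shard -> host.
        return self.shard_hosts[self.shard_for_user(self.email_to_user[email])]

    def move_range(self, low: int, high: int, new_shard: str) -> None:
        # Rebalancing only rewrites the directory entry (after the rows are copied).
        self.ranges = [r for r in self.ranges if (r[0], r[1]) != (low, high)]
        self.ranges.append((low, high, new_shard))


directory = ShardDirectory(
    email_to_user={"alice@example.com": 42},
    ranges=[(1, 1_000_000, "shard-a"), (1_000_001, 2_000_000, "shard-b")],
    shard_hosts={
        "shard-a": "10.0.0.1:5432",
        "shard-b": "10.0.0.2:5432",
        "shard-hot": "10.0.0.9:5432",
    },
)
print(directory.host_for_email("alice@example.com"))  # 10.0.0.1:5432
directory.move_range(1, 1_000_000, "shard-hot")  # e.g. isolate a hot tenant range
print(directory.host_for_email("alice@example.com"))  # 10.0.0.9:5432
```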
Strategy comparison
| | Range | Hash modulo | Consistent hash | Directory |
|---|---|---|---|---|
| Range queries | ✅ Efficient (single shard) | ❌ Scatter-gather | ❌ Scatter-gather | ✅ If mapped |
| Write distribution | ❌ Hot spots (monotonic keys) | ✅ Even | ✅ Even | ✅ Flexible |
| Rebalancing cost | 🟡 Migrate range | ❌ Migrate ~N/(N+1) of keys | ✅ Migrate ~1/N | ✅ Update table |
| Complexity | Low | Low | Medium | High |
| Used by | PostgreSQL, MySQL, HBase, MongoDB (ranged shard keys) | Redis Cluster (basic) | Cassandra, DynamoDB, Riak | Vitess, some custom systems |
Consistent Hashing: Deep Dive
Virtual nodes (vnodes)
Basic consistent hashing with one point per physical node produces uneven distribution: nodes land at random positions and some get much larger arcs than others.
The solution: each physical node claims multiple positions on the ring (virtual nodes). A cluster with 3 servers and 150 vnodes per server has 450 points on the ring, producing near-uniform distribution.
Physical servers: S1, S2, S3
Virtual nodes: S1_1, S1_2, ..., S1_150,
S2_1, S2_2, ..., S2_150,
S3_1, S3_2, ..., S3_150
Ring positions (sorted):
... S3_72 ── S1_14 ── S2_91 ── S3_4 ── S1_88 ...
               ↑        ↑
      (key A → S1_14)  (key B → S2_91)
Benefits of vnodes:
- Even load distribution regardless of how many nodes join or leave.
- A more powerful server can be given more vnodes to handle a proportionally larger share.
- When a node fails, its vnodes are spread across many other nodes, distributing the recovery load instead of dumping everything on one neighbour.
Used by: Cassandra (num_tokens per node: 256 by default before Cassandra 4.0, 16 since 4.0), DynamoDB internally, Riak.
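To see the effect, here is a Python sketch that builds rings with 1 and 150 vnodes per server and measures each server's share of 20,000 synthetic keys, plus a weighted ring where S3 gets twice as many vnodes. MD5 positions and the S1_1 … S1_150 labels are assumptions for illustration.

```python
import bisect
import hashlib
from collections import Counter


def ring_position(label: str) -> int:
    return int.from_bytes(hashlib.md5(label.encode()).digest()[:4], "big")


class VNodeRing:
    """Consistent-hash ring where each physical server owns several points."""

    def __init__(self, vnodes_per_server: dict[str, int]):
        # A bigger server can simply be given a larger vnode count.
        self._points = sorted(
            (ring_position(f"{server}_{i}"), server)
            for server, count in vnodes_per_server.items()
            for i in range(1, count + 1)
        )
        self._positions = [pos for pos, _ in self._points]

    def owner(self, key: str) -> str:
        idx = bisect.bisect_left(self._positions, ring_position(key))
        return self._points[idx % len(self._points)][1]


def load_share(ring: VNodeRing, num_keys: int = 20_000) -> dict[str, float]:
    counts = Counter(ring.owner(f"key_{i}") for i in range(num_keys))
    return {server: round(n / num_keys, 3) for server, n in sorted(counts.items())}


print(load_share(VNodeRing({"S1": 1, "S2": 1, "S3": 1})))        # depends on where 3 points land
print(load_share(VNodeRing({"S1": 150, "S2": 150, "S3": 150})))  # each close to 0.333
print(load_share(VNodeRing({"S1": 150, "S2": 150, "S3": 300})))  # S3 close to 0.5
```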
Replication with consistent hashing
In production, keys are not stored on just one node. Each key is written to the node that owns it and copied to the next distinct nodes clockwise on the ring, for RF copies in total, where RF is the replication factor:
Replication factor = 3:
Key → primary node → replica 1 (next clockwise) → replica 2 (next after that)
This means every piece of data survives the loss of up to RF−1 of the nodes holding it.
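A Python sketch of one way to choose the replica set on a vnode ring: walk clockwise from the key and skip vnodes whose physical server is already in the list, so the RF copies land on RF different machines. The server names, vnode count, and MD5 positions are assumptions; production systems also add rack and datacenter awareness (for example Cassandra's NetworkTopologyStrategy).

```python
import bisect
import hashlib


def ring_position(label: str) -> int:
    return int.from_bytes(hashlib.md5(label.encode()).digest()[:4], "big")


def build_ring(servers: list[str], vnodes: int) -> list[tuple[int, str]]:
    return sorted(
        (ring_position(f"{s}_{i}"), s) for s in servers for i in range(1, vnodes + 1)
    )


def preference_list(ring: list[tuple[int, str]], key: str, rf: int) -> list[str]:
    """Primary first, then the next rf - 1 distinct physical servers clockwise."""
    if rf > len({server for _, server in ring}):
        raise ValueError("replication factor exceeds the number of servers")
    positions = [pos for pos, _ in ring]
    idx = bisect.bisect_left(positions, ring_position(key))
    chosen: list[str] = []
    while len(chosen) < rf:
        server = ring[idx % len(ring)][1]
        if server not in chosen:  # skip further vnodes of an already-chosen server
            chosen.append(server)
        idx += 1
    return chosen


ring = build_ring(["S1", "S2", "S3", "S4", "S5"], vnodes=8)
print(preference_list(ring, "user_1001", rf=3))  # three distinct servers, primary first
```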
Cross-Shard Problems
Sharding is not free. It introduces a category of distributed systems problems that don't exist on a single server.
Cross-shard JOINs
A SQL JOIN assumes all the rows it combines live in the same database. When tables are split across servers, a shard's database cannot JOIN against rows stored on another shard.
-- Works on a single DB:
SELECT u.name, o.total
FROM users u JOIN orders o ON u.id = o.user_id
WHERE u.country = 'VN';
-- With sharding: users and orders may be on different shards.
-- This JOIN is impossible at the DB layer.
Solutions:
- Co-locate data (preferred)
- Denormalize
- Application-layer JOIN
Co-locate data (preferred)
Design your shard key so related entities always land on the same shard. If you shard users and orders both by user_id, all of Alice's data is on the same shard:
Shard A: users where user_id % 3 = 0
orders where user_id % 3 = 0
โ JOIN between Alice's user row and Alice's orders never crosses a shard
Denormalize
Duplicate the fields you need into the child record so you don't need the JOIN at all:
// Orders document includes user snapshot โ no JOIN needed
{
"order_id": "ORD-001",
"user_id": "alice",
"user_name": "Alice Tran", // duplicated
"user_email": "alice@...", // duplicated
"total": 99.90
}
Trade-off: update complexity. If the user's email changes, all their orders must be updated too.
Application-layer JOIN
Fetch data from each shard separately and join in application code (API Composition pattern):
# Step 1: query shard determined by user_id
user = shard_for(user_id).query("SELECT * FROM users WHERE id = ?", user_id)
# Step 2: query shard determined by user_id (same shard if co-located)
orders = shard_for(user_id).query("SELECT * FROM orders WHERE user_id = ?", user_id)
# Step 3: join in application code
result = { **user, "orders": orders }
This works but is slower than a native SQL JOIN and harder to express complex join conditions.
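The three steps above can be made runnable with two in-memory SQLite databases standing in for two shard servers. The shard_for routing rule (user_id % 2), table layouts, and sample rows are assumptions for illustration; each query runs on a single shard and the combining happens in Python.

```python
import sqlite3

NUM_SHARDS = 2
shards = [sqlite3.connect(":memory:") for _ in range(NUM_SHARDS)]
for db in shards:
    db.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    db.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, user_id INTEGER, total REAL)")


def shard_for(user_id: int) -> sqlite3.Connection:
    # Users and their orders are co-located: both are routed by user_id.
    return shards[user_id % NUM_SHARDS]


def user_with_orders(user_id: int) -> dict:
    db = shard_for(user_id)
    # Step 1: fetch the user row.
    user = db.execute("SELECT id, name FROM users WHERE id = ?", (user_id,)).fetchone()
    if user is None:
        raise KeyError(user_id)
    # Step 2: fetch the user's orders.
    orders = db.execute(
        "SELECT id, total FROM orders WHERE user_id = ? ORDER BY id", (user_id,)
    ).fetchall()
    # Step 3: join in application code.
    return {
        "id": user[0],
        "name": user[1],
        "orders": [{"order_id": oid, "total": total} for oid, total in orders],
    }


shard_for(1).execute("INSERT INTO users VALUES (?, ?)", (1, "Alice"))
shard_for(2).execute("INSERT INTO users VALUES (?, ?)", (2, "Bob"))
for order_id, uid, total in [("ORD-001", 1, 99.9), ("ORD-002", 1, 15.0), ("ORD-003", 2, 42.0)]:
    shard_for(uid).execute("INSERT INTO orders VALUES (?, ?, ?)", (order_id, uid, total))

print(user_with_orders(1))
# {'id': 1, 'name': 'Alice', 'orders': [{'order_id': 'ORD-001', 'total': 99.9}, {'order_id': 'ORD-002', 'total': 15.0}]}
```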
Global transactions (distributed ACID)
A transaction that touches data on multiple shards cannot use standard ACID guarantees without a distributed coordination protocol.
Transfer $100 from Alice (Shard A) to Bob (Shard B):
Step 1: Debit Alice on Shard A ✅
Step 2: Credit Bob on Shard B ❌ (Shard B crashes)
Result: Alice lost $100, Bob received nothing → inconsistency
Solutions:
| Approach | How it works | Trade-offs |
|---|---|---|
| Two-Phase Commit (2PC) | Coordinator asks all shards to "prepare", then commits if all agree. | Synchronous, slow, coordinator is a SPOF. Blocks if coordinator crashes mid-transaction. |
| Saga pattern | Break transaction into a sequence of local transactions. Each step publishes an event. Compensating transactions undo steps on failure. | Eventual consistency, complex to implement, no atomicity guarantee. |
| Avoid cross-shard writes | Design data model so all writes for a logical operation touch one shard only (co-location). | Best option when achievable: zero coordination cost. |
| Optimistic locking + reconciliation | Allow eventual inconsistency; detect and reconcile conflicts later. | Works for some use cases (analytics); unacceptable for money. |
If you find yourself writing a lot of cross-shard transactions, your shard key is probably wrong. Revisit whether a different key can co-locate the data that is written together.
Unique ID generation
Auto-incrementing primary keys (AUTO_INCREMENT, SERIAL) don't work across isolated shards: each shard would generate its own id=1, id=2, creating duplicates when merged.
Solutions:
- UUID v4 / v7
- Snowflake ID (recommended)
- ULID
- Composite key
UUID v4 / v7
import uuid
new_id = uuid.uuid4()  # random; prints like 550e8400-e29b-41d4-a716-446655440000
UUID v4: a 128-bit ID with 122 random bits. Globally unique, but not sortable. Causes random index insertions, defeating B-tree locality.
UUID v7: timestamp-prefixed, so globally unique AND sortable by creation time. Preferred over v4 for database IDs.
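To make "timestamp-prefixed" concrete, here is a small Python sketch that assembles a UUIDv7 by hand following the RFC 9562 layout: 48-bit Unix-millisecond timestamp, 4-bit version, 12 random bits, 2-bit variant, 62 random bits. It uses only the standard library and is meant as an illustration of the bit layout rather than a replacement for a maintained library.

```python
import os
import time
import uuid


def uuid7() -> uuid.UUID:
    unix_ms = time.time_ns() // 1_000_000
    rand = int.from_bytes(os.urandom(10), "big")  # 80 random bits
    value = (unix_ms & ((1 << 48) - 1)) << 80     # bits 127..80: timestamp
    value |= 0x7 << 76                            # bits 79..76: version 7
    value |= (rand >> 68) << 64                   # bits 75..64: 12 random bits
    value |= 0b10 << 62                           # bits 63..62: RFC variant
    value |= rand & ((1 << 62) - 1)               # bits 61..0: 62 random bits
    return uuid.UUID(int=value)


first = uuid7()
time.sleep(0.002)  # move into a later millisecond
second = uuid7()
print(first.version, first < second)  # 7 True
```

Two IDs created within the same millisecond compare in random order; RFC 9562 describes optional counter methods for stricter monotonicity.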
Snowflake ID (recommended)
Originated at Twitter. A 64-bit integer composed of:
| 1 bit unused (sign) | 41 bits timestamp (ms) | 10 bits machine/datacenter ID | 12 bits sequence |
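# Rough implementation concept: the caller supplies the per-ms sequence (0-4095)
import time

EPOCH_MS = 1288834974657  # Twitter's custom epoch in Unix ms; 41 bits of ms last ~69 years from it

def snowflake_id(machine_id: int, sequence: int) -> int:
    timestamp_ms = time.time_ns() // 1_000_000 - EPOCH_MS
    return (timestamp_ms << 22) | ((machine_id & 0x3FF) << 12) | (sequence & 0xFFF)
# sequence resets every ms; 4096 IDs/ms per machine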
Properties:
- Globally unique without coordination between shards.
- Roughly time-sortable (IDs increase over time).
- Fits in a 64-bit integer โ efficient for indexes.
- 4096 IDs per millisecond per machine (12-bit sequence), 1024 machines (10-bit machine ID).
Used by: Twitter, Discord, Instagram (variant), Mastodon.
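The snowflake_id function above leaves the sequence to the caller. A fuller Python sketch of a per-machine generator, assuming the same 41/10/12-bit layout and Twitter's epoch, resets the sequence each millisecond, waits for the next millisecond once 4096 IDs have been issued, and refuses to continue if the clock moves backwards (one common policy; other implementations wait it out).

```python
import threading
import time

EPOCH_MS = 1288834974657  # Twitter's custom epoch in Unix milliseconds


class SnowflakeGenerator:
    def __init__(self, machine_id: int):
        if not 0 <= machine_id < 1 << 10:
            raise ValueError("machine_id must fit in 10 bits")
        self.machine_id = machine_id
        self._last_ms = -1
        self._sequence = 0
        self._lock = threading.Lock()

    @staticmethod
    def _now_ms() -> int:
        return time.time_ns() // 1_000_000 - EPOCH_MS

    def next_id(self) -> int:
        with self._lock:
            now = self._now_ms()
            if now < self._last_ms:
                raise RuntimeError("clock moved backwards; refusing to risk duplicate IDs")
            if now == self._last_ms:
                self._sequence = (self._sequence + 1) & 0xFFF
                if self._sequence == 0:  # 4096 IDs already issued this ms: wait
                    while now <= self._last_ms:
                        now = self._now_ms()
            else:
                self._sequence = 0
            self._last_ms = now
            return (now << 22) | (self.machine_id << 12) | self._sequence


def parse_snowflake(snowflake: int) -> tuple[int, int, int]:
    """Split an ID into (unix_ms, machine_id, sequence)."""
    return (snowflake >> 22) + EPOCH_MS, (snowflake >> 12) & 0x3FF, snowflake & 0xFFF


gen = SnowflakeGenerator(machine_id=7)
ids = [gen.next_id() for _ in range(10_000)]
print(len(set(ids)) == len(ids), ids == sorted(ids))  # True True
print(parse_snowflake(ids[0])[1])  # 7
```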
ULID (Universally Unique Lexicographically Sortable Identifier) โ a 128-bit ID that is URL-safe and sortable:
01ARZ3NDEKTSV4RRFFQ69G5FAV
└───┬────┘└───────┬──────┘
timestamp    randomness
(48 bits)     (80 bits)
Similar to UUID v7 in concept. Sortable as a string. Useful when string IDs are preferred over integers.
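A short Python sketch of ULID generation, assuming the usual encoding: a 48-bit millisecond timestamp followed by 80 random bits, written as 26 characters of Crockford's Base32. Because that alphabet is in ASCII order and the timestamp comes first, plain string comparison sorts ULIDs by creation millisecond.

```python
import os
import time

CROCKFORD32 = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"  # no I, L, O, U


def ulid() -> str:
    unix_ms = time.time_ns() // 1_000_000
    value = ((unix_ms & ((1 << 48) - 1)) << 80) | int.from_bytes(os.urandom(10), "big")
    chars = []
    for _ in range(26):  # 26 x 5 bits = 130 bits; the top 2 are always zero
        chars.append(CROCKFORD32[value & 0x1F])
        value >>= 5
    return "".join(reversed(chars))


def ulid_timestamp_ms(ulid_str: str) -> int:
    # The first 10 characters encode the 48-bit timestamp.
    ms = 0
    for ch in ulid_str[:10]:
        ms = ms * 32 + CROCKFORD32.index(ch)
    return ms


a = ulid()
time.sleep(0.002)
b = ulid()
print(len(a), a < b)  # 26 True
```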
Composite key
-- shard_id is part of the primary key, ensuring global uniqueness
CREATE TABLE users (
shard_id SMALLINT NOT NULL, -- e.g. 0–255
local_id BIGSERIAL, -- auto-increment within this shard
PRIMARY KEY (shard_id, local_id)
);
Simple but couples the ID to the shard โ makes resharding harder.
Scatter-gather queries (no shard key in filter)
When a query doesn't include the shard key in its WHERE clause, the router has no choice but to send it to all shards and merge the results:
-- Shard key is user_id. This query has no user_id:
SELECT * FROM orders WHERE status = 'pending' ORDER BY created_at DESC LIMIT 20;
-- → sent to ALL shards, results merged, sorted, returned
This is called a scatter-gather or fan-out query. At 10 shards it's 10x overhead. At 100 shards it's 100x.
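A minimal Python sketch of what a router does for such a query, with hypothetical in-memory shards holding (created_at, order_id) rows: send the query to every shard in parallel, let each shard filter, sort, and apply the LIMIT locally, then k-way merge the sorted partial results with heapq.merge and apply the LIMIT again. The merge cannot finish until every shard has answered, so the slowest shard sets the latency.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

# Hypothetical shards holding (created_at, order_id) rows.
SHARDS = {
    "shard-a": [("2024-01-03", "ord-7"), ("2024-01-09", "ord-2")],
    "shard-b": [("2024-01-01", "ord-5"), ("2024-01-12", "ord-9")],
    "shard-c": [("2024-01-05", "ord-1")],
}


def query_shard(shard: str, since: str, limit: int) -> list[tuple[str, str]]:
    # Each shard runs: WHERE created_at >= since ORDER BY created_at LIMIT limit
    return sorted(row for row in SHARDS[shard] if row[0] >= since)[:limit]


def scatter_gather(since: str, limit: int) -> list[tuple[str, str]]:
    with ThreadPoolExecutor(max_workers=len(SHARDS)) as pool:
        # Scatter: one request per shard, because no shard key narrows the target.
        partials = list(pool.map(lambda s: query_shard(s, since, limit), SHARDS))
    # Gather: merge the already-sorted partial results, then apply the global LIMIT.
    return list(islice(heapq.merge(*partials), limit))


print(scatter_gather("2024-01-02", limit=3))
# [('2024-01-03', 'ord-7'), ('2024-01-05', 'ord-1'), ('2024-01-09', 'ord-2')]
```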
Solutions:
| Solution | Description | Trade-off |
|---|---|---|
| Global secondary index | Maintain a separate index table mapping non-key attributes to shard keys | Extra write on every insert/update; index must be highly available |
| Mapping table in Redis | email โ user_id stored in Redis; look up user_id first, then query correct shard | Extra network hop; cache invalidation |
| Dual-write to a search index | Write to both your DB and Elasticsearch on every mutation | Eventual consistency; more complex writes |
| Covering shard key | Choose a shard key that appears in all hot query patterns (e.g. tenant_id in a SaaS app) | May not be possible for all queries |
Rebalancing Strategies
When your cluster grows (adding nodes) or shrinks (node failure, decommission), data must be redistributed. How you do this determines downtime and migration cost.
Manual rebalancing
An operator explicitly decides which partitions to move and executes the migration. The system makes no automatic decisions.
- Pro: full operator control, no surprise data movements.
- Con: tedious and error-prone at scale; requires careful sequencing.
Used by: MongoDB (manual chunk migration via moveChunk).
Automatic rebalancing
The system continuously monitors shard load and migrates partitions in the background when imbalance is detected.
Cassandra:
→ reassigns token ranges when nodes join or leave
→ auto-streams data to new nodes when they join
→ background streaming, no downtime
DynamoDB:
→ fully managed; partitions split/merge transparently
→ no operator involvement required
- Pro: hands-off operation; reacts to organic growth.
- Con: background migrations consume I/O; can degrade query performance during rebalancing.
Fixed partition count
Instead of changing the number of partitions when nodes change, use a large fixed number of partitions (e.g. 1000) and redistribute ownership of those partitions to nodes:
1000 fixed partitions, 3 nodes:
Node A → partitions 0–332
Node B → partitions 333–666
Node C → partitions 667–999
Adding Node D:
Node A → partitions 0–249 (moved 83 partitions to D)
Node B → partitions 333–582 (moved 84 partitions to D)
Node C → partitions 667–916 (moved 83 partitions to D)
Node D → partitions 250–332, 583–666, 917–999
Keys never change partition; only the partition → node mapping changes, and each reassigned partition is copied whole to its new owner. Used by Elasticsearch (fixed primary shards).
This is why choosing the right initial shard count matters so much. Rule of thumb: aim for shards of 10–50 GB each. For a 500 GB index, 10–50 shards is reasonable.
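The reassignment above can be sketched in Python: keys hash into a fixed set of 1000 partitions that never changes, and adding Node D only rewrites the partition → node map, taking just enough partitions from each existing node to even things out. The greedy donor order is an assumption, so D ends up with different partition numbers than in the layout above but the same counts: 250 partitions move and every node ends with 250.

```python
import hashlib
from collections import Counter

NUM_PARTITIONS = 1000  # fixed for the life of the cluster


def partition_for(key: str) -> int:
    # A key's partition never changes, no matter how many nodes exist.
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big") % NUM_PARTITIONS


def add_node(owner: dict[int, str], new_node: str) -> list[int]:
    """Hand just enough partitions to new_node to even out ownership; return the moved ones."""
    counts = Counter(owner.values())
    target = NUM_PARTITIONS // (len(counts) + 1)
    moved: list[int] = []
    for partition in sorted(owner):
        if len(moved) == target:
            break
        donor = owner[partition]
        if counts[donor] > target:  # donor still holds more than its fair share
            owner[partition] = new_node
            counts[donor] -= 1
            moved.append(partition)
    return moved


owner = {p: "A" if p <= 332 else "B" if p <= 666 else "C" for p in range(NUM_PARTITIONS)}
before = dict(owner)
moved = add_node(owner, "D")
print(len(moved), sorted(Counter(owner.values()).items()))
# 250 [('A', 250), ('B', 250), ('C', 250), ('D', 250)]

moved_set = set(moved)
keys = [f"user_{i}" for i in range(20_000)]
share = sum(partition_for(k) in moved_set for k in keys) / len(keys)
print(f"{share:.1%} of keys move")  # about 25%
```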
Scatter-Gather vs. Co-located Queries
Understanding when queries cross shards is essential for performance planning:
Co-located query (best case):
Client → Router → Shard A ← all data here
                     ↓
                  Result
Latency: 1 network hop
Scatter-gather query (worst case):
Client → Router → Shard A ─┐
                → Shard B ─┤→ Merge & sort → Result
                → Shard C ─┘
Latency: 1 hop + merge time + slowest shard's response
Real latency impact: at P50 scatter-gather is ~3x slower; at P99 it's often 10–20x slower because you wait for the slowest shard (the "long tail" problem).
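One way to see where the P99 penalty comes from: a fan-out query is as slow as its slowest shard. If each shard independently has a 1% chance of answering in its own P99-slow tail, the chance that a query touching n shards hits at least one slow response is 1 − 0.99^n. Independence is a simplifying assumption, but the trend holds:

```python
def p_any_shard_slow(num_shards: int, per_shard_slow: float = 0.01) -> float:
    """Probability that at least one of num_shards independent shards is slow."""
    return 1 - (1 - per_shard_slow) ** num_shards


for n in (1, 10, 100):
    print(f"{n:>3} shards: {p_any_shard_slow(n):.1%} of queries wait on a P99-slow shard")
#   1 shards: 1.0% of queries wait on a P99-slow shard
#  10 shards: 9.6% of queries wait on a P99-slow shard
# 100 shards: 63.4% of queries wait on a P99-slow shard
```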
Real-World Database Comparison
- Cassandra
- DynamoDB
- MongoDB
- PostgreSQL
- Vitess / MySQL
Cassandra
Strategy: Consistent hashing with vnodes (num_tokens per node: 256 by default before Cassandra 4.0, 16 since 4.0).
Shard key: The partition key in the PRIMARY KEY definition.
CREATE TABLE orders_by_user (
user_id UUID,
created_at TIMESTAMP,
order_id UUID,
total DECIMAL,
PRIMARY KEY ((user_id), created_at, order_id)
-- ^^^^^^^^^
-- partition key → determines shard
-- created_at, order_id → clustering keys (sort within shard)
) WITH CLUSTERING ORDER BY (created_at DESC, order_id ASC);
Rebalancing: Automatic streaming when nodes join or leave. New nodes bootstrap data from neighbours.
Cross-shard JOINs: Not supported. You must denormalise into query-specific tables.
DynamoDB
Strategy: Consistent hashing, fully managed and invisible to the user.
Shard key: The Partition Key (PK) of your table's primary key.
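import boto3
from decimal import Decimal

# Table: partition key = user_id, sort key = created_at
# DynamoDB routes all items with the same user_id to the same partition
table = boto3.resource('dynamodb').Table('orders')  # 'orders' is an illustrative table name
table.put_item(Item={
    'user_id': 'alice',  # partition key → determines shard
    'created_at': '2024-01-15T09:00:00Z',  # sort key → orders within shard
    'total': Decimal('99.90')  # DynamoDB numbers must be Decimal, not float
})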
Adaptive capacity: DynamoDB automatically isolates hot partitions, borrowing unused capacity from cold ones.
Hot key problem: A single partition can serve at most about 3,000 RCUs and 1,000 WCUs. If one user_id generates more traffic than this, it becomes a hot partition regardless of how evenly keys are hashed.
Mitigation (write sharding): append a suffix to the partition key, e.g. user_id#N where N = hash(timestamp) % 8. Writes spread across 8 key values, but reading all of a user's items then means querying all 8 suffixes and merging the results.
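A Python sketch of the key scheme only (no AWS calls), assuming 8 suffixes and an MD5-based calculated suffix so the same timestamp always maps to the same key. Python's built-in hash() is avoided because it is randomised per process for strings. The helper names are hypothetical.

```python
import hashlib

NUM_SUFFIXES = 8


def write_key(user_id: str, timestamp: str) -> str:
    # Calculated suffix: deterministic, so an item can be fetched directly
    # when both user_id and timestamp are known.
    suffix = hashlib.md5(timestamp.encode()).digest()[0] % NUM_SUFFIXES
    return f"{user_id}#{suffix}"


def read_keys(user_id: str) -> list[str]:
    # Reading all of a user's items means querying every suffix and merging.
    return [f"{user_id}#{i}" for i in range(NUM_SUFFIXES)]


print(write_key("alice", "2024-01-15T09:00:00Z"))  # one of alice#0 .. alice#7
print(read_keys("alice"))  # alice#0 through alice#7
```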
MongoDB
Strategy: Range-based or hashed sharding, operator-chosen per collection.
// Enable sharding on a collection
sh.enableSharding("ecommerce")
// Option A: hashed shard key → even distribution, but range queries on user_id scatter
sh.shardCollection("ecommerce.orders", { user_id: "hashed" })
// Option B (instead of A; a collection has one shard key): range shard key → efficient range queries per user
sh.shardCollection("ecommerce.orders", { user_id: 1, created_at: 1 })
Routing: A mongos router process consults the config servers (shard map) and routes each query. Multi-shard queries are aggregated by mongos.
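Rebalancing: Semi-automatic. A background balancer migrates chunks automatically, and operators can also trigger moveChunk manually. Each sharded collection is split into chunks (contiguous shard-key ranges) with a default maximum size of 128 MB since MongoDB 6.0 (64 MB in earlier versions).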
PostgreSQL
Built-in declarative partitioning (single-server, no routing layer):
-- Hash partitioning (PostgreSQL 11+)
CREATE TABLE users (
user_id BIGINT,
name TEXT,
email TEXT
) PARTITION BY HASH (user_id);
CREATE TABLE users_p0 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE users_p1 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 1);
CREATE TABLE users_p2 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 2);
CREATE TABLE users_p3 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 3);
For multi-server sharding, PostgreSQL needs an extension such as Citus (acquired by Microsoft in 2019 and also offered as a managed service on Azure), which distributes a table's shards across worker nodes and adds a coordinator that rewrites queries.
-- With Citus extension
SELECT create_distributed_table('orders', 'user_id');
-- Citus shards the table across worker nodes by user_id
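Vitess / MySQL
Vitess is the sharding layer built at YouTube that sits in front of MySQL. It is the basis of PlanetScale.
Strategy: Each shard owns a range of keyspace IDs. A VSchema (virtual schema) declares vindexes that map a column such as user_id to a keyspace ID; the example below uses a hash vindex, so rows spread evenly across those ranges.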
{
"sharded": true,
"vindexes": {
"user_id_vindex": {
"type": "hash"
}
},
"tables": {
"users": {
"column_vindexes": [{ "column": "user_id", "name": "user_id_vindex" }]
}
}
}
Vitess handles cross-shard scatter-gather queries, online schema changes (without locking), and connection pooling. Used by YouTube, Slack, GitHub, Shopify.
Sharding in SQL Databases: Patterns
PostgreSQL: list partitioning
Useful when the partition key is categorical (e.g. region, status):
CREATE TABLE orders (
id BIGSERIAL,
region TEXT NOT NULL,
total NUMERIC(12,2)
) PARTITION BY LIST (region);
CREATE TABLE orders_apac PARTITION OF orders
FOR VALUES IN ('VN', 'SG', 'TH', 'PH', 'ID');
CREATE TABLE orders_emea PARTITION OF orders
FOR VALUES IN ('DE', 'FR', 'GB', 'NL');
CREATE TABLE orders_amer PARTITION OF orders
FOR VALUES IN ('US', 'CA', 'BR', 'MX');
Partition pruning
When the WHERE clause includes the partition key, PostgreSQL eliminates irrelevant partitions from the query plan, scanning only the partitions that could contain matching rows:
EXPLAIN SELECT * FROM orders WHERE region = 'VN' AND total > 1000;
-- → Seq Scan on orders_apac (partition pruning eliminates orders_emea, orders_amer)
Sub-partitioning (composite)
Partitions can themselves be partitioned โ range by year, then hash within the year:
CREATE TABLE events (
id BIGSERIAL,
created_at DATE NOT NULL,
user_id BIGINT NOT NULL
) PARTITION BY RANGE (created_at);
CREATE TABLE events_2024 PARTITION OF events
FOR VALUES FROM ('2024-01-01') TO ('2025-01-01')
PARTITION BY HASH (user_id); -- sub-partition by user
CREATE TABLE events_2024_p0 PARTITION OF events_2024
FOR VALUES WITH (MODULUS 4, REMAINDER 0);
-- ... p1, p2, p3
Monitoring a Sharded Cluster
A sharded system introduces failure modes that don't exist on single-server databases. These are the key signals to watch:
| Metric | What it reveals | Alert threshold (example) |
|---|---|---|
| Shard data size variance | Uneven distribution: one shard getting most of the writes | >20% deviation from mean |
| Replication lag per shard | A replica is falling behind; stale reads possible | >30 seconds |
| Scatter-gather query ratio | % of queries without the shard key, each fanning out to every shard | >5% of query volume |
| P99 cross-shard query latency | Long-tail queries from multi-shard fan-out | >1 second |
| Chunk migration rate (MongoDB) | Background rebalancing consuming I/O | Monitor spikes during business hours |
| Hot partition rate (DynamoDB) | One partition key receiving disproportionate traffic | Throttled requests per partition |
🎯 Interview Questions
For new learners
Q: What is the difference between replication and sharding?
Replication copies the same data to multiple nodes to improve read scalability and high availability (fault tolerance). Sharding splits different data across multiple nodes to improve write scalability and handle datasets too large for one disk. They are almost always used together: a distributed database will have multiple shards, and each shard will have multiple replicas.
Q: What is a hot spot in sharding, and how do you prevent it?
A hot spot occurs when one shard receives disproportionately more traffic than others. The most common cause is choosing a monotonically increasing shard key (timestamp, auto-increment ID): all new writes always go to the "latest" shard. Mitigations: choose a high-cardinality, randomly distributed shard key (like user_id), use hash partitioning, or add a random salt prefix to the key at the cost of scatter-gather on reads.
Q: When should you NOT shard?
When simpler alternatives can solve the problem: query tuning, adding indexes, vertical scaling (bigger server), adding read replicas, or in-memory caching. Sharding adds massive operational complexity: cross-shard JOINs, distributed transactions, complex deployments, harder debugging. Only shard when you've genuinely exhausted the alternatives and benchmarks confirm the bottleneck.
For senior engineers
Q: Why is hash modulo (hash(key) % N) a bad strategy for dynamically scaling a database?
When you add one shard (N becomes N+1), the modulo result changes for most keys: hash(key) % N and hash(key) % (N+1) agree for only about 1 key in N+1. This means roughly N/(N+1) of the data (nearly all of it for large N) must migrate across the network to restore consistency. For a 10 TB dataset, this is a near-total reshuffle with massive I/O and potential downtime. Consistent hashing solves this by only requiring ~1/N of data to migrate when adding one node.
Q: How do virtual nodes (vnodes) improve consistent hashing?
Basic consistent hashing places one point per physical node on the ring. With three nodes, one might get a 40% arc, another 35%, another 25% โ uneven load. Virtual nodes assign each physical node 100โ300 positions on the ring, producing near-uniform distribution. Additionally: a stronger server can be given more vnodes to carry a larger share; when a node fails, its vnodes scatter to many different neighbours, distributing recovery load instead of overloading one node.
Q: You shard users by user_id. A user logs in with only their email. How do you find the right shard?
Maintain a separate mapping table, email → user_id. Every login first queries the mapping table to get user_id, then routes to the correct shard. The mapping table is tiny (just email + user_id) and can be replicated globally for low latency.
Q: How does the Saga pattern replace distributed transactions in a sharded system?
A Saga breaks a cross-shard operation into a sequence of local transactions, each publishing a domain event on completion. Downstream services listen and execute their step. If any step fails, compensating transactions run in reverse order to undo completed steps (e.g. refund a payment if inventory reservation fails). The key difference from 2PC: Saga is asynchronous and eventually consistent, so there is a window where the system is partially updated. This is acceptable for many business workflows (order fulfilment) but not for hard financial consistency (bank transfers).
Q: A DynamoDB table's user_id partition is throttled because one user generates 10x the normal traffic. How do you fix it?
The hot partition problem. Options: (1) Write sharding: append a random suffix (user_id#0 through user_id#7) to distribute writes across 8 partition key values, then scatter-gather on reads and aggregate. (2) Caching: put a Redis cache in front for read-heavy hot users. (3) DAX (DynamoDB Accelerator): AWS in-memory cache for DynamoDB, microsecond reads for hot keys. (4) Data model redesign: if this user's data is always accessed differently from others, consider a separate table or a different access pattern that doesn't funnel through one partition.
Q: Walk through how Cassandra rebalances data when a new node joins.
When a new node joins, it announces itself to the ring with a chosen token (or multiple tokens if using vnodes). Cassandra's gossip protocol propagates its existence to the cluster. The node then streams its responsible token ranges from its neighbours: it takes ownership of its vnodes' key ranges, and the previous owners stream the corresponding data to it. Read and write traffic gradually shifts to the new node as the ring topology updates. The process is online; the cluster continues serving traffic throughout. Once streaming completes, the operator can run nodetool cleanup on the previous owners to reclaim disk space from keys that are no longer their responsibility.