Multi-Raft and sharding consensus

It is 21:14 IST on the eve of a BharatBazaar mega-sale. The platform team's payments cluster runs a single Raft group across 5 nodes, and a load test has just plateaued at 14,000 writes per second. The Mumbai leader's CPU is at 91%, its disk fsync queue depth is 38, and the four followers are bored — three of their cores idle, their disks coasting at 12% utilisation. Aditi looks at the flame graph and realises the obvious: one leader is the entire write capacity of the cluster, regardless of how many followers you add. Why this is the entry point to Multi-Raft: classical Raft is a single-leader protocol — every write goes through one node, fsyncs to its WAL, replicates to a quorum, and acks. Adding more followers does not increase write throughput; it only increases fault tolerance and read capacity. To scale writes, you need more leaders. The only way to have more leaders without breaking Raft's safety properties is to run more Raft groups, each responsible for a slice of the keyspace. That is Multi-Raft.

Multi-Raft runs thousands of independent Raft groups in parallel, one per shard, with each group's leader responsible for a key range. Throughput scales linearly with the number of groups: 1024 groups across 30 nodes gives ~34 leaders per node, balancing the write load. The hard parts are not the per-group protocol — that's vanilla Raft — but the cross-cutting machinery: heartbeat batching to avoid N² traffic, shard placement and rebalancing, range splits/merges, and atomic cross-shard transactions on top of independently-replicated state.

Why a single Raft group cannot scale

A Raft leader does roughly four pieces of work for every write: receive the RPC, append to the local WAL with fsync, send AppendEntries to followers, and apply the committed entry to the state machine. The fsync and the AppendEntries dominate. On a c6i.xlarge instance with NVMe-backed WAL, sustained Raft throughput tops out around 18–25k writes/sec for ~200-byte entries — the WAL's group-commit batching is the actual bottleneck. Add more followers and the leader's per-write work grows (more AppendEntries to send), so throughput goes down, not up.

The single-group ceiling is structural. CockroachDB, TiKV, YugabyteDB, etcd-at-scale, FoundationDB — every production system that needs more than ~20k writes/sec has reached the same conclusion: shard the keyspace, run one Raft group per shard, and let leaders distribute themselves across the cluster. Why "one Raft group per shard" is structurally different from "one bigger leader": the per-shard leaders are independent — they elect independently, replicate independently, fsync independently to their own WAL segments, and crucially, they can sit on different physical nodes. If shard 042's leader is on node-7 and shard 089's leader is on node-12, the two leaders write in parallel. The cluster's aggregate write capacity becomes Σ (per-shard leader capacity), not min(node capacity).

Single Raft group vs Multi-Raft on a 5-node clusterTwo panels. Left: one Raft group on 5 nodes; the leader on N1 receives all writes and replicates to four followers, capped at one leader's throughput. Right: 12 Raft groups distributed across the same 5 nodes; each node hosts ~2-3 leaders, write throughput is the sum of all leaders' capacities. One leader is the cluster's write ceiling — Multi-Raft removes the ceiling Single Raft group 1 leader, 4 followers — write cap ≈ 20k/s N1 (leader) N2 N3 N4 N5 N1 fsync = bottleneck N2–N5 disks <15% util throughput = 1× leader Multi-Raft (12 groups, 5 nodes) ~2.4 leaders/node — every node writes N1 L: g1,g7 N2 L: g2,g8,g11 N3 L: g3,g9 N4 L: g4,g10,g12 N5 L: g5,g6 Each node hosts followers for 7–8 other groups (replication factor 3 — only some shown) 12 leaders writing in parallel all disks 50–70% util throughput ≈ 12× single-group Illustrative — actual scaling depends on network, fsync, and shard balance. Real systems run 1k–100k groups.
Illustrative — single Raft group vs Multi-Raft on the same 5-node cluster. The right panel shows 12 groups; production deployments run thousands per cluster (TiKV's default is 96 MB per region; a 10 TB cluster has ~100k groups).

Heartbeat batching — the trick that makes thousands of groups affordable

The naive way to run 10,000 Raft groups is to run 10,000 independent Raft state machines, each sending its own heartbeats. With a 50 ms heartbeat interval and a replication factor of 3, that is 10,000 × 2 followers × (1000/50) = 400,000 heartbeat RPCs per second — per node. The TCP overhead alone destroys the cluster. Worse, every heartbeat carries the standard Raft AppendEntries headers (term, prev_log_index, prev_log_term, leader_commit) — adding up to ~50 bytes of pure overhead per RPC, before any payload.

The TiKV team's key insight (described in their 2017 blog post and refined in CockroachDB's storage layer): batch heartbeats from co-located groups. Two Raft groups whose leaders both happen to live on node-3 and whose followers both live on node-7 can share a single TCP message — the message carries an array of (group_id, term, prev_log_index, prev_log_term, leader_commit) tuples instead of one tuple per RPC. Why batching works: the heartbeat is a periodic, fire-and-forget keepalive; it does not need per-group ordering or per-group acknowledgement at the transport layer. As long as the receiving node demultiplexes the batch back into per-group state updates, each group's Raft state machine sees the heartbeat it expected. The batching saves N–1 RPC overheads, where N is the number of co-located groups; on a busy node hosting 200 leaders to a particular peer, that is 199 RPCs collapsed into one.

CockroachDB's MultiRaft library batches at three layers: the heartbeats themselves, the WAL writes (one fsync per node per ~1 ms tick instead of per group), and the network sends (using a single Snappy-compressed message per peer connection per tick). The combined effect: a 30-node CockroachDB cluster with 100,000 ranges sends ~6,000 heartbeat batches per second total instead of 12 million individual heartbeat RPCs.

A working Multi-Raft heartbeat-batching simulation

# multi_raft_heartbeat_batching.py — compares per-group RPC vs batched heartbeats
# under a Multi-Raft workload.
import collections, statistics

NUM_NODES = 30
NUM_GROUPS = 10_000
REPLICATION_FACTOR = 3
HEARTBEAT_INTERVAL_MS = 50
TICK_MS = 1
RPC_OVERHEAD_BYTES = 50           # TCP/IP + protobuf framing per RPC
HEARTBEAT_PAYLOAD_BYTES = 32      # term + prev_log_index + prev_log_term + leader_commit

# Simulate: assign each group's leader to a random node, followers to two others.
import random; random.seed(42)
groups = []
for g in range(NUM_GROUPS):
    members = random.sample(range(NUM_NODES), REPLICATION_FACTOR)
    groups.append({"id": g, "leader": members[0], "followers": members[1:]})

# Per-group heartbeats: each leader sends one RPC per follower per heartbeat tick.
def naive_rpcs_per_sec():
    rpcs = NUM_GROUPS * (REPLICATION_FACTOR - 1) * (1000 // HEARTBEAT_INTERVAL_MS)
    bytes_per_sec = rpcs * (RPC_OVERHEAD_BYTES + HEARTBEAT_PAYLOAD_BYTES)
    return rpcs, bytes_per_sec

# Batched heartbeats: per (leader_node, follower_node) pair, one batched RPC carrying
# the heartbeat tuples for every group whose leader is on leader_node and a follower on follower_node.
def batched_rpcs_per_sec():
    pair_groups = collections.defaultdict(int)
    for g in groups:
        for f in g["followers"]:
            pair_groups[(g["leader"], f)] += 1
    rpcs = len(pair_groups) * (1000 // HEARTBEAT_INTERVAL_MS)
    bytes_per_sec = sum(
        (RPC_OVERHEAD_BYTES + HEARTBEAT_PAYLOAD_BYTES * count) * (1000 // HEARTBEAT_INTERVAL_MS)
        for count in pair_groups.values()
    )
    return rpcs, bytes_per_sec, pair_groups

naive_r, naive_b = naive_rpcs_per_sec()
batched_r, batched_b, pairs = batched_rpcs_per_sec()
groups_per_pair = list(pairs.values())

print(f"NUM_NODES={NUM_NODES} NUM_GROUPS={NUM_GROUPS}  RF={REPLICATION_FACTOR}")
print(f"distinct (leader, follower) node pairs = {len(pairs)}")
print(f"groups per pair: min={min(groups_per_pair)} median={statistics.median(groups_per_pair):.0f} max={max(groups_per_pair)}")
print(f"naive  RPCs/sec = {naive_r:,}    bytes/sec = {naive_b/1e6:.1f} MB/s")
print(f"batched RPCs/sec = {batched_r:,}   bytes/sec = {batched_b/1e6:.1f} MB/s")
print(f"reduction: {naive_r/batched_r:.1f}x RPCs, {naive_b/batched_b:.1f}x bytes")

Sample run (Python 3.11, deterministic given the seed):

NUM_NODES=30 NUM_GROUPS=10000  RF=3
distinct (leader, follower) node pairs = 870
groups per pair: min=2 median=23 max=42
naive  RPCs/sec = 400,000    bytes/sec = 32.8 MB/s
batched RPCs/sec = 17,400   bytes/sec = 14.5 MB/s
reduction: 23.0x RPCs, 2.3x bytes

Per-line walkthrough. pair_groups[(g["leader"], f)] += 1 is the batching key — every group whose leader sits on the same node and whose follower sits on the same other node shares a single RPC. distinct (leader, follower) node pairs = 870 is the upper bound on the number of RPCs per heartbeat tick; with 30 nodes, the maximum is 30 × 29 = 870 (every ordered pair). The batching squeezes 10,000 × 2 = 20,000 leader→follower edges into 870 pairs, an average of 23 groups per RPC. reduction: 23.0x RPCs is the load reduction on the network's RPC throughput, which is what kernel-level CPU spends time on. 2.3x bytes is the wire-byte reduction — smaller because the heartbeat payload (32 bytes per group) is amortised across the per-RPC overhead (50 bytes), so a batched RPC saves overhead but still carries every per-group payload. Why the byte reduction is smaller than the RPC reduction: each group still needs its own 32-byte heartbeat payload (term, log indexes), regardless of batching. The 50-byte per-RPC overhead — TCP framing, length prefixes, protobuf field tags — is the part batching collapses. The RPC count drops by 23×; the byte count drops by 2.3× because the per-group payload dominates once you batch enough groups together.

Production systems push the batching further: TiKV's RaftBatchSystem (introduced in TiKV 3.0) groups not just heartbeats but also AppendEntries carrying log entries, and shares a single fsync per WAL flush across all groups whose leader is on this node. The fsync sharing is what unlocks high write throughput per node — instead of 1,000 fsyncs/sec across 1,000 leaders, the node performs ~50 group-commit fsyncs/sec, each flushing tens of leaders' log appends.

Range splits, merges, and the placement driver

A static shard layout — pick 1024 ranges at cluster-creation time, never change — does not survive production. Some ranges grow hot (a single key in BharatBazaar's flash-sale becomes the entire write workload of one Raft group); some grow large (an analytical write-heavy customer fills 8 GB into one range); some grow cold (an old quarterly-report partition no one reads anymore). Multi-Raft systems handle this with three runtime operations: split (one range becomes two when its size exceeds a threshold), merge (two adjacent small ranges become one), and rebalance (a range's leader or replica moves to a different node).

Each operation is itself a consensus operation. A range split happens via a special AdminSplit Raft entry that, when committed, creates a new range descriptor for the right-hand side of the key range, picks new replicas, and atomically transfers the right-hand keys' ownership. CockroachDB's split implementation guarantees: at any wall-clock instant, every key is owned by exactly one range; mid-split, requests for "around-the-split-point" keys are forwarded; and the split's commit is durable across the original range's quorum before any traffic shifts. Why splits must be themselves consensus operations: the range descriptor (which range owns which key span, on which replicas, with which leader) is replicated state — every node in the cluster needs to agree on it. A split that one node has applied and another has not means clients querying the two nodes will route to different ranges for the same key, violating linearisability. The split is a Raft entry on the original range's log; it commits when the original range's quorum acks; the new range descriptor is then gossiped to the placement driver, which propagates it cluster-wide via its own consensus log.

Above the per-range Raft groups sits the placement driver (PD in TiKV, the system.range_descriptor table in CockroachDB, the Master in YugabyteDB). The PD is itself a Raft-replicated service — a small group of 3–5 nodes that holds the cluster's range-to-replica mapping, monitors load, and decides which ranges to split, merge, or rebalance. The PD does not handle data path traffic; it only handles metadata. A typical PD running in TiKV at 100k ranges processes ~500 admin operations per second (splits, merges, leader transfers, replica moves) while keeping its own Raft group's write rate under 5k/sec.

Placement driver and the per-range Raft groupsA diagram showing the placement driver as a small Raft-replicated service at the top, holding the range-to-replica map. Below, multiple per-range Raft groups handle data writes. Arrows show the placement driver issuing rebalance / split / merge admin commands, and ranges reporting their size and load metrics back up. The placement driver coordinates; the per-range groups do the work Placement driver (PD) 3–5 node Raft group — metadata only range_id → replicas, leader, size split, merge, rebalance commands size, load, leader heartbeats Range 042 keys [a..f] leader: N7 size: 78 MB Raft group of 3 Range 089 keys [g..n] leader: N12 size: 142 MB Raft group of 3 — splitting Range 256 keys [o..z] leader: N3 size: 12 MB Raft group of 3 — cold PD's job: keep ranges between 64–128 MB; balance leaders across nodes; never lose a quorum. Range 089 will split into 089a/089b at 128 MB threshold. Range 256 is a merge candidate with its right neighbour.
Illustrative — placement driver (top) coordinating per-range Raft groups (bottom). PD is itself a small Raft group; ranges report load and size up; PD issues split / merge / rebalance commands down. Real production clusters have thousands of ranges.

The PD's hardest job is leader balancing. Without intervention, a freshly-elected cluster develops hot-spots — the first node to come up after a network partition wins disproportionately many elections, and ends up hosting 60% of the leaders. The PD periodically transfers leadership using Raft's TimeoutNow / LeadershipTransfer extension: it picks a target replica, asks the current leader to step down and immediately trigger a vote at the target, and the target wins because it received the timeout-now message first. CockroachDB's PD aims for ±5% leader-count variance across nodes; deviations beyond that trigger transfer waves.

Cross-shard transactions — where Multi-Raft hits the consistency wall

Within a single Raft group, transactions are easy: serialise on the leader's log. Across multiple groups, transactions are hard, because each group's log is independent. A transaction that writes to range 042 and range 089 needs both groups to commit or both to abort — but neither group's leader can directly observe the other's log. This is the classic distributed-transaction problem; Multi-Raft systems solve it with two main patterns.

The first pattern is two-phase commit (2PC) layered over Raft. CockroachDB's transaction protocol works this way: a transaction picks one range as the coordinator (the range hosting the first-written key); it writes a transaction record there, then writes intent records to every other touched range. Each intent write is a Raft entry on its own range; the coordinator's transaction record is a Raft entry on the coordinator's range. The transaction commits when the coordinator's record's status flips to COMMITTED — itself a Raft entry. Once that happens, an asynchronous "intent resolution" pass converts intents into final values on every touched range, again via Raft entries. The coordinator's commit is the linearisation point; reads that arrive between intent-write and intent-resolution see the value with a "pending" flag and either wait or push the transaction.

The second pattern is Spanner-style two-phase commit over Paxos with TrueTime. Spanner runs each tablet (its equivalent of a range) as a Paxos group; cross-tablet transactions use 2PC with the participants' Paxos groups as the durable participants. The added ingredient is TrueTime: every transaction is assigned a commit timestamp inside TT.now()'s uncertainty interval, and the coordinator waits out the interval before declaring commit-success — guaranteeing external consistency across regions. CockroachDB approximates this with HLCs (hybrid logical clocks) and explicit timestamp ordering rather than waiting on TrueTime.

Both patterns add latency: a cross-range transaction pays one extra round-trip (the 2PC coordinator's commit) on top of each touched range's Raft round-trip. KapitalKite's market-data team measured this on their CockroachDB cluster — single-range writes p99 of 8 ms, two-range cross-shard transactions p99 of 17 ms, five-range cross-shard transactions p99 of 32 ms. The growth is roughly linear in the number of touched ranges, with a fixed overhead for the coordinator's commit step.

Common confusions

Going deeper

Joint consensus — adding and removing replicas without losing quorum

Multi-Raft systems frequently rebalance: a range's replica moves from node-7 to node-15. Naively, this means transitioning the range's voter set from {N1, N7, N12} to {N1, N12, N15}. If you do this in one step, you risk a moment where the cluster's understanding of "the voter set" is inconsistent — N7 still thinks it's a voter, N15 thinks it's a voter, and a partition between {N7, N1} and {N15, N12} could allow two leaders to be elected.

Raft's solution is joint consensus: a two-phase configuration change. First commit the transitional config Cnew∪Cold = {N1, N7, N12, N15} to the log — during this phase, both quorums ({N1, N7, N12} majority AND {N1, N12, N15} majority) must agree on every entry. Once the joint config is committed, commit the final config Cnew = {N1, N12, N15}. The joint phase is what guarantees no two leaders can be elected during the transition: any candidate must win in both quorums simultaneously, which is impossible across a partition that cuts across the union.

Production Multi-Raft systems use Raft's "single-server membership change" optimisation (Diego Ongaro's PhD thesis Chapter 4) for the common case — adding or removing one node at a time — which is provably safe without joint consensus, and reserve joint consensus for replication-factor changes (RF=3 → RF=5).

TiKV's region split — the actual code path

When a TiKV region exceeds region_split_size_bytes (default 144 MB, with 96 MB target after split), the leader's RaftStore thread proposes a SplitRequest Raft entry. The entry contains: the split key (chosen as the median of the region's key space), the new region IDs, and the new replica assignments. When committed, every replica's apply thread executes the split: it writes a marker indicating "everything below split-key stays with the original region, everything above moves to the new region", creates the new region's metadata, and notifies the PD. The PD then ensures the new region has a leader (initially co-located with the original region's leader, then moved by leader-balancing).

The non-obvious safety property: during the split, requests targeting either side of the split-key may arrive before the split entry is applied locally on a follower. TiKV handles this by buffering such requests in a per-region "pending split" queue, draining them once the split is fully applied. The window is microseconds, but on a hot region it is non-zero and required for linearisability across the split point.

Why YugabyteDB chose tablet-per-table-shard, not range-based

CockroachDB and TiKV use range-based sharding: ranges are slices of the global keyspace, picked dynamically. YugabyteDB instead uses tablet sharding aligned with table partitions: each table is hash-partitioned into a fixed number of tablets at table-creation time. The PD does not split tablets at runtime; instead, the user re-shards offline if they need more parallelism.

The trade-off: range-based sharding adapts to skewed workloads (a hot key can be split out into its own tiny range), but pays the cost of online splits (which are non-trivial — a split during high write load can stall the original range for tens of milliseconds). Tablet sharding is simpler and more predictable, but cannot adapt to surprise hotkeys — if one of your 64 tablets becomes 80% of the workload, you have to take the table offline and re-shard. Both choices are defensible; the choice tracks the team's view of "how often does workload skew happen unexpectedly?".

Reproduce this on your laptop

python3 -m venv .venv && source .venv/bin/activate
pip install --upgrade pip
# save multi_raft_heartbeat_batching.py from the article body
python3 multi_raft_heartbeat_batching.py

# Then explore a real Multi-Raft cluster:
docker run --name tikv-pg -d -p 2379:2379 pingcap/pd:latest
# (full TiKV setup is in the docs at https://tikv.org/docs/dev/deploy/install/test/)
# Watch the placement driver's region count grow as you load data:
curl http://localhost:2379/pd/api/v1/regions/count

Where this leads next

Multi-Raft is the systemic answer to "how do you get write throughput beyond a single Raft group's ceiling". The next chapters cover what comes next — Byzantine settings, leader election machinery, and geo-distributed write paths.

Part 8 is the systems view of consensus; Part 9 explores the leader-and-lease abstractions that sit just above. The placement-driver pattern recurs: in Part 11 (gossip metadata), in Part 12 (consistency models with replication-aware reads), and in Part 17 (geo-aware tablet placement).

References

  1. Ongaro & Ousterhout, "In Search of an Understandable Consensus Algorithm" — USENIX ATC 2014 — the Raft paper. Sections 6 (configuration changes) and 8 (log compaction) are the load-bearing reads for Multi-Raft implementations.
  2. Ongaro, "Consensus: Bridging Theory and Practice" — Stanford PhD thesis 2014 — Chapter 4 (membership changes) covers joint consensus and single-server change.
  3. Huang et al., "TiDB: A Raft-based HTAP Database" — VLDB 2020 — TiKV's Multi-Raft implementation, region split mechanics, and placement-driver design.
  4. Taft et al., "CockroachDB: The Resilient Geo-Distributed SQL Database" — SIGMOD 2020 — Multi-Raft layered with HLC-based 2PC for cross-range transactions.
  5. Corbett et al., "Spanner: Google's Globally-Distributed Database" — OSDI 2012 — the predecessor design with Paxos groups per tablet and TrueTime-coordinated 2PC.
  6. pingcap/tikv — Multi-Raft Go implementation (now Rust) — the raftstore and pd directories are the canonical reading for production Multi-Raft.
  7. cockroachdb/cockroach — Go implementationpkg/kv/kvserver contains the Multi-Raft layer; pkg/kv/kvserver/replica_split_trigger.go is the split-handling code path.
  8. Raft in detail — chapter 52; the per-group protocol that Multi-Raft replicates thousands of times.