In short
After thirteen chapters of Raft — leader election, log replication, safety invariants, snapshots, membership changes, Multi-Raft sharding, and operating etcd — the right closing thought is the one most engineers take longest to internalise. Raft is not a database. Paxos is not a database. ZAB is not a database. None of these protocols know what a key, a value, a row, or a transaction is. They know one thing: how to take a stream of opaque byte-string commands submitted by clients and get a fault-tolerant majority of nodes to agree on a single total order in which to apply them.
What you do with that ordered stream is up to you. Feed it to a dict[str, str] with a switch statement on PUT/GET/DELETE and you have etcd. Feed it to a single integer with INCR/DECR and you have a distributed counter. Feed it to a tree of ephemeral nodes and you have ZooKeeper. Feed it to a SQL execution engine and you have a Spanner tablet. Feed it to a topic of records and you have a Kafka partition. Same log, different state machine, different product.
Three consequences fall out the moment you accept this. First, the state machine must be strictly deterministic — no random(), no wall-clock reads inside apply, no iteration over Python dict insertion order, no floating-point that might differ across CPUs. Second, all side effects come after consensus — you cannot send the email, charge the card, or write to S3 until the entry is committed, because a leader that proposes an entry might still lose it to a new leader if it crashes mid-replication. Third, the working set is bounded by what you can replay from a snapshot — design log compaction and snapshot serialisation from day one, not as a bolt-on a year later when your WAL is 200 GB.
The tempting wrong design — "let's run a separate Paxos instance per key so different keys can commit in parallel" — is the most common architectural mistake junior teams make in this space. It scatters consensus across thousands of independent quorums, multiplies the leader-election overhead by the keyspace size, and breaks the single property you most want from a coordinator: a single global order across all keys. The correct design is the one CockroachDB, TiKV, and Spanner all converged on: one ordered log per shard, sharded by key range, with a small number of large Raft groups (Multi-Raft from ch.106) rather than a vast number of tiny ones.
This chapter closes Build 13. Build 14 picks up the next problem: when a single shard's log is not enough — when a transaction must touch keys living in different shards — you need 2PC or Percolator stitched on top of multiple shard-logs.
You have just spent thirteen chapters writing Raft. You implemented the leader-election state machine, the AppendEntries RPC, the commit-index advance, the snapshot-and-install dance, the joint-consensus reconfiguration, the lease-based read optimisation, and the Multi-Raft sharding glue. You are looking at three thousand lines of Python that, if you ran it on five EC2 instances, would survive any minority of crashes and never lose a committed write.
And then someone asks you, reasonably, "OK, so what is this thing? Is it a database?"
The honest answer is no. What you have built is an agreement engine. It takes byte-string commands as input and produces, as output, an indexed, totally-ordered, durably-stored sequence of those same byte strings, with the property that every honest replica eventually sees the same sequence. Why this distinction matters: the moment you call your Raft cluster "a database," you start expecting it to know about keys, values, indexes, transactions, schemas — and it knows about none of those things. It knows about an array of bytes. The "database" is the program you write that interprets those bytes.
This article is the conceptual closer for Build 13. It is short on new mechanism and long on the mental shift that, once it lands, makes the entire ecosystem of consensus-based systems suddenly legible. etcd, ZooKeeper, Kafka, CockroachDB, Spanner, Vault, Consul, KRaft — all of them are the same shape with different state machines bolted on top.
The layering
The picture to carry around in your head has three strata.
The bottom layer — what we have spent Build 13 building — is the replicated log. It is the only layer that requires Raft's thousands of lines of subtle correctness reasoning. Once an entry sits at index i and the cluster has reported committed_index >= i, the layer's contract is: every replica will eventually see exactly that byte string at exactly that index, no matter what crashes happen next.
The middle layer is the state machine. It is a deterministic function apply(state, entry) -> (state', response). Why "deterministic" is doing all the work here: if apply is deterministic, then induction on the log gives you that all replicas, after applying any prefix of the log, are in byte-identical state. No coordination needed beyond what the log itself provides. If apply is non-deterministic, the whole edifice collapses — replicas drift, reads become non-linearizable, and you wake up at 3 AM trying to reconcile two histories.
The top layer is the application API — the verbs your clients actually call. client.put("name", "Alice") does not go directly into the log. It gets encoded into a command tuple, that tuple is serialized to bytes, those bytes are submitted as a Raft log entry, and only after the entry commits and apply runs does the response flow back to the client. From the client's perspective they called put. Underneath, six round trips and five disk fsyncs happened.
The whole point of separating these three layers is that the bottom layer does not change as you build different products on top. The same Raft library — etcd-io/raft in Go, hashicorp/raft, tikv/raft-rs, your own Python implementation from chapters 100-106 — backs all the systems below.
The same log, three different services
The reason this layering matters is that it makes a class of "different products" turn out to be the same product with different middle layers. Look at three concrete state machines fed by the same log.
This is not a thought experiment. Look at the production landscape:
- etcd's state machine is a sorted-key MVCC dictionary with leases and watches.
- ZooKeeper's state machine is a hierarchical tree of znodes with ephemeral and sequential semantics.
- Kafka's KRaft mode uses a Raft log to drive a state machine that holds topic metadata, partition assignments, and broker registrations.
- CockroachDB and TiKV use one Raft group per key range, and each range's state machine is a slice of a sorted KV store.
- Spanner's per-tablet state machine is a SQL execution engine with multi-version timestamps.
- Apache BookKeeper writes the log itself as the product — the state machine is a no-op, the log is the storage.
- HashiCorp Vault in HA mode runs a Raft-backed state machine over secrets.
- MongoDB's primary-secondary replication is conceptually the same — the oplog is the log, the BSON document store is the state machine — though implemented with a different protocol.
Every one of these is the same shape. Once you see it once, you cannot unsee it.
Kafka is worth pausing on because it is the most explicit about the log being the product. Why Kafka feels different but is not: every Kafka topic-partition is itself an ordered, replicated log, and consumers are free to materialise their own state machines off it — a Postgres CDC table, an Elasticsearch index, an in-memory cache. Kafka is the rare system that exposes the log directly to applications instead of hiding it behind a state machine. The architectural insight — "the log is the universal abstraction" — is the central thesis of Jay Kreps's I Heart Logs and the LinkedIn engineering blog posts that became the foundation of the modern streaming ecosystem.
Three consequences you cannot ignore
The "log plus deterministic state machine" model is liberating, but it carries three operational constraints that bite if you ignore them.
1. Determinism is mandatory, not aspirational
Every replica must reach the same state after applying the same prefix of the log. That means apply cannot consult anything that varies across replicas. The list of forbidden things is longer than people expect:
- time.time(), time.monotonic(), datetime.now() — the wall clock is local. If your command needs a timestamp, the leader must stamp it before submission and every replica reads entry.timestamp, never the local clock.
- random.random(), uuid.uuid4() — the PRNG state is per-process. Generate IDs on the leader and put them in the payload.
- Iteration over a Python dict, set, or frozenset whose hash depends on PYTHONHASHSEED — the seed is per-process and randomised by default. Sort before iterating, or use an OrderedDict.
- Any I/O — reading a file, calling an external API, querying a database. Whatever the leader needs from the outside world it does before logging the command, and the result becomes part of the payload.
- Most floating-point inside the state machine. Modern x86 and ARM mostly agree on IEEE 754, but historical anomalies — x87 80-bit intermediates, FMA, denormal handling — bite. Stick to integers and decimals where possible.
The trick that production RSM frameworks all converge on is separating the non-deterministic preparation from the deterministic application. The leader does the dirty work (clock read, random ID generation, external API call), embeds the results in the command payload, then submits to the log. Every replica's apply sees exactly the same payload and produces exactly the same state transition.
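The split condenses into a pair of functions. A minimal sketch, assuming a JSON command encoding; the names prepare_command and the payload fields ts and id are illustrative, not taken from any particular library:

```python
import json
import time
import uuid

def prepare_command(op: str, key: str, value: str) -> bytes:
    """Runs on the leader only. Every non-deterministic input is resolved
    here and frozen into the payload before the entry is proposed."""
    cmd = {
        'op': op,
        'k': key,
        'v': value,
        'ts': time.time_ns(),     # leader's clock, captured once
        'id': str(uuid.uuid4()),  # leader's PRNG, captured once
    }
    return json.dumps(cmd, sort_keys=True).encode()

def apply(state: dict, payload: bytes) -> dict:
    """Runs on every replica. A pure function of (state, payload):
    it reads cmd['ts'] and cmd['id'], never the local clock or PRNG."""
    cmd = json.loads(payload)
    if cmd['op'] == 'PUT':
        state[cmd['k']] = {'value': cmd['v'], 'ts': cmd['ts'], 'id': cmd['id']}
    return state

payload = prepare_command('PUT', 'name', 'Alice')
s1 = apply({}, payload)   # replica 1
s2 = apply({}, payload)   # replica 2, same entry
assert s1 == s2           # byte-identical state on every replica
```

The final assertion is the whole point: because the payload carries the timestamp and ID, two replicas applying the same entry cannot diverge.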
2. Side effects come AFTER consensus, never before
Imagine the temptation: a client sends CHARGE_CARD card_xyz amount_500, your leader handles it, you call Stripe immediately to charge the card, then you log the operation. Fast, low latency, what could go wrong?
What goes wrong: your leader crashes after charging Stripe but before the log entry replicates to a majority. A new leader is elected. The old leader's stale entry never commits. From the perspective of the cluster, the charge never happened — but Stripe debited the customer. Why this is the rule with no exceptions: a leader can have entries in its local log that never become committed. Raft explicitly allows for this — a freshly-elected leader from a different term will overwrite uncommitted suffixes. If your apply (or any code before apply) talks to the outside world, you have just created a class of bug where the world has been changed but the cluster has no record of it.
The discipline is: the only thing the leader does on receiving a command is propose it to the log. External effects happen inside apply, only after the entry has been committed (acknowledged by a majority and known to be in the log forever). For things that genuinely cannot be replayed — a real-world charge, an email send, an SMS — the standard pattern is the outbox: record the intent in the state machine, then have a single follower or the leader read committed-but-unsent rows from the outbox and execute them with idempotency keys. If the same row is processed twice (because of leader failover), the idempotency key makes the second attempt a no-op.
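A minimal sketch of the outbox shape, under our own assumptions (the in-memory outbox table, the QUEUED response, and the send_fn callback are illustrative, not a real system's API). apply only records intent; a separate drain step, running after commit, performs the external call and is safe to re-run after failover:

```python
import json

class OutboxStateMachine:
    """State machine that records side-effect intent instead of performing it."""
    def __init__(self):
        self.outbox: dict[str, dict] = {}   # idempotency_key -> pending row

    def apply(self, payload: bytes) -> str:
        cmd = json.loads(payload)
        if cmd['op'] == 'SEND_EMAIL':
            # Record the intent. No network I/O inside apply.
            self.outbox.setdefault(cmd['idempotency_key'], {
                'to': cmd['to'], 'body': cmd['body'], 'sent': False,
            })
            return 'QUEUED'
        raise ValueError(f"unknown op {cmd['op']!r}")

def drain_outbox(sm: OutboxStateMachine, send_fn) -> int:
    """Runs outside consensus, only over committed state. Re-running after a
    crash is harmless: rows already marked sent are skipped, and send_fn is
    expected to pass the idempotency key to the provider so a repeated call
    is a no-op on their side too."""
    sent = 0
    for key, row in sm.outbox.items():
        if not row['sent']:
            send_fn(key, row['to'], row['body'])   # e.g. the Stripe/SES call
            row['sent'] = True
            sent += 1
    return sent
```

Note that 'sent' lives in local bookkeeping here purely for the sketch; a production outbox records the sent-marker through the log too, so that a failed-over drainer sees it.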
3. Snapshot from day one, not when the disk fills
The log grows forever in principle. In practice you compact it: at some point you take a snapshot of the state machine and truncate every log entry below that snapshot's index. New replicas catch up by transferring the snapshot first, then replaying the tail of the log.
Two design decisions follow immediately. The state machine must be serialisable. If your apply accumulates state into Python objects with circular references and lambda closures, you cannot snapshot it. Plan from the start: state must be a plain serialisable structure — dict, list, integer, string, bytes, or a typed schema (protobuf, msgpack, JSON) — that round-trips through dumps/loads without losing fidelity.
The state machine size must be bounded. Whatever you put in there has to fit in memory on every replica, has to serialise to disk for snapshots, has to transfer over the network when a new replica catches up. A coordination service holds megabytes; a per-shard SQL state machine holds at most a few gigabytes; nothing in this layer holds terabytes. Why this matters at the architectural level: when you find yourself wanting to put 500 GB of user data into a single Raft group, the answer is not a bigger snapshot — it is to shard. CockroachDB caps each range at 512 MB and runs thousands of Raft groups; that is Multi-Raft, and the per-group bounded-state property is what makes it operationally tractable.
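Both constraints show up concretely in the snapshot hooks. A sketch, assuming a JSON wire format and 1-based log indices to match the worked example later in this chapter; the method names loosely mirror the two-hook shape production libraries expect, but everything here is our own naming:

```python
import json

class SnapshottableKV:
    """KV state machine designed to be snapshotted from day one:
    state is plain data, and we track the log index it covers."""
    def __init__(self):
        self.state: dict[str, str] = {}
        self.last_applied = 0   # highest log index reflected in self.state

    def apply(self, index: int, payload: bytes) -> None:
        cmd = json.loads(payload)
        if cmd['op'] == 'PUT':
            self.state[cmd['k']] = cmd['v']
        self.last_applied = index

    def snapshot(self) -> bytes:
        # Plain-data state round-trips through dumps/loads without loss.
        return json.dumps({'last_applied': self.last_applied,
                           'state': self.state}, sort_keys=True).encode()

    def restore(self, blob: bytes) -> None:
        snap = json.loads(blob)
        self.state = snap['state']
        self.last_applied = snap['last_applied']

def compact(log: list[bytes], snapshot_index: int) -> list[bytes]:
    """Truncate every entry at or below the snapshot's (1-based) index."""
    return log[snapshot_index:]
```

A new replica receives snapshot() output, calls restore(), then replays only the compacted tail, which is exactly the catch-up path described above.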
The tempting wrong design — per-key consensus
Now the architectural trap. You have ten million keys; writes to different keys are independent; you want to scale write throughput. The naive engineer's instinct: spin up a separate Paxos or Raft instance per key. Different keys can commit in parallel, no shared bottleneck, perfect scaling.
This is wrong in a way that takes a while to feel.
The first problem with per-key consensus is bookkeeping. Every Raft group has a leader, a heartbeat stream every 50 ms, an election timer that fires under partial network failures, and a per-replica log. Ten million Raft groups means ten million leaders, ten million heartbeat streams, ten million election timers. The aggregate background traffic — even with no client load — saturates the network. CockroachDB engineers ran into this in 2017 and spent considerable effort on heartbeat coalescing precisely because Multi-Raft was generating too much chatter even at thousands of groups, let alone millions.
The second problem is lost ordering across keys. Consensus inside one Raft group gives you a total order over operations on that group's log. Two independent Raft groups give you no relative ordering between events in group A and events in group B. If your application ever cares that "the user's account was credited before the order was created" — and it always does, eventually — per-key consensus has thrown away the property that would have made that easy.
The third problem is that cross-key transactions become impossible without bolting on a separate protocol. With one log per key, even an atomic two-key write requires two-phase commit between the two keys' logs, with all the complexity that entails. With one log per shard of many keys, multi-key writes within a shard are free — they ride a single log entry.
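To make "free" concrete, here is a sketch of a multi-key write riding one entry; the TXN op name and the preconditions field are illustrative, not from any shipping system:

```python
import json

class ShardKV:
    """One shard's state machine: many keys behind one log."""
    def __init__(self):
        self.state: dict[str, str] = {}

    def apply(self, payload: bytes) -> str:
        cmd = json.loads(payload)
        if cmd['op'] == 'TXN':
            # All-or-nothing within the shard: check preconditions first...
            for k, expected in cmd.get('preconditions', {}).items():
                if self.state.get(k) != expected:
                    return 'ABORTED'
            # ...then perform every write. Nothing can interleave, because
            # entries apply one at a time in log order.
            for k, v in cmd['writes'].items():
                self.state[k] = v
            return 'COMMITTED'
        raise ValueError(f"unknown op {cmd['op']!r}")
```

Atomicity costs nothing here because the unit of agreement is the entry, not the key; with per-key logs the same write would need a coordinator.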
The right design is the one Multi-Raft (ch.106) introduced: shard the keyspace into ranges, one Raft group per range, with each range holding many keys. CockroachDB caps ranges at 512 MB and splits when they grow too large. TiKV does the same. Spanner uses Paxos-per-tablet with similar bounds. The result is a few thousand consensus groups instead of millions, plenty of parallelism for write throughput, total order within a range for free, and a clean place to bolt 2PC for cross-range operations.
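Routing a key to its range-owning Raft group is a sorted-boundary lookup. A minimal sketch; the split points and group IDs are made-up values standing in for what a real system keeps in a range-metadata table:

```python
import bisect

# Split points partition the keyspace into contiguous ranges:
# [-inf, 'g')  ['g', 'n')  ['n', 't')  ['t', +inf)
SPLIT_POINTS = ['g', 'n', 't']
GROUP_IDS = [101, 102, 103, 104]   # one Raft group per range

def group_for_key(key: str) -> int:
    """bisect_right counts how many split points the key is >= to,
    which is exactly the index of the range that owns it."""
    return GROUP_IDS[bisect.bisect_right(SPLIT_POINTS, key)]
```

When a range exceeds its size cap, the system inserts a new split point and spins up one more group — the group count grows with data volume, not with key count.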
A worked example — same log, different state machine
The whole argument condenses into one piece of code. Build a tiny KV store on top of a Raft log; then change ten lines and you have a counter; change ten more and you have a lock service. The Raft layer never knows the difference.
One log, three services
The shape every consensus-driven service takes: a Replica holding a log and a state machine, plus an apply function that decodes one log entry and updates state. Swapping the apply function is the only thing that distinguishes a KV store from a counter from a lock service.
import json
from typing import Any
# ---- The transport: a Raft log. We mock it for clarity, but in production
# this is etcd-io/raft, hashicorp/raft, or your implementation from ch.100-106.
# The contract is: propose(payload_bytes) blocks until the entry is committed
# (a majority has acked it), then returns its index. The framework then
# delivers each committed entry, in order, to your apply() callback.
class RaftLog:
"""Stand-in for a real Raft library. In production you wire your apply()
callback into the library's commit-notification interface."""
def __init__(self):
self.entries: list[bytes] = []
def propose(self, payload: bytes) -> int:
# Real Raft: replicate to majority, fsync, then return index.
self.entries.append(payload)
return len(self.entries)
# ---- Service 1: KV store. State is a dict; apply dispatches on op type.
class KVStateMachine:
def __init__(self):
self.state: dict[str, str] = {}
def apply(self, payload: bytes) -> Any:
cmd = json.loads(payload)
op = cmd['op']
if op == 'PUT':
self.state[cmd['k']] = cmd['v']
return 'OK'
if op == 'DEL':
self.state.pop(cmd['k'], None)
return 'OK'
if op == 'CAS':
if self.state.get(cmd['k']) == cmd['expected']:
self.state[cmd['k']] = cmd['new']
return True
return False
raise ValueError(f"unknown op {op!r}")
# ---- Service 2: Counter. State is one integer; same log, different apply.
class CounterStateMachine:
def __init__(self):
self.value: int = 0
def apply(self, payload: bytes) -> int:
cmd = json.loads(payload)
if cmd['op'] == 'INCR':
self.value += cmd['delta']
elif cmd['op'] == 'DECR':
self.value -= cmd['delta']
elif cmd['op'] == 'RESET':
self.value = cmd['to']
else:
raise ValueError(f"unknown op {cmd['op']!r}")
return self.value
# ---- Service 3: Lock service. State is a {lock_name: owner} dict.
class LockStateMachine:
def __init__(self):
self.holders: dict[str, str] = {}
def apply(self, payload: bytes) -> bool:
cmd = json.loads(payload)
if cmd['op'] == 'ACQUIRE':
if cmd['name'] not in self.holders:
self.holders[cmd['name']] = cmd['client_id']
return True
return False # someone else holds it
if cmd['op'] == 'RELEASE':
if self.holders.get(cmd['name']) == cmd['client_id']:
del self.holders[cmd['name']]
return True
return False
raise ValueError(f"unknown op {cmd['op']!r}")
# ---- The wiring is identical for all three.
class Replica:
def __init__(self, state_machine):
self.log = RaftLog()
self.sm = state_machine
self.applied = 0
def submit(self, cmd: dict) -> Any:
# 1. Encode. 2. Propose to Raft (blocks until committed). 3. Apply.
# In real Raft, apply happens via a background loop on every replica
# that reads committed entries in order. We inline it here for clarity.
payload = json.dumps(cmd).encode()
idx = self.log.propose(payload)
result = self.sm.apply(self.log.entries[idx - 1])
self.applied = idx
return result
# ---- Run all three on the same machinery.
if __name__ == '__main__':
kv = Replica(KVStateMachine())
print(kv.submit({'op': 'PUT', 'k': 'name', 'v': 'Alice'})) # OK
print(kv.submit({'op': 'CAS', 'k': 'name',
'expected': 'Alice', 'new': 'Bob'})) # True
print(kv.sm.state) # {'name': 'Bob'}
counter = Replica(CounterStateMachine())
print(counter.submit({'op': 'INCR', 'delta': 7})) # 7
print(counter.submit({'op': 'INCR', 'delta': 35})) # 42
print(counter.submit({'op': 'DECR', 'delta': 2})) # 40
locks = Replica(LockStateMachine())
print(locks.submit({'op': 'ACQUIRE',
'name': 'db_lock', 'client_id': 'A'})) # True
print(locks.submit({'op': 'ACQUIRE',
'name': 'db_lock', 'client_id': 'B'})) # False
print(locks.submit({'op': 'RELEASE',
'name': 'db_lock', 'client_id': 'A'})) # True
Three services, one wiring. The Replica class does not know whether it is running a KV store, a counter, or a lock. The RaftLog certainly does not. Only the apply function — the deterministic decoder of opaque bytes into state transitions — is service-specific, and each is under thirty lines.
This is exactly how production systems are factored. The etcd-io/raft Go library hands you committed entries through its Node interface; you apply them to your own state machine and the library handles everything else. hashicorp/raft exposes an FSM interface with Apply, Snapshot, and Restore methods. tikv/raft-rs is the same shape. The agreement engine is a library. Your product is the apply function.
A real implementation adds two things this sketch elides. First, every replica's apply-loop runs independently — the leader does not call apply on followers; followers consume committed entries from their own log and apply locally. Second, snapshots: when the log gets long, the framework asks the state machine to serialise itself, then truncates the log below that snapshot's index. New replicas catch up by transferring the snapshot first, then replaying the tail. Both etcd-io/raft and hashicorp/raft provide hooks for this; your state machine implements Snapshot() and Restore(reader) and the rest is bookkeeping.
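The independent apply-loop is small enough to sketch. This is a simplification with our own names (apply_loop_step, CountingSM); it shows the one invariant that matters: each replica advances its own applied index through its own committed prefix, in log order:

```python
class CountingSM:
    """Toy state machine that just records what it has applied."""
    def __init__(self):
        self.applied_payloads: list[bytes] = []

    def apply(self, payload: bytes) -> None:
        self.applied_payloads.append(payload)

def apply_loop_step(entries: list[bytes], commit_index: int,
                    applied: int, sm) -> int:
    """Apply every committed-but-unapplied entry, in order, and return the
    new applied index (1-based, matching the Replica sketch)."""
    while applied < commit_index:
        applied += 1
        sm.apply(entries[applied - 1])
    return applied
```

On a real replica this runs in a background loop that wakes whenever the local commit index advances; no leader ever calls apply on a follower's behalf.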
Common confusions
- "Raft is a database." No. Raft is an agreement engine that produces an ordered log of opaque byte strings. The database — or counter, or lock service, or queue — is a deterministic state machine you write that decodes those bytes and updates in-memory state.
- "My state machine can call out to Stripe / send an email / write to S3." Not before the entry is committed, and very carefully even after. Side effects must be idempotent and recovered through an outbox pattern, because crashes between commit and effect can repeat the effect.
- "Per-key Paxos scales better than one big log." It does not. Bookkeeping (heartbeats, elections, leader-state per group) dominates the actual work; ordering across keys is lost; cross-key transactions become impossibly hard. Shard into a small number of large groups instead.
- "Snapshots are an optimisation I can defer." They are an architectural commitment. If your state machine is not designed to be serialised from day one, retrofitting snapshots later is painful and sometimes impossible. Decide your snapshot format with your first apply function.
- "All consensus protocols give me linearizability for free." They give you a totally ordered log. Linearizability of reads additionally requires that read paths consult the leader (or use a consistency mechanism like read-index or lease reads). A naive follower read returns whatever the follower has applied so far, which can lag the latest commit.
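The read-index mechanism mentioned in the last point fits in a few lines. A simplified sketch of the idea only; the Leader stub and its method names are illustrative stand-ins, not a library API:

```python
class Leader:
    """Toy leader with just enough state for the sketch."""
    def __init__(self, state: dict, commit_index: int, applied_index: int):
        self.state_machine = state
        self.commit_index = commit_index
        self.applied_index = applied_index

    def confirm_leadership(self) -> bool:
        return True              # stand-in for a majority heartbeat round

    def wait_for_apply(self) -> None:
        self.applied_index += 1  # stand-in for the apply loop catching up

def linearizable_read(leader: Leader, key: str):
    read_index = leader.commit_index          # 1. capture the commit index
    if not leader.confirm_leadership():       # 2. confirm via a heartbeat round
        raise RuntimeError('not leader; retry elsewhere')
    while leader.applied_index < read_index:  # 3. wait for apply to catch up
        leader.wait_for_apply()
    return leader.state_machine.get(key)      # 4. now safe to serve locally
```

Step 2 is what defeats the stale-leader hazard: a deposed leader cannot get a majority of heartbeat acks, so it never serves the read.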
Going deeper
Lamport's framing — events in a distributed system
Leslie Lamport's 1978 Time, Clocks, and the Ordering of Events in a Distributed System is the paper that made this entire decomposition thinkable. The core insight is that distributed correctness reduces to ordering — given a way to put events in a single agreed sequence, replicated computation becomes possible. State machine replication is the constructive realisation of that idea. Every paper in the field, including Raft, traces back to this one.
Schneider's canonical formulation
Fred Schneider's 1990 Implementing Fault-Tolerant Services Using the State Machine Approach (ACM Computing Surveys 22(4)) is the textbook formulation. It defines a state machine as the four-tuple (states, initial state, commands, deterministic transition function), distinguishes the agreement and order requirements, and discusses the Byzantine variant. The vocabulary — "state machine," "command," "agreement," "order" — that the rest of the literature uses comes from this paper.
The log as the universal abstraction
Jay Kreps's 2013 LinkedIn Engineering post The Log: What every software engineer should know about real-time data's unifying abstraction, expanded into the book I Heart Logs (O'Reilly 2014), is the modern restatement of Schneider's argument in the language of streaming systems. Kafka is the architectural embodiment: the log is exposed to clients directly as the product, and downstream state machines (databases, indexes, caches) are built on top by anyone who wants one. Reading this after the present chapter makes the connection between consensus systems and streaming systems obvious.
FoundationDB — the log as a service inside a database
FoundationDB is unusual in that it explicitly factors the log out as an internal microservice (the transaction log layer) separate from both the storage layer and the resolver layer. The 2021 SIGMOD paper FoundationDB: A Distributed Unbundled Transactional Key Value Store by Zhou et al. is worth reading because it makes the layering visible inside a single database product — log, state machine, transaction coordinator, all as separable services.
Viewstamped Replication — Paxos's quieter cousin
Brian Oki and Barbara Liskov's 1988 Viewstamped Replication: A New Primary Copy Method to Support Highly-Available Distributed Systems is contemporaneous with Paxos and arguably easier to implement, but did not get the same publicity. Liskov and Cowling's 2012 update Viewstamped Replication Revisited is the cleanest statement; reading it alongside the Raft paper is illuminating because the two algorithms converge on essentially the same primary-backup-with-view-change structure from independent starting points.
Where this leads next
Build 13 ends here. You now have, conceptually if not yet in production-ready code, the entire consensus story: a fault-tolerant log, a deterministic state machine bolted on top, the operational rules that keep them honest, and the architectural pattern (Multi-Raft) that lets the model scale to terabytes by sharding instead of growing single groups.
Build 14 starts with the question Build 13 cannot answer: what about transactions across shards? Each shard has its own log, its own leader, its own ordered sequence of operations. A transaction that touches keys in shard A and shard B needs both logs to either commit the operation or reject it — and to do so atomically, despite the two shards having no direct consensus relationship.
The classic answer is two-phase commit, layered on top of the per-shard logs as a coordinator-driven protocol. Each shard's local commit is a Raft log entry; the coordinator's "decide commit" or "decide abort" is also a Raft log entry (in its own coordinator log). The combination — local consensus per shard, plus a coordinator orchestrating prepare and commit messages — gives you ACID across shards. The cost is latency (two extra round trips) and a famous failure mode (the coordinator dying after prepare but before commit, leaving participants in limbo).
The more modern answer is Percolator (Google, 2010) — Spanner's predecessor — which uses snapshot isolation timestamps and a clever per-row primary-lock pattern to eliminate the coordinator's single-point-of-failure. CockroachDB and TiKV both ship variants of Percolator on top of their Multi-Raft layers. Spanner uses a TrueTime-based variant that depends on synchronised clocks.
We will build all of that in chapters 109-114. The single sentence to carry forward from this chapter is: consensus protocols give you a fault-tolerant ordered log; everything else — the database, the counter, the lock service, the queue — is the deterministic state machine you write on top, and the discipline of writing that state machine correctly (determinism, post-consensus side effects, bounded snapshot-able state) is what separates a working RSM from a 3-AM corruption incident.
References
- Lamport, Time, Clocks, and the Ordering of Events in a Distributed System, Communications of the ACM 21(7), 1978 — the foundational paper. Section 4 sketches the state machine approach: replicate events in a total order consistent with happens-before, and replicas converge.
- Schneider, Implementing Fault-Tolerant Services Using the State Machine Approach: A Tutorial, ACM Computing Surveys 22(4), 1990 — the canonical formalisation of state machine replication, including agreement and order requirements, determinism constraints, and Byzantine variants.
- Ongaro and Ousterhout, In Search of an Understandable Consensus Algorithm (Extended Version), USENIX ATC 2014 — the Raft paper, explicit about the log/state-machine separation and about the apply-callback contract that production Raft libraries expose.
- Oki and Liskov, Viewstamped Replication: A New Primary Copy Method to Support Highly-Available Distributed Systems, PODC 1988; and Liskov and Cowling, Viewstamped Replication Revisited, MIT-CSAIL-TR-2012-021 — the Paxos contemporary, and the cleaner modern restatement; useful as a second viewpoint on the same problem.
- Kreps, The Log: What every software engineer should know about real-time data's unifying abstraction, LinkedIn Engineering 2013 — the modern statement of the log-is-the-universal-abstraction thesis, and the architectural foundation underneath Kafka.
- Zhou et al., FoundationDB: A Distributed Unbundled Transactional Key Value Store, SIGMOD 2021 — a database designed with the log/state-machine/coordinator factoring made explicit; reading it after this chapter shows the layering inside a real production system.