Note: Company names, engineers, incidents, numbers, and scaling scenarios in this article are hypothetical — even when they resemble real ones. See the full disclaimer.

Percolator: snapshot-isolation transactions on a key-value store

In 2010 a Google paper described how the web index — the inverted index and link graph that powered the search box on every browser tab in the world — had stopped being rebuilt as a giant batch job and started being maintained incrementally, one document at a time, with thousands of workers concurrently updating shared structures. The system was called Percolator, and the surprising part was not that it scaled. The surprising part was that it offered cross-row, cross-table ACID transactions with snapshot isolation on top of Bigtable, a key-value store with no built-in transactions at all. Percolator did this with a clever protocol that layered two-phase commit onto extra columns in each row, used a single global timestamp oracle for ordering, and turned crash recovery into a lazy clean-up problem that any later transaction could resolve. Years later, the same protocol would resurface as the heart of TiKV, of CockroachDB's earliest design notes, and of every "transactional layer over a KV store" you have ever seen on a job posting. This chapter is about the protocol, why it works, and what it costs.

Percolator is a transactional layer that sits on top of a non-transactional sharded KV store (originally Bigtable) and gives you snapshot-isolation, cross-shard, ACID transactions. The core trick is to store, for every cell, three columns — data, lock, write — and to commit in two phases coordinated by a single primary lock that decides the fate of the whole transaction. Crash recovery is passive: a later reader who finds an abandoned lock walks a one-pointer chain to the primary and decides whether to roll forward or back. The design pays two RPCs per row but needs no central coordinator, no synchronous follower replication for the lock service, and no special transaction-log shard. It is the protocol behind TiKV and a direct ancestor of Spanner.

What Percolator is solving — incremental updates at index scale

Pre-Percolator, Google rebuilt its web index as a series of MapReduce jobs that ran end-to-end every few days. A new page crawled at 14:00 would not appear in search results until the next batch finished, sometimes 36 hours later. The team wanted to trickle new documents into the index continuously, but doing so required updating multiple Bigtable rows atomically — the document row, the inverted-index row for every word in the document, the link-graph row for every outbound URL — and Bigtable only guarantees atomicity within a single row.

The naive fix is a centralised transaction coordinator. That does not scale: the index spans petabytes across thousands of tablet servers, and a single coordinator becomes the bottleneck and the single point of failure. The team's question was: can we build cross-shard ACID transactions without putting any new component on the critical path other than the KV store itself?

Percolator's answer is yes, with three pieces. (1) Each row stores extra metadata columns that record locks and committed write pointers. (2) A single global service called the timestamp oracle (TSO) hands out monotonically increasing timestamps; every transaction picks one at start (start_ts) and another at commit (commit_ts). (3) A two-phase commit protocol uses one chosen row's lock — the primary — as the canonical decider. If the primary lock turns into a committed write pointer, the transaction is committed; if the primary lock is gone with no write, the transaction is aborted. Every other row in the transaction is a secondary that points at the primary and inherits its fate.

[Figure: Percolator's three columns per cell (data, lock, write), shown for a single Bigtable row "bob" at the moment a transaction has prewritten ts=20 but not yet committed. The data column is multi-version (ts=10 value=10; ts=20 value=8, uncommitted). The lock column holds a primary lock at ts=20 pointing at itself. The write column maps commit_ts=12 to (PUT, data@ts=10); nothing exists yet for the ts=20 write. A read at ts=15 picks the largest commit_ts ≤ 15 in the write column, namely 12, follows its pointer into data, and returns value=10. Illustrative.]
Each row stores three columns. data is multi-version. lock records in-flight transactions. write records the commit history — a pointer from commit_ts back to the matching data@start_ts. Reads consult write first, then follow the pointer.

The protocol — Prewrite, then Commit

A Percolator transaction T runs in three logical phases: read, prewrite, commit. The read phase happens inline as the application code calls txn.get(row). The prewrite and commit phases together implement two-phase commit but with the lock-and-write columns standing in for prepare-records and a coordinator log.

Read. At the start of the transaction, T calls the TSO and gets start_ts. Every read at row R inspects R's write column, picks the largest commit_ts ≤ start_ts, follows the pointer into R's data column, and returns that value. If T sees a lock entry on R with timestamp ≤ start_ts, T must either wait for the holder to finish or run lock cleanup (described below) before reading.

Prewrite (phase 1 of 2PC). T picks one row in its write set as the primary (any deterministic choice — usually the lexicographically smallest key). For every row R in the write set, T attempts to write data@start_ts = newvalue and lock@start_ts = (primary_key). The lock entry on the primary points at itself; secondary lock entries point at the primary. The write is conditional: if R already has a lock entry, or a write entry with commit_ts ≥ start_ts, the prewrite fails — T aborts.

Why the primary-pointer trick: in a system without a coordinator log, you need a single source of truth that decides the transaction's fate. Percolator picks the primary row's lock and write columns to be that source of truth. After phase 1, every secondary row carries a pointer "if you want to know whether this transaction committed, ask the primary". The primary's atomic transition from "has a lock" to "has a committed write" is the commit point. Because Bigtable guarantees single-row atomicity, that transition is itself atomic — no consensus needed.

Commit (phase 2 of 2PC). T calls the TSO again to get commit_ts. T issues a single Bigtable conditional-write to the primary row that, atomically, (a) checks the primary's lock@start_ts is still there, (b) writes write@commit_ts = (PUT, start_ts), and (c) deletes lock@start_ts. The instant this single-row mutation succeeds, the transaction is committed. T then asynchronously walks its secondaries and, for each, writes the corresponding write@commit_ts and deletes the lock@start_ts. If T crashes before it finishes the secondaries, that is fine: the secondaries still carry locks pointing at the primary, and any later reader will resolve them.

Why this is correct under crash: the primary's atomic transition is the linearisation point. Before it, T is uncommitted; after it, T is committed. There is no intermediate state where T is "half committed" — Bigtable single-row atomicity guarantees that. Secondaries that still have stale locks are just waiting for someone to finalise them, and finalising is idempotent.

[Figure: message-sequence chart of prewrite and commit across three timelines: client, primary row (bob), secondary row (alice). The client gets start_ts=20 from the TSO, prewrites bob (PRIMARY) and alice (pointer to bob), so both rows hold data@20 and lock@20. The client gets commit_ts=25 and sends commit-primary to bob, which atomically writes write@25=(PUT, 20) and deletes lock@20: the commit point. The commit-secondary to alice is asynchronous; if the client crashes after the commit point, alice carries a stale lock pointing at bob, and a later reader resolves it by checking bob. Illustrative.]
The single-row atomic update at the primary is the linearisation point. Secondaries are best-effort cleanup that any later reader can finish.

A working Percolator simulator

This is a reduced Percolator written against an in-memory Bigtable-like store. It implements the three columns, prewrite, commit, and conflict detection; resolution of stale locks is sketched in the crash-recovery section below. KapitalKite uses a derivative of this protocol on TiKV under their order book — the simulation here mirrors the shape of that production code.

# Percolator simulator — three columns per cell, primary-lock 2PC.
# Realistic enough to demonstrate snapshot isolation and crash recovery.
import itertools, threading
from collections import defaultdict

class KV:
    def __init__(self):
        self.data, self.lock, self.write = defaultdict(dict), {}, defaultdict(dict)
        self.mu = threading.RLock()            # reentrant: _abort re-locks during prewrite

class TSO:                                         # global timestamp oracle
    def __init__(self): self._c = itertools.count(10)
    def now(self): return next(self._c)

class Txn:
    def __init__(self, kv, tso):
        self.kv, self.tso = kv, tso
        self.start_ts = tso.now()
        self.writes = {}                           # row -> new value, staged

    def get(self, row):
        with self.kv.mu:
            if row in self.kv.lock and self.kv.lock[row][0] <= self.start_ts:
                # A production reader would resolve this lock via its primary
                # (see crash recovery below); the simulator surfaces the conflict.
                raise Conflict(f"row {row} locked by {self.kv.lock[row]}")
            commits = [c for c in self.kv.write[row] if c <= self.start_ts]
            if not commits: return None
            return self.kv.data[row][self.kv.write[row][max(commits)][1]]

    def put(self, row, value): self.writes[row] = value

    def commit(self):
        if not self.writes: return True
        keys = sorted(self.writes); primary = keys[0]
        # PHASE 1 — prewrite every row.
        with self.kv.mu:
            for k in keys:
                if k in self.kv.lock or any(c >= self.start_ts for c in self.kv.write[k]):
                    self._abort(keys); raise Conflict(f"prewrite {k} conflict")
                self.kv.data[k][self.start_ts] = self.writes[k]
                self.kv.lock[k] = (self.start_ts, primary)
        # PHASE 2 — commit primary atomically, then secondaries asynchronously.
        commit_ts = self.tso.now()
        with self.kv.mu:
            assert self.kv.lock[primary] == (self.start_ts, primary)  # stands in for Bigtable's conditional write
            self.kv.write[primary][commit_ts] = ("PUT", self.start_ts)
            del self.kv.lock[primary]              # ← THE COMMIT POINT
        for k in keys[1:]:
            with self.kv.mu:
                self.kv.write[k][commit_ts] = ("PUT", self.start_ts)
                del self.kv.lock[k]
        return True

    def _abort(self, keys):
        with self.kv.mu:
            for k in keys:
                self.kv.data[k].pop(self.start_ts, None)
                if self.kv.lock.get(k, (None,))[0] == self.start_ts: del self.kv.lock[k]

class Conflict(Exception): pass

# DEMO — Riya transfers ₹500 from alice → bob, atomically across two rows.
kv, tso = KV(), TSO()
kv.data['alice'][1] = 1500; kv.write['alice'][2] = ("PUT", 1)
kv.data['bob'][1] = 200;    kv.write['bob'][2]  = ("PUT", 1)
t = Txn(kv, tso)
assert t.get('alice') == 1500 and t.get('bob') == 200
t.put('alice', 1000); t.put('bob', 700); t.commit()
print("alice:", Txn(kv, tso).get('alice'), "bob:", Txn(kv, tso).get('bob'))

Output:

alice: 1000 bob: 700

A line-by-line walkthrough. KV holds the three columns as Python dicts: data[row][ts], lock[row] (single in-flight lock per row), write[row][commit_ts]. TSO.now() is the timestamp oracle — in production this is a single replicated service that batches timestamp requests, handing out many timestamps per RPC. Txn.__init__ snapshots start_ts at construction; every get reads as-of that timestamp. get first checks for a conflicting lock with start_ts ≤ self.start_ts (a lock from an earlier or concurrent transaction blocks reads), then walks write[row] for the largest commit_ts ≤ self.start_ts, then dereferences into data. commit picks the lexicographically smallest row as primary, prewrites every row (writing data@start_ts and acquiring lock@start_ts), then performs the single critical line, del self.kv.lock[primary], after writing the primary's write@commit_ts. That single mutation is the commit point. Secondaries are then cleaned up; if the process crashed after the commit point, the next reader on a secondary would see a stale lock pointing at primary, look up primary's write column, find the commit, and finalise the secondary. The DEMO transfers ₹500 atomically across two rows; both reads after commit see the new values.

Crash recovery — passive, by any later reader

The most elegant part of Percolator is what happens when a client crashes mid-transaction. There is no central coordinator log to replay, no recovery daemon — recovery is passive, performed by any later transaction that happens to touch a row with a stale lock.

The rule is: when reader R encounters a lock on row K with timestamp ts_l, R looks at the lock's primary_key field and walks to the primary row P. R then inspects P's lock and write columns:

  • Case 1 — P has write@commit_ts for ts_l. The transaction committed. R writes K's write@commit_ts = (PUT, ts_l) and deletes K's lock. The transaction is now finalised on K too.
  • Case 2 — P has no lock@ts_l and no write for ts_l. The transaction was aborted (or its primary was already cleaned by another reader). R deletes K's data@ts_l and K's lock.
  • Case 3 — P still has lock@ts_l. The transaction is genuinely in-flight. R checks the lock's TTL — if expired, R rolls the primary forward (case 1) or back (case 2) by inspecting whether write@commit_ts was ever written. If TTL is fresh, R waits.

Why this terminates: each step either reduces the number of rows with stale locks (cases 1 and 2 finalise K) or waits a bounded TTL. There is no infinite loop because the primary's state is monotonic — once write@commit_ts exists, it stays; once lock@ts_l is gone without a corresponding write, it stays gone. Two readers concurrently resolving the same lock both produce the same outcome; the operations are idempotent.
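
To make the rule concrete, here is a sketch of passive resolution written against the simulator above. The helper resolve_lock is illustrative and not part of the original code, and the TTL wait of case 3 is reduced to raising Conflict.

# Passive lock resolution, a sketch against the simulator above.
def resolve_lock(kv, row):
    with kv.mu:
        if row not in kv.lock:
            return                                      # already resolved
        ts_l, primary = kv.lock[row]
        # Case 1: the primary has a committed write for ts_l, so roll forward.
        done = [c for c, (_, s) in kv.write[primary].items() if s == ts_l]
        if done:
            kv.write[row][done[0]] = ("PUT", ts_l)
            del kv.lock[row]
            return
        # Case 3: the primary still holds lock@ts_l, so the txn is in flight.
        if kv.lock.get(primary, (None,))[0] == ts_l:
            raise Conflict(f"txn {ts_l} still in flight")  # real systems wait out a TTL
        # Case 2: no primary lock and no commit record, so roll back.
        kv.data[row].pop(ts_l, None)
        del kv.lock[row]

# Crash demo: the primary (alice) committed at ts=25, but the client died
# before finalising the secondary (bob). A later reader resolves bob.
kv2 = KV()
kv2.data['alice'][20] = 1; kv2.write['alice'][25] = ("PUT", 20)   # primary: committed
kv2.data['bob'][20] = 2;   kv2.lock['bob'] = (20, 'alice')        # secondary: stale lock
resolve_lock(kv2, 'bob')                                          # case 1: roll forward
assert kv2.write['bob'][25] == ("PUT", 20) and 'bob' not in kv2.lock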

This is the design's most consequential property: no special recovery component. The system runs entirely on Bigtable single-row atomicity plus the TSO. Crash recovery is just the cost of one extra primary-lookup RPC for the unlucky reader who first touches a stale lock.

Common confusions

  • "Percolator is just 2PC." Yes and no. The two phases (prewrite, commit) map directly onto 2PC's prepare and commit. But Percolator has no coordinator process and no coordinator log — the primary row's lock and write columns are the entire coordinator state, and Bigtable's single-row atomicity is the entire commit log. Classical 2PC needs a durable coordinator that survives crashes; Percolator does not.
  • "The TSO is a single point of failure." It is a single point of coordination, not of failure — the TSO is itself a Paxos/Raft replicated service, but only ever doles out monotonic timestamps, so its state is trivial and its replicas re-election is fast. p99 throughput in the original paper was ~2M timestamps/sec from a single replica with batching; today's TiKV PD does ≥1M/sec routinely.
  • "Snapshot isolation is the same as serialisable." It is not. Percolator's default is snapshot isolation, which permits write skew — two transactions that read overlapping data and write disjoint cells can both commit even though no serial order produces the same result. Spanner and CockroachDB add serialisable certifiers on top; vanilla Percolator does not.
  • "Locks are held while the client thinks." Yes — between prewrite and commit, the row holds a lock, and any concurrent transaction touching it must wait or abort. If the client takes 30 s to reach commit, the row is unavailable for 30 s. Production Percolator uses small TTLs (~10 s) and aggressive lock cleanup to bound this.
  • "Reads are lock-free." Reads see locks but typically resolve them lazily — they do not acquire any lock themselves. A read at start_ts walks the write column and the data column; it touches lock only to check if there is a conflict.

Going deeper

The original paper and what it understated

Peng & Dabek's Large-scale Incremental Processing Using Distributed Transactions and Notifications (OSDI 2010) introduced Percolator and reported that the new continuous-update Caffeine indexer reduced web-page indexing latency by ~50% relative to the previous batch system. What the paper understated is the lock contention cost under hotspot workloads: if many transactions touch the same primary row (a counter row, a popular product, a celebrity user), prewrite contention serialises them. TiKV's later additions — async commit, parallel commit, 1PC for single-row writes — are largely about chiselling that overhead away.

TiKV's evolution: from Percolator to async commit

TiKV (the storage engine behind TiDB) shipped a Percolator-faithful implementation in 2017, then added two important optimisations. Async commit lets the client report success as soon as every prewrite has succeeded; the commit phase, primary included, runs asynchronously, and subsequent readers pay the resolution cost. 1PC detects single-region transactions and skips prewrite entirely, dropping the protocol to one Raft round. Both reduce the median commit latency from ~2 RTT to ~1 RTT for the common case. The trade-off is that async commit must be careful about the min_commit_ts recorded in each lock, so that a reader's snapshot cannot slip past an in-progress async commit.
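
Below is a minimal sketch of that timestamp rule, simplified from TiKV's published async-commit design. The names record_read, async_prewrite, and derive_commit_ts are illustrative, not TiKV's real API: each key tracks the highest snapshot that has read it, every prewrite records a min_commit_ts strictly above that watermark, and the final commit_ts is the maximum across keys.

# Async-commit timestamp rule, a simplified sketch; names are illustrative.
max_read_ts = {}                          # key -> highest start_ts that has read it

def record_read(key, start_ts):
    # Every snapshot read bumps the key's read watermark.
    max_read_ts[key] = max(max_read_ts.get(key, 0), start_ts)

def async_prewrite(key, start_ts):
    # The key's lock records min_commit_ts: strictly above every past reader.
    return max(start_ts, max_read_ts.get(key, 0)) + 1

def derive_commit_ts(min_commit_tss):
    # Decided client-side from the prewrite responses, with no second TSO call.
    return max(min_commit_tss)

# A reader at ts=30 has already seen key 'a'; any async commit touching 'a'
# must therefore land above 30, so that reader's snapshot stays consistent.
record_read('a', 30)
assert derive_commit_ts([async_prewrite('a', 20), async_prewrite('b', 20)]) == 31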

Spanner's TrueTime ancestry — Percolator is the older sibling

Percolator and Spanner are both Google internal — Percolator predates Spanner by about three years. Spanner replaces the TSO with TrueTime, a hardware-backed bounded-uncertainty clock that lets Spanner pick commit_ts without contacting a central oracle. The protocol is otherwise structurally similar: prewrite, commit-primary, commit-secondaries. Spanner adds external consistency (linearisable snapshots) at the cost of waiting out the TrueTime uncertainty window — typically 5–10 ms — at commit. Percolator has no such wait, but offers only snapshot isolation, not external consistency. See /wiki/truetime-spanner-and-physical-logical-hybrids.

KapitalKite's order-book on TiKV — production tuning that mattered

KapitalKite runs its order-book ledger on a TiKV cluster of 64 nodes across three AZs. The default Percolator commit path was producing p99 commit latency of 18 ms under peak (NSE pre-open at 9:08 IST). Three changes brought it to 7 ms p99: (1) enable async commit globally, dropping the secondary-wait from the critical path; (2) reduce lock TTL from the default 3 s to 800 ms because their transaction logic never holds a transaction open longer than 100 ms (so 800 ms is comfortably above p99.99 of legitimate execution time, and aggressive enough that a crashed client's locks clear quickly); (3) co-locate the PD timestamp leader with the primary AZ for the order-book, eliminating one cross-AZ RTT from start_ts and commit_ts acquisition. The third change saved ~3 ms by itself. None of these are deep — but each required understanding the protocol enough to know which knobs were safe to turn.
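
The arithmetic behind change (3) is worth spelling out. A transaction makes two TSO round-trips, one for start_ts and one for commit_ts, so moving the PD leader into the order-book's AZ converts two cross-AZ round-trips into two intra-AZ ones. The RTT values below are assumptions picked to be consistent with the hypothetical ~3 ms figure above, not measurements.

# Back-of-envelope for change (3). RTT figures are assumptions chosen to be
# consistent with the hypothetical ~3 ms saving quoted above, not measurements.
cross_az_rtt_ms = 1.6                 # assumed cross-AZ round-trip to the PD leader
intra_az_rtt_ms = 0.1                 # assumed intra-AZ round-trip
tso_calls_per_txn = 2                 # one for start_ts, one for commit_ts
saved_ms = tso_calls_per_txn * (cross_az_rtt_ms - intra_az_rtt_ms)
print(f"saved per transaction ~ {saved_ms:.1f} ms")   # ~ 3.0 ms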

Where this leads next

Percolator sits at a crossroads in this curriculum. It is the canonical example of transactions on top of a non-transactional store — the technique used by TiKV, Foundation Layer, and several internal systems at large scale. It is also the design that was generalised by Spanner into externally-consistent, multi-region transactions via TrueTime. And it is the protocol that production teams reach for when they want ACID without paying for a full SQL database.

The next chapters in Part 14 cover Calvin (deterministic transaction ordering), Spanner's TrueTime, and deterministic databases as an alternative to 2PC. Each is a different way to answer the same question Percolator answered in 2010: how do you commit across shards without a single coordinator on the critical path?

If you are operating Percolator-derived systems in production, the next operational chapters cover /wiki/2pc-in-detail-including-failure-modes, /wiki/sagas-forward-and-compensating, and the broader design space around long-lived transactions.

References

  • Daniel Peng and Frank Dabek. Large-scale Incremental Processing Using Distributed Transactions and Notifications. OSDI 2010.