State stores: why RocksDB is in every streaming engine

It is 21:40 on a Friday at PhonePe. Asha, the streaming platform on-call, is staring at a Flink TaskManager that has 184 GB of "user → 7-day-rolling-spend" state on its local SSD. The job has been running for six weeks without a checkpoint failure, and the per-key lookup latency on the hot path is 80 microseconds. She did not write that state store. She did not configure a database. She did not deploy Redis. The state store was already inside Flink — the same one that is inside Kafka Streams, the same one that backs Materialize's arrangements, the same one CockroachDB and TiKV use as their storage layer. It is RocksDB, and the reason it shows up under almost every stateful streaming engine is not historical accident — it is that an LSM tree gives a stream processor four properties at once that no other open-source store gives together.

A streaming state store needs to absorb millions of writes per second, support point and range lookups by key, snapshot atomically for checkpoints, and recover by replay — all on local SSD attached to the operator process. RocksDB's log-structured merge tree was designed for exactly that workload, and it is embeddable as a library, not a server. Every major streaming engine — Flink, Kafka Streams, Samza, Faust, Materialize — uses RocksDB or a near-clone for the same reason: write-heavy keyed state with cheap snapshots is the LSM's home turf.

What "state" means inside a stream processor

After the stateless prefix you saw in /wiki/stateless-operators-map-filter-the-easy-part, the job hits a keyBy and starts being stateful. State, in the streaming runtime sense, is anything the operator writes that has to survive a restart so the next event sees a consistent view. The four shapes that show up in real jobs:

  1. ValueState[K, V] — one value per key. "Last login time per user", "current ride status per driver". Read on every event for that key, written on most.
  2. MapState[K1, K2, V] — a nested map per key. "Per user, a dict of merchant → spend in last 7 days". Hot path is a single nested lookup.
  3. ListState[K, V] — an append-only list per key. "Per user, the last 50 events" — used by stream joins and pattern matching.
  4. AggregatingState[K, V] — a running fold per key. "Per user, count + sum + min + max in current window".

A working PhonePe fraud job has all four in one operator: ValueState for "user's current risk tier", MapState for "merchant → spend last 7 days", ListState for "last 100 transactions", AggregatingState for "spend in current 1-hour tumbling window". For 100 million users, that is roughly 100M × (8 + 200 + 8000 + 64) ≈ 800 GB of working state. Per operator instance — divided by parallelism, but the order of magnitude is the point.
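
The back-of-envelope arithmetic above is worth making executable. A quick sketch (the per-key byte counts are the ballpark figures from the text, not measurements, and the parallelism values are invented):

```python
# Back-of-envelope sizing for the fraud job's four state shapes.
# Byte counts per key are the article's ballpark figures, not measurements.
PER_KEY_BYTES = {
    "ValueState (risk tier)":       8,
    "MapState (merchant spend)":    200,
    "ListState (last 100 txns)":    8000,
    "AggregatingState (window)":    64,
}
USERS = 100_000_000

total = USERS * sum(PER_KEY_BYTES.values())
print(f"total working state : {total / 1e9:.0f} GB")      # ~827 GB
for p in (8, 32, 128):  # hypothetical operator parallelism
    print(f"parallelism {p:>3}     : {total / p / 1e9:.1f} GB per instance")
```

Even at a parallelism of 128, each operator instance still owns several gigabytes of keyed state, which is why the backend has to spill to local SSD rather than live on the heap.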

[Figure: The four state shapes inside a streaming operator. Four boxes side by side: ValueState (one value per key, e.g. u_91 → tier_A; point lookup, put/get/clear, ~80 bytes per cell), MapState (nested map per key, e.g. u_91 → swiggy: 1240; prefix scan plus point lookup, put/get/iter, ~2 KB per parent key), ListState (append-only list per key, e.g. u_91[0] → t_2401; range scan, add/iter/clear, ~80 KB per parent key), and AggregatingState (running fold per key, e.g. u_91 → sum=2491, cnt=14; read-modify-write, add/get, ~64 bytes per cell).]
Every stateful operator in production uses some combination of these four. The numbers are ballpark per-key memory; multiply by 100 million users and you see why "just put it in a HashMap" fails before the day shift.

The store has to satisfy six properties at once: high write throughput (every event mutates state), fast point reads (every event reads state first), prefix iteration (MapState, ListState), atomic snapshots (the runtime checkpoints periodically), reasonable space amplification (you can't 10× your storage bill), and embeddable (it has to live in-process, not over the network). Why "embeddable" is non-negotiable: a network round-trip to Redis is roughly 200 microseconds on a good day. At 50k events/sec/operator, that is 50000 × 200µs = 10 seconds of wall time per second of input — the operator falls behind immediately. Local-SSD RocksDB, by contrast, serves a point lookup in 5–80 microseconds depending on cache state. That order-of-magnitude gap between in-process and network lookups is what makes the difference between "Flink keeps up" and "Flink falls behind forever".
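
The budget arithmetic can be written out. A sketch using the latencies quoted above (200 µs over the network, 5 µs local p50; both are the article's round numbers, not benchmarks):

```python
# Why state must live in-process: at 50k events/sec each event gets a 20 µs
# budget, and a network round-trip alone blows it.
EVENTS_PER_SEC = 50_000
budget_us = 1_000_000 / EVENTS_PER_SEC
print(f"per-event budget at {EVENTS_PER_SEC:,}/s: {budget_us:.0f} µs")

for name, lookup_us in [("Redis over network", 200), ("local RocksDB p50", 5)]:
    demand = EVENTS_PER_SEC * lookup_us / 1_000_000  # seconds of lookup per second of input
    verdict = "keeps up" if demand < 1 else "falls behind"
    print(f"{name:20s}: {demand:5.2f} s of lookups per 1 s of input -> {verdict}")
```

Any value of `demand` above 1.0 means the operator spends more than one second doing lookups for every second of input, so lag grows without bound.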

Why an LSM tree, not a B-tree or a hash map

Every database course teaches B-trees first; every embedded key-value lookup naturally reaches for a hash map. Streaming workloads break both. Walk through why.

A B-tree (used by SQLite, InnoDB, BoltDB, LMDB) optimises for random-read latency at the cost of write amplification. An update touches a leaf page, a possible parent, and a WAL entry — typically 3–8 page writes for one logical update. At 50k events/sec/operator, that is 150k–400k page writes/sec on the device, which a single SSD can sustain only briefly before write throttling kicks in. A streaming job sees roughly one write per event at the operator level; that is fundamentally different from an OLTP workload that sees one write per transaction commit (and a transaction commit is rare relative to row touches).
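
A quick check of that device-level load, using the 3–8 pages-per-update range from the text and an assumed 4 KB page size:

```python
# Device-level write load for a B-tree backend at streaming event rates.
# 4 KB page size is an assumption; pages-per-update range is from the text.
EVENTS_PER_SEC = 50_000
PAGE_BYTES = 4096
for pages_per_update in (3, 8):
    page_writes = EVENTS_PER_SEC * pages_per_update
    mb_s = page_writes * PAGE_BYTES / 1e6
    print(f"{pages_per_update} pages/update -> {page_writes:,} page writes/s "
          f"(~{mb_s:.0f} MB/s of random 4 KB writes)")
```

Sustained random 4 KB writes in the hundreds of MB/s is exactly the regime where consumer and even datacenter SSDs start throttling.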

A hash map (used by toy implementations, Redis, in-memory state) gives O(1) point lookup but two things kill it for streaming: (a) it sits in RAM only, so 184 GB of state needs 184 GB of RAM per operator instance, which is economically dead at scale; (b) it does not have a snapshot operation that can be performed concurrently with writes — Redis's BGSAVE forks the process and copies pages on write, and Flink's checkpoint protocol cannot rely on that primitive. You'd have to stop-the-world the operator while you snapshot, which violates the runtime's correctness story.

A log-structured merge tree (LSM) does something different. Writes go to an in-memory sorted buffer (the memtable); when the memtable fills, it is flushed to disk as an immutable sorted file (an SSTable, also called an L0 file). Background threads later compact smaller files into larger ones, merging keys and discarding tombstones. Point lookups walk from the memtable through L0 down through Lk, stopping at the first file that contains the key. Range scans stream keys in sorted order across all levels.
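
The memtable/flush/compact cycle fits in a few dozen lines. A toy, single-level sketch (no WAL, no levels, no bloom filters, dict-based memtable; purely illustrative, not how RocksDB is implemented):

```python
# A toy LSM tree: mutable in-memory memtable, immutable sorted "SSTables"
# on flush, newest-first read path, and a naive full-merge compaction.
import bisect

TOMBSTONE = object()

class ToyLSM:
    def __init__(self, memtable_limit=4):
        self.memtable = {}           # mutable, holds the newest writes
        self.sstables = []           # immutable sorted (key, value) lists, newest last
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self._flush()

    def delete(self, key):
        self.put(key, TOMBSTONE)     # a delete is just a write of a tombstone

    def get(self, key):
        if key in self.memtable:                      # 1. memtable first
            v = self.memtable[key]
            return None if v is TOMBSTONE else v
        for table in reversed(self.sstables):         # 2. then files, newest first
            keys = [k for k, _ in table]
            i = bisect.bisect_left(keys, key)         # binary search in sorted file
            if i < len(table) and table[i][0] == key:
                v = table[i][1]
                return None if v is TOMBSTONE else v
        return None

    def _flush(self):
        self.sstables.append(sorted(self.memtable.items()))
        self.memtable = {}

    def compact(self):
        merged = {}
        for table in self.sstables:                   # oldest to newest: newer wins
            merged.update(dict(table))
        live = {k: v for k, v in merged.items() if v is not TOMBSTONE}
        self.sstables = [sorted(live.items())]

db = ToyLSM()
for i in range(10):
    db.put(f"user_{i % 5}", i)       # repeated keys end up in multiple files
print(db.get("user_0"))              # newest write for user_0 wins: 5
db.compact()
print(len(db.sstables), db.get("user_0"))  # one file after compaction, still 5
```

Every piece of the real thing is visible here in miniature: writes only ever touch RAM until a flush, flushed files are never modified in place, and compaction is the deferred cleanup that merges duplicates and drops tombstones.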

[Figure: RocksDB LSM tree anatomy — how a write becomes a durable file. At the top, the memtable (an in-RAM skiplist, ~64 MB) alongside the write-ahead log (sequential append, fsync); below, disk levels L0 (files may overlap in key range) through L3, each level roughly 10× larger than the one above. Writes hit RAM plus the WAL, flush to L0 when the memtable fills, and compaction moves data down the levels. The read path is memtable → L0 (newest first) → L1 → L2 → …, with bloom filters skipping files where the key cannot exist.]
Writes are sequential (memtable fill, then flush). Reads can touch multiple levels but bloom filters and block caches keep p99 in microseconds. Compaction is the background work that pays for the cheap writes.

The trade-off is well-known: LSM trees give cheap writes at the cost of more expensive reads (read amplification — you may have to look at multiple levels) and space amplification (the same logical key may be physically present in several files until compaction reclaims them). For a streaming workload — where every event is one write and one read of the same key — the LSM is the correct choice because the write cost dominates and the read cost is mitigated by a large block cache and a memtable that already holds the recently-written keys.

The numerical case: a B-tree on a write-heavy workload has write amplification of roughly 10–20× (every update rewrites a page). A well-tuned RocksDB has write amplification of 5–15× across all compaction levels combined, but the writes themselves are sequential — and SSDs are 50× faster at sequential writes than random writes. Why sequential matters that much: an SSD's flash translation layer (FTL) can absorb a sequential write stream into one erase block, marking it valid in one atomic step. Random writes scatter across many erase blocks, each of which has to be erased and rewritten — the dreaded "write cliff" where SSD performance drops by 10× under sustained random write load. RocksDB never asks the SSD for random writes; that is the entire point.

Embedding RocksDB — the operator-side API

The actual API is worth seeing. RocksDB ships as a C++ library with bindings for Java (used by Flink and Kafka Streams), Python (python-rocksdb), Go, and Rust. The Python flavour is enough to internalise what a streaming runtime is doing under the hood:

# state_store_demo.py — embed RocksDB inside an "operator" loop
import rocksdb, struct, time, os, shutil

# 1. Open the database. Streaming runtimes open one column family per state
#    name; the default column family is enough for this demo.
opts = rocksdb.Options(create_if_missing=True)
opts.write_buffer_size = 64 * 1024 * 1024     # 64 MB memtable
opts.max_write_buffer_number = 3              # up to 3 memtables before stalling
opts.target_file_size_base = 64 * 1024 * 1024 # L1 file size
opts.max_background_compactions = 4

DB_PATH = "/tmp/phonepe_user_spend"
if os.path.exists(DB_PATH): shutil.rmtree(DB_PATH)
db = rocksdb.DB(DB_PATH, opts)

# 2. Helper: encode (user_id, merchant) -> bytes key. Sorted layout matters for prefix scans.
def k(user_id: int, merchant: str = "") -> bytes:
    return struct.pack(">Q", user_id) + merchant.encode("utf-8")

# 3. Simulate a stream of UPI-like events flowing into the operator
events = [
    (91_000_001, "swiggy",     1240),
    (91_000_001, "flipkart",   8990),
    (91_000_001, "bookmyshow",  350),
    (91_000_002, "swiggy",      450),
    (91_000_002, "flipkart",   1200),
    (91_000_001, "swiggy",      560),  # second swiggy hit for u_001
    (91_000_003, "myntra",     2199),
]

# 4. The "operator": for each event, read the running spend, add, write back
t0 = time.time()
for user_id, merchant, amount in events:
    key   = k(user_id, merchant)
    prior = db.get(key)
    total = (struct.unpack(">Q", prior)[0] if prior else 0) + amount
    db.put(key, struct.pack(">Q", total))

# 5. Prefix scan: total spend across all merchants for user 91_000_001
prefix = struct.pack(">Q", 91_000_001)
it = db.iterkeys(); it.seek(prefix)
running = 0
for raw in it:
    if not raw.startswith(prefix): break
    val = db.get(raw)
    running += struct.unpack(">Q", val)[0]

# 6. Atomic snapshot — the runtime calls this for every checkpoint
snap = db.snapshot()
snapshot_view = db.get(k(91_000_001, "swiggy"), snapshot=snap)

print(f"per-event latency (avg): {(time.time()-t0)*1e6/len(events):.1f} µs")
print(f"u_91_000_001 swiggy    : {struct.unpack('>Q', db.get(k(91_000_001,'swiggy')))[0]} paise")
print(f"u_91_000_001 total     : {running} paise")
print(f"snapshotted swiggy     : {struct.unpack('>Q', snapshot_view)[0]} paise")

# Sample run on a 2024 MacBook Pro (M3, NVMe SSD):
$ python state_store_demo.py
per-event latency (avg): 41.2 µs
u_91_000_001 swiggy    : 1800 paise
u_91_000_001 total     : 11140 paise
snapshotted swiggy     : 1800 paise

Run the script and the per-event latency lands in the 30–80 microsecond range on a laptop. On a production server with NVMe and a tuned block cache, p50 is closer to 5 microseconds and p99 stays under 200 microseconds even with 200 GB of state.

Why every streaming engine landed on RocksDB

Look at what shipped:

  1. Flink — the RocksDB state backend is the standard choice once state outgrows the heap.
  2. Kafka Streams — RocksDB is the default store behind every KTable and windowed aggregation.
  3. Samza — shipped RocksDB-backed local state from early on.
  4. Faust — wraps python-rocksdb for its table state.
  5. Materialize — its arrangements sit on the same LSM-shaped storage, as noted above.

Convergence on one library is not coincidence. The four properties that streaming needs — write throughput, point + range reads, atomic snapshots, embeddability — are the four properties RocksDB was explicitly designed to deliver. The library is also under active development at Meta, where the original team built it; the production deployment that drives most optimisations (Meta's UDB, ZippyDB) has a workload shape extremely similar to streaming.

The two engines that did not pick RocksDB are instructive. Apache Heron at Twitter built its own state store; the team eventually rewrote it on top of LMDB-style storage but reported significant pain at scale. Spark Structured Streaming long used an in-memory state store by default and a custom file-based one for state larger than RAM; it was widely considered the weakest part of Spark's streaming story, and Spark 3.2 eventually added an optional RocksDB-backed state store provider. The pattern is consistent: rolling your own state store ends with rebuilding RocksDB.

What goes wrong — production failure modes

The state backend is a database, and a database has failure modes. Three real ones from Indian streaming deployments:

The first is compaction stalls. RocksDB stops writes outright past a hard L0 file-count limit (level0_stop_writes_trigger, default 36); past that, writes block until compaction catches up. A Razorpay job under load grew L0 to 80 files within 12 minutes of a deploy because the new code emitted 5× more state writes per event; the job backed up to 4 million events of lag while operator threads waited on compaction. The fix involves tuning level0_file_num_compaction_trigger, level0_slowdown_writes_trigger, and max_background_compactions, but the deeper diagnosis is "the workload's write rate exceeded the device's compaction throughput" — a property of the SSD, not the application. Choose your compaction triggers, choose your hardware, or choose to write less.
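
The dynamics of that incident are easy to simulate. A sketch with made-up flush and compaction rates, using RocksDB's default slowdown (20) and stop (36) L0 triggers:

```python
# Minute-by-minute L0 growth when the flush rate outruns compaction, in the
# spirit of the incident above. The per-minute rates are illustrative.
FLUSHES_PER_MIN = 10     # new L0 files per minute (memtable flushes)
COMPACT_PER_MIN = 4      # L0 files compaction can absorb per minute
SLOWDOWN, STOP = 20, 36  # RocksDB defaults: level0_slowdown / level0_stop triggers

l0 = 0
for minute in range(1, 13):
    l0 += FLUSHES_PER_MIN - COMPACT_PER_MIN
    state = "OK" if l0 < SLOWDOWN else ("throttled" if l0 < STOP else "WRITES STALLED")
    print(f"t+{minute:2d} min: {l0:3d} L0 files  [{state}]")
```

With a net growth of 6 files per minute the job is throttled inside 4 minutes and fully stalled inside 6; by minute 12 it sits at 72 L0 files, the same order as the 80 files in the incident.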

The second is state size grows without bound. The classic case: a ValueState[user_id, last_login_time] for a service that keeps every user forever. Over six months at 100k new users/day, the state hits 18M keys × 80 bytes ≈ 1.5 GB — manageable. Over six years, it is 220M keys × 80 bytes ≈ 18 GB, with no eviction policy. Flink's StateTtlConfig solves this by attaching a TTL to each entry; at compaction time, expired entries are dropped. The mistake teams make is forgetting to set the TTL until the SSD fills — at which point shrinking the state requires either a state migration or a job restart with a smaller retention.
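
What TTL-at-compaction looks like mechanically: expired entries are not deleted eagerly, they are dropped the next time compaction rewrites the file that holds them. A toy compaction-filter sketch over (value, write_time) cells; the keys and ages are invented:

```python
# TTL eviction the way a compaction filter does it: a pure function over the
# entries of a file being rewritten, dropping anything older than the TTL.
import time

TTL_SECONDS = 7 * 24 * 3600   # 7-day retention
now = time.time()

sst_entries = {                                     # key -> (value, write_timestamp)
    b"user_1": (b"tier_A", now - 2 * 24 * 3600),    # 2 days old: keep
    b"user_2": (b"tier_B", now - 30 * 24 * 3600),   # 30 days old: expired
    b"user_3": (b"tier_A", now - 8 * 24 * 3600),    # 8 days old: expired
}

def compaction_filter(entries, now):
    """Keep only entries younger than the TTL; runs when the file is rewritten."""
    return {k: v for k, v in entries.items() if now - v[1] < TTL_SECONDS}

survivors = compaction_filter(sst_entries, now)
print(sorted(survivors))    # only user_1 survives
```

The corollary is the operational gotcha: an expired entry keeps occupying disk until compaction happens to touch its file, so TTL caps logical state size, not instantaneous physical size.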

The third is checkpoint duration creep. If the SSD is shared with other tenants on the host, the checkpoint's S3 upload competes for IOPS with the operator's compaction reads. A Swiggy delivery-tracking job had checkpoints grow from 8 seconds to 90 seconds over three months as state grew from 12 GB to 70 GB and a noisy neighbour started sharing the SSD. The fix was a dedicated SSD per TaskManager and an incremental checkpoint mode (state.backend.incremental: true) — only changed SST files get uploaded, not the full state. Most production deployments turn incremental on; it is off by default for historical reasons.

Going deeper

LSM compaction strategies — level vs universal vs FIFO

RocksDB ships three compaction strategies and the choice affects everything. Level compaction (the default) keeps each level k roughly 10× the size of level k-1, with non-overlapping key ranges within each level past L0. Read amplification is bounded (one file per level) and write amplification is moderate (5–15×). Universal compaction keeps fewer, larger files; it trades higher read amplification for lower write amplification and is preferred for write-heavy log-style workloads. FIFO compaction simply deletes the oldest files when a size limit is hit; it is used for time-series workloads where the data has a natural retention.
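
The geometry of level compaction is just a power series. A sketch with an assumed 256 MiB base level and the default 10× fanout:

```python
# Level-compaction geometry: each level ~10x the previous, so even a
# multi-TB store has only a handful of levels for a read to probe.
L1_BYTES, FANOUT, LEVELS = 256 * 2**20, 10, 5   # illustrative defaults

total = 0
for lvl in range(1, LEVELS + 1):
    size = L1_BYTES * FANOUT ** (lvl - 1)
    total += size
    print(f"L{lvl}: {size / 2**30:8.2f} GiB")
print(f"total: {total / 2**40:.2f} TiB across {LEVELS} levels "
      f"-> a point miss probes at most {LEVELS} files (plus memtable and L0)")
```

The bottom level holds about 90% of the data, which is why read amplification stays bounded: past L0, key ranges within a level do not overlap, so each level costs at most one file probe.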

For streaming state, level compaction is the near-universal pick because point lookups dominate. Kafka Streams sometimes uses universal compaction for changelog-style state where range scans on a single user's history are the hot path. Why the choice matters operationally: a job that switches from level to universal can see a 30% drop in CPU usage (less compaction work) but a 40% increase in tail-read latency (more files to probe per lookup). Run the change in shadow first; never on a Friday afternoon.

Bloom filters — how RocksDB skips files that cannot contain your key

Each SST file ships with a bloom filter, a compact bitmap that answers "could this file contain key K?" with no false negatives and a tunable false-positive rate. RocksDB's default is 10 bits per key, giving roughly 1% false positives — meaning a point lookup that misses the key still has to probe ~1% of files that lie about containing it. The bloom filter sits in the block cache (or, in newer versions, can be partitioned and only the relevant block loaded), so the probe is a memory access, not a disk seek. Without bloom filters, a point miss in a 5-level LSM would require 5 disk seeks; with them, it is one seek on average. The 10-bits-per-key cost — roughly 1% of typical key-value sizes — is the price you pay for sub-millisecond p99 reads on a multi-TB store. There is also a prefix mode: pair a prefix_extractor with the whole_key_filtering toggle and RocksDB builds bloom filters over key prefixes instead of whole keys, which is useful for MapState-style access where the inner key varies but the user_id prefix is what determines file placement.
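
A minimal bloom filter with RocksDB's default shape (10 bits per key, about 7 hash probes) demonstrates both guarantees: no false negatives, and a roughly 1% false-positive rate. This is a pure-Python sketch, not RocksDB's implementation:

```python
# A toy bloom filter: m bits, k probe positions per key via double hashing.
import hashlib, math

class Bloom:
    def __init__(self, n_keys, bits_per_key=10):
        self.m = n_keys * bits_per_key
        self.k = max(1, round(bits_per_key * math.log(2)))  # ~7 probes at 10 bits/key
        self.bits = bytearray(-(-self.m // 8))              # ceil(m/8) bytes

    def _probes(self, key: bytes):
        h = int.from_bytes(hashlib.sha256(key).digest()[:16], "big")
        for i in range(self.k):
            yield (h + i * (h >> 64)) % self.m              # double hashing

    def add(self, key: bytes):
        for p in self._probes(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key: bytes) -> bool:
        return all(self.bits[p // 8] >> (p % 8) & 1 for p in self._probes(key))

bf = Bloom(n_keys=10_000)
for i in range(10_000):
    bf.add(f"user_{i}".encode())

# Every inserted key is found (no false negatives) ...
assert all(bf.might_contain(f"user_{i}".encode()) for i in range(10_000))
# ... and misses are wrongly "found" only ~1% of the time.
false_pos = sum(bf.might_contain(f"miss_{i}".encode()) for i in range(10_000))
print(f"false-positive rate: {false_pos / 10_000:.2%}")
```

The `might_contain` answer is exactly what the read path needs: a "no" lets RocksDB skip the file without touching disk, and a "yes" merely costs one possibly-wasted probe.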

Incremental checkpoints — why production turns this on

A Flink incremental checkpoint uploads only the SST files that changed since the last checkpoint, not the full state. Because SST files are immutable, "changed" means "newly written or newly compacted" — and the runtime tracks both. For a 200 GB state with a 60-second checkpoint cadence, a full checkpoint moves 200 GB to S3 every minute (≈ 27 Gbps of bandwidth, which the cluster does not have). An incremental checkpoint typically moves 1–5 GB per cadence — the freshly-flushed memtable's L0 files plus any compaction outputs. The trade-off is restore complexity: to recover, you have to download the full chain of SST files (a snapshot is a manifest of file IDs across many checkpoints). Flink handles this transparently; what changes is that "delete an old checkpoint" is no longer a full file delete — it requires reference counting. Most teams turn incremental on, set a reasonable retention (state.checkpoints.num-retained: 3), and never look at the directory structure again.
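
The bookkeeping reduces to set difference plus reference counting over immutable file IDs. A toy model (the names are invented; Flink's actual shared-state registry is more involved):

```python
# Incremental checkpoints as bookkeeping over immutable SST files: each
# checkpoint is a manifest of file IDs, only files absent from earlier
# manifests are uploaded, and remote deletion is reference-counted.
from collections import Counter

class CheckpointStore:
    def __init__(self):
        self.refcount = Counter()   # sst file id -> number of manifests using it
        self.checkpoints = {}       # checkpoint id -> manifest (set of file ids)
        self.uploaded = set()       # file ids currently in remote storage

    def checkpoint(self, cp_id, live_files):
        to_upload = set(live_files) - self.uploaded   # only new/compacted files move
        self.uploaded |= to_upload
        self.checkpoints[cp_id] = set(live_files)
        self.refcount.update(live_files)
        return to_upload

    def retire(self, cp_id):
        for f in self.checkpoints.pop(cp_id):
            self.refcount[f] -= 1
            if self.refcount[f] == 0:      # no surviving manifest references it
                self.uploaded.discard(f)   # only now is remote deletion safe

store = CheckpointStore()
print(store.checkpoint(1, ["sst_1", "sst_2", "sst_3"]))  # full upload: 3 files
print(store.checkpoint(2, ["sst_1", "sst_2", "sst_4"]))  # incremental: only sst_4
store.retire(1)                   # sst_3 is now unreferenced and deletable
print(sorted(store.uploaded))     # sst_1, sst_2, sst_4 remain
```

This is why "delete an old checkpoint" is not a plain directory delete: sst_1 and sst_2 survive the retirement of checkpoint 1 because checkpoint 2 still references them.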

Why TiKV, CockroachDB, and YugabyteDB also use RocksDB

The same workload shape — high write rate per node, point + range queries, atomic snapshot for replication — appears in distributed transactional databases. CockroachDB used RocksDB until 2020, then forked it as Pebble (a Go-native rewrite that gave them control over the codebase). TiKV uses RocksDB unchanged. YugabyteDB uses a fork called DocDB that adds multi-version concurrency control on top. The convergence is not "everyone copies Meta"; it is "the workload shape forces the choice". Streaming state stores and distributed-database storage layers are the same problem at different abstraction levels — both need the LSM's write throughput and snapshot semantics, both run embedded in-process, both serve point + range queries.

When RocksDB is the wrong choice

Three workloads where it is not. (a) Tiny state — sub-100 MB, kept in JVM heap. The HashMap state backend is faster because it skips serialization. Flink's HashMapStateBackend is the right call here. (b) Read-heavy with rare writes — a B-tree wins (PostgreSQL, MySQL InnoDB). Streaming jobs essentially never have this shape, but data-mart-style stores do. (c) Append-only with no random access — a plain log file beats both. This is what Kafka itself uses for the partition log. The state store sits downstream of the log; the log itself does not need an LSM.

Where this leads next

The next chapter covers the three window shapes that drive most stateful streaming work — /wiki/windowing-tumbling-sliding-session — and shows how each one accumulates state in the RocksDB backend you just internalised. After that, /wiki/event-time-vs-processing-time-the-whole-ballgame introduces the two clocks every streaming job has to reason about, which is where late events and watermarks enter the picture. /wiki/watermarks-how-we-know-were-done closes the loop on "when can we emit a result?".

The mental model to take into those chapters: state lives on local SSD, indexed by an LSM tree, snapshotted by pinning immutable files, and recovered by replay from the input log. Every piece of streaming you read from here on assumes that substrate. Once a job hits 50 GB of state, the operational question stops being "is the algorithm correct?" and becomes "is the state backend keeping up with compaction?" — and the answer to that question lives in RocksDB metrics, not in the job's business logic.

The capstone of Build 8 — /wiki/wall-exactly-once-is-a-lie-everyone-tells — explains how the state store's snapshot interacts with the input log's offset commit to produce the exactly-once semantics every streaming engine markets.
