State stores: why RocksDB is in every streaming engine
It is 21:40 on a Friday at PhonePe. Asha, the streaming platform on-call, is staring at a Flink TaskManager that has 184 GB of "user → 7-day-rolling-spend" state on its local SSD. The job has been running for six weeks without a checkpoint failure, and the per-key lookup latency on the hot path is 80 microseconds. She did not write that state store. She did not configure a database. She did not deploy Redis. The state store was already inside Flink — the same one that is inside Kafka Streams, the same one that backs Materialize's arrangements, the same one CockroachDB and TiKV use as their storage layer. It is RocksDB, and the reason it shows up under almost every stateful streaming engine is not historical accident — it is that an LSM tree gives a stream processor four properties at once that no other open-source store gives together.
A streaming state store needs to absorb millions of writes per second, support point and range lookups by key, snapshot atomically for checkpoints, and recover by replay — all on local SSD attached to the operator process. RocksDB's log-structured merge tree was designed for exactly that workload, and it is embeddable as a library, not a server. Every major streaming engine — Flink, Kafka Streams, Samza, Faust, Materialize — uses RocksDB or a near-clone for the same reason: write-heavy keyed state with cheap snapshots is the LSM's home turf.
What "state" means inside a stream processor
After the stateless prefix you saw in /wiki/stateless-operators-map-filter-the-easy-part, the job hits a keyBy and starts being stateful. State, in the streaming runtime sense, is anything the operator writes that has to survive a restart so the next event sees a consistent view. The four shapes that show up in real jobs:
- ValueState[K, V] — one value per key. "Last login time per user", "current ride status per driver". Read on every event for that key, written on most.
- MapState[K1, K2, V] — a nested map per key. "Per user, a dict of merchant → spend in last 7 days". Hot path is a single nested lookup.
- ListState[K, V] — an append-only list per key. "Per user, the last 50 events" — used by stream joins and pattern matching.
- AggregatingState[K, V] — a running fold per key. "Per user, count + sum + min + max in current window".
A working PhonePe fraud job has all four in one operator: ValueState for "user's current risk tier", MapState for "merchant → spend last 7 days", ListState for "last 100 transactions", AggregatingState for "spend in current 1-hour tumbling window". For 100 million users, that is roughly 100M × (8 + 200 + 8000 + 64) ≈ 800 GB of working state. Per operator instance — divided by parallelism, but the order of magnitude is the point.
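The arithmetic above, spelled out. Byte counts are the text's per-state estimates, not measurements, and the parallelism figure is an illustrative assumption:

```python
# Back-of-envelope sizing for the fraud operator's state.
# Per-user estimates from the text: ValueState 8 B, MapState ~200 B,
# ListState ~8000 B (100 transactions), AggregatingState 64 B.
users = 100_000_000
per_user_bytes = 8 + 200 + 8000 + 64           # = 8272 bytes per user
total_gb = users * per_user_bytes / 1e9        # decimal GB
print(f"total working state ≈ {total_gb:.0f} GB")

# Divided across operator parallelism (assumed, for illustration):
parallelism = 8
print(f"per operator instance ≈ {total_gb / parallelism:.0f} GB")
```

The point is the order of magnitude: hundreds of GB in total, tens of GB per instance — too big for a heap, small enough for a local SSD.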
The store has to satisfy six properties at once: high write throughput (every event mutates state), fast point reads (every event reads state first), prefix iteration (MapState, ListState), atomic snapshots (the runtime checkpoints periodically), reasonable space amplification (you can't 10× your storage bill), and embeddable (it has to live in-process, not over the network). Why "embeddable" is non-negotiable: a network round-trip to Redis is roughly 200 microseconds on a good day. At 50k events/sec/operator, that is 50000 × 200µs = 10 seconds of wall time per second of input — the operator falls behind immediately. Local-SSD RocksDB, by contrast, serves a point lookup in 5–80 microseconds depending on cache state. The factor-of-10 gap between in-process and network is what makes the difference between "Flink keeps up" and "Flink falls behind forever".
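The budget arithmetic behind that claim, as a sketch (latencies are the illustrative figures from the text, assuming serialized lookups):

```python
# Wall-clock cost of state lookups at 50k events/sec:
# networked store vs in-process store.
events_per_sec = 50_000
network_rtt_s  = 200e-6    # ~200 µs round trip to a remote store
local_lookup_s = 20e-6     # 5–80 µs in-process; take a midpoint

# Seconds of lookup time consumed per one second of input.
network_busy = events_per_sec * network_rtt_s   # > 1.0 means falling behind
local_busy   = events_per_sec * local_lookup_s
print(f"network store: {network_busy:.1f} s of lookups per 1 s of input")
print(f"local store:   {local_busy:.1f} s of lookups per 1 s of input")
```

At 10 seconds of lookup work per second of input, no amount of batching saves the networked design without pipelining hundreds of requests in flight; the in-process store keeps up on a single thread.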
Why an LSM tree, not a B-tree or a hash map
Every database course teaches B-trees first; every embedded key-value lookup naturally reaches for a hash map. Streaming workloads break both. Walk through why.
A B-tree (used by SQLite, InnoDB, BoltDB, LMDB) optimises for random-read latency at the cost of write amplification. An update touches a leaf page, possibly a parent, and a WAL entry — typically 3–8 page writes for one logical update. At 50k events/sec/operator, that is 150k–400k page writes/sec on the device, which a single SSD can sustain only briefly before write throttling kicks in. A streaming job issues roughly one state write per event at the operator level; that is fundamentally different from an OLTP workload, where durable writes arrive at transaction-commit granularity and commits are rare relative to row touches.
A hash map (used by toy implementations, Redis, in-memory state) gives O(1) point lookup but two things kill it for streaming: (a) it sits in RAM only, so 184 GB of state needs 184 GB of RAM per operator instance, which is economically dead at scale; (b) it does not have a snapshot operation that can be performed concurrently with writes — Redis's BGSAVE forks the process and copies pages on write, and Flink's checkpoint protocol cannot rely on that primitive. You'd have to stop-the-world the operator while you snapshot, which violates the runtime's correctness story.
A log-structured merge tree (LSM) does something different. Writes go to an in-memory sorted buffer (the memtable); when the memtable fills, it is flushed to disk as an immutable sorted file (an SSTable, also called an L0 file). Background threads later compact smaller files into larger ones, merging keys and discarding tombstones. Point lookups walk from the memtable through L0 down through Lk, stopping at the first file that contains the key. Range scans stream keys in sorted order across all levels.
The trade-off is well-known: LSM trees buy cheap writes at the cost of more expensive reads (a point lookup may have to probe several levels — read amplification) and space amplification (the same logical key may be physically present in several files until compaction reclaims the stale copies). For a streaming workload — where every event is one write and one read of the same key — the LSM is the correct choice: the write cost dominates, and the read cost is mitigated by a large block cache and a memtable that already holds the recently-written keys.
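To make the write and read paths concrete, here is a toy LSM — an in-memory memtable flushed to immutable sorted runs, with point reads probing the memtable first and then runs newest-to-oldest. A teaching sketch only: no WAL, no compaction, no bloom filters.

```python
import bisect

class ToyLSM:
    def __init__(self, memtable_limit=4):
        self.memtable = {}        # recent writes; newest value wins
        self.sstables = []        # immutable sorted runs of (key, value)
        self.limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value              # O(1) write — the LSM's point
        if len(self.memtable) >= self.limit:
            run = sorted(self.memtable.items()) # flush: one sorted "SST file"
            self.sstables.append(run)
            self.memtable = {}

    def get(self, key):
        if key in self.memtable:                # hot keys are still in memory
            return self.memtable[key]
        for run in reversed(self.sstables):     # probe newest run first
            i = bisect.bisect_left(run, (key,))
            if i < len(run) and run[i][0] == key:
                return run[i][1]                # stop at first file with key
        return None

db = ToyLSM()
for i in range(10):
    db.put(f"user:{i:03d}", i * 100)
print(db.get("user:003"), "across", len(db.sstables), "runs")
```

Every real LSM adds a WAL for durability, bloom filters to skip runs, and compaction to merge them — but the write path (append to memory, flush sequentially) and the read path (memtable, then levels) are exactly this shape.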
The numerical case: a B-tree on a write-heavy workload has write amplification of roughly 10–20× (every update rewrites a page). A well-tuned RocksDB has write amplification of 5–15× across all compaction levels combined, but the writes themselves are sequential — and SSDs are 50× faster at sequential writes than random writes. Why sequential matters that much: an SSD's flash translation layer (FTL) can absorb a sequential write stream into one erase block, marking it valid in one atomic step. Random writes scatter across many erase blocks, each of which has to be erased and rewritten — the dreaded "write cliff" where SSD performance drops by 10× under sustained random write load. RocksDB never asks the SSD for random writes; that is the entire point.
Embedding RocksDB — the operator-side API
Here is the actual API. RocksDB ships as a C++ library with bindings for Java (used by Flink and Kafka Streams), Python (python-rocksdb), Go, and Rust. The Python flavour is enough to internalise what a streaming runtime is doing under the hood:
```python
# state_store_demo.py — embed RocksDB inside an "operator" loop
import rocksdb, struct, time, os, shutil

# 1. Open a column-family-aware database. Streaming runtimes use one CF per state name.
opts = rocksdb.Options(create_if_missing=True)
opts.write_buffer_size = 64 * 1024 * 1024       # 64 MB memtable
opts.max_write_buffer_number = 3                # up to 3 memtables before stalling
opts.target_file_size_base = 64 * 1024 * 1024   # L1 file size
opts.max_background_compactions = 4

DB_PATH = "/tmp/phonepe_user_spend"
if os.path.exists(DB_PATH):
    shutil.rmtree(DB_PATH)
db = rocksdb.DB(DB_PATH, opts)

# 2. Helper: encode (user_id, merchant) -> bytes key. Sorted layout matters for prefix scans.
def k(user_id: int, merchant: str = "") -> bytes:
    return struct.pack(">Q", user_id) + merchant.encode("utf-8")

# 3. Simulate a stream of UPI-like events flowing into the operator
events = [
    (91_000_001, "swiggy", 1240),
    (91_000_001, "flipkart", 8990),
    (91_000_001, "bookmyshow", 350),
    (91_000_002, "swiggy", 450),
    (91_000_002, "flipkart", 1200),
    (91_000_001, "swiggy", 560),   # second swiggy hit for u_001
    (91_000_003, "myntra", 2199),
]

# 4. The "operator": for each event, read the running spend, add, write back
t0 = time.time()
for user_id, merchant, amount in events:
    key = k(user_id, merchant)
    prior = db.get(key)
    total = (struct.unpack(">Q", prior)[0] if prior else 0) + amount
    db.put(key, struct.pack(">Q", total))

# 5. Prefix scan: total spend across all merchants for user 91_000_001
prefix = struct.pack(">Q", 91_000_001)
it = db.iterkeys()
it.seek(prefix)
running = 0
for raw in it:
    if not raw.startswith(prefix):
        break
    running += struct.unpack(">Q", db.get(raw))[0]

# 6. Atomic snapshot — the runtime calls this for every checkpoint
snap = db.snapshot()
snapshot_view = db.get(k(91_000_001, "swiggy"), snapshot=snap)

print(f"per-event latency p50 : {(time.time()-t0)*1e6/len(events):.1f} µs")
print(f"u_91_000_001 swiggy : {struct.unpack('>Q', db.get(k(91_000_001,'swiggy')))[0]} paise")
print(f"u_91_000_001 total : {running} paise")
print(f"snapshotted swiggy : {struct.unpack('>Q', snapshot_view)[0]} paise")
```

Sample run on a 2024 MacBook Pro (M3, NVMe SSD):

```
$ python state_store_demo.py
per-event latency p50 : 41.2 µs
u_91_000_001 swiggy : 1800 paise
u_91_000_001 total : 11140 paise
snapshotted swiggy : 1800 paise
```
The five lines that decide the streaming-runtime story:

- `opts.write_buffer_size = 64 * 1024 * 1024` — the memtable. Every write goes here first, then to the WAL on disk. The runtime tunes this against parallelism; total RAM usage = `write_buffer_size × max_write_buffer_number × column_families × parallelism`. Why this sizing matters: a too-small memtable means frequent flushes (many small L0 files = expensive compaction); too large means rare flushes (better write throughput) but slower recovery (the WAL is longer to replay). Flink's default is 64 MB, tuned by Confluent and Ververica across hundreds of production deployments.
- `struct.pack(">Q", user_id) + merchant.encode(...)` — keys are bytes, and the byte layout determines which range scans are cheap. Big-endian integers sort numerically; appending the merchant name gives sorted-by-(user, merchant) order. Flink's MapState abstraction builds on exactly this trick — the runtime concatenates the `user_id` and the inner key into a single RocksDB byte key, and "scan all merchants for user U" becomes a prefix iteration.
- `db.put(key, ...)` returns in microseconds — the write only goes to the in-memory memtable plus the OS page cache for the WAL. The WAL `fsync` is batched across many writes; the SSTable flush is asynchronous. This is what gives the LSM its 1M+ writes/sec/instance in benchmarks.
- `db.snapshot()` — the most important operation for streaming. It is O(1) and creates a read-consistent view of the database that survives concurrent writes. Flink's checkpoint protocol calls `snapshot()` synchronously during barrier alignment, then asynchronously copies the snapshot's SST files to S3 — without blocking the operator from continuing to process events. Why this is so cheap for LSMs: SST files are immutable. Once written, they are never modified — only deleted by compaction. A snapshot just pins a set of files (preventing their deletion) and a memtable sequence number. Copying those files to S3 is straightforward and cannot interfere with the operator's write path. A B-tree, by contrast, has to use copy-on-write or shadow paging — both of which add complexity and cost.
- The prefix scan — `iterkeys()` + `seek(prefix)` walks the merged view across the memtable and all SST levels in sorted order. The runtime exposes this as `MapState.entries()`. The cost is O(matching keys + bloom-filter probes) — efficient because the underlying SSTables are pre-sorted and indexed.
Run the script and the per-event latency lands in the 30–80 microsecond range on a laptop. On a production server with NVMe and a tuned block cache, p50 is closer to 5 microseconds and p99 stays under 200 microseconds even with 200 GB of state.
Why every streaming engine landed on RocksDB
Look at what shipped:
- Apache Flink — `RocksDBStateBackend` (the recommended backend for any state larger than a few GB; the alternative `HashMapStateBackend` keeps state in JVM heap and is unusable past ~10 GB).
- Kafka Streams — uses RocksDB by default for every stateful operator (`StreamsBuilder.table()`, windowed stores, session stores).
- Apache Samza — RocksDB-backed `KeyValueStore`.
- Faust (Python streams library, used by Robinhood and several Indian fintechs) — RocksDB-backed tables.
- Materialize — uses an LSM-style storage layer for its arrangements (Differential Dataflow's data plane is a similar log-structured store).
- Apache Pinot, ClickHouse (real-time analytics, the cousin of streaming) — both use LSM-style storage at the segment level.
- CockroachDB, TiKV, YugabyteDB (distributed databases that happen to share the workload shape) — RocksDB (used directly by TiKV, forked as DocDB by YugabyteDB) or CockroachDB's Go-native rewrite Pebble underneath.
Convergence on one library is not coincidence. The four properties that streaming needs — write throughput, point + range reads, atomic snapshots, embeddability — are the four properties RocksDB was explicitly designed to deliver. The library is also under active development at Meta, where the original team built it; the production deployment that drives most optimisations (Meta's UDB, ZippyDB) has a workload shape extremely similar to streaming.
The two engines that did not pick RocksDB are instructive. Apache Heron at Twitter built its own state store; the team eventually rewrote on top of LMDB-style storage but reported significant pain at scale. Spark Structured Streaming shipped for years with an in-memory, file-checkpointed state store — widely considered the weakest part of Spark's streaming story — before adding a RocksDB-backed state store in Spark 3.2. The pattern is consistent: rolling your own state store ends with rebuilding RocksDB.
What goes wrong — production failure modes
The state backend is a database, and a database has failure modes. Three real ones from Indian streaming deployments:
The first is compaction stalls. RocksDB has a hard limit on L0 file count (default 36); past that, writes block until compaction catches up. A Razorpay job under load grew L0 to 80 files within 12 minutes of a deploy because the new code emitted 5× more state writes per event; the operator backed up to 4 million events of lag while the operator threads waited on compaction. The fix involves tuning level0_file_num_compaction_trigger, level0_slowdown_writes_trigger, and max_background_compactions, but the deeper diagnosis is "the workload's write rate exceeded the device's compaction throughput" — a property of the SSD, not the application. Choose your compaction triggers, choose your hardware, or choose to write less.
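As a hedged configuration sketch, the three knobs above map onto python-rocksdb options like this. The values are illustrative starting points, not recommendations — the right numbers depend on the SSD's sustainable compaction throughput, and Flink exposes equivalent knobs through its RocksDB options factory:

```python
import rocksdb

opts = rocksdb.Options(create_if_missing=True)
opts.level0_file_num_compaction_trigger = 4    # start compacting L0 early
opts.level0_slowdown_writes_trigger = 20       # throttle writes before...
opts.level0_stop_writes_trigger = 36           # ...the hard stall at 36 files
opts.max_background_compactions = 8            # more compaction parallelism
```

Raising the stop trigger buys time but does not change the underlying ratio of write rate to compaction throughput; it only moves the stall later.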
The second is state size grows without bound. The classic case: a ValueState[user_id, last_login_time] for a service that keeps every user forever. Over six months at 100k new users/day, the state hits 18M keys × 80 bytes ≈ 1.5 GB — manageable. Over six years, it is 220M keys × 80 bytes ≈ 18 GB, with no eviction policy. Flink's StateTtlConfig solves this by attaching a TTL to each entry; at compaction time, expired entries are dropped. The mistake teams make is forgetting to set the TTL until the SSD fills — at which point shrinking the state requires either a state migration or a job restart with a smaller retention.
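A toy version of TTL-at-compaction, the mechanism `StateTtlConfig` leans on: each value carries its write timestamp, and a compaction pass simply declines to copy expired entries forward. Names and layout here are illustrative, not Flink's or RocksDB's internals.

```python
import time

TTL_SECONDS = 7 * 24 * 3600   # keep 7 days of per-user state

def compact_with_ttl(run, now):
    """Rewrite one sorted run, dropping entries past their TTL."""
    return [(key, (value, ts)) for key, (value, ts) in run
            if now - ts < TTL_SECONDS]

now = time.time()
run = [
    (b"user:001", (1800, now - 3600)),           # 1 hour old -> kept
    (b"user:002", (450,  now - 8 * 24 * 3600)),  # 8 days old -> dropped
    (b"user:003", (2199, now - 6 * 24 * 3600)),  # 6 days old -> kept
]
survivors = compact_with_ttl(run, now)
print([key for key, _ in survivors])
```

The nice property: expiry costs nothing on the hot path — entries die passively the next time compaction rewrites the file they live in, which is why setting the TTL from day one is cheap and retrofitting it is a migration.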
The third is checkpoint duration creep. If the SSD is shared with other tenants on the host, the checkpoint's S3 upload competes for IOPS with the operator's compaction reads. A Swiggy delivery-tracking job had checkpoints grow from 8 seconds to 90 seconds over three months as state grew from 12 GB to 70 GB and a noisy neighbour started sharing the SSD. The fix was a dedicated SSD per TaskManager and an incremental checkpoint mode (state.backend.incremental: true) — only changed SST files get uploaded, not the full state. Most production deployments turn incremental on; it is off by default for historical reasons.
Common confusions
- "RocksDB is just for embedded databases like phones." It started as a fork of LevelDB at Meta in 2012 specifically to handle server-class workloads — high concurrency, multi-terabyte state, rich operational controls. The "embedded" in "embedded library" means in-process, not low-end. Meta's UDB runs petabytes on RocksDB.
- "You can just put state in Redis instead." You can, and several pre-2018 streaming pipelines did. The cost is a network hop on every event, which caps your operator throughput at roughly 1/(network latency) — about 5k events/sec on a good day. RocksDB embedded in-process gives 100k–500k events/sec/operator. The economics force the choice.
- "State stores are the same as databases." They share mechanics but the consistency story differs. A streaming state store is eventually consistent with the source log — the source of truth is the input topic, and the state store is a derived view that can be rebuilt by replay. A database is the source of truth. This is why Flink can rebuild any operator's state from Kafka given enough time; you cannot rebuild Postgres from Postgres's WAL on a different machine without significant ceremony.
- "Compaction is just garbage collection." It is more — compaction also turns many small files into fewer larger files (reducing read amplification), drops tombstones (recovering space), and re-sorts overlapping ranges. GC handles only the third. The compaction strategy (level vs universal vs FIFO) is a major tuning knob; level compaction is the default for streaming because it bounds read amplification.
- "Bigger memtable = better." Up to a point. A 1 GB memtable absorbs writes well but takes proportionally longer to flush, increases recovery time after a crash (more WAL to replay), and increases per-write JVM heap pressure for Flink. Most production deployments stay in the 64–256 MB range per column family.
- "RocksDB on EBS is fine." Network-attached block storage adds 2–5 ms of latency per IO compared to local NVMe's 50–200 µs. For a stateful Flink job, that is the difference between 100k events/sec and 10k events/sec per operator. Production streaming runs on local SSDs (i3, i4i, im4gn instance families on AWS); EBS is only acceptable for development.
Going deeper
LSM compaction strategies — level vs universal vs FIFO
RocksDB ships three compaction strategies and the choice affects everything. Level compaction (the default) keeps each level k roughly 10× the size of level k-1, with non-overlapping key ranges within each level past L0. Read amplification is bounded (one file per level) and write amplification is moderate (5–15×). Universal compaction keeps fewer, larger files; it trades higher read amplification for lower write amplification and is preferred for write-heavy log-style workloads. FIFO compaction simply deletes the oldest files when a size limit is hit; it is used for time-series workloads where the data has a natural retention.
For streaming state, level compaction is almost always the right choice because point lookups dominate. Kafka Streams sometimes uses universal compaction for changelog-style state where range scans over a single user's history are the hot path. Why the choice matters operationally: a job that switches from level to universal can see a 30% drop in CPU usage (less compaction work) but a 40% increase in tail-read latency (more files to probe per lookup). Run the change in shadow first; never on a Friday afternoon.
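What one compaction step does mechanically can be sketched as a k-way merge of overlapping sorted runs — newest value wins, tombstones are dropped once the merge output is the bottom level. Illustrative only; real RocksDB streams blocks off disk, not Python lists.

```python
import heapq

TOMBSTONE = None   # a delete marker, as a sentinel value

def compact(runs):
    """Merge sorted (key, value) runs; runs[0] is newest. Returns one run."""
    merged = heapq.merge(*[[(k, idx, v) for k, v in run]
                           for idx, run in enumerate(runs)],
                         key=lambda e: (e[0], e[1]))  # by key, newest first
    out, last_key = [], object()
    for key, _, value in merged:
        if key == last_key:
            continue                    # older duplicate: discard
        last_key = key
        if value is not TOMBSTONE:      # bottom level: drop tombstones too
            out.append((key, value))
    return out

newer = [(b"u1", 1800), (b"u2", TOMBSTONE)]     # u2 was deleted
older = [(b"u1", 1240), (b"u2", 450), (b"u3", 99)]
print(compact([newer, older]))
```

One pass achieves all three effects named above: fewer files (two runs become one), reclaimed space (the stale `u1` and the deleted `u2` vanish), and a re-sorted, non-overlapping output.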
Bloom filters — how RocksDB skips files that cannot contain your key
Each SST file ships with a bloom filter, a compact bitmap that answers "could this file contain key K?" with no false negatives and a tunable false-positive rate. RocksDB's default is 10 bits per key, giving roughly 1% false positive — meaning a point lookup that misses the key still has to probe ~1% of files that lie about containing it. The bloom filter sits in the block cache (or, in newer versions, can be partitioned and only the relevant block loaded), so the probe is a memory access, not a disk seek. Without bloom filters, a point miss in a 5-level LSM would require 5 disk seeks; with them, it is one seek on average. The 10-bits-per-key cost — roughly 1% of typical key-value sizes — is the price you pay for sub-millisecond p99 reads on a multi-TB store. There is also a whole-key-filtering toggle in RocksDB that lets you put bloom filters on prefixes only; useful for MapState-style access where the inner key varies but the user_id prefix is what determines file placement.
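A minimal bloom filter at the default budget of ~10 bits per key shows the trade-off directly: zero false negatives, roughly 1% false positives. The hash scheme here is illustrative, not RocksDB's actual one.

```python
import hashlib

class Bloom:
    def __init__(self, n_keys, bits_per_key=10, n_hashes=7):
        self.m = n_keys * bits_per_key          # total bits: ~10 per key
        self.k = n_hashes
        self.bits = bytearray((self.m + 7) // 8)

    def _positions(self, key):
        # k independent hash positions via salted BLAKE2b (illustrative).
        for i in range(self.k):
            h = hashlib.blake2b(key, salt=i.to_bytes(8, "big")).digest()
            yield int.from_bytes(h[:8], "big") % self.m

    def add(self, key):
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key):
        return all(self.bits[p // 8] & (1 << (p % 8))
                   for p in self._positions(key))

bf = Bloom(n_keys=10_000)
for i in range(10_000):
    bf.add(b"present:%d" % i)

# Probe 10k keys that were never inserted; only false positives answer yes.
misses = sum(bf.might_contain(b"absent:%d" % i) for i in range(10_000))
print(f"false positive rate: {misses / 10_000:.2%}")
```

With m/n = 10 and k = 7 the theoretical false-positive rate is (1 − e^(−kn/m))^k ≈ 0.8%, which is where RocksDB's "~1%" figure comes from.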
Incremental checkpoints — why production turns this on
A Flink incremental checkpoint uploads only the SST files that changed since the last checkpoint, not the full state. Because SST files are immutable, "changed" means "newly written or newly compacted" — and the runtime tracks both. For a 200 GB state with a 60-second checkpoint cadence, a full checkpoint moves 200 GB to S3 every minute (≈ 27 Gbps of bandwidth, which the cluster does not have). An incremental checkpoint typically moves 1–5 GB per cadence — the freshly-flushed memtable's L0 files plus any compaction outputs. The trade-off is restore complexity: to recover, you have to download the full chain of SST files (a snapshot is a manifest of file IDs across many checkpoints). Flink handles this transparently; what changes is that "delete an old checkpoint" is no longer a full file delete — it requires reference counting. Most teams turn incremental on, set a reasonable retention (state.checkpoints.num-retained: 3), and never look at the directory structure again.
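The bookkeeping behind this can be sketched as a manifest-plus-refcount structure (names illustrative, not Flink's internals): upload only SST files that no retained checkpoint already references, and delete a file only when its reference count hits zero.

```python
from collections import Counter

class CheckpointStore:
    def __init__(self):
        self.refs = Counter()      # sst file id -> reference count
        self.checkpoints = {}      # checkpoint id -> set of sst file ids

    def checkpoint(self, cp_id, live_ssts):
        # Immutability makes "changed" easy: a file is new iff unreferenced.
        to_upload = [s for s in live_ssts if self.refs[s] == 0]
        for s in live_ssts:
            self.refs[s] += 1
        self.checkpoints[cp_id] = set(live_ssts)
        return to_upload           # only these cross the network to S3

    def retire(self, cp_id):
        # "Delete a checkpoint" is refcounting, not a plain file delete.
        deletable = []
        for s in self.checkpoints.pop(cp_id):
            self.refs[s] -= 1
            if self.refs[s] == 0:
                deletable.append(s)
        return deletable

store = CheckpointStore()
print(store.checkpoint(1, ["sst-1", "sst-2"]))   # first checkpoint: upload both
print(store.checkpoint(2, ["sst-2", "sst-3"]))   # incremental: sst-3 only
print(store.retire(1))                           # sst-1 now unreferenced
```

Recovery walks the retained checkpoint's manifest and downloads every referenced file, which is why a snapshot can span SSTs uploaded by many earlier checkpoints.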
Why TiKV, CockroachDB, and YugabyteDB also use RocksDB
The same workload shape — high write rate per node, point + range queries, atomic snapshot for replication — appears in distributed transactional databases. CockroachDB used RocksDB until 2020, then forked it as Pebble (a Go-native rewrite that gave them control over the codebase). TiKV uses RocksDB unchanged. YugabyteDB uses a fork called DocDB that adds multi-version concurrency control on top. The convergence is not "everyone copies Meta"; it is "the workload shape forces the choice". Streaming state stores and distributed-database storage layers are the same problem at different abstraction levels — both need the LSM's write throughput and snapshot semantics, both run embedded in-process, both serve point + range queries.
When RocksDB is the wrong choice
Three workloads where it is not. (a) Tiny state — sub-100 MB, kept in JVM heap. The HashMap state backend is faster because it skips serialization. Flink's HashMapStateBackend is the right call here. (b) Read-heavy with rare writes — a B-tree wins (PostgreSQL, MySQL InnoDB). Streaming jobs essentially never have this shape, but data-mart-style stores do. (c) Append-only with no random access — a plain log file beats both. This is what Kafka itself uses for the partition log. The state store sits downstream of the log; the log itself does not need an LSM.
Where this leads next
The next chapter covers the three window shapes that drive most stateful streaming work — /wiki/windowing-tumbling-sliding-session — and shows how each one accumulates state in the RocksDB backend you just internalised. After that, /wiki/event-time-vs-processing-time-the-whole-ballgame introduces the two clocks every streaming job has to reason about, which is where late events and watermarks enter the picture. /wiki/watermarks-how-we-know-were-done closes the loop on "when can we emit a result?".
The mental model to take into those chapters: state lives on local SSD, indexed by an LSM tree, snapshotted by pinning immutable files, and recovered by replay from the input log. Every piece of streaming you read from here on assumes that substrate. Once a job hits 50 GB of state, the operational question stops being "is the algorithm correct?" and becomes "is the state backend keeping up with compaction?" — and the answer to that question lives in RocksDB metrics, not in the job's business logic.
The capstone of Build 8 — /wiki/wall-exactly-once-is-a-lie-everyone-tells — explains how the state store's snapshot interacts with the input log's offset commit to produce the exactly-once semantics every streaming engine markets.
References
- The Log-Structured Merge Tree — O'Neil et al., 1996 — the original paper; predates SSDs but the intuition is the same.
- RocksDB tuning guide — Meta engineering — the canonical reference for `write_buffer_size`, compaction triggers, and bloom filters.
- Flink RocksDB state backend — Apache Flink documentation — incremental checkpoints, TTL, configuration.
- Kafka Streams state stores — Confluent — how Streams uses RocksDB for KTables and windowed stores.
- Pebble — CockroachDB's RocksDB-in-Go rewrite — a clean-room reimplementation; the README is a tutorial in LSM design choices.
- Differential Dataflow arrangements — McSherry et al. — the storage layer underneath Materialize; LSM-flavoured.
- /wiki/stateless-operators-map-filter-the-easy-part — the previous chapter; the part of the job that does NOT need any of this machinery.
- /wiki/the-append-only-log-simplest-store — the foundational primitive an LSM is layered on top of.