State backends: heap, RocksDB, external
A Flipkart Big Billion Days session-windowing job holds 80GB of state across 240 task slots — every active session, every product co-view buffer, every per-merchant rate-limit counter. The first version Karan deployed put it all in JVM heap, and the job died at 02:18 IST with OutOfMemoryError thirty minutes after traffic peaked. The second version moved state to RocksDB on the local NVMe of each task manager, and survived the night with a steady 4ms read latency. The third version — a year later, after a regional outage took out the entire AZ — moved checkpoints to S3 and added a changelog backend so cross-AZ failover took 90 seconds instead of 14 minutes. The state backend is the choice that separates these three versions, and most data engineers never explicitly think about it until the job dies.
A state backend is the storage engine your streaming operators read and write their state through. Heap backend is fast and limited by RAM; RocksDB backend uses an embedded LSM tree on local disk for TB-scale state with incremental checkpoints; remote/external backends decouple state from compute for elasticity at the cost of per-access latency. Choose by state size, access pattern, and recovery SLA — not by default.
What "state" actually is in a streaming job
Every operator in a streaming DAG holds two kinds of state, and the backend has to serve both:
- Keyed state — partitioned by the stream's key. A per-merchant counter (merchant_id → rupees_today) lives in keyed state. A session-window buffer (user_id → list of click events) lives in keyed state. The keyed state of a job with 5 million unique merchants is 5 million logical entries, sharded across the parallel operator instances by hash(key) % parallelism.
- Operator state — not partitioned by key. Kafka source operators hold their assigned partition offsets here. Sinks hold transactional commit handles here. Operator state is small (KBs to MBs) but checkpointed and recovered just like keyed state.
The backend is the answer to where these entries live, how fast they can be read, and how a checkpoint snapshots them. Three real options dominate production: in-process heap, an embedded LSM tree on local disk, or a remote KV store accessed over the network.
The boundary between these is what determines whether a Razorpay fraud-scoring pipeline can hold 6 months of per-card history (yes, with RocksDB; no, with heap) or whether a Dream11 leaderboard can shed parallelism mid-match without losing state (yes, with external; awkward with the others).
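The sharding rule above can be sketched in a few lines. This is a deliberately simplified model — Flink actually hashes keys into a fixed number of key groups and assigns key-group ranges to instances, but modulo-on-parallelism is the simplest version of the same idea. The merchant IDs and amounts are illustrative.

```python
import hashlib

PARALLELISM = 4

def shard_for(key: str) -> int:
    # Stable hash: Python's built-in hash() is salted per process,
    # so a real sharder needs a deterministic hash like this one.
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % PARALLELISM

# Each parallel operator instance holds only its own shard of the keyed state.
shards = [dict() for _ in range(PARALLELISM)]

events = [("merchant-1247", 4500), ("merchant-3821", 12000), ("merchant-1247", 200)]
for merchant, paise in events:
    s = shard_for(merchant)
    shards[s][merchant] = shards[s].get(merchant, 0) + paise

for i, shard in enumerate(shards):
    print(f"instance {i}: {shard}")
```

Every update for a given key lands on exactly one instance, which is why keyed state never needs cross-instance coordination — and why rescaling requires redistributing key groups.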
Heap backend: a HashMap behind an API
Flink's HashMapStateBackend is exactly what its name says — a Map<K, V> per keyed-state-descriptor, held in JVM heap, accessed at memory speed. The full implementation of getValue(key) is one method call into a Java HashMap. There is no serialisation on read; the value object is a live Java reference. Every state read is ~50 nanoseconds.
The cost is brutal:
- State must fit in heap. A 32GB-heap task manager can hold maybe 18GB of state (the rest is operator overhead, network buffers, GC headroom). Cross that line and OutOfMemoryError kills the JVM.
- GC pressure scales with state. A 16GB heap with 10 million live state objects will spend 8–15% of CPU time in G1 GC. CMS would be worse; ZGC handles it but with higher allocation cost.
- Checkpoints are non-incremental. Every checkpoint serialises and uploads the entire state. A 12GB-state job with a 30-second checkpoint interval uploads 12GB every 30 seconds — 24GB/min — and that is before encryption, compression, S3 throttling, and parallel-upload contention. Why this matters: incremental checkpointing requires a backend whose internal storage layout naturally separates "changed since last checkpoint" from "unchanged" — heap can't, because there is no on-disk file structure to delta against.
When heap is the right answer:
- Total state under 1GB per task manager. Below this, the GC pressure is negligible and full snapshots are cheap.
- Read latency dominates the SLA. A real-time fraud-scoring sidecar with 10ms p99 cannot afford even RocksDB's microsecond reads, because the operator does dozens of state reads per event.
- State turnover is low. A configuration cache that updates once a minute and is read 100k times a second is a perfect heap fit.
# heap_backend.py — a minimal heap-style keyed-state backend
import threading, json, gzip, time

class HeapBackend:
    """
    Mimics Flink's HashMapStateBackend: state lives in process memory.
    Checkpointing serialises the entire dict to a gzipped blob.
    """
    def __init__(self):
        self._state = {}               # key -> value
        self._lock = threading.Lock()  # the operator runs single-threaded per key,
                                       # but checkpoint runs on another thread

    def put(self, key, value):
        with self._lock:
            self._state[key] = value   # O(1), no IO

    def get(self, key, default=None):
        with self._lock:
            return self._state.get(key, default)  # O(1), live reference

    def snapshot(self):
        # The whole state is serialised — no incremental option.
        with self._lock:
            blob = gzip.compress(json.dumps(self._state).encode('utf-8'))
            return blob

    def restore(self, blob):
        with self._lock:
            self._state = json.loads(gzip.decompress(blob).decode('utf-8'))

# Razorpay merchant-volume aggregator with heap state
backend = HeapBackend()
events = [
    ("razorpay-merchant-1247", 4500),   # ₹45.00
    ("razorpay-merchant-3821", 12000),
    ("razorpay-merchant-1247", 200),
    ("razorpay-merchant-9912", 850),
    ("razorpay-merchant-1247", 6700),
] * 200_000  # 1M events — enough to feel the GC

t0 = time.time()
for merchant, paise in events:
    cur = backend.get(merchant, 0)
    backend.put(merchant, cur + paise)
elapsed = time.time() - t0

print(f"Processed {len(events):,} events in {elapsed:.2f}s")
print(f"Throughput: {len(events)/elapsed:,.0f} events/sec")
print(f"Distinct merchants: {len(backend._state)}")

t0 = time.time()
snapshot_blob = backend.snapshot()
snap_ms = (time.time() - t0) * 1000
print(f"Snapshot size: {len(snapshot_blob):,} bytes")
print(f"Snapshot time: {snap_ms:.1f} ms")
Processed 1,000,000 events in 0.61s
Throughput: 1,629,743 events/sec
Distinct merchants: 3
Snapshot size: 178 bytes
Snapshot time: 0.4 ms
The points worth dwelling on:
- backend.get(...) returns the live object, not a deserialised copy. In Flink's heap backend, a MapState.get(key) returns the same HashMap<String, Long> reference your operator wrote — mutating it in place is safe and fast. Why this is a double-edged sword: in heap, you pay 0ns for serialisation; in RocksDB, every read deserialises and every write serialises. A naive port from heap to RocksDB can drop throughput 10× because the operator was relying on cheap reads.
- snapshot() serialises the full dict. The blob is 178 bytes here because there are only 3 distinct merchants, but a 12GB heap state produces a ~12GB checkpoint blob. There is no way to upload only the merchants whose value changed — heap doesn't track per-key dirtiness. The Flink team adds a "changelog state backend" on top of heap to solve this, but the in-process state itself is still a HashMap.
- The lock is mandatory. Even though the operator is single-threaded per key, the checkpoint coordinator's thread reads the state concurrently when it triggers a snapshot. Without the lock, the serialiser walks a map that's being mutated mid-checkpoint — a RuntimeError in the Python sketch, a ConcurrentModificationException in Flink's Java equivalent.
RocksDB backend: an LSM tree per task manager
When state is too large for heap, the production answer is RocksDB — Facebook's embedded LSM tree, written in C++, accessed from the JVM via JNI. Each task manager runs its own RocksDB instance on local NVMe; keyed state is sharded across instances by hash(key) % parallelism. State reads go through a write buffer (memtable in RAM), an L0 cache of recently-flushed SSTs, and finally L1+ SSTs on disk; writes go to a write-ahead log and the memtable.
The four properties that matter:
- State size scales with disk, not heap. A 1.6TB NVMe holds ~800GB of compressed state per task manager. A 240-task-slot job can hold ~190TB of state — enough for any business problem padho.wiki readers will encounter.
- Reads are ~50 microseconds, writes ~10 microseconds. A thousand times slower than heap, a thousand times faster than network. The latency comes from the JNI hop, the bloom filter check, and the page cache lookup.
- Incremental checkpoints are native. RocksDB's LSM structure means SSTs (sorted string tables) are immutable once flushed. A checkpoint uploads only the SSTs that didn't exist at the previous checkpoint — typically 1–5% of total state per checkpoint interval. Why this is the killer feature: a 50GB-state job with heap backend uploads 50GB per checkpoint. The same job with incremental RocksDB uploads ~500MB per checkpoint. The S3 bill drops 100×, the checkpoint duration drops from minutes to seconds, and the network bandwidth to S3 stops being the bottleneck.
- Compaction interleaves with the operator. RocksDB's background compaction merges small SSTs into larger ones to keep read amplification low. This consumes CPU and disk IO that the operator could be using for record processing. Tuning max_background_jobs and level_compaction_dynamic_level_bytes is a real production lever — too low, and compaction falls behind, blowing up read latency; too high, and compaction steals CPU from record processing, cutting operator throughput.
The trade is real and asymmetric. The operator pays microseconds per state access — fine for jobs that read state once or twice per event, painful for jobs that scan thousands of state entries per event (e.g., temporal joins with large windows). For the latter, you partition harder until the state-per-key fits in heap.
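The incremental-checkpoint property is easiest to see in a toy model. The sketch below captures only the structural idea — state flushes into immutable "SST" files, and a checkpoint uploads only files the previous checkpoint hasn't already seen. Real RocksDB has levels, compaction, and manifests; none of that appears here, and the file names and keys are illustrative.

```python
class ToyLSM:
    """Toy model of LSM flush + incremental checkpointing."""
    def __init__(self):
        self.memtable = {}
        self.ssts = {}          # filename -> frozen contents (immutable once flushed)
        self.uploaded = set()   # filenames already present in the last checkpoint
        self._next_id = 0

    def put(self, k, v):
        self.memtable[k] = v

    def flush(self):
        # A flush turns the memtable into a new, immutable SST file.
        if self.memtable:
            name = f"sst-{self._next_id:04d}"
            self._next_id += 1
            self.ssts[name] = dict(self.memtable)
            self.memtable = {}

    def checkpoint(self):
        # Upload only SSTs the previous checkpoint didn't have.
        self.flush()
        new = [f for f in self.ssts if f not in self.uploaded]
        self.uploaded.update(new)
        return new  # only these files go to S3

lsm = ToyLSM()
lsm.put("merchant-1247", 4500)
lsm.put("merchant-3821", 12000)
print("checkpoint 1 uploads:", lsm.checkpoint())  # one new SST with both keys
lsm.put("merchant-9912", 850)
print("checkpoint 2 uploads:", lsm.checkpoint())  # only the newly flushed SST
print("checkpoint 3 uploads:", lsm.checkpoint())  # nothing changed -> []
```

Immutability is doing all the work: because flushed files never change, "what changed since the last checkpoint" reduces to a set difference on file names. Heap has no such file structure, which is why it can't do this.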
A typical Flink production config for RocksDB:
state.backend: rocksdb
state.checkpoints.dir: s3://razorpay-flink-checkpoints/payments-agg/
state.backend.rocksdb.localdir: /mnt/nvme0/flink-rocksdb
state.backend.incremental: true
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.checkpoint.transfer.thread.num: 8
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM # despite the name, fine for NVMe with lots of RAM
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.5
The settings that matter most: localdir must be on local NVMe (network-attached storage halves throughput), transfer.thread.num parallelises S3 upload (8–16 is typical at 100Gbps), and memory.managed: true lets Flink size memtables and block cache against the JVM's overall memory budget instead of letting RocksDB allocate uncapped native memory.
External / disaggregated backends
The newer frontier is disaggregated state — keep state in a remote KV store (FoundationDB, Aerospike, ScyllaDB, or a cloud-native store like AWS Aurora Limitless), not on the task manager's local disk. The big benefits and the big cost:
Benefit 1: elasticity. A Dream11 leaderboard pipeline runs at 800 task slots during an India–Pakistan match and 80 task slots overnight. With RocksDB, scaling up takes minutes (state has to redistribute via a key-group reshuffle from the latest checkpoint). With external state, scaling is instant — the new task slots just connect to the same remote KV.
Benefit 2: faster failover. When a task manager dies in RocksDB-land, Flink restores its state by downloading from S3 — slow because of object-store cold latency. With external state, failover is millisecond-fast: the new task manager connects to the KV store and starts reading.
Benefit 3: trivial cross-AZ recovery. RocksDB ties state to local disk on a specific node; if the AZ goes down, you re-download from S3 in another AZ. External KV stores are typically replicated cross-AZ already, so a job in another AZ can pick up state immediately.
The cost: every state access is a network call. Aerospike at p99 5ms; FoundationDB at p99 8ms; in-region Aurora Limitless at p99 3ms. For a streaming operator that does 5 state reads per event at 100k events/sec, you need 500k state-reads/sec; the network to the KV becomes a real bottleneck. Without aggressive client-side caching, throughput drops 5–10× compared to RocksDB. The Apache Flink ForStDB project (FLIP-423) addresses this by splitting state into a local hot cache plus a remote tier — same protocol as RocksDB, but the cold tier lives in object storage. Production deployments at Alibaba and Meta exist; the Indian deployments (Flipkart, Razorpay) are still mostly on RocksDB as of 2026.
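Why client-side caching rescues throughput is visible even in a toy model. The sketch below counts network round trips with and without a local hot cache in front of a simulated remote KV; `RemoteKV` is a stand-in class, not a real client, and the cache has no eviction or invalidation — a real tiered backend needs both.

```python
class RemoteKV:
    """Simulated remote store: every access is a counted network round trip."""
    def __init__(self):
        self.data, self.network_calls = {}, 0
    def get(self, k):
        self.network_calls += 1
        return self.data.get(k)
    def put(self, k, v):
        self.network_calls += 1
        self.data[k] = v

class CachedState:
    """Reads hit the local dict first; only misses go over the network."""
    def __init__(self, remote):
        self.remote, self.cache = remote, {}
    def get(self, k):
        if k not in self.cache:
            self.cache[k] = self.remote.get(k)   # miss -> network call
        return self.cache[k]
    def put(self, k, v):
        self.cache[k] = v
        self.remote.put(k, v)                    # write-through keeps remote durable

remote = RemoteKV()
state = CachedState(remote)
for _ in range(1000):  # one hot key, hammered by every event
    state.put("leaderboard:match42", (state.get("leaderboard:match42") or 0) + 1)
print("network calls:", remote.network_calls)  # 1001: one read miss + 1000 writes
```

Without the cache, the same loop would cost 2000 round trips; with it, reads are free after the first miss and only the write-through remains. Batching or async-committing those writes is the next lever a real disaggregated backend pulls.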
Choosing the backend
The decision tree, distilled from real Indian deployments:
| State per task manager | Read latency budget | Recovery SLA | Backend |
|---|---|---|---|
| <1 GB | <1 µs | <60 s | Heap |
| 1–500 GB | <100 µs | <5 min | RocksDB on local NVMe |
| 500 GB – 50 TB | <100 µs | <5 min | RocksDB + tiered storage |
| Any size with frequent rescaling | <5 ms | <30 s | External (FoundationDB, ForStDB) |
The decision is rarely revisited, because migrating between backends requires a savepoint-restore cycle with format conversion. Pick deliberately at job design time. The cost difference at Razorpay-scale (4 GB state, 30s checkpoint interval): heap costs ₹2.4 crore/year in S3 egress; RocksDB incremental costs ₹6 lakh/year. The factor of 40 alone justifies the switch.
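The factor of 40 falls straight out of the upload volumes. A back-of-envelope check for the numbers above (4 GB state, 30 s interval), assuming an incremental checkpoint uploads ~2.5% of state — the midpoint of the 1–5% range quoted earlier:

```python
GB = 1024**3
state_bytes = 4 * GB
checkpoints_per_year = 365 * 24 * 3600 // 30  # one checkpoint every 30 s

# Heap: every checkpoint is a full snapshot.
heap_upload = state_bytes * checkpoints_per_year
# RocksDB incremental: only the changed SSTs, assumed ~2.5% of state.
rocksdb_upload = int(state_bytes * 0.025) * checkpoints_per_year

print(f"checkpoints/year: {checkpoints_per_year:,}")
print(f"heap uploads:     {heap_upload / 1024**5:.1f} PiB/year")
print(f"rocksdb uploads:  {rocksdb_upload / 1024**4:.0f} TiB/year")
print(f"ratio:            {heap_upload / rocksdb_upload:.0f}x")
```

Roughly a million checkpoints a year turns 4 GB-per-snapshot into petabytes uploaded; the ratio between the two backends is just the inverse of the incremental fraction.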
Common confusions
- "RocksDB is faster than heap because it uses disk." No. RocksDB is roughly 1000× slower per state access than heap (50 µs vs 50 ns). RocksDB wins on capacity (TBs vs GBs) and incremental checkpointing, not per-access latency. If your state fits in heap and you don't care about incremental checkpoints, heap is faster.
- "Incremental checkpointing means small checkpoints." Only if RocksDB's compaction is well-behaved. A major compaction rewrites most SSTs and the next "incremental" checkpoint uploads nearly the full state. Tuning compaction is part of tuning checkpoint cost.
- "External backends solve all scaling problems." They solve elasticity but introduce a network hop on every state access. Pipelines with high state-access-per-event drop throughput hard. For 80% of production streaming jobs, RocksDB is still the right choice in 2026.
- "Heap state can't survive a crash." It can — Flink checkpoints heap-backend state to S3 just like RocksDB. The difference is that the checkpoint is non-incremental; the runtime state itself recovers fine, just slowly and expensively.
- "State backend doesn't matter for stateless operators." Map and filter operators have no keyed state, true — but they may have operator state (Kafka offsets, async I/O futures), and that operator state still flows through whichever backend is configured. The backend choice is per-job, not per-operator.
Going deeper
How RocksDB serialisation kills throughput in naive ports
A common mistake when migrating from heap to RocksDB: the operator code stays the same, the throughput drops 10×, and the team blames RocksDB. The real cause is usually the serialiser. In heap backend, MapState.get(key) returns the live HashMap<String, Long> reference; mutating it is free. In RocksDB, MapState.get(key) deserialises a byte array into a fresh HashMap every call, and MapState.put(key, value) serialises the entire map back. An operator that does state.get().merge("k", 1, Long::sum) performs ~5KB of serialisation per event in RocksDB versus 0 bytes in heap. The fix: model the state as MapState<String, Long> instead of ValueState<HashMap<String, Long>>. Now each get/put operates on a single key-value pair (~20 bytes), and throughput recovers within 20% of heap.
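The amplification is easy to quantify. The sketch below uses JSON as a stand-in for Flink's serialisers and a made-up 200-entry feature map; the byte counts are illustrative, but the ratio is the point — updating one entry in a map stored as a single value re-serialises the whole map, while per-entry state serialises roughly one pair.

```python
import json

# One key's state: a 200-entry feature map (names and values are made up).
features = {f"feature_{i}": i * 1.0 for i in range(200)}

# ValueState<HashMap>-style: one update => the whole map is serialised back.
features["feature_7"] += 1
whole_map_bytes = len(json.dumps(features).encode())

# MapState<String, Double>-style: one update => one (key, value) pair serialised.
one_entry_bytes = len(json.dumps(["feature_7", features["feature_7"]]).encode())

print(f"ValueState<HashMap> write: {whole_map_bytes:,} bytes per update")
print(f"MapState entry write:      {one_entry_bytes} bytes per update")
print(f"amplification:             {whole_map_bytes / one_entry_bytes:.0f}x")
```

On heap neither path serialises anything, which is exactly why the bug hides until the RocksDB migration.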
The changelog state backend (Flink 1.16+)
The changelog state backend wraps an underlying RocksDB or heap backend and writes every state mutation to an append-only log on durable storage. Checkpoints become near-instant because they're just a pointer into the log — no SST upload, no full snapshot. The cost is doubled write amplification during steady state (every write goes to both the underlying backend and the changelog) and slower recovery (the log has to be replayed). The win: checkpoint trigger time drops from seconds to <100 ms, which matters for jobs that need very tight checkpointTimeout SLAs (e.g., ad-bid pipelines where a delayed checkpoint cascades into delayed offset commits and double bidding). Flipkart Big Billion Days session-windowing migrated to the changelog backend in 2024 to hit a 99.99% checkpoint success rate during the 14× traffic spike.
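The core mechanic — checkpoints as log offsets, recovery as replay — fits in a short sketch. This is a heavily simplified model: the real backend periodically materialises the underlying backend's state and truncates the log, none of which appears here.

```python
class ChangelogBackend:
    """Toy changelog backend: every mutation is appended to a durable log."""
    def __init__(self):
        self.state = {}
        self.log = []            # stands in for the append-only log on durable storage

    def put(self, key, value):
        self.log.append((key, value))  # write amplification: log AND state
        self.state[key] = value

    def checkpoint(self):
        # Near-instant: a checkpoint is just the current log offset.
        return len(self.log)

    @classmethod
    def restore(cls, log, offset):
        # Recovery is the slow part: replay the log up to the checkpointed offset.
        b = cls()
        for key, value in log[:offset]:
            b.state[key] = value
        return b

b = ChangelogBackend()
b.put("merchant-1247", 4500)
b.put("merchant-1247", 4700)
cp = b.checkpoint()
b.put("merchant-9912", 850)              # happened after the checkpoint
recovered = ChangelogBackend.restore(b.log, cp)
print(recovered.state)                   # {'merchant-1247': 4700}
```

The trade-off named above is visible in the code: `checkpoint()` is O(1), but `put()` writes twice and `restore()` is O(log length) until the log is truncated against a materialised snapshot.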
TTL state and cleanup
State in a streaming job often has natural expiry: a session-window's state should be cleaned up after the session closes; a fraud-detection feature based on the last 30 days of card history should evict 31-day-old entries. Flink supports state TTL via StateTtlConfig, with three eviction strategies: OnReadAndWrite (lazy, only checks TTL when the entry is touched), OnIncrementalCleanup (proactive, sweeps a few entries per record), and Never (relies on full scan). RocksDB compaction can also be configured with a TTL filter, dropping expired entries during compaction without a separate sweep. The wrong choice can cause state to grow unbounded — a 2023 Zerodha incident saw a join-state operator accumulate 380GB before TTL was added, and the recovery checkpoint took 18 minutes.
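The lazy (on-read) strategy is the simplest to sketch: store a timestamp next to each value, and evict on access. The class below is a toy mirroring the OnReadAndWrite behaviour described above, with the clock injected so expiry is testable; key names and the 30-day TTL are illustrative.

```python
class TtlState:
    """Keyed state with lazy TTL: expiry is checked only when an entry is read."""
    def __init__(self, ttl_seconds, clock):
        self.ttl = ttl_seconds
        self.clock = clock               # callable returning "now" in seconds
        self._entries = {}               # key -> (value, last_update_ts)

    def put(self, key, value):
        self._entries[key] = (value, self.clock())

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, ts = entry
        if self.clock() - ts > self.ttl:
            del self._entries[key]       # lazy cleanup, only on access
            return None
        return value

DAY = 86400
now = [0.0]
state = TtlState(ttl_seconds=30 * DAY, clock=lambda: now[0])
state.put("card-xxxx-1234", {"txn_count_30d": 17})

now[0] += 29 * DAY
print(state.get("card-xxxx-1234"))   # still within TTL -> the value

now[0] += 2 * DAY
print(state.get("card-xxxx-1234"))   # 31 days old -> None, entry removed
```

The failure mode is also visible: an entry that is never read again is never evicted, which is exactly how join state grows unbounded without a proactive sweep or a compaction-time filter.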
Why local NVMe matters for RocksDB
A common cloud-deployment mistake: putting RocksDB's localdir on EBS GP3 (network-attached) instead of NVMe instance store. EBS GP3 maxes at 16k IOPS; NVMe instance store on i4i.4xlarge does 1M+ IOPS at lower latency. RocksDB's read path issues 5–20 IOPS per state read (bloom filter checks across levels), so an EBS-backed task manager caps at ~3000 events/sec while an NVMe-backed one runs 50k+ events/sec on the same hardware. The cost difference is 3× ($0.42/hr vs $1.32/hr), but the throughput difference is 15×; the per-event cost on NVMe is 5× lower.
Where this leads next
The next chapter — /wiki/exactly-once-end-to-end-the-actual-mechanics — uses the state backend's checkpoint output as the input to the two-phase commit protocol that coordinates with sinks. Without the state backend, exactly-once is impossible; without the 2PC sink, the state backend's correctness leaks at the boundary.
After that, /wiki/savepoints-vs-checkpoints-and-version-upgrades covers the format-portability differences between checkpoint-format state and savepoint-format state — the difference between "I can rescale this job" and "I can only resume it as-is on the same cluster".
The mental model: the state backend is the storage engine your streaming operators are unknowingly built against. The choice you don't think about is the one that decides whether the job survives Big Billion Days.
References
- RocksDB Wiki — LSM tree, bloom filters, and SSTs — the source-of-truth for the embedded engine that Flink, Kafka Streams, and CockroachDB all build on.
- Apache Flink — State Backends documentation — the production reference; covers heap, RocksDB, and the changelog backend.
- Flink Improvement Proposal FLIP-158: Generalised incremental checkpoints — the motivation and design of the changelog state backend.
- Apache Flink — ForStDB and disaggregated state (FLIP-423) — the 2024 design for tiered local + remote state.
- Streaming Systems — Akidau, Chernyak, Lax (O'Reilly, 2018) — chapter 5 covers the operator-state mental model that backends implement.
- Alibaba's experience with disaggregated state in production — real-world numbers on the network-tier overhead.
- /wiki/checkpointing-the-consistent-snapshot-algorithm — the previous chapter; the algorithm that drives every state backend's snapshot path.
- /wiki/event-time-vs-processing-time-the-whole-ballgame — windows and timers that the backend stores; TTL choice depends on which time is authoritative.