State backends: heap, RocksDB, external

A Flipkart Big Billion Days session-windowing job holds 80GB of state across 240 task slots — every active session, every product co-view buffer, every per-merchant rate-limit counter. The first version Karan deployed put it all in JVM heap, and the job died at 02:18 IST with OutOfMemoryError thirty minutes after traffic peaked. The second version moved state to RocksDB on the local NVMe of each task manager, and survived the night with a steady 4ms read latency. The third version — a year later, after a regional outage took out the entire AZ — moved checkpoints to S3 and added a changelog backend so cross-AZ failover took 90 seconds instead of 14 minutes. The state backend is the choice that separates these three versions, and most data engineers never explicitly think about it until the job dies.

A state backend is the storage engine your streaming operators read and write their state through. Heap backend is fast and limited by RAM; RocksDB backend uses an embedded LSM tree on local disk for TB-scale state with incremental checkpoints; remote/external backends decouple state from compute for elasticity at the cost of per-access latency. Choose by state size, access pattern, and recovery SLA — not by default.

What "state" actually is in a streaming job

Every operator in a streaming DAG holds two kinds of state, and the backend has to serve both:

  1. Keyed state: one entry per key. The per-session window buffer, the per-merchant running total, the per-card transaction history. This is the state that grows with key cardinality and dominates the sizing decision.
  2. Operator state: one entry per parallel subtask. Kafka partition offsets, broadcast rule sets. Small, but it still has to be checkpointed and restored.

The backend is the answer to where these entries live, how fast they can be read, and how a checkpoint snapshots them. Three real options dominate production: in-process heap, an embedded LSM tree on local disk, or a remote KV store accessed over the network.

Figure: three state backend topologies. Heap (left): the task manager holds all state as a HashMap<K,V> in JVM heap; reads ~50 ns, capacity capped by heap size, checkpoint is a full snapshot. RocksDB (centre): a small JVM cache over RocksDB SSTs on local NVMe; reads ~50 µs, capacity capped by disk size, checkpoints are incremental. External (right): a tiny JVM cache proxying most reads over a network hop to a remote key-value store (FoundationDB / Aerospike); reads ~1 ms, capacity effectively unbounded, a checkpoint is only a pointer.
Three backends, three latency tiers, three checkpoint stories. The choice ripples through every other production decision: parallelism, RAM sizing, recovery SLA, S3 cost, AZ topology.

The choice between these determines whether a Razorpay fraud-scoring pipeline can hold 6 months of per-card history (yes with RocksDB; no with heap) or whether a Dream11 leaderboard can shed parallelism mid-match without losing state (yes with external; awkward with the others).

Heap backend: a HashMap behind an API

Flink's HashMapStateBackend is exactly what its name says — a Map<K, V> per keyed-state-descriptor, held in JVM heap, accessed at memory speed. The full implementation of getValue(key) is one method call into a Java HashMap. There is no serialisation on read; the value object is a live Java reference. Every state read is ~50 nanoseconds.

The cost is brutal:

  1. State must fit in heap. A 32GB-heap task manager can hold maybe 18GB of state (the rest is operator overhead, network buffers, GC headroom). Cross that line and OutOfMemoryError kills the JVM.
  2. GC pressure scales with state. A 16GB heap with 10 million live state objects will spend 8–15% of CPU time in G1 GC. CMS would be worse; ZGC handles it, but at a higher per-allocation cost.
  3. Checkpoints are non-incremental. Every checkpoint serialises and uploads the entire state. A 12GB-state job with a 30-second checkpoint interval uploads 12GB every 30 seconds — 24GB/min — and that is before encryption, compression, S3 throttling, and parallel-upload contention. Why this matters: incremental checkpointing requires a backend whose internal storage layout naturally separates "changed since last checkpoint" from "unchanged" — heap can't, because there is no on-disk file structure to delta against.
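
To put the third cost in numbers, a quick back-of-envelope sketch. The 12GB state and 30-second interval come from the example above; the 10Gbps effective uplink to S3 is an assumed figure, not a measurement.

# checkpoint_bandwidth.py — full-snapshot upload cost for a heap backend (illustrative numbers)
state_gb     = 12      # total keyed state held in heap
interval_s   = 30      # checkpoint interval
s3_link_gbps = 10      # assumed effective task-manager -> S3 throughput

upload_s  = state_gb * 8 / s3_link_gbps     # seconds spent uploading each full snapshot
sustained = state_gb * 60 / interval_s      # GB pushed to S3 per minute, steady state

print(f"each checkpoint: {state_gb} GB uploaded in ~{upload_s:.0f}s over a {s3_link_gbps} Gbps link")
print(f"sustained upload: {sustained:.0f} GB/min, before compression, encryption or throttling")

Under these assumptions roughly a third of every checkpoint interval is spent uploading, and the sustained rate matches the 24GB/min in cost 3 above.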

When heap is the right answer: state that stays comfortably small (the decision table below puts the line at roughly 1 GB per task manager), a read-latency budget measured in nanoseconds rather than microseconds, and snapshots small enough that full-state checkpoints stay cheap. The listing below shows what that regime feels like.

# heap_backend.py — a minimal heap-style keyed-state backend
import threading, json, gzip, time

class HeapBackend:
    """
    Mimics Flink's HashMapStateBackend: state lives in process memory.
    Checkpointing serialises the entire dict to a gzipped blob.
    """
    def __init__(self):
        self._state = {}                     # key -> value
        self._lock = threading.Lock()        # the operator runs single-threaded per key
                                              # but checkpoint runs on another thread

    def put(self, key, value):
        with self._lock:
            self._state[key] = value          # O(1), no IO

    def get(self, key, default=None):
        with self._lock:
            return self._state.get(key, default)  # O(1), live reference

    def snapshot(self):
        # The whole state is serialised — no incremental option.
        with self._lock:
            blob = gzip.compress(json.dumps(self._state).encode('utf-8'))
        return blob

    def restore(self, blob):
        with self._lock:
            self._state = json.loads(gzip.decompress(blob).decode('utf-8'))

# Razorpay merchant-volume aggregator with heap state
backend = HeapBackend()
events = [
    ("razorpay-merchant-1247",  4500),  # ₹45.00
    ("razorpay-merchant-3821", 12000),
    ("razorpay-merchant-1247",   200),
    ("razorpay-merchant-9912",   850),
    ("razorpay-merchant-1247",  6700),
] * 200_000   # 1M events — enough to feel the GC

t0 = time.time()
for merchant, paise in events:
    cur = backend.get(merchant, 0)
    backend.put(merchant, cur + paise)
elapsed = time.time() - t0

print(f"Processed {len(events):,} events in {elapsed:.2f}s")
print(f"Throughput: {len(events)/elapsed:,.0f} events/sec")
print(f"Distinct merchants: {len(backend._state)}")

t0 = time.time()
snapshot_blob = backend.snapshot()
snap_ms = (time.time() - t0) * 1000
print(f"Snapshot size: {len(snapshot_blob):,} bytes")
print(f"Snapshot time: {snap_ms:.1f} ms")
Processed 1,000,000 events in 0.61s
Throughput: 1,629,743 events/sec
Distinct merchants: 3
Snapshot size: 178 bytes
Snapshot time: 0.4 ms

The points worth dwelling on: the 1.6M events/sec comes from every read being a live in-memory reference, with no IO and no serialisation; the snapshot serialises the entire dict every time, trivial at three merchants but the dominant cost once state reaches gigabytes; and in this toy the snapshot holds the lock, so processing stalls while it serialises (Flink's real heap backend snapshots asynchronously, but the full-state serialisation cost remains).

RocksDB backend: an LSM tree per task manager

When state is too large for heap, the production answer is RocksDB, Facebook's embedded LSM tree, written in C++ and accessed from the JVM via JNI. Each task manager runs its own RocksDB instance on local NVMe; keyed state is sharded across instances by key group (a hash of the key modulo the maximum parallelism). State reads check the write buffer (the memtable in RAM), then the recently flushed L0 SSTs, then the larger L1+ SSTs on disk; writes go to a write-ahead log and the memtable.

The four properties that matter:

  1. State size scales with disk, not heap. A 1.6TB NVMe holds ~800GB of compressed state per task manager. A job spread over 240 task managers, one NVMe each, can hold ~190TB of state, enough for any business problem padho.wiki readers will encounter.
  2. Reads are ~50 microseconds, writes ~10 microseconds. A thousand times slower than heap, and one to two orders of magnitude faster than a network hop to a remote store. The latency comes from the JNI hop, the bloom filter check, and the page cache lookup.
  3. Incremental checkpoints are native. RocksDB's LSM structure means SSTs (sorted string tables) are immutable once flushed. A checkpoint uploads only the SSTs that didn't exist at the previous checkpoint — typically 1–5% of total state per checkpoint interval. Why this is the killer feature: a 50GB-state job with heap backend uploads 50GB per checkpoint. The same job with incremental RocksDB uploads ~500MB per checkpoint. The S3 bill drops 100×, the checkpoint duration drops from minutes to seconds, and the network bandwidth to S3 stops being the bottleneck.
  4. Compaction interleaves with the operator. RocksDB's background compaction merges small SSTs into larger ones to keep read amplification low. This consumes CPU and disk IO that the operator could be using for record processing. Tuning max_background_jobs and level_compaction_dynamic_level_bytes is a real production lever: too low, and compaction falls behind, blowing up read latency; too high, and compaction steals cycles, dropping the operator's throughput.

The trade is real and asymmetric. The operator pays microseconds per state access, which is fine for jobs that read state once or twice per event and painful for jobs that scan thousands of state entries per event (e.g., temporal joins with large windows). For the latter, you restructure the access pattern or partition harder until the hot state per task fits back in heap.

Figure: RocksDB LSM layout for streaming state. Writes flow down: the memtable (RAM, sorted) flushes to L0 (recent SSTs), which compaction merges into L1 and the larger L2. The read path for a key K goes across: block cache, then memtable, then bloom-filter checks against L0, L1 and L2 in turn, stopping at the first hit; the bloom filters let most levels be skipped. A checkpoint ref-counts SSTs and uploads only the new ones, re-using prior SSTs by URI, so the delta is typically 1–5% of total state; a major compaction rewrites SSTs and the next checkpoint uploads near-full state.
The LSM gives you cheap incremental checkpoints because immutable SSTs are reusable across snapshots. A major compaction rewrites SSTs and breaks the reuse — the next checkpoint uploads almost everything. Tuning compaction priority is tuning checkpoint cost.
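
A minimal sketch of why immutable files make checkpoints incremental. This is a toy LSM-flavoured store, not RocksDB; the flush threshold, file naming and upload bookkeeping are arbitrary choices for illustration.

# incremental_checkpoint.py — toy immutable-file store showing SST re-use across checkpoints
import itertools

class ToySSTStore:
    def __init__(self):
        self.memtable = {}                # mutable in-RAM buffer, like RocksDB's memtable
        self.ssts = []                    # immutable "files": (name, frozen dict), never rewritten
        self._ids = itertools.count()

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= 3:       # tiny flush threshold, just for the demo
            self._flush()

    def _flush(self):
        name = f"sst-{next(self._ids):04d}"
        self.ssts.append((name, dict(self.memtable)))
        self.memtable.clear()

    def checkpoint(self, already_uploaded):
        # A real checkpoint would flush the memtable first; omitted for brevity.
        new    = [name for name, _ in self.ssts if name not in already_uploaded]
        reused = [name for name, _ in self.ssts if name in already_uploaded]
        return new, reused

store, uploaded = ToySSTStore(), set()

for i in range(9):
    store.put(f"merchant-{i}", i * 100)
new, reused = store.checkpoint(uploaded)
uploaded.update(new)
print("checkpoint 1: upload", new, "re-use", reused)     # uploads all three SSTs

for i in range(9, 12):
    store.put(f"merchant-{i}", i * 100)
new, reused = store.checkpoint(uploaded)
print("checkpoint 2: upload", new, "re-use", reused)     # uploads only the SST created since

A major compaction would rewrite those files under new names, which is exactly why the checkpoint that follows a compaction uploads nearly everything.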

A typical Flink production config for RocksDB:

state.backend: rocksdb
state.checkpoints.dir: s3://razorpay-flink-checkpoints/payments-agg/
state.backend.rocksdb.localdir: /mnt/nvme0/flink-rocksdb
state.backend.incremental: true
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.checkpoint.transfer.thread.num: 8
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM   # despite the name, fine for NVMe with lots of RAM
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.5

The settings that matter most: localdir must be on local NVMe (network-attached storage costs an order of magnitude in throughput; see the NVMe note under Going deeper), transfer.thread.num parallelises the S3 upload (8–16 is typical at 100Gbps), and memory.managed: true lets Flink size memtables and block cache against the JVM's overall memory budget instead of letting RocksDB allocate uncapped native memory.

External / disaggregated backends

The newer frontier is disaggregated state — keep state in a remote KV store (FoundationDB, Aerospike, ScyllaDB, or a cloud-native store like AWS Aurora Limitless), not on the task manager's local disk. The big benefits and the big cost:

Benefit 1: elasticity. A Dream11 leaderboard pipeline runs at 800 task slots during an India–Pakistan match and 80 task slots overnight. With RocksDB, scaling up takes minutes (state has to redistribute via a key-group reshuffle from the latest checkpoint). With external state, scaling is instant — the new task slots just connect to the same remote KV.

Benefit 2: faster failover. When a task manager dies in RocksDB-land, Flink restores its state by downloading from S3 — slow because of object-store cold latency. With external state, failover is millisecond-fast: the new task manager connects to the KV store and starts reading.

Benefit 3: trivial cross-AZ recovery. RocksDB ties state to local disk on a specific node; if the AZ goes down, you re-download from S3 in another AZ. External KV stores are typically replicated cross-AZ already, so a job in another AZ can pick up state immediately.

The cost: every state access is a network call. Aerospike at p99 5ms; FoundationDB at p99 8ms; in-region Aurora Limitless at p99 3ms. For a streaming operator that does 5 state reads per event at 100k events/sec, you need 500k state-reads/sec; the network to the KV becomes a real bottleneck. Without aggressive client-side caching, throughput drops 5–10× compared to RocksDB. Apache Flink's ForSt (ForStDB) project, the disaggregated state backend introduced with Flink 2.0, addresses this by splitting state into a local hot cache plus a remote tier: the same interface as RocksDB, but the cold tier lives in object storage. Production deployments at Alibaba and Meta exist; the Indian deployments (Flipkart, Razorpay) are still mostly on RocksDB as of 2026.
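
A sketch of the standard mitigation: a small read-through cache in front of the remote store, so only cold keys pay the network round trip. RemoteKV is a hypothetical stand-in for an Aerospike or FoundationDB client, and the cache capacity and access skew are assumptions for illustration.

# cached_external_state.py — read-through LRU cache in front of a remote KV (hypothetical client)
from collections import OrderedDict

class RemoteKV:
    """Stand-in for a remote store; imagine every call costing ~1 ms of network."""
    def __init__(self):
        self._data, self.calls = {}, 0
    def get(self, key):
        self.calls += 1
        return self._data.get(key)
    def put(self, key, value):
        self.calls += 1
        self._data[key] = value

class CachedStateBackend:
    def __init__(self, remote, capacity):
        self.remote, self.capacity = remote, capacity
        self.cache = OrderedDict()            # LRU: hot keys stay local, cold keys go remote

    def get(self, key):
        if key in self.cache:
            self.cache.move_to_end(key)
            return self.cache[key]
        value = self.remote.get(key)          # cache miss -> network round trip
        self._admit(key, value)
        return value

    def put(self, key, value):
        self.remote.put(key, value)           # write-through: the remote store stays authoritative
        self._admit(key, value)

    def _admit(self, key, value):
        self.cache[key] = value
        self.cache.move_to_end(key)
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)

remote = RemoteKV()
backend = CachedStateBackend(remote, capacity=100)
for i in range(10_000):                       # skewed access: 50 hot leaderboard keys dominate
    key = f"team-{i % 50}"
    backend.put(key, (backend.get(key) or 0) + 1)
print(f"remote calls: {remote.calls:,} for 20,000 state accesses")

Reads collapse onto the cache almost entirely; the writes still pay the round trip here, which is why production designs also batch or overlap writes asynchronously.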

Choosing the backend

The decision tree, distilled from real Indian deployments:

State per task manager           | Read latency budget | Recovery SLA | Backend
<1 GB                            | <1 µs               | <60 s        | Heap
1–500 GB                         | <100 µs             | <5 min       | RocksDB on local NVMe
500 GB – 50 TB                   | <100 µs             | <5 min       | RocksDB + tiered storage
Any size with frequent rescaling | <5 ms               | <30 s        | External (FoundationDB, ForStDB)

The decision is rarely revisited, because migrating between backends requires a savepoint-restore cycle with format conversion. Pick deliberately at job design time. The cost difference at Razorpay-scale (4 GB state, 30s checkpoint interval): heap costs ₹2.4 crore/year in S3 egress; RocksDB incremental costs ₹6 lakh/year. The factor of 40 alone justifies the switch.
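
The same table as a small checklist function. The thresholds are lifted directly from the table above and are guidelines, not hard limits.

# choose_backend.py — the decision table, encoded as a checklist
def choose_backend(state_gb_per_tm, read_latency_budget_us, rescales_often=False):
    if rescales_often:
        return "external (FoundationDB / ForStDB)"
    if state_gb_per_tm < 1 and read_latency_budget_us < 1:
        return "heap"
    if state_gb_per_tm <= 500:
        return "rocksdb on local NVMe"
    return "rocksdb + tiered storage"

print(choose_backend(0.5, 0.1))                            # heap
print(choose_backend(80, 100))                             # rocksdb on local NVMe
print(choose_backend(2_000, 100))                          # rocksdb + tiered storage
print(choose_backend(80, 5_000, rescales_often=True))      # external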

Common confusions

The state backend is not where checkpoints go: working state lives in the backend (heap or local RocksDB), while checkpoints are written to state.checkpoints.dir, typically S3, so a heap-backend job still checkpoints to S3. "RocksDB backend" does not mean a separate database service: it is an embedded library inside each task manager process, reading and writing that machine's local disk. And incremental checkpoints are a property of the backend, not of Flink in general: heap cannot do them, because there is no immutable on-disk structure to delta against.

Going deeper

How RocksDB serialisation kills throughput in naive ports

A common mistake when migrating from heap to RocksDB: the operator code stays the same, the throughput drops 10×, and the team blames RocksDB. The real cause is usually the serialiser. In the heap backend, ValueState<HashMap<String, Long>>.value() returns the live HashMap reference; mutating it is free. In RocksDB, value() deserialises a byte array into a fresh HashMap on every call, and update() serialises the entire map back. An operator that does state.value().merge("k", 1L, Long::sum) performs ~5KB of serialisation per event in RocksDB versus 0 bytes in heap. The fix: model the state as MapState<String, Long> instead of ValueState<HashMap<String, Long>>. With RocksDB, each MapState entry is its own key-value pair, so a get/put touches ~20 bytes and throughput recovers to within 20% of heap.
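
A rough way to feel the difference, using plain Python pickling as a stand-in for Flink's serialisers; the absolute byte counts are illustrative only.

# serialisation_cost.py — ValueState<HashMap> vs MapState, measured as bytes (de)serialised per update
import pickle

profile = {f"feature-{i}": i for i in range(200)}          # a ~200-entry per-key feature map

# Layout 1: the whole map is one state value -> every update round-trips the full map
whole_map_bytes = len(pickle.dumps(profile)) * 2           # deserialise on read + serialise on write

# Layout 2: each map entry is its own key-value pair -> an update touches one entry
one_entry_bytes = len(pickle.dumps(("feature-7", 7))) * 2

print(f"ValueState<HashMap>:   ~{whole_map_bytes:,} bytes per update")
print(f"MapState<String,Long>: ~{one_entry_bytes:,} bytes per update")
print(f"ratio:                 ~{whole_map_bytes // one_entry_bytes}x")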

The changelog state backend (Flink 1.16+)

The changelog state backend wraps an underlying RocksDB or heap backend and writes every state mutation to an append-only log on durable storage. Checkpoints become near-instant because they're just a pointer into the log — no SST upload, no full snapshot. The cost is doubled write amplification during steady state (every write goes to both the underlying backend and the changelog) and slower recovery (the log has to be replayed). The win: checkpoint trigger time drops from seconds to <100 ms, which matters for jobs that need very tight checkpointTimeout SLAs (e.g., ad-bid pipelines where a delayed checkpoint cascades into delayed offset commits and double bidding). Flipkart Big Billion Days session-windowing migrated to the changelog backend in 2024 to hit a 99.99% checkpoint success rate during the 14× traffic spike.
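
A sketch of the wrapper idea, re-using the HeapBackend class from the heap listing above. DurableLog and ChangelogBackend are illustrative names, not Flink classes.

# changelog_backend.py — checkpoints become a log offset instead of a snapshot
class DurableLog:
    """Stand-in for an append-only changelog on durable storage (DFS/S3)."""
    def __init__(self):
        self.entries = []
    def append(self, record):
        self.entries.append(record)
        return len(self.entries)              # offset of the last durable record

class ChangelogBackend:
    def __init__(self, inner, log):
        self.inner = inner                    # the wrapped backend, e.g. HeapBackend()
        self.log = log
        self._offset = 0

    def put(self, key, value):
        self._offset = self.log.append((key, value))   # write amplification: log AND inner backend
        self.inner.put(key, value)

    def get(self, key, default=None):
        return self.inner.get(key, default)

    def checkpoint(self):
        return self._offset                   # near-instant: a pointer into the log, not a snapshot

    def restore(self, log, upto_offset):
        # Recovery replays the log up to the checkpointed offset; this is the slow part.
        # Real deployments bound the replay by periodically materialising the inner backend.
        for key, value in log.entries[:upto_offset]:
            self.inner.put(key, value)

log = DurableLog()
backend = ChangelogBackend(HeapBackend(), log)
backend.put("razorpay-merchant-1247", 4500)
backend.put("razorpay-merchant-1247", 4700)
print("checkpoint handle:", backend.checkpoint())          # a single integer, whatever the state size

Every put is written twice, which is the doubled write amplification; the payoff is that checkpoint() returns immediately regardless of state size.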

TTL state and cleanup

State in a streaming job often has natural expiry: a session-window's state should be cleaned up after the session closes; a fraud-detection feature based on the last 30 days of card history should evict 31-day-old entries. Flink supports state TTL via StateTtlConfig. Expired entries are filtered out lazily whenever they are read, and proactive cleanup can be layered on top: incremental cleanup, which sweeps a few entries for every record processed, or cleanup during full snapshots. RocksDB compaction can also be configured with a TTL filter, dropping expired entries during compaction without a separate sweep. The wrong choice can cause state to grow unbounded: a 2023 Zerodha incident saw a join-state operator accumulate 380GB before TTL was added, and the recovery checkpoint took 18 minutes.
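
A sketch of the two cleanup flavours, lazy expiry on read plus an incremental sweep of a few entries per record. The class and parameter names are illustrative, not Flink's StateTtlConfig API.

# ttl_state.py — lazy vs incremental TTL cleanup on a dict-backed keyed state
import time

class TtlState:
    def __init__(self, ttl_seconds, sweep_per_record=2):
        self.ttl = ttl_seconds
        self.sweep_per_record = sweep_per_record
        self._state = {}                      # key -> (value, last_write_ts)
        self._cursor = iter(())               # sweep cursor over the key space

    def put(self, key, value, now=None):
        now = time.time() if now is None else now
        self._state[key] = (value, now)
        self._sweep(now)                      # proactive: expire a few entries per record

    def get(self, key, now=None, default=None):
        now = time.time() if now is None else now
        entry = self._state.get(key)
        if entry is None:
            return default
        value, ts = entry
        if now - ts > self.ttl:               # lazy: expire only when the entry is touched
            del self._state[key]
            return default
        return value

    def _sweep(self, now):
        for _ in range(self.sweep_per_record):
            key = next(self._cursor, None)
            if key is None:                   # cursor exhausted: restart over a snapshot of the keys
                self._cursor = iter(list(self._state.keys()))
                key = next(self._cursor, None)
                if key is None:
                    return
            entry = self._state.get(key)
            if entry and now - entry[1] > self.ttl:
                del self._state[key]

state = TtlState(ttl_seconds=30 * 24 * 3600)  # "last 30 days of card history"
state.put("card-52ab", {"txn_count": 3}, now=0)
print(state.get("card-52ab", now=10 * 24 * 3600))   # within TTL -> value comes back
print(state.get("card-52ab", now=31 * 24 * 3600))   # past TTL -> None, and the entry is removed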

Why local NVMe matters for RocksDB

A common cloud-deployment mistake: putting RocksDB's localdir on EBS GP3 (network-attached) instead of NVMe instance store. EBS GP3 maxes at 16k IOPS; NVMe instance store on i4i.4xlarge does 1M+ IOPS at lower latency. RocksDB's read path issues 5–20 IOPS per state read (bloom filter checks across levels), so an EBS-backed task manager caps at ~3000 events/sec while an NVMe-backed one runs 50k+ events/sec on the same hardware. The cost difference is 3× ($0.42/hr vs $1.32/hr), but the throughput difference is 15×; the per-event cost on NVMe is 5× lower.
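
The arithmetic behind those ceilings, as a two-line sketch; the IOPS-per-read figure is the low end of the range quoted above, and real throughput also depends on the block-cache hit rate.

# iops_budget.py — rough events/sec ceiling implied by the disk's IOPS budget
def max_events_per_sec(disk_iops, iops_per_state_read=5, state_reads_per_event=1):
    return disk_iops / (iops_per_state_read * state_reads_per_event)

print(f"EBS GP3 (16k IOPS): ~{max_events_per_sec(16_000):,.0f} events/sec")
print(f"NVMe (1M IOPS):     ~{max_events_per_sec(1_000_000):,.0f} events/sec")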

Where this leads next

The next chapter — /wiki/exactly-once-end-to-end-the-actual-mechanics — uses the state backend's checkpoint output as the input to the two-phase commit protocol that coordinates with sinks. Without the state backend, exactly-once is impossible; without the 2PC sink, the state backend's correctness leaks at the boundary.

After that, /wiki/savepoints-vs-checkpoints-and-version-upgrades covers the format-portability differences between checkpoint-format state and savepoint-format state — the difference between "I can rescale this job" and "I can only resume it as-is on the same cluster".

The mental model: the state backend is the storage engine your streaming operators are unknowingly built against. The choice you don't think about is the one that decides whether the job survives Big Billion Days.

References