Flink: stateful stream processing
Open a terminal and run nc -lk 9000 in one window. Pipe a few numbers into it from another and try to compute their running average — except your "process" is fifty machines spread across three Bengaluru data centres, one of them just rebooted, and the partial sum it was holding is now scrap. Flink is the answer to "where did that partial sum live, and how do we get it back?"
A windowed query like "sum the UPI payments per merchant per minute" needs the partial sum to live somewhere as the events stream in. Flink gives every operator local keyed state, backs it with RocksDB on disk, and takes consistent checkpoints — distributed snapshots of every operator's state plus the input offsets that produced it — using a 1985 algorithm by Chandy and Lamport. After a crash, Flink restores the state, rewinds the source to the snapshotted offset, and replays. Output goes through a transactional sink, so each input event causes its downstream effect exactly once.
Why streams need state, and where the state has to live
The first streaming query that breaks the "easy" tier is the simplest one a business actually wants: count UPI payments per merchant per minute. A single number per merchant per minute. To compute it, the processor must, for every incoming event, look up the running count for (merchant_id, minute) and increment it.
That lookup is state. It is the bytes the operator carries between events. A pure transformation — map, filter, parse_json — is stateless and trivial to scale. The moment you say "…per merchant", you are asking each event to find a counter that lives somewhere across runs.
Three places the state could live, all of them wrong on their own:
- In a remote database (Redis, Cassandra). Every event becomes a network round-trip. At PhonePe-scale (millions of events per second) this is dead on arrival; the network alone caps you at a few thousand events per second per worker.
- In RAM only. Fast, but a single JVM crash erases hours of running aggregates and you cannot get them back without replaying the whole topic from offset zero.
- On the local disk of each worker. Fast enough (RocksDB does ~100k ops/sec per worker) and survives JVM crashes — but a worker disk failure or a rescaling event still loses the state.
Flink's design is all three at once. State lives in RocksDB on each worker's local SSD for hot reads. RAM holds a write-back cache. And periodically — every 30 seconds, say — Flink takes a checkpoint: a consistent snapshot of every operator's state across the whole cluster, written to durable shared storage (S3, HDFS, an NFS mount). When a worker dies, Flink reloads the last checkpoint, rewinds the Kafka source to the offset captured in that same checkpoint, and replays.
Why state must be co-located with the operator: a streaming worker processes events at the speed of disk-local IO (microseconds). A round-trip to a remote KV store is at minimum 200 microseconds on a healthy LAN, often a millisecond. The operator's throughput would drop by 100x. So Flink keeps the working state next to the CPU that uses it, and only pays the network cost once per checkpoint, not once per event.
The crucial property: the checkpoint contains both the state and the input offset. Restoring one without the other corrupts the count. Restore state from yesterday and the offset from today, and you double-count an entire day. Flink's correctness rests on these two numbers being captured in the same atomic snapshot.
A windowed counter, written as Flink would write it
Here is the canonical windowed-aggregate, in Flink's Python (PyFlink) API. Open your editor. Type this in. Do not copy-paste — type it. Every line is a state-management decision in disguise.
# upi_per_minute.py — count UPI payments per merchant per minute
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.datastream.connectors.kafka import KafkaSource
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common import Time, Duration
from pyflink.datastream.window import TumblingEventTimeWindows
import json
env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(30_000, CheckpointingMode.EXACTLY_ONCE)
env.get_checkpoint_config().set_checkpoint_storage_dir("s3://flink-ckpts/upi/")
source = (KafkaSource.builder()
.set_bootstrap_servers("kafka.razorpay.internal:9092")
.set_topics("upi-payments")
.set_group_id("per-minute-counter")
.set_starting_offsets("earliest")
.build())
stream = env.from_source(
source,
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5))
.with_timestamp_assigner(lambda r, _: json.loads(r)["ts_ms"]),
"upi-source")
(stream
.map(lambda r: (json.loads(r)["merchant_id"], 1))
.key_by(lambda x: x[0])
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(lambda a, b: (a[0], a[1] + b[1]))
.sink_to(kafka_sink_with_transactional_id("upi-counts")))
env.execute("upi-per-minute")
The job is twenty-five lines. The interesting lines are not where the addition happens.
env.enable_checkpointing(30_000, EXACTLY_ONCE). Tells Flink to take a checkpoint every 30 seconds and to use the exactly-once variant of the protocol (aligned barriers; the variant matters and the next section is about it). Without this line, a worker crash silently loses up to a minute of partial sums.
set_checkpoint_storage_dir("s3://..."). The durable backing store for checkpoints. Local SSDs are not safe — they die with the worker. S3 / HDFS / Azure Blob is the canonical choice for production. For development, file:///tmp/flink-ckpts works and lets you inspect a checkpoint with ls.
for_bounded_out_of_orderness(Duration.of_seconds(5)). Watermark policy — see the windowing chapter. Tells Flink "events may arrive up to 5 seconds late; after that, close the window." Watermarks are themselves part of the checkpoint, because if a worker dies after the watermark crossed 09:42:00 but before the window closed, the restored worker must re-cross 09:42:00 to fire the same window.
.key_by(lambda x: x[0]). The pivotal call. key_by shards the stream by merchant_id so that all events for merchant_M001 go to the same operator instance. That operator now has keyed state — a logical map from merchant_id to its running count. Flink stores this map in RocksDB, partitioned by key range, on the local disk of the worker that owns the key range.
.window(TumblingEventTimeWindows.of(Time.minutes(1))). The window operator carries its own state — for each (merchant_id, minute), the partial sum being accumulated. When the watermark passes the minute boundary, the partial sum is emitted and the state for that key/minute is freed.
.sink_to(kafka_sink_with_transactional_id(...)). The output is a Kafka transactional producer. Why this matters: Flink commits the checkpoint and the sink's transaction together, using the two-phase-commit protocol described in the exactly-once chapter. The sink is the bridge that turns "I restored state and rewound my offset" into "downstream consumers saw each output exactly once."
The per-event hot path here is: parse JSON → look up merchant_id in RocksDB → add 1 → write back to RocksDB → emit (when window closes) → ack. RocksDB does this in roughly 5 microseconds for a hot key. Flink workers handle 100k–500k events/sec per core on this kind of workload — comparable to a tight in-memory loop, because most reads hit the LSM-tree's memtable cache.
Chandy-Lamport: how to snapshot a running cluster without stopping it
The hard part of Flink is not "save the state to S3." That is a for k, v in rocksdb: write(s3, k, v) loop. The hard part is: how do you snapshot fifty operators connected by network channels — some of them mid-event, some of them with messages in flight on TCP buffers — and end up with a consistent picture? "Consistent" meaning: if operator A's snapshot says "I produced output X at offset 100," then either operator B's snapshot includes X in its state, or B's input channel from A still has X queued for replay.
A naive approach — "stop the world, snapshot everyone, resume" — works but kills throughput. For a 5-second snapshot, every operator stops for 5 seconds, and the cluster's effective throughput drops by 5 / (30+5) = ~14%. Flink does it without stopping, using an algorithm K. Mani Chandy and Leslie Lamport published in 1985 for distributed deadlock detection. Flink calls its variant asynchronous barrier snapshotting.
The trick is to send a special record — a checkpoint barrier, marked with checkpoint id n — through the data stream. Every source injects barrier n into each of its output channels at the moment it decides "I am at offset O_n." The barrier flows through the topology like a regular event. When an operator sees barrier n arrive on every one of its input channels, it does two things in order:
- Snapshot its own local state to S3 (asynchronously, in the background).
- Forward barrier
nto all of its output channels.
What about the case where barrier n arrives early on one input but the other input is still streaming pre-barrier events? The operator aligns the barriers: it buffers (or stalls) the early channel until the late channel's barrier shows up, so that when the snapshot is taken, every event before barrier n has been processed and every event after barrier n has not. This is the aligned checkpoint, and it gives exactly-once semantics. (There is also an unaligned variant for low-latency setups; we'll discuss it below.)
The whole protocol, in one paragraph: the JobManager (Flink's coordinator) tells every source "start checkpoint n." Each source records its current offsets, injects a barrier into every output channel, and reports its piece of the checkpoint as done. The barriers flow downstream. Each operator aligns its inputs, snapshots its state asynchronously to S3, forwards the barriers, and reports done. When the JobManager has heard "done" from every operator, the checkpoint is complete — durably stored in S3 with a small metadata file pointing to it. Only at that instant does the sink's two-phase commit pre-commit get promoted to a real Kafka transaction commit. If anything fails before that, the partially-written checkpoint files are abandoned and the next checkpoint starts fresh.
Why aligning before snapshotting gives exactly-once: at the moment K snapshots its state, every event before barrier n has been incorporated, and zero events after barrier n have been seen. The state is a function of "everything up to offset On." If K crashes and restores from this snapshot, replaying from On reproduces exactly the events K had not yet seen at snapshot time — no more, no less. No event is processed twice; none is dropped.
Why the snapshot can be asynchronous: once the operator has finished aligning the barrier and forwarded it, its in-memory state is logically frozen at that point in time. The operator can keep processing new events (which now belong to checkpoint n+1), and a background thread can copy the frozen state to S3 in parallel. RocksDB's incremental snapshots make this even cheaper — only the SSTables that changed since the last checkpoint are uploaded.
Keyed state, operator state, and broadcast state
Flink exposes three flavours of state to the user. Knowing which is which prevents 90% of "my job ran out of memory" production stories.
Keyed state — the workhorse
Available only inside a key_by partition. Logically a map keyed by the user's key. Comes in five primitive shapes: ValueState[T], ListState[T], MapState[K, V], ReducingState[T], and AggregatingState[IN, OUT]. Stored in RocksDB and partitioned by the key range owned by each parallel subtask. On rescaling (changing the parallelism), Flink redistributes the key ranges across new subtasks.
Use this when the state is a function of a key that is bounded in cardinality and whose updates can be sharded by key. Examples: per-user session info, per-merchant counters, per-device latest reading.
Operator state — small, per-subtask
Not keyed by user data; instead, each parallel subtask of an operator owns a slice. Used by source connectors to remember "I have read up to offset 12345 on this Kafka partition." Always small (kilobytes), always written to S3 in the checkpoint metadata file rather than RocksDB. On rescaling, Flink redistributes the slices either by round-robin (ListState) or by union (UnionListState).
You will rarely write operator state directly; it is the substrate connectors use.
Broadcast state — the same, on every subtask
Used for low-volume "config" streams. Imagine a fraud rule like "block any UPI payment > ₹50,000 from a new device." That rule must be available on every parallel subtask, and updates to the rule (the security team adds "from a new device after midnight") must reach every subtask. Broadcast state replicates the configuration map across all subtasks; updates flow through a separate broadcast stream.
The mental model is simple: keyed state for the firehose, operator state for the source's bookmarks, broadcast state for the rules.
# inside a KeyedProcessFunction
class FraudCounter(KeyedProcessFunction):
def open(self, ctx):
self.count = ctx.get_state(ValueStateDescriptor("count", Types.LONG()))
self.first_seen = ctx.get_state(ValueStateDescriptor("first_seen_ms", Types.LONG()))
def process_element(self, event, ctx):
c = self.count.value() or 0
if c == 0:
self.first_seen.update(ctx.timestamp())
ctx.timer_service().register_event_time_timer(ctx.timestamp() + 60_000)
self.count.update(c + 1)
if self.count.value() > 10:
yield (ctx.get_current_key(), "rate-limit-trip")
def on_timer(self, ts, ctx):
# window expired — reset the per-merchant counter
self.count.clear()
self.first_seen.clear()
Three things worth pointing at. ValueStateDescriptor("count", ...) registers the state under a name that is part of the savepoint's schema; rename it and old savepoints will not deserialise. ctx.timer_service().register_event_time_timer(...) registers a Flink-managed timer in keyed state — also durable, also restored on recovery. self.count.clear() is what frees the RocksDB row when the key is no longer interesting; without this, state grows forever and the worker eventually OOMs after a week of running.
Run this on a single-node cluster against a Kafka topic carrying 200k UPI events per second and watch the metrics endpoint:
$ curl -s localhost:8081/jobs/<job-id>/metrics?get=numRecordsInPerSecond,checkpointDuration,stateSizeBytes
numRecordsInPerSecond : 198,432
checkpointDuration : 1,820 ms (target: < 30s)
stateSizeBytes : 412,309,504 (~ 393 MB across all keyed state)
lastCheckpointSize : 28,114,432 (~ 27 MB delta uploaded to S3)
Two things to read off these numbers. The job sustains its target throughput while taking under-2-second checkpoints — the asynchronous snapshotting is doing its job, the cluster is not stalling. And the delta uploaded to S3 (27 MB) is far smaller than the total state (393 MB), because RocksDB only ships the SSTables that changed since the last checkpoint. That incremental upload is what makes per-30-second checkpoints affordable in production.
Going deeper
Aligned versus unaligned checkpoints
The aligned checkpoint described above stalls early channels until late channels catch up. If your job has skewed input rates — one Kafka partition is 20 seconds behind another, a common sight under back-pressure — alignment can take seconds, and during that window the operator's throughput drops to the speed of the slowest channel. Flink 1.11 introduced unaligned checkpoints: the operator immediately snapshots and includes the in-flight buffers (the data already received but not yet processed) as part of the snapshot. On restore, those buffers are replayed before any new input. The trade-off is a larger checkpoint (the buffer contents balloon S3 usage) for faster checkpointing under back-pressure. Production guidance: align by default, switch to unaligned if checkpoints regularly miss their deadline.
Savepoints versus checkpoints
A checkpoint is owned by Flink and exists for crash recovery. Flink deletes old checkpoints automatically. A savepoint is owned by the operator (the human) and exists for upgrades, rescaling, and migration. Savepoints use the same on-disk format as checkpoints but are explicitly triggered (flink savepoint <jobid>) and never deleted automatically. The classic upgrade flow: take a savepoint, stop the job, deploy new JAR, restart from savepoint. Because the savepoint is keyed by (state_name, key), a new operator definition that uses the same names sees the old state. Renaming a state field, or changing its serializer in an incompatible way, breaks the upgrade — which is why the state schema evolution rules exist.
RocksDB tuning that matters in production
Flink ships with sensible defaults, but two knobs come up in real ops. Block cache size: RocksDB caches compressed SSTable blocks in off-heap memory. Default is 8 MB per state backend, far too small. Production jobs at Razorpay-scale set state.backend.rocksdb.block.cache-size: 256m or higher. Memtable size and count: writes go to the memtable first; full memtables flush to SSTables. A larger memtable batches more writes but uses more memory and lengthens recovery time. The default 64 MB / 2 memtables is fine for keyed state with even key distribution; for highly-skewed keys (one merchant dominates), bumping to 128 MB helps. The Flink docs ship a RocksDBStateBackendConfig table of every knob.
The two-phase commit dance with Kafka sinks
The exactly-once guarantee at the sink leans on Kafka's transactional producer. The choreography:
- When the operator forwards barrier
n, the sink pre-commits its Kafka transaction (callsproducer.send_offsets_to_transaction()andproducer.commit_transaction()is not yet called). All produced records sit in Kafka's log marked as part of an open transaction; downstream consumers withisolation.level=read_committedcannot see them. - The JobManager waits for the global checkpoint to complete in S3.
- Once complete, the JobManager calls commit on the sink's pre-committed transaction. This is
producer.commit_transaction()in Kafka terms. - Only now do downstream consumers see the records.
If the cluster crashes between steps 2 and 3, the recovery code finds the transaction in S3's checkpoint metadata and re-issues the commit. If between steps 1 and 2, the in-flight transaction is aborted during recovery, and the records are invisible. Either way, downstream sees each input event's effect exactly once. The full protocol is in Carbone et al., 2017.
Why Flink and not just Kafka Streams
Kafka Streams (covered briefly in the stream/table duality chapter) gives you keyed state, exactly-once, and windowing, all without leaving the Kafka client library. Many pipelines that look "Flink-shaped" are happier as Kafka Streams jobs. The break-even comes when:
- You need to join more than two streams, or join a stream against an external source.
- Your state per worker exceeds RAM (Kafka Streams uses RocksDB but is less aggressive about memory tuning).
- You need to rescale the job's parallelism without rewriting topics (Flink rescales by redistributing key groups; Kafka Streams rescales by repartitioning topics, which is much more invasive).
- You need event-time processing with late data and side outputs at the level of detail Flink's
ProcessFunctionexposes.
Below those thresholds, Kafka Streams is simpler and ships less Java to your cluster. Above them, Flink is the answer.
Common confusions
-
"Flink keeps all state in RAM." No — by default it uses RocksDB on local SSD as the state backend, with an in-memory write-back cache. The pure
HashMapStateBackendexists but is recommended only when total state per worker fits comfortably in heap (under a few GB). Production jobs at any serious scale useEmbeddedRocksDBStateBackend. -
"Checkpoints and savepoints are the same thing." They share an on-disk format but are governed by different rules. Checkpoints are Flink-managed, used for crash recovery, automatically deleted. Savepoints are user-managed, used for upgrades and migrations, never auto-deleted. Mixing the two — for example, restoring a job from another job's checkpoint — will work but is unsupported.
-
"Exactly-once means each event is delivered once." It means each event's effect is observed once downstream. The same input event may be read from Kafka many times during recovery; the deduplication happens at the sink, not at the source. The exactly-once chapter is the prerequisite reading.
-
"Watermarks are stored separately from state." They are part of the operator's checkpointed state. If a job restores from a checkpoint, it restores the watermark too — otherwise the same window would fire twice (once at original watermark crossing, once on replay).
-
"
key_byshards the data; the operator code is unchanged." The operator code runs innparallel instances after akey_by, wherenis the operator parallelism. Each instance owns a key range (a slice of Flink's 128 default key groups). State is sharded with the keys; you cannot, insideprocess_element, see another key's state without an additionalconnectorbroadcast. -
"RocksDB is the bottleneck." It rarely is. The hot path is the JVM's serializer (Kryo or POJO) plus a memtable lookup. Production traces show RocksDB block-cache hit rates above 95% and per-event latencies under 10 microseconds for typical state. Bottlenecks are usually network shuffles between operators or a slow sink, not RocksDB.
Where this leads next
- Materialize and Differential Dataflow: databases as views — chapter 179: a different design point that takes the same Flink ideas and calls the streaming output a SQL view, queryable like Postgres.
- Change Data Capture: Debezium, logical decoding — chapter 180: where Flink-shaped pipelines get their input from in real systems — the database's own write-ahead log, exposed as a stream.
- Where this is all going: the database as a materialized view — chapter 181: the philosophical close of Build 23, taking Flink's "state is just a snapshotted aggregate over a log" all the way back to the stream/table duality.
References
- K. Mani Chandy and Leslie Lamport, Distributed Snapshots: Determining Global States of Distributed Systems (ACM TOCS, 1985) — the foundational paper Flink's checkpoint algorithm is built on. lamport.azurewebsites.net.
- Paris Carbone et al., State Management in Apache Flink: Consistent Stateful Distributed Stream Processing (VLDB, 2017) — the canonical paper describing Flink's asynchronous barrier snapshotting and the two-phase-commit sink. vldb.org.
- Apache Flink, Stateful Stream Processing — official documentation on state primitives, backends, and checkpointing. nightlies.apache.org/flink.
- Tyler Akidau et al., The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing (VLDB, 2015) — the Google paper whose vocabulary (event time, watermarks, triggers) Flink adopted. research.google.
- Martin Kleppmann, Designing Data-Intensive Applications (O'Reilly, 2017), Ch. 11 Stream Processing — accessible long-form treatment of streaming state, event time, and exactly-once. dataintensive.net.
- RocksDB, Tuning Guide — the reference for the underlying KV store Flink uses for state. github.com/facebook/rocksdb/wiki.
- Exactly-once semantics: how it actually works — internal — the mechanism Flink's transactional sinks rely on.
- Windowing, watermarks, and event time — internal — prerequisite for the time-ordering rules in this article.