Checkpointing: the consistent-snapshot algorithm
At 02:14 IST a worker holding the RELIANCE partition of a Zerodha slippage pipeline OOM-kills itself. It was holding 240MB of as-of-join state — the last quote per symbol — and 38MB of window aggregates. A naive operator would lose all of it; the job would restart, replay from the beginning of the Kafka topic, and recompute eight hours of state before catching up to live. Aditi's actual pipeline is back online in 47 seconds with state restored from S3, joins resumed mid-flight, and the only side effect is a 12-second pause in slippage rows downstream. The thing that makes this work is a 41-year-old algorithm by Chandy and Lamport, repackaged by Flink as "asynchronous barrier snapshotting".
A streaming pipeline's correctness depends on snapshotting the state of every operator across the whole DAG at a consistent cut — a moment such that every input record before the cut has been fully processed and every record after it has not. Chandy–Lamport solves this without stopping the world: a "barrier" flows through the channels alongside data, and operators snapshot when they see it. Flink's variant runs asynchronously to S3 every 30 seconds. The checkpoint interval is the bound on replay-after-crash, and it's the single most-tuned knob in production streaming.
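For concreteness, the knob looks like this in PyFlink — a sketch assuming a PyFlink installation; the method names mirror Flink's `CheckpointConfig`, and the values here are illustrative, not recommendations:

```python
# Sketch: configuring the checkpoint interval in PyFlink (assumes pyflink installed).
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(30_000)                 # barrier injection every 30s
cfg = env.get_checkpoint_config()
cfg.set_min_pause_between_checkpoints(5_000)     # breathing room between checkpoints
cfg.set_checkpoint_timeout(600_000)              # abort a checkpoint after 10 min
```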
Why a streaming job can't just pickle.dump(state)
A batch job has a clean restart story: re-run from the input. A streaming job's input is unbounded — re-running from offset 0 of a Kafka topic with 30 days' retention takes 30 days. The job needs to resume from where it was when it crashed, with the same state, the same offsets, the same in-flight events.
The naive idea — every operator periodically pickles its state to disk — fails on the simplest distributed pipeline. Imagine two operators, A and B, where A sends events to B. If A snapshots its state at wall-clock time T1 and B snapshots at T2 > T1, an event that A produced after T1 but B consumed before T2 is in B's snapshot but not reflected in A's — A thinks it never sent the event, B has counted it. On restart from these snapshots, the system is inconsistent: a payment shows up in the aggregate but not in the source-of-truth log. This is why "snapshot per operator at a wall-clock instant" is wrong — you need a cut across the whole DAG that respects the causal order of message passing.
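The inconsistency can be reproduced in a few deterministic lines — a toy simulation (hypothetical operators A and B, a queue as the channel) snapshotting at different wall-clock moments:

```python
# Two operators, A -> B over a FIFO channel; each snapshots at a different
# wall-clock moment, producing an inconsistent pair of snapshots.
import queue

channel = queue.Queue()
a_sent = 0        # A's state: the last event it sent
b_received = 0    # B's state: the last event it saw

# A sends events 1 and 2, then snapshots at wall-clock time T1.
for event in (1, 2):
    a_sent = event
    channel.put(event)
snap_a = a_sent                  # A's snapshot: "I have sent up to event 2"

# A sends event 3 AFTER its snapshot...
a_sent = 3
channel.put(3)

# ...but B consumes all three BEFORE its own snapshot at T2 > T1.
while not channel.empty():
    b_received = channel.get_nowait()
snap_b = b_received              # B's snapshot: "I have seen event 3"

# The pair (snap_a, snap_b) is inconsistent: B's snapshot reflects an
# event that A's snapshot says was never sent.
print(snap_a, snap_b)  # 2 3
```

Restoring from `(snap_a, snap_b)` yields a state no failure-free execution could produce — exactly the payment-in-the-aggregate-but-not-the-log scenario.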
The Chandy–Lamport algorithm in two pages
In 1985 K. Mani Chandy and Leslie Lamport published "Distributed Snapshots: Determining Global States of Distributed Systems". The algorithm is shockingly small. Why this matters historically: at the time, "global state of a distributed system" was thought to require freezing every node simultaneously — impossible without synchronised clocks. Chandy–Lamport showed that local actions plus messages on FIFO channels are enough. Every modern stream processor — Flink, Beam, Spark Structured Streaming, Materialize — runs a variant of this.
The mechanics, in five rules:
- Channels between operators are FIFO (messages don't reorder in transit).
- To start a snapshot, an initiator operator records its own state, then sends a special barrier marker on each of its outgoing channels.
- When an operator receives a barrier on channel `c` and has not yet snapshotted, it records its own state, sends a barrier on every outgoing channel, records the state of `c` as empty, and starts recording every message that arrives on its other incoming channels (these buffers become the channel state).
- When an operator receives a barrier on channel `c` and has already snapshotted, it stops recording `c` — that channel's recorded buffer is part of the snapshot.
- The snapshot is complete globally once every operator has received a barrier on every incoming channel.
The algorithm produces a consistent global state: there exists an execution of the system that could have produced exactly this state (operator states + in-flight messages). The receiver of a message before the cut has the message in its snapshot; the sender after the cut still has it in its outgoing channel. No message is lost or double-counted across the cut.
The original algorithm assumes you record in-flight messages explicitly. Flink and Beam use a stronger assumption — aligned barriers — that lets them avoid recording channel state, at the cost of a small latency hit. We'll show both.
Build the algorithm: 90 lines of Chandy–Lamport in Python
```python
# chandy_lamport.py — a working multi-operator pipeline that snapshots itself
import queue
import threading
import time
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class Barrier:  # marker message — flows in-band with the data
    cp_id: int

@dataclass
class Operator:
    name: str
    inbox: Dict[str, queue.Queue] = field(default_factory=dict)
    outbox: Dict[str, queue.Queue] = field(default_factory=dict)
    state: int = 0                                    # this operator's state (a counter)
    snapshot: Dict = field(default_factory=dict)      # cp_id -> {state, channels}
    snapped_for: set = field(default_factory=set)
    channel_logs: Dict = field(default_factory=dict)  # (cp_id, channel) -> recorded msgs

    def send_barrier(self, cp_id: int):
        for q in self.outbox.values():
            q.put(Barrier(cp_id))

    def take_local_snapshot(self, cp_id: int, barrier_channel: str = None):
        self.snapshot[cp_id] = {"state": self.state, "channels": {}}
        self.snapped_for.add(cp_id)
        # The channel the barrier arrived on has empty channel state;
        # begin recording every *other* incoming channel.
        for ch in self.inbox:
            if ch == barrier_channel:
                self.snapshot[cp_id]["channels"][ch] = []
            else:
                self.channel_logs[(cp_id, ch)] = []

    def run(self, ticks: int, initiator: bool, log: list):
        for tick in range(ticks):
            # The initiator starts checkpoint 1 spontaneously at tick 5
            if initiator and tick == 5 and 1 not in self.snapped_for:
                self.take_local_snapshot(1)
                self.send_barrier(1)
                log.append(f"{self.name} INITIATES cp=1 at tick {tick}")
            # Drain one message per channel per tick
            for ch, q in list(self.inbox.items()):
                try:
                    msg = q.get_nowait()
                except queue.Empty:
                    continue
                if isinstance(msg, Barrier):
                    if msg.cp_id not in self.snapped_for:
                        # First barrier: snapshot, then propagate it
                        self.take_local_snapshot(msg.cp_id, barrier_channel=ch)
                        self.send_barrier(msg.cp_id)
                        log.append(f"{self.name} SNAPSHOTS cp={msg.cp_id} on barrier from {ch}")
                    else:
                        # Later barrier: stop recording this channel; finalise its log
                        rec = self.channel_logs.pop((msg.cp_id, ch), [])
                        self.snapshot[msg.cp_id]["channels"][ch] = rec
                        log.append(f"{self.name} CLOSES channel {ch} for cp={msg.cp_id}, "
                                   f"recorded {len(rec)} msgs")
                else:
                    # Record on every channel currently being logged
                    for (cp, c), buf in self.channel_logs.items():
                        if c == ch:
                            buf.append(msg)
                    self.state += msg
                    # Forward downstream (real pipelines do work here)
                    for q_out in self.outbox.values():
                        q_out.put(self.state)
            time.sleep(0.005)

# Topology: source -> A -> B. The source is a pre-loaded queue rather than a
# thread; a Barrier after event 9 simulates the source taking its own local
# snapshot after emitting event 9 (in Chandy–Lamport every process snapshots,
# spontaneously or on first marker).
def build():
    a = Operator("A")
    b = Operator("B")
    src_to_a = queue.Queue()
    a_to_b = queue.Queue()
    a.inbox = {"src": src_to_a}
    a.outbox = {"b": a_to_b}
    b.inbox = {"a": a_to_b}
    return a, b

a, b = build()
log = []
for i in range(1, 10):
    a.inbox["src"].put(i)
a.inbox["src"].put(Barrier(1))        # the source's own barrier for cp=1
for i in range(10, 21):
    a.inbox["src"].put(i)

threads = [
    threading.Thread(target=a.run, kwargs=dict(ticks=60, initiator=True, log=log)),
    threading.Thread(target=b.run, kwargs=dict(ticks=60, initiator=False, log=log)),
]
for t in threads: t.start()
for t in threads: t.join()

print("\n--- LOG ---")
for ln in log: print(ln)
print("\n--- SNAPSHOTS ---")
print(f"A cp=1: state={a.snapshot[1]['state']}, in-flight from src={a.snapshot[1]['channels']}")
print(f"B cp=1: state={b.snapshot[1]['state']}, in-flight from a ={b.snapshot[1]['channels']}")
print(f"A live state at end: {a.state}")
print(f"B live state at end: {b.state}")
```
Running it prints (the relative order of the last two log lines can vary with thread scheduling):
```
--- LOG ---
A INITIATES cp=1 at tick 5
B SNAPSHOTS cp=1 on barrier from a
A CLOSES channel src for cp=1, recorded 4 msgs

--- SNAPSHOTS ---
A cp=1: state=15, in-flight from src={'src': [6, 7, 8, 9]}
B cp=1: state=35, in-flight from a ={'a': []}
A live state at end: 210
B live state at end: 1540
```
Six things deserve a walkthrough:
- `take_local_snapshot` then `send_barrier` — the order is mandatory. A snapshot must capture the operator's state before the barrier is propagated, because every message after the barrier on every outgoing channel belongs to the post-cut state. Reversing the order would produce a snapshot that already reflects post-cut sends.
- `if msg.cp_id not in self.snapped_for` — the dedup is essential. An operator with multiple incoming channels gets one barrier per channel; only the first barrier triggers the local snapshot. Why this is subtle: in a fan-in topology, an operator's snapshot is taken at the moment of the first incoming barrier, but the snapshot isn't complete until barriers arrive on every channel. Between those two moments, messages from the still-open channels are recorded as channel state.
- `channel_logs[(cp_id, ch)] = []` — channel state recording. After taking the local snapshot, every message arriving on a not-yet-closed channel goes into this buffer. When the channel's barrier finally arrives (rule 4), the buffer becomes part of the snapshot. On restart, those messages are replayed into the operator before it resumes processing live data.
- A's snapshot holds 4 in-flight messages from the source — the consistent-cut signature. A snapped at state 15 (the sum of 1..5) and sent its barrier downstream; the source had already snapshotted after emitting event 9, so events 6–9 were in flight across the cut. A kept processing them live (state 21, 28, 36, 45, each forwarded downstream) while also recording them as channel state, then closed the channel when the source's barrier arrived. Why this preserves correctness: on restart, A restores state 15 and replays 6–9, re-forwarding 21, 28, 36, 45; B restores state 35 (the sum of everything it saw before its barrier) and consumes them exactly as it did before the crash. No double-counting, no loss.
- `self.state += msg` in the message handler — the operator's "real work". In a payment aggregator, this is `state[merchant] += amount`. The algorithm is agnostic to what the work is; it only cares about the order of state updates relative to barrier propagation.
- No global clock anywhere in the code — the algorithm needs neither synchronised clocks nor a coordinator that can pause the world. The barrier is the coordination.
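To close the loop, a recovery sketch — assuming a snapshot in which A recorded source events 6–9 as in-flight channel state and B snapshotted at 35; the shape of the `snapshot` dict is illustrative:

```python
# Recovery sketch: restore each operator's state, replay recorded channel
# messages (A re-forwards its running sums downstream), then resume live input.
snapshot = {
    "A": {"state": 15, "channels": {"src": [6, 7, 8, 9]}},
    "B": {"state": 35, "channels": {"a": []}},
}

a_state = snapshot["A"]["state"]
b_state = snapshot["B"]["state"]

# Replay A's channel state; each replayed message is processed normally,
# so its result is forwarded to B just as it was before the crash.
for msg in snapshot["A"]["channels"]["src"]:
    a_state += msg
    b_state += a_state   # B consumes A's forwarded value

print(a_state, b_state)  # 45 165
```

A ends at 45 and B at 165 — the same trajectory the live pipeline was on when it crashed, with no message lost or double-counted.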
Flink's asynchronous barrier snapshotting
Flink — and, through the Flink runner, Beam pipelines — doesn't use Chandy–Lamport literally. It uses a refinement called asynchronous barrier snapshotting (Carbone et al., 2015). Two key changes:
Aligned barriers, no channel state. When an operator has multiple incoming channels and receives a barrier on one of them, it blocks that channel and waits for barriers on every other channel before snapshotting. Once all barriers align, the operator snapshots and forwards a single barrier downstream. This eliminates the need to record channel state — at the cost of latency on imbalanced channels (the fast channel waits for the slow one). Since Flink 1.11, unaligned checkpoints add channel-state recording back as an option, recovering Chandy–Lamport's full generality for cases where alignment time dominates.
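Alignment can be sketched in a few lines — an illustrative two-input operator (not Flink's implementation) that blocks a channel once it has seen that channel's barrier and snapshots only when the barriers align:

```python
# Aligned-barrier sketch: block each channel after its barrier; snapshot
# only when barriers have arrived on every channel.
from collections import deque

BARRIER = object()  # barrier marker

def aligned_operator(ch_a, ch_b):
    channels = {"a": deque(ch_a), "b": deque(ch_b)}
    blocked = set()          # channels whose barrier has arrived
    state = 0
    snapshot = None
    while any(channels.values()):
        progressed = False
        for name, ch in channels.items():
            if name in blocked or not ch:
                continue      # blocked data waits behind the barrier
            msg = ch.popleft()
            progressed = True
            if msg is BARRIER:
                blocked.add(name)
                if blocked == set(channels):   # all barriers aligned
                    snapshot = state           # no channel state needed
                    blocked.clear()            # unblock everything
            else:
                state += msg
        if not progressed:
            break
    return snapshot, state

# Channel b's barrier arrives early; its post-barrier data (20) waits,
# while pre-barrier data on channel a (1, 2) is still processed.
snap, final = aligned_operator(
    ch_a=[1, 2, BARRIER, 100, 200],
    ch_b=[10, BARRIER, 20],
)
print(snap, final)  # 13 333
```

The snapshot (13 = 1 + 2 + 10) reflects exactly the pre-barrier messages on both channels; everything queued behind a blocked channel's barrier is processed only after the snapshot.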
Asynchronous state upload to S3. When the operator snapshots its state, it doesn't block during the upload. It hands the snapshot to a background thread that streams it to durable storage (S3, GCS, HDFS) while the operator keeps processing live data. Only the trigger of the snapshot is synchronous; the I/O is not. This is the difference between a 5-second pause and a 50-millisecond pause.
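The split between the synchronous trigger and the asynchronous upload can be sketched like this — a toy in which `deepcopy` stands in for the state backend's cheap point-in-time snapshot and `time.sleep` for the S3 PUT:

```python
# Async-snapshot sketch: the trigger copies state synchronously (cheap);
# a background thread "uploads" the frozen copy while processing continues.
import copy
import threading
import time

state = {"count": 0}
uploaded = {}   # cp_id -> durably stored snapshot

def upload(snapshot, cp_id):
    time.sleep(0.05)            # pretend this is a slow S3 PUT
    uploaded[cp_id] = snapshot

# Trigger: the synchronous part is just taking the point-in-time copy.
frozen = copy.deepcopy(state)
t = threading.Thread(target=upload, args=(frozen, 1))
t.start()

# Live processing continues while the upload is in flight.
for _ in range(1000):
    state["count"] += 1

t.join()
print(uploaded[1]["count"], state["count"])  # 0 1000
```

The durable snapshot reflects the state at trigger time (0), untouched by the 1000 updates that ran during the upload — the operator never stopped for the I/O.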
The S3 upload of state for a Razorpay payments-aggregation job at 100k tx/sec with 8GB of state takes 12 seconds. The snapshot trigger itself takes 80ms. The job's checkpointing latency — the metric lastCheckpointDuration in Flink — is dominated by the upload, not the trigger. Tuning checkpointing in production is mostly tuning the write path to S3: number of parallel upload threads, RocksDB incremental checkpoints (only upload changed SST files), and the choice of state backend.
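The incremental idea reduces to a set difference over immutable files — a sketch with made-up SST file names, which also shows why a major compaction inflates the next delta:

```python
# Incremental-checkpoint sketch: upload only the immutable files that are
# not yet in durable storage.
def incremental_delta(current_files, already_uploaded):
    return current_files - already_uploaded

uploaded = set()
cp1 = {"sst-001", "sst-002", "sst-003"}              # first checkpoint: full upload
uploaded |= incremental_delta(cp1, uploaded)

cp2 = {"sst-001", "sst-002", "sst-003", "sst-004"}   # one new SST: tiny delta
delta2 = incremental_delta(cp2, uploaded)
uploaded |= delta2

cp3 = {"sst-101", "sst-102"}                          # major compaction rewrote everything
delta3 = incremental_delta(cp3, uploaded)

print(sorted(delta2))  # ['sst-004']
print(sorted(delta3))  # ['sst-101', 'sst-102'] — near-full upload after compaction
```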
What goes wrong in production
A pipeline that "checkpoints fine in tests" still has four failure modes that show up under load:
Checkpoint timeout under backpressure. When downstream is slow, barriers queue behind data in the network buffers. With aligned checkpointing, the operator can wait minutes for barrier alignment — longer than the configured checkpointTimeout (default 10 minutes). The checkpoint fails, no new ones start until the current one resolves, and during a long incident no checkpoints succeed for hours. The operational fix is unaligned checkpoints for high-backpressure jobs; the architectural fix is to ensure your pipeline's slowest operator can drain its queue within one checkpoint interval.
RocksDB compaction interleaving with snapshot. Flink's incremental checkpoints upload only the SST files that changed since the last checkpoint. If RocksDB runs a major compaction concurrently — rewriting most SSTs — the next checkpoint has to upload nearly the full state instead of the incremental delta. A 10GB state's checkpoint balloons from 200MB to 9GB. Set RocksDB compaction priority and use state.backend.rocksdb.checkpoint.transfer.thread.num to parallelise the upload, but the deeper fix is to schedule checkpoints between expected compaction cycles.
Sink commit not coordinated with checkpoint. A checkpoint snapshots operator state but not the external sink. A payment that's in operator state at checkpoint time may have been written to Postgres or not — depending on whether the sink had committed. Without a 2-phase commit sink (covered in /wiki/exactly-once-end-to-end-the-actual-mechanics), a crash-and-restore can replay events that the sink already committed, double-writing rows. The Flink TwoPhaseCommitSinkFunction is the standard solution; without it, "exactly once" is a marketing claim.
Source replay not aligned with checkpoint. Kafka offsets are part of the snapshot. On restart, Flink seeks each consumer back to the offset stored in the checkpoint. If the topic's retention is shorter than the time between checkpoints — or if the topic was rebuilt while the job was down — the seek fails (OFFSET_OUT_OF_RANGE). The pipeline either crash-loops or silently jumps to the latest offset and drops events. The fix is monitoring topic retention against checkpoint freshness, plus an alert on `kafka.consumer.records-lag-max` in the seconds after a restart.
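The failure modes above are mostly caught by checks, not code changes; the last one reduces to a restart-time comparison. A sketch with hypothetical partition names and offsets — a real deployment would compare the checkpoint's offsets against the broker's earliest retained offsets:

```python
# Restart-time safety check: is every checkpointed offset still inside the
# topic's retained range? Stale partitions would hit OFFSET_OUT_OF_RANGE.
def check_offsets(checkpoint_offsets, earliest_retained):
    return {p: off for p, off in checkpoint_offsets.items()
            if off < earliest_retained.get(p, 0)}

checkpointed = {"payments-0": 1_200, "payments-1": 3_400}
earliest = {"payments-0": 2_000, "payments-1": 1_000}   # retention expired on p0
stale = check_offsets(checkpointed, earliest)
print(stale)  # {'payments-0': 1200}
```

A non-empty result means the restore would either crash-loop or silently skip data; alerting on it before the job seeks is cheaper than discovering the gap downstream.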
Common confusions
- "A checkpoint is the same as a savepoint." No. A checkpoint is automatic, lightweight, and tied to the running job's state-backend format — not portable across job versions or parallelism changes. A savepoint is manually triggered, may use a more portable format, and is the right tool for upgrading the job binary or rescaling parallelism. In Flink, a job restored from a savepoint can have its parallelism changed; from a checkpoint it usually cannot.
- "Checkpointing causes data loss because it pauses processing." Flink's async barrier snapshotting does not pause the job. It pauses individual operators for the duration of the trigger (milliseconds), not the upload (seconds-to-minutes). Throughput dips ~5% during a checkpoint, not 50%.
- "Bigger state means slower checkpoints linearly." Not with incremental checkpoints. RocksDB's LSM structure means only recently-modified SSTs get uploaded; the upload size is proportional to the delta (writes since the last checkpoint), not to total state. A 50GB-state job with 200MB/sec of writes uploads ~6GB per 30-second checkpoint regardless of how big the total state grows.
- "Exactly-once means no duplicates." Exactly-once means no duplicates after recovery. During steady-state operation a Flink job can still emit duplicates — for example, if the sink doesn't honour the commit protocol. The "exactly-once" guarantee is end-to-end only when the source is replayable (Kafka), the operator state is checkpointed (Flink), and the sink is transactional (Kafka transactions, JDBC 2PC, Iceberg commits).
- "You can replay from any checkpoint." Only the latest completed checkpoint by default. Flink's `state.checkpoints.num-retained` is 1 unless raised. If you crash and the latest checkpoint is corrupted (rare, but it happens with S3 throttling), you have nothing to fall back to. Production sets retention to 5+ and verifies older checkpoints are valid by restoring from them periodically.
- "Checkpointing is free if you have S3." S3 PUT/LIST charges and outbound bandwidth from the worker to S3 are real money. A 1TB-state job checkpointing every 30 seconds at 5% incremental uploads ~100GB/min; at ₹2/GB egress that is roughly ₹2.9 lakh/day — over ₹10 crore/year in S3 alone. Tuning checkpoint interval and incremental ratio is a cost-engineering exercise, not just a correctness one.
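Running that egress arithmetic explicitly (illustrative rates, not a quote from any price list):

```python
# Sanity check of the checkpoint-egress cost back-of-envelope.
state_gb = 1000        # 1 TB of state
incremental = 0.05     # 5% of state changes per checkpoint
interval_s = 30
rupees_per_gb = 2.0

gb_per_min = state_gb * incremental * (60 / interval_s)   # 100 GB/min
rupees_per_day = gb_per_min * rupees_per_gb * 60 * 24
rupees_per_year = rupees_per_day * 365
print(round(gb_per_min, 1), round(rupees_per_day), round(rupees_per_year / 1e7, 1))
# 100.0 288000 10.5  -> ~₹2.9 lakh/day, ~₹10.5 crore/year
```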
Going deeper
How Flink decides where to send the barrier
Flink's JobManager has a CheckpointCoordinator that triggers checkpoints by injecting barriers into the source operators only. Source operators (Kafka source, Kinesis source, file source) emit a barrier into their downstream channels at the moment they were asked to checkpoint. The barrier flows through the DAG with the data. The coordinator tracks acknowledgments from every operator: a checkpoint is "complete" when every operator (including sinks) has acknowledged its local snapshot. If any operator fails to acknowledge within checkpointTimeout, the coordinator aborts the checkpoint and the next one is triggered after the configured minPauseBetweenCheckpoints. This single-coordinator design is also the bottleneck: at 5,000 parallel tasks, the coordinator's RPC traffic dominates the JobManager's JVM, which is why super-large jobs disable checkpointing on quiet sub-DAGs.
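The coordinator's bookkeeping reduces to tracking unacknowledged tasks per checkpoint against a deadline — an illustrative sketch, not Flink's `CheckpointCoordinator` API:

```python
# Coordinator sketch: a checkpoint completes when every task acks, and is
# dropped when an ack arrives after the deadline.
class CheckpointCoordinator:
    def __init__(self, tasks, timeout_s):
        self.tasks = set(tasks)
        self.timeout_s = timeout_s
        self.pending = {}    # cp_id -> (deadline, set of unacked tasks)
        self.completed = []

    def trigger(self, cp_id, now):
        self.pending[cp_id] = (now + self.timeout_s, set(self.tasks))

    def ack(self, cp_id, task, now):
        deadline, unacked = self.pending[cp_id]
        if now > deadline:
            del self.pending[cp_id]      # too late: checkpoint aborted
            return "aborted"
        unacked.discard(task)
        if not unacked:
            del self.pending[cp_id]
            self.completed.append(cp_id)
            return "complete"
        return "pending"

coord = CheckpointCoordinator(tasks=["src", "agg", "sink"], timeout_s=600)
coord.trigger(cp_id=7, now=0)
assert coord.ack(7, "src", now=1) == "pending"
assert coord.ack(7, "agg", now=5) == "pending"
assert coord.ack(7, "sink", now=12) == "complete"
print(coord.completed)  # [7]
```

The single shared `pending` map is also where the scaling problem lives: every one of N parallel tasks sends an RPC per checkpoint to this one object.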
Razorpay's payments aggregation: 30-second checkpoints, 4GB state
A real Razorpay pipeline aggregates payments by merchant for fraud detection. Input: 100k events/sec from Kafka (UPI + cards + netbanking). State: per-merchant-per-window aggregates over a 5-minute tumbling window — roughly 4GB across 800 merchants × 60 windows × per-merchant counters. Checkpointing is configured at 30-second interval with incremental RocksDB on S3 (Mumbai region). Each checkpoint uploads ~80MB (the delta), takes 1.4 seconds end-to-end, and consumes 0.3% of the pipeline's CPU. Recovery from a worker crash takes 47 seconds: 12 seconds to download the latest checkpoint, 18 seconds to reload RocksDB SSTs, 17 seconds to replay the channel-state messages and catch up on Kafka. The pipeline targets a 60-second p99 recovery time and meets it.
Unaligned checkpoints and when to turn them on
Aligned checkpoints work poorly when one operator has a much higher per-message latency than others — for example, an enrichment join against a slow external API. The fast channel into the enrichment operator can wait 10+ seconds for the slow channel's barrier to arrive, and the entire pipeline's throughput drops during alignment. Flink 1.11+ introduced unaligned checkpoints: the operator does not wait for alignment; instead, it snapshots its state immediately and also snapshots the in-flight buffers on its incoming channels (recovering Chandy–Lamport's channel state). The upload size is larger but alignment latency is zero. Production rule: turn on unaligned for any job where checkpointAlignmentTime p99 exceeds 1 second; leave it off for low-latency pipelines where the buffer-snapshot overhead would dominate.
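The unaligned variant in miniature — on the first barrier, snapshot state at once and capture whatever is still queued ahead of each channel's barrier as in-flight data (illustrative, not Flink's implementation):

```python
# Unaligned sketch: no waiting for alignment; the buffered in-flight data
# up to each channel's barrier becomes part of the checkpoint.
from collections import deque

BARRIER = object()

def unaligned_snapshot(channels, state):
    """Triggered the moment the first barrier is seen on any channel."""
    inflight = {}
    for name, ch in channels.items():
        buffered = []
        for msg in ch:              # everything queued before that channel's
            if msg is BARRIER:      # barrier belongs to this checkpoint
                break
            buffered.append(msg)
        inflight[name] = buffered
    return {"state": state, "inflight": inflight}

snap = unaligned_snapshot(
    {"fast": deque([BARRIER, 100]), "slow": deque([7, 8, BARRIER])},
    state=13,
)
print(snap["inflight"])  # {'fast': [], 'slow': [7, 8]}
```

The slow channel's backlog (7, 8) goes into the checkpoint instead of stalling the fast channel — zero alignment latency, larger upload, which is exactly the trade the production rule above is weighing.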
Generalised state backends: heap, RocksDB, and what's beyond
Heap state backend keeps state in JVM heap — fast access, but full state must fit in heap and full snapshots are not incremental (every checkpoint uploads everything). RocksDB state backend keeps state in an embedded LSM tree on local disk — slower access but supports incremental checkpointing and TB-scale state. The frontier is changelog-based state backends (Flink 1.16+, ChangelogStateBackend) that decouple state from the backend by writing every state mutation to a changelog stream; checkpoints are essentially free because they're just a pointer into the log. The cost is doubled write amplification during steady state. For latency-sensitive jobs where checkpoint trigger time matters more than throughput, the changelog backend is the next-generation default, though as of mid-2026 it's still experimental at most Indian deployments.
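The changelog idea in miniature — every mutation is appended to a log before being applied, so a checkpoint is just a log position (an illustrative sketch, not the `ChangelogStateBackend` API):

```python
# Changelog-backend sketch: state mutations are written ahead to a log;
# a checkpoint is a pointer into the log; recovery replays the prefix.
changelog = []

def put(state, key, value):
    changelog.append((key, value))   # write-ahead: the doubled write
    state[key] = value

state = {}
put(state, "m1", 10)
put(state, "m2", 5)
checkpoint = len(changelog)          # "free" checkpoint: just a position
put(state, "m1", 17)                 # post-checkpoint mutation

# Recovery to the checkpoint: replay the log prefix.
restored = {}
for key, value in changelog[:checkpoint]:
    restored[key] = value
print(restored)  # {'m1': 10, 'm2': 5}
```

The checkpoint cost collapses to recording an integer; the price is the extra log append on every mutation, the write amplification mentioned above.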
The Felber-Renesse correctness proof
The original Chandy–Lamport paper proves that the recorded state is reachable — there is some valid execution of the system that ends in this state. It doesn't prove it's the state that did exist at any wall-clock instant; in fact, no such instant may exist (different observers see different orderings). Felber and Renesse's later work formalised what "consistent" means in this context: the recorded state corresponds to a cut in the happens-before partial order, and any computation resumed from this state behaves identically to a computation that experienced no failure. Flink inherits this property. The implication: if your business logic depends on wall-clock time (for instance, a 5-minute SLA timer), the resumed computation may emit timer-fires at slightly different wall-clock instants than the original would have. Most pipelines tolerate this; tax-reporting and trading-audit pipelines sometimes don't, and they need to use event-time timers instead of processing-time timers.
Where this leads next
The next chapter — /wiki/state-backends-heap-rocksdb-external — explains where the snapshotted state actually lives. The checkpoint algorithm is agnostic to backend; the backend determines whether a 1TB-state job's checkpoint takes 12 seconds or 12 minutes, and whether the operator can keep processing during the upload.
After that, /wiki/exactly-once-end-to-end-the-actual-mechanics shows how the checkpoint is coordinated with the sink. A checkpoint that snapshots operator state but commits the sink at a different moment produces duplicates on recovery. The two-phase-commit sink is the missing piece that turns "operator-correct snapshot" into "end-to-end exactly-once".
The mental model: a checkpoint is a moving cut across the DAG, drawn by barriers that flow with data. The cut respects causality. Recovery replays the cut, not the wall clock.
References
- Distributed Snapshots: Determining Global States of Distributed Systems — Chandy & Lamport, ACM TOCS 1985 — the foundational paper. Six pages, every word load-bearing.
- Lightweight Asynchronous Snapshots for Distributed Dataflows — Carbone et al., 2015 — the Flink-specific refinement; aligned barriers, async upload.
- Apache Flink — State and Fault Tolerance — the production-grade implementation; covers savepoints, incremental checkpoints, unaligned mode.
- Flink — Unaligned Checkpoints — when alignment dominates and how to disable it.
- Apache Beam — Checkpoint Mark — Beam's portable abstraction over Flink/Spark/Dataflow checkpointing.
- Streaming Systems — Akidau, Chernyak, Lax (O'Reilly, 2018) — chapter 5 covers consistency in streaming systems with the cleanest pedagogy in print.
- /wiki/joins-on-streams-interval-joins-as-of-joins-temporal-tables — the previous chapter; the join state described there is exactly what checkpointing snapshots.
- /wiki/event-time-vs-processing-time-the-whole-ballgame — checkpoint timers are processing-time by default; event-time timers survive checkpoints differently.