Checkpointing: the consistent-snapshot algorithm

At 02:14 IST a worker holding the RELIANCE partition of a Zerodha slippage pipeline OOM-kills itself. It was holding 240MB of as-of-join state — the last quote per symbol — and 38MB of window aggregates. A naive operator would lose all of it; the job would restart, replay from the beginning of the Kafka topic, and recompute eight hours of state before catching up to live. Aditi's actual pipeline is back online in 47 seconds with state restored from S3, joins resumed mid-flight, and the only side effect is a 12-second pause in slippage rows downstream. The thing that makes this work is a 41-year-old algorithm by Chandy and Lamport, repackaged by Flink as "asynchronous barrier snapshotting".

A streaming pipeline's correctness depends on snapshotting the state of every operator across the whole DAG at a consistent cut — a moment such that every input record before the cut has been fully processed and every record after it has not. Chandy–Lamport solves this without stopping the world: a "barrier" flows through the channels alongside data, and operators snapshot when they see it. Flink's variant runs asynchronously to S3 every 30 seconds. The checkpoint interval is the bound on replay-after-crash, and it's the single most-tuned knob in production streaming.

Why a streaming job can't just pickle.dump(state)

A batch job has a clean restart story: re-run from the input. A streaming job's input is unbounded — re-running from offset 0 of a Kafka topic with 30 days' retention takes 30 days. The job needs to resume from where it was when it crashed, with the same state, the same offsets, the same in-flight events.

The naive idea — every operator periodically pickles its state to disk — fails on the simplest distributed pipeline. Imagine two operators, A and B, where A sends events to B. If A snapshots its state at wall-clock time T1 and B snapshots at T2 > T1, an event that A produced after T1 but B consumed before T2 is in B's snapshot but not reflected in A's — A thinks it never sent the event, B has counted it. On restart from these snapshots, the system is inconsistent: a payment shows up in the aggregate but not in the source-of-truth log. This is why "snapshot per operator at a wall-clock instant" is wrong — you need a cut across the whole DAG that respects the causal order of message passing.
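The failure is easy to reproduce without any distributed machinery. A minimal sketch, with toy counters standing in for operator state and a list standing in for the channel:

```python
# Sketch of why per-operator wall-clock snapshots are inconsistent.
# A's state counts events sent; B's state counts events received.
channel = []
a_sent = 0
b_received = 0

def a_send():
    global a_sent
    channel.append("event")
    a_sent += 1

def b_recv():
    global b_received
    b_received += channel.pop(0) == "event"

a_send()                 # event 1 flows A -> B
b_recv()
snap_a = a_sent          # A snapshots at wall-clock T1: "I sent 1 event"
a_send()                 # event 2 is produced after T1 ...
b_recv()                 # ... but B consumes it before T2
snap_b = b_received      # B snapshots at T2: "I received 2 events"

# (snap_a, snap_b) = (1, 2): per the snapshots, B received an event
# that A never sent — a global state the system was never in.
assert (snap_a, snap_b) == (1, 2)
```

Restarting from this pair of snapshots leaves event 2 counted downstream but unsent upstream — exactly the payment-in-the-aggregate-but-not-in-the-log inconsistency described above.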

[Figure: a consistent cut vs an inconsistent cut. Two operator timelines, A and B, with messages flowing between them. Top: an inconsistent cut — A and B snapshot at different points, the cut crosses an in-flight message at t=420, and that message is lost on restart. Bottom: a consistent cut drawn by a barrier, where every message sent before the cut is in B's snapshot.]
The top cut crosses an in-flight message — A's snapshot says "I sent it"; B's snapshot does not contain it. The bottom cut is a vertical line that respects every message's causal order: every send-before-cut has a matching receive-before-cut. That is what Chandy–Lamport guarantees.

The Chandy–Lamport algorithm in two pages

In 1985 K. Mani Chandy and Leslie Lamport published "Distributed Snapshots: Determining Global States of Distributed Systems". The algorithm is shockingly small. Why this matters historically: at the time, "global state of a distributed system" was thought to require freezing every node simultaneously — impossible without synchronised clocks. Chandy–Lamport showed that local actions plus messages on FIFO channels are enough. Flink runs a direct variant of it; Beam pipelines on the Flink runner inherit that variant, and systems like Spark Structured Streaming and Materialize attack the same consistent-cut problem with their own machinery.

The mechanics, in five rules:

  1. Channels between operators are FIFO (messages don't reorder in transit).
  2. To start a snapshot, an initiator operator records its own state, then sends a special barrier marker on each of its outgoing channels.
  3. When an operator receives a barrier on channel c and has not yet snapshotted, it records its own state, records channel c's state as empty, sends a barrier on every outgoing channel, and then starts recording every message that arrives on its other incoming channels (these become the channel state).
  4. When an operator receives a barrier on channel c and has already snapshotted, it stops recording channel c — that channel's recorded buffer is part of the snapshot.
  5. The snapshot is complete globally once every operator has received a barrier on every incoming channel.

The algorithm produces a consistent global state: there exists an execution of the system that could have produced exactly this state (operator states + in-flight messages). The receiver of a message before the cut has the message in its snapshot; the sender after the cut still has it in its outgoing channel. No message is lost or double-counted across the cut.

The original algorithm assumes you record in-flight messages explicitly. Flink and Beam use a stronger assumption — aligned barriers — that lets them avoid recording channel state, at the cost of a small latency hit. We'll show both.

Build the algorithm: 90 lines of Chandy–Lamport in Python

# chandy_lamport.py — a working multi-operator pipeline that snapshots itself
import threading, queue, time, copy
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class Barrier:  # Marker message — flows in band with data
    cp_id: int

@dataclass
class Operator:
    name: str
    inbox: Dict[str, "queue.Queue"] = field(default_factory=dict)
    outbox: Dict[str, "queue.Queue"] = field(default_factory=dict)
    state: int = 0                         # this operator's state (a counter)
    snapshot: Dict = field(default_factory=dict)  # cp_id -> {state, channel_logs}
    snapped_for: set = field(default_factory=set)
    channel_logs: Dict = field(default_factory=dict)

    def send_barrier(self, cp_id: int):
        for ch, q in self.outbox.items():
            q.put(Barrier(cp_id))

    def take_local_snapshot(self, cp_id: int, barrier_ch: str = None):
        self.snapshot[cp_id] = {"state": self.state, "channels": {}}
        self.snapped_for.add(cp_id)
        for ch in self.inbox:
            if ch == barrier_ch:
                # Rule 3: the channel the barrier arrived on is empty at the cut
                self.snapshot[cp_id]["channels"][ch] = []
            else:
                # Begin recording every *other* incoming channel
                self.channel_logs[(cp_id, ch)] = []

    def run(self, ticks: int, initiator: bool, log: list):
        for tick in range(ticks):
            # Initiator starts a checkpoint at tick 5
            # (rule 2: snapshot first, then barrier on every outgoing channel)
            if initiator and tick == 5 and 1 not in self.snapped_for:
                self.take_local_snapshot(1)
                self.send_barrier(1)
                log.append(f"{self.name} INITIATES cp=1 at tick {tick}")
            # Drain one message per channel per tick
            for ch, q in list(self.inbox.items()):
                try: msg = q.get_nowait()
                except queue.Empty: continue
                if isinstance(msg, Barrier):
                    if msg.cp_id not in self.snapped_for:
                        # Rule 3: first barrier — snapshot, forward, record others
                        self.take_local_snapshot(msg.cp_id, barrier_ch=ch)
                        self.send_barrier(msg.cp_id)
                        log.append(f"{self.name} SNAPSHOTS cp={msg.cp_id} on barrier from {ch}")
                    else:
                        # Rule 4: stop recording this channel; finalise its log
                        rec = self.channel_logs.pop((msg.cp_id, ch), [])
                        self.snapshot[msg.cp_id]["channels"][ch] = rec
                        log.append(f"{self.name} CLOSES channel {ch} for cp={msg.cp_id}, "
                                    f"recorded {len(rec)} msgs")
                else:
                    # Record on every channel currently being logged
                    for (cp, c), buf in self.channel_logs.items():
                        if c == ch: buf.append(msg)
                    self.state += msg
                    # Forward downstream (real pipelines do work here)
                    for q_out in self.outbox.values():
                        q_out.put(self.state)
            time.sleep(0.005)

# Topology: source -> A -> B
def build():
    src = Operator("source"); a = Operator("A"); b = Operator("B")
    src_to_a = queue.Queue(); a_to_b = queue.Queue()
    src.outbox = {"a": src_to_a}; a.inbox = {"src": src_to_a}
    a.outbox = {"b": a_to_b};     b.inbox  = {"a":   a_to_b}
    return src, a, b

src, a, b = build()
log = []
# Pre-load the source with 20 events so it just emits them
for i in range(1, 21): src.outbox["a"].put(i)
threads = [
    threading.Thread(target=a.run, kwargs=dict(ticks=30, initiator=True,  log=log)),
    threading.Thread(target=b.run, kwargs=dict(ticks=30, initiator=False, log=log)),
]
for t in threads: t.start()
for t in threads: t.join()

# Demo simplification: the passive source never runs the protocol, so no
# barrier ever closes A's recording of the src channel. Close it manually:
# it holds exactly the messages in flight from source to A at the cut.
a.snapshot[1]["channels"]["src"] = a.channel_logs.pop((1, "src"), [])

print("\n--- LOG ---")
for ln in log: print(ln)
print("\n--- SNAPSHOTS ---")
print(f"A cp=1: state={a.snapshot[1]['state']}, in-flight from src={a.snapshot[1]['channels']}")
print(f"B cp=1: state={b.snapshot[1]['state']}, in-flight from a  ={b.snapshot[1]['channels']}")
print(f"A live state at end: {a.state}")
print(f"B live state at end: {b.state}")
A representative run prints (the snapshot values are deterministic; the tail of the live run depends on thread timing):

--- LOG ---
A INITIATES cp=1 at tick 5
B SNAPSHOTS cp=1 on barrier from a

--- SNAPSHOTS ---
A cp=1: state=15, in-flight from src={'src': [6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]}
B cp=1: state=35, in-flight from a  ={'a': []}
A live state at end: 210
B live state at end: 1540

Four things in that run deserve a walkthrough:

  1. The initiator records its own state before sending any barrier (rule 2), so nothing it does afterwards can leak into its snapshot.
  2. FIFO channels do the heavy lifting: every message A forwards before the barrier reaches B before the barrier does, so B's snapshot reflects exactly A's pre-cut sends — no coordination, no clocks.
  3. A recorded channel log is the in-flight state of the cut: events already emitted upstream but not yet processed downstream. On recovery, an operator restores its state and replays its channel logs before consuming new input.
  4. The operator states plus channel logs form a consistent global state: every event is accounted for exactly once across the cut, either inside an operator's counter or inside a channel log.

Flink's asynchronous barrier snapshotting

Flink (and, via the Flink runner, Beam) doesn't use Chandy–Lamport literally. It uses a refinement called asynchronous barrier snapshotting (Carbone et al., 2015). Two key changes:

Aligned barriers, no channel state. When an operator has multiple incoming channels and receives a barrier on one of them, it blocks that channel and waits for barriers on every other channel before snapshotting. Once all barriers align, the operator snapshots and forwards them downstream. This eliminates the need to record channel state — at the cost of latency on imbalanced channels (the fast channel waits for the slow one). Since Flink 1.11, unaligned checkpoints add channel-state recording back as an option, recovering Chandy–Lamport's full generality for cases where alignment time dominates.
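Alignment can be sketched in a few lines. This is a toy, single-threaded model — not Flink's implementation — with run_aligned as a hypothetical helper; forwarding the barrier downstream is omitted:

```python
# Toy model of aligned barrier snapshotting: an operator with two input
# channels blocks (buffers) the channel whose barrier arrived first, and
# only snapshots once barriers have arrived on both. No channel state
# needs recording — the snapshot is just the operator state.
from collections import deque

BARRIER = "BARRIER"

def run_aligned(ch1, ch2):
    channels = {1: deque(ch1), 2: deque(ch2)}
    blocked = {1: False, 2: False}
    buffered = {1: deque(), 2: deque()}   # data held back behind a barrier
    state, snapshot = 0, None
    while any(channels.values()):
        for cid, ch in channels.items():
            if not ch:
                continue
            if blocked[cid]:              # barrier already seen: hold data back
                buffered[cid].append(ch.popleft())
                continue
            msg = ch.popleft()
            if msg == BARRIER:
                blocked[cid] = True
                if all(blocked.values()):     # alignment complete
                    snapshot = state          # channels are "empty" at the cut
                    blocked = {1: False, 2: False}
                    for b in buffered.values():   # replay held-back data
                        while b:
                            state += b.popleft()
            else:
                state += msg
    return state, snapshot

# Channel 1's barrier arrives early; its post-barrier record (4) is
# buffered until channel 2's barrier shows up.
state, snap = run_aligned([1, BARRIER, 4], [2, 3, BARRIER])
assert (state, snap) == (10, 6)   # snapshot = 1+2+3, all pre-barrier input
```

The assertion shows the alignment cost in miniature: record 4 sat in a buffer doing nothing until the slow channel's barrier arrived.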

Asynchronous state upload to S3. When the operator snapshots its state, it doesn't block during the upload. It hands the snapshot to a background thread that streams it to durable storage (S3, GCS, HDFS) while the operator keeps processing live data. Only the trigger of the snapshot is synchronous; the I/O is not. This is the difference between a 5-second pause and a 50-millisecond pause.
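A minimal sketch of the hand-off, with a hypothetical upload_to_s3 standing in for the real durable write:

```python
# Sketch of asynchronous snapshot upload: the synchronous part is only
# pinning a consistent copy of the state; the slow durable write happens
# on a background thread while the operator keeps mutating live state.
import threading, copy, time

uploaded = []

def upload_to_s3(snapshot):      # stand-in for the real S3/GCS/HDFS write
    time.sleep(0.05)             # pretend this is the slow part
    uploaded.append(snapshot)

state = {"merchant_42": 1700}

snapshot = copy.deepcopy(state)  # synchronous: O(state), no network I/O
t = threading.Thread(target=upload_to_s3, args=(snapshot,))
t.start()

state["merchant_42"] += 300      # live processing continues during upload

t.join()
assert uploaded == [{"merchant_42": 1700}]   # snapshot saw the cut, not later writes
assert state == {"merchant_42": 2000}
```

Real backends avoid the deep copy (RocksDB pins immutable SST files instead), but the division of labour — cheap synchronous capture, expensive asynchronous write — is the same.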

[Figure: aligned barrier snapshotting in Flink. An operator with two incoming channels: the barrier arrives early on channel 1, which is then blocked; the barrier arrives late on channel 2; once the barriers align, the operator snapshots its state and forwards barriers downstream. Channel 1 is blocked from the early barrier's arrival until the late one's; channel 2 flows freely.]
Aligned barriers eliminate channel-state recording at the cost of blocking fast channels until the slowest barrier arrives. With unaligned checkpoints, alignment is skipped and in-flight buffers are snapshotted instead — a tradeoff between alignment latency and upload size.

The S3 upload of state for a Razorpay payments-aggregation job at 100k tx/sec with 8GB of state takes 12 seconds. The snapshot trigger itself takes 80ms. The job's checkpointing latency — the metric lastCheckpointDuration in Flink — is dominated by the upload, not the trigger. Tuning checkpointing in production is mostly tuning the write path to S3: number of parallel upload threads, RocksDB incremental checkpoints (only upload changed SST files), and the choice of state backend.
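Those knobs live in the Flink configuration. A hedged sketch of what a setup like this might look like — the values are illustrative, the bucket name is made up, and key names should be checked against your Flink version:

```yaml
# flink-conf.yaml (illustrative values; verify key names for your version)
execution.checkpointing.interval: 30 s
execution.checkpointing.timeout: 10 min
execution.checkpointing.min-pause: 5 s

state.backend: rocksdb
state.backend.incremental: true                           # upload only changed SST files
state.checkpoints.dir: s3://my-bucket/checkpoints         # hypothetical bucket
state.backend.rocksdb.checkpoint.transfer.thread.num: 4   # parallel upload threads
```

The interval and timeout mirror the numbers used elsewhere in this chapter; the incremental and transfer-thread settings are the write-path tuning the paragraph above describes.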

What goes wrong in production

A pipeline that "checkpoints fine in tests" still has four failure modes that show up under load:

Checkpoint timeout under backpressure. When downstream is slow, barriers queue behind data in the network buffers. With aligned checkpointing, the operator can wait minutes for barrier alignment — longer than the configured checkpointTimeout (default 10 minutes). The checkpoint fails, no new ones start until the current one resolves, and during a long incident no checkpoints succeed for hours. The operational fix is unaligned checkpoints for high-backpressure jobs; the architectural fix is to ensure your pipeline's slowest operator can drain its queue within one checkpoint interval.

RocksDB compaction interleaving with snapshot. Flink's incremental checkpoints upload only the SST files that changed since the last checkpoint. If RocksDB runs a major compaction concurrently — rewriting most SSTs — the next checkpoint has to upload nearly the full state instead of the incremental delta. A 10GB state's checkpoint balloons from 200MB to 9GB. Set RocksDB compaction priority and use state.backend.rocksdb.checkpoint.transfer.thread.num to parallelise the upload, but the deeper fix is to schedule checkpoints between expected compaction cycles.
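The ballooning effect reduces to set arithmetic. A sketch, with incremental_upload as a hypothetical helper:

```python
# Sketch of incremental checkpointing: upload only the files not already
# in durable storage. A major compaction rewrites most SSTs into fresh
# files, so the "incremental" delta suddenly approaches the full state.
def incremental_upload(local_files, already_uploaded):
    return {f for f in local_files if f not in already_uploaded}

uploaded = {"sst-1", "sst-2", "sst-3"}

# Normal churn: one new SST since the last checkpoint -> small delta.
assert incremental_upload({"sst-1", "sst-2", "sst-3", "sst-4"}, uploaded) == {"sst-4"}

# Major compaction rewrote everything into fresh files -> near-full upload,
# even though the logical state barely changed.
assert incremental_upload({"sst-5", "sst-6"}, uploaded) == {"sst-5", "sst-6"}
```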

Sink commit not coordinated with checkpoint. A checkpoint snapshots operator state but not the external sink. A payment that's in operator state at checkpoint time may have been written to Postgres or not — depending on whether the sink had committed. Without a 2-phase commit sink (covered in /wiki/exactly-once-end-to-end-the-actual-mechanics), a crash-and-restore can replay events that the sink already committed, double-writing rows. The Flink TwoPhaseCommitSinkFunction is the standard solution; without it, "exactly once" is a marketing claim.

Source replay not aligned with checkpoint. Kafka offsets are part of the snapshot. On restart, Flink seeks each consumer back to the offset stored in the checkpoint. If the topic's retention is shorter than the time between checkpoints — or if the topic was rebuilt while the job was down — the seek fails (OFFSET_OUT_OF_RANGE). The pipeline either crash-loops or silently jumps to the latest offset and drops events. The fix is monitoring topic retention against checkpoint freshness, plus an alert on kafka.consumer.records-lag-max in the seconds after a restart.

Common confusions

A checkpoint is not a photograph of the system at an instant. It is a causal cut: no single wall-clock moment may exist at which the recorded global state held, and that's fine — the recorded state is reachable, which is all recovery needs.

The checkpoint interval is not the recovery time. The interval bounds how much input must be replayed; recovery also pays for downloading the snapshot and reloading the state backend, which for large RocksDB state usually dominates.

Exactly-once operator state is not exactly-once end-to-end. The checkpoint makes the operators consistent with the source offsets; without a coordinated two-phase-commit sink, recovery can still double-write results the sink had already committed.

Going deeper

How Flink decides where to send the barrier

Flink's JobManager has a CheckpointCoordinator that triggers checkpoints by injecting barriers into the source operators only. Source operators (Kafka source, Kinesis source, file source) emit a barrier into their downstream channels at the moment they were asked to checkpoint. The barrier flows through the DAG with the data. The coordinator tracks acknowledgments from every operator: a checkpoint is "complete" when every operator (including sinks) has acknowledged its local snapshot. If any operator fails to acknowledge within checkpointTimeout, the coordinator aborts the checkpoint and the next one is triggered after the configured minPauseBetweenCheckpoints. This single-coordinator design is also the bottleneck: at 5,000 parallel tasks, the coordinator's RPC traffic dominates the JobManager's JVM, which is why super-large jobs disable checkpointing on quiet sub-DAGs.

Razorpay's payments aggregation: 30-second checkpoints, 4GB state

A real Razorpay pipeline aggregates payments by merchant for fraud detection. Input: 100k events/sec from Kafka (UPI + cards + netbanking). State: per-merchant-per-window aggregates over a 5-minute tumbling window — roughly 4GB across 800 merchants × 60 windows × per-merchant counters. Checkpointing is configured at 30-second interval with incremental RocksDB on S3 (Mumbai region). Each checkpoint uploads ~80MB (the delta), takes 1.4 seconds end-to-end, and consumes 0.3% of the pipeline's CPU. Recovery from a worker crash takes 47 seconds: 12 seconds to download the latest checkpoint, 18 seconds to reload RocksDB SSTs, 17 seconds to replay the channel-state messages and catch up on Kafka. The pipeline targets a 60-second p99 recovery time and meets it.

Unaligned checkpoints and when to turn them on

Aligned checkpoints work poorly when one operator has a much higher per-message latency than others — for example, an enrichment join against a slow external API. The fast channel into the enrichment operator can wait 10+ seconds for the slow channel's barrier to arrive, and the entire pipeline's throughput drops during alignment. Flink 1.11+ introduced unaligned checkpoints: the operator does not wait for alignment; instead, it snapshots its state immediately and also snapshots the in-flight buffers on its incoming channels (recovering Chandy–Lamport's channel state). The upload size is larger but alignment latency is zero. Production rule: turn on unaligned for any job where checkpointAlignmentTime p99 exceeds 1 second; leave it off for low-latency pipelines where the buffer-snapshot overhead would dominate.
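The difference from alignment can be sketched as: snapshot immediately on the first barrier, and record the other channel's pre-barrier backlog instead of waiting for it to drain. A toy model (run_unaligned is hypothetical, not Flink's API):

```python
# Toy model of an unaligned checkpoint at a two-input operator. When the
# first barrier arrives, the operator snapshots its state right away and
# also captures the in-flight records still queued on the other channel,
# up to (not including) that channel's own barrier.
BARRIER = "BARRIER"

def run_unaligned(state, other_channel_inflight):
    pre_barrier = []
    for msg in other_channel_inflight:
        if msg == BARRIER:
            break
        pre_barrier.append(msg)
    # Larger snapshot (state + buffers), but zero alignment latency.
    return {"state": state, "inflight": pre_barrier}

snap = run_unaligned(6, [7, 8, BARRIER, 9])
assert snap == {"state": 6, "inflight": [7, 8]}   # 9 is post-cut, excluded
```

Compare with the aligned sketch earlier: there the operator blocked until both barriers arrived and recorded nothing; here it records the backlog and never blocks — the same tradeoff the production rule above is tuning.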

Generalised state backends: heap, RocksDB, and what's beyond

Heap state backend keeps state in JVM heap — fast access, but full state must fit in heap and full snapshots are not incremental (every checkpoint uploads everything). RocksDB state backend keeps state in an embedded LSM tree on local disk — slower access but supports incremental checkpointing and TB-scale state. The frontier is changelog-based state backends (Flink 1.16+, ChangelogStateBackend) that decouple state from the backend by writing every state mutation to a changelog stream; checkpoints are essentially free because they're just a pointer into the log. The cost is doubled write amplification during steady state. For latency-sensitive jobs where checkpoint trigger time matters more than throughput, the changelog backend is the next-generation default, though as of mid-2026 it's still experimental at most Indian deployments.
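The changelog idea fits in a dozen lines. A sketch — not the Flink API — showing why the checkpoint itself becomes O(1):

```python
# Sketch of a changelog state backend: every mutation is appended to a
# durable log, so a checkpoint is just a position in that log. Recovery
# replays the log up to the recorded position.
changelog = []
state = {}

def put(k, v):
    changelog.append((k, v))   # write amplification: every put hits the log
    state[k] = v

put("a", 1); put("b", 2)
checkpoint = len(changelog)    # the "free" checkpoint: a pointer, not an upload
put("a", 99)                   # post-checkpoint mutation

# Recovery: rebuild state by replaying the log up to the checkpoint.
restored = {}
for k, v in changelog[:checkpoint]:
    restored[k] = v
assert restored == {"a": 1, "b": 2}
```

The doubled write amplification is visible in put: the log write happens on every mutation during steady state, which is the price paid for the constant-time checkpoint.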

The Felber–Renesse correctness proof

The original Chandy–Lamport paper proves that the recorded state is reachable — there is some valid execution of the system that ends in this state. It doesn't prove it's the state that did exist at any wall-clock instant; in fact, no such instant may exist (different observers see different orderings). Felber and Renesse's later work formalised what "consistent" means in this context: the recorded state corresponds to a cut in the happens-before partial order, and any computation resumed from this state behaves identically to a computation that experienced no failure. Flink inherits this property. The implication: if your business logic depends on wall-clock time (for instance, a 5-minute SLA timer), the resumed computation may emit timer-fires at slightly different wall-clock instants than the original would have. Most pipelines tolerate this; tax-reporting and trading-audit pipelines sometimes don't, and they need to use event-time timers instead of processing-time timers.

Where this leads next

The next chapter — /wiki/state-backends-heap-rocksdb-external — explains where the snapshotted state actually lives. The checkpoint algorithm is agnostic to backend; the backend determines whether a 1TB-state job's checkpoint takes 12 seconds or 12 minutes, and whether the operator can keep processing during the upload.

After that, /wiki/exactly-once-end-to-end-the-actual-mechanics shows how the checkpoint is coordinated with the sink. A checkpoint that snapshots operator state but commits the sink at a different moment produces duplicates on recovery. The two-phase-commit sink is the missing piece that turns "operator-correct snapshot" into "end-to-end exactly-once".

The mental model: a checkpoint is a moving cut across the DAG, drawn by barriers that flow with data. The cut respects causality. Recovery replays the cut, not the wall clock.

References

Chandy, K. Mani, and Leslie Lamport. "Distributed Snapshots: Determining Global States of Distributed Systems." ACM Transactions on Computer Systems 3(1), 1985.

Carbone, Paris, Gyula Fóra, Stephan Ewen, Seif Haridi, and Kostas Tzoumas. "Lightweight Asynchronous Snapshots for Distributed Dataflows." arXiv:1506.08603, 2015.