Watermarks: how we know we're "done"

It is 19:31 in Bengaluru. Riya's Flink job has just finished aggregating the 19:30 UPI minute and pushed {minute: "19:30", total: 312_84_50_000_paise} to the dashboard. At 19:33 a Jio tower in Lucknow flushes a 90-second NAT buffer; 1,824 transactions that happened between 19:30:14 and 19:30:44 land at the operator at 19:33:08. The 19:30 bucket is already closed and emitted. What does the runtime do with the late events? Drop them? Re-open the window and emit a new total? The answer depends on a single number Flink has been quietly tracking the entire time — the watermark — and on the policy you wrote for how late is too late.

A watermark is a single timestamp W(t) the runtime emits saying "I believe no event with event_time < W(t) will arrive any more". Windows fire when their end-time passes the watermark. Watermarks are necessarily wrong sometimes — they are bets, not guarantees — and the entire engineering of stream processing is about how to bound the bet's badness without waiting forever.

What a watermark actually is

The previous chapter (/wiki/event-time-vs-processing-time-the-whole-ballgame) ended with a problem: if you bucket events by event time, the operator never knows when a window is "done". An event from 19:30 might arrive at 19:33, or 19:47, or never. You can't wait forever. You can't fire immediately. You need a third option — a promise the runtime makes to itself that gets less wrong over time but is never perfectly right.

That promise is a watermark. Concretely, a watermark is a stream of Watermark(W) markers interleaved with the data stream. Each marker carries a timestamp. The marker at position k says: "every event that comes after me on this stream will have event_time >= W_k". Equivalently: "I do not expect any event with event_time < W_k from now on".

The runtime emits these markers from the source operator. As events flow through the topology, every operator updates its local notion of the watermark to be the minimum of the watermarks on its inputs. When the operator's current watermark passes a window's end-time t_end — i.e. when W >= t_end — the runtime fires the window: aggregates the buffered events, emits the result, frees the state.

Figure: a watermark advancing along an event stream. A horizontal stream of events stamped with event-time values (19:29:18, 19:29:35, 19:29:52, 19:30:14, 19:30:33), with a vertical watermark frontier at W = 19:30:25 sliding rightward as real time advances. Events to the left of the frontier are inside its promise ("all events with event_time < 19:30:25 are in"); a late event stamped 19:30:08 that arrives after W has passed it is highlighted as a violation. Windows fire when W crosses their end.
The watermark is a single number maintained per operator. Windows whose end-time falls behind the watermark are safe to fire. Events whose event-time falls behind the watermark are "late" — past the bet the runtime made.

Why this is the central abstraction of stream processing: a watermark is the only thing that lets a streaming runtime emit any event-time aggregate without waiting forever. Without it, "what was the UPI total at 19:30?" has no answer until the heat death of the universe. With it, the runtime gets to say "as of now, my best estimate is X, and the probability that more 19:30 events show up is bounded".

The bet has two failure modes. Premature watermark: the runtime advances W to 19:30:25, fires the 19:30 window, and then a Lucknow event from 19:30:08 arrives. The window has already been emitted; the late event must be handled by a separate policy (drop, side-output, retract). Lagging watermark: the runtime is too cautious and advances W slowly. The window fires 60 seconds after the wallclock crossed 19:31, instead of 5 seconds after. Latency suffers, downstream gets the answer late, but no event is ever "late" because the bet was conservative.

Every watermark strategy is a knob between these two modes. Which knob you turn determines whether your dashboard updates fast or your numbers are right — and most production jobs have to do both through layered policy.

Where the watermark comes from

The runtime can't conjure a watermark out of nothing — it has to derive it from the events it sees. Three strategies, each with a different failure profile:

Bounded out-of-orderness. The simplest, most-used strategy. The user declares a maximum lateness — say Duration.ofSeconds(30) — and the runtime maintains W = max(event_time seen so far) - 30s. Every event lifts the high-water mark; the watermark trails 30 seconds behind. Picking the bound is the entire skill: too small (5 seconds) and 1% of real events are late; too large (5 minutes) and your dashboard updates every 5 minutes. Production picks per-source — Razorpay UPI events use 30 seconds, Hotstar mobile-player events use 90 seconds because their tail is fatter, internal Kafka-to-Kafka pipelines use 2 seconds because everything is in one datacentre.

Periodic with custom logic. The runtime calls user-defined hooks: one per record to observe timestamps, and one every N milliseconds (default 200ms in Flink) to emit the current watermark. The user can do anything — a sliding-window quantile estimator over recent delays, a per-source watermark that respects the slowest device, a heuristic that holds the watermark back during known surge windows (Diwali, IPL final overs, GST filing deadline). Flink's WatermarkStrategy.forGenerator(...) exposes this. Reach for it when bounded-out-of-orderness is too coarse.
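To make "custom logic" concrete, here is a minimal sketch of a quantile-based periodic generator. The class and method names only mirror the shape of Flink's hooks — they are illustrative, not the real API — and the heuristic trails the observed p99 arrival delay instead of a fixed bound:

```python
from collections import deque

class QuantileWatermarkGenerator:
    """Illustrative periodic generator (not Flink's API): the watermark
    trails the observed p99 arrival delay instead of a fixed bound."""
    def __init__(self, window=1000, quantile=0.99):
        self.delays = deque(maxlen=window)   # recent processing - event gaps
        self.max_event_time = 0
        self.quantile = quantile

    def on_event(self, event_time, processing_time):
        # called once per record, in the spirit of Flink's onEvent hook
        self.delays.append(max(0, processing_time - event_time))
        self.max_event_time = max(self.max_event_time, event_time)

    def on_periodic_emit(self):
        # called on a timer, in the spirit of onPeriodicEmit (every 200 ms)
        if not self.delays:
            return 0
        ordered = sorted(self.delays)
        tail = ordered[int(self.quantile * (len(ordered) - 1))]
        return self.max_event_time - tail

gen = QuantileWatermarkGenerator()
for t in range(100):
    delay = 40 if t % 10 == 0 else 2     # 10% of events arrive 40s late
    gen.on_event(event_time=t, processing_time=t + delay)
print(gen.on_periodic_emit())            # 59: max event time 99 minus p99 delay 40
```

Because the quantile adapts to the observed tail, a fat-tailed source automatically gets a more conservative watermark than a tight one — exactly what a single fixed bound cannot do.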

Punctuated. Some streams come with an explicit "this is the end" marker — for example, a CDC stream that emits a commit_lsn after every Postgres transaction commit. The producer knows exactly when no earlier events will arrive, because the database told it. The watermark advances only on these markers. This is the most accurate strategy but only works when the source itself can prove the bound.
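A punctuated generator is even simpler to sketch: only an explicit marker moves the watermark. The record format and class below are illustrative, not any real CDC client:

```python
class PunctuatedWatermarkGenerator:
    """Illustrative punctuated generator: only an explicit commit marker
    from the source (e.g. a CDC transaction commit) advances the watermark."""
    def __init__(self):
        self.current_wm = 0

    def on_element(self, record):
        # record is ("event", event_time, payload) or ("commit", commit_time)
        if record[0] == "commit":
            # the source has proven no earlier event can still arrive
            self.current_wm = max(self.current_wm, record[1])
            return self.current_wm       # emit a watermark immediately
        return None                      # ordinary events never advance it

gen = PunctuatedWatermarkGenerator()
print(gen.on_element(("event", 104, "txn-1")))   # None: no marker seen yet
print(gen.on_element(("commit", 105)))           # 105: the bound is now proven
```

Note the inversion relative to the other two strategies: events carry no information about completeness at all; the source's own markers carry all of it.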

Figure: three watermark strategies compared. Three horizontal lanes, each showing event arrivals and the dashed watermark line a different strategy emits: bounded out-of-orderness (lag = 30s) is smooth; periodic + custom heuristic (per-source quantile) is adaptive; punctuated (commit markers advance the watermark) is stepped. Bounded is simple, smooth, and ignorant of the source; periodic+custom handles per-source delay tails; punctuated is only valid when a source can prove no earlier events remain (CDC, batch boundaries).
The dashed line is the watermark. In practice, 90% of production Flink jobs use bounded-out-of-orderness with a manually-tuned bound. The other 10% are CDC pipelines (punctuated) or jobs with sources of wildly different delay profiles (periodic custom).

A fourth pattern is worth naming because Build 8 will hit it again: per-partition watermarks. A Kafka source with 32 partitions does not compute one watermark over the union of events; it computes 32 separate watermarks, one per partition, and the operator's effective watermark is the minimum across them. This is what makes per-partition order safe to exploit and what creates the "watermark stalls because partition 7 is slow" failure mode covered later.

A demo: build a watermark from scratch

To feel where the bet lives, write the simplest watermark generator and run it against a stream with a known delay distribution.

# build_a_watermark.py — a 60-line streaming runtime fragment
import random, heapq
from collections import defaultdict
from dataclasses import dataclass

@dataclass(order=True)
class Event:
    processing_time: int
    event_time: int = 0
    payload: int = 0

def stream(n=500, seed=7):
    rng = random.Random(seed)
    out = []
    for i in range(n):
        et = i                                          # one event per second of event time
        roll = rng.random()
        if   roll < 0.91: delay = rng.randint(0, 1)
        elif roll < 0.99: delay = rng.randint(20, 50)
        else:             delay = rng.randint(80, 200)  # long-tail "Lucknow tower"
        out.append(Event(processing_time=et + delay, event_time=et,
                         payload=rng.randint(100, 5000)))
    out.sort()                                          # operator sees by processing_time
    return out

def run(events, lateness=30, window_size=60):
    """Bounded-out-of-orderness watermark. Fire 60s windows when W >= window_end."""
    high_water = 0
    fired_windows: dict[int, int] = {}
    pending = defaultdict(int)         # window_start -> sum
    late_events = 0; late_paise = 0
    for e in events:
        # 1) Update high-water mark
        if e.event_time > high_water:
            high_water = e.event_time
        watermark = high_water - lateness
        # 2) Window assignment
        win_start = (e.event_time // window_size) * window_size
        win_end = win_start + window_size
        # 3) Late event check: window already fired?
        if win_start in fired_windows:
            late_events += 1; late_paise += e.payload
            continue
        pending[win_start] += e.payload
        # 4) Fire any windows whose end has passed the watermark
        ready = [w for w in pending if w + window_size <= watermark]
        for w in ready:
            fired_windows[w] = pending.pop(w)
    # Drain remaining (job ending — assume infinite watermark)
    for w, total in pending.items():
        fired_windows[w] = total
    return fired_windows, late_events, late_paise

for lateness in (5, 30, 90):
    fired, n_late, p_late = run(stream(), lateness=lateness)
    total = sum(fired.values())
    print(f"lateness={lateness:3d}s  windows={len(fired):2d}  total=₹{total/100:>9.2f}  "
          f"late_events={n_late:3d}  late_₹={p_late/100:.2f}")
lateness=  5s  windows= 9  total=₹13395.48  late_events= 28  late_₹=842.18
lateness= 30s  windows= 9  total=₹14109.62  late_events=  4  late_₹=128.04
lateness= 90s  windows= 9  total=₹14237.66  late_events=  0  late_₹=0.00

A handful of lines do the engineering: lift the high-water mark, derive the watermark by subtracting the lateness bound, assign the event to its window, drop it if that window has already fired, accumulate it otherwise, and fire every window whose end the watermark has passed.

The trade-off is starkly visible in the output. Lateness of 5 seconds means 28 events drop, totalling ₹842 — for a Razorpay reconciliation pipeline that is unacceptable. Lateness of 90 seconds means zero late events but every window emits 90 seconds after its event-time end — for a "transactions per minute" dashboard updated every 60 seconds, that lag is a problem too. The right answer for production is layered: a short watermark (30s) for fast first-emit, plus an allowed_lateness policy that updates the aggregate when stragglers arrive within the next 5 minutes (covered in /wiki/late-data-handling-allowed-lateness-side-outputs-retractions).

Watermark propagation through a topology

A Flink job is a DAG of operators. Each operator has its own notion of "current watermark" computed from its inputs. Single-input operators take the input's watermark as theirs. Multi-input operators (joins, unions) take the minimum across inputs.

Why minimum? Because the multi-input operator must be safe with respect to every input. If the click stream's watermark is at 19:30:25 and the order-completed stream's watermark is at 19:29:50, an event with event_time = 19:30:00 could still arrive on the order stream — so the join operator's watermark cannot have advanced past 19:29:50. The slowest input governs the operator's progress.

This is the reason a Flink job's watermark "lag" metric (watermark_lag_ms = wallclock - operator_watermark) is the single most important production health signal. Lag growing means some operator's slowest input has stopped advancing. The lag tells you how long ago "now" was, on the event-time clock the entire pipeline runs on.

# topology_min_watermark.py — minimum-across-inputs propagation
class Operator:
    def __init__(self, name, n_inputs):
        self.name = name
        self.input_wms = [0] * n_inputs
        self.current_wm = 0
    def receive_watermark(self, input_idx, wm):
        if wm > self.input_wms[input_idx]:
            self.input_wms[input_idx] = wm
        new = min(self.input_wms)
        if new > self.current_wm:
            self.current_wm = new
            return new                  # advance downstream
        return None

join = Operator("interval-join", n_inputs=2)
print(join.receive_watermark(0, 100))   # clicks at 100 → wm = min(100, 0) = 0  → no advance
print(join.receive_watermark(1, 95))    # orders at 95 → wm = min(100, 95) = 95 → advance to 95
print(join.receive_watermark(0, 130))   # clicks at 130 → still min(130, 95) = 95 → no advance
print(join.receive_watermark(1, 120))   # orders at 120 → min(130, 120) = 120 → advance to 120
None
95
None
120

The slow input (orders) governs the join's watermark every time. Doubling the click stream's rate doesn't help. This is the asymmetric backpressure trap covered in /wiki/event-time-vs-processing-time-the-whole-ballgame's "Going deeper" section, and the reason WatermarkStrategy.withIdleness(...) exists — to let a stalled input be excluded from the minimum calculation after a timeout, with the trade-off that resuming events from that input become "late" against the advanced watermark.

Watermarks in production: what to monitor

The Flink/Beam/Spark Streaming production playbook is short, and every team learns it the same way (by getting paged). Three numbers belong on the dashboard.

Watermark lag — wallclock - watermark. Steady-state should match your forBoundedOutOfOrderness bound plus pipeline latency (typically 30–120 seconds). A growing lag means an upstream is stalled or backpressure is biting. Alert at 3× the steady-state.

Late-event rate — events dropped or side-outputted because their event-time fell behind the watermark when they arrived. Should be under 0.1% in healthy operation. A spike means the lateness bound is too tight or a source has a delay tail you didn't model. Why both metrics matter together: lag and late-event-rate are inversely correlated knobs. Tighter lateness → smaller lag, more late events. Looser lateness → larger lag, fewer late events. The dashboard tells you which side of the trade-off you're on.

Per-partition watermark spread — the gap between the fastest and slowest partition's watermark. A growing spread is the early warning that a single Kafka partition is falling behind (broker on a slow rack, replication catching up, GC pause). The job's overall watermark is governed by the slowest partition, so the spread tells you which partition to investigate before the whole job's watermark stops advancing.
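All three numbers fall out of the per-partition watermarks with a few lines of arithmetic. A sketch with made-up values, assuming watermarks in epoch seconds — real jobs read these from the runtime's metric reporter rather than computing them by hand:

```python
import time

def watermark_health(partition_wms, late_count, total_count, now=None):
    """The three dashboard numbers, from per-partition watermarks in epoch
    seconds. Illustrative; names and units are this sketch's, not Flink's."""
    now = time.time() if now is None else now
    operator_wm = min(partition_wms.values())      # slowest partition governs
    return {
        "watermark_lag_s": now - operator_wm,
        "late_event_rate": late_count / max(total_count, 1),
        "partition_spread_s": max(partition_wms.values()) - operator_wm,
    }

# made-up values: partition 7 is 70s behind its siblings
wms = {0: 1_000_070, 7: 1_000_000, 12: 1_000_065}
print(watermark_health(wms, late_count=12, total_count=48_000, now=1_000_100))
```

The spread (70s here) points at partition 7 before the overall lag (100s) has grown enough to page anyone — which is exactly why it earns its own panel.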

A real Razorpay-shaped Flink dashboard has these three on a single panel, plus the input-rate per partition, plus checkpoint-duration. When the on-call engineer is paged at 2 a.m. for "the UPI dashboard is stale", the answer almost always lives in this panel.


Going deeper

The Dataflow Model and what watermarks are not

Akidau et al. distinguish "completeness" (the watermark has crossed every event for a window) from "punctuality" (the runtime has committed to that completeness). Watermarks supply completeness probabilistically — a heuristic estimate. Punctuality, in contrast, is a guarantee, and the only way to get it is from the source itself (a CDC commit-LSN, a batch-job done-marker, an end-of-day boundary). The Dataflow paper's key contribution was untangling these: most production streams are run with heuristic watermarks plus an allowed_lateness retraction policy, which together approximate guarantees without ever achieving them. Reading section 3 of Akidau is worth the hour — it is the cleanest formal statement of why streaming correctness is structurally a bounded-error problem, not a zero-error one.

Idleness — the trick to keep the slowest partition from stalling everything

A 32-partition Kafka topic where partition 7 receives one event per minute and the others receive 1k/sec produces a tragedy: the operator's watermark is the minimum, so it advances at partition 7's rate. The fix is WatermarkStrategy.withIdleness(Duration.ofMinutes(2)). After 2 minutes of no events on partition 7, that partition is excluded from the minimum and the watermark advances at the rate of the active partitions. The cost: any event that does eventually arrive on partition 7 is treated as late against the now-advanced watermark and goes to side-output. Idleness is the right call when partition 7 is genuinely empty (e.g. a Kafka topic with one partition per Indian state and Mizoram is quiet) and the wrong call when partition 7 is congested (it'll catch up, and you've made every catch-up event late). Pick by knowing the source.
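The mechanics fit in a dozen lines. A hypothetical per-partition minimum with idleness exclusion — the shape of the behaviour, not Flink's implementation:

```python
class MinWithIdleness:
    """Hypothetical per-partition minimum with idleness: a partition silent
    for longer than idle_timeout is excluded from the minimum."""
    def __init__(self, n_partitions, idle_timeout):
        self.wms = [0] * n_partitions
        self.last_seen = [0] * n_partitions    # wallclock of last event
        self.idle_timeout = idle_timeout

    def on_event(self, partition, event_time, wallclock):
        self.wms[partition] = max(self.wms[partition], event_time)
        self.last_seen[partition] = wallclock

    def current_watermark(self, wallclock):
        active = [wm for wm, seen in zip(self.wms, self.last_seen)
                  if wallclock - seen < self.idle_timeout]
        # if every partition is idle, fall back to the global minimum
        return min(active) if active else min(self.wms)

m = MinWithIdleness(n_partitions=2, idle_timeout=120)
m.on_event(0, event_time=500, wallclock=500)
m.on_event(1, event_time=500, wallclock=500)
print(m.current_watermark(560))   # 500: both partitions active
m.on_event(0, event_time=600, wallclock=600)
print(m.current_watermark(600))   # 500: partition 1 quiet only 100s, still governs
print(m.current_watermark(630))   # 600: partition 1 idle 130s, now excluded
```

The last line shows the cost baked into the mechanism: once partition 1 is excluded and the watermark jumps to 600, any event it eventually delivers with event_time < 600 is late by construction.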

Watermarks under failure: the checkpoint-restart asymmetry

When a Flink job fails and restarts from a checkpoint, the watermark resets to its checkpointed value — typically a few seconds before the failure. The runtime then re-reads Kafka from the checkpointed offset and re-derives the watermark from scratch. This is correct for event-time semantics: the same input produces the same watermark. But it does mean the processing of the post-checkpoint events happens at a wallclock far past their event-time, and any external system (a sink, a dashboard, a downstream operator) that timestamps things at wallclock-of-receipt now has a minute or two of "old" events arriving rapidly. Sinks need to be careful — a Postgres INSERT INTO metrics(window, value, ingested_at) should use window (event-time) as the dedup key, not ingested_at. The restart will replay events; idempotent sinks survive, non-idempotent sinks emit duplicates.
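A sketch of the idempotent-sink pattern, using sqlite3 so it runs self-contained; the column is named win_start rather than window to dodge SQL keyword clashes, and the same ON CONFLICT upsert syntax works in Postgres:

```python
import sqlite3

# Illustrative idempotent sink: the event-time window start is the dedup key,
# so events replayed after a checkpoint restart overwrite rather than duplicate.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE metrics (win_start INTEGER PRIMARY KEY,"
           " value INTEGER, ingested_at REAL)")

def upsert(win_start, value, wallclock):
    db.execute(
        "INSERT INTO metrics (win_start, value, ingested_at) VALUES (?, ?, ?) "
        "ON CONFLICT(win_start) DO UPDATE SET value = excluded.value, "
        "ingested_at = excluded.ingested_at",
        (win_start, value, wallclock),
    )

upsert(1170, 31_284, 100.0)   # first emit of the window
upsert(1170, 31_290, 160.0)   # replay after restart: same key, row overwritten
print(db.execute("SELECT win_start, value FROM metrics").fetchall())  # [(1170, 31290)]
```

Had the primary key been ingested_at, the replay would have produced a second row for the same window — the duplicate the paragraph above warns about.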

When to write a custom watermark assigner

Three signals you've outgrown forBoundedOutOfOrderness. (a) The source has multiple delay regimes — the Hotstar mobile player has 200ms median for online users and 30-minute median for users who paused with bad connectivity. A single bound is wrong for both. Write a per-source-subclass watermark that uses the playback-mode field to pick the bound. (b) The source has known surge windows — Razorpay knows that on Diwali night the delay tail grows because Jio queues lengthen. A custom assigner can hold the watermark back during the 19:30–22:00 IST window. (c) The source itself can prove a bound — Postgres logical decoding emits commit_lsn at every transaction commit, and that LSN's timestamp is a hard guarantee that no earlier transaction will arrive. A punctuated assigner that advances on commit markers gives you tight watermarks with zero late events.

Watermark holding for sub-graph correctness

Some operators need to hold back the watermark they emit. A reordering operator that buffers 5 seconds of events to sort them by event_time must subtract 5 seconds from the inbound watermark before forwarding it; otherwise downstream operators would see watermarks that promise "no events earlier than T" while the reorderer still has events earlier than T sitting in its buffer. Flink's BoundedOutOfOrdernessTimestampExtractor and Beam's timer output-timestamp holds are the standard places where holds are inserted. If you write a custom operator that buffers events for any reason, the watermark you emit must reflect the buffering, not the pass-through input.
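A sketch of the hold in a hypothetical reordering operator: the forwarded watermark is the inbound watermark minus the buffer depth, and only events the held watermark has passed are safe to release:

```python
import heapq

class ReorderBuffer:
    """Hypothetical reordering operator: buffers events to re-sort them by
    event time, so the watermark it forwards must be held back by `hold`."""
    def __init__(self, hold):
        self.hold = hold
        self.heap = []                       # min-heap keyed on event_time

    def on_event(self, event_time, payload):
        heapq.heappush(self.heap, (event_time, payload))

    def on_watermark(self, inbound_wm):
        held_wm = inbound_wm - self.hold     # the hold: buffered events may be older
        released = []
        # only events the *held* watermark has passed are safe to forward
        while self.heap and self.heap[0][0] <= held_wm:
            released.append(heapq.heappop(self.heap)[0])
        return released, held_wm

buf = ReorderBuffer(hold=5)
for et in (103, 101, 104, 102):
    buf.on_event(et, payload=None)
events, wm = buf.on_watermark(107)
print(events, wm)   # [101, 102] 102: events 103 and 104 stay buffered
```

Forwarding the raw inbound watermark (107) instead of the held one (102) would let downstream fire windows ending at 104 while 103 and 104 are still sitting in this buffer — exactly the broken promise the hold prevents.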

Where this leads next

The next chapter — /wiki/late-data-handling-allowed-lateness-side-outputs-retractions — covers what to do when an event lands past the watermark. Three policies, three trade-offs: drop (cheap), buffer-then-drop (the allowed_lateness knob), or retract-and-re-emit (correct, expensive, requires downstream cooperation).

After that, /wiki/triggers-when-to-emit-not-just-what-to-compute generalises beyond "fire on watermark" to "fire on any condition" — early triggers for low-latency intermediate results, custom triggers for business rules ("emit when 100k events accumulate or 60s pass, whichever first").

The mental model for Build 8: the watermark is the runtime's clock for the world, the event-time end-of-window is the question, and triggers + late-data policy are the answers. Every other Build 8 chapter — RocksDB state, checkpoints, exactly-once, stream joins — assumes this clock works. Build 9 closes the loop on durability; this chapter is about correctness on the time axis.
