Watermarks: how we know we're "done"
It is 19:31 in Bengaluru. Riya's Flink job has just finished aggregating the 19:30 UPI minute and pushed {minute: "19:30", total: 312_84_50_000_paise} to the dashboard. At 19:33 a Jio tower in Lucknow flushes a 90-second NAT buffer; 1,824 transactions that happened between 19:30:14 and 19:30:44 land at the operator at 19:33:08. The 19:30 bucket is already closed and emitted. What does the runtime do with the late events? Drop them? Re-open the window and emit a new total? The answer depends on a single number Flink has been quietly tracking the entire time — the watermark — and on the policy you wrote for how late is too late.
A watermark is a single timestamp W(t) the runtime emits saying "I believe no event with event_time < W(t) will arrive any more". Windows fire when their end-time passes the watermark. Watermarks are necessarily wrong sometimes — they are bets, not guarantees — and the entire engineering of stream processing is about how to bound the bet's badness without waiting forever.
What a watermark actually is
The previous chapter (/wiki/event-time-vs-processing-time-the-whole-ballgame) ended with a problem: if you bucket events by event time, the operator never knows when a window is "done". An event from 19:30 might arrive at 19:33, or 19:47, or never. You can't wait forever. You can't fire immediately. You need a third option — a promise the runtime makes to itself that gets less wrong over time but is never perfectly right.
That promise is a watermark. Concretely, a watermark is a stream of Watermark(W) markers interleaved with the data stream. Each marker carries a timestamp. The marker at position k says: "every event that comes after me on this stream will have event_time >= W_k". Equivalently: "I do not expect any event with event_time < W_k from now on".
The runtime emits these markers from the source operator. As events flow through the topology, every operator updates its local notion of the watermark to be the minimum of the watermarks on its inputs. When a window's end-time t_end passes the operator's current watermark — i.e. when W >= t_end — the runtime fires the window: aggregates the buffered events, emits the result, frees the state.
Why this is the central abstraction of stream processing: a watermark is the only thing that lets a streaming runtime emit any event-time aggregate without waiting forever. Without it, "what was the UPI total at 19:30?" has no answer until the heat death of the universe. With it, the runtime gets to say "as of now, my best estimate is X, and the probability that more 19:30 events show up is bounded".
The bet has two failure modes. Premature watermark: the runtime advances W to 19:30:25, fires the 19:30 window, and then a Lucknow event from 19:30:08 arrives. The window has already been emitted; the late event must be handled by a separate policy (drop, side-output, retract). Lagging watermark: the runtime is too cautious and advances W slowly. The window fires 60 seconds after the wallclock crossed 19:31, instead of 5 seconds after. Latency suffers, downstream gets the answer late, but no event is ever "late" because the bet was conservative.
Every watermark strategy is a knob between these two modes. Which knob you turn determines whether your dashboard updates fast or your numbers are right — and most production jobs have to do both through layered policy.
Where the watermark comes from
The runtime can't conjure a watermark out of nothing — it has to derive it from the events it sees. Three strategies, each with a different failure profile:
Bounded out-of-orderness. The simplest, most-used strategy. The user declares a maximum lateness — say Duration.ofSeconds(30) — and the runtime maintains W = max(event_time seen so far) - 30s. Every event lifts the high-water mark; the watermark trails 30 seconds behind. Picking the bound is the entire skill: too small (5 seconds) and 1% of real events are late; too large (5 minutes) and every window fires five minutes after it ends. Production teams pick the bound per source — Razorpay UPI events use 30 seconds, Hotstar mobile-player events use 90 seconds because their tail is fatter, internal Kafka-to-Kafka pipelines use 2 seconds because everything is in one datacentre.
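The arithmetic is small enough to show inline. A minimal Python sketch of the strategy, in the same idiom as the demos below (class and method names are illustrative, not Flink's API):

```python
from dataclasses import dataclass

@dataclass
class BoundedOutOfOrderness:
    """Illustrative generator: W = max(event_time seen) - bound."""
    bound: int           # the declared maximum lateness, in event-time units
    high_water: int = 0  # maximum event_time observed so far

    def on_event(self, event_time: int) -> int:
        # An event can only lift the high-water mark, never lower it.
        if event_time > self.high_water:
            self.high_water = event_time
        return self.high_water - self.bound  # the watermark trails by `bound`

gen = BoundedOutOfOrderness(bound=30)
assert gen.on_event(100) == 70  # W = 100 - 30
assert gen.on_event(90) == 70   # an out-of-order event does not move W backwards
assert gen.on_event(140) == 110
```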
Periodic with custom logic. The runtime calls a user-defined extractTimestamp(event) and currentWatermark() method every N milliseconds (default 200ms in Flink). The user can do anything — a sliding-window quantile estimator over recent delays, a per-source watermark that respects the slowest device, a heuristic that holds the watermark back during known surge windows (Diwali, IPL final overs, GST filing deadline). Flink's WatermarkStrategy.forGenerator(...) exposes this. Reach for it when bounded-out-of-orderness is too coarse.
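A sketch of what "custom logic" can buy, assuming a generator that sizes the bound from the observed delay distribution instead of a fixed constant (all names here are hypothetical; in Flink this logic would live inside a generator supplied to WatermarkStrategy.forGenerator):

```python
from collections import deque

class QuantileDelayWatermark:
    """Illustrative periodic generator: the bound is the observed
    high-quantile delay over the most recent events."""
    def __init__(self, keep=10_000, quantile=0.999):
        self.delays = deque(maxlen=keep)  # recent (arrival - event) delays
        self.quantile = quantile
        self.high_water = 0

    def on_event(self, event_time: int, arrival_time: int) -> None:
        self.delays.append(max(0, arrival_time - event_time))
        if event_time > self.high_water:
            self.high_water = event_time

    def current_watermark(self) -> int:
        # The periodic hook; Flink calls its equivalent every 200 ms by default.
        if not self.delays:
            return self.high_water
        ranked = sorted(self.delays)
        bound = ranked[int(self.quantile * (len(ranked) - 1))]
        return self.high_water - bound
```

The sorting cost is paid once per periodic call, not per event, which is why the periodic shape (buffer on-event, compute on-timer) is the standard one.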
Punctuated. Some streams come with an explicit "this is the end" marker — for example, a CDC stream that emits a commit_lsn after every Postgres transaction commit. The producer knows exactly when no earlier events will arrive, because the database told it. The watermark advances only on these markers. This is the most accurate strategy but only works when the source itself can prove the bound.
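A punctuated generator in the same Python idiom, assuming a CDC-shaped record layout (the tuple format is made up for the sketch):

```python
class PunctuatedWatermark:
    """Advance only on explicit markers the source can prove, e.g. a
    CDC commit record; ordinary data events never move the watermark."""
    def __init__(self):
        self.current = 0

    def on_element(self, record):
        kind, ts = record
        if kind == "commit":         # the database proved the bound
            if ts > self.current:
                self.current = ts
            return self.current      # emit a watermark immediately
        return None                  # data events: no watermark

p = PunctuatedWatermark()
assert p.on_element(("data", 105)) is None   # a high event_time is not a proof
assert p.on_element(("commit", 100)) == 100  # nothing earlier than 100 can arrive
```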
A fourth pattern is worth naming because Build 8 will hit it again: per-partition watermarks. A Kafka source with 32 partitions does not compute one watermark over the union of events; it computes 32 separate watermarks, one per partition, and the operator's effective watermark is the minimum across them. This is what makes per-partition order safe to exploit and what creates the "watermark stalls because partition 7 is slow" failure mode covered later.
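In code, the per-partition pattern reduces to one `min` over per-partition high-water marks (a sketch of the idea, not the Kafka connector's actual internals):

```python
def effective_watermark(partition_high_water: dict[int, int], bound: int) -> int:
    """Each partition carries its own bounded-out-of-orderness watermark;
    the source's effective watermark is the minimum across partitions."""
    return min(hw - bound for hw in partition_high_water.values())

high_water = {0: 1_000, 1: 1_020, 7: 400}  # partition 7 is lagging
assert effective_watermark(high_water, bound=30) == 370  # the slowest governs
```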
A demo: build a watermark from scratch
To feel where the bet lives, write the simplest watermark generator and run it against a stream with a known delay distribution.
# build_a_watermark.py — a 60-line streaming runtime fragment
import random
from collections import defaultdict
from dataclasses import dataclass

@dataclass(order=True)
class Event:
    processing_time: int
    event_time: int = 0
    payload: int = 0

def stream(n=500, seed=7):
    rng = random.Random(seed)
    out = []
    for i in range(n):
        et = i  # one event per second of event time
        roll = rng.random()
        if roll < 0.91:
            delay = rng.randint(0, 1)
        elif roll < 0.99:
            delay = rng.randint(20, 50)
        else:
            delay = rng.randint(80, 200)  # long-tail "Lucknow tower"
        out.append(Event(processing_time=et + delay, event_time=et,
                         payload=rng.randint(100, 5000)))
    out.sort()  # operator sees by processing_time
    return out

def run(events, lateness=30, window_size=60):
    """Bounded-out-of-orderness watermark. Fire 60s windows when W >= window_end."""
    high_water = 0
    fired_windows: dict[int, int] = {}
    pending = defaultdict(int)  # window_start -> sum
    late_events = 0
    late_paise = 0
    for e in events:
        # 1) Update high-water mark
        if e.event_time > high_water:
            high_water = e.event_time
        watermark = high_water - lateness
        # 2) Window assignment
        win_start = (e.event_time // window_size) * window_size
        win_end = win_start + window_size
        # 3) Late event check: window already fired?
        if win_start in fired_windows:
            late_events += 1
            late_paise += e.payload
            continue
        pending[win_start] += e.payload
        # 4) Fire any windows whose end has passed the watermark
        ready = [w for w in pending if w + window_size <= watermark]
        for w in ready:
            fired_windows[w] = pending.pop(w)
    # Drain remaining (job ending — assume infinite watermark)
    for w, total in pending.items():
        fired_windows[w] = total
    return fired_windows, late_events, late_paise

for lateness in (5, 30, 90):
    fired, n_late, p_late = run(stream(), lateness=lateness)
    total = sum(fired.values())
    print(f"lateness={lateness:3d}s windows={len(fired):2d} total=₹{total/100:>9.2f} "
          f"late_events={n_late:3d} late_₹={p_late/100:.2f}")
lateness=  5s windows= 9 total=₹ 13395.48 late_events= 28 late_₹=842.18
lateness= 30s windows= 9 total=₹ 14109.62 late_events=  4 late_₹=128.04
lateness= 90s windows= 9 total=₹ 14237.66 late_events=  0 late_₹=0.00
Six lines do the engineering:
- `high_water = e.event_time` — the only state the watermark generator carries is the maximum event time seen so far. Why this is enough: bounded out-of-orderness assumes the lateness is bounded (the user's promise), so the maximum seen so far minus the bound is a valid floor on what's still arriving.
- `watermark = high_water - lateness` — the watermark trails the high-water mark by exactly `lateness`. A larger lateness means a more conservative watermark (fewer late events, more buffering, slower window fires).
- `win_end <= watermark` — the firing condition. A window can be safely emitted when its end time has been crossed by the watermark, because by then the runtime has promised that no `event_time < win_end` will arrive.
- `if win_start in fired_windows: late_events += 1` — late-event accounting. With `lateness=5`, 28 events overshoot; with `lateness=30`, only 4; with `lateness=90`, zero. Because a late event hits `continue` and never reaches `pending`, each run's total shrinks by exactly its `late_₹`; in production the stragglers would go to a side-output rather than vanish from the books.
- `pending[win_start] += e.payload` — the state of in-flight windows. Why this is the dominant cost: every Flink/Beam/Spark Streaming job allocates RocksDB rows for `pending` keyed by `(window_start, key)`, and state size grows linearly with `allowed_lateness × event_rate × distinct_keys`. A 30s window over 100k events/sec with 1M distinct keys holds tens of GB of state.
- `for w, total in pending.items(): fired_windows[w] = total` — the drain step. Real streams don't end, but if they do (job shutdown, batch mode), the runtime treats the final watermark as `+∞` and fires everything still pending.
The trade-off is starkly visible in the output. Lateness of 5 seconds means 28 events drop, totalling ₹842 — for a Razorpay reconciliation pipeline that is unacceptable. Lateness of 90 seconds means zero late events but every window emits 90 seconds after its event-time end — for a "transactions per minute" dashboard updated every 60 seconds, that lag is a problem too. The right answer for production is layered: a short watermark (30s) for fast first-emit, plus an allowed_lateness policy that updates the aggregate when stragglers arrive within the next 5 minutes (covered in /wiki/late-data-handling-allowed-lateness-side-outputs-retractions).
Watermark propagation through a topology
A Flink job is a DAG of operators. Each operator has its own notion of "current watermark" computed from its inputs. Single-input operators take the input's watermark as theirs. Multi-input operators (joins, unions) take the minimum across inputs.
Why minimum? Because the multi-input operator must be safe with respect to every input. If the click stream's watermark is at 19:30:25 and the order-completed stream's watermark is at 19:29:50, an event with event_time = 19:30:00 could still arrive on the order stream — so the join operator's watermark cannot have advanced past 19:29:50. The slowest input governs the operator's progress.
This is the reason a Flink job's watermark "lag" metric (watermark_lag_ms = wallclock - operator_watermark) is the single most important production health signal. Lag growing means some operator's slowest input has stopped advancing. The lag tells you how long ago "now" was, on the event-time clock the entire pipeline runs on.
# topology_min_watermark.py — minimum-across-inputs propagation
class Operator:
    def __init__(self, name, n_inputs):
        self.name = name
        self.input_wms = [0] * n_inputs
        self.current_wm = 0

    def receive_watermark(self, input_idx, wm):
        if wm > self.input_wms[input_idx]:
            self.input_wms[input_idx] = wm
        new = min(self.input_wms)
        if new > self.current_wm:
            self.current_wm = new
            return new  # advance downstream
        return None

join = Operator("interval-join", n_inputs=2)
print(join.receive_watermark(0, 100))  # clicks at 100 → wm = min(100, 0) = 0 → no advance
print(join.receive_watermark(1, 95))   # orders at 95 → wm = min(100, 95) = 95 → advance to 95
print(join.receive_watermark(0, 130))  # clicks at 130 → still min(130, 95) = 95 → no advance
print(join.receive_watermark(1, 120))  # orders at 120 → min(130, 120) = 120 → advance to 120
None
95
None
120
The slow input (orders) governs the join's watermark every time. Doubling the click stream's rate doesn't help. This is the asymmetric backpressure trap covered in /wiki/event-time-vs-processing-time-the-whole-ballgame's "Going deeper" section, and the reason WatermarkStrategy.withIdleness(...) exists — to let a stalled input be excluded from the minimum calculation after a timeout, with the trade-off that resuming events from that input become "late" against the advanced watermark.
Watermarks in production: what to monitor
The Flink/Beam/Spark Streaming production playbook is short, and every team learns it the same way (by getting paged). Three numbers belong on the dashboard.
Watermark lag — wallclock - watermark. Steady-state should match your forBoundedOutOfOrderness bound plus pipeline latency (typically 30–120 seconds). A growing lag means an upstream is stalled or backpressure is biting. Alert at 3× the steady-state.
Late-event rate — events dropped or side-outputted because their event-time fell behind the watermark when they arrived. Should be under 0.1% in healthy operation. A spike means the lateness bound is too tight or a source has a delay tail you didn't model. Why both metrics matter together: lag and late-event-rate are inversely correlated knobs. Tighter lateness → smaller lag, more late events. Looser lateness → larger lag, fewer late events. The dashboard tells you which side of the trade-off you're on.
Per-partition watermark spread — the gap between the fastest and slowest partition's watermark. A growing spread is the early warning that a single Kafka partition is falling behind (broker on a slow rack, replication catching up, GC pause). The job's overall watermark is governed by the slowest partition, so the spread tells you which partition to investigate before the whole job's watermark stops advancing.
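All three numbers are cheap to derive from raw gauges; a sketch of the panel arithmetic (real deployments would read these from Flink's metric reporters rather than compute them by hand):

```python
def watermark_health(wallclock_ms: int, operator_wm_ms: int,
                     partition_wms_ms: list[int],
                     late_events: int, total_events: int) -> dict:
    """The three dashboard numbers for a single operator."""
    return {
        "watermark_lag_ms": wallclock_ms - operator_wm_ms,
        "late_event_rate": late_events / max(1, total_events),
        "partition_spread_ms": max(partition_wms_ms) - min(partition_wms_ms),
    }

m = watermark_health(1_000_000, 955_000, [955_000, 997_500, 998_000], 12, 48_000)
assert m["watermark_lag_ms"] == 45_000     # alert if this grows past ~3x baseline
assert m["partition_spread_ms"] == 43_000  # one partition is falling behind
```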
A real Razorpay-shaped Flink dashboard has these three on a single panel, plus the input-rate per partition, plus checkpoint-duration. When the on-call engineer is paged at 2 a.m. for "the UPI dashboard is stale", the answer almost always lives in this panel.
Common confusions
- "A watermark is the current time." A watermark is a timestamp on the event-time axis, expressed in event-time units. It has nothing to do with the operator's wallclock. A watermark of `19:30:25` at wallclock `19:31:08` means "I think no event with event-time before 19:30:25 will arrive any more"; the wallclock could be Tuesday next week and the watermark still says 19:30:25 if no new data has arrived.
- "Watermarks are only used for windowing." Windowing is the loudest use, but the same watermark drives event-time timers (used for session windows, for "if no follow-up event in 30s emit a session-closed marker"), interval joins (state pruning), and event-time-aware sources (waiting for replay completion). Any operator that needs to know "is there more past timestamp T?" reads the watermark.
- "A larger lateness bound is always safer." It is safer in the sense of fewer late events, but it costs RocksDB state proportional to `lateness × rate × keys`. A naive 5-minute lateness on the Hotstar concurrent-viewer stream (10M events/sec at peak, distinct user_id keys) is tens of GB of state per minute. Production teams measure the actual delay distribution, pick a bound at the 99.9th percentile, then handle the long tail with a side-output policy.
- "Watermarks can be wrong, so they're useless." Watermarks are necessarily wrong some of the time — they are statistical bets, not guarantees. The engineering is in bounding the wrongness: how often (rate of late events) and how badly (lateness magnitude). With a well-tuned bound, less than 0.1% of events arrive late; with allowed-lateness side-outputs the rest are still captured. The system is "right with bounded error", which is the only achievable goal in any system that must emit before its inputs are exhausted.
- "Watermarks are emitted only by sources." Sources generate watermarks; every operator propagates and possibly adjusts them (e.g. a per-key reorder operator may delay watermark forwarding). Custom operators that buffer events for reordering or dedup must subtract their own buffering bound from the inbound watermark before forwarding it.
- "Watermarks make a pipeline exactly-once." No — watermarks are about event-time semantics, not delivery. Exactly-once requires checkpoint barriers, idempotent sinks, and transactional commits (Build 9). Watermarks tell you when to emit; exactly-once tells you how to commit so that emitting-twice doesn't create duplicates.
Going deeper
The Dataflow Model and what watermarks are not
Akidau et al. distinguish "completeness" (the watermark has crossed every event for a window) from "punctuality" (the runtime has committed to that completeness). Watermarks supply completeness probabilistically — a heuristic estimate. Punctuality, in contrast, is a guarantee, and the only way to get it is from the source itself (a CDC commit-LSN, a batch-job done-marker, an end-of-day boundary). The Dataflow paper's key contribution was untangling these: most production streams are run with heuristic watermarks plus an allowed_lateness retraction policy, which together approximate guarantees without ever achieving them. Reading section 3 of Akidau is worth the hour — it is the cleanest formal statement of why streaming correctness is structurally a bounded-error problem, not a zero-error one.
Idleness — the trick to keep the slowest partition from stalling everything
A 32-partition Kafka topic where partition 7 receives one event per minute and the others receive 1k/sec produces a tragedy: the operator's watermark is the minimum, so it advances at partition 7's rate. The fix is WatermarkStrategy.withIdleness(Duration.ofMinutes(2)). After 2 minutes of no events on partition 7, that partition is excluded from the minimum and the watermark advances at the rate of the active partitions. The cost: any event that does eventually arrive on partition 7 is treated as late against the now-advanced watermark and goes to side-output. Idleness is the right call when partition 7 is genuinely empty (e.g. a Kafka topic with one partition per Indian state and Mizoram is quiet) and the wrong call when partition 7 is congested (it'll catch up, and you've made every catch-up event late). Pick by knowing the source.
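The mechanics can be sketched in a few lines (illustrative only; Flink's real implementation marks a stream idle via status events rather than raw timestamps):

```python
def wm_with_idleness(partition_wm: dict[int, int],
                     last_event_at: dict[int, int],
                     now: int, idle_after: int) -> int:
    """Partitions silent for longer than `idle_after` drop out of the minimum."""
    active = [wm for p, wm in partition_wm.items()
              if now - last_event_at[p] <= idle_after]
    # If every partition is idle, fall back to the plain minimum rather than stall.
    return min(active) if active else min(partition_wm.values())

wm = {0: 1_000, 7: 200}
seen = {0: 995, 7: 700}  # partition 7 last produced an event 300 time-units ago
assert wm_with_idleness(wm, seen, now=1_000, idle_after=120) == 1_000  # 7 excluded
assert wm_with_idleness(wm, seen, now=1_000, idle_after=500) == 200    # 7 counts
```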
Watermarks under failure: the checkpoint-restart asymmetry
When a Flink job fails and restarts from a checkpoint, the watermark resets to its checkpointed value — typically a few seconds before the failure. The runtime then re-reads Kafka from the checkpointed offset and re-derives the watermark from scratch. This is correct for event-time semantics: the same input produces the same watermark. But it does mean the processing of the post-checkpoint events happens at a wallclock far past their event-time, and any external system (a sink, a dashboard, a downstream operator) that timestamps things at wallclock-of-receipt now has a minute or two of "old" events arriving rapidly. Sinks need to be careful — a Postgres INSERT INTO metrics(window, value, ingested_at) should use window (event-time) as the dedup key, not ingested_at. The restart will replay events; idempotent sinks survive, non-idempotent sinks emit duplicates.
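The idempotent-sink point is worth one concrete upsert. A sketch using sqlite3 as a stand-in for Postgres (`ON CONFLICT ... DO UPDATE` works in both), with the event-time window as the dedup key:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE metrics (window INTEGER PRIMARY KEY, value INTEGER)")

def emit(window: int, value: int) -> None:
    # Keyed by event-time window, so a post-restart replay overwrites in place.
    db.execute(
        "INSERT INTO metrics(window, value) VALUES (?, ?) "
        "ON CONFLICT(window) DO UPDATE SET value = excluded.value",
        (window, value))

emit(1930, 100)
emit(1930, 100)  # the replayed emit after a checkpoint restart
assert db.execute("SELECT COUNT(*), SUM(value) FROM metrics").fetchone() == (1, 100)
```

Had the primary key been `ingested_at`, the replay would have produced a second row with the same window and a later timestamp.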
When to write a custom watermark assigner
Three signals you've outgrown forBoundedOutOfOrderness. (a) The source has multiple delay regimes — the Hotstar mobile player has 200ms median for online users and 30-minute median for users who paused with bad connectivity. A single bound is wrong for both. Write a per-source-subclass watermark that uses the playback-mode field to pick the bound. (b) The source has known surge windows — Razorpay knows that on Diwali night the delay tail grows because Jio queues lengthen. A custom assigner can hold the watermark back during the 19:30–22:00 IST window. (c) The source itself can prove a bound — Postgres logical decoding emits commit_lsn at every transaction commit, and that LSN's timestamp is a hard guarantee that no earlier transaction will arrive. A punctuated assigner that advances on commit markers gives you tight watermarks with zero late events.
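Signal (a) can be sketched directly: per-regime high-water marks, each with its own bound, combined with the usual minimum (field names and bound values are hypothetical):

```python
class MultiRegimeWatermark:
    """One lateness bound per delay regime; the emitted watermark is the
    minimum across regimes, so each regime's tail is respected."""
    BOUNDS = {"online": 1, "paused": 1_800}  # seconds; illustrative values

    def __init__(self):
        self.high_water: dict[str, int] = {}

    def on_event(self, mode: str, event_time: int) -> int:
        self.high_water[mode] = max(self.high_water.get(mode, 0), event_time)
        return min(hw - self.BOUNDS[m] for m, hw in self.high_water.items())

w = MultiRegimeWatermark()
assert w.on_event("online", 1_000) == 999   # tight bound while only live traffic
assert w.on_event("paused", 1_000) == -800  # the fat-tailed regime holds W back
```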
Watermark holding for sub-graph correctness
Some operators need to hold back the watermark they emit. A reordering operator that buffers 5 seconds of events to sort them by event_time must subtract 5 seconds from the inbound watermark before forwarding it; otherwise downstream operators would see watermarks that promise "no events earlier than T" while the reorderer still has events earlier than T sitting in its buffer. Beam makes this a first-class notion (a "watermark hold" that pins the output watermark while buffered work is outstanding); Flink leaves it to the operator author. If you write a custom operator that buffers events for any reason, the watermark you emit must reflect the buffering, not the pass-through input.
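A buffering operator with its hold, in the same Python idiom as the earlier demos (an illustrative sketch, not any engine's operator API):

```python
class ReorderingOperator:
    """Buffers up to `hold` time-units of events to re-sort them, and
    therefore subtracts `hold` from every watermark it forwards."""
    def __init__(self, hold: int):
        self.hold = hold
        self.buffer: list[tuple[int, str]] = []

    def on_event(self, event_time: int, payload: str) -> None:
        self.buffer.append((event_time, payload))

    def on_watermark(self, inbound_wm: int):
        held_wm = inbound_wm - self.hold  # the watermark hold
        ready = sorted(e for e in self.buffer if e[0] <= held_wm)
        self.buffer = [e for e in self.buffer if e[0] > held_wm]
        return held_wm, ready  # forward the held watermark, never inbound_wm

op = ReorderingOperator(hold=5)
op.on_event(103, "a")
op.on_event(101, "b")
wm_out, flushed = op.on_watermark(110)
assert wm_out == 105                        # 110 minus the 5s buffering bound
assert flushed == [(101, "b"), (103, "a")]  # emitted in event-time order
```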
Where this leads next
The next chapter — /wiki/late-data-handling-allowed-lateness-side-outputs-retractions — covers what to do when an event lands past the watermark. Three policies, three trade-offs: drop (cheap), buffer-then-drop (the allowed_lateness knob), or retract-and-re-emit (correct, expensive, requires downstream cooperation).
After that, /wiki/triggers-when-to-emit-not-just-what-to-compute generalises beyond "fire on watermark" to "fire on any condition" — early triggers for low-latency intermediate results, custom triggers for business rules ("emit when 100k events accumulate or 60s pass, whichever first").
The mental model for Build 8: the watermark is the runtime's clock for the world, the event-time end-of-window is the question, and triggers + late-data policy are the answers. Every other Build 8 chapter — RocksDB state, checkpoints, exactly-once, stream joins — assumes this clock works. Build 9 closes the loop on durability; this chapter is about correctness on the time axis.
References
- The Dataflow Model — Akidau et al., 2015 — section 3 is the canonical formalisation of watermarks and the heuristic-vs-perfect distinction.
- The world beyond batch: Streaming 102 (Akidau on O'Reilly) — readable treatment of watermarks, triggers, and late data.
- Apache Flink — Generating Watermarks —
WatermarkStrategy,forBoundedOutOfOrderness,withIdleness, custom generators. - MillWheel: Fault-Tolerant Stream Processing at Internet Scale (VLDB 2013) — the system that introduced the production-grade watermark-with-low-watermark mechanism, ancestor of Beam.
- Streaming Systems — Akidau, Chernyak, Lax (O'Reilly) — chapter 3 on watermarks; chapter 4 on triggers.
- /wiki/event-time-vs-processing-time-the-whole-ballgame — the previous chapter; watermarks only make sense on the event-time clock.
- /wiki/late-data-handling-allowed-lateness-side-outputs-retractions — the next chapter; what the runtime does when the watermark's bet was wrong.
- /wiki/state-stores-why-rocksdb-is-in-every-streaming-engine — where the buffered "in-flight window" state lives during the watermark's wait.