Windowing, watermarks, and event time
Run date -u; sleep 2; date -u and the gap is exactly two seconds — but a thousand laptops in Bengaluru, Mumbai, and Coimbatore doing the same thing have drifting clocks, lagging networks, and the occasional flaky 4G tower in Indore, so when their events finally land in your Kafka cluster "now" means seven different things. Windowing is how a streaming system decides whose "now" counts.
A windowed aggregate — count the clicks in the last minute, sum the payments in this hour — only makes sense if you pin down what "minute" or "hour" means. Event time is the time the event actually happened; processing time is when your code saw it. Real systems use event time, and a watermark is the system's best guess that "no more events older than t will arrive." When the watermark crosses a window's edge, the window closes, the result is emitted, and any later straggler is either dropped, sent to a side output, or merged into a revised result.
Two clocks, one stream
Every event a real system processes carries at least two timestamps:
- Event time — the wall-clock instant on the device or service that generated the event. A user tapping "Pay" on PhonePe at 09:42:17 IST. A taxi GPS pulse fired at 09:42:17 from a phone in Andheri. The fact the event refers to.
- Processing time — the wall-clock instant on the stream-processor's machine when it handled the event. Always later than event time, by anywhere from a millisecond to several minutes, depending on network, batching, and back-pressure.
Most beginner streaming code accidentally uses processing time — now() inside the handler — because it is the only clock the handler can see without the event carrying its own. That works for toy demos. It is catastrophic in production. A network blip that delays a thousand events by three seconds shows up in your dashboard as a three-second drop in traffic followed by a three-second spike — neither of which actually happened.
A correct streaming system reads the event-time field out of each record — usually a timestamp column the producer set — and uses that to decide which window the event belongs to. Processing time still exists, as the clock the engine uses to decide when to emit results. But the bucket an event lands in is determined by the time it actually happened.
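The difference is easy to sketch. Below is a minimal illustration; the record layout (a "ts" field holding producer-side epoch seconds) and the names are assumptions for the demo, not any particular producer's schema. The correct bucketing function never consults the machine's own clock.

```python
import time

# A hypothetical click record as a producer might emit it. The "ts" field
# (event time, epoch seconds) is an assumed layout for this demo.
event = {"user": "riya", "action": "click", "ts": 1_700_000_000.0}

def bucket_by_event_time(event, window_seconds=60):
    # Correct: the window is chosen from the timestamp inside the record,
    # so the bucket is the same no matter when the event arrives.
    return int(event["ts"] // window_seconds) * window_seconds

def bucket_by_processing_time(event, window_seconds=60):
    # The beginner bug: time.time() is the processor's clock, so a delayed
    # event lands in whichever minute it happened to be handled.
    return int(time.time() // window_seconds) * window_seconds

print(bucket_by_event_time(event))  # → 1699999980, stable across reruns
```

Re-run the script an hour later and the event-time bucket is unchanged; the processing-time bucket has moved, which is exactly the dashboard artefact described above.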
Why event time is the only honest answer for analytics: any aggregate the business cares about — "UPI payments per minute", "concurrent BookMyShow seat reservations", "auto-rickshaw rides per pin code per hour" — is a statement about the world, not about the processor. Processing time tells you about your own infrastructure (it is good for SLO dashboards). Event time tells you about reality. Mix them up and your reality will be a function of your network jitter.
Three window shapes
Once event time is the rule, the next question is how to slice the stream. Three shapes account for nearly every windowed query you will ever write.
Tumbling
Disjoint, fixed-size, back-to-back. The minute 09:42:00–09:43:00 is one window; 09:43:00–09:44:00 is the next. Every event lands in exactly one window. This is the default for "per-minute counts" dashboards.
Hopping (sliding by step)
Fixed-size windows, but they overlap because they advance by a smaller step than their length. A 5-minute window with a 1-minute step gives you a new window every minute, each covering the last 5 minutes. An event at 09:42:17 lands in five windows: [09:38–09:43), [09:39–09:44), …, [09:42–09:47). Hopping windows produce smoother dashboards (a 5-minute moving average) at the cost of state proportional to the overlap.
Session
No fixed size. A session window is "all events for the same key with no gap longer than g between consecutive events." Riya opens the Swiggy app at 19:00, scrolls for 4 minutes, idles for 12 minutes, scrolls again at 19:16 — that is two sessions if g = 10 min, one session if g = 15 min. Used for user-engagement analytics where the natural unit is "a visit", not "a clock minute".
Each shape is a different fold over the stream. Tumbling and hopping windows are bounded by the clock and known in advance — the engine can pre-allocate state. Session windows are data-defined; the engine cannot know where one ends until it sees a gap of g, and may need to merge two existing windows when a late event bridges them.
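The three assignment rules can be sketched as small pure functions, assuming integer-second timestamps (a simplification; real engines use milliseconds):

```python
def tumbling(ts, size):
    # exactly one window: floor the timestamp to its window start
    return [(ts // size) * size]

def hopping(ts, size, step):
    # every start s with s <= ts < s + size, where starts are multiples of step
    starts = []
    s = (ts // step) * step          # latest possible start
    while s > ts - size:
        starts.append(s)
        s -= step
    return sorted(starts)

def sessions(events, gap):
    # events: sorted event times for one key; a new session begins whenever
    # the silence between consecutive events exceeds the gap
    out, cur = [], [events[0]]
    for prev, ts in zip(events, events[1:]):
        if ts - prev > gap:
            out.append(cur)
            cur = []
        cur.append(ts)
    out.append(cur)
    return out

# 09:42:17 as seconds since 09:00 → five overlapping 5-min windows, 1-min step
print(hopping(42 * 60 + 17, 300, 60))    # starts 09:38 through 09:42
# Riya's visit: taps at 19:00, 19:04, 19:16 (seconds since 19:00)
print(sessions([0, 240, 960], gap=600))  # two sessions
print(sessions([0, 240, 960], gap=900))  # one session
```

Note that hopping assignment returns size/step windows per event, which is the state-overlap cost mentioned above, and session assignment needs the per-key history, which is why engines hold open sessions in state rather than computing them from a single event.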
What a watermark is
A streaming engine cannot wait forever before closing a window. If the 09:42 minute is to ever produce a result, the engine has to commit, at some point, to "no more 09:42 events are coming." That commitment is a watermark.
Define it precisely:
A watermark W(t) at processing time t is the engine's claim that no event with event time less than W(t) will arrive after t.
A watermark is a promise about future arrivals, made on the basis of past arrivals. If the engine has seen a steady stream of events with event times around 09:42:30 for the last few seconds and has decided that anything older than 09:42:25 should already have shown up, it advances the watermark to 09:42:25. Once the watermark passes 09:42:00 (the close of the previous minute), the engine emits the count for the 09:41 window and closes it.
There are two questions a streaming engine must answer to emit a watermark:
- How is event time read out of each event? A timestamp extractor — usually record.timestamp from a Kafka header, or a JSON field the producer wrote.
- How much delay should we allow before declaring "no more"? A bounded out-of-orderness policy — Flink's WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), for example. The engine subtracts that allowance from the maximum event time seen so far. Set it too small and you drop legitimate stragglers. Set it too large and your dashboards lag.
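The out-of-orderness allowance is a one-line rule. A sketch of it as a generator, so you can watch the watermark trail the maximum event time seen so far:

```python
def watermarks(event_times, allowance):
    # Bounded out-of-orderness: the watermark is the max event time seen
    # so far minus the allowance, so it can only move forward.
    max_seen = float("-inf")
    for ts in event_times:
        max_seen = max(max_seen, ts)
        yield ts, max_seen - allowance

print(list(watermarks([10, 14, 12, 20], allowance=5)))
# the out-of-order 12 does not move the watermark backwards
```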
Why a watermark is best-effort, not a guarantee: a network partition or a producer bug can always send a record arbitrarily late. Streaming systems acknowledge this by classifying events that arrive after their watermark as late and giving you three options: drop, side-output, or revise. There is no way to be both timely and complete in a system with unbounded delay — this is the streaming version of the impossibility tradeoff. Pick a policy.
Building a tiny watermark engine
Open your editor. Type this in. Do not copy-paste — type it. Every line is a piece of the contract a real engine like Flink or Kafka Streams enforces, written small enough that you can hold it in your head.
# windowing.py — tumbling event-time windows with a bounded-lateness watermark
from collections import defaultdict

class TumblingWindowEngine:
    def __init__(self, window_seconds: int, allowed_lateness: int):
        self.W = window_seconds          # window size in seconds
        self.L = allowed_lateness        # how late we'll accept stragglers
        self.buckets = defaultdict(int)  # window_start -> count
        self.max_event_time = 0          # highest event time seen so far
        self.last_emitted = -1           # window_start of last emitted window
        self.late = []                   # side output for very-late events

    def _window_of(self, ts: int) -> int:
        return (ts // self.W) * self.W   # floor to window start

    def _watermark(self) -> int:
        # bounded-out-of-orderness: max event time minus the allowance
        return self.max_event_time - self.L

    def ingest(self, event_time: int, key: str):
        wm = self._watermark()
        win = self._window_of(event_time)
        if win + self.W <= wm and win <= self.last_emitted:
            # arrived after we already emitted that window: it's late
            self.late.append((event_time, key))
            return
        self.buckets[win] += 1
        self.max_event_time = max(self.max_event_time, event_time)
        # close every window whose end time is <= the new watermark
        wm = self._watermark()
        emitted = []
        for start in sorted(self.buckets):
            if start + self.W <= wm and start > self.last_emitted:
                emitted.append((start, self.buckets[start]))
                self.last_emitted = start
        for start, count in emitted:
            print(f"WINDOW [{start}, {start + self.W}) → count={count}")

if __name__ == "__main__":
    eng = TumblingWindowEngine(window_seconds=10, allowed_lateness=6)
    # event_time, key — note events arrive out of order
    stream = [
        (1, "riya"), (3, "rahul"), (7, "asha"),
        (12, "riya"), (15, "rahul"), (8, "kiran"),  # 8 arrives late but within lateness
        (22, "asha"), (26, "rahul"),
        (4, "old"),  # very late: should land in 'late'
    ]
    for et, k in stream:
        eng.ingest(et, k)
    print("late events:", eng.late)
Run it and you will see the windows close in order as the watermark advances:
$ python windowing.py
WINDOW [0, 10) → count=4
WINDOW [10, 20) → count=2
late events: [(4, 'old')]
Walk it line by line — the simplicity is the point.
self.W, self.L. Two parameters. W is the window width — 10 seconds in the demo, typically 1 minute or 5 minutes in production. L is the allowed lateness: how far behind the maximum event time the watermark sits. Bigger L means more state retained and slower dashboards but fewer dropped events.
_window_of(ts). Floor an event time to its window start. Tumbling windows are uniform [k*W, (k+1)*W) intervals — figuring out which one a timestamp belongs to is a single division. This is also why tumbling windows are cheap: the bucket key is computed from the event alone, no lookups.
_watermark(). The bounded-out-of-orderness rule from Flink, in two operators. The watermark is the highest event time seen so far, minus the allowance. Each new event can only push the watermark forward (because max_event_time is monotonic). This monotonicity is what lets the engine close windows safely.
ingest(...). Three things happen. First, check whether this event is so old it falls into a window we already emitted — if so, it goes to the late side output and the bucket is not touched. Second, update the bucket and the maximum event time. Third, recompute the watermark and emit any windows whose right edge has fallen below it.
Why we check start + self.W <= wm, not start < wm: a tumbling window [10, 20) covers events with event times in [10, 20). The watermark must pass the right edge — 20 — before we can be sure no more events for that window are coming. Closing the window when the watermark just enters it would emit an undercount. This single inequality is the difference between correct and silently-wrong code; production engines trip over it constantly.
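A quick check of the two predicates in isolation, assuming a watermark of 12 and 10-second windows: the watermark has entered the [10, 20) window but not yet passed its right edge, so only the buggy rule fires it.

```python
W, wm = 10, 12   # window width, current watermark

def closes_correct(start):
    return start + W <= wm   # right edge must be behind the watermark

def closes_wrong(start):
    return start < wm        # fires as soon as the watermark enters the window

# [10, 20): events with times up to 19 may still arrive while wm == 12
assert not closes_correct(10)  # correct rule keeps the window open
assert closes_wrong(10)        # buggy rule emits now → silent undercount
```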
The 100-line difference between this toy and Flink is checkpointed state, RocksDB-backed buckets, and proper handling of multiple parallel sub-tasks — but the core contract is identical: ingest events, advance a per-task watermark, fire windows when the watermark crosses the boundary, route stragglers to a side output.
Late arrivals: drop, side-output, or revise
Once the watermark is in place, every event is implicitly classified.
- On time. Event time within [max_event_time - L, max_event_time]. The window is still open, the count gets incremented, life is normal.
- Late but tolerable. Event time is older than the watermark but its window has not yet been emitted (because of the allowance L). Same handling as on-time.
- Late after emission. Event time falls in a window that has already been closed and emitted. The engine has three choices.
The three choices are the operational interface every streaming engine exposes:
- Drop. The cheapest. The event is silently discarded. Used for low-stakes telemetry where a fraction-of-a-percent loss is acceptable.
- Side output. The event is routed to a separate stream named something like late-events. A downstream consumer can audit it, store it for forensic analysis, or trigger a recompute. Flink's getSideOutput(lateOutputTag) is the canonical pattern.
- Revise (allowed lateness + retraction). The window's state is kept around for an extended grace period. When a late event arrives, the bucket is updated and a revised output is emitted — typically as a Kafka tombstone for the old value followed by the new value. The downstream KTable corrects itself. Materialize and ksqlDB do this by default; Flink does it only if you set allowedLateness on the window.
For dashboards, side-output is usually right: the dashboard never has to revise, and an alert-on-late-event lets the on-call team investigate. For correctness-critical aggregates — Razorpay's per-merchant settlement, Zerodha's per-instrument trade volume — revise is non-negotiable. The state cost is real (windows must be retained for the retraction horizon), but the alternative is a wrong number on a regulatory report.
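A minimal sketch of the revise policy for a single window, with an invented (action, value) downstream encoding standing in for the retraction-plus-new-value pair a real engine would write to a topic:

```python
class RevisingWindow:
    """Toy 'revise' policy: keep window state after emission; a late event
    triggers a retraction of the old result followed by the new one."""

    def __init__(self):
        self.count = 0
        self.emitted = None  # last value sent downstream, if any
        self.out = []        # downstream stream: ("emit" | "retract", value)

    def add(self, n=1):
        self.count += n

    def fire(self):
        # called when the watermark passes the window edge, and again whenever
        # a late event lands during the allowed-lateness grace period
        if self.emitted is not None and self.emitted != self.count:
            self.out.append(("retract", self.emitted))  # cancel the old value
        if self.emitted != self.count:
            self.out.append(("emit", self.count))
            self.emitted = self.count

w = RevisingWindow()
w.add(); w.add(); w.fire()  # watermark passes: emit count=2
w.add(); w.fire()           # late straggler in the grace period: retract 2, emit 3
print(w.out)                # [('emit', 2), ('retract', 2), ('emit', 3)]
```

The downstream consumer that folds this stream (apply emits, cancel retractions) converges on the corrected count, which is the contract the KTable-style revision relies on.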
A worked example: BookMyShow seat-counts during a release
Make it concrete. BookMyShow is selling tickets for a Friday release of Animal. Every booking emits an event (show_id, seats, event_time_ms). The product team wants a dashboard counting bookings per show per minute, in event-time. Tickets are bought from phones across India — Mumbai on JioFiber, Coimbatore on flaky Airtel 4G, the IIT Madras campus on a corporate VPN that batches outbound traffic. Network delays of 0–10 seconds are normal; 30+ second delays show up during the worst cellular dips.
The team picks:
- Tumbling 1-minute windows (W = 60s).
- Allowed lateness 30 seconds (L = 30s) — covers the worst routine cellular jitter.
- Side-output for events older than 30 seconds — they go to a bookings.late topic.
- A separate batch pipeline reconciles late events overnight and corrects the daily settlement report.
The watermark, in flight, advances every time a fresh event_time_ms is read off the topic. As long as bookings arrive at 09:42:14, 09:42:16, 09:42:19, the watermark sits at latest_event_time - 30s and tracks reality. When the watermark passes 09:43:00, the 09:42 minute closes and its count flushes to the dashboard. A booking whose event time is 09:42:55 but whose processing time is 09:43:25 still slides in — it is 30 seconds late, exactly at the lateness boundary. A booking with event time 09:42:55 and processing time 09:43:35 lands in the side output.
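The boundary arithmetic from that paragraph, as a check. Timestamps are in seconds since 09:00, and we assume fresh traffic keeps the engine's maximum event time tracking the processing clock; the function name is illustrative.

```python
L = 30                # allowed lateness, seconds
window_end = 43 * 60  # the [09:42, 09:43) window's right edge, 09:43:00

def still_accepted(event_time, freshest_event_time, lateness=L):
    # the window stays open while the watermark has not passed its right edge;
    # event_time is assumed to fall inside [09:42, 09:43)
    watermark = freshest_event_time - lateness
    return watermark < window_end

booking = 42 * 60 + 55                        # event time 09:42:55
print(still_accepted(booking, 43 * 60 + 25))  # seen ~09:43:25 → True, slides in
print(still_accepted(booking, 43 * 60 + 35))  # seen ~09:43:35 → False, side output
```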
The reconciliation that closes the loop
A finance analyst at BookMyShow loads the dashboard the morning after release. The total seat count for the Animal 09:42 minute reads 11,420. The overnight batch reconciliation reads 11,453. The 33-event difference is the side-output: tickets that arrived too late for the streaming window. Both numbers are "right" — the dashboard tells the team what they could have known in real time, the batch number is the audited truth. The discipline of streaming for speed, batch for correctness is exactly the Lambda-architecture residue that survives even into Kappa: the late side-output is the one place a second pipeline still earns its keep.
The same shape appears in every Indian streaming pipeline that reports to a regulator. Razorpay's per-merchant settlement, Zerodha's end-of-day trade tape, and the NPCI's UPI clearing report all run a real-time aggregator for ops dashboards, and a separate event-sourced re-aggregation overnight that walks the full event log and produces the canonical number. The streaming engine's watermark tells you what was visible at the close of business; the overnight pipeline tells you what really happened. The gap between them — the side-output — is what every postmortem and every audit goes looking for first.
Common confusions
- "Watermarks are timestamps the producer sets." They are not. The producer sets the event-time timestamp on each record. The watermark is generated by the engine, on the consumer side, as a derived signal of "no more old events expected." The two are different artefacts that travel together — a partition carries its events plus an interspersed stream of watermark markers.
- "A bigger allowed-lateness means fewer dropped events but no other cost." It also means more retained state. A 5-minute window with 5-minute lateness keeps 5 minutes of state per key per window; with 30-minute lateness it keeps 30. RocksDB grows, checkpoints get heavier, and recovery slows. Pick the smallest lateness that captures the real-world delay distribution.
- "Watermarks freeze when the stream is idle." Yes — and this is a real operational hazard. If a partition has no events for 10 minutes, its watermark stops advancing, and every downstream window that depends on that partition stalls. Engines fix this with idle source detection: after a configurable idleness threshold the partition is excluded from the global watermark calculation. Flink's withIdleness(Duration.ofMinutes(1)) is the knob. Forget it and your dashboard mysteriously freezes at 03:00 IST when traffic is low.
- "Processing time is always wrong, event time is always right." Both have uses. Use event time for any business aggregate (counts, sums, windowed reports, fraud rules tied to wall-clock policy). Use processing time for SLO-style metrics about your own pipeline (consumer lag, throughput, end-to-end latency). Mixing them up is the bug; using each correctly is fine.
- "Session windows close as soon as the gap elapses." Only after the watermark passes the gap. A late-arriving event within the gap can extend the session, or even merge two previously-separate sessions if it falls between them. This is why session windows are stateful and expensive — the engine retains every open session per key until the watermark guarantees no more bridging events are coming.
- "Late events and out-of-order events are the same thing." Out-of-order events arrive before their window closes — the engine just slots them in. Late events arrive after — the window is already emitted. Allowed lateness is the buffer that turns the second into the first, up to a chosen horizon.
Going deeper
Heuristic vs perfect watermarks
Bounded-out-of-orderness watermarks are heuristic: they assume the stream is at most L seconds disordered, and they are wrong whenever that assumption is violated. Google's MillWheel paper formalised the alternative: a perfect watermark for a bounded source — say, a Pub/Sub topic with a known per-partition delivery guarantee — that the system can prove never produces a late event. In practice, most Kafka-based pipelines settle for heuristic watermarks plus side-output for stragglers, because perfect watermarks require the source to expose a "low water mark" of pending publishes, and Kafka does not. Apache Beam's model lets the source choose; cloud sources like Pub/Sub provide perfect watermarks, while file-based sources do not.
The Dataflow model — windows, triggers, accumulation
The 2015 Google paper The Dataflow Model (Akidau et al., VLDB) is the conceptual canon for this whole chapter. It separates the question into three orthogonal axes: windowing (how to slice the stream), triggers (when to emit a result for a window), and accumulation (whether subsequent emissions for the same window discard, accumulate, or retract earlier ones). The framework is now codified in Apache Beam, Flink, ksqlDB, and Materialize. Reading the paper after this chapter is the right next step — every streaming engine is some specialisation of its design space.
Watermarks across joins and shuffles
A streaming engine usually has many parallel sub-tasks — say, 24 Flink task slots reading 24 Kafka partitions. Each sub-task tracks its own watermark; the global watermark of an operator is the minimum across upstream watermarks, because the operator cannot emit until every upstream has passed the boundary. After a shuffle (a re-partition by a different key), the new operator's watermark is the minimum of all upstream sub-tasks' watermarks. This is why a single slow partition can hold up the entire job: its low watermark drags down the minimum. Production engines instrument this with metrics like currentInputWatermark per-task; when it stalls, you find the slow source.
The pathology this exposes is watermark skew — when one source partition is far behind the others. A common cause is uneven partition keys: if one Kafka partition gets 80% of the traffic, its events stream in slowly relative to processing time, its watermark lags, and every downstream window stalls waiting for it. The fix is either to rebalance partitioning at the producer (often by adding a hash salt to the key) or to accept the lag and tune maxOutOfOrderness higher. Flink's UI surfaces this as the low watermark metric per operator instance; sorting that column by lag finds the bottleneck in seconds.
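The min rule is small enough to sketch directly, including the idleness escape hatch; the partition numbers and timestamps are invented.

```python
# Per-partition watermarks (epoch seconds) for a hypothetical 4-partition
# topic; partition 3 is the hot, lagging one.
partition_watermarks = {0: 1_000_120, 1: 1_000_118, 2: 1_000_119, 3: 1_000_040}

def global_watermark(wms, idle=frozenset()):
    # an operator may only advance to the minimum over its non-idle inputs
    return min(wm for p, wm in wms.items() if p not in idle)

print(global_watermark(partition_watermarks))            # partition 3 stalls the job
print(global_watermark(partition_watermarks, idle={3}))  # after idleness detection
```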
Allowed lateness vs retention
Two related-but-different timers govern how long a window's state lives. Allowed lateness is how long after the window's right edge the engine will still update its result. Retention (or gracePeriod in some engines) is how long the engine keeps the window's state available for queries even after lateness expires — relevant when downstream consumers want to look up what the count for [09:42, 09:43) was yesterday. Kafka Streams exposes these as grace() on the window definition and withRetention() on its Materialized stores; Flink couples them via allowedLateness plus the state TTL. Conflating them is a common production bug — a team sets allowed lateness to 5 minutes, the result topic is correct, but a downstream interactive query against the state store returns nothing because the state was already evicted.
CDC streams and event time
When a database emits a change-data stream — Debezium reading the Postgres WAL, Mongo's change stream — every event already carries a commit timestamp from the source. That commit timestamp is the natural event time, and downstream pipelines should use it rather than the time the CDC connector saw the record. The subtlety is that the WAL writes the commit timestamp at commit, not at the time of the originating SQL statement, so the per-row event time may be later than the user intuitively expects. For most aggregates this is fine. For audit reports the team should expose the application's own created_at column as the event-time field instead.
Why watermarks are not enough alone
A watermark is necessary but not sufficient. Even with perfectly-tuned watermarks, two things can still go wrong: producer clocks drift (a phone whose system clock is 3 minutes ahead emits events that look like the future to your engine), and very-late stragglers fall outside any reasonable allowed lateness. Production pipelines pair watermarks with two extra defenses — timestamp clipping (clamp future-looking event times to the engine's current wall clock to limit damage from clock-skew), and dead-letter queues with manual reconciliation for the long tail of stragglers. The dashboard tells you about the stream right now; the DLQ closes the loop on the small fraction the stream cannot handle.
Where this leads next
- Flink: stateful stream processing — chapter 178: how a real engine implements windows, watermarks, side outputs, and checkpointed state at production scale.
- Materialize and Differential Dataflow: databases as views — chapter 179: an engine where every window is implicitly retracted-and-revised, no allowedLateness knob needed.
- Exactly-once semantics: how it actually works — chapter 176: when a window's emission is the side effect, exactly-once delivery is what stops a stuck partition from emitting the same window twice.
- The stream / table duality — chapter 175: a windowed aggregate is a table, computed by folding the stream up to a watermark.
- Change streams: CDC built in — the chapter where event-time arrives from a database WAL instead of an application producer.
References
- Tyler Akidau et al., The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing (VLDB 2015) — the foundational paper. Windows, triggers, accumulation, watermarks, all formalised. research.google/pubs/pub43864.
- Tyler Akidau, Streaming 101 / Streaming 102 (O'Reilly Radar, 2015) — the most accessible introduction to event time, watermarks, and triggers; written by the lead author of the Dataflow paper. oreilly.com/radar/the-world-beyond-batch-streaming-101.
- Apache Flink documentation, Generating Watermarks — the practical reference for WatermarkStrategy.forBoundedOutOfOrderness, idle sources, and per-partition watermarks. nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks.
- Apache Kafka Streams documentation, Windowing — the KStreams take on tumbling, hopping, and session windows, plus grace periods. kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#windowing.
- Slava Chernyak, Watermarks: Time and Progress in Apache Beam (and beyond) (Strata 2017 talk) — the visual intuition for heuristic vs perfect watermarks across joins and shuffles. oreilly.com/library/view/strata-data-conference/9781491976326/video316594.html.
- Tyler Akidau et al., MillWheel: Fault-Tolerant Stream Processing at Internet Scale (VLDB 2013) — the predecessor system at Google where the watermark abstraction was first deployed in production. research.google/pubs/pub41378.
- padho-wiki, The stream / table duality — chapter 175. A windowed aggregate is a table indexed by (key, window).
- padho-wiki, Kafka as a distributed log — chapter 174. Where the per-partition timestamps and offsets that power watermarks come from.