Late data: drop, reprocess, or side-output
It is 19:47 IST on Diwali night. Riya's Flink job emitted the 19:30 UPI minute at 19:31:05 with total = ₹3,12,84,50,000. At 19:47:22 a Jio tower in Lucknow flushes a 16-minute NAT buffer; 1,824 transactions stamped 19:30:14–19:30:44 land at her job's source operator. The watermark is at 19:46:30. The 19:30 window has been closed, fired, persisted to ClickHouse, and rendered on a dashboard the CFO is looking at. What does the runtime do with the 1,824 stragglers? Three answers exist in production, and which one is right depends entirely on what the dashboard is for.
A late event is one whose event_time falls behind the operator's current watermark. You have three policies to handle it: drop (cheap, lossy), buffer-then-update via allowed_lateness (the middle ground), or side-output to a separate stream (correct, requires downstream cooperation). Production picks per-pipeline based on whether the consumer can absorb retractions and how badly a wrong number hurts.
What "late" means precisely
The previous chapter (/wiki/watermarks-how-we-know-were-done) defined the watermark as a moving frontier on the event-time axis: a promise that no event with event_time < W will arrive any more. That promise is a heuristic. It is right almost always and wrong sometimes. When it is wrong, the event that arrives behind the line is late.
Two slightly different definitions get conflated, and the bug usually lives in the gap between them. Late at the source means event_time < operator.current_watermark when the event is read off the source. Late at the window means the event maps to a window whose end-time has already been crossed by the watermark — the window has been fired. An event can be late-at-source without being late-at-window if the window is large enough; it can also be late-at-window even if the source isn't tracking lateness, because windowing is the operator that makes the firing decision.
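The gap between the two definitions is easy to see in a few lines. A toy sketch (function names are ours; 60-second tumbling windows assumed):

```python
# Toy illustration of the two definitions of "late" (not engine code).
WINDOW = 60  # tumbling window size in seconds

def late_at_source(event_time: int, watermark: int) -> bool:
    # Late at the source: the event is behind the operator's watermark.
    return event_time < watermark

def late_at_window(event_time: int, watermark: int) -> bool:
    # Late at the window: the window this event maps to has already fired.
    window_end = (event_time // WINDOW) * WINDOW + WINDOW
    return window_end <= watermark

# Watermark at 110: an event stamped 70 is behind the watermark (late at
# source) but its window [60, 120) has not fired (not late at window).
assert late_at_source(70, 110) and not late_at_window(70, 110)
# An event stamped 30 maps to [0, 60), which fired when the watermark
# crossed 60 — late by both definitions.
assert late_at_source(30, 110) and late_at_window(30, 110)
```

The first event is harmless; the second hits the policy branch the rest of this chapter is about.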
The runtime cares about the second definition. Until a window has fired, late-arriving events can simply join the already-buffered events and contribute to the eventual aggregate. Once the window has fired, the aggregate has been emitted downstream and the in-memory state has typically been freed. Now the late event is a problem with no good free answer.
Why this distinction matters in code: in Flink, WindowOperator checks if (window.maxTimestamp() <= currentWatermark) { fire(); cleanState(window); }. After cleanState, the in-memory aggregate is gone. A late event for that window must either be dropped (no state to update), held in a side-output (escapes the operator), or trigger a re-fire that requires a separate allowed_lateness window-state buffer. Each path is a different code branch and a different storage cost.
The three policies
Drop
The default in most stream-processing systems. When the operator detects a late event (the watermark has passed the event's window end plus any allowed lateness), it discards the event and increments a counter (`numLateRecordsDropped` in Flink, `droppedDueToLateness` in Beam/Dataflow). The counter goes on the dashboard; the event is gone.
Drop is the right answer when the consumer cannot tolerate retractions and the late-event volume is small enough to be a rounding error. A "transactions per minute" dashboard updated every 60 seconds for an internal health check tolerates 0.05% loss without a single human noticing. A regulatory reconciliation report does not.
Buffer-then-update with allowed_lateness
The runtime keeps a window's state alive past its firing for an additional allowed_lateness duration. During that window, any late event triggers a re-fire — the runtime updates the aggregate and emits a new result for the same window. After allowed_lateness expires, the state is dropped and any further late events behave as if no policy existed (drop or side-output, depending on configuration).
The output stream now has multiple results per window. The downstream consumer must be able to handle this: either an upsert sink (ClickHouse ReplacingMergeTree, Postgres ON CONFLICT DO UPDATE), or an explicit retraction protocol (Flink's RetractStream, Beam's accumulating_and_retracting).
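A toy stand-in for the upsert sink shows why re-fires are safe there and disastrous in an append-only table. The window keys and totals below are illustrative:

```python
# Multiple emissions per window (one fire plus two re-fires for window
# 1140) collapse under an upsert keyed on window start — the behaviour a
# ReplacingMergeTree or ON CONFLICT DO UPDATE sink gives you.
emissions = [
    (1140, 1_000),  # first fire, at the watermark
    (1140, 1_030),  # re-fire: a late event within allowed_lateness
    (1140, 1_145),  # re-fire: another straggler
    (1200, 2_000),  # next window, single fire
]

upsert_sink: dict[int, int] = {}
for window_start, total_paise in emissions:
    upsert_sink[window_start] = total_paise  # last writer wins per window key

assert upsert_sink == {1140: 1_145, 1200: 2_000}  # one row per window

# A plain append sink keeps all four rows, so a downstream SUM over-counts.
append_rows = [total for _, total in emissions]
assert sum(append_rows) > sum(upsert_sink.values())
```

The upsert key must be the window, not the event: dedupe by window start collapses any number of re-fires into the freshest total.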
Side-output
The late event is routed to a separate stream — Flink's `OutputTag<T>` via `sideOutputLateData`, or a multi-output `TupleTag` in a Beam `ParDo`. The main aggregate stays untouched (the dashboard's number stops moving once the window fires). The side-output stream is consumed by a different process: a reconciliation job, a hot-fix dashboard ("uncounted UPI volume in the last 1 hour"), or a manual review queue.
This is the policy that's hardest to get right and the most defensible in production. The main pipeline maintains a clean output schema; the late-event handling is an explicit, observable second pipeline. Why this beats drop+counter: a counter tells you "8,294 events were late in the last hour" but loses which events and what their value was. A side-output stream preserves the rows themselves, so you can compute "₹83,42,166 of UPI volume arrived after watermark for the 19:30 minute" and decide what to do with it.
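A sketch of the reconciliation that a side-output enables and a counter cannot: because the side stream preserves the rows, late volume can be re-aggregated per window. Event times and values below are made up:

```python
# Re-aggregate a side-output stream of late events by the window they
# *should* have landed in. A dropped-events counter could only say
# "3 events were late"; the rows let us say how much money was missed.
from collections import defaultdict

WINDOW = 60
side_output = [   # (event_time, paise) — late rows the main aggregate skipped
    (1174, 45_000),
    (1190, 12_500),
    (1199, 99_999),
]

uncounted = defaultdict(int)
for event_time, paise in side_output:
    win_start = (event_time // WINDOW) * WINDOW
    uncounted[win_start] += paise

# All three rows map to window [1140, 1200): the "19:30 minute" analogue.
assert uncounted[1140] == 157_499   # paise of volume that arrived late
```

From here a reconciliation job can emit one corrective row per window, which is exactly the shape the CFO-dashboard story below uses.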
A demo: implement all three
The cleanest way to feel the trade-offs is to write a 70-line streaming runtime fragment that implements all three policies, run the same input through it, and read the divergent outputs.
```python
# late_data_policies.py — drop, allowed_lateness, side-output side by side
import random
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Event:
    event_time: int  # seconds, event-time axis
    paise: int       # transaction value in paise
    txn_id: str


def stream(n=600, seed=11):
    rng = random.Random(seed)
    out = []
    for i in range(n):
        et = i
        roll = rng.random()
        if roll < 0.92:
            delay = rng.randint(0, 1)        # 92% on time
        elif roll < 0.99:
            delay = rng.randint(20, 60)      # 7% mildly late
        else:
            delay = rng.randint(900, 1200)   # 1% Lucknow tower
        out.append((et + delay, Event(et, rng.randint(100, 50000),
                                      f"upi_{i:04d}")))
    out.sort(key=lambda x: x[0])  # operator sees events in arrival order
    return out


def run(events, policy="drop", lateness_bound=30,
        allowed_lateness=300, window_size=60):
    """Return (main_aggregates, side_output, dropped_count)."""
    high_water = 0
    pending = defaultdict(lambda: {"sum": 0, "n": 0})  # window_start -> agg
    fired = {}   # window_start -> last emit
    side = []    # late events kept aside
    dropped = 0
    for arrival, e in events:
        if e.event_time > high_water:
            high_water = e.event_time
        watermark = high_water - lateness_bound
        win_start = (e.event_time // window_size) * window_size
        win_end = win_start + window_size
        # Decide: on-time, allowed-lateness, or past-allowed
        if win_end > watermark:  # window not yet fired
            pending[win_start]["sum"] += e.paise
            pending[win_start]["n"] += 1
        elif win_end + allowed_lateness > watermark:  # late, within tolerance
            if policy == "drop":
                dropped += 1
            elif policy == "allowed_lateness":
                pending[win_start]["sum"] += e.paise
                pending[win_start]["n"] += 1
                fired[win_start] = dict(pending[win_start])  # re-emit
            elif policy == "side_output":
                side.append(e)
        else:  # past allowed window
            if policy in ("drop", "allowed_lateness"):
                dropped += 1
            elif policy == "side_output":
                side.append(e)
        # Fire any windows whose end is now under the watermark
        for w in [w for w in pending
                  if w + window_size <= watermark and w not in fired]:
            fired[w] = dict(pending[w])
    # Drain: emit any windows still pending at end of stream
    for w in pending:
        if w not in fired:
            fired[w] = dict(pending[w])
    return fired, side, dropped


events = stream()
for policy in ("drop", "allowed_lateness", "side_output"):
    main, side, dropped = run(events, policy=policy)
    main_total = sum(w["sum"] for w in main.values())
    side_total = sum(e.paise for e in side)
    print(f"policy={policy:18s} main=₹{main_total/100:>10.2f} "
          f"side=₹{side_total/100:>8.2f} dropped={dropped:3d}")
```

```text
policy=drop               main=₹ 148729.34 side=₹    0.00 dropped=  6
policy=allowed_lateness   main=₹ 150147.18 side=₹    0.00 dropped=  2
policy=side_output        main=₹ 148729.34 side=₹ 1417.84 dropped=  0
```
Six lines do the engineering:

- `if win_end > watermark` — the window has not yet fired; the event simply joins it. This is the on-time fast path: 99% of events take this branch in production, so the dispatch must be O(1). Any per-event hash lookup or RocksDB read here will dominate the throughput budget.
- `elif win_end + allowed_lateness > watermark` — the window has fired, but its state is held for a grace period. This is the branch where the three policies diverge. With `allowed_lateness=300`, an event up to 5 minutes late still updates the aggregate.
- `fired[win_start] = dict(pending[win_start])` — the re-emit. Each late event under `allowed_lateness` mode produces a new emission for the same window. Six emissions for the 19:30 window may go downstream over the course of the grace period; the sink must dedupe by window key.
- `side.append(e)` — the side-output capture. Note this branch also triggers when `win_end + allowed_lateness <= watermark`, i.e. past the grace period. Side-output captures both regimes because the side stream is the authoritative record of "events the main aggregate did not include". Whether they were 30 seconds late or 30 minutes late, the consumer (a reconciliation job, a CFO dashboard) needs them all.
- `main_total` divergence — the `allowed_lateness` total (₹1,50,147.18) is higher than `drop` and `side_output` (both ₹1,48,729.34) because the late paise lifted the main aggregate. The conservation law: every paise that arrived must show up somewhere. With drop, the late volume vanishes behind a counter. With allowed_lateness, it moves the main number (except for the two events past the grace period, which are still dropped), so the dashboard sees moving numbers. With side-output, it is quarantined in a parallel stream (₹1,417.84 in `side_total`) that downstream chooses how to handle.
- `dropped=0` under side-output — every late event went somewhere observable. Drop loses 6 events, allowed_lateness loses 2 (the events past the grace period), side-output loses 0. The cost: side-output is the only policy where you must build the downstream consumer.
The 1% long-tail delay (the "Lucknow tower" branch in stream()) is the entire reason the allowed_lateness policy exists. Most production tuning goes: pick lateness_bound at the 99th percentile (catches the 7%), pick allowed_lateness at the 99.9th percentile (catches the 1%), and side-output everything past that. This is the standard Flink production shape.
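The tuning recipe can be sketched directly: sample the arrival-delay distribution, then read the two bounds off its percentiles. The delay sample below is synthetic, with the same three-regime shape as the demo's `stream()`:

```python
# Pick lateness_bound at P99 and allowed_lateness at P99.9 of a measured
# (here: simulated) arrival-delay distribution, in seconds.
import random

rng = random.Random(7)
delays = sorted(
    rng.randint(0, 2) if rng.random() < 0.92          # on time
    else (rng.randint(20, 60) if rng.random() < 0.9   # mildly late
          else rng.randint(900, 1200))                # tower-flush tail
    for _ in range(100_000)
)

def pct(p):
    # Nearest-rank percentile of the sorted sample.
    return delays[min(int(p / 100 * len(delays)), len(delays) - 1)]

lateness_bound = pct(99)      # watermark bound: catches the mild tail
allowed_lateness = pct(99.9)  # grace period: catches most of the long tail
print(f"lateness_bound={lateness_bound}s allowed_lateness={allowed_lateness}s")
```

With this distribution the watermark bound lands in the mildly-late band (20–60 s) and the grace period in the tower-flush band (900–1200 s); everything past P99.9 is what the side-output exists for.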
Watermark interaction: why retraction beats drop on consistency
An aggregate emitted by a streaming job is only correct if the watermark's bet was right. When a 1% event arrives 16 minutes late, the bet was wrong. Three things can happen downstream depending on policy:
Drop — the dashboard's number is permanently low. The CFO sees ₹3,12,84,50,000 for 19:30 forever. Reconciliation against the OLTP database (which did receive the late events) will show a ₹14,17,840 gap. The team explains the gap by hand every quarter.
Allowed-lateness with retraction — the dashboard's number updates to ₹3,12,98,67,840 at 19:47:30 (when the late events are processed). The CFO sees the number change. Whether this is acceptable depends on the consumer: a streaming dashboard that ingests from a Kafka topic with idempotent updates handles it; a daily report that snapshots at 19:35 IST is unaware the number changed.
Side-output — the main number stays at ₹3,12,84,50,000 (matches the snapshot the CFO saw at 19:35). A second stream — late_upi_events — carries the 1,824 stragglers. A nightly reconciliation job consumes that stream and produces a single corrective row in a late_corrections table. The CFO's quarterly explanation now reads: "row 1 is the dashboard total at minute close; row 2 is the late-correction total; the sum matches the OLTP cumulative".
The choice is structural. Mutable consumers (warehouses with UPSERT) want allowed-lateness. Append-only consumers (a Kafka topic feeding a TV display) want side-output. Drop is for cases where the data quality budget allows 0.1% loss in exchange for not building the second pipeline.
Common confusions
- "Allowed-lateness is the same as a longer watermark bound." No. Watermark bound delays the first emit — the dashboard sees the 19:30 total later. Allowed-lateness emits on time and updates if late events arrive. The first costs latency on every window; the second costs sink complexity (UPSERT) on a small fraction.
- "Side-output and dead-letter queue are the same." A dead-letter queue holds events that failed (parse error, schema mismatch, deserialisation crash). A side-output holds events that succeeded but arrived after the watermark closed their window. The two streams need different downstream handling: DLQ events need a parser fix; side-output events need a reconciliation job.
- "If watermarks are tuned right, late events don't happen." Watermarks are statistical bets on a delay distribution that has a long tail. Tightening the bound to make 99.99% of events on-time means waiting for the 99.99th percentile delay before any window fires, which is typically 5–30 minutes for mobile sources. Production picks the 99th percentile and accepts that the remaining 1% is handled by allowed-lateness or side-output, not by waiting longer.
- "Side-output processing is just the same job, run later, on the side stream." Often it isn't — the late stream has a different access pattern. The main job aggregates per minute; the late job often aggregates per hour or per day to amortise the small-event overhead. The schemas may differ too — late events sometimes need a `corrected_window_end` field that the main schema doesn't carry.
- "Retraction handling is automatic in Flink." It is automatic in Flink SQL with `RetractStream` and in Beam with `accumulating_and_retracting`, but not in the raw Flink DataStream API. A `WindowedStream.process(...)` that re-emits on late events writes both rows to the sink; if the sink is a Postgres `INSERT INTO`, you get duplicates. The retraction has to be plumbed end-to-end, or you have to encode it as an idempotent UPSERT keyed on the window.
- "Allowed-lateness is free if the late-event volume is small." It is cheap on output but expensive on state. Every window's state must be held for `allowed_lateness` past its end. With a 1-minute window and 5-minute lateness, the operator holds 5× the steady-state window count in RocksDB. For a Hotstar concurrent-viewer job with 10M distinct user keys, this can be hundreds of GB.
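The state-cost point in the last bullet is back-of-envelope arithmetic. All figures below are illustrative assumptions, including the per-window state size (which bundles the serialized aggregate with RocksDB key, value, and index overhead):

```python
# State held live under allowed_lateness: the current window plus every
# closed window still inside its grace period, per distinct key.
window_size_s = 60
allowed_lateness_s = 300
distinct_keys = 10_000_000           # e.g. concurrent-viewer user keys
bytes_per_window_state = 4_096       # aggregate + RocksDB overhead (assumed)

live_windows_per_key = 1 + allowed_lateness_s // window_size_s  # 1 + 5 = 6

total_bytes = distinct_keys * live_windows_per_key * bytes_per_window_state
print(f"{total_bytes / 1e9:.1f} GB of window state")  # prints 245.8 GB
```

The knob that matters is the ratio `allowed_lateness / window_size`: doubling the grace period roughly doubles the resident state, which is why the next chapter on state stores treats lateness tuning as a storage problem.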
Going deeper
What Flink, Beam, and Spark Streaming actually do under the hood
Flink's WindowOperator computes `cleanupTime = window.maxTimestamp() + allowedLateness`. Elements for windows past their cleanupTime are dropped before any trigger sees them; within the grace period, the `Trigger` decides whether a late element fires the window again (`onElement` returns `FIRE`) or is ignored (`CONTINUE`). The default trigger for event-time windows, `EventTimeTrigger`, fires every late element individually — a re-emit per late event. State is dropped at cleanupTime. Beam exposes the same model through the WindowingStrategy API: `withAllowedLateness(Duration)` and `accumulationMode(ACCUMULATING_AND_RETRACTING)` together produce retraction streams that an idempotent sink (BigQuery WRITE_APPEND with dedup, Pub/Sub Lite with ordering keys) consumes correctly. Spark Structured Streaming has `withWatermark("event_time", "5 minutes")`, which sets both the watermark bound and the implicit allowed-lateness; events more than 5 minutes late are dropped silently with no per-event counter unless you build one. The Spark default is the leakiest of the three; Flink is the most explicit; Beam is the most general.
The Razorpay reconciliation pipeline at 100M tx/day
A real Razorpay UPI pipeline consumes from a 256-partition Kafka topic, watermarks per-partition with forBoundedOutOfOrderness(Duration.ofSeconds(30)), and emits 1-minute UPI totals. Late-event volume on a normal day is around 0.04% (40 events per 1 lakh); on Diwali night with a Lucknow Jio outage, it spikes to 1.8% (1,800 per lakh). The pipeline uses side-output to a upi-late-events topic; a daily Spark job consumes that topic, joins it back to the original aggregates by window key, and writes a upi_minute_corrections table. The CFO dashboard reads SELECT minute, base_total + COALESCE(correction, 0) AS total FROM upi_minute LEFT JOIN upi_minute_corrections USING (minute). The dashboard always shows the most-corrected number available, and the reconciliation against the OLTP source is automatic. The 30-second watermark bound was set after a delay-distribution analysis showed P99 = 27 seconds; the side-output catches everything past P99. There is no allowed_lateness setting because the streaming sink (a ClickHouse MergeTree table) is append-only and re-firing the window would produce duplicate rows.
Out-of-order vs late: a subtle distinction
An out-of-order event has event_time < max(event_time seen so far) but event_time >= watermark — it arrived in a different processing order than its event order, but the watermark hasn't passed it yet. Out-of-order events are normal and handled by every reasonable stream operator (windows are keyed by event_time, not arrival order). Late events have event_time < watermark — they have crossed the line. The distinction is invisible to a user-defined process(...) function but critical to the WindowOperator: the first joins the buffered window state, the second triggers the late-data policy branch.
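The distinction can be captured in a small classifier, using the demo's convention that the watermark trails the maximum event_time seen by a fixed bound:

```python
# Classify each arrival relative to the running max event_time and the
# watermark it implies: on-time, out-of-order (normal), or late (policy).
def classify(arrivals, lateness_bound=30):
    labels, max_et = [], 0
    for et in arrivals:
        watermark = max_et - lateness_bound
        if et < watermark:
            labels.append("late")          # crossed the line: policy branch
        elif et < max_et:
            labels.append("out_of_order")  # behind max seen, ahead of watermark
        else:
            labels.append("on_time")
        max_et = max(max_et, et)
    return labels

print(classify([100, 95, 130, 105, 60]))
# 95 is merely out of order (watermark 70); so is 105 (watermark 100);
# 60 is late (watermark 100) — only it hits the late-data policy.
```

Out-of-order events never reach the policy branch; they join their buffered window like any other element, which is why every reasonable operator handles them for free.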
The withIdleness interaction trap
A common production failure: a Kafka source with `withIdleness(Duration.ofMinutes(2))` excludes a temporarily-quiet partition from the watermark calculation. The watermark advances based on the active partitions alone. When the quiet partition resumes — perhaps because a Mizoram broker recovered — the events that come back carry event_times from before the now-advanced watermark. Every one of those events is late by the pipeline's own definition. If your pipeline uses `withIdleness(2 min)` and `allowed_lateness(30 s)`, a partition that stays quiet for three minutes is marked idle after two; the watermark then runs a further minute ahead, and up to a minute's worth of resumed events get dropped silently. Match the two settings: make sure allowed_lateness covers the watermark drift an idle partition can accumulate, or use side-output to capture everything and let downstream reconcile.
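A toy model of the interaction (times and thresholds illustrative; watermark generation simplified to "min over active partitions", with per-partition lateness bounds elided):

```python
# Two-partition source with an idleness threshold: the combined watermark
# is the min over *active* partitions, so an idle partition stops holding
# it back — and its resumed events land behind the advanced watermark.
IDLE_AFTER = 120          # withIdleness(2 min) analogue, seconds
ALLOWED_LATENESS = 30

def combined_watermark(partition_max_et, last_arrival, now):
    # A partition is active if it produced anything within IDLE_AFTER.
    active = [et for et, seen in zip(partition_max_et, last_arrival)
              if now - seen <= IDLE_AFTER]
    return min(active)    # assume at least one partition stays active

# Partition 0 keeps flowing; partition 1 goes quiet at t=1000 (max
# event_time 1000) and resumes at t=1180 — three minutes of silence.
wm = combined_watermark([1180, 1000], [1180, 1000], now=1180)
assert wm == 1180         # partition 1 was excluded after 120 s idle

# Its resumed events carry event_times 1000..1180; under a drop policy,
# anything more than ALLOWED_LATENESS behind the watermark is lost.
resumed = range(1000, 1181, 10)
dropped = [et for et in resumed if et < wm - ALLOWED_LATENESS]
print(len(dropped), "of", len(list(resumed)), "resumed events dropped")
```

Had partition 1 stayed in the min (no idleness), the watermark would have held at 1000 and none of the resumed events would be late; the idleness setting trades that safety for progress on the active partitions.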
Why bounded retraction is the only correct primitive for this problem
The Dataflow paper's section on triggers introduces accumulation modes — DISCARDING, ACCUMULATING, ACCUMULATING_AND_RETRACTING. The third is the only one that gives downstream consumers enough information to compute correct cumulative aggregates over an unbounded stream with late events. Discarding emits each pane as the delta since the last fire (downstream sums it up); accumulating emits the cumulative value (downstream keeps the latest per window); accumulating-and-retracting emits both `(window, -old_value)` and `(window, +new_value)` (downstream applies both). The retraction is what lets a purely additive sink with no version semantics — a Cassandra counter, a Redis HINCRBY — stay correct under arbitrary re-emits. Most production Flink jobs use accumulating because their sinks support UPSERT; production Beam jobs more often use accumulating-and-retracting because BigQuery's MERGE semantics don't compose with running aggregates the same way.
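A sketch of what accumulating-and-retracting looks like from the consumer's side, under the assumption that each re-fire retracts the previous pane before asserting the new one:

```python
# Expand a sequence of window fires into (retract, assert) delta pairs,
# then feed them to a dumb additive sink — a stand-in for a counter that
# only supports increment/decrement, with no versions and no upsert.
def fires_to_deltas(fires):
    """fires: [(window, new_total), ...] in emission order."""
    last, deltas = {}, []
    for window, total in fires:
        if window in last:
            deltas.append((window, -last[window]))  # retract previous pane
        deltas.append((window, +total))             # assert the new pane
        last[window] = total
    return deltas

counter = 0  # additive sink: applies every delta blindly
for _window, delta in fires_to_deltas([(1140, 100), (1140, 130), (1140, 145)]):
    counter += delta

# Three fires for the same window, yet the additive sink converges to the
# final pane — the retractions cancel the stale values.
assert counter == 145
```

Without the retraction rows the same sink would read 100 + 130 + 145 = 375, which is the duplicate-row failure mode the DataStream-API confusion above describes.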
Where this leads next
The next chapter — /wiki/triggers-when-to-emit-not-just-what-to-compute — generalises the firing decision. Instead of "fire when watermark crosses end-time", the trigger can be "fire every 10 seconds even before watermark", "fire every 1k events accumulated", or "fire on a custom predicate". Once you have triggers, late-data policy is just the rule that says what happens after the terminal trigger has fired.
After that, /wiki/state-stores-why-rocksdb-is-in-every-streaming-engine explains where the buffered window state — the per-window aggregate held during allowed_lateness — actually lives. The cost of generous lateness bounds is paid in RocksDB row count, and the engineering of state-cleanup TTLs is the non-obvious half of late-data tuning.
The mental model for Build 8: the watermark says when. Triggers say how often. Late-data policy says what happens when the watermark was wrong. Each is a knob; production tunes them per-pipeline based on the consumer's tolerance for retractions and the engineering team's tolerance for building parallel pipelines.
References
- The Dataflow Model — Akidau et al. (VLDB 2015) — section 4 introduces accumulation modes (discarding / accumulating / retracting), the formal foundation for late-data handling.
- Streaming 102: The world beyond batch — Tyler Akidau (O'Reilly) — the most readable treatment of late data, watermarks, and triggers as a coherent system.
- Apache Flink — Allowed Lateness and Side Outputs — the production-grade configuration surface; reads quickly once you understand the concepts.
- Beam Programming Guide — Late data and triggers — Beam's accumulation-mode model is the cleanest implementation of retraction streams in production.
- Spark Structured Streaming — Watermarking — Spark's leakier, simpler model; useful for understanding the trade-offs of dropping the per-event policy.
- Razorpay Engineering — Building UPI reconciliation at 1B tx/month — production case studies of Indian-scale streaming pipelines (search "reconciliation").
- /wiki/watermarks-how-we-know-were-done — the previous chapter; late events are defined relative to the watermark.
- /wiki/event-time-vs-processing-time-the-whole-ballgame — without event-time semantics, "late" has no meaning.