Late data: drop, reprocess, or side-output

It is 19:47 IST on Diwali night. Riya's Flink job emitted the 19:30 UPI minute at 19:31:05 with total = ₹3,12,84,50,000. At 19:47:22 a Jio tower in Lucknow flushes a 16-minute NAT buffer; 1,824 transactions stamped 19:30:14–19:30:44 land at her job's source operator. The watermark is at 19:46:30. The 19:30 window has been closed, fired, persisted to ClickHouse, and rendered on a dashboard the CFO is looking at. What does the runtime do with the 1,824 stragglers? Three answers exist in production, and which one is right depends entirely on what the dashboard is for.

A late event is one whose event_time falls behind the operator's current watermark. You have three policies to handle it: drop (cheap, lossy), buffer-then-update via allowed_lateness (the middle ground), or side-output to a separate stream (correct, requires downstream cooperation). Production picks per-pipeline based on whether the consumer can absorb retractions and how badly a wrong number hurts.

What "late" means precisely

The previous chapter (/wiki/watermarks-how-we-know-were-done) defined the watermark as a moving frontier on the event-time axis: a promise that no event with event_time < W will arrive any more. That promise is a heuristic. It is right almost always and wrong sometimes. When it is wrong, the event that arrives behind the line is late.

Two slightly different definitions get conflated, and the bug usually lives in the gap between them. Late at the source means event_time < operator.current_watermark when the event is read off the source. Late at the window means the event maps to a window whose end-time the watermark has already crossed: the window has fired. An event can be late-at-source without being late-at-window if its window's end is still ahead of the watermark; the converse cannot happen, because an event's timestamp always precedes its window's end. And it is the window operator, not the source, that makes the firing decision.

The runtime cares about the second definition. Until a window has fired, late-arriving events can simply join the already-buffered events and contribute to the eventual aggregate. Once the window has fired, the aggregate has been emitted downstream and the in-memory state has typically been freed. Now the late event is a problem with no good free answer.

Figure: when a late event becomes a problem. A timeline shows the watermark advancing past a window's end; events arriving before the watermark crosses the end are buffered into the window, while events arriving after map to an already-fired window and need a policy. The 19:30 window [19:30:00, 19:31:00) has end_time = 19:31:00; the watermark reached 19:31:30 and fired the window, so an event arriving at 19:32:18 with event_time 19:30:24 is late. After that point, any event mapped to this window, regardless of how recently it arrived, is late and the runtime asks: drop, update, or side-output?
Lateness is defined relative to the firing decision, not relative to the wallclock. A 16-minute-late event is a different problem from a 6-second-late event only because of where the watermark has moved in the meantime.

Why this distinction matters in code: in Flink, the WindowOperator's event-time path is, in sketch, if (window.maxTimestamp() <= currentWatermark) { fire(); cleanState(window); }. After cleanState, the in-memory aggregate is gone. A late event for that window must either be dropped (no state to update), held in a side-output (escapes the operator), or trigger a re-fire that requires a separate allowed_lateness window-state buffer. Each path is a different code branch and a different storage cost.
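The consequence of cleanState is small enough to model directly. A minimal sketch (the names on_watermark and emit are illustrative, not Flink's API):

```python
# Sketch of the fire-then-clean branch described above. Names (on_watermark,
# emit, the dict-backed state) are illustrative, not Flink's API.
def on_watermark(current_watermark, window_state, emit):
    """Fire every window the watermark has passed, then free its state.
    After the pop, a late event for that window has no aggregate to join."""
    for win_end in sorted(window_state):
        if win_end - 1 <= current_watermark:   # window.maxTimestamp() == end - 1
            emit(win_end, window_state.pop(win_end))

fired = []
state = {60: {"sum": 120}, 120: {"sum": 45}}   # window_end -> aggregate
on_watermark(90, state, lambda w, agg: fired.append((w, agg)))
print(fired)   # [(60, {'sum': 120})]: the [0, 60) window fired
print(state)   # {120: {'sum': 45}}: its state survives; the fired window's is gone
```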

The three policies

Drop

The default in most stream-processing systems. When the operator detects a late event (its window ended more than allowed_lateness before the current watermark), it discards the event and increments a counter (numLateRecordsDropped in Flink, droppedDueToLateness in Beam on Dataflow). The counter goes on the dashboard; the event is gone.

Drop is the right answer when the consumer cannot tolerate retractions and the late-event volume is small enough to be a rounding error. A "transactions per minute" dashboard updated every 60 seconds for an internal health check tolerates 0.05% loss without a single human noticing. A regulatory reconciliation report does not.

Buffer-then-update with allowed_lateness

The runtime keeps a window's state alive past its firing for an additional allowed_lateness duration. During that grace period, any late event triggers a re-fire: the runtime updates the aggregate and emits a new result for the same window. After allowed_lateness expires, the state is dropped and any further late events behave as if no policy existed (drop or side-output, depending on configuration).

The output stream now has multiple results per window. The downstream consumer must be able to handle this: either an upsert sink (ClickHouse ReplacingMergeTree, Postgres ON CONFLICT DO UPDATE), or an explicit retraction protocol (Flink's RetractStream, Beam's accumulating_and_retracting).
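A toy upsert sink shows why multiple results per window are harmless under that contract (a sketch; the figures are invented):

```python
# Toy upsert sink: the window start is the primary key, so each re-fire
# overwrites the previous row -- what ON CONFLICT DO UPDATE or a
# ReplacingMergeTree gives you at the database level. Figures are invented.
sink = {}  # window_start (seconds since midnight) -> latest total (paise)

def upsert(window_start, total_paise):
    sink[window_start] = total_paise   # last write wins

upsert(70200, 31_284_500)   # 19:30 window fires on time
upsert(70200, 31_350_120)   # first late event, window re-fires
upsert(70200, 31_426_284)   # second late event, re-fires again
print(sink[70200])          # 31426284: the sink converges to the corrected total
```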

Side-output

The late event is routed to a separate stream: in Flink, an OutputTag<T> handed to sideOutputLateData(...); Beam has no built-in late-data side output, so you route late elements yourself with a TupleTag. The main aggregate stays untouched (the dashboard's number stops moving once the window fires). The side-output stream is consumed by a different process: a reconciliation job, a hot-fix dashboard ("uncounted UPI volume in last 1 hour"), or a manual review queue.

This is the policy that's hardest to get right and the most defensible in production. The main pipeline maintains a clean output schema; the late-event handling is an explicit, observable second pipeline. Why this beats drop+counter: a counter tells you "8,294 events were late in the last hour" but loses which events and what their value was. A side-output stream preserves the rows themselves, so you can compute "₹83,42,166 of UPI volume arrived after watermark for the 19:30 minute" and decide what to do with it.

Figure: three late-data policies side by side. Three lanes show the same late event entering a windowed operator and producing different downstream effects: under drop, a counter increments and no row is emitted; under allowed_lateness, the window's state is held and the window re-fires as an upsert; under side-output, the main stream is untouched and the event flows to a parallel late-events stream. Drop loses information. Allowed-lateness keeps the dashboard correct but requires the sink to upsert. Side-output preserves the late event for separate handling without polluting the main aggregate's schema.
The three policies differ in *who absorbs the cost*. Drop puts it on the user (wrong number). Allowed-lateness puts it on the sink (must support upsert). Side-output puts it on a second pipeline (you build it).

A demo: implement all three

The cleanest way to feel the trade-offs is to write a 70-line streaming runtime fragment that implements all three policies, run the same input through it, and read the divergent outputs.

# late_data_policies.py — drop, allowed_lateness, side-output side by side
import random
from collections import defaultdict
from dataclasses import dataclass, field

@dataclass
class Event:
    event_time: int       # seconds, event-time axis
    paise: int            # transaction value in paise
    txn_id: str

def stream(n=600, seed=11):
    rng = random.Random(seed)
    out = []
    for i in range(n):
        et = i
        roll = rng.random()
        if   roll < 0.92: delay = rng.randint(0, 1)        # 92% on time
        elif roll < 0.99: delay = rng.randint(20, 60)      # 7% mildly late
        else:             delay = rng.randint(900, 1200)   # 1% Lucknow tower
        out.append((et + delay, Event(et, rng.randint(100, 50000),
                                      f"upi_{i:04d}")))
    out.sort(key=lambda x: x[0])  # operator sees events in arrival order
    return out

def run(events, policy="drop", lateness_bound=30,
        allowed_lateness=300, window_size=60):
    """Return (main_aggregates, side_output, dropped_count)."""
    high_water = 0
    pending = defaultdict(lambda: {"sum": 0, "n": 0})  # window_start -> agg
    fired   = {}                                       # window_start -> last emit
    side    = []                                       # late events kept aside
    dropped = 0
    for arrival, e in events:
        if e.event_time > high_water:
            high_water = e.event_time
        watermark = high_water - lateness_bound
        win_start = (e.event_time // window_size) * window_size
        win_end   = win_start + window_size
        # Decide: on-time, allowed-lateness, or past-allowed
        if win_end > watermark:                        # window not yet fired
            pending[win_start]["sum"] += e.paise
            pending[win_start]["n"]   += 1
        elif win_end + allowed_lateness > watermark:   # late but within tolerance
            if policy == "drop":
                dropped += 1
            elif policy == "allowed_lateness":
                pending[win_start]["sum"] += e.paise
                pending[win_start]["n"]   += 1
                fired[win_start] = dict(pending[win_start])  # re-emit
            elif policy == "side_output":
                side.append(e)
        else:                                          # past allowed window
            if policy == "drop" or policy == "allowed_lateness":
                dropped += 1
            elif policy == "side_output":
                side.append(e)
        # Fire any windows whose end is now under the watermark
        for w in [w for w in pending if w + window_size <= watermark
                  and w not in fired]:
            fired[w] = dict(pending[w])
    # Drain
    for w in pending:
        if w not in fired:
            fired[w] = dict(pending[w])
    return fired, side, dropped

events = stream()
for policy in ("drop", "allowed_lateness", "side_output"):
    main, side, dropped = run(events, policy=policy)
    main_total = sum(w["sum"] for w in main.values())
    side_total = sum(e.paise for e in side)
    print(f"policy={policy:18s} main=₹{main_total/100:>10.2f}  "
          f"side=₹{side_total/100:>8.2f}  dropped={dropped:3d}")
policy=drop               main=₹148729.34  side=₹    0.00  dropped=  6
policy=allowed_lateness   main=₹150147.18  side=₹    0.00  dropped=  2
policy=side_output        main=₹148729.34  side=₹ 1417.84  dropped=  0

The three-way branch in run() (on-time, within tolerance, past it) is the entire engine; the rest is bookkeeping.

The 1% long-tail delay (the "Lucknow tower" branch in stream()) is the entire reason the allowed_lateness policy exists. Most production tuning goes: pick lateness_bound at the 99th percentile (catches the 7%), pick allowed_lateness at the 99.9th percentile (catches the 1%), and side-output everything past that. This is the standard Flink production shape.
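The tuning rule reads straight off a delay sample. A sketch, where pick_bounds is a hypothetical helper and the delay mix mirrors the demo's stream():

```python
import random

def pick_bounds(delays, p_watermark=0.99, p_lateness=0.999):
    """Pick lateness_bound at the p_watermark quantile of observed delay
    and allowed_lateness at the p_lateness quantile."""
    s = sorted(delays)
    q = lambda p: s[min(len(s) - 1, int(p * len(s)))]
    return q(p_watermark), q(p_lateness)

# Same three-band delay mix as stream(): 92% on time, 7% mildly late, 1% tail.
rng = random.Random(11)
delays = []
for _ in range(10_000):
    roll = rng.random()
    if   roll < 0.92: delays.append(rng.randint(0, 1))
    elif roll < 0.99: delays.append(rng.randint(20, 60))
    else:             delays.append(rng.randint(900, 1200))

bound, allowed = pick_bounds(delays)
print(bound, allowed)   # allowed lands in the 900-1200s tail; bound sits near the P99 edge
```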

Watermark interaction: why retraction beats drop on consistency

An aggregate emitted by a streaming job is only correct if the watermark's bet was right. When a 1% event arrives 16 minutes late, the bet was wrong. Three things can happen downstream depending on policy:

Drop — the dashboard's number is permanently low. The CFO sees ₹3,12,84,50,000 for 19:30 forever. Reconciliation against the OLTP database (which did receive the late events) will show a ₹14,17,840 gap. The team explains the gap by hand every quarter.

Allowed-lateness with retraction — the dashboard's number updates to ₹3,12,98,67,840 at 19:47:30 (when the late events were processed). The CFO sees the number change. Whether this is acceptable depends on the consumer: a streaming dashboard that ingests from a Kafka topic with idempotent updates handles it; a daily report that snapshots at 19:35 IST is unaware the number changed.

Side-output — the main number stays at ₹3,12,84,50,000 (matches the snapshot the CFO saw at 19:35). A second stream — late_upi_events — carries the 1,824 stragglers. A nightly reconciliation job consumes that stream and produces a single corrective row in a late_corrections table. The CFO's quarterly explanation now reads: "row 1 is the dashboard total at minute close; row 2 is the late-correction total; the sum matches the OLTP cumulative".

The choice is structural. Mutable consumers (warehouses with UPSERT) want allowed-lateness. Append-only consumers (a Kafka topic feeding a TV display) want side-output. Drop is for cases where the data quality budget allows 0.1% loss in exchange for not building the second pipeline.


Going deeper

What Flink, Beam, and Spark Streaming actually do under the hood

Flink's WindowOperator computes cleanupTime = window.maxTimestamp() + allowedLateness. A Trigger decides whether a late element fires the window again (onElement returns FIRE) or is ignored (CONTINUE). The default trigger for event-time windows, EventTimeTrigger, fires on every late element that arrives within allowed lateness, so you get one re-emit per late event. State is dropped at cleanupTime.

Beam exposes the same model through its windowing API: Window.into(...).withAllowedLateness(Duration) plus accumulatingFiredPanes() or discardingFiredPanes() controls what each re-fired pane contains, and the Dataflow model further defines an accumulating-and-retracting mode whose retraction stream an idempotent sink (BigQuery WRITE_APPEND with dedup, Pub/Sub with ordering keys) can consume correctly.

Spark Structured Streaming has withWatermark("event_time", "5 minutes"), which sets both the watermark bound and the implicit allowed-lateness; events more than 5 minutes late are dropped with only an aggregate metric (numRowsDroppedByWatermark in the streaming query progress) to show for it, and no per-event handle unless you build one. The Spark default is the leakiest of the three; Flink is the most explicit; Beam is the most general.

The Razorpay reconciliation pipeline at 100M tx/day

A real Razorpay UPI pipeline consumes from a 256-partition Kafka topic, watermarks per-partition with forBoundedOutOfOrderness(Duration.ofSeconds(30)), and emits 1-minute UPI totals. Late-event volume on a normal day is around 0.04% (40 events per 1 lakh); on Diwali night with a Lucknow Jio outage, it spikes to 1.8% (1,800 per lakh). The pipeline uses side-output to an upi-late-events topic; a daily Spark job consumes that topic, joins it back to the original aggregates by window key, and writes an upi_minute_corrections table. The CFO dashboard reads SELECT minute, base_total + COALESCE(correction, 0) AS total FROM upi_minute LEFT JOIN upi_minute_corrections USING (minute). The dashboard always shows the most-corrected number available, and the reconciliation against the OLTP source is automatic. The 30-second watermark bound was set after a delay-distribution analysis showed P99 = 27 seconds; the side-output catches everything past P99. There is no allowed_lateness setting because the streaming sink (a ClickHouse MergeTree table) is append-only and re-firing the window would produce duplicate rows.
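In miniature, that join looks like this (a sketch; table names follow the query quoted above, and the 19:31 figure is invented):

```python
# Miniature of the corrections join. Table names follow the query quoted
# above; the 19:31 figure is invented to show the COALESCE fallback.
upi_minute = {"19:30": 3_12_84_50_000, "19:31": 2_98_11_00_000}   # rupees
upi_minute_corrections = {"19:30": 14_17_840}   # summed from the late-events topic

def dashboard_total(minute):
    # SELECT base_total + COALESCE(correction, 0) ...
    return upi_minute[minute] + upi_minute_corrections.get(minute, 0)

print(dashboard_total("19:30"))   # 3129867840: base plus the late correction
print(dashboard_total("19:31"))   # 2981100000: no correction row, COALESCE to 0
```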

Out-of-order vs late: a subtle distinction

An out-of-order event has event_time < max(event_time seen so far) but event_time >= watermark — it arrived in a different processing order than its event order, but the watermark hasn't passed it yet. Out-of-order events are normal and handled by every reasonable stream operator (windows are keyed by event_time, not arrival order). Late events have event_time < watermark — they have crossed the line. The distinction is invisible to a user-defined process(...) function but critical to the WindowOperator: the first joins the buffered window state, the second triggers the late-data policy branch.
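The classification fits in a few lines (a sketch; classify is a hypothetical helper, not an engine API):

```python
def classify(event_time, max_seen, watermark):
    """Classify per the definitions above: late has crossed the watermark,
    out-of-order merely arrived behind newer events."""
    if event_time < watermark:
        return "late"            # triggers the late-data policy branch
    if event_time < max_seen:
        return "out_of_order"    # normal: joins the buffered window state
    return "in_order"

# Bounded out-of-orderness of 30s: watermark = max_seen - 30
print(classify(95,  max_seen=100, watermark=70))   # out_of_order
print(classify(60,  max_seen=100, watermark=70))   # late
print(classify(101, max_seen=100, watermark=70))   # in_order
```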

The withIdleness interaction trap

A common production failure: a Kafka source with withIdleness(Duration.ofMinutes(2)) excludes a temporarily-quiet partition from the watermark calculation, and the watermark advances on the active partitions alone. When the quiet partition resumes (perhaps because a Mizoram broker recovered), the events it replays carry event_time from before the now-advanced watermark, so every one of them is late. A partition quiet for three minutes is marked idle at the two-minute mark; with allowed_lateness(30s), most of the backlog it replays on recovery maps to windows past tolerance and is dropped silently. Match the two settings: keep allowed_lateness generous relative to withIdleness if you can't afford to lose the backlog, or use side-output to capture all of it and let downstream reconcile.
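A toy two-partition model makes the trap visible (a sketch; IDLE_AFTER and BOUND stand in for withIdleness and the out-of-orderness bound, and the min-based watermark is simplified):

```python
# Toy model of the trap: two partitions, min-based watermark, and a partition
# excluded from the min once it has been silent past the idle bound.
IDLE_AFTER = 120   # seconds, stands in for withIdleness(2 min)
BOUND      = 30    # seconds, stands in for the out-of-orderness bound

def watermark(now, last_event_time, last_arrival):
    active = [et for et, seen in zip(last_event_time, last_arrival)
              if now - seen < IDLE_AFTER]          # idle partitions drop out
    return min(active) - BOUND

# Partition 0 keeps flowing; partition 1 goes quiet after t=100.
last_et, last_seen = [100, 100], [100, 100]
for now in range(100, 301, 10):
    last_et[0] = last_seen[0] = now                # partition 0 stays live
    w = watermark(now, last_et, last_seen)

# At t=300 partition 1 recovers and replays events stamped during the outage.
resumed = [et for et in range(100, 301, 10) if et < w]
print(w)              # 270: the watermark ran ahead on partition 0 alone
print(len(resumed))   # 17 replayed events are already behind it
```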

Why bounded retraction is the only correct primitive for this problem

The Dataflow paper's section on triggers introduces three accumulation modes: DISCARDING, ACCUMULATING, and ACCUMULATING_AND_RETRACTING. The third is the only one that gives downstream consumers enough information to compute correct cumulative aggregates over an unbounded stream with late events. Discarding emits each pane as the delta since the last fire (downstream sums it up); accumulating emits the cumulative value (downstream takes the max-by-version); accumulating-and-retracting emits both (window, +new_value) and (window, -old_value) (downstream applies both). The retraction is what lets a purely additive sink with no version semantics, a Cassandra counter or a Redis HINCRBY, stay correct under arbitrary re-emit. Most production Flink jobs use accumulating because their sinks support UPSERT; Beam defines accumulating-and-retracting in its model, though SDK support for retractions has lagged, so BigQuery-backed pipelines tend to lean on MERGE semantics instead.
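A toy consumer makes the trade concrete (a sketch; the pane values are invented): one window fires with 100, a late event worth 3 forces a re-fire, and the sink is a blind additive counter with no version semantics.

```python
# Toy model: a window fires with value 100; a late event worth 3 arrives and
# forces a re-fire. Downstream is a blind additive counter (think HINCRBY),
# with no versioning and no upsert.
def consume(mode):
    counter = 0
    if mode == "discarding":        # panes are deltas since the last fire
        for pane in (100, 3):
            counter += pane
    elif mode == "accumulating":    # panes are cumulative values
        for pane in (100, 103):
            counter += pane         # the additive sink double-counts
    elif mode == "retracting":      # retract the old pane, assert the new one
        for pane in (100, -100, 103):
            counter += pane
    return counter

print(consume("discarding"))    # 103
print(consume("accumulating"))  # 203, wrong without upsert or versioning
print(consume("retracting"))    # 103
```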

Where this leads next

The next chapter — /wiki/triggers-when-to-emit-not-just-what-to-compute — generalises the firing decision. Instead of "fire when watermark crosses end-time", the trigger can be "fire every 10 seconds even before watermark", "fire every 1k events accumulated", or "fire on a custom predicate". Once you have triggers, late-data policy is just the rule that says what happens after the terminal trigger has fired.

After that, /wiki/state-stores-why-rocksdb-is-in-every-streaming-engine explains where the buffered window state — the per-window aggregate held during allowed_lateness — actually lives. The cost of generous lateness bounds is paid in RocksDB row count, and the engineering of state-cleanup TTLs is the non-obvious half of late-data tuning.

The mental model for Build 8: the watermark says when. Triggers say how often. Late-data policy says what happens when the watermark was wrong. Each is a knob; production tunes them per-pipeline based on the consumer's tolerance for retractions and the engineering team's tolerance for building parallel pipelines.
