The Dataflow model: batch as bounded streams

A Razorpay analytics engineer in 2016 is trying to compute settlement totals per merchant per day. The batch pipeline groups by event_date, sums amount, writes to Postgres. The streaming pipeline groups by 1-minute windows, sums amount, updates Redis. They are the same query in two languages, two runtimes, two semantics. Then someone forwards her the Dataflow paper. Twenty pages later she realises she has been writing the wrong question. The question is not "batch or stream"; the question is when does a window have all of its data. The 2015 paper from Akidau et al. — the people who ran MillWheel and FlumeJava at Google — replaced "batch vs streaming" with one model where the only thing that distinguishes them is whether the input ends.

The Dataflow model says every data-processing problem is the same four questions: what are you computing, where in event time, when in processing time do you fire results, and how do refinements relate. Batch is the special case where the watermark advances to infinity at the end of the input. The model unified the two paradigms and is the intellectual basis for Apache Beam, Flink's DataStream API, and modern unified runtimes.

The four questions that replace "batch or stream"

Before Dataflow, the choice between batch and streaming was a runtime choice — pick Hadoop for hours-of-latency-but-correct, pick Storm for seconds-of-latency-but-approximate. The Dataflow model reframed it as a semantic choice. Every pipeline answers four questions, regardless of runtime:

  1. What are you computing? (Sum, count distinct, top-K, custom reducer.) This is the transformation.
  2. Where in event time is the result grouped? (Fixed 1-minute windows, sliding 5-minute windows with 1-minute period, session windows with 30-minute gap, global window.) This is the windowing strategy.
  3. When in processing time do you emit a result for that window? (When the watermark passes the window's end, every minute regardless, on every input record.) This is the trigger.
  4. How do refinements to the same window relate to each other? (Discard previous output, accumulate, accumulate-and-retract.) This is the accumulation mode.

A batch pipeline answers: what = sum, where = daily fixed window, when = at end of input (watermark hits +∞), how = discarding (one final result per window). A streaming pipeline answers: what = sum, where = daily fixed window, when = every 1 minute on processing time AND at watermark close, how = accumulating (each emit is the running total, replacing the previous emit).
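Those two answer sets can be written down as plain data, which makes the "same shape, different settings" point concrete. This is an illustrative sketch only — `PipelineSpec` is a hypothetical name, not part of any Beam API:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class PipelineSpec:
    what: str   # the transformation
    where: str  # event-time windowing strategy
    when: str   # trigger
    how: str    # accumulation mode

batch = PipelineSpec(
    what="sum(amount)",
    where="daily fixed window on event_time",
    when="once, when the watermark hits +inf at end of input",
    how="discarding (one final result per window)")

streaming = PipelineSpec(
    what="sum(amount)",
    where="daily fixed window on event_time",
    when="every 1 min of processing time AND at watermark close",
    how="accumulating (each emit replaces the previous total)")

# The first two axes agree; only WHEN and HOW differ.
assert (batch.what, batch.where) == (streaming.what, streaming.where)
assert batch.when != streaming.when and batch.how != streaming.how
```

The asserts are the whole argument: batch and streaming share WHAT and WHERE, and differ only along the WHEN and HOW axes.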

[Figure: "The four questions in the Dataflow model" — four panels showing the four orthogonal axes with example settings: WHAT, the transform (sum, count distinct merchants, top-K products, custom reducer — "sum payment amount per merchant"); WHERE, the event-time window (fixed/tumbling, sliding, session gap-based, global — "per 1-minute tumbling on event_time"); WHEN, the trigger (watermark close, processing-time, data-driven count, composite early/late — "every 30s until watermark, then on late"); HOW, the refinement mode (discarding, accumulating, accumulating-and-retracting — "each emit replaces the previous total").]
Every windowed pipeline — batch or streaming — is a choice along these four axes. The runtime is implementation; the semantics are the model.

The radical idea is that batch is just one corner of the WHEN axis — the corner where the watermark advances to infinity at the end of input, the trigger fires once, the accumulation mode is discarding because there are no later refinements possible. Streaming explores the rest of the axis, but the semantic shape is identical. Why this matters: if batch and streaming are the same shape, you can write a metric once and run it in either mode. Apache Beam, Flink's unified API, and the entire "lakehouse-plus-streaming" generation are direct consequences of accepting this collapse.

Watermarks: how the model knows a window is "done"

The Dataflow model's central operational concept is the watermark — a heuristic estimate of "we have processed all events with event_time ≤ T". When a window's end-of-event-time is below the watermark, the model considers the window complete enough to emit a result. The watermark is a moving frontier on the event-time axis, and it advances based on what the system has seen so far.

[Animation: "Watermark sweep across event time" — events plotted by (event_time, processing_time) along a 10:00–10:05 event-time axis; the watermark line slowly sweeps right, and one event arriving behind it is marked late.]
The watermark (dashed accent line) sweeps right through event time. Events to its left are "complete"; the orange event arriving behind it is *late* — handled by allowed-lateness triggers, not lost.

Two properties matter:

  1. The watermark is a heuristic, not a guarantee. It is the system's best estimate based on input timestamps and source watermarks. If a Kafka partition hasn't sent data in 5 minutes, what's the watermark? Source-specific logic decides — usually "min event_time across partitions, with idle-partition handling". Late events can and do arrive behind the watermark.
  2. The trigger uses the watermark as input, not as truth. A typical trigger is "fire when watermark passes window-end, then fire again for any late event up to allowed-lateness=10 minutes, then close". The WHEN axis composes multiple sub-triggers into a single windowing strategy.
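Both properties can be illustrated with a toy watermark tracker — the frontier is the minimum of the latest event times seen per partition, and it is only a heuristic for flagging late arrivals. All names here are hypothetical; this is not any real source's watermark API:

```python
class WatermarkTracker:
    """Heuristic low-watermark: min of the latest event_time seen per partition."""

    def __init__(self, partitions):
        self.latest = {p: float("-inf") for p in partitions}

    def observe(self, partition, event_time):
        # Advance this partition's frontier (a frontier never moves backwards).
        self.latest[partition] = max(self.latest[partition], event_time)

    @property
    def watermark(self):
        # System-wide estimate: believed to be behind no partition.
        return min(self.latest.values())

    def is_late(self, event_time):
        # "Late" means arriving behind the current estimate — possible
        # precisely because the watermark is a heuristic, not a guarantee.
        return event_time < self.watermark

wm = WatermarkTracker(["p0", "p1"])
wm.observe("p0", 100)
wm.observe("p1", 130)
print(wm.watermark)    # 100 — held back by the slowest partition
print(wm.is_late(95))  # True — arrived behind the watermark
wm.observe("p0", 140)
print(wm.watermark)    # 130 — the frontier advanced
```

Note what the `min` also implies: an idle partition pins the watermark in place, which is exactly why real sources need the idle-partition handling mentioned above.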

This is the breakthrough: instead of forcing the system to be either correct (batch, wait for everything) or fast (streaming, fire on every record), the Dataflow model lets you parametrise the trade-off. Allowed lateness = 0 means strict (drop late events). Allowed lateness = 1 hour means tolerant. Early triggers + accumulating mode means "speculative results updated as data arrives". You get to pick where on the latency-correctness curve to sit, per pipeline, without changing runtimes.

Code: the same query in three modes

The clearest way to internalise the model is to see one transformation expressed in three trigger modes, all on the same Beam-Python API. The pipeline computes per-merchant payment totals for 1-minute event-time windows.

# dataflow_model_three_modes.py
# One transformation, three trigger configurations, identical "what" and "where".
import json

import apache_beam as beam
from apache_beam.transforms import window, trigger
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka

def parse_payment_event(kv):
    # ReadFromKafka yields (key, value) byte pairs; the payload schema
    # (event_time_unix, merchant_id, amount_paise) is assumed here.
    return json.loads(kv[1].decode("utf-8"))

def run(mode: str):
    opts = PipelineOptions(streaming=True)
    with beam.Pipeline(options=opts) as p:
        events = (
            p
            | "ReadKafka" >> ReadFromKafka(
                consumer_config={"bootstrap.servers": "kafka.razorpay-internal:9092"},
                topics=["payments.captured"])
            | "ParseJSON" >> beam.Map(parse_payment_event)
            | "Timestamp"  >> beam.Map(
                lambda e: window.TimestampedValue(e, e["event_time_unix"]))
            | "ByMerchant" >> beam.Map(lambda e: (e["merchant_id"], e["amount_paise"]))
        )

        if mode == "batch_style":
            # Default trigger: one emit per window, when the watermark
            # passes the window end.
            windowed = (events
                | "Window" >> beam.WindowInto(window.FixedWindows(60))
                | beam.GroupByKey())

        elif mode == "early_speculative":
            # Fire every 10s of processing time during the window (early),
            # then at watermark close (on time), then on each late event
            # for up to 5 minutes of allowed lateness.
            windowed = (events
                | "Window" >> beam.WindowInto(
                      window.FixedWindows(60),
                      trigger=trigger.AfterWatermark(
                          early=trigger.AfterProcessingTime(10),
                          late=trigger.AfterCount(1)),
                      allowed_lateness=300,
                      accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
                | beam.GroupByKey())

        elif mode == "discarding_realtime":
            # Fire on every record (data-driven) — pure streaming,
            # each emit is a delta, not a running total.
            windowed = (events
                | "Window" >> beam.WindowInto(
                      window.FixedWindows(60),
                      trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                      accumulation_mode=trigger.AccumulationMode.DISCARDING)
                | beam.GroupByKey())

        totals = windowed | beam.MapTuple(
            lambda mid, amounts: (mid, sum(amounts)))
        totals | "Sink" >> beam.io.WriteToText(f"out_{mode}")

# Sample run output (mode=early_speculative), merchant "RZP_K8RG2":
#   10:23:10  RZP_K8RG2  ₹  4,28,500   (early, 10s into window)
#   10:23:20  RZP_K8RG2  ₹  9,12,300   (early, accumulated)
#   10:23:30  RZP_K8RG2  ₹ 14,06,750   (early, accumulated)
#   10:23:40  RZP_K8RG2  ₹ 18,77,200   (early, accumulated)
#   10:23:50  RZP_K8RG2  ₹ 23,15,000   (early, accumulated)
#   10:24:00  RZP_K8RG2  ₹ 27,40,800   (on time — watermark passed)
#   10:25:14  RZP_K8RG2  ₹ 27,52,300   (late event, allowed_lateness)

Walk through what changes when you flip modes. The events PCollection at the top of run() is identical across all three; only the trigger, accumulation mode, and allowed lateness change. The "what" (sum, transform shape) and "where" (1-minute fixed window on event time) are untouched. The model factored what used to be one runtime-coupled choice into three orthogonal parameters.

Late data and the lateness-vs-correctness knob

The hard part of every windowed system is what to do with events that arrive after their window's watermark has passed. In old-school batch, this was solved by the schedule — daily reports for dt=2026-04-25 ran at 2026-04-26 00:30 IST so anything later than 30 minutes was just dropped. In old-school streaming (Storm), late events were silently misrouted into the wrong window or dropped. Neither was satisfactory.

The Dataflow model treats late data as a first-class question with three knobs:

  1. Allowed lateness: how long after the watermark passes the window-end will the system retain state for that window? Beyond this, the window's state is garbage-collected and any further late events are dropped. Why this matters operationally: keeping all window state forever means unbounded memory growth. Allowed lateness is the explicit trade-off: "I'll keep state for 1 hour past the watermark; after that the window is closed for good." If 99.9% of events arrive within 30 minutes of their event time, allowed-lateness = 1 hour catches essentially all of them without unbounded state.
  2. Late-firing trigger: when a late event does arrive within allowed lateness, do we fire immediately (AfterCount(1))? Batch them up and fire every 1 minute? The choice affects how many downstream sink writes the late events generate.
  3. Accumulation mode for late firings: typically ACCUMULATING_AND_RETRACTING — emit the new corrected total and a retraction of the previously emitted total. This lets the sink reverse the old result and apply the new one, even if the sink is not idempotent on the windowed key. Beam supports this; not all runners do.

Razorpay's settlement reconciliation pipeline runs with allowed-lateness of 24 hours because UPI events from rural NPCI nodes can be delayed by up to 18 hours during regional outages. Flipkart's Big Billion Days event-counter ran with allowed-lateness of 5 minutes because the dashboard was tolerant of small inaccuracies and the team wanted bounded memory under 14× normal load. Same model, two settings — the Dataflow model says both are valid pipelines, just at different points on the lateness curve.

Common confusions

Going deeper

Why the model needs all four questions, not three

A common reaction on first reading is "what and where seem like one question, when and how are operational concerns". The paper's authors deliberately separate them because each axis has a different default and is owned by a different domain expert.

Conflating them ("the streaming engine fires every record because that's what streams do") forces each team to negotiate around the runtime instead of designing for their own constraint. The four-axis model lets each team set its parameter independently. Why this is a real org-design improvement: at PhonePe, the analyst defining a fraud-detection metric does not need to know whether the runtime is Flink or Spark; they specify event-time tumbling 5-minute windows with sum aggregation. The SRE team picks the trigger schedule based on dashboard latency targets. The DBA picks accumulation mode based on whether the warehouse table is a SummingMergeTree or a ReplacingMergeTree. Three teams, three knobs, one pipeline.

MillWheel and FlumeJava: what Dataflow inherited

MillWheel (Google's 2013 paper, Akidau et al.) introduced two crucial primitives the Dataflow model formalised: low-watermarks as a system-wide event-time frontier, and exactly-once stateful processing via persistent timers and dedup. FlumeJava (Google's 2010 paper) introduced PCollections and the deferred-evaluation graph that became Beam's pipeline construction model. The Dataflow paper is essentially "MillWheel's runtime concepts plus FlumeJava's API, unified for batch and streaming". This lineage matters because each predecessor brought a piece — the runtime guarantees, the API shape, the operational semantics — and Dataflow was the synthesis. If you read only one predecessor paper, read MillWheel for the watermark mechanics.

Triggering, retractions, and the relationship to incremental view maintenance

Once you accept that a streaming pipeline can emit-and-retract refinements to a windowed result, you've accepted the building blocks of incremental view maintenance (IVM). The "current value of a window" is a materialised view; emit-and-retract is exactly the +1/-1 deltas that differential dataflow uses. The Dataflow model and differential dataflow (/wiki/incremental-view-maintenance-as-the-endgame) are siblings — different vocabularies for the same idea. Materialize and Feldera implement explicit differential dataflow; Flink's accumulating-and-retracting mode implements the Dataflow flavour. The semantics agree.
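The correspondence can be made concrete in a few lines: treat the current value of a window as a materialised view, and feed it signed deltas exactly as a retracting stream (or differential dataflow) would. A toy sketch, not Materialize or Feldera code:

```python
from collections import defaultdict

view = defaultdict(int)  # materialised view: window key -> current sum

def apply_delta(window_key, amount, sign):
    # sign = +1 inserts a contribution; sign = -1 retracts one.
    # Emit-and-retract in the Dataflow model is exactly this pair of deltas.
    view[window_key] += sign * amount

apply_delta("10:23", 500, +1)  # event arrives
apply_delta("10:23", 300, +1)  # another event
apply_delta("10:23", 500, -1)  # retraction: the first event is withdrawn
print(view["10:23"])           # 300
```

The view is always the sum of all deltas applied so far — which is the IVM invariant stated in the Dataflow model's vocabulary.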

How Indian streaming workloads stress-test the model

Dream11's match-day analytics is an interesting stress test. During an India vs Pakistan T20 the platform sees 50× normal event rate concentrated in 3-hour bursts, with event-time clustered around the over-by-over rhythm. Their pipeline uses 30-second tumbling windows for "active users per match", with allowed-lateness of 90 seconds (geo-routing through Mumbai and Bengaluru POPs introduces variable lag). Early triggers fire every 5 seconds during the window so the live "concurrent users" widget stays fresh. Accumulating mode keeps the dashboard reads simple — each emit replaces the previous total. The four-axis model maps cleanly onto a Tier-1 Indian production pipeline; the settings of those axes are where the engineering judgement lives.
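Under the settings described above, the windowing strategy would look roughly like this in Beam Python — a sketch assembled from the stated parameters (30s tumbling, 5s early firings, 90s allowed lateness, accumulating), not Dream11's actual code:

```python
import apache_beam as beam
from apache_beam.transforms import window, trigger

# 30s tumbling windows; early firings every 5s keep the live widget fresh;
# 90s allowed lateness absorbs POP routing lag; accumulating panes mean
# each emit is the full total, so dashboard reads stay a simple overwrite.
match_day_window = beam.WindowInto(
    window.FixedWindows(30),
    trigger=trigger.AfterWatermark(
        early=trigger.AfterProcessingTime(5),
        late=trigger.AfterCount(1)),
    allowed_lateness=90,
    accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
```

Note how each line maps onto one of the four axes: the WindowFn is WHERE, the trigger is WHEN, the accumulation mode is HOW, and WHAT lives in whatever aggregation the pipeline pipes through afterwards.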

The unbounded out-of-order assumption

A subtle but important framing in the paper: the model assumes input is unbounded and out-of-order by default, with batch as a degenerate case (bounded, in-order if you sort). This is the opposite of how most predecessors were designed (batch as primary, streaming as bolt-on). The flip matters because the harder case constrains the design — if you build for unbounded out-of-order and let bounded-in-order be the easy path, you end up with a model that can express anything. If you build for bounded-in-order and try to extend to unbounded out-of-order, you bolt on hacks (Spark Streaming's micro-batches, Storm's at-least-once-with-Trident). The Dataflow paper is explicit that the design choice was "let the harder problem drive the model".

Where this leads next

The Dataflow model is the conceptual basis for the next four chapters in this build.

The pattern to internalise: when someone asks "should this be a batch job or a streaming job", the Dataflow model says you're asking the wrong question. Ask "what, where, when, how" — the runtime is the last decision, not the first.

References