Beam and Flink: write once, run on both
A platform engineer at Swiggy in 2024 has the same problem most Indian unicorns hit by year three: the analytics team runs Spark Structured Streaming on EMR, the ML team runs Flink on Kubernetes, and the new growth team wants Dataflow on GCP because the ad pixel sits there. Three runtimes, three dialects of "windowed group-by", three on-call rotations. The pitch for Apache Beam is exactly this misery: write the pipeline once in Beam's SDK, pick a runner per cluster, and the code compiles down. The reality is more nuanced — Beam describes the what, the runner decides the how, and on Indian hardware with Indian latency budgets the runner you pick almost always ends up being Flink. This chapter is about why that pairing won, what Beam is actually portable across, and where the abstraction leaks.
Apache Beam is a runner-agnostic pipeline SDK derived from Google's Dataflow Model paper. You write PCollection transforms once, then submit the pipeline to a runner — Flink, Spark, Dataflow, or Samza — which compiles Beam primitives down to its native execution graph. Beam-on-Flink is the de-facto open-source production combo because Flink's checkpointing, watermarks, and state primitives align almost 1:1 with Beam's model. The portability is real for the unified batch/stream API and leaky at the runner-feature edges.
What Beam actually gives you: a model, not a runtime
Apache Beam started as Google's Cloud Dataflow SDK, donated to the ASF in 2016 along with the Dataflow Model paper (Akidau et al., VLDB 2015). The paper's claim was that the right primitives for unified batch and streaming are four: what is computed, where in event time it's computed (windowing), when in processing time results are emitted (triggers), and how refinements relate (accumulation mode). Beam is the SDK that exposes these four directly.
The pipeline you write is a directed acyclic graph of PTransforms operating on PCollections. Each PCollection is a logical bag of elements with an event-time timestamp and a window assignment. Your code never says "run on this thread" or "checkpoint every 60 seconds" — those are runner concerns. It says "group these by user_id within a 5-minute tumbling window, fire one output when the window closes plus a refinement every 30 seconds for late data".
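The event-time bucketing behind a phrase like "5-minute tumbling window" is plain arithmetic, worth seeing once outside any framework. A minimal pure-Python sketch (not the Beam API, just the assignment a runner performs for a `FixedWindows(300)` declaration):

```python
def fixed_window(ts_sec, size_sec=300):
    """Assign an event timestamp (seconds) to its tumbling window [start, end)."""
    start = int(ts_sec // size_sec) * size_sec
    return (start, start + size_sec)

assert fixed_window(1000.0) == (900, 1200)   # two events 40s apart share a bucket
assert fixed_window(1040.0) == (900, 1200)
assert fixed_window(1200.0) == (1200, 1500)  # the boundary starts the next window
```

The runner's job is everything this sketch omits: holding per-window state, tracking the watermark, and deciding when a window's result may fire.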
Why the IR is protobuf and not Java bytecode: the original Java-only Beam SDK could only target JVM runners. The 2018 portability framework introduced a language-neutral protobuf representation so a Python pipeline can run on a Java-based Flink runner via gRPC bridges (the "Fn API"). This is what makes Python-on-Flink-on-Kubernetes work without forcing the user to write Java — the user-defined functions run in a sidecar Python container that the Flink workers talk to over gRPC.
The four primitives map cleanly to the Dataflow Model:
| Beam primitive | Dataflow Model question | Runner translation (Flink) |
|---|---|---|
| `ParDo` (per-element transform) | what | `FlatMapFunction` / `ProcessFunction` |
| `Window.into(FixedWindows)` | where in event time | `WindowAssigner` (`TumblingEventTimeWindows`) |
| `Trigger.AfterWatermark()` | when in processing time | Flink's `Trigger` interface |
| `AccumulationMode.DISCARDING` | how refinements relate | Window state retention policy |
The translation is almost 1:1 for Flink because Flink's runtime was independently built around the same Dataflow Model. For Spark Structured Streaming, the translation is lossier — Spark's micro-batch model can approximate event-time triggers but not as precisely, which is why Beam-on-Spark has known feature gaps on session windows and late-data triggers (see the Beam capability matrix).
Same code, three runners: a worked example
Here is a Beam pipeline that computes per-merchant payment volume in 5-minute tumbling windows, filtered to UPI transactions over ₹1,000. The same Python file runs on the Flink runner (production), the Direct runner (local tests), and the Dataflow runner (if Swiggy decided to ship to GCP next quarter).
```python
# upi_volume_5min.py — single Beam pipeline, run on three runners
import argparse
import datetime
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows, TimestampedValue
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AccumulationMode)
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka

parser = argparse.ArgumentParser()
parser.add_argument("--runner",
                    choices=["FlinkRunner", "DirectRunner", "DataflowRunner"],
                    required=True)
parser.add_argument("--bootstrap", default="kafka.swiggy-internal:9092")
known, pipeline_args = parser.parse_known_args()

opts = PipelineOptions(pipeline_args, runner=known.runner, streaming=True,
                       project="swiggy-data-eng",
                       region="asia-south1")  # GCP Mumbai region (DataflowRunner only)

with beam.Pipeline(options=opts) as p:
    payments = (p
        | "ReadKafka" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": known.bootstrap,
                             "auto.offset.reset": "latest"},
            topics=["payments.captured.v1"])
        | "ParseJSON" >> beam.Map(lambda kv: json.loads(kv[1].decode("utf-8")))
        | "FilterUPI" >> beam.Filter(
            lambda e: e["method"] == "upi" and e["amount"] >= 1000_00)  # ₹1,000 in paise
        | "AssignTimestamp" >> beam.Map(
            lambda e: TimestampedValue(e, e["event_time_ms"] / 1000.0))
        | "KeyByMerchant" >> beam.Map(lambda e: (e["merchant_id"], e["amount"])))

    windowed = (payments
        | "Window5Min" >> beam.WindowInto(
            FixedWindows(300),  # 5-minute tumbling
            trigger=AfterWatermark(early=AfterProcessingTime(30)),  # early refinements every 30s
            accumulation_mode=AccumulationMode.ACCUMULATING,
            allowed_lateness=600))  # 10 minutes for late events

    totals = (windowed
        | "SumPerMerchant" >> beam.CombinePerKey(sum)
        | "FormatOutput" >> beam.Map(lambda kv: (
            kv[0].encode("utf-8"),
            json.dumps({"merchant": kv[0], "volume_paise": kv[1],
                        "computed_at": datetime.datetime.utcnow().isoformat()
                        }).encode("utf-8")))
        | "WriteKafka" >> WriteToKafka(
            producer_config={"bootstrap.servers": known.bootstrap},
            topic="metrics.merchant_5min_volume.v1"))
```
```shell
# Local test on Direct runner (single process, in-memory):
$ python upi_volume_5min.py --runner DirectRunner --bootstrap localhost:9092
INFO:apache_beam.runners.direct: starting pipeline...
INFO: window [2026-04-25T11:30:00, 2026-04-25T11:35:00) merchant=razorpay-test volume=2_45_000 paise

# Production on Flink runner (Kubernetes, 12 task slots):
$ python upi_volume_5min.py --runner FlinkRunner --flink_master=flink-jm:8081 \
    --environment_type=DOCKER --environment_config=apache/beam_python3.11_sdk:2.56.0
INFO:apache_beam.runners.portability.flink_runner: submitting JobGraph (487 vertices)...
INFO: job submitted: jobid=ae21b5..., state=RUNNING
INFO: throughput: 41,200 events/sec, watermark lag: 8.4s, checkpoint interval: 60s

# GCP Dataflow runner (managed, autoscaling):
$ python upi_volume_5min.py --runner DataflowRunner \
    --temp_location=gs://swiggy-de-temp --staging_location=gs://swiggy-de-staging
INFO:apache_beam.runners.dataflow: workers: 4 (autoscaling 2-32), job_id=2026-04-25_11_31_08
```
The load-bearing pieces:

- `runner=known.runner` is the only line that changes between environments. The same pipeline submission works on three different cluster types.
- `AssignTimestamp` is what tells the runner to treat `event_time_ms` as the event timestamp, not the Kafka ingestion timestamp. Why this matters at 11pm in production: if you skip `AssignTimestamp`, the runner uses processing time for windowing. A 30-minute Kafka backlog after a consumer outage will then collapse all backlog events into "now"-shaped windows, ruining your 5-minute rolling metric. The pipeline still runs, the numbers are just wrong — the worst kind of bug.
- `FixedWindows(300)` plus `AfterWatermark(early=AfterProcessingTime(30))` is the Dataflow Model in two lines. The window says "group events by 5-minute event-time buckets". The trigger says "fire one final result when the watermark crosses the window end, plus speculative early-refinement results every 30 processing-time seconds while the window is open". Why both: the early trigger gives the dashboard a near-real-time number for the in-progress window; the watermark trigger gives the final number that downstream consumers can trust as complete. Without the early trigger, the dashboard would only update at the close of each window, looking frozen for 5 minutes at a stretch.
- `accumulation_mode=ACCUMULATING` means each refinement contains the cumulative window total, not a delta from the previous emission. The downstream sink overwrites the previous value for that window key. The alternative (`DISCARDING`) emits deltas, which the sink would have to add up.
- `allowed_lateness=600` keeps the window's state alive in the runner for 10 minutes past the watermark crossing. A late event (Kafka producer outage, mobile app retrying with stale buffered events) within that 10-minute grace fires another refinement. After 10 minutes, the state is garbage-collected and further-late events are dropped — see /wiki/late-arriving-data-and-the-backfill-problem.
- `environment_type=DOCKER` in the Flink command tells the Flink runner to spin up a Python SDK harness sidecar container per worker. The Java Flink TaskManager talks to the Python container over gRPC for every `ParDo`. This is the portability-framework cost: you pay one gRPC round trip per element. For high-throughput pipelines, batching at the SDK harness boundary (controlled by `--experiments=use_runner_v2,beam_fn_api`) reduces the cost from per-element to per-batch.
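The processing-time windowing failure described above is worth simulating once. A toy in plain Python (no Beam, hypothetical numbers): a 30-minute backlog replayed in one burst lands in the correct historical windows under event time, and in a single "now" window under processing time.

```python
from collections import Counter

def window_counts(event_ts_list, use_event_time, now, size=300):
    """Count events per fixed-window start. `now` is the processing time at
    which a backlog burst is replayed; processing-time windowing stamps
    every event with `now` instead of its own timestamp."""
    counts = Counter()
    for ev_ts in event_ts_list:
        ts = ev_ts if use_event_time else now
        counts[int(ts // size) * size] += 1
    return dict(counts)

# A 30-minute backlog (one event per minute) replayed in one burst at t=1800.
backlog = [i * 60 for i in range(30)]
good = window_counts(backlog, use_event_time=True, now=1800)
bad = window_counts(backlog, use_event_time=False, now=1800)
assert len(good) == 6 and all(v == 5 for v in good.values())  # 6 correct windows
assert bad == {1800: 30}  # the whole backlog collapses into one "now" window
```

Both runs "succeed"; only the event-time one produces a metric you can trust after an outage.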
In production at Swiggy, this pipeline runs on Flink. The Direct runner is for tests; the Dataflow runner is the option you keep in your back pocket if you ever decide GCP is cheaper.
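Of the knobs above, accumulation mode is the one most often reasoned about wrongly. A pure-Python toy (not Beam code) of what a runner emits for one window of one key across three trigger firings under each mode:

```python
def refinements(panes, mode):
    """Simulate successive trigger firings for one window of one key.
    `panes` is the list of element batches that arrived between firings."""
    total, emitted = 0, []
    for batch in panes:
        delta = sum(batch)
        total += delta
        emitted.append(total if mode == "ACCUMULATING" else delta)
    return emitted

panes = [[100, 250], [50], [300]]  # paise amounts seen between 30s early firings
assert refinements(panes, "ACCUMULATING") == [350, 400, 700]  # sink overwrites
assert refinements(panes, "DISCARDING") == [350, 50, 300]     # sink must add up
```

The pipeline above uses `ACCUMULATING` so the Kafka sink can treat each refinement as the latest truth for that merchant-window key.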
Why Flink became the default Beam runner outside Google
Beam supports six runners (Flink, Spark, Dataflow, Samza, Jet, Direct), but in Indian production deployments the choice collapses to two: Dataflow if you're on GCP, Flink everywhere else. Three reasons.
Reason 1: capability parity with Dataflow. Flink independently implemented the Dataflow Model's primitives — event-time, watermarks, triggers, allowed lateness — because the Stratosphere project and the Google paper converged on the same vocabulary around 2014. The Beam capability matrix shows Flink at full coverage on every streaming feature except a few Dataflow-specific extensions (timers in the experimental Splittable DoFn API, certain stateful processing patterns).
Reason 2: cost. Dataflow's autoscaling and managed scheduler are excellent, but the GCP premium over self-managed Flink on EKS is typically 2-3× for Indian fintech-scale workloads (200k events/sec sustained, 50TB state). Razorpay's 2023 cost analysis (internal, but referenced in their tech-talk circuit) put their Flink-on-EKS bill at roughly ₹18 lakh/month against an extrapolated ₹50 lakh/month on Dataflow for the same throughput. At their scale the difference funds an SRE team.
Reason 3: state-store control. Flink's RocksDB-backed state with incremental checkpoints to S3 lets teams run pipelines with terabytes of keyed state. Dataflow handles state for you (and well), but the "well" is opaque — you can't tune compaction, you can't switch state backends, you can't read the state externally for debugging. For a 2 a.m. incident where the question is "is the user-session-state operator stuck on a hot key?", being able to SSH into the TaskManager and rocksdb_ldb scan the state is sometimes the difference between a 20-minute fix and a 4-hour fix.
The trade-off is that Beam-on-Flink is two products to operate. Flink itself has a JobManager, TaskManagers, RocksDB tuning, checkpoint storage configuration. Beam adds the Python SDK harness, the Fn API, the portable runner JobServer. Failures can come from either layer. The on-call runbook needs both.
Where the abstraction leaks
Beam's portability promise is "write once, run anywhere", but in practice you write once-and-test-on-the-target-runner. The leaks:
- I/O connectors are not fully portable. Beam's `KafkaIO` works on every runner, but the Flink runner uses Flink's `FlinkKafkaConsumer` for offsets/checkpoints while Spark uses its own. Restart semantics differ: a pipeline that recovers cleanly on Flink may double-count on Spark restart.
- Custom state APIs differ. Beam's `StateSpec` and `TimerSpec` (per-key timers) work fully on Flink and Dataflow, partially on Spark, and not at all on Samza. Pipelines that rely on stateful processing tend to lock into Flink.
- DoFn lifecycle. `setup`/`start_bundle`/`finish_bundle`/`teardown` semantics are runner-specific. Flink calls them per task slot; Dataflow calls them per worker; the Direct runner calls them once. A `setup` that opens a database connection works fine on Flink (one connection per slot) but leaks the connection in Direct-runner tests, where `setup` still runs yet nothing in a short-lived test forces a matching `teardown`.
- Performance characteristics. A Beam pipeline on Flink is typically 30–50% slower than equivalent native Flink Java code, because the cross-language portability framework adds gRPC overhead per element. For tight inner loops, teams sometimes drop to native PyFlink (`pyflink.datastream`) and skip Beam.
- Monitoring surface. Flink's JobManager UI shows operators with their Flink-native names ("KafkaSource → ParDo:FilterUPI → ParDo:KeyByMerchant → WindowedCombine"). Beam users don't recognise these as the transforms they wrote. Beam's own metrics surface helps, but the operational language splits between "Beam pipeline" (your code) and "Flink job" (what's actually running).
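The lifecycle leak is easy to reproduce in miniature. A toy simulation (plain Python, not the Beam `DoFn` API; the class and counts are hypothetical) of why a connection-opening `setup` behaves differently depending on how many times the runner instantiates the fn:

```python
class EnrichFn:
    """Toy stand-in for a DoFn with Beam-style lifecycle hooks:
    setup opens a resource, teardown must release it."""
    open_connections = 0

    def setup(self):
        EnrichFn.open_connections += 1   # e.g. open a database connection

    def teardown(self):
        EnrichFn.open_connections -= 1   # release it

def run_deployment(num_instances, call_teardown=True):
    """Each instance models one place the runner instantiates the fn:
    a task slot on Flink, a worker on Dataflow, once on the Direct runner."""
    EnrichFn.open_connections = 0        # fresh count per simulated deployment
    fns = [EnrichFn() for _ in range(num_instances)]
    for fn in fns:
        fn.setup()
    if call_teardown:
        for fn in fns:
            fn.teardown()
    return EnrichFn.open_connections

assert run_deployment(4) == 0                       # Flink-like: 4 slots, all closed
assert run_deployment(1, call_teardown=False) == 1  # skipped teardown leaks it
```

The point is not the counting but the contract: whatever `setup` acquires, `teardown` must release, on every runner, at whatever multiplicity that runner chooses.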
These leaks are not fatal — they're the cost of the abstraction. The right framing is: Beam is a portable description language for stream pipelines. The runner is still where the production behaviour lives. A senior engineer running Beam-on-Flink learns enough Flink to debug the runtime layer when the abstraction springs a leak.
Common confusions
- "Beam is a streaming framework." No, Beam is an SDK that targets streaming and batch frameworks. By itself it executes nothing — the Direct runner is a test harness, not a production engine. The thing actually running your pipeline is always Flink, Spark, Dataflow, or Samza.
- "Beam-on-Flink is the same as native Flink." Mechanically related but operationally different. Native Flink (DataStream API or PyFlink) gives you full Flink feature access and 30–50% better throughput. Beam gives you portability in exchange. Pick Beam when you'll genuinely use the portability (e.g. plan to migrate runners), pick native Flink when you'll stay on Flink and need the performance.
- "Apache Beam replaces Apache Flink." They sit at different layers — Beam is the model and SDK, Flink is the runtime. The Beam-on-Flink stack uses both. Flink-Beam-replacement comparisons in vendor blog posts are confused; the real comparison is "Beam-on-Flink vs PyFlink" or "Beam-on-Flink vs Beam-on-Dataflow".
- "Dataflow is open source via Beam." Beam is open source, Dataflow is not. The Dataflow service uses Google-internal scheduling, autoscaling, and state-management infrastructure that is not in the Beam codebase. You can run a Beam pipeline on the open-source Flink runner and get most of the same model semantics, but the Google service itself is closed.
- "Streaming and batch unification means I write one pipeline." True at the SDK level — the same `Pipeline` object can read a bounded `PCollection` (batch) or an unbounded one (streaming). False at the operational level — the production deploy of a batch pipeline (run-to-completion, output to a partition) and a streaming pipeline (continuously running, checkpoints, autoscaling) are completely different jobs. Beam unifies the code, not the operation.
- "Switching runners is a config change." True for the runner ID. False for everything else: I/O configuration, state-store sizing, checkpoint storage, monitoring integration, and the on-call runbook all change with the runner. Plan a runner switch as a 2–4 week migration, not a one-line PR.
Going deeper
The Splittable DoFn API and why it took five years
Beam's original I/O abstraction (Source/Sink) was based on Hadoop-style splits — a source declared its splits up front, the runner farmed them to workers. This worked for batch and broke for streaming. The 2018 Splittable DoFn (SDF) proposal generalised: a source is a DoFn that can pause itself, return a checkpoint of its progress, and resume from another worker. This took until 2022 to fully ship across runners because each runner had to implement dynamic work rebalancing semantics (Flink, Spark, Dataflow). The reason it matters: SDF is what lets a Beam pipeline auto-split a Kafka partition that became a hot key, by having the source DoFn checkpoint and resume on a less-loaded worker. The full proposal is in BEAM-65; the Flink-runner support is documented in the Beam programming guide.
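The SDF idea can be sketched without Beam. A toy model (not the real `RestrictionTracker` API; names are hypothetical): a source fn processes a restriction, here an offset range, stops when it runs out of budget, and hands back the residual range, which is exactly the shape of an SDF checkpoint that another worker can resume.

```python
def read_restriction(start, stop, budget):
    """Process offsets [start, stop); stop early once `budget` elements are
    done and return (processed, residual_restriction)."""
    processed = []
    pos = start
    while pos < stop and len(processed) < budget:
        processed.append(pos)            # stand-in for reading one record
        pos += 1
    residual = (pos, stop) if pos < stop else None
    return processed, residual

# Worker A exhausts its budget on a hot partition and checkpoints...
done_a, residual = read_restriction(0, 10, budget=4)
assert done_a == [0, 1, 2, 3] and residual == (4, 10)
# ...and worker B resumes the residual with no coordination beyond the range.
done_b, rest = read_restriction(*residual, budget=100)
assert done_b == [4, 5, 6, 7, 8, 9] and rest is None
```

The hard engineering was never this loop; it was making every runner able to accept a residual mid-flight and reschedule it, which is why SDF support landed runner by runner over years.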
Why Spark Structured Streaming under Beam feels weird
Spark's execution model is micro-batch — it accumulates input for a short interval, then runs a batch query over it (by default the next micro-batch starts as soon as the previous one finishes; the experimental Continuous Processing mode is the exception). Beam's model is per-event with watermark-driven triggers. The Beam Spark runner translates between these by accumulating Beam events into Spark batches, running the windowed aggregation, then emitting. The cost: a latency floor of one micro-batch interval, no support for session windows (which require per-event triggering), and weak support for processing-time triggers. Teams that started on Spark Structured Streaming and added Beam often end up rewriting to PyFlink within 12 months because the impedance mismatch shows up in production. Beam-on-Flink, by contrast, feels like Beam-on-Beam — every model primitive maps directly to a runtime primitive.
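The latency floor is simple arithmetic, sketched here in plain Python with illustrative numbers (neither engine's actual scheduler):

```python
import math

def emit_time_microbatch(event_ts, interval):
    """A micro-batch engine cannot emit before the batch containing the
    event closes: the interval is a hard latency floor."""
    return math.ceil(event_ts / interval) * interval

def emit_time_per_event(event_ts):
    """A per-event engine can emit as soon as the event is processed
    (ignoring per-element processing cost)."""
    return event_ts

assert emit_time_microbatch(0.31, 1.0) == 1.0  # ~690ms added at a 1s interval
assert emit_time_per_event(0.31) == 0.31       # no floor
```

Shrinking the interval shrinks the floor but multiplies per-batch scheduling overhead, which is the knob Spark teams end up fighting.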
How Dataflow's auto-scaler and Flink's reactive mode compare
Dataflow's claim to fame is autoscaling: the service watches per-stage backlog and scales workers from 2 to 1000 in minutes. Flink's reactive mode (FLIP-159, 2021) and the more recent adaptive scheduler bring similar capabilities to self-managed Flink. The difference: Dataflow scales transparently with no operator action; Flink reactive mode requires you to run on Kubernetes with the right RBAC, the right state-storage configuration (S3 for incremental checkpoints), and the right scaling triggers wired up via the Kubernetes operator. The work to make Flink scale-on-load is one or two engineer-weeks; the work to make Dataflow scale-on-load is checking a box at deploy time. For a team without dedicated Flink SREs, this is the actual reason Dataflow stays attractive for greenfield builds even at the GCP premium.
Where Indian companies sit on Beam-vs-native-Flink in 2026
Razorpay's analytics team uses Beam-on-Flink with Python SDK because their feature engineering team needs Python and the portability story protects them if they ever migrate to GCP. Swiggy's ML platform uses native PyFlink because the latency budget for fraud-feature pipelines is tighter (sub-100ms p99 to influence the order-placement decision) and Beam's portability overhead exceeds that budget. Zerodha runs native Java Flink because their order-tick processing was originally on Storm and the Flink rewrite went straight to native DataStream API in 2020, before Beam-on-Flink Python was production-ready. PhonePe is split — older pipelines on PyFlink, newer ones on Beam-on-Flink for the portability story. The pattern is: Beam where multiple runners are realistic over the next 3 years, native runtime where they're not. The portability is insurance; the premium is performance.
Where this leads next
Beam closes the SDK-portability loop, but the unification thesis it embodies extends further:
- /wiki/the-dataflow-model-batch-as-bounded-streams — the paper that gave Beam its primitives. Read this if you want the theoretical story.
- /wiki/incremental-view-maintenance-as-the-endgame — what comes after pipeline-style streaming: a system that maintains the answer to a query incrementally without you writing any pipeline at all. Materialize, RisingWave, and Flink's table API live here.
- /wiki/checkpointing-the-consistent-snapshot-algorithm — the runner mechanism that makes Beam's exactly-once promise actually work on Flink.
- /wiki/flinks-two-phase-commit-sink — the sink-side mechanism that Beam pipelines on Flink quietly rely on for end-to-end exactly-once.
The pattern to internalise is the layering. Beam is a model and SDK layer. Flink is a runtime layer. Kafka is a transport and durability layer. Iceberg is a storage layer. A modern streaming stack at Razorpay or Swiggy uses all four, with each layer mostly unaware of the layers below. The portability Beam offers is at the model layer; portability across the runtime, transport, and storage layers requires its own abstractions (e.g. Iceberg gives you storage portability across query engines). When a vendor pitches "the unified streaming platform", ask which layer they're actually portable across — it's usually one of the four, dressed up as all four.
References
- The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing (Akidau et al., VLDB 2015) — the founding paper. Beam is a direct implementation.
- Apache Beam programming guide — the SDK reference. Read the windowing and triggers sections; skim the rest.
- Beam runner capability matrix — the honest "what works where" table. Bookmark this; check it before committing to a runner.
- Apache Flink: Stream and Batch Processing in a Single Engine (Carbone et al., IEEE Bulletin 2015) — the runtime that Beam pairs with most often, articulated by its authors.
- Streaming 102: The world beyond batch (Akidau, O'Reilly Radar 2016) — the practitioner companion to the VLDB paper. Read this if the paper feels too dense.
- BEAM-65: Splittable DoFn proposal — the I/O generalisation that took five years. Worth reading to understand why portable connectors were slow to land.
- /wiki/kappa-architecture-stream-only-and-reprocess-from-the-log — the architectural pattern that Beam-on-Flink most commonly implements.
- /wiki/the-dataflow-model-batch-as-bounded-streams — the Dataflow Model in depth.