Beam and Flink: write once, run on both

A platform engineer at Swiggy in 2024 has the same problem most Indian unicorns hit by year three: the analytics team runs Spark Structured Streaming on EMR, the ML team runs Flink on Kubernetes, and the new growth team wants Dataflow on GCP because the ad pixel sits there. Three runtimes, three dialects of "windowed group-by", three on-call rotations. The pitch for Apache Beam is exactly this misery: write the pipeline once in Beam's SDK, pick a runner per cluster, and the runner compiles the code down. The reality is more nuanced — Beam describes the what, the runner decides the how, and on Indian hardware with Indian latency budgets the runner you pick almost always ends up being Flink. This chapter is about why that pairing won, what Beam is actually portable across, and where the abstraction leaks.

Apache Beam is a runner-agnostic pipeline SDK derived from Google's Dataflow Model paper. You write PCollection transforms once, then submit the pipeline to a runner — Flink, Spark, Dataflow, or Samza — which compiles Beam primitives down to its native execution graph. Beam-on-Flink is the de facto open-source production combo because Flink's checkpointing, watermarks, and state primitives align almost 1:1 with Beam's model. The portability is real for the unified batch/stream API and leaky at the runner-feature edges.

What Beam actually gives you: a model, not a runtime

Apache Beam started as Google's Cloud Dataflow SDK, donated to the ASF in 2016 along with the Dataflow Model paper (Akidau et al., VLDB 2015). The paper's claim was that the right primitives for unified batch and streaming are four: what is computed, where in event time it's computed (windowing), when in processing time results are emitted (triggers), and how refinements relate (accumulation mode). Beam is the SDK that exposes these four directly.

The pipeline you write is a directed acyclic graph of PTransforms operating on PCollections. Each PCollection is a logical bag of elements with an event-time timestamp and a window assignment. Your code never says "run on this thread" or "checkpoint every 60 seconds" — those are runner concerns. It says "group these by user_id within a 5-minute tumbling window, fire one output when the window closes plus a refinement every 30 seconds for late data".
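The runner-independent part of that last sentence is pure event-time arithmetic. A minimal sketch — plain Python, not the Beam API — of how a 5-minute tumbling window and an allowed-lateness check fall out of an element's timestamp alone:

```python
# Model of tumbling-window assignment, independent of any runner.
# Window boundaries depend only on the event timestamp, never on arrival time.

WINDOW_SIZE = 300  # seconds: a 5-minute tumbling window

def assign_window(event_ts: float) -> tuple:
    """Return the [start, end) window that owns an event-time timestamp."""
    start = event_ts - (event_ts % WINDOW_SIZE)
    return (start, start + WINDOW_SIZE)

def is_droppably_late(event_ts: float, watermark: float, allowed_lateness: float) -> bool:
    """An element is dropped once the watermark passes its window's end plus allowed lateness."""
    _, window_end = assign_window(event_ts)
    return watermark > window_end + allowed_lateness

# An event at t=412s lands in the second 5-minute window:
assert assign_window(412) == (300, 600)
# With 10 minutes of allowed lateness, it survives until the watermark passes t=1200:
assert not is_droppably_late(412, watermark=1199, allowed_lateness=600)
assert is_droppably_late(412, watermark=1201, allowed_lateness=600)
```

This is all a Beam pipeline asserts about time; which worker evaluates it, and when state for window [300, 600) is physically freed, is the runner's business.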

[Figure: Beam SDK to runner translation — one SDK, many runners. The Beam SDK (Python / Java / Go: PCollection, ParDo, GroupByKey, Window) lowers to a portable pipeline IR (protobuf) that is runner-agnostic and describes the WHAT, not the HOW. Each runner translates that IR to its native execution graph: Flink (checkpoints, watermarks, RocksDB state, exactly-once), Spark (micro-batched structured streaming), Dataflow (GCP-managed, autoscaling), and Samza / Direct / Jet (LinkedIn, local-test, Hazelcast).]
You write against the SDK. The SDK lowers to a portable protobuf representation of the DAG. The runner translates that protobuf into its native execution graph (Flink JobGraph, Spark RDD lineage, Dataflow worker pool plan).

Why the IR is protobuf and not Java bytecode: the original Java-only Beam SDK could only target JVM runners. The 2018 portability framework introduced a language-neutral protobuf representation so a Python pipeline can run on a Java-based Flink runner via gRPC bridges (the "Fn API"). This is what makes Python-on-Flink-on-Kubernetes work without forcing the user to write Java — the user-defined functions run in a sidecar Python container that the Flink workers talk to over gRPC.

The four primitives map cleanly to the Dataflow Model:

Beam primitive                 | Dataflow Model question  | Runner translation (Flink)
ParDo (per-element transform)  | what                     | FlatMapFunction / ProcessFunction
Window.into(FixedWindows)      | where in event time      | WindowAssigner (TumblingEventTimeWindows)
Trigger.AfterWatermark()       | when in processing time  | Flink's Trigger interface
AccumulationMode.DISCARDING    | how refinements relate   | window state retention policy

The translation is almost 1:1 for Flink because Flink's runtime was independently built around the same Dataflow Model. For Spark Structured Streaming, the translation is lossier — Spark's micro-batch model can approximate event-time triggers but not as precisely, which is why Beam-on-Spark has known feature gaps on session windows and late-data triggers (see the Beam capability matrix).
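The fourth primitive — accumulation mode — is the one engineers most often get wrong, so it is worth seeing in isolation. A plain-Python model (not the Beam API) of how the two modes differ when a window fires two early panes and then the on-time pane:

```python
# Model of ACCUMULATING vs DISCARDING panes for a single keyed window.
# Each trigger firing emits a "pane"; the mode decides whether state carries over.

def fire_panes(arrivals_between_firings, accumulating: bool):
    """arrivals_between_firings: lists of amounts that arrived before each firing."""
    panes, state = [], 0
    for batch in arrivals_between_firings:
        state += sum(batch)
        panes.append(state)
        if not accumulating:
            state = 0  # DISCARDING: each pane contains only new elements
    return panes

# Two early firings, then the on-time firing, for amounts 100, 250, 50:
arrivals = [[100], [250], [50]]
assert fire_panes(arrivals, accumulating=True) == [100, 350, 400]  # refinements grow
assert fire_panes(arrivals, accumulating=False) == [100, 250, 50]  # deltas only
```

Downstream consumers must know which mode produced the stream: accumulating panes are replacements (overwrite the previous value), discarding panes are deltas (add them up).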

Same code, three runners: a worked example

Here is a Beam pipeline that computes per-merchant rolling 5-minute payment volume, filtered to UPI transactions over ₹1,000. The same Python file runs on the Flink runner (production), the Direct runner (local tests), and the Dataflow runner (if Swiggy decided to ship to GCP next quarter).

# upi_volume_5min.py — single Beam pipeline, run on three runners
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
import argparse, json, datetime

parser = argparse.ArgumentParser()
parser.add_argument("--runner", choices=["FlinkRunner", "DirectRunner", "DataflowRunner"], required=True)
parser.add_argument("--bootstrap", default="kafka.swiggy-internal:9092")
known, pipeline_args = parser.parse_known_args()

opts = PipelineOptions(pipeline_args, runner=known.runner, streaming=True,
                       project="swiggy-data-eng", region="asia-south1")  # project/region only matter to Dataflow; asia-south1 is GCP Mumbai

with beam.Pipeline(options=opts) as p:
    payments = (p
        | "ReadKafka" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": known.bootstrap, "auto.offset.reset": "latest"},
            topics=["payments.captured.v1"])
        | "ParseJSON" >> beam.Map(lambda kv: json.loads(kv[1].decode("utf-8")))
        | "FilterUPI" >> beam.Filter(lambda e: e["method"] == "upi" and e["amount"] >= 1000_00)  # amount in paise; 1000_00 = ₹1,000
        | "AssignTimestamp" >> beam.Map(
            lambda e: beam.window.TimestampedValue(e, e["event_time_ms"] / 1000.0))
        | "KeyByMerchant" >> beam.Map(lambda e: (e["merchant_id"], e["amount"])))

    windowed = (payments
        | "Window5Min" >> beam.WindowInto(
            FixedWindows(300),                                     # 5-minute tumbling
            trigger=AfterWatermark(early=AfterProcessingTime(30)), # early refinements every 30s
            accumulation_mode=AccumulationMode.ACCUMULATING,
            allowed_lateness=600))                                 # 10 minutes for late events

    totals = (windowed
        | "SumPerMerchant" >> beam.CombinePerKey(sum)
        | "FormatOutput" >> beam.Map(lambda kv: (kv[0].encode("utf-8"),
              json.dumps({"merchant": kv[0], "volume_paise": kv[1],
                          "computed_at": datetime.datetime.utcnow().isoformat()}).encode("utf-8")))
        | "WriteKafka" >> WriteToKafka(
            producer_config={"bootstrap.servers": known.bootstrap},
            topic="metrics.merchant_5min_volume.v1"))

# Local test on Direct runner (single process, in-memory):
$ python upi_volume_5min.py --runner DirectRunner --bootstrap localhost:9092
INFO:apache_beam.runners.direct: starting pipeline...
INFO: window [2026-04-25T11:30:00, 2026-04-25T11:35:00) merchant=razorpay-test volume=2_45_000 paise

# Production on Flink runner (Kubernetes, 12 task slots):
$ python upi_volume_5min.py --runner FlinkRunner --flink_master=flink-jm:8081 \
  --environment_type=DOCKER --environment_config=apache/beam_python3.11_sdk:2.56.0
INFO:apache_beam.runners.portability.flink_runner: submitting JobGraph (487 vertices)...
INFO: job submitted: jobid=ae21b5..., state=RUNNING
INFO: throughput: 41,200 events/sec, watermark lag: 8.4s, checkpoint interval: 60s

# GCP Dataflow runner (managed, autoscaling):
$ python upi_volume_5min.py --runner DataflowRunner \
  --temp_location=gs://swiggy-de-temp --staging_location=gs://swiggy-de-staging
INFO:apache_beam.runners.dataflow: workers: 4 (autoscaling 2-32), job_id=2026-04-25_11_31_08

The load-bearing pieces:

- AssignTimestamp: event time comes from the payload's event_time_ms, not Kafka arrival time — without it, the windows are processing-time windows in disguise.
- The WindowInto spec: window size, trigger, accumulation mode, and allowed lateness travel with the pipeline, so every runner applies the same event-time semantics.
- The --runner flag: the only thing that changes between local test, production Flink, and Dataflow is the runner selection plus its runner-specific flags.

In production at Swiggy, this pipeline runs on Flink. The Direct runner is for tests; the Dataflow runner is the option you keep in your back pocket if you ever decide GCP is cheaper.

Why Flink became the default Beam runner outside Google

Beam supports six runners (Flink, Spark, Dataflow, Samza, Jet, Direct), but in Indian production deployments the choice collapses to two: Dataflow if you're on GCP, Flink everywhere else. Three reasons.

[Table: Beam runner feature comparison — where the runners actually differ.]

capability            | Flink                   | Spark Structured            | Dataflow
event-time triggers   | full Dataflow Model     | subset (no session windows) | full (Google reference)
exactly-once          | checkpoints + 2PC sink  | at-least-once for sinks     | native
state primitives      | RocksDB-backed, rich    | in-memory only              | managed, opaque
latency floor         | ~100ms                  | ~500ms (micro-batch)        | ~100ms
cost (typical Indian) | EKS / own k8s           | EMR, similar                | 2-3× managed premium
on-call burden        | you run it              | you run it                  | Google runs it
Beam-on-Flink covers the same capability surface as Beam-on-Dataflow without the managed-service premium. Beam-on-Spark trades feature completeness for an existing batch deployment your org already runs.

Reason 1: capability parity with Dataflow. Flink independently implemented the Dataflow Model's primitives — event-time, watermarks, triggers, allowed lateness — because the Stratosphere project and the Google paper converged on the same vocabulary around 2014. The Beam capability matrix shows Flink at full coverage on every streaming feature except a few Dataflow-specific extensions (timers in the experimental Splittable DoFn API, certain stateful processing patterns).

Reason 2: cost. Dataflow's autoscaling and managed scheduler are excellent, but the GCP premium over self-managed Flink on EKS is typically 2-3× for Indian fintech-scale workloads (200k events/sec sustained, 50TB state). Razorpay's 2023 cost analysis (internal, but referenced in their tech-talk circuit) put their Flink-on-EKS bill at roughly ₹18 lakh/month against an extrapolated ₹50 lakh/month on Dataflow for the same throughput. At their scale the difference funds an SRE team.

Reason 3: state-store control. Flink's RocksDB-backed state with incremental checkpoints to S3 lets teams run pipelines with terabytes of keyed state. Dataflow handles state for you (and well), but the "well" is opaque — you can't tune compaction, you can't switch state backends, you can't read the state externally for debugging. For a 2 a.m. incident where the question is "is the user-session-state operator stuck on a hot key?", being able to SSH into the TaskManager and scan the state with RocksDB's ldb tool is sometimes the difference between a 20-minute fix and a 4-hour fix.

The trade-off is that Beam-on-Flink is two products to operate. Flink itself has a JobManager, TaskManagers, RocksDB tuning, checkpoint storage configuration. Beam adds the Python SDK harness, the Fn API, the portable runner JobServer. Failures can come from either layer. The on-call runbook needs both.

Where the abstraction leaks

Beam's portability promise is "write once, run anywhere", but in practice you write once and test on the target runner. The leaks:

- Capability gaps: the capability matrix is not uniform — the Spark runner lacks session windows and precise late-data triggers, so a pipeline that passes on the Direct runner can be rejected or silently approximated elsewhere.
- Runner-specific flags: --flink_master, --environment_config, --temp_location and friends live outside the model, so the deploy script is never portable even when the pipeline is.
- Performance: the Fn API's gRPC hop between the runner and the Python SDK harness adds overhead that native PyFlink does not pay.
- Debugging: stack traces and metrics surface in the runner's vocabulary (JobGraph vertices, checkpoints, watermark lag), so incident diagnosis is per-runner knowledge.

These leaks are not fatal — they're the cost of the abstraction. The right framing is: Beam is a portable description language for stream pipelines. The runner is still where the production behaviour lives. A senior engineer running Beam-on-Flink learns enough Flink to debug the runtime layer when the abstraction springs a leak.

Common confusions

- "Beam runs my pipeline." It doesn't — Beam describes it; the runner executes it. Throughput, checkpointing, and failure recovery are runner behaviour.
- "Beam-on-Flink is the same as PyFlink." It isn't: Beam-on-Flink routes user code through the Fn API's gRPC bridge and a sidecar SDK container; PyFlink is Flink's own Python API with no Beam layer in between.
- "Portable means identical." A pipeline that is valid on every runner can still behave differently per runner — latency floors, trigger fidelity, and delivery guarantees differ, as the runner comparison above shows.

Going deeper

The Splittable DoFn API and why it took five years

Beam's original I/O abstraction (Source/Sink) was based on Hadoop-style splits — a source declared its splits up front, the runner farmed them to workers. This worked for batch and broke for streaming. The 2018 Splittable DoFn (SDF) proposal generalised: a source is a DoFn that can pause itself, return a checkpoint of its progress, and resume from another worker. This took until 2022 to fully ship across runners because each runner had to implement dynamic work rebalancing semantics (Flink, Spark, Dataflow). The reason it matters: SDF is what lets a Beam pipeline auto-split a Kafka partition that became a hot key, by having the source DoFn checkpoint and resume on a less-loaded worker. The full proposal is in BEAM-65; the Flink-runner support is documented in the Beam programming guide.
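The pause-and-resume mechanism is easier to see as a plain-Python model than in the real SDF interface. A sketch of the restriction idea — the class and method names here are illustrative, not the Beam API:

```python
# Model of a splittable source: work is a claimable offset range that can be
# split mid-flight, so another worker picks up the unclaimed remainder.

class OffsetRestriction:
    """A half-open range [current, stop) of, e.g., Kafka offsets still to read."""

    def __init__(self, start: int, stop: int):
        self.current, self.stop = start, stop

    def try_claim(self, offset: int) -> bool:
        """Claim the next offset; fails once the range has been truncated by a split."""
        if offset >= self.stop:
            return False
        self.current = offset + 1
        return True

    def split(self) -> "OffsetRestriction":
        """Checkpoint: keep the claimed prefix, hand the rest to another worker."""
        remainder = OffsetRestriction(self.current, self.stop)
        self.stop = self.current
        return remainder

r = OffsetRestriction(0, 100)
for off in range(40):
    assert r.try_claim(off)       # this worker reads offsets 0..39
rest = r.split()                  # runner decides to rebalance here
assert not r.try_claim(40)        # original worker stops at the truncated boundary
assert (rest.current, rest.stop) == (40, 100)  # remainder resumes elsewhere
```

The batch-era Source API could only split before work started; the point of SDF is that `split` is legal at any time, which is what dynamic work rebalancing requires of every runner.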

Why Spark Structured Streaming under Beam feels weird

Spark's execution model is micro-batch — it accumulates input for a trigger interval (commonly hundreds of milliseconds to a few seconds), then runs a batch query over it. Beam's model is per-event with watermark-driven triggers. The Beam Spark runner translates between these by accumulating Beam events into Spark batches, running the windowed aggregation, then emitting. The cost: a latency floor of one micro-batch interval, no support for session windows (which require per-event triggering), and weak support for processing-time triggers. Teams that started on Spark Structured Streaming and added Beam often end up rewriting to PyFlink within 12 months because the impedance mismatch shows up in production. Beam-on-Flink, by contrast, feels like Beam-on-Beam — every model primitive maps directly to a runtime primitive.
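The latency floor is simple arithmetic. A plain-Python sketch comparing per-event emission with micro-batch emission at a fixed interval; the one-second interval is illustrative:

```python
import math

# Model: a per-event engine can emit at the event's arrival time (plus
# negligible work); a micro-batch engine cannot emit before the batch
# containing the event closes.

BATCH_INTERVAL = 1.0  # seconds — an illustrative micro-batch trigger interval

def micro_batch_emit_time(arrival: float) -> float:
    """Earliest possible output time: the end of the batch the event falls into."""
    return math.ceil(arrival / BATCH_INTERVAL) * BATCH_INTERVAL

arrivals = [0.05, 0.40, 0.99, 1.01]
latencies = [micro_batch_emit_time(t) - t for t in arrivals]
# An event arriving just after a batch boundary waits nearly a full interval:
assert abs(latencies[0] - 0.95) < 1e-9   # arrived at 0.05s, emitted at 1.0s
assert abs(latencies[3] - 0.99) < 1e-9   # arrived at 1.01s, emitted at 2.0s
```

Averaged over uniform arrivals the added latency is half an interval, but p99 latency sits near a full interval — which is why micro-batch engines cannot meet a sub-100ms p99 budget with any realistic batch size.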

How Dataflow's auto-scaler and Flink's reactive mode compare

Dataflow's claim to fame is autoscaling: the service watches per-stage backlog and scales workers from 2 to 1000 in minutes. Flink's reactive mode (FLIP-159, 2021) and the more recent adaptive scheduler bring similar capabilities to self-managed Flink. The difference: Dataflow scales transparently with no operator action; Flink reactive mode requires you to run on Kubernetes with the right RBAC, the right state-storage configuration (S3 for incremental checkpoints), and the right scaling triggers wired up via the Kubernetes operator. The work to make Flink scale-on-load is one or two engineer-weeks; the work to make Dataflow scale-on-load is checking a box at deploy time. For a team without dedicated Flink SREs, this is the actual reason Dataflow stays attractive for greenfield builds even at the GCP premium.

Where Indian companies sit on Beam-vs-native-Flink in 2026

Razorpay's analytics team uses Beam-on-Flink with Python SDK because their feature engineering team needs Python and the portability story protects them if they ever migrate to GCP. Swiggy's ML platform uses native PyFlink because the latency budget for fraud-feature pipelines is tighter (sub-100ms p99 to influence the order-placement decision) and Beam's portability overhead exceeds that budget. Zerodha runs native Java Flink because their order-tick processing was originally on Storm and the Flink rewrite went straight to native DataStream API in 2020, before Beam-on-Flink Python was production-ready. PhonePe is split — older pipelines on PyFlink, newer ones on Beam-on-Flink for the portability story. The pattern is: Beam where multiple runners are realistic over the next 3 years, native runtime where they're not. The portability is insurance; the premium is performance.

Where this leads next

Beam closes the SDK-portability loop, but the unification thesis it embodies extends further:

- the runtime layer (Flink): checkpointing, state backends, scaling — where production behaviour actually lives;
- the transport and durability layer (Kafka): the pipelines above read and write it but treat it as opaque plumbing;
- the storage layer (Iceberg): portability across query engines, the storage-side analogue of Beam's model-layer portability.

The pattern to internalise is the layering. Beam is a model and SDK layer. Flink is a runtime layer. Kafka is a transport and durability layer. Iceberg is a storage layer. A modern streaming stack at Razorpay or Swiggy uses all four, with each layer mostly unaware of the layers below. The portability Beam offers is at the model layer; portability across the runtime, transport, and storage layers requires its own abstractions (e.g. Iceberg gives you storage portability across query engines). When a vendor pitches "the unified streaming platform", ask which layer they're actually portable across — it's usually one of the four, dressed up as all four.

References