Note: Company names, engineers, incidents, numbers, and scaling scenarios in this article are hypothetical — even when they resemble real ones. See the full disclaimer.

Beam and Flink: write once, run on both

A platform engineer at BhojanBox in 2024 has the same problem most Indian unicorns hit by year three: the analytics team runs Spark Structured Streaming on EMR, the ML team runs Flink on Kubernetes, and the new growth team wants Dataflow on GCP because the ad pixel sits there. Three runtimes, three dialects of "windowed group-by", three on-call rotations. The pitch for Apache Beam is exactly this misery: write the pipeline once in Beam's SDK, pick a runner per cluster, and the code compiles down to that runner's native job. The reality is more nuanced: Beam describes the what, the runner decides the how, and on Indian hardware with Indian latency budgets the runner you pick almost always ends up being Flink. This chapter is about why that pairing won, what Beam is actually portable across, and where the abstraction leaks.

Apache Beam is a runner-agnostic pipeline SDK derived from Querion's Dataflow Model paper. You write PCollection transforms once, then submit the pipeline to a runner — Flink, Spark, Dataflow, or Samza — which compiles Beam primitives down to its native execution graph. Beam-on-Flink is the de-facto open-source production combo because Flink's checkpointing, watermarks, and state primitives align almost 1:1 with Beam's model. The portability is real for the unified batch/stream API and leaky at the runner-feature edges.

What Beam actually gives you: a model, not a runtime

Apache Beam started as Google's Cloud Dataflow SDK, donated to the ASF in 2016 along with the Dataflow Model paper (Akidau et al., VLDB 2015). The paper's claim was that the right primitives for unified batch and streaming are four: what is computed, where in event time it's computed (windowing), when in processing time results are emitted (triggers), and how refinements relate (accumulation mode). Beam is the SDK that exposes these four directly.

The pipeline you write is a directed acyclic graph of PTransforms operating on PCollections. Each PCollection is a logical bag of elements with an event-time timestamp and a window assignment. Your code never says "run on this thread" or "checkpoint every 60 seconds" — those are runner concerns. It says "group these by user_id within a 5-minute tumbling window, fire one output when the window closes plus a refinement every 30 seconds for late data".
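To make the window assignment concrete, here is a minimal plain-Python sketch of how a fixed (tumbling) window can be derived from an event timestamp. It is not the Beam API: the function name and the sample events are made up for illustration, and the arithmetic is simply the usual "round the timestamp down to a multiple of the window size" rule.

```python
from collections import defaultdict

WINDOW_SECONDS = 300  # 5-minute tumbling windows, as in the sentence above


def fixed_window(event_time_s: float, size_s: int = WINDOW_SECONDS) -> tuple[int, int]:
    """Return the half-open [start, end) window that contains event_time_s."""
    start = int(event_time_s // size_s) * size_s
    return (start, start + size_s)


# Hypothetical (user_id, event_time_seconds) pairs; arrival order is irrelevant,
# because the window depends only on the timestamp carried by the element.
events = [("u1", 10.0), ("u2", 299.9), ("u1", 300.0), ("u1", 20.5)]

counts = defaultdict(int)
for user_id, ts in events:
    counts[(user_id, fixed_window(ts))] += 1

for key in sorted(counts):
    print(key, counts[key])
```

The grouping key is the pair (user, window), which is why the pipeline author never has to mention threads or checkpoints: the window is a property of the data, not of the execution.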

Figure: Beam SDK to runner translation ("Beam: one SDK, many runners"). Top: the Beam SDK (Python / Java / Go) with PCollection, ParDo, GroupByKey, Window. Middle: the portable pipeline IR (protobuf), which is runner-agnostic and describes WHAT, not HOW. Bottom: the runner backends: Flink (checkpoints, watermarks, RocksDB state, EOS), Spark (micro-batches, structured streaming), Dataflow (GCP managed, autoscaling), and Samza / Direct / Jet (ProfNet / local-test / Hazelcast).
You write against the SDK. The SDK lowers to a portable protobuf representation of the DAG. The runner translates that protobuf into its native execution graph (Flink JobGraph, Spark RDD lineage, Dataflow worker pool plan).

Why the IR is protobuf and not Java bytecode: the original Java-only Beam SDK could only target JVM runners. The 2018 portability framework introduced a language-neutral protobuf representation so a Python pipeline can run on a Java-based Flink runner via gRPC bridges (the "Fn API"). This is what makes Python-on-Flink-on-Kubernetes work without forcing the user to write Java — the user-defined functions run in a sidecar Python container that the Flink workers talk to over gRPC.

The four primitives map cleanly to the Dataflow Model:

| Beam primitive | Dataflow Model question | Runner translation (Flink) |
|---|---|---|
| ParDo (per-element transform) | what | FlatMapFunction / ProcessFunction |
| Window.into(FixedWindows) | where in event time | WindowAssigner (TumblingEventTimeWindows) |
| Trigger.AfterWatermark() | when in processing time | Flink's Trigger interface |
| AccumulationMode.DISCARDING | how refinements relate | Window state retention policy |

The translation is almost 1:1 for Flink because Flink's runtime was independently built around the same Dataflow Model. For Spark Structured Streaming, the translation is lossier — Spark's micro-batch model can approximate event-time triggers but not as precisely, which is why Beam-on-Spark has known feature gaps on session windows and late-data triggers (see the Beam capability matrix).

Same code, three runners: a worked example

Here is a Beam pipeline that computes per-merchant rolling 5-minute payment volume, filtered to UPI transactions over ₹1,000. The same Python file runs on the Flink runner (production), the Direct runner (local tests), and the Dataflow runner (if BhojanBox decided to ship to GCP next quarter).

# upi_volume_5min.py: single Beam pipeline, run on three runners
import argparse
import datetime
import json
import typing

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime, AfterWatermark
from apache_beam.transforms.window import FixedWindows

parser = argparse.ArgumentParser()
parser.add_argument("--runner", choices=["FlinkRunner", "DirectRunner", "DataflowRunner"], required=True)
parser.add_argument("--bootstrap", default="kafka.swiggy-internal:9092")
known, pipeline_args = parser.parse_known_args()

# project/region are only read by the Dataflow runner. asia-south1 is the GCP
# Mumbai region; ap-south-1 is the AWS name and is not a valid Dataflow region.
opts = PipelineOptions(pipeline_args, runner=known.runner, streaming=True,
                       project="swiggy-data-eng", region="asia-south1")

with beam.Pipeline(options=opts) as p:
    payments = (p
        | "ReadKafka" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": known.bootstrap, "auto.offset.reset": "latest"},
            topics=["payments.captured.v1"])
        # ReadFromKafka yields (key_bytes, value_bytes) pairs; the payload is the value.
        | "ParseJSON" >> beam.Map(lambda kv: json.loads(kv[1].decode("utf-8")))
        # Amounts are in paise, so 1000_00 is ₹1,000.
        | "FilterUPI" >> beam.Filter(lambda e: e["method"] == "upi" and e["amount"] >= 1000_00)
        | "AssignTimestamp" >> beam.Map(
            lambda e: beam.window.TimestampedValue(e, e["event_time_ms"] / 1000.0))
        | "KeyByMerchant" >> beam.Map(lambda e: (e["merchant_id"], e["amount"])))

    windowed = (payments
        | "Window5Min" >> beam.WindowInto(
            FixedWindows(300),                                     # 5-minute tumbling
            trigger=AfterWatermark(early=AfterProcessingTime(30)), # early refinements every 30s
            accumulation_mode=AccumulationMode.ACCUMULATING,
            allowed_lateness=600))                                 # 10 minutes for late events

    totals = (windowed
        | "SumPerMerchant" >> beam.CombinePerKey(sum)
        # WriteToKafka is a cross-language transform, so declare the (bytes, bytes)
        # element type explicitly to give the Java side a coder it understands.
        | "FormatOutput" >> beam.Map(
            lambda kv: (
                kv[0].encode("utf-8"),
                json.dumps({
                    "merchant": kv[0],
                    "volume_paise": kv[1],
                    "computed_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                }).encode("utf-8"),
            )).with_output_types(typing.Tuple[bytes, bytes])
        | "WriteKafka" >> WriteToKafka(
            producer_config={"bootstrap.servers": known.bootstrap},
            topic="metrics.merchant_5min_volume.v1"))

# Local test on Direct runner (single process, in-memory):
$ python upi_volume_5min.py --runner DirectRunner --bootstrap localhost:9092
INFO:apache_beam.runners.direct: starting pipeline...
INFO: window [2026-04-25T11:30:00, 2026-04-25T11:35:00) merchant=razorpay-test volume=2_45_000 paise

# Production on Flink runner (Kubernetes, 12 task slots):
$ python upi_volume_5min.py --runner FlinkRunner --flink_master=flink-jm:8081 \
  --environment_type=DOCKER --environment_config=apache/beam_python3.11_sdk:2.56.0
INFO:apache_beam.runners.portability.flink_runner: submitting JobGraph (487 vertices)...
INFO: job submitted: jobid=ae21b5..., state=RUNNING
INFO: throughput: 41,200 events/sec, watermark lag: 8.4s, checkpoint interval: 60s

# GCP Dataflow runner (managed, autoscaling):
$ python upi_volume_5min.py --runner DataflowRunner \
  --temp_location=gs://swiggy-de-temp --staging_location=gs://swiggy-de-staging
INFO:apache_beam.runners.dataflow: workers: 4 (autoscaling 2-32), job_id=2026-04-25_11_31_08

The load-bearing pieces:

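  • runner=known.runner is the only line of pipeline code that changes between environments; runner-specific flags such as --flink_master or --temp_location arrive through pipeline_args. The same pipeline definition is submitted to three different cluster types, and each runner turns it into its own execution graph (a JobGraph on Flink).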
  • AssignTimestamp is what tells the runner to treat event_time_ms as the event timestamp, not the Kafka ingestion timestamp. Why this matters at 11pm production: if you skip AssignTimestamp, the runner uses processing time for windowing. A 30-minute Kafka backlog after a consumer outage will then collapse all backlog events into "now"-shaped windows, ruining your 5-minute rolling metric. The pipeline still runs, the numbers are just wrong — the worst kind of bug.
  • FixedWindows(300) plus AfterWatermark(early=AfterProcessingTime(30)) is the Dataflow Model in two lines. The window says "group events by 5-minute event-time buckets". The trigger says "fire one final result when the watermark crosses the window-end, plus speculative early-refinement results every 30 processing-time seconds while the window is open". Why both: the early trigger gives the dashboard a near-real-time number for the in-progress window; the watermark trigger gives the final number that downstream consumers can trust as complete. Without the early trigger, the dashboard would only update at the close of each window, looking frozen for 5 minutes at a stretch.
  • accumulation_mode=ACCUMULATING means each refinement contains the cumulative window total, not a delta from the previous emission. The downstream sink overwrites the previous value for that window key. The alternative (DISCARDING) emits deltas, which the sink would have to add up.
  • allowed_lateness=600 keeps the window's state alive in the runner for 10 minutes past the watermark crossing. A late event (Kafka producer outage, mobile app retrying with stale buffered events) within that 10-minute grace fires another refinement. After 10 minutes, the state is garbage-collected and further-late events are dropped — see /wiki/late-arriving-data-and-the-backfill-problem.
  • environment_type=DOCKER in the Flink command tells the Flink runner to spin up a Python SDK harness sidecar container per worker. The Java Flink TaskManager hands elements to the Python container over gRPC for every ParDo. This is the portability-framework cost: each element is serialized, shipped across a process boundary, and deserialized. The Fn API moves elements in bundles rather than one round-trip each, so for high-throughput pipelines the bundle size is what amortises the cost; on the Flink runner that is governed by the max_bundle_size and max_bundle_time_mills pipeline options (use_runner_v2 is a Dataflow experiment and has no effect here).
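
The interaction between early firings, accumulation mode, and allowed lateness can be traced with a small plain-Python model of a single window for one merchant. This is a sketch of the semantics described in the bullets above, not Beam code: the function name, the firing schedule, and the amounts are all invented for illustration, and the lateness check is a simplification of what a real runner does.

```python
def simulate_panes(steps, accumulating, window_end=300, allowed_lateness=600):
    """Replay ('event', amount, watermark) and ('fire', watermark) steps for one window.

    Returns the list of values a sink would receive, one per pane.
    """
    total = 0        # running sum for the whole window
    since_last = 0   # sum of the elements seen since the previous pane
    panes = []
    for step in steps:
        if step[0] == "event":
            _, amount, watermark = step
            if watermark >= window_end + allowed_lateness:
                continue  # window state already garbage-collected: event is dropped
            total += amount
            since_last += amount
        else:  # "fire": an early, on-time, or late trigger firing
            panes.append(total if accumulating else since_last)
            since_last = 0
    return panes


steps = [
    ("event", 150_000, 40),   # arrives while the window is open
    ("fire", 40),             # early firing
    ("event", 95_000, 120),
    ("fire", 300),            # on-time firing: watermark reaches the window end
    ("event", 20_000, 500),   # late, but inside the 600 s grace period
    ("fire", 500),            # late firing
    ("event", 70_000, 950),   # too late: watermark >= 300 + 600, dropped
]

accumulating = simulate_panes(steps, accumulating=True)
discarding = simulate_panes(steps, accumulating=False)
print(accumulating)  # [150000, 245000, 265000]
print(discarding)    # [150000, 95000, 20000]
```

With ACCUMULATING the sink can overwrite by window key and the last pane is the answer. With DISCARDING the sink has to add the panes up itself, and in this trace the discarding panes sum to the final accumulating pane.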

In production at BhojanBox, this pipeline runs on Flink. The Direct runner is for tests; the Dataflow runner is the option you keep in your back pocket if you ever decide GCP is cheaper.

Beam ships runners for Flink, Spark, Dataflow, Samza, Jet, and a local Direct runner, among others, but in Indian production deployments the choice collapses to two: Dataflow if you're on GCP, Flink everywhere else. Three reasons.

Beam runner feature comparison: where the runners actually differ

| capability | Flink | Spark Structured | Dataflow |
|---|---|---|---|
| event-time triggers | full Dataflow Model | subset (no session) | full (Querion reference) |
| exactly-once | checkpoints + 2PC sink | at-least-once for sinks | native |
| state primitives | RocksDB-backed, rich | in-memory only | managed, opaque |
| latency floor | ~100ms | ~500ms (micro-batch) | ~100ms |
| cost (typical Indian) | EKS/own k8s | EMR, similar | 2-3× managed premium |
| on-call burden | you run it | you run it | Querion runs it |
Beam-on-Flink covers the same capability surface as Beam-on-Dataflow without the managed-service premium. Beam-on-Spark trades feature completeness for an existing batch deployment your org already runs.

Reason 1: capability parity with Dataflow. Flink independently implemented the Dataflow Model's primitives — event-time, watermarks, triggers, allowed lateness — because the Stratosphere project and the Querion paper converged on the same vocabulary around 2014. The Beam capability matrix shows Flink at full coverage on every streaming feature except a few Dataflow-specific extensions (timers in the experimental Splittable DoFn API, certain stateful processing patterns).

Reason 2: cost. Dataflow's autoscaling and managed scheduler are excellent, but the GCP premium over self-managed Flink on EKS is typically 2-3× for Indian fintech-scale workloads (200k events/sec sustained, 50TB state). PaisaBridge's 2023 cost analysis (internal, but referenced in their tech-talk circuit) put their Flink-on-EKS bill at roughly ₹18 lakh/month against an extrapolated ₹50 lakh/month on Dataflow for the same throughput. At their scale the difference funds an SRE team.

Reason 3: state-store control. Flink's RocksDB-backed state with incremental checkpoints to S3 lets teams run pipelines with terabytes of keyed state. Dataflow handles state for you (and well), but the "well" is opaque — you can't tune compaction, you can't switch state backends, you can't read the state externally for debugging. For a 2 a.m. incident where the question is "is the user-session-state operator stuck on a hot key?", being able to SSH into the TaskManager and rocksdb_ldb scan the state is sometimes the difference between a 20-minute fix and a 4-hour fix.

The trade-off is that Beam-on-Flink is two products to operate. Flink itself has a JobManager, TaskManagers, RocksDB tuning, checkpoint storage configuration. Beam adds the Python SDK harness, the Fn API, the portable runner JobServer. Failures can come from either layer. The on-call runbook needs both.

Where the abstraction leaks

Beam's portability promise is "write once, run anywhere", but in practice you write once-and-test-on-the-target-runner. The leaks:

  • I/O connectors are not fully portable. Beam's KafkaIO works on every runner, but consumed offsets are persisted through whatever checkpoint mechanism the runner provides: on Flink they ride along in Flink checkpoints, while Spark tracks progress in its own way. Restart semantics differ. A pipeline that recovers cleanly on Flink may double-count on Spark restart.
  • Custom state APIs differ. Beam's StateSpec and TimerSpec (per-key timers) work fully on Flink and Dataflow, partially on Spark, not at all on Samza. Pipelines that rely on stateful processing tend to lock into Flink.
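  • DoFn lifecycle. The setup/start_bundle/finish_bundle/teardown hooks exist on every runner, but how often they fire is runner-specific: how many DoFn instances are created, how large a bundle is, and whether teardown runs at all (it is best-effort). Roughly, Flink calls them per task slot, Dataflow per worker, and Direct once. A setup that opens a database connection works fine on Flink (one connection per slot) but can leak that connection in Direct-runner tests, where setup still runs once even though there is no parallelism and nothing guarantees the matching teardown.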
  • Performance characteristics. A Beam pipeline on Flink is typically 30–50% slower than equivalent native Flink Java code, because the cross-language portability framework adds gRPC overhead per element. For tight inner loops, teams sometimes drop to native PyFlink (pyflink.datastream) and skip Beam.
  • Monitoring surface. Flink's JobManager UI shows operators with their Flink-native names ("KafkaSource → ParDo:FilterUPI → ParDo:KeyByMerchant → WindowedCombine"). Beam users don't recognise these as the transforms they wrote. Beam's own metrics surface helps, but the operational language splits between "Beam pipeline" (your code) and "Flink job" (what's actually running).

These leaks are not fatal — they're the cost of the abstraction. The right framing is: Beam is a portable description language for stream pipelines. The runner is still where the production behaviour lives. A senior engineer running Beam-on-Flink learns enough Flink to debug the runtime layer when the abstraction springs a leak.
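
The DoFn lifecycle leak is easier to reason about with a toy harness. The sketch below is plain Python, not Beam: `FakeConnection`, `EnrichFn`, and `run_like_a_runner` are hypothetical names, and the instance count and bundle size stand in for choices that a real runner makes on its own. Only the hook names follow the Beam Python DoFn methods listed above.

```python
class FakeConnection:
    """Stand-in for a database client; counts how many are currently open."""
    open_count = 0

    def __init__(self):
        FakeConnection.open_count += 1
        self.closed = False

    def close(self):
        if not self.closed:
            self.closed = True
            FakeConnection.open_count -= 1


class EnrichFn:
    """Shaped like a DoFn: one connection per instance, one flush per bundle."""

    def setup(self):
        self.conn = FakeConnection()

    def start_bundle(self):
        self.buffer = []

    def process(self, element):
        self.buffer.append(element)

    def finish_bundle(self):
        flushed = list(self.buffer)
        self.buffer = []
        return flushed

    def teardown(self):
        self.conn.close()


def run_like_a_runner(elements, instances, bundle_size, call_teardown=True):
    """Drive EnrichFn the way a runner might; returns the number of bundle flushes."""
    flushes = 0
    shards = [elements[i::instances] for i in range(instances)]
    for shard in shards:
        fn = EnrichFn()
        fn.setup()
        for start in range(0, len(shard), bundle_size):
            fn.start_bundle()
            for element in shard[start:start + bundle_size]:
                fn.process(element)
            fn.finish_bundle()
            flushes += 1
        if call_teardown:  # teardown is best-effort, so also model it being skipped
            fn.teardown()
    return flushes


data = list(range(10))
print(run_like_a_runner(data, instances=1, bundle_size=10))  # 1
print(run_like_a_runner(data, instances=2, bundle_size=2))   # 6
print(FakeConnection.open_count)                             # 0
run_like_a_runner(data, instances=2, bundle_size=2, call_teardown=False)
print(FakeConnection.open_count)                             # 2
```

The same class produces one flush or six depending on choices it does not control, and leaks two connections when teardown is skipped. That is the practical argument for testing on the target runner and for closing resources defensively.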

Common confusions

  • "Beam is a streaming framework." No, Beam is an SDK that targets streaming and batch frameworks. By itself it executes nothing — the Direct runner is a test harness, not a production engine. The thing actually running your pipeline is always Flink, Spark, Dataflow, or Samza.
  • "Beam-on-Flink is the same as native Flink." Mechanically related but operationally different. Native Flink (DataStream API or PyFlink) gives you full Flink feature access and 30–50% better throughput. Beam gives you portability in exchange. Pick Beam when you'll genuinely use the portability (e.g. plan to migrate runners), pick native Flink when you'll stay on Flink and need the performance.
  • "Apache Beam replaces Apache Flink." They sit at different layers — Beam is the model and SDK, Flink is the runtime. The Beam-on-Flink stack uses both. Flink-Beam-replacement comparisons in vendor blog posts are confused; the real comparison is "Beam-on-Flink vs PyFlink" or "Beam-on-Flink vs Beam-on-Dataflow".
  • "Dataflow is open source via Beam." Beam is open source, Dataflow is not. The Dataflow service uses Querion-internal scheduling, autoscaling, and state-management infrastructure that is not in the Beam codebase. You can run a Beam pipeline on the open-source Flink runner and get most of the same model semantics, but the Querion service itself is closed.
  • "Streaming and batch unification means I write one pipeline." True for the SDK level — the same Pipeline object can read a bounded PCollection (batch) or an unbounded one (streaming). False for the operational level — the production deploy of a batch pipeline (run-to-completion, output to a partition) and a streaming pipeline (continuously running, checkpoints, autoscaling) are completely different jobs. Beam unifies the code, not the operation.
  • "Switching runners is a config change." True for the runner ID. False for everything else: I/O configuration, state-store sizing, checkpoint storage, monitoring integration, and the on-call runbook all change with the runner. Plan a runner switch as a 2–4 week migration, not a one-line PR.

Going deeper

The Splittable DoFn API and why it took five years

Beam's original I/O abstraction (Source/Sink) was based on Hadoop-style splits: a source declared its splits up front, and the runner farmed them out to workers. This worked for batch and broke for streaming. The 2018 Splittable DoFn (SDF) proposal generalised the idea: a source is a DoFn that can pause itself, return a checkpoint of its progress, and resume on another worker. This took until 2022 to fully ship across runners because each runner (Flink, Spark, Dataflow) had to implement dynamic work rebalancing semantics. The reason it matters: SDF is what lets a runner rebalance a Beam source that is stuck on a hot Kafka partition, by having the source DoFn checkpoint and resume on a less-loaded worker. The full proposal is in BEAM-65; the Flink-runner support is documented in the Beam programming guide.
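
The pause, checkpoint, and resume idea can be sketched without any Beam machinery. The following plain-Python model uses a hypothetical `OffsetTracker` over a range of Kafka-style offsets: the reader claims offsets one at a time, and a checkpoint hands back the unclaimed remainder so another worker can pick it up. Real SDF restriction trackers are richer than this; the names and numbers here are illustrative only.

```python
class OffsetTracker:
    """Tracks progress through the half-open offset range [start, stop)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop
        self.last_claimed = None

    def try_claim(self, offset):
        """Claim an offset for processing; False means the range is exhausted."""
        if offset >= self.stop:
            return False
        self.last_claimed = offset
        return True

    def checkpoint(self):
        """Stop here: keep what was claimed and return the residual range."""
        resume_from = self.start if self.last_claimed is None else self.last_claimed + 1
        residual = (resume_from, self.stop)
        self.stop = resume_from  # the primary now ends where the residual begins
        return residual


def read(tracker, budget):
    """Process up to `budget` offsets, then hand control back to the runner."""
    done = []
    offset = tracker.start
    while len(done) < budget and tracker.try_claim(offset):
        done.append(offset)
        offset += 1
    return done


# Worker A reads part of the range, then is asked to checkpoint.
tracker_a = OffsetTracker(100, 110)
first = read(tracker_a, budget=4)
residual = tracker_a.checkpoint()

# Worker B resumes from the residual; nothing is skipped or read twice.
tracker_b = OffsetTracker(*residual)
rest = read(tracker_b, budget=100)

print(first)     # [100, 101, 102, 103]
print(residual)  # (104, 110)
print(rest)      # [104, 105, 106, 107, 108, 109]
```

The key property is that the claimed prefix and the residual never overlap, so the runner is free to move the residual wherever it likes.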

Why Spark Structured Streaming under Beam feels weird

Spark's execution model is micro-batch: it accumulates input for a short, configurable trigger interval, then runs a batch query over it. Beam's model is per-event with watermark-driven triggers. The Beam Spark runner translates between these by accumulating Beam events into Spark batches, running the windowed aggregation, then emitting. The cost: a latency floor of one micro-batch interval, no support for session windows (which require per-event triggering), and weak support for processing-time triggers. Teams that started on Spark Structured Streaming and added Beam often end up rewriting to PyFlink within 12 months because the impedance mismatch shows up in production. By contrast, Beam-on-Flink feels like Beam-on-Beam: every model primitive maps directly to a runtime primitive.
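
The latency floor follows from simple arithmetic. As a rough plain-Python illustration (the function and the arrival times are hypothetical, and real engines add scheduling and processing time on top), an event that arrives mid-interval cannot be emitted before the next micro-batch boundary:

```python
import math


def emit_time_micro_batch(arrival_s: float, interval_s: float) -> float:
    """Earliest time a micro-batch engine can emit a result for this event."""
    return math.ceil(arrival_s / interval_s) * interval_s


arrivals = [0.05, 0.40, 0.51, 0.99]  # seconds, hypothetical
interval = 0.5                       # 500 ms micro-batches

for t in arrivals:
    emitted = emit_time_micro_batch(t, interval)
    print(f"arrived {t:.2f}s, emitted at {emitted:.2f}s, waited {(emitted - t) * 1000:.0f} ms")
```

An event that lands just after a boundary waits almost the whole interval, which is why shrinking the interval is the only lever a micro-batch engine has on tail latency.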

Dataflow's claim to fame is autoscaling: the service watches per-stage backlog and scales workers from 2 to 1000 in minutes. Flink's reactive mode (FLIP-159, 2021) and the more recent adaptive scheduler bring similar capabilities to self-managed Flink. The difference: Dataflow scales transparently with no operator action; Flink reactive mode requires you to run on Kubernetes with the right RBAC, the right state-storage configuration (S3 for incremental checkpoints), and the right scaling triggers wired up via the Kubernetes operator. The work to make Flink scale-on-load is one or two engineer-weeks; the work to make Dataflow scale-on-load is checking a box at deploy time. For a team without dedicated Flink SREs, this is the actual reason Dataflow stays attractive for greenfield builds even at the GCP premium.

PaisaBridge's analytics team uses Beam-on-Flink with Python SDK because their feature engineering team needs Python and the portability story protects them if they ever migrate to GCP. BhojanBox's ML platform uses native PyFlink because the latency budget for fraud-feature pipelines is tighter (sub-100ms p99 to influence the order-placement decision) and Beam's portability overhead exceeds that budget. ParakhTrade runs native Java Flink because their order-tick processing was originally on Storm and the Flink rewrite went straight to native DataStream API in 2020, before Beam-on-Flink Python was production-ready. DigiPaisa is split — older pipelines on PyFlink, newer ones on Beam-on-Flink for the portability story. The pattern is: Beam where multiple runners are realistic over the next 3 years, native runtime where they're not. The portability is insurance; the premium is performance.

Where this leads next

Beam closes the SDK-portability loop, but the unification thesis it embodies extends further.

The pattern to internalise is the layering. Beam is a model and SDK layer. Flink is a runtime layer. Kafka is a transport and durability layer. Iceberg is a storage layer. A modern streaming stack at PaisaBridge or BhojanBox uses all four, with each layer mostly unaware of the layers below. The portability Beam offers is at the model layer; portability across the runtime, transport, and storage layers requires its own abstractions (e.g. Iceberg gives you storage portability across query engines). When a vendor pitches "the unified streaming platform", ask which layer they're actually portable across — it's usually one of the four, dressed up as all four.

References