Stateless operators: map, filter, the easy part

It is 04:11 on a Tuesday at a Bengaluru fintech. Riya, the on-call engineer, gets paged: the card-events Flink job is in RESTARTING and the upstream Kafka lag is at 1.4 million events. She opens the Grafana dashboard, sees that the only operators that have backed up are the stateful aggregators, and that the stateless parseJson → filterByCountry → enrichBin pipeline at the front of the job restarted in 11 seconds without re-reading anything except its committed offset. She fixes the aggregator's RocksDB allocation, the cluster recovers, and she goes back to bed at 04:38. The reason her night was 27 minutes and not 4 hours is that the front of her job was stateless, and stateless operators have a property the rest of streaming spends entire books trying to recover: they cannot be wrong on restart.

A stateless operator is a pure function from one input event to zero or more output events — map, filter, flatMap, schema validation, projection, format conversion. Because the operator carries no memory between events, restarting it just means resuming reads from the last committed offset; correctness comes for free. The hard parts of streaming (windows, joins, watermarks, exactly-once) only appear when an operator has to remember something across events. Knowing exactly which operators are stateless — and resisting the urge to "just add a small dict" — is what keeps the easy 5% of your pipeline from contaminating the hard 95%.

What "stateless" actually means

A stateless operator is a function f such that the output for event e_i depends only on e_i. Not on any earlier event, not on any wall-clock time, not on any external mutable lookup. Formally:

f(e_i) = output_i        # depends on e_i alone
state(after e_i) = state(before e_i)   # i.e. nothing

The three common stateless operators are map, filter, and flatMap. A fourth — keyBy in Flink, groupByKey in Spark — is technically stateless too: it doesn't carry memory, it just routes events to the operator instance responsible for that key. After keyBy, the next operator is usually stateful, but keyBy itself isn't.

[Figure: The three shapes of stateless operators — three lanes showing map (one in, one out), filter (one in, zero or one out), and flatMap (one in, zero or many out). Each lane shows input events flowing into a function box and output events emerging on the other side, with the explicit absence of state. Lane examples: map f(e) = e.upper(); filter p(e) = e.amount > 0; flatMap e.split(' ').]
map preserves cardinality. filter shrinks it. flatMap can expand it. None of the three remembers any earlier event — the function box has no arrow looping back to itself.
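The three shapes can be sketched as pure Python functions over a list standing in for a stream (the names and events are illustrative, not from any runtime):

```python
def map_op(f, events):
    # map: one in, exactly one out -- cardinality preserved
    return [f(e) for e in events]

def filter_op(p, events):
    # filter: one in, zero or one out -- cardinality shrinks
    return [e for e in events if p(e)]

def flat_map_op(f, events):
    # flatMap: one in, zero or many out -- cardinality may expand
    return [out for e in events for out in f(e)]

events = ["pay 100", "refund 40", ""]
print(map_op(str.upper, events))             # ['PAY 100', 'REFUND 40', '']
print(filter_op(lambda e: e != "", events))  # ['pay 100', 'refund 40']
print(flat_map_op(str.split, events))        # ['pay', '100', 'refund', '40']
```

Note that none of the three closures over anything mutable: each output list is fully determined by the input list, which is exactly the restart property.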

The defining property is the absence of that loopback arrow. Why this matters for restart: when the operator has no internal state, a restart is just "open the source from the last committed offset and call f again". There is no "warming up", no "rebuilding the dict", no "waiting for the watermark to advance" — the operator is conceptually identical at offset N before and after the restart, because nothing changed about it across events. This is the property Riya's Flink job leaned on at 04:11.

The standard stateless operators in production

A working pipeline that consumes from Kafka and writes to a downstream sink almost always has a stateless prefix and a (possibly empty) stateful body. The prefix does the boring, parallelisable, restart-safe work. Knowing exactly which work is in the prefix prevents the mistake of accidentally introducing state into it.

The complete list:

  1. map — one event in, exactly one event out. Examples: parse the JSON body, rename a field, convert a unit, compute a hash for downstream routing, redact a PII column, attach a fixed enrichment looked up from a static config.
  2. filter — keep events that satisfy a predicate; drop the rest. Examples: drop test merchants, drop events with amount < 0, drop events from regions the consumer doesn't serve, drop heartbeat pings.
  3. flatMap — one event in, zero or more out. Examples: split a multi-line log entry into one event per line, expand a "basket-of-items" event into one event per item, emit "begin" + "end" events from a single transaction event.
  4. Projection (select in SQL terms) — keep a subset of fields. Strictly a special case of map, but called out separately because it appears in every pipeline and changes the on-wire size dramatically.
  5. Schema validation — run each event through a schema check; route bad ones to a dead-letter queue, pass good ones through. Implemented as flatMap with two outputs: the main stream and a side output for rejects.
  6. Format conversion — JSON → Avro, Avro → Parquet-row, Protobuf → Iceberg-row. A wide map. The runtime treats it as stateless even though it does CPU work; there is no memory of earlier events.
  7. Static enrichment — event.merchant_id joined against a small read-only table that the operator loads at startup (BIN ranges, country-code lookups, currency tables). The lookup table is data, but it is not state in the streaming sense: it is read-only, identical across all operator instances, and reloaded on restart. Why this is still stateless: the runtime's correctness story doesn't change. Restart the operator, reload the static table from the same source, replay events from the committed offset — same outputs. The lookup is a pure function of input plus config, exactly like a CPU's instruction decoder.
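Item 5 — schema validation as a flatMap with two logical outputs — is worth a minimal sketch (plain Python rather than a runtime SDK; the `REQUIRED` set and field names are illustrative):

```python
REQUIRED = {"txn_id", "amount"}

def validate_flatmap(ev):
    # One event in, exactly one (stream_name, event) pair out -- no state.
    # The pair routes the event to either the main stream or the reject
    # side output, which is how the dead-letter pattern stays stateless.
    missing = REQUIRED - ev.keys()
    if missing:
        return [("rejects", {"raw": ev, "reason": f"missing {sorted(missing)}"})]
    return [("main", ev)]

batch = [{"txn_id": "t1", "amount": 100}, {"txn_id": "t2"}]
routed = [out for ev in batch for out in validate_flatmap(ev)]
# routed[0] is ('main', ...); routed[1] is ('rejects', ...)
```

In Flink this shape maps onto a ProcessFunction with a side output; the point is that the routing decision needs nothing but the event itself.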

What is not in this list, and why each one isn't: dedup needs to remember "have I seen this id?" — state. Aggregation (count, sum, avg) needs a running value — state. Joins need at least one side's events held in memory — state. Anything windowed — state. Anything "compared to the previous event" — state. Anything that emits "first occurrence of X" — state.
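To see why each exclusion holds, try writing the first one — dedup — without memory. A sketch (illustrative event shape) makes the hidden state unmissable:

```python
def make_dedup():
    seen = set()                       # <-- this set IS state
    def dedup(ev):
        if ev["txn_id"] in seen:
            return []                  # drop duplicate
        seen.add(ev["txn_id"])
        return [ev]
    return dedup

dedup = make_dedup()
assert dedup({"txn_id": "t1"}) == [{"txn_id": "t1"}]
assert dedup({"txn_id": "t1"}) == []    # remembered: not stateless

dedup = make_dedup()                    # "restart": the set is gone
assert dedup({"txn_id": "t1"}) != []    # the duplicate now passes through
```

The last assertion is the whole argument: a restart that resets the set changes the operator's outputs, so correctness now depends on checkpointing the set — which is exactly the machinery the stateless prefix avoids.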

Building a stateless prefix from scratch

Rather than treat stateless operators as a Flink configuration detail, write one end-to-end. The pipeline below reads JSON card-transaction events from Kafka, parses them, validates a schema, drops test merchants, redacts PII, and writes the transformed event to a downstream topic. There is no aggregation, no window, no join — and that is the point. Re-running it from any offset, by any replica, on any number of partitions, produces the same per-record outputs.

# stateless_prefix.py — runs continuously; restart-safe by construction
from kafka import KafkaConsumer, KafkaProducer
import json, hashlib, sys, signal

# Static enrichment loaded once at startup. Read-only. Re-loaded on restart.
BIN_TO_BANK = {
    "411111": ("HDFC", "credit"), "412345": ("ICICI", "debit"),
    "421777": ("SBI",  "credit"), "476173": ("Axis", "debit"),
}
TEST_MERCHANTS = {"merch_test_001", "merch_test_002", "merch_demo"}

REQUIRED_FIELDS = {"txn_id", "user_id", "amount", "merchant", "card_bin", "event_time"}

def parse(raw_bytes):
    return json.loads(raw_bytes)                                 # map: bytes -> dict

def validate(ev):
    missing = REQUIRED_FIELDS - ev.keys()
    if missing:
        return None, f"schema: missing {missing}"               # rejected
    if not isinstance(ev["amount"], (int, float)) or ev["amount"] <= 0:
        return None, "schema: amount must be positive number"
    return ev, None

def is_test(ev):
    return ev["merchant"] in TEST_MERCHANTS                     # filter predicate

def redact(ev):
    # map: hash the user_id with a project-wide salt for downstream join keys
    ev["user_hash"] = hashlib.sha256(("padho:" + ev["user_id"]).encode()).hexdigest()[:16]
    del ev["user_id"]
    return ev

def enrich(ev):
    bank, typ = BIN_TO_BANK.get(ev["card_bin"], ("UNKNOWN", "unknown"))
    ev["bank"], ev["card_type"] = bank, typ                     # map: static lookup
    return ev

def main():
    consumer = KafkaConsumer(
        "card.txns.raw",
        bootstrap_servers="kafka.razorpay.local:9092",
        group_id="stateless-prefix",
        auto_offset_reset="earliest",
        enable_auto_commit=False,                               # we commit after produce
        value_deserializer=lambda b: b,                         # raw bytes; parse below
    )
    producer = KafkaProducer(
        bootstrap_servers="kafka.razorpay.local:9092",
        value_serializer=lambda d: json.dumps(d).encode(),
        acks="all",
    )
    rejects = "card.txns.dlq"
    sink    = "card.txns.clean"

    for msg in consumer:
        try:
            ev = parse(msg.value)
        except json.JSONDecodeError as e:
            producer.send(rejects, {"raw": msg.value.decode("utf-8","replace"),
                                    "reason": f"json: {e}"})
            producer.flush()                                    # DLQ write durable before commit
            consumer.commit()
            continue

        ev, reason = validate(ev)
        if reason:
            producer.send(rejects, {"raw": msg.value.decode("utf-8","replace"),
                                    "reason": reason})
            producer.flush()                                    # DLQ write durable before commit
            consumer.commit()
            continue

        if is_test(ev):                                          # filter
            consumer.commit()
            continue

        ev = redact(ev)                                          # map
        ev = enrich(ev)                                          # map
        producer.send(sink, ev)
        producer.flush()                                         # before commit
        consumer.commit()                                        # at-least-once

if __name__ == "__main__":
    signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
    main()

A sample run, against a Kafka broker holding 6 events on `card.txns.raw`:

$ python stateless_prefix.py
sink   <- {"txn_id":"t_001","amount":2499,"merchant":"swiggy","card_bin":"411111","event_time":"2026-04-25T19:14:08","bank":"HDFC","card_type":"credit","user_hash":"a91c4f12e0b8d3a7"}
sink   <- {"txn_id":"t_002","amount":150,"merchant":"bookmyshow","card_bin":"412345","event_time":"2026-04-25T19:14:09","bank":"ICICI","card_type":"debit","user_hash":"7d9e2b14a8c5f021"}
dlq    <- {"raw":"{\"txn_id\":\"t_003\"...","reason":"schema: missing {'amount'}"}
sink   <- {"txn_id":"t_004","amount":89999,"merchant":"flipkart","card_bin":"421777","bank":"SBI","card_type":"credit","user_hash":"3c8a5d72e9b1f604","event_time":"2026-04-25T19:14:11"}
(skip)  merchant=merch_test_001                               # filtered out
dlq    <- {"raw":"not-json{","reason":"json: Expecting value: line 1 column 1"}

The lines that decide everything sit at the end of the loop: send to the sink, flush the producer, then commit the offset. Flush-before-commit is what makes the pipeline at-least-once — a crash between flush and commit replays a record, never loses one.

This is a complete stateless prefix. It scales horizontally (more replicas, more partitions consumed in parallel, no coordination), it survives restart trivially (re-read from committed offset, replay), and it composes with the next operator without leaking state across the boundary. It is also the boring part of the job — and that boredom is the goal.

Why "the easy part" is worth a chapter

A team that fully internalises the stateless/stateful split avoids three classes of bugs that recur in production.

[Figure: The boundary between the stateless prefix and the stateful body of a streaming job — a pipeline diagram with three regions: the stateless prefix on the left (parse → validate → filter → redact → enrich), a keyBy user_hash operator at the boundary, and the stateful body on the right (window → aggregate → join, backed by RocksDB, checkpoints, and watermarks). Operational profiles annotated below each region: the prefix restarts in seconds, scales by adding replicas, and is monitored on lag, throughput, and error rate; the body restarts in minutes (state restore), scales via rescale plus state migration, and is monitored on state size, checkpoint age, and watermark lag.]
The boundary is a real architectural feature. Operations on the prefix and the body have different shapes — restart time, scaling story, monitoring metrics. Drawing the line clearly in code (a separate operator chain, a separate keyBy boundary) lets you reason about each side with the right primitives.

The first class of bug is accidental state in a stateless operator. A common Razorpay anti-pattern: someone adds a "rate-limit log warnings to once per minute" guard inside what looks like a map. The guard uses a class-level dict keyed by warning type. The dict is state. The operator is now stateful by accident — a restart resets the dict, log volume spikes for a minute, the SRE on-call gets paged for "log flood after deploy". The fix is to either move the rate-limiting outside the operator (Loki / Promtail-side), or to acknowledge it is state and put it in the runtime's state backend. The lesson: a stateless operator that "just" needs a tiny dict is no longer stateless, and the production behaviour reflects that.
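The anti-pattern is easy to sketch (names are hypothetical; time.monotonic stands in for the wall clock):

```python
import time

class EnrichOp:
    _last_warned = {}                       # hidden state!

    def map(self, ev):
        # Looks like a pure map, but the rate-limit guard below
        # remembers something across events.
        if "card_bin" not in ev:
            now = time.monotonic()
            last = self._last_warned.get("missing_bin", 0.0)
            if now - last > 60:             # warn at most once per minute
                print("WARN: event missing card_bin")
                self._last_warned["missing_bin"] = now
        return ev

# A restart replaces the process, so _last_warned resets to {} --
# every replica warns again immediately, and log volume spikes.
```

The runtime has no idea `_last_warned` exists, so it is neither checkpointed nor restored — which is precisely why the post-deploy log flood happens.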

The second class is placing stateful work in the prefix. Because stateful operators have larger restart times and scale-out costs, putting them earlier in the chain than necessary increases the size of the "expensive part of the job". A pipeline whose first operator is a per-user dedup, before any filter, has to keep state for every user — including the ones whose events would have been filtered out two operators later. Moving the filter and projection upstream of the keyed operator routinely halves state size and checkpoint cost. The architectural rule of thumb: do all stateless work first, narrow the stream as much as possible, then keyBy and start being stateful.
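The rule of thumb can be quantified with a toy simulation (illustrative events; a real keyed operator keeps at least one state entry per key it has seen):

```python
TEST_MERCHANTS = {"merch_test"}

events = [
    {"user": "u1", "merchant": "swiggy"},
    {"user": "u2", "merchant": "merch_test"},
    {"user": "u3", "merchant": "merch_test"},
]

def keyed_state_size(stream):
    # Lower bound on keyed state: one entry per distinct key seen.
    return len({e["user"] for e in stream})

# keyed operator first: state for every user, including test traffic
assert keyed_state_size(events) == 3

# filter before keyBy: the keyed operator never sees test users
filtered = [e for e in events if e["merchant"] not in TEST_MERCHANTS]
assert keyed_state_size(filtered) == 1
```

Here two thirds of the keys never needed state at all; the Razorpay numbers later in this chapter are the production-scale version of the same arithmetic.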

The third class is re-implementing what the runtime already does, badly. Apache Beam's PTransform, Flink's DataStream, Kafka Streams' KStream, Spark Structured Streaming's Dataset all treat map, filter, flatMap as primitives — they're optimised, fused, and parallelised by the runtime. A team that bypasses the streaming SDK to "just write a Kafka consumer with these three steps" gives up the runtime's record-batch fusion (which can run several stateless operators in one CPU pass), the runtime's parallelism story, and the runtime's downstream integration. There are scenarios where bypassing the SDK is justified — extreme low-latency, license constraints, deployment shape — but every team that has done it past 5k events/sec has eventually rebuilt a chunk of what the SDK already did.

Indian-context examples — five real stateless prefixes

The pattern is universal but the workloads are specific. Five Indian teams' production stateless prefixes, with the boundary explicitly annotated:

| Workload | Stateless prefix (always) | Stateful body (where it starts) |
|---|---|---|
| Razorpay card events | parse JSON → validate schema → drop test merchants → redact user_id → enrich BIN→bank | per-user fraud aggregation (5 min window) |
| Swiggy delivery pings | decode protobuf → drop low-accuracy GPS → project (lat, lon, partner_id, ts) | per-partner ETA computation (sliding 60 s) |
| Zerodha tick stream | parse FIX → filter to tradable instruments → enrich symbol→ISIN | per-instrument candle building (1 min OHLC) |
| PhonePe UPI events | decode JSON → drop heartbeats → mask VPA → tag (P2P/P2M) | per-merchant settlement aggregates (15 min) |
| Dream11 score events | parse Avro → filter to active matches → enrich match→contests | per-contest leaderboard (real-time) |

Each prefix is 4–6 operators, every one of them stateless, all running before the first keyBy. The number of events typically drops by 30–60% across the prefix (test merchants dropped, heartbeats dropped, fields projected away, nulls filtered) — meaning the stateful body sees a much smaller, cleaner stream than the source. A measurement from one Razorpay team's pre-/post-refactor numbers: moving the test-merchant filter from after keyBy to before it cut RocksDB state size by 22% and reduced checkpoint duration from 41 s to 28 s, with no behaviour change. The bug class isn't a bug — it is leaving free wins on the table.


Going deeper

Why operator fusion is the real performance win

When a runtime knows that parse → filter → map → map → filter is all stateless, it can fuse the chain into a single function that runs in one CPU pass per record, with no inter-operator queueing or context switching. Flink calls this "operator chaining"; Spark's Catalyst optimiser achieves it through whole-stage code generation; Kafka Streams gets the same effect by pushing each record depth-first through the processor topology. Why fusion is dramatic: every operator boundary in a non-fused pipeline costs a queue enqueue, a queue dequeue, a possible thread handoff, and a CPU cache miss on the next operator's locals. A 4-operator stateless chain that runs at 200k events/sec/core unfused commonly runs at 700k events/sec/core fused — a 3.5× win, just from the runtime knowing the chain has no inter-operator state. This is why declaring operators stateless when they are stateless is not a stylistic choice but a performance one.
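What fusion does mechanically can be sketched in a few lines — assuming, as the runtimes do, that map, filter, and flatMap all normalise to "one event in, list of events out":

```python
def fuse(*stages):
    # Collapse a stateless chain into one function, so each record
    # crosses zero operator boundaries (no queues, no handoffs).
    def fused(ev):
        outs = [ev]
        for stage in stages:
            outs = [o for e in outs for o in stage(e)]
        return outs
    return fused

parse = lambda s: [s.strip()]          # map
drop  = lambda s: [s] if s else []     # filter, expressed as flatMap
upper = lambda s: [s.upper()]          # map

chain = fuse(parse, drop, upper)
assert chain("  pay  ") == ["PAY"]
assert chain("   ") == []              # filtered out mid-chain
```

Fusing is only legal because no stage carries memory — the moment one stage needs state, the runtime must break the chain at that boundary.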

The fusion win is also why "moving filters upstream" pays off twice — the filter shrinks the stream (fewer records to subsequent operators), and the fused chain processes the surviving records faster (no boundary cost). On Razorpay's prefix, the fused throughput per core was measured at 1.2M events/sec; the unfused version, with a groupByKey mistakenly inserted between filter and map, dropped to 280k.

The functional-programming heritage and where it leaks

map, filter, and flatMap are the same operators as in Lisp, ML, Haskell, and every functional language since the 1960s. Streaming runtimes adopted them because the operator algebra — map.compose(map) = map, filter.compose(filter) = filter (with conjunction), flatMap.compose(flatMap) = flatMap — gives the optimiser a closed system to reason about. This is the same algebra Spark uses to fuse logical plans, and the same algebra Materialize uses to incrementally maintain SQL views. The leaky part: the algebra assumes f is referentially transparent — calling it with the same input always produces the same output. In a stream, a "stateless" map that reads the current wall clock is not referentially transparent (f(e) depends on e and time.time()), and the runtime's optimiser will produce wrong answers on replay. The discipline is to keep f purely a function of e plus immutable startup config; anything that varies during runtime is state in disguise.
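A minimal illustration of the leak (`tag_with_now` is the hypothetical offender; the event is illustrative):

```python
import time

def tag_with_now(ev):
    # Reads the wall clock: output depends on e AND time.time().
    # Not referentially transparent -- replay gives different answers.
    return {**ev, "seen_at": time.time()}

def tag_with_event_time(ev):
    # Pure: a function of the input alone. Replay-safe.
    return {**ev, "seen_at": ev["event_time"]}

ev = {"txn_id": "t1", "event_time": 1714000000}
assert tag_with_event_time(ev) == tag_with_event_time(ev)   # always
# tag_with_now(ev) == tag_with_now(ev) will (almost always) be False
```

The fix is the same as the document's discipline: if a timestamp matters, carry it in the event (event time), never sample it inside the operator.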

What about side effects — is logging stateless?

Logging, metric emission, and external HTTP calls are technically side effects, and a "pure" stateless operator forbids them. In practice, runtimes are pragmatic: a map that increments a Prometheus counter or writes a log line is treated as stateless because the side effect is out-of-band — not part of the data plane that the runtime's correctness story protects. The pragmatic rule: side effects are fine if (a) they are idempotent (incrementing a counter is fine, "send email" is not), and (b) they don't influence the function's output. A map that sends an HTTP request and uses the response in its output is stateful in spirit and brittle in practice — a network blip changes the output, replays re-call the endpoint, and the operator's correctness depends on a system the runtime doesn't own. Keep external calls behind a separate, idempotent sink at the end of the chain; don't bury them in a map.
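A sketch of the distinction (`COUNTER` stands in for a Prometheus counter; `http_get` is a hypothetical callable, injected to make the dependency visible):

```python
COUNTER = {"events_seen": 0}

def ok_map(ev):
    # Out-of-band side effect: the counter changes, the output does not
    # depend on it. Safe to replay.
    COUNTER["events_seen"] += 1
    return {**ev, "valid": True}

def brittle_map(ev, http_get):
    # The response feeds the output: replays re-call the endpoint and a
    # network blip changes the answer. Stateful in spirit.
    return {**ev, "fx_rate": http_get("/rate?ccy=INR")}

ev = {"txn_id": "t1"}
assert ok_map(ev) == ok_map(ev)   # output independent of the counter
```

Calling `ok_map` twice bumps the counter twice but produces identical records — exactly the "idempotent and out-of-band" bar the pragmatic rule sets.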

When stateless operators run hot — CPU-bound prefixes

A surprising production reality: stateless prefixes are often the CPU bottleneck of a streaming job, not the stateful body. Parsing JSON or Protobuf at 100k events/sec/core saturates a CPU on most hardware; per-event SHA-256 hashing for redaction adds another bottleneck; format conversion (JSON → Avro) is allocation-heavy. The stateful body, which sees a smaller stream (after filtering and projection) and does mostly state lookups, is often cheaper per-record. The corollary: when scaling a streaming job, look at the stateless prefix's CPU profile first — switching from json.loads to orjson, replacing per-event string concatenation with bytes operations, and pre-compiling regex patterns commonly produces 2–4× headroom without touching the stateful operators. The headline-grabbing complexity is in the stateful body; the bill is often in the prefix.
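One instance of the pattern, sketched with the standard library (the VPA regex is illustrative; note that `re` keeps an internal pattern cache, so the "slow" version pays a per-call cache lookup rather than a full recompile — hoisting still wins in a hot loop):

```python
import re

VPA_RE = re.compile(r"[\w.\-]+@\w+")     # compiled once at startup

def redact_vpa_slow(line):
    # Pattern looked up (or compiled) on every event.
    return re.sub(r"[\w.\-]+@\w+", "<vpa>", line)

def redact_vpa_fast(line):
    # Precompiled pattern: the per-event cost is just the match.
    return VPA_RE.sub("<vpa>", line)

assert redact_vpa_fast("pay to riya@okhdfc now") == "pay to <vpa> now"
assert redact_vpa_slow("pay to riya@okhdfc now") == "pay to <vpa> now"
```

The same hoisting shape applies to the bigger wins mentioned above — constructing the `orjson` option set, the Avro writer schema, or the required-field set once at startup instead of per event.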

Where this leads next

The next four chapters in Build 8 leave stateless behind and walk through the four pillars of stateful streaming. /wiki/state-stores-why-rocksdb-is-in-every-streaming-engine explains why every streaming runtime ends up with a RocksDB-shaped backend, and what an LSM tree gives a stream processor that a hash map cannot. /wiki/windowing-tumbling-sliding-session covers the three window shapes and how each one accumulates state. /wiki/event-time-vs-processing-time-the-whole-ballgame — the chapter Akidau called "the whole ballgame" — separates the two clocks every streaming job has to reason about. /wiki/watermarks-how-we-know-were-done closes the loop on "when do we emit a result for a window we may still see late events for".

The mental model to take into those chapters: the stateless prefix is the part of your job that is correct by construction, scales by replication, and recovers in seconds. Everything past the first keyBy is the part where streaming earns its reputation for difficulty. Drawing that boundary deliberately — knowing which operators are on which side — is the first move that separates pipelines that survive a 4 a.m. incident from pipelines that don't.

Build 8 spends the next ten chapters on the right side of the boundary. /wiki/wall-exactly-once-is-a-lie-everyone-tells closes the build with the honest version of the exactly-once promise.
