Stateless operators: map, filter, the easy part
It is 04:11 on a Tuesday at a Bengaluru fintech. Riya, the on-call engineer, gets paged: the card-events Flink job is in RESTARTING and the upstream Kafka lag is at 1.4 million events. She opens the Grafana dashboard, sees that the only operators that have backed up are the stateful aggregators, and that the stateless parseJson → filterByCountry → enrichBin pipeline at the front of the job restarted in 11 seconds without re-reading anything except its committed offset. She fixes the aggregator's RocksDB allocation, the cluster recovers, and she goes back to bed at 04:38. The reason her night was 27 minutes and not 4 hours is that the front of her job was stateless, and stateless operators have a property the rest of streaming spends entire books trying to recover: they cannot be wrong on restart.
A stateless operator is a pure function from one input event to zero or more output events — map, filter, flatMap, schema validation, projection, format conversion. Because the operator carries no memory between events, restarting it just means resuming reads from the last committed offset; correctness comes for free. The hard parts of streaming (windows, joins, watermarks, exactly-once) only appear when an operator has to remember something across events. Knowing exactly which operators are stateless — and resisting the urge to "just add a small dict" — is what keeps the easy 5% of your pipeline from contaminating the hard 95%.
What "stateless" actually means
A stateless operator is a function f such that the output for event e_i depends only on e_i. Not on any earlier event, not on any wall-clock time, not on any external mutable lookup. Formally:
```text
f(e_i)           = output_i           # depends on e_i alone
state(after e_i) = state(before e_i)  # i.e. nothing
```
The three common stateless operators are map, filter, and flatMap. A fourth — keyBy in Flink, groupByKey in Spark — is technically stateless too: it doesn't carry memory, it just routes events to the operator instance responsible for that key. After keyBy, the next operator is usually stateful, but keyBy itself isn't.
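A minimal sketch of keyBy-as-routing, under the assumption of a stable hash and fixed parallelism (illustrative only — not Flink's actual key-group hashing):

```python
import hashlib

def route(key: str, parallelism: int) -> int:
    # Deterministic: same key -> same downstream instance, on every call,
    # on every replica, after every restart. No memory is consulted.
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % parallelism

# Every event for user "u_42" lands on the same downstream instance,
# which is exactly what the stateful operator after keyBy relies on.
assert route("u_42", 8) == route("u_42", 8)
```

The routing function is pure — which is why keyBy itself restarts as cheaply as a map, even though everything after it usually doesn't.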
The defining property is the absence of any loop from stored values or earlier events back into f. Why this matters for restart: when the operator has no internal state, a restart is just "open the source from the last committed offset and call f again". There is no "warming up", no "rebuilding the dict", no "waiting for the watermark to advance" — the operator is conceptually identical at offset N before and after the restart, because nothing changed about it across events. This is the property Riya's Flink job leaned on at 04:11.
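The restart property can be checked in a few lines. A toy in-memory "topic" stands in for Kafka here (a hypothetical three-event stream, not from the chapter's pipeline):

```python
import json

topic = [b'{"amount": 100}', b'{"amount": -5}', b'{"amount": 250}']

def f(raw):
    # map: parse, then annotate — a pure function of one event
    ev = json.loads(raw)
    ev["valid"] = ev["amount"] > 0
    return ev

# Process from committed offset 1, then "crash"; then resume from offset 1.
first_run = [f(raw) for raw in topic[1:]]
after_restart = [f(raw) for raw in topic[1:]]
assert first_run == after_restart  # identical outputs: no warm-up, no rebuilt state
```

The assertion is the whole correctness story: replay from the committed offset and the operator cannot be wrong.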
The standard stateless operators in production
A working pipeline that consumes from Kafka and writes to a downstream sink almost always has a stateless prefix and a (possibly empty) stateful body. The prefix does the boring, parallelisable, restart-safe work. Knowing exactly which work is in the prefix prevents the mistake of accidentally introducing state into it.
The complete list:
- `map` — one event in, exactly one event out. Examples: parse the JSON body, rename a field, convert a unit, compute a hash for downstream routing, redact a PII column, attach a fixed enrichment looked up from a static config.
- `filter` — keep events that satisfy a predicate; drop the rest. Examples: drop test merchants, drop events with `amount < 0`, drop events from regions the consumer doesn't serve, drop heartbeat pings.
- `flatMap` — one event in, zero or more out. Examples: split a multi-line log entry into one event per line, expand a "basket-of-items" event into one event per item, emit "begin" + "end" events from a single transaction event.
- Projection (`select` in SQL terms) — keep a subset of fields. Strictly a special case of `map`, but called out separately because it appears in every pipeline and changes the on-wire size dramatically.
- Schema validation — run each event through a schema check; route bad ones to a dead-letter queue, pass good ones through. Implemented as `flatMap` with two outputs: the main stream and a side output for rejects.
- Format conversion — JSON → Avro, Avro → Parquet-row, Protobuf → Iceberg-row. A wide `map`. The runtime treats it as stateless even though it does CPU work; there is no memory of earlier events.
- Static enrichment — `event.merchant_id` joined against a small read-only table that the operator loads at startup (BIN ranges, country-code lookups, currency tables). The lookup table is data, but it is not state in the streaming sense: it is read-only, identical across all operator instances, and reloaded on a restart. Why this is still stateless: the runtime's correctness story doesn't change. Restart the operator, reload the static table from the same source, replay events from the committed offset — same outputs. The lookup is a pure function of input + config, exactly like a CPU's instruction decoder.
What is not in this list, and why each one isn't: dedup needs to remember "have I seen this id?" — state. Aggregation (count, sum, avg) needs a running value — state. Joins need at least one side's events held in memory — state. Anything windowed — state. Anything "compared to the previous event" — state. Anything that emits "first occurrence of X" — state.
Building a stateless prefix from scratch
Rather than treat stateless operators as a Flink configuration detail, write one end-to-end. The pipeline below reads JSON card-transaction events from Kafka, parses them, validates a schema, drops test merchants, redacts PII, and writes the transformed event to a downstream topic. There is no aggregation, no window, no join — and that is the point. Re-running it from any offset, by any replica, on any number of partitions, produces the same per-record outputs.
```python
# stateless_prefix.py — runs continuously; restart-safe by construction
from kafka import KafkaConsumer, KafkaProducer
import json, hashlib, sys, signal

# Static enrichment loaded once at startup. Read-only. Re-loaded on restart.
BIN_TO_BANK = {
    "411111": ("HDFC", "credit"), "412345": ("ICICI", "debit"),
    "421777": ("SBI", "credit"), "476173": ("Axis", "debit"),
}
TEST_MERCHANTS = {"merch_test_001", "merch_test_002", "merch_demo"}
REQUIRED_FIELDS = {"txn_id", "user_id", "amount", "merchant", "card_bin", "event_time"}

def parse(raw_bytes):
    return json.loads(raw_bytes)  # map: bytes -> dict

def validate(ev):
    missing = REQUIRED_FIELDS - ev.keys()
    if missing:
        return None, f"schema: missing {missing}"  # rejected
    if not isinstance(ev["amount"], (int, float)) or ev["amount"] <= 0:
        return None, "schema: amount must be positive number"
    return ev, None

def is_test(ev):
    return ev["merchant"] in TEST_MERCHANTS  # filter predicate

def redact(ev):
    # map: hash the user_id with a project-wide salt for downstream join keys
    ev["user_hash"] = hashlib.sha256(("padho:" + ev["user_id"]).encode()).hexdigest()[:16]
    del ev["user_id"]
    return ev

def enrich(ev):
    bank, typ = BIN_TO_BANK.get(ev["card_bin"], ("UNKNOWN", "unknown"))
    ev["bank"], ev["card_type"] = bank, typ  # map: static lookup
    return ev

def main():
    consumer = KafkaConsumer(
        "card.txns.raw",
        bootstrap_servers="kafka.razorpay.local:9092",
        group_id="stateless-prefix",
        auto_offset_reset="earliest",
        enable_auto_commit=False,        # we commit after produce
        value_deserializer=lambda b: b,  # raw bytes; parse below
    )
    producer = KafkaProducer(
        bootstrap_servers="kafka.razorpay.local:9092",
        value_serializer=lambda d: json.dumps(d).encode(),
        acks="all",
    )
    rejects = "card.txns.dlq"
    sink = "card.txns.clean"
    for msg in consumer:
        try:
            ev = parse(msg.value)
        except json.JSONDecodeError as e:
            producer.send(rejects, {"raw": msg.value.decode("utf-8", "replace"),
                                    "reason": f"json: {e}"})
            producer.flush()  # DLQ record must be durable before the offset moves
            consumer.commit()
            continue
        ev, reason = validate(ev)
        if reason:
            producer.send(rejects, {"raw": msg.value.decode("utf-8", "replace"),
                                    "reason": reason})
            producer.flush()  # same rule as the happy path: flush, then commit
            consumer.commit()
            continue
        if is_test(ev):  # filter
            consumer.commit()
            continue
        ev = redact(ev)   # map
        ev = enrich(ev)   # map
        producer.send(sink, ev)
        producer.flush()  # before commit
        consumer.commit() # at-least-once

if __name__ == "__main__":
    signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
    main()
```
Sample run, against a Kafka with 6 events on `card.txns.raw`:

```text
$ python stateless_prefix.py
sink <- {"txn_id":"t_001","amount":2499,"merchant":"swiggy","card_bin":"411111","event_time":"2026-04-25T19:14:08","bank":"HDFC","card_type":"credit","user_hash":"a91c4f12e0b8d3a7"}
sink <- {"txn_id":"t_002","amount":150,"merchant":"bookmyshow","card_bin":"412345","event_time":"2026-04-25T19:14:09","bank":"ICICI","card_type":"debit","user_hash":"7d9e2b14a8c5f021"}
dlq  <- {"raw":"{\"txn_id\":\"t_003\"...","reason":"schema: missing {'amount'}"}
sink <- {"txn_id":"t_004","amount":89999,"merchant":"flipkart","card_bin":"421777","bank":"SBI","card_type":"credit","user_hash":"3c8a5d72e9b1f604","event_time":"2026-04-25T19:14:11"}
(skip)  merchant=merch_test_001 # filtered out
dlq  <- {"raw":"not-json{","reason":"json: Expecting value: line 1 column 1"}
```
The five lines that decide everything:
- `enable_auto_commit=False` — we drive commits manually, after the produce, so a crash mid-message replays the event rather than skipping it. Combined with at-least-once produce semantics, the worst-case outcome is duplicate records downstream — acceptable here because every operator in the chain is idempotent over the input record. Why this works without a transaction: every transformation in the chain is a pure function of one input record. If we re-process record N after a crash, the resulting `parse → validate → redact → enrich` outputs are byte-identical to the previous run; downstream consumers can dedup on `txn_id` if needed. There is no running counter to corrupt, no aggregate to double-count.
- `producer.flush()` before `consumer.commit()` — the produce must be visible to brokers before we mark the input offset consumed. Reversing the order means crashes can lose records. (For exactly-once across produce + commit, you need transactional producers — the topic of /wiki/wall-exactly-once-is-a-lie-everyone-tells.)
- `BIN_TO_BANK` is a module-level dict — loaded once at startup, never mutated. This is enrichment, not state. A restart re-loads it from the same source; identical operators on different replicas hold identical copies.
- `hashlib.sha256(("padho:" + user_id).encode())` — deterministic. Same `user_id` → same `user_hash`, in every replica, on every replay. Why this matters for downstream joins: the next stateful job that windows by `user_hash` doesn't care which replica produced the hash, because the hash is a function of input alone. If the hash had any per-instance entropy (e.g. a random seed), you'd have re-introduced state by accident, and downstream windows would split a single user across instances.
- `for msg in consumer:` — there is no inter-event memory. The variable `ev` is rebound on every iteration; nothing accumulates.
This is a complete stateless prefix. It scales horizontally (more replicas, more partitions consumed in parallel, no coordination), it survives restart trivially (re-read from committed offset, replay), and it composes with the next operator without leaking state across the boundary. It is also the boring part of the job — and that boredom is the goal.
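The "byte-identical on replay" claim can be checked directly. This sketch re-states the listing's `redact` and `enrich` helpers (with a defensive copy so the "replay" sees the original record) and asserts that two runs over the same record serialise identically:

```python
import hashlib, json

BIN_TO_BANK = {"411111": ("HDFC", "credit")}  # subset of the listing's table

def redact(ev):
    ev = dict(ev)  # copy, so the replay below starts from the same input
    ev["user_hash"] = hashlib.sha256(("padho:" + ev["user_id"]).encode()).hexdigest()[:16]
    del ev["user_id"]
    return ev

def enrich(ev):
    ev["bank"], ev["card_type"] = BIN_TO_BANK.get(ev["card_bin"], ("UNKNOWN", "unknown"))
    return ev

record = {"txn_id": "t_001", "user_id": "u_9", "card_bin": "411111"}
run1 = json.dumps(enrich(redact(record)), sort_keys=True).encode()
run2 = json.dumps(enrich(redact(record)), sort_keys=True).encode()  # the "replay"
assert run1 == run2  # a duplicate downstream is a byte-for-byte duplicate
```

Because duplicates are exact copies, a downstream dedup on `txn_id` (or an idempotent sink) is enough to absorb at-least-once delivery.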
Why "the easy part" is worth a chapter
A team that fully internalises the stateless/stateful split avoids three classes of bugs that recur in production.
The first class of bug is accidental state in a stateless operator. A common Razorpay anti-pattern: someone adds a "rate-limit log warnings to once per minute" guard inside what looks like a map. The guard uses a class-level dict keyed by warning type. The dict is state. The operator is now stateful by accident — a restart resets the dict, log volume spikes for a minute, and the SRE on-call gets paged for "log flood after deploy". The fix is either to move the rate-limiting outside the operator (Loki / Promtail-side) or to acknowledge that it is state and put it in the runtime's state backend. The lesson: a stateless operator that "just" needs a tiny dict is no longer stateless, and the production behaviour reflects that.
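The anti-pattern in miniature (a hypothetical `RateLimitedLogger`, written to illustrate the failure mode): `last_warned` is state hiding inside a "stateless" map, and a restart resets it.

```python
import time

class RateLimitedLogger:
    def __init__(self, min_interval_s=60.0):
        self.min_interval_s = min_interval_s
        self.last_warned = {}  # state: memory across events, keyed by warning type

    def warn(self, kind, now=None):
        # `now` parameter exists only so the sketch is testable without sleeping
        now = time.monotonic() if now is None else now
        if now - self.last_warned.get(kind, float("-inf")) < self.min_interval_s:
            return False  # suppressed
        self.last_warned[kind] = now
        return True       # emitted

log = RateLimitedLogger()
assert log.warn("bad_bin", now=0.0) is True
assert log.warn("bad_bin", now=30.0) is False  # suppressed, as intended

log = RateLimitedLogger()                      # "restart": the dict is gone
assert log.warn("bad_bin", now=31.0) is True   # floodgate reopens -> log spike
```

The last assertion is the 4 a.m. page: behaviour across the restart differs from behaviour without it, which is the definition of the operator being stateful.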
The second class is placing stateful work in the prefix. Because stateful operators have larger restart times and scale-out costs, putting them earlier in the chain than necessary increases the size of the "expensive part of the job". A pipeline whose first operator is a per-user dedup, before any filter, has to keep state for every user — including the ones whose events would have been filtered out two operators later. Moving the filter and projection upstream of the keyed operator routinely halves state size and checkpoint cost. The architectural rule of thumb: do all stateless work first, narrow the stream as much as possible, then keyBy and start being stateful.
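The state-size effect of operator order can be simulated in a few lines. The numbers here are illustrative (a made-up stream where half the users are test traffic), not production measurements:

```python
# Same dedup, placed before vs. after a filter, measured by the number of
# keys its state must hold. Output is identical either way; cost is not.
events = (
    [{"user": f"u_{i}", "merchant": "swiggy"} for i in range(1000)]
    + [{"user": f"t_{i}", "merchant": "merch_test_001"} for i in range(1000)]
)

def is_real(ev):
    return ev["merchant"] != "merch_test_001"

# dedup first: state carries every user, including ones filtered out later
state_dedup_first = {ev["user"] for ev in events}

# filter first: state carries only the users that survive the filter
state_filter_first = {ev["user"] for ev in events if is_real(ev)}

assert len(state_dedup_first) == 2000
assert len(state_filter_first) == 1000  # half the state, by reordering alone
```

In a real job the "set of keys" is RocksDB state, and halving it halves checkpoint and recovery cost too.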
The third class is re-implementing what the runtime already does, badly. Apache Beam's PTransform, Flink's DataStream, Kafka Streams' KStream, Spark Structured Streaming's Dataset all treat map, filter, flatMap as primitives — they're optimised, fused, and parallelised by the runtime. A team that bypasses the streaming SDK to "just write a Kafka consumer with these three steps" gives up the runtime's record-batch fusion (which can run several stateless operators in one CPU pass), the runtime's parallelism story, and the runtime's downstream integration. There are scenarios where bypassing the SDK is justified — extreme low-latency, license constraints, deployment shape — but every team that has done it past 5k events/sec has eventually rebuilt a chunk of what the SDK already did.
Indian-context examples — five real stateless prefixes
The pattern is universal but the workloads are specific. Five Indian teams' production stateless prefixes, with the boundary explicitly annotated:
| Workload | Stateless prefix (always) | Stateful body (where it starts) |
|---|---|---|
| Razorpay card events | parse JSON → validate schema → drop test merchants → redact user_id → enrich BIN→bank | per-user fraud aggregation (5 min window) |
| Swiggy delivery pings | decode protobuf → drop low-accuracy GPS → project (lat,lon,partner_id,ts) | per-partner ETA computation (sliding 60s) |
| Zerodha tick stream | parse FIX → filter to tradable instruments → enrich symbol→ISIN | per-instrument candle building (1 min OHLC) |
| PhonePe UPI events | decode JSON → drop heartbeats → mask VPA → tag (P2P/P2M) | per-merchant settlement aggregates (15 min) |
| Dream11 score events | parse Avro → filter to active matches → enrich match→contests | per-contest leaderboard (real-time) |
Each prefix is 4–6 operators, every one of them stateless, all running before the first keyBy. The number of events typically drops by 30–60% across the prefix (test merchants dropped, heartbeats dropped, fields projected away, nulls filtered) — meaning the stateful body sees a much smaller, cleaner stream than the source. A measurement from one Razorpay team's pre-/post-refactor numbers: moving the test-merchant filter from after keyBy to before it cut RocksDB state size by 22% and reduced checkpoint duration from 41 s to 28 s, with no behaviour change. Unlike the first two classes, this one isn't a correctness bug — it is leaving free wins on the table.
Common confusions
- "map and filter are the same thing because filter is just map with a None output." They share an interface but the semantics differ — filter is a cardinality-changing operator, and the runtime treats the boundary differently for fusion and parallelism. A `filter` followed by `map` can be fused into one record-pass; two `map`s cannot necessarily be (depending on the runtime). Don't collapse the distinction.
- "Stateless operators don't need monitoring." They need lag and throughput monitoring like any operator. What they don't need is state-size, checkpoint-age, and watermark-lag monitoring — those metrics are zero or undefined for a stateless operator. The simpler monitoring shape is part of why stateless is operationally cheaper, not why it can be ignored.
- "Static enrichment lookups are state." They are read-only data, not state. State in the streaming sense is something the operator wrote that has to survive restart. A static lookup table reloaded from a config source on restart is not state — it is the same constant data, every time. The distinction matters because runtime checkpoints don't try to capture it (correctly so).
- "Stateless operators are exactly-once for free." They're exactly-once for processing (since the function is pure, replaying the same input produces the same output), but they're not exactly-once for delivery — a crash mid-produce can still publish duplicates downstream. End-to-end exactly-once requires a transactional producer or an idempotent sink, even when every operator is stateless.
- "You can convert a stateful operator to stateless by storing state in Redis." Storing state outside the operator does not make the operator stateless; it moves the state to a different location. The runtime's checkpoint protocol no longer captures it, which is usually worse (state and offsets can drift). "Stateless" means no memory between events, not "memory but stored elsewhere".
- "All ETL transformations are stateless." Most projection and cleaning transformations are. Deduplication, slowly-changing-dimension lookups, and window-based aggregations are not — and those appear in nearly every real ETL job. "ETL is stateless" is a useful first approximation that breaks the moment you need to dedup an upstream replay.
Going deeper
Why operator fusion is the real performance win
When a runtime knows that parse → filter → map → map → filter is all stateless, it can fuse the chain into a single function that runs in one CPU pass per record, with no inter-operator queueing or context switching. Flink calls this "operator chaining"; Spark's Catalyst optimiser does it via expression-level fusion in Dataset operations; Kafka Streams calls it "topology fusion". Why fusion is dramatic: every operator boundary in a non-fused pipeline costs a queue enqueue, a queue dequeue, a possible thread handoff, and a CPU cache miss on the next operator's locals. A 4-operator stateless chain that runs at 200k events/sec/core unfused commonly runs at 700k events/sec/core fused — a 3.5× win, just from the runtime knowing the chain has no inter-operator state. This is why declaring operators as stateless when they are stateless is not a stylistic choice but a performance one.
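What fusion does mechanically can be sketched in plain Python — a hypothetical `fuse` helper that collapses a stateless chain into one function called once per record, short-circuiting on a dropped record the way a fused filter would (a toy model of chaining, not any runtime's actual implementation):

```python
import json

def parse(raw):
    return json.loads(raw)

def keep_positive(ev):
    return ev if ev["amount"] > 0 else None  # filter, expressed as map-to-None

def tag_currency(ev):
    return {**ev, "currency": "INR"}

def fuse(*ops):
    # One pass per record; no queue, no handoff between operators.
    def fused(record):
        for op in ops:
            record = op(record)
            if record is None:
                return None  # record dropped inside the single pass
        return record
    return fused

pipeline = fuse(parse, keep_positive, tag_currency)
assert pipeline(b'{"amount": 100}') == {"amount": 100, "currency": "INR"}
assert pipeline(b'{"amount": -1}') is None
```

The real win in Flink or Spark is that the fused chain runs in one thread with one set of hot cache lines — but the legality of the transformation is exactly this: the chain has no inter-operator state, so composing the functions cannot change the output.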
The fusion win is also why "moving filters upstream" pays off twice — the filter shrinks the stream (fewer records to subsequent operators), and the fused chain processes the surviving records faster (no boundary cost). On Razorpay's prefix, the fused throughput per core was measured at 1.2M events/sec; the unfused version, with a groupByKey mistakenly inserted between filter and map, dropped to 280k.
The functional-programming heritage and where it leaks
map, filter, and flatMap are the same operators as in Lisp, ML, Haskell, and every functional language since the 1960s. Streaming runtimes adopted them because the operator algebra — map.compose(map) = map, filter.compose(filter) = filter (with conjunction), flatMap.compose(flatMap) = flatMap — gives the optimiser a closed system to reason about. This is the same algebra Spark uses to fuse logical plans, and the same algebra Materialize uses to incrementally maintain SQL views. The leaky part: the algebra assumes f is referentially transparent — calling it with the same input always produces the same output. In a stream, a "stateless" map that reads the current wall clock is not referentially transparent (f(e) depends on e and time.time()), and the runtime's optimiser will produce wrong answers on replay. The discipline is to keep f purely a function of e plus immutable startup config; anything that varies during runtime is state in disguise.
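The wall-clock leak, concretely — two versions of the same tagging map, one reading `time.time()` (not referentially transparent, so replays diverge) and one reading the event's own timestamp (pure, so replays agree):

```python
import time

def tag_with_processing_time(ev):
    # NOT referentially transparent: output depends on when f is called
    return {**ev, "seen_at": time.time()}

def tag_with_event_time(ev):
    # Pure function of the event alone: replay-safe
    return {**ev, "seen_at": ev["event_time"]}

ev = {"txn_id": "t1", "event_time": "2026-04-25T19:14:08"}
assert tag_with_event_time(ev) == tag_with_event_time(ev)  # same on every replay
# tag_with_processing_time(ev) can differ across replays — state in disguise,
# and an optimiser that assumes purity will silently produce wrong answers.
```

The discipline in practice: if an operator needs a timestamp, it should come from the event (or from immutable startup config), never from the clock on the machine that happens to run the operator.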
What about side effects — is logging stateless?
Logging, metric emission, and external HTTP calls are technically side effects, and a "pure" stateless operator forbids them. In practice, runtimes are pragmatic: a map that increments a Prometheus counter or writes a log line is treated as stateless because the side effect is out-of-band — not part of the data plane that the runtime's correctness story protects. The pragmatic rule: side effects are fine if (a) they are idempotent (incrementing a counter is fine, "send email" is not), and (b) they don't influence the function's output. A map that sends an HTTP request and uses the response in its output is stateful in spirit and brittle in practice — a network blip changes the output, replays re-call the endpoint, and the operator's correctness depends on a system the runtime doesn't own. Keep external calls behind a separate, idempotent sink at the end of the chain; don't bury them in a map.
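A sketch of the out-of-band rule. The global `events_seen` stands in for a Prometheus counter (a hypothetical name): replays inflate it slightly, which is harmless for observability, and — the part that matters — it never influences the emitted record:

```python
events_seen = 0  # stands in for a metrics counter; observability only

def redact_with_metric(ev):
    global events_seen
    events_seen += 1  # out-of-band side effect; a replay just re-increments
    # The output depends on `ev` alone — the counter never leaks into it.
    return {k: v for k, v in ev.items() if k != "user_id"}

a = redact_with_metric({"txn_id": "t1", "user_id": "u_9"})
b = redact_with_metric({"txn_id": "t1", "user_id": "u_9"})  # a replay
assert a == b == {"txn_id": "t1"}  # data plane is still pure
assert events_seen == 2            # the metric drifted; the output did not
```

Contrast this with a map whose output embeds an HTTP response: there, the side effect is in-band, and the operator's correctness now depends on a system the runtime doesn't own.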
When stateless operators run hot — CPU-bound prefixes
A surprising production reality: stateless prefixes are often the CPU bottleneck of a streaming job, not the stateful body. Parsing JSON or Protobuf at 100k events/sec/core saturates a CPU on most hardware; per-event SHA-256 hashing for redaction adds another bottleneck; format conversion (JSON → Avro) is allocation-heavy. The stateful body, which sees a smaller stream (after filtering and projection) and does mostly state lookups, is often cheaper per-record. The corollary: when scaling a streaming job, look at the stateless prefix's CPU profile first — switching from json.loads to orjson, replacing per-event string concatenation with bytes operations, and pre-compiling regex patterns commonly produces 2–4× headroom without touching the stateful operators. The headline-grabbing complexity is in the stateful body; the bill is often in the prefix.
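A minimal harness for profiling the prefix's per-event CPU cost before touching the stateful body. The event shape and iteration count are made up for illustration; the point is the method — isolate each stateless step and time it in place:

```python
import json, hashlib, timeit

# A representative raw event (hypothetical payload, mirroring the chapter's shape)
RAW = json.dumps({"txn_id": "t_001", "user_id": "u_9", "amount": 2499,
                  "merchant": "swiggy", "card_bin": "411111"}).encode()

def parse_cost():
    return json.loads(RAW)  # usually the dominant cost in a JSON prefix

def hash_cost():
    return hashlib.sha256(b"padho:u_9").hexdigest()[:16]  # per-event redaction

n = 10_000
parse_s = timeit.timeit(parse_cost, number=n)
hash_s = timeit.timeit(hash_cost, number=n)
print(f"parse: {parse_s / n * 1e6:.2f} us/event, sha256: {hash_s / n * 1e6:.2f} us/event")
```

Because every step is a pure function, the fixes are all local: a faster parser, bytes instead of string concatenation, pre-compiled patterns — none of which touch the stateful body or its checkpoints.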
Where this leads next
The next four chapters in Build 8 leave stateless behind and walk through the four pillars of stateful streaming. /wiki/state-stores-why-rocksdb-is-in-every-streaming-engine explains why every streaming runtime ends up with a RocksDB-shaped backend, and what an LSM tree gives a stream processor that a hash map cannot. /wiki/windowing-tumbling-sliding-session covers the three window shapes and how each one accumulates state. /wiki/event-time-vs-processing-time-the-whole-ballgame — the chapter Akidau called "the whole ballgame" — separates the two clocks every streaming job has to reason about. /wiki/watermarks-how-we-know-were-done closes the loop on "when do we emit a result for a window we may still see late events for".
The mental model to take into those chapters: the stateless prefix is the part of your job that is correct by construction, scales by replication, and recovers in seconds. Everything past the first keyBy is the part where streaming earns its reputation for difficulty. Drawing that boundary deliberately — knowing which operators are on which side — is the first move that separates pipelines that survive a 4 a.m. incident from pipelines that don't.
Build 8 spends the next ten chapters on the right side of the boundary. /wiki/wall-exactly-once-is-a-lie-everyone-tells closes the build with the honest version of the exactly-once promise.
References
- The Dataflow Model — Akidau et al., 2015 — the paper that formalised the stateless/stateful split for streaming.
- Apache Flink — Operators documentation — Flink's catalogue of stateless and stateful operators with semantics.
- Kafka Streams — Stateless transformations — the canonical list for the embedded-runtime model.
- Streaming 101 — Tyler Akidau, O'Reilly — the original primer that coined "the easy part" for stateless.
- Apache Beam — PTransforms — Beam's portable operator vocabulary, with explicit stateless/stateful distinction.
- Operator chaining in Flink — Confluent blog — why fusion of stateless operators is a runtime concern.
- /wiki/wall-stateless-stream-processing-isnt-enough — the previous chapter; the wall this chapter sits at the foot of.
- /wiki/consumer-groups-and-offset-management — the Build 7 chapter on offset commits the prefix relies on for restart safety.