End-to-end: source → process → sink
A Razorpay refunds processor reads each settlement event from a settlements topic, computes a refund record, writes it to refunds-emitted, writes an audit row to refund-audit, and advances its consumer offset on the source. The processor crashes mid-loop — kernel OOM, a kubelet restart, a deploy that interrupts the JVM. On restart, the system must produce exactly the same downstream effect as if the crash never happened: every settlement triggers exactly one refund, no more, no less. The previous two chapters covered the building blocks — idempotent producers (per-partition dedup) and transactional writes (multi-partition atomicity). This chapter wires them into the end-to-end loop.
End-to-end exactly-once is the consumer-process-produce pattern wrapped in one Kafka transaction. Reads happen in read_committed mode so input is stable. Outputs and the source offset commit are bundled into the same transaction via sendOffsetsToTransaction. On restart, the consumer reads the last committed offset and replays the partial transaction's input — but the producer's idempotent dedup plus the transactional all-or-nothing make the replay invisible to downstream.
The closed loop, in one transaction
The processor's main loop is three operations and a commit: poll, transform, produce, commit-offset. Without transactions, each step is a separate failure surface. With them, the produces and the offset commit collapse into one atomic step.
The reason this loop is correct: the source offset commit is itself a write to a Kafka topic (__consumer_offsets), and Kafka transactions span all topic-partition writes including that one. So advancing the consumer's progress and producing the outputs are the same atomic operation. Why this dissolves the duplicate problem: a duplicate happens when output is produced but the offset isn't committed, so restart re-processes the input and produces it again. With offsets and outputs in one transaction, either both happen or neither does — there's no state where one is committed and the other isn't.
The processing loop in working code
Here is the closed-loop pattern in Python with confluent-kafka. It reads settlements, emits a refund and audit row, and commits the source offset, all within one transaction. Run it twice in a row with a forced crash in the middle and the output is identical to a single clean run.
```python
# refunds_processor.py — exactly-once consumer-process-produce
from confluent_kafka import Consumer, Producer, TopicPartition, KafkaException
import json

CONSUMER_CFG = {
    "bootstrap.servers": "kafka.razorpay-test.internal:9092",
    "group.id": "refunds-processor",
    "isolation.level": "read_committed",  # don't see uncommitted upstream txns
    "enable.auto.commit": False,          # we will commit via the producer
    "auto.offset.reset": "earliest",
}

PRODUCER_CFG = {
    "bootstrap.servers": "kafka.razorpay-test.internal:9092",
    "transactional.id": "refunds-7",      # stable per shard, NOT per pod
    "enable.idempotence": True,           # implied by transactional.id, but explicit
    "acks": "all",
    "max.in.flight.requests.per.connection": 5,
}

consumer = Consumer(CONSUMER_CFG)
producer = Producer(PRODUCER_CFG)
producer.init_transactions(30)  # claim PID, bump epoch, fence zombies
consumer.subscribe(["settlements"])

def compute_refund(settlement: dict) -> tuple[dict, dict]:
    """Pure function: settlement → (refund record, audit row). Deterministic."""
    refund = {
        "refund_id": f"RFD-{settlement['settlement_id']}",
        "merchant": settlement["merchant"],
        "amount_paise": settlement["amount_paise"],
        "currency": "INR",
    }
    audit = {
        "refund_id": refund["refund_id"],
        "actor": "system",
        "reason": "settlement-reversed",
        "ts": settlement["ts"],
    }
    return refund, audit

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        raise KafkaException(msg.error())
    settlement = json.loads(msg.value())
    refund, audit = compute_refund(settlement)
    producer.begin_transaction()
    try:
        producer.produce("refunds-emitted", json.dumps(refund).encode(),
                         key=refund["refund_id"].encode())
        producer.produce("refund-audit", json.dumps(audit).encode(),
                         key=refund["refund_id"].encode())
        # Bundle the source offset commit into the same transaction.
        next_offsets = [TopicPartition(msg.topic(), msg.partition(), msg.offset() + 1)]
        producer.send_offsets_to_transaction(next_offsets,
                                             consumer.consumer_group_metadata())
        producer.commit_transaction(30)
    except KafkaException:
        producer.abort_transaction(30)
        raise  # let the supervisor restart us; the consumer will re-poll the same offset
```
Sample output for one settlement, then a forced crash, then a restart:

```
[run 1] polled settlements:7@4521 settlement_id=S-88231
[run 1] begin_transaction
[run 1] produce refunds-emitted RFD-S-88231
[run 1] produce refund-audit RFD-S-88231
[run 1] send_offsets_to_transaction settlements:7=4522
[run 1] commit_transaction OK
[run 1] polled settlements:7@4522 settlement_id=S-88232
[run 1] begin_transaction
[run 1] produce refunds-emitted RFD-S-88232
[run 1] produce refund-audit RFD-S-88232
[!! kernel OOM, pod restarts, new pod with same transactional.id=refunds-7 ]
[run 2] init_transactions — bumped epoch, fenced previous (in-flight S-88232 aborted)
[run 2] polled settlements:7@4522 settlement_id=S-88232  # same offset re-polled
[run 2] begin_transaction
[run 2] produce refunds-emitted RFD-S-88232              # downstream sees this once
[run 2] produce refund-audit RFD-S-88232
[run 2] send_offsets_to_transaction settlements:7=4523
[run 2] commit_transaction OK
```
Walking through the load-bearing lines:
- `isolation.level=read_committed` is non-negotiable for an exactly-once consumer. Without it, the processor would see records from upstream transactions that may later abort, and would emit refunds for settlements that never actually happened. Why this is the input-side guarantee: every link in the pipeline must be `read_committed` for end-to-end exactly-once. The moment one consumer is `read_uncommitted`, ghosts from aborted upstream transactions leak into its outputs, and those outputs land downstream as real records — there's no way to retract them.
- `enable.auto.commit=False` disables the consumer's own offset committer. The transactional producer handles offset commits via `send_offsets_to_transaction`. If both committers were active, you'd get two writes to `__consumer_offsets`, and the latter would silently override the transactional one.
- `init_transactions()` is the fencing step. It claims the PID, bumps the producer epoch, and cancels any in-flight transaction from a previous incarnation of the same `transactional.id`. The fenced transaction's data records are still in the log, but their COMMIT marker will never arrive; once the eventual ABORT marker lands, `read_committed` consumers skip them.
- `send_offsets_to_transaction(next_offsets, group_metadata)` is the magic. It tells the transaction coordinator: "as part of this transaction, also write `settlements:7 = 4522` to `__consumer_offsets`". When the `commit_transaction` lands, the offset commit is in the same atomic batch as the produces. Why `consumer_group_metadata` is required: the broker needs to know which consumer group's offsets are being moved, and it has to fence against concurrent rebalances. The metadata carries the group's generation ID; if a rebalance has happened, the offset commit fails and the transaction aborts. This prevents two pods from both committing offsets for the same partition.
- `abort_transaction()` on exception is essential. It writes the ABORT marker to all participant partitions so that `read_committed` consumers skip the partial output. Without it, the data records would sit until log retention cleared them, occupying disk and confusing operators reading the log directly.
- The exception is re-raised so the supervisor (Kubernetes, systemd, whatever) restarts the process. On restart, `init_transactions()` fences again, the consumer re-polls the same offset, and the loop resumes — idempotently.
The result: no duplicate RFD-S-88232 reaches refunds-emitted, even though the producer physically wrote the record twice. The first write lives in the partition log, but its transaction was aborted, so `read_committed` consumers never see it.
Why each layer is necessary
The end-to-end guarantee is the product of three independent properties. Drop any one and exactly-once collapses.
| Layer | What it provides | What breaks without it |
|---|---|---|
| Idempotent producer (`enable.idempotence=true`) | Per-session, per-partition dedup of producer retries | Network retries duplicate records inside one partition; aborts can't reliably mask them |
| Transactional producer (`transactional.id` set) | Atomicity across multiple partitions, including `__consumer_offsets` | Outputs and offset commit decouple; a crash between them means a duplicate or a loss |
| `read_committed` on every downstream consumer | Aborted records and in-flight transactions are invisible | Aborted writes from upstream become real records downstream — exactly-once leaks |
The interaction is subtle. Suppose you set `transactional.id` but explicitly disable `enable.idempotence`. The client refuses to start — transactions imply idempotence, and the client enforces the implication at configuration time. Suppose you set `enable.idempotence` but no `transactional.id`. You get per-partition dedup of retries but no cross-partition atomicity, so the offset and outputs can diverge. Suppose you set both on the producer but the downstream consumer is `read_uncommitted`. Aborted output records leak through and propagate. Why all three are needed: each layer addresses a different failure surface. Idempotence handles network-retry duplicates. Transactions handle cross-partition crash atomicity. `read_committed` handles propagation of aborts. They are orthogonal — none can substitute for another.
The historical confusion in the streaming community comes from "exactly-once" being marketed by some vendors as a single switch. It is not. Confluent's own documentation, after KIP-98 shipped, was careful to call it "exactly-once processing semantics" and to enumerate the configuration prerequisites. When a Razorpay or Swiggy team adopts a streaming framework and ships a pipeline labelled "exactly-once" without auditing every consumer's isolation.level, the resulting system is at-least-once with extra steps. The first audit step in any production migration is to grep every consumer config in the codebase for isolation.level and verify it is read_committed everywhere downstream of a transactional producer.
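That audit step can be automated rather than done by grep. The sketch below is hypothetical harness code: the config keys (`isolation.level`, default `read_uncommitted`) are real Kafka settings, but the consumer names and the shape of the config registry are invented for illustration.

```python
# Audit sketch: flag any consumer downstream of a transactional producer whose
# isolation.level is not read_committed. Kafka's default is read_uncommitted,
# so a *missing* key is a violation too — the most common way the guarantee
# silently degrades.

def audit_isolation_levels(consumer_configs: dict[str, dict]) -> list[str]:
    """Return names of consumers that downgrade the pipeline to at-least-once."""
    return [
        name for name, cfg in consumer_configs.items()
        if cfg.get("isolation.level", "read_uncommitted") != "read_committed"
    ]

# Hypothetical registry of every consumer config in the codebase:
configs = {
    "refunds-processor": {"isolation.level": "read_committed"},
    "analytics-tap":     {},  # key absent → default read_uncommitted → flagged
    "ledger-sync":       {"isolation.level": "read_uncommitted"},
}
print(audit_isolation_levels(configs))  # → ['analytics-tap', 'ledger-sync']
```

Running a check like this in CI turns "someone toggled a consumer config while debugging" from a weeks-later discovery into a failed build.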
Crash points and what each one looks like
The processor can crash at six identifiable points. Walk through each:
1. Before `begin_transaction` — no transaction open, no records written. Restart re-polls; clean.
2. After `begin_transaction`, before any `produce` — empty open transaction. `init_transactions` on restart bumps the epoch and the empty transaction is implicitly aborted. Clean.
3. After the first `produce`, before the second — partial output records in partition logs, all flagged with the open PID/epoch. Restart bumps the epoch; the broker aborts the previous transaction; ABORT markers land; `read_committed` consumers never saw the records (they were past the LSO). Clean.
4. After both `produce`s, before `send_offsets_to_transaction` — outputs written, offset not yet bundled. Same outcome as case 3: epoch bump, abort, markers, downstream sees nothing. Clean.
5. After `send_offsets_to_transaction`, before `commit_transaction` — same as case 4. The offset commit is also part of the open transaction; the abort wipes it too.
6. After `commit_transaction` — the success case — outputs visible, offset advanced. Restart polls the next offset. Clean.
The interesting case is crashing mid-`commit_transaction`: the producer has sent `EndTxnRequest` to the coordinator and is waiting for the response. If the producer crashes here, the coordinator's `__transaction_state` log either already holds `PrepareCommit` or it doesn't. If `PrepareCommit` is durable, the recovery path is forward — the coordinator finishes writing markers. If not, the recovery path is backward — abort. Either way, the restarted producer sees a consistent state. Why this is the 2PC commit point's payoff: the entire correctness argument reduces to "is the prepare-commit decision durable before any participant marker is written?" If yes, recovery always converges. Kafka's `__transaction_state` is replicated like any other topic, so the durability is straightforward.
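The coordinator's recovery rule is small enough to state as code. This is a toy model of the decision logic, not broker code: the log-entry names mirror the `__transaction_state` states mentioned in the text, and the return strings are illustrative.

```python
# Toy model of coordinator recovery after a crash mid-commit. The recovery
# direction depends on exactly one fact: was PrepareCommit durable in
# __transaction_state before the crash?

def recover(txn_state_log: list[str]) -> str:
    """Decide the recovery direction from the coordinator's durable log
    for one transaction."""
    if "PrepareCommit" in txn_state_log:
        # Commit decision survived: roll forward by finishing the COMMIT
        # markers on every participant partition.
        return "roll-forward: write COMMIT markers"
    # No durable decision: the transaction never committed, so abort.
    return "roll-back: write ABORT markers"

# Crash after the decision was durably logged:
print(recover(["Ongoing", "PrepareCommit"]))  # roll-forward
# Crash before the coordinator persisted the decision:
print(recover(["Ongoing"]))                   # roll-back
```

Both branches converge to a state in which downstream `read_committed` consumers see either the whole transaction or none of it, which is exactly the invariant the next paragraph emphasises.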
The point worth emphasising: there is no crash window where downstream consumers in read_committed mode observe a partial transaction. The visibility cliff is the COMMIT control batch. Either the consumer has read past it (in which case the whole transaction is visible) or it hasn't (in which case nothing is). Half-states cannot leak.
Common confusions
- "`read_committed` is automatic when you use a transactional producer." No — it's a per-consumer setting. The producer's transactionality has zero effect on consumers unless the consumer explicitly sets `isolation.level=read_committed`. The default is `read_uncommitted`. Every downstream consumer in the pipeline must be configured; a single mis-configured consumer breaks the chain.
- "Exactly-once means each event is processed exactly once." No — it means each event has exactly one externally visible effect. The processor may re-poll and re-process the same record many times after crashes; the consumer-process-produce loop ensures only one of those processings has visible output. Processing happens many times; downstream visibility happens once.
- "`send_offsets_to_transaction` and `consumer.commit` do the same thing." No — `consumer.commit` writes to `__consumer_offsets` outside any transaction, immediately. `send_offsets_to_transaction` includes the offset write inside the producer's transaction so it commits atomically with the produces. Mixing them is a common bug: the consumer auto-commits in the background while the producer also commits, leading to duplicate offset advances and wrong replays after restart.
- "This works the same for non-Kafka sinks." No — Kafka's transactionality covers writes to Kafka topics and to Kafka's `__consumer_offsets` only. A sink that writes to MySQL, S3, an HTTP webhook, or BigQuery cannot participate in Kafka's transaction. Those need a separate 2PC sink — typically Flink's two-phase-commit sink coordinator, covered in /wiki/flinks-two-phase-commit-sink.
- "Transactions slow you down a lot." Per-record overhead is small (one extra control batch per commit), but transaction frequency is the real lever. Committing every record is expensive — every commit triggers a `WriteTxnMarkersRequest` to every participant partition. Real systems batch records into transactions of 10–10,000 records each, paying one commit cost per batch. The trade-off is tail latency: bigger batches mean later visibility for `read_committed` consumers.
- "`transactional.id` should be unique per process." No — it should be unique per logical shard. Two pods running the same shard ID is exactly the failure mode you're protecting against: the protocol bumps the epoch and fences the older pod's transactions when the newer pod calls `init_transactions`. If pods have unique IDs, this fencing never fires.
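The batching lever in the "transactions slow you down" point can be isolated into a small, broker-free commit policy. This is a sketch under stated assumptions: the thresholds are illustrative, and in the real processor this class would gate the `begin_transaction`/`commit_transaction` calls from the code above rather than replace them.

```python
# Sketch of a per-batch commit policy: commit one transaction per batch of
# records instead of per record. Pure logic — testable without a broker.

class TxnBatchPolicy:
    """Commit when the open transaction holds max_records, or has been open
    for max_open_s seconds, whichever comes first. The time bound caps
    read_committed visibility lag on quiet partitions."""

    def __init__(self, max_records: int = 500, max_open_s: float = 0.5):
        self.max_records, self.max_open_s = max_records, max_open_s
        self.count, self.opened_at = 0, None

    def record_added(self, now: float) -> None:
        if self.opened_at is None:
            self.opened_at = now  # first record opens the batch
        self.count += 1

    def should_commit(self, now: float) -> bool:
        if self.opened_at is None:
            return False          # nothing in flight
        return (self.count >= self.max_records
                or now - self.opened_at >= self.max_open_s)

    def committed(self) -> None:
        self.count, self.opened_at = 0, None
```

In the main loop, `producer.produce` and `send_offsets_to_transaction` accumulate inside one open transaction until `should_commit` fires; one commit then pays the `WriteTxnMarkersRequest` cost for the whole batch.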
Going deeper
Kafka Streams hides all this for you
Kafka Streams (processing.guarantee=exactly_once_v2) wraps the consumer-process-produce loop in a higher-level DSL. You write stream.mapValues(...).to("output") and the framework configures the underlying transactional producer, sets the consumer to read_committed, batches records into transactions sized by commit.interval.ms, and handles init_transactions and abort logic. The default commit.interval.ms=100 means transactions commit every 100ms, giving sub-second downstream visibility. Reading the Kafka Streams source — specifically StreamThread.runOnce and TaskManager.commit — is the cleanest tour of the pattern in production-quality code; the high-level operations map directly to the manual code above. The same goes for ksqlDB, which uses Kafka Streams underneath. If you're building a new system in 2026, you almost certainly want one of these abstractions; the manual loop is for understanding what they do, and for the rare case where the DSL doesn't fit.
What processing.guarantee=exactly_once_v2 changed
The original Kafka Streams EOS (introduced with KIP-129 in 2017) used one transactional.id per task. A topology with 50 tasks meant 50 transactions per commit interval — 50 control batches per partition involved per commit. KIP-447 (Kafka 2.5+) changed this to use one transactional.id per stream thread, fencing using consumer group epoch instead of partition assignment. The result is a 10–50× reduction in transactional overhead for high-task topologies. The label exactly_once_v2 in newer Kafka clients selects the KIP-447 mode. If you're upgrading a Streams app from 2.4 to 2.5+, switching to v2 typically reclaims significant throughput at no semantic cost.
Why ₹2 crore in mis-attributed refunds is the real cost of getting this wrong
A Razorpay-scale processor handling 10 million settlements a day at an average refund value of ₹2,000 emits a refund stream with annualised value of about ₹7,300 crore. A 0.01% duplicate rate at this scale — well within what a non-transactional pipeline produces during normal pod-restart cadence — is ₹73 lakh of duplicated refund instructions per year, almost all of which then need to be reconciled by an ops team flagging them on the merchant statements. The reconciliation cost (engineer-hours, customer-success time, partial credit-note issuance) typically dwarfs the refund face value because the dispute trail crosses three or four systems and has to be reconciled within the merchant's GST filing cycle. Exactly-once semantics is not a theoretical nicety here — it's the difference between a refunds pipeline that runs unattended and one that needs three engineers babysitting it during deploys.
The Flink interpretation: checkpoints as transactions
Apache Flink's exactly-once mode uses Kafka transactions as the transactional commit primitive but binds them to Flink's checkpoint boundaries instead of per-record. A Flink job with a 30-second checkpoint interval opens a Kafka transaction at checkpoint N's start and commits it when checkpoint N completes successfully. The trade-off is severe end-to-end latency (downstream read_committed consumers see records ~30s late) bought against very small per-record overhead. If your Flink job has a 30-second checkpoint and you want sub-second downstream freshness, the answer is to shorten the checkpoint interval — at the cost of more I/O for state-backend snapshots. This is one of the central tuning knobs in production Flink at any of the Indian streaming-heavy companies (Dream11, Swiggy, Cred). The chapter on Flink's two-phase-commit sink expands the full picture.
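The latency cost of binding commits to checkpoints can be put in numbers. A back-of-envelope sketch, assuming the model described above (a record becomes visible only when its enclosing checkpoint completes); the figures are illustrative, not measurements.

```python
# Worst-case downstream visibility lag when Kafka transactions commit at
# Flink checkpoint boundaries: a record produced just after checkpoint N
# opens waits out the rest of the interval plus the checkpoint itself.

def worst_case_visibility_lag_s(checkpoint_interval_s: float,
                                checkpoint_duration_s: float) -> float:
    return checkpoint_interval_s + checkpoint_duration_s

# 30s checkpoint interval, 2s to snapshot state:
print(worst_case_visibility_lag_s(30.0, 2.0))  # → 32.0
```

Shortening the interval shrinks the first term linearly but raises snapshot I/O pressure, which is the tuning tension the paragraph describes.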
Operational instrumentation: what to monitor in production
Three signals tell you the loop is healthy. First, transaction commit rate vs abort rate. The Kafka broker exposes kafka.server:type=transaction-coordinator-metrics with txn-commit-rate and txn-abort-rate. In a healthy refunds processor at Razorpay scale, abort rate is a tiny fraction of commit rate — perhaps one abort per few thousand commits, all attributable to deploys or genuine errors. A sudden rise in aborts means producers are crashing mid-transaction or init_transactions is fencing zombies, and you should look at pod restart cadence. Second, LSO lag — the gap between high-water mark and last stable offset on output partitions. A growing LSO lag means transactions are staying open longer than expected; for a refunds processor that should commit every record, an LSO lag above a few hundred milliseconds is a bug. Third, per-record processing latency p99 — measured from msg.timestamp() to the moment commit_transaction returns. This is your downstream-visible latency; a Razorpay-internal SLO of "p99 ≤ 2 seconds end-to-end" maps directly to this metric. Forgetting to instrument any of these is how production teams discover six months later that exactly-once was silently downgraded to at-least-once when someone toggled enable.auto.commit=true while debugging.
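The three alert conditions above can be encoded directly. The metric sources (`txn-commit-rate`/`txn-abort-rate`, HW minus LSO, commit-latency p99) are the ones named in the text; the thresholds here are illustrative defaults, not Razorpay's real SLOs.

```python
# Health checks for the three exactly-once signals. Each function takes raw
# metric values and returns True when the signal is unhealthy.

def abort_ratio_unhealthy(commits: int, aborts: int,
                          max_ratio: float = 1e-3) -> bool:
    """More than ~1 abort per thousand commits suggests crash-looping
    producers or zombie fencing, not routine deploys."""
    return commits > 0 and aborts / commits > max_ratio

def lso_lag_unhealthy(high_watermark: int, last_stable_offset: int,
                      max_lag_records: int = 100) -> bool:
    """A growing gap between HW and LSO means transactions stay open
    longer than expected on an output partition."""
    return high_watermark - last_stable_offset > max_lag_records

def p99_latency_unhealthy(p99_seconds: float, slo_seconds: float = 2.0) -> bool:
    """msg.timestamp() → commit_transaction() latency against the SLO."""
    return p99_seconds > slo_seconds
```

Wiring these into the alerting pipeline is what catches the "exactly-once silently downgraded to at-least-once" failure the paragraph warns about, months before reconciliation does.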
Where this leads next
The closed loop covered here is the foundation for every higher-level streaming abstraction:
- /wiki/kafka-streams-exactly-once-v2 — how Kafka Streams' `processing.guarantee=exactly_once_v2` wraps this loop into a one-line config.
- /wiki/flinks-two-phase-commit-sink — what happens when the sink is not Kafka. The 2PC pattern generalises but needs a sink-side coordinator.
- /wiki/end-to-end-latency-vs-throughput-tradeoff — the central tuning knob: transaction size and commit frequency directly trade throughput against `read_committed` visibility lag.
- /wiki/sink-idempotency-keys-and-upserts — the alternative pattern for non-transactional sinks.
The pattern to internalise: end-to-end exactly-once is three configurations stacked, not one. Producer idempotence handles intra-partition retries; producer transactions handle cross-partition atomicity bundled with offset commit; consumer read_committed mode handles abort propagation. Build any one wrong and the guarantee silently collapses — usually in production, usually during a deploy, usually in a way that takes weeks to detect because the duplicates are rare.
References
- KIP-98: Exactly Once Delivery and Transactional Messaging — the original design; the `sendOffsetsToTransaction` API is in the "Consumer Coordination" section.
- KIP-447: Producer scalability for exactly once semantics — the v2 mode that scales transactional-ID usage to one per thread.
- Confluent: Enabling Exactly-Once in Kafka Streams — the higher-level wrapping, with the `commit.interval.ms` knob explained.
- Apache Kafka source: `TransactionalProducer.java` — the client-side state machine; readable in one sitting.
- Flink Documentation: End-to-End Exactly-Once with Kafka — the binding between Flink checkpoints and Kafka transactions.
- /wiki/transactional-writes-2pc-wearing-a-hat — the previous chapter; the protocol underneath the loop.
- /wiki/idempotent-producers-and-sequence-numbers — the layer-1 substrate.
- Confluent: Exactly-Once Semantics Are Possible — Here's How Kafka Does It — Neha Narkhede's design overview.