End-to-end: source → process → sink

A Razorpay refunds processor reads each settlement event from a settlements topic, computes a refund record, writes it to refunds-emitted, writes an audit row to refund-audit, and advances its consumer offset on the source. The processor crashes mid-loop — kernel OOM, a kubelet restart, a deploy that interrupts the JVM. On restart, the system must produce exactly the same downstream effect as if the crash never happened: every settlement triggers exactly one refund, no more, no less. The previous two chapters covered the building blocks — idempotent producers (per-partition dedup) and transactional writes (multi-partition atomicity). This chapter wires them into the end-to-end loop.

End-to-end exactly-once is the consumer-process-produce pattern wrapped in one Kafka transaction. Reads happen in read_committed mode so input is stable. Outputs and the source offset commit are bundled into the same transaction via sendOffsetsToTransaction. On restart, the consumer reads the last committed offset and replays the partial transaction's input — but the producer's idempotent dedup plus the transactional all-or-nothing make the replay invisible to downstream.

The closed loop, in one transaction

The processor's main loop is three operations and a commit: poll, transform, produce, commit-offset. Without transactions, each line is a separate failure surface. With them, the four operations collapse into one atomic step.

[Figure: The exactly-once consumer-process-produce loop. A stream-processing job polls a settlements record (partition 7, offset 4521) in read_committed mode, computes the refund and audit rows, produces both to refunds-emitted and refund-audit, and sends the source offset (4522) to __consumer_offsets — beginTransaction() → produce(refunds) → produce(audit) → sendOffsetsToTransaction(4522) → commitTransaction(), all in one transaction with transactional.id=refunds-7 and enable.idempotence=true. A crash anywhere leaves none of it visible downstream; restart re-polls offset 4521.]
The four operations the processor must do per record. Wrapping them in one Kafka transaction makes them a single atomic unit — either every downstream effect lands, or none does.

The reason this loop is correct: the source offset commit is itself a write to a Kafka topic (__consumer_offsets), and Kafka transactions span all topic-partition writes including that one. So advancing the consumer's progress and producing the outputs are the same atomic operation. Why this dissolves the duplicate problem: a duplicate happens when output is produced but the offset isn't committed, so restart re-processes the input and produces it again. With offsets and outputs in one transaction, either both happen or neither does — there's no state where one is committed and the other isn't.

The processing loop in working code

Here is the closed-loop pattern in Python with confluent-kafka. It reads settlements, emits a refund and audit row, and commits the source offset, all within one transaction. Run it twice in a row with a forced crash in the middle and the output is identical to a single clean run.

# refunds_processor.py — exactly-once consumer-process-produce
from confluent_kafka import Consumer, Producer, TopicPartition, KafkaException
import json

CONSUMER_CFG = {
    "bootstrap.servers": "kafka.razorpay-test.internal:9092",
    "group.id": "refunds-processor",
    "isolation.level": "read_committed",   # don't see uncommitted upstream txns
    "enable.auto.commit": False,           # we will commit via the producer
    "auto.offset.reset": "earliest",
}

PRODUCER_CFG = {
    "bootstrap.servers": "kafka.razorpay-test.internal:9092",
    "transactional.id": "refunds-7",       # stable per shard, NOT per pod
    "enable.idempotence": True,            # implied by transactional.id, but explicit
    "acks": "all",
    "max.in.flight.requests.per.connection": 5,
}

consumer = Consumer(CONSUMER_CFG)
producer = Producer(PRODUCER_CFG)
producer.init_transactions(timeout=30)     # claim PID, bump epoch, fence zombies
consumer.subscribe(["settlements"])

def compute_refund(settlement: dict) -> tuple[dict, dict]:
    """Pure function: settlement → (refund record, audit row). Deterministic."""
    refund = {
        "refund_id": f"RFD-{settlement['settlement_id']}",
        "merchant": settlement["merchant"],
        "amount_paise": settlement["amount_paise"],
        "currency": "INR",
    }
    audit = {
        "refund_id": refund["refund_id"],
        "actor": "system",
        "reason": "settlement-reversed",
        "ts": settlement["ts"],
    }
    return refund, audit

while True:
    msg = consumer.poll(1.0)
    if msg is None: continue
    if msg.error(): raise KafkaException(msg.error())

    settlement = json.loads(msg.value())
    refund, audit = compute_refund(settlement)

    producer.begin_transaction()
    try:
        producer.produce("refunds-emitted", json.dumps(refund).encode(),
                         key=refund["refund_id"].encode())
        producer.produce("refund-audit", json.dumps(audit).encode(),
                         key=refund["refund_id"].encode())
        # Bundle the source offset commit into the same transaction.
        next_offsets = [TopicPartition(msg.topic(), msg.partition(), msg.offset() + 1)]
        producer.send_offsets_to_transaction(next_offsets, consumer.consumer_group_metadata())
        producer.commit_transaction(timeout=30)
    except KafkaException:
        producer.abort_transaction(timeout=30)
        raise   # let the supervisor restart us; the consumer will re-poll the same offset

Sample output for one settlement, then a forced crash, then a restart:

[run 1] polled settlements:7@4521 settlement_id=S-88231
[run 1] begin_transaction
[run 1] produce refunds-emitted RFD-S-88231
[run 1] produce refund-audit    RFD-S-88231
[run 1] send_offsets_to_transaction settlements:7=4522
[run 1] commit_transaction OK
[run 1] polled settlements:7@4522 settlement_id=S-88232
[run 1] begin_transaction
[run 1] produce refunds-emitted RFD-S-88232
[run 1] produce refund-audit    RFD-S-88232
[!! kernel-OOM, pod restarts, new pod with same transactional.id=refunds-7 ]

[run 2] init_transactions — bumped epoch, fenced previous (in-flight S-88232 aborted)
[run 2] polled settlements:7@4522 settlement_id=S-88232   # same offset re-polled
[run 2] begin_transaction
[run 2] produce refunds-emitted RFD-S-88232               # downstream sees this once
[run 2] produce refund-audit    RFD-S-88232
[run 2] send_offsets_to_transaction settlements:7=4523
[run 2] commit_transaction OK

Walking through the load-bearing lines:

  1. isolation.level=read_committed on the consumer — the processor never acts on a record from an upstream transaction that might later abort.
  2. enable.auto.commit=False — the consumer never commits its own progress; the offset advances only inside the producer's transaction.
  3. transactional.id="refunds-7" — stable per shard, so a restarted pod reclaims the same transactional identity and fences its predecessor.
  4. init_transactions() — claims the PID, bumps the epoch, and aborts any transaction the previous incarnation left open.
  5. send_offsets_to_transaction([... msg.offset() + 1 ...], consumer.consumer_group_metadata()) — bundles the next offset to read into the same transaction as the outputs.
  6. abort_transaction() on error, then re-raise — the supervisor restarts the process and the consumer re-polls the same offset.

The result: no duplicate RFD-S-88232 reaches refunds-emitted even though the producer wrote it once and was about to write it again. The first write lives in the partition log but its transaction was aborted, so read_committed consumers never see it.

Why each layer is necessary

The end-to-end guarantee is the product of three independent properties. Drop any one and exactly-once collapses.

  1. Idempotent producer (enable.idempotence=true).
     Provides: per-session, per-partition dedup of producer retries.
     Without it: network retries duplicate records inside one partition; aborts can't reliably mask them.
  2. Transactional producer (transactional.id set).
     Provides: atomicity across multiple partitions, including __consumer_offsets.
     Without it: outputs and the offset commit decouple; a crash between them means a duplicate or a loss.
  3. read_committed on every downstream consumer.
     Provides: aborted records and in-flight transactions stay invisible.
     Without it: aborted writes from upstream become real downstream — exactly-once leaks.

The interaction is subtle. Set transactional.id without mentioning enable.idempotence and the client enables idempotence for you — transactions require it — while explicitly setting enable.idempotence=false alongside a transactional.id is a configuration error the client rejects at startup. Set enable.idempotence but no transactional.id and you get per-partition dedup of retries but no cross-partition atomicity, so the offset and outputs can diverge. Set both on the producer but leave a downstream consumer on read_uncommitted and aborted output records leak through and propagate. Why all three are needed: each layer addresses a different failure surface. Idempotence handles in-network retry duplicates. Transactions handle cross-partition crash atomicity. read_committed handles propagation of aborts. They are orthogonal — none can substitute for another.
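The drop-any-layer argument can be condensed into a small decision function. This is a didactic sketch, not a Kafka API — the name effective_guarantee and the returned strings are invented for illustration:

```python
# Didactic sketch: which end-to-end guarantee survives a given configuration.
# The function and its return strings are illustrative only.

def effective_guarantee(idempotent: bool,
                        transactional: bool,
                        downstream_read_committed: bool) -> str:
    if transactional and not idempotent:
        # transactional.id with idempotence explicitly disabled is a config
        # error: the client refuses to start.
        return "invalid config: transactions require idempotence"
    if not transactional:
        # Per-partition retry dedup at best; offsets and outputs can diverge.
        return "at-least-once: no cross-partition atomicity"
    if not downstream_read_committed:
        return "at-least-once: aborted records leak downstream"
    return "exactly-once end-to-end"
```

Only the last branch — all three layers present — yields the end-to-end guarantee; every other combination degrades silently.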

[Figure: Three layers of exactly-once, stacked. Layer 1 — idempotent producer (enable.idempotence=true, per-(PID, partition) sequence numbers dedup retries inside one partition). Layer 2 — transactional producer (transactional.id set, two-phase commit across partitions, sendOffsetsToTransaction bundles the offset commit). Layer 3 — read_committed consumer everywhere downstream (isolation.level=read_committed, the LSO gates visibility, aborted records are skipped on read).]
The producer side gives you durable atomic writes; the consumer side gives you visibility-after-commit. Both halves are needed — exactly-once is not a producer-only or consumer-only setting.

The historical confusion in the streaming community comes from "exactly-once" being marketed by some vendors as a single switch. It is not. Confluent's own documentation, after KIP-98 shipped, was careful to call it "exactly-once processing semantics" and to enumerate the configuration prerequisites. When a Razorpay or Swiggy team adopts a streaming framework and ships a pipeline labelled "exactly-once" without auditing every consumer's isolation.level, the resulting system is at-least-once with extra steps. The first audit step in any production migration is to grep every consumer config in the codebase for isolation.level and verify it is read_committed everywhere downstream of a transactional producer.
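That audit step can be mechanised. A minimal sketch, assuming consumer configs are collected as dicts — the helper name audit_consumer_configs is hypothetical:

```python
# Hypothetical audit helper: flag consumer configs that would read uncommitted
# data downstream of a transactional producer. Kafka's default isolation.level
# is read_uncommitted, so a missing key is as dangerous as an explicit one.

def audit_consumer_configs(configs: dict) -> list:
    offenders = []
    for name, cfg in configs.items():
        if cfg.get("isolation.level", "read_uncommitted") != "read_committed":
            offenders.append(name)
    return sorted(offenders)

configs = {
    "refunds-processor": {"isolation.level": "read_committed"},
    "audit-archiver":    {"isolation.level": "read_uncommitted"},  # leaks aborts
    "metrics-sampler":   {},                                       # default leaks too
}
print(audit_consumer_configs(configs))   # flags the two unsafe configs
```

The point the sketch makes is the default: a consumer that never mentions isolation.level is an offender, which is why grepping only for read_uncommitted misses most of them.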

Crash points and what each one looks like

The processor can crash at six identifiable points. Walk through each:

  1. Before begin_transaction — no transaction open, no records written. Restart re-polls; clean.
  2. After begin_transaction, before any produce — empty open transaction. init_transactions on restart bumps the epoch and the empty transaction is implicitly aborted. Clean.
  3. After first produce, before second — partial output records in partition logs, all flagged with the open PID/epoch. Restart bumps the epoch; broker aborts the previous transaction; ABORT markers land; read_committed consumers never saw the records (they were past the LSO). Clean.
  4. After both produces, before send_offsets_to_transaction — outputs written, offset not yet bundled. Same outcome as case 3: epoch bump, abort, markers, downstream sees nothing. Clean.
  5. After send_offsets_to_transaction, before commit_transaction — same as case 4. The offset commit is also part of the open transaction; the abort wipes it.
  6. After commit_transaction — the success case — outputs visible, offset advanced. Restart re-polls the next offset. Clean.
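The six cases can be condensed into a toy model — plain Python, not Kafka client code — where staged outputs and the staged offset become visible only at commit, so no crash point can leave one committed without the other:

```python
# Toy model of the atomicity argument. Staged work only becomes visible at
# commit(); a crash (modelled as abort()) at any earlier point discards it all.

class TxnLog:
    def __init__(self):
        self.committed_outputs = []   # what read_committed consumers see
        self.committed_offset = 0     # where a restart resumes
        self._staged = None

    def begin(self):
        self._staged = {"outputs": [], "offset": None}

    def produce(self, record):
        self._staged["outputs"].append(record)

    def send_offsets(self, offset):
        self._staged["offset"] = offset

    def commit(self):
        # The single atomic step: outputs and offset land together or not at all.
        self.committed_outputs.extend(self._staged["outputs"])
        self.committed_offset = self._staged["offset"]
        self._staged = None

    def abort(self):
        self._staged = None   # crash/fencing path: staged work vanishes

def run_once(log, crash_before_commit=False):
    """Process the record at committed_offset; optionally crash before commit."""
    offset = log.committed_offset
    log.begin()
    log.produce(f"refund-for-{offset}")
    log.send_offsets(offset + 1)
    if crash_before_commit:
        log.abort()           # restart's init_transactions aborts the open txn
        return
    log.commit()

log = TxnLog()
run_once(log, crash_before_commit=True)   # crash point 5: staged, uncommitted
run_once(log)                             # restart replays the same offset
assert log.committed_outputs == ["refund-for-0"]   # exactly one downstream effect
assert log.committed_offset == 1
```

The model collapses cases 1 through 5 into one outcome — staged work vanishes, the offset stays put — and case 6 into the other, which is exactly the "no half-states" property.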

The interesting case is mid-commit_transaction: the producer sent EndTxnRequest to the coordinator and is waiting for the response. If the producer crashes here, the coordinator's __transaction_state log already has PrepareCommit (or it doesn't). If PrepareCommit is durable, the recovery path is forward — the coordinator finishes writing markers. If not, the recovery path is backward — abort. Either way, the producer's restart sees a consistent state. Why this is the 2PC commit point's payoff: the entire correctness argument reduces to "is the prepare-commit decision durable before any participant marker is written?" If yes, recovery always converges. Kafka's __transaction_state is replicated like any topic, so the durability is straightforward.
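The recovery rule reduces to a lookup on the last durable state in __transaction_state. A sketch — the state names follow the coordinator's transaction states, but the function itself is invented for illustration:

```python
# Illustrative sketch of the coordinator's recovery decision. State names
# mirror the transaction states logged in __transaction_state.

def recover(last_durable_state: str) -> str:
    """What the coordinator does for a transaction found in this state on restart."""
    if last_durable_state == "PrepareCommit":
        return "roll forward: finish writing COMMIT markers"   # decision was durable
    if last_durable_state == "PrepareAbort":
        return "roll forward: finish writing ABORT markers"
    if last_durable_state == "Ongoing":
        return "roll back: abort (decision never made durable)"
    return "nothing to do (already complete)"

assert recover("PrepareCommit") == "roll forward: finish writing COMMIT markers"
assert recover("Ongoing") == "roll back: abort (decision never made durable)"
```

Every input maps to exactly one deterministic action, which is the convergence property the prose argues for: recovery never has to guess.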

The point worth emphasising: there is no crash window where downstream consumers in read_committed mode observe a partial transaction. The visibility cliff is the COMMIT control batch. Either the consumer has read past it (in which case the whole transaction is visible) or it hasn't (in which case nothing is). Half-states cannot leak.

Common confusions

  1. "Exactly-once is a single switch." It is three configurations stacked — producer idempotence, producer transactions, consumer read_committed — and any one missing silently downgrades the pipeline to at-least-once.
  2. "transactional.id should be unique per pod." It must be stable per shard; a fresh id per pod never fences the zombie it replaced.
  3. "read_committed only matters on the processor's own consumer." Every consumer downstream of a transactional producer needs it, or aborted records leak back in.
  4. "Aborted records are deleted from the log." They remain in the partition log; read_committed consumers skip them at read time.

Going deeper

Kafka Streams hides all this from you

Kafka Streams (processing.guarantee=exactly_once_v2) wraps the consumer-process-produce loop in a higher-level DSL. You write stream.mapValues(...).to("output") and the framework configures the underlying transactional producer, sets the consumer to read_committed, batches records into transactions sized by commit.interval.ms, and handles init_transactions and abort logic. The default commit.interval.ms=100 means transactions commit every 100ms, giving sub-second downstream visibility. Reading the Kafka Streams source — specifically StreamThread.runOnce and TaskManager.commit — is the cleanest tour of the pattern in production-quality code; the high-level operations map directly to the manual code above. The same goes for ksqlDB, which uses Kafka Streams underneath. If you're building a new system in 2026, you almost certainly want one of these abstractions; the manual loop is for understanding what they do, and for the rare case where the DSL doesn't fit.

What processing.guarantee=exactly_once_v2 changed

The original Kafka Streams EOS (introduced with KIP-129 in 2017) used one transactional.id per task. A topology with 50 tasks meant 50 transactions per commit interval — 50 control batches per partition involved per commit. KIP-447 (Kafka 2.5+) changed this to use one transactional.id per stream thread, fencing using consumer group epoch instead of partition assignment. The result is a 10–50× reduction in transactional overhead for high-task topologies. The label exactly_once_v2 in newer Kafka clients selects the KIP-447 mode. If you're upgrading a Streams app from 2.4 to 2.5+, switching to v2 typically reclaims significant throughput at no semantic cost.

Why mis-attributed refunds are the real cost of getting this wrong

A Razorpay-scale processor handling 10 million settlements a day at an average refund value of ₹2,000 emits a refund stream with annualised value of about ₹7.3 lakh crore. A 0.01% duplicate rate at this scale — well within what a non-transactional pipeline produces during normal pod-restart cadence — is ₹73 crore of duplicated refund instructions per year, almost all of which then need to be reconciled by an ops team flagging them on the merchant statements. The reconciliation cost (engineer-hours, customer-success time, partial credit-note issuance) typically dwarfs the refund face value because the dispute trail crosses three or four systems and has to be reconciled within the merchant's GST filing cycle. Exactly-once semantics is not a theoretical nicety here — it's the difference between a refunds pipeline that runs unattended and one that needs three engineers babysitting it during deploys.
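A back-of-envelope sketch of that scale, under the stated assumptions (10 million settlements a day, ₹2,000 average refund, 0.01% duplicate rate):

```python
# Back-of-envelope: annual refund flow and the value of a 0.01% duplicate rate.
# ₹1 crore = 10**7; ₹1 lakh crore = 10**12. Integer arithmetic throughout.

settlements_per_day = 10_000_000
avg_refund_inr = 2_000
annual_flow_inr = settlements_per_day * avg_refund_inr * 365

# 0.01% duplicate rate = 1 in 10,000 records duplicated
duplicated_inr = annual_flow_inr // 10_000

assert annual_flow_inr == 7_300_000_000_000   # ₹7.3 lakh crore per year
assert duplicated_inr == 730_000_000          # ₹73 crore of duplicate instructions
```

The arithmetic is the whole argument: even a one-in-ten-thousand duplicate rate on a payments-scale stream is crores of rupees of wrong instructions per year.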

The Flink interpretation: checkpoints as transactions

Apache Flink's exactly-once mode uses Kafka transactions as the transactional commit primitive but binds them to Flink's checkpoint boundaries instead of per-record. A Flink job with a 30-second checkpoint interval opens a Kafka transaction at checkpoint N's start and commits it when checkpoint N completes successfully. The trade-off is severe end-to-end latency (downstream read_committed consumers see records ~30s late) bought against very small per-record overhead. If your Flink job has a 30-second checkpoint and you want sub-second downstream freshness, the answer is to shorten the checkpoint interval — at the cost of more I/O for state-backend snapshots. This is one of the central tuning knobs in production Flink at any of the Indian streaming-heavy companies (Dream11, Swiggy, Cred). The chapter on Flink's two-phase-commit sink expands the full picture.

Operational instrumentation: what to monitor in production

Three signals tell you the loop is healthy.

  1. Transaction commit rate vs abort rate. The Kafka broker exposes kafka.server:type=transaction-coordinator-metrics with txn-commit-rate and txn-abort-rate. In a healthy refunds processor at Razorpay scale, the abort rate is a tiny fraction of the commit rate — perhaps one abort per few thousand commits, all attributable to deploys or genuine errors. A sudden rise in aborts means producers are crashing mid-transaction or init_transactions is fencing zombies; look at pod restart cadence.
  2. LSO lag — the gap between the high-water mark and the last stable offset on output partitions. Growing LSO lag means transactions are staying open longer than expected; for a refunds processor that should commit every record, an LSO lag above a few hundred milliseconds is a bug.
  3. Per-record processing latency p99, measured from msg.timestamp() to the moment commit_transaction returns. This is your downstream-visible latency; a Razorpay-internal SLO of "p99 ≤ 2 seconds end-to-end" maps directly to this metric.

Forgetting to instrument any of these is how production teams discover six months later that exactly-once was silently downgraded to at-least-once when someone toggled enable.auto.commit=true while debugging.
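Those signals can be wired into an alerting sketch. The function name and the thresholds below are illustrative choices, not Kafka-defined values:

```python
# Illustrative health check over the three signals. Thresholds are example
# values (1-in-1000 abort ratio, 500ms LSO lag, 2s p99 SLO), not standards.

def loop_health(commit_rate: float, abort_rate: float,
                lso_lag_ms: float, p99_latency_s: float) -> list:
    alerts = []
    if commit_rate > 0 and abort_rate / commit_rate > 1 / 1000:
        alerts.append("abort ratio high: check pod restart cadence / fencing")
    if lso_lag_ms > 500:
        alerts.append("LSO lag growing: a transaction is staying open too long")
    if p99_latency_s > 2.0:
        alerts.append("p99 end-to-end latency breaches 2s SLO")
    return alerts

assert loop_health(5000, 1, 80, 0.4) == []                       # healthy
assert loop_health(5000, 50, 80, 0.4) == [
    "abort ratio high: check pod restart cadence / fencing"]     # fencing storm
```

The abort check is deliberately a ratio, not an absolute rate: a deploy that restarts fifty pods will spike absolute aborts without indicating anything wrong.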

Where this leads next

The closed loop covered here is the foundation for every higher-level streaming abstraction: Kafka Streams and ksqlDB wrap it in a DSL behind processing.guarantee=exactly_once_v2, and Flink binds the same transactional commit to its checkpoint boundaries.

The pattern to internalise: end-to-end exactly-once is three configurations stacked, not one. Producer idempotence handles intra-partition retries; producer transactions handle cross-partition atomicity bundled with offset commit; consumer read_committed mode handles abort propagation. Build any one wrong and the guarantee silently collapses — usually in production, usually during a deploy, usually in a way that takes weeks to detect because the duplicates are rare.
