Transactional writes: 2PC wearing a hat

A Razorpay refunds processor reads each settlement event from one Kafka topic, computes a refund record, writes it to a refunds-emitted topic, and advances its consumer offset on the source — all in one logical step. If the process crashes between the write and the offset commit, a naive setup either drops the refund (offset committed but write lost) or duplicates it (write happened, offset wasn't committed, restart replays). The transactional producer makes those operations — the writes to multiple partitions and the consumer offset commit — succeed together or fail together, all-or-nothing. The previous chapter showed how the idempotent producer dedups retries within one session per partition. This chapter shows how the transactional layer extends that into a multi-partition atomic commit, and reveals it as classical two-phase commit with the broker playing coordinator.

Kafka transactions are two-phase commit run by a per-producer transaction coordinator. The producer writes to many partitions, those writes are tagged "uncommitted", the coordinator writes a PREPARE marker to its internal transaction log, then writes a COMMIT (or ABORT) marker to every involved partition. Consumers in read_committed mode skip records until the matching commit marker arrives. This is 2PC, with the transaction-state log replacing the classical "transaction manager" durable store.

What goes wrong with idempotence alone

The idempotent producer (KIP-98, covered in /wiki/idempotent-producers-and-sequence-numbers) gives you per-session, per-partition deduplication. That is enough when you write a single record to a single partition and want retries not to duplicate it. It is not enough for the refunds processor, which needs three things to be atomic:

  1. Write the refund record to refunds-emitted (1 partition).
  2. Write an audit row to refund-audit (a different partition, possibly a different topic).
  3. Commit the consumer offset on settlements (a write to the internal __consumer_offsets topic).

If the process crashes between step 1 and step 2, you have a refund without its audit row. If it crashes between step 2 and step 3, you have a refund and audit row that will both be re-emitted on restart because the offset wasn't advanced. Idempotence does not span partitions, and it does not span the consumer offset. Why this is not solvable with more idempotence: each partition has its own dedup window, but there's no cross-partition coordination. The producer can dedup its own retries to one partition, but it can't make two writes to two partitions appear as one atomic event. That requires a coordinator that writes a single durable decision somewhere, then propagates it.

This is exactly the problem two-phase commit (2PC) was designed for. The trick in Kafka is that the "coordinator" and the "durable transaction log" are themselves implemented on top of Kafka — the same broker, a special internal topic called __transaction_state. The protocol is recognisable to anyone who has read a database textbook chapter on distributed transactions; the implementation is just Kafka wearing a transaction-manager hat.

[Figure: Three writes that must be atomic in a stream-processing transaction. A processor (refunds-svc on k8s, transactional.id=refunds-1) reads from a settlements topic and makes three writes: (1) the refund record to refunds-emitted partition 4, (2) the audit row to refund-audit partition 1, (3) the offset commit for settlements:7 to __consumer_offsets. All three must commit, or none — otherwise a crash between steps produces duplicate or lost refunds, an inconsistency that idempotence alone can't fix.]
The three writes a typical stream-processing job needs to make atomic. Idempotence handles dedup within each partition; transactions handle the all-or-nothing across partitions and the consumer offset commit.

The 2PC protocol, mapped to Kafka

Classical two-phase commit has three roles: a transaction manager (TM), participants (the resource managers, RMs), and a durable log where the TM records the commit decision. The protocol runs in two phases:

  1. Prepare phase. TM tells every RM "prepare to commit". Each RM writes its changes durably as "uncommitted", then votes YES or NO. If any vote is NO, the TM decides ABORT.
  2. Commit phase. TM writes the decision (COMMIT or ABORT) durably to its log, then tells every RM the decision. Each RM, on hearing the decision, finalises its uncommitted changes and acknowledges.
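
The two phases above can be sketched as a toy TM/RM pair (names are illustrative, not Kafka's):

```python
# Minimal classical 2PC sketch: each RM votes in the prepare phase,
# and the TM's durable log entry is the commit point.

class ResourceManager:
    def __init__(self, name):
        self.name = name
        self.prepared = False
        self.committed = False

    def prepare(self):
        # Durably stage changes as "uncommitted", then vote.
        self.prepared = True
        return "YES"

    def decide(self, decision):
        self.committed = (decision == "COMMIT") and self.prepared

class TransactionManager:
    def __init__(self, rms):
        self.rms = rms
        self.log = []  # the TM's durable decision log

    def run(self):
        votes = [rm.prepare() for rm in self.rms]              # phase 1
        decision = "COMMIT" if all(v == "YES" for v in votes) else "ABORT"
        self.log.append(decision)                              # commit point
        for rm in self.rms:                                    # phase 2
            rm.decide(decision)
        return decision

tm = TransactionManager([ResourceManager("rm-a"), ResourceManager("rm-b")])
print(tm.run())  # COMMIT — both RMs voted YES
```

The single `self.log.append(decision)` line is the whole point of the protocol: everything before it can be rolled back, everything after it is forward-only.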

Kafka's mapping:

  • TM → the transaction coordinator, a module on one broker, located by hashing the transactional.id onto a partition of __transaction_state.
  • TM's durable log → the __transaction_state internal topic.
  • RMs → the leaders of the partitions the transaction touches, including the __consumer_offsets partition that holds the group's offset.
  • The commit decision → the PrepareCommit record appended to __transaction_state.
  • "Tell every RM the decision" → the COMMIT/ABORT control batches written to every participant partition.

Why control batches and not metadata: Kafka's storage is append-only. There is no way to "mark" earlier records in place — once written, a record is immutable. The control batch is appended after the transaction's data records, so a consumer reading sequentially knows the outcome by the time it reaches the marker. The trade-off is that consumers in read_committed mode buffer or skip records until the marker arrives, which adds a tail-latency cost equal to the transaction's duration.
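
A minimal sketch of that sequential-read rule, assuming toy record dicts rather than Kafka's wire format:

```python
# Sketch: data records of an open transaction are buffered until the
# control batch (COMMIT/ABORT) for that producer id is reached.

def read_committed(log):
    buffered = {}   # pid -> records awaiting a marker
    visible = []
    for rec in log:
        if rec["type"] == "data":
            buffered.setdefault(rec["pid"], []).append(rec)
        elif rec["type"] == "COMMIT":
            visible.extend(buffered.pop(rec["pid"], []))
        elif rec["type"] == "ABORT":
            buffered.pop(rec["pid"], None)  # discard aborted data
    return visible

log = [
    {"type": "data", "pid": 1, "value": "refund"},
    {"type": "data", "pid": 2, "value": "ghost"},
    {"type": "COMMIT", "pid": 1},
    {"type": "ABORT", "pid": 2},
]
print([r["value"] for r in read_committed(log)])  # ['refund']
```

The aborted record is physically in the log but never surfaces — which is exactly why the marker must come after the data in an append-only store.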

The protocol step-by-step, for a producer with transactional.id="refunds-1" running one transaction:

  1. InitTransactions() — the producer asks for a producer_id (PID) and an epoch. The coordinator bumps the epoch (fencing any zombie with the previous epoch) and writes the new PID/epoch to __transaction_state.
  2. BeginTransaction() — pure client-side; flips a flag. No broker call yet.
  3. First send to a new partition — the producer sends an AddPartitionsToTxnRequest to the coordinator listing the partitions it's about to write to. The coordinator appends Ongoing state to __transaction_state. This is the prepare-phase analogue: the coordinator now knows which RMs are involved and has durably recorded its intent.
  4. Sends to those partitions — normal ProduceRequests, but the record batches carry the (PID, epoch) and a transactional flag. The partition leader appends them to its log as ordinary records — they're physically present but flagged as "part of an open transaction".
  5. SendOffsetsToTransaction() — the consumer group's offset commit becomes part of the same transaction. The producer sends an AddOffsetsToTxnRequest; the coordinator records that __consumer_offsets partition X is also a participant.
  6. CommitTransaction() — the producer sends an EndTxnRequest(commit=true) to the coordinator. The coordinator:
    • Writes PrepareCommit to __transaction_state (this is the durable decision; once this lands, the transaction is committed even if the coordinator dies — a recovered coordinator will see PrepareCommit and finish the commit phase).
    • Sends a WriteTxnMarkersRequest to every participant partition. The partition leader appends the COMMIT control batch to its log.
    • On hearing back from all participants, writes CompleteCommit to __transaction_state. The transaction is fully done.

The asymmetry that makes this 2PC and not just "write a marker": the coordinator's durable decision (PrepareCommit) lands before the partition markers. If the coordinator dies between PrepareCommit and finishing the markers, a new coordinator picks up the __transaction_state log on recovery, sees the PrepareCommit, and re-issues WriteTxnMarkersRequest — finishing the commit. There is no rollback after PrepareCommit. Why this is the textbook 2PC commit point: in classical 2PC, once the TM writes COMMIT to its durable log, it is forward-only — the system will commit even through crashes. Kafka's PrepareCommit is that point. If the producer crashes after PrepareCommit, the transaction still commits. If a coordinator crashes after PrepareCommit, the new coordinator finishes it.

A working transactional coordinator, in 80 lines of Python

The cleanest way to internalise the protocol is to write the coordinator's state machine. This is a single-broker, single-producer simplification that captures the load-bearing pieces — __transaction_state as a list, two participant partitions as lists, and a coordinator that drives the state machine.

# txn_coordinator.py — minimal Kafka-style 2PC coordinator
from dataclasses import dataclass, field
from enum import Enum

class TxnState(Enum):
    EMPTY = "Empty"
    ONGOING = "Ongoing"
    PREPARE_COMMIT = "PrepareCommit"
    COMPLETE_COMMIT = "CompleteCommit"
    PREPARE_ABORT = "PrepareAbort"
    COMPLETE_ABORT = "CompleteAbort"

@dataclass
class Partition:
    name: str
    log: list = field(default_factory=list)  # each entry: {"type": "data"|"COMMIT"|"ABORT", ...}

@dataclass
class Coordinator:
    txn_state_log: list = field(default_factory=list)  # __transaction_state
    current_state: TxnState = TxnState.EMPTY
    participants: list = field(default_factory=list)
    pid: int = 0
    epoch: int = 0

    def init_transactions(self, transactional_id: str):
        self.epoch += 1
        # Toy deterministic PID derivation (real Kafka allocates PIDs
        # server-side); Python's built-in hash() is randomised per run,
        # which would make the demo output unreproducible.
        self.pid = sum(ord(c) for c in transactional_id) & 0xFFFF
        self.txn_state_log.append({"event": "INIT", "pid": self.pid, "epoch": self.epoch})
        self.current_state = TxnState.EMPTY

    def add_partition(self, partition: Partition):
        if self.current_state == TxnState.EMPTY:
            self.current_state = TxnState.ONGOING
            self.txn_state_log.append({"event": "ONGOING", "pid": self.pid, "epoch": self.epoch})
        self.participants.append(partition)

    def write_data(self, partition: Partition, payload: dict):
        # data record carries (pid, epoch); flagged as part of open txn
        partition.log.append({"type": "data", "pid": self.pid, "epoch": self.epoch, **payload})

    def commit(self, fail_after_prepare: bool = False):
        # Phase 1: durable decision
        self.txn_state_log.append({"event": "PrepareCommit", "pid": self.pid, "epoch": self.epoch})
        self.current_state = TxnState.PREPARE_COMMIT

        if fail_after_prepare:
            # Simulate coordinator crash here; recovery will finish the commit
            print("[!] coordinator crashed after PrepareCommit — recovery would replay")
            return

        # Phase 2: write COMMIT marker to each participant
        for p in self.participants:
            p.log.append({"type": "COMMIT", "pid": self.pid, "epoch": self.epoch})

        self.txn_state_log.append({"event": "CompleteCommit", "pid": self.pid, "epoch": self.epoch})
        self.current_state = TxnState.COMPLETE_COMMIT
        self.participants = []

    def recover(self):
        # On startup, replay txn_state_log to find unfinished transactions
        last = self.txn_state_log[-1] if self.txn_state_log else None
        if last and last["event"] == "PrepareCommit":
            print(f"[recover] found PrepareCommit for pid={last['pid']}, finishing commit")
            for p in self.participants:
                p.log.append({"type": "COMMIT", "pid": last["pid"], "epoch": last["epoch"]})
            self.txn_state_log.append({"event": "CompleteCommit", "pid": last["pid"], "epoch": last["epoch"]})
            self.current_state = TxnState.COMPLETE_COMMIT
            self.participants = []

# Demo: refunds processor writing to two partitions atomically
refunds = Partition("refunds-emitted-p4")
audit = Partition("refund-audit-p1")
coord = Coordinator()

coord.init_transactions("refunds-1")
coord.add_partition(refunds)
coord.add_partition(audit)
coord.write_data(refunds, {"refund_id": "RFD-9001", "amount_paise": 250000, "merchant": "razorpay-test"})
coord.write_data(audit, {"refund_id": "RFD-9001", "actor": "system", "reason": "merchant-initiated"})
coord.commit()

print("refunds.log:", refunds.log)
print("audit.log:", audit.log)
print("txn_state_log:", coord.txn_state_log)
Output (pretty-printed for readability):

refunds.log: [
  {'type': 'data', 'pid': 853, 'epoch': 1, 'refund_id': 'RFD-9001', 'amount_paise': 250000, 'merchant': 'razorpay-test'},
  {'type': 'COMMIT', 'pid': 853, 'epoch': 1}
]
audit.log: [
  {'type': 'data', 'pid': 853, 'epoch': 1, 'refund_id': 'RFD-9001', 'actor': 'system', 'reason': 'merchant-initiated'},
  {'type': 'COMMIT', 'pid': 853, 'epoch': 1}
]
txn_state_log: [
  {'event': 'INIT', 'pid': 853, 'epoch': 1},
  {'event': 'ONGOING', 'pid': 853, 'epoch': 1},
  {'event': 'PrepareCommit', 'pid': 853, 'epoch': 1},
  {'event': 'CompleteCommit', 'pid': 853, 'epoch': 1}
]
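
The crash path is worth running too. Here is a compact, self-contained re-run of the scenario the model simulates with fail_after_prepare=True — the coordinator dies between the durable PrepareCommit and the partition markers, and recovery finishes the commit by replaying the state log (plain lists stand in for the dataclasses above):

```python
# Crash-after-PrepareCommit, then recovery: forward-only, no rollback.

txn_state_log = []                      # stand-in for __transaction_state
partitions = {"refunds": [], "audit": []}

# The transaction runs: data lands, PrepareCommit is made durable...
for log in partitions.values():
    log.append({"type": "data"})
txn_state_log.append("PrepareCommit")
# ...then the coordinator crashes before writing any COMMIT marker.

def recover():
    # A new coordinator replays the state log; a dangling PrepareCommit
    # means the decision was already made, so it re-issues the markers.
    if txn_state_log and txn_state_log[-1] == "PrepareCommit":
        for log in partitions.values():
            log.append({"type": "COMMIT"})
        txn_state_log.append("CompleteCommit")

recover()
print(txn_state_log)   # ['PrepareCommit', 'CompleteCommit']
print(all(log[-1]["type"] == "COMMIT" for log in partitions.values()))  # True
```

Run recover() twice and nothing changes — the replay is idempotent, because the last state-log entry is no longer PrepareCommit.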

Walking through the load-bearing lines:

  • init_transactions bumps the epoch before anything else: that bump is the fencing step, durably recorded before the producer is allowed to write.
  • commit() appends PrepareCommit to the state log before touching any participant — the durable decision strictly precedes the markers.
  • fail_after_prepare simulates a coordinator crash between the decision and the markers; recover() replays the state log, finds the dangling PrepareCommit, and finishes forward. There is no code path that rolls back after PrepareCommit.

This 80-line model captures the protocol's load-bearing invariants. Real Kafka adds: per-partition leader election, transaction-id expiration, the control-batch wire format, and the WriteTxnMarkersRequest RPC. The state-machine logic is the same.

Read-committed isolation: why consumers care

A consumer reading from a partition that has open transactions sees data records and control batches interleaved. Two isolation levels apply:

  • read_uncommitted (the default) — the consumer sees every record up to the high-water mark, including records of transactions that are still open or were later aborted.
  • read_committed — the consumer sees records only up to the Last Stable Offset (LSO), and records belonging to aborted transactions are filtered out.

The LSO is a per-partition value the broker maintains: it advances as transactions close, and it lags the high-water mark by the duration of the longest open transaction. Why this introduces tail latency: a consumer's "freshness" is bounded below by the LSO. If a producer holds a transaction open for 30 seconds (a long aggregation window, say), no read_committed consumer can see records produced in that window for 30 seconds. This is the cost of atomicity — buying it from the producer side is how the consumer side gets to remain simple.
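
The LSO rule — the offset of the first record of the earliest still-open transaction — can be sketched directly (toy record dicts, not Kafka's format):

```python
# Sketch of the LSO computation a broker maintains per partition.

def last_stable_offset(log, high_water_mark):
    open_txn_starts = {}              # pid -> first offset of its open txn
    for offset, rec in enumerate(log):
        pid = rec["pid"]
        if rec["type"] == "data" and pid not in open_txn_starts:
            open_txn_starts[pid] = offset
        elif rec["type"] in ("COMMIT", "ABORT"):
            open_txn_starts.pop(pid, None)   # txn closed
    # No open transactions: the LSO catches up to the high-water mark.
    return min(open_txn_starts.values(), default=high_water_mark)

log = [
    {"type": "data",   "pid": 1},  # offset 0  T1.A
    {"type": "data",   "pid": 1},  # offset 1  T1.B
    {"type": "COMMIT", "pid": 1},  # offset 2  T1 closes
    {"type": "data",   "pid": 2},  # offset 3  T2 still open
    {"type": "data",   "pid": 2},  # offset 4
]
print(last_stable_offset(log, high_water_mark=5))  # 3
```

A read_committed consumer of this log is served offsets 0–2 and then waits, even though offsets 3–4 are physically present.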

For Razorpay's refunds processor, every downstream — the refunds-display service, the merchant-notification service, the warehouse loader — would be read_committed. None of them ever sees a refund record without its audit row, because both arrive together when the transaction commits.

[Figure: Last Stable Offset and read_committed visibility. A partition log (offsets 0–7) interleaves two transactions: T1's records at offsets 0–1 with its COMMIT marker at offset 2, and T2's records from offset 3 onward, still open. The LSO sits at offset 3 — the start of the earliest open transaction. A read_committed consumer reads offsets 0–2 (T1's data and commit marker), then waits at offset 3; once T2's COMMIT lands at, say, offset 8, the LSO advances to 9 and the consumer releases T2's records in order.]
The LSO is the broker's promise to read_committed consumers: "you will not see any record at offset >= LSO until the transaction it belongs to closes". This is what keeps consumers from observing ghosts of aborted transactions.

Common confusions

Going deeper

The transaction-state machine in __transaction_state

Kafka's __transaction_state topic is keyed by transactional.id and stores TransactionMetadata records with the full state machine: Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead. The keying-by-transactional.id means every state change for one producer hashes to the same partition, so the coordinator's view is consistent. The Dead state is reached when the metadata is being garbage-collected after transactional.id.expiration.ms elapses with no activity. The state machine has an explicit transition table — readable in TransactionStateManager.scala in the Apache Kafka source — and any transition not in the table is a hard error. This is the level of formality that makes the protocol verifiable: you can read the state diagram and prove (manually) that every crash point recovers to a consistent state.
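
An explicit transition table in that style can be sketched in a few lines. The subset of transitions below is illustrative — the authoritative table lives in TransactionStateManager.scala:

```python
# Sketch of a hard-error transition table for the transaction states.
# Any transition not listed raises, mirroring the "explicit table,
# everything else is a bug" discipline described above.

VALID = {
    "Empty":          {"Ongoing", "Dead"},
    "Ongoing":        {"PrepareCommit", "PrepareAbort"},
    "PrepareCommit":  {"CompleteCommit"},
    "PrepareAbort":   {"CompleteAbort"},
    "CompleteCommit": {"Empty", "Ongoing", "Dead"},
    "CompleteAbort":  {"Empty", "Ongoing", "Dead"},
    "Dead":           set(),
}

def transition(state, new_state):
    if new_state not in VALID[state]:
        raise ValueError(f"illegal transition {state} -> {new_state}")
    return new_state

s = "Empty"
for nxt in ("Ongoing", "PrepareCommit", "CompleteCommit"):
    s = transition(s, nxt)
print(s)  # CompleteCommit
```

Note what the table forbids: there is no edge from PrepareCommit to any abort state — the forward-only commit point, encoded structurally.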

Why transactional.id is global, not per-process

A common confusion is to set transactional.id = "refunds-svc-" + uuid() so every pod has a unique ID. This breaks zombie fencing: if a pod restarts, the new pod gets a fresh transactional.id, the broker sees no prior epoch to bump, and the old pod's transactions can complete in parallel with the new pod's. The correct pattern is to derive transactional.id from a stable shard identity — "refunds-svc-" + shard_index — so that a pod restart for shard 7 collides with the previous pod's transactional.id, bumps the epoch, and fences the zombie. In Kubernetes terms: transactional.id should come from the StatefulSet pod ordinal, not from hostname. The Razorpay refunds service learnt this in early 2024 when a horizontal pod autoscaler was scaling shards independently and four pods ended up sharing the same workload due to mis-configured transactional.id derivation, which manifested as occasional duplicate refund emissions during deployment windows.
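
The fencing mechanics reduce to a sketch like this (broker and id-derivation names are illustrative):

```python
# Epoch fencing keyed by a *stable* transactional.id: a restarted pod
# re-initialising with the same id bumps the epoch and fences the
# old producer's writes.

class Broker:
    def __init__(self):
        self.epochs = {}   # transactional.id -> current epoch

    def init_transactions(self, txn_id):
        self.epochs[txn_id] = self.epochs.get(txn_id, 0) + 1
        return self.epochs[txn_id]

    def produce(self, txn_id, epoch, record):
        if epoch < self.epochs[txn_id]:
            return "FENCED"            # zombie with a stale epoch
        return "OK"

broker = Broker()
shard = 7
txn_id = f"refunds-svc-{shard}"        # stable shard identity, not uuid()

old_epoch = broker.init_transactions(txn_id)   # original pod
new_epoch = broker.init_transactions(txn_id)   # restarted pod, same id
print(broker.produce(txn_id, old_epoch, {"refund": 1}))  # FENCED
print(broker.produce(txn_id, new_epoch, {"refund": 1}))  # OK
```

With `"refunds-svc-" + uuid()` the two init_transactions calls would land on different keys, both pods would hold epoch 1, and neither would ever be fenced — which is exactly the duplicate-emission failure described above.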

The cost of read_committed on tail latency

The LSO advances only as transactions close. If a producer opens a transaction every 100ms and commits every 100ms, read_committed consumers see records ~100ms after they're produced — a small constant. But a producer that batches large windows — say, a Flink job with a 30-second checkpoint interval — keeps transactions open for the checkpoint duration, and the LSO sits ~30 seconds behind the high-water mark. Downstream consumers see this as a 30-second delay before any record from a checkpoint becomes visible. For a real-time analytics pipeline at Zerodha that reads tick data, this is the difference between "5-second p99 dashboard freshness" and "35-second p99 dashboard freshness" — entirely a function of the upstream's transaction duration. Tuning Flink's checkpoint interval is partly a tail-latency knob for this reason.
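
A back-of-the-envelope model of that claim, assuming commits land exactly on checkpoint boundaries: a record produced at time t becomes visible at the window's commit, so its freshness delay is commit_time − t.

```python
# Toy model: visibility delay of records produced inside
# checkpoint-aligned transaction windows.

def visibility_delays(produce_times, checkpoint_interval):
    delays = []
    for t in produce_times:
        # The commit happens at the next checkpoint boundary.
        commit_time = ((t // checkpoint_interval) + 1) * checkpoint_interval
        delays.append(commit_time - t)
    return delays

# Records produced once per second under a 30-second checkpoint interval:
delays = visibility_delays(range(0, 30), checkpoint_interval=30)
print(max(delays), min(delays))  # 30 1
```

Shrinking the interval to 1 second caps the delay at 1 second — the checkpoint interval is the tail-latency knob, at the cost of more frequent (and therefore more numerous) transactions.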

What happens to consumer offsets on abort

When a transactional producer calls sendOffsetsToTransaction(offsets, group), those offsets are tagged with the transaction. If the transaction commits, the offsets are appended to __consumer_offsets as committed. If it aborts, they are not. This is the essential trick that lets a consumer-process-produce loop be exactly-once: the offset commit and the produce both succeed or both fail. On restart, the consumer reads its committed offset (from before the failed transaction) and re-processes, and the dedup is handled by the producer's idempotent retries plus the transactional re-issue. Without sendOffsetsToTransaction, the offset commit would be a separate API call and could succeed while the produce failed — duplicating exactly the problem we set out to solve.
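
The staging behaviour can be sketched as follows (class and method names are illustrative, loosely following the producer API):

```python
# Sketch: offset commits are staged with the transaction and reach the
# committed offsets map only if the transaction commits.

class OffsetsTopic:
    def __init__(self):
        self.committed = {}   # (group, partition) -> offset
        self.pending = {}     # txn_id -> staged offset writes

    def send_offsets_to_txn(self, txn_id, group, offsets):
        self.pending[txn_id] = [(group, p, o) for p, o in offsets.items()]

    def end_txn(self, txn_id, commit):
        staged = self.pending.pop(txn_id, [])
        if commit:               # on abort, staged offsets are dropped
            for group, p, o in staged:
                self.committed[(group, p)] = o

topic = OffsetsTopic()
topic.send_offsets_to_txn("refunds-1", "refunds-grp", {"settlements:7": 1042})
topic.end_txn("refunds-1", commit=False)     # abort: offset not advanced
print(topic.committed)                       # {}
topic.send_offsets_to_txn("refunds-1", "refunds-grp", {"settlements:7": 1042})
topic.end_txn("refunds-1", commit=True)
print(topic.committed)  # {('refunds-grp', 'settlements:7'): 1042}
```

After the abort, a restarting consumer re-reads from the old offset and re-processes — the retry, not the original, is what eventually commits.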

Comparing to PostgreSQL's prepared transactions

PostgreSQL exposes 2PC directly via PREPARE TRANSACTION 'name' and COMMIT PREPARED 'name' / ROLLBACK PREPARED 'name'. The mechanism is the same: a prepared transaction is durable, surviving crashes; a recovered postmaster will see prepared transactions in pg_prepared_xacts and wait for an external coordinator to issue the final commit or rollback. Kafka's transaction coordinator plays the role of that external coordinator, but inside Kafka itself. The interesting cross-system pattern is when an application wants to atomically write to both Kafka and Postgres — that requires a third-party 2PC coordinator (or, more commonly in production, an outbox pattern: write to Postgres in a single transaction including an outbox row, and a CDC consumer ships the outbox row to Kafka with at-least-once semantics, which the consumer-side dedup handles).
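
The outbox pattern mentioned above fits in a few lines, here using sqlite3 as a stand-in for Postgres (table names are illustrative):

```python
# Outbox pattern sketch: the business row and the outbox row commit in
# one local database transaction, so a CDC/relay process never ships an
# outbox row whose business write was lost.
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE refunds (id TEXT PRIMARY KEY, amount INTEGER)")
db.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, topic TEXT, payload TEXT)")

with db:  # one local transaction: both rows commit, or neither does
    db.execute("INSERT INTO refunds VALUES ('RFD-9001', 250000)")
    db.execute("INSERT INTO outbox (topic, payload) VALUES ('refunds-emitted', 'RFD-9001')")

# A relay would poll the outbox and produce each row to Kafka
# at-least-once; consumer-side dedup absorbs the duplicates.
rows = db.execute("SELECT topic, payload FROM outbox").fetchall()
print(rows)  # [('refunds-emitted', 'RFD-9001')]
```

The delivery guarantee degrades from exactly-once to at-least-once at the Kafka boundary, which is why the pattern pairs with consumer-side dedup rather than with a cross-system coordinator.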

Where this leads next

The next chapters compose the transactional producer with a full processing graph.

The pattern to internalise: idempotence handles dedup within one partition's record stream; transactions handle atomicity across partitions and across the offset commit; the sink contract handles atomicity to the outside world. Each layer solves one specific problem, and stacking them gives you end-to-end exactly-once.

References