Transactional writes: 2PC wearing a hat
A Razorpay refunds processor reads each settlement event from one Kafka topic, computes a refund record, writes it to a refunds-emitted topic, and advances its consumer offset on the source — all in one logical step. If the process crashes between the write and the offset commit, a naive setup either drops the refund (offset committed but write lost) or duplicates it (write happened, offset commit didn't, restart replays). The transactional producer makes those operations — the writes, across however many partitions, plus the consumer-offset commit — all-or-nothing: they succeed together or fail together. The previous chapter showed how the idempotent producer dedups retries within one session per partition. This chapter shows how the transactional layer extends that into a multi-partition atomic commit, and reveals it as classical two-phase commit with the broker playing coordinator.
Kafka transactions are two-phase commit run by a per-producer transaction coordinator. The producer writes to many partitions, those writes are tagged "uncommitted", the coordinator writes a PREPARE marker to its internal transaction log, then writes a COMMIT (or ABORT) marker to every involved partition. Consumers in read_committed mode skip records until the matching commit marker arrives. This is 2PC, with the transaction-state log replacing the classical "transaction manager" durable store.
What goes wrong with idempotence alone
The idempotent producer (KIP-98, covered in /wiki/idempotent-producers-and-sequence-numbers) gives you per-session, per-partition deduplication. That is enough when you write a single record to a single partition and want retries not to duplicate it. It is not enough for the refunds processor, which needs three things to be atomic:
- Write the refund record to refunds-emitted (1 partition).
- Write an audit row to refund-audit (a different partition, possibly a different topic).
- Commit the consumer offset on settlements (a write to the internal __consumer_offsets topic).
If the process crashes between step 1 and step 2, you have a refund without its audit row. If it crashes between step 2 and step 3, you have a refund and audit row that will both be re-emitted on restart because the offset wasn't advanced. Idempotence does not span partitions, and it does not span the consumer offset. Why this is not solvable with more idempotence: each partition has its own dedup window, but there's no cross-partition coordination. The producer can dedup its own retries to one partition, but it can't make two writes to two partitions appear as one atomic event. That requires a coordinator that writes a single durable decision somewhere, then propagates it.
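The crash window is easy to reproduce in a toy model. A hedged sketch follows; the function, record shape, and offset store are invented for illustration and are not the Kafka client API:

```python
# Toy model of "produce, then commit offset" WITHOUT a transaction.
# All names here are illustrative; this is not the Kafka client API.

def process_without_txn(output_log, offsets, record, crash_between=False):
    output_log.append(record)                 # step 1: emit the refund
    if crash_between:
        return                                # crash before the offset commit
    offsets["settlements-0"] = record["src_offset"] + 1  # step 2: advance offset

out, offs = [], {"settlements-0": 0}
event = {"refund_id": "RFD-1", "src_offset": 0}
process_without_txn(out, offs, event, crash_between=True)  # crash mid-way
# Restart: the offset is still 0, so the same source record is re-read...
process_without_txn(out, offs, event)
print(len(out))  # 2: the refund was emitted twice
```

Flip the order (commit the offset first, then write) and the failure mode inverts to a lost refund; only an atomic pairing of the two removes both windows.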
This is exactly the problem two-phase commit (2PC) was designed for. The trick in Kafka is that the "coordinator" and the "durable transaction log" are themselves implemented on top of Kafka — the same broker, a special internal topic called __transaction_state. The protocol is recognisable to anyone who has read a database textbook chapter on distributed transactions; the implementation is just Kafka wearing a transaction-manager hat.
The 2PC protocol, mapped to Kafka
Classical two-phase commit has three roles: a transaction manager (TM), participants (the resource managers, RMs), and a durable log where the TM records the commit decision. The protocol runs in two phases:
- Prepare phase. TM tells every RM "prepare to commit". Each RM writes its changes durably as "uncommitted", then votes YES or NO. If any vote is NO, the TM decides ABORT.
- Commit phase. TM writes the decision (COMMIT or ABORT) durably to its log, then tells every RM the decision. Each RM, on hearing the decision, finalises its uncommitted changes and acknowledges.
Kafka's mapping:
- Transaction Manager → the transaction coordinator, a broker selected by hashing the producer's transactional.id to a partition of the internal __transaction_state topic. That partition's leader is the coordinator for that producer.
- Resource Managers → the partition leaders for every topic-partition the producer wrote to in this transaction, plus the __consumer_offsets partition for any offset commits.
- TM's durable log → the __transaction_state topic itself. Every state change for the transaction (Empty → Ongoing → PrepareCommit → CompleteCommit) is appended there as a record.
- Per-RM commit marker → a special "control batch" record (one of two control types: COMMIT or ABORT) appended to each participant partition. Consumers in read_committed mode skip records whose transaction's marker is ABORT, and withhold records whose marker hasn't arrived yet.
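The coordinator-selection rule above can be sketched in a few lines. Assumptions: zlib.crc32 stands in for the hash Kafka actually uses (murmur2), and 50 is the default partition count of __transaction_state (broker config transaction.state.log.num.partitions):

```python
# Sketch: mapping a transactional.id to its coordinator partition.
# zlib.crc32 is a stand-in for Kafka's murmur2; the principle is the same:
# a stable hash, mod the partition count of __transaction_state.
import zlib

NUM_TXN_STATE_PARTITIONS = 50  # default transaction.state.log.num.partitions

def coordinator_partition(transactional_id: str) -> int:
    """The __transaction_state partition whose leader coordinates this producer."""
    return zlib.crc32(transactional_id.encode()) % NUM_TXN_STATE_PARTITIONS

# Every transaction from one producer id lands on the same partition, so a
# single broker sees that producer's state changes in order.
assert coordinator_partition("refunds-1") == coordinator_partition("refunds-1")
```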
Why control batches and not metadata: Kafka's storage is append-only. There is no way to "mark" earlier records in place — once written, a record is immutable. The control batch is appended after the transaction's data records, so a consumer reading sequentially knows the outcome by the time it reaches the marker. The trade-off is that consumers in read_committed mode buffer or skip records until the marker arrives, which adds a tail-latency cost equal to the transaction's duration.
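The sequential-read argument can be turned into code. Below is a simplified single-pass reader; the real consumer relies on aborted-transaction metadata from the broker rather than buffering everything itself, but the visible behaviour matches:

```python
# Sketch: a read_committed reader scanning one partition's append-only log.
# Records are held back per producer until that producer's marker arrives.
def read_committed(log):
    buffered = {}   # pid -> data records awaiting a marker
    released = []
    for rec in log:
        pid = rec["pid"]
        if rec["type"] == "data":
            buffered.setdefault(pid, []).append(rec)
        elif rec["type"] == "COMMIT":
            released.extend(buffered.pop(pid, []))   # release in log order
        elif rec["type"] == "ABORT":
            buffered.pop(pid, None)                  # skip, but never delete
    return released

log = [
    {"type": "data", "pid": 7, "v": "refund"},
    {"type": "data", "pid": 9, "v": "oops"},
    {"type": "ABORT", "pid": 9},     # pid 9's data is skipped
    {"type": "COMMIT", "pid": 7},    # pid 7's data is released
]
print([r["v"] for r in read_committed(log)])  # ['refund']
```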
The protocol step-by-step, for a producer with transactional.id="refunds-1" running one transaction:
- InitTransactions() — the producer asks for a producer_id (PID) and an epoch. The coordinator bumps the epoch (fencing any zombie with the previous epoch) and writes the new PID/epoch to __transaction_state.
- BeginTransaction() — pure client-side; flips a flag. No broker call yet.
- First send to a new partition — the producer sends an AddPartitionsToTxnRequest to the coordinator listing the partitions it's about to write to. The coordinator appends Ongoing state to __transaction_state. This is the prepare-phase analogue: the coordinator now knows which RMs are involved and has durably recorded its intent.
- Sends to those partitions — normal ProduceRequests, but the records carry the (PID, epoch) and a transactional flag. The partition leader appends them to its log as ordinary records — they're physically present but flagged as "part of an open transaction".
- SendOffsetsToTransaction() — the consumer group's offset commit becomes part of the same transaction. The producer sends an AddOffsetsToTxnRequest; the coordinator records that __consumer_offsets partition X is also a participant.
- CommitTransaction() — the producer sends an EndTxnRequest(commit=true) to the coordinator. The coordinator:
  - Writes PrepareCommit to __transaction_state (this is the durable decision; once this lands, the transaction is committed even if the coordinator dies — a recovered coordinator will see PrepareCommit and finish the commit phase).
  - Sends a WriteTxnMarkersRequest to every participant partition. The partition leader appends the COMMIT control batch to its log.
  - On hearing back from all participants, writes CompleteCommit to __transaction_state. The transaction is fully done.
The asymmetry that makes this 2PC and not just "write a marker": the coordinator's durable decision (PrepareCommit) lands before the partition markers. If the coordinator dies between PrepareCommit and finishing the markers, a new coordinator picks up the __transaction_state log on recovery, sees the PrepareCommit, and re-issues WriteTxnMarkersRequest — finishing the commit. There is no rollback after PrepareCommit. Why this is the textbook 2PC commit point: in classical 2PC, once the TM writes COMMIT to its durable log, it is forward-only — the system will commit even through crashes. Kafka's PrepareCommit is that point. If the producer crashes after PrepareCommit, the transaction still commits. If a coordinator crashes after PrepareCommit, the new coordinator finishes it.
A working transactional coordinator, in 80 lines of Python
The cleanest way to internalise the protocol is to write the coordinator's state machine. This is a single-broker, single-producer simplification that captures the load-bearing pieces — __transaction_state as a list, two participant partitions as lists, and a coordinator that drives the state machine.
# txn_coordinator.py — minimal Kafka-style 2PC coordinator
from dataclasses import dataclass, field
from enum import Enum

class TxnState(Enum):
    EMPTY = "Empty"
    ONGOING = "Ongoing"
    PREPARE_COMMIT = "PrepareCommit"
    COMPLETE_COMMIT = "CompleteCommit"
    PREPARE_ABORT = "PrepareAbort"
    COMPLETE_ABORT = "CompleteAbort"

@dataclass
class Partition:
    name: str
    log: list = field(default_factory=list)  # each entry: {"type": "data"|"COMMIT"|"ABORT", ...}

@dataclass
class Coordinator:
    txn_state_log: list = field(default_factory=list)  # __transaction_state
    current_state: TxnState = TxnState.EMPTY
    participants: list = field(default_factory=list)
    pid: int = 0
    epoch: int = 0

    def init_transactions(self, transactional_id: str):
        self.epoch += 1
        # NB: Python's hash() is salted per interpreter run, so the pid below
        # varies between runs; real Kafka allocates PIDs broker-side.
        self.pid = hash(transactional_id) & 0xFFFF
        self.txn_state_log.append({"event": "INIT", "pid": self.pid, "epoch": self.epoch})
        self.current_state = TxnState.EMPTY

    def add_partition(self, partition: Partition):
        if self.current_state == TxnState.EMPTY:
            self.current_state = TxnState.ONGOING
            self.txn_state_log.append({"event": "ONGOING", "pid": self.pid, "epoch": self.epoch})
        self.participants.append(partition)

    def write_data(self, partition: Partition, payload: dict):
        # data record carries (pid, epoch); flagged as part of open txn
        partition.log.append({"type": "data", "pid": self.pid, "epoch": self.epoch, **payload})

    def commit(self, fail_after_prepare: bool = False):
        # Phase 1: durable decision
        self.txn_state_log.append({"event": "PrepareCommit", "pid": self.pid, "epoch": self.epoch})
        self.current_state = TxnState.PREPARE_COMMIT
        if fail_after_prepare:
            # Simulate coordinator crash here; recovery will finish the commit
            print("[!] coordinator crashed after PrepareCommit — recovery would replay")
            return
        # Phase 2: write COMMIT marker to each participant
        for p in self.participants:
            p.log.append({"type": "COMMIT", "pid": self.pid, "epoch": self.epoch})
        self.txn_state_log.append({"event": "CompleteCommit", "pid": self.pid, "epoch": self.epoch})
        self.current_state = TxnState.COMPLETE_COMMIT
        self.participants = []

    def recover(self):
        # On startup, replay txn_state_log to find unfinished transactions
        last = self.txn_state_log[-1] if self.txn_state_log else None
        if last and last["event"] == "PrepareCommit":
            print(f"[recover] found PrepareCommit for pid={last['pid']}, finishing commit")
            for p in self.participants:
                p.log.append({"type": "COMMIT", "pid": last["pid"], "epoch": last["epoch"]})
            self.txn_state_log.append({"event": "CompleteCommit", "pid": last["pid"], "epoch": last["epoch"]})
            self.current_state = TxnState.COMPLETE_COMMIT
            self.participants = []

# Demo: refunds processor writing to two partitions atomically
refunds = Partition("refunds-emitted-p4")
audit = Partition("refund-audit-p1")
coord = Coordinator()
coord.init_transactions("refunds-1")
coord.add_partition(refunds)
coord.add_partition(audit)
coord.write_data(refunds, {"refund_id": "RFD-9001", "amount_paise": 250000, "merchant": "razorpay-test"})
coord.write_data(audit, {"refund_id": "RFD-9001", "actor": "system", "reason": "merchant-initiated"})
coord.commit()
print("refunds.log:", refunds.log)
print("audit.log:", audit.log)
print("txn_state_log:", coord.txn_state_log)
One run's output follows; the exact pid value varies between runs because Python's hash() is salted:
refunds.log: [
{'type': 'data', 'pid': 24145, 'epoch': 1, 'refund_id': 'RFD-9001', 'amount_paise': 250000, 'merchant': 'razorpay-test'},
{'type': 'COMMIT', 'pid': 24145, 'epoch': 1}
]
audit.log: [
{'type': 'data', 'pid': 24145, 'epoch': 1, 'refund_id': 'RFD-9001', 'actor': 'system', 'reason': 'merchant-initiated'},
{'type': 'COMMIT', 'pid': 24145, 'epoch': 1}
]
txn_state_log: [
{'event': 'INIT', 'pid': 24145, 'epoch': 1},
{'event': 'ONGOING', 'pid': 24145, 'epoch': 1},
{'event': 'PrepareCommit', 'pid': 24145, 'epoch': 1},
{'event': 'CompleteCommit', 'pid': 24145, 'epoch': 1}
]
Walking through the load-bearing lines:
- txn_state_log is the durable transaction-manager log. Every state transition is appended to it. In real Kafka, this is the __transaction_state topic — a normal Kafka topic, replicated like any other, with the producer's transactional.id hashing to one of its partitions. This log plays the role of the WAL in a database transaction manager.
- add_partition drives the EMPTY → ONGOING transition and records ONGOING to the log. Real Kafka does this on the AddPartitionsToTxnRequest RPC. Why this matters: a coordinator recovering from a crash and seeing ONGOING knows the transaction was in flight but no commit decision was reached — recovery aborts it. Recovery seeing PrepareCommit knows the decision was made and finishes the commit. The state itself is the recovery instruction.
- commit writes PrepareCommit before writing the markers. This is the 2PC commit point. The fail_after_prepare=True branch shows what happens if the coordinator dies right then; recover() replays the log and finishes the commit. Try running coord.commit(fail_after_prepare=True); coord.recover() to see it.
- The COMMIT marker is appended to each partition's log alongside the data records. A consumer in read_committed mode reading the partition would see the data record first, hold it in a buffer marked "uncommitted", then see the COMMIT marker and release the data record to the application. Without the marker, the data would be skipped.
- No record is removed on abort. The ABORT marker is appended; consumers in read_committed mode skip the data records that came before it. The records still occupy log space until log retention deletes them. This is why aborting many transactions per second wastes disk.
This 80-line model captures the protocol's load-bearing invariants. Real Kafka adds: per-partition leader election, transaction-id expiration, control-batch wire format, and the WriteTxnMarkersRequest RPC. The state-machine logic is the same.
Read-committed isolation: why consumers care
A consumer reading from a partition that has open transactions sees data records and control batches interleaved. Two isolation levels apply:
- isolation.level=read_uncommitted (default for plain consumers). The consumer reads every data record as soon as it's appended. Open-transaction records are visible. If the transaction later aborts, the consumer has already processed records that "didn't happen".
- isolation.level=read_committed (for stream-processing consumers). The consumer reads up to the Last Stable Offset (LSO) — the offset of the earliest still-open transaction. Records past the LSO are buffered until their transaction's marker arrives: COMMIT marker → release the buffered records to the application, in order. ABORT marker → discard the buffered records.
The LSO is a per-partition value the broker maintains: it advances as transactions close, and it lags the high-water mark by the duration of the longest open transaction. Why this introduces tail latency: a consumer's "freshness" is bounded below by the LSO. If a producer holds a transaction open for 30 seconds (a long aggregation window, say), no read_committed consumer can see records produced in that window for 30 seconds. This is the cost of atomicity — buying it from the producer side is how the consumer side gets to remain simple.
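The LSO rule can be modelled directly. A sketch, assuming the same simplified log-of-dicts shape used by the coordinator model in this chapter (not Kafka's on-disk format):

```python
# Sketch: LSO = offset of the first record of the earliest still-open
# transaction, or the log end if every transaction has its marker.
def last_stable_offset(log):
    first_offset = {}   # pid -> offset of its first data record
    open_pids = set()
    for offset, rec in enumerate(log):
        if rec["type"] == "data":
            first_offset.setdefault(rec["pid"], offset)
            open_pids.add(rec["pid"])
        else:  # a COMMIT or ABORT marker closes that producer's transaction
            open_pids.discard(rec["pid"])
    if open_pids:
        return min(first_offset[p] for p in open_pids)
    return len(log)

log = [
    {"type": "data", "pid": 1},     # txn 1 opens at offset 0
    {"type": "data", "pid": 2},     # txn 2 opens at offset 1
    {"type": "COMMIT", "pid": 1},   # txn 1 closes; txn 2 still open
]
print(last_stable_offset(log))  # 1: read_committed consumers stop here
```

As long as pid 2's transaction stays open, every record from offset 1 onward is invisible to read_committed consumers, which is exactly the tail-latency coupling described above.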
For Razorpay's refunds processor, every downstream — the refunds-display service, the merchant-notification service, the warehouse loader — would be read_committed. None of them ever sees a refund record without its audit row, because both arrive together when the transaction commits.
Common confusions
- "Kafka transactions are like database transactions; they roll back on abort." No — aborted records remain in the log. Only the ABORT control batch is appended; consumers in read_committed mode skip the data records during read. Disk is consumed until log retention deletes the segment. Aborting at high rates wastes storage.
- "transactional.id is a per-transaction identifier." No — it's a per-producer-instance identifier. One transactional.id runs many transactions over its lifetime. The unique transaction identity is (PID, epoch, txn_sequence), all internal. The application chooses transactional.id once per logical producer (e.g., refunds-svc-shard-7) and never changes it across restarts.
- "Two-phase commit blocks indefinitely if a participant dies." Classical 2PC has that property; Kafka's variant does not, because the participants are partition leaders and the coordinator's __transaction_state is itself replicated. If a partition leader dies, the new leader has the same uncommitted data records (replicated as part of the partition) and accepts the COMMIT marker on recovery. The blocking failure mode in classical 2PC (TM dies after PrepareCommit) is handled by __transaction_state's replication — a new coordinator replays the log and finishes.
- "Transactions cover the consumer's database write to my MySQL sink." No — Kafka transactions only cover writes to Kafka topics and Kafka's own __consumer_offsets. A sink that writes to an external system (MySQL, S3, an HTTP webhook) needs its own 2PC handshake — typically Flink's two-phase-commit sink, covered in /wiki/flinks-two-phase-commit-sink.
- "Read-committed consumers see the same data as read-uncommitted, just later." Mostly true, but with a subtle order difference: read_committed releases records grouped by their commit marker, so a single transaction's records arrive together. read_uncommitted interleaves data from concurrent transactions in offset order. For applications that depend on transactional grouping (e.g., a refund and its audit row must arrive in the same poll batch), read_committed is mandatory.
- "I can use the transactional producer for fire-and-forget logs." You can, but the per-transaction overhead (the prepare marker, the participant marker writes, the LSO impact on consumers) is a poor fit for non-atomic logging. Use enable.idempotence=true alone for at-least-once-with-no-duplicates, and reserve transactions for the cases where atomicity across partitions or the offset commit is genuinely needed.
Going deeper
The transaction-state machine in __transaction_state
Kafka's __transaction_state topic is keyed by transactional.id and stores TransactionMetadata records with the full state machine: Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead. The keying-by-transactional.id means every state change for one producer hashes to the same partition, so the coordinator's view is consistent. The Dead state is reached when the metadata is being garbage-collected after transactional.id.expiration.ms elapses with no activity. The state machine has an explicit transition table — readable in TransactionStateManager.scala in the Apache Kafka source — and any transition not in the table is a hard error. This is the level of formality that makes the protocol verifiable: you can read the state diagram and prove (manually) that every crash point recovers to a consistent state.
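The transition-table idea can be sketched as data plus a single guard. The table below is a simplified illustration of the pattern, not a transcription of TransactionStateManager.scala (the real table has more transitions, for example around epoch bumps):

```python
# Sketch: explicit state machine where any transition not in the table is a
# hard error, the property that makes crash-recovery reasoning tractable.
VALID_TRANSITIONS = {
    "Empty":          {"Ongoing"},
    "Ongoing":        {"PrepareCommit", "PrepareAbort"},
    "PrepareCommit":  {"CompleteCommit"},   # forward-only past the decision
    "PrepareAbort":   {"CompleteAbort"},
    "CompleteCommit": {"Ongoing", "Dead"},  # next txn, or GC on expiration
    "CompleteAbort":  {"Ongoing", "Dead"},
    "Dead":           set(),
}

def transition(current: str, target: str) -> str:
    if target not in VALID_TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current} -> {target}")
    return target

state = "Empty"
for step in ("Ongoing", "PrepareCommit", "CompleteCommit"):
    state = transition(state, step)
print(state)  # CompleteCommit
```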
Why transactional.id is global, not per-process
A common confusion is to set transactional.id = "refunds-svc-" + uuid() so every pod has a unique ID. This breaks zombie fencing: if a pod restarts, the new pod gets a fresh transactional.id, the broker sees no prior epoch to bump, and the old pod's transactions can complete in parallel with the new pod's. The correct pattern is to derive transactional.id from a stable shard identity — "refunds-svc-" + shard_index — so that a pod restart for shard 7 collides with the previous pod's transactional.id, bumps the epoch, and fences the zombie. In Kubernetes terms: transactional.id should come from the StatefulSet pod ordinal, not from hostname. The Razorpay refunds service learnt this in early 2024 when a horizontal pod autoscaler was scaling shards independently and four pods ended up sharing the same workload due to mis-configured transactional.id derivation, which manifested as occasional duplicate refund emissions during deployment windows.
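The fencing mechanics reduce to one comparison. A toy broker-side model; the class and method names are invented for illustration:

```python
# Sketch: epoch fencing. The broker keeps one current epoch per
# transactional.id; any request carrying a stale epoch is rejected.
class FencingBroker:
    def __init__(self):
        self.epochs = {}  # transactional.id -> current epoch

    def init_producer(self, txn_id: str) -> int:
        self.epochs[txn_id] = self.epochs.get(txn_id, 0) + 1
        return self.epochs[txn_id]

    def produce(self, txn_id: str, epoch: int) -> bool:
        return epoch == self.epochs[txn_id]   # stale epoch means zombie, rejected

broker = FencingBroker()
old = broker.init_producer("refunds-svc-shard-7")  # original pod for shard 7
new = broker.init_producer("refunds-svc-shard-7")  # restarted pod, same id
print(broker.produce("refunds-svc-shard-7", old))  # False: zombie fenced
print(broker.produce("refunds-svc-shard-7", new))  # True
```

With transactional.id = "refunds-svc-" + uuid(), the two init_producer calls would hit different keys, both epochs would be 1, and both pods would pass the check, which is exactly the bug described above.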
The cost of read_committed on tail latency
The LSO advances only as transactions close. If a producer opens a transaction every 100ms and commits every 100ms, read_committed consumers see records ~100ms after they're produced — a small constant. But a producer that batches large windows — say, a Flink job with a 30-second checkpoint interval — keeps transactions open for the checkpoint duration, and the LSO sits ~30 seconds behind the high-water mark. Downstream consumers see this as a 30-second delay before any record from a checkpoint becomes visible. For a real-time analytics pipeline at Zerodha that reads tick data, this is the difference between "5-second p99 dashboard freshness" and "35-second p99 dashboard freshness" — entirely a function of the upstream's transaction duration. Tuning Flink's checkpoint interval is partly a tail-latency knob for this reason.
What happens to consumer offsets on abort
When a transactional producer calls sendOffsetsToTransaction(offsets, group), those offsets are tagged with the transaction. If the transaction commits, the offsets are appended to __consumer_offsets as committed. If it aborts, they are not. This is the essential trick that lets a consumer-process-produce loop be exactly-once: the offset commit and the produce both succeed or both fail. On restart, the consumer reads its committed offset (from before the failed transaction) and re-processes, and the dedup is handled by the producer's idempotent retries plus the transactional re-issue. Without sendOffsetsToTransaction, the offset commit would be a separate API call and could succeed while the produce failed — duplicating exactly the problem we set out to solve.
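A minimal model of offsets riding in the transaction; the class below is illustrative and not the __consumer_offsets format:

```python
# Sketch: offsets sent via sendOffsetsToTransaction are staged with the
# transaction and only become visible if the transaction commits.
class TxnOffsetStore:
    def __init__(self):
        self.committed = {}   # (group, topic-partition) -> offset
        self.pending = {}

    def send_offsets_to_transaction(self, group, offsets):
        for tp, off in offsets.items():
            self.pending[(group, tp)] = off

    def end_txn(self, commit: bool):
        if commit:
            self.committed.update(self.pending)
        self.pending.clear()  # on abort, the old offset stays; restart re-reads

store = TxnOffsetStore()
store.send_offsets_to_transaction("refunds-grp", {"settlements-0": 4212})
store.end_txn(commit=False)
print(store.committed)   # {}: the aborted txn advanced nothing
store.send_offsets_to_transaction("refunds-grp", {"settlements-0": 4212})
store.end_txn(commit=True)
print(store.committed)   # {('refunds-grp', 'settlements-0'): 4212}
```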
Comparing to PostgreSQL's prepared transactions
PostgreSQL exposes 2PC directly via PREPARE TRANSACTION 'name' and COMMIT PREPARED 'name' / ROLLBACK PREPARED 'name'. The mechanism is the same: a prepared transaction is durable, surviving crashes; a recovered postmaster will see prepared transactions in pg_prepared_xacts and wait for an external coordinator to issue the final commit or rollback. Kafka's transaction coordinator plays the role of that external coordinator, but inside Kafka itself. The interesting cross-system pattern is when an application wants to atomically write to both Kafka and Postgres — that requires a third-party 2PC coordinator (or, more commonly in production, an outbox pattern: write to Postgres in a single transaction including an outbox row, and a CDC consumer ships the outbox row to Kafka with at-least-once semantics, which the consumer-side dedup handles).
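The outbox pattern mentioned at the end can be sketched with sqlite3 standing in for Postgres; the table names and columns are invented for illustration:

```python
# Sketch: the outbox pattern. The business row and the outbox row share one
# local DB transaction, so the event can never exist without the state change.
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE refunds (id TEXT PRIMARY KEY, amount_paise INTEGER)")
conn.execute("CREATE TABLE outbox (seq INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT)")

# One atomic transaction covers both inserts.
with conn:
    conn.execute("INSERT INTO refunds VALUES (?, ?)", ("RFD-9001", 250000))
    conn.execute("INSERT INTO outbox (payload) VALUES (?)",
                 (json.dumps({"refund_id": "RFD-9001", "amount_paise": 250000}),))

# A CDC process (e.g. Debezium) would now ship outbox rows to Kafka with
# at-least-once delivery; consumer-side dedup absorbs the duplicates.
rows = conn.execute("SELECT payload FROM outbox ORDER BY seq").fetchall()
print(rows[0][0])
```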
Where this leads next
The next chapters compose the transactional producer with a full processing graph:
- /wiki/end-to-end-source-process-sink — the closed loop: a transactional consumer-producer that reads from one topic, processes, writes to another, and commits the source offset, all in one transaction.
- /wiki/kafka-transactions-under-the-hood — the wire-protocol view: every RPC, every byte, every recovery path.
- /wiki/flinks-two-phase-commit-sink — what happens when the sink is not Kafka: how Flink generalises 2PC for arbitrary external systems.
The pattern to internalise: idempotence handles dedup within one partition's record stream; transactions handle atomicity across partitions and across the offset commit; the sink contract handles atomicity to the outside world. Each layer solves one specific problem, and stacking them gives you end-to-end exactly-once.
References
- KIP-98: Exactly Once Delivery and Transactional Messaging — the canonical design doc; transactional sections are in the second half.
- Transactions in Apache Kafka — Confluent blog — the protocol diagrams (control batches, LSO, transaction state machine) that this article adapts.
- Apache Kafka source: TransactionStateManager.scala — the state-machine implementation; readable in one afternoon.
- Gray and Reuter, Transaction Processing: Concepts and Techniques, Chapter 12 — the classical 2PC reference that Kafka's design echoes; the prepare-commit point is on page 562.
- PostgreSQL: Two-Phase Commit — PREPARE TRANSACTION semantics; useful for cross-system 2PC intuition.
- /wiki/idempotent-producers-and-sequence-numbers — the previous chapter; the dedup substrate that transactions build on.
- Confluent: Exactly-Once Semantics Are Possible — Here's How Kafka Does It — Neha Narkhede's original announcement post; useful context for the design rationale.