Kafka transactions under the hood
A Razorpay payments engineer is staring at a heap dump from a producer that hung for 47 seconds before throwing ProducerFencedException. She knows the protocol from the /wiki/transactional-writes-2pc-wearing-a-hat chapter — prepare, commit, markers — but knowing the protocol is not the same as knowing which RPC was in flight when the producer was fenced, what bytes were on the wire, or which broker was the coordinator. To debug a transactional producer in production, you need the wire-level view: the six request types, the __transaction_state record format, and the recovery state machine.
A Kafka transaction is six RPCs (FindCoordinator, InitProducerId, AddPartitionsToTxn, Produce, AddOffsetsToTxn, EndTxn) plus a control-batch write the broker performs internally (WriteTxnMarkers). All durable state lives in __transaction_state, a 50-partition compacted topic keyed by transactional.id. Recovery is mechanical: read the log, replay state transitions, finish whatever was in PrepareCommit or PrepareAbort, abort everything else.
The six RPCs in order
Every transactional producer makes the same RPC sequence per transaction. The order is fixed by the protocol — running them out of order returns INVALID_TXN_STATE (error code 48). The full list, in the order a transactional producer issues them:
1. FindCoordinator(transactional.id) — bootstrap. Producer asks any broker "who's the coordinator for my transactional.id?". The broker hashes the id to a __transaction_state partition (default 50 partitions), looks up that partition's leader, and returns its node_id and host:port. Sent once; cached for the producer's lifetime.
2. InitProducerId(transactional.id, transaction.timeout.ms) — handshake. Coordinator allocates a producer_id (PID, a 64-bit integer), bumps producer_epoch by 1, persists (transactional.id → PID, epoch) to __transaction_state, and returns (PID, epoch) to the producer. This is also where zombies get fenced: the bumped epoch invalidates any older producer instance still trying to write.
3. AddPartitionsToTxn(PID, epoch, [topic-partition, ...]) — once per topic-partition the transaction will touch. Coordinator records "this PID's open transaction now includes partition X" in __transaction_state. The state machine flips from Empty to Ongoing.
4. Produce(PID, epoch, sequence, records) — the actual data writes. Same RPC as a non-transactional producer's, but the record batch header carries producer_id, producer_epoch, base_sequence, and is_transactional=true. Partition leaders accept these and append to their log; the records are physically present but flagged as part of an open transaction.
5. AddOffsetsToTxn(PID, epoch, group_id) + TxnOffsetCommit(PID, epoch, [topic-partition, offset, ...]) — only for the consumer-process-produce loop. The first RPC tells the coordinator "the offsets I'm about to commit for consumer group group_id are part of this transaction". The second RPC actually commits the offsets to __consumer_offsets, but tagged with the transaction.
6. EndTxn(PID, epoch, commit=true|false) — the trigger. Coordinator writes PrepareCommit (or PrepareAbort) to __transaction_state, then issues WriteTxnMarkers to every participant partition's leader, then writes CompleteCommit (or CompleteAbort).
There's a seventh RPC — WriteTxnMarkers — but it's broker-to-broker. The producer never sees it. It's how the coordinator instructs each participant partition's leader to append the COMMIT/ABORT control batch.
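As a cross-check, here is how the Java producer's public API calls line up with the wire RPCs above — a hand-built sketch of the mapping (not generated from the client source; the two "send" variants are informal labels, not method names):

```python
# Which wire RPCs each producer API call triggers, per the sequence above.
# RPC names mirror ApiKeys; beginTransaction is purely client-local.
API_TO_RPCS = {
    "initTransactions":            ["FindCoordinator", "InitProducerId"],
    "beginTransaction":            [],                  # no RPC: flips local state only
    "send (first to a partition)": ["AddPartitionsToTxn", "Produce"],
    "send (same partition again)": ["Produce"],
    "sendOffsetsToTransaction":    ["AddOffsetsToTxn", "TxnOffsetCommit"],
    "commitTransaction":           ["EndTxn"],
    "abortTransaction":            ["EndTxn"],
}

for api, rpcs in API_TO_RPCS.items():
    print(f"{api:30s} → {', '.join(rpcs) or '(no RPC)'}")
```

The asymmetry is the point: one API call can fan out into two RPCs, and the most frequently called API (send) usually issues zero coordinator RPCs at all.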
The __transaction_state topic — the durable log
__transaction_state is a normal Kafka topic with two unusual properties: it is compacted (only the latest record per key survives), and it is keyed by transactional.id. Default 50 partitions, replication factor 3 (configurable via transaction.state.log.replication.factor). The leader of partition hash(transactional.id) % 50 is the transaction coordinator for that producer.
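The id-to-partition mapping can be reproduced outside the broker — handy when you need to know which broker's logs to pull for a given producer. A sketch, assuming the broker hashes with Java's String.hashCode and masks the sign bit (java_string_hash below is our own emulation of that, not Kafka code):

```python
def java_string_hash(s: str) -> int:
    """Emulate Java String.hashCode: h = 31*h + c with 32-bit signed overflow."""
    h = 0
    for c in s:
        h = (31 * h + ord(c)) & 0xFFFFFFFF
    return h - 0x1_0000_0000 if h >= 0x8000_0000 else h

def coordinator_partition(transactional_id: str, num_partitions: int = 50) -> int:
    # Masking the sign bit stands in for the broker's abs(); the modulus picks
    # the __transaction_state partition whose leader is the coordinator.
    return (java_string_hash(transactional_id) & 0x7FFFFFFF) % num_partitions

print(coordinator_partition("refunds-1"))  # stable across runs, unlike Python's hash()
```

Note that Python's built-in hash() is salted per interpreter run, so it cannot reproduce the broker's mapping — hence the explicit emulation.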
Each record value is a serialised TransactionMetadata struct:
TransactionMetadata {
  producer_id: int64                 # PID, monotonically allocated
  last_producer_id: int64            # the previous PID (for fencing)
  producer_epoch: int16              # bumped on every InitProducerId
  last_producer_epoch: int16
  txn_timeout_ms: int32              # how long before coordinator auto-aborts
  state: enum {Empty, Ongoing, PrepareCommit, PrepareAbort,
               CompleteCommit, CompleteAbort, Dead, PrepareEpochFence}
  topic_partitions: list<TopicPartition>  # who participates in the open txn
  txn_start_timestamp: int64
  txn_last_update_timestamp: int64
}
Why compaction and not retention: the transaction state for a given transactional.id is overwritten on every state transition (Empty → Ongoing → PrepareCommit → CompleteCommit). Only the latest matters for recovery. Compaction guarantees that even after years, the latest TransactionMetadata for every active transactional.id is preserved, while obsolete intermediate states are garbage-collected by the cleaner thread. Without compaction, the log would grow unbounded; with retention-by-time, you'd lose state for long-idle producers.
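The compaction guarantee fits in a few lines — a toy cleaner (our own sketch, not the broker's cleaner thread) that keeps only the newest record per key:

```python
# Minimal model of what log compaction guarantees for __transaction_state:
# after a cleaning pass, only the latest record per key survives.
def compact(log):
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)      # later records overwrite earlier ones
    return sorted(latest.values())         # survivors, in offset order

log = [("refunds-1", "Ongoing"), ("payouts-7", "Ongoing"),
       ("refunds-1", "PrepareCommit"), ("refunds-1", "CompleteCommit")]
print(compact(log))
# → [(1, 'Ongoing'), (3, 'CompleteCommit')] — intermediate refunds-1 states are gone
```

The intermediate Ongoing and PrepareCommit records for refunds-1 vanish, but the latest state per key — all recovery needs — is intact, regardless of how old it is.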
The state machine has eight values, but only six matter for steady-state operation. The transitions are:
            InitProducerId (epoch++)
  Empty ───────────────────────────► Empty
    │
    │ AddPartitionsToTxn
    ▼
  Ongoing ── EndTxn(commit=true) ──► PrepareCommit ──► CompleteCommit ──► Empty
    │
    └──── EndTxn(commit=false) ────► PrepareAbort ───► CompleteAbort ───► Empty
Dead is reached when transactional.id.expiration.ms (default 7 days) elapses without activity — the metadata is tombstoned and compacted away. PrepareEpochFence is an internal transient state used to prevent a race during epoch bumping; you'll see it in stack traces but never in steady-state logs.
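The diagram can be enforced as a lookup table — a simplified sketch (Dead and PrepareEpochFence edges omitted) of which transitions a coordinator would accept; anything else earns INVALID_TXN_STATE:

```python
# Legal state transitions for the transaction coordinator (simplified sketch).
LEGAL = {
    "Empty":          {"Empty", "Ongoing"},     # epoch bump / first AddPartitionsToTxn
    "Ongoing":        {"Ongoing", "PrepareCommit", "PrepareAbort"},
    "PrepareCommit":  {"CompleteCommit"},       # forward-only past the prepare
    "PrepareAbort":   {"CompleteAbort"},
    "CompleteCommit": {"Empty", "Ongoing"},     # next txn under the same id
    "CompleteAbort":  {"Empty", "Ongoing"},
}

def check_transition(cur: str, nxt: str) -> str:
    if nxt not in LEGAL.get(cur, set()):
        raise ValueError(f"INVALID_TXN_STATE (48): {cur} -> {nxt}")
    return nxt

check_transition("Ongoing", "PrepareCommit")   # fine
# check_transition("PrepareCommit", "PrepareAbort") would raise: no changing
# your mind after the prepare record is durable.
```

The one-way Prepare* rows are the 2PC invariant in miniature: once the decision is logged, the only legal move is to finish it.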
A wire-protocol-faithful coordinator
The cleanest way to internalise the wire protocol is to write a coordinator that responds to the actual RPC types and records the actual state-machine fields. This is closer to the real broker code than the /wiki/transactional-writes-2pc-wearing-a-hat sketch — fields named the same as in TransactionMetadata.scala, RPCs named the same as in ApiKeys.java.
# txn_wire.py — wire-faithful Kafka transaction coordinator (single broker, single client)
import copy
from dataclasses import dataclass, field
from enum import IntEnum

class TxnState(IntEnum):
    EMPTY = 0
    ONGOING = 1
    PREPARE_COMMIT = 2
    PREPARE_ABORT = 3
    COMPLETE_COMMIT = 4
    COMPLETE_ABORT = 5
    DEAD = 6
    PREPARE_EPOCH_FENCE = 7

@dataclass
class TransactionMetadata:
    transactional_id: str
    producer_id: int = -1
    last_producer_id: int = -1
    producer_epoch: int = -1
    last_producer_epoch: int = -1
    txn_timeout_ms: int = 60_000
    state: TxnState = TxnState.EMPTY
    topic_partitions: list = field(default_factory=list)

@dataclass
class TxnLogRecord:
    key: str                      # transactional.id
    value: TransactionMetadata    # snapshot at append time

# The __transaction_state partition log (compacted in real Kafka; here we keep history)
TXN_LOG: list = []
META: dict = {}                   # in-memory view of latest metadata per transactional.id
NEXT_PID: int = 1000

def log_append(txn_id: str, meta: TransactionMetadata):
    # Append a snapshot, not the live object — otherwise every log record would
    # alias one mutable metadata instance and replaying the log would be meaningless.
    TXN_LOG.append(TxnLogRecord(key=txn_id, value=copy.deepcopy(meta)))

def find_coordinator(txn_id: str) -> int:
    # In real Kafka: abs(hash of txn_id) % 50, return that partition's leader node_id.
    # A toy byte-sum hash keeps the demo deterministic (Python's hash() is salted per run).
    return sum(txn_id.encode()) % 50

def init_producer_id(txn_id: str) -> tuple[int, int]:
    global NEXT_PID
    meta = META.get(txn_id)
    if meta is None:
        meta = TransactionMetadata(transactional_id=txn_id, producer_id=NEXT_PID)
        NEXT_PID += 1
    # Bump epoch unconditionally — fences any zombie still using the old (PID, epoch)
    meta.last_producer_id = meta.producer_id
    meta.last_producer_epoch = meta.producer_epoch
    meta.producer_epoch += 1      # first call: default -1 → 0
    META[txn_id] = meta
    log_append(txn_id, meta)
    return meta.producer_id, meta.producer_epoch

def add_partitions_to_txn(txn_id: str, pid: int, epoch: int, parts: list):
    meta = META[txn_id]
    if (pid, epoch) != (meta.producer_id, meta.producer_epoch):
        raise Exception("INVALID_PRODUCER_EPOCH (47) — fenced")
    if meta.state in (TxnState.EMPTY, TxnState.COMPLETE_COMMIT, TxnState.COMPLETE_ABORT):
        meta.state = TxnState.ONGOING
    meta.topic_partitions.extend(parts)
    log_append(txn_id, meta)

def end_txn(txn_id: str, pid: int, epoch: int, commit: bool):
    meta = META[txn_id]
    if (pid, epoch) != (meta.producer_id, meta.producer_epoch):
        raise Exception("INVALID_PRODUCER_EPOCH (47) — fenced")
    # Phase 1: durable commit decision
    meta.state = TxnState.PREPARE_COMMIT if commit else TxnState.PREPARE_ABORT
    log_append(txn_id, meta)
    # Phase 2: WriteTxnMarkers to each participant (simulated)
    marker = "COMMIT" if commit else "ABORT"
    for tp in meta.topic_partitions:
        print(f"  WriteTxnMarkers → {tp}: append {marker} control batch (PID={pid}, epoch={epoch})")
    # Phase 3: completion
    meta.state = TxnState.COMPLETE_COMMIT if commit else TxnState.COMPLETE_ABORT
    meta.topic_partitions = []
    log_append(txn_id, meta)

def recover():
    """Replay TXN_LOG to rebuild META; finish any in-flight Prepare* state."""
    META.clear()
    for rec in TXN_LOG:
        META[rec.key] = copy.deepcopy(rec.value)
    for txn_id, meta in META.items():
        if meta.state in (TxnState.PREPARE_COMMIT, TxnState.PREPARE_ABORT):
            commit = meta.state == TxnState.PREPARE_COMMIT
            marker = "COMMIT" if commit else "ABORT"
            print(f"  [recover] {txn_id}: {meta.state.name} → finishing {marker.lower()}")
            for tp in meta.topic_partitions:
                print(f"  WriteTxnMarkers → {tp}: append {marker}")
            meta.state = TxnState.COMPLETE_COMMIT if commit else TxnState.COMPLETE_ABORT
            meta.topic_partitions = []
            log_append(txn_id, meta)

# Demo
print(f"Coordinator partition for 'refunds-1': {find_coordinator('refunds-1')}")
pid, epoch = init_producer_id("refunds-1")
print(f"InitProducerId → PID={pid}, epoch={epoch}")
add_partitions_to_txn("refunds-1", pid, epoch, ["refunds-emitted-4", "refund-audit-1"])
print(f"AddPartitionsToTxn → state={META['refunds-1'].state.name}")
end_txn("refunds-1", pid, epoch, commit=True)
print(f"EndTxn → state={META['refunds-1'].state.name}")

# Zombie scenario: a restarted instance re-inits, bumping the epoch to 1
init_producer_id("refunds-1")
try:
    add_partitions_to_txn("refunds-1", pid, epoch, parts=["x"])  # zombie still on epoch 0
except Exception as e:
    print(f"Zombie blocked: {e}")

Coordinator partition for 'refunds-1': 3
InitProducerId → PID=1000, epoch=0
AddPartitionsToTxn → state=ONGOING
  WriteTxnMarkers → refunds-emitted-4: append COMMIT control batch (PID=1000, epoch=0)
  WriteTxnMarkers → refund-audit-1: append COMMIT control batch (PID=1000, epoch=0)
EndTxn → state=COMPLETE_COMMIT
Zombie blocked: INVALID_PRODUCER_EPOCH (47) — fenced
Walking through the load-bearing lines:
- init_producer_id is the fencing point. It bumps producer_epoch unconditionally on every call. If a zombie still holds (PID, epoch=0) and a new producer instance with the same transactional.id calls InitProducerId, the coordinator stores epoch=1 in __transaction_state. Any subsequent RPC from the zombie carries (PID, epoch=0) and gets INVALID_PRODUCER_EPOCH (error 47). Why this works without a separate fencing message: every transactional RPC carries (PID, epoch), and the coordinator validates against the latest stored value before doing anything. Fencing is a compare operation, not a send operation. The zombie isn't told it's fenced — it's silently rejected on its next RPC.
- end_txn writes PREPARE_COMMIT to TXN_LOG before simulating WriteTxnMarkers. This is the durable commit point. If the coordinator crashed at this exact line, recovery would see PREPARE_COMMIT in the log and finish the commit — the transaction is forward-only past this point.
- recover is the entire crash-recovery story. It rebuilds META from TXN_LOG and finishes any PREPARE_* state by writing the markers and advancing to COMPLETE_*. Real Kafka does this when a __transaction_state partition leader changes — the new leader replays the partition's log, rebuilds the TransactionMetadata map, and finishes prepared transactions in the background.
- TXN_LOG is append-only and keyed by transactional.id. Compaction (not modelled here) would garbage-collect intermediate states, keeping only the latest TransactionMetadata per transactional.id. Why intermediate states can be discarded: state-machine recovery only needs the latest state per transactional.id. If the latest is COMPLETE_COMMIT, the previous PREPARE_COMMIT and ONGOING records add no information. Kafka's log cleaner can discard them; the active state remains.
The model above misses two pieces of real Kafka: per-broker WriteTxnMarkers requests with delivery acks, and transactional.id.expiration.ms cleanup. Add those and you have a faithful single-broker simulator. The state-machine logic is unchanged.
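A third omission worth noting is the coordinator's transaction.timeout.ms sweep. A self-contained sketch (the real broker runs this off a scheduler thread; the names here are our own, not broker code):

```python
# Which open transactions has the coordinator's timeout sweep decided to abort?
def abort_timed_out(open_txns: dict, now_ms: int) -> list:
    """open_txns: {txn_id: (start_ms, timeout_ms)}. Returns ids to auto-abort."""
    return [txn_id for txn_id, (start, timeout) in open_txns.items()
            if now_ms - start > timeout]

txns = {"refunds-1": (1_000, 60_000),    # open for 69s at t=70s — past its timeout
        "payouts-7": (50_000, 60_000)}   # open for 20s — still fine
print(abort_timed_out(txns, now_ms=70_000))  # → ['refunds-1']
```

Each abort then follows the normal EndTxn(commit=false) path: PrepareAbort, ABORT markers, CompleteAbort — with an epoch bump so the stuck producer is fenced if it ever wakes up.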
Recovery — what happens when the coordinator dies
The __transaction_state topic is replicated like any topic (default RF=3), so a coordinator partition leader dying triggers a normal Kafka leader election. The new leader becomes the coordinator for every transactional.id that hashes to that partition. Recovery is mechanical:
- Read the partition log from start to high-water mark. Build the in-memory TransactionMetadata map, keyed by transactional.id.
- For each transactional.id whose latest state is PrepareCommit — issue WriteTxnMarkers(COMMIT) to every participant partition listed in topic_partitions, then write CompleteCommit to __transaction_state.
- For each whose latest state is PrepareAbort — same, but WriteTxnMarkers(ABORT) and CompleteAbort.
- For each whose latest state is Ongoing — leave it open. The producer never issued EndTxn; it will rediscover the new coordinator and finish the transaction itself, or transaction.timeout.ms will elapse and the coordinator will auto-abort.
- For each whose latest state is Empty/Complete* — nothing to do. Steady state.
- For each whose txn_last_update_timestamp is older than transactional.id.expiration.ms — tombstone it. State becomes Dead; compaction will remove the record.
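Recovery reduces to a pure function of the latest logged state. A sketch (expiry timestamps omitted; exactly how eagerly Ongoing is handled is a policy choice — here it is deferred to transaction.timeout.ms rather than decided at failover):

```python
# What a new coordinator does for each transactional.id, keyed on the
# latest state it replayed from __transaction_state.
def recovery_action(latest_state: str) -> str:
    return {
        "PrepareCommit":  "write COMMIT markers, then CompleteCommit",
        "PrepareAbort":   "write ABORT markers, then CompleteAbort",
        "Ongoing":        "leave open; producer finishes or transaction.timeout.ms aborts",
        "Empty":          "nothing",
        "CompleteCommit": "nothing",
        "CompleteAbort":  "nothing",
    }[latest_state]

for s in ("PrepareCommit", "Ongoing", "Empty"):
    print(f"{s:15s} → {recovery_action(s)}")
```

The function is total over steady-state values and deterministic — two coordinators replaying the same log always reach the same decisions, which is why failover needs no extra coordination.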
The recovery duration is bounded by the number of in-flight transactions for that coordinator partition, not by the partition's log size. A coordinator with 100 active producers and one in-flight transaction each recovers in ~tens of milliseconds. Why this works without a separate quorum protocol: __transaction_state is replicated by the same ISR mechanism every Kafka topic uses, and its writes are acknowledged by the in-sync replicas — a newly elected in-sync leader already holds every committed state record. The recovery procedure is just deterministic state-machine replay; it does not need consensus beyond what Kafka's normal replication already provides.
Common confusions
- "InitProducerId allocates a new PID on every call." No — it allocates a new PID only the first time a transactional.id is seen. Subsequent calls reuse the existing PID and only bump the epoch. The PID is stable for the lifetime of the transactional.id (until transactional.id.expiration.ms elapses).
- "The coordinator and the partition leader are the same broker." Not in general. The coordinator is the leader of the __transaction_state partition derived from hash(transactional.id) % 50. Each participant partition has its own leader, possibly on a different broker. WriteTxnMarkers is a broker-to-broker RPC precisely because the coordinator and the participant leaders are usually distinct.
- "EndTxn returns when markers are written." No — the coordinator responds as soon as PrepareCommit (or PrepareAbort) is durable in __transaction_state; WriteTxnMarkers and the Complete* record happen asynchronously afterwards. The decision is irrevocable from the Prepare* write onward, which is why the early response is safe — but it also means a read_committed consumer may not see the committed data for a short window after commitTransaction() returns, while the markers are still landing.
- "Compaction in __transaction_state is the same as a regular topic." It's the same mechanism but with one wrinkle: tombstones (null values) for Dead transactions are subject to delete.retention.ms. If you set this too low, an in-flight transaction whose producer just woke up after a long pause might find its metadata already deleted, and InitProducerId will allocate a fresh PID — losing fencing. Razorpay learnt this the hard way in 2024 when a misconfigured delete.retention.ms=60000 on a staging cluster let a paused-then-resumed producer duplicate a refund.
- "You can have multiple in-flight transactions per transactional.id." No. One transactional.id runs one transaction at a time. The coordinator's state machine has one slot per id. To run more in parallel, use distinct transactional.ids (e.g., one per shard).
- "AddPartitionsToTxn is sent for every record." It's sent only the first time the producer writes to a new partition within the current transaction. Subsequent writes to the same partition skip the RPC. The coordinator already knows the partition is a participant.
Going deeper
What INVALID_TXN_STATE (error 48) actually means
INVALID_TXN_STATE is the broker's way of saying "your RPC doesn't fit my view of your transaction state". Common triggers: calling AddPartitionsToTxn after EndTxn returned but before beginTransaction, or calling EndTxn(commit=true) when the coordinator has already started auto-aborting because transaction.timeout.ms elapsed. Reading the error code alone is uninformative; correlate with __transaction_state records around the producer's PID. The kafka-transactions.sh describe --transactional-id <id> CLI command reads the coordinator's transaction state and prints the latest TransactionMetadata for a given transactional.id — the first thing to run when debugging this error.
Why the default __transaction_state partition count is 50
The number of __transaction_state partitions caps the number of independent coordinators. With 50 partitions, you get 50 coordinator slots distributed across brokers; for a 6-broker cluster, that's ~8 coordinators per broker, enough to balance load even if a few brokers are also handling heavy data partitions. Increasing past 50 is rarely justified (each additional partition costs a tiny amount of metadata overhead), but for very large clusters with millions of transactional.ids — Cred's reward-events pipeline runs ~80k distinct transactional.ids — bumping transaction.state.log.num.partitions to 100 or 200 spreads coordinator load. The trade-off is that you cannot decrease this number after cluster creation without a manual log-and-data migration.
How transaction.timeout.ms interacts with consumer-side LSO
A producer that hangs holds its transaction open until transaction.timeout.ms (default 60s, max 15 min) elapses, at which point the coordinator forcibly aborts. During that window, the LSO on every participant partition is pinned at the start of that producer's data — read_committed consumers stall. So the operational implication: the transaction.timeout.ms you set is also the worst-case tail latency seen by every read_committed consumer of every participant partition, on the path of one stuck producer. Setting it to 15 minutes "just to be safe" can mean 15-minute consumer stalls. Zerodha's tick-data pipeline keeps it at 30s for this reason — they would rather have stuck transactions auto-abort and producers retry than have consumer dashboards stall.
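The stall mechanics are easy to model. A sketch (toy log format, our own) of how the earliest open transaction pins the last stable offset, hiding everything after it from read_committed consumers:

```python
# A partition log as (offset, status, payload). One stuck producer's record
# at offset 1 belongs to a still-open transaction.
log = [
    (0, "committed", "tick-1"),
    (1, "open-txn",  "refund-A"),   # open transaction: pins the LSO here
    (2, "committed", "tick-2"),
    (3, "committed", "tick-3"),
]

def last_stable_offset(log):
    for offset, status, _ in log:
        if status == "open-txn":
            return offset           # LSO = first offset of the earliest open txn
    return len(log)                 # no open txn: LSO = high-water mark

lso = last_stable_offset(log)
visible = [payload for offset, _, payload in log if offset < lso]
print(f"LSO={lso}; read_committed sees {visible}")
# → LSO=1; read_committed sees ['tick-1']
```

Note that tick-2 and tick-3 are committed data from healthy producers, yet they stay invisible until the stuck transaction at offset 1 commits or is aborted — one producer's hang is every read_committed consumer's latency.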
The last_producer_id field — handling the rare "double-bump"
When a producer reconnects after a long network partition, it may not know whether the coordinator already bumped its epoch. The coordinator stores last_producer_id and last_producer_epoch precisely to recognise the previous identity. If the producer's InitProducerId arrives with (producer_id=last_producer_id, producer_epoch=last_producer_epoch+1), the coordinator knows this is the legitimate new instance recovering, and assigns the next epoch deterministically. Without this field, two consecutive coordinator failovers could mis-attribute a fence as a new producer. The field exists for the same reason database systems track previous LSN in WAL records: deterministic recovery across failures of the recovery mechanism itself.
Why the producer caches FindCoordinator and what happens when the coordinator moves
The producer caches (transactional.id → coordinator node_id) after the first FindCoordinator call. When the __transaction_state partition's leader moves (broker restart, leader election, partition reassignment), the producer's next RPC to the old leader returns NOT_COORDINATOR (error 16). The producer transparently retries FindCoordinator, gets the new leader, and resends the RPC. From the application's perspective, this is invisible — Producer.send() doesn't fail; it just blocks a little longer. transaction.timeout.ms is the upper bound on how long this can take before the transaction is force-aborted. Watching producer.send() p99 latency for spikes correlated with broker rolls is a useful signal that coordinator failover is happening more often than you'd think.
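The cache-invalidate-retry loop looks like this — a hypothetical client-side sketch (NotCoordinator, send_rpc, and the broker ids are stand-ins of our own, not the real client API):

```python
class NotCoordinator(Exception):
    """Stand-in for NOT_COORDINATOR (error 16) in an RPC response."""

class CoordinatorCache:
    def __init__(self, lookup):
        self.lookup = lookup        # stand-in for a FindCoordinator round-trip
        self.cached = None          # (transactional.id → node_id) entry

    def call(self, send_rpc, payload):
        for _ in range(2):          # allow one rediscovery per call, as a sketch
            if self.cached is None:
                self.cached = self.lookup()     # FindCoordinator
            try:
                return send_rpc(self.cached, payload)
            except NotCoordinator:
                self.cached = None  # drop the stale entry, rediscover next loop
        raise RuntimeError("coordinator still moving")

# Demo: the __transaction_state partition leader has moved from broker 1 to 3.
leader = {"node": 3}
def find(): return leader["node"]
def rpc(node, payload):
    if node != leader["node"]:
        raise NotCoordinator()
    return f"ok from {node}"

cache = CoordinatorCache(find)
cache.cached = 1                    # stale entry from before the move
print(cache.call(rpc, "EndTxn"))    # → ok from 3
```

The application never sees the NOT_COORDINATOR bounce — only the extra round-trip of latency, which is exactly why p99 spikes during broker rolls are the observable symptom.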
Where this leads next
The protocol is now fully visible. The remaining chapters compose it with real workloads:
- /wiki/end-to-end-source-process-sink — the consumer-process-produce loop, where sendOffsetsToTransaction ties the source-side commit into the same transaction.
- /wiki/flinks-two-phase-commit-sink — generalising 2PC to non-Kafka sinks: how Flink wraps an external system in a TwoPhaseCommitSinkFunction that mirrors this protocol's prepare/commit shape.
- /wiki/wall-exactly-once-is-a-lie-everyone-tells — what this protocol does not solve, and what "exactly-once" means as soon as the sink is HTTP, MySQL, or S3.
The pattern to internalise: every wire-level RPC has a state-machine consequence in __transaction_state, and every state-machine entry in __transaction_state corresponds to a recovery decision. If you can trace any production failure back to "what was the last state in __transaction_state for this transactional.id, and which RPC was in flight when it crashed" — you can debug any transactional-producer issue.
References
- KIP-98: Exactly Once Delivery and Transactional Messaging — the canonical wire-protocol reference; the schema for every RPC is in the appendix.
- Apache Kafka source: TransactionMetadata.scala — the actual state-machine implementation; ~600 lines, readable in one sitting.
- Apache Kafka source: TransactionCoordinator.scala — the RPC handlers; trace any RPC to its handler here.
- Confluent: Transactions in Apache Kafka — under the hood — Apurva Mehta and Jason Gustafson's protocol walkthrough; the diagrams that this article extends with the RPC swim-lane view.
- Kafka Protocol Guide — EndTxnRequest and friends — the byte-level wire format for every transactional RPC.
- /wiki/transactional-writes-2pc-wearing-a-hat — the conceptual 2PC view; this chapter is the wire-level companion.
- /wiki/idempotent-producers-and-sequence-numbers — the dedup substrate; PID and epoch are introduced there before they become transactional anchors here.