Kafka transactions under the hood

A Razorpay payments engineer is staring at a heap dump from a producer that hung for 47 seconds before throwing ProducerFencedException. She knows the protocol from the /wiki/transactional-writes-2pc-wearing-a-hat chapter — prepare, commit, markers — but knowing the protocol is not the same as knowing which RPC was in flight when the producer was fenced, what bytes were on the wire, or which broker was the coordinator. To debug a transactional producer in production, you need the wire-level view: the six request types, the __transaction_state record format, and the recovery state machine.

A Kafka transaction is six RPCs (FindCoordinator, InitProducerId, AddPartitionsToTxn, Produce, AddOffsetsToTxn, EndTxn) plus a control-batch write the broker performs internally (WriteTxnMarkers). All durable state lives in __transaction_state, a 50-partition compacted topic keyed by transactional.id. Recovery is mechanical: read the log, replay state transitions, finish whatever was in PrepareCommit or PrepareAbort, abort everything else.

The six RPCs in order

Every transactional producer makes the same RPC sequence per transaction. The order is fixed by the protocol — running them out of order returns INVALID_TXN_STATE (error code 48). The full list, in the order a transactional producer issues them:

  1. FindCoordinator(transactional.id) — bootstrap. Producer asks any broker "who's the coordinator for my transactional.id?". The broker hashes the id to a __transaction_state partition (default 50 partitions), looks up that partition's leader, and returns its node_id and host:port. Called once; the answer is cached until a broker responds with NOT_COORDINATOR.
  2. InitProducerId(transactional.id, transaction.timeout.ms) — handshake. Coordinator allocates a producer_id (PID, a 64-bit integer), bumps producer_epoch by 1, persists (transactional.id → PID, epoch) to __transaction_state, returns (PID, epoch) to the producer. This is also where zombies get fenced: the bumped epoch invalidates any older producer instance still trying to write.
  3. AddPartitionsToTxn(PID, epoch, [topic-partition, ...]) — once per topic-partition the transaction will touch. Coordinator records "this PID's open transaction now includes partition X" in __transaction_state. The state machine flips from Empty to Ongoing.
  4. Produce(PID, epoch, sequence, records) — the actual data writes. Same RPC as a non-transactional producer's, but the record batch header carries producer_id, producer_epoch, base_sequence, and is_transactional=true. Partition leaders accept these and append to their log; the records are physically present but flagged as part of an open transaction.
  5. AddOffsetsToTxn(PID, epoch, group_id) + TxnOffsetCommit(PID, epoch, [topic-partition, offset, ...]) — only for the consumer-process-produce loop. The first RPC tells the coordinator "the offsets I'm about to commit for consumer group group_id are part of this transaction". The second RPC actually commits the offsets to __consumer_offsets, but tagged with the transaction.
  6. EndTxn(PID, epoch, commit=true|false) — the trigger. Coordinator writes PrepareCommit (or PrepareAbort) to __transaction_state, then issues WriteTxnMarkers to every participant partition's leader, then writes CompleteCommit (or CompleteAbort).

There's a seventh RPC — WriteTxnMarkers — but it's broker-to-broker. The producer never sees it. It's how the coordinator instructs each participant partition's leader to append the COMMIT/ABORT control batch.
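The fixed ordering can be made concrete with a small checker — a sketch with illustrative phase numbers (they are not protocol constants), which captures the rule that AddPartitionsToTxn and Produce may repeat and interleave while the bootstrap, handshake, and end steps may not:

```python
# Assign each producer-visible RPC a phase; a trace is protocol-legal iff
# its phases are non-decreasing. Phase numbers are illustrative only.
PHASE = {
    "FindCoordinator": 0,
    "InitProducerId": 1,
    "AddPartitionsToTxn": 2,
    "Produce": 2,            # interleaves freely with AddPartitionsToTxn
    "AddOffsetsToTxn": 3,
    "TxnOffsetCommit": 3,
    "EndTxn": 4,
}

def in_protocol_order(trace: list) -> bool:
    phases = [PHASE[rpc] for rpc in trace]
    return phases == sorted(phases)

in_protocol_order(["FindCoordinator", "InitProducerId",
                   "AddPartitionsToTxn", "Produce", "EndTxn"])   # True
in_protocol_order(["Produce", "InitProducerId"])                 # False — the broker
                                                                 # answers INVALID_TXN_STATE
```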

[Figure: The six RPCs of a Kafka transaction, on the wire. A sequence diagram with three swim lanes — producer, transaction coordinator, participant partition leader — showing FindCoordinator, InitProducerId, AddPartitionsToTxn, Produce, EndTxn, and the broker-to-broker WriteTxnMarkers (which appends the COMMIT control batch) across one transaction's lifecycle. PrepareCommit/CompleteCommit are written to __transaction_state around the marker write.]
The wire view of a single transaction. The producer sees six RPCs; the WriteTxnMarkers RPC happens broker-to-broker after the coordinator has durably written PrepareCommit. EndTxn does not return until CompleteCommit is durable.

The __transaction_state topic — the durable log

__transaction_state is a normal Kafka topic with two unusual properties: it is compacted (only the latest record per key survives), and it is keyed by transactional.id. Default 50 partitions, replication factor 3 (configurable via transaction.state.log.replication.factor). The leader of partition hash(transactional.id) % 50 is the transaction coordinator for that producer.
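The partition choice is deterministic: the broker computes Utils.abs(transactionalId.hashCode) % numPartitions. A minimal Python reproduction of Java's String.hashCode makes the mapping reproducible from a notebook (unlike Python's built-in hash(), which is salted per process):

```python
def java_string_hash(s: str) -> int:
    """Java String.hashCode: h = 31*h + ch, wrapped to a signed 32-bit int."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h

def coordinator_partition(transactional_id: str, num_partitions: int = 50) -> int:
    # Kafka's Utils.toPositive masks the sign bit rather than negating,
    # which also handles Integer.MIN_VALUE correctly.
    return (java_string_hash(transactional_id) & 0x7FFFFFFF) % num_partitions

coordinator_partition("refunds-1")   # stable across runs and across machines
```

The leader of the partition this returns is the coordinator; every producer and every broker computes the same answer with no coordination.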

Each record value is a serialised TransactionMetadata struct:

TransactionMetadata {
  producer_id: int64           # PID, monotonically allocated
  last_producer_id: int64      # the previous PID (for fencing)
  producer_epoch: int16        # bumped on every InitProducerId
  last_producer_epoch: int16
  txn_timeout_ms: int32        # how long before coordinator auto-aborts
  state: enum {Empty, Ongoing, PrepareCommit, PrepareAbort,
               CompleteCommit, CompleteAbort, Dead, PrepareEpochFence}
  topic_partitions: list<TopicPartition>   # who participates in the open txn
  txn_start_timestamp: int64
  txn_last_update_timestamp: int64
}

Why compaction and not retention: the transaction state for a given transactional.id is overwritten on every state transition (Empty → Ongoing → PrepareCommit → CompleteCommit). Only the latest matters for recovery. Compaction guarantees that even after years, the latest TransactionMetadata for every active transactional.id is preserved, while obsolete intermediate states are garbage-collected by the cleaner thread. Without compaction, the log would grow unbounded; with retention-by-time, you'd lose state for long-idle producers.
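A toy cleaner shows the contract, with plain strings standing in for serialised TransactionMetadata values:

```python
# Sketch of what the log cleaner guarantees for __transaction_state:
# latest record per key survives; a tombstone (value=None) deletes the key.
def compact(log: list) -> dict:
    latest = {}
    for key, value in log:
        if value is None:        # tombstone — written when a txn.id expires to Dead
            latest.pop(key, None)
        else:
            latest[key] = value
    return latest

log = [
    ("refunds-1", "Ongoing"),
    ("audit-3", "Ongoing"),
    ("refunds-1", "PrepareCommit"),
    ("refunds-1", "CompleteCommit"),
    ("audit-3", None),           # idle past transactional.id.expiration.ms
]
compact(log)   # {'refunds-1': 'CompleteCommit'} — intermediate states are gone
```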

The state machine has eight values, but only six matter for steady-state operation. The transitions are:

                   InitProducerId
       Empty ────────────────────► Empty (epoch++)
         │
         │ AddPartitionsToTxn
         ▼
       Ongoing ──── EndTxn(commit=true)  ──► PrepareCommit ──► CompleteCommit ──► Empty
         │
         └──── EndTxn(commit=false) ──► PrepareAbort ──► CompleteAbort ──► Empty

Dead is reached when transactional.id.expiration.ms (default 7 days) elapses without activity — the metadata is tombstoned and compacted away. PrepareEpochFence is an internal transient state used to prevent a race during epoch bumping; you'll see it in stack traces but never in steady-state logs.
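The steady-state transitions can be encoded as a table checked on every RPC — a simplified sketch covering only the six steady-state values (Dead and PrepareEpochFence omitted):

```python
# Legal transitions from the diagram above. A request that would move the
# state any other way is rejected; the broker reports INVALID_TXN_STATE.
ALLOWED = {
    "Empty":          {"Empty", "Ongoing"},       # re-init, or first AddPartitionsToTxn
    "Ongoing":        {"Ongoing", "PrepareCommit", "PrepareAbort"},
    "PrepareCommit":  {"CompleteCommit"},
    "PrepareAbort":   {"CompleteAbort"},
    "CompleteCommit": {"Empty", "Ongoing"},       # next transaction may start
    "CompleteAbort":  {"Empty", "Ongoing"},
}

def transition(current: str, target: str) -> str:
    if target not in ALLOWED.get(current, set()):
        raise ValueError(f"INVALID_TXN_STATE (48): {current} -> {target}")
    return target
```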

A wire-protocol-faithful coordinator

The cleanest way to internalise the wire protocol is to write a coordinator that responds to the actual RPC types and records the actual state-machine fields. This is closer to the real broker code than the /wiki/transactional-writes-2pc-wearing-a-hat sketch — fields named the same as in TransactionMetadata.scala, RPCs named the same as in ApiKeys.java.

# txn_wire.py — wire-faithful Kafka transaction coordinator (single broker, single client)
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Optional

class TxnState(IntEnum):
    EMPTY = 0
    ONGOING = 1
    PREPARE_COMMIT = 2
    PREPARE_ABORT = 3
    COMPLETE_COMMIT = 4
    COMPLETE_ABORT = 5
    DEAD = 6
    PREPARE_EPOCH_FENCE = 7

@dataclass
class TransactionMetadata:
    transactional_id: str
    producer_id: int = -1
    last_producer_id: int = -1
    producer_epoch: int = -1
    last_producer_epoch: int = -1
    txn_timeout_ms: int = 60_000
    state: TxnState = TxnState.EMPTY
    topic_partitions: list = field(default_factory=list)

@dataclass
class TxnLogRecord:
    key: str           # transactional.id
    value: TransactionMetadata

# The __transaction_state partition log (compacted in real Kafka; here we keep history)
from copy import deepcopy   # log entries must be snapshots, not live references

TXN_LOG: list = []
META: dict = {}        # in-memory view of latest metadata per transactional.id
NEXT_PID: int = 1000

def log_append(txn_id: str, meta: TransactionMetadata):
    # Append a snapshot — appending the live object would make every log
    # entry alias the same mutable metadata, and replay would see only the
    # final state for every entry.
    TXN_LOG.append(TxnLogRecord(key=txn_id, value=deepcopy(meta)))

def find_coordinator(txn_id: str) -> int:
    # In real Kafka: Utils.abs(transactionalId.hashCode) % 50, then that
    # partition's leader node_id. A byte sum stands in here because Python's
    # built-in hash() is salted per process and would change between runs.
    return sum(txn_id.encode()) % 50

def init_producer_id(txn_id: str) -> tuple[int, int]:
    global NEXT_PID
    meta = META.get(txn_id)
    if meta is None:
        meta = TransactionMetadata(transactional_id=txn_id, producer_id=NEXT_PID)
        NEXT_PID += 1
    else:
        # Bump epoch — fences any zombie still using the old (PID, epoch)
        meta.last_producer_id = meta.producer_id
        meta.last_producer_epoch = meta.producer_epoch
    meta.producer_epoch += 1
    META[txn_id] = meta
    log_append(txn_id, meta)
    return meta.producer_id, meta.producer_epoch

def add_partitions_to_txn(txn_id: str, pid: int, epoch: int, parts: list):
    meta = META[txn_id]
    if (pid, epoch) != (meta.producer_id, meta.producer_epoch):
        raise Exception("INVALID_PRODUCER_EPOCH (47) — fenced")
    if meta.state not in (TxnState.EMPTY, TxnState.ONGOING,
                          TxnState.COMPLETE_COMMIT, TxnState.COMPLETE_ABORT):
        raise Exception("INVALID_TXN_STATE (48)")
    meta.state = TxnState.ONGOING
    meta.topic_partitions.extend(parts)
    log_append(txn_id, meta)

def end_txn(txn_id: str, pid: int, epoch: int, commit: bool):
    meta = META[txn_id]
    if (pid, epoch) != (meta.producer_id, meta.producer_epoch):
        raise Exception("INVALID_PRODUCER_EPOCH (47) — fenced")
    if meta.state != TxnState.ONGOING:
        raise Exception("INVALID_TXN_STATE (48) — no open transaction")
    # Phase 1: durable commit decision
    meta.state = TxnState.PREPARE_COMMIT if commit else TxnState.PREPARE_ABORT
    log_append(txn_id, meta)
    # Phase 2: WriteTxnMarkers to each participant (simulated)
    marker = "COMMIT" if commit else "ABORT"
    for tp in meta.topic_partitions:
        print(f"  WriteTxnMarkers → {tp}: append {marker} control batch (PID={pid}, epoch={epoch})")
    # Phase 3: completion
    meta.state = TxnState.COMPLETE_COMMIT if commit else TxnState.COMPLETE_ABORT
    meta.topic_partitions = []
    log_append(txn_id, meta)

def recover():
    """Replay TXN_LOG to rebuild META; finish any in-flight Prepare* state."""
    META.clear()
    for rec in TXN_LOG:
        META[rec.key] = rec.value          # last write wins, like compaction
    for txn_id, meta in META.items():
        if meta.state in (TxnState.PREPARE_COMMIT, TxnState.PREPARE_ABORT):
            marker = "COMMIT" if meta.state == TxnState.PREPARE_COMMIT else "ABORT"
            print(f"  [recover] {txn_id}: {meta.state.name} → finishing with {marker}")
            for tp in meta.topic_partitions:
                print(f"    WriteTxnMarkers → {tp}: append {marker}")
            meta.state = (TxnState.COMPLETE_COMMIT if marker == "COMMIT"
                          else TxnState.COMPLETE_ABORT)
            meta.topic_partitions = []
            log_append(txn_id, meta)

# Demo
print(f"Coordinator partition for 'refunds-1': {find_coordinator('refunds-1')}")
pid, epoch = init_producer_id("refunds-1")
print(f"InitProducerId → PID={pid}, epoch={epoch}")

add_partitions_to_txn("refunds-1", pid, epoch, ["refunds-emitted-4", "refund-audit-1"])
print(f"AddPartitionsToTxn → state={META['refunds-1'].state.name}")

end_txn("refunds-1", pid, epoch, commit=True)
print(f"EndTxn → state={META['refunds-1'].state.name}")

# Zombie scenario: a producer restart re-runs InitProducerId, which bumps
# the epoch; the old instance's (PID, epoch) is now stale and gets fenced.
init_producer_id("refunds-1")                                    # epoch 0 → 1
try:
    add_partitions_to_txn("refunds-1", pid, epoch, parts=["x"])  # stale epoch
except Exception as e:
    print(f"Zombie blocked: {e}")
Coordinator partition for 'refunds-1': 3
InitProducerId → PID=1000, epoch=0
AddPartitionsToTxn → state=ONGOING
  WriteTxnMarkers → refunds-emitted-4: append COMMIT control batch (PID=1000, epoch=0)
  WriteTxnMarkers → refund-audit-1: append COMMIT control batch (PID=1000, epoch=0)
EndTxn → state=COMPLETE_COMMIT
Zombie blocked: INVALID_PRODUCER_EPOCH (47) — fenced

Walking through the load-bearing lines:

  1. init_producer_id is where fencing lives: the epoch bump is recorded in the log before the response goes out, so any zombie's next request is guaranteed to fail the (PID, epoch) check.
  2. end_txn writes the Prepare* record before a single marker goes out. That ordering is the whole protocol: once the decision is durable, recovery can always finish it.
  3. recover never consults the producer — the latest log record per transactional.id is sufficient to decide commit, abort, or nothing.

The model above misses two pieces of real Kafka: per-broker WriteTxnMarkers requests with delivery acks, and transactional.id.expiration.ms cleanup. Add those and you have a faithful single-broker simulator. The state-machine logic is unchanged.

Recovery — what happens when the coordinator dies

The __transaction_state topic is replicated like any topic (default RF=3), so a coordinator partition leader dying triggers a normal Kafka leader election. The new leader becomes the coordinator for every transactional.id that hashes to that partition. Recovery is mechanical:

  1. Read the partition log from start to high-water mark. Build the in-memory TransactionMetadata map, keyed by transactional.id.
  2. For each transactional.id whose latest state is PrepareCommit — issue WriteTxnMarkers(COMMIT) to every participant partition listed in topic_partitions, then write CompleteCommit to __transaction_state.
  3. For each whose latest state is PrepareAbort — same, but WriteTxnMarkers(ABORT) and CompleteAbort.
  4. For each whose latest state is Ongoing — abort it. The producer didn't reach EndTxn, so the new coordinator decides for it.
  5. For each whose latest state is Empty / Complete* — nothing to do. Steady state.
  6. For each whose txn_last_update_timestamp is older than transactional.id.expiration.ms — tombstone it. State becomes Dead; compaction will remove the record.

The recovery duration is bounded by the number of in-flight transactions for that coordinator partition, not by the partition's log size. A coordinator with 100 active producers and one in-flight transaction each recovers in ~tens of milliseconds. Why this works without a separate quorum protocol: __transaction_state is replicated by the same ISR mechanism every Kafka topic uses, and the coordinator's transaction-log writes are acknowledged by the full ISR before it acts on them, so a newly elected leader already holds every durable decision. The recovery procedure is just deterministic state-machine replay; it does not need consensus beyond what Kafka's normal replication already provides.
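The recovery steps above reduce to a pure decision function over the latest state — a sketch, with action strings standing in for the real marker and log writes:

```python
SEVEN_DAYS_MS = 7 * 24 * 3600 * 1000   # transactional.id.expiration.ms default

def recovery_action(state: str, idle_ms: int,
                    expiration_ms: int = SEVEN_DAYS_MS) -> str:
    """Map a transactional.id's latest logged state to the new coordinator's action."""
    if idle_ms > expiration_ms:
        return "tombstone"                                            # step 6
    return {
        "PrepareCommit": "WriteTxnMarkers(COMMIT), then CompleteCommit",   # step 2
        "PrepareAbort":  "WriteTxnMarkers(ABORT), then CompleteAbort",     # step 3
        "Ongoing":       "abort: PrepareAbort, markers, CompleteAbort",    # step 4
    }.get(state, "nothing")                                           # step 5
```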

[Figure: Coordinator recovery — replay __transaction_state and finish prepared transactions. A timeline of partition-27 log entries for two transactional.ids: refunds-1 reaches PrepareCommit (the durable decision) and audit-3 is still Ongoing when the coordinator dies. The new coordinator, after leader election, replays the log to rebuild META, finishes refunds-1's commit (WriteTxnMarkers(COMMIT), CompleteCommit), aborts audit-3 (PrepareAbort, WriteTxnMarkers(ABORT), CompleteAbort), and tombstones any id idle longer than transactional.id.expiration.ms. Recovery time is bounded by the count of in-flight transactions, not by log size.]
Coordinator recovery is deterministic state-machine replay over a compacted log. PrepareCommit finishes; Ongoing aborts; the rest is steady-state.

Common confusions

Going deeper

What INVALID_TXN_STATE (error 48) actually means

INVALID_TXN_STATE is the broker's way of saying "your RPC doesn't fit my view of your transaction state". Common triggers: calling AddPartitionsToTxn after EndTxn returned but before the next beginTransaction, or calling EndTxn(commit=true) when the coordinator has already started auto-aborting because transaction.timeout.ms elapsed. Reading the error code alone is uninformative; correlate with the __transaction_state records around the producer's PID. The kafka-transactions.sh describe --transactional-id command reads __transaction_state directly and prints the latest TransactionMetadata for a given transactional.id — the first thing to run when debugging this error.

Why the default __transaction_state partition count is 50

The number of __transaction_state partitions caps the number of independent coordinators. With 50 partitions, you get 50 coordinator slots distributed across brokers; for a 6-broker cluster, that's ~8 coordinators per broker, enough to balance load even if a few brokers are also handling heavy data partitions. Increasing past 50 is rarely justified (each additional partition costs a tiny amount of metadata overhead), but for very large clusters with millions of transactional.ids — Cred's reward-events pipeline runs ~80k distinct transactional.ids — bumping transaction.state.log.num.partitions to 100 or 200 spreads coordinator load. The trade-off is that you cannot decrease this number after cluster creation without a manual log-and-data migration.

How transaction.timeout.ms interacts with consumer-side LSO

A producer that hangs holds its transaction open until transaction.timeout.ms (default 60s, max 15 min) elapses, at which point the coordinator forcibly aborts. During that window, the LSO on every participant partition is pinned at the start of that producer's data — read_committed consumers stall. So the operational implication: the transaction.timeout.ms you set is also the worst-case tail latency seen by every read_committed consumer of every participant partition, on the path of one stuck producer. Setting it to 15 minutes "just to be safe" can mean 15-minute consumer stalls. Zerodha's tick-data pipeline keeps it at 30s for this reason — they would rather have stuck transactions auto-abort and producers retry than have consumer dashboards stall.
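The LSO rule itself is one line: a partition's last stable offset is the first offset of its earliest still-open transaction, or the high watermark if nothing is open. A sketch:

```python
def last_stable_offset(high_watermark: int, open_txn_first_offsets: list) -> int:
    """The offset below which read_committed consumers may read.

    A single stuck producer with an open transaction starting at offset N
    pins the LSO at N for every consumer of the partition.
    """
    return min(open_txn_first_offsets, default=high_watermark)

last_stable_offset(1000, [])       # 1000 — no open txns, LSO == high watermark
last_stable_offset(1000, [412])    # 412  — one stuck producer pins the LSO
```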

The last_producer_id field — handling the rare "double-bump"

When a producer reconnects after a long network partition, it may not know whether the coordinator already bumped its epoch. The coordinator stores last_producer_id and last_producer_epoch precisely to recognise the previous identity. If the producer's InitProducerId arrives with (producer_id=last_producer_id, producer_epoch=last_producer_epoch+1), the coordinator knows this is the legitimate new instance recovering, and assigns the next epoch deterministically. Without this field, two consecutive coordinator failovers could mis-attribute a fence as a new producer. The field exists for the same reason database systems track previous LSN in WAL records: deterministic recovery across failures of the recovery mechanism itself.
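A sketch of that recognition check, using the TransactionMetadata field names from above (the real broker logic in the transaction coordinator handles more cases than this):

```python
def is_legitimate_producer(req_pid: int, req_epoch: int, meta: dict) -> bool:
    """Accept the current identity, or the previous identity one epoch ahead —
    the signature of a producer recovering after the coordinator already
    bumped its epoch. meta holds the TransactionMetadata fields as a dict."""
    current = (req_pid, req_epoch) == (meta["producer_id"], meta["producer_epoch"])
    returning = (req_pid, req_epoch) == (meta["last_producer_id"],
                                         meta["last_producer_epoch"] + 1)
    return current or returning
```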

Why the producer caches FindCoordinator and what happens when the coordinator moves

The producer caches (transactional.id → coordinator node_id) after the first FindCoordinator call. When the __transaction_state partition's leader moves (broker restart, leader election, partition reassignment), the producer's next RPC to the old leader returns NOT_COORDINATOR (error 16). The producer transparently retries FindCoordinator, gets the new leader, and resends the RPC. From the application's perspective this is invisible — Producer.send() doesn't return an error; it just blocks a little longer. The transaction.timeout.ms is the upper bound on how long this can take before the transaction is force-aborted. Watching producer.send() p99 latency for spikes correlated with broker rolls is a useful signal that coordinator failover is happening more than you'd think.
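The cache-and-retry behaviour can be sketched with a fake cluster (the class and helper names are invented for illustration; error code 16 is NOT_COORDINATOR):

```python
NOT_COORDINATOR = 16   # Kafka error code returned by a non-leader broker

class FakeCluster:
    """Stand-in for a cluster whose coordinator partition leader can move."""
    def __init__(self):
        self.leader = "broker-1"
    def find_coordinator(self, txn_id: str) -> str:
        return self.leader
    def send(self, node: str, rpc: str):
        # Only the current leader accepts transaction RPCs for this txn.id
        return "ok" if node == self.leader else NOT_COORDINATOR

def send_txn_rpc(cluster: FakeCluster, cache: dict, txn_id: str, rpc: str):
    node = cache.get(txn_id) or cluster.find_coordinator(txn_id)
    cache[txn_id] = node
    resp = cluster.send(node, rpc)
    if resp == NOT_COORDINATOR:
        # Stale cache: re-run FindCoordinator and retry, like the real client
        cache[txn_id] = cluster.find_coordinator(txn_id)
        resp = cluster.send(cache[txn_id], rpc)
    return resp
```

Move the leader between two calls and the second call still returns "ok" — the application never sees the detour, only the extra round-trip.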

Where this leads next

The protocol is now fully visible. The remaining chapters compose it with real workloads.

The pattern to internalise: every wire-level RPC has a state-machine consequence in __transaction_state, and every state-machine entry in __transaction_state corresponds to a recovery decision. If you can trace any production failure back to "what was the last state in __transaction_state for this transactional.id, and which RPC was in flight when it crashed" — you can debug any transactional-producer issue.
