Note: Company names, engineers, incidents, numbers, and scaling scenarios in this article are hypothetical — even when they resemble real ones. See the full disclaimer.

Idempotent producers and sequence numbers

A PaisaBridge payments-events producer is sending merchant settlement records to a Kafka topic at 4,200 events per second when one of its TCP connections to a broker hangs for 1.8 seconds. The client times out, retries, gets an ACK, and moves on. The broker's partition log now has exactly one copy of that record — not zero, not two. No client-side dedup table, no idempotency key chosen by the application. The previous chapter named the three exactly-once guarantees and said Kafka covers the delivery leg. This chapter is how — the sequence number sitting next to every record, the producer-id epoch fencing zombies, and the broker-side bookkeeping that drops duplicate retries silently before the log ever sees them.

Kafka's idempotent producer assigns a (producer_id, partition, sequence_number) triple to every record. The broker tracks the highest sequence number it has accepted per (producer_id, partition) and silently drops any record whose sequence number is <= last_seen. Network retries become safe. The guarantee is per-session and per-partition only — surviving producer restarts requires the transactional layer on top.

What goes wrong without sequence numbers

Picture a producer sending one record. The client serialises the record, opens a TCP connection to the partition leader, writes the bytes, and waits for an ACK. Three things can fail:

  1. The request never reached the broker — TCP timeout before the leader saw the bytes. Safe to retry; no duplicate.
  2. The request reached the broker, the broker appended the record, but the ACK was lost on the way back — leader has the record, client thinks it failed. Retry creates a duplicate.
  3. The leader died after appending but before the ACK — the new leader (an in-sync replica) may or may not have the record depending on acks. Retry may create a duplicate.

Cases 2 and 3 are the painful ones. The producer cannot tell from its side whether the broker has the record. Without protocol-level help, the producer has two choices: retry (and risk duplicates) or don't retry (and risk loss). At-least-once vs at-most-once — the classic dichotomy. Why this is unfixable purely on the client: the client only knows what it sent and what came back. With a lost ACK, "did the broker accept this?" is unobservable from the client. The fix has to live on the broker side, in the form of a memory of what's already been seen.

The pre-KIP-98 Kafka producer (before 0.11, mid-2017) lived with this. Most pipelines deployed at-least-once with downstream dedup — typically a primary key check in the warehouse. That works at low scale and breaks at high scale; the warehouse dedup adds 100ms of latency per record and a write-amplification cost that scales linearly with retry rate.

[Figure: Why a lost ACK forces a duplicate without sequence numbers. The producer (PaisaBridge app) sends record X to the broker (partition leader). The broker appends it and returns an ACK, which is lost in transit. The producer times out and retries, and the broker appends X a second time because it has no way to know this is the same record.]
The producer cannot distinguish "broker never saw it" from "ACK was lost". Without a record-level identifier the broker recognises, the safe choice for the producer is to retry, and retries duplicate.
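The lost-ACK case is easy to reproduce in a few lines. The sketch below is illustrative only: NaiveBroker, send_with_retry, and the ack_lost_on_attempts parameter are hypothetical names invented here, and the "network" is just a set of attempt numbers whose ACK gets dropped.

```python
# naive_broker.py: a broker with no dedup state (illustrative only)

class NaiveBroker:
    """Appends whatever it receives; it has no memory of earlier requests."""

    def __init__(self):
        self.log = []

    def append(self, payload):
        self.log.append(payload)
        return len(self.log) - 1  # offset of the record just written


def send_with_retry(broker, payload, ack_lost_on_attempts):
    """Send until an ACK arrives.

    ack_lost_on_attempts is a set of 0-based attempt numbers whose ACK is
    dropped on the way back to the client (a stand-in for a flaky network).
    """
    attempt = 0
    while True:
        offset = broker.append(payload)   # the broker always appends
        if attempt not in ack_lost_on_attempts:
            return offset                 # ACK reached the client
        attempt += 1                      # client timed out, so it retries


naive = NaiveBroker()
acked_offset = send_with_retry(naive, {"record": "X"}, ack_lost_on_attempts={0})
print(acked_offset, naive.log)
```

The client sees exactly one successful send, yet the log holds two copies of X. Nothing in the request lets the broker tell the retry apart from a new record.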

The sequence number contract

KIP-98 (Apache Kafka, 0.11, 2017) added two tiny pieces of state per producer instance:

  • producer_id (PID): a 64-bit integer the broker assigns to a producer when it first connects with enable.idempotence=true. Unique per producer session.
  • sequence_number: a 32-bit integer the producer assigns per (producer_id, partition). Starts at 0; increments by 1 for every record sent to that partition. Independent counters per partition.

When the producer sends a record, the client library tags it with (producer_id, partition, sequence_number). The broker, on receiving a record for partition P from producer PID, checks its in-memory map last_seen[PID][P]:

  • If sequence_number == last_seen[PID][P] + 1: append the record. Update last_seen[PID][P] = sequence_number. Return ACK with the offset.
  • If sequence_number <= last_seen[PID][P]: this is a duplicate retry. Do not append. Return the original offset (the broker remembers offsets for the last 5 in-flight requests per producer).
  • If sequence_number > last_seen[PID][P] + 1: there is a gap — the producer skipped a sequence number. This is an OutOfOrderSequenceException; the broker rejects the record and the producer must reset and retry from the last good sequence.

The contract is a sliding window of 5 in-flight requests by default (max.in.flight.requests.per.connection=5). The broker keeps the last 5 sequence numbers and their offsets per (PID, partition); out-of-order delivery within that window is handled, beyond it is rejected. Why the window is 5: small enough that the broker's per-producer state stays bounded (5 entries × number of partitions × number of producers = a few hundred KB on a busy cluster), and large enough that the producer can pipeline writes without blocking on each ACK. The first idempotent release (0.11) required max.in.flight.requests.per.connection=1; since Kafka 1.0 the broker retains the last 5 batches per (PID, partition), and the client refuses a configuration that combines idempotence with more than 5 in-flight requests.

The check is per-partition because Kafka's ordering guarantee is per-partition. Sequence numbers across different partitions are independent — partition 7 doesn't care that partition 3 just got sequence 1,42,000.
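The producer half of the contract can be sketched as one counter per partition. This is a simplified model, not the client library's code: SequenceAssigner and tag are hypothetical names, and the sketch numbers individual records rather than batches.

```python
from collections import defaultdict


class SequenceAssigner:
    """Tags each record with (producer_id, partition, sequence_number)."""

    def __init__(self, producer_id):
        self.producer_id = producer_id
        self.next_seq = defaultdict(int)  # partition -> next sequence to hand out

    def tag(self, partition, payload):
        seq = self.next_seq[partition]
        self.next_seq[partition] += 1     # counters are independent per partition
        return (self.producer_id, partition, seq, payload)


assigner = SequenceAssigner(producer_id=42)
# Interleave sends to partitions 3 and 7.
tagged = [assigner.tag(p, f"r{i}") for i, p in enumerate([3, 3, 7, 3, 7])]
for t in tagged:
    print(t)
```

A retry must reuse the tuple that was produced the first time rather than call tag again; a fresh tag would look like a new record to the broker.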

A working idempotent broker, in 60 lines of Python

The cleanest way to internalise the sequence-number protocol is to write the broker side. Here's a single-partition idempotent broker that mimics what a real Kafka broker does for (PID, P) dedup:

# idempotent_broker.py — single-partition broker with sequence-number dedup
from collections import defaultdict, deque
from dataclasses import dataclass, field

@dataclass
class Record:
    producer_id: int
    sequence_number: int
    payload: dict

@dataclass
class Broker:
    log: list = field(default_factory=list)
    # last_seen[(producer_id, partition)] = highest accepted sequence_number
    last_seen: dict = field(default_factory=lambda: defaultdict(lambda: -1))
    # cache of last 5 seq -> offset, per (pid, partition), for retry idempotence
    seq_to_offset: dict = field(default_factory=lambda: defaultdict(lambda: deque(maxlen=5)))

    def append(self, partition: int, record: Record):
        key = (record.producer_id, partition)
        last = self.last_seen[key]
        seq = record.sequence_number

        # Duplicate retry — return the original offset
        for cached_seq, cached_off in self.seq_to_offset[key]:
            if cached_seq == seq:
                return {"status": "DUPLICATE", "offset": cached_off, "appended": False}

        # Out-of-order — gap detected, reject
        if seq != last + 1:
            return {"status": "OUT_OF_ORDER", "expected": last + 1, "got": seq, "appended": False}

        # First-time append
        offset = len(self.log)
        self.log.append(record)
        self.last_seen[key] = seq
        self.seq_to_offset[key].append((seq, offset))
        return {"status": "OK", "offset": offset, "appended": True}

# Simulate a PaisaBridge payments producer with PID=42 sending to partition 0
broker = Broker()
PID, PART = 42, 0
records = [Record(PID, i, {"merchant": "razorpay-test", "amount": 100 + i * 50})
           for i in range(5)]

# Normal send of 0..4
for r in records:
    print(broker.append(PART, r))

# Now simulate a lost-ACK retry: producer thinks seq=2 failed, retries it
print("--- retry of seq=2 (lost ACK case) ---")
print(broker.append(PART, records[2]))

# And an out-of-order accident: producer sends seq=10, skipping 5..9
print("--- out-of-order seq=10 ---")
print(broker.append(PART, Record(PID, 10, {"merchant": "razorpay-test", "amount": 999})))

Output:

{'status': 'OK', 'offset': 0, 'appended': True}
{'status': 'OK', 'offset': 1, 'appended': True}
{'status': 'OK', 'offset': 2, 'appended': True}
{'status': 'OK', 'offset': 3, 'appended': True}
{'status': 'OK', 'offset': 4, 'appended': True}
--- retry of seq=2 (lost ACK case) ---
{'status': 'DUPLICATE', 'offset': 2, 'appended': False}
--- out-of-order seq=10 ---
{'status': 'OUT_OF_ORDER', 'expected': 5, 'got': 10, 'appended': False}

Walking through the load-bearing lines:

  • last_seen = defaultdict(lambda: -1) — the broker initialises last-seen to -1 so the first record (sequence 0) passes the seq == last + 1 check. Real Kafka uses the same convention.
  • seq_to_offset = defaultdict(lambda: deque(maxlen=5)) — the deque caps at 5 entries per (PID, partition). This is the in-flight window; older sequence numbers fall out and a much-later retry would be rejected as out-of-order rather than recognised as duplicate. Why bounded: keeping every sequence number forever would mean per-producer broker state growing without limit. The 5-entry window matches the producer's in-flight bound, so any retry the producer is allowed to issue is a retry the broker can recognise.
  • The duplicate check (if cached_seq == seq) returns the original offset. This is critical: the producer's callback gets the same offset for the original send and the retry, so application code that records "this record went to offset 2" stays correct even when the retry happens. Real Kafka's RecordMetadata.offset() is exactly this value.
  • OUT_OF_ORDER is fatal for that producer session. The producer must send a new InitProducerId request (which assigns a new PID) and resend everything from the gap. In practice this only happens when the producer has a bug or when a transactional commit went sideways.

This 60-line model is faithful to the per-partition contract. Real Kafka's implementation lives in ProducerStateManager.scala (broker side) and RecordAccumulator.java (producer side), but the algorithm is the same. The dedup happens in the broker's append path, before the record is written to the log segment.
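One case the run above does not exercise is a retry that arrives after its sequence number has fallen out of the 5-entry cache. The sketch below is a stripped-down, single-producer variant of the same model (WindowedBroker is a hypothetical name); it follows this article's model, and a real broker may report the stale case differently.

```python
from collections import deque


class WindowedBroker:
    """Single producer, single partition; remembers the last `window` appends."""

    def __init__(self, window=5):
        self.log = []
        self.last_seen = -1
        self.recent = deque(maxlen=window)   # (sequence_number, offset) pairs

    def append(self, seq, payload):
        for cached_seq, cached_off in self.recent:
            if cached_seq == seq:
                return ("DUPLICATE", cached_off)   # retry inside the window
        if seq != self.last_seen + 1:
            return ("OUT_OF_ORDER", None)          # gap, or retry outside the window
        self.log.append(payload)
        self.last_seen = seq
        offset = len(self.log) - 1
        self.recent.append((seq, offset))
        return ("OK", offset)


wb = WindowedBroker()
for seq in range(8):                 # sequences 0..7; the cache now holds 3..7
    wb.append(seq, f"r{seq}")

recent_retry = wb.append(6, "r6")    # still cached
stale_retry = wb.append(1, "r1")     # evicted from the cache
print(recent_retry, stale_retry, len(wb.log))
```

Neither retry grows the log, but only the recent one gets its original offset back. The stale one is rejected, which is why the producer's in-flight bound and the broker's cache size have to agree.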

Producer-id epoch and zombie fencing

The producer-id alone handles network retries within one producer session. It does not handle producer process restarts. A new producer instance — even with the same client config — gets a fresh PID from the broker. The broker has no way to associate the new PID with the old one, so any record the new producer sends starts a fresh sequence space, and records from a zombie old producer (still alive, still holding a TCP connection) can interleave with the new producer's records.

The transactional layer adds an epoch number to the PID. When a producer with transactional.id="razorpay-payments-1" calls initTransactions(), the broker:

  1. Looks up any existing PID for that transactional.id.
  2. Bumps the epoch on that PID.
  3. Returns (PID, new_epoch) to the producer.

Now every record carries (PID, epoch, sequence_number). The broker rejects any record from (PID, old_epoch, *) with InvalidProducerEpoch. The zombie's TCP connection stays open, but every send fails. The new producer's records succeed. Why this matters at scale: in Kubernetes, pod restarts happen routinely — health-check failures, rolling deploys, OOM kills. Without epoch fencing, a restarted PaisaBridge payments producer would compete with its own zombie for 30–60 seconds (until the zombie's TCP connection times out), and that window is exactly where duplicates would creep in.

The transactional.id is the application-chosen, stable identifier, typically derived from the deployment topology (e.g., payments-producer-shard-7). It survives restarts. The producer's in-memory copy of the PID and epoch does not, but the broker maintains the mapping transactional.id → (PID, current_epoch) durably, so a restarted instance gets the same PID back with a bumped epoch.

[Figure: Producer-id epoch fencing during a producer restart. A timeline of two producer instances sharing transactional.id = "payments-producer-shard-7". Instance A holds PID=42, epoch=3 and crashes (k8s OOM) while its TCP connection stays up briefly. Instance B starts, calls initTransactions, and receives PID=42, epoch=4. Zombie A's sends are rejected with InvalidProducerEpoch.]
The transactional.id is the application's stable identifier; the broker maps it to (PID, epoch). On a restart, the new instance bumps the epoch and any record from the old (lower) epoch is fenced out. This is what makes idempotent delivery survive process restarts.
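The fencing rule can be sketched as a single object that plays both the coordinator (transactional.id → (PID, epoch)) and the partition leader (epoch check on append). That merging is a simplifying assumption, and FencingBroker, init_transactions, and the status strings are hypothetical names for this sketch, not Kafka's API.

```python
class FencingBroker:
    """Toy coordinator plus partition leader, merged for brevity."""

    def __init__(self, first_pid=42):
        self.txn_state = {}      # transactional.id -> (pid, epoch)
        self.current_epoch = {}  # pid -> latest epoch handed out
        self.next_pid = first_pid
        self.log = []

    def init_transactions(self, transactional_id):
        if transactional_id in self.txn_state:
            pid, epoch = self.txn_state[transactional_id]
            epoch += 1                       # same PID, bumped epoch
        else:
            pid, epoch = self.next_pid, 0    # first time this id is seen
            self.next_pid += 1
        self.txn_state[transactional_id] = (pid, epoch)
        self.current_epoch[pid] = epoch
        return pid, epoch

    def append(self, pid, epoch, payload):
        if epoch < self.current_epoch.get(pid, -1):
            return "INVALID_PRODUCER_EPOCH"  # sender is a fenced zombie
        self.log.append(payload)
        return "OK"


fb = FencingBroker()
tid = "payments-producer-shard-7"
instance_a = fb.init_transactions(tid)   # original instance
instance_b = fb.init_transactions(tid)   # restarted instance, fences A
zombie_result = fb.append(*instance_a, "from A")
fresh_result = fb.append(*instance_b, "from B")
print(instance_a, instance_b, zombie_result, fresh_result)
```

Instance A still holds a valid-looking (PID, epoch) pair and an open connection, but every append it attempts is rejected once B has initialised.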

What the producer config actually does

When you set enable.idempotence=true in a Kafka producer, the client library implicitly sets four things — most users don't realise the bundle:

| Config | Default with idempotence | Why |
| --- | --- | --- |
| acks | all | Without acks=all the leader can ACK before replication; a leader failover loses the record and the dedup state is meaningless. |
| retries | Integer.MAX_VALUE | Idempotence is pointless if the producer gives up. With dedup, infinite retry is safe. |
| max.in.flight.requests.per.connection | <= 5 | The broker's window. More than 5 means the broker can't dedup a retry that arrives outside the window. |
| enable.idempotence | true | The producer asks the broker for a PID via InitProducerId on first connection. |

acks=all is the load-bearing default. It means the leader waits for every in-sync replica to acknowledge the append before sending the producer ACK. With acks=1 (leader only), a leader crash between append and replication loses a record the producer already believes is committed. The sequence number is assigned by the producer, not the broker, so the new leader's last_seen now lags the producer's counter and the next send looks like a sequence gap instead of a clean continuation. Why dedup state lives on the leader: it's per-partition, and partition leadership can move. To survive leader failover, the state has to travel with the log: every batch header carries (PID, epoch, base_sequence), so each in-sync replica builds the same last_seen[PID] view from the records it replicates and checkpoints it in its own ProducerStateManager snapshot. When a follower becomes leader, it has the dedup state ready and the producer's retries are still recognised.

The producer-side correlate is enable.idempotence=true. Setting it triggers an InitProducerIdRequest on first connection; the broker assigns a PID and the producer stores it. From then on, every ProduceRequest carries (PID, epoch, base_sequence_number) per partition.
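The bundle in the table can be written down as a small consistency check over a config dictionary. This is a sketch under stated assumptions: check_idempotent_config is a hypothetical helper, not part of any Kafka client, and the fallback values are the ones listed in the table above.

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE


def check_idempotent_config(cfg):
    """Return a list of settings that conflict with enable.idempotence=true."""
    problems = []
    if not cfg.get("enable.idempotence", False):
        return problems  # idempotence is off, nothing to enforce

    if str(cfg.get("acks", "all")) not in ("all", "-1"):
        problems.append("acks must be 'all'")
    if int(cfg.get("retries", INT_MAX)) <= 0:
        problems.append("retries must be greater than 0")
    if int(cfg.get("max.in.flight.requests.per.connection", 5)) > 5:
        problems.append("max.in.flight.requests.per.connection must be <= 5")
    return problems


good_cfg = {
    "enable.idempotence": True,
    "acks": "all",
    "retries": INT_MAX,
    "max.in.flight.requests.per.connection": 5,
}
bad_cfg = {
    "enable.idempotence": True,
    "acks": "1",
    "max.in.flight.requests.per.connection": 10,
}
print(check_idempotent_config(good_cfg))
print(check_idempotent_config(bad_cfg))
```

A check like this is mostly useful in code review or CI, where a stray acks=1 override can otherwise slip in next to enable.idempotence=true unnoticed.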

Common confusions

  • "Idempotent producer means I can't get duplicates anywhere." No: only within one producer session, per partition, within the 5-batch window. Producer restarts, network partitions that exceed the window, and writes that span multiple partitions are not covered. End-to-end exactly-once needs the transactional layer plus a 2PC sink.
  • "Sequence numbers are like message IDs that the application chooses." No — they're chosen by the producer client library, monotonically per partition, invisible to the application. You don't set them; you get them. The application's idempotency key (if any) is a separate concern that lives in the record's payload or headers.
  • "enable.idempotence=true is enough; I don't need transactions." Enough for the single-producer-session case. Not enough if your producer pod restarts (zombie fencing needs transactional.id), or if your write spans multiple partitions atomically (transactions span partitions, idempotence does not), or if your downstream consumer needs read_committed isolation.
  • "The broker stores sequence numbers forever." No — only the last 5 per (PID, partition), and the PID itself is evicted after transactional.id.expiration.ms (default 7 days) of inactivity. After eviction, a producer with the same transactional.id gets a fresh PID and sequence space.
  • "Two producers with the same transactional.id will both work." No — the second one's initTransactions() call bumps the epoch and fences the first. This is by design; running two instances with the same transactional.id is a configuration bug, not a load-balancing strategy.
  • "The idempotent producer prevents application-level duplicates." No — it prevents transport-level duplicates only. If your application reads from a database and re-sends the same row to Kafka due to its own retry logic, the producer will assign different sequence numbers to those two sends and both will land in the log. Application-level idempotency needs an idempotency key in the payload.

Going deeper

The protocol on the wire

The ProduceRequest v3+ protocol (introduced by KIP-98) carries the idempotence triple in the record batch header, not per-record. A single batch sent to one partition shares one (PID, epoch, base_sequence) and the records inside it have implicit sequence numbers base_sequence + i for the i-th record. The broker validates the entire batch atomically: if base_sequence == last_seen + 1, all records in the batch are appended and last_seen is bumped to base_sequence + batch_size - 1. If base_sequence <= last_seen and the entire batch's sequence range is within the 5-batch cache, it's recognised as a duplicate batch and the original offsets are returned. Mixed-state batches (some records new, some duplicates) are not allowed — the protocol assumes batches are atomic. The Kafka source code reference is ProducerStateManager.scala line ~250 (the validate method); read that file in conjunction with RecordAccumulator.java on the producer side and the picture becomes complete.
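The batch-level rule can be sketched the same way as the per-record model, with a (base_sequence, record count) pair standing in for a batch. BatchBroker and append_batch are hypothetical names, and the sketch covers one producer on one partition.

```python
from collections import deque


class BatchBroker:
    """Validates whole batches; caches metadata for the last 5 of them."""

    def __init__(self):
        self.next_offset = 0
        self.last_seen = -1
        self.recent = deque(maxlen=5)  # (base_seq, last_seq, base_offset)

    def append_batch(self, base_seq, count):
        last_seq = base_seq + count - 1
        for cached_base, cached_last, cached_off in self.recent:
            if (cached_base, cached_last) == (base_seq, last_seq):
                return ("DUPLICATE", cached_off)   # same sequence range as before
        if base_seq != self.last_seen + 1:
            return ("OUT_OF_ORDER", None)          # gap or partial overlap
        base_offset = self.next_offset
        self.next_offset += count
        self.last_seen = last_seq                  # base_seq + count - 1
        self.recent.append((base_seq, last_seq, base_offset))
        return ("OK", base_offset)


bb = BatchBroker()
first = bb.append_batch(0, 3)     # sequences 0..2
second = bb.append_batch(3, 2)    # sequences 3..4
retry = bb.append_batch(0, 3)     # exact replay of the first batch
overlap = bb.append_batch(4, 3)   # sequences 4..6: one old record, two new
print(first, second, retry, overlap)
```

The last call is the mixed-state case: it is neither an exact replay nor a clean continuation, so the whole batch is refused rather than partially appended.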

Why the in-flight window is exactly 5

The window size 5 is a balance between three pressures: (1) producer throughput: a bigger window means more pipelining; (2) broker memory: every (PID, partition) tuple costs ~64 bytes of state, and at 100k producers × 1000 partitions that's roughly 6 GB just for dedup state; (3) the order-preservation invariant: the producer must guarantee that records are appended in sequence-number order, which means a retry of a low-sequence record must not arrive after a higher-sequence record has been appended, and the window bounds the time during which that could happen. KIP-679 (Kafka 3.0, 2021) made enable.idempotence=true and acks=all the producer defaults, but it did not change the window, which is still 5. Some workloads (high-frequency trading at SetuBank Securities, e.g.) deliberately set max.in.flight.requests.per.connection=1 to get strict ordering at the cost of throughput; idempotence still works, just with no pipelining.
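The memory figure is plain multiplication and worth checking by hand. The 64-byte entry size and the producer and partition counts are the article's illustrative numbers, not measurements.

```python
BYTES_PER_ENTRY = 64     # approximate state per (PID, partition) pair
producers = 100_000
partitions = 1_000

total_bytes = producers * partitions * BYTES_PER_ENTRY
total_gb = total_bytes / 1e9      # decimal gigabytes
total_gib = total_bytes / 2**30   # binary gibibytes

print(f"{total_bytes:,} bytes = {total_gb:.1f} GB = {total_gib:.2f} GiB")
```

This assumes every producer writes to every partition, which is the worst case. A cluster where each producer touches only a handful of partitions lands orders of magnitude lower.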

The PaisaBridge PID-eviction gotcha

In a 2024 incident, a PaisaBridge batch reconciliation producer ran for 8 days without producing any record, then resumed during EOD settlement processing. Its transactional.id had been evicted from the broker (default 7-day expiration), so initTransactions() returned a new PID with epoch 0. The producer sent a record with sequence 0, and because last_seen for the new PID on that partition was still -1 (fresh slate), the record was accepted. Ordinarily fine, except the application logic assumed sequence numbers were monotonic across the producer's lifetime. They weren't, because the PID itself had changed. Fix: bump transactional.id.expiration.ms to 30 days for low-frequency producers, or move the eviction-recovery logic into the application's startup hook so the producer notices a PID change and resets its application-level dedup hash. This is the kind of edge that only shows up at scale and only after long quiet periods, which is a good reason to audit your transactional.id.expiration.ms setting if you have producers with bursty workloads.
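A quick way to audit this is to compare each producer's longest expected idle period against the expiration setting. The helper names below (is_evicted, to_millis) are hypothetical, and the sketch only does the arithmetic; check the broker documentation for your version to confirm the accepted type and range of transactional.id.expiration.ms before choosing a value.

```python
from datetime import timedelta

DEFAULT_EXPIRATION = timedelta(days=7)  # default cited in the incident above


def is_evicted(idle, expiration=DEFAULT_EXPIRATION):
    """True if a producer idle for `idle` would have lost its transactional.id."""
    return idle > expiration


def to_millis(delta):
    """Convert a timedelta to whole milliseconds, the unit the setting uses."""
    return int(delta.total_seconds() * 1000)


idle = timedelta(days=8)                 # the quiet period from the incident
evicted_default = is_evicted(idle)
evicted_30d = is_evicted(idle, timedelta(days=30))
seven_days_ms = to_millis(DEFAULT_EXPIRATION)
thirty_days_ms = to_millis(timedelta(days=30))

print(evicted_default, evicted_30d, seven_days_ms, thirty_days_ms)
```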

Comparing Kafka's PID with Pulsar's broker-side dedup

Apache Pulsar takes a different approach to the same problem: instead of producer-assigned sequence numbers, the producer specifies a producer name and Pulsar's broker maintains a (producer_name, sequence_id) cursor. The cursor is application-controllable — the producer can set the sequence ID, useful for resuming from a known checkpoint after a crash. Pulsar's mechanism is closer to "the application owns the dedup key"; Kafka's is "the client library owns the dedup key". The trade-off: Pulsar's approach lets the application coordinate dedup across producer restarts without needing a transactional ID, but the application bears more correctness burden. Kafka's approach is more opinionated and more turnkey. For most Indian fintech workloads I've seen, Kafka's design is the easier-to-get-right default; for systems where the application already has a strong record identifier (e.g., a UPI transaction ID), Pulsar's design fits more naturally.
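The name-keyed model can be sketched as follows. This is loosely modelled on the description above and is not Pulsar's implementation: NameKeyedDedup and publish are hypothetical names, and the rule shown (drop anything at or below the highest sequence ID seen for that producer name) is the simplest form of the idea.

```python
class NameKeyedDedup:
    """Dedup keyed on an application-chosen producer name and sequence ID."""

    def __init__(self):
        self.highest = {}  # producer_name -> highest sequence_id accepted
        self.log = []

    def publish(self, producer_name, sequence_id, payload):
        if sequence_id <= self.highest.get(producer_name, -1):
            return False   # already seen, dropped
        self.highest[producer_name] = sequence_id
        self.log.append(payload)
        return True


topic = NameKeyedDedup()
name = "payments-producer-shard-7"

# First run: the application publishes sequence IDs 0..2, then crashes.
first_run = [topic.publish(name, i, f"txn-{i}") for i in range(3)]

# After restart it resumes from its last checkpoint (sequence ID 2),
# so it re-sends 2 and then continues with 3.
after_restart = [topic.publish(name, i, f"txn-{i}") for i in (2, 3)]
print(first_run, after_restart, topic.log)
```

Because the name and the sequence ID both come from the application, the restart needs no fresh identifier from the broker. The flip side is that a wrong checkpoint silently drops or duplicates records.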

What the producer does on OutOfOrderSequenceException

When a producer receives OutOfOrderSequenceException from the broker, it knows its in-flight pipeline has gone wrong — typically because a request older than the 5-batch window was retried after newer batches succeeded, or because the producer's local sequence counter desynced from the broker's view. The default behaviour as of Kafka 2.5+ is to abort all in-flight batches and call InitProducerId again, which assigns a new PID. Application records that were in-flight when the exception fired are surfaced to the application's send callback as failed; the application is responsible for deciding whether to re-enqueue them. Crucially, this is not a "duplicates were written" exception — it's a "the dedup contract is broken, start over" exception. The records that triggered it may or may not have been appended; the application has to be prepared for either, which is why the recommended pattern is to write through a transactional producer with a stable transactional.id so the new PID inherits the previous transactional state and dedup remains coherent across the restart.
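The recovery path described above can be sketched from the producer's side. This is a toy model under stated assumptions: RecoveringProducer, on_out_of_order, and the pid_source callable are hypothetical, and the real client's handling has more states than this.

```python
import itertools


class RecoveringProducer:
    """Toy producer that starts over with a new PID after a sequence error."""

    def __init__(self, pid_source):
        self.pid_source = pid_source   # callable that returns a fresh PID
        self.pid = pid_source()
        self.next_seq = 0
        self.in_flight = []            # (sequence_number, payload, callback)

    def send(self, payload, callback):
        self.in_flight.append((self.next_seq, payload, callback))
        self.next_seq += 1

    def on_out_of_order(self):
        failed, self.in_flight = self.in_flight, []
        for _seq, payload, callback in failed:
            callback(payload, "FAILED")   # the application decides whether to resend
        self.pid = self.pid_source()      # stand-in for a new InitProducerId
        self.next_seq = 0                 # new PID means a fresh sequence space


pids = itertools.count(42)
results = []
rp = RecoveringProducer(lambda: next(pids))
rp.send("a", lambda payload, status: results.append((payload, status)))
rp.send("b", lambda payload, status: results.append((payload, status)))
rp.on_out_of_order()
print(rp.pid, rp.next_seq, results)
```

The callbacks report failure without saying whether the broker appended the records, which matches the text above: the application has to be prepared for either outcome.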

Where this leads next

The idempotent producer is the delivery leg of end-to-end exactly-once. The next chapters cover the other two legs.

When you read a Flink or Kafka Streams job config and see enable.idempotence=true set, you now know the exact contract: per-session, per-partition, 5-batch window, with epoch fencing if transactional.id is also set. That's the bedrock the rest of the EOS stack sits on.
