Consumer groups and offset management

Last chapter you split a topic into 64 partitions so producers could write in parallel. The reads are still a single Python process pulling from partition 0. To process all 64 partitions in parallel you need 64 consumers — and you need them to share the load without two consumers ever reading the same partition, without losing any record when one of them dies, and without rewinding the topic when one of them restarts. That coordination has a name: the consumer group.

A consumer group is N consumer processes sharing the same group.id. Kafka's group coordinator assigns each partition to exactly one consumer in the group, and stores per-(group, partition) offsets in the internal __consumer_offsets topic. The contract is "wherever this group left off, that's where it resumes". The two failure modes that bite production: rebalances that pause processing for seconds, and offset commits that race ahead of the work — committing offset 4203 before the row at 4203 actually hit Postgres means a crash silently drops the row.

What a consumer group is

A consumer group is a string. You set group.id="fraud-rules-v2" on a consumer and Kafka binds you to a coordinator. Every consumer with the same group.id is in the same group; every consumer with a different group.id is reading the topic independently, with its own offset cursor. This is how Razorpay runs three teams off one payments.tx_events topic: fraud reads with group.id="fraud-rules-v2", the analytics team reads with group.id="analytics-warehouse", and the audit pipeline reads with group.id="compliance-audit". Each group sees every record; each group remembers its own progress.

Inside one group, the rule is the inverse: one partition, one consumer. The group coordinator (a designated broker per group) computes an assignment — a map from each partition to one consumer — and tells every consumer which partitions it owns. With 6 partitions and 2 consumers, each consumer owns 3 partitions. With 6 partitions and 8 consumers, 6 consumers own 1 partition each and 2 sit idle. Why one-to-one and not many-to-one: per-partition order is the contract from the previous chapter, and order requires a single reader. Two consumers reading the same partition would race on offset commits — one would commit offset 100, the other offset 80, and on restart the group would either re-process 20 records or drop them. The single-reader rule sidesteps that race entirely.
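The arithmetic behind that assignment is easy to sketch. The function below mirrors the spirit of Kafka's default single-topic range assignment — contiguous slices, leftovers to the first few members — but it is a toy, not the real assignor (no racks, no multi-topic logic):

```python
# Toy single-topic range assignment: one partition, one consumer.

def range_assign(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Give each consumer a contiguous slice; leftovers go to the first few."""
    members = sorted(consumers)                   # deterministic ordering
    base, extra = divmod(num_partitions, len(members))
    assignment, start = {}, 0
    for i, c in enumerate(members):
        count = base + (1 if i < extra else 0)    # first `extra` members get one more
        assignment[c] = list(range(start, start + count))
        start += count
    return assignment

print(range_assign(6, ["C1", "C2"]))
# {'C1': [0, 1, 2], 'C2': [3, 4, 5]} — 6 partitions, 2 consumers, 3 each
print(range_assign(6, ["C%d" % i for i in range(1, 9)]))
# six consumers own one partition each; C7 and C8 sit idle with []
```

The idle-consumer case falls straight out of the arithmetic: with more consumers than partitions, divmod hands some members an empty slice.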

The producer side from the previous chapter and the consumer side here are deliberately symmetric. The producer hashes a key and picks a partition; the consumer joins a group and receives a partition. In both directions the partition count N is the parallelism dial — N producers can write in parallel, N consumers can read in parallel, no more on either end without breaking ordering. The coordinator broker is the only stateful piece in the consumer-side picture; it knows the current group membership, the current assignment, and the current committed offset for each (group, topic, partition) triple.

There are three reasons to put a consumer in a group rather than reading partitions directly. The first is automatic load balancing: as you add or remove consumer pods (via Kubernetes HPA reacting to lag), partitions are reshuffled without anyone touching config. The second is automatic failure recovery: a consumer that crashes has its partitions reassigned within session.timeout.ms, so the topic keeps draining even though one consumer is gone. The third is centralised offset tracking: every team using __consumer_offsets gets the same operational tooling (kafka-consumer-groups.sh, Burrow, Confluent Control Center) for free. The first two are the load-bearing reasons; the third is what makes consumer groups the boring default.

A useful side effect of "the third reason" is that the on-call rotation can debug a consumer they've never seen before. A new fraud detector, a new analytics pipeline, a new audit consumer — each one shows up in the same kafka-consumer-groups.sh --list output, with the same lag dashboard, with the same reset commands available. At Razorpay this matters: 200+ teams ship consumers, and the platform team would drown if every team had its own offset-tracking convention. The standard __consumer_offsets pattern is the boundary that keeps Kafka usable as an internal platform.

The shape of the contract is worth stating out loud once. The producer side is pure: a record arrives, gets hashed to a partition, gets written. The consumer side is stateful: there is a group, and the group remembers where it stopped. Everything else in this chapter is consequences of those two sentences.

Figure: Consumer group anatomy. A topic with 6 partitions (payments.tx_events, each partition holding offsets 0..N) feeds a single consumer group (group.id = fraud-rules-v2) containing 3 consumers. The group coordinator on broker B2 computes the assignment: consumer C1 owns partitions 0-1, C2 owns 2-3, C3 owns 4-5. A separate dotted box shows __consumer_offsets (internal topic, 50 partitions), where each consumer commits the offset of the last record it has processed for each partition it owns, keyed (group.id, topic, partition) → committed offset + metadata.
The group has three consumers; six partitions get split two-each. Offsets are not stored on the consumer — they are written to __consumer_offsets, which is itself a Kafka topic. On restart, the consumer asks the coordinator for its last committed offset and resumes from there.

The figure carries the whole load-balancing picture. The producer side hashes a key into one of six partitions. The consumer side has three workers; each owns two partitions; each remembers — in __consumer_offsets — the last offset it processed. Add a fourth consumer and the group rebalances: one consumer's partitions get redistributed so all four have either 1 or 2 partitions. Lose a consumer (process crash, network partition, GC pause exceeding session.timeout.ms) and the coordinator detects the missed heartbeats and rebalances the dead consumer's partitions onto the survivors — within seconds for a clean shutdown (the closing consumer sends a LeaveGroup request), or after session.timeout.ms of silence for a hard crash.

Offsets are facts, commits are promises

An offset is a number: "for partition 3, this group has consumed up to offset 4203". Records 0..4203 are processed; record 4204 is what the next poll() returns. (Strictly, the value the broker stores is the next offset to read — 4204 — but "committed up to 4203" is the standard shorthand and the one this chapter uses.) The committed offset lives in __consumer_offsets, an internal Kafka topic with 50 partitions, replicated 3×, keyed by (group.id, topic, partition). Reading the latest value for that key gives you the consumer's resume point.

The lie everyone tells about offsets is that "committing the offset" means "the work is done". It does not. A commit is a promise the consumer makes to itself — "I claim records 0..4203 are processed; if I crash now, restart from 4204". Whether the work is actually done depends entirely on the order in which you write to your sink and call commit(). Get the order wrong and you choose your poison: at-most-once or at-least-once, with no in-between for the default Kafka consumer.

# consumer_loop.py — the order of these four lines decides delivery semantics
from kafka import KafkaConsumer
import psycopg2, json

consumer = KafkaConsumer(
    "payments.tx_events",
    bootstrap_servers="kafka.razorpay.internal:9092",
    group_id="fraud-rules-v2",
    enable_auto_commit=False,                     # we commit manually
    auto_offset_reset="earliest",                 # if no committed offset, start from beginning
    isolation_level="read_committed",             # ignore aborted transactions (chapter 64)
    max_poll_records=500,
)
pg = psycopg2.connect("dbname=fraud user=fraud_app host=db.razorpay.internal")

def score(tx: dict) -> float:                     # stand-in for the real fraud model
    return 0.0

while True:
    batch = consumer.poll(timeout_ms=1000)        # returns dict[TopicPartition, list[record]]
    if not batch:
        continue
    with pg.cursor() as cur:
        for tp, records in batch.items():
            for r in records:
                tx = json.loads(r.value)
                # 1. Process: insert fraud-score row keyed by (merchant_id, tx_id)
                cur.execute(
                    "INSERT INTO fraud_scores (merchant_id, tx_id, score, source_offset) "
                    "VALUES (%s, %s, %s, %s) ON CONFLICT (merchant_id, tx_id) DO NOTHING",
                    (tx["merchant_id"], tx["tx_id"], score(tx), r.offset),
                )
        pg.commit()                                # 2. Sink commit FIRST — durable in Postgres
    consumer.commit()                              # 3. Offset commit SECOND — durable in Kafka
# What the loop looks like at steady state:
#   poll → 432 records from p2, 471 from p3
#   INSERT 903 rows (898 new, 5 already-seen absorbed by ON CONFLICT DO NOTHING)
#   pg.commit()          — rows durable: Postgres WAL fsync'd
#   consumer.commit()    — offsets durable: written to __consumer_offsets, replicas ack'd
#   poll → next batch starting from offset 4675 on p2, 4892 on p3

The lines that decide everything are the last two. pg.commit() lands first, consumer.commit() second: that ordering is at-least-once — a crash between the two replays the batch on restart, and ON CONFLICT DO NOTHING absorbs the duplicates. Swap the two commits and you get at-most-once — a crash between them advances the offset past rows that never reached Postgres, and those rows are gone.
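The two orderings are easy to demonstrate without a broker. Below is a toy crash simulation — in-memory lists standing in for Postgres and __consumer_offsets; none of these names are the kafka-python API — that crashes between the two commits under each ordering:

```python
# Toy crash simulation: the commit order alone decides delivery semantics.
# In-memory stand-ins for the sink and the offset store; not real Kafka.

class Crash(Exception):
    pass

def run(records, committed_offset, sink, offsets, sink_first, crash_between):
    """Process one batch; optionally 'crash' between the two commits."""
    batch = records[committed_offset:]
    if sink_first:
        sink.extend(batch)              # pg.commit() analogue
        if crash_between:
            raise Crash
        offsets["p0"] = len(records)    # consumer.commit() analogue
    else:
        offsets["p0"] = len(records)    # offset commit FIRST (the wrong order)
        if crash_between:
            raise Crash
        sink.extend(batch)

records = ["tx1", "tx2", "tx3"]

# At-least-once: sink first. Crash => offset not advanced => batch replays.
sink, offsets = [], {"p0": 0}
try:
    run(records, offsets["p0"], sink, offsets, sink_first=True, crash_between=True)
except Crash:
    pass
run(records, offsets["p0"], sink, offsets, sink_first=True, crash_between=False)
print(sink)          # tx1..tx3 appear twice: duplicates, but nothing lost

# At-most-once: offset first. Crash => offset advanced => batch dropped.
sink, offsets = [], {"p0": 0}
try:
    run(records, offsets["p0"], sink, offsets, sink_first=False, crash_between=True)
except Crash:
    pass
run(records, offsets["p0"], sink, offsets, sink_first=False, crash_between=False)
print(sink)          # empty: the batch was silently lost
```

The deduplicating sink (ON CONFLICT DO NOTHING in the real loop) is what turns the at-least-once replay into effective exactly-once for this particular sink.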

Rebalances: the seconds you can't afford

A rebalance is what happens when the group's membership changes — a consumer joins, leaves, or the coordinator decides one is dead because heartbeats stopped arriving. The pre-2.4 protocol (eager rebalance) made this expensive: every consumer in the group revoked all its partitions, the coordinator computed a new assignment, every consumer picked up its new partitions. During the revoke-compute-reassign window — anywhere from 3 to 30 seconds depending on state-restore cost — no records get processed. For Razorpay's fraud detector during Diwali traffic at 3,000 transactions/second, a 10-second pause means 30,000 transactions queue up; the consumer takes 30 seconds to drain the backlog after the rebalance, during which p99 latency hits 40 seconds. That's the kind of thing that triggers a Slack page.

The 2.4-and-later protocol — cooperative-sticky rebalance, defined in KIP-429 — splits the rebalance into two phases. Phase 1: the coordinator computes the new assignment but only revokes partitions that changed owner. Consumers whose partition list is unchanged keep processing through the rebalance. Phase 2: only the moved partitions get reassigned. For a 1.2 GB-state-per-consumer fraud detector at Dream11, the difference between eager and cooperative-sticky is the difference between a 30-second cluster-wide pause every time a consumer scales out and a 2-second pause on the one consumer whose partitions moved. The migration cost is one config (partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor) and one rolling restart of the consumer fleet.

The other rebalance trigger nobody talks about until it bites them is session timeout. The consumer sends a heartbeat every heartbeat.interval.ms (default 3 seconds); if no heartbeat lands for session.timeout.ms (default 45 seconds since Kafka 3.0; 10 seconds before that), the coordinator declares the consumer dead and rebalances its partitions away. A consumer wedged in a long CPU-bound scoring loop that starves the heartbeat thread of the GIL — or a Java consumer in a 30-second JVM GC pause — silently misses heartbeats, gets kicked out of the group, and on the next poll loop discovers its partitions are owned by another consumer; its in-flight work is wasted. The guard for actual processing time is max.poll.interval.ms (default 5 minutes), made workable by the heartbeat being sent from a background thread independent of the poll loop. Why two timeouts and not one: heartbeats prove "the consumer process is alive and connected", while max.poll.interval.ms proves "the consumer is making forward progress on records". A consumer can be alive (heartbeats fine) but stuck in a single 10-minute sink call; the second timeout catches that case so the group doesn't sit forever waiting for one wedged consumer. Get those two configs wrong and your group enters a thrash loop where every consumer keeps timing out and rebalancing — kafka-consumer-groups.sh --describe shows the group as PreparingRebalance permanently.
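In kafka-python the three knobs sit side by side on the constructor. A hedged config sketch — the values are illustrative starting points for a slow-sink consumer, not recommendations:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "payments.tx_events",
    group_id="fraud-rules-v2",
    bootstrap_servers="kafka.razorpay.internal:9092",
    # Liveness: heartbeats come from a background thread, independent of poll().
    heartbeat_interval_ms=3_000,      # send a heartbeat every 3 s
    session_timeout_ms=45_000,        # declared dead after 45 s of heartbeat silence
    # Progress: the poll loop itself must come back within this window.
    max_poll_interval_ms=300_000,     # 5-minute budget per poll-to-poll cycle
    max_poll_records=100,             # smaller batches => each cycle finishes sooner
)
```

Shrinking max_poll_records is often the cheapest fix for max.poll.interval.ms violations: the per-cycle work drops without touching the sink.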

Figure: Eager vs cooperative-sticky rebalance. Two timelines stacked vertically, both showing the same event: a new consumer C4 joins a 3-consumer group. Top timeline, eager (pre-2.4): at t=0 the new consumer joins, all 3 existing consumers revoke all their partitions, the coordinator recomputes the assignment (REVOKE → COMPUTE → REASSIGN), and everyone resumes at t=4s — a cluster-wide pause. Bottom timeline, cooperative-sticky (2.4+): C1 (p0, p1) and C2 (p2, p3) keep processing uninterrupted; C3 revokes only p5 and continues on p4 while p5 moves to C4 — a 2-second window where only p5 is paused and the other 5 partitions keep processing.
Eager rebalance pauses the entire group for the duration of the assignment computation; cooperative-sticky pauses only the consumers whose partitions are moving. For a stateful consumer fleet (1+ GB of in-memory state per consumer), this is the difference between a 30-second cluster-wide outage and a 2-second hiccup on one node.

Operationally: kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group fraud-rules-v2 shows current owner per partition and the lag. If you see LAG > 100,000 on one partition for more than 30 seconds, either that consumer is stuck in a long sink call (check max.poll.interval.ms), is dead and the group hasn't noticed yet (rare), or the partition is hot — and you're back to the previous chapter's diagnosis tree.

Auto-offset-reset and the "where do new groups start" question

The first time a group.id connects to a topic, there is no committed offset. The consumer must decide where to start — and the only knob is auto.offset.reset. Three values, three different production behaviours: earliest starts from the oldest record still retained (a full replay of whatever retention holds), latest starts from records produced after the group first connects (history is skipped entirely), and none refuses to guess — the consumer throws if no committed offset exists, forcing a human to decide.

The trap that bites Indian fintech teams every quarter: you ship a new microservice with a typo in group.id (fraud-rules-V2 vs fraud-rules-v2). The new group has no committed offsets, auto.offset.reset=earliest, and starts replaying 7 days of payments. Postgres melts under the insert load; the on-call engineer can't tell whether to kill the service or let it catch up. Standardise on auto.offset.reset=none for production groups so a typo'd group.id fails loudly instead of silently chewing through a backlog.
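In the Java client, auto.offset.reset=none is a first-class value. kafka-python spells the same behaviour slightly differently — anything other than 'earliest' or 'latest' means "no reset policy", so the consumer raises instead of silently replaying. A sketch of the fail-loudly configuration (this reading of kafka-python's behaviour is an assumption worth verifying against the client version you run):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "payments.tx_events",
    group_id="fraud-rules-v2",       # a typo here creates a brand-new group...
    bootstrap_servers="kafka.razorpay.internal:9092",
    # ...and with no reset policy, a brand-new group fails loudly
    # (kafka-python raises, e.g. NoOffsetForPartitionError, on first poll)
    # instead of replaying 7 days of payments into Postgres.
    auto_offset_reset="none",
)
```

The failure shows up immediately in the deploy pipeline, where a typo is cheap, rather than an hour later in the Postgres insert-rate graph, where it isn't.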

A related operation: manual offset reset. Sometimes you legitimately want to replay a window — your fraud model had a bug yesterday and you need to re-score the last 6 hours of transactions. The command is kafka-consumer-groups.sh --reset-offsets --to-datetime 2026-04-25T03:00:00Z --group fraud-rules-v2 --topic payments.tx_events --execute. The __consumer_offsets topic gets a new write with the rewound offset; on next poll, the consumer resumes from there. Verify by re-running --describe and checking that LAG jumps to "records since 03:00 UTC". A common variant is --shift-by -100000 to step back 100k records, useful when you know the count but not the wall-clock time.

One critical operational rule: the consumers must be stopped before you reset offsets. The kafka-consumer-groups.sh tool refuses to reset offsets for an active group precisely because the in-memory state on each consumer doesn't know about the rewind — it would happily continue from its current position and silently undo the reset on the next commit. Stop the fleet, run --reset-offsets, restart the fleet. The downtime is annoying but unambiguous; the alternative — a "live reset" — does not exist for a reason.

Lag is the only metric that matters

Consumer lag — the gap between the topic's log-end offset and the group's committed offset, per partition — is the single number that tells you whether your pipeline is healthy. Lag of zero means the group is caught up; lag growing linearly means the consumer can't keep up with the producer; lag oscillating in a sawtooth pattern means a consumer is doing periodic large batch work and falling behind during it. Every other metric (CPU, memory, throughput) is a derived signal; lag is the load-bearing one.

The cheap way to read lag is kafka-consumer-groups.sh --describe. The production way is to scrape it as a Prometheus metric — either via Burrow (LinkedIn's lag-monitoring service that classifies groups as OK/WARNING/STALL based on lag trajectory, not just current value) or via a standalone lag exporter that polls the broker's offset APIs. The trajectory matters: a fraud detector with steady 2-minute lag during peak Diwali traffic might be fine; the same detector with lag rising from 30 seconds to 30 minutes over an hour is in trouble. Burrow's STALL state is exactly "lag isn't necessarily huge but the consumer hasn't committed in a while" — that catches a wedged consumer before its lag visibly explodes.

A worked operational pattern at Razorpay: the on-call dashboard shows three lag panels per critical consumer group. The first is current lag per partition (heat map across 64 partitions; one partition turning red catches a hot-partition or stuck-consumer problem). The second is lag trajectory over the last 30 minutes (linear growth = under-provisioned; sawtooth = back-pressure from sink). The third is "time-since-last-commit" per consumer (a value above 10s means a consumer is mid-batch in a long sink call, possibly approaching max.poll.interval.ms). When the system is healthy, all three are flat green. When it isn't, exactly one of the three lights up and tells you which class of problem you have — partition-side, throughput-side, or consumer-side.

The trap with lag in records is that it doesn't tell you how far behind in time the consumer is. A consumer that's 100,000 records behind during off-peak (10 records/sec) is 167 minutes behind; the same 100,000 records during peak (10,000 records/sec) is 10 seconds behind. The right monitoring axis for a fraud detector is lag-in-seconds — now minus the timestamp of the next record the consumer would process — not lag-in-records. Most lag exporters expose both; alert on lag-in-seconds for SLO-bearing consumers and lag-in-records for cost-bearing batch consumers. Mixing them up at 3 a.m. is how you page someone for nothing or fail to page when production is bleeding.
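The conversion in the paragraph above is a single division. A small helper — hypothetical function names; this computes time-to-catch-up at the current consume rate, which is one common definition of lag-in-seconds (the other being now minus the next record's timestamp):

```python
def lag_records(log_end_offset: int, committed_offset: int) -> int:
    """Lag in records: how far the group's cursor trails the log end."""
    return log_end_offset - committed_offset

def lag_seconds(records_behind: int, records_per_sec: float) -> float:
    """Lag in time: how long catch-up takes at the current consume rate."""
    return records_behind / records_per_sec

behind = lag_records(1_000_000, 900_000)      # 100,000 records behind
print(lag_seconds(behind, 10) / 60)           # off-peak at 10 rec/s: ~167 minutes
print(lag_seconds(behind, 10_000))            # peak at 10,000 rec/s: 10.0 seconds
```

Same record count, four orders of magnitude apart in urgency — which is exactly why the alert threshold has to live on the time axis for SLO-bearing consumers.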

Common confusions

Going deeper

How __consumer_offsets actually works

__consumer_offsets is a regular Kafka topic with 50 partitions, replication factor 3, and log compaction enabled (chapter 52). Compaction means Kafka keeps only the latest value per key — for offsets, the key is (group.id, topic, partition), so old offsets get discarded. This is what stops the topic from growing unbounded: after a year, the partition still has only one record per active (group, topic, partition) triple. The coordinator broker for a group is determined by hash(group.id) mod 50 — every group is pinned to one specific partition of __consumer_offsets, and the broker leading that partition becomes the coordinator. Lose that broker and a leader election promotes a replica; the new leader rebuilds in-memory state by replaying the partition's log. Recovery is typically under 5 seconds.
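That pinning is computable from the outside. A sketch that reimplements Java's String.hashCode in Python so you can predict which __consumer_offsets partition — and hence which coordinator — a group lands on (Kafka's real code also guards the Integer.MIN_VALUE corner of abs(); this toy ignores it):

```python
def java_string_hashcode(s: str) -> int:
    """Java's String.hashCode: h = 31*h + char, with 32-bit signed overflow."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h   # reinterpret as signed int

def offsets_partition(group_id: str, num_partitions: int = 50) -> int:
    """The __consumer_offsets partition that stores this group's commits."""
    return abs(java_string_hashcode(group_id)) % num_partitions

print(offsets_partition("fraud-rules-v2"))   # a stable number in 0..49
```

Handy operationally: if one coordinator broker is misbehaving, you can check which of your critical groups hash onto partitions it leads.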

The coordinator also stores group metadata — current member list, generation ID, current assignment — in the same topic, alongside offsets. Two record types share the partition: OffsetCommit records and GroupMetadata records, distinguished by a version field in the record key. When you run kafka-consumer-groups.sh --describe, the AdminClient reads that partition for your group and assembles the response from those two record types. There is no separate "control plane" database; the topic is the database, and Kafka's own log is what backs the coordinator's state. This is the ZooKeeper-removal payoff that KRaft mode (Kafka 3.3+, default in 4.0) takes to its conclusion: the cluster manages itself with the same primitive it offers to clients.

Static membership: surviving rolling restarts without rebalance

A regular consumer joining the group looks like a new consumer to the coordinator — it gets a fresh member ID and triggers a rebalance. For a stateful fleet on Kubernetes that does rolling restarts (each pod restarts in turn), this means N rebalances per deploy, each pausing the affected partitions. KIP-345 introduces static membership: each consumer has a stable group.instance.id (e.g. fraud-pod-3) that survives restarts. When a consumer with the same instance ID rejoins within session.timeout.ms, the coordinator returns its previous partitions without rebalancing the rest of the group. For PhonePe's stream-processing fleet doing 50 rolling restarts a day, this knocked rebalance frequency from 50/day to ~2/day (only when a pod stays down past the timeout), saving roughly 25 minutes/day of cumulative consumer pause across the group.

Kafka 4.0 and the new consumer rebalance protocol (KIP-848)

Kafka 4.0 (2025) ships a brand-new server-side rebalance protocol — KIP-848 — where the coordinator does the partition-assignment computation instead of one of the consumers. The old protocol elected a "group leader" consumer to compute the assignment, which meant the leader's CPU load determined rebalance latency for the whole group. The new protocol moves the computation into the broker, makes assignment changes incremental at the protocol level (not just the client level), and removes the eager-protocol legacy path entirely. For groups with thousands of consumers (rare; usually means too many consumers per partition), this drops rebalance latency from seconds to tens of milliseconds. As of Kafka 4.0, KIP-848 is GA; the old protocol is still supported but deprecated.

Offset commit at scale: how Razorpay handles 12k consumers

Razorpay runs roughly 12,000 consumer instances across all teams against the central Kafka cluster. With auto-commit at 5 s, that's 12,000 commits every 5 s = 2,400 commits/sec landing on __consumer_offsets's 50 partitions = ~48 commits/sec/partition. Each commit is small (a few hundred bytes), so the absolute write load is fine — the real cost is that a synchronous commit blocks the consumer's poll loop for a full broker round-trip. The fix Razorpay uses is asynchronous commits (commitAsync() in the Java client, commit_async() in kafka-python) for the steady state and a synchronous commit only on consumer shutdown; async commits are pipelined by the client and don't block the poll loop. The trade-off is that an async commit can fail silently — if the consumer crashes between sending the async commit and the broker ack, the offset is not committed, and on restart the batch replays. Combined with the at-least-once sink ordering, this is fine; for at-most-once consumers (analytics-only), stick with synchronous commits.

When to bypass the standard machinery: assign() and external offsets

The whole consumer-group machinery is opt-in. A consumer that calls subscribe(["payments.tx_events"]) joins a group, gets partitions assigned by the coordinator, and commits offsets to __consumer_offsets. A consumer that calls assign([TopicPartition("payments.tx_events", 3)]) directly takes ownership of partition 3 with no group, no rebalance, no automatic offset management — the caller commits offsets manually (often to its own external store) or doesn't commit at all. assign() is what tools like kafka-console-consumer.sh use, and what specialised consumers (Spark Streaming's direct approach, debugging tools, point-in-time replays) use. The trade-off is that the operator now owns coordination — two assign() consumers reading the same partition won't be stopped by Kafka. Useful for debugging, dangerous for production unless you know exactly what you're doing.

The other case where you bypass __consumer_offsets is when the sink is a transactional database. You can store the consumer's offset in the same transaction as the sink write — BEGIN; INSERT row; UPDATE consumer_offset SET offset=4203 WHERE group='fraud-rules-v2' AND partition=3; COMMIT;. The atomic commit means the row and the offset advance or roll back together; there is no window where a crash leaves them inconsistent. On startup, the consumer reads its committed offset from the database (not from __consumer_offsets) and seeks to that offset before its first poll. This pattern eliminates the at-least-once duplicate window for sinks that already participate in a transactional database — Postgres, MySQL, CockroachDB, anything that supports atomic multi-row commits. Flink's two-phase-commit sinks (chapter 65) generalise this idea to non-transactional sinks via 2PC; for plain Postgres the technique is just "commit them together".
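The pattern is easy to demonstrate with SQLite standing in for Postgres. Table and column names below are made up for the example, and the sketch stores the next offset to read; a real consumer would follow resume_offset() with consumer.seek() before its first poll:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE fraud_scores (merchant_id TEXT, tx_id TEXT, score REAL,
                               PRIMARY KEY (merchant_id, tx_id));
    CREATE TABLE consumer_offsets (grp TEXT, part INTEGER, next_offset INTEGER,
                                   PRIMARY KEY (grp, part));
""")

def process_batch(db, records, grp="fraud-rules-v2", part=3):
    """Sink write and offset advance in ONE transaction: both land or neither."""
    with db:  # sqlite3 connection context manager: BEGIN ... COMMIT / ROLLBACK
        for offset, tx in records:
            db.execute(
                "INSERT OR IGNORE INTO fraud_scores VALUES (?, ?, ?)",
                (tx["merchant_id"], tx["tx_id"], 0.5),
            )
        last_offset = records[-1][0]
        db.execute(
            "INSERT OR REPLACE INTO consumer_offsets VALUES (?, ?, ?)",
            (grp, part, last_offset + 1),
        )

def resume_offset(db, grp="fraud-rules-v2", part=3):
    """On startup: read the offset from the DB, then seek the consumer to it."""
    row = db.execute(
        "SELECT next_offset FROM consumer_offsets WHERE grp=? AND part=?",
        (grp, part),
    ).fetchone()
    return row[0] if row else 0

process_batch(db, [(4202, {"merchant_id": "m1", "tx_id": "t1"}),
                   (4203, {"merchant_id": "m1", "tx_id": "t2"})])
print(resume_offset(db))   # row and offset advanced atomically, together
```

A crash inside process_batch rolls back both writes; a replay after the commit is absorbed by the primary-key dedupe — the two failure windows of the split-commit design are gone.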

Where this leads next

Consumer groups give you the what of parallel reads — partitions are assigned and offsets are tracked. They don't tell you what happens when the broker holding a partition dies. That's the next chapter, /wiki/replication-and-isr-how-kafka-stays-up: how Kafka's in-sync replica protocol turns a single-disk partition into an F+1-disk partition with automatic failover, so a broker death doesn't take down a partition's consumers.

After that, /wiki/retention-compaction-tiered-storage covers what makes __consumer_offsets itself work — log compaction — and how it interacts with the time/size retention policies that decide when records are deleted from a topic. A consumer that lags past the retention boundary discovers that its committed offset points to a deleted record; the resume behaviour there is the kind of edge case that gets hardcoded into the team's runbook after it bites once.

Beyond Build 7, consumer-group offset commits are the foundation for /wiki/checkpointing-the-consistent-snapshot-algorithm (the Chandy–Lamport algorithm Flink uses to snapshot stream state) and for the two-phase-commit transactional sinks that make end-to-end exactly-once possible (Build 9). Every one of those higher-level guarantees is, at the bottom, a question of "when did this consumer commit which offset, and was the side effect already durable when it did?".

The cleanest mental model to carry forward: a consumer group is a stateful cursor on a partitioned log, with the cursor itself stored back in the same log infrastructure. The producer writes records; the broker durably stores them; the coordinator hands a slice of partitions to each consumer; each consumer reads, processes, and commits a per-partition offset. Drop any one of those four roles and the system stops working. Once that picture is in place, every higher-level streaming concept — windows, watermarks, checkpoints, exactly-once — reduces to "what extra metadata do we commit alongside the offset, and what extra atomicity do we need across the offset commit and the side effect?". That's what the next ten chapters answer, one piece at a time.
