Retention, compaction, tiered storage

The previous chapter ended with three replicas of the same partition fsynced to disk on three different brokers, the producer happy, the consumer reading at the high-water mark. Now imagine that's been running for 18 months. Razorpay's payments.tx_events partition 3 has 14 TB of records on each broker, the SSDs are 87% full, and the on-call engineer is being paged about disk pressure on a Saturday. Records from June 2024 are still on disk even though no consumer has read them in 9 months. Something has to delete old data, and that something has three flavours — time-based retention, size-based retention, and log compaction — plus a fourth knob, tiered storage, that moves cold segments to S3 instead of deleting them.

A Kafka log is split into segment files; retention deletes whole segments past a time or size threshold, compaction garbage-collects keyed records by keeping only the latest value per key, and tiered storage offloads cold segments to S3 so the local disk only holds the hot tail. The three mechanisms compose: an event-stream topic uses time retention, an offsets-style state topic uses compaction, and a long-history topic combines compaction with tiered storage to hold years of data on cheap storage.

Segments are the unit everything operates on

A Kafka partition is not one giant file — it's a sequence of size-bounded segment files on disk, each holding a contiguous range of offsets. The default log.segment.bytes=1073741824 (1 GB) means a partition writing 50 MB/sec rolls a new segment roughly every 21 seconds. The active segment is the one currently being appended to; once it hits the size threshold or the time threshold (log.roll.ms, default 7 days via log.roll.hours=168), the broker closes it, fsyncs the index files, and opens a fresh segment. Closed segments are immutable — and that immutability is what makes retention and compaction tractable. You don't shrink a file in place; you delete the file.

Each segment is a set of files sharing a base name inside the partition directory — up to four per segment: 00000000000000004210.log (the records themselves), 00000000000000004210.index (offset → byte position), 00000000000000004210.timeindex (timestamp → offset), and 00000000000000004210.snapshot (producer/transaction state). The 20-digit number in the filename is the base offset — the offset of the first record in that segment. So a partition's directory listing tells you the segment boundary structure at a glance: you can read off "this partition has segments starting at offsets 0, 4210, 8420, 12630, …" and the gaps between base offsets tell you how many records each segment holds. The active segment is whichever has the highest base offset.
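A minimal sketch (illustrative, not broker source) of how the base offsets parsed from a directory listing resolve a fetch offset to the segment file that contains it:

```python
import bisect

# Base offsets parsed from the *.log filenames in a partition directory
segment_base_offsets = [0, 4210, 8420, 12630, 16840]

def segment_for(offset, bases):
    """Return the base offset of the segment containing `offset`."""
    # The containing segment is the last one whose base offset <= offset
    i = bisect.bisect_right(bases, offset) - 1
    if i < 0:
        raise KeyError(f"offset {offset} is below the log start offset")
    return bases[i]

print(f"{segment_for(5000, segment_base_offsets):020d}.log")  # 00000000000000004210.log
print(f"{segment_for(0, segment_base_offsets):020d}.log")     # 00000000000000000000.log
```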

[Figure: A partition is a sequence of immutable segment files. Five segments on a timeline — base offsets 0, 4210, 8420, 12630, and 16840 (active, log-end-offset 19821) — with segments 1 and 2 older than the retention.ms = 7 days window and eligible for deletion, segments 3–5 kept on disk; consumers can replay any offset in the kept range. A callout lists the four files per segment (.log records, .index offset → byte, .timeindex ts → offset, .snapshot txn state) and notes that retention deletes whole segments, never partial files: because segments are immutable, deletion is just an unlink — fast, no rewrite, no fsync stall.]
Every retention or tiered-storage operation works at segment granularity. The active segment is exempt — it's the only file being written, and it stays until it rolls. This is why a partition with very low traffic can hold "old" data well past the retention window: if no new records arrive to roll the active segment, the old records inside it stay too. Setting log.roll.ms alongside retention.ms is what fixes that edge case.

Why segment files instead of one big file: deletion of arbitrary record ranges from the middle of a file requires either rewriting the file (expensive) or marking dead bytes and compacting later (complex). Whole-file deletion is unlink() — one syscall, no IO. By making segments the deletion unit, retention becomes near-free and compaction becomes a controlled rewrite of one segment at a time. The trade-off is that you can't delete a single record below the retention boundary; you delete the whole segment that contains it. For payments-grade compliance — "delete all records of customer X by court order" — this is why GDPR-style deletes need compaction (with a tombstone keyed on customer_id) rather than retention.

The segment count is also what you watch on the broker. kafka.log:type=Log,name=NumLogSegments,topic=...,partition=... reports the live count per partition; multiply the partition replicas a broker hosts by segments per partition and the roughly four files per segment, and you get that broker's open-file-handle pressure. Default nofile=1048576 on most production-tuned Linux boxes; a broker hosting 50,000 partition replicas with a handful of live segments each can saturate that limit during a rolling restart, when every broker is opening files at once. The fix is large segments (log.segment.bytes=2147483648 for high-traffic topics) so there are fewer files per partition, balanced against the cost — bigger segments make retention granularity coarser, so a topic with retention.ms=86400000 (1 day) and log.segment.bytes=2 GB may keep up to 2 days of records in pathological low-traffic cases.
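The handle arithmetic above, as a quick back-of-envelope helper (the counts are illustrative, not measured):

```python
# Each segment contributes roughly four files: .log, .index, .timeindex, .snapshot
def open_files(partitions_hosted, segments_per_partition, files_per_segment=4):
    return partitions_hosted * segments_per_partition * files_per_segment

# A broker hosting 50,000 partition replicas with 5 live segments each
# is right at a nofile=1048576 limit:
print(open_files(50_000, 5))  # 1000000
# Doubling segment size roughly halves the live segment count:
print(open_files(50_000, 3))  # 600000
```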

Time and size retention: the eviction policies

The simplest cleanup policy is cleanup.policy=delete (the default). Two thresholds run continuously: time-based via retention.ms (default 604800000 = 7 days; the broker-level cluster defaults are log.retention.ms/minutes/hours) and size-based via retention.bytes (default -1, disabled). A segment is eligible for deletion when its largest record timestamp is older than retention.ms ago, OR when the partition's total size exceeds retention.bytes. A background retention task runs every log.retention.check.interval.ms (default 5 minutes) and unlinks any eligible segment.

The largest-timestamp rule has a subtle consequence: a segment isn't deleted at the moment its records become old, it's deleted when the newest record in the segment is older than the threshold. For a 1 GB segment that took 21 seconds to fill, this is barely visible; for a low-traffic topic where one segment holds 3 weeks of records, the oldest record can be 7 days + 3 weeks old before the segment goes. Why the largest timestamp instead of the smallest: with the smallest, a segment would become eligible the moment its first record crossed the threshold, while nearly every other record in it was still inside the retention window — deleting the segment would delete those records early. The largest-timestamp rule guarantees no record is deleted before its individual retention boundary; the cost is that some records are kept slightly longer than the configured window. This conservative skew is what compliance requires — never delete data early, sometimes keep it slightly late.

Size-based retention applies to total partition size on disk and is usually the safety net rather than the primary policy. Razorpay sets retention.ms=604800000 (7 days) on payments.tx_events to satisfy the regulatory replay requirement, plus retention.bytes=536870912000 (500 GB per partition) as a disk-pressure safety in case a downstream consumer's lag pushes total size past projection. With both set, the more aggressive policy wins — whichever fires first deletes the segment. For analytics topics that ingest 10× normal volume during a Flipkart Big Billion Day surge, size retention can kick in well before the time window expires, which is the desired behaviour: better to lose 2 hours of click events than to fill the broker disk and crash producer writes for everyone.
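The eligibility rules from the last two paragraphs, sketched in Python. The segment tuples and the oldest-first ordering are assumptions of this toy model, not broker internals:

```python
# segments: oldest first, as (base_offset, size_bytes, largest_record_ts_ms);
# the last entry is the active segment, which is never deleted.
def segments_to_delete(segments, now_ms, retention_ms, retention_bytes=None):
    closed = segments[:-1]                      # active segment is exempt
    doomed = set()
    # Time-based: a whole segment goes once its *largest* timestamp is too old.
    for base, size, largest_ts in closed:
        if now_ms - largest_ts > retention_ms:
            doomed.add(base)
    # Size-based: drop oldest segments until the partition fits the cap.
    if retention_bytes is not None:
        total = sum(size for _, size, _ in segments)
        for base, size, _ in closed:
            if total <= retention_bytes:
                break
            total -= size
            doomed.add(base)
    return sorted(doomed)

segs = [(0, 1_000, 1_000), (4210, 1_000, 5_000),
        (8420, 1_000, 9_000), (12630, 500, 9_500)]
print(segments_to_delete(segs, now_ms=10_000, retention_ms=6_000))
# [0] — only the oldest segment's newest record is past the window
print(segments_to_delete(segs, now_ms=10_000, retention_ms=6_000,
                         retention_bytes=2_000))
# [0, 4210] — the size cap fires first and takes another segment
```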

The third knob, log.retention.check.interval.ms, decides how often the retention task sweeps. The 5-minute default is fine for steady state, and it's a broker-level setting — so for a topic that's bleeding disk pressure during an incident, the responsive lever is the topic config: temporarily lowering retention.ms via kafka-configs.sh --alter takes effect at the next sweep with no broker restart. Producer write latency is unaffected by retention because deletion runs on a separate thread and the active segment is never deleted — but a consumer can briefly stall if it was positioned in a segment that just got deleted (OffsetOutOfRangeException, with auto.offset.reset deciding whether it jumps to earliest or latest).

Log compaction: the keyed-state mechanism

cleanup.policy=compact is the second mechanism, and it's a different beast. Instead of deleting whole segments by age, compaction rewrites segments to keep only the most recent value per key. For Kafka's own __consumer_offsets topic, every record is (consumer_group_id, topic, partition) → committed_offset; over a year that topic accumulates millions of redundant offset commits, but the only one that matters per consumer-group-topic-partition is the latest. Compaction garbage-collects the old ones while preserving the invariant that "the latest value for every key that ever existed is still in the log".

The compaction algorithm runs in two passes per partition. Pass one: walk the dirty (not-yet-cleaned) segments and build a map from key to offset-of-latest-value. Pass two: walk the cleanable segments again and copy forward only records whose offset matches the map (i.e., the latest record for that key); the active segment is never touched. When the new segment is fsynced, the old segments are unlinked. Compaction runs in batches rather than continuously: the cleaner only picks up a partition once its dirty ratio — uncleaned bytes as a fraction of the log — reaches min.cleanable.dirty.ratio (default 0.5).

# compaction_demo.py — how the keyed view evolves as records get compacted
from collections import OrderedDict

records = [
    # (offset, key, value)  — chronological producer log
    (0,  "user:101", "balance=500"),
    (1,  "user:102", "balance=1200"),
    (2,  "user:101", "balance=480"),       # supersedes offset 0
    (3,  "user:103", "balance=300"),
    (4,  "user:102", "balance=1180"),      # supersedes offset 1
    (5,  "user:101", "balance=460"),       # supersedes offset 2
    (6,  "user:104", "balance=900"),
    (7,  "user:103", None),                # tombstone — delete user:103
    (8,  "user:101", "balance=440"),       # supersedes offset 5
    (9,  "user:105", "balance=750"),
]

def compact(records, tombstone_retention_ms=86_400_000, now_ms=10):
    # Toy model: offsets double as timestamps for the tombstone-age check
    # Pass 1: find latest offset per key
    latest_offset = OrderedDict()
    for offset, key, _ in records:
        latest_offset[key] = offset

    # Pass 2: emit only records whose offset matches the latest map
    keep = set(latest_offset.values())
    compacted = [(o, k, v) for (o, k, v) in records if o in keep]

    # Drop tombstones older than tombstone_retention_ms (delete.retention.ms)
    final = [(o, k, v) for (o, k, v) in compacted
             if v is not None or (now_ms - o) < tombstone_retention_ms]
    return final

print("Before compaction:", len(records), "records")
for r in records: print(" ", r)
print("\nAfter compaction:", len(compact(records)), "records")
for r in compact(records): print(" ", r)
Before compaction: 10 records
  (0, 'user:101', 'balance=500')
  (1, 'user:102', 'balance=1200')
  (2, 'user:101', 'balance=480')
  (3, 'user:103', 'balance=300')
  (4, 'user:102', 'balance=1180')
  (5, 'user:101', 'balance=460')
  (6, 'user:104', 'balance=900')
  (7, 'user:103', None)
  (8, 'user:101', 'balance=440')
  (9, 'user:105', 'balance=750')

After compaction: 5 records
  (4, 'user:102', 'balance=1180')
  (6, 'user:104', 'balance=900')
  (7, 'user:103', None)
  (8, 'user:101', 'balance=440')
  (9, 'user:105', 'balance=750')

The output has 5 records instead of 10. user:101 went from 4 records (at offsets 0, 2, 5, 8) down to 1 record at offset 8, which is the latest. user:102 collapsed from 2 to 1. user:103 still has its tombstone at offset 7 — it'll be there until delete.retention.ms elapses, then it gets pruned too. The result is that the partition holds, at any given time, exactly the live keyed state plus tombstones for recently-deleted keys. This is what makes compacted topics usable as durable key-value stores: a consumer that boots cold can read from offset 0 to log-end-offset and rebuild the entire state, because compaction guarantees no key has been silently lost.

The third policy is cleanup.policy=compact,delete — compaction plus time-based retention. This is the "I want a keyed state store with a maximum age" combination. Records that fall past retention.ms are deleted regardless of whether they're the latest value for their key; within the retention window, compaction collapses redundancy. For Zerodha's per-trade reconciliation state — where every trade has a key and a final settlement state, but trades older than 90 days are archived to long-term storage and don't need to be in the topic — this combination keeps the topic compact and bounded.
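A toy sketch of compact,delete using the same record shape as compaction_demo.py (offsets again double as timestamps — a simplification of this model, not Kafka behaviour):

```python
def compact_and_delete(records, now_ms, retention_ms):
    # Retention: anything past the window is gone, latest value or not
    recent = [(o, k, v) for (o, k, v) in records if now_ms - o <= retention_ms]
    # Compaction: keep only the latest surviving record per key
    latest = {}
    for o, k, _ in recent:
        latest[k] = o
    keep = set(latest.values())
    return [(o, k, v) for (o, k, v) in recent if o in keep]

records = [(0, "a", "v0"), (1, "b", "v0"), (2, "a", "v1"), (3, "c", "v0")]
print(compact_and_delete(records, now_ms=4, retention_ms=2))
# [(2, 'a', 'v1'), (3, 'c', 'v0')] — key "b" disappears entirely:
# its only record fell out of the retention window
```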

Tiered storage: the cold-data offload

Retention deletes data when it gets old. But "delete" isn't always what you want — sometimes you want "move it somewhere cheap and keep reading from it on demand". KIP-405 — tiered storage, shipped as early access in Kafka 3.6 (late 2023) and declared production-ready in 3.9 — does exactly that: closed segments older than local.retention.ms get uploaded to a remote store (S3, GCS, Azure Blob), the local file is deleted, and the broker keeps a pointer in its metadata. When a consumer requests an offset that lives in a remote segment, the broker fetches the segment back on demand, serves the records, and either caches them locally or streams them through.

The motivation is cost. Local NVMe on a Kafka broker (whether bare metal or AWS i4i.4xlarge) costs roughly ₹15–₹20 per GB-month; S3 standard is ₹1.80 per GB-month, and S3 Glacier Instant Retrieval is ₹0.40 per GB-month. For Razorpay holding 5 years of payments.tx_events for compliance — ~250 TB compressed — the local-only cost would be ₹50 lakh per month just for the disk; tiered to S3 it drops to ₹4.5 lakh. The trade-off is read latency: local-disk reads are 200 µs, remote-segment reads are 100–500 ms (download from S3, then serve). For consumers reading the hot tail (last 7 days) the latency is unchanged because that data is still on local disk; for a one-time backfill replay reading from 4 years ago, the slow first read is acceptable.

[Figure: Tiered storage — where each segment lives by age. The oldest segments sit in the remote tier (S3 / GCS / Azure Blob, segments older than local.retention.ms, ~₹1.80/GB-month, 100–500 ms first-byte latency); a middle band is uploading in flight; the newest segments, including the active one, sit on the local tier (hot tail, last 7 days, ~200 µs reads), with local.retention.ms marking the boundary. A backfill consumer starting at offset 0 reads first from S3 (slow), and latency drops to 200 µs once it crosses into the local tier; a live consumer at the log-end-offset is always served from local disk and never touches S3 in steady state. retention.ms still applies — segments age out of the remote tier too, just much later (e.g. 5 years instead of 7 days).]
The two-tier model means the broker's local disk only sizes for the hot tail, decoupled from total topic history. Local disk size becomes a function of throughput × local.retention.ms, not total compliance retention. For a 50 MB/sec partition with 7-day local retention, that's ~30 TB local — fine on one i4i.4xlarge — versus ~8 PB local for the full 5-year history without tiering, which is the difference between a normal cluster and an unaffordable one.
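That sizing arithmetic, as a re-runnable helper (binary units here, so the figures differ slightly from the decimal ones above):

```python
def local_disk_bytes(write_rate_bytes_per_sec, local_retention_ms):
    # Hot-tail size = sustained write rate x local retention window
    return write_rate_bytes_per_sec * (local_retention_ms / 1000)

seven_days_ms = 7 * 24 * 3600 * 1000
print(local_disk_bytes(50 * 1024**2, seven_days_ms) / 1024**4)   # ~28.8 TiB hot tail

five_years_ms = 5 * 365 * 24 * 3600 * 1000
print(local_disk_bytes(50 * 1024**2, five_years_ms) / 1024**5)   # ~7.3 PiB without tiering
```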

The mechanism inside the broker: a RemoteLogManager thread monitors closed segments per partition. When a segment's largest timestamp is older than local.retention.ms, the manager uploads .log, .index, .timeindex, and .snapshot to the remote store, atomically updates the partition's remote-log metadata (stored in an internal topic, __remote_log_metadata), and deletes the local files. The metadata maps (partition, base_offset) → (remote_object_key, leader_epoch, segment_size). On a consumer fetch for an offset that resolves to a remote segment, the broker streams the segment from the remote store and serves it through, optionally via a local read-through buffer; remote.log.reader.threads controls the fetch parallelism.
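A sketch of that fetch-path decision. The structures and bucket keys here are illustrative stand-ins, not broker internals:

```python
import bisect

local_bases = [16840, 18000]                       # hot tail on broker disk
remote_meta = {0:     "s3://bucket/p3/00000000000000000000",
               4210:  "s3://bucket/p3/00000000000000004210",
               8420:  "s3://bucket/p3/00000000000000008420",
               12630: "s3://bucket/p3/00000000000000012630"}

def resolve_fetch(offset):
    # Hot path: offset falls inside a local segment
    if offset >= local_bases[0]:
        i = bisect.bisect_right(local_bases, offset) - 1
        return ("local", local_bases[i])
    # Cold path: resolve through the remote-log metadata map
    remote_bases = sorted(remote_meta)
    i = bisect.bisect_right(remote_bases, offset) - 1
    return ("remote", remote_meta[remote_bases[i]])

print(resolve_fetch(19000))  # ('local', 18000) — served at local-disk latency
print(resolve_fetch(5000))   # ('remote', 's3://bucket/p3/00000000000000004210')
```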

The integration with replication: only the leader uploads to remote storage. Followers don't duplicate the upload — they share the remote tier, so a partition with replication.factor=3 has one S3 copy, not three. Why one copy is safe: S3 itself replicates objects across multiple availability zones and advertises eleven nines of durability. Storing three Kafka-replica copies of every cold segment in S3 would triple the cost for no additional durability — the underlying object store is already more durable than three local disks. The trade-off is that if the remote tier becomes unavailable (a regional S3 outage), all three Kafka replicas are equally blind, while three local copies might survive a single-AZ failure. For most production deployments that's the correct trade-off; for ultra-high-availability topics, a custom implementation of the pluggable RemoteStorageManager interface can fan writes out to two independent remote stores.

The configuration spans broker and topic level:

| Knob | What it controls | Sensible value |
| --- | --- | --- |
| remote.log.storage.system.enable=true (broker-level) | Turns the feature on | required |
| remote.storage.enable=true (topic-level) | Tiered storage for this topic | per-topic |
| local.retention.ms | How long segments stay on broker disk | 7d for most topics |
| local.retention.bytes | Local disk safety cap per partition | 500 GB |
| retention.ms | Total retention (across both tiers) | 5y for compliance |
| remote.log.metadata.manager.class.name | Pluggable metadata backend | TopicBasedRemoteLogMetadataManager (default) |
| remote.log.storage.manager.class.name | Pluggable storage backend | S3RemoteStorageManager or vendor equivalent |

For Razorpay's payments.tx_events: local.retention.ms=604800000 (7 days hot), retention.ms=157680000000 (5 years total), tiered storage on, S3 backend. Producer and live-consumer latency are unchanged from a non-tiered cluster (because they only touch the local tier); a backfill replay from 2 years ago is the only path that hits S3, and the SRE team accepts the 200 ms first-byte latency for that case because the alternative is paying ₹50 lakh/month for local NVMe holding 5-year history.

Putting the three policies together

The three policies — time/size retention, compaction, and tiered storage — are orthogonal and compose. The cleanup policy decides what to do with old data; the tiered-storage flag decides where old data lives before that decision applies.

| Topic class | cleanup.policy | local.retention.ms | retention.ms | Tiered? |
| --- | --- | --- | --- | --- |
| Event stream (clickstream, logs) | delete | n/a | 7d | no |
| Compacted state (__consumer_offsets, ksqlDB tables) | compact | n/a | infinite | no |
| Bounded compacted state | compact,delete | n/a | 90d | no |
| Long-history audit log (Razorpay payments) | delete | 7d | 5y | yes |
| Hybrid: keyed state with long history | compact | 7d | infinite | yes* |

*Open-source KIP-405 does not support tiered storage on compacted topics, so the hybrid row needs a vendor or custom tiering layer.

The hybrid case at the bottom is interesting — and comes with a caveat: the open-source KIP-405 implementation rejects remote.storage.enable on compacted topics, so the combination requires a vendor or custom tiering layer. Where available, a topic configured cleanup.policy=compact with tiering and infinite retention is effectively a keyed state store with unlimited history — every key's latest value is preserved (compaction guarantee), and the historical updates are tiered to S3 indefinitely. PhonePe uses this pattern for their merchant-balance topic: the latest balance per merchant is always one S3 fetch away even for long-tail merchants who haven't been touched in months, and the audit trail of every balance change is in remote storage for 7-year RBI compliance. The local broker disk only holds the hot-merchant balances and recent updates; the bulk of the topic lives in S3.

There's one more interaction worth noting. The cleanup policy operates per partition independently — a topic doesn't have a single global cleanup state. A 64-partition compacted topic therefore has 64 independent cleaning jobs, scheduled across log.cleaner.threads worker threads (default 1; compaction-heavy clusters bump this to 4 or 8). For a __consumer_offsets topic with 50 partitions and aggressive compaction, single-threaded cleaning can fall behind during a high-rebalance period — every consumer-group offset commit lands in this topic, and a 10-minute rebalance storm can produce 500 MB of new commits per partition. The signals to watch are kafka.log:type=LogCleaner,name=cleaner-recopy-percent and max-clean-time-secs; if max-clean-time-secs keeps climbing while partitions sit above min.cleanable.dirty.ratio waiting their turn, the cleaner has a backlog.

Going deeper

The log cleaner internals: dedupe map and IO bookkeeping

The compaction thread allocates a fixed-size dedupe map (log.cleaner.dedupe.buffer.size, default 128 MB) and walks the dirty section of the partition computing a hash of every key. The map records key_hash → latest_offset at roughly 24 bytes per entry (a 16-byte key hash plus an 8-byte offset); when the buffer fills, compaction cleans the prefix it has data for and re-runs to cover the rest. For a partition with 100 million unique keys, that's ~2.4 GB of map needed for a single pass — the default 128 MB forces a multi-pass compaction. Tuning the buffer size for a state-store-style topic with many unique keys is the difference between compaction completing in 10 minutes versus 4 hours. The kafka.log:type=LogCleaner,name=max-buffer-utilization-percent metric tells you whether your buffer is undersized.
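The multi-pass arithmetic, assuming ~24 bytes per map entry and ignoring the map's load factor (so treat the result as a lower bound on passes):

```python
import math

def cleaner_passes(unique_keys, buffer_bytes, bytes_per_entry=24):
    # How many keys one pass can dedupe, and how many passes the log needs
    keys_per_pass = buffer_bytes // bytes_per_entry
    return math.ceil(unique_keys / keys_per_pass)

print(cleaner_passes(100_000_000, 128 * 1024**2))  # 18 — default 128 MB buffer
print(cleaner_passes(100_000_000, 4 * 1024**3))    # 1  — a 4 GB buffer does it in one
```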

Why timestamps are tricky for retention

Kafka records carry two timestamps: producer-supplied (CreateTime) or broker-stamped (LogAppendTime), controlled by message.timestamp.type. For retention purposes the broker uses whichever timestamp the topic is configured for. If the topic is CreateTime and a producer sends a record with a backdated timestamp (clock skew, late-arriving event), that record can age out of retention faster than the producer expected. Worse, if a backdated record is the largest timestamp in its segment, the entire segment becomes deletion-eligible immediately. Most production teams set message.timestamp.type=LogAppendTime for retention safety — the broker stamps the record as it lands, removing any client-clock dependency from retention behaviour. The cost is that consumers can't distinguish event time from ingest time without a separate header.
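A toy demonstration of that hazard — timestamps in milliseconds, all values invented. A segment whose records are all backdated has a backdated largest timestamp, so the whole segment becomes deletion-eligible early:

```python
def eligible(largest_ts_ms, now_ms, retention_ms):
    # The retention rule: a segment goes when its *largest* timestamp is too old
    return now_ms - largest_ts_ms > retention_ms

now = 1_000_000
retention = 100_000
healthy_segment_ts = [990_000, 995_000, 999_000]   # broker-current timestamps
skewed_segment_ts  = [700_000, 750_000, 800_000]   # producer clock ~200s behind

print(eligible(max(healthy_segment_ts), now, retention))  # False — inside the window
print(eligible(max(skewed_segment_ts), now, retention))   # True — deleted early
```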

Tiered storage and consumer offset commits

When a consumer reads from a remote segment, the offset commit it issues to __consumer_offsets is identical to a local read — just an offset number. The fact that the underlying segment lives in S3 is invisible to the consumer protocol. But it has a cost implication: a slow consumer reading 2 years of history triggers continuous S3 GET requests (at least one per segment, paid per request and per byte). For Flipkart's data-engineering team running a one-time backfill of 18 months of events.click from tiered storage, the S3 cost was ₹3.2 lakh just for the GET charges on a single replay job. Their fix was to raise fetch.max.bytes on the backfill consumer and run it as a single sequential reader — minimising request count by reading each remote segment through end-to-end.

Tombstones and the GDPR delete pattern

For compacted topics, the only way to delete a specific record is to publish a tombstone with the same key. RBI's draft data localisation rules and equivalent GDPR provisions require the ability to fully delete a customer's records on request. For a customer_profiles compacted topic at Cred, the delete flow is: receive deletion request → produce a record (key=customer_id, value=null) → wait for compaction (next cleaner pass, ≤ log.cleaner.backoff.ms cycle) → wait for delete.retention.ms → tombstone is purged → key has no trace in the topic. End-to-end this is hours, not seconds; the regulator's deadline is typically 30 days, so the budget is comfortable. For non-compacted topics, the delete pattern is different — you can't surgically remove one record, so PII either lives in compacted topics from the start or is encrypted-at-rest with per-customer keys (crypto-shredding: throw away the key, the records become gibberish).

The migration story: turning on tiered storage on a running topic

KIP-405 supports enabling tiered storage on an existing topic. Once remote.storage.enable=true is set, the broker starts uploading closed segments older than local.retention.ms and deleting their local copies after upload completes. The migration is gradual — historic segments age into the remote tier over the time it takes them to cross the threshold, plus the cleanup cycle. For a topic with 18 months of history and a target local.retention.ms=7d, all but the last 7 days of segments will migrate as fast as S3 upload bandwidth allows, freeing up significant local disk. The reverse migration — turning tiered storage off for a topic — is unsupported in 3.6 (KIP-950 later adds a disablement path); until then you'd have to create a new topic and replicate. Plan ahead.

Where this leads next

Retention, compaction, and tiered storage close out the core durability story for a single Kafka topic. With these three knobs you can configure any topic to balance cost, retention requirement, and read latency the way the use case demands.

The next chapter, /wiki/zookeeper-vs-kraft-and-the-controllers-job, walks through the controller — the brain that decides which broker is the leader for which partition, manages ISR membership transitions, and stores the cluster's metadata. Replication, retention, and compaction all rely on the controller being consistent and available; it's the coordination layer underneath everything Build 7 has covered so far.

Beyond Build 7, compaction is a recurring pattern. /wiki/kafka-streams-state-stores-rocksdb-and-changelog-topics (Build 8) uses compacted topics as the durable backing for stream-processing state — every state mutation is written to a compacted "changelog topic", and a recovering task rebuilds its RocksDB instance by replaying the changelog from offset 0. The compaction guarantee that the latest value per key is preserved is what makes that recovery bounded in time; without it, the changelog would grow forever and recovery would scale with throughput rather than with state size.

Tiered storage also reshapes how the lakehouse (/wiki/the-lakehouse-thesis-from-warehouse-plus-lake-to-one-system, Build 12) interacts with stream history. With Kafka's cold tier on the same S3 bucket as Iceberg tables, you can read 5 years of history through Kafka or query the same data as Iceberg tables — the underlying bytes are reachable through both lenses. The next decade of streaming-plus-analytics architecture is built on this convergence.

The mental model to carry forward is layered: replication keeps the partition available, retention bounds local disk usage by age, compaction collapses keyed history, tiered storage shifts cold bytes to cheap storage. Each layer answers a separate question, and the production-grade Kafka topic uses three or four of them at once — none of which are toggled by accident.
