A single-partition log in Python
Last chapter's 60-line TinyLog has two problems any real broker has to solve before it can hold a day of Razorpay traffic. The file grows forever — there is no way to delete old data without rewriting it. And the readers seek by byte position, so a consumer that stored "I'm at offset 14,239,772" cannot ask the broker for "the record numbered 14,239,772". This chapter fixes both with the two pieces every production log has: segments and the offset index. By the end you have a 200-line Python program that a Kafka engineer could read and recognise.
A real single-partition log is one logical stream split into bounded segment files, with a sparse index per segment that maps logical record numbers to byte positions. Producers append; consumers fetch by record number. Old segments get deleted when retention says so, never rewritten. Build this once and Kafka's storage layout stops being mysterious — it is the same idea, just with replication and network protocol bolted on later.
What "single-partition" means and what we're keeping out
A partition in Kafka language is a single, ordered, append-only sequence of records — one log. A topic is a set of partitions that together look like one logical stream but parallelise across machines. This chapter builds a partition-of-one: one topic, one partition, one process. Partitioning across machines (Chapter 49), replication (Chapter 51), and the network protocol (Chapters 50, 53) each deserve their own walk-through. Today is just storage layout.
What we build fits in 200 lines: a binary record format, segment files that roll at a size limit, a sparse offset index per segment, a producer API that returns the offset it wrote, and a consumer API that fetches by logical offset and streams forward. Concrete enough that when you read Kafka's LogSegment.scala later, the structure is already familiar.
The picture is the entire architecture. Everything in the rest of this chapter is just code that implements those three steps.
The record format
Before any code, the binary layout. Each record on disk is a fixed-shape header followed by a variable-length payload:
+--------------+------------+----------+-------------+---------+
| logical_off  | timestamp  | key_len  | payload_len | crc32   |  header (28 B of fields
|   8 bytes    |  8 bytes   | 4 bytes  |   4 bytes   | 4 bytes |  + 4 reserved = 32 B)
+--------------+------------+----------+-------------+---------+
| key (key_len bytes)       | payload (payload_len bytes)      |  body
+------------------------------------------------------------------+
The logical offset is the record's number within the partition (0, 1, 2, …) — not its byte position. The timestamp is milliseconds since epoch. The CRC32 is over (timestamp || key_len || payload_len || key || payload); if a torn write changes any of those bytes the consumer detects it. Why store the offset on disk: when a consumer asks for offset 84,300 we scan forward from a sparse-index entry like (84,288, byte 720,896) and need to stop at the right record. Without storing the offset we'd have to count records from base_offset, which breaks the moment compaction (Chapter 52) deletes some records mid-segment. Real Kafka stores it in the record header for the same reason.
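Before the full program, here is the layout rendered as a minimal encode/decode pair. The function names are ours; the format string and CRC coverage match the program built later in this chapter.

```python
import struct, time, zlib

# Same layout as the chapter's record: offset, timestamp, key_len,
# payload_len, crc32, plus 4 reserved padding bytes (32 bytes total).
HEADER_FMT = ">QQIIIxxxx"
HEADER_SIZE = struct.calcsize(HEADER_FMT)  # 32

def encode_record(offset: int, key: bytes, payload: bytes) -> bytes:
    ts = int(time.time() * 1000)
    # CRC covers (timestamp || key_len || payload_len || key || payload).
    crc = zlib.crc32(struct.pack(">QII", ts, len(key), len(payload)) + key + payload) & 0xFFFFFFFF
    return struct.pack(HEADER_FMT, offset, ts, len(key), len(payload), crc) + key + payload

def decode_record(buf: bytes) -> tuple[int, bytes, bytes]:
    offset, ts, klen, plen, crc = struct.unpack(HEADER_FMT, buf[:HEADER_SIZE])
    key = buf[HEADER_SIZE:HEADER_SIZE + klen]
    payload = buf[HEADER_SIZE + klen:HEADER_SIZE + klen + plen]
    expected = zlib.crc32(struct.pack(">QII", ts, klen, plen) + key + payload) & 0xFFFFFFFF
    if crc != expected:
        raise IOError(f"CRC mismatch at offset {offset}")
    return offset, key, payload

rec = encode_record(84300, b"merchant_42", b'{"tx":"tx_001"}')
assert decode_record(rec) == (84300, b"merchant_42", b'{"tx":"tx_001"}')
```

Because the header is fixed-shape, a reader that has the 32 header bytes knows exactly how many body bytes follow — which is also what makes the CRC check possible.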
The sparse index file is even simpler: pairs of (logical_offset, byte_position), two 8-byte integers per entry (16 bytes), written every Nth record (we use every 4th for demo size; Kafka uses every 4 KB of log, roughly every 30–100 records).
000084231.index:
(84231, 0)
(84235, 1248)
(84239, 2496)
(84243, 3744)
...
To find offset 84,300 you binary-search the index for the largest entry whose offset is ≤ 84,300, then seek to that byte position in .log and scan forward at most N-1 records (3 in our case). One seek per fetch, not per record — the same property Kafka relies on to serve millions of fetches per second.
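The lookup step can be sketched in a few lines with the stdlib's bisect module. The index contents below are the illustrative entries from above, not output of the real program:

```python
import bisect

# Hypothetical sparse index for segment 000084231, held as two
# parallel lists: indexed offsets and their byte positions.
idx_offsets = [84231, 84235, 84239, 84243]
idx_bytes   = [0, 1248, 2496, 3744]

def find_byte_for_offset(target: int) -> int:
    # Largest indexed offset <= target; fall back to byte 0.
    i = bisect.bisect_right(idx_offsets, target) - 1
    return idx_bytes[i] if i >= 0 else 0

assert find_byte_for_offset(84231) == 0      # exact hit on the first entry
assert find_byte_for_offset(84237) == 1248   # lands between two entries
assert find_byte_for_offset(84300) == 3744   # past the last entry: seek there, scan on
```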
Building it
200 lines of Python. Run it locally; it produces real files in /tmp/dlog/.
# single_partition_log.py — segments + sparse index + producer/consumer API
import os, struct, time, zlib, glob, bisect
from typing import Iterator

HEADER_FMT = ">QQIIIxxxx"              # 8+8+4+4+4 = 28 bytes of fields + 4 reserved = 32
HEADER_SIZE = struct.calcsize(HEADER_FMT)
INDEX_ENTRY_FMT = ">QQ"                # logical_offset (8) + byte_position (8) = 16 B
INDEX_ENTRY_SIZE = 16
SEGMENT_BYTES_MAX = 10 * 1024 * 1024   # 10 MB before rolling
INDEX_EVERY_N_RECORDS = 4              # sparse index granularity (demo size)


class Segment:
    """One on-disk pair: NNNNN.log + NNNNN.index, both starting at base_offset."""

    def __init__(self, dir_path: str, base_offset: int):
        self.dir = dir_path
        self.base = base_offset
        self.log_path = os.path.join(dir_path, f"{base_offset:020d}.log")
        self.idx_path = os.path.join(dir_path, f"{base_offset:020d}.index")
        # Load existing index entries (logical_offset list for bisect, byte list parallel)
        self._idx_offsets: list[int] = []
        self._idx_bytes: list[int] = []
        if os.path.exists(self.idx_path):
            with open(self.idx_path, "rb") as f:
                while True:
                    chunk = f.read(INDEX_ENTRY_SIZE)
                    if not chunk:
                        break
                    off, byte = struct.unpack(INDEX_ENTRY_FMT, chunk)
                    self._idx_offsets.append(off)
                    self._idx_bytes.append(byte)
        # Counters
        self.size_bytes = os.path.getsize(self.log_path) if os.path.exists(self.log_path) else 0
        self.next_offset = self._scan_for_next_offset() if self.size_bytes else self.base
        self._records_since_index = 0
        # Open append handles
        self._log_w = open(self.log_path, "ab")
        self._idx_w = open(self.idx_path, "ab")

    def _scan_for_next_offset(self) -> int:
        # Used only on recovery: walk the .log to find the highest stored offset + 1.
        with open(self.log_path, "rb") as f:
            last = self.base - 1
            while True:
                hdr = f.read(HEADER_SIZE)
                if len(hdr) < HEADER_SIZE:
                    return last + 1
                logical_off, _ts, klen, plen, _crc = struct.unpack(HEADER_FMT, hdr)
                f.seek(klen + plen, os.SEEK_CUR)
                last = logical_off

    def append(self, key: bytes, payload: bytes) -> int:
        """Append one record; return its logical offset."""
        ts = int(time.time() * 1000)
        offset = self.next_offset
        crc = zlib.crc32(struct.pack(">QII", ts, len(key), len(payload)) + key + payload) & 0xFFFFFFFF
        header = struct.pack(HEADER_FMT, offset, ts, len(key), len(payload), crc)
        record = header + key + payload
        byte_pos = self._log_w.tell()
        self._log_w.write(record)
        self._log_w.flush()
        os.fsync(self._log_w.fileno())  # durable before return
        self.size_bytes += len(record)
        self.next_offset += 1
        # Sparse index: emit every N-th record
        self._records_since_index += 1
        if self._records_since_index >= INDEX_EVERY_N_RECORDS:
            self._idx_w.write(struct.pack(INDEX_ENTRY_FMT, offset, byte_pos))
            self._idx_w.flush()
            os.fsync(self._idx_w.fileno())
            self._idx_offsets.append(offset)
            self._idx_bytes.append(byte_pos)
            self._records_since_index = 0
        return offset

    def is_full(self) -> bool:
        return self.size_bytes >= SEGMENT_BYTES_MAX

    def find_byte_for_offset(self, target: int) -> int:
        """Sparse-index lookup: largest indexed offset ≤ target → its byte position.
        If target is below the first indexed offset, start from byte 0."""
        i = bisect.bisect_right(self._idx_offsets, target) - 1
        return self._idx_bytes[i] if i >= 0 else 0

    def read_from(self, target: int) -> Iterator[tuple[int, bytes, bytes]]:
        start_byte = self.find_byte_for_offset(target)
        with open(self.log_path, "rb") as f:
            f.seek(start_byte)
            while True:
                hdr = f.read(HEADER_SIZE)
                if len(hdr) < HEADER_SIZE:
                    return
                logical_off, ts, klen, plen, crc = struct.unpack(HEADER_FMT, hdr)
                key = f.read(klen)
                payload = f.read(plen)
                expected_crc = zlib.crc32(struct.pack(">QII", ts, klen, plen) + key + payload) & 0xFFFFFFFF
                if crc != expected_crc:
                    raise IOError(f"CRC mismatch at offset {logical_off} — segment corrupted")
                if logical_off >= target:
                    yield logical_off, key, payload

    def close(self) -> None:
        self._log_w.close()
        self._idx_w.close()


class Partition:
    """One partition = ordered segments + producer/consumer API."""

    def __init__(self, dir_path: str):
        self.dir = dir_path
        os.makedirs(dir_path, exist_ok=True)
        log_files = sorted(glob.glob(os.path.join(dir_path, "*.log")))
        if not log_files:
            self.segments = [Segment(dir_path, base_offset=0)]
        else:
            bases = sorted(int(os.path.basename(p).split(".")[0]) for p in log_files)
            self.segments = [Segment(dir_path, b) for b in bases]
        self._base_offsets = [s.base for s in self.segments]

    @property
    def active(self) -> Segment:
        return self.segments[-1]

    def append(self, key: bytes, payload: bytes) -> int:
        if self.active.is_full():
            new_base = self.active.next_offset
            self.segments.append(Segment(self.dir, new_base))
            self._base_offsets.append(new_base)
        return self.active.append(key, payload)

    def read_from(self, target: int) -> Iterator[tuple[int, bytes, bytes]]:
        # Find the segment whose base_offset ≤ target < next_base.
        i = bisect.bisect_right(self._base_offsets, target) - 1
        if i < 0:
            i = 0
        for seg in self.segments[i:]:
            yield from seg.read_from(target)

    def delete_segments_before(self, cutoff_offset: int) -> int:
        """Retention: drop whole segments whose entire range is below cutoff_offset."""
        kept = []
        deleted = 0
        for seg in self.segments:
            seg_max = seg.next_offset - 1
            if seg_max < cutoff_offset and seg is not self.active:
                seg.close()  # release append handles before unlinking
                os.remove(seg.log_path)
                os.remove(seg.idx_path)
                deleted += 1
            else:
                kept.append(seg)
        self.segments = kept
        self._base_offsets = [s.base for s in kept]
        return deleted


# --- Demo: write 12 records, fetch from offset 7 ---
if __name__ == "__main__":
    import shutil
    shutil.rmtree("/tmp/dlog", ignore_errors=True)
    p = Partition("/tmp/dlog")
    for i in range(12):
        off = p.append(key=f"merchant_{i % 3}".encode(),
                       payload=f'{{"tx":"tx_{i:03d}","amount_paise":{(i + 1) * 1999_00}}}'.encode())
        print(f"append → offset {off}")
    print("\n--- consumer.fetch(from_offset=7) ---")
    for off, key, payload in p.read_from(7):
        print(f"  {off}: {key.decode()} → {payload.decode()}")
    print(f"\nsegments on disk: {[s.base for s in p.segments]}")
    print(f"index entries in segment 0: {list(zip(p.segments[0]._idx_offsets, p.segments[0]._idx_bytes))}")
# Sample run:
append → offset 0
append → offset 1
... (offsets 2..11)
--- consumer.fetch(from_offset=7) ---
7: merchant_1 → {"tx":"tx_007","amount_paise":1599200}
8: merchant_2 → {"tx":"tx_008","amount_paise":1799100}
9: merchant_0 → {"tx":"tx_009","amount_paise":1999000}
10: merchant_1 → {"tx":"tx_010","amount_paise":2198900}
11: merchant_2 → {"tx":"tx_011","amount_paise":2398800}
segments on disk: [0]
index entries in segment 0: [(3, 237), (7, 555), (11, 875)]
The lines that matter:
- `HEADER_FMT = ">QQIIIxxxx"` — fixed-shape, big-endian binary. JSON-on-disk costs 3–10× more bytes plus a parser per record; the fixed shape is also what makes the CRC check possible — read the header, you know exactly how many bytes follow.
- `crc = zlib.crc32(...)` then `os.fsync(self._log_w.fileno())` — every record carries a checksum and the data is durable before `append` returns. A flipped bit (cosmic ray, dying SSD cell) raises `IOError` on the next read instead of silently delivering corrupt data; without `fsync` the buffer cache holds 30 seconds of writes that disappear on kernel panic. Kafka does exactly this.
- `bisect.bisect_right(self._idx_offsets, target) - 1` — the lookup. With 100,000 index entries (10 GB segment, real Kafka density) that's 17 in-memory comparisons. The seek that follows is 1 disk seek + at most N-1 records of forward scan. Why granularity is a knob: tighter (smaller N) means a smaller forward scan but a bigger index; looser means a smaller index but more bytes scanned per fetch. Kafka defaults to one entry per 4 KB of log — on typical workloads about every 50 records, small enough that the scan is invisible, big enough that the index is a fraction of a percent of the data.
- `if self.active.is_full(): self.segments.append(Segment(...))` — segment rolling. Crossing the 10 MB threshold closes the active segment and starts a new one whose `base_offset` is the next logical offset. Old segments become read-only; `delete_segments_before` frees disk by `os.remove`-ing both files. We never rewrite an old segment — there is no "delete record N from the middle of a file" operation in any real broker, and that is what makes retention cheap.
What you have now, beyond the previous chapter's TinyLog: bounded segments with whole-file retention, a logical offset that survives across segments, a sparse index for O(log N) random fetch, a binary record format with CRC, and durability via fsync. The Kafka 0000000000084231.log and 0000000000084231.index files in /var/kafka-logs/ on a Razorpay broker contain the same bytes in the same shape — plus a timestamp index and batched-record wrapper, neither of which changes the core mechanism.
How it survives a crash
If the process is killed mid-append, restart must produce a consistent state. Partition.__init__ lists *.log files and constructs Segment objects in order; each Segment.__init__ calls _scan_for_next_offset if the file has bytes; _scan_for_next_offset walks the segment from byte 0, parsing record headers, and stops on the first partial header. next_offset is now correct; the next append starts from there.
There is one subtle bug our 200 lines do not handle: if the last record's payload is partial (the header was written fully but only some of the body), our scan sees the broken record's offset N and the next append writes N+1 — leaving a CRC-corrupt record between them. Why this is wrong in practice: a consumer reading from offset N would either get a CRC failure (good — read_from raises) or, depending on alignment, get nothing at all. The fix Kafka uses is to truncate the segment to the last known-good record on recovery, re-validating CRCs from the start. Our read_from raising on CRC mismatch is the safety net; a production-grade fix would also truncate_to the byte position before the bad header.
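A recovery pass of that shape can be sketched in a few lines. This is our illustration of the truncate-to-last-good idea, not Kafka's actual recovery code; the function name is ours, the record layout matches this chapter's:

```python
import struct, zlib

HEADER_FMT = ">QQIIIxxxx"          # same 32-byte header as the chapter's records
HEADER_SIZE = struct.calcsize(HEADER_FMT)

def truncate_to_last_good(log_path: str) -> int:
    """CRC-validate records from byte 0; truncate the segment at the first
    partial or corrupt record. Returns the number of valid bytes kept."""
    good_end = 0
    with open(log_path, "rb") as f:
        while True:
            hdr = f.read(HEADER_SIZE)
            if len(hdr) < HEADER_SIZE:
                break                        # partial header: torn write at end
            _off, ts, klen, plen, crc = struct.unpack(HEADER_FMT, hdr)
            body = f.read(klen + plen)       # key + payload as one blob
            if len(body) < klen + plen:
                break                        # header landed but body didn't
            expected = zlib.crc32(struct.pack(">QII", ts, klen, plen) + body) & 0xFFFFFFFF
            if crc != expected:
                break                        # bits flipped inside the record
            good_end = f.tell()              # record fully valid; advance the mark
    with open(log_path, "r+b") as f:
        f.truncate(good_end)
    return good_end
```

Run on a segment with a torn tail, this drops the broken record so the next append starts cleanly at the last good offset + 1.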
Comparing to what kafka-python shows you
If you've ever pip-installed kafka-python and pointed it at a real broker, the API looks like:
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers="kafka.razorpay-internal:9092",
                         acks="all", linger_ms=5)
producer.send("payments.tx_events", key=b"merchant_42",
              value=b'{"tx":"...","amount_paise":1999900}')

consumer = KafkaConsumer("payments.tx_events",
                         bootstrap_servers="kafka.razorpay-internal:9092",
                         group_id="fraud-rules-v2",
                         auto_offset_reset="earliest")
for msg in consumer:
    print(msg.offset, msg.key, msg.value)
Map that onto what we just built:
- `producer.send(topic, key, value)` is `Partition.append(key, payload)` plus a network round-trip and a partition selector that hashes the key. The hash function is the topic of Chapter 49.
- `acks="all"` is a replication detail (Chapter 51) — the broker waits until every replica has the record before returning the offset. In our single-machine code there are no replicas; `fsync` is the entire durability story.
- `linger_ms=5` is a producer-side batching knob to coalesce small writes. The disk side benefits from the same trick (one `fsync` for many records); we `fsync` per record, the slowest possible thing. Real producers and brokers both batch.
- `consumer = KafkaConsumer(..., auto_offset_reset="earliest")` is `Partition.read_from(0)`. The `group_id` adds the consumer-group concept (Chapter 50).
- `for msg in consumer:` is the same generator pattern as our `read_from`. `msg.offset` is the logical offset.
Reading Kafka source code with our 200 lines in your head, the unfamiliar parts collapse to: replication, network protocol, consumer-group coordination, and the batching knobs. The storage layer — segments, sparse index, fsync, retention as segment delete — is exactly this.
The figure makes the cost model obvious: a fetch is one disk seek (steps 1 and 2 are in-memory). Once the seek completes, every subsequent record is a sequential page-cache read, which is where the GB/s numbers come from. Doubling the records fetched per call doesn't double seeks — there's still only one. This is why Kafka consumers fetch in big batches (max.poll.records=500+), not one record at a time.
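The batch-fetch pattern is a one-liner over any record generator like our read_from. The names fetch and fake_read_from are illustrative, not kafka-python API:

```python
from itertools import islice

# One call = one index lookup + one seek, however many records you take.
def fetch(records_iter, max_records: int = 500):
    return list(islice(records_iter, max_records))

# Toy stand-in for Partition.read_from's (offset, key, payload) stream.
def fake_read_from(start: int):
    off = start
    while True:
        yield (off, b"k", b"v")
        off += 1

batch = fetch(fake_read_from(7), max_records=5)
assert [o for o, _, _ in batch] == [7, 8, 9, 10, 11]
```

Doubling max_records doubles the sequential bytes read, not the seeks — which is the whole argument for big fetches.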
Common confusions
- "The offset is the byte position." Only in toy logs. In a real partition the offset is the logical record number; the byte position is what the index tells you. The reason is that records have variable size, so the byte position is not predictable from the offset alone — you need either a dense index (one entry per record, expensive) or a sparse index plus a forward-scan, which is what brokers actually do.
- "Each segment is a different topic or partition." Wrong shape. A topic has many partitions; each partition has many segments. The segments of one partition together form one logical log; readers fetching from offset N just see records, not "segment boundaries". Boundaries are an implementation detail of how the partition stores its bytes, not something the consumer can observe.
- "Old segments are deleted record-by-record." Never. Deletion is by whole segment. A segment is either entirely present or entirely gone; there is no half-deleted segment. This is what makes retention cheap — `os.remove(file)` on a 10 MB chunk versus parsing the file and rewriting it. Compaction (Chapter 52) does involve rewriting, but compaction is not retention; it produces a new segment, then deletes the old one whole.
- "`fsync` per append is fine for production." It's correct, but it caps throughput at roughly the disk's sync rate (a few thousand syncs/sec on NVMe, much slower on EBS). Real brokers batch — one `fsync` per network round-trip's worth of records. Kafka exposes this as `flush.messages` and `flush.ms` config; the default is "let the OS decide", which means rely on replication-to-quorum for durability, not `fsync`. In our single-machine code, `fsync` per append is the only option because we have no replicas.
- "The CRC catches all errors." It catches single-bit and small multi-bit flips on the bytes covered by the CRC. It does not catch a misdirected write (the disk wrote the right bytes to the wrong sector — rare on SSDs, real on RAID controllers), an OS-level reorder where two records swap places (impossible with `O_APPEND` semantics in POSIX), or application bugs that produce wrong-but-correctly-checksummed data. CRC is a layer of defence, not the only one.
- "A consumer can rewind to any offset, even one that doesn't exist." It can rewind to any offset within the partition's retention window. Rewinding to an offset that has been deleted by retention returns `OffsetOutOfRangeException` — the consumer must either reset to the earliest available offset or to the latest. This is why `auto.offset.reset` is one of the first configs every Kafka onboarding page covers.
Going deeper
Why the index is sparse, not dense
A dense index — one entry per record — would let you fetch any offset in zero forward-scans. Why not? Cost. A dense index for a 1 TB partition with 1 KB records is itself 16 GB (16 bytes × 1 billion entries). That doesn't fit in page cache, so index lookups become disk-bound — exactly what the index is supposed to avoid. A sparse index at one entry per 4 KB of log is 4 MB per GB — fits in RAM, and the forward-scan is bounded at ~50 records, invisible next to the network round-trip. Kafka's default index.interval.bytes=4096 is this trade-off in numbers. Tighter for very small records; looser for large ones.
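The sizing claims above are plain arithmetic, using this chapter's 16-byte entries and the numbers from the text:

```python
ENTRY = 16                       # bytes per (offset, byte_position) pair
partition = 1 * 1024**4          # 1 TB of log
record = 1024                    # 1 KB average record

dense = (partition // record) * ENTRY          # one entry per record
sparse_per_gb = (1024**3 // 4096) * ENTRY      # one entry per 4 KB of log

assert dense == 16 * 1024**3                   # 16 GB: won't stay in page cache
assert sparse_per_gb == 4 * 1024**2            # 4 MB per GB of log: fits in RAM
```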
Memory-mapped indexes and sendfile
Real Kafka opens the index file with mmap(2) rather than pread. This makes the index lookup a memory access — the kernel handles the page fault if the page isn't resident. Combined with sendfile(2) for the data path (the broker hands the kernel a file descriptor and a byte range; the kernel streams to the network socket without copying through user space), Kafka achieves zero-copy fetch: the broker's CPU never touches the record bytes. On a 25 Gbps NIC, that's the only way to saturate the link. Python's stdlib has both mmap and os.sendfile, so the ingredients exist; Kafka reaches sendfile through Java NIO's FileChannel.transferTo, Pulsar's BookKeeper does the same, and Redpanda (C++, userspace network stack) gets closer to PCIe-to-NIC bandwidth than any JVM broker.
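The mmap half is easy to demo with Python's stdlib. A sketch, using our 16-byte index entry format and made-up entries; after the mmap call, every lookup is a memory read, with the kernel paging in 4 KB chunks on demand:

```python
import mmap, os, struct, tempfile

INDEX_ENTRY = struct.Struct(">QQ")   # (logical_offset, byte_position), 16 B

# Write a small index file, then map it read-only.
entries = [(84231, 0), (84235, 1248), (84239, 2496)]
path = os.path.join(tempfile.mkdtemp(), "00000000000000084231.index")
with open(path, "wb") as f:
    for off, pos in entries:
        f.write(INDEX_ENTRY.pack(off, pos))

with open(path, "rb") as f:
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)  # stays valid after close

def entry(i: int) -> tuple[int, int]:
    return INDEX_ENTRY.unpack_from(mm, i * INDEX_ENTRY.size)

def find_byte(target: int) -> int:
    # Binary search over the mapped entries; no read() calls in user code.
    lo, hi = 0, len(mm) // INDEX_ENTRY.size
    while lo < hi:
        mid = (lo + hi) // 2
        if entry(mid)[0] <= target:
            lo = mid + 1
        else:
            hi = mid
    return entry(lo - 1)[1] if lo > 0 else 0

assert find_byte(84237) == 1248
assert find_byte(84231) == 0
```

The sendfile half (os.sendfile in Python) needs a socket on the other end, so it's left out of this sketch.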
Why partitions, not just bigger segments
A partition is the unit of parallelism in Kafka, not a unit of storage. One partition is one log; one consumer (in a group) can read it. To scale throughput, you split a topic into N partitions across brokers. Per-partition order is preserved; cross-partition order is given up. Razorpay's payments.tx_events topic, for example, runs around 64 partitions across 8 brokers, sized so that the busiest merchant's traffic still fits in one partition (since same-key records must share a partition for order). Confluent's tuning rule is roughly "100 × peak MB/s ÷ 50 MB/s/partition" for the partition count. Chapter 49 has the partition function in detail.
What a 7-day retention policy actually costs
At 100k events/sec, average 500 bytes/event, replication factor 3, daily storage is 100,000 × 500 × 86,400 × 3 ≈ 13 TB/day. Seven days of retention is ~90 TB. On AWS gp3 EBS at ₹6.5/GB/month (April 2026), that's roughly ₹5.9 lakh/month for storage alone, before instances. The 2024 trick of tiered storage — keep the recent 24 hours on local NVMe for fetch latency, tier older segments to S3 at ~₹2/GB/month — drops the bill 5–10×. Chapter 52 covers tiered storage; the point now is that "set retention to 30 days because we might need it" is the line that explodes a streaming team's AWS bill.
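Spelled out step by step (the rates are from the text; the per-GB price is an assumption, not a quote):

```python
events_per_sec = 100_000
bytes_per_event = 500
replication = 3
seconds_per_day = 86_400

daily_bytes = events_per_sec * bytes_per_event * seconds_per_day * replication
daily_tb = daily_bytes / 1e12
week_tb = daily_tb * 7

assert round(daily_tb, 2) == 12.96       # ~13 TB/day
assert round(week_tb, 1) == 90.7         # ~90 TB for 7-day retention

# At an assumed ₹6.5/GB/month, storage only:
monthly_inr = week_tb * 1000 * 6.5
assert round(monthly_inr) == 589_680     # ≈ ₹5.9 lakh/month
```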
Where this leads next
This chapter built a single partition. /wiki/partitions-and-parallelism turns one logical log into N partitioned logs across machines, introducing the partition function (hash(key) mod N) and the rebalancing protocol that lets you add a broker without destroying ordering.
After that, /wiki/consumer-groups-and-offset-management introduces the cooperation pattern: many consumers, each owning a subset of partitions, sharing the work. /wiki/replication-and-isr-how-kafka-stays-up replicates each partition to F+1 machines — the same fsync-then-respond protocol you wrote, but waiting for every replica before returning. /wiki/retention-compaction-tiered-storage is where segment-delete retention, log compaction, and S3 tiering become first-class. /wiki/the-kafka-protocol-on-the-wire covers what producer and consumer say to the broker over TCP — the piece this chapter deliberately skipped. By the end of Build 7 you'll have walked from "60 lines of TinyLog" to "what Kafka actually does"; the structure does not change, only the surrounding details get added one at a time.
References
- Apache Kafka — Storage layer — the canonical reference for segment files, the `.index` and `.timeindex` formats, `index.interval.bytes`, and segment rolling.
- Jay Kreps — The Log: What every software engineer should know about real-time data's unifying abstraction — the foundational essay; chapters 47 and 48 are this essay turned into running code.
- Travis Jeffery — How Kafka's Storage Internals Work — clear walkthrough of segment + index file layout with diagrams.
- Confluent — Optimizing Your Apache Kafka Deployment — practical numbers for `index.interval.bytes`, `segment.bytes`, partition count sizing.
- LWN — A look at Linux's `sendfile` — the system call that gives Kafka its zero-copy fetch path; useful background on why a Python broker cannot match a JVM broker's throughput.
- Andriy Zabavskyy — Kafka recovery: what really happens after a crash — the segment-truncation logic that our 200-line code doesn't fully implement.
- /wiki/why-logs-the-one-data-structure-streaming-is-built-on — the previous chapter; the why this chapter implements.
- /wiki/partitions-and-parallelism — the next chapter; takes one partition to N.