A single-partition log in Python

Last chapter's 60-line TinyLog has two problems any real broker has to solve before it can hold a day of Razorpay traffic. The file grows forever — there is no way to delete old data without rewriting it. And its readers seek by byte position, so a consumer that wants "the record numbered 14,239,772" has no way to ask for it; all it can remember is a raw byte offset. This chapter fixes both with the two pieces every production log has: segments and the offset index. By the end you'll have a 200-line Python program that a Kafka engineer could read and recognise.

A real single-partition log is one logical stream split into bounded segment files, with a sparse index per segment that maps logical record numbers to byte positions. Producers append; consumers fetch by record number. Old segments get deleted when retention says so, never rewritten. Build this once and Kafka's storage layout stops being mysterious — it is the same idea, just with replication and network protocol bolted on later.

What "single-partition" means and what we're keeping out

A partition in Kafka language is a single, ordered, append-only sequence of records — one log. A topic is a set of partitions that together look like one logical stream but parallelise across machines. This chapter builds a partition-of-one: one topic, one partition, one process. Partitioning across machines (Chapter 49), replication (Chapter 51), and the network protocol (Chapters 50, 53) each deserve their own walk-through. Today is just storage layout.

What we build fits in 200 lines: a binary record format, segment files that roll at a size limit, a sparse offset index per segment, a producer API that returns the offset it wrote, and a consumer API that fetches by logical offset and streams forward. Concrete enough that when you read Kafka's LogSegment.scala later, the structure is already familiar.

[Figure: One partition is many segments. A horizontal partition split into three segment files, each with a base offset, a .log data file, and a sparse .index file mapping logical offsets to byte positions: segment 00000000 (base_offset 0, 10 MB, read-only, may be deleted), segment 00084231 (base_offset 84,231, 10 MB, read-only, retention 7d), and the active segment 00168901 (base_offset 168,901, 3.2 MB, producer writes here; rolls at 10 MB into a new segment). Annotated fetch(offset=84,300): (1) find the segment with base_offset ≤ 84,300 < next_base → segment 00084231; (2) binary-search its .index for the largest indexed offset ≤ 84,300 → (84,288, byte 720,896); (3) seek to byte 720,896, scan forward ≤ 12 records. One disk seek per fetch.]
The partition is the union of segment files, ordered by base offset. Segments are immutable once full; only the active segment is appended. The sparse index per segment lets a consumer with a logical offset land on the right byte in O(log N) seeks.

The picture is the entire architecture. Everything in the rest of this chapter is just code that implements those three steps.

The record format

Before any code, the binary layout. Each record on disk is a fixed-shape header followed by a variable-length payload:

+--------------+------------+----------+-------------+--------+
| logical_off  | timestamp  | key_len  | payload_len | crc32  |  header (28 B + 4 reserved = 32 B)
|   8 bytes    |  8 bytes   |  4 bytes |   4 bytes   | 4 B    |
+--------------+------------+----------+-------------+--------+
| key (key_len bytes) | payload (payload_len bytes)           |  body
+-------------------------------------------------------------+

The logical offset is the record's number within the partition (0, 1, 2, …) — not its byte position. The timestamp is milliseconds since epoch. The CRC32 is over (timestamp || key_len || payload_len || key || payload); if a torn write changes any of those bytes the consumer detects it. Why store the offset on disk: when a consumer asks for offset 84,300 we scan forward from a sparse-index entry like (84,288, byte 720,896) and need to stop at the right record. Without storing the offset we'd have to count records from base_offset, which breaks the moment compaction (Chapter 52) deletes some records mid-segment. Real Kafka stores it in the record header for the same reason.
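
To make the layout concrete, here's a minimal sketch of one record being packed and read back with a CRC check. It uses the plain 28-byte header from the diagram; the full program below pads it with 4 reserved bytes to 32 on disk. The key, payload, and offset values are made up for illustration.

import struct, time, zlib

HDR = ">QQIII"                               # offset, timestamp, key_len, payload_len, crc32
key, payload = b"merchant_42", b'{"tx":"tx_001"}'
ts = int(time.time() * 1000)
crc = zlib.crc32(struct.pack(">QII", ts, len(key), len(payload)) + key + payload)
record = struct.pack(HDR, 84_300, ts, len(key), len(payload), crc) + key + payload

# A reader reverses it and re-checks the CRC before trusting the bytes:
off, ts2, klen, plen, crc2 = struct.unpack(HDR, record[:struct.calcsize(HDR)])
body = record[struct.calcsize(HDR):]
assert zlib.crc32(struct.pack(">QII", ts2, klen, plen) + body) == crc2 and off == 84_300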

The sparse index file is even simpler: pairs of (logical_offset, byte_position), two 8-byte fields per entry (16 bytes total), written every Nth record (we use every 4th for demo size; Kafka indexes every 4 KB of log, roughly every 30–100 records).

000084231.index:
  (84231,    0)
  (84235, 1248)
  (84239, 2496)
  (84243, 3744)
  ...

To find offset 84,300 you binary-search the index for the largest entry whose offset is ≤ 84,300, then seek to that byte position in .log and scan forward at most N-1 records (3 in our case). One seek per fetch, not per record — the same property Kafka relies on to serve millions of fetches per second.
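
In Python the lookup is two lines with bisect — a sketch over the example entries above:

import bisect

idx_offsets = [84231, 84235, 84239, 84243]     # parallel arrays loaded from the .index file
idx_bytes   = [0, 1248, 2496, 3744]

target = 84241
i = bisect.bisect_right(idx_offsets, target) - 1   # largest indexed offset <= target
start = idx_bytes[i] if i >= 0 else 0              # below the first entry: scan from byte 0
print(start)   # 2496 — seek there, then scan forward at most 3 records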

Building it

200 lines of Python. Run it locally; it produces real files in /tmp/dlog/.

# single_partition_log.py — segments + sparse index + producer/consumer API
import os, struct, time, zlib, glob, bisect
from typing import Iterator

HEADER_FMT = ">QQIIIxxxx"   # >= 8+8+4+4+4 = 28 bytes; 4 trailing reserved
HEADER_SIZE = struct.calcsize(HEADER_FMT)
INDEX_ENTRY_FMT = ">QQ"     # logical_offset (8) + byte_position (8) = 16 B
INDEX_ENTRY_SIZE = 16
SEGMENT_BYTES_MAX = 10 * 1024 * 1024     # 10 MB before rolling
INDEX_EVERY_N_RECORDS = 4                # sparse index granularity (demo size)


class Segment:
    """One on-disk pair: NNNNN.log + NNNNN.index, both starting at base_offset."""

    def __init__(self, dir_path: str, base_offset: int):
        self.dir = dir_path
        self.base = base_offset
        self.log_path = os.path.join(dir_path, f"{base_offset:020d}.log")
        self.idx_path = os.path.join(dir_path, f"{base_offset:020d}.index")
        # Load existing index entries (logical_offset list for bisect, byte list parallel)
        self._idx_offsets: list[int] = []
        self._idx_bytes: list[int] = []
        if os.path.exists(self.idx_path):
            with open(self.idx_path, "rb") as f:
                while True:
                    chunk = f.read(INDEX_ENTRY_SIZE)
                    if not chunk:
                        break
                    off, byte = struct.unpack(INDEX_ENTRY_FMT, chunk)
                    self._idx_offsets.append(off)
                    self._idx_bytes.append(byte)
        # Counters
        self.size_bytes = os.path.getsize(self.log_path) if os.path.exists(self.log_path) else 0
        self.next_offset = self._scan_for_next_offset() if self.size_bytes else self.base
        self._records_since_index = 0
        # Open append handles
        self._log_w = open(self.log_path, "ab")
        self._idx_w = open(self.idx_path, "ab")

    def _scan_for_next_offset(self) -> int:
        # Used only on recovery: walk the .log to find the highest stored offset + 1.
        with open(self.log_path, "rb") as f:
            last = self.base - 1
            while True:
                pos = f.tell()
                hdr = f.read(HEADER_SIZE)
                if len(hdr) < HEADER_SIZE:
                    return last + 1
                logical_off, _ts, klen, plen, _crc = struct.unpack(HEADER_FMT, hdr)
                f.seek(klen + plen, os.SEEK_CUR)
                last = logical_off

    def append(self, key: bytes, payload: bytes) -> int:
        """Append one record; return its logical offset."""
        ts = int(time.time() * 1000)
        offset = self.next_offset
        crc = zlib.crc32(struct.pack(">QII", ts, len(key), len(payload)) + key + payload) & 0xFFFFFFFF
        header = struct.pack(HEADER_FMT, offset, ts, len(key), len(payload), crc)
        record = header + key + payload
        byte_pos = self._log_w.tell()
        self._log_w.write(record)
        self._log_w.flush()
        os.fsync(self._log_w.fileno())              # durable before return
        self.size_bytes += len(record)
        self.next_offset += 1
        # Sparse index: emit every N-th record
        self._records_since_index += 1
        if self._records_since_index >= INDEX_EVERY_N_RECORDS:
            self._idx_w.write(struct.pack(INDEX_ENTRY_FMT, offset, byte_pos))
            self._idx_w.flush()
            os.fsync(self._idx_w.fileno())
            self._idx_offsets.append(offset)
            self._idx_bytes.append(byte_pos)
            self._records_since_index = 0
        return offset

    def is_full(self) -> bool:
        return self.size_bytes >= SEGMENT_BYTES_MAX

    def find_byte_for_offset(self, target: int) -> int:
        """Sparse-index lookup: largest indexed offset ≤ target → its byte position.
        If target is below the first indexed offset, start from byte 0."""
        i = bisect.bisect_right(self._idx_offsets, target) - 1
        return self._idx_bytes[i] if i >= 0 else 0

    def read_from(self, target: int) -> Iterator[tuple[int, bytes, bytes]]:
        start_byte = self.find_byte_for_offset(target)
        with open(self.log_path, "rb") as f:
            f.seek(start_byte)
            while True:
                hdr = f.read(HEADER_SIZE)
                if len(hdr) < HEADER_SIZE:
                    return
                logical_off, ts, klen, plen, crc = struct.unpack(HEADER_FMT, hdr)
                key = f.read(klen)
                payload = f.read(plen)
                expected_crc = zlib.crc32(struct.pack(">QII", ts, klen, plen) + key + payload) & 0xFFFFFFFF
                if crc != expected_crc:
                    raise IOError(f"CRC mismatch at offset {logical_off} — segment corrupted")
                if logical_off >= target:
                    yield logical_off, key, payload


class Partition:
    """One partition = ordered segments + producer/consumer API."""

    def __init__(self, dir_path: str):
        self.dir = dir_path
        os.makedirs(dir_path, exist_ok=True)
        log_files = sorted(glob.glob(os.path.join(dir_path, "*.log")))
        if not log_files:
            self.segments = [Segment(dir_path, base_offset=0)]
        else:
            bases = sorted(int(os.path.basename(p).split(".")[0]) for p in log_files)
            self.segments = [Segment(dir_path, b) for b in bases]
        self._base_offsets = [s.base for s in self.segments]

    @property
    def active(self) -> Segment:
        return self.segments[-1]

    def append(self, key: bytes, payload: bytes) -> int:
        if self.active.is_full():
            new_base = self.active.next_offset
            self.segments.append(Segment(self.dir, new_base))
            self._base_offsets.append(new_base)
        return self.active.append(key, payload)

    def read_from(self, target: int) -> Iterator[tuple[int, bytes, bytes]]:
        # Find the segment whose base_offset ≤ target < next_base.
        i = bisect.bisect_right(self._base_offsets, target) - 1
        if i < 0:
            i = 0
        for seg in self.segments[i:]:
            yield from seg.read_from(target)

    def delete_segments_before(self, cutoff_offset: int) -> int:
        """Retention: drop whole segments whose entire range is below cutoff_offset."""
        kept = []
        deleted = 0
        for seg in self.segments:
            seg_max = seg.next_offset - 1
            if seg_max < cutoff_offset and seg is not self.active:
                seg._log_w.close()          # release handles before unlink (required on Windows)
                seg._idx_w.close()
                os.remove(seg.log_path)
                os.remove(seg.idx_path)
                deleted += 1
            else:
                kept.append(seg)
        self.segments = kept
        self._base_offsets = [s.base for s in kept]
        return deleted


# --- Demo: write 12 records, fetch from offset 7 ---
if __name__ == "__main__":
    import shutil; shutil.rmtree("/tmp/dlog", ignore_errors=True)
    p = Partition("/tmp/dlog")
    for i in range(12):
        off = p.append(key=f"merchant_{i % 3}".encode(),
                       payload=f'{{"tx":"tx_{i:03d}","amount_paise":{(i+1)*1999_00}}}'.encode())
        print(f"append → offset {off}")
    print("\n--- consumer.fetch(from_offset=7) ---")
    for off, key, payload in p.read_from(7):
        print(f"  {off}: {key.decode()} → {payload.decode()}")
    print(f"\nsegments on disk: {[s.base for s in p.segments]}")
    print(f"index entries in segment 0: {list(zip(p.segments[0]._idx_offsets, p.segments[0]._idx_bytes))}")

Sample run:

append → offset 0
append → offset 1
... (offsets 2..11)

--- consumer.fetch(from_offset=7) ---
  7: merchant_1 → {"tx":"tx_007","amount_paise":1599200}
  8: merchant_2 → {"tx":"tx_008","amount_paise":1799100}
  9: merchant_0 → {"tx":"tx_009","amount_paise":1999000}
  10: merchant_1 → {"tx":"tx_010","amount_paise":2198900}
  11: merchant_2 → {"tx":"tx_011","amount_paise":2398800}

segments on disk: [0]
index entries in segment 0: [(3, 237), (7, 555), (11, 875)]

What you have now, beyond the previous chapter's TinyLog: bounded segments with whole-file retention, a logical offset that survives across segments, a sparse index for O(log N) random fetch, a binary record format with CRC, and durability via fsync. The Kafka 0000000000084231.log and 0000000000084231.index files in /var/kafka-logs/ on a Razorpay broker contain the same bytes in the same shape — plus a timestamp index and batched-record wrapper, neither of which changes the core mechanism.
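
Retention is the one feature the 12-record demo can't exercise (it never fills a 10 MB segment). Against a partition that has rolled a few segments it's one call — a hypothetical usage sketch, with a made-up cutoff:

# Sealed segments wholly below the cutoff are unlinked; the active segment is always kept.
dropped = p.delete_segments_before(cutoff_offset=84_231)
print(f"dropped {dropped} segments; oldest surviving base = {p.segments[0].base}")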

How it survives a crash

If the process is killed mid-append, restart must produce a consistent state. Partition.__init__ lists *.log files and constructs Segment objects in order; each Segment.__init__ calls _scan_for_next_offset if the file has bytes; _scan_for_next_offset walks the segment from byte 0, parsing record headers, and stops on the first partial header. next_offset is now correct; the next append starts from there.

There is one subtle bug our 200 lines do not handle: if the last record's payload is partial (the header was written fully but only some of the body), our scan sees the broken record's offset N and the next append writes N+1 — leaving a CRC-corrupt record between them. Why this is wrong in practice: a consumer reading from offset N would either get a CRC failure (good — read_from raises) or, depending on alignment, nothing at all. The fix Kafka uses is to truncate the segment to the last known-good record on recovery, re-validating CRCs from the start. Our read_from raising on CRC mismatch is the safety net; a production-grade fix would also truncate the log to the byte position before the bad header.
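
A minimal recovery sketch — recover_segment is a hypothetical helper, not part of the Segment class above. It walks a segment from byte 0, re-validates every CRC, and truncates at the first short or corrupt record. A full fix would also rebuild or truncate the .index file past that point.

import struct, zlib

HEADER_FMT = ">QQIIIxxxx"                    # same 32-byte header as the program above
HEADER_SIZE = struct.calcsize(HEADER_FMT)

def recover_segment(log_path: str, base_offset: int = 0) -> int:
    """Truncate log_path after the last valid record; return the next offset to write."""
    next_offset = base_offset
    with open(log_path, "r+b") as f:
        good_end = 0                         # byte just past the last valid record
        while True:
            hdr = f.read(HEADER_SIZE)
            if len(hdr) < HEADER_SIZE:       # missing or torn header: stop here
                break
            off, ts, klen, plen, crc = struct.unpack(HEADER_FMT, hdr)
            body = f.read(klen + plen)
            if len(body) < klen + plen:      # torn body: header landed, payload didn't
                break
            expect = zlib.crc32(struct.pack(">QII", ts, klen, plen) + body) & 0xFFFFFFFF
            if crc != expect:                # corrupt record: stop here
                break
            good_end = f.tell()
            next_offset = off + 1
        f.truncate(good_end)                 # everything after the last good record goes
    return next_offset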

Comparing to what kafka-python shows you

If you've ever pip-installed kafka-python and pointed it at a real broker, the API looks like:

from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers="kafka.razorpay-internal:9092",
                         acks="all", linger_ms=5)
producer.send("payments.tx_events", key=b"merchant_42", value=b'{"tx":"...","amount_paise":1999900}')
producer.flush()   # send() is asynchronous; flush before the process exits

consumer = KafkaConsumer("payments.tx_events",
                         bootstrap_servers="kafka.razorpay-internal:9092",
                         group_id="fraud-rules-v2",
                         auto_offset_reset="earliest")
for msg in consumer:
    print(msg.offset, msg.key, msg.value)

Map that onto what we just built: producer.send is Partition.append (and msg.offset is the logical offset our append returns); the consumer for-loop is read_from streaming forward; auto_offset_reset="earliest" is read_from(0); bootstrap_servers and group_id belong to the network and coordination layers this chapter deliberately skipped.

Reading Kafka source code with our 200 lines in your head, the unfamiliar parts collapse to: replication, network protocol, consumer-group coordination, and the batching knobs. The storage layer — segments, sparse index, fsync, retention as segment delete — is exactly this.

[Figure: Fetch by logical offset, step by step. fetch(offset=84,300) in three steps, one disk seek. (1) Partition router: base_offsets = [0, 84231, 168901]; bisect_right(...) - 1 → index 1 picks segment 84231 — O(log S), in-memory, ~50 ns. (2) Sparse index lookup: idx = [(84231, 0), (84288, 720_896), (84352, 1_441_792), …] → start at byte 720,896 — O(log I), page-cache resident, ~5 µs. (3) Seek + forward-scan: f.seek(720_896), read sequentially, skip 12 records, reach 84,300 → yield and stream on — one seek plus sequential read, ~100 µs cold, ~5 µs warm.]
Three steps, total ~100 microseconds on a cold cache, ~5 microseconds on a warm one. The disk does sequential I/O for the entire stream after the seek; this is what gives the log its throughput.

The figure makes the cost model obvious: a fetch is one disk seek (steps 1 and 2 are in-memory). Once the seek completes, every subsequent record is a sequential page-cache read, which is where the GB/s numbers come from. Doubling the records fetched per call doesn't double seeks — there's still only one. This is why Kafka consumers fetch in big batches (max.poll.records=500+), not one record at a time.
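
On top of our Partition a batched fetch is a few lines — fetch_batch below is a hypothetical helper, not part of the class above:

from itertools import islice

def fetch_batch(partition, from_offset: int, max_records: int = 500):
    """One 'poll': one index lookup and one seek, then a bounded sequential scan."""
    return list(islice(partition.read_from(from_offset), max_records))

# records = fetch_batch(p, from_offset=84_300)   # 500 records, still exactly one seek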

Going deeper

Why the index is sparse, not dense

A dense index — one entry per record — would let you fetch any offset in zero forward-scans. Why not? Cost. A dense index for a 1 TB partition with 1 KB records is itself 16 GB (16 bytes × 1 billion entries). That doesn't fit in page cache, so index lookups become disk-bound — exactly what the index is supposed to avoid. A sparse index at one entry per 4 KB of log is 4 MB per GB — fits in RAM, and the forward-scan is bounded at ~50 records, invisible next to the network round-trip. Kafka's default index.interval.bytes=4096 is this trade-off in numbers. Tighter for very small records; looser for large ones.
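
The sizes, as arithmetic you can rerun:

TB, GB, MB, KB = 2**40, 2**30, 2**20, 2**10
ENTRY = 16                                     # one (offset, byte_position) pair

dense_1tb = (TB // KB) * ENTRY                 # one entry per 1 KB record, 1 TB partition
sparse_per_gb = (GB // (4 * KB)) * ENTRY       # one entry per 4 KB of log
print(dense_1tb // GB, "GB dense")             # 16 GB — blows the page cache
print(sparse_per_gb // MB, "MB sparse per GB") # 4 MB per GB of log — stays resident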

Memory-mapped indexes and sendfile

Real Kafka opens the index file with mmap(2) rather than pread. This makes the index lookup a memory access — the kernel handles the page fault if the page isn't resident. Combined with sendfile(2) for the data path (the broker hands the kernel a file descriptor and a byte range; the kernel streams to the network socket without copying through user space), Kafka achieves zero-copy fetch: the broker's CPU never touches the record bytes. On a 25 Gbps NIC, that's the only way to saturate the link. On the JVM this arrives via FileChannel.transferTo rather than raw sendfile — Kafka uses it, Pulsar's BookKeeper has the equivalent — and Python exposes the same primitives on Unix as mmap and os.sendfile; Redpanda (C++, userspace network stack) gets closer to PCIe-to-NIC bandwidth than any JVM broker.
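
Both pieces are reachable from Python on Unix — a sketch, where map_index and send_range are hypothetical helpers and sock_fd is assumed to be a connected TCP socket's fileno():

import mmap, os

def map_index(idx_path: str) -> mmap.mmap:
    # Index lookups become plain memory reads; the kernel faults pages in as needed.
    with open(idx_path, "rb") as f:
        return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

def send_range(sock_fd: int, log_path: str, start: int, count: int) -> int:
    # The kernel streams file bytes straight to the socket; no user-space copy.
    fd = os.open(log_path, os.O_RDONLY)
    try:
        sent = 0
        while sent < count:
            n = os.sendfile(sock_fd, fd, start + sent, count - sent)
            if n == 0:                        # EOF before count bytes
                break
            sent += n
        return sent
    finally:
        os.close(fd)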

Why partitions, not just bigger segments

A partition is the unit of parallelism in Kafka, not a unit of storage. One partition is one log; one consumer (in a group) can read it. To scale throughput, you split a topic into N partitions across brokers. Per-partition order is preserved; cross-partition order is given up. Razorpay's payments.tx_events topic, for example, runs around 64 partitions across 8 brokers, sized so that the busiest merchant's traffic still fits in one partition (since same-key records must share a partition for order). Confluent's rule of thumb amounts to dividing peak throughput by the ~50 MB/s a single partition comfortably sustains, with headroom on top. Chapter 49 has the partition function in detail.

What a 7-day retention policy actually costs

At 100k events/sec, average 500 bytes/event, replication factor 3, daily storage is 100,000 × 500 × 86,400 × 3 ≈ 13 TB/day. Seven days of retention is ~90 TB. On AWS gp3 EBS at ₹6.5/GB/month (April 2026), that's roughly ₹5.9 lakh/month for storage alone, before instances. The 2024 trick of tiered storage — keep the recent 24 hours on local NVMe for fetch latency, tier older segments to S3 at ~₹2/GB/month — drops the bill 5–10×. Chapter 52 covers tiered storage; the point now is that "set retention to 30 days because we might need it" is the line that explodes a streaming team's AWS bill.
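
The same back-of-envelope as runnable arithmetic, using the rates assumed in the paragraph above:

events_s, bytes_ev, rf, days = 100_000, 500, 3, 7
daily_tb = events_s * bytes_ev * 86_400 * rf / 1e12       # ≈ 13.0 TB/day
held_gb = daily_tb * days * 1_000                         # ≈ 90,700 GB held
print(f"{daily_tb:.1f} TB/day, ₹{held_gb * 6.5:,.0f}/month on gp3")   # ≈ ₹5.9 lakh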

Where this leads next

This chapter built a single partition. /wiki/partitions-and-parallelism turns one logical log into N partitioned logs across machines, introducing the partition function (hash(key) mod N) and the rebalancing protocol that makes "add a broker" not destroy ordering.

After that, /wiki/consumer-groups-and-offset-management introduces the cooperation pattern: many consumers, each owning a subset of partitions, sharing the work. /wiki/replication-and-isr-how-kafka-stays-up replicates each partition to F+1 machines — the same fsync-then-respond protocol you wrote, but waiting for every replica before returning. /wiki/retention-compaction-tiered-storage is where segment-delete retention, log compaction, and S3 tiering become first-class. /wiki/the-kafka-protocol-on-the-wire covers what producer and consumer say to the broker over TCP — the piece this chapter deliberately skipped. By the end of Build 7 you'll have walked from "60 lines of TinyLog" to "what Kafka actually does"; the structure does not change, only the surrounding details get added one at a time.
