Note: Company names, engineers, incidents, numbers, and scaling scenarios in this article are hypothetical — even when they resemble real ones. See the full disclaimer.

A single-partition log in Python

Last chapter's 60-line TinyLog has two problems any real broker has to solve before it can hold a day of PaisaBridge traffic. The file grows forever — there is no way to delete old data without rewriting it. And the readers seek by byte position, so a consumer that stored "I'm at offset 14,239,772" cannot ask the broker for "the record numbered 14,239,772". This chapter fixes both with the two pieces every production log has: segments and the offset index. By the end you have a 200-line Python program that a Kafka engineer could read and recognise.

A real single-partition log is one logical stream split into bounded segment files, with a sparse index per segment that maps logical record numbers to byte positions. Producers append; consumers fetch by record number. Old segments get deleted when retention says so, never rewritten. Build this once and Kafka's storage layout stops being mysterious — it is the same idea, just with replication and network protocol bolted on later.

What "single-partition" means and what we're keeping out

A partition in Kafka language is a single, ordered, append-only sequence of records — one log. A topic is a set of partitions that together look like one logical stream but parallelise across machines. This chapter builds a partition-of-one: one topic, one partition, one process. Partitioning across machines (Chapter 49), replication (Chapter 51), and the network protocol (Chapters 50, 53) each deserve their own walk-through. Today is just storage layout.

What we build fits in 200 lines: a binary record format, segment files that roll at a size limit, a sparse offset index per segment, a producer API that returns the offset it wrote, and a consumer API that fetches by logical offset and streams forward. Concrete enough that when you read Kafka's LogSegment.scala later, the structure is already familiar.

Figure: One partition is many segments. A single partition is a row of segment files, each with a base offset, a bounded .log data file, and a sparse .index file mapping logical offsets to byte positions.
  • segment 00000000 (base_offset = 0): 000.log + 000.index, 10 MB, read-only, may be deleted
  • segment 00084231 (base_offset = 84,231): 084231.log + .index, 10 MB, read-only, retention 7d
  • active segment 00168901 (base_offset = 168,901): 168901.log + .index, 3.2 MB so far; the producer writes here, and the segment rolls at 10 MB into a new one
Fetch(offset=84,300):
  1. Find the segment with base_offset ≤ 84,300 < next_base → segment 00084231
  2. Binary-search its .index for the largest indexed_offset ≤ 84,300 → (84,288, byte 720,896)
  3. Seek to byte 720,896 and scan forward ≤ 12 records → one disk seek per fetch
The partition is the union of segment files, ordered by base offset. Segments are immutable once full; only the active segment is appended. The sparse index per segment lets a consumer with a logical offset land on the right byte in O(log N) seeks.

The picture is the entire architecture. Everything in the rest of this chapter is just code that implements those three steps.
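As a warm-up, the first of those three steps (routing an offset to its segment) can be sketched on its own with the figure's base offsets. The helper name `route_to_segment` is hypothetical and is not part of the chapter's program; it only illustrates the bisect idea under the assumption that base offsets are kept sorted.

```python
import bisect

# Base offsets of the three segments in the figure, kept sorted.
BASE_OFFSETS = [0, 84_231, 168_901]

def route_to_segment(base_offsets: list[int], target: int) -> int:
    """Return the base offset of the segment that should hold `target`.

    Picks the last segment whose base_offset <= target. Targets below the
    first base are clamped to the first segment.
    """
    i = bisect.bisect_right(base_offsets, target) - 1
    return base_offsets[max(i, 0)]

print(route_to_segment(BASE_OFFSETS, 84_300))   # 84231
```

The lookup never touches the disk: it is a binary search over a short in-memory list, one entry per segment.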

The record format

Before any code, the binary layout. Each record on disk is a fixed-shape header followed by a variable-length payload:

+--------------+------------+----------+-------------+--------+----------+
| logical_off  | timestamp  | key_len  | payload_len | crc32  | reserved |  header (32 B)
|   8 bytes    |  8 bytes   |  4 bytes |   4 bytes   | 4 B    |   4 B    |
+--------------+------------+----------+-------------+--------+----------+
| key (key_len bytes) | payload (payload_len bytes)                      |  body
+------------------------------------------------------------------------+

The logical offset is the record's number within the partition (0, 1, 2, …) — not its byte position. The timestamp is milliseconds since epoch. The CRC32 is over (timestamp || key_len || payload_len || key || payload); if a torn write changes any of those bytes the consumer detects it. Why store the offset on disk: when a consumer asks for offset 84,300 we scan forward from a sparse-index entry like (84,288, byte 720,896) and need to stop at the right record. Without storing the offset we'd have to count records from base_offset, which breaks the moment compaction (Chapter 52) deletes some records mid-segment. Real Kafka stores it in the record header for the same reason.
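To make the layout concrete, here is a minimal sketch that packs one record, parses it back, and shows the CRC rejecting a single flipped bit. The function names (`encode_record`, `decode_record`, `crc_of`) are illustrative assumptions; the header format string assumes the five fields above followed by four reserved pad bytes.

```python
import struct
import zlib

# Assumed layout: offset, timestamp, key_len, payload_len, crc32,
# then 4 reserved pad bytes (32-byte header in total).
HEADER = struct.Struct(">QQIIIxxxx")

def crc_of(ts_ms: int, klen: int, plen: int, body: bytes) -> int:
    # The CRC covers timestamp, both lengths, key and payload (not the offset).
    return zlib.crc32(struct.pack(">QII", ts_ms, klen, plen) + body)

def encode_record(offset: int, ts_ms: int, key: bytes, payload: bytes) -> bytes:
    crc = crc_of(ts_ms, len(key), len(payload), key + payload)
    return HEADER.pack(offset, ts_ms, len(key), len(payload), crc) + key + payload

def decode_record(buf: bytes) -> tuple[int, int, bytes, bytes]:
    """Parse one record from the start of buf; raise ValueError on a bad CRC."""
    offset, ts_ms, klen, plen, crc = HEADER.unpack_from(buf, 0)
    body = buf[HEADER.size:HEADER.size + klen + plen]
    if crc_of(ts_ms, klen, plen, body) != crc:
        raise ValueError(f"CRC mismatch at offset {offset}")
    return offset, ts_ms, body[:klen], body[klen:]

record = encode_record(84_300, 1_700_000_000_000, b"merchant_1", b'{"tx":"tx_007"}')
print(decode_record(record)[0])          # 84300

# Flip one bit in the payload: the decoder must refuse the record.
damaged = bytearray(record)
damaged[-1] ^= 0x01
try:
    decode_record(bytes(damaged))
except ValueError as err:
    print(err)                           # CRC mismatch at offset 84300
```

Because the header is fixed-size, the reader always knows how many bytes to consume next: 32 for the header, then key_len + payload_len for the body.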

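The sparse index file is even simpler: a sequence of (logical_offset, byte_position) pairs, two 8-byte integers (16 bytes) per entry, written every Nth record (we use every 4th for demo size; Kafka uses every 4 KB of log, roughly every 30–100 records). The listing below is illustrative: the program in this chapter writes its first entry at the fourth record of a segment, and lookups for earlier offsets fall back to byte 0.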

000084231.index:
  (84231,    0)
  (84235, 1248)
  (84239, 2496)
  (84243, 3744)
  ...

To find offset 84,300 you binary-search the index for the largest entry whose offset is ≤ 84,300, then seek to that byte position in .log and scan forward at most N-1 records (3 in our case). One seek per fetch, not per record — the same property Kafka relies on to serve millions of fetches per second.
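The lookup just described can be sketched directly against the example index above. The two parallel lists and the `lookup` helper are assumptions made for illustration; the skip count assumes offsets in the segment are contiguous (no compaction).

```python
import bisect

# The example index from above, split into two parallel sorted lists.
INDEX_OFFSETS = [84_231, 84_235, 84_239, 84_243]
INDEX_BYTES = [0, 1_248, 2_496, 3_744]

def lookup(target: int) -> tuple[int, int]:
    """Return (indexed_offset, byte_position) of the largest entry <= target."""
    i = bisect.bisect_right(INDEX_OFFSETS, target) - 1
    if i < 0:
        raise KeyError(f"offset {target} is below this segment's first entry")
    return INDEX_OFFSETS[i], INDEX_BYTES[i]

indexed_offset, byte_pos = lookup(84_241)
records_to_skip = 84_241 - indexed_offset      # valid only for contiguous offsets
print(indexed_offset, byte_pos, records_to_skip)   # 84239 2496 2
```

With an entry every 4th record, `records_to_skip` can never exceed 3, which is the N-1 bound from the paragraph above.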

Building it

200 lines of Python. Run it locally; it produces real files in /tmp/dlog/.

# single_partition_log.py — segments + sparse index + producer/consumer API
import os, struct, time, zlib, glob, bisect
from typing import Optional, Iterator

HEADER_FMT = ">QQIIIxxxx"   # big-endian; 8+8+4+4+4 = 28 bytes of fields + 4 reserved pad bytes = 32
HEADER_SIZE = struct.calcsize(HEADER_FMT)
INDEX_ENTRY_FMT = ">QQ"     # logical_offset (8) + byte_position (8) = 16 B
INDEX_ENTRY_SIZE = 16
SEGMENT_BYTES_MAX = 10 * 1024 * 1024     # 10 MB before rolling
INDEX_EVERY_N_RECORDS = 4                # sparse index granularity (demo size)


class Segment:
    """One on-disk pair: NNNNN.log + NNNNN.index, both starting at base_offset."""

    def __init__(self, dir_path: str, base_offset: int):
        self.dir = dir_path
        self.base = base_offset
        self.log_path = os.path.join(dir_path, f"{base_offset:020d}.log")
        self.idx_path = os.path.join(dir_path, f"{base_offset:020d}.index")
        # Load existing index entries (logical_offset list for bisect, byte list parallel)
        self._idx_offsets: list[int] = []
        self._idx_bytes: list[int] = []
        if os.path.exists(self.idx_path):
            with open(self.idx_path, "rb") as f:
                while True:
                    chunk = f.read(INDEX_ENTRY_SIZE)
                    if not chunk:
                        break
                    off, byte = struct.unpack(INDEX_ENTRY_FMT, chunk)
                    self._idx_offsets.append(off)
                    self._idx_bytes.append(byte)
        # Counters
        self.size_bytes = os.path.getsize(self.log_path) if os.path.exists(self.log_path) else 0
        self.next_offset = self._scan_for_next_offset() if self.size_bytes else self.base
        self._records_since_index = 0
        # Open append handles
        self._log_w = open(self.log_path, "ab")
        self._idx_w = open(self.idx_path, "ab")

    def _scan_for_next_offset(self) -> int:
        # Used only on recovery: walk the .log to find the highest stored offset + 1.
        with open(self.log_path, "rb") as f:
            last = self.base - 1
            while True:
                pos = f.tell()
                hdr = f.read(HEADER_SIZE)
                if len(hdr) < HEADER_SIZE:
                    return last + 1
                logical_off, _ts, klen, plen, _crc = struct.unpack(HEADER_FMT, hdr)
                f.seek(klen + plen, os.SEEK_CUR)
                last = logical_off

    def append(self, key: bytes, payload: bytes) -> int:
        """Append one record; return its logical offset."""
        ts = int(time.time() * 1000)
        offset = self.next_offset
        crc = zlib.crc32(struct.pack(">QII", ts, len(key), len(payload)) + key + payload) & 0xFFFFFFFF
        header = struct.pack(HEADER_FMT, offset, ts, len(key), len(payload), crc)
        record = header + key + payload
        byte_pos = self._log_w.tell()
        self._log_w.write(record)
        self._log_w.flush()
        os.fsync(self._log_w.fileno())              # durable before return
        self.size_bytes += len(record)
        self.next_offset += 1
        # Sparse index: emit every N-th record
        self._records_since_index += 1
        if self._records_since_index >= INDEX_EVERY_N_RECORDS:
            self._idx_w.write(struct.pack(INDEX_ENTRY_FMT, offset, byte_pos))
            self._idx_w.flush()
            os.fsync(self._idx_w.fileno())
            self._idx_offsets.append(offset)
            self._idx_bytes.append(byte_pos)
            self._records_since_index = 0
        return offset

    def is_full(self) -> bool:
        return self.size_bytes >= SEGMENT_BYTES_MAX

    def find_byte_for_offset(self, target: int) -> int:
        """Sparse-index lookup: largest indexed offset ≤ target → its byte position.
        If target is below the first indexed offset, start from byte 0."""
        i = bisect.bisect_right(self._idx_offsets, target) - 1
        return self._idx_bytes[i] if i >= 0 else 0

    def read_from(self, target: int) -> Iterator[tuple[int, bytes, bytes]]:
        start_byte = self.find_byte_for_offset(target)
        with open(self.log_path, "rb") as f:
            f.seek(start_byte)
            while True:
                hdr = f.read(HEADER_SIZE)
                if len(hdr) < HEADER_SIZE:
                    return
                logical_off, ts, klen, plen, crc = struct.unpack(HEADER_FMT, hdr)
                key = f.read(klen)
                payload = f.read(plen)
                expected_crc = zlib.crc32(struct.pack(">QII", ts, klen, plen) + key + payload) & 0xFFFFFFFF
                if crc != expected_crc:
                    raise IOError(f"CRC mismatch at offset {logical_off} — segment corrupted")
                if logical_off >= target:
                    yield logical_off, key, payload


class Partition:
    """One partition = ordered segments + producer/consumer API."""

    def __init__(self, dir_path: str):
        self.dir = dir_path
        os.makedirs(dir_path, exist_ok=True)
        log_files = sorted(glob.glob(os.path.join(dir_path, "*.log")))
        if not log_files:
            self.segments = [Segment(dir_path, base_offset=0)]
        else:
            bases = sorted(int(os.path.basename(p).split(".")[0]) for p in log_files)
            self.segments = [Segment(dir_path, b) for b in bases]
        self._base_offsets = [s.base for s in self.segments]

    @property
    def active(self) -> Segment:
        return self.segments[-1]

    def append(self, key: bytes, payload: bytes) -> int:
        if self.active.is_full():
            new_base = self.active.next_offset
            self.segments.append(Segment(self.dir, new_base))
            self._base_offsets.append(new_base)
        return self.active.append(key, payload)

    def read_from(self, target: int) -> Iterator[tuple[int, bytes, bytes]]:
        # Find the segment whose base_offset ≤ target < next_base.
        i = bisect.bisect_right(self._base_offsets, target) - 1
        if i < 0:
            i = 0
        for seg in self.segments[i:]:
            yield from seg.read_from(target)

    def delete_segments_before(self, cutoff_offset: int) -> int:
        """Retention: drop whole segments whose entire range is below cutoff_offset."""
        kept = []
        deleted = 0
        for seg in self.segments:
            seg_max = seg.next_offset - 1
            if seg_max < cutoff_offset and seg is not self.active:
                seg._log_w.close()                  # release handles before unlinking
                seg._idx_w.close()
                os.remove(seg.log_path)
                os.remove(seg.idx_path)
                deleted += 1
            else:
                kept.append(seg)
        self.segments = kept
        self._base_offsets = [s.base for s in kept]
        return deleted


# --- Demo: write 12 records, fetch from offset 7 ---
if __name__ == "__main__":
    import shutil; shutil.rmtree("/tmp/dlog", ignore_errors=True)
    p = Partition("/tmp/dlog")
    for i in range(12):
        off = p.append(key=f"merchant_{i % 3}".encode(),
                       payload=f'{{"tx":"tx_{i:03d}","amount_paise":{(i+1)*1999_00}}}'.encode())
        print(f"append → offset {off}")
    print("\n--- consumer.fetch(from_offset=7) ---")
    for off, key, payload in p.read_from(7):
        print(f"  {off}: {key.decode()} → {payload.decode()}")
    print(f"\nsegments on disk: {[s.base for s in p.segments]}")
    print(f"index entries in segment 0: {list(zip(p.segments[0]._idx_offsets, p.segments[0]._idx_bytes))}")
# Sample run:
append → offset 0
append → offset 1
... (offsets 2..11)

--- consumer.fetch(from_offset=7) ---
  7: merchant_1 → {"tx":"tx_007","amount_paise":1599200}
  8: merchant_2 → {"tx":"tx_008","amount_paise":1799100}
  9: merchant_0 → {"tx":"tx_009","amount_paise":1999000}
  10: merchant_1 → {"tx":"tx_010","amount_paise":2198900}
  11: merchant_2 → {"tx":"tx_011","amount_paise":2398800}

segments on disk: [0]
index entries in segment 0: [(3, 237), (7, 555), (11, 875)]

The lines that matter:

  • HEADER_FMT = ">QQIIIxxxx" — fixed-shape, big-endian binary. JSON-on-disk costs 3–10× more bytes plus a parser per record; the fixed shape is also what makes the CRC check possible — read the header, you know exactly how many bytes follow.
  • crc = zlib.crc32(...) then os.fsync(self._log_w.fileno()): every record carries a checksum and the data is durable before append returns. A flipped bit (cosmic ray, dying SSD cell) raises IOError on the next read instead of silently delivering corrupt data; without fsync the page cache can hold up to about 30 seconds of writes under Linux defaults, and those disappear on a kernel panic. Kafka checksums its record batches the same way, but by default it leaves flushing to the OS and leans on replication for durability.
  • bisect.bisect_right(self._idx_offsets, target) - 1: the lookup. With 100,000 index entries (roughly a 400 MB segment at one entry per 4 KB) that's 17 in-memory comparisons. What follows is 1 disk seek plus at most N-1 records of forward scan. Why granularity is a knob: tighter (smaller N) means a smaller forward scan but a bigger index. Looser means a smaller index but more bytes scanned per fetch. Kafka defaults to one entry per 4 KB of log, on typical workloads about every 50 records: small enough that the scan is invisible, big enough that the index stays around 0.4% of the data at 16 bytes per entry.
  • if self.active.is_full(): self.segments.append(Segment(...)) — segment rolling. Crossing the 10 MB threshold closes the active segment and starts a new one whose base_offset is the next logical offset. Old segments become read-only; delete_segments_before frees disk by os.remove-ing both files. We never rewrite an old segment — there is no "delete record N from the middle of a file" operation in any real broker, and that is what makes retention cheap.
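The whole-segment retention rule in the last bullet can be sketched without any files at all. This is a simplified stand-in for delete_segments_before, with a hypothetical name (`deletable_segments`), and it assumes each segment's last offset is the next segment's base minus one, which holds when segments roll the way Partition.append rolls them.

```python
def deletable_segments(base_offsets: list[int], cutoff_offset: int) -> list[int]:
    """Return base offsets of segments whose whole range is below cutoff_offset.

    A closed segment i covers [base_offsets[i], base_offsets[i + 1] - 1].
    The last entry is the active segment and is never returned.
    """
    doomed = []
    for base, next_base in zip(base_offsets, base_offsets[1:]):
        if next_base - 1 < cutoff_offset:
            doomed.append(base)
    return doomed

bases = [0, 84_231, 168_901]                 # the three segments from the figure
print(deletable_segments(bases, 100_000))    # [0]
print(deletable_segments(bases, 168_901))    # [0, 84231]
```

A cutoff that lands in the middle of a segment keeps that segment whole: retention can only be as precise as the segment size.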

What you have now, beyond the previous chapter's TinyLog: bounded segments with whole-file retention, a logical offset that survives across segments, a sparse index for O(log N) random fetch, a binary record format with CRC, and durability via fsync. The 00000000000000084231.log and 00000000000000084231.index files in /var/kafka-logs/ on a PaisaBridge broker follow the same layout, plus a timestamp index and a batched-record wrapper, neither of which changes the core mechanism.

How it survives a crash

If the process is killed mid-append, restart must produce a consistent state. Partition.__init__ lists *.log files and constructs Segment objects in order; each Segment.__init__ calls _scan_for_next_offset if the file has bytes; _scan_for_next_offset walks the segment from byte 0, parsing record headers, and stops on the first partial header. next_offset is now correct; the next append starts from there.

There is one subtle bug our 200 lines do not handle: if the last record's payload is partial (the header was written fully but only some of the body), our scan sees the broken record's offset N and sets next_offset to N+1, so the next append writes record N+1 directly after the torn bytes and leaves a CRC-corrupt record N in front of it. A torn header has a similar effect: the scan stops with the right next_offset, but the file is opened in append mode, so the stray header bytes stay in the file ahead of the next record. Why this is wrong in practice: a consumer reading from offset N would either get a CRC failure (good, read_from raises) or, depending on alignment, get nothing at all. The fix Kafka uses is to truncate the segment to the last known-good record on recovery, re-validating CRCs from the start. Our read_from raising on CRC mismatch is the safety net; a production-grade fix would also truncate the segment to the byte position before the bad header.
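A truncating recovery pass might look like the sketch below. It is one possible shape under stated assumptions, not Kafka's actual recovery code: `recover_segment` and `make_record` are hypothetical names, and the header layout is assumed to be the 32-byte one used by the chapter's program.

```python
import os
import struct
import tempfile
import zlib

HEADER = struct.Struct(">QQIIIxxxx")   # assumed: same 32-byte header as the chapter's code

def recover_segment(log_path: str, base_offset: int) -> int:
    """Truncate log_path after its last intact record; return the next offset.

    A record counts as intact only if its header and body are complete
    and the stored CRC matches.
    """
    size = os.path.getsize(log_path)
    good_end = 0                       # byte just past the last intact record
    next_offset = base_offset
    with open(log_path, "r+b") as f:
        while True:
            hdr = f.read(HEADER.size)
            if len(hdr) < HEADER.size:
                break                  # clean EOF or torn header
            offset, ts, klen, plen, crc = HEADER.unpack(hdr)
            if f.tell() + klen + plen > size:
                break                  # torn body (or garbage lengths)
            body = f.read(klen + plen)
            if zlib.crc32(struct.pack(">QII", ts, klen, plen) + body) != crc:
                break                  # corrupt record
            good_end = f.tell()
            next_offset = offset + 1
        f.truncate(good_end)           # drop everything after the last intact record
        f.flush()
        os.fsync(f.fileno())
    return next_offset

def make_record(offset: int, key: bytes, payload: bytes, ts: int = 0) -> bytes:
    crc = zlib.crc32(struct.pack(">QII", ts, len(key), len(payload)) + key + payload)
    return HEADER.pack(offset, ts, len(key), len(payload), crc) + key + payload

# Two intact records followed by one whose body lost its last 3 bytes.
good = make_record(0, b"k", b"first") + make_record(1, b"k", b"second")
torn = make_record(2, b"k", b"third")[:-3]
path = os.path.join(tempfile.mkdtemp(), "00000000000000000000.log")
with open(path, "wb") as f:
    f.write(good + torn)

next_offset = recover_segment(path, base_offset=0)
print(next_offset, os.path.getsize(path) == len(good))   # 2 True
```

The matching .index file would need the same treatment (dropping entries that point at or beyond the truncation point); that part is left out of this sketch.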

Comparing to what kafka-python shows you

If you've ever pip-installed kafka-python and pointed it at a real broker, the API looks like:

from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers="kafka.razorpay-internal:9092",
                         acks="all", linger_ms=5)
producer.send("payments.tx_events", key=b"merchant_42", value=b'{"tx":"...","amount_paise":1999900}')

consumer = KafkaConsumer("payments.tx_events",
                         bootstrap_servers="kafka.razorpay-internal:9092",
                         group_id="fraud-rules-v2",
                         auto_offset_reset="earliest")
for msg in consumer:
    print(msg.offset, msg.key, msg.value)

Map that onto what we just built:

  • producer.send(topic, key, value) is Partition.append(key, payload) plus a network round-trip and a partition selector that hashes the key. The hash function is the topic of Chapter 49.
  • acks="all" is a replication detail (Chapter 51): the broker waits until every in-sync replica has the record before returning the offset. In our single-machine code there are no replicas; fsync is the entire durability story.
  • linger_ms=5 is a producer-side batching knob to coalesce small writes. The disk side benefits from the same trick (one fsync for many records); we fsync per record, the slowest possible thing. Real producers and brokers both batch.
  • consumer = KafkaConsumer(..., auto_offset_reset="earliest") behaves like Partition.read_from(0) for a group with no committed offset: it starts from the oldest record still on disk. The group_id adds the consumer-group concept (Chapter 50).
  • for msg in consumer: is the same generator pattern as our read_from. msg.offset is the logical offset.
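The "one fsync for many records" trick mentioned under linger_ms can be sketched on the disk side as a batched append. This is an illustration under assumptions, not how Kafka or kafka-python implement batching: `append_batch` is a hypothetical helper, and it reuses the chapter's 32-byte header layout.

```python
import os
import struct
import tempfile
import time
import zlib

HEADER = struct.Struct(">QQIIIxxxx")   # assumed: same header as the chapter's code

def append_batch(log_file, first_offset: int, records: list[tuple[bytes, bytes]]) -> int:
    """Write every (key, payload) pair, then fsync once. Returns the next free offset."""
    ts = int(time.time() * 1000)
    buf = bytearray()
    for i, (key, payload) in enumerate(records):
        crc = zlib.crc32(struct.pack(">QII", ts, len(key), len(payload)) + key + payload)
        buf += HEADER.pack(first_offset + i, ts, len(key), len(payload), crc) + key + payload
    log_file.write(buf)
    log_file.flush()
    os.fsync(log_file.fileno())        # one sync covers the whole batch
    return first_offset + len(records)

path = os.path.join(tempfile.mkdtemp(), "batch.log")
batch = [(b"merchant_1", b"tx_%03d" % i) for i in range(100)]
with open(path, "ab") as f:
    next_offset = append_batch(f, 0, batch)
print(next_offset, os.path.getsize(path))   # 100 4800
```

The trade-off is latency and blast radius: no record in the batch can be acknowledged until the single sync completes, and a crash before it loses the whole unacknowledged batch.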

Reading Kafka source code with our 200 lines in your head, the unfamiliar parts collapse to: replication, network protocol, consumer-group coordination, and the batching knobs. The storage layer — segments, sparse index, fsync, retention as segment delete — is exactly this.

Figure: Fetch by logical offset, step by step. fetch(offset=84,300): three steps, one disk seek.
  1. Partition router: base_offsets = [0, 84231, 168901]; bisect_right(...) - 1 → index 1, which picks segment 84231. O(log S), in-memory, ~50 ns.
  2. Sparse index lookup: idx = [(84231, 0), (84288, 720_896), (84352, 1_441_792), …] → start at byte 720,896. O(log I), page-cache resident, ~5 µs.
  3. Seek + forward scan: f.seek(720_896), then read sequentially from 84,288, skipping 12 records to reach 84,300 → yield and stream on. 1 seek + sequential read, ~100 µs cold, ~5 µs warm.
Three steps, total ~100 microseconds on a cold cache, ~5 microseconds on a warm one. The disk does sequential I/O for the entire stream after the seek; this is what gives the log its throughput.

The figure makes the cost model obvious: a fetch is one disk seek (steps 1 and 2 are in-memory). Once the seek completes, every subsequent record is a sequential page-cache read, which is where the GB/s numbers come from. Doubling the records fetched per call doesn't double seeks — there's still only one. This is why Kafka consumers fetch in big batches (max.poll.records=500+), not one record at a time.

Common confusions

  • "The offset is the byte position." Only in toy logs. In a real partition the offset is the logical record number; the byte position is what the index tells you. The reason is that records have variable size, so the byte position is not predictable from the offset alone — you need either a dense index (one entry per record, expensive) or a sparse index plus a forward-scan, which is what brokers actually do.
  • "Each segment is a different topic or partition." Wrong shape. A topic has many partitions; each partition has many segments. The segments of one partition together form one logical log; readers fetching from offset N just see records, not "segment boundaries". Boundaries are an implementation detail of how the partition stores its bytes, not something the consumer can observe.
  • "Old segments are deleted record-by-record." Never. Deletion is by whole segment. A segment is either entirely present or entirely gone; there is no half-deleted segment. This is what makes retention cheap — os.remove(file) on a 10 MB chunk versus parsing the file and rewriting it. Compaction (Chapter 52) does involve rewriting, but compaction is not retention; it produces a new segment, then deletes the old one whole.
  • "fsync per append is fine for production." It's correct, but it caps throughput at roughly the disk's sync rate (a few thousand syncs/sec on NVMe, much slower on EBS). Real brokers batch: one fsync per network round-trip's worth of records. Kafka exposes this as the flush.messages and flush.ms configs; the default is "let the OS decide", which means relying on replication-to-quorum for durability, not fsync. Our single-machine code has no replicas, so an fsync before returning is the only way to make an acknowledged append durable.
  • "The CRC catches all errors." It catches single-bit and small multi-bit flips on the bytes covered by the CRC. In our format that excludes the logical offset field, so a flipped bit there passes the check. It also does not catch a misdirected write (the disk wrote the right bytes to the wrong sector; rare on SSDs, real on RAID controllers), an OS-level reorder where two records swap places (which POSIX O_APPEND semantics rule out anyway), or application bugs that produce wrong-but-correctly-checksummed data. CRC is a layer of defence, not the only one.
  • "A consumer can rewind to any offset, even one that doesn't exist." It can rewind to any offset within the partition's retention window. Rewinding to an offset that has been deleted by retention returns "OffsetOutOfRangeException" — the consumer must either reset to the earliest available offset or to the latest. This is why auto.offset.reset is one of the first configs every Kafka onboarding page covers.
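The rewind rule in the last item can be sketched as a small decision function. The names here (`resolve_start_offset`, `OffsetOutOfRange`) are hypothetical, and the policy values are only loosely modelled on Kafka's auto.offset.reset; this is not the client's actual code.

```python
class OffsetOutOfRange(Exception):
    """Raised when the requested offset is outside [earliest, latest]."""

def resolve_start_offset(requested: int, earliest: int, latest: int,
                         reset_policy: str = "none") -> int:
    """Decide where a consumer actually starts reading.

    earliest:     first offset still on disk (base of the oldest segment)
    latest:       the next offset to be written (the log end)
    reset_policy: "earliest", "latest" or "none"
    """
    if earliest <= requested <= latest:
        return requested
    if reset_policy == "earliest":
        return earliest
    if reset_policy == "latest":
        return latest
    raise OffsetOutOfRange(f"offset {requested} not in [{earliest}, {latest}]")

# Retention has already deleted everything below 84,231.
print(resolve_start_offset(84_300, 84_231, 200_000))           # 84300
print(resolve_start_offset(10, 84_231, 200_000, "earliest"))   # 84231
```

With the "none" policy the caller sees the exception and has to decide for itself, which is the behaviour a pipeline that must not silently skip data usually wants.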

Going deeper

Why the index is sparse, not dense

A dense index — one entry per record — would let you fetch any offset in zero forward-scans. Why not? Cost. A dense index for a 1 TB partition with 1 KB records is itself 16 GB (16 bytes × 1 billion entries). That doesn't fit in page cache, so index lookups become disk-bound — exactly what the index is supposed to avoid. A sparse index at one entry per 4 KB of log is 4 MB per GB — fits in RAM, and the forward-scan is bounded at ~50 records, invisible next to the network round-trip. Kafka's default index.interval.bytes=4096 is this trade-off in numbers. Tighter for very small records; looser for large ones.
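The arithmetic in that paragraph is easy to reproduce. The sketch below assumes 16-byte index entries, as in this chapter's format, and uses hypothetical helper names.

```python
ENTRY_BYTES = 16   # (logical_offset, byte_position), 8 bytes each, as in this chapter

def dense_index_bytes(log_bytes: int, record_bytes: int) -> int:
    """One entry per record."""
    return (log_bytes // record_bytes) * ENTRY_BYTES

def sparse_index_bytes(log_bytes: int, interval_bytes: int) -> int:
    """One entry per interval_bytes of log."""
    return (log_bytes // interval_bytes) * ENTRY_BYTES

TB = 10**12
GiB = 2**30
print(dense_index_bytes(TB, 1_000) / 10**9)     # 16.0  (GB for a 1 TB log of 1 KB records)
print(sparse_index_bytes(GiB, 4_096) / 2**20)   # 4.0   (MiB per GiB of log)
```

The sparse figure does not depend on record size at all, which is why a byte-based interval is a more stable knob than a record-count interval.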

Memory-mapped indexes and sendfile

Real Kafka opens the index file with mmap(2) rather than pread. This makes the index lookup a memory access; the kernel handles the page fault if the page isn't resident. Combined with sendfile(2) for the data path (the broker hands the kernel a file descriptor and a byte range; the kernel streams to the network socket without copying through user space), Kafka achieves zero-copy fetch: the broker's CPU never touches the record bytes. On a 25 Gbps NIC, that's the only way to saturate the link. Python's stdlib covers both halves, with mmap for the index and os.sendfile (or socket.sendfile) for the data path; Kafka reaches sendfile through Java NIO's FileChannel.transferTo, Pulsar's BookKeeper has it, and Redpanda (C++, userspace network stack) gets closer to PCIe-to-NIC bandwidth than any JVM broker.
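A memory-mapped version of the index lookup can be sketched with the stdlib mmap module. This is an illustration of the idea, not Kafka's implementation: `mmap_lookup` is a hypothetical helper, and the entry format is assumed to be this chapter's 16-byte (offset, byte position) pair.

```python
import mmap
import os
import struct
import tempfile

ENTRY = struct.Struct(">QQ")   # (logical_offset, byte_position), as in the chapter's index

def mmap_lookup(index_map: mmap.mmap, target: int) -> int:
    """Binary-search a memory-mapped index; return the byte position of the
    largest entry with offset <= target, or 0 if there is none."""
    lo, hi = 0, len(index_map) // ENTRY.size     # search window [lo, hi)
    while lo < hi:
        mid = (lo + hi) // 2
        offset, _ = ENTRY.unpack_from(index_map, mid * ENTRY.size)
        if offset <= target:
            lo = mid + 1
        else:
            hi = mid
    if lo == 0:
        return 0
    return ENTRY.unpack_from(index_map, (lo - 1) * ENTRY.size)[1]

# Build a small index file with the entries from the earlier example.
path = os.path.join(tempfile.mkdtemp(), "00000000000000084231.index")
with open(path, "wb") as f:
    for entry in [(84_231, 0), (84_235, 1_248), (84_239, 2_496), (84_243, 3_744)]:
        f.write(ENTRY.pack(*entry))

with open(path, "rb") as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
    start_byte = mmap_lookup(m, 84_241)
print(start_byte)   # 2496
```

Nothing is read into a Python list here: each probe unpacks 16 bytes straight out of the mapping, and the kernel decides which pages stay resident. On the data path, os.sendfile(out_fd, in_fd, offset, count) is the stdlib wrapper for sendfile(2) on platforms that provide it.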

Why partitions, not just bigger segments

A partition is the unit of parallelism in Kafka, not a unit of storage. One partition is one log; within a consumer group, only one consumer reads it at a time. To scale throughput, you split a topic into N partitions across brokers. Per-partition order is preserved; cross-partition order is given up. PaisaBridge's payments.tx_events topic, for example, runs around 64 partitions across 8 brokers, sized so that the busiest merchant's traffic still fits in one partition (since same-key records must share a partition for order). A commonly cited sizing rule from Confluent is max(t/p, t/c): target throughput t divided by the measured per-partition producer throughput p or consumer throughput c, whichever gives the larger count. Chapter 49 has the partition function in detail.

What a 7-day retention policy actually costs

At 100k events/sec, average 500 bytes/event, replication factor 3, daily storage is 100,000 × 500 × 86,400 × 3 ≈ 13 TB/day. Seven days of retention is ~90 TB. On AWS gp3 EBS at ₹6.5/GB/month (April 2026), that's roughly ₹5.9 lakh/month for storage alone, before instances. The 2024 trick of tiered storage (keep the recent 24 hours on local NVMe for fetch latency, tier older segments to S3 at ~₹2/GB/month) drops the bill 5–10×. Chapter 52 covers tiered storage; the point now is that "set retention to 30 days because we might need it" is the line that explodes a streaming team's AWS bill.
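The same back-of-envelope sum can be written as a few lines of Python, which makes it easy to try other retention windows. The function name and parameters are hypothetical; the price is the article's own illustrative ₹6.5/GB/month figure, with 1 GB taken as 10^9 bytes.

```python
def retention_cost(events_per_sec: int, bytes_per_event: int, replication: int,
                   days: int, rupees_per_gb_month: float) -> tuple[float, float, float]:
    """Return (TB written per day, TB retained, rupees per month)."""
    bytes_per_day = events_per_sec * bytes_per_event * 86_400 * replication
    retained_gb = bytes_per_day * days / 10**9
    return bytes_per_day / 10**12, retained_gb / 1_000, retained_gb * rupees_per_gb_month

tb_per_day, tb_retained, rupees = retention_cost(100_000, 500, 3, 7, 6.5)
print(round(tb_per_day, 2), round(tb_retained, 2), round(rupees))   # 12.96 90.72 589680

# The "30 days because we might need it" scenario, same traffic and price.
_, tb_30d, rupees_30d = retention_cost(100_000, 500, 3, 30, 6.5)
print(round(tb_30d, 1), round(rupees_30d))                          # 388.8 2527200
```

Cost scales linearly with the retention window, so moving from 7 to 30 days multiplies the storage bill by a little over four.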

Where this leads next

This chapter built a single partition. /wiki/partitions-and-parallelism turns one logical log into N partitioned logs across machines, introducing the partition function (hash(key) mod N) and the rebalancing protocol that makes "add a broker" not destroy ordering.

After that, /wiki/consumer-groups-and-offset-management introduces the cooperation pattern: many consumers, each owning a subset of partitions, sharing the work. /wiki/replication-and-isr-how-kafka-stays-up replicates each partition to F+1 machines — the same fsync-then-respond protocol you wrote, but waiting for every replica before returning. /wiki/retention-compaction-tiered-storage is where segment-delete retention, log compaction, and S3 tiering become first-class. /wiki/the-kafka-protocol-on-the-wire covers what producer and consumer say to the broker over TCP — the piece this chapter deliberately skipped. By the end of Build 7 you'll have walked from "60 lines of TinyLog" to "what Kafka actually does"; the structure does not change, only the surrounding details get added one at a time.

References