Streaming writes into a lakehouse

A Zerodha tick-data engineer wires Kafka topic nse-trade-ticks into an Iceberg table on S3 at 6:14 a.m. on the morning of an RBI policy day, expecting volume to peak around 11:30 a.m. when the announcement lands. By 11:32, the producer is sustaining 78,000 messages per second across 32 Kafka partitions. The naive integration — one Parquet file per Kafka message — would emit 78,000 files per second, 46.8 lakh files per minute, 28 crore files per hour. S3 PUT charges alone would run roughly ₹2,000 per minute (at about ₹0.42 per 1,000 PUTs in ap-south-1), and the resulting table would have a query latency in minutes because Iceberg's manifest tree would balloon to gigabytes before lunch. The Iceberg commit protocol (chapter on manifest files and the commit protocol) is also fundamentally a batch primitive — every commit is a CAS swap of the table's metadata pointer, and you cannot CAS-swap 78,000 times a second. Streaming writes into a lakehouse is the engineering of a bridge between a per-message world and a per-snapshot world, and the bridge has three load-bearing pieces: a micro-batch buffer, a two-phase commit between the stream framework and the table format, and a committer thread that snapshots only when the buffer is full enough or old enough.

A streaming engine cannot CAS-commit to Iceberg or Delta on every event — the commit cost is O(seconds) and the event rate is O(thousands/sec). The bridge is a sink that buffers events into local Parquet files, then triggers a commit when the buffer hits a size threshold (default ~128 MB) or a time threshold (default ~1 minute). To survive crashes, the sink uses a two-phase commit (Flink's TwoPhaseCommitSinkFunction): pre-commit writes the data files but does NOT update the table; the commit step CAS-swaps the manifest only after the framework's checkpoint succeeds. This gives end-to-end exactly-once and bounds the small-file count to roughly partitions × commits-per-hour. Compaction handles whatever small files leak through.

Why per-event commits are physically impossible

The Iceberg commit protocol writes a new metadata JSON pointing to a new manifest list, then atomically CAS-swaps the table's metadata_location pointer in the catalog (Glue, Hive Metastore, REST). The fast path takes 200–800 ms — read current metadata, append a manifest entry, PUT the new metadata, CAS the pointer. The slow path, when concurrent writers conflict, takes 2–10 seconds because the CAS retries with exponential backoff. Why no commit can be faster than ~200 ms: each commit needs at least three S3 round trips (read metadata, write metadata, write manifest list), a catalog round trip (CAS the pointer), and a snapshot ID generation step. Even on a perfectly tuned S3 bucket in the same region, that's 4 sequential network hops, each ~30–50 ms. You cannot commit faster than the speed of light through your network stack, and the commit work is inherently sequential per table.

So the math is brutal: at 200 ms per commit, the maximum commit rate is 5 per second per table. At 78,000 events per second, a per-event commit would need 15,600× the achievable rate. Even at 2,000 events per second (a moderate Razorpay payment-events firehose), per-event commits exceed the protocol's capacity by 400×.

The fix is to make every commit cover many events. If a single commit covers 1 minute of events at 78,000 events/sec, that's 4.68 million events per commit and only 60 commits per hour — well within the protocol's capacity. The catch: between commits, those events are sitting somewhere. If the streaming framework crashes during that minute, you must not lose them; if it restarts, you must not double-write them. That "somewhere" plus the protocol around it is the rest of this chapter.
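The arithmetic can be sanity-checked in a few lines, using the chapter's own figures (200 ms fast-path commit, 78,000 events/sec):

```python
# Back-of-envelope: why commits must cover many events.
COMMIT_LATENCY_S = 0.2          # fast-path Iceberg commit, ~200 ms
EVENT_RATE = 78_000             # events/sec on an RBI policy day
COMMIT_INTERVAL_S = 60          # one commit covers a minute of traffic

max_commits_per_sec = 1 / COMMIT_LATENCY_S            # 5.0 — the protocol's ceiling
shortfall = EVENT_RATE / max_commits_per_sec          # 15,600x over capacity per-event

events_per_commit = EVENT_RATE * COMMIT_INTERVAL_S    # 4,680,000 events per commit
commits_per_hour = 3600 / COMMIT_INTERVAL_S           # 60 — comfortably within capacity

print(max_commits_per_sec, shortfall, events_per_commit, commits_per_hour)
```

The same two constants — commit latency and event rate — decide the minimum viable batch size for any table format with a single metadata pointer.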

Micro-batch buffering between Kafka and Iceberg

[Figure: Kafka topic nse-trade-ticks (32 partitions, 78,000 ev/sec, events arriving continuously) feeds a Flink streaming sink with one writer task per Kafka partition. The sink buffers events into staged, still-open Parquet files (e.g. part-00037-staged.parquet at 84 MB and growing). A commit trigger — size ≥ 128 MB or age ≥ 60 s, whichever comes first — together with the Flink checkpoint barrier (pre-commit + commit, every 30 s, two-phase) promotes the staged files into the Iceberg table trades.iceberg as a new snapshot. Resulting cadence: 60 commits/hour, ~2,000 files added per hour at ~96 MB average file size, keeping the manifest tree manageable.]
The streaming sink is the bridge. It receives per-event traffic from Kafka but emits per-batch commits to Iceberg. Staged Parquet files accumulate locally; a size or age trigger fires a two-phase commit that promotes them into a new Iceberg snapshot. The cadence is tuned so that file count stays bounded — typically 60 commits/hour with one file per writer per commit, so ~2,000 files added per hour at a 96 MB average file size.

The two-phase commit between Flink and Iceberg

A Flink streaming job uses checkpoints to recover from failures. Every 30 seconds (default), the framework injects a checkpoint barrier into the data flow. When the barrier reaches a sink, the sink must persist any in-flight state and acknowledge the barrier; once every operator acknowledges, the checkpoint is complete and Flink updates the source's offset checkpoint. If anything in the pipeline crashes, Flink restarts from the last completed checkpoint — replaying messages from that Kafka offset.

For an Iceberg sink to be exactly-once, the commit to Iceberg must align with this checkpoint protocol. The pattern is Flink's TwoPhaseCommitSinkFunction:

  1. Pre-commit. Triggered when the checkpoint barrier reaches the sink. Close all open Parquet files (flush buffers, write footer with row-group statistics, upload to S3). Record their S3 paths in the sink's checkpointed state. Do not commit to the Iceberg table yet. The Parquet files are on S3 but invisible — no manifest references them.
  2. Notify checkpoint complete. Flink confirms the checkpoint succeeded across all operators (sources, transforms, sinks).
  3. Commit. Triggered after notification. The sink now CAS-swaps the Iceberg metadata pointer to add the staged files. This is the actual visibility flip — readers can now see the new rows.

Why this ordering is exactly-once: if the sink crashes between step 1 and step 2, Flink restarts from the previous checkpoint. The staged Parquet files on S3 are orphaned (no manifest references them), and a janitor process eventually deletes them. No double-write — the Iceberg table never saw them. If the sink crashes between step 2 and step 3, the sink's checkpointed state still has the staged file paths; on restart, the sink re-runs the commit step. Iceberg's commit is idempotent (the manifest is keyed by file path; re-adding the same file is a no-op), so re-running step 3 lands the same data exactly once. The crash window is narrow — milliseconds between checkpoint completion and the catalog CAS — but the protocol must handle it.

The Iceberg connector for Flink (in org.apache.iceberg:iceberg-flink-runtime-1.18, used at production scale by Netflix, Apple, ByteDance) implements this. Spark Structured Streaming has an equivalent — writeStream.format("iceberg") with a checkpointLocation option — and while the mechanics differ, the two-phase shape is the same.

The Delta Lake equivalent (DeltaSink in Spark Structured Streaming) uses a transaction log instead of a manifest, but the two-phase commit shape is identical: stage files first, atomically append to the _delta_log/ directory only after the framework checkpoint succeeds.
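The Delta-style phase two can be sketched in miniature. This is a model, not Delta's actual code: exclusive file creation on a local filesystem stands in for the object store's put-if-absent primitive, and the JSON payload is illustrative rather than Delta's real log schema.

```python
import json, os, tempfile

def commit_delta(log_dir: str, version: int, staged_paths: list) -> bool:
    """Phase 2 of the two-phase commit, Delta-style: atomically publish the
    staged files as log entry `version`. O_CREAT|O_EXCL fails if another
    writer already claimed this version number, modelling the put-if-absent
    CAS on the _delta_log/ directory."""
    entry = os.path.join(log_dir, f"{version:020d}.json")
    try:
        fd = os.open(entry, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False   # lost the race: re-read the log, retry at version + 1
    with os.fdopen(fd, "w") as f:
        json.dump({"add": staged_paths}, f)   # illustrative payload
    return True

log = tempfile.mkdtemp()
print(commit_delta(log, 0, ["part-0001.parquet"]))   # True: first writer wins
print(commit_delta(log, 0, ["part-0002.parquet"]))   # False: concurrent writer must retry
```

The key property is the same as Iceberg's pointer CAS: exactly one writer wins each version, and the loser retries against fresher metadata rather than clobbering the log.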

A working streaming-write simulation

The mechanics are clearest in 90 lines. Here's a minimal model that simulates Kafka events flowing into an Iceberg-style sink with size and age triggers, two-phase commit, and crash recovery.

# streaming_sink.py — minimal model of a Flink-style two-phase commit sink for Iceberg.
# Demonstrates: micro-batch buffering, size/age commit triggers, two-phase commit,
# orphan file cleanup on crash, exactly-once on restart.

import time, random, uuid
from dataclasses import dataclass, field

@dataclass
class StagedFile:
    path: str
    rows: list = field(default_factory=list)
    size_bytes: int = 0
    opened_at: float = field(default_factory=time.time)

@dataclass
class IcebergTable:
    snapshots: list = field(default_factory=list)
    committed_files: set = field(default_factory=set)

    def commit(self, file_paths):
        new_files = [p for p in file_paths if p not in self.committed_files]
        if not new_files: return self.snapshots[-1] if self.snapshots else None
        snap = {"id": len(self.snapshots) + 1, "files": new_files, "ts": time.time()}
        self.snapshots.append(snap); self.committed_files.update(new_files)
        return snap

class StreamingSink:
    SIZE_TRIGGER = 128 * 1024 * 1024   # 128 MB
    AGE_TRIGGER  = 60                   # seconds
    AVG_ROW_BYTES = 280

    def __init__(self, table, partition_id):
        self.table = table; self.partition_id = partition_id
        self.staged = None; self.checkpointed_paths = []

    def receive(self, event):
        if self.staged is None:
            self.staged = StagedFile(path=f"data/p{self.partition_id}/{uuid.uuid4().hex[:8]}.parquet")
        self.staged.rows.append(event)
        self.staged.size_bytes += self.AVG_ROW_BYTES
        if self._should_close():
            self._close_current()

    def _should_close(self):
        if self.staged is None: return False
        return (self.staged.size_bytes >= self.SIZE_TRIGGER
                or time.time() - self.staged.opened_at >= self.AGE_TRIGGER)

    def _close_current(self):
        # In real Iceberg: write Parquet footer, upload to S3.
        self.checkpointed_paths.append(self.staged.path)
        self.staged = None

    def pre_commit(self):
        # Phase 1: close any open file, persist paths in checkpointed state.
        if self.staged is not None: self._close_current()
        return list(self.checkpointed_paths)   # this is what Flink checkpoints

    def commit(self):
        # Phase 2: atomic CAS to the Iceberg catalog. Iceberg dedups by path.
        snap = self.table.commit(self.checkpointed_paths)
        self.checkpointed_paths = []
        return snap

    def crash_and_recover(self, last_checkpoint_paths):
        # On restart, Flink restores `checkpointed_paths` from the last checkpoint.
        self.checkpointed_paths = list(last_checkpoint_paths)
        self.staged = None
        # Re-run commit; Iceberg's idempotency makes this safe.
        return self.commit()

# Simulate 1 minute of Zerodha tick traffic on one Kafka partition
random.seed(7); table = IcebergTable(); sink = StreamingSink(table, partition_id=12)

for i in range(3000):                            # 3000 events at ~50/sec = 60s
    sink.receive({"txn": f"TXN-{i}", "px": random.uniform(100, 200)})

# Flink checkpoint barrier arrives — two-phase commit
checkpoint_state = sink.pre_commit()             # phase 1: close + record
print(f"Pre-commit recorded {len(checkpoint_state)} staged files")

# Crash simulation: pretend the sink dies HERE, before commit() runs
# On restart, Flink restores the checkpoint state and re-runs commit()
recovered_sink = StreamingSink(table, partition_id=12)
snap = recovered_sink.crash_and_recover(checkpoint_state)
print(f"Recovered, committed snapshot {snap['id']} with {len(snap['files'])} files")

# Re-running commit is idempotent: Iceberg dedups by path
snap2 = recovered_sink.crash_and_recover(checkpoint_state)
print(f"Idempotent re-commit: snapshot returned = {snap2['id'] if snap2 else 'noop'}")

# Sample run:
# Pre-commit recorded 1 staged files
# Recovered, committed snapshot 1 with 1 files
# Idempotent re-commit: snapshot returned = 1

The load-bearing pieces are all in view: receive buffers rows into a staged file, _should_close fires on the size-or-age trigger, pre_commit is phase one (close open files and record their paths in checkpointed state), and commit is phase two (the CAS into the table, deduplicated by path).

The output shows the punchline: pre_commit stages the file, the sink crashes, recovery re-commits — and the table ends up with exactly one snapshot containing exactly one file. Replaying the commit a third time is a no-op because Iceberg dedups by path.

File-size tuning, freshness, and the small-file feedback loop

Three knobs decide the operating point. Tune deliberately.

Commit interval. Flink default is 30 seconds; Iceberg connector default is 60 seconds. Lower interval = lower freshness lag (newer data visible to readers sooner) but more commits and more files. Each commit produces at minimum one file per Kafka partition that received traffic in that interval. At 32 partitions and 60-second commits, you produce ~2,000 files per hour at minimum. Drop the interval to 10 seconds and you produce ~12,000 files per hour — your manifest tree starts to bloat within a day.

Size trigger. Default 128 MB matches Parquet's recommended row group sweet spot (large enough that compression and encoding amortise their fixed costs, small enough that one file is a manageable scan unit). Raising it improves read efficiency (fewer files to open per query) but worsens recovery time (more data to re-flush after a crash). Lowering it bounds memory usage but produces more small files.

Number of writer tasks. Equal to Kafka partitions by default in Flink. More writers = more parallelism (higher ingest throughput) but more files per commit (one file per writer per commit). The right answer for a Zerodha tick stream might be 32 writers (matching Kafka partitions); for a Razorpay payment-events stream with 8 partitions and 100,000 events/sec, you might need to fan out within a writer (multiple files per writer per commit) or accept larger writer-side buffering.
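The interaction of the three knobs reduces to one formula — the floor from above, one file per active writer per commit. In this toy calculator the fanout parameter is an illustrative extension for writers that touch multiple partitions per commit:

```python
def files_per_hour(writers: int, commit_interval_s: float, fanout: int = 1) -> int:
    """Minimum data files added per hour: one file per writer per commit,
    times any intra-writer fan-out (e.g. late partitions or key-range splits).
    Real jobs add more files than this floor; they never add fewer."""
    commits_per_hour = 3600 / commit_interval_s
    return int(writers * fanout * commits_per_hour)

print(files_per_hour(32, 60))   # 1920  — the ~2,000/hour baseline from the text
print(files_per_hour(32, 10))   # 11520 — manifest bloat within a day
print(files_per_hour(32, 5))    # 23040 — the 5-second-commit regime
```

Reading the knobs off the formula: commit interval divides, writer count and fan-out multiply — which is why halving the interval costs exactly as much as doubling the writers.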

The small-file feedback loop is real. A Swiggy order-events team in 2025 ran a Flink job at 5-second commits because the dashboard team wanted "real-time" visibility. Within 36 hours, the manifest list for the table had grown to 4.2 GB across 1.8 lakh small files. Query latency on the Trino side jumped from 800 ms p95 to 47 seconds because every query had to download the entire manifest list. The fix: raise commit interval to 60 seconds, run hourly compaction (chapter on compaction: small files hell and how to avoid it), and add an alert on manifest_list_size_bytes / number_of_data_files > 1 KB. Within a week, the table was back to 600 ms p95.

The opposite case: a Cred rewards-events table where a 1-hour commit interval had been set to "minimise files", but stakeholders reported dashboards looking 50 minutes stale during peak hours. The fix: drop to 5-minute commits, accept 12 commits per hour (~1,500 files/hour total across writers), schedule compaction nightly. The table now lags reality by ~3 minutes — well inside the dashboard SLA — and compaction returns the file count to baseline every morning.

Backpressure, watermarks, and the late-event problem

Streaming writes interact with the rest of the streaming machinery you've built up over Build 8.

Backpressure. If S3 PUT latency spikes (say, regional throttling during a Diwali traffic surge), the sink's buffer fills faster than it drains. Flink's backpressure mechanism propagates upstream: the sink reports it's full, the connector slows the consumer, the Kafka consumer pauses, the producer eventually sees lag growth. Why this is the right behaviour: the alternative — drop events when the sink is full — would silently lose data. Backpressure pushes the problem upstream where it surfaces as visible Kafka lag, and lag is observable, alertable, and bounded by Kafka's retention. Production Iceberg sinks expose sink.backpressure_seconds as a metric so the on-call can correlate dashboard staleness with S3 throttling.

Watermarks and partition discovery. A streaming sink writing to a partitioned Iceberg table (e.g. partitioned by days(event_time)) needs to know when a partition is "complete" so downstream consumers can read it without seeing late inserts. Iceberg does not have a built-in watermark concept — the engine (Flink) tracks the watermark and the sink can include it in a custom property. A common pattern: write each batch's max event_time into the snapshot summary, and have downstream readers gate on snapshot.summary['max-event-time'] > partition_end_time before consuming a partition. This is the lakehouse equivalent of "the partition is closed" semantics that data warehouses get from late-arriving-data SLAs.
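The reader-side gate can be sketched in a few lines. Note the hedge: "max-event-time" here is the custom snapshot-summary property suggested above, not a built-in Iceberg field — the name and ISO-8601 encoding are assumptions of this sketch.

```python
from datetime import datetime, timezone

def partition_is_closed(snapshot_summary: dict, partition_end: datetime) -> bool:
    """Downstream reader gate: consume a day partition only once the writer's
    recorded max event time has passed the partition boundary."""
    raw = snapshot_summary.get("max-event-time")
    if raw is None:
        return False   # writer never recorded it: stay on the safe side
    return datetime.fromisoformat(raw) >= partition_end

summary = {"max-event-time": "2026-04-23T00:01:00+00:00"}
boundary = datetime(2026, 4, 23, tzinfo=timezone.utc)
print(partition_is_closed(summary, boundary))   # True: 2026-04-22 is safe to read
```

The gate is only as honest as the watermark feeding it — a writer that records max-event-time from a skewed source reopens the late-arriving-data problem downstream.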

Late events and the partition rewrite problem. An event timestamped 2026-04-21 23:58 that arrives at 2026-04-22 00:45 (47 minutes late) needs to land in the 2026-04-21 partition, even though the sink has moved on to writing 2026-04-22. The sink keeps a per-partition open file; late events open additional files in older partitions. If an event arrives so late that the partition's open file was already committed, the sink must open a new file — bumping the file count for that partition. Flink's Iceberg connector handles this via per-partition file groups; the cost is one extra file per late-arrival batch per old partition.
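A sketch of the per-partition file-group bookkeeping. This is a toy — rows themselves are omitted, the partition key is a precomputed day string, and the path scheme is illustrative:

```python
import uuid
from collections import defaultdict

class PartitionedSink:
    """Keeps one open staged file per event-time partition, so a late event
    lands in (a new file of) its old partition instead of the current one."""
    def __init__(self):
        self.open_files = {}                         # partition day -> staged path
        self.files_per_partition = defaultdict(int)  # lifetime file count per day

    def receive(self, event_day: str):
        if event_day not in self.open_files:   # first event for this day since last commit
            self.open_files[event_day] = f"data/{event_day}/{uuid.uuid4().hex[:8]}.parquet"
            self.files_per_partition[event_day] += 1

    def commit(self):
        committed = list(self.open_files.values())
        self.open_files.clear()                # next straggler opens a fresh file
        return committed

sink = PartitionedSink()
for day in ["2026-04-22", "2026-04-22", "2026-04-21"]:   # one late event
    sink.receive(day)
print(len(sink.commit()))                    # 2 files: one per partition touched
sink.receive("2026-04-21")                   # straggler after the commit
print(sink.files_per_partition["2026-04-21"])  # 2: one extra file per late batch
```

The file-count cost named in the text falls straight out of the model: every late-arrival batch that lands after its partition's file was committed adds exactly one file to that partition.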

Going deeper

Apache Paimon and the streaming-first table format

Most lakehouse formats (Iceberg, Delta, Hudi) were designed batch-first and added streaming write support later. Apache Paimon (incubated at Alibaba, Apache top-level in 2024) flipped the order: streaming was the primary use case, batch reads layered on top. Paimon uses an LSM-tree-style storage layout where Flink writers append change-log files continuously, and a background compactor merges them into base files. The commit cadence is decoupled from the Flink checkpoint — writers append change-log records on every checkpoint barrier (sub-second granularity) without going through a CAS, and the metadata layer reconciles them. The trade-off: Paimon reads are MoR-style merges by default, so query latency is higher than Iceberg CoW reads. For high-write tables (Alibaba uses Paimon for Taobao order events at >1 million events/sec), the throughput gains justify the read overhead. Iceberg V3 and Hudi 0.15 are catching up with similar primitives, but Paimon was first to ship them in production.
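The decoupling can be caricatured in a toy model that assumes only what the paragraph states: writers append change-log records without a CAS, reads merge change-log over base (MoR), and a background compactor folds the log into base files.

```python
class ToyPaimonBucket:
    """Caricature of an LSM-style bucket. Appends are cheap and unconditional
    (no CAS per write); reads pay a merge cost until compaction runs."""
    def __init__(self):
        self.base = {}        # key -> value, the compacted "base files"
        self.changelog = []   # ordered (key, value) appends, the "change-log files"

    def append(self, key, value):
        self.changelog.append((key, value))   # sub-second granularity, no CAS

    def read(self, key):
        for k, v in reversed(self.changelog):  # MoR: newest change wins
            if k == key:
                return v
        return self.base.get(key)

    def compact(self):
        for k, v in self.changelog:            # background merge into base
            self.base[k] = v
        self.changelog.clear()

b = ToyPaimonBucket()
b.append("order-1", "placed"); b.append("order-1", "delivered")
print(b.read("order-1"))   # 'delivered', via the changelog merge
b.compact()
print(b.read("order-1"))   # 'delivered', now from the base; changelog empty
```

The trade-off in the text is visible in read(): before compaction every lookup scans the change-log, which is exactly the MoR query-latency cost that the write-throughput gain pays for.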

Watermark-aligned commits and partition-completion signalling

Some teams need stronger guarantees than "the table eventually contains the data". Pinterest's analytics pipeline (Iceberg + Flink) uses watermark-aligned commits — the sink does not emit a commit until the watermark has advanced past the partition boundary. A 2026-04-22 partition's last commit happens at watermark 2026-04-23 00:01:00, after which no more late events for that partition will arrive (assuming the watermark is honest). Downstream batch jobs subscribe to a "partition complete" notification and start their daily aggregation only after the signal fires. This collapses the late-arriving-data problem that haunted Lambda architectures into a single condition: was the watermark past the partition boundary at commit time?
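The watermark-aligned rule collapses to a single predicate. A minimal sketch, assuming a sink that holds back each partition's final commit until the watermark clears its boundary (timestamps are toy epoch seconds):

```python
def partitions_ready_to_finalize(pending: dict, watermark: float) -> list:
    """pending maps partition -> (boundary_ts, staged_files). A partition's
    last commit — and its 'partition complete' signal — fires only once the
    watermark has advanced past the partition boundary."""
    return sorted(p for p, (boundary, _) in pending.items() if watermark >= boundary)

pending = {
    "2026-04-22": (86400.0, ["a.parquet"]),    # boundary: end of day 1
    "2026-04-23": (172800.0, ["b.parquet"]),   # boundary: end of day 2
}
print(partitions_ready_to_finalize(pending, watermark=86460.0))   # ['2026-04-22']
```

Everything rests on the "assuming the watermark is honest" caveat from the text: the predicate is trivial, and the hard engineering is producing a watermark that actually bounds lateness.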

Concurrent streaming writers and the catalog hot spot

A table with 32 Kafka partitions, each handled by a separate Flink writer task, results in 32 concurrent CAS attempts on every commit cycle. Glue's CAS implementation handles maybe 200 successful CAS/sec per table; Hive Metastore can sustain 50-100. At 32 writers × 60 commits/hour = 1,920 commits/hour, you're well within capacity. But scale to 256 writers (a multi-stream join into a wide fact table) and you'll see CAS retries, which inflate commit latency and cascade into backpressure. The fix is a fan-in committer — one task aggregates the file lists from all writers and does a single CAS per checkpoint cycle. The Iceberg-Flink connector has supported this since 1.2; it's the default in production deployments at Apple and ByteDance.
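The fan-in shape is easiest to see by counting CAS attempts. A self-contained sketch — the real connector's committer operator differs in detail, and the toy catalog below exists only to make the saving visible:

```python
class Catalog:
    """Toy catalog: tracks committed files and counts CAS attempts."""
    def __init__(self):
        self.files, self.cas_calls = set(), 0
    def cas_commit(self, paths):
        self.cas_calls += 1          # each call is one contended CAS on the pointer
        self.files.update(paths)

def naive_cycle(catalog, writer_outputs):
    for paths in writer_outputs:     # every writer CASes independently: hot spot
        catalog.cas_commit(paths)

def fan_in_cycle(catalog, writer_outputs):
    merged = [p for paths in writer_outputs for p in paths]
    catalog.cas_commit(merged)       # one aggregated CAS per checkpoint cycle

outputs = [[f"p{w}-file.parquet"] for w in range(32)]
naive, fanin = Catalog(), Catalog()
naive_cycle(naive, outputs)
fan_in_cycle(fanin, outputs)
print(naive.cas_calls, fanin.cas_calls)   # 32 vs 1, same files committed
```

With fan-in, the per-table CAS rate no longer scales with writer count — the committer becomes the serialization point, which is exactly where you want it.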

Why exactly-once is end-to-end, not sink-only

The two-phase commit sink gives exactly-once for the sink. End-to-end exactly-once requires the source to be replayable (Kafka with consumer groups: yes), the framework to track source offsets in checkpointed state (Flink: yes), and the sink to be idempotent on commit (Iceberg: yes). Break any link in that chain — say, the Kafka consumer auto-commits offsets independently of the Flink checkpoint — and the guarantee degrades. Why this matters in operations: a common production bug is enabling Kafka's enable.auto.commit=true on the underlying KafkaConsumer, then wondering why events go missing after a sink crash. The auto-commit advances Kafka's offset before Flink's checkpoint completes; on restart, a consumer that resumes from the Kafka-committed offset starts past the events the sink never committed to Iceberg. Flink ships with enable.auto.commit=false for exactly that reason — the framework owns offset management end-to-end.
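The regression can be simulated in a toy model: the sink writes to the table only at checkpoints (every 5 events here), and the only variable is which offset the restart trusts. Offsets owned by the checkpoint replay cleanly; an auto-committed offset that ran ahead skips the staged-but-uncommitted events.

```python
def run_pipeline(events, crash_after, restart_offset_source):
    """Toy model of offset ownership. 'checkpoint' restores the offset Flink
    checkpointed alongside the sink's commit; 'kafka' restores an auto-committed
    offset that advanced ahead of the checkpoint before the crash."""
    table, checkpointed_offset, kafka_auto_offset = [], 0, 0
    for i, ev in enumerate(events):
        kafka_auto_offset = i + 1              # auto-commit: always runs ahead
        if i == crash_after:
            break                              # crash before the next checkpoint
        if (i + 1) % 5 == 0:                   # checkpoint barrier every 5 events
            table.extend(events[checkpointed_offset:i + 1])   # sink commits batch
            checkpointed_offset = i + 1
    resume = checkpointed_offset if restart_offset_source == "checkpoint" else kafka_auto_offset
    table.extend(events[resume:])              # replay after restart
    return table

events = list(range(12))
print(run_pipeline(events, crash_after=7, restart_offset_source="checkpoint"))
# all 12 events, each exactly once
print(run_pipeline(events, crash_after=7, restart_offset_source="kafka"))
# events 5, 6, 7 are lost: consumed, offset-committed, never written to the table
```

The model is deliberately lossless on the checkpoint path and lossy on the auto-commit path — the chain is only as strong as whichever component actually owns the offset at restart.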

Where this leads next

The next chapter, /wiki/cdc-iceberg-the-real-world-pattern, takes the streaming-write machinery from this chapter and applies it to the most common real-world pipeline: Postgres → Debezium → Kafka → Iceberg. The two-phase commit sink remains, but the source is now a CDC stream with insert/update/delete operations, which forces the sink into MoR territory (copy-on-write vs merge-on-read) for any non-trivial mutation rate.

After CDC, /wiki/query-engines-on-top-trino-spark-dremio-duckdb covers the read side: how query engines handle a table that's being written to continuously by a streaming sink, and what isolation guarantees they actually provide.

Build 14 (real-time analytics) revisits the freshness question at a different layer — ClickHouse and Pinot ingest from Kafka directly without going through a lakehouse, and their streaming-write path is a different design point entirely.
