Streaming writes into a lakehouse
A Zerodha tick-data engineer wires Kafka topic nse-trade-ticks into an Iceberg table on S3 at 6:14 a.m. on the morning of an RBI policy day, expecting volume to peak around 11:30 a.m. when the announcement lands. By 11:32, the producer is sustaining 78,000 messages per second across 32 Kafka partitions. The naive integration — one Parquet file per Kafka message — would emit 78,000 files per second, 46.8 lakh files per minute, 28 crore files per hour. S3 PUT charges alone would run to roughly ₹2,000 per minute (at about ₹0.42 per 1,000 PUTs in ap-south-1), and the resulting table would have query latencies measured in minutes because Iceberg's manifest tree would balloon to gigabytes before lunch. The Iceberg commit protocol (chapter on manifest files and the commit protocol) is also fundamentally a batch primitive — every commit is a CAS swap of the table's metadata pointer, and you cannot CAS-swap 78,000 times a second. Streaming writes into a lakehouse are the engineering of a bridge between a per-message world and a per-snapshot world, and the bridge has three load-bearing pieces: a micro-batch buffer, a two-phase commit between the stream framework and the table format, and a committer thread that snapshots only when the buffer is full enough or old enough.
A streaming engine cannot CAS-commit to Iceberg or Delta on every event — the commit cost is O(seconds) and the event rate is O(thousands/sec). The bridge is a sink that buffers events into local Parquet files, then triggers a commit when the buffer hits a size threshold (default ~128 MB) or a time threshold (default ~1 minute). To survive crashes, the sink uses a two-phase commit (Flink's TwoPhaseCommitSinkFunction): pre-commit writes the data files but does NOT update the table; the commit step CAS-swaps the manifest only after the framework's checkpoint succeeds. This gives end-to-end exactly-once and bounds the small-file count to roughly partitions × commits-per-hour. Compaction handles whatever small files leak through.
Why per-event commits are physically impossible
The Iceberg commit protocol writes a new metadata JSON pointing to a new manifest list, then atomically CAS-swaps the table's metadata_location pointer in the catalog (Glue, Hive Metastore, REST). The fast path takes 200–800 ms — read current metadata, append a manifest entry, PUT the new metadata, CAS the pointer. The slow path, when concurrent writers conflict, takes 2–10 seconds because the CAS retries with exponential backoff. Why no commit can be faster than ~200 ms: each commit needs at least three S3 round trips (read metadata, write metadata, write manifest list), a catalog round trip (CAS the pointer), and a snapshot ID generation step. Even on a perfectly tuned S3 bucket in the same region, that's 4 sequential network hops, each ~30–50 ms. You cannot commit faster than the speed of light through your network stack, and the commit work is inherently sequential per table.
So the math is brutal: at 200 ms per commit, the maximum commit rate is 5 per second per table. At 78,000 events per second, a per-event commit would need 15,600× the achievable rate. Even at 2,000 events per second (a moderate Razorpay payment-events firehose), per-event commits exceed the protocol's capacity by 400×.
The fix is to make every commit cover many events. If a single commit covers 1 minute of events at 78,000 events/sec, that's 4.68 million events per commit and only 60 commits per hour — well within the protocol's capacity. The catch: between commits, those events are sitting somewhere. If the streaming framework crashes during that minute, you must not lose them; if it restarts, you must not double-write them. That "somewhere" plus the protocol around it is the rest of this chapter.
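The arithmetic is worth a quick sanity check — a five-line sketch using only the figures above:
# Back-of-envelope commit budget, using the figures from this section.
COMMIT_FLOOR_S = 0.200            # fastest realistic Iceberg commit
EVENT_RATE = 78_000               # events/sec on the tick stream
COMMIT_INTERVAL_S = 60            # one commit per minute
max_commits_per_sec = 1 / COMMIT_FLOOR_S                                 # 5.0 per table
print(f"per-event shortfall: {EVENT_RATE / max_commits_per_sec:,.0f}x")  # 15,600x
print(f"events per 1-min commit: {EVENT_RATE * COMMIT_INTERVAL_S:,}")    # 4,680,000
print(f"commits per hour: {3600 // COMMIT_INTERVAL_S}")                  # 60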
The two-phase commit between Flink and Iceberg
A Flink streaming job uses checkpoints to recover from failures. Every 30 seconds (default), the framework injects a checkpoint barrier into the data flow. When the barrier reaches a sink, the sink must persist any in-flight state and acknowledge the barrier; once every operator acknowledges, the checkpoint is complete and Flink updates the source's offset checkpoint. If anything in the pipeline crashes, Flink restarts from the last completed checkpoint — replaying messages from that Kafka offset.
For an Iceberg sink to be exactly-once, the commit to Iceberg must align with this checkpoint protocol. The pattern is Flink's TwoPhaseCommitSinkFunction:
- Pre-commit. Triggered when the checkpoint barrier reaches the sink. Close all open Parquet files (flush buffers, write footer with row-group statistics, upload to S3). Record their S3 paths in the sink's checkpointed state. Do not commit to the Iceberg table yet. The Parquet files are on S3 but invisible — no manifest references them.
- Notify checkpoint complete. Flink confirms the checkpoint succeeded across all operators (sources, transforms, sinks).
- Commit. Triggered after notification. The sink now CAS-swaps the Iceberg metadata pointer to add the staged files. This is the actual visibility flip — readers can now see the new rows.
Why this ordering is exactly-once: if the sink crashes between step 1 and step 2, Flink restarts from the previous checkpoint. The staged Parquet files on S3 are orphaned (no manifest references them), and a janitor process eventually deletes them. No double-write — the Iceberg table never saw them. If the sink crashes between step 2 and step 3, the sink's checkpointed state still has the staged file paths; on restart, the sink re-runs the commit step. Iceberg's commit is idempotent (the manifest is keyed by file path; re-adding the same file is a no-op), so re-running step 3 lands the same data exactly once. The crash window is narrow — milliseconds between checkpoint completion and the catalog CAS — but the protocol must handle it.
The Iceberg connector for Flink (in org.apache.iceberg:iceberg-flink-runtime-1.18, used at production scale by Netflix, Apple, ByteDance) implements this. Spark Structured Streaming has an equivalent via writeStream.format("iceberg") with a checkpointLocation — the mechanics differ but the two-phase shape is the same.
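On the Spark side, the documented streaming write looks roughly like this — a sketch; the catalog, bucket, and table names are illustrative, and the SparkSession is assumed to be configured with an Iceberg catalog named "glue":
# Sketch: Spark Structured Streaming append into an Iceberg table (PySpark).
# Assumes events_df is a streaming DataFrame; names below are made up.
query = (events_df.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(processingTime="60 seconds")   # micro-batch interval = commit cadence
    .option("checkpointLocation", "s3://zerodha-ticks/checkpoints/nse")
    .toTable("glue.markets.nse_trade_ticks"))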
The Delta Lake equivalent (DeltaSink in Spark Structured Streaming) uses a transaction log instead of a manifest, but the two-phase commit shape is identical: stage files first, atomically append to the _delta_log/ directory only after the framework checkpoint succeeds.
A working streaming-write simulation
The mechanics are clearest in 90 lines. Here's a minimal model that simulates Kafka events flowing into an Iceberg-style sink with size and age triggers, two-phase commit, and crash recovery.
# streaming_sink.py — minimal model of a Flink-style two-phase commit sink for Iceberg.
# Demonstrates: micro-batch buffering, size/age commit triggers, two-phase commit,
# orphan file cleanup on crash, exactly-once on restart.
import time, random, uuid
from dataclasses import dataclass, field
@dataclass
class StagedFile:
path: str
rows: list = field(default_factory=list)
size_bytes: int = 0
opened_at: float = field(default_factory=time.time)
@dataclass
class IcebergTable:
snapshots: list = field(default_factory=list)
committed_files: set = field(default_factory=set)
def commit(self, file_paths):
new_files = [p for p in file_paths if p not in self.committed_files]
if not new_files: return self.snapshots[-1] if self.snapshots else None
snap = {"id": len(self.snapshots) + 1, "files": new_files, "ts": time.time()}
self.snapshots.append(snap); self.committed_files.update(new_files)
return snap
class StreamingSink:
SIZE_TRIGGER = 128 * 1024 * 1024 # 128 MB
AGE_TRIGGER = 60 # seconds
AVG_ROW_BYTES = 280
def __init__(self, table, partition_id):
self.table = table; self.partition_id = partition_id
self.staged = None; self.checkpointed_paths = []
def receive(self, event):
if self.staged is None:
self.staged = StagedFile(path=f"data/p{self.partition_id}/{uuid.uuid4().hex[:8]}.parquet")
self.staged.rows.append(event)
self.staged.size_bytes += self.AVG_ROW_BYTES
if self._should_close():
self._close_current()
def _should_close(self):
if self.staged is None: return False
return (self.staged.size_bytes >= self.SIZE_TRIGGER
or time.time() - self.staged.opened_at >= self.AGE_TRIGGER)
def _close_current(self):
# In real Iceberg: write Parquet footer, upload to S3.
self.checkpointed_paths.append(self.staged.path)
self.staged = None
def pre_commit(self):
# Phase 1: close any open file, persist paths in checkpointed state.
if self.staged is not None: self._close_current()
return list(self.checkpointed_paths) # this is what Flink checkpoints
def commit(self):
# Phase 2: atomic CAS to the Iceberg catalog. Iceberg dedups by path.
snap = self.table.commit(self.checkpointed_paths)
self.checkpointed_paths = []
return snap
def crash_and_recover(self, last_checkpoint_paths):
# On restart, Flink restores `checkpointed_paths` from the last checkpoint.
self.checkpointed_paths = list(last_checkpoint_paths)
self.staged = None
# Re-run commit; Iceberg's idempotency makes this safe.
return self.commit()
# Simulate 1 minute of (scaled-down) Zerodha tick traffic on one Kafka partition
random.seed(7); table = IcebergTable(); sink = StreamingSink(table, partition_id=12)
for i in range(3000): # 3000 events at ~50/sec = 60s
sink.receive({"txn": f"TXN-{i}", "px": random.uniform(100, 200)})
# Flink checkpoint barrier arrives — two-phase commit
checkpoint_state = sink.pre_commit() # phase 1: close + record
print(f"Pre-commit recorded {len(checkpoint_state)} staged files")
# Crash simulation: pretend the sink dies HERE, before commit() runs
# On restart, Flink restores the checkpoint state and re-runs commit()
recovered_sink = StreamingSink(table, partition_id=12)
snap = recovered_sink.crash_and_recover(checkpoint_state)
print(f"Recovered, committed snapshot {snap['id']} with {len(snap['files'])} files")
# Re-running commit is idempotent: Iceberg dedups by path
snap2 = recovered_sink.crash_and_recover(checkpoint_state)
print(f"Idempotent re-commit: snapshot returned = {snap2['id'] if snap2 else 'noop'}")
# Sample run:
# Pre-commit recorded 1 staged files
# Recovered, committed snapshot 1 with 1 files
# Idempotent re-commit: snapshot returned = 1
Walk the load-bearing pieces:
- if self.staged.size_bytes >= self.SIZE_TRIGGER or time.time() - self.staged.opened_at >= self.AGE_TRIGGER — the dual trigger. Size keeps file count bounded under high traffic; age keeps freshness bounded under low traffic. Why both are needed: size-only would mean a low-traffic table never commits — a reporting table that gets one event per minute would have a single open file growing for hours, and a query at 10 a.m. would not see events that arrived at 9:00 a.m. Age-only would mean a high-traffic table commits the same 60-second window regardless of volume, producing absurdly large files (60 s × 78,000 events × 280 bytes ≈ 1.3 GB), which exceeds Parquet's recommended row group size and HDFS-style block alignment. Together they bound both file count and freshness.
- def pre_commit(self): — phase 1. Closes open files and returns the list of file paths; this list is what Flink checkpoints. The Iceberg table is not updated here. Why deferring the table update is the whole point: if the sink crashes after pre_commit but before commit, the data files exist on S3 but no Iceberg snapshot references them, so readers see no change. On restart, Flink restores the file path list from the checkpoint state and the recovered sink re-runs commit. The catalog CAS still sees a clean state and lands the new snapshot. No data is lost; no data is double-written.
- snap = self.table.commit(self.checkpointed_paths) — phase 2. The actual visibility flip. In real Iceberg this is a CAS on the metadata pointer in the Glue / Hive / REST catalog. Why this is idempotent: Iceberg's commit() checks whether the file paths are already in the table's manifest. If a re-run sees the same paths, the second CAS is a no-op (or a redundant snapshot with zero files added). The commit's effect on readers is identical whether it ran once or twice. This is what lets crash recovery be safe — the protocol does not require "exactly one call" to commit, only "at least one successful call".
- recovered_sink.crash_and_recover(checkpoint_state) — the restart path. The new sink instance gets the checkpoint state from Flink (which holds it in distributed state — RocksDB, HDFS, or S3) and re-runs commit. The base case (no crash) and the recovery case run the same code; that's the guarantee.
- AVG_ROW_BYTES = 280 — the assumption that drives the size trigger. Real Parquet writers track file size more precisely (post-encoding, post-compression), but the principle is the same: convert events-buffered into bytes-buffered to drive the trigger.
The output shows the punchline: pre_commit stages the file, the sink crashes, recovery re-commits — and the table ends up with exactly one snapshot containing exactly one file. Replaying the commit a third time is a no-op because Iceberg dedups by path.
File-size tuning, freshness, and the small-file feedback loop
Three knobs decide the operating point. Tune deliberately.
Commit interval. Flink default is 30 seconds; Iceberg connector default is 60 seconds. Lower interval = lower freshness lag (newer data visible to readers sooner) but more commits and more files. Each commit produces at minimum one file per Kafka partition that received traffic in that interval. At 32 partitions and 60-second commits, you produce ~2,000 files per hour at minimum. Drop the interval to 10 seconds and you produce ~12,000 files per hour — your manifest tree starts to bloat within a day.
Size trigger. Default 128 MB matches Parquet's recommended row group sweet spot (large enough that compression and encoding amortise their fixed costs, small enough that one file is a manageable scan unit). Raising it improves read efficiency (fewer files to open per query) but worsens recovery time (more data to re-flush after a crash). Lowering it bounds memory usage but produces more small files.
Number of writer tasks. Equal to Kafka partitions by default in Flink. More writers = more parallelism (higher ingest throughput) but more files per commit (one file per writer per commit). The right answer for a Zerodha tick stream might be 32 writers (matching Kafka partitions); for a Razorpay payment-events stream with 8 partitions and 100,000 events/sec, you might need to fan out within a writer (multiple files per writer per commit) or accept larger writer-side buffering.
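The interaction of the three knobs reduces to one multiplication. A small helper (illustrative, using the one-file-per-writer-per-commit floor from above) makes the operating point explicit:
# Lower bound on files produced per hour: one file per writer per commit.
def files_per_hour(writers: int, commit_interval_s: float) -> float:
    return writers * (3600 / commit_interval_s)

print(files_per_hour(32, 60))  # 1920.0  — the ~2,000/hour baseline above
print(files_per_hour(32, 10))  # 11520.0 — the ~12,000/hour case
print(files_per_hour(32, 5))   # 23040.0 — 5-second commits, incident territory below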
The small-file feedback loop is real. A Swiggy order-events team in 2025 ran a Flink job at 5-second commits because the dashboard team wanted "real-time" visibility. Within 36 hours, the manifest list for the table had grown to 4.2 GB across 1.8 lakh small files. Query latency on the Trino side jumped from 800 ms p95 to 47 seconds because every query had to download the entire manifest list. The fix: raise commit interval to 60 seconds, run hourly compaction (chapter on compaction: small files hell and how to avoid it), and add an alert on manifest_list_size_bytes / number_of_data_files > 1 KB. Within a week, the table was back to 600 ms p95.
The opposite failure mode, from a Cred rewards-events table: a 1-hour commit interval was set to "minimise files", but stakeholders reported dashboards looking 50 minutes stale during peak hours. The fix: drop to 5-minute commits, accept 12 commits per hour per partition (~1,500 files/hour total), schedule compaction nightly. The table now lags reality by ~3 minutes — well inside the dashboard SLA — and compaction returns the file count to baseline every morning.
Backpressure, watermarks, and the late-event problem
Streaming writes interact with the rest of the streaming machinery you've built up over Build 8.
Backpressure. If S3 PUT latency spikes (say, regional throttling during a Diwali traffic surge), the sink's buffer fills faster than it drains. Flink's backpressure mechanism propagates upstream: the sink reports it's full, the connector slows the consumer, the Kafka consumer pauses, the producer eventually sees lag growth. Why this is the right behaviour: the alternative — drop events when the sink is full — would silently lose data. Backpressure pushes the problem upstream where it surfaces as visible Kafka lag, and lag is observable, alertable, and bounded by Kafka's retention. Production Iceberg sinks expose sink.backpressure_seconds as a metric so the on-call can correlate dashboard staleness with S3 throttling.
Watermarks and partition discovery. A streaming sink writing to a partitioned Iceberg table (e.g. partitioned by days(event_time)) needs to know when a partition is "complete" so downstream consumers can read it without seeing late inserts. Iceberg does not have a built-in watermark concept — the engine (Flink) tracks the watermark and the sink can include it in a custom property. A common pattern: write each batch's max event_time into the snapshot summary, and have downstream readers gate on snapshot.summary['max-event-time'] > partition_end_time before consuming a partition. This is the lakehouse equivalent of "the partition is closed" semantics that data warehouses get from late-arriving-data SLAs.
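In the simulation's vocabulary, the pattern is a custom summary field plus a reader-side gate — a sketch; the max-event-time key is the team convention described above, not a built-in Iceberg property:
# Sketch: watermark-in-snapshot-summary gating, reusing the IcebergTable model above.
def commit_with_summary(table, file_paths, max_event_time: float):
    snap = table.commit(file_paths)
    if snap is not None:
        # Real Iceberg writers can attach custom snapshot-summary properties;
        # the key name here is a convention, not part of the spec.
        snap["summary"] = {"max-event-time": max_event_time}
    return snap

def partition_is_complete(table, partition_end_time: float) -> bool:
    # Downstream gate: read the partition only once the sink has seen
    # events past its boundary.
    if not table.snapshots:
        return False
    summary = table.snapshots[-1].get("summary", {})
    return summary.get("max-event-time", 0.0) > partition_end_time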
Late events and the partition rewrite problem. An event timestamped 2026-04-22 23:58 that arrives 47 minutes late, at 2026-04-23 00:45, must land in the 2026-04-22 partition even though the sink is now writing to 2026-04-23. The sink keeps a per-partition open file; late events open additional files in older partitions. If an event arrives so late that the partition's open file was already committed, the sink must open a new file — bumping the file count for that partition. Flink's Iceberg connector handles this via per-partition file groups; the cost is one extra file per late-arrival batch per old partition.
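A sketch of the per-partition file-group idea in the simulation's terms (day-granularity partitioning assumed; this is the shape, not the connector's actual classes):
# Sketch: per-partition open files so late events land in their own partition.
import uuid
from datetime import datetime, timezone

class PartitionedSink:
    def __init__(self):
        self.open_files = {}          # partition key -> open-file dict
        self.checkpointed_paths = []

    def receive(self, event):
        # days(event_time)-style partition key from the event timestamp.
        day = datetime.fromtimestamp(event["event_time"], tz=timezone.utc).date()
        f = self.open_files.get(day)
        if f is None:
            # A late event whose partition file was already committed simply
            # opens a fresh file here — one extra file for that old partition.
            f = {"path": f"data/{day}/{uuid.uuid4().hex[:8]}.parquet", "rows": []}
            self.open_files[day] = f
        f["rows"].append(event)

    def pre_commit(self):
        # Close every open file across all partitions, old and current.
        for f in self.open_files.values():
            self.checkpointed_paths.append(f["path"])
        self.open_files.clear()
        return list(self.checkpointed_paths)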
Common confusions
- "Streaming writes are exactly-once because Iceberg is ACID." Iceberg's atomicity guarantees that a single commit is all-or-nothing. It does not guarantee that the events from Kafka are written exactly once into Iceberg. Exactly-once requires the framework's checkpoint protocol coordinating with the commit — that's what the two-phase commit sink does. Without it, a sink crash mid-commit can produce duplicates (the same Kafka offset reprocessed) or losses (events buffered but not yet staged).
- "Smaller commit intervals = fresher data, no downside." Smaller intervals multiply your file count, which inflates manifest tree size, which slows every read. The Trino query planner reads the entire manifest list before pruning; a 4 GB manifest list adds 30+ seconds to every query. Compaction can claw back read performance, but compaction has a cost too. The sweet spot is the largest interval that meets your freshness SLA — typically 30 s to 5 min.
- "Spark Structured Streaming and Flink behave the same." They share the two-phase commit shape but differ on micro-batch model (Spark is mini-batch by default; Flink is per-record), watermark API, late-event handling, and exactly-once granularity (Spark guarantees per-batch; Flink per-record with KIP-98-style transactional sources). For Iceberg specifically, the Flink connector has more mature exactly-once support; the Spark connector relies on the user setting
checkpointLocationcorrectly and is sensitive to mis-configuration. - "You can write to Iceberg from a Lambda function on every Kafka message." Architecturally yes, practically no. Each Lambda invocation would do a full Iceberg commit (~500 ms minimum). 78,000 messages/second would need 39,000 concurrent Lambdas, all CAS-fighting on the catalog. The catalog dies before the Lambdas do. Streaming sinks must batch.
- "Pre-commit and commit are both writes; the protocol is two writes for one logical change." Pre-commit is many writes (the staged Parquet files), commit is one CAS on the catalog metadata pointer. The cost asymmetry matters: pre-commit is bandwidth-bound (uploading data to S3), commit is latency-bound (one round trip to the catalog). Tuning the two phases separately is normal — bigger checkpoint intervals reduce commit pressure on the catalog without affecting per-record write latency.
Going deeper
Apache Paimon and the streaming-first table format
Most lakehouse formats (Iceberg, Delta, Hudi) were designed batch-first and added streaming write support later. Apache Paimon (incubated at Alibaba, Apache top-level in 2024) flipped the order: streaming was the primary use case, batch reads layered on top. Paimon uses an LSM-tree-style storage layout where Flink writers append change-log files continuously, and a background compactor merges them into base files. The commit cadence is decoupled from the Flink checkpoint — writers append change-log records on every checkpoint barrier (sub-second granularity) without going through a CAS, and the metadata layer reconciles them. The trade-off: Paimon reads are MoR-style merges by default, so query latency is higher than Iceberg CoW reads. For high-write tables (Alibaba uses Paimon for Taobao order events at >1 million events/sec), the throughput gains justify the read overhead. Iceberg V3 and Hudi 0.15 are catching up with similar primitives, but Paimon was first to ship them in production.
Watermark-aligned commits and partition-completion signalling
Some teams need stronger guarantees than "the table eventually contains the data". Pinterest's analytics pipeline (Iceberg + Flink) uses watermark-aligned commits — the sink does not emit a commit until the watermark has advanced past the partition boundary. A 2026-04-22 partition's last commit happens at watermark 2026-04-23 00:01:00, after which no more late events for that partition will arrive (assuming the watermark is honest). Downstream batch jobs subscribe to a "partition complete" notification and start their daily aggregation only after the signal fires. This collapses the late-arriving-data problem that haunted Lambda architectures into a single condition: was the watermark past the partition boundary at commit time?
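The gate itself is a single comparison; a sketch, with the notification mechanism left abstract:
# Sketch: watermark-aligned commit — hold a partition's final commit until
# the watermark has passed the partition boundary.
def maybe_final_commit(table, staged_paths, watermark: float,
                       partition_end_time: float, notify):
    if watermark < partition_end_time:
        return None                    # partition still open; keep staging
    snap = table.commit(staged_paths)  # last commit for this partition
    notify(f"partition-complete: end={partition_end_time}")
    return snap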
Concurrent streaming writers and the catalog hot spot
A table with 32 Kafka partitions, each handled by a separate Flink writer task, results in 32 concurrent CAS attempts on every commit cycle. Glue's CAS implementation handles maybe 200 successful CAS/sec per table; Hive Metastore can sustain 50-100. At 32 writers × 60 commits/hour = 1,920 commits/hour, you're well within capacity. But scale to 256 writers (a multi-stream join into a wide fact table) and you'll see CAS retries, which inflate commit latency and cascade into backpressure. The fix is a fan-in committer — one task aggregates the file lists from all writers and does a single CAS per checkpoint cycle. The Iceberg-Flink connector has supported this since 1.2; it's the default in production deployments at Apple and ByteDance.
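In the simulation's terms, the fan-in committer unions the writers' staged paths into one CAS per checkpoint cycle — a sketch of the shape, not the connector's API:
# Sketch: fan-in committer — N writer tasks stage files, one task commits.
def fan_in_commit(table, per_writer_paths):
    all_paths = [p for paths in per_writer_paths for p in paths]
    return table.commit(all_paths)     # one catalog CAS for the whole cycle

# 32 writers' pre_commit() outputs collapse into a single commit:
#   snap = fan_in_commit(table, [sink.pre_commit() for sink in sinks])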
Why exactly-once is end-to-end, not sink-only
The two-phase commit sink gives exactly-once for the sink. End-to-end exactly-once requires the source to be replayable (Kafka with consumer groups: yes), the framework to track source offsets in checkpointed state (Flink: yes), and the sink to be idempotent on commit (Iceberg: yes). Break any link in that chain — say, the Kafka consumer auto-commits offsets independently of the Flink checkpoint — and you regress to at-least-once. Why this matters in operations: a common production bug is enabling Kafka's enable.auto.commit=true on the underlying KafkaConsumer, then wondering why duplicates appear after a sink crash. The auto-commit advances Kafka's offset before Flink's checkpoint completes; on restart, Flink replays from a Kafka offset that's already past the events the sink hasn't yet committed to Iceberg. Flink ships with enable.auto.commit=false for exactly that reason — the framework owns offset management end-to-end.
Where this leads next
The next chapter, /wiki/cdc-iceberg-the-real-world-pattern, takes the streaming-write machinery from this chapter and applies it to the most common real-world pipeline: Postgres → Debezium → Kafka → Iceberg. The two-phase commit sink remains, but the source is now a CDC stream with insert/update/delete operations, which forces the sink into MoR territory (copy-on-write vs merge-on-read) for any non-trivial mutation rate.
After CDC, /wiki/query-engines-on-top-trino-spark-dremio-duckdb covers the read side: how query engines handle a table that's being written to continuously by a streaming sink, and what isolation guarantees they actually provide.
Build 14 (real-time analytics) revisits the freshness question at a different layer — ClickHouse and Pinot ingest from Kafka directly without going through a lakehouse, and their streaming-write path is a different design point entirely.
References
- Apache Iceberg — Flink Connector docs — canonical reference for the streaming-write integration; includes the IcebergSink class and config knobs.
- Stephan Ewen — TwoPhaseCommitSinkFunction in Flink — the original 2018 design doc explaining the two-phase commit pattern that all lakehouse sinks copy.
- Netflix — Building a Real-time Data Platform with Apache Iceberg — Netflix's production write-up of streaming writes at multi-billion-event/day scale.
- Apache Paimon — Streaming Lakehouse design — the streaming-first format from Alibaba; useful contrast to Iceberg's batch-first design.
- ByteDance Engineering — Iceberg + Flink at TikTok scale — the published case study on running Iceberg streaming writes at >1 million events/sec.
- Confluent — Exactly-Once Semantics in Apache Kafka — the KIP-98 reference for the source side of end-to-end exactly-once.
- /wiki/manifest-files-and-the-commit-protocol — the Iceberg commit primitive that the two-phase sink builds on.
- /wiki/compaction-small-files-hell-and-how-to-avoid-it — the chapter that handles the small-file fallout from streaming writes.