CDC → Iceberg: the real-world pattern
A Razorpay platform engineer is asked at 3 p.m. on a Wednesday to build "a copy of the payments table in the lake, kept fresh within a minute, that the analytics team can query without hitting prod". The naive answer — SELECT * FROM payments every minute — kills the OLTP database within an hour because the analytics team writes scan-heavy queries that lock 8 crore rows. The next answer — nightly dumps — gives a 24-hour staleness that the fraud team rejects. The shape every team converges on, after one or two failed attempts, is a four-stage pipeline: Postgres logical replication slot → Debezium → Kafka → an Iceberg sink that does merge-on-read upserts. By Friday afternoon the pipeline is live, the analytics team is querying a payments table on S3 that lags prod by 23 seconds, the OLTP database is no longer being scanned, and the engineer has discovered that the gap between "I understand each piece" and "the pipeline works under failure" is three subtle protocol decisions that every team reinvents. This chapter is those three decisions, named.
The standard CDC → Iceberg pipeline turns row-level OLTP mutations (insert/update/delete) into append-only Kafka events, then writes those events into Iceberg as merge-on-read deltas. Three protocol pieces hold it together: an upsert key (so update events reconcile to the same row), a tombstone convention (so delete events are visible to readers), and a snapshot-then-stream bootstrap (so the sink starts from a consistent point without locking the source). Iceberg V2's row-level deletes (positional and equality) make the merge tractable; periodic compaction collapses the deltas into base files.
Why CDC into Iceberg is the default lakehouse pipeline
Before 2020, the typical Postgres → analytics warehouse pipeline was nightly pg_dump + COPY into Snowflake, paying 8–24 hours of staleness and an IO storm on prod every night. Three things changed: object storage got cheap enough to store every change, Debezium's architecture made logical decoding usable from JVM stream apps, and Iceberg V2 added row-level deletes — meaning a streaming sink could express "this row was deleted" without rewriting the whole file. By 2026, every payments company, every B2B SaaS, every Indian fintech with more than ₹100 crore in TPV runs some variant of this pipeline.
The mechanism is unintuitive at first. A Postgres UPDATE is one row mutation in place; Kafka is an append-only log; Iceberg is a store of immutable files. How does an in-place UPDATE become an append become an immutable file and still let a SELECT see the right value? The answer: by tracking deletes alongside writes. The sink writes a new row for the post-image and records a delete entry pointing at the pre-image. A reader reconciles them at query time (merge-on-read, /wiki/copy-on-write-vs-merge-on-read-iceberg-vs-hudi) or at compaction time, which collapses the deltas into a clean base file.
The three protocol decisions every team reinvents
Every CDC pipeline answers the same three questions. The answers determine whether the lake matches the source under load, under failures, and under schema changes.
1. The upsert key — what makes "the same row" the same row?
Debezium emits one Kafka message per row mutation. The message has a before block (pre-image) and an after block (post-image). For a CDC sink to keep the lake in sync, it must know that two messages refer to the same logical row. The natural answer is the source table's primary key. Debezium uses the PK by default as the Kafka message key, which has two effects: (a) all changes for one row land in one Kafka partition, preserving order, and (b) the sink can use the same key as the upsert key when writing to Iceberg.
Why the upsert key cannot be a surrogate or hash: if you key by MD5(payment_id || updated_at), an UPDATE generates a new key, so the sink writes a new row and never knows the old row should be deleted. The lake duplicates instead of updating. The upsert key must be invariant across the row's lifetime — which is exactly what a primary key is. For tables without a stable PK (sharded writes, append-only event tables), CDC into Iceberg is either inappropriate or requires a synthetic stable key generated upstream.
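The duplication failure mode is easy to demonstrate with a ten-line model (illustrative only — the dict stands in for the Iceberg upsert path):

```python
# Toy sink: the lake is a dict keyed by whatever upsert key we choose.
import hashlib, json

def apply(events, key_fn):
    lake = {}
    for ev in events:
        lake[key_fn(ev)] = ev["after"]   # upsert on the chosen key
    return lake

events = [
    {"pk": "P-4781", "after": {"id": "P-4781", "status": "init"}},
    {"pk": "P-4781", "after": {"id": "P-4781", "status": "settled"}},  # UPDATE
]

# Keying by the stable PK: the update reconciles to the same row.
by_pk = apply(events, lambda ev: ev["pk"])

# Keying by a content hash: the update's key differs, so the row duplicates.
by_hash = apply(events, lambda ev: hashlib.md5(
    json.dumps(ev["after"], sort_keys=True).encode()).hexdigest())

print(len(by_pk), "row keyed by PK")       # 1 row keyed by PK
print(len(by_hash), "rows keyed by hash")  # 2 rows keyed by hash
```

The hash-keyed lake ends up with both the pre-image and the post-image as "different" rows — exactly the duplication-instead-of-update failure described above.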
2. The tombstone convention — how does a DELETE become visible?
A Postgres DELETE FROM payments WHERE id=4781 produces a Debezium event with op: 'd', before: {id: 4781, ...}, after: null. The sink must reflect this in Iceberg such that a SELECT * FROM payments WHERE id=4781 returns no rows. There are two ways to do this in Iceberg V2:
- Equality delete: write a small Parquet file containing only the deleted row's PK column, marking it as an equality-delete file in the manifest. Readers planning a scan check the equality-delete files for matching keys and exclude those rows. Cheap to write (one small file per delete batch), more expensive to read (every reader applies the delete files at scan time).
- Positional delete: track which (file, row offset) tuples are deleted. More precise, lower read cost, but requires the writer to know the exact location of the pre-image — which streaming sinks usually don't, because they didn't write the row that's now being deleted.
For CDC, equality deletes are the standard. The sink writes the deleted row's PK to an equality-delete file with the same partition spec as the data files. Why equality deletes win for CDC: positional deletes need the (file_path, row_position) tuple of the row being deleted, but the streaming sink seeing the DELETE event has no way to know which Iceberg file contains that row — that file might have been written by a different writer task last week, or by a backfill job, or already merged by compaction. Equality deletes only need the PK, which the sink always has from the Debezium message.
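A toy contrast makes the information asymmetry concrete (the in-memory file index below is invented for illustration; a real positional-delete writer would need the equivalent of it):

```python
# What each delete type demands from the writer.
data_files = {                           # rows written by earlier jobs/tasks
    "data/a.parquet": ["P-1", "P-2"],
    "data/b.parquet": ["P-3"],
}

def equality_delete(pk):
    # Needs nothing beyond the primary key carried in the Debezium event.
    return {"type": "eq", "key": pk}

def positional_delete(pk):
    # Needs a global pk -> (file, row position) index that a streaming sink
    # does not have; here we fake one by scanning every file ever written.
    for path, pks in data_files.items():
        if pk in pks:
            return {"type": "pos", "file": path, "pos": pks.index(pk)}
    raise LookupError(f"pre-image location of {pk} unknown to this writer")

print(equality_delete("P-2"))    # {'type': 'eq', 'key': 'P-2'}
print(positional_delete("P-2"))  # {'type': 'pos', 'file': 'data/a.parquet', 'pos': 1}
```

The equality delete is constructible from the event alone; the positional delete requires a lookup structure that grows with the table and goes stale on every compaction.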
The tradeoff cascades into compaction. Every reader pays the equality-delete read cost on every query, so equality-delete files accumulating without compaction degrade query latency linearly. Production pipelines run compaction at least hourly to drop the equality-delete file count back to single digits per partition.
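The linear growth is visible in a back-of-envelope model (all numbers assumed for illustration: one eq-delete file per micro-batch, 60 micro-batches an hour):

```python
# Worst-case eq-delete file count a reader sees over a 24 h window,
# as a function of compaction frequency.
def worst_delete_file_count(batches_per_hour=60, compact_every_h=None):
    count, worst = 0, 0
    for hour in range(24):
        if compact_every_h and hour % compact_every_h == 0:
            count = 0                    # compaction drops the delete deltas
        count += batches_per_hour        # one eq-delete file per micro-batch
        worst = max(worst, count)
    return worst

print("no compaction:    ", worst_delete_file_count())                    # 1440
print("daily compaction: ", worst_delete_file_count(compact_every_h=24))  # 1440
print("hourly compaction:", worst_delete_file_count(compact_every_h=1))   # 60
```

Daily compaction is indistinguishable from no compaction at the end of the day, which is why the schedule matters more than the existence of the job.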
3. The snapshot-then-stream bootstrap
When you turn on a CDC pipeline for a payments table that already has 12 crore rows, where do you start? If you start from Kafka offset 0, you replay every change since the table was created — except Debezium's stream typically only goes back as far as the Postgres replication slot's retention (a few hours to days). If you start from "now", you've lost every row that existed before "now". Neither works alone.
The standard answer, covered in /wiki/snapshot-cdc-the-bootstrapping-problem, is: take a consistent snapshot of the source table, write it to the destination, then start the stream from the LSN that was current when the snapshot was taken. Debezium handles this end-to-end with its snapshot.mode=initial setting — the connector locks the source table briefly (or uses a consistent transaction with REPEATABLE READ), reads every row, emits each as a synthetic create event, then switches to streaming from the captured LSN.
Why the LSN handoff is exactly right: Postgres's replication slot guarantees that no commits before the LSN you read can produce events on the slot, and no commits after that LSN are missed. Snapshotting at LSN X and streaming from LSN X gives you a consistent cut: every row state the snapshot saw is in the destination, every row state after X is in the stream. The destination ends up with exactly the same set of row states as the source, with each state visible in the order it was committed in Postgres.
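The consistent-cut claim can be checked mechanically. The sketch below (a toy WAL, not Postgres) rebuilds a destination from a snapshot at LSN X plus a stream from X+1, for every possible X, and shows that starting the stream even one event late breaks the invariant:

```python
import random

def run_source(n_commits, n_keys=20, seed=7):
    """Apply n random single-row commits; return (wal, final table state)."""
    random.seed(seed)
    table, wal = {}, []                    # wal entry: (lsn, pk, value|None)
    for lsn in range(1, n_commits + 1):
        pk = f"P-{random.randint(1, n_keys)}"
        if pk in table and random.random() < 0.2:
            del table[pk]; wal.append((lsn, pk, None))     # DELETE
        else:
            table[pk] = lsn; wal.append((lsn, pk, lsn))    # INSERT/UPDATE
    return wal, table

def rebuild(wal, snapshot_lsn, stream_from):
    """Snapshot = state as of snapshot_lsn; then stream events >= stream_from."""
    dest = {}
    for lsn, pk, val in wal:               # applied in commit (LSN) order
        if lsn <= snapshot_lsn or lsn >= stream_from:
            if val is None:
                dest.pop(pk, None)
            else:
                dest[pk] = val
    return dest

wal, final = run_source(200)
# Handing off at exactly snapshot_lsn + 1 is consistent for ANY cut point:
ok = all(rebuild(wal, x, x + 1) == final for x in range(0, 201))
print("exact LSN handoff consistent at every cut point:", ok)        # True
# Starting the stream one event late silently loses a committed change:
print("handoff with a one-event gap consistent:",
      rebuild(wal, 199, 201) == final)                               # False
```

The positive case holds for every X because each committed change is applied exactly once, in LSN order; the negative case shows why the handoff must be the LSN captured at snapshot time, not "roughly now".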
For the lakehouse case, the snapshot phase typically takes hours for tables of crores of rows; during that time, the stream is buffered in Kafka (Kafka's retention must exceed the snapshot duration, or the stream is lost). The Iceberg sink writes the snapshot rows as a single big batch (often via Spark or Flink batch mode) and then switches to streaming-write mode for the rest. The transition is the operational risk — every team has at least one war story about a snapshot taking longer than Kafka retention and re-running the bootstrap from scratch.
A working CDC sink simulation
The mechanics of equality-delete reconciliation are clearest when you write the merge yourself. Here is a compact, runnable model of a CDC sink that consumes Debezium-style events and produces Iceberg-style data files plus equality-delete files, with a query path that reconciles them.
```python
# cdc_iceberg_sink.py — minimal CDC-to-Iceberg sink with equality-delete merge.
# Models: insert / update / delete events, MoR reads, and compaction.
import uuid
from dataclasses import dataclass, field

@dataclass
class DataFile:
    path: str
    seq: int = 0                               # sequence number, set at commit
    rows: dict = field(default_factory=dict)   # pk -> row

@dataclass
class EqualityDeleteFile:
    path: str
    seq: int = 0                               # sequence number, set at commit
    keys: set = field(default_factory=set)     # pks marked deleted

@dataclass
class IcebergV2Table:
    data_files: list = field(default_factory=list)
    eq_delete_files: list = field(default_factory=list)
    next_seq: int = 1

    def commit(self, new_data, new_eq_deletes):
        # All files written in one commit share a sequence number.
        for f in new_data + new_eq_deletes:
            f.seq = self.next_seq
        self.next_seq += 1
        self.data_files.extend(new_data)
        self.eq_delete_files.extend(new_eq_deletes)

    def scan(self, pk=None):
        # Merge-on-read: an equality-delete file shadows rows only in data
        # files with a strictly smaller sequence number, so a post-image
        # committed alongside its own eq-delete (same seq) survives the merge.
        results = {}
        for df in self.data_files:                      # commit order
            shadow = set()
            for ef in self.eq_delete_files:
                if ef.seq > df.seq:
                    shadow |= ef.keys
            for k, row in df.rows.items():
                if pk is not None and k != pk:
                    continue
                if k in shadow:
                    continue
                results[k] = row                        # last-write-wins
        return dict(sorted(results.items()))

    def compact(self):
        # Materialise the current logical state into a single base file,
        # drop all the deltas. This is the read path's friend.
        live = self.scan()
        new_base = DataFile(path=f"base/{uuid.uuid4().hex[:8]}.parquet",
                            seq=self.next_seq, rows=live)
        self.next_seq += 1
        self.data_files = [new_base]
        self.eq_delete_files = []

class CDCSink:
    def __init__(self, table):
        self.table = table
        self.buffer_data = DataFile(path="")
        self.buffer_eqdel = EqualityDeleteFile(path="")

    def handle_event(self, ev):
        op, pk, after = ev["op"], ev["pk"], ev.get("after")
        if op in ("c", "u", "r"):            # create, update, snapshot-read
            self.buffer_data.rows[pk] = after
            if op == "u":
                # update: shadow the pre-image, then write the post-image
                self.buffer_eqdel.keys.add(pk)
        elif op == "d":
            self.buffer_eqdel.keys.add(pk)
            if pk in self.buffer_data.rows:
                # create + delete inside one micro-batch cancel in-buffer
                del self.buffer_data.rows[pk]

    def commit_micro_batch(self):
        new_data, new_eqdel = [], []
        if self.buffer_data.rows:
            self.buffer_data.path = f"data/{uuid.uuid4().hex[:8]}.parquet"
            new_data = [self.buffer_data]
        if self.buffer_eqdel.keys:
            self.buffer_eqdel.path = f"eqdel/{uuid.uuid4().hex[:8]}.parquet"
            new_eqdel = [self.buffer_eqdel]
        self.table.commit(new_data, new_eqdel)
        self.buffer_data = DataFile(path="")
        self.buffer_eqdel = EqualityDeleteFile(path="")

# Drive a small CDC stream
table = IcebergV2Table()
sink = CDCSink(table)
events = [
    {"op": "c", "pk": "P-4781", "after": {"id": "P-4781", "amt": 1500, "status": "init"}},
    {"op": "c", "pk": "P-4782", "after": {"id": "P-4782", "amt": 2400, "status": "init"}},
    {"op": "u", "pk": "P-4781", "after": {"id": "P-4781", "amt": 1500, "status": "settled"}},
    {"op": "d", "pk": "P-4782"},
    {"op": "c", "pk": "P-4783", "after": {"id": "P-4783", "amt": 9999, "status": "init"}},
]
for ev in events:
    sink.handle_event(ev)
sink.commit_micro_batch()
print("Live state after batch 1:", table.scan())
print(f"Files: {len(table.data_files)} data, {len(table.eq_delete_files)} eq-delete")

# Another batch: a late update for 4781
sink.handle_event({"op": "u", "pk": "P-4781",
                   "after": {"id": "P-4781", "amt": 1500, "status": "refunded"}})
sink.commit_micro_batch()
print("Live state after batch 2:", table.scan())
print(f"Files: {len(table.data_files)} data, {len(table.eq_delete_files)} eq-delete")

table.compact()
print("After compaction:", table.scan())
print(f"Files: {len(table.data_files)} data, {len(table.eq_delete_files)} eq-delete")

# Sample run:
# Live state after batch 1: {'P-4781': {'id': 'P-4781', 'amt': 1500, 'status': 'settled'},
#                            'P-4783': {'id': 'P-4783', 'amt': 9999, 'status': 'init'}}
# Files: 1 data, 1 eq-delete
# Live state after batch 2: {'P-4781': {'id': 'P-4781', 'amt': 1500, 'status': 'refunded'},
#                            'P-4783': {'id': 'P-4783', 'amt': 9999, 'status': 'init'}}
# Files: 2 data, 2 eq-delete
# After compaction: {'P-4781': {'id': 'P-4781', 'amt': 1500, 'status': 'refunded'},
#                    'P-4783': {'id': 'P-4783', 'amt': 9999, 'status': 'init'}}
# Files: 1 data, 0 eq-delete
```

Walk the load-bearing pieces:

- `if op == "u": self.buffer_eqdel.keys.add(pk)` — an UPDATE writes both an entry in the data buffer (the post-image) and an entry in the equality-delete buffer (shadowing the pre-image). Why both: without the eq-delete, a reader would see the old row from a previous data file plus the new row from the current data file — i.e., the same payment with two statuses. The eq-delete shadows every prior version of the same PK so the reader sees only the latest post-image.
- `if ef.seq > df.seq:` — an equality-delete file applies only to data files with a strictly smaller sequence number, which is Iceberg V2's actual rule. Because a commit stamps its data file and its eq-delete file with the same sequence number, the delete shadows every earlier version of the row without shadowing the post-image it was committed with.
- `def compact(self):` — collapses the logical state into a single base file and drops the deltas. In real Iceberg this is Spark's `rewrite_data_files` procedure or iceberg-flink-runtime's `RewriteDataFiles` action. After compaction, queries no longer pay the eq-delete cost — until the next stream of CDC events accumulates more deltas.
- `self.buffer_data = DataFile(path="")` — each commit cycle starts a fresh buffer. In production, the buffer is a per-Kafka-partition Parquet writer, sized to ~128 MB, identical to the /wiki/streaming-writes-into-a-lakehouse sink; the only addition is the eq-delete writer running in parallel.
- the `op == "d"` branch — when a delete arrives in the same micro-batch as an earlier insert for the same PK, the sink shortcuts by dropping the in-buffer row and writing only the eq-delete. Why this matters: a payment created and refunded within one second produces `c` followed by `d` in Kafka. Without the in-buffer cancellation, the sink writes the row to a data file only to shadow it immediately — a dead write. At 50,000 events/sec on a Razorpay payment topic this saves 4–5% of S3 PUT cost.
The output shows the punchline: after batch 1, the P-4782 row is invisible (its create plus delete cancelled in-buffer); P-4781 shows status: 'settled' (the update post-image). After batch 2, P-4781 shows status: 'refunded' (the latest update). After compaction, all of this collapses to a single base file with two rows and zero delete files — and the reader pays no eq-delete cost on subsequent queries.
Schema evolution and the producer-consumer split-brain
Postgres tables change. Columns get added, types widen, defaults change. The CDC pipeline has to handle this without breaking the Iceberg table (where readers may have queries baked in) and without dropping events. The standard approach uses a schema registry (Confluent Schema Registry, AWS Glue Schema Registry) that Debezium publishes to and the Iceberg sink reads from. Each Kafka message carries a schema ID; the sink looks up the schema, maps it to the current Iceberg schema, and either applies the message directly (compatible) or evolves the Iceberg schema (additive change) or routes to a dead-letter topic (incompatible).
Three rules in production:
- Additive changes (new columns) flow through automatically. The sink calls ALTER TABLE ... ADD COLUMN on Iceberg; old files default the column to NULL, new files include it. No downtime.
- Type widenings flow through manually. Iceberg V2 only supports a fixed set of promotions (int→long, float→double); other widenings need a rename + dual-write phase.
- Renames and deletions break. A Postgres DROP COLUMN amount; ADD COLUMN amount_inr is rename+rebuild semantically, but Debezium emits it as two unrelated DDL events. Teams pause the sink, align the Iceberg schema, restart at the right LSN, and backfill the affected window. See /wiki/schema-evolution-across-the-cdc-boundary.
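A sketch of that routing decision (the function and the dict-based schema representation are illustrative, not a real registry API; supported type promotions are folded into the evolve path here for brevity):

```python
# Promotions Iceberg V2 allows; anything else is an incompatible type change.
ALLOWED_PROMOTIONS = {("int", "long"), ("float", "double")}

def route(iceberg_schema, event_schema):
    """Return ('apply' | 'evolve' | 'dlq', detail) for one incoming schema."""
    added = set(event_schema) - set(iceberg_schema)
    removed = set(iceberg_schema) - set(event_schema)
    if removed:
        return "dlq", f"columns dropped or renamed upstream: {sorted(removed)}"
    promotions = []
    for col in set(iceberg_schema) & set(event_schema):
        old, new = iceberg_schema[col], event_schema[col]
        if old != new:
            if (old, new) in ALLOWED_PROMOTIONS:
                promotions.append(col)
            else:
                return "dlq", f"unsupported type change on {col}: {old} -> {new}"
    if added or promotions:
        return "evolve", f"add {sorted(added)}, promote {sorted(promotions)}"
    return "apply", "schemas compatible"

current = {"id": "string", "amt": "int", "status": "string"}
print(route(current, {**current, "gst_state_code": "string"}))  # additive -> evolve
print(route(current, {**current, "amt": "long"}))               # promotion -> evolve
print(route(current, {"id": "string", "amt_inr": "int", "status": "string"}))  # -> dlq
```

The key property is that the sink never blocks: every incoming schema maps to exactly one of the three outcomes, and only the dlq path needs a human.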
The Indian context here is GST-mandated schema changes. When a regulatory deadline forces every payments table in the country to add a gst_state_code column on a fixed date, the CDC sinks across Razorpay, Cred, PhonePe all see the same evolution event in their Debezium streams within the same week. Pipelines that handled additive changes automatically rolled through; pipelines hard-coded to the old schema needed manual intervention. The teams that survived had alerts on iceberg_schema_drift_count > 0.
Common confusions
- "Debezium guarantees exactly-once delivery into Iceberg." Debezium guarantees at-least-once delivery into Kafka and an offset-tracking model that, combined with an idempotent sink and Kafka's exactly-once support, can give end-to-end exactly-once. The Iceberg sink must be idempotent on its own — typically by using the op + pk + lsn triple as a dedup key during commit, or by relying on Iceberg's natural file-path idempotency on retry. Without that sink-side care, you regress to at-least-once and see duplicate rows after restarts.
- "Equality deletes are slow, so positional deletes are always better." Positional deletes are unusable for streaming CDC because the sink doesn't know which file the pre-image was written to. Positional deletes are the right primitive for batch jobs that rewrite known rows (e.g. a GDPR delete job that scans for the user's PII and rewrites the affected files); they are the wrong primitive for a streaming sink reacting to row mutations.
- "You can replay Kafka and rebuild Iceberg from scratch." Yes, but Kafka's retention is finite (typically 7 days), so "replay from scratch" only works if your Kafka topic still has the full history. Production CDC topics are often configured with infinite retention specifically to enable rebuild scenarios; the cost is Kafka storage on broker disk, which becomes a multi-petabyte problem within a year. The alternative is taking a fresh snapshot.mode=initial run from Postgres, which works only if the source table can tolerate the snapshot scan.
- "Iceberg V1 supports CDC." Iceberg V1 has no row-level deletes — it only has file-level deletes (drop a whole Parquet file). To handle a single-row update in V1, you must rewrite the entire file containing that row (copy-on-write). For low-mutation tables this is fine; for any table getting more than ~1% mutation per hour, V1 becomes uneconomical. Iceberg V2 (released 2022) added equality and positional deletes specifically to support CDC use cases.
- "Kafka's primary key partitioning preserves global order." It preserves order within a partition — which means within a primary key, every change for that row appears in commit order. Across partitions, there is no global order. Two rows updated in the same Postgres transaction may land in different Kafka partitions and arrive at the sink out of source-commit order. Most CDC sinks ignore this because per-row order is what matters for the lakehouse copy; some teams that need cross-row consistency (e.g., joining orders and order_items with referential integrity) instead use the LSN as a global ordering key and reconcile downstream.
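The sink-side dedup behind the first confusion can be sketched in a few lines (illustrative model: each micro-batch becomes one appended file, and a crash-restart redelivers the same batch):

```python
# Append-only sink: without dedup, at-least-once delivery duplicates files.
def write_batch(files, events, seen=None):
    """Append a micro-batch as one file; dedup on (op, pk, lsn) if seen given."""
    rows = []
    for ev in events:
        key = (ev["op"], ev["pk"], ev["lsn"])
        if seen is not None:
            if key in seen:              # redelivered event: drop it
                continue
            seen.add(key)
        rows.append(ev)
    if rows:
        files.append(rows)

batch = [{"op": "c", "pk": "P-1", "lsn": 101, "after": {"amt": 500}}]

naive = []
write_batch(naive, batch)
write_batch(naive, batch)                # Kafka redelivery after a restart
safe, seen = [], set()
write_batch(safe, batch, seen)
write_batch(safe, batch, seen)           # same redelivery, deduped
print(len(naive), "files naive,", len(safe), "file idempotent")  # 2 files naive, 1 file idempotent
```

A production sink would persist the dedup state (or the committed offsets) atomically with the Iceberg commit; an in-memory set loses the guarantee on restart, which is exactly why the real mechanism is offset-in-commit or file-path idempotency.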
Going deeper
The Razorpay 2024 payment-events war story
Razorpay's 2024 lakehouse migration moved 47 OLTP tables (across payments, refunds, payouts, settlements) onto a CDC → Iceberg pipeline running on Flink + Debezium + Glue Iceberg. The first deployment used Iceberg V1 with copy-on-write; on Diwali week, mutation rate on payments peaked at 14% per hour, and the file-rewrite cost on the writer side caused S3 PUT throttling that backed up Kafka by 47 minutes. The team migrated payments to V2 with equality deletes within the week. Three months later, the same team published an internal post-mortem identifying that equality-delete file count had grown to 4,800 per partition because compaction was misconfigured (running once per day, not once per hour). The compaction-frequency tuning is now a checklist item for every new CDC table at Razorpay; the rule of thumb they share publicly is "compact whenever delete file count exceeds 50 per partition, no matter the schedule".
Multi-table CDC and referential consistency
A single OLTP transaction often touches multiple tables — an order INSERT plus three order_items INSERTs, all under the same XID. Postgres's WAL emits them as separate row events tagged with the same transaction ID. Debezium can emit them as separate Kafka messages, but they may land on different partitions (since each table has its own primary key) and so may be processed by the Iceberg sink at slightly different times. For analytics queries that join orders and order_items, this creates a brief window where a query could see the orders row but not its items — a transient referential-consistency violation. The high-end fix is transactional outbox + a single CDC topic ordered by xid, which is what Debezium 2.x's transaction.metadata mode produces. Most teams accept the eventual-consistency window because the alternative (single-partition global ordering) caps throughput at one partition's bandwidth. This trade-off is one of the most underdiscussed parts of running CDC at scale.
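A sketch of the transaction-reassembly idea (the event shape and END marker are illustrative, loosely modeled on transaction-metadata events, not Debezium's actual wire format):

```python
# Buffer per-table CDC events by transaction id; release a transaction's
# events as one atomic unit only when its END marker arrives.
from collections import defaultdict

def reassemble(events):
    """Yield (xid, events) for each complete transaction, in END order."""
    open_tx = defaultdict(list)
    for ev in events:
        if ev.get("marker") == "END":
            yield ev["xid"], open_tx.pop(ev["xid"], [])
        else:
            open_tx[ev["xid"]].append(ev)

stream = [
    {"xid": 9,  "table": "orders",      "pk": "O-1"},
    {"xid": 9,  "table": "order_items", "pk": "I-1"},
    {"xid": 10, "table": "orders",      "pk": "O-2"},   # interleaved tx
    {"xid": 9,  "table": "order_items", "pk": "I-2"},
    {"xid": 9,  "marker": "END"},
    {"xid": 10, "marker": "END"},
]
for xid, evs in reassemble(stream):
    print(f"xid {xid}: commit {len(evs)} row(s) atomically")
```

Committing each yielded group as one Iceberg snapshot closes the referential-consistency window — at the cost of a single consumer serializing the stream, which is the throughput cap the paragraph above describes.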
Why some teams skip Kafka
Iceberg V3 (rolling out in 2026) and Hudi 0.15 both ship a "direct CDC connector" that consumes from Postgres logical replication and writes to Iceberg without a Kafka intermediary. The pitch: fewer hops, lower lag, no Kafka cluster to operate. The cost: you lose Kafka's replay capability (no rebuild from history), you lose Kafka's fan-out (only one consumer per slot), and you lose Kafka's buffering during sink outages (the slot fills up, source-side WAL retention pressure builds, and the source DBA pages you). For most teams the Kafka hop is worth it; for tightly-coupled, single-consumer use cases the direct connector is winning ground in 2026. PhonePe's settlement-events pipeline switched from Kafka-mediated to direct in late 2025; their published reasoning was a 38-second p95 latency reduction (from ~50s to ~12s) and a 23% cost reduction.
The dead-letter topic, and why CDC pipelines need one
Schema-incompatible events, malformed events from Debezium bugs, events that the sink can't merge (e.g. equality-delete on a primary key that doesn't exist in the table) all need to go somewhere. The standard pattern is a dead-letter Kafka topic that the sink writes to whenever an event can't be processed. The Iceberg sink does not block the main stream on these errors — it logs, increments a metric, writes to DLQ, and moves on. Why this matters: an early-2020s pattern was to halt the sink on any error, on the theory that "data integrity matters more than freshness". In practice, every CDC pipeline encounters at least one bug per year that produces unprocessable events; a halt-on-error sink stays halted until a human pages, and the 4-hour outage costs more than a few rows in DLQ. Modern CDC pipelines DLQ + alert + continue, with a daily DLQ-replay job that retries failed events after the bug is fixed.
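The DLQ-and-continue loop is a dozen lines (an illustrative sketch, not a real connector's error-handling API):

```python
# Sink loop that never halts on a bad event: log-equivalent DLQ entry,
# metric increment, and continue with the next event.
def run_sink(events, process):
    lake, dlq, metrics = {}, [], {"dlq_total": 0}
    for ev in events:
        try:
            process(lake, ev)
        except Exception as exc:         # DLQ + alert metric + move on
            dlq.append({"event": ev, "error": str(exc)})
            metrics["dlq_total"] += 1
    return lake, dlq, metrics

def process(lake, ev):
    if ev["op"] == "d":
        if ev["pk"] not in lake:
            raise KeyError(f"delete for unknown pk {ev['pk']}")
        del lake[ev["pk"]]
    else:
        lake[ev["pk"]] = ev["after"]

events = [
    {"op": "c", "pk": "P-1", "after": {"amt": 100}},
    {"op": "d", "pk": "P-99"},           # unmergeable: pk never existed
    {"op": "c", "pk": "P-2", "after": {"amt": 200}},
]
lake, dlq, metrics = run_sink(events, process)
print(len(lake), "rows,", metrics["dlq_total"], "event in DLQ")  # 2 rows, 1 event in DLQ
```

The daily DLQ-replay job is the same loop run over the dlq list after the bug fix, with the fixed process function.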
Where this leads next
/wiki/streaming-writes-into-a-lakehouse is the upstream chapter that built the two-phase commit sink the CDC pipeline reuses. /wiki/copy-on-write-vs-merge-on-read-iceberg-vs-hudi covers the read-side trade-off in depth — why CDC-into-MoR is the only economical option for high-mutation tables.
The next chapter, /wiki/query-engines-on-top-trino-spark-dremio-duckdb, covers what the analytics team actually does with the CDC-fed Iceberg table: how Trino, Spark, Dremio, and DuckDB each handle MoR reads, equality-delete application, and stale-snapshot semantics differently — and which one the Razorpay analytics team would pick for which workload.
After that, Build 13 (the semantic layer) takes the CDC-fed table as raw input and asks: "what does active_customer mean, and who decides?" The CDC pipeline has delivered a faithful copy of the OLTP source; the semantic layer turns that copy into business metrics that survive analyst turnover.
References
- Debezium documentation — Postgres connector — canonical reference for the PostgreSQL → Kafka half of the pipeline; includes the snapshot-then-stream protocol and schema-evolution rules.
- Iceberg V2 spec — row-level deletes — the formal definition of equality and positional delete files, sequence numbers, and read-path reconciliation.
- Confluent — CDC into Iceberg with Debezium and Flink — a production walkthrough including the schema-registry integration and DLQ setup.
- Apple — Operating Iceberg at Scale — Apple's published account of running CDC-fed Iceberg tables across Spark, Trino, and Flink writers, including the compaction tuning that keeps eq-delete count bounded.
- Razorpay Engineering — Building a Lakehouse for Payments — the Indian-context reference for the CDC pattern at fintech scale.
- Apache Hudi — Record-level Index for CDC — the alternative table format's take on the same problem; useful contrast with Iceberg's eq-delete approach.
- /wiki/postgres-logical-decoding-from-scratch — the source-side mechanism the CDC pipeline depends on.
- /wiki/snapshot-cdc-the-bootstrapping-problem — the bootstrap protocol that lets a new pipeline start without losing rows or locking the source.