CDC → Iceberg: the real-world pattern

A Razorpay platform engineer is asked at 3 p.m. on a Wednesday to build "a copy of the payments table in the lake, kept fresh within a minute, that the analytics team can query without hitting prod". The naive answer — SELECT * FROM payments every minute — kills the OLTP database within an hour because the analytics team writes scan-heavy queries that lock 8 crore rows. The next answer — nightly dumps — gives a 24-hour staleness that the fraud team rejects. The shape every team converges on, after one or two failed attempts, is a four-stage pipeline: Postgres logical replication slot → Debezium → Kafka → an Iceberg sink that does merge-on-read upserts. By Friday afternoon the pipeline is live, the analytics team is querying a payments table on S3 that lags prod by 23 seconds, the OLTP database is no longer being scanned, and the engineer has discovered that the gap between "I understand each piece" and "the pipeline works under failure" is three subtle protocol decisions that every team reinvents. This chapter is those three decisions, named.

The standard CDC → Iceberg pipeline turns row-level OLTP mutations (insert/update/delete) into append-only Kafka events, then writes those events into Iceberg as merge-on-read deltas. Three protocol pieces hold it together: an upsert key (so update events reconcile to the same row), a tombstone convention (so delete events are visible to readers), and a snapshot-then-stream bootstrap (so the sink starts from a consistent point without locking the source). Iceberg V2's row-level deletes (positional and equality) make the merge tractable; periodic compaction collapses the deltas into base files.

Why CDC into Iceberg is the default lakehouse pipeline

Before 2020, the typical Postgres → analytics warehouse pipeline was nightly pg_dump + COPY into Snowflake, paying 8–24 hours of staleness and an IO storm on prod every night. Three things changed: object storage got cheap enough to store every change, Debezium's architecture made logical decoding usable from JVM stream apps, and Iceberg V2 added row-level deletes — meaning a streaming sink could express "this row was deleted" without rewriting the whole file. By 2026, every payments company, every B2B SaaS, every Indian fintech with more than ₹100 crore in TPV runs some variant of this pipeline.

The mechanism is unintuitive at first. A Postgres UPDATE is one row mutation in place; Kafka is an append-only log; Iceberg is a store of immutable files. How does an in-place UPDATE become an append become an immutable file and still let a SELECT see the right value? The answer: by tracking deletes alongside writes. The sink writes a new row for the post-image and records a delete entry pointing at the pre-image. A reader reconciles them at query time (merge-on-read, /wiki/copy-on-write-vs-merge-on-read-iceberg-vs-hudi) or at compaction time, which collapses the deltas into a clean base file.

[Figure: CDC pipeline from Postgres to Iceberg (MoR). Four stages: the Postgres payments table with a replication slot (pgoutput / wal2json, position tracked by LSN); a Debezium Kafka Connect task emitting {op, before, after, source} events as JSON or Avro via a schema registry; a Kafka topic keyed by primary key, holding c/u/d/c events at successive offsets; and an Iceberg sink writing data files (post-images) plus equality-delete files (pre-images). Below, a snapshot tree shows successive commits each adding data and eq-delete files, with hourly/nightly compaction merging the deltas into the base and dropping the eq-delete files. A sample point lookup (SELECT status FROM payments WHERE id = 4781) scans 1 logical row but opens 5 physical files until compaction lands.]
The four-stage CDC pipeline. Postgres emits row-level changes via its replication slot; Debezium serialises them as JSON or Avro events; Kafka holds them with the primary key as the message key (so all changes for one row land in one partition); the Iceberg sink writes data files for post-images and equality-delete files for pre-images. Readers reconcile at query time; compaction collapses the deltas eventually.

The three protocol decisions every team reinvents

Every CDC pipeline answers the same three questions. The answers determine whether the lake matches the source under load, under failures, and under schema changes.

1. The upsert key — what makes "the same row" the same row?

Debezium emits one Kafka message per row mutation. The message has a before block (pre-image) and an after block (post-image). For a CDC sink to keep the lake in sync, it must know that two messages refer to the same logical row. The natural answer is the source table's primary key. Debezium uses the PK by default as the Kafka message key, which has two effects: (a) all changes for one row land in one Kafka partition, preserving order, and (b) the sink can use the same key as the upsert key when writing to Iceberg.

Why the upsert key cannot be a surrogate or hash: if you key by MD5(payment_id || updated_at), an UPDATE generates a new key, so the sink writes a new row and never knows the old row should be deleted. The lake duplicates instead of updating. The upsert key must be invariant across the row's lifetime — which is exactly what a primary key is. For tables without a stable PK (sharded writes, append-only event tables), CDC into Iceberg is either inappropriate or requires a synthetic stable key generated upstream.
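The duplication failure is easy to demonstrate in a few lines. A minimal sketch — the row shapes and key functions here are illustrative, not Debezium's actual envelope:

```python
import hashlib

def good_key(row):
    return row["id"]        # primary key: invariant across the row's lifetime

def bad_key(row):
    # Hypothetical content-derived key: changes whenever updated_at changes.
    return hashlib.md5(f"{row['id']}|{row['updated_at']}".encode()).hexdigest()

v1 = {"id": "P-4781", "status": "init",    "updated_at": "10:00"}
v2 = {"id": "P-4781", "status": "settled", "updated_at": "10:05"}

# Keyed by the PK, the update reconciles onto the same logical row.
lake = {}
for row in (v1, v2):
    lake[good_key(row)] = row
assert len(lake) == 1 and lake["P-4781"]["status"] == "settled"

# Keyed by the hash, the update mints a fresh key: the sink writes a new row
# and never learns the old one should be shadowed — duplication, not upsert.
lake = {}
for row in (v1, v2):
    lake[bad_key(row)] = row
assert len(lake) == 2
```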

2. The tombstone convention — how does a DELETE become visible?

A Postgres DELETE FROM payments WHERE id=4781 produces a Debezium event with op: 'd', before: {id: 4781, ...}, after: null. The sink must reflect this in Iceberg such that a SELECT * FROM payments WHERE id=4781 returns no rows. Iceberg V2 offers two ways to do this: positional delete files, which name a (file_path, row_position) pair to suppress, and equality-delete files, which name column values (typically the PK) that shadow any matching row written earlier.

For CDC, equality deletes are the standard. The sink writes the deleted row's PK to an equality-delete file with the same partition spec as the data files. Why equality deletes win for CDC: positional deletes need the (file_path, row_position) tuple of the row being deleted, but the streaming sink seeing the DELETE event has no way to know which Iceberg file contains that row — that file might have been written by a different writer task last week, or by a backfill job, or already merged by compaction. Equality deletes only need the PK, which the sink always has from the Debezium message.

The tradeoff cascades into compaction. Every reader pays the equality-delete read cost on every query, so equality-delete files accumulating without compaction degrade query latency linearly. Production pipelines run compaction at least hourly to drop the equality-delete file count back to single digits per partition.
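The count-based trigger can be sketched in a few lines — a simplification that assumes the sink tracks one equality-delete file count per partition and that the threshold is a tunable (50 here, matching the rule of thumb quoted later in this chapter):

```python
def partitions_to_compact(eq_delete_counts, threshold=50):
    """Return partitions whose equality-delete file count exceeds the
    threshold. A count trigger (layered on any hourly schedule) bounds
    the per-query merge cost, which grows with delete-file count."""
    return sorted(p for p, n in eq_delete_counts.items() if n > threshold)

counts = {"dt=2026-01-14": 3, "dt=2026-01-15": 72, "dt=2026-01-16": 51}
assert partitions_to_compact(counts) == ["dt=2026-01-15", "dt=2026-01-16"]
```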

3. The snapshot-then-stream bootstrap

When you turn on a CDC pipeline for a payments table that already has 12 crore rows, where do you start? If you start from Kafka offset 0, you replay every change since the table was created — except Debezium's stream typically only goes back as far as the Postgres replication slot's retention (a few hours to days). If you start from "now", you've lost every row that existed before "now". Neither works alone.

The standard answer, covered in /wiki/snapshot-cdc-the-bootstrapping-problem, is: take a consistent snapshot of the source table, write it to the destination, then start the stream from the LSN that was current when the snapshot was taken. Debezium handles this end-to-end with its snapshot.mode=initial setting — the connector locks the source table briefly (or uses a consistent transaction with REPEATABLE READ), reads every row, emits each as a synthetic create event, then switches to streaming from the captured LSN.

Why the LSN handoff is exactly right: a Postgres replication slot delivers every commit after its position and nothing before it — no gaps, no overlap. Snapshotting at LSN X and streaming from LSN X therefore gives a consistent cut: every row state the snapshot saw is already in the destination, and every change committed after X arrives on the stream, in commit order. The destination converges to exactly the same set of row states as the source.

For the lakehouse case, the snapshot phase typically takes hours for tables of crores of rows; during that time, the stream is buffered in Kafka (Kafka's retention must exceed the snapshot duration, or the stream is lost). The Iceberg sink writes the snapshot rows as a single big batch (often via Spark or Flink batch mode) and then switches to streaming-write mode for the rest. The transition is the operational risk — every team has at least one war story about a snapshot taking longer than Kafka retention and re-running the bootstrap from scratch.
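The LSN handoff reduces to one comparison: stream events at or below the snapshot LSN are already covered by the snapshot and must be skipped. A minimal sketch — FakeSource and Sink are toy stand-ins for Postgres-plus-slot and the Iceberg sink, not real connector APIs:

```python
class FakeSource:
    """Toy stand-in for Postgres with a replication slot retaining the WAL."""
    def __init__(self):
        self.lsn = 100
        self.rows = {"P-1": {"id": "P-1", "status": "init"}}
        self.wal = []                       # (lsn, event) pairs held by the slot

    def commit(self, op, pk, after=None):
        self.lsn += 1
        self.wal.append((self.lsn, {"op": op, "pk": pk, "after": after}))
        if op == "d": self.rows.pop(pk, None)
        else:         self.rows[pk] = after

class Sink:
    def __init__(self): self.state = {}
    def apply(self, ev):
        if ev["op"] == "d": self.state.pop(ev["pk"], None)
        else:               self.state[ev["pk"]] = ev["after"]

src = FakeSource()
src.commit("c", "P-2", {"id": "P-2", "status": "init"})   # before the snapshot

# Snapshot phase: capture the LSN, then read every row as a synthetic 'r' event.
snapshot_lsn = src.lsn
snapshot = [dict(row) for row in src.rows.values()]

# Changes keep landing while the snapshot is written (buffered by the slot).
src.commit("u", "P-2", {"id": "P-2", "status": "settled"})
src.commit("d", "P-1")

sink = Sink()
for row in snapshot:
    sink.apply({"op": "r", "pk": row["id"], "after": row})
# Stream phase: replay only events committed strictly after the snapshot LSN.
for lsn, ev in src.wal:
    if lsn > snapshot_lsn:
        sink.apply(ev)

assert sink.state == src.rows    # destination converges to the source
```

The single `lsn > snapshot_lsn` guard is the whole handoff: drop it and the sink double-applies pre-snapshot changes; invert it and every in-flight change during the snapshot is lost.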

A working CDC sink simulation

The mechanics of equality-delete reconciliation are clearest when you write the merge yourself. Here is a 90-line model of a CDC sink that consumes Debezium-style events and produces Iceberg-style data files plus equality-delete files, with a query path that reconciles them.

# cdc_iceberg_sink.py — minimal CDC-to-Iceberg sink with equality-delete merge.
# Models: insert / update / delete events, MoR reads, and compaction.

import uuid, time
from collections import defaultdict
from dataclasses import dataclass, field

@dataclass
class DataFile:
    path: str
    rows: dict = field(default_factory=dict)   # pk -> row

@dataclass
class EqualityDeleteFile:
    path: str
    keys: set = field(default_factory=set)     # pks marked deleted

@dataclass
class IcebergV2Table:
    data_files: list = field(default_factory=list)
    eq_delete_files: list = field(default_factory=list)

    def commit(self, new_data, new_eq_deletes):
        self.data_files.extend(new_data)
        self.eq_delete_files.extend(new_eq_deletes)

    def scan(self, pk=None):
        # Apply equality deletes with Iceberg's sequence rule: a delete file
        # shadows matching rows only in data files committed STRICTLY earlier.
        # A delete committed alongside a data file does not shadow that file's
        # own rows — which is how an update's post-image survives the delete of
        # its pre-image in the same commit. We model sequence by file order
        # (each micro-batch here appends one data file and one eq-delete file).
        results = {}
        for i, df in enumerate(self.data_files):
            shadowed = set()
            for ef in self.eq_delete_files[i + 1:]:   # strictly later deletes
                shadowed |= ef.keys
            for k, row in df.rows.items():
                if k in shadowed: continue
                if pk is not None and k != pk: continue
                results[k] = row    # later data files overwrite earlier (last-write-wins)
        return dict(sorted(results.items()))          # sorted for deterministic output

    def compact(self):
        # Materialise the current logical state into a single base file,
        # drop all the deltas. This is the read-path's friend.
        live = self.scan()
        new_base = DataFile(path=f"base/{uuid.uuid4().hex[:8]}.parquet", rows=live)
        self.data_files = [new_base]
        self.eq_delete_files = []

class CDCSink:
    def __init__(self, table):
        self.table = table
        self.buffer_data = DataFile(path="")
        self.buffer_eqdel = EqualityDeleteFile(path="")

    def handle_event(self, ev):
        op, pk, after = ev["op"], ev["pk"], ev.get("after")
        if op in ("c", "u", "r"):    # create, update, snapshot-read
            self.buffer_data.rows[pk] = after
            if op == "u":
                # update: shadow the pre-image, then write the post-image
                self.buffer_eqdel.keys.add(pk)
        elif op == "d":
            self.buffer_eqdel.keys.add(pk)
            if pk in self.buffer_data.rows:
                del self.buffer_data.rows[pk]

    def commit_micro_batch(self):
        new_data, new_eqdel = [], []
        if self.buffer_data.rows:
            self.buffer_data.path = f"data/{uuid.uuid4().hex[:8]}.parquet"
            new_data = [self.buffer_data]
        if self.buffer_eqdel.keys:
            self.buffer_eqdel.path = f"eqdel/{uuid.uuid4().hex[:8]}.parquet"
            new_eqdel = [self.buffer_eqdel]
        self.table.commit(new_data, new_eqdel)
        self.buffer_data = DataFile(path="")
        self.buffer_eqdel = EqualityDeleteFile(path="")

# Drive a small CDC stream
table = IcebergV2Table(); sink = CDCSink(table)
events = [
    {"op": "c", "pk": "P-4781", "after": {"id": "P-4781", "amt": 1500, "status": "init"}},
    {"op": "c", "pk": "P-4782", "after": {"id": "P-4782", "amt": 2400, "status": "init"}},
    {"op": "u", "pk": "P-4781", "after": {"id": "P-4781", "amt": 1500, "status": "settled"}},
    {"op": "d", "pk": "P-4782"},
    {"op": "c", "pk": "P-4783", "after": {"id": "P-4783", "amt": 9999, "status": "init"}},
]
for ev in events: sink.handle_event(ev)
sink.commit_micro_batch()

print("Live state after batch 1:", table.scan())
print(f"Files: {len(table.data_files)} data, {len(table.eq_delete_files)} eq-delete")

# Another batch: a late update for 4781
sink.handle_event({"op": "u", "pk": "P-4781",
                   "after": {"id": "P-4781", "amt": 1500, "status": "refunded"}})
sink.commit_micro_batch()
print("Live state after batch 2:", table.scan())
print(f"Files: {len(table.data_files)} data, {len(table.eq_delete_files)} eq-delete")

table.compact()
print("After compaction:", table.scan())
print(f"Files: {len(table.data_files)} data, {len(table.eq_delete_files)} eq-delete")

# Sample run:
# Live state after batch 1: {'P-4781': {'id': 'P-4781', 'amt': 1500, 'status': 'settled'},
#                            'P-4783': {'id': 'P-4783', 'amt': 9999, 'status': 'init'}}
# Files: 1 data, 1 eq-delete
# Live state after batch 2: {'P-4781': {'id': 'P-4781', 'amt': 1500, 'status': 'refunded'},
#                            'P-4783': {'id': 'P-4783', 'amt': 9999, 'status': 'init'}}
# Files: 2 data, 2 eq-delete
# After compaction: {'P-4781': {'id': 'P-4781', 'amt': 1500, 'status': 'refunded'},
#                    'P-4783': {'id': 'P-4783', 'amt': 9999, 'status': 'init'}}
# Files: 1 data, 0 eq-delete

Walk the load-bearing pieces:

1. handle_event maps Debezium ops onto two buffers: creates, updates, and snapshot-reads ('r') write the post-image into the data buffer; updates and deletes add the PK to the equality-delete buffer, and a delete also cancels any same-batch post-image.
2. commit_micro_batch models an Iceberg commit: both buffers become files in one atomic snapshot, so a reader never sees a post-image without the delete that shadows its pre-image.
3. scan is the merge-on-read path: an equality-delete file shadows matching rows only in data files from earlier commits, which is why an update's post-image survives the delete committed alongside it.
4. compact materialises the live state into one base file and drops every delta.

The output shows the punchline: after batch 1, the P-4782 row is invisible (its create plus delete cancelled in-buffer); P-4781 shows status: 'settled' (the update post-image). After batch 2, P-4781 shows status: 'refunded' (the latest update). After compaction, all of this collapses to a single base file with two rows and zero delete files — and the reader pays no eq-delete cost on subsequent queries.

[Figure: equality-delete merge at read time — how a Trino reader reconciles 3 data files + 2 eq-delete files. Data files in order: file-A (seq=1: P-4781 init, P-4782 init, P-4790 init), file-B (seq=2: P-4781 settled — an UPDATE post-image, P-4791 init), file-C (seq=3: P-4781 refunded — a later UPDATE post-image, P-4792 init). Eq-delete files: del-1 (applies to seq<2) shadows P-4782, from a DELETE; del-2 (applies to seq<3) shadows P-4781's pre-images, from an UPDATE. Reconciled result of SELECT * FROM payments: P-4790, P-4791, P-4781 (refunded, from file-C), P-4792 — 4 rows returned from 3 data + 2 delete files opened.]
The read-side reconciliation. A reader opens all three data files plus both eq-delete files, then applies the deletes by sequence number: del-1 shadows P-4782 in file-A; del-2 shadows P-4781 in files A and B (but not the latest copy in file-C). The query returns four logical rows from five physical files. Compaction would collapse this into a single base file with four rows and zero deletes — at which point queries open only one file.

Schema evolution and the producer-consumer split-brain

Postgres tables change. Columns get added, types widen, defaults change. The CDC pipeline has to handle this without breaking the Iceberg table (where readers may have queries baked in) and without dropping events. The standard approach uses a schema registry (Confluent Schema Registry, AWS Glue Schema Registry) that Debezium publishes to and the Iceberg sink reads from. Each Kafka message carries a schema ID; the sink looks up the schema, maps it to the current Iceberg schema, and either applies the message directly (compatible) or evolves the Iceberg schema (additive change) or routes to a dead-letter topic (incompatible).

Three rules in production:

1. Additive changes (a new nullable column, a type widening) evolve the Iceberg schema automatically; old rows read the new column as null and no human is paged.
2. The sink never narrows a type or drops an Iceberg column in response to a source change; destructive evolution is a deliberate, manual migration, because downstream readers have queries baked in.
3. Incompatible events route to the dead-letter topic with an alert — the main stream never halts on a schema error.

The Indian context here is GST-mandated schema changes. When a regulatory deadline forces every payments table in the country to add a gst_state_code column on a fixed date, the CDC sinks across Razorpay, Cred, PhonePe all see the same evolution event in their Debezium streams within the same week. Pipelines that handled additive changes automatically rolled through; pipelines hard-coded to the old schema needed manual intervention. The teams that survived had alerts on iceberg_schema_drift_count > 0.
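The compatible / additive / incompatible routing can be sketched as a pure function over column-to-type maps — a deliberate simplification (real sinks consult registry compatibility modes and treat widenings like int-to-long as compatible; this conservative version sends any type change or dropped column to the DLQ):

```python
def route(event_schema, iceberg_schema):
    """Decide what the sink does with an event, given its registry schema
    and the current Iceberg schema, both modeled as {column: type_name}.
    Returns 'apply', 'evolve' (additive change), or 'dlq' (incompatible)."""
    missing = set(iceberg_schema) - set(event_schema)
    added   = set(event_schema) - set(iceberg_schema)
    changed = {c for c in set(event_schema) & set(iceberg_schema)
               if event_schema[c] != iceberg_schema[c]}
    if changed or missing:
        return "dlq"       # type change or dropped column: needs a human
    if added:
        return "evolve"    # new columns: add as nullable, then apply
    return "apply"

ice = {"id": "string", "amt": "long", "status": "string"}
assert route(ice, ice) == "apply"
assert route({**ice, "gst_state_code": "string"}, ice) == "evolve"
assert route({"id": "string", "amt": "string", "status": "string"}, ice) == "dlq"
```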

Common confusions

"Kafka is append-only, so the lake can only grow." Deletes become equality-delete entries; every physical artifact is an append, yet the logical row count shrinks.

"An UPDATE rewrites the row in place in Iceberg." Nothing is rewritten until compaction; the sink appends a post-image and a delete entry that shadows the pre-image.

"Restarting from Kafka offset 0 rebuilds the table from history." The replication slot only retains hours to days of changes; a rebuild needs the full snapshot-then-stream bootstrap.

Going deeper

The Razorpay 2024 payment-events war story

Razorpay's 2024 lakehouse migration moved 47 OLTP tables (across payments, refunds, payouts, settlements) onto a CDC → Iceberg pipeline running on Flink + Debezium + Glue Iceberg. The first deployment used Iceberg V1 with copy-on-write; on Diwali week, mutation rate on payments peaked at 14% per hour, and the file-rewrite cost on the writer side caused S3 PUT throttling that backed up Kafka by 47 minutes. The team migrated payments to V2 with equality deletes within the week. Three months later, the same team published an internal post-mortem identifying that equality-delete file count had grown to 4,800 per partition because compaction was misconfigured (running once per day, not once per hour). The compaction-frequency tuning is now a checklist item for every new CDC table at Razorpay; the rule of thumb they share publicly is "compact whenever delete file count exceeds 50 per partition, no matter the schedule".

Multi-table CDC and referential consistency

A single OLTP transaction often touches multiple tables — an order INSERT plus three order_items INSERTs, all under the same XID. Postgres's WAL emits them as separate row events with the same xmin. Debezium can emit them as separate Kafka messages, but they may land on different partitions (since each table has its own primary key) and so may be processed by the Iceberg sink at slightly different times. For analytics queries that join orders and order_items, this creates a brief window where a query could see the orders row but not its items — a transient referential-consistency violation. The high-end fix is transactional outbox + a single CDC topic ordered by xid, which is what Debezium 2.x's transaction.metadata mode produces. Most teams accept the eventual-consistency window because the alternative (single-partition global ordering) caps throughput at one partition's bandwidth. This trade-off is one of the most underdiscussed parts of running CDC at scale.
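The transaction-grouped variant can be modeled with a generator that buffers row events per transaction id and releases each group only when its end marker arrives. A sketch of the idea, not Debezium's wire format — the BEGIN/END event shape here is invented for illustration:

```python
def transactional_batches(events):
    """Group CDC events by transaction id and release each group only when
    its END marker arrives, so a join across orders and order_items never
    sees a partially applied transaction."""
    open_tx = {}
    for ev in events:
        if ev["type"] == "BEGIN":
            open_tx[ev["xid"]] = []
        elif ev["type"] == "END":
            yield open_tx.pop(ev["xid"])        # commit the whole group at once
        else:
            open_tx[ev["xid"]].append(ev)       # buffer until the tx closes

events = [
    {"type": "BEGIN", "xid": 900},
    {"type": "row", "xid": 900, "table": "orders",      "pk": "O-1"},
    {"type": "row", "xid": 900, "table": "order_items", "pk": "I-1"},
    {"type": "row", "xid": 900, "table": "order_items", "pk": "I-2"},
    {"type": "END", "xid": 900},
]
batches = list(transactional_batches(events))
assert len(batches) == 1 and len(batches[0]) == 3   # all-or-nothing visibility
```

The buffering is exactly where the throughput cost lives: the sink cannot commit anything from an open transaction, so one long-running OLTP transaction stalls the whole group behind it.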

Why some teams skip Kafka

Iceberg V3 (rolling out in 2026) and Hudi 0.15 both ship a "direct CDC connector" that consumes from Postgres logical replication and writes to Iceberg without a Kafka intermediary. The pitch: fewer hops, lower lag, no Kafka cluster to operate. The cost: you lose Kafka's replay capability (no rebuild from history), you lose Kafka's fan-out (only one consumer per slot), and you lose Kafka's buffering during sink outages (the slot fills up, source-side WAL retention pressure builds, and the source DBA pages you). For most teams the Kafka hop is worth it; for tightly-coupled, single-consumer use cases the direct connector is winning ground in 2026. PhonePe's settlement-events pipeline switched from Kafka-mediated to direct in late 2025; their published reasoning was a 38-second p95 latency reduction (from ~50s to ~12s) and a 23% cost reduction.

The dead-letter topic, and why CDC pipelines need one

Schema-incompatible events, malformed events from Debezium bugs, events that the sink can't merge (e.g. equality-delete on a primary key that doesn't exist in the table) all need to go somewhere. The standard pattern is a dead-letter Kafka topic that the sink writes to whenever an event can't be processed. The Iceberg sink does not block the main stream on these errors — it logs, increments a metric, writes to DLQ, and moves on. Why this matters: an early-2020s pattern was to halt the sink on any error, on the theory that "data integrity matters more than freshness". In practice, every CDC pipeline encounters at least one bug per year that produces unprocessable events; a halt-on-error sink stays halted until a human pages, and the 4-hour outage costs more than a few rows in DLQ. Modern CDC pipelines DLQ + alert + continue, with a daily DLQ-replay job that retries failed events after the bug is fixed.
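The DLQ-and-continue loop is small enough to show whole. A sketch — the metric name and the in-memory lists standing in for Kafka topics are illustrative:

```python
def run_sink(events, process, dlq, metrics):
    """Never halt the main stream on a bad event: on failure, park the
    event in the dead-letter queue with the error attached, bump a
    counter (which an alert watches), and keep consuming."""
    for ev in events:
        try:
            process(ev)
        except Exception as exc:
            dlq.append({"event": ev, "error": str(exc)})
            metrics["cdc_dlq_events_total"] += 1
            # a daily replay job retries the DLQ once the bug is fixed

state, dlq, metrics = {}, [], {"cdc_dlq_events_total": 0}

def process(ev):
    if ev["op"] != "d" and ev.get("after") is None:
        raise ValueError("malformed event: no post-image")
    if ev["op"] == "d": state.pop(ev["pk"], None)
    else:               state[ev["pk"]] = ev["after"]

events = [
    {"op": "c", "pk": "P-1", "after": {"status": "init"}},
    {"op": "u", "pk": "P-2", "after": None},               # malformed
    {"op": "c", "pk": "P-3", "after": {"status": "init"}},
]
run_sink(events, process, dlq, metrics)
assert len(dlq) == 1 and metrics["cdc_dlq_events_total"] == 1
assert set(state) == {"P-1", "P-3"}    # stream kept flowing past the bad event
```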

Where this leads next

/wiki/streaming-writes-into-a-lakehouse is the upstream chapter that built the two-phase commit sink the CDC pipeline reuses. /wiki/copy-on-write-vs-merge-on-read-iceberg-vs-hudi covers the read-side trade-off in depth — why CDC-into-MoR is the only economical option for high-mutation tables.

The next chapter, /wiki/query-engines-on-top-trino-spark-dremio-duckdb, covers what the analytics team actually does with the CDC-fed Iceberg table: how Trino, Spark, Dremio, and DuckDB each handle MoR reads, equality-delete application, and stale-snapshot semantics differently — and which one the Razorpay analytics team would pick for which workload.

After that, Build 13 (the semantic layer) takes the CDC-fed table as raw input and asks: "what does active_customer mean, and who decides?" The CDC pipeline has delivered a faithful copy of the OLTP source; the semantic layer turns that copy into business metrics that survive analyst turnover.

References