Concurrent writers: optimistic concurrency, serializability

A Razorpay ingestion fleet runs 18 streaming jobs and 6 micro-batch jobs writing to one Iceberg payments table. At 11:47 a.m. on a normal Tuesday, three of those writers all hold pointer version 8421 in memory and all decide to commit a new snapshot. Exactly one of them wins the catalog CAS introduced in the previous chapter. The other two get a clean error — and what they do in the next 200 milliseconds is the entire subject of this chapter. Optimistic concurrency works because the answer to a lost CAS is almost always cheap; serializability works because the catalog imposes a single global order on commits, and every writer agrees to live in that order.

Lakehouse writers do not lock. They write data files first, then attempt one compare-and-swap on the catalog pointer; the loser of the race rebuilds its manifest tree against the new parent and tries again. Most retries are cheap because appends are commutative — the second writer just appends its manifest entry to the new list and retries. Conflicts that are not commutative — a delete versus an append on the same row, two compactions on the same files — are the ones the protocol must refuse, not silently merge. Serializable isolation falls out of one global order: the order of successful CASes on the catalog row.

What optimistic actually means here

Classical lock-based concurrency control says: before you write, take a lock on what you intend to change; release it on commit. Pessimistic. It assumes conflicts are common and pre-empts them. The optimistic dual says: write speculatively, then at the end check whether anyone else committed a conflicting change while you were writing; if not, commit; if so, abort and retry. The check-and-commit step is the only critical section.

For a lakehouse this trade-off lands hard on the optimistic side. The "expensive" part of a commit is producing the data files — minutes of Spark or Flink work, hundreds of MB of Parquet. The "cheap" part is the metadata flip — kilobytes of Avro and JSON plus one CAS. Why optimistic wins on this asymmetry: under contention, a pessimistic lock-based scheme would force every writer to serialize through the lock for the entire data-write duration — minutes per writer. Optimistic schemes only serialize through the CAS step at the end, which is milliseconds. So one losing writer's retry costs only the rebuild-and-CAS, not the re-execution of its data work. With 99% of commits succeeding on first try and the 1% that don't paying ms-not-minutes, the system throughput approaches the single-writer throughput multiplied by the writer count.

[Figure: Pessimistic vs optimistic shapes for a lakehouse commit — two stacked timelines. Top, pessimistic: three writers serialised end-to-end behind a table lock, each waiting minutes for the previous writer to finish all phases (total wall time = 3 × per-writer time). Bottom, optimistic: three writers running data writes in parallel, serialising only on a brief CAS step at the end (total wall time ≈ 1 × per-writer time plus a few ms of CAS retries).]
The pessimistic shape forces every writer to wait through the previous writer's full data-write phase. The optimistic shape lets data writes happen in parallel and only serialises on the catalog CAS at the end. The CAS itself is a sub-millisecond operation; even three retries cost less than 10 ms.

The asymmetry is the design's whole reason for existing. If lakehouse writers had ms-scale data writes, pessimistic would be fine — Postgres survives this way at OLTP scale. With minutes-to-hours data writes and contention only at commit time, optimistic is the only design that scales to dozens of concurrent writers without queueing them.
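The arithmetic behind that claim can be sketched directly. The timings below are illustrative assumptions, not benchmarks:

```python
# Back-of-envelope model of the asymmetry (illustrative numbers):
# each writer spends 120 s producing data files and ~5 ms on the commit CAS.
DATA_WRITE_S = 120.0
CAS_S = 0.005
WRITERS = 3

# Pessimistic: a table lock held through the data write serialises everything.
pessimistic_wall = WRITERS * (DATA_WRITE_S + CAS_S)

# Optimistic: data writes overlap; only the CAS step serialises. Worst case,
# the last writer retries behind every other writer's CAS.
optimistic_wall = DATA_WRITE_S + WRITERS * CAS_S

print(f"pessimistic: {pessimistic_wall:.3f} s")   # 360.015 s
print(f"optimistic:  {optimistic_wall:.3f} s")    # 120.015 s
print(f"speedup:     {pessimistic_wall / optimistic_wall:.1f}x")  # 3.0x
```

The speedup scales with the writer count as long as the CAS step stays milliseconds-cheap relative to the data write.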

A retryable commit, in code

The commit-with-retry loop is small enough to fit in one screen. Here it is, with the conflict-detection logic that distinguishes "safe to merge" from "must abort".

# optimistic_commit.py — the retry loop a real Iceberg/Delta writer runs.
# Demonstrates: parent-version CAS, append-merge on retry, abort on non-commutative conflict.

import time, uuid, threading, random

class CASLost(Exception):
    """Raised when the catalog pointer has moved since we read it."""

class ConflictAbort(Exception):
    """Raised when the change cannot be safely re-merged onto the new parent."""

class Catalog:
    def __init__(self):
        self.pointer = "v0"
        self.history = {"v0": {"manifests": [], "deletes": [], "version": 0}}
        self.lock = threading.Lock()

    def read(self):
        with self.lock:
            return self.pointer, dict(self.history[self.pointer])

    def cas(self, expected, new_id, new_state):
        with self.lock:
            if self.pointer != expected:
                raise CASLost(f"pointer moved {expected} -> {self.pointer}")
            self.pointer = new_id
            self.history[new_id] = new_state
            return new_id

def commit_append(catalog, op, max_retries=4):
    """op = {'kind': 'append'|'delete', 'manifest': str, 'targets': [file_paths]}."""
    for attempt in range(max_retries):
        parent_id, parent = catalog.read()

        if op["kind"] == "append":
            # Appends are commutative: just splice our manifest onto whatever's there.
            new_state = {
                "manifests": parent["manifests"] + [op["manifest"]],
                "deletes":   parent["deletes"],
                "version":   parent["version"] + 1,
            }
        elif op["kind"] == "delete":
            # Deletes must check that the targets still exist on the new parent.
            still_present = [t for t in op["targets"] if t in parent["manifests"]]
            if not still_present:
                raise ConflictAbort(f"delete targets all gone on parent {parent_id}")
            new_state = {
                "manifests": [m for m in parent["manifests"] if m not in op["targets"]],
                "deletes":   parent["deletes"] + op["targets"],
                "version":   parent["version"] + 1,
            }

        new_id = f"v{new_state['version']}-{uuid.uuid4().hex[:4]}"
        try:
            return catalog.cas(parent_id, new_id, new_state)
        except CASLost:
            time.sleep(0.001 * (2 ** attempt) + random.random() * 0.001)
            continue
    raise ConflictAbort(f"gave up after {max_retries} retries")

# --- demo: 3 writers racing ---
cat = Catalog()
results, errors = [], []
def writer(name, op):
    try: results.append((name, commit_append(cat, op)))
    except Exception as e: errors.append((name, str(e)))

ts = [
    threading.Thread(target=writer, args=("A", {"kind":"append","manifest":"m-A.avro"})),
    threading.Thread(target=writer, args=("B", {"kind":"append","manifest":"m-B.avro"})),
    threading.Thread(target=writer, args=("C", {"kind":"append","manifest":"m-C.avro"})),
]
for t in ts: t.start()
for t in ts: t.join()
print("results:", results)
print("final pointer:", cat.pointer, "manifests:", cat.history[cat.pointer]["manifests"])

# Sample run:
# results: [('A', 'v1-3a8f'), ('B', 'v2-7e2c'), ('C', 'v3-91d4')]
# final pointer: v3-91d4 manifests: ['m-A.avro', 'm-B.avro', 'm-C.avro']

Walk the load-bearing lines: catalog.read() snapshots the parent pointer and state, and everything after is computed against that parent. The append branch splices the new manifest onto whatever the parent holds — the commutative merge that makes retries cheap. The delete branch re-checks that its targets still exist on the new parent and raises ConflictAbort if a concurrent commit already removed them. catalog.cas either wins atomically or raises CASLost, and the except branch backs off exponentially with jitter before looping. Exhausting max_retries escalates to ConflictAbort, surfacing sustained contention instead of spinning forever.

The thing to internalise is that commit_append is a closed loop — no external coordinator needs to tell the writer it lost. The CAS error is the signal. That property is what lets the protocol scale to hundreds of writers without a coordination service in front.

Serializability, and why this is enough to claim it

A history of database operations is serializable if it is equivalent to some serial order of those same operations executed one after another. The classical formulation requires conflict-equivalence: two histories are equivalent if they order every pair of conflicting operations — reads and writes touching the same item — identically.

For a lakehouse, the proof is short: every successful commit succeeds via exactly one CAS on a single catalog row. The catalog row's mutation log — v0 → v1 → v2 → v3 → ... — is a total order. Why this gives us serializability for free: take that linear sequence of pointer values as the serial order. Any execution of writers, in any concurrent shape, produces a final state identical to running the corresponding commits in that pointer-history order, because each commit's effect is fully determined by its parent state and its operation, and the parent state at the moment of CAS is the immediately-prior pointer value. So the concurrent execution is conflict-equivalent to a serial execution. That is the textbook definition of serializable.
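The argument can be exercised with a toy model — a minimal CAS catalog (the names and state shapes here are assumptions for illustration), three racing writers, then a serial replay in CAS-win order:

```python
import threading, random, time

class TinyCatalog:
    """Toy catalog: one versioned pointer, one CAS, a log of who won each CAS."""
    def __init__(self):
        self.state = {"manifests": []}
        self.version = 0
        self.lock = threading.Lock()
        self.win_order = []

    def read(self):
        with self.lock:
            return self.version, self.state

    def cas(self, expected, new_state, who):
        with self.lock:
            if self.version != expected:
                return False          # pointer moved: the caller must retry
            self.version += 1
            self.state = new_state
            self.win_order.append(who)
            return True

def writer(cat, manifest):
    while True:
        v, s = cat.read()
        time.sleep(random.random() * 0.01)   # widen the race window
        if cat.cas(v, {"manifests": s["manifests"] + [manifest]}, manifest):
            return

cat = TinyCatalog()
ts = [threading.Thread(target=writer, args=(cat, m))
      for m in ("m-A.avro", "m-B.avro", "m-C.avro")]
for t in ts: t.start()
for t in ts: t.join()

# Serial replay: run the same appends one after another, in CAS-win order.
serial = {"manifests": []}
for m in cat.win_order:
    serial = {"manifests": serial["manifests"] + [m]}

assert serial == cat.state   # concurrent run ≡ serial run in pointer order
print("serial order:", cat.win_order)
```

Whatever interleaving the threads produce, the serial replay lands on the identical final state — the concurrent history collapses to the pointer order.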

[Figure: Concurrent writes serialised by the catalog pointer history — three concurrent writer timelines on the left, a single linear catalog-pointer history on the right. Arrows map each writer's commit onto one pointer transition: A's commit becomes v1, B's becomes v2, C's becomes v3; the linear history is the serial order.]
Concurrent commits on the left collapse to a single linear sequence of catalog pointer transitions on the right. The order in which writers won their CASes — not the order in which they started, finished, or thought they would commit — is the serial order their effects took on the table. A reader at any pointer value sees the table as if the commits had run one after another in that order.

That said, the property only holds if the catalog actually does atomic CAS. On a Hive Metastore configured without LockManager, on a Glue catalog called via a non-conditional UpdateTable, on a custom file-based "catalog" that just renames a file in S3 — the CAS is not atomic, and the serializability claim collapses. This is the single most-common cause of lakehouse data corruption in production, and Build 12's later chapters return to it. The protocol's correctness is contingent on a correct CAS implementation; the protocol cannot manufacture one out of nothing.

A subtle point on what isolation level this gives readers. Because a reader pins itself to a single metadata.json for its query duration (chapter 89), the reader sees a snapshot that is the result of some prefix of the commit history. So readers are at snapshot isolation. Writers, who serialise through the CAS, are at serializable. This combination — readers at SI, writers at SER — is exactly what most production OLAP systems offer, and is enough for analytics workloads where readers do not need to also block writers.
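The reader side of that split is worth seeing in miniature. A sketch, assuming a toy metadata store keyed by pointer value:

```python
# Toy metadata store: pointer names a snapshot; snapshots are immutable.
metadata = {"v1": {"files": ["f1.parquet"]}}
pointer = "v1"

# Reader opens the table: it pins whatever the pointer names right now.
pinned = metadata[pointer]            # the reader's fixed view for the query

# Writers commit 50 new snapshots while the query runs.
for i in range(2, 52):
    metadata[f"v{i}"] = {"files": metadata[f"v{i-1}"]["files"] + [f"f{i}.parquet"]}
    pointer = f"v{i}"

assert pinned["files"] == ["f1.parquet"]      # the pinned view never moved
assert len(metadata[pointer]["files"]) == 51  # the table itself moved on
```

Because snapshots are immutable, the reader never needs a lock — pinning a pointer value is all the isolation it gets, and all it needs.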

When the retry doesn't save you

Three patterns force the retry loop to give up. Knowing them is the difference between a 3 a.m. page and a peaceful on-call.

1. Append vs delete on the same row. A streaming writer appends a row to payments while a GDPR delete job removes the row matching customer_id = 'C-998211'. If the delete commits first and the streaming append's parent included a now-stale row, the retry has to reckon with whether the new append re-introduces the deleted row. Iceberg v2 equality-deletes detect this: the append's commit sees the equality-delete predicate on the new parent, applies it to its own new file's rows, and either drops them or — if a row that the writer considers authoritative would be deleted — aborts. The right answer in a pipeline is application-specific, which is why the protocol exposes the abort rather than guessing.
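A sketch of that retry-time check, with toy row and predicate shapes (this is an illustration of the semantics, not Iceberg's actual validation API):

```python
def validate_append_on_retry(new_rows, parent_equality_deletes):
    """Re-evaluate equality-delete predicates from the new parent against our
    own new rows: drop matching rows, or abort if an authoritative row would
    be silently deleted."""
    kept = []
    for row in new_rows:
        matched = any(row.get(col) == val for (col, val) in parent_equality_deletes)
        if matched and row.get("authoritative"):
            raise RuntimeError(f"abort: authoritative row {row} hits a concurrent delete")
        if not matched:
            kept.append(row)
    return kept

appended = [
    {"customer_id": "C-998211", "amount": 499, "authoritative": False},
    {"customer_id": "C-112233", "amount": 120, "authoritative": False},
]
deletes_on_new_parent = [("customer_id", "C-998211")]   # the GDPR job won first

kept = validate_append_on_retry(appended, deletes_on_new_parent)
assert kept == [{"customer_id": "C-112233", "amount": 120, "authoritative": False}]
```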

2. Two compactions selecting overlapping file sets. Compaction reads N small Parquet files, writes 1 large one, and commits a manifest swap that says "manifest entries pointing at these N files are replaced by a manifest entry pointing at this 1 file". If two compaction jobs both selected m-A.avro as a source, only one's swap is valid; the other's source files are gone after the first commits. The second's retry rebuilds against the new parent and notices its source files are missing — abort. Production teams partition compaction work by partition key precisely to avoid this.
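The rebuild-and-notice step can be sketched as a source-file validation over a toy manifest list (names are assumptions):

```python
def plan_compaction_swap(parent_manifests, sources, merged):
    """A manifest swap is only valid if every source still exists on the parent."""
    missing = [s for s in sources if s not in parent_manifests]
    if missing:
        raise RuntimeError(f"abort: sources already compacted away: {missing}")
    return [m for m in parent_manifests if m not in sources] + [merged]

parent = ["m-A.avro", "m-B.avro", "m-C.avro"]

# Job 1 wins: merges A+B into big-1.
parent = plan_compaction_swap(parent, ["m-A.avro", "m-B.avro"], "big-1.avro")
assert parent == ["m-C.avro", "big-1.avro"]

# Job 2 retries against the new parent and finds a source gone — abort.
try:
    plan_compaction_swap(parent, ["m-A.avro", "m-C.avro"], "big-2.avro")
except RuntimeError as e:
    print(e)   # abort: sources already compacted away: ['m-A.avro']
```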

3. Schema evolution disagreements. Writer A wants to add column device_type STRING. Writer B wants to add column device_type INT. Whoever commits first wins the schema; the other's commit must abort because the schema-evolution operation is not commutative with itself. Why letting the loser silently re-resolve is wrong: if writer B's commit had included a re-cast of all its data files to the new INT schema, but A's STRING schema won, B's data files would now be schema-incompatible with the table. The right behaviour is a loud abort that surfaces to the operator, who decides whose schema change is correct. This is the failure mode where "the protocol is too eager to merge" leads to data corruption months later.
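A sketch of the loud-abort behaviour, using a toy schema map rather than either format's real schema machinery:

```python
def commit_schema_change(parent_schema, column, col_type):
    """The loser's retry compares its intended change against the new parent
    schema and aborts on any disagreement instead of overwriting it."""
    existing = parent_schema.get(column)
    if existing is not None and existing != col_type:
        raise RuntimeError(
            f"abort: {column} already {existing} on parent, wanted {col_type}")
    return {**parent_schema, column: col_type}

schema = {"payment_id": "STRING"}
schema = commit_schema_change(schema, "device_type", "STRING")   # writer A wins
try:
    commit_schema_change(schema, "device_type", "INT")           # writer B retries
except RuntimeError as e:
    print(e)   # abort: device_type already STRING on parent, wanted INT
```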

A Cred analytics team learned this the hard way in late 2024: a compaction job and a row-rewrite GDPR job ran in overlapping windows on the same transactions partitions. Neither aborted because they were running on a Delta table with a misconfigured concurrency check. The compaction wrote new merged files; the GDPR job's commit "won" the CAS but pointed at files that had just been retired. Queries failed for 90 minutes with FileNotFoundException. The fix was to enable Delta's conflictDetection.delete-and-compaction check, which is on by default in newer versions but had been switched off during a prior debug session and never restored. The post-mortem's headline: "we silently merged conflicts that should have aborted".

What writers and readers actually agree on

Take three concrete questions a working data engineer at Swiggy might ask, and trace them through the model.

Q1: My streaming job has been writing to orders for 8 hours. A backfill job started 30 minutes ago. Will the backfill see my latest orders? A1: depends on when the backfill opened its metadata. If it read the catalog at t=−30min, it has snapshot v=N pinned and will read everything up to N. Your streaming commits at v=N+1, N+2, N+3 are invisible to that backfill. If the backfill re-opens the catalog after you commit, it will see your new commits. Most backfill jobs deliberately pin the snapshot at start to get a deterministic view; that determinism is the SI guarantee.

Q2: Two of my Flink jobs are writing to the same Iceberg table at 2k events/sec each. Will I lose data? A2: not if the table format and catalog implement CAS correctly. You will see retry rates around 1–3% on append-only writes (the manifest-rebuild path), zero data loss, and a slight latency tail on the losers. If retry rates climb to 20%+, your contention pattern is wrong — usually because both writers are committing on every event rather than batching, or because the catalog is itself the bottleneck and is taking 100+ ms per CAS.

Q3: A long-running Trino query is reading the table while my streaming writer commits 50 new snapshots. Does the query slow down? A3: no — the query holds a fixed metadata pointer it read at the start. Your 50 writes don't touch its file list. The query's only cost is the data files it was already going to read. The streaming writer pays only the CAS cost per commit; the reader pays nothing for the writer's activity. This decoupling is the whole reason lakehouse SI exists, and the property that lets analytics and ingest share a table without coordination.

Going deeper

Why exponential backoff with jitter is the production-critical detail

Jitter feels like a footnote until you watch a 50-writer cluster collapse without it. The Cred infra team in 2025 ran an experiment: a synthetic 30-writer benchmark on a single partition, with three retry strategies. Pure linear backoff: 35% retry rate, average 4.1 retries per commit. Pure exponential without jitter: 22% retry rate, but the long tail had 1.2% of commits hitting 8+ retries because losers all backed off the same amount and collided again. Exponential plus jitter (the AWS pattern, min(base * 2^attempt, cap) * (0.5 + random)): 6% retry rate, 1.04 average retries, no commits over 4 retries. The jitter buys a 4× retry-rate improvement at zero implementation cost. Production-grade lakehouse writers (Iceberg's Java client, Delta's Spark connector) all ship with jitter on by default.
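The AWS-pattern formula quoted above, as a sketch (the base and cap values here are assumptions, not anyone's shipped defaults):

```python
import random

def backoff_ms(attempt, base=10, cap=1000, rng=random.random):
    """min(base * 2^attempt, cap) scaled by a factor in [0.5, 1.5)."""
    return min(base * 2 ** attempt, cap) * (0.5 + rng())

# The jittered sleep lands anywhere in a window half-to-one-and-a-half times
# the deterministic value, so simultaneous losers spread apart instead of
# colliding on the next CAS attempt.
for attempt in range(6):
    det = min(10 * 2 ** attempt, 1000)
    sample = backoff_ms(attempt)
    assert det * 0.5 <= sample < det * 1.5
```

The cap matters as much as the jitter: without it, a long retry tail turns into minutes of dead sleep on a writer that would have succeeded.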

Iceberg's IsolationLevel and the snapshot-isolation knob

Iceberg exposes two write isolation levels: serializable (the default) and snapshot. The snapshot level skips the equality-delete-vs-new-rows check during retry, which means a writer that adds rows can commit even if a concurrent equality delete would have removed those rows. serializable always re-evaluates the predicates. The trade-off is operational: snapshot retries succeed more often (~5× higher retry success in benchmarks) but let through certain anomalous histories that pure SER would refuse. Razorpay's payments table runs at serializable because the cost of incorrectness on financial records is unbounded; their analytics-event tables run at snapshot because the cost is "an analytical row gets re-resurrected by a stream replay", which is recoverable.
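The behavioural difference can be sketched with toy predicates (an illustration of the semantics, not Iceberg's validation code):

```python
def retry_check(isolation, new_rows, parent_deletes):
    """snapshot: skip the predicate re-check; serializable: re-evaluate
    equality-delete predicates from the new parent against our new rows."""
    if isolation == "snapshot":
        return new_rows                      # commits even over a concurrent delete
    clashing = [r for r in new_rows
                if any(r.get(c) == v for (c, v) in parent_deletes)]
    if clashing:
        raise RuntimeError(f"abort: rows clash with concurrent deletes: {clashing}")
    return new_rows

rows = [{"customer_id": "C-998211"}]
deletes = [("customer_id", "C-998211")]      # landed on the new parent

assert retry_check("snapshot", rows, deletes) == rows   # anomalous but allowed
try:
    retry_check("serializable", rows, deletes)
except RuntimeError:
    print("serializable refused the retry")
```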

Why Delta and Iceberg detect different conflicts

Delta's OptimisticTransaction checks: schema-mismatch, file-add-on-deleted-path, file-delete-on-deleted-path, metadata-changes-overlap. Iceberg checks: row-level filter conflicts, equality-delete vs append, schema-evolution conflicts. The two formats made different choices about which conflicts to detect at commit time vs. defer to read time. Delta opted for an "all conflicts surface at commit" model; Iceberg for "some conflicts (positional deletes) surface at read time as 'apply this delete file when scanning'". The result is that Delta retry storms can be more aggressive (rejecting more) but Iceberg's read paths are slightly more complex (more delete-file application). Reading both formats' commit-conflict source files (OptimisticTransaction.scala in Delta, BaseRowDelta.java in Iceberg) side-by-side is one of the best ways to internalise lakehouse trade-offs.

Multi-table commits and why they remain unsolved

Optimistic concurrency at one-table scope is solved. Cross-table atomicity is not. A common Indian fintech pattern — atomically update transactions, wallet_balances, and notifications together — has no clean lakehouse answer. Three workarounds in production: (a) use a catalog with multi-row CAS (Polaris with proper config, Snowflake's catalog), giving cross-table SER but locking you to that vendor; (b) use Iceberg branches and merge at the end, which is operationally heavy; (c) write to a staging table, then perform a single-table commit that "publishes" by atomically swapping the staging-to-prod pointer. Most teams pick (c) and accept that the staging window is a partial-failure surface. The problem is structurally the same as cross-shard transactions in OLTP — you need a 2PC or a global timestamp authority, and lakehouse formats deliberately don't ship one.
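Workaround (c) can be sketched as a single publish pointer that one atomic swap swings over an entire staged batch (the class, batch names, and staging paths below are hypothetical):

```python
import threading

class PublishPointer:
    """One CAS-guarded pointer whose value names the currently published batch."""
    def __init__(self):
        self.current = {"batch": None}
        self.version = 0
        self.lock = threading.Lock()

    def cas_publish(self, expected_version, batch_id):
        with self.lock:
            if self.version != expected_version:
                return False          # someone else published first
            self.version += 1
            self.current = {"batch": batch_id}
            return True

# Stage writes for all three tables under one batch id (paths illustrative)...
staged = {"transactions":    "s3://staging/batch-42/txn/",
          "wallet_balances": "s3://staging/batch-42/wallet/",
          "notifications":   "s3://staging/batch-42/notif/"}

# ...then one single-table CAS makes the whole batch visible, or none of it.
ptr = PublishPointer()
assert ptr.cas_publish(0, "batch-42")
assert ptr.current == {"batch": "batch-42"}
# A racing publisher holding a stale version loses cleanly:
assert not ptr.cas_publish(0, "batch-43")
```

The partial-failure surface the text mentions is visible here: between staging and the CAS, the staged files exist but are invisible, and a crash leaves orphans to garbage-collect.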

The retry budget as a circuit breaker

Default retries are 4 in Iceberg, 100,000 (configurable) in Delta. The defaults reflect different philosophies: Iceberg treats high retry rates as a signal something is wrong upstream and forces the operator to look at it; Delta treats retries as best-handled silently, surfacing only sustained failure. In practice, neither default is "right" — production teams set retry budgets based on their tolerance for write failures vs. their tolerance for slow writes. A streaming job at PhonePe processing 100k UPI confirmations per second would set retries to 8 with 50ms cap (retries are cheap, failures are expensive). A daily backfill job would set retries to 32 with no time cap (the job is already long-running). The wrong default is "use whatever the docs say"; the right default is "what does my workload's failure cost look like".

Where this leads next

The next chapter, /wiki/z-ordering-and-data-skipping, is about the read side of the same metadata tree — how clustering data files by predicate locality lets the manifest stats prune more aggressively. The CAS-then-rebuild model from this chapter is also what makes Z-order re-clustering safe to run as a background job: it is a compaction commit, subject to the same optimistic-retry protocol, with conflict detection that prevents two re-clusterings from corrupting each other.

The chapter after, /wiki/copy-on-write-vs-merge-on-read-iceberg-vs-hudi, is about row-level update strategies that change how often the conflict-detection rules in this chapter trigger. CoW writes new files on every update (high write amplification, simple conflicts); MoR writes delete-files alongside data (low write amp, more complex conflict detection). The protocol shape is the same; the conflict density is very different.

After that the build moves to streaming-into-lakehouse and CDC patterns, both of which produce many small commits — workloads where the retry-rate analysis from this chapter is the dominant operational concern.
