Concurrent writers: optimistic concurrency, serializability
A Razorpay ingestion fleet runs 18 streaming jobs and 6 micro-batch jobs writing to one Iceberg payments table. At 11:47 a.m. on a normal Tuesday, three of those writers all hold pointer version 8421 in memory and all decide to commit a new snapshot. Exactly one of them wins the catalog CAS introduced in the previous chapter. The other two get a clean error — and what they do in the next 200 milliseconds is the entire subject of this chapter. Optimistic concurrency works because the answer to a lost CAS is almost always cheap; serializability works because the catalog imposes a single global order on commits, and every writer agrees to live in that order.
Lakehouse writers do not lock. They write data files first, then attempt one compare-and-swap on the catalog pointer; the loser of the race rebuilds its manifest tree against the new parent and tries again. Most retries are cheap because appends are commutative — the second writer just appends its manifest entry to the new list and retries. Conflicts that are not commutative — a delete versus an append on the same row, two compactions on the same files — are the ones the protocol must refuse, not silently merge. Serializable isolation falls out of one global order: the order of successful CASes on the catalog row.
What optimistic actually means here
Classical lock-based concurrency control says: before you write, take a lock on what you intend to change; release it on commit. Pessimistic. It assumes conflicts are common and pre-empts them. The optimistic dual says: write speculatively, then at the end check whether anyone else committed a conflicting change while you were writing; if nobody did, commit; if someone did, abort and retry. The check-and-commit step is the only critical section.
For a lakehouse this trade-off lands hard on the optimistic side. The "expensive" part of a commit is producing the data files — minutes of Spark or Flink work, hundreds of MB of Parquet. The "cheap" part is the metadata flip — kilobytes of Avro and JSON plus one CAS. Why optimistic wins on this asymmetry: under contention, a pessimistic lock-based scheme would force every writer to serialize through the lock for the entire data-write duration — minutes per writer. Optimistic schemes only serialize through the CAS step at the end, which is milliseconds. So one losing writer's retry costs only the rebuild-and-CAS, not the re-execution of its data work. With 99% of commits succeeding on first try and the 1% that don't paying ms-not-minutes, the system throughput approaches the single-writer throughput multiplied by the writer count.
The asymmetry is the design's whole reason for existing. If lakehouse writers had ms-scale data writes, pessimistic would be fine — Postgres survives this way at OLTP scale. With minutes-to-hours data writes and contention only at commit time, optimistic is the only design that scales to dozens of concurrent writers without queueing them.
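The asymmetry can be put into numbers. A toy back-of-envelope model — the 5-minute data write, 10 ms CAS, 1% retry rate, and 24 writers are illustrative assumptions, not measurements:

```python
# Back-of-envelope: why optimistic wins when data writes dwarf the commit.
# All numbers are illustrative assumptions, not benchmarks.
data_write_s = 300.0   # minutes of Spark/Flink work per commit
cas_s = 0.010          # one catalog CAS
retry_rate = 0.01      # fraction of commits that lose one CAS round
writers = 24

# Pessimistic model: every writer serialises through a table lock held for
# the whole data write, so total throughput is one commit per write duration.
pessimistic_throughput = 1.0 / data_write_s                 # commits/sec, total

# Optimistic model: data writes overlap freely; only the CAS serialises, and
# a lost CAS costs roughly one more CAS (the append rebuild is ~free).
per_commit_s = data_write_s + cas_s * (1.0 + retry_rate)
optimistic_throughput = writers / per_commit_s              # commits/sec, total

print(f"pessimistic: {pessimistic_throughput:.4f} commits/s")
print(f"optimistic : {optimistic_throughput:.4f} commits/s")
print(f"speedup    : {optimistic_throughput / pessimistic_throughput:.1f}x")
```

The speedup is essentially the writer count, because the serialised portion (the CAS) is four orders of magnitude smaller than the parallel portion (the data write).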
A retryable commit, in code
The commit-with-retry loop is small enough to fit in one screen. Here it is, with the conflict-detection logic that distinguishes "safe to merge" from "must abort".
# optimistic_commit.py — the retry loop a real Iceberg/Delta writer runs.
# Demonstrates: parent-version CAS, append-merge on retry, abort on non-commutative conflict.
import json, time, uuid, threading, random

class CASLost(Exception):
    """Raised when the catalog pointer has moved since we read it."""

class ConflictAbort(Exception):
    """Raised when the change cannot be safely re-merged onto the new parent."""

class Catalog:
    def __init__(self):
        self.pointer = "v0"
        self.history = {"v0": {"manifests": [], "deletes": [], "version": 0}}
        self.lock = threading.Lock()

    def read(self):
        with self.lock:
            return self.pointer, dict(self.history[self.pointer])

    def cas(self, expected, new_id, new_state):
        with self.lock:
            if self.pointer != expected:
                raise CASLost(f"pointer moved {expected} -> {self.pointer}")
            self.pointer = new_id
            self.history[new_id] = new_state
            return new_id

def commit_append(catalog, op, max_retries=4):
    """op = {'kind': 'append'|'delete', 'manifest': str, 'targets': [file_paths]}."""
    for attempt in range(max_retries):
        parent_id, parent = catalog.read()
        if op["kind"] == "append":
            # Appends are commutative: just splice our manifest onto whatever's there.
            new_state = {
                "manifests": parent["manifests"] + [op["manifest"]],
                "deletes": parent["deletes"],
                "version": parent["version"] + 1,
            }
        elif op["kind"] == "delete":
            # Deletes must check that the targets still exist on the new parent.
            still_present = [t for t in op["targets"] if t in parent["manifests"]]
            if not still_present:
                raise ConflictAbort(f"delete targets all gone on parent {parent_id}")
            new_state = {
                "manifests": [m for m in parent["manifests"] if m not in op["targets"]],
                "deletes": parent["deletes"] + op["targets"],
                "version": parent["version"] + 1,
            }
        new_id = f"v{new_state['version']}-{uuid.uuid4().hex[:4]}"
        try:
            return catalog.cas(parent_id, new_id, new_state)
        except CASLost:
            time.sleep(0.001 * (2 ** attempt) + random.random() * 0.001)
            continue
    raise ConflictAbort(f"gave up after {max_retries} retries")

# --- demo: 3 writers racing ---
cat = Catalog()
results, errors = [], []

def writer(name, op):
    try: results.append((name, commit_append(cat, op)))
    except Exception as e: errors.append((name, str(e)))

ts = [
    threading.Thread(target=writer, args=("A", {"kind": "append", "manifest": "m-A.avro"})),
    threading.Thread(target=writer, args=("B", {"kind": "append", "manifest": "m-B.avro"})),
    threading.Thread(target=writer, args=("C", {"kind": "append", "manifest": "m-C.avro"})),
]
for t in ts: t.start()
for t in ts: t.join()
print("results:", results)
print("final pointer:", cat.pointer, "manifests:", cat.history[cat.pointer]["manifests"])
# Sample run:
# results: [('A', 'v1-3a8f'), ('B', 'v2-7e2c'), ('C', 'v3-91d4')]
# final pointer: v3-91d4 manifests: ['m-A.avro', 'm-B.avro', 'm-C.avro']
Walk the load-bearing lines:
- for attempt in range(max_retries): — the whole shape is a bounded retry loop. Iceberg defaults to 4 attempts; Delta defaults to a configurable retry count with exponential backoff. Bounded matters — an unbounded retry on a hot conflict would burn CPU on losers forever.
- parent_id, parent = catalog.read() — inside each attempt, re-read the parent from scratch. The previous attempt's parent_id is stale by definition; using it would cause the CAS to fail again. Why this re-read is safe and necessary: it costs one catalog GET (~1 ms hot, ~50 ms cold). Skipping it to "save time" is a real bug we have seen in custom commit code — the writer reuses the original parent pointer, the CAS fails on every retry, and the loop exhausts its budget. Re-read on every attempt; the cost is dwarfed by the cost of being wrong.
- new_state = {... parent["manifests"] + [op["manifest"]] ...} — this is the commutativity check, made physical. An append's new state is "whatever was there, plus my new manifest". If three appends race, they commute trivially: A, then B, then C produces the same manifest set as any other interleaving, regardless of who wins which CAS round. The "rebuild" on retry is effectively free for appends.
- still_present = [t for t in op["targets"] if t in parent["manifests"]] — the delete path is where commutativity fails. If you tried to delete m-A.avro and the new parent (after a concurrent commit) shows m-A.avro already gone — someone else either deleted it or compacted it away — you must abort. Silently merging would mean re-deleting something that doesn't exist, or worse, deleting a file with the same name that was added by an unrelated commit.
- time.sleep(0.001 * (2 ** attempt) + random.random() * 0.001) — exponential backoff with jitter. Why both: pure exponential without jitter creates a thundering herd — three writers losing the same round all sleep exactly 2 ms and all retry simultaneously, colliding again. Jitter spreads the retries by a random millisecond so they hit the catalog at different instants. The same trick AWS uses on every retried API call.
The thing to internalise is that commit_append is a closed loop — no external coordinator needs to tell the writer it lost. The CAS error is the signal. That property is what lets the protocol scale to hundreds of writers without a coordination service in front.
Serializability, and why this is enough to claim it
A history of database operations is serializable if it is equivalent to some serial order of those same operations executed one after another. The classical formulation uses conflict-equivalence: two histories are equivalent if they order every pair of conflicting operations — two operations on the same item, at least one of which is a write — the same way.
For a lakehouse, the proof is short: every successful commit succeeds via exactly one CAS on a single catalog row. The catalog row's mutation log — v0 → v1 → v2 → v3 → ... — is a total order. Why this gives us serializability for free: take that linear sequence of pointer values as the serial order. Any execution of writers, in any concurrent shape, produces a final state identical to running the corresponding commits in that pointer-history order, because each commit's effect is fully determined by its parent state and its operation, and the parent state at the moment of CAS is the immediately-prior pointer value. So the concurrent execution is conflict-equivalent to a serial execution. That is the textbook definition of serializable.
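The argument can be checked mechanically. A minimal sketch (a lock standing in for the catalog's atomic CAS, all names illustrative): run appends concurrently, record the order the CAS admitted them, then replay the same operations serially in that order and compare final states.

```python
# Sketch: the CAS history *is* the serial order. Run appends concurrently,
# then replay them one-by-one in pointer-history order and compare states.
import threading

state = {"version": 0, "manifests": []}
lock = threading.Lock()
commit_log = []  # (writer, manifest) in the order the CAS admitted them

def commit(name, manifest):
    with lock:  # stands in for the catalog's atomic compare-and-swap
        state["version"] += 1
        state["manifests"] = state["manifests"] + [manifest]
        commit_log.append((name, manifest))

threads = [threading.Thread(target=commit, args=(n, f"m-{n}.avro")) for n in "ABC"]
for t in threads: t.start()
for t in threads: t.join()

# Serial replay: apply each commit's effect to its parent state, in CAS order.
replay = {"version": 0, "manifests": []}
for _, manifest in commit_log:
    replay = {"version": replay["version"] + 1,
              "manifests": replay["manifests"] + [manifest]}

print("concurrent final state equals serial replay:", replay == state)
```

Each commit's effect depends only on its parent state and its operation, so the replay cannot diverge — which is exactly the conflict-equivalence claim in the paragraph above.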
That said, the property only holds if the catalog actually does atomic CAS. On a Hive Metastore configured without LockManager, on a Glue catalog called via a non-conditional UpdateTable, on a custom file-based "catalog" that just renames a file in S3 — the CAS is not atomic, and the serializability claim collapses. This is the single most-common cause of lakehouse data corruption in production, and Build 12's later chapters return to it. The protocol's correctness is contingent on a correct CAS implementation; the protocol cannot manufacture one out of nothing.
A subtle point on what isolation level this gives readers. Because a reader pins itself to a single metadata.json for its query duration (chapter 89), the reader sees a snapshot that is the result of some prefix of the commit history. So readers are at snapshot isolation. Writers, who serialise through the CAS, are at serializable. This combination — readers at SI, writers at SER — is exactly what most production OLAP systems offer, and is enough for analytics workloads where readers do not need to also block writers.
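Reader-side pinning is worth seeing concretely. A minimal sketch with a hypothetical in-memory catalog: the reader resolves the pointer once at open, and later commits move the pointer without touching the reader's resolved file list.

```python
# Sketch: a reader pins one metadata version at open; later commits are
# invisible to it. The dict-of-versions catalog here is illustrative only.
catalog = {"pointer": "v1", "versions": {"v1": ["part-001.parquet"]}}

def open_snapshot(cat):
    # Reader resolves the pointer ONCE and keeps the resolved file list.
    return list(cat["versions"][cat["pointer"]])

pinned = open_snapshot(catalog)  # long-running query starts here, sees v1

# A streaming writer commits two new snapshots while the query runs.
catalog["versions"]["v2"] = catalog["versions"]["v1"] + ["part-002.parquet"]
catalog["versions"]["v3"] = catalog["versions"]["v2"] + ["part-003.parquet"]
catalog["pointer"] = "v3"

print("pinned reader sees:", pinned)                  # still v1's file list
print("new reader sees   :", open_snapshot(catalog))  # v3's file list
```

The pinned reader's view is a prefix of the commit history — snapshot isolation — while the writers' pointer flips remain totally ordered — serializable.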
When the retry doesn't save you
Three patterns force the retry loop to give up. Knowing them is the difference between a 3 a.m. page and a peaceful on-call.
1. Append vs delete on the same row. A streaming writer appends a row to payments while a GDPR delete job removes the row matching customer_id = 'C-998211'. If the delete commits first and the streaming append's parent included a now-stale row, the retry has to reckon with whether the new append re-introduces the deleted row. Iceberg v2 equality-deletes detect this: the append's commit sees the equality-delete predicate on the new parent, applies it to its own new file's rows, and either drops them or — if a row that the writer considers authoritative would be deleted — aborts. The right answer in a pipeline is application-specific, which is why the protocol exposes the abort rather than guessing.
2. Two compactions selecting overlapping file sets. Compaction reads N small Parquet files, writes 1 large one, and commits a manifest swap that says "manifest entries pointing at these N files are replaced by a manifest entry pointing at this 1 file". If two compaction jobs both selected m-A.avro as a source, only one's swap is valid; the other's source files are gone after the first commits. The second's retry rebuilds against the new parent and notices its source files are missing — abort. Production teams partition compaction work by partition key precisely to avoid this.
3. Schema evolution disagreements. Writer A wants to add column device_type STRING. Writer B wants to add column device_type INT. Whoever commits first wins the schema; the other's commit must abort because the schema-evolution operation is not commutative with itself. Why letting the loser silently re-resolve is wrong: if writer B's commit had included a re-cast of all its data files to the new INT schema, but A's STRING schema won, B's data files would now be schema-incompatible with the table. The right behaviour is a loud abort that surfaces to the operator, who decides whose schema change is correct. This is the failure mode where "the protocol is too eager to merge" leads to data corruption months later.
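The compaction and schema cases above reduce to two small validation checks run against the new parent during retry. A sketch, with hypothetical helper names and a toy parent snapshot:

```python
# Sketch of the two non-commutative abort checks, run during retry against
# the new parent. validate_* names and the parent shape are illustrative.
class ConflictAbort(Exception): pass

def validate_compaction(parent_files, source_files):
    # A compaction swap is valid only if ALL its source files still exist on
    # the new parent; a concurrent compaction may have retired some of them.
    missing = [f for f in source_files if f not in parent_files]
    if missing:
        raise ConflictAbort(f"compaction sources gone: {missing}")

def validate_schema_add(parent_schema, column, col_type):
    # Adding a column does not commute with a concurrent add of the same
    # name at a different type; the loser must abort loudly.
    if column in parent_schema and parent_schema[column] != col_type:
        raise ConflictAbort(f"{column} already added as {parent_schema[column]}")

parent = {"files": ["f1", "f2", "f4"], "schema": {"device_type": "STRING"}}

validate_compaction(parent["files"], ["f1", "f2"])      # ok: sources present
try:
    validate_compaction(parent["files"], ["f2", "f3"])  # f3 was retired: abort
except ConflictAbort as e:
    print("abort:", e)
try:
    validate_schema_add(parent["schema"], "device_type", "INT")  # STRING won
except ConflictAbort as e:
    print("abort:", e)
```

Both checks are read-only against the parent metadata, which is why a failed retry costs milliseconds of metadata work, never a re-run of the data job.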
A Cred analytics team learned this the hard way in late 2024: a compaction job and a row-rewrite GDPR job ran in overlapping windows on the same transactions partitions. Neither aborted because they were running on a Delta table with a misconfigured concurrency check. The compaction wrote new merged files; the GDPR job's commit "won" the CAS but pointed at files that had just been retired. Queries failed for 90 minutes with FileNotFoundException. The fix was to enable Delta's conflictDetection.delete-and-compaction check, which is on by default in newer versions but had been switched off during a prior debug session and never restored. The post-mortem's headline: "we silently merged conflicts that should have aborted".
What writers and readers actually agree on
Take three concrete questions a working data engineer at Swiggy might ask, and trace them through the model.
Q1: My streaming job has been writing to orders for 8 hours. A backfill job started 30 minutes ago. Will the backfill see my latest orders? A1: depends on when the backfill opened its metadata. If it read the catalog at t=−30min, it has snapshot v=N pinned and will read everything up to N. Your streaming commits at v=N+1, N+2, N+3 are invisible to that backfill. If the backfill re-opens the catalog after you commit, it will see your new commits. Most backfill jobs deliberately pin the snapshot at start to get a deterministic view; that determinism is the SI guarantee.
Q2: Two of my Flink jobs are writing to the same Iceberg table at 2k events/sec each. Will I lose data? A2: not if the table format and catalog implement CAS correctly. You will see retry rates around 1–3% on append-only writes (the manifest-rebuild path), zero data loss, and a slight latency tail on the losers. If retry rates climb to 20%+, your contention pattern is wrong — usually because both writers are committing on every event rather than batching, or because the catalog is itself the bottleneck and is taking 100+ ms per CAS.
Q3: A long-running Trino query is reading the table while my streaming writer commits 50 new snapshots. Does the query slow down? A3: no — the query holds a fixed metadata pointer it read at the start. Your 50 writes don't touch its file list. The query's only cost is the data files it was already going to read. The streaming writer pays only the CAS cost per commit; the reader pays nothing for the writer's activity. This decoupling is the whole reason lakehouse SI exists, and the property that lets analytics and ingest share a table without coordination.
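The contention claim in Q2 — retries depend on key overlap, not writer count — can be illustrated with a toy simulation. The conflict model here is deliberately crude (a writer conflicts in a round if any other writer picked the same partition) and the numbers are illustrative, not benchmarks:

```python
# Toy simulation: conflict probability depends on key contention, not on how
# many writers exist. Illustrative model only, not a benchmark.
import random
random.seed(7)

def conflict_rate(writers, partitions, rounds=2000):
    conflicts = attempts = 0
    for _ in range(rounds):
        keys = [random.randrange(partitions) for _ in range(writers)]
        for i, k in enumerate(keys):
            attempts += 1
            # Conflict if any OTHER writer in this round chose the same partition.
            if any(k == other for j, other in enumerate(keys) if j != i):
                conflicts += 1
    return conflicts / attempts

print("50 writers, 50 partitions  :", round(conflict_rate(50, 50), 3))
print("50 writers, ~own partition :", round(conflict_rate(50, 5000), 3))
print("50 writers, 1 partition    :", round(conflict_rate(50, 1), 3))
```

Fifty writers spread across effectively-private partitions conflict almost never; the same fifty writers on one hot partition conflict every round. Writer count never appears in the answer, only key overlap does.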
Common confusions
- "Optimistic concurrency means there are no locks anywhere." Wrong — the catalog CAS is a lock, just held for microseconds. The catalog row is mutex-protected during the swap. What's "optimistic" is that the long data-write phase is unlocked; only the cheap pointer flip serialises.
- "Snapshot isolation is the same as serializability." No. SI allows write-skew anomalies — two transactions that read disjoint sets and write disjoint sets but together violate a constraint that depends on the union. SER does not. Lakehouse readers are at SI; writers are at SER. Most data-engineering workloads do not exhibit write-skew because the writes are appends or partitioned, but it is not impossible — a "bank balance must never go negative" invariant across two accounts could break under SI but not SER.
- "If two writers conflict, the loser's data is wasted." Almost never. The loser's data files (the expensive Parquet outputs) are still on S3 and still good. Only the kilobyte-scale manifest tree is rebuilt. The retry pays milliseconds, not minutes. You would notice the cost only if your retries ran into the millions per second, which is a contention pattern, not a protocol cost.
- "The CAS is implemented in S3." No — S3 has no native compare-and-swap (this is changing in 2024+ with S3 Express conditional writes, but most production setups still don't rely on it). The CAS lives in the catalog: Hive Metastore, Glue, Polaris, Unity, Nessie. The catalog is the strong-consistency point; S3 is the eventual-consistency-tolerant data layer below.
- "Optimistic concurrency degrades badly under high contention." It degrades when contention is on the same key, not when many writers exist. If 50 writers each touch their own partition, retry rates stay near 0. If 50 writers all touch the same row, retry rates explode. Partitioning your data and your write keys is the single biggest lever for keeping retries low.
- "You can pick optimistic vs pessimistic per commit." Lakehouse formats don't expose that choice — the protocol is optimistic, full stop. Pessimistic locks at the table level only exist via external coordination (Airflow lock-sensor before run, Argo workflow gates), which is operationally a different pattern.
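The write-skew anomaly mentioned in the SI-vs-SER bullet above fits in a dozen lines. A sketch with a toy two-account invariant (all names illustrative):

```python
# Sketch of write-skew, the anomaly SI permits and SER forbids.
# Invariant: balance_a + balance_b >= 0. Two SI transactions each read the
# same snapshot and each withdraw from a DIFFERENT row.
snapshot = {"a": 100, "b": 0}

def withdraw_ok(snap, amount):
    # The constraint depends on the UNION of both rows.
    return snap["a"] + snap["b"] - amount >= 0

# Both transactions validate against the SAME snapshot — both pass.
t1_ok = withdraw_ok(snapshot, 100)   # wants to take 100 from a
t2_ok = withdraw_ok(snapshot, 100)   # wants to take 100 from b

# They write disjoint rows, so SI sees no write-write conflict: both commit.
final = dict(snapshot)
if t1_ok: final["a"] -= 100
if t2_ok: final["b"] -= 100

print("final:", final, "invariant holds:", final["a"] + final["b"] >= 0)
# Under SER, one transaction would be ordered after the other, would see the
# other's write when re-validating, and would abort.
```

Disjoint reads, disjoint writes, and yet the joint invariant is violated — that is the precise gap between SI and SER.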
Going deeper
Why exponential backoff with jitter is the production-critical detail
Jitter feels like a footnote until you watch a 50-writer cluster collapse without it. The Cred infra team in 2025 ran an experiment: a synthetic 30-writer benchmark on a single partition, with three retry strategies. Pure linear backoff: 35% retry rate, average 4.1 retries per commit. Pure exponential without jitter: 22% retry rate, but the long tail had 1.2% of commits hitting 8+ retries because losers all backed off the same amount and collided again. Exponential plus jitter (the AWS pattern, min(base * 2^attempt, cap) * (0.5 + random)): 6% retry rate, 1.04 average retries, no commits over 4 retries. The jitter buys a 4× retry-rate improvement at zero implementation cost. Production-grade lakehouse writers (Iceberg's Java client, Delta's Spark connector) all ship with jitter on by default.
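The formula from that experiment, as code. A minimal sketch — the base and cap values are illustrative; the shape min(base * 2^attempt, cap) * (0.5 + random) is the "equal jitter" family AWS describes for retried API calls:

```python
# Exponential backoff with equal jitter: min(base * 2^attempt, cap) scaled
# by a uniform factor in [0.5, 1.5). base/cap values are illustrative.
import random

def backoff_s(attempt, base=0.002, cap=0.200):
    exp = min(base * (2 ** attempt), cap)
    return exp * (0.5 + random.random())   # uniform in [0.5*exp, 1.5*exp)

random.seed(1)
for attempt in range(5):
    print(f"attempt {attempt}: sleep {backoff_s(attempt) * 1000:.2f} ms")
```

The cap matters as much as the jitter: without it, attempt 10 would sleep two seconds and a hot catalog would see losers parked long after the contention cleared.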
Iceberg's IsolationLevel and the snapshot-isolation knob
Iceberg exposes two write isolation levels: serializable (the default) and snapshot. The snapshot level skips the equality-delete-vs-new-rows check during retry, which means a writer that adds rows can commit even if a concurrent equality delete would have removed those rows. serializable always re-evaluates the predicates. The trade-off is operational: snapshot retries succeed more often (~5× higher retry success in benchmarks), but the level admits certain anomalous histories that pure SER would refuse. Razorpay's payments table runs at serializable because the cost of incorrectness on financial records is unbounded; their analytics-event tables run at snapshot because the cost is "an analytical row gets resurrected by a stream replay", which is recoverable.
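What the knob changes during retry validation can be modelled in a few lines. This is a hypothetical sketch of the behaviour, not Iceberg's implementation — the function name and the predicate-list representation of equality deletes are both illustrative:

```python
# Hypothetical model of the snapshot-vs-serializable retry validation.
# parent_eq_deletes: equality-delete predicates committed since our base.
def validate_on_retry(isolation, new_rows, parent_eq_deletes):
    """Return the rows to commit, or raise where SER would refuse."""
    if isolation == "snapshot":
        return new_rows  # skip the equality-delete vs new-rows check
    # serializable: re-evaluate concurrent delete predicates against our rows
    clashing = [r for r in new_rows
                if any(pred(r) for pred in parent_eq_deletes)]
    if clashing:
        raise RuntimeError(f"SER abort: {len(clashing)} rows match a concurrent delete")
    return new_rows

rows = [{"customer_id": "C-1"}, {"customer_id": "C-2"}]
deletes = [lambda r: r["customer_id"] == "C-2"]  # concurrent GDPR delete

print(validate_on_retry("snapshot", rows, deletes))  # commits both rows
try:
    validate_on_retry("serializable", rows, deletes)
except RuntimeError as e:
    print(e)
```

The snapshot path quietly re-introduces C-2 — the resurrection-by-replay behaviour the paragraph above calls recoverable for analytics and unacceptable for payments.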
Why Delta and Iceberg detect different conflicts
Delta's OptimisticTransaction checks: schema-mismatch, file-add-on-deleted-path, file-delete-on-deleted-path, metadata-changes-overlap. Iceberg checks: row-level filter conflicts, equality-delete vs append, schema-evolution conflicts. The two formats made different choices about which conflicts to detect at commit time vs. defer to read time. Delta opted for an "all conflicts surface at commit" model; Iceberg for "some conflicts (positional deletes) surface at read time as 'apply this delete file when scanning'". The result is that Delta retry storms can be more aggressive (rejecting more) but Iceberg's read paths are slightly more complex (more delete-file application). Reading both formats' commit-conflict source files (OptimisticTransaction.scala in Delta, BaseRowDelta.java in Iceberg) side-by-side is one of the best ways to internalise lakehouse trade-offs.
Multi-table commits and why they remain unsolved
Optimistic concurrency at one-table scope is solved. Cross-table atomicity is not. A common Indian fintech pattern — atomically update transactions, wallet_balances, and notifications together — has no clean lakehouse answer. Three workarounds in production: (a) use a catalog with multi-row CAS (Polaris with proper config, Snowflake's catalog), giving cross-table SER but locking you to that vendor; (b) use Iceberg branches and merge at the end, which is operationally heavy; (c) write to a staging table, then perform a single-table commit that "publishes" by atomically swapping the staging-to-prod pointer. Most teams pick (c) and accept that the staging window is a partial-failure surface. The problem is structurally the same as cross-shard transactions in OLTP — you need a 2PC or a global timestamp authority, and lakehouse formats deliberately don't ship one.
The retry budget as a circuit breaker
Default retries are 4 in Iceberg, 100,000 (configurable) in Delta. The defaults reflect different philosophies: Iceberg treats high retry rates as a signal something is wrong upstream and forces the operator to look at it; Delta treats retries as best-handled silently, surfacing only sustained failure. In practice, neither default is "right" — production teams set retry budgets based on their tolerance for write failures vs. their tolerance for slow writes. A streaming job at PhonePe processing 100k UPI confirmations per second would set retries to 8 with 50ms cap (retries are cheap, failures are expensive). A daily backfill job would set retries to 32 with no time cap (the job is already long-running). The wrong default is "use whatever the docs say"; the right default is "what does my workload's failure cost look like".
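The circuit-breaker shape described above — a budget bounded by both attempts and wall-clock time — is a small loop. A sketch with illustrative names and limits:

```python
# Sketch: a retry budget bounded by BOTH an attempt cap and a wall-clock cap.
# Function and exception names are illustrative, not any format's API.
import time

class RetryBudgetExhausted(Exception): pass

def commit_with_budget(try_commit, max_attempts=8, max_seconds=0.050):
    deadline = time.monotonic() + max_seconds
    for attempt in range(max_attempts):
        if time.monotonic() > deadline:
            raise RetryBudgetExhausted(f"time cap hit at attempt {attempt}")
        if try_commit(attempt):  # returns True when the CAS wins
            return attempt
    raise RetryBudgetExhausted(f"attempt cap hit after {max_attempts}")

# A commit that loses the CAS twice and then wins:
print("won on attempt", commit_with_budget(lambda a: a >= 2))
```

A streaming job sets a tight max_seconds and surfaces failures fast; a backfill sets a generous attempt cap and no practical time cap — the same loop, tuned to the workload's failure cost.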
Where this leads next
The next chapter, /wiki/z-ordering-and-data-skipping, is about the read side of the same metadata tree — how clustering data files by predicate locality lets the manifest stats prune more aggressively. The CAS-then-rebuild model from this chapter is also what makes Z-order re-clustering safe to run as a background job: it is a compaction commit, subject to the same optimistic-retry protocol, with conflict detection that prevents two re-clusterings from corrupting each other.
The chapter after, /wiki/copy-on-write-vs-merge-on-read-iceberg-vs-hudi, is about row-level update strategies that change how often the conflict-detection rules in this chapter trigger. CoW writes new files on every update (high write amplification, simple conflicts); MoR writes delete-files alongside data (low write amp, more complex conflict detection). The protocol shape is the same; the conflict density is very different.
After that the build moves to streaming-into-lakehouse and CDC patterns, both of which produce many small commits — workloads where the retry-rate analysis from this chapter is the dominant operational concern.
References
- Apache Iceberg — Concurrency Control — the binding spec for Iceberg's optimistic protocol; pairs with the BaseTransaction.java source.
- Delta Lake — Concurrency Control — Delta's documentation of which conflicts it detects and which it punts to readers; cross-reference with OptimisticTransaction.scala.
- Bernstein, Hadzilacos, Goodman — Concurrency Control and Recovery in Database Systems (1987) — the textbook on serializability; the conflict-equivalence definition used in the proof here is from chapter 2.
- Adya — Weak Consistency: A Generalized Theory and Optimistic Implementations for Distributed Transactions (PhD thesis, MIT 1999) — the most rigorous treatment of SI vs SER and the anomalies that distinguish them; the write-skew example is canonical.
- Iceberg PR #4904 — Equality deletes and serializable conflict detection — the patch that added the SER-level checks in Iceberg v2; reading the PR commentary is a fast way to internalise the conflict-detection design.
- Delta Lake VLDB 2020 paper — §5 Concurrency — Delta's formal claims about isolation; pairs with the Iceberg spec for a side-by-side read.
- /wiki/manifest-files-and-the-commit-protocol — the previous chapter; the CAS this chapter races against is the one that chapter introduced.
- /wiki/concurrent-writers-without-stepping-on-each-other — an earlier Build 6 chapter at lower depth; useful as a refresher on the writer-side picture before diving into this chapter's serializability proof.