Concurrent writers without stepping on each other
At 02:30 a.m., Aditi's streaming job at a Bengaluru payments company flushes a 200 MB Parquet file into the payments table on S3. At 02:30:01, the nightly backfill job, kicked off by a different Airflow worker, also flushes its own file into the same partition. Both writers think they have committed. The morning dashboard, however, shows only one of the two batches — about ₹4 crore of transactions has silently vanished from analytics. The data is still on S3; the table just doesn't know about it.
Two writers committing to the same table at the same time will collide unless the table format has a protocol for resolving the race. All three modern lakehouse formats use optimistic concurrency — write your data files first, then attempt the metadata pointer flip; if you lose the race, replay your commit against the new state and try again. The trap is that the "atomic flip" must be backed by a real CAS-capable system; on naked S3 it is not, which is the most common cause of silent data loss in lakehouse production.
What "concurrent" actually means at the table layer
A lakehouse table is, mechanically, a folder of Parquet files plus a metadata pointer that says "the table at version N is this set of files". Two writers can safely produce data files in parallel — the filesystem hands out unique paths, and Parquet writers do not interfere with each other. The problem is the pointer flip. Both writers want to publish their commit and become version N+1. Only one of them can win; the other must retry against version N+1 to become version N+2.
So when we say "concurrent writers", we mean writers whose data-file-write phases overlap in wall-clock time, but whose commit phases — the metadata pointer flip — must be linearised. The whole game is in how that linearisation happens.
Why the data-file phase is parallel-safe but the commit phase is not: filesystems give every file a unique path on creation, so two writers cannot accidentally overwrite each other's data files. But the metadata pointer is a single named cell ("the table is version N"), and only one process can transition it from N to N+1. The whole concurrency story collapses to "what is the atomic-CAS primitive backing that single cell?"
Optimistic concurrency, written out as a protocol
All three modern lakehouse formats — Iceberg, Delta, Hudi — use optimistic concurrency control (OCC). Pessimistic locking would mean every writer takes a lock before writing, which kills throughput on cloud object stores where lock services are expensive and writers are many. OCC instead bets that conflicts are rare and pays the retry cost only when they actually happen.
The protocol, written generically, is six steps:
- Read the current snapshot. Writer fetches "the table is version N, here are its data files".
- Compute the planned commit. Writer decides "I will add files F1, F2; I will delete file F0; I will record stats S".
- Write data files. Writer puts F1 and F2 to S3. These have unique paths and cannot conflict.
- Build a new metadata snapshot locally. Writer constructs the version-N+1 metadata: manifest entries for F1, F2; tombstones for F0; updated table-level stats.
- Attempt the atomic commit. Writer calls catalog CAS: "if the table is currently at version N, set it to version N+1 with this metadata path".
- On success, done. On conflict, retry. If the catalog says "no, the table is now at version N+1", writer re-reads the new state, replays its planned commit against version N+1 (this is where the format's conflict-detection logic lives), and tries CAS again as version N+2.
The protocol's correctness rests on one assumption: step 5 is a true CAS (compare-and-swap), not a "read then write". On a database-backed catalog (Glue, Nessie, JDBC), this is a single transaction. On a pure-filesystem table layout (Delta on local disk, Iceberg with a Hadoop-style catalog), this is an atomic filesystem rename. On naked S3, neither is available, and that is where the failure modes live.
Why "read then write" is not a substitute for CAS: between your read of "version is N" and your write of "version is N+1", another writer can read N and write N+1, and you will then overwrite their N+1 with your own N+1, silently losing their commit. A CAS compares and sets in one indivisible step; read-then-write does not. No amount of "be careful" code on the client fixes this: the storage substrate must offer CAS, or you have lost.
A worked Python implementation that you can run
Below is a complete OCC-based commit coordinator for a tiny lakehouse-style table. It uses SQLite as the catalog (because SQLite gives you real CAS through BEGIN IMMEDIATE), spawns two writers from threads, and shows that one writer wins, the other retries, and the final table state is consistent. Run it: the final state is always version 2 with 2 files, though the retry counts depend on thread scheduling.
```python
# occ_commit.py — two writers, optimistic concurrency, deterministic correctness.
import os, json, time, uuid, random, sqlite3
from concurrent.futures import ThreadPoolExecutor

ROOT = "/tmp/occ_lake"
DB = f"{ROOT}/_catalog.db"

def init_lake():
    os.makedirs(ROOT, exist_ok=True)
    con = sqlite3.connect(DB, isolation_level=None)
    con.execute("CREATE TABLE IF NOT EXISTS catalog "
                "(table_name TEXT PRIMARY KEY, version INT, manifest TEXT)")
    con.execute("INSERT OR IGNORE INTO catalog VALUES ('payments', 0, '[]')")
    con.close()

def read_snapshot():
    con = sqlite3.connect(DB, isolation_level=None)
    row = con.execute("SELECT version, manifest FROM catalog "
                      "WHERE table_name='payments'").fetchone()
    con.close()
    return row[0], json.loads(row[1])

def write_data_file(writer_id, batch):
    path = f"{ROOT}/{writer_id}_{uuid.uuid4().hex[:8]}.parquet"
    with open(path, "w") as f:
        json.dump(batch, f)
    return {"path": path, "rows": len(batch), "writer": writer_id}

def cas_commit(expected_version, new_manifest):
    """True CAS via SQLite BEGIN IMMEDIATE."""
    con = sqlite3.connect(DB, isolation_level=None)
    try:
        con.execute("BEGIN IMMEDIATE")
        cur = con.execute("SELECT version FROM catalog "
                          "WHERE table_name='payments'")
        actual = cur.fetchone()[0]
        if actual != expected_version:
            con.execute("ROLLBACK")
            return False
        con.execute("UPDATE catalog SET version=?, manifest=? "
                    "WHERE table_name='payments'",
                    (expected_version + 1, json.dumps(new_manifest)))
        con.execute("COMMIT")
        return True
    finally:
        con.close()

def writer(writer_id, batch):
    retries = 0
    while True:
        version, manifest = read_snapshot()            # step 1
        new_file = write_data_file(writer_id, batch)   # step 3
        new_manifest = manifest + [new_file]           # step 4
        if cas_commit(version, new_manifest):          # step 5
            return version + 1, retries
        retries += 1
        time.sleep(random.uniform(0.001, 0.01))        # back-off

if __name__ == "__main__":
    init_lake()
    batches = [[{"txn": f"T{i}-A", "amount": 100} for i in range(50)],
               [{"txn": f"T{i}-B", "amount": 200} for i in range(50)]]
    with ThreadPoolExecutor(max_workers=2) as ex:
        f1 = ex.submit(writer, "A", batches[0])
        f2 = ex.submit(writer, "B", batches[1])
        r1, r2 = f1.result(), f2.result()
    print(f"writer A landed at v{r1[0]} after {r1[1]} retries")
    print(f"writer B landed at v{r2[0]} after {r2[1]} retries")
    final_v, final_m = read_snapshot()
    print(f"final table version: {final_v}; files: {len(final_m)}")
```

Sample run (retry counts vary with thread scheduling):

```
writer A landed at v1 after 0 retries
writer B landed at v2 after 1 retries
final table version: 2; files: 2
```
The lines that matter:
- `con.execute("BEGIN IMMEDIATE")` is the keystone. SQLite's BEGIN IMMEDIATE acquires a reserved lock up front, so the subsequent SELECT + UPDATE pair is a real CAS.
- `if actual != expected_version: con.execute("ROLLBACK"); return False` is the conflict detection: if the version moved while we were preparing our commit, we abort and let the writer retry.
- `time.sleep(random.uniform(0.001, 0.01))` is the back-off. The randomness (jitter) is essential: without it, two retrying writers can keep colliding indefinitely (live-lock). A production client would also grow the wait exponentially per attempt.
- `new_file = write_data_file(...)` runs outside the CAS. The data file is written speculatively, and it's fine if multiple writers all write data files, since each gets a unique UUID path.
Why writing data files outside the CAS is correct: even if writer B's commit fails and it retries, writer B's already-written data file is just an orphan on disk. It is not in any manifest, so no reader sees it. A periodic cleanup job (Iceberg's `remove_orphan_files`, Delta's `VACUUM`) sweeps these. The cost of an orphan file is storage, not correctness.
If you replace cas_commit with a non-CAS "read then write" version (no BEGIN IMMEDIATE), the test will lose writes — both threads will read version 0, both will write version=1, and one of them silently overwrites the other. That's the Delta-on-S3 corruption story, simulated.
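That lost-update race can be reproduced deterministically, without even needing threads. A minimal sketch, using an in-memory dict as a stand-in for the S3 key (not the SQLite catalog above), that interleaves the two writers' read and write steps by hand:

```python
# naive_commit.py -- read-then-write (no CAS) loses a commit deterministically.
# In-memory stand-in for the metadata pointer: one named cell, like a key on S3.
catalog = {"version": 0, "manifest": []}

def naive_read():
    return catalog["version"], list(catalog["manifest"])

def naive_write(version, manifest):
    # Last-writer-wins PUT: no compare step, always "succeeds".
    catalog["version"] = version
    catalog["manifest"] = manifest

# Interleaving forced by hand: both writers read before either writes.
va, ma = naive_read()                        # writer A sees version 0
vb, mb = naive_read()                        # writer B also sees version 0
naive_write(va + 1, ma + ["fileA.parquet"])  # A publishes version 1
naive_write(vb + 1, mb + ["fileB.parquet"])  # B overwrites A's version 1

print(catalog)
# Both writers believe they committed, but the table ends at version 1,
# and fileA.parquet is absent from the manifest: A's commit is gone.
```

The same interleaving against `cas_commit` would return False for writer B, forcing the retry instead of the overwrite.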
Where each format draws its conflict line
OCC says "if the table moved while you were preparing, retry". But "retry" is only correct if your planned commit can be replayed against the new state. The formats differ on what counts as "the new state has invalidated my plan":
- Iceberg. Two append-only commits to the same partition do not invalidate each other — they both add new files; nothing is deleted. The retry is trivial: just re-resolve the parent snapshot pointer and try CAS again. Two MERGE commits that both target the same data file do conflict; the loser must re-read the data file in the new state and re-plan. Iceberg is serialisable by default, with an opt-in snapshot-isolation mode that's slightly cheaper but allows write-skew.
- Delta. Same model. Two appends to the same partition are not a conflict; two MERGEs that touch overlapping files are. Delta's protocol explicitly enumerates the conflict types: `ConcurrentAppendException`, `ConcurrentDeleteReadException`, `ConcurrentDeleteDeleteException`, `MetadataChangedException`. The matrix of which of these can be safely retried and which must abort is in the Delta protocol document.
- Hudi. Hudi's MVCC partitions the table into file groups and routes each updated key to a specific file group. Two writers updating the same key conflict at the file-group level. Two writers updating disjoint keys may not conflict at all, even within the same partition. This makes Hudi friendlier for high-concurrency upsert workloads, at the cost of a more complex conflict detector.
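The file-granularity line all three formats draw can be reduced to a toy check. A sketch (hypothetical `conflicts` helper, not any format's real API) in which a commit is summarised by the set of data files it removed or rewrote:

```python
def conflicts(winner_removed, loser_removed):
    """File-granularity conflict check, in the spirit of Iceberg/Delta:
    two appends (both sets empty) never conflict; two rewrites conflict
    only if they touched the same data files."""
    return bool(set(winner_removed) & set(loser_removed))

# Two appends to the same partition: neither removes files, so no conflict.
assert not conflicts([], [])
# An append racing a MERGE: the append removed nothing, still no conflict.
assert not conflicts(["F0"], [])
# Two MERGEs that both rewrote F0: the loser must re-read and re-plan.
assert conflicts(["F0"], ["F0", "F1"])
```

Hudi's version of this check runs at file-group granularity over updated keys rather than over rewritten files, which is why disjoint-key upserts pass it.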
Aditi's payments table at her Bengaluru fintech has two writers: a streaming one that ingests every 30 s into today's partition, and a backfill that occasionally rewrites yesterday's partition for late-arriving Razorpay reconciliation events. On Iceberg, these never conflict because they touch different partitions. The same is true on Delta. On Hudi, Aditi has additionally configured the streaming writer's file-group hash to be disjoint from the backfill's, which gives her an extra layer of "they cannot collide at all" insurance.
What can go wrong
These are the patterns that show up at month 3 of running concurrent writers in production:
- The "Delta-on-S3 silent loss" failure. Two writers both successfully PUT `_delta_log/00000000000000000007.json` because S3's PUT is last-writer-wins, not CAS. One commit's worth of data becomes invisible. This is the most common silent-data-loss failure in lakehouse production. The fix is to use DynamoDB-backed Delta (the `DynamoDBLogStore` configuration) or move to Iceberg with a real catalog. Test by running two writers in parallel for 1000 commits each and checking that the final version count equals 2000; if it's less, you have the bug.
- The "live-lock" failure. Without backoff jitter, two writers can collide on every retry. Each one re-reads, replays, retries, collides again. The fix is exponential backoff with random jitter: make sure your retry sleep is random, not deterministic.
- The "I retried but my plan was no longer valid" failure. A writer planned a MERGE that would update file F0. The conflicting writer's commit deleted F0. Naive retry tries to update a file that no longer exists. The format's conflict-detector is supposed to catch this, but custom commit code (especially in homemade Iceberg/Delta clients) often doesn't replay the plan against the new state, just re-runs the CAS. Use the official client.
- The "long-running writer starves" failure. A 2-hour MERGE keeps losing the CAS race to 30-second appends. Each retry costs 2 hours. The fix is partition-level locking via the catalog (Iceberg's
commit-num-retriesandcommit-min-retry-wait-ms) or splitting the MERGE into smaller chunks that each retry quickly. - The "S3 strong consistency, but not for listings" failure. S3's strong read-after-write is per-key, not for
LIST. A writer that resolves the table state by listing_delta_log/may see stale results immediately after a commit. Iceberg avoids this by reading the metadata pointer through the catalog (single-key strong read). Delta on S3 needs explicit consistency layering. - The "we made the catalog the bottleneck" failure. All commits go through one Glue catalog write. Glue's commit throughput tops out around 10 commits/sec per table. A table receiving 50 commits/sec from 50 streaming writers will queue. The fix is either to batch commits (one writer, multiple writers' files) or to partition the table so each writer commits to its own logical sub-table.
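The jittered back-off that fixes the live-lock failure is small enough to show in full. A sketch (hypothetical helper name) of the "full jitter" variant, where the sleep is drawn uniformly from zero up to an exponentially growing ceiling:

```python
import random

def backoff_sleep_seconds(attempt, base=0.05, cap=5.0):
    """Full jitter: sleep a random amount in [0, min(cap, base * 2^attempt)].
    The randomness is what breaks the lock-step collision pattern; two
    writers that both lost attempt N almost never wake at the same instant."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

# The ceiling doubles per attempt (0.05, 0.1, 0.2, ...) until it hits the cap.
for attempt in range(8):
    s = backoff_sleep_seconds(attempt)
    assert 0 <= s <= min(5.0, 0.05 * 2 ** attempt)
```

A deterministic sleep (say, always 100 ms) fails precisely because both losers of round N re-attempt at the same moment in round N+1.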
Common confusions
- "OCC is the same as no locking." OCC has locking — the CAS is a momentary lock, just very short. The difference from pessimistic locking is duration: OCC holds the lock for microseconds (the catalog CAS), pessimistic locking holds it for the duration of the write (potentially minutes). The bet OCC makes is that conflicts are rare; if they are, the cost of the occasional retry is much less than the cost of every writer queuing.
- "Snapshot isolation is the same as serialisable." They differ on write-skew. Snapshot isolation lets two transactions read overlapping data and write disjoint changes that, taken together, violate a constraint that neither transaction alone would have violated. Serialisable forbids this. For a lakehouse, the difference matters mainly for cross-row constraints (which most tables don't have); for append-only fact tables, snapshot isolation is fine.
- "Two appends to the same partition collide." They don't. The whole point of partitioned + append-only design is that two writers add new files; nothing is rewritten. The only way for two appends to collide is if you have a partition-level uniqueness constraint, which most lakehouse tables don't.
- "S3 PUT is atomic, so I don't need a catalog." S3 PUT is atomic for that key, but it has last-writer-wins semantics: there is no compare-and-swap. Two PUTs to the same key both succeed, and only one survives. CAS requires a system that can reject the second writer; a plain S3 PUT cannot. Amazon S3 did add conditional writes in 2024 (first on S3 Express One Zone, later on general-purpose buckets), which finally gives the S3 API a CAS-style primitive.
- "Hudi solves concurrent writes; Iceberg and Delta don't." Hudi is more friendly to writers landing in disjoint file groups, but all three formats handle concurrent writers correctly when paired with a real CAS-capable backend. The real question is "does your storage backend offer CAS?" — not "which format did you pick?".
- "Retries are free." They cost CPU on the client and time. A writer that takes 2 minutes to write data and then keeps retrying for 10 minutes is wasting compute. Monitor `commit_retry_count` and alarm when it exceeds 5; that's a sign of either too many concurrent writers or too coarse a partition layout.
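The write-skew anomaly from the snapshot-isolation point above can be made concrete in a few lines. A toy in-memory model (not a real engine) of the classic "at least one row must stay on call" constraint:

```python
# Cross-row constraint: at least one row must have on_call=True.
table = {"alice": True, "bob": True}

snap1 = dict(table)   # transaction 1's snapshot
snap2 = dict(table)   # transaction 2's snapshot, taken at the same version

# Each transaction checks the constraint against ITS OWN snapshot, sees two
# rows on call, and concludes it is safe to take one off.
if sum(snap1.values()) >= 2:
    table["alice"] = False   # txn 1 writes only alice's row
if sum(snap2.values()) >= 2:
    table["bob"] = False     # txn 2 writes only bob's row

# The writes are disjoint, so snapshot isolation happily commits both.
# Serialisable would abort one: no serial order produces this outcome.
assert sum(table.values()) == 0   # constraint violated: nobody is on call
```

For an append-only fact table there is no such cross-row constraint to violate, which is why snapshot isolation is usually acceptable there.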
Going deeper
Why DynamoDB is the canonical S3 commit coordinator
When Delta on S3 needs CAS, the standard fix is to put a DynamoDB table in front of the S3 commits. DynamoDB offers conditional writes (write-if-attribute-equals), which gives you the CAS primitive S3 lacks. The Delta DynamoDBLogStore writes the commit log entry to S3 first, then conditionally writes "the latest version is N" to DynamoDB. Two writers race the DynamoDB conditional write; the loser reads the new version, replays, retries. This is exactly the OCC protocol described above, with DynamoDB playing the role of the SQLite BEGIN IMMEDIATE in our toy example. Iceberg's REST catalog and Glue catalog play the same role for Iceberg tables.
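Stripped of the AWS plumbing, the primitive DynamoDB contributes is tiny. A pure-Python stand-in (hypothetical `ConditionalStore`, not the boto3 API; the lock simulates the atomicity DynamoDB provides internally) for write-if-attribute-equals:

```python
import threading

class ConditionalStore:
    """One named cell with a write-if-value-equals primitive: the shape of
    DynamoDB's ConditionExpression, reduced to its essence."""
    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()   # stands in for DynamoDB's atomicity

    def get(self):
        with self._lock:
            return self._value

    def conditional_put(self, expected, new):
        # Compare and set in one indivisible step; reject the loser outright.
        with self._lock:
            if self._value != expected:
                return False
            self._value = new
            return True

pointer = ConditionalStore(7)             # "latest Delta version is 7"
assert pointer.conditional_put(7, 8)      # first writer wins the race
assert not pointer.conditional_put(7, 8)  # second writer is rejected, retries
```

The rejected writer then re-reads version 8, replays its commit, and attempts `conditional_put(8, 9)`, which is the OCC retry loop from the protocol above.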
How Iceberg's REST catalog separates "table state" from "table contents"
Iceberg's REST catalog spec defines a small API: GetTable, UpdateTable with a requirements precondition list. The requirements list is what implements OCC — a writer says "update the table to this metadata, but only if the current snapshot ID is X". The catalog server checks the precondition under its own transaction (PostgreSQL row lock, DynamoDB conditional write, etc.) and accepts or rejects. This pattern — pushing CAS into a small purpose-built service — is what makes Iceberg portable across cloud providers without re-implementing concurrency for each one.
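The server-side requirements check can be sketched in a few lines (illustrative field names, shaped after the spec's assert-style requirements rather than copied verbatim from it):

```python
def check_requirements(table_state, requirements):
    """Catalog-server-side precondition check, run inside one transaction:
    every requirement must hold against the current table state, or the
    whole UpdateTable call is rejected and the writer must retry."""
    for req in requirements:
        if req["type"] == "assert-ref-snapshot-id":
            if table_state["refs"].get(req["ref"]) != req["snapshot-id"]:
                return False
        else:
            raise ValueError(f"unknown requirement type: {req['type']}")
    return True

state = {"refs": {"main": 41}}   # the catalog's current view of the table
fresh = [{"type": "assert-ref-snapshot-id", "ref": "main", "snapshot-id": 41}]
stale = [{"type": "assert-ref-snapshot-id", "ref": "main", "snapshot-id": 40}]
assert check_requirements(state, fresh)      # precondition holds: accepted
assert not check_requirements(state, stale)  # table moved: writer replays
```

The point of the pattern is that only this small check needs a transactional backend; the bulky metadata and data files stay on plain object storage.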
The "watermark commit" optimisation for streaming writers
For streaming writers committing every 30 s, the OCC retry-on-conflict overhead can dominate. The optimisation is to assign each streaming writer its own committer thread and to rate-limit commits to the catalog's throughput. PhonePe's data platform team published a 2023 write-up (cited in references) describing how they reduced commit conflicts on a 12-writer Iceberg table from 30% retry rate to under 2% by serialising the catalog commits through a single coordinator and batching multiple writers' file lists per commit.
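The coordinator pattern is straightforward to sketch (a hypothetical design in the spirit of that write-up, not PhonePe's published code): writers enqueue their finished file lists, and a single committer drains the queue and publishes one catalog commit per batch:

```python
import queue

def coordinator_drain(pending, commit_fn):
    """Single committer: drain everything currently queued and publish it
    as ONE catalog commit, so N writers cost one CAS instead of N."""
    batch = []
    while True:
        try:
            batch.extend(pending.get_nowait())
        except queue.Empty:
            break
    if batch:
        commit_fn(batch)   # one CAS against the catalog for the whole batch
    return len(batch)

pending = queue.Queue()
pending.put(["w1_file1.parquet", "w1_file2.parquet"])  # writer 1's flush
pending.put(["w2_file1.parquet"])                       # writer 2's flush

committed = []
n = coordinator_drain(pending, committed.extend)
assert n == 3   # one commit carried both writers' files
```

Because only the coordinator ever races the catalog, the per-writer retry rate collapses; the trade-off is that the coordinator becomes a single point of commit latency.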
Multi-table transactions — the next frontier
OCC as described handles a single table. Real workloads often want "atomically commit two tables" — e.g., insert a payment row and update the merchant's running total. Iceberg's REST spec added a multi-table transaction extension in 2024; Delta's protocol added it via Unity Catalog. Hudi has not added this yet. None of the three formats supports cross-cluster cross-region multi-table transactions; for those, you still build above the table format using sagas or two-phase commit.
Where this leads next
The next chapter, /wiki/time-travel-and-zero-copy-clones-for-data-engineers, uses the snapshot-versioning machinery built up here to talk about reading old snapshots cheaply, and how zero-copy clones are implemented as metadata-only branch operations that share the underlying data files.
For the underlying mechanics of how table formats actually represent commits on disk, read /wiki/iceberg-delta-hudi-from-the-producers-perspective. For why all three formats independently arrived at "commit conflicts at file granularity, not row granularity", read /wiki/compaction-small-files-hell-and-how-to-avoid-it — the same file-granularity choice drives compaction policy.
References
- Apache Iceberg: Optimistic concurrency — the "Reliability and concurrency" section walks through the conflict-detection logic file by file.
- Delta Lake protocol — concurrency control — the canonical enumeration of conflict types and which can be retried.
- Apache Hudi: concurrency control — Hudi's MVCC and timeline-based conflict detection.
- Databricks: DynamoDBLogStore for Delta on S3 — why naked S3 cannot back Delta correctly and how DynamoDB plugs the CAS gap.
- Kung & Robinson — "On Optimistic Methods for Concurrency Control", 1981 — the foundational OCC paper; the protocol has not changed in 40 years, only the storage substrates have.
- PhonePe Engineering: Streaming Iceberg writers at scale — production write-up from PhonePe's data platform team on commit-conflict reduction at 12-writer concurrency.
- Amazon S3 Express One Zone conditional writes — the late-2024 feature that finally added CAS-style writes to the S3 API on this storage class.
- /wiki/iceberg-delta-hudi-from-the-producers-perspective — the table-format chapter this one builds on.