Concurrent writers without stepping on each other

At 02:30 a.m., Aditi's streaming job at a Bengaluru payments company flushes a 200 MB Parquet file into the payments table on S3. At 02:30:01, the nightly backfill job, kicked off by a different Airflow worker, also flushes its own file into the same partition. Both writers think they have committed. The morning dashboard, however, shows only one of the two batches — about ₹4 crore of transactions has silently vanished from analytics. The data is still on S3; the table just doesn't know about it.

Two writers committing to the same table at the same time will collide unless the table format has a protocol for resolving the race. All three modern lakehouse formats use optimistic concurrency — write your data files first, then attempt the metadata pointer flip; if you lose the race, replay your commit against the new state and try again. The trap is that the "atomic flip" must be backed by a real CAS-capable system; on naked S3 it is not, which is the most common cause of silent data loss in lakehouse production.

What "concurrent" actually means at the table layer

A lakehouse table is, mechanically, a folder of Parquet files plus a metadata pointer that says "the table at version N is this set of files". Two writers can safely produce data files in parallel — the filesystem hands out unique paths, and Parquet writers do not interfere with each other. The problem is the pointer flip. Both writers want to publish their commit and become version N+1. Only one of them can win; the other must retry against version N+1 to become version N+2.

So when we say "concurrent writers", we mean writers whose data-file-write phases overlap in wall-clock time, but whose commit phases — the metadata pointer flip — must be linearised. The whole game is in how that linearisation happens.

[Figure: Concurrent writes, serialised commits. Writer A and writer B both write data files in parallel between t=0 and t=20 s; their commit attempts overlap at t=22 s. Writer A's commit succeeds against version N; writer B retries against version N+1 and lands as version N+2. A wins, B replays through the linearisation point: the catalog pointer flip.]
The data-file-write phase can run fully in parallel. The commit phase is serialised through a single atomic primitive — usually a catalog CAS or a filesystem rename. Whoever loses the race re-reads the new state and replays.

Why the data-file phase is parallel-safe but the commit phase is not: filesystems give every file a unique path on creation, so two writers cannot accidentally overwrite each other's data files. But the metadata pointer is a single named cell ("the table is version N"), and only one process can transition it from N to N+1. The whole concurrency story collapses to "what is the atomic-CAS primitive backing that single cell?"

Optimistic concurrency, written out as a protocol

All three modern lakehouse formats — Iceberg, Delta, Hudi — use optimistic concurrency control (OCC). Pessimistic locking would mean every writer takes a lock before writing, which kills throughput on cloud object stores where lock services are expensive and writers are many. OCC instead bets that conflicts are rare and pays the retry cost only when they actually happen.

The protocol, written generically, is six steps:

  1. Read the current snapshot. Writer fetches "the table is version N, here are its data files".
  2. Compute the planned commit. Writer decides "I will add files F1, F2; I will delete file F0; I will record stats S".
  3. Write data files. Writer puts F1 and F2 to S3. These have unique paths and cannot conflict.
  4. Build a new metadata snapshot locally. Writer constructs the version-N+1 metadata: manifest entries for F1, F2; tombstones for F0; updated table-level stats.
  5. Attempt the atomic commit. Writer calls catalog CAS: "if the table is currently at version N, set it to version N+1 with this metadata path".
  6. On success, done. On conflict, retry. If the catalog says "no, the table is now at version N+1", writer re-reads the new state, replays its planned commit against version N+1 (this is where the format's conflict-detection logic lives), and tries CAS again as version N+2.
[Figure: The OCC commit protocol, six steps. 1. Read snapshot N (a cheap catalog read). 2. Plan the commit (add F1, F2; tombstone F0). 3. Write data files (parallel-safe, unique paths). 4. Build the new metadata (in memory, not yet visible). 5. CAS the pointer N → N+1 (the only atomic step). On success, done; on conflict, re-read N+1, replay the plan, and retry.]
Steps 1–4 are speculative — none of them are visible to anyone else. Step 5 is the only step where the table state changes, and it must be a real compare-and-swap.

The protocol's correctness rests on one assumption: step 5 is a true CAS (compare-and-swap), not a "read then write". On a database-backed catalog (Glue, Nessie, JDBC), this is a single transaction. On a pure-filesystem table layout (Delta on local disk, Iceberg with a Hadoop-style catalog), this is an atomic filesystem rename. On naked S3, neither is available, and that is where the failure modes live.

Why "read then write" is not a substitute for CAS: between your read of "version is N" and your write of "version is N+1", another writer can read N and publish its own N+1, which you then overwrite with yours, silently losing their commit. A CAS compares and sets in one indivisible step; read-then-write does not. No amount of "be careful" code on the client fixes this: the storage substrate must offer CAS, or you have lost.

A worked Python implementation that you can run

Below is a complete OCC-based commit coordinator for a tiny lakehouse-style table. It uses SQLite as the catalog (because SQLite gives you real CAS through BEGIN IMMEDIATE), spawns two writers from threads, and shows that one writer wins, the other retries, and the final table state is consistent. Run it; which writer wins first can vary, but the final version and file count always come out the same.

# occ_commit.py — two writers, optimistic concurrency, deterministic correctness.
import os, json, time, uuid, random, sqlite3
from concurrent.futures import ThreadPoolExecutor

ROOT = "/tmp/occ_lake"
DB = f"{ROOT}/_catalog.db"

def init_lake():
    os.makedirs(ROOT, exist_ok=True)
    con = sqlite3.connect(DB, isolation_level=None)
    con.execute("CREATE TABLE IF NOT EXISTS catalog "
                "(table_name TEXT PRIMARY KEY, version INT, manifest TEXT)")
    con.execute("INSERT OR IGNORE INTO catalog VALUES ('payments', 0, '[]')")
    con.close()

def read_snapshot():
    con = sqlite3.connect(DB, isolation_level=None)
    row = con.execute("SELECT version, manifest FROM catalog "
                      "WHERE table_name='payments'").fetchone()
    con.close()
    return row[0], json.loads(row[1])

def write_data_file(writer_id, batch):
    path = f"{ROOT}/{writer_id}_{uuid.uuid4().hex[:8]}.parquet"
    with open(path, "w") as f: json.dump(batch, f)
    return {"path": path, "rows": len(batch), "writer": writer_id}

def cas_commit(expected_version, new_manifest):
    """True CAS via SQLite BEGIN IMMEDIATE."""
    con = sqlite3.connect(DB, isolation_level=None)
    try:
        con.execute("BEGIN IMMEDIATE")
        cur = con.execute("SELECT version FROM catalog "
                          "WHERE table_name='payments'")
        actual = cur.fetchone()[0]
        if actual != expected_version:
            con.execute("ROLLBACK")
            return False
        con.execute("UPDATE catalog SET version=?, manifest=? "
                    "WHERE table_name='payments'",
                    (expected_version + 1, json.dumps(new_manifest)))
        con.execute("COMMIT")
        return True
    finally:
        con.close()

def writer(writer_id, batch):
    # steps 2-3: plan the commit and write the data file once; on a CAS
    # conflict we replay only the metadata, never the data write
    new_file = write_data_file(writer_id, batch)
    retries = 0
    while True:
        version, manifest = read_snapshot()          # step 1 (re-read on retry)
        new_manifest = manifest + [new_file]         # step 4
        if cas_commit(version, new_manifest):        # step 5
            return version + 1, retries
        retries += 1
        time.sleep(random.uniform(0.001, 0.01))      # jittered back-off

if __name__ == "__main__":
    init_lake()
    batches = [[{"txn": f"T{i}-A", "amount": 100} for i in range(50)],
               [{"txn": f"T{i}-B", "amount": 200} for i in range(50)]]
    with ThreadPoolExecutor(max_workers=2) as ex:
        f1 = ex.submit(writer, "A", batches[0])
        f2 = ex.submit(writer, "B", batches[1])
        r1, r2 = f1.result(), f2.result()
    print(f"writer A landed at v{r1[0]} after {r1[1]} retries")
    print(f"writer B landed at v{r2[0]} after {r2[1]} retries")
    final_v, final_m = read_snapshot()
    print(f"final table version: {final_v}; files: {len(final_m)}")
# Sample run:
#   writer A landed at v1 after 0 retries
#   writer B landed at v2 after 1 retries
#   final table version: 2; files: 2

The lines that matter:

con.execute("BEGIN IMMEDIATE") is the keystone. BEGIN IMMEDIATE acquires SQLite's reserved lock up front, so the subsequent SELECT + UPDATE pair executes as a real CAS.

if actual != expected_version: con.execute("ROLLBACK"); return False is the conflict-detect: if the version moved while we were preparing our commit, we abort and let the writer retry.

time.sleep(random.uniform(0.001, 0.01)) is exponential backoff in spirit; without jitter, two retrying writers can keep colliding indefinitely (live-lock).

write_data_file(...) runs outside the CAS. The data file is written speculatively, and it is fine for multiple writers to write data files at once, since each gets a unique UUID path. This is also why writing data files outside the CAS is correct: a data file whose commit never lands is merely an orphan on disk. It appears in no manifest, so no reader sees it, and a periodic cleanup job (Iceberg's remove_orphan_files, Delta's VACUUM) sweeps it up. The cost of an orphan file is storage, not correctness.
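A production retry loop usually grows the sleep window instead of keeping it fixed. A minimal full-jitter sketch (the function name and constants here are illustrative, not from any format's API):

```python
import random
import time

def backoff_sleep(retries, base=0.005, cap=0.5):
    """Full-jitter exponential backoff: sleep a random amount in
    [0, min(cap, base * 2**retries)] so retrying writers desynchronise
    instead of colliding again on the next attempt."""
    time.sleep(random.uniform(0, min(cap, base * 2 ** retries)))

# retry 0 waits at most 5 ms; retry 6 at most 320 ms; retry 7+ is capped at 500 ms
```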

If you replace cas_commit with a non-CAS "read then write" version (no BEGIN IMMEDIATE), the test will lose writes — both threads will read version 0, both will write version=1, and one of them silently overwrites the other. That's the Delta-on-S3 corruption story, simulated.
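The same lost update can be reproduced without SQLite at all. A minimal sketch, with a plain dict standing in for the catalog pointer; the sleep widens the read-to-write window so the race fires deterministically:

```python
import threading
import time

catalog = {"version": 0, "manifest": []}   # the pointer cell, with no CAS

def unsafe_commit(writer_id):
    # read-then-write: read the cell, prepare the commit, write it back
    version = catalog["version"]
    manifest = list(catalog["manifest"])
    time.sleep(0.05)                       # both writers now hold version 0
    catalog["version"] = version + 1       # both set version to 1
    catalog["manifest"] = manifest + [f"{writer_id}.parquet"]

threads = [threading.Thread(target=unsafe_commit, args=(w,)) for w in "AB"]
for t in threads: t.start()
for t in threads: t.join()

# One writer's file is gone: version is 1 and the manifest holds one file.
print(catalog["version"], len(catalog["manifest"]))
```

Whichever thread writes last overwrites the other's manifest, exactly the overwrite described above: both commits "succeed", one vanishes.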

Where each format draws its conflict line

OCC says "if the table moved while you were preparing, retry". But "retry" is only correct if your planned commit can be replayed against the new state. The formats differ on what counts as "the new state has invalidated my plan". Iceberg validates per operation at file granularity: plain appends never conflict with other appends, while overwrites and row-level deletes check whether any intervening snapshot touched the files they read or replaced. Delta runs the commit log through an operation-versus-operation conflict matrix: two blind appends commute, but a transaction that read files another commit rewrote fails with a concurrent-modification error. Hudi scopes conflicts to file groups, so two writers collide only when they touch the same file group.

Aditi's payments table at her Bengaluru fintech has two writers: a streaming one that ingests every 30 s into today's partition, and a backfill that occasionally rewrites yesterday's partition for late-arriving Razorpay reconciliation events. On Iceberg, these never conflict because they touch different partitions. The same is true on Delta. On Hudi, Aditi has additionally configured the streaming writer's file-group hash to be disjoint from the backfill's, which gives her an extra layer of "they cannot collide at all" insurance.

What can go wrong

These are the patterns that show up at month 3 of running concurrent writers in production:

Common confusions

Going deeper

Why DynamoDB is the canonical S3 commit coordinator

When Delta on S3 needs CAS, the standard fix is to put a DynamoDB table in front of the S3 commits. DynamoDB offers conditional writes (write-if-attribute-equals), which gives you the CAS primitive S3 lacks. The Delta DynamoDBLogStore writes the commit log entry to S3 first, then conditionally writes "the latest version is N" to DynamoDB. Two writers race the DynamoDB conditional write; the loser reads the new version, replays, retries. This is exactly the OCC protocol described above, with DynamoDB playing the role of the SQLite BEGIN IMMEDIATE in our toy example. Iceberg's REST catalog and Glue catalog play the same role for Iceberg tables.

How Iceberg's REST catalog separates "table state" from "table contents"

Iceberg's REST catalog spec defines a small API: GetTable, UpdateTable with a requirements precondition list. The requirements list is what implements OCC — a writer says "update the table to this metadata, but only if the current snapshot ID is X". The catalog server checks the precondition under its own transaction (PostgreSQL row lock, DynamoDB conditional write, etc.) and accepts or rejects. This pattern — pushing CAS into a small purpose-built service — is what makes Iceberg portable across cloud providers without re-implementing concurrency for each one.
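The body of such an UpdateTable call, sketched in the spec's requirement/update vocabulary (the UUID and snapshot IDs here are illustrative, not from a real table):

```json
{
  "requirements": [
    { "type": "assert-table-uuid", "uuid": "11111111-2222-3333-4444-555555555555" },
    { "type": "assert-ref-snapshot-id", "ref": "main", "snapshot-id": 4821 }
  ],
  "updates": [
    { "action": "add-snapshot", "snapshot": { "snapshot-id": 4822 } },
    { "action": "set-snapshot-ref", "ref-name": "main", "snapshot-id": 4822, "type": "branch" }
  ]
}
```

If another writer has already moved main past snapshot 4821, the assert-ref-snapshot-id requirement fails, the server rejects the commit, and the client replays. That rejection is step 6 of the OCC protocol, served over HTTP.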

The "watermark commit" optimisation for streaming writers

For streaming writers committing every 30 s, the OCC retry-on-conflict overhead can dominate. The optimisation is to assign each streaming writer its own committer thread and to rate-limit commits to the catalog's throughput. PhonePe's data platform team published a 2023 write-up (cited in references) describing how they reduced commit conflicts on a 12-writer Iceberg table from 30% retry rate to under 2% by serialising the catalog commits through a single coordinator and batching multiple writers' file lists per commit.
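The coordinator idea can be sketched in a few lines: writers hand their file lists to one committer thread, which drains everything pending and folds it into a single catalog commit. Class and field names here are illustrative, not PhonePe's actual code:

```python
import queue
import threading

class SingleCommitter:
    """Toy single-committer: N writers submit file lists; one thread folds
    every pending list into one commit, so the catalog sees one CAS per
    batch instead of one per writer."""
    _STOP = object()

    def __init__(self):
        self.version, self.manifest, self.commits = 0, [], 0
        self._q = queue.Queue()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def submit(self, files):
        done = threading.Event()            # writer waits on this
        self._q.put((files, done))
        return done

    def close(self):
        self._q.put(self._STOP)
        self._thread.join()

    def _loop(self):
        while True:
            item = self._q.get()
            if item is self._STOP:
                return
            batch = [item]
            while True:                     # drain whatever else is queued
                try:
                    nxt = self._q.get_nowait()
                except queue.Empty:
                    break
                if nxt is self._STOP:
                    self._q.put(nxt)        # handle stop on the next pass
                    break
                batch.append(nxt)
            for files, _ in batch:          # one commit covers the batch
                self.manifest += files
            self.version += 1
            self.commits += 1
            for _, done in batch:
                done.set()

if __name__ == "__main__":
    c = SingleCommitter()
    events = [c.submit([f"w{i}.parquet"]) for i in range(12)]
    for e in events:
        e.wait()
    c.close()
    print(f"12 writers, {c.commits} catalog commits, {len(c.manifest)} files")
```

The commit count depends on timing, but it is at most the writer count and usually much lower, which is the whole point: fewer CAS attempts, fewer conflicts.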

Multi-table transactions — the next frontier

OCC as described handles a single table. Real workloads often want "atomically commit two tables" — e.g., insert a payment row and update the merchant's running total. Iceberg's REST spec added a multi-table transaction extension in 2024; Delta's protocol added it via Unity Catalog. Hudi has not added this yet. None of the three formats supports cross-cluster cross-region multi-table transactions; for those, you still build above the table format using sagas or two-phase commit.

Where this leads next

The next chapter, /wiki/time-travel-and-zero-copy-clones-for-data-engineers, uses the snapshot-versioning machinery built up here to talk about reading old snapshots cheaply, and how zero-copy clones are implemented as metadata-only branch operations that share the underlying data files.

For the underlying mechanics of how table formats actually represent commits on disk, read /wiki/iceberg-delta-hudi-from-the-producers-perspective. For why all three formats independently arrived at "commit conflicts at file granularity, not row granularity", read /wiki/compaction-small-files-hell-and-how-to-avoid-it — the same file-granularity choice drives compaction policy.

References