The sink contract: what your downstream must guarantee

A Razorpay engineer ships a Flink pipeline that aggregates UPI transaction events into a merchant_daily_volume MySQL table. The pipeline uses a TwoPhaseCommitSinkFunction, checkpoints every 15 seconds, sets transactional.id correctly. At 2 a.m. on the 14th of the month, the on-call gets paged: the daily-volume number for merchant_id=acme-foods is ₹47 lakh higher than the sum of its transactions. Three days of debugging trace it to a single line in the sink's runbook: the team had switched the table's storage engine from InnoDB to MyISAM "for faster writes". MyISAM has no transactions. The 2PC protocol on the Flink side was a polite request the sink ignored.

Exactly-once is not a stream-processor feature; it is a contract the sink must honour. A correct sink provides four guarantees: it can begin an isolated transaction, it can make writes durable without making them visible (pre-commit), commit and abort are idempotent, and it can reject duplicate transaction ids. If any one guarantee is missing — or silently weakened by a config change — your end-to-end correctness collapses without a single error in the logs.

Why "exactly-once" is a property of two systems, not one

Read the marketing copy of any streaming engine and "exactly-once" sounds like a property of the engine. It is not. Exactly-once is a property of the engine plus the sink, jointly observed. The engine can pre-commit, commit, and abort all day; if the sink does not honour those calls, the records the engine thinks are in a coordinated transaction are actually visible the moment they were written.

A stream-processing pipeline writing to an external system has three places where duplication can creep in:

  1. Source-side replay. The processor recovers from a checkpoint and re-reads records from the source. (Source-side cooperation handles this — Kafka offsets in the checkpoint, JDBC cursors, file watermarks.)
  2. Processor-side double-emission. The processor emits a record, crashes after emit but before checkpoint acknowledgement, and on recovery re-emits. (The 2PC protocol handles this — emissions belong to a transaction that aborts on recovery.)
  3. Sink-side double-application. The 2PC commit() runs twice (recovery), or two different transactions write the same row (replay). The sink's job is to make this a no-op.

(1) and (2) are solved by the engine. (3) is solved by the sink. The chapter you just read built the engine side. This chapter builds the sink side — the four guarantees your downstream must provide for the whole stack to compose into exactly-once.
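The division of labour is easiest to see as code. Below is a minimal in-memory model of the engine driving a sink through one checkpoint cycle; the method names (begin_txn, write, pre_commit, commit, abort) are illustrative, not any framework's real API, and durability is only simulated:

```python
# Minimal model of the engine-side 2PC driver against a sink.
# Hypothetical interface for illustration, not a real framework API.

class InMemorySink:
    def __init__(self):
        self.visible = []   # what readers see (committed rows only)
        self.pending = {}   # txn_id -> buffered rows (pre-committed, invisible)
        self._n = 0
    def begin_txn(self):
        self._n += 1
        tid = f"txn-{self._n}"
        self.pending[tid] = []
        return tid
    def write(self, tid, row):
        self.pending[tid].append(row)
    def pre_commit(self, tid):
        pass                # durable but still invisible in this model
    def commit(self, tid):
        # pop() makes a second commit a no-op: the rows move exactly once
        self.visible += self.pending.pop(tid, [])
    def abort(self, tid):
        self.pending.pop(tid, None)

def run_checkpoint_cycle(sink, records):
    """One checkpoint interval: open a txn, emit, pre-commit at the
    barrier, commit once the global checkpoint is acknowledged."""
    tid = sink.begin_txn()
    for r in records:
        sink.write(tid, r)
    sink.pre_commit(tid)    # phase 1: aligned with the checkpoint barrier
    sink.commit(tid)        # phase 2: after checkpoint acknowledgement
    return tid

sink = InMemorySink()
run_checkpoint_cycle(sink, [{"merchant": "acme-foods", "amount": 100}])
print(sink.visible)         # rows visible only after commit
```

A crash between pre_commit and commit leaves the rows in pending, invisible; on recovery the engine either commits them (checkpoint completed) or aborts them (it did not), which is exactly the window guarantees (2) and (3) exist to cover.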

The four-guarantee contract

A sink that participates in exactly-once must answer "yes" to four questions:

The sink contract — four guarantees, four failure modes:

  1. Transactional isolation. An open transaction's writes are not visible to readers until commit; two concurrent transactions cannot see each other. If missing: pre-commit becomes commit, and readers see records that may later be aborted. Failure mode: dirty reads.
  2. Prepare-commit separation. A transaction can be made durable (prepare) in one call, then committed (made visible) later — and it survives a crash between the two. If missing: pre-commit must commit, so the sink cannot align with the global checkpoint barrier. Failure mode: at-least-once at best.
  3. Idempotent commit and abort. commit(t) called twice has the same effect as once; likewise abort. The sink neither double-applies nor fails on the second call. If missing: recovery duplicates rows, or the second call throws and stalls the pipeline forever. Failure mode: duplicates on recovery.
  4. Duplicate-id rejection. When two transactions present the same id, the sink keeps the first commit and rejects (or no-ops) the second. If missing: a zombie processor (split-brain) writes duplicates — the same problem Kafka producer fencing solves. Failure mode: split-brain duplicates.
The four-guarantee contract. Each guarantee maps to a specific failure mode. A sink that fails any one of them cannot participate in exactly-once, no matter how careful the engine side is.

These four are necessary and sufficient. Necessary: removing any one breaks an end-to-end-exactly-once pipeline as the diagram shows. Sufficient: the TwoPhaseCommitSinkFunction protocol is built entirely on these four — beginTransaction exercises (1), preCommit exercises (2), commit/abort exercises (3), and transactional.id-style reservation exercises (4).

The brutal corollary: most real systems silently weaken one of the guarantees. MyISAM lost (1) when the team switched engines. S3's eventual consistency (pre-2020) weakened (1) at the read side. A naive HTTP webhook lacks all four. ClickHouse's Buffer engine relaxes (2) for throughput. Knowing which guarantee a sink can and cannot honour is the difference between exactly-once and "exactly-once on the slide deck".
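The MyISAM regression is also the cheapest one to guard against: the storage engine is queryable from information_schema. A hedged sketch of a startup guard (the ENGINE column of information_schema.TABLES is standard MySQL; the cursor is whatever your DB-API driver provides, and the schema and table names here are illustrative):

```python
# Startup guard: refuse to run an exactly-once sink against a
# non-transactional storage engine. Works with any DB-API cursor
# connected to MySQL.

TRANSACTIONAL_ENGINES = {"InnoDB"}  # engines that honour guarantee (1)

def assert_transactional_engine(cursor, schema, table):
    cursor.execute(
        "SELECT ENGINE FROM information_schema.TABLES "
        "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s",
        (schema, table),
    )
    row = cursor.fetchone()
    if row is None:
        raise RuntimeError(f"{schema}.{table} not found")
    engine = row[0]
    if engine not in TRANSACTIONAL_ENGINES:
        raise RuntimeError(
            f"{schema}.{table} uses {engine}: no transactions, "
            "so 2PC on the engine side is a polite request the sink ignores")
```

Running this once at job startup turns a silent config change into a loud deploy failure.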

Auditing a real sink against the contract

The cleanest way to internalise the contract is to audit a sink, mechanically, against each guarantee. Here is an auditor that interrogates a sink object's behaviour and returns the contract status — the same checks Razorpay's data-platform team runs in their pre-prod harness before letting a new sink into the exactly-once tier.

# sink_contract_audit.py — interrogate a sink for the four guarantees
import sqlite3, uuid

class SinkContractAuditor:
    """Audits a sink object for the four exactly-once guarantees.
    A 'sink' here exposes: begin_txn() -> handle, write(handle, row),
    pre_commit(handle), commit(handle), abort(handle), reader_query()."""

    def __init__(self, sink):
        self.sink = sink
        self.results = {}

    def audit_isolation(self) -> tuple[bool, str]:
        """Guarantee 1: open transaction's writes invisible to readers."""
        h = self.sink.begin_txn()
        self.sink.write(h, {"id": "audit-iso-1", "v": 100})
        # Pre-commit is allowed to make data durable; it must NOT make it visible.
        self.sink.pre_commit(h)
        rows_before_commit = self.sink.reader_query("audit-iso-1")
        self.sink.commit(h)
        rows_after_commit = self.sink.reader_query("audit-iso-1")
        if rows_before_commit:
            return False, f"dirty read: row visible during pre-commit ({rows_before_commit})"
        if not rows_after_commit:
            return False, "lost write: row not visible after commit"
        return True, "pre-commit invisible, commit visible — isolation honoured"

    def audit_prepare_commit_separation(self) -> tuple[bool, str]:
        """Guarantee 2: pre-commit makes durable; crash between pre-commit and commit
        is recoverable without losing the transaction."""
        h = self.sink.begin_txn()
        self.sink.write(h, {"id": "audit-pcs-1", "v": 200})
        self.sink.pre_commit(h)
        # Simulate a process restart: reload the sink from durable state.
        recovered = self.sink.reload_from_durable_state()
        if h.txn_id not in recovered.list_pending_txns():
            return False, "prepared txn lost on restart — no prepare-commit separation"
        recovered.commit_by_id(h.txn_id)
        if not recovered.reader_query("audit-pcs-1"):
            return False, "row lost despite prepare being durable"
        return True, "prepared txn survived restart and committed cleanly"

    def audit_commit_idempotency(self) -> tuple[bool, str]:
        """Guarantee 3a: commit(h) called twice has same effect as once."""
        h = self.sink.begin_txn()
        self.sink.write(h, {"id": "audit-ci-1", "v": 300})
        self.sink.pre_commit(h)
        self.sink.commit(h)
        try:
            self.sink.commit(h)   # second call: must not throw, must not duplicate
        except Exception as e:
            return False, f"second commit raised {type(e).__name__}: {e}"
        rows = self.sink.reader_query("audit-ci-1")
        if len(rows) != 1:
            return False, f"second commit duplicated row (count={len(rows)})"
        return True, "commit is idempotent"

    def audit_duplicate_id_rejection(self) -> tuple[bool, str]:
        """Guarantee 4: two txns with the same id — second is rejected (or no-op)."""
        h1 = self.sink.begin_txn(forced_id="zombie-001")
        self.sink.write(h1, {"id": "audit-dup-1", "v": 400})
        # A zombie processor restarts and tries to reuse the same id.
        try:
            h2 = self.sink.begin_txn(forced_id="zombie-001")
            self.sink.write(h2, {"id": "audit-dup-1", "v": 999})  # different value
            self.sink.pre_commit(h2)
            self.sink.commit(h2)
        except Exception:
            pass  # rejection via exception is acceptable — at begin, pre-commit, or commit
        # Now the original processor (h1) commits. The sink should pick a winner deterministically.
        try:
            self.sink.pre_commit(h1)
            self.sink.commit(h1)
        except Exception:
            pass
        rows = self.sink.reader_query("audit-dup-1")
        if len(rows) != 1:
            return False, f"split-brain produced {len(rows)} rows for one logical write"
        return True, f"duplicate-id rejection enforced (winner: v={rows[0]['v']})"

    def run_all(self) -> dict:
        for name, fn in [("isolation", self.audit_isolation),
                         ("prepare_commit_separation", self.audit_prepare_commit_separation),
                         ("commit_idempotency", self.audit_commit_idempotency),
                         ("duplicate_id_rejection", self.audit_duplicate_id_rejection)]:
            ok, msg = fn()
            self.results[name] = {"pass": ok, "detail": msg}
        return self.results

# A reference sink that honours all four guarantees (for shape only)
class CorrectSqliteSink:
    def __init__(self, path):
        self.conn = sqlite3.connect(path, isolation_level=None)
        self.conn.execute("CREATE TABLE IF NOT EXISTS t (id TEXT PRIMARY KEY, v INT, txn_id TEXT)")
        self.conn.execute("CREATE TABLE IF NOT EXISTS prepared (txn_id TEXT PRIMARY KEY, committed INT DEFAULT 0)")
        self.conn.execute("CREATE TABLE IF NOT EXISTS reserved_ids (txn_id TEXT PRIMARY KEY)")
        self.path = path
    class _H:
        def __init__(self, txn_id): self.txn_id, self.rows = txn_id, []
    def begin_txn(self, forced_id=None):
        tid = forced_id or f"t-{uuid.uuid4().hex[:8]}"
        try:
            self.conn.execute("INSERT INTO reserved_ids VALUES (?)", (tid,))
        except sqlite3.IntegrityError:
            raise RuntimeError(f"txn id {tid} already in use")  # guarantee 4
        return self._H(tid)
    def write(self, h, row): h.rows.append(row)
    def pre_commit(self, h):
        self.conn.execute("BEGIN IMMEDIATE")
        for r in h.rows:
            self.conn.execute("INSERT OR REPLACE INTO t VALUES (?, ?, ?)", (r["id"], r["v"], h.txn_id))
        self.conn.execute("INSERT OR REPLACE INTO prepared VALUES (?, 0)", (h.txn_id,))
        # Pre-commit kept invisible: rows live in t but a reader filters on prepared.committed=1
        self.conn.execute("COMMIT")
    def commit(self, h):
        row = self.conn.execute("SELECT committed FROM prepared WHERE txn_id=?", (h.txn_id,)).fetchone()
        if row and row[0] == 1: return  # idempotent
        self.conn.execute("UPDATE prepared SET committed=1 WHERE txn_id=?", (h.txn_id,))
    def abort(self, h):
        self.conn.execute("DELETE FROM t WHERE txn_id=?", (h.txn_id,))
        self.conn.execute("DELETE FROM prepared WHERE txn_id=?", (h.txn_id,))
    def reader_query(self, row_id):
        return [dict(id=r[0], v=r[1]) for r in self.conn.execute(
            "SELECT t.id, t.v FROM t JOIN prepared p ON t.txn_id=p.txn_id "
            "WHERE p.committed=1 AND t.id=?", (row_id,))]
    def reload_from_durable_state(self):
        return CorrectSqliteSink(self.path)
    def list_pending_txns(self):
        return [r[0] for r in self.conn.execute("SELECT txn_id FROM prepared WHERE committed=0")]
    def commit_by_id(self, tid):
        self.conn.execute("UPDATE prepared SET committed=1 WHERE txn_id=?", (tid,))

import os
if os.path.exists("/tmp/sink_audit.db"): os.remove("/tmp/sink_audit.db")
sink = CorrectSqliteSink("/tmp/sink_audit.db")
auditor = SinkContractAuditor(sink)
for name, result in auditor.run_all().items():
    mark = "PASS" if result["pass"] else "FAIL"
    print(f"  [{mark}] {name}: {result['detail']}")
  [PASS] isolation: pre-commit invisible, commit visible — isolation honoured
  [PASS] prepare_commit_separation: prepared txn survived restart and committed cleanly
  [PASS] commit_idempotency: commit is idempotent
  [PASS] duplicate_id_rejection: duplicate-id rejection enforced (winner: v=400)

The load-bearing lines: the reserved_ids insert in begin_txn (a primary-key violation is the duplicate-id fence), the reader's join on prepared.committed=1 (what keeps pre-committed rows invisible), and the early return in commit when committed is already 1 (what makes the second commit a no-op).

The auditor runs in 50 ms against a SQLite-backed sink. Run it against your actual production sink (with a test schema) on every CI build, and you catch MyISAM-style regressions before they ship.

Real sinks against the four guarantees

Different real-world sinks honour the contract to different extents. Here is the mapping the Flipkart data-platform team maintains as part of their internal sink-tier registry:

Sink contract compliance — by sink

  sink                     isolation            prepare/commit separation   idempotent commit/abort   duplicate-id rejection
  MySQL InnoDB + XA        yes (RC / RR)        yes (XA PREPARE)            yes (xid-keyed)           yes (xid PK)
  MySQL MyISAM             no                   no                          no                        no
  S3 (multipart upload)    yes (parts hidden)   yes (Complete = visible)    yes (uploadId)            manual (key naming)
  Kafka (transactional)    yes (LSO)            yes (markers)               yes                       yes (epoch fencing)
  ClickHouse (Buffer)      weak (in-memory)     no                          via dedup table           via dedup table
  naive HTTP webhook       no                   no                          no                        no
The compliance table the Flipkart data-platform team uses internally. A sink that has any "no" cannot participate in exactly-once at all — you must add layers (idempotency-key middleware, dedup tables, MERGE keys) until every cell turns green, or accept at-least-once + idempotent on the application key.
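Layering a dedup table over a non-idempotent sink (the ClickHouse Buffer row) looks like this in miniature. The sketch below keeps the dedup state in memory; a production version would back it with a durable table, and the write plus the dedup insert must land atomically (names here are illustrative):

```python
# Idempotency-key middleware: make a non-idempotent write path
# idempotent by recording applied txn ids in a dedup table.
# In-memory model; production backs `applied` with a durable table
# and applies raw_write + the dedup insert in one transaction.

class DedupLayer:
    def __init__(self, raw_write):
        self.raw_write = raw_write   # the non-idempotent underlying write
        self.applied = set()         # dedup table: txn ids already applied
    def commit(self, txn_id, rows):
        if txn_id in self.applied:
            return "deduped"         # second commit is a no-op (guarantee 3)
        self.raw_write(rows)
        self.applied.add(txn_id)
        return "applied"

store = []
layer = DedupLayer(store.extend)
layer.commit("txn-42", [{"id": "r1", "v": 1}])
layer.commit("txn-42", [{"id": "r1", "v": 1}])  # retry after a crash: no-op
print(len(store))  # one row, not two
```

Note that this layer buys guarantees (3) and (4), not (1) and (2): the underlying writes are still visible the moment raw_write runs.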

The "manual (key naming)" cell for S3 deserves a sentence: S3 itself does not enforce duplicate-id rejection, but you can fake it by naming the final object after the transaction id (e.g., s3://warehouse/orders/_committed/<txn_id>.parquet). A second commit attempting to write the same key with the If-None-Match: * conditional header (S3's conditional put, available since 2024) gets a 412 — the duplicate is rejected at the storage layer.
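The commit-object pattern fits in a few lines. The conditional put is simulated in memory here; against real S3 it would be a boto3 put_object call with IfNoneMatch="*", which fails with HTTP 412 when the key already exists (the bucket and key layout are illustrative):

```python
# Model of S3-style duplicate-id rejection via conditional put:
# the final object key is named after the txn id, so the first
# committer wins and every later commit of the same txn is rejected.
# In-memory stand-in for put_object(..., IfNoneMatch="*").

class ConditionalStore:
    def __init__(self):
        self.objects = {}
    def put_if_absent(self, key, body):
        if key in self.objects:
            raise FileExistsError(f"412 PreconditionFailed: {key} exists")
        self.objects[key] = body

def commit_to_s3(store, txn_id, parquet_bytes):
    key = f"warehouse/orders/_committed/{txn_id}.parquet"
    store.put_if_absent(key, parquet_bytes)   # first committer wins
    return key

store = ConditionalStore()
commit_to_s3(store, "txn-7", b"...")
try:
    commit_to_s3(store, "txn-7", b"...")      # zombie's duplicate commit
except FileExistsError:
    print("duplicate commit rejected at the storage layer")
```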

Common confusions

Going deeper

What "isolation" means at the storage-engine level

Guarantee (1) — transactional isolation — is implemented differently in every storage engine, and the differences leak through. InnoDB uses MVCC with undo logs: an open transaction's writes go to the row but are tagged with the transaction's id; readers at READ COMMITTED skip uncommitted versions via the undo log. PostgreSQL uses MVCC with version chains directly in the heap pages. Kafka uses the LSO (last-stable-offset) — readers with isolation.level=read_committed don't read past the LSO, which only advances past a transaction's records when its commit marker lands. S3 uses object-key invisibility — uncompleted multipart uploads are not listable via ListObjects and not GETtable. The contract abstracts all of these, but the failure modes differ: InnoDB can deadlock, Kafka can stall the LSO if a transaction times out, S3 multipart uploads orphan and cost money. Auditing the contract is necessary; auditing the engine-specific failure modes is what runbooks are for.
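Kafka's LSO behaviour rewards a small model: a read_committed reader's horizon is the first offset of the earliest still-open transaction, which is why one stalled transaction holds every reader on the partition back. A simplified sketch (single partition, offsets only, no real Kafka):

```python
# Simplified model of Kafka's last-stable-offset (LSO): a
# read_committed consumer only sees records below the first offset
# of the earliest still-open transaction.

class Partition:
    def __init__(self):
        self.log = []          # (offset, txn_id or None)
        self.open_txns = {}    # txn_id -> first offset written by that txn
    def append(self, txn_id=None):
        off = len(self.log)
        self.log.append((off, txn_id))
        if txn_id is not None and txn_id not in self.open_txns:
            self.open_txns[txn_id] = off
        return off
    def end_txn(self, txn_id):
        # commit or abort marker lands; the txn no longer blocks the LSO
        self.open_txns.pop(txn_id, None)
    def lso(self):
        # first offset of the earliest open txn, else the log end
        return min(self.open_txns.values(), default=len(self.log))

p = Partition()
p.append("t1"); p.append(None); p.append("t2")
print(p.lso())      # t1 still open at offset 0: readers stuck at 0
p.end_txn("t1")
print(p.lso())      # now limited by t2's first offset, 2
```

The non-transactional record at offset 1 is invisible too — collateral damage of the stalled transaction in front of it, which is exactly the "LSO stall" failure mode above.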

Why duplicate-id rejection is "fencing", and why fencing is harder than it looks

Guarantee (4) is the same problem as distributed-system fencing — given that one process may be a zombie (network-partitioned, GC-paused, slow-poke), how does a second process safely take over? The answer is always the same: a monotonic epoch (or equivalent) plus an atomic compare-and-set at the durable boundary. Kafka's transactional.id carries a producer_epoch that the coordinator increments on every InitProducerId; old producers' commits get rejected with INVALID_PRODUCER_EPOCH. This is the same pattern as ZooKeeper's session epochs and Raft's term numbers. The hard part is that the fencing must happen at the durable boundary, not at the application layer — a fence in application code is bypassed by a zombie that lost the in-memory state but kept the network connection. Razorpay's engineering team learned this when a paused JVM held an open Kafka producer connection for 47 seconds, woke up, and tried to commit its old transaction; the fencing in __transaction_state rejected it on the broker side, even though the producer had no idea it had been replaced.
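The epoch-plus-CAS pattern fits in a dozen lines. In this sketch register models an InitProducerId-style takeover (bump the epoch, fencing all predecessors) and commit models the broker-side check; the class and method names are illustrative, not Kafka's actual API:

```python
# Epoch fencing at the durable boundary: each takeover bumps the
# epoch, and commits carrying a stale epoch are rejected — even if
# the zombie's network connection is still alive.

class FencedCoordinator:
    def __init__(self):
        self.epochs = {}   # transactional_id -> current epoch
    def register(self, txn_id):
        """Takeover: bump the epoch, fencing all prior holders."""
        self.epochs[txn_id] = self.epochs.get(txn_id, 0) + 1
        return self.epochs[txn_id]
    def commit(self, txn_id, epoch):
        """The check lives at the durable boundary, not in app code."""
        if epoch != self.epochs.get(txn_id):
            raise PermissionError("INVALID_PRODUCER_EPOCH: fenced zombie")
        return "committed"

coord = FencedCoordinator()
zombie_epoch = coord.register("upi-agg-7")   # original processor
fresh_epoch = coord.register("upi-agg-7")    # restart takes over, fences it
coord.commit("upi-agg-7", fresh_epoch)       # new holder commits fine
try:
    coord.commit("upi-agg-7", zombie_epoch)  # GC-paused zombie wakes up
except PermissionError as e:
    print(e)
```

The 47-second JVM pause above plays out exactly like the last four lines: the zombie's commit carries the old epoch and is rejected server-side, with no cooperation from the zombie required.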

The "exactly-once at the boundary" generalisation

The four-guarantee contract is the streaming-sink instance of a more general principle: whenever two systems compose to provide a property, the property is the conjunction of what each provides. End-to-end exactly-once = source-side replay + engine-side coordination + sink-side contract. Drop any one and you don't get exactly-once; you get a weaker version. This generalises beyond streaming: an end-to-end financial-transaction system needs the originating bank, the network, and the receiving bank to all provide "no double-debit, no double-credit, idempotent retry". UPI's NPCI specification reads almost exactly like the four-guarantee contract — txn_id reservation, prepare/commit at the issuer, idempotent retries, settlement reversal as a known operation. The naming is different; the structure is identical.

Operating cost: the price of every guarantee

Each guarantee costs something. Guarantee (1) — isolation — costs lock contention (InnoDB row locks during XA PREPARE) or version-chain bloat (PostgreSQL bloat from long-running transactions). Guarantee (2) — prepare-commit separation — costs latency (records held for one checkpoint interval before being visible). Guarantee (3) — idempotent commit — costs storage for the dedup state (__transaction_state, prepared-txn tables, idempotency-key tables in Redis). Guarantee (4) — duplicate-id rejection — costs a synchronous coordination round-trip per transaction (the InitProducerId RPC for Kafka, the reservation insert for SQL). At Zerodha's order-stream scale (10 lakh orders/sec at peak during option expiry), the dedup state alone is ~80 GB of Redis memory. The right question for a production team is never "do we want exactly-once" but "are we willing to pay the four costs that buy it".

Where this leads next

The next chapters compose the contract into real workloads.

The pattern to internalise: when a vendor says "exactly-once", ask which of the four guarantees their sink provides — and which ones you must layer on. The vendor is selling the engine; the sink is your problem.
