The sink contract: what your downstream must guarantee
A Razorpay engineer ships a Flink pipeline that aggregates UPI transaction events into a merchant_daily_volume MySQL table. The pipeline uses a TwoPhaseCommitSinkFunction, checkpoints every 15 seconds, sets transactional.id correctly. At 2 a.m. on the 14th of the month, the on-call gets paged: the daily-volume number for merchant_id=acme-foods is ₹47 lakh higher than the sum of its transactions. Three days of debugging trace it to a single line in the sink's runbook: the team had switched the table's storage engine from InnoDB to MyISAM "for faster writes". MyISAM has no transactions. The 2PC protocol on the Flink side was a polite request the sink ignored.
Exactly-once is not a stream-processor feature; it is a contract the sink must honour. A correct sink provides four guarantees: it can begin an isolated transaction, it can make writes durable without making them visible (pre-commit), commit and abort are idempotent, and it can reject duplicate transaction ids. If any one guarantee is missing — or silently weakened by a config change — your end-to-end correctness collapses without a single error in the logs.
Why "exactly-once" is a property of two systems, not one
Read the marketing copy of any streaming engine and "exactly-once" sounds like a property of the engine. It is not. Exactly-once is a property of the engine plus the sink, jointly observed. The engine can pre-commit, commit, and abort all day; if the sink does not honour those calls, the records the engine thinks are in a coordinated transaction are actually visible the moment they were written.
A stream-processing pipeline writing to an external system has three places where duplication can creep in:
- Source-side replay. The processor recovers from a checkpoint and re-reads records from the source. (Source-side cooperation handles this — Kafka offsets in the checkpoint, JDBC cursors, file watermarks.)
- Processor-side double-emission. The processor emits a record, crashes after emit but before checkpoint acknowledgement, and on recovery re-emits. (The 2PC protocol handles this — emissions belong to a transaction that aborts on recovery.)
- Sink-side double-application. The 2PC commit() runs twice (recovery), or two different transactions write the same row (replay). The sink's job is to make this a no-op.
(1) and (2) are solved by the engine. (3) is solved by the sink. The chapter you just read built the engine side. This chapter builds the sink side — the four guarantees your downstream must provide for the whole stack to compose into exactly-once.
The four-guarantee contract
A sink that participates in exactly-once must answer "yes" to four questions:
1. Isolation — can it begin a transaction whose writes are invisible to readers until commit?
2. Prepare/commit separation — can it make writes durable without making them visible (pre-commit)?
3. Idempotent commit/abort — do repeated commit or abort calls have the same effect as one?
4. Duplicate-id rejection — can it refuse a second transaction that reuses an existing transaction id?
These four are necessary and sufficient. Necessary: removing any one breaks an end-to-end exactly-once pipeline, as the diagram shows. Sufficient: the TwoPhaseCommitSinkFunction protocol is built entirely on these four — beginTransaction exercises (1), preCommit exercises (2), commit/abort exercises (3), and transactional.id-style reservation exercises (4).
The brutal corollary: most real systems silently weaken one of the guarantees. MyISAM lost (1) when the team switched engines. S3's eventual consistency (pre-2020) weakened (1) at the read side. A naive HTTP webhook lacks all four. ClickHouse's Buffer engine relaxes (2) for throughput. Knowing which guarantee a sink can and cannot honour is the difference between exactly-once and "exactly-once on the slide deck".
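Written as an interface, the contract is four methods plus a reservation rule. A minimal sketch — the names (ExactlyOnceSink, TxnHandle) are illustrative, not from any particular engine:

```python
from typing import Protocol

class TxnHandle(Protocol):
    txn_id: str

class ExactlyOnceSink(Protocol):
    """The four-guarantee contract as an interface. Each method maps to one
    of the guarantees described above."""

    def begin_txn(self) -> TxnHandle: ...            # (1) isolated transaction
    def write(self, h: TxnHandle, row: dict) -> None: ...
    def pre_commit(self, h: TxnHandle) -> None: ...  # (2) durable but invisible
    def commit(self, h: TxnHandle) -> None: ...      # (3) must be idempotent
    def abort(self, h: TxnHandle) -> None: ...       # (3) must be idempotent
    # (4) duplicate-id rejection lives inside begin_txn: a second call that
    # reuses an existing txn id must fail or be fenced.
```

Any concrete sink — SQL, Kafka, object store — is audited against this shape, not against its native API.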
Auditing a real sink against the contract
The cleanest way to internalise the contract is to audit a sink, mechanically, against each guarantee. Here is an auditor that interrogates a sink object's behaviour and returns the contract status — the same checks Razorpay's data-platform team runs in their pre-prod harness before letting a new sink into the exactly-once tier.
# sink_contract_audit.py — interrogate a sink for the four guarantees
import sqlite3, uuid

class SinkContractAuditor:
    """Audits a sink object for the four exactly-once guarantees.

    A 'sink' here exposes: begin_txn() -> handle, write(handle, row),
    pre_commit(handle), commit(handle), abort(handle), reader_query()."""

    def __init__(self, sink):
        self.sink = sink
        self.results = {}

    def audit_isolation(self) -> tuple[bool, str]:
        """Guarantee 1: open transaction's writes invisible to readers."""
        h = self.sink.begin_txn()
        self.sink.write(h, {"id": "audit-iso-1", "v": 100})
        # Pre-commit is allowed to make data durable; it must NOT make it visible.
        self.sink.pre_commit(h)
        rows_before_commit = self.sink.reader_query("audit-iso-1")
        self.sink.commit(h)
        rows_after_commit = self.sink.reader_query("audit-iso-1")
        if rows_before_commit:
            return False, f"dirty read: row visible during pre-commit ({rows_before_commit})"
        if not rows_after_commit:
            return False, "lost write: row not visible after commit"
        return True, "pre-commit invisible, commit visible — isolation honoured"
    def audit_prepare_commit_separation(self) -> tuple[bool, str]:
        """Guarantee 2: pre-commit makes durable; crash between pre-commit and
        commit is recoverable without losing the transaction."""
        h = self.sink.begin_txn()
        self.sink.write(h, {"id": "audit-pcs-1", "v": 200})
        self.sink.pre_commit(h)
        # Simulate a process restart: reload the sink from durable state.
        recovered = self.sink.reload_from_durable_state()
        if h.txn_id not in recovered.list_pending_txns():
            return False, "prepared txn lost on restart — no prepare-commit separation"
        recovered.commit_by_id(h.txn_id)
        if not recovered.reader_query("audit-pcs-1"):
            return False, "row lost despite prepare being durable"
        return True, "prepared txn survived restart and committed cleanly"
    def audit_commit_idempotency(self) -> tuple[bool, str]:
        """Guarantee 3a: commit(h) called twice has same effect as once."""
        h = self.sink.begin_txn()
        self.sink.write(h, {"id": "audit-ci-1", "v": 300})
        self.sink.pre_commit(h)
        self.sink.commit(h)
        try:
            self.sink.commit(h)  # second call: must not throw, must not duplicate
        except Exception as e:
            return False, f"second commit raised {type(e).__name__}: {e}"
        rows = self.sink.reader_query("audit-ci-1")
        if len(rows) != 1:
            return False, f"second commit duplicated row (count={len(rows)})"
        return True, "commit is idempotent"
    def audit_duplicate_id_rejection(self) -> tuple[bool, str]:
        """Guarantee 4: two txns with the same id — second is rejected (or no-op)."""
        h1 = self.sink.begin_txn(forced_id="zombie-001")
        self.sink.write(h1, {"id": "audit-dup-1", "v": 400})
        # A zombie processor restarts and tries to reuse the same id. A correct
        # sink may reject at begin_txn, pre_commit, or commit — rejection via
        # exception at any of these steps is acceptable, so the whole attempt
        # sits inside the try.
        try:
            h2 = self.sink.begin_txn(forced_id="zombie-001")
            self.sink.write(h2, {"id": "audit-dup-1", "v": 999})  # different value
            self.sink.pre_commit(h2)
            self.sink.commit(h2)
        except Exception:
            pass  # rejection via exception is acceptable
        # Now the original processor (h1) commits. The sink should pick a winner
        # deterministically.
        try:
            self.sink.pre_commit(h1)
            self.sink.commit(h1)
        except Exception:
            pass
        rows = self.sink.reader_query("audit-dup-1")
        if len(rows) != 1:
            return False, f"split-brain produced {len(rows)} rows for one logical write"
        return True, f"duplicate-id rejection enforced (winner: v={rows[0]['v']})"
    def run_all(self) -> dict:
        for name, fn in [("isolation", self.audit_isolation),
                         ("prepare_commit_separation", self.audit_prepare_commit_separation),
                         ("commit_idempotency", self.audit_commit_idempotency),
                         ("duplicate_id_rejection", self.audit_duplicate_id_rejection)]:
            ok, msg = fn()
            self.results[name] = {"pass": ok, "detail": msg}
        return self.results
# A reference sink that honours all four guarantees (for shape only)
class CorrectSqliteSink:
    def __init__(self, path):
        self.conn = sqlite3.connect(path, isolation_level=None)
        self.conn.execute("CREATE TABLE IF NOT EXISTS t (id TEXT PRIMARY KEY, v INT, txn_id TEXT)")
        self.conn.execute("CREATE TABLE IF NOT EXISTS prepared (txn_id TEXT PRIMARY KEY, committed INT DEFAULT 0)")
        self.conn.execute("CREATE TABLE IF NOT EXISTS reserved_ids (txn_id TEXT PRIMARY KEY)")
        self.path = path

    class _H:
        def __init__(self, txn_id): self.txn_id, self.rows = txn_id, []

    def begin_txn(self, forced_id=None):
        tid = forced_id or f"t-{uuid.uuid4().hex[:8]}"
        try:
            self.conn.execute("INSERT INTO reserved_ids VALUES (?)", (tid,))
        except sqlite3.IntegrityError:
            raise RuntimeError(f"txn id {tid} already in use")  # guarantee 4
        return self._H(tid)

    def write(self, h, row): h.rows.append(row)

    def pre_commit(self, h):
        self.conn.execute("BEGIN IMMEDIATE")
        for r in h.rows:
            self.conn.execute("INSERT OR REPLACE INTO t VALUES (?, ?, ?)", (r["id"], r["v"], h.txn_id))
        self.conn.execute("INSERT OR REPLACE INTO prepared VALUES (?, 0)", (h.txn_id,))
        # Pre-commit kept invisible: rows live in t but a reader filters on prepared.committed=1
        self.conn.execute("COMMIT")

    def commit(self, h):
        row = self.conn.execute("SELECT committed FROM prepared WHERE txn_id=?", (h.txn_id,)).fetchone()
        if row and row[0] == 1: return  # idempotent
        self.conn.execute("UPDATE prepared SET committed=1 WHERE txn_id=?", (h.txn_id,))

    def abort(self, h):
        self.conn.execute("DELETE FROM t WHERE txn_id=?", (h.txn_id,))
        self.conn.execute("DELETE FROM prepared WHERE txn_id=?", (h.txn_id,))

    def reader_query(self, row_id):
        return [dict(id=r[0], v=r[1]) for r in self.conn.execute(
            "SELECT t.id, t.v FROM t JOIN prepared p ON t.txn_id=p.txn_id "
            "WHERE p.committed=1 AND t.id=?", (row_id,))]

    def reload_from_durable_state(self):
        return CorrectSqliteSink(self.path)

    def list_pending_txns(self):
        return [r[0] for r in self.conn.execute("SELECT txn_id FROM prepared WHERE committed=0")]

    def commit_by_id(self, tid):
        self.conn.execute("UPDATE prepared SET committed=1 WHERE txn_id=?", (tid,))
import os
if os.path.exists("/tmp/sink_audit.db"): os.remove("/tmp/sink_audit.db")

sink = CorrectSqliteSink("/tmp/sink_audit.db")
auditor = SinkContractAuditor(sink)
for name, result in auditor.run_all().items():
    mark = "PASS" if result["pass"] else "FAIL"
    print(f" [{mark}] {name}: {result['detail']}")
[PASS] isolation: pre-commit invisible, commit visible — isolation honoured
[PASS] prepare_commit_separation: prepared txn survived restart and committed cleanly
[PASS] commit_idempotency: commit is idempotent
[PASS] duplicate_id_rejection: duplicate-id rejection enforced (winner: v=400)
The load-bearing lines:
- pre_commit writes rows into t but only flips prepared.committed=1 in commit. Why: this is the cheapest possible implementation of guarantee (1) — the row is durable in t (so a crash recovers it), but the reader_query filter p.committed=1 makes it invisible to readers until commit. In real systems this same trick is implemented by transaction logs, snapshot isolation, or LSO markers; the structural pattern is identical.
- begin_txn reserves the id atomically via the reserved_ids PRIMARY KEY constraint. Why: guarantee (4) needs a single linearisable point that says "this id is taken". A SQL PRIMARY KEY does this for free — the second insert fails with IntegrityError. Kafka achieves the same with the __transaction_state topic and producer-id fencing; an HTTP sink can use a Redis SETNX on the idempotency key.
- commit short-circuits if prepared.committed is already 1. Why: idempotency is checked, not assumed. A naive sink that just runs UPDATE prepared SET committed=1 would technically be idempotent for this row, but if commit also triggered downstream emissions (an HTTP call, a Kafka write), each duplicate commit would re-fire them. The pattern: every side-effect inside commit must be guarded by an "already done?" check.
- reload_from_durable_state returns a fresh sink instance reading the same SQLite file. This simulates a process crash and restart — the prepared transactions survive in prepared with committed=0, and the recovered sink can finish them. The auditor's audit_prepare_commit_separation test exploits exactly this.
- The audit_duplicate_id_rejection test deliberately gives both txns the same id, "zombie-001". A correctly-fenced sink rejects the second; a broken sink would commit both, leaving two rows for the same logical write. This is the same fencing that Kafka's epoch mechanism enforces — every change of leadership for a transactional.id bumps the epoch and invalidates the previous one.
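The Redis SETNX idea can be sketched without a Redis server. The FakeRedis class below is a stand-in for the atomic SET key value NX operation, and reserve_txn_id is an illustrative helper, not a real client API:

```python
class FakeRedis:
    """Stand-in for Redis: set(key, value, nx=True) succeeds only if the key
    did not already exist — the same atomicity SETNX provides."""
    def __init__(self):
        self._store = {}

    def set(self, key, value, nx=False):
        if nx and key in self._store:
            return None          # Redis returns nil when the NX condition fails
        self._store[key] = value
        return True

def reserve_txn_id(redis, txn_id, owner):
    """Guarantee (4): the first caller wins the id; every later caller loses."""
    return bool(redis.set(f"txn:{txn_id}", owner, nx=True))

r = FakeRedis()
assert reserve_txn_id(r, "zombie-001", "worker-a") is True   # original processor
assert reserve_txn_id(r, "zombie-001", "worker-b") is False  # zombie is fenced
```

The single linearisable point is the key's existence check inside set — exactly the role the reserved_ids PRIMARY KEY plays in the SQLite sink.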
The auditor runs in about 50 ms against a SQLite-backed sink. Run it against your actual production sink (with a test schema) on every CI build, and you catch MyISAM-style regressions before they ship.
Real sinks against the four guarantees
Different real-world sinks honour the contract to different extents. Here is the mapping the Flipkart data-platform team maintains as part of their internal sink-tier registry:

| Sink | (1) Isolation | (2) Prepare/commit separation | (3) Idempotent commit | (4) Duplicate-id rejection |
|---|---|---|---|---|
| Kafka (transactions) | yes (LSO) | yes | yes | yes (producer-epoch fencing) |
| MySQL InnoDB (XA) | yes | yes (XA PREPARE) | yes | manual (reservation table) |
| MySQL MyISAM | no | no | no | no |
| S3 (multipart upload) | yes | yes | yes | manual (key naming) |
| Naive HTTP webhook | no | no | wrapper (idempotency keys) | wrapper (idempotency keys) |
The "manual (key naming)" cell for S3 deserves a sentence: S3 itself does not enforce duplicate-id rejection, but you can fake it by naming the final object after the transaction id (e.g., s3://warehouse/orders/_committed/<txn_id>.parquet). A second commit attempting to write the same key with If-None-Match: * (S3's conditional put, available since 2024) gets a 412 — the duplicate is rejected at the storage layer.
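The conditional-put commit can be modelled locally. The FakeObjectStore below is a stand-in for a bucket, and put_if_absent plays the role of a PutObject with If-None-Match: * — a real client would see HTTP 412 on the second attempt:

```python
class FakeObjectStore:
    """Stand-in for an S3 bucket: put_if_absent models the conditional put."""
    def __init__(self):
        self.objects = {}

    def put_if_absent(self, key, body):
        if key in self.objects:
            # Models S3 rejecting the write with 412 Precondition Failed
            raise RuntimeError(f"412 PreconditionFailed: {key} already exists")
        self.objects[key] = body

def commit_manifest(store, txn_id, manifest):
    """The commit object is named after the txn id, so a duplicate commit
    collides on the key and is rejected at the storage layer."""
    store.put_if_absent(f"warehouse/orders/_committed/{txn_id}.json", manifest)

store = FakeObjectStore()
commit_manifest(store, "txn-42", b'{"files": ["part-0.parquet"]}')
try:
    commit_manifest(store, "txn-42", b'{"files": ["part-0.parquet"]}')  # duplicate
except RuntimeError as e:
    print("duplicate commit rejected:", e)
```

The design choice worth noting: the transaction id lives in the key, not the body, so the dedup happens in the storage layer's namespace — no reader coordination required.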
Common confusions
- "Exactly-once means duplicates never reach the sink." No — duplicates absolutely reach the sink. The contract is that the sink absorbs them: a duplicate commit(t) is a no-op, a duplicate transactional.id is rejected. Razorpay's pre-prod harness shows 3–5 duplicate commits per checkpoint failure during chaos testing; they all dedup correctly, and the auditor reports zero observable double-applies.
- "InnoDB and MyISAM are interchangeable for OLAP-style writes." No — MyISAM has no transactions, no XA PREPARE, no XA COMMIT. A pipeline that writes to MyISAM with a 2PC sink protocol on the engine side is silently running at-least-once. The query plan looks the same, the writes complete, but the contract is broken. Flipkart's catalogue team caught this exact regression in 2023 when an old "for performance" tweak surfaced during an exactly-once migration.
- "Idempotency is something the application code does." Partly. Application-key idempotency (INSERT ... ON DUPLICATE KEY UPDATE) handles re-emitted records; sink-protocol idempotency handles re-issued commit(t) calls. They are different layers. A correct exactly-once stack uses both — application-key idempotency for source-side replay, sink-protocol idempotency for processor-side recovery.
- "S3 has no concept of a transaction so it can't be exactly-once." Wrong — S3's multipart upload is the transaction. CreateMultipartUpload is beginTransaction, UploadPart is write, CompleteMultipartUpload is commit, AbortMultipartUpload is abort. Parts are not visible until completion. This satisfies guarantees (1)–(3); guarantee (4) needs the conditional-put trick or a Glue-catalog reservation table.
- "If the engine guarantees exactly-once, the sink doesn't matter." This is the marketing-deck error. The engine's guarantee is conditional on the sink honouring the contract. A pipeline reading "exactly-once" in the Flink dashboard while writing to a MyISAM table is fooling itself — the dashboard reflects engine state, not sink state.
- "You can add exactly-once to a sink with a wrapper layer." Sometimes. For HTTP, an idempotency-key middleware can give you guarantees (3) and (4); but it cannot give you (1) or (2) — a partially applied POST is visible immediately. The wrapper-layer pattern works only when the underlying sink already provides isolation and you only need to add the recovery semantics on top. Otherwise you need a "stage then commit" architecture: write to S3 first (which honours the contract), then publish a manifest, then have the HTTP endpoint pull from the manifest.
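The application-key layer from the idempotency confusion above can be sketched with SQLite's upsert standing in for MySQL's INSERT ... ON DUPLICATE KEY UPDATE; the applied table and apply_event helper are illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE volume (merchant_id TEXT PRIMARY KEY, amount INT)")
conn.execute("CREATE TABLE applied (event_id TEXT PRIMARY KEY)")

def apply_event(merchant_id, amount, event_id):
    """Application-key idempotency: a replayed event (same event_id) is
    dropped before it touches the aggregate. Absorbs source-side replay."""
    cur = conn.execute("INSERT OR IGNORE INTO applied VALUES (?)", (event_id,))
    if cur.rowcount == 0:
        return  # event already applied — replay absorbed
    conn.execute(
        "INSERT INTO volume VALUES (?, ?) "
        "ON CONFLICT(merchant_id) DO UPDATE SET amount = amount + excluded.amount",
        (merchant_id, amount))

apply_event("acme-foods", 500, "evt-1")
apply_event("acme-foods", 500, "evt-1")   # replayed — must not double-count
apply_event("acme-foods", 250, "evt-2")
total = conn.execute(
    "SELECT amount FROM volume WHERE merchant_id='acme-foods'").fetchone()[0]
print(total)  # 750, not 1250
```

This layer handles re-emitted records only; re-issued commit(t) calls still need the sink-protocol idempotency the auditor checks.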
Going deeper
What "isolation" means at the storage-engine level
Guarantee (1) — transactional isolation — is implemented differently in every storage engine, and the differences leak through. InnoDB uses MVCC with undo logs: an open transaction's writes go to the row but are tagged with the transaction's id; readers at READ COMMITTED skip uncommitted versions via the undo log. PostgreSQL uses MVCC with version chains directly in the heap pages. Kafka uses the LSO (last stable offset) — readers with isolation.level=read_committed don't read past the LSO, which only advances past a transaction's records when its commit marker lands. S3 uses object-key invisibility — uncompleted multipart uploads are not listable via ListObjects and not retrievable via GET. The contract abstracts all of these, but the failure modes differ: InnoDB can deadlock, Kafka can stall the LSO if a transaction times out, S3 multipart uploads orphan and cost money. Auditing the contract is necessary; auditing the engine-specific failure modes is what runbooks are for.
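The LSO behaviour is worth modelling, because it explains both the isolation and the stall. A toy log of (txn_id, record) pairs — structure only; real brokers track this in the transaction index:

```python
log = [("t1", "a"), ("t2", "b"), ("t1", "c")]   # (txn_id, record) pairs
open_txns = {"t2"}                               # t2 has not committed yet

def last_stable_offset(log, open_txns):
    """LSO = offset of the first record from a still-open transaction; if no
    transaction is open, every appended record is stable."""
    for offset, (txn, _rec) in enumerate(log):
        if txn in open_txns:
            return offset
    return len(log)

def read_committed(log, open_txns, aborted):
    """A read_committed consumer never reads past the LSO, and drops records
    from aborted transactions below it."""
    lso = last_stable_offset(log, open_txns)
    return [rec for txn, rec in log[:lso] if txn not in aborted]

print(read_committed(log, open_txns, aborted=set()))  # ['a']
```

Note that "c" belongs to committed t1 but is still held back: it sits past the LSO, which is stalled at the earliest open transaction — exactly the stall the runbook paragraph warns about.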
Why duplicate-id rejection is "fencing", and why fencing is harder than it looks
Guarantee (4) is the same problem as distributed-system fencing — given that one process may be a zombie (network-partitioned, GC-paused, slow-poke), how does a second process safely take over? The answer is always the same: a monotonic epoch (or equivalent) plus an atomic compare-and-set at the durable boundary. Kafka's transactional.id carries a producer_epoch that the coordinator increments on every InitProducerId; old producers' commits get rejected with INVALID_PRODUCER_EPOCH. This is the same pattern as ZooKeeper's session epochs and Raft's term numbers. The hard part is that the fencing must happen at the durable boundary, not at the application layer — a fence in application code is bypassed by a zombie that lost the in-memory state but kept the network connection. Razorpay's engineering team learned this when a paused JVM held an open Kafka producer connection for 47 seconds, woke up, and tried to commit its old transaction; the fencing in __transaction_state rejected it on the broker side, even though the producer had no idea it had been replaced.
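The epoch-plus-CAS shape can be sketched directly. The FencingStore below is an illustrative model of the durable boundary: register plays the role of Kafka's InitProducerId (bump the epoch, fencing all priors), and commit performs the compare-and-set that rejects stale epochs:

```python
class FencingStore:
    """The durable boundary: tracks the current epoch per transactional id
    and rejects any commit carrying a stale epoch."""
    def __init__(self):
        self.epochs = {}      # transactional_id -> current epoch
        self.committed = []

    def register(self, txn_id):
        """Analogue of InitProducerId: bump the epoch, invalidating priors."""
        self.epochs[txn_id] = self.epochs.get(txn_id, -1) + 1
        return self.epochs[txn_id]

    def commit(self, txn_id, epoch, payload):
        # Compare-and-set at the durable boundary: a zombie holding an old
        # epoch is rejected here, whatever its in-memory state believes.
        if epoch != self.epochs.get(txn_id):
            raise RuntimeError("INVALID_PRODUCER_EPOCH: fenced")
        self.committed.append(payload)

store = FencingStore()
old_epoch = store.register("pipeline-7")   # original producer
new_epoch = store.register("pipeline-7")   # restarted producer fences it
store.commit("pipeline-7", new_epoch, "fresh work")
try:
    store.commit("pipeline-7", old_epoch, "zombie work")  # GC-paused old JVM
except RuntimeError as e:
    print(e)
```

The point of the sketch: the fence lives inside commit, at the store, not in the producer's application code — which is why the 47-second-paused JVM in the anecdote was rejected despite never noticing it had been replaced.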
The "exactly-once at the boundary" generalisation
The four-guarantee contract is the streaming-sink instance of a more general principle: whenever two systems compose to provide a property, the property is the conjunction of what each provides. End-to-end exactly-once = source-side replay + engine-side coordination + sink-side contract. Drop any one and you don't get exactly-once; you get a weaker version. This generalises beyond streaming: an end-to-end financial-transaction system needs the originating bank, the network, and the receiving bank to all provide "no double-debit, no double-credit, idempotent retry". UPI's NPCI specification reads almost exactly like the four-guarantee contract — txn_id reservation, prepare/commit at the issuer, idempotent retries, settlement reversal as a known operation. The naming is different; the structure is identical.
Operating cost: the price of every guarantee
Each guarantee costs something. Guarantee (1) — isolation — costs lock contention (InnoDB row locks during XA PREPARE) or version-chain bloat (PostgreSQL bloat from long-running transactions). Guarantee (2) — prepare-commit separation — costs latency (records held for one checkpoint interval before being visible). Guarantee (3) — idempotent commit — costs storage for the dedup state (__transaction_state, prepared-txn tables, idempotency-key tables in Redis). Guarantee (4) — duplicate-id rejection — costs a synchronous coordination round-trip per transaction (the InitProducerId RPC for Kafka, the reservation insert for SQL). At Zerodha's order-stream scale (10 lakh orders/sec at peak during option expiry), the dedup state alone is ~80 GB of Redis memory. The right question for a production team is never "do we want exactly-once" but "are we willing to pay the four costs that buy it".
Where this leads next
The next chapters compose the contract into real workloads:
- /wiki/end-to-end-source-process-sink — how a Kafka source's offset commit is yoked to a 2PC sink's commit, so the four guarantees compose end-to-end.
- /wiki/wall-exactly-once-is-a-lie-everyone-tells — the philosophical limit. Even with all four guarantees met, "exactly-once" assumes a closed system; the moment a side-effect crosses to a non-cooperating sink, the contract weakens.
- /wiki/idempotency-everywhere-application-key-driven — the alternative when 2PC is overkill: at-least-once delivery + idempotent application keys, which is what most Indian fintech analytics pipelines actually run.
The pattern to internalise: when a vendor says "exactly-once", ask which of the four guarantees their sink provides — and which ones you must layer on. The vendor is selling the engine; the sink is your problem.
References
- Apache Kafka KIP-98: Exactly-Once Delivery and Transactional Messaging — the producer-fencing and transactional-id design that operationalises guarantee (4) at scale.
- Apache Flink: An Overview of End-to-End Exactly-Once Processing (Nowojski, 2018) — the article whose four-method skeleton this chapter audits.
- MySQL XA Transactions documentation — the XA prepare/commit/rollback semantics that satisfy guarantees (1)–(3) for InnoDB sinks.
- Stripe Engineering: Designing robust and predictable APIs with idempotency — the idempotency-key contract that makes naive HTTP webhooks honour guarantees (3) and (4).
- AWS S3 multipart upload overview — how CreateMultipartUpload / UploadPart / CompleteMultipartUpload map to the four-method skeleton.
- /wiki/flinks-two-phase-commit-sink — the engine-side protocol the contract supports.
- /wiki/kafka-transactions-under-the-hood — the wire-level mechanism that makes Kafka satisfy all four guarantees.
- Pat Helland — Idempotence is not a medical condition (CIDR 2012) — the foundational essay on why idempotency is a system-design property, not a property of one method.