Flink's two-phase-commit sink
A Swiggy data engineer is mirroring an order-status stream from Kafka into a MySQL orders_live table that the customer-support dashboard queries. The Kafka side is exactly-once — transactional.id and WriteTxnMarkers hold the line. The MySQL side is not. A checkpoint failure between "rows inserted" and "Kafka offset committed" replays 12 minutes of orders and the dashboard shows duplicates. The fix is not to patch the sink — it is to wrap the sink in Flink's TwoPhaseCommitSinkFunction, which is a generic protocol for making any external system participate in Flink's checkpoint as a transactional sink.
A Flink job's checkpoint is a global cut. To make an external sink (MySQL, S3, HTTP) exactly-once, the sink must implement four methods — beginTransaction, preCommit, commit, abort — that map onto Flink's checkpoint barriers. Pre-commit happens when the barrier passes the sink; the actual commit happens when the JobManager confirms the global checkpoint succeeded. Crash between pre-commit and commit, and recovery replays the commit; crash before pre-commit and recovery replays the whole transaction.
The problem: Kafka's protocol does not generalise
Kafka's transactional protocol (InitProducerId, AddPartitionsToTxn, EndTxn, WriteTxnMarkers) works because Kafka owns both ends of the writer. The coordinator persists __transaction_state, the partition leaders honour the markers, and the consumer-side LSO hides uncommitted records. None of that is portable. MySQL has its own transaction system. S3 has none — every PutObject is immediately visible. An HTTP webhook has neither transactions nor read-your-writes.
So the question is: what is the minimum interface an external system must expose for a Flink job to write to it exactly-once? The answer Flink settled on (TwoPhaseCommitSinkFunction, introduced in 1.4) is four methods:
- beginTransaction() → returns a handle to a fresh, isolated transaction
- preCommit(handle) → flush all data; the transaction is now durable but NOT yet visible to readers
- commit(handle) → make the transaction visible to readers (idempotent)
- abort(handle) → discard the transaction's data (idempotent)
If your external system can implement these four methods — even with creative interpretations of "isolated" and "visible" — Flink can plug it into the checkpoint machinery and give you exactly-once writes.
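Expressed as code, the contract is compact. A minimal Python sketch of the four-method interface (the names mirror the Java base class, but the signatures here are illustrative, not Flink's actual generics):

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

TXN = TypeVar("TXN")  # whatever handle the external system needs


class TwoPhaseCommitSink(ABC, Generic[TXN]):
    """The four-method contract a sink must satisfy to participate
    in Flink's checkpoint-driven two-phase commit."""

    @abstractmethod
    def begin_transaction(self) -> TXN:
        """Return a fresh, isolated transaction handle."""

    @abstractmethod
    def pre_commit(self, txn: TXN) -> None:
        """Flush; after this returns, the data is durable but NOT visible."""

    @abstractmethod
    def commit(self, txn: TXN) -> None:
        """Make the transaction visible to readers. MUST be idempotent."""

    @abstractmethod
    def abort(self, txn: TXN) -> None:
        """Discard the transaction's data. MUST be idempotent."""
```

Any concrete sink, MySQL, S3, or HTTP, is a subclass that interprets "isolated" and "visible" in its own system's terms.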
How a checkpoint barrier drives the sink
Flink's Chandy-Lamport-style checkpoint algorithm is covered in /wiki/checkpointing-the-consistent-snapshot-algorithm. The relevant bit for sinks: the JobManager periodically injects a barrier into every source. Each operator, upon seeing the barrier on all its input channels, snapshots its state and forwards the barrier downstream. When the barrier reaches every sink, the global checkpoint completes — the JobManager writes a checkpoint metadata record and notifies every operator via notifyCheckpointComplete(checkpointId).
A TwoPhaseCommitSinkFunction hooks into exactly two of those events: barrier arrives at sink (call preCommit) and notifyCheckpointComplete (call commit). That is the entire mapping.
The lifecycle of one transaction, end to end:

1. beginTransaction() — called when the sink starts processing data after the previous checkpoint. Allocates a new transaction handle (e.g., a MySQL START TRANSACTION and the resulting connection, or an S3 multipart upload id, or a unique idempotency key for an HTTP endpoint).
2. Records flow in. The sink writes them through the open transaction handle. In MySQL, that means INSERT statements on the open connection. In S3, UploadPart calls. In HTTP, buffered into a local file keyed by the handle.
3. Barrier arrives at the sink. Flink calls snapshotState(checkpointId), which internally calls preCommit(handle). For MySQL, this is not COMMIT — it is a flush + an XA PREPARE-equivalent (or storing the connection handle in operator state without committing). The transaction is durable but not visible.
4. Sink rolls forward to a fresh transaction. It calls beginTransaction() again so subsequent records belong to checkpoint N+1, not N.
5. JobManager's notifyCheckpointComplete(N). All operators acknowledged checkpoint N, the metadata file is written. Now the sink calls commit(handle for N) — for MySQL, this is the actual COMMIT. For S3, CompleteMultipartUpload. For HTTP, the buffered file gets POSTed.
Crash recovery is the cleanest part of the protocol. On restart, Flink restores operator state from the latest checkpoint; the sink's state contains the handle of any pending transaction. The sink calls commit(restored_handle) to redo the commit (idempotent — the transaction may already be committed, in which case the call is a no-op) and any older pending transactions are abort()ed.
A working TwoPhaseCommitSinkFunction for MySQL
The cleanest way to internalise the lifecycle is to write the four methods. This is a faithful translation of the Java interface to Python — the actual Flink class is in org.apache.flink.streaming.api.functions.sink, but the logic is the same.
```python
# flink_2pc_sink.py — a TwoPhaseCommitSinkFunction modelled in Python
import sqlite3, uuid, time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class MySQLTxnHandle:
    txn_id: str
    rows: list = field(default_factory=list)
    prepared_at: Optional[float] = None
    committed: bool = False


class TwoPhaseCommitMySQLSink:
    """Mirrors Flink's TwoPhaseCommitSinkFunction lifecycle. Persists pending
    transactions to the operator's state so recovery can replay commits."""

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("CREATE TABLE IF NOT EXISTS orders_live (id TEXT PRIMARY KEY, status TEXT, ts REAL)")
        self.conn.execute("CREATE TABLE IF NOT EXISTS _txn_log (txn_id TEXT PRIMARY KEY, prepared_at REAL)")
        self.conn.commit()
        self.current: Optional[MySQLTxnHandle] = None
        self.pending: dict[int, MySQLTxnHandle] = {}  # checkpoint_id → handle (operator state)

    def begin_transaction(self) -> MySQLTxnHandle:
        h = MySQLTxnHandle(txn_id=f"flink-txn-{uuid.uuid4().hex[:8]}")
        self.current = h
        return h

    def invoke(self, row: dict):
        # Stage in memory; do NOT write to MySQL yet — the MySQL transaction is implicit
        # and we want it to span everything until the next barrier.
        self.current.rows.append(row)

    def pre_commit(self, h: MySQLTxnHandle):
        # Flush rows under a single MySQL transaction; record the txn_id in _txn_log.
        # Crucially, we INSERT but do NOT COMMIT — the connection holds the row locks.
        # In real Flink+MySQL: use XA START / XA END / XA PREPARE.
        self.conn.execute("BEGIN")
        for r in h.rows:
            self.conn.execute("INSERT OR REPLACE INTO orders_live VALUES (?, ?, ?)",
                              (r["id"], r["status"], r["ts"]))
        self.conn.execute("INSERT OR REPLACE INTO _txn_log VALUES (?, ?)", (h.txn_id, time.time()))
        # In sqlite we have to commit here (no XA); in real MySQL this would be XA PREPARE.
        # The semantic equivalent is: rows are durable, txn_id is logged, but a follower
        # query running with READ COMMITTED on a snapshot taken before commit() would not see them.
        self.conn.commit()
        h.prepared_at = time.time()

    def commit(self, h: MySQLTxnHandle):
        # Idempotent — recovery may call this for a txn that already ran.
        if h.committed:
            return
        # In real MySQL+XA: XA COMMIT 'txn_id'. Here, we simply mark the row visible
        # by writing a "committed" marker (or in real systems, the XA COMMIT phase).
        # For this simulator the rows are already in orders_live; we just record completion.
        h.committed = True
        # Garbage-collect the txn_log entry — tells future recoveries this txn is done.
        self.conn.execute("DELETE FROM _txn_log WHERE txn_id=?", (h.txn_id,))
        self.conn.commit()

    def abort(self, h: MySQLTxnHandle):
        # Idempotent. Rolls back if the prepare wrote rows; here we DELETE them.
        for r in h.rows:
            self.conn.execute("DELETE FROM orders_live WHERE id=?", (r["id"],))
        self.conn.execute("DELETE FROM _txn_log WHERE txn_id=?", (h.txn_id,))
        self.conn.commit()

    def snapshot_state(self, checkpoint_id: int):
        """Called by Flink when the barrier arrives. preCommit + roll forward."""
        self.pre_commit(self.current)
        self.pending[checkpoint_id] = self.current
        self.current = self.begin_transaction()

    def notify_checkpoint_complete(self, checkpoint_id: int):
        """Called by Flink when the JM confirms the global checkpoint succeeded."""
        h = self.pending.pop(checkpoint_id, None)
        if h is not None:
            self.commit(h)

    def restore_from_state(self, restored_pending: dict):
        """Called on recovery. Replay commits for every checkpoint that the JM
        confirmed completed; abort the rest."""
        for cid, h in restored_pending.items():
            self.commit(h)  # idempotent — finishes any prepared-but-uncommitted txn
        self.current = self.begin_transaction()


# Demo: two checkpoints, one crash between them
sink = TwoPhaseCommitMySQLSink("orders.db")
sink.begin_transaction()

# Checkpoint 1: 3 records
for r in [{"id": "o-101", "status": "PLACED", "ts": 1.0},
          {"id": "o-102", "status": "PICKED", "ts": 1.5},
          {"id": "o-103", "status": "DELIVERED", "ts": 2.0}]:
    sink.invoke(r)
sink.snapshot_state(checkpoint_id=1)
sink.notify_checkpoint_complete(1)

# Checkpoint 2: 2 records, then a crash before notifyCheckpointComplete
for r in [{"id": "o-104", "status": "PLACED", "ts": 3.0},
          {"id": "o-105", "status": "PICKED", "ts": 3.5}]:
    sink.invoke(r)
sink.snapshot_state(checkpoint_id=2)
print("[crash before notifyCheckpointComplete(2)]")

# Recovery: restore from state, replay commit
recovered_pending = dict(sink.pending)  # operator state was persisted with checkpoint 2
sink2 = TwoPhaseCommitMySQLSink("orders.db")
sink2.restore_from_state(recovered_pending)

# Inspect
rows = list(sink2.conn.execute("SELECT id, status FROM orders_live ORDER BY id"))
print("orders_live after recovery:", rows)
pending = list(sink2.conn.execute("SELECT txn_id FROM _txn_log"))
print("_txn_log (should be empty if recovery completed):", pending)
```
[crash before notifyCheckpointComplete(2)]
orders_live after recovery: [('o-101', 'PLACED'), ('o-102', 'PICKED'), ('o-103', 'DELIVERED'), ('o-104', 'PLACED'), ('o-105', 'PICKED')]
_txn_log (should be empty if recovery completed): []
Walking through the load-bearing lines:
- snapshot_state is the barrier handler. It calls pre_commit (flushing rows + writing _txn_log) and stores the handle in self.pending keyed by checkpoint_id. Why store handles in operator state: when Flink takes a checkpoint, every operator's state is serialised — including self.pending. On recovery, that map is restored and restore_from_state walks it to finish any commits that the original sink did not get to before the crash.
- commit is idempotent. It checks h.committed and short-circuits if already done. Why idempotency is non-negotiable here: recovery may replay commit for a transaction whose original commit succeeded but whose notifyCheckpointComplete ack was lost between sink and JobManager. Without idempotency, the second commit call would either fail or duplicate rows. The XA COMMIT family is idempotent on the MySQL side, but your commit method must also handle "already done" gracefully.
- pre_commit writes to _txn_log. This is the durable evidence that the transaction made it to the prepared state. If the sink crashes after pre_commit but before commit, recovery sees the _txn_log row and either re-commits (if Flink restored the handle) or aborts (if the handle is gone — but with Flink's checkpointing, the handle is always restored). Why two pieces of durable state (rows + txn_log) instead of one: the rows alone do not tell a recovering sink which transaction id to commit; the txn_log alone does not tell it which rows belonged to that transaction. Together they form a self-describing prepared-transaction record, the same role __transaction_state plays for Kafka. Either piece without the other is undecidable on recovery.
- restore_from_state replays commits. On startup, Flink calls this with the deserialised pending map from the latest checkpoint. The sink walks the map, calls commit on each handle (idempotent), and is back in business.
- abort is the safety net for checkpoint failure. If a checkpoint fails — say the JobManager loses quorum and aborts checkpoint N — the sink must abort the prepared transaction for N. In real Flink, this is handled by the framework calling abort during recovery for any handles that don't appear in the recovered checkpoint metadata.
The 90-line model misses two pieces of real Flink-MySQL: actual XA START/XA PREPARE/XA COMMIT calls (sqlite has no XA), and the timeout-based abort that production deployments add to clean up orphaned prepared transactions. Add those, and you have a faithful sink. The four-method skeleton is unchanged.
Where the protocol meets reality: per-sink mappings
The four methods are the same for every external system, but what each method means differs sharply:

| Sink | beginTransaction | preCommit | commit | abort |
|---|---|---|---|---|
| MySQL (XA) | XA START on a fresh xid | flush rows, XA PREPARE | XA COMMIT | XA ROLLBACK |
| S3 | new multipart upload id | UploadPart for all buffered data | CompleteMultipartUpload | AbortMultipartUpload |
| HTTP endpoint | new idempotency key + local buffer | flush buffer to durable storage | POST with Idempotency-Key header | delete the buffer |
| Kafka | new producer transaction | flush; records appended, LSO not advanced | commitTransaction (markers written) | abortTransaction |
The HTTP row is the hardest. The endpoint must accept an Idempotency-Key header and return the same response for the same key (the idempotency-key pattern that Stripe popularised — and that PhonePe's payment-status webhook now uses for cross-bank settlement). Without that, no amount of Flink-side machinery can give you exactly-once delivery to the endpoint.
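A sketch of what the four methods can look like for such an endpoint. The class name, the injected post transport, and the treat-409-as-duplicate convention are assumptions for illustration, not a real connector:

```python
import json, os, tempfile, uuid


class IdempotentHttpSink:
    """Sketch: an HTTP sink satisfying the four-method contract by buffering
    locally and relying on an Idempotency-Key header. The `post` callable is
    injected so the transport layer (requests/urllib) stays out of the sketch."""

    def __init__(self, url, post):
        self.url, self.post = url, post

    def begin_transaction(self):
        # Handle = idempotency key + buffer file. The key is fixed at begin
        # time so a replayed commit reuses the SAME key and dedupes server-side.
        path = os.path.join(tempfile.gettempdir(), f"txn-{uuid.uuid4().hex}.jsonl")
        return {"key": uuid.uuid4().hex, "path": path, "records": []}

    def invoke(self, txn, record):
        txn["records"].append(record)

    def pre_commit(self, txn):
        # Durable but invisible: flush the buffer to disk; nothing is sent yet.
        with open(txn["path"], "w") as f:
            for r in txn["records"]:
                f.write(json.dumps(r) + "\n")
            f.flush()
            os.fsync(f.fileno())

    def commit(self, txn):
        # Idempotent by construction: the endpoint dedupes on the key, so a
        # replayed commit with the same key is a server-side no-op.
        body = open(txn["path"]).read()
        status = self.post(self.url, body, headers={"Idempotency-Key": txn["key"]})
        if status not in (200, 409):  # assumption: 409 = key already used, treat as done
            raise RuntimeError(f"commit failed: {status}")
        os.remove(txn["path"])

    def abort(self, txn):
        if os.path.exists(txn["path"]):
            os.remove(txn["path"])
```

The load-bearing choice is that the idempotency key is allocated in begin_transaction, not in commit: a recovered sink that replays commit must present the same key, or the dedupe does nothing.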
Common confusions
- "Pre-commit means the data is visible." No — pre-commit means the data is durable. In MySQL XA, that is between XA PREPARE and XA COMMIT; the rows are written but a READ COMMITTED query started before commit will not see them. In S3 multipart, parts are uploaded but the object is not yet listed. In Kafka, records are appended to the partition log but the LSO has not advanced. Visibility happens only at commit.
- "abort is for application errors." No — abort is called by the framework when a checkpoint fails (JobManager loses quorum, an upstream operator's snapshot fails, network partition between TaskManager and JM). Application-level errors (a SQLException, an HTTP 500) cause Flink to fail the task and restart from the previous checkpoint, which then triggers abort for the current transaction during recovery. You don't call abort from your sink logic.
- "Idempotent commit is automatic in XA / Kafka." Partly. XA COMMIT 'xid' is idempotent on the database side — the second call returns success without re-applying. Kafka's commitTransaction after resumeTransaction likewise. But your commit method is also called for handles whose underlying transaction may have already been garbage-collected by the external system (XA prepared transaction expired, S3 multipart upload aborted by a lifecycle rule). Defensive idempotency = catch "transaction not found" errors and treat them as success.
- "You need 2PC for at-least-once sinks." No — at-least-once needs only the sink to be idempotent on the application key. 2PC is specifically what gets you from at-least-once to exactly-once-effective: Flink replays records on recovery, but the 2PC sink ensures replays don't materialise duplicate transactions on the external side. Razorpay's analytics path has both — at-least-once for events tagged with a payment-event id (idempotent on the warehouse), and 2PC for the financial-summary aggregation that writes to MySQL.
- "Flink's exactly-once and Kafka's exactly-once are the same thing." Related but distinct. Kafka's protocol is exactly-once within Kafka — Kafka producer to Kafka consumer through the broker. Flink's 2PC sink is exactly-once across Kafka (or any source) and an arbitrary external sink. The Kafka-to-Kafka case in Flink uses the same TwoPhaseCommitSinkFunction skeleton, but the implementation calls Kafka's transactional API instead of MySQL XA — the two systems compose; they don't compete.
- "Pre-commit duration doesn't matter." It absolutely does. Between pre-commit and commit, the sink's transaction holds locks on the external system. A long checkpoint completion means a long-held lock — for MySQL XA, that means row locks held for the duration. Swiggy's order-status pipeline once had a 90-second checkpoint interval and watched MySQL deadlock alerts climb; dropping to 15-second checkpoints (and accepting more frequent commits) cleared the deadlocks.
Going deeper
Why end-to-end exactly-once needs both source-side and sink-side cooperation
Flink's TwoPhaseCommitSinkFunction solves the sink-side replay problem. The source side has its own problem: when Flink replays from a checkpoint, the source must replay the same records — otherwise "exactly once" becomes "exactly some". For Kafka sources, this works because Flink stores the consumer offsets in the checkpoint, not in __consumer_offsets, and on recovery seeks to the checkpointed offsets before reading. For a JDBC source, Flink stores the cursor (e.g., last updated_at watermark) in checkpoint state. The 2PC sink protocol assumes the source replays correctly; if your source is non-replayable (a UDP feed, a flaky webhook), no amount of sink-side 2PC saves you.
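A toy illustration of the source-side half. The class and method names are invented for this sketch, but the shape matches what a replayable source does: the read position lives in the checkpoint, and recovery seeks rather than trusting any broker-side offset store:

```python
class ReplayableSource:
    """Sketch: a source whose offset is checkpointed by Flink, so a restart
    replays exactly the records the failed attempt read after the checkpoint."""

    def __init__(self, log):
        self.log = log        # the durable, replayable stream (e.g., a Kafka partition)
        self.offset = 0

    def snapshot_state(self):
        # This dict is what goes into the Flink checkpoint, not __consumer_offsets.
        return {"offset": self.offset}

    def restore_from_state(self, state):
        # On recovery: seek to the checkpointed offset before reading anything.
        self.offset = state["offset"]

    def poll(self, n):
        batch = self.log[self.offset:self.offset + n]
        self.offset += len(batch)
        return batch
```

A UDP feed has no `log` to slice into, which is exactly why no sink-side protocol can rescue a non-replayable source.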
Why the "two-generals problem" is not actually a problem here
A reader who has seen the /wiki/transactional-writes-2pc-wearing-a-hat chapter knows that classical 2PC has a coordinator-failure window: between "coordinator decides commit" and "all participants get the commit message", the coordinator can crash and leave participants blocked. Flink dodges this because the JobManager's checkpoint metadata is the durable commit decision — it is written to a fault-tolerant store (HDFS, S3, ZooKeeper) before notifyCheckpointComplete fires. If the JM crashes after writing metadata but before notifying sinks, recovery reads the metadata and replays the notification. The participants are never blocked indefinitely; the worst case is a delayed commit, not an undecided one.
XA prepared transactions and the production hazard of "in-doubt"
MySQL's XA PREPARE puts a transaction into the in-doubt state. Production hazard: if the Flink job is killed and never restarts, the prepared transaction sits in MySQL forever, holding locks. XA RECOVER lists in-doubt transactions; on the operator side, you must either restart Flink (which will replay the commit) or manually XA ROLLBACK the orphans. Production deployments at companies like Cred run a cron'd cleanup that lists in-doubt transactions older than 2 × checkpoint_interval and aborts them. Without this, a single orphaned XA transaction can stall the entire MySQL replica through lock waits.
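The core loop of such a cleanup job can be sketched with the SQL layer injected, so the logic stands alone. The function name and constants are assumptions; only the 2 × checkpoint-interval threshold and the XA RECOVER / XA ROLLBACK roles come from the text:

```python
import time

CHECKPOINT_INTERVAL_S = 15  # assumption: mirrors the Flink job's checkpoint interval


def cleanup_in_doubt(list_xa, rollback_xa, now=None,
                     max_age=2 * CHECKPOINT_INTERVAL_S):
    """Roll back in-doubt XA transactions older than max_age seconds.

    list_xa() stands in for `XA RECOVER` and yields (xid, prepared_at) pairs;
    rollback_xa(xid) stands in for `XA ROLLBACK '<xid>'`. Both are injected
    so this sketch needs no live MySQL connection."""
    now = now if now is not None else time.time()
    rolled_back = []
    for xid, prepared_at in list_xa():
        if now - prepared_at > max_age:
            rollback_xa(xid)          # orphan: the Flink job is not coming back for it
            rolled_back.append(xid)
    return rolled_back
```

The age threshold is the safety margin: anything younger than two checkpoint intervals may still belong to a live job mid-commit, so the cron must leave it alone.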
Kafka-as-sink: why Flink's KafkaSink chose read_committed over a custom protocol
The original Flink-Kafka 2PC sink used a transactional.id per parallel task. On recovery, Flink would resume the open transaction by transactional.id and commit it. This worked but required reading transactional.id.expiration.ms carefully — a long downtime would expire the transactional id and lose the commit. Modern KafkaSink (Flink 1.14+) uses the same skeleton but adds transactional.id.prefix so each attempt allocates a fresh id, sidestepping expiration. The downstream Kafka consumer must use isolation.level=read_committed to honour the marker — without that, the consumer reads pre-commit records as if they were committed, defeating the protocol.
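On the consumer side, the requirement reduces to one property. A minimal sketch using standard Kafka consumer property names (broker address and group id are placeholders):

```python
# Consumer-side half of the contract: without read_committed, the consumer
# reads records that are only pre-committed, defeating the sink's 2PC.
consumer_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker
    "group.id": "dashboard-readers",        # placeholder group
    "isolation.level": "read_committed",    # hide records past the LSO
    "enable.auto.commit": False,            # offsets belong in Flink's checkpoint
}
```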
When 2PC is overkill: the at-least-once-plus-idempotent alternative
If your sink table has a natural key (order_id, payment_event_id), at-least-once + INSERT ON CONFLICT DO UPDATE (Postgres) or INSERT IGNORE (MySQL) is simpler, cheaper, and just as correct. 2PC adds latency (records held for one checkpoint interval before being visible) and operational complexity (XA, in-doubt cleanup). Use 2PC when the sink does not have a natural key, when you need transactional aggregates (sum, count) that idempotency cannot give you, or when a downstream consumer reads with snapshot isolation and would see partial state. For most Indian fintech analytics workloads — Razorpay's payment-events warehouse, PhonePe's transaction logs — at-least-once + idempotent merge is the right answer; 2PC is reserved for the financial-close path where partial visibility is unacceptable.
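A runnable sketch of the alternative, using sqlite's upsert syntax (available since sqlite 3.24) to stand in for Postgres's INSERT ON CONFLICT DO UPDATE; table and column names are illustrative:

```python
import sqlite3

# At-least-once + idempotent merge: replaying the same event twice leaves
# exactly one row, because the natural key absorbs the duplicate.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payments (payment_event_id TEXT PRIMARY KEY, status TEXT)")


def upsert(event):
    conn.execute(
        "INSERT INTO payments VALUES (?, ?) "
        "ON CONFLICT(payment_event_id) DO UPDATE SET status = excluded.status",
        (event["id"], event["status"]))
    conn.commit()


upsert({"id": "pe-1", "status": "AUTHORIZED"})
upsert({"id": "pe-1", "status": "AUTHORIZED"})  # replayed on recovery: harmless
```

No prepared transactions, no in-doubt cleanup, no visibility delay; the trade-off is that this cannot protect aggregates (a replayed SUM increment double-counts), which is exactly where 2PC earns its cost.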
Where this leads next
The two-phase-commit sink is the reusable skeleton. The next chapters compose it with the source-side protocol and walk through real workloads:
- /wiki/end-to-end-source-process-sink — how Kafka source + Flink + 2PC sink form a closed exactly-once loop, with sendOffsetsToTransaction linking source offsets to sink commits.
- /wiki/wall-exactly-once-is-a-lie-everyone-tells — the philosophical limit. Even with 2PC sinks, "exactly-once" is "exactly-once-effective on a closed system"; the moment you cross a network or a non-transactional sink, the contract weakens.
- /wiki/state-stores-why-rocksdb-is-in-every-streaming-engine — the local state that Flink checkpoints. The 2PC sink's pending map is part of that state.
The pattern to internalise: every external system has its own commit primitive, but Flink's checkpoint barriers can be projected onto any of them through the same four methods. If you can answer "what does preCommit mean for my sink" and "is commit idempotent" — you can wire any sink into Flink's exactly-once contract.
References
- An Overview of End-to-End Exactly-Once Processing in Apache Flink (Piotr Nowojski, dataArtisans 2018) — the canonical introduction, with the original sequence diagrams that this chapter extends.
- Apache Flink source: TwoPhaseCommitSinkFunction.java — the actual base class. ~400 lines, readable in one sitting.
- FLIP-1: Fine Grained Recovery from Task Failures — the proposal that motivated splitting checkpoint completion into pre-commit and commit phases.
- MySQL XA Transactions documentation — the XA START / XA PREPARE / XA COMMIT reference; pay attention to the in-doubt section.
- Stripe Engineering: Designing robust and predictable APIs with idempotency — the idempotency-key pattern that HTTP sinks must implement to participate in 2PC.
- /wiki/kafka-transactions-under-the-hood — the wire-level Kafka protocol; the Kafka-as-sink case of TwoPhaseCommitSinkFunction calls into exactly this protocol.
- /wiki/checkpointing-the-consistent-snapshot-algorithm — the Chandy-Lamport algorithm and how barriers thread through the dataflow graph; the foundation 2PC sinks build on.
- /wiki/transactional-writes-2pc-wearing-a-hat — the conceptual 2PC view; this chapter is the streaming-sink instantiation of that protocol.