Flink's two-phase-commit sink

A Swiggy data engineer is mirroring an order-status stream from Kafka into a MySQL orders_live table that the customer-support dashboard queries. The Kafka side is exactly-once — transactional.id and WriteTxnMarkers hold the line. The MySQL side is not. A checkpoint failure between "rows inserted" and "Kafka offset committed" replays 12 minutes of orders and the dashboard shows duplicates. The fix is not to patch the sink — it is to wrap the sink in Flink's TwoPhaseCommitSinkFunction, which is a generic protocol for making any external system participate in Flink's checkpoint as a transactional sink.

A Flink job's checkpoint is a global cut. To make an external sink (MySQL, S3, HTTP) exactly-once, the sink must implement four methods — beginTransaction, preCommit, commit, abort — that map onto Flink's checkpoint barriers. Pre-commit happens when the barrier passes the sink; the actual commit happens when the JobManager confirms the global checkpoint succeeded. Crash between pre-commit and commit, and recovery replays the commit; crash before pre-commit and recovery replays the whole transaction.

The problem: Kafka's protocol does not generalise

Kafka's transactional protocol (InitProducerId, AddPartitionsToTxn, EndTxn, WriteTxnMarkers) works because Kafka owns both ends of the writer. The coordinator persists __transaction_state, the partition leaders honour the markers, and the consumer-side LSO hides uncommitted records. None of that is portable. MySQL has its own transaction system. S3 has none — every PutObject is immediately visible. An HTTP webhook has neither transactions nor read-your-writes.

So the question is: what is the minimum interface an external system must expose for a Flink job to write to it exactly-once? The answer Flink settled on (TwoPhaseCommitSinkFunction, introduced in 1.4) is four methods:

beginTransaction()  → returns a handle to a fresh, isolated transaction
preCommit(handle)   → flush all data; the transaction is now durable but
                       NOT yet visible to readers
commit(handle)      → make the transaction visible to readers (idempotent)
abort(handle)       → discard the transaction's data (idempotent)

If your external system can implement these four methods — even with creative interpretations of "isolated" and "visible" — Flink can plug it into the checkpoint machinery and give you exactly-once writes.
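The contract can be sketched as a Python abstract base class. This is a minimal sketch, not Flink's API (the real class is Java); the names simply mirror the four methods above:

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

TXN = TypeVar("TXN")  # the transaction-handle type is sink-specific

class TwoPhaseCommitSink(ABC, Generic[TXN]):
    """The minimum contract an external system must satisfy."""

    @abstractmethod
    def begin_transaction(self) -> TXN:
        """Return a handle to a fresh, isolated transaction."""

    @abstractmethod
    def pre_commit(self, txn: TXN) -> None:
        """Flush all data; durable but NOT yet visible to readers."""

    @abstractmethod
    def commit(self, txn: TXN) -> None:
        """Make the transaction visible. MUST be idempotent."""

    @abstractmethod
    def abort(self, txn: TXN) -> None:
        """Discard the transaction's data. MUST be idempotent."""
```

Anything that can subclass this faithfully, down to the idempotency requirements on commit and abort, can be driven by the checkpoint machinery.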

How a checkpoint barrier drives the sink

Flink's Chandy-Lamport-style checkpoint algorithm is covered in /wiki/checkpointing-the-consistent-snapshot-algorithm. The relevant bit for sinks: the JobManager periodically injects a barrier into every source. Each operator, upon seeing the barrier on all its input channels, snapshots its state and forwards the barrier downstream. When the barrier reaches every sink, the global checkpoint completes — the JobManager writes a checkpoint metadata record and notifies every operator via notifyCheckpointComplete(checkpointId).

A TwoPhaseCommitSinkFunction hooks into exactly two of those events: barrier arrives at sink (call preCommit) and notifyCheckpointComplete (call commit). That is the entire mapping.

[Figure: checkpoint barrier driving a 2PC sink lifecycle. A timeline shows the barrier for checkpoint N travelling source → map operator → sink. The barrier's arrival at the sink triggers preCommit(N) (flush rows, mark the txn durable). Once the JobManager has collected acks from all operators and written the checkpoint metadata, notifyCheckpointComplete(N) fires and the sink calls commit(handle for N), the MySQL COMMIT that makes the rows visible to readers. If the JM fails to collect all acks, recovery replays from checkpoint N-1 and the pending txn for N is aborted on restart. commit() must be idempotent: recovery may call it again for a checkpoint that was already committed but whose ack was lost.]
The barrier triggers preCommit; the JobManager's notifyCheckpointComplete triggers commit. Crash between the two means recovery replays commit (which is idempotent). Crash before preCommit means the whole transaction is aborted and replayed.

The lifecycle of one transaction, end to end:

  1. beginTransaction() — called when the sink starts processing data after the previous checkpoint. Allocates a new transaction handle (e.g., a MySQL START TRANSACTION and the resulting connection, or an S3 multipart upload id, or a unique idempotency key for an HTTP endpoint).
  2. Records flow in. The sink writes them through the open transaction handle. In MySQL, that means INSERT statements on the open connection. In S3, UploadPart calls. In HTTP, buffered into a local file keyed by the handle.
  3. Barrier arrives at the sink. Flink calls snapshotState(checkpointId), which internally calls preCommit(handle). For MySQL, this is not COMMIT — it is a flush plus an XA PREPARE equivalent (or storing the connection handle in operator state without committing). The transaction is durable but not visible.
  4. Sink rolls forward to a fresh transaction. It calls beginTransaction() again so subsequent records belong to checkpoint N+1, not N.
  5. JobManager's notifyCheckpointComplete(N). All operators acknowledged checkpoint N, the metadata file is written. Now the sink calls commit(handle for N) — for MySQL, this is the actual COMMIT. For S3, CompleteMultipartUpload. For HTTP, the buffered file gets POSTed.

Crash recovery is the cleanest part of the protocol. On restart, Flink restores operator state from the latest checkpoint; the sink's state contains the handle of any pending transaction. The sink calls commit(restored_handle) to redo the commit (idempotent — the transaction may already be committed, in which case the call is a no-op) and any older pending transactions are abort()ed.

A working TwoPhaseCommitSinkFunction for MySQL

The cleanest way to internalise the lifecycle is to implement the four methods yourself. What follows is a Python model of the lifecycle; the real Flink class lives in org.apache.flink.streaming.api.functions.sink and is written in Java, but the logic is the same.

# flink_2pc_sink.py — a TwoPhaseCommitSinkFunction modelled in Python
import sqlite3, uuid, time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class MySQLTxnHandle:
    txn_id: str
    rows: list = field(default_factory=list)
    prepared_at: Optional[float] = None
    committed: bool = False

class TwoPhaseCommitMySQLSink:
    """Mirrors Flink's TwoPhaseCommitSinkFunction lifecycle. Persists pending
    transactions to the operator's state so recovery can replay commits."""

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("CREATE TABLE IF NOT EXISTS orders_live (id TEXT PRIMARY KEY, status TEXT, ts REAL)")
        self.conn.execute("CREATE TABLE IF NOT EXISTS _txn_log (txn_id TEXT PRIMARY KEY, prepared_at REAL)")
        self.conn.commit()
        self.current: Optional[MySQLTxnHandle] = None
        self.pending: dict[int, MySQLTxnHandle] = {}   # checkpoint_id → handle (operator state)

    def begin_transaction(self) -> MySQLTxnHandle:
        h = MySQLTxnHandle(txn_id=f"flink-txn-{uuid.uuid4().hex[:8]}")
        self.current = h
        return h

    def invoke(self, row: dict):
        # Stage in memory; do NOT write to MySQL yet — the MySQL transaction is implicit
        # and we want it to span everything until the next barrier.
        self.current.rows.append(row)

    def pre_commit(self, h: MySQLTxnHandle):
        # Flush rows under a single MySQL transaction; record the txn_id in _txn_log.
        # Crucially, we INSERT but do NOT COMMIT — the connection holds the row locks.
        # In real Flink+MySQL: use XA START / XA END / XA PREPARE.
        self.conn.execute("BEGIN")
        for r in h.rows:
            self.conn.execute("INSERT OR REPLACE INTO orders_live VALUES (?, ?, ?)",
                              (r["id"], r["status"], r["ts"]))
        self.conn.execute("INSERT OR REPLACE INTO _txn_log VALUES (?, ?)", (h.txn_id, time.time()))
        # In sqlite we have to commit here (no XA); in real MySQL this would be XA PREPARE.
        # The semantic equivalent is: rows are durable, txn_id is logged, but a follower
        # query running with READ COMMITTED on a snapshot taken before commit() would not see them.
        self.conn.commit()
        h.prepared_at = time.time()

    def commit(self, h: MySQLTxnHandle):
        # Idempotent — recovery may call this for a txn that already ran.
        if h.committed:
            return
        # In real MySQL+XA: XA COMMIT 'txn_id'. Here, we simply mark the row visible
        # by writing a "committed" marker (or in real systems, the XA COMMIT phase).
        # For this simulator the rows are already in orders_live; we just record completion.
        h.committed = True
        # Garbage-collect the txn_log entry — tells future recoveries this txn is done.
        self.conn.execute("DELETE FROM _txn_log WHERE txn_id=?", (h.txn_id,))
        self.conn.commit()

    def abort(self, h: MySQLTxnHandle):
        # Idempotent. Rolls back if the prepare wrote rows; here we DELETE them.
        for r in h.rows:
            self.conn.execute("DELETE FROM orders_live WHERE id=?", (r["id"],))
        self.conn.execute("DELETE FROM _txn_log WHERE txn_id=?", (h.txn_id,))
        self.conn.commit()

    def snapshot_state(self, checkpoint_id: int):
        """Called by Flink when the barrier arrives. preCommit + roll forward."""
        self.pre_commit(self.current)
        self.pending[checkpoint_id] = self.current
        self.current = self.begin_transaction()

    def notify_checkpoint_complete(self, checkpoint_id: int):
        """Called by Flink when the JM confirms the global checkpoint succeeded."""
        h = self.pending.pop(checkpoint_id, None)
        if h is not None:
            self.commit(h)

    def restore_from_state(self, restored_pending: dict):
        """Called on recovery. Replay commits for every checkpoint that the JM
        confirmed completed; abort the rest."""
        for cid, h in restored_pending.items():
            self.commit(h)   # idempotent — finishes any prepared-but-uncommitted txn
        self.current = self.begin_transaction()

# Demo: two checkpoints, one crash between them
sink = TwoPhaseCommitMySQLSink("orders.db")
sink.begin_transaction()

# Checkpoint 1: 3 records
for r in [{"id":"o-101","status":"PLACED","ts":1.0},
          {"id":"o-102","status":"PICKED","ts":1.5},
          {"id":"o-103","status":"DELIVERED","ts":2.0}]:
    sink.invoke(r)
sink.snapshot_state(checkpoint_id=1)
sink.notify_checkpoint_complete(1)

# Checkpoint 2: 2 records, then a crash before notifyCheckpointComplete
for r in [{"id":"o-104","status":"PLACED","ts":3.0},
          {"id":"o-105","status":"PICKED","ts":3.5}]:
    sink.invoke(r)
sink.snapshot_state(checkpoint_id=2)
print("[crash before notifyCheckpointComplete(2)]")

# Recovery: restore from state, replay commit
recovered_pending = dict(sink.pending)   # operator state was persisted with checkpoint 2
sink2 = TwoPhaseCommitMySQLSink("orders.db")
sink2.restore_from_state(recovered_pending)

# Inspect
rows = list(sink2.conn.execute("SELECT id, status FROM orders_live ORDER BY id"))
print("orders_live after recovery:", rows)
pending = list(sink2.conn.execute("SELECT txn_id FROM _txn_log"))
print("_txn_log (should be empty if recovery completed):", pending)
[crash before notifyCheckpointComplete(2)]
orders_live after recovery: [('o-101', 'PLACED'), ('o-102', 'PICKED'), ('o-103', 'DELIVERED'), ('o-104', 'PLACED'), ('o-105', 'PICKED')]
_txn_log (should be empty if recovery completed): []

Walking through the load-bearing lines:

  1. pre_commit flushes the buffered rows and records the txn_id in _txn_log, standing in for XA PREPARE rather than COMMIT: the data is durable, but the protocol does not treat it as visible until commit runs.
  2. commit checks h.committed and deletes the _txn_log row; that guard, plus the delete, is what makes a replayed commit a no-op.
  3. snapshot_state is the roll-forward: pre-commit the open handle, park it in pending keyed by checkpoint id, and open a fresh transaction for checkpoint N+1.
  4. restore_from_state is recovery: every handle in the restored state was pre-committed under a completed checkpoint, so its commit is replayed; idempotency absorbs the case where it already ran.

The model misses two pieces of real Flink-MySQL: actual XA START/XA PREPARE/XA COMMIT calls (sqlite has no XA), and the timeout-based abort that production deployments add to clean up orphaned prepared transactions. Add those, and you have a faithful sink. The four-method skeleton is unchanged.
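The timeout-based abort can be sketched against the simulator's own _txn_log table. This reap_orphans helper is hypothetical (it is not part of the model above) and assumes that a transaction prepared more than 2 × checkpoint_interval ago, with no live Flink job behind it, will never be committed:

```python
import sqlite3, time
from typing import Optional

CHECKPOINT_INTERVAL_S = 60.0  # illustrative; match your job's checkpoint interval

def reap_orphans(conn: sqlite3.Connection, now: Optional[float] = None) -> list:
    """Abort prepared-but-never-committed transactions; return their ids.

    In real MySQL+XA this loop would issue XA ROLLBACK '<txn_id>' for each
    orphan listed by XA RECOVER; in the simulator we only clear the log
    (a full version would also delete the staged rows, as abort() does)."""
    now = time.time() if now is None else now
    cutoff = now - 2 * CHECKPOINT_INTERVAL_S
    orphans = [txn_id for (txn_id,) in conn.execute(
        "SELECT txn_id FROM _txn_log WHERE prepared_at < ?", (cutoff,))]
    for txn_id in orphans:
        conn.execute("DELETE FROM _txn_log WHERE txn_id = ?", (txn_id,))
    conn.commit()
    return orphans
```

Run this from a cron job (or a sidecar) and an orphaned prepare stops being a permanent lock-holder and becomes, at worst, a 2 × checkpoint-interval stall.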

Where the protocol meets reality: per-sink mappings

The four methods are the same for every external system, but what each method means differs sharply:

How beginTransaction/preCommit/commit/abort map to four real sinks: same four methods, four very different mappings.

  MySQL XA
    beginTransaction → XA START 'xid'
    preCommit        → XA END + XA PREPARE 'xid'
    commit           → XA COMMIT 'xid'
    abort            → XA ROLLBACK 'xid'
  Kafka producer
    beginTransaction → beginTransaction() on the KafkaProducer
    preCommit        → flush() + close producer (handle stored)
    commit           → resumeTransaction + commitTransaction
    abort            → abortTransaction
  S3 multipart
    beginTransaction → CreateMultipartUpload
    preCommit        → UploadPart for each buffered chunk
    commit           → CompleteMultipartUpload (now visible)
    abort            → AbortMultipartUpload
  HTTP webhook with idempotency-key
    beginTransaction → generate idem-key, open local buffer
    preCommit        → fsync buffer file, store idem-key in state
    commit           → POST batch with key, retry until 2xx
    abort            → delete buffer file
The four-method skeleton hides the heterogeneity. MySQL gives you XA. Kafka gives you a transactional producer. S3 gives you multipart upload. HTTP gives you nothing — you must layer an idempotency-key protocol on top yourself.

The HTTP row is the hardest. The endpoint must accept an Idempotency-Key header and return the same response for the same key (the idempotency-key pattern that Stripe popularised — and that PhonePe's payment-status webhook now uses for cross-bank settlement). Without that, no amount of Flink-side machinery can give you exactly-once delivery to the endpoint.
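The server-side contract can be sketched in a few lines. This is a minimal model, assuming a dict-backed key store (a real endpoint would persist keys durably, and the handler name and response shape here are illustrative):

```python
# Hypothetical idempotency-key handler: the same key always returns the
# same stored response, no matter how many times Flink's commit() retries
# the POST after a crash.
_responses: dict = {}  # idempotency-key → response (stand-in for a durable store)

def handle_webhook(idempotency_key: str, batch: list) -> dict:
    if idempotency_key in _responses:
        # Retry of a commit we already applied: replay the original
        # response, do NOT re-apply the batch.
        return _responses[idempotency_key]
    # First delivery: apply the batch exactly once, then record the response.
    response = {"status": 200, "applied": len(batch)}
    _responses[idempotency_key] = response
    return response
```

The sink's commit() then maps to "POST with this checkpoint's key until a 2xx comes back" — every retry after the first hits the stored-response branch.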

Going deeper

Why end-to-end exactly-once needs both source-side and sink-side cooperation

Flink's TwoPhaseCommitSinkFunction solves the sink-side replay problem. The source side has its own problem: when Flink replays from a checkpoint, the source must replay the same records — otherwise "exactly once" becomes "exactly some". For Kafka sources, this works because Flink stores the consumer offsets in the checkpoint, not in __consumer_offsets, and on recovery seeks to the checkpointed offsets before reading. For a JDBC source, Flink stores the cursor (e.g., last updated_at watermark) in checkpoint state. The 2PC sink protocol assumes the source replays correctly; if your source is non-replayable (a UDP feed, a flaky webhook), no amount of sink-side 2PC saves you.
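The source-side half can be sketched the same way. This is a toy replayable source, not Flink's source API; the point is that the read position lives in the checkpoint, not in any external offset store:

```python
# Toy replayable source: because the offset is checkpointed with the job,
# recovery re-reads exactly the records the failed attempt saw.
class ReplayableSource:
    def __init__(self, log: list):
        self.log = log      # stands in for a Kafka partition
        self.offset = 0     # consumer position, snapshotted with the checkpoint

    def poll(self, n: int) -> list:
        batch = self.log[self.offset:self.offset + n]
        self.offset += len(batch)
        return batch

    def snapshot_state(self) -> int:
        return self.offset  # goes into the Flink checkpoint

    def restore(self, checkpointed_offset: int) -> None:
        # Seek back, discarding any progress made after the checkpoint.
        self.offset = checkpointed_offset
```

A non-replayable source has no restore(): whatever was read after the last checkpoint is simply gone, and "exactly once" degrades to "exactly some".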

Why the "two-generals problem" is not actually a problem here

A reader who has seen the /wiki/transactional-writes-2pc-wearing-a-hat chapter knows that classical 2PC has a coordinator-failure window: between "coordinator decides commit" and "all participants get the commit message", the coordinator can crash and leave participants blocked. Flink dodges this because the JobManager's checkpoint metadata is the durable commit decision — it is written to a fault-tolerant store (HDFS, S3, ZooKeeper) before notifyCheckpointComplete fires. If the JM crashes after writing metadata but before notifying sinks, recovery reads the metadata and replays the notification. The participants are never blocked indefinitely; the worst case is a delayed commit, not an undecided one.

XA prepared transactions and the production hazard of "in-doubt"

MySQL's XA PREPARE puts a transaction into the in-doubt state. Production hazard: if the Flink job is killed and never restarts, the prepared transaction sits in MySQL forever, holding locks. XA RECOVER lists in-doubt transactions; on the operator side, you must either restart Flink (which will replay the commit) or manually XA ROLLBACK the orphans. Production deployments at companies like Cred run a cron'd cleanup that lists in-doubt transactions older than 2 × checkpoint_interval and aborts them. Without this, a single orphaned XA transaction can stall the entire MySQL replica through lock waits.

Kafka-as-sink: why Flink's KafkaSink chose read_committed over a custom protocol

The original Flink-Kafka 2PC sink used a transactional.id per parallel task. On recovery, Flink would resume the open transaction by transactional.id and commit it. This worked but required reading transactional.id.expiration.ms carefully — a long downtime would expire the transactional id and lose the commit. Modern KafkaSink (Flink 1.14+) uses the same skeleton but adds transactional.id.prefix so each attempt allocates a fresh id, sidestepping expiration. The downstream Kafka consumer must use isolation.level=read_committed to honour the marker — without that, the consumer reads pre-commit records as if they were committed, defeating the protocol.
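The consumer-side requirement boils down to one property. A sketch of the relevant settings (the property names are Kafka's; the dict shape and group name are illustrative and depend on your client library):

```python
# Downstream consumer of a topic written by a 2PC sink. Without
# read_committed, records beyond the LSO leak through before their
# transaction commits, and the sink-side 2PC is wasted.
consumer_props = {
    "isolation.level": "read_committed",  # honour the txn markers / LSO
    "group.id": "orders-dashboard",       # illustrative group name
    "enable.auto.commit": "false",        # offsets belong in Flink's checkpoint,
                                          # not in __consumer_offsets
}
```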

When 2PC is overkill: the at-least-once-plus-idempotent alternative

If your sink table has a natural key (order_id, payment_event_id), at-least-once + INSERT ON CONFLICT DO UPDATE (Postgres) or INSERT IGNORE (MySQL) is simpler, cheaper, and just as correct. 2PC adds latency (records held for one checkpoint interval before being visible) and operational complexity (XA, in-doubt cleanup). Use 2PC when the sink does not have a natural key, when you need transactional aggregates (sum, count) that idempotency cannot give you, or when a downstream consumer reads with snapshot isolation and would see partial state. For most Indian fintech analytics workloads — Razorpay's payment-events warehouse, PhonePe's transaction logs — at-least-once + idempotent merge is the right answer; 2PC is reserved for the financial-close path where partial visibility is unacceptable.
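The alternative is short enough to show whole. This sketch uses sqlite's upsert as a stand-in for MySQL's INSERT ... ON DUPLICATE KEY UPDATE: replaying the same event is harmless because the natural key absorbs the duplicate.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders_live (id TEXT PRIMARY KEY, status TEXT, ts REAL)")

def upsert(event: dict) -> None:
    # The natural key (id) makes the write idempotent: an at-least-once
    # replay just overwrites the row with identical values.
    conn.execute(
        "INSERT INTO orders_live VALUES (:id, :status, :ts) "
        "ON CONFLICT(id) DO UPDATE SET status = excluded.status, ts = excluded.ts",
        event)

event = {"id": "o-101", "status": "DELIVERED", "ts": 2.0}
upsert(event)
upsert(event)  # replayed after a crash — still exactly one row
```

No XA, no in-doubt cleanup, no checkpoint-interval visibility delay — which is exactly why this wins whenever a natural key exists.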

Where this leads next

The two-phase-commit sink is the reusable skeleton. Later chapters compose it with the source-side protocol and walk through real workloads.

The pattern to internalise: every external system has its own commit primitive, but Flink's checkpoint barriers can be projected onto any of them through the same four methods. If you can answer "what does preCommit mean for my sink" and "is commit idempotent" — you can wire any sink into Flink's exactly-once contract.
