High-water marks: tracking what you've seen

The previous chapter ended with Aditi's nightly job missing its 09:00 SLA because it kept rebuilding the entire transactions table from the first row ever inserted. The fix is one number stored on disk: the largest created_at her job has already loaded. Tonight she reads only rows newer than that number, advances it, and writes it back. The runtime drops from 38 hours to a few seconds. The failure modes change shape — and the rest of this chapter is about the new shape, because every senior data engineer has been bitten at least once by a high-water mark that quietly skipped 1.2 lakh rows on a Tuesday.

A high-water mark is a single monotonic value (timestamp or auto-increment id) you store between runs to remember "I've already processed everything up to here". It turns an O(N) full-refresh into an O(Δ) incremental load. The mechanism is trivial; the failure modes — late-arriving rows, clock skew, transactions committed out of order — are not. Get the boundary semantics wrong and you silently lose data forever.

What the mark actually is

A high-water mark (HWM) is a checkpoint stored outside the data being processed. Each pipeline run does four things in order: read the previous mark, fetch all rows whose tracking column is strictly greater than the mark, advance the mark to the largest tracking value seen, and persist it. The next run picks up exactly where this one left off.

The "tracking column" is the contract between the source and the pipeline. It must be monotonic with respect to what the pipeline has seen — every row the pipeline has not yet loaded must carry a tracking value strictly greater than the current mark, otherwise the strictly-greater-than fetch will skip it. The two columns most commonly used in Indian production systems are created_at (an application-assigned timestamp) and an auto-increment id (a database-assigned 64-bit integer). They have different failure modes; this chapter covers both.

[Figure: the high-water mark loop — a source database (transactions, monotonic id, append-mostly) on the left; a state store in the middle holding one number, the mark; a pipeline that reads the mark, fetches rows with id greater than the mark from the source, appends the delta to the warehouse, and writes back the advanced mark.]
The high-water mark loop. The state store holds one number per pipeline. Reads go strictly > mark; writes advance the mark only after the destination has durably received the rows.

The whole apparatus is one row in one table — usually called pipeline_state or cursors — with columns (pipeline_name TEXT PRIMARY KEY, last_value BIGINT, updated_at TIMESTAMPTZ). That is the entire mechanism.

Everything that follows in this chapter is about the boundary cases. The mechanism is so simple that it is tempting to treat the implementation as solved and move on. The Indian fintechs that learned this the hard way — Razorpay in 2021, PhonePe in 2022, Cred in 2024 — all had a working implementation in their first month of incremental processing. Each of them spent a quarter, two years later, fixing a failure mode they hadn't anticipated.

Why a single number is enough: the source's tracking column is a total order, and the pipeline's contract is "I have processed every row whose tracking value is ≤ mark". As long as the source never makes a new row visible at or below the current mark, one number captures the entire history of what's been done. The complexity comes when the source's assignment is not as monotonic as it looks.

The query, the commit, the order

The pseudocode is one screen. The real code is also one screen, but the order of operations matters more than any individual line. Here is the version Aditi shipped on the Wednesday after her CFO incident — the one that has been running every night since without a single missed row.

# incremental_load.py — fetch only what's new since last run
import os, time, datetime as dt
import psycopg2
from psycopg2.extras import execute_batch

PIPELINE = "transactions_to_warehouse"

def get_mark(cur) -> int:
    cur.execute(
        "SELECT last_value FROM pipeline_state WHERE pipeline_name = %s",
        (PIPELINE,),
    )
    row = cur.fetchone()
    return row[0] if row else 0

def fetch_new(cur, since_id: int, batch: int = 50_000):
    cur.execute(
        """SELECT id, merchant_id, amount_paise, status, created_at
           FROM transactions
           WHERE id > %s
           ORDER BY id ASC
           LIMIT %s""",
        (since_id, batch),
    )
    return cur.fetchall()

def main() -> None:
    started = time.monotonic()
    src_dsn  = os.environ["SOURCE_DSN"]
    dest_dsn = os.environ["DEST_DSN"]
    with psycopg2.connect(src_dsn) as src, psycopg2.connect(dest_dsn) as dst:
        with src.cursor() as scur, dst.cursor() as dcur:
            old_mark = mark = get_mark(dcur)
            total = 0
            # Drain the source in batches: a nightly backlog can be
            # several multiples of the 50k batch size.
            while True:
                rows = fetch_new(scur, mark)
                if not rows:
                    break
                execute_batch(
                    dcur,
                    """INSERT INTO warehouse_transactions
                       (id, merchant_id, amount_paise, status, created_at)
                       VALUES (%s, %s, %s, %s, %s)
                       ON CONFLICT (id) DO NOTHING""",
                    rows, page_size=1000,
                )
                mark = max(r[0] for r in rows)
                total += len(rows)
            if total == 0:
                print(f"no new rows since id={mark}")
                return
            dcur.execute(
                """INSERT INTO pipeline_state (pipeline_name, last_value, updated_at)
                   VALUES (%s, %s, now())
                   ON CONFLICT (pipeline_name)
                   DO UPDATE SET last_value = EXCLUDED.last_value,
                                 updated_at = EXCLUDED.updated_at""",
                (PIPELINE, mark),
            )
        dst.commit()
    elapsed = time.monotonic() - started
    print(f"{dt.datetime.utcnow().isoformat()}Z loaded={total} "
          f"old_mark={old_mark} new_mark={mark} elapsed={elapsed:.2f}s")

if __name__ == "__main__":
    main()

The function get_mark reads from the destination's pipeline_state table because the destination is the system whose transactional boundary the pipeline trusts. The function fetch_new reads from the source with id > %s and ORDER BY id ASC so the rows arrive in the same order the cursor will advance through them — a detail that matters when batch sizes are limited and a single nightly run might consume the cursor across several iterations of the same code.

A run on the Wednesday after the wall:

2026-04-23T01:00:11Z loaded=82431 old_mark=487291030 new_mark=487373461 elapsed=4.20s
2026-04-24T01:00:09Z loaded=88109 old_mark=487373461 new_mark=487461570 elapsed=4.41s
2026-04-25T01:00:12Z loaded=83442 old_mark=487461570 new_mark=487545012 elapsed=4.18s

Runtime: 4 seconds. The full-refresh predecessor took 38 hours. Both produce the same warehouse rows after the load (with the destination already populated by a one-time historical backfill).

Three lines deserve attention.

mark = get_mark(dcur) runs against the destination, not the source. The mark belongs to the pipeline's view of progress, and the destination is where the durable consequence of advancement lives. If you store the mark in the source, you've coupled pipeline state to a system you don't own. The destination already has a transactional boundary you control; put the cursor inside it.

max(r[0] for r in rows) advances to the largest id seen, not mark + len(rows). Sources have gaps — deleted rows, rolled-back transactions, sequence cache jumps after restarts. Counting forward from the old mark is a classic bug that causes the cursor to permanently lag the source by however many gaps existed in the batch. Always read the actual value of the largest row.
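The difference is easy to see with a toy batch in plain Python (illustrative ids only; no database needed):

```python
# Toy batch: ids with gaps from deleted rows / rolled-back transactions.
rows = [(101,), (102,), (105,), (109,)]  # 4 rows, but ids span 101..109
old_mark = 100

# Wrong: counting forward assumes the id space is dense.
wrong_mark = old_mark + len(rows)     # 104 -- the cursor now lags the frontier
# Right: read the actual largest id in the batch.
right_mark = max(r[0] for r in rows)  # 109

print(wrong_mark, right_mark)
```

With the counted-forward mark at 104, the next run re-fetches ids 105 and 109, and the cursor stays permanently behind the true frontier by the accumulated gap count.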

The INSERT INTO pipeline_state runs in the same transaction as the data writes, and the commit() is the single durable boundary. This is what makes the whole apparatus crash-safe. If the pipeline dies before the commit, the destination has no rows and the mark has not moved — the next run will re-fetch the same window. If the pipeline dies after the commit, both rows and mark are durable. There is no in-between state in which the mark advances but the rows aren't there.

Why same-transaction matters more than it looks: the destination's INSERT INTO warehouse_transactions and the INSERT INTO pipeline_state must commit atomically. If they don't (e.g. the mark is stored in a separate database, or in a Redis key-value store written before the destination commit), there exists a real failure mode where the destination crashes between steps and you've recorded "I processed up to id=487545012" while the destination only has rows up to id=487373461. Every row in between will be permanently skipped. Co-locating the cursor with the destination eliminates the failure class entirely. This is the same insight as the outbox pattern — atomicity across system boundaries doesn't exist; co-locate or accept the loss.
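The same-transaction rule can be demonstrated in miniature with sqlite3 from the standard library. This is an illustration of the atomicity argument, not the production Postgres code, and the table names are invented for the demo:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE warehouse (id INTEGER PRIMARY KEY)")
con.execute("CREATE TABLE pipeline_state (name TEXT PRIMARY KEY, last_value INTEGER)")
con.execute("INSERT INTO pipeline_state VALUES ('demo', 0)")
con.commit()

# One transaction: the data rows and the advanced mark commit together...
try:
    con.execute("INSERT INTO warehouse VALUES (1)")
    con.execute("UPDATE pipeline_state SET last_value = 1 WHERE name = 'demo'")
    raise RuntimeError("simulated crash before commit")
except RuntimeError:
    con.rollback()  # ...so a crash rolls back both, never just one

rows = con.execute("SELECT count(*) FROM warehouse").fetchone()[0]
mark = con.execute("SELECT last_value FROM pipeline_state").fetchone()[0]
print(rows, mark)  # 0 0: both still at the pre-run state, nothing skipped
```

If the mark lived in a separate store, the same crash could leave the mark at 1 with zero rows in the warehouse, and row 1 would never be fetched again.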

The five ways a high-water mark silently loses data

The mechanism is simple. The failure modes are not. Every one of these has happened in production at an Indian fintech where the author either worked or knows the engineers who did. The pattern is universal because the cause is the source's behaviour, not the pipeline's code.

1. The mark column is created_at and the database wrote rows out of order

The source-of-truth column is the wall-clock timestamp the application set at row insert. The application has 32 worker threads. Two threads call now() at almost the same instant — the first gets 09:14:32.001197, the second gets 09:14:32.001234 — but the second thread's transaction commits first because the first thread had a slow network packet to the database. The pipeline runs at 09:14:32.001500, sees only the second thread's row (because the first hasn't committed yet), advances the mark to 09:14:32.001234, and the first thread's row — when it does commit a millisecond later with timestamp 09:14:32.001197 — is now permanently behind the mark. The pipeline will never fetch it.

This is the commit-order vs assignment-order trap. A timestamp set in application code at row creation is not monotonic in commit order; it is only monotonic in row-creation order, and rows commit in the order their transactions complete, which is determined by network and lock contention. The classic Razorpay-scale version of this bug skipped roughly 80–120 transactions per day for six weeks before a finance reconciliation caught it.
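A dozen lines of plain Python reproduce the trap. The timestamps and commit times are invented toy values, and visibility is modelled simply as "committed before the snapshot":

```python
# Each row: (created_at_us, commit_at_us) -- tracking value assigned at insert,
# but the row becomes visible only at commit.
rows = [
    (1197, 3000),  # thread A: earlier timestamp, slow commit
    (1234, 2000),  # thread B: later timestamp, commits first
]

def visible(rows, at_us):
    """Rows a snapshot taken at `at_us` can see (committed only)."""
    return [r for r in rows if r[1] <= at_us]

mark = 0
# Pipeline run at t=2500: only thread B's row has committed.
batch = [r for r in visible(rows, 2500) if r[0] > mark]
mark = max(r[0] for r in batch)  # mark ratchets to 1234

# Thread A's row commits at t=3000 with created_at 1197 -- behind the mark.
late = [r for r in visible(rows, 3000) if r[0] > mark]
print(mark, late)  # 1234 [] -- the late row is never fetched
```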

The fix is to use a column whose value is set by the database at commit time, not at application call time. Postgres commit timestamps (pg_xact_commit_timestamp(xmin), with track_commit_timestamp enabled), MySQL binlog positions, and logical replication offsets are all assigned at commit and are therefore monotonic in commit order. An auto-increment id column is set at insert time and has the same problem as created_at — the rows can commit out of order even though their ids are assigned in order.

2. The source clock skewed and rolled backward

NTP corrects time. NTP corrections sometimes set the clock backward. A row inserted at 01:23:45.000 (post-correction) has a created_at lower than a row inserted at 01:23:46.000 (pre-correction, before the clock moved back to 01:23:44.500). The pipeline ratchets to the higher value, then later runs miss the lower-timestamped rows that come after.

NTP's default correction smooths small drifts (under 128 ms) by slewing the clock rate rather than stepping. Larger drifts are corrected by stepping the clock, which can move it backward. On a busy production server with poor time sync, this happens once a quarter and skips a handful of rows each time. Cumulatively, after a year, you've lost an unknown number of transactions and don't know which ones.

3. Long-running transactions hide rows behind the mark

This is the most expensive of the five. A long-running transaction starts at 09:00:00, inserts 1.2 lakh rows over the next four hours, and commits at 13:00:00. Each row's created_at reflects when its insert ran, between 09:00:00 and roughly 12:55:00. The pipeline runs at 12:00:00, sees no rows from this transaction (they're not committed yet), and advances the mark to whatever the latest committed row is — say 11:59:58. At 13:00:00 the transaction commits, and now there are 1.2 lakh rows in the source with created_at spread across the morning, the bulk of them — everything timestamped before 11:59:58 — behind the mark. The pipeline will never see those.

This is MVCC visibility lag: rows become visible to readers only at commit time, but their created_at reflects insert time. The pipeline reads at 12:00:00 from the snapshot the source returns, which doesn't include uncommitted rows, so the pipeline correctly believes it has seen "everything up to 12:00:00". When the long transaction commits, those rows are now visible but their timestamps are in the past — and the pipeline's mark has already moved forward.

The fix is mark-with-lag: instead of advancing the mark to the largest timestamp seen, advance it to min(largest_seen, now() - safety_window) where the safety window exceeds your longest transaction. The cost is that the pipeline lags real-time by the safety window. The benefit is that you don't lose data to long-running writers.
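The advance rule is two lines. A minimal sketch, with invented epoch-second values:

```python
def advance_with_lag(largest_seen_s: float, now_s: float, window_s: float) -> float:
    """Advance the mark, but never past now() - safety_window.

    Rows belonging to any transaction still open inside the window will
    commit with timestamps at or above the returned mark, so a later run
    still fetches them.
    """
    return min(largest_seen_s, now_s - window_s)

# Longest observed transaction: 4 h. Safety window: 6 h (21 600 s).
now = 1_700_000_000.0
mark = advance_with_lag(largest_seen_s=now - 2.0, now_s=now, window_s=21_600)
print(int(now - mark))  # 21600 -- the pipeline deliberately lags by the window
```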

4. The cursor is reset accidentally during a deploy

A new engineer adds a column to the pipeline_state schema, runs a migration that recreates the table, and forgets to re-seed the existing cursor row. The next run sees no row, defaults to mark = 0, and re-loads the entire transactions table — 50 crore rows — into the warehouse. The destination's ON CONFLICT (id) DO NOTHING saves you from duplicates if the destination table has a unique constraint on the source's id; if it doesn't, the warehouse silently doubles every metric for the past five years until someone notices.

The fix is two-part. First, never default to zero: if get_mark() returns no row, raise an error and refuse to run; the operator can manually seed the cursor if they truly want a backfill. Second, make the cursor write idempotent at the schema level — the pipeline_state row should be created during pipeline registration, not on first run.
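A sketch of the first half of the fix, as a pure function. The name require_mark and its error type are illustrative, not from the chapter's code:

```python
class MissingCursorError(RuntimeError):
    """The pipeline_state row is absent: refuse to run rather than backfill."""

def require_mark(row):
    """Unwrap a fetched pipeline_state row; never default to zero.

    `row` is what cursor.fetchone() returned: a 1-tuple, or None.
    """
    if row is None:
        raise MissingCursorError(
            "no cursor row found; seed pipeline_state manually "
            "if a full backfill is really intended"
        )
    return row[0]

print(require_mark((487_373_461,)))  # normal path
try:
    require_mark(None)               # a migration dropped the row
except MissingCursorError as exc:
    print("refused:", exc)
```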

5. Multiple pipelines share one cursor row

Two engineers, six months apart, both decide to read from the transactions table. They both pick the cursor name transactions_cursor because it's the obvious name. The first pipeline reads the cursor, fetches a batch, and advances. The second pipeline reads the now-advanced cursor, fetches the next batch, and advances again. The first pipeline's destination never sees the rows the second pipeline took.

Each pipeline that reads a source needs its own cursor row, named after the destination consumer, not the source table. transactions_to_warehouse_v1, transactions_to_fraud_engine, transactions_to_marketing_audience. The cursor's identity is the (source, destination) pair, not the source alone.

[Figure: five failure modes of a high-water mark, as a timeline with one panel each — (1) out-of-order commits: a row timestamped t=15 ms commits after the mark has advanced to t=20 ms and is never seen; (2) clock skew: an NTP step back gives a post-step row an earlier timestamp than the mark; (3) a long-running write transaction holds 1.2 lakh rows uncommitted, then commits them with timestamps already behind the mark; (4) a migration drops the cursor row, the mark defaults to 0, and the pipeline reloads everything; (5) two pipelines share one cursor and one of them starves — cursor identity is the (source, destination) pair, not the source.]
Five failure modes. The first three lose rows silently. The fourth duplicates rows. The fifth starves one of the consumers without warning. Each one has cost an Indian fintech a finance reconciliation at least once.

Choosing the tracking column

Once you've internalised the failure modes, the choice of tracking column becomes a design conversation, not a default. There are four common columns and each has a different correctness profile.

created_at (application timestamp). Easy to reason about, supported by every database, but vulnerable to clock skew and out-of-order commits. Acceptable for low-throughput pipelines (under 100 rows/sec at the source) where commit ordering rarely matters. Always combine it with a safety window of at least 5 minutes.

Auto-increment id. Monotonic in insert order on a single writer. With multiple application instances and a shared sequence, ids are assigned at insert time but commit out of order — same trap as created_at. Better than created_at because there is no clock-skew risk, but the out-of-order-commit failure remains.

Database-assigned commit timestamp (Postgres pg_xact_commit_timestamp(xmin), Oracle SCN, MySQL GTID). Set at commit time, monotonic in commit order, immune to the first three failure modes above; only the operational cursor-loss and shared-cursor modes remain. The right choice for any high-throughput pipeline. The cost is that the column isn't available by default — Postgres needs track_commit_timestamp = on set in postgresql.conf.

Logical replication offset (Postgres LSN, MySQL binlog file+position, Kafka offset). The source-of-truth ordering for the database itself. Used by CDC pipelines (covered in Build 11). Properly monotonic, durable, and captures inserts, updates, and deletes — but requires a fundamentally different pipeline shape (streaming) and more infrastructure. The right choice when correctness genuinely matters.

The PhonePe payments team in 2024 published an internal post-mortem of a six-month silent data loss caused by the first option. Their fix was to migrate to the third option (commit timestamps), which took one engineering quarter and is the standard pattern at every Indian fintech that has hit this wall. The lesson for new pipelines is: pick option 3 from day one if you can, even if option 1 looks "good enough" at small scale. The migration cost is real and grows with downstream consumers.

Why commit timestamps are not the default: they require the database administrator to enable a feature that adds a small storage overhead for every committed transaction. Many production Postgres installations don't have it enabled, and turning it on later requires a configuration change and a restart — and it does not backfill historical commit timestamps. So the migration looks like "cut over from created_at to pg_xact_commit_timestamp(xmin) on a known wall-clock date": rows older than that date use one column while newer rows use the other. The crossover is operational drudgery, which is why many teams put it off until a finance reconciliation forces the issue.

Going deeper

The Aadhaar enrolment pipeline's mark design

UIDAI's Aadhaar enrolment pipeline ingests roughly 5 lakh new enrolments a day from 30,000 enrolment centres scattered across India. The cursor design they described in a 2023 architecture talk uses two marks per source partition: an enrol_id (auto-increment, monotonic for a single centre) and a received_at_central_at (commit timestamp on the central ingest server, monotonic in commit order at the central server).

The downstream warehouse advances both marks together. The reason is that an enrolment centre with a flaky 4G connection might queue enrolments offline for 2–6 hours and send them in a burst — enrol_id from the centre would be in order, but the centre's commit-time-at-central is what dictates downstream pipeline visibility. Using only enrol_id would skip enrolments that arrive late from offline centres. Using only received_at_central_at would re-fetch enrolments on retries.

The lesson is that for any source with multiple writer partitions and asynchronous commit lag, you sometimes need two marks rather than one. The simple "single number" version of this chapter handles a single writer cleanly; multi-writer sources are a Build-7 topic (Kafka offsets per partition) or a Build-11 topic (CDC).

Mark storage as the pipeline's source of truth

Every reliable pipeline ends up with the same observation: the cursor row is the most operationally important state in the system. It is small (one row) and high-leverage (losing or corrupting it loses or duplicates data). The defensive engineering pattern is to back up the pipeline_state table separately from the rest of the destination, with a retention of at least 90 days, so that if the cursor is corrupted you can roll it back to a known-good value and re-process from there.

The Cred analytics team in 2024 was bitten by exactly this when a deploy script ran TRUNCATE pipeline_state against the production destination instead of the staging one. The full re-load took 38 hours and cost roughly ₹4 lakh in warehouse compute. Their post-mortem fix was an hourly snapshot of the cursor table to S3 with an aws s3 sync and a 90-day retention policy. Total storage cost: roughly ₹40 a month. Disaster avoided next time: the entire 38-hour reload.

The "freshness vs correctness" knob

The safety window from §4 above (mark-with-lag) is the knob that trades off freshness against correctness. Window = 0 means data arrives in the warehouse the instant it commits at the source, but you risk losing rows behind long transactions. Window = 1 hour means data is at most 1 hour stale, but you'll catch any transaction shorter than 1 hour. Window = 6 hours protects against most batch-loader transactions but means dashboards lag by 6 hours.

The right number is the max(transaction_duration) you've observed in the source over the last 30 days, plus 50%. Most OLTP systems expose this — in Postgres, max(now() - xact_start) over pg_stat_activity. Run the query weekly, plot the result, and set the safety window from the trend line, not from gut feel. The finance team will push for window = 0; the data engineer's job is to push back with the actual long-transaction histogram and pick a number both sides can defend.
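The calculation itself is two lines. The samples below are invented; in production they would come from the weekly long-transaction query described above:

```python
# Weekly samples of the longest open transaction, in seconds.
# In Postgres these would come from something like (assumed query):
#   SELECT extract(epoch FROM max(now() - xact_start)) FROM pg_stat_activity;
samples_s = [45.0, 120.0, 3_600.0, 14_400.0, 90.0, 600.0, 7_200.0]

def safety_window_s(samples):
    """Max observed transaction duration over the lookback, plus 50%."""
    return max(samples) * 1.5

print(safety_window_s(samples_s))  # 21600.0 -- six hours
```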

The recovery playbook when the mark is wrong

Three operational scenarios come up often enough to memorise.

The mark is too small (lagging real progress). The destination has rows that the cursor doesn't know about. Symptom: max(id) in destination is greater than last_value in pipeline_state. Cause: a deploy reset the cursor, or a manual operator UPDATE set it backward. Fix: query the destination for max(id) and write that value into pipeline_state. The pipeline will skip the next batch (rows already loaded by the previous run) but will resume cleanly. The destination's unique constraint protects against duplicates if the operator gets the value slightly wrong.

The mark is too large (skipped rows). The destination is missing rows that the source has. Symptom: a finance reconciliation finds gaps; count(*) in destination where id between X and Y is less than the source. Cause: out-of-order commits, NTP step, or a long-running write transaction whose rows have timestamps behind the mark. Fix: roll the cursor back to min(id) in destination - 1 and let the pipeline re-fetch. The unique constraint absorbs the duplicates of the rows you already have. The cost is one extra full-window read; the benefit is that the gap fills in.

The cursor row is gone entirely. Symptom: SELECT * FROM pipeline_state WHERE pipeline_name = ? returns no rows. Cause: a migration dropped the table, a deploy script ran the wrong SQL, or someone manually deleted the row. Fix: refuse to run on an empty cursor (the get_mark function in §2 should raise, not return zero), recover the value from the most recent S3 backup of the cursor table, and write it back manually. If no backup exists, query the destination for max(id) and accept the small re-fetch.
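The three symptoms reduce to a small decision function. This is an illustrative triage helper under the playbook's assumptions, not a substitute for the runbook:

```python
def diagnose(mark, dest_max_id):
    """Classify the cursor-recovery scenarios from the playbook.

    mark: last_value from pipeline_state (None if the row is gone).
    dest_max_id: max(id) in the destination (None if the table is empty).
    """
    if mark is None:
        return "cursor gone: restore from backup, or seed from destination max(id)"
    if dest_max_id is not None and dest_max_id > mark:
        return "mark too small: set it to destination max(id); ON CONFLICT absorbs the overlap"
    # mark <= dest_max_id looks normal from here; a too-large mark can only
    # be confirmed by reconciling row counts against the source.
    return "mark plausible: reconcile a recent id window against the source"

print(diagnose(None, 487_545_012))
print(diagnose(487_373_461, 487_545_012))
print(diagnose(487_545_012, 487_545_012))
```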

The discipline is to write down the recovery steps for each scenario in the pipeline's runbook and rehearse them in staging once a quarter. The first time you discover the cursor is wrong should not be at 03:00 with the CFO on Slack.

When to switch to CDC

A high-water mark on commit_timestamp is the most you can do with a polling-based pipeline. The next architectural step — a true real-time pipeline — requires reading the database's write-ahead log directly. That is change data capture (CDC), the subject of Build 11.

The signals that you've outgrown HWM and need CDC are: (1) the freshness SLA is sub-minute and the polling pipeline can't run that often without overwhelming the source, (2) the source has UPDATEs and DELETEs that the polling cursor cannot see (an HWM only sees rows whose tracking column is greater than the mark; a row updated in place doesn't move the cursor and is silently missed), or (3) the source is a multi-writer distributed database where commit timestamps don't form a clean total order across writers.
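Signal (2) is easy to demonstrate with sqlite3 from the standard library (toy schema, invented data):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE transactions (id INTEGER PRIMARY KEY, status TEXT)")
con.executemany("INSERT INTO transactions VALUES (?, ?)",
                [(1, "created"), (2, "created")])
mark = 2  # the pipeline has loaded everything

# A row is updated in place: its id does not move.
con.execute("UPDATE transactions SET status = 'refunded' WHERE id = 1")

batch = con.execute(
    "SELECT id, status FROM transactions WHERE id > ?", (mark,)
).fetchall()
print(batch)  # [] -- the refund is invisible to the polling cursor
```

The warehouse keeps saying "created" forever; only CDC, which reads the change itself rather than polling a tracking column, can see the update.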

Until any of those signals fire, an HWM is the right tool. It is one number and one query, and a working data engineer can debug it in five minutes. CDC is a system of connectors, replication slots, and offsets, and the same engineer needs an afternoon to debug a CDC stall.

The audit ledger every cursor pipeline needs

A high-water mark is one row that changes on every successful run. The rate of change is the most useful operational signal you can plot, and it is invisible if you only store the current value. The fix is one extra table — pipeline_state_history — that captures every advance with (pipeline_name, old_value, new_value, advanced_at, rows_loaded, elapsed_ms).

The history table answers questions the current cursor cannot: "did last night's run actually move the cursor?", "how many rows did each batch load over the last 30 days?", "is the gap between consecutive marks growing?". A sudden zero-row run can mean the source genuinely had nothing new, or it can mean the source's tracking column stopped advancing because of a configuration change three weeks ago. Without history, both look identical.

The Zerodha trade-tick pipeline runs a daily report that flags any cursor whose new_value - old_value is more than 3σ away from the 30-day median. The check has caught two production bugs in the last year: a sequence reset on the source after a database upgrade, and a misconfigured replica that was returning stale snapshots. Both bugs would have produced silent under-loading without the history table. The audit ledger is the operational equivalent of the same-transaction rule from §2 — both are defensive engineering against the failure modes that look exactly like normal operation until they don't.
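A sketch of such a check, using the standard library's statistics module. The numbers are invented and this is not Zerodha's implementation:

```python
import statistics

def flag_anomalous_advance(deltas, latest):
    """Flag a cursor advance more than 3 sigma from the recent median.

    deltas: new_value - old_value for recent runs, read from
    pipeline_state_history.
    """
    median = statistics.median(deltas)
    sigma = statistics.stdev(deltas)
    return abs(latest - median) > 3 * sigma

history = [82_431, 88_109, 83_442, 85_000, 84_200, 86_750]
print(flag_anomalous_advance(history, 84_000))  # False -- a normal night
print(flag_anomalous_advance(history, 0))       # True -- sequence reset / stale replica
```

Using the median rather than the mean keeps one bad night from shifting the baseline that judges the next one.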

Where this leads next

The next chapters in Build 3 build on the high-water mark in two directions.

Chapter 13 covers late-arriving rows — the rows that arrive in the source with a created_at from yesterday because of buffering, retries, or the offline POS terminal that finally got 4G — and the bitemporal model that makes them tractable.

Chapter 14 covers schema drift across incremental loads: the day a new column appears in the source and your pipeline must absorb it without losing the next day's rows. Together with this chapter, they give you the operational vocabulary for any incremental analytics pipeline you will build at an Indian fintech, ad-tech, or commerce company.

Build 7 will revisit the cursor as a Kafka consumer offset — the same idea, scaled out across partitions. Build 8 will revisit it as a watermark — the same idea, with stream semantics. Build 11 will revisit it as a logical replication LSN — the same idea, with the database doing the work. The high-water mark in this chapter is the simplest member of a family of monotonic-frontier abstractions that runs through the entire data-engineering stack.

References

  1. PostgreSQL: track_commit_timestamp documentation — the configuration parameter that enables commit timestamps, the right tracking column for high-throughput pipelines.
  2. dbt: incremental models with is_incremental() — the canonical reference for high-water marks in a modern transformation tool, including the unique-key safety net.
  3. Designing Data-Intensive Applications, Chapter 11 — Stream Processing — Kleppmann's framing of the watermark as the streaming version of the high-water mark.
  4. Debezium: snapshot and streaming modes — the production CDC system that replaces HWM polling once you've outgrown it.
  5. Airbyte: incremental sync sources — a managed connector platform's view of how cursors are stored and advanced; useful for seeing the operational concerns made explicit.
  6. Razorpay engineering: incremental analytics pipeline migration — Razorpay's published account of the move from full-refresh to incremental, including the cursor-design choices and the 6-week silent-data-loss incident that drove the design.
  7. Wall: re-processing everything every night — the previous chapter, on the runtime wall this chapter exists to solve.
  8. Snowflake: streams and tasks for incremental processing — the warehouse-vendor's primitive that bundles HWM with the destination MERGE in one feature, with the constraints that limit when it works.

The exercise sounds artificial; in practice it is the most useful staging-environment drill a data team can run. Once a quarter, deliberately break the cursor in staging and walk through the recovery. The team that has rehearsed this in calm conditions recovers in 20 minutes when it happens at 03:00 in production.

A pipeline whose cursor is right 99.9% of the time and wrong 0.1% of the time is, for finance reconciliation purposes, broken. The discipline of designing for the 0.1% — same-transaction commits, mark-with-lag for long writers, an audit-history table, refusing to default to zero — is the difference between a pipeline you ship and a pipeline that ships you to a 03:00 incident review.

A practical exercise: take the incremental_load.py example above and deliberately corrupt the cursor — set last_value to a value that's lower than the destination's max(id). Run the pipeline. Observe that ON CONFLICT (id) DO NOTHING saves the destination from duplicates. Now remove the conflict clause, corrupt the cursor again, and run. Observe that the destination now has duplicates. The conflict clause is the safety net that converts cursor errors from "data corruption" into "wasted compute". A pipeline that doesn't have one is a pipeline whose cursor must never be wrong, and cursors are sometimes wrong. Build the safety net first, the cursor second.