MongoDB change streams

MySQL forces your CDC consumer to register itself as a replica. Postgres makes you create a logical replication slot and parse a custom protocol. MongoDB took a third path: every replica set already maintains an internal log of every write — the oplog — and db.collection.watch() exposes that log as a resumable, filterable cursor over the regular MongoDB driver. No replication handshake, no slot bookkeeping, no TCP-level pretend-to-be-a-replica. This chapter shows what's underneath that one method call, what the resume token actually points at, and where the abstraction leaks under load.

A MongoDB change stream is a tailing cursor over the replica set oplog, exposed through any modern MongoDB driver as watch(). Each event carries a resume token — an opaque, ordered cursor position — that lets a consumer crash and resume without losing or duplicating events. Change streams are the easiest CDC primitive of the three big OLTP databases to consume, but the easy ergonomics hide three operational traps: oplog retention, fan-out cost, and the fact that pre-images and post-images are both opt-in.

What the oplog actually is

A MongoDB replica set always has a primary and one or more secondaries. Every write that commits on the primary is recorded as a document in the local.oplog.rs collection — a special capped collection that exists on every member (each member sizes its own copy) and that all secondaries tail to apply writes in order. The oplog is the replication mechanism (and the basis of majority durability), and like the MySQL binlog, it accidentally became the foundation of an entire CDC industry.

An oplog document looks like this:

{
  "ts": Timestamp(1714032501, 4),
  "t": NumberLong(7),
  "h": NumberLong("-9183782441238"),
  "v": 2,
  "op": "u",
  "ns": "swiggy.orders",
  "ui": UUID("1f4a..."),
  "o2": { "_id": ObjectId("66242a91...") },
  "o":  { "$v": 2, "diff": { "u": { "status": "delivered" } } },
  "wall": ISODate("2026-04-25T10:48:21.412Z")
}

The fields that matter for CDC: ts, the cluster-time position of the write and the ordering key every resume mechanism ultimately points at; ns, the db.collection namespace, which is the main filtering dimension; op, the operation type (i insert, u update, d delete, plus c for commands and n for no-ops); o, the new document or, for updates, a v2 diff of the changed fields; and o2, the _id of the document targeted by an update.

A consumer can technically read the oplog directly — open a tailable cursor on local.oplog.rs, filter on the namespaces it cares about, and parse the documents. This is what every MongoDB CDC tool did before MongoDB 3.6 (October 2017), and it works, but it forces the consumer to handle three concerns that don't belong in application code: oplog format changes between versions, secondary failover (the consumer must re-resolve which member to tail), and resumption semantics across rollbacks. Why MongoDB built watch() on top: the abstraction hides all three. The driver tracks resume tokens, the server handles failover transparently, and oplog format changes ship as silent driver upgrades. The consumer only sees a stable stream of typed change events.
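
For contrast, a minimal sketch of that pre-3.6 approach: tail local.oplog.rs directly with a tailable cursor against the same rs0 replica set used later in the chapter. Illustration only, since every one of those concerns lands back in this loop.

from pymongo import MongoClient, CursorType

client = MongoClient("mongodb://127.0.0.1:27017/?replicaSet=rs0")
oplog = client["local"]["oplog.rs"]

# Start from the newest entry currently in the oplog; a real consumer would
# persist the last ts it processed and restart from there.
last_ts = oplog.find().sort("$natural", -1).limit(1).next()["ts"]

cursor = oplog.find(
    {"ts": {"$gt": last_ts}, "ns": "swiggy.orders"},
    cursor_type=CursorType.TAILABLE_AWAIT,
)
for entry in cursor:
    # op is "i"/"u"/"d"; o2 holds the target _id on updates, o the doc or diff.
    print(entry["op"], entry["ns"], entry.get("o2") or entry.get("o"))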

Anatomy of a change stream — what the driver wraps

[Figure: a layered diagram. Layer 1 is local.oplog.rs, the raw capped write log (typically 1–10 GB). Layer 2 is the server-side aggregation pipeline: the $changeStream stage, optional $match and $project, optional pre-/post-image lookup from the images collection, and transaction metadata (lsid, txnNumber). Layer 3 is the consumer-friendly change event delivered to the driver and application: {operationType, ns, documentKey, updateDescription, _id: <resumeToken>}.]
Three layers between disk and your application. Most CDC bugs in MongoDB live at layer 2 (filter pushdown, pre-image config) or layer 1 (oplog retention), not in layer 3 — but the consumer only sees layer 3, which is part of why diagnosis is hard.

The change event your application receives — operationType, ns, documentKey, updateDescription, _id (the resume token), and optionally fullDocument / fullDocumentBeforeChange — is a server-side projection of the underlying oplog entry. The server runs an aggregation pipeline starting with the special $changeStream stage, optionally followed by $match, $project, and so on. Why this matters operationally: filtering happens server-side. A consumer that only cares about swiggy.orders events on a cluster doing 100k writes/sec across hundreds of collections does not parse 100k events per second client-side — it sends an aggregation pipeline with $match: {"ns.coll": "orders"} and the server filters before transmitting. This is the single biggest performance lever for MongoDB CDC at scale.
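
A minimal sketch of that lever, assuming the same local rs0 replica set used in the build below: watch the swiggy database as a whole, but let the primary discard everything except orders events before anything crosses the wire.

from pymongo import MongoClient

client = MongoClient("mongodb://127.0.0.1:27017/?replicaSet=rs0")
pipeline = [{"$match": {
    "ns.coll": "orders",
    "operationType": {"$in": ["insert", "update", "delete", "replace"]},
}}]
with client["swiggy"].watch(pipeline=pipeline) as stream:
    for event in stream:
        # Only orders events are ever transmitted to this consumer.
        print(event["operationType"], event["documentKey"])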

Resume tokens — the cursor that survives crashes

Every change event carries an _id field whose value is the resume token — an opaque BSON document the consumer treats as a cursor position. Modern resume tokens (MongoDB 4.0.7 and later) encode the cluster time, the document's _id, the operation type, and a UUID for the collection. They serialise to hex strings of ~80–120 bytes:

"82663E2A91000000012B022C0100296E5A1004D8E51A..."

The consumer never inspects the bytes. It only needs three operations: receive a token with each event, persist the most recent durably-processed token, and pass that token back as resumeAfter (or startAfter) on reconnection. The server then resumes from the next event after the saved token, regardless of which oplog file the data physically lives in or whether the original primary still exists.

The key correctness property: resume tokens are durable across replica-set elections. If the primary dies mid-stream, the new primary's oplog includes all committed writes from the old primary (that's the definition of replica-set commit). The consumer reconnects, passes its saved token, and the new primary resumes the stream. No file+offset gymnastics, no "did this transaction make it across the failover" worry. Why this is operationally easier than MySQL binlog: MySQL CDC consumers using file+position-based resumption must re-resolve their position against the new primary's binlog after failover (different filename, possibly different offset). GTID-based MySQL CDC and MongoDB resume tokens both solve this by replacing physical positions with logical ones.

There is a sharp limit, though: a resume token is only useful while the underlying oplog entry it references still exists. If the consumer falls behind and the oplog has rotated past that entry, the server returns a ChangeStreamHistoryLost error (code 286) and the consumer must rebootstrap from a fresh snapshot. This is the MongoDB equivalent of "MySQL purged your binlog file" — same problem, different error code.

Oplog window vs consumer lag — the failure cliff

[Figure: a horizontal timeline showing the oplog as a sliding window of recent writes. Healthy: the consumer's last-processed position sits inside the window (for example 5 minutes behind on a 6-hour oplog). Failed: the consumer has fallen outside the window (for example 8 hours behind), the saved token points at an entry that has been deleted, and the server returns ChangeStreamHistoryLost (286), forcing a rebootstrap from a snapshot.]
The oplog is a sliding window. The consumer must stay inside it. Alerting before the cliff (e.g. when lag exceeds 50% of the oplog window) is the only way to avoid a forced full snapshot.
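
When the cliff is hit anyway, the recovery path looks roughly like the sketch below (pymongo). The snapshot loader is a caller-supplied placeholder, and depending on driver version the failed resume can surface either when the stream is opened or on the first read; opening the replacement stream before re-copying the collection is what keeps snapshot and stream gap-free.

from pymongo.errors import OperationFailure

CHANGE_STREAM_HISTORY_LOST = 286

def open_stream_or_rebootstrap(coll, saved_token, load_snapshot_doc):
    """Resume if possible; otherwise snapshot the collection and return a fresh stream."""
    try:
        return coll.watch(resume_after=saved_token)
    except OperationFailure as exc:
        if exc.code != CHANGE_STREAM_HISTORY_LOST:
            raise
        # Token fell off the oplog window: open the new stream FIRST so no
        # write is missed, then re-copy the collection into the sink.
        stream = coll.watch()
        for doc in coll.find():
            load_snapshot_doc(doc)   # caller-supplied loader (illustrative)
        return stream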

Build it: a Python change-stream consumer

The official pymongo driver speaks change streams natively. The example below subscribes to swiggy.orders, prints events as they arrive, and persists the resume token to a local file so a restart picks up where it left off — the exact pattern a small CDC pipeline at a Bengaluru food-delivery startup would have run before they switched to Debezium MongoDB.

import json, sys
from pathlib import Path
from pymongo import MongoClient
from pymongo.errors import PyMongoError

# Connect to a replica set (change streams REQUIRE a replica set;
# standalone mongod won't work). The replicaSet name comes from rs.conf().
URI = "mongodb://cdc_reader:dev@127.0.0.1:27017/?replicaSet=rs0"
TOKEN_PATH = Path("/var/lib/swiggy-cdc/last_token.json")

client = MongoClient(URI)
coll = client["swiggy"]["orders"]

# Server-side filter pipeline. Anything in the pipeline runs on the
# primary; the consumer only sees what survives the $match.
pipeline = [
    {"$match": {
        "operationType": {"$in": ["insert", "update", "delete", "replace"]},
        # ignore writes from the bootstrap loader (a service account)
        "lsid.uid": {"$ne": "bootstrap"},
    }},
]

# Resume from saved token if we have one.
resume = None
if TOKEN_PATH.exists():
    resume = json.loads(TOKEN_PATH.read_text())
    print(f"resuming from saved token at {TOKEN_PATH}", file=sys.stderr)

events_seen = 0
last_token = None
try:
    with coll.watch(
        pipeline=pipeline,
        full_document="updateLookup",         # fetch post-image
        full_document_before_change="whenAvailable",  # pre-image if configured
        resume_after=resume,
        max_await_time_ms=1000,               # heartbeat cadence
    ) as stream:
        for ev in stream:
            op = ev["operationType"]
            doc_id = ev.get("documentKey", {}).get("_id")
            if op == "update":
                changed = ev.get("updateDescription", {}).get("updatedFields", {})
                summary = f"changed_fields={list(changed.keys())[:5]}"
            elif op in ("insert", "replace"):
                fd = ev.get("fullDocument", {})
                summary = f"merchant={fd.get('merchant_id')} amount={fd.get('amount')}"
            else:
                summary = ""
            print(f"  {op:<8}  swiggy.orders  _id={doc_id}  {summary}")
            last_token = ev["_id"]
            events_seen += 1
            # Checkpoint every 100 events; production checkpoints on sink commit.
            if events_seen % 100 == 0:
                TOKEN_PATH.parent.mkdir(parents=True, exist_ok=True)
                TOKEN_PATH.write_text(json.dumps(last_token, default=str))
except PyMongoError as e:
    print(f"stream error: {e!r}", file=sys.stderr)
finally:
    if last_token is not None:
        TOKEN_PATH.parent.mkdir(parents=True, exist_ok=True)
        TOKEN_PATH.write_text(json.dumps(last_token, default=str))
    print(f"closed; processed {events_seen} events; last_token persisted")

A representative run, after a separate session creates an order, marks it paid, and cancels it:

  insert    swiggy.orders  _id=ObjectId('66242a91d3b21')  merchant=swiggy-bengaluru-koramangala amount=499
  update    swiggy.orders  _id=ObjectId('66242a91d3b21')  changed_fields=['status', 'paid_at']
  update    swiggy.orders  _id=ObjectId('66242a91d3b21')  changed_fields=['status', 'cancelled_at']
^C
closed; processed 3 events; last_token persisted

A walkthrough of the lines that matter: the pipeline passed to watch() runs server-side, so the operationType filter costs the consumer nothing; full_document="updateLookup" asks the server to fetch the current post-image at read time (which can race concurrent writes, hence the pre-image discussion below); full_document_before_change="whenAvailable" only yields a before-image on collections where pre-images are enabled; max_await_time_ms=1000 bounds how long an idle getMore blocks, so the loop wakes up about once a second even with no traffic; and resume_after plus the periodic write of last_token is what turns a restart into a resume rather than a re-read or a gap. The every-100-events checkpoint is a toy policy; a production consumer persists the token only after the downstream sink has committed the corresponding events.

The code is ~50 lines. The Debezium MongoDB connector is ~30,000 lines. Most of the additional code handles snapshot bootstrap, schema inference (MongoDB documents are schemaless on the wire), Kafka transactional sink, and the ChangeStreamHistoryLost recovery path.

Three production traps

The change-streams API is genuinely easier than binlog or logical decoding. The traps that page on-call at Indian shops are not in the API — they are in the configuration around it.

1. Oplog too small for the lag. The oplog is sized at replica-set creation via oplogSizeMB: by default 5% of free disk, capped at 50 GB. On a write-heavy cluster, 5% is often 30 minutes of writes. If the consumer falls behind by more than the oplog window — restart, network outage, slow sink — the server returns ChangeStreamHistoryLost and the consumer must do a fresh snapshot. Production setting at most Bengaluru shops: oplog sized to 24–72 hours of write traffic, monitored via db.getReplicationInfo().logSizeMB against db.getReplicationInfo().tFirst and tLast. A consumer 2 hours behind on a cluster with a 6-hour oplog is fine; 5 hours behind is an active incident.

2. Pre-images and post-images are off by default. Until MongoDB 6.0, change streams could not deliver a deterministic before-image of an updated document — you got updatedFields only, and fullDocument="updateLookup" raced concurrent writes. Since 6.0, per-collection changeStreamPreAndPostImages flips this on, but it costs disk: every update and delete also writes the prior document state into config.system.preimages, which has its own retention. Why this matters for downstream joins: a sink that needs the full prior state of a document — for example, a CDC pipeline writing into a Snowflake table that maintains slowly-changing-dimension history — cannot use updateLookup correctness-wise. It needs pre-images, which means it needs the cluster admin to enable them per collection, which means it needs a process for that.

3. Sharded clusters route through mongos, not the primary. On a sharded cluster, watch() against a sharded collection routes through mongos, which fans out the change stream across all shards and merges the resulting events in cluster-time order. The merge is correct but pays for a heartbeat round-trip per shard per heartbeat cycle. A 12-shard cluster with 1-second heartbeats sends 12 round-trips per second per consumer. Several CDC platforms — Debezium MongoDB included — recommend running one consumer per shard for high-throughput cases, even though the merge is the "easy" mode, because the per-shard topology is cheaper.

A monitoring snippet for oplog lag against the primary's current position:

// Run on the primary. Compare against the consumer's last-processed token.
db.getSiblingDB("local").runCommand({
  collStats: "oplog.rs", scale: 1024
});

// Approximate oplog window in seconds (time between first and last oplog entry).
const info = db.getReplicationInfo();
const windowSecs = info.timeDiff;
// rs.printReplicationInfo() prints the same data, e.g.
// "configured oplog size: ... log length start to end: 21600secs (6.0hrs)"

A consumer 5 minutes behind is fine. A consumer 2 hours behind on a 6-hour oplog needs investigation. A consumer approaching the oplog window is a paged incident.
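
A sketch of the consumer-side check behind that paging rule, assuming the consumer checkpoints the clusterTime of the last event it processed alongside the resume token (every change event carries a clusterTime field). Growing the window itself is a separate admin step via the replSetResizeOplog command.

from pymongo import MongoClient
from bson.timestamp import Timestamp

def lag_exceeds_half_window(uri: str, last_event_cluster_time: Timestamp) -> bool:
    """True when consumer lag has crossed 50% of the oplog window."""
    oplog = MongoClient(uri)["local"]["oplog.rs"]
    newest = oplog.find().sort("$natural", -1).limit(1).next()["ts"]
    oldest = oplog.find().sort("$natural", 1).limit(1).next()["ts"]
    window = newest.time - oldest.time                  # oplog window, seconds
    lag = newest.time - last_event_cluster_time.time    # consumer lag, seconds
    return lag > window / 2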

A fourth, subtler trap belongs in the same family: majority-committed changes are the only safe input for CDC. Change streams emit only events whose underlying writes have been acknowledged by a majority of the replica set — equivalent to "this write will not be rolled back". A consumer that bypasses that guarantee to shave latency, for example by tailing local.oplog.rs directly and reading entries before they are majority-acknowledged, starts seeing writes that may later be rolled back if the primary fails before majority acknowledgement, producing duplicate or contradictory events downstream. The rollback window is small (typically milliseconds) but non-zero, and the failure mode at Indian-scale during failover is exactly the kind of thing that produces a "we double-charged 0.01% of orders" post-mortem. Every production CDC consumer of MongoDB should stay behind the change stream's majority guarantee and pay the few-millisecond latency cost.

Common confusions

Going deeper

How $changeStream is actually a pipeline stage

Internally, db.collection.watch(pipeline) is db.collection.aggregate([{$changeStream: {...}}, ...pipeline]). The $changeStream stage is privileged — it cannot appear anywhere except first, and only certain downstream stages are permitted ($match, $project, $addFields, $replaceRoot, $replaceWith, $redact). The rationale is that arbitrary aggregation stages would let consumers run unbounded server-side computation on a hot path. This is also why you cannot $lookup from a change stream into another collection — the lookup would run per-event on the primary and break it. Cross-collection enrichment must happen consumer-side or in a downstream stream processor.
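
A small sketch of the equivalence, using the same connection as the earlier examples. Running the stage through aggregate() yields a cursor over the same events, minus the resume-token bookkeeping that watch() layers on top; watch() remains the supported entry point.

from pymongo import MongoClient

client = MongoClient("mongodb://127.0.0.1:27017/?replicaSet=rs0")
cursor = client["swiggy"]["orders"].aggregate([
    {"$changeStream": {"fullDocument": "updateLookup"}},
    {"$match": {"operationType": "update"}},
    {"$project": {"documentKey": 1, "updateDescription": 1}},
])
for event in cursor:
    print(event["documentKey"], event["updateDescription"]["updatedFields"])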

Pre-images, post-images, and the system.preimages collection (6.0+)

When changeStreamPreAndPostImages: {enabled: true} is set on a collection, MongoDB writes a copy of the prior document state to config.system.preimages for every update or delete. Pre-images have their own retention: by default they are removed as the corresponding oplog entries age out, or earlier if changeStreamOptions.preAndPostImages.expireAfterSeconds is set. The change stream consumer requesting fullDocumentBeforeChange="required" then receives the historical document state directly. The cost is one extra write per change — typically 30% storage overhead and 15% write latency overhead on update-heavy collections. Most production teams enable pre-images only on the collections they actually CDC, not cluster-wide.
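
A sketch of the per-collection opt-in and the consumer side that relies on it, assuming MongoDB 6.0+ and the same cluster and names as the earlier example.

from pymongo import MongoClient

client = MongoClient("mongodb://127.0.0.1:27017/?replicaSet=rs0")
db = client["swiggy"]

# One-time admin step, per collection (can also be set at collection creation).
db.command({"collMod": "orders",
            "changeStreamPreAndPostImages": {"enabled": True}})

# "required" fails the stream loudly if a pre-image is missing (for example,
# already expired), which a correctness-sensitive sink usually prefers over
# silently emitting events without before-state.
with db["orders"].watch(full_document_before_change="required") as stream:
    for ev in stream:
        print(ev["operationType"], ev.get("fullDocumentBeforeChange"))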

Resume tokens v1 vs v2 and the cluster-time invariant

Pre-4.0.7, resume tokens were v1 — a hash that did not encode cluster time. They worked but couldn't survive certain rollback scenarios cleanly. v2 tokens (4.0.7+) encode the BSON Timestamp, the document's _id, and the namespace UUID. The encoding is documented in the BSON Resume Token spec on the MongoDB documentation site. Critically, v2 tokens are ordered: a consumer can compare the hex-encoded _data strings of two tokens lexicographically and know which one came first. The driver hides this; tools that do batch-checkpoint or fast-forward use it.
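
A two-line sketch of what that ordering enables, on the assumption that the hex-encoded _data strings of same-version tokens compare lexicographically in event order; the driver never asks you to do this, but checkpoint-compaction and fast-forward tools do.

def newer_token(a: dict, b: dict) -> dict:
    # Keep whichever token is further along the stream (see assumption above).
    return a if a["_data"] >= b["_data"] else b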

Why Razorpay built their own CDC layer for MongoDB

Razorpay's payments core was on MongoDB for years; they later migrated to a mix of datastores. During that period, they ran Debezium for MySQL CDC and a homegrown change-stream consumer for MongoDB. The reason was operational, not technical: Debezium MongoDB pre-1.7 had a snapshot bootstrap path that held read locks on the primary for the duration of the snapshot, which on a 200 GB collection was 4–6 hours of degraded reads. Debezium 1.7 (2021) replaced the lock-based snapshot with parallel chunked reads, but by then most large Indian MongoDB shops had homegrown alternatives. The lesson generalises: CDC tools mature in waves, and the wave that fits your scale arrives a year or two after your problem does.

invalidate events, Atlas Stream Processing, and platform direction

Two adjacent operational notes that keep coming up in production. First, invalidate events: a drop, dropDatabase, or rename against a watched collection emits an invalidate event, after which the cursor closes and the consumer must reopen at a fresh starting point. Why this matters for pipeline design: a drop-and-recreate of a collection (a common backfill mistake at junior-engineer scale) invalidates the change stream; if the consumer simply reopens with a fresh resume token, any documents written between the drop and the reopen are lost. Defensive pattern: subscribe at the database or cluster level rather than the collection level when namespaces might be recreated, and resume past the invalidate with startAfter. Second, MongoDB Atlas — the managed cloud — exposes change streams through Atlas Triggers and Atlas Stream Processing (GA 2024); for Atlas-hosted clusters in 2026 the platform increasingly owns the change stream, processor, and sink end-to-end, while self-hosted clusters still standardise on Debezium MongoDB or Confluent's connector.
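
A sketch of that defensive pattern with illustrative handling: subscribe at the database level and, if an invalidate ever arrives, reopen just past it with start_after.

from pymongo import MongoClient

def watch_swiggy(uri="mongodb://127.0.0.1:27017/?replicaSet=rs0"):
    db = MongoClient(uri)["swiggy"]
    start_token = None
    while True:
        kwargs = {"start_after": start_token} if start_token else {}
        with db.watch(**kwargs) as stream:
            for ev in stream:
                if ev["operationType"] == "invalidate":
                    # The cursor closes after this event; start_after lets the
                    # next cursor begin just past the invalidating operation.
                    start_token = ev["_id"]
                    break
                print(ev["operationType"], ev.get("ns"))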

Where this leads next

The next chapter, /wiki/debeziums-architecture, pulls back to look at the production wrapper around all three CDC sources covered in this section — Postgres logical decoding, MySQL binlog, and MongoDB change streams — and how Debezium turns them into a single Kafka-shaped abstraction.

Two crosslinks worth reading alongside this:

By the end of Build 11, the three OLTP CDC primitives are first-class. MongoDB is the easiest of them to consume; the harder problem in MongoDB CDC is operational sizing (oplog) and downstream schema (documents are schemaless), not the API.

A practical migration note for teams making this choice today: if your application database is MongoDB and you are weighing "build CDC with change streams" versus "dual-write to Kafka from the application", pick change streams. Dual-write is the textbook anti-pattern — without an atomic commit spanning the OLTP write and the Kafka publish, a partial failure either loses events or produces duplicates that the consumer must dedup anyway. Change streams move the durability boundary inside the database, which is the only place it can correctly live.

References