MongoDB change streams
MySQL forces your CDC consumer to register itself as a replica. Postgres makes you create a logical replication slot and parse a custom protocol. MongoDB took a third path: every replica set already maintains an internal log of every write — the oplog — and db.collection.watch() exposes that log as a resumable, filterable cursor over the regular MongoDB driver. No replication handshake, no slot bookkeeping, no TCP-level pretend-to-be-a-replica. This chapter shows what's underneath that one method call, what the resume token actually points at, and where the abstraction leaks under load.
A MongoDB change stream is a tailing cursor over the replica set oplog, exposed through any modern MongoDB driver as watch(). Each event carries a resume token — an opaque, ordered cursor position — that lets a consumer crash and resume without losing or duplicating events. Change streams are the easiest CDC primitive of the three big OLTP databases to consume, but the easy ergonomics hide three operational traps: oplog retention, fan-out cost, and the fact that pre-images and post-images are both opt-in.
What the oplog actually is
A MongoDB replica set always has a primary and one or more secondaries. Every write that commits on the primary is recorded as a document in the local.oplog.rs collection — a special capped collection that every member maintains (its size is configured per member) and that all secondaries tail to apply writes in order. The oplog is the durability and replication mechanism, and like the MySQL binlog, it accidentally became the foundation of an entire CDC industry.
An oplog document looks like this:
{
"ts": Timestamp(1714032501, 4),
"t": NumberLong(7),
"h": NumberLong("-9183782441238"),
"v": 2,
"op": "u",
"ns": "swiggy.orders",
"ui": UUID("1f4a..."),
"o2": { "_id": ObjectId("66242a91...") },
"o": { "$v": 2, "diff": { "u": { "status": "delivered" } } },
"wall": ISODate("2026-04-25T10:48:21.412Z")
}
The fields that matter for CDC:
- ts — the BSON Timestamp (seconds-since-epoch + ordinal). Strictly monotonic across the replica set.
- op — the operation code: i (insert), u (update), d (delete), c (command — DDL like createCollection), n (no-op, used for heartbeats).
- ns — the namespace, formatted as <database>.<collection>.
- o2 — the filter for the original update/delete (typically {_id: ...}).
- o — the operation payload. For inserts, the full document. For updates, since MongoDB 5.0, a structured $v: 2 diff ({u: {...}} for set, {d: {...}} for unset, {i: {...}} for newly added fields). Older formats use compact $set/$unset operators.
- wall — wall-clock time of the write on the primary, for human-readable timelines.
A consumer can technically read the oplog directly — open a tailable cursor on local.oplog.rs, filter on the namespaces it cares about, and parse the documents. This is what every MongoDB CDC tool did before MongoDB 3.6 (October 2017), and it works, but it forces the consumer to handle three concerns that don't belong in application code: oplog format changes between versions, secondary failover (the consumer must re-resolve which member to tail), and resumption semantics across rollbacks. Why MongoDB built watch() on top: the abstraction hides all three. The driver tracks resume tokens, the server handles failover transparently, and oplog format changes ship as silent driver upgrades. The consumer only sees a stable stream of typed change events.
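The pre-3.6 approach can be illustrated with a minimal sketch — assuming pymongo and a reachable replica set; the URI and namespace list in the usage comment are placeholders, and none of the three concerns above (format drift, failover, rollback-safe resumption) are handled:

```python
def ns_filter(namespaces):
    """Oplog query for a set of <db>.<coll> namespaces; 'n' entries are
    heartbeat no-ops, not data changes, so they are excluded."""
    return {"ns": {"$in": list(namespaces)}, "op": {"$ne": "n"}}

def tail_oplog(uri, namespaces):
    """Tail local.oplog.rs the way pre-3.6 CDC tools did.
    Requires pymongo and a live replica set, hence the deferred import."""
    from pymongo import MongoClient, CursorType
    oplog = MongoClient(uri).local["oplog.rs"]
    # TAILABLE_AWAIT: the cursor blocks server-side until new entries
    # arrive -- the same mechanism secondaries use for replication.
    cursor = oplog.find(ns_filter(namespaces),
                        cursor_type=CursorType.TAILABLE_AWAIT)
    for entry in cursor:
        yield entry["ts"], entry["op"], entry["ns"], entry.get("o")

# Usage (against a live replica set; URI and namespace are placeholders):
#   for ts, op, ns, o in tail_oplog(
#           "mongodb://127.0.0.1:27017/?replicaSet=rs0", ["swiggy.orders"]):
#       print(ts, op, ns)
```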
The change event your application receives — operationType, ns, documentKey, updateDescription, _id (the resume token), and optionally fullDocument / fullDocumentBeforeChange — is a server-side projection of the underlying oplog entry. The server runs an aggregation pipeline starting with the special $changeStream stage, optionally followed by $match, $project, and so on. Why this matters operationally: filtering happens server-side. A consumer that only cares about swiggy.orders events on a cluster doing 100k writes/sec across hundreds of collections does not parse 100k events per second client-side — it sends an aggregation pipeline with $match: {"ns.coll": "orders"} and the server filters before transmitting. This is the single biggest performance lever for MongoDB CDC at scale.
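The filter push-down can be sketched like this — the collection name and operation list are illustrative, and watch_orders assumes an already-connected pymongo MongoClient:

```python
def orders_only_pipeline(coll="orders", ops=("insert", "update", "delete")):
    """$match stage that the server applies inside the $changeStream
    aggregation, before any event is serialised onto the wire."""
    return [{"$match": {
        "ns.coll": coll,
        "operationType": {"$in": list(ops)},
    }}]

def watch_orders(client):
    """Cluster-wide stream narrowed server-side to a single collection.
    'client' is a pymongo MongoClient connected to a replica set."""
    return client.watch(pipeline=orders_only_pipeline())
```

The consumer's network interface then sees only the surviving events, which is what makes the 100k-writes/sec scenario above tractable.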
Resume tokens — the cursor that survives crashes
Every change event carries an _id field whose value is the resume token — an opaque BSON document the consumer treats as cursor position. Modern resume tokens (v2, MongoDB 4.0.7 and later) encode the cluster time, the document's _id, the operation type, and a UUID for the collection. They serialise to hex strings of roughly 80–120 characters:
"82663E2A91000000012B022C0100296E5A1004D8E51A..."
The consumer never inspects the bytes. It only needs three operations: receive a token with each event, persist the most recent durably-processed token, and pass that token back as resumeAfter (or startAfter) on reconnection. The server then resumes from the next event after the saved token, regardless of which oplog file the data physically lives in or whether the original primary still exists.
The key correctness property: resume tokens are durable across replica-set elections. If the primary dies mid-stream, the new primary's oplog includes all committed writes from the old primary (that's the definition of replica-set commit). The consumer reconnects, passes its saved token, and the new primary resumes the stream. No file+offset gymnastics, no "did this transaction make it across the failover" worry. Why this is operationally easier than MySQL binlog: MySQL CDC consumers using file+position-based resumption must re-resolve their position against the new primary's binlog after failover (different filename, possibly different offset). GTID-based MySQL CDC and MongoDB resume tokens both solve this by replacing physical positions with logical ones.
There is a sharp limit, though: a resume token is only useful while the underlying oplog entry it references still exists. If the consumer falls behind and the oplog has rotated past that entry, the server returns a ChangeStreamHistoryLost error (code 286) and the consumer must rebootstrap from a fresh snapshot. This is the MongoDB equivalent of "MySQL purged your binlog file" — same problem, different error code.
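One way a consumer might structure that recovery path — a sketch, with open_stream and rebootstrap as caller-supplied placeholders rather than any library API:

```python
HISTORY_LOST = 286  # server error code for ChangeStreamHistoryLost

def is_history_lost(exc):
    """True when an error says the saved token fell off the oplog.
    Works on anything exposing .code, as pymongo's OperationFailure does."""
    return getattr(exc, "code", None) == HISTORY_LOST

def run_with_rebootstrap(open_stream, saved_token, rebootstrap):
    """Resume if possible; on history-lost, re-snapshot and start fresh."""
    try:
        return open_stream(saved_token)
    except Exception as exc:
        if not is_history_lost(exc):
            raise
        rebootstrap()             # caller's full collection re-snapshot
        return open_stream(None)  # brand-new stream, no resume token
```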
Build it: a Python change-stream consumer
The official pymongo driver speaks change streams natively. The example below subscribes to swiggy.orders, prints events as they arrive, and persists the resume token to a local file so a restart picks up where it left off — the exact pattern a small CDC pipeline at a Bengaluru food-delivery startup would have run before they switched to Debezium MongoDB.
import json, sys
from pathlib import Path
from pymongo import MongoClient
from pymongo.errors import PyMongoError
# Connect to a replica set (change streams REQUIRE a replica set;
# standalone mongod won't work). The replicaSet name comes from rs.conf().
URI = "mongodb://cdc_reader:dev@127.0.0.1:27017/?replicaSet=rs0"
TOKEN_PATH = Path("/var/lib/swiggy-cdc/last_token.json")
client = MongoClient(URI)
coll = client["swiggy"]["orders"]
# Server-side filter pipeline. Anything in the pipeline runs on the
# primary; the consumer only sees what survives the $match.
pipeline = [
{"$match": {
"operationType": {"$in": ["insert", "update", "delete", "replace"]},
# ignore writes from the bootstrap loader (a service account)
"lsid.uid": {"$ne": "bootstrap"},
}},
]
# Resume from saved token if we have one.
resume = None
if TOKEN_PATH.exists():
resume = json.loads(TOKEN_PATH.read_text())
print(f"resuming from saved token at {TOKEN_PATH}", file=sys.stderr)
events_seen = 0
last_token = None
try:
with coll.watch(
pipeline=pipeline,
full_document="updateLookup", # fetch post-image
full_document_before_change="whenAvailable", # pre-image if configured
resume_after=resume,
max_await_time_ms=1000, # heartbeat cadence
) as stream:
for ev in stream:
op = ev["operationType"]
doc_id = ev.get("documentKey", {}).get("_id")
if op == "update":
changed = ev.get("updateDescription", {}).get("updatedFields", {})
summary = f"changed_fields={list(changed.keys())[:5]}"
elif op in ("insert", "replace"):
fd = ev.get("fullDocument", {})
summary = f"merchant={fd.get('merchant_id')} amount={fd.get('amount')}"
else:
summary = ""
print(f" {op:<8} swiggy.orders _id={doc_id} {summary}")
last_token = ev["_id"]
events_seen += 1
# Checkpoint every 100 events; production checkpoints on sink commit.
if events_seen % 100 == 0:
TOKEN_PATH.parent.mkdir(parents=True, exist_ok=True)
TOKEN_PATH.write_text(json.dumps(last_token, default=str))
except PyMongoError as e:
print(f"stream error: {e!r}", file=sys.stderr)
finally:
if last_token is not None:
TOKEN_PATH.write_text(json.dumps(last_token, default=str))
print(f"closed; processed {events_seen} events; last_token persisted")
A representative run, after a separate session creates an order, marks it paid, and cancels it:
insert swiggy.orders _id=ObjectId('66242a91d3b21') merchant=swiggy-bengaluru-koramangala amount=499
update swiggy.orders _id=ObjectId('66242a91d3b21') changed_fields=['status', 'paid_at']
update swiggy.orders _id=ObjectId('66242a91d3b21') changed_fields=['status', 'cancelled_at']
^C
closed; processed 3 events; last_token persisted
A walkthrough of the lines that matter:
- replicaSet=rs0 in the URI — change streams require a replica set, even a single-node one (rs.initiate() against one mongod is enough for development). A standalone mongod has no oplog and watch() raises OperationFailure. Production clusters are always replica sets, but the failure mode bites in dev.
- pipeline=[{"$match": ...}] — server-side filtering. The match runs on the primary; the wire payload is reduced before transmission. Without this, the consumer receives every event in swiggy.orders regardless of what it cares about. On a hot collection this is the difference between 100 events/sec and 100k events/sec on the consumer's network interface.
- full_document="updateLookup" — by default an update event only carries the changed fields, not the full document. updateLookup performs a follow-up read of the document by _id after the update. Why this is not a free lunch: the lookup is a separate read on the primary, performed at the consumer's request, and the document state at lookup time may already differ from the document state at the original update — a concurrent later write could have intervened. For exactly-correct historical replay you want full_document_before_change configured per-collection (pre-images, MongoDB 6.0+) or you accept the non-strict semantics of updateLookup.
- max_await_time_ms=1000 — how long the server holds the cursor open before returning a no-data response (a heartbeat). Events themselves are returned as soon as they arrive regardless; this setting only bounds how long an empty getMore blocks, so lower values mean faster liveness detection at the cost of more empty round-trips. 1000 ms is a common production setting. The driver loops automatically.
- Token persistence — checkpoint cadence here is "every 100 events". Production CDC pipelines checkpoint after the sink durably commits the batch — checkpointing before sink commit risks losing events on consumer crash; checkpointing after every event maximises retransmissions on crash. The trade-off is the same as in every checkpointing system; see /wiki/checkpointing-the-consistent-snapshot-algorithm.
- updateDescription.updatedFields — only contains fields that actually changed. If your sink expects a full row image on every update, you need fullDocument="updateLookup" (current state) or pre-images-and-post-images (historical state). Many MongoDB CDC bugs in the wild come from sinks that assume full documents but consumers that subscribed without updateLookup.
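A third option for sinks that keep their own document copies is to rebuild post-images locally from updateDescription, skipping updateLookup entirely — a simplified sketch that only handles top-level fields (dotted-path updates like "a.b" would need a recursive merge):

```python
def apply_update(cached_doc, update_description):
    """Rebuild the post-image from a sink-side cached copy plus the
    change event's updateDescription. Top-level fields only."""
    out = dict(cached_doc)
    # Apply changed/added fields, then drop removed ones.
    out.update(update_description.get("updatedFields", {}))
    for field in update_description.get("removedFields", []):
        out.pop(field, None)
    return out
```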
The code is ~50 lines. The Debezium MongoDB connector is ~30,000 lines. Most of the additional code handles snapshot bootstrap, schema inference (MongoDB documents are schemaless on the wire), Kafka transactional sink, and the ChangeStreamHistoryLost recovery path.
Three production traps
The change-streams API is genuinely easier than binlog or logical decoding. The traps that page on-call at Indian shops are not in the API — they are in the configuration around it.
1. Oplog too small for the lag. The oplog is sized at replica-set creation (and resizable later with replSetResizeOplog): the oplogSizeMB setting defaults to 5% of free disk, capped at 50 GB. On a write-heavy cluster, 5% is often 30 minutes of writes. If the consumer falls behind by more than the oplog window — restart, network outage, slow sink — the server returns ChangeStreamHistoryLost and the consumer must do a fresh snapshot. Production setting at most Bengaluru shops: oplog sized to 24–72 hours of write traffic, monitored via db.getReplicationInfo(), whose tFirst and tLast bound the retained window (logSizeMB gives the configured size). A consumer 2 hours behind on a cluster with a 6-hour oplog needs watching; 5 hours behind is an active incident.
2. Pre-images and post-images are off by default. Until MongoDB 6.0, change streams could not deliver a deterministic before-image of an updated document — you got updatedFields only, and fullDocument="updateLookup" raced concurrent writes. Since 6.0, per-collection changeStreamPreAndPostImages flips this on, but it costs disk: every write maintains pre/post images in a separate system.preimages collection that is itself capped. Why this matters for downstream joins: a sink that needs the full prior state of a document — for example, a CDC pipeline writing into a Snowflake table that maintains slowly-changing-dimension history — cannot use updateLookup correctness-wise. It needs pre-images, which means it needs the cluster admin to enable them per collection, which means it needs a process for that.
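The enablement itself is two admin commands — shown here as a mongosh sketch with an illustrative collection name and a 24-hour retention picked arbitrarily:

```javascript
// 1. Opt the collection in (MongoDB 6.0+); run against the database
//    that owns it. "orders" is an illustrative name.
db.runCommand({
  collMod: "orders",
  changeStreamPreAndPostImages: { enabled: true }
});

// 2. Optionally bound pre-image retention cluster-wide (here: 24 hours,
//    an arbitrary illustrative value).
db.adminCommand({
  setClusterParameter: {
    changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 86400 } }
  }
});
```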
3. Sharded clusters route through mongos, not the primary. On a sharded cluster, watch() against a sharded collection routes through mongos, which fans out the change stream across all shards and merges the resulting events in cluster-time order. The merge is correct but pays for a heartbeat round-trip per shard per heartbeat cycle. A 12-shard cluster with 1-second heartbeats sends 12 round-trips per second per consumer. Several CDC platforms — Debezium MongoDB included — recommend running one consumer per shard for high-throughput cases, even though the merge is the "easy" mode, because the per-shard topology is cheaper.
A monitoring snippet for oplog lag against the primary's current position:
// Run on the primary. Compare against the consumer's last-processed token.
db.getSiblingDB("local").runCommand({
collStats: "oplog.rs", scale: 1024
});
// Approximate oplog window in seconds.
// Approximate oplog window in seconds — mongosh prints something like
// "configured oplog size: ... log length start to end: 21600secs (6.0hrs)"
rs.printReplicationInfo();
A consumer 5 minutes behind is fine. A consumer 2 hours behind on a 6-hour oplog needs investigation. A consumer approaching the oplog window is a paged incident.
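Those thresholds can be encoded as a small helper. The warn/page fractions below are illustrative choices matching the prose (a third of the window, and 80% of it), not a MongoDB-defined policy; timestamps are plain epoch seconds:

```python
def oplog_window_seconds(t_first, t_last):
    """Retained oplog window: newest entry minus oldest (epoch seconds)."""
    return t_last - t_first

def lag_status(consumer_ts, t_first, t_last,
               warn_frac=1 / 3, page_frac=0.8):
    """Classify consumer lag as a fraction of the oplog window."""
    window = oplog_window_seconds(t_first, t_last)
    lag = t_last - consumer_ts
    frac = lag / window if window > 0 else 1.0
    if frac >= page_frac:
        return "page"          # approaching ChangeStreamHistoryLost
    if frac >= warn_frac:
        return "investigate"
    return "ok"
```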
A fourth, subtler trap belongs in the same family: majority read concern is the only safe one for CDC. By default, change streams emit only events whose underlying writes have been acknowledged by a majority of the replica set — equivalent to "this write will not be rolled back". A consumer that switches to local read concern to reduce latency starts seeing events for writes that may later be rolled back if the primary fails before majority acknowledgement, producing duplicate or contradictory events downstream. The rollback window is small (typically milliseconds) but non-zero, and the failure mode at Indian-scale during failover is exactly the kind of thing that produces a "we double-charged 0.01% of orders" post-mortem. Every production CDC consumer of MongoDB should leave read concern at majority and pay the few-millisecond latency cost.
Common confusions
- "MongoDB change streams are the same as the oplog." They are layered. The oplog is the underlying capped collection. Change streams are a server-side aggregation pipeline that projects oplog entries into a stable, resumable, filterable consumer-friendly shape. You can read the oplog directly — pre-3.6 tools did — but you lose resume-token semantics, server-side filter pushdown, and version stability.
- "
fullDocument='updateLookup'gives me the document as it was at the update time." No. It gives you the document as it is now, at the time the lookup runs. A concurrent later write that happens before your lookup is reflected. For historical correctness, usechangeStreamPreAndPostImages(6.0+) andfullDocumentBeforeChange='whenAvailable'. - "Change streams work on a standalone mongod for development." They do not. The change stream API requires a replica set, because the resume token semantics depend on the oplog.
rs.initiate()on a single-node replica set is the standard development setup. - "A resume token guarantees no duplicate delivery." It guarantees no missed events. Duplicates are still possible if the consumer crashes between sink commit and token persistence. Idempotent sinks keyed on
(ns, documentKey, clusterTime)or on_id(the resume token itself) are still required, same as MySQL and Postgres CDC. - "
watch()on a database or cluster is just sugar for many collection watches." Almost.db.watch()opens a single change stream over all collections in the database;client.watch()opens one over the entire cluster. The server merges events from many namespaces into one ordered stream. The advantage is one cursor instead of N; the disadvantage is no per-collection isolation, so a runaway noisy collection slows the whole consumer. - "MongoDB transactions don't show up specially in change streams." They do — multi-document transaction events carry
txnNumberandlsid(logical session id). A correct consumer that needs to apply transactions atomically downstream buffers events with the samelsidand only flushes after the transaction's last event arrives, similar to MySQL'sXID_EVENTboundary.
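The transaction-buffering pattern from that last point can be sketched as follows — a simplified model that assumes, as a single stream guarantees, that a transaction's events arrive contiguously in commit order; the sink callback is a placeholder:

```python
def txn_key(ev):
    """(lsid, txnNumber) identifies a multi-document transaction;
    None for plain, non-transactional writes."""
    if "lsid" in ev and "txnNumber" in ev:
        return (str(ev["lsid"]), ev["txnNumber"])
    return None

class TxnBuffer:
    """Group change events by transaction and emit each transaction as
    one atomic batch to the sink."""

    def __init__(self, apply_batch):
        self.apply_batch = apply_batch  # sink callback: list[event] -> None
        self.current = None             # (key, [events]) being buffered

    def feed(self, ev):
        key = txn_key(ev)
        if self.current and self.current[0] != key:
            # A different key means the buffered transaction is complete.
            self.apply_batch(self.current[1])
            self.current = None
        if key is None:
            self.apply_batch([ev])      # plain write: flush on its own
        elif self.current is None:
            self.current = (key, [ev])
        else:
            self.current[1].append(ev)

    def close(self):
        if self.current:
            self.apply_batch(self.current[1])
            self.current = None
```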
Going deeper
How $changeStream is actually a pipeline stage
Internally, db.collection.watch(pipeline) is db.collection.aggregate([{$changeStream: {...}}, ...pipeline]). The $changeStream stage is privileged — it cannot appear anywhere except first, and only certain downstream stages are permitted ($match, $project, $addFields, $replaceRoot, $replaceWith, $redact). The rationale is that arbitrary aggregation stages would let consumers run unbounded server-side computation on a hot path. This is also why you cannot $lookup from a change stream into another collection — the lookup would run per-event on the primary and break it. Cross-collection enrichment must happen consumer-side or in a downstream stream processor.
Pre-images, post-images, and the system.preimages collection (6.0+)
When changeStreamPreAndPostImages: {enabled: true} is set on a collection, MongoDB writes a copy of the prior document state to config.system.preimages for every update or delete. This collection is itself time-bounded: by default pre-images are purged in step with the oplog, or earlier if expireAfterSeconds is set via the changeStreamOptions cluster parameter. The change stream consumer requesting fullDocumentBeforeChange="required" then receives the historical document state directly. The cost is one extra write per change — typically 30% storage overhead and 15% write latency overhead on update-heavy collections. Most production teams enable pre-images only on the collections they actually CDC, not cluster-wide.
Resume tokens v1 vs v2 and the cluster-time invariant
Pre-4.0.7, resume tokens were v1 — a hash that did not encode cluster time. They worked but couldn't survive certain rollback scenarios cleanly. v2 tokens (4.0.7+) encode the BSON Timestamp, the document's _id, and the namespace UUID. The encoding is documented in the BSON Resume Token spec on the MongoDB documentation site. Critically, v2 tokens are ordered: comparing the hex _data strings byte-wise tells a consumer which token came first. The driver hides this; tools that do batch-checkpoint or fast-forward use it.
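The fast-forward trick looks roughly like this — a sketch that leans on the byte-ordering property of v2 _data strings; the token values in the test are hypothetical, not real tokens:

```python
def later_token(a, b):
    """Return the later of two v2 resume tokens. _data is a hex-encoded
    KeyString, designed so that byte-wise (and hence hex-string)
    comparison matches logical event order."""
    return a if a["_data"] >= b["_data"] else b

def fast_forward(tokens):
    """Batch checkpointing: persist only the furthest token of a batch."""
    latest = tokens[0]
    for tok in tokens[1:]:
        latest = later_token(latest, tok)
    return latest
```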
Why Razorpay built their own CDC layer for MongoDB
Razorpay's payments core was on MongoDB for years; they later migrated to a mix. During that period, they ran Debezium for MySQL CDC and a homegrown change-stream consumer for MongoDB. The reason was operational, not technical: Debezium MongoDB pre-1.7 had a snapshot bootstrap path that took the primary read-locks for the duration of the snapshot, which on a 200 GB collection was 4–6 hours of degraded reads. Debezium 1.7 (2021) replaced the lock-based snapshot with parallel chunked reads, but by then most large Indian MongoDB shops had homegrown alternatives. The lesson generalises: CDC tools mature in waves, and the wave that fits your scale arrives a year or two after your problem does.
invalidate events, Atlas Stream Processing, and platform direction
Two adjacent operational notes that keep coming up in production. First, invalidate events: a drop, dropDatabase, or rename against a watched collection emits an invalidate event, after which the cursor closes and the consumer must reopen at a fresh starting point. Why this matters for pipeline design: a drop-and-recreate of a collection (a common backfill mistake at junior-engineer scale) silently invalidates the change stream, the consumer reopens with a fresh resume token, and any rows written between the drop and the reopen are lost. Defensive pattern: subscribe at the database or cluster level rather than the collection level when namespaces might be recreated. Second, MongoDB Atlas — the managed cloud — exposes change streams through Atlas Triggers and Atlas Stream Processing (GA 2024); for Atlas-hosted clusters in 2026 the platform increasingly owns the change stream, processor, and sink end-to-end, while self-hosted clusters still standardise on Debezium MongoDB or Confluent's connector.
Where this leads next
The next chapter, /wiki/debeziums-architecture, pulls back to look at the production wrapper around all three CDC sources covered in this section — Postgres logical decoding, MySQL binlog, and MongoDB change streams — and how Debezium turns them into a single Kafka-shaped abstraction.
Two crosslinks worth reading alongside this:
- /wiki/postgres-logical-decoding-from-scratch — same problem, different protocol; Postgres has a slot model that MongoDB doesn't need because MongoDB's replication is already document-shaped.
- /wiki/mysql-binlog-format-and-replication-protocol — the MySQL counterpart; the contrast in API ergonomics is striking and worth absorbing before the next chapter.
By the end of Build 11, the three OLTP CDC primitives are first-class. MongoDB is the easiest of them to consume; the harder problem in MongoDB CDC is operational sizing (oplog) and downstream schema (documents are schemaless), not the API.
A practical migration note for teams making this choice today: if your application database is MongoDB and you are weighing "build CDC with change streams" versus "dual-write to Kafka from the application", pick change streams. Dual-write is the textbook anti-pattern — every two-phase commit between an OLTP write and a Kafka publish either loses events on partial failure or produces duplicates that the consumer must dedup anyway. Change streams move the durability boundary inside the database, which is the only place it can correctly live.
References
- MongoDB manual: Change Streams — the canonical reference for watch(), resume tokens, pre/post images, and aggregation pipeline restrictions.
- MongoDB manual: Replica Set Oplog — what the oplog actually stores, how it's sized, how to monitor lag.
- BSON Resume Token specification — the authoritative description of v1 and v2 resume tokens.
- Debezium MongoDB connector documentation — the production wrapper: snapshot bootstrap, Kafka semantics, schema inference for documents.
- MongoDB engineering blog: change streams internals (2017–2023) — design rationale, especially the post-3.6 evolution to v2 resume tokens and pre-images.
- Atlas Stream Processing announcement (2024) — the managed CDC + stream-processing direction for cloud-hosted MongoDB.
- /wiki/postgres-logical-decoding-from-scratch — the Postgres CDC counterpart; pair with this chapter when comparing source databases.
- /wiki/mysql-binlog-format-and-replication-protocol — the MySQL CDC counterpart; the three together are the OLTP CDC story.
- Kreps, "The Log: What every software engineer should know about real-time data's unifying abstraction" (2013) — the conceptual frame for treating database write logs as the foundational primitive of streaming.
- /wiki/idempotent-producers-and-sequence-numbers — why every CDC consumer needs idempotent downstream writes regardless of source.