In short

SQL gives you a declarative query language: you describe the result, the planner picks an execution strategy. MongoDB's aggregation framework gives you a compositional query language: you write the execution strategy directly as a sequence of stages, and a (lighter-weight) optimiser tweaks the order and pushes some operators down into indexes.

A pipeline is a Python list. Each element is a one-key document naming the stage operator: {"$match": {...}}, {"$group": {...}}, {"$sort": {...}}, {"$project": {...}}. Documents flow in at the top, get filtered, reshaped, grouped, joined, and exit at the bottom. The stage zoo replaces nearly every SQL clause:

  • $match → WHERE (filter rows; should be first if possible so indexes apply)
  • $group → GROUP BY plus aggregation functions ($sum, $avg, $min, $max, $push, $addToSet)
  • $sort → ORDER BY (uses an index if before any reshape; otherwise an in-memory sort capped at 100 MB)
  • $project → SELECT column list, plus computed expressions
  • $lookup → LEFT OUTER JOIN into another collection
  • $unwind — flatten an array field into one document per element (no SQL analogue; needed because documents nest)
  • $facet — run several pipelines in parallel on the same input (multi-aggregate dashboards in one round trip)
  • $out / $merge — materialise results into a new or existing collection (think CREATE TABLE AS / INSERT ON CONFLICT)

This chapter walks each stage with a real Indian e-commerce orders collection, builds the canonical "top 5 products by revenue in each state in the last quarter" query end-to-end in pymongo, and then unpacks the optimiser: what it reorders, what it can index, and where the 100 MB in-memory cap will silently bite you on production-scale data. By the end you will know when the aggregation pipeline is the right tool, when to lean on $function for custom logic, and when to give up and pipe the data into Spark or Trino.

The thesis: pipelines instead of plans

In a relational database the journey from query to result has two distinct halves. You write a SELECT statement that says what you want. The planner — a thousand-page optimiser with cost models, statistics, and cardinality estimates — decides how to compute it: which join algorithm, which scan, which order, which indexes. You never see the plan unless you ask for EXPLAIN.

MongoDB takes the other end of the trade. You write the plan yourself, as a sequence of stages. The optimiser is small — it reorders a handful of safe transformations and pushes filters down to use indexes — but the heavy lifting of "what algorithm runs at this stage" is your job. Why this design: MongoDB's documents are arbitrary trees, and modelling tree-shaped data in a cost-based planner is genuinely hard — the cardinality estimates that work for rectangular tables (histograms over column values) do not generalise to "how many array elements does the average items field have". The aggregation framework punts that decision back to the developer, who knows their data.

A pipeline is a list of stage documents. Each stage is a single-key map whose key is the operator name (always prefixed with $) and whose value is the operator's parameters. The metaphor is exactly Unix pipes: cat orders.json | grep '2024-' | awk '{sum+=$3} END {print sum}' becomes db.orders.aggregate([{"$match": {...}}, {"$group": {...}}]).

[Figure: Documents flow through stages, each transforming the stream. orders collection (~10M docs) → $match on date in Q1 (~2M survive) → $group by state (~28 groups) → $sort by revenue (28 ordered) → $project name and revenue (28 final). Each stage is independent: you can drop one in, take one out, or reorder them. The optimiser will silently push $match before $group when safe, never the other way.]

The stream-of-documents abstraction is the single most important thing to internalise. A stage does not see your collection — it sees whatever the previous stage emitted. After $group, the documents no longer have an _id from the original collection; they have whatever _id you defined as the group key. After $project, fields you did not include are gone. After $unwind: "$items", one input document with three array elements has become three output documents. Stages compose because they all speak the same language: documents in, documents out.
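The documents-in, documents-out contract can be mimicked in plain Python, which sometimes helps when reasoning about what a later stage will actually see. This is only a mental model, not how the server executes a pipeline; the helper names (match_stage, group_sum_stage, run) and the sample documents are invented for illustration.

```python
from collections import defaultdict

def match_stage(docs, field, value):
    """Toy $match: emit only documents whose `field` equals `value`."""
    return (d for d in docs if d.get(field) == value)

def group_sum_stage(docs, key, amount):
    """Toy $group: one output doc per distinct `key`, with _id set to that key."""
    totals = defaultdict(int)
    for d in docs:
        totals[d[key]] += d[amount]
    return ({"_id": k, "total": v} for k, v in totals.items())

def run(docs, stages):
    """Feed each stage the output of the previous one."""
    stream = iter(docs)
    for stage in stages:
        stream = stage(stream)
    return list(stream)

orders = [
    {"_id": 1, "state": "KA", "status": "delivered", "amount": 100},
    {"_id": 2, "state": "KA", "status": "cancelled", "amount": 999},
    {"_id": 3, "state": "MH", "status": "delivered", "amount": 250},
    {"_id": 4, "state": "KA", "status": "delivered", "amount": 50},
]

result = run(orders, [
    lambda s: match_stage(s, "status", "delivered"),
    lambda s: group_sum_stage(s, "state", "amount"),
])
print(result)
```

Note that the grouped documents carry the group key as _id and nothing else from the originals: the state and status fields are simply not there for any later stage to use.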

The stage zoo

There are around 30 aggregation stages. You will use eight of them constantly. The rest exist for niche needs ($bucket for histograms, $graphLookup for recursive joins, $geoNear for geospatial). Here is the working set.

[Figure: The stage zoo, seven operators that cover 95% of pipelines. $match (filter documents; SQL: WHERE; uses indexes if first). $group (aggregate by key with _id plus $sum/$avg/$push; SQL: GROUP BY; 100 MB in-memory cap). $sort (order documents; SQL: ORDER BY; indexed if at start). $project (reshape fields: include/exclude/compute; SQL: SELECT column list; drops unused fields). $lookup (join another collection via from + localField + foreignField; SQL: LEFT OUTER JOIN; expensive, so index foreignField). $unwind (explode arrays; SQL: cross-apply / lateral; multiplies doc count by N). $facet (run many pipelines on the same input, e.g. by_state, by_month, totals; SQL: multiple SELECTs in one round trip; single scan, multiple aggregates).]

$match — the filter

Filtering is done with the same query operators you use for find(). Why this matters: any index that works for find({"shipping.state": "KA"}) also works for {"$match": {"shipping.state": "KA"}}, as long as the $match is the first stage of the pipeline. The moment a $group, $project, $unwind or any other reshape stage runs ahead of it, the index is gone: there is no longer a "collection" to index, only a transient stream.

{"$match": {"date": {"$gte": datetime(2024, 1, 1), "$lt": datetime(2024, 4, 1)},
            "status": "delivered"}}

$group — the aggregator

Every group has an _id (the grouping key — a field, an expression, or a map of fields for compound keys) and zero or more accumulator expressions. The standard accumulators are:

Accumulator       What it does
$sum: <expr>      sum of the expression across the group
$avg: <expr>      mean
$min, $max        extremes
$first, $last     first/last value in the group's sort order
$addToSet         distinct values as an array
$push             all values as an array (with duplicates)
$count: {}        row count (since 5.0; equivalent to $sum: 1)

A grouping key can be the literal null if you want a single-bucket grand total: {"$group": {"_id": null, "total": {"$sum": "$amount"}}} is the aggregation way of writing SELECT SUM(amount) FROM orders.
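To make the accumulators concrete, here is a plain-Python emulation of a small $group that computes $sum, $avg and $addToSet per state. It is a sketch of the semantics only; the function name and sample fields (state, amount, customer) are assumptions made for the example.

```python
from collections import defaultdict

def group_by_state(docs):
    """Emulates {"$group": {"_id": "$state",
                            "total": {"$sum": "$amount"},
                            "mean": {"$avg": "$amount"},
                            "customers": {"$addToSet": "$customer"}}}."""
    buckets = defaultdict(list)
    for d in docs:
        buckets[d["state"]].append(d)
    out = []
    for state, members in buckets.items():
        amounts = [m["amount"] for m in members]
        out.append({
            "_id": state,
            "total": sum(amounts),
            "mean": sum(amounts) / len(amounts),
            # $addToSet gives no ordering guarantee; sorted here only for stable output
            "customers": sorted({m["customer"] for m in members}),
        })
    return out

docs = [
    {"state": "KA", "amount": 100, "customer": "C1"},
    {"state": "KA", "amount": 300, "customer": "C1"},
    {"state": "MH", "amount": 50, "customer": "C2"},
]
summary = {row["_id"]: row for row in group_by_state(docs)}
print(summary["KA"])
```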

$sort, $project, and friends

$sort takes a map of field-to-direction (1 ascending, -1 descending). It can use an index when it runs before any reshape stage (first in the pipeline, or directly after a leading $match) and the sort key is a prefix of an index. After any reshape stage it is an in-memory sort, capped at 100 MB unless you pass allowDiskUse=True.

$project is the most flexible stage. It can include or exclude fields ({"name": 1, "_id": 0}), compute new fields from expressions ({"profit": {"$subtract": ["$revenue", "$cost"]}}), and reshape arrays with $slice, $map, $filter, $reduce. Since 3.4 there is also $addFields (aliased as $set since 4.2), which adds fields without dropping existing ones; it is clearer when you only mean to enrich.
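The difference between reshaping with $project and enriching with $addFields is easy to see on a single document. The two helpers below are toy stand-ins written for this illustration (they are not pymongo functions), and the sample document is made up.

```python
def project(doc, spec):
    """Toy inclusion-style $project: keep only listed fields; callables are computed fields."""
    return {name: (rule(doc) if callable(rule) else doc[name])
            for name, rule in spec.items()}

def add_fields(doc, spec):
    """Toy $addFields: keep every existing field and add the computed ones."""
    return {**doc, **{name: rule(doc) for name, rule in spec.items()}}

def profit(d):
    # mirrors {"$subtract": ["$revenue", "$cost"]}
    return d["revenue"] - d["cost"]

doc = {"name": "mixer", "revenue": 4599, "cost": 3000}

projected = project(doc, {"name": 1, "profit": profit})
enriched = add_fields(doc, {"profit": profit})
print(projected)
print(enriched)
```

After the toy $project, revenue and cost are gone, so a later stage that needs them would have nothing to read; the $addFields version keeps them.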

$lookup is the join that the original MongoDB design pretended you would never need. Added in 3.2 to silence "MongoDB has no joins" critiques, it left-outer-joins documents from a target collection on a key. The output adds a new array field on each input document holding all matching target docs. Why it is expensive: for each input document, MongoDB issues a query against the target collection. Without an index on the foreign field, every input doc triggers a full target collection scan — a 100k × 100k cartesian disaster. Always index the foreignField. The 2020 $lookup performance blog walks the SLP-vs-NLJ algorithms MongoDB picks at runtime.
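The cost argument can be checked with a small simulation that counts comparisons for an unindexed join versus a keyed probe. A Python dict stands in for the index here (MongoDB's indexes are B-trees, so real probes are logarithmic rather than constant time), and the collection sizes and field names are invented for the sketch.

```python
def lookup_scan(orders, products):
    """Unindexed join: every order scans the whole products list."""
    comparisons = 0
    joined = []
    for o in orders:
        matches = []
        for p in products:
            comparisons += 1
            if p["sku"] == o["sku"]:
                matches.append(p)
        joined.append({**o, "product": matches})
    return joined, comparisons

def lookup_indexed(orders, products):
    """Indexed join: one keyed probe per order."""
    index = {}
    for p in products:
        index.setdefault(p["sku"], []).append(p)
    joined = [{**o, "product": index.get(o["sku"], [])} for o in orders]
    return joined, len(orders)

products = [{"sku": f"SKU-{i}", "price": i} for i in range(1000)]
orders = [{"order": i, "sku": f"SKU-{i}"} for i in range(500)]
orders.append({"order": 500, "sku": "SKU-NONE"})  # no matching product

scan_result, scan_cost = lookup_scan(orders, products)
index_result, index_cost = lookup_indexed(orders, products)
print(scan_cost, index_cost)
```

Both versions produce the same joined documents, including the left-outer-join behaviour of an empty array for the unmatched order; only the amount of work differs.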

$unwind deserves a careful look. Documents in MongoDB nest: an Order has an items array. Classic SQL has no everyday idiom for "explode this column into rows" (the nearest equivalents are UNNEST or a lateral join over an array column), but $unwind is exactly that: one input doc with items: [a, b, c] becomes three output docs, carrying items: a, items: b, and items: c respectively (the rest of the document duplicated). It is essential for "compute total revenue per product" when products live inside a per-order array, and it can multiply your document count by 10× or 100×, so be careful what you put after it.
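A toy version of $unwind makes the fan-out, and one easily missed default, visible: documents whose array is empty or missing are dropped unless preserveNullAndEmptyArrays is set. The sketch below assumes the field always holds a list when present; the function name and sample data are illustrative.

```python
def unwind(docs, field, preserve_empty=False):
    """Toy $unwind: one output document per element of doc[field]."""
    for doc in docs:
        values = doc.get(field)
        if values:
            # non-empty list: duplicate the rest of the document per element
            for value in values:
                yield {**doc, field: value}
        elif preserve_empty:
            # mirrors preserveNullAndEmptyArrays: true (an empty array field is omitted)
            yield {k: v for k, v in doc.items() if not (k == field and v == [])}

orders = [
    {"_id": 1, "items": ["a", "b", "c"]},
    {"_id": 2, "items": []},
]

default_rows = list(unwind(orders, "items"))
preserved_rows = list(unwind(orders, "items", preserve_empty=True))
print(default_rows)
print(preserved_rows)
```

With the default behaviour, order 2 silently disappears from the stream, which matters if a later stage is counting orders.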

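$facet runs N sub-pipelines over the same input documents and returns a single document with N array fields. It is the dashboard query: "give me total sales, top 10 products, and revenue by state, all from one scan." Each sub-pipeline runs to completion independently of the others. Two limits to keep in mind: the combined result is one document, so it must fit in the 16 MB BSON document limit, and a sub-pipeline cannot contain $out, $merge, or another $facet. Stages placed after the $facet see only that single combined document.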
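A dashboard-style $facet might be written as in the sketch below. It assumes documents with date, shipping.state, payment.amount and payment.method fields; the facet names (totals, by_state, by_method) are arbitrary labels chosen for the example.

```python
from datetime import datetime

facet_pipeline = [
    # Filter once, up front, so every facet works on the same reduced input.
    {"$match": {"date": {"$gte": datetime(2024, 1, 1), "$lt": datetime(2024, 4, 1)}}},
    {"$facet": {
        "totals": [
            {"$group": {"_id": None, "revenue": {"$sum": "$payment.amount"}}},
        ],
        "by_state": [
            {"$group": {"_id": "$shipping.state", "revenue": {"$sum": "$payment.amount"}}},
            {"$sort": {"revenue": -1}},
        ],
        "by_method": [
            {"$sortByCount": "$payment.method"},
        ],
    }},
]

facet_names = sorted(facet_pipeline[1]["$facet"])
print(facet_names)
```

Passed to a pymongo collection's aggregate(), a pipeline of this shape yields exactly one document whose three fields are the result arrays of the three sub-pipelines.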

$out and $merge write the pipeline's output to a collection. $out replaces; $merge upserts (with conflict-resolution rules). They are how you build materialised views and ETL targets in MongoDB.

The optimiser: what reorders, what does not

MongoDB's pipeline optimiser is a few hundred lines of rules, not a cost-based planner. It does five useful things:

  1. $match pushdown. A $match that comes after a $project, $addFields, or $lookup is pushed in front whenever the fields it references were not modified by the earlier stage. Why this matters: it is the difference between filtering 10 million documents before an expensive $lookup (cheap) versus after (expensive — you paid the lookup cost on rows you immediately discard). The optimiser does this for you, but only when it can prove safety. If your $match references a field that $project computed, it cannot move.
  2. $sort + $limit collapse. Adjacent $sort followed by $limit is rewritten to a top-K algorithm that holds only K documents in memory rather than sorting the full stream. The savings here are huge — $sort: {revenue: -1}, $limit: 10 over 10M documents drops from "sort 10M and discard 9.999M" to "maintain a 10-element heap".
  3. $match coalesce. Two adjacent $match stages merge into one with $and.
  4. $skip / $limit swaps. A $skip followed by a $limit is reordered so the limit applies first, with the limit value increased by the skip amount so the result is unchanged.
  5. $lookup + $unwind fusion. When $lookup is immediately followed by $unwind of the joined array, the optimiser fuses them so unmatched documents (which would have been an empty array, then dropped by $unwind) are never produced.
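Two of these rewrites are simple enough to sketch in a few lines of Python: coalescing adjacent $match stages, and replacing a full sort followed by a limit with a bounded heap. This is an illustration of the idea, not MongoDB's implementation; the function names are invented, and heapq.nlargest from the standard library plays the role of the top-K algorithm.

```python
import heapq

def coalesce_matches(pipeline):
    """Merge runs of adjacent $match stages into a single $match with $and."""
    out = []
    for stage in pipeline:
        if "$match" in stage and out and "$match" in out[-1]:
            prev = out[-1]["$match"]
            conds = prev["$and"] if set(prev) == {"$and"} else [prev]
            out[-1] = {"$match": {"$and": conds + [stage["$match"]]}}
        else:
            out.append(stage)
    return out

def top_k(docs, key, k):
    """$sort (descending) + $limit: keeps a heap of k candidates instead of sorting everything."""
    return heapq.nlargest(k, docs, key=lambda d: d[key])

pipeline = [
    {"$match": {"status": "delivered"}},
    {"$match": {"state": "KA"}},
    {"$group": {"_id": "$sku"}},
    {"$match": {"n": 3}},
]
rewritten = coalesce_matches(pipeline)
best = top_k([{"r": 5}, {"r": 9}, {"r": 1}, {"r": 7}], "r", 2)
print(rewritten)
print(best)
```

The final $match stays where it is: it is separated from the others by a $group, so merging it would change the meaning of the pipeline.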

What the optimiser does not do: estimate cardinalities, pick join algorithms, decide between hash-aggregate and sort-aggregate, push selections into $lookup sub-pipelines (since 6.0 it tries harder, but historically not). The MongoDB SIGMOD 2019 paper on the aggregation framework is the authoritative reference for what the engine guarantees and what it punts.

[Figure: Three optimiser concerns: pushdown, index, memory cap. (1) $match pushdown: $lookup → $match (slow: join 10M, then drop 99%) is rewritten to $match → $lookup (fast: filter to 100k, then join). (2) Index on first stage: in [$match, $sort, $group] both leading stages can use an index on {date: 1, state: 1}; in [$project, $match, $sort] the index is gone after the reshape, giving a full collection scan. (3) 100 MB in-memory cap: $sort, $group and $bucket are each capped at 100 MB; going over raises QueryExceededMemoryLimit; fix: allowDiskUse=True. Anti-patterns the optimiser cannot fix: $unwind on a 1000-element array before $match multiplies the stream by 1000 (always filter the parent doc first, then unwind only what survives); $lookup without an index on foreignField is an N×M cross product (index foreignField; the optimiser will not warn you); $sort on a $project-computed field cannot use an index and runs in memory only (pre-compute the field at write time and index it, or accept the in-memory sort cost).]

A real pipeline in pymongo

Let's run a complete aggregation against a realistic Indian e-commerce dataset. The collection orders looks like this for each document:

{
  "_id": ObjectId("..."),
  "order_id": "ORD-2024-0019283",
  "customer_id": "CUST-77123",
  "date": ISODate("2024-02-14T11:23:00Z"),
  "status": "delivered",
  "shipping": {"state": "Karnataka", "city": "Bengaluru", "pincode": "560001"},
  "items": [
    {"sku": "SHO-101", "name": "Bata Power running shoe", "price": 2199, "qty": 1},
    {"sku": "MIX-203", "name": "Bajaj Twister mixer", "price": 4599, "qty": 1}
  ],
  "payment": {"method": "UPI", "amount": 6798}
}

Let's compute a simple but realistic dashboard query first: monthly revenue by region for the last calendar year. This is the bread-and-butter aggregation a finance team runs every month-end.

from datetime import datetime
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
db = client["bharat_bazaar"]
pipeline = [
    {"$match": {
        "status": "delivered",
        "date": {"$gte": datetime(2024, 1, 1), "$lt": datetime(2025, 1, 1)},
    }},
    {"$unwind": "$items"},
    {"$group": {
        "_id": {
            "region": "$shipping.state",
            "month": {"$dateTrunc": {"date": "$date", "unit": "month"}},
        },
        "revenue":  {"$sum": {"$multiply": ["$items.price", "$items.qty"]}},
        "orders":   {"$addToSet": "$order_id"},
    }},
    {"$project": {
        "_id": 0,
        "region": "$_id.region",
        "month":  "$_id.month",
        "revenue": 1,
        "order_count": {"$size": "$orders"},
    }},
    {"$sort": {"region": 1, "month": 1}},
]
for row in db.orders.aggregate(pipeline, allowDiskUse=True):
    print(row)

Walk through what happens. Why $match first, before $unwind: filtering at the document level (28M orders → 8M delivered Q1–Q4) is cheap and uses the {status: 1, date: 1} compound index. Unwinding first would balloon 28M documents into 60M item-rows before the filter runs, pushing about 7× more documents through the pipeline than the 8M that survive the filter. After $unwind each item becomes its own document; $group then sums revenue per (state, month) and uses $addToSet to count distinct orders (since one order has multiple items, a naive $sum: 1 would double-count). $project flattens the compound _id and renames fields. $sort orders the dashboard for display.
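The double-counting point is worth checking on a tiny example. The rows below imitate what $unwind would emit for two orders in one (state, month) group; the order IDs and prices are made up.

```python
item_rows = [
    {"order_id": "ORD-1", "state": "Karnataka", "price": 2199, "qty": 1},
    {"order_id": "ORD-1", "state": "Karnataka", "price": 4599, "qty": 1},
    {"order_id": "ORD-2", "state": "Karnataka", "price": 500, "qty": 2},
]

# What {"$sum": 1} would report: one per item row, not one per order.
naive_count = sum(1 for _ in item_rows)

# What $addToSet on order_id followed by $size reports: distinct orders.
distinct_orders = len({r["order_id"] for r in item_rows})

# What {"$sum": {"$multiply": ["$items.price", "$items.qty"]}} reports.
revenue = sum(r["price"] * r["qty"] for r in item_rows)

print(naive_count, distinct_orders, revenue)
```

Revenue is correct either way because it is summed per item; only the order count needs the distinct-set treatment.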

The canonical "top 5 per group" query

Now the harder query, the one every product analytics team eventually writes: top 5 products by revenue in each state, last quarter.

In SQL this requires window functions (ROW_NUMBER() OVER (PARTITION BY state ORDER BY revenue DESC) plus an outer filter). In MongoDB you do it with a clever double-$group pattern.

Top 5 products per state

You are an analyst at Bharat Bazaar. The CFO wants a one-page report: for each Indian state, the top five SKUs by Q1 2024 revenue. The orders collection has 28M documents in it; production is a sharded cluster with three shards.

from datetime import datetime
from pymongo import MongoClient

client = MongoClient("mongodb://prod-mongo:27017")
db = client["bharat_bazaar"]

pipeline = [
    # Stage 1: filter to Q1 2024 delivered orders. Uses index on (status, date).
    {"$match": {
        "status": "delivered",
        "date": {"$gte": datetime(2024, 1, 1), "$lt": datetime(2024, 4, 1)},
    }},

    # Stage 2: explode the items array — one document per line item.
    {"$unwind": "$items"},

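    # Stage 3: group by (state, SKU): sum revenue and count units sold.
    # Grouping on the SKU keeps two products that share a display name apart;
    # the name is carried along with $first for the report.
    {"$group": {
        "_id": {"state": "$shipping.state", "product": "$items.sku"},
        "name":    {"$first": "$items.name"},
        "revenue": {"$sum": {"$multiply": ["$items.price", "$items.qty"]}},
        "units":   {"$sum": "$items.qty"},
    }},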

    # Stage 4: sort within each state by revenue descending.
    # The sort key is (state asc, revenue desc) so products in the same state
    # arrive in revenue order — which the next $group's $push will preserve.
    {"$sort": {"_id.state": 1, "revenue": -1}},

    # Stage 5: re-group by state, pushing all products (already sorted by revenue)
    # into an array. $$ROOT means "the whole current document".
    {"$group": {
        "_id": "$_id.state",
        "top_products": {"$push": "$$ROOT"},
    }},

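    # Stage 6: keep only the first 5 elements of each state's array.
    # _id (the state) is kept on purpose: $merge matches on _id by default, so
    # each run replaces the state's existing row instead of inserting a duplicate.
    {"$project": {
        "state": "$_id",
        "top_products": {"$slice": ["$top_products", 5]},
    }},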

    # Stage 7: write the dashboard table to a materialised collection.
    {"$merge": {"into": "dashboard_top_products_q1_2024",
                "whenMatched": "replace",
                "whenNotMatched": "insert"}},
]

db.orders.aggregate(pipeline, allowDiskUse=True)
print(db.dashboard_top_products_q1_2024.count_documents({}))

What each stage emits:

Stage                      Input docs    Output docs   What changed
$match                     28,000,000    6,400,000     dropped non-delivered and non-Q1 orders
$unwind                    6,400,000     21,000,000    each order's items exploded into rows
$group (state, product)    21,000,000    95,000        summed revenue per (state, product)
$sort                      95,000        95,000        same documents, reordered by state, then revenue descending
$group (state)             95,000        28            one doc per state, products in an array
$project (slice 5)         28            28            each array trimmed to its top 5
$merge                     28            28            persisted to the materialised view

Why the double-$group works. The first $group aggregates revenue by (state, product). The $sort then orders the entire stream by state, then revenue descending — meaning products for Karnataka all arrive contiguously, in revenue order. The second $group uses $push to collect all products for a state into an array; $push preserves the input order, so the array is already sorted by revenue. $slice then trims to 5.
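The same sort, group, push, slice sequence can be replayed in plain Python to check the reasoning. In this sketch sorted() plays the $sort, itertools.groupby plays the second $group with $push, and list slicing plays $slice; the input rows stand for the output of the first $group and are invented.

```python
from itertools import groupby

def top_n_per_state(rows, n):
    """rows: one dict per (state, product) with its summed revenue."""
    # $sort: state ascending, revenue descending
    ordered = sorted(rows, key=lambda r: (r["state"], -r["revenue"]))
    # second $group with $push (input order preserved), then $slice
    return {
        state: [r["product"] for r in group][:n]
        for state, group in groupby(ordered, key=lambda r: r["state"])
    }

rows = [
    {"state": "KA", "product": "A", "revenue": 50},
    {"state": "MH", "product": "D", "revenue": 10},
    {"state": "KA", "product": "B", "revenue": 90},
    {"state": "KA", "product": "C", "revenue": 70},
    {"state": "MH", "product": "E", "revenue": 20},
]
top2 = top_n_per_state(rows, 2)
print(top2)
```

The pattern only works because the grouping step preserves arrival order; if the rows were not sorted first, the slice would keep an arbitrary five products rather than the top five.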

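Index strategy. Only the leading $match of this pipeline can use an index, so one compound index does the real work. An index on shipping.state is still worth having for related queries that filter by state, but the $group here cannot use it:

db.orders.create_index([("status", 1), ("date", 1)])              # $match
db.orders.create_index([("shipping.state", 1)])                   # state-filtered queries; not used by this pipeline's $group
# No index helps the post-$unwind aggregation: that's a scan over 21M item-rows.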

Memory. The first $group produces 95k buckets — well under 100 MB. The $sort over 95k documents is also fine. The second $group produces 28 docs. We pass allowDiskUse=True defensively in case a future quarter has more variety.

Cost on the real cluster. Roughly 8 seconds end-to-end on a 3-shard cluster with appropriate indexes. Without the (status, date) index it is ~3 minutes. Without the unwind-then-group trick (e.g., trying to use $lookup to join product master), it is much worse.

Run it via cron at 02:00 IST every morning to refresh the dashboard collection. The dashboard front-end then reads from dashboard_top_products_q1_2024 with a simple find() — no aggregation cost at user-facing read time.

The escape hatches: $function and $accumulator

For computations the built-in operators cannot express, MongoDB 4.4 added $function (run arbitrary JavaScript on each document) and $accumulator (custom aggregation with init/accumulate/merge/finalize hooks). Together they make the pipeline Turing-complete: you can compute essentially anything, including stateful accumulations and multi-pass algorithms.

{"$addFields": {
    "is_premium": {"$function": {
        "body": "function(amount, items) { return amount > 5000 && items.length > 2; }",
        "args": ["$payment.amount", "$items"],
        "lang": "js",
    }}
}}

The cost is real. JavaScript runs in an embedded JavaScript engine inside the database server (SpiderMonkey since MongoDB 3.2, not V8); each call has fixed overhead, there is no JIT for tight loops, and the function is opaque to every optimisation. The optimiser cannot reason about a JS function: it cannot push $match past it and cannot use indexes for fields it might touch. Why use it sparingly: a single $function call per document on a 10M-document pipeline is 10 million JavaScript invocations, each ~30 μs. That is 300 seconds, or five minutes, of pure scripting overhead added to a query that should have run in ten seconds.
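In many cases the JavaScript can be avoided altogether. The is_premium predicate from the example above can be written with built-in expression operators, as sketched below; this assumes items is always present and is an array, since $size raises an error otherwise. The second half of the block simply redoes the overhead arithmetic using the ~30 μs per-call figure quoted in the text.

```python
# Same predicate as the $function example, using only built-in operators.
is_premium_native = {"$addFields": {
    "is_premium": {"$and": [
        {"$gt": ["$payment.amount", 5000]},
        {"$gt": [{"$size": "$items"}, 2]},
    ]}
}}

# Back-of-envelope cost of the JavaScript version.
docs = 10_000_000
per_call_seconds = 30e-6  # per-call overhead figure taken from the text
overhead_seconds = docs * per_call_seconds
overhead_minutes = overhead_seconds / 60
print(round(overhead_seconds), round(overhead_minutes, 1))
```

Because the native form is an ordinary expression, it stays visible to the optimiser in a way that a JavaScript body cannot.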

The deeper truth is that when you find yourself reaching for $function, you are usually two steps away from "this should run somewhere else". The aggregation pipeline is excellent at the SQL-replacement workload — filter, group, join, project. It is mediocre at multi-step custom logic, and it has no concurrency model, no shuffle, no spill-to-distributed-storage. For real ETL — multiple sources, complex transformations, intermediate state — push the data into Spark or Trino and write the logic in a language that has a real compiler. The aggregation pipeline is for queries, not programs.

When the pipeline is the right tool

A practical rubric:

The historical note worth knowing: before the aggregation framework existed (MongoDB 2.2, 2012), the only way to do these queries was MapReduce — a JavaScript-based, slow, awkward API that everyone hated. The aggregation framework was the response. It was successful enough that MongoDB has since deprecated MapReduce. Couchbase's N1QL took the opposite path — give document databases full SQL, including SELECT/FROM/WHERE/GROUP BY/JOIN — which is more familiar but harder to extend with document-native operators like $unwind. Both paths converge on the same goal: bring SQL-class analytics to JSON data. The aggregation pipeline is MongoDB's bet, and despite its quirks it works at production scale.

Going deeper

Two engineering-grade resources go beyond what this chapter covers. Kyle Banker's Aggregation Framework Internals walks the C++ executor — how stages communicate via DocumentSource interfaces, how the optimiser passes are structured, why $lookup has both nested-loop and hash-join implementations and when each kicks in. The MongoDB SIGMOD 2019 paper formalises the pipeline algebra and proves correctness of the rewrite rules — useful if you ever need to reason about whether a custom optimisation is safe.

The MongoDB aggregation reference is the day-to-day operator catalogue. The $lookup performance blog is essential reading before you ship any pipeline that joins two collections of meaningful size. Comparison pieces like "MongoDB vs SQL Joins" frame the design trade-offs from the relational angle, which is useful when a teammate raised on Postgres asks why your $lookup is the way it is.

References

  1. MongoDB Aggregation Pipeline Reference — the canonical operator and stage catalogue.
  2. Kyle Banker — MongoDB in Action / Aggregation Framework Internals chapter.
  3. MongoDB Engineering — "Tunable Consistency in MongoDB" SIGMOD 2019 (companion paper covers the aggregation engine).
  4. MongoDB Engineering Blog — $lookup performance and join algorithms.
  5. "MongoDB vs SQL: Joins, Indexes, and Aggregations Compared" — relational-to-document mapping cheat sheet.
  6. Couchbase N1QL: SQL for JSON — the alternative design that gave document databases full SQL semantics.