In short

MPP (Massively Parallel Processing) is the dominant architecture for analytical databases. A query is split into stages; each stage runs in parallel across N worker nodes; stages are connected by exchanges that move rows across the network. The cheap stages — scan, filter, partial aggregation — happen entirely on local CPU and local storage and scale linearly with worker count. The expensive stage — the shuffle — re-partitions rows by some key (the GROUP BY column, the JOIN key) so that all rows with the same key land on the same worker, allowing the next stage to finish a global computation locally. A 100 GB shuffle across 10 workers on a 10 Gbps network takes ~80 seconds of pure network time, no matter how fast your CPUs are; queries with three shuffles take three times that. Every meaningful MPP optimisation — partial aggregation before the shuffle, broadcasting small tables instead of shuffling them, co-located joins on tables already partitioned by the join key, skew handling via salting — exists to reduce shuffle bytes. The coordinator (one per cluster in Trino and Snowflake, one driver per application in Spark) plans the stages and orchestrates the exchanges; workers do the actual work. This chapter builds a tiny ten-worker MPP in Python, runs SELECT region, SUM(revenue) FROM sales GROUP BY region through it twice — once with naive shuffling and once with partial aggregation — and shows the several-thousand-fold byte reduction that turns an unusable plan into a fast one. The same arithmetic decides whether your Spark job finishes in 20 seconds or 20 minutes.

The previous chapter looked at one operator — the join — and how distributed engines pick between broadcast and shuffle for it. This chapter zooms out to the whole engine. Every analytical query — GROUP BY, JOIN, ORDER BY, window functions, deduplication — runs through the same skeleton: scan locally, do what you can locally, exchange across the network when global state is required, finish locally. The exchanges are the shuffles. Understanding how MPP composes scan-and-shuffle stages, and what each shuffle costs in real wall-clock time, is what separates writing SQL from engineering it.

What MPP is, and what it is not

MPP is a deliberate architectural choice: instead of one big server with one big disk and one big SQL engine, you have N worker nodes — typically 4 to 1000 — each running an identical instance of the engine, each owning a slice of the data. A coordinator (or driver) parses incoming SQL, plans a parallel execution graph, and dispatches stages of work to all workers in lock-step. Workers do scan and CPU-bound work locally on their slice; when a stage needs data that lives on another worker, the engine inserts an exchange (the shuffle) that moves rows across the network. The next stage runs locally again on the re-partitioned data.

Three properties matter: every worker owns its slice of the data exclusively (shared-nothing), a single query executes in parallel across all workers under one coordinated plan, and data moves between workers only at explicit exchange boundaries.

MPP is not the same as just "running a database on many machines." A clustered Postgres or a sharded MySQL is partitioned, but each shard runs full queries independently — there is no parallel single-query plan. MPP runs one query across many machines simultaneously, and that single query can move data between machines mid-execution.

[Figure: MPP cluster layout. A coordinator and four worker nodes, each with local CPU, RAM, and local storage; the coordinator connects to the workers over small control-plane links, and the sales table is partitioned across the workers, each holding a quarter of the rows (25M) locally. In some designs (BigQuery's Dremel, modern Spark) the coordinator role is itself distributed.]
An MPP cluster. The coordinator parses SQL and dispatches a parallel plan; each worker owns a slice of every table on local storage and runs the same operator code in parallel on its slice. The control-plane traffic between coordinator and workers is small — kilobytes of plan and metadata. The expensive traffic is between workers, during shuffles, in the next diagram.

Why shared-nothing is the foundation: if two workers shared a disk, every write would contend on a global lock manager and every scan would compete for the same disk bandwidth. By giving each worker exclusive ownership of its slice, you make scaling linear — 10 workers means 10x the disk bandwidth, 10x the RAM, 10x the cores, with zero coordination overhead during the scan phase. The price is paid only at the exchange boundaries, where data has to move.

The stages of an MPP query

A query goes through five stages, in order:

  1. Plan. The coordinator parses the SQL, runs the cost-based optimiser, and produces a parallel plan: a DAG of stages connected by exchanges. Each stage is a tree of operators (scan → filter → partial aggregation → exchange → final aggregation, for example). The plan is shipped to every worker as a small structured message.

  2. Scan stage. Each worker reads its local partition of the input table from local disk or object storage. This is fully parallel — no network — and saturates the cluster's aggregate disk bandwidth. For a 100 GB table on a 100-worker cluster, each worker reads 1 GB; the total scan finishes as fast as one worker reads 1 GB.

  3. Local aggregation / filter. Each worker applies whatever can be applied locally without seeing rows from other workers: predicates (WHERE), projections, partial aggregates (running a SUM per local group). This stage adds CPU work but does not move bytes between workers.

  4. Exchange stage (shuffle). The result of the local stage is re-partitioned by some key — typically the GROUP BY column or the JOIN key. Each worker hashes each row's key, computes dst = hash(key) mod N, and ships the row to worker dst. This is the network-bound stage. Wall-clock cost = (total bytes shipped) / (effective cross-worker bandwidth).

  5. Final aggregation / coordinator collect. Each worker now has every row with its assigned keys. It finishes the aggregation locally — combining the partial sums it received with its own, producing one final row per key it owns. The coordinator then gathers the (now small) per-worker results and returns them to the client.

The DeWitt and Stonebraker [2] argument against MapReduce was specifically that it forced every operator to go through this five-stage cycle, even when the data structure already supported faster paths. MPP databases can skip the shuffle entirely when the data is already partitioned correctly (co-located joins) or replace it with a broadcast when one side is small. MapReduce had no plan-time optimiser; MPP databases live and die by it.

[Figure: the shuffle stage. Four workers are shown before and after a shuffle: before, each worker's scan output holds mixed region keys (KA, MH, TN, DL, GJ, ...); each worker computes hash(region) mod 4 and ships every row to the destination worker that owns that bucket, an all-to-all fan-out; after, each worker holds exactly one key bucket containing every matching row from the cluster. Network cost = total rows shipped × row width; wall-clock cost = bytes / cross-worker bandwidth, and N-to-N contention rather than raw link speed is the real-world bottleneck.]
The shuffle stage. Each worker computes hash(key) mod N for every row in its local scan output and ships the row to the destination worker that owns that hash bucket. After the shuffle, each worker holds all rows with its assigned keys and can finish the aggregation or join locally. The arrows form an all-to-all pattern — every worker talks to every other worker — which is what makes the shuffle the dominant cost.

What the shuffle actually costs

The shuffle is the only stage where wall-clock time is bounded by the network rather than by CPU or disk. Order-of-magnitude arithmetic: a 100 GB shuffle across 10 workers sharing a 10 Gbps network moves 800 gigabits and takes ~80 seconds of pure network time no matter how fast the CPUs are; the same 100 GB over ~5 GB/s of effective aggregate cross-worker bandwidth still takes ~20 seconds, while the scan and local aggregation for the same query finish in single-digit seconds.
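That arithmetic is worth keeping as a one-line helper. A minimal sketch; the bandwidth figures below are the chapter's own examples, not measurements:

def shuffle_seconds(total_bytes: float, effective_bandwidth_bytes_per_s: float) -> float:
    """Back-of-envelope shuffle wall-clock: bytes moved / effective cross-worker bandwidth.
    Ignores serialisation overhead and latency, which pad the estimate in practice."""
    return total_bytes / effective_bandwidth_bytes_per_s

print(shuffle_seconds(100e9, 10e9 / 8))  # 100 GB over a shared 10 Gbps link: ~80 s
print(shuffle_seconds(100e9, 5e9))       # 100 GB over ~5 GB/s aggregate bandwidth: ~20 s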

This is why every MPP optimisation, without exception, is about reducing shuffle bytes. The cost ladder is fundamental:

[Figure: the MPP cost ladder, four rungs from cheap to devastating. (1) Parallel scan: local disk, no network, ~1 GB/s per worker, microseconds per row. (2) Local aggregation/filter: CPU only, ~10 ns per row, often reduces output 100x or more. (3) Shuffle: all-to-all over the network, re-partition by key, ~5 GB/s aggregate, seconds for tens of GB. (4) Cross-shuffle joins: multiple shuffles chained, compounding into minutes or hours. Each rung is roughly 1000x more expensive than the last; a plan with 5 shuffles can be 100x slower than the same query with 1 shuffle on the same hardware, so reading a query plan is largely counting the shuffles.]
The MPP cost ladder. Scan is microseconds per row, parallel and local. Local aggregation and filter are CPU-bound but still local. The shuffle jumps the cost by three orders of magnitude because it goes through the network. Multiple shuffles in one query — common with joins followed by group-bys followed by sorts — compound to wall-clock times measured in minutes or hours.

Why the ladder is so steep: local CPU operates at clock speeds — billions of operations per second per core. Local disk at 1 GB/s per worker is "slow" by CPU standards but still 100x faster than cross-worker network. The cluster network, while individually high-bandwidth, is shared by N workers all talking at once; effective throughput per worker drops by N. Three orders of magnitude per rung is the right mental model.

The pre-shuffle aggregation trick

The single most important MPP optimisation is partial aggregation before the shuffle. The idea is simple: for any aggregate that is associative and commutative — SUM, COUNT, MIN, MAX, AVG (decomposed into SUM/COUNT) — you can aggregate locally on each worker first, ship only the per-group partial results, and combine them on the destination worker.
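What "decomposable" means in code, using AVG as the example: each worker ships one (sum, count) pair instead of its rows, and the destination combines the pairs and divides once. A minimal sketch, not any engine's actual operator:

def partial_avg(values):
    """Local stage: one (sum, count) pair per group per worker."""
    return sum(values), len(values)

def final_avg(partials):
    """Final stage: combine the shipped partials, divide once."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count

# Three workers' partials for the same group combine to the exact global average.
print(final_avg([partial_avg([10, 20]), partial_avg([30]), partial_avg([40, 50, 60])]))  # 35.0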

Without partial aggregation, a query like SELECT region, SUM(revenue) FROM sales GROUP BY region shuffles every single row of sales across the network — billions of rows. With partial aggregation, each worker reduces its 100M local rows to one row per region (28 Indian states/UTs at most) before the shuffle. Network traffic drops by a factor of millions.

A 1-billion-row aggregation, two ways.

Query: SELECT region, SUM(revenue) FROM sales GROUP BY region — sum revenue per Indian state. Cluster: 10 workers; 1 billion rows in sales, 100M per worker; each row is roughly 50 bytes wide. There are 28 distinct values of region (state codes).

Option A — naive shuffle, no partial aggregation.

  1. Each worker scans its 100M local rows from disk: ~5 GB per worker, parallel, takes ~5 seconds.
  2. Each worker shuffles every scanned row over the network, hash-partitioned by region. 100M rows × 50 B = 5 GB shipped per worker, 50 GB total cluster shuffle.
  3. On a 10-worker cluster with ~5 GB/s effective aggregate cross-worker bandwidth, the shuffle takes 50 / 5 = 10 seconds, plus serialisation overhead — call it 15–20 seconds.
  4. Final aggregation on each receiving worker: combine its bucket of rows into one row per region. Trivial CPU.
  5. Total wall time: ~25 seconds, completely dominated by the shuffle.

Option B — partial aggregation before the shuffle.

  1. Each worker scans its 100M local rows: ~5 seconds, same as before.
  2. Each worker pre-aggregates locally: build a small hash map region -> running_sum, running_count. After scanning all 100M rows, the map has at most 28 entries (one per state present in the local slice).
  3. Each worker ships only its 28-row partial result over the network: 28 × ~50 B = 1.4 KB per worker, 14 KB total cluster shuffle.
  4. The shuffle is essentially instantaneous — 14 KB on a 10 Gbps network is microseconds.
  5. Each receiving worker combines the partial sums: 28 keys × 10 source workers = 280 partial rows, summed in microseconds.
  6. Total wall time: ~5 seconds, dominated by the scan.

The partial-aggregation plan is 5 seconds vs 25 seconds — 5x faster end-to-end, and the shuffle data volume drops by a factor of several million (50 GB → 14 KB). At higher cardinality (millions of distinct keys), the gap narrows because partial aggregation produces a less-compact intermediate — but for low-cardinality grouping (states, product categories, dates), partial aggregation is the difference between a usable query and an unusable one.

Spark, Trino, Snowflake, BigQuery, and Vertica all do this automatically when the optimiser proves the aggregate is decomposable. You will see it in EXPLAIN output as separate HashAggregate(partial) and HashAggregate(final) (or equivalent) operators with a shuffle exchange between them.

A ten-worker Python MPP

Let's build a tiny working MPP that runs the worked example end to end, both ways. Workers are just Python objects with local data and a bytes_shipped counter.

# mpp_demo.py
from __future__ import annotations
from dataclasses import dataclass, field
from collections import defaultdict
import hashlib, random

ROW_BYTES = 50  # nominal width per row for accounting

@dataclass
class Worker:
    wid: int
    local_rows: list = field(default_factory=list)
    received: list = field(default_factory=list)

@dataclass
class Cluster:
    workers: list[Worker]
    bytes_shipped: int = 0
    def send(self, src: int, dst: int, rows):
        rows = list(rows)
        if src != dst:
            self.bytes_shipped += len(rows) * ROW_BYTES
        return rows
    @property
    def n(self): return len(self.workers)

def hash_partition(key, n: int) -> int:
    h = hashlib.md5(str(key).encode()).digest()
    return int.from_bytes(h[:4], "big") % n

Now the two query plans. The naive plan ships every row over the wire; the optimised plan does partial aggregation locally first.

def naive_groupby_sum(cluster: Cluster, key_idx: int, val_idx: int):
    """Ship every row, then aggregate on the receiver."""
    n = cluster.n
    inflight = [[] for _ in range(n)]
    # Scan + shuffle: every row goes over the wire.
    for w in cluster.workers:
        for row in w.local_rows:
            dst = hash_partition(row[key_idx], n)
            inflight[dst].extend(cluster.send(w.wid, dst, [row]))
    # Final aggregate per destination worker.
    results = {}
    for wid in range(n):
        local = defaultdict(int)
        for row in inflight[wid]:
            local[row[key_idx]] += row[val_idx]
        results.update(local)
    return results, cluster.bytes_shipped

def partial_agg_groupby_sum(cluster: Cluster, key_idx: int, val_idx: int):
    """Pre-aggregate locally, then ship only partial sums."""
    n = cluster.n
    inflight = [[] for _ in range(n)]
    # Scan + LOCAL partial aggregation: collapse to one row per key per worker.
    for w in cluster.workers:
        partial = defaultdict(int)
        for row in w.local_rows:
            partial[row[key_idx]] += row[val_idx]
        # Shuffle only the partial sums (one row per key per source worker).
        for k, v in partial.items():
            dst = hash_partition(k, n)
            inflight[dst].extend(cluster.send(w.wid, dst, [(k, v)]))
    # Final aggregate: combine partial sums per destination worker.
    results = {}
    for wid in range(n):
        final = defaultdict(int)
        for k, v in inflight[wid]:
            final[k] += v
        results.update(final)
    return results, cluster.bytes_shipped

Why this demonstrator captures the real win: the only difference between the two functions is whether each worker collapses its rows by key before sending them. The naive version shuffles every scanned row; the partial-agg version shuffles at most K rows per worker, where K is the number of distinct keys. For R total rows on W workers, the byte reduction is roughly R / (K × W) when K is small relative to the rows per worker (low-cardinality grouping like states or categories): millions to one in the worked example above.

# Drive the comparison.
def make_cluster(n_workers=10, rows_per_worker=100_000, n_regions=28):
    random.seed(0)
    cluster = Cluster([Worker(i) for i in range(n_workers)])
    regions = [f"R{i:02d}" for i in range(n_regions)]
    for w in cluster.workers:
        for _ in range(rows_per_worker):
            w.local_rows.append((random.choice(regions), random.randint(1, 1000)))
    return cluster

c1 = make_cluster()
res1, bytes1 = naive_groupby_sum(c1, key_idx=0, val_idx=1)
print(f"naive:        {len(res1)} groups, {bytes1/1024:.1f} KB shipped")

c2 = make_cluster()
res2, bytes2 = partial_agg_groupby_sum(c2, key_idx=0, val_idx=1)
print(f"partial agg:  {len(res2)} groups, {bytes2/1024:.1f} KB shipped")
print(f"reduction: {bytes1 / max(bytes2, 1):.0f}x")
assert res1 == res2  # identical answers

With 10 workers × 100k rows = 1M total rows over 28 regions, the naive plan ships about 45 MB (every row crosses a worker boundary except the small fraction that hashes to its own bucket). The partial-agg plan ships about 12 KB (28 keys × 10 source workers × ~50 B, less the few partial rows that land on their own worker). The byte reduction is roughly 3500x at this scale, and it grows linearly with row count. Run it with 10x more rows per worker and the naive cost grows 10x while the partial-agg cost stays the same.
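That last claim is easy to check with the code already on the page; the only change is the rows_per_worker argument:

c3 = make_cluster(rows_per_worker=1_000_000)
_, naive_big = naive_groupby_sum(c3, key_idx=0, val_idx=1)
c4 = make_cluster(rows_per_worker=1_000_000)
_, partial_big = partial_agg_groupby_sum(c4, key_idx=0, val_idx=1)
print(f"naive at 10x rows:       {naive_big/1e6:.0f} MB shipped")     # ~10x the 45 MB above
print(f"partial agg at 10x rows: {partial_big/1024:.1f} KB shipped")  # still ~12 KB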

Optimisations beyond partial aggregation

Partial aggregation is the cheapest and most important shuffle reducer. Real systems layer several more.

Broadcast small tables instead of shuffling. Covered in the previous chapter on join algorithms. If one side of a join is small (under ~100 MB after compression), broadcast it to every worker so the join becomes local on the larger side. This eliminates the shuffle of the larger side entirely — at the cost of |small| × N total bytes broadcast, which is usually much less than |large| shuffled.
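In the toy cluster from above, the broadcast alternative is a few lines. A sketch: the 28-row dimension table and the choice of worker 0 as the broadcast source are illustrative, not part of the original demo.

def broadcast_join(cluster: Cluster, dim_rows: list, fact_key_idx: int, dim_key_idx: int):
    """Ship the small (dimension) table to every worker, then join locally.
    Bytes shipped = |dim| * ROW_BYTES per remote worker; the large fact side never moves."""
    joined = []
    for w in cluster.workers:
        local_dim = cluster.send(0, w.wid, dim_rows)   # broadcast: whole dim table to each worker
        lookup = {d[dim_key_idx]: d for d in local_dim}
        for row in w.local_rows:                       # probe is purely local, no shuffle
            match = lookup.get(row[fact_key_idx])
            if match is not None:
                joined.append(row + match)
    return joined, cluster.bytes_shipped

cb = make_cluster()
dim = [(f"R{i:02d}", f"state {i}") for i in range(28)]  # hypothetical 28-row dimension table
joined, bcast_bytes = broadcast_join(cb, dim, fact_key_idx=0, dim_key_idx=0)
print(f"broadcast join: {len(joined)} joined rows, {bcast_bytes/1024:.1f} KB broadcast")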

Co-located joins. If both tables in a join are already hash-partitioned by the join key (e.g., orders and order_items both partitioned by order_id), no shuffle is needed at all. Each worker's slice of orders joins against its slice of order_items locally. This is sometimes called "bucket join" (Spark) or "partition-wise join" (Vertica, BigQuery). Vertica's [1] projection system was specifically designed to support this — you can pre-partition tables on commonly-joined columns and never shuffle them.
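The co-located case is even simpler to sketch in the same framework: if the second table's slices were produced by the same hash_partition on the join key, every matching pair of rows already lives on the same worker and the byte counter never moves. Here other_slices is a hypothetical per-worker list of the second table's rows.

def colocated_join(cluster: Cluster, other_slices: list, key_a: int, key_b: int):
    """Join two tables already hash-partitioned on the join key: purely local, zero shuffle."""
    joined = []
    for w in cluster.workers:
        lookup = defaultdict(list)
        for row in other_slices[w.wid]:     # this worker's slice of the second table
            lookup[row[key_b]].append(row)
        for row in w.local_rows:
            for match in lookup.get(row[key_a], ()):
                joined.append(row + match)
    return joined, cluster.bytes_shipped    # unchanged: no exchange ever ran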

Skew handling. When one key dominates (one merchant has 30% of orders), all that key's rows pile up on one worker, which becomes a straggler. Solutions: salting (append a random suffix to the key, distribute across multiple buckets, replicate the small side); partial broadcast (broadcast just the rows of the small side that match skewed keys, shuffle the rest); adaptive splitting (Spark AQE detects oversized partitions at runtime and splits them). All three trade extra work for balanced execution.
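A sketch of the salting idea on top of the toy hash_partition; the skewed-key set and the fan-out of 8 salt buckets are illustrative choices, not fixed rules.

def salted_partition(key, n_workers: int, skewed_keys: set, salt_buckets: int = 8) -> int:
    """Spread a skewed key's rows across several workers instead of one.
    For a GROUP BY, receivers aggregate (key, salt) groups and a tiny second pass
    re-combines the salt_buckets partials per skewed key; for a join, the small side
    must be replicated once per salt value so every salted row still finds its match."""
    if key in skewed_keys:
        salt = random.randrange(salt_buckets)
        return hash_partition(f"{key}#{salt}", n_workers)
    return hash_partition(key, n_workers)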

Bloom-filter pushdown. Before shuffling the large side of a join, build a bloom filter from the small side's keys and push it into the large-side scan. Rows that don't match are filtered out before they ever enter the shuffle. For star-schema queries with a selective dimension predicate, this routinely cuts shuffle volume by 90%+.
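The same toy shuffle with a pre-shuffle filter pushed into the scan. A plain Python set stands in for the bloom filter here; a real engine ships a compact probabilistic filter built from the small side's join keys, but the effect on shuffle bytes is the same.

def filtered_shuffle(cluster: Cluster, small_side_keys: set, key_idx: int):
    """Drop large-side rows that cannot match the small side before they enter the shuffle."""
    n = cluster.n
    inflight = [[] for _ in range(n)]
    for w in cluster.workers:
        for row in w.local_rows:
            if row[key_idx] not in small_side_keys:
                continue                     # filtered out before it ever hits the network
            dst = hash_partition(row[key_idx], n)
            inflight[dst].extend(cluster.send(w.wid, dst, [row]))
    return inflight, cluster.bytes_shipped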

Column pruning + projection pushdown. Only ship the columns that downstream stages actually need. If your query selects region and revenue, the shuffle should ship those two columns, not all 50 columns of sales. Columnar storage formats (Parquet, ORC) make this near-free at the scan stage; the engine just doesn't read the other columns.
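At the scan layer this is a one-liner in any columnar reader; a sketch with pyarrow, where the file path is hypothetical:

import pyarrow.parquet as pq

# Reads only the two columns the query needs; the other columns are never touched on disk.
table = pq.read_table("sales.parquet", columns=["region", "revenue"])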

The Spark SQL paper [3] describes how Catalyst composes all these optimisations into a single rewrite-rule framework. The Snowflake paper [4] describes how its execution engine does the same thing internally without exposing knobs to the user.

Coordinator designs: one, none, or many

The coordinator's job is to parse SQL, plan the query, dispatch stages, monitor progress, and return results to the client. How many coordinators a system has is one of the architectural choices that distinguishes engines.

Single coordinator (Trino, Presto, Snowflake's "cloud services" tier). One node parses every query and orchestrates every stage. Workers are stateless from the coordinator's view — they receive plan fragments and execute them. Pros: simple control flow, single source of truth for plan state, easy to reason about. Cons: the coordinator is a single point of failure and a scaling ceiling — at sufficient query rate, the coordinator's CPU becomes the bottleneck. Trino solves this in production by running multiple coordinators and load-balancing queries across them; each query still has one coordinator end-to-end.

Driver-per-application (Spark). Spark has no global coordinator. Each Spark application (job submission) gets its own driver process, which parses the user's code (Scala / Python / SQL), builds the plan, and dispatches tasks to executors. Drivers don't share state across applications; Spark's cluster manager (YARN, Kubernetes, standalone) handles only resource allocation. Pros: no single bottleneck across the whole cluster — many drivers run in parallel for many applications. Cons: each driver is itself a single point of failure for its own application, and inter-application sharing of cached data requires explicit machinery (Spark's SparkSession caching, Tachyon/Alluxio, etc.).

Distributed coordination (BigQuery / Dremel). Dremel's design has a tree of intermediate aggregator nodes between the root coordinator and the leaf workers. Each level of the tree partially aggregates the level below. For deep aggregation queries, this avoids piling all the final-aggregation work on one coordinator. The Microsoft SCOPE paper [5] describes a similar tree-based execution structure for Cosmos.

Hybrid (Snowflake). The cloud services layer (parsing, planning, optimising, security) runs as a stateless multi-tenant service across many nodes; the actual compute runs on per-customer "virtual warehouses" (worker clusters). The coordinator role is split: services handle planning, the warehouse's lead node handles execution coordination. We'll dig into this in the next chapter on storage-compute separation.

The DeWitt and Gray paper [6] from 1992 already laid out these trade-offs — the basic algebra of MPP execution has not changed in three decades; what has changed is the network speed and the deployment substrate (cloud, object storage, containers).

Reading a real query plan

When you run EXPLAIN on a Spark, Trino, or Snowflake query, you are looking at the MPP execution graph. The two patterns to identify:

Exchange operators (Spark calls them Exchange; Trino calls them RemoteExchange; Snowflake hides them under GlobalAggregateNode etc.). Each Exchange is a shuffle. Count them. A query with 5 exchanges is going to be slower than a query with 1, on the same data. If you see an exchange that surprises you — a shuffle the optimiser inserted but you didn't expect — that's where to focus tuning effort.

Partial vs final aggregates. Look for paired operators like HashAggregate(keys=[region], partial=true) followed by an Exchange followed by HashAggregate(keys=[region], partial=false). That is the partial-aggregation optimisation in action. If you only see one HashAggregate with no partial step, the optimiser couldn't decompose the aggregate (e.g., MEDIAN and PERCENTILE_DISC are not decomposable; SUM, COUNT, MIN, MAX, AVG are).
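To see the pair in practice, any Spark session will show it. A sketch assuming an existing SparkSession named spark with a sales table registered:

from pyspark.sql import functions as F

q = spark.table("sales").groupBy("region").agg(F.sum("revenue").alias("total_revenue"))
q.explain()  # look for HashAggregate (partial) -> Exchange hashpartitioning -> HashAggregate (final)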

Once you can read these plans, predicting wall-clock time becomes a matter of summing the shuffle byte estimates from the plan's row-count statistics. Production teams often have one engineer who specialises in reading plans and writing rewrite rules — that engineer is the one whose work most reliably saves the most cluster-hours.

Common confusions

Going deeper

MPP architecture has been studied for 35 years; the modern cloud incarnations are mostly engineering refinements on the algebra DeWitt and Gray laid out. This section covers how the implementations have evolved.

Vectorised execution and the shuffle

Modern engines (Snowflake, Photon, DuckDB, BigQuery) execute operators on batches of rows (typically 1024 to 10000 rows at a time) rather than row-by-row. This affects the shuffle: rather than serialising and shipping one row at a time, the engine ships compressed columnar batches across the network. Net effect: shuffle bytes drop 3–5x because columnar formats compress better than row formats, and serialisation CPU overhead drops 10x. The shuffle is still the bottleneck; vectorisation just makes it less of one.

Shuffle service (push vs pull)

Spark and Trino traditionally implemented shuffle as pull-based: the receiving worker initiates a request to each sending worker for its bucket. This requires sending workers to keep their shuffle output on local disk until all receivers have fetched it, which (a) consumes lots of disk and (b) makes worker failures expensive (lose a sender's disk = lose the shuffle output, restart the stage). Modern push-based shuffle services (Spark 3.2+ "Magnet", Trino's external shuffle) push shuffle output to a separate shuffle service, decoupling it from worker lifecycle. Failures become cheap; resource isolation improves.

Adaptive query execution

Spark 3.0's AQE re-plans the query after each shuffle stage based on the observed row counts. Three things it does that matter: (1) coalesce small post-shuffle partitions into bigger ones (avoids the "many tiny tasks" problem), (2) switch a sort-merge join to broadcast if the post-shuffle data is small enough, (3) detect skewed partitions and split them (skew handling). The same data, the same query, can run 10x faster with AQE on, particularly for skewed workloads.
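The relevant switches, assuming an existing SparkSession named spark (these configuration keys exist in Spark 3.x; defaults vary by version):

spark.conf.set("spark.sql.adaptive.enabled", "true")                     # re-plan after each shuffle stage
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # (1) merge tiny post-shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # (3) detect and split skewed partitions
# (2) the sort-merge-to-broadcast switch happens automatically once adaptive execution is on.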

Cost-based optimiser internals

The optimiser's job is to pick the plan with the lowest estimated cost. Cost is dominated by shuffle bytes, so the optimiser searches over join order, join algorithm (broadcast vs shuffle vs sort-merge), and partial-aggregation insertion. The search space is exponential in the number of joins; modern optimisers use dynamic programming up to ~12-way joins and switch to genetic algorithms beyond that. Cardinality estimation — predicting how many rows each operator will produce — is the hardest and least-solved problem. Stale stats are the #1 cause of wrong plans in production.

Cloud-native MPP

The shift from on-prem (Vertica, Greenplum, Teradata) to cloud-native (Snowflake, BigQuery, Databricks) changed two things: (1) storage moved from local disks to object storage (S3, GCS), which is slower per-byte but vastly cheaper and infinitely scalable, and (2) compute became elastic — you can spin up 1000 workers for a single query and tear them down five minutes later. Both shifts make the shuffle even more dominant: object storage is fast enough that scan is rarely the bottleneck, and elastic compute means shuffle pattern matters more than worker count. Snowflake's architecture paper [4] is the canonical reference for this transition.

Where this leads next

You now understand why every analytical engine, from a self-managed Trino to a fully-hidden BigQuery, is built around the same shape: parallel scan, local work, exchange, parallel finish. The next chapters look at the architectural choices layered on top of this skeleton.

The shuffle is the heart of any MPP system. Once you have built one in 60 lines of Python, every Spark plan, every Snowflake query, every BigQuery dashboard becomes legible. The arithmetic is the same: scan locally, do what you can locally, shuffle as little as possible, finish locally. Every other optimisation is a refinement of those four words.

References

  1. Lamb et al., The Vertica Analytic Database: C-Store 7 Years Later, VLDB 2012 — Vertica's MPP architecture, projection-based storage, and how co-located joins eliminate shuffles for tables partitioned on the join key.
  2. DeWitt and Stonebraker, MapReduce: A Major Step Backwards, 2008 — the famous polemic arguing that MapReduce reinvented MPP badly. The technical critique still stands and explains why modern engines (Spark, Trino) are MPP databases that happen to look like MapReduce, not the other way around.
  3. Armbrust et al., Spark SQL: Relational Data Processing in Spark, SIGMOD 2015 — the Spark SQL paper, including how Catalyst composes optimisations like partial aggregation and broadcast joins as rewrite rules.
  4. Dageville et al., The Snowflake Elastic Data Warehouse, SIGMOD 2016 — Snowflake's architecture: separated storage and compute, hidden coordinator decisions, virtual warehouses as elastic MPP clusters.
  5. Chaiken et al., SCOPE: Easy and Efficient Parallel Processing of Massive Data Sets, VLDB 2008 — Microsoft's SCOPE / Cosmos distributed system, with tree-structured execution and a dataflow language compiled into MPP plans.
  6. DeWitt and Gray, Parallel Database Systems: The Future of High Performance Database Systems, CACM 1992 — the foundational MPP paper, introducing shared-nothing architecture, hash-partitioning, and the parallel-execution algebra still used today.