In short

A distributed OLAP query engine (Spark, Trino, Snowflake, BigQuery, Databricks Photon) has to join two tables that live as partitions on dozens or hundreds of workers. There is no single machine that holds both sides; the join must move data across the network. Two strategies dominate.

Broadcast join picks the smaller side, sends a full copy of it to every worker, and then each worker scans only its local slice of the larger side and probes the in-memory broadcast hash table. Network cost: |small| × N_workers. Win condition: the small side fits comfortably in each worker's memory after decompression and hash-table overhead.

Shuffle join (also called hash-partitioned join or repartition join) hash-partitions both sides on the join key and routes matching partitions to the same worker, so each worker performs a local hash join on its share. Network cost: |left| + |right|, since both tables fully traverse the network once. Shuffle is the only choice when both sides are too large to broadcast; broadcast is the better choice whenever it fits.

The cost-based optimiser uses column statistics to estimate the smaller side's size and picks broadcast iff that estimate falls below a threshold (10 MB by default in Spark, often raised to hundreds of MB in production). When key distribution is skewed (one user with 80% of the orders), a shuffle join degrades because all rows for that key pile up on one worker; the fix is salting (append a random suffix to the skewed key on the large side and replicate the matching small-side rows once per suffix) or partial broadcast. Get the choice right and a 100 GB × 1 GB join finishes in under a minute. Get it wrong by broadcasting the 100 GB side and every executor OOMs at once. This chapter builds a four-worker Python demonstrator that counts bytes-on-the-wire for both algorithms, derives the decision rule, and walks through how Spark, Trino, and Snowflake each implement it.

You have spent six chapters on single-node joins — nested loop, hash, sort-merge, all the cost formulas in pages and rows. Now scale the problem up. The fact table is 100 GB, the dimension table is 1 GB, and they live on a Spark cluster of 100 workers, partitioned by some unrelated key. No single machine has the whole orders table in memory; no single machine has the whole dim_user table on disk. To join them, data has to move.

The question is which data moves and how much. Get this right and your query finishes while you go for chai. Get it wrong and you come back from lunch to find every executor crashed and the Slack channel on fire.

Distributed OLAP engines pick between two main strategies — broadcast and shuffle — with sort-merge as a third option for very large or skewed joins. The choice is made by a cost-based optimiser using column statistics, but the decision is so consequential that production teams routinely override it with hints. Knowing the algorithms and their costs is the difference between writing a JOIN in SQL and engineering a join that runs at scale.

This chapter walks through both algorithms with a tiny multi-worker Python demonstrator that counts every byte that moves over the simulated network, then derives the decision rule, then shows how the three big engines (Spark, Trino, Snowflake) implement it.

The setup: data lives on workers, not on a single machine

In a single-node engine, both inputs to a join sit somewhere accessible — on disk pages reachable by the buffer pool, or in memory. The question was which algorithm minimises page reads. In a distributed engine, both inputs are themselves split across workers, with no shared address space. The question becomes which algorithm minimises bytes shipped between workers, because the network is now the slowest part of the system.

A typical modern OLAP cluster:

The expensive part is moving rows between workers once each worker has read its slice. A join requires that rows from the two tables with matching keys end up on the same worker, and there are exactly two ways to make that happen.

Broadcast join: ship the small side to everyone

The first strategy assumes one side of the join is small. "Small" here means it fits in memory on a single worker after decompression — typically a few hundred MB to a few GB, depending on cluster sizing.

The algorithm:

  1. The driver (or coordinator) reads the small side from object storage and collects it into one place.
  2. The driver broadcasts the small side to every worker. Now every worker has a full local copy of the small side.
  3. Each worker builds a local hash table on the broadcast side, keyed on the join column.
  4. Each worker scans its local slice of the big side and probes the hash table for matches. Matches are emitted as output.
  5. No data ever moves between workers for the big side — every worker only sees its own partitions.
[Figure: Broadcast join: small side replicated, big side stays put. The driver holds dim_user (1 GB) and sends a full copy to each of four workers; each worker holds one 25 GB slice of orders and runs a local hash build and probe. Network cost = |dim_user| × N = 1 GB × 4 = 4 GB; orders (100 GB) never crosses the network. No worker-to-worker shuffle, no skew penalty; the bottleneck is the broadcast itself.]
Broadcast join. The driver ships a full copy of the small dimension table to every worker; each worker reads its local slice of the big fact table and joins locally. The only network traffic is the broadcast.

The network cost is |small| × N_workers. For our 1 GB dimension and 100 workers, that is 100 GB total bytes shipped — but it is shipped in parallel from one source to many destinations, so wall-clock time is bounded by the broadcast latency (often using a tree fan-out: the driver sends to 10 workers, each of those forwards to 10 more, log(N) hops, very fast in practice).
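The fan-out argument can be made concrete with a small sketch. It assumes an idealised tree in which the driver sends to `fanout` workers in the first round and every worker that just received a copy forwards it to `fanout` new workers in the next; `broadcast_rounds` is a hypothetical helper, not any engine's API, and real broadcast protocols differ in detail.

```python
def broadcast_rounds(n_workers: int, fanout: int) -> int:
    """Rounds needed for an idealised tree broadcast to reach n_workers."""
    if fanout < 1:
        raise ValueError("fanout must be at least 1")
    reached = 0   # workers that hold a copy so far
    senders = 1   # nodes sending in the current round (the driver at first)
    rounds = 0
    while reached < n_workers:
        # Each sender delivers to `fanout` workers that lack a copy.
        newly_reached = min(senders * fanout, n_workers - reached)
        reached += newly_reached
        senders = newly_reached
        rounds += 1
    return rounds


for n in (10, 100, 1000):
    print(n, "workers ->", broadcast_rounds(n, fanout=10), "rounds")
```

With a fan-out of 10, going from 100 to 1,000 workers adds one round, which is the logarithmic growth the paragraph describes. The total bytes shipped are still |small| × N; only the wall-clock depth is logarithmic.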

The 100 GB fact table never moves across the network. Every byte of orders is read by exactly one worker, from object storage to that worker's local memory, processed there, and that worker emits its share of the join output.

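Why broadcast is so good when it fits: the alternative is to move both sides over the network. Broadcast moves only one side, and only the small one. The fact table, which is 100x bigger, stays put. Network bandwidth, not CPU, is the bottleneck on most distributed joins, so keeping the large side off the wire is where the time is saved. How many bytes you save depends on the worker count, because the broadcast itself costs |small| × N: with the four workers in the figure it is 4 GB against 101 GB for a shuffle, roughly 25x fewer bytes, and the ratio shrinks as N grows.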

The catch — the only catch, but it is fatal — is that every worker must hold the full broadcast side in memory. If your small side is actually 8 GB and you broadcast it to 100 workers each with 8 GB of usable executor memory, every executor immediately runs out of RAM and the query crashes. Production teams routinely raise the broadcast threshold from Spark's default 10 MB to 100 MB or 200 MB, but never beyond what the smallest worker can hold with comfortable headroom.

Shuffle join: hash-partition both sides

The second strategy handles the case where neither side fits in worker memory. Both sides are large; you have to use the network for both. The trick is to ensure that for any given join key, all rows from both tables with that key end up on the same worker, so the per-worker join is a normal local hash join.

The algorithm:

  1. Each worker reads its local slice of the left side.
  2. For each row, compute hash(join_key) mod N_workers to determine which worker that row belongs to.
  3. Send the row to that worker over the network. (This is the shuffle write phase.)
  4. Each worker simultaneously does the same for the right side, partitioning by the same hash function.
  5. Each worker now has a partition of the left side and a partition of the right side, with the property that any matching join key is in the same partition on the same worker.
  6. Each worker performs a local hash join on its partition. Output is emitted in parallel.
[Figure: Shuffle join: both sides hash-partitioned by join key. Before the shuffle, each of four workers holds an L slice and an R slice with random keys. After routing by hash(key) mod 4, worker 1 owns keys with hash 0, worker 2 hash 1, worker 3 hash 2, worker 4 hash 3, and each runs a local hash join. Network cost = |L| + |R|. Works for arbitrarily large tables; if keys are skewed, one worker gets all the popular rows and becomes a straggler.]
Shuffle join. Both tables are hash-partitioned by the join key; matching partitions are routed to the same worker, which then performs a local hash join. Network cost is the sum of both table sizes.

Network cost is |L| + |R| — each row of each table is shipped exactly once, from the worker that read it from object storage to the worker that owns its hash bucket. For our 100 GB fact and 1 GB dimension, that is 101 GB of network traffic. On a 10 Gbps cluster network, with cross-traffic patterns and contention, this typically takes minutes, not seconds.
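The two cost formulas can be put side by side in a few lines. This is only arithmetic on the chapter's own formulas (|small| × N for broadcast, |L| + |R| for shuffle); the function names are made up for illustration, and the model ignores parallelism, contention, and memory limits.

```python
GB = 10 ** 9

def broadcast_bytes(small: int, n_workers: int) -> int:
    """Total bytes shipped by a broadcast join: one full copy per worker."""
    return small * n_workers

def shuffle_bytes(left: int, right: int) -> int:
    """Total bytes shipped by a shuffle join: each side crosses the network once."""
    return left + right

def breakeven_workers(left: int, right: int) -> int:
    """Largest worker count at which broadcasting the smaller side still
    ships strictly fewer total bytes than shuffling both sides."""
    small = min(left, right)
    # small * N < left + right  <=>  N <= (left + right - 1) // small
    return (left + right - 1) // small

for n in (4, 100):
    print(n, "workers:",
          broadcast_bytes(1 * GB, n) // GB, "GB broadcast vs",
          shuffle_bytes(100 * GB, 1 * GB) // GB, "GB shuffle")
print("break-even worker count:", breakeven_workers(100 * GB, 1 * GB))
```

On total bytes alone the two strategies converge as the cluster grows; the rest of broadcast's advantage comes from the big side never being repartitioned.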

But it works. There is no broadcast that has to fit in any single worker's memory. The algorithm scales to arbitrarily large tables on both sides; the only cost is more network and a longer wall-clock time.

Why hash partitioning works: the hash function h(k) = k mod N_workers (or any other deterministic key-to-worker map) is the same function applied to both sides. A row from the left side with key k and a row from the right side with key k both go to worker h(k). Therefore every pair that could match ends up on the same worker, and no pair that could match ends up split across workers. The local hash join on each worker sees the complete picture for its bucket, so no matches are lost.
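The co-location argument is easy to check empirically. The sketch below partitions both sides with the same function, joins each bucket independently, and compares the result with a single global join. The `owner` function uses `zlib.crc32` purely as a convenient deterministic hash; that choice is an assumption of this example, not something the chapter prescribes.

```python
from collections import defaultdict
import zlib

def owner(key, n_workers: int) -> int:
    """Deterministic key-to-worker map, applied identically to both sides."""
    return zlib.crc32(str(key).encode()) % n_workers

def partition(rows, n_workers: int):
    """Split rows into one bucket per worker by the join key in row[0]."""
    buckets = [[] for _ in range(n_workers)]
    for row in rows:
        buckets[owner(row[0], n_workers)].append(row)
    return buckets

def hash_join(left, right):
    """Plain in-memory hash join on row[0]."""
    table = defaultdict(list)
    for r in right:
        table[r[0]].append(r)
    return [l + r for l in left for r in table.get(l[0], [])]

left = [(k % 7, f"L{k}") for k in range(50)]
right = [(k % 5, f"R{k}") for k in range(20)]
n = 4

left_buckets, right_buckets = partition(left, n), partition(right, n)
partitioned = [pair for w in range(n)
               for pair in hash_join(left_buckets[w], right_buckets[w])]
reference = hash_join(left, right)

# Same rows either way: no match is lost and none is duplicated.
assert sorted(partitioned) == sorted(reference)
print(len(partitioned), "joined rows from", n, "independent buckets")
```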

Sort-merge join with shuffle: an alternative for sorted/skewed data

A third option, used by Spark for very large joins, is sort-merge join with shuffle. The first phase is identical to shuffle hash join: both sides are hash-partitioned by the join key. But each worker, instead of building a hash table, sorts its partition of both sides on the join key and then runs a merge-join.

This trades CPU (sorting is O(n log n) vs O(n) for hash) for two benefits: (1) it spills more gracefully when the hash table doesn't fit in memory, since merge join only needs one row at a time from each side, and (2) it handles skew slightly better, because the sort is O(n log n) regardless of distribution while a giant hash table on a skewed key blows out memory. Spark's default join algorithm for large-to-large joins is sort-merge for exactly these reasons.
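For reference, the per-worker merge phase looks roughly like this. It is a minimal in-memory sketch that assumes both inputs are already sorted on the key in `row[0]`; a real engine streams from sorted spill files instead of Python lists.

```python
def merge_join(left, right):
    """Join two lists of tuples that are already sorted by row[0]."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of right rows sharing this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # Pair every left row with this key against that run.
            while i < len(left) and left[i][0] == lk:
                for r in right[j:j_end]:
                    out.append(left[i] + r)
                i += 1
            j = j_end
    return out

left = sorted([(3, "a"), (1, "b"), (3, "c"), (5, "d")])
right = sorted([(3, "x"), (3, "y"), (4, "z"), (1, "w")])
result = merge_join(left, right)
print(result)
```

Note that the run of duplicate keys on one side has to be held while the other side advances, so "one row at a time" holds exactly only when keys are unique on at least one side.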

Skew: the hidden killer

Shuffle join assumes the join keys are reasonably uniformly distributed. (Broadcast join is far less sensitive, because the big side is never repartitioned by key.) Real production data is almost never uniform. Consider an orders table for an Indian e-commerce platform: 10% of all orders are placed against the same merchant_id (the largest seller), 30% are against the next 100 merchants, and the long tail covers the remaining 60%.

In a shuffle join on merchant_id, the worker assigned to the largest merchant's hash bucket receives 10% of the entire 100 GB fact table — 10 GB of rows on a single worker, while every other worker handles ~1 GB. That worker's local hash join takes 10x longer than the others; the entire query is bottlenecked on the slowest worker. This is skew, and it is the most common cause of mysteriously slow distributed joins in practice.
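A quick way to see the straggler is to hash a skewed key set and count rows per worker. The data below is synthetic and scaled down (100,000 rows, one key holding 10% of them, unique keys for the rest), and `owner` is an assumed stand-in for the engine's partitioner.

```python
from collections import Counter
import zlib

def owner(key, n_workers: int) -> int:
    return zlib.crc32(str(key).encode()) % n_workers

def worker_loads(keys, n_workers: int):
    """Rows received by each worker after hash-partitioning on the key."""
    counts = Counter(owner(k, n_workers) for k in keys)
    return [counts.get(w, 0) for w in range(n_workers)]

n_workers = 100
hot = ["merchant_hot"] * 10_000                    # 10% of rows share one key
tail = [f"merchant_{i}" for i in range(90_000)]    # the rest are spread out
loads = worker_loads(hot + tail, n_workers)

mean_load = sum(loads) / n_workers
print("mean rows per worker:", mean_load)
print("busiest worker:", max(loads), "rows,",
      round(max(loads) / mean_load, 1), "x the mean")
```

The busiest worker carries at least the full 10,000 hot rows while the mean is 1,000, which reproduces the roughly 10x imbalance described above.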

Three solutions:

Salting. Append a random suffix (salt) to the skewed key on the large side. The skewed key merchant_id = 5 becomes (5, salt_0), (5, salt_1), ..., (5, salt_15), distributed across 16 hash buckets. The small side (typically the dimension table) must replicate its rows for that key once per salt so that joins still match. The replicated small-side rows get 16x bigger; the skewed large side gets spread over 16 buckets instead of one. Net win for severe skew.
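Salting can be sketched in plain Python as follows. The helper names, the 16-salt count, and the sample data are assumptions for illustration; the point is only that the salted join returns the same rows as the unsalted one while the hot key is spread over several join keys.

```python
import random
from collections import defaultdict

N_SALTS = 16  # assumed salt count, matching the 16 buckets in the text

def salt_large(rows, hot_keys, rng):
    """Large side: each row with a hot key gets a random salt; others get 0."""
    salted = []
    for key, payload in rows:
        salt = rng.randrange(N_SALTS) if key in hot_keys else 0
        salted.append(((key, salt), payload))
    return salted

def salt_small(rows, hot_keys):
    """Small side: rows with a hot key are replicated once per salt."""
    salted = []
    for key, payload in rows:
        salts = range(N_SALTS) if key in hot_keys else (0,)
        salted.extend(((key, s), payload) for s in salts)
    return salted

def hash_join(left, right):
    table = defaultdict(list)
    for key, payload in right:
        table[key].append(payload)
    return [(key, lp, rp) for key, lp in left for rp in table.get(key, [])]

orders = ([(5, f"o{i}") for i in range(1000)]
          + [(100 + i % 50, f"t{i}") for i in range(200)])
merchants = [(5, "big seller")] + [(k, f"m{k}") for k in range(100, 150)]
hot = {5}

plain = hash_join(orders, merchants)
salted = hash_join(salt_large(orders, hot, random.Random(0)),
                   salt_small(merchants, hot))
desalted = [(key, lp, rp) for (key, _salt), lp, rp in salted]

assert sorted(desalted) == sorted(plain)   # same join result
print(len({k for k, _, _ in salted if k[0] == 5}), "distinct salted keys for merchant 5")
```

Only the hot key is salted here, so the small side grows by 15 rows rather than 16x overall; salting every key is simpler to write but pays the replication cost on the whole dimension.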

Partial broadcast (skew join). Detect at runtime which keys are heavily skewed, broadcast just those rows of the small side to every worker, and shuffle the rest normally. Spark's adaptive query execution does this automatically since 3.0.
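A toy version of the idea, under the assumption that the hot keys are already known: rows of the large side with a hot key stay where they are, the matching small-side rows are copied to every worker, and everything else is shuffled as usual. The function and its accounting are illustrative only and do not reproduce how any particular engine implements skew joins.

```python
from collections import defaultdict
import zlib

def owner(key, n_workers):
    return zlib.crc32(str(key).encode()) % n_workers

def local_join(left, right):
    table = defaultdict(list)
    for row in right:
        table[row[0]].append(row)
    return [l + r for l in left for r in table.get(l[0], [])]

def partial_broadcast_join(left_slices, right_slices, hot_keys):
    """Returns (joined rows, rows shipped, largest per-worker left input)."""
    n = len(left_slices)
    shipped = 0
    left_in = [[] for _ in range(n)]
    right_in = [[] for _ in range(n)]
    for src in range(n):
        for row in left_slices[src]:
            # Hot rows of the large side stay put; the rest are shuffled.
            dst = src if row[0] in hot_keys else owner(row[0], n)
            shipped += int(dst != src)
            left_in[dst].append(row)
        for row in right_slices[src]:
            # Hot rows of the small side go to every worker; the rest are shuffled.
            dsts = range(n) if row[0] in hot_keys else (owner(row[0], n),)
            for dst in dsts:
                shipped += int(dst != src)
                right_in[dst].append(row)
    joined = [pair for w in range(n)
              for pair in local_join(left_in[w], right_in[w])]
    return joined, shipped, max(len(rows) for rows in left_in)

n = 4
left_rows = ([("hot", f"o{i}") for i in range(800)]
             + [(f"k{i}", f"t{i}") for i in range(200)])
right_rows = [("hot", "big seller")] + [(f"k{i}", f"m{i}") for i in range(200)]
left_slices = [left_rows[w::n] for w in range(n)]
right_slices = [right_rows[w::n] for w in range(n)]

plain, plain_shipped, plain_max = partial_broadcast_join(left_slices, right_slices, set())
skew, skew_shipped, skew_max = partial_broadcast_join(left_slices, right_slices, {"hot"})

assert sorted(plain) == sorted(skew)
print("largest left input per worker:", plain_max, "(plain shuffle) vs", skew_max)
```

Calling the function with an empty `hot_keys` set degenerates to an ordinary shuffle join, which makes the before/after comparison direct.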

Bucketing in storage. If you know joins on merchant_id are common, pre-bucket the table on merchant_id at write time. Then the join is essentially free of shuffle — both tables are already partitioned the same way. But this has to be planned in advance and constrains how the table can be written.

The decision rule

The cost-based optimiser has one job here: estimate the size of each side and pick broadcast or shuffle. The heuristic is simple:

[Figure: Picking a distributed join algorithm. Decision tree: is the smaller side below the threshold (default 10 MB; tune to ~100 MB)? If yes, broadcast join: ship the small side to every worker, net = |small| × N. If no, shuffle join: hash-partition both sides, net = |L| + |R|, plus skew handling (salting or partial broadcast) if one key dominates. The size estimate comes from row-count and column statistics in the table catalog; misestimates are the #1 cause of OOM joins.]
Decision rule. Broadcast if the smaller side fits comfortably in worker memory; otherwise shuffle. Add skew handling (salting or partial broadcast) on top of a shuffle join if a single join key dominates the distribution.

The threshold is the only really tunable parameter. Spark's default spark.sql.autoBroadcastJoinThreshold is 10 MB — extremely conservative, designed for the worst case where workers have very little RAM. In production, teams routinely raise it to 100 MB, 200 MB, or even 500 MB once they know how much headroom each worker has. The threshold is per-table, not per-worker — Spark estimates the total in-memory size of one side and compares to the threshold.

The estimate comes from column statistics: row count, average row size, NDV (number of distinct values). If statistics are stale or missing, the optimiser falls back to defaults that are often wrong. This is the single biggest source of OOMs in distributed-join production: the optimiser thought the small side was 50 MB based on stale stats, broadcast it, and the actual side was 5 GB.
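The rule, and the way stale statistics break it, fits in a few lines. This is a simplified sketch: `TableStats` and `choose_join` are hypothetical names, the size estimate is just row count times average row size, and real optimisers use richer models.

```python
from dataclasses import dataclass

MB = 1024 ** 2

@dataclass
class TableStats:
    row_count: int
    avg_row_bytes: int

    def estimated_bytes(self) -> int:
        return self.row_count * self.avg_row_bytes

def choose_join(left: TableStats, right: TableStats,
                threshold_bytes: int = 10 * MB) -> str:
    """Broadcast iff the smaller side's estimate is below the threshold."""
    smaller = min(left.estimated_bytes(), right.estimated_bytes())
    return "broadcast" if smaller < threshold_bytes else "shuffle"

orders = TableStats(row_count=1_000_000_000, avg_row_bytes=100)
stale_dim = TableStats(row_count=500_000, avg_row_bytes=100)       # catalog says ~50 MB
actual_dim = TableStats(row_count=50_000_000, avg_row_bytes=100)   # really ~5 GB

print(choose_join(orders, stale_dim, threshold_bytes=100 * MB))    # broadcast
print(choose_join(orders, actual_dim, threshold_bytes=100 * MB))   # shuffle
```

The planner only ever sees the first call. If the table has grown a hundredfold since statistics were last collected, it commits to a broadcast that the second call shows it should never have chosen.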

A four-worker Python demonstrator

Let's build a tiny working version that counts bytes-on-the-wire for both algorithms. The demonstrator has four "workers" — really just lists in a single Python process — and a "network" that tracks every row sent between workers.

# olap_join_demo.py
from __future__ import annotations
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Iterable
import hashlib

ROW_BYTES = 100  # assume each row is ~100 bytes for accounting

@dataclass
class Worker:
    wid: int
    left_local: list = field(default_factory=list)   # local slice of L
    right_local: list = field(default_factory=list)  # local slice of R
    received_left: list = field(default_factory=list)
    received_right: list = field(default_factory=list)

@dataclass
class Cluster:
    workers: list[Worker]
    bytes_shipped: int = 0  # accumulator: total bytes moved over network

    def send(self, src: int, dst: int, rows: Iterable):
        rows = list(rows)
        if src != dst:
            self.bytes_shipped += len(rows) * ROW_BYTES
        return rows

    @property
    def n(self) -> int:
        return len(self.workers)

def partition_key(key, n_workers: int) -> int:
    h = hashlib.md5(str(key).encode()).digest()
    return int.from_bytes(h[:4], "big") % n_workers

That is the substrate: a Cluster with N workers, a bytes_shipped counter that ticks up on every cross-worker send, and a hash-based partitioner. Now broadcast:

def broadcast_join(cluster: Cluster, build_holder: int = 0) -> tuple[list, int]:
    """Broadcast join: worker `build_holder` ships its right_local
    (the small side) to every other worker, then each worker performs
    a local hash join."""
    # Step 1: gather the small side onto one worker (assume already there).
    small_side = cluster.workers[build_holder].right_local
    # Step 2: broadcast to every worker.
    for w in cluster.workers:
        w.received_right = cluster.send(build_holder, w.wid, small_side)
    # Step 3: each worker builds a hash table on the broadcast side
    # and probes against its local left slice.
    output = []
    for w in cluster.workers:
        ht = defaultdict(list)
        for row in w.received_right:
            ht[row[0]].append(row)  # row[0] is the join key
        for left_row in w.left_local:
            for right_row in ht.get(left_row[0], []):
                output.append(left_row + right_row)
    return output, cluster.bytes_shipped

And shuffle:

def shuffle_join(cluster: Cluster) -> tuple[list, int]:
    """Shuffle hash join: every worker partitions both sides by hash(key)
    and routes matching partitions to the same destination worker."""
    n = cluster.n
    # Step 1: each worker partitions its local L and R by hash, sends.
    inflight_left = [[] for _ in range(n)]
    inflight_right = [[] for _ in range(n)]
    for w in cluster.workers:
        for row in w.left_local:
            dst = partition_key(row[0], n)
            inflight_left[dst].extend(cluster.send(w.wid, dst, [row]))
        for row in w.right_local:
            dst = partition_key(row[0], n)
            inflight_right[dst].extend(cluster.send(w.wid, dst, [row]))
    # Step 2: each worker performs a local hash join on its bucket.
    output = []
    for wid, w in enumerate(cluster.workers):
        ht = defaultdict(list)
        for row in inflight_right[wid]:
            ht[row[0]].append(row)
        for left_row in inflight_left[wid]:
            for right_row in ht.get(left_row[0], []):
                output.append(left_row + right_row)
    return output, cluster.bytes_shipped

Why this demonstrator is faithful: real distributed engines follow the same steps (partition both sides by hash(key) mod N, send rows to their destination workers, run a local hash join). The accounting is also realistic: every row that crosses a worker boundary counts. Two things are missing: parallelism (real workers run in parallel, mine run in a loop) and network contention (the simulated network is a perfect interconnect). For comparing algorithms on bytes-shipped, that is enough.

# Quick sanity check.
def make_cluster(n_workers: int, n_left: int, n_right: int, key_range: int):
    import random
    random.seed(42)
    cluster = Cluster([Worker(i) for i in range(n_workers)])
    # Assign L rows to workers round-robin; only the join keys are random.
    for i in range(n_left):
        wid = i % n_workers
        cluster.workers[wid].left_local.append((random.randint(0, key_range), f"L{i}"))
    # Put all of R on worker 0 (small side, ready to broadcast).
    for i in range(n_right):
        cluster.workers[0].right_local.append((random.randint(0, key_range), f"R{i}"))
    return cluster

# Broadcast:
c = make_cluster(n_workers=4, n_left=10000, n_right=100, key_range=200)
out, bytes_b = broadcast_join(c)
print(f"broadcast: {len(out)} matches, {bytes_b/1024:.1f} KB shipped")

# Shuffle: distribute R across workers first.
c = make_cluster(n_workers=4, n_left=10000, n_right=100, key_range=200)
# Move R from worker 0 to be evenly distributed (simulate it was already partitioned).
all_r = c.workers[0].right_local
c.workers[0].right_local = []
for i, row in enumerate(all_r):
    c.workers[i % c.n].right_local.append(row)
c.bytes_shipped = 0  # reset
out, bytes_s = shuffle_join(c)
print(f"shuffle:   {len(out)} matches, {bytes_s/1024:.1f} KB shipped")

Running this with 10,000 left rows and 100 right rows over 4 workers, the broadcast version ships about 30 KB (3 copies of the 100-row right side, since worker 0 doesn't ship to itself). The shuffle version ships roughly 750 KB (10,000 left rows + 100 right rows minus the ~25% that didn't need to move, all at 100 B/row). Broadcast wins by 25x because the right side is tiny and the left side never moves.

Now flip the sizes: make both sides 10,000 rows. Broadcast would have to ship 10,000 rows × 3 destinations × 100 B = 3 MB. Shuffle ships 20,000 rows × ~75% (the rows that need to cross workers) × 100 B = 1.5 MB. Shuffle wins, and its advantage widens as workers are added, because broadcast cost grows with N while shuffle cost stays close to |L| + |R|.
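The figures quoted for the demonstrator follow from two closed-form expressions, assuming uniform hashing (a row stays on its own worker with probability 1/N) and the demonstrator's 100-byte rows. The helper names are mine; the formulas are just the demonstrator's accounting written out.

```python
ROW_BYTES = 100  # same per-row accounting as the demonstrator

def expected_broadcast_bytes(n_small_rows: int, n_workers: int) -> int:
    # The holder does not ship to itself, so N - 1 copies cross the network.
    return n_small_rows * (n_workers - 1) * ROW_BYTES

def expected_shuffle_bytes(n_left_rows: int, n_right_rows: int, n_workers: int) -> float:
    # Under uniform hashing a fraction (N - 1) / N of all rows changes worker.
    return (n_left_rows + n_right_rows) * (n_workers - 1) / n_workers * ROW_BYTES

# Small right side: 10,000 x 100 rows on 4 workers.
print(expected_broadcast_bytes(100, 4))            # 30000
print(expected_shuffle_bytes(10_000, 100, 4))      # 757500.0

# Equal sides: 10,000 x 10,000 rows on 4 workers.
print(expected_broadcast_bytes(10_000, 4))         # 3000000
print(expected_shuffle_bytes(10_000, 10_000, 4))   # 1500000.0
```

The measured shuffle figure will wobble around the expected value, since the demonstrator draws only about 200 distinct keys and they do not split perfectly evenly across four workers.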

A real-world join: orders × dim_user on a 100-worker cluster.

You are running an OLAP query on a Spark cluster: SELECT u.city, SUM(o.amount) FROM orders o JOIN dim_user u ON o.user_id = u.user_id GROUP BY u.city.

  • orders: the fact table, 1 billion rows, 100 GB on disk after Parquet compression
  • dim_user: the dimension, 50 million rows, 1 GB on disk after compression
  • Cluster: 100 workers, each with 8 GB of executor memory available for shuffle and broadcast hash tables

Option A — broadcast dim_user.

  1. Driver pulls the 1 GB dim table to itself (4 GB after decompression in memory, since column store expands).
  2. Driver broadcasts to all 100 workers. Network cost: 1 GB × 100 = 100 GB total bytes (in parallel, takes ~30 seconds via tree-based broadcast).
  3. Each worker reads its local 1 GB slice of orders from S3 (parallel, ~10 seconds).
  4. Each worker builds a 4 GB hash table on user_id (fits in 8 GB executor memory with headroom).
  5. Each worker probes 1 GB of orders against the hash table, emits joined rows. ~5 seconds.

Total wall time: ~45 seconds end-to-end. Network usage: bounded by broadcast latency.

Option B — shuffle both sides.

  1. Each worker reads its local slice of both tables in parallel.
  2. Both sides hash-partition on user_id. Each worker ships ~99% of its rows to other workers (only 1% land in its own bucket by chance with 100 workers).
  3. Network cost: 100 GB + 1 GB = 101 GB, all at once, with N² fan-out — competing for limited cross-rack bandwidth.
  4. Each worker receives ~1 GB of orders + ~10 MB of dim, performs local hash join.

Total wall time: ~5 minutes. Network is the bottleneck; the all-to-all shuffle of 100 GB across a 10 Gbps cluster is fundamentally slow.

Option C — broadcast orders (the wrong choice).

  1. Driver tries to pull 100 GB to itself — immediately fails on driver memory limits (typically 8–16 GB).
  2. If you somehow got past the driver, each worker would try to hold 100 GB in 8 GB executor memory. Every executor OOMs simultaneously.
  3. Query crashes within seconds.

The decision. Option A wins by ~7x over option B. The Spark optimiser, with reasonable statistics, picks A automatically only if spark.sql.autoBroadcastJoinThreshold has been raised far enough to cover dim_user's estimated size (about 2 GB in this example, well above the 100 to 500 MB range quoted earlier). If statistics are stale and Spark doesn't realise dim_user is broadcastable, you can force it with a hint:

SELECT /*+ BROADCAST(u) */ u.city, SUM(o.amount)
FROM orders o JOIN dim_user u ON o.user_id = u.user_id
GROUP BY u.city

The /*+ BROADCAST(u) */ hint tells Spark to use broadcast no matter what its cost estimate says. Use it when you know better than the optimiser; never use it speculatively, because broadcasting a too-big side is the fastest way to OOM your whole cluster.

How real engines decide

Apache Spark uses a cost-based optimiser (Catalyst) with broadcast as the first preference whenever the smaller side's estimated in-memory size falls below spark.sql.autoBroadcastJoinThreshold (default 10 MB, often tuned higher). If broadcast is rejected, Spark picks sort-merge join by default for large joins — sort-merge is preferred over shuffle hash because it spills more gracefully to disk when the local hash table doesn't fit. Hints (/*+ BROADCAST(t) */, /*+ SHUFFLE_HASH(t) */, /*+ SHUFFLE_MERGE(t) */) override the cost-based choice. Adaptive Query Execution (AQE) added in Spark 3.0 can switch from shuffle to broadcast at runtime if the actual data size after a filter turns out to be small enough.

Trino / Presto make the same decision at plan time using table statistics. The JOIN_DISTRIBUTION_TYPE session property controls the policy: BROADCAST always broadcasts the right side, PARTITIONED always shuffles, AUTOMATIC (default) lets the cost-based optimiser pick. The cost model includes both network bytes and worker memory pressure. See the Trino blog on distributed joins for the original design.

Snowflake hides this completely. The user writes a SQL JOIN and the system picks the strategy internally based on the optimiser's view of the data. Snowflake has no equivalent of Spark's BROADCAST hint — you cannot override the choice. The reasoning is that Snowflake's elastic compute (warehouses can be resized at any moment) makes the broadcast/shuffle decision dynamic in ways the user cannot predict, so the system is the only entity with the right information. This is a deliberately opinionated trade: less control, less foot-gun.

BigQuery similarly hides the choice but exposes it in the query plan visualisation, so you can see which side was broadcast or shuffled after the fact. Tuning is mostly via clustering and partitioning at table creation time, not via per-query hints.

Databricks Photon is a vectorised execution engine that runs on top of Spark and uses the same broadcast/shuffle decision logic, but with a much faster local join implementation per worker. The decision rule does not change; only the constant factor on the local hash join shrinks by 5–10x.

Going deeper

If you want to understand why production clusters spend more engineering effort on join planning than on every other operator combined, this section is for you. Bloom-filter pushdown, dynamic partition pruning, and adaptive query execution are all about making this single decision smarter.

Bloom-filter join (semi-join reduction)

Before broadcasting or shuffling, modern engines often build a small probabilistic filter from the small side and push it into the scan of the large side. A bloom filter on the small side's join keys, broadcast to every worker, allows each worker to filter out 90%+ of large-side rows that have no match before any join machinery runs. Spark calls this "runtime bloom filter pushdown"; Trino calls it "dynamic filtering." It is essentially a cheap broadcast of just the keyspace, used to shrink the input to whatever join algorithm runs next. For star-schema queries (one big fact, many small dimensions), this routinely cuts the fact-side scan from terabytes to gigabytes.
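A minimal Bloom filter makes the mechanism concrete. This sketch derives its bit positions from a single SHA-256 digest, which is an implementation shortcut chosen here for brevity; the filter size, hash count, and data are all assumed values, and production engines use tuned, vectorised filters.

```python
import hashlib

class BloomFilter:
    def __init__(self, n_bits: int = 1 << 16, n_hashes: int = 4):
        if not 1 <= n_hashes <= 8:
            raise ValueError("this sketch supports 1 to 8 hashes")
        self.n_bits = n_bits
        self.n_hashes = n_hashes
        self.bits = bytearray(n_bits // 8 + 1)

    def _positions(self, key):
        # Carve n_hashes 4-byte chunks out of one 32-byte digest.
        digest = hashlib.sha256(str(key).encode()).digest()
        for i in range(self.n_hashes):
            chunk = digest[4 * i: 4 * i + 4]
            yield int.from_bytes(chunk, "big") % self.n_bits

    def add(self, key) -> None:
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key) -> bool:
        return all(self.bits[p // 8] & (1 << (p % 8))
                   for p in self._positions(key))

# Build the filter from the small side's join keys...
small_keys = range(1000)
bf = BloomFilter()
for k in small_keys:
    bf.add(k)

# ...and apply it to the large side before any join machinery runs.
fact_rows = [(k, f"row{k}") for k in range(100_000)]
survivors = [row for row in fact_rows if bf.might_contain(row[0])]

print(len(fact_rows), "fact rows ->", len(survivors), "after the filter")
```

A Bloom filter never drops a matching row; it only lets a few non-matching rows through, which the join proper then discards. The filter itself is 8 KB here, far cheaper to broadcast than the small side.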

Dynamic partition pruning

A related trick: if the fact table is partitioned on the join key, the small side's distinct keys can be used at runtime to skip whole partitions of the fact side. Spark added this in 3.0 (DPP); BigQuery and Snowflake do similar things automatically. For a partitioned fact table, dynamic partition pruning often reduces the fact-side scan by 100x, making even shuffle joins fast.
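In miniature, dynamic partition pruning is a set lookup performed before the fact-side scan. The sketch assumes the fact table is stored as a dict from partition value (the join key) to rows, and that the dimension filter has already been evaluated; all names and data are illustrative.

```python
def prune_and_scan(fact_partitions, dim_rows, dim_filter):
    """Scan only the fact partitions whose key survives the dimension filter.

    fact_partitions: dict mapping partition value (the join key) to rows
    dim_rows:        (join_key, attribute) tuples from the small side
    dim_filter:      predicate on the dimension attribute
    """
    wanted = {key for key, attr in dim_rows if dim_filter(attr)}
    scanned = [p for p in fact_partitions if p in wanted]
    rows = [row for p in scanned for row in fact_partitions[p]]
    return scanned, rows

# 100 daily partitions of 10 orders each; the query wants one special day.
fact_partitions = {day: [(day, f"order_{day}_{i}") for i in range(10)]
                   for day in range(100)}
dim_date = [(day, "holiday" if day == 42 else "regular") for day in range(100)]

scanned, rows = prune_and_scan(fact_partitions, dim_date,
                               lambda kind: kind == "holiday")
print(len(scanned), "of", len(fact_partitions), "partitions scanned;",
      len(rows), "rows read")
```

One partition out of a hundred is read, which is the kind of 100x scan reduction mentioned above.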

Adaptive Query Execution (AQE)

Spark 3.0's AQE does runtime re-planning after each shuffle stage. The classic problem: the optimiser estimated the right side at 50 MB, above the broadcast threshold, so it planned a shuffle; but a WHERE clause turned out to be more selective than expected and the right side post-filter is only 10 MB. AQE notices this after the filter stage and switches the shuffle plan to broadcast on the fly. Conversely, if a planned broadcast turns out to be too big, AQE can switch to shuffle. This eliminates a huge class of OOMs caused by stale statistics.

Skew join handling in production

Spark AQE has automatic skew detection: after a shuffle, if any partition is more than spark.sql.adaptive.skewJoin.skewedPartitionFactor × the median, it splits that partition into smaller chunks and replicates the matching rows from the other side. This is essentially salting done automatically. Trino and BigQuery have similar features. The user-facing impact: a query that used to take 10 minutes because of one skewed key now takes 90 seconds.
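The detection step reduces to a comparison against the median partition size. In the sketch below, `factor` plays the role of the skewedPartitionFactor setting; the `min_bytes` floor, the target chunk size, and the specific values are assumptions made for this example rather than documented defaults.

```python
from statistics import median

GB = 10 ** 9

def find_skewed(partition_sizes, factor: float, min_bytes: int):
    """Indices of partitions larger than factor x median and above a size floor."""
    med = median(partition_sizes)
    return [i for i, size in enumerate(partition_sizes)
            if size > factor * med and size > min_bytes]

def n_chunks(size: int, target_bytes: int) -> int:
    """How many roughly target-sized chunks a skewed partition is split into."""
    return -(-size // target_bytes)   # ceiling division

# 99 ordinary partitions of 1 GB and one 10 GB straggler.
sizes = [1 * GB] * 99 + [10 * GB]
skewed = find_skewed(sizes, factor=5, min_bytes=GB // 4)

for i in skewed:
    print("partition", i, "is skewed; split into",
          n_chunks(sizes[i], target_bytes=1 * GB), "chunks")
```

Each chunk of the skewed partition is then joined against a copy of the matching rows from the other side, which is why the text calls this salting done automatically.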

Why sort-merge dominates for large-large joins

When neither side is broadcastable, you would think shuffle hash join is best — it is O(n) after the shuffle, vs sort-merge's O(n log n). In practice Spark and most modern engines pick sort-merge for large joins because (1) sort-merge spills to disk gracefully when the per-worker partition doesn't fit in memory (hash join can't spill mid-build without doubling complexity), (2) sort-merge handles skew slightly better since the sort runtime is data-independent, and (3) sort-merge produces sorted output that downstream operators (window functions, range joins) can exploit. The CPU-cost penalty is log n, which on million-row partitions is only 20x — small relative to the I/O cost.

Photon and vectorised local joins

Databricks Photon, Snowflake's vectorised executor, and DuckDB all show that the local join on each worker can be made dramatically faster with vectorised SIMD-friendly hash table implementations and CPU-cache-aware partitioning. Photon's papers report 5–10x speedups on the local join phase. The broadcast/shuffle decision is unaffected; the constants on the local side just get smaller.

Where this leads next

You now understand why distributed joins are the hardest operator to plan, and how the two main strategies trade network bytes against worker memory. The next chapter looks at engines that bypass distributed execution entirely by running on a single very fat node.

The broadcast vs shuffle decision is the single most consequential choice a distributed query optimiser makes. Master it and you can read any Spark or Trino query plan and predict its wall-clock time to within a factor of two. Get it wrong and your query takes ten hours instead of one minute, or crashes the cluster.

References

  1. Apache Spark, Performance Tuning — Join Strategy Hints — the canonical reference for BROADCAST, SHUFFLE_HASH, and SHUFFLE_MERGE hints, plus the autoBroadcastJoinThreshold parameter and AQE settings.
  2. Trino Project, Distributed Query Execution and Join Strategies — Trino's documentation of how the cost-based optimiser picks broadcast vs partitioned joins, with worked examples.
  3. Blanas, Patel, Ercegovac, Rao, Shekita, Tian, A Comparison of Join Algorithms for Log Processing in MapReduce, SIGMOD 2010 — empirical comparison of broadcast, shuffle hash, and sort-merge joins on distributed clusters; the foundational measurements behind modern OLAP planners.
  4. Dageville et al., The Snowflake Elastic Data Warehouse, SIGMOD 2016 — Snowflake's architecture paper, explaining why distributed-join decisions are hidden from users and made dynamically based on warehouse sizing.
  5. Behm et al., Photon: A Fast Query Engine for Lakehouse Systems, SIGMOD 2022 — Databricks Photon paper, including how vectorised local-join execution composes with Spark's broadcast/shuffle decisions.
  6. DeWitt and Gray, Parallel Database Systems: The Future of High Performance Database Systems, CACM 1992 — the original paper formalising hash-partitioned (shuffle) and broadcast joins as the two primitives of distributed query execution.