In short

A GROUP BY is a shuffle. Every row sharing a group-key value has to meet in one place so the aggregate can be computed, and the two canonical ways to arrange that meeting are hashing and sorting. Hash aggregation builds a hashmap keyed by the group key, updating a running accumulator (SUM, COUNT, MIN, …) on each row insert. In memory it is a single linear pass over the input — O(|R|) CPU and B_R page reads. When the hashmap exceeds memory you switch to external hash aggregation, which partitions the input by h(key) mod k exactly like grace hash join, aggregates each partition in isolation, and concatenates. Cost: 3·B_R — one pass to read, one to write partitions, one to re-read. Sort aggregation sorts the input on the group key using external merge sort, then does a single linear pass that emits one output row every time the key changes. Cost: 2·B_R·(1 + ⌈log_{M−1}(N/M)⌉) for the sort plus B_R for the final pass — dominated by the sort, so roughly 4·B_R in realistic regimes. On top of both, pre-aggregation (the combiner trick MapReduce and Spark use) feeds the input through a tiny in-memory map first, collapsing partial groups before they ever hit the spill path. MIN, MAX, COUNT, SUM are algebraic — their partials combine cleanly, so pre-aggregation and distributed aggregation work for free. AVG is just SUM + COUNT + divide. MEDIAN, COUNT(DISTINCT) are holistic — their partials cannot be combined, so they need sorting, sketches, or a full shuffle. SELECT DISTINCT is GROUP BY without an aggregate and has the same cost profile.

The query that forces you to choose

You are running this against a global orders table:

SELECT country, COUNT(*), SUM(amount)
FROM orders
GROUP BY country;

Five hundred million rows. Roughly two hundred distinct countries. Your buffer pool is 100 MB.

You already know from iterator-model execution that the physical plan is a tree, and somewhere in it sits a GroupAggregateExec node that consumes rows from a scan and emits one row per country. The question is how it does that.

Two hundred accumulators fit in a few kilobytes. A hashmap keyed by country name, with (count, sum) pairs for the values, does not break a sweat. One pass over the input — B_R page reads — and you are done. Sort aggregation would force you to sort five hundred million rows first, which is roughly 4·B_R I/O even at best. Hash aggregation is four times cheaper on this query, and every analytical engine on earth picks it.

Now change the query:

SELECT customer_id, COUNT(*), SUM(amount)
FROM orders
GROUP BY customer_id;

Same table. Same buffer pool. But now there are fifty million distinct customer_ids, and a hashmap with fifty million entries does not fit in 100 MB. Hash aggregation either spills to disk (three-pass grace-style partitioning) or pre-aggregates so aggressively that the hashmap does fit. Sort aggregation, meanwhile, is now competitive: the sort costs 4·B_R whether there are two hundred groups or fifty million, whereas hash's cost shot up.

Which of the two you pick depends on the number of distinct keys, the memory budget, whether the input is already sorted, and what the operator above you wants. This chapter builds both algorithms, derives their costs, and lays out the four regimes where one or the other wins.

The aggregation operator contract

A GroupAggregateExec is a Volcano iterator just like HashJoinExec or SortMergeJoinExec. Its contract is the same open(), next(), close() from chapter 44, with two pieces of configuration at construction time: a list of column indices that form the group key, and a list of (aggregate_function, source_column) pairs that say what to compute per group.

# query/exec/agg.py
from dataclasses import dataclass, field
from typing import Callable, Iterator, Any

Row = tuple

@dataclass
class AggSpec:
    factory: Callable[[], "Accumulator"]   # fresh accumulator per group
    source: int                            # column index to feed in
    name: str                              # output column name

@dataclass
class GroupAggregateExec:
    child: Any                              # input operator
    group_cols: tuple[int, ...]             # indices forming the group key
    aggs: list[AggSpec]                     # SUM/COUNT/AVG/…
    mode: str = "hash"                      # "hash" | "sort"

An Accumulator is the small interface that SUM, COUNT, MIN and friends all satisfy. Two methods and nothing else: add(value) updates the running state, finalise() returns the output value at the end.

class Accumulator:
    def add(self, value: Any) -> None: ...
    def finalise(self) -> Any: ...

class Count(Accumulator):
    def __init__(self): self.n = 0
    def add(self, v): self.n += 0 if v is None else 1
    def finalise(self): return self.n

class Sum(Accumulator):
    def __init__(self): self.s = 0
    def add(self, v):
        if v is not None: self.s += v
    def finalise(self): return self.s

class Min(Accumulator):
    def __init__(self): self.m = None
    def add(self, v):
        if v is not None and (self.m is None or v < self.m): self.m = v
    def finalise(self): return self.m

class Avg(Accumulator):
    def __init__(self): self.s, self.n = 0, 0
    def add(self, v):
        if v is not None: self.s += v; self.n += 1
    def finalise(self): return None if self.n == 0 else self.s / self.n

Why separate add and finalise: most aggregates have a running state that is not the output. AVG carries (sum, count) internally and divides once at the end — calling the division on every add would be wasted work, and more importantly, it would make merging partial accumulators across partitions impossible. The split between add (row-level update) and finalise (group-level readout) is the same split as Spark's createCombiner / mergeValue / mergeCombiners trio. It is the interface that makes distributed and pre-aggregation work.

Why every accumulator has to handle None: SQL's three-valued logic says NULL is neither present nor absent — COUNT(x) skips nulls, COUNT(*) counts them, SUM skips them, MIN skips them. The accumulator is the right place to encode those rules once, so the operator above does not have to branch on column semantics. A common bug in amateur engines is SUM returning NULL + 3 = NULL and poisoning the whole group; filtering nulls inside add avoids it.

The aggregates fall into three buckets based on how their partial states combine — this distinction becomes load-bearing two sections from now, when partitioning enters the picture:

Every optimisation in the rest of this chapter applies cleanly to distributive and algebraic aggregates. Holistic aggregates need either sorting or an approximation algorithm — they are the reason HyperLogLog exists.

Hash-based aggregation

The algorithm is three lines in English. Scan the input. For each row, compute the group key and look it up in a dict; if absent, insert a fresh tuple of accumulators. Call add on each accumulator with the right source column. At end of input, finalise and emit one row per entry.

# query/exec/hash_agg.py
from collections import OrderedDict
from typing import Iterator

class HashAggIterator:
    def __init__(self, exec: GroupAggregateExec):
        self.exec = exec
        self.table: dict[tuple, list[Accumulator]] = {}

    def _group_key(self, row: Row) -> tuple:
        return tuple(row[i] for i in self.exec.group_cols)

    def _ensure_bucket(self, key: tuple) -> list[Accumulator]:
        bucket = self.table.get(key)
        if bucket is None:
            bucket = [spec.factory() for spec in self.exec.aggs]
            self.table[key] = bucket
        return bucket

    def __iter__(self) -> Iterator[Row]:
        self.exec.child.open()
        try:
            for row in self.exec.child:
                bucket = self._ensure_bucket(self._group_key(row))
                for acc, spec in zip(bucket, self.exec.aggs):
                    acc.add(row[spec.source])
        finally:
            self.exec.child.close()
        for key, bucket in self.table.items():
            yield key + tuple(a.finalise() for a in bucket)

Thirty lines including whitespace. The core of every analytical engine's hash aggregator is this shape — Postgres's ExecAgg with AGG_HASHED strategy, DuckDB's PhysicalHashAggregate, Spark's HashAggregateExec all look like this when you strip away their bookkeeping.

Why the whole input is consumed before the first output row is emitted: the last row of the input might land in a brand-new group, or update the running SUM of an existing group. You cannot safely emit any group while input remains, because the emitted value might still change. This makes hash aggregation a blocking operator in the sense of chapter 44 — its open() has to drain the child before the first next() can return. The same is true of sort aggregation, with one small exception noted later.

Deriving the cost

Let B_R be the number of input pages and G the number of distinct groups. Let M be the number of memory pages available to the operator.

Case 1: the hashmap fits in memory (G · s_acc < M, where s_acc is accumulator bytes). One linear pass over the input. Every row: one hash, one dict lookup, one add per aggregate — amortised O(1). Total CPU is O(|R| · a) where a is the number of aggregates, and I/O is just the scan:

\boxed{\;\text{I/O}_{\text{hash agg, in-memory}} = B_R\;}

Why the I/O is B_R and not B_R + \text{anything}: the hashmap lives in memory, the output rows stream up through the iterator protocol to whatever parent is consuming them, and the per-row update is O(1). You never re-read a page. There is no cheaper algorithm possible; every aggregation over an unsorted input has to at least read the input once, so B_R is the theoretical floor.

This is a spectacular number. A query like SELECT country, COUNT(*) FROM orders GROUP BY country with 500 M rows and 200 countries runs at disk-scan speed — roughly B_R / \text{scan-bandwidth} wall-clock. Nothing beats it on a single machine.

Case 2: the hashmap does not fit. Which is the next section.

External hash aggregation

The moment the hashmap exceeds memory, naive Python would silently page it to swap and every add becomes a 10 ms random disk read — exactly the failure mode hash join warned about. The database fixes it with the same partitioning trick grace hash join uses: partition the input by h(key) mod k so that rows with the same group key land in the same partition, then aggregate each partition independently, in memory, one at a time.

import pickle, tempfile, os

def external_hash_agg(child, group_cols, aggs, k=16):
    """Partition by hash(group_key) % k, then aggregate each partition."""
    dirpath = tempfile.mkdtemp()
    files = [open(f"{dirpath}/part_{i}", "wb") for i in range(k)]
    try:
        child.open()
        for row in child:
            key = tuple(row[c] for c in group_cols)
            pickle.dump(row, files[hash(key) % k])
        child.close()
    finally:
        for f in files: f.close()
    for i in range(k):
        table: dict[tuple, list[Accumulator]] = {}
        with open(f"{dirpath}/part_{i}", "rb") as f:
            while True:
                try: row = pickle.load(f)
                except EOFError: break
                key = tuple(row[c] for c in group_cols)
                bucket = table.setdefault(key, [s.factory() for s in aggs])
                for acc, spec in zip(bucket, aggs):
                    acc.add(row[spec.source])
        for key, bucket in table.items():
            yield key + tuple(a.finalise() for a in bucket)
        os.remove(f"{dirpath}/part_{i}")

Two phases, two passes' worth of code. The partition pass writes every input row to one of k spill files based on the hash of its group key. The aggregation pass, for each partition in turn, loads it into an in-memory hashmap and runs the normal hash-aggregate algorithm. Because h is deterministic, all rows with the same group key land in the same partition, so no cross-partition state needs to be maintained — each partition's result is independent and can be emitted directly.

Deriving the cost

Count page reads and writes.

\text{I/O}_{\text{ext hash agg}} = \underbrace{B_R}_{\text{partition read}} + \underbrace{B_R}_{\text{partition write}} + \underbrace{B_R}_{\text{aggregation read}}
\boxed{\;\text{I/O}_{\text{ext hash agg}} = 3 \cdot B_R\;}

Why exactly three and not more: each page of input is touched three times — read once during partitioning, written once as part of a partition file, read once during per-partition aggregation. The output goes up through the iterator without being written to disk, so there is no fourth pass. This is the same 3 that shows up in grace hash join's 3(B_R + B_S) — it is the grace partitioning pattern applied to an aggregation where there is only one input.

Choosing k. You want each partition's hashmap (not the partition file itself — the hashmap of its distinct groups) to fit in memory. If G is the total number of groups, each partition holds roughly G/k groups. Pick k so that G/k · s_acc < M, i.e. k ≥ G · s_acc / M. In practice k is typically 16 to 256 on a healthy analytical query, and 1024+ when working with billions of groups.

Skewed keys are the pain point. If one group contains twenty percent of all rows, twenty percent of the input ends up in one partition, and that partition's hashmap still does not fit. You have to recurse: detect the overflow during the aggregation pass, re-partition the offending partition with a different hash function, and try again. Postgres does exactly this — see ExecHashAgg in nodeAgg.c. The escape hatch is that a single hot key (one value appearing a billion times) cannot be split at all by any hash function, which is fine for aggregation — one hot key means one accumulator in memory, so the big partition shrinks to a tiny output anyway.

Sort-based aggregation

Sorting rearranges the input so that all rows with the same group key are physically adjacent. Once that is true, aggregation becomes a one-cursor linear pass: open an accumulator on the first row, keep updating as long as the key is unchanged, emit-and-reset the moment the key changes.

def sort_agg(sorted_child, group_cols, aggs):
    """Assumes sorted_child is already sorted on group_cols."""
    current_key = None
    bucket: list[Accumulator] = []
    sorted_child.open()
    try:
        for row in sorted_child:
            key = tuple(row[c] for c in group_cols)
            if current_key is None:
                current_key = key
                bucket = [s.factory() for s in aggs]
            elif key != current_key:                   # group boundary
                yield current_key + tuple(a.finalise() for a in bucket)
                current_key = key
                bucket = [s.factory() for s in aggs]
            for acc, spec in zip(bucket, aggs):
                acc.add(row[spec.source])
        if current_key is not None:
            yield current_key + tuple(a.finalise() for a in bucket)
    finally:
        sorted_child.close()

Twenty lines. The elif key != current_key check is the whole algorithm: every time the key changes, the current group is complete and can be emitted. Memory used: exactly one accumulator bucket at a time — O(1), regardless of the number of groups. This is sort aggregation's single biggest structural advantage: it does not care whether you have a billion groups, because it never holds more than one group's state in memory.

The catch, of course, is that sort aggregation requires sorted input. If the child operator is already sorted on the group key (clustered B-tree index scan, upstream ORDER BY, upstream merge join with a matching key), the sort is free. Otherwise you have to stick an external merge sort under the aggregator.

Deriving the cost

Already-sorted input. One linear pass, one accumulator in memory at a time:

\boxed{\;\text{I/O}_{\text{sort agg, sorted input}} = B_R\;}

Matches in-memory hash aggregation — optimal.

Unsorted input. External merge sort from chapter 47 costs 2·B_R·(1 + ⌈log_{M−1}(B_R/M)⌉). For realistic B_R and M, that is one merge pass — total 4·B_R. Add the final linear pass and fuse it into the sort's last merge (production engines do this as a standard optimisation), so the aggregation pass costs zero extra I/O:

\boxed{\;\text{I/O}_{\text{sort agg, unsorted}} \approx 2 \cdot B_R \cdot (1 + \lceil \log_{M-1}(B_R/M) \rceil)\;}

Why the aggregation pass fuses into the sort's final merge: during the last merge pass of external sort, the k-way merge heap already emits rows in sorted order one at a time. Instead of writing them to a materialised sorted file and then re-reading, the aggregator sits directly on top of the merge heap's output stream, consuming rows as they emerge. The final merge pass' output never hits disk. This is a standard optimisation in every production external sort, and it is what drops the cost from 5·B_R (sort + separate aggregation pass) to 4·B_R.

So sort aggregation on unsorted input is 4·B_R in the realistic regime, versus external hash aggregation's 3·B_R. Hash wins by one pass. On sorted input, sort is B_R versus hash's B_R — they tie on I/O, but sort wins on CPU because the sort-aggregate inner loop is a simple key-equality check while hash aggregate pays for a hash computation per row.

Pre-aggregation — the combiner trick

There is a cheap optimisation that sits on top of either algorithm and often beats both. It goes by three names: partial aggregation (Postgres, DuckDB), the combiner (MapReduce, Spark), or pre-aggregation (SQL Server). They all do the same thing.

Run the input through a small in-memory hashmap first. When the hashmap fills, spill the partial aggregates to disk (not the raw rows). Downstream, aggregate the partial aggregates into the final result.

def pre_aggregate(child, group_cols, aggs, mem_limit_rows=10_000):
    """Emit partial aggregate rows from a small in-memory hashmap,
    flushing when full. Output is a stream of partial-group rows."""
    table: dict[tuple, list[Accumulator]] = {}
    child.open()
    try:
        for row in child:
            key = tuple(row[c] for c in group_cols)
            bucket = table.get(key)
            if bucket is None:
                if len(table) >= mem_limit_rows:        # flush & reset
                    for k, b in table.items():
                        yield k + tuple(a.finalise() for a in b)
                    table = {}
                bucket = [s.factory() for s in aggs]
                table[key] = bucket
            for acc, spec in zip(bucket, aggs):
                acc.add(row[spec.source])
        for k, b in table.items():
            yield k + tuple(a.finalise() for a in b)
    finally:
        child.close()

The parent operator — the final aggregator — sees a stream of partial groups instead of raw rows. Its job is to re-aggregate: pick up every (country, partial_count, partial_sum) and feed the partials into SUM accumulators, so (India, 10000) followed by (India, 8500) becomes (India, 18500).

Why this saves I/O: on 500 M rows with 200 distinct countries, the pre-aggregation reduces the output stream from 500 M rows to — in the best case — 200 rows per flush. If the hashmap holds 10,000 partial groups at once and there are 200 distinct countries, the hashmap never fills; one flush at the end, 200 output rows, and the downstream aggregation is trivial. The I/O reduction factor is |R| / G in the best case, which for |R| = 500 M and G = 200 is 2.5 · 10^6. The spill pass of external hash aggregation, which was going to write B_R pages of raw rows, now writes B_G pages of partial groups — millions of times smaller.

Why this is exactly the MapReduce "combiner": MapReduce's reduce phase shuffles all mapper output over the network to reducers that group by key. If each mapper pre-aggregates its local slice with a combiner before the shuffle, the network traffic shrinks by the same |R|/G factor. The algorithmic pattern — "partial aggregate before expensive redistribution" — is the same at every scale, from DuckDB's in-memory two-phase aggregator to Spark's shuffle-write optimiser.

Pre-aggregation requires the aggregate to be algebraic or distributive. For SUM, COUNT, MIN, MAX, combining partials is trivial: the partial SUM of two chunks is the sum of the partial sums. For AVG, the partial state is (sum, count) and you combine by adding both. For COUNT(DISTINCT) and MEDIAN, partials cannot be combined correctly — see the sketches section below.

Pre-aggregation is the single most impactful optimisation on top of hash aggregation in practice. Every serious analytical engine does it automatically. The DuckDB hash-aggregate implementation goes further and does two-phase aggregation even within a single thread, because the L1-cache-sized first hashmap is 100× faster per row than the L3-sized final one.

When hash beats sort and vice versa

Four regimes, and knowing which you are in tells the planner what to do.

Small G, fits in memory. GROUP BY country with 200 distinct values. Hash aggregation is one linear pass, cost B_R. Sort aggregation on unsorted input costs 4·B_R because of the sort. Hash wins four-to-one — the common case for dashboard analytics, and why hash is the default.

Large G, both algorithms spill. GROUP BY customer_id with 50 M distinct values and 100 MB of memory. External hash costs 3·B_R with recursive partitioning on skew; sort aggregation costs 4·B_R with no recursion. Hash still wins by one pass, but if the keys are skewed enough to force a second partitioning round, sort can overtake. In production optimisers this is the fuzzy regime where statistics matter — Postgres's cost comparison between hash and sort aggregate depends on the estimated G.

Input already sorted on the group key. Clustered B-tree on the group column, or the child is a sort-merge join on that key, or there is an upstream ORDER BY group_key. Sort aggregation is one linear pass, cost B_R. Hash aggregation is also one pass but its hashmap allocates memory proportional to G, whereas sort aggregation holds one bucket. Sort wins on CPU and memory — the planner picks it when it can.

Downstream ORDER BY on the group key. GROUP BY country ORDER BY country. Sort aggregation emits output in group-key order, making the ORDER BY a no-op; hash aggregation forces an extra Sort operator on top. Same interesting-orders argument as sort-merge join.

Very high cardinality, approximate answer acceptable. SELECT COUNT(DISTINCT ip_address) FROM events. The distinct-count aggregate is holistic; the exact algorithm sorts and counts key boundaries, costing 4·B_R. A HyperLogLog sketch answers it to within 1–2% error in O(1) memory and O(B_R) time. ClickHouse, BigQuery, Presto, DuckDB, Redshift all ship HLL as a first-class aggregate.

GROUP BY country vs GROUP BY customer_id on the same 10 GB table

Setup. Orders table: 500 M rows × 200 bytes = 100 GB. Page size 8 KB, so B_R ≈ 1.25 × 10^7 pages — for clean arithmetic, call it B_R = 10^7. Buffer pool available to the aggregator: 100 MB = M ≈ 1.28 × 10^4 pages. Call it M = 10^4.

Query 1 — GROUP BY country. G = 200 distinct countries. Each accumulator bucket is roughly 64 bytes (two 64-bit aggregates plus dict overhead), so the whole hashmap is 200 × 64 = 12.8 KB. Fits trivially in memory.

\text{I/O}_{\text{hash agg}} = B_R = 10^7 \text{ pages}

Why 10^7: one linear scan, hashmap lives entirely in the L1/L2 cache, zero spill, zero re-reads. This is the best any aggregator can do and equals the cost of simply scanning the table.

At 1 GB/s SSD, that is about 100 seconds of wall clock — pure scan time, no CPU-bound stall anywhere. Pre-aggregation does not help (the hashmap already fits), so the whole plan is one operator.

Query 2 — GROUP BY customer_id. G = 50 × 10^6 distinct customers. Hashmap size: 50 × 10^6 × 64 \text{ bytes} = 3.2 \text{ GB}32 times memory. You cannot hold it.

External hash aggregation. Partition count: you need each partition's hashmap to fit in 100 MB = 1.5 × 10^6 groups per partition, so k ≥ 50 × 10^6 / 1.5 × 10^6 ≈ 34. Round up to k = 64.

\text{I/O}_{\text{ext hash agg}} = 3 \cdot B_R = 3 \times 10^7 \text{ pages} = 240 \text{ GB of disk traffic}

At 1 GB/s, that is about 240 seconds — four minutes. Two and a half times slower than the country query despite being the same table, purely because of the distinct-key count.

Or sort aggregation. External sort costs 2·B_R·(1 + ⌈log_{M−1}(B_R/M)⌉). With B_R = 10^7, M = 10^4: B_R/M = 10^3, log_{M−1}(B_R/M) ≈ log_{10^4}(10^3) < 1, so ⌈...⌉ = 1. Total passes: 2. I/O: 4·B_R = 4 × 10^7 \text{ pages}, and the aggregation fuses into the final merge.

\text{I/O}_{\text{sort agg}} = 4 \cdot B_R = 4 \times 10^7 \text{ pages}

Hash wins by 33% on I/O — 3 × 10^7 vs 4 × 10^7. In this regime the planner picks hash unless the query has an ORDER BY customer_id downstream, in which case sort's free output order tips the balance.

With pre-aggregation. Run a small pre-aggregator on the input with a 10 K-entry hashmap. Each flush emits at most 10 K partial groups. Over 500 M rows, you flush roughly 5000 times — output volume roughly 5000 × 10 \text{K} = 5 \times 10^7 partial-group rows, which is ten percent of input. Feeding this reduced stream into the external hash aggregator drops the partition-pass I/O by roughly 10×, giving total I/O closer to 1.3·B_R = 1.3 × 10^7. DuckDB's actual numbers on this workload typically show a 3–5× wall-clock improvement from pre-aggregation alone.

Rows streaming into a hashmap keyed by country Left side shows a vertical stack of input rows labelled with country values: India, USA, India, Brazil, USA, India, Germany, Brazil. Arrows flow right into a central hashmap box with bucket rows: India → (count=3, sum=450); USA → (count=2, sum=220); Brazil → (count=2, sum=180); Germany → (count=1, sum=90). Right side shows the output stream: one row per country with the finalised counts and sums. Hash aggregation: rows stream in, accumulators update, one output row per group input stream India,100 USA,120 India,150 Brazil,90 USA,100 India,200 Germany,90 Brazil,90 hashmap key = country; value = (n, sum) India (3, 450) USA (2, 220) Brazil (2, 180) Germany (1, 90) 4 distinct keys fits trivially in memory output India, 3, 450 USA, 2, 220 Brazil, 2, 180 Germany, 1, 90 4 rows, one per group Every row does one hash, one dict lookup, one add() per aggregate — amortised O(1). Output size is |groups|, not |rows| — 500 M rows collapse into 4 lines.
Hash aggregation on a tiny eight-row sample of the orders table. Every input row updates exactly one hashmap entry; at end of input, the map is drained to produce one output row per distinct country. Memory is `O(G)`, I/O is one sequential pass — optimal when `G` fits in memory.

Common confusions

Going deeper

Vectorised aggregation

Classical hash aggregation does one add call per row — a virtual dispatch plus a hashmap lookup for a tiny amount of real work. DuckDB, ClickHouse, and Velox use vectorised aggregation: process a batch of 1024–2048 rows at a time. Vectorise the hash (SIMD, one instruction per 16 keys), the bucket lookup (AVX-512 gather), and the accumulator update (add(vector) instead of 1024 add(value) calls). The inner loop becomes branch-free columnar arithmetic, running at 5–10 GB/s per core on simple SUM/COUNT. The DuckDB aggregation blog walks through the design, including variable-width keys and skew handling.

Approximate aggregation — HyperLogLog and friends

COUNT(DISTINCT x) is the canonical holistic aggregate, and the exact algorithm is too expensive on billion-row inputs. HyperLogLog (Flajolet et al. 2007) maintains a 4–16 KB probabilistic state that estimates distinct count to within 1–2% error, in O(B_R) time and O(1) memory. The insight: hash each value uniformly, track the leading-zero position across a grid of buckets, and the expected max leading-zero count is log_2(distinct count). ClickHouse's uniqCombined, BigQuery's APPROX_COUNT_DISTINCT, DuckDB's approx_count_distinct are all HLL variants. Twin algorithms — Count-Min Sketch for heavy hitters, t-digest for quantiles — turn other holistic aggregates into algebraic ones at bounded error. A future chapter builds HLL from first principles.

ROLLUP, CUBE, GROUPING SETS

GROUP BY ROLLUP(country, city) computes the country-city grouping, the country rollup, and the grand total in one query. The naive implementation runs the aggregator three times; a smart one computes the finest (country, city) aggregation once and derives the coarser levels by re-aggregating the fine results — a distributive-aggregate property, which is why the bucket classification at the top of this chapter is load-bearing.

Where this leads next

Aggregation is the operator you use most often, whether you know it or not. Every dashboard, every report, every "top 10" query, every EXPLAIN on an analytical workload has a HashAggregate or GroupAggregate in the plan. The three cost formulas from this chapter — B_R in memory, 3·B_R for external hash, 4·B_R for sort — are enough to read any query plan with confidence.

References

  1. Graefe, Query Evaluation Techniques for Large Databases, ACM Computing Surveys 25(2), 1993 — the reference survey for physical aggregation algorithms, covering both hash and sort variants, the distributive/algebraic/holistic classification, and the partitioning trick for external hash aggregation. The numbers in this chapter are calibrated against Graefe's formulas.
  2. Larson, Grouping and Duplicate Elimination: Benefits of Early Aggregation, Microsoft Research TR 97-36, 1997 — the paper that formalised pre-aggregation (the "combiner" trick) and showed its |R|/G reduction factor empirically. Still the clearest exposition of why partial aggregation is the single biggest optimisation on top of hash aggregation.
  3. Raasveldt, DuckDB Hash Aggregation Blog, 2022 — the DuckDB team's walkthrough of their vectorised hash aggregator, including two-phase aggregation, variable-width group keys, and skew handling. The most readable modern source on how aggregation actually runs in a production vectorised engine.
  4. Flajolet, Fusy, Gandouet, Meunier, HyperLogLog: the Analysis of a Near-Optimal Cardinality Estimation Algorithm, AofA 2007 — the original HyperLogLog paper. Proves the error bounds that every production APPROX_COUNT_DISTINCT relies on. A surprisingly readable 20 pages.
  5. PostgreSQL source, nodeAgg.c — the production implementation of both hash and sort aggregation in Postgres, including the ExecHashAgg spill path with recursive partitioning. About 4000 lines, and one of the cleanest open-source aggregation implementations to read.
  6. Pavlo, CMU 15-445 Aggregation Lecture Notes, Carnegie Mellon Database Group — the standard teaching reference for the hash-vs-sort comparison, with the cost derivations done in the same notation as this chapter.