In short
A GROUP BY is a shuffle. Every row sharing a group-key value has to meet in one place so the aggregate can be computed, and the two canonical ways to arrange that meeting are hashing and sorting. Hash aggregation builds a hashmap keyed by the group key, updating a running accumulator (SUM, COUNT, MIN, …) on each row insert. In memory it is a single linear pass over the input — O(|R|) CPU and B_R page reads. When the hashmap exceeds memory you switch to external hash aggregation, which partitions the input by h(key) mod k exactly like grace hash join, aggregates each partition in isolation, and concatenates. Cost: 3·B_R — one pass to read, one to write partitions, one to re-read. Sort aggregation sorts the input on the group key using external merge sort, then does a single linear pass that emits one output row every time the key changes. Cost: 2·B_R·(1 + ⌈log_{M−1}(B_R/M)⌉) for the sort, with the final aggregation pass fused into the last merge — roughly 4·B_R in realistic regimes. On top of both, pre-aggregation (the combiner trick MapReduce and Spark use) feeds the input through a tiny in-memory map first, collapsing partial groups before they ever hit the spill path. MIN, MAX, COUNT, SUM are distributive — their partials combine cleanly, so pre-aggregation and distributed aggregation work for free. AVG is algebraic: just SUM + COUNT + a final divide. MEDIAN and COUNT(DISTINCT) are holistic — their partials cannot be combined, so they need sorting, sketches, or a full shuffle. SELECT DISTINCT is GROUP BY without an aggregate and has the same cost profile.
The query that forces you to choose
You are running this against a global orders table:
```sql
SELECT country, COUNT(*), SUM(amount)
FROM orders
GROUP BY country;
```
Five hundred million rows. Roughly two hundred distinct countries. Your buffer pool is 100 MB.
You already know from iterator-model execution that the physical plan is a tree, and somewhere in it sits a GroupAggregateExec node that consumes rows from a scan and emits one row per country. The question is how it does that.
Two hundred accumulators fit in a few kilobytes. A hashmap keyed by country name, with (count, sum) pairs for the values, does not break a sweat. One pass over the input — B_R page reads — and you are done. Sort aggregation would force you to sort five hundred million rows first, which is roughly 4·B_R I/O even at best. Hash aggregation is four times cheaper on this query, and every analytical engine on earth picks it.
Now change the query:
```sql
SELECT customer_id, COUNT(*), SUM(amount)
FROM orders
GROUP BY customer_id;
```
Same table. Same buffer pool. But now there are fifty million distinct customer_ids, and a hashmap with fifty million entries does not fit in 100 MB. Hash aggregation either spills to disk (three-pass grace-style partitioning) or pre-aggregates so aggressively that the hashmap does fit. Sort aggregation, meanwhile, is now competitive: the sort costs 4·B_R whether there are two hundred groups or fifty million, whereas hash's cost shot up.
Which of the two you pick depends on the number of distinct keys, the memory budget, whether the input is already sorted, and what the operator above you wants. This chapter builds both algorithms, derives their costs, and lays out the four regimes where one or the other wins.
The aggregation operator contract
A GroupAggregateExec is a Volcano iterator just like HashJoinExec or SortMergeJoinExec. Its contract is the same open(), next(), close() from chapter 44, with two pieces of configuration at construction time: a list of column indices that form the group key, and a list of (aggregate_function, source_column) pairs that say what to compute per group.
```python
# query/exec/agg.py
from dataclasses import dataclass
from typing import Any, Callable

Row = tuple

@dataclass
class AggSpec:
    factory: Callable[[], "Accumulator"]  # fresh accumulator per group
    source: int                           # column index to feed in
    name: str                             # output column name

@dataclass
class GroupAggregateExec:
    child: Any                       # input operator
    group_cols: tuple[int, ...]      # indices forming the group key
    aggs: list[AggSpec]              # SUM/COUNT/AVG/…
    mode: str = "hash"               # "hash" | "sort"
```
An Accumulator is the small interface that SUM, COUNT, MIN and friends all satisfy. Two methods and nothing else: add(value) updates the running state, finalise() returns the output value at the end.
```python
class Accumulator:
    def add(self, value: Any) -> None: ...
    def finalise(self) -> Any: ...

class Count(Accumulator):
    def __init__(self):
        self.n = 0
    def add(self, v):
        self.n += 0 if v is None else 1
    def finalise(self):
        return self.n

class Sum(Accumulator):
    def __init__(self):
        self.s = 0
    def add(self, v):
        if v is not None:
            self.s += v
    def finalise(self):
        return self.s

class Min(Accumulator):
    def __init__(self):
        self.m = None
    def add(self, v):
        if v is not None and (self.m is None or v < self.m):
            self.m = v
    def finalise(self):
        return self.m

class Avg(Accumulator):
    def __init__(self):
        self.s, self.n = 0, 0
    def add(self, v):
        if v is not None:
            self.s += v
            self.n += 1
    def finalise(self):
        return None if self.n == 0 else self.s / self.n
```
Why separate add and finalise: most aggregates have a running state that is not the output. AVG carries (sum, count) internally and divides once at the end — calling the division on every add would be wasted work, and more importantly, it would make merging partial accumulators across partitions impossible. The split between add (row-level update) and finalise (group-level readout) is the same split as Spark's createCombiner / mergeValue / mergeCombiners trio. It is the interface that makes distributed and pre-aggregation work.
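The split is what makes merging possible. The chapter's interface has only add and finalise; the sketch below adds a hypothetical merge method — not part of the interface above — to show how two partial AVG states, built independently on two partitions, combine by adding their running components:

```python
# Hypothetical extension of the Accumulator idea with a merge() method,
# mirroring Spark's mergeCombiners; AvgAcc is an illustrative name.
class AvgAcc:
    def __init__(self):
        self.s, self.n = 0, 0            # running (sum, count) — not the output

    def add(self, v):
        if v is not None:
            self.s += v
            self.n += 1

    def merge(self, other: "AvgAcc"):
        # Legal because we carry the running state, not the final quotient:
        # partial (sum, count) pairs combine by component-wise addition.
        self.s += other.s
        self.n += other.n

    def finalise(self):
        return None if self.n == 0 else self.s / self.n

# Two partitions of the same group, aggregated independently, then merged:
a, b = AvgAcc(), AvgAcc()
for v in [10, 20, None]:
    a.add(v)
for v in [30, 40]:
    b.add(v)
a.merge(b)
result = a.finalise()   # (10 + 20 + 30 + 40) / 4 = 25.0
```

Had add divided eagerly, the two partial averages (15.0 and 35.0) could not be combined without also knowing the counts — which is exactly the state the split preserves.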
Why every accumulator has to handle None: SQL's three-valued logic says NULL is neither present nor absent — COUNT(x) skips nulls, COUNT(*) counts them, SUM skips them, MIN skips them. The accumulator is the right place to encode those rules once, so the operator above does not have to branch on column semantics. A common bug in amateur engines is SUM returning NULL + 3 = NULL and poisoning the whole group; filtering nulls inside add avoids it.
The aggregates fall into three buckets based on how their partial states combine — this distinction becomes load-bearing two sections from now, when partitioning enters the picture:
- Distributive — the final aggregate over a partitioned input equals the same aggregate applied to the per-partition aggregates. SUM, COUNT, MIN, MAX are distributive: SUM(A ∪ B) = SUM(A) + SUM(B).
- Algebraic — computable from a fixed-size vector of distributive aggregates. AVG = SUM / COUNT. STDDEV needs (n, sum, sum_of_squares). The state stays bounded.
- Holistic — no finite summary works. MEDIAN, COUNT(DISTINCT), PERCENTILE need the whole input (or a sketch that approximates it).
Every optimisation in the rest of this chapter applies cleanly to distributive and algebraic aggregates. Holistic aggregates need either sorting or an approximation algorithm — they are the reason HyperLogLog exists.
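The distinction is easy to verify numerically. A small demonstration on two toy partitions: SUM composes across the split, MEDIAN does not:

```python
import statistics

A, B = [1, 5, 9], [2, 3]   # two toy partitions of the same input

# Distributive: SUM over the union equals SUM of the per-partition SUMs.
assert sum(A + B) == sum([sum(A), sum(B)])

# Holistic: the median of the union is NOT recoverable from partition medians.
true_median = statistics.median(A + B)             # median of [1,2,3,5,9] -> 3
median_of_medians = statistics.median(
    [statistics.median(A), statistics.median(B)])  # median of [5, 2.5] -> 3.75
```

No function of the two partition medians alone can recover 3 here — the combiner would need the full partitions, which is the definition of holistic.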
Hash-based aggregation
The algorithm is three lines in English. Scan the input. For each row, compute the group key and look it up in a dict; if absent, insert a fresh tuple of accumulators. Call add on each accumulator with the right source column. At end of input, finalise and emit one row per entry.
```python
# query/exec/hash_agg.py
from typing import Iterator

class HashAggIterator:
    def __init__(self, exec: GroupAggregateExec):
        self.exec = exec
        self.table: dict[tuple, list[Accumulator]] = {}

    def _group_key(self, row: Row) -> tuple:
        return tuple(row[i] for i in self.exec.group_cols)

    def _ensure_bucket(self, key: tuple) -> list[Accumulator]:
        bucket = self.table.get(key)
        if bucket is None:
            bucket = [spec.factory() for spec in self.exec.aggs]
            self.table[key] = bucket
        return bucket

    def __iter__(self) -> Iterator[Row]:
        self.exec.child.open()
        try:
            for row in self.exec.child:
                bucket = self._ensure_bucket(self._group_key(row))
                for acc, spec in zip(bucket, self.exec.aggs):
                    acc.add(row[spec.source])
        finally:
            self.exec.child.close()
        for key, bucket in self.table.items():
            yield key + tuple(a.finalise() for a in bucket)
```
Thirty lines including whitespace. The core of every analytical engine's hash aggregator is this shape — Postgres's ExecAgg with AGG_HASHED strategy, DuckDB's PhysicalHashAggregate, Spark's HashAggregateExec all look like this when you strip away their bookkeeping.
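The shape is easy to see end to end on toy data. This is a self-contained miniature of the same loop — the rows and the inlined [count, sum] buckets are illustrative stand-ins, not the chapter's operator classes:

```python
# Miniature hash aggregation: COUNT(*) and SUM(amount) grouped by country.
rows = [("IN", 100), ("US", 250), ("IN", 50), ("FR", 75), ("US", 25)]

table: dict[tuple, list] = {}                 # group key -> [count, sum]
for country, amount in rows:
    bucket = table.setdefault((country,), [0, 0])
    bucket[0] += 1                            # COUNT(*)
    bucket[1] += amount                       # SUM(amount)

# Emission happens only after the whole input has been consumed.
output = sorted(key + tuple(bucket) for key, bucket in table.items())
# output == [('FR', 1, 75), ('IN', 2, 150), ('US', 2, 275)]
```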
Why the whole input is consumed before the first output row is emitted: the last row of the input might land in a brand-new group, or update the running SUM of an existing group. You cannot safely emit any group while input remains, because the emitted value might still change. This makes hash aggregation a blocking operator in the sense of chapter 44 — its open() has to drain the child before the first next() can return. The same is true of sort aggregation, with one small exception noted later.
Deriving the cost
Let B_R be the number of input pages and G the number of distinct groups. Let M be the number of memory pages available to the operator.
Case 1: the hashmap fits in memory (G · s_acc < M, where s_acc is the accumulator size in bytes and M the memory budget in bytes). One linear pass over the input. Every row: one hash, one dict lookup, one add per aggregate — amortised O(1). Total CPU is O(|R| · a) where a is the number of aggregates, and I/O is just the scan: cost B_R page reads.
Why the I/O is B_R and nothing more: the hashmap lives in memory, the output rows stream up through the iterator protocol to whatever parent is consuming them, and the per-row update is O(1). You never re-read a page. There is no cheaper algorithm possible; every aggregation over an unsorted input has to at least read the input once, so B_R is the theoretical floor.
This is a spectacular number. A query like SELECT country, COUNT(*) FROM orders GROUP BY country with 500 M rows and 200 countries runs at disk-scan speed — roughly B_R divided by scan bandwidth in wall-clock terms. Nothing beats it on a single machine.
Case 2: the hashmap does not fit. Which is the next section.
External hash aggregation
The moment the hashmap exceeds memory, naive Python would silently page it to swap and every add becomes a 10 ms random disk read — exactly the failure mode hash join warned about. The database fixes it with the same partitioning trick grace hash join uses: partition the input by h(key) mod k so that rows with the same group key land in the same partition, then aggregate each partition independently, in memory, one at a time.
```python
import pickle, tempfile, os

def external_hash_agg(child, group_cols, aggs, k=16):
    """Partition by hash(group_key) % k, then aggregate each partition."""
    dirpath = tempfile.mkdtemp()
    files = [open(f"{dirpath}/part_{i}", "wb") for i in range(k)]
    try:
        child.open()
        try:
            for row in child:
                key = tuple(row[c] for c in group_cols)
                pickle.dump(row, files[hash(key) % k])
        finally:
            child.close()
    finally:
        for f in files:
            f.close()
    for i in range(k):
        table: dict[tuple, list[Accumulator]] = {}
        with open(f"{dirpath}/part_{i}", "rb") as f:
            while True:
                try:
                    row = pickle.load(f)
                except EOFError:
                    break
                key = tuple(row[c] for c in group_cols)
                bucket = table.setdefault(key, [s.factory() for s in aggs])
                for acc, spec in zip(bucket, aggs):
                    acc.add(row[spec.source])
        for key, bucket in table.items():
            yield key + tuple(a.finalise() for a in bucket)
        os.remove(f"{dirpath}/part_{i}")
    os.rmdir(dirpath)   # all partition files are gone; drop the spill directory
```
Two phases, two passes' worth of code. The partition pass writes every input row to one of k spill files based on the hash of its group key. The aggregation pass, for each partition in turn, loads it into an in-memory hashmap and runs the normal hash-aggregate algorithm. Because h is deterministic, all rows with the same group key land in the same partition, so no cross-partition state needs to be maintained — each partition's result is independent and can be emitted directly.
Deriving the cost
Count page reads and writes.
- Partition pass. Read the input once (B_R reads), write the partitioned copy (B_R writes).
- Aggregation pass. Read each partition once (B_R total reads). No writes — results stream up through the iterator.

Total: 3·B_R page I/Os.
Why exactly three and not more: each page of input is touched three times — read once during partitioning, written once as part of a partition file, read once during per-partition aggregation. The output goes up through the iterator without being written to disk, so there is no fourth pass. This is the same 3 that shows up in grace hash join's 3(B_R + B_S) — it is the grace partitioning pattern applied to an aggregation where there is only one input.
Choosing k. You want each partition's hashmap (not the partition file itself — the hashmap of its distinct groups) to fit in memory. If G is the total number of groups, each partition holds roughly G/k groups. Pick k so that G/k · s_acc < M (measuring the memory budget M in bytes here), i.e. k ≥ G · s_acc / M. In practice k is typically 16 to 256 on a healthy analytical query, and 1024+ when working with billions of groups.
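Under those assumptions, the sizing rule is one line of arithmetic. A sketch — choose_partition_count is a made-up helper, and real engines fold headroom and skew margins into this:

```python
import math

def choose_partition_count(G, acc_bytes, mem_bytes):
    """Sketch of the k >= G * s_acc / M rule (names invented); rounds up
    to a power of two, as hash fan-out choices usually are."""
    k = math.ceil(G * acc_bytes / mem_bytes)
    return 1 << max(0, (k - 1).bit_length())

# The chapter's customer_id numbers: 50M groups, 64-byte buckets, 100 MB.
k = choose_partition_count(50_000_000, 64, 100 * 1024 * 1024)   # 32
```

The worked example later in the chapter rounds further up, to k = 64, buying headroom against skew and against the estimate of G being wrong.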
Skewed keys are the pain point. If one group contains twenty percent of all rows, twenty percent of the input ends up in one partition, and that partition's hashmap still does not fit. You have to recurse: detect the overflow during the aggregation pass, re-partition the offending partition with a different hash function, and try again. Postgres does exactly this — see ExecHashAgg in nodeAgg.c. The escape hatch is that a single hot key (one value appearing a billion times) cannot be split at all by any hash function, which is fine for aggregation — one hot key means one accumulator in memory, so the big partition shrinks to a tiny output anyway.
Sort-based aggregation
Sorting rearranges the input so that all rows with the same group key are physically adjacent. Once that is true, aggregation becomes a one-cursor linear pass: open an accumulator on the first row, keep updating as long as the key is unchanged, emit-and-reset the moment the key changes.
```python
def sort_agg(sorted_child, group_cols, aggs):
    """Assumes sorted_child is already sorted on group_cols."""
    current_key = None
    bucket: list[Accumulator] = []
    sorted_child.open()
    try:
        for row in sorted_child:
            key = tuple(row[c] for c in group_cols)
            if current_key is None:
                current_key = key
                bucket = [s.factory() for s in aggs]
            elif key != current_key:          # group boundary
                yield current_key + tuple(a.finalise() for a in bucket)
                current_key = key
                bucket = [s.factory() for s in aggs]
            for acc, spec in zip(bucket, aggs):
                acc.add(row[spec.source])
        if current_key is not None:
            yield current_key + tuple(a.finalise() for a in bucket)
    finally:
        sorted_child.close()
```
Twenty lines. The elif key != current_key check is the whole algorithm: every time the key changes, the current group is complete and can be emitted. Memory used: exactly one accumulator bucket at a time — O(1), regardless of the number of groups. This is sort aggregation's single biggest structural advantage: it does not care whether you have a billion groups, because it never holds more than one group's state in memory.
The catch, of course, is that sort aggregation requires sorted input. If the child operator is already sorted on the group key (clustered B-tree index scan, upstream ORDER BY, upstream merge join with a matching key), the sort is free. Otherwise you have to stick an external merge sort under the aggregator.
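On already-sorted input the one-cursor pass is all there is. A self-contained miniature of the same loop — toy rows and an inlined SUM, not the chapter's operators:

```python
# One-cursor pass over input pre-sorted on the group key (country).
sorted_rows = [("FR", 75), ("IN", 100), ("IN", 50), ("US", 250), ("US", 25)]

def sorted_group_sums(rows):
    current_key, total, out = None, 0, []
    for key, amount in rows:
        if key != current_key:                 # group boundary
            if current_key is not None:
                out.append((current_key, total))  # emit-and-reset
            current_key, total = key, 0
        total += amount
    if current_key is not None:                # flush the final group
        out.append((current_key, total))
    return out

result = sorted_group_sums(sorted_rows)
# result == [('FR', 75), ('IN', 150), ('US', 275)]
```

At no point does more than one group's running state exist — the O(1)-memory property the section describes.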
Deriving the cost
Already-sorted input. One linear pass, one accumulator bucket in memory at a time: cost B_R page reads. Matches in-memory hash aggregation — optimal.
Unsorted input. External merge sort from chapter 47 costs 2·B_R·(1 + ⌈log_{M−1}(B_R/M)⌉). For realistic B_R and M, that is one merge pass — total 4·B_R. Fuse the final aggregation pass into the sort's last merge (production engines do this as a standard optimisation), and the aggregation costs zero extra I/O: total cost stays 4·B_R.
Why the aggregation pass fuses into the sort's final merge: during the last merge pass of external sort, the k-way merge heap already emits rows in sorted order one at a time. Instead of writing them to a materialised sorted file and then re-reading, the aggregator sits directly on top of the merge heap's output stream, consuming rows as they emerge. The final merge pass' output never hits disk. This is a standard optimisation in every production external sort, and it is what drops the cost from 5·B_R (sort + separate aggregation pass) to 4·B_R.
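A sketch of the fusion, using heapq.merge as a stand-in for the k-way merge over sorted spill runs (runs, keys, and the inlined SUM are invented for illustration):

```python
import heapq

# Each run is a sorted spill file from the sort's run-formation phase.
run1 = [("a", 1), ("c", 3)]
run2 = [("a", 2), ("b", 5)]

# heapq.merge lazily yields rows in global key order — the merged stream
# is consumed directly by the aggregation cursor and never lands on disk.
merged = heapq.merge(run1, run2)

sums = []
current, total = None, 0
for key, v in merged:                 # sort-agg cursor sits on the heap output
    if key != current:
        if current is not None:
            sums.append((current, total))
        current, total = key, 0
    total += v
if current is not None:
    sums.append((current, total))
# sums == [('a', 3), ('b', 5), ('c', 3)]
```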
So sort aggregation on unsorted input is 4·B_R in the realistic regime, versus external hash aggregation's 3·B_R. Hash wins by one pass. On sorted input, sort is B_R versus hash's B_R — they tie on I/O, but sort wins on CPU because the sort-aggregate inner loop is a simple key-equality check while hash aggregate pays for a hash computation per row.
Pre-aggregation — the combiner trick
There is a cheap optimisation that sits on top of either algorithm and often beats both. It goes by three names: partial aggregation (Postgres, DuckDB), the combiner (MapReduce, Spark), or pre-aggregation (SQL Server). They all do the same thing.
Run the input through a small in-memory hashmap first. When the hashmap fills, spill the partial aggregates to disk (not the raw rows). Downstream, aggregate the partial aggregates into the final result.
```python
def pre_aggregate(child, group_cols, aggs, mem_limit_rows=10_000):
    """Emit partial aggregate rows from a small in-memory hashmap,
    flushing when full. Output is a stream of partial-group rows."""
    table: dict[tuple, list[Accumulator]] = {}
    child.open()
    try:
        for row in child:
            key = tuple(row[c] for c in group_cols)
            bucket = table.get(key)
            if bucket is None:
                if len(table) >= mem_limit_rows:   # flush & reset
                    for k, b in table.items():
                        yield k + tuple(a.finalise() for a in b)
                    table = {}
                bucket = [s.factory() for s in aggs]
                table[key] = bucket
            for acc, spec in zip(bucket, aggs):
                acc.add(row[spec.source])
        for k, b in table.items():
            yield k + tuple(a.finalise() for a in b)
    finally:
        child.close()
```
The parent operator — the final aggregator — sees a stream of partial groups instead of raw rows. Its job is to re-aggregate: pick up every (country, partial_count, partial_sum) and feed the partials into SUM accumulators, so (India, 10000) followed by (India, 8500) becomes (India, 18500).
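What re-aggregation looks like on partial rows, with illustrative figures: partial COUNT and partial SUM for the same key each combine by summation.

```python
# Toy partial-group rows as the final aggregator would receive them:
# (country, partial_count, partial_sum). Values are invented.
partials = [
    ("India", 10_000, 400_000.0),
    ("India",  8_500, 310_000.0),
    ("France", 2_000,  90_000.0),
]

final: dict[str, list] = {}
for country, p_count, p_sum in partials:
    bucket = final.setdefault(country, [0, 0.0])
    bucket[0] += p_count     # COUNT(*) partials combine by SUM
    bucket[1] += p_sum       # SUM(amount) partials combine by SUM

india = tuple(final["India"])   # (18500, 710000.0)
```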
Why this saves I/O: on 500 M rows with 200 distinct countries, the pre-aggregation reduces the output stream from 500 M rows to — in the best case — 200 rows per flush. If the hashmap holds 10,000 partial groups at once and there are 200 distinct countries, the hashmap never fills; one flush at the end, 200 output rows, and the downstream aggregation is trivial. The I/O reduction factor is |R| / G in the best case, which for |R| = 500 M and G = 200 is 2.5 · 10^6. The spill pass of external hash aggregation, which was going to write B_R pages of raw rows, now writes B_G pages of partial groups — millions of times smaller.
Why this is exactly the MapReduce "combiner": MapReduce's reduce phase shuffles all mapper output over the network to reducers that group by key. If each mapper pre-aggregates its local slice with a combiner before the shuffle, the network traffic shrinks by the same |R|/G factor. The algorithmic pattern — "partial aggregate before expensive redistribution" — is the same at every scale, from DuckDB's in-memory two-phase aggregator to Spark's shuffle-write optimiser.
Pre-aggregation requires the aggregate to be algebraic or distributive. For SUM, COUNT, MIN, MAX, combining partials is trivial: the partial SUM of two chunks is the sum of the partial sums. For AVG, the partial state is (sum, count) and you combine by adding both. For COUNT(DISTINCT) and MEDIAN, partials cannot be combined correctly — see the sketches section below.
Pre-aggregation is the single most impactful optimisation on top of hash aggregation in practice. Every serious analytical engine does it automatically. The DuckDB hash-aggregate implementation goes further and does two-phase aggregation even within a single thread, because the L1-cache-sized first hashmap is 100× faster per row than the L3-sized final one.
When hash beats sort and vice versa
Four regimes, and knowing which you are in tells the planner what to do.
Small G, fits in memory. GROUP BY country with 200 distinct values. Hash aggregation is one linear pass, cost B_R. Sort aggregation on unsorted input costs 4·B_R because of the sort. Hash wins four-to-one — the common case for dashboard analytics, and why hash is the default.
Large G, both algorithms spill. GROUP BY customer_id with 50 M distinct values and 100 MB of memory. External hash costs 3·B_R with recursive partitioning on skew; sort aggregation costs 4·B_R with no recursion. Hash still wins by one pass, but if the keys are skewed enough to force a second partitioning round, sort can overtake. In production optimisers this is the fuzzy regime where statistics matter — Postgres's cost comparison between hash and sort aggregate depends on the estimated G.
Input already sorted on the group key. Clustered B-tree on the group column, or the child is a sort-merge join on that key, or there is an upstream ORDER BY group_key. Sort aggregation is one linear pass, cost B_R. Hash aggregation is also one pass but its hashmap allocates memory proportional to G, whereas sort aggregation holds one bucket. Sort wins on CPU and memory — the planner picks it when it can.
Downstream ORDER BY on the group key. GROUP BY country ORDER BY country. Sort aggregation emits output in group-key order, making the ORDER BY a no-op; hash aggregation forces an extra Sort operator on top. Same interesting-orders argument as sort-merge join.
Very high cardinality, approximate answer acceptable. SELECT COUNT(DISTINCT ip_address) FROM events. The distinct-count aggregate is holistic; the exact algorithm sorts and counts key boundaries, costing 4·B_R. A HyperLogLog sketch answers it to within 1–2% error in O(1) memory and O(B_R) time. ClickHouse, BigQuery, Presto, DuckDB, Redshift all ship HLL as a first-class aggregate.
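The first four regimes collapse into a toy cost chooser. The function below is a sketch in page I/Os using this chapter's formulas and constants — the function name and defaults are invented, and a real optimiser adds CPU terms and statistics-driven group-count estimates:

```python
import math

def agg_io_cost(B_R, M, G, acc_bytes=64, page_bytes=8192,
                input_sorted=False, mode="hash"):
    """Toy planner cost model: I/O in pages for hash vs sort aggregation."""
    fits = G * acc_bytes <= M * page_bytes       # does the hashmap fit?
    if mode == "hash":
        return B_R if fits else 3 * B_R          # in-memory vs grace-style
    if input_sorted:
        return B_R                               # one linear pass, one bucket
    merge_passes = max(1, math.ceil(math.log(max(B_R / M, 2), M - 1)))
    return 2 * B_R * (1 + merge_passes)          # external sort, final agg fused

# The chapter's numbers: B_R = 1e7 pages, M = 1e4 pages.
hash_small = agg_io_cost(10**7, 10**4, 200)                       # B_R
hash_big   = agg_io_cost(10**7, 10**4, 50_000_000)                # 3·B_R
sort_big   = agg_io_cost(10**7, 10**4, 50_000_000, mode="sort")   # 4·B_R
```

Regime five is absent on purpose: a holistic aggregate does not fit a formula like this, which is exactly why it gets its own sketch-based algorithms.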
GROUP BY country vs GROUP BY customer_id on the same 10 GB table
Setup. Orders table: 500 M rows × 200 bytes = 100 GB. Page size 8 KB, so B_R ≈ 1.25 × 10^7 pages — for clean arithmetic, call it B_R = 10^7. Buffer pool available to the aggregator: 100 MB = M ≈ 1.28 × 10^4 pages. Call it M = 10^4.
Query 1 — GROUP BY country. G = 200 distinct countries. Each accumulator bucket is roughly 64 bytes (two 64-bit aggregates plus dict overhead), so the whole hashmap is 200 × 64 = 12.8 KB. Fits trivially in memory. Cost: B_R = 10^7 page reads.
Why 10^7: one linear scan, hashmap lives entirely in the L1/L2 cache, zero spill, zero re-reads. This is the best any aggregator can do and equals the cost of simply scanning the table.
At 1 GB/s SSD, that is about 100 seconds of wall clock — pure scan time, no CPU-bound stall anywhere. Pre-aggregation does not help (the hashmap already fits), so the whole plan is one operator.
Query 2 — GROUP BY customer_id. G = 50 × 10^6 distinct customers. Hashmap size: 50 × 10^6 × 64 bytes = 3.2 GB — 32 times memory. You cannot hold it.
External hash aggregation. Partition count: each partition's hashmap must fit in 100 MB, which at 64 bytes per bucket means roughly 1.5 × 10^6 groups per partition, so k ≥ 50 × 10^6 / 1.5 × 10^6 ≈ 34. Round up to k = 64. Cost: 3·B_R = 3 × 10^7 page I/Os.
At 1 GB/s, that is about 240 seconds — four minutes. Two and a half times slower than the country query despite being the same table, purely because of the distinct-key count.
Or sort aggregation. External sort costs 2·B_R·(1 + ⌈log_{M−1}(B_R/M)⌉). With B_R = 10^7, M = 10^4: B_R/M = 10^3, log_{M−1}(B_R/M) ≈ log_{10^4}(10^3) < 1, so the ceiling is 1. Total passes: 2. I/O: 4·B_R = 4 × 10^7 pages, and the aggregation fuses into the final merge.
Hash wins by 33% on I/O — 3 × 10^7 vs 4 × 10^7. In this regime the planner picks hash unless the query has an ORDER BY customer_id downstream, in which case sort's free output order tips the balance.
With pre-aggregation. Run a small pre-aggregator on the input with a 10 K-entry hashmap. Each flush emits at most 10 K partial groups. Over 500 M rows, you flush roughly 5000 times — output volume roughly 5000 × 10 K = 5 × 10^7 partial-group rows, which is ten percent of input. Feeding this reduced stream into the external hash aggregator drops the partition-pass I/O by roughly 10×, giving total I/O closer to 1.3·B_R = 1.3 × 10^7. DuckDB's actual numbers on this workload typically show a 3–5× wall-clock improvement from pre-aggregation alone.
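Spelling out that arithmetic — the ~5000-flush figure is the chapter's assumption (it implies substantial key locality in the input); everything below just follows from it:

```python
# Back-of-envelope for the pre-aggregated customer_id query.
input_rows = 500_000_000
flushes, flush_capacity = 5_000, 10_000        # the chapter's assumed figures

partial_rows = flushes * flush_capacity        # 5e7 partial-group rows
reduction = input_rows // partial_rows         # 10x smaller spill stream

B_R = 10**7                                    # input pages
# External hash on the reduced stream: read the raw input once, then write
# and re-read only the 10x-smaller partial-group stream.
total_io = B_R + 2 * (B_R // reduction)        # 1.2e7 — the chapter's ~1.3·B_R
```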
Common confusions
- "GROUP BY is the same as ORDER BY." No. GROUP BY country produces one row per country with aggregates; ORDER BY country produces the same input rows in a sorted sequence. Different output schemas, different row counts, different costs. They coincide only when GROUP BY is implemented via sort aggregation and happens to emit in group-key order — an implementation accident, not a semantic guarantee. If you want ordered output, write the ORDER BY.
- "Hash aggregation always wins for aggregate queries." No. On sorted or clustered input, sort aggregation is strictly cheaper on CPU (no hashing) and memory (one bucket, not G). On skewed high-cardinality inputs, sort aggregation degrades gracefully while hash has to recursively partition. The Postgres planner flips between HashAggregate and GroupAggregate constantly based on the estimated number of groups and the existence of sorted input. "Hash is default" ≠ "hash is best".
- "AVG is a basic aggregate." AVG needs a running SUM and a running COUNT and a final division — three pieces of state and a finalise-time arithmetic. The running state is not the output, which is why the add/finalise split matters. Every time someone writes AVG(x) = SUM(x) / N and forgets that nulls skip both sides, a subtle off-by-one bug enters production.
- "SELECT DISTINCT is free." SELECT DISTINCT x FROM t is exactly SELECT x FROM t GROUP BY x with no aggregate. Same hash-versus-sort trade-off, same memory profile, same spill rules. Calling it "distinct" instead of "group by" does not change the cost — on a big table with a high-cardinality column, it is a 4·B_R sort or a 3·B_R spilling hash. Treat it like any other aggregation and budget for it.
- "COUNT(DISTINCT x) is one of those basic aggregates." It is holistic. Computing COUNT(DISTINCT x) exactly requires either sorting or maintaining a hashmap of every distinct value seen — both proportional to the number of distinct values, which is the quantity you are trying to compute. This is why the planner treats COUNT(DISTINCT x) as a separate pass with a private aggregator, and why you should reach for HyperLogLog when an approximate answer is acceptable. The difference is a 16 KB sketch versus gigabytes of state.
- "Pre-aggregation always helps." Only when |R| is much larger than the number of distinct keys fitting in the small hashmap. If every row is its own group (|R| = G), pre-aggregation does zero compression and you pay the overhead of an extra hashmap. Production engines check the early reduction ratio after a few thousand rows and disable the combiner if it is not earning its keep.
Going deeper
Vectorised aggregation
Classical hash aggregation does one add call per row — a virtual dispatch plus a hashmap lookup for a tiny amount of real work. DuckDB, ClickHouse, and Velox use vectorised aggregation: process a batch of 1024–2048 rows at a time. Vectorise the hash (SIMD, one instruction per 16 keys), the bucket lookup (AVX-512 gather), and the accumulator update (add(vector) instead of 1024 add(value) calls). The inner loop becomes branch-free columnar arithmetic, running at 5–10 GB/s per core on simple SUM/COUNT. The DuckDB aggregation blog walks through the design, including variable-width keys and skew handling.
Approximate aggregation — HyperLogLog and friends
COUNT(DISTINCT x) is the canonical holistic aggregate, and the exact algorithm is too expensive on billion-row inputs. HyperLogLog (Flajolet et al. 2007) maintains a 4–16 KB probabilistic state that estimates distinct count to within 1–2% error, in O(B_R) time and O(1) memory. The insight: hash each value uniformly, track the leading-zero position across a grid of buckets, and the expected max leading-zero count is log_2(distinct count). ClickHouse's uniqCombined, BigQuery's APPROX_COUNT_DISTINCT, DuckDB's approx_count_distinct are all HLL variants. Twin algorithms — Count-Min Sketch for heavy hitters, t-digest for quantiles — turn other holistic aggregates into algebraic ones at bounded error. A future chapter builds HLL from first principles.
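A minimal HLL fits in a few lines. The sketch below is illustrative only — it uses SHA-1 as the uniform hash and omits the small- and large-range corrections of the full algorithm, so treat the constants and names as assumptions:

```python
import hashlib

def hll_estimate(values, b=10):
    """Toy HyperLogLog: b index bits -> m = 2^b registers. Illustrative
    sketch without the small/large-range corrections of the real algorithm."""
    m = 1 << b
    M = [0] * m
    for v in values:
        h = int.from_bytes(hashlib.sha1(str(v).encode()).digest()[:8], "big")
        j = h & (m - 1)                  # low b bits pick a register
        w = h >> b                       # remaining 64-b bits feed the rank
        rank = (64 - b) - w.bit_length() + 1   # leftmost 1-bit position
        M[j] = max(M[j], rank)
    alpha = 0.7213 / (1 + 1.079 / m)     # standard bias constant for m >= 128
    return alpha * m * m / sum(2.0 ** -r for r in M)

est = hll_estimate(range(100_000))       # true distinct count: 100,000
error = abs(est - 100_000) / 100_000     # expected around 1/sqrt(m) ≈ 3%
```

The state is m single-byte registers — about 1 KB here, versus the megabytes an exact distinct-count hashmap would need for the same input.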
ROLLUP, CUBE, GROUPING SETS
GROUP BY ROLLUP(country, city) computes the country-city grouping, the country rollup, and the grand total in one query. The naive implementation runs the aggregator three times; a smart one computes the finest (country, city) aggregation once and derives the coarser levels by re-aggregating the fine results — a distributive-aggregate property, which is why the bucket classification at the top of this chapter is load-bearing.
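Sketched on toy data — deriving the coarser ROLLUP levels by re-aggregating the finest grouping, legal because SUM is distributive (the rows are invented):

```python
# Finest grouping, computed once: (country, city) -> SUM(amount).
fine = {
    ("IN", "Delhi"): 100, ("IN", "Mumbai"): 50, ("US", "NYC"): 250,
}

# Country rollup derives from the fine results, not from a rescan.
country_level: dict[str, int] = {}
for (country, _city), s in fine.items():
    country_level[country] = country_level.get(country, 0) + s

# Grand total derives from the country level in turn.
grand_total = sum(country_level.values())    # 400
```

The same cascade would be wrong for a holistic aggregate — a MEDIAN ROLLUP genuinely needs the raw rows at every level.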
Where this leads next
- Rule-based rewrites: predicate pushdown, join pushdown — the next optimiser layer. Aggregation pushdown rewrites SUM over a joined-then-grouped expression into a pre-aggregation on one side of the join, shrinking the join's input. The distributive-vs-holistic classification governs which aggregates can be pushed through and which cannot.
- Window functions — OVER (PARTITION BY x ORDER BY y) is structurally similar to GROUP BY x but emits one output row per input row. The implementation shares the sort aggregation pipeline and adds a frame-advance loop on top.
- HyperLogLog and approximate aggregation — holistic aggregates need special algorithms; this is where sketches live.
- Distributed aggregation — in an MPP engine, aggregation is two-phase: partial aggregate on each worker, shuffle to a coordinator, final aggregate. Exactly the partition-then-aggregate pattern with network replacing disk.
Aggregation is the operator you use most often, whether you know it or not. Every dashboard, every report, every "top 10" query, every EXPLAIN on an analytical workload has a HashAggregate or GroupAggregate in the plan. The three cost formulas from this chapter — B_R in memory, 3·B_R for external hash, 4·B_R for sort — are enough to read any query plan with confidence.
References
- Graefe, Query Evaluation Techniques for Large Databases, ACM Computing Surveys 25(2), 1993 — the reference survey for physical aggregation algorithms, covering both hash and sort variants, the distributive/algebraic/holistic classification, and the partitioning trick for external hash aggregation. The numbers in this chapter are calibrated against Graefe's formulas.
- Larson, Grouping and Duplicate Elimination: Benefits of Early Aggregation, Microsoft Research TR 97-36, 1997 — the paper that formalised pre-aggregation (the "combiner" trick) and showed its |R|/G reduction factor empirically. Still the clearest exposition of why partial aggregation is the single biggest optimisation on top of hash aggregation.
- Raasveldt, DuckDB Hash Aggregation Blog, 2022 — the DuckDB team's walkthrough of their vectorised hash aggregator, including two-phase aggregation, variable-width group keys, and skew handling. The most readable modern source on how aggregation actually runs in a production vectorised engine.
- Flajolet, Fusy, Gandouet, Meunier, HyperLogLog: the Analysis of a Near-Optimal Cardinality Estimation Algorithm, AofA 2007 — the original HyperLogLog paper. Proves the error bounds that every production APPROX_COUNT_DISTINCT relies on. A surprisingly readable 20 pages.
- PostgreSQL source, nodeAgg.c — the production implementation of both hash and sort aggregation in Postgres, including the ExecHashAgg spill path with recursive partitioning. About 4000 lines, and one of the cleanest open-source aggregation implementations to read.
- Pavlo, CMU 15-445 Aggregation Lecture Notes, Carnegie Mellon Database Group — the standard teaching reference for the hash-vs-sort comparison, with the cost derivations done in the same notation as this chapter.