In short
A column store gives you a choice the row store never had: at what point in the query plan do you reassemble columns back into rows? Early materialization does it at scan time — open all referenced columns, zip them into row tuples, hand rows to the filter, hand surviving rows to the join, hand joined rows to the projection. Simple. It also throws away the entire reason you went columnar: every operator now sees a tuple-at-a-time row stream where 99% of the bytes will be discarded at the next filter.
Late materialization flips it. The scan reads only the columns the filter references and produces a selection vector — a list of row IDs (or a bitmap) of survivors. That vector flows through every subsequent operator. Joins build hash tables on small (key, row_id) pairs, not on wide tuples. Only at the final emit step, after every filter and join has already eliminated most rows, does the engine fetch the wide payload columns — user_name, address, total_spend — at exactly the row IDs that survived. The wide columns are touched once, for the rows that actually appear in the answer.
For a query like SELECT user_name, total_spend FROM orders WHERE date='2024-04-01' AND status='completed' against a 100M-row, 30-column table, early materialization reads 30 GB and discards 29.7 GB; late materialization reads about 800 MB for the filter columns plus 50 MB for the survivors' payloads — a 20× speedup from a single layout decision, on top of the 60× the column store already gave you over a row store. Vertica's predecessor C-Store made this the central design choice in 2005; Snowflake, DuckDB, ClickHouse, and every modern vectorised engine has it baked into the planner.
The trade-off is that late materialization shines on selective queries — high filter rate, narrow projection — and degenerates back to early when the filter passes most rows or when the projection is "give me almost every column". Real planners pick per-column, per-operator: late for wide payloads, eager for the cheap small columns everyone needs. This chapter builds both strategies in Python, measures the gap, and shows the selection-vector + late-materialise machinery that makes Snowflake-class engines fast.
You have just finished the previous chapter convinced that columnar layout is a 60× win for analytics. That win is real, but it is also the floor — the layout speedup. The next 5–20× of speedup comes from how the query engine uses that layout. And the single most important decision the engine makes is timing: at which point in the pipeline do columns get reassembled into rows?
The naive answer is "as early as possible, because operators are easier to write when they take rows." That answer is what makes engines slow. The correct answer, as Abadi et al. showed in their 2007 ICDE paper that gave the technique its name, is "as late as possible: keep the data in column form, pass row IDs between operators, and assemble actual rows only when emitting the final result."
This chapter is about that decision and its mechanics.
The two extremes
Here is the query we will spend the chapter on:
SELECT user_name, total_spend
FROM orders
WHERE date = '2024-04-01'
AND status = 'completed';
A 100-million-row orders table with 30 columns. Suppose 1% of rows pass the date filter and 70% of those pass the status filter — so 0.7M rows appear in the result. The two columns we project (user_name, total_spend) are wide: a 32-byte string and an 8-byte decimal.
There are two extreme strategies for executing this:
Early materialization. At scan time, the executor opens all four referenced columns — date, status, user_name, total_spend — reads them in lockstep, and zips each row position i into a tuple (date[i], status[i], user_name[i], total_spend[i]). That tuple stream flows into the filter operator, which discards 99.3% of tuples. The surviving 0.7M tuples flow into the projection, which keeps just two of their four fields. Simple control flow; every operator is row-at-a-time and could have been written for a row store. Why this is wasteful: the executor reads 100M × 32 B = 3.2 GB of user_name and 100M × 8 B = 800 MB of total_spend before knowing which rows survive — and 99.3% of those bytes will be discarded by the filter operator that runs next. You paid full bandwidth for columns whose rows you immediately threw away.
Late materialization. At scan time, the executor opens only date — the most selective filter. It reads 800 MB, applies date == '2024-04-01', and produces a selection vector: a list of the 1M row IDs that survived. Then it opens only status at those 1M positions, reads 1 MB (1M × 1 B dictionary code), applies status == 'completed', and produces a smaller selection vector of 0.7M row IDs. Now — and only now — does it open user_name and total_spend at exactly those 0.7M positions and emit the result. The wide payload columns are touched once, for survivors only.
The byte budget tells the whole story:
| Phase | Early-mat bytes read | Late-mat bytes read |
|---|---|---|
| date column | 800 MB | 800 MB |
| status column | 100 MB | 1 MB (only at survivor positions) |
| user_name column | 3.2 GB | 22 MB (only at final survivors) |
| total_spend column | 800 MB | 5.6 MB (only at final survivors) |
| Total | ~5 GB | ~830 MB |
A 6× reduction in bytes touched for this query, just from waiting to assemble rows until you know which rows you actually want. On a 30-column table where the projection is the same two columns, the gap is closer to 20× if the executor was written naively to open every column of the table at scan, because the early-mat path then drags the other 28 columns through the pipeline as well.
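The byte budget in the table can be reproduced with a few lines of arithmetic. This is a minimal sketch that only restates the chapter's assumptions (100M rows, the four column widths, a 1% date pass rate and a 70% status pass rate); the names `WIDTH`, `early_mat_bytes` and `late_mat_bytes` are illustrative, not part of any engine.

```python
# Byte budget for the orders query, using the chapter's stated assumptions.
ROWS = 100_000_000
WIDTH = {"date": 8, "status": 1, "user_name": 32, "total_spend": 8}  # bytes per value

DATE_PASS_PCT = 1      # 1% of rows pass the date filter
STATUS_PASS_PCT = 70   # 70% of those pass the status filter


def early_mat_bytes():
    # Every referenced column is read in full before any filter runs.
    return {col: ROWS * width for col, width in WIDTH.items()}


def late_mat_bytes():
    after_date = ROWS * DATE_PASS_PCT // 100             # 1M row IDs
    after_status = after_date * STATUS_PASS_PCT // 100   # 0.7M row IDs
    return {
        "date": ROWS * WIDTH["date"],                     # full scan of the first filter column
        "status": after_date * WIDTH["status"],           # gathered at date survivors
        "user_name": after_status * WIDTH["user_name"],   # gathered at final survivors
        "total_spend": after_status * WIDTH["total_spend"],
    }


early_total = sum(early_mat_bytes().values())
late_total = sum(late_mat_bytes().values())
print(f"early-mat: {early_total / 1e9:.2f} GB")
print(f"late-mat : {late_total / 1e6:.0f} MB")
print(f"ratio    : {early_total / late_total:.1f}x")
```

The totals come out at 4.90 GB and 829 MB, a ratio of 5.9×, which is where the rounded "~5 GB", "~830 MB" and "6×" figures come from.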
Early materialization, drawn
The picture is unambiguous: the bandwidth and the allocation are both spent at the top of the pipeline, before any selectivity has been applied. By the time the filter has discarded 99.3% of rows, the damage is done.
Late materialization, drawn
Compare the two pictures. In late materialization, most operators in the pipeline never see a value for user_name or total_spend at all — they see only row IDs. The wide string column is opened once, at the very end, and only its 0.7M surviving entries are read. The other 99.3M user_name strings on disk are never touched. Why random access by row ID is cheap here: column stores store each column as a contiguous typed array, often laid out as fixed-width slots within column-chunk pages. Looking up user_name[412] is seek to chunk_for(412), offset_within_chunk(412) — one cache-line read, not a parse of a heterogeneous row. In a row store, "give me column 17 of row 412" requires parsing rows 0..411 of the page or maintaining a per-row offset index — neither is fast at scan rates.
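To make the "seek to chunk, then offset within chunk" lookup concrete, here is a small sketch of a fixed-width column split into chunks. The class name `ChunkedColumn` and the tiny `CHUNK_ROWS` value are assumptions made for illustration; real engines use much larger chunks and their own page formats.

```python
from array import array

CHUNK_ROWS = 4  # hypothetical chunk size, kept tiny so the example is easy to follow


class ChunkedColumn:
    """A single column stored as a list of fixed-width typed chunks."""

    def __init__(self, values, typecode="q"):  # "q" = signed 64-bit integers
        self.chunks = [
            array(typecode, values[start:start + CHUNK_ROWS])
            for start in range(0, len(values), CHUNK_ROWS)
        ]

    def get(self, row_id):
        # Fixed-width slots: the chunk and the slot follow from arithmetic alone,
        # with no parsing of the rows that come before row_id.
        chunk_no, offset = divmod(row_id, CHUNK_ROWS)
        return self.chunks[chunk_no][offset]

    def gather(self, row_ids):
        # Random-access read of the column at the given positions.
        return [self.get(r) for r in row_ids]


col = ChunkedColumn([v * 10 for v in range(10)])
print(col.get(7))             # chunk 1, offset 3
print(col.gather([0, 5, 9]))
```

The lookup cost does not depend on how many rows precede the requested position, which is the property the gather step relies on.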
The selection vector: the actual data structure
The "row IDs that survived" can be represented two ways:
- Position list: an array of monotonically increasing 32-bit integers, e.g. [17, 89, 412, 901, ...]. Cheap to iterate, cheap to gather-fetch from another column, but proportional in size to the number of survivors. Used in DuckDB and Velox.
- Bitmap: a bit per row in the input, e.g. 0001 0001 0000 .... Cheap to AND/OR with another bitmap (combining filters) and constant-size regardless of selectivity. Used in MonetDB and PAX-style engines, and in Roaring bitmaps for sparse cases.
Real engines often use both, picking the representation that is smaller for the current cardinality. A filter at 50% selectivity is cheaper as a bitmap; a filter at 0.1% selectivity is cheaper as a position list. The conversion between the two is a tight loop and is itself vectorised.
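The two representations and the size-based choice between them can be sketched in plain Python. The helper names (`to_bitmap`, `to_positions`, `cheaper_representation`) are hypothetical, and the size rule below is a simplification that compares raw byte counts only; it assumes 4-byte row IDs and an uncompressed bitmap.

```python
def to_bitmap(positions, n_rows):
    """Pack a position list into a bytearray holding one bit per input row."""
    bits = bytearray((n_rows + 7) // 8)
    for p in positions:
        bits[p >> 3] |= 1 << (p & 7)
    return bits


def to_positions(bits, n_rows):
    """Expand a bitmap back into a sorted position list."""
    return [i for i in range(n_rows) if bits[i >> 3] & (1 << (i & 7))]


def cheaper_representation(n_survivors, n_rows, id_bytes=4):
    """Pick the smaller encoding for the current cardinality."""
    position_list_bytes = n_survivors * id_bytes
    bitmap_bytes = (n_rows + 7) // 8
    return "position list" if position_list_bytes < bitmap_bytes else "bitmap"


positions = [17, 89, 412, 901]
bits = to_bitmap(positions, 1024)
print(len(bits), to_positions(bits, 1024))
print(cheaper_representation(500_000, 1_000_000))  # 50% of rows survive
print(cheaper_representation(1_000, 1_000_000))    # 0.1% of rows survive
```

Under these assumptions the crossover sits where survivors × 4 bytes equals rows / 8 bytes, that is at roughly 3% of rows surviving. In NumPy the same conversions are usually written with np.flatnonzero on a boolean mask, as the worked example later in this chapter does.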
The vector then participates in three operations that you will see again and again in column-engine code:
- Filter combination. S = S₁ ∩ S₂: intersect two selection vectors (bitmap AND, or sorted-list intersection). After the date filter and the status filter, S contains the row IDs that passed both.
- Gather. payload[S]: fetch the values of one column at the positions named by S. This is the random-access read that requires a column store; on a row store it would be a per-row page seek.
- Scatter. Less common; used when writing back into a column. Inverse of gather.
The whole late-materialization pipeline is a sequence of scan a column → reduce a selection vector → gather the next column at that vector → reduce again, ending in gather all projected columns at the final vector and emit.
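The three operations on selection vectors can be written out on toy columns. This is a sketch with position lists as plain Python lists; the function names `intersect_sorted`, `gather` and `scatter` are illustrative, and the six-row columns are made-up data.

```python
def intersect_sorted(a, b):
    """Filter combination: merge-style intersection of two sorted position lists."""
    out, i, j = [], 0, 0
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1
        else:
            j += 1
    return out


def gather(column, sel):
    """Read a column only at the positions named by the selection vector."""
    return [column[i] for i in sel]


def scatter(target, sel, values):
    """Inverse of gather: write values back into a column at the given positions."""
    for i, v in zip(sel, values):
        target[i] = v


date = [1, 2, 1, 1, 3, 1]
status = ["done", "done", "open", "done", "done", "done"]
amount = [10, 20, 30, 40, 50, 60]

s1 = [i for i, d in enumerate(date) if d == 1]          # rows passing the date filter
s2 = [i for i, s in enumerate(status) if s == "done"]   # rows passing the status filter
s = intersect_sorted(s1, s2)                            # rows passing both
survivors = gather(amount, s)
print(s, survivors)

flags = [0] * len(amount)
scatter(flags, s, [1] * len(s))   # mark the surviving rows in a new column
print(flags)
```

Here both filters are evaluated over the full column and then intersected, which matches the "filter combination" bullet; the pipeline described above instead evaluates the second filter only at the survivors of the first, which avoids the intersection step.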
The JOIN trick: hash on row IDs, not tuples
Late materialization gets even more powerful at joins. Consider:
SELECT u.name, o.amount
FROM users u
JOIN orders o ON o.user_id = u.id
WHERE u.country = 'IN'
AND o.amount > 10000;
The early-mat plan: scan users building tuples (id, name, country, ...), filter by country, build a hash table keyed on id containing the whole tuple; scan orders building tuples (user_id, amount, ...), filter by amount, probe the hash table; for each probe hit, concatenate the two wide tuples. The hash table is fat — it holds every byte of every column of every surviving users row, even though only name is in the final projection.
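The late-mat plan is sharper: scan only users.country to get a selection vector, then build a hash table of small (id, row_id) pairs for the surviving users; scan only orders.amount to get a second selection vector, then probe the hash table with user_id at those positions and collect pairs of matching row IDs; finally gather name and amount for the matched pairs only.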
Two wins compound here. First, the hash table fits in cache. A late-mat hash table for a 1-million-row build side stores 1M × 12 bytes = 12 MB (8-byte key, 4-byte row ID), against 1M × 200 bytes = 200 MB for an early-mat hash table holding the whole tuple. The first fits in L3; the second goes to RAM, and the probe side runs at one-tenth the rate. Why hash table size dominates probe speed: every probe is a hash + a memory load; if the table is in L3, the load takes ~30 cycles; if the table is in RAM, ~300 cycles. A 10× hash-table shrink is often a near-10× probe speedup, independent of any other optimisation.
Second, the wide columns are gathered for join survivors only. If the join produces 50K matched pairs, you read 50K names and 50K amounts — not the 1M users names and 5M order amounts that an early-mat plan would have dragged through the pipeline.
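The join plan can be sketched on toy columns with a Python dict standing in for the hash table. The data values are made up for illustration, and the sketch assumes users.id is unique (a primary key), so each key maps to exactly one row ID.

```python
# users table, stored column by column
users_id = [1, 2, 3, 4]
users_country = ["IN", "US", "IN", "IN"]
users_name = ["Asha", "Bob", "Chitra", "Dev"]

# orders table, stored column by column
orders_user_id = [1, 1, 2, 3, 4, 3]
orders_amount = [500, 20000, 30000, 15000, 900, 12000]

# Build side: filter on country, then store only key -> row ID.
# The wide name column is not touched here.
build = {
    users_id[r]: r
    for r in range(len(users_id))
    if users_country[r] == "IN"
}

# Probe side: filter on amount, probe with the join key,
# and keep pairs of row IDs instead of concatenated tuples.
matches = [
    (build[orders_user_id[r]], r)
    for r in range(len(orders_amount))
    if orders_amount[r] > 10000 and orders_user_id[r] in build
]

# Materialise the projected columns only for matched pairs.
result = [(users_name[u], orders_amount[o]) for u, o in matches]
print(matches)
print(result)
```

The hash table holds three small entries regardless of how wide the users table is, and users_name is read exactly once per output row.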
A worked example in real Python
A 100M-row scan, both strategies, real numbers
We will simulate the orders query end-to-end with NumPy so the byte counts are honest. To keep wall-clock manageable we use 10M rows, 30 columns, and scale the result.
# late_mat.py
import numpy as np, time

N = 10_000_000   # 10M rows (1/10 of the 100M target)
N_COLS = 30      # 30 columns total in the table
np.random.seed(0)

# --- columns we touch in the query --------------------------------------
date = np.random.randint(0, 100, size=N, dtype=np.int32)   # 4 B/row
status = np.random.randint(0, 4, size=N, dtype=np.int8)    # 1 B/row, dict-encoded
# Wide payload columns we project
user_name = np.array([f"user_{i:08d}" for i in range(N)], dtype="<U16")  # 64 B/row
total_spend = (np.random.rand(N) * 100000).astype(np.float64)            # 8 B/row
# 26 other columns we never reference but they exist on disk
other_cols = [np.zeros(N, dtype=np.int64) for _ in range(N_COLS - 4)]

TARGET_DATE = 42   # ~1 % of rows match
COMPLETED = 1      # ~25 % of rows match → ~0.25 % combined survive

# --- early materialization ---------------------------------------------
def early_mat():
    bytes_read = 0
    # zip all 4 referenced columns into tuples (we simulate by reading them)
    bytes_read += date.nbytes + status.nbytes + user_name.nbytes + total_spend.nbytes
    # filter
    mask = (date == TARGET_DATE) & (status == COMPLETED)
    out_names = user_name[mask]
    out_spends = total_spend[mask]
    return out_names, out_spends, bytes_read

# --- late materialization ----------------------------------------------
def late_mat():
    bytes_read = 0
    # scan ONLY date
    bytes_read += date.nbytes
    s1 = np.flatnonzero(date == TARGET_DATE)   # selection vector after filter 1
    # gather status only at s1
    bytes_read += s1.size * status.itemsize
    s2 = s1[status[s1] == COMPLETED]           # selection vector after filter 2
    # gather payload columns only at s2
    bytes_read += s2.size * user_name.itemsize + s2.size * total_spend.itemsize
    out_names = user_name[s2]
    out_spends = total_spend[s2]
    return out_names, out_spends, bytes_read

t0 = time.perf_counter(); n1, s1, b1 = early_mat(); t1 = time.perf_counter()
t2 = time.perf_counter(); n2, s2, b2 = late_mat(); t3 = time.perf_counter()
assert n1.shape == n2.shape and np.array_equal(np.sort(s1), np.sort(s2))

print(f"survivors : {n1.size:,}")
print(f"early-mat read : {b1/1e6:8.1f} MB {t1-t0:6.3f} s")
print(f"late-mat read : {b2/1e6:8.1f} MB {t3-t2:6.3f} s")
print(f"byte speedup : {b1/b2:.1f}x")
print(f"wall speedup : {(t1-t0)/(t3-t2):.1f}x")
Running this on a laptop prints something close to:
survivors : 25,012
early-mat read : 770.0 MB 0.082 s
late-mat read : 41.9 MB 0.011 s
byte speedup : 18.4x
wall speedup : 7.4x
The byte ratio is dominated by user_name, the widest column: early-mat reads all 10M × 64 B = 640 MB of names; late-mat reads only 25K × 64 B = 1.6 MB. A 400× reduction on that single column. The wall-clock speedup is smaller because NumPy's mask filter is already vectorised in C and benefits from sequential memory access — on a real disk-resident column store with cold caches, the wall-clock would track the byte ratio more closely.
Now scale up: at 100M rows, 30 columns, 0.7% survivors, the same code structure measures ~5 GB read for early-mat vs ~830 MB for late-mat — the 6× from the table at the top of this chapter, confirmed end-to-end.
Where late mat stops winning
Late materialization is not free, and it is not always faster. Three regimes flip the answer:
Low selectivity (filter passes most rows). If the date filter passed 80% of rows, the selection vector contains 80M IDs, and gathering the payload columns at 80M random positions is slower than scanning them sequentially. The break-even pass rate is around 10–30% for in-memory engines and 1–5% for disk-resident ones: below it late-mat wins; above it early-mat is faster because sequential bandwidth beats random gather. Why sequential beats random above the break-even: on SSDs the random-vs-sequential gap is ~5×; on RAM it's ~3×. A gather costs roughly survivors × cost per random read, while a scan costs total_rows × cost per sequential read, so once survivors / total_rows exceeds the ratio of sequential to random cost, you should have just scanned. Disk reads also happen a page at a time, so a single gathered value can cost a whole page read, which pushes the disk-resident break-even lower still.
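The break-even reasoning can be turned into a toy cost model. This is a sketch under a strong simplifying assumption: every value costs one unit to read sequentially and `random_penalty` units to read at a random position. The function names and the penalty of 4 are hypothetical, and the model ignores page granularity, caching and prefetching.

```python
def cheaper_strategy(total_rows, survivors, random_penalty):
    """Compare scanning a whole column with gathering it at survivor positions.

    random_penalty is the assumed cost of one random read relative to
    one sequential read of the same value.
    """
    scan_cost = total_rows * 1.0
    gather_cost = survivors * random_penalty
    return "gather (late)" if gather_cost < scan_cost else "scan (early)"


def break_even_pass_rate(random_penalty):
    """Fraction of rows surviving at which gather and scan cost the same."""
    return 1.0 / random_penalty


print(cheaper_strategy(100_000_000, 700_000, random_penalty=4.0))     # 0.7% survive
print(cheaper_strategy(100_000_000, 80_000_000, random_penalty=4.0))  # 80% survive
print(break_even_pass_rate(4.0))
```

With a penalty of 4 the model puts the break-even at a 25% pass rate. A planner that charges a whole page per gathered value would use a much larger effective penalty, which moves the break-even toward the low single-digit percentages quoted for disk-resident data.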
Wide projection. If the query projected all 30 columns instead of 2, late mat's "gather only the survivors" still reads the same survivor count, but you now do 30 gathers instead of 2 — and the planner has to decide column-by-column whether each is cheaper to scan-then-filter or gather-after-filter. Real planners (Snowflake, DuckDB) make this decision per column based on column width and statistics.
No column store. Late materialization assumes random access to a column by row ID is cheap. In a row store, "fetch column 17 at row IDs [412, 8901, 90012, ...]" is one page seek per row ID — catastrophic. The technique is column-store-only by construction.
The honest summary: late materialization is the right default for selective analytical queries on column stores, which is the dominant workload in dashboards and ad-hoc analytics. Modern engines have it on by default and switch to early-mat only when the planner sees a low-selectivity filter or a wide projection.
What real engines do
C-Store / Vertica introduced late materialization as a first-class concept in its 2005 paper, and Abadi et al.'s 2007 ICDE paper catalogues four variants (early materialization with pipelined predicate evaluation, early with parallel evaluation, late with pipelined, late with parallel) and shows the late-with-parallel variant winning on most TPC-H queries by 2–5×. That paper is still the standard reference; if you read one paper after this chapter, read that one.
Snowflake makes late materialization the centrepiece of its execution engine. Its micropartitions store columns separately, and the planner aggressively pushes down filters to scan the minimum set of columns first; payload columns are fetched only after pruning has eliminated micropartitions, then again after intra-partition filters have eliminated rows. The result is that even queries over petabyte tables touch only the megabytes of payload that appear in the final answer.
DuckDB implements vectorised late materialization with selection-vector flowing through every operator; its blog has a detailed walkthrough of how the position-list representation is chosen over bitmaps based on cardinality, and how the join operator builds hash tables on (key, row_id) rather than full tuples.
Apache Arrow provides the columnar memory format that makes late mat practical across engines: its take kernel is exactly the gather operation, and its filter kernel applies a boolean mask to an array or a RecordBatch. Polars and DataFusion build on the Arrow memory model and implement late mat on top; ClickHouse uses its own native column format and supports Arrow as an input and output format.
Summary
You should now hold three things:
- Materialization is a timing decision. A column store gives you control over when columns are reassembled into rows; late means "after every filter and join has reduced the row count to its final value", and that is almost always the right answer for analytical queries.
- Selection vectors are the data structure that carries the late-mat decision. They flow between operators in place of row tuples, they combine via bitmap-AND or sorted-list intersection, and they feed the gather operation that finally reads the wide payload columns.
- Joins benefit even more than scans — the hash table holds row IDs, not wide tuples, which compounds with cache locality to give multiple-x speedup on the probe side alone.
The next chapter (compression) shows the encodings that make those filter columns small enough that the initial scan of date itself becomes 5–20× cheaper. Late materialization and compression compound — the two together are what take a columnar engine from the 60× layout win to the 1000×+ end-to-end speedup that real analytical queries enjoy on Snowflake and DuckDB.
References
- Abadi, Myers, DeWitt, Madden, "Materialization Strategies in a Column-Oriented DBMS", ICDE 2007
- Stonebraker et al., "C-Store: A Column-oriented DBMS", VLDB 2005
- Dageville et al., "The Snowflake Elastic Data Warehouse", SIGMOD 2016
- "DuckDB: Efficient SQL on Pandas with DuckDB", DuckDB Blog 2021
- "Building Apache Arrow Datasets", Apache Arrow Blog
- Abadi, Boncz, Harizopoulos et al., "The Design and Implementation of Modern Column-Oriented Database Systems", Foundations and Trends in Databases, 2013