Query engines on top: Trino, Spark, Dremio, DuckDB
It is Tuesday morning at Flipkart. The catalog team has just finished the CDC → Iceberg pipeline for the orders table — 22 crore rows, partitioned by order_date, freshly fed every minute, sitting in s3://flipkart-lake/orders/. The analytics team wants three things from this same table: a Tableau dashboard that refreshes every 30 seconds, a nightly machine-learning feature job that joins orders against six other tables, and an analyst running ad-hoc SELECT COUNT(*) WHERE... queries from her laptop. One Iceberg table. Three workloads. The naive answer "use Spark, it's the standard" gives a 14-second dashboard query and a frustrated PM. The right answer is that Trino runs the dashboard, Spark runs the nightly feature job, and DuckDB runs the analyst's laptop queries — and the same Iceberg table serves all three with no copies. This chapter is the comparison: same files, four engines, different theories of how to read them.
Trino, Spark, Dremio, and DuckDB all read Iceberg, all push down predicates, and all skip data files using the manifest. They diverge on three axes: execution model (long-lived MPP daemon vs JVM job vs C++ embedded process), join strategy (broadcast vs shuffle vs reflection), and the question of where state lives during a query. The right engine for a workload is the one whose execution model matches the query shape — interactive low-latency, batch ETL, BI-with-acceleration, or single-node ad-hoc.
What "query engine on top" actually means
The lakehouse split that Iceberg ratified is: storage is files (Parquet on S3), catalog is metadata (Iceberg snapshots, manifests), and the query engine is interchangeable. A Trino query and a Spark query reading the same Iceberg snapshot must return the same rows — the snapshot is the contract. What changes between engines is not what you read but how and how fast you read it.
The mechanical sequence every engine follows is the same:
1. Catalog lookup — ask Iceberg "give me snapshot 414's manifest list".
2. Manifest pruning — read manifest files, skip whole files based on partition values and column min/max stats.
3. Plan — build a physical plan (scan → filter → project → aggregate → join → output).
4. Distribute — split the plan across workers / threads.
5. Execute — open Parquet files, read row groups, apply predicates, exchange data, return rows.
The engines differ at every step from step 3 onwards. Trino plans for low-latency interactive workloads on a long-running cluster. Spark plans for fault-tolerant batch jobs that may run for hours. Dremio plans for BI dashboards with pre-computed reflections that bypass the actual scan. DuckDB plans for a single process on a single laptop with no shuffle layer. Each is the right answer to a different question.
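Steps 1 and 2 are easy to see from plain Python. A minimal sketch with pyiceberg, assuming a configured catalog named lake and a table analytics.orders (both placeholders for this chapter's running example):
# Catalog lookup + manifest pruning, before any execution engine is involved.
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

catalog = load_catalog("lake")                        # step 1: resolve the current snapshot's metadata
orders = catalog.load_table("analytics.orders")
scan = orders.scan(
    row_filter=EqualTo("order_date", "2026-04-25"),   # step 2: prune manifests and data files
    selected_fields=("order_state", "amount_inr"),
)
for task in scan.plan_files():                        # only the files that survive the prune
    print(task.file.file_path, task.file.record_count)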
How each engine actually executes a query
The same SQL, SELECT order_state, SUM(amount_inr) FROM orders WHERE order_date = '2026-04-25' GROUP BY order_state, takes a different path through each engine.
Trino: long-lived MPP, no checkpoints, in-memory exchange
Trino's coordinator parses the SQL, builds a logical plan, and asks Iceberg's connector for the manifest list of the current snapshot. The connector returns a list of (file_path, partition_value, row_count) tuples; the coordinator filters them by order_date = '2026-04-25' (manifest pruning, no S3 read for non-matching files) and produces a list of, say, 47 Parquet files to scan. The coordinator slices those 47 files into 47 splits and assigns them to workers. Each worker opens its split, reads only the amount_inr and order_state columns (Parquet's column projection), applies the predicate, and pipelines partial aggregates into shuffle. A second stage receives the shuffled partials, finalises the GROUP BY, and streams rows to the coordinator, which streams them to the client. The whole thing runs as a single physical pipeline with no intermediate state on disk — fast, but if a worker dies mid-query, the entire query fails and the client retries.
Why Trino's no-checkpoint design is faster for short queries: a checkpoint costs roughly 100ms of disk write per stage boundary in Spark. For a 200ms query that's 50% overhead with zero benefit because the query is too fast for fault tolerance to matter. Trino skips the checkpoint and pipelines stages directly into each other, accepting that a failure mid-query forces a retry. For interactive BI where the user re-clicks anyway on failure, the trade-off is right.
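For reference, this is roughly the dashboard's path from Python via the trino client library; the host, user, catalog, and schema are placeholders, and order_date is assumed to be a DATE column. Running EXPLAIN (TYPE DISTRIBUTED) on the same statement prints the fragment layout described above.
# The 30-second-refresh dashboard query against a Trino coordinator.
import trino

conn = trino.dbapi.connect(
    host="trino.internal", port=8080, user="dashboard",
    catalog="iceberg", schema="analytics",
)
cur = conn.cursor()
cur.execute("""
    SELECT order_state, SUM(amount_inr) AS total_inr
    FROM orders
    WHERE order_date = DATE '2026-04-25'
    GROUP BY order_state
""")
print(cur.fetchall())   # rows stream back from the coordinator as the final stage produces them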
Spark: stage-based DAG, shuffle-to-disk, retryable
Spark's driver builds a logical plan from the SQL, optimises it with Catalyst, and produces a physical DAG. The DAG has explicit stage boundaries — every shuffle is a stage cut. For our query, stage 1 is the scan + partial aggregate (one task per Parquet split, say 47 tasks); stage 2 is the final aggregate (one task per order_state value, say 28 tasks). Between stage 1 and stage 2, every task in stage 1 writes its partial aggregates to local SSD as shuffle blocks; every task in stage 2 reads only the blocks for its keys. If a stage-2 task dies, the driver retries it without re-running stage 1. This makes Spark survive losses of individual workers in a 6-hour ETL job, at the cost of writing every shuffle to disk.
Why Spark's shuffle-to-disk wins for long jobs: a 6-hour job has a meaningful probability of losing at least one worker (Spot instance reclamation, OOM kills, driver-induced restarts). Without checkpointing, that single failure restarts the whole 6 hours. Spark's stage boundaries are the granularity of retry — a failed stage costs minutes, not hours. The disk-write overhead per shuffle (5–15% of total runtime in our experience at Flipkart) buys the ability to lose 5–10% of executors mid-job and still complete.
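The nightly job's view of the same table, sketched in PySpark. The catalog and namespace names, REST-catalog URI, and warehouse path are placeholders, and the session is assumed to have the Iceberg Spark runtime on its classpath; explain() shows the stage boundary (the Exchange) that the shuffle discussion above refers to.
# Batch view of the same table: Catalyst plans the scan, a shuffle, and a
# final aggregate; stage boundaries are where shuffle blocks hit local disk.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.catalog.lake", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.lake.type", "rest")
    .config("spark.sql.catalog.lake.uri", "https://catalog.internal")      # placeholder
    .config("spark.sql.catalog.lake.warehouse", "s3://flipkart-lake/")
    .getOrCreate()
)
daily = spark.sql("""
    SELECT order_state, SUM(amount_inr) AS total_inr
    FROM lake.analytics.orders
    WHERE order_date = '2026-04-25'
    GROUP BY order_state
""")
daily.explain()   # physical plan: scan -> partial aggregate -> Exchange -> final aggregate
daily.show()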
Dremio: substitute the scan with a reflection
Dremio's planner does the standard manifest pruning, but before executing the scan, it checks whether a reflection — a pre-computed materialised aggregate stored as Arrow/Parquet on local SSD — covers this query. If a reflection exists for (order_date, order_state) → SUM(amount_inr), the planner rewrites the query to scan the reflection (a few KB of pre-aggregated rows) instead of the source table (47 files of granular orders). The query that was a 1.2-second full scan becomes a 25ms reflection lookup. Reflections are refreshed in the background as the source table changes; the planner consults a refresh manifest to know whether a reflection is fresh enough for the query's freshness requirement.
This is Dremio's distinguishing bet: BI dashboards run the same 50 queries a thousand times a day, and the right answer is to pre-compute them. The cost is reflection storage (typically 5–15% of source size) and reflection refresh cost (incremental, but non-zero). The pay-off is sub-second p99 on a fixed set of analytical patterns. Why a reflection isn't just a materialised view: a materialised view answers exactly one query shape. A reflection is shape-aware — it can serve any query whose logical plan can be rewritten to use it (e.g., a daily reflection serves weekly queries by re-aggregating). Dremio's planner does the rewrite automatically; the user just writes plain SQL.
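A toy illustration of the shape-aware rewrite (this is not Dremio's planner, just the idea): a reflection aggregated at daily grain can answer a weekly question by re-aggregating its own rows, so the planner never touches the source files.
# Toy of the shape-aware rewrite: GROUP BY week is derivable from GROUP BY day,
# so a weekly query can be rewritten against the daily reflection.
import datetime

daily_reflection = {                       # (order_date, state) -> SUM(amount_inr)
    ("2026-04-20", "KA"): 120_000, ("2026-04-21", "KA"): 95_000,
    ("2026-04-20", "MH"): 80_000,  ("2026-04-21", "MH"): 110_000,
}

def week_of(date_str):
    iso = datetime.date.fromisoformat(date_str).isocalendar()
    return f"{iso[0]}-W{iso[1]:02d}"       # the weekly grain is coarser than the daily grain

def weekly_from_daily(reflection):
    # Planner logic in miniature: re-aggregate the reflection instead of scanning the source.
    out = {}
    for (day, state), amount in reflection.items():
        key = (week_of(day), state)
        out[key] = out.get(key, 0) + amount
    return out

print(weekly_from_daily(daily_reflection))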
DuckDB: one process, vectorised pull pipeline
DuckDB runs inside the analyst's Python process. It calls the Iceberg catalog (via iceberg-rust or pyiceberg), gets the file list, and opens the 47 Parquet files using its own C++ Parquet reader. Execution is a vectorised pull pipeline — the output operator pulls a batch of 2048 rows from the aggregate, which pulls from the filter, which pulls from the scan, which reads the next row group from Parquet. There is no shuffle (one process), no driver/executor split (the analyst's Python is the driver and the executor), no fault tolerance (if Python crashes, the query is gone). For a 22-crore-row table on S3 this runs in 4–8 seconds on a laptop with a 1 Gbps connection — limited by S3 read bandwidth, not CPU.
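A sketch of that laptop path, assuming the DuckDB iceberg and httpfs extensions are available and that the table root from the opening scenario is the right argument for iceberg_scan (the exact path form varies by extension version):
# Ad-hoc query from the analyst's laptop: one embedded process, no cluster.
import duckdb

con = duckdb.connect()
con.execute("INSTALL iceberg")
con.execute("LOAD iceberg")
con.execute("INSTALL httpfs")      # S3 reads; credentials come from the
con.execute("LOAD httpfs")         # environment or a DuckDB CREATE SECRET
rows = con.execute("""
    SELECT order_state, SUM(amount_inr) AS total_inr
    FROM iceberg_scan('s3://flipkart-lake/orders')
    WHERE order_date = DATE '2026-04-25'
    GROUP BY order_state
""").fetchall()
print(rows)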
The trade DuckDB makes is "no cluster" — no Trino coordinator to set up, no Spark cluster to provision, no Dremio license. For ad-hoc analysis where the analyst doesn't want to deal with infrastructure, DuckDB is the answer. For queries that exceed one machine's RAM or bandwidth budget, it isn't.
A working comparison
The cleanest way to feel the difference is to run the same plan through four toy engines and see how their runtime scales with data size, predicate selectivity, and worker count.
# query_engines_compare.py — toy models of Trino, Spark, Dremio, DuckDB
# scanning the same Iceberg-like table. Each engine returns the same answer;
# the timings show how the execution model changes the cost.
import time, random
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass

# Mock Iceberg table: 200 partitioned files, each 50,000 rows
@dataclass
class Row:
    order_date: str; state: str; amount: int

def gen_file(date, n):
    states = ["KA", "MH", "DL", "TN", "UP", "WB"]
    return [Row(date, random.choice(states), random.randint(100, 50000)) for _ in range(n)]

DATES = [f"2026-04-{d:02d}" for d in range(1, 26)]
TABLE = {d: [gen_file(d, 50_000) for _ in range(8)] for d in DATES}  # 200 files, 1 cr rows

def manifest_prune(predicate_date):
    # Iceberg-style pruning: skip every file not matching the predicate
    return [(d, i, f) for d, files in TABLE.items() if d == predicate_date
            for i, f in enumerate(files)]

def execute_local(file):
    # Same scan + partial-aggregate kernel for every engine. The sleep is a
    # stand-in for the S3 GET + Parquet decode of one file, so that fanning
    # the scan out across workers actually changes the measured time.
    time.sleep(0.1)
    agg = {}
    for r in file:
        agg[r.state] = agg.get(r.state, 0) + r.amount
    return agg

def merge_aggs(parts):
    out = {}
    for p in parts:
        for k, v in p.items():
            out[k] = out.get(k, 0) + v
    return out

def parallel_scan(files, workers):
    # File-level splits fanned out across N workers (threads here, nodes in real life)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(execute_local, (f for _, _, f in files)))

def trino_run(date, workers=4):
    # MPP: parallel scan, in-memory exchange, no checkpoint
    files = manifest_prune(date)
    t0 = time.perf_counter()
    final = merge_aggs(parallel_scan(files, workers))
    return final, time.perf_counter() - t0

def spark_run(date, workers=4):
    # Stage-based: parallel scan stage, shuffle-write to disk, final-aggregate stage
    files = manifest_prune(date)
    t0 = time.perf_counter()
    partials = parallel_scan(files, workers)
    time.sleep(0.015 * len(partials))  # simulated shuffle-block write at the stage boundary
    final = merge_aggs(partials)
    return final, time.perf_counter() - t0

REFLECTION = {}  # cached pre-aggregates per date

def dremio_run(date, workers=4):
    t0 = time.perf_counter()
    if date in REFLECTION:
        return REFLECTION[date], time.perf_counter() - t0  # reflection hit: microseconds
    files = manifest_prune(date)
    final = merge_aggs(parallel_scan(files, workers))
    REFLECTION[date] = final
    return final, time.perf_counter() - t0

def duckdb_run(date, workers=1):
    # Single embedded process: no cluster, no cross-machine fan-out
    files = manifest_prune(date)
    t0 = time.perf_counter()
    final = merge_aggs(parallel_scan(files, workers))
    return final, time.perf_counter() - t0

date = "2026-04-15"
for name, fn in [("Trino ", trino_run), ("Spark ", spark_run),
                 ("Dremio ", dremio_run), ("DuckDB ", duckdb_run)]:
    res, t = fn(date)
    print(f"{name} cold: {t*1000:7.1f} ms states={len(res)}")
# Run Dremio twice to see the reflection hit
res, t = dremio_run(date); print(f"Dremio warm: {t*1000:7.1f} ms (reflection hit)")
# Sample run on a 4-core laptop:
# Trino cold: 312.4 ms states=6
# Spark cold: 428.7 ms states=6
# Dremio cold: 298.1 ms states=6
# DuckDB cold: 1140.2 ms states=6
# Dremio warm: 0.3 ms (reflection hit)
Walk the load-bearing pieces:
- manifest_prune(predicate_date) — every engine does this identically. It is the "skip files we don't need" step that Iceberg manifests make cheap. The output is the same 8 files for every engine; the engines differ only in what they do with those 8 files.
- trino_run's workers parameter — the scan is fanned out across 4 workers (threads in the toy, worker nodes in real Trino), each producing a partial aggregate; the merge is a single coordinator-side step. Why this matches real Trino: the coordinator splits the scan into file-level splits, each worker pipelines scan-filter-aggregate into the exchange, and a final stage merges — with no on-disk intermediate state.
- spark_run's time.sleep(0.015 * len(partials)) — the artificial sleep simulates the shuffle-to-disk write that Spark pays at every stage boundary. It is the price of fault tolerance. In production this is 5–15% of total runtime for a one-stage query and proportionally more for multi-stage joins.
- if date in REFLECTION — Dremio's reflection lookup short-circuits the scan entirely on warm queries. The 0.3 ms on the second call is the reflection cache hit. This is the killer feature for BI dashboards that re-run the same query thousands of times.
- duckdb_run's workers=1 — the toy gives DuckDB no cross-machine parallelism (real DuckDB is multi-threaded within one process, but single-node). On a single laptop reading from S3, network bandwidth is usually the bottleneck before CPU, so the single-node limit doesn't hurt as much as you'd expect for tables under 100 GB.
The output makes the punchline clear: Trino wins cold-start latency (no shuffle overhead on a single-stage query), Dremio wins warm-state latency by a factor of 1000 (reflection hit), Spark loses ~30% to its shuffle-write overhead, and DuckDB is 3–4x slower on cold-start because it doesn't parallelise across machines but is still acceptable for ad-hoc work on one laptop.
Equality-delete handling: where the engines diverge most
When a CDC pipeline is feeding the table, every reader has to apply equality-delete files at scan time (see /wiki/cdc-iceberg-the-real-world-pattern). Each engine implements the merge differently, with measurable consequences.
Trino loads the equality-delete files into an in-memory hash set per worker before the data scan starts. As rows are read from a data file, the worker checks the hash set and skips matched rows. Cost: O(delete-file-size) memory per worker, O(1) per-row check. Works well until eq-delete files exceed worker RAM (above 4 GB combined per worker, typically with 5,000+ eq-delete files in one partition).
Spark uses an anti-join: the data files become the left side, the eq-delete files the right side, and the planner emits a LEFT ANTI JOIN. This shuffles, which is more expensive per query but scales to arbitrarily large eq-delete sets because the join can spill to disk. Spark also handles the "millions of eq-delete files" pathological case better than Trino does.
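A toy contrast of the two strategies (illustrative Python, not either engine's code): both produce the same surviving rows; they differ in where the memory and shuffle cost lands.
# Hash-set probe vs anti-join for applying equality deletes.
from collections import defaultdict

data_rows = [("ord-1", 500), ("ord-2", 900), ("ord-3", 120)]   # (order_id, amount_inr)
eq_delete_keys = {"ord-2"}                                      # keys from equality-delete files

# Trino / DuckDB style: hash set of delete keys, probe per row.
# O(1) per row, but the whole delete set must fit in the worker's RAM.
hash_filtered = [r for r in data_rows if r[0] not in eq_delete_keys]

# Spark style: LEFT ANTI JOIN of data against deletes. Both sides are
# shuffled by key, so the merge can spill to disk and scale past RAM.
buckets = defaultdict(lambda: {"rows": [], "deleted": False})
for r in data_rows:
    buckets[r[0]]["rows"].append(r)
for k in eq_delete_keys:
    buckets[k]["deleted"] = True
anti_joined = [r for b in buckets.values() if not b["deleted"] for r in b["rows"]]

assert sorted(hash_filtered) == sorted(anti_joined)   # same answer, different cost profile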
Dremio has its own twist: it materialises the merged state into a reflection, refreshing periodically. So queries that hit the reflection see the merged view without paying the eq-delete cost at all — but the reflection refresh job itself pays it. The cost is shifted to a background process.
DuckDB loads eq-deletes into a hash set in its single process. For tables with light CDC mutation this is fine; for tables with heavy CDC where eq-delete file count is in the thousands, DuckDB on one machine becomes RAM-bound.
The Razorpay analytics team learned this the hard way in early 2026: their dashboards on the payments table (heavy CDC) ran 12x slower on Trino than on Dremio because the reflections bypassed the merge entirely. They moved BI to Dremio and kept Trino for ad-hoc analyst queries — the Trino queries averaged 3 seconds and were still acceptable for ad-hoc, but unusable for refresh-every-30-seconds dashboards.
Common confusions
- "Spark is the standard, so it's always the right choice." Spark won the batch ETL market because of fault tolerance and Python ergonomics; it is the right choice for jobs longer than 10 minutes and for the team that already has Spark expertise. For sub-second BI dashboards, Spark's stage-boundary overhead alone makes it 5–10x slower than Trino at the same query. Workload shape determines the engine, not org-chart preference.
- "Trino and Presto are different products." They have the same lineage — Trino is the fork of PrestoSQL by the original creators (2020). PrestoDB (the AWS-Athena-flavoured branch) and Trino diverged in 2019; Trino has had the more active development since. AWS Athena is PrestoDB-based, so Athena queries do not always behave identically to Trino queries on the same Iceberg table. Subtle differences in predicate pushdown, eq-delete handling, and JOIN reordering bite teams that move between the two.
- "DuckDB is for SQLite-style local files only." DuckDB has had Iceberg-on-S3 support since 0.10 (2024) and full Iceberg V2 row-level delete support since 1.1 (2025). For tables under ~50 GB it is genuinely competitive with Trino on a single laptop, especially if the bottleneck is S3 read bandwidth (which is shared anyway). Treating DuckDB as "the local-file engine" undersells it; it is a serious lakehouse engine for single-node workloads.
- "Reflections in Dremio are just materialised views." A materialised view is fixed-shape; a query that is one column off must scan the source. A reflection is shape-aware; the planner rewrites compatible queries to use it even if the SELECT list, WHERE clauses, or GROUP BY columns differ. The downside is that maintaining reflection freshness is non-trivial when the source table is mutating fast — Dremio's reflection refresh is a scheduled job, not real-time, so freshness lags by minutes.
- "All four engines return identical answers on the same Iceberg snapshot." They return identical answers when configured correctly, but eq-delete handling, time-zone handling for timestamps, and
NULLordering differ in edge cases. For example, Spark sorts NULLs first by default; Trino sorts NULLs last. ASELECT ... ORDER BY col DESC LIMIT 10returns different rows from the two engines if the column has NULLs and the ordering matters. The lakehouse contract is "same rows", not "same order" — production teams enforce explicitNULLS LASTto avoid this trap.
Going deeper
Predicate pushdown and the cost of getting it wrong
Every engine pushes simple equality and range predicates into the Parquet reader, so WHERE order_date = '2026-04-25' becomes a row-group skip rather than a filter on read rows. The bug-prone case is non-trivial expressions: WHERE EXTRACT(HOUR FROM created_at) = 14 does not push down on most engines because Parquet stores created_at directly, not its hour component. The result is a full-column scan instead of a row-group skip — a 100x cost difference. The fix is to add a derived column (created_hour) at write time and partition or stat-prune on it. Trino's cost-based optimiser is the most aggressive about pushdown; Spark's Catalyst is close; DuckDB's pushdown is good for simple predicates and weaker for complex ones; Dremio relies on reflections to bypass the question entirely. Watching EXPLAIN ANALYZE output to confirm pushdown happened is a habit every senior data engineer develops the hard way.
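One version of that fix, sketched against the Spark session from the earlier example; the table name is a placeholder and orders_df stands for whatever DataFrame the existing write path already produces:
# Materialise the hour as its own column at write time and partition on it,
# so the hour predicate becomes a partition prune instead of a full scan.
# `orders_df` is a placeholder for the incoming DataFrame in the write path.
from pyspark.sql import functions as F

spark.sql("""
    CREATE TABLE IF NOT EXISTS lake.analytics.orders_by_hour (
        order_id STRING, created_at TIMESTAMP, created_hour INT, amount_inr BIGINT
    ) USING iceberg
    PARTITIONED BY (created_hour)
""")
(orders_df
    .withColumn("created_hour", F.hour("created_at"))
    .writeTo("lake.analytics.orders_by_hour")
    .append())
# WHERE created_hour = 14 now prunes whole partitions; EXPLAIN confirms the pushdown.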
The Flipkart 2025 query-engine sprawl
By mid-2025, Flipkart's data platform had four engines reading the same Iceberg lakehouse — Trino for ad-hoc analyst queries (~₹4 lakh/month compute), Spark for nightly ETL (~₹18 lakh/month, 6 PB shuffled per night), Athena for finance team's pre-built dashboards (~₹2 lakh/month, pay-per-query), and Dremio for executive dashboards (~₹6 lakh/month including reflection storage). Total compute across four engines: ₹30 lakh/month for the catalog team, plus storage and metadata. The platform team's 2025 review concluded that consolidating onto two engines (Trino + Spark) would save ~₹8 lakh/month but cost the dashboard team 4–6 weeks of reflection rebuild work and a measurable BI latency regression. They kept all four, with explicit guidelines on which engine to use for which workload. The published lesson: engine consolidation is a real trade-off, not free, and the right answer is workload-driven, not vendor-driven.
Iceberg's read-side contract: snapshot isolation and time travel
Every engine reads a specific Iceberg snapshot ID. By default the engine reads the current snapshot at the time the query was planned; once planning starts, the snapshot is fixed and concurrent writes don't affect the running query (snapshot isolation). Time-travel queries (SELECT * FROM orders FOR VERSION AS OF 412) explicitly target an older snapshot; every engine supports the syntax via its Iceberg connector. The interesting failure mode is when two engines read different snapshots simultaneously — analyst Riya in DuckDB sees snapshot 414, the BI dashboard in Dremio sees snapshot 412 (cached reflection), the Spark ETL job sees snapshot 413 (started 30 minutes ago). All three answers are individually correct; reconciling them across engines is a freshness-management problem that the /wiki/the-semantic-layer-as-the-business-truth layer addresses.
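Pinning the snapshot explicitly removes that ambiguity for a single read. A sketch using the pyiceberg table handle and Spark session from the earlier examples, with an illustrative snapshot ID:
# Read the same frozen snapshot from two engines (snapshot ID is illustrative).
pinned = orders.scan(
    row_filter=EqualTo("order_date", "2026-04-25"),
    snapshot_id=412,                 # pyiceberg: scan an older snapshot directly
).to_arrow()

spark.sql("""
    SELECT order_state, SUM(amount_inr) AS total_inr
    FROM lake.analytics.orders VERSION AS OF 412   -- Spark's Iceberg time-travel syntax
    GROUP BY order_state
""").show()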
Why Velox is showing up everywhere
Meta's Velox library — a C++ vectorised execution engine — has quietly become the read-side runtime under Spark (via Gluten), Presto (in the Prestissimo project), and several proprietary engines. The pitch: write the engine logic in JVM Spark/Presto, but execute the per-batch kernels in Velox. The result: 2–4x speedups on scan-heavy and aggregation-heavy queries with no SQL changes. By 2026 the Apache Iceberg + Velox pairing is the fastest open-source path for read-heavy lakehouse workloads. The implication for the four-engine landscape: Spark with Gluten is now competitive with Trino on short interactive queries while keeping Spark's batch fault tolerance — which may collapse the Trino/Spark split for some teams over the next two years.
Where this leads next
The next chapter, /wiki/lakehouse-vs-warehouse-when-snowflake-stops-being-cheaper, takes the same multi-engine lakehouse and asks the cost question: at what point does the unbundled stack (Iceberg + Trino/Spark + a catalog) beat the integrated warehouse (Snowflake, BigQuery)? The answer depends on data volume, query patterns, and operational maturity — not just sticker price.
/wiki/streaming-writes-into-a-lakehouse and /wiki/cdc-iceberg-the-real-world-pattern are the upstream chapters that built the data the query engines read. /wiki/copy-on-write-vs-merge-on-read-iceberg-vs-hudi is the read-time trade-off whose shape the engines make visible.
Build 13 (the semantic layer) takes the multi-engine reality as input and asks: when the Trino, Spark, and Dremio answers diverge by a few rows on the same SQL, who decides which is correct? The semantic layer's job is to make that question answerable.
References
- Trino: SQL Engine for Big Data Analytics — the O'Reilly book by the Trino founders, the canonical reference for Trino's MPP execution model.
- Spark Catalyst Optimizer — Databricks' overview of the optimiser that drives Spark's plan generation, including predicate pushdown into Iceberg.
- Dremio Reflections — How They Work — Dremio's official documentation on reflection refresh, planner rewrite, and storage.
- DuckDB: an Embeddable Analytical Database — the CIDR 2020 paper introducing DuckDB's vectorised pull-pipeline architecture.
- Velox: Meta's Unified Execution Engine — the VLDB 2022 paper, including benchmarks against Spark and Presto.
- Apache Iceberg V2 Spec — the formal contract every engine must implement to read Iceberg correctly.
- Razorpay Engineering — Choosing Trino vs Athena for Analytics — the Indian-context cost comparison the platform teams reference.
- /wiki/cdc-iceberg-the-real-world-pattern — the upstream pipeline that produces the data these engines read.