Query engines on top: Trino, Spark, Dremio, DuckDB

It is Tuesday morning at Flipkart. The catalog team has just finished the CDC → Iceberg pipeline for the orders table — 22 crore rows, partitioned by order_date, freshly fed every minute, sitting in s3://flipkart-lake/orders/. The analytics team wants three things from this same table: a Tableau dashboard that refreshes every 30 seconds, a nightly machine-learning feature job that joins orders against six other tables, and an analyst running ad-hoc SELECT COUNT(*) WHERE... queries from her laptop. One Iceberg table. Three workloads. The naive answer "use Spark, it's the standard" gives a 14-second dashboard query and a frustrated PM. The right answer is that Trino runs the dashboard, Spark runs the nightly feature job, and DuckDB runs the analyst's laptop queries — and the same Iceberg table serves all three with no copies. This chapter is the comparison: same files, four engines, different theories of how to read them.

Trino, Spark, Dremio, and DuckDB all read Iceberg, all push down predicates, and all skip data files using the manifest. They diverge on three axes: execution model (long-lived MPP daemon vs JVM job vs C++ embedded process), join strategy (broadcast vs shuffle vs reflection), and the question of where state lives during a query. The right engine for a workload is the one whose execution model matches the query shape — interactive low-latency, batch ETL, BI-with-acceleration, or single-node ad-hoc.

What "query engine on top" actually means

The lakehouse split that Iceberg ratified is: storage is files (Parquet on S3), catalog is metadata (Iceberg snapshots, manifests), and the query engine is interchangeable. A Trino query and a Spark query reading the same Iceberg snapshot must return the same rows — the snapshot is the contract. What changes between engines is not what you read but how and how fast you read it.

The mechanical sequence every engine follows is the same:

  1. Catalog lookup — ask Iceberg "give me snapshot 414's manifest list".
  2. Manifest pruning — read manifest files, skip whole files based on partition values and column min/max stats.
  3. Plan — build a physical plan (scan → filter → project → aggregate → join → output).
  4. Distribute — split the plan across workers / threads.
  5. Execute — open Parquet files, read row groups, apply predicates, exchange data, return rows.
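
The first three steps can be sketched in a few lines. This is an illustrative toy, not Iceberg's actual API: Snapshot, ManifestEntry, and plan_scan are hypothetical names, and the catalog is an in-memory dict.

```python
# Toy read path, steps 1-3: catalog lookup, manifest pruning on partition
# values and column min/max stats, then the scan node's file list.
from dataclasses import dataclass

@dataclass
class ManifestEntry:
    path: str
    partition: str       # e.g. the order_date value for this file
    min_amount: int      # per-column stats recorded at write time
    max_amount: int

@dataclass
class Snapshot:
    snapshot_id: int
    manifest: list       # list[ManifestEntry]

CATALOG = {414: Snapshot(414, [
    ManifestEntry("s3://lake/orders/d=2026-04-25/f1.parquet", "2026-04-25", 100, 9000),
    ManifestEntry("s3://lake/orders/d=2026-04-25/f2.parquet", "2026-04-25", 20000, 50000),
    ManifestEntry("s3://lake/orders/d=2026-04-24/f3.parquet", "2026-04-24", 100, 50000),
])}

def plan_scan(snapshot_id, part_pred, min_amount=None):
    snap = CATALOG[snapshot_id]                       # 1. catalog lookup
    files = [e for e in snap.manifest
             if e.partition == part_pred              # 2a. partition pruning
             and (min_amount is None or e.max_amount >= min_amount)]  # 2b. stats pruning
    return [e.path for e in files]                    # 3. input to the physical plan

# Partition pruning drops f3; the stats predicate amount >= 10000 drops f1 too.
print(plan_scan(414, "2026-04-25", min_amount=10000))
```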

The engines differ at every step from #3 onwards. Trino plans for low-latency interactive workloads on a long-running cluster. Spark plans for fault-tolerant batch jobs that may run for hours. Dremio plans for BI dashboards with pre-computed reflections that bypass the actual scan. DuckDB plans for a single process on a single laptop with no shuffle layer. Each is the right answer to a different question.

[Figure: four query engines reading the same Iceberg table. The Iceberg table on S3 (s3://flipkart-lake/orders/: snapshot 414, 12 manifests, ~2,340 Parquet data files of ~128 MB each, 128 equality-delete files from the CDC merge, per-column min/max stats) sits at the bottom; the four engines sit above and connect to the same files, each annotated with its execution model. Trino: long-lived MPP daemon, coordinator + N workers, in-memory pipelines, no mid-query fault tolerance, target 100ms–10s queries. Spark: JVM batch DAG, driver + N executors, stage-based shuffle, stage-level retries, target minutes–hours. Dremio: BI tier with pre-computed reflections, Arrow-native execution, substitutes the scan with a rollup, target BI dashboards. DuckDB: in-process single node, embedded C++ library, vectorised pull pipeline, no cluster, target laptop or one node.]
The lakehouse split. Storage and catalog are shared; the query engine is interchangeable. Each engine reads the same manifest, opens the same Parquet files, and applies the same equality-delete files — but the planner, the execution model, and the failure semantics differ enough that picking the wrong engine for the workload turns a 200ms query into a 40s query or vice versa.

How each engine actually executes a query

The same SQL, SELECT order_state, SUM(amount_inr) FROM orders WHERE order_date = '2026-04-25' GROUP BY order_state, takes a different path through each engine.

Trino: long-lived MPP, no checkpoints, in-memory exchange

Trino's coordinator parses the SQL, builds a logical plan, and asks Iceberg's connector for the manifest list of the current snapshot. The connector returns a list of (file_path, partition_value, row_count) tuples; the coordinator filters them by order_date = '2026-04-25' (manifest pruning, no S3 read for non-matching files) and produces a list of, say, 47 Parquet files to scan. The coordinator slices those 47 files into 47 splits and assigns them to workers. Each worker opens its split, reads only the amount_inr and order_state columns (Parquet's column projection), applies the predicate, and pipelines partial aggregates into shuffle. A second stage receives the shuffled partials, finalises the GROUP BY, and streams rows to the coordinator, which streams them to the client. The whole thing runs as a single physical pipeline with no intermediate state on disk — fast, but if a worker dies mid-query, the entire query fails and the client retries.
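
The two-stage shape of that pipeline — partial aggregates per split, a hash-partitioned exchange, final aggregates per key range — can be sketched as a toy. The function names and the in-memory "exchange" are illustrative, not Trino internals:

```python
# Toy of Trino's two-stage GROUP BY: stage-1 workers produce partial
# aggregates from their splits, an in-memory exchange hash-partitions the
# partials by key, stage-2 workers finalise.
from collections import defaultdict

def stage1_partial(split):                 # split: list of (state, amount) rows
    agg = defaultdict(int)
    for state, amount in split:
        agg[state] += amount
    return dict(agg)

def exchange(partials, n_stage2):          # hash-partition keys across stage-2 workers
    buckets = [defaultdict(int) for _ in range(n_stage2)]
    for part in partials:
        for state, total in part.items():
            buckets[hash(state) % n_stage2][state] += total
    return buckets

def stage2_final(bucket):
    return dict(bucket)

rows = [("KA", 100), ("MH", 200), ("KA", 300), ("DL", 50)]
splits = [rows[0::2], rows[1::2]]          # two splits, two stage-1 workers
partials = [stage1_partial(s) for s in splits]
final = {}
for bucket in exchange(partials, n_stage2=2):
    final.update(stage2_final(bucket))
print(final)   # e.g. {'KA': 400, 'MH': 200, 'DL': 50} (key order varies)
```

Nothing here touches disk between the stages; that is the property the next paragraph prices out.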

Why Trino's no-checkpoint design is faster for short queries: a checkpoint costs roughly 100ms of disk write per stage boundary in Spark. For a 200ms query that's 50% overhead with zero benefit because the query is too fast for fault tolerance to matter. Trino skips the checkpoint and pipelines stages directly into each other, accepting that a failure mid-query forces a retry. For interactive BI where the user re-clicks anyway on failure, the trade-off is right.

Spark: stage-based DAG, shuffle-to-disk, retryable

Spark's driver builds a logical plan from the SQL, optimises it with Catalyst, and produces a physical DAG. The DAG has explicit stage boundaries — every shuffle is a stage cut. For our query, stage 1 is the scan + partial aggregate (one task per Parquet split, say 47 tasks); stage 2 is the final aggregate (one task per order_state value, say 28 tasks). Between stage 1 and stage 2, every task in stage 1 writes its partial aggregates to local SSD as shuffle blocks; every task in stage 2 reads only the blocks for its keys. If a stage-2 task dies, the driver retries it without re-running stage 1. This makes Spark survive losses of individual workers in a 6-hour ETL job, at the cost of writing every shuffle to disk.

Why Spark's shuffle-to-disk wins for long jobs: a 6-hour job has a meaningful probability of losing at least one worker (Spot instance reclamation, OOM kills, driver-induced restarts). Without checkpointing, that single failure restarts the whole 6 hours. Spark's stage boundaries are the granularity of retry — a failed stage costs minutes, not hours. The disk-write overhead per shuffle (5–15% of total runtime in our experience at Flipkart) buys the ability to lose 5–10% of executors mid-job and still complete.
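
A back-of-envelope model makes the trade concrete. The failure rate, stage count, and 10% shuffle tax below are illustrative assumptions, not Flipkart measurements:

```python
# Expected runtime of a long job: whole-job restarts vs per-stage retries.
def expected_runtime_restart(job_hours, p_fail_per_hour):
    # No checkpoints: any failure restarts everything; a geometric number of
    # attempts, each costing the full job (partial progress ignored).
    p_success = (1 - p_fail_per_hour) ** job_hours
    return job_hours / p_success

def expected_runtime_staged(job_hours, p_fail_per_hour, stages, shuffle_overhead):
    # Stage retries: a failure re-runs one stage, not the whole job,
    # at the price of a shuffle-write tax on every stage.
    stage_hours = job_hours / stages
    p_stage_ok = (1 - p_fail_per_hour) ** stage_hours
    return stages * (stage_hours / p_stage_ok) * (1 + shuffle_overhead)

no_ckpt = expected_runtime_restart(6, 0.05)          # 6h job, 5%/hour failure rate
staged  = expected_runtime_staged(6, 0.05, stages=12, shuffle_overhead=0.10)
print(f"no checkpoints: {no_ckpt:.1f}h   staged + 10% shuffle tax: {staged:.1f}h")
```

Under these assumptions the staged job finishes in ~6.8 expected hours against ~8.2 for the restart-from-zero job, and the gap widens as the job gets longer or the failure rate rises.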

Dremio: substitute the scan with a reflection

Dremio's planner does the standard manifest pruning, but before executing the scan, it checks whether a reflection — a pre-computed materialised aggregate stored as Arrow/Parquet on local SSD — covers this query. If a reflection exists for (order_date, order_state) → SUM(amount_inr), the planner rewrites the query to scan the reflection (a few KB of pre-aggregated rows) instead of the source table (47 files of granular orders). The query that was a 1.2-second full scan becomes a 25ms reflection lookup. Reflections are refreshed in the background as the source table changes; the planner consults a refresh manifest to know whether a reflection is fresh enough for the query's freshness requirement.

This is Dremio's distinguishing bet: BI dashboards run the same 50 queries a thousand times a day, and the right answer is to pre-compute them. The cost is reflection storage (typically 5–15% of source size) and reflection refresh cost (incremental, but non-zero). The pay-off is sub-second p99 on a fixed set of analytical patterns. Why a reflection isn't just a materialised view: a materialised view answers exactly one query shape. A reflection is shape-aware — it can serve any query whose logical plan can be rewritten to use it (e.g., a daily reflection serves weekly queries by re-aggregating). Dremio's planner does the rewrite automatically; the user just writes plain SQL.
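
The shape-aware rewrite can be sketched as a coverage check plus a re-aggregation. This is a deliberately simplified model — the covers and rewrite_weekly_by_state functions and the "derivable dimension" rule are illustrative, not Dremio's matcher:

```python
# A daily (order_date, state) -> SUM(amount) reflection serving a weekly query.
DAILY_REFLECTION = {   # pre-computed rollup, as a dict for the sketch
    ("2026-04-20", "KA"): 500, ("2026-04-21", "KA"): 700,
    ("2026-04-20", "MH"): 300, ("2026-04-21", "MH"): 400,
}

def covers(query_dims, reflection_dims):
    # The reflection serves the query if every queried dimension is either
    # stored directly or derivable from a stored one (here: week from date).
    derivable = {"week": "order_date"}
    return all(d in reflection_dims or derivable.get(d) in reflection_dims
               for d in query_dims)

def rewrite_weekly_by_state(reflection):
    # Re-aggregate the daily rollup to (week, state) instead of scanning orders.
    out = {}
    for (order_date, state), total in reflection.items():
        week = order_date[:8] + "W"   # toy week bucket; real planners use date math
        out[(week, state)] = out.get((week, state), 0) + total
    return out

assert covers(["week", "state"], ["order_date", "state"])
print(rewrite_weekly_by_state(DAILY_REFLECTION))
# → {('2026-04-W', 'KA'): 1200, ('2026-04-W', 'MH'): 700}
```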

DuckDB: one process, vectorised pull pipeline

DuckDB runs inside the analyst's Python process. It calls the Iceberg catalog (via iceberg-rust or pyiceberg), gets the file list, and opens the 47 Parquet files using its own C++ Parquet reader. Execution is a vectorised pull pipeline — the output operator pulls a batch of 2048 rows from the aggregate, which pulls from the filter, which pulls from the scan, which reads the next row group from Parquet. There is no shuffle (one process), no driver/executor split (the analyst's Python is the driver and the executor), no fault tolerance (if Python crashes, the query is gone). For a 22-crore-row table on S3 this runs in 4–8 seconds on a laptop with a 1 Gbps connection — limited by S3 read bandwidth, not CPU.
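
The pull pipeline maps naturally onto Python generators. A minimal sketch, with illustrative operators rather than DuckDB's actual ones — each stage pulls batches from its child, and nothing runs until the sink pulls:

```python
# Vectorised pull pipeline: sink pulls from aggregate, which pulls from
# filter, which pulls from scan, batch by batch.
def scan(rows, batch=2048):
    for i in range(0, len(rows), batch):
        yield rows[i:i + batch]                  # leaf: emit fixed-size batches

def filt(child, pred):
    for b in child:
        out = [r for r in b if pred(r)]
        if out:
            yield out                            # pass surviving rows upward

def aggregate(child):
    total = 0
    for b in child:
        total += sum(amount for _, amount in b)  # consume every batch
    yield total

rows = [("KA", 10)] * 5000 + [("MH", 5)] * 1000
pipeline = aggregate(filt(scan(rows), pred=lambda r: r[0] == "KA"))
print(next(pipeline))   # 50000 — the sink pulled everything through the chain
```

The batch size (2048 rows here, matching the prose) is the unit of work: big enough to amortise per-call overhead, small enough to stay in CPU cache.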

The trade DuckDB makes is "no cluster" — no Trino coordinator to set up, no Spark cluster to provision, no Dremio license. For ad-hoc analysis where the analyst doesn't want to deal with infrastructure, DuckDB is the answer. For queries that exceed one machine's RAM or bandwidth budget, it isn't.

A working comparison

The cleanest way to feel the difference is to run the same plan through four toy engines and see how their runtime scales with data size, predicate selectivity, and worker count.

# query_engines_compare.py — toy models of Trino, Spark, Dremio, DuckDB
# scanning the same Iceberg-like table. Each engine returns the same answer;
# the timings show how the execution model changes the cost.

import time, random
from dataclasses import dataclass

# Mock Iceberg table: 200 partitioned files, each 50,000 rows
@dataclass
class Row:
    order_date: str
    state: str
    amount: int

def gen_file(date, n):
    states = ["KA","MH","DL","TN","UP","WB"]
    return [Row(date, random.choice(states), random.randint(100, 50000)) for _ in range(n)]

DATES = [f"2026-04-{d:02d}" for d in range(1, 26)]
TABLE = {d: [gen_file(d, 50_000) for _ in range(8)] for d in DATES}   # 200 files, 1 cr rows

def manifest_prune(predicate_date):
    # Iceberg-style pruning: skip every file not matching the predicate
    return [(d, i, f) for d, files in TABLE.items() if d == predicate_date
                       for i, f in enumerate(files)]

def execute_local(file): # same kernel for every engine
    agg = {}
    for r in file:
        agg[r.state] = agg.get(r.state, 0) + r.amount
    return agg

def merge_aggs(parts):
    out = {}
    for p in parts:
        for k, v in p.items(): out[k] = out.get(k, 0) + v
    return out

def trino_run(date, workers=4):
    # MPP model: files split across workers, in-memory exchange, no checkpoint.
    # (The toy runs its chunks sequentially; real Trino scans them in parallel.)
    files = manifest_prune(date)
    t0 = time.perf_counter()
    chunks = [files[i::workers] for i in range(workers)]
    partials = [merge_aggs([execute_local(f) for _, _, f in c]) for c in chunks]
    final = merge_aggs(partials)
    return final, time.perf_counter() - t0

def spark_run(date, workers=4):
    # Stage-based: scan stage + checkpoint sim + final stage
    files = manifest_prune(date)
    t0 = time.perf_counter()
    partials = [execute_local(f) for _, _, f in files]
    time.sleep(0.05 * len(partials) / 200)   # simulate shuffle-write to disk
    final = merge_aggs(partials)
    return final, time.perf_counter() - t0

REFLECTION = {}   # cached pre-aggregates per date
def dremio_run(date, workers=4):
    t0 = time.perf_counter()
    if date in REFLECTION:
        return REFLECTION[date], time.perf_counter() - t0   # microseconds
    files = manifest_prune(date)
    final = merge_aggs(execute_local(f) for _, _, f in files)
    REFLECTION[date] = final
    return final, time.perf_counter() - t0

def duckdb_run(date, workers=1):
    files = manifest_prune(date)
    t0 = time.perf_counter()
    final = merge_aggs(execute_local(f) for _, _, f in files)   # single process
    return final, time.perf_counter() - t0

date = "2026-04-15"
for name, fn in [("Trino   ", trino_run), ("Spark   ", spark_run),
                 ("Dremio  ", dremio_run), ("DuckDB  ", duckdb_run)]:
    res, t = fn(date)
    print(f"{name} cold: {t*1000:7.1f} ms   states={len(res)}")

# Run Dremio twice to see the reflection hit
res, t = dremio_run(date); print(f"Dremio   warm: {t*1000:7.1f} ms (reflection hit)")

# Illustrative pattern (toy model, one Python process; treat the shape,
# not the absolute numbers, as the signal):
# - All four cold runs land in the same ballpark here, with Spark slowest
#   because it pays the simulated shuffle-write sleep.
# - The Dremio warm run returns in microseconds: the reflection hit.

Walk the load-bearing pieces:

The toy makes part of the punchline visible: Spark loses its simulated shuffle-write time on every cold run, and Dremio's warm run wins by orders of magnitude because the reflection skips the scan entirely. The two effects a one-process toy cannot show are the real separators in production: Trino's cross-machine parallelism wins cold-start latency, and DuckDB runs 3–4x slower on cold-start because it cannot parallelise beyond one machine, which is still acceptable for ad-hoc work on a laptop.

[Figure: query latency by engine and query shape. A grouped bar chart: four engines on the x-axis (Trino, Spark, Dremio, DuckDB), three query types as colour-coded bars (short interactive query on 1 file, warm BI dashboard query, batch ETL query on 8 files); workload is a GROUP BY on one day's orders, 50 lakh rows after pruning; y-axis from 0 to 4s+, lower is better. Trino wins interactive, Dremio wins dashboards, Spark wins long batch jobs, DuckDB wins ad-hoc single-node queries.]
Same query shape, four engines, three workload patterns. Trino wins short interactive queries (no shuffle overhead). Dremio wins warm BI dashboards by a factor of 5–10x because the reflection skips the scan. Spark wins on the long batch job because its retry semantics let it run for hours without restarting from zero. DuckDB sits in between — fast for ad-hoc, but doesn't scale across machines. The right engine for a workload is the one whose curve sits lowest at that workload's data shape.

Equality-delete handling: where the engines diverge most

When a CDC pipeline is feeding the table, every reader has to apply equality-delete files at scan time (see /wiki/cdc-iceberg-the-real-world-pattern). Each engine implements the merge differently, with measurable consequences.

Trino loads the equality-delete files into an in-memory hash set per worker before the data scan starts. As rows are read from a data file, the worker checks the hash set and skips matched rows. Cost: O(delete-file-size) memory per worker, O(1) per-row check. Works well until eq-delete files exceed worker RAM (above 4 GB combined per worker, typically with 5,000+ eq-delete files in one partition).
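
The hash-set approach described above reduces to a few lines. A sketch with illustrative keys and row shapes, not Trino's implementation:

```python
# Equality deletes via an in-memory hash set: O(delete-size) memory,
# O(1) membership check per scanned row.
def load_delete_set(eq_delete_files):
    deleted = set()
    for f in eq_delete_files:
        deleted.update(f)               # each file: iterable of deleted keys
    return deleted

def scan_with_deletes(data_rows, deleted):
    return [row for row in data_rows if row["order_id"] not in deleted]

data = [{"order_id": 1, "state": "KA"}, {"order_id": 2, "state": "MH"},
        {"order_id": 3, "state": "DL"}]
deletes = load_delete_set([[2], [3]])   # two eq-delete files
print(scan_with_deletes(data, deletes))
# → [{'order_id': 1, 'state': 'KA'}]
```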

Spark uses an anti-join: the data files become the left side, the eq-delete files the right side, and the planner emits a LEFT ANTI JOIN. This shuffles, which is more expensive per query but scales to arbitrarily large eq-delete sets because the join can spill to disk. Spark also handles the "millions of eq-delete files" pathological case better than Trino does.
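
The same delete semantics expressed as an anti-join can be sketched with plain Python partitioning, which shows why it scales: both sides are hash-partitioned on the key, so each partition joins independently and could spill to disk. Illustrative, not Spark's implementation:

```python
# LEFT ANTI JOIN of data files against eq-delete keys, co-partitioned by key.
def hash_partition(keys_rows, n):
    parts = [[] for _ in range(n)]
    for key, row in keys_rows:
        parts[hash(key) % n].append((key, row))
    return parts

def left_anti_join(data_rows, delete_keys, n_partitions=4):
    left = hash_partition([(r["order_id"], r) for r in data_rows], n_partitions)
    right = hash_partition([(k, None) for k in delete_keys], n_partitions)
    out = []
    for lp, rp in zip(left, right):       # matching keys land in matching partitions
        dead = {k for k, _ in rp}
        out.extend(row for k, row in lp if k not in dead)
    return out

data = [{"order_id": i} for i in range(6)]
print(left_anti_join(data, delete_keys=[1, 4]))
```

Per-partition state is bounded by the partition's share of the delete keys, which is what lets the approach handle eq-delete sets far larger than any single worker's RAM.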

Dremio has its own twist: it materialises the merged state into a reflection, refreshing periodically. So queries that hit the reflection see the merged view without paying the eq-delete cost at all — but the reflection refresh job itself pays it. The cost is shifted to a background process.

DuckDB loads eq-deletes into a hash set in its single process. For tables with light CDC mutation this is fine; for tables with heavy CDC where eq-delete file count is in the thousands, DuckDB on one machine becomes RAM-bound.

The Razorpay analytics team learned this the hard way in early 2026: their dashboards on the payments table (heavy CDC) ran 12x slower on Trino than on Dremio because the reflections bypassed the merge entirely. They moved BI to Dremio and kept Trino for ad-hoc analyst queries — the Trino queries averaged 3 seconds and were still acceptable for ad-hoc, but unusable for refresh-every-30-seconds dashboards.

Common confusions

Going deeper

Predicate pushdown and the cost of getting it wrong

Every engine pushes simple equality and range predicates into the Parquet reader, so WHERE order_date = '2026-04-25' becomes a row-group skip rather than a filter on read rows. The bug-prone case is non-trivial expressions: WHERE EXTRACT(HOUR FROM created_at) = 14 does not push down on most engines because Parquet stores created_at directly, not its hour component. The result is a full-column scan instead of a row-group skip — a 100x cost difference. The fix is to add a derived column (created_hour) at write time and partition or stat-prune on it. Trino's cost-based optimiser is the most aggressive about pushdown; Spark's Catalyst is close; DuckDB's pushdown is good for simple predicates and weaker for complex ones; Dremio relies on reflections to bypass the question entirely. Watching EXPLAIN ANALYZE output to confirm pushdown happened is a habit every senior data engineer develops the hard way.
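
The row-group arithmetic behind that 100x claim is easy to model. Toy row groups with illustrative stats — the point is that skipping works on stored min/max, and there are no stats for an expression over a column:

```python
# Row-group skipping: possible on a stored derived column, impossible on
# EXTRACT(HOUR FROM created_at) because no stats exist for the expression.
ROW_GROUPS = [
    {"min_created_hour": 0,  "max_created_hour": 7,  "rows": 1_000_000},
    {"min_created_hour": 8,  "max_created_hour": 15, "rows": 1_000_000},
    {"min_created_hour": 16, "max_created_hour": 23, "rows": 1_000_000},
]

def groups_scanned_with_derived_column(hour):
    # Pushdown on created_hour: skip groups whose [min, max] excludes the value.
    return [g for g in ROW_GROUPS
            if g["min_created_hour"] <= hour <= g["max_created_hour"]]

def groups_scanned_with_expression():
    # No stats for EXTRACT(HOUR FROM created_at), so nothing can be ruled out.
    return list(ROW_GROUPS)

print(len(groups_scanned_with_derived_column(14)),   # 1 group scanned
      len(groups_scanned_with_expression()))         # all 3 groups scanned
```

Scale the same ratio to a table with hundreds of row groups per file and thousands of files and the gap between "one group" and "every group" is the 100x in the prose.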

The Flipkart 2025 query-engine sprawl

By mid-2025, Flipkart's data platform had four engines reading the same Iceberg lakehouse — Trino for ad-hoc analyst queries (~₹4 lakh/month compute), Spark for nightly ETL (~₹18 lakh/month, 6 PB shuffled per night), Athena for finance team's pre-built dashboards (~₹2 lakh/month, pay-per-query), and Dremio for executive dashboards (~₹6 lakh/month including reflection storage). Total compute across four engines: ₹30 lakh/month for the catalog team, plus storage and metadata. The platform team's 2025 review concluded that consolidating onto two engines (Trino + Spark) would save ~₹8 lakh/month but cost the dashboard team 4–6 weeks of reflection rebuild work and a measurable BI latency regression. They kept all four, with explicit guidelines on which engine to use for which workload. The published lesson: engine consolidation is a real trade-off, not free, and the right answer is workload-driven, not vendor-driven.

Iceberg's read-side contract: snapshot isolation and time travel

Every engine reads a specific Iceberg snapshot ID. By default the engine reads the current snapshot at the time the query was planned; once planning starts, the snapshot is fixed and concurrent writes don't affect the running query (snapshot isolation). Time-travel queries (SELECT * FROM orders FOR VERSION AS OF 412) explicitly target an older snapshot; every engine supports the syntax via its Iceberg connector. The interesting failure mode is when two engines read different snapshots simultaneously — analyst Riya in DuckDB sees snapshot 414, the BI dashboard in Dremio sees snapshot 412 (cached reflection), the Spark ETL job sees snapshot 413 (started 30 minutes ago). All three answers are individually correct; reconciling them across engines is a freshness-management problem that the /wiki/the-semantic-layer-as-the-business-truth layer addresses.
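
Snapshot pinning is small enough to sketch directly. The catalog shape and plan function here are illustrative, not any engine's API:

```python
# Pin the snapshot at plan time; a commit that lands mid-query never changes
# what the running query sees.
CATALOG = {"current": 412,
           "snapshots": {412: ["f1", "f2"], 413: ["f1", "f2", "f3"]}}

def plan(catalog, as_of=None):
    # Resolve the snapshot once; the plan never re-reads the catalog pointer.
    snap_id = as_of if as_of is not None else catalog["current"]
    return {"snapshot_id": snap_id, "files": list(catalog["snapshots"][snap_id])}

query = plan(CATALOG)                 # pinned at 412
CATALOG["current"] = 413              # a concurrent write commits snapshot 413

assert query["files"] == ["f1", "f2"]          # running query: unchanged
assert plan(CATALOG)["snapshot_id"] == 413     # a new query sees 413
assert plan(CATALOG, as_of=412) == query       # time travel: FOR VERSION AS OF 412
```

The `as_of` path is the toy equivalent of the time-travel syntax: it targets an older snapshot explicitly instead of the current pointer.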

Why Velox is showing up everywhere

Meta's Velox library — a C++ vectorised execution engine — has quietly become the read-side runtime under Spark (via Gluten), Presto (in the Prestissimo project), and several proprietary engines. The pitch: write the engine logic in JVM Spark/Presto, but execute the per-batch kernels in Velox. The result: 2–4x speedups on scan-heavy and aggregation-heavy queries with no SQL changes. By 2026 the Apache Iceberg + Velox pairing is the fastest open-source path for read-heavy lakehouse workloads. The implication for the four-engine landscape: Spark with Gluten is now competitive with Trino on short interactive queries while keeping Spark's batch fault tolerance — which may collapse the Trino/Spark split for some teams over the next two years.

Where this leads next

The next chapter, /wiki/lakehouse-vs-warehouse-when-snowflake-stops-being-cheaper, takes the same multi-engine lakehouse and asks the cost question: at what point does the unbundled stack (Iceberg + Trino/Spark + a catalog) beat the integrated warehouse (Snowflake, BigQuery)? The answer depends on data volume, query patterns, and operational maturity — not just sticker price.

/wiki/streaming-writes-into-a-lakehouse and /wiki/cdc-iceberg-the-real-world-pattern are the upstream chapters that built the data the query engines read. /wiki/copy-on-write-vs-merge-on-read-iceberg-vs-hudi is the read-time trade-off whose shape the engines make visible.

Build 13 (the semantic layer) takes the multi-engine reality as input and asks: when the Trino, Spark, and Dremio answers diverge by a few rows on the same SQL, who decides which is correct? The semantic layer's job is to make that question answerable.

References