Materialize and Differential Dataflow: databases as views
A Razorpay analyst writes SELECT merchant_id, SUM(amount) FROM payments WHERE status='captured' GROUP BY merchant_id and pins it to a dashboard that 2,000 merchants stare at all day. Every refresh re-scans 400 million rows and the query takes nine seconds. There is a better idea: run the query once, then keep its answer correct as new payments land, touching only the rows that actually changed. That idea has a name (incremental view maintenance), a mathematical foundation (Differential Dataflow), and a shipping product (Materialize).
A streaming database flips the usual contract. Instead of the data sitting still while queries run repeatedly, the query sits still and the data flows through it. Differential Dataflow supplies the algebra to update a query's answer by exactly the change implied by each input delta; Materialize wraps that algebra in PostgreSQL-compatible SQL.
The shape of the problem: queries that should never re-run
Walk into a BookMyShow operations room on the morning IPL tickets drop. Ten dashboards. Each is some flavour of SELECT show_id, COUNT(DISTINCT user_id) FROM clicks GROUP BY show_id. The ingest is 80,000 clicks per second. If you re-evaluate each query every five seconds the way a normal database would, you are scanning the same 80,000 × 5 = 400,000 new rows ten times — once per dashboard — plus the entire historical click table from the last six hours. You burn CPU on work the previous answer already covered.
The textbook fix is a materialized view: compute the answer once, store it, and "refresh" it on a schedule. PostgreSQL has these. They are useful and they are wrong for this workload, because the refresh re-runs the whole query. You bought a pre-computed answer; you did not buy incremental maintenance. When a new click arrives for show_id=42, the only change to the answer is +1 for that one group. A correctly designed system should do work proportional to that delta — one row touched — not work proportional to the whole click table.
That correctly designed system is what Frank McSherry and his Naiad co-authors built between 2011 and 2013, and what the Materialize startup turned into a database in 2019.
Naiad, Timely Dataflow, and the progress problem
Before we get to deltas, we have to fix the thing that breaks every naive streaming system: knowing when an answer is final.
Consider a join: clicks ⋈ users ON clicks.user_id = users.id. A click for user 7 arrives at logical time t=100. The matching users row arrived at t=50. Fine, the join emits. But suppose the click for user 7 arrives before the user row — the user signed up on a different shard and the message is in flight. If the join emits "no match" at t=100, it is wrong as of t=100, because the user row will land any moment now and the answer for t=100 should include it.
The fix is logical time and progress tracking. Every record in the system carries a timestamp (it can be a stream offset, an event-time millisecond, or a logical version vector — a partial order). Operators do not emit a final result for time t until every input has promised it will not send any more records with timestamp ≤ t. That promise is called a frontier or watermark, and tracking it is the entire job of Naiad's underlying engine, Timely Dataflow.
Timely Dataflow is the bookkeeping. Differential Dataflow is the algebra that runs on top.
Why the frontier matters: without it, an operator either emits results too eagerly (and is wrong, because a late record will invalidate the emission) or too late (and the system stalls). The frontier turns a "fuzzy promise" into a hard contract: time t is closed, time t+1 is still open.
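The frontier contract is small enough to sketch. Below is a toy operator (decidedly not Timely Dataflow's real API, and the class name is invented for illustration) that buffers records until the upstream frontier promises no more records below a given time, then emits them in timestamp order as final:

```python
# Sketch only: a frontier-gated operator. It holds incoming records in a
# pending buffer and emits a record as "final" only once the input frontier
# has advanced past its timestamp.
class FrontierGatedOperator:
    def __init__(self):
        self.pending = []      # (time, record) pairs not yet safe to emit
        self.frontier = 0      # all times < frontier are closed
        self.emitted = []      # final output, in timestamp order

    def receive(self, time, record):
        self.pending.append((time, record))

    def advance_frontier(self, new_frontier):
        """Upstream promise: no more records with time < new_frontier."""
        self.frontier = max(self.frontier, new_frontier)
        ready = [(t, r) for (t, r) in self.pending if t < self.frontier]
        self.pending = [(t, r) for (t, r) in self.pending if t >= self.frontier]
        for t, r in sorted(ready):
            self.emitted.append((t, r))  # time t is closed; this is final

op = FrontierGatedOperator()
op.receive(100, "click user=7")
op.receive(50, "user row id=7")   # arrives late, but time 50 is still open
op.advance_frontier(101)          # both times now closed; emit in order
print(op.emitted)
```

Notice that the out-of-order arrival at time 50 is handled correctly: nothing is emitted until the frontier closes both times, so the late record can never invalidate an already-final answer.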
Differential Dataflow: subtract the old answer, add the new
Here is the core trick, and it is small enough to fit on a postcard.
Every record in Differential Dataflow is a triple: (data, time, diff). diff is an integer, usually +1 (an insertion) or -1 (a deletion). An update from (user=7, status='active') to (user=7, status='banned') is two triples: ((7,'active'), t, -1) and ((7,'banned'), t, +1). There is no "update" operation; there is only "retract" plus "insert."
A relational operator — say, COUNT grouped by some key — is then defined not on tables but on collections of diffs. The output for each group at each time is the sum of the input diffs for that group up to that time. When a new +1 arrives for show_id=42 at time t=101, the operator emits ((42, old_count), t=101, -1) and ((42, old_count+1), t=101, +1). The downstream operator now sees: "old answer retracted, new answer inserted." It does the same trick. The change propagates, and every operator does work proportional only to the diff that arrived, not to the whole input.
That is the secret. Once you accept that updates are pairs of (retract, insert), every relational operator — JOIN, GROUP BY, DISTINCT, even recursive queries — has an incremental version that does delta-work per delta-input.
Why "retract" is essential: if the operator only emitted "+1 for the new count," every downstream consumer would have to figure out that the old count is no longer correct. By emitting both -1 old and +1 new, the system makes the intent explicit and lets every downstream operator apply the same algebra without special cases. Retraction is what makes the pipeline fully compositional.
A tiny IVM engine in 40 lines of Python
To make this concrete, here is a one-operator (just COUNT BY) IVM engine. It accepts diffs, maintains a running count per key, and emits retract/insert pairs.
```python
# tiny_dd.py — a postcard-sized Differential Dataflow operator
from collections import defaultdict

class CountByKey:
    """Maintains COUNT(*) GROUP BY key over a stream of (key, time, diff) tuples."""

    def __init__(self):
        self.state = defaultdict(int)  # key -> current count
        self.output = []               # list of (key, count, time, diff)

    def feed(self, key, time, diff):
        old = self.state[key]
        new = old + diff
        # retract the old answer, insert the new one
        if old != 0:
            self.output.append((key, old, time, -1))
        if new != 0:
            self.output.append((key, new, time, +1))
        if new == 0:
            del self.state[key]
        else:
            self.state[key] = new

    def snapshot(self):
        """Sum diffs in self.output to get the current view."""
        view = defaultdict(int)
        for key, count, _t, diff in self.output:
            view[(key, count)] += diff
        # only rows with positive multiplicity are live
        return {(k, c) for (k, c), m in view.items() if m > 0}

op = CountByKey()
# three clicks for show 42, one for show 99
op.feed("show=42", time=100, diff=+1)
op.feed("show=42", time=101, diff=+1)
op.feed("show=99", time=101, diff=+1)
op.feed("show=42", time=102, diff=+1)
# a click is retracted (user undid it)
op.feed("show=42", time=103, diff=-1)

print("emitted diffs:")
for row in op.output:
    print(" ", row)
print("current view:", op.snapshot())
```

Output:

```
emitted diffs:
  ('show=42', 1, 100, 1)
  ('show=42', 1, 101, -1)
  ('show=42', 2, 101, 1)
  ('show=99', 1, 101, 1)
  ('show=42', 2, 102, -1)
  ('show=42', 3, 102, 1)
  ('show=42', 3, 103, -1)
  ('show=42', 2, 103, 1)
current view: {('show=42', 2), ('show=99', 1)}
```
Twenty lines of state, and you have an incremental COUNT BY. Real Differential Dataflow does the same thing for every relational operator — joins via arrangements (indexed multi-version maps), GROUP BY aggregates, DISTINCT via diff-collapsing, and even fixed-point recursion (transitive closure, graph reachability) by iterating the diff loop until the frontier in a nested time domain closes. The trick scales because the unit of work is always one diff.
Why the snapshot trick is correct: every emitted output is a (key, count, time, diff) tuple. To get the current view at any closed time, sum the diffs by (key, count) and keep only the entries with positive multiplicity. A retraction at t=101 of ('show=42', 1) and an insertion at the same time of ('show=42', 2) cancel the old answer and install the new one — the multiplicity of ('show=42', 1) drops to zero, while ('show=42', 2) rises to one. The whole algebra is multiset arithmetic: the data lives in the multiplicities, not in the records.
The compositionality matters. If you wired a second operator downstream — say, "alert when any merchant's count exceeds 1000" — that operator just consumes the same (key, count, time, diff) stream the first one emits. It does not need to know that the count came from incremental maintenance; it just sees diffs and applies its own algebra. This is why Materialize can stack 20 SQL operators in a view and still update each one in time proportional to the input delta.
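That stacking can be sketched concretely. Below is a hypothetical ThresholdAlert operator (the name and shape are illustrative, not Materialize's internals) that consumes the (key, count, time, diff) stream a COUNT operator emits and maintains the set of keys whose count exceeds a limit, emitting its own diffs only when that derived answer changes:

```python
# A second operator stacked downstream of a COUNT. It consumes the same
# (key, count, time, diff) diff stream and maintains "keys over a limit".
from collections import defaultdict

class ThresholdAlert:
    def __init__(self, limit):
        self.limit = limit
        self.live = defaultdict(int)   # (key, count) -> multiplicity
        self.alerts = []               # (key, time, diff): over-limit changes

    def _over(self, key):
        return any(c > self.limit and m > 0
                   for (k, c), m in self.live.items() if k == key)

    def feed(self, key, count, time, diff):
        was_over = self._over(key)
        self.live[(key, count)] += diff
        now_over = self._over(key)
        # emit a diff only when the *answer* ("key is over limit") changes
        if now_over and not was_over:
            self.alerts.append((key, time, +1))
        elif was_over and not now_over:
            self.alerts.append((key, time, -1))

# Diffs exactly as an upstream COUNT would emit them: retract old, insert new.
alert = ThresholdAlert(limit=2)
stream = [("m1", 1, 100, +1),                      # m1 count = 1
          ("m1", 1, 101, -1), ("m1", 2, 101, +1),  # count 1 -> 2
          ("m1", 2, 102, -1), ("m1", 3, 102, +1)]  # count 2 -> 3: over limit
for key, count, t, d in stream:
    alert.feed(key, count, t, d)
print(alert.alerts)   # a single +1 alert diff, at the time the limit was crossed
```

The downstream operator never asks where the counts came from; it sees diffs, applies its own algebra, and emits diffs. That is the composition the section describes.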
Materialize: SQL on top of the algebra
Differential Dataflow is a Rust library. Materialize is what happens when you wrap that library in a PostgreSQL wire-protocol server and let an analyst write CREATE MATERIALIZED VIEW. Conceptually:
- The user types CREATE MATERIALIZED VIEW top_merchants AS SELECT merchant_id, SUM(amount) FROM payments WHERE status='captured' GROUP BY merchant_id;.
- Materialize parses the SQL, plans it, and lowers the plan to a Differential Dataflow program — a graph of operators connected by diff streams.
- The view is maintained as input flows in. New payments rows (from a Kafka topic, a Postgres CDC stream, or an INSERT) become diffs that flow through the graph.
- SELECT * FROM top_merchants WHERE merchant_id = 'razorpay_42'; reads the current state of the maintained view at the latest closed frontier — instantly, with no scan of the source.
The user's mental model is unchanged: it is just Postgres with materialized views that always look fresh. The implementation is upside-down: the query is the long-lived object; the data is what flows.
The performance promise is concrete. A JOIN of two tables that takes nine seconds to compute from scratch in Postgres costs Materialize roughly a millisecond of incremental work per arriving row, because the join's incremental cost is O(log n) in the size of the indexed side it probes (the arrangement), not O(n × m). A merchant who logs in to their dashboard at any moment sees the answer that was correct as of the latest closed frontier (typically a few hundred milliseconds ago).
The tradeoff is memory. The arrangements — the indexed multi-version state for each operator — live in RAM. A view over a 100-GB Postgres table needs roughly 100 GB of arrangement memory across the cluster. Materialize is fast because it pre-builds the answer; pre-building costs space.
Why a join needs an arrangement and a COUNT does not: a COUNT BY key operator only needs to remember a single integer per key — the running count. A JOIN ON key operator, on the other hand, needs the whole right side indexed by the join key, because every left-side delta has to find the right-side rows it matches. The arrangement is that index, plus enough version history to handle out-of-order arrivals correctly. This is why join-heavy Materialize workloads burn memory while pure-aggregate ones do not.
A worked sizing example. Suppose Zerodha runs a view of orders ⋈ users ON user_id over 8 million users and 200 million orders/day. The arrangement on the users side is roughly 8M × (key + payload + version overhead) ≈ 8M × 200 bytes ≈ 1.6 GB. The arrangement on the orders side is the larger one — about 40 GB at full retention — but is sharded across the cluster's worker threads. A 4-node cluster with 16 GB RAM each absorbs this comfortably; a single laptop does not. The general rule: budget RAM for the sum of the keyed sides of every join in your view set, plus 30–50% slack for multi-version overhead and for arrangements that have not yet compacted past the current frontier.
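The arithmetic above can be reproduced in a few lines. Every byte count here is the same assumed illustrative figure used in the prose, not a measured Materialize number:

```python
# Back-of-envelope arrangement sizing for the hypothetical orders ⋈ users view.
users = 8_000_000
bytes_per_user_row = 200           # assumed: key + payload + version overhead
users_side = users * bytes_per_user_row
print(f"users arrangement:  {users_side / 1e9:.1f} GB")   # 1.6 GB

orders_per_day = 200_000_000
bytes_per_order_row = 200          # assumed, same rough figure
orders_side = orders_per_day * bytes_per_order_row
print(f"orders arrangement: {orders_side / 1e9:.0f} GB")  # 40 GB at full retention

slack = 1.4                        # 30-50% multi-version / compaction slack
budget = (users_side + orders_side) * slack
print(f"RAM budget:         {budget / 1e9:.0f} GB across the cluster")
```

The useful habit is the last line: sum the keyed sides of every join, then multiply by the slack factor before choosing cluster sizes.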
Common confusions
- "Materialized views in Postgres are the same thing." They are not. Postgres MATERIALIZED VIEW requires a manual REFRESH MATERIALIZED VIEW that re-runs the entire query, scanning the whole input. There is no incremental maintenance; the view is stale until refreshed. Differential Dataflow does work proportional to the delta, not the whole input. The two share a name and almost nothing else.
- "Differential Dataflow is just stream processing like Flink." Flink is a stateful streaming engine that lets you write operators which incrementally update state — but the operator logic is yours to write. Differential Dataflow gives you the operators for free, with mathematical correctness guarantees: every relational primitive has a known-correct incremental implementation that handles late data via retractions. Materialize compiles SQL into that algebra; Flink expects you to write the Java/Scala.
- "This works for SELECTs but breaks on joins or recursion." It works for joins (via arrangements), recursive CTEs (via fixed-point iteration with nested timestamps), DISTINCT, anti-joins, outer joins, and aggregates with retractable functions. Non-retractable aggregates like MEDIAN or PERCENTILE_DISC are the genuinely hard cases — Materialize implements them, but their incremental cost can degrade to O(n) per delta in the worst case.
- "You don't need Kafka if you have Materialize." Materialize consumes from Kafka, Kinesis, or Postgres logical decoding (CDC). It is a sink for change streams, not a replacement for the broker. The stream/table duality is what makes the marriage natural: the broker holds the log of changes, Materialize maintains the table-shaped view of that log.
- "Differential Dataflow runs in the cloud somewhere magical." It is just a Rust library — timely plus differential-dataflow on crates.io. You can cargo add differential-dataflow in any Rust binary and have arrangements running on your laptop. Materialize the company productised the library; the algebra itself is open and stable.
- "If the query is the long-lived object, can I ever change my SQL?" Yes, but adding an operator to a running dataflow is non-trivial. Materialize handles it by tearing the dataflow down and rebuilding (with a brief catch-up phase). McSherry's research has explored incremental query incrementalisation — adding operators without a full rebuild — but production Materialize takes the simpler hit.
Going deeper
The Naiad paper and timely-versus-differential
Naiad (Microsoft Research, 2013) introduced Timely Dataflow — the runtime — and Differential Dataflow — the query algebra — in one paper. They are separable: Timely is a low-level scheduler for dataflow programs with cyclic graphs, nested loops, and per-message logical timestamps; Differential is one specific use of Timely where every message is a (data, time, diff) triple and every operator preserves diff semantics. Today, timely-dataflow and differential-dataflow are two separate Rust crates, both maintained by Frank McSherry. Most production users (Materialize, in particular) only consume the differential layer.
Arrangements: the magical join index
A relational join has cost O(n × m) if you scan both sides. Differential Dataflow gets it down to roughly O((Δn + Δm) × log(n + m)) per delta-batch by pre-building an arrangement: a sorted, indexed, multi-versioned map from join keys to records-with-diffs-and-times. When a new delta arrives on the left side, the operator probes the arrangement on the right side at the appropriate logical time, finds the matching records, and emits the join diffs. The clever part is the multi-versioning: the arrangement stores enough history that you can probe at any not-yet-compacted past time, which is what makes correct retractions possible across out-of-order input. This is why memory grows: every active timestamp needs its own arrangement state until the frontier compacts past it.
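A minimal sketch of that probe-and-emit loop, with the two arrangements reduced to plain per-key multiplicity maps (real arrangements are sorted, batched, and multi-versioned so they can be probed at past times; none of that is modelled here, and the class name is invented for illustration):

```python
# Sketch of an incremental equi-join maintained via two "arrangements",
# here simplified to key -> row -> multiplicity maps with no time history.
from collections import defaultdict

class IncrementalJoin:
    def __init__(self):
        self.left = defaultdict(lambda: defaultdict(int))
        self.right = defaultdict(lambda: defaultdict(int))
        self.output = []   # ((left_row, right_row), time, diff)

    def _update(self, mine, theirs, key, row, time, diff, is_left):
        # probe the other side's arrangement: each matching row with
        # multiplicity m yields a join diff of diff * m
        for other_row, m in theirs[key].items():
            if m != 0:
                pair = (row, other_row) if is_left else (other_row, row)
                self.output.append((pair, time, diff * m))
        mine[key][row] += diff   # then install this delta in our own index

    def feed_left(self, key, row, time, diff):
        self._update(self.left, self.right, key, row, time, diff, True)

    def feed_right(self, key, row, time, diff):
        self._update(self.right, self.left, key, row, time, diff, False)

j = IncrementalJoin()
j.feed_right(7, "user#7", 50, +1)    # user row arrives, no clicks yet
j.feed_left(7, "click A", 100, +1)   # click matches: one join diff out
j.feed_left(7, "click B", 101, +1)
j.feed_right(7, "user#7", 102, -1)   # user deleted: both joins retracted
for d in j.output:
    print(d)
```

The point to notice is that each delta's cost is proportional to the matches it finds, never to the full size of either input.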
Fixed-point recursion: graph queries on a stream
The single most striking demo of Differential Dataflow is incremental graph reachability. You have a graph with millions of edges; you compute "all pairs of nodes within 5 hops" — a transitive closure. Recomputing this from scratch on every edge insertion or deletion is hopeless. Differential Dataflow expresses the recursion as a fixed-point loop in a nested time domain (one outer time per epoch of input changes, one inner time per iteration of the fixed point), and updates the closure with work proportional only to the affected paths. Adding one edge that creates a single new path costs roughly O(diameter × Δpaths). McSherry showed this beating bespoke graph systems (Neo4j, Giraph) on social-graph workloads while being a 200-line program.
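The inner loop of that fixed point is ordinary semi-naive evaluation: each iteration consumes only the facts discovered in the previous one. A from-scratch sketch (incrementality across edge insertions and deletions, which real Differential Dataflow adds via nested timestamps, is deliberately not modelled here):

```python
# Semi-naive fixed-point reachability: each round extends only the frontier
# of newly discovered nodes, which plays the role of the inner "iteration
# time" in the nested time domain.
def reachable(edges, source):
    adj = {}
    for a, b in edges:
        adj.setdefault(a, set()).add(b)
    known = {source}
    delta = {source}          # facts new in the last iteration
    rounds = 0
    while delta:              # the loop "closes" when a round adds nothing
        delta = {b for a in delta for b in adj.get(a, ())} - known
        known |= delta
        rounds += 1
    return known, rounds

edges = [(1, 2), (2, 3), (3, 4), (4, 2)]
print(reachable(edges, 1))    # all four nodes, reached in a handful of rounds
```

Work per round is proportional to the size of delta, not to the whole graph, which is the same economy the full incremental version preserves across input changes.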
The Indian use case that lights this up is fraud-ring detection at PhonePe or Paytm. You have a graph of user → device → user → device edges. A fraud ring is a connected component with more than k shared devices among accounts that should be unrelated. Re-running this query nightly against 400 million users is a six-hour batch job. Expressed as a fixed-point dataflow, every new login (which is one edge insertion) costs only the paths it touches — usually a few dozen — and the alert fires within a second of the login completing. The same recursive primitive that powers PageRank, single-source shortest paths, and Datalog rules powers fraud detection.
Compaction and the cost of remembering history
A naive Differential Dataflow implementation would keep every diff at every time forever — useful for re-running queries at past times, ruinous for memory. The system compacts old times by collapsing every diff at time ≤ frontier into a single equivalent diff at the frontier itself. After compaction, you can no longer query the view as of t=100; you can only query it as of the current frontier. This is the same compaction idea you saw in LSM-tree segments — the trade between memory and historical query capability. Materialize exposes compaction policies as CREATE INDEX … WITH (RETAIN HISTORY = '1 hour'), letting an analyst pick how far back time-travel queries can look.
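Compaction itself is a small fold. A sketch, assuming single-integer times and ignoring the per-batch consolidation real implementations do:

```python
# Compaction sketch: every diff at time <= frontier collapses into one
# equivalent diff pinned at the frontier; history before it is discarded.
from collections import defaultdict

def compact(diffs, frontier):
    """diffs: list of (data, time, diff). Returns the compacted list."""
    collapsed = defaultdict(int)
    kept = []
    for data, time, diff in diffs:
        if time <= frontier:
            collapsed[data] += diff          # fold history into one diff
        else:
            kept.append((data, time, diff))  # future times stay intact
    head = [(data, frontier, d) for data, d in collapsed.items() if d != 0]
    return head + kept

diffs = [("show=42", 100, +1), ("show=42", 101, -1), ("show=42", 101, +2),
         ("show=99", 101, +1), ("show=42", 150, +1)]
print(compact(diffs, frontier=120))
```

After the call, the view can still be read as of any time at or after the frontier, but times 100 and 101 are gone; that is exactly the historical-query capability being traded for memory.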
Why Materialize chose Postgres compatibility
When Materialize the company launched in 2019, the natural alternative was to invent a new SQL dialect or a new wire protocol — something more "streaming-native." They deliberately did not. They picked PostgreSQL wire compatibility because the friction of introducing a new analytical database into a Bengaluru fintech (or anywhere else) is dominated by the BI tool integration: Tableau, Metabase, Superset, every Python notebook, every Go service. By speaking pgwire, Materialize slid into all those tools as a drop-in. The slogan "the streaming database for the rest of us" only works if the SQL surface is unchanged from what an analyst already knows.
What Differential Dataflow doesn't solve
It is not a transactional database. There is no concept of multi-statement read-write transactions over the maintained views; you can read consistent snapshots, but you cannot do an UPDATE … WHERE … RETURNING against a maintained view and expect serialisable isolation across many such operations. Materialize layers some of this on top (a dataflow-as-of-time read consistency model), but the core algebra is for read-mostly analytical views, not OLTP. For OLTP backed by an Aurora-style primary, you wire CDC into Materialize and let the views ride along.
Where this leads next
- Change Data Capture: Debezium, logical decoding — chapter 180: how Postgres and MySQL emit the diff stream that Materialize consumes.
- Where this is all going: the database as a materialized view — chapter 181: the philosophical implication that every database can be modelled as a maintained view of a log.
- The stream / table duality — chapter 175: the Kafka-side intuition that streams and tables are two views of the same data.
- Flink: stateful stream processing — chapter 178: the operator-level streaming engine that Materialize-the-algebra sits one level above.
- Kafka as a distributed log — chapter 174: the broker that supplies the input deltas.
References
- Naiad: A Timely Dataflow System — Murray, McSherry, Isaacs, Isard, Barham, Abadi (SOSP 2013). The original paper introducing Timely and Differential Dataflow.
- Differential Dataflow — the Rust crate. README + examples are the most readable introduction to the algebra.
- Frank McSherry's blog — long-form posts on arrangements, joins, and the engineering of incremental computation.
- Materialize documentation — SQL surface, source connectors, sizing guidance.
- Kreps, The Log: What every software engineer should know — the LinkedIn essay that made "log + view" canonical streaming-systems doctrine.
- /wiki/the-stream-table-duality — internal companion chapter on streams and tables.
- /wiki/flink-stateful-stream-processing — internal companion on Flink.
- Abadi et al., The Beckman Report on Database Research — places streaming databases in the broader research agenda.