Materialize and Differential Dataflow: databases as views

A Razorpay analyst writes SELECT merchant_id, SUM(amount) FROM payments WHERE status='captured' GROUP BY merchant_id and pins it to a dashboard that 2,000 merchants stare at all day. Every refresh re-scans 400 million rows and the query takes nine seconds. There is a better idea: run the query once, then keep its answer correct as new payments land — touching only the rows that actually changed. That idea has a name (incremental view maintenance), a formal algebra (Differential Dataflow), and a shipping product (Materialize).

A streaming database flips the usual contract. Instead of the data sitting still while queries run repeatedly, the query sits still and the data flows through it. Differential Dataflow gives the algebra to update a query's answer by exactly the change implied by each input delta; Materialize wraps that algebra in PostgreSQL-compatible SQL.

The shape of the problem: queries that should never re-run

Walk into a BookMyShow operations room on the morning IPL tickets drop. Ten dashboards. Each is some flavour of SELECT show_id, COUNT(DISTINCT user_id) FROM clicks GROUP BY show_id. The ingest is 80,000 clicks per second. If you re-evaluate each query every five seconds the way a normal database would, you are scanning the same 80,000 × 5 = 400,000 new rows ten times — once per dashboard — plus the entire historical click table from the last six hours. You burn CPU on work the previous answer already covered.

The textbook fix is a materialized view: compute the answer once, store it, and "refresh" it on a schedule. PostgreSQL has these. They are useful and they are wrong for this workload, because the refresh re-runs the whole query. You bought a pre-computed answer; you did not buy incremental maintenance. When a new click arrives for show_id=42, the only change to the answer is +1 for that one group. A correctly designed system should do work proportional to that delta — one row touched — not work proportional to the whole click table.

That correctly designed system is what Frank McSherry and his Naiad co-authors built between 2011 and 2013, and what the Materialize startup turned into a database in 2019.

Naiad, Timely Dataflow, and the progress problem

Before we get to deltas, we have to fix the thing that breaks every naive streaming system: knowing when an answer is final.

Consider a join: clicks ⋈ users ON clicks.user_id = users.id. A click for user 7 arrives at logical time t=100. The matching users row arrived at t=50. Fine, the join emits. But suppose the click for user 7 arrives before the user row — the user signed up on a different shard and the message is in flight. If the join emits "no match" at t=100, it is wrong as of t=100, because the user row will land any moment now and the answer for t=100 should include it.

The fix is logical time and progress tracking. Every record in the system carries a timestamp (it can be a stream offset, an event-time millisecond, or a logical version vector — a partial order). Operators do not emit a final result for time t until every input has promised it will not send any more records with timestamp ≤ t. That promise is called a frontier or watermark, and tracking it is the entire job of Naiad's underlying engine, Timely Dataflow.

Timely Dataflow is the bookkeeping. Differential Dataflow is the algebra that runs on top.

[Figure: Timely Dataflow's frontier. A horizontal timeline of logical times t=1 through t=5. Events at t=1 and t=2 are settled; the frontier sits at 3 — the system's promise that no more records with t < 3 will arrive. Operators are free to emit final results for t=1 and t=2; they must hold off on t=3, t=4, t=5, which are still in flight.]
The frontier is a moving line in logical time. Once it has advanced past time t, every operator knows its t-th answer is final and can be released downstream.

Why the frontier matters: without it, an operator either emits results too eagerly (and is wrong, because a late record will invalidate the emission) or too late (and the system stalls). The frontier turns a "fuzzy promise" into a hard contract: time t is closed, time t+1 is still open.
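
That contract can be sketched in a few lines of Python. This is a toy — the name FrontierBuffer and its methods are invented for illustration, not Timely's actual API — but it shows the mechanic: stage per-time results, release them only once the input promises nothing earlier will arrive.

```python
class FrontierBuffer:
    """Toy frontier tracking: hold per-time results until the input
    promises it will send nothing earlier than a given time."""
    def __init__(self):
        self.pending = {}    # logical time -> staged results
        self.frontier = 0    # no more input will arrive with time < frontier

    def stage(self, time, result):
        # an operator's tentative answer for `time`; not yet releasable
        self.pending.setdefault(time, []).append(result)

    def advance_to(self, new_frontier):
        """The input's promise: no more records with time < new_frontier.
        Every staged result strictly before it is now final."""
        self.frontier = new_frontier
        released = []
        for t in sorted(self.pending):   # sorted() copies keys, so pop is safe
            if t < new_frontier:
                released.extend(self.pending.pop(t))
        return released

fb = FrontierBuffer()
fb.stage(1, "count for t=1")
fb.stage(3, "count for t=3")
print(fb.advance_to(3))   # ['count for t=1'] -- t=3 is still open
```

Emitting too eagerly is the `stage` path without the frontier check; stalling is never calling `advance_to`. The contract is precisely the gap between the two.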

Differential Dataflow: subtract the old answer, add the new

Here is the core trick, and it is small enough to fit on a postcard.

Every record in Differential Dataflow is a triple: (data, time, diff). diff is an integer, usually +1 (an insertion) or -1 (a deletion). An update from (user=7, status='active') to (user=7, status='banned') is two triples: ((7,'active'), t, -1) and ((7,'banned'), t, +1). There is no "update" operation; there is only "retract" plus "insert."

A relational operator — say, COUNT grouped by some key — is then defined not on tables but on collections of diffs. The output for each group at each time is the sum of the input diffs for that group up to that time. When a new +1 arrives for show_id=42 at time t=101, the operator emits ((42, old_count), t=101, -1) and ((42, old_count+1), t=101, +1). The downstream operator now sees: "old answer retracted, new answer inserted." It does the same trick. The change propagates, and every operator does work proportional only to the diff that arrived, not to the whole input.

That is the secret. Once you accept that updates are pairs of (retract, insert), every relational operator — JOIN, GROUP BY, DISTINCT, even recursive queries — has an incremental version that does delta-work per delta-input.

[Figure: Differential Dataflow's retract-and-insert pattern. Input deltas ((42,click), t=100, +1), ((42,click), t=101, +1), and ((99,click), t=101, +1) flow into a COUNT BY show_id operator, which emits the output diffs ((42, 1), t=100, +1); ((42, 1), t=101, -1) and ((42, 2), t=101, +1); ((99, 1), t=101, +1). Each input diff produces one or two output diffs: old answer retracted (-1), new answer inserted (+1). Work per delta is O(1) for COUNT, O(log n) for JOIN with an arrangement index.]
The COUNT operator turns each input delta into a retract-and-insert pair on its current group answer. No re-scan; just bookkeeping per group.

Why "retract" is essential: if the operator only emitted "+1 for the new count," every downstream consumer would have to figure out that the old count is no longer correct. By emitting both -1 old and +1 new, the system makes the intent explicit and lets every downstream operator apply the same algebra without special cases. Retraction is what makes the pipeline fully compositional.

A tiny IVM engine in 40 lines of Python

To make this concrete, here is a one-operator (just COUNT BY) IVM engine. It accepts diffs, maintains a running count per key, and emits retract/insert pairs.

# tiny_dd.py — a postcard-sized Differential Dataflow operator
from collections import defaultdict

class CountByKey:
    """Maintains COUNT(*) GROUP BY key over a stream of (key, time, diff) tuples."""
    def __init__(self):
        self.state = defaultdict(int)   # key -> current count
        self.output = []                # list of (key, count, time, diff)

    def feed(self, key, time, diff):
        old = self.state[key]
        new = old + diff
        # retract the old answer, insert the new one
        if old != 0:
            self.output.append((key, old, time, -1))
        if new != 0:
            self.output.append((key, new, time, +1))
        if new == 0:
            del self.state[key]
        else:
            self.state[key] = new

    def snapshot(self):
        """Sum diffs in self.output to get the current view."""
        view = defaultdict(int)
        for key, count, _t, diff in self.output:
            view[(key, count)] += diff
        # only rows with positive multiplicity are live
        return {(k, c) for (k, c), m in view.items() if m > 0}


op = CountByKey()
# three clicks for show 42, one for show 99
op.feed("show=42", time=100, diff=+1)
op.feed("show=42", time=101, diff=+1)
op.feed("show=99", time=101, diff=+1)
op.feed("show=42", time=102, diff=+1)
# a click is retracted (user undid it)
op.feed("show=42", time=103, diff=-1)

print("emitted diffs:")
for row in op.output:
    print(" ", row)
print("current view:", op.snapshot())

Output:

emitted diffs:
  ('show=42', 1, 100, 1)
  ('show=42', 1, 101, -1)
  ('show=42', 2, 101, 1)
  ('show=99', 1, 101, 1)
  ('show=42', 2, 102, -1)
  ('show=42', 3, 102, 1)
  ('show=42', 3, 103, -1)
  ('show=42', 2, 103, 1)
current view: {('show=42', 2), ('show=99', 1)}

Forty lines, and you have an incremental COUNT BY. Real Differential Dataflow does the same thing for every relational operator — joins via arrangements (indexed multi-version maps), GROUP BY aggregates, DISTINCT via diff-collapsing, and even fixed-point recursion (transitive closure, graph reachability) by iterating the diff loop until the frontier in a nested time domain closes. The trick scales because the unit of work is always one diff.

Why the snapshot trick is correct: every emitted output is a (key, count, time, diff) tuple. To get the current view at any closed time, sum the diffs by (key, count) and keep only the entries with positive multiplicity. A retraction at t=101 of ('show=42', 1) and an insertion at the same time of ('show=42', 2) cancel the old answer and install the new one — the multiplicity of ('show=42', 1) drops to zero while ('show=42', 2) rises to one. The whole algebra is multi-set arithmetic: the answer lives in the multiplicities, not in any stored rows.

The compositionality matters. If you wired a second operator downstream — say, "alert when any merchant's count exceeds 1000" — that operator just consumes the same (key, count, time, diff) stream the first one emits. It does not need to know that the count came from incremental maintenance; it just sees diffs and applies its own algebra. This is why Materialize can stack 20 SQL operators in a view and still update each one in time proportional to the input delta.
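
To make the stacking concrete, here is a toy downstream operator in the spirit of the CountByKey sketch above (the name ThresholdAlert is invented for illustration). It consumes a (key, count, time, diff) stream and maintains "keys whose count exceeds a limit" as its own diff stream.

```python
class ThresholdAlert:
    """Consumes the (key, count, time, diff) stream an upstream COUNT
    emits; maintains 'keys over the limit' as its own diff stream.
    Assumes in-order retract/insert pairs, as CountByKey emits them."""
    def __init__(self, limit):
        self.limit = limit
        self.current = {}   # key -> count implied by the diff stream
        self.alerts = []    # (key, time, diff): +1 = entered set, -1 = left

    def feed(self, key, count, time, diff):
        old = self.current.get(key, 0)
        if diff == +1:
            new = count                          # insertion installs the answer
        else:
            new = 0 if old == count else old     # retraction reverts it
        self.current[key] = new
        # emit a diff only when the key crosses the threshold; a -1/+1
        # pair within one timestamp consolidates away, like any diff stream
        if old <= self.limit < new:
            self.alerts.append((key, time, +1))
        elif new <= self.limit < old:
            self.alerts.append((key, time, -1))

ta = ThresholdAlert(limit=2)
for d in [("show=42", 1, 100, +1), ("show=42", 1, 101, -1),
          ("show=42", 2, 101, +1), ("show=42", 2, 102, -1),
          ("show=42", 3, 102, +1)]:
    ta.feed(*d)
print(ta.alerts)   # [('show=42', 102, 1)] -- fired when the count reached 3
```

The point is what is absent: ThresholdAlert never asks where the counts came from. It sees diffs, applies its own algebra, and emits diffs.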

Materialize: SQL on top of the algebra

Differential Dataflow is a Rust library. Materialize is what happens when you wrap that library in a PostgreSQL wire-protocol server and let an analyst write CREATE MATERIALIZED VIEW. Conceptually:

  1. The user types CREATE MATERIALIZED VIEW top_merchants AS SELECT merchant_id, SUM(amount) FROM payments WHERE status='captured' GROUP BY merchant_id;.
  2. Materialize parses the SQL, plans it, and lowers the plan to a Differential Dataflow program — a graph of operators connected by diff streams.
  3. The view is maintained as input flows in. New payments rows (from a Kafka topic, a Postgres CDC stream, or an INSERT) become diffs that flow through the graph.
  4. SELECT * FROM top_merchants WHERE merchant_id = 'razorpay_42'; reads the current state of the maintained view at the latest closed frontier — instantly, with no scan of the source.
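
In miniature, the lowering in step 2 looks like this: a hand-written Python sketch (toy code, not Materialize's actual planner output) of the dataflow for the top_merchants view. A filter stage feeds an incremental SUM operator that emits retract/insert pairs, exactly like the COUNT example earlier.

```python
from collections import defaultdict

# Toy lowering of:
#   SELECT merchant_id, SUM(amount) FROM payments
#   WHERE status = 'captured' GROUP BY merchant_id
class SumByKey:
    """Incremental SUM(amount) GROUP BY key, emitting retract/insert diffs."""
    def __init__(self):
        self.state = defaultdict(int)   # merchant_id -> running sum
        self.output = []                # (key, sum, time, diff)

    def feed(self, key, amount, time, diff):
        old = self.state[key]
        new = old + amount * diff       # a retraction (diff=-1) subtracts
        if old != 0:
            self.output.append((key, old, time, -1))
        if new != 0:
            self.output.append((key, new, time, +1))
        self.state[key] = new

view = SumByKey()

def on_payment(merchant_id, amount, status, time, diff):
    # FILTER stage: non-captured payments never reach the aggregate
    if status == "captured":
        view.feed(merchant_id, amount, time, diff)

on_payment("m1", 500, "captured", 100, +1)
on_payment("m1", 250, "captured", 101, +1)
on_payment("m2", 900, "failed",   101, +1)   # filtered out
print(dict(view.state))   # {'m1': 750}
```

A SELECT against the view is then a dictionary lookup on `view.state` — no scan of any payments table, which is the whole point of step 4.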

The user's mental model is unchanged: it is just Postgres with materialized views that always look fresh. The implementation is upside-down: the query is the long-lived object; the data is what flows.

[Figure: Materialize architecture — SQL becomes a long-lived dataflow. Left: a Kafka topic (payments) and a Postgres CDC source (merchants). Centre: Materialize, containing the SQL planner, a Differential Dataflow graph (FILTER → GROUP BY → SUM) that runs continuously and updates state per diff — work proportional to delta size, not table size — and arrangements (indexed view state). Right: SQL clients (dashboard, API) issuing SELECTs against the arrangement. Diff streams flow left to right.]
Materialize is a SQL front-end on top of a Differential Dataflow graph. The graph is the long-lived object; SELECT reads cached state at the latest closed frontier.

The performance promise is concrete. A JOIN of two tables that takes 9 seconds to compute from scratch in Postgres takes — in Materialize — one millisecond per arriving row, because the join's incremental cost is O(log n) in the size of the smaller indexed side (the arrangement), not O(n × m). A merchant who logs in to their dashboard at any moment sees the answer that was correct as of the latest closed frontier (typically a few hundred milliseconds ago).

The tradeoff is memory. The arrangements — the indexed multi-version state for each operator — live in RAM. A view over a 100-GB Postgres table needs roughly 100 GB of arrangement memory across the cluster. Materialize is fast because it pre-builds the answer; pre-building costs space.

Why a join needs an arrangement and a COUNT does not: a COUNT BY key operator only needs to remember a single integer per key — the running count. A JOIN ON key operator, on the other hand, needs the whole right side indexed by the join key, because every left-side delta has to find the right-side rows it matches. The arrangement is that index, plus enough version history to handle out-of-order arrivals correctly. This is why join-heavy Materialize workloads burn memory while pure-aggregate ones do not.
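
A stripped-down sketch of that probe pattern (toy code: a real arrangement is sorted, batched, and multi-versioned, while this one is a plain hash index with no history):

```python
from collections import defaultdict

class JoinOnKey:
    """Toy incremental equi-join: both sides kept indexed by join key
    (the 'arrangement'); each arriving delta probes the opposite index."""
    def __init__(self):
        self.left = defaultdict(list)    # key -> [(payload, diff)]
        self.right = defaultdict(list)
        self.output = []                 # ((key, l, r), time, diff)

    def feed_left(self, key, payload, time, diff):
        self.left[key].append((payload, diff))
        for r_payload, r_diff in self.right[key]:   # probe, don't scan
            self.output.append(((key, payload, r_payload), time, diff * r_diff))

    def feed_right(self, key, payload, time, diff):
        self.right[key].append((payload, diff))
        for l_payload, l_diff in self.left[key]:
            self.output.append(((key, l_payload, payload), time, l_diff * diff))

j = JoinOnKey()
j.feed_right("u7", "Asha", 50, +1)      # users row arrives first
j.feed_left("u7", "click-a", 100, +1)   # click matches: join diff emitted
j.feed_right("u7", "Asha", 101, -1)     # user deleted: join output retracted
print(j.output)
```

The run emits one join diff for the matching click, then a -1 retraction of that same row when the user is deleted — multiplying the two sides' diffs is what makes retractions compose through the join. Note the cost of each `feed_*` call: proportional to the matches for that one key, never to either table.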

A worked sizing example. Suppose Zerodha runs a view of orders ⋈ users ON user_id over 8 million users and 200 million orders/day. The arrangement on the users side is roughly 8M × (key + payload + version overhead) ≈ 8M × 200 bytes ≈ 1.6 GB. The arrangement on the orders side is the larger one — about 40 GB at full retention — but is sharded across the cluster's worker threads. A 4-node cluster with 16 GB RAM each absorbs this comfortably; a single laptop does not. The general rule: budget RAM for the sum of the keyed sides of every join in your view set, plus 30–50% slack for multi-version overhead and for arrangements that have not yet compacted past the current frontier.
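
The arithmetic above as a back-of-envelope helper. The byte figures and the 1.4x slack factor are assumptions, matching the 30–50% rule of thumb in the text:

```python
def arrangement_bytes(rows, key_bytes, payload_bytes, version_slack=1.4):
    """Rough arrangement footprint: rows x (key + payload) bytes,
    times a slack factor for diffs not yet compacted past the frontier."""
    return rows * (key_bytes + payload_bytes) * version_slack

# the users side of the hypothetical orders-to-users join:
# 8M rows at ~200 bytes each, plus 40% multi-version slack
users_side = arrangement_bytes(8_000_000, 8, 192)
print(f"users arrangement: ~{users_side / 1e9:.1f} GB")   # ~2.2 GB
```

Run the same function over every keyed join side in your view set and sum the results; that total, not the raw table sizes, is the RAM budget.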

Common confusions

Going deeper

The Naiad paper and timely-versus-differential

Naiad (Microsoft Research, 2013) introduced Timely Dataflow — the runtime — and Differential Dataflow — the query algebra — in one paper. They are separable: Timely is a low-level scheduler for dataflow programs with cyclic graphs, nested loops, and per-message logical timestamps; Differential is one specific use of Timely where every message is a (data, time, diff) triple and every operator preserves diff semantics. Today, timely-dataflow and differential-dataflow are two separate Rust crates, both maintained by Frank McSherry. Most production users (Materialize, in particular) only consume the differential layer.

Arrangements: the magical join index

A relational join has cost O(n × m) if you scan both sides. Differential Dataflow gets it down to roughly O((Δn + Δm) × log(n + m)) per delta-batch by pre-building an arrangement: a sorted, indexed, multi-versioned map from join keys to records-with-diffs-and-times. When a new delta arrives on the left side, the operator probes the arrangement on the right side at the appropriate logical time, finds the matching records, and emits the join diffs. The clever part is the multi-versioning: the arrangement stores enough history that you can probe at any not-yet-compacted past time, which is what makes correct retractions possible across out-of-order input. This is why memory grows: every active timestamp needs its own arrangement state until the frontier compacts past it.

Fixed-point recursion: graph queries on a stream

The single most striking demo of Differential Dataflow is incremental graph reachability. You have a graph with millions of edges; you compute "all pairs of nodes within 5 hops" — a transitive closure. Recomputing this from scratch on every edge insertion or deletion is hopeless. Differential Dataflow expresses the recursion as a fixed-point loop in a nested time domain (one outer time per epoch of input changes, one inner time per iteration of the fixed-point), and updates the closure with work proportional only to the affected paths. Adding one edge that creates a single new path costs roughly O(diameter × Δpaths). McSherry showed this beating bespoke graph databases (Neo4j, Giraph) on social-graph workloads while being a 200-line program.
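
The inner loop of that fixed-point is semi-naive evaluation: each iteration expands only the newly discovered delta, never the whole reached set. A single-source sketch (toy code — not incremental across edge insertions, but it shows why the work tracks the delta):

```python
def reachable(edges, source):
    """Semi-naive fixed point over a set of (src, dst) edge pairs:
    iterate until the delta set is empty."""
    reached = {source}
    delta = {source}                  # nodes discovered last iteration
    while delta:                      # inner 'time' advances per iteration
        new = {dst for (src, dst) in edges if src in delta} - reached
        reached |= new                # each step's work tracks `new`, ...
        delta = new                   # ... not the whole `reached` set
    return reached

edges = {(1, 2), (2, 3), (3, 4), (4, 2), (5, 6)}
print(sorted(reachable(edges, 1)))   # [1, 2, 3, 4]
```

Full Differential Dataflow goes one step further: the delta sets themselves are diff collections in the nested time domain, so an edge deletion retracts exactly the paths it carried.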

The Indian use case that lights this up is fraud-ring detection at PhonePe or Paytm. You have a graph of user → device → user → device edges. A fraud ring is a connected component with more than k shared devices among accounts that should be unrelated. Re-running this query nightly against 400 million users is a six-hour batch job. Expressed as a fixed-point dataflow, every new login (which is one edge insertion) costs only the paths it touches — usually a few dozen — and the alert fires within a second of the login completing. The same recursive primitive that powers PageRank, single-source shortest paths, and Datalog rules powers fraud detection.

Compaction and the cost of remembering history

A naive Differential Dataflow implementation would keep every diff at every time forever — useful for re-running queries at past times, ruinous for memory. The system compacts old times by collapsing every diff at time ≤ frontier into a single equivalent diff at the frontier itself. After compaction, you can no longer query the view as of t=100; you can only query it as of the current frontier. This is the same compaction idea you saw in LSM-tree segments — the trade between memory and historical query capability. Materialize exposes compaction policies as CREATE INDEX … WITH (RETAIN HISTORY = '1 hour'), letting an analyst pick how far back time-travel queries can look.
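
The collapse itself is plain multi-set arithmetic. A toy compactor over (data, time, diff) triples, folding everything at or before the frontier into a single equivalent diff at the frontier:

```python
from collections import defaultdict

def compact(diffs, frontier):
    """Fold every diff with time <= frontier into one equivalent diff
    at the frontier; later times are kept untouched. After this, times
    before the frontier can no longer be queried -- that is the trade."""
    folded = defaultdict(int)
    later = []
    for data, time, diff in diffs:
        if time <= frontier:
            folded[data] += diff
        else:
            later.append((data, time, diff))
    head = [(data, frontier, d) for data, d in sorted(folded.items()) if d != 0]
    return head + later

history = [("show=42", 100, +1), ("show=42", 101, -1),
           ("show=42", 101, +1), ("show=99", 101, +1),
           ("show=42", 102, +1)]
print(compact(history, frontier=101))
# [('show=42', 101, 1), ('show=99', 101, 1), ('show=42', 102, 1)]
```

Five triples become three: the retract/insert churn before the frontier cancels out, which is exactly the memory the real system reclaims.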

Why Materialize chose Postgres compatibility

When Materialize the company launched in 2019, the natural alternative was to invent a new SQL dialect or a new wire protocol — something more "streaming-native." They deliberately did not. They picked PostgreSQL wire compatibility because the friction of introducing a new analytical database into a Bengaluru fintech (or anywhere else) is dominated by the BI tool integration: Tableau, Metabase, Superset, every Python notebook, every Go service. By speaking pgwire, Materialize slid into all those tools as a drop-in. The slogan "the streaming database for the rest of us" only works if the SQL surface is unchanged from what an analyst already knows.

What Differential Dataflow doesn't solve

It is not a transactional database. There is no concept of multi-statement read-write transactions over the maintained views; you can read consistent snapshots, but you cannot do an UPDATE … WHERE … RETURNING against a maintained view and expect serialisable isolation across many such operations. Materialize layers some of this on top (a dataflow-as-of-time read consistency model), but the core algebra is for read-mostly analytical views, not OLTP. For OLTP backed by an Aurora-style primary, you wire CDC into Materialize and let the views ride along.

Where this leads next

References