Joins on streams: interval joins, as-of joins, temporal tables
At 09:15:00.118 IST a Zerodha order for 50 shares of RELIANCE hits the matching engine. Two streams describe what just happened: an orders stream from Kite, and a quotes stream from the NSE tick feed. Aditi's risk pipeline needs to join them — for every order, attach the prevailing best bid/ask quote at the moment the order was placed — to compute slippage. In a database this is one line of SQL. In a stream processor it is the hardest operator the runtime has, because the two streams arrive out of order, the join state grows without bound unless you tell it not to, and the answer changes depending on whether you join "near in time" or "the latest version up to this time".
Stream-stream joins fail naively because both sides are unbounded — you can't hold all of either. Three shapes work in production: interval joins (match within ±Δt; bounded state), as-of joins (match the last value at-or-before; the price-tick pattern), and temporal table joins (one stream against a versioned dimension). Each is a different bound on state and a different correctness story under late data.
Why a regular SQL join breaks on streams
A relational join evaluates A ⋈ B by reading every row of A and every row of B. On a stream, "every row of A" means every row that has arrived so far — and A keeps growing. A literal translation of SELECT * FROM orders o JOIN quotes q ON o.symbol = q.symbol to a stream operator would buffer every quote forever — at NSE's 200k ticks/sec and ~80 bytes per quote, that's roughly 16MB of new state every second: past 1GB within a minute, past 1TB within a day, with no bound in sight. Nothing in production can hold that.
The streaming variant has to bound the state. The bound comes from one of three places: a time interval, a "latest version" rule, or a versioned dimension table that's much smaller than either stream. Each gives a different join shape, and each answers a slightly different question.
Interval joins: the bounded-window shape
An interval join is the rule "match if and only if the two events are within ±Δt of each other". Flink writes it as orders.intervalJoin(quotes).between(Time.milliseconds(-50), Time.milliseconds(+50)). Beam's equivalent is a session-style CoGroupByKey with explicit time predicates. The semantics: for each order, emit a join row for every quote whose event_time is within 50ms before or after the order's event_time.
State stays bounded because the operator can prove that any quote older than current_watermark - 50ms will never join again — every future order's lower bound is >= current_watermark - 50ms, so older quotes can be evicted. The state size is roughly 2Δt × aggregate tick rate. For Zerodha at 200k quotes/sec across 5,000 symbols with Δt = 50ms, that's roughly 200k × 0.1s = 20k quote rows in state at steady state — easily heap-resident, no RocksDB needed.
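The eviction rule can be sketched in a few lines — a toy, single-key model, not Flink's implementation; `IntervalJoinState` and its method names are invented for illustration:

```python
class IntervalJoinState:
    """Right-side (quote) state for one key of a ±delta interval join."""
    def __init__(self, delta_ms):
        self.delta = delta_ms
        self.quotes = []               # (event_time, payload), arrival order

    def add_quote(self, t, payload):
        self.quotes.append((t, payload))

    def join_order(self, t):
        # every buffered quote within [t - delta, t + delta] matches
        return [q for q in self.quotes if t - self.delta <= q[0] <= t + self.delta]

    def on_watermark(self, wm):
        # a quote with t < wm - delta can never join a future order:
        # future orders have event_time >= wm, so their lower bound is
        # >= wm - delta.  Evict everything older.
        self.quotes = [q for q in self.quotes if q[0] >= wm - self.delta]

s = IntervalJoinState(delta_ms=50)
for t in range(0, 300, 10):           # a quote every 10 ms
    s.add_quote(t, f"q@{t}")
print(len(s.quotes))                  # 30 quotes buffered
s.on_watermark(250)                   # watermark reaches t=250
print(len(s.quotes))                  # only quotes with t >= 200 survive -> 10
```

The steady-state size is whatever fits in the 2Δt band behind the watermark — the flat curve the text describes.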
Interval joins return zero, one, or many rows per left event. A burst of quotes in the 100ms window around an order produces many join rows. Production usually adds a LIMIT 1 or "nearest" reducer downstream when only one match is meaningful — which is most of the time, and which is exactly what an as-of join does without the explosion.
As-of joins: "the latest version at-or-before"
An as-of join (ASOF JOIN in kdb+/q, Snowflake, ClickHouse; MATCH_RECOGNIZE with LAST in Flink SQL) matches each left row to the single quote whose event_time is the largest value ≤ the order's event_time. There is exactly one match per left row (or zero, if no prior quote exists for the symbol). State per key is one row: the last seen quote.
This is the dominant join shape for finance. Slippage, mark-to-market, P&L attribution, post-trade analytics — all of them need "what was the price right before this trade happened?". An interval join would force you to pick a window size and then collapse multiple matches; an as-of join answers the question directly. Why state stays tiny: the operator only needs the latest quote per symbol, so its memory footprint is keyspace × row_size. With 5,000 NSE symbols at 80 bytes each, that's 400KB. Compare to interval: 20,000+ rows. The two-orders-of-magnitude difference is what makes as-of the production default.
The hard part of as-of is out-of-order arrival. If quote at t=09:15:00.110 arrives after order at t=09:15:00.118 but the quote at t=09:15:00.115 has already arrived, which one is the "latest at-or-before"? The answer must be 0.115 (the larger one ≤ 0.118), but at the moment the order was processed, the operator didn't yet know 0.115 existed. As-of joins therefore depend on a watermark: the operator delays the join for each left row until the watermark exceeds the row's event_time, guaranteeing no earlier-but-later-arriving quote can change the answer. The latency cost is exactly one watermark interval.
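The watermark-delay mechanic can be sketched for a single symbol — the `AsofJoin` class and its event-handler names are illustrative, not any engine's API:

```python
from bisect import bisect_right, insort

class AsofJoin:
    """Hold each order until the watermark passes its event_time, so an
    earlier-in-event-time but later-arriving quote can still win."""
    def __init__(self):
        self.quotes = []        # sorted list of (event_time, mid)
        self.pending = []       # orders waiting for the watermark
        self.results = []

    def on_quote(self, t, mid):
        insort(self.quotes, (t, mid))       # keep sorted under out-of-order arrival

    def on_order(self, t, order_id):
        self.pending.append((t, order_id))  # do NOT answer yet

    def on_watermark(self, wm):
        still = []
        for t, oid in self.pending:
            if t <= wm:                     # no earlier quote can still arrive
                i = bisect_right(self.quotes, (t, float("inf"))) - 1
                self.results.append((oid, self.quotes[i][1] if i >= 0 else None))
            else:
                still.append((t, oid))
        self.pending = still

j = AsofJoin()
j.on_quote(100, 2865.25)
j.on_order(118, "ORD1")      # the order arrives ...
j.on_quote(115, 2865.05)     # ... before this earlier-in-event-time quote does
j.on_watermark(120)          # now it is safe to answer
print(j.results)             # [('ORD1', 2865.05)] — t=115 wins, not t=100
```

Had the operator answered eagerly when ORD1 arrived, it would have matched the t=100 quote and been wrong forever; the one-watermark delay is the price of the correct answer.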
Temporal table joins: one stream, one slow-changing dimension
A temporal table is a versioned dimension that you join against. Think of it as a table where every row carries [valid_from, valid_to) and the join rule is "find the row whose validity interval contains the streaming event's event_time". Flink SQL writes FOR SYSTEM_TIME AS OF orders.event_time; Beam has SideInput with windowing that approximates it.
The canonical use is currency conversion. A payments stream from Razorpay (1M events/sec at peak) joins against an fx_rates table (≈200 currency pairs × a few updates per day = a few thousand rows total). You don't want to ship fx_rates as a stream — it's a tiny dimension. You want the runtime to cache the latest version of the dimension, refresh it on the slow-update channel, and join the fast stream against it under temporal-correctness rules.
Temporal table joins are the right shape when one side is genuinely small (fits in operator memory or in a broadcast variable) and slow-changing relative to the other. The state size is dimension_size × version_count. For fx_rates with 200 keys × 365 versions/year = 73k rows: 6MB at 80 bytes/row. That's a side input, not a state store. Why this isn't just a regular dictionary lookup: regular lookups use the current version, which gives wrong answers under replay or backfill. A payments event from 09:15 IST joined against the now-current INR/USD rate produces a different number than joining against the rate that was in effect at 09:15. Temporal joins use the dimension version active at the event's event_time, which is replay-safe.
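The replay-safety argument can be made concrete with a toy version history (the `fx_versions` values are illustrative, in paise per USD):

```python
from bisect import bisect_right

# Toy version history: (valid_from, rate in paise per USD)
fx_versions = [(0, 8200), (130, 8260), (155, 8240)]

def rate_as_of(versions, t):
    """Temporal lookup: the version in effect at event time t."""
    times = [v[0] for v in versions]
    i = bisect_right(times, t) - 1
    return versions[i][1] if i >= 0 else None

def rate_current(versions):
    """Naive dictionary-style lookup: whatever version is newest now."""
    return versions[-1][1]

usd = 100
# During the original run at t=118, version (0, 8200) was in effect:
print(usd * rate_as_of(fx_versions, 118) / 100)   # 8200.0 INR — replay-safe
# A backfill with a current-version lookup silently uses (155, 8240):
print(usd * rate_current(fx_versions) / 100)      # 8240.0 INR — a different amount
```

The temporal lookup gives the same INR amount no matter when the event is (re)processed; the current-version lookup drifts with every dimension update.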
Build the three: 80 lines of stream-join semantics
```python
# stream_joins.py — interval, as-of, and temporal joins side by side
from collections import defaultdict
from bisect import bisect_right
from dataclasses import dataclass

@dataclass
class Order:
    t: int; symbol: str; qty: int

@dataclass
class Quote:
    t: int; symbol: str; bid: int; ask: int   # prices in paise

# Two streams, slightly out of order
quotes = [Quote(100, "RELIANCE", 286500, 286550),
          Quote(115, "RELIANCE", 286480, 286530),
          Quote(118, "RELIANCE", 286470, 286520),
          Quote(150, "RELIANCE", 286460, 286510),
          Quote(110, "TCS", 370000, 370050),
          Quote(140, "TCS", 370020, 370070)]
orders = [Order(118, "RELIANCE", 50),
          Order(135, "TCS", 10),
          Order(160, "RELIANCE", 25)]

def interval_join(orders, quotes, before_ms=50, after_ms=50):
    by_sym = defaultdict(list)
    for q in quotes:
        by_sym[q.symbol].append(q)
    for v in by_sym.values():
        v.sort(key=lambda q: q.t)
    out = []
    for o in orders:
        for q in by_sym[o.symbol]:
            if o.t - before_ms <= q.t <= o.t + after_ms:
                out.append((o, q))
    return out

def asof_join(orders, quotes):
    by_sym = defaultdict(list)
    for q in sorted(quotes, key=lambda q: q.t):
        by_sym[q.symbol].append(q)
    out = []
    for o in orders:
        qs = by_sym[o.symbol]
        ts = [q.t for q in qs]
        idx = bisect_right(ts, o.t) - 1   # latest quote with t <= o.t
        out.append((o, qs[idx] if idx >= 0 else None))
    return out

# A versioned FX dimension: (valid_from, rate in paise per USD)
fx_versions = [(0, 8200), (130, 8260), (155, 8240)]

def temporal_join(orders, fx_versions):
    fx_versions = sorted(fx_versions)
    times = [v[0] for v in fx_versions]
    out = []
    for o in orders:
        idx = bisect_right(times, o.t) - 1
        rate = fx_versions[idx][1] if idx >= 0 else None
        out.append((o, rate))
    return out

print("INTERVAL (±50ms)")
for o, q in interval_join(orders, quotes):
    print(f"  order t={o.t:3d} {o.symbol:8s} -> quote t={q.t} mid={(q.bid+q.ask)/2/100:.2f}")
print("\nAS-OF (latest <= order.t)")
for o, q in asof_join(orders, quotes):
    mid = (q.bid + q.ask) / 2 / 100 if q else float("nan")
    print(f"  order t={o.t:3d} {o.symbol:8s} -> quote t={q.t if q else '-'} mid={mid:.2f}")
print("\nTEMPORAL (FX as-of order.t)")
for o, rate in temporal_join(orders, fx_versions):
    print(f"  order t={o.t:3d} {o.symbol:8s} qty={o.qty:3d} -> fx={rate/100:.2f} INR/USD")
```
```text
INTERVAL (±50ms)
  order t=118 RELIANCE -> quote t=100 mid=2865.25
  order t=118 RELIANCE -> quote t=115 mid=2865.05
  order t=118 RELIANCE -> quote t=118 mid=2864.95
  order t=118 RELIANCE -> quote t=150 mid=2864.85
  order t=135 TCS      -> quote t=110 mid=3700.25
  order t=135 TCS      -> quote t=140 mid=3700.45
  order t=160 RELIANCE -> quote t=115 mid=2865.05
  order t=160 RELIANCE -> quote t=118 mid=2864.95
  order t=160 RELIANCE -> quote t=150 mid=2864.85

AS-OF (latest <= order.t)
  order t=118 RELIANCE -> quote t=118 mid=2864.95
  order t=135 TCS      -> quote t=110 mid=3700.25
  order t=160 RELIANCE -> quote t=150 mid=2864.85

TEMPORAL (FX as-of order.t)
  order t=118 RELIANCE qty= 50 -> fx=82.00 INR/USD
  order t=135 TCS      qty= 10 -> fx=82.60 INR/USD
  order t=160 RELIANCE qty= 25 -> fx=82.40 INR/USD
```
Six lines do the engineering:

- `bisect_right(ts, o.t) - 1` — the as-of primitive. `bisect_right` returns the insert position immediately after equal values, so `-1` gives the latest entry with `t <= o.t`. Why this matters in production: the bisect over a sorted-by-event-time per-key list is the same data structure Flink uses internally for as-of joins (`MapState<Long, Row>` with a tombstone-aware `headMap` lookup). The state per key is one entry; lookup is O(log n) over the n quotes seen so far.
- `if o.t - before_ms <= q.t <= o.t + after_ms` — the interval predicate. The `+after_ms` half is the trap: it requires the operator to delay emission for each order until the watermark crosses `o.t + after_ms`, otherwise a future quote could still join. Production interval joins always carry this delay budget; under-tuning it produces wrong answers, over-tuning it produces stale dashboards.
- Nine rows from interval, three from as-of — same input, same predicate on the same key, very different cardinality. The interval join's 9 rows would force a downstream `GROUP BY order_id` to collapse to a single answer; as-of returns it directly. The choice of join shape is the choice of where the cardinality reduction happens.
- `fx_versions = [(0, 8200), (130, 8260), (155, 8240)]` — the temporal table. Just three versions. The RELIANCE order at `t=160` correctly picks the version starting at `t=155` (rate 82.40). Why backfill correctness depends on this: replaying yesterday's payments stream against today's `fx_versions` snapshot would give wrong INR amounts unless the join uses the version active at each payment's `event_time`. Temporal join semantics make a backfill produce exactly the same numbers as the original run.
- `asof_join` returns at most one row, `interval_join` may return many, and `temporal_join` returns exactly one — these are the three cardinality regimes. Production picks the join shape by what the consumer wants: "tag each trade with one prior quote" (as-of), "tag each trade with all quotes nearby for noise estimation" (interval), "convert each transaction's currency at the rate in effect" (temporal).
- Per-key state size differs by orders of magnitude — `interval_join` keeps every quote in the window in `by_sym`; `asof_join` only needs the latest one (a real implementation TTLs the older entries); `temporal_join` keeps a few hundred dimension versions total, not per key. State size determines whether the operator is heap-resident or RocksDB-resident, which determines throughput.
A second figure: state growth under each shape
The single best predictor of whether a stream-join will hold up in production is the shape of its state-size curve over time. Interval joins have a flat steady-state profile (state proportional to window width × event rate). As-of joins have a staircase that plateaus once every key has been seen once. Temporal joins have a tiny dimension that barely grows.
The y-axis dictates the operating cost. At Razorpay's scale the temporal join's state is 6MB; the as-of join across UPI handles is ~80MB; an interval join across the same stream with a 60-second window is 4GB+. State backend choice (heap vs RocksDB) and parallelism budget follow directly from this picture.
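The three curves can be reproduced with a toy simulation — invented rates, counting rows rather than bytes; the shapes of the curves, not the absolute numbers, are what carry over:

```python
# Toy state-size curves: 1,000 quotes/sec over 100 keys (invented rates),
# a 60 s interval window, and a ~200-row dimension with rare updates.
quote_rate = 1000          # events/sec, aggregate
keys = 100
window_s = 60

interval_state = []        # rows held = window seconds x rate (flat after fill)
asof_state = []            # one row per key seen so far (staircase, then plateau)
temporal_state = []        # dimension rows (tiny, near-constant)

seen_keys = set()
dim_rows = 200
for second in range(1, 301):
    interval_state.append(min(second, window_s) * quote_rate)
    seen_keys.add(second % keys)          # keys show up gradually
    asof_state.append(len(seen_keys))
    if second % 100 == 0:
        dim_rows += 1                     # a rare dimension-version update
    temporal_state.append(dim_rows)

print(interval_state[-1], asof_state[-1], temporal_state[-1])
# 60000 vs 100 vs 203 — three different orders of magnitude at steady state
```

Plot the three lists over time and you get exactly the picture the text describes: a flat plateau, a staircase that saturates, and a near-horizontal line.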
What goes wrong in production
A stream-stream join is the operator most likely to silently produce wrong numbers. The four common failures:
Watermark mis-alignment. The two input streams advance their watermarks at different rates. orders watermark is at 09:16:00 but quotes watermark is at 09:15:30. An interval join takes min(left, right) as its operator watermark, so the join's state is held until the slower side advances. If quotes is permanently 30 seconds behind orders (a slow Kafka partition), the join's state grows by 30 seconds × full quote rate forever. The fix is withIdleness on the stream-source side and explicit per-source watermark monitoring.
Skewed keys. RELIANCE has 200k quotes/sec, MIDCAP_X has 5/sec. The interval join's state is sharded by symbol-keyed parallelism; the worker holding RELIANCE has 40,000× the state of others. State-store hotspot, GC pauses, missed deadlines. The Flink fix is keyBy(symbol).rebalance().keyBy(symbol) after the join to redistribute, or use a different parallelism scheme (sub-keyed join with a hash of the order_id).
Mismatched grain. The join keys' types disagree subtly. quotes.symbol is RELIANCE-EQ (with the -EQ exchange suffix); orders.symbol is RELIANCE. The failure is silent — every left row finds zero matches, the left side flows through unchanged, and dashboards show every trade as "no quote available". The fix is an explicit schema contract on join keys (see /wiki/data-contracts-the-producer-consumer-boundary) and an alert on join hit rate.
Re-firing under late data. If the join is followed by an aggregation, late events change the join row, which changes the aggregate. With the wrong sink, this produces double-counting. The fix is the late-data policy from the previous chapter (/wiki/late-data-drop-reprocess-or-side-output) plus an idempotent or retraction-aware sink downstream.
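The arithmetic behind the first failure, watermark mis-alignment, fits in a few lines (the lag and window values are illustrative):

```python
# A two-input join's operator watermark is min(left, right), so a
# permanently lagging right side pins how far back state must be retained.
def operator_watermark(left_wm, right_wm):
    return min(left_wm, right_wm)

delta = 50          # ±50 ms interval join
lag = 30_000        # quotes stream permanently 30 s behind orders
for now in (60_000, 120_000, 180_000):      # event-time "now" in ms
    wm = operator_watermark(now, now - lag)
    held_ms = now - (wm - delta)            # span of state that cannot be evicted
    print(f"t={now}: operator watermark={wm}, state spans {held_ms} ms")
# state always spans lag + delta = 30,050 ms — ~600x the intended 50 ms window
```

The held span never shrinks as long as the lag persists, which is why a single slow Kafka partition shows up as monotonic state growth on the join operator.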
Common confusions
- "An interval join is the same as a windowed inner join." Close but not identical. A windowed inner join (`A WINDOW JOIN B ON A.k = B.k WITHIN 1 MINUTE`) buckets both streams into the same fixed window and joins within the bucket — events on opposite sides of a bucket boundary don't match even if they're 1 second apart. An interval join uses a sliding window centred on each event, so a `09:00:59` order matches a `09:01:01` quote. The semantic difference shows up as ~12% missed matches in real Zerodha data when teams pick the wrong shape.
- "As-of join means the right side has at most one row per key, ever." No. The right side is an unbounded stream — many quotes per symbol per second. As-of just means the matched row is the latest at-or-before. The right-side state holds the latest per key (one row per symbol), which is what makes it cheap.
- "Temporal table joins need a separate database." No. The "temporal table" is just a stream that the runtime treats as a versioned dimension. In Flink, `CREATE TEMPORARY VIEW fx_rates AS SELECT * FROM fx_stream` followed by `FOR SYSTEM_TIME AS OF orders.event_time` is enough. The dimension is materialised in operator state, not in an external store.
- "You can ASOF join two unbounded streams without bounds and stay safe." Only if every key on the right side gets updates regularly. If a symbol stops trading at 15:30, its last quote is held in state forever. Without a session-timeout TTL on the per-key state, the as-of join's memory grows monotonically. Flink's `MapState.clear()` after a configurable idle time is the standard mitigation.
- "A regular `INNER JOIN` in Flink SQL works the same as in batch." It does not — it produces an "unbounded join" with `MAX_LIFETIME` defaulting to never. Production breaks. Flink will emit a warning to use `INTERVAL JOIN` or `LOOKUP JOIN` instead. Trust the warning.
- "Late data on a stream-stream join just causes wrong answers; you can fix downstream." No — late data on the join's right side causes a missed match that may never be corrected. If a slippage row was emitted with `quote_mid = NULL` because the right-side watermark was behind, the left-side dashboard shows an under-count even after the quote arrives. Stream-stream joins must use side-output for late events on either side, not just the left.
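The first confusion above is easy to demonstrate with integer timestamps — a toy model, not any engine's operators:

```python
def window_join(orders, quotes, width):
    # tumbling buckets: events match only if they land in the SAME bucket
    return [(o, q) for o in orders for q in quotes if o // width == q // width]

def interval_join(orders, quotes, delta):
    # sliding bound centred on each event
    return [(o, q) for o in orders for q in quotes if abs(o - q) <= delta]

orders, quotes = [59], [61]                   # timestamps in seconds, 2 s apart
print(window_join(orders, quotes, width=60))  # [] — bucket 0 vs bucket 1, no match
print(interval_join(orders, quotes, delta=5)) # [(59, 61)] — well within ±5 s
```

The pair straddles the 60-second bucket boundary, so the window join silently drops it while the interval join keeps it — the exact shape of the missed-match bug the bullet describes.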
Going deeper
How Flink implements the interval join under the hood
Flink's IntervalJoinOperator uses two MapState<Long, List<Row>> — one per side — keyed by event timestamp. On every left event, it scans the right state for entries within [t - lower, t + upper]. State cleanup runs on watermark advance: any entry with t < watermark - upper on the left or t < watermark - lower on the right is evicted, because no future event on the other side can join it. The state grows linearly with (upper + lower) × per-key event rate × parallelism. Flink emits both inner and left outer interval joins; right outer is implemented by swapping arguments. Under the hood, RocksDB compaction can lag behind eviction during high-throughput bursts, which is why interval joins benefit from heap state-backend on small windows.
The Zerodha post-trade pipeline at NSE close
A real Zerodha pipeline at 15:30 IST processes the day's trades: the order stream is bounded (~1 crore orders), the quote stream is bounded (~30 crore ticks), and slippage is computed as (execution_price - prevailing_mid). The shape is as-of: each order joins to the quote whose timestamp is the largest ≤ order.execution_time. The pipeline runs on Flink with keyBy(symbol) and MapState<Long, Quote> holding the latest 5 quotes per symbol (extra versions for replay safety). State size is ~5,000 symbols × 5 quotes × 96 bytes = 2.4MB per parallelism slot. The slippage rows feed into a Trino-on-S3 lakehouse table for compliance reporting; the post-trade window's correctness is audited against an offline as-of join in DuckDB on a sample. Tolerance for as-of mismatch is zero — a single mis-joined trade is a regulatory finding.
As-of vs LATERAL: the SQL alternative that doesn't quite work
Standard SQL has LATERAL joins, which let the right side be a per-row subquery. SELECT * FROM orders o, LATERAL (SELECT * FROM quotes q WHERE q.symbol = o.symbol AND q.t <= o.t ORDER BY q.t DESC LIMIT 1) last_quote is the as-of in vanilla Postgres (the subquery alias is mandatory there). It works in batch and on bounded streams. On an unbounded stream, the LATERAL subquery requires materialising every quote ever seen, so a streaming engine can't safely execute it. Flink SQL refuses to plan it as a stream operator and forces you to declare a LOOKUP JOIN (against a JDBC table) or MATCH_RECOGNIZE instead. The rewrite is mechanical but the planner won't do it for you — it errors out with a "could not produce a streaming plan" message for the LATERAL.
Watermark per-key joins and the Kafka 256-partition reality
A real Kafka source has many partitions, each with its own event-time progression. A naive global watermark is min(per_partition_watermarks), which is dragged down by any quiet partition. withIdleness(Duration.ofSeconds(30)) excludes partitions quiet for 30s. Stream-stream joins are sensitive to this: a join's state holding for a slow partition's catch-up is the most common cause of unbounded state growth in production Flink. The fix is per-key watermarking — each symbol gets its own watermark that advances independently. Flink doesn't expose this as a primitive; teams hand-roll it with a KeyedProcessFunction that maintains a per-key lastSeenEventTime and computes the join's effective watermark from the minimum across active keys only. This is one of the harder pieces of streaming engineering and is why managed services (Confluent Cloud, AWS MSK + Kinesis Analytics) bake it in.
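A toy version of that hand-rolled scheme — the class and its method names are invented for illustration, standing in for the KeyedProcessFunction the text describes:

```python
class PerKeyWatermarks:
    """Each key tracks its own last event time; the join's effective
    watermark is the min over *active* keys only, so idle keys don't
    drag it down."""
    def __init__(self, idle_ms):
        self.idle_ms = idle_ms
        self.last_seen = {}        # key -> last event_time observed

    def observe(self, key, t):
        self.last_seen[key] = max(t, self.last_seen.get(key, 0))

    def effective_watermark(self, now):
        active = [t for t in self.last_seen.values()
                  if now - t <= self.idle_ms]   # exclude idle keys
        return min(active) if active else now   # no active keys: advance freely

wm = PerKeyWatermarks(idle_ms=30_000)
wm.observe("RELIANCE", 99_000)
wm.observe("MIDCAP_X", 10_000)                  # went quiet 90 s ago
print(wm.effective_watermark(now=100_000))      # 99000 — the idle key is excluded
```

Without the idle-key exclusion the watermark would be stuck at 10,000 and the join would hold 90 seconds of state for every symbol, which is exactly the failure mode described above.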
The differential dataflow alternative
Differential dataflow makes joins on streams correct under late data without an explicit lateness policy. The trick: every row is tagged with (value, time, multiplicity) — a positive multiplicity for an addition, a negative one for a retraction. A late event arriving for a closed window sends a (−old, +new) retraction pair through every operator, including the join, so the join's output remains exactly correct under any reordering or replay. The cost is operator complexity (every operator must support negative multiplicities) and a typical 3–10× constant-factor overhead vs Flink's append-only model. Materialize and Feldera commercialise this; production adoption in India is rare in 2026, but the model is the cleanest answer in the literature.
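The retraction mechanic can be shown with a multiset join in a few lines — a toy in the differential-dataflow spirit, not Materialize's implementation; all names and values are invented:

```python
from collections import Counter

left = Counter()    # (key, value) -> multiplicity
right = Counter()
out = Counter()     # (key, left_value, right_value) -> multiplicity

def update_left(key, value, mult):
    left[(key, value)] += mult
    for (k2, v2), m2 in right.items():     # the delta joins the other side
        if k2 == key:
            out[(key, value, v2)] += mult * m2

def update_right(key, value, mult):
    right[(key, value)] += mult
    for (k2, v2), m2 in left.items():
        if k2 == key:
            out[(key, v2, value)] += mult * m2

update_left("RELIANCE", "order@118", +1)
update_right("RELIANCE", "quote=2865", +1)       # first (wrong) quote joins
# late correction arrives: retract the old quote, add the corrected one
update_right("RELIANCE", "quote=2865", -1)       # -old cancels the stale join row
update_right("RELIANCE", "quote=2864", +1)       # +new produces the corrected row

print({k: m for k, m in out.items() if m != 0})
# only ('RELIANCE', 'order@118', 'quote=2864') survives with multiplicity 1
```

The stale join row's multiplicity nets to zero, so any downstream consumer that sums multiplicities sees the corrected answer — with no lateness policy anywhere in the pipeline.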
Where this leads next
The next chapter — /wiki/checkpointing-the-consistent-snapshot-algorithm — explains how a stream operator's state (including all the join state described above) is snapshotted consistently across a distributed pipeline. Without checkpointing, an interval join with 20MB of state loses everything on a worker crash; with checkpointing, the worker resumes from the last consistent snapshot and the join's correctness survives the failure. The checkpoint interval determines the maximum reprocessing time after a crash, which determines how aggressive your join's per-key TTLs can be.
After that, /wiki/state-backends-heap-rocksdb-external covers where the join state physically lives. An as-of join's per-key state of "last 5 quotes" sits comfortably in heap; an interval join's per-window state of "all quotes in last 100ms" overflows to RocksDB on high-throughput keys. The choice of backend changes the latency profile by 2–10×.
The mental model for stream joins: the join shape is a contract about what the right answer is. The state size is the cost of paying that contract. The watermark coordination is the source of every wrong answer in production.
References
- The Dataflow Model — Akidau et al. (VLDB 2015) — section 5 covers how triggers and watermarks compose with joins; the foundation for streaming join correctness.
- Apache Flink — Joins on Streams — the production-grade DataStream API for interval joins; the SQL surface adds temporal-table semantics on top.
- Flink SQL — Temporal Table Joins — the cleanest implementation of `FOR SYSTEM_TIME AS OF` semantics in open source.
- kdb+/q `aj` — As-of Join — the original as-of join, a finance primitive since the 1990s; the semantics that ClickHouse, Snowflake, and Flink all derived from.
- ClickHouse — ASOF JOIN — a practical explanation with finance examples; ClickHouse's join is one of the few that works correctly on unbounded ingest with merge-tree state.
- Materialize — Streaming Joins via Differential Dataflow — the retraction-based correctness model; the cleanest answer to "what should a streaming join do under late data?".
- Zerodha Engineering — Building Kite's order pipeline — production case studies of NSE-scale streaming.
- /wiki/late-data-drop-reprocess-or-side-output — the previous chapter; stream joins inherit late-data policy from each input.
- /wiki/event-time-vs-processing-time-the-whole-ballgame — without event-time semantics, "join within ±50ms" has no meaning.