Joins on streams: interval joins, as-of joins, temporal tables

At 09:15:00.118 IST a Zerodha order for 50 shares of RELIANCE hits the matching engine. Two streams describe what just happened: an orders stream from Kite, and a quotes stream from the NSE tick feed. Aditi's risk pipeline needs to join them — for every order, attach the prevailing best bid/ask quote at the moment the order was placed — to compute slippage. In a database this is one line of SQL. In a stream processor it is the hardest operator the runtime has, because the two streams arrive out of order, the join state grows without bound unless you tell it not to, and the answer changes depending on whether you join "near in time" or "the latest version up to this time".

Stream-stream joins fail naively because both sides are unbounded — you can't hold all of either. Three shapes work in production: interval joins (match within ±Δt; bounded state), as-of joins (match the last value at-or-before; the price-tick pattern), and temporal table joins (one stream against a versioned dimension). Each is a different bound on state and a different correctness story under late data.

Why a regular SQL join breaks on streams

A relational join evaluates A ⋈ B by reading every row of A and every row of B. On a stream, every row of A is a row that has arrived so far, and A keeps growing. A literal translation of SELECT * FROM orders o JOIN quotes q ON o.symbol = q.symbol to a stream operator would buffer every quote forever — at NSE's 200k ticks/sec and roughly 100 bytes per tick, the operator's state grows by ~20MB per second and crosses 1TB in about 14 hours. Nothing in production can hold that.

The streaming variant has to bound the state. The bound comes from one of three places: a time interval, a "latest version" rule, or a versioned dimension table that's much smaller than either stream. Each gives a different join shape, and each answers a slightly different question.

[Figure: Interval, as-of, temporal — three answers to "join two streams". Three rows show the same orders and quotes streams joined under different rules: interval matches quotes within a ±50ms window around each order (bound = window); as-of matches the latest quote at-or-before each order's timestamp (bound = "last per key"); temporal matches against a versioned dimension table, FX_RATES, with versions v1 [09:15, 09:18), v2 [09:18, 09:23), v3 [09:23, ...).]
Interval bounds state by clock. As-of bounds it by "last value per key". Temporal bounds it by a versioned dimension that fits in memory. Pick the one that matches what your downstream consumer actually wants.

Interval joins: the bounded-window shape

An interval join is the rule "match if and only if the two events are within ±Δt of each other". Flink writes it as orders.intervalJoin(quotes).between(Time.milliseconds(-50), Time.milliseconds(+50)). Beam's equivalent is a session-style CoGroupByKey with explicit time predicates. The semantics: for each order, emit a join row for every quote whose event_time is within 50ms before or after the order's event_time.

State stays bounded because the operator can prove that any quote older than current_watermark - 50ms will never join again — every future order's lower bound is >= current_watermark - 50ms, so older quotes can be evicted. The state size is 2Δt × aggregate tick rate. For Zerodha at 200k quotes/sec across 5,000 symbols with Δt = 50ms, that's roughly 200k × 0.1s = 20k quote rows in state at steady state — easily heap-resident, no RocksDB needed.
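The eviction rule is small enough to sketch directly — a minimal in-memory version, assuming a per-symbol buffer and a monotonically advancing watermark (buffers, on_quote, on_watermark are illustrative names, not any engine's API):

```python
# Minimal sketch of interval-join state eviction (illustrative, not
# any engine's API). Quotes sit in a per-symbol buffer until the
# watermark proves they can never match a future order.
from collections import defaultdict

DELTA_MS = 50
buffers = defaultdict(list)        # symbol -> [(event_time_ms, quote), ...]

def on_quote(symbol, t, quote):
    buffers[symbol].append((t, quote))

def on_watermark(wm_ms):
    # A quote with t < wm - DELTA_MS is dead: every future order o has
    # o.t >= wm, so its lower bound o.t - DELTA_MS >= wm - DELTA_MS > t.
    for symbol in buffers:
        buffers[symbol] = [(t, q) for (t, q) in buffers[symbol]
                           if t >= wm_ms - DELTA_MS]

on_quote("RELIANCE", 100, "q100")
on_quote("RELIANCE", 118, "q118")
on_watermark(160)                  # evicts t=100 (100 < 160 - 50)
```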

Interval joins return zero, one, or many rows per left event. A burst of quotes in the 100ms window around an order produces many join rows. Production usually adds a LIMIT 1 or "nearest" reducer downstream when only one match is meaningful — which is most of the time, and which is exactly what an as-of join does without the explosion.
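When only one match matters, the downstream reducer is tiny — a sketch of a "nearest" collapse over interval-join output (the names and the Ev type are illustrative):

```python
# Collapse interval-join output to the single nearest quote per order.
# Illustrative sketch; matches is a list of (order, quote) pairs.
from dataclasses import dataclass

@dataclass
class Ev:
    t: int   # event time in ms

def nearest_per_order(matches):
    best = {}                                  # order identity -> (order, quote)
    for o, q in matches:
        prev = best.get(id(o))
        if prev is None or abs(q.t - o.t) < abs(prev[1].t - o.t):
            best[id(o)] = (o, q)
    return list(best.values())

o = Ev(118)
matches = [(o, Ev(100)), (o, Ev(115)), (o, Ev(118)), (o, Ev(150))]
print(nearest_per_order(matches))              # keeps only the t=118 quote
```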

As-of joins: "the latest version at-or-before"

An as-of join (ASOF JOIN in kdb+/q, Snowflake, ClickHouse; MATCH_RECOGNIZE with LAST in Flink SQL) matches each left row to the single quote whose event_time is the largest value at or before the order's event_time. There is exactly one match per left row (or zero, if no prior quote exists for the symbol). State per key is one row: the last seen quote.

This is the dominant join shape for finance. Slippage, mark-to-market, P&L attribution, post-trade analytics — all of them need "what was the price right before this trade happened?". An interval join would force you to pick a window size and then collapse multiple matches; an as-of join answers the question directly. Why state stays tiny: the operator only needs the latest quote per symbol, so its memory footprint is keyspace × row_size. With 5,000 NSE symbols at 80 bytes each, that's 400KB. Compare to interval: 20,000+ rows. The two-orders-of-magnitude difference is what makes as-of the production default.

The hard part of as-of is out-of-order arrival. If quote at t=09:15:00.110 arrives after order at t=09:15:00.118 but the quote at t=09:15:00.115 has already arrived, which one is the "latest at-or-before"? The answer must be 0.115 (the larger one ≤ 0.118), but at the moment the order was processed, the operator didn't yet know 0.115 existed. As-of joins therefore depend on a watermark: the operator delays the join for each left row until the watermark exceeds the row's event_time, guaranteeing no earlier-but-later-arriving quote can change the answer. The latency cost is exactly one watermark interval.
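The watermark gating can be sketched in a few lines — orders wait in a buffer until the watermark passes their event time, so the late 0.115 quote from the example above still gets its chance to win (a toy model; on_quote/on_order/on_watermark are illustrative names, not an engine API):

```python
# Toy watermark-gated as-of join: left rows wait until the watermark
# passes their event time, so a late-but-earlier quote can still win.
import heapq
from bisect import bisect_right, insort

quotes_by_sym = {}     # symbol -> sorted list of (event_time, quote)
pending = []           # min-heap of (order_time, symbol)

def on_quote(symbol, t, quote):
    insort(quotes_by_sym.setdefault(symbol, []), (t, quote))

def on_order(t, symbol):
    heapq.heappush(pending, (t, symbol))

def on_watermark(wm, emit):
    # Safe to join any order with t <= wm: no quote timestamped <= t
    # can still arrive once the watermark has passed t.
    while pending and pending[0][0] <= wm:
        t, symbol = heapq.heappop(pending)
        lst = quotes_by_sym.get(symbol, [])
        idx = bisect_right([x[0] for x in lst], t) - 1
        emit((t, symbol, lst[idx][1] if idx >= 0 else None))

out = []
on_quote("RELIANCE", 110, "q110")
on_order(118, "RELIANCE")
on_quote("RELIANCE", 115, "q115")   # arrives late, timestamped earlier
on_watermark(120, out.append)
print(out)                          # the order joins q115, not q110
```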

Temporal table joins: one stream, one slow-changing dimension

A temporal table is a versioned dimension that you join against. Think of it as a table where every row carries [valid_from, valid_to) and the join rule is "find the row whose validity interval contains the streaming event's event_time". Flink SQL writes FOR SYSTEM_TIME AS OF orders.event_time; Beam has SideInput with windowing that approximates it.

The canonical use is currency conversion. A payments stream from Razorpay (1M events/sec at peak) joins against an fx_rates table (≈200 currency pairs × a few updates per day = a few thousand rows total). You don't want to ship fx_rates as a stream — it's a tiny dimension. You want the runtime to cache the latest version of the dimension, refresh it on the slow-update channel, and join the fast stream against it under temporal-correctness rules.

Temporal table joins are the right shape when one side is genuinely small (fits in operator memory or in a broadcast variable) and slow-changing relative to the other. The state size is dimension_size × version_count. For fx_rates with 200 keys × 365 versions/year = 73k rows: 6MB at 80 bytes/row. That's a side input, not a state store. Why this isn't just a regular dictionary lookup: regular lookups use the current version, which gives wrong answers under replay or backfill. A payments event from 09:15 IST joined against the now-current INR/USD rate produces a different number than joining against the rate that was in effect at 09:15. Temporal joins use the dimension version active at the event's event_time, which is replay-safe.
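The replay hazard is easy to demonstrate — a sketch comparing a naive "current version" lookup against the temporal rule, with an illustrative fx_versions table of (valid_from, paise-per-USD) pairs:

```python
# Why a temporal join beats a "current version" lookup: replaying a
# payment from t=118 against today's rate gives a different answer
# than the rate in effect at event time. Illustrative versions only.
from bisect import bisect_right

fx_versions = [(0, 8200), (130, 8260), (155, 8240)]   # (valid_from, paise/USD)

def current_rate():
    return fx_versions[-1][1]            # whatever is live right now

def rate_as_of(t):
    times = [v[0] for v in fx_versions]
    return fx_versions[bisect_right(times, t) - 1][1]

amount_usd, event_t = 50, 118
print(amount_usd * current_rate())       # replay-unsafe: 50 * 8240
print(amount_usd * rate_as_of(event_t))  # replay-safe:   50 * 8200
```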

Build the three: 80 lines of stream-join semantics

# stream_joins.py — interval, as-of, and temporal joins side by side
from collections import defaultdict
from bisect import bisect_right
from dataclasses import dataclass

@dataclass
class Order: t: int; symbol: str; qty: int
@dataclass
class Quote: t: int; symbol: str; bid: int; ask: int  # paise

# Two streams, slightly out of order
quotes = [Quote(100, "RELIANCE", 286500, 286550),
          Quote(115, "RELIANCE", 286480, 286530),
          Quote(118, "RELIANCE", 286470, 286520),
          Quote(150, "RELIANCE", 286460, 286510),
          Quote(110, "TCS",      370000, 370050),
          Quote(140, "TCS",      370020, 370070)]
orders = [Order(118, "RELIANCE", 50),
          Order(135, "TCS",      10),
          Order(160, "RELIANCE", 25)]

def interval_join(orders, quotes, before_ms=50, after_ms=50):
    by_sym = defaultdict(list)
    for q in quotes: by_sym[q.symbol].append(q)
    for v in by_sym.values(): v.sort(key=lambda q: q.t)
    out = []
    for o in orders:
        for q in by_sym[o.symbol]:
            if o.t - before_ms <= q.t <= o.t + after_ms:
                out.append((o, q))
    return out

def asof_join(orders, quotes):
    by_sym = defaultdict(list)
    for q in sorted(quotes, key=lambda q: q.t):
        by_sym[q.symbol].append(q)
    out = []
    for o in orders:
        qs = by_sym[o.symbol]
        ts = [q.t for q in qs]
        idx = bisect_right(ts, o.t) - 1   # latest quote with t <= o.t
        out.append((o, qs[idx] if idx >= 0 else None))
    return out

# A versioned FX dimension: (valid_from, rate_paise_per_usd)
fx_versions = [(0,    8200), (130,  8260), (155,  8240)]
def temporal_join(orders, fx_versions):
    fx_versions = sorted(fx_versions)
    times = [v[0] for v in fx_versions]
    out = []
    for o in orders:
        idx = bisect_right(times, o.t) - 1
        rate = fx_versions[idx][1] if idx >= 0 else None
        out.append((o, rate))
    return out

print("INTERVAL (±50ms)")
for o, q in interval_join(orders, quotes):
    print(f"  order t={o.t:3d} {o.symbol:8s} -> quote t={q.t} mid={(q.bid+q.ask)/2/100:.2f}")

print("\nAS-OF (latest <= order.t)")
for o, q in asof_join(orders, quotes):
    if q is None:
        print(f"  order t={o.t:3d} {o.symbol:8s} -> no prior quote")
    else:
        print(f"  order t={o.t:3d} {o.symbol:8s} -> quote t={q.t} "
              f"mid={(q.bid+q.ask)/2/100:.2f}")

print("\nTEMPORAL (FX as-of order.t)")
for o, rate in temporal_join(orders, fx_versions):
    print(f"  order t={o.t:3d} {o.symbol:8s} qty={o.qty:3d} -> fx={rate/100:.2f} INR/USD")

The script prints:
INTERVAL (±50ms)
  order t=118 RELIANCE -> quote t=100 mid=2865.25
  order t=118 RELIANCE -> quote t=115 mid=2865.05
  order t=118 RELIANCE -> quote t=118 mid=2864.95
  order t=118 RELIANCE -> quote t=150 mid=2864.85
  order t=135 TCS      -> quote t=110 mid=3700.25
  order t=135 TCS      -> quote t=140 mid=3700.45
  order t=160 RELIANCE -> quote t=115 mid=2865.05
  order t=160 RELIANCE -> quote t=118 mid=2864.95
  order t=160 RELIANCE -> quote t=150 mid=2864.85

AS-OF (latest <= order.t)
  order t=118 RELIANCE -> quote t=118 mid=2864.95
  order t=135 TCS      -> quote t=110 mid=3700.25
  order t=160 RELIANCE -> quote t=150 mid=2864.85

TEMPORAL (FX as-of order.t)
  order t=118 RELIANCE qty= 50 -> fx=82.00 INR/USD
  order t=135 TCS      qty= 10 -> fx=82.60 INR/USD
  order t=160 RELIANCE qty= 25 -> fx=82.40 INR/USD

The engineering lives in a handful of lines: the interval predicate in interval_join bounds what can match (and hence what must be kept), and the bisect_right(...) - 1 lookup in asof_join and temporal_join is the entire "latest at-or-before" rule.

A second figure: state growth under each shape

The single best predictor of whether a stream-join will hold up in production is the shape of its state-size curve over time. Interval joins have a flat steady-state profile (state proportional to window width × event rate). As-of joins have a staircase that plateaus once every key has been seen once. Temporal joins have a tiny dimension that barely grows.

[Figure: State size vs processing time for the three join shapes. Interval state steps up as the window fills, then stays flat at a moderate level (∝ Δt × rate). As-of state ramps as each new symbol adds one row, then plateaus near the keyspace size. Temporal state stays near zero, proportional to version count.]
Interval state hits its ceiling within one window width. As-of state climbs as new keys appear, plateaus once the keyspace is saturated. Temporal state is dominated by version count, not event rate.

The y-axis dictates the operating cost. At Razorpay's scale the temporal join's state is 6MB; the as-of join across UPI handles is ~80MB; an interval join across the same stream with a 60-second window is 4GB+. State backend choice (heap vs RocksDB) and parallelism budget follow directly from this picture.
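Those figures fall out of the formulas already given — a back-of-envelope calculator, where the rates and 80-byte row size are illustrative assumptions, not measurements:

```python
# Back-of-envelope state sizing for the three join shapes. Rates and
# row sizes are illustrative assumptions, not measurements.
def interval_state(window_s, events_per_s, row_bytes):
    return window_s * events_per_s * row_bytes       # window × rate × row

def asof_state(keys, row_bytes):
    return keys * row_bytes                          # one row per key

def temporal_state(keys, versions_per_key, row_bytes):
    return keys * versions_per_key * row_bytes       # versioned dimension

GB, MB = 1024**3, 1024**2
print(f"interval, 60s @ 1M events/s:  {interval_state(60, 1_000_000, 80)/GB:.1f} GB")
print(f"as-of, 1M keys:               {asof_state(1_000_000, 80)/MB:.0f} MB")
print(f"temporal, 200 x 365 versions: {temporal_state(200, 365, 80)/MB:.1f} MB")
```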

What goes wrong in production

A stream-stream join is the operator most likely to silently produce wrong numbers. The four common failures:

Watermark mis-alignment. The two input streams advance their watermarks at different rates. orders watermark is at 09:16:00 but quotes watermark is at 09:15:30. An interval join takes min(left, right) as its operator watermark, so the join's state is held until the slower side advances. If quotes is permanently 30 seconds behind orders (a slow Kafka partition), the join's state grows by 30 seconds × full quote rate forever. The fix is withIdleness on the stream-source side and explicit per-source watermark monitoring.
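The min-of-inputs rule plus idleness exclusion can be modelled in a few lines (a sketch; the names and the 30-second threshold mirror the text, not Flink's API):

```python
# Sketch: a two-input join's watermark is min over its inputs, with
# inputs idle longer than IDLE_MS excluded so a quiet partition
# cannot hold join state forever. Illustrative, not Flink's API.
IDLE_MS = 30_000
sources = {}   # name -> (watermark_ms, last_advance_wallclock_ms)

def advance(name, wm_ms, now_ms):
    sources[name] = (wm_ms, now_ms)

def operator_watermark(now_ms):
    active = [wm for wm, seen in sources.values() if now_ms - seen <= IDLE_MS]
    return min(active) if active else None

advance("orders", 960_000, now_ms=0)
advance("quotes", 930_000, now_ms=0)
print(operator_watermark(now_ms=1_000))    # slower side wins: 930000
advance("orders", 995_000, now_ms=35_000)  # quotes has gone quiet
print(operator_watermark(now_ms=40_000))   # quotes excluded as idle: 995000
```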

Skewed keys. RELIANCE has 200k quotes/sec, MIDCAP_X has 5/sec. The interval join's state is sharded by symbol-keyed parallelism; the worker holding RELIANCE has 40,000× the state of others. State-store hotspot, GC pauses, missed deadlines. The Flink fix is keyBy(symbol).rebalance().keyBy(symbol) after the join to redistribute, or use a different parallelism scheme (sub-keyed join with a hash of the order_id).

Mismatched grain. The join keys' types disagree subtly. quotes.symbol is RELIANCE-EQ (with the -EQ exchange suffix); orders.symbol is RELIANCE. The join fails silently — every order finds zero matching quotes, the left side flows through unchanged, and dashboards show every trade as "no quote available". The fix is an explicit schema contract on join keys (see /wiki/data-contracts-the-producer-consumer-boundary) and an alert on join hit rate.
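The join hit-rate alert is a few lines on top of an outer-join output stream (the 95% threshold and the names are illustrative):

```python
# Sketch: alert when the join hit rate collapses, which is how a
# silent key mismatch ("RELIANCE-EQ" vs "RELIANCE") surfaces.
def hit_rate(results):
    # results: (order, quote_or_None) pairs from an outer join
    if not results:
        return 1.0
    return sum(1 for _, q in results if q is not None) / len(results)

results = [("order1", None), ("order2", None), ("order3", None)]
if hit_rate(results) < 0.95:                      # illustrative threshold
    print(f"ALERT: join hit rate {hit_rate(results):.0%}; check key schemas")
```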

Re-firing under late data. If the join is followed by an aggregation, late events change the join row, which changes the aggregate. With the wrong sink, this produces double-counting. The fix is the late-data policy from the previous chapter (/wiki/late-data-drop-reprocess-or-side-output) plus an idempotent or retraction-aware sink downstream.

Common confusions

Interval vs as-of: an interval join returns every match within ±Δt — zero, one, or many rows per order — while an as-of join returns exactly the latest match at-or-before. If downstream wants one price per trade, interval plus a "nearest" reducer is a more expensive as-of.

Temporal join vs dictionary lookup: a lookup against the current version of a dimension gives wrong answers under replay or backfill; a temporal join uses the version in effect at the event's event_time.

All three shapes depend on the watermark: it is what lets interval joins evict state and what makes as-of answers final. Without it, state grows without bound and answers can change retroactively.

Going deeper

How Flink implements the interval join under the hood

Flink's IntervalJoinOperator uses two MapState<Long, List<Row>> — one per side — keyed by event timestamp. On every left event, it scans the right state for entries within [t - lower, t + upper]. State cleanup runs on watermark advance: any entry with t < watermark - upper on the left or t < watermark - lower on the right is evicted, because no future event on the other side can join it. The state grows linearly with (upper + lower) × per-key event rate × parallelism. Flink emits both inner and left outer interval joins; right outer is implemented by swapping arguments. Under the hood, RocksDB compaction can lag behind eviction during high-throughput bursts, which is why interval joins benefit from heap state-backend on small windows.

The Zerodha post-trade pipeline at NSE close

A real Zerodha pipeline at 15:30 IST processes the day's trades: the order stream is bounded (~1 crore orders), the quote stream is bounded (~30 crore ticks), and slippage is computed as (execution_price - prevailing_mid). The shape is as-of: each order joins to the quote whose timestamp is the largest ≤ order.execution_time. The pipeline runs on Flink with keyBy(symbol) and MapState<Long, Quote> holding the latest 5 quotes per symbol (extra versions for replay safety). State size is ~5,000 symbols × 5 quotes × 96 bytes = 2.4MB per parallelism slot. The slippage rows feed into a Trino-on-S3 lakehouse table for compliance reporting; the post-trade window's correctness is audited against an offline as-of join in DuckDB on a sample. Tolerance for as-of mismatch is zero — a single mis-joined trade is a regulatory finding.

As-of vs LATERAL: the SQL alternative that doesn't quite work

Standard SQL has LATERAL joins, which let the right side be a per-row subquery. SELECT * FROM orders o, LATERAL (SELECT * FROM quotes q WHERE q.symbol = o.symbol AND q.t <= o.t ORDER BY q.t DESC LIMIT 1) latest is the as-of in vanilla Postgres (the subquery alias is required). It works in batch and on bounded streams. On an unbounded stream, the LATERAL subquery would require materialising every quote ever seen, so a streaming engine can't safely execute it. Flink SQL refuses to plan it as a stream operator and forces you to declare a lookup join (against an external table) or MATCH_RECOGNIZE instead. The rewrite is mechanical, but the planner won't do it for you — it errors out rather than silently buffering.

Watermark per-key joins and the Kafka 256-partition reality

A real Kafka source has many partitions, each with its own event-time progression. A naive global watermark is min(per_partition_watermarks), which is dragged down by any quiet partition. withIdleness(Duration.ofSeconds(30)) excludes partitions quiet for 30s. Stream-stream joins are sensitive to this: a join's state holding for a slow partition's catch-up is the most common cause of unbounded state growth in production Flink. The fix is per-key watermarking — each symbol gets its own watermark that advances independently. Flink doesn't expose this as a primitive; teams hand-roll it with a KeyedProcessFunction that maintains a per-key lastSeenEventTime and computes the join's effective watermark from the minimum across active keys only. This is one of the harder pieces of streaming engineering and is why managed services (Confluent Cloud, AWS MSK + Kinesis Analytics) bake it in.
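The hand-rolled pattern reduces to a few lines of bookkeeping — a miniature of the KeyedProcessFunction approach described above, where MAX_OOO_MS (the assumed per-key out-of-orderness bound) and the names are illustrative:

```python
# Sketch of per-key watermarks: each symbol's watermark advances
# independently; the join's effective watermark is the min across
# *active* keys only. Illustrative names, not an engine API.
last_seen = {}     # symbol -> latest event_time seen for that key
MAX_OOO_MS = 50    # assumed out-of-orderness bound per key

def on_event(symbol, t):
    last_seen[symbol] = max(last_seen.get(symbol, t), t)

def key_watermark(symbol):
    return last_seen[symbol] - MAX_OOO_MS

def effective_watermark(active_keys):
    return min(key_watermark(k) for k in active_keys)

on_event("RELIANCE", 1_000)
on_event("TCS", 400)
on_event("RELIANCE", 1_200)
# TCS is quiet but still counted as active: it drags the join down.
print(effective_watermark({"RELIANCE", "TCS"}))   # 350
print(effective_watermark({"RELIANCE"}))          # 1150 once TCS retires
```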

The differential dataflow alternative

Differential dataflow makes joins on streams correct under late data without an explicit lateness policy. The trick: every row is tagged with (value, time, multiplicity) — a positive multiplicity for an addition, a negative for a retraction. A late event arriving for a closed window emits a (-old, +new) retraction pair through every operator, including the join, so the join's output remains exactly correct under any reordering or replay. The cost is operator complexity (every operator must support negative multiplicities) and a typically 3–10× constant-factor overhead vs Flink's append-only model. Materialize and Feldera commercialise this; production adoption in India is rare in 2026, but the model is the cleanest answer in the literature.
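The multiplicity model is small enough to sketch: a late update flows through as a retraction of the old row plus an addition of the new one, and a downstream count stays exact (a toy illustration of the idea, not Materialize's implementation):

```python
# Toy (value, multiplicity) model: a late update is a retraction (-1)
# of the old join row plus an addition (+1) of the new one, so the
# downstream count is exactly correct under any reordering.
from collections import Counter

counts = Counter()

def apply(value, multiplicity):
    counts[value] += multiplicity
    if counts[value] == 0:
        del counts[value]          # fully retracted rows vanish

apply("mid=2864.95", +1)           # initial join result
# A late quote changes the join row: retract the old, add the new.
apply("mid=2864.95", -1)
apply("mid=2865.05", +1)
print(dict(counts))                # only the corrected row remains
```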

Where this leads next

The next chapter — /wiki/checkpointing-the-consistent-snapshot-algorithm — explains how a stream operator's state (including all the join state described above) is snapshotted consistently across a distributed pipeline. Without checkpointing, an interval join with 20MB of state loses everything on a worker crash; with checkpointing, the worker resumes from the last consistent snapshot and the join's correctness survives the failure. The checkpoint interval determines the maximum reprocessing time after a crash, which determines how aggressive your join's per-key TTLs can be.

After that, /wiki/state-backends-heap-rocksdb-external covers where the join state physically lives. An as-of join's per-key state of "last 5 quotes" sits comfortably in heap; an interval join's per-window state of "all quotes in last 100ms" overflows to RocksDB on high-throughput keys. The choice of backend changes the latency profile by 2–10×.

The mental model for stream joins: the join shape is a contract about what the right answer is. The state size is the cost of paying that contract. The watermark coordination is the source of every wrong answer in production.
