Offline features: big tables, point-in-time correctness

A Razorpay data scientist named Aditi opens her laptop on a Tuesday morning and runs feast get-historical-features against six months of UPI transactions to produce a fraud-model training set. The query reads 124 billion historical rows, joins each one to its 84 features as those features stood at the row's own timestamp, and writes 8.7 TB of training data to S3. The job finishes in 47 minutes on a 200-core Spark cluster. Two years ago, the equivalent training-set rebuild took 14 hours, leaked label information, and produced a model that scored 0.94 offline and 0.71 online. The difference is not the cluster size. The difference is that the join is now a point-in-time join, the table layout is bucketed by (entity_id, event_timestamp), and the offline store knows the difference between "as of now" and "as of then". The rest of this chapter is what changed.

The offline store materialises features as a big bucketed table where every row is a (entity_id, feature_value, valid_from, valid_to) interval. Producing a training set means joining a label table to this big table with an AS OF JOIN — for each label row, find the single feature interval whose [valid_from, valid_to) contains the label's event timestamp. The whole subject is how to make that join cheap at billions of rows: bucketed storage on entity_id, sorted-merge against the labels, and snapshot isolation against the warehouse.

Why the offline store is its own thing

The previous chapter ended with a claim: the offline store cannot just be the warehouse. The warehouse is partitioned by ingest date, indexed for analyst queries, and tuned for "give me yesterday's GMV by category". The offline store has one job — answer this exact query, fast, at scale:

Given 124 billion (entity_id, event_ts) label rows, return for each row the value of feature F as of event_ts, exactly once.

That is structurally a different access pattern. The warehouse stores facts; the offline store stores time-versioned snapshots of features keyed by entity. The two layouts diverge in three specific ways.

Bucketing by entity, not by date. A warehouse fact table is partitioned by event_date because analysts ask "what happened yesterday?" and the engine wants to skip 99 percent of the table. The offline store partitions on entity_id (hash bucket) and sorts within the partition by event_ts because the model-training query asks "what is feature F for this card_id, just before this timestamp?". Date partitioning would force every training query to read every partition; entity bucketing reads at most one partition per training row.
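The access-pattern argument can be made concrete in a few lines of plain Python. This is a toy, not Iceberg: the modulo stands in for the bucket transform, and the 64-bucket count matches the chapter's running example.

```python
# Sketch: with entity bucketing, every event for a card lands in one
# bucket; with date partitioning, the same events scatter across every
# date they touched. Modulo stands in for Iceberg's bucket transform.
def bucket_of(card_id: int, n_buckets: int = 64) -> int:
    return card_id % n_buckets   # illustrative stand-in for hash-bucketing

events = [(7, "2026-04-23"), (7, "2026-04-24"), (7, "2026-04-25"),
          (9, "2026-04-24"), (9, "2026-04-25")]

# A training query for card 7 touches exactly one bucket...
buckets_for_7 = {bucket_of(card) for card, date in events if card == 7}
# ...but would have to read every date partition card 7 appears in.
dates_for_7 = {date for card, date in events if card == 7}

print(len(buckets_for_7), len(dates_for_7))
# 1 3
```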

Time-interval rows, not snapshot rows. A warehouse fact says "card 7 had merchant_count=42 on 2026-04-25". The offline store says "card 7's merchant_count was 41 from 14:23:00 to 14:23:47, and 42 from 14:23:47 onward". The interval form is what makes AS OF JOIN possible — the join reduces to "find the interval containing my timestamp", which is a sorted-merge.
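The snapshot-to-interval conversion is mechanical. A minimal sketch (plain Python, using the chapter's card-7 numbers; the last row per card stays open as the current value):

```python
# Sketch: turn a per-card stream of feature-value changes into the
# [valid_from, valid_to) interval rows the offline store keeps.
def to_intervals(events):
    """events: list of (card_id, ts, value) sorted by (card_id, ts).
    Returns: list of (card_id, valid_from, valid_to, value)."""
    rows = []
    for card, ts, val in events:
        if rows and rows[-1][0] == card:
            prev = rows[-1]
            rows[-1] = (prev[0], prev[1], ts, prev[3])  # close previous interval
        rows.append((card, ts, None, val))              # open a new interval
    return rows

for r in to_intervals([(7, 50, 3), (7, 150, 5), (7, 180, 12)]):
    print(r)
# (7, 50, 150, 3)
# (7, 150, 180, 5)
# (7, 180, None, 12)
```

A label at ts=100 now resolves by interval containment: 100 falls in [50, 150), so the as-of value is 3.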

Snapshot isolation against late events. A warehouse rebuild on April 25 sees the world as the warehouse looked on April 25. The offline store rebuild has to honour that the training row was written on March 12 and must see the world as it looked on March 12 — even if a late event arrived on March 14 and corrected a value. The store either keeps both timelines (event-time table + processing-time table) or rebuilds the historical store every time a late event lands. Hudi's solution is the former; Iceberg's snapshot-time-travel makes the latter cheap.
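The two-timelines point fits in a few lines. A sketch with illustrative values: each row carries an event timestamp (when it was true) and a processing timestamp (when the store learned it), and "the world as it looked on March 12" is a filter on the latter.

```python
# Sketch: event-time vs. processing-time reads. The late correction that
# landed March 14 is invisible to a rebuild pinned at March 12 but
# visible to any later rebuild. Values are illustrative.
rows = [
    # (card_id, event_ts, processing_ts, txn_count)
    (7, "2026-03-10", "2026-03-10", 41),
    (7, "2026-03-11", "2026-03-14", 44),   # late event, 3 days behind
]

def as_known_on(rows, cutoff):
    """The table as the store knew it on `cutoff` (processing time)."""
    return [r for r in rows if r[2] <= cutoff]

print(as_known_on(rows, "2026-03-12"))   # only the on-time row
print(as_known_on(rows, "2026-03-15"))   # both rows
```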

[Figure: Why the offline store has a different shape than the warehouse — same data, two layouts. Left: a warehouse fact table PARTITIONED BY event_date (one partition per day, ~410M rows each); the query "yesterday's GMV by merchant" reads 1 partition and skips 179 others, but this is the wrong shape for "feature for card 7 at exactly 14:23:00.450". Right: the offline feature store, BUCKETED BY hash(entity_id) into 64 buckets; bucket_07 holds all events for cards hashing to 7, sorted by (card_id, event_ts ASC), with rows as [valid_from, valid_to) intervals; the query "AS OF JOIN labels USING (card_id)" reads 1 bucket per card as a sorted merge — for each label, exactly one feature interval matches, O(N+M) total.]
Same Iceberg storage layer, two layouts. The warehouse and offline store coexist — one stores facts for analysts, one stores time-versioned features for trainers. Forcing them into one layout makes one of them slow.

The first instinct is to skip building this and let the warehouse do double duty. It works for one feature, two queries, six months. By the time the team has 80 features and weekly retrains, the cluster cost of running point-in-time joins against date-partitioned tables exceeds the cost of building a separate offline store within a quarter. Hopsworks's published numbers from 2023 show a 12× speed-up after switching to bucketed feature tables; Razorpay's internal numbers from 2025 show 18×. The shape change is not a refinement, it is the whole point.

The AS OF JOIN, derived from first principles

Forget feature stores for a moment. Forget Spark. You have two tables and you need to join them by entity_id plus "feature value as of label timestamp". Walk through what the join has to do, mechanically.

Inputs. A label table L with columns (card_id, label_ts, fraud_label) — say 500 million rows over six months. A feature-event table F with columns (card_id, feature_ts, feature_value) — say 8 billion rows, one per change of the feature value. (For an aggregate feature like card_txn_count_24h, F is the materialised stream of value changes; the platform produced it once and stores it as the feature group's history.)

Output. A joined table J with columns (card_id, label_ts, fraud_label, feature_value_at_label_ts). Each row of L produces exactly one row of J. The feature_value_at_label_ts is the value of feature_value from the most recent F row for that card_id whose feature_ts <= label_ts.

The naive nested loop is O(N×M) — for each label row, scan the feature rows for that card. At 500 million labels against 8 billion feature rows — an average of 16 feature rows per label — that is billions of row comparisons before any other work, dead on arrival at this scale. The reframing: sort both tables by (card_id, ts ASC), then walk them together with two pointers. For each label row, the matching feature row is "the latest feature row I have already passed for this card". The walk is O(N+M).

That walk is exactly an AS OF JOIN. SQL engines spell it differently — DuckDB and ClickHouse have a native ASOF JOIN, Snowflake added one with a MATCH_CONDITION clause, and Spark needs a window-function workaround — but the mechanic is identical. Why this is cheaper than a window function: a LAST_VALUE OVER (PARTITION BY card_id ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) requires the engine to produce a result row for every input row of the union of L and F. The AS OF JOIN produces only one result row per L row. At 8 billion vs. 500 million, that is roughly a 16× I/O reduction.

# AS OF JOIN, two-pointer walk, no SQL engine.
# Inputs: pre-sorted by (card_id, ts).

def as_of_join(labels, features):
    """
    labels: list of (card_id, label_ts, fraud_label) sorted by (card_id, label_ts)
    features: list of (card_id, feature_ts, feature_value) sorted by (card_id, feature_ts)
    Returns: list of (card_id, label_ts, fraud_label, feature_value_at_label_ts)
    """
    out = []
    f_idx = 0
    last_value_per_card = {}    # tracks running "as-of" value per card

    for card, label_ts, label in labels:
        # Advance f_idx through every feature row that sorts at or before
        # (card, label_ts) in (card_id, ts) order — the sorted-merge
        # invariant: the pointer never moves backward.
        while f_idx < len(features):
            f_card, f_ts, f_val = features[f_idx]
            if (f_card, f_ts) > (card, label_ts):
                break
            last_value_per_card[f_card] = f_val
            f_idx += 1
        # The current as-of value for this card.
        out.append((card, label_ts, label,
                    last_value_per_card.get(card)))
    return out

# Tiny realistic example for card_id 7 and 9
labels = [
    (7, 100, 'ok'), (7, 200, 'fraud'), (9, 150, 'ok'), (9, 220, 'ok'),
]
features = [
    (7,  50, 3), (7, 150, 5), (7, 180, 12),
    (9,  10, 1), (9, 200, 4),
]

for row in as_of_join(labels, features):
    print(row)
# Output
(7, 100, 'ok', 3)
(7, 200, 'fraud', 12)
(9, 150, 'ok', 1)
(9, 220, 'ok', 4)

Walk through three of those rows. Label (7, 100, 'ok'): the pointer advances through feature (7, 50, 3); the next feature row is (7, 150, 5) whose ts=150 > 100, so it stops. The as-of value is 3. Label (7, 200, 'fraud'): the pointer advances through (7, 150, 5) and (7, 180, 12); the next is (9, 10, 1) which is for a different card, so it stops. The as-of value is 12. Label (9, 150, 'ok'): the pointer advances through (9, 10, 1); the next feature (9, 200, 4) has ts=200 > 150, stops. The as-of value is 1.

That is the kernel. Spark's distributed implementation runs the same walk inside each (card_id mod 64) bucket on each executor. Every bucket is independent — there is no shuffle between buckets — so the join scales linearly with the number of executors. Why bucketing matters here, not just partitioning: bucketing on card_id guarantees that all rows for a given card live in the same file on the same executor. A date-partitioned warehouse table would scatter card 7's events across 180 partitions, forcing a full shuffle on every training query. The shuffle is the single biggest cost item in pre-feature-store ML training pipelines.
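The no-shuffle claim can be checked in miniature. A sketch with 4 buckets instead of 64 (the as_of_join here is a compact restatement of the kernel above): running the walk independently inside each hash bucket yields exactly the rows of one global walk, because every card lives wholly inside one bucket.

```python
# Sketch: per-bucket walks == global walk. Each bucket's labels and
# features are walked independently (what each executor does); the union
# of the per-bucket outputs equals the single global sorted-merge.
from collections import defaultdict

def as_of_join(labels, features):
    out, f_idx, last = [], 0, {}
    for card, ts, lab in labels:
        while f_idx < len(features) and \
                (features[f_idx][0], features[f_idx][1]) <= (card, ts):
            last[features[f_idx][0]] = features[f_idx][2]
            f_idx += 1
        out.append((card, ts, lab, last.get(card)))
    return out

labels   = [(7, 100, 'ok'), (7, 200, 'fraud'), (9, 150, 'ok'), (9, 220, 'ok')]
features = [(7, 50, 3), (7, 150, 5), (7, 180, 12), (9, 10, 1), (9, 200, 4)]

N_BUCKETS = 4                       # tiny stand-in for the 64 of the text
lab_b, feat_b = defaultdict(list), defaultdict(list)
for row in labels:
    lab_b[row[0] % N_BUCKETS].append(row)
for row in features:
    feat_b[row[0] % N_BUCKETS].append(row)

per_bucket = []
for b in sorted(set(lab_b) | set(feat_b)):   # each iteration is independent
    per_bucket.extend(as_of_join(lab_b[b], feat_b[b]))

assert sorted(per_bucket) == sorted(as_of_join(labels, features))
print("per-bucket walks == global walk")
```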

What the table actually looks like on disk

The offline store is a Hudi or Iceberg table — both work, with different consistency knobs. Here is a concrete schema for one feature group (Razorpay's card_24h_features):

# Iceberg DDL for an offline feature group
create_table_sql = """
CREATE TABLE razorpay.fraud.card_24h_features (
    card_id BIGINT NOT NULL,
    event_ts TIMESTAMP(6) NOT NULL,
    valid_from TIMESTAMP(6) NOT NULL,
    valid_to TIMESTAMP(6),
    txn_count_24h INT NOT NULL,
    failed_txn_rate_24h DOUBLE,
    distinct_merchants_24h SMALLINT,
    geo_entropy_24h DOUBLE
)
USING iceberg
PARTITIONED BY (bucket(64, card_id))
TBLPROPERTIES (
    'write.distribution-mode' = 'hash',
    'write.parquet.compression-codec' = 'zstd',
    'format-version' = '2'
)
"""

# Iceberg declares the within-bucket sort order as a write order via
# ALTER TABLE, not as a table property:
sort_order_sql = """
ALTER TABLE razorpay.fraud.card_24h_features
WRITE ORDERED BY card_id ASC, event_ts ASC
"""

# What a single row looks like (compact print)
row = {
    "card_id": 7,
    "event_ts": "2026-04-25 14:23:00.450",
    "valid_from": "2026-04-25 14:23:00.450",
    "valid_to":  "2026-04-25 14:23:47.890",   # closed when the next event arrived
    "txn_count_24h": 41,
    "failed_txn_rate_24h": 0.073,
    "distinct_merchants_24h": 12,
    "geo_entropy_24h": 0.41,
}

# AS OF JOIN against this table at training time (DuckDB / ClickHouse
# ASOF JOIN spelling; other engines spell the clause differently)
as_of_query = """
SELECT  l.card_id, l.label_ts, l.fraud_label,
        f.txn_count_24h, f.failed_txn_rate_24h,
        f.distinct_merchants_24h, f.geo_entropy_24h
FROM    razorpay.fraud.training_labels l
ASOF JOIN razorpay.fraud.card_24h_features f
    ON  l.card_id = f.card_id AND l.label_ts >= f.event_ts
"""
print(create_table_sql)
print("---")
print(row)
print("---")
print(as_of_query)

Three things to notice. First, PARTITIONED BY (bucket(64, card_id)) uses Iceberg's hidden bucket partition transform — not PARTITIONED BY event_date. The bucket transform lets every query that joins on card_id skip 63/64ths of the data without the user writing any partition predicate. Second, the write order card_id ASC, event_ts ASC keeps rows sorted within each bucket, which is what lets the AS OF walk run without an extra sort step. Third, valid_to is nullable — the most recent row for each card has an open right endpoint, meaning "this is the current value". When a new event arrives, the old open row is closed (valid_to = new_ts) and a new open row is appended.
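The close-and-append write path fits in a few lines of plain Python — an in-memory stand-in for the table, not Iceberg's actual update mechanics:

```python
# Sketch: a new feature event closes the card's open row (valid_to = the
# new event's ts) and appends a fresh open row. Rows are dicts mirroring
# the schema above, with at most one open row per card.
def apply_event(table, card_id, event_ts, values):
    for row in table:
        if row["card_id"] == card_id and row["valid_to"] is None:
            row["valid_to"] = event_ts          # close the open interval
    table.append({"card_id": card_id, "valid_from": event_ts,
                  "valid_to": None, **values})

table = []
apply_event(table, 7, "14:23:00.450", {"txn_count_24h": 41})
apply_event(table, 7, "14:23:47.890", {"txn_count_24h": 42})
for row in table:
    print(row)
# first row is now closed at 14:23:47.890; second row is the open one
```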

This is exactly the bitemporal pattern from the database literature (Snodgrass 1995). Hudi realises it with merge-on-read tables plus time travel; Iceberg with snapshots plus declared sort orders. The two implementations differ on the update path (Hudi appends delta logs merged at read time; Iceberg writes a new snapshot), but the read shape — bucketed, sorted, interval rows — is the same.

[Figure: AS OF JOIN as a two-pointer walk. A timeline shows the labels stream for card 7 (ts 100, 200) above the features stream (ts 50: v=3, 150: v=5, 180: v=12), both sorted by timestamp. Two pointers advance together; for each label, the matching feature is the most recent feature row already passed: label@100 → v=3, label@200 → v=12. The pointer never moves backward; total cost is O(N + M); no row of L scans more than its share of F; bucketing on card_id keeps each walk inside one file on one executor.]
The AS OF semantics is "find the most recent feature row at or before the label timestamp". The sorted-merge walk does it once. Date-partitioned warehouse tables cannot, because the partition key is wrong.

What goes wrong at billions of rows

The previous sections were tidy because the example was small. At 124 billion rows, the engineering reality has corners.

Skewed entities. A handful of merchant card_ids transact 1000× more than the median. The bucket containing those merchants balloons to 100 GB while the median bucket is 100 MB — a classic data-skew problem. The fix is salting the hot keys: append a small random salt to each known-hot card_id before hashing, so its rows spread across several buckets, then replicate the matching label rows across all salt values during the join and drop the salt afterwards. A salt factor of 8 applied to the top 100 hottest cards typically flattens the bucket-size variance below 2×.
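A minimal sketch of the salting scheme. This is a generic illustration, not any engine's built-in mechanism; SALT_FACTOR, HOT_CARDS, and the bucket functions are hypothetical names.

```python
# Sketch of hot-key salting: hot cards get a random salt before hashing,
# spreading their rows across up to SALT_FACTOR buckets; the label side
# probes every salt value so the join still finds every feature row.
import random

SALT_FACTOR = 8          # illustrative, as in the text
HOT_CARDS = {7}          # hypothetical hot-key set

def salted_feature_bucket(card_id, n_buckets=64):
    salt = random.randrange(SALT_FACTOR) if card_id in HOT_CARDS else 0
    return hash((card_id, salt)) % n_buckets

def label_buckets(card_id, n_buckets=64):
    # De-salting: replicate the label's probe across every salt value.
    salts = range(SALT_FACTOR) if card_id in HOT_CARDS else [0]
    return {hash((card_id, s)) % n_buckets for s in salts}

# Feature rows for hot card 7 land in at most SALT_FACTOR buckets...
feature_buckets = {salted_feature_bucket(7) for _ in range(1000)}
print(len(feature_buckets) <= SALT_FACTOR)       # True

# ...and every one of those buckets is probed by the label side.
print(feature_buckets <= label_buckets(7))       # True
```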

The valid_to rewrite problem. Every new feature event closes the previous row's valid_to. On Iceberg, that is a row update, which means rewriting the file. On Hudi merge-on-read, it is a delta log entry, which is cheaper to write but requires merge-on-read at query time. Why this trade-off matters for offline stores: the read pattern is "scan the whole bucket once per training run". Iceberg's copy-on-write writes more on update but reads cheaply, which fits the offline-store access pattern (rare update, frequent scan). Hudi's merge-on-read flips it — fits CDC pipelines where updates are constant. Most production offline stores use Iceberg COW for this reason.

Streaming late events. A late event arriving 4 minutes after the watermark advances should not corrupt the historical training set, but it should be reflected in the next scheduled training run. The fix is to write the historical rows with event_ts as the partition timestamp and processing_ts as a secondary column; backfill jobs read by event_ts and ignore processing_ts, while audit jobs use processing_ts to detect lateness. Hudi's "incremental query" mode supports both views natively; Iceberg requires the user to write the audit join explicitly.

Snapshot drift between the offline store and the warehouse. If the offline store is materialised from the warehouse (typical pattern), and the warehouse rebuilds nightly, the offline store can lag by 24 hours. Training queries that JOIN the offline store with non-feature-store tables (label tables, often) hit cross-store inconsistency. The Razorpay fix: pin the training query to a specific Iceberg snapshot — FOR VERSION AS OF 8472648242 — so all referenced tables read from the same logical timestamp.
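Because Iceberg snapshot IDs are per-table, pinning several tables to one logical point is most directly spelled with a timestamp rather than a version number. A sketch in Trino's Iceberg time-travel syntax; the table names are the chapter's and the timestamp literal is illustrative:

```python
# Sketch: pin every table referenced by the training query to the same
# logical timestamp using Trino's FOR TIMESTAMP AS OF clause, so the
# label table and the feature table read from one consistent point.
pin_ts = "TIMESTAMP '2026-04-25 00:00:00 UTC'"   # illustrative literal

pinned_query = f"""
SELECT  l.card_id, l.label_ts, l.fraud_label, f.txn_count_24h
FROM    razorpay.fraud.training_labels
            FOR TIMESTAMP AS OF {pin_ts} AS l
JOIN    razorpay.fraud.card_24h_features
            FOR TIMESTAMP AS OF {pin_ts} AS f
  ON    l.card_id = f.card_id AND l.label_ts >= f.event_ts
"""
print(pinned_query)
```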

What this looks like end-to-end at Razorpay

Aditi's offline store at Razorpay holds 14 feature groups for the fraud-ML team. The largest is card_24h_features with 12 features, partitioned by bucket(64, card_id), sorted by (card_id, event_ts), written in Parquet with zstd-3 compression. A scheduled Spark job reads from the Kafka topic card.txn.events.v3, computes the 12 features in a 24-hour rolling window, and appends to the Iceberg table at a 5-minute cadence. The job runs on 80 cores, writes ~6 GB per 5-minute interval, and emits a Hive Metastore commit visible to Trino, Spark, and Dremio.

A training-set rebuild looks like this. Aditi runs feast get-historical-features --entity-df labels.parquet --features card_24h_features:txn_count_24h,card_24h_features:geo_entropy_24h --output train.parquet. Feast translates this to a Spark job that does an AS OF JOIN of labels against card_24h_features on (card_id, event_ts <= label_ts). The Spark job runs the two-pointer walk inside each of the 64 buckets in parallel, joins to 84 features in 12 feature groups, and writes 8.7 TB of training Parquet to S3. The job finishes in 47 minutes, costs roughly ₹4,200 on AWS spot instances, and the resulting training set is bit-for-bit identical regardless of when it is rebuilt — because every feature value is point-in-time-anchored by event_ts, and the underlying Iceberg snapshot is pinned by the platform.

The first time Aditi ran this against the new offline store after the migration from "warehouse + manual SQL", her fraud model's online precision jumped from 0.71 to 0.83 with no change to the model code. The model had been there all along. The training data, for the first time, was honest about what serving could see. The shape of the storage layer was the entire fix.

Common confusions

Going deeper

Snodgrass's bitemporal model and why feature stores rediscovered it

The 1995 book by Richard Snodgrass — Developing Time-Oriented Database Applications in SQL — introduced bitemporal tables: tables with both valid time (when the fact was true in the world) and transaction time (when the database learned the fact). Every offline feature store row is a bitemporal record under a different name: event_ts is valid time, processing_ts is transaction time, and the (valid_from, valid_to) pair tracks the valid-time interval. Snodgrass also treats the interval operations the AS OF JOIN depends on — temporal coalescing and sequenced joins — at length. Feature stores rediscovered all of this around 2018; SQL:2011 added explicit temporal-table support via the PERIOD FOR clause, and engines such as DuckDB, ClickHouse, and Snowflake expose the join itself under the ASOF spelling.
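A minimal sketch of temporal coalescing — merging adjacent intervals that carry the same value into one maximal interval, the normal form Snodgrass works in (a toy over half-open integer intervals):

```python
# Sketch: coalesce adjacent [valid_from, valid_to) intervals with equal
# values. Two intervals merge when they touch (prev.valid_to == next
# .valid_from) and carry the same value.
def coalesce(rows):
    """rows: list of (valid_from, valid_to, value) sorted by valid_from."""
    out = []
    for vf, vt, val in rows:
        if out and out[-1][2] == val and out[-1][1] == vf:
            out[-1] = (out[-1][0], vt, val)   # extend the previous interval
        else:
            out.append((vf, vt, val))
    return out

print(coalesce([(10, 20, 'A'), (20, 30, 'A'), (30, 40, 'B')]))
# [(10, 30, 'A'), (30, 40, 'B')]
```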

The cold-start problem: how do you backfill the first six months?

When a feature group is created, the offline store is empty. Filling six months of historical data requires a one-time Spark job that scans the source Kafka topic from offset 0 (or replays from the warehouse archive) and computes the feature value at every event timestamp. For a 12-feature group over six months of UPI traffic at Razorpay, this backfill is roughly 200 TB of Kafka reads and runs for ~6 hours on a 400-core cluster. The trick is to do it once, write the result to the bucketed Iceberg table, then switch the live pipeline to incremental updates. Hopsworks's documentation on backfill jobs is the practitioner reference; Tecton automates the same thing under the hood.
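The replay kernel of such a backfill can be sketched under simplifying assumptions (timestamps in seconds, a single txn_count_24h feature, events already sorted per card):

```python
# Sketch: replay the historical event stream in order and emit the 24h
# rolling transaction count at every event timestamp — one offline-store
# feature row per event. A deque holds the sliding window.
from collections import deque

WINDOW = 24 * 3600   # 24 hours in seconds

def backfill_txn_count_24h(events):
    """events: list of (card_id, ts) sorted by (card_id, ts).
    Yields (card_id, ts, txn_count_24h)."""
    window, current_card = deque(), None
    for card, ts in events:
        if card != current_card:
            window.clear()           # windows never span cards
            current_card = card
        window.append(ts)
        while window[0] <= ts - WINDOW:
            window.popleft()         # evict events older than 24h
        yield (card, ts, len(window))

events = [(7, 0), (7, 1000), (7, 50000), (7, 90000)]
print(list(backfill_txn_count_24h(events)))
# [(7, 0, 1), (7, 1000, 2), (7, 50000, 3), (7, 90000, 2)]
```

The last event shows the point of the replay: at ts=90000 the first two events have aged out of the 24h window, so the backfilled feature value is 2, exactly what the live pipeline would have computed at that moment.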

Why feature stores avoid the analyst's "denormalised wide table"

A natural temptation is to denormalise — produce one wide table with (label_ts, card_id, fraud_label, all 84 features) and call it the training set. This works for one model, breaks at the second. Different models use different feature subsets, different time horizons, different label schemas. Materialising 84 features against every label every time wastes 90 percent of the work. The feature-group abstraction — each group is a small bucketed Iceberg table; training queries pick which groups to AS-OF-join — keeps the I/O proportional to the features actually used. Razorpay's fraud model uses 12 of the 84 features in one group; a separate underwriting model uses 31 of 84 in three groups; both share the same offline store.

Pinning to an Iceberg snapshot for reproducibility

Reproducibility — running the same training job in March and getting the same training set — requires snapshot pinning. Iceberg's FOR VERSION AS OF syntax pins the read to a specific snapshot ID; combined with the AS OF JOIN syntax, it gives full point-in-time-correctness across both the feature data and the warehouse joins. Without snapshot pinning, a training job re-run a week later sees a slightly different label table (because of late-arriving labels) and produces a slightly different model. Tecton's training-set DAG pins all referenced snapshots automatically; Feast 0.40+ supports the same via the snapshot_id parameter on get_historical_features.

What watermark-aware offline stores look like

The naive offline store treats every event as "in time". The reality is that 0.04 percent of events arrive after the watermark, and a feature value computed without them is silently wrong by exactly the late events' contribution. The Hudi pattern is to write two timelines — the in-order timeline (closes the watermark) and the late-arrival timeline — and let the user choose. The Iceberg pattern is to rewrite the affected snapshot when a late event lands and rely on the FOR VERSION AS OF pin to keep training sets stable. Both work; both have failure modes (Hudi: complexity at read time; Iceberg: rewrite cost at write time). The chapter on streaming features (next in this build) goes into the watermark-vs-rewrite trade-off in detail.

Where this leads next

References