Offline features: big tables, point-in-time correctness
A Razorpay data scientist named Aditi opens her laptop on a Tuesday morning and runs feast get-historical-features against six months of UPI transactions to produce a fraud-model training set. The query reads 124 billion historical rows, joins each one to its 84 features as those features stood at the row's own timestamp, and writes 8.7 TB of training data to S3. The job finishes in 47 minutes on a 200-core Spark cluster. Two years ago, the equivalent training-set rebuild took 14 hours, leaked label information, and produced a model that scored 0.94 offline and 0.71 online. The difference is not the cluster size. The difference is that the join is now a point-in-time join, the table layout is bucketed by (entity_id, event_timestamp), and the offline store knows the difference between "as of now" and "as of then". The rest of this chapter is what changed.
The offline store materialises features as a big bucketed table where every row is a (entity_id, feature_value, valid_from, valid_to) interval. Producing a training set means joining a label table to this big table with an AS OF JOIN — for each label row, find the single feature interval whose [valid_from, valid_to) contains the label's event timestamp. The whole subject is how to make that join cheap at billions of rows: bucketed storage on entity_id, sorted-merge against the labels, and snapshot isolation against the warehouse.
Why the offline store is its own thing
The previous chapter ended with a claim: the offline store cannot just be the warehouse. The warehouse is partitioned by ingest date, indexed for analyst queries, and tuned for "give me yesterday's GMV by category". The offline store has one job — answer this exact query, fast, at scale:
Given 124 billion (entity_id, event_ts) label rows, return for each row the value of feature F as of event_ts, exactly once.
That is structurally a different access pattern. The warehouse stores facts; the offline store stores time-versioned snapshots of features keyed by entity. The two layouts diverge in three specific ways.
Bucketing by entity, not by date. A warehouse fact table is partitioned by event_date because analysts ask "what happened yesterday?" and the engine wants to skip 99 percent of the table. The offline store partitions on entity_id (hash bucket) and sorts within the partition by event_ts because the model-training query asks "what is feature F for this card_id, just before this timestamp?". Date partitioning would force every training query to read every partition; entity bucketing reads at most one partition per training row.
Time-interval rows, not snapshot rows. A warehouse fact says "card 7 had merchant_count=42 on 2026-04-25". The offline store says "card 7's merchant_count was 41 from 14:23:00 to 14:23:47, and 42 from 14:23:47 onward". The interval form is what makes AS OF JOIN possible — the join reduces to "find the interval containing my timestamp", which is a sorted-merge.
Snapshot isolation against late events. A warehouse rebuild on April 25 sees the world as the warehouse looked on April 25. The offline store rebuild has to honour that the training row was written on March 12 and must see the world as it looked on March 12 — even if a late event arrived on March 14 and corrected a value. The store either keeps both timelines (event-time table + processing-time table) or rebuilds the historical store every time a late event lands. Hudi's solution is the former; Iceberg's snapshot-time-travel makes the latter cheap.
The first instinct is to skip building this and let the warehouse do double duty. It works for one feature, two queries, six months. By the time the team has 80 features and weekly retrains, the cluster cost of running point-in-time joins against date-partitioned tables exceeds the cost of building a separate offline store within a quarter. Hopsworks's published numbers from 2023 show a 12× speed-up after switching to bucketed feature tables; Razorpay's internal numbers from 2025 show 18×. The shape change is not a refinement, it is the whole point.
The AS OF JOIN, derived from first principles
Forget feature stores for a moment. Forget Spark. You have two tables and you need to join them by entity_id plus "feature value as of label timestamp". Walk through what the join has to do, mechanically.
Inputs. A label table L with columns (card_id, label_ts, fraud_label) — say 500 million rows over six months. A feature-event table F with columns (card_id, feature_ts, feature_value) — say 8 billion rows, one per change of the feature value. (For an aggregate feature like card_txn_count_24h, F is the materialised stream of value changes; the platform produced it once and stores it as the feature group's history.)
Output. A joined table J with columns (card_id, label_ts, fraud_label, feature_value_at_label_ts). Each row of L produces exactly one row of J. The feature_value_at_label_ts is the value of feature_value from the most recent F row for that card_id whose feature_ts <= label_ts.
The naive nested-loop is O(N×M) — for each label row, scan all feature rows for that card. At 500M × 16 features-per-card-per-day-on-average = 8 billion combinations, this is dead on arrival. The reframing is: sort both tables by (card_id, ts ASC), then walk them together with two pointers. For each label row, the matching feature row is "the latest feature row I've already passed for this card". The walk is O(N+M).
That walk is exactly an AS OF JOIN. SQL engines spell it differently — Trino's ASOF JOIN (since version 401), Spark's last_value window trick, Snowflake's MATCH_RECOGNIZE, ClickHouse's ASOF JOIN — but the mechanic is identical. Why this is cheaper than a window function: LAST_VALUE OVER (PARTITION BY card_id ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) requires the engine to produce a result row for every input row of the union of L and F. The AS OF JOIN produces only one result row per L row. At 8 billion feature rows vs. 500 million label rows, that is a 16× reduction in output I/O.
# AS OF JOIN, two-pointer walk, no SQL engine.
# Inputs: pre-sorted by (card_id, ts).
def as_of_join(labels, features):
    """
    labels: list of (card_id, label_ts, fraud_label) sorted by (card_id, label_ts)
    features: list of (card_id, feature_ts, feature_value) sorted by (card_id, feature_ts)
    Returns: list of (card_id, label_ts, fraud_label, feature_value_at_label_ts)
    """
    out = []
    f_idx = 0
    last_value_per_card = {}  # tracks running "as-of" value per card
    for card, label_ts, label in labels:
        # Advance f_idx through every feature row whose (card, ts) sorts at or
        # before (card, label_ts) -- the sorted-merge invariant. This consumes
        # all rows of earlier cards plus this card's rows up to label_ts.
        while f_idx < len(features):
            f_card, f_ts, f_val = features[f_idx]
            if (f_card, f_ts) > (card, label_ts):
                break
            last_value_per_card[f_card] = f_val
            f_idx += 1
        # The current as-of value for this card.
        out.append((card, label_ts, label,
                    last_value_per_card.get(card)))
    return out

# Tiny realistic example for card_id 7 and 9
labels = [
    (7, 100, 'ok'), (7, 200, 'fraud'), (9, 150, 'ok'), (9, 220, 'ok'),
]
features = [
    (7, 50, 3), (7, 150, 5), (7, 180, 12),
    (9, 10, 1), (9, 200, 4),
]
for row in as_of_join(labels, features):
    print(row)

# Output:
# (7, 100, 'ok', 3)
# (7, 200, 'fraud', 12)
# (9, 150, 'ok', 1)
# (9, 220, 'ok', 4)
Walk through three of those rows. Label (7, 100, 'ok'): the pointer advances through feature (7, 50, 3); the next feature row is (7, 150, 5) whose ts=150 > 100, so it stops. The as-of value is 3. Label (7, 200, 'fraud'): the pointer advances through (7, 150, 5) and (7, 180, 12); the next is (9, 10, 1) which is for a different card, so it stops. The as-of value is 12. Label (9, 150, 'ok'): the pointer advances through (9, 10, 1); the next feature (9, 200, 4) has ts=200 > 150, stops. The as-of value is 1.
That is the kernel. Spark's distributed implementation runs the same walk inside each (card_id mod 64) bucket on each executor. Every bucket is independent — there is no shuffle between buckets — so the join scales linearly with the number of executors. Why bucketing matters here, not just partitioning: bucketing on card_id guarantees that all rows for a given card live in the same file on the same executor. A date-partitioned warehouse table would scatter card 7's events across 180 partitions, forcing a full shuffle on every training query. The shuffle is the single biggest cost item in pre-feature-store ML training pipelines.
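The bucket-parallel structure can be sketched in plain Python. This is a toy model of what Spark does, not Spark itself; bucketed_as_of_join and NUM_BUCKETS are illustrative names, and hash() stands in for the engine's hash partitioner.

```python
from collections import defaultdict

NUM_BUCKETS = 4  # the production table uses 64; 4 keeps the toy example readable

def as_of_join(labels, features):
    """Two-pointer walk from the kernel above; inputs pre-sorted by (card_id, ts)."""
    out, f_idx, last = [], 0, {}
    for card, label_ts, label in labels:
        while f_idx < len(features) and features[f_idx][:2] <= (card, label_ts):
            last[features[f_idx][0]] = features[f_idx][2]
            f_idx += 1
        out.append((card, label_ts, label, last.get(card)))
    return out

def bucketed_as_of_join(labels, features, num_buckets=NUM_BUCKETS):
    """Hash both sides on card_id, then run the walk inside each bucket.
    Buckets share nothing, so each one could run on its own executor."""
    lab_b, feat_b = defaultdict(list), defaultdict(list)
    for row in labels:
        lab_b[hash(row[0]) % num_buckets].append(row)
    for row in features:
        feat_b[hash(row[0]) % num_buckets].append(row)
    out = []
    for b in sorted(set(lab_b) | set(feat_b)):
        # per-bucket sort is a no-op when files are already bucket-sorted
        out.extend(as_of_join(sorted(lab_b[b]), sorted(feat_b[b])))
    return sorted(out)

labels = [(7, 100, 'ok'), (7, 200, 'fraud'), (9, 150, 'ok'), (9, 220, 'ok')]
features = [(7, 50, 3), (7, 150, 5), (7, 180, 12), (9, 10, 1), (9, 200, 4)]
print(bucketed_as_of_join(labels, features))
# [(7, 100, 'ok', 3), (7, 200, 'fraud', 12), (9, 150, 'ok', 1), (9, 220, 'ok', 4)]
```

Because every card's rows land in exactly one bucket, the result is identical to the single-threaded walk, which is the property that lets the distributed join skip the shuffle.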
What the table actually looks like on disk
The offline store is a Hudi or Iceberg table — both work, with different consistency knobs. Here is a concrete schema for one feature group (Razorpay's card_24h_features):
# Iceberg DDL for an offline feature group
create_table_sql = """
CREATE TABLE razorpay.fraud.card_24h_features (
    card_id                 BIGINT NOT NULL,
    event_ts                TIMESTAMP(6) NOT NULL,
    valid_from              TIMESTAMP(6) NOT NULL,
    valid_to                TIMESTAMP(6),
    txn_count_24h           INT NOT NULL,
    failed_txn_rate_24h     DOUBLE,
    distinct_merchants_24h  SMALLINT,
    geo_entropy_24h         DOUBLE
)
USING iceberg
PARTITIONED BY (bucket(64, card_id))
TBLPROPERTIES (
    'write.distribution-mode' = 'hash',
    'write.parquet.compression-codec' = 'zstd',
    'sort-order' = 'card_id ASC, event_ts ASC',
    'format-version' = '2'
)
"""

# What a single row looks like (compact print)
row = {
    "card_id": 7,
    "event_ts": "2026-04-25 14:23:00.450",
    "valid_from": "2026-04-25 14:23:00.450",
    "valid_to": "2026-04-25 14:23:47.890",  # closed when the next event arrived
    "txn_count_24h": 41,
    "failed_txn_rate_24h": 0.073,
    "distinct_merchants_24h": 12,
    "geo_entropy_24h": 0.41,
}

# AS OF JOIN against this table at training time, in Trino syntax
as_of_query = """
SELECT l.card_id, l.label_ts, l.fraud_label,
       f.txn_count_24h, f.failed_txn_rate_24h,
       f.distinct_merchants_24h, f.geo_entropy_24h
FROM razorpay.fraud.training_labels l
JOIN razorpay.fraud.card_24h_features f
  ASOF ON l.card_id = f.card_id AND l.label_ts >= f.event_ts
"""

print(create_table_sql)
print("---")
print(row)
print("---")
print(as_of_query)
Three things to notice. First, PARTITIONED BY (bucket(64, card_id)) — Iceberg's hidden-partitioning bucket transform, not PARTITIONED BY event_date. The bucket function lets every query that joins on card_id skip 63/64ths of the data without the user writing any partition predicate. Second, sort-order = 'card_id ASC, event_ts ASC' — within each bucket, rows are sorted, which is what lets the AS OF walk run without an extra sort step. Third, valid_to is nullable — the most recent row for each card has an open right endpoint, meaning "this is the current value". When a new event arrives, the old open row is closed (valid_to = new_ts) and a new open row is appended.
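The close-then-append update step can be sketched as a toy in-memory model of the interval table. apply_event is an illustrative name, not a platform API; the point is the invariant that each card has exactly one open row.

```python
def apply_event(rows, card_id, event_ts, feature_values):
    """Close the card's open interval (valid_to=None), then append a new open row."""
    for r in rows:
        if r["card_id"] == card_id and r["valid_to"] is None:
            r["valid_to"] = event_ts  # on Iceberg COW this is the file rewrite
    rows.append({"card_id": card_id, "valid_from": event_ts,
                 "valid_to": None, **feature_values})

table = []
apply_event(table, 7, "2026-04-25 14:23:00.450", {"txn_count_24h": 41})
apply_event(table, 7, "2026-04-25 14:23:47.890", {"txn_count_24h": 42})
# card 7 now has one closed interval and one open row carrying the current value
```

The AS OF JOIN's "find the interval containing my timestamp" reads directly off this layout: a label at 14:23:30 lands in the first row's [valid_from, valid_to) interval and sees txn_count_24h = 41.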
This is exactly the BiTemporal pattern from the database literature (Snodgrass 1995). Hudi calls it "merge-on-read with time travel"; Iceberg calls it "snapshot evolution with sort orders". The two implementations differ on update path (Hudi merges in place; Iceberg writes a new snapshot), but the read shape — bucketed, sorted, interval rows — is the same.
What goes wrong at billions of rows
The previous sections were tidy because the example was small. At 124 billion rows, the engineering reality has corners.
Skewed entities. A handful of merchant card_ids transact 1000× more than the median. The bucket containing those merchants becomes 100 GB while the median bucket is 100 MB — a classic data-skew problem. The fix is salting the hash on the hot keys: instead of bucket(64, card_id), use bucket(64, card_id || mod(rand(), salt_factor)) for known-hot keys, then de-salt during the join. A salt_factor of 8 for the top 100 hottest cards typically flattens the bucket-size variance to below 2×.
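The salt-and-de-salt mechanic can be sketched in a few lines. This is a hedged illustration, not Spark's salting API; HOT_CARDS, SALT_FACTOR, write_bucket, and read_buckets are all illustrative names.

```python
import random

HOT_CARDS = {424242}   # known-hot keys, e.g. big merchants (illustrative)
SALT_FACTOR = 8        # each hot card is spread over up to 8 buckets
NUM_BUCKETS = 64

def write_bucket(card_id):
    """Bucket chosen by the feature-table writer: hot cards get a random salt."""
    salt = random.randrange(SALT_FACTOR) if card_id in HOT_CARDS else 0
    return hash((card_id, salt)) % NUM_BUCKETS

def read_buckets(card_id):
    """Buckets a label row must visit at join time (the de-salt step):
    a hot card fans out to every salted bucket, a cold card to exactly one."""
    salts = range(SALT_FACTOR) if card_id in HOT_CARDS else [0]
    return {hash((card_id, s)) % NUM_BUCKETS for s in salts}
```

The cost of de-salting is that label rows for hot cards are replicated SALT_FACTOR times at join time, which is cheap because the hot-key set is tiny (the top 100 cards) while the write-side skew it removes is enormous.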
The valid_to rewrite problem. Every new feature event closes the previous row's valid_to. On Iceberg, that is a row update, which means rewriting the file. On Hudi merge-on-read, it is a delta log entry, which is cheaper to write but requires merge-on-read at query time. Why this trade-off matters for offline stores: the read pattern is "scan the whole bucket once per training run". Iceberg's copy-on-write writes more on update but reads cheaply, which fits the offline-store access pattern (rare update, frequent scan). Hudi's merge-on-read flips it — fits CDC pipelines where updates are constant. Most production offline stores use Iceberg COW for this reason.
Streaming late events. A late event arriving 4 minutes after the watermark advances should not corrupt the historical training set, but it should be reflected in the next scheduled training run. The fix is to write the historical rows with event_ts as the partition timestamp and processing_ts as a secondary column; backfill jobs read by event_ts and ignore processing_ts, while audit jobs use processing_ts to detect lateness. Hudi's "incremental query" mode supports both views natively; Iceberg requires the user to write the audit join explicitly.
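The audit view described above reduces to a filter on the gap between the two timestamps. A minimal sketch, assuming each stored row carries both event_ts and processing_ts; late_events and the threshold are illustrative.

```python
from datetime import datetime, timedelta

def late_events(rows, watermark_delay=timedelta(minutes=1)):
    """Audit view: rows whose processing_ts lags event_ts by more than the
    allowed watermark delay. Backfill jobs read by event_ts and ignore
    processing_ts entirely; only the audit path looks at this gap."""
    return [r for r in rows if r["processing_ts"] - r["event_ts"] > watermark_delay]

rows = [
    {"card_id": 7, "event_ts": datetime(2026, 3, 12, 10, 0, 0),
     "processing_ts": datetime(2026, 3, 12, 10, 0, 30)},   # arrived in time
    {"card_id": 7, "event_ts": datetime(2026, 3, 12, 10, 0, 0),
     "processing_ts": datetime(2026, 3, 14, 9, 0, 0)},     # arrived two days late
]
print(len(late_events(rows)))  # 1
```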
Snapshot drift between the offline store and the warehouse. If the offline store is materialised from the warehouse (typical pattern), and the warehouse rebuilds nightly, the offline store can lag by 24 hours. Training queries that JOIN the offline store with non-feature-store tables (label tables, often) hit cross-store inconsistency. The Razorpay fix: pin the training query to a specific Iceberg snapshot — FOR VERSION AS OF 8472648242 — so all referenced tables read from the same logical timestamp.
What this looks like end-to-end at Razorpay
Aditi's offline store at Razorpay holds 14 feature groups for the fraud-ML team. The largest is card_24h_features with 12 features, partitioned by bucket(64, card_id), sorted by (card_id, event_ts), written in Parquet with zstd-3 compression. A scheduled Spark job reads from the Kafka topic card.txn.events.v3, computes the 12 features in a 24-hour rolling window, and appends to the Iceberg table at a 5-minute cadence. The job runs on 80 cores, writes ~6 GB per 5-minute interval, and emits a Hive Metastore commit visible to Trino, Spark, and Dremio.
A training-set rebuild looks like this. Aditi runs feast get-historical-features --entity-df labels.parquet --features card_24h_features:txn_count_24h,card_24h_features:geo_entropy_24h --output train.parquet. Feast translates this to a Spark job that does an AS OF JOIN of labels against card_24h_features on (card_id, event_ts <= label_ts). The Spark job runs the two-pointer walk inside each of the 64 buckets in parallel, joins to 84 features in 12 feature groups, and writes 8.7 TB of training Parquet to S3. The job finishes in 47 minutes, costs roughly ₹4,200 on AWS spot instances, and the resulting training set is bit-for-bit identical regardless of when it is rebuilt — because every feature value is point-in-time-anchored by event_ts, and the underlying Iceberg snapshot is pinned by the platform.
The first time Aditi ran this against the new offline store after the migration from "warehouse + manual SQL", her fraud model's online precision jumped from 0.71 to 0.83 with no change to the model code. The model had been there all along. The training data, for the first time, was honest about what serving could see. The shape of the storage layer was the entire fix.
Common confusions
- "AS OF JOIN is just a window function." A window function over the union of L and F produces N+M output rows; AS OF JOIN produces N. The window function also requires the engine to materialise the entire window for each row, which at 8B rows blows out memory. AS OF JOIN's two-pointer walk uses O(distinct entities) memory.
- "The offline store is a view on the warehouse." It can start that way for one feature group. By feature group 5, the warehouse layout — partitioned by date — is the wrong shape for AS OF JOIN, and queries cost 12× to 18× more than a properly bucketed offline store. The shape change is the difference, not the storage location.
- "Hudi and Iceberg are interchangeable for the offline store." They differ on update path. Hudi's merge-on-read is cheaper to write but more expensive to read; Iceberg's copy-on-write is the opposite. The offline store reads vastly more often than it writes (one scheduled rebuild per training run, hundreds of training queries against it), so most production deployments use Iceberg COW. Hudi MOR fits the CDC pipeline that feeds the offline store, not the offline store itself.
- "You can run the AS OF JOIN against the warehouse with a self-join and a window function." You can, and Razorpay did for a year. The query is correct; it costs ₹38,000 per training run on the warehouse versus ₹4,200 on a bucketed offline store, and at six retrains per week the gap pays back the offline-store engineering effort in two months.
- "Bucketing is just partitioning under another name." Hive partitioning creates one directory per partition value — fine for low-cardinality keys (date) but disastrous for high-cardinality ones (card_id with 50 million distinct values). Iceberg's bucket transform hashes the value into a fixed number of buckets, so the file count is bounded regardless of cardinality. This is why the offline store can use bucket(64, card_id) but cannot use PARTITIONED BY card_id.
- "Open right endpoints in the interval table waste storage." They do not. Each card has exactly one open-ended row at any moment — the current value. When the next event lands, that row is updated to close the interval and a new open row is appended. The total row count equals the total event count plus the number of distinct cards, not 2× the event count.
Going deeper
Snodgrass's bitemporal model and why feature stores rediscovered it
The 1995 book by Richard Snodgrass — Developing Time-Oriented Database Applications in SQL — introduced bitemporal tables: tables with both valid time (when the fact was true in the world) and transaction time (when the database knew about the fact). Every offline feature store row is a bitemporal record under a different name: event_ts is valid time, processing_ts is transaction time, the (valid_from, valid_to) pair tracks the valid-time interval. Snodgrass derived AS OF JOIN under the name temporal coalescing in chapter 7 and proved its O(N+M) bound. Feature stores reinvented this in 2018; the SQL standard added explicit support in SQL:2011's PERIOD FOR clause, which Trino and ClickHouse implement under the AS OF spelling.
The cold-start problem: how do you backfill the first six months?
When a feature group is created, the offline store is empty. Filling six months of historical data requires a one-time Spark job that scans the source Kafka topic from offset 0 (or replays from the warehouse archive) and computes the feature value at every event timestamp. For a 12-feature group over six months of UPI traffic at Razorpay, this backfill is roughly 200 TB of Kafka reads and runs for ~6 hours on a 400-core cluster. The trick is to do it once, write the result to the bucketed Iceberg table, then switch the live pipeline to incremental updates. Hopsworks's documentation on backfill jobs is the practitioner reference; Tecton automates the same thing under the hood.
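The kernel of that backfill job, stripped of Spark and Kafka, is a replay of the event stream with a trailing window. A toy sketch for a single count feature; backfill_txn_count_24h is an illustrative name and timestamps are plain seconds.

```python
from collections import deque

WINDOW_S = 24 * 3600  # 24-hour trailing window

def backfill_txn_count_24h(events):
    """Replay (card_id, ts_seconds) events in timestamp order and emit the
    feature value at every event -- one interval row per emitted tuple."""
    windows = {}   # card_id -> deque of timestamps still inside the window
    history = []
    for card, ts in events:
        w = windows.setdefault(card, deque())
        w.append(ts)
        while w and w[0] <= ts - WINDOW_S:
            w.popleft()   # evict events older than 24h
        history.append((card, ts, len(w)))
    return history

events = [(7, 0), (7, 3600), (7, 89999)]   # third event just under 24h after the first
print(backfill_txn_count_24h(events))
# [(7, 0, 1), (7, 3600, 2), (7, 89999, 2)]
```

The production job is this loop sharded by entity bucket over 200 TB of replayed events; the per-card state (the deque) is exactly why the backfill must process events in timestamp order within each card.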
Why feature stores avoid the analyst's "denormalised wide table"
A natural temptation is to denormalise — produce one wide table with (label_ts, card_id, fraud_label, all 84 features) and call it the training set. This works for one model, breaks at the second. Different models use different feature subsets, different time horizons, different label schemas. Materialising 84 features against every label every time wastes 90 percent of the work. The feature-group abstraction — each group is a small bucketed Iceberg table; training queries pick which groups to AS-OF-join — keeps the I/O proportional to the features actually used. Razorpay's fraud model uses 12 of the 84 features in one group; a separate underwriting model uses 31 of 84 in three groups; both share the same offline store.
Pinning to an Iceberg snapshot for reproducibility
Reproducibility — running the same training job in March and getting the same training set — requires snapshot pinning. Iceberg's FOR VERSION AS OF syntax pins the read to a specific snapshot ID; combined with the AS OF JOIN syntax, it gives full point-in-time-correctness across both the feature data and the warehouse joins. Without snapshot pinning, a training job re-run a week later sees a slightly different label table (because of late-arriving labels) and produces a slightly different model. Tecton's training-set DAG pins all referenced snapshots automatically; Feast 0.40+ supports the same via the snapshot_id parameter on get_historical_features.
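A pinned query can be sketched as a string builder in the Trino syntax this chapter uses. One caveat the sketch makes explicit: Iceberg snapshot IDs are per-table, so each referenced table carries its own pin (the platform resolves the set of IDs that correspond to one logical timestamp). pinned_training_query and the snapshot IDs are illustrative.

```python
def pinned_training_query(snapshot_ids):
    """Build a training query with every referenced table pinned to a snapshot.
    snapshot_ids maps fully-qualified table name -> Iceberg snapshot ID."""
    l_pin = snapshot_ids["razorpay.fraud.training_labels"]
    f_pin = snapshot_ids["razorpay.fraud.card_24h_features"]
    return (
        "SELECT l.card_id, l.label_ts, l.fraud_label, f.txn_count_24h\n"
        f"FROM razorpay.fraud.training_labels FOR VERSION AS OF {l_pin} l\n"
        f"JOIN razorpay.fraud.card_24h_features FOR VERSION AS OF {f_pin} f\n"
        "  ASOF ON l.card_id = f.card_id AND l.label_ts >= f.event_ts"
    )

q = pinned_training_query({
    "razorpay.fraud.training_labels": 8472648242,     # hypothetical IDs
    "razorpay.fraud.card_24h_features": 8472648251,
})
print(q)
```

Re-running this exact string a week later reads the same bytes, which is the reproducibility guarantee: late-arriving labels land in newer snapshots and are invisible to the pinned read.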
What watermark-aware offline stores look like
The naive offline store treats every event as "in time". The reality is that 0.04 percent of events arrive after the watermark, and a feature value computed without them is silently wrong by exactly the late events' contribution. The Hudi pattern is to write two timelines — the in-order timeline (closes the watermark) and the late-arrival timeline — and let the user choose. The Iceberg pattern is to rewrite the affected snapshot when a late event lands and rely on the FOR VERSION AS OF pin to keep training sets stable. Both work; both have failure modes (Hudi: complexity at read time; Iceberg: rewrite cost at write time). The chapter on streaming features (next in this build) goes into the watermark-vs-rewrite trade-off in detail.
Where this leads next
- /wiki/online-features-key-value-lookups-at-p99 — the inference-path companion to this chapter. Same feature, different store.
- /wiki/feast-tecton-hopsworks-architectures-compared — three vendors, three takes on the AS OF JOIN problem.
- /wiki/streaming-features-and-feature-freshness — how late events show up in the offline store and what to do about them.
- /wiki/the-feature-store-as-a-materialized-view — the unifying mental model that ties this build back to Build 10.
References
- Snodgrass, "Developing Time-Oriented Database Applications in SQL" (Morgan Kaufmann, 1995) — chapters 7–10 are the canonical treatment of AS OF JOIN under the name temporal coalescing. Free PDF from the author.
- Hopsworks, "Time-Travel Queries for the Feature Store" (2022) — the practitioner reference for Hudi-based offline stores.
- Apache Iceberg, "Hidden Partitioning and Bucket Transforms" — the partitioning model that makes bucket-by-entity cheap.
- Trino docs, "ASOF JOIN" (since 401) — engine-level reference for the syntax used in production.
- Tecton, "Materialization Pipelines: Offline and Online" — the binding documentation for the proof-obligation pattern from chapter 112.
- Apache Hudi, "Merge-on-Read vs Copy-on-Write" — the reference for the update-path trade-off discussed above.
- Lakshman & Malik, "Cassandra: A Decentralized Structured Storage System" (2010) — bucketing-by-entity originated here and propagated to feature stores via the Uber/Cassandra pipeline.
- /wiki/training-serving-skew-the-fundamental-ml-problem — chapter 112; the proof obligation that this chapter delivers half of.