Streaming features and feature freshness

A Razorpay fraud-scoring model decides in 80 ms whether to approve a card transaction. One of its 47 features is failed_txn_count_last_60s — how many declines this card has produced in the past minute. If the feature store still says "0" because the materialisation job runs hourly, a fraudster who just burned through nine declines on stolen cards looks clean to the model. The decline number is correct in the warehouse, eventually. By the time it lands, the tenth transaction has already gone through and ₹47,000 is in a wallet in another country. The feature was right. It was just late.

Streaming features are features whose value must reflect events that happened seconds ago, not hours. They exist because for fraud, abuse, recommendation, and pricing, the freshness of the input is more valuable than its completeness. The hard part is not "computing it once" — the hard part is keeping the offline (training) and online (serving) versions consistent while one is updated by Spark every hour and the other is updated by Flink every second.

What "freshness" means as a number

Feature freshness has a precise definition: the wall-clock gap between when an event happened and when its effect is visible to the model at inference time. If a card swipe happens at 12:00:00.000 and the resulting feature value is readable at 12:00:00.450, freshness is 450 ms. If it is readable at 13:00:00, freshness is one hour.

Different features need different freshness, and the right design picks the cheapest pipeline that meets each feature's number. A merchant_30day_avg_transaction_value feature can be a day old — a daily Spark batch job is fine. A card_failed_count_last_60s feature needs to be 1-second fresh — only a streaming job can hit that. Building everything as streaming because "real-time" sounds modern is how teams burn ₹40 lakh/year on Flink clusters that compute features used once a week.
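The tier choice can be mechanised. Below is a minimal sketch of "pick the cheapest pipeline that meets the budget", using an illustrative budget table; `cheapest_pipeline` and `TIERS` are hypothetical names, not a feature-store API:

```python
from datetime import timedelta

# (max staleness, pipeline) tiers, cheapest first. Budgets are illustrative.
TIERS = [
    (timedelta(hours=24),   "daily Spark cron + warehouse"),
    (timedelta(hours=1),    "hourly Spark, Airflow-scheduled"),
    (timedelta(minutes=5),  "micro-batch Spark Structured Streaming"),
    (timedelta(seconds=30), "Flink streaming + Kafka state store"),
]

def cheapest_pipeline(freshness_budget: timedelta) -> str:
    """Return the cheapest tier whose staleness fits inside the budget."""
    for max_staleness, pipeline in TIERS:
        if max_staleness <= freshness_budget:
            return pipeline
    # Budgets tighter than 30 s fall through to the streaming tier.
    return "Flink streaming + Kafka state store"

print(cheapest_pipeline(timedelta(days=1)))      # daily Spark cron + warehouse
print(cheapest_pipeline(timedelta(seconds=60)))  # Flink streaming + Kafka state store
```

merchant_30day_avg_transaction_value lands on the daily cron; card_failed_count_last_60s lands on Flink. Everything between those extremes deserves the same explicit lookup, not a default of "streaming".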

Figure: the feature freshness spectrum. A horizontal axis runs from 24-hour budgets down to 100 ms, with example features at each tier (avg_basket_value and 90day_user_revenue at 24 h; merchant_today_volume and user_today_clicks at 1 h; card_decline_5min and page_views_5min at 5 min; card_decline_60s and cart_abandon_30s at the sub-30-second end) and the matching pipeline below each tier (daily Spark cron + warehouse; hourly Spark scheduled by Airflow; 5-minute micro-batch via Spark Structured Streaming; Flink streaming with Kafka and a state store). Cost rises roughly 10x at each step right; only fraud, abuse, and pricing typically need the orange (sub-30-second) tier.
The freshness spectrum: each step right multiplies infrastructure cost roughly 10x. Razorpay runs only ~12 features in the orange tier (sub-30-second), even though their fraud model has 47 inputs total — the rest are cheaper to compute fresh-enough.

The dual-pipeline problem

The hard part of streaming features is not the streaming. Flink can compute failed_txn_count_last_60s over a Kafka topic in 200 ms. The hard part is that the same feature must also exist in the offline store, with the same values, for training.

Imagine you train a fraud model with the offline value failed_txn_count_last_60s = 3 for a card at 12:00:00. For training to be honest, the value 3 must be exactly what the streaming pipeline would have written into the online store at 12:00:00 if the model had been live then. If the offline pipeline (Spark, hourly, batch) and the online pipeline (Flink, sub-second, streaming) compute the same feature with even slightly different windowing rules, you get training-serving skew — the model trained on offline values whose meaning differs subtly from the online values it sees in production.

The naive fix — "let's just write two pipelines, one Spark for offline, one Flink for online" — is what every team starts with and what every team eventually rewrites. Two codebases drift. The Spark version uses BETWEEN now() - INTERVAL 60 SECOND AND now(). The Flink version uses a tumbling window with a 60-second size. These are different. Spark's range includes "right now"; Flink's tumbling window has discrete edges. A card declining at second 59.5 lands in different windows depending on which pipeline computed it.
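You can reproduce the drift in a few lines. The sketch below contrasts a continuous 60-second lookback with a minute-aligned tumbling window, both asked about the same card at the same instant; the two window conventions are simplified stand-ins for the Spark and Flink behaviours described above:

```python
from datetime import datetime, timedelta

declines = [datetime(2026, 4, 25, 12, 0, 0) + timedelta(seconds=s)
            for s in (0.5, 30.0, 59.5)]          # three declines on one card
query_ts = datetime(2026, 4, 25, 12, 1, 1)       # model asks at 12:01:01

# Continuous 60 s lookback (the BETWEEN now() - 60s AND now() style):
sliding = sum(1 for e in declines
              if query_ts - timedelta(seconds=60) <= e <= query_ts)

# Minute-aligned tumbling window (discrete edges): at 12:01:01 the last
# closed window is [12:00:00, 12:01:00).
start = query_ts.replace(second=0, microsecond=0) - timedelta(seconds=60)
tumbling = sum(1 for e in declines
               if start <= e < start + timedelta(seconds=60))

print(sliding, tumbling)   # 2 3 -- same declines, two different feature values
```

The decline at 12:00:00.5 has fallen out of the continuous lookback but still sits inside the last closed tumbling window, so the two pipelines disagree by one, which for a feature whose values range from 0 to ~10 is a large relative error.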

Figure: the dual-pipeline architecture. One feature definition (SQL or Python DSL, the single source of truth) compiles into two physical pipelines fed by the same Kafka transactions topic (card_id, status, ts): a Flink streaming job (60 s sliding window, 1 s slide) writing to the online store (Redis, ~5 ms reads on the serving path), and an hourly Spark batch backfill with the same window logic writing to the offline store (Iceberg, point-in-time training joins). Two engines, one logic; two stores, one value. If the two engines compute window edges differently, training-serving skew is born here.
The dual pipeline. Both engines must reach the same value for the same `(card_id, timestamp)` query. A 100 ms difference in window-edge handling is enough to make a model's offline AUC of 0.91 collapse to 0.78 in production.

A small streaming feature pipeline you can run

The cleanest way to feel the dual pipeline problem is to write a tiny streaming feature engine. The script below reads transaction events from a synthetic Kafka-style stream, maintains a 60-second sliding window per card, and writes the rolling failed-count to an in-memory online store. It also writes a snapshot every 5 seconds to an offline store. Same window logic, two outputs.

# streaming_features.py — minimal sliding-window feature engine
import time, collections, threading
from datetime import datetime, timedelta

# In-memory stores standing in for Redis (online) and S3/Iceberg (offline)
ONLINE = {}                  # card_id -> {"failed_60s": int, "as_of": ts}
OFFLINE = []                 # list of (card_id, value, event_ts) tuples

# Per-card rolling deque of (event_ts, status) within last 60s
WINDOW = collections.defaultdict(collections.deque)
WINDOW_SEC = 60

def ingest(event):
    """Single event ingestion — called for every Kafka record."""
    card, status, ts = event["card_id"], event["status"], event["event_ts"]
    cutoff = ts - timedelta(seconds=WINDOW_SEC)
    dq = WINDOW[card]
    dq.append((ts, status))
    while dq and dq[0][0] < cutoff:        # evict events outside window
        dq.popleft()
    failed = sum(1 for _, s in dq if s == "FAILED")
    ONLINE[card] = {"failed_60s": failed, "as_of": ts.isoformat()}

def snapshot_offline():
    """Every 5s, write current online state to the offline log."""
    while True:
        time.sleep(5)
        ts = datetime.utcnow()
        for card, v in list(ONLINE.items()):
            OFFLINE.append((card, v["failed_60s"], ts.isoformat()))
        print(f"[{ts:%H:%M:%S}] snapshot: {len(ONLINE)} cards, "
              f"{len(OFFLINE)} total offline rows")

# Drive a synthetic event stream
def run():
    threading.Thread(target=snapshot_offline, daemon=True).start()
    cards = [f"C{n:03d}" for n in range(5)]
    t = datetime(2026, 4, 25, 12, 0, 0)
    for i in range(120):
        card = cards[i % 5]
        status = "FAILED" if i % 7 == 0 else "OK"
        ingest({"card_id": card, "status": status, "event_ts": t})
        if i % 10 == 0:
            print(f"  online[{card}] = {ONLINE[card]}")
        t += timedelta(seconds=1)
        time.sleep(0.25)   # ~30 s total run, so the 5 s snapshot thread fires a few times

run()
# Abridged sample output (snapshot wall-clock stamps vary per run):
#   online[C000] = {'failed_60s': 1, 'as_of': '2026-04-25T12:00:00'}
# [..:..:..] snapshot: 5 cards, 5 total offline rows
#   online[C000] = {'failed_60s': 1, 'as_of': '2026-04-25T12:00:30'}
#   online[C000] = {'failed_60s': 2, 'as_of': '2026-04-25T12:00:40'}
# [..:..:..] snapshot: 5 cards, 10 total offline rows
#   online[C000] = {'failed_60s': 1, 'as_of': '2026-04-25T12:01:40'}   # eviction kicked in

Walk through the mechanism.

- WINDOW[card].append((ts, status)) — every event extends the per-card deque. The deque is the state store that any production streaming engine (Flink, Kafka Streams, Spark Structured Streaming) hides behind a SQL WINDOW clause.
- while dq and dq[0][0] < cutoff: dq.popleft() — eviction. This is the most common bug in hand-rolled streaming features: forget to evict and the count grows monotonically, so your "failed in last 60s" feature becomes "failed since the start of time". Eviction is left-side-first because events are appended right-side and therefore sit in the deque sorted by ts. If the event stream is out of order (late events from a slow region), that assumption breaks and the deque can hold events older than the cutoff. Production engines use sorted state with watermark-based eviction; this toy assumes append-only order.
- ONLINE[card] = {...} — the write to the online store happens on every event, not on a window close. That is what makes this a sliding window with a 1-event slide rather than a tumbling window: every new event yields a new feature value.
- snapshot_offline — a parallel thread that copies the online value to the offline log every 5 seconds. It snapshots periodically instead of writing every event because the offline store is row storage in S3/Iceberg: one write per event would create millions of tiny files (the small-files problem from earlier chapters). A 5-second cadence gives 12 rows/minute/card instead of 60/minute/card, with negligible loss of training fidelity.

The toy is single-process Python; production is Flink with RocksDB state, but the shape is identical: per-key state, eviction on event time, dual sink.

How Flink and the feature store cooperate at production scale

In production, the streaming feature engine does not run inside the feature-store SDK; it runs as a Flink job that writes to the feature store's online-store API. The split looks like:

  1. Application emits an event to Kafka — transactions topic with (card_id, status, event_ts) schema.
  2. Flink job consumes the topic, partitioned by card_id. Each Flink subtask owns a hash range of cards.
  3. The job's per-key state holds the rolling 60-second deque, kept in RocksDB on each Flink TaskManager.
  4. On every event, Flink computes the new feature value and writes it via the feature store's online API: feature_store.online_write(entity_id=card_id, feature="failed_60s", value=2, as_of=ts).
  5. Separately, the same job emits a snapshot stream to a Kafka sink topic, which a downstream Iceberg writer consumes to build the offline store.

The reason it is one Flink job (not two) is that the windowing logic for both the online and offline outputs must come from the same compiled physical plan. Why "same plan, two sinks" beats "two plans, two sinks": when you re-deploy the feature definition with a 90-second window instead of 60-second, both sinks get the new logic at the same checkpoint boundary. With two separate jobs, the deploy is a coordination problem — you would have a window where the online store is on the new logic and the offline store is on the old one, and any model trained during that window has poisoned features.
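The "same plan, two sinks" shape can be sketched in a few lines. The `FeatureDefinition` class below is a toy, not Tecton's, Hopsworks', or Feast's actual API; the point is only that both sinks hang off one definition, so a window change reaches both at once:

```python
from dataclasses import dataclass, field

@dataclass
class FeatureDefinition:
    """Toy single-source feature definition with multiple sinks."""
    name: str
    window_sec: int
    sinks: list = field(default_factory=list)   # online + offline writers

    def register_sink(self, write_fn):
        self.sinks.append(write_fn)

    def emit(self, entity_id, value, ts):
        # Every sink receives a value computed by the SAME window logic,
        # so redeploying a 90 s window updates both stores together.
        for write in self.sinks:
            write(entity_id, self.name, value, ts)

online, offline = {}, []
feat = FeatureDefinition("failed_60s", window_sec=60)
feat.register_sink(lambda e, n, v, ts: online.__setitem__(e, (n, v, ts)))
feat.register_sink(lambda e, n, v, ts: offline.append((e, n, v, ts)))

feat.emit("C001", 2, "2026-04-25T12:00:00")
print(online["C001"], len(offline))   # ('failed_60s', 2, ...) 1
```

With two independent jobs, the equivalent of `register_sink` lives in two codebases, and nothing forces them to share `window_sec`.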

Tecton calls this the "two-mode materialisation engine" (Building 8 of their architecture); Hopsworks calls it "shared materialisation"; Feast plus Kafka users call it "writing the dual sink yourself in PyFlink and praying". The shape is the same; the operational maturity is what you pay for.

The freshness SLA contract

Once you have a streaming feature, its freshness is no longer "we hope it is fresh" — it is a contract, tracked explicitly as a service-level objective. The SLO is typically: "the p99 wall-clock gap between event arrival in Kafka and feature visibility in the online store is under 500 ms".

Measuring this end-to-end requires two timestamps to travel with the event. The first, event_ts, is when the application generated the event (e.g., when the user tapped pay). The second, online_visible_ts, is when the online store accepted the write. The gap is the freshness; the histogram of gaps over a day gives you the p50, p95, p99 numbers that go on the SLO dashboard.
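Turning a day of gaps into SLO numbers is a few lines with the standard library. The gap distribution below is synthetic (lognormal, seeded), chosen only to give the histogram a realistic long tail; `SLO_P99_MS` is an illustrative threshold:

```python
import random
from statistics import quantiles

# Synthetic freshness gaps in ms: online_visible_ts - event_ts per event.
random.seed(7)
gaps_ms = [random.lognormvariate(5.5, 0.6) for _ in range(10_000)]

cuts = quantiles(gaps_ms, n=100)            # the 99 percentile cut points
p50, p95, p99 = cuts[49], cuts[94], cuts[98]
print(f"p50={p50:.0f}ms  p95={p95:.0f}ms  p99={p99:.0f}ms")

SLO_P99_MS = 500
if p99 > SLO_P99_MS:
    print("freshness SLO breached: check Kafka lag, Flink checkpoints, Redis CPU")
```

The only non-trivial part is operational, not computational: event_ts must be stamped by the application and carried through Kafka and Flink untouched, or the gap you measure is pipeline-internal latency rather than end-to-end freshness.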

When the freshness SLO is breached — say p99 climbs from 400 ms to 8 seconds — the diagnostic work is well-understood. Either Kafka is backed up (consumer lag growing), Flink is checkpointing slowly (RocksDB compaction storm, or TaskManager memory pressure), or the online store is rejecting writes (Redis CPU saturation). Each of these has a different fix, and a freshness regression usually means you can name which one within five minutes by looking at four dashboards. The chapters on stream processing internals and consumer-group lag cover the diagnostic patterns in detail.

Common confusions

Going deeper

Watermarks and the late-event problem in feature pipelines

Streaming features inherit the watermark problem from event-time stream processing (covered in build 8): if events can arrive out of order, when do you "close" the 60-second window for a card? If you wait forever, freshness goes infinite. If you close at the wall-clock cutoff, late events get dropped and your feature value is wrong by 1-2%. Production feature pipelines use a watermark with a small allowed lateness (typically 5-10 seconds) — events arriving past the watermark are routed to a "late-data sink" that updates the offline store retroactively. The online store does not get updated; once a feature value has been served to a model, retroactive correction would cause training-serving skew of the worst kind. The convention "online is best-effort, offline is eventually-correct" is what every mature feature platform converges on.
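A minimal sketch of the routing decision, assuming a bounded-out-of-orderness watermark (watermark = max event time seen, minus the allowed lateness); `process`, `on_time`, and `late_sink` are illustrative names, and real engines track the watermark per source partition rather than in one global variable:

```python
from datetime import datetime, timedelta

ALLOWED_LATENESS = timedelta(seconds=5)
watermark = datetime.min
on_time, late_sink = [], []   # late_sink retroactively patches the offline store

def process(event_ts, payload):
    """Route an event: ahead of the watermark -> both stores; behind -> offline only."""
    global watermark
    watermark = max(watermark, event_ts - ALLOWED_LATENESS)
    if event_ts >= watermark:
        on_time.append((event_ts, payload))   # updates online + offline
    else:
        late_sink.append((event_ts, payload)) # offline-only, never re-served

base = datetime(2026, 4, 25, 12, 0, 0)
process(base, "txn-1")                          # on time
process(base + timedelta(seconds=10), "txn-2")  # advances watermark to 12:00:05
process(base + timedelta(seconds=2), "txn-3")   # 8 s late -> late-data sink
print(len(on_time), len(late_sink))             # 2 1
```

txn-3 is exactly the case from the text: too late for the online store, still valuable for the eventually-correct offline store.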

Feature freshness vs prediction freshness

A subtle distinction: the freshness of an individual feature is not the same as the freshness of the prediction the model produces. If a model uses 47 features and one of them is 8 hours old, the prediction can be no fresher than 8 hours — even if the other 46 inputs are sub-second fresh. The slowest feature in the pipeline sets the model's effective freshness ceiling. Razorpay's fraud team uses this to guide investment: making one feature 100 ms fresher is worth nothing unless every other feature the model uses is already at least that fresh. Audit the slowest feature first.
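The ceiling argument is just a max over feature ages. A sketch with made-up ages:

```python
# The model's effective freshness is capped by its stalest input.
# Feature names and ages are illustrative.
feature_ages_sec = {
    "failed_txn_count_last_60s": 0.4,
    "card_decline_5min": 12.0,
    "merchant_30day_avg_transaction_value": 8 * 3600.0,   # 8 hours
}

stalest, ceiling_sec = max(feature_ages_sec.items(), key=lambda kv: kv[1])
print(f"prediction freshness ceiling: {ceiling_sec / 3600:.1f} h, set by {stalest}")
# Shaving 100 ms off the 60 s feature does not move this ceiling at all.
```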

Hot-key skew in streaming feature state

In a Flink job partitioned by card_id, most cards have low event rates (under 10 events/day) but a few — high-volume merchant cards — have 1000+ events/sec. The TaskManager owning a hot card has 100x the state-store work of the others. Fix: pre-key the partition by (card_id, hash(event_id) % 16) so a single hot card spreads across 16 sub-keys, then aggregate the 16 sub-features at read time on the serving path. This trades a small read-side aggregation (negligible) for write-side parallelism (large win). Flipkart Big Billion Days runs every fraud feature this way; without it, a single popular merchant's traffic would saturate one TaskManager and stall checkpoints for the entire job.
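The salting trick can be sketched with an in-memory dict standing in for Flink keyed state; the sub-key count and helper names are illustrative:

```python
import hashlib

N_SUBKEYS = 16
state = {}   # (card_id, subkey) -> running failed count; stand-in for keyed state

def subkey(card_id: str, event_id: str) -> tuple:
    """Spread one hot card across N_SUBKEYS write keys."""
    h = int(hashlib.md5(event_id.encode()).hexdigest(), 16)
    return (card_id, h % N_SUBKEYS)

def write(card_id: str, event_id: str) -> None:
    k = subkey(card_id, event_id)
    state[k] = state.get(k, 0) + 1        # write parallelism: 16 keys, not 1

def read(card_id: str) -> int:
    # Serving path: small read-side aggregation over the sub-keys.
    return sum(state.get((card_id, k), 0) for k in range(N_SUBKEYS))

for i in range(1000):                     # a hot merchant card: 1000 events
    write("HOT-CARD", f"evt-{i}")
print(read("HOT-CARD"))                   # 1000: the true total survives the split
```

The writes land on up to 16 distinct state keys, so no single TaskManager owns the whole hot card, while the serving path still reads one correct total.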

When streaming features are the wrong tool: the LLM RAG case

Vector-embedding "features" used in LLM-RAG retrieval don't fit the streaming model — embeddings are recomputed when the underlying text changes (rare), not when events arrive. The right pipeline is change-data-capture from a content table into a vector index, not a streaming aggregator. Confusing the two — building a Flink job that "streams embeddings" — wastes a Flink cluster for a problem that wants a CDC consumer plus a batch indexer.

Where this leads next

References