Note: Company names, engineers, incidents, numbers, and scaling scenarios in this article are hypothetical — even when they resemble real ones. See the full disclaimer.
Observability for stream processors
23:42 IST during an IPL final. Karan, on-call for the streaming platform at a hypothetical Hotstar, gets a Slack ping from the trust-and-safety team: their fraud-rules engine is firing on bets placed during the second innings, but the alerts are arriving with timestamps from the first innings — almost forty minutes stale. The Flink job that scores bets is healthy by every dashboard he opens: throughput at 1.4M events/sec, checkpoint duration 8 seconds, consumer-group lag pinned at 180ms, no failed tasks, no backpressure indicator lit. Grafana is uniformly green. Yet the output stream is forty minutes behind real time.
The bug is the canonical stream-processor failure: the job is processing events as fast as they arrive, but the event-time watermark has stopped advancing because one Kafka partition has gone silent. Flink's per-partition watermark tracker is waiting forever for that partition to send a record before it dares advance the global watermark. Every windowed aggregation downstream is stuck waiting for that watermark. RED metrics — request rate, error rate, duration — observe none of this, because there is no request, no error, no duration. The signal that broke is a property no SRE-style observability stack measures by default: the difference between processing time and event time, computed per partition and reduced to a global watermark.
Stream processors fail by processing events fast while falling behind in event time, so RED metrics — throughput, errors, latency — stay green while output goes stale. Stream observability is built on five orthogonal signals: lag (consumer offset behind log end), watermark skew (event time behind wall clock), backpressure (buffer fill ratio per operator), checkpoint health (duration, alignment, failure rate), and per-partition liveness (which partition stopped). Page on watermark skew, not just lag.
Why request-response signals fail again, differently
Batch pipelines fail by succeeding on the wrong artefact — one run per day, the artefact lives downstream of the run, RED says nothing. Stream processors fail by succeeding on time-shifted data: the operator is busy, throughput is high, but the events being processed are from ten minutes ago, and the watermark — the lower bound on event-time the operator has committed to — has stopped advancing. Two structurally different failure modes, both invisible to request-response observability.
A stream operator processes a continuous flow rather than discrete runs. Its rate is meaningful (events/sec), its latency is meaningful (per-event processing time), its error count is meaningful (deserialisation failures, sink rejections). All three can be green while the job is broken in event-time. The Flink JobManager at Hotstar reports throughput 1.4M/sec because it is processing 1.4M old events per second; the per-partition lag for the silent partition is 0 because there is nothing in that partition to be behind on. Lag and throughput together still miss the most dangerous stream failure — a single silent partition holding the global watermark hostage.
Why event time and processing time disagree: events carry an event_time (when the user placed the bet, captured at the source) and arrive at the operator at processing_time (when Flink saw them). Networks, retries, and partition rebalances introduce skew; the watermark is the operator's promise that event_time <= watermark for every record it will see in the future. A partition with no records does not contradict the promise — but it also does not let Flink prove the promise can advance. The watermark is the minimum of per-partition watermarks, and a silent partition pegs the minimum at its last-known event time. Every windowed aggregation waits.
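A few lines of arithmetic make the mechanism concrete. The snapshot below is hypothetical: three per-partition watermarks, one frozen by a silent partition.
# Hypothetical per-partition watermarks (ms since epoch) at one sampling instant.
per_partition_watermark_ms = {
    0: 1_700_000_123_000,   # advancing normally
    1: 1_700_000_040_000,   # silent partition: frozen at its last record's event_time
    2: 1_700_000_122_500,   # advancing normally
}
global_watermark_ms = min(per_partition_watermark_ms.values())
print(global_watermark_ms)   # -> 1700000040000: the silent partition pins the minimum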
The five stream-observability signals
Healthy stream processing is the conjunction of five signals; failures distribute roughly evenly across them, so a platform that monitors only Kafka lag silently misses roughly four-fifths of its incidents.
Lag — how far behind is the consumer? Kafka consumer-group lag is log_end_offset - committed_offset per partition. It catches the obvious case: the consumer is slower than the producer, the queue grows, eventually the broker evicts old segments and you lose data. Standard Prometheus exporters (kafka-exporter, the Confluent Cloud metrics endpoint) emit it per partition. The detector: alert when sum-of-partition-lag exceeds N seconds × current ingest rate for over 5 minutes. Lag is necessary, but it cannot detect the silent-partition case (lag is 0 on a silent partition) and cannot detect the slow-watermark-but-fast-throughput case.
Watermark skew — how far behind wall clock is the event-time watermark? wall_clock_now - watermark_now, sampled per operator. This is the single most-load-bearing signal for stateful stream processing and the one most platforms forget. It catches silent partitions, late-data spikes, clock skew on producers, and operator-side bugs that mishandle out-of-order events. The detector: alert when watermark skew exceeds 2× the configured allowed_lateness for over 5 minutes. Flink emits it as currentEventTimeLag and currentOutputWatermark; Kafka Streams emits it as record-lateness-avg and partition-lag-by-time. Most teams have the metric but no dashboard panel for it because it does not appear in starter Grafana templates.
Backpressure — which operator is slowing the pipeline? Backpressure shows up as buffer fill ratio per operator: when an operator's input buffer is consistently >50% full, the operator upstream is being throttled. Flink exposes buffersInUsage per task; Kafka Streams exposes per-task processing latency and commit-latency-avg. Backpressure is diagnostic, not alarmable on its own — sustained backpressure means a downstream sink (a slow Postgres, a rate-limited HTTP API, a CPU-bound aggregation) is the bottleneck. The detector reports "operator X is the slowest", not "fire a page".
Checkpoint health — can the job recover? Stream processors snapshot their state periodically (Flink default: every minute) so a failure can resume from the last snapshot rather than replaying from the beginning of the log. Three things go wrong: checkpoint duration climbs (state grows; RocksDB compaction stalls), checkpoint alignment skews (one operator is so much slower that its barriers stall the others), and checkpoint failure rate rises (S3 timeouts, IAM rotation, EBS throttling). The detector: alert on checkpoint_duration_p99 > 5 × baseline for 15 min, or on checkpoint_failure_rate > 5% over 1h. A job that cannot checkpoint is a job that cannot recover; degraded checkpoint health is an outage warning, not a performance metric.
Per-partition liveness — is every partition still emitting? The dual of watermark skew, measured at the source rather than the operator. For each Kafka partition the source reads, track time_since_last_record. A partition that has been silent longer than the natural inter-arrival time at this throughput is suspect. The detector: per partition, alert when time_since_last_record > max(60s, 5 × p99_inter_arrival). This is the signal that pinpoints which partition stopped — the watermark-skew alarm tells you the watermark is stuck; the liveness alarm tells you which input is stuck.
Why all five are needed: lag misses silent partitions, watermark skew misses fast-failing operators, backpressure misses sink-side problems that take longer than the checkpoint window, checkpoint health misses event-time bugs entirely, per-partition liveness misses operator-side regressions. Each catches a kind of failure the others do not. Production stream platforms (Confluent, Decodable, Aiven) ship dashboards with all five; teams running self-hosted Flink frequently ship only lag and call it observability — and they are the teams whose incidents look like the Hotstar fraud-engine forty-minute drift.
A working stream monitor — runnable
The smallest end-to-end monitor: a synthetic two-partition stream with one partition that goes silent, watermark and lag computed per second, alarms emitted when any signal trips. Save as stream_monitor.py and run.
# stream_monitor.py — five signals, computed against a synthetic stream.
# pip install numpy pandas
import numpy as np
import pandas as pd
from dataclasses import dataclass

rng = np.random.default_rng(11)

@dataclass
class Partition:
    pid: int
    log_end_offset: int = 0
    committed_offset: int = 0
    last_event_time_ms: int = 0    # the highest event_time we've read
    last_record_wall_ms: int = 0   # wall-clock time we last read a record
    inter_arrival_p99_ms: float = 200.0

@dataclass
class Operator:
    in_buffer_capacity: int = 1000
    in_buffer_used: int = 0
    checkpoint_duration_ms: int = 4000
    checkpoint_baseline_ms: int = 4000
    checkpoint_failures: int = 0
    checkpoint_attempts: int = 0

def step(parts: list[Partition], op: Operator, wall_ms: int, t_step: int):
    # Producer puts records into each partition. Partition 1 goes silent at t=20s.
    for p in parts:
        if p.pid == 1 and t_step >= 20:
            continue
        n = int(rng.poisson(5))  # ~5 records/step
        p.log_end_offset += n
        if n > 0:
            p.last_event_time_ms = wall_ms - int(rng.integers(0, 400))
            p.last_record_wall_ms = wall_ms
    # Consumer reads up to 4 records/step from each partition.
    consumed = 0
    for p in parts:
        take = min(4, p.log_end_offset - p.committed_offset)
        p.committed_offset += take
        consumed += take
    # Operator buffer accounting.
    op.in_buffer_used = min(op.in_buffer_capacity,
                            max(0, op.in_buffer_used + consumed - 6))
    # Checkpoint every 10 steps; duration drifts upward after t=30.
    if t_step % 10 == 0:
        slow = 1.0 + (2.5 * max(0, t_step - 30) / 10)
        op.checkpoint_duration_ms = int(op.checkpoint_baseline_ms * slow)
        op.checkpoint_attempts += 1
        if rng.random() < 0.02:
            op.checkpoint_failures += 1

def signals(parts, op, wall_ms, allowed_lateness_ms=2000):
    lag_total = sum(p.log_end_offset - p.committed_offset for p in parts)
    watermark = min((p.last_event_time_ms for p in parts if p.last_event_time_ms),
                    default=wall_ms)
    skew_ms = wall_ms - watermark
    backpressure = op.in_buffer_used / op.in_buffer_capacity
    cp_ratio = op.checkpoint_duration_ms / max(op.checkpoint_baseline_ms, 1)
    cp_fail = op.checkpoint_failures / max(op.checkpoint_attempts, 1)
    silent = [(p.pid, wall_ms - p.last_record_wall_ms) for p in parts
              if wall_ms - p.last_record_wall_ms > max(1000, 5 * p.inter_arrival_p99_ms)]
    return {
        'lag_total': (lag_total, lag_total > 200),
        'watermark_skew': (skew_ms, skew_ms > 2 * allowed_lateness_ms),
        'backpressure': (round(backpressure, 2), backpressure > 0.5),
        'checkpoint': (round(cp_ratio, 2), cp_ratio > 5.0 or cp_fail > 0.05),
        'silent_parts': (silent, bool(silent)),
    }

parts = [Partition(pid=i) for i in range(3)]
op = Operator()
for t in range(60):  # 60 simulated seconds
    wall = 1_700_000_000_000 + t * 1000
    step(parts, op, wall, t)
    s = signals(parts, op, wall)
    if t in (5, 18, 25, 35, 50):
        flagged = [k for k, (_, b) in s.items() if b]
        print(f"t={t:02d}s flagged={flagged} watermark_skew={s['watermark_skew'][0]}ms"
              f" silent={s['silent_parts'][0]}")
Sample run:
t=05s flagged=[] watermark_skew=389ms silent=[]
t=18s flagged=[] watermark_skew=412ms silent=[]
t=25s flagged=['watermark_skew', 'silent_parts'] watermark_skew=5237ms silent=[(1, 5000)]
t=35s flagged=['watermark_skew', 'silent_parts'] watermark_skew=15186ms silent=[(1, 15000)]
t=50s flagged=['watermark_skew', 'silent_parts', 'checkpoint'] watermark_skew=30372ms silent=[(1, 30000)]
Read the output. At t=18s everything is healthy: lag is bounded (the consumer drains up to 12 records/step against ~15 produced, a small backlog well under the 200-record threshold), watermark skew is a normal 412ms (event_time slightly behind wall clock from network jitter), no silent partitions. At t=25s, five seconds after partition 1 went silent, two signals fire: watermark skew crosses the 2× allowed-lateness threshold (5237ms against 4000ms), and per-partition liveness flags partition 1 as silent for roughly five seconds. By t=50s, watermark skew is 30 seconds — every windowed aggregation downstream is 30 seconds stale — and checkpoint health has tripped because the simulated checkpoint-duration drift exceeded 5× baseline. Lag never fires: there is nothing in partition 1 to be lagged on, and the other two partitions drain normally. Backpressure never fires: the operator is keeping up with what arrives. The two signals that catch the bug, watermark skew and per-partition liveness, are precisely the two that RED-style observability would never have you measure.
Why the global watermark is min of per-partition watermarks, not mean or max: the watermark is a guarantee that no record with event_time below it will be seen in the future. If partition 1 goes silent at event_time 23:04, any record written to partition 1 in the future will have event_time ≥ 23:04 (the producer is monotonic per partition); but partition 1 might also have a record at event_time 23:05 sitting in a buffer somewhere. Until partition 1 emits something, the operator cannot prove its event time has advanced. Taking min is the only conservative choice. Taking mean or max would break the guarantee — windows would close while late records still arrive, producing wrong aggregations silently. The cost of correctness is exactly this stuck-on-silent-partition failure mode.
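A toy comparison of the three reductions, with hypothetical timestamps, shows why only min preserves the guarantee:
# Hypothetical per-partition watermarks, in seconds past 23:00. The window
# [23:00, 23:05) may close only when no record stamped before 23:05 can still arrive.
part_wm = {"p0": 9 * 60, "p1": 4 * 60, "p2": 8 * 60}   # p1 is the silent partition
window_end = 5 * 60

for name, reduce_fn in [("min", min), ("max", max), ("mean", lambda v: sum(v) / len(v))]:
    wm = reduce_fn(list(part_wm.values()))
    print(f"{name}: watermark={wm:.0f}s, window closes={wm >= window_end}")
# min          -> does not close: p1 may still deliver a record stamped before 23:05.
# max and mean -> close the window while such a record can still arrive (silently wrong).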
The dataclass shape mirrors the batch monitor: Partition and Operator are the units of measurement, every signal is a function over the current state, and the alert payload is a typed tuple. Production deployments replace the synthetic step loop with confluent-kafka-python for partition metadata, the Flink REST API (/jobs/<jobid>/metrics) or Kafka Streams JMX exporters for operator metrics, and a Prometheus push-gateway for downstream alerting integration. The alarm logic is unchanged — only the data sources differ.
Designing alerts that wake the right person
The biggest operational risk in stream observability is alert design that pages on noise. Watermark skew during a deployment, brief lag spikes during a Kafka partition rebalance, checkpoint drift during S3 throttling — all are recoverable and should not page. Get the alert thresholds wrong and the on-call (Karan, the Hotstar engineer from the lead) disables the alarms within a fortnight.
The right shape is watermark skew as the primary SLO; lag, backpressure, checkpoint, liveness as gates. Watermark skew has the same statistical properties as freshness in batch — a continuous quantity, naturally rate-shaped, supports a multi-window-multi-burn-rate alert. Define the SLO as "p99 of watermark-skew samples over 30 days stays below 30 seconds" — that is a 1% error budget over the period, computed minute-by-minute. A burn rate of 14.4 over a 1-hour window with 5 minutes of dwell time is a page; a burn rate of 6 over 6 hours with 30 minutes of dwell time is a warning. The page-trigger fires when the watermark has been more than 30 seconds stale for the equivalent of 14.4% of the past hour — about 8.6 minutes — which is the right time to wake Karan.
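A minimal sketch of that check, assuming minute-by-minute breach samples and the window/burn-rate pairs above (function and variable names are hypothetical):
# `breach` holds one boolean per minute: True when watermark skew exceeded 30s that minute.
ERROR_BUDGET = 0.01   # 1% of minutes over 30 days may breach

def burn_rate(breach, window_min):
    recent = breach[-window_min:]
    return (sum(recent) / max(len(recent), 1)) / ERROR_BUDGET

def decide(breach):
    if burn_rate(breach, 60) >= 14.4 and burn_rate(breach, 5) >= 14.4:
        return "page"             # ~8.6 bad minutes in the past hour, and still bad right now
    if burn_rate(breach, 360) >= 6 and burn_rate(breach, 30) >= 6:
        return "warn"
    return "ok"

# Hypothetical hour: healthy for 50 minutes, then skew stuck above 30s for 10.
print(decide([False] * 50 + [True] * 10))   # -> "page"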
Lag, backpressure, checkpoint, and liveness should fire as categorical alarms tied to the operator instance, not as SLOs. The alert payload says job=fraud-scoring, operator=window-aggregator-7, signal=silent_partition, partition_id=1, silent_for=327s and the on-call decides whether to escalate. Treating these as boolean per-instant checks keeps the page text precise and avoids the trap of the watermark SLO at 87% with no pinpointed root cause. The watermark SLO tells you the contract is breached; the categorical signals tell you which mechanism caused it.
Two operational rules carry over from the request-response world and must be enforced for streams too. First, suppress the dependent alarms when the upstream alarm has fired — if partition 1 is silent and watermark skew is climbing, fire one page (the silence) and suppress the watermark page (the symptom). The cascade-suppression discipline is the same as in /wiki/lineage-aware-alerting, specialised to the operator graph. Second, the alert that auto-resolves was not a false positive — a watermark that climbs to 35 seconds and falls back to 4 seconds is a real signal that something blocked the pipeline for 90 seconds. Persist the event to the alert history; do not silently drop it because it healed before paging.
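A minimal sketch of both rules together, with all names hypothetical: categorical payloads routed through a suppression map keyed on the causal signal. A production system would derive the map from the lineage graph rather than hard-coding it.
from dataclasses import dataclass

@dataclass
class Alarm:
    job: str
    operator: str
    signal: str
    detail: dict

SUPPRESSES = {"silent_partition": {"watermark_skew"}}   # cause suppresses symptom

def route(alarms):
    fired = {a.signal for a in alarms}
    suppressed = set().union(*(SUPPRESSES.get(s, set()) for s in fired))
    # Suppressed alarms still belong in the alert history, even if they never page.
    return [a for a in alarms if a.signal not in suppressed]

alarms = [
    Alarm("fraud-scoring", "window-aggregator-7", "silent_partition",
          {"partition_id": 1, "silent_for_s": 327}),
    Alarm("fraud-scoring", "window-aggregator-7", "watermark_skew",
          {"skew_s": 38}),
]
for a in route(alarms):
    print(a.signal, a.detail)   # only the silent_partition page survives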
Late data and the lateness budget
Stream processors handle late-arriving events differently from batch backfills, but the observability question is the same: how do you measure how much late data you accepted and how much you dropped? Flink's allowedLateness setting is a per-window grace period — events arriving up to N seconds after the window closed are still folded into the result; events later than N seconds are emitted to a side output (Flink's late-data-side-output) or dropped silently.
The single most-forgotten metric in stream platforms is the side-output rate — records emitted to late-data side output / records emitted to main output. A rising side-output rate means producers are getting slower (network degradation, backpressure on the producing service, clock skew on a fleet), and the windows are losing data. Most teams discover this only when the downstream consumer notices the daily aggregate is light. Add the side-output rate as a categorical gate, alarm at >0.5% sustained over 1h, and the discovery time drops from days to minutes.
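The gate itself is one ratio and one threshold. A hypothetical sketch, with the two counters standing in for however your framework counts records sent to the late-data side output and to the main output:
# Hypothetical hourly counters for one windowed operator.
late_records_1h = 9_400
main_records_1h = 1_620_000

side_output_rate = late_records_1h / max(main_records_1h, 1)
if side_output_rate > 0.005:          # the >0.5%-sustained-over-1h gate
    print(f"late-data gate tripped: {side_output_rate:.2%} of records arrived too late")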
Coordinated omission has a stream analogue too. If your currentEventTimeLag metric only samples when records arrive, and partition 1 is silent for 30 minutes, you record no samples for those 30 minutes — the metric reports its last observed value (0.4s) and the aggregation reports a healthy p99. The fix: emit the metric every K seconds regardless of record arrival, computed as (wall_clock_now - last_record_event_time) per partition, even when no record is arriving. This is the same anti-pattern the coordinated-omission discussion warns about for latency benchmarks, transposed to event time.
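A sketch of the fix, with hypothetical gauge and partition values; the emitter runs on a clock, not on record arrival:
import time

# Last event_time seen per partition; the consumer updates these on every record.
last_event_time_ms = {
    0: time.time() * 1000,
    1: time.time() * 1000 - 1_800_000,   # silent for 30 minutes in this hypothetical snapshot
    2: time.time() * 1000,
}

def emit_event_time_lag():
    # Called on a timer (e.g. every 10s), regardless of record arrival, so a silent
    # partition reports a climbing lag instead of its last observed value.
    now_ms = time.time() * 1000
    for pid, et in sorted(last_event_time_ms.items()):
        print(f'event_time_lag_seconds{{partition="{pid}"}} {(now_ms - et) / 1000:.1f}')

emit_event_time_lag()   # partition 1 reports ~1800s; an arrival-sampled metric would not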
Common confusions
- "Consumer lag at 200ms means the pipeline is healthy." Lag measures
log_end_offset - committed_offset; it cannot fall behind on a partition that has no records. A silent partition has zero lag and an infinite watermark-skew problem. Lag is one of five signals, not the signal. - "Watermarks are an internal Flink detail readers don't need to learn." Watermarks are the contract that lets stateful stream processors emit results before they have seen every event ever. Every windowed aggregation, every join with bounded memory, every event-time alert depends on the watermark. Misunderstanding the watermark is the root cause of more silent stream incidents than any other single issue at companies running Flink in production.
- "Throughput and latency are good enough — they're our SLOs from the request-response world." Stream throughput measures records/second processed by the operator; it has no view on whether those records are from the last minute or last hour. Per-event processing latency measures the operator's CPU cost; it ignores end-to-end pipeline staleness. Bring watermark skew and per-partition liveness explicitly into your SLO panel or your fraud detector will be ten minutes stale and you will discover it from a customer email.
- "Backpressure is the alarm — fire on it." Backpressure is diagnostic, not actionable on its own. It tells you the system is saturated; pairing it with lag (queue growing) or watermark skew (event time slipping) tells you whether the saturation is hurting you. Many systems run with mild backpressure as a steady state because the autoscaler is sized for cost, not headroom — and that is fine if lag and skew stay bounded.
- "Exactly-once means we cannot lose data — observability isn't critical." Exactly-once semantics in Flink/Kafka mean every accepted record is processed exactly once if the job stays alive. They say nothing about records that arrived late and were dropped to the side output, records on a silent partition the operator could not advance past, records the producer never wrote because of a Kafka producer error. Observability is what catches those — exactly-once does not.
- "We can use the warehouse to spot stream failures." The warehouse can confirm that yesterday's Hotstar bet-stream produced 3% fewer rows than the day before. By then the IPL final has been over for nine hours. Stream observability has to fire while the stream is in flight, not after the artefact lands. Confluent Cloud's Stream Lineage UI, Flink's REST
/jobs/<id>/exceptionsand/metricsendpoints, the Kafka JMXkafka.consumer:type=consumer-fetch-manager-metricsgroup — these are the in-flight surfaces.
Going deeper
Watermark generation strategies and their failure modes
Flink supports several watermark strategies: forBoundedOutOfOrderness(Duration) (subtract a fixed maximum out-of-orderness from the highest event time seen), forMonotonousTimestamps (event time always advances; watermark equals last event time), and custom WatermarkGenerator implementations. Each has a different failure mode under partition silence. BoundedOutOfOrderness pegs the watermark of a silent partition at last_event_time - bound, indefinitely. MonotonousTimestamps is even more brittle: a silent partition leaves the watermark frozen exactly at its last event time. The mitigation is WatermarkStrategy.withIdleness(Duration) — after a partition has been idle for the configured duration, it is excluded from the global watermark min. Production Flink jobs reading Kafka topics with more than a dozen partitions and any traffic seasonality must set this; the default is no idleness handling, which is the source of most silent-partition incidents. The OpenTelemetry-Flink integration emits an idle_partitions_count gauge; alarm when it rises above zero, before the watermark-skew SLO starts burning.
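In PyFlink the mitigation looks roughly like the sketch below; treat the exact method names as assumptions to check against your Flink version's watermark-strategy documentation.
# Hedged PyFlink sketch (assumes `pip install apache-flink`).
from pyflink.common import Duration, WatermarkStrategy

watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(5))   # tolerate 5s of disorder
    .with_idleness(Duration.of_minutes(1))                   # drop idle partitions from the min
)
# env.from_source(kafka_source, watermark_strategy, "bets") then keeps the global
# watermark advancing while one Kafka partition is idle.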
Reading the Flink REST API for live observability
Flink's JobManager exposes a REST API (default port 8081). The endpoints that matter for observability:
- GET /jobs/<jobid>/metrics?get=numRecordsInPerSecond,numRecordsOutPerSecond,currentInputWatermark,currentOutputWatermark — per-operator metrics.
- GET /jobs/<jobid>/checkpoints — checkpoint history, durations, alignment times, sizes.
- GET /jobs/<jobid>/exceptions — root exceptions plus restart history.
- GET /jobs/<jobid>/vertices/<vertexid>/backpressure — per-task backpressure ratio (sampling-based; set taskmanager.network.detailed-metrics: true for accuracy).
A Python loop hitting these endpoints every 30 seconds, parsing the JSON, and pushing to Prometheus via prometheus_client.start_http_server is the simplest custom exporter — fewer than 80 lines for full coverage of a single Flink job, and it picks up things the official Flink-Prometheus reporter misses (notably watermark-skew per partition, which the reporter does not surface).
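A hedged sketch of that exporter loop: the gauge name is invented, the job id is a placeholder, and depending on the Flink version the watermark metrics may need to be read per vertex rather than at the job level.
# pip install requests prometheus_client
import time
import requests
from prometheus_client import Gauge, start_http_server

FLINK = "http://localhost:8081"
JOB_ID = "<jobid>"   # placeholder: fill in the running job's id
WATERMARK_SKEW = Gauge("flink_watermark_skew_ms",
                       "wall clock minus currentOutputWatermark")

def scrape_once():
    url = f"{FLINK}/jobs/{JOB_ID}/metrics?get=currentOutputWatermark"
    for metric in requests.get(url, timeout=5).json():
        if metric.get("value") not in (None, "NaN"):
            WATERMARK_SKEW.set(time.time() * 1000 - float(metric["value"]))

if __name__ == "__main__":
    start_http_server(9200)        # Prometheus scrapes this port
    while True:
        scrape_once()
        time.sleep(30)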
Kafka Streams' RocksDB and the state-store observability gap
Kafka Streams co-locates state in embedded RocksDB instances per task. RocksDB has its own observability surface — bytes written, compaction stalls, write-stall duration, level-N file count — that the Kafka Streams official metrics group only partly exposes. The two metrics you must add by hand are rocksdb.cur-size-all-mem-tables (memtable saturation, predicts write stalls) and rocksdb.estimate-pending-compaction-bytes (LSM debt, predicts a future read-amplification spike). Both are accessible via the RocksDB JMX kafka.streams:type=stream-state-metrics group with recording.level=DEBUG. Without them, you discover state-store problems only when the entire job pauses for 40 seconds during a level-3 compaction.
Reproduce this on your laptop
python3 -m venv .venv && source .venv/bin/activate
pip install numpy pandas
python3 stream_monitor.py
# Expected: at t=25s, watermark_skew and silent_parts both flag (partition 1
# went silent at t=20s, signal becomes detectable five seconds later).
# By t=50s, three signals are flagged. Lag never fires — there is nothing
# in partition 1 to be lagged on.
# To explore: change `if p.pid == 1 and t_step >= 20` to `>= 200` (no silence)
# and watch all signals stay green; or change the consumer take from 4 to 2
# per step and watch lag fire while watermark stays healthy.
Where this leads next
Stream observability completes the data-pipeline observability story that /wiki/observability-for-batch-pipelines opened. The two share a structure — orthogonal signals, primary SLO + categorical gates, lineage-aware suppression — but the temporal unit is sub-second instead of daily, and the failure modes are about time itself rather than artefact correctness.
The connection back to the rest of Build 15 is direct: data-quality SLOs from /wiki/data-quality-metrics-as-slos attach to the side output as well as the main output; lineage-aware alerting from /wiki/lineage-aware-alerting is how the silent-partition page suppresses the watermark page; multi-window burn rates from /wiki/multi-window-burn-rate are the alert mechanism that turns watermark skew into a contract. The chapter that follows closes Build 15 by counting the cost of running all of this — high-cardinality watermark-by-partition metrics are not free, and the bill scales with operator parallelism and partition count exactly as relentlessly as the metrics-side cardinality budget from /wiki/why-high-cardinality-labels-break-tsdbs.
By the time Karan ships the five-signal monitor on the Hotstar fraud-scoring job, the 23:42 IST incident becomes a 23:08 page: per-partition liveness fires at +27 seconds for partition 1, watermark skew SLO begins burning at +35 seconds, lineage-aware alerting suppresses the redundant watermark page in favour of the more-specific liveness one, and Karan walks the operator graph from the fraud sink upstream to the Kafka topic that lost a broker leader at 23:07. The fraud detector is back to fresh by 23:11; the IPL final is still in the 17th over.
References
- Tyler Akidau, Slava Chernyak, Reuven Lax, Streaming Systems (O'Reilly, 2018) — the canonical text on event time, watermarks, and triggers; chapters 2 and 3 informed the watermark-as-contract framing.
- Apache Flink documentation — Generating Watermarks — the spec for WatermarkStrategy, withIdleness, and side-output configuration.
- Confluent — Monitoring Kafka Streams Applications — the JMX metric groups for state-store observability that the official metrics reporter does not surface.
- Akidau et al., "MillWheel: Fault-Tolerant Stream Processing at Internet Scale" (VLDB 2013) — the paper that introduced low-watermarks as a coordination primitive in production stream systems.
- Carbone et al., "Apache Flink: Stream and Batch Processing in a Single Engine" (IEEE Data Eng. Bull. 2015) — the architectural paper on Flink's checkpoint and watermark machinery.
- OpenLineage — Streaming Job Facets — how to emit lineage events from Flink and Kafka Streams jobs so the cascade-suppression discipline applies to streaming graphs too.
- /wiki/observability-for-batch-pipelines — internal: the four-signal framework for batch, the conceptual sibling of this chapter's five.
- /wiki/lineage-aware-alerting — internal: cascade-suppression discipline for operator graphs and partition-aware lineage.