Apache Pinot and the Uber use case

Aditi opens the Uber driver app at 09:14 in Bengaluru. She wants to know "how many trips have I done in the last 7 days, and what was my acceptance rate?" That tile renders in 80 milliseconds. Behind it sits a Pinot table with 14 billion trip events from the last 90 days; the query selects rows for one driver across one week and computes two aggregates. The same query against a generic OLAP warehouse would spend minutes scanning; against ClickHouse's MergeTree it would land in roughly 60 ms; against Pinot it lands in 8 ms, because the work of finding this driver's rows was done at ingest time — the inverted index turns the filter into a lookup instead of a scan. Pinot's bet is different from ClickHouse's: not "make scans fast" but "make scans rare."

Pinot is two storage tiers — real-time consuming from Kafka, offline from batch — both serving the same query through a Broker that fans out to Servers holding immutable segments. Each segment is a columnar slice with a forward index, an inverted index, and (often) a star-tree pre-aggregation. The star-tree is what lets a multi-dimensional GROUP BY answer in single-digit milliseconds: the answers are pre-computed at ingest, indexed by dimension path, and the query becomes a tree traversal instead of a scan.

Why Uber needed Pinot

In 2014 Uber's analytics stack was Postgres replicas plus a daily Hadoop dump into Vertica. Driver-facing analytics ("your week so far") required real-time freshness — a trip completed in the last hour had to be visible in the next page load, and a daily batch pipeline gave 24-hour-stale answers. Postgres replicas could not handle the fan-out: 1 lakh active drivers all checking their own dashboards meant 1 lakh analytical queries per minute against a database designed for OLTP. Vertica was the right shape (columnar, MPP) but its ingestion model was bulk-load — landing a 5-minute micro-batch took 90 seconds and created a whole new table version, not the per-event freshness Uber needed.

ClickHouse existed at this point inside Yandex but had not yet been open-sourced (that happened in 2016). Pinot itself was built at LinkedIn — Kishore Gopalakrishna and team — and open-sourced in 2015; Uber adopted it because its design met head-on three constraints that ClickHouse's did not: per-row freshness from Kafka without lossy buffering, dimension-heavy GROUP BY queries answered in single-digit milliseconds at QPS levels of 10,000+, and strict separation between real-time and offline storage so that a real-time outage does not corrupt the offline tier. Pinot's segment model, star-tree, and the Broker–Server–Controller topology all fall out of those three constraints.

Pinot cluster topology — Controller, Broker, Server, with real-time and offline tiers. A topology diagram: at the top, a Controller node manages metadata and segment assignment via Helix and ZooKeeper. Below it, a Broker node receives queries and fans them out. Two groups of Servers sit below the Broker: a Real-time Server group consuming from Kafka into LLC (low-level consumer) segments, and an Offline Server group loaded with segments built by a batch job (Spark/Flink) on cheaper hardware. Sealed segments are pushed to deep storage (S3 or HDFS), where they live once flushed and from which both tiers can hydrate. The Broker scatters the same query to both tiers and gathers results.
The Broker fans the same query to real-time and offline servers; the offline tier holds older sealed segments on cheaper hardware, the real-time tier holds the live tail. Time-boundary metadata (held by the Controller) tells the Broker which tier owns which time range, so the same row never gets counted twice.
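The time-boundary split can be sketched in a few lines. This is an illustrative model, not Pinot's internal API — the function name and tier labels are assumptions; only the invariant (offline owns everything at or before the boundary, real-time owns everything after it, ranges never overlap) comes from the text above.

```python
def route_by_time_boundary(query_start_ms, query_end_ms, boundary_ms):
    """Split a query's time range across tiers so no row is counted twice.

    Offline owns (-inf, boundary]; real-time owns (boundary, +inf).
    Returns {tier: (start_ms, end_ms)} for each tier that has work to do.
    """
    plan = {}
    if query_start_ms <= boundary_ms:
        plan["OFFLINE"] = (query_start_ms, min(query_end_ms, boundary_ms))
    if query_end_ms > boundary_ms:
        plan["REALTIME"] = (max(query_start_ms, boundary_ms + 1), query_end_ms)
    return plan

# A week-long query spanning the boundary fans out to both tiers,
# with disjoint sub-ranges — the Broker sums the partial aggregates.
plan = route_by_time_boundary(1713571200000, 1714089600000, 1714003200000)
```

Because the two sub-ranges are disjoint by construction, the Broker can add the tiers' partial counts without de-duplication.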

The Controller is metadata only — it does not serve queries. It runs Apache Helix on top of ZooKeeper to track segment-to-server assignment, table schemas, and ingestion progress. When a real-time segment seals (reaches the row threshold), the Controller orchestrates the flush to deep storage and the handoff to the offline tier. Why a Controller exists separately: the operational reality of running a few hundred servers is that segments are constantly moving — a server fails, segments rebalance; a new server joins, segments rebalance; a real-time segment seals, it gets renamed and the Broker's query plan needs to know. Centralising that in Helix makes the per-server logic stateless and the Broker's view always-current.

The segment — Pinot's unit of storage

A Pinot segment is the analogue of a ClickHouse part. It is a self-contained immutable columnar slice with three things layered on top:

  1. Forward index per column — the actual column data, dictionary-encoded by default. Reading the column means reading dictionary-id integers and looking up the dictionary at output time.
  2. Inverted index per indexed column — for each distinct value in the column, the list of row IDs containing that value. This is what makes WHERE driver_id = 8472334 answer in microseconds without scanning.
  3. Optional star-tree — a pre-aggregation tree over a chosen subset of dimensions (covered in the next section).

A segment is not a directory of files like ClickHouse's part — it is a single tar-style archive (.tar.gz on disk, v3 format internally) containing the column metadata, the forward indices, the inverted indices, and the star-tree if present. When a Server loads a segment, it memory-maps the file and the indices become directly addressable. Why memory-mapping over reading: a 1 GB segment loaded by read() would consume 1 GB of heap; mmap'd, the OS page cache decides what stays resident and the JVM heap stays small. Pinot Servers routinely run with 16 GB heap and 200 GB of mmap'd segment data per node — only the hot pages live in RAM, the cold pages are paged in on demand.
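The heap argument for memory-mapping can be demonstrated directly. Pinot is a Java system; this Python sketch only illustrates the OS-level mechanism — mapping a file gives addressable bytes without allocating a heap buffer the size of the file, and only the pages actually touched are faulted in.

```python
import mmap
import os
import tempfile

# Write a stand-in "segment" file, then map it instead of read()-ing it.
path = os.path.join(tempfile.mkdtemp(), "segment.bin")
with open(path, "wb") as f:
    f.write(b"\x00" * (4096 * 1024))  # 4 MB of column data

with open(path, "rb") as f:
    seg = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    # No 4 MB buffer was allocated on the process heap. The slice below
    # faults in only the pages it touches; the OS page cache owns residency,
    # so cold pages can be evicted and re-faulted on demand.
    page = seg[4096:4096 + 16]  # random access into the mapped segment
    seg.close()
```

This is exactly why a Pinot Server can hold 200 GB of mmap'd segments with a 16 GB JVM heap: the heap holds bookkeeping, the page cache holds the hot data.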

The segment naming convention <table>__<partition>__<sequence>__<creation_ts> carries the lineage. A real-time segment named trips__7__142__1714032000 means table trips, Kafka partition 7, sequence 142, sealed at timestamp 1714032000. After the segment seals and is pushed to deep storage, the offline tier loads it under the same name — the Broker uses the time boundary (held by the Controller) to know that this segment now belongs to "offline" and the next sequence belongs to "real-time".
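The naming convention is mechanical enough to decode in a few lines — a minimal sketch, with field names of my choosing for the returned dict:

```python
def parse_segment_name(name: str) -> dict:
    """Decode the <table>__<partition>__<sequence>__<creation_ts> lineage."""
    table, partition, sequence, creation_ts = name.split("__")
    return {
        "table": table,
        "kafka_partition": int(partition),
        "sequence": int(sequence),
        "sealed_at": int(creation_ts),
    }

info = parse_segment_name("trips__7__142__1714032000")
# The Server that owns Kafka partition 7 built this segment; sequence 143
# of the same partition is the next consuming segment.
```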

The star-tree — pre-aggregation as an index

The star-tree is what makes Pinot answer dimension-heavy GROUP BYs in single-digit milliseconds. Consider Uber's "trips per city per day per service-tier" dashboard: dimensions are city_id × day × service_tier, metric is count(*) and sum(fare_paise). Without pre-aggregation, that query has to scan every row whose timestamp is in range, group by three columns, and aggregate. With a star-tree built on (city_id, service_tier, day), the answer for any prefix of those dimensions is materialised at a node in the tree — Bengaluru × UberGo × 2026-04-24 is one node, the count and sum are stored on it, and the query is a tree traversal of a few nodes.

Star-tree pre-aggregation for (city, service_tier, day). A tree diagram: the root splits by city (Bengaluru, Mumbai, Delhi, Hyderabad, plus a star * branch representing "all cities"). Each city node splits by service_tier (UberGo, UberXL, Auto, plus *), and each tier node splits by day (2026-04-22, 2026-04-23, 2026-04-24, plus *). Leaf nodes hold pre-computed metrics: count and sum_fare_paise. The * branches let queries that omit a dimension still hit the tree at the right node without descending — a query for "all cities, UberGo, 2026-04-24" walks root → * → UberGo → 2026-04-24 and reads the leaf in three pointer-chases, with no scan.
The * branches collapse a dimension — they store the aggregate over all values of that dimension. A query that omits `city_id` from its GROUP BY can use the * branch instead of summing across city branches at query time. The tree fan-out is bounded by `maxLeafRecords` (default 10,000); below that threshold a leaf holds the raw rows, above it the leaf becomes a sub-tree.
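A toy model makes the * mechanics concrete. This is not Pinot's internal structure — just nested dicts over two of the three dimensions, with invented but internally consistent counts (each * node holds the sum of its siblings, pre-computed):

```python
STAR = "*"

# Toy star-tree on (city, tier). Every path ends in pre-computed metrics;
# each * branch stores the aggregate over all values of that dimension.
tree = {
    "Bengaluru": {"UberGo": {"count": 482193}, "Auto": {"count": 241337},
                  STAR: {"count": 723530}},
    "Mumbai":    {"UberGo": {"count": 391044}, "UberXL": {"count": 198210},
                  STAR: {"count": 589254}},
    STAR:        {"UberGo": {"count": 873237}, STAR: {"count": 1312784}},
}

def lookup(tree, city=None, tier=None):
    """Descend one level per dimension; an omitted dimension takes *."""
    node = tree[city if city is not None else STAR]
    return node[tier if tier is not None else STAR]["count"]

lookup(tree, city="Bengaluru", tier="UberGo")  # one leaf, no scan
lookup(tree, tier="UberGo")                    # all cities, via the * branch
```

The second call never touches the per-city branches: the cross-city aggregate was paid for at segment build time, which is the entire trade the star-tree makes.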

The star-tree is built at segment creation time. You declare it in the table config:

starTreeIndexConfigs:
  - dimensionsSplitOrder: [city_id, service_tier, day]
    skipStarNodeCreationForDimensions: []
    functionColumnPairs:
      - "COUNT__*"
      - "SUM__fare_paise"
    maxLeafRecords: 10000

This declares a tree split on those three dimensions in that order, with two materialised aggregates per node. A query that asks for SUM(fare_paise) grouped by any subset of those dimensions can be answered from the tree — the * branches stand in for whichever dimensions the query omits. A query that groups by an undeclared dimension (say, payment_method) falls back to scanning the forward index — still fast because of the inverted index, but no longer single-digit milliseconds.

The space cost is real: a star-tree on three medium-cardinality dimensions can add 30–60% to segment size. The query-time win is 10–100×. Uber's rule of thumb: build a star-tree on the dimensions that 80% of dashboard queries group by; let the remaining 20% pay the scan cost.

A working Pinot table — schema, segment, query

Here is a complete loop — schema, table config, and two queries — against a live Pinot 1.x cluster. (Data for the table is assumed to be already ingested.) Output is from a four-server cluster, each server a c5.2xlarge.

# pinotdb is the DB-API 2.0 driver; the control plane is Pinot's REST API.
from pinotdb import connect
import requests, time

CONTROLLER = "http://pinot-controller:9000"
BROKER     = "pinot-broker"

# 1. Define the schema — dimensions, metrics, time column.
schema = {
  "schemaName": "trips",
  "dimensionFieldSpecs": [
    {"name": "trip_id",      "dataType": "LONG"},
    {"name": "driver_id",    "dataType": "LONG"},
    {"name": "rider_id",     "dataType": "LONG"},
    {"name": "city_id",      "dataType": "INT"},
    {"name": "service_tier", "dataType": "STRING"},
    {"name": "status",       "dataType": "STRING"}
  ],
  "metricFieldSpecs": [
    {"name": "fare_paise",   "dataType": "LONG"},
    {"name": "distance_m",   "dataType": "INT"}
  ],
  "dateTimeFieldSpecs": [
    {"name": "ts", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH",
     "granularity": "1:MILLISECONDS"}
  ]
}
requests.post(f"{CONTROLLER}/schemas", json=schema).raise_for_status()

# 2. Define the OFFLINE table with star-tree and inverted indices.
table = {
  "tableName": "trips",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "ts", "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS", "retentionTimeValue": "90",
    "replication": "2", "schemaName": "trips"
  },
  "tableIndexConfig": {
    "invertedIndexColumns": ["driver_id", "rider_id", "city_id", "service_tier"],
    "starTreeIndexConfigs": [{
      "dimensionsSplitOrder": ["city_id", "service_tier"],
      "functionColumnPairs": ["COUNT__*", "SUM__fare_paise"],
      "maxLeafRecords": 10000
    }]
  },
  "tenants": {}, "metadata": {}
}
requests.post(f"{CONTROLLER}/tables", json=table).raise_for_status()

# 3. Query — driver-facing weekly summary, single driver.
conn = connect(host=BROKER, port=8099, scheme="http")
cur = conn.cursor()
t0 = time.time()
cur.execute("""
SELECT count(*) AS trips, sum(fare_paise)/100 AS rupees,
       sum(case when status='COMPLETED' then 1 else 0 end)*1.0/count(*) AS completion_rate
FROM trips
WHERE driver_id = 8472334
  AND ts >= 1713571200000  -- 7 days ago in ms
""")
print(cur.fetchall())
print(f"latency = {(time.time()-t0)*1000:.1f} ms")

# 4. Star-tree query — city × service_tier counts for yesterday.
t0 = time.time()
cur.execute("""
SELECT city_id, service_tier, count(*) AS trips
FROM trips
WHERE ts BETWEEN 1714003200000 AND 1714089600000
GROUP BY city_id, service_tier
ORDER BY trips DESC LIMIT 20
""")
for row in cur.fetchall(): print(row)
print(f"star-tree latency = {(time.time()-t0)*1000:.1f} ms")
[(847, 12943.5, 0.94)]
latency = 9.2 ms

(7, 'UberGo',  482193)   # Bengaluru UberGo
(1, 'UberGo',  391044)   # Mumbai UberGo
(2, 'UberGo',  287910)   # Delhi UberGo
(7, 'Auto',    241337)   # Bengaluru Auto
(1, 'UberXL',  198210)
(2, 'Auto',    142841)
...
star-tree latency = 6.7 ms

Walk the parts that matter: the schema separates dimensions from metrics because only metrics can appear in the star-tree's functionColumnPairs; the inverted index on driver_id is what serves query 3 in 9.2 ms — the matching row IDs come straight from the index, no scan; and query 4 groups by exactly the declared dimensionsSplitOrder (city_id, service_tier), so its counts are read off star-tree nodes rather than computed — hence 6.7 ms over a full day of trips.

Real-time ingestion and the LLC protocol

The real-time tier consumes from Kafka via the Low-Level Consumer (LLC) protocol. Each Pinot Server is assigned specific Kafka partitions; it consumes those partitions directly (bypassing the Kafka high-level consumer's group rebalance dance) and builds an in-memory consuming segment. When the segment hits a row threshold (default 1 million) or a time threshold (default 6 hours), it seals: the in-memory data is written to a v3 segment file, indices are built, the segment is uploaded to deep storage, and the Server that owns the next sequence starts a new consuming segment.
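The seal decision is a simple either-or over the two thresholds. A minimal sketch — the function name is mine, and the defaults are the ones the text states (1 million rows, 6 hours):

```python
def should_seal(num_rows, first_row_ts_ms, now_ms,
                row_threshold=1_000_000,
                time_threshold_ms=6 * 3600 * 1000):
    """A consuming segment seals when EITHER threshold is crossed:
    it has accumulated enough rows, or it has been open too long."""
    too_big = num_rows >= row_threshold
    too_old = (now_ms - first_row_ts_ms) >= time_threshold_ms
    return too_big or too_old
```

The time threshold matters for low-traffic partitions: without it, a quiet Kafka partition could hold a consuming segment in memory for days before reaching the row count.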

The LLC protocol's commit step is coordinated by the Controller. The first Server to finish committing wins; other replicas download from deep storage instead of building independently. Why this matters for consistency: if every replica built its own segment independently, the bytes might differ (timing of consumption, ordering within a millisecond). Centralising the commit to one winner and having the others download produces byte-identical replicas, which makes segment migration, integrity checks, and "is this segment corrupt?" alerts deterministic.
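The commit-winner rule reduces to a compare-and-set on the segment's metadata. The sketch below is a toy: real Pinot does this through the Controller's atomic ZooKeeper write, and the class, method names, and response tuples here are all illustrative.

```python
class CommitCoordinator:
    """Toy model of the Controller's commit step: the first replica to
    commit a (segment, url) pair wins; later committers are told to
    download the winner's bytes, guaranteeing byte-identical replicas."""

    def __init__(self):
        self._committed = {}  # segment name -> canonical deep-storage URL

    def try_commit(self, segment, url):
        if segment in self._committed:
            # A winner already exists: this replica discards its own build
            # and hydrates from the canonical bytes instead.
            return ("DOWNLOAD", self._committed[segment])
        self._committed[segment] = url  # atomic (ZooKeeper) in real life
        return ("COMMIT_SUCCESS", url)

ctrl = CommitCoordinator()
r1 = ctrl.try_commit("trips__7__142__1714032000", "s3://deep/trips/142-serverA")
r2 = ctrl.try_commit("trips__7__142__1714032000", "s3://deep/trips/142-serverB")
```

Server B's independently built bytes become an orphan candidate; only server A's URL is ever visible to queries.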

A real-time-only Pinot deployment is rare in practice. The pattern is hybrid: real-time holds the last 1–3 days, offline holds 30–90 days, both serve the same logical table. The offline tier is rebuilt nightly from the source-of-truth in the lakehouse — typically a Spark job that reads the previous day's data from Iceberg, builds segments offline, and pushes them to Pinot. This is the same pattern the lambda architecture prescribes, but with the speed and batch layers running the same query engine.

Common confusions

  1. A segment is not a directory of files. Unlike a ClickHouse part, it is a single archive; its indices become addressable only when a Server memory-maps it.
  2. The two tiers never double-count. The Controller's time boundary assigns each time range to exactly one tier, so the Broker's scatter-gather sums disjoint row sets.
  3. The star-tree is not a general accelerator. Only queries over the declared dimensionsSplitOrder hit the tree; everything else falls back to index-assisted scans of the forward index.
  4. Upsert does not break sealed-segment immutability. Only the in-memory consuming segment is mutable; once sealed, a segment never changes.

Going deeper

Segment partitioning and broker-level pruning

A Pinot table can be partitioned at segment-creation time by hashing a column. If you partition on driver_id % 64, every segment carries metadata stating "this segment contains only driver_ids in partition K." The Broker uses that metadata to prune segments before scattering — a query for driver_id = 8472334 only goes to the Servers holding partition 8472334 % 64, not all 200 Servers. This is how Uber gets sub-10ms latency at 50,000 QPS: the Broker's scatter is to ~3 Servers per query, not all 200. The cost is that ingestion has to be partition-aware (Kafka producer must hash on the same key), and that re-partitioning requires a full segment rebuild.
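Broker-side pruning is just the same modulo the producer used, applied to the filter value. A sketch under stated assumptions: the server list, replica count, and modulo-based placement below are invented for illustration — real placement comes from Helix.

```python
NUM_PARTITIONS = 64
SERVERS = [f"server-{i}" for i in range(200)]

def servers_for_partition(partition, replicas=2):
    """Toy placement: partition K lives on `replicas` servers picked by
    modulo. Real Pinot placement is Helix-driven; only the pruning idea
    — partition -> small fixed server set — is the point here."""
    return [SERVERS[(partition + r * NUM_PARTITIONS) % len(SERVERS)]
            for r in range(replicas)]

def prune(driver_id):
    # Must be the same hash the Kafka producer applied at ingest time,
    # otherwise the segment metadata lies about what it contains.
    part = driver_id % NUM_PARTITIONS
    return part, servers_for_partition(part)

part, targets = prune(8472334)  # scatter to 2 servers, not all 200
```

The scatter width is what turns 50,000 QPS from "every server sees every query" into "every server sees a quarter of one percent of them."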

Mutable segments and the upsert table type

Pinot 0.8 added the UPSERT table type, which lets a real-time segment hold the latest row per primary key — newer rows replace older ones in-place. This breaks the "segments are immutable" rule but only for the consuming (in-memory) part of the segment; once sealed, the segment is immutable again. The use case is "current state of an order/trip/account" served from Pinot rather than Postgres. The cost is a per-key in-memory map across the consuming segment, which limits primary-key cardinality to whatever fits in heap. Uber uses upsert tables for the trip-state table — at any given time, ~3 lakh active trips are in-flight, the map fits in 200 MB, no problem. Razorpay tried it for 2 crore active merchants and ran out of heap; they fell back to the lakehouse.
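The per-key map's mechanics — and its heap cost — fit in a short sketch. The class below is a toy, not Pinot's implementation: it shows why old rows are shadowed rather than rewritten, and why primary-key cardinality is the binding limit.

```python
class ConsumingUpsertSegment:
    """Toy consuming segment for an UPSERT table: a per-key map points
    each primary key at its latest row. Older rows stay in place but are
    shadowed — nothing is rewritten, which preserves append-only ingest."""

    def __init__(self):
        self._rows = []    # append-only row store, as in normal ingest
        self._latest = {}  # primary key -> index of latest row (heap cost:
                           # one entry per distinct key, hence the limit)

    def ingest(self, pk, row):
        self._rows.append(row)
        self._latest[pk] = len(self._rows) - 1

    def get(self, pk):
        return self._rows[self._latest[pk]]

seg = ConsumingUpsertSegment()
seg.ingest("trip-42", {"status": "ONGOING"})
seg.ingest("trip-42", {"status": "COMPLETED"})
latest = seg.get("trip-42")  # only the newest state is visible
```

Both rows exist in the store, but queries resolve through the map — which is exactly why 3 lakh in-flight trips fit comfortably and 2 crore merchants did not.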

Segment commit consistency and deep storage

The commit-winner protocol described above has a subtle correctness property: if the winning Server crashes after pushing to deep storage but before the Controller records the commit, the next round elects a new winner who downloads the same bytes. If two Servers both push to deep storage simultaneously (race in the Controller), the Controller's atomic ZooKeeper write decides which one's URL is canonical and the loser's bytes become orphan files (cleaned up by a background sweep). This is the practical realisation of the "exactly-once segment seal" property that Pinot needs to offer downstream consumers — a segment either exists fully or does not exist; no half-written segment is ever visible.

Comparing Pinot to ClickHouse on a fair workload

For Uber's "driver weekly summary" query — single driver, time-range filter, two aggregates — Pinot lands in 8–12 ms because the inverted index on driver_id finds rows directly. ClickHouse on the same data lands in 30–60 ms because granule-level pruning still reads ~8K rows per matching granule and filters in-engine. For an "all-cities" aggregate query without dimension filters, Pinot's star-tree answers in 5–8 ms; ClickHouse's columnar scan answers in 80–200 ms. For a flexible ad-hoc query that no one declared in advance — say, "trips by surge-multiplier bucket" where surge_multiplier is not in the star-tree — ClickHouse wins, scanning the column at 1 GB/s; Pinot falls back to scanning the forward index at similar speed but without the columnar vectorisation tricks. The lesson is that the engine choice follows the workload's predictability, not its size.

Where this leads next

The story Pinot tells is that the hard problem of real-time analytics is not "make scans faster." It is "decide what answer to compute at ingest time, so that query time becomes a lookup." ClickHouse picks one extreme of that trade-off; Druid picks another; Pinot picks a third. The next chapter on Druid finishes the triangle.
