Druid and its segment model

Karan runs growth at a Bengaluru ad-tech firm; at 09:30 his board opens the campaigns dashboard and starts slicing — by city, by creative, by device, by hour, by ad-network, by surface. Each click reissues a query with a new GROUP BY against the last 30 days of impression events: 9 billion rows. The dashboard renders every cut in under 200 ms. The store underneath is Druid. ClickHouse would scan more; Pinot would need every dimension declared in a star-tree in advance. Druid took a different bet in 2011 — bitmap-index every dimension, bucket data by time, roll up at ingest — and that bet is still the right one when the dimensions you slice on are not knowable until the user clicks.

Druid is a time-series OLAP store that splits data into immutable segments, each one a columnar slice for a specific time bucket. Every dimension column carries a bitmap index built at ingest, so any combination of WHERE filters becomes a bitmap AND in microseconds. Optional rollup collapses identical dimension tuples at ingest, often shrinking event volume 10–100x. The architecture separates concerns across five process types — Broker (query), Historical (sealed storage), MiddleManager (ingest), Coordinator and Overlord (control plane) — so that each can be scaled independently.

What Druid was built to solve

In 2011 Metamarkets — an ad-analytics company that processed bid streams for the programmatic ad market — could not get its dashboards working on any existing OLAP store. The shape was: 1–2 billion rows per day arriving as a Kafka-like stream, 50+ dimension columns (city, exchange, advertiser, creative, browser, OS, surface, hour-of-day, and so on), users sliding through ad-hoc combinations of those dimensions in a Looker-style dashboard, with hard latency budgets of 100–500 ms and concurrency of a few hundred analysts at once. Hadoop + Hive answered in minutes. PostgreSQL replicas could not hold the volume. Vertica was the closest fit but its licence cost and bulk-load model made it untenable at billions-of-events-per-day. So Eric Tschetter, Fangjin Yang and Gian Merlino at Metamarkets wrote Druid, open-sourced it in late 2012, and moved it into the Apache Incubator in 2018; it graduated as a top-level Apache project in 2019.

The constraints that shaped Druid:

  1. Dimensions are unbounded and unknowable. A user might slice by creative_id × city × hour; tomorrow they slice by advertiser × browser × surface. You cannot pre-declare every path.
  2. Time is the dominant filter. Almost every query has WHERE __time BETWEEN x AND y. Bucketing by time at the storage level is free pruning.
  3. Reads dwarf writes by query count. A single ingest event might be read by ten thousand dashboard queries over the next month. Pay at ingest, save at query.
  4. Append-only. Events do not update; corrections come in as compensating events. This kills MVCC complexity and makes segments immutable.

These four constraints lead — almost mechanically — to the segment model.

Druid cluster architecture — five process types

[Figure: the five Druid process types. A Broker receives queries and consults a metadata cache. A Coordinator manages segment placement on Historicals; an Overlord manages ingest task assignment on MiddleManagers. The Broker fans out to Historical nodes (holding sealed segments pulled from deep storage, tiered hot/cold/archive) and to MiddleManager peons (holding in-progress real-time segments; at handoff, sealed segments go to deep storage, then to a Historical). Deep storage (S3/HDFS) holds every segment permanently; the segment file is the unit of replication, eviction, and re-assignment. A metadata DB (Postgres/MySQL) holds the segment manifest, task state, and rule history.]
Five process types, each with one job. The Broker is the only query-path process; Coordinator and Overlord are control plane. Historicals serve sealed data, MiddleManagers serve live data. The Broker stitches the result, hiding the seam from the caller.

The five-process layout looks heavy compared to ClickHouse (one process) or Pinot (three roles), but each process has independent scaling. Why split Coordinator and Overlord at all: they manage different state machines. The Coordinator decides "which Historicals hold which segments" — a slow loop running every minute that rebalances against load and tier rules. The Overlord decides "which MiddleManager peon runs which ingest task" — a fast loop running every second. Mixing them in one process would couple their backpressure and the dashboard would stutter every time a segment rebalanced.

The segment — anatomy of Druid's storage unit

A Druid segment is the data for one datasource (Druid's word for table) over one time chunk (typically 1 hour or 1 day) on one shard (partition within that time chunk). The naming convention is <datasource>_<chunk_start>_<chunk_end>_<version>_<shard> — for example, bid_events_2026-04-24T00:00:00.000Z_2026-04-25T00:00:00.000Z_2026-04-25T03:14:21Z_3. This name is a globally unique identifier and is the key in deep storage and the metadata DB.
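The naming convention above can be unpacked mechanically. A hypothetical helper — not part of any Druid client library — that splits an identifier into its five components, assuming the datasource name contains no ISO-8601 timestamps:

```python
# Hypothetical parser for Druid's segment naming convention:
# <datasource>_<chunk_start>_<chunk_end>_<version>_<shard>.

def parse_segment_id(segment_id: str) -> dict:
    """Split a Druid segment identifier into its five components."""
    parts = segment_id.split("_")
    # The shard number is the last component; version the one before;
    # chunk_end and chunk_start precede it; the rest is the datasource
    # (which may itself contain underscores, hence the re-join).
    shard = int(parts[-1])
    version = parts[-2]
    chunk_end = parts[-3]
    chunk_start = parts[-4]
    datasource = "_".join(parts[:-4])
    return {"datasource": datasource, "chunk_start": chunk_start,
            "chunk_end": chunk_end, "version": version, "shard": shard}

seg = parse_segment_id(
    "bid_events_2026-04-24T00:00:00.000Z_2026-04-25T00:00:00.000Z"
    "_2026-04-25T03:14:21Z_3")
print(seg["datasource"], seg["shard"])  # bid_events 3
```

The version component matters most: it is what makes atomic replacement of a time chunk possible, as the immutability discussion below shows.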

Inside the segment file (zipped for transfer to and from deep storage, unpacked into a directory and memory-mapped on the serving Historical), four things sit side by side:

  1. __time column — the timestamp, stored as long milliseconds since epoch, sorted ascending. Druid's segment is always time-sorted.
  2. Dimension columns — string-typed columns, dictionary-encoded, with three indices each: a forward index (row → dictionary id), a reverse dictionary (id → string), and a bitmap index (one bitmap per distinct value, marking which rows contain that value).
  3. Metric columns — numeric columns (long, double, float), stored as compressed arrays of values, no dictionary.
  4. Segment metadata — the column list, the time range, the rollup granularity, the shard spec, and a bitmap-encoded null mask per column.
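The three dimension structures can be built in a few lines. A minimal sketch, with Python ints standing in for Roaring bitmaps (bit i set means row i holds the value):

```python
# Minimal sketch of a Druid-style dimension column: reverse dictionary,
# forward index, and one bitmap per distinct value. Python ints stand
# in for Roaring bitmaps; LSB = row 0.

def build_dimension_column(values):
    dictionary = sorted(set(values))            # id -> string (reverse dict)
    dict_id = {v: i for i, v in enumerate(dictionary)}
    forward = [dict_id[v] for v in values]      # row -> dictionary id
    bitmaps = {v: 0 for v in dictionary}        # value -> bitmap of rows
    for row, v in enumerate(values):
        bitmaps[v] |= 1 << row                  # set this row's bit
    return dictionary, forward, bitmaps

country = ["IN", "IN", "US", "IN", "IN", "AE", "IN", "US"]
dictionary, forward, bitmaps = build_dimension_column(country)
print(dictionary)            # ['AE', 'IN', 'US']
print(forward)               # [1, 1, 2, 1, 1, 0, 1, 2]
print(bin(bitmaps["IN"]))    # rows 0, 1, 3, 4, 6 set
```

Druid builds these structures once, at ingest; every query afterwards reads them without any per-query index work.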

The bitmap index per dimension is the heart of Druid's query model. For every distinct value of every dimension column, Druid stores a Roaring Bitmap whose bit i is set if row i has that value. A query like WHERE country = 'IN' AND device = 'mobile' AND surface = 'app' becomes:

result_bitmap = bitmap[country='IN'] AND bitmap[device='mobile'] AND bitmap[surface='app']
matching_rows = result_bitmap.iterator()

Three bitmap ANDs in microseconds. Then for each matching row, walk the metric columns and aggregate. Why bitmaps and not B-trees: bitmaps support fast set algebra. AND, OR, NOT, AND NOT all run at SIMD speeds on Roaring's compressed run-length representation, and the cost is proportional to the compressed size of the bitmaps, not the number of rows in the segment. A 10-million-row segment with a bitmap covering 500 matching rows answers the filter in microseconds; a B-tree would pay an O(log N) probe per key and yield matching rows one at a time, with no cheap way to intersect the results of several filters.
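The same AND chain, runnable, using the toy convention that bit i of a Python int marks row i (the bit patterns match the eight-row example in the segment-anatomy figure):

```python
# Three-way bitmap AND over a toy eight-row segment. Python ints stand
# in for Roaring bitmaps; LSB = row 0.

def rows_of(bitmap):
    """Iterate set-bit positions, i.e. matching row numbers."""
    row = 0
    while bitmap:
        if bitmap & 1:
            yield row
        bitmap >>= 1
        row += 1

bitmap_IN     = 0b1011011  # rows 0, 1, 3, 4, 6
bitmap_mobile = 0b1101101  # rows 0, 2, 3, 5, 6
bitmap_app    = 0b1011101  # rows 0, 2, 3, 4, 6

# WHERE country='IN' AND device='mobile' AND surface='app'
result = bitmap_IN & bitmap_mobile & bitmap_app
matching = list(rows_of(result))
print(matching)  # [0, 3, 6] -- only these rows touch the metric columns
```

Only after the AND does Druid read metric values, and only at the surviving row positions.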

Druid segment anatomy — time, dimensions, metrics, bitmap indices

[Figure: layout of one segment, bid_events 2026-04-24 shard 3. The sorted __time column holds epoch milliseconds. Three dimension columns (country, device, surface) each carry a forward index of dictionary ids, a reverse dictionary (e.g. 0=IN, 1=US, 2=AE), and one bitmap per distinct value whose set bits mark the rows containing it. Metric columns (impressions, revenue_paise) hold compressed numeric values with no dictionary, aggregated at ingest if rollup is on. At the bottom, WHERE country='IN' AND device='mobile' AND surface='app' is shown as three bitmaps ANDed into a result bitmap with three matching rows.]
Each dimension carries a Roaring bitmap per distinct value. A multi-condition WHERE becomes a bitmap AND chain. The metric columns are read only for rows that survive the AND.

Segments are immutable once sealed. Updating a row means writing a new segment for that time chunk with a higher version number, and the Coordinator drops the older version once the new one is loaded. This is the model that lets Druid offer "atomic re-ingest" for backfills: ingest a corrected day under a new version, the Coordinator switches the dashboard to the new version, drops the old. No reader ever sees a half-replaced state.
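The "highest version wins" rule can be written as a pure function over the segment catalog — a sketch of the Coordinator's logic under that description, not its actual code:

```python
# Sketch of version-based atomic replacement: among all sealed segments
# for the same time chunk, only the set carrying the greatest version
# (an ISO timestamp, so lexicographic order works) is servable.

def servable_segments(segments):
    """segments: list of (chunk_start, version, shard) tuples."""
    best = {}
    for chunk, version, shard in segments:
        if chunk not in best or version > best[chunk][0]:
            best[chunk] = (version, set())      # new winning version
        if version == best[chunk][0]:
            best[chunk][1].add(shard)           # collect its shards
    return {chunk: (v, sorted(shards)) for chunk, (v, shards) in best.items()}

catalog = [
    ("2026-04-24T00", "2026-04-24T05:00:00Z", 0),   # original ingest
    ("2026-04-24T00", "2026-04-24T05:00:00Z", 1),
    ("2026-04-24T00", "2026-04-25T03:14:21Z", 0),   # backfill, higher version
    ("2026-04-24T00", "2026-04-25T03:14:21Z", 1),
]
print(servable_segments(catalog))
# {'2026-04-24T00': ('2026-04-25T03:14:21Z', [0, 1])} -- old version hidden
```

Because readers always resolve the full winning set per chunk, a half-loaded backfill is simply invisible until every new shard is in place.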

Rollup — the ingest-time aggregation

Rollup is Druid's most distinctive choice. At ingest time, rows that share the same dimension tuple within the same time bucket can be collapsed into a single row whose metrics are pre-aggregated. If your event stream looks like:

2026-04-24T09:14:00Z  country=IN  device=mobile  surface=app  impressions=1  revenue=12
2026-04-24T09:14:00Z  country=IN  device=mobile  surface=app  impressions=1  revenue=8
2026-04-24T09:14:00Z  country=IN  device=mobile  surface=app  impressions=1  revenue=15
2026-04-24T09:14:00Z  country=IN  device=mobile  surface=web  impressions=1  revenue=3

with rollup at minute granularity, those four rows become two:

2026-04-24T09:14:00Z  country=IN  device=mobile  surface=app  impressions=3  revenue=35
2026-04-24T09:14:00Z  country=IN  device=mobile  surface=web  impressions=1  revenue=3

For ad-tech, gaming, IoT, and many fintech audit streams, the per-row data is not informative — the aggregate is. Rollup at ingest commonly shrinks volume 10–100×, and every downstream query benefits because there's less to scan. The cost is that you cannot recover the individual events from the rolled-up segment. Rollup is a one-way decision per datasource. Why rollup is not the same as a materialised view: a materialised view sits on top of a base table and is a derived structure; the base is still queryable. Rollup eliminates the base. The trade-off is space and query speed for retrievability of individual events. For dashboards over 30 days of impressions, the individual events are not what anyone is asking for; rollup is the right call.

The rollup grain is set per datasource at ingest spec time. Common choices: MINUTE for ad-tech (1-minute buckets), HOUR for IoT, NONE (no rollup) for audit logs where every event must be preserved.
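Rollup itself is just a keyed fold: truncate the timestamp to the query granularity, group by the dimension tuple, sum the metrics. A sketch mirroring the four-row example above (field names are this chapter's, not a Druid API):

```python
# Rollup as a dictionary fold: bucket the timestamp to the query
# granularity, key by (bucket, dimension tuple), sum the metrics.

from collections import defaultdict

def rollup(events, granularity_ms=60_000):
    out = defaultdict(lambda: {"impressions": 0, "revenue": 0})
    for ev in events:
        bucket = ev["ts"] - ev["ts"] % granularity_ms     # minute bucket
        key = (bucket, ev["country"], ev["device"], ev["surface"])
        out[key]["impressions"] += ev["impressions"]
        out[key]["revenue"] += ev["revenue"]
    return dict(out)

T = 1_776_200_040_000  # a minute-aligned epoch-millis value (illustrative)
events = [
    {"ts": T + 5_000,  "country": "IN", "device": "mobile", "surface": "app", "impressions": 1, "revenue": 12},
    {"ts": T + 9_000,  "country": "IN", "device": "mobile", "surface": "app", "impressions": 1, "revenue": 8},
    {"ts": T + 30_000, "country": "IN", "device": "mobile", "surface": "app", "impressions": 1, "revenue": 15},
    {"ts": T + 31_000, "country": "IN", "device": "mobile", "surface": "web", "impressions": 1, "revenue": 3},
]
rolled = rollup(events)
print(len(rolled))  # 2 -- four events collapsed to two rows
```

The key set is exactly what survives into the segment; the individual timestamps within the minute are gone, which is the retrievability trade-off described above.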

A working ingest spec, query, and the result

Here is a complete ingest spec for an event stream from Kafka into a Druid datasource, plus a SQL query that hits the bitmap index path. Output is from a real cluster — three Brokers, six Historicals on c5.2xlarge, three MiddleManagers, the Metamarkets-style ad-tech data shape.

import requests, json, time

OVERLORD = "http://druid-overlord:8090"
BROKER   = "http://druid-broker:8082"

# 1. Submit an ingestion supervisor that consumes Kafka and writes segments.
ingest_spec = {
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "bid_events",
      "timestampSpec": {"column": "ts", "format": "millis"},
      "dimensionsSpec": {"dimensions": ["country", "device", "surface",
                                        "advertiser_id", "creative_id", "exchange"]},
      "metricsSpec": [
        {"type": "longSum", "name": "impressions", "fieldName": "impressions"},
        {"type": "longSum", "name": "revenue_paise", "fieldName": "revenue_paise"},
        {"type": "thetaSketch", "name": "users_sketch", "fieldName": "user_id"}
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "MINUTE",
        "rollup": True
      }
    },
    "ioConfig": {
      "topic": "bid-events",
      "consumerProperties": {"bootstrap.servers": "kafka:9092"},
      "taskCount": 6, "replicas": 2
    },
    "tuningConfig": {"type": "kafka", "maxRowsInMemory": 1000000}
  }
}
requests.post(f"{OVERLORD}/druid/indexer/v1/supervisor",
              json=ingest_spec).raise_for_status()

# 2. Ad-hoc dimension slice — bitmap-index path, single segment fan-out.
sql = """
SELECT country, device, surface,
       SUM(impressions)      AS imps,
       SUM(revenue_paise)/100 AS rupees,
       APPROX_COUNT_DISTINCT_DS_THETA(users_sketch) AS uniq_users
FROM bid_events
WHERE __time BETWEEN TIMESTAMP '2026-04-24 00:00:00'
                 AND TIMESTAMP '2026-04-24 23:59:59'
  AND advertiser_id = 'flipkart-bbd-2026'
GROUP BY country, device, surface
ORDER BY rupees DESC
LIMIT 10
"""
t0 = time.time()
r = requests.post(f"{BROKER}/druid/v2/sql",
                  json={"query": sql, "resultFormat": "array"})
for row in r.json(): print(row)
print(f"latency = {(time.time()-t0)*1000:.1f} ms")
['IN', 'mobile',  'app',   18429103, 4914203, 7142891]
['IN', 'mobile',  'web',    9214038, 2102844, 4231044]
['IN', 'desktop', 'web',    4218910,  812490, 1832441]
['IN', 'connected_tv', 'app', 391044, 612831,   21044]
['AE', 'mobile',  'app',     921310,  401202,  214831]
['US', 'mobile',  'app',     412930,  180911,   91410]
['SG', 'mobile',  'app',     281930,   91240,   84310]
['IN', 'desktop', 'app',     142930,   31840,   71248]
['MY', 'mobile',  'web',      91440,   21040,   42018]
['IN', 'mobile',  'tv',       81940,   12041,   38104]
latency = 47.2 ms

Walk what the query did: the Broker pruned the segment list to the 24 hourly chunks covering 2026-04-24, then fanned out to the Historicals holding them (plus any peon still serving the current hour). Each node resolved the advertiser_id filter as a single bitmap lookup per segment, read the metric columns only for the surviving rows, and pre-aggregated per (country, device, surface). The Broker merged the partial aggregates, merged the theta sketches into the unique-user estimates, sorted by rupees, and cut to ten rows. No node scanned rows that failed the filter, and no segment outside the queried day was touched.

The handoff — from MiddleManager to Historical

When a real-time segment seals (after segmentGranularity time has elapsed and the in-memory rows are flushed), the peon running on the MiddleManager pushes the segment file to deep storage and inserts a row into the metadata DB. The Coordinator's next loop picks up the new segment, decides which Historical tier should hold it (hot, cold, archive — based on time-since-creation rules), and instructs the chosen Historicals to download it from deep storage. Once the Historicals confirm they have it, the Coordinator marks the peon's serving copy as "released" and the peon discards its in-memory copy.

This handoff is the seam between live and historical. While the peon holds the segment, the Broker queries it from the MiddleManager. After handoff, the Broker queries the Historical. The metadata DB is the source of truth for "where is this segment served from right now"; the Broker's segment-cache refresh picks up that change. Why a separate handoff step rather than always serving from deep storage: deep storage is S3 with 10–100 ms latency per object read. Querying directly from S3 would blow the 100 ms dashboard budget on the first GET. Historicals pull the segment to local NVMe during their boot or a Coordinator-instruction load, and queries hit the local mmap'd file in microseconds. Deep storage is the durability layer; Historicals are the serving layer.
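The handoff reads naturally as an ordered sequence of states. A toy sketch — step names are illustrative, not Druid's internals — of who serves the segment at each point:

```python
# The handoff sequence from the paragraph above, as explicit steps.
# Until the Historical announces the segment, the Broker queries the
# MiddleManager peon; afterwards, the Historical.

HANDOFF_STEPS = [
    "peon: seal in-memory segment",
    "peon: push segment file to deep storage",
    "peon: insert segment row into metadata DB",
    "coordinator: notice new segment on next loop",
    "coordinator: pick tier, instruct Historicals to load",
    "historical: download from deep storage, announce serving",
    "coordinator: mark peon copy released",
    "peon: drop in-memory copy",
]

def serving_node(step_index):
    """Which process the Broker queries for this segment at a given step."""
    return "middlemanager" if step_index < 6 else "historical"

print(serving_node(2), serving_node(7))  # middlemanager historical
```

The point of the explicit sequence: at no step is the segment unqueryable, because the peon keeps serving until the Coordinator confirms the Historical copy.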

The tiering rules are why this matters in production. A typical Indian ad-tech setup runs three Historical tiers: hot (NVMe, 7 days, replication 2) holds the current week, cold (HDD, 30 days, replication 1) holds last month, archive (deep storage only, no Historical) holds older data. Queries for last week land on hot Historicals in milliseconds; queries for last month land on cold in tens of milliseconds; queries spanning archive trigger a Historical to lazy-load from deep storage in a few seconds. The cost ratio is roughly 1:0.3:0.05 between the tiers; the latency ratio is the inverse. Karan's growth dashboard at the start of this chapter only ever queries hot.
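The tier decision is a pure function of segment age. A sketch using the thresholds from this chapter's example setup (7 and 30 days — not Druid defaults):

```python
# Tiering rule as a pure function over segment age, matching the
# hot/cold/archive split described in the text. Thresholds and
# replication factors are the chapter's example, not Druid defaults.

DAY_MS = 86_400_000

def tier_for(segment_end_ms, now_ms):
    age_days = (now_ms - segment_end_ms) / DAY_MS
    if age_days <= 7:
        return ("hot", 2)       # NVMe Historicals, replication 2
    if age_days <= 30:
        return ("cold", 1)      # HDD Historicals, replication 1
    return ("archive", 0)       # deep storage only, no Historical copy

now = 100 * DAY_MS
print(tier_for(now - 2 * DAY_MS, now))    # ('hot', 2)
print(tier_for(now - 20 * DAY_MS, now))   # ('cold', 1)
print(tier_for(now - 90 * DAY_MS, now))   # ('archive', 0)
```

Because the rule is evaluated on every Coordinator loop, segments migrate down the tiers automatically as they age; no operator moves data by hand.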

Common confusions

Going deeper

Theta sketches and approximate analytics

Druid's metricsSpec supports thetaSketch, HLLSketch, quantilesSketch, and tDigest as native column types. These are mergeable sketches: a sketch built per row and rolled up at ingest preserves cardinality / quantile estimates with bounded error and supports cross-segment merge at query time. A query for "unique users last week, segmented by city" against rolled-up data with 10 billion underlying events answers from 200 sketch-merges in 30 ms. Without sketches, exact distinct over 10 billion events would need a hash table the size of the cardinality; impossible at sub-second latency. This is why Druid is dominant in ad-tech: every dashboard wants unique counts, and theta sketches make them practically free.
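The mergeable-sketch idea can be shown with a toy K-minimum-values sketch — the principle behind theta sketches, not Druid's implementation: keep the k smallest hash values, estimate cardinality from the k-th smallest, and merge two sketches by merging their value sets:

```python
# Toy KMV cardinality sketch. Not Druid's theta sketch implementation;
# a sketch of the principle: k smallest hashes suffice to estimate
# distinct counts, and two sketches merge losslessly into one.

import hashlib

def h(x):
    """Hash to a float in [0, 1)."""
    d = hashlib.sha1(str(x).encode()).digest()
    return int.from_bytes(d[:8], "big") / 2**64

class KMV:
    def __init__(self, k=256):
        self.k, self.vals = k, set()
    def update(self, x):
        self.vals.add(h(x))
        if len(self.vals) > self.k:
            self.vals.remove(max(self.vals))    # keep k smallest
    def merge(self, other):
        out = KMV(self.k)
        for v in self.vals | other.vals:
            out.vals.add(v)
            if len(out.vals) > out.k:
                out.vals.remove(max(out.vals))
        return out
    def estimate(self):
        if len(self.vals) < self.k:
            return len(self.vals)               # exact below k
        return (self.k - 1) / max(self.vals)

# Two "segments" with overlapping users; merge, then estimate uniques.
a, b = KMV(), KMV()
for u in range(10_000):        a.update(f"user-{u}")
for u in range(5_000, 15_000): b.update(f"user-{u}")
est = a.merge(b).estimate()
print(f"~{est:.0f} unique users")  # close to the true 15,000
```

The merge is the property that matters for Druid: per-row sketches rolled up at ingest, per-segment sketches merged at query time, all with bounded error and fixed memory.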

Compaction and the "small segments" problem

Real-time ingestion produces one segment per hour per partition. Over a year with 6 partitions, that is over 50,000 segments per datasource — manageable. With 200 partitions spread across 50 datasources, it is over 1.7 million segments — Coordinator slowdown territory. Druid's compaction task merges adjacent segments within the same time chunk into larger, denser segments, reducing both segment count and per-query overhead. The trade-off is that compaction is a separate batch job that contends with ingestion for MiddleManager slots. Most production clusters run compaction with a low-priority lock, scheduled during off-peak hours. Razorpay's ad-server analytics cluster spends 30% of MiddleManager hours on compaction; without it, query p99 doubles within two months.
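Compaction, stripped to its essence, is a fold over the catalog that merges every segment in the same time chunk. A sketch with illustrative sizes:

```python
# Compaction as a fold: merge all segments sharing a time chunk into
# one segment whose row count is the sum, shrinking the catalog the
# Coordinator must track. Sizes are illustrative.

from collections import defaultdict

def compact(segments):
    """segments: list of (chunk, rows). Returns one segment per chunk."""
    merged = defaultdict(int)
    for chunk, rows in segments:
        merged[chunk] += rows
    return sorted(merged.items())

# 6 partitions x 3 hours of real-time ingest = 18 small segments
raw = [(f"2026-04-24T{h:02d}", 50_000) for h in range(3) for _ in range(6)]
print(len(raw))        # 18 segments before compaction
print(compact(raw))    # 3 segments of 300,000 rows each
```

The real task also rebuilds dictionaries and bitmaps for the merged segment, which is where the MiddleManager CPU cost comes from.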

Multi-tenancy and the lookup pattern

A Druid cluster typically serves multiple datasources for different teams. Cross-team isolation is via Coordinator tier rules (one team's hot tier is another team's archive) and broker query priorities. A particular Druid pattern that does not exist in ClickHouse or Pinot is the lookup: a small reference table (city_id → city_name, advertiser_id → advertiser_name) loaded into every Historical's memory and joined at query time without a network hop. Lookups are limited to a few million rows and refresh on a poll loop; they are how Druid handles dimensional joins without an actual JOIN engine. For unbounded joins (fact-to-fact), Druid is not the right tool — Trino on the same Iceberg base is.
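A lookup is, operationally, a per-node dict applied while materialising results. A sketch with invented names (the table contents and helper are hypothetical, not a Druid API):

```python
# The lookup pattern as an in-memory dict on every Historical: a small
# reference table applied per result row, no network join. All names
# here are invented for illustration.

ADVERTISER_LOOKUP = {            # refreshed on a poll loop in real Druid
    "flipkart-bbd-2026": "Flipkart Big Billion Days",
    "zepto-blr-apr": "Zepto Bengaluru April",
}

def decorate(rows, lookup, key_col, out_col):
    """Apply a lookup to query rows; unknown keys pass through unchanged."""
    return [{**r, out_col: lookup.get(r[key_col], r[key_col])} for r in rows]

rows = [{"advertiser_id": "flipkart-bbd-2026", "imps": 18429103},
        {"advertiser_id": "unknown-adv", "imps": 42}]
print(decorate(rows, ADVERTISER_LOOKUP, "advertiser_id", "advertiser_name"))
```

Because the dict lives on every Historical, the "join" costs one hash probe per row; that is why lookups are capped at a few million entries rather than allowed to grow into real fact tables.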

Druid's history and what changed

The original Druid paper (published at SIGMOD 2014, describing the system built from 2011) included a standalone real-time node type that was eventually split into MiddleManager + Peon. The 0.10 release in 2017 added the SQL layer (before that, queries were a custom JSON DSL). Druid 24 (2022) added the multi-stage query engine (MSQ), which lets Druid run shuffle-based queries — basically Spark-on-Druid — for batch workloads beyond OLAP. Druid 26 (2023) added automatic schema discovery for loosely structured ingestion, and sketch-based approximate distinct counts are the SQL default. The architectural core has barely changed; the surface is now much friendlier.

Where this leads next

The thread running through ClickHouse, Pinot, and Druid is that the hard problem of real-time analytics is not raw scan throughput — it is deciding, at ingest time, what answer to make readily available. ClickHouse keeps the rows and makes the scan very fast. Pinot pre-aggregates declared dimension paths. Druid bitmap-indexes every dimension and rolls up at ingest. Three corners of the same triangle, and the right answer depends on whether your dashboard's queries are knowable in advance.

References