In short

A search index that fits in RAM on one machine is bliss. A search index for a hundred million Flipkart products, ten years of Slack messages, or every log line your fleet emitted last week does not fit, and the moment one node is no longer enough, every assumption from the previous six chapters has to be re-checked under a new constraint: the postings lists for samsung and phone may now live on different machines. The standard answer, used by Elasticsearch, OpenSearch, Solr Cloud, Quickwit, and Vespa, is sharding plus scatter-gather plus a reducer. The index is split into N primary shards at creation time; each primary has M replicas for read throughput and failover; documents are routed by hash(routing_key) % N. A query arriving at any node makes that node the coordinator: it fans the query out to one copy of each shard, each shard runs the query locally against its own inverted index and returns its top-K hits with their BM25 scores, and the coordinator reduces the union of N · K candidates by re-sorting on score and keeping the global top-K. A second round trip then fetches the full document bodies for just those final winners. This pattern is fast, embarrassingly parallel, and almost trivially correct, until you ask for page 1001 (from=10000, size=10), at which point the reducer has to sort N × 10010 hits to return ten of them. This chapter builds the scatter-gather pattern from first principles, walks through the two-phase query-then-fetch protocol, traces a real query across four shards of an Indian e-commerce index, and explains the two famous distributed-search hazards: deep paging (and its fix, search_after) and cross-shard IDF (and its fix, dfs_query_then_fetch).

By chapter 143 you had a working inverted index. By chapter 145 the postings were compressed. By chapter 146 BM25 ranked them. By chapter 148 the segments rotated cleanly on disk. The whole search engine sat on one box. Then someone in product told you the catalogue is going from ten million SKUs to a hundred million, and your one box — even a beefy one with a terabyte of RAM — is not going to hold the inverted index, the doc store, the field caches, the JVM heap, and serve traffic. The index has to be sharded.

Sharding a search index is not the same problem as sharding an OLTP key-value store. In a KV store you route a key to a shard and the entire answer lives there. In a search index a single query — samsung phone under 20000 — needs to consult every shard, because any shard might contain a document that ends up in the top-10. There is no routing key for "the best matches across the whole corpus". So sharded search invents a different pattern: scatter the query to all shards in parallel, gather their per-shard top-K hits, and reduce the union into a global top-K. The pattern is old (it dates to the original Inktomi and AltaVista architectures of the late 1990s) and is now the lingua franca of every horizontally scalable search engine you will deploy.

The sharding model

An Elasticsearch (or OpenSearch) index is a logical container of documents. Behind it sits a fixed number of primary shards decided at index creation time, each of which is a self-contained Lucene index with its own inverted index, its own doc store, its own segment files, and its own merges. Each primary shard has zero or more replica shards, which hold the same documents as the primary and are used for read load-balancing and failover. What you actually deploy is the product: N · (1 + M) shard copies for an index with N primaries and M replicas each.

[Figure: Sharded index layout. Index "products": 4 primary shards × (1 primary + 2 replicas) = 12 shard copies on three nodes. node-A holds P0, R1·a, R2·a, R3·b; node-B holds P1, R0·a, P3, R2·b; node-C holds P2, R0·b, R1·b, R3·a. No primary shares a node with its replica, so the index survives any single-node failure.]
An index of 4 primary shards with 2 replicas each lays down 12 shard copies on the cluster. Elasticsearch's allocator guarantees that a primary and any of its replicas live on different nodes, so losing any single node still leaves at least one copy of every shard reachable.

A document arriving at the cluster is routed to its primary shard by a deterministic hash:

shard_id = hash(routing_key) mod N

The default routing_key is the document's _id, but you can override it (Elasticsearch lets you say ?routing=customer_42 so all of a customer's documents land on the same shard, which makes filter-by-customer queries hit only one shard instead of all of them). Why hash and not range: range routing creates hot spots on whichever range happens to be popular today; hashing spreads write load uniformly across shards by construction.
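The routing rule can be sketched in a few lines. This is an illustration under stated assumptions: SHA-256 stands in for the hash function (it is not the hash Elasticsearch uses internally), and shard_for and route are hypothetical helper names.

```python
import hashlib
from typing import Optional


def shard_for(routing_key: str, num_primaries: int) -> int:
    """Map a routing key to a primary shard id in [0, num_primaries)."""
    # SHA-256 is a stand-in chosen because it is deterministic across runs;
    # it is NOT the hash Elasticsearch uses internally.
    digest = hashlib.sha256(routing_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_primaries


def route(doc_id: str, num_primaries: int, routing: Optional[str] = None) -> int:
    """Use the explicit routing value when given, otherwise the document id."""
    return shard_for(doc_id if routing is None else routing, num_primaries)


# Two documents with different ids but the same routing value share a shard.
same_shard = (
    route("order-1", 4, routing="customer_42")
    == route("order-2", 4, routing="customer_42")
)

# Default routing (by id) spreads documents across all four shards.
counts = [0, 0, 0, 0]
for i in range(10_000):
    counts[route(f"sku-{i}", 4)] += 1
print(same_shard, counts)
```

With a well-mixed hash, each of the four counters should land near 2,500, which is the "uniform by construction" property the paragraph above relies on.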

The number of primary shards is fixed at index creation. You cannot reshard an Elasticsearch index in place: to change N you reindex into a new index with the new shard count, then atomically swap an alias from the old to the new. Why fixed: the routing function hash(id) % N would map most existing documents to a different shard if N changed, so nearly every document would have to move. Elasticsearch's _split and _shrink APIs avoid a full reindex when the new shard count is a multiple (split) or a factor (shrink) of the old one, but they require the source index to be write-blocked first, and they produce a new index rather than resharding the old one.
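To see how disruptive a change of N is under modulo routing, the sketch below counts how many keys change shard when N goes from 4 to 5 and from 4 to 8. The hash (SHA-256) and the key names are assumptions for illustration only; the code shows the arithmetic of modulo routing, not how Elasticsearch implements splitting.

```python
import hashlib


def shard_for(key: str, n: int) -> int:
    # Stand-in hash (an assumption); any well-mixed hash shows the same effect.
    h = int.from_bytes(hashlib.sha256(key.encode("utf-8")).digest()[:8], "big")
    return h % n


def fraction_moved(keys, old_n: int, new_n: int) -> float:
    """Share of keys whose shard id differs between old_n and new_n shards."""
    moved = sum(1 for k in keys if shard_for(k, old_n) != shard_for(k, new_n))
    return moved / len(keys)


keys = [f"sku-{i}" for i in range(20_000)]

moved_4_to_5 = fraction_moved(keys, 4, 5)   # expected near 0.8
moved_4_to_8 = fraction_moved(keys, 4, 8)   # expected near 0.5

# When N doubles, a key on old shard s can only end up on shard s or s + 4,
# so each old shard's documents split between exactly two new shards.
doubling_is_local = all(shard_for(k, 8) % 4 == shard_for(k, 4) for k in keys)
print(moved_4_to_5, moved_4_to_8, doubling_is_local)
```

Going from 4 to 5 shards relocates roughly four documents in five, with destinations scattered over every shard. Going from 4 to 8 moves about half, and each old shard only ever feeds two new ones, which is why multiples of the original count are the friendlier case.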

Replicas, by contrast, are dynamic. You can run PUT /products/_settings {"number_of_replicas": 3} on a live index and the cluster will copy each primary to three replicas in the background. Replicas serve reads (the coordinator load-balances queries across primary + replicas), so adding replicas is the standard way to scale out read throughput once the shard count is fixed.

The query lifecycle: scatter, gather, reduce, fetch

A search query against a sharded index runs in two phases. This is the central protocol of distributed search and worth slowing down for.

[Figure: Scatter-gather query flow. Client → coordinator → shards S0 to S3 in parallel; each shard returns its top-K ids and scores; the reducer on the coordinator merges the N × K candidates, re-sorts on score, and picks the global top-K at cost O(N · K · log(N · K)). Phase 1 (query) carries only K × (id, score) per shard; phase 2 (fetch) retrieves the full _source for the K winners from their owning shards.]
The scatter-gather pattern. The coordinator fans the query out to one replica of each shard in parallel, gathers their per-shard top-K results, reduces the N · K candidates into a global top-K, and returns the IDs to the client. A second round trip fetches the full document bodies for just those K winners.

Phase 1 — query. The client sends GET /products/_search with a query body to any node in the cluster. That node becomes the coordinator for this request. It looks up the shard table, picks one copy (primary or replica) of each of the N shards (round-robin or by Adaptive Replica Selection, which prefers copies with lower observed latency), and ships the query to all of them in parallel. Each shard runs the query against its own inverted index (the postings lists, the BM25 scorer, the filter cache, all of it) and returns the top-K matching document IDs along with their BM25 scores. Why only IDs and scores, not the full documents: the coordinator receives N · K candidates in total, N times more than the client asked for, and document bodies are big (a JSON product description can be a kilobyte). Sending only IDs and scores keeps the per-shard payload tiny.

The reducer. When the coordinator has all N per-shard results, it has N · K candidate hits in front of it. It puts them in a heap keyed by score and pops the top K. That is the reducer. The work is at most O(N · K · log(N · K)); for typical N = 10 and K = 10 that is sorting a hundred items, which takes microseconds.

Phase 2 — fetch. The coordinator now knows the global top-K document IDs and which shard each lives on. It sends a fetch request to each owning shard for the full document bodies (or _source, or whatever fields the user asked for). The shards return the documents, the coordinator assembles the final result page, and ships it to the client.

Two phases is one round trip more than you might expect. The point of the split is bandwidth: the phase 1 responses are tiny because only IDs and scores cross the wire, and phase 2 fetches only the bodies of the final winners, never any of the per-shard losers. If you ran this as a single phase that returned full documents from every shard, you would transfer N · K documents only to throw away N · K - K of them.
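A back-of-the-envelope calculation makes the bandwidth argument concrete. The 1 KB document size comes from the text above; the 16 bytes per (id, score) pair is an assumed figure for illustration, and phase_bytes is a hypothetical helper.

```python
def phase_bytes(num_shards: int, k: int, doc_bytes: int, hit_bytes: int):
    """Bytes arriving at the coordinator for two-phase vs single-phase search."""
    # Two-phase: every shard ships K small (id, score) pairs,
    # then only the K global winners ship their full bodies.
    two_phase = num_shards * k * hit_bytes + k * doc_bytes
    # Single-phase: every shard ships K full bodies up front.
    one_phase = num_shards * k * doc_bytes
    return two_phase, one_phase


# N = 4 shards, K = 10, 1 KB documents, 16-byte hits (assumed size).
two_phase, one_phase = phase_bytes(num_shards=4, k=10, doc_bytes=1024, hit_bytes=16)
wasted_docs = 4 * 10 - 10   # N*K - K bodies transferred and then discarded

print(two_phase, one_phase, wasted_docs)   # 10880 40960 30
```

Under these assumptions the two-phase protocol moves about a quarter of the bytes, and the gap widens with more shards because the single-phase cost scales with N while the body-fetch cost does not.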

A skeleton of the coordinator, in pseudo-Python:

import heapq

# cluster_state, pick_replica, send_query, group_by, parallel_fetch and assemble
# are placeholders for cluster plumbing; they are not defined here.
def search(index: str, query: dict, size: int = 10) -> list[dict]:
    shards = cluster_state.shards_for(index)            # [shard_0, ..., shard_{N-1}]
    # Phase 1: scatter. Each send_query returns a future immediately,
    # so all N shard queries are in flight at the same time.
    futures = [
        send_query(pick_replica(s), query, size=size)   # one copy per shard
        for s in shards
    ]
    per_shard = [f.result() for f in futures]           # gather: wait for every shard

    # Reduce: union, re-sort by BM25 score, take top-K
    all_hits = [hit for hits in per_shard for hit in hits]
    top = heapq.nlargest(size, all_hits, key=lambda h: h.score)

    # Phase 2: fetch full bodies for the winners only
    by_shard = group_by(top, lambda h: h.shard_id)
    docs = parallel_fetch(by_shard)
    return assemble(top, docs)

The coordinator is stateless across requests — any node can play the role for any query — which is why an Elasticsearch cluster has no single point of failure for search traffic.
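The skeleton leaves its helpers undefined. Below is a self-contained toy version that runs as written, as a sketch rather than a faithful model: ToyShard is a hypothetical in-memory stand-in for a shard, scores are precomputed numbers standing in for BM25, and threads stand in for network calls.

```python
import heapq
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import NamedTuple


class Hit(NamedTuple):
    score: float
    doc_id: str
    shard_id: int


class ToyShard:
    """In-memory stand-in for one shard (hypothetical, for illustration)."""

    def __init__(self, shard_id, docs):
        self.shard_id = shard_id
        self.docs = docs                      # {doc_id: (score, body)}

    def query(self, size):
        # Phase 1 on the shard: local top-`size`, ids and scores only.
        hits = [Hit(score, doc_id, self.shard_id)
                for doc_id, (score, _body) in self.docs.items()]
        return heapq.nlargest(size, hits)     # tuples order by score, then doc_id

    def fetch(self, doc_ids):
        # Phase 2 on the shard: bodies for the requested ids.
        return {doc_id: self.docs[doc_id][1] for doc_id in doc_ids}


def search(shards, size=10):
    by_id = {s.shard_id: s for s in shards}
    with ThreadPoolExecutor(max_workers=max(1, len(shards))) as pool:
        # Scatter + gather: one query per shard, run concurrently.
        per_shard = list(pool.map(lambda s: s.query(size), shards))
        # Reduce: merge N * size candidates, keep the global top-`size`.
        top = heapq.nlargest(size, (h for hits in per_shard for h in hits))
        # Fetch: ask each owning shard only for its winners.
        wanted = defaultdict(list)
        for h in top:
            wanted[h.shard_id].append(h.doc_id)
        parts = list(pool.map(lambda kv: by_id[kv[0]].fetch(kv[1]), wanted.items()))
    bodies = {doc_id: body for part in parts for doc_id, body in part.items()}
    return [(h.doc_id, h.score, bodies[h.doc_id]) for h in top]


shards = [
    ToyShard(s, {f"p{s}-{i}": (float((s * 7 + i * 13) % 50), f"body of p{s}-{i}")
                 for i in range(25)})
    for s in range(4)
]
results = search(shards, size=10)
```

The reducer only ever sees 40 candidates here, yet the result matches a brute-force sort over all 100 documents, because any document in the global top 10 must also be in its own shard's top 10.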

A "samsung phone" query on 100 M Indian e-commerce products

A Bangalore-based marketplace runs an index called products with 4 primary shards (call them P0..P3), each with 2 replicas, totalling 12 shard copies spread across 3 nodes (node-A, node-B, node-C). The corpus is 100 million product listings, about 25 M per shard, with shards balanced because routing is hash(product_id) % 4. A buyer in Pune types samsung phone into the search box. Their browser hits the API gateway, which forwards the query to node-B. node-B becomes the coordinator for this request: in that role it does not evaluate the query itself, it dispatches the query to one copy of each shard, some of which may happen to be local.

node-B looks at the cluster state, sees that the four primary shards have replicas at {P0 → node-A, R0·b → node-C}, {P1 → node-B, R1·a → node-A}, {P2 → node-C, R2·a → node-A}, {P3 → node-B, R3·a → node-C}, and asks the Adaptive Replica Selection algorithm to pick the lowest-latency replica for each shard. It picks P0@node-A, P1@node-B (local!), P2@node-C, P3@node-B (local!). Two of the four queries do not even leave the box — Elasticsearch's coordinator preferentially picks local replicas when latencies are comparable.

Each shard runs the BM25 query against its 25-million-document inverted index. Postings lists for samsung and phone are intersected, scored, and the top-10 hits are returned with (doc_id, _score). Per-shard latency: about 25 ms each, all running in parallel, so wall-clock for the scatter phase is dominated by the slowest shard — say 30 ms. The coordinator now has 40 candidate hits (4 shards × top-10) and reduces them to the global top-10 in 200 microseconds. Phase 1 total: ~30 ms.

Phase 2: the 10 global winners come from, say, {P0: 3, P1: 4, P2: 1, P3: 2} documents. The coordinator issues four parallel fetch requests for the corresponding _source documents. The shards read each document from their doc store (one disk seek per doc, often served from page cache for hot products) and return the JSON bodies. Network round-trip plus disk: ~20 ms.

Total user-perceived latency: 30 + 20 = 50 ms inside the cluster, plus ~50 ms of API gateway, TLS termination, and India-internal network → ~100 ms in the user's browser. The buyer sees a Samsung Galaxy M-series result list before they have finished thinking about which phone to tap.

If node-A were down at query time, the coordinator would route P0's query to R0·b@node-C instead, and the user would never know.

The deep paging problem

The reducer is cheap when the global top-K is small. It stops being cheap when the user asks for page 1001.

Pagination in Elasticsearch's classic API uses from and size: from=10000&size=10 asks for results 10001 through 10010. To return those ten documents, the coordinator must know the global ranking up to position 10010, which means each shard must return its local top 10010 hits, because any of those could end up in the global top 10010. The coordinator then merges N × 10010 hits into a sorted run of length ≥ 10010, slices off positions 10001 through 10010, and serves them.

[Figure: The deep paging problem. Top: from=10000, size=10 forces each of 4 shards to return its top 10010 hits, so the reducer sorts N × (from + size) = 4 × 10010 = 40,040 hits to deliver 10 documents. Bottom: with search_after=[last_score, last_id], each shard returns its top 10 past the cursor and the reducer sorts N × size = 4 × 10 = 40 hits, constant per page.]
Top: classic offset-based pagination forces every shard to return `from + size` hits, and the reducer sorts `N × (from + size)`. Cost grows linearly with the page number. Bottom: `search_after` passes a cursor (the last seen sort value) and each shard returns only `size` hits past that cursor. Cost is `O(N · size)` regardless of depth.

This is deep paging, and it is one of the few performance failure modes that distributed search inherits purely from being distributed. With N=10 shards and from=10000, the reducer is sorting 100,100 hits to deliver 10 documents — and worse, every shard is forced to keep 10010 hit objects in memory just to ship them to the coordinator. Elasticsearch defaults index.max_result_window to 10,000 specifically to refuse this query at the protocol boundary rather than let one bad pagination request cause an OutOfMemoryError on the coordinator.
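The arithmetic, and the guard that refuses the request, can be sketched as follows. Both function names are hypothetical; the check mirrors the rule that from + size may not exceed the result window, and it is not Elasticsearch's actual code.

```python
def hits_reduced(num_shards: int, from_: int, size: int) -> int:
    """Hits the coordinator must merge for one from/size page."""
    return num_shards * (from_ + size)


def check_result_window(from_: int, size: int, max_result_window: int = 10_000) -> None:
    """Reject a page whose last position lies beyond the result window."""
    if from_ + size > max_result_window:
        raise ValueError(
            f"from + size = {from_ + size} exceeds max_result_window = {max_result_window}"
        )


print(hits_reduced(4, 10_000, 10))    # 40040
print(hits_reduced(10, 10_000, 10))   # 100100

check_result_window(9_990, 10)        # last allowed page: positions 9991..10000
try:
    check_result_window(10_000, 10)   # page 1001 is refused
    rejected = False
except ValueError:
    rejected = True
print(rejected)                       # True
```

The work grows linearly with from even though the page size never changes, which is why the limit is placed on from + size rather than on size alone.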

The fix is search_after, a cursor-based pagination scheme. After the first page, the client gets back the sort values of the last document on that page (typically [score, _id]). For the next page it sends the same query with search_after=[last_score, last_id] and size=10. Each shard now runs the same query but returns only hits that sort strictly after the cursor: its local top-10 past the cursor, not its local top-10010 since the start. The coordinator merges N × 10 hits, picks the global top-10, and returns them along with a fresh cursor. Cost per page at the coordinator: O(N · K), independent of depth. The trade-off is that you lose random access (there is no "jump to page 47") and pagination must be sequential. For an infinite-scroll product feed (the dominant UX on Indian e-commerce apps anyway), that constraint is invisible to the user.
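A small simulation shows why the cursor keeps the reducer's input at N × size on every page. It is a sketch under assumptions: shards are plain Python lists of (score, doc_id) pairs, the function names are hypothetical, and each toy shard filters by scanning its list, whereas a real shard would use its index to skip ahead.

```python
import heapq


def sort_key(hit):
    score, doc_id = hit
    return (-score, doc_id)        # score descending, doc_id ascending as tiebreaker


def shard_page(shard_hits, size, after=None):
    """One shard's local top-`size` hits that sort strictly after the cursor."""
    eligible = (h for h in shard_hits
                if after is None or sort_key(h) > sort_key(after))
    return heapq.nsmallest(size, eligible, key=sort_key)


def search_after_page(shards, size, after=None):
    """Coordinator side: merge at most N * size candidates, return page + cursor."""
    candidates = [h for s in shards for h in shard_page(s, size, after)]
    page = heapq.nsmallest(size, candidates, key=sort_key)
    cursor = page[-1] if page else None    # sort values of the last hit on the page
    return page, cursor


# Four toy shards of 50 hits each, with made-up scores.
shards = [
    [(float((i * 37 + s * 11) % 101), f"p{s}-{i:03d}") for i in range(50)]
    for s in range(4)
]

first_page, cursor = search_after_page(shards, size=10)
second_page, cursor = search_after_page(shards, size=10, after=cursor)
```

Walking the cursor to the end yields exactly the globally sorted list, page by page, without any shard ever returning more than size hits per request. The doc_id tiebreaker matters: without a unique final sort value, documents with equal scores could be skipped or repeated across pages.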

search_after is what every modern search UI you build should use. Reserve from/size for shallow pagination (the first few pages) and reject anything past max_result_window.

The cross-shard scoring caveat

There is a second subtlety that surprises engineers the first time they look closely at sharded BM25 scores. BM25 has an IDF term, the inverse document frequency log((N - df + 0.5) / (df + 0.5)), and IDF depends on N (here the number of documents in the corpus, not the shard count) and df (the number of documents containing the term). On a single-machine index those are unambiguous. On a sharded index, by default, each shard computes IDF using its own local N and local df, because asking every shard to share statistics on every query would add a round trip.

For a uniformly distributed corpus this approximation is harmless. But suppose you have 4 shards and a brand-new term "oneplus" happens to land disproportionately on shard P3 because of how the routing key worked out — say P3 has df = 5000 and the others have df = 50. Then P3 computes a small IDF for oneplus (the term looks common in P3's local corpus) while P0–P2 compute a large IDF (the term looks rare in their local corpora). The same query term gets weighted differently on different shards, and a oneplus phone X document on P0 may rank above an objectively better oneplus phone Y on P3 just because of where the documents happened to land.
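Plugging numbers into the IDF formula from the text shows the size of the distortion. The df values are the ones from the example; the shard size of 1,000,000 documents is an assumption made only to get concrete numbers.

```python
import math


def idf(num_docs: int, df: int) -> float:
    """IDF as written in the text: log((N - df + 0.5) / (df + 0.5))."""
    return math.log((num_docs - df + 0.5) / (df + 0.5))


DOCS_PER_SHARD = 1_000_000                        # assumed shard size
local_df = {"P0": 50, "P1": 50, "P2": 50, "P3": 5000}

local_idf = {shard: idf(DOCS_PER_SHARD, df) for shard, df in local_df.items()}
global_idf = idf(DOCS_PER_SHARD * len(local_df), sum(local_df.values()))

for shard, value in local_idf.items():
    print(shard, round(value, 2))
print("global", round(global_idf, 2))
```

Under these assumptions P3 weights the term at roughly 5.3 while P0 to P2 weight it at roughly 9.9, against a global value of about 6.65. A document on P0 therefore gets nearly twice the term weight of an identical document on P3, before term frequency and length normalisation are even considered.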

The fix is search_type=dfs_query_then_fetch. With this mode the coordinator runs an extra preliminary round trip that asks every shard for the local document frequencies of every query term, sums them into global document frequencies, and ships those globals back out with the actual query. Every shard now scores using the same global IDF, and scores become directly comparable. The cost is one extra round trip per query — typically a 10–20 % latency hit — so most production deployments leave it off and accept that scoring is approximate, on the theory that with hundreds of thousands of documents per shard the local statistics are close enough to the global ones that ranking errors are vanishingly rare.
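The extra round trip amounts to "collect per-shard counts, add them up, send the totals back". The sketch below shows only that bookkeeping; the function names and the tokenised toy documents are assumptions, and the real wire format is not modelled.

```python
from collections import Counter


def local_stats(shard_docs, terms):
    """What one shard reports in the pre-query round: doc count and per-term df."""
    df = Counter()
    for tokens in shard_docs:
        present = set(tokens)
        for term in terms:
            if term in present:
                df[term] += 1
    return len(shard_docs), df


def merge_stats(per_shard):
    """Coordinator side: sum the local counts into global statistics."""
    total_docs = sum(num_docs for num_docs, _df in per_shard)
    total_df = Counter()
    for _num_docs, df in per_shard:
        total_df.update(df)
    return total_docs, total_df


# Two toy shards; each document is a list of tokens.
shard_a = [["oneplus", "phone"], ["samsung", "phone"]]
shard_b = [["oneplus", "case"], ["oneplus", "phone"], ["nokia"]]
terms = ["oneplus", "phone"]

reports = [local_stats(shard_a, terms), local_stats(shard_b, terms)]
global_docs, global_df = merge_stats(reports)
print(global_docs, dict(global_df))   # 5 {'oneplus': 3, 'phone': 3}
```

Every shard would then score with global_docs and global_df instead of its own counts, so the same term carries the same weight everywhere.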

You should switch to dfs_query_then_fetch when (a) shards are small (< 100 K documents) so local statistics are noisy, (b) the corpus is heavily skewed (some shards much larger than others), or (c) you are running a measurable A/B test where the relevance metric matters more than 20 ms of latency.

Replicas and the read path

Replicas exist to keep reads available under failure, and in the common case the same copies also add read throughput. The coordinator's choice of which copy to send each shard's query to is purely local: it consults a per-node observed-latency table (Elasticsearch calls this Adaptive Replica Selection, or ARS) and picks the copy that has been responding fastest. Why this matters: a hot node (say one running an expensive merge or with a saturated disk queue) will start to respond more slowly, ARS will route around it within a handful of queries, and the slow node will recover as its load drops. The system self-balances without any central scheduler.
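The feedback loop can be illustrated with a deliberately simplified selector. This is not the formula Elasticsearch uses: the exponentially weighted moving average, the alpha value, and the ReplicaSelector class are all assumptions chosen to show the routing-around behaviour in a few lines.

```python
class ReplicaSelector:
    """Toy latency-aware selector: pick the node with the lowest smoothed latency."""

    def __init__(self, alpha: float = 0.3):
        self.alpha = alpha           # weight of the newest observation (assumed value)
        self.ewma_ms = {}            # node name -> smoothed latency in milliseconds

    def record(self, node: str, latency_ms: float) -> None:
        prev = self.ewma_ms.get(node)
        if prev is None:
            self.ewma_ms[node] = latency_ms
        else:
            self.ewma_ms[node] = self.alpha * latency_ms + (1 - self.alpha) * prev

    def pick(self, candidates):
        # Nodes with no observations score 0.0 so they get tried at least once.
        return min(candidates, key=lambda node: self.ewma_ms.get(node, 0.0))


selector = ReplicaSelector()
selector.record("node-A", 10.0)
selector.record("node-C", 12.0)
before = selector.pick(["node-A", "node-C"])    # node-A is faster so far

selector.record("node-A", 300.0)                # node-A hits a slow merge
after = selector.pick(["node-A", "node-C"])     # traffic shifts to node-C
print(before, after)                            # node-A node-C
```

A single slow response lifts node-A's smoothed latency from 10 ms to 97 ms, which is enough to divert the next query. Each coordinator keeps its own table, so no cluster-wide agreement is needed.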

If the chosen replica fails to respond, the coordinator retries on another replica of the same shard — the shard query is idempotent (it's a read), so retries are safe. If all replicas of a shard are unreachable, the coordinator returns the partial results it has from the surviving shards along with a _shards.failed count, so the client at least sees something rather than a 5xx. This is configurable: allow_partial_search_results=false will instead fail the whole query.
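The retry and partial-result behaviour can be sketched like this. The function names and the shape of the returned dictionary are illustrative assumptions that loosely echo the _shards.failed counter; shards are queried one after another here purely to keep the example short.

```python
def query_shard_with_retry(copies, run):
    """Try each copy of one shard in turn. Returns (hits, succeeded)."""
    for copy in copies:
        try:
            return run(copy), True     # a read is idempotent, so retrying is safe
        except ConnectionError:
            continue                   # fall through to the next copy
    return [], False


def scatter(shard_copies, run, allow_partial=True):
    """shard_copies maps shard id -> list of nodes holding a copy of that shard."""
    hits, failed = [], 0
    for _shard_id, copies in shard_copies.items():
        shard_hits, ok = query_shard_with_retry(copies, run)
        if ok:
            hits.extend(shard_hits)
        else:
            failed += 1
    if failed and not allow_partial:
        raise RuntimeError(f"{failed} shard(s) failed and partial results are disallowed")
    return {"hits": hits, "_shards": {"total": len(shard_copies), "failed": failed}}


def run_on(node):
    """Hypothetical shard call: node-A is down, every other node answers."""
    if node == "node-A":
        raise ConnectionError("node-A unreachable")
    return [f"hit@{node}"]


# Shard 0 has a surviving copy on node-C; shard 1 lives only on node-A.
layout = {0: ["node-A", "node-C"], 1: ["node-A"]}
partial = scatter(layout, run_on)
print(partial)
```

Shard 0 is served transparently from node-C, while shard 1 is counted as failed and the response still carries the hits that were found. Passing allow_partial=False turns the same situation into an error for the whole query.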

Writes route to the primary, which forwards each operation to its in-sync replicas in parallel and acknowledges once they have responded. Before the write starts, wait_for_active_shards sets how many shard copies must be active; it defaults to 1, meaning only the primary. Writes therefore cost replication latency, but reads cost only one shard hop.

Going deeper

The protocols below are what production search teams actually tune: cross-cluster search (federating queries across multiple Elasticsearch clusters), the can-match phase that prunes shards before scatter, segment-level early termination for sorted indexes, and Quickwit's split-based architecture that decouples storage from compute and lets the coordinator skip whole splits.

The can-match phase

Before scattering the actual query, the coordinator runs a lightweight can_match pre-filter against each shard. The shard inspects its segment-level metadata (min/max values for date fields, ranges for numeric fields, term dictionaries for cardinality estimation) and replies with a yes/no. If a shard cannot possibly contain a match (say the query has a filter timestamp >= 2026-04-01 and the shard's max timestamp is 2026-03-15), the coordinator skips it entirely. For time-sliced indexes (one shard per day) this often eliminates well over 90 % of shards from a given query, turning a 365-shard scatter into a 7-shard scatter for "last week".
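The range check at the heart of this pre-filter is a two-line comparison. The sketch below assumes each shard exposes only a min and max timestamp; ShardRange, can_match and prune are hypothetical names, and the real phase inspects more kinds of metadata than this.

```python
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Optional


@dataclass(frozen=True)
class ShardRange:
    shard_id: int
    min_ts: date
    max_ts: date


def can_match(shard: ShardRange, gte: Optional[date] = None,
              lte: Optional[date] = None) -> bool:
    """False only when the shard's range cannot overlap the filter range."""
    if gte is not None and shard.max_ts < gte:
        return False               # everything in the shard is too old
    if lte is not None and shard.min_ts > lte:
        return False               # everything in the shard is too new
    return True


def prune(shards, gte=None, lte=None):
    return [s for s in shards if can_match(s, gte, lte)]


# The example from the text: filter >= 2026-04-01, shard max is 2026-03-15.
march = ShardRange(0, date(2026, 3, 1), date(2026, 3, 15))
skipped = not can_match(march, gte=date(2026, 4, 1))

# One shard per day for 2026; a query for the last 7 days of the year.
daily = [
    ShardRange(i, date(2026, 1, 1) + timedelta(days=i), date(2026, 1, 1) + timedelta(days=i))
    for i in range(365)
]
last_week = prune(daily, gte=date(2026, 12, 25))
print(skipped, len(last_week))     # True 7
```

The check can produce false positives (a shard whose range overlaps the filter may still hold no matching document) but never false negatives, which is what makes it safe to run before the real query.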

Cross-cluster search

For very large deployments (geographic or organisational), Elasticsearch supports cross-cluster search: a coordinator in cluster A holds a "remote cluster" connection to clusters B, C, D, and treats each cluster's shards as if they were local for query purposes. The fan-out is one extra hop deep but the protocol is otherwise identical. Indian companies running region-pinned clusters (one in Mumbai for compliance, one in Singapore for latency to SE Asia) federate this way.

Segment-level early termination

If you sort by a field that segments are physically sorted on (e.g., timestamp for time-series logs), each segment can stop scanning as soon as it has enough hits to fill the requested top-K. Combined with index.sort.field at index creation time, this turns "give me the latest 50 errors" from a full segment scan into a tiny prefix read, and is the reason log-search systems like Elasticsearch + ECS can serve "tail -f" style queries over petabyte indexes in milliseconds.
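The saving is easy to see on a toy segment. The sketch assumes the segment is a Python list already ordered newest-first (standing in for an index-sorted segment), and the function name and document shape are hypothetical.

```python
def top_k_from_sorted_segment(segment, predicate, k):
    """Collect the first k matching docs from a pre-sorted segment, then stop."""
    hits, scanned = [], 0
    for doc in segment:            # segment is already in the requested sort order
        scanned += 1
        if predicate(doc):
            hits.append(doc)
            if len(hits) == k:
                break              # early termination: later docs cannot rank higher
    return hits, scanned


# 1,000 log lines, newest first; every second line is an error (made-up data).
segment = [
    {"ts": ts, "level": "error" if ts % 2 == 0 else "info"}
    for ts in range(1000, 0, -1)
]

latest_errors, scanned = top_k_from_sorted_segment(
    segment, lambda doc: doc["level"] == "error", k=50
)
print(len(latest_errors), scanned)   # 50 99
```

Fifty hits are found after reading 99 of the 1,000 documents. Without the physical sort order, every document would have to be examined, because the newest error could sit anywhere in the segment.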

Decoupled storage: Quickwit and the lake-house pattern

Quickwit is a Rust-based search engine that takes the sharding model and adds one twist: shards (it calls them splits) are immutable files stored on object storage (S3, GCS), not on the search node's local disk. The coordinator's query plan picks which splits to read based on metadata, lazily streams just the needed bytes from S3 through a cache, and runs the BM25 search on the fly. This decouples storage cost (cheap, S3) from compute cost (only when querying), and is the model behind most modern log-search-on-S3 systems. The scatter-gather + reducer pattern is unchanged; what changes is where the shard's data physically lives and how it gets paged in.

Wrapping up

Distributed search is a beautiful application of the master pattern of distributed systems: parallelise the work, centralise the merge. The inverted index from chapter 143 and the BM25 scorer from chapter 146 work locally, unchanged, on each shard. Sharding adds a routing function on writes and a coordinator on reads. The coordinator's job is small — fan out, gather, heap-sort to top-K, fetch — but the two failure modes you have to design around (deep paging and cross-shard IDF) are unavoidable consequences of the architecture, and every production search system ships explicit knobs for both.

The next chapter (150) layers a second retrieval mode on top of this: dense vector search for semantic recall, fused with the BM25 lexical recall you already have, plus structured filters like price and category. The scatter-gather machinery you built here is what runs all three in parallel under one query.
