In short

SQL's JOIN combines rows from two tables on a shared key. In a single-node relational database that operation is cheap — the planner picks a hash join, a sort-merge join, or a nested-loop join and every row of both inputs is reachable in microseconds. In a distributed wide-column database the two sides of the join almost always live on different nodes, chosen by independent partition-key hashes; performing the join would require either shipping one whole table through the network to the other ("broadcast") or repartitioning both on the join column ("shuffle"). Both are catastrophic at petabyte scale, and both break the shared-nothing invariant that makes the cluster scale in the first place. Cassandra rejects the operation outright at the CQL layer; DynamoDB exposes only lookups and scans, never joins; Bigtable has no JOIN syntax at all.

Four patterns take its place. Denormalisation copies the fields you need into the table you query — the user's display name lives inside every tweet row, fan-out writes keep it current. Materialised views ask the engine to maintain a second table automatically from a primary; Cassandra's implementation is famously fragile, CockroachDB and DynamoDB's secondary-index variants are better. Application-side joins issue two independent queries from your service and merge the results in memory — the workhorse of production code, simple and schema-agnostic. External compute (Spark, Flink) does the actual join offline for analytics over the whole dataset, with minute-to-hour latency. Each trades storage, staleness, latency, and complexity against the others. Knowing which one applies to a given access pattern is the substance of Chapter 89.

A single SQL statement captures the problem cleanly. In a relational database you write:

SELECT t.text, u.name
FROM tweets t
JOIN users u ON t.user_id = u.id
WHERE t.id = 42;

Postgres plans one query, touches two index entries, stitches two rows into one result. Ten lines of client code, one round trip, no architectural consequences. Now run the same request against a three-node Cassandra cluster. The tweets table is partitioned by tweet_id; row 42 lives on some replica set. The users table is partitioned by user_id; the author of tweet 42 lives on a different replica set. The join requires rows from two partitions on two sets of nodes. CQL does not offer the syntax. The application has to choose a pattern — and the choice has first-class schema consequences that persist for the life of the system. That choice is what Build 11 is teaching you to make.

Why distributed joins are hard

Consider two tables A(a_key, ...) and B(b_key, ...) that you want to join on a_key = b_key. On a single machine the planner has a handful of algorithms, all of them fast because both inputs are in the same address space. Hash join reads B into a hash table keyed by b_key, then streams A and probes the hash for each row — O(|A| + |B|) time, one full pass per side. Sort-merge join sorts both inputs on the join key and walks them in lockstep. Nested-loop join, the oldest idea, scans A and for each row looks up B through an index. All three work because both A and B live on the same node.
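
A hash join is short enough to sketch in full. The toy version below (plain Python, rows as dicts, column names assumed for illustration) builds the hash table from B and probes it with A, one pass per input:

```python
def hash_join(a_rows, b_rows, key):
    """Toy single-node hash join: O(|A| + |B|), one pass per input."""
    # Build phase: index B by the join key (handles duplicate keys).
    index = {}
    for b in b_rows:
        index.setdefault(b[key], []).append(b)
    # Probe phase: stream A once, emitting a merged row per match.
    return [{**a, **b} for a in a_rows for b in index.get(a[key], [])]

tweets = [{"tweet_id": 42, "user_id": 7, "text": "hi"}]
users = [{"user_id": 7, "name": "Priya"}]
print(hash_join(tweets, users, "user_id"))
# → [{'tweet_id': 42, 'user_id': 7, 'text': 'hi', 'name': 'Priya'}]
```

The build phase is exactly what becomes impossible in a cluster: it assumes all of B fits in one address space.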

In a shared-nothing cluster A and B are partitioned independently. Row A[i] lives on the node whose hash of A[i].a_key says so; row B[j] lives on a totally different node, chosen by the hash of B[j].b_key. The join algorithm needs access to both. There are only two ways to get it, and both are brutal.

Broadcast the smaller table. Ship every row of B to every node, so each node holds all of B in memory. Then each node can do a local hash join with its local slice of A. Cost: |B| bytes over the network, multiplied by the number of nodes. Feasible when B is a small dimension table (a few million rows) and the cluster has fast networking; impossible when B itself is hundreds of gigabytes.

Shuffle both on the join key. Recompute a new partitioning where A and B are co-located — every row of A with a_key = k and every row of B with b_key = k end up on the same node. Then each node does a local join on its slice. Cost: |A| + |B| bytes through the network, rewriting both tables in the process. Feasible for batch analytics that run for minutes; catastrophic for an OLTP query that is supposed to return in single-digit milliseconds.
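
The trade between the two options reduces to simple arithmetic. A hedged sketch (byte counts and node counts are the only inputs here; a real planner also weighs memory limits and key skew):

```python
GB = 10**9

def broadcast_cost(b_bytes, nodes):
    # Every node receives a full copy of the smaller table B.
    return b_bytes * nodes

def shuffle_cost(a_bytes, b_bytes):
    # Both tables cross the network exactly once while being repartitioned.
    return a_bytes + b_bytes

# A 500 MB dimension table broadcast to 100 nodes: 50 GB of traffic.
print(broadcast_cost(500 * 10**6, 100) / GB)   # → 50.0
# Shuffling it with a 1 TB fact table instead: dominated by |A|.
print(shuffle_cost(10**12, 500 * 10**6) / GB)  # → 1000.5
```

Broadcast wins while B stays small; grow B to 100 GB and the broadcast balloons to 10 TB while the shuffle stays near 1.1 TB.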

Why the shared-nothing architecture makes this worse, not better: the whole point of Cassandra, DynamoDB, and Bigtable is that nodes do not share memory or disks. Each node owns its partitions outright. There is deliberately no coordinator that can see both A and B at once — if such a coordinator existed it would be the scaling bottleneck the architecture was designed to avoid. Shared-nothing buys linear scale for per-partition operations and pays for it by giving up cross-partition operations. Joins are the most visible casualty.

Relational databases like Postgres and Oracle have many clever tricks, but at cluster scale they hit the same wall. Spanner, F1, and CockroachDB implement distributed joins by performing shuffles and broadcasts at query time, and they are explicit that complex joins on large tables carry latency of seconds or tens of seconds. That is the actual cost of the abstraction. Cassandra declines to hide it — the CQL grammar has no JOIN keyword at all — on the theory that a database which silently turns a syntactic convenience into a 30-second cross-cluster operation is misleading its users.

[Figure: "Distributed join requires shuffle or broadcast". Three cluster nodes each hold independently partitioned slices of table A (on a_key) and table B (on b_key), with near-zero key overlap on any single node. Option 1, broadcast B: ship all of B to every node; network cost = |B| × nodes; works only if B is small. Option 2, shuffle both: repartition A and B on the join key; network cost = |A| + |B|; minutes to hours at TB scale.]
Why wide-column databases refuse to implement JOIN. Table A is partitioned on a_key and table B on b_key; the probability that a given A-row and its matching B-row happen to share a node is near zero. Either the engine ships one table across the network (broadcast) or re-partitions both on the join column (shuffle). At OLTP scale, both exceed the latency budget by orders of magnitude.

Pattern 1 — Denormalisation

The first and most common answer is the one relational textbooks spend a chapter warning you against: copy the columns you need into the table you are going to query. If the read path asks "give me the tweet with author's display name and avatar", put the author's display name and avatar inside the tweet row.

CREATE TABLE tweets (
    tweet_id     bigint PRIMARY KEY,
    text         text,
    user_id      bigint,
    user_name    text,
    user_avatar  text,
    created_at   timestamp
);

On the write path, when a user composes a tweet, your application fetches the author's profile and stuffs user_name and user_avatar into the new row. On the read path, the single-partition lookup returns everything the timeline needs in one round trip. No join, no second query, no fan-out at read time.

The cost comes due when the user changes their display name. Every tweet they ever wrote has a stale user_name column. The fix is a write fan-out: when Priya renames herself, the application reads the list of her tweet IDs and issues an UPDATE against each one. For a normal user with a few thousand tweets, that is a few thousand writes — seconds of work, amortised over a rare operation. For a celebrity with 200000 tweets, it is a background job that takes a minute and touches 200000 partitions. Production systems either accept the cost, rate-limit the rename, or run the update as a Spark job for users above a threshold.
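
The fan-out itself is mechanical. A sketch under one stated assumption: a tweet_ids_by_user index table mapping a user to their tweet IDs exists (that table is hypothetical, not part of the schema above):

```python
def fan_out_rename(session, user_id, new_name):
    """Propagate a display-name change into every denormalised tweet row."""
    # Hypothetical index table: (user_id) -> all tweet_ids by that user.
    rows = session.execute(
        "SELECT tweet_id FROM tweet_ids_by_user WHERE user_id = %s",
        [user_id],
    )
    count = 0
    for row in rows:
        # One single-partition UPDATE per tweet. Production code issues
        # these asynchronously and rate-limits the whole job.
        session.execute(
            "UPDATE tweets SET user_name = %s WHERE tweet_id = %s",
            [new_name, row.tweet_id],
        )
        count += 1
    return count
```

The per-tweet loop is why the cost scales with the author's output, not with the number of readers.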

Why this is not the sin relational normalisation theory makes it out to be: the "normalise until it hurts, denormalise until it works" rule was written for single-machine databases where disk was expensive and joins were cheap. Flip those economics — disk is cheap at cloud prices and joins are expensive at cluster scale — and denormalisation becomes the default, not the fallback. The wide-column schema is not a failed relational schema; it is a different schema optimised for a different cost model.

Denormalisation is the most performant per-query pattern because it turns every read into a single-partition lookup. It is also the most storage-heavy and the most write-fan-out heavy. Use it for access patterns that are read-dominant by at least an order of magnitude, where the joined fields rarely change, and where the hot path latency matters more than the background update cost. Tweets-with-author-name fits perfectly: users read each other's tweets orders of magnitude more often than anyone renames themselves.

Pattern 2 — Materialised views

A materialised view is a second table, derived from a primary table, that the database maintains automatically. You declare the view once; every write to the base table causes the engine to propagate the change to the view. The read path queries the view as if it were a normal table, and gets the denormalised data with no application-side fan-out logic.

Cassandra's CQL supports the syntax directly:

CREATE MATERIALIZED VIEW tweets_by_user AS
    SELECT tweet_id, user_id, created_at, text, user_name
    FROM tweets
    WHERE user_id IS NOT NULL AND tweet_id IS NOT NULL
    PRIMARY KEY (user_id, tweet_id);

The base table tweets is partitioned on tweet_id; the view tweets_by_user is partitioned on user_id. (Cassandra allows at most one non-primary-key column of the base table in the view's key — here user_id — so clustering the view by created_at would require created_at to be part of the base table's primary key.) Inserts and updates to the base cause the engine to perform the corresponding insert, update, or delete on the view behind the scenes. The application sees a second "table" it can query by user — the same pattern you would otherwise implement with a second CREATE TABLE and dual writes.

The win is real: the engine handles the fan-out logic, so the application cannot forget to update the second table or handle a partial-failure edge case incorrectly. The loss is also real: Cassandra's materialised-view implementation has a long Jira history of consistency bugs. When a replica for the base table fails mid-write, the view may diverge from the base in ways that are hard to detect and harder to repair. As of Cassandra 4.x the recommendation from many production operators is still "build the second table by hand". Community guidance marks materialised views as "experimental" and suggests avoiding them for production workloads where drift would be a correctness problem.

Other systems handle this better. CockroachDB's materialised views use its distributed-SQL transaction machinery and stay synchronously consistent with the base. TiDB offers similar guarantees. DynamoDB's Global Secondary Indexes (GSIs) and Local Secondary Indexes (LSIs) are functionally materialised views with specific restrictions — a GSI is a view with a different partition key, an LSI shares the partition key but uses a different sort key. Both are maintained by the DynamoDB engine asynchronously, and DynamoDB is explicit that GSI reads are eventually consistent with base-table writes (typically within a second, sometimes longer under load).

Choose materialised views when the engine's guarantees are strong enough for your correctness needs and the application simplification is worth the loss of direct control. Choose explicit second tables with application-managed dual writes when you need tight control over consistency, ordering, or failure handling.

Pattern 3 — Application-side joins

Sometimes the right answer is the obvious one: issue two queries and merge in application memory. The read path fetches the primary row, reads the foreign key out of it, issues a second query for the referenced row, and assembles the result in service code.

def get_tweet_with_author(session, tweet_id):
    tweet = session.execute(
        "SELECT tweet_id, text, user_id, created_at FROM tweets WHERE tweet_id = %s",
        [tweet_id],
    ).one()
    user = session.execute(
        "SELECT user_id, name, avatar FROM users WHERE user_id = %s",
        [tweet.user_id],
    ).one()
    return {
        "tweet_id": tweet.tweet_id,
        "text": tweet.text,
        "created_at": tweet.created_at,
        "author_name": user.name,
        "author_avatar": user.avatar,
    }

Two round trips instead of one, both single-partition lookups, each served in a few milliseconds. Total latency roughly 2× the single-query case, which is usually fine — a 4 ms query becomes an 8 ms query, still well under the budget of a typical REST response.

The flexibility is the payoff. Application-side joins work for any schema, adapt immediately when access patterns change, and require no write fan-out when the joined fields change — because the joined fields live in exactly one place, the users table, and a rename updates one row. There is no denormalised copy to keep consistent. The complexity lives in the read path as code, where a human can read it, debug it, and change it, rather than as implicit engine behaviour.

Why this is the workhorse pattern in production: most real applications have some access patterns that are read-heavy enough to justify denormalisation and many that are not. For the not-heavy-enough ones, issuing the extra query is cheaper than maintaining the extra copy. A service that fetches a tweet for display 100000 times per second needs the author's name denormalised; a service that shows a moderator's dashboard 10 times per hour can afford the second round trip. Real systems mix the two, picking the pattern per access path.

The main drawback is the per-query round trip multiplication. A list of 100 tweets with author information would naïvely be 1 + 100 queries. The production optimisation is batched lookup: fetch the 100 tweets, extract their distinct user IDs (often far fewer than 100 because of duplicates), issue one IN (...) query against the users table, and merge. That is two queries regardless of list length, and works beautifully as long as the IN clause stays within Cassandra's coordinator-friendly size (a few hundred keys is fine; ten thousand starts to hurt because the coordinator has to fan out that many sub-requests).
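
The batched lookup reads as a direct translation of that paragraph. A sketch (tweet rows are assumed to expose attributes like .user_id, as the driver's Row objects do):

```python
def attach_authors(session, tweets):
    """Batched application-side join: two queries total, for any list length."""
    if not tweets:
        return []
    # Dedupe the foreign keys: 100 tweets often share far fewer authors.
    user_ids = sorted({t.user_id for t in tweets})
    placeholders = ", ".join(["%s"] * len(user_ids))
    users = session.execute(
        "SELECT user_id, name, avatar FROM users "
        f"WHERE user_id IN ({placeholders})",
        user_ids,
    )
    by_id = {u.user_id: u for u in users}
    return [
        {
            "tweet_id": t.tweet_id,
            "text": t.text,
            "author_name": by_id[t.user_id].name,
            "author_avatar": by_id[t.user_id].avatar,
        }
        for t in tweets
    ]
```

The merge step is an in-memory hash probe — the same algorithm a single-node planner would have chosen, just written by hand.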

Pattern 4 — External compute frameworks

For analytics and reporting — "what is the average tweet length across all users in South Asia, grouped by user signup month?" — no per-query pattern is adequate. The question legitimately needs a join across the entire dataset, and the latency budget is minutes, not milliseconds. This is what Spark and Flink exist for.

The Spark-Cassandra connector reads whole partitions in parallel from every Cassandra node into Spark executors. Once the data is in Spark's DataFrame abstraction, Spark's planner performs the join using a real distributed-join algorithm — shuffle-hash or broadcast-hash depending on the size ratio — and writes the result back out, either into a separate Cassandra table, into a Parquet file on S3, or into a downstream analytics warehouse like Snowflake or BigQuery. Typical latency: five to sixty minutes for a full-cluster join, depending on data volume.

The architecture separates two workloads that want conflicting things. The OLTP cluster (Cassandra) is optimised for single-partition reads at millisecond latency, with aggressive memory caching of hot partitions. Running a Spark job that sequentially reads every partition would evict the hot cache and tank the OLTP latency. Production deployments therefore either point Spark at a read-replica cluster that only serves batch workloads, or use a dedicated analytics copy of the data — Cassandra writes to a Kafka topic, a stream processor writes that into a data lake, and Spark reads the lake. The OLTP cluster never sees the scan.

This is the emergence of the "lambda architecture" or "data lake pattern": Cassandra (or DynamoDB) for OLTP at millisecond latency, Spark/Flink/Databricks for OLAP at minute latency, connected by a stream. Joins that are impossible in the OLTP side become routine in the OLAP side. The two halves speak the same data model but with different query capabilities.

Flink fills the same role with a streaming-first architecture — joins happen continuously as records arrive, producing materialised output tables that downstream services can query. Use Flink when you want the join to be always up-to-date (near-real-time dashboards); use Spark for batch reports where hourly or daily freshness is enough.

Choosing among the four

The four patterns are not alternatives, they are a toolkit. Any real system uses several of them, picking per access pattern. The decision rules are mostly mechanical.

The read-to-write ratio is the single most useful input. A 1000:1 read-heavy workload with stable joined data (every read pays the cost 1000 times; every write pays once) justifies denormalisation. A 1:1 workload with volatile joined data (every write triggers an expensive fan-out) favours application-side joins. A 10:1 workload with moderately stable data is the judgement call; materialised views or hand-maintained second tables are the usual answers.
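
The decision rule can be made concrete as back-of-envelope arithmetic. A toy model (the cost constants are illustrative assumptions, not measurements):

```python
def denormalise_is_cheaper(reads_per_write, fan_out_rows,
                           extra_read_ms=4.0, write_ms=2.0):
    """Toy model: total work per base write, app-side join vs denormalised."""
    join_cost = reads_per_write * extra_read_ms   # extra query paid on every read
    fanout_cost = fan_out_rows * write_ms         # fan-out paid on every write
    return fanout_cost < join_cost

# A 1000:1 read-heavy path whose rename fans out to 500 rows: 1000 ms of
# background fan-out buys back 4000 ms of hot-path second queries.
print(denormalise_is_cheaper(1000, 500))   # → True
```

The model deliberately ignores where the cost lands: fan-out runs in the background while the extra read sits on the hot path, which biases real systems toward denormalisation even nearer the break-even point.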

A common mixed design: denormalise the hot path, application-side-join the cold path. The Twitter-style timeline view denormalises author name and avatar into the tweet row because the timeline is read a billion times a day and only the last 1000 tweets per user matter. A historical search over all 10 million of a user's tweets — rare, slow, tolerant of an extra round trip — joins application-side against the users table at read time. Both patterns coexist in the same service and query the same two tables; each path picks its own strategy.

Python sketch — a join-cache pattern

When an access path has to do application-side joins against a small dimension table, the per-row second lookup multiplies. Caching the dimension in application memory turns N lookups into 1 cache check per row plus 1 backing query per cache miss. For users (typically a few million rows totalling a few gigabytes), this fits comfortably in a service's heap.

from functools import lru_cache

class TweetService:
    def __init__(self, session):
        self.session = session
        # Wrap the bound method per instance: decorating the method directly
        # with @lru_cache would include self in the cache key and keep every
        # instance alive for the lifetime of the class-level cache.
        self._load_user = lru_cache(maxsize=100_000)(self._load_user_uncached)

    def _load_user_uncached(self, user_id):
        return self.session.execute(
            "SELECT user_id, name, avatar FROM users WHERE user_id = %s",
            [user_id],
        ).one()

    def get_timeline(self, user_id, limit=100):
        tweets = self.session.execute(
            "SELECT tweet_id, user_id, text, created_at "
            "FROM timeline_by_user WHERE user_id = %s LIMIT %s",
            [user_id, limit],
        ).all()
        return [
            {
                "tweet_id": t.tweet_id,
                "text": t.text,
                "created_at": t.created_at,
                "author": self._load_user(t.user_id),
            }
            for t in tweets
        ]

Why LRU caching is a good default here: the users dimension is small, slowly changing, and accessed with Zipfian skew — a small fraction of users account for most lookups. LRU with a 100000-entry bound fits in tens of megabytes of heap per service instance and typically achieves hit rates above 95% on Twitter-shaped workloads. The occasional cache miss costs one CQL round trip. Stale entries (from users renaming themselves) linger until cache pressure evicts them — functools.lru_cache has no TTL, so production code layers a time-based expiry on top. For most UI features, 60 seconds of staleness is acceptable.

This is what an application-side join looks like with one production-grade optimisation. Real systems add TTLs, negative caching for not-found users, prefetching based on predicted access patterns, and careful cache-invalidation hooks that fire when users update their profiles. The pattern generalises: any time you have a small dimension being joined to a large fact, cache the dimension.

Worked example — the tweet timeline

The classic design exercise in Build 11. Requirement: "show the last 100 tweets a user has written, each with the author's display name and avatar". You have three schema options and three join patterns to weigh.

Option A — denormalise into tweets_by_user.

CREATE TABLE tweets_by_user (
    user_id            bigint,
    created_at         timestamp,
    tweet_id           bigint,
    text               text,
    user_display_name  text,
    user_avatar        text,
    PRIMARY KEY ((user_id), created_at, tweet_id)
) WITH CLUSTERING ORDER BY (created_at DESC, tweet_id DESC);

On the write path, every tweet inserts into this table with the denormalised user fields. On the read path, a single partition lookup by user_id returns 100 tweets with everything needed. Latency: one round trip, a few milliseconds. Write fan-out on rename: update all of this user's tweet rows (a few seconds to a minute depending on tweet count). Storage: each tweet carries 50-200 bytes of denormalised user data, so 1 KB tweets grow by 5-20%.

Option B — application-side join.

CREATE TABLE tweets_by_user (
    user_id     bigint,
    created_at  timestamp,
    tweet_id    bigint,
    text        text,
    PRIMARY KEY ((user_id), created_at, tweet_id)
) WITH CLUSTERING ORDER BY (created_at DESC, tweet_id DESC);

CREATE TABLE users (
    user_id       bigint PRIMARY KEY,
    display_name  text,
    avatar        text
);

Read path: one query for 100 tweets (single partition) + one query for the single author (since a user's own tweets all share the same author, this is a single users lookup, not 100). Two round trips, still under 10 ms end-to-end. Write fan-out on rename: one row in the users table, instant. Storage: no duplication.

Option C — materialised view.

Cassandra materialised view on a base table keyed by tweet_id, with the view keyed by user_id. Engine maintains. Read path identical to Option A. Write path simpler (no application fan-out). Risk: the view's eventual consistency under replica failures.

Pick based on the access pattern. For the "user's own timeline" view, Option B is almost always right: the 100 tweets share one author, so the second query is a single lookup, not a per-row lookup. For a multi-user timeline ("home feed" aggregating tweets from everyone Priya follows), the join becomes more expensive — 100 tweets, 100 distinct authors — and batched application-side join or Option A denormalisation becomes more attractive.

Notification feed design

A consumer application has to display a notification feed: "Priya liked your tweet", "Arjun replied to your comment", "Dev followed you". Each notification has an actor (the user performing the action), a subject (the thing the action targeted), and a timestamp. Read pattern: "show me the last 50 notifications", called on every app open.

Primary table:

CREATE TABLE notifications_by_user (
    user_id     bigint,
    created_at  timestamp,
    notif_id    uuid,
    type        text,
    actor_id    bigint,
    subject_id  bigint,
    PRIMARY KEY ((user_id), created_at, notif_id)
) WITH CLUSTERING ORDER BY (created_at DESC, notif_id DESC);

For display, the UI needs each actor's display name and avatar. Three options, same as before.

Denormalised: include actor_display_name and actor_avatar in notifications_by_user. Write fan-out on rename: every notification that references the renamed user across every recipient's partition. For a user with many followers, this is tens of thousands of updates per rename. Expensive but rare.

Application-side: fetch 50 notifications, extract the set of distinct actor IDs (often 10-30 distinct actors for 50 notifications), batch-fetch with WHERE actor_id IN (...), merge in memory. Two round trips, no write fan-out.

Materialised view: not helpful here — the base table is already keyed the way we want to read.

For notifications, application-side wins comfortably. Renames are rare, the dimension set (distinct actors per 50 notifications) is small, and the write fan-out cost of denormalisation on a per-user basis would be punishing for popular accounts. The second query, batched, adds perhaps 3 ms to an 8 ms primary query — an invisible cost in a mobile app.

[Figure: "Four patterns for replacing a JOIN in wide-column", four side-by-side panels. Denormalise (fan-out on write): read = 1 partition fetch, write = N rows per update, latency = 1 RTT; for joined fields that rarely change under dominant reads; example: tweets(user_name, ...). Materialised view (engine maintained): read = 1 partition fetch, write = engine fan-out, latency = 1 RTT; fragile in Cassandra, solid in CockroachDB/DynamoDB; example: a DynamoDB GSI. App-side join (two queries, merge): read = 2 partition fetches, write = 1 row in 1 place, latency = 2 RTT; flexible, no fan-out on rename; example: tweet + users lookup. External compute (Spark/Flink): full scan, batch output, minutes of latency; analytics only, separate cluster; example: a nightly report.]
The four patterns in one view, ordered by the time they pay their cost. Denormalisation front-loads cost at write time so reads are single-partition. Materialised views defer the same logic to the engine at the price of tighter consistency guarantees. Application-side joins push the logic into the service code and accept one extra round trip. External compute punts the join to a separate cluster on a slower schedule. Any real system uses several of them, picking per access path.

Going deeper

Cassandra secondary indexes and SASI

Cassandra's CREATE INDEX creates a per-node inverted lookup from an indexed column to the local partition keys that contain it. Queries using the index still fan out to every node, so query cost grows with cluster size rather than staying flat. SASI (SSTable-Attached Secondary Index, Cassandra 3.4+) adds per-SSTable indexes with better support for prefix and range queries, but preserves the per-node fan-out model. For high-cardinality equality lookups over a small fraction of rows, SASI can be acceptable; for low-cardinality or table-wide queries, it is still roughly a full scan. The production advice remains: a second table is almost always better than an index.
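
For concreteness, the two index flavours look like this in CQL (the index names are illustrative; the SASI class string is the actual Cassandra implementation class):

```sql
-- Regular secondary index: per-node inverted lookup, equality queries only.
CREATE INDEX tweets_by_author_idx ON tweets (user_id);

-- SASI: SSTable-attached, adds prefix/LIKE support on text columns.
CREATE CUSTOM INDEX tweets_text_idx ON tweets (text)
    USING 'org.apache.cassandra.index.sasi.SASIIndex';
```

Both still fan out to every node at query time; neither changes where the data lives.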

DynamoDB GSI vs LSI

DynamoDB's Local Secondary Index (LSI) shares the base table's partition key but offers a different sort key — useful when you need a second sort dimension within a partition. LSIs must be declared at table-create time, share the partition's 10 GB size limit with the base, and are strongly consistent with the base (reads can explicitly request strong consistency). Global Secondary Indexes (GSIs) can have a different partition key and a different sort key entirely, are maintained asynchronously, are eventually consistent with the base, and can be added or dropped after the table exists. GSIs are the general-purpose "second table for a second access pattern" tool; LSIs are the narrow case where only the sort order changes.

Spark-Cassandra-Connector architecture

The connector uses Cassandra's token-range information to parallelise reads: each Spark partition maps to a token range, and the executor reads directly from the replica that owns that range (token-aware routing). Writes use asynchronous batched inserts with configurable consistency levels. The connector is read-efficient because each token range is sequential on disk, and write-efficient because each batch targets one partition's replica set, avoiding coordinator fan-out. Crucially, running the connector at OLTP-scale throughput against a production cluster will evict the hot-partition cache; production deployments point Spark at a dedicated analytics replica or a data-lake copy.

The data lake pattern

Modern large-scale architectures couple a wide-column OLTP store (Cassandra or DynamoDB) with a columnar analytics store (Snowflake, BigQuery, Databricks Delta Lake) via a stream (Kafka, Kinesis). Every write to Cassandra is also published to Kafka; a stream processor (Flink, Kafka Streams) transforms and writes into the analytics store in near-real-time. Joins that are impossible in Cassandra become trivial in the analytics store, which was built for exactly that pattern. The two stores speak the same logical data model but have disjoint query capabilities. The split between OLTP and OLAP at the storage layer is older than wide-column — it goes back to Kimball-style data warehousing in the 1990s — but the tight streaming coupling is new, and it is what makes the wide-column "no joins" constraint tolerable in a modern stack.

Google F1 and Spanner — distributed SQL joins

Google's F1 and Spanner implement full SQL, joins included, on top of a globally distributed storage layer. They do this by performing shuffles and broadcasts at query time and by carefully choosing which joins can be pushed down to co-located partitions (when two tables share a partition key, the join can run locally per partition without any shuffle). CockroachDB, which is modelled on Spanner, uses the same strategy. The lesson is that distributed joins can be done — the cost is latency (seconds to tens of seconds for large joins) and query-planner complexity. Cassandra chose the other horn of the trade: no joins in the engine, push the equivalent work to the schema design or the application. Neither choice is wrong; they serve different workloads.

Where this leads next

Chapter 90 is CQL — SQL-shaped but not relational, which walks through the CQL grammar and its deliberate omissions (no JOIN, no subquery, no arbitrary WHERE). Chapter 91 covers sharding strategies: consistent hashing recap, range-based sharding, hot-spot mitigation. The pattern you learned here — denormalise to make joins disappear — is load-bearing for everything in the remainder of Build 11, because every table you design from now on will be shaped by an access pattern and will carry the denormalised columns that pattern needs.

References

  1. Apache Software Foundation, CQL Data Manipulation: SELECT — the official CQL reference, including the explicit absence of a JOIN clause and the restrictions on WHERE clauses. Worth reading alongside the partition-key rules from Chapter 86.
  2. Apache Software Foundation, Materialized Views in Cassandra — the canonical docs on CREATE MATERIALIZED VIEW, including the current experimental-status warnings and the known limitations around replica failures and view repair.
  3. Amazon Web Services, Using Global Secondary Indexes in DynamoDB — the DynamoDB GSI reference, covering the eventual-consistency semantics, the write-capacity cost of maintaining indexes, and the design patterns for multi-key access.
  4. Kleppmann, Designing Data-Intensive Applications, O'Reilly 2017 — chapter 6 covers partitioning and its consequences for joins; chapter 10 covers batch processing with MapReduce-style systems and the shuffle/broadcast join algorithms that Spark uses.
  5. DataStax, Spark Cassandra Connector Documentation — the architecture and usage of the Spark-Cassandra connector, including the token-range parallelism model and the configuration knobs for batch vs streaming workloads.
  6. Bacon et al., Spanner: Becoming a SQL System, SIGMOD 2017 — the Google paper describing how Spanner implements distributed SQL with joins on top of its globally distributed storage, including the shuffle and co-location optimisations that let the query planner avoid full shuffles when the partition keys align.