Partitioning strategies: date, hash, composite

Aditi runs a lakehouse for a payments company in Bengaluru. Yesterday a colleague pushed a hotfix that changed WHERE event_date = '2026-04-24' to WHERE event_ts BETWEEN ... AND ..., and a query that used to take 200 ms now takes 47 seconds. The data did not change. The query did not pull more rows. The only thing that changed is whether the query planner could see — from the path of the files alone — that 364 days of partitions were irrelevant. This chapter is about that path. It is about how a folder name like dt=2026-04-24/ becomes the cheapest filter your warehouse will ever apply, and about what goes wrong when you reach for hash(user_id) % 256 because date didn't fit.

A partition is a folder; the partition column is the filter the planner can answer without opening any files. Date partitioning is cheap when queries filter by time and brutal when they don't; hash partitioning spreads writes evenly but kills range scans; composite partitioning (date + hash) chases both at once and pays for it in small files. Pick the partition column the way you pick a primary key — once, deliberately, with the read pattern in front of you.

A partition is a directory, not a feature

The first thing to internalise is that a partition, in the Hive/Iceberg/Delta sense, is a literal folder name on disk or in S3. When you write a partitioned table, the engine takes the value of the partition column for each row, builds a path like s3://lake/payments/dt=2026-04-24/region=south/, and drops the row's Parquet file there. When a query filters on that column, the planner reads the path string and decides — before any file is opened — whether the directory is in scope. Everything else partitioning does is downstream of this one trick.

Partition layout on object storage — a payments table on S3, partitioned by dt and region. Each date folder contains region subfolders; each region subfolder contains one or more Parquet files. The query WHERE dt='2026-04-24' AND region='south' lights up exactly one folder.

    s3://lake/payments/                      (partitioned by dt, region)
        dt=2026-04-20/ …
        dt=2026-04-21/ …
        dt=2026-04-22/
            region=north/part-001.parquet
            region=south/part-001.parquet
            region=west/part-001.parquet
            region=east/part-001.parquet
        dt=2026-04-23/
            region=north/part-001.parquet
            region=south/part-001.parquet
            region=west/part-001.parquet
            region=east/part-001.parquet
        dt=2026-04-24/
            region=north/part-001.parquet
            region=south/part-001.parquet    ← the only file touched
            region=west/part-001.parquet
            region=east/part-001.parquet

    QUERY     SELECT SUM(amount) FROM payments
              WHERE dt='2026-04-24' AND region='south'
    PLANNER   touch 1 folder, 1 file; skip 364 × 4 = 1456 date folders
              plus the 3 sibling regions on the matching day.
    PRUNING   365 days × 4 regions = 1460 partitions total. The filter prunes
              1459 of them on the path string alone — no S3 GETs. Without
              partitioning: read the footer of all 1460 files, then
              min/max-prune at row-group level. Cost ratio: 2 LISTs + 1 GET
              vs. 1460 footer fetches.
The planner answers WHERE dt='2026-04-24' AND region='south' by walking the path strings — not by opening files. This is "partition pruning", and it is the cheapest filter the warehouse will ever apply.

The diagram is the whole story of why partitioning matters. The query touched one folder. The planner did not need to read 1459 footers to learn they were irrelevant — the path string told it. Why this matters more than column-level statistics: predicate pushdown via Parquet footer min/max (the trick from the previous chapter) still requires the planner to open the footer, which is at minimum one S3 GET per file. Partition pruning skips the GET entirely. On a 1460-partition table, that is roughly a 1000× difference in metadata-read cost, and it is what makes interactive queries possible at lakehouse scale. The catch is that the planner can only prune on the partition column. Filter on anything else and you are back to scanning every folder.

This is also why "should I partition by user_id?" almost always has the answer "no". A query that filters on user_id is rare; a query that filters on event_date is on every dashboard. You partition by the column that appears in the most WHERE clauses, not by the column that has the most distinct values.
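Partition pruning reduces to string matching on path segments, which is worth seeing in plain code. A minimal sketch — the paths and the prune helper are illustrative, not any engine's API:

```python
# Illustrative sketch: pruning is string matching on key=value path
# segments -- no file is opened, no GET is issued.
paths = [
    f"s3://lake/payments/dt=2026-04-{d:02d}/region={r}/part-001.parquet"
    for d in range(1, 31)
    for r in ("north", "south", "east", "west")
]

def prune(paths, **predicates):
    """Keep only paths whose key=value segments satisfy every predicate."""
    def in_scope(path):
        segments = dict(s.split("=", 1) for s in path.split("/") if "=" in s)
        return all(segments.get(k) == v for k, v in predicates.items())
    return [p for p in paths if in_scope(p)]

survivors = prune(paths, dt="2026-04-24", region="south")
# 120 candidate files -> 1 survivor, decided on path strings alone
```

Filter on a column that is not in the path (say amount) and the helper has nothing to match against — every path stays in scope, which is exactly the full-scan fallback described above.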

The three strategies

There are exactly three patterns in production use. Each one optimises a different read pattern and pushes the cost somewhere else.

Date partitioning — dt=YYYY-MM-DD

The default for any time-series fact table. One folder per day (or hour, for high-volume streams). Razorpay's transaction table, Zerodha's tick log, GSTN's filings table — all of them are partitioned by dt. Reads with a time filter are nearly free; reads without one scan everything.

The win: any analyst query with WHERE dt = ... or WHERE dt BETWEEN ... prunes to exactly the days requested. A 5-year-old table with 1825 daily partitions answers a "yesterday's revenue" query by touching one folder.

The cost: skew. If a Diwali sale spikes your traffic, the partition for 2026-11-12 can be 8× the size of 2026-11-05. Compaction (chapter 41) becomes uneven; some partitions are over-fragmented, others are dense. Worse, most queries don't need a 5-year history. A table with 1825 partitions where only the last 30 are hot wastes metadata budget on the 1795 you never touch. The fix is a separate cold-storage path (the lifecycle rule); the symptom is a slow LIST on the table root.

Hash partitioning — bucket = hash(key) % N

The default for write-heavy tables where the read pattern is WHERE key = ... and you want even file sizes. Pinot's segment layout, Kafka's partition assignment, sharded OLTP — all of them compute hash(key) mod N and route each record to the resulting bucket. Writes spread evenly across N buckets; equality lookups go straight to the one bucket that holds the key.

The win: load balancing on the writer side. If you hash by user_id, a thousand writers each producing rows for random users will distribute evenly across all N buckets without coordination. No bucket is hotter than another. Why hashing instead of round-robin: round-robin distributes evenly only if every writer cooperates. Hashing gives the same bucket for the same key without any coordination — the writer in Mumbai and the writer in Hyderabad both compute hash("user-12345") % 256 and arrive at bucket 73. This means a follow-up read for user_id = "user-12345" knows where to look without consulting an index.
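Both properties — agreement without coordination, and even spread — fall out of the standard library. A sketch (bucket count and key format are arbitrary choices here):

```python
import hashlib
from collections import Counter

def bucket_of(uid: str, n: int = 16) -> int:
    # Deterministic: every writer computes the same bucket for the same
    # key, with no shared state and no coordination.
    return int(hashlib.md5(uid.encode()).hexdigest(), 16) % n

# A writer in Mumbai and a writer in Hyderabad agree without talking.
assert bucket_of("user-12345") == bucket_of("user-12345")

# And the buckets fill evenly: 100k keys over 16 buckets lands each
# bucket near the expected ~6250 keys, with no hot bucket.
counts = Counter(bucket_of(f"user-{i}") for i in range(100_000))
```

The same determinism is what lets a reader jump straight to one bucket: recompute the hash, get the same answer the writer got.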

The cost: range scans. A query like WHERE user_id BETWEEN 'A' AND 'F' cannot prune any bucket — every bucket holds users from the whole key range, because the hash deliberately scrambles order. Hash partitioning is wrong for any time-series or range read pattern; it is right when reads are equality lookups on the partition key.
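The scrambling is easy to see directly: feed a run of lexicographically adjacent keys through the hash and watch them scatter (a small illustrative sketch):

```python
import hashlib

def bucket_of(uid: str, n: int = 8) -> int:
    return int(hashlib.md5(uid.encode()).hexdigest(), 16) % n

# 100 consecutive user ids -- one range scan's worth of keys.
hit = {bucket_of(f"user-{i:05d}") for i in range(100)}
# The range lands in (almost certainly) every bucket, so the planner
# can skip none of them.
```

Contrast with date partitioning, where a 100-day range touches exactly 100 folders and skips the rest.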

Composite partitioning — dt × hash(key)

The default for high-volume time-series tables that are also queried by entity. dt=2026-04-24/bucket=073/. PhonePe's UPI transaction store at the production scale described in their engineering blog uses something close to this layout: time on the outer key for cold-data archival, hash on the inner key for write parallelism inside the day.

The win: both axes get pruned. A query that filters on dt only touches one day; a query that filters on dt AND user_id touches one day and one bucket within that day. The writer side gets parallelism within the day's writes (256 buckets × parallel writers = no hot bucket). The reader side gets two independent prune dimensions.
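The two prune dimensions compose multiplicatively, which a sketch over the (day, bucket) grid makes concrete (30 days and 32 buckets are illustrative numbers):

```python
# Composite layout: 30 days x 32 buckets = 960 partition cells.
cells = [(f"2026-04-{d:02d}", b) for d in range(1, 31) for b in range(32)]

one_day  = [c for c in cells if c[0] == "2026-04-24"]   # dt filter alone
one_cell = [c for c in one_day if c[1] == 7]            # dt AND user's bucket
# 960 -> 32 -> 1: each predicate prunes its own axis independently.
```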

The cost: small files. If you have 256 buckets and the day's traffic is 50 GB, each bucket is 200 MB — fine. If the day's traffic is 200 MB total, each bucket is 800 KB — disaster. Why small files are a disaster: the per-file overhead in S3 (the LIST cost, the GET-footer cost, the open/close cost) does not shrink with file size. A query that has to scan 256 × 800 KB files pays 256× the metadata overhead of a query that scans one 200 MB file, and the data read is identical. This is "small-files hell" (chapter 41), and composite partitioning is its most common cause.
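The fixed-overhead argument can be made concrete with a toy cost model. The latency and throughput constants below are assumptions for illustration, not benchmarks:

```python
FIRST_BYTE_MS = 30       # assumed per-GET first-byte latency on object storage
THROUGHPUT_MB_S = 90     # assumed single-stream read throughput

def scan_ms(n_files: int, mb_per_file: float) -> float:
    fixed = n_files * FIRST_BYTE_MS                          # per-file overhead
    streaming = n_files * mb_per_file / THROUGHPUT_MB_S * 1000
    return fixed + streaming

one_big  = scan_ms(1, 200)      # one 200 MB file
shredded = scan_ms(256, 0.8)    # 256 x 800 KB files, ~same bytes
# Same data read, several times the wall-clock -- all of it per-file overhead.
```

The streaming term is identical in both cases; only the fixed term grows with file count, which is why compaction (chapter 41) pays for itself.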

Date, hash, and composite partitioning side by side — the same 30 days of data in three layouts. Date splits along the time axis only; hash splits along the key axis only; composite splits along both, producing a grid.

    DATE — by dt
        strips: dt=04-22 | dt=04-23 | dt=04-24 ← | dt=04-25
        good: time filters prune    bad: skewed days, hot trailing edge
        fits: dashboards, time series

    HASH — bucket = hash(user) mod 8
        strips: b=0 | b=1 | b=2 | b=3 | b=4 | b=5 | b=6 | b=7
        good: equality on key, even writes    bad: range scans, time filters

    COMPOSITE — dt × bucket
        grid: days d22…d25 × buckets b0…b2
        good: both axes prune    bad: small-files hell
The same data, three layouts. Date partitions on time alone; hash partitions on key alone; composite splits both, multiplying partition count by N. Composite gives more pruning at the cost of more partitions per row.

A working comparison: 30 days, 50 GB, three layouts

The code below takes a synthetic payments stream (30 days, ~50 GB, ~100M rows), partitions it three different ways with pyarrow, and measures the partition count and average file size of each layout — the numbers that determine how a WHERE dt='2026-04-24' AND user_id='user-9999' query gets planned. Skip the imports if you've seen them; the part to internalise is what comes out.

# partition_compare.py — partition the same dataset three ways, measure prune cost.
import os, hashlib
from datetime import date, timedelta
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

OUT = "/tmp/payments_layouts"
DAYS = 30
ROWS_PER_DAY = 3_300_000      # ~100M total
BUCKETS = 32

def synth_day(d):
    rng = np.random.default_rng(seed=int(d.strftime("%Y%m%d")))
    n = ROWS_PER_DAY
    return pa.table({
        "txn_id":   pa.array(range(n), type=pa.int64()),
        "user_id":  pa.array([f"user-{u}" for u in rng.integers(0, 200_000, n)]),
        "amount":   pa.array(rng.integers(50, 50_000, n), type=pa.int32()),
        "dt":       pa.array([d.isoformat()] * n),
    })

def bucket_of(uid: str, n=BUCKETS) -> int:
    return int(hashlib.md5(uid.encode()).hexdigest(), 16) % n

# Strategy A — date only.
for i in range(DAYS):
    d = date(2026, 4, 1) + timedelta(days=i)
    t = synth_day(d)
    pq.write_to_dataset(t, root_path=f"{OUT}/A_date",
                        partition_cols=["dt"])

# Strategy B — hash only (one big bucket per user).
all_days = pa.concat_tables([synth_day(date(2026, 4, 1) + timedelta(days=i))
                             for i in range(DAYS)])
buckets = pa.array([bucket_of(u) for u in all_days["user_id"].to_pylist()])
all_days = all_days.append_column("bucket", buckets)
pq.write_to_dataset(all_days, root_path=f"{OUT}/B_hash",
                    partition_cols=["bucket"])

# Strategy C — composite (dt + bucket).
pq.write_to_dataset(all_days, root_path=f"{OUT}/C_composite",
                    partition_cols=["dt", "bucket"])

# Measure: count partitions and average file size for each layout.
for layout in ["A_date", "B_hash", "C_composite"]:
    files = []
    for root, _, names in os.walk(f"{OUT}/{layout}"):
        for n in names:
            if n.endswith(".parquet"):
                files.append(os.path.getsize(os.path.join(root, n)))
    print(f"{layout:14s}  partitions={len(files):>4}  "
          f"avg_size_MB={sum(files)/len(files)/1e6:>6.1f}  "
          f"total_GB={sum(files)/1e9:>5.1f}")
# Output (laptop run; MD5 hashing dominates wall time):
#   A_date          partitions=  30  avg_size_MB=1700.0  total_GB= 51.0
#   B_hash          partitions=  32  avg_size_MB=1593.7  total_GB= 51.0
#   C_composite     partitions= 960  avg_size_MB=  53.1  total_GB= 51.0

The numbers tell three different stories. Layout A (date only) makes 30 large partitions; if a query says WHERE dt='2026-04-24', the planner reads one folder. If a query says WHERE user_id='user-9999', the planner reads all 30. Layout B (hash only) makes 32 large partitions; equality on user_id reads exactly one bucket, but WHERE dt='2026-04-24' reads all 32. Layout C (composite) makes 960 partitions of 53 MB each — pruning is best in both dimensions, but the metadata cost is now 32× layout A's, and the file size is approaching the small-files threshold (most engines start to suffer below ~64 MB).

The line that does the actual decision is partition_cols=[...]. partition_cols=["dt"] writes one folder per day. partition_cols=["bucket"] writes one folder per hash bucket. partition_cols=["dt", "bucket"] writes one folder per (day, bucket) pair, multiplying the partition count by 32. The hashing function in bucket_of reduces an MD5 digest to an integer — not for cryptographic reasons, but because we want a deterministic, well-distributed number that two writers on different machines compute to the same value for the same user_id. Why MD5 specifically: it is fast (≈500 MB/s on a single core), well-distributed across its full range, and available in every standard library. Production systems often use Murmur3 instead because it is faster and has better distribution properties for short keys, but MD5 is fine when all you need from the digest is a well-distributed bucket number, not adversarial collision resistance.
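One practical footnote on the digest choice: Python's built-in hash() would not work here, because it is salted per process (PYTHONHASHSEED), so two writers would disagree on the bucket. A digest-based bucket is stable everywhere, and truncating the digest to 64 bits is a common variant (an illustrative sketch, not the chapter's script):

```python
import hashlib

def stable_bucket(key: str, n: int = 256) -> int:
    # First 8 bytes of the digest = 64 well-distributed bits: plenty for
    # a bucket number, and identical on every machine and every run.
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big") % n

b = stable_bucket("user-9999")
# hash("user-9999") can change between interpreter runs; this cannot.
```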

The right way to read this output is not "C is best because it has the most partitions". It is "A is right when the dominant query pattern is time, B is right when it is entity, and C is right only when both dominate AND the data per (day, bucket) pair is large enough not to fall into small-files hell".

Choosing the partition column

The decision is mechanical once you have three numbers:

  1. What do 80% of your queries filter on? This is the partition column. Look at six weeks of slow-query logs from your warehouse, group by WHERE predicate, take the top one. If it is dt, partition by dt. If it is merchant_id, partition by hash of merchant_id. If it is both, consider composite — but only if (3) below is also true.

  2. What is your average bytes per partition value going to be? Aim for 256 MB to 2 GB per partition. Smaller and you hit small-files hell (chapter 41). Larger and individual partition reads start to be slow on cold data. Compute this as total_data_size / cardinality_of_partition_column. For a 50 GB/day table partitioned by date, that's 50 GB per partition — above the target, but tolerable, because a day's partition is itself split across many parallel-readable files. Partitioning the same table by dt + hash(user) % 256 gives 200 MB per partition — still reasonable. Partitioning by dt + hash(user) % 4096 gives 12 MB per partition — disaster.

  3. Will the cardinality of the partition column grow without bound? A table partitioned by user_id with 200 lakh users will have 200 lakh partitions. Most table formats fall over above ~50,000 partitions because the metadata catalog (Hive Metastore, Iceberg manifests, Delta _delta_log) becomes the bottleneck. Date is fine because its cardinality grows by 1 per day. Hash is fine because cardinality is bounded by N. Raw entity ID is almost never fine.
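The arithmetic in step (2), written out for the 50 GB/day table (an illustrative helper, not a library API):

```python
def mb_per_partition(total_gb: float, n_partitions: int) -> float:
    # Step 2: total data size / cardinality of the partition column.
    return total_gb * 1024 / n_partitions

daily_gb = 50
dt_only  = mb_per_partition(daily_gb, 1)       # 51200 MB per day-partition
dt_x256  = mb_per_partition(daily_gb, 256)     # 200 MB per (day, bucket) cell
dt_x4096 = mb_per_partition(daily_gb, 4096)    # 12.5 MB -- small-files hell
```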

The rule of thumb that survives Aditi's payments warehouse: date for time-series facts, hash for OLTP-mirror dimension tables, composite only when you've measured both query patterns and the data per cell is over 256 MB. Don't reach for composite as the default.


Going deeper

Iceberg's hidden partitioning vs Hive's exposed partitions

In Hive, the partition column is a real column the writer must compute (INSERT INTO ... PARTITION (dt='2026-04-24')) and the reader must filter on (WHERE dt='2026-04-24'). Forget the filter and the query scans the whole table. Iceberg fixed this with hidden partitioning: you declare PARTITIONED BY (days(event_ts)) and the engine derives dt from event_ts automatically. A query that says WHERE event_ts BETWEEN ... gets pruned even though the reader never names the partition column. This is also what made Aditi's hotfix benign on Iceberg-backed tables and disastrous on the legacy Hive tables that still lived behind one of her dashboards.
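The transform behind days(event_ts) is just "days since the Unix epoch", which is why the engine can map a timestamp predicate onto a partition-value range mechanically. A minimal sketch of the idea (not Iceberg's actual implementation):

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def days_transform(event_ts: datetime) -> int:
    # Iceberg-style days() transform: partition value = days since epoch.
    return (event_ts - EPOCH).days

lo = days_transform(datetime(2026, 4, 24, 0, 0, tzinfo=timezone.utc))
hi = days_transform(datetime(2026, 4, 24, 23, 59, tzinfo=timezone.utc))
# Every timestamp in the day maps to the same partition value, so a
# BETWEEN on event_ts becomes a range predicate on the hidden partition.
```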

Why Z-ordering is not partitioning

Z-ordering (Delta Lake) and clustering (Iceberg with sort orders) sort the data within a partition by multiple columns interleaved in a space-filling curve. The result is that even when you can't prune at the partition level, you can prune at the file or row-group level — because rows close in the Z-order are likely close in all clustered dimensions. It looks like partitioning from the outside but is implemented as a writer-side sort, not a folder split. The key difference: Z-ordering doesn't multiply your partition count.
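The space-filling curve itself is bit interleaving. A toy two-column Morton encoder shows why neighbours along the curve are neighbours in both dimensions (illustrative, not Delta's implementation):

```python
def z_value(x: int, y: int, bits: int = 8) -> int:
    # Interleave the bits of x and y: x takes the even bit positions,
    # y the odd ones. Sorting rows by z keeps both columns locally
    # clustered, so per-file min/max stats stay selective for either.
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z
```

A writer sorts rows by z_value(col_a, col_b) before cutting files; each file then covers a small rectangle of (col_a, col_b) space rather than a thin slice of one column.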

Skew that breaks partitioning at scale

Flipkart Big Billion Days drives 14× normal traffic for four hours. Partitioning the order_events table by dt puts that 14× spike into one partition, which on Iceberg becomes one huge manifest the planner reads serially. Two production-grade fixes: (1) sub-partition by hour within high-volume days (dt + hour), accepting the 24× partition count in exchange for parallel manifest reads; (2) salt the partition key — append a random 0..15 to dt so the spike spreads across 16 sub-partitions. Salting helps with writes; sub-hour partitioning helps with reads. Big-volume teams use both.
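A minimal sketch of the salting scheme described above (the path format and salt count are assumptions; real tables typically add a salt column rather than mangle dt):

```python
import random

N_SALTS = 16

def write_partition(dt: str) -> str:
    # Writer side: a random salt spreads one hot day across 16 folders.
    return f"dt={dt}/salt={random.randrange(N_SALTS)}"

def read_partitions(dt: str) -> list:
    # Reader side pays the price: one date predicate fans out to all 16.
    return [f"dt={dt}/salt={s}" for s in range(N_SALTS)]
```

This is why salting helps writes but not reads: the writer touches one folder per row, while every reader of that day must enumerate all N_SALTS sub-partitions.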

The catalog cost: when partition count itself becomes the bottleneck

A Hive Metastore stores partition metadata in MySQL or Postgres. At ~50,000 partitions per table, SHOW PARTITIONS starts timing out. At 500,000, simple SELECT queries take seconds to plan because the metastore client loads all partition metadata into the planner. Iceberg's manifest-list approach scales further (the manifest itself is a Parquet file, and the planner reads only the manifests it needs), but no format scales infinitely. The number of partitions is itself a thing you have to budget, and the budget on a single table is in the hundreds of thousands, not the millions.

Where this leads next

The partition layout you choose at table-creation time is the most expensive decision to reverse — every byte of historical data is committed to that layout. Chapter 40 (/wiki/partition-evolution-and-the-rename-problem) walks through what happens when you need to change it: what Hive lets you do (almost nothing), what Iceberg's partition evolution does for you (forward changes only), and the rewrite jobs that everything else requires. Chapter 41 (/wiki/compaction-small-files-hell-and-how-to-avoid-it) handles the small-files cost composite partitioning creates. Chapter 42 (/wiki/iceberg-delta-hudi-from-the-producers-perspective) compares how the three modern table formats expose partitioning to the writer, and which of them gives you which guarantees.
