Compaction: small-files hell and how to avoid it
Kiran's stream from the BookMyShow ticketing pipeline writes one Parquet file per Kafka micro-batch — 30 seconds of events, about 4 MB per file. After a week the table has 20,000 files. After a month, 86,000. The dashboards that used to return in 800 ms now take 11 seconds. Nothing about the data has changed; nothing about the queries has changed; nothing about the cluster has changed. What changed is the count of files the planner has to enumerate, the count of S3 GETs the executor has to issue, and the count of TCP connections the readers have to multiplex. This is small-files hell, and it is the most predictable failure mode of streaming-into-a-lakehouse.
Object stores are not filesystems — every file the planner reads costs at least one round-trip to list it and another to open it, and the per-file overhead dominates query cost when files are small. Compaction is the maintenance job that merges many small files into a few large ones (~256 MB to 1 GB), reducing planner overhead and S3 cost. The hard parts are picking what to compact, doing it without breaking concurrent writers, and knowing when to stop.
Why small files hurt: the cost model is in the metadata, not the bytes
Reading 1 GB out of S3 costs roughly the same regardless of whether it lives in one file or 1,000 files — the transfer price is per byte and the wall time is dominated by sustained network throughput. So why is small-files hell a real problem? Because the planner cost is per-file, and the per-file cost is large.
For each file the engine touches, it has to: (1) issue a LIST to discover it (or read a manifest entry that mentions it); (2) issue a HEAD or GET to read the Parquet footer (the last ~8 KB of the file holds the schema, row groups, and column statistics); (3) optionally issue another GET for column-statistics-based pruning before deciding to read row data. That is two to three S3 round-trips per file, and on ap-south-1 from EMR each round-trip is around 25-40 ms.
Why the per-file overhead does not disappear with parallelism: a 200-executor Spark cluster splits 1,000 files five per executor, and each executor still pays the round-trip latency for every file it owns. At two round-trips of ~30 ms each, that is ~60 ms per file: serially, 1,000 × 60 ms = 60 seconds; parallelised, each executor pays 5 × 60 ms = 300 ms of pure metadata before any actual byte read happens. That looks tolerable, until the table hits 100,000 files and each executor owns 500 of them: 500 × 60 ms = 30 seconds of metadata per executor. And in both cases you've consumed 200 task slots on work that produces zero output rows.
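The arithmetic above can be captured in a toy model. Every constant here is an illustrative assumption taken from this chapter's numbers (two round-trips per file at ~30 ms each), not a measurement:

```python
# Back-of-envelope model of per-file metadata overhead. Assumes ~2 S3
# round-trips per file (footer read + stats read) at ~30 ms each; both
# constants are illustrative, not measured.

ROUND_TRIPS_PER_FILE = 2
RTT_SECONDS = 0.030

def metadata_wall_seconds(num_files: int, executors: int) -> float:
    """Wall-clock metadata time: each executor serially opens its share."""
    files_per_executor = -(-num_files // executors)  # ceil division
    return files_per_executor * ROUND_TRIPS_PER_FILE * RTT_SECONDS

# 1,000 files on 200 executors: 5 files each, ~0.3 s of pure metadata.
print(round(metadata_wall_seconds(1_000, 200), 3))    # 0.3
# 100,000 files on the same cluster: 500 files each, ~30 s.
print(round(metadata_wall_seconds(100_000, 200), 3))  # 30.0
```

The model ignores request pipelining and S3 connection reuse, which is why real engines cache footers and batch LIST calls; it still captures why file count, not byte count, drives planner latency.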
There is a second cost that is less visible: the manifest tree itself bloats. An Iceberg manifest entry per data file is roughly 1 KB. 100,000 files = 100 MB of manifests the planner reads on every query, even with predicate pushdown. The manifest list above it is also longer. The planning phase, which used to be 50 ms, becomes 800 ms — and that hits every query, including ones that ultimately read three rows.
There is a third cost that hits the writer side: commits get slow. A new Iceberg snapshot has to add the new files to the manifest tree, copy forward the old entries, and atomically swap the catalog pointer. With many small files, the manifest copy-forward cost grows. A streaming writer that commits every 30 seconds against a 100,000-file table now spends 4 seconds on the metadata write — longer than the actual data write.
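The manifest-bloat figure is simple arithmetic worth making explicit. The ~1 KB-per-entry constant is this chapter's rough estimate, not a spec value:

```python
# Manifest bloat arithmetic: ~1 KB of manifest metadata per data file,
# read by the planner on every query. The 1 KB figure is a rough
# per-entry estimate, not an Iceberg spec constant.

ENTRY_BYTES = 1024  # ~1 KB per manifest entry (rough)

def planner_manifest_mb(num_files: int) -> float:
    """MB of manifest data the planner must read to plan one query."""
    return num_files * ENTRY_BYTES / 2**20

print(round(planner_manifest_mb(100_000)))  # 98  (~100 MB per plan)
print(round(planner_manifest_mb(1_000)))    # 1
```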
What compaction actually does
Compaction is the maintenance operation that takes a set of small files belonging to one logical group (a partition, a bucket, a Z-order range) and rewrites them as a smaller set of larger files. The output files have the same schema and the same partition values; the only thing that changed is the file boundary.
Mechanically, on a modern table format (Iceberg, Delta, Hudi), compaction is:
- Pick a target group — most often "all files in a partition smaller than target_file_size_bytes / 2".
- Read them concurrently, decode the Parquet, optionally apply pending row-level deletes (for merge-on-read tables; see Build 12).
- Re-write the rows into N output files, each close to the target size (commonly 256 MB or 512 MB; some shops go to 1 GB).
- Commit an atomic metadata operation that swaps the old files for the new files in the manifest. The old files get dereferenced and become eligible for EXPIRE_SNAPSHOTS cleanup later.
The atomic swap is the part that makes compaction safe to run concurrently with writers and readers. A reader that started before the compaction commit sees the old files (its snapshot is pinned). A reader that starts after sees the new files. No reader ever sees a mix. The swap is a metadata-only operation — the actual byte rewrite happened in step 3, but step 4 is the moment the change becomes visible.
Why this is a swap and not an in-place edit: object stores like S3 do not support partial updates of an object — you either write a new object or replace the whole one. So compaction is forced into a "write the new files first, then atomically point the manifest at them" pattern, which is also exactly the pattern that gives you snapshot isolation for free. The two properties (no in-place edits, snapshot isolation) come from the same constraint.
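The write-new-then-swap pattern can be sketched with a local JSON file standing in for the catalog pointer (the same stand-in the compaction demo in this chapter uses) and os.replace as the atomic flip. A minimal sketch, not a real catalog commit:

```python
# Write-then-swap commit sketch. A JSON file is the stand-in catalog
# pointer; os.replace is atomic on POSIX, so a reader loading the
# manifest sees either the old file list or the new one, never a mix.
import json
import os
import tempfile

def commit_swap(manifest_path: str, old_files: list, new_files: list) -> None:
    with open(manifest_path) as fh:
        files = set(json.load(fh)["files"])
    files = (files - set(old_files)) | set(new_files)  # the swap itself
    # Write the new manifest to a temp file first, then flip the pointer.
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(manifest_path))
    with os.fdopen(fd, "w") as fh:
        json.dump({"files": sorted(files)}, fh)
    os.replace(tmp, manifest_path)  # atomic: readers see old or new, not both
```

A real Iceberg commit does the equivalent against the catalog (with a compare-and-swap to detect concurrent commits, which this sketch omits), but the shape is the same: the data rewrite happens first, and visibility changes only at the pointer flip.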
What compaction is not:
- It is not a content change. Rows are not re-ordered (unless you also asked for Z-ordering or sort), filtered, or modified.
- It is not free. The bytes are read once, decoded, re-encoded (the Parquet write path, with its dictionary building and compression, is markedly slower than the read path), and written back. On a 1 TB partition, this is about 2 hours on a 100-worker Spark cluster.
- It is not idempotent in the cheap sense. Running it twice does the work twice. Shops gate it on "is there enough small-file pressure to make this worth the rewrite cost?"
Picking what to compact, and when
The naïve "compact everything below 256 MB" rule looks fine on a whiteboard and is wrong in production. Here is the cost model that drives a real compaction policy:
- Per-partition file count. A partition with 8 files of 32 MB each has 256 MB of data spread across 8 files; one rewrite pass yields one 256 MB file. The work is small; the planner gain is large. Compact.
- Per-partition smallest file size. A partition with 2 files of 200 MB each has the same data volume but is already at the target size; rewriting yields the same file count. Don't compact.
- Time since last write. A partition that received 600 small files in the last hour is "hot" — wait, more files are coming. A partition that received its last file 24 hours ago is "cold" — compact now.
- Read frequency. A partition that has not been queried in 90 days does not benefit from compaction. The planner-overhead cost was hypothetical.
Iceberg's RewriteDataFiles action exposes these as policy parameters: min-input-files, target-file-size-bytes, max-file-group-size-bytes, partial-progress.enabled. Delta has OPTIMIZE with WHERE predicates. Hudi has the inline-vs-async clustering knob.
A common production policy that works on Razorpay-scale payments tables: run compaction nightly per partition with dt < today AND num_files > 8 AND avg_file_size < 64 MB. The "today" exclusion is critical — you do not compact a partition that is still receiving writes, because you'll be playing whack-a-mole all day.
A working compaction job
Below is a compaction loop that picks the most-fragmented partition, reads its files, rewrites them into one or two large files, and emits a metadata-style commit log. It uses pyarrow for the actual rewrite and a JSON file as a stand-in for the table format's manifest. The cost model in the comments is what a real Iceberg RewriteDataFiles action evaluates.
# compact.py — pick a partition, merge small files into target-sized ones.
import os, json, glob, time, shutil
from collections import defaultdict
import pyarrow.parquet as pq
import pyarrow as pa
ROOT = "/tmp/lake/payments"
TARGET_BYTES = 256 * 1024 * 1024 # 256 MB
MIN_INPUTS = 4 # only compact if at least 4 small files
SMALL_THRESH = TARGET_BYTES // 4 # 64 MB
def scan_partitions(root):
by_part = defaultdict(list)
for f in glob.glob(f"{root}/dt=*/part-*.parquet"):
part = os.path.basename(os.path.dirname(f)) # e.g. "dt=2026-04-24"
size = os.path.getsize(f)
by_part[part].append((f, size))
return by_part
def pick_target(by_part):
# Score = (num_small_files, total_small_bytes); higher first.
candidates = []
for part, files in by_part.items():
small = [(f,s) for f,s in files if s < SMALL_THRESH]
if len(small) >= MIN_INPUTS:
candidates.append((part, small,
len(small),
sum(s for _,s in small)))
candidates.sort(key=lambda x: (-x[2], -x[3]))
return candidates[0] if candidates else None
def compact(part, files):
# Read all small files into one Arrow table, then split-write to TARGET.
tables = [pq.read_table(f) for f,_ in files]
big = pa.concat_tables(tables)
rows_total = big.num_rows
bytes_est = sum(s for _,s in files)
rows_per_out = max(1, int(rows_total *
TARGET_BYTES / max(1, bytes_est)))
out_dir = f"{ROOT}/{part}"
out_idx, ts = 0, int(time.time())
for start in range(0, rows_total, rows_per_out):
chunk = big.slice(start, rows_per_out)
out = f"{out_dir}/compact-{ts}-{out_idx:04d}.parquet"
pq.write_table(chunk, out, compression="snappy")
out_idx += 1
# "Commit": move old files to a recycle bin atomically-ish.
bin_dir = f"{ROOT}/_recycle/{ts}/{part}"
os.makedirs(bin_dir, exist_ok=True)
for f,_ in files:
shutil.move(f, f"{bin_dir}/{os.path.basename(f)}")
return {"partition": part, "in_files": len(files),
"in_bytes": bytes_est, "out_files": out_idx,
"out_target_bytes": TARGET_BYTES,
"ts": ts}
if __name__ == "__main__":
parts = scan_partitions(ROOT)
pick = pick_target(parts)
if not pick:
print("nothing to compact"); raise SystemExit(0)
part, files, _, _ = pick
result = compact(part, files)
print(json.dumps(result, indent=2))
# Sample run on a partition with 12 files × 8 MB each:
{
"partition": "dt=2026-04-24",
"in_files": 12,
"in_bytes": 100663296,
"out_files": 1,
"out_target_bytes": 268435456,
"ts": 1714056234
}
The lines that matter: SMALL_THRESH = TARGET_BYTES // 4 is the policy lever — only files smaller than a quarter of the target are even considered. pick_target sorts by (num_small_files, total_small_bytes); this prioritises the most-fragmented partition first, which is the right answer when your maintenance window is bounded. rows_per_out = rows_total * TARGET_BYTES / bytes_est estimates how many rows produce one output file at the target size, assuming similar compression ratio across input files.

Why the rows-per-output formula is an estimate, not a guarantee: Parquet's compressed size depends on the data distribution within each file. Compacting files with very different cardinalities can produce output files that are 30% smaller or larger than the target. Real implementations iterate — they measure the first written file, adjust rows_per_out, and continue.
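That iterate-and-adjust loop can be sketched as a pure sizing function. write_file is a stand-in for whatever writes a row slice and reports its on-disk size (a pq.write_table wrapper, say); the function itself is hypothetical, not part of compact.py:

```python
# Measure-and-adjust output sizing: write one file, measure its actual
# compressed size, rescale the row budget toward the target, repeat.

def adaptive_split(total_rows: int, rows_per_out: int,
                   target_bytes: int, write_file) -> int:
    """Split total_rows into roughly target_bytes files.

    write_file(start, n) writes rows [start, start+n) and returns the
    bytes actually produced. Returns the number of output files.
    """
    start, files = 0, 0
    while start < total_rows:
        n = min(rows_per_out, total_rows - start)
        actual = write_file(start, n)
        # If the file came out 50% under target, pack ~2x more rows next time.
        rows_per_out = max(1, int(rows_per_out * target_bytes / max(1, actual)))
        start += n
        files += 1
    return files

# Simulated writer: 100 bytes per row. Start with a 5-row (500-byte)
# guess against a 1,000-byte target; the loop converges to 10-row files.
print(adaptive_split(100, 5, 1000, lambda s, n: n * 100))  # 11
```

The first file undershoots (500 bytes), the budget doubles to 10 rows, and the remaining 95 rows land in ten near-target files: 11 files total instead of twenty 500-byte ones.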
The "commit" here is faked — moving files to a recycle directory simulates the dereference-and-cleanup pattern. A real Iceberg compaction issues a replace_data_files operation against the catalog, which is one atomic metadata write. Concurrent readers see either the old files or the new files, never a mix.
What can go wrong
Compaction is one of the operations that "looks safe" but has sharp edges in production. The patterns:
- Compaction blocked by a long-running query. If a reader holds a snapshot from an hour ago, the old files cannot be expired even after a successful compaction. They become "dereferenced but not deleted" — still in S3, costing storage. The cleanup happens on EXPIRE_SNAPSHOTS, which respects the oldest live reader. A query that runs all night in a BI tool can pin storage that should have been freed.
- Skewed input partitions. Compacting a partition where one file is 8 GB and 30 files are 4 MB each gets you a single 8.1 GB output (the 8 GB file plus the merged 30) — which is over your target. You wanted to leave the 8 GB file alone.
- Schema drift between input files. If new columns were added between the time files were written, the merged Parquet schema is the union, and the older files have those columns as NULL. Iceberg handles this via field-id projection; raw pyarrow.concat_tables with promote=True mostly handles it but rejects type mismatches.
- Pending row-level deletes. On a merge-on-read Iceberg or Hudi table, compaction has to apply all the pending deletes before re-writing. Skip this and you re-introduce deleted rows. The Iceberg RewriteDataFiles action does it correctly; a hand-rolled compactor like ours above does not.
- Storage explosion during the rewrite. While compaction is in flight you have both old and new files on disk. Plan for ~2× peak storage on the partition being compacted, not 1×. On a 200 GB partition this is 400 GB of momentary S3 footprint.
Common confusions
- "Compaction reduces my storage." It does not, by itself — it rewrites the same bytes (with potentially better compression if the input files had under-filled column dictionaries). The storage drop happens later, when
EXPIRE_SNAPSHOTSdeletes the dereferenced old files. Until then, you have more storage during the compaction window. - "More files = more parallelism = better." Up to a point. With 200 executors and 200 files, parallelism is perfect. With 200 executors and 200,000 files, every executor processes 1,000 files and pays the per-file overhead 1,000 times. There is a sweet spot — usually around (executors × 2–4) files per partition.
- "Compaction is the same as Z-ordering." No. Compaction merges small files; Z-ordering reorders rows within files for data-skipping on multi-column predicates. They are often run together (
OPTIMIZE ... ZORDER BY), but they are different operations. Z-order without compaction does nothing for small-files cost. - "Streaming sinks should write 256 MB files directly to avoid compaction." They cannot — a stream commits a checkpoint-sized batch (often 30 seconds of data, a few MB), and waiting longer breaks freshness SLAs. The two-stage pattern (small files now, compaction later) is the design, not a workaround.
- "OPTIMIZE on Delta and RewriteDataFiles on Iceberg do the same thing." Mostly, but not quite. Delta's
OPTIMIZEalways uses a fixed target (1 GB) and is bin-packing only. Iceberg'sRewriteDataFilesexposes the strategy (binpackorsort) and target as parameters. Hudi splits compaction into "clustering" (rewrite) and "compaction" (apply log files to base files) — the words mean different things across formats. - "You can run compaction every 5 minutes for fresh data." You can run it; you should not. Each compaction commit creates a new snapshot. 5-minute compaction means 288 snapshots a day, which makes time-travel queries and snapshot expiry significantly more expensive. Daily or 4-hourly is the typical cadence for a streaming-fed table.
Going deeper
Bin-packing vs sort-based compaction
Iceberg's RewriteDataFiles supports two strategies. Bin-packing is the default: pick small files, merge them, write outputs at target size. Order-within-file is preserved (mostly, by ingestion order), which is fine for queries that scan whole partitions. Sort-based compaction additionally sorts each output file by a configured key — usually the same column you would Z-order on. This makes data-skipping work better on subsequent reads (column min/max statistics tighten), at the cost of a much more expensive rewrite (you are sorting potentially terabytes). The decision is: do queries on this table predicate on a high-cardinality column? If yes, sort-based pays. If no, bin-packing.
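Bin-packing itself is simple enough to sketch. This first-fit grouping is a simplification of the spirit of a binpack strategy; a real planner also caps group byte-counts and file-counts, which this ignores:

```python
# First-fit bin-packing sketch: sort files largest-first, then drop each
# into the first open group that still has room under the target size.
# Each resulting group becomes one compaction task / one output file.

def binpack(sizes: list, target: int) -> list:
    groups = []  # each group is a list of file sizes
    for s in sorted(sizes, reverse=True):
        for g in groups:
            if sum(g) + s <= target:
                g.append(s)
                break
        else:
            groups.append([s])  # no group had room: open a new one
    return groups

MB = 1024 * 1024
# Eight 32 MB files into a 256 MB target: exactly one full group.
print(len(binpack([32 * MB] * 8, 256 * MB)))   # 1
# Two 200 MB files: neither fits with the other, so two groups.
print(len(binpack([200 * MB] * 2, 256 * MB)))  # 2
```

Sorting largest-first is the classic first-fit-decreasing heuristic; it also means an oversized file lands alone in its own group rather than dragging small files over the target.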
Dremio-style file batching at write time
Some systems (Dremio Sonar, recent Spark with adaptive execution) try to write the right-sized files in the first place by batching at write time. The streaming sink buffers a target-size's worth of rows in memory before flushing one file, instead of flushing every micro-batch. This trades freshness (the last unflushed batch is invisible to queries until it lands) for fewer compactions. For a payments analytics table where queries care about minutes-old data, the trade-off works. For a fraud-detection table where queries care about second-old data, it does not. The general rule: write-time batching solves small files when freshness budget allows; compaction solves it when freshness budget does not.
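The buffering trade-off can be sketched in a few lines. The BatchingSink class, its flush callback, and the byte accounting are illustrative assumptions, not any particular product's API:

```python
# Write-time batching sketch: buffer rows until roughly a target size is
# reached, then flush one file. Rows sitting in the buffer are invisible
# to queries until the flush: that invisibility is the freshness cost.

class BatchingSink:
    def __init__(self, target_bytes: int, flush):
        self.target = target_bytes
        self.flush = flush          # callable that writes one file's rows
        self.buf, self.bytes = [], 0

    def write(self, rows: list, approx_bytes: int) -> None:
        self.buf.extend(rows)
        self.bytes += approx_bytes
        if self.bytes >= self.target:
            self.flush(self.buf)    # one right-sized file per flush
            self.buf, self.bytes = [], 0
```

A production sink would add a time-based flush (so a quiet stream still lands data) and spill the buffer to a write-ahead log for crash safety; the core trade of freshness for file size is the same.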
Compaction + GDPR delete: the interaction Aditi missed
When a user requests their data deleted (DPDPA in India, GDPR in EU), the lakehouse's job is to find every row matching that user-id and remove it. On a copy-on-write table this is "rewrite the partition without those rows". On a merge-on-read table this is "log a position-delete pointing at those rows; apply on read or at next compaction". Compaction now has a new responsibility: when it merges files, it must apply all pending deletes to the input rows, or those rows will reappear in the merged output. This is why a hand-rolled compactor is rarely production-safe — it is easy to forget the delete-application step, and the consequence is a regulatory breach, not just a query bug.
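The delete-application step can be sketched like this. The (file, row-position) delete-set shape mirrors position deletes on merge-on-read tables, but apply_position_deletes itself is a hypothetical helper, not a real table-format API:

```python
# Delete application during compaction: before merging a file's rows into
# the output, drop every row whose (file, position) pair appears in the
# pending position-delete set. Skipping this resurrects deleted rows.

def apply_position_deletes(file_path: str, rows: list, deletes: set) -> list:
    """rows: rows read from file_path, in file order.
    deletes: set of (file_path, row_position) pairs still pending."""
    return [row for pos, row in enumerate(rows)
            if (file_path, pos) not in deletes]

rows = [{"user": "a"}, {"user": "b"}, {"user": "c"}]
pending = {("part-0001.parquet", 1)}  # a delete targeting row 1 of this file
kept = apply_position_deletes("part-0001.parquet", rows, pending)
print([r["user"] for r in kept])  # ['a', 'c']
```

Note the positions are per input file: once the rows land in a merged output file, the old (file, position) pairs are meaningless, which is exactly why the deletes must be applied during the rewrite and not deferred past it.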
Why compaction is the failure mode of streaming-into-lakehouse
Most teams that move from a daily-batch ETL onto a streaming-into-Iceberg architecture hit the same wall around month 4. The first month, data lands and dashboards work. By month 4, file counts are in the millions, query latencies have crept up, and storage cost is mysteriously double what the data volume suggests. The fix is a compaction policy and an EXPIRE_SNAPSHOTS policy. The painful part is realising the team should have set both up on day 1 — the cost of retrofitting them on a 50 TB table that has been written to for four months is itself a multi-day rewrite. Set up compaction before you turn on streaming, not after.
Where this leads next
Compaction is one of the maintenance jobs that production lakehouses run alongside ingestion. The companion job, EXPIRE_SNAPSHOTS, removes the dereferenced old files compaction leaves behind — and is a separate sharp edge worth its own chapter at /wiki/snapshot-expiry-and-time-travel-cost. The next chapter on the build path, /wiki/iceberg-delta-hudi-from-the-producers-perspective, compares the three formats on every dimension that matters for a producer — including how each one wires up compaction.
If you want the read-side mirror of this chapter (how the planner uses the compaction's output via column statistics and partition pruning), read /wiki/parquet-end-to-end-what-you-write-what-you-get-back and the upcoming chapter on Z-ordering and data skipping.
References
- Apache Iceberg: RewriteDataFiles — the canonical compaction action, with all the policy parameters (target size, min input files, partial progress, sort/binpack strategy).
- Delta Lake: OPTIMIZE — Delta's compaction with bin-packing and Z-order options.
- Apache Hudi: clustering and compaction — Hudi's distinct treatment of file rewrite (clustering) vs delta-log apply (compaction).
- Netflix: Iceberg in production — the classic write-up on how Netflix runs compaction at trillion-row scale, including the interaction with snapshot expiry.
- Tabular: Why does my query slow down? (small files) — a series on operationalising Iceberg, with a clear breakdown of the planner-overhead model when file counts grow.
- Flipkart Engineering: streaming-to-lakehouse migration — Indian-context production write-up that explicitly calls out the compaction policy as the longest-tail debugging item in the cutover.
- /wiki/partition-evolution-and-the-rename-problem — the previous chapter; explains why splitting a partition into 64 buckets makes small-files cost worse before it gets better.
- /wiki/parquet-end-to-end-what-you-write-what-you-get-back — what the planner reads from each file's footer; explains the per-file overhead from the format side.