Compaction: small-files hell and how to avoid it

Kiran's stream from the BookMyShow ticketing pipeline writes one Parquet file per Kafka micro-batch — 30 seconds of events, about 4 MB per file. After a week the table has 20,000 files. After a month, 86,000. The dashboards that used to return in 800 ms now take 11 seconds. Nothing about the data has changed; nothing about the queries has changed; nothing about the cluster has changed. What changed is the count of files the planner has to enumerate, the count of S3 GETs the executor has to issue, and the count of TCP connections the readers have to multiplex. This is small-files hell, and it is the most predictable failure mode of streaming-into-a-lakehouse.

Object stores are not filesystems — every file the planner reads costs at least one round-trip to list it and another to open it, and the per-file overhead dominates query cost when files are small. Compaction is the maintenance job that merges many small files into a few large ones (~256 MB to 1 GB), reducing planner overhead and S3 cost. The hard parts are picking what to compact, doing it without breaking concurrent writers, and knowing when to stop.

Why small files hurt: the cost model is in the metadata, not the bytes

Reading 1 GB out of S3 costs roughly the same regardless of whether it lives in one file or 1,000 files — the transfer price is per byte and the wall time is dominated by sustained network throughput. So why is small-files hell a real problem? Because the planner cost is per-file, and the per-file cost is large.

For each file the engine touches, it has to: (1) issue a LIST to discover it (or read a manifest entry that mentions it); (2) issue a HEAD or GET to read the Parquet footer (the last ~8 KB of the file holds the schema, row groups, and column statistics); (3) optionally issue another GET for column-statistics-based pruning before deciding to read row data. That is two to three S3 round-trips per file, and on ap-south-1 from EMR each round-trip is around 25-40 ms.
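The arithmetic behind those numbers can be sketched as a toy cost model. The 30 ms round-trip, the two round-trips per file, and the 100 MB/s sustained throughput are assumptions taken from the figures in this section, not measurements:

```python
# Toy cost model: per-file metadata round-trips vs. byte transfer.
# rt_ms, rts_per_file, and mb_per_s are assumptions, not measured values.
def planner_metadata_ms(num_files, rt_ms=30, rts_per_file=2):
    """Wall-clock ms a serial planner spends on per-file round-trips."""
    return num_files * rts_per_file * rt_ms

def transfer_ms(total_bytes, mb_per_s=100):
    """Wall-clock ms to stream the bytes at sustained throughput."""
    return total_bytes / (mb_per_s * 1024 * 1024) * 1000

one_gb = 1024 ** 3
# Same 1 GB, two layouts:
small = planner_metadata_ms(1000) + transfer_ms(one_gb)  # 1,000 x 1 MB
large = planner_metadata_ms(1) + transfer_ms(one_gb)     # 1 x 1 GB
print(round(small), round(large))  # 70240 10300 -- metadata dominates left
```

At these assumed numbers the byte transfer is ~10 s either way; the 60 s of metadata round-trips is what separates the two layouts.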

[Figure: why small files hurt, per-file metadata overhead. Side-by-side, the same 1 GB in two layouts. Left, 1,000 files of 1 MB each: LIST root (1 RT), read 1,000 footers (1,000 RT), read 1,000 row groups (1,000 RT), roughly 2,001 RT × 30 ms ≈ 60 seconds of wall time, plus S3 GET cost of 2,000 × ₹0.0003 ≈ ₹0.6 per query, per executor (parallelism helps but does not erase it). Right, 1 file of 1 GB: LIST root, one footer read, one parallel range read, roughly 3 RT × 30 ms, under 200 ms, throughput-bound rather than round-trip-bound.]
The bytes are identical. The wall time differs by roughly 300× because the dominant cost is per-object round-trips rather than byte transfer; S3 also bills per object operation, so the money follows the same curve as the latency.

Why the per-file overhead does not disappear with parallelism: a 200-executor Spark cluster splits 1,000 files into 5 per executor, and each executor still pays the round-trip latency for every file it owns: roughly 5 files × 2 RTs × 30 ms ≈ 300 ms of pure metadata per executor before any row byte moves. Parallelism shrank the wall time from a serial 60 seconds to 300 ms, but the cluster still burned about 60 seconds of aggregate task-slot time, spread across 200 slots, on metadata work that produces zero output rows.

There is a second cost that is less visible: the manifest tree itself bloats. An Iceberg manifest entry per data file is roughly 1 KB. 100,000 files = 100 MB of manifests the planner reads on every query, even with predicate pushdown. The manifest list above it is also longer. The planning phase, which used to be 50 ms, becomes 800 ms — and that hits every query, including ones that ultimately read three rows.

There is a third cost that hits the writer side: commits get slow. A new Iceberg snapshot has to add the new files to the manifest tree, copy forward the old entries, and atomically swap the catalog pointer. With many small files, the manifest copy-forward cost grows. A streaming writer that commits every 30 seconds with a 100,000-file table now spends 4 seconds on the metadata write — longer than the actual data write.
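Back-of-envelope for that 4-second figure, treating the commit as dominated by rewriting the manifest entries. The ~1 KB-per-entry figure comes from the previous paragraph; the 25 MB/s sustained write throughput is an assumed number chosen for illustration:

```python
# Back-of-envelope: manifest copy-forward cost scales with live file entries.
ENTRY_BYTES = 1024   # ~1 KB per manifest entry (figure from the text)
WRITE_MBPS = 25      # assumed sustained metadata-write throughput

def commit_seconds(num_files):
    """Seconds to copy forward num_files manifest entries at WRITE_MBPS."""
    manifest_bytes = num_files * ENTRY_BYTES
    return manifest_bytes / (WRITE_MBPS * 1024 * 1024)

print(round(commit_seconds(100_000), 1))  # ~3.9 s for a 100,000-file table
```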

What compaction actually does

Compaction is the maintenance operation that takes a set of small files belonging to one logical group (a partition, a bucket, a Z-order range) and rewrites them as a smaller set of larger files. The output files have the same schema and the same partition values; the only thing that changed is the file boundary.

Mechanically, on a modern table format (Iceberg, Delta, Hudi), compaction is:

  1. Pick a target group — most often "all files in a partition smaller than target_file_size_bytes / 2".
  2. Read them concurrently, decode the Parquet, optionally apply pending row-level deletes (for merge-on-read tables; see Build 12).
  3. Re-write the rows into N output files, each close to the target size (commonly 256 MB or 512 MB; some shops go to 1 GB).
  4. Commit an atomic metadata operation that swaps the old files for the new files in the manifest. The old files get dereferenced and become eligible for EXPIRE_SNAPSHOTS cleanup later.

The atomic swap is the part that makes compaction safe to run concurrently with writers and readers. A reader that started before the compaction commit sees the old files (its snapshot is pinned). A reader that starts after sees the new files. No reader ever sees a mix. The swap is a metadata-only operation — the actual byte rewrite happened in step 3, but step 4 is the moment the change becomes visible.

Why this is a swap and not an in-place edit: object stores like S3 do not support partial updates of an object — you either write a new object or replace the whole one. So compaction is forced into a "write the new files first, then atomically point the manifest at them" pattern, which is also exactly the pattern that gives you snapshot isolation for free. The two properties (no in-place edits, snapshot isolation) come from the same constraint.
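A minimal sketch of the write-new-then-swap pattern, using a single JSON file as a stand-in for the catalog pointer. A real catalog swaps a pointer server-side; here os.replace plays the atomic step, and the manifest layout is invented for illustration:

```python
import json
import os
import tempfile

def commit_swap(manifest_path, remove_files, add_files):
    """Write a new manifest listing the post-compaction files, then
    atomically replace the old one. A reader that already opened the old
    manifest keeps its pinned file list; a new reader sees the new list.
    No reader ever sees a mix."""
    with open(manifest_path) as fh:
        files = set(json.load(fh)["data_files"])
    new_files = (files - set(remove_files)) | set(add_files)
    # New object first (the byte rewrite already happened elsewhere)...
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(manifest_path) or ".")
    with os.fdopen(fd, "w") as fh:
        json.dump({"data_files": sorted(new_files)}, fh)
    os.replace(tmp, manifest_path)  # ...then the atomic "step 4" swap
```

The same constraint that forbids editing the manifest in place is what makes the swap all-or-nothing.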

What compaction is not:

  1. It is not cleanup. The old files are only dereferenced; EXPIRE_SNAPSHOTS is the separate maintenance job that actually deletes them.
  2. It is not a schema or partition change. The output files carry the same schema and partition values as the inputs; only the file boundaries move.
  3. It is not deduplication or repair. Every input row comes out the other side (minus applied deletes on merge-on-read tables); data that was wrong before compaction is still wrong after, just in fewer files.

Picking what to compact, and when

The naïve "compact everything below 256 MB" rule looks fine on a whiteboard and is wrong in production. The cost model that drives a real compaction policy weighs three things: the rewrite cost (you pay to read and re-write every input byte, and the job competes with ingestion for cluster capacity), the per-query saving (round-trips eliminated, multiplied by how often the partition is actually queried), and the conflict risk (compacting a partition that is still receiving writes invites commit conflicts and endless re-compaction).

Iceberg's RewriteDataFiles action exposes these as policy parameters: min-input-files, target-file-size-bytes, max-file-group-size-bytes, partial-progress.enabled. Delta has OPTIMIZE with WHERE predicates. Hudi has the inline-vs-async clustering knob.

A common production policy that works on Razorpay-scale payments tables: run compaction nightly per partition with dt < today AND num_files > 8 AND avg_file_size < 64 MB. The "today" exclusion is critical — you do not compact a partition that is still receiving writes, because you'll be playing whack-a-mole all day.
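That policy is small enough to write down directly. A sketch, with the thresholds from the sentence above and all names hypothetical:

```python
from datetime import date

# Sketch of the nightly policy above: closed partitions only, more than
# 8 files, average file size under 64 MB. Thresholds mirror the text.
def should_compact(partition_dt, file_sizes, today=None,
                   min_files=8, avg_small=64 * 1024 * 1024):
    today = today or date.today()
    if partition_dt >= today:          # still receiving writes: skip
        return False
    if len(file_sizes) <= min_files:   # enforces num_files > 8
        return False
    return sum(file_sizes) / len(file_sizes) < avg_small

# A closed partition with 12 x 8 MB files qualifies; today's does not.
print(should_compact(date(2026, 4, 24), [8 * 2**20] * 12,
                     today=date(2026, 4, 25)))  # True
```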

[Figure: compaction lifecycle, a partition's file count over 25 hours of streaming plus one compaction. The count grows linearly from 0 to 12 small files across 24 hours of streaming batches, then a compaction job at hour 25 merges them into 2 large files and the count drops from 12 to 2 at the compaction commit.]
The shape that compaction policy is reacting to: file count grows linearly while a partition is being written, then drops sharply on the maintenance pass. The art is timing the drop.

A working compaction job

Below is a compaction loop that picks the most-fragmented partition, reads its files, rewrites them into one or two large files, and emits a metadata-style commit log. It uses pyarrow for the actual rewrite and a JSON file as a stand-in for the table format's manifest. The cost model in the comments is what a real Iceberg RewriteDataFiles action evaluates.

# compact.py — pick a partition, merge small files into target-sized ones.
import os, json, glob, time, shutil
from collections import defaultdict
import pyarrow.parquet as pq
import pyarrow as pa

ROOT          = "/tmp/lake/payments"
TARGET_BYTES  = 256 * 1024 * 1024     # 256 MB
MIN_INPUTS    = 4                      # only compact if at least 4 small files
SMALL_THRESH  = TARGET_BYTES // 4      # 64 MB

def scan_partitions(root):
    by_part = defaultdict(list)
    for f in glob.glob(f"{root}/dt=*/part-*.parquet"):
        part = os.path.basename(os.path.dirname(f))     # e.g. "dt=2026-04-24"
        size = os.path.getsize(f)
        by_part[part].append((f, size))
    return by_part

def pick_target(by_part):
    # Score = (num_small_files, total_small_bytes); higher first.
    candidates = []
    for part, files in by_part.items():
        small = [(f,s) for f,s in files if s < SMALL_THRESH]
        if len(small) >= MIN_INPUTS:
            candidates.append((part, small,
                               len(small),
                               sum(s for _,s in small)))
    candidates.sort(key=lambda x: (-x[2], -x[3]))
    return candidates[0] if candidates else None

def compact(part, files):
    # Read all small files into one Arrow table, then split-write to TARGET.
    tables = [pq.read_table(f) for f,_ in files]
    big    = pa.concat_tables(tables)
    rows_total = big.num_rows
    bytes_est  = sum(s for _,s in files)
    rows_per_out = max(1, int(rows_total *
                              TARGET_BYTES / max(1, bytes_est)))

    out_dir = f"{ROOT}/{part}"
    out_idx, ts = 0, int(time.time())
    for start in range(0, rows_total, rows_per_out):
        chunk = big.slice(start, rows_per_out)
        out = f"{out_dir}/compact-{ts}-{out_idx:04d}.parquet"
        pq.write_table(chunk, out, compression="snappy")
        out_idx += 1

    # "Commit": move old files to a recycle bin atomically-ish.
    bin_dir = f"{ROOT}/_recycle/{ts}/{part}"
    os.makedirs(bin_dir, exist_ok=True)
    for f,_ in files:
        shutil.move(f, f"{bin_dir}/{os.path.basename(f)}")

    return {"partition": part, "in_files": len(files),
            "in_bytes": bytes_est, "out_files": out_idx,
            "out_target_bytes": TARGET_BYTES,
            "ts": ts}

if __name__ == "__main__":
    parts = scan_partitions(ROOT)
    pick = pick_target(parts)
    if not pick:
        print("nothing to compact"); raise SystemExit(0)
    part, files, _, _ = pick
    result = compact(part, files)
    print(json.dumps(result, indent=2))
Sample run on a partition with 12 files × 8 MB each:
{
  "partition": "dt=2026-04-24",
  "in_files": 12,
  "in_bytes": 100663296,
  "out_files": 1,
  "out_target_bytes": 268435456,
  "ts": 1714056234
}

The lines that matter:

SMALL_THRESH = TARGET_BYTES // 4 is the policy lever — only files smaller than a quarter of the target are even considered.

pick_target sorts by (num_small_files, total_small_bytes); this prioritises the most-fragmented partition first, which is the right answer when your maintenance window is bounded.

rows_per_out = rows_total * TARGET_BYTES / bytes_est estimates how many rows produce one output file at the target size, assuming a similar compression ratio across input files.

Why the rows-per-output formula is an estimate, not a guarantee: Parquet's compressed size depends on the data distribution within each file. Compacting files with very different cardinalities can produce output files that are 30% smaller or larger than the target. Real implementations iterate — they measure the first written file, adjust rows_per_out, and continue.
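That measure-and-adjust loop can be sketched like this; write_chunk is a hypothetical stand-in for the actual Parquet write that reports the bytes it produced:

```python
# Sketch of the measure-and-adjust loop: write one output file, compare its
# actual size to the target, rescale rows_per_out, continue.
def adaptive_split(rows_total, initial_rows_per_out, target_bytes, write_chunk):
    start, rows_per_out, sizes = 0, initial_rows_per_out, []
    while start < rows_total:
        n = min(rows_per_out, rows_total - start)
        actual = write_chunk(start, n)  # hypothetical writer, returns bytes
        sizes.append(actual)
        # If the file came out 30% small, write ~30% more rows next time.
        rows_per_out = max(1, int(n * target_bytes / max(1, actual)))
        start += n
    return sizes
```

Each output after the first is corrected by the measured compression ratio of the one before it.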

The "commit" here is faked — moving files to a recycle directory simulates the dereference-and-cleanup pattern. A real Iceberg compaction issues a replace_data_files operation against the catalog, which is one atomic metadata write. Concurrent readers see either the old files or the new files, never a mix.

What can go wrong

Compaction is one of the operations that "looks safe" but has sharp edges in production. The patterns:

  1. Compacting the hot partition. A partition still receiving writes means every compaction commit races the streaming writer; one side loses, retries, and you re-compact the same partition all day.
  2. Forgetting pending deletes on merge-on-read tables. A compactor that does not apply position deletes while merging resurrects deleted rows in the output.
  3. Compacting without expiring. The old files are only dereferenced, not deleted; without an EXPIRE_SNAPSHOTS policy, storage grows with every compaction pass.
  4. Missing the target size. The rows-per-output estimate drifts with compression ratio, so a naive compactor can replace many small files with a few files that are themselves badly sized.

Common confusions

Compaction vs. snapshot expiry: compaction rewrites live data into bigger files; EXPIRE_SNAPSHOTS deletes the dead files compaction leaves behind. You need both, and they are separate jobs.

Compaction vs. write-time batching: batching at the sink prevents small files from being written in the first place; compaction repairs them after the fact. They trade against freshness differently.

Compaction vs. sorting: bin-packing compaction only moves file boundaries; it does not reorder rows, so on its own it does not improve data skipping the way sort-based compaction or Z-ordering does.

Going deeper

Bin-packing vs sort-based compaction

Iceberg's RewriteDataFiles supports two strategies. Bin-packing is the default: pick small files, merge them, write outputs at target size. Order-within-file is preserved (mostly, by ingestion order), which is fine for queries that scan whole partitions. Sort-based compaction additionally sorts each output file by a configured key — usually the same column you would Z-order on. This makes data-skipping work better on subsequent reads (column min/max statistics tighten), at the cost of a much more expensive rewrite (you are sorting potentially terabytes). The decision is: do queries on this table predicate on a high-cardinality column? If yes, sort-based pays. If no, bin-packing.

Dremio-style file batching at write time

Some systems (Dremio Sonar, recent Spark with adaptive execution) try to write the right-sized files in the first place by batching at write time. The streaming sink buffers a target-size's worth of rows in memory before flushing one file, instead of flushing every micro-batch. This trades freshness (the last unflushed batch is invisible to queries until it lands) for fewer compactions. For a payments analytics table where queries care about minutes-old data, the trade-off works. For a fraud-detection table where queries care about seconds-old data, it does not. The general rule: write-time batching solves small files when freshness budget allows; compaction solves it when freshness budget does not.
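A sketch of such a sink, assuming a hypothetical flush_fn that performs the actual file write; the point is that flushing is gated on buffered bytes, not on micro-batch boundaries:

```python
# Sketch of a write-time-batched sink: buffer micro-batches in memory and
# flush one file only when the buffered bytes reach the target size.
class BatchingSink:
    def __init__(self, target_bytes, flush_fn):
        self.target, self.flush_fn = target_bytes, flush_fn
        self.buf, self.buf_bytes = [], 0

    def write(self, batch, batch_bytes):
        self.buf.append(batch)
        self.buf_bytes += batch_bytes
        if self.buf_bytes >= self.target:      # one right-sized file...
            self.flush_fn(self.buf)
            self.buf, self.buf_bytes = [], 0   # ...instead of many tiny ones

    # Freshness cost: rows sitting in self.buf are invisible to queries
    # until the next flush lands.
```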

Compaction + GDPR delete: the interaction Aditi missed

When a user requests their data deleted (DPDPA in India, GDPR in EU), the lakehouse's job is to find every row matching that user-id and remove it. On a copy-on-write table this is "rewrite the partition without those rows". On a merge-on-read table this is "log a position-delete pointing at those rows; apply on read or at next compaction". Compaction now has a new responsibility: when it merges files, it must apply all pending deletes to the input rows, or those rows will reappear in the merged output. This is why a hand-rolled compactor is rarely production-safe — it is easy to forget the delete-application step, and the consequence is a regulatory breach, not just a query bug.
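The delete-application step can be sketched as a filter over (file, position) pairs before the merge. The structure mirrors position deletes on a merge-on-read table, but all names here are illustrative:

```python
# Sketch of the step a compactor must not skip: pending position deletes are
# (file_path, row_position) pairs; matching rows are dropped from each input
# file before the merge, or they reappear in the compacted output.
def apply_position_deletes(file_rows, position_deletes):
    """file_rows: {file_path: [row, ...]}; deletes: set of (path, pos)."""
    surviving = []
    for path, rows in file_rows.items():
        for pos, row in enumerate(rows):
            if (path, pos) not in position_deletes:
                surviving.append(row)
    return surviving  # only these rows may reach the compacted output
```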

Why compaction is the failure mode of streaming-into-lakehouse

Most teams that move from a daily-batch ETL onto a streaming-into-Iceberg architecture hit the same wall around month 4. The first month, data lands and dashboards work. By month 4, file counts are in the millions, query latencies have crept up, and storage cost is mysteriously double what the data volume suggests. The fix is a compaction policy and an EXPIRE_SNAPSHOTS policy. The painful part is realising the team should have set both up on day 1 — the cost of retrofitting them on a 50 TB table that has been written to for four months is itself a multi-day rewrite. Set up compaction before you turn on streaming, not after.

Where this leads next

Compaction is one of the maintenance jobs that production lakehouses run alongside ingestion. The companion job, EXPIRE_SNAPSHOTS, removes the dereferenced old files compaction leaves behind — and is a separate sharp edge worth its own chapter at /wiki/snapshot-expiry-and-time-travel-cost. The next chapter on the build path, /wiki/iceberg-delta-hudi-from-the-producers-perspective, compares the three formats on every dimension that matters for a producer — including how each one wires up compaction.

If you want the read-side mirror of this chapter (how the planner uses the compaction's output via column statistics and partition pruning), read /wiki/parquet-end-to-end-what-you-write-what-you-get-back and the upcoming chapter on Z-ordering and data skipping.
