Your first pipeline: CSV → CSV in 50 lines
Riya works at a Bengaluru fintech that ships refunds on UPI. Every morning at 06:00 IST, the ops team needs a CSV of yesterday's refunds — joined with the merchant's GST state, deduped against any retries, and ready to mail. Yesterday she did it by hand in Excel and got it wrong. Today she will write the pipeline. Fifty lines, three primitives, one file on disk.
This chapter is the smallest pipeline that is honestly a pipeline. It runs end to end. It can be re-run after a crash without poisoning the output. It logs what it did. The shape you see here — Extract reads from one file, Transform applies the business rule, Load writes atomically to a final file — is the shape you will keep for every pipeline you ever ship, from this fifty-line version to the 100-node Airflow DAG at Razorpay.
A real pipeline reads input, transforms in pure functions, and writes output atomically — even at fifty lines. Keep Extract thin and defensive, Transform pure and replayable, Load atomic and idempotent. The fifty-line shape scales to the production shape because every production pipeline is just this skeleton with more layers, not a different skeleton.
What we are building
Riya has two CSV files dropped into /data/in/ every night by the platform team:
- `refunds_2026-04-24.csv` — one row per refund initiated yesterday, with `refund_id`, `merchant_id`, `amount_paise`, `initiated_at`, `status`.
- `merchants.csv` — reference data with `merchant_id`, `merchant_name`, `gst_state`, `kyc_status`.
The ops team needs /data/out/refunds_settled_2026-04-24.csv containing only the refunds whose status == "SETTLED", joined with the merchant's name and gst_state, with the amount converted to rupees and rows deduped on refund_id.
The job is small enough that you could write it in ten minutes as one giant function. That is exactly what your last-week self would have done, and exactly the script that will page you at 02:14 a month from now. Instead, write it as three primitives.
The fifty lines
Here is the entire pipeline. Save it as `pipeline.py`, drop the two input CSVs into `/data/in/`, and run `python pipeline.py 2026-04-24`.
```python
# pipeline.py — Riya's daily refund pipeline, fifty lines, three primitives.
import sys, os, csv, hashlib
from pathlib import Path

IN_DIR = Path("/data/in")
OUT_DIR = Path("/data/out")
TMP_DIR = Path("/data/_staging")

# ---------- Extract ----------
def extract_refunds(run_date: str):
    path = IN_DIR / f"refunds_{run_date}.csv"
    if not path.exists():
        raise FileNotFoundError(f"missing input: {path}")
    with path.open() as f:
        rows = list(csv.DictReader(f))
    expected = {"refund_id", "merchant_id", "amount_paise",
                "initiated_at", "status"}
    missing = expected - set(rows[0].keys())
    if missing:
        raise RuntimeError(f"schema drift in refunds: {missing}")
    return rows

def extract_merchants():
    with (IN_DIR / "merchants.csv").open() as f:
        return {r["merchant_id"]: r for r in csv.DictReader(f)}

# ---------- Transform ----------
def transform(refunds, merchants, run_id: str):
    seen, out = set(), []
    for r in refunds:
        if r["status"] != "SETTLED":
            continue
        if r["refund_id"] in seen:
            continue
        seen.add(r["refund_id"])
        m = merchants.get(r["merchant_id"], {})
        amount_inr = int(r["amount_paise"]) / 100
        dedup = hashlib.sha1(
            f'{r["refund_id"]}|{r["merchant_id"]}|{r["amount_paise"]}'
            .encode()).hexdigest()[:12]
        out.append({
            "refund_id": r["refund_id"],
            "merchant_id": r["merchant_id"],
            "merchant_name": m.get("merchant_name", "UNKNOWN"),
            "gst_state": m.get("gst_state", "UNKNOWN"),
            "amount_inr": f"{amount_inr:.2f}",
            "initiated_at": r["initiated_at"],
            "_dedup_key": dedup,
            "_run_id": run_id,
        })
    return out

# ---------- Load ----------
def load(rows, run_date: str):
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    TMP_DIR.mkdir(parents=True, exist_ok=True)
    final = OUT_DIR / f"refunds_settled_{run_date}.csv"
    tmp = TMP_DIR / f"{run_date}.{os.getpid()}.csv"
    with tmp.open("w", newline="") as f:
        w = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        w.writeheader()
        w.writerows(rows)
    os.replace(tmp, final)  # atomic on POSIX
    print(f"[load] wrote {len(rows):,} rows -> {final}")

# ---------- Orchestrate ----------
if __name__ == "__main__":
    run_date = sys.argv[1]  # "2026-04-24"
    run_id = f"{run_date}T06:00:00+05:30"
    refunds = extract_refunds(run_date)
    merchants = extract_merchants()
    rows = transform(refunds, merchants, run_id)
    load(rows, run_date)
```
Sample run, given 42,318 input refunds and 8,42,167 merchants:

```
$ python pipeline.py 2026-04-24
[load] wrote 28,914 rows -> /data/out/refunds_settled_2026-04-24.csv
$ head -2 /data/out/refunds_settled_2026-04-24.csv
refund_id,merchant_id,merchant_name,gst_state,amount_inr,initiated_at,_dedup_key,_run_id
rfd_8a1f9c,m_44912,Asha Kirana Stores,Karnataka,1247.50,2026-04-24T11:42:18Z,7e2c91a6b3df,2026-04-24T06:00:00+05:30
```
Walk through the load-bearing lines.
- `expected = {"refund_id", ...}` and the `missing` check are the schema-drift guardrail. The platform team can rename a column without warning you; the pipeline must fail loudly here, not silently produce garbage.
- `extract_merchants` returns a dict keyed by `merchant_id` rather than a list — the join in Transform becomes an O(N) lookup instead of an O(N×M) scan.
- `if r["status"] != "SETTLED"` and `if r["refund_id"] in seen` are the business rule and the dedup, in that order; both belong in Transform because they need the full window of a run, not row-at-a-time.
- `hashlib.sha1(...).hexdigest()[:12]` stamps every output row with a content hash so a downstream consumer can tell whether a row changed without comparing every column.
- `os.replace(tmp, final)` is the most important line in the file — it is atomic on POSIX, meaning a reader of `final` will always see either the old version or the new version, never a half-written one. Without it, a consumer reading at 06:00:00.5 while you are still writing at 06:00:00.7 sees a truncated CSV.
Why os.replace and not shutil.move: shutil.move falls back to copy+delete across filesystems, which is not atomic. os.replace either succeeds atomically (same filesystem) or raises. Keeping IN_DIR, OUT_DIR, and TMP_DIR on the same volume lets you rely on a single rename(2) syscall. Cross-volume staging needs a different pattern (S3 CopyObject, two-phase commit, etc.).
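The staging-then-rename pattern is small enough to exercise in isolation. A minimal sketch (the helper name and paths are mine, not from the pipeline):

```python
import os
import tempfile
from pathlib import Path

def publish_atomically(text: str, final: Path, staging_dir: Path) -> None:
    """Write to a staging file, then rename onto the final path in one atomic step."""
    staging_dir.mkdir(parents=True, exist_ok=True)
    tmp = staging_dir / f"{final.name}.{os.getpid()}.tmp"
    tmp.write_text(text)
    # One rename(2) when both paths share a filesystem; raises OSError
    # (rather than silently copying) when they do not.
    os.replace(tmp, final)

base = Path(tempfile.mkdtemp())
final = base / "out" / "report.csv"
final.parent.mkdir()
publish_atomically("a,b\n1,2\n", final, base / "_staging")
publish_atomically("a,b\n3,4\n", final, base / "_staging")  # republish: same invariant
print(final.read_text())  # a reader only ever saw a complete file
```

A second publish over the same final path is safe for the same reason the first is: the rename replaces the whole file in one step.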
Why dedup uses a content hash and not just the refund_id: in the dataset above, the same refund_id may appear twice with different amount_paise if the platform replayed an event with a fixed value. Dedup on just refund_id would silently drop the corrected row. Hashing the tuple (refund_id, merchant_id, amount_paise) lets the dedup distinguish "exact replay" from "logical update", and the _dedup_key column gives downstream consumers a stable identity to MERGE on.
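The behaviour is cheap to check in isolation: an identical tuple collapses, a corrected amount survives. Here `dedup_key` is my name for the inline expression in `transform`:

```python
import hashlib

def dedup_key(refund_id: str, merchant_id: str, amount_paise: str) -> str:
    # Same tuple -> same key (exact replay); changed amount -> new key (logical update).
    raw = f"{refund_id}|{merchant_id}|{amount_paise}"
    return hashlib.sha1(raw.encode()).hexdigest()[:12]

replay  = dedup_key("rfd_1", "m_1", "5000")   # upstream re-emitted the same event
again   = dedup_key("rfd_1", "m_1", "5000")
updated = dedup_key("rfd_1", "m_1", "5100")   # platform replayed with a fixed amount

assert replay == again     # exact replay collapses to one row
assert replay != updated   # the corrected row passes through the dedup
```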
Why each primitive looks the way it looks
The Extract functions are deliberately boring. extract_refunds opens one file, reads rows, checks columns, returns. extract_merchants opens one file, builds a dict, returns. There is no business logic, no joining, no amount_paise / 100. If the input file is missing, Extract raises. If the schema drifted, Extract raises. Otherwise, Extract is a straight read with one validation.
This is the entire shape of a production Extract: pull a slice, validate the boundary, hand the data off. The fact that this version reads from disk instead of Postgres or S3 changes the body of the function, not the shape. When you replace csv.DictReader(f) with psycopg2.connect(...) in chapter 11, the wrapper around it stays identical.
The Transform function is a pure function of three inputs (refunds, merchants, run_id) and one output (the list of dicts). It does not call the network. It does not read the system clock — run_id is passed in. It does not have credentials. The whole function can be unit-tested with two hand-built lists of dicts and a fake run_id. Run it with last Tuesday's data and you get last Tuesday's output, identical.
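That claim is cheap to verify. A sketch of such a unit test, with `transform` inlined from the listing above so the file runs standalone (in a real repo you would `from pipeline import transform` instead):

```python
import hashlib

# transform() from pipeline.py, inlined so this test runs as a standalone file.
def transform(refunds, merchants, run_id):
    seen, out = set(), []
    for r in refunds:
        if r["status"] != "SETTLED":
            continue
        if r["refund_id"] in seen:
            continue
        seen.add(r["refund_id"])
        m = merchants.get(r["merchant_id"], {})
        amount_inr = int(r["amount_paise"]) / 100
        dedup = hashlib.sha1(
            f'{r["refund_id"]}|{r["merchant_id"]}|{r["amount_paise"]}'
            .encode()).hexdigest()[:12]
        out.append({
            "refund_id": r["refund_id"],
            "merchant_id": r["merchant_id"],
            "merchant_name": m.get("merchant_name", "UNKNOWN"),
            "gst_state": m.get("gst_state", "UNKNOWN"),
            "amount_inr": f"{amount_inr:.2f}",
            "initiated_at": r["initiated_at"],
            "_dedup_key": dedup,
            "_run_id": run_id,
        })
    return out

# Hand-built fixtures: one settled refund emitted twice, one pending refund.
refunds = [
    {"refund_id": "r1", "merchant_id": "m1", "amount_paise": "12550",
     "initiated_at": "2026-04-24T10:00:00Z", "status": "SETTLED"},
    {"refund_id": "r1", "merchant_id": "m1", "amount_paise": "12550",
     "initiated_at": "2026-04-24T10:00:00Z", "status": "SETTLED"},  # retry duplicate
    {"refund_id": "r2", "merchant_id": "m9", "amount_paise": "900",
     "initiated_at": "2026-04-24T11:00:00Z", "status": "PENDING"},  # filtered out
]
merchants = {"m1": {"merchant_id": "m1", "merchant_name": "Asha Kirana Stores",
                    "gst_state": "Karnataka", "kyc_status": "VERIFIED"}}

rows = transform(refunds, merchants, run_id="2026-04-24T06:00:00+05:30")
assert len(rows) == 1                       # dedup collapsed the retry; PENDING dropped
assert rows[0]["amount_inr"] == "125.50"    # paise -> rupees, two decimals
assert rows[0]["gst_state"] == "Karnataka"  # joined from the reference dict
```

No filesystem, no clock, no network: the whole test is dicts in, dicts out.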
The Load function does the one thing that matters for correctness — atomic publication. Write to a temp file in /data/_staging/, then os.replace it onto /data/out/refunds_settled_<date>.csv. A consumer running cat /data/out/refunds_settled_2026-04-24.csv while the pipeline is mid-write sees yesterday's file, complete. A second after rename, they see today's file, complete. Never a torn half-write. This is the same invariant that Postgres uses for pg_dump, that Kafka uses for log segment rotation, and that S3 uses for PutObject: publication is one atomic step at the end, not a stream of partial writes.
Why staging is in a different directory and not the same one as final: if tmp is in OUT_DIR, a consumer who lists OUT_DIR mid-run sees a stray file with a name they were not expecting. Keeping a separate _staging/ directory means consumers can confidently ls /data/out/ and see exactly the files that are meant to be there. The convention scales: in S3, this becomes _staging/ keys vs final/ keys; in Iceberg, _metadata.json.tmp vs _metadata.json on commit.
What this pipeline still does not handle
Fifty lines is not nothing, but it is not yet a production pipeline. Here is what is still missing and which build addresses it.
| Gap | Symptom | Build that fixes it |
|---|---|---|
| Crash mid-Transform leaves no trace | Re-running starts from scratch but you have no idea where it failed | Build 1, ch. 4 (crashing mid-run) |
| No retry on Extract failure | A blip on the input filesystem fails the whole run | Build 1, ch. 6 + Build 2 (retry budgets) |
| Nothing schedules it | Riya runs `python pipeline.py` from her laptop at 06:00 — once she's on leave, no refunds | Build 1, ch. 7 (cron); Build 4 (DAGs) |
| No idempotency at the destination | If you re-run, you overwrite — fine for one CSV, broken for warehouse loads | Build 2 (idempotency by construction) |
| No observability | When ops complains the file is "wrong", you cannot prove what input you had | Build 5 (lineage and contracts) |
| Schema evolution | Tomorrow the platform team adds a `refund_reason` column; the dedup hash must adapt | Build 3 + Build 5 |
| Late events | A refund initiated yesterday that settles today still belongs in yesterday's file | Build 3 (incremental processing, watermarks) |
You will spend the next 130 chapters filling each gap. None of them are a different shape — they are layers on top of this same Extract / Transform / Load skeleton. The fifty-line script is the seed crystal.
Edge cases this skeleton already handles (and the ones it does not)
Run the pipeline against a few adversarial inputs and watch which ones it survives. This is the cheapest way to build intuition for what the three-primitive shape buys you and where it stops.
Empty refunds file. The DictReader yields zero rows, the expected - set(rows[0].keys()) line raises IndexError because rows[0] does not exist. That is a genuine bug in the fifty-line version — the production fix is to validate via csv.reader headers separately, then iterate. Worth fixing now: change rows[0].keys() to read the header row once before iterating, and short-circuit cleanly when there is nothing to process.
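One way to make that fix, a sketch with a hypothetical name `extract_refunds_safe`: validate the header via `DictReader.fieldnames` before touching any rows.

```python
import csv
from pathlib import Path

EXPECTED = {"refund_id", "merchant_id", "amount_paise", "initiated_at", "status"}

def extract_refunds_safe(path: Path):
    """Check the header first, so an empty or header-only file fails
    (or short-circuits) cleanly instead of raising IndexError on rows[0]."""
    with path.open() as f:
        reader = csv.DictReader(f)
        header = set(reader.fieldnames or [])  # fieldnames is None for a zero-byte file
        missing = EXPECTED - header
        if missing:
            raise RuntimeError(f"schema drift in refunds: {missing}")
        return list(reader)  # zero data rows is a valid, empty run
```

A zero-byte file now surfaces as schema drift (no header at all), and a header-only file returns an empty list that Transform and Load can handle or skip explicitly.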
Duplicate refund_id with the same amount. The dedup hash is identical, so _dedup_key in seen triggers and the row is dropped. Output is correct: one row per logical refund. This is the case the dedup was built for.
Duplicate refund_id with a different amount_paise. The two rows produce two different dedup hashes, so both pass through. Whether that is right depends on the business semantics — if the second row is the correct amount and the first was a bug, you want the second; if both are real (e.g. partial refunds with the same parent ID), you want both with a more specific natural key. Build 2 covers the natural-key vs surrogate-key choice.
Merchant missing from merchants.csv. The row passes through with merchant_name="UNKNOWN" and gst_state="UNKNOWN". The pipeline does not crash — it produces a row that the ops team can spot and fix tomorrow. The alternative (raising on missing merchant) would page Riya at 03:00 because of one stale merchant ID; tagging-and-continuing is the kinder default for reference-data joins. Build 5 wraps this in a contract — "we expect missing-merchant rate to be < 0.1%; alert if higher."
Disk full during Load. csv.DictWriter raises OSError mid-write. Because the staging file is in /data/_staging/, the consumer's view of /data/out/ is unchanged — they still see yesterday's file. The half-written staging file is orphaned, which a separate cleanup job (or a find /data/_staging -mtime +1 -delete cron) sweeps. The atomic-publish invariant survives even partial Load failures.
Two pipelines running concurrently for the same run_date. Cron mis-fires, or an operator runs the pipeline manually while the scheduled run is still going. Both write to /data/_staging/ (different files, because of os.getpid() in the staging filename), then both call os.replace. Whichever finishes second wins the final filename; the first run's output is silently overwritten. This is fine if the pipeline is deterministic — same input, same output — and a serious bug if the input has changed between the two starts. Build 4's scheduler adds run-level locking; for now, the convention is "one operator at a time" and a clear log line on start so the second run notices the first.
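Until that scheduler exists, an `O_EXCL` lockfile is a cheap stopgap. A sketch, with the helper name and lock directory as my own choices:

```python
import os, tempfile
from pathlib import Path

def acquire_run_lock(lock_dir: Path, run_date: str):
    """Atomically create a per-run lockfile; return None if another run holds it."""
    lock_dir.mkdir(parents=True, exist_ok=True)
    lock = lock_dir / f"{run_date}.lock"
    try:
        # O_CREAT | O_EXCL: creation succeeds for exactly one process, or raises.
        fd = os.open(lock, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return None
    os.write(fd, str(os.getpid()).encode())  # record the owner for debugging
    os.close(fd)
    return lock

locks = Path(tempfile.mkdtemp())
mine = acquire_run_lock(locks, "2026-04-24")
other = acquire_run_lock(locks, "2026-04-24")  # a concurrent run gets None and exits
assert mine is not None and other is None
mine.unlink()  # release when the run finishes; stale locks need a sweep, like staging files
```

Like the staging directory, the lock directory needs an orphan sweep for runs that crash before `unlink`; that is the price of file-based locking, and one of the reasons Build 4 moves locking into the scheduler.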
An input row has a comma inside merchant_name. csv.DictReader handles RFC-4180 quoted fields correctly, so "Asha Kirana, Stores" parses as one field. csv.DictWriter re-quotes on output. This is one of the few places where the standard library quietly does the right thing without you asking. It is also why hand-rolling CSV parsing with line.split(",") is a recurring class of production bug at companies that learned from Excel rather than from RFCs.
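A quick round-trip in the standard library confirms it:

```python
import csv, io

# Write a field containing a comma, then read it back.
buf = io.StringIO()
w = csv.DictWriter(buf, fieldnames=["merchant_id", "merchant_name"])
w.writeheader()
w.writerow({"merchant_id": "m_1", "merchant_name": "Asha Kirana, Stores"})

text = buf.getvalue()
assert '"Asha Kirana, Stores"' in text  # DictWriter applied RFC-4180 quoting
rows = list(csv.DictReader(io.StringIO(text)))
assert rows[0]["merchant_name"] == "Asha Kirana, Stores"  # one field, comma intact
```

A `line.split(",")` parser would have seen three fields on that row.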
Common confusions
- "Pandas would be cleaner — why use raw `csv` and dicts?" Pandas would be cleaner for the transform, and for any production version of this pipeline you would reach for it (or PyArrow). Raw `csv` is used here so the pipeline runs on a stock Python install with zero dependencies — useful when you are the only engineer and your laptop's `pip` is broken at 02:00. The shape (Extract → Transform → Load) is independent of the library; chapter 11 swaps `csv.DictReader` for Postgres, and the wrapper survives.
- "`os.replace` is the same as `mv`." They are nearly the same — `mv` calls `rename(2)` underneath when source and destination are on the same filesystem. The trap: when they are not on the same filesystem, `mv` falls back to copy + delete, which is not atomic. `os.replace` raises `OSError` instead of falling back, which is what you want — a silently non-atomic move is the kind of bug you find six months later when a partial file ends up in production.
- "The pipeline is idempotent because re-running produces the same output." Re-running with the same input produces the same output, yes — but that is determinism, not idempotency. True idempotency means re-running is safe at the destination: a second run does not double-insert, does not corrupt existing rows, does not leave an inconsistent state. With a CSV and `os.replace`, you get this almost for free. With a Postgres `INSERT INTO`, you do not — Build 2 covers what changes.
- "Schema drift detection belongs in Transform." It belongs in Extract. The whole point of Extract is to fail loudly at the boundary so Transform can assume clean inputs. If schema drift first surfaces in Transform, you are left guessing whether the bug is in your join logic or in the upstream data — exactly the ambiguity Extract exists to remove.
- "The dedup is unnecessary because input has no duplicates." Today, maybe. The dedup costs almost nothing and protects against three classes of bug: a retry from an upstream extract that double-emits; a manual re-run that concatenates two days of input; a bug in the source system that emits two rows with the same ID. The first time one of these happens at 02:14, the dedup is what saves you. Pay it forward.
- "Why is `run_id` passed into Transform — can't it just compute it?" Because `datetime.now()` inside Transform makes it not a pure function, which means you cannot replay it. Pass `run_id` from the orchestrator (the `if __name__ == "__main__"` block) and Transform stays a function of its arguments. This rule is non-negotiable; Build 2, chapter 14 makes it formal.
Going deeper
From fifty lines to fifty thousand: what stays, what changes
Walk this fifty-line pipeline forward to a real production shape — the Razorpay refunds pipeline that runs every fifteen minutes on 100M events/day. Everything that changes lives below; everything not mentioned stays as-is.
The Extract changes from csv.DictReader to a CDC stream from a Postgres replica (Build 11). The Transform stays a pure function — but it runs on Apache Beam (Build 10) so it can scale across workers, with the same shape preserved. The Load changes from os.replace to a transactional MERGE into Iceberg (Build 12) on S3, with the atomic-publish invariant carried by Iceberg's snapshot commit instead of POSIX rename. The orchestrator changes from if __name__ == "__main__" to an Airflow DAG (Build 4) that runs every fifteen minutes, retries on failure, and emits SLA breaches to PagerDuty.
Notice what did not change: the three-primitive shape, the run_id parameter, the dedup hash, the schema-drift check at the boundary, the staging-then-publish pattern. Those concepts are the architecture; the libraries are implementation.
Why dedup happens once, and where
A common bug pattern in inherited pipelines: dedup is done in three places and disagrees in two of them. Once at extract, once at transform, once at load, with different keys. The result is rows that disappear or rows that double, depending on the run.
The discipline: dedup has exactly one home, and it is Transform. Extract should not dedup because it sees data row-at-a-time and cannot know whether a duplicate will arrive later in the run. Load should not dedup because by the time data reaches Load, the dedup decision has already been made — Load's job is to write what Transform produced, faithfully and atomically. If the destination can express idempotent writes (Postgres MERGE, Kafka idempotent producer), Load uses that primitive to handle retries, not its own dedup. The general rule: window-level checks belong in Transform, exactly once.
The run_id is the lineage primitive
The single most-undervalued line in the fifty-line pipeline is run_id = f"{run_date}T06:00:00+05:30". It does three things at once:
- Stamps every output row with the run that produced it, so you can answer "which run wrote this?" months later — the foundation of column-level lineage in Build 5.
- Seeds determinism. Random sampling, hash bucketing, and even `_staging/` filenames can be derived from `run_id`, so retries produce the same intermediate keys.
- Becomes the partition key when the destination grows up. The fifty-line CSV uses `run_date` as a filename suffix; the production version uses `run_id` as an Iceberg partition; the Kafka version uses `run_id` as the message key for idempotent producers. Same primitive, different surfaces.
Pick the format early and keep it consistent. ISO-8601 with timezone (2026-04-24T06:00:00+05:30) is the format almost every modern data platform parses correctly. Avoid epoch milliseconds — they read poorly in logs and break in spreadsheets.
A useful convention adopted by several Indian fintechs: pin the run timestamp to a fixed local time of day (06:00:00+05:30), not the wall clock of the executor. That way two retries on the same logical day produce the same run_id, which keeps the partition deterministic across retries. The actual start time goes into a separate started_at log field for diagnostics. Mixing "when did this run logically belong to" and "when did the process start" into the same timestamp is one of the silent ways pipelines develop time-zone-shaped bugs over years.
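A sketch of the convention (the helper name `make_run_id` is mine; only the `run_id` string appears in the fifty-line script):

```python
from datetime import datetime, timedelta, timezone

IST = timezone(timedelta(hours=5, minutes=30))

def make_run_id(run_date: str, started_at: datetime) -> dict:
    """Pin the logical run time to 06:00 IST; keep the wall-clock start separately."""
    logical = datetime.fromisoformat(run_date).replace(hour=6, tzinfo=IST)
    return {"run_id": logical.isoformat(), "started_at": started_at.isoformat()}

# Two retries on the same logical day share a run_id; only started_at differs.
first = make_run_id("2026-04-24", datetime(2026, 4, 24, 6, 0, 12, tzinfo=IST))
retry = make_run_id("2026-04-24", datetime(2026, 4, 24, 9, 41, 3, tzinfo=IST))
assert first["run_id"] == retry["run_id"] == "2026-04-24T06:00:00+05:30"
assert first["started_at"] != retry["started_at"]
```

The partition stays deterministic across retries because `run_id` is a function of `run_date` alone; the diagnostic timestamp lives in its own field.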
What the production version of this exact pipeline looks like at scale
At Razorpay, the production refunds pipeline runs every 15 minutes (not nightly), processes ~5 lakh refunds per window during peak business hours, and lands in three destinations simultaneously: an Iceberg table on S3 for analytics, a Postgres table for the ops dashboard, and a Kafka topic for downstream fraud-scoring. The Indian refund volume is heavily skewed — Big Billion Days at Flipkart (a downstream merchant) sends 14× normal traffic for 4 hours; Diwali week sees ₹2,000+ crore in refunds across the platform.
The pipeline still has three primitives. Extract is now Debezium reading the Postgres write-ahead log. Transform is a Beam job that joins against the merchant table (loaded into Beam state from a daily snapshot, refreshed on schema change). Load is a multi-sink: Iceberg writer + Postgres MERGE + Kafka idempotent producer, fanned out from a single transform output. The orchestrator is Airflow with sensors that wait for the Postgres replica to be caught up. The fifty-line shape is recognisable inside the 5,000-line shape, exactly like the SHA-256 hash function is recognisable inside the BitTorrent protocol.
The cost picture is also worth carrying forward. The fifty-line version costs Riya nothing — it runs on a laptop in 0.5 seconds. The Razorpay-scale version costs roughly ₹4–6 lakh per month (S3 storage + EMR compute + warehouse storage + monitoring). What you pay for at scale is not the throughput — modern hardware processes lakhs of rows per second per core. You pay for the operational layer: retries, lineage, monitoring, on-call, schema evolution, backfills. That is exactly the layer Builds 2 through 5 build out.
Diagnostics: what to log when this pipeline runs
The fifty-line version prints one line at the end. A production version emits structured events at every primitive boundary:
```
extract.start    run_id=2026-04-24T06:00:00+05:30
extract.done     run_id=... rows=42318 ms=412
transform.start  run_id=...
transform.done   run_id=... rows_in=42318 rows_out=28914 dropped=13404 ms=89
load.start       run_id=... target=/data/out/refunds_settled_2026-04-24.csv
load.done        run_id=... rows=28914 bytes=4218914 ms=37
pipeline.done    run_id=... total_ms=552 status=ok
```
These are the seven events that make up run_id-keyed lineage. Build 5 turns them into a queryable lineage graph; Build 14 turns them into p99 latency dashboards. For now, even printing them to stderr means that when ops asks "why was the file empty?" you can grep one log file and answer in thirty seconds instead of three hours.
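Even in the fifty-line version, a ten-line helper gets most of the value. A sketch (the `emit` helper is mine; event names follow the list above):

```python
import sys
import time

def emit(event: str, run_id: str, **fields) -> str:
    """Print one key=value line per primitive boundary, greppable by run_id."""
    kv = " ".join(f"{k}={v}" for k, v in fields.items())
    line = f"{event} run_id={run_id}" + (f" {kv}" if kv else "")
    print(line, file=sys.stderr)
    return line

run_id = "2026-04-24T06:00:00+05:30"
t0 = time.monotonic()
emit("extract.start", run_id)
# ... extract_refunds(run_date) would run here ...
emit("extract.done", run_id, rows=42318, ms=int((time.monotonic() - t0) * 1000))
```

Writing to stderr keeps the events out of any stdout the pipeline might pipe elsewhere, and the `key=value` shape greps cleanly long before Build 5 turns it into a lineage graph.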
Where this leads next
Chapter 4 takes this exact pipeline and crashes it halfway through Transform. You will walk through what state is left on disk, which retry strategies are safe, and why "just re-run it" is a correct answer for this script but a dangerous one for tomorrow's pipeline. Chapter 5 names the three walls every Build-1 pipeline runs into — the gaps in the table above — and sets up Build 2 (idempotency).
- Crashing mid-run: which state are you in?
- The three walls: no idempotency, no observability, no schedule
- Idempotency by construction — Build 2, where the dedup-hash pattern from this chapter graduates into a first-class technique.
- The three primitives: extract, transform, load — chapter 2, the conceptual frame this pipeline embodies.
- What makes a data pipeline different from a script — chapter 1, the night-by-night list of failure modes this fifty-line skeleton begins to address.
Build 6 (chapters 67–77) returns to atomic publication with Parquet and Iceberg; the staging-rename pattern here is the in-the-small version of Iceberg's snapshot-commit pattern. Build 7 (chapters 78–87) shows what changes when the destination is a Kafka topic instead of a file — the same atomic-publish invariant, expressed through idempotent producers and transactional commits.
References
- Designing Data-Intensive Applications, Chapter 10: Batch Processing — Martin Kleppmann, O'Reilly 2017. The chapter that walks the same E/T/L decomposition for systems at terabyte scale.
- Functional Data Engineering — Maxime Beauchemin, 2018. The argument for pure-function transforms; this chapter's `transform()` is the smallest possible instance of that argument.
- The Linux `rename(2)` man page — the syscall that backs `os.replace`. Reading it is the fastest way to understand why atomic publication is free on POSIX and not free everywhere else.
- Python `os.replace` documentation — the cross-platform wrapper, with the explicit guarantee that it raises rather than falling back to copy.
- Razorpay engineering: scaling payments data engineering — the production version of this pipeline, at 1B events/day.
- Apache Iceberg's snapshot-commit semantics — the production-scale equivalent of the staging-rename pattern. Worth reading once now and again after Build 12.
- The append-only log: simplest store — the cross-domain gold-standard chapter. Same underlying invariant: a writer publishes once, a reader sees a consistent view always.
- pandas `merge` documentation — `validate="many_to_one"` is the production version of the join discipline shown in this chapter's Transform.