Task dependencies: wait-for, fan-out, fan-in
The 200-line executor from the last chapter dispatches a task the moment all its parents reach SUCCESS. That rule is enough for a linear pipeline of six steps. Real pipelines are not linear. A task may depend on a file that some other team's pipeline drops at 3 a.m. — that is not a parent task, it is an external precondition. A task may need to run 200 times in parallel, once per merchant — that is not one task with parents, it is a fan-out. A task may need to wait for all 200 of those parallel tasks to finish before computing a global aggregate — that is fan-in. These three shapes — wait-for, fan-out, fan-in — make up almost every production DAG you will read at Razorpay, Swiggy, Flipkart, or anywhere else, and the executor handles each through a different small extension.
A wait-for dependency is a task whose entire body is "did the external thing happen yet?" — the executor calls it a sensor and reschedules it when the answer is no. A fan-out is dynamic task creation: one upstream produces N values, the scheduler materialises N task rows in the database, the executor dispatches them in parallel. A fan-in is the inverse: N siblings each have the same downstream child, and the child waits for all of them via the same NOT EXISTS … != 'SUCCESS' rule the executor already uses. None of the three needs new state — they are layouts of the same task and edge tables.
Three graph shapes, three problems
A real DAG mixes all three patterns inside one run. Picture a midnight pipeline at a payments company: a sensor waits for the bank's settlement file, a fan-out settles each merchant in parallel, and a fan-in rolls the results into a daily total.
The three patterns share the same dispatch rule from chapter 21 — a task is ready when all its parents are SUCCESS. What changes is how each pattern creates the parent set:
- Wait-for — the parent is a synthetic task whose body is "poll the outside world and either succeed or sleep". The executor needs to know that "did not succeed" means "try again later", not "fail".
- Fan-out — the parent set is not known at DAG-author time. A single upstream task computes how many children to create, then writes their task and edge rows on the fly. The executor's dispatch rule keeps working, because by the time it scans for ready tasks, the children exist in the database.
- Fan-in — the child has many parents and waits for all of them. The executor's existing NOT EXISTS query already does this; the only new thing is the SQL writer's instinct that a single child node is the right shape, not one chained child per parent.
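Chapter 21's exact query is not reproduced here, but the dispatch rule all three patterns lean on can be sketched in a few lines. The two-table schema and column names below are assumptions for illustration, not the chapter-21 schema verbatim:

```python
import sqlite3

# A task is ready when it is PENDING and no parent is in a non-SUCCESS state.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE task (run_id TEXT, name TEXT, state TEXT,
                   PRIMARY KEY (run_id, name));
CREATE TABLE edge (run_id TEXT, parent TEXT, child TEXT);
INSERT INTO task VALUES ('r1','a','SUCCESS'), ('r1','b','RUNNING'),
                        ('r1','c','PENDING'), ('r1','d','PENDING');
INSERT INTO edge VALUES ('r1','a','c'), ('r1','a','d'), ('r1','b','d');
""")

READY_SQL = """
SELECT t.name FROM task t
WHERE t.run_id = ? AND t.state = 'PENDING'
  AND NOT EXISTS (
      SELECT 1 FROM edge e
      JOIN task p ON p.run_id = e.run_id AND p.name = e.parent
      WHERE e.run_id = t.run_id AND e.child = t.name
        AND p.state != 'SUCCESS')
"""
ready = [r[0] for r in conn.execute(READY_SQL, ("r1",))]
print(ready)  # ['c'] — c's only parent succeeded; d still waits on b
```

All three patterns work by arranging rows so this one query does the right thing; none of them change the query itself.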
The next three sections build each pattern on top of the chapter-21 executor, in a way that adds together — by the end you have a single 80-line extension that handles all three.
Wait-for: the sensor pattern
A sensor is a task whose body is "poll an external condition; succeed if it is true, otherwise sleep and try again". The classic example is S3FileSensor in Airflow — wait until a specific S3 key exists. The next-most-classic is ExternalTaskSensor — wait until a different DAG's task has reached SUCCESS for the same logical run-date. Razorpay uses sensors to wait for daily NPCI settlement files; Swiggy uses them to wait for restaurant menu uploads from POS partners; every team uses them.
The naive shape is a sensor task whose Python body looks like this:
def sftp_file_arrived():
    if not sftp_client.exists("/incoming/2026-04-25/settlements.csv"):
        raise NotReady()  # treated as a transient failure → retry
    return  # success
That works, but it abuses the retry mechanism. The executor sees a NotReady exception, increments attempt, schedules a retry with exponential backoff. After three retries (2 + 4 + 8 = 14 seconds total) the task is FAILED and the rest of the DAG is UPSTREAM_FAILED. A sensor that needs to wait six hours for a 3 a.m. file is going to chew through retry budgets fast. The fix is a new task state — SENSING — that is identical to RETRYING except the attempt counter does not increment and max_attempts does not apply. The executor reschedules the sensor on a fixed interval (typically 60 seconds) until it either returns success or hits a timeout.
The schema change is two lines:
-- extend the task state CHECK constraint:
state TEXT NOT NULL CHECK (state IN
('PENDING','READY','RUNNING','SUCCESS','FAILED',
'UPSTREAM_FAILED','RETRYING','SENSING'));
ALTER TABLE task ADD COLUMN sensor_timeout_at TEXT;
The new sensor_timeout_at column is when the executor should give up and mark the sensor FAILED instead of rescheduling it. The standard default is 12 hours; cron-based daily pipelines often stretch it to 18 hours, so a file expected at midnight is still accepted if it lands at 3 p.m. that day, but anything after the 6 p.m. cutoff is not.
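Stamping the give-up time is one line of date arithmetic at first dispatch. A sketch, assuming chapter 21's convention of storing timestamps as ISO-8601 text (the function name is illustrative):

```python
from datetime import datetime, timedelta, timezone

def sensor_timeout_at(started_at, timeout_seconds=12 * 3600):
    """Absolute ISO-8601 time after which the sensor is FAILED, not rescheduled."""
    return (started_at + timedelta(seconds=timeout_seconds)).isoformat()

start = datetime(2026, 4, 25, 0, 0, tzinfo=timezone.utc)
print(sensor_timeout_at(start))  # 2026-04-25T12:00:00+00:00
```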
The executor extension is one new method and a small change to _run_task:
class NotReady(Exception):
    """Raised by a sensor when the condition is not yet true."""
    def __init__(self, recheck_in_seconds: int = 60):
        self.recheck_in = recheck_in_seconds

@dataclass
class TaskDef:
    name: str
    fn: Callable[[], None]
    parents: List[str] = field(default_factory=list)
    max_attempts: int = 3
    is_sensor: bool = False
    sensor_timeout_seconds: int = 12 * 3600  # 12-hour default
def _run_task(self, name: str):
    # ... PENDING/RETRYING → RUNNING transition as before ...
    try:
        self.tasks[name].fn()
        self._transition(name, "RUNNING", "SUCCESS", ended_at=now())
    except NotReady as nr:
        # Sensor: re-check after the requested interval
        if self.tasks[name].is_sensor:
            timeout_at = self._sensor_timeout_at(name)
            if timeout_at and timeout_at < datetime.utcnow():
                self._transition(name, "RUNNING", "FAILED",
                                 ended_at=now(), error="sensor timeout")
                self._propagate_failure(name)
            else:
                next_at = (datetime.utcnow()
                           + timedelta(seconds=nr.recheck_in)).isoformat()
                self._transition(name, "RUNNING", "SENSING",
                                 next_retry_at=next_at)
        else:
            # Non-sensor: NotReady is a programming error
            self._transition(name, "RUNNING", "FAILED",
                             ended_at=now(),
                             error="NotReady raised by non-sensor task")
    except Exception as e:
        # ... existing retry logic ...
The SENSING state participates in the dispatcher's _ready_to_dispatch — when next_retry_at expires, the sensor moves back to PENDING and the next loop pass picks it up. Why a separate SENSING state and not just RETRYING with max_attempts = inf: observability. An on-call dashboard wants to show "3 sensors waiting, 1 task retrying because of an actual error" — those are different operational situations. A sensor that has been waiting 6 hours is healthy; a task that has been retrying for 6 hours is sick. The state distinction is what separates them in the UI and the alerting rules.
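The "SENSING moves back to PENDING when next_retry_at expires" step is a single UPDATE the dispatcher runs each pass. A runnable sketch, assuming ISO-8601 text timestamps and a minimal illustrative table (column names are assumptions):

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE task (run_id TEXT, name TEXT, state TEXT,
                                   next_retry_at TEXT)""")
past = (datetime.now(timezone.utc) - timedelta(seconds=5)).isoformat()
future = (datetime.now(timezone.utc) + timedelta(seconds=60)).isoformat()
conn.execute("INSERT INTO task VALUES ('r1','s1','SENSING',?)", (past,))
conn.execute("INSERT INTO task VALUES ('r1','s2','SENSING',?)", (future,))

def requeue_expired_sensors(conn, run_id):
    # ISO-8601 strings in the same timezone compare correctly as text.
    now = datetime.now(timezone.utc).isoformat()
    conn.execute("""UPDATE task SET state='PENDING', next_retry_at=NULL
                    WHERE run_id=? AND state='SENSING'
                      AND next_retry_at <= ?""", (run_id, now))

requeue_expired_sensors(conn, "r1")
states = dict(conn.execute("SELECT name, state FROM task"))
print(states)  # s1 is PENDING again; s2 keeps sleeping
```

Once s1 is PENDING, the ordinary ready-task query dispatches it on the next pass — no sensor-specific dispatch path exists.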
A representative log of the sensor for an NPCI file:
$ python dag_exec.py settlements
-> wait_for_npci_file SENSING (recheck in 60s)
[60s]
-> wait_for_npci_file SENSING (recheck in 60s)
[60s]
... 47 minutes later ...
-> wait_for_npci_file SUCCESS
-> load_file RUNNING
-> ...
The downstream load_file task does not start until the sensor transitions out of SENSING. The executor's existing parent-state check — NOT EXISTS … WHERE p.state != 'SUCCESS' — already excludes SENSING (it is not SUCCESS), so no dispatch logic changes for the children. The whole sensor extension is the new state, the new exception class, and the modified exception branch in _run_task — about 30 extra lines.
Fan-out: dynamic task generation
A static DAG is one where every node is known at DAG-author time. A real-world DAG often has nodes that depend on data: "settle each merchant who had transactions today" produces a different number of tasks every day. On Republic Day or Diwali the number of active Razorpay merchants might be 8.2 lakh; on a normal Tuesday it might be 5.3 lakh. Hardcoding "one task per merchant" in the DAG file is impossible because the count is unknown until list_merchants finishes.
The pattern is dynamic task mapping: an upstream task produces a list of values, and the scheduler materialises one downstream task per value. Airflow added task.expand() for this in version 2.3 (2022); Dagster has had dynamic_outputs since launch; Prefect's .map() predates both. They all do the same thing under the hood — write new rows to the task and edge tables after the upstream completes.
The schema does not need to change. What changes is the dispatcher's flow: after an upstream task succeeds, a hook gets a chance to insert new task and edge rows before the next dispatch pass scans for ready work.
The TaskDef extension is one optional field — a callable that, given the upstream's result, returns the list of child tasks to create:
@dataclass
class TaskDef:
    name: str
    fn: Callable
    parents: List[str] = field(default_factory=list)
    max_attempts: int = 3
    is_sensor: bool = False
    sensor_timeout_seconds: int = 12 * 3600
    fan_out: Optional[Callable[[], List["TaskDef"]]] = None

def _on_task_success(self, name: str):
    """Hook that runs after a task transitions to SUCCESS.
    If the task has a fan_out callable, materialise the children."""
    td = self.tasks.get(name)
    if td is None or td.fan_out is None:
        return
    children = td.fan_out()
    with connect() as c:
        for child in children:
            c.execute("""INSERT OR IGNORE INTO task VALUES
                         (?,?,'PENDING',0,?,NULL,NULL,NULL,NULL,NULL)""",
                      (self.run_id, child.name, child.max_attempts))
            c.execute("""INSERT OR IGNORE INTO edge VALUES (?,?,?)""",
                      (self.run_id, name, child.name))
            self.tasks[child.name] = child
The INSERT OR IGNORE clause is the idempotency guard. Why INSERT OR IGNORE and not plain INSERT: a crash recovery may invoke _on_task_success a second time after a restart — the source task is already SUCCESS, the children are already in the database. A plain INSERT would raise a UNIQUE constraint violation and crash the executor; INSERT OR IGNORE makes the hook idempotent. Same primitive that makes _transition idempotent — at-least-once execution wrapped in a safe no-op for the second-and-subsequent calls.
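The no-op-on-replay behaviour is easy to see in isolation. A minimal demonstration (table shape simplified to illustrate the primary-key conflict):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE task (run_id TEXT, name TEXT, state TEXT,
                                   PRIMARY KEY (run_id, name))""")

# Simulate the success hook firing twice after a crash-restart:
for _ in range(2):
    conn.execute(
        "INSERT OR IGNORE INTO task VALUES ('r1','settle_M_000001','PENDING')")

count = conn.execute("SELECT COUNT(*) FROM task").fetchone()[0]
print(count)  # 1 — the second insert hit the primary key and was ignored
```

A plain INSERT in the same loop would raise sqlite3.IntegrityError on the second pass, which is exactly the crash the hook must survive.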
A worked example: settling 1247 merchants in parallel.
def list_merchants():
    # In real life, query an OLTP DB; here, just drop a file
    today = datetime.utcnow().date().isoformat()
    merchants = [f"M_{i:06d}" for i in range(1247)]
    Path(f"/tmp/{today}_merchants.json").write_text(json.dumps(merchants))

def make_settle_tasks():
    today = datetime.utcnow().date().isoformat()
    merchants = json.loads(Path(f"/tmp/{today}_merchants.json").read_text())
    return [TaskDef(name=f"settle_{m}", fn=lambda m=m: settle_one(m),
                    parents=["list_merchants"]) for m in merchants]

def settle_one(merchant_id):
    # The actual per-merchant settlement work — typically 0.5–3 seconds
    time.sleep(random.uniform(0.5, 3.0))

tasks = [
    TaskDef("list_merchants", list_merchants, fan_out=make_settle_tasks),
    TaskDef("total_revenue", compute_total, parents=[]),  # fan-in below
]
A sample run on a 4-thread executor takes about nine minutes (1247 tasks × 1.75 s mean / 4 parallel ≈ 545 s). On a 16-thread executor it takes 137 seconds; on a Kubernetes pool with 80 workers it is 27 seconds. The fan-out is what makes the per-merchant work a knob the operator can tune, not a property of the DAG file.
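The arithmetic behind those numbers is worth making explicit, because it is the knob: total work divided by worker count, ignoring dispatch overhead (an assumption — real runs add scheduling latency on top):

```python
def fan_out_runtime_seconds(n_tasks, mean_task_seconds, workers):
    """Idealised wall-clock time for an embarrassingly parallel fan-out."""
    return n_tasks * mean_task_seconds / workers

for workers in (4, 16, 80):
    secs = fan_out_runtime_seconds(1247, 1.75, workers)
    print(f"{workers:3d} workers -> {secs:7.1f} s")
```

The DAG file never changes between those three rows; only the worker pool does.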
The output shows what the database looks like:
sqlite> SELECT state, COUNT(*) FROM task WHERE run_id='2026-04-25' GROUP BY state;
SUCCESS | 1249
sqlite> SELECT name FROM task WHERE name LIKE 'settle_%' LIMIT 5;
settle_M_000000
settle_M_000001
settle_M_000002
settle_M_000003
settle_M_000004
The 1247 children plus list_merchants and total_revenue make 1249 rows. Every one was created at runtime by make_settle_tasks reading the JSON list — none was hardcoded.
Fan-in: when one task waits for many
Fan-in is the syntactically obvious case — give a task many parents and the existing dispatch rule already does the right thing. The interesting question is when not to use a single fan-in node, because the wrong shape can make a 6-minute pipeline 6 hours long.
TaskDef("total_revenue", compute_total,
        parents=[f"settle_{m}" for m in merchants])  # 1247 parents
This works. The executor's _ready_tasks query will check 1247 parent rows for total_revenue on every dispatch pass, find one that is not SUCCESS, and skip it; when the last parent finishes, the same query returns total_revenue as ready. Each check costs ~1 ms on SQLite for 1247 rows thanks to the idx_task_state index, so a single fan-in node with _ready_tasks running on every state change is fine up to about 10 000 children. Beyond that, the parent-set check starts to dominate the executor's runtime and you want a different shape — a tree of fan-ins, where 100 child groups feed 10 mid-level aggregators, which feed 1 top aggregator. The total work is the same; the per-step parent count drops by an order of magnitude.
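Building the tree shape is a few lines of grouping. A sketch — the partial_total_* and total_revenue names are illustrative, and the output is (name, parents) pairs rather than full TaskDefs to keep it self-contained:

```python
def tree_fan_in(child_names, group_size=100):
    """Group N siblings under mid-level aggregators so no single node
    carries the full parent set. Returns (mid_level, top) as
    (name, parents) pairs."""
    mids = []
    for i in range(0, len(child_names), group_size):
        group = child_names[i:i + group_size]
        mids.append((f"partial_total_{i // group_size:03d}", group))
    top = ("total_revenue", [name for name, _ in mids])
    return mids, top

children = [f"settle_M_{i:06d}" for i in range(1247)]
mids, top = tree_fan_in(children)
print(len(mids), max(len(p) for _, p in mids), len(top[1]))
# 13 mid-level nodes, widest parent set 100, top node has 13 parents
```

The 1247-parent check becomes thirteen 100-parent checks plus one 13-parent check — same aggregation, an order of magnitude less JOIN cardinality per dispatch decision.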
The wrong shape is chained fan-in — making settle_M_001 a parent of settle_M_002, and settle_M_002 a parent of settle_M_003, and so on, then having total_revenue only depend on the last one in the chain. This serialises 1247 tasks that should run in parallel: a run that finishes in about nine minutes on four threads stretches to 36 minutes (1247 × 1.75 s = 2182 s ≈ 36 min). The author of such a DAG usually meant "in order, please" but did not realise that ordering on a fan-out is rarely needed — most settlements are independent of each other.
Why fan-in does not need a new state or new SQL: the existing rule "ready iff every parent is SUCCESS" works for any number of parents. The only thing that scales with parent count is the dispatcher's per-pass query cost, and with a covered index that is a JOIN whose cardinality is the parent count — sub-millisecond for a 5-parent task and still sub-millisecond for a 1000-parent task. Both compute readiness with the same query.
A subtle question: what should total_revenue do if 30 of its 1247 parents are FAILED? Three policies are common in production:
- all_success (default). Any failed parent → total_revenue becomes UPSTREAM_FAILED. Strict; the aggregate is wrong if anyone failed.
- all_done. As long as every parent is in a terminal state (SUCCESS, FAILED, or UPSTREAM_FAILED), run total_revenue and let it decide. Lenient; the aggregate task is responsible for filtering out failed merchants.
- one_success. Run as soon as any parent succeeds. Useful for "try mirror A, mirror B, mirror C; whichever works wins" patterns.
Airflow calls these trigger_rules; Dagster calls them dependency rules; Prefect uses task-level boolean flags. The executor extension is a single column on the edge:
ALTER TABLE edge ADD COLUMN trigger_rule TEXT NOT NULL DEFAULT 'all_success';
And one new clause in _ready_tasks:
-- For each parent, the parent's state must satisfy the edge's trigger_rule.
-- all_success: parent.state = 'SUCCESS'
-- all_done: parent.state IN ('SUCCESS','FAILED','UPSTREAM_FAILED')
-- one_success: at least one parent.state = 'SUCCESS' (different shape)
The one_success rule is a different shape — it is "at least one" rather than "all", which inverts the NOT EXISTS to EXISTS. Most production DAGs default to all_success; teams reach for all_done when the aggregate is robust to missing inputs (e.g. a daily summary that should ship even if one merchant's pipeline failed) and one_success for failover patterns.
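The three predicates can be expressed as one readiness function. A sketch against a simplified table (this chapter's executor keeps the rule on the edge; the single-rule-per-child simplification here is an assumption for brevity):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE task (name TEXT PRIMARY KEY, state TEXT);
CREATE TABLE edge (parent TEXT, child TEXT, trigger_rule TEXT);
INSERT INTO task VALUES ('p1','SUCCESS'), ('p2','FAILED'), ('agg','PENDING');
INSERT INTO edge VALUES ('p1','agg','all_done'), ('p2','agg','all_done');
""")

TERMINAL = {"SUCCESS", "FAILED", "UPSTREAM_FAILED"}

def is_ready(conn, child):
    rules = {r for (r,) in conn.execute(
        "SELECT DISTINCT trigger_rule FROM edge WHERE child=?", (child,))}
    parents = [s for (s,) in conn.execute(
        """SELECT p.state FROM edge e JOIN task p ON p.name = e.parent
           WHERE e.child=?""", (child,))]
    if rules == {"all_success"}:
        return all(s == "SUCCESS" for s in parents)       # NOT EXISTS shape
    if rules == {"all_done"}:
        return all(s in TERMINAL for s in parents)        # NOT EXISTS shape
    if rules == {"one_success"}:
        return any(s == "SUCCESS" for s in parents)       # EXISTS shape
    raise ValueError(f"mixed or unknown trigger rules: {rules}")

print(is_ready(conn, "agg"))  # True — both parents terminal under all_done
```

Under all_success the same row set would return False (p2 is FAILED), and the child would instead be swept into UPSTREAM_FAILED by the propagation logic.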
Where it goes wrong
Each of the three patterns has a characteristic failure mode that a 3 a.m. on-call needs to recognise.
Sensor stuck forever. A sensor that polls an external system can be stuck not because the system is slow but because the check is wrong. Common case: the file path includes a date, the date arithmetic is wrong (UTC vs IST, or off-by-one on month boundaries), the sensor is polling a path the file will never appear at. The fix is in the timeout — every sensor should have a hard upper bound (12 hours is the usual default), and a sensor that hits the timeout pages on-call rather than failing silently. Razorpay's settlement sensors have a 14-hour timeout because NPCI files can land as late as 2 p.m. IST; anything later is genuinely an outage.
Fan-out exploding. A list_merchants that returns 12 million rows because of a missing WHERE active = 1 clause produces a fan-out of 12 million tasks. The executor will dutifully insert 12 million rows into task and 12 million into edge, exhaust SQLite's lock timeout, and fall over. The defence is a hard cap on fan-out width — refuse to materialise more than (say) 50 000 children for one parent. The cap should be loud: log the violation, fail the upstream, page on-call. Swiggy's restaurant-onboarding pipeline added this guard in 2024 after a bad merchant-status-flag rollout fanned out into 4× normal width and brought down the scheduler.
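The cap itself is a few lines in front of the materialisation loop. A sketch — MAX_FAN_OUT, the exception name, and the 50 000 default are illustrative choices, not chapter-21 API:

```python
MAX_FAN_OUT = 50_000  # hard cap on children per parent (illustrative default)

class FanOutTooWide(Exception):
    """Raised instead of flooding the task table with millions of rows."""

def guard_fan_out(parent_name, children):
    if len(children) > MAX_FAN_OUT:
        # Loud failure: the parent fails, on-call gets paged, nothing inserts.
        raise FanOutTooWide(
            f"{parent_name} tried to create {len(children)} children "
            f"(cap {MAX_FAN_OUT}); refusing to materialise")
    return children

normal = guard_fan_out("list_merchants", [f"M_{i}" for i in range(1247)])
try:
    guard_fan_out("list_merchants", list(range(60_000)))
    blocked = False
except FanOutTooWide:
    blocked = True
print(len(normal), blocked)
```

Wired into _on_task_success, the guard runs on the fan_out result before the first INSERT, so a runaway list_merchants fails its own task rather than the scheduler.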
Fan-in serialising under the hood. A task with 1247 parents looks parallel, but if the dispatcher's _ready_tasks query on each state change is slow (no index on task.state, no index on edge.parent, the join is wrong) then the dispatcher's overhead becomes the bottleneck — workers sit idle waiting for the next dispatch decision. Always check that idx_task_state and idx_edge_parent exist before scaling fan-in beyond a few hundred. The Airflow scheduler's "parsing" phase (which builds the equivalent in-memory graph) is the same kind of cost — Airflow 2.7 added incremental graph parsing to keep this from dominating large deployments.
Common confusions
- "A sensor is the same as a task with retries." A retry is for transient errors that should be recovered with backoff and an attempt cap; a sensor is for an external precondition that may take hours or never. They share machinery (rescheduling) but the policy differs: retries are bounded (max_attempts), sensors are time-bounded (sensor_timeout). Mixing them up means either a sensor gives up too early or a real failure retries forever. Keep the states separate (RETRYING vs SENSING) for both correctness and observability.
- "Fan-out is the same as fan-in." They are mirror images at the topology level but operationally opposite. Fan-out is a write (insert N children); fan-in is a read (check N parents). A bug in fan-out fails loudly (insert errors, lock timeouts); a bug in fan-in fails silently (the child is stuck PENDING because of one stale parent row). Different debugging tools.
- "The executor needs to be aware of fan-out at the dispatch layer." It does not. Fan-out is a hook that runs in the success path, materialises rows in the existing task and edge tables, and then disappears. The dispatcher's main loop is unchanged — it scans the (now larger) task table on the next iteration and finds new PENDING rows. The cleanest extensions to a scheduler are the ones that touch only the data, not the control flow.
- "trigger_rule = all_done means a task always runs." Only if every parent reaches a terminal state. A parent stuck in RETRYING keeps the child PENDING. The rule is "all parents are done", not "ignore parent state". For "always run regardless" you need a separate state or a sentinel parent that is artificially marked SUCCESS.
- "You should always use trees of fan-ins to keep parent count low." Most fan-ins are fine at any reasonable count. The tree-of-fan-ins shape is a real fix only when you genuinely have tens of thousands of siblings and the dispatcher's _ready_tasks query is showing up as a hot path in profiling. Premature decomposition adds DAG nodes (more rows, more state to manage) for no operational benefit.
- "Fan-out children share retry budget with their parent." They do not — each child task has its own attempt counter and max_attempts. A fan-out of 1247 settlements with max_attempts = 3 per child gives the system 3741 total attempt-slots, not 3. This is usually what you want (one merchant's API blip should not exhaust the whole pipeline's retry budget) but it does mean a systematically flaky downstream can chew through 3741 retries before the DAG fails — bound the total runtime with an SLA, not just per-task retry caps.
Going deeper
Sensor reschedule mode vs poking mode
Two implementation strategies for sensors that look identical from the DAG author's view but cost very differently in production. Poking mode keeps the sensor in RUNNING state and the worker thread alive between checks (the worker's body is while True: if check(): return; time.sleep(60)). Reschedule mode is what the executor in this chapter does: the sensor returns to the dispatcher between checks, the worker thread is freed, the next check is a fresh dispatch. Poking mode wastes a worker slot for the entire wait; reschedule mode wastes nothing. On a Kubernetes Airflow deployment with 50 worker pods, 30 sensors waiting for SFTP files in poking mode would consume 30 pods (60 % of capacity) for hours; in reschedule mode they consume zero. Airflow added mode='reschedule' for sensors in 2018 specifically to recover from the "all the workers are sensors" deadlock — every team learns to set it on day two.
The Razorpay merchant-settlement DAG
Razorpay's daily settlement pipeline (as documented in their 2024 engineering blog) processes 8.2 lakh active merchants on a peak day. The shape is exactly the three-pattern DAG above: a sensor for the NPCI settlement file (timeout 14 hours), a load_npci_file extract task, a list_merchants upstream that produces N merchant IDs from the day's order log, a fan-out into N settle_<merchant> tasks, and a daily_total fan-in. The fan-out width on Republic Day 2024 was 9.4 lakh; the pipeline ran in 23 minutes on a 200-pod Kubernetes worker pool. The interesting choice in their design: each settle_<merchant> task has max_attempts = 5 rather than the default 3, because settlement involves 4 dependent network calls (UPI ack, ledger update, RBI compliance log, merchant notification) and a retry of any one is cheap relative to the cost of failure (a missed settlement is a customer-support ticket). The 5-attempt budget × 9.4 lakh tasks = 47 lakh attempt-slots; the day-of-Republic-Day actually used 11.2 lakh attempts — roughly one in eight tasks retried at least once. That is the kind of throughput a fan-out architecture is forced into to handle both transient failure and Indian-scale concurrency.
Why Airflow's TaskGroup is not a fan-out primitive
Airflow has two superficially similar features: TaskGroup (a visual grouping of tasks for the UI) and dynamic task mapping (task.expand()). Confusing them is common — both group related tasks. The difference: TaskGroup is static (you write the group's contents at DAG-author time); task.expand() is dynamic (the count is read at runtime). A monthly report DAG with 50 hardcoded categories is a TaskGroup. A daily settlement DAG with N variable merchants is task.expand(). Picking TaskGroup for a dynamic count means hardcoding the upper bound and accepting that DAG re-deploys are required when the bound changes; picking task.expand() for a static count means accepting unnecessary runtime overhead. Read the Airflow source for task mapping (expand) if you want the exact mechanism — the scheduler materialises new task_instance rows at runtime once the upstream's output is known, which is the same hook the executor in this chapter exposes via fan_out.
Fan-out without ordering: the "no two writers to the same key" rule
A fan-out is parallel by design; the executor makes no guarantee about the order of execution among siblings. If two siblings write to the same key (same merchant ID, same row in a warehouse table), you have a race condition — last writer wins, and "last" is unpredictable. The defence is a partition contract: each sibling writes to a key it alone owns. settle_M_001 writes to merchant M_001's row and nobody else's; settle_M_002 writes to M_002. With this contract there is no race; without it, a fan-out is a bug factory. Idempotent writes (chapter 13) plus partition discipline are what make fan-out safe at scale; both are policies the DAG author enforces, not mechanisms the executor provides.
Where this leads next
- Retries, timeouts, and poisoned tasks — chapter 23, where the policy questions the sensor and retry primitives raise get answered.
- Backfills: re-running history correctly — chapter 24, what changes when you replay these three patterns over 90 days of historical run-dates.
- Airflow vs Dagster vs Prefect: the real design differences — chapter 26, how the three production schedulers each implement these primitives.
The three patterns plus the 200-line executor cover roughly 90 % of pipeline shapes you will see in the wild. The remaining 10 % — recursive fan-outs, conditional branches that depend on data, cross-DAG dependencies — are extensions of the same dispatch rule.
References
- Airflow dynamic task mapping (expand) — the production-grade reference for fan-out.
- Airflow sensors and reschedule mode — why poking mode is a deadlock waiting to happen.
- Dagster dynamic outputs — the asset-graph framing of fan-out.
- Razorpay engineering: scaling daily settlement — Razorpay's published view of their settlement DAG architecture.
- Prefect 2.0 .map() and .unmapped() — the third major scheduler's take on fan-out.
- Apache Airflow trigger rules — all_success, all_done, one_success, and the seven less common ones.
- Writing a DAG executor in 200 lines — chapter 21, the executor this chapter extends.
- The DAG as the right abstraction — chapter 20, the abstraction every pattern in this chapter is a layout of.