Task dependencies: wait-for, fan-out, fan-in

The 200-line executor from the last chapter dispatches a task the moment all its parents reach SUCCESS. That rule is enough for a linear pipeline of six steps. Real pipelines are not linear. A task may depend on a file that some other team's pipeline drops at 3 a.m. — that is not a parent task, it is an external precondition. A task may need to run 200 times in parallel, once per merchant — that is not one task with parents, it is a fan-out. A task may need to wait for all 200 of those parallel tasks to finish before computing a global aggregate — that is fan-in. These three shapes — wait-for, fan-out, fan-in — make up almost every production DAG you will read at Razorpay, Swiggy, Flipkart, or anywhere else, and the executor handles each through a different small extension.

A wait-for dependency is a task whose entire body is "did the external thing happen yet?" — the executor calls it a sensor and reschedules it when the answer is no. A fan-out is dynamic task creation: one upstream produces N values, the scheduler materialises N task rows in the database, the executor dispatches them in parallel. A fan-in is the inverse: N siblings each have the same downstream child, and the child waits for all of them via the same NOT EXISTS … != 'SUCCESS' rule the executor already uses. None of the three needs a new engine — each is a layout of the same task and edge tables, with at most one extra state (SENSING, for sensors) or one extra column (trigger_rule, for fan-in policies) bolted on.

Three graph shapes, three problems

A real DAG mixes all three patterns inside one run. Here is what a midnight pipeline at a payments company actually looks like:

[Figure: a payments DAG with all three shapes — wait-for → fan-out → fan-in. On the left, a dashed-edge sensor task waits for an external SFTP file to land. The sensor feeds load_file (parse + stage), which feeds list_merchants (N = 1247 today). list_merchants fans out into N parallel settle_M_001 … settle_M_1247 tasks, which converge into a single fan-in task, total, that computes the daily ₹ aggregate.]
One DAG, three shapes. The dashed-edge sensor on the left waits on an external file. `list_merchants` reads today's merchant count and triggers a fan-out into N parallel settlement tasks. The single `total` task on the right is a fan-in: it cannot start until *every* `settle_M_*` task is done.

The three patterns share the same dispatch rule from chapter 21 — a task is ready when all its parents are SUCCESS. What changes is how each pattern creates the parent set: a sensor has no task parent at all (its precondition lives outside the database, so it polls), a fan-out's children have their parent edges written at runtime by the upstream's success hook, and a fan-in is simply one child with many ordinary parent edges.

The next three sections build each pattern on top of the chapter-21 executor, and the extensions compose — by the end you have a single ~80-line addition that handles all three.

Wait-for: the sensor pattern

A sensor is a task whose body is "poll an external condition; succeed if it is true, otherwise sleep and try again". The classic example is S3FileSensor in Airflow — wait until a specific S3 key exists. The next-most-classic is ExternalTaskSensor — wait until a different DAG's task has reached SUCCESS for the same logical run-date. Razorpay uses sensors to wait for daily NPCI settlement files; Swiggy uses them to wait for restaurant menu uploads from POS partners; every team uses them.

The naive shape is a sensor task whose Python body looks like this:

def sftp_file_arrived():
    if not sftp_client.exists("/incoming/2026-04-25/settlements.csv"):
        raise NotReady()      # treated as a transient failure → retry
    return                    # success

That works, but it abuses the retry mechanism. The executor sees a NotReady exception, increments attempt, schedules a retry with exponential backoff. After three retries (2 + 4 + 8 = 14 seconds total) the task is FAILED and the rest of the DAG is UPSTREAM_FAILED. A sensor that needs to wait six hours for a 3 a.m. file is going to chew through retry budgets fast. The fix is a new task state — SENSING — that is identical to RETRYING except the attempt counter does not increment and max_attempts does not apply. The executor reschedules the sensor on a fixed interval (typically 60 seconds) until it either returns success or hits a timeout.

The schema change is two lines:

-- extend the task state CHECK constraint:
state TEXT NOT NULL CHECK (state IN
   ('PENDING','READY','RUNNING','SUCCESS','FAILED',
    'UPSTREAM_FAILED','RETRYING','SENSING'));
ALTER TABLE task ADD COLUMN sensor_timeout_at TEXT;

The new sensor_timeout_at column is when the executor should give up and mark the sensor FAILED instead of rescheduling it. The standard default is 12 hours; daily runs that kick off at midnight often stretch it to 18 hours, so a file that lands at 3 p.m. the same day is still acceptable but anything after 6 p.m. is not.

The executor extension is one new method and a small change to _run_task:

class NotReady(Exception):
    """Raised by a sensor when the condition is not yet true."""
    def __init__(self, recheck_in_seconds: int = 60):
        self.recheck_in = recheck_in_seconds

@dataclass
class TaskDef:
    name: str
    fn: Callable[[], None]
    parents: List[str] = field(default_factory=list)
    max_attempts: int = 3
    is_sensor: bool = False
    sensor_timeout_seconds: int = 12 * 3600   # 12-hour default

def _run_task(self, name: str):
    # ... PENDING/RETRYING → RUNNING transition as before ...
    try:
        self.tasks[name].fn()
        self._transition(name, "RUNNING", "SUCCESS", ended_at=now())
    except NotReady as nr:
        # Sensor: re-check after the requested interval
        if self.tasks[name].is_sensor:
            timeout_at = self._sensor_timeout_at(name)
            if timeout_at and timeout_at < datetime.utcnow():
                self._transition(name, "RUNNING", "FAILED",
                                 ended_at=now(), error="sensor timeout")
                self._propagate_failure(name)
            else:
                next_at = (datetime.utcnow()
                           + timedelta(seconds=nr.recheck_in)).isoformat()
                self._transition(name, "RUNNING", "SENSING",
                                 next_retry_at=next_at)
        else:
            # Non-sensor: NotReady is a programming error
            self._transition(name, "RUNNING", "FAILED",
                             ended_at=now(),
                             error="NotReady raised by non-sensor task")
    except Exception as e:
        # ... existing retry logic ...

The SENSING state participates in the dispatcher's _ready_to_dispatch — when next_retry_at expires, the sensor moves back to PENDING and the next loop pass picks it up. Why a separate SENSING state and not just RETRYING with max_attempts = inf: observability. An on-call dashboard wants to show "3 sensors waiting, 1 task retrying because of an actual error" — those are different operational situations. A sensor that has been waiting 6 hours is healthy; a task that has been retrying for 6 hours is sick. The state distinction is what separates them in the UI and the alerting rules.
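
The _run_task excerpt above calls a _sensor_timeout_at helper that is not shown. A minimal standalone sketch of the deadline computation — the assumption here is that the task row records its first start as ISO-8601 text in a started_at column, which fits the schema excerpts but is not spelled out above:

```python
from datetime import datetime, timedelta
from typing import Optional

def sensor_timeout_at(started_at_iso: Optional[str],
                      timeout_seconds: int) -> Optional[datetime]:
    """Deadline after which a SENSING task should be failed outright.

    started_at_iso is the sensor's first start time as ISO-8601 text
    (None if the sensor has never run); the deadline is that instant
    plus the TaskDef's sensor_timeout_seconds.
    """
    if started_at_iso is None:
        return None   # not started yet — no deadline to enforce
    started = datetime.fromisoformat(started_at_iso)
    return started + timedelta(seconds=timeout_seconds)

# 12-hour default: a sensor that first ran at midnight times out at noon.
deadline = sensor_timeout_at("2026-04-25T00:00:00", 12 * 3600)
print(deadline)   # 2026-04-25 12:00:00
```

The executor's method would look up started_at for the row and call this, comparing the result against datetime.utcnow() as in the _run_task branch above.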

A representative log of the sensor for an NPCI file:

$ python dag_exec.py settlements
  -> wait_for_npci_file SENSING (recheck in 60s)
  [60s]
  -> wait_for_npci_file SENSING (recheck in 60s)
  [60s]
  ... 47 minutes later ...
  -> wait_for_npci_file SUCCESS
  -> load_file RUNNING
  -> ...

The downstream load_file task does not start until the sensor transitions out of SENSING. The executor's existing parent-state check — NOT EXISTS … WHERE p.state != 'SUCCESS' — already excludes SENSING (it is not SUCCESS), so no dispatch logic changes for the children. The whole sensor extension is the new state, the new exception class, and the modified exception branch in _run_task — about 30 extra lines.
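
The dispatcher side of SENSING is one extra query: expired SENSING rows flip back to PENDING so the next loop pass re-dispatches them, with the attempt counter untouched. A cut-down sketch against an in-memory table — the column names follow the excerpts above, but the chapter-21 row layout is simplified here:

```python
import sqlite3
from datetime import datetime, timedelta

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE task (
    run_id TEXT, name TEXT, state TEXT, next_retry_at TEXT,
    PRIMARY KEY (run_id, name))""")

# One sensor whose recheck interval has expired, one still waiting.
past   = (datetime.utcnow() - timedelta(seconds=5)).isoformat()
future = (datetime.utcnow() + timedelta(seconds=55)).isoformat()
conn.execute("INSERT INTO task VALUES ('r1','wait_npci','SENSING',?)", (past,))
conn.execute("INSERT INTO task VALUES ('r1','wait_pos','SENSING',?)", (future,))

# The requeue pass: expired SENSING -> PENDING; attempt is never touched.
conn.execute("""UPDATE task SET state='PENDING', next_retry_at=NULL
                WHERE state='SENSING' AND next_retry_at <= ?""",
             (datetime.utcnow().isoformat(),))

states = dict(conn.execute("SELECT name, state FROM task"))
print(states)   # wait_npci requeued to PENDING, wait_pos still SENSING
```

ISO-8601 text compares correctly as strings, which is why the chapter can store next_retry_at as TEXT and still filter with `<=` in SQL.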

Fan-out: dynamic task generation

A static DAG is one where every node is known at DAG-author time. A real-world DAG often has nodes that depend on data: "settle each merchant who had transactions today" produces a different number of tasks every day. On Republic Day or Diwali the number of active Razorpay merchants might be 8.2 lakh; on a normal Tuesday it might be 5.3 lakh. Hardcoding "one task per merchant" in the DAG file is impossible because the count is unknown until list_merchants finishes.

The pattern is dynamic task mapping: an upstream task produces a list of values, and the scheduler materialises one downstream task per value. Airflow added task.expand() for this in version 2.3 (2022); Dagster has had dynamic_outputs since launch; Prefect's .map() predates both. They all do the same thing under the hood — write new rows to the task and edge tables after the upstream completes.

The schema does not need to change. What changes is the dispatcher's flow:

[Figure: fan-out lifecycle — SUCCESS → INSERT N rows → dispatch normally. Step 1: list_merchants runs to SUCCESS and writes its N = 1247 values to a table (the fan_out_value column). Step 2: the fan-out hook fires, reads the list, and inserts N task rows (name = "settle_M_<id>", all PENDING, parent = list_merchants) plus N edge rows. Step 3: the normal dispatch loop's _ready_tasks() sees the new PENDING rows and runs up to max_parallel of them until all N reach SUCCESS. Step 2 runs inside the dispatcher's main loop, between SUCCESS detection and the next _ready_tasks() pass.]
Fan-out is a write to the `task` and `edge` tables, not a new dispatch rule. The hook runs as soon as the upstream succeeds; from then on the executor treats the children as it would any other tasks.

The TaskDef extension is one optional field — a callable that, given the upstream's result, returns the list of child tasks to create:

@dataclass
class TaskDef:
    name: str
    fn: Callable
    parents: List[str] = field(default_factory=list)
    max_attempts: int = 3
    is_sensor: bool = False
    sensor_timeout_seconds: int = 12 * 3600
    fan_out: Optional[Callable[[], List["TaskDef"]]] = None

def _on_task_success(self, name: str):
    """Hook that runs after a task transitions to SUCCESS.
    If the task has a fan_out callable, materialise the children."""
    td = self.tasks.get(name)
    if td is None or td.fan_out is None:
        return
    children = td.fan_out()
    with connect() as c:
        for child in children:
            c.execute("""INSERT OR IGNORE INTO task VALUES
                (?,?,'PENDING',0,?,NULL,NULL,NULL,NULL,NULL)""",
                (self.run_id, child.name, child.max_attempts))
            c.execute("""INSERT OR IGNORE INTO edge VALUES (?,?,?)""",
                      (self.run_id, name, child.name))
            self.tasks[child.name] = child

The INSERT OR IGNORE clause is the idempotency guard. Why INSERT OR IGNORE and not plain INSERT: a crash recovery may invoke _on_task_success a second time after a restart — the source task is already SUCCESS, the children are already in the database. A plain INSERT would raise a UNIQUE constraint violation and crash the executor; INSERT OR IGNORE makes the hook idempotent. Same primitive that makes _transition idempotent — at-least-once execution wrapped in a safe no-op for the second-and-subsequent calls.
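
The idempotency claim is easy to verify: replaying the same INSERT OR IGNORE leaves the row count unchanged. A toy demonstration against a simplified edge table — the primary key on (run_id, parent, child) mirrors what the chapter's schema implies:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE edge (
    run_id TEXT, parent TEXT, child TEXT,
    PRIMARY KEY (run_id, parent, child))""")

def materialise_children(run_id, parent, children):
    # Same shape as the fan-out hook: safe to call twice after a crash.
    for child in children:
        conn.execute("INSERT OR IGNORE INTO edge VALUES (?,?,?)",
                     (run_id, parent, child))

kids = [f"settle_M_{i:06d}" for i in range(3)]
materialise_children("2026-04-25", "list_merchants", kids)
materialise_children("2026-04-25", "list_merchants", kids)  # crash-recovery replay

count = conn.execute("SELECT COUNT(*) FROM edge").fetchone()[0]
print(count)  # 3, not 6 — the second call was a no-op
```

A plain INSERT on the replay line would raise sqlite3.IntegrityError; OR IGNORE is what turns at-least-once hook invocation into exactly-once rows.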

A worked example: settling 1247 merchants in parallel.

def list_merchants():
    # In real life, query an OLTP DB; here, just drop a file
    today = datetime.utcnow().date().isoformat()
    merchants = [f"M_{i:06d}" for i in range(1247)]
    Path(f"/tmp/{today}_merchants.json").write_text(json.dumps(merchants))

def make_settle_tasks():
    today = datetime.utcnow().date().isoformat()
    merchants = json.loads(Path(f"/tmp/{today}_merchants.json").read_text())
    return [TaskDef(name=f"settle_{m}", fn=lambda m=m: settle_one(m),
                    parents=["list_merchants"]) for m in merchants]

def settle_one(merchant_id):
    # The actual per-merchant settlement work — typically 0.5–3 seconds
    time.sleep(random.uniform(0.5, 3.0))

tasks = [
    TaskDef("list_merchants", list_merchants, fan_out=make_settle_tasks),
    TaskDef("total_revenue",  compute_total),               # parents wired up in the fan-in section
]

A sample run on a 4-thread executor takes about nine minutes (1247 tasks × 1.75 s mean ÷ 4 parallel ≈ 546 s). On a 16-thread executor it takes about 136 seconds; on a Kubernetes pool with 80 workers, about 27 seconds. The fan-out is what makes the per-merchant work a knob the operator can tune, not a property of the DAG file.
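
The wall-clock figures are plain width ÷ parallelism arithmetic (ignoring dispatch overhead); a sanity check:

```python
n_tasks, mean_s = 1247, 1.75

# Ideal runtime = total serial work divided by worker count.
est = {workers: n_tasks * mean_s / workers for workers in (4, 16, 80)}
for workers, seconds in est.items():
    print(f"{workers:>3} workers: ~{seconds:.0f} s")
```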

The output shows what the database looks like:

sqlite> SELECT state, COUNT(*) FROM task WHERE run_id='2026-04-25' GROUP BY state;
SUCCESS         | 1249

sqlite> SELECT name FROM task WHERE name LIKE 'settle_%' LIMIT 5;
settle_M_000000
settle_M_000001
settle_M_000002
settle_M_000003
settle_M_000004

The 1247 children plus list_merchants and total_revenue make 1249 rows. Every one was created at runtime by make_settle_tasks reading the JSON list — none was hardcoded.

Fan-in: when one task waits for many

Fan-in is the syntactically obvious case — give a task many parents and the existing dispatch rule already does the right thing. The interesting question is when not to use a single fan-in node, because the wrong shape can make a 6-minute pipeline 6 hours long.

TaskDef("total_revenue", compute_total,
        parents=[f"settle_{m}" for m in merchants])  # 1247 parents

This works. The executor's _ready_tasks query will check 1247 parent rows for total_revenue on every dispatch pass, find one not-SUCCESS, and skip it; when the last parent finishes, the same query returns total_revenue as ready. The cost of each check is ~1 ms on SQLite for 1247 rows because of the idx_task_state index; a 1000-task fan-in with _ready_tasks running every state change is fine up to about 10 000 children. Beyond that, the parent-set check starts to dominate the executor's runtime and you want a different shape — a tree of fan-ins, where 100 child groups feed 10 mid-level aggregators, which feed 1 top aggregator. The total work is the same; the per-step parent count drops by an order of magnitude.
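
A sketch of the tree-of-fan-ins reshaping. The TaskDef here is a trimmed copy of the chapter's dataclass, and the aggregator names (agg_mid_*, total) are illustrative:

```python
from dataclasses import dataclass, field
from typing import Callable, List

@dataclass
class TaskDef:                      # trimmed copy of the chapter's TaskDef
    name: str
    fn: Callable[[], None]
    parents: List[str] = field(default_factory=list)

def noop() -> None:
    pass

def fan_in_tree(child_names: List[str], group_size: int = 100) -> List[TaskDef]:
    """Replace one N-parent aggregate with mid-level aggregators plus one top.
    The widest per-task parent set drops from N to about group_size."""
    mids = []
    for i in range(0, len(child_names), group_size):
        mids.append(TaskDef(name=f"agg_mid_{i // group_size:03d}", fn=noop,
                            parents=child_names[i:i + group_size]))
    top = TaskDef(name="total", fn=noop, parents=[m.name for m in mids])
    return mids + [top]

tasks = fan_in_tree([f"settle_M_{i:06d}" for i in range(1247)])
print(len(tasks))                          # 13 mid-level aggregators + 1 top = 14
print(max(len(t.parents) for t in tasks))  # widest parent set is 100, not 1247
```

The dispatcher never checks more than ~group_size parents for any single task; the top task checks only the handful of mid-level aggregators.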

The wrong shape is chained fan-in — making settle_M_001 a parent of settle_M_002, and settle_M_002 a parent of settle_M_003, and so on, then having total_revenue depend only on the last one in the chain. This serialises 1247 tasks that should run in parallel: the nine-minute pipeline becomes 36 minutes (1247 × 1.75 s ≈ 2182 s ≈ 36 min). The author of such a DAG usually meant "in order, please" but did not realise that ordering on a fan-out is rarely needed — most settlements are independent of each other.

Why fan-in does not need a new state or new SQL: the existing rule "ready iff every parent is SUCCESS" works for any number of parents. The only thing that scales with parent count is the dispatcher's per-pass query cost, and that scales linearly with the index lookup, not the parent count itself. A 5-parent task and a 1000-parent task both compute readiness in essentially the same query — the difference is JOIN cardinality on a covered index, sub-millisecond either way.
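
That claim can be exercised directly — the same NOT EXISTS readiness query serves a child with 1000 parents, and one straggler is enough to block it. A cut-down demonstration (single-run schema, only the columns the query touches):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task (name TEXT PRIMARY KEY, state TEXT)")
conn.execute("CREATE TABLE edge (parent TEXT, child TEXT)")

N = 1000
conn.executemany("INSERT INTO task VALUES (?, 'SUCCESS')",
                 [(f"settle_{i}",) for i in range(N)])
conn.execute("INSERT INTO task VALUES ('total', 'PENDING')")
conn.executemany("INSERT INTO edge VALUES (?, 'total')",
                 [(f"settle_{i}",) for i in range(N)])

# A PENDING task is ready iff no parent exists in a non-SUCCESS state.
READY = """SELECT t.name FROM task t WHERE t.state = 'PENDING'
           AND NOT EXISTS (SELECT 1 FROM edge e
                           JOIN task p ON p.name = e.parent
                           WHERE e.child = t.name AND p.state != 'SUCCESS')"""

ready_all_done = [r[0] for r in conn.execute(READY)]
print(ready_all_done)   # ['total'] — all 1000 parents are SUCCESS

conn.execute("UPDATE task SET state='RUNNING' WHERE name='settle_0'")
ready_straggler = [r[0] for r in conn.execute(READY)]
print(ready_straggler)  # [] — one straggler blocks the fan-in
```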

A subtle question: what should total_revenue do if 30 of its 1247 parents are FAILED? Three policies are common in production:

  1. all_success (default). Any failed parent → total_revenue becomes UPSTREAM_FAILED. Strict; the aggregate is wrong if anyone failed.
  2. all_done. As long as every parent is in a terminal state (SUCCESS or FAILED or UPSTREAM_FAILED), run total_revenue and let it decide. Lenient; the aggregate task is responsible for filtering out failed merchants.
  3. one_success. Run as soon as any parent succeeds. Useful for "try mirror A, mirror B, mirror C; whichever works wins" patterns.

Airflow calls these trigger_rules; Dagster calls them dependency rules; Prefect uses task-level boolean flags. The executor extension is a single column on the edge:

ALTER TABLE edge ADD COLUMN trigger_rule TEXT NOT NULL DEFAULT 'all_success';

And one new clause in _ready_tasks:

-- For each parent, the parent's state must satisfy the edge's trigger_rule.
-- all_success: parent.state = 'SUCCESS'
-- all_done:    parent.state IN ('SUCCESS','FAILED','UPSTREAM_FAILED')
-- one_success: at least one parent.state = 'SUCCESS' (different shape)

The one_success rule is a different shape — it is "at least one" rather than "all", which inverts the NOT EXISTS to EXISTS. Most production DAGs default to all_success; teams reach for all_done when the aggregate is robust to missing inputs (e.g. a daily summary that should ship even if one merchant's pipeline failed) and one_success for failover patterns.
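
The three policies reduce to a small predicate over the parents' states. A Python sketch of the decision — the executor would express the first two in SQL as sketched above, but the logic is identical:

```python
TERMINAL = {"SUCCESS", "FAILED", "UPSTREAM_FAILED"}

def parents_satisfy(rule: str, parent_states: list) -> bool:
    """True when a child with this trigger_rule may dispatch."""
    if rule == "all_success":
        return all(s == "SUCCESS" for s in parent_states)
    if rule == "all_done":
        return all(s in TERMINAL for s in parent_states)
    if rule == "one_success":
        return any(s == "SUCCESS" for s in parent_states)
    raise ValueError(f"unknown trigger_rule: {rule}")

states = ["SUCCESS", "FAILED", "SUCCESS"]
print(parents_satisfy("all_success", states))  # False — one parent failed
print(parents_satisfy("all_done", states))     # True  — every parent is terminal
print(parents_satisfy("one_success", states))  # True  — at least one succeeded
```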

Where it goes wrong

Each of the three patterns has a characteristic failure mode that a 3 a.m. on-call needs to recognise.

Sensor stuck forever. A sensor that polls an external system can be stuck not because the system is slow but because the check is wrong. Common case: the file path includes a date, the date arithmetic is wrong (UTC vs IST, or off-by-one on month boundaries), the sensor is polling a path the file will never appear at. The fix is in the timeout — every sensor should have a hard upper bound (12 hours is the usual default), and a sensor that hits the timeout pages on-call rather than failing silently. Razorpay's settlement sensors have a 14-hour timeout because NPCI files can land as late as 2 p.m. IST; anything later is genuinely an outage.

Fan-out exploding. A list_merchants that returns 12 million rows because of a missing WHERE active = 1 clause produces a fan-out of 12 million tasks. The executor will dutifully insert 12 million rows into task and 12 million into edge, exhaust SQLite's lock timeout, and fall over. The defence is a hard cap on fan-out width — refuse to materialise more than (say) 50 000 children for one parent. The cap should be loud: log the violation, fail the upstream, page on-call. Swiggy's restaurant-onboarding pipeline added this guard in 2024 after a bad merchant-status-flag rollout fanned out into 4× normal width and brought down the scheduler.
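
A hedged sketch of the width cap — MAX_FAN_OUT and the FanOutTooWide exception are illustrative names, not chapter-21 ones:

```python
MAX_FAN_OUT = 50_000   # hard cap on children per parent; pick per deployment

class FanOutTooWide(Exception):
    pass

def guarded_children(parent_name, children):
    """Refuse to materialise a runaway fan-out; fail loudly instead."""
    if len(children) > MAX_FAN_OUT:
        # In the real hook: log the violation, mark the parent FAILED,
        # page on-call — never silently truncate the list.
        raise FanOutTooWide(
            f"{parent_name} tried to fan out into {len(children)} children "
            f"(cap {MAX_FAN_OUT}) — likely a missing WHERE clause upstream")
    return children

print(len(guarded_children("list_merchants", list(range(1247)))))  # 1247 — fine
try:
    guarded_children("list_merchants", list(range(60_000)))
except FanOutTooWide as e:
    print("blocked:", e)
```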

Fan-in serialising under the hood. A task with 1247 parents looks parallel, but if the dispatcher's _ready_tasks query on each state change is slow (no index on task.state, no index on edge.parent, the join is wrong) then the dispatcher's overhead becomes the bottleneck — workers sit idle waiting for the next dispatch decision. Always check that idx_task_state and idx_edge_parent exist before scaling fan-in beyond a few hundred. The Airflow scheduler's "parsing" phase (which builds the equivalent in-memory graph) is the same kind of cost — Airflow 2.7 added incremental graph parsing to keep this from dominating large deployments.

Going deeper

Sensor reschedule mode vs poking mode

Two implementation strategies for sensors that look identical from the DAG author's view but cost very differently in production. Poking mode keeps the sensor in RUNNING state and the worker thread alive between checks (the worker's body is while True: if check(): return; time.sleep(60)). Reschedule mode is what the executor in this chapter does: the sensor returns to the dispatcher between checks, the worker thread is freed, the next check is a fresh dispatch. Poking mode wastes a worker slot for the entire wait; reschedule mode wastes nothing. On a Kubernetes Airflow deployment with 50 worker pods, 30 sensors waiting for SFTP files in poking mode would consume 30 pods (60 % of capacity) for hours; in reschedule mode they consume zero. Airflow added mode='reschedule' for sensors in 2018 specifically to recover from the "all the workers are sensors" deadlock — every team learns to set it on day two.

The Razorpay merchant-settlement DAG

Razorpay's daily settlement pipeline (as documented in their 2024 engineering blog) processes 8.2 lakh active merchants on a peak day. The shape is exactly the three-pattern DAG above: a sensor for the NPCI settlement file (timeout 14 hours), a load_npci_file extract task, a list_merchants upstream that produces N merchant IDs from the day's order log, a fan-out into N settle_<merchant> tasks, and a daily_total fan-in. The fan-out width on Republic Day 2024 was 9.4 lakh; the pipeline ran in 23 minutes on a 200-pod Kubernetes worker pool. The interesting choice in their design: each settle_<merchant> task has max_attempts = 5 rather than the default 3, because settlement involves 4 dependent network calls (UPI ack, ledger update, RBI compliance log, merchant notification) and a retry of any one is cheap relative to the cost of failure (a missed settlement is a customer-support ticket). The 5-attempt budget × 9.4 lakh tasks = 47 lakh attempt-slots; the day-of-Republic-Day actually used 11.2 lakh attempts — roughly one in eight tasks retried at least once. That is the kind of throughput a fan-out architecture is forced into to handle both transient failure and Indian-scale concurrency.

Why Airflow's TaskGroup is not a fan-out primitive

Airflow has two superficially similar features: TaskGroup (a visual grouping of tasks for the UI) and dynamic task mapping (task.expand()). Confusing them is common — both group related tasks. The difference: TaskGroup is static (you write the group's contents at DAG-author time); task.expand() is dynamic (the count is read at runtime). A monthly report DAG with 50 hardcoded categories is a TaskGroup. A daily settlement DAG with N variable merchants is task.expand(). Picking TaskGroup for a dynamic count means hardcoding the upper bound and accepting that DAG re-deploys are required when the bound changes; picking task.expand() for a static count means accepting unnecessary runtime overhead. Read the Airflow source for taskmixin.expand if you want the exact mechanism — the implementation writes new task_instance rows during scheduler parsing, which is the same hook the executor in this chapter exposes via fan_out.

Fan-out without ordering: the "no two writers to the same key" rule

A fan-out is parallel by design; the executor makes no guarantee about the order of execution among siblings. If two siblings write to the same key (same merchant ID, same row in a warehouse table), you have a race condition — last writer wins, and "last" is unpredictable. The defence is a partition contract: each sibling writes to a key it alone owns. settle_M_001 writes to merchant M_001's row and nobody else's; settle_M_002 writes to M_002. With this contract there is no race; without it, a fan-out is a bug factory. Idempotent writes (chapter 13) plus partition discipline are what make fan-out safe at scale; both are policies the DAG author enforces, not mechanisms the executor provides.

Where this leads next

The three patterns plus the 200-line executor cover roughly 90 % of pipeline shapes you will see in the wild. The remaining 10 % — recursive fan-outs, conditional branches that depend on data, cross-DAG dependencies — are extensions of the same dispatch rule.

References

  1. Airflow dynamic task mapping (expand) — the production-grade reference for fan-out.
  2. Airflow sensors and reschedule mode — why poking mode is a deadlock waiting to happen.
  3. Dagster dynamic outputs — the asset-graph framing of fan-out.
  4. Razorpay engineering: scaling daily settlement — Razorpay's published view of their settlement DAG architecture.
  5. Prefect 2.0 .map() and .unmapped() — the third major scheduler's take on fan-out.
  6. Apache Airflow trigger rules — all_success, all_done, one_success, and the seven less common ones.
  7. Writing a DAG executor in 200 lines — chapter 21, the executor this chapter extends.
  8. The DAG as the right abstraction — chapter 20, the abstraction every pattern in this chapter is a layout of.