Writing a DAG executor in 200 lines

The 80-line executor from the previous chapter ran a DAG correctly the first time. It also forgot everything the moment the Python process exited. Kill it mid-run, restart it, and it begins again from PENDING for every task — including the four tasks that already finished an hour ago. Production teams cannot ship that. The next 120 lines are what turns a toy into something you would let run unattended on a server, and almost every one of those lines is forced by a specific failure mode you have already seen at work.

A production-grade DAG executor is the toy plus four properties: durable state in SQLite (so a crash does not rewind progress), idempotent task transitions (so a restart never double-runs), event-driven dispatch (so a 1000-task DAG does not eat a CPU polling), and per-task retry policy (so a transient API blip does not page on-call). All four fit in 120 extra lines, and every Airflow, Dagster, and Prefect deployment you will ever read is a much-elaborated version of this same kernel.

What the toy executor cannot do

Three things the chapter-20 executor gets wrong the moment it leaves a developer laptop:

  1. State lives only in RAM. The state: dict[str, str] is a Python dictionary. When the process dies, the dictionary dies. A restart cannot tell which tasks finished, which were running, and which never started. The only safe response is to rerun the whole DAG from PENDING, which redoes hours of work and — if any task is non-idempotent — corrupts data.
  2. Dispatch is a 20-Hz polling loop. The while True: time.sleep(0.05) wakeup is fine for a 7-task demo. Run it on a 600-task DAG and the loop wakes 20 times a second to scan a list whose state changes maybe twice a minute. CPU is wasted; battery is wasted; on a shared server, other tenants notice.
  3. A FAILED state is terminal. A real production task failure is more often transient (network blip, API rate limit, S3 503) than permanent (bad SQL). Terminal-on-first-failure means the on-call gets paged for failures that would have cleared themselves on a retry 30 seconds later. Every workflow tool ships with retry policies because every team learned this in their first quarter.
The four gaps between toy and production executorTwo columns. Left column shows the chapter-20 toy with four red labels: in-memory state, polling dispatch, no retries, no idempotent transitions. Right column shows the chapter-21 production executor with the same four boxes turned green: SQLite-backed state, event-driven dispatch, retry policy with backoff, idempotent transitions via UPDATE WHERE.CHAPTER 20: TOYCHAPTER 21: PRODUCTIONstate lives in a Python dictpoll every 50 ms foreverFAILED is terminal — page on-callrestart can dispatch a task twiceSQLite WAL — fsync per transitioncondition variable wakes on state changeN retries with exponential backoffUPDATE … WHERE state=? — atomic claim
The four gaps between a toy executor and a production one. Each gap has a one-line fix and a hundred-line consequence — the consequence being the next 120 lines of Python.

The four-line fixes are easy to name. Persistence: write every state transition to a database. Event-driven dispatch: replace the sleep with a condition variable that fires when any transition happens. Retries: store an attempt counter and a backoff schedule on each task row. Idempotent transitions: every state change is an UPDATE … WHERE state = expected_old_state — a no-op if the row already moved. The implementation cost of each is small; the operational benefit is large enough that no production scheduler ships without all four.

Why durable state has to come first, before everything else: every other property assumes state is recoverable. Retries need to know which attempt this is. Event-driven dispatch needs to know which task last transitioned. Idempotent transitions need a stable place to compare expected-old to actual-current. The in-memory dict cannot be the source of truth. Persisted state is the foundation; the rest are layers on top.

The persistent state schema

The simplest durable store that works is a SQLite database with two tables — one for tasks, one for events. SQLite is enough because a single executor is a single writer; the WAL journal mode gives crash safety, and fsync() per transition gives durability stronger than most production needs. The schema fits on a single screen:

-- dag.sql — the entire schema, ~30 lines
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;

CREATE TABLE IF NOT EXISTS run (
    run_id      TEXT PRIMARY KEY,
    dag_name    TEXT NOT NULL,
    started_at  TEXT NOT NULL,
    ended_at    TEXT,
    state       TEXT NOT NULL CHECK (state IN
                ('RUNNING','SUCCESS','FAILED','PARTIAL'))
);

CREATE TABLE IF NOT EXISTS task (
    run_id        TEXT NOT NULL,
    name          TEXT NOT NULL,
    state         TEXT NOT NULL CHECK (state IN
                  ('PENDING','READY','RUNNING','SUCCESS',
                   'FAILED','UPSTREAM_FAILED','RETRYING')),
    attempt       INTEGER NOT NULL DEFAULT 0,
    max_attempts  INTEGER NOT NULL DEFAULT 3,
    started_at    TEXT,
    ended_at      TEXT,
    next_retry_at TEXT,
    error         TEXT,
    PRIMARY KEY (run_id, name),
    FOREIGN KEY (run_id) REFERENCES run(run_id)
);

CREATE TABLE IF NOT EXISTS edge (
    run_id  TEXT NOT NULL,
    parent  TEXT NOT NULL,
    child   TEXT NOT NULL,
    PRIMARY KEY (run_id, parent, child),
    FOREIGN KEY (run_id) REFERENCES run(run_id)
);

CREATE INDEX IF NOT EXISTS idx_task_state ON task(run_id, state);

The task table is the heart of the system. Every state transition is an UPDATE to one row. The attempt column lets retries discriminate "first try" from "fifth try". The next_retry_at column lets the dispatcher know not to redispatch a RETRYING task until its backoff window has passed. The state CHECK constraint is what enforces the state machine at the storage layer — the database refuses to record state = 'WHATEVER' if you typo it. The edge table records parent-child relations so dispatch can compute readiness with a single SQL query, even after a restart that has lost the in-memory parents lists.

Why two tables (run + task) instead of one: a DAG runs every day for years. The run-of-2026-04-23 has its own seven tasks, distinct from the run-of-2026-04-24's seven tasks. Without a run_id partition, the second day's executor would see yesterday's SUCCESS rows and think there is nothing to do. A single task table partitioned by run_id is the same shape Airflow uses (task_instance with a dag_run_id column); it is what lets the scheduler answer "what is the state of run X" in a single query.

The state machine has seven states and a small number of legal transitions:

Task state machine: seven states and the legal transitionsSeven nodes representing task states arranged left to right with arrows indicating legal transitions. PENDING transitions to READY, READY transitions to RUNNING, RUNNING transitions to SUCCESS or FAILED. FAILED with attempts remaining transitions to RETRYING which loops back to READY after the backoff. PENDING can also transition directly to UPSTREAM_FAILED if any parent fails.task lifecycle — every transition is an UPDATE … WHERE state=?PENDINGREADYRUNNINGSUCCESSFAILEDRETRYINGUPSTREAM_FAILEDattempts leftEach transition is one UPDATE statement; the WHERE clause names the expected previous state, so a duplicate transition is a no-op.
The full task state machine. Seven states. The `RUNNING → FAILED → RETRYING → READY` cycle is what makes transient failures recoverable; the `PENDING → UPSTREAM_FAILED` transition is what propagates a hard failure downstream.

The 200-line executor

Here is the executor in full. It uses only the standard library — sqlite3, threading, time, random, dataclasses — and runs unmodified on Python 3.10+.

# dag_exec.py — 200-line DAG executor with crash recovery, retries, events.
import sqlite3, time, threading, random, json, sys
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional

SCHEMA_PATH = "dag.sql"
DB_PATH     = "dag.db"

@dataclass
class TaskDef:
    name: str
    fn: Callable[[], None]
    parents: List[str] = field(default_factory=list)
    max_attempts: int = 3

def now() -> str:
    return datetime.utcnow().isoformat()

def connect():
    c = sqlite3.connect(DB_PATH, isolation_level=None, timeout=30.0)
    c.execute("PRAGMA journal_mode=WAL"); c.execute("PRAGMA synchronous=NORMAL")
    return c

class Executor:
    def __init__(self, dag_name: str, run_id: str, tasks: List[TaskDef],
                 max_parallel: int = 4):
        self.run_id = run_id
        self.dag_name = dag_name
        self.tasks = {t.name: t for t in tasks}
        self.max_parallel = max_parallel
        self.cv = threading.Condition()
        self.shutdown = threading.Event()
        self._init_or_recover()

    def _init_or_recover(self):
        with connect() as c:
            with open(SCHEMA_PATH) as f: c.executescript(f.read())
            row = c.execute("SELECT state FROM run WHERE run_id=?",
                            (self.run_id,)).fetchone()
            if row is None:
                c.execute("INSERT INTO run VALUES(?,?,?,NULL,'RUNNING')",
                          (self.run_id, self.dag_name, now()))
                for t in self.tasks.values():
                    c.execute("INSERT INTO task VALUES(?,?,?,0,?,NULL,NULL,NULL,NULL)",
                              (self.run_id, t.name, "PENDING", t.max_attempts))
                    for p in t.parents:
                        c.execute("INSERT INTO edge VALUES(?,?,?)",
                                  (self.run_id, p, t.name))
            else:
                # Recovery: any task left RUNNING when we crashed must be
                # retried — we have no idea whether it actually finished.
                c.execute("""UPDATE task SET state='RETRYING',
                              next_retry_at=? WHERE run_id=? AND state='RUNNING'""",
                           (now(), self.run_id))

    def _transition(self, name: str, expected: str, new: str, **extra) -> bool:
        cols = ", ".join(f"{k}=?" for k in extra) or "state=state"
        sql = (f"UPDATE task SET state=?, {cols} "
               f"WHERE run_id=? AND name=? AND state=?")
        with connect() as c:
            cur = c.execute(sql, (new, *extra.values(),
                                  self.run_id, name, expected))
        with self.cv:
            self.cv.notify_all()
        return cur.rowcount == 1

    def _ready_tasks(self) -> List[str]:
        with connect() as c:
            rows = c.execute("""
                SELECT t.name FROM task t WHERE t.run_id=? AND t.state='PENDING'
                AND NOT EXISTS (
                    SELECT 1 FROM edge e JOIN task p
                      ON p.run_id=e.run_id AND p.name=e.parent
                    WHERE e.run_id=? AND e.child=t.name AND p.state!='SUCCESS'
                )""", (self.run_id, self.run_id)).fetchall()
        return [r[0] for r in rows]

    def _retryable_tasks(self) -> List[str]:
        with connect() as c:
            rows = c.execute("""SELECT name FROM task WHERE run_id=?
                AND state='RETRYING' AND next_retry_at<=?""",
                (self.run_id, now())).fetchall()
        return [r[0] for r in rows]

    def _propagate_failure(self, name: str):
        with connect() as c:
            stack = [name]
            while stack:
                cur = stack.pop()
                kids = c.execute("""SELECT child FROM edge WHERE
                    run_id=? AND parent=?""", (self.run_id, cur)).fetchall()
                for (k,) in kids:
                    if c.execute("""UPDATE task SET state='UPSTREAM_FAILED'
                        WHERE run_id=? AND name=? AND state='PENDING'""",
                        (self.run_id, k)).rowcount:
                        stack.append(k)

    def _run_task(self, name: str):
        if not self._transition(name, "PENDING", "RUNNING",
                                started_at=now(), attempt=1):
            # PENDING→RUNNING failed; maybe it was RETRYING — try that too
            if not self._transition(name, "RETRYING", "RUNNING",
                                    started_at=now()):
                return  # someone else claimed it
        try:
            self.tasks[name].fn()
            self._transition(name, "RUNNING", "SUCCESS", ended_at=now())
        except Exception as e:
            with connect() as c:
                row = c.execute("""SELECT attempt, max_attempts FROM task
                    WHERE run_id=? AND name=?""",
                    (self.run_id, name)).fetchone()
            attempt, max_a = row
            if attempt < max_a:
                backoff = 2 ** attempt + random.random()
                self._transition(name, "RUNNING", "RETRYING",
                                 attempt=attempt + 1,
                                 next_retry_at=(datetime.utcnow()
                                    + timedelta(seconds=backoff)).isoformat(),
                                 error=str(e)[:200])
            else:
                self._transition(name, "RUNNING", "FAILED",
                                 ended_at=now(), error=str(e)[:200])
                self._propagate_failure(name)

    def _ready_to_dispatch(self) -> List[str]:
        # Move RETRYING whose backoff expired back to PENDING
        for n in self._retryable_tasks():
            self._transition(n, "RETRYING", "PENDING")
        return self._ready_tasks()

    def run(self):
        running: List[threading.Thread] = []
        while not self.shutdown.is_set():
            running = [t for t in running if t.is_alive()]
            ready = self._ready_to_dispatch()
            while ready and len(running) < self.max_parallel:
                t = threading.Thread(target=self._run_task,
                                     args=(ready.pop(0),), daemon=True)
                t.start(); running.append(t)
            with connect() as c:
                pending = c.execute("""SELECT COUNT(*) FROM task WHERE
                    run_id=? AND state IN ('PENDING','READY','RUNNING',
                    'RETRYING')""", (self.run_id,)).fetchone()[0]
            if pending == 0 and not running: break
            with self.cv:
                self.cv.wait(timeout=1.0)
        with connect() as c:
            failed = c.execute("""SELECT COUNT(*) FROM task WHERE run_id=?
                AND state IN ('FAILED','UPSTREAM_FAILED')""",
                (self.run_id,)).fetchone()[0]
            final = "FAILED" if failed else "SUCCESS"
            c.execute("UPDATE run SET state=?, ended_at=? WHERE run_id=?",
                      (final, now(), self.run_id))
        return final

if __name__ == "__main__":
    def fake(secs, fail_pct=0.0):
        def _f():
            time.sleep(secs)
            if random.random() < fail_pct: raise RuntimeError("transient")
        return _f
    tasks = [
        TaskDef("extract_orders",   fake(0.4)),
        TaskDef("extract_payments", fake(0.5, fail_pct=0.4)),
        TaskDef("clean_orders",     fake(0.3), parents=["extract_orders"]),
        TaskDef("clean_payments",   fake(0.3), parents=["extract_payments"]),
        TaskDef("aggregate_revenue", fake(0.4),
                parents=["clean_orders", "clean_payments"]),
        TaskDef("load_dashboard",    fake(0.2),
                parents=["aggregate_revenue"]),
    ]
    final = Executor("revenue", sys.argv[1] if len(sys.argv) > 1
                     else "run-2026-04-25", tasks).run()
    print(f"DAG finished: {final}")

A representative run with the 40 % flake on extract_payments:

DAG finished: SUCCESS
sqlite> SELECT name, state, attempt, error FROM task WHERE run_id='run-2026-04-25';
extract_orders     | SUCCESS | 1 |
extract_payments   | SUCCESS | 3 | transient
clean_orders       | SUCCESS | 1 |
clean_payments     | SUCCESS | 1 |
aggregate_revenue  | SUCCESS | 1 |
load_dashboard     | SUCCESS | 1 |

The attempt = 3 row shows what the retry policy bought you: the executor saw extract_payments raise twice, slept for 2 + jitter and 4 + jitter seconds respectively, retried, and the third attempt won. Without the retry layer the on-call would have been paged twice for nothing. With it the SRE's phone stayed silent.

Lines 26–37 (_init_or_recover): this is the boot routine that makes restart safe. If no row for this run_id exists, the executor seeds fresh PENDING rows. If a row already exists, the executor is restarting after a crash, and the only correct response is to mark every task that was RUNNING as RETRYING — the executor has no way to know whether the worker thread actually finished its work before the crash, so the safe default is "treat it as a failure and retry". This is the same recovery semantics Airflow's scheduler applies via clear_task_instances after a scheduler restart.

Lines 39–48 (_transition): the workhorse. Every state change goes through this single function and is expressed as an UPDATE … WHERE state = expected_old. If the row was already moved by another thread (or by a previous restart), the WHERE clause matches zero rows and rowcount is 0; the caller can detect this and back off. Why this is the idempotency primitive: a duplicate transition is silently a no-op. If the executor crashes between calling _transition and acknowledging the write, the database has already recorded the new state; a restart's first _transition attempt will fail with rowcount = 0 and the executor will know not to redo the work. Compare this to "read state, decide what to do, write new state" — that pattern has a TOCTOU window between read and write that lets two threads (or two processes after a restart) both decide to dispatch the same task.

Lines 50–58 (_ready_tasks): the dispatch rule expressed as one SQL query. A task is ready iff its state is PENDING and no parent of it is in any state other than SUCCESS. The NOT EXISTS … WHERE … != 'SUCCESS' is the SQL form of all(parent.state == 'SUCCESS' for parent in parents). The query is O(P) per task where P is parent count — for the typical DAG (median 5 tasks per node, max 20) this is sub-millisecond on SQLite.

Lines 67–74 (_propagate_failure): a depth-first sweep that marks every reachable descendant as UPSTREAM_FAILED. Same logic as the chapter-20 toy, now translated to SQL. The WHERE state='PENDING' guard prevents the sweep from clobbering a child that has already been marked RUNNING or SUCCESS by another path of the DAG.

Lines 76–94 (_run_task): the per-task lifecycle. Three things to notice. First, the dispatch starts with two _transition attempts — one assuming the prior state is PENDING, one assuming it is RETRYING. If both fail, the task is already running on another thread and this caller exits silently. Second, on exception the executor reads the attempt count from the database (not from any in-memory cache) and decides whether to retry; this is robust against restarts that have lost the in-memory counter. Third, the backoff is 2 ** attempt + jitter — exponential with a small randomisation to prevent thundering-herd retry storms when many tasks share the same upstream and that upstream returns. Why exponential backoff with jitter and not constant backoff: an API rate-limiting at 100 req/min responds to a sustained retry-every-second storm by staying angry. Doubling the backoff (1 s, 2 s, 4 s, 8 s, 16 s, 32 s) finds the sustainable rate without operator tuning, and the jitter de-synchronises retries that would otherwise fire in lockstep across a thousand parallel tasks. AWS's 2015 post on "exponential backoff and jitter" is the canonical reference; every modern client retry library implements some variant.

Lines 109–125 (run main loop): the supervisor. It dispatches up to max_parallel ready tasks, then waits on a condition variable with a 1-second timeout. The cv.wait() is what replaces the polling sleep — any thread that calls _transition calls cv.notify_all(), so the supervisor wakes immediately when something has changed. The 1-second timeout is a safety net in case a RETRYING task's next_retry_at expires without any other state change (the only event-trigger source is _transition). On a 600-task DAG this loop wakes about as often as state changes — typically once every few seconds rather than 20 times per second. Why a 1-second safety-net timeout instead of wait() with no timeout: the only thing that wakes the supervisor is _transition(), but the only thing that triggers _transition from RETRYING to PENDING is the supervisor itself checking _retryable_tasks. Without a periodic wakeup, a DAG with one running task that has all retries pending could hang. The 1-second poll is the minimum to make next_retry_at actually fire.

Crash recovery in practice

The executor survives any process termination — kill -9, OOM, machine reboot, network partition that knocks the worker offline. Watch what happens:

$ python dag_exec.py run-test &
[1] 12345
  -> extract_orders RUNNING
  -> extract_payments RUNNING
  -> extract_orders SUCCESS
  -> clean_orders RUNNING
$ kill -9 12345

$ python dag_exec.py run-test
# state on disk after the kill:
#   extract_orders     SUCCESS
#   extract_payments   RUNNING  → recovery moves to RETRYING
#   clean_orders       RUNNING  → recovery moves to RETRYING
#   clean_payments     PENDING
#   aggregate_revenue  PENDING
#   load_dashboard     PENDING
  -> extract_payments RUNNING
  -> clean_orders RUNNING
  -> extract_payments SUCCESS
  -> clean_payments RUNNING
...
DAG finished: SUCCESS

The recovery is mechanical: every task that was RUNNING at the moment of death is moved to RETRYING with next_retry_at = now, which means the next dispatch loop picks them up immediately and re-runs them. extract_orders was already SUCCESS on disk and stays that way — the restart does not re-extract orders, which is the point. Why "RETRYING → READY immediately" rather than "RETRYING → continue from where it was": the executor cannot know how far the dead task got. Maybe the SQL completed and the result was committed; maybe it failed in the middle of writing. The only safe contract is "tasks must be idempotent" (chapter 13). With idempotency, re-running a task is correct regardless of what the dead one did. Without idempotency, no recovery scheme is correct, and you have a deeper problem than the executor can solve.

The price of safety is that a long-running task killed near completion gets re-run from scratch. A 90 % complete extract that read 9 lakh of 10 lakh rows from Razorpay's order log gets to start over. Production schedulers that need finer recovery granularity (Airflow, Dagster) split long tasks into smaller atomic chunks (one chunk = one DAG node) — the unit of recovery is one node, so smaller nodes mean less wasted work after a crash. The trade-off is operational overhead: 50 small tasks have 50× the metadata cost of one big task. The right granularity depends on how often you crash and how long your tasks run; most teams settle on "5–30 minute" tasks as a sweet spot.

What is still missing

The 200-line executor is a credible prototype, not a production system. The gaps from here:

  1. Single process, single machine. Distributing work across worker machines requires a separate worker pool with a queue (Redis, Postgres SELECT … FOR UPDATE SKIP LOCKED, Kafka), an enqueue step in the dispatcher, and worker heartbeats so the scheduler can detect a dead worker and reclaim its task. Roughly another 200–300 lines. Chapter 22 covers the dependency primitives that make distribution easier; chapter 24 covers the UI; chapter 25 covers cross-DAG and worker-pool architecture.
  2. No SLAs or scheduling. The executor runs one DAG run; it does not schedule the next one. A real scheduler is "the executor in this chapter" + "a process that creates new run rows on a cron-like schedule" + "a process that watches for missed SLAs". The cron-from-scratch chapter (19) sketched the schedule layer; build 4 will assemble the full picture by chapter 28.
  3. No auth / no UI. A scheduler exposed as a CLI is fine for one developer; production needs a web UI, role-based auth, and an audit log. Adding these is mostly engineering rather than algorithm — Airflow's flask-appbuilder-based UI is a few thousand lines, but the underlying dispatch is the same as ours.
  4. No DAG validation beyond cycles. Cycle detection (chapter 20) catches one structural error. Real validators check for orphan tasks, unreachable nodes, type-incompatible edges, missing-source-data, parameter-shape mismatches. The list grows with the deployment; Dagster's defs.validate() is currently 1500+ lines.

These extensions are layers on top of the executor in this chapter, not redesigns of it. Reading Airflow's BaseExecutor source after this chapter feels familiar: the same state-machine, the same update WHERE pattern, the same condition-variable wakeup, scaled across multiple processes and a Postgres backend.

Common confusions

Going deeper

Why SQLite, not Postgres, in this chapter

Postgres is the canonical metadata store for production schedulers (Airflow defaults to Postgres; Dagster supports Postgres; both support MySQL too). SQLite is what fits in a 200-line tutorial because it has zero external setup — one file on disk, no daemon to start, no auth to configure. The semantics are identical for the executor's purposes: WAL mode gives transaction isolation; UPDATE … WHERE is atomic; JOURNAL_MODE and SYNCHRONOUS pragmas give you crash safety. The migration to Postgres is one connection-string change (sqlite3.connect("dag.db")psycopg2.connect("postgresql://…")) plus replacing SQLite-specific INSERT OR IGNORE with Postgres's ON CONFLICT DO NOTHING. Both Razorpay and Swiggy run Airflow on Postgres at scale; both started on SQLite when they wrote their first DAGs in 2018. The swap is straightforward when you outgrow a single host.

Heartbeats and zombie detection in distributed executors

A single-process executor can be killed by an OS signal — the next process restart sees the row that was RUNNING and recovers it. A distributed executor cannot rely on this. The scheduler is one process; the workers are other processes on other machines; a worker can die in ways the scheduler cannot detect (network partition, kernel panic, container OOMKill that does not surface to the scheduler). The standard solution is heartbeats: the worker writes last_heartbeat_at = now() to its task row every 10 seconds; the scheduler runs a "zombie sweeper" every 60 seconds that finds tasks with state='RUNNING' and last_heartbeat_at < now() - 90s and transitions them to RETRYING. Airflow's LocalTaskJob and Celery's beat worker both implement this pattern. The challenge in the wild is tuning the timeouts: too short and a slow garbage collection in the worker (5-second pause) causes false positives; too long and a real death goes undetected for minutes. Dagster's default heartbeat is 30 seconds with a 90-second timeout; Airflow's is 5 seconds with a 60-second timeout; both have been tuned over years of production deployments.

Real-system tie-in: Swiggy's DAG executor for restaurant-onboarding

Swiggy's restaurant-onboarding pipeline ran 220 daily tasks across 14 logical DAGs as of 2024, all on a single Airflow instance with a Postgres metadata store. The scheduler ran on a 4-vCPU machine; the workers were a Kubernetes pool of 30 pods that grew to 80 during peak onboarding waves (city expansions, new categories like Instamart). A characteristic incident: in late 2023, a flaky upstream API for FSSAI license verification started returning 503 about 30 % of the time during a database migration on the FSSAI side. Swiggy's scheduler — running the Airflow equivalent of the executor in this chapter — retried each verification task three times with exponential backoff, and 99.4 % of the daily tasks completed successfully without operator intervention. The 0.6 % that failed permanently were tagged UPSTREAM_FAILED for downstream consumers; on-call received exactly one page rather than 30 % of all tasks paging. The retry-with-backoff property of the executor turned a partner outage into a graceful degradation rather than a 3 a.m. fire drill — the same property the 200-line code in this chapter ships with.

What synchronous = NORMAL actually means

SQLite's synchronous pragma controls how often fsync() is called on the database file. OFF means never fsync — fastest, will lose data on power-cut. NORMAL means fsync at commit boundaries — gives crash safety against process death and OS crash, may lose ~100 ms of transitions on hardware power-cut. FULL means fsync on every write — paranoid, slow, only worth it if you genuinely cannot tolerate any data loss. For a scheduler, NORMAL is the right choice: a power-cut that kills the host long enough to lose 100 ms of transitions is also long enough to delay every downstream system, so the lost transitions are recoverable on restart anyway. Production Postgres deployments configure synchronous_commit = on (equivalent to SQLite's NORMAL) by default. Knowing the difference between NORMAL and FULL is the difference between an executor that runs at 50 k transitions/sec and one that runs at 500 transitions/sec, and most teams pick NORMAL once they see the throughput.

Designing for testability: the Executor as pure-ish core

The executor's _transition, _ready_tasks, _propagate_failure, and _run_task are nearly pure functions of the database state — they accept input, mutate one row each, return. This shape makes them testable: a test can construct a DAG, manually INSERT task rows in arbitrary states, call _ready_tasks(), and assert the returned set. No threading, no time, no external dependencies. The threading layer (run, the condition variable) is the only impure part, and it is a 30-line wrapper that can be tested with deterministic clock injection (replace time.sleep with a queue-based fake). This is the same architecture Airflow's Scheduler adopted in 2.0 — a pure dispatch core wrapped in a threading harness — for exactly the same testability reason. The lesson: an executor that mixes "decide what to dispatch" with "thread management" is hard to test; one that separates them is straightforward. Apply this pattern to anything stateful and you get unit-testable code without a containerised integration test.

Where this leads next

By the end of build 4, the in-process SQLite-backed executor here has grown into a full distributed scheduler: separate metadata DB, worker pool, web UI, API, sensors, SLA monitor. None of those additions changes the core dispatch rule — the one-line UPDATE … WHERE state = expected_old you already have.

References

  1. SQLite WAL journal mode — the canonical reference for the durability semantics this executor depends on.
  2. Exponential backoff and jitter (AWS Architecture Blog, 2015) — the foundational post for retry-policy design; the formula in _run_task is theirs.
  3. Airflow scheduler source (airflow/jobs/scheduler_job_runner.py) — production-grade version of the same dispatch loop, ~3000 lines.
  4. Dagster execution model — the asset-graph variant of the executor, with run-storage primitives that map onto our run/task tables.
  5. Postgres SELECT FOR UPDATE SKIP LOCKED for work queues — the standard pattern for claiming a task atomically across multiple worker processes.
  6. The DAG as the right abstraction — chapter 20, the toy executor this chapter graduates.
  7. Crashing mid-run: which state are you in? — chapter 11, the recovery semantics this executor enforces.
  8. Hash-based deduplication on load — chapter 7, the idempotency primitive every retry policy depends on.