Retries, timeouts, and poisoned tasks

The 200-line executor from chapter 21 retries failed tasks up to max_attempts times with a fixed backoff. That is the bare minimum a production scheduler needs and also a near-perfect machine for amplifying every kind of partial failure into a system-wide outage. A flaky downstream API that is slow but eventually answers will tie up workers for hours before the scheduler decides anything is wrong. A task with a deterministic bug — a divide-by-zero on a row that arrived this morning — will retry three times, fail three times, and the only thing the system has gained is three log lines that say the same thing. A task that hangs forever waiting for a TCP connection that the kernel has already given up on will never even reach the retry counter, because no exception will ever be raised. Three different failure modes, three different fixes, all of them belonging to the same chapter because the on-call engineer at 3 a.m. cannot tell them apart from the dashboard.

Retries fix transient failures and need exponential backoff with jitter so a downstream outage does not turn into a thundering herd at recovery. Timeouts kill tasks that hang, and need to apply at the executor level — never trust the task body to enforce its own. A poisoned task is one that fails deterministically the same way on every retry; the scheduler must detect it (same exception fingerprint, same line number) and stop retrying. Three policies, one cost: every retry is a free copy of the original work, so the scheduler that does not bound them costs you compute and time you cannot get back.

Three failure modes the executor must distinguish

The chapter-21 executor treats every exception identically — increment attempt, check max_attempts, schedule a retry. That conflates problems with very different shapes:

[Figure: Three failure modes a scheduler must distinguish — same exception type, three different fixes.
  Transient (flaky network, brief 503): try 1 ✗, try 2 ✗, try 3 ✓. Fix: retry with backoff plus jitter to avoid herds. Cost: a few seconds.
  Hang (stuck TCP, deadlock, infinite loop): running... forever, no exception ever raised. Fix: an external timeout enforced by the executor, not the task. Cost: a worker slot, indefinitely.
  Poisoned (deterministic bug, bad input row): try 1 ✗, try 2 ✗, try 3 ✗ — same line, same trace, same error. Fix: detect, dead-letter, stop retrying, alert. Cost: the work × max_attempts.
  A retry budget meant for transient failures pays for poisoned tasks too — until you teach the executor the difference.]
The three failure modes look identical to a fixed-policy retry loop. The fix in each column is different — and the cost of conflating them in production is paid in compute, in worker slots, and in the on-call engineer's sleep.

The rest of this chapter is the executor extension that handles all three.

Exponential backoff and the thundering herd

The chapter-21 executor uses fixed backoff — wait RETRY_DELAY_SECONDS (typically 30 seconds) between attempts. That is fine for one task on one downstream. It is exactly wrong when 5000 tasks all retry against the same downstream that just came back from a 2-minute outage. Every one of those 5000 retries fires at the same instant the downstream recovers — the recovered service sees its normal Tuesday-evening traffic plus 5000 retries in the same second and falls over again. The pattern has a name: the thundering herd. Razorpay's payments gateway saw exactly this in 2023 when a Kafka rebalance delayed 12 000 settlement tasks by 90 seconds; all 12 000 retried within a 30-second window and pushed the downstream to 8× normal QPS, triggering its rate limiter, triggering more retries, and so on for 18 minutes.

The fix has two parts: make each task wait longer than the last, and add randomness so two tasks scheduled at the same instant do not retry at the same instant.

import random

def backoff_delay(attempt: int,
                  base: float = 1.0,
                  cap: float = 300.0,
                  jitter: str = "full") -> float:
    """Compute the delay before the (attempt+1)-th retry.

    base    — the unit; first retry waits ~base seconds
    cap     — never wait longer than this
    jitter  — 'full' (uniform 0..exp), 'equal' (exp/2 .. exp), 'none'
    """
    exp = min(cap, base * (2 ** attempt))
    if jitter == "full":
        return random.uniform(0, exp)
    if jitter == "equal":
        return exp / 2 + random.uniform(0, exp / 2)
    return exp                              # no jitter — DON'T do this in prod

A run with base = 1.0 shows what the curve actually looks like:

attempt  exponential   full-jitter sample
   0       1.0 s        0.61 s
   1       2.0 s        1.34 s
   2       4.0 s        2.81 s
   3       8.0 s        5.12 s
   4      16.0 s       12.87 s
   5      32.0 s       19.40 s
   6      64.0 s       58.22 s
   7     128.0 s      102.95 s
   8     256.0 s      183.71 s
   9     300.0 s      241.04 s   ← capped

The walkthrough is short but matters. exp = min(cap, base * (2 ** attempt)) is the textbook exponential ramp, capped to keep the wait from becoming hours after attempt 12. random.uniform(0, exp) is full jitter, the policy AWS recommended in their 2015 architecture blog and the one the AWS SDKs use by default — every retry waits a uniformly random time between 0 and the current exponential ceiling. This is what scatters 5000 simultaneous retries across the next 60 seconds instead of bunching them into the same millisecond. return exp with jitter='none' is what the chapter-21 executor essentially had with a fixed delay; it is the herd-amplifier.

Why full jitter and not equal jitter or no jitter: full jitter spreads retries uniformly over the entire backoff window, which gives the recovering downstream the smoothest possible traffic shape. Equal jitter (waiting at least exp/2 then a random amount more) keeps the worst-case wait closer to exp, which is sometimes preferred when the cost of an early retry is high. No jitter is what produces thundering herds — every task with the same attempt count retries at the same instant, and with at-least-once execution every batch of failed tasks shares the same attempt count. The 5000-retry-spike Razorpay saw in 2023 was eliminated within one week of switching to full jitter.
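A toy simulation makes the herd visible. The client count and attempt number below are illustrative assumptions, not figures from the Razorpay incident: 5000 clients all fail at the same instant and schedule their next retry under each policy, and we count how many retries land in the busiest one-second bucket.

```python
import random
from collections import Counter

def retry_times(jitter, n=5000, attempt=3, base=1.0, cap=300.0):
    """Simulated retry instants for n clients that failed at t=0."""
    exp = min(cap, base * (2 ** attempt))       # 8.0 s ceiling at attempt 3
    if jitter == "none":
        return [exp] * n                        # every client fires at t = 8.0
    return [random.uniform(0, exp) for _ in range(n)]   # full jitter

for policy in ("none", "full"):
    buckets = Counter(int(t) for t in retry_times(policy))
    print(policy, "busiest 1-second bucket:", max(buckets.values()))
# "none" puts all 5000 retries in one bucket; "full" spreads them
# roughly uniformly across the 8-second window.
```

The no-jitter column is the spike the recovering downstream sees; full jitter flattens it to roughly n / window per second.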

The executor's retry path becomes:

def _on_task_failure(self, name: str, err: Exception):
    td = self.tasks[name]
    row = self._task_row(name)
    if row.attempt + 1 >= td.max_attempts:
        self._transition(name, "RUNNING", "FAILED",
                         ended_at=now(), error=str(err))
        self._propagate_failure(name)
        return
    delay = backoff_delay(row.attempt, base=2.0, cap=600.0, jitter="full")
    next_at = (datetime.utcnow() + timedelta(seconds=delay)).isoformat()
    self._transition(name, "RUNNING", "RETRYING",
                     attempt=row.attempt + 1, next_retry_at=next_at,
                     error=str(err))

The dispatcher already picks up RETRYING tasks whose next_retry_at is past — no other change is needed.
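The pickup can be a single query. A sketch against an in-memory SQLite table — the table and column names (task, state, next_retry_at) are assumptions modelled on this chapter's schema, not the chapter-21 listing verbatim:

```python
import sqlite3
from datetime import datetime, timedelta

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE task (name TEXT, state TEXT, next_retry_at TEXT)")
now = datetime.utcnow()
db.executemany("INSERT INTO task VALUES (?,?,?)", [
    ("a", "RETRYING", (now - timedelta(seconds=5)).isoformat()),   # due
    ("b", "RETRYING", (now + timedelta(seconds=60)).isoformat()),  # not yet
    ("c", "RUNNING",  None),
])
# ISO-8601 strings of the same format compare lexicographically in
# chronological order, so plain string comparison is enough here.
due = db.execute(
    "SELECT name FROM task WHERE state = 'RETRYING' AND next_retry_at <= ?",
    (now.isoformat(),)).fetchall()
print(due)    # [('a',)]
```

Only task a is picked up: it is RETRYING and its backoff has elapsed.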

Timeouts: when the task body never returns

Retries assume the task eventually raises an exception. A task that hangs — stuck in a TCP recv on a connection that the kernel has half-closed but not torn down, or in a Python loop that should have terminated but did not — never raises and never finishes. The chapter-21 executor will happily wait 4 hours for it. Worse, the worker thread it occupies is unavailable to run any other task. A 4-thread executor with one stuck task is now at 75 % capacity; a 4-thread executor with four stuck tasks is at 0 %.

A task body cannot be trusted to enforce its own timeout. Even a well-written one that uses socket.settimeout(30) only covers the socket call — a regex with catastrophic backtracking, an os.read on a slow disk, or a C extension that ignores Python signals will sail past the timeout. The executor must enforce timeouts from outside the task.

The two implementation strategies are subprocess and thread-with-cancel:

[Figure: Timeout enforcement — subprocess (kill-able) vs thread (cooperative).
  Subprocess (enforceable, OS-level): fork() runs the task in a child PID; when the deadline hits, SIGTERM, a 5 s grace, then SIGKILL. The kernel reclaims memory and file descriptors; the cost is ~30 ms of fork overhead per task.
  Thread + cancel token (cheap, requires task cooperation): threading.Thread plus an Event flag; when the deadline hits, the executor sets the flag and the task must check it and raise TimeoutError. No fork overhead, but a hung task ignores the flag.]
Subprocess-based timeout is the only one that *enforces*. Thread-based timeout needs the task body to cooperate — which is exactly what you cannot rely on when the task is hung in a C extension that holds the GIL or a kernel call that does not return.

Airflow uses subprocess isolation through its LocalExecutor; Dagster's "step worker" runs each step in a subprocess for the same reason; Prefect's ProcessPoolExecutor does the same. Spotify's Luigi historically ran tasks in-thread and shipped a string of "stuck task" production stories that the others learned from. The pattern that works:

import multiprocessing as mp
import os, signal, time

def _run_with_timeout(fn, timeout_s: int):
    """Run fn in a child process; SIGTERM-then-SIGKILL on timeout."""
    parent, child = mp.Pipe()
    def child_body():
        try:
            fn()
            child.send(("ok", None))
        except Exception as e:
            child.send(("err", repr(e)))
    p = mp.Process(target=child_body)
    p.start()
    deadline = time.monotonic() + timeout_s
    while p.is_alive():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            os.kill(p.pid, signal.SIGTERM)        # graceful: 5-second grace
            p.join(timeout=5)
            if p.is_alive():
                os.kill(p.pid, signal.SIGKILL)    # ungraceful: definitive
                p.join()
            raise TimeoutError(f"task exceeded {timeout_s}s")
        if parent.poll(timeout=min(remaining, 1.0)):
            status, payload = parent.recv()
            p.join()
            if status == "err":
                raise RuntimeError(payload)
            return
    if parent.poll():                         # child sent a result, then exited
        status, payload = parent.recv()       # drain it before declaring death
        if status == "err":
            raise RuntimeError(payload)
        return
    raise RuntimeError("child died without sending result")

A worked example with a deliberately stuck task:

$ python timeout_demo.py
[14:02:00] starting task hang_forever (timeout=10s)
[14:02:10] timeout reached, sending SIGTERM
[14:02:15] still alive, sending SIGKILL
[14:02:15] task hang_forever FAILED: task exceeded 10s

os.kill(p.pid, signal.SIGTERM) is the polite ask — Python's default disposition for SIGTERM is to terminate immediately, but a child that installs a signal.signal(signal.SIGTERM, handler) can unwind cleanly. p.join(timeout=5) gives the child five seconds to flush logs, close sockets, release locks. os.kill(p.pid, signal.SIGKILL) is the unconditional kill that the kernel honours regardless of what the child is doing — it is the one that protects you from C extensions that ignore signals or kernel calls that block uninterruptibly.

Why SIGTERM-then-SIGKILL and not just SIGKILL: a SIGKILL'd process cannot run cleanup code. If the task held an exclusive write lock on a Parquet file, SIGKILL leaves the lock file behind and the next attempt blocks indefinitely. SIGTERM gives the task a 5-second window to release locks, flush a partial write, write a "I died" marker. Only after that window expires does the executor escalate to the unconditional kill. Postgres uses the same pattern internally for pg_cancel_backend (polite) vs pg_terminate_backend (unconditional); Linux's systemd does the same with KillSignal=SIGTERM followed by TimeoutStopSec.

The TaskDef extension is one field — timeout_seconds: Optional[int] = None. Tasks without a timeout retain the chapter-21 behaviour (run in-thread, no enforcement); tasks with a timeout run via _run_with_timeout. A reasonable default for ETL tasks is 30 minutes; for a sensor it is recheck_in + 60 (the polling task should be much shorter than the polling interval); for a backfill it can be 6 hours.
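A minimal sketch of that one-field extension and the dispatch predicate. The field names other than timeout_seconds are illustrative, and the subprocess runner is stubbed here so the block stands alone — the real version is _run_with_timeout above:

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class TaskDef:
    name: str
    fn: Callable[[], None]
    max_attempts: int = 3
    timeout_seconds: Optional[int] = None    # None -> chapter-21 in-thread path

def _run_with_timeout_stub(fn, timeout_s):
    """Stand-in for the subprocess runner shown earlier in the chapter."""
    fn()

def run_task(td: TaskDef):
    if td.timeout_seconds is None:
        td.fn()                               # in-thread, no enforcement
    else:
        _run_with_timeout_stub(td.fn, td.timeout_seconds)   # isolated path

ran = []
run_task(TaskDef("etl", lambda: ran.append("in-thread")))
run_task(TaskDef("etl2", lambda: ran.append("isolated"), timeout_seconds=1800))
print(ran)    # ['in-thread', 'isolated']
```

The td.timeout_seconds is None predicate is also the subprocess-overhead escape hatch discussed in "What goes wrong": short tasks that skip the timeout skip the fork.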

Poisoned tasks: when retrying is the bug

The third failure mode is the one that costs the most and is the easiest to ignore. A poisoned task is one that fails deterministically — every attempt produces the same exception, at the same line, with the same message. The classic case is a single bad row in the input: a record with a null field that the parser does not handle, or a UTF-8 decode error on a row that was written in Latin-1 by a system upstream that nobody owns any more. The first attempt fails. The retry runs the same code on the same input and fails the same way. The third retry does too. The chapter-21 executor has now spent 3× the work for zero progress.

Worse, in a fan-out (chapter 22) where the bad row affects 1247 sibling tasks — say, every task that reads from the same upstream Parquet file with a corrupted header — the cost is 3 × 1247 = 3741 wasted task runs, plus the wall-clock time of the entire DAG.

The fix has three layers:

Layer 1 — fingerprint the failure. Hash the exception type and the trace's "innermost" frame (the file and line where the exception was raised). If two consecutive attempts of the same task have the same fingerprint, the failure is deterministic and there is no point in further retries.

Layer 2 — distinguish poison-pill rows from poison-pill code. A bad row should be quarantined (skip the row, continue the task); a bad code-path is the task itself being broken. The distinction comes from how the task is written — a try/except around the per-row body that writes bad rows to a dead_letter table is the per-row variant; the per-task variant is what the executor handles.

Layer 3 — dead-letter the task. A task whose fingerprint repeats moves to a new state, DEAD_LETTER, that does not propagate UPSTREAM_FAILED to children automatically. Instead it pages the on-call: "the same error has fired N times; a human must look".
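The Layer-2 per-row variant is a pattern in the task body, not the executor. A sketch — the dead-letter sink is here just a list, and the row shape is an illustrative assumption:

```python
def process_rows(rows, handle_row, dead_letter_rows):
    """Quarantine bad rows instead of failing the whole task."""
    ok = 0
    for row in rows:
        try:
            handle_row(row)
            ok += 1
        except Exception as e:
            # quarantine: record the row and the error, keep going
            dead_letter_rows.append({"row": row, "error": repr(e)})
    return ok

rows = [1, 2, 0, 4]                 # 0 is the poison-pill row
dlq = []
n = process_rows(rows, lambda r: 10 / r, dlq)
print(n, len(dlq))    # 3 1 — three good rows processed, one quarantined
```

In production the list would be a dead_letter table write, so the quarantined rows survive the task and can be replayed after the fix.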

The schema and code:

ALTER TABLE task ADD COLUMN failure_fingerprint TEXT;
-- extend the CHECK constraint to include DEAD_LETTER:
state TEXT NOT NULL CHECK (state IN
   ('PENDING','READY','RUNNING','SUCCESS','FAILED',
    'UPSTREAM_FAILED','RETRYING','SENSING','DEAD_LETTER'));

import hashlib, traceback

def _fingerprint(err: Exception) -> str:
    """Stable hash of (type, innermost frame). Same bug → same hash."""
    tb = traceback.extract_tb(err.__traceback__)
    last = tb[-1] if tb else None
    sig = f"{type(err).__name__}|{last.filename if last else ''}:{last.lineno if last else 0}"
    return hashlib.sha1(sig.encode()).hexdigest()[:12]

def _on_task_failure(self, name: str, err: Exception):
    td = self.tasks[name]
    row = self._task_row(name)
    fp_now = _fingerprint(err)
    fp_prev = row.failure_fingerprint
    same_bug = (fp_prev is not None and fp_prev == fp_now)
    # Poison-pill detection: 2 identical fingerprints = no point retrying
    if same_bug and row.attempt >= 1:
        self._transition(name, "RUNNING", "DEAD_LETTER",
                         ended_at=now(), error=str(err),
                         failure_fingerprint=fp_now)
        self._page_oncall(name, fp_now, repr(err))
        return
    if row.attempt + 1 >= td.max_attempts:
        self._transition(name, "RUNNING", "FAILED",
                         ended_at=now(), error=str(err),
                         failure_fingerprint=fp_now)
        self._propagate_failure(name)
        return
    delay = backoff_delay(row.attempt, base=2.0, cap=600.0, jitter="full")
    next_at = (datetime.utcnow() + timedelta(seconds=delay)).isoformat()
    self._transition(name, "RUNNING", "RETRYING",
                     attempt=row.attempt + 1, next_retry_at=next_at,
                     error=str(err), failure_fingerprint=fp_now)

A run on a deterministically-broken task:

$ python dag_exec.py settlement
[03:11:00] settle_M_000042 RUNNING
[03:11:00] settle_M_000042 FAILED ZeroDivisionError|/.../settle.py:87 (fp=4a7c1d)
[03:11:00] settle_M_000042 RETRYING attempt=1 next_retry_at=03:11:01.34
[03:11:01] settle_M_000042 RUNNING
[03:11:01] settle_M_000042 DEAD_LETTER ZeroDivisionError|/.../settle.py:87 (fp=4a7c1d)
[03:11:01] PAGED on-call: poison-pill on settle_M_000042 (4a7c1d)

The executor recognised the same fingerprint on attempt 2 and stopped — saving the third retry that would have failed identically. Why fingerprint on (type, file, line) and not on the full message: an exception message often includes a row ID or a timestamp that changes between attempts. KeyError: 42 and KeyError: 43 are the same bug if both fired at parser.py:87. Hashing only on the structural elements catches the deterministic case while ignoring the noise. Sentry's grouping algorithm uses essentially the same approach since 2014; Bugsnag and Rollbar followed.
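A quick demonstration of that property — two KeyErrors with different messages, raised at the same line, hash identically (_fingerprint reproduced from the listing above so the block stands alone):

```python
import hashlib, traceback

def _fingerprint(err: Exception) -> str:
    """Stable hash of (type, innermost frame). Same bug -> same hash."""
    tb = traceback.extract_tb(err.__traceback__)
    last = tb[-1] if tb else None
    sig = f"{type(err).__name__}|{last.filename if last else ''}:{last.lineno if last else 0}"
    return hashlib.sha1(sig.encode()).hexdigest()[:12]

def lookup(d, k):
    return d[k]            # same file, same line for every missing key

fps = []
for key in (42, 43):       # KeyError: 42 vs KeyError: 43 — same bug
    try:
        lookup({}, key)
    except KeyError as e:
        fps.append(_fingerprint(e))

print(fps[0] == fps[1])    # True — the message difference is ignored
```

Hashing the full message instead would have produced two fingerprints and two wasted retries.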

A DEAD_LETTER task does not automatically propagate UPSTREAM_FAILED. The on-call runbook is: read the error, decide whether to fix the code (most common), fix the data (second most common), or mark the task as a permitted failure and continue the DAG (rarest, useful for one-off backfills). A future chapter on observability will wire this into the dashboard.

What goes wrong

Retry budgets that expire mid-day. A max_attempts = 3 with full-jitter backoff capped at 600 seconds means the worst-case retry sequence consumes 0–600 + 0–600 = up to 20 minutes before the task is FAILED. Pipelines with strict SLAs need the budget to fit inside the SLA — a daily report with a 2 a.m. delivery deadline that started at 11 p.m. has 3 hours of total slack, of which retries can eat at most 30 minutes. Pick max_attempts × cap with the SLA in mind.
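The 20-minute figure is the cap-saturated bound. The general arithmetic: with full jitter each wait is at most the exponential ceiling, so the worst case is the sum of capped ceilings over the max_attempts − 1 retries. A helper makes the budget explicit:

```python
def worst_case_retry_budget(max_attempts: int, base: float, cap: float) -> float:
    """Upper bound on seconds a task can spend waiting between retries."""
    return sum(min(cap, base * (2 ** a)) for a in range(max_attempts - 1))

# Cap-saturated case from the paragraph above: 600 + 600 = 20 minutes.
print(worst_case_retry_budget(3, 600.0, 600.0))   # 1200.0
# The executor's actual settings (base=2.0) stay far below the cap early on.
print(worst_case_retry_budget(3, 2.0, 600.0))     # 6.0
```

Running this against the SLA slack is the "pick max_attempts × cap with the SLA in mind" check made mechanical.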

Timeouts that are tighter than the work. A timeout_seconds = 600 on a task that legitimately needs 12 minutes on Diwali night (when Flipkart's catalogue refresh runs 4× normal volume) will kill the task at 600 seconds, retry, kill, retry, kill, and then dead-letter it as a poisoned task — except the bug is the timeout, not the task. Match timeouts to the 99th-percentile observed duration, not the median; budget for seasonal load.

Poison-pill detection that triggers on flaky tests. A test that intermittently fails with AssertionError at the same line will fingerprint identically across attempts even though the failure is not deterministic — the line of the assertion is where the failure surfaces, not where the underlying flake originates. The defence is a minimum number of attempts before fingerprint-based dead-lettering kicks in (the code above uses row.attempt >= 1, i.e. require at least one prior failure with the same fingerprint). Tune up to 2 for very flaky environments; tune down to 0 (immediate dead-letter on any failure) only for genuinely deterministic pipelines.

Subprocess overhead amplifying for short tasks. A 30 ms subprocess fork is invisible for a 30-second ETL task; for a 100 ms metadata-only task it is a 30 % overhead. The optimisation is to run timeout-required tasks in subprocess and timeout-optional tasks in-thread — the 12 % of tasks that are both short and CPU-bound are not worth the isolation cost. Airflow's LocalExecutor works around this with a per-DAG-config flag; the small-executor-from-scratch above can do the same with td.timeout_seconds is None as the in-thread predicate.

Going deeper

The AWS jitter paper and why "full" beat "equal"

The 2015 AWS Architecture blog post "Exponential Backoff and Jitter" by Marc Brooker compared four strategies — no jitter, equal jitter, full jitter, and decorrelated jitter — across a simulated thundering herd of 1000 retrying clients against one server with a fixed throughput. Full jitter (uniform 0..exp) and decorrelated jitter (each client's wait derived from the previous wait, not the attempt count) both eliminated the herd; equal jitter reduced it but kept a visible spike at the lower bound; no jitter showed a textbook saw-tooth load shape. AWS standardised on full jitter for the SDKs because it was simplest to explain to developers and worked within 5 % of the optimum. Read the original post — it has the simulation graphs that make the case visually. The Razorpay 2023 post-mortem after their 18-minute Kafka cascading outage cites this paper explicitly as the reason they switched.

Why Airflow's "task_concurrency" interacts with timeouts

Airflow has three concurrency knobs that confuse new operators: dag_concurrency (max simultaneous tasks across one DAG), task_concurrency (max simultaneous instances of a single task across all runs), and pool (named slot reservation). When a task with a long timeout (say 6 hours) is configured with task_concurrency = 1, a single hung instance blocks every subsequent run for up to 6 hours. The interaction matters for sensors especially — a sensor with task_concurrency = 1 and a 12-hour timeout is a global serialisation point for every DAG that uses that sensor. The fix is per-DAG sensor instances (use external_task_id sensors that scope to one DAG-run) or move the sensor to mode = 'reschedule' so it does not hold a slot. Production Airflow at Razorpay sets task_concurrency to None for sensors and 1 only for tasks that genuinely cannot run twice (e.g. tasks that take a global lock on a shared mutable warehouse table).

Idempotent retries in the presence of side effects

A retry only works if the task is idempotent — running it twice produces the same final state as running it once. Chapter 13 covered the idempotency primitives (deterministic IDs, MERGE/UPSERT, dedup hashes); this chapter assumes them. The interesting failure mode is a task that is almost idempotent: it writes to a target table idempotently but sends a Slack notification non-idempotently, so a retry sends two Slack messages. The fix is the side-effect-isolation pattern — push side effects to a separate task downstream of the idempotent core, so a retry of the core does not re-fire the side effect. Stripe's payment-confirmation pipeline does this: the core "record the payment in the ledger" task is idempotent and retries freely; the "email the customer" task is a separate downstream that runs at-most-once via an idempotency key on the email service. Splitting the DAG this way is what makes retries safe end-to-end.

Why some teams use circuit breakers instead of unbounded retries

A circuit breaker is the operational sibling of max_attempts. Instead of bounding per-task attempts, it bounds system-wide failures: if more than X % of tasks against the same downstream have failed in the last Y minutes, the circuit "opens" and all subsequent tasks against that downstream FAIL immediately without trying. Hystrix popularised this in 2012; resilience4j is the current Java standard; Python has pybreaker. The integration into a scheduler is one extra column on the task — the "downstream group" — and one shared counter per group. When the counter exceeds the threshold, the dispatcher refuses to dispatch tasks in that group. This pairs with retries (per-task) to give two layers of defence: the per-task retry handles a single transient blip, and the circuit breaker handles a downstream that is genuinely down. PhonePe runs both; their 2024 SRE blog described how the combination cut their cascading-failure incidents by 70 % year-over-year.
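A minimal sketch of that shared counter — the class, thresholds, and half-open behaviour below are illustrative, not pybreaker's or resilience4j's API:

```python
import time
from collections import deque

class CircuitBreaker:
    """One instance per downstream group, shared by the dispatcher."""
    def __init__(self, max_failures=10, window_s=60.0, cooldown_s=120.0):
        self.max_failures = max_failures
        self.window_s = window_s
        self.cooldown_s = cooldown_s
        self.failures = deque()            # timestamps of recent failures
        self.opened_at = None

    def record_failure(self):
        now = time.monotonic()
        self.failures.append(now)
        while self.failures and now - self.failures[0] > self.window_s:
            self.failures.popleft()        # drop failures outside the window
        if len(self.failures) >= self.max_failures:
            self.opened_at = now           # open: stop dispatching this group

    def allow_dispatch(self) -> bool:
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.cooldown_s:
            self.opened_at = None          # half-open: let traffic probe again
            self.failures.clear()
            return True
        return False

cb = CircuitBreaker(max_failures=3, window_s=60.0)
for _ in range(3):
    cb.record_failure()
print(cb.allow_dispatch())    # False — circuit is open for this downstream
```

The dispatcher consults allow_dispatch() before handing a task in that group to a worker; per-task retries still run once the circuit closes again.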

Where this leads next

The retry, timeout, and poison-pill primitives close the loop on the chapter-21 executor — every failure mode now has a named state, a bounded cost, and an exit. What remains in Build 4 is the operational layer: backfills, SLAs, and the ergonomics that decide whether a team will use the scheduler in production or quietly route around it.

References

  1. Exponential backoff and jitter (AWS Architecture Blog, 2015) — the foundational analysis of full vs equal vs no jitter.
  2. Hystrix: latency and fault tolerance for distributed systems (Netflix, 2012) — the circuit breaker pattern's first widely-used implementation.
  3. Airflow task retries and timeouts — production reference for retries, retry_delay, retry_exponential_backoff, execution_timeout.
  4. Dagster step worker isolation — why each step gets its own subprocess.
  5. Sentry's exception grouping algorithm — the fingerprint-based deduplication that this chapter borrows for poison detection.
  6. Razorpay engineering: scaling daily settlement — the post-mortem behind the 2023 thundering-herd outage.
  7. Writing a DAG executor in 200 lines — chapter 21, the executor this chapter extends.
  8. Task dependencies: wait-for, fan-out, fan-in — chapter 22, the dependency primitives the retry policy now defends.