AIStackInsightsAIStackInsights
HomeBlogCategoriesAboutNewsletter
AIStackInsightsAIStackInsights

Practical AI insights — LLMs, machine learning, prompt engineering, and the tools shaping the future.

Content

  • All Posts
  • LLMs
  • Tutorials
  • AI Tools

Company

  • About
  • Newsletter
  • RSS Feed

Connect

© 2026 AIStackInsights. All rights reserved.

Tutorials

AI Agents Keep Dying in Production. The Fix Was Invented in 1986.

Your agent framework handles the happy path. Erlang's supervision trees handled telecom uptime for 40 years. Here's how to apply the same 'let it crash' philosophy to make AI agents self-healing.

AIStackInsights TeamApril 5, 202614 min read
ai-agentsproductionreliabilitypythontutorialsarchitecture

Your AI agent works perfectly in your notebook. It calls tools, reasons through multi-step plans, and produces beautiful structured output. Then you deploy it.

In production, the OpenAI API returns a 529. Your vector database times out. The agent hallucinates a tool name that doesn't exist and enters an infinite retry loop. Your monitoring shows the process is alive. Your users show you the blank screen they've been staring at for three minutes.

Every agent framework — LangChain, CrewAI, AutoGen, OpenAI Agents SDK — gives you the happy path. None of them give you a real answer for what happens when things go wrong at 3am. But this problem was solved decades ago, in a domain where "the process crashed" could mean a phone network serving millions of subscribers went dark.

The solution is supervision trees, and they come from Erlang's OTP framework — battle-tested in telecom switches that achieved 99.9999999% uptime (nine nines). Here's how to apply the same architecture to make your AI agents self-healing.

Why AI Agents Fail Differently

Traditional software fails in predictable ways. A null pointer, an out-of-bounds index, a connection timeout. You can write try/catch blocks for these because you can enumerate the failure modes.

AI agents fail in ways you cannot predict:

Failure ModeTraditional SoftwareAI Agent
Invalid outputType error at compile/runtimeSyntactically valid JSON that is semantically wrong
Infinite loopDetectable via static analysisAgent "reasons" itself in circles, burning tokens
Resource exhaustionOOM, disk full$400 API bill from a single runaway chain
Partial failureTransaction rollbackAgent completed 7 of 10 steps, state is inconsistent
Silent corruptionData integrity checks catch itAgent confidently returns wrong answer, no error raised

The try/catch model breaks down because most agent failures are not exceptions. They are valid executions that produce wrong results. You cannot catch a hallucination with except Exception.

The Erlang Insight: Let It Crash

In 1986, Joe Armstrong at Ericsson started building a language to run telephone switches. The key insight was radical: don't try to prevent crashes. Instead, make crashes cheap and recovery automatic.

The architecture has three principles:

  1. Isolation. Each process runs in its own memory space. A crash in one process cannot corrupt another.
  2. Supervision. Every process has a parent (a supervisor) that monitors it and knows how to restart it.
  3. Restart strategies. Supervisors follow explicit policies: restart just the failed child, restart all children, or restart all children that started after the failed one.

This is not theoretical. Ericsson's AXD 301 switch ran Erlang with 1.7 million lines of code and achieved nine nines of uptime — 31 milliseconds of downtime per year. WhatsApp used Erlang to serve 900 million users with a 32-person engineering team.

The question is: can we apply these patterns to AI agents?

Yes. And it works remarkably well.

Supervision Trees for AI Agents

Here is the architecture. Every component is a Python class you can implement today.

                    ┌─────────────────┐
                    │  RootSupervisor │
                    │  (one_for_one)  │
                    └────────┬────────┘
                             │
              ┌──────────────┼──────────────┐
              │              │              │
        ┌─────┴─────┐ ┌─────┴─────┐ ┌─────┴─────┐
        │  Research  │ │  Analysis │ │  Writing   │
        │  Supervisor│ │ Supervisor│ │ Supervisor │
        │ (rest_for_ │ │(one_for_  │ │(one_for_   │
        │   one)     │ │  all)     │ │  one)      │
        └─────┬──────┘ └─────┬─────┘ └─────┬─────┘
              │              │              │
         ┌────┼────┐    ┌───┼───┐     ┌───┼───┐
         │    │    │    │   │   │     │   │   │
        Web  API  DB  Parse Rank  Fmt Draft Edit  Review
       Agent Agent Agent Agent Agent Agent Agent Agent Agent

Each leaf is an AI agent (or a tool-calling process). Each intermediate node is a supervisor. The root supervisor manages the top-level pipeline.

The Core Abstraction

from dataclasses import dataclass, field
from enum import Enum
from typing import Any
import asyncio
import time
import uuid
 
 
class RestartStrategy(Enum):
    ONE_FOR_ONE = "one_for_one"      # restart only the failed child
    ONE_FOR_ALL = "one_for_all"      # restart all children
    REST_FOR_ONE = "rest_for_one"    # restart failed + all children started after it
 
 
class ChildState(Enum):
    RUNNING = "running"
    STOPPED = "stopped"
    FAILED = "failed"
    RESTARTING = "restarting"
 
 
@dataclass
class ChildSpec:
    """Specification for a supervised child process."""
    id: str
    start_fn: Any          # async callable that returns a result
    max_restarts: int = 3  # max restarts within restart_window
    restart_window: int = 60  # seconds
    shutdown_timeout: int = 5  # seconds to wait for graceful shutdown
    state: ChildState = ChildState.STOPPED
    restart_count: int = 0
    restart_timestamps: list = field(default_factory=list)
    last_error: Exception | None = None

This is the equivalent of Erlang's child_spec. Every agent you want to supervise gets wrapped in one.

The Supervisor

class Supervisor:
    def __init__(
        self,
        name: str,
        strategy: RestartStrategy = RestartStrategy.ONE_FOR_ONE,
        max_intensity: int = 5,    # max total restarts
        intensity_window: int = 60  # within this window (seconds)
    ):
        self.name = name
        self.strategy = strategy
        self.max_intensity = max_intensity
        self.intensity_window = intensity_window
        self.children: list[ChildSpec] = []
        self.restart_history: list[float] = []
 
    def add_child(self, spec: ChildSpec) -> None:
        self.children.append(spec)
 
    async def start(self) -> dict[str, Any]:
        """Start all children and supervise them."""
        results = {}
        for child in self.children:
            results[child.id] = await self._start_child(child)
        return results
 
    async def _start_child(self, child: ChildSpec) -> Any:
        """Start a single child with supervision."""
        child.state = ChildState.RUNNING
        try:
            result = await asyncio.wait_for(
                child.start_fn(),
                timeout=child.shutdown_timeout * 10  # generous timeout
            )
            child.state = ChildState.STOPPED
            return result
        except Exception as e:
            child.state = ChildState.FAILED
            child.last_error = e
            return await self._handle_failure(child, e)
 
    async def _handle_failure(self, failed_child: ChildSpec, error: Exception) -> Any:
        """Apply the restart strategy."""
        print(f"[{self.name}] Child '{failed_child.id}' failed: {error}")
 
        if not self._can_restart(failed_child):
            raise SupervisorShutdown(
                f"Child '{failed_child.id}' exceeded max restarts "
                f"({failed_child.max_restarts} in {failed_child.restart_window}s)"
            )
 
        if not self._check_intensity():
            raise SupervisorShutdown(
                f"Supervisor '{self.name}' exceeded max restart intensity "
                f"({self.max_intensity} in {self.intensity_window}s)"
            )
 
        match self.strategy:
            case RestartStrategy.ONE_FOR_ONE:
                return await self._restart_one(failed_child)
            case RestartStrategy.ONE_FOR_ALL:
                return await self._restart_all(failed_child)
            case RestartStrategy.REST_FOR_ONE:
                return await self._restart_rest(failed_child)
 
    def _can_restart(self, child: ChildSpec) -> bool:
        """Check if child hasn't exceeded its restart budget."""
        now = time.time()
        child.restart_timestamps = [
            t for t in child.restart_timestamps
            if now - t < child.restart_window
        ]
        if len(child.restart_timestamps) >= child.max_restarts:
            return False
        child.restart_timestamps.append(now)
        child.restart_count += 1
        return True
 
    def _check_intensity(self) -> bool:
        """Check if supervisor hasn't exceeded global restart intensity."""
        now = time.time()
        self.restart_history = [
            t for t in self.restart_history
            if now - t < self.intensity_window
        ]
        if len(self.restart_history) >= self.max_intensity:
            return False
        self.restart_history.append(now)
        return True
 
    async def _restart_one(self, child: ChildSpec) -> Any:
        """Restart only the failed child."""
        print(f"[{self.name}] Restarting '{child.id}' (attempt {child.restart_count})")
        child.state = ChildState.RESTARTING
        return await self._start_child(child)
 
    async def _restart_all(self, failed_child: ChildSpec) -> dict[str, Any]:
        """Restart all children (order matters)."""
        print(f"[{self.name}] Restarting ALL children due to '{failed_child.id}' failure")
        results = {}
        for child in self.children:
            child.state = ChildState.RESTARTING
            results[child.id] = await self._start_child(child)
        return results
 
    async def _restart_rest(self, failed_child: ChildSpec) -> dict[str, Any]:
        """Restart the failed child and all children started after it."""
        idx = self.children.index(failed_child)
        to_restart = self.children[idx:]
        print(f"[{self.name}] Restarting '{failed_child.id}' + {len(to_restart)-1} subsequent children")
        results = {}
        for child in to_restart:
            child.state = ChildState.RESTARTING
            results[child.id] = await self._start_child(child)
        return results
 
 
class SupervisorShutdown(Exception):
    """Raised when a supervisor exhausts its restart budget."""
    pass

This is 100 lines. It gives you the same fundamental abstraction that powers telecom switches.

Choosing a Strategy

The strategy you pick depends on how your agents relate to each other:

When to Use Each Strategy

ONE_FOR_ONE — Your agents are independent. A web scraper crashing should not restart the summarizer. Most common for tool-calling agents that don't share state.

ONE_FOR_ALL — Your agents share state or depend on a consistent view. If the embedding agent crashes mid-batch, the ranking agent's state is invalid too. Use for tightly coupled pipelines.

REST_FOR_ONE — Your agents form an ordered pipeline. If step 3 fails, steps 4 and 5 are invalid, but steps 1 and 2 are fine. Use for sequential multi-agent workflows.

Putting It Together: A Self-Healing Research Agent

Let's build something real. A research agent that takes a question, searches the web, extracts information, and synthesizes an answer. When any step fails, supervision handles it.

import anthropic
import httpx
 
client = anthropic.Anthropic()
 
async def web_search_agent(query: str) -> list[str]:
    """Search the web and return relevant URLs."""
    response = client.messages.create(
        model="claude-sonnet-4-6-20250514",
        max_tokens=1024,
        tools=[{
            "name": "web_search",
            "description": "Search the web",
            "input_schema": {"type": "object", "properties": {"q": {"type": "string"}}}
        }],
        messages=[{"role": "user", "content": f"Find sources about: {query}"}]
    )
    # Extract URLs from tool use results
    urls = parse_search_results(response)
    if not urls:
        raise ValueError("Search returned no results")
    return urls
 
 
async def extract_agent(urls: list[str]) -> list[dict]:
    """Extract key information from each URL."""
    results = []
    async with httpx.AsyncClient(timeout=10) as http:
        for url in urls[:5]:  # limit to 5 sources
            try:
                resp = await http.get(url)
                content = resp.text[:8000]
                extraction = client.messages.create(
                    model="claude-sonnet-4-6-20250514",
                    max_tokens=2048,
                    messages=[{
                        "role": "user",
                        "content": f"Extract key facts from:\n\n{content}"
                    }]
                )
                results.append({
                    "url": url,
                    "facts": extraction.content[0].text
                })
            except httpx.HTTPError:
                continue  # individual URL failures are fine
    if not results:
        raise ValueError("All extractions failed")
    return results
 
 
async def synthesize_agent(facts: list[dict], question: str) -> str:
    """Synthesize extracted facts into a coherent answer."""
    context = "\n\n".join(
        f"Source: {f['url']}\nFacts: {f['facts']}" for f in facts
    )
    response = client.messages.create(
        model="claude-sonnet-4-6-20250514",
        max_tokens=4096,
        messages=[{
            "role": "user",
            "content": (
                f"Based on these sources, answer: {question}\n\n"
                f"Sources:\n{context}"
            )
        }]
    )
    return response.content[0].text

Now wrap them in a supervision tree:

async def run_supervised_research(question: str) -> str:
    """Run a research pipeline with Erlang-style supervision."""
 
    # State flows through the pipeline
    state = {"question": question, "urls": [], "facts": [], "answer": ""}
 
    # Define each step as a supervised child
    search_spec = ChildSpec(
        id="web_search",
        start_fn=lambda: web_search_agent(question),
        max_restarts=3,
        restart_window=30
    )
 
    extract_spec = ChildSpec(
        id="extraction",
        start_fn=lambda: extract_agent(state["urls"]),
        max_restarts=2,
        restart_window=60
    )
 
    synth_spec = ChildSpec(
        id="synthesis",
        start_fn=lambda: synthesize_agent(state["facts"], question),
        max_restarts=2,
        restart_window=60
    )
 
    # REST_FOR_ONE: if search fails, re-run search + everything after it
    # but if synthesis fails, only re-run synthesis
    supervisor = Supervisor(
        name="research_pipeline",
        strategy=RestartStrategy.REST_FOR_ONE,
        max_intensity=8,
        intensity_window=120
    )
 
    supervisor.add_child(search_spec)
    supervisor.add_child(extract_spec)
    supervisor.add_child(synth_spec)
 
    # Run pipeline: each step feeds the next
    state["urls"] = await supervisor._start_child(search_spec)
    state["facts"] = await supervisor._start_child(extract_spec)
    state["answer"] = await supervisor._start_child(synth_spec)
 
    return state["answer"]

If the OpenAI API returns a 529 during extraction, the supervisor automatically restarts that step. If it fails three times in 60 seconds, the supervisor itself shuts down and the error propagates to the caller — a clean, predictable failure instead of a zombie process.

The Circuit Breaker Extension

Erlang supervision handles crashes. But AI agents have a failure mode that crashes don't cover: they succeed expensively. An agent stuck in a reasoning loop isn't crashing — it's burning $50 of API calls per minute while returning valid (but useless) responses.

Add a cost circuit breaker to your supervisor:

@dataclass
class CircuitBreaker:
    max_cost_usd: float = 1.0
    max_tokens: int = 100_000
    max_wall_time: int = 120  # seconds
    current_cost: float = 0.0
    current_tokens: int = 0
    start_time: float = field(default_factory=time.time)
 
    def track(self, input_tokens: int, output_tokens: int, model: str) -> None:
        """Track usage and trip if limits exceeded."""
        cost = self._calculate_cost(input_tokens, output_tokens, model)
        self.current_cost += cost
        self.current_tokens += input_tokens + output_tokens
 
        if self.current_cost > self.max_cost_usd:
            raise CircuitBreakerTripped(
                f"Cost limit exceeded: ${self.current_cost:.2f} > ${self.max_cost_usd:.2f}"
            )
        if self.current_tokens > self.max_tokens:
            raise CircuitBreakerTripped(
                f"Token limit exceeded: {self.current_tokens} > {self.max_tokens}"
            )
        if time.time() - self.start_time > self.max_wall_time:
            raise CircuitBreakerTripped(
                f"Wall time exceeded: {time.time() - self.start_time:.0f}s > {self.max_wall_time}s"
            )
 
    def _calculate_cost(self, inp: int, out: int, model: str) -> float:
        rates = {
            "claude-sonnet-4-6-20250514": (0.003, 0.015),
            "claude-haiku-4-5-20251001": (0.0008, 0.004),
        }
        input_rate, output_rate = rates.get(model, (0.003, 0.015))
        return (inp / 1000 * input_rate) + (out / 1000 * output_rate)
 
 
class CircuitBreakerTripped(Exception):
    pass

The circuit breaker is orthogonal to supervision. The supervisor handles what to do when something fails. The circuit breaker handles when to force a failure. Together, they cover the full failure space of AI agents.

The $2,000 Lesson

A real production incident: an agent with a recursive planning step entered a loop where each iteration's output became the next iteration's input — growing context linearly. The agent didn't crash. It didn't timeout. It ran for 47 minutes and consumed $2,147 in API calls before a human noticed. A circuit breaker with a $5 limit would have tripped in under 10 seconds.

Production Patterns

Four patterns that complete the supervision architecture:

1. Graduated Model Fallback

When your primary model fails or is rate-limited, fall back through cheaper models automatically:

MODEL_CHAIN = [
    "claude-sonnet-4-6-20250514",
    "claude-haiku-4-5-20251001",
]
 
async def resilient_llm_call(messages, tools=None, model_idx=0):
    """Try models in order until one succeeds."""
    if model_idx >= len(MODEL_CHAIN):
        raise AllModelsExhausted("No models available")
    try:
        return client.messages.create(
            model=MODEL_CHAIN[model_idx],
            max_tokens=4096,
            messages=messages,
            tools=tools or []
        )
    except (anthropic.RateLimitError, anthropic.APIStatusError):
        return await resilient_llm_call(messages, tools, model_idx + 1)

2. Checkpoint and Resume

Save agent state between steps so restarts don't repeat completed work:

import json
from pathlib import Path
 
class CheckpointStore:
    def __init__(self, run_id: str):
        self.path = Path(f"/tmp/agent_checkpoints/{run_id}")
        self.path.mkdir(parents=True, exist_ok=True)
 
    def save(self, step: str, data: Any) -> None:
        (self.path / f"{step}.json").write_text(json.dumps(data))
 
    def load(self, step: str) -> Any | None:
        f = self.path / f"{step}.json"
        return json.loads(f.read_text()) if f.exists() else None
 
    def has(self, step: str) -> bool:
        return (self.path / f"{step}.json").exists()

When a supervised child restarts, it checks the checkpoint store first. If step 2 already completed, it skips directly to step 3.

3. Heartbeat Monitoring

Detect agents that are alive but stuck:

async def with_heartbeat(fn, timeout_per_beat=30):
    """Wraps an async function with a heartbeat timeout.
    The function must yield periodically to prove it's making progress."""
    last_beat = time.time()
 
    async def monitor():
        nonlocal last_beat
        while True:
            await asyncio.sleep(5)
            if time.time() - last_beat > timeout_per_beat:
                raise HeartbeatTimeout("Agent stopped making progress")
 
    async def tracked():
        nonlocal last_beat
        async for partial in fn():
            last_beat = time.time()
            yield partial
 
    monitor_task = asyncio.create_task(monitor())
    try:
        async for result in tracked():
            yield result
    finally:
        monitor_task.cancel()

4. Poisoned Input Quarantine

When a specific input consistently crashes an agent, quarantine it instead of letting it burn through your restart budget:

class PoisonPill:
    def __init__(self, max_failures: int = 2):
        self.failures: dict[str, int] = {}
        self.max_failures = max_failures
 
    def record_failure(self, input_hash: str) -> None:
        self.failures[input_hash] = self.failures.get(input_hash, 0) + 1
 
    def is_poisoned(self, input_hash: str) -> bool:
        return self.failures.get(input_hash, 0) >= self.max_failures

If the same user query crashes the agent twice, stop trying. Route it to a fallback response or a human queue. This prevents a single bad input from consuming your entire restart budget.

The Supervision Checklist

Before deploying any AI agent to production:

QuestionIf No
Does every agent process have a supervisor?Add one. Unsupervised agents are ticking bombs.
Is there a cost circuit breaker?Add one. Start at $1/request. Adjust after you have data.
Does the pipeline checkpoint between steps?Add checkpointing. Restarts should not repeat work.
Is there a heartbeat for long-running agents?Add one. 30-second timeout per progress beat.
Are poisoned inputs quarantined?Add a poison pill detector. 2 failures = quarantine.
Does the supervisor have a max restart intensity?Set one. 5 restarts in 60s is a good default.

The fundamental shift: Stop writing try/except blocks inside your agents. Let them crash. Move all recovery logic to supervisors. Your agents get simpler. Your recovery gets better. Your system gets more reliable. This is not a paradox — it's the same insight that made Erlang the most reliable platform ever built.

What Joe Armstrong Knew

Joe Armstrong, Erlang's creator, said it plainly: "The way to make reliable systems is not to build perfect components. It is to build imperfect components and supervise them."

AI agents are the most imperfect components we have ever deployed to production. They hallucinate. They loop. They burn money. They fail in ways we cannot anticipate.

The answer is not to make them perfect. The answer is to supervise them, bound their failures, and make recovery automatic. The telecom industry figured this out 40 years ago. It is time the AI industry caught up.


Sources & Further Reading

  1. Erlang/OTP Design Principles: Supervisor Behaviour
  2. Joe Armstrong, "Making Reliable Distributed Systems in the Presence of Software Errors" (PhD thesis, 2003)
  3. Elixir Supervisor documentation
  4. The WhatsApp Architecture at Scale
  5. Anthropic Claude API: Error Handling

Was this article helpful?

Share:

Related Posts

Tutorials

Event-Sourced AI Agents: The Production Blueprint for 2026

Most AI agents fail in production because they are not replayable, testable, or safe. Learn an event-sourced architecture that gives your agents deterministic behavior, cost control, and enterprise-grade reliability.

Read more
Tutorials

Build an Event-Sourced AI Agent from Scratch: Full Working Code

Step-by-step tutorial with complete Python code to build a production-ready event-sourced AI agent — orchestrator, planner, policy guard, tool executor, and replay engine.

Read more
Tutorials

Cursor 3 and Gemma 4 Dropped on the Same Day. Your Stack Just Changed.

On April 2, 2026, Google shipped Gemma 4 (89% on AIME, 80% on LiveCodeBench, 86% on agentic tool use) and Cursor shipped a ground-up agent-first IDE. Here is what the new developer stack looks like.

Read more

Comments

No comments yet. Be the first to share your thoughts!

Leave a comment

Weekly AI insights

Join developers getting LLM tips, ML guides, and tool reviews.

Sponsor this space

Reach thousands of AI engineers weekly.