Tutorials

Build an Event-Sourced AI Agent from Scratch: Full Working Code

Step-by-step tutorial with complete Python code to build a production-ready event-sourced AI agent — orchestrator, planner, policy guard, tool executor, and replay engine.

AIStackInsights Team · March 19, 2026 · 12 min read
ai-agents, event-sourcing, python, tutorial, production

In our previous article we covered the why — why event sourcing makes agents debuggable, replayable, and safe for production. This article is the how. Every component, every file, every line of code you need to run a working event-sourced AI agent on your machine today.

By the end you will have a fully functional Python agent that:

  • Persists every decision as an immutable event to SQLite
  • Generates a bounded, validated plan using Claude or OpenAI
  • Enforces tool policy rules before any action executes
  • Replays historical runs to test prompt changes safely

No partial snippets. No "left as an exercise." Full code.


Prerequisites

python -m pip install anthropic openai fastapi uvicorn sqlalchemy pydantic python-dotenv

Create a .env file:

ANTHROPIC_API_KEY=your_key_here
# or
OPENAI_API_KEY=your_key_here

Project structure we are building:

agent/
├── .env
├── main.py          # Entry point
├── db.py            # Event store
├── models.py        # Pydantic schemas
├── orchestrator.py  # Run lifecycle
├── planner.py       # LLM plan generation
├── policy.py        # Policy guard
├── tools.py         # Tool executor
└── replay.py        # Replay engine

Step 1: Event Store (db.py)

Every event the agent emits is appended here. Nothing is ever updated or deleted.

# db.py
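import json
import os
from datetime import datetime, timezone
from sqlalchemy import create_engine, Column, String, Integer, Text, DateTime
from sqlalchemy.orm import DeclarativeBase, sessionmaker
 
# Read the URL from the environment so replays and tests can target another database.
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///agent_events.db")
# check_same_thread is a SQLite-only option; other drivers reject it.
connect_args = {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {}
engine = create_engine(DATABASE_URL, connect_args=connect_args)
Session = sessionmaker(bind=engine)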
 
 
class Base(DeclarativeBase):
    pass
 
 
class AgentEvent(Base):
    __tablename__ = "agent_events"
 
    id = Column(Integer, primary_key=True, autoincrement=True)
    run_id = Column(String, nullable=False, index=True)
    seq = Column(Integer, nullable=False)
    event_type = Column(String, nullable=False, index=True)
    actor = Column(String, nullable=False)
    payload = Column(Text, nullable=False)  # JSON string
    event_time = Column(DateTime, default=lambda: datetime.now(timezone.utc))
 
 
Base.metadata.create_all(engine)
 
 
def append_event(run_id: str, seq: int, event_type: str, actor: str, payload: dict) -> AgentEvent:
    """Append an immutable event. Never call this to update existing events."""
    session = Session()
    try:
        event = AgentEvent(
            run_id=run_id,
            seq=seq,
            event_type=event_type,
            actor=actor,
            payload=json.dumps(payload),
        )
        session.add(event)
        session.commit()
        session.refresh(event)
        return event
    finally:
        session.close()
 
 
def get_run_events(run_id: str) -> list[dict]:
    """Return all events for a run in sequence order."""
    session = Session()
    try:
        rows = (
            session.query(AgentEvent)
            .filter_by(run_id=run_id)
            .order_by(AgentEvent.seq)
            .all()
        )
        return [
            {
                "id": r.id,
                "run_id": r.run_id,
                "seq": r.seq,
                "event_type": r.event_type,
                "actor": r.actor,
                "payload": json.loads(r.payload),
                "event_time": r.event_time.isoformat(),
            }
            for r in rows
        ]
    finally:
        session.close()
 
 
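def get_all_runs() -> list[str]:
    """Return distinct run IDs ordered by first event time."""
    from sqlalchemy import func  # local import keeps this function self-contained

    session = Session()
    try:
        rows = (
            session.query(AgentEvent.run_id)
            .group_by(AgentEvent.run_id)  # one row per run
            .order_by(func.min(AgentEvent.event_time))  # earliest event of each run
            .all()
        )
        return [r.run_id for r in rows]
    finally:
        session.close()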

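In production, replace sqlite:///agent_events.db with a Postgres URL such as postgresql://user:pass@host/db and install a Postgres driver. The check_same_thread connect argument applies only to SQLite, so make sure it is not passed to a Postgres engine. The event model and the append and read functions go through SQLAlchemy's ORM and should not need to change.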
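
The append-only rule can also be backed by the database itself. The following is a minimal sketch, not the tutorial's db.py: it uses the standard-library sqlite3 module instead of SQLAlchemy, and it assumes a UNIQUE (run_id, seq) constraint that the schema above does not declare. The names open_store and append are hypothetical.

```python
import json
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Create the events table with a uniqueness guard on (run_id, seq)."""
    conn = sqlite3.connect(path)
    # Assumption: the UNIQUE constraint is an addition for this sketch.
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS agent_events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT NOT NULL,
            seq INTEGER NOT NULL,
            event_type TEXT NOT NULL,
            payload TEXT NOT NULL,
            UNIQUE (run_id, seq)
        )
        """
    )
    return conn


def append(conn: sqlite3.Connection, run_id: str, seq: int,
           event_type: str, payload: dict) -> bool:
    """Insert one event; return False if (run_id, seq) is already taken."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO agent_events (run_id, seq, event_type, payload) "
                "VALUES (?, ?, ?, ?)",
                (run_id, seq, event_type, json.dumps(payload)),
            )
        return True
    except sqlite3.IntegrityError:
        return False


conn = open_store()
first = append(conn, "run-1", 1, "RunStarted", {"goal": "demo"})
duplicate = append(conn, "run-1", 1, "RunStarted", {"goal": "overwrite attempt"})
count = conn.execute("SELECT COUNT(*) FROM agent_events").fetchone()[0]
print(first, duplicate, count)
```

With the constraint in place, a second writer that reuses a sequence number is rejected by the database rather than silently producing two events with the same position in the run.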


Step 2: Pydantic Schemas (models.py)

Strict schemas reject malformed LLM plans before any step executes.

# models.py
from typing import Literal
from pydantic import BaseModel, Field
 
 
ALLOWED_TOOLS = Literal["web_search", "read_file", "write_file", "summarize"]
 
 
class PlanStep(BaseModel):
    id: str = Field(..., min_length=1)
    objective: str = Field(..., min_length=5)
    tool: ALLOWED_TOOLS
    input: dict
    success_criteria: str = Field(..., min_length=5)
 
 
class AgentPlan(BaseModel):
    run_id: str
    goal: str
    max_steps: int = Field(..., ge=1, le=10)
    budget_usd: float = Field(..., ge=0.01, le=5.0)
    steps: list[PlanStep] = Field(..., min_length=1)
    stop_conditions: list[str] = Field(..., min_length=1)
 
 
class RunRequest(BaseModel):
    goal: str
    max_steps: int = 5
    budget_usd: float = 1.0
 
 
class PolicyDecision(BaseModel):
    step_id: str
    tool: str
    allowed: bool
    reason: str

Step 3: Planner (planner.py)

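The planner calls an LLM and validates the response against AgentPlan. If the model returns malformed JSON or a plan that violates the schema, generate_plan raises, and the orchestrator fails the run fast with a PlanRejected event instead of silently executing a bad plan.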

# planner.py
import json
import os
from models import AgentPlan
 
SYSTEM_PROMPT = """You are an AI agent planner. Given a goal, produce a JSON plan with this exact schema:
 
{
  "run_id": "<provided>",
  "goal": "<the goal>",
  "max_steps": <int 1-10>,
  "budget_usd": <float 0.01-5.0>,
  "steps": [
    {
      "id": "step_1",
      "objective": "What this step achieves",
      "tool": "web_search" | "read_file" | "write_file" | "summarize",
      "input": { ... tool-specific params ... },
      "success_criteria": "How to know this step succeeded"
    }
  ],
  "stop_conditions": ["Condition that means the goal is complete"]
}
 
Return ONLY valid JSON. No explanation. No markdown fences."""
 
 
def generate_plan(run_id: str, goal: str, max_steps: int, budget_usd: float) -> AgentPlan:
    """Call LLM to generate a bounded, validated plan."""
 
    user_message = f"run_id: {run_id}\ngoal: {goal}\nmax_steps: {max_steps}\nbudget_usd: {budget_usd}"
 
    if os.getenv("ANTHROPIC_API_KEY"):
        import anthropic
        client = anthropic.Anthropic()
        response = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=1024,
            system=SYSTEM_PROMPT,
            messages=[{"role": "user", "content": user_message}],
        )
        raw = response.content[0].text
    else:
        from openai import OpenAI
        client = OpenAI()
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_message},
            ],
            response_format={"type": "json_object"},
        )
        raw = response.choices[0].message.content
 
    data = json.loads(raw)
    return AgentPlan.model_validate(data)  # raises ValidationError if schema is wrong

Use claude-3-5-haiku or gpt-4o-mini for planning — fast and cheap. Reserve the stronger model for synthesis at the response stage only.
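
Models sometimes wrap JSON in a markdown code fence even when told not to, which makes json.loads fail on otherwise valid plans. The sketch below shows one way to tolerate that before validation. The helper name extract_json is hypothetical and not part of the tutorial's planner; it only strips a single outer fence and does not attempt to repair broken JSON.

```python
import json
import re

# Build the fence marker programmatically so this listing contains no literal fence.
FENCE = "`" * 3
_FENCED = re.compile(rf"^{FENCE}(?:json)?\s*\n(.*?)\n{FENCE}\s*$", re.DOTALL)


def extract_json(raw: str) -> dict:
    """Parse a JSON object from model output, unwrapping one outer code fence if present."""
    text = raw.strip()
    match = _FENCED.match(text)
    if match:
        text = match.group(1)
    data = json.loads(text)  # still raises on invalid JSON, so the run can fail fast
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object at the top level")
    return data


fenced_reply = FENCE + 'json\n{"goal": "demo"}\n' + FENCE
plain_reply = '{"goal": "demo"}'
print(extract_json(fenced_reply) == extract_json(plain_reply))
```

In generate_plan, a helper like this would take the place of the direct json.loads(raw) call, with AgentPlan.model_validate still applied to the result.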


Step 4: Policy Guard (policy.py)

No tool executes without passing policy. This is code, not a prompt.

# policy.py
from models import PlanStep, PolicyDecision
 
# Simple in-memory rules. In production: load from YAML or a config service.
POLICY_RULES = [
    {
        "name": "deny_write_without_explicit_path",
        "when": {"tool": "write_file"},
        "require_field": "path",
        "deny_if_missing": True,
        "reason": "write_file requires an explicit path in input",
    },
    {
        "name": "deny_search_with_empty_query",
        "when": {"tool": "web_search"},
        "require_field": "query",
        "deny_if_missing": True,
        "reason": "web_search requires a non-empty query",
    },
]
 
 
def evaluate(step: PlanStep) -> PolicyDecision:
    """Evaluate a plan step against all policy rules. First deny wins."""
    for rule in POLICY_RULES:
        if rule["when"].get("tool") == step.tool:
            required_field = rule.get("require_field")
            if required_field and not step.input.get(required_field):
                return PolicyDecision(
                    step_id=step.id,
                    tool=step.tool,
                    allowed=False,
                    reason=rule["reason"],
                )
 
    return PolicyDecision(
        step_id=step.id,
        tool=step.tool,
        allowed=True,
        reason="All policy rules passed",
    )
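
The rules above only check that a path is present. A natural extension is to check where the path points. The sketch below is an illustration under stated assumptions: the agent_workspace directory and the helpers is_path_allowed and check_write_step are hypothetical, and the check is shown as a standalone function rather than wired into POLICY_RULES.

```python
from pathlib import Path

WORKSPACE = "agent_workspace"  # hypothetical sandbox directory for write_file


def is_path_allowed(path: str, workspace: str = WORKSPACE) -> bool:
    """True only if `path` resolves to a location strictly inside `workspace`."""
    root = Path(workspace).resolve()
    # Joining with an absolute path discards `root`, so absolute paths are
    # checked as-is; resolve() collapses ".." segments and follows symlinks.
    target = (root / path).resolve()
    return target != root and target.is_relative_to(root)


def check_write_step(tool: str, step_input: dict) -> tuple[bool, str]:
    """Return (allowed, reason) for a step, mirroring the PolicyDecision fields."""
    if tool != "write_file":
        return True, "rule does not apply"
    path = step_input.get("path")
    if not path:
        return False, "write_file requires an explicit path in input"
    if not is_path_allowed(path):
        return False, f"path escapes workspace: {path}"
    return True, "path is inside workspace"


print(check_write_step("write_file", {"path": "notes/out.txt"}))
print(check_write_step("write_file", {"path": "../secrets.txt"}))
```

Path.is_relative_to requires Python 3.9 or later, which matches the list[dict] annotations used elsewhere in this tutorial.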

Step 5: Tool Executor (tools.py)

Each tool runs in isolation and returns a structured result. Errors are caught and returned as observations — never swallowed.

# tools.py
from datetime import datetime, timezone


def run_tool(tool: str, input_params: dict) -> dict:
    """
    Execute a tool and return a structured observation.
    Never raises; errors become observations with success=False.
    """
    try:
        if tool == "web_search":
            return _web_search(input_params)
        elif tool == "read_file":
            return _read_file(input_params)
        elif tool == "write_file":
            return _write_file(input_params)
        elif tool == "summarize":
            return _summarize(input_params)
        else:
            return {"success": False, "error": f"Unknown tool: {tool}"}
    except Exception as e:
        return {"success": False, "error": str(e), "tool": tool}


def _web_search(params: dict) -> dict:
    """Stub: replace with real search API (Brave, Tavily, etc.)"""
    query = params.get("query", "")
    # In production: call Brave Search API or Tavily
    return {
        "success": True,
        "query": query,
        "results": [
            {"title": f"Result for '{query}'", "url": "https://example.com", "snippet": "Stub result."}
        ],
        # Timezone-aware timestamp; datetime.utcnow() is deprecated since Python 3.12.
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
 
 
def _read_file(params: dict) -> dict:
    path = params.get("path", "")
    try:
        with open(path, "r", encoding="utf-8") as f:
            content = f.read()
        return {"success": True, "path": path, "content": content[:4000]}  # cap at 4k chars
    except FileNotFoundError:
        return {"success": False, "error": f"File not found: {path}"}
 
 
def _write_file(params: dict) -> dict:
    path = params.get("path", "")
    content = params.get("content", "")
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)
    # Count encoded bytes, not characters, so the figure matches what lands on disk.
    return {"success": True, "path": path, "bytes_written": len(content.encode("utf-8"))}
 
 
def _summarize(params: dict) -> dict:
    text = params.get("text", "")
    # Stub: in production call LLM with summarization prompt
    truncated = text[:200] + "..." if len(text) > 200 else text
    return {"success": True, "summary": f"[Summary of: {truncated}]", "original_length": len(text)}
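
As the number of tools grows, the if/elif chain in run_tool can be replaced by a lookup table while keeping the same "never raises" contract. This is a sketch of that alternative, not the tutorial's implementation; the TOOLS registry, the tool decorator, and the echo tool are hypothetical names used only for illustration.

```python
from typing import Callable

# Hypothetical registry mapping tool names to handler functions.
TOOLS: dict[str, Callable[[dict], dict]] = {}


def tool(name: str):
    """Decorator that registers a handler under `name`."""
    def register(fn: Callable[[dict], dict]) -> Callable[[dict], dict]:
        TOOLS[name] = fn
        return fn
    return register


@tool("echo")
def _echo(params: dict) -> dict:
    # Raises KeyError when "text" is missing, to show how errors are reported.
    return {"success": True, "text": params["text"]}


def run_tool(name: str, params: dict) -> dict:
    """Dispatch to a registered tool; any exception becomes a failed observation."""
    handler = TOOLS.get(name)
    if handler is None:
        return {"success": False, "error": f"Unknown tool: {name}"}
    try:
        return handler(params)
    except Exception as exc:
        return {"success": False, "error": str(exc), "tool": name}


ok = run_tool("echo", {"text": "hi"})
missing_input = run_tool("echo", {})
unknown = run_tool("nope", {})
print(ok, missing_input["success"], unknown["error"])
```

The observation shape is the same as in tools.py, so the orchestrator would not need to change if the dispatch were swapped.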

Step 6: Orchestrator (orchestrator.py)

This is the heart of the system. It sequences events, calls each service, and ensures every transition is recorded before it happens.

# orchestrator.py
import uuid
from db import append_event, get_run_events
from models import RunRequest, AgentPlan
from planner import generate_plan
from policy import evaluate
from tools import run_tool
 
 
def run_agent(request: RunRequest) -> dict:
    run_id = str(uuid.uuid4())[:8]
    seq = 0
 
    def emit(event_type: str, actor: str, payload: dict):
        nonlocal seq
        seq += 1
        append_event(run_id, seq, event_type, actor, payload)
        print(f"  [{seq}] {event_type} — {actor}")
 
    # --- RunStarted ---
    emit("RunStarted", "orchestrator", {
        "goal": request.goal,
        "max_steps": request.max_steps,
        "budget_usd": request.budget_usd,
    })
 
    # --- Generate Plan ---
    try:
        plan: AgentPlan = generate_plan(
            run_id=run_id,
            goal=request.goal,
            max_steps=request.max_steps,
            budget_usd=request.budget_usd,
        )
        emit("PlanGenerated", "planner", plan.model_dump())
    except Exception as e:
        emit("PlanRejected", "planner", {"error": str(e)})
        emit("RunFailed", "orchestrator", {"reason": "Plan generation failed"})
        return {"run_id": run_id, "status": "failed", "reason": str(e)}
 
    # --- Execute Steps ---
    observations = []
 
    for step in plan.steps[:request.max_steps]:  # enforce the caller's step cap
        # Policy check
        decision = evaluate(step)
        if decision.allowed:
            emit("ToolApproved", "policy_guard", {
                "step_id": step.id,
                "tool": step.tool,
                "reason": decision.reason,
            })
        else:
            emit("ToolRejected", "policy_guard", {
                "step_id": step.id,
                "tool": step.tool,
                "reason": decision.reason,
            })
            continue  # skip this step, keep going
 
        # Execute tool
        emit("ToolExecuted", "tool_executor", {"step_id": step.id, "tool": step.tool, "input": step.input})
        result = run_tool(step.tool, step.input)
 
        emit("ObservationCaptured", "tool_executor", {
            "step_id": step.id,
            "tool": step.tool,
            "result": result,
        })
        observations.append({"step_id": step.id, "result": result})
 
    # --- Finalize Response ---
    summary = f"Completed {len(observations)} steps for goal: {request.goal}"
    emit("ResponseFinalized", "orchestrator", {"summary": summary, "step_count": len(observations)})
    emit("RunCompleted", "orchestrator", {"run_id": run_id, "status": "success"})
 
    return {"run_id": run_id, "status": "success", "summary": summary, "observations": observations}
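
Because every transition is in the log, the state of a run can be rebuilt from its events alone. The sketch below folds a list of event dicts, in the shape returned by get_run_events, into a summary. The function name fold_run_state and the summary fields are assumptions for illustration; the event types and payload keys are the ones the orchestrator emits.

```python
def fold_run_state(events: list[dict]) -> dict:
    """Rebuild a run summary by applying events in sequence order."""
    state = {"status": "unknown", "goal": None,
             "approved": [], "rejected": [], "observations": {}}
    for event in sorted(events, key=lambda e: e["seq"]):
        kind, payload = event["event_type"], event["payload"]
        if kind == "RunStarted":
            state["status"] = "running"
            state["goal"] = payload["goal"]
        elif kind == "ToolApproved":
            state["approved"].append(payload["step_id"])
        elif kind == "ToolRejected":
            state["rejected"].append(payload["step_id"])
        elif kind == "ObservationCaptured":
            state["observations"][payload["step_id"]] = payload["result"]
        elif kind == "RunCompleted":
            state["status"] = "success"
        elif kind == "RunFailed":
            state["status"] = "failed"
        # Other event types (PlanGenerated, ToolExecuted, ...) do not change this summary.
    return state


sample_events = [
    {"seq": 1, "event_type": "RunStarted", "payload": {"goal": "demo"}},
    {"seq": 2, "event_type": "ToolRejected",
     "payload": {"step_id": "step_1", "tool": "write_file", "reason": "no path"}},
    {"seq": 3, "event_type": "ToolApproved",
     "payload": {"step_id": "step_2", "tool": "web_search", "reason": "ok"}},
    {"seq": 4, "event_type": "ObservationCaptured",
     "payload": {"step_id": "step_2", "tool": "web_search", "result": {"success": True}}},
    {"seq": 5, "event_type": "RunCompleted", "payload": {"status": "success"}},
]
state = fold_run_state(sample_events)
print(state["status"], state["approved"], state["rejected"])
```

A fold like this is useful for dashboards and for resuming interrupted runs, since it never needs anything beyond the stored events.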

Step 7: Replay Engine (replay.py)

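The replay engine reads a historical run's RunStarted event, re-runs the same goal through the current code, and compares the outcome with the original. This is how you test prompt changes against past inputs. Keep in mind that a replay calls the planner and the tools for real and appends a new run to the event store, so do not point it at production data.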

# replay.py
from db import get_run_events, get_all_runs
from models import RunRequest
from orchestrator import run_agent
 
 
def replay_run(run_id: str, verbose: bool = True) -> dict:
    """
    Load a historical run and re-execute it with the current code.
    Useful for regression testing after prompt or tool changes.
    """
    events = get_run_events(run_id)
    if not events:
        return {"error": f"No events found for run_id={run_id}"}
 
    # Extract original RunStarted event to get the original goal
    started = next((e for e in events if e["event_type"] == "RunStarted"), None)
    if not started:
        return {"error": "RunStarted event not found — cannot replay"}
 
    original_goal = started["payload"]["goal"]
    original_steps = started["payload"]["max_steps"]
    original_budget = started["payload"]["budget_usd"]
 
    if verbose:
        print(f"\n--- REPLAYING run_id={run_id} ---")
        print(f"Original goal: {original_goal}")
        print(f"Original events: {len(events)}")
        print(f"Re-running with current code...\n")
 
    # Re-run with current code
    new_result = run_agent(RunRequest(
        goal=original_goal,
        max_steps=original_steps,
        budget_usd=original_budget,
    ))
 
    # Compare step counts as a basic quality signal
    original_completed = sum(1 for e in events if e["event_type"] == "ObservationCaptured")
    new_completed = len(new_result.get("observations", []))
 
    delta = {
        "original_run_id": run_id,
        "new_run_id": new_result["run_id"],
        "original_steps_completed": original_completed,
        "new_steps_completed": new_completed,
        "regression": new_completed < original_completed,
    }
 
    if verbose:
        print(f"\n--- REPLAY DELTA ---")
        print(f"Original: {original_completed} steps completed")
        print(f"New:      {new_completed} steps completed")
        print(f"Regression: {delta['regression']}")
 
    return delta
 
 
def replay_all(verbose: bool = False) -> list[dict]:
    """Replay every historical run. Use in CI before deploying prompt changes."""
    run_ids = get_all_runs()
    results = []
    regressions = 0
 
    for run_id in run_ids:
        delta = replay_run(run_id, verbose=verbose)
        results.append(delta)
        if delta.get("regression"):
            regressions += 1
 
    print(f"\n=== Replay Summary: {len(run_ids)} runs, {regressions} regressions ===")
    return results

In production, replay against a separate copy of the event store to avoid polluting it with test runs. One way to do this is to have db.py read DATABASE_URL from the environment and set DATABASE_URL=sqlite:///replay_test.db for replay runs.
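
Comparing step counts catches only coarse regressions. A somewhat richer signal is to compare which events each run produced. The sketch below is an assumption-laden illustration: signature and diff_runs are hypothetical helpers, and they operate on event dicts in the shape returned by get_run_events.

```python
from collections import Counter


def signature(events: list[dict]) -> list[tuple]:
    """Reduce each event to (event_type, tool); tool is None when absent."""
    return [(e["event_type"], e["payload"].get("tool")) for e in events]


def diff_runs(original: list[dict], replayed: list[dict]) -> dict:
    """Report events present in one run but not the other, ignoring payload details."""
    before, after = Counter(signature(original)), Counter(signature(replayed))
    return {
        "missing": sorted((before - after).elements(), key=str),
        "extra": sorted((after - before).elements(), key=str),
        "same_order": signature(original) == signature(replayed),
    }


original = [
    {"event_type": "RunStarted", "payload": {"goal": "demo"}},
    {"event_type": "ToolApproved", "payload": {"tool": "web_search"}},
    {"event_type": "ObservationCaptured", "payload": {"tool": "web_search"}},
    {"event_type": "RunCompleted", "payload": {}},
]
replayed = [
    {"event_type": "RunStarted", "payload": {"goal": "demo"}},
    {"event_type": "ToolRejected", "payload": {"tool": "web_search"}},
    {"event_type": "RunCompleted", "payload": {}},
]
delta = diff_runs(original, replayed)
print(delta)
```

In this example the replayed run rejected a tool call that the original approved, which a step count alone would report only as "one fewer step".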


Step 8: Wire It All Together (main.py)

# main.py
from dotenv import load_dotenv
load_dotenv()
 
from models import RunRequest
from orchestrator import run_agent
from replay import replay_run, replay_all
from db import get_run_events, get_all_runs
 
 
def demo_run():
    print("=== Running Agent ===\n")
    request = RunRequest(
        goal="Research the latest developments in AI agent frameworks and summarize the key findings",
        max_steps=4,
        budget_usd=0.50,
    )
    result = run_agent(request)
    print(f"\nResult: {result['status']} — {result['summary']}")
    return result["run_id"]
 
 
def show_events(run_id: str):
    print(f"\n=== Event Log for {run_id} ===")
    events = get_run_events(run_id)
    for e in events:
        print(f"  [{e['seq']}] {e['event_type']} ({e['actor']})")
 
 
def demo_replay(run_id: str):
    print("\n=== Replaying Run ===")
    delta = replay_run(run_id, verbose=True)
    return delta
 
 
if __name__ == "__main__":
    # 1. Run the agent
    run_id = demo_run()
 
    # 2. Print the event log
    show_events(run_id)
 
    # 3. Replay it (simulates testing a prompt change)
    demo_replay(run_id)

Running It

cd agent/
python main.py

Expected output:

=== Running Agent ===

  [1] RunStarted — orchestrator
  [2] PlanGenerated — planner
  [3] ToolApproved — policy_guard
  [4] ToolExecuted — tool_executor
  [5] ObservationCaptured — tool_executor
  [6] ToolApproved — policy_guard
  [7] ToolExecuted — tool_executor
  [8] ObservationCaptured — tool_executor
  [9] ResponseFinalized — orchestrator
  [10] RunCompleted — orchestrator

Result: success — Completed 2 steps for goal: Research the latest...

=== Event Log for a3f91c2b ===
  [1] RunStarted (orchestrator)
  [2] PlanGenerated (planner)
  ...

=== Replaying Run ===

--- REPLAYING run_id=a3f91c2b ---
Original goal: Research the latest developments...
Original events: 10
Re-running with current code...

  ...

--- REPLAY DELTA ---
Original: 2 steps completed
New:      2 steps completed
Regression: False

Adding a Real Search Tool

Replace the stub in tools.py with a real Brave Search call:

import httpx
import os
 
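def _web_search(params: dict) -> dict:
    query = params.get("query", "")
    api_key = os.getenv("BRAVE_API_KEY")
    if not api_key:
        # Report a missing key as a failed observation instead of sending a bad request.
        return {"success": False, "error": "BRAVE_API_KEY is not set"}
 
    response = httpx.get(
        "https://api.search.brave.com/res/v1/web/search",
        headers={"Accept": "application/json", "X-Subscription-Token": api_key},
        params={"q": query, "count": 5},
        timeout=10,
    )
    # HTTP errors raise here; run_tool catches them and records a failed observation.
    response.raise_for_status()
    data = response.json()
    results = [
        {"title": r["title"], "url": r["url"], "snippet": r.get("description", "")}
        for r in data.get("web", {}).get("results", [])
    ]
    return {"success": True, "query": query, "results": results}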

Get a free Brave Search API key at api.search.brave.com.


What to Build Next

With this foundation running, the natural next steps are:

  1. Human approval queue — when PolicyDecision.allowed = False, write to a queue and wait for a human /approve endpoint before continuing
  2. Cost tracking — parse token usage from LLM responses and emit a CostRecorded event per step; fail the run if budget_usd is exceeded
  3. Postgres + FastAPI — wrap run_agent() in a POST endpoint, switch to Postgres, and you have a production-ready API
  4. Replay in CI — run replay_all() in your GitHub Actions pipeline before any prompt change ships
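
The cost tracking item can be sketched as a small accumulator that produces a CostRecorded payload per step and stops the run once the budget is exceeded. Everything here is an assumption for illustration: BudgetTracker and BudgetExceeded are hypothetical names, and the per-1k-token prices are placeholder values, not real provider pricing.

```python
from dataclasses import dataclass, field


class BudgetExceeded(Exception):
    """Raised when cumulative spend passes the run's budget."""


@dataclass
class BudgetTracker:
    budget_usd: float
    usd_per_1k_input: float   # placeholder price, supply your provider's rate
    usd_per_1k_output: float  # placeholder price, supply your provider's rate
    spent_usd: float = 0.0
    events: list = field(default_factory=list)

    def record(self, step_id: str, input_tokens: int, output_tokens: int) -> dict:
        """Add one step's cost, keep a CostRecorded event, and enforce the budget."""
        cost = (input_tokens / 1000) * self.usd_per_1k_input \
            + (output_tokens / 1000) * self.usd_per_1k_output
        self.spent_usd += cost
        event = {
            "event_type": "CostRecorded",
            "payload": {"step_id": step_id, "cost_usd": cost, "spent_usd": self.spent_usd},
        }
        self.events.append(event)  # recorded even when the budget check fails below
        if self.spent_usd > self.budget_usd:
            raise BudgetExceeded(f"spent {self.spent_usd:.4f} of {self.budget_usd:.4f} USD")
        return event


tracker = BudgetTracker(budget_usd=0.005, usd_per_1k_input=0.001, usd_per_1k_output=0.002)
first = tracker.record("step_1", input_tokens=2000, output_tokens=1000)
try:
    tracker.record("step_2", input_tokens=2000, output_tokens=1000)
    exceeded = False
except BudgetExceeded:
    exceeded = True
print(exceeded, len(tracker.events))
```

In the orchestrator, each recorded event would be passed to emit, and a BudgetExceeded exception would be turned into a RunFailed event.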

The full source code for this tutorial is structured to be copy-paste complete. Each file is self-contained and imports only from the files above it.


References

  • Event-Sourced AI Agents: The Production Blueprint — the architecture overview this code implements
  • Martin Fowler — Event Sourcing — the foundational pattern
  • Anthropic API Docs — Claude model reference for the planner
  • OpenAI Structured Outputs — JSON mode for GPT-based planners
  • Pydantic v2 Docs — model validation used throughout
  • SQLAlchemy ORM — event store persistence layer
  • Brave Search API — recommended search tool integration
  • OWASP Top 10 for LLM Applications — security considerations for the policy guard