Build an Event-Sourced AI Agent from Scratch: Full Working Code
Step-by-step tutorial with complete Python code to build a production-ready event-sourced AI agent — orchestrator, planner, policy guard, tool executor, and replay engine.
In our previous article we covered the why — why event sourcing makes agents debuggable, replayable, and safe for production. This article is the how. Every component, every file, every line of code you need to run a working event-sourced AI agent on your machine today.
By the end you will have a fully functional Python agent that:
- Persists every decision as an immutable event to SQLite
- Generates a bounded, validated plan using Claude or OpenAI
- Enforces tool policy rules before any action executes
- Replays historical runs to test prompt changes safely
No partial snippets. No "left as an exercise." Full code.
Prerequisites
python -m pip install anthropic openai fastapi uvicorn sqlalchemy pydantic python-dotenv
Create a .env file:
ANTHROPIC_API_KEY=your_key_here
# or
OPENAI_API_KEY=your_key_here
Project structure we are building:
agent/
├── .env
├── main.py # Entry point
├── db.py # Event store
├── models.py # Pydantic schemas
├── orchestrator.py # Run lifecycle
├── planner.py # LLM plan generation
├── policy.py # Policy guard
├── tools.py # Tool executor
└── replay.py # Replay engine
Step 1: Event Store (db.py)
Every event the agent emits is appended here. Nothing is ever updated or deleted.
# db.py
import json
from datetime import datetime, timezone
from sqlalchemy import create_engine, Column, String, Integer, Text, DateTime
from sqlalchemy.orm import DeclarativeBase, sessionmaker
DATABASE_URL = "sqlite:///agent_events.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
Session = sessionmaker(bind=engine)
class Base(DeclarativeBase):
pass
class AgentEvent(Base):
__tablename__ = "agent_events"
id = Column(Integer, primary_key=True, autoincrement=True)
run_id = Column(String, nullable=False, index=True)
seq = Column(Integer, nullable=False)
event_type = Column(String, nullable=False, index=True)
actor = Column(String, nullable=False)
payload = Column(Text, nullable=False) # JSON string
event_time = Column(DateTime, default=lambda: datetime.now(timezone.utc))
Base.metadata.create_all(engine)
def append_event(run_id: str, seq: int, event_type: str, actor: str, payload: dict) -> AgentEvent:
"""Append an immutable event. Never call this to update existing events."""
session = Session()
try:
event = AgentEvent(
run_id=run_id,
seq=seq,
event_type=event_type,
actor=actor,
payload=json.dumps(payload),
)
session.add(event)
session.commit()
session.refresh(event)
return event
finally:
session.close()
def get_run_events(run_id: str) -> list[dict]:
"""Return all events for a run in sequence order."""
session = Session()
try:
rows = (
session.query(AgentEvent)
.filter_by(run_id=run_id)
.order_by(AgentEvent.seq)
.all()
)
return [
{
"id": r.id,
"run_id": r.run_id,
"seq": r.seq,
"event_type": r.event_type,
"actor": r.actor,
"payload": json.loads(r.payload),
"event_time": r.event_time.isoformat(),
}
for r in rows
]
finally:
session.close()
def get_all_runs() -> list[str]:
"""Return distinct run IDs ordered by first event time."""
session = Session()
try:
        # Note: SELECT DISTINCT with ORDER BY on a non-selected column is
        # rejected by SQLite, so fetch run_ids in insertion order and
        # de-duplicate in Python instead.
        rows = session.query(AgentEvent.run_id).order_by(AgentEvent.id).all()
        seen: list[str] = []
        for (rid,) in rows:
            if rid not in seen:
                seen.append(rid)
        return seen
finally:
        session.close()
Replace sqlite:///agent_events.db with a Postgres URL in production: postgresql://user:pass@host/db. The rest of the code does not change.
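To make that swap configuration-driven rather than a code edit, one option (an assumed convention, not something db.py does today) is to read the URL from the environment. Note that check_same_thread is a SQLite-only connect argument, so it has to be applied conditionally:

```python
import os

# Hypothetical convention: read the database URL from the environment,
# falling back to the local SQLite file used in this tutorial.
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///agent_events.db")

# check_same_thread is SQLite-specific; Postgres drivers reject it,
# so only include it for SQLite URLs before calling create_engine().
connect_args = {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {}
```

Pass connect_args to create_engine() exactly as before; everything downstream of the engine stays the same.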
Step 2: Pydantic Schemas (models.py)
Strict schemas prevent the LLM from returning malformed plans.
# models.py
from typing import Literal
from pydantic import BaseModel, Field
ALLOWED_TOOLS = Literal["web_search", "read_file", "write_file", "summarize"]
class PlanStep(BaseModel):
id: str = Field(..., min_length=1)
objective: str = Field(..., min_length=5)
tool: ALLOWED_TOOLS
input: dict
success_criteria: str = Field(..., min_length=5)
class AgentPlan(BaseModel):
run_id: str
goal: str
max_steps: int = Field(..., ge=1, le=10)
budget_usd: float = Field(..., ge=0.01, le=5.0)
steps: list[PlanStep] = Field(..., min_length=1)
stop_conditions: list[str] = Field(..., min_length=1)
class RunRequest(BaseModel):
goal: str
max_steps: int = 5
budget_usd: float = 1.0
class PolicyDecision(BaseModel):
step_id: str
tool: str
allowed: bool
    reason: str
Step 3: Planner (planner.py)
The planner calls an LLM and forces it to return a valid AgentPlan. If the model returns garbage, the run fails fast with a PlanRejected event — not a silent bad execution.
# planner.py
import json
import os
from models import AgentPlan
SYSTEM_PROMPT = """You are an AI agent planner. Given a goal, produce a JSON plan with this exact schema:
{
"run_id": "<provided>",
"goal": "<the goal>",
"max_steps": <int 1-10>,
"budget_usd": <float 0.01-5.0>,
"steps": [
{
"id": "step_1",
"objective": "What this step achieves",
"tool": "web_search" | "read_file" | "write_file" | "summarize",
"input": { ... tool-specific params ... },
"success_criteria": "How to know this step succeeded"
}
],
"stop_conditions": ["Condition that means the goal is complete"]
}
Return ONLY valid JSON. No explanation. No markdown fences."""
def generate_plan(run_id: str, goal: str, max_steps: int, budget_usd: float) -> AgentPlan:
"""Call LLM to generate a bounded, validated plan."""
user_message = f"run_id: {run_id}\ngoal: {goal}\nmax_steps: {max_steps}\nbudget_usd: {budget_usd}"
if os.getenv("ANTHROPIC_API_KEY"):
import anthropic
client = anthropic.Anthropic()
response = client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=1024,
system=SYSTEM_PROMPT,
messages=[{"role": "user", "content": user_message}],
)
raw = response.content[0].text
else:
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_message},
],
response_format={"type": "json_object"},
)
raw = response.choices[0].message.content
data = json.loads(raw)
    return AgentPlan.model_validate(data)  # raises ValidationError if schema is wrong
Use claude-3-5-haiku or gpt-4o-mini for planning — fast and cheap. Reserve the stronger model for synthesis at the response stage only.
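If you would rather re-sample the model once before emitting a PlanRejected event, a small wrapper can catch validation failures and retry. This is a sketch of one possible approach, not part of planner.py above; it relies on the fact that both json.JSONDecodeError and pydantic's ValidationError are ValueError subclasses, so a single except clause covers both:

```python
def generate_plan_with_retry(generate, run_id: str, goal: str,
                             max_steps: int, budget_usd: float,
                             attempts: int = 2):
    """Call a plan generator, retrying on malformed LLM output.

    `generate` is assumed to have the same signature as generate_plan above
    and to raise ValidationError or json.JSONDecodeError on bad output.
    """
    last_error = None
    for _ in range(attempts):
        try:
            return generate(run_id=run_id, goal=goal,
                            max_steps=max_steps, budget_usd=budget_usd)
        except ValueError as e:
            last_error = e  # malformed plan: sample again
    raise last_error
```

If every attempt fails, the last error propagates, so the orchestrator's existing except branch still emits PlanRejected as before.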
Step 4: Policy Guard (policy.py)
No tool executes without passing policy. This is code, not a prompt.
# policy.py
from models import PlanStep, PolicyDecision
# Simple in-memory rules. In production: load from YAML or a config service.
POLICY_RULES = [
{
"name": "deny_write_without_explicit_path",
"when": {"tool": "write_file"},
"require_field": "path",
"deny_if_missing": True,
"reason": "write_file requires an explicit path in input",
},
{
"name": "deny_search_with_empty_query",
"when": {"tool": "web_search"},
"require_field": "query",
"deny_if_missing": True,
"reason": "web_search requires a non-empty query",
},
]
def evaluate(step: PlanStep) -> PolicyDecision:
"""Evaluate a plan step against all policy rules. First deny wins."""
for rule in POLICY_RULES:
if rule["when"].get("tool") == step.tool:
required_field = rule.get("require_field")
if required_field and not step.input.get(required_field):
return PolicyDecision(
step_id=step.id,
tool=step.tool,
allowed=False,
reason=rule["reason"],
)
return PolicyDecision(
step_id=step.id,
tool=step.tool,
allowed=True,
reason="All policy rules passed",
    )
Step 5: Tool Executor (tools.py)
Each tool runs in isolation and returns a structured result. Errors are caught and returned as observations — never swallowed.
# tools.py
import json
from datetime import datetime
def run_tool(tool: str, input_params: dict) -> dict:
"""
Execute a tool and return a structured observation.
Never raises — errors become observations with success=False.
"""
try:
if tool == "web_search":
return _web_search(input_params)
elif tool == "read_file":
return _read_file(input_params)
elif tool == "write_file":
return _write_file(input_params)
elif tool == "summarize":
return _summarize(input_params)
else:
return {"success": False, "error": f"Unknown tool: {tool}"}
except Exception as e:
return {"success": False, "error": str(e), "tool": tool}
def _web_search(params: dict) -> dict:
"""Stub: replace with real search API (Brave, Tavily, etc.)"""
query = params.get("query", "")
# In production: call Brave Search API or Tavily
return {
"success": True,
"query": query,
"results": [
{"title": f"Result for '{query}'", "url": "https://example.com", "snippet": "Stub result."}
],
"timestamp": datetime.utcnow().isoformat(),
}
def _read_file(params: dict) -> dict:
path = params.get("path", "")
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
return {"success": True, "path": path, "content": content[:4000]} # cap at 4k chars
except FileNotFoundError:
return {"success": False, "error": f"File not found: {path}"}
def _write_file(params: dict) -> dict:
path = params.get("path", "")
content = params.get("content", "")
with open(path, "w", encoding="utf-8") as f:
f.write(content)
return {"success": True, "path": path, "bytes_written": len(content)}
def _summarize(params: dict) -> dict:
text = params.get("text", "")
# Stub: in production call LLM with summarization prompt
truncated = text[:200] + "..." if len(text) > 200 else text
    return {"success": True, "summary": f"[Summary of: {truncated}]", "original_length": len(text)}
Step 6: Orchestrator (orchestrator.py)
This is the heart of the system. It sequences events, calls each service, and ensures every transition is recorded before it happens.
# orchestrator.py
import uuid
from db import append_event, get_run_events
from models import RunRequest, AgentPlan
from planner import generate_plan
from policy import evaluate
from tools import run_tool
def run_agent(request: RunRequest) -> dict:
run_id = str(uuid.uuid4())[:8]
seq = 0
def emit(event_type: str, actor: str, payload: dict):
nonlocal seq
seq += 1
append_event(run_id, seq, event_type, actor, payload)
print(f" [{seq}] {event_type} — {actor}")
# --- RunStarted ---
emit("RunStarted", "orchestrator", {
"goal": request.goal,
"max_steps": request.max_steps,
"budget_usd": request.budget_usd,
})
# --- Generate Plan ---
try:
plan: AgentPlan = generate_plan(
run_id=run_id,
goal=request.goal,
max_steps=request.max_steps,
budget_usd=request.budget_usd,
)
emit("PlanGenerated", "planner", plan.model_dump())
except Exception as e:
emit("PlanRejected", "planner", {"error": str(e)})
emit("RunFailed", "orchestrator", {"reason": "Plan generation failed"})
return {"run_id": run_id, "status": "failed", "reason": str(e)}
# --- Execute Steps ---
observations = []
for step in plan.steps:
# Policy check
decision = evaluate(step)
if decision.allowed:
emit("ToolApproved", "policy_guard", {
"step_id": step.id,
"tool": step.tool,
"reason": decision.reason,
})
else:
emit("ToolRejected", "policy_guard", {
"step_id": step.id,
"tool": step.tool,
"reason": decision.reason,
})
continue # skip this step, keep going
# Execute tool
emit("ToolExecuted", "tool_executor", {"step_id": step.id, "tool": step.tool, "input": step.input})
result = run_tool(step.tool, step.input)
emit("ObservationCaptured", "tool_executor", {
"step_id": step.id,
"tool": step.tool,
"result": result,
})
observations.append({"step_id": step.id, "result": result})
# --- Finalize Response ---
summary = f"Completed {len(observations)} steps for goal: {request.goal}"
emit("ResponseFinalized", "orchestrator", {"summary": summary, "step_count": len(observations)})
emit("RunCompleted", "orchestrator", {"run_id": run_id, "status": "success"})
    return {"run_id": run_id, "status": "success", "summary": summary, "observations": observations}
Step 7: Replay Engine (replay.py)
The replay engine reads historical events and re-runs them through new logic without touching production. This is how you safely test prompt changes.
# replay.py
from db import get_run_events, get_all_runs
from models import RunRequest
from orchestrator import run_agent
def replay_run(run_id: str, verbose: bool = True) -> dict:
"""
Load a historical run and re-execute it with the current code.
Useful for regression testing after prompt or tool changes.
"""
events = get_run_events(run_id)
if not events:
return {"error": f"No events found for run_id={run_id}"}
# Extract original RunStarted event to get the original goal
started = next((e for e in events if e["event_type"] == "RunStarted"), None)
if not started:
return {"error": "RunStarted event not found — cannot replay"}
original_goal = started["payload"]["goal"]
original_steps = started["payload"]["max_steps"]
original_budget = started["payload"]["budget_usd"]
if verbose:
print(f"\n--- REPLAYING run_id={run_id} ---")
print(f"Original goal: {original_goal}")
print(f"Original events: {len(events)}")
print(f"Re-running with current code...\n")
# Re-run with current code
new_result = run_agent(RunRequest(
goal=original_goal,
max_steps=original_steps,
budget_usd=original_budget,
))
# Compare step counts as a basic quality signal
original_completed = sum(1 for e in events if e["event_type"] == "ObservationCaptured")
new_completed = len(new_result.get("observations", []))
delta = {
"original_run_id": run_id,
"new_run_id": new_result["run_id"],
"original_steps_completed": original_completed,
"new_steps_completed": new_completed,
"regression": new_completed < original_completed,
}
if verbose:
print(f"\n--- REPLAY DELTA ---")
print(f"Original: {original_completed} steps completed")
print(f"New: {new_completed} steps completed")
print(f"Regression: {delta['regression']}")
return delta
def replay_all(verbose: bool = False) -> list[dict]:
"""Replay every historical run. Use in CI before deploying prompt changes."""
run_ids = get_all_runs()
results = []
regressions = 0
for run_id in run_ids:
delta = replay_run(run_id, verbose=verbose)
results.append(delta)
if delta.get("regression"):
regressions += 1
print(f"\n=== Replay Summary: {len(run_ids)} runs, {regressions} regressions ===")
    return results
In production, replay against a separate database to avoid polluting your event store with test runs. Use DATABASE_URL=sqlite:///replay_test.db as an env override.
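A small context manager makes that override explicit and reversible. This assumes the event store reads DATABASE_URL from the environment when its engine is created; the tutorial's db.py hardcodes the URL, so treat this as a sketch of the production variant:

```python
import os
from contextlib import contextmanager

@contextmanager
def temporary_database(url: str):
    """Temporarily point DATABASE_URL at a throwaway database for replays."""
    previous = os.environ.get("DATABASE_URL")
    os.environ["DATABASE_URL"] = url
    try:
        yield url
    finally:
        # Restore whatever was there before, so real runs are unaffected.
        if previous is None:
            os.environ.pop("DATABASE_URL", None)
        else:
            os.environ["DATABASE_URL"] = previous
```

Usage: `with temporary_database("sqlite:///replay_test.db"): replay_all()` — remembering that the override must be in place before the engine is created.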
Step 8: Wire It All Together (main.py)
# main.py
from dotenv import load_dotenv
load_dotenv()
from models import RunRequest
from orchestrator import run_agent
from replay import replay_run, replay_all
from db import get_run_events, get_all_runs
def demo_run():
print("=== Running Agent ===\n")
request = RunRequest(
goal="Research the latest developments in AI agent frameworks and summarize the key findings",
max_steps=4,
budget_usd=0.50,
)
result = run_agent(request)
print(f"\nResult: {result['status']} — {result['summary']}")
return result["run_id"]
def show_events(run_id: str):
print(f"\n=== Event Log for {run_id} ===")
events = get_run_events(run_id)
for e in events:
print(f" [{e['seq']}] {e['event_type']} ({e['actor']})")
def demo_replay(run_id: str):
print("\n=== Replaying Run ===")
delta = replay_run(run_id, verbose=True)
return delta
if __name__ == "__main__":
# 1. Run the agent
run_id = demo_run()
# 2. Print the event log
show_events(run_id)
# 3. Replay it (simulates testing a prompt change)
    demo_replay(run_id)
Running It
cd agent/
python main.py
Expected output:
=== Running Agent ===
[1] RunStarted — orchestrator
[2] PlanGenerated — planner
[3] ToolApproved — policy_guard
[4] ToolExecuted — tool_executor
[5] ObservationCaptured — tool_executor
[6] ToolApproved — policy_guard
[7] ToolExecuted — tool_executor
[8] ObservationCaptured — tool_executor
[9] ResponseFinalized — orchestrator
[10] RunCompleted — orchestrator
Result: success — Completed 2 steps for goal: Research the latest...
=== Event Log for a3f91c2b ===
[1] RunStarted (orchestrator)
[2] PlanGenerated (planner)
...
=== Replaying Run ===
--- REPLAYING run_id=a3f91c2b ---
Original goal: Research the latest developments...
Re-running with current code...
--- REPLAY DELTA ---
Original: 2 steps completed
New: 2 steps completed
Regression: False
Adding a Real Search Tool
Replace the stub in tools.py with a real Brave Search call:
import httpx
import os
def _web_search(params: dict) -> dict:
query = params.get("query", "")
api_key = os.getenv("BRAVE_API_KEY")
response = httpx.get(
"https://api.search.brave.com/res/v1/web/search",
headers={"Accept": "application/json", "X-Subscription-Token": api_key},
params={"q": query, "count": 5},
timeout=10,
)
data = response.json()
results = [
{"title": r["title"], "url": r["url"], "snippet": r.get("description", "")}
for r in data.get("web", {}).get("results", [])
]
    return {"success": True, "query": query, "results": results}
Get a free Brave Search API key at api.search.brave.com.
What to Build Next
With this foundation running, the natural next steps are:
- Human approval queue — when PolicyDecision.allowed = False, write to a queue and wait for a human to call an /approve endpoint before continuing
- Cost tracking — parse token usage from LLM responses and emit a CostRecorded event per step; fail the run if budget_usd is exceeded
- Postgres + FastAPI — wrap run_agent() in a POST endpoint, switch to Postgres, and you have a production-ready API
- Replay in CI — run replay_all() in your GitHub Actions pipeline before any prompt change ships
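The cost-tracking item above can be sketched in a few lines. The per-token prices below are made-up placeholders (real numbers come from your provider's pricing page), and CostRecorded is simply the event name suggested above, following the tutorial's naming convention:

```python
# Placeholder prices — substitute your provider's actual published rates.
PRICE_PER_1K_INPUT_USD = 0.00025
PRICE_PER_1K_OUTPUT_USD = 0.00125

def step_cost_usd(input_tokens: int, output_tokens: int) -> float:
    """Estimate the dollar cost of one LLM call from its token usage."""
    return ((input_tokens / 1000.0) * PRICE_PER_1K_INPUT_USD
            + (output_tokens / 1000.0) * PRICE_PER_1K_OUTPUT_USD)

def within_budget(spent_usd: float, step_usd: float, budget_usd: float) -> bool:
    """True if executing this step keeps the run under budget_usd."""
    return spent_usd + step_usd <= budget_usd
```

In the orchestrator loop you would emit a CostRecorded event after each step and emit RunFailed once within_budget() returns False, so overspend is visible in the event log like every other decision.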
The full source code for this tutorial is structured to be copy-paste complete. Each file is self-contained and imports only from the files above it.
References
- Event-Sourced AI Agents: The Production Blueprint — the architecture overview this code implements
- Martin Fowler — Event Sourcing — the foundational pattern
- Anthropic API Docs — Claude model reference for the planner
- OpenAI Structured Outputs — JSON mode for GPT-based planners
- Pydantic v2 Docs — model validation used throughout
- SQLAlchemy ORM — event store persistence layer
- Brave Search API — recommended search tool integration
- OWASP Top 10 for LLM Applications — security considerations for the policy guard