This is the final article in the AI Agent series.
The first five covered concepts, hands-on code, memory, error recovery, and multi-agent orchestration. Now we put it all together — building a production-grade Agent framework from scratch.
The goal isn't to replace LangChain. It's to understand why every line in a framework is there.
A complete Agent framework needs these modules:
agent_framework/
├── core/
│ ├── agent.py # Main Agent loop
│ ├── tool_registry.py # Tool registration & management
│ └── memory.py # Three-layer memory
├── execution/
│ ├── sandbox.py # Docker sandbox
│ ├── trace.py # Execution tracing
│ └── validator.py # Output validation
├── orchestration/
│ ├── pipeline.py # Sequential pipeline
│ └── parallel.py # Parallel fan-out
└── observability/
├── logger.py # Structured logging
└── metrics.py # Metrics collection
The framework core shouldn't be hardcoded if/elif chains. Tools should register like plugins:
from typing import Callable, Any
class ToolRegistry:
    """Central tool registry.

    Tools plug in as (name, description, JSON-schema parameters, handler).
    Execution never raises: failures come back as JSON strings so the model
    can read the error and decide how to retry.
    """

    def __init__(self):
        self._tools: dict[str, Callable] = {}   # name -> handler callable
        self._schemas: list[dict] = []          # function-calling schemas, in registration order

    def register(self, name: str, description: str,
                 parameters: dict, handler: Callable):
        """Register a tool. Re-registering an existing name replaces it."""
        self._tools[name] = handler
        schema = {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": parameters
            }
        }
        # Fix: replace (not append) on duplicate names — otherwise the handler
        # dict holds one entry while the model is shown two schemas for it.
        for i, existing in enumerate(self._schemas):
            if existing["function"]["name"] == name:
                self._schemas[i] = schema
                return
        self._schemas.append(schema)

    def get_schemas(self) -> list[dict]:
        """Schemas in the shape expected by the chat-completions `tools` arg."""
        return self._schemas

    def execute(self, name: str, args: dict) -> str:
        """Run a registered tool; any failure is serialized, never raised."""
        if name not in self._tools:
            return json.dumps({
                "success": False,
                "error": f"Unknown tool: {name}",
                "available": list(self._tools.keys())
            })
        try:
            return self._tools[name](**args)
        except Exception as e:
            # A buggy tool must not crash the Agent loop; surface it as data.
            return json.dumps({
                "success": False,
                "error": str(e),
                "tool": name,
                "args": args
            })
# Usage
# `search_web` and `run_python_sandboxed` are plain callables defined elsewhere;
# the registry only needs a name, a description, a JSON-schema `parameters`
# dict (elided here as {...}), and the handler.
registry = ToolRegistry()
registry.register(
    name="search", description="Search the web", parameters={...},
    handler=search_web
)
registry.register(
    name="python", description="Execute Python code", parameters={...},
    handler=run_python_sandboxed # Sandboxed version
)
Letting an Agent execute arbitrary code is dangerous. Docker isolation is mandatory:
import subprocess, tempfile, os
# Sandbox resource limits — passed to `docker run` in run_in_sandbox().
SANDBOX_IMAGE = "python:3.11-slim"  # minimal official Python image
MEMORY_LIMIT = "256m"  # hard RAM cap per container
CPU_LIMIT = "1.0"  # at most one CPU core
TIMEOUT = 30  # seconds before execution is aborted
def run_in_sandbox(code: str) -> str:
    """Execute Python code in a Docker sandbox.

    Returns stdout on success, or a JSON error payload on failure/timeout.
    The container has no network, a read-only root fs, and capped memory/CPU.
    """
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".py", delete=False
    ) as f:
        f.write(code)
        script_path = f.name
    # Fix: give the container a unique name so a timed-out run can be killed.
    # subprocess's timeout only kills the local `docker run` client process;
    # without an explicit `docker kill`, the container itself keeps running.
    container = f"sandbox-{os.urandom(6).hex()}"
    try:
        result = subprocess.run([
            "docker", "run", "--rm", "--name", container,
            f"--memory={MEMORY_LIMIT}",
            f"--cpus={CPU_LIMIT}",
            "--network=none",            # No network access
            "--read-only",               # Read-only filesystem
            "--tmpfs=/tmp:rw,noexec",    # Only /tmp is writable
            "-v", f"{script_path}:/code.py:ro",
            SANDBOX_IMAGE,
            "python", "/code.py"
        ], capture_output=True, text=True, timeout=TIMEOUT)
        if result.returncode == 0:
            return result.stdout
        return json.dumps({
            "success": False,
            "error": result.stderr[:500],
            "exit_code": result.returncode
        })
    except subprocess.TimeoutExpired:
        # Best-effort kill; `--rm` then removes the container automatically.
        subprocess.run(["docker", "kill", container],
                       capture_output=True, timeout=10)
        return json.dumps({
            "success": False,
            "error": f"Code execution timed out (>{TIMEOUT}s)"
        })
    finally:
        os.unlink(script_path)
--network=none blocks network access, --read-only prevents filesystem modification, --memory limits resources. All three are non-negotiable.
What the Agent did must be verifiable — not by trust, by evidence.
import time, uuid, json
from dataclasses import dataclass, asdict
@dataclass
class Step:
    """One recorded step of an Agent run, appended by ExecutionTrace.record()."""
    step_id: str  # "<task_id>-<index>", sequential within one trace
    type: str # "think" | "act" | "observe"
    timestamp: float  # wall-clock time (time.time()) when the step was recorded
    data: dict  # step payload: tool name/args, previews, durations, etc.
    duration_ms: float = 0  # how long the step took; 0 when unknown
class ExecutionTrace:
    """Records every step of an Agent run so it can be audited afterwards."""

    def __init__(self, task_id: str = None):
        # Fall back to a short random id when the caller supplies none.
        self.task_id = task_id or str(uuid.uuid4())[:8]
        self.steps: list[Step] = []
        self.start_time = time.time()

    def record(self, step_type: str, data: dict, duration_ms: float = 0):
        """Append one step; ids are sequential within the task."""
        step = Step(
            step_id=f"{self.task_id}-{len(self.steps)}",
            type=step_type,
            timestamp=time.time(),
            data=data,
            duration_ms=duration_ms,
        )
        self.steps.append(step)

    def export(self, format: str = "json") -> str:
        """Export the complete trace (pretty-printed JSON by default)."""
        payload = {
            "task_id": self.task_id,
            "total_duration_s": time.time() - self.start_time,
            "step_count": len(self.steps),
            "steps": [asdict(step) for step in self.steps],
        }
        if format != "json":
            return str(payload)
        return json.dumps(payload, indent=2, ensure_ascii=False)

    def replay_summary(self) -> str:
        """Generate a human-readable execution summary."""
        icons = {"think": "🤔", "act": "🔧", "observe": "👁"}
        out = [f"Task {self.task_id} — {len(self.steps)} steps:"]
        for step in self.steps:
            preview = str(step.data)[:100]
            out.append(f" {icons.get(step.type, '•')} [{step.type}] {preview}")
        return "\n".join(out)
Wiring everything together:
class Agent:
    """Ties the pieces together: LLM loop + tool registry + memory + tracing.

    NOTE(review): relies on a module-level OpenAI-style `client` and on the
    `Memory` class from earlier in the series — confirm both are in scope.
    """

    def __init__(self, model: str, registry: ToolRegistry,
                 memory: Memory, trace_enabled: bool = True):
        self.model = model
        self.registry = registry
        self.memory = memory
        self.trace_enabled = trace_enabled
        self.max_turns = 15                  # hard cap on think/act cycles
        self.max_consecutive_errors = 3      # circuit breaker for failing tools

    def run(self, user_input: str, task_id: str = None) -> dict:
        """Run one task to completion.

        Returns a dict with "output" and "trace", plus "turns" on normal
        termination or "error" when the error circuit breaker trips.
        """
        trace = ExecutionTrace(task_id)
        messages = [
            {"role": "system", "content": self._build_system_prompt()},
            {"role": "user", "content": user_input}
        ]
        consecutive_errors = 0
        for turn in range(self.max_turns):
            t0 = time.time()
            response = client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.registry.get_schemas()
            )
            msg = response.choices[0].message
            duration = (time.time() - t0) * 1000
            trace.record("think", {
                "turn": turn,
                "has_tool_calls": bool(msg.tool_calls),
                "content_preview": (msg.content or "")[:100],
                "duration_ms": duration
            })
            if not msg.tool_calls:
                # Final answer — remember the task and finish.
                self.memory.save_fact("last_task", user_input[:200])
                return {
                    "output": msg.content,
                    "trace": trace.replay_summary(),
                    "turns": turn + 1
                }
            # BUG FIX: the assistant message that requests the tool calls must
            # be appended BEFORE the role="tool" results. The chat API rejects
            # tool messages that don't directly follow the assistant message
            # carrying the matching tool_call ids (the original appended it
            # after the loop, producing an invalid message sequence).
            messages.append(msg)
            for tool_call in msg.tool_calls:
                name = tool_call.function.name
                args = json.loads(tool_call.function.arguments)
                t0 = time.time()
                result = self.registry.execute(name, args)
                duration = (time.time() - t0) * 1000
                trace.record("act", {
                    "tool": name, "args": args,
                    "result_preview": result[:200],
                    "duration_ms": duration
                })
                error = self._parse_error(result)
                if error:
                    consecutive_errors += 1
                    if consecutive_errors >= self.max_consecutive_errors:
                        return {
                            "output": f"Failed {consecutive_errors} consecutive times. Aborted.",
                            "trace": trace.replay_summary(),
                            "error": error
                        }
                else:
                    consecutive_errors = 0
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result
                })
        return {
            "output": "Max turns reached. Task incomplete.",
            "trace": trace.replay_summary(),
            "turns": self.max_turns
        }

    def _build_system_prompt(self) -> str:
        """Assemble the system prompt from tool schemas + memory context."""
        return f"""You are the user's AI assistant.
## Available Tools
{self.registry.get_schemas()}
## User Preferences
{self.memory.get_context()}
## Rules
- Prioritize tools for accurate information
- On errors, follow the suggestion and retry
- Max {self.max_consecutive_errors} consecutive failures"""

    def _parse_error(self, result: str) -> dict | None:
        """Return the error payload if `result` is a failure JSON, else None."""
        try:
            data = json.loads(result)
        except (ValueError, TypeError):
            # Not JSON at all — plain tool output counts as success.
            return None
        if isinstance(data, dict) and not data.get("success", True):
            return data
        return None
class AgentMetrics:
    """Aggregated runtime metrics for an Agent instance."""

    def __init__(self):
        # Counters are all monotonically increasing; rates are derived
        # on demand in summary().
        self.total_tasks = 0
        self.completed_tasks = 0
        self.total_turns = 0
        self.total_tool_calls = 0
        self.total_errors = 0
        self.total_tokens = 0

    def record_task(self, result: dict):
        """Fold one Agent.run() result dict into the counters."""
        self.total_tasks += 1
        succeeded = bool(result.get("output")) and not result.get("error")
        if succeeded:
            self.completed_tasks += 1
        self.total_turns += result.get("turns", 0)
        if result.get("error"):
            self.total_errors += 1

    def summary(self) -> str:
        """Render the counters as a pretty-printed JSON report."""
        denom = max(1, self.total_tasks)  # guard against division by zero
        report = {
            "tasks": self.total_tasks,
            "completion_rate": f"{self.completed_tasks / denom * 100:.1f}%",
            "avg_turns": f"{self.total_turns / denom:.1f}",
            "error_rate": f"{self.total_errors / denom * 100:.1f}%"
        }
        return json.dumps(report, indent=2)
This is the underlying logic of LangChain, CrewAI, and similar frameworks. They add more integrations, more syntactic sugar, more abstractions — but the skeleton is the same. Understand this ~300-line core, and you understand every Agent framework.
Six articles, from "What is an Agent" to "Build a Framework from Scratch" — concepts, hands-on code, memory, error recovery, multi-agent orchestration, and now the framework itself.
Go ahead and use LangChain now — but this time, you know what's happening under the hood.