Building an Agent Framework from Scratch — Verifiable Traces & Sandbox Security

May 14, 2026 · Expert

Core takeaway: Every Agent framework shares the same skeleton — tool registry, sandbox execution, trace logging, self-healing loop. Understand these ~300 lines, and you understand the underlying logic of LangChain, CrewAI, and beyond.

This is the final article in the AI Agent series. The first five covered concepts, hands-on code, memory, error recovery, and multi-agent orchestration. Now we put it all together — building a production-grade Agent framework from scratch. The goal isn't to replace LangChain. It's to understand why every line in a framework is there.

Framework Skeleton

A complete Agent framework needs these modules:

agent_framework/
├── core/
│   ├── agent.py            # Main Agent loop
│   ├── tool_registry.py    # Tool registration & management
│   └── memory.py           # Three-layer memory
├── execution/
│   ├── sandbox.py          # Docker sandbox
│   ├── trace.py            # Execution tracing
│   └── validator.py        # Output validation
├── orchestration/
│   ├── pipeline.py         # Sequential pipeline
│   └── parallel.py         # Parallel fan-out
└── observability/
    ├── logger.py           # Structured logging
    └── metrics.py          # Metrics collection

1. Tool Registry

The framework core shouldn't be a hardcoded if/elif chain.
# Tools should register like plugins:
import json
from typing import Callable, Any


class ToolRegistry:
    """Central tool registry.

    Tools are registered like plugins instead of being dispatched through a
    hardcoded if/elif chain. Each tool contributes a handler callable and a
    schema entry of the form ``{"type": "function", "function": {...}}``.

    Example::

        registry = ToolRegistry()
        registry.register(
            name="search",
            description="Search the web",
            parameters={...},
            handler=search_web,
        )
        registry.register(
            name="python",
            description="Execute Python code",
            parameters={...},
            handler=run_python_sandboxed,  # Sandboxed version
        )
    """

    def __init__(self):
        # name -> handler callable
        self._tools: dict[str, Callable] = {}
        # One schema entry per registered tool, in first-registration order.
        self._schemas: list[dict] = []

    def register(self, name: str, description: str, parameters: dict,
                 handler: Callable):
        """Register a tool, replacing any earlier registration of ``name``.

        Args:
            name: Identifier used to look the tool up in ``execute``.
            description: Human-readable description stored in the schema.
            parameters: Parameter specification stored verbatim in the schema.
            handler: Callable invoked as ``handler(**args)`` by ``execute``.
        """
        schema = {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": parameters,
            },
        }
        # Re-registering a name must not leave a stale duplicate schema
        # behind: the handler dict is overwritten, so the schema list has to
        # be kept in step with it.
        for index, existing in enumerate(self._schemas):
            if existing["function"]["name"] == name:
                self._schemas[index] = schema
                break
        else:
            self._schemas.append(schema)
        self._tools[name] = handler

    def get_schemas(self) -> list[dict]:
        """Return the list of schema entries for all registered tools."""
        return self._schemas

    def execute(self, name: str, args: dict) -> str:
        """Run the tool ``name`` with keyword arguments ``args``.

        Returns:
            The handler's return value on success. If the tool is unknown or
            the handler raises, a JSON string with ``"success": false`` and
            an ``"error"`` message is returned instead of raising.
        """
        if name not in self._tools:
            return json.dumps({
                "success": False,
                "error": f"Unknown tool: {name}",
                "available": list(self._tools.keys()),
            })
        try:
            return self._tools[name](**args)
        except Exception as e:
            # Tool boundary: any handler failure is reported back as data.
            # default=repr is deliberate — the error report itself must not
            # fail just because an argument value is not JSON-serializable.
            return json.dumps({
                "success": False,
                "error": str(e),
                "tool": name,
                "args": args,
            }, default=repr)


# 2. Sandbox Execution
# Letting an Agent execute arbitrary code is dangerous.
# Docker isolation is mandatory:
import json
import os
import subprocess
import tempfile
import uuid

SANDBOX_IMAGE = "python:3.11-slim"
MEMORY_LIMIT = "256m"
CPU_LIMIT = "1.0"
PIDS_LIMIT = "128"
TIMEOUT = 30


def _kill_container(name: str) -> None:
    """Best-effort ``docker kill`` of a sandbox container left running."""
    try:
        subprocess.run(["docker", "kill", name],
                       capture_output=True, timeout=10)
    except (OSError, subprocess.SubprocessError):
        # Deliberately ignored: the caller is already reporting a timeout
        # and there is nothing more useful to do if cleanup also fails.
        pass


def run_in_sandbox(code: str) -> str:
    """Execute Python code in a Docker sandbox.

    The code is written to a temporary script that is bind-mounted read-only
    into a throwaway container with no network, a read-only root filesystem,
    and memory / CPU / process-count limits.

    Args:
        code: Python source to execute.

    Returns:
        The script's stdout when it exits with status 0. Otherwise a JSON
        string with ``"success": false`` and an ``"error"`` message (plus
        ``"exit_code"`` for a non-zero exit). Timeouts and failure to launch
        Docker are reported the same way rather than raised.
    """
    script = tempfile.NamedTemporaryFile(
        mode="w", suffix=".py", delete=False, encoding="utf-8"
    )
    script_path = script.name
    # A unique name lets us kill the container explicitly on timeout.
    container_name = f"agent-sandbox-{uuid.uuid4().hex[:12]}"
    try:
        # Write inside the try so the temp file is removed even if the
        # write itself fails.
        with script:
            script.write(code)
        result = subprocess.run([
            "docker", "run", "--rm",
            f"--name={container_name}",
            f"--memory={MEMORY_LIMIT}",
            f"--cpus={CPU_LIMIT}",
            f"--pids-limit={PIDS_LIMIT}",          # Bound fork bombs
            "--security-opt=no-new-privileges",    # No setuid escalation
            "--network=none",                      # No network access
            "--read-only",                         # Read-only filesystem
            "--tmpfs=/tmp:rw,noexec",              # Only /tmp is writable
            "-v", f"{script_path}:/code.py:ro",
            SANDBOX_IMAGE,
            "python", "/code.py",
        ], capture_output=True, text=True, encoding="utf-8",
            errors="replace", timeout=TIMEOUT)
    except subprocess.TimeoutExpired:
        # subprocess.run only kills the `docker run` client process; the
        # container itself would keep running, so stop it explicitly.
        _kill_container(container_name)
        return json.dumps({
            "success": False,
            "error": f"Code execution timed out (>{TIMEOUT}s)"
        })
    except OSError as e:
        # Docker binary missing or not executable.
        return json.dumps({
            "success": False,
            "error": f"Failed to launch Docker: {e}"
        })
    finally:
        os.unlink(script_path)

    if result.returncode == 0:
        return result.stdout
    return json.dumps({
        "success": False,
        "error": result.stderr[:500],
        "exit_code": result.returncode
    })


# ⚠️ Security essentials: --network=none blocks network access, --read-only
# prevents filesystem modification, --memory limits resources. All three are
# non-negotiable.
#
# 3. Execution Tracing
# What the Agent did must be verifiable — not by trust, by evidence.
import time, uuid, json
from dataclasses import dataclass, asdict


@dataclass
class Step:
    """One recorded entry in an agent execution trace."""

    # Identifier of the step; ExecutionTrace.record builds it as
    # "<task_id>-<index>", where index is the step's position in the trace.
    step_id: str
    # Kind of step.
    type: str  # "think" | "act" | "observe"
    # Value of time.time() at the moment the step was recorded.
    timestamp: float
    # Payload describing the step, stored as given by the caller.
    data: dict
    # Duration supplied by the caller, in milliseconds; defaults to 0.
    duration_ms: float = 0


class ExecutionTrace:
    """Agent execution tracer."""

    def __init__(self, task_id: str = None):
        self.task_id = task_id or str(uuid.uuid4())[:8]
        self.steps: list[Step] = []
        self.start_time = time.time()

    def record(self, step_type: str, data: dict, duration_ms: float = 0):
        self.steps.append(Step(
            step_id=f"{self.task_id}-{len(self.steps)}",
            type=step_type,
            timestamp=time.time(),
            data=data,
            duration_ms=duration_ms
        ))

    def export(self, format: str = "json") -> str:
        """Export the complete trace."""
        record = {
            "task_id": self.task_id,
            "total_duration_s": time.time() - self.start_time,
            "step_count": len(self.steps),
            "steps": [asdict(s) for s in self.steps]
        }
        if format == "json":
            return json.dumps(record, indent=2, ensure_ascii=False)
        return str(record)

    def replay_summary(self) -> str:
        """Generate a human-readable execution summary."""
        lines = [f"Task {self.task_id} — {len(self.steps)} steps:"]
        for s in self.steps:
            icon = {"think": "🤔", "act": "🔧", "observe": "👁"}.get(s.type, "•")
            summary = str(s.data)[:100]
            lines.append(f" {icon} [{s.type