Writing an Agent Framework from Scratch: Verifiable Execution Traces and Sandbox Security

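This is the final post in the AI Agent series.

The previous five posts covered the core concepts, hands-on code, memory, error recovery, and multi-Agent orchestration. Now we bring all of it together and build a production-grade Agent framework from scratch.

The goal is not to replace LangChain, but to understand why every line of a framework is there.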

Framework Skeleton

A complete Agent framework needs these modules:

agent_framework/
├── core/
│   ├── agent.py          # Agent main loop
│   ├── tool_registry.py  # tool registration and dispatch
│   └── memory.py         # three-tier memory
├── execution/
│   ├── sandbox.py         # Docker sandbox
│   ├── trace.py           # execution tracing
│   └── validator.py       # output validation
├── orchestration/
│   ├── pipeline.py        # sequential pipeline
│   └── parallel.py        # parallel fan-out
└── observability/
    ├── logger.py           # structured logging
    └── metrics.py          # metrics collection
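The Agent core in section 4 calls only two methods on its memory object: save_fact and get_context. The three-tier memory itself comes from the earlier memory post and is not shown here. So that the core can be exercised on its own, here is a minimal sketch of a hypothetical stand-in, InMemoryMemory (the name, the flat key/value design, and the max_facts cap are all assumptions), that satisfies those two calls:

```python
from typing import Protocol


class MemoryLike(Protocol):
    """The two calls the Agent core makes on its memory (assumed interface)."""

    def save_fact(self, key: str, value: str) -> None: ...
    def get_context(self) -> str: ...


class InMemoryMemory:
    """Hypothetical stand-in: a flat key/value store, NOT the three-tier design."""

    def __init__(self, max_facts: int = 50):
        self.max_facts = max_facts
        self._facts: dict[str, str] = {}  # dicts keep insertion order

    def save_fact(self, key: str, value: str) -> None:
        # Re-saving a key moves it to the end, i.e. marks it most recent.
        self._facts.pop(key, None)
        self._facts[key] = value
        # Evict the oldest facts once the cap is exceeded.
        while len(self._facts) > self.max_facts:
            oldest = next(iter(self._facts))
            del self._facts[oldest]

    def get_context(self) -> str:
        # Rendered as bullet lines for the system prompt.
        if not self._facts:
            return "(none)"
        return "\n".join(f"- {k}: {v}" for k, v in self._facts.items())
```

Anything with these two methods can be passed as the Agent's memory, so the real three-tier implementation can replace the stand-in without touching the core.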

1. Tool Registry

The core of a framework should not be a hard-coded if/elif chain. Tools should be registered like plugins:

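import json
from typing import Any, Callable


class ToolRegistry:
    """Tool registry: maps tool names to handlers and their JSON schemas."""
    def __init__(self):
        self._tools: dict[str, Callable[..., Any]] = {}
        # Keyed by name so re-registering a tool replaces its schema instead of duplicating it.
        self._schemas: dict[str, dict] = {}

    def register(self, name: str, description: str,
                 parameters: dict, handler: Callable[..., Any]):
        """Register a tool under `name`."""
        self._tools[name] = handler
        self._schemas[name] = {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": parameters  # a JSON Schema object
            }
        }

    def get_schemas(self) -> list[dict]:
        return list(self._schemas.values())

    def execute(self, name: str, args: dict) -> str:
        """Run a tool; failures come back as a JSON error string, never as an exception."""
        if name not in self._tools:
            return json.dumps({
                "success": False,
                "error": f"Unknown tool: {name}",
                "available": list(self._tools.keys())
            }, ensure_ascii=False)
        try:
            result = self._tools[name](**args)
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": str(e),
                "tool": name,
                "args": args
            }, ensure_ascii=False)
        # Tool messages must be strings, so serialize any non-string result.
        return result if isinstance(result, str) else json.dumps(result, ensure_ascii=False, default=str)

# Usage (search_web is assumed to be defined elsewhere; parameters={...} stands for each tool's JSON Schema)
registry = ToolRegistry()
registry.register(
    name="search", description="Search the web", parameters={...},
    handler=search_web
)
registry.register(
    name="python", description="Execute Python code", parameters={...},
    handler=run_in_sandbox  # the sandboxed runner from section 2
)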

2. Sandboxed Execution

Letting an Agent execute arbitrary code is dangerous. The code must run in isolation, here inside a Docker container:

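import json, os, subprocess, tempfile, uuid

SANDBOX_IMAGE = "python:3.11-slim"
MEMORY_LIMIT = "256m"
CPU_LIMIT = "1.0"
TIMEOUT = 30  # seconds

def run_in_sandbox(code: str) -> str:
    """Execute Python code inside a Docker sandbox."""
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".py", delete=False
    ) as f:
        f.write(code)
        script_path = f.name

    # A known container name lets us kill the container if the run times out.
    container = f"agent-sandbox-{uuid.uuid4().hex[:12]}"
    try:
        result = subprocess.run([
            "docker", "run", "--rm",
            "--name", container,
            f"--memory={MEMORY_LIMIT}",
            f"--cpus={CPU_LIMIT}",
            "--network=none",           # no network access
            "--read-only",              # read-only root filesystem
            "--tmpfs=/tmp:rw,noexec",   # only /tmp is writable
            "-v", f"{script_path}:/code.py:ro",
            SANDBOX_IMAGE,
            "python", "/code.py"
        ], capture_output=True, text=True, timeout=TIMEOUT)

        if result.returncode == 0:
            return result.stdout
        return json.dumps({
            "success": False,
            "error": result.stderr[:500],   # truncated to keep the model's context small
            "exit_code": result.returncode
        }, ensure_ascii=False)
    except subprocess.TimeoutExpired:
        # On timeout, subprocess.run SIGKILLs only the docker CLI; the container keeps running.
        subprocess.run(["docker", "kill", container], capture_output=True)
        return json.dumps({
            "success": False,
            "error": f"Execution timed out (>{TIMEOUT}s)"
        })
    finally:
        os.unlink(script_path)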
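⚠️ Security essentials: --network=none blocks network access, --read-only keeps the filesystem from being modified, and --memory (together with --cpus) caps resource usage. None of the three can be dropped. They shrink the attack surface rather than eliminate it, since a container still shares the host kernel.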
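The three flags above are the minimum. Docker also has flags that close other common gaps: --pids-limit caps the number of processes so a fork bomb cannot exhaust the host's process table, --cap-drop=ALL removes Linux capabilities the script never needs, and --security-opt=no-new-privileges blocks privilege escalation through setuid binaries. Here is a sketch of a command builder that adds them. It is kept as a pure function so the flag list can be unit-tested without Docker installed; the function name and the default limit values are assumptions, not part of the original code:

```python
SANDBOX_IMAGE = "python:3.11-slim"


def build_sandbox_cmd(script_path: str, container_name: str,
                      memory: str = "256m", cpus: str = "1.0",
                      pids: int = 64) -> list[str]:
    """Build the `docker run` argument list for one sandboxed script."""
    return [
        "docker", "run", "--rm",
        "--name", container_name,             # lets a timeout handler `docker kill` it
        f"--memory={memory}",
        f"--cpus={cpus}",
        f"--pids-limit={pids}",               # cap the process count (fork bombs)
        "--network=none",                     # no network access
        "--read-only",                        # read-only root filesystem
        "--tmpfs=/tmp:rw,noexec",             # only /tmp is writable, and not executable
        "--cap-drop=ALL",                     # drop all Linux capabilities
        "--security-opt=no-new-privileges",   # block setuid-style escalation
        "-v", f"{script_path}:/code.py:ro",   # mount the script read-only
        SANDBOX_IMAGE,
        "python", "/code.py",
    ]
```

Separating command construction from execution also lets the security policy be reviewed and tested on its own, rather than living inside a subprocess call.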

3. Execution Tracing

What the Agent did should be verifiable: backed by evidence, not by trust.

import time, uuid, json
from dataclasses import dataclass, asdict

@dataclass
class Step:
    step_id: str
    type: str          # "think" | "act" | "observe"
    timestamp: float
    data: dict
    duration_ms: float = 0

class ExecutionTrace:
    """Execution tracer for a single Agent run."""
    def __init__(self, task_id: str | None = None):
        self.task_id = task_id or str(uuid.uuid4())[:8]
        self.steps: list[Step] = []
        self.start_time = time.time()

    def record(self, step_type: str, data: dict, duration_ms: float = 0):
        self.steps.append(Step(
            step_id=f"{self.task_id}-{len(self.steps)}",
            type=step_type,
            timestamp=time.time(),
            data=data,
            duration_ms=duration_ms
        ))

    def export(self, format: str = "json") -> str:
        """Export the full trace record."""
        record = {
            "task_id": self.task_id,
            "total_duration_s": time.time() - self.start_time,
            "step_count": len(self.steps),
            "steps": [asdict(s) for s in self.steps]
        }
        if format == "json":
            return json.dumps(record, indent=2, ensure_ascii=False)
        return str(record)

    def replay_summary(self) -> str:
        """Produce a human-readable summary of the run."""
        lines = [f"Task {self.task_id} — {len(self.steps)} steps:"]
        for s in self.steps:
            icon = {"think": "🤔", "act": "🔧", "observe": "👁"}.get(s.type, "•")
            summary = str(s.data)[:100]
            lines.append(f"  {icon} [{s.type}] {summary}")
        return "\n".join(lines)
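As written, ExecutionTrace is an in-memory log. It records evidence, but nothing stops someone from editing a step afterward. One common way to make an exported trace tamper-evident, an extension not in the original framework, is to hash-chain the steps: each step's hash covers its own content plus the previous step's hash, so changing any step breaks every hash after it. A minimal sketch using SHA-256 over canonical JSON; chain_steps and verify_chain are hypothetical helpers that operate on the steps list produced by export():

```python
import hashlib
import json

GENESIS = "0" * 64  # stand-in "previous hash" for the first step


def _digest(step: dict, prev_hash: str) -> str:
    # Canonical JSON (sorted keys, fixed separators) makes the hash reproducible.
    body = json.dumps(step, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256((prev_hash + body).encode("utf-8")).hexdigest()


def chain_steps(steps: list[dict]) -> list[dict]:
    """Return copies of the steps with prev_hash and hash fields added."""
    chained, prev = [], GENESIS
    for step in steps:
        h = _digest(step, prev)
        chained.append({**step, "prev_hash": prev, "hash": h})
        prev = h
    return chained


def verify_chain(chained: list[dict]) -> bool:
    """Recompute every hash; False if any step or link was altered."""
    prev = GENESIS
    for entry in chained:
        step = {k: v for k, v in entry.items() if k not in ("prev_hash", "hash")}
        if entry["prev_hash"] != prev or _digest(step, prev) != entry["hash"]:
            return False
        prev = entry["hash"]
    return True
```

Chaining makes tampering detectable, not impossible. Anyone who can rewrite the whole file can also recompute every hash, so the final hash should be stored somewhere the Agent cannot write to, such as a separate log service.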

4. Putting It Together: The Complete Agent Core

Wire all the modules together:

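import json, time
from openai import OpenAI  # assumes the OpenAI Python SDK (v1 interface)

client = OpenAI()  # reads OPENAI_API_KEY from the environment

# ToolRegistry and ExecutionTrace come from the sections above; Memory is the
# three-tier memory from the memory post (it must provide save_fact and get_context).

class Agent:
    def __init__(self, model: str, registry: ToolRegistry,
                 memory: "Memory", trace_enabled: bool = True):
        self.model = model
        self.registry = registry
        self.memory = memory
        self.trace_enabled = trace_enabled
        self.max_turns = 15
        self.max_consecutive_errors = 3

    def run(self, user_input: str, task_id: str | None = None) -> dict:
        trace = ExecutionTrace(task_id)
        messages = [
            {"role": "system", "content": self._build_system_prompt()},
            {"role": "user", "content": user_input}
        ]
        consecutive_errors = 0

        for turn in range(self.max_turns):
            t0 = time.time()
            response = client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.registry.get_schemas()
            )
            msg = response.choices[0].message
            duration = (time.time() - t0) * 1000

            trace.record("think", {
                "turn": turn,
                "has_tool_calls": bool(msg.tool_calls),
                "content_preview": (msg.content or "")[:100],
            }, duration_ms=duration)

            if not msg.tool_calls:
                # Final answer: save memory and return
                self.memory.save_fact("last_task", user_input[:200])
                return {
                    "output": msg.content,
                    "trace": trace.replay_summary(),
                    "turns": turn + 1
                }

            # The assistant message that requested the tools must come BEFORE
            # the tool results, or the API rejects the next request.
            messages.append(msg)

            for tool_call in msg.tool_calls:
                name = tool_call.function.name
                t0 = time.time()
                try:
                    args = json.loads(tool_call.function.arguments)
                    result = self.registry.execute(name, args)
                except json.JSONDecodeError as e:
                    # Malformed JSON from the model is reported like any other tool error.
                    args = tool_call.function.arguments
                    result = json.dumps({"success": False,
                                         "error": f"Invalid JSON arguments: {e}"})
                duration = (time.time() - t0) * 1000

                trace.record("act", {
                    "tool": name, "args": args,
                    "result_preview": result[:200],
                }, duration_ms=duration)

                error = self._parse_error(result)
                if error:
                    consecutive_errors += 1
                    if consecutive_errors >= self.max_consecutive_errors:
                        return {
                            "output": f"Stopped after {consecutive_errors} consecutive failures.",
                            "trace": trace.replay_summary(),
                            "error": error,
                            "turns": turn + 1
                        }
                else:
                    consecutive_errors = 0

                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result
                })

        # Running out of turns is a failure, so report it as an error.
        return {
            "output": "Reached the maximum number of turns; the task is not complete.",
            "trace": trace.replay_summary(),
            "turns": self.max_turns,
            "error": {"success": False, "error": "max_turns_exceeded"}
        }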

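    def _build_system_prompt(self) -> str:
        # Full schemas already go to the API via `tools=`; listing names avoids sending them twice.
        tool_names = ", ".join(s["function"]["name"] for s in self.registry.get_schemas())
        return f"""You are the user's AI assistant.
## Available tools
{tool_names}

## User preferences
{self.memory.get_context()}

## Rules
- Prefer using tools to get accurate information
- If a tool returns an error, follow its hints and retry
- At most {self.max_consecutive_errors} consecutive failures are allowed"""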

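    def _parse_error(self, result: str) -> dict | None:
        """Return the error payload if the tool result is a JSON error object."""
        try:
            data = json.loads(result)
        except (json.JSONDecodeError, TypeError):
            return None  # plain-text output counts as success
        if isinstance(data, dict) and not data.get("success", True):
            return data
        return None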
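The loop's ordering and error-counting rules are easiest to check without a real model. Below is a minimal sketch that replaces the API with a scripted fake. Assumptions: each response is a plain dict with a simplified tool_calls shape (the real SDK returns message objects and nests the name and arguments under function), tool failures are signaled by exceptions rather than by _parse_error, and run_loop, scripted, and the echo tool are hypothetical. It checks two things: the assistant turn carrying tool_calls is appended before the tool results that answer it, as the Chat Completions API expects, and any successful call resets the consecutive-error streak.

```python
import json
from typing import Callable


def run_loop(model: Callable[[list[dict]], dict],
             tools: dict[str, Callable[..., str]],
             max_turns: int = 5, max_errors: int = 3) -> dict:
    """Scripted-model version of the Agent loop (simplified message dicts)."""
    messages: list[dict] = [{"role": "user", "content": "task"}]
    errors = 0
    for turn in range(max_turns):
        msg = model(messages)
        if not msg.get("tool_calls"):
            return {"output": msg["content"], "turns": turn + 1, "messages": messages}
        messages.append(msg)  # assistant turn first, then the results answering it
        for call in msg["tool_calls"]:
            try:
                result = tools[call["name"]](**json.loads(call["arguments"]))
                errors = 0  # any success resets the streak
            except Exception as e:  # unknown tool, bad JSON, or handler failure
                errors += 1
                if errors >= max_errors:
                    return {"output": None, "error": repr(e),
                            "turns": turn + 1, "messages": messages}
                result = json.dumps({"success": False, "error": repr(e)})
            messages.append({"role": "tool", "tool_call_id": call["id"],
                             "content": result})
    return {"output": None, "error": "max_turns_exceeded",
            "turns": max_turns, "messages": messages}


def scripted(responses: list[dict]) -> Callable[[list[dict]], dict]:
    """Fake model: ignores the conversation and replays fixed responses in order."""
    replies = iter(responses)
    return lambda messages: next(replies)
```

A scripted model makes loop behavior deterministic, so termination rules can be unit-tested without API calls or tokens.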

5. Observability

import json

class AgentMetrics:
    """Runtime metrics for an Agent."""
    def __init__(self):
        self.total_tasks = 0
        self.completed_tasks = 0
        self.total_turns = 0
        self.total_errors = 0
        # Not updated by record_task: Agent.run returns neither tool-call nor token counts.
        self.total_tool_calls = 0
        self.total_tokens = 0

    def record_task(self, result: dict):
        self.total_tasks += 1
        # A task counts as completed only if it produced output without an error.
        if result.get("output") and not result.get("error"):
            self.completed_tasks += 1
        self.total_turns += result.get("turns", 0)
        self.total_errors += 1 if result.get("error") else 0

    def summary(self) -> str:
        n = max(1, self.total_tasks)  # avoid division by zero before any task runs
        return json.dumps({
            "tasks": self.total_tasks,
            "completion_rate": f"{self.completed_tasks / n * 100:.1f}%",
            "avg_turns": f"{self.total_turns / n:.1f}",
            "error_rate": f"{self.total_errors / n * 100:.1f}%"
        }, indent=2)
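AgentMetrics declares total_tool_calls and total_tokens, but record_task never updates them. Tool-call counts and per-tool latency can instead be derived from the exported trace, since every tool call is recorded as an act step. Here is a sketch that assumes the JSON layout produced by ExecutionTrace.export() above, with the duration stored either on the step or inside its data; tool_stats is a hypothetical helper. Token counts would have to come from the API response's usage field, which the trace does not record.

```python
import json
from collections import defaultdict


def tool_stats(trace_json: str) -> dict[str, dict]:
    """Per-tool call count, error count, and mean latency from an exported trace."""
    record = json.loads(trace_json)
    durations: dict[str, list[float]] = defaultdict(list)
    errors: dict[str, int] = defaultdict(int)
    for step in record["steps"]:
        if step["type"] != "act":  # only tool calls are counted
            continue
        data = step["data"]
        tool = data.get("tool", "?")
        # Duration may live on the step or inside data, depending on how it was recorded.
        durations[tool].append(step.get("duration_ms") or data.get("duration_ms", 0.0))
        # Heuristic: the registry's error payloads start with '"success": false'.
        if '"success": false' in data.get("result_preview", ""):
            errors[tool] += 1
    return {
        tool: {"calls": len(ds), "errors": errors[tool],
               "avg_ms": round(sum(ds) / len(ds), 1)}
        for tool, ds in durations.items()
    }
```

Deriving metrics from the trace keeps a single source of truth: the same record that supports auditing also feeds the dashboards.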

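What This 300-Line Framework Gives You

These roughly 300 lines of code implement:

  1. A plugin-style tool system: register a tool and it is ready to use, without touching the core code
  2. A Docker sandbox: network isolation, a read-only filesystem, and resource limits
  3. Full execution tracing: every step carries a timestamp and context, so a run can be audited and reviewed step by step afterward
  4. A self-healing loop: error detection plus structured error feedback to the model
  5. Metrics collection: completion rate, error rate, and average turns

This is the underlying logic of frameworks such as LangChain and CrewAI. They add more integrations, more syntactic sugar, and more abstraction, but the skeleton is the same. Once you understand this ~300-line core, you understand the skeleton that most Agent frameworks share.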

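Series Wrap-Up

Six posts, from "What is an Agent?" to "Building a framework from scratch". Together they covered:

  1. What an Agent is: the ReAct loop
  2. Hands-on: a working Agent in 50 lines of code
  3. Memory: letting the Agent remember you
  4. Error recovery: letting the Agent fix itself
  5. Multi-Agent orchestration: dividing the work
  6. Building a framework from scratch: understanding every line

Now go ahead and use LangChain, but this time you'll know what it's doing under the hood.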