这是 AI Agent 系列的最后一篇。
前五篇讲了概念、代码实战、记忆、错误恢复、多 Agent 编排。现在我们把所有知识整合起来——从零实现一个生产级的 Agent 框架。
目标不是替代 LangChain,而是理解框架的每一行为什么在那里。
一个完整的 Agent 框架需要这些模块:
agent_framework/
├── core/
│   ├── agent.py          # Agent 主循环
│   ├── tool_registry.py  # 工具注册与管理
│   └── memory.py         # 三层记忆
├── execution/
│   ├── sandbox.py        # Docker 沙箱
│   ├── trace.py          # 执行追踪
│   └── validator.py      # 输出验证
├── orchestration/
│   ├── pipeline.py       # 顺序流水线
│   └── parallel.py       # 并行分发
└── observability/
    ├── logger.py         # 结构化日志
    └── metrics.py        # 指标收集
框架的核心不应该是硬编码的 if/elif。工具要像插件一样注册:
from typing import Callable, Any
class ToolRegistry:
    """Registry mapping tool names to handlers and their function-calling schemas.

    Tools are plugged in with :meth:`register` and invoked by name with
    :meth:`execute`. Failures are never raised to the caller; they are
    reported as a JSON string of the form ``{"success": false, ...}``.
    """

    def __init__(self):
        # name -> callable invoked as handler(**args)
        self._tools: dict[str, Callable] = {}
        # One schema per registered tool, in first-registration order.
        self._schemas: list[dict] = []

    def register(self, name: str, description: str,
                 parameters: dict, handler: Callable):
        """Register a tool, replacing any earlier tool with the same name.

        Args:
            name: Identifier the model uses to call the tool.
            description: Human-readable description placed in the schema.
            parameters: Parameter schema stored verbatim under
                ``function.parameters``.
            handler: Callable invoked with the call arguments as keywords.
        """
        schema = {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": parameters,
            },
        }
        # Re-registering must not leave two schemas with the same name:
        # swap the old entry in place so the tool keeps its position.
        for index, existing in enumerate(self._schemas):
            if existing["function"]["name"] == name:
                self._schemas[index] = schema
                break
        else:
            self._schemas.append(schema)
        self._tools[name] = handler

    def get_schemas(self) -> list[dict]:
        """Return the schemas of all registered tools.

        A new list is returned so callers cannot add or remove entries of
        the registry by mutating the result.
        """
        return list(self._schemas)

    def execute(self, name: str, args: dict) -> str:
        """Run the tool ``name`` with ``args`` and return its result as text.

        Args:
            name: Name of a previously registered tool.
            args: Keyword arguments forwarded to the handler.

        Returns:
            The handler's return value if it is already a ``str``; any
            other value is serialized to JSON. For an unknown tool or a
            handler exception, a JSON object with ``"success": false`` and
            an ``"error"`` message.
        """
        handler = self._tools.get(name)
        if handler is None:
            return self._failure(
                error=f"未知工具: {name}",
                available=list(self._tools),
            )
        try:
            result = handler(**args)
        except Exception as e:
            # Tool boundary: any handler failure (including a bad ``args``
            # shape) is reported back as data instead of propagating.
            return self._failure(error=str(e), tool=name, args=args)
        if isinstance(result, str):
            return result
        # Keep the declared ``-> str`` contract for handlers that return
        # structured data.
        return json.dumps(result, ensure_ascii=False, default=str)

    @staticmethod
    def _failure(**fields) -> str:
        """Build the JSON failure payload shared by all error paths."""
        # default=str keeps the error report itself from raising when the
        # offending arguments are not JSON-serializable.
        return json.dumps({"success": False, **fields},
                          ensure_ascii=False, default=str)
# Usage: build one registry and plug every tool into it by name.
# `parameters={...}` below is a placeholder for each tool's parameter schema.
registry = ToolRegistry()

registry.register(
    name="search",
    description="搜索网页",
    parameters={...},
    handler=search_web,
)

# NOTE(review): the sandbox function shown in this file is named
# `run_in_sandbox`; confirm that `run_python_sandboxed` exists elsewhere.
registry.register(
    name="python",
    description="执行 Python 代码",
    parameters={...},
    handler=run_python_sandboxed,  # sandboxed variant
)
让 Agent 执行任意代码是危险的。必须用 Docker 隔离:
import subprocess, tempfile, os
SANDBOX_IMAGE = "python:3.11-slim"
MEMORY_LIMIT = "256m"
CPU_LIMIT = "1.0"
TIMEOUT = 30
# Upper bound on processes inside the container (guards against fork bombs).
PIDS_LIMIT = "64"


def _kill_container(name: str) -> None:
    """Best-effort removal of a sandbox container that outlived its CLI."""
    try:
        subprocess.run(["docker", "kill", name],
                       capture_output=True, timeout=10)
    except (OSError, subprocess.SubprocessError):
        # Nothing more can be done here; the caller already reports the
        # timeout to the user.
        pass


def run_in_sandbox(code: str) -> str:
    """Execute Python source in a throw-away, locked-down Docker container.

    The code is written to a temporary file, mounted read-only into the
    container and run with no network, a read-only root filesystem, and
    memory / CPU / process-count limits.

    Args:
        code: Python source to execute.

    Returns:
        The script's stdout when it exits with status 0. Otherwise a JSON
        string ``{"success": false, "error": ...}`` describing a non-zero
        exit, a timeout, or a failure to launch Docker.
    """
    # Python source is UTF-8 by default, so do not depend on the locale.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".py", delete=False, encoding="utf-8"
    ) as f:
        f.write(code)
        script_path = f.name

    # A unique, known name lets us kill the container on timeout. The
    # random temp-file stem only contains characters Docker accepts.
    stem = os.path.splitext(os.path.basename(script_path))[0]
    container_name = f"agent-sandbox-{stem}"

    command = [
        "docker", "run", "--rm",
        f"--name={container_name}",
        f"--memory={MEMORY_LIMIT}",
        f"--cpus={CPU_LIMIT}",
        f"--pids-limit={PIDS_LIMIT}",
        "--security-opt=no-new-privileges",
        "--network=none",             # no network access
        "--read-only",                # read-only root filesystem
        "--tmpfs=/tmp:rw,noexec",     # only /tmp is writable
        "-v", f"{script_path}:/code.py:ro",
        SANDBOX_IMAGE,
        "python", "/code.py",
    ]

    try:
        result = subprocess.run(
            command, capture_output=True, text=True,
            encoding="utf-8", errors="replace",  # never fail on odd output
            timeout=TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run only kills the docker CLI process; the container
        # itself keeps running unless it is killed explicitly.
        _kill_container(container_name)
        return json.dumps({
            "success": False,
            "error": f"代码执行超时(>{TIMEOUT}秒)"
        }, ensure_ascii=False)
    except OSError as e:
        # e.g. the docker executable is not installed or not executable.
        return json.dumps({
            "success": False,
            "error": f"无法启动 Docker: {e}"
        }, ensure_ascii=False)
    finally:
        os.unlink(script_path)

    if result.returncode == 0:
        return result.stdout
    return json.dumps({
        "success": False,
        "error": result.stderr[:500],
        "exit_code": result.returncode
    }, ensure_ascii=False)
--network=none 禁止网络访问,--read-only 防止修改文件系统,--memory 限制资源。这三项缺一不可。
Agent 做了什么是可验证的——不是靠信任,是靠证据。
import time, uuid, json
from dataclasses import dataclass, asdict
@dataclass
class Step:
step_id: str
type: str # "think" | "act" | "observe"
timestamp: float
data: dict
duration_ms: float = 0
class ExecutionTrace:
"""Agent 执行追踪器。"""
def __init__(self, task_id: str = None):
self.task_id = task_id or str(uuid.uuid4())[:8]
self.steps: list[Step] = []
self.start_time = time.time()
def record(self, step_type: str, data: dict, duration_ms: float = 0):
self.steps.append(Step(
step_id=f"{self.task_id}-{len(self.steps)}",
type=step_type,
timestamp=time.time(),
data=data,
duration_ms=duration_ms
))
def export(self, format: str = "json") -> str:
"""导出完整追踪记录。"""
record = {
"task_id": self.task_id,
"total_duration_s": time.time() - self.start_time,
"step_count": len(self.steps),
"steps": [asdict(s) for s in self.steps]
}
if format == "json":
return json.dumps(record, indent=2, ensure_ascii=False)
return str(record)
def replay_summary(self) -> str:
"""生成可读的执行摘要。"""
lines = [f"Task {self.task_id} — {len(self.steps)} steps:"]
for s in self.steps:
icon = {"think": "🤔", "act": "🔧", "observe": "👁"}.get(s.type, "•")
summary = str(s.data)[:100]
lines.append(f" {icon} [{s.type}] {summary}")
return "\n".join(lines)
把所有模块串起来:
class Agent:
def __init__(self, model: str, registry: ToolRegistry,
memory: Memory, trace_enabled: bool = True):
self.model = model
self.registry = registry
self.memory = memory
self.trace_enabled = trace_enabled
self.max_turns = 15
self.max_consecutive_errors = 3
def run(self, user_input: str, task_id: str = None) -> dict:
trace = ExecutionTrace(task_id)
messages = [
{"role": "system", "content": self._build_system_prompt()},
{"role": "user", "content": user_input}
]
consecutive_errors = 0
for turn in range(self.max_turns):
t0 = time.time()
response = client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.registry.get_schemas()
)
msg = response.choices[0].message
duration = (time.time() - t0) * 1000
trace.record("think", {
"turn": turn,
"has_tool_calls": bool(msg.tool_calls),
"content_preview": (msg.content or "")[:100],
"duration_ms": duration
})
if not msg.tool_calls:
# 保存记忆并返回
self.memory.save_fact("last_task", user_input[:200])
return {
"output": msg.content,
"trace": trace.replay_summary(),
"turns": turn + 1
}
for tool_call in msg.tool_calls:
name = tool_call.function.name
args = json.loads(tool_call.function.arguments)
t0 = time.time()
result = self.registry.execute(name, args)
duration = (time.time() - t0) * 1000
trace.record("act", {
"tool": name, "args": args,
"result_preview": result[:200],
"duration_ms": duration
})
error = self._parse_error(result)
if error:
consecutive_errors += 1
if consecutive_errors >= self.max_consecutive_errors:
return {
"output": f"连续 {consecutive_errors} 次失败,已终止。",
"trace": trace.replay_summary(),
"error": error
}
else:
consecutive_errors = 0
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result
})
messages.append(msg)
return {
"output": "达到最大轮次,任务未完成。",
"trace": trace.replay_summary(),
"turns": self.max_turns
}
def _build_system_prompt(self) -> str:
return f"""你是用户的 AI 助手。
## 可用工具
{self.registry.get_schemas()}
## 用户偏好
{self.memory.get_context()}
## 规则
- 优先使用工具获取准确信息
- 遇到错误请根据提示重试
- 最多连续失败 {self.max_consecutive_errors} 次"""
def _parse_error(self, result: str) -> dict | None:
try:
data = json.loads(result)
if isinstance(data, dict) and not data.get("success", True):
return data
except:
pass
return None
class AgentMetrics:
    """Running counters aggregated over agent task results."""

    def __init__(self):
        # Every counter starts at zero.
        self.total_tasks = 0
        self.completed_tasks = 0
        self.total_turns = 0
        self.total_tool_calls = 0
        self.total_errors = 0
        self.total_tokens = 0

    def record_task(self, result: dict):
        """Fold one task result dict into the counters.

        A task counts as completed when it has a truthy ``output`` and no
        truthy ``error``; a truthy ``error`` counts as one error. A missing
        ``turns`` entry contributes 0 turns.
        """
        failed = bool(result.get("error"))
        has_output = bool(result.get("output"))

        self.total_tasks += 1
        if has_output and not failed:
            self.completed_tasks += 1
        self.total_turns += result.get("turns", 0)
        if failed:
            self.total_errors += 1

    def summary(self) -> str:
        """Return the aggregate metrics as an indented JSON string."""
        # Divide by at least 1 so an empty metrics object reports zeros.
        task_count = max(1, self.total_tasks)
        completion_pct = self.completed_tasks / task_count * 100
        mean_turns = self.total_turns / task_count
        error_pct = self.total_errors / task_count * 100

        report = {
            "tasks": self.total_tasks,
            "completion_rate": f"{completion_pct:.1f}%",
            "avg_turns": f"{mean_turns:.1f}",
            "error_rate": f"{error_pct:.1f}%",
        }
        return json.dumps(report, indent=2)
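这 300 行代码实现了:可插拔的工具注册中心(ToolRegistry)、Docker 沙箱执行(run_in_sandbox)、可回放的执行追踪(ExecutionTrace)、带连续错误熔断的 Agent 主循环(Agent),以及运行指标统计(AgentMetrics)。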
这就是 LangChain、CrewAI 等框架的底层逻辑。它们加了更多集成、更多语法糖、更多抽象——但骨架是一样的。理解了这个 300 行的核心,你就理解了所有 Agent 框架。
六篇文章,从「什么是 Agent」到「从零写框架」,我们覆盖了:基本概念、代码实战、记忆、错误恢复、多 Agent 编排,以及本篇的生产级框架实现。
现在你可以去用 LangChain 了——但这次你知道它内部在干什么。