单个 Agent 能做很多事。但真正的复杂场景——比如一个项目需要同时做代码审查、安全审计、文档生成——一个 Agent 的上下文窗口和注意力很快就不够用了。
这就是多 Agent 编排要解决的问题。不是让一个 Agent 更强,而是让多个 Agent 各司其职。
| 问题 | 单 Agent | 多 Agent |
|---|---|---|
| 上下文窗口 | 一个窗口装所有 | 每个 Agent 独立窗口 |
| 专业深度 | 什么都会,什么都不精 | 每个 Agent 专攻一个领域 |
| 并行能力 | 串行执行 | 多个 Agent 同时工作 |
| 容错性 | 挂了全挂 | 单个 Agent 失败不影响全局 |
Agent A 的输出是 Agent B 的输入。像一个工厂流水线。
典型场景:代码生成 → 代码审查 → 安全扫描 → 文档生成。
def sequential_pipeline(task: str) -> str:
# Agent 1: 生成代码
code = agent_coder.run(f"实现以下功能: {task}")
# Agent 2: 审查代码
review = agent_reviewer.run(f"审查以下代码:\n{code}")
if "需要修改" in review:
code = agent_coder.run(f"根据反馈修改:\n{review}\n原代码:\n{code}")
# Agent 3: 安全扫描
security = agent_security.run(f"扫描安全漏洞:\n{code}")
# Agent 4: 生成文档
docs = agent_writer.run(f"为以下代码写文档:\n{code}")
return {"code": code, "review": review,
"security": security, "docs": docs}
多个 Agent 同时处理不同子任务,最后汇总。
典型场景:市场分析——Agent A 看技术面,Agent B 看基本面,Agent C 看资金面,最后汇总。
import concurrent.futures
def parallel_orchestration(market: str) -> dict:
tasks = {
"technical": f"分析 {market} 的技术指标(MACD、RSI、均线)",
"fundamental": f"分析 {market} 的基本面(估值、盈利、增长)",
"sentiment": f"分析 {market} 的市场情绪和新闻",
"flow": f"分析 {market} 的资金流向和持仓变化"
}
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
name: executor.submit(agent_analyst.run, prompt)
for name, prompt in tasks.items()
}
results = {
name: future.result()
for name, future in futures.items()
}
# 汇总 Agent 综合所有分析
summary = agent_summarizer.run(
f"综合以下分析给出结论:\n" +
"\n".join([f"## {k}\n{v}" for k, v in results.items()])
)
return {"analysis": results, "summary": summary}
MCP(Model Context Protocol)是 Anthropic 推出的开放协议,解决了一个关键问题:每个 Agent 的工具和上下文如何标准化共享。
传统问题:Agent A 的工具 Agent B 用不了,Agent B 的上下文 Agent C 读不到。每个 Agent 都是信息孤岛。
MCP 的三个核心概念:
# mcp_config.yaml — Agent 框架的配置文件
servers:
filesystem:
command: "npx"
args: ["-y", "@anthropic/mcp-server-filesystem", "/workspace"]
github:
command: "npx"
args: ["-y", "@anthropic/mcp-server-github"]
env:
GITHUB_TOKEN: "${GITHUB_TOKEN}"
database:
url: "https://db-mcp.internal/mcp"
transport: "http"
配置好后,Agent 自动获得所有这些 Server 提供的工具——搜索文件、读 PR、查数据库——无需为每个工具单独写集成代码。
Agent 不像分类模型——没法用准确率衡量。需要多维度评估:
| 维度 | 怎么测 | 指标 |
|---|---|---|
| 任务完成率 | 给 100 个标准任务,看完成多少 | 完成率 % |
| 工具调用准确率 | 检查每次工具调用是否正确 | 正确调用 / 总调用 |
| 效率 | 完成任务用了多少轮和 token | 平均轮次、token 消耗 |
| 自愈率 | 遇到错误后自己修复的比例 | 自愈次数 / 错误次数 |
| 安全性 | 注入恶意 prompt 看是否执行 | 拦截率 % |
def evaluate_agent(agent, test_suite: list[dict]) -> dict:
"""基础 Agent 评估框架。"""
results = {"passed": 0, "failed": 0, "details": []}
for case in test_suite:
try:
output = agent.run(case["input"])
# 用另一个 Agent 或规则判断是否通过
verdict = judge_agent.run(
f"任务: {case['input']}\n"
f"预期: {case['expected']}\n"
f"实际: {output}\n"
f"判断是否通过(回复 PASS 或 FAIL)"
)
passed = "PASS" in verdict.upper()
results["passed" if passed else "failed"] += 1
results["details"].append({
"task": case["name"],
"passed": passed,
"verdict": verdict
})
except Exception as e:
results["failed"] += 1
results["details"].append({
"task": case["name"], "passed": False, "error": str(e)
})
return results
| 层面 | 要做什么 |
|---|---|
| 沙箱隔离 | Agent 的代码执行必须在 Docker/VM 沙箱中,绝不能直接访问宿主机 |
| 速率限制 | 每个用户/Agent 设 API 调用上限,防止失控消耗 |
| 审计日志 | 记录每个 Agent 的每次工具调用、参数和结果,用于事后分析 |
| 可观测性 | 实时监控 Agent 状态、token 消耗、错误率 |
| 回退策略 | Agent 无法处理时,优雅降级到人工处理 |
📖 下一篇:从零写 Agent 框架 — 可验证的执行追踪与沙箱安全