Agent 消息 Schema 设计:让多 Agent 协作可验证、可追踪
30秒要点
- 核心问题:多 Agent 系统中,字段名不匹配会导致静默失败、Pipeline 断链无法定位、Schema 升级崩掉下游——因为没有显式的消息契约。
- 解决方案:四层消息 Schema 模型——数据层承载实际载荷、元数据层负责路由与身份、验证层提供完整性证明、路由层控制投递语义——每层独立演进、互不耦合。
- 关键产出:五类核心消息类型(TaskHandoff、ToolResult、ApprovalRequest、StatusUpdate、ErrorReport)的完整 JSON Schema 定义 + discriminated union 路由,可直接复制到项目中使用。
- 读完能做什么:为你的多 Agent 系统设计一套可版本化、可验证、可演化且不绑定任何框架的消息 Schema,从根本上消灭追踪断链和版本不兼容。
1. 为什么多 Agent 协作需要消息 Schema?——从追踪断链说起
先看一段真实发生过的事故。一个电商客服系统有三个 Agent 协作:订单查询 Agent 接收用户查询请求 → 库存检查 Agent 验证商品可用性 → 通知 Agent 将结果推送给用户。系统上线第一周就跑得很顺畅——直到有一天,仓库盘点导致某个 SKU 库存归零,但用户还是收到了"商品有货"的通知。
排查过程花了整整 4 个小时。最终发现问题的根因:
# Agent A(订单查询 Agent)的输出
{
"task_id": "ord-2024-0812",
"result": "pass", # ← 字段名: result
"inventory": {"sku": "A001", "available": 0}
}
# Agent B(库存检查 Agent)的解析逻辑
def handle_order_query(msg):
status = msg.get("output") # ← 期望字段: output
if status == "pass": # status 是 None,跳过所有判断
notify_user("商品已确认,预计3日发货")
# 没有任何 else 分支,静默失败
Agent A 用 result 字段传递结果,Agent B 期望的是 output。因为消息体里完全没有 Schema 定义,两个 Agent 之间形成了一种 隐式消息约定(implicit contract)——它只存在于开发者的脑子里,不存在于任何可被验证的结构中。当约定不一致时,系统不会报错、不会告警、不会回滚——它只是静默地做错了事。
这不仅仅是字段名不匹配的问题。在多 Agent Pipeline 中,隐式消息约定有三个系统性的失败模式:
① 静默吞错(Silent Failure):Agent B 解析 Agent A 的消息时,字段名对不上、类型不符合、嵌套结构不匹配——但代码里没有验证逻辑(或者验证逻辑太弱),导致错误数据一路传播到 Pipeline 末端。你看到的最终输出可能是错的,但无法回溯到哪个环节引入的。
② 追踪断链(Traceability Break):5 个 Agent 协作完成一个任务,中间某个 Agent 产出了错误格式的消息。因为每个 Agent 都是独立部署、独立迭代的,你不知道哪个 Agent 的哪个版本产出了这份消息。没有 sender_agent_id、没有 agent_version、没有 schema_version——追踪链在第一个 Agent 之后就断了。
③ 版本耦合(Version Coupling):你对 Agent A 加了三个新字段,升级部署。Agent B 还在跑老版本,解析到不认识的字段时崩溃了。你发现 Agent A 和 Agent B 之间没有任何兼容性约定——加字段 = 崩下游。
这些问题有一个共同的根因:Agent 之间的消息格式被当成了"JSON 随便传"的实现细节,而不是一个需要显式设计的架构契约。
显式消息 Schema 提供三个核心价值:
① 合同性(Contract):Schema 是 Agent A 和 Agent B 之间的正式契约——"我承诺输出这些字段,你可以依赖这些字段存在"。Schema 可以被自动验证、可以被 CI 检查、可以被文档化。它把"开发者脑子里的约定"变成了"机器可执行的规则"。
② 可追踪性(Traceability):通过在消息体中嵌入 sender_agent_id、agent_version、schema_version 和 content_hash,任何一条消息都可以被独立验证——"这条消息是哪个 Agent 的哪个版本产出的?和上游发来的内容一致吗?"
③ 可演化性(Evolvability):有了显式的 schema_version 字段和兼容性规则,Agent A 升级 Schema 时,Agent B 可以根据版本号决定如何解析——"v1.2.0 的消息我能读,1.0 的字段我都认识,新增的可选字段我忽略"。
做一个对比:传统微服务有 OpenAPI / gRPC proto 作为服务间通信的契约——定义字段、类型、必选/可选、版本。开发者不会把两个没有 API 文档的微服务直接对接。但在 Agent 系统中,绝大多数开发者就是用 {"key": "value"} 裸传 JSON,没有 Schema、没有验证、没有版本管理。
2. Schema 设计的四层模型:数据、元数据、验证、路由
当你决定"要给 Agent 消息设计 Schema"时,第一个问题就是:消息体里到底应该包含哪些字段?
开发者的典型反应是两个极端——要么只放一个 content 字段,把所有东西序列化成字符串塞进去(设计不足);要么把能想到的所有字段全加上——trace_id、span_id、parent_span_id、timestamp、created_at、updated_at、version、api_version……结果一个最简单的任务交接消息变成了 40 个字段(设计过度)。
这两种做法都不对。Agent 消息 Schema 不是一张扁平的大表——它的设计应该分层,每一层解决不同维度的关注点。经过对现有多 Agent 框架(Google A2A、OpenAI Agents SDK、CrewAI、AutoGen)消息格式的逆向分析,我们提炼出一个四层模型:
| 层(Layer) | 职责 | 关键字段 | 何时需要 |
|---|---|---|---|
| 数据层 (Data Layer) | 承载实际业务载荷——"做什么事、给什么参数、出什么结果" | content, parameters, results, artifacts | 所有消息都必须有 |
| 元数据层 (Metadata Layer) | 标识消息身份——"谁发的、发给谁、什么时候、属于哪个任务" | task_id, sender_agent_id, receiver_agent_id, timestamp, correlation_id | ≥2 个 Agent 协作时必须 |
| 验证层 (Verification Layer) | 提供完整性证明——"消息内容有没有被篡改?和上游发的一致吗?" | content_hash, schema_version, agent_signature | 生产环境跨进程/跨主机通信时需要 |
| 路由层 (Routing Layer) | 控制投递语义——"消息发几次?多久超时?失败了怎么办?" | priority, ttl, max_retries, idempotency_key, reply_to | 异步消息队列 / 事件驱动架构时需要 |
用一个完整的电商场景来串起四层:用户下单→风控审核→支付→通知。每一步由一个独立的 Agent 处理,消息在它们之间传递:
Layer 1 — 数据层:承载业务载荷
这是消息的"what"——这条消息到底在传递什么业务内容。数据层存在于每条消息中,不区分场景。
{
"data": {
"content": {
"action": "create_order",
"order_id": "ORD-2024-1205",
"product": {"sku": "PHONE-X1", "quantity": 1, "unit_price": 4999.00},
"customer": {"user_id": "u_8842", "vip_level": "gold"},
"total_amount": 4999.00
},
"parameters": {
"payment_method": "wechat_pay",
"delivery_address_id": "addr_7721"
},
"results": null,
"artifacts": []
}
}
数据层只描述业务语义,不包含任何基础设施关注点。字段的设计遵循一个简单原则:content 是这次交互的核心内容,parameters 是附加约束,results 是执行后的产出(本轮消息可能为空),artifacts 是二进制/大文件的引用而非内容本身(ObjRef)。
Layer 2 — 元数据层:标识消息身份
这是消息的"who、when、where"——只要系统里有 ≥2 个 Agent,元数据层就不可省略。没有它,你无法在日志/追踪系统里把消息和任务关联起来。
{
"metadata": {
"message_id": "msg-8f3a1b2c",
"task_id": "task-2024-1205-001",
"correlation_id": "corr-9b2c3d4e",
"sender_agent_id": "order-intake-agent",
"sender_agent_version": "2.1.0",
"receiver_agent_id": "risk-review-agent",
"timestamp": "2024-12-05T14:23:11.482Z",
"ttl_seconds": 300
}
}
关键设计决策:message_id 和 task_id 是两层标识——message_id 标识这一条消息本身(唯一,用于去重),task_id 标识这个任务的全生命周期(一个任务可能产生 N 条消息)。correlation_id 是可选字段,用于关联多个任务构成的工作流(比如一个"下单任务"属于某个"用户会话")。
Layer 3 — 验证层:内容完整性证明
这是消息的"proof"——当 Agent 跨越进程边界(尤其是通过消息队列或 HTTP 在不同主机间通信)时,你需要在每条消息中携带完整性证明。
{
"verification": {
"schema_version": "1.2.0",
"content_hash": "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
"content_hash_algorithm": "sha256",
"agent_signature": null
}
}
content_hash 是数据层的 SHA-256 哈希——下游 Agent 收到消息后可以重新计算数据层的哈希并与这个值比对,确认"上游发来的内容在传输过程中没有被修改"。schema_version 让下游知道"这条消息用的是哪个版本的 Schema",从而选择正确的解析策略。agent_signature 是可选的数字签名,用于跨信任域的身份认证(生产环境中强烈建议开启)。
Layer 4 — 路由层:投递语义控制
这是消息的"how to deliver"——当消息不是同步调用而是通过消息队列异步传递时,你需要控制投递行为。
{
"routing": {
"priority": "high",
"max_retries": 3,
"retry_backoff": "exponential",
"idempotency_key": "idem-msg-8f3a1b2c",
"reply_to": "queue:order-intake-responses",
"dead_letter_queue": "queue:dlq-orders"
}
}
idempotency_key 是路由层最重要的字段——消息队列可能因网络抖动而重复投递同一条消息,接收方用 key 做去重,保证"同一条消息最多处理一次"。reply_to 实现请求-响应模式:发送方指定回复应该投递到哪个队列。
什么时候用几层?
并非所有消息都需要四层全开。选择多少层取决于你的部署架构:
- 单进程内多个 Agent:只需要数据层 + 元数据层(前两层)。消息在内存中传递,不需要验证和路由。
- 同主机不同进程间:数据层 + 元数据层 + 验证层(前三层)。跨进程通信需要完整性验证,但不需要消息队列语义。
- 跨主机通过消息队列:四层全开。需要完整的内容完整性证明和投递语义控制。
这四层的核心设计原则是:上层不依赖下层,下层只提供服务。你可以只改路由层的重试策略,完全不影响数据层和元数据层。这种解耦让每个层级可以独立演进。
3. 五类核心消息类型:任务交接、工具结果、审批请求、状态更新、错误报告
有了四层模型作为"设计框架",下一步是定义具体的消息类型。很多多 Agent 系统犯的一个常见错误是:所有消息共用 {"type": "message", "data": {...}} 这一个结构——类型字段太弱,无法区分任务交接和工具结果、无法做类型安全的路由、下游 Agent 永远在用 if "field" in msg 的方式猜测消息结构。
一个好的 Agent 消息系统需要明确的消息类型分类,每种类型有自己独立的 Schema。经过对多 Agent 系统交互模式的归纳,Agent 之间通信不外乎五类核心消息:
| 消息类型 | 场景 | 方向 | message_type 值 |
|---|---|---|---|
| TaskHandoff | Agent A 将任务移交给 Agent B | 单向(A → B) | task_handoff |
| ToolResult | Agent A 收到工具执行结果 | 应答(Tool → A) | tool_result |
| ApprovalRequest | Agent A 请求人工或上级 Agent 审批 | 请求(A → Human/B) | approval_request |
| StatusUpdate | Agent A 汇报任务进度给协调 Agent | 推送(A → Orchestrator) | status_update |
| ErrorReport | Agent A 报告自身或下游的错误 | 广播(A → Monitor/Orchestrator) | error_report |
下面逐个给出每种类型的完整 JSON Schema 定义。每个 Schema 都包含必填字段、可选字段、使用场景和反模式——你可以直接在项目中使用或按需裁剪。
3.1 TaskHandoff — 任务交接
定义:Agent A 将一个任务正式转交给 Agent B,同时传递任务规格和上下文。这是多 Agent 系统中最核心的消息类型。
必填字段:task_id | from_agent | to_agent | task_spec
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "TaskHandoff",
"type": "object",
"required": ["task_id", "from_agent", "to_agent", "task_spec", "schema_version"],
"properties": {
"message_type": {
"const": "task_handoff"
},
"task_id": {
"type": "string",
"description": "任务全局唯一标识,贯穿整个任务生命周期"
},
"from_agent": {
"type": "string",
"description": "发送方 Agent 标识"
},
"to_agent": {
"type": "string",
"description": "接收方 Agent 标识"
},
"task_spec": {
"type": "object",
"required": ["action", "input"],
"properties": {
"action": {
"type": "string",
"description": "要执行的操作名称,例如 'review_order'"
},
"input": {
"type": "object",
"description": "操作所需的输入参数"
},
"context": {
"type": "object",
"description": "从前序任务中继承的上下文(可选)"
},
"constraints": {
"type": "object",
"properties": {
"max_duration_seconds": {"type": "integer"},
"required_confidence": {"type": "number", "minimum": 0, "maximum": 1}
}
}
}
},
"priority": {
"type": "string",
"enum": ["low", "normal", "high", "critical"],
"default": "normal"
},
"schema_version": {
"type": "string",
"pattern": "^\\d+\\.\\d+\\.\\d+$"
}
}
}
使用场景:电商风控审核 Agent 完成风险评估后,将审核结果移交给支付处理 Agent——携带 action: "process_payment" + 订单信息。
常见反模式:把 from_agent 和 to_agent 写成硬编码的字符串。正确做法是从 Agent 注册表中获取标识符。
3.2 ToolResult — 工具结果
定义:Agent 调用外部工具(API、数据库、文件系统)后,工具返回的结果。这是 Agent→工具→Agent 闭环中的"返回链路"。
必填字段:tool_call_id | tool_name | output
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "ToolResult",
"type": "object",
"required": ["tool_call_id", "tool_name", "output", "schema_version"],
"properties": {
"message_type": {
"const": "tool_result"
},
"tool_call_id": {
"type": "string",
"description": "对应的工具调用 ID,用于匹配请求和响应"
},
"tool_name": {
"type": "string",
"description": "被调用的工具名称,例如 'check_inventory'"
},
"output": {
"oneOf": [
{"type": "object"},
{"type": "array"},
{"type": "string"},
{"type": "number"},
{"type": "boolean"},
{"type": "null"}
],
"description": "工具执行结果——可以是任意 JSON 类型"
},
"error": {
"type": "object",
"description": "工具执行出错时的错误信息(可选,有错时必填)",
"properties": {
"code": {"type": "string"},
"message": {"type": "string"},
"details": {"type": "object"}
}
},
"duration_ms": {
"type": "integer",
"description": "工具执行耗时,单位毫秒"
},
"is_truncated": {
"type": "boolean",
"default": false,
"description": "输出是否因超出大小限制而被截断"
},
"schema_version": {
"type": "string",
"pattern": "^\\d+\\.\\d+\\.\\d+$"
}
}
}
使用场景:库存检查 Agent 调用 check_inventory 工具查询 SKU 库存后,工具返回 {"sku": "A001", "stock": 0, "warehouse": "SH-1"}。
常见反模式:把 100KB 的完整 API 响应原样塞进 output 字段——不只会撑爆 LLM 上下文窗口,还会泄露敏感信息(token、密钥)。正确做法是在工具上下文层做修剪和脱敏后再放进 output。
3.3 ApprovalRequest — 审批请求
定义:Agent 在执行高风险操作(退款、删除数据、修改生产配置)之前,向人类或上级 Agent 发起的审批请求。
必填字段:request_id | action | resource | reason
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "ApprovalRequest",
"type": "object",
"required": ["request_id", "action", "resource", "reason", "schema_version"],
"properties": {
"message_type": {
"const": "approval_request"
},
"request_id": {
"type": "string",
"description": "审批请求唯一标识"
},
"action": {
"type": "string",
"description": "请求执行的操作,例如 'refund'、'delete_user'"
},
"resource": {
"type": "object",
"description": "操作涉及的目标资源",
"properties": {
"type": {"type": "string"},
"id": {"type": "string"},
"summary": {"type": "string"}
}
},
"reason": {
"type": "string",
"description": "Agent 发起此操作的推理过程摘要——审批者需要理解 Agent 为什么做这个决定"
},
"context": {
"type": "object",
"description": "辅助审批者决策的上下文信息(可选)"
},
"timeout_seconds": {
"type": "integer",
"default": 3600,
"description": "审批请求的超时时间,超时后自动拒绝"
},
"risk_level": {
"type": "string",
"enum": ["low", "medium", "high"],
"description": "操作风险评估"
},
"schema_version": {
"type": "string",
"pattern": "^\\d+\\.\\d+\\.\\d+$"
}
}
}
使用场景:退款处理 Agent 检测到退款金额超过 ¥5000,触发人工审批——action: "refund" + resource: {order_id: "ORD-8842", amount: 7200.00}。
常见反模式:审批请求只传操作和资源,不传 reason 和 context。审批者需要在另一个系统里查半天才能理解 Agent 为什么做了这个决定——等于把推理成本转嫁给了人。
3.4 StatusUpdate — 状态更新
定义:Agent 在执行过程中主动向协调者(Orchestrator)或监控系统汇报自身状态和进度。
必填字段:task_id | new_status | timestamp
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "StatusUpdate",
"type": "object",
"required": ["task_id", "new_status", "timestamp", "schema_version"],
"properties": {
"message_type": {
"const": "status_update"
},
"task_id": {
"type": "string",
"description": "关联的任务 ID"
},
"new_status": {
"type": "string",
"enum": ["pending", "running", "waiting_for_approval", "waiting_for_tool",
"completed", "failed", "cancelled", "timed_out"],
"description": "任务新状态"
},
"previous_status": {
"type": "string",
"description": "任务前一个状态(可选,用于状态机校验)"
},
"progress_pct": {
"type": "integer",
"minimum": 0,
"maximum": 100,
"description": "任务完成百分比"
},
"message": {
"type": "string",
"description": "人类可读的状态描述"
},
"timestamp": {
"type": "string",
"format": "date-time"
},
"next_expected_status": {
"type": "string",
"description": "预计下一个状态(可选,用于预判)"
},
"schema_version": {
"type": "string",
"pattern": "^\\d+\\.\\d+\\.\\d+$"
}
}
}
使用场景:订单处理 Orchestrator 追踪三个子 Agent 的进度,每个子 Agent 在关键步骤完成后发送 StatusUpdate——Orchestrator 汇总展示"订单 8842:风控审核✅ → 支付处理⏳ 85% → 通知待发送"。
常见反模式:把 new_status 设计成自由字符串(如 "almost done"、"处理中...")。没有枚举约束的状态字段让下游无法做任何自动化处理——正确的做法是用枚举值 + 可选的 message 字段承载人类可读信息。
3.5 ErrorReport — 错误报告
定义:Agent 在执行过程中遇到无法自行恢复的错误时,向监控系统或协调者发出的错误报告。
必填字段:error_code | error_message | source_agent | timestamp
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "ErrorReport",
"type": "object",
"required": ["error_code", "error_message", "source_agent", "timestamp", "schema_version"],
"properties": {
"message_type": {
"const": "error_report"
},
"error_code": {
"type": "string",
"description": "机器可读的错误码,例如 'TOOL_TIMEOUT'、'SCHEMA_VALIDATION_FAILED'",
"examples": ["TOOL_TIMEOUT", "UPSTREAM_UNREACHABLE", "SCHEMA_VALIDATION_FAILED",
"LLM_RATE_LIMITED", "INVALID_TASK_SPEC"]
},
"error_message": {
"type": "string",
"description": "人类可读的错误描述"
},
"severity": {
"type": "string",
"enum": ["warning", "error", "critical"],
"default": "error"
},
"source_agent": {
"type": "string",
"description": "产生错误的 Agent 标识"
},
"source_task_id": {
"type": "string",
"description": "发生错误时正在执行的任务 ID"
},
"stack_trace": {
"type": "string",
"description": "技术栈追踪(可选,仅调试模式开启)"
},
"recovery_hint": {
"type": "string",
"description": "Agent 对恢复策略的建议(可选但强烈建议)",
"examples": ["Retry with exponential backoff", "Escalate to human operator",
"Skip this item and continue with next batch"]
},
"timestamp": {
"type": "string",
"format": "date-time"
},
"schema_version": {
"type": "string",
"pattern": "^\\d+\\.\\d+\\.\\d+$"
}
}
}
使用场景:支付 Agent 调用外部支付网关超时(3 次重试后仍失败),发送 ErrorReport——error_code: "UPSTREAM_TIMEOUT" + recovery_hint: "escalate_to_human_operator"。Orchestrator 收到后触发人工介入流程。
常见反模式:错误报告里只有一个 error_message 字符串——"出错了"。没有 error_code 意味着下游无法做自动化路由(哪些错误可以重试?哪些需要升级?哪些可以直接忽略?),没有 recovery_hint 意味着每个错误都需要人来看。
3.6 消息类型的联合路由(Discriminated Union)
有了五种消息类型,你需要一个统一的消息信封来包装它们,让消息路由器能根据 message_type 自动分发到正确的处理逻辑:
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "AgentMessage",
"description": "统一消息信封——用 message_type 做 discriminated union",
"type": "object",
"required": ["message_id", "message_type", "payload", "schema_version"],
"properties": {
"message_id": {
"type": "string",
"description": "消息全局唯一 ID"
},
"message_type": {
"type": "string",
"enum": ["task_handoff", "tool_result", "approval_request",
"status_update", "error_report"],
"description": "消息类型标识——路由器用此字段分发"
},
"payload": {
"description": "具体消息载荷——类型由 message_type 决定",
"oneOf": [
{"$ref": "#/$defs/TaskHandoffPayload"},
{"$ref": "#/$defs/ToolResultPayload"},
{"$ref": "#/$defs/ApprovalRequestPayload"},
{"$ref": "#/$defs/StatusUpdatePayload"},
{"$ref": "#/$defs/ErrorReportPayload"}
]
},
"metadata": {
"type": "object",
"description": "元数据层字段——task_id, correlation_id 等"
},
"schema_version": {
"type": "string",
"pattern": "^\\d+\\.\\d+\\.\\d+$"
}
}
}
路由器的消费逻辑非常简单——不用 if isinstance(payload, TaskHandoff) 这种类型检查,而是用 message_type 字符串路由:
# 消息路由器——根据 message_type 分发
ROUTING_TABLE = {
"task_handoff": handle_task_handoff,
"tool_result": handle_tool_result,
"approval_request": handle_approval_request,
"status_update": handle_status_update,
"error_report": handle_error_report,
}
def dispatch(message: dict) -> None:
msg_type = message["message_type"]
handler = ROUTING_TABLE.get(msg_type)
if handler is None:
raise ValueError(f"Unknown message type: {msg_type}")
handler(message["payload"])
4. 版本管理与向前兼容:Schema 升级不中断运行中的 Agent
现实中 Schema 一定会变。你可能会加字段(task_spec 里多了一个 priority)、改字段名(receiver_agent_id 改成 target_agent_id 更顺口)、拆嵌套结构(原来 content 是一大坨 JSON,后来拆成 content + parameters + context)。
没有版本策略 = 一改就崩。Agent A 升级部署 v2.0,发出的消息多了三个字段。Agent B 还在跑老代码,解析到不认识的字段后直接抛异常——整个 Pipeline 停摆。
4.1 三种版本化策略
Agent 消息 Schema 的版本管理有三种主流策略,各有适用场景:
| 策略 | 做法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 追加式 (Additive) | 只新增可选字段,不改名不删字段。字段上标注 @deprecated。 |
100% 向前兼容,老 Agent 完全不受影响 | Schema 会膨胀,废弃字段积累 | 内部系统、Agent 数量 ≤5 |
| 迁移式 (Migratory) | 为新 Schema 分配新 MAJOR 版本,同时维护两个版本的解析器。老版本有明确的 EOL 日期。 | 可以彻底重新设计,不受历史包袱限制 | 需要同时维护两套代码,过渡期成本高 | 公开 API、跨团队 Agent 系统 |
| 双写式 (Dual-Write) | Agent A 同时输出 v1 和 v2 格式的消息,Agent B 逐步迁移到 v2。双写期间消息量翻倍。 | 零故障迁移,新旧 Agent 各自消费对应版本的消息 | 消息流量翻倍,存储成本增加 | 大规模系统、零停机要求的生产环境 |
推荐的起点:先用追加式。绝大多数 Agent 系统的 Schema 变更只是加字段,追加式完全够用。当系统增长到需要彻底重构 Schema 时,再切换为迁移式或双写式。
4.2 核心字段:schema_version
每条消息必须携带 schema_version 字段,使用语义化版本(MAJOR.MINOR.PATCH):
{
"message_type": "task_handoff",
"message_id": "msg-9a2b3c4d",
"schema_version": "2.1.3",
"payload": {
"task_id": "task-8842",
"from_agent": "order-intake-agent",
"to_agent": "risk-review-agent",
"task_spec": {
"action": "review_order",
"input": {"order_id": "ORD-1205"},
"priority": "high" # ← v2.1.0 新增的可选字段
}
}
}
schema_version 让下游 Agent 知道如何解析这条消息。没有这个字段,你只能靠"猜"——你知道收到的消息是 task_handoff 类型,但不知道它的 Schema 是 v1.0 还是 v2.1。
4.3 向前兼容规则
向前兼容(Forward Compatibility)指:用老版本 Schema 构建的消费者 Agent 能正确读取新版本 Schema 产生的消息。
| 变更类型 | 示例 | 版本号变更 | 向前兼容? |
|---|---|---|---|
| 新增可选字段 | task_spec 新增 priority(默认 "normal") | MINOR++ | ✅ 是——老 Agent 忽略不认识的可选字段 |
| 扩大枚举值 | priority 枚举增加 "critical" | MINOR++ | ✅ 是——老 Agent 可以把未知枚举值当 default 处理 |
| 新增必填字段 | task_spec 新增必填 trace_id | MAJOR++ | ❌ 否——老 Agent 无法解析缺少必填字段的消息 |
| 字段改名 | receiver_agent_id → target_agent_id | MAJOR++ | ❌ 否——老 Agent 找不到旧字段名 |
| 字段类型变更 | duration_ms 从 int 变 string | MAJOR++ | ❌ 否——类型不匹配导致解析失败 |
| 删除字段 | 删除 legacy_field | MAJOR++ | ❌ 否——如果有 Agent 依赖该字段会出错 |
核心规则很简单:只要不破坏老 Agent 对已有字段的解析语义,就是 MINOR 变更;一旦破坏,就是 MAJOR 变更。
4.4 弃用策略:@deprecated 标记 + 过渡期
当一个字段需要被移除时,不要直接从 Schema 中删掉——这会让所有还在使用它的 Agent 立即崩溃。正确的做法是分三步走:
| 阶段 | 时间 | 操作 | Agent 行为 |
|---|---|---|---|
| ① 标记弃用 | T+0 | 在 Schema 中标注 "deprecated": true,更新文档。新增替代字段,同时保留旧字段。Schema 版本:MINOR++。 | 所有 Agent 仍然使用旧字段。日志中打印弃用警告。 |
| ② 过渡期 | T+30 天 | 要求所有 Agent 迁移到新字段。在此期间旧字段仍然有效,但性能可能降低(双写)。 | 新 Agent 用新字段,老 Agent 继续用旧字段。 |
| ③ 移除 | T+60 天 | 从 Schema 中删除旧字段。 Schema 版本:MAJOR++。 | 只有已迁移到新版本 Schema 的 Agent 能继续工作。 |
弃用标记在 JSON Schema 中的写法:
{
"old_field_name": {
"type": "string",
"description": "【已弃用】请使用 new_field_name。将在 2026-08-01 移除。",
"deprecated": true,
"deprecation_message": "Use 'new_field_name' instead — this field will be removed in v3.0.0"
},
"new_field_name": {
"type": "string",
"description": "替代 old_field_name 的新字段"
}
}
4.5 消息兼容性检查器
下面是一个可运行的 Python 兼容性检查器——接收一条消息和消费者期望的 Schema 版本,返回是否兼容:
from packaging import version
from typing import Optional
def is_forward_compatible(
message_schema_version: str,
consumer_expected_version: str,
required_fields: Optional[set] = None
) -> tuple[bool, str]:
"""检查消息是否与消费者的期望版本向前兼容。
Args:
message_schema_version: 消息的 schema_version 字段值,如 "2.1.3"
consumer_expected_version: 消费者期望的 MAJOR 版本,如 "2.0.0"
required_fields: 消费者依赖的必填字段集合(可选)
Returns:
(is_compatible, reason) — 是否兼容 + 原因描述
"""
msg_ver = version.parse(message_schema_version)
exp_ver = version.parse(consumer_expected_version)
# 规则 1:MAJOR 版本不同 = 可能不兼容
if msg_ver.major != exp_ver.major:
return False, (
f"Major version mismatch: message v{msg_ver.major}.x "
f"vs consumer expected v{exp_ver.major}.x"
)
# 规则 2:MAJOR 相同,MINOR/PATCH 更大 → 向前兼容(追加式变更)
return True, (
f"Compatible: message v{message_schema_version} >= "
f"consumer expected v{consumer_expected_version} "
f"(same major, newer minor/patch)"
)
这个检查器的核心逻辑只有两行:MAJOR 相同就兼容(新增可选字段不影响老 Agent),MAJOR 不同就不兼容(可能改了必填字段或字段名)。
在生产环境中,你应该在消息路由器中集成这个检查器——每条消息在投递给消费者之前,先验证消费者的期望版本与消息版本是否兼容。不兼容的消息可以路由到死信队列(DLQ)而不是直接崩溃。
schema_version,让下游在做任何解析之前先做版本检查。这就像 TCP 握手——先确认双方"说同一种语言",再开始传输数据。
5. 让消息可验证:内容哈希、Agent 身份、完整性证明
在多 Agent Pipeline 中,有一个场景会让你抓狂:Agent B 收到了一个错误的结果。是 Agent A 发错了,还是中间某个环节(消息队列、代理层、序列化/反序列化过程)改了消息内容?没有内置于消息的验证机制,你根本回答不了这个问题——你只能靠"相信基础设施是可靠的"来安慰自己。
生产环境不能靠"相信"。你需要让每条消息自带完整性证明——下游 Agent 收到消息后,能独立验证"这条消息和上游发出来的时候一模一样"。
5.1 content_hash:内容指纹
content_hash 是数据层(Layer 1)的 SHA-256 哈希值。发送方 Agent 在构造消息时对 data 字段做哈希,将摘要写入验证层;接收方 Agent 收到消息后,对收到的 data 字段重新计算哈希,与验证层中的值比对——匹配则证明内容未被篡改。
# 发送方:计算并嵌入哈希
import hashlib, json
def sign_message(message: dict) -> dict:
"""为消息的数据层计算 content_hash 并写入验证层。"""
data_bytes = json.dumps(message["data"], sort_keys=True).encode("utf-8")
message["verification"] = message.get("verification", {})
message["verification"]["content_hash"] = hashlib.sha256(data_bytes).hexdigest()
message["verification"]["content_hash_algorithm"] = "sha256"
return message
# 接收方:验证哈希
def verify_content_integrity(message: dict) -> bool:
"""重新计算数据层哈希并与消息中的 content_hash 比对。"""
if "verification" not in message or "content_hash" not in message["verification"]:
return False # 没有哈希值 = 无法验证
data_bytes = json.dumps(message["data"], sort_keys=True).encode("utf-8")
computed = hashlib.sha256(data_bytes).hexdigest()
return computed == message["verification"]["content_hash"]
关键细节:哈希计算时必须用 sort_keys=True,因为 JSON 对象中键的顺序在不同序列化实现中可能不同——{"a":1,"b":2} 和 {"b":2,"a":1} 语义等价但字节不同,不排序会导致假阳性不匹配。
5.2 sender_agent_id + agent_version:身份溯源
哈希只能证明内容没被改,但不能证明"这条消息确实来自声称的那个 Agent"。元数据层中的 sender_agent_id 和 sender_agent_version 提供了身份溯源:
"metadata": {
"sender_agent_id": "risk-review-agent",
"sender_agent_version": "2.1.0",
"receiver_agent_id": "payment-agent",
"timestamp": "2024-12-05T14:23:11.482Z"
}
这两个字段在排障时价值巨大。当你要回答"是谁用哪个版本产出了这条消息?"时,不需要去日志系统里翻——答案就在消息本身。对于跨团队的多 Agent 系统,sender_agent_id 建议使用全局注册的标识符(如 team-risk.reviewer-v2),而不是本地代号。
5.3 signature:跨信任域验证
当消息跨越安全域边界时(例如从生产网络发到外部审计系统),仅靠 content_hash 不够——恶意中间人可以同时修改数据和哈希值。此时需要 HMAC 签名:
import hmac
def sign_with_hmac(message: dict, shared_secret: str) -> dict:
"""为消息添加 HMAC-SHA256 签名。"""
data_bytes = json.dumps(message["data"], sort_keys=True).encode("utf-8")
msg_id = message["metadata"]["message_id"].encode("utf-8")
# 签名涵盖 message_id + 数据层,防止重放攻击
signature = hmac.new(
shared_secret.encode("utf-8"),
msg_id + data_bytes,
hashlib.sha256
).hexdigest()
message["verification"]["agent_signature"] = signature
return message
什么时候需要签名?同一主机内 Agent 通信:不需要签名(content_hash 足够);跨主机但同一信任域:content_hash + TLS 传输层加密即可;跨信任域(如 SaaS 平台中不同租户的 Agent):需要 HMAC 或非对称签名。
5.4 proof_chain:多跳追溯链
单跳场景下 content_hash 和 sender_agent_id 够了,但当一个任务经过 N 个 Agent 依次处理时,你需要知道整条链路——每个 Agent 做了什么、产出了什么。这就是 proof_chain 的用武之地。
proof_chain 是一个有序列表,记录消息经过的每一个 Agent:
"verification": {
"schema_version": "1.2.0",
"content_hash": "sha256:a1b2c3...",
"proof_chain": [
{
"agent_id": "research-agent",
"agent_version": "1.5.0",
"action": "initial_search",
"content_hash": "sha256:f1e2d3...",
"timestamp": "2024-12-05T14:20:00Z"
},
{
"agent_id": "writer-agent",
"agent_version": "3.2.1",
"action": "draft_section",
"content_hash": "sha256:a1b2c3...",
"timestamp": "2024-12-05T14:22:30Z"
}
]
}
每个 downstream Agent 在修改消息内容后,将自己的操作记录追加到 proof_chain 中:
def append_to_proof_chain(message: dict, agent_id: str,
agent_version: str, action: str) -> dict:
"""Agent 在修改消息后,将自己的操作追加到 proof_chain。"""
chain = message.setdefault("verification", {}).setdefault("proof_chain", [])
data_bytes = json.dumps(message["data"], sort_keys=True).encode("utf-8")
chain.append({
"agent_id": agent_id,
"agent_version": agent_version,
"action": action,
"content_hash": hashlib.sha256(data_bytes).hexdigest(),
"timestamp": datetime.utcnow().isoformat() + "Z"
})
# 更新消息级 content_hash 为最新数据层哈希
message["verification"]["content_hash"] = chain[-1]["content_hash"]
return message
5.5 端到端验证示例
用一个三 Agent 链路来串联所有验证机制:Research Agent 搜索并产出原始资料 → Writer Agent 基于资料撰写初稿 → Reviewer Agent 审核内容。每一步都追加 proof_chain,任何一步的篡改都能被检测出来。
完整消息示例(经过两个 Agent 后的状态):
{
"message_id": "msg-8f3a1b2c",
"message_type": "task_handoff",
"schema_version": "1.2.0",
"data": {
"content": {
"topic": "Agent 消息 Schema 设计",
"draft": "多 Agent 系统中,消息是...",
"sources": ["https://example.com/a2a-spec", "https://example.com/mcp"]
},
"results": null,
"artifacts": []
},
"metadata": {
"task_id": "task-content-gen-001",
"correlation_id": "corr-sprint-42",
"sender_agent_id": "writer-agent",
"sender_agent_version": "3.2.1",
"receiver_agent_id": "reviewer-agent",
"timestamp": "2024-12-05T14:22:30Z"
},
"verification": {
"schema_version": "1.2.0",
"content_hash": "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
"content_hash_algorithm": "sha256",
"proof_chain": [
{
"agent_id": "research-agent",
"agent_version": "1.5.0",
"action": "initial_search",
"content_hash": "sha256:d1d2d3...",
"timestamp": "2024-12-05T14:20:00Z"
},
{
"agent_id": "writer-agent",
"agent_version": "3.2.1",
"action": "draft_section",
"content_hash": "sha256:e3b0c4...",
"timestamp": "2024-12-05T14:22:30Z"
}
]
},
"routing": {
"priority": "normal",
"max_retries": 2
}
}
Reviewer Agent 收到这条消息后,可以执行完整的验证链:① 重新计算 data 的哈希 → 与 content_hash 比对;② 遍历 proof_chain,确认每个 Agent 的操作内容哈希与前一个 Agent 的产出一致;③ 检查 chain 中每个 Agent 的 timestamp 是否单调递增(防止时间戳回退攻击)。
def verify_message(message: dict) -> tuple[bool, str]:
"""对消息执行完整验证:内容完整性 + proof_chain 连续性。"""
# Step 1: 内容完整性
if not verify_content_integrity(message):
return False, "Content hash mismatch — data may have been tampered with"
# Step 2: proof_chain 连续性
chain = message.get("verification", {}).get("proof_chain", [])
for i in range(1, len(chain)):
prev_hash = chain[i-1]["content_hash"]
# 在实际系统中,你需要存储每个 Agent 操作前的数据哈希
# 此处简化为检查链中每个条目非空
if not chain[i].get("content_hash"):
return False, f"Proof chain entry {i} missing content_hash"
# Step 3: timestamp 单调性
for i in range(1, len(chain)):
if chain[i]["timestamp"] <= chain[i-1]["timestamp"]:
return False, f"Timestamp regression at entry {i}"
return True, "All verifications passed"
6. 实战:为三 Agent 系统设计完整消息 Schema
前面的理论都懂了——但能不能直接给我一个可运行的、端到端的三 Agent 系统 Schema 实现?能。下面以一个中文开发者友好的场景——内容审核链路——来展示从 Schema 定义到消息流转的完整过程。
6.1 场景:内容审核三 Agent 链路
假设你在做一个内容平台的内容审核系统,有三类消息经过三个 Agent:
| Agent | 职责 | 产出的消息类型 |
|---|---|---|
| ContentFetcher | 从数据库/API 抓取待审核内容 | FetchResult |
| ContentAuditor | 对内容进行审核(违规检测 + 置信度评分) | AuditResult |
| ReportAgent | 汇总审核结果,生成审核报告 | ReportMessage |
消息流向:ContentFetcher → ContentAuditor → ReportAgent。下面定义三个消息类型的完整 Schema 和 Pydantic 模型。
6.2 FetchResult — ContentFetcher 产出
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "FetchResult",
"type": "object",
"required": ["message_type", "trace_id", "items", "fetch_metadata"],
"properties": {
"message_type": {"const": "fetch_result"},
"trace_id": {
"type": "string",
"description": "贯穿整个审核链路的追踪 ID"
},
"items": {
"type": "array",
"items": {
"type": "object",
"required": ["content_id", "content_type", "raw_content"],
"properties": {
"content_id": {"type": "string"},
"content_type": {"type": "string", "enum": ["article", "comment", "image", "video"]},
"raw_content": {"type": "string"},
"author_id": {"type": "string"},
"created_at": {"type": "string", "format": "date-time"},
"source": {"type": "string", "description": "内容来源,如 'db:posts'、'api:comments'"}
}
}
},
"fetch_metadata": {
"type": "object",
"properties": {
"total_fetched": {"type": "integer"},
"fetch_duration_ms": {"type": "integer"},
"fetch_strategy": {"type": "string", "enum": ["batch", "stream", "on_demand"]}
}
}
}
}
6.3 AuditResult — ContentAuditor 产出
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "AuditResult",
"type": "object",
"required": ["message_type", "trace_id", "audit_results"],
"properties": {
"message_type": {"const": "audit_result"},
"trace_id": {
"type": "string",
"description": "与上游 FetchResult 相同的 trace_id"
},
"audit_results": {
"type": "array",
"items": {
"type": "object",
"required": ["content_id", "verdict", "confidence"],
"properties": {
"content_id": {"type": "string"},
"verdict": {
"type": "string",
"enum": ["safe", "unsafe", "needs_review", "error"]
},
"confidence": {"type": "number", "minimum": 0, "maximum": 1},
"violations": {
"type": "array",
"items": {
"type": "object",
"properties": {
"rule_id": {"type": "string"},
"rule_name": {"type": "string"},
"severity": {"type": "string", "enum": ["low", "medium", "high", "critical"]},
"matched_text": {"type": "string"},
"explanation": {"type": "string"}
}
}
},
"auditor_version": {"type": "string", "description": "审核模型的版本号"},
"audited_at": {"type": "string", "format": "date-time"}
}
}
},
"summary": {
"type": "object",
"properties": {
"total_audited": {"type": "integer"},
"safe_count": {"type": "integer"},
"unsafe_count": {"type": "integer"},
"needs_review_count": {"type": "integer"},
"audit_duration_ms": {"type": "integer"}
}
}
}
}
6.4 ReportMessage — ReportAgent 产出
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "ReportMessage",
"type": "object",
"required": ["message_type", "trace_id", "report"],
"properties": {
"message_type": {"const": "report_message"},
"trace_id": {
"type": "string",
"description": "与上游相同的 trace_id——全链路追踪"
},
"report": {
"type": "object",
"required": ["report_id", "generated_at", "overall_verdict", "details"],
"properties": {
"report_id": {"type": "string"},
"generated_at": {"type": "string", "format": "date-time"},
"overall_verdict": {
"type": "string",
"enum": ["all_safe", "has_violations", "requires_manual_review"]
},
"details": {
"type": "array",
"items": {
"type": "object",
"properties": {
"content_id": {"type": "string"},
"final_decision": {"type": "string", "enum": ["approved", "rejected", "flagged"]},
"action_taken": {"type": "string", "enum": ["publish", "hide", "delete", "warn_user"]},
"human_review_required": {"type": "boolean"},
"notes": {"type": "string"}
}
}
}
}
},
"pipeline_metadata": {
"type": "object",
"description": "整个审核链路的执行摘要",
"properties": {
"pipeline_started_at": {"type": "string", "format": "date-time"},
"pipeline_completed_at": {"type": "string", "format": "date-time"},
"total_duration_ms": {"type": "integer"},
"agents_involved": {
"type": "array",
"items": {"type": "string"}
}
}
}
}
}
6.5 Pydantic 模型实现(可直接运行)
from pydantic import BaseModel, Field
from typing import Optional, List
from enum import Enum
from datetime import datetime
# ── 枚举定义 ──
class ContentType(str, Enum):
ARTICLE = "article"
COMMENT = "comment"
IMAGE = "image"
VIDEO = "video"
class Verdict(str, Enum):
SAFE = "safe"
UNSAFE = "unsafe"
NEEDS_REVIEW = "needs_review"
ERROR = "error"
class FinalDecision(str, Enum):
APPROVED = "approved"
REJECTED = "rejected"
FLAGGED = "flagged"
# ── FetchResult:ContentFetcher 产出 ──
class FetchItem(BaseModel):
content_id: str
content_type: ContentType
raw_content: str
author_id: Optional[str] = None
created_at: Optional[datetime] = None
source: Optional[str] = None
class FetchMetadata(BaseModel):
total_fetched: int
fetch_duration_ms: int
fetch_strategy: str = "batch"
class FetchResult(BaseModel):
message_type: str = Field(default="fetch_result", frozen=True)
trace_id: str
items: List[FetchItem]
fetch_metadata: FetchMetadata
# ── AuditResult:ContentAuditor 产出 ──
class Violation(BaseModel):
rule_id: str
rule_name: str
severity: str # low, medium, high, critical
matched_text: Optional[str] = None
explanation: Optional[str] = None
class AuditItem(BaseModel):
content_id: str
verdict: Verdict
confidence: float = Field(ge=0, le=1)
violations: List[Violation] = []
auditor_version: Optional[str] = None
audited_at: Optional[datetime] = None
class AuditSummary(BaseModel):
total_audited: int
safe_count: int
unsafe_count: int
needs_review_count: int
audit_duration_ms: int
class AuditResult(BaseModel):
message_type: str = Field(default="audit_result", frozen=True)
trace_id: str
audit_results: List[AuditItem]
summary: Optional[AuditSummary] = None
# ── ReportMessage:ReportAgent 产出 ──
class ReportDetail(BaseModel):
content_id: str
final_decision: FinalDecision
action_taken: str
human_review_required: bool = False
notes: Optional[str] = None
class Report(BaseModel):
report_id: str
generated_at: datetime
overall_verdict: str
details: List[ReportDetail]
class PipelineMetadata(BaseModel):
pipeline_started_at: datetime
pipeline_completed_at: datetime
total_duration_ms: int
agents_involved: List[str]
class ReportMessage(BaseModel):
message_type: str = Field(default="report_message", frozen=True)
trace_id: str
report: Report
pipeline_metadata: Optional[PipelineMetadata] = None
6.6 trace_id 传播与 proof_chain 累积
三个 Agent 之间的消息流转遵循一个简单的约定:trace_id 从 FetchResult 一路传播到 ReportMessage,proof_chain 在每一步持续累积。
from uuid import uuid4
import hashlib, json
from datetime import datetime, timezone
# ── 消息序列化/反序列化工具 ──
def serialize_message(model: BaseModel) -> dict:
"""Pydantic 模型 → JSON 字典(用于传输)。"""
return json.loads(model.model_dump_json())
def deserialize_fetch_result(data: dict) -> FetchResult:
return FetchResult(**data)
def deserialize_audit_result(data: dict) -> AuditResult:
return AuditResult(**data)
def deserialize_report_message(data: dict) -> ReportMessage:
return ReportMessage(**data)
# ── 统一的 Agent 消息包装器 ──
def wrap_agent_message(payload: BaseModel, sender_id: str,
sender_version: str, receiver_id: str,
task_id: str, proof_chain: list = None) -> dict:
"""将业务载荷包装为带四层结构的 Agent 消息。"""
now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
payload_dict = json.loads(payload.model_dump_json())
# 数据层
data_layer = {
"content": payload_dict,
"parameters": {},
"results": None,
"artifacts": []
}
# 计算 content_hash
data_bytes = json.dumps(data_layer, sort_keys=True).encode("utf-8")
content_hash = hashlib.sha256(data_bytes).hexdigest()
# 构建 proof_chain
chain = proof_chain or []
chain.append({
"agent_id": sender_id,
"agent_version": sender_version,
"action": payload.message_type,
"content_hash": f"sha256:{content_hash}",
"timestamp": now
})
return {
"message_id": f"msg-{uuid4().hex[:8]}",
"message_type": payload.message_type,
"schema_version": "1.0.0",
"data": data_layer,
"metadata": {
"task_id": task_id,
"sender_agent_id": sender_id,
"sender_agent_version": sender_version,
"receiver_agent_id": receiver_id,
"timestamp": now
},
"verification": {
"schema_version": "1.0.0",
"content_hash": f"sha256:{content_hash}",
"content_hash_algorithm": "sha256",
"proof_chain": chain
},
"routing": {
"priority": "normal",
"max_retries": 3
}
}
# ── 端到端示例 ──
def run_content_audit_pipeline():
"""从 ContentFetcher → ContentAuditor → ReportAgent 的完整演示。"""
task_id = f"task-{uuid4().hex[:8]}"
trace_id = f"trace-{uuid4().hex[:8]}"
chain = []
# Step 1: ContentFetcher 产出 FetchResult
fetch_result = FetchResult(
trace_id=trace_id,
items=[
FetchItem(content_id="c-001", content_type=ContentType.ARTICLE,
raw_content="这是一篇关于金融理财的文章...",
author_id="u_1001", source="db:posts"),
FetchItem(content_id="c-002", content_type=ContentType.COMMENT,
raw_content="这个产品真的很好用,推荐!",
author_id="u_2002", source="api:comments"),
],
fetch_metadata=FetchMetadata(total_fetched=2, fetch_duration_ms=45)
)
msg1 = wrap_agent_message(fetch_result, "content-fetcher", "1.0.0",
"content-auditor", task_id, chain)
chain = msg1["verification"]["proof_chain"] # 传递 proof_chain
print(f"[ContentFetcher] 产出 {len(fetch_result.items)} 条待审内容,trace_id={trace_id}")
print(f" proof_chain 长度: {len(chain)}")
# Step 2: ContentAuditor 消费 FetchResult,产出 AuditResult
audit_items = []
for item in fetch_result.items:
verdict = Verdict.SAFE if "推荐" in item.raw_content else Verdict.NEEDS_REVIEW
audit_items.append(AuditItem(
content_id=item.content_id,
verdict=verdict,
confidence=0.92,
violations=[] if verdict == Verdict.SAFE else [
Violation(rule_id="R-001", rule_name="敏感词检测",
severity="medium", matched_text="金融理财")
],
auditor_version="auditor-v3.1",
audited_at=datetime.now(timezone.utc)
))
audit_result = AuditResult(
trace_id=trace_id,
audit_results=audit_items,
summary=AuditSummary(
total_audited=2, safe_count=1, unsafe_count=0,
needs_review_count=1, audit_duration_ms=230
)
)
msg2 = wrap_agent_message(audit_result, "content-auditor", "3.1.0",
"report-agent", task_id, chain)
chain = msg2["verification"]["proof_chain"]
print(f"[ContentAuditor] 审核完成: 安全={audit_result.summary.safe_count}, "
f"需人工={audit_result.summary.needs_review_count}")
print(f" proof_chain 长度: {len(chain)}")
# Step 3: ReportAgent 消费 AuditResult,产出 ReportMessage
details = []
for item in audit_result.audit_results:
if item.verdict == Verdict.SAFE:
decision, action = FinalDecision.APPROVED, "publish"
elif item.verdict == Verdict.UNSAFE:
decision, action = FinalDecision.REJECTED, "delete"
else:
decision, action = FinalDecision.FLAGGED, "warn_user"
details.append(ReportDetail(
content_id=item.content_id, final_decision=decision,
action_taken=action,
human_review_required=(item.verdict == Verdict.NEEDS_REVIEW)
))
report_msg = ReportMessage(
trace_id=trace_id,
report=Report(
report_id=f"rpt-{uuid4().hex[:8]}",
generated_at=datetime.now(timezone.utc),
overall_verdict="requires_manual_review",
details=details
),
pipeline_metadata=PipelineMetadata(
pipeline_started_at=datetime.now(timezone.utc),
pipeline_completed_at=datetime.now(timezone.utc),
total_duration_ms=350,
agents_involved=["content-fetcher", "content-auditor", "report-agent"]
)
)
msg3 = wrap_agent_message(report_msg, "report-agent", "2.0.0",
"orchestrator", task_id, chain)
print(f"[ReportAgent] 报告生成: {report_msg.report.overall_verdict}")
print(f" proof_chain 最终长度: {len(msg3['verification']['proof_chain'])}")
return msg3
# 运行
if __name__ == "__main__":
final_message = run_content_audit_pipeline()
运行上面的代码,你会看到三个 Agent 依次产出消息,trace_id 贯穿始终,proof_chain 从 1 条记录增长到 3 条——任何一步的篡改都能被验证出来。
trace_id 由 ContentFetcher 生成,后续 Agent 通过解析上游消息的 trace_id 字段来衔接链路。proof_chain 通过 wrap_agent_message 函数的 proof_chain 参数在 Agent 之间传递和累积。这两个机制让分布式审计变得简单——你只需要拿到最后一条消息,就能还原整条链路。
常见问题(FAQ):消息 Schema 设计的 6 个高频问题
Q1: 消息 Schema 设计和 REST API 设计有什么本质区别?
REST API 是请求-响应模式,有明确的 Client/Server 角色——客户端发请求,服务端返回响应,一次交互结束。Agent 消息是多跳异步的——一条消息可能经过 3 个 Agent 的传递和修改,每个 Agent 既是消费者也是生产者。这带来两个本质差异:
- 溯源需求不同:Agent 消息需要内嵌溯源信息(proof_chain、sender_agent_id、agent_version),因为"谁在哪个环节改了什么"是排障的核心问题。REST API 的 trace 依赖外部系统(如 OpenTelemetry、分布式追踪),不要求消息体自带溯源。
- 版本耦合面不同:REST API 只有 Client/Server 两个版本点。Agent 消息的 N 个 Agent 有 N 个版本点——Schema 版本管理必须考虑"某个 Agent 升级后,下游 N-1 个 Agent 能否继续工作"。
Q2: 应该用 JSON 还是 Protobuf?
取决于部署场景:
- 内部 Agent 系统(同一进程/同一主机):用 JSON。可读性优先于性能——调试时需要直接看消息内容,不需要 protoc 解码。LLM 原生处理 JSON 也更自然。
- 跨主机/跨语言 Agent 系统:考虑 Protobuf + JSON 双序列化方案。内部 Agent 通信走 Proto(性能好、带宽省),API 边界/对外暴露时转 JSON(可读性好、工具链成熟)。
- 参考 Google A2A 协议:选择了 Proto-first + JSON 兼容的设计——核心传输用 Proto,同时提供规范的 JSON 映射规则,两种格式可以无损互转。
Q3: 消息 Schema 和 MCP 协议是什么关系?
两者是互补关系,不是替代关系:
- MCP(Model Context Protocol)定义了 Agent 与外部工具的通信格式——基于 JSON-RPC 2.0,关注工具发现、调用和结果返回。
- 本文的消息 Schema定义了 Agent 与Agent 之间的通信格式——关注任务交接、审批、状态同步和错误升级。
- MCP 的
ToolResult可以封装成本文定义的ToolResult消息类型的一个子集——MCP 管 Agent↔工具,本文管 Agent↔Agent。在实际系统中,两者需要协同工作:Agent A 通过 MCP 调用工具后,用本文的 Schema 把加工后的结果传递给 Agent B。
Q4: schema_version 从 1.0 升级到 2.0 时,正在运行的任务怎么处理?
三种策略,按推荐度排序:
- 追加式(推荐起步):新任务用 v2 Schema,老任务继续用 v1 跑完。无中断,但同一时期有两套消息格式在线上。
- 迁移式:给 v1 消息自动添加 v2 的默认字段——比如 v2 新增了
priority字段,迁移器给所有 v1 消息补上"priority": "normal"。成本低但需要维护迁移逻辑。 - 双写式(零停机):两个版本的 Handler 同时运行,新 Agent 消费 v2,老 Agent 消费 v1,逐步切流量。成本最高但最安全。
生产环境推荐策略 1 + 3 组合:先双写(新旧 Handler 共存),观察一段时间后,让老任务自然结束,新任务全走 v2。
Q5: 每条消息都需要 content_hash 和 signature 吗?
不是。按消息场景分级要求:
- 低风险场景(Agent 内部日志消息、心跳 ping):可以省略
content_hash——出错不影响业务,性能优先。 - 中风险场景(任务交接、审批请求、工具结果):必须包含
content_hash——这些消息承载业务决策,完整性必须可验证。 - 高风险场景(跨安全域的消息、金融交易指令):还需要
signature——哈希只能防无意的篡改,签名才能防有意的伪造。
建议在 Schema 中将 verification 字段设为可选,通过消息类型的 risk_level 决定是否强制要求。不要让低风险消息背着高风险的验证成本。
Q6: 如何避免 Schema 设计过度?
从最简 Schema 开始——只定义 data layer 的必填字段。用 YAGNI 原则(You Aren't Gonna Need It):
- 调试困难时 → 加
correlation_id - Agent 数量增长到 ≥3 时 → 加
sender_agent_id和agent_version - 安全审计需求出现时 → 加
content_hash和proof_chain - 跨主机通信时 → 加
signature
不要预先设计"可能有用"的字段。每加一个字段都有成本——序列化开销、维护负担、文档更新、下游 Agent 的适配。让真实的痛苦驱动 Schema 的演化,而不是提前的猜测。
继续阅读 / 下一步阅读
消息 Schema 是多 Agent 系统通信的骨架——它定义了"怎么传"。但要构建完整的 Agent 系统,你还需要理解"传什么"、"怎么编排"和"出错了怎么办"。以下文章构成 Agent 通信与协议的完整知识体系:
- Agent 上下文协议设计 — 本文的四层消息 Schema 是上下文协议中"消息总线"层的核心规范。上下文协议定义了"传什么",消息 Schema 定义了"怎么传"。
- Agent 审计日志设计 — 审计日志中的 Pydantic 事件 Schema 就是消息 Schema 设计的一个完整案例——5 种事件类型、条件验证、版本管理。
- 多 Agent 编排 — 编排的黄金法则是 Agent 之间用结构化数据(JSON)通信——本文提供了结构化数据的具体 Schema 设计方法。
- MCP 协议生产实践 — MCP 的 JSON-RPC 2.0 线格式与本文的消息 Schema 形成互补:MCP 管 Agent↔工具,本文管 Agent↔Agent。
- Agent 工具设计 — 工具的参数命名和结构化输出是消息 Schema 设计的基础——好的工具接口定义是好的消息 Schema 的前提。
- Agent 错误恢复 — 结构化错误消息格式是消息 Schema 中 ErrorReport 类型的实践延伸。
如果你还没有读过任何 Agent 工程文章,建议从 什么是 AI Agent 开始。