Agent 消息 Schema 设计:让多 Agent 协作可验证、可追踪

30秒要点

  • 核心问题:多 Agent 系统中,字段名不匹配会导致静默失败、Pipeline 断链无法定位、Schema 升级崩掉下游——因为没有显式的消息契约。
  • 解决方案:四层消息 Schema 模型——数据层承载实际载荷、元数据层负责路由与身份、验证层提供完整性证明、路由层控制投递语义——每层独立演进、互不耦合。
  • 关键产出:五类核心消息类型(TaskHandoff、ToolResult、ApprovalRequest、StatusUpdate、ErrorReport)的完整 JSON Schema 定义 + discriminated union 路由,可直接复制到项目中使用。
  • 读完能做什么:为你的多 Agent 系统设计一套可版本化、可验证、可演化且不绑定任何框架的消息 Schema,从根本上消灭追踪断链和版本不兼容。

1. 为什么多 Agent 协作需要消息 Schema?——从追踪断链说起

先看一段真实发生过的事故。一个电商客服系统有三个 Agent 协作:订单查询 Agent 接收用户查询请求 → 库存检查 Agent 验证商品可用性 → 通知 Agent 将结果推送给用户。系统上线第一周就跑得很顺畅——直到有一天,仓库盘点导致某个 SKU 库存归零,但用户还是收到了"商品有货"的通知。

排查过程花了整整 4 个小时。最终发现问题的根因:

# Agent A(订单查询 Agent)的输出
{
  "task_id": "ord-2024-0812",
  "result": "pass",              # ← 字段名: result
  "inventory": {"sku": "A001", "available": 0}
}

# Agent B(库存检查 Agent)的解析逻辑
def handle_order_query(msg):
    status = msg.get("output")   # ← 期望字段: output
    if status == "pass":         # status 是 None,跳过所有判断
        notify_user("商品已确认,预计3日发货")
    # 没有任何 else 分支,静默失败

Agent A 用 result 字段传递结果,Agent B 期望的是 output。因为消息体里完全没有 Schema 定义,两个 Agent 之间形成了一种 隐式消息约定(implicit contract)——它只存在于开发者的脑子里,不存在于任何可被验证的结构中。当约定不一致时,系统不会报错、不会告警、不会回滚——它只是静默地做错了事

这不仅仅是字段名不匹配的问题。在多 Agent Pipeline 中,隐式消息约定有三个系统性的失败模式:

① 静默吞错(Silent Failure):Agent B 解析 Agent A 的消息时,字段名对不上、类型不符合、嵌套结构不匹配——但代码里没有验证逻辑(或者验证逻辑太弱),导致错误数据一路传播到 Pipeline 末端。你看到的最终输出可能是错的,但无法回溯到哪个环节引入的。

② 追踪断链(Traceability Break):5 个 Agent 协作完成一个任务,中间某个 Agent 产出了错误格式的消息。因为每个 Agent 都是独立部署、独立迭代的,你不知道哪个 Agent 的哪个版本产出了这份消息。没有 sender_agent_id、没有 agent_version、没有 schema_version——追踪链在第一个 Agent 之后就断了。

③ 版本耦合(Version Coupling):你对 Agent A 加了三个新字段,升级部署。Agent B 还在跑老版本,解析到不认识的字段时崩溃了。你发现 Agent A 和 Agent B 之间没有任何兼容性约定——加字段 = 崩下游。

这些问题有一个共同的根因:Agent 之间的消息格式被当成了"JSON 随便传"的实现细节,而不是一个需要显式设计的架构契约

显式消息 Schema 提供三个核心价值:

① 合同性(Contract):Schema 是 Agent A 和 Agent B 之间的正式契约——"我承诺输出这些字段,你可以依赖这些字段存在"。Schema 可以被自动验证、可以被 CI 检查、可以被文档化。它把"开发者脑子里的约定"变成了"机器可执行的规则"。

② 可追踪性(Traceability):通过在消息体中嵌入 sender_agent_idagent_versionschema_versioncontent_hash,任何一条消息都可以被独立验证——"这条消息是哪个 Agent 的哪个版本产出的?和上游发来的内容一致吗?"

③ 可演化性(Evolvability):有了显式的 schema_version 字段和兼容性规则,Agent A 升级 Schema 时,Agent B 可以根据版本号决定如何解析——"v1.2.0 的消息我能读,1.0 的字段我都认识,新增的可选字段我忽略"。

做一个对比:传统微服务有 OpenAPI / gRPC proto 作为服务间通信的契约——定义字段、类型、必选/可选、版本。开发者不会把两个没有 API 文档的微服务直接对接。但在 Agent 系统中,绝大多数开发者就是用 {"key": "value"} 裸传 JSON,没有 Schema、没有验证、没有版本管理。

📌 核心认知:多 Agent 系统不是"一个程序里调几个函数"——它是多个独立部署、独立迭代的智能体之间的分布式消息系统。没有 Schema,就没有可靠性。参考多 Agent 编排的黄金法则:Agent 之间用结构化数据(JSON)通信,不用自然语言——但这只是第一步。结构化数据需要一个 Schema 来定义结构本身。

2. Schema 设计的四层模型:数据、元数据、验证、路由

当你决定"要给 Agent 消息设计 Schema"时,第一个问题就是:消息体里到底应该包含哪些字段?

开发者的典型反应是两个极端——要么只放一个 content 字段,把所有东西序列化成字符串塞进去(设计不足);要么把能想到的所有字段全加上——trace_idspan_idparent_span_idtimestampcreated_atupdated_atversionapi_version……结果一个最简单的任务交接消息变成了 40 个字段(设计过度)。

这两种做法都不对。Agent 消息 Schema 不是一张扁平的大表——它的设计应该分层,每一层解决不同维度的关注点。经过对现有多 Agent 框架(Google A2A、OpenAI Agents SDK、CrewAI、AutoGen)消息格式的逆向分析,我们提炼出一个四层模型

层(Layer)职责关键字段何时需要
数据层 (Data Layer)承载实际业务载荷——"做什么事、给什么参数、出什么结果"content, parameters, results, artifacts所有消息都必须有
元数据层 (Metadata Layer)标识消息身份——"谁发的、发给谁、什么时候、属于哪个任务"task_id, sender_agent_id, receiver_agent_id, timestamp, correlation_id≥2 个 Agent 协作时必须
验证层 (Verification Layer)提供完整性证明——"消息内容有没有被篡改?和上游发的一致吗?"content_hash, schema_version, agent_signature生产环境跨进程/跨主机通信时需要
路由层 (Routing Layer)控制投递语义——"消息发几次?多久超时?失败了怎么办?"priority, ttl, max_retries, idempotency_key, reply_to异步消息队列 / 事件驱动架构时需要

用一个完整的电商场景来串起四层:用户下单→风控审核→支付→通知。每一步由一个独立的 Agent 处理,消息在它们之间传递:

Layer 1 — 数据层:承载业务载荷

这是消息的"what"——这条消息到底在传递什么业务内容。数据层存在于每条消息中,不区分场景。

{
  "data": {
    "content": {
      "action": "create_order",
      "order_id": "ORD-2024-1205",
      "product": {"sku": "PHONE-X1", "quantity": 1, "unit_price": 4999.00},
      "customer": {"user_id": "u_8842", "vip_level": "gold"},
      "total_amount": 4999.00
    },
    "parameters": {
      "payment_method": "wechat_pay",
      "delivery_address_id": "addr_7721"
    },
    "results": null,
    "artifacts": []
  }
}

数据层只描述业务语义,不包含任何基础设施关注点。字段的设计遵循一个简单原则:content 是这次交互的核心内容,parameters 是附加约束,results 是执行后的产出(本轮消息可能为空),artifacts 是二进制/大文件的引用而非内容本身(ObjRef)。

Layer 2 — 元数据层:标识消息身份

这是消息的"who、when、where"——只要系统里有 ≥2 个 Agent,元数据层就不可省略。没有它,你无法在日志/追踪系统里把消息和任务关联起来。

{
  "metadata": {
    "message_id": "msg-8f3a1b2c",
    "task_id": "task-2024-1205-001",
    "correlation_id": "corr-9b2c3d4e",
    "sender_agent_id": "order-intake-agent",
    "sender_agent_version": "2.1.0",
    "receiver_agent_id": "risk-review-agent",
    "timestamp": "2024-12-05T14:23:11.482Z",
    "ttl_seconds": 300
  }
}

关键设计决策:message_idtask_id 是两层标识——message_id 标识这一条消息本身(唯一,用于去重),task_id 标识这个任务的全生命周期(一个任务可能产生 N 条消息)。correlation_id 是可选字段,用于关联多个任务构成的工作流(比如一个"下单任务"属于某个"用户会话")。

Layer 3 — 验证层:内容完整性证明

这是消息的"proof"——当 Agent 跨越进程边界(尤其是通过消息队列或 HTTP 在不同主机间通信)时,你需要在每条消息中携带完整性证明。

{
  "verification": {
    "schema_version": "1.2.0",
    "content_hash": "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
    "content_hash_algorithm": "sha256",
    "agent_signature": null
  }
}

content_hash 是数据层的 SHA-256 哈希——下游 Agent 收到消息后可以重新计算数据层的哈希并与这个值比对,确认"上游发来的内容在传输过程中没有被修改"。schema_version 让下游知道"这条消息用的是哪个版本的 Schema",从而选择正确的解析策略。agent_signature 是可选的数字签名,用于跨信任域的身份认证(生产环境中强烈建议开启)。

Layer 4 — 路由层:投递语义控制

这是消息的"how to deliver"——当消息不是同步调用而是通过消息队列异步传递时,你需要控制投递行为。

{
  "routing": {
    "priority": "high",
    "max_retries": 3,
    "retry_backoff": "exponential",
    "idempotency_key": "idem-msg-8f3a1b2c",
    "reply_to": "queue:order-intake-responses",
    "dead_letter_queue": "queue:dlq-orders"
  }
}

idempotency_key 是路由层最重要的字段——消息队列可能因网络抖动而重复投递同一条消息,接收方用 key 做去重,保证"同一条消息最多处理一次"。reply_to 实现请求-响应模式:发送方指定回复应该投递到哪个队列。

什么时候用几层?

并非所有消息都需要四层全开。选择多少层取决于你的部署架构:

这四层的核心设计原则是:上层不依赖下层,下层只提供服务。你可以只改路由层的重试策略,完全不影响数据层和元数据层。这种解耦让每个层级可以独立演进。

📌 对比上下文协议:本文的四层消息 Schema 模型和Agent 上下文协议设计的四层架构(消息总线→工具上下文→记忆上下文→任务上下文)是互补关系——上下文协议解决"Agent 内部组件间如何传递状态",消息 Schema 解决"Agent 之间如何传递消息"。两者都采用四层分层思想,但作用域不同。

3. 五类核心消息类型:任务交接、工具结果、审批请求、状态更新、错误报告

有了四层模型作为"设计框架",下一步是定义具体的消息类型。很多多 Agent 系统犯的一个常见错误是:所有消息共用 {"type": "message", "data": {...}} 这一个结构——类型字段太弱,无法区分任务交接和工具结果、无法做类型安全的路由、下游 Agent 永远在用 if "field" in msg 的方式猜测消息结构。

一个好的 Agent 消息系统需要明确的消息类型分类,每种类型有自己独立的 Schema。经过对多 Agent 系统交互模式的归纳,Agent 之间通信不外乎五类核心消息:

消息类型场景方向message_type 值
TaskHandoffAgent A 将任务移交给 Agent B单向(A → B)task_handoff
ToolResultAgent A 收到工具执行结果应答(Tool → A)tool_result
ApprovalRequestAgent A 请求人工或上级 Agent 审批请求(A → Human/B)approval_request
StatusUpdateAgent A 汇报任务进度给协调 Agent推送(A → Orchestrator)status_update
ErrorReportAgent A 报告自身或下游的错误广播(A → Monitor/Orchestrator)error_report

下面逐个给出每种类型的完整 JSON Schema 定义。每个 Schema 都包含必填字段、可选字段、使用场景和反模式——你可以直接在项目中使用或按需裁剪。

3.1 TaskHandoff — 任务交接

定义:Agent A 将一个任务正式转交给 Agent B,同时传递任务规格和上下文。这是多 Agent 系统中最核心的消息类型。

必填字段:task_id | from_agent | to_agent | task_spec

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "TaskHandoff",
  "type": "object",
  "required": ["task_id", "from_agent", "to_agent", "task_spec", "schema_version"],
  "properties": {
    "message_type": {
      "const": "task_handoff"
    },
    "task_id": {
      "type": "string",
      "description": "任务全局唯一标识,贯穿整个任务生命周期"
    },
    "from_agent": {
      "type": "string",
      "description": "发送方 Agent 标识"
    },
    "to_agent": {
      "type": "string",
      "description": "接收方 Agent 标识"
    },
    "task_spec": {
      "type": "object",
      "required": ["action", "input"],
      "properties": {
        "action": {
          "type": "string",
          "description": "要执行的操作名称,例如 'review_order'"
        },
        "input": {
          "type": "object",
          "description": "操作所需的输入参数"
        },
        "context": {
          "type": "object",
          "description": "从前序任务中继承的上下文(可选)"
        },
        "constraints": {
          "type": "object",
          "properties": {
            "max_duration_seconds": {"type": "integer"},
            "required_confidence": {"type": "number", "minimum": 0, "maximum": 1}
          }
        }
      }
    },
    "priority": {
      "type": "string",
      "enum": ["low", "normal", "high", "critical"],
      "default": "normal"
    },
    "schema_version": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    }
  }
}

使用场景:电商风控审核 Agent 完成风险评估后,将审核结果移交给支付处理 Agent——携带 action: "process_payment" + 订单信息。

常见反模式:from_agentto_agent 写成硬编码的字符串。正确做法是从 Agent 注册表中获取标识符。

3.2 ToolResult — 工具结果

定义:Agent 调用外部工具(API、数据库、文件系统)后,工具返回的结果。这是 Agent→工具→Agent 闭环中的"返回链路"。

必填字段:tool_call_id | tool_name | output

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "ToolResult",
  "type": "object",
  "required": ["tool_call_id", "tool_name", "output", "schema_version"],
  "properties": {
    "message_type": {
      "const": "tool_result"
    },
    "tool_call_id": {
      "type": "string",
      "description": "对应的工具调用 ID,用于匹配请求和响应"
    },
    "tool_name": {
      "type": "string",
      "description": "被调用的工具名称,例如 'check_inventory'"
    },
    "output": {
      "oneOf": [
        {"type": "object"},
        {"type": "array"},
        {"type": "string"},
        {"type": "number"},
        {"type": "boolean"},
        {"type": "null"}
      ],
      "description": "工具执行结果——可以是任意 JSON 类型"
    },
    "error": {
      "type": "object",
      "description": "工具执行出错时的错误信息(可选,有错时必填)",
      "properties": {
        "code": {"type": "string"},
        "message": {"type": "string"},
        "details": {"type": "object"}
      }
    },
    "duration_ms": {
      "type": "integer",
      "description": "工具执行耗时,单位毫秒"
    },
    "is_truncated": {
      "type": "boolean",
      "default": false,
      "description": "输出是否因超出大小限制而被截断"
    },
    "schema_version": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    }
  }
}

使用场景:库存检查 Agent 调用 check_inventory 工具查询 SKU 库存后,工具返回 {"sku": "A001", "stock": 0, "warehouse": "SH-1"}

常见反模式:把 100KB 的完整 API 响应原样塞进 output 字段——不只会撑爆 LLM 上下文窗口,还会泄露敏感信息(token、密钥)。正确做法是在工具上下文层做修剪和脱敏后再放进 output

3.3 ApprovalRequest — 审批请求

定义:Agent 在执行高风险操作(退款、删除数据、修改生产配置)之前,向人类或上级 Agent 发起的审批请求。

必填字段:request_id | action | resource | reason

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "ApprovalRequest",
  "type": "object",
  "required": ["request_id", "action", "resource", "reason", "schema_version"],
  "properties": {
    "message_type": {
      "const": "approval_request"
    },
    "request_id": {
      "type": "string",
      "description": "审批请求唯一标识"
    },
    "action": {
      "type": "string",
      "description": "请求执行的操作,例如 'refund'、'delete_user'"
    },
    "resource": {
      "type": "object",
      "description": "操作涉及的目标资源",
      "properties": {
        "type": {"type": "string"},
        "id": {"type": "string"},
        "summary": {"type": "string"}
      }
    },
    "reason": {
      "type": "string",
      "description": "Agent 发起此操作的推理过程摘要——审批者需要理解 Agent 为什么做这个决定"
    },
    "context": {
      "type": "object",
      "description": "辅助审批者决策的上下文信息(可选)"
    },
    "timeout_seconds": {
      "type": "integer",
      "default": 3600,
      "description": "审批请求的超时时间,超时后自动拒绝"
    },
    "risk_level": {
      "type": "string",
      "enum": ["low", "medium", "high"],
      "description": "操作风险评估"
    },
    "schema_version": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    }
  }
}

使用场景:退款处理 Agent 检测到退款金额超过 ¥5000,触发人工审批——action: "refund" + resource: {order_id: "ORD-8842", amount: 7200.00}

常见反模式:审批请求只传操作和资源,不传 reasoncontext。审批者需要在另一个系统里查半天才能理解 Agent 为什么做了这个决定——等于把推理成本转嫁给了人。

3.4 StatusUpdate — 状态更新

定义:Agent 在执行过程中主动向协调者(Orchestrator)或监控系统汇报自身状态和进度。

必填字段:task_id | new_status | timestamp

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "StatusUpdate",
  "type": "object",
  "required": ["task_id", "new_status", "timestamp", "schema_version"],
  "properties": {
    "message_type": {
      "const": "status_update"
    },
    "task_id": {
      "type": "string",
      "description": "关联的任务 ID"
    },
    "new_status": {
      "type": "string",
      "enum": ["pending", "running", "waiting_for_approval", "waiting_for_tool",
               "completed", "failed", "cancelled", "timed_out"],
      "description": "任务新状态"
    },
    "previous_status": {
      "type": "string",
      "description": "任务前一个状态(可选,用于状态机校验)"
    },
    "progress_pct": {
      "type": "integer",
      "minimum": 0,
      "maximum": 100,
      "description": "任务完成百分比"
    },
    "message": {
      "type": "string",
      "description": "人类可读的状态描述"
    },
    "timestamp": {
      "type": "string",
      "format": "date-time"
    },
    "next_expected_status": {
      "type": "string",
      "description": "预计下一个状态(可选,用于预判)"
    },
    "schema_version": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    }
  }
}

使用场景:订单处理 Orchestrator 追踪三个子 Agent 的进度,每个子 Agent 在关键步骤完成后发送 StatusUpdate——Orchestrator 汇总展示"订单 8842:风控审核✅ → 支付处理⏳ 85% → 通知待发送"。

常见反模式:new_status 设计成自由字符串(如 "almost done""处理中...")。没有枚举约束的状态字段让下游无法做任何自动化处理——正确的做法是用枚举值 + 可选的 message 字段承载人类可读信息。

3.5 ErrorReport — 错误报告

定义:Agent 在执行过程中遇到无法自行恢复的错误时,向监控系统或协调者发出的错误报告。

必填字段:error_code | error_message | source_agent | timestamp

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "ErrorReport",
  "type": "object",
  "required": ["error_code", "error_message", "source_agent", "timestamp", "schema_version"],
  "properties": {
    "message_type": {
      "const": "error_report"
    },
    "error_code": {
      "type": "string",
      "description": "机器可读的错误码,例如 'TOOL_TIMEOUT'、'SCHEMA_VALIDATION_FAILED'",
      "examples": ["TOOL_TIMEOUT", "UPSTREAM_UNREACHABLE", "SCHEMA_VALIDATION_FAILED",
                   "LLM_RATE_LIMITED", "INVALID_TASK_SPEC"]
    },
    "error_message": {
      "type": "string",
      "description": "人类可读的错误描述"
    },
    "severity": {
      "type": "string",
      "enum": ["warning", "error", "critical"],
      "default": "error"
    },
    "source_agent": {
      "type": "string",
      "description": "产生错误的 Agent 标识"
    },
    "source_task_id": {
      "type": "string",
      "description": "发生错误时正在执行的任务 ID"
    },
    "stack_trace": {
      "type": "string",
      "description": "技术栈追踪(可选,仅调试模式开启)"
    },
    "recovery_hint": {
      "type": "string",
      "description": "Agent 对恢复策略的建议(可选但强烈建议)",
      "examples": ["Retry with exponential backoff", "Escalate to human operator",
                   "Skip this item and continue with next batch"]
    },
    "timestamp": {
      "type": "string",
      "format": "date-time"
    },
    "schema_version": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    }
  }
}

使用场景:支付 Agent 调用外部支付网关超时(3 次重试后仍失败),发送 ErrorReport——error_code: "UPSTREAM_TIMEOUT" + recovery_hint: "escalate_to_human_operator"。Orchestrator 收到后触发人工介入流程。

常见反模式:错误报告里只有一个 error_message 字符串——"出错了"。没有 error_code 意味着下游无法做自动化路由(哪些错误可以重试?哪些需要升级?哪些可以直接忽略?),没有 recovery_hint 意味着每个错误都需要人来看。

3.6 消息类型的联合路由(Discriminated Union)

有了五种消息类型,你需要一个统一的消息信封来包装它们,让消息路由器能根据 message_type 自动分发到正确的处理逻辑:

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "AgentMessage",
  "description": "统一消息信封——用 message_type 做 discriminated union",
  "type": "object",
  "required": ["message_id", "message_type", "payload", "schema_version"],
  "properties": {
    "message_id": {
      "type": "string",
      "description": "消息全局唯一 ID"
    },
    "message_type": {
      "type": "string",
      "enum": ["task_handoff", "tool_result", "approval_request",
               "status_update", "error_report"],
      "description": "消息类型标识——路由器用此字段分发"
    },
    "payload": {
      "description": "具体消息载荷——类型由 message_type 决定",
      "oneOf": [
        {"$ref": "#/$defs/TaskHandoffPayload"},
        {"$ref": "#/$defs/ToolResultPayload"},
        {"$ref": "#/$defs/ApprovalRequestPayload"},
        {"$ref": "#/$defs/StatusUpdatePayload"},
        {"$ref": "#/$defs/ErrorReportPayload"}
      ]
    },
    "metadata": {
      "type": "object",
      "description": "元数据层字段——task_id, correlation_id 等"
    },
    "schema_version": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    }
  }
}

路由器的消费逻辑非常简单——不用 if isinstance(payload, TaskHandoff) 这种类型检查,而是用 message_type 字符串路由:

# 消息路由器——根据 message_type 分发
ROUTING_TABLE = {
    "task_handoff": handle_task_handoff,
    "tool_result": handle_tool_result,
    "approval_request": handle_approval_request,
    "status_update": handle_status_update,
    "error_report": handle_error_report,
}

def dispatch(message: dict) -> None:
    msg_type = message["message_type"]
    handler = ROUTING_TABLE.get(msg_type)
    if handler is None:
        raise ValueError(f"Unknown message type: {msg_type}")
    handler(message["payload"])
📌 不是所有系统都需要全部五类:如果你的系统只有 2 个 Agent 做简单的工具调用,可能只需要 TaskHandoff 和 ToolResult。随着系统复杂度的增长,再逐步引入 ApprovalRequest(人工审批)、StatusUpdate(进度追踪)和 ErrorReport(错误升级)。Schema 应该适配系统复杂度,而不是反过来。

4. 版本管理与向前兼容:Schema 升级不中断运行中的 Agent

现实中 Schema 一定会变。你可能会加字段(task_spec 里多了一个 priority)、改字段名(receiver_agent_id 改成 target_agent_id 更顺口)、拆嵌套结构(原来 content 是一大坨 JSON,后来拆成 content + parameters + context)。

没有版本策略 = 一改就崩。Agent A 升级部署 v2.0,发出的消息多了三个字段。Agent B 还在跑老代码,解析到不认识的字段后直接抛异常——整个 Pipeline 停摆。

4.1 三种版本化策略

Agent 消息 Schema 的版本管理有三种主流策略,各有适用场景:

策略做法优点缺点适用场景
追加式 (Additive) 只新增可选字段,不改名不删字段。字段上标注 @deprecated 100% 向前兼容,老 Agent 完全不受影响 Schema 会膨胀,废弃字段积累 内部系统、Agent 数量 ≤5
迁移式 (Migratory) 为新 Schema 分配新 MAJOR 版本,同时维护两个版本的解析器。老版本有明确的 EOL 日期。 可以彻底重新设计,不受历史包袱限制 需要同时维护两套代码,过渡期成本高 公开 API、跨团队 Agent 系统
双写式 (Dual-Write) Agent A 同时输出 v1 和 v2 格式的消息,Agent B 逐步迁移到 v2。双写期间消息量翻倍。 零故障迁移,新旧 Agent 各自消费对应版本的消息 消息流量翻倍,存储成本增加 大规模系统、零停机要求的生产环境

推荐的起点:先用追加式。绝大多数 Agent 系统的 Schema 变更只是加字段,追加式完全够用。当系统增长到需要彻底重构 Schema 时,再切换为迁移式或双写式。

4.2 核心字段:schema_version

每条消息必须携带 schema_version 字段,使用语义化版本(MAJOR.MINOR.PATCH):

{
  "message_type": "task_handoff",
  "message_id": "msg-9a2b3c4d",
  "schema_version": "2.1.3",
  "payload": {
    "task_id": "task-8842",
    "from_agent": "order-intake-agent",
    "to_agent": "risk-review-agent",
    "task_spec": {
      "action": "review_order",
      "input": {"order_id": "ORD-1205"},
      "priority": "high"           # ← v2.1.0 新增的可选字段
    }
  }
}

schema_version 让下游 Agent 知道如何解析这条消息。没有这个字段,你只能靠"猜"——你知道收到的消息是 task_handoff 类型,但不知道它的 Schema 是 v1.0 还是 v2.1。

4.3 向前兼容规则

向前兼容(Forward Compatibility)指:用老版本 Schema 构建的消费者 Agent 能正确读取新版本 Schema 产生的消息。

变更类型示例版本号变更向前兼容?
新增可选字段task_spec 新增 priority(默认 "normal")MINOR++✅ 是——老 Agent 忽略不认识的可选字段
扩大枚举值priority 枚举增加 "critical"MINOR++✅ 是——老 Agent 可以把未知枚举值当 default 处理
新增必填字段task_spec 新增必填 trace_idMAJOR++❌ 否——老 Agent 无法解析缺少必填字段的消息
字段改名receiver_agent_idtarget_agent_idMAJOR++❌ 否——老 Agent 找不到旧字段名
字段类型变更duration_ms 从 int 变 stringMAJOR++❌ 否——类型不匹配导致解析失败
删除字段删除 legacy_fieldMAJOR++❌ 否——如果有 Agent 依赖该字段会出错

核心规则很简单:只要不破坏老 Agent 对已有字段的解析语义,就是 MINOR 变更;一旦破坏,就是 MAJOR 变更。

4.4 弃用策略:@deprecated 标记 + 过渡期

当一个字段需要被移除时,不要直接从 Schema 中删掉——这会让所有还在使用它的 Agent 立即崩溃。正确的做法是分三步走:

阶段时间操作Agent 行为
① 标记弃用T+0在 Schema 中标注 "deprecated": true,更新文档。新增替代字段,同时保留旧字段。
Schema 版本:MINOR++。
所有 Agent 仍然使用旧字段。日志中打印弃用警告。
② 过渡期T+30 天要求所有 Agent 迁移到新字段。在此期间旧字段仍然有效,但性能可能降低(双写)。新 Agent 用新字段,老 Agent 继续用旧字段。
③ 移除T+60 天从 Schema 中删除旧字段。
Schema 版本:MAJOR++。
只有已迁移到新版本 Schema 的 Agent 能继续工作。

弃用标记在 JSON Schema 中的写法:

{
  "old_field_name": {
    "type": "string",
    "description": "【已弃用】请使用 new_field_name。将在 2026-08-01 移除。",
    "deprecated": true,
    "deprecation_message": "Use 'new_field_name' instead — this field will be removed in v3.0.0"
  },
  "new_field_name": {
    "type": "string",
    "description": "替代 old_field_name 的新字段"
  }
}

4.5 消息兼容性检查器

下面是一个可运行的 Python 兼容性检查器——接收一条消息和消费者期望的 Schema 版本,返回是否兼容:

from packaging import version
from typing import Optional

def is_forward_compatible(
    message_schema_version: str,
    consumer_expected_version: str,
    required_fields: Optional[set] = None
) -> tuple[bool, str]:
    """检查消息是否与消费者的期望版本向前兼容。

    Args:
        message_schema_version: 消息的 schema_version 字段值,如 "2.1.3"
        consumer_expected_version: 消费者期望的 MAJOR 版本,如 "2.0.0"
        required_fields: 消费者依赖的必填字段集合(可选)

    Returns:
        (is_compatible, reason) — 是否兼容 + 原因描述
    """
    msg_ver = version.parse(message_schema_version)
    exp_ver = version.parse(consumer_expected_version)

    # 规则 1:MAJOR 版本不同 = 可能不兼容
    if msg_ver.major != exp_ver.major:
        return False, (
            f"Major version mismatch: message v{msg_ver.major}.x "
            f"vs consumer expected v{exp_ver.major}.x"
        )

    # 规则 2:MAJOR 相同,MINOR/PATCH 更大 → 向前兼容(追加式变更)
    return True, (
        f"Compatible: message v{message_schema_version} >= "
        f"consumer expected v{consumer_expected_version} "
        f"(same major, newer minor/patch)"
    )

这个检查器的核心逻辑只有两行:MAJOR 相同就兼容(新增可选字段不影响老 Agent),MAJOR 不同就不兼容(可能改了必填字段或字段名)。

在生产环境中,你应该在消息路由器中集成这个检查器——每条消息在投递给消费者之前,先验证消费者的期望版本与消息版本是否兼容。不兼容的消息可以路由到死信队列(DLQ)而不是直接崩溃。

📌 版本管理的黄金法则:不要让消费者(下游 Agent)去"猜"消息的 Schema 版本。每条消息明确携带 schema_version,让下游在做任何解析之前先做版本检查。这就像 TCP 握手——先确认双方"说同一种语言",再开始传输数据。

5. 让消息可验证:内容哈希、Agent 身份、完整性证明

在多 Agent Pipeline 中,有一个场景会让你抓狂:Agent B 收到了一个错误的结果。是 Agent A 发错了,还是中间某个环节(消息队列、代理层、序列化/反序列化过程)改了消息内容?没有内置于消息的验证机制,你根本回答不了这个问题——你只能靠"相信基础设施是可靠的"来安慰自己。

生产环境不能靠"相信"。你需要让每条消息自带完整性证明——下游 Agent 收到消息后,能独立验证"这条消息和上游发出来的时候一模一样"。

5.1 content_hash:内容指纹

content_hash 是数据层(Layer 1)的 SHA-256 哈希值。发送方 Agent 在构造消息时对 data 字段做哈希,将摘要写入验证层;接收方 Agent 收到消息后,对收到的 data 字段重新计算哈希,与验证层中的值比对——匹配则证明内容未被篡改。

# 发送方:计算并嵌入哈希
import hashlib, json

def sign_message(message: dict) -> dict:
    """为消息的数据层计算 content_hash 并写入验证层。"""
    data_bytes = json.dumps(message["data"], sort_keys=True).encode("utf-8")
    message["verification"] = message.get("verification", {})
    message["verification"]["content_hash"] = hashlib.sha256(data_bytes).hexdigest()
    message["verification"]["content_hash_algorithm"] = "sha256"
    return message

# 接收方:验证哈希
def verify_content_integrity(message: dict) -> bool:
    """重新计算数据层哈希并与消息中的 content_hash 比对。"""
    if "verification" not in message or "content_hash" not in message["verification"]:
        return False  # 没有哈希值 = 无法验证
    data_bytes = json.dumps(message["data"], sort_keys=True).encode("utf-8")
    computed = hashlib.sha256(data_bytes).hexdigest()
    return computed == message["verification"]["content_hash"]

关键细节:哈希计算时必须用 sort_keys=True,因为 JSON 对象中键的顺序在不同序列化实现中可能不同——{"a":1,"b":2}{"b":2,"a":1} 语义等价但字节不同,不排序会导致假阳性不匹配。

5.2 sender_agent_id + agent_version:身份溯源

哈希只能证明内容没被改,但不能证明"这条消息确实来自声称的那个 Agent"。元数据层中的 sender_agent_idsender_agent_version 提供了身份溯源:

"metadata": {
  "sender_agent_id": "risk-review-agent",
  "sender_agent_version": "2.1.0",
  "receiver_agent_id": "payment-agent",
  "timestamp": "2024-12-05T14:23:11.482Z"
}

这两个字段在排障时价值巨大。当你要回答"是谁用哪个版本产出了这条消息?"时,不需要去日志系统里翻——答案就在消息本身。对于跨团队的多 Agent 系统,sender_agent_id 建议使用全局注册的标识符(如 team-risk.reviewer-v2),而不是本地代号。

5.3 signature:跨信任域验证

当消息跨越安全域边界时(例如从生产网络发到外部审计系统),仅靠 content_hash 不够——恶意中间人可以同时修改数据和哈希值。此时需要 HMAC 签名:

import hmac

def sign_with_hmac(message: dict, shared_secret: str) -> dict:
    """为消息添加 HMAC-SHA256 签名。"""
    data_bytes = json.dumps(message["data"], sort_keys=True).encode("utf-8")
    msg_id = message["metadata"]["message_id"].encode("utf-8")
    # 签名涵盖 message_id + 数据层,防止重放攻击
    signature = hmac.new(
        shared_secret.encode("utf-8"),
        msg_id + data_bytes,
        hashlib.sha256
    ).hexdigest()
    message["verification"]["agent_signature"] = signature
    return message

什么时候需要签名?同一主机内 Agent 通信:不需要签名(content_hash 足够);跨主机但同一信任域:content_hash + TLS 传输层加密即可;跨信任域(如 SaaS 平台中不同租户的 Agent):需要 HMAC 或非对称签名。

5.4 proof_chain:多跳追溯链

单跳场景下 content_hashsender_agent_id 够了,但当一个任务经过 N 个 Agent 依次处理时,你需要知道整条链路——每个 Agent 做了什么、产出了什么。这就是 proof_chain 的用武之地。

proof_chain 是一个有序列表,记录消息经过的每一个 Agent:

"verification": {
  "schema_version": "1.2.0",
  "content_hash": "sha256:a1b2c3...",
  "proof_chain": [
    {
      "agent_id": "research-agent",
      "agent_version": "1.5.0",
      "action": "initial_search",
      "content_hash": "sha256:f1e2d3...",
      "timestamp": "2024-12-05T14:20:00Z"
    },
    {
      "agent_id": "writer-agent",
      "agent_version": "3.2.1",
      "action": "draft_section",
      "content_hash": "sha256:a1b2c3...",
      "timestamp": "2024-12-05T14:22:30Z"
    }
  ]
}

每个 downstream Agent 在修改消息内容后,将自己的操作记录追加到 proof_chain 中:

def append_to_proof_chain(message: dict, agent_id: str,
                          agent_version: str, action: str) -> dict:
    """Agent 在修改消息后,将自己的操作追加到 proof_chain。"""
    chain = message.setdefault("verification", {}).setdefault("proof_chain", [])
    data_bytes = json.dumps(message["data"], sort_keys=True).encode("utf-8")
    chain.append({
        "agent_id": agent_id,
        "agent_version": agent_version,
        "action": action,
        "content_hash": hashlib.sha256(data_bytes).hexdigest(),
        "timestamp": datetime.utcnow().isoformat() + "Z"
    })
    # 更新消息级 content_hash 为最新数据层哈希
    message["verification"]["content_hash"] = chain[-1]["content_hash"]
    return message

5.5 端到端验证示例

用一个三 Agent 链路来串联所有验证机制:Research Agent 搜索并产出原始资料 → Writer Agent 基于资料撰写初稿 → Reviewer Agent 审核内容。每一步都追加 proof_chain,任何一步的篡改都能被检测出来。

完整消息示例(经过两个 Agent 后的状态):

{
  "message_id": "msg-8f3a1b2c",
  "message_type": "task_handoff",
  "schema_version": "1.2.0",
  "data": {
    "content": {
      "topic": "Agent 消息 Schema 设计",
      "draft": "多 Agent 系统中,消息是...",
      "sources": ["https://example.com/a2a-spec", "https://example.com/mcp"]
    },
    "results": null,
    "artifacts": []
  },
  "metadata": {
    "task_id": "task-content-gen-001",
    "correlation_id": "corr-sprint-42",
    "sender_agent_id": "writer-agent",
    "sender_agent_version": "3.2.1",
    "receiver_agent_id": "reviewer-agent",
    "timestamp": "2024-12-05T14:22:30Z"
  },
  "verification": {
    "schema_version": "1.2.0",
    "content_hash": "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
    "content_hash_algorithm": "sha256",
    "proof_chain": [
      {
        "agent_id": "research-agent",
        "agent_version": "1.5.0",
        "action": "initial_search",
        "content_hash": "sha256:d1d2d3...",
        "timestamp": "2024-12-05T14:20:00Z"
      },
      {
        "agent_id": "writer-agent",
        "agent_version": "3.2.1",
        "action": "draft_section",
        "content_hash": "sha256:e3b0c4...",
        "timestamp": "2024-12-05T14:22:30Z"
      }
    ]
  },
  "routing": {
    "priority": "normal",
    "max_retries": 2
  }
}

Reviewer Agent 收到这条消息后,可以执行完整的验证链:① 重新计算 data 的哈希 → 与 content_hash 比对;② 遍历 proof_chain,确认每个 Agent 的操作内容哈希与前一个 Agent 的产出一致;③ 检查 chain 中每个 Agent 的 timestamp 是否单调递增(防止时间戳回退攻击)。

def verify_message(message: dict) -> tuple[bool, str]:
    """对消息执行完整验证:内容完整性 + proof_chain 连续性。"""
    # Step 1: 内容完整性
    if not verify_content_integrity(message):
        return False, "Content hash mismatch — data may have been tampered with"

    # Step 2: proof_chain 连续性
    chain = message.get("verification", {}).get("proof_chain", [])
    for i in range(1, len(chain)):
        prev_hash = chain[i-1]["content_hash"]
        # 在实际系统中,你需要存储每个 Agent 操作前的数据哈希
        # 此处简化为检查链中每个条目非空
        if not chain[i].get("content_hash"):
            return False, f"Proof chain entry {i} missing content_hash"

    # Step 3: timestamp 单调性
    for i in range(1, len(chain)):
        if chain[i]["timestamp"] <= chain[i-1]["timestamp"]:
            return False, f"Timestamp regression at entry {i}"

    return True, "All verifications passed"
📌 防篡改 + 审计合规:proof_chain 不仅是技术验证工具,更是审计合规的基础设施。在金融、医疗等受监管行业,审计人员需要回答"这个决策是谁做的、基于什么数据、经过了哪些环节"——proof_chain 让这些问题有据可查,不需要翻日志、不需要看代码,只看消息本身就能还原全链路。

6. 实战:为三 Agent 系统设计完整消息 Schema

前面的理论都懂了——但能不能直接给我一个可运行的、端到端的三 Agent 系统 Schema 实现?能。下面以一个中文开发者友好的场景——内容审核链路——来展示从 Schema 定义到消息流转的完整过程。

6.1 场景:内容审核三 Agent 链路

假设你在做一个内容平台的内容审核系统,有三类消息经过三个 Agent:

Agent职责产出的消息类型
ContentFetcher从数据库/API 抓取待审核内容FetchResult
ContentAuditor对内容进行审核(违规检测 + 置信度评分)AuditResult
ReportAgent汇总审核结果,生成审核报告ReportMessage

消息流向:ContentFetcher → ContentAuditor → ReportAgent。下面定义三个消息类型的完整 Schema 和 Pydantic 模型。

6.2 FetchResult — ContentFetcher 产出

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "FetchResult",
  "type": "object",
  "required": ["message_type", "trace_id", "items", "fetch_metadata"],
  "properties": {
    "message_type": {"const": "fetch_result"},
    "trace_id": {
      "type": "string",
      "description": "贯穿整个审核链路的追踪 ID"
    },
    "items": {
      "type": "array",
      "items": {
        "type": "object",
        "required": ["content_id", "content_type", "raw_content"],
        "properties": {
          "content_id": {"type": "string"},
          "content_type": {"type": "string", "enum": ["article", "comment", "image", "video"]},
          "raw_content": {"type": "string"},
          "author_id": {"type": "string"},
          "created_at": {"type": "string", "format": "date-time"},
          "source": {"type": "string", "description": "内容来源,如 'db:posts'、'api:comments'"}
        }
      }
    },
    "fetch_metadata": {
      "type": "object",
      "properties": {
        "total_fetched": {"type": "integer"},
        "fetch_duration_ms": {"type": "integer"},
        "fetch_strategy": {"type": "string", "enum": ["batch", "stream", "on_demand"]}
      }
    }
  }
}

6.3 AuditResult — ContentAuditor 产出

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "AuditResult",
  "type": "object",
  "required": ["message_type", "trace_id", "audit_results"],
  "properties": {
    "message_type": {"const": "audit_result"},
    "trace_id": {
      "type": "string",
      "description": "与上游 FetchResult 相同的 trace_id"
    },
    "audit_results": {
      "type": "array",
      "items": {
        "type": "object",
        "required": ["content_id", "verdict", "confidence"],
        "properties": {
          "content_id": {"type": "string"},
          "verdict": {
            "type": "string",
            "enum": ["safe", "unsafe", "needs_review", "error"]
          },
          "confidence": {"type": "number", "minimum": 0, "maximum": 1},
          "violations": {
            "type": "array",
            "items": {
              "type": "object",
              "properties": {
                "rule_id": {"type": "string"},
                "rule_name": {"type": "string"},
                "severity": {"type": "string", "enum": ["low", "medium", "high", "critical"]},
                "matched_text": {"type": "string"},
                "explanation": {"type": "string"}
              }
            }
          },
          "auditor_version": {"type": "string", "description": "审核模型的版本号"},
          "audited_at": {"type": "string", "format": "date-time"}
        }
      }
    },
    "summary": {
      "type": "object",
      "properties": {
        "total_audited": {"type": "integer"},
        "safe_count": {"type": "integer"},
        "unsafe_count": {"type": "integer"},
        "needs_review_count": {"type": "integer"},
        "audit_duration_ms": {"type": "integer"}
      }
    }
  }
}

6.4 ReportMessage — ReportAgent 产出

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "ReportMessage",
  "type": "object",
  "required": ["message_type", "trace_id", "report"],
  "properties": {
    "message_type": {"const": "report_message"},
    "trace_id": {
      "type": "string",
      "description": "与上游相同的 trace_id——全链路追踪"
    },
    "report": {
      "type": "object",
      "required": ["report_id", "generated_at", "overall_verdict", "details"],
      "properties": {
        "report_id": {"type": "string"},
        "generated_at": {"type": "string", "format": "date-time"},
        "overall_verdict": {
          "type": "string",
          "enum": ["all_safe", "has_violations", "requires_manual_review"]
        },
        "details": {
          "type": "array",
          "items": {
            "type": "object",
            "properties": {
              "content_id": {"type": "string"},
              "final_decision": {"type": "string", "enum": ["approved", "rejected", "flagged"]},
              "action_taken": {"type": "string", "enum": ["publish", "hide", "delete", "warn_user"]},
              "human_review_required": {"type": "boolean"},
              "notes": {"type": "string"}
            }
          }
        }
      }
    },
    "pipeline_metadata": {
      "type": "object",
      "description": "整个审核链路的执行摘要",
      "properties": {
        "pipeline_started_at": {"type": "string", "format": "date-time"},
        "pipeline_completed_at": {"type": "string", "format": "date-time"},
        "total_duration_ms": {"type": "integer"},
        "agents_involved": {
          "type": "array",
          "items": {"type": "string"}
        }
      }
    }
  }
}

6.5 Pydantic 模型实现(可直接运行)

from pydantic import BaseModel, Field
from typing import Optional, List
from enum import Enum
from datetime import datetime

# ── 枚举定义 ──
class ContentType(str, Enum):
    ARTICLE = "article"
    COMMENT = "comment"
    IMAGE = "image"
    VIDEO = "video"

class Verdict(str, Enum):
    SAFE = "safe"
    UNSAFE = "unsafe"
    NEEDS_REVIEW = "needs_review"
    ERROR = "error"

class FinalDecision(str, Enum):
    APPROVED = "approved"
    REJECTED = "rejected"
    FLAGGED = "flagged"

# ── FetchResult:ContentFetcher 产出 ──
class FetchItem(BaseModel):
    content_id: str
    content_type: ContentType
    raw_content: str
    author_id: Optional[str] = None
    created_at: Optional[datetime] = None
    source: Optional[str] = None

class FetchMetadata(BaseModel):
    total_fetched: int
    fetch_duration_ms: int
    fetch_strategy: str = "batch"

class FetchResult(BaseModel):
    message_type: str = Field(default="fetch_result", frozen=True)
    trace_id: str
    items: List[FetchItem]
    fetch_metadata: FetchMetadata

# ── AuditResult:ContentAuditor 产出 ──
class Violation(BaseModel):
    rule_id: str
    rule_name: str
    severity: str  # low, medium, high, critical
    matched_text: Optional[str] = None
    explanation: Optional[str] = None

class AuditItem(BaseModel):
    content_id: str
    verdict: Verdict
    confidence: float = Field(ge=0, le=1)
    violations: List[Violation] = []
    auditor_version: Optional[str] = None
    audited_at: Optional[datetime] = None

class AuditSummary(BaseModel):
    total_audited: int
    safe_count: int
    unsafe_count: int
    needs_review_count: int
    audit_duration_ms: int

class AuditResult(BaseModel):
    message_type: str = Field(default="audit_result", frozen=True)
    trace_id: str
    audit_results: List[AuditItem]
    summary: Optional[AuditSummary] = None

# ── ReportMessage:ReportAgent 产出 ──
class ReportDetail(BaseModel):
    content_id: str
    final_decision: FinalDecision
    action_taken: str
    human_review_required: bool = False
    notes: Optional[str] = None

class Report(BaseModel):
    report_id: str
    generated_at: datetime
    overall_verdict: str
    details: List[ReportDetail]

class PipelineMetadata(BaseModel):
    pipeline_started_at: datetime
    pipeline_completed_at: datetime
    total_duration_ms: int
    agents_involved: List[str]

class ReportMessage(BaseModel):
    message_type: str = Field(default="report_message", frozen=True)
    trace_id: str
    report: Report
    pipeline_metadata: Optional[PipelineMetadata] = None

6.6 trace_id 传播与 proof_chain 累积

三个 Agent 之间的消息流转遵循一个简单的约定:trace_id 从 FetchResult 一路传播到 ReportMessage,proof_chain 在每一步持续累积

from uuid import uuid4
import hashlib, json
from datetime import datetime, timezone

# ── 消息序列化/反序列化工具 ──
def serialize_message(model: BaseModel) -> dict:
    """Pydantic 模型 → JSON 字典(用于传输)。"""
    return json.loads(model.model_dump_json())

def deserialize_fetch_result(data: dict) -> FetchResult:
    return FetchResult(**data)

def deserialize_audit_result(data: dict) -> AuditResult:
    return AuditResult(**data)

def deserialize_report_message(data: dict) -> ReportMessage:
    return ReportMessage(**data)

# ── 统一的 Agent 消息包装器 ──
def wrap_agent_message(payload: BaseModel, sender_id: str,
                       sender_version: str, receiver_id: str,
                       task_id: str, proof_chain: list = None) -> dict:
    """将业务载荷包装为带四层结构的 Agent 消息。"""
    now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    payload_dict = json.loads(payload.model_dump_json())

    # 数据层
    data_layer = {
        "content": payload_dict,
        "parameters": {},
        "results": None,
        "artifacts": []
    }

    # 计算 content_hash
    data_bytes = json.dumps(data_layer, sort_keys=True).encode("utf-8")
    content_hash = hashlib.sha256(data_bytes).hexdigest()

    # 构建 proof_chain
    chain = proof_chain or []
    chain.append({
        "agent_id": sender_id,
        "agent_version": sender_version,
        "action": payload.message_type,
        "content_hash": f"sha256:{content_hash}",
        "timestamp": now
    })

    return {
        "message_id": f"msg-{uuid4().hex[:8]}",
        "message_type": payload.message_type,
        "schema_version": "1.0.0",
        "data": data_layer,
        "metadata": {
            "task_id": task_id,
            "sender_agent_id": sender_id,
            "sender_agent_version": sender_version,
            "receiver_agent_id": receiver_id,
            "timestamp": now
        },
        "verification": {
            "schema_version": "1.0.0",
            "content_hash": f"sha256:{content_hash}",
            "content_hash_algorithm": "sha256",
            "proof_chain": chain
        },
        "routing": {
            "priority": "normal",
            "max_retries": 3
        }
    }

# ── 端到端示例 ──
def run_content_audit_pipeline():
    """从 ContentFetcher → ContentAuditor → ReportAgent 的完整演示。"""
    task_id = f"task-{uuid4().hex[:8]}"
    trace_id = f"trace-{uuid4().hex[:8]}"
    chain = []

    # Step 1: ContentFetcher 产出 FetchResult
    fetch_result = FetchResult(
        trace_id=trace_id,
        items=[
            FetchItem(content_id="c-001", content_type=ContentType.ARTICLE,
                      raw_content="这是一篇关于金融理财的文章...",
                      author_id="u_1001", source="db:posts"),
            FetchItem(content_id="c-002", content_type=ContentType.COMMENT,
                      raw_content="这个产品真的很好用,推荐!",
                      author_id="u_2002", source="api:comments"),
        ],
        fetch_metadata=FetchMetadata(total_fetched=2, fetch_duration_ms=45)
    )

    msg1 = wrap_agent_message(fetch_result, "content-fetcher", "1.0.0",
                              "content-auditor", task_id, chain)
    chain = msg1["verification"]["proof_chain"]  # 传递 proof_chain

    print(f"[ContentFetcher] 产出 {len(fetch_result.items)} 条待审内容,trace_id={trace_id}")
    print(f"  proof_chain 长度: {len(chain)}")

    # Step 2: ContentAuditor 消费 FetchResult,产出 AuditResult
    audit_items = []
    for item in fetch_result.items:
        verdict = Verdict.SAFE if "推荐" in item.raw_content else Verdict.NEEDS_REVIEW
        audit_items.append(AuditItem(
            content_id=item.content_id,
            verdict=verdict,
            confidence=0.92,
            violations=[] if verdict == Verdict.SAFE else [
                Violation(rule_id="R-001", rule_name="敏感词检测",
                          severity="medium", matched_text="金融理财")
            ],
            auditor_version="auditor-v3.1",
            audited_at=datetime.now(timezone.utc)
        ))

    audit_result = AuditResult(
        trace_id=trace_id,
        audit_results=audit_items,
        summary=AuditSummary(
            total_audited=2, safe_count=1, unsafe_count=0,
            needs_review_count=1, audit_duration_ms=230
        )
    )

    msg2 = wrap_agent_message(audit_result, "content-auditor", "3.1.0",
                              "report-agent", task_id, chain)
    chain = msg2["verification"]["proof_chain"]

    print(f"[ContentAuditor] 审核完成: 安全={audit_result.summary.safe_count}, "
          f"需人工={audit_result.summary.needs_review_count}")
    print(f"  proof_chain 长度: {len(chain)}")

    # Step 3: ReportAgent 消费 AuditResult,产出 ReportMessage
    details = []
    for item in audit_result.audit_results:
        if item.verdict == Verdict.SAFE:
            decision, action = FinalDecision.APPROVED, "publish"
        elif item.verdict == Verdict.UNSAFE:
            decision, action = FinalDecision.REJECTED, "delete"
        else:
            decision, action = FinalDecision.FLAGGED, "warn_user"
        details.append(ReportDetail(
            content_id=item.content_id, final_decision=decision,
            action_taken=action,
            human_review_required=(item.verdict == Verdict.NEEDS_REVIEW)
        ))

    report_msg = ReportMessage(
        trace_id=trace_id,
        report=Report(
            report_id=f"rpt-{uuid4().hex[:8]}",
            generated_at=datetime.now(timezone.utc),
            overall_verdict="requires_manual_review",
            details=details
        ),
        pipeline_metadata=PipelineMetadata(
            pipeline_started_at=datetime.now(timezone.utc),
            pipeline_completed_at=datetime.now(timezone.utc),
            total_duration_ms=350,
            agents_involved=["content-fetcher", "content-auditor", "report-agent"]
        )
    )

    msg3 = wrap_agent_message(report_msg, "report-agent", "2.0.0",
                              "orchestrator", task_id, chain)

    print(f"[ReportAgent] 报告生成: {report_msg.report.overall_verdict}")
    print(f"  proof_chain 最终长度: {len(msg3['verification']['proof_chain'])}")

    return msg3

# 运行
if __name__ == "__main__":
    final_message = run_content_audit_pipeline()

运行上面的代码,你会看到三个 Agent 依次产出消息,trace_id 贯穿始终,proof_chain 从 1 条记录增长到 3 条——任何一步的篡改都能被验证出来。

📌 关键设计决策:在这个三 Agent 系统中,trace_id 由 ContentFetcher 生成,后续 Agent 通过解析上游消息的 trace_id 字段来衔接链路。proof_chain 通过 wrap_agent_message 函数的 proof_chain 参数在 Agent 之间传递和累积。这两个机制让分布式审计变得简单——你只需要拿到最后一条消息,就能还原整条链路。

常见问题(FAQ):消息 Schema 设计的 6 个高频问题

Q1: 消息 Schema 设计和 REST API 设计有什么本质区别?

REST API 是请求-响应模式,有明确的 Client/Server 角色——客户端发请求,服务端返回响应,一次交互结束。Agent 消息是多跳异步的——一条消息可能经过 3 个 Agent 的传递和修改,每个 Agent 既是消费者也是生产者。这带来两个本质差异:

  • 溯源需求不同:Agent 消息需要内嵌溯源信息(proof_chain、sender_agent_id、agent_version),因为"谁在哪个环节改了什么"是排障的核心问题。REST API 的 trace 依赖外部系统(如 OpenTelemetry、分布式追踪),不要求消息体自带溯源。
  • 版本耦合面不同:REST API 只有 Client/Server 两个版本点。Agent 消息的 N 个 Agent 有 N 个版本点——Schema 版本管理必须考虑"某个 Agent 升级后,下游 N-1 个 Agent 能否继续工作"。
Q2: 应该用 JSON 还是 Protobuf?

取决于部署场景:

  • 内部 Agent 系统(同一进程/同一主机):用 JSON。可读性优先于性能——调试时需要直接看消息内容,不需要 protoc 解码。LLM 原生处理 JSON 也更自然。
  • 跨主机/跨语言 Agent 系统:考虑 Protobuf + JSON 双序列化方案。内部 Agent 通信走 Proto(性能好、带宽省),API 边界/对外暴露时转 JSON(可读性好、工具链成熟)。
  • 参考 Google A2A 协议:选择了 Proto-first + JSON 兼容的设计——核心传输用 Proto,同时提供规范的 JSON 映射规则,两种格式可以无损互转。
Q3: 消息 Schema 和 MCP 协议是什么关系?

两者是互补关系,不是替代关系

  • MCP(Model Context Protocol)定义了 Agent 与外部工具的通信格式——基于 JSON-RPC 2.0,关注工具发现、调用和结果返回。
  • 本文的消息 Schema定义了 Agent 与Agent 之间的通信格式——关注任务交接、审批、状态同步和错误升级。
  • MCP 的 ToolResult 可以封装成本文定义的 ToolResult 消息类型的一个子集——MCP 管 Agent↔工具,本文管 Agent↔Agent。在实际系统中,两者需要协同工作:Agent A 通过 MCP 调用工具后,用本文的 Schema 把加工后的结果传递给 Agent B。
Q4: schema_version 从 1.0 升级到 2.0 时,正在运行的任务怎么处理?

三种策略,按推荐度排序:

  1. 追加式(推荐起步):新任务用 v2 Schema,老任务继续用 v1 跑完。无中断,但同一时期有两套消息格式在线上。
  2. 迁移式:给 v1 消息自动添加 v2 的默认字段——比如 v2 新增了 priority 字段,迁移器给所有 v1 消息补上 "priority": "normal"。成本低但需要维护迁移逻辑。
  3. 双写式(零停机):两个版本的 Handler 同时运行,新 Agent 消费 v2,老 Agent 消费 v1,逐步切流量。成本最高但最安全。

生产环境推荐策略 1 + 3 组合:先双写(新旧 Handler 共存),观察一段时间后,让老任务自然结束,新任务全走 v2。

Q5: 每条消息都需要 content_hash 和 signature 吗?

不是。按消息场景分级要求:

  • 低风险场景(Agent 内部日志消息、心跳 ping):可以省略 content_hash——出错不影响业务,性能优先。
  • 中风险场景(任务交接、审批请求、工具结果):必须包含 content_hash——这些消息承载业务决策,完整性必须可验证。
  • 高风险场景(跨安全域的消息、金融交易指令):还需要 signature——哈希只能防无意的篡改,签名才能防有意的伪造。

建议在 Schema 中将 verification 字段设为可选,通过消息类型的 risk_level 决定是否强制要求。不要让低风险消息背着高风险的验证成本。

Q6: 如何避免 Schema 设计过度?

最简 Schema 开始——只定义 data layer 的必填字段。用 YAGNI 原则(You Aren't Gonna Need It):

  • 调试困难时 → 加 correlation_id
  • Agent 数量增长到 ≥3 时 → 加 sender_agent_idagent_version
  • 安全审计需求出现时 → 加 content_hashproof_chain
  • 跨主机通信时 → 加 signature

不要预先设计"可能有用"的字段。每加一个字段都有成本——序列化开销、维护负担、文档更新、下游 Agent 的适配。让真实的痛苦驱动 Schema 的演化,而不是提前的猜测。

消息 Schema 是多 Agent 系统通信的骨架——它定义了"怎么传"。但要构建完整的 Agent 系统,你还需要理解"传什么"、"怎么编排"和"出错了怎么办"。以下文章构成 Agent 通信与协议的完整知识体系:

如果你还没有读过任何 Agent 工程文章,建议从 什么是 AI Agent 开始。