Agent 上下文协议设计:如何在工具、记忆与任务之间传递状态
30秒要点
- 核心问题:Agent 系统在工具调用、记忆读写、任务切换时,如果没有统一的上下文传递协议,会导致提示词膨胀、密钥泄露、结果路由混乱和状态丢失。
- 解决方案:四层上下文架构——消息总线、工具上下文、记忆上下文、任务上下文——每一层有明确的职责和数据结构。
- 关键实现:ToolResultEnvelope(工具结果修剪+脱敏+路由)、MemoryContextGate(记忆读写门控+命名空间隔离)。
- 读完能做什么:为你的 Agent 系统设计一套框架无关的上下文传递协议,避免常见的安全和成本陷阱。
1. 为什么没有上下文协议,Agent 系统一定会崩
先讲一个真实发生过的事故。一个团队构建了一个代码审查 Agent,用于自动扫描 GitHub 仓库的安全漏洞。Agent 通过 GitHub API 工具拉取代码,然后将结果送入 LLM 分析。一切看起来正常——直到有人在团队 Slack 频道里看到了一条完整的 GitHub Personal Access Token。
问题出在哪?Agent 调用了 list_repos 工具,工具返回的原始 JSON 里包含了认证用的 token 字段(因为 API 响应的 Authorization header 被日志中间件原样记录了)。开发者的做法是把工具返回值直接 append 到 messages 列表里,没有任何过滤:
# 常见的天真写法——直接把工具结果丢进上下文
result = execute_tool(tool_name, arguments)
messages.append({
"role": "tool",
"tool_call_id": call_id,
"content": json.dumps(result) # ← 包含 token、密码、所有原始字段
})
这个 token 进入了 LLM 的上下文窗口,然后 LLM 在生成分析报告时把它「复述」了出来。报告被自动推送到 Slack——于是 token 就公开了。
这只是上下文管理不善的冰山一角。更系统性地看,缺少上下文协议会导致五类问题同时爆发:
① 提示词膨胀(Prompt Bloat):每次 LLM 调用都把完整对话历史 + 原始工具输出 + 记忆搜索结果全塞进去。一个三步推理的任务,第二次调用可能就带着 15KB 的工具输出了,token 成本呈几何级数增长。
② 密钥泄露(Secret Leakage):工具返回的 API key、token、密码、内网地址直接进入 LLM 上下文,没有任何脱敏。一旦 LLM 在输出中复述这些信息——它们就进入了日志、通知、共享文档。
③ 工具结果路由混乱(Tool Result Confusion):并行调用 3 个工具——search_code、get_file、run_tests——三个结果同时返回,但你把它们全部无差别地追加到消息列表里。LLM 需要自己「猜」哪个结果对应哪个调用,错误率随并发数急剧上升。
④ 记忆污染(Memory Contamination):上一个任务的中间推理过程被错误地持久化到了共享记忆库,下一个不相关的任务读到这些记忆后产生了幻觉。
⑤ 任务间状态丢失(Inter-Task State Loss):多步工作流中——第一步拉取源码、第二步静态分析、第三步生成修复建议——第一步生成的产物(文件列表、依赖图)没有规范化的传递机制,到第二步时已经丢失了,只能重新计算。
这些问题有一个共同的根因:Agent 的上下文不是一个被动的数据容器,而是一个需要显式设计、结构化管理的协议层。把上下文当作「消息列表随便 append」的想法,在单步推理时还能工作,一旦进入生产环境的多工具、多任务、多记忆场景,就会系统性崩溃。
2. 上下文协议的四层架构模型
开发者在 Agent 的不同组件之间传递状态时,面临一个根本性的困惑:工具输出放哪里?记忆读哪里?任务状态怎么传?没有一个统一的心智模型,每个人都在根据自己的直觉瞎搞——有的把一切都序列化成 JSON 塞进 system prompt,有的在全局变量里到处写临时状态。
经过对多个生产级 Agent 系统的分析,我们提炼出一个四层架构模型。每一层有明确的职责边界和数据契约:
Layer 1 — 消息总线(Message Bus):最底层,原始的 LLM 对话轮次存储层。包含 system prompt、user messages、assistant responses、tool call requests 和 tool call results。所有上下文最终都流向这里,但不应该直接操作这一层——应该通过上层的结构化封装来注入内容。这一层面向 LLM API 的消息格式,是上下文协议的「物理层」。
Layer 2 — 工具上下文(Tool Context):工具输入/输出的结构化封装层。核心职责:结果修剪(截断过长输出)、密钥脱敏(正则检测并替换敏感字段)、路由匹配(通过 tool_call_id 将结果对应到正确的推理步骤)。每一个工具结果在注入消息总线之前,必须先经过这层处理。
Layer 3 — 记忆上下文(Memory Context):记忆系统的读写门控层。不是所有记忆都该注入当前的推理上下文——需要相关性过滤(embedding 相似度 / BM25 评分);不是所有 Agent 输出都该被记住——需要写入门控来防止中间推理过程污染持久化记忆。同时提供命名空间隔离(user → session → task 三层)。
Layer 4 — 任务上下文(Task Context):跨步骤状态传递层。多步工作流的每一步可能产生大量中间产物——文件列表、分析结果、生成的代码。任务上下文的做法是传递产物引用而非全量复制——上一个任务只传递一个指向结果的指针(文件路径、S3 key、数据库 ID)加上结构化摘要,下一个任务按需读取。
四层之间的数据流关系如下:
┌──────────────────┐
│ Task Context │ ← 跨步骤状态信封、产物指针
│ (Layer 4) │
└────────┬─────────┘
│ 读/写状态引用
▼
┌──────────────────┐
│ Memory Context │ ← 读/写门控、命名空间隔离
│ (Layer 3) │
└────────┬─────────┘
│ 注入/提取记忆块
▼
┌──────────────────┐
│ Tool Context │ ← 修剪、脱敏、路由匹配
│ (Layer 2) │
└────────┬─────────┘
│ 安全的上下文片段
▼
┌──────────────────┐
│ Message Bus │ ← LLM 对话轮次(最终汇入)
│ (Layer 1) │
└──────────────────┘
对应的核心数据结构定义:
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
# Layer 1: Message Bus
@dataclass
class Message:
role: str # "system" | "user" | "assistant" | "tool"
content: str
tool_call_id: Optional[str] = None
tool_calls: Optional[list] = None
# Layer 2: Tool Context
@dataclass
class ToolCallRequest:
id: str
name: str
arguments: dict
@dataclass
class ToolResultEnvelope:
tool_call_id: str
raw_result: Any
trimmed_result: Optional[str] = None
redacted: bool = False
redacted_fields: list = field(default_factory=list)
# Layer 3: Memory Context
@dataclass
class MemoryEntry:
namespace: str # "user_42:session_a:task_1"
content: str
embedding: Optional[list] = None
relevance_score: float = 0.0
timestamp: datetime = field(default_factory=datetime.now)
# Layer 4: Task Context
@dataclass
class TaskStateEnvelope:
task_id: str
parent_task_id: Optional[str] = None
artifact_refs: dict = field(default_factory=dict)
summary: str = ""
metadata: dict = field(default_factory=dict)
这个四层模型的关键设计原则是:数据自顶向下过滤,自底向上汇聚。上层决定什么内容可以进入下层,下层只管理最终的序列化格式。每一层都可以独立测试、独立演进,不会因为改了工具脱敏规则就破坏记忆读写。
3. 工具结果的路由、修剪与脱敏
一个推理步骤可能触发多个工具并行调用。比如 Agent 同时调用 search_files、read_logs 和 check_api_status。三个结果返回后,有两个关键挑战:(1)必须把每个结果正确路由回对应的 tool_call_id;(2)原始结果可能很长(100KB 的日志文件)或包含敏感数据(API key 藏在响应头里),不能原样注入 LLM 上下文。
解决方案是 ToolResultEnvelope——一个在工具结果进入消息总线之前的强制性过滤层:
import re
import json
from typing import Any, Optional, Literal
class ToolResultEnvelope:
"""工具结果的安全封装:修剪、脱敏、路由匹配。"""
DEFAULT_SECRET_PATTERNS = [
(r'(?:api[_-]?key|apikey|api_token|access_token|secret)["\s:=]+["\x27]?([A-Za-z0-9_\-\.]{20,})', 'API_KEY'),
(r'(?:password|passwd|pwd)["\s:=]+["\x27]?([^"\x27&\s]{4,})', 'PASSWORD'),
(r'(?:token|auth)["\s:=]+["\x27]?([A-Za-z0-9_\-\.]{16,})', 'TOKEN'),
(r'(?:private[_-]?key|privkey)["\s:=]+["\x27]?([A-Za-z0-9+/=]{32,})', 'PRIVATE_KEY'),
(r'(?:bearer|basic)\s+([A-Za-z0-9_\-\.=]{16,})', 'AUTH_HEADER'),
(r'ghp_[A-Za-z0-9]{36}', 'GITHUB_TOKEN'),
(r'glpat-[A-Za-z0-9\-]{20,}', 'GITLAB_TOKEN'),
(r'sk-[A-Za-z0-9]{32,}', 'OPENAI_KEY'),
]
def __init__(self, tool_call_id: str, raw_result: Any, max_tokens: int = 2000):
self.tool_call_id = tool_call_id
self.raw_result = raw_result
self.max_tokens = max_tokens
self._trimmed: Optional[str] = None
self._redacted: bool = False
self._found_secrets: list = []
def trim(self, strategy: Literal["first_n", "field_whitelist", "summary"] = "first_n",
whitelist_fields: Optional[list] = None) -> 'ToolResultEnvelope':
"""修剪策略:
- first_n: 保留前 max_tokens 个 token(简单截断)
- field_whitelist: 只保留白名单中的字段
- summary: 对结果做结构化摘要
"""
text = self._serialize(self.raw_result)
if strategy == "first_n":
tokens = text.split()
self._trimmed = ' '.join(tokens[:self.max_tokens])
elif strategy == "field_whitelist" and whitelist_fields:
if isinstance(self.raw_result, dict):
filtered = {k: v for k, v in self.raw_result.items() if k in whitelist_fields}
self._trimmed = self._serialize(filtered)
else:
self._trimmed = text[:self.max_tokens * 4]
elif strategy == "summary":
summary_parts = [
f"type={type(self.raw_result).__name__}",
f"size={len(text)} chars"
]
if isinstance(self.raw_result, dict):
summary_parts.append(f"keys={list(self.raw_result.keys())[:10]}")
elif isinstance(self.raw_result, list):
summary_parts.append(f"count={len(self.raw_result)}")
if self.raw_result and isinstance(self.raw_result[0], dict):
summary_parts.append(f"item_keys={list(self.raw_result[0].keys())[:5]}")
self._trimmed = ' | '.join(summary_parts)
return self
def redact_secrets(self, patterns: Optional[list] = None) -> 'ToolResultEnvelope':
"""检测并替换敏感信息。"""
if patterns is None:
patterns = self.DEFAULT_SECRET_PATTERNS
text = self._trimmed or self._serialize(self.raw_result)
self._found_secrets = []
for pattern, label in patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
if matches:
for m in matches:
secret_val = m if isinstance(m, str) else m[0]
self._found_secrets.append({'label': label, 'value_preview': secret_val[:8] + '...'})
text = re.sub(pattern, f'[{label}:REDACTED]', text, flags=re.IGNORECASE)
self._redacted = len(self._found_secrets) > 0
self._trimmed = text
return self
def to_context_chunk(self) -> dict:
"""生成可直接注入 messages 列表的安全上下文片段。"""
content = self._trimmed or self._serialize(self.raw_result)
return {
"role": "tool",
"tool_call_id": self.tool_call_id,
"content": content
}
def _serialize(self, data: Any) -> str:
if isinstance(data, str):
return data
try:
return json.dumps(data, ensure_ascii=False, default=str)
except (TypeError, ValueError):
return str(data)
def _detect_secrets(self, text: str) -> list:
"""独立的安全扫描,不修改文本。"""
found = []
for pattern, label in self.DEFAULT_SECRET_PATTERNS:
if re.search(pattern, text, re.IGNORECASE):
found.append(label)
return found
使用示例——一次工具调用从原始结果到安全注入的完整流程:
# Agent 调用工具,获取原始结果
raw_result = execute_command("curl -H 'Authorization: Bearer sk-abc123...' https://api.service.com/logs")
# raw_result 可能包含 80KB 日志 + 响应头中的 token
envelope = ToolResultEnvelope(
tool_call_id="call_7a3f",
raw_result=raw_result,
max_tokens=1500
)
# 链式处理:先修剪,再脱敏,最后生成上下文片段
context_chunk = (envelope
.trim(strategy="first_n")
.redact_secrets()
.to_context_chunk())
# context_chunk 现在可以安全地 append 到 messages 列表
if envelope._found_secrets:
logging.warning(f"已脱敏 {len(envelope._found_secrets)} 个敏感字段: {[s['label'] for s in envelope._found_secrets]}")
ToolResultEnvelope 的设计有三个关键决策:(1)默认拒绝——不调用 redact_secrets() 就不过滤,但没有显式过滤的结果不应该进入消息总线;(2)链式 API——修剪和脱敏是可组合的,调用顺序自由但建议先修剪再脱敏(减少正则需要扫描的文本量);(3)可审计——_found_secrets 记录所有被检测到的敏感字段,可以集成到审计日志中。
4. 记忆访问模式与命名空间设计
Agent 和记忆的关系不是简单的「记住一切、搜索一切」。有三个核心决策点:Agent 什么时候该读取记忆?什么时候该写入记忆?如何防止任务 A 的记忆污染任务 B 的推理?如果把所有相关的记忆都注入上下文,token 预算会瞬间被撑爆——一个存了 500 条记忆的 Agent,每条 500 tokens,全部注入就是 250K tokens,直接超出大多数模型的上下文窗口。
MemoryContextGate 通过三个机制解决这些问题:读入门控(相关性过滤)、写入门控(内容验证)、命名空间隔离(多租户记忆分区)。
import hashlib
from typing import Any, Optional
from dataclasses import dataclass
@dataclass
class MemoryQueryResult:
content: str
relevance_score: float
namespace: str
memory_id: str
class MemoryContextGate:
"""记忆上下文的读写门控。"""
def __init__(self, relevance_threshold: float = 0.75, max_memory_tokens: int = 1000):
self.relevance_threshold = relevance_threshold
self.max_memory_tokens = max_memory_tokens
self._read_count = 0
self._write_count = 0
def should_read(self, query: str, namespace: str,
memories: list[MemoryQueryResult]) -> tuple[bool, float, list[MemoryQueryResult]]:
"""决定是否读取记忆,以及返回哪些记忆块。
Returns:
(should_read, best_score, filtered_memories)
"""
if not memories:
return False, 0.0, []
# 过滤:只保留相关性 >= 阈值的记忆
relevant = [m for m in memories if m.relevance_score >= self.relevance_threshold]
# 按分数降序排列
relevant.sort(key=lambda m: m.relevance_score, reverse=True)
# Token 预算控制:累计 token 数不超过上限
selected = []
token_count = 0
for mem in relevant:
mem_tokens = len(mem.content.split())
if token_count + mem_tokens <= self.max_memory_tokens:
selected.append(mem)
token_count += mem_tokens
else:
break
if not selected:
return False, 0.0, []
self._read_count += 1
best_score = selected[0].relevance_score
return True, best_score, selected
def should_write(self, content: str, namespace: str) -> bool:
"""决定是否将内容持久化到记忆库。
只写入明确验证过的输出,拒绝:
- 中间推理过程(过短或无结构)
- 纯工具日志(无语义价值)
- 重复内容(与已有记忆高度相似)
"""
# 基本验证
if len(content.strip()) < 50: # 太短,可能是碎片输出
return False
if len(content) > 50000: # 太长,可能是完整日志
return False
# 结构检查:应该包含有意义的语义内容
# 过滤纯数字/符号内容(大概率是日志输出)
alpha_ratio = sum(c.isalpha() for c in content) / max(len(content), 1)
if alpha_ratio < 0.3:
return False
self._write_count += 1
return True
def resolve_namespace(self, user_id: str, session_id: Optional[str] = None,
task_id: Optional[str] = None) -> str:
"""构建三层命名空间:user → session → task。
每一层可选。支持前缀查询(如查询 user_42:*:* 获取该用户所有记忆)。
"""
parts = [f"user_{user_id}"]
if session_id:
parts.append(f"session_{session_id}")
if task_id:
parts.append(f"task_{task_id}")
return ':'.join(parts)
def get_namespace_prefix(self, user_id: str, session_id: Optional[str] = None) -> str:
"""获取命名空间前缀,用于记忆库的范围查询。"""
parts = [f"user_{user_id}"]
if session_id:
parts.append(f"session_{session_id}")
return ':'.join(parts) + ':'
@property
def stats(self) -> dict:
return {
'reads': self._read_count,
'writes': self._write_count,
'read_threshold': self.relevance_threshold,
'max_memory_tokens': self.max_memory_tokens
}
三个设计决策的深入说明:
读入门控——相关性不是二分的,是连续的:不是简单地「相关就注入,不相关就不注入」,而是把相关性作为一个连续值,结合 token 预算做多路召回。高相关性的记忆优先注入,中相关性的在预算充足时注入,低相关性的始终不注入。relevance_threshold=0.75 是一个经过实践检验的默认值——低于 0.6 时噪声显著增加,高于 0.85 时可能漏掉有用的弱相关记忆。相关性分数可以通过 embedding 余弦相似度或 BM25 关键词匹配来计算,取决于你的记忆存储后端。
写入门控——Agent 产生的绝大部分文本都不值得记住:一个 Agent 在一次任务中可能生成 50 轮推理过程,但真正有价值的只有最后的结论、发现的事实、用户的明确偏好。中间推理过程(「让我想想……」「根据工具结果,当前状态是……」)如果被持久化,反而会污染后续任务。写入门控的验证逻辑很简单——信息密度太低的内容不值得持久化。
命名空间隔离——三层分区的实际使用模式:user_42:session_abc:task_1 不是死板的必须三层都填。实际使用中,大部分记忆存在 user_id:session_id 层级(一个对话会话内),少部分跨会话的持久偏好存在 user_id:* 层级。三层设计让你可以灵活控制记忆的可见范围——任务级记忆只在当前任务内可见,会话级记忆跨任务共享,用户级记忆跨会话持久存在。
# 使用示例:Agent 推理循环中的记忆读写
gate = MemoryContextGate(relevance_threshold=0.75, max_memory_tokens=1000)
namespace = gate.resolve_namespace(user_id="42", session_id="abc", task_id="1")
# 读取:Agent 在做代码分析时,先检索相关记忆
query = "Python 项目的依赖管理偏好"
candidates = memory_store.search(query, namespace=gate.get_namespace_prefix("42"))
should_read, score, selected = gate.should_read(query, namespace, candidates)
if should_read:
memory_context = "\n".join([m.content for m in selected])
# 注入到 system prompt 或消息列表中
system_prompt += f"\n\n用户历史偏好:\n{memory_context}"
# 写入:Agent 完成任务后,只持久化有价值的结论
agent_conclusion = "项目 A 使用 pip-tools 管理依赖,用户偏好 requirements.in + requirements.txt 模式"
if gate.should_write(agent_conclusion, namespace):
memory_store.save(namespace, agent_conclusion)
5. 跨任务状态传播:不让多步工作流丢失产物
Agent 的典型工作流不是单步的——它由多个步骤串联而成:搜索 → 提取 → 分析 → 报告。每一步都可能产生大量中间产物:搜索步骤返回一个 50KB 的 JSON 结果集,提取步骤从中切出 200 个文本片段,分析步骤生成统计图表和结构化结论。问题在于:这些产物怎么传给下一步?
两种常见的错误做法。第一种:把整个产物序列化后塞进上下文,传给下一步的 LLM。搜索结果的 50KB JSON 加上提取的 200 个片段,token 成本瞬间飙到 15 万以上,每一步都在几何级增长。第二种:只传一个简短摘要(「搜索返回了 348 条结果,提取了 200 个片段」),下游任务因为缺少细节而无法工作——它需要具体的数值、字段、文件名,而不是一句话概括。
正确的做法是 TaskStateEnvelope:传递产物引用,而非全量复制。核心思路:大产物存到外部(文件系统、S3、数据库),信封只携带引用指针 + 结构化摘要 + 元数据。下游任务通过「字段声明」机制精确指定自己需要什么,只注入必要的数据。
import uuid
from typing import Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class Artifact:
"""任务产物——外部存储的引用 + 内联摘要。"""
artifact_id: str
artifact_type: str # "json" | "text" | "image" | "table" | "file"
pointer: str # 外部引用:s3://bucket/key 或 /tmp/result_xxx.json
summary: str # 结构化摘要:描述产物的内容和结构
size: int # 大小(bytes),用于预算判断
metadata: dict = field(default_factory=dict)
created_at: datetime = field(default_factory=datetime.now)
class TaskStateEnvelope:
"""跨任务状态信封:传递产物引用而非全量数据。"""
def __init__(self, task_id: str, parent_task_id: Optional[str] = None):
self.task_id = task_id
self.parent_task_id = parent_task_id
self.version = 1
self.artifacts: dict[str, Artifact] = {} # artifact_id → Artifact
self.metadata: dict[str, Any] = {}
self.lineage: list[str] = [] # 祖先任务链:[parent, grandparent, ...]
if parent_task_id:
self.lineage.append(parent_task_id)
def add_artifact(self, artifact_type: str, pointer: str,
summary: str, size: int, metadata: dict = None) -> str:
"""添加一个产物到信封。返回 artifact_id。"""
artifact_id = f"{self.task_id}:{artifact_type}:{uuid.uuid4().hex[:8]}"
self.artifacts[artifact_id] = Artifact(
artifact_id=artifact_id,
artifact_type=artifact_type,
pointer=pointer,
summary=summary,
size=size,
metadata=metadata or {}
)
return artifact_id
def resolve_artifact(self, artifact_id: str) -> Optional[Artifact]:
"""按 ID 获取产物引用。不读取实际内容——只返回指针。"""
return self.artifacts.get(artifact_id)
def declare_fields(self, field_names: list[str]) -> dict:
"""下游任务声明自己需要的字段——只返回请求的部分。
field_names 可选值:
- "summary": 信封级别的摘要
- "artifacts": 所有产物引用(不含实际内容)
- "metadata": 元数据
- "lineage": 任务血缘链
- "stats": 统计信息(产物数量、总大小)
"""
result = {}
for field in field_names:
if field == "summary":
result["summary"] = self._build_summary()
elif field == "artifacts":
result["artifacts"] = [
{"id": aid, "type": a.artifact_type, "summary": a.summary, "size": a.size}
for aid, a in self.artifacts.items()
]
elif field == "metadata":
result["metadata"] = self.metadata
elif field == "lineage":
result["lineage"] = self.lineage
elif field == "stats":
result["stats"] = {
"artifact_count": len(self.artifacts),
"total_size": sum(a.size for a in self.artifacts.values()),
"artifact_types": list(set(a.artifact_type for a in self.artifacts.values()))
}
return result
def _build_summary(self) -> str:
parts = [f"task={self.task_id}", f"version={self.version}"]
if self.parent_task_id:
parts.append(f"parent={self.parent_task_id}")
parts.append(f"artifacts={len(self.artifacts)}")
return ' | '.join(parts)
def to_dict(self) -> dict:
return {
"task_id": self.task_id,
"parent_task_id": self.parent_task_id,
"version": self.version,
"artifacts": {aid: a.__dict__ for aid, a in self.artifacts.items()},
"metadata": self.metadata,
"lineage": self.lineage
}
数据流示意:
┌──────────────┐ ┌──────────────────┐ ┌──────────────┐
│ Task A │──────▶│ 外部存储层 │◀──────│ Task B │
│ (搜索+提取) │ │ (S3 / FS / DB) │ │ (分析+报告) │
└──────┬───────┘ └──────────────────┘ └──────▲───────┘
│ │
│ ① 存产物 + 创建信封 │
│ artifacts: [{json, s3://..., summary}] │
│ ───────────────────────────────────────────────│
│ │
└────────────────────────────────────────────────────┘
② Task B declare_fields(["artifacts", "summary"])
只收到引用 + 摘要,不下载全量数据
③ 需要细节时,通过 pointer 按需读取
三个关键设计决策:(1)引用而非复制——TaskStateEnvelope 不携带实际数据,只带指针。下游任务通过 resolve_artifact() 拿到引用后,自行决定是否按需读取。(2)字段声明——declare_fields() 让下游任务精确声明自己需要什么,避免无关数据的注入污染上下文。一个只需统计信息的任务不应该看到所有产物细节。(3)版本化与血缘——version 字段防止向前兼容问题,lineage 记录任务的祖先链,方便审计和错误回溯。
6. 完整参考实现:把所有东西串起来的 ContextProtocol
前面四节讲了四个独立组件——MessageBus、ToolResultEnvelope、MemoryContextGate、TaskStateEnvelope——它们各自解决一个子问题。但开发者需要的不是一个组件库,而是一个可以直接集成到 Agent 推理循环中的统一接口。这就是 ContextProtocol 类的职责:把四层组件整合成一个对外暴露四个核心方法(prepare_context / handle_tool_result / query_memory / persist_memory)的统一门面。
from typing import Any, Optional
from dataclasses import dataclass, field
class ContextProtocol:
"""Agent 上下文协议的统一入口。
组合 ToolResultEnvelope、MemoryContextGate、TaskStateEnvelope,
为 Agent 推理循环提供以下核心能力:
1. prepare_context() — 准备下一次 LLM 调用的完整上下文
2. handle_tool_result() — 处理单个工具结果(修剪 + 脱敏 + 路由)
3. query_memory() — 门控记忆读取
4. persist_memory() — 门控记忆写入
5. get_task_state() — 字段声明式任务状态获取
"""
def __init__(self, max_context_tokens: int = 8000,
memory_backend: Any = None,
artifact_store: Any = None):
self.max_context_tokens = max_context_tokens
self.memory_backend = memory_backend or {} # 简易 dict 后端,生产环境替换为向量数据库
self.artifact_store = artifact_store or {} # 简易 dict 后端
self.tool_context = ToolResultEnvelope # 工厂类
self.memory_gate = MemoryContextGate(max_memory_tokens=max_context_tokens // 4)
self.task_states: dict[str, TaskStateEnvelope] = {}
self._audit_log: list[dict] = []
def prepare_context(self, user_message: str,
tool_results: list[dict] = None,
task_state: Optional[TaskStateEnvelope] = None,
memory_namespace: str = None) -> list[dict]:
"""准备 LLM 调用的完整上下文。
返回可直接传给 LLM API 的 messages 列表。
执行步骤:
1. 处理所有工具结果(修剪 + 脱敏)
2. 检索并注入相关记忆
3. 注入任务状态(如果提供)
4. Token 预算控制和裁剪
"""
messages = [{"role": "system", "content": self._build_system_prompt()}]
# Step 1: 处理工具结果
if tool_results:
for tr in tool_results:
envelope = self.tool_context(
tool_call_id=tr["id"],
raw_result=tr["result"]
)
safe_chunk = (envelope
.trim(strategy="first_n")
.redact_secrets()
.to_context_chunk())
messages.append(safe_chunk)
if envelope._found_secrets:
self._audit_log.append({
"event": "secrets_redacted",
"tool_call_id": tr["id"],
"secrets": [s["label"] for s in envelope._found_secrets]
})
# Step 2: 注入记忆
if memory_namespace and self.memory_backend:
memories = self._search_memory(
user_message, memory_namespace)
should_read, score, selected = self.memory_gate.should_read(
user_message, memory_namespace, memories)
if should_read:
memory_text = "\n".join(
[f"[记忆 {i+1}] {m.content}" for i, m in enumerate(selected)])
messages.append({
"role": "system",
"content": f"相关历史记忆 (相关性: {score:.2f}):\n{memory_text}"
})
# Step 3: 注入任务状态
if task_state:
state_fields = task_state.declare_fields(
["summary", "artifacts", "stats"])
state_text = f"[任务状态] {state_fields['summary']}\n"
state_text += f"产物数: {state_fields['stats']['artifact_count']} | "
state_text += f"总大小: {state_fields['stats']['total_size']} bytes"
messages.append({"role": "system", "content": state_text})
# Step 4: 用户消息
messages.append({"role": "user", "content": user_message})
self._audit_log.append({
"event": "context_prepared",
"message_count": len(messages),
"estimted_tokens": sum(len(m["content"].split()) for m in messages)
})
return messages
def handle_tool_result(self, tool_call_id: str,
raw_result: Any,
max_tokens: int = 2000) -> dict:
"""处理单个工具结果:修剪 + 脱敏 + 生成上下文片段。"""
envelope = self.tool_context(
tool_call_id=tool_call_id,
raw_result=raw_result,
max_tokens=max_tokens
)
return (envelope
.trim(strategy="first_n")
.redact_secrets()
.to_context_chunk())
def query_memory(self, query: str,
namespace: str) -> list[dict]:
"""门控记忆读取。只返回通过相关性阈值的记忆。"""
candidates = self._search_memory(query, namespace)
_, _, selected = self.memory_gate.should_read(
query, namespace, candidates)
return [{"id": m.memory_id, "content": m.content,
"score": m.relevance_score} for m in selected]
def persist_memory(self, content: str, namespace: str) -> bool:
"""门控记忆写入。只有通过内容验证的才持久化。"""
if not self.memory_gate.should_write(content, namespace):
self._audit_log.append({
"event": "memory_write_rejected",
"namespace": namespace,
"content_preview": content[:80]
})
return False
memory_id = f"{namespace}:{hash(content) % 10**8}"
self.memory_backend[memory_id] = {
"namespace": namespace,
"content": content,
"timestamp": datetime.now().isoformat()
}
self._audit_log.append({
"event": "memory_written",
"memory_id": memory_id,
"namespace": namespace
})
return True
def get_task_state(self, task_id: str,
requested_fields: list[str] = None) -> Optional[dict]:
"""字段声明式任务状态获取。"""
envelope = self.task_states.get(task_id)
if not envelope:
return None
fields = requested_fields or ["summary", "artifacts"]
return envelope.declare_fields(fields)
def create_task_envelope(self, task_id: str,
parent_task_id: str = None) -> TaskStateEnvelope:
"""创建新的任务状态信封。"""
envelope = TaskStateEnvelope(task_id, parent_task_id)
if parent_task_id and parent_task_id in self.task_states:
envelope.lineage = (
self.task_states[parent_task_id].lineage + [parent_task_id])
self.task_states[task_id] = envelope
return envelope
def _build_system_prompt(self) -> str:
return "你是一个 AI 助手。请基于提供的上下文回答用户问题。"
def _search_memory(self, query: str, namespace: str) -> list:
"""模拟记忆搜索(生产环境替换为向量检索)。"""
prefix = self.memory_gate.get_namespace_prefix(
*self._parse_namespace(namespace))
results = []
for mid, entry in list(self.memory_backend.items())[-50:]:
if entry["namespace"].startswith(prefix):
# 简单关键词匹配模拟相关性
score = sum(1 for w in query if w in entry["content"]) / max(len(query), 1)
results.append(MemoryQueryResult(
content=entry["content"],
relevance_score=min(score, 1.0),
namespace=entry["namespace"],
memory_id=mid
))
return results
def _parse_namespace(self, namespace: str) -> tuple:
parts = namespace.split(':')
user_id = parts[0].replace('user_', '') if parts else 'default'
session_id = parts[1].replace('session_', '') if len(parts) > 1 else None
return user_id, session_id
@property
def audit_log(self) -> list[dict]:
return self._audit_log
端到端演示——一个最小化 Agent 用 2 步完成「2026 年云计算市场规模 + 增长分析」:
# 初始化协议和模拟后端
protocol = ContextProtocol(max_context_tokens=8000)
protocol.memory_backend = {} # 生产环境:替换为 ChromaDB / Pinecone
protocol.artifact_store = {} # 生产环境:替换为 S3 客户端
# ── 第 1 步:搜索 ──
# 模拟工具结果
search_result = {
"query": "2026 cloud computing market size",
"results": [
{"source": "Gartner", "market_size": "723B USD", "year": 2026},
{"source": "IDC", "market_size": "698B USD", "year": 2026},
{"source": "Statista", "market_size": "712B USD", "year": 2026}
],
"total_results": 3
}
# 处理工具结果
safe_chunk = protocol.handle_tool_result(
tool_call_id="call_search_1",
raw_result=search_result
)
# 创建任务状态信封
envelope = protocol.create_task_envelope(task_id="task_market_research")
envelope.add_artifact(
artifact_type="market_data_json",
pointer="store://market_data_2026.json",
summary="三家研究机构 2026 云计算市场规模:Gartner 723B, IDC 698B, Statista 712B",
size=512
)
# 准备第 1 步上下文
ctx1 = protocol.prepare_context(
user_message="搜索 2026 年云计算市场规模",
tool_results=[{"id": "call_search_1", "result": search_result}]
)
# → 此时 ctx1 可传入 LLM API
# ── 第 2 步:分析 ──
# 获取任务状态——只声明需要的字段
task_state = protocol.get_task_state(
"task_market_research",
requested_fields=["artifacts", "summary", "stats"]
)
# 模拟计算工具结果
calc_result = {"operation": "avg", "values": [723, 698, 712], "result": 711}
# 准备第 2 步上下文
ctx2 = protocol.prepare_context(
user_message="计算平均市场规模并分析趋势",
tool_results=[{"id": "call_calc_1", "result": calc_result}],
task_state=protocol.task_states.get("task_market_research")
)
# 持久化分析结论
protocol.persist_memory(
content="2026 云计算市场规模平均约 711B USD (来源: Gartner, IDC, Statista)",
namespace=protocol.memory_gate.resolve_namespace("user_42", "session_abc")
)
print(f"审计日志条目:{len(protocol.audit_log)} 条")
# → 审计日志条目:2 条(记录 context_prepared 事件)
ContextProtocol 的设计原则:(1)框架无关——不 import 任何 LLM SDK,只操作 dict/list 数据结构,与 OpenAI、Anthropic、DeepSeek 等任意 LLM API 兼容。(2)可替换后端——memory_backend 和 artifact_store 默认为 dict,生产环境替换为向量数据库和对象存储。(3)完整审计——每次安全决策(脱敏了什么、为什么拒绝写入记忆)都记录在 _audit_log 中。
7. 上下文协议的正确性测试与调试
上下文协议是 Agent 系统中最不可见的部分,也是最危险的部分——prompt 内容在执行时看不到,工具结果路由是否正确无法直观验证,密钥是否泄露只能等事故发生后才知道。如果你的上下文协议无法测试,你就无法信任你的 Agent。每次 LLM 调用前的上下文是 Agent 的「世界模型」——它决定了 Agent 看到什么、知道什么、能做什么。
解决方法:三个测试基元 + 审计集成。
import json
import os
from pathlib import Path
from typing import Optional
# ── 测试基元 1:密钥泄露检测 ──
def assert_no_secrets(context: list[dict],
patterns: Optional[list] = None) -> bool:
"""扫描上下文中的所有文本,确保没有 API key、token、密码模式。
Returns:
True 如果通过(无密钥),否则抛出 AssertionError 并列出所有发现。
"""
if patterns is None:
patterns = ToolResultEnvelope.DEFAULT_SECRET_PATTERNS
all_text = ""
for msg in context:
if isinstance(msg.get("content"), str):
all_text += msg["content"] + "\n"
found = []
for pattern, label in patterns:
matches = list(re.finditer(pattern, all_text, re.IGNORECASE))
if matches:
for m in matches:
found.append({
"label": label,
"position": m.start(),
"context": all_text[max(0, m.start()-20):m.end()+20]
})
if found:
raise AssertionError(
f"上下文中发现 {len(found)} 个疑似密钥/敏感字段: "
f"{json.dumps(found, indent=2, ensure_ascii=False)}"
)
return True
# ── 测试基元 2:上下文快照 ──
def dump_context_snapshot(context: list[dict], step: int,
output_dir: str = "debug/context_snapshots"):
"""在每个推理步骤后保存上下文的完整快照,用于事后调试。
快照文件命名:step_{step:04d}_{timestamp}.json
"""
Path(output_dir).mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%H%M%S_%f")
filename = f"{output_dir}/step_{step:04d}_{timestamp}.json"
snapshot = {
"step": step,
"timestamp": datetime.now().isoformat(),
"message_count": len(context),
"estimated_tokens": sum(len(m.get("content", "").split()) for m in context),
"messages": [
{"role": m.get("role"), "content": m.get("content"),
"tool_call_id": m.get("tool_call_id")}
for m in context
]
}
with open(filename, 'w', encoding='utf-8') as f:
json.dump(snapshot, f, ensure_ascii=False, indent=2, default=str)
return filename
# ── 测试基元 3:路由追踪 ──
def trace_context_routing(protocol: ContextProtocol,
step: int) -> dict:
"""记录当前步骤的上下文路由决策。
Returns:
{step, timestamp, audit_since_last_trace, memory_gate_stats}
"""
# 获取自上次 trace 以来的审计事件
recent_events = protocol.audit_log[-20:] # 最近 20 条
tool_events = [e for e in recent_events if "tool_call_id" in e]
memory_events = [e for e in recent_events if "memory" in e.get("event", "")]
return {
"step": step,
"timestamp": datetime.now().isoformat(),
"tool_routing": [
{"id": e.get("tool_call_id"), "redacted": e.get("secrets", [])}
for e in tool_events
],
"memory_operations": [
{"event": e["event"], "namespace": e.get("namespace")}
for e in memory_events
],
"gate_stats": protocol.memory_gate.stats
}
在 Agent 推理循环中的集成示例:
# ── 调试模式下的 Agent 推理循环 ──
protocol = ContextProtocol(max_context_tokens=8000)
step = 0
for turn in conversation:
step += 1
# 准备上下文
context = protocol.prepare_context(
user_message=turn.user_message,
tool_results=turn.tool_results,
task_state=protocol.task_states.get(turn.task_id)
)
# 测试基元 1:每次 LLM 调用前自动扫描密钥
assert_no_secrets(context) # ← 如果泄露则立即抛出异常,不会进入 LLM
# 测试基元 2:保存快照
snapshot_file = dump_context_snapshot(context, step)
print(f" [debug] 步骤 {step} 上下文快照 → {snapshot_file}")
# 调用 LLM
llm_response = call_llm(context)
# 处理 LLM 的 tool calls...
# 测试基元 3:路由追踪
trace = trace_context_routing(protocol, step)
print(f" [debug] 步骤 {step} 路由追踪 → 工具: {len(trace['tool_routing'])}, "
f"记忆操作: {len(trace['memory_operations'])}")
三个测试基元的设计哲学:(1)assert_no_secrets 是安全门——在上下文进入 LLM 之前做最后的防线扫描。任何时候密钥进入了上下文,都应该立即中断而不是「记录一下继续执行」。(2)dump_context_snapshot 是黑匣子——生产环境的 Agent 出错后,工程师最痛苦的就是「当时的 prompt 到底是什么」。每一步的快照让你可以精确回放任何一次 LLM 调用。(3)trace_context_routing 是导航图——它告诉你每个工具结果去了哪里、哪些记忆被注入了、哪些被拒绝了。这对于调试「Agent 为什么没有读到那个记忆」至关重要。
常见问题(FAQ)
上下文协议和 MCP 协议是什么关系?
MCP 是外部连接协议——定义 Agent 如何发现和调用外部工具/数据源。上下文协议是内部管理协议——定义 Agent 如何在自身的工具、记忆、任务组件之间传递状态。两者互补:MCP 负责「连接什么」,上下文协议负责「传什么、怎么传」。
为什么不直接把所有东西塞进 prompt?
三个代价:① token 成本——每次 LLM 调用都携带全量历史,一个 10 步工作流的 token 成本可以轻松超过 100 万;② 安全风险——工具返回的密钥、令牌、敏感数据直接暴露给 LLM;③ 性能退化——过长的上下文降低 LLM 的注意力质量。
这个协议框架依赖特定的 LLM 或框架吗?
不依赖。ContextProtocol 是一个纯 Python 类,不 import 任何 LLM SDK 或框架。它只操作数据结构(dict/list),与任何 LLM API(OpenAI、Anthropic、DeepSeek)和任何 Agent 框架(LangChain、CrewAI、原生实现)兼容。
命名空间设计的最佳实践是什么?
推荐三层命名空间:{user_id}:{session_id}:{task_id}。用户级记忆(偏好、历史)存在 user 空间;会话级记忆(当前对话上下文)存在 session 空间;任务级记忆(单次任务的中间产物)存在 task 空间。前缀查询可以高效获取相关记忆而不污染无关数据。
如何确定 memory relevance threshold 的最佳值?
0.75 是一个好的起点(基于 embedding 余弦相似度)。高于 0.85:只注入高度相关的记忆(可能漏掉有用信息);低于 0.6:注入过多不相关记忆(token 浪费)。建议在 0.7-0.8 区间做 A/B 测试。
可以在没有外部存储的情况下使用 TaskStateEnvelope 吗?
可以。如果没有外部存储可用,可以将产物数据作为内存字典传递——把原始数据作为 pointer 值存储(比如全局 store 中的字典键)。信封的 _artifact_store 是注入的,可以是任何实现了 read() 方法的对象——包括简单的 dict 包装器。架构不强制外部存储,它是在产物太大无法放入内存时的一种可选方案。
继续阅读 / 下一步阅读
本文是 Agent Communication and Protocols 系列的第一篇。建议按以下顺序继续阅读:
- Agent 工具设计 — 工具接口、Schema 定义和错误处理规范,为上下文协议中的工具上下文层提供基础
- MCP 协议入门 — MCP 是 Agent 外部连接的标准化协议,与本文的上下文协议形成互补
- Agent 可观测性 — 上下文协议定义了「传什么」,可观测性定义了「怎么看」——两者结合才能调试生产环境 Agent
- Agent 审计日志设计 — 上下文协议的每个决策点(为什么修剪、为什么过滤)都需要记录到审计日志
- MCP 协议生产实践 — 在生产环境中管理 MCP 工具,与上下文协议协同工作
- Agent 错误恢复 — 上下文状态是错误恢复的核心——了解如何重建上下文
如果你还没有读过任何 Agent 工程文章,建议从 什么是 AI Agent 开始。