Agent 上下文协议设计:如何在工具、记忆与任务之间传递状态

30秒要点

  • 核心问题:Agent 系统在工具调用、记忆读写、任务切换时,如果没有统一的上下文传递协议,会导致提示词膨胀、密钥泄露、结果路由混乱和状态丢失。
  • 解决方案:四层上下文架构——消息总线、工具上下文、记忆上下文、任务上下文——每一层有明确的职责和数据结构。
  • 关键实现:ToolResultEnvelope(工具结果修剪+脱敏+路由)、MemoryContextGate(记忆读写门控+命名空间隔离)。
  • 读完能做什么:为你的 Agent 系统设计一套框架无关的上下文传递协议,避免常见的安全和成本陷阱。

1. 为什么没有上下文协议,Agent 系统一定会崩

先讲一个真实发生过的事故。一个团队构建了一个代码审查 Agent,用于自动扫描 GitHub 仓库的安全漏洞。Agent 通过 GitHub API 工具拉取代码,然后将结果送入 LLM 分析。一切看起来正常——直到有人在团队 Slack 频道里看到了一条完整的 GitHub Personal Access Token。

问题出在哪?Agent 调用了 list_repos 工具,工具返回的原始 JSON 里包含了认证用的 token 字段(因为 API 响应的 Authorization header 被日志中间件原样记录了)。开发者的做法是把工具返回值直接 append 到 messages 列表里,没有任何过滤:

# 常见的天真写法——直接把工具结果丢进上下文
result = execute_tool(tool_name, arguments)
messages.append({
    "role": "tool",
    "tool_call_id": call_id,
    "content": json.dumps(result)  # ← 包含 token、密码、所有原始字段
})

这个 token 进入了 LLM 的上下文窗口,然后 LLM 在生成分析报告时把它「复述」了出来。报告被自动推送到 Slack——于是 token 就公开了。

这只是上下文管理不善的冰山一角。更系统性地看,缺少上下文协议会导致五类问题同时爆发:

① 提示词膨胀(Prompt Bloat):每次 LLM 调用都把完整对话历史 + 原始工具输出 + 记忆搜索结果全塞进去。一个三步推理的任务,第二次调用可能就带着 15KB 的工具输出了,token 成本呈几何级数增长。

② 密钥泄露(Secret Leakage):工具返回的 API key、token、密码、内网地址直接进入 LLM 上下文,没有任何脱敏。一旦 LLM 在输出中复述这些信息——它们就进入了日志、通知、共享文档。

③ 工具结果路由混乱(Tool Result Confusion):并行调用 3 个工具——search_codeget_filerun_tests——三个结果同时返回,但你把它们全部无差别地追加到消息列表里。LLM 需要自己「猜」哪个结果对应哪个调用,错误率随并发数急剧上升。

④ 记忆污染(Memory Contamination):上一个任务的中间推理过程被错误地持久化到了共享记忆库,下一个不相关的任务读到这些记忆后产生了幻觉。

⑤ 任务间状态丢失(Inter-Task State Loss):多步工作流中——第一步拉取源码、第二步静态分析、第三步生成修复建议——第一步生成的产物(文件列表、依赖图)没有规范化的传递机制,到第二步时已经丢失了,只能重新计算。

这些问题有一个共同的根因:Agent 的上下文不是一个被动的数据容器,而是一个需要显式设计、结构化管理的协议层。把上下文当作「消息列表随便 append」的想法,在单步推理时还能工作,一旦进入生产环境的多工具、多任务、多记忆场景,就会系统性崩溃。

📌 核心认知:上下文协议不是优化项——它是 Agent 系统的安全边界和成本边界。没有协议,你的 Agent 不是「不够好」,而是「迟早会炸」。

2. 上下文协议的四层架构模型

开发者在 Agent 的不同组件之间传递状态时,面临一个根本性的困惑:工具输出放哪里?记忆读哪里?任务状态怎么传?没有一个统一的心智模型,每个人都在根据自己的直觉瞎搞——有的把一切都序列化成 JSON 塞进 system prompt,有的在全局变量里到处写临时状态。

经过对多个生产级 Agent 系统的分析,我们提炼出一个四层架构模型。每一层有明确的职责边界和数据契约:

Layer 1 — 消息总线(Message Bus):最底层,原始的 LLM 对话轮次存储层。包含 system prompt、user messages、assistant responses、tool call requests 和 tool call results。所有上下文最终都流向这里,但不应该直接操作这一层——应该通过上层的结构化封装来注入内容。这一层面向 LLM API 的消息格式,是上下文协议的「物理层」。

Layer 2 — 工具上下文(Tool Context):工具输入/输出的结构化封装层。核心职责:结果修剪(截断过长输出)、密钥脱敏(正则检测并替换敏感字段)、路由匹配(通过 tool_call_id 将结果对应到正确的推理步骤)。每一个工具结果在注入消息总线之前,必须先经过这层处理。

Layer 3 — 记忆上下文(Memory Context):记忆系统的读写门控层。不是所有记忆都该注入当前的推理上下文——需要相关性过滤(embedding 相似度 / BM25 评分);不是所有 Agent 输出都该被记住——需要写入门控来防止中间推理过程污染持久化记忆。同时提供命名空间隔离(user → session → task 三层)。

Layer 4 — 任务上下文(Task Context):跨步骤状态传递层。多步工作流的每一步可能产生大量中间产物——文件列表、分析结果、生成的代码。任务上下文的做法是传递产物引用而非全量复制——上一个任务只传递一个指向结果的指针(文件路径、S3 key、数据库 ID)加上结构化摘要,下一个任务按需读取。

四层之间的数据流关系如下:

                ┌──────────────────┐
                │   Task Context    │ ← 跨步骤状态信封、产物指针
                │     (Layer 4)     │
                └────────┬─────────┘
                         │ 读/写状态引用
                         ▼
                ┌──────────────────┐
                │  Memory Context   │ ← 读/写门控、命名空间隔离
                │     (Layer 3)     │
                └────────┬─────────┘
                         │ 注入/提取记忆块
                         ▼
                ┌──────────────────┐
                │   Tool Context    │ ← 修剪、脱敏、路由匹配
                │     (Layer 2)     │
                └────────┬─────────┘
                         │ 安全的上下文片段
                         ▼
                ┌──────────────────┐
                │   Message Bus     │ ← LLM 对话轮次(最终汇入)
                │     (Layer 1)     │
                └──────────────────┘

对应的核心数据结构定义:

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime

# Layer 1: Message Bus
@dataclass
class Message:
    role: str                      # "system" | "user" | "assistant" | "tool"
    content: str
    tool_call_id: Optional[str] = None
    tool_calls: Optional[list] = None

# Layer 2: Tool Context
@dataclass
class ToolCallRequest:
    id: str
    name: str
    arguments: dict

@dataclass
class ToolResultEnvelope:
    tool_call_id: str
    raw_result: Any
    trimmed_result: Optional[str] = None
    redacted: bool = False
    redacted_fields: list = field(default_factory=list)

# Layer 3: Memory Context
@dataclass
class MemoryEntry:
    namespace: str                 # "user_42:session_a:task_1"
    content: str
    embedding: Optional[list] = None
    relevance_score: float = 0.0
    timestamp: datetime = field(default_factory=datetime.now)

# Layer 4: Task Context
@dataclass
class TaskStateEnvelope:
    task_id: str
    parent_task_id: Optional[str] = None
    artifact_refs: dict = field(default_factory=dict)
    summary: str = ""
    metadata: dict = field(default_factory=dict)

这个四层模型的关键设计原则是:数据自顶向下过滤,自底向上汇聚。上层决定什么内容可以进入下层,下层只管理最终的序列化格式。每一层都可以独立测试、独立演进,不会因为改了工具脱敏规则就破坏记忆读写。

📌 对比 MCP:MCP(Model Context Protocol)解决的是「Agent 如何与外部工具/数据源通信」的问题,偏向于传输层协议。本文的上下文协议解决的是「Agent 内部各组件之间如何传递状态」的问题,偏向于应用层架构。两者是互补关系,不是替代关系。

3. 工具结果的路由、修剪与脱敏

一个推理步骤可能触发多个工具并行调用。比如 Agent 同时调用 search_filesread_logscheck_api_status。三个结果返回后,有两个关键挑战:(1)必须把每个结果正确路由回对应的 tool_call_id;(2)原始结果可能很长(100KB 的日志文件)或包含敏感数据(API key 藏在响应头里),不能原样注入 LLM 上下文。

解决方案是 ToolResultEnvelope——一个在工具结果进入消息总线之前的强制性过滤层:

import re
import json
from typing import Any, Optional, Literal

class ToolResultEnvelope:
    """工具结果的安全封装:修剪、脱敏、路由匹配。"""

    DEFAULT_SECRET_PATTERNS = [
        (r'(?:api[_-]?key|apikey|api_token|access_token|secret)["\s:=]+["\x27]?([A-Za-z0-9_\-\.]{20,})', 'API_KEY'),
        (r'(?:password|passwd|pwd)["\s:=]+["\x27]?([^"\x27&\s]{4,})', 'PASSWORD'),
        (r'(?:token|auth)["\s:=]+["\x27]?([A-Za-z0-9_\-\.]{16,})', 'TOKEN'),
        (r'(?:private[_-]?key|privkey)["\s:=]+["\x27]?([A-Za-z0-9+/=]{32,})', 'PRIVATE_KEY'),
        (r'(?:bearer|basic)\s+([A-Za-z0-9_\-\.=]{16,})', 'AUTH_HEADER'),
        (r'ghp_[A-Za-z0-9]{36}', 'GITHUB_TOKEN'),
        (r'glpat-[A-Za-z0-9\-]{20,}', 'GITLAB_TOKEN'),
        (r'sk-[A-Za-z0-9]{32,}', 'OPENAI_KEY'),
    ]

    def __init__(self, tool_call_id: str, raw_result: Any, max_tokens: int = 2000):
        self.tool_call_id = tool_call_id
        self.raw_result = raw_result
        self.max_tokens = max_tokens
        self._trimmed: Optional[str] = None
        self._redacted: bool = False
        self._found_secrets: list = []

    def trim(self, strategy: Literal["first_n", "field_whitelist", "summary"] = "first_n",
             whitelist_fields: Optional[list] = None) -> 'ToolResultEnvelope':
        """修剪策略:
        - first_n: 保留前 max_tokens 个 token(简单截断)
        - field_whitelist: 只保留白名单中的字段
        - summary: 对结果做结构化摘要
        """
        text = self._serialize(self.raw_result)

        if strategy == "first_n":
            tokens = text.split()
            self._trimmed = ' '.join(tokens[:self.max_tokens])
        elif strategy == "field_whitelist" and whitelist_fields:
            if isinstance(self.raw_result, dict):
                filtered = {k: v for k, v in self.raw_result.items() if k in whitelist_fields}
                self._trimmed = self._serialize(filtered)
            else:
                self._trimmed = text[:self.max_tokens * 4]
        elif strategy == "summary":
            summary_parts = [
                f"type={type(self.raw_result).__name__}",
                f"size={len(text)} chars"
            ]
            if isinstance(self.raw_result, dict):
                summary_parts.append(f"keys={list(self.raw_result.keys())[:10]}")
            elif isinstance(self.raw_result, list):
                summary_parts.append(f"count={len(self.raw_result)}")
                if self.raw_result and isinstance(self.raw_result[0], dict):
                    summary_parts.append(f"item_keys={list(self.raw_result[0].keys())[:5]}")
            self._trimmed = ' | '.join(summary_parts)

        return self

    def redact_secrets(self, patterns: Optional[list] = None) -> 'ToolResultEnvelope':
        """检测并替换敏感信息。"""
        if patterns is None:
            patterns = self.DEFAULT_SECRET_PATTERNS

        text = self._trimmed or self._serialize(self.raw_result)
        self._found_secrets = []

        for pattern, label in patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            if matches:
                for m in matches:
                    secret_val = m if isinstance(m, str) else m[0]
                    self._found_secrets.append({'label': label, 'value_preview': secret_val[:8] + '...'})
                text = re.sub(pattern, f'[{label}:REDACTED]', text, flags=re.IGNORECASE)

        self._redacted = len(self._found_secrets) > 0
        self._trimmed = text
        return self

    def to_context_chunk(self) -> dict:
        """生成可直接注入 messages 列表的安全上下文片段。"""
        content = self._trimmed or self._serialize(self.raw_result)
        return {
            "role": "tool",
            "tool_call_id": self.tool_call_id,
            "content": content
        }

    def _serialize(self, data: Any) -> str:
        if isinstance(data, str):
            return data
        try:
            return json.dumps(data, ensure_ascii=False, default=str)
        except (TypeError, ValueError):
            return str(data)

    def _detect_secrets(self, text: str) -> list:
        """独立的安全扫描,不修改文本。"""
        found = []
        for pattern, label in self.DEFAULT_SECRET_PATTERNS:
            if re.search(pattern, text, re.IGNORECASE):
                found.append(label)
        return found

使用示例——一次工具调用从原始结果到安全注入的完整流程:

# Agent 调用工具,获取原始结果
raw_result = execute_command("curl -H 'Authorization: Bearer sk-abc123...' https://api.service.com/logs")
# raw_result 可能包含 80KB 日志 + 响应头中的 token

envelope = ToolResultEnvelope(
    tool_call_id="call_7a3f",
    raw_result=raw_result,
    max_tokens=1500
)

# 链式处理:先修剪,再脱敏,最后生成上下文片段
context_chunk = (envelope
    .trim(strategy="first_n")
    .redact_secrets()
    .to_context_chunk())

# context_chunk 现在可以安全地 append 到 messages 列表
if envelope._found_secrets:
    logging.warning(f"已脱敏 {len(envelope._found_secrets)} 个敏感字段: {[s['label'] for s in envelope._found_secrets]}")

ToolResultEnvelope 的设计有三个关键决策:(1)默认拒绝——不调用 redact_secrets() 就不过滤,但没有显式过滤的结果不应该进入消息总线;(2)链式 API——修剪和脱敏是可组合的,调用顺序自由但建议先修剪再脱敏(减少正则需要扫描的文本量);(3)可审计——_found_secrets 记录所有被检测到的敏感字段,可以集成到审计日志中。

📌 与工具设计的关联:本节的 ToolResultEnvelope 是 Agent 工具设计 的补充——工具定义负责「模型什么时候用、怎么调用」,上下文协议负责「调用结果怎么安全地回到模型」。

4. 记忆访问模式与命名空间设计

Agent 和记忆的关系不是简单的「记住一切、搜索一切」。有三个核心决策点:Agent 什么时候该读取记忆?什么时候该写入记忆?如何防止任务 A 的记忆污染任务 B 的推理?如果把所有相关的记忆都注入上下文,token 预算会瞬间被撑爆——一个存了 500 条记忆的 Agent,每条 500 tokens,全部注入就是 250K tokens,直接超出大多数模型的上下文窗口。

MemoryContextGate 通过三个机制解决这些问题:读入门控(相关性过滤)、写入门控(内容验证)、命名空间隔离(多租户记忆分区)。

import hashlib
from typing import Any, Optional
from dataclasses import dataclass

@dataclass
class MemoryQueryResult:
    content: str
    relevance_score: float
    namespace: str
    memory_id: str

class MemoryContextGate:
    """记忆上下文的读写门控。"""

    def __init__(self, relevance_threshold: float = 0.75, max_memory_tokens: int = 1000):
        self.relevance_threshold = relevance_threshold
        self.max_memory_tokens = max_memory_tokens
        self._read_count = 0
        self._write_count = 0

    def should_read(self, query: str, namespace: str,
                    memories: list[MemoryQueryResult]) -> tuple[bool, float, list[MemoryQueryResult]]:
        """决定是否读取记忆,以及返回哪些记忆块。

        Returns:
            (should_read, best_score, filtered_memories)
        """
        if not memories:
            return False, 0.0, []

        # 过滤:只保留相关性 >= 阈值的记忆
        relevant = [m for m in memories if m.relevance_score >= self.relevance_threshold]

        # 按分数降序排列
        relevant.sort(key=lambda m: m.relevance_score, reverse=True)

        # Token 预算控制:累计 token 数不超过上限
        selected = []
        token_count = 0
        for mem in relevant:
            mem_tokens = len(mem.content.split())
            if token_count + mem_tokens <= self.max_memory_tokens:
                selected.append(mem)
                token_count += mem_tokens
            else:
                break

        if not selected:
            return False, 0.0, []

        self._read_count += 1
        best_score = selected[0].relevance_score
        return True, best_score, selected

    def should_write(self, content: str, namespace: str) -> bool:
        """决定是否将内容持久化到记忆库。

        只写入明确验证过的输出,拒绝:
        - 中间推理过程(过短或无结构)
        - 纯工具日志(无语义价值)
        - 重复内容(与已有记忆高度相似)
        """
        # 基本验证
        if len(content.strip()) < 50:       # 太短,可能是碎片输出
            return False
        if len(content) > 50000:            # 太长,可能是完整日志
            return False

        # 结构检查:应该包含有意义的语义内容
        # 过滤纯数字/符号内容(大概率是日志输出)
        alpha_ratio = sum(c.isalpha() for c in content) / max(len(content), 1)
        if alpha_ratio < 0.3:
            return False

        self._write_count += 1
        return True

    def resolve_namespace(self, user_id: str, session_id: Optional[str] = None,
                          task_id: Optional[str] = None) -> str:
        """构建三层命名空间:user → session → task。

        每一层可选。支持前缀查询(如查询 user_42:*:* 获取该用户所有记忆)。
        """
        parts = [f"user_{user_id}"]
        if session_id:
            parts.append(f"session_{session_id}")
            if task_id:
                parts.append(f"task_{task_id}")
        return ':'.join(parts)

    def get_namespace_prefix(self, user_id: str, session_id: Optional[str] = None) -> str:
        """获取命名空间前缀,用于记忆库的范围查询。"""
        parts = [f"user_{user_id}"]
        if session_id:
            parts.append(f"session_{session_id}")
        return ':'.join(parts) + ':'

    @property
    def stats(self) -> dict:
        return {
            'reads': self._read_count,
            'writes': self._write_count,
            'read_threshold': self.relevance_threshold,
            'max_memory_tokens': self.max_memory_tokens
        }

三个设计决策的深入说明:

读入门控——相关性不是二分的,是连续的:不是简单地「相关就注入,不相关就不注入」,而是把相关性作为一个连续值,结合 token 预算做多路召回。高相关性的记忆优先注入,中相关性的在预算充足时注入,低相关性的始终不注入。relevance_threshold=0.75 是一个经过实践检验的默认值——低于 0.6 时噪声显著增加,高于 0.85 时可能漏掉有用的弱相关记忆。相关性分数可以通过 embedding 余弦相似度或 BM25 关键词匹配来计算,取决于你的记忆存储后端。

写入门控——Agent 产生的绝大部分文本都不值得记住:一个 Agent 在一次任务中可能生成 50 轮推理过程,但真正有价值的只有最后的结论、发现的事实、用户的明确偏好。中间推理过程(「让我想想……」「根据工具结果,当前状态是……」)如果被持久化,反而会污染后续任务。写入门控的验证逻辑很简单——信息密度太低的内容不值得持久化。

命名空间隔离——三层分区的实际使用模式:user_42:session_abc:task_1 不是死板的必须三层都填。实际使用中,大部分记忆存在 user_id:session_id 层级(一个对话会话内),少部分跨会话的持久偏好存在 user_id:* 层级。三层设计让你可以灵活控制记忆的可见范围——任务级记忆只在当前任务内可见,会话级记忆跨任务共享,用户级记忆跨会话持久存在。

# 使用示例:Agent 推理循环中的记忆读写
gate = MemoryContextGate(relevance_threshold=0.75, max_memory_tokens=1000)
namespace = gate.resolve_namespace(user_id="42", session_id="abc", task_id="1")

# 读取:Agent 在做代码分析时,先检索相关记忆
query = "Python 项目的依赖管理偏好"
candidates = memory_store.search(query, namespace=gate.get_namespace_prefix("42"))
should_read, score, selected = gate.should_read(query, namespace, candidates)

if should_read:
    memory_context = "\n".join([m.content for m in selected])
    # 注入到 system prompt 或消息列表中
    system_prompt += f"\n\n用户历史偏好:\n{memory_context}"

# 写入:Agent 完成任务后,只持久化有价值的结论
agent_conclusion = "项目 A 使用 pip-tools 管理依赖,用户偏好 requirements.in + requirements.txt 模式"
if gate.should_write(agent_conclusion, namespace):
    memory_store.save(namespace, agent_conclusion)
📌 与记忆系统的关联:MemoryContextGate 是 Agent 记忆系统 中提到的三层记忆模型的应用层控制。记忆系统负责「怎么存、怎么搜」,内存门控负责「什么时候读、什么时候写、读多少」。

5. 跨任务状态传播:不让多步工作流丢失产物

Agent 的典型工作流不是单步的——它由多个步骤串联而成:搜索 → 提取 → 分析 → 报告。每一步都可能产生大量中间产物:搜索步骤返回一个 50KB 的 JSON 结果集,提取步骤从中切出 200 个文本片段,分析步骤生成统计图表和结构化结论。问题在于:这些产物怎么传给下一步?

两种常见的错误做法。第一种:把整个产物序列化后塞进上下文,传给下一步的 LLM。搜索结果的 50KB JSON 加上提取的 200 个片段,token 成本瞬间飙到 15 万以上,每一步都在几何级增长。第二种:只传一个简短摘要(「搜索返回了 348 条结果,提取了 200 个片段」),下游任务因为缺少细节而无法工作——它需要具体的数值、字段、文件名,而不是一句话概括。

正确的做法是 TaskStateEnvelope:传递产物引用,而非全量复制。核心思路:大产物存到外部(文件系统、S3、数据库),信封只携带引用指针 + 结构化摘要 + 元数据。下游任务通过「字段声明」机制精确指定自己需要什么,只注入必要的数据。

import uuid
from typing import Any, Optional
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class Artifact:
    """任务产物——外部存储的引用 + 内联摘要。"""
    artifact_id: str
    artifact_type: str              # "json" | "text" | "image" | "table" | "file"
    pointer: str                    # 外部引用:s3://bucket/key 或 /tmp/result_xxx.json
    summary: str                    # 结构化摘要:描述产物的内容和结构
    size: int                       # 大小(bytes),用于预算判断
    metadata: dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)

class TaskStateEnvelope:
    """跨任务状态信封:传递产物引用而非全量数据。"""

    def __init__(self, task_id: str, parent_task_id: Optional[str] = None):
        self.task_id = task_id
        self.parent_task_id = parent_task_id
        self.version = 1
        self.artifacts: dict[str, Artifact] = {}       # artifact_id → Artifact
        self.metadata: dict[str, Any] = {}
        self.lineage: list[str] = []                    # 祖先任务链:[parent, grandparent, ...]
        if parent_task_id:
            self.lineage.append(parent_task_id)

    def add_artifact(self, artifact_type: str, pointer: str,
                     summary: str, size: int, metadata: dict = None) -> str:
        """添加一个产物到信封。返回 artifact_id。"""
        artifact_id = f"{self.task_id}:{artifact_type}:{uuid.uuid4().hex[:8]}"
        self.artifacts[artifact_id] = Artifact(
            artifact_id=artifact_id,
            artifact_type=artifact_type,
            pointer=pointer,
            summary=summary,
            size=size,
            metadata=metadata or {}
        )
        return artifact_id

    def resolve_artifact(self, artifact_id: str) -> Optional[Artifact]:
        """按 ID 获取产物引用。不读取实际内容——只返回指针。"""
        return self.artifacts.get(artifact_id)

    def declare_fields(self, field_names: list[str]) -> dict:
        """下游任务声明自己需要的字段——只返回请求的部分。

        field_names 可选值:
          - "summary": 信封级别的摘要
          - "artifacts": 所有产物引用(不含实际内容)
          - "metadata": 元数据
          - "lineage": 任务血缘链
          - "stats": 统计信息(产物数量、总大小)
        """
        result = {}
        for field in field_names:
            if field == "summary":
                result["summary"] = self._build_summary()
            elif field == "artifacts":
                result["artifacts"] = [
                    {"id": aid, "type": a.artifact_type, "summary": a.summary, "size": a.size}
                    for aid, a in self.artifacts.items()
                ]
            elif field == "metadata":
                result["metadata"] = self.metadata
            elif field == "lineage":
                result["lineage"] = self.lineage
            elif field == "stats":
                result["stats"] = {
                    "artifact_count": len(self.artifacts),
                    "total_size": sum(a.size for a in self.artifacts.values()),
                    "artifact_types": list(set(a.artifact_type for a in self.artifacts.values()))
                }
        return result

    def _build_summary(self) -> str:
        parts = [f"task={self.task_id}", f"version={self.version}"]
        if self.parent_task_id:
            parts.append(f"parent={self.parent_task_id}")
        parts.append(f"artifacts={len(self.artifacts)}")
        return ' | '.join(parts)

    def to_dict(self) -> dict:
        return {
            "task_id": self.task_id,
            "parent_task_id": self.parent_task_id,
            "version": self.version,
            "artifacts": {aid: a.__dict__ for aid, a in self.artifacts.items()},
            "metadata": self.metadata,
            "lineage": self.lineage
        }

数据流示意:

  ┌──────────────┐       ┌──────────────────┐       ┌──────────────┐
  │   Task A      │──────▶│  外部存储层       │◀──────│   Task B      │
  │  (搜索+提取)   │       │  (S3 / FS / DB)   │       │  (分析+报告)   │
  └──────┬───────┘       └──────────────────┘       └──────▲───────┘
         │                                                  │
         │  ① 存产物 + 创建信封                               │
         │     artifacts: [{json, s3://..., summary}]         │
         │     ───────────────────────────────────────────────│
         │                                                    │
         └────────────────────────────────────────────────────┘
                ② Task B declare_fields(["artifacts", "summary"])
                   只收到引用 + 摘要,不下载全量数据
                ③ 需要细节时,通过 pointer 按需读取

三个关键设计决策:(1)引用而非复制——TaskStateEnvelope 不携带实际数据,只带指针。下游任务通过 resolve_artifact() 拿到引用后,自行决定是否按需读取。(2)字段声明——declare_fields() 让下游任务精确声明自己需要什么,避免无关数据的注入污染上下文。一个只需统计信息的任务不应该看到所有产物细节。(3)版本化与血缘——version 字段防止向前兼容问题,lineage 记录任务的祖先链,方便审计和错误回溯。

📌 与任务编排的关联:TaskStateEnvelope 是任务编排系统(Airflow、Prefect、自研 DAG 引擎)和 Agent 推理循环之间的桥梁。编排系统负责任务的执行顺序和依赖关系,信封负责任务间的数据传递格式。

6. 完整参考实现:把所有东西串起来的 ContextProtocol

前面四节讲了四个独立组件——MessageBus、ToolResultEnvelope、MemoryContextGate、TaskStateEnvelope——它们各自解决一个子问题。但开发者需要的不是一个组件库,而是一个可以直接集成到 Agent 推理循环中的统一接口。这就是 ContextProtocol 类的职责:把四层组件整合成一个对外暴露四个核心方法(prepare_context / handle_tool_result / query_memory / persist_memory)的统一门面。

from typing import Any, Optional
from dataclasses import dataclass, field

class ContextProtocol:
    """Agent 上下文协议的统一入口。

    组合 ToolResultEnvelope、MemoryContextGate、TaskStateEnvelope,
    为 Agent 推理循环提供以下核心能力:
      1. prepare_context()   — 准备下一次 LLM 调用的完整上下文
      2. handle_tool_result() — 处理单个工具结果(修剪 + 脱敏 + 路由)
      3. query_memory()      — 门控记忆读取
      4. persist_memory()    — 门控记忆写入
      5. get_task_state()    — 字段声明式任务状态获取
    """

    def __init__(self, max_context_tokens: int = 8000,
                 memory_backend: Any = None,
                 artifact_store: Any = None):
        self.max_context_tokens = max_context_tokens
        self.memory_backend = memory_backend or {}       # 简易 dict 后端,生产环境替换为向量数据库
        self.artifact_store = artifact_store or {}        # 简易 dict 后端
        self.tool_context = ToolResultEnvelope            # 工厂类
        self.memory_gate = MemoryContextGate(max_memory_tokens=max_context_tokens // 4)
        self.task_states: dict[str, TaskStateEnvelope] = {}
        self._audit_log: list[dict] = []

    def prepare_context(self, user_message: str,
                        tool_results: list[dict] = None,
                        task_state: Optional[TaskStateEnvelope] = None,
                        memory_namespace: str = None) -> list[dict]:
        """准备 LLM 调用的完整上下文。

        返回可直接传给 LLM API 的 messages 列表。
        执行步骤:
          1. 处理所有工具结果(修剪 + 脱敏)
          2. 检索并注入相关记忆
          3. 注入任务状态(如果提供)
          4. Token 预算控制和裁剪
        """
        messages = [{"role": "system", "content": self._build_system_prompt()}]

        # Step 1: 处理工具结果
        if tool_results:
            for tr in tool_results:
                envelope = self.tool_context(
                    tool_call_id=tr["id"],
                    raw_result=tr["result"]
                )
                safe_chunk = (envelope
                    .trim(strategy="first_n")
                    .redact_secrets()
                    .to_context_chunk())
                messages.append(safe_chunk)
                if envelope._found_secrets:
                    self._audit_log.append({
                        "event": "secrets_redacted",
                        "tool_call_id": tr["id"],
                        "secrets": [s["label"] for s in envelope._found_secrets]
                    })

        # Step 2: 注入记忆
        if memory_namespace and self.memory_backend:
            memories = self._search_memory(
                user_message, memory_namespace)
            should_read, score, selected = self.memory_gate.should_read(
                user_message, memory_namespace, memories)
            if should_read:
                memory_text = "\n".join(
                    [f"[记忆 {i+1}] {m.content}" for i, m in enumerate(selected)])
                messages.append({
                    "role": "system",
                    "content": f"相关历史记忆 (相关性: {score:.2f}):\n{memory_text}"
                })

        # Step 3: 注入任务状态
        if task_state:
            state_fields = task_state.declare_fields(
                ["summary", "artifacts", "stats"])
            state_text = f"[任务状态] {state_fields['summary']}\n"
            state_text += f"产物数: {state_fields['stats']['artifact_count']} | "
            state_text += f"总大小: {state_fields['stats']['total_size']} bytes"
            messages.append({"role": "system", "content": state_text})

        # Step 4: 用户消息
        messages.append({"role": "user", "content": user_message})

        self._audit_log.append({
            "event": "context_prepared",
            "message_count": len(messages),
            "estimted_tokens": sum(len(m["content"].split()) for m in messages)
        })

        return messages

    def handle_tool_result(self, tool_call_id: str,
                           raw_result: Any,
                           max_tokens: int = 2000) -> dict:
        """处理单个工具结果:修剪 + 脱敏 + 生成上下文片段。"""
        envelope = self.tool_context(
            tool_call_id=tool_call_id,
            raw_result=raw_result,
            max_tokens=max_tokens
        )
        return (envelope
            .trim(strategy="first_n")
            .redact_secrets()
            .to_context_chunk())

    def query_memory(self, query: str,
                     namespace: str) -> list[dict]:
        """门控记忆读取。只返回通过相关性阈值的记忆。"""
        candidates = self._search_memory(query, namespace)
        _, _, selected = self.memory_gate.should_read(
            query, namespace, candidates)
        return [{"id": m.memory_id, "content": m.content,
                 "score": m.relevance_score} for m in selected]

    def persist_memory(self, content: str, namespace: str) -> bool:
        """门控记忆写入。只有通过内容验证的才持久化。"""
        if not self.memory_gate.should_write(content, namespace):
            self._audit_log.append({
                "event": "memory_write_rejected",
                "namespace": namespace,
                "content_preview": content[:80]
            })
            return False

        memory_id = f"{namespace}:{hash(content) % 10**8}"
        self.memory_backend[memory_id] = {
            "namespace": namespace,
            "content": content,
            "timestamp": datetime.now().isoformat()
        }
        self._audit_log.append({
            "event": "memory_written",
            "memory_id": memory_id,
            "namespace": namespace
        })
        return True

    def get_task_state(self, task_id: str,
                       requested_fields: list[str] = None) -> Optional[dict]:
        """字段声明式任务状态获取。"""
        envelope = self.task_states.get(task_id)
        if not envelope:
            return None
        fields = requested_fields or ["summary", "artifacts"]
        return envelope.declare_fields(fields)

    def create_task_envelope(self, task_id: str,
                             parent_task_id: str = None) -> TaskStateEnvelope:
        """创建新的任务状态信封。"""
        envelope = TaskStateEnvelope(task_id, parent_task_id)
        if parent_task_id and parent_task_id in self.task_states:
            envelope.lineage = (
                self.task_states[parent_task_id].lineage + [parent_task_id])
        self.task_states[task_id] = envelope
        return envelope

    def _build_system_prompt(self) -> str:
        return "你是一个 AI 助手。请基于提供的上下文回答用户问题。"

    def _search_memory(self, query: str, namespace: str) -> list:
        """模拟记忆搜索(生产环境替换为向量检索)。"""
        prefix = self.memory_gate.get_namespace_prefix(
            *self._parse_namespace(namespace))
        results = []
        for mid, entry in list(self.memory_backend.items())[-50:]:
            if entry["namespace"].startswith(prefix):
                # 简单关键词匹配模拟相关性
                score = sum(1 for w in query if w in entry["content"]) / max(len(query), 1)
                results.append(MemoryQueryResult(
                    content=entry["content"],
                    relevance_score=min(score, 1.0),
                    namespace=entry["namespace"],
                    memory_id=mid
                ))
        return results

    def _parse_namespace(self, namespace: str) -> tuple:
        parts = namespace.split(':')
        user_id = parts[0].replace('user_', '') if parts else 'default'
        session_id = parts[1].replace('session_', '') if len(parts) > 1 else None
        return user_id, session_id

    @property
    def audit_log(self) -> list[dict]:
        return self._audit_log

端到端演示——一个最小化 Agent 用 2 步完成「2026 年云计算市场规模 + 增长分析」:

# 初始化协议和模拟后端
protocol = ContextProtocol(max_context_tokens=8000)
protocol.memory_backend = {}    # 生产环境:替换为 ChromaDB / Pinecone
protocol.artifact_store = {}    # 生产环境:替换为 S3 客户端

# ── 第 1 步:搜索 ──
# 模拟工具结果
search_result = {
    "query": "2026 cloud computing market size",
    "results": [
        {"source": "Gartner", "market_size": "723B USD", "year": 2026},
        {"source": "IDC", "market_size": "698B USD", "year": 2026},
        {"source": "Statista", "market_size": "712B USD", "year": 2026}
    ],
    "total_results": 3
}

# 处理工具结果
safe_chunk = protocol.handle_tool_result(
    tool_call_id="call_search_1",
    raw_result=search_result
)

# 创建任务状态信封
envelope = protocol.create_task_envelope(task_id="task_market_research")
envelope.add_artifact(
    artifact_type="market_data_json",
    pointer="store://market_data_2026.json",
    summary="三家研究机构 2026 云计算市场规模:Gartner 723B, IDC 698B, Statista 712B",
    size=512
)

# 准备第 1 步上下文
ctx1 = protocol.prepare_context(
    user_message="搜索 2026 年云计算市场规模",
    tool_results=[{"id": "call_search_1", "result": search_result}]
)
# → 此时 ctx1 可传入 LLM API

# ── 第 2 步:分析 ──
# 获取任务状态——只声明需要的字段
task_state = protocol.get_task_state(
    "task_market_research",
    requested_fields=["artifacts", "summary", "stats"]
)

# 模拟计算工具结果
calc_result = {"operation": "avg", "values": [723, 698, 712], "result": 711}

# 准备第 2 步上下文
ctx2 = protocol.prepare_context(
    user_message="计算平均市场规模并分析趋势",
    tool_results=[{"id": "call_calc_1", "result": calc_result}],
    task_state=protocol.task_states.get("task_market_research")
)

# 持久化分析结论
protocol.persist_memory(
    content="2026 云计算市场规模平均约 711B USD (来源: Gartner, IDC, Statista)",
    namespace=protocol.memory_gate.resolve_namespace("user_42", "session_abc")
)

print(f"审计日志条目:{len(protocol.audit_log)} 条")
# → 审计日志条目:2 条(记录 context_prepared 事件)

ContextProtocol 的设计原则:(1)框架无关——不 import 任何 LLM SDK,只操作 dict/list 数据结构,与 OpenAI、Anthropic、DeepSeek 等任意 LLM API 兼容。(2)可替换后端——memory_backendartifact_store 默认为 dict,生产环境替换为向量数据库和对象存储。(3)完整审计——每次安全决策(脱敏了什么、为什么拒绝写入记忆)都记录在 _audit_log 中。

7. 上下文协议的正确性测试与调试

上下文协议是 Agent 系统中最不可见的部分,也是最危险的部分——prompt 内容在执行时看不到,工具结果路由是否正确无法直观验证,密钥是否泄露只能等事故发生后才知道。如果你的上下文协议无法测试,你就无法信任你的 Agent。每次 LLM 调用前的上下文是 Agent 的「世界模型」——它决定了 Agent 看到什么、知道什么、能做什么。

解决方法:三个测试基元 + 审计集成。

import json
import os
from pathlib import Path
from typing import Optional

# ── 测试基元 1:密钥泄露检测 ──
def assert_no_secrets(context: list[dict],
                      patterns: Optional[list] = None) -> bool:
    """扫描上下文中的所有文本,确保没有 API key、token、密码模式。

    Returns:
        True 如果通过(无密钥),否则抛出 AssertionError 并列出所有发现。
    """
    if patterns is None:
        patterns = ToolResultEnvelope.DEFAULT_SECRET_PATTERNS

    all_text = ""
    for msg in context:
        if isinstance(msg.get("content"), str):
            all_text += msg["content"] + "\n"

    found = []
    for pattern, label in patterns:
        matches = list(re.finditer(pattern, all_text, re.IGNORECASE))
        if matches:
            for m in matches:
                found.append({
                    "label": label,
                    "position": m.start(),
                    "context": all_text[max(0, m.start()-20):m.end()+20]
                })

    if found:
        raise AssertionError(
            f"上下文中发现 {len(found)} 个疑似密钥/敏感字段: "
            f"{json.dumps(found, indent=2, ensure_ascii=False)}"
        )
    return True


# ── 测试基元 2:上下文快照 ──
def dump_context_snapshot(context: list[dict], step: int,
                          output_dir: str = "debug/context_snapshots"):
    """在每个推理步骤后保存上下文的完整快照,用于事后调试。

    快照文件命名:step_{step:04d}_{timestamp}.json
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%H%M%S_%f")
    filename = f"{output_dir}/step_{step:04d}_{timestamp}.json"

    snapshot = {
        "step": step,
        "timestamp": datetime.now().isoformat(),
        "message_count": len(context),
        "estimated_tokens": sum(len(m.get("content", "").split()) for m in context),
        "messages": [
            {"role": m.get("role"), "content": m.get("content"),
             "tool_call_id": m.get("tool_call_id")}
            for m in context
        ]
    }

    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(snapshot, f, ensure_ascii=False, indent=2, default=str)

    return filename


# ── 测试基元 3:路由追踪 ──
def trace_context_routing(protocol: ContextProtocol,
                          step: int) -> dict:
    """记录当前步骤的上下文路由决策。

    Returns:
        {step, timestamp, audit_since_last_trace, memory_gate_stats}
    """
    # 获取自上次 trace 以来的审计事件
    recent_events = protocol.audit_log[-20:]  # 最近 20 条

    tool_events = [e for e in recent_events if "tool_call_id" in e]
    memory_events = [e for e in recent_events if "memory" in e.get("event", "")]

    return {
        "step": step,
        "timestamp": datetime.now().isoformat(),
        "tool_routing": [
            {"id": e.get("tool_call_id"), "redacted": e.get("secrets", [])}
            for e in tool_events
        ],
        "memory_operations": [
            {"event": e["event"], "namespace": e.get("namespace")}
            for e in memory_events
        ],
        "gate_stats": protocol.memory_gate.stats
    }

在 Agent 推理循环中的集成示例:

# ── 调试模式下的 Agent 推理循环 ──
protocol = ContextProtocol(max_context_tokens=8000)
step = 0

for turn in conversation:
    step += 1

    # 准备上下文
    context = protocol.prepare_context(
        user_message=turn.user_message,
        tool_results=turn.tool_results,
        task_state=protocol.task_states.get(turn.task_id)
    )

    # 测试基元 1:每次 LLM 调用前自动扫描密钥
    assert_no_secrets(context)  # ← 如果泄露则立即抛出异常,不会进入 LLM

    # 测试基元 2:保存快照
    snapshot_file = dump_context_snapshot(context, step)
    print(f"  [debug] 步骤 {step} 上下文快照 → {snapshot_file}")

    # 调用 LLM
    llm_response = call_llm(context)

    # 处理 LLM 的 tool calls...

    # 测试基元 3:路由追踪
    trace = trace_context_routing(protocol, step)
    print(f"  [debug] 步骤 {step} 路由追踪 → 工具: {len(trace['tool_routing'])}, "
          f"记忆操作: {len(trace['memory_operations'])}")

三个测试基元的设计哲学:(1)assert_no_secrets 是安全门——在上下文进入 LLM 之前做最后的防线扫描。任何时候密钥进入了上下文,都应该立即中断而不是「记录一下继续执行」。(2)dump_context_snapshot 是黑匣子——生产环境的 Agent 出错后,工程师最痛苦的就是「当时的 prompt 到底是什么」。每一步的快照让你可以精确回放任何一次 LLM 调用。(3)trace_context_routing 是导航图——它告诉你每个工具结果去了哪里、哪些记忆被注入了、哪些被拒绝了。这对于调试「Agent 为什么没有读到那个记忆」至关重要。

📌 核心认知:如果你不能测试你的上下文协议,你就无法信任你的 Agent。这三个测试基元——密钥扫描、快照保存、路由追踪——应该集成到每一个 Agent 推理循环中,作为非功能性的基础设施而非可选功能。

常见问题(FAQ)

上下文协议和 MCP 协议是什么关系?

MCP 是外部连接协议——定义 Agent 如何发现和调用外部工具/数据源。上下文协议是内部管理协议——定义 Agent 如何在自身的工具、记忆、任务组件之间传递状态。两者互补:MCP 负责「连接什么」,上下文协议负责「传什么、怎么传」。

为什么不直接把所有东西塞进 prompt?

三个代价:① token 成本——每次 LLM 调用都携带全量历史,一个 10 步工作流的 token 成本可以轻松超过 100 万;② 安全风险——工具返回的密钥、令牌、敏感数据直接暴露给 LLM;③ 性能退化——过长的上下文降低 LLM 的注意力质量。

这个协议框架依赖特定的 LLM 或框架吗?

不依赖。ContextProtocol 是一个纯 Python 类,不 import 任何 LLM SDK 或框架。它只操作数据结构(dict/list),与任何 LLM API(OpenAI、Anthropic、DeepSeek)和任何 Agent 框架(LangChain、CrewAI、原生实现)兼容。

命名空间设计的最佳实践是什么?

推荐三层命名空间:{user_id}:{session_id}:{task_id}。用户级记忆(偏好、历史)存在 user 空间;会话级记忆(当前对话上下文)存在 session 空间;任务级记忆(单次任务的中间产物)存在 task 空间。前缀查询可以高效获取相关记忆而不污染无关数据。

如何确定 memory relevance threshold 的最佳值?

0.75 是一个好的起点(基于 embedding 余弦相似度)。高于 0.85:只注入高度相关的记忆(可能漏掉有用信息);低于 0.6:注入过多不相关记忆(token 浪费)。建议在 0.7-0.8 区间做 A/B 测试。

可以在没有外部存储的情况下使用 TaskStateEnvelope 吗?

可以。如果没有外部存储可用,可以将产物数据作为内存字典传递——把原始数据作为 pointer 值存储(比如全局 store 中的字典键)。信封的 _artifact_store 是注入的,可以是任何实现了 read() 方法的对象——包括简单的 dict 包装器。架构不强制外部存储,它是在产物太大无法放入内存时的一种可选方案。

本文是 Agent Communication and Protocols 系列的第一篇。建议按以下顺序继续阅读:

如果你还没有读过任何 Agent 工程文章,建议从 什么是 AI Agent 开始。