Agent 上下文窗口管理:如何压缩、保留和淘汰任务信息
30秒要点
- 核心问题:Agent 在生产环境中运行超过 50 步工具调用或数小时后,上下文窗口必然溢出。即使模型支持 128K token,上下文衰减(context rot)也会让 Agent 在长任务中越跑越差——这不是换个更大的模型能解决的。
- 解决方案:一套完整的上下文窗口生命周期管理系统——感知压力(何时介入)→ 淘汰(扔掉什么)→ 压缩(不丢状态地缩小)→ 跨窗口状态保持(重置后继续工作)。每一步都有代码实现和决策框架。
- 关键实现:ContextWindowManager 编排器整合 6 个子系统——ContextPressureMonitor、EvictionEngine(6 种淘汰策略)、CompressionEngine(5 种压缩策略)、TokenBudgetManager、CrossWindowStateManager、ContextHealthMonitor。含 8 段完整 Python 代码。
- 读完能做什么:为你的 Agent 实现一套生产级的上下文窗口管理——在上下文耗尽前主动压缩,在冗余产生时精准淘汰,在窗口重置后无缝恢复。告别「Agent 跑着跑着就崩了」的生产事故。
1. 为什么上下文窗口管理是第一优先级工程问题
一个代码审查 Agent 在处理一个大型重构任务。用户要求:"把 user-service 里所有 REST API 调用从 axios 迁移到 fetch,同时保持错误处理逻辑不变。"Agent 开始工作——第 1 步搜索所有 axios 引用,第 5 步分析错误处理模式,第 15 步开始逐文件替换,第 30 步运行测试发现 break。到第 40 步时,Agent 的行为开始变得奇怪——它反复修改同一个文件而不推进任务,忘记了自己在第 15 步已经分析过的依赖关系,甚至在 50 步后直接抛出 model_context_window_exceeded 错误。
这不是 Agent 能力的问题。这是上下文窗口被当作无限资源来使用的结果。第 1 步的 axios 搜索结果、第 5 步的错误处理模式分析、第 15 步的逐文件依赖关系——所有内容都被忠实地追加到上下文窗口中。128K 的窗口看似巨大,但经过 40 轮的累积,系统提示词、工具定义、对话历史、工具调用结果已经把窗口塞满。Agent 在第 41 步开始丢失关键上下文,在第 50 步撞墙。
五种「不管上下文窗口」的失败模式
在生产环境中,简单地"把所有历史消息塞进上下文"会触发五类问题:
- 上下文衰减(Context Rot):LLM 的注意力机制中,每对 token 之间都存在注意力关系(n² 复杂度)。当上下文扩展到数万 token 时,模型对每条信息的平均注意力被稀释。Chroma 的研究证实:即使模型声称支持 128K token,在 64K+ 的上下文长度下,GPT-4 开始产生幻觉性推断,Claude 倾向于过度谨慎地拒绝回答。这不是模型 bug,是注意力预算耗尽的物理规律。
- Token 成本线性膨胀:每轮对话都把历史完整发送给 LLM——第 1 轮成本 $0.01,第 50 轮成本 $0.80。一个 50 步的 Agent 任务的 token 成本是指数级增长的,其中 80% 的 token 都花在了已经不需要看的内容上。
- 溢出崩溃(Overflow Crash):最直接的失败模式。Agent 在第 N 步时上下文窗口被完全填满,LLM API 返回错误。任务中断,已执行的前 N-1 步工作全部丢失——因为没有跨窗口的状态保持机制。
- 僵尸信息污染(Zombie Information):旧的工具调用结果、已解决的讨论、被放弃的探索路径——这些内容永远不会被自动清理,一直占据上下文窗口。更糟的是,LLM 可能被这些过时信息误导——"你之前在第 3 步已经决定用 axios 了呀"(这是 30 步前被推翻的决定)。
- 会话失忆(Session Amnesia):Agent 因为上下文溢出被强制重启,新的上下文窗口从头开始——之前的分析、决策、进度全部丢失。它不得不在第 51 步重新搜索 axios 引用,重新分析错误处理模式,重新建立依赖关系图。
关键洞察:上下文窗口是 Agent 的「工作记忆」——有限、昂贵、需要主动管理。把它当成无限的消息队列是对 LLM 工作机制的根本性误解。换个更大的模型(从 128K 到 200K)只是把撞墙的时间推迟了几个小时,不能解决上下文衰减和注意力稀释的问题。解决上下文窗口问题的正确方法不是「买更大的窗口」,而是「在窗口内做更好的管理」。
三个管理维度:压缩、淘汰、委托
本文提出的上下文窗口管理系统围绕三个操作维度:
维度 1 — 压缩 (Compress):缩小上下文而不丢失关键状态
策略:对话压缩 / 结构化笔记 / 工具结果摘要 / 渐进压缩 / 子Agent委托
维度 2 — 淘汰 (Evict):移除不再有用的内容
策略:FIFO / LRU / 优先级 / 语义相似度合并 / 按类型 / 混合
维度 3 — 委托 (Delegate):把工作分发到独立的上下文中
策略:子Agent上下文隔离 → 浓缩结果返回 → 主Agent上下文保持清爽
这三个维度不对等——压缩保留信息(以浓缩形式),淘汰删除信息,委托移动信息到独立空间。一个成熟的上下文窗口管理系统在所有三个维度上都有策略,并在运行时根据上下文压力动态选择。
关于上下文窗口管理的上游和下游:Agent 记忆系统设计 定义了 L0-L3 四层记忆架构(L0 就是上下文窗口),本文是 L0 层的操作手册——管理 L0 的大小和内容。记忆系统设计是「仓库建筑师」,本文是「仓库管理员」——决定什么该留在货架上、什么该打包压缩、什么该搬走。
2. 感知上下文压力:什么时候该介入管理
上下文窗口管理的第一步不是「怎么管理」,而是「什么时候该管理」。过早介入浪费计算资源——把还不需要压缩的上下文提前压缩了;过晚介入则风险不可控——上下文已经接近溢出,任何一步额外的工具调用都可能触发崩溃。需要一个精确的压力感知系统。
上下文压力曲线:四个区间
把上下文利用率(当前 tokens / 最大窗口 tokens)映射为四个压力区间:
利用率范围 压力等级 动作
───────────────────────────────────────────────
0% – 50% 🟢 绿色 正常运行,无需干预
50% – 75% 🟡 黄色 预备——评估淘汰候选、预计算压缩成本
75% – 90% 🟠 橙色 行动——执行淘汰,准备压缩
90% – 100% 🔴 红色 紧急——强制压缩或触发跨窗口保存
这些阈值不是随意的。绿色区提供了足够的缓冲——Agent 可以安全地执行多步工具调用而不必担心溢出。黄色区是「感知但不行动」——系统开始收集淘汰候选的优先级分数,但不执行任何修改。橙色区是「这里必须做点什么」——淘汰引擎启动,低优先级内容被移除。红色区是「再不处理就崩了」——如果压缩也来不及,则触发跨窗口状态序列化,在当前窗口崩溃前把状态保存下来。
压力速度:不只是看绝对值
上下文利用率的绝对值("当前 65%")是静态快照。更重要的是压力速度——每轮对话消耗多少 token。一个快速膨胀的 Agent(每轮消耗 3000+ token,因为工具返回大量输出)和一个缓慢膨胀的 Agent(每轮 500 token,因为工具输出简洁)即使在同一利用率下,处理策略也完全不同。
压力速度还暴露了一个隐藏的危险信号:runaway agent loop。如果压力速度突然从 500 token/轮飙升到 5000 token/轮,通常是 Agent 陷入了某种循环——反复重试失败的请求、重复调用返回大量数据的工具、或 LLM 开始生成越来越长的冗餘推理。这些情况下,上下文管理应该与错误恢复联动(见 Agent 错误恢复)。
代码:ContextPressureMonitor
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Callable
from enum import Enum
class PressureLevel(Enum):
GREEN = "green" # 0%-50%: 正常
YELLOW = "yellow" # 50%-75%: 预备
ORANGE = "orange" # 75%-90%: 行动
RED = "red" # 90%-100%: 紧急
@dataclass
class ContextPressureMonitor:
"""上下文压力感知器——检测利用率、跟踪膨胀速度、发出预警"""
max_context_tokens: int = 128000 # 模型最大上下文窗口(token)
current_tokens: int = 0 # 当前上下文中的 token 数
# 压力区间阈值(可配置)
threshold_yellow: float = 0.50 # 50% → 黄色
threshold_orange: float = 0.75 # 75% → 橙色
threshold_red: float = 0.90 # 90% → 红色
# 速度跟踪(tokens/轮)
velocity_window_size: int = 5 # 取最近 N 轮的平均值
_token_history: list = field(default_factory=list)
_turn_count: int = 0
# 预警回调
on_yellow: Optional[Callable] = None # 进入黄色区时调用
on_orange: Optional[Callable] = None # 进入橙色区时调用
on_red: Optional[Callable] = None # 进入红色区时调用
on_velocity_spike: Optional[Callable] = None # 膨胀速度异常时调用
_current_level: PressureLevel = PressureLevel.GREEN
_previous_level: PressureLevel = PressureLevel.GREEN
def update(self, tokens_added_this_turn: int):
"""每轮推理结束后调用——更新当前 token 计数和压力评估"""
self.current_tokens += tokens_added_this_turn
self._turn_count += 1
self._token_history.append(tokens_added_this_turn)
# 只保留最近 N 轮的速度历史
if len(self._token_history) > self.velocity_window_size:
self._token_history.pop(0)
# 评估当前压力等级
utilization = self.utilization()
if utilization >= self.threshold_red:
self._current_level = PressureLevel.RED
elif utilization >= self.threshold_orange:
self._current_level = PressureLevel.ORANGE
elif utilization >= self.threshold_yellow:
self._current_level = PressureLevel.YELLOW
else:
self._current_level = PressureLevel.GREEN
# 检测等级变化并触发回调
if self._current_level != self._previous_level:
self._trigger_level_change()
# 检测膨胀速度异常
self._check_velocity_spike()
self._previous_level = self._current_level
def utilization(self) -> float:
"""当前上下文利用率(0.0 - 1.0)"""
if self.max_context_tokens <= 0:
return 0.0
return min(1.0, self.current_tokens / self.max_context_tokens)
def velocity(self) -> float:
"""最近 N 轮的平均 token 消耗速度(tokens/轮)"""
if not self._token_history:
return 0.0
return sum(self._token_history) / len(self._token_history)
def turns_until_overflow(self) -> Optional[int]:
"""预测距离溢出还有多少轮(-1 表示已溢出)"""
if self.current_tokens >= self.max_context_tokens:
return -1
vel = self.velocity()
if vel <= 0:
return None # 无法预测
remaining = self.max_context_tokens - self.current_tokens
return int(remaining / vel)
def tokens_remaining(self) -> int:
"""剩余可用 token 数"""
return max(0, self.max_context_tokens - self.current_tokens)
def pressure_summary(self) -> dict:
"""生成当前压力状态的摘要报告"""
return {
"utilization": round(self.utilization(), 3),
"current_tokens": self.current_tokens,
"tokens_remaining": self.tokens_remaining(),
"pressure_level": self._current_level.value,
"velocity": round(self.velocity(), 1),
"turns_until_overflow": self.turns_until_overflow(),
"turn_count": self._turn_count,
}
def _trigger_level_change(self):
"""压力等级发生变化——触发对应的回调"""
callbacks = {
PressureLevel.YELLOW: self.on_yellow,
PressureLevel.ORANGE: self.on_orange,
PressureLevel.RED: self.on_red,
}
cb = callbacks.get(self._current_level)
if cb:
cb(self.pressure_summary())
def _check_velocity_spike(self):
"""检测膨胀速度是否异常飙升(> 3x 历史平均)"""
if len(self._token_history) < self.velocity_window_size:
return
current = self._token_history[-1]
historical_avg = sum(self._token_history[:-1]) / max(len(self._token_history) - 1, 1)
if historical_avg > 0 and current > historical_avg * 3:
if self.on_velocity_spike:
self.on_velocity_spike({
"current_turn_tokens": current,
"historical_avg": round(historical_avg, 1),
"spike_ratio": round(current / historical_avg, 2),
})
# ─── 使用示例 ───
monitor = ContextPressureMonitor(
max_context_tokens=128000,
on_yellow=lambda s: print(f"[YELLOW] 利用率 {s['utilization']:.1%}, 预估 {s['turns_until_overflow']} 轮后溢出"),
on_orange=lambda s: print(f"[ORANGE] 触发淘汰,利用率 {s['utilization']:.1%}"),
on_red=lambda s: print(f"[RED] 紧急压缩,利用率 {s['utilization']:.1%}"),
on_velocity_spike=lambda s: print(f"[SPIKE] 膨胀速度异常: {s['spike_ratio']}x"),
)
# 模拟 20 轮 Agent 循环,每轮消耗 2000 token
for turn in range(20):
monitor.update(2000)
if turn % 5 == 0:
print(monitor.pressure_summary())
这个监控器的核心价值在于预测性——它不只告诉你「现在用了多少」,还告诉你「按当前速度,还有 N 轮就会溢出」。这给了上级系统充足的反应时间。当 velocity_spike 被触发时,它可以与 Agent 错误恢复 中的循环检测机制联动——识别 Agent 是否陷入了 runaway loop。
3. 淘汰策略:上下文装不下了,扔掉什么
当上下文压力进入橙色区(75%+),淘汰引擎启动。这是上下文管理中最需要判断力的环节——扔错了,Agent 后续推理失去关键依据;扔少了,剩余空间不够用。淘汰不是一个「找到就删」的简单操作,而是一个多维度的排序和选择问题。
上下文内容的分类
在讨论「淘汰什么」之前,先明确上下文中有哪些类型的内容:
| 内容类型 | 典型占比 | 淘汰风险 | 说明 |
|---|---|---|---|
| 系统提示词 | 5-10% | 🚫 绝不淘汰 | 定义了 Agent 的行为框架。部分淘汰会导致 Agent 行为异常 |
| 用户消息 | 5-15% | ⚠️ 极高风险 | 包含初始任务目标和最新指令。早期用户消息可以压缩但不应彻底删除 |
| Assistant 响应 | 20-40% | ⚠️ 中高风险 | 包含推理链和决策记录。旧的、已完成的推理可以压缩或淘汰 |
| 工具调用结果 | 30-50% | 🟢 主要淘汰目标 | 文件内容、API 响应、搜索结果——通常体量最大、时效性最强 |
| 思考块(Thinking) | 5-15% | 🟢 次要淘汰目标 | 扩展思考模式的中间推理——完成工具调用周期后可以安全清除 |
| 记忆注入(Memory) | 5-10% | ⚠️ 中低风险 | 从 L1/L2 拉取的相关记忆——有明确 TTL,过期后可安全移除 |
六种淘汰策略
策略 1 — FIFO(先进先出)
核心思想:最早进入上下文的消息最先被淘汰。经典滚动窗口。
优点:实现极简(O(1)),行为可预测。
缺点:无视内容重要性——Agent 运行 50 步后初始任务目标被淘汰。
适用:对话式聊天。不适用:需要始终牢记初始目标的 Agent 任务。
策略 2 — LRU(最近最少引用)
核心思想:跟踪哪些内容块在后续推理中被引用。未被重新引用的先淘汰。
优点:自然清除「死胡同探索」。
缺点:需要引用跟踪基础设施,冷启动问题。
适用:探索性 Agent 任务——代码搜索、数据分析。
策略 3 — 优先级淘汰
核心思想:给每个内容块打重要性分数,淘汰最低分的。评分维度包括来源权重(用户消息 > 工具结果)、新鲜度、语义相关性、显式标记。
优点:最灵活。
缺点:评分函数需要调优。
适用:异构内容混合的复杂 Agent 任务。
策略 4 — 语义相似度合并
核心思想:相似度超过 0.85 的两个块合并为一条摘要,token 减半。
优点:保留信息(而非删除),自然去重。
缺点:需要额外 LLM 调用生成合并摘要。
适用:监控/轮询类 Agent——反复获取相似信息。
策略 5 — 按类型淘汰
核心思想:按内容类型的预设规则淘汰——「工具结果超过 10 轮清除」「思考块每轮清除」。
优点:反映自然的内容生命周期。
缺点:规则静态,可能误删关键工具结果。
适用:工具密集型 Agent,配合显式保护标记使用。
策略 6 — 混合淘汰
核心思想:多维度加权评分——总分 = 0.3 × freshness + 0.3 × priority + 0.2 × type_ttl + 0.2 × references。
优点:最鲁棒。
缺点:权重调优复杂。
适用:生产级 Agent——追求稳定可靠。
绝对不能淘汰的内容
- 系统提示词——部分淘汰导致 Agent 行为不可预测
- 当前轮的用户消息——Agent 正在处理这条指令
- 正在执行的工具调用结果——Agent 需要根据结果决定下一步
- 显式保护标记——开发者在提示中标记的
<preserve>块
代码:EvictionEngine — 可插拔的淘汰策略
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
import math
class ContentType(Enum):
SYSTEM = "system"
USER_MESSAGE = "user_message"
ASSISTANT = "assistant"
TOOL_RESULT = "tool_result"
THINKING = "thinking"
MEMORY_INJECTION = "memory_injection"
@dataclass
class ContextBlock:
"""上下文中的一个内容块"""
block_id: str
content_type: ContentType
content: str
token_count: int
turn_added: int # 在哪一轮加入的
importance_score: float = 0.5
reference_count: int = 0 # 被后续推理引用的次数
embedding: Optional[list] = None
protected: bool = False # 是否在保护名单上
@dataclass
class EvictionEngine:
"""淘汰引擎——6 种可插拔策略"""
# 混合策略权重
weight_freshness: float = 0.3
weight_priority: float = 0.3
weight_type: float = 0.2
weight_reference: float = 0.2
# 语义相似度合并阈值
similarity_merge_threshold: float = 0.85
# 类型级 TTL(保留轮数)
type_ttl: dict = field(default_factory=lambda: {
ContentType.TOOL_RESULT: 10,
ContentType.THINKING: 3,
ContentType.MEMORY_INJECTION: 5,
})
# 保护名单
protected_types: set = field(default_factory=lambda: {
ContentType.SYSTEM,
})
def score_fifo(self, block: ContextBlock, current_turn: int) -> float:
"""FIFO——越早加入的分数越低"""
age = current_turn - block.turn_added
return 1.0 / (1.0 + age)
def score_lru(self, block: ContextBlock) -> float:
"""LRU——被引用越多次分数越高"""
if block.reference_count == 0:
return 0.1 # 冷启动保护
return min(1.0, block.reference_count / 5.0)
def score_priority(self, block: ContextBlock,
current_task_embedding=None) -> float:
"""优先级——来源权重 + 语义相关性"""
s = block.importance_score
source_bonus = {
ContentType.USER_MESSAGE: 0.3,
ContentType.SYSTEM: 1.0,
ContentType.ASSISTANT: 0.1,
ContentType.TOOL_RESULT: 0.0,
ContentType.THINKING: -0.1,
ContentType.MEMORY_INJECTION: 0.05,
}
s += source_bonus.get(block.content_type, 0.0)
if current_task_embedding and block.embedding:
similarity = self._cosine_similarity(
current_task_embedding, block.embedding)
s += similarity * 0.2
return max(0.0, min(1.0, s))
def score_type_based(self, block: ContextBlock,
current_turn: int) -> float:
"""按类型——超过 TTL 则分数归零"""
ttl = self.type_ttl.get(block.content_type)
if ttl is None:
return 0.5
age = current_turn - block.turn_added
if age > ttl:
return 0.0
return 1.0 - (age / ttl)
def score_hybrid(self, block: ContextBlock, current_turn: int,
current_task_embedding=None) -> float:
"""混合策略——加权组合所有维度"""
return (
self.weight_freshness * self.score_fifo(block, current_turn) +
self.weight_priority * self.score_priority(
block, current_task_embedding) +
self.weight_type * self.score_type_based(block, current_turn) +
self.weight_reference * self.score_lru(block)
)
def select_eviction_candidates(
self, blocks: list, current_turn: int,
target_tokens_to_free: int, policy: str = "hybrid",
current_task_embedding=None) -> list:
"""选择淘汰候选——返回应被淘汰的 block_id 列表"""
evictable = [b for b in blocks
if b.content_type not in self.protected_types
and not b.protected]
if not evictable:
return []
score_fn = {
"fifo": lambda b: self.score_fifo(b, current_turn),
"lru": lambda b: self.score_lru(b),
"priority": lambda b: self.score_priority(
b, current_task_embedding),
"type_based": lambda b: self.score_type_based(b, current_turn),
"hybrid": lambda b: self.score_hybrid(
b, current_turn, current_task_embedding),
}.get(policy, lambda b: self.score_hybrid(
b, current_turn, current_task_embedding))
evictable.sort(key=score_fn) # 低分优先淘汰
selected = []
freed = 0
for block in evictable:
if freed >= target_tokens_to_free:
break
selected.append(block.block_id)
freed += block.token_count
return selected
def _cosine_similarity(self, a: list, b: list) -> float:
dot = sum(x * y for x, y in zip(a, b))
na = math.sqrt(sum(x * x for x in a))
nb = math.sqrt(sum(y * y for y in b))
return dot / (na * nb) if na and nb else 0.0
# ─── 使用示例 ───
engine = EvictionEngine()
blocks = [
ContextBlock("sys1", ContentType.SYSTEM, "你是代码审查 Agent", 200, 0,
importance_score=1.0, protected=True),
ContextBlock("usr1", ContentType.USER_MESSAGE, "迁移 axios→fetch", 50, 0,
importance_score=0.9),
ContextBlock("tool1", ContentType.TOOL_RESULT, "grep: 45 个 axios 引用...", 1500, 1,
importance_score=0.5, reference_count=3),
ContextBlock("think1", ContentType.THINKING, "考虑拦截器...", 200, 4,
importance_score=0.2),
]
candidates = engine.select_eviction_candidates(
blocks, current_turn=10, target_tokens_to_free=2000, policy="hybrid")
print(f"淘汰候选: {candidates}")
# → ['think1', 'tool1'](分数最低的两个,sys1 受保护不受影响)
淘汰引擎的关键设计决策:贪心选择而非全局最优。每个内容块独立评分,从低到高累加直到满足目标释放量。这不是数学上的最优解(没有考虑淘汰 A 后 B 的语义相关性变化),但它是工程上的可行解——计算复杂度 O(n log n),在大规模上下文中可以实时执行。
关于内容类型与上下文信封的对应关系:Agent 上下文协议设计 定义了每种内容如何被打包成上下文信封(Envelope)。淘汰引擎操作在信封粒度——决定哪些信封保留、哪些从窗口中移除。
4. 压缩策略:不丢状态地缩小上下文
淘汰是「删除」——信息永久消失。压缩是「浓缩」——信息减少体积但保留关键内容。在上下文压力达到橙色区时,应该先尝试压缩再淘汰——因为压缩有概率保留信息,淘汰则确定丢失。压缩也不只是「把对话摘要一下」——针对不同类型的内容和不同的压缩需求,有五种策略。
策略 1 — 对话压缩(Conversation Compaction)
核心思想:将整个对话历史发给 LLM,要求其生成一份浓缩摘要。然后清空上下文窗口,以摘要 + 最近 N 条消息重新开始。
压缩提示词的核心原则:不是「总结这段对话」,而是「从这段对话中提取什么必须保留」。一个生产级的压缩提示词需要明确区分:
- 必须保留:架构决策(「我们决定用 Postgres 而非 MongoDB」)、未解决的问题(「为什么测试 3 失败——还没找到原因」)、当前进度(「已完成 7/12 步,第 8 步正在进行」)、学到的关键教训(「v3.0 API 端点名称全部从 /v1/ 改为 /v2/」)
- 可以丢弃:冗余的工具输出(同一个 grep 执行了 3 次,结果相同)、已解决的讨论(「第 3 步时讨论了用 Redis 还是 Memcached,最终选了 Redis……相关内容可丢弃」)、死胡同探索(「尝试了用 orm.query() 但行不通——放弃这个方向」)、冗长的日志输出
压缩提示词的设计陷阱:最常见的错误是使用通用摘要提示词(如「请总结以下对话」)。LLM 会把对话总结成自然语言叙述——「用户要求迁移 axios 到 fetch,Agent 搜索了代码库,发现了 45 个引用……」——这种摘要可读但不可操作。新窗口启动后,Agent 读了这个摘要仍然不知道下一步该干什么。正确的压缩提示词应该以结构化的方式要求输出:任务状态、架构决策、开放问题、下一步。
策略 2 — 结构化笔记(Structured Note-Taking)
核心思想:Agent 在任务执行过程中主动向外部文件(如 NOTES.md、progress.json)写入结构化笔记。压缩时,对话历史被丢弃,只保留笔记文件的内容注入新窗口。
这个模式源自生产级 Agent 的实践经验。Anthropic 的 Claude Code 使用 to-do 列表和架构决策记录;Claude Plays Pokémon 使用训练进度日志和已探索地图。笔记文件的本质是「冷启动就绪」——一个新 Agent 会话读取笔记后应该能立即知道状态并继续工作。
笔记写入的时机:
- 每个关键决策之后——「决定不修改 users 表 schema」
- 每个子任务完成后——「已完成 breaking changes 检查,发现 3 个不兼容 API」
- 发现重要教训时——「v3.0 移除了 /api/v1/users,改用 /api/v2/users」
- 压缩前——在触发压缩之前,先把所有未写入笔记的关键信息写入
笔记模式与 Agent 记忆系统设计 中 L1 → L2 的提升机制互补——笔记是 Agent 主动选择记录的内容,提升是系统自动评估并持久化的内容。两者叠加形成双重保障。
策略 3 — 工具结果摘要(Tool Result Summarization)
核心思想:对于体积大的工具输出(如 5000 token 的文件内容),不直接删除,而是先用 LLM 提取关键发现,将 5000 token 的输出压缩为 200 token 的摘要。原始输出被丢弃,但摘要保留了关键信息。
触发条件:工具输出 > 摘要阈值(如 500 token)且 Agent 已完成对该输出的推理。如果 Agent 还在引用这个工具输出做决策——不压缩,等推理完成后再压缩。
局限:摘要丢失细节。对于要求精度的任务(如代码审查中需要检查具体的行号),摘要可能不够——需要保留原始引用(如「原始文件路径 + 行号」),让 Agent 在需要细节时重新读取。
策略 4 — 渐进压缩(Progressive Summarization)
核心思想:随信息老化,逐渐提高压缩比。不需要一次性把整个对话摘要——而是分层处理:
L0 — 当前轮(0 轮前) → 完整原文
L1 — 最近 5 轮 → 关键要点摘要
L2 — 5-20 轮前 → 一句话摘要
L3 — 20+ 轮前 → 标题(如「axios 迁移:已确认 45 个文件需修改」)
渐进压缩的哲学:信息越老,精确细节的价值越低,但概括性的「这件事做完了,结果是 X」仍然有价值。不需要等到 20 轮时再把前 20 轮一起压缩——而是在每轮推进时把最老的信息从 L1 推到 L2、从 L2 推到 L3。
策略 5 — 子 Agent 委托(Sub-Agent Delegation)
核心思想:把一个需要大量上下文的子任务分发给一个独立的子 Agent(拥有干净的上下文窗口),子 Agent 完成任务后返回一份浓缩摘要(1K-2K token)给主 Agent。主 Agent 的上下文保持清爽,子 Agent 在隔离的环境中工作。
何时使用子 Agent 委托:
- 子任务需要大量探索(搜索文件、分析依赖、Web 研究)→ 会产生大量中间结果,全部塞进主 Agent 上下文会快速填满窗口
- 子任务的上下文主 Agent 后续不需要——如「检查所有 breaking changes」只需要知道「发现了 3 个 breaking changes:A、B、C」,不需要知道搜索过程的每一步
- 子任务是独立可并行的——可以同时启动多个子 Agent 处理不同的子任务
反模式:不压缩子 Agent 的输出就直接注入主 Agent 上下文。如果子 Agent 执行了 50 步搜索,把所有结果原样返回——主 Agent 的上下文窗口被污染,委托失去了意义。子 Agent 的价值在于浓缩——把 50 步的工作浓缩为一段关键结论。
子 Agent 委托和 多 Agent 编排 的差异:本文讨论的是「用子 Agent 解决上下文压力」这一单一动机;多 Agent 编排覆盖更广的动机——并行、专门化、投票、层级控制等。
压缩触发策略
压缩的触发不能只是「利用率到了 90%」——那个时间点太晚。有效的触发策略是主动 + 分级:
- 利用率达 75%(橙色):启动工具结果摘要——这是最低成本的压缩,对 Agent 行为影响最小。
- 利用率达 85%:执行渐进压缩——把 10+ 轮前的内容层推高。
- 利用率达 92%(红色):对话压缩——将整个对话浓缩为摘要并重启上下文。
- 利用率达 95% + 子任务可隔离:子 Agent 委托——把可独立的部分分发给子 Agent。
这是一个压缩级联——从低成本低影响的策略开始,逐步升级到高成本高影响的策略。永远不会在橙色区就执行对话压缩(成本过高),也不会在红色区才开始工具结果摘要(效果不足)。
代码:CompressionEngine
代码:CompressionEngine
from dataclasses import dataclass, field
from typing import Optional, Callable
class CompactionStrategy(Enum):
CONVERSATION_COMPACTION = "conversation_compaction"
NOTE_TAKING = "note_taking"
TOOL_RESULT_SUMMARIZATION = "tool_result_summarization"
PROGRESSIVE = "progressive"
SUB_AGENT = "sub_agent"
@dataclass
class CompressionEngine:
"""5 种可插拔压缩策略"""
llm_call: Callable = None
notes_file_path: str = "agent_notes.md"
summarization_threshold_tokens: int = 500
progressive_window_sizes: dict = field(default_factory=lambda: dict(
l0_full=1, l1_keypoints=5, l2_oneline=15, l3_title=50
))
cascade_thresholds: dict = field(default_factory=lambda: {
0.75: CompactionStrategy.TOOL_RESULT_SUMMARIZATION,
0.85: CompactionStrategy.PROGRESSIVE,
0.92: CompactionStrategy.CONVERSATION_COMPACTION,
0.95: CompactionStrategy.SUB_AGENT,
})
def compact_conversation(self, messages, current_task_goal):
"""对话压缩——生成结构化摘要"""
if not self.llm_call:
return ""
history_text = self._format_messages(messages)
prompt = (
"从对话历史提取必须保留的信息。任务目标:"
+ current_task_goal + "。"
"规则:架构决策、未解决问题、进度、教训。"
)
return self.llm_call(prompt)
def take_structured_note(self, note_type, content):
"""写入结构化笔记"""
from datetime import datetime
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(self.notes_file_path, "a", encoding="utf-8") as f:
f.write("## [" + ts + "] " + note_type + "\n")
f.write(content + "\n---\n")
def read_notes(self):
"""读取笔记用于新窗口启动"""
try:
with open(self.notes_file_path, "r", encoding="utf-8") as f:
return f.read()
except FileNotFoundError:
return ""
def summarize_tool_result(self, tool_name, raw_output):
"""工具结果摘要"""
if len(raw_output) < self.summarization_threshold_tokens:
return raw_output
if not self.llm_call:
return raw_output[:self.summarization_threshold_tokens] + "..."
prompt = "从 " + tool_name + " 输出提取关键发现"
summary = self.llm_call(prompt)
return "[摘要] " + summary + "\n[来源: " + tool_name + "]"
def progressive_compress(self, blocks, current_turn):
"""渐进压缩——按轮次年龄提升压缩层级"""
result = []
for block in blocks:
age = current_turn - block.turn_added
ws = self.progressive_window_sizes
if age <= ws["l0_full"]:
block.compression_level = "l0"
elif age <= ws["l1_keypoints"]:
block.content = block.content[:300]
block.compression_level = "l1"
elif age <= ws["l2_oneline"]:
fl = block.content.split("\n")[0][:100]
block.content = str(block.turn_added) + " " + fl + "..."
block.compression_level = "l2"
else:
block.content = "轮" + str(block.turn_added) + ": " + block.content_type.value
block.compression_level = "l3"
result.append(block)
return result
def delegate_to_sub_agent(self, task_spec, relevant_context):
"""子 Agent 委托"""
if not self.llm_call:
return "[子Agent不可用] 任务: " + task_spec
prompt = "子任务:" + task_spec + "。返回浓缩摘要。"
return self.llm_call(prompt)
def select_strategy(self, utilization):
"""根据利用率选择压缩策略"""
for t in sorted(self.cascade_thresholds.keys(), reverse=True):
if utilization >= t:
return self.cascade_thresholds[t]
return None
def _format_messages(self, messages):
lines = []
for msg in messages[-100:]:
r = getattr(msg, "role", "unknown")
c = getattr(msg, "content", "")[:500]
lines.append("[" + r + "] " + c)
return "\n".join(lines)
# --- 使用示例 ---
def mock_llm(prompt):
return "v3 API 端点变更,45个文件受影响"
engine = CompressionEngine(llm_call=mock_llm)
summary = engine.compact_conversation([], "迁移 axios 到 fetch")
engine.take_structured_note("ARCH_DECISION", "使用原生 fetch")
tool_out = "ERROR at line 142" * 10
comp = engine.summarize_tool_result("grep_axios", tool_out)
print("75% 利用率策略:", engine.select_strategy(0.75))
print("95% 利用率策略:", engine.select_strategy(0.95))
压缩引擎的 select_strategy 方法是级联决策的核心。ContextWindowManager 的上层只需调用它就能获取正确的下一步操作。压缩质量评估将在 第 7 章:上下文健康监控 中详细讨论。
5. Token 预算管理:追踪、分配与限额执行
淘汰和压缩解决了「上下文装不下了怎么办」。但一个更根本的问题是:「每个组件在上下文中该占多少空间?」如果没有预算管理,淘汰和压缩都是在救火——而预算管理是防火。
Token 预算的数据模型
上下文窗口是一个共享空间——系统提示词、工具定义、消息历史、工具调用结果、思考块、记忆注入都在争抢同一个窗口。每类组件的 token 消耗需要被独立追踪:
| 组件 | 建议占比 | 策略 | 超额处理 |
|---|---|---|---|
| 系统提示词 | 5-8% | 固定分配,永不淘汰 | 💀 不可超额——提示词必须精简 |
| 工具定义 | 3-5% | 固定分配,可延迟加载 | 使用工具搜索代替全量注入 |
| 消息历史 | 60-70% | 动态分配,主要淘汰目标 | 触发压缩/淘汰 |
| 工具调用结果 | 10-15% | 上限分配,超限摘要 | 摘要后淘汰旧的 |
| 记忆注入 | 5-10% | 上限分配,相关性过滤 | 降低相关性门槛或淘汰旧记忆 |
| 输出预留 | 4-8% | 始终保留 | 如果剩余不足,触发紧急压缩 |
软限额 vs 硬限额
预算管理不是「超了就报错」那么简单。软限额和硬限额的区分让系统在严格和灵活之间找到平衡:
- 软限额(Soft Limit):达到组件预算的 80% 时发出警告,触发该组件的预压缩/预淘汰。Agent 仍然可以继续——但系统知道「快满了,该准备了」。
- 硬限额(Hard Limit):达到组件预算的 100% 时,强制执行压缩或淘汰。如果该组件不可压缩(如系统提示词),则抛出
budget_exceeded异常——不是 API 拒绝,而是我们的预算管理器拒绝。
Token 消耗速度(Burn Rate)预测
预算管理器不只是记录「当前用了多少」,还预测「按当前速度,什么时候用完」。这个预测能力让系统可以在临界点之前主动采取行动。预测公式:
turns_remaining = (total_budget - used_tokens) / avg_tokens_per_turn
minutes_remaining = turns_remaining * avg_seconds_per_turn / 60
如果预测显示 3 轮后就会溢出而当前任务还需要 10 轮——必须立即启动压缩或委托,不能等到 3 轮后再救火。
Token 成本是与工具设计强相关的——工具返回的 token 越多,消息历史的预算消耗越快。有效的工具应该返回精简、高信号的结果。参见 Agent 工具设计 中关于 token 高效工具的设计原则。
代码:TokenBudgetManager
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
class BudgetComponent(Enum):
SYSTEM_PROMPT = "system_prompt"
TOOL_DEFINITIONS = "tool_definitions"
MESSAGE_HISTORY = "message_history"
TOOL_RESULTS = "tool_results"
MEMORY_INJECTION = "memory_injection"
OUTPUT_RESERVED = "output_reserved"
@dataclass
class ComponentBudget:
"""单个组件的 token 预算"""
used: int = 0
soft_limit_pct: float = 0.80 # 80% 软限额
hard_limit_tokens: int = 0 # 硬限额(绝对上限)
def soft_limit(self):
return int(self.hard_limit_tokens * self.soft_limit_pct)
def utilization(self):
if self.hard_limit_tokens == 0:
return 0.0
return self.used / self.hard_limit_tokens
def is_soft_exceeded(self):
return self.used >= self.soft_limit()
def is_hard_exceeded(self):
return self.used >= self.hard_limit_tokens
@dataclass
class TokenBudgetManager:
"""Token 预算管理器——追踪、分配、限额执行"""
max_context_tokens: int = 128000
# 每个组件在总预算中的占比
allocation_pct: dict = field(default_factory=lambda: {
BudgetComponent.SYSTEM_PROMPT: 0.06,
BudgetComponent.TOOL_DEFINITIONS: 0.04,
BudgetComponent.MESSAGE_HISTORY: 0.65,
BudgetComponent.TOOL_RESULTS: 0.12,
BudgetComponent.MEMORY_INJECTION: 0.08,
BudgetComponent.OUTPUT_RESERVED: 0.05,
})
# 各组件当前状态
budgets: dict = field(default_factory=dict)
# 速度追踪(tokens/轮)
burn_history: list = field(default_factory=list)
burn_window_size: int = 10
def __post_init__(self):
for comp, pct in self.allocation_pct.items():
self.budgets[comp] = ComponentBudget(
hard_limit_tokens=int(self.max_context_tokens * pct))
def track_usage(self, component, tokens_added):
"""追踪组件 token 消耗"""
budget = self.budgets.get(component)
if not budget:
return
budget.used += tokens_added
def check_before_add(self, component, tokens_to_add):
"""添加内容前检查——是否超限"""
budget = self.budgets.get(component)
if not budget:
return dict(allowed=True)
after = budget.used + tokens_to_add
if after >= budget.hard_limit_tokens:
return dict(
allowed=False,
reason="硬限额超出",
component=component.value,
current=budget.used,
limit=budget.hard_limit_tokens,
excess=after - budget.hard_limit_tokens,
)
if after >= budget.soft_limit():
return dict(
allowed=True,
warning="软限额超出——建议压缩",
component=component.value,
)
return dict(allowed=True)
def total_used(self):
return sum(b.used for b in self.budgets.values())
def total_utilization(self):
return self.total_used() / self.max_context_tokens
def burn_rate(self):
"""平均每轮 token 消耗速度"""
if not self.burn_history:
return 0.0
return sum(self.burn_history) / len(self.burn_history)
def turns_until_exhausted(self):
"""预测距离预算耗尽还有多少轮"""
rate = self.burn_rate()
if rate <= 0:
return None
remaining = self.max_context_tokens - self.total_used()
return int(remaining / rate)
def record_turn(self, tokens_this_turn):
"""每轮结束后记录消耗"""
self.burn_history.append(tokens_this_turn)
if len(self.burn_history) > self.burn_window_size:
self.burn_history.pop(0)
def budget_report(self):
"""生成预算报告"""
report = []
for comp, budget in self.budgets.items():
report.append(dict(
component=comp.value,
used=budget.used,
limit=budget.hard_limit_tokens,
utilization=round(budget.utilization(), 2),
exceeded=budget.is_hard_exceeded(),
))
return dict(
components=report,
total_used=self.total_used(),
total_utilization=round(self.total_utilization(), 2),
burn_rate=round(self.burn_rate()),
turns_remaining=self.turns_until_exhausted(),
)
# --- 使用示例 ---
budget_mgr = TokenBudgetManager(max_context_tokens=128000)
# 追踪各组件消耗
budget_mgr.track_usage(BudgetComponent.SYSTEM_PROMPT, 5000)
budget_mgr.track_usage(BudgetComponent.TOOL_DEFINITIONS, 3000)
budget_mgr.track_usage(BudgetComponent.MESSAGE_HISTORY, 45000)
# 添加前检查
check = budget_mgr.check_before_add(
BudgetComponent.MESSAGE_HISTORY, 20000)
print("添加检查:", check)
# 预算报告
report = budget_mgr.budget_report()
print("总利用率:", report["total_utilization"])
print("预估剩余轮数:", report["turns_remaining"])
TokenBudgetManager 的核心价值在于事前预防。在每次向上下文添加内容之前,check_before_add 评估「加进去会不会超限」。如果会——不是拒绝添加,而是触发压缩/淘汰/委托。这样上下文管理从「被动救火」变成了「主动防火」。
预算管理器与上下文协议的集成:Agent 上下文协议设计 中的信封路由系统根据组件类型将内容分配到正确的预算桶中。
6. 跨窗口状态管理:让 Agent 在上下文重置后继续工作
压缩和淘汰能延长上下文窗口的寿命,但有些任务实在太长——即使经过多轮压缩,压缩摘要 + 关键上下文 + 新输出仍然会填满一个窗口。此时必须跨越上下文窗口边界:在当前窗口关闭前保存状态,在下一个窗口启动时恢复状态。
跨窗口状态序列化合约
上下文窗口之间传递的不是「对话历史」,而是一份结构化状态对象。这份对象必须包含新窗口立即可用的所有信息:
- task_definition:原始任务目标、成功标准、硬约束——不可丢失
- progress_state:已完成子任务列表、当前子任务、剩余子任务——丢失意味着重复工作
- decisions_log:架构决策、做出的取舍、原因——防止新窗口重复争论
- open_issues:未解决的 bug、待回答问题、阻塞点
- environment_state:文件路径、服务状态——可通过引导序列重建
- learnings:发现的模式、有效策略、该避免的死胡同
启动引导序列
新上下文窗口启动时,Agent 执行标准化的引导序列来重建工作状态:
- 定位:执行
pwd确认工作目录 - 读取状态文件:加载上次窗口序列化的 JSON 状态
- 验证环境:运行
git status等基本检查确认环境一致 - 注入摘要:将上次窗口的压缩摘要注入新窗口的工作记忆
- 确认下一步:从进度状态提取当前子任务,开始执行
「干净状态」要求
每次上下文窗口关闭时,Agent 必须确保外部环境处于可接力的状态:没有未提交的代码变更、没有遗留的运行中服务、进度文件已更新到最新。
反模式:跨窗口状态的常见陷阱
孤儿状态:状态文件写入了「计划做」但 Agent 未执行就被中断——新窗口误以为已完成。修复:在状态中区分「planned」和「done」标记。
过期检查点:多个窗口并行操作同一状态文件,旧检查点覆盖了新状态。修复:使用单调递增的窗口编号和状态版本号。
隐式状态:Agent 依赖「我记得上次看到了 X」而未写入状态——跨窗口时「记忆」不存在。修复:所有关键发现必须显式序列化。
跨窗口状态管理和 Agent 错误恢复 互补:上下文溢出是一种特殊的错误场景,错误恢复检测到溢出时应触发跨窗口状态保存,在崩溃前持久化进度。
代码:CrossWindowStateManager
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Optional
import json, os
@dataclass
class AgentState:
"""跨窗口传递的 Agent 状态对象"""
window_id: int = 1
task_goal: str = ""
success_criteria: list = field(default_factory=list)
constraints: list = field(default_factory=list)
completed_subtasks: list = field(default_factory=list)
current_subtask: str = ""
remaining_subtasks: list = field(default_factory=list)
decisions: list = field(default_factory=list)
open_issues: list = field(default_factory=list)
learnings: list = field(default_factory=list)
compaction_summary: str = ""
saved_at: str = ""
@dataclass
class CrossWindowStateManager:
"""跨窗口状态管理器——序列化、恢复、验证"""
state_file_path: str = "agent_state.json"
progress_file_path: str = "agent_progress.md"
def save_state(self, state: AgentState):
"""保存状态到文件——在窗口关闭前调用"""
state.saved_at = datetime.now().isoformat()
# JSON 格式——机器可解析
with open(self.state_file_path, "w", encoding="utf-8") as f:
json.dump(asdict(state), f, ensure_ascii=False, indent=2)
# Markdown 格式——人类可读
self._write_progress_md(state)
def load_state(self) -> Optional[AgentState]:
"""从文件加载状态——在新窗口启动时调用"""
if not os.path.exists(self.state_file_path):
return None
with open(self.state_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
state = AgentState(**data)
# 验证状态完整性
validation = self.validate_state(state)
if validation["has_errors"]:
print("状态验证警告:", validation["errors"])
return state
def bootstrap_prompt(self, state: AgentState) -> str:
"""生成新窗口的启动引导提示词"""
lines = []
lines.append("[CONTEXT_WINDOW_RESUME] 从上一个上下文窗口恢复")
lines.append(f"任务目标: {state.task_goal}")
lines.append(f"已完成: {len(state.completed_subtasks)} 个子任务")
for task in state.completed_subtasks[-5:]:
lines.append(f" [done] {task}")
if state.current_subtask:
lines.append(f"当前: {state.current_subtask}")
if state.remaining_subtasks:
lines.append(f"剩余: {len(state.remaining_subtasks)} 个子任务")
for task in state.remaining_subtasks[:5]:
lines.append(f" [todo] {task}")
if state.open_issues:
lines.append("未解决问题:")
for issue in state.open_issues:
lines.append(f" [!] {issue}")
if state.decisions:
lines.append("关键决策:")
for d in state.decisions[-5:]:
lines.append(f" [dec] {d}")
if state.compaction_summary:
lines.append(f"上一窗口摘要: {state.compaction_summary}")
return "\n".join(lines)
def validate_state(self, state: AgentState) -> dict:
"""验证状态对象的完整性"""
errors = []
if not state.task_goal:
errors.append("缺少任务目标")
if not state.current_subtask and state.remaining_subtasks:
errors.append("有剩余子任务但未指定当前子任务")
if state.window_id <= 0:
errors.append("无效的窗口 ID")
return dict(
has_errors=len(errors) > 0,
errors=errors,
)
def _write_progress_md(self, state: AgentState):
"""写入人类可读的 Markdown 进度文件"""
with open(self.progress_file_path, "w", encoding="utf-8") as f:
f.write(f"# Agent Progress - Window {state.window_id}\n\n")
f.write(f"**Goal:** {state.task_goal}\n\n")
f.write(f"**Saved:** {state.saved_at}\n\n")
f.write("## Completed\n")
for task in state.completed_subtasks:
f.write(f"- [x] {task}\n")
f.write(f"\n## Current: {state.current_subtask}\n\n")
f.write("## Remaining\n")
for task in state.remaining_subtasks:
f.write(f"- [ ] {task}\n")
if state.open_issues:
f.write("\n## Open Issues\n")
for issue in state.open_issues:
f.write(f"- [!] {issue}\n")
# --- 使用示例 ---
manager = CrossWindowStateManager()
# 窗口 1 结束时保存状态
state = AgentState(
window_id=1,
task_goal="将 user-service 从 v2.1 升级到 v3.0",
completed_subtasks=["检查 breaking changes", "staging 部署成功"],
current_subtask="运行集成测试",
remaining_subtasks=["金丝雀发布", "全量切换"],
decisions=["使用滚动更新保证零停机"],
open_issues=[],
compaction_summary="v3.0 移除了 /api/v1/users 端点",
)
manager.save_state(state)
# 窗口 2 启动时恢复
loaded = manager.load_state()
if loaded:
bootstrap = manager.bootstrap_prompt(loaded)
print(bootstrap[:200])
跨窗口状态管理是上下文窗口管理系统的最后一道防线——当压缩和淘汰都无法避免窗口重置时,它确保 Agent 不是从零开始,而是从上一个窗口的精确检查点继续。这是长任务 Agent 能够在生产环境中运行数小时的关键。
7. 上下文健康监控:指标、告警与压缩质量评估
你实现了压缩、淘汰、预算管理——但它们真的在工作吗?压缩有没有丢失关键信息?淘汰是不是太激进了?上下文衰减有没有在暗中恶化?没有监控的上下文管理是盲飞——你需要定量指标来验证每个子系统的效果。
核心健康指标
| 指标 | 含义 | 健康范围 | 告警阈值 |
|---|---|---|---|
| 利用率(%) | 当前 tokens / 最大窗口 tokens | < 60% | > 90% 持续 3 轮 → CRITICAL |
| 淘汰频率 | 每轮淘汰的事件数 | < 2/轮 | > 5/轮 → WARNING(窗口可能太小) |
| 压缩比 | 原始 tokens / 压缩后 tokens | 3x–10x | < 2x → 压缩效果差,需优化提示词 |
| Token 消耗速度 | tokens/轮 | 稳定或线性增长 | 突然 3x+ → 可能 runaway loop |
| 压缩保真度 | 压缩后关键信息保留率(0-1) | > 0.85 | < 0.75 → 压缩丢了重要信息 |
| 工具结果膨胀比 | 工具结果 tokens / 总上下文 tokens | 10–15% | > 30% → 工具返回过多数据 |
压缩保真度评估
压缩质量评估是上下文监控中最难也最关键的环节。你怎么知道压缩后的摘要是否保留了关键信息?答案是LLM-as-Judge——用另一个 LLM 调用来评估压缩质量。
评估方法:
- 准备测试问题集:针对原始对话内容,准备 N 个问题(如「任务当前在第几步?」「用户在第 3 轮提了什么关键要求?」)
- 对压缩前和压缩后的上下文分别提问:记录两个答案
- LLM-as-Judge 评分:将两个答案和正确答案发送给 Judge LLM,让它评分答案是否一致
- 保真度 = 一致答案数 / 总问题数:目标 > 0.85
这个评估流程可以自动化——在 CI 中作为压缩提示词的回归测试,每次修改压缩提示词后自动运行保真度评估。
保真度评估与 Agent 评估框架 紧密相关——评估框架提供了 LLM-as-Judge 评分的基础设施,保真度评估是其在压缩领域的特化应用。
告警规则
告警不应该「响了再说」——上下文管理的告警应该是分级的:
- CRITICAL:利用率 > 90% 持续 3 轮 → 上下文即将溢出,可能触发跨窗口保存
- WARNING(高频淘汰):淘汰频率 > 5/轮 → 窗口可能太小或淘汰策略过于激进
- WARNING(低保真度):压缩保真度 < 0.75 → 压缩提示词可能丢失关键信息
- WARNING(速度异常):Token 消耗速度超过历史均值的 2x → 可能 runaway loop
这些告警指标应该作为 Prometheus 指标发射,与 Agent 可观察性 的基础设施集成——通过 Grafana 仪表盘可视化、AlertManager 触发通知。
代码:ContextHealthMonitor
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Callable
from enum import Enum
class AlertLevel(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class HealthMetrics:
"""上下文健康指标快照"""
utilization: float = 0.0
eviction_frequency: float = 0.0 # 每轮淘汰次数
compression_ratio: float = 1.0 # 原始/压缩后(> 1 表示压缩有效)
burn_rate: float = 0.0 # tokens/轮
fidelity_score: float = 1.0 # 压缩保真度(0-1)
tool_result_bloat: float = 0.0 # 工具结果占比
timestamp: str = ""
@dataclass
class ContextHealthMonitor:
"""上下文健康监控——指标收集、保真度评估、告警"""
# 指标历史(用于趋势分析)
metrics_history: list = field(default_factory=list)
max_history_size: int = 100
# 告警回调
on_alert: Optional[Callable] = None
# 保真度评估
fidelity_test_questions: list = field(default_factory=list)
def collect_metrics(self, pressure_monitor, eviction_engine,
budget_manager, compression_engine) -> HealthMetrics:
"""从各子系统收集指标并计算健康状态"""
m = HealthMetrics()
m.timestamp = datetime.now().isoformat()
m.utilization = pressure_monitor.utilization()
m.burn_rate = budget_manager.burn_rate()
# 工具结果膨胀比
tr_budget = budget_manager.budgets.get("tool_results")
if tr_budget:
total = budget_manager.total_used()
m.tool_result_bloat = tr_budget.used / max(total, 1)
self.metrics_history.append(m)
if len(self.metrics_history) > self.max_history_size:
self.metrics_history.pop(0)
return m
def evaluate_fidelity(self, original_context: str,
compressed_context: str,
llm_judge: Callable) -> float:
"""评估压缩保真度——LLM-as-Judge 方法"""
if not self.fidelity_test_questions:
return 1.0 # 没有测试问题,跳过评估
consistent_count = 0
for question in self.fidelity_test_questions:
# 对原始上下文提问
answer_orig = llm_judge(
f"上下文:{original_context[:5000]}\n"
f"问题:{question}\n请用一句话回答。")
# 对压缩上下文提问
answer_comp = llm_judge(
f"上下文:{compressed_context[:5000]}\n"
f"问题:{question}\n请用一句话回答。")
# Judge 判断是否一致
judge_result = llm_judge(
f"判断以下两个答案是否传达相同的关键信息:\n"
f"A: {answer_orig}\nB: {answer_comp}\n"
f"只回答 yes 或 no。")
if "yes" in judge_result.lower():
consistent_count += 1
fidelity = consistent_count / len(self.fidelity_test_questions)
return fidelity
def check_alerts(self, metrics: HealthMetrics):
"""根据指标检查告警条件"""
alerts = []
if metrics.utilization > 0.90:
alerts.append(dict(
level=AlertLevel.CRITICAL,
message=f"上下文利用率 {metrics.utilization:.1%} 超过 90%",
metric="utilization",
))
if metrics.eviction_frequency > 5:
alerts.append(dict(
level=AlertLevel.WARNING,
message=f"淘汰频率过高: {metrics.eviction_frequency}/轮",
metric="eviction_frequency",
))
if metrics.fidelity_score < 0.75:
alerts.append(dict(
level=AlertLevel.WARNING,
message=f"压缩保真度过低: {metrics.fidelity_score:.2f}",
metric="fidelity_score",
))
if metrics.compression_ratio < 2.0 and metrics.compression_ratio > 1.0:
alerts.append(dict(
level=AlertLevel.WARNING,
message=f"压缩比偏低: {metrics.compression_ratio:.1f}x",
metric="compression_ratio",
))
for alert in alerts:
if self.on_alert:
self.on_alert(alert)
return alerts
def trend_report(self) -> dict:
"""生成趋势报告——对比最近 N 个指标快照"""
if len(self.metrics_history) < 2:
return dict(status="insufficient_data")
recent = self.metrics_history[-10:]
first = recent[0]
last = recent[-1]
return dict(
utilization_change=last.utilization - first.utilization,
burn_rate_change=last.burn_rate - first.burn_rate,
avg_fidelity=sum(
m.fidelity_score for m in recent) / len(recent),
data_points=len(recent),
)
# --- 使用示例 ---
monitor = ContextHealthMonitor(
fidelity_test_questions=[
"当前任务的目标是什么?",
"已经完成了哪些步骤?",
"有什么未解决的问题?",
]
)
def judge_fn(prompt):
return "yes, the answers convey the same information"
# 评估压缩保真度
fidelity = monitor.evaluate_fidelity(
"我们决定迁移 axios 到 fetch。已完成 7/12 任务...",
"迁移 axios 到 fetch,完成 7/12 任务",
llm_judge=judge_fn,
)
print(f"压缩保真度: {fidelity:.2f}")
上下文健康监控将「上下文管理是否有效」从一个定性感受变成了一个定量事实。每个压缩操作、每次淘汰决策都产生可测量的影响。这套指标不仅用于实时告警,也为后续的权重调优和策略选择提供了数据基础。
8. 整合:完整的 ContextWindowManager 架构
前面七章分别实现了压力感知、淘汰引擎、压缩引擎、预算管理、跨窗口状态和健康监控。这些子系统不是独立运行的——它们需要在一个统一的编排器中协同工作。本章给出完整的 ContextWindowManager 整合架构。
架构全景图
Agent Loop → ContextWindowManager(统一编排器)
│
┌──────────────┼──────────────┬──────────────┬──────────────┐
▼ ▼ ▼ ▼ ▼
PressureMonitor EvictionEngine Compression TokenBudget CrossWindow
(何时介入) (扔掉什么) Engine Manager StateManager
│ (缩小上下文) (限额执行) (持久化状态)
┌────┴────┐
▼ ▼
6种淘汰策略 5种压缩策略
(FIFO/LRU/ (Compaction/
Priority/ Notes/Summary/
Semantic/ Progressive/
Type/Hybrid) SubAgent)
┌──────────────────────────────────────────────────┐
▼ ▼
ContextHealthMonitor 可观察性管道
(指标/保真度/告警) (Prometheus + Grafana)
生命周期:一个 Agent 任务如何流经 ContextWindowManager
从任务启动到完成,上下文窗口管理的每一步都有明确的子系统负责:
- 任务启动:TokenBudgetManager 根据配置分配预算 → CrossWindowStateManager 检查是否有上一个窗口的残留状态(如果是恢复任务,加载状态并执行引导序列)→ ContextPressureMonitor 开始追踪利用率
- 每次 LLM 调用前:ContextPressureMonitor 检查利用率 → 如果进入黄色/橙色区,EvictionEngine 选择淘汰候选并执行 → 如果压力持续,CompressionEngine 执行级联压缩 → TokenBudgetManager 验证下一轮是否有足够预算
- 工具调用后:TokenBudgetManager 追踪工具结果的 token 消耗 → 如果工具结果膨胀比超标,CompressionEngine.summarize_tool_result 进行摘要 → EvictionEngine 按类型 TTL 淘汰旧工具结果
- 上下文溢出(红色区):如果利用率达到 95%+ 且压缩不足以缓解 → CrossWindowStateManager 序列化当前状态 → 上下文窗口重置 → 引导序列启动 → Agent 从检查点恢复
- 任务完成:CompressionEngine 生成最终压缩摘要 → CrossWindowStateManager 归档状态 → ContextHealthMonitor 发射最终指标快照
代码:ContextWindowManager(完整整合)
from dataclasses import dataclass, field
from typing import Optional, Callable
from enum import Enum
class WindowStatus(Enum):
HEALTHY = "healthy" # 绿色区:正常
PREPARING = "preparing" # 黄色区:准备淘汰/压缩
ACTIVE_MANAGEMENT = "active_management" # 橙色区:主动淘汰+压缩
EMERGENCY = "emergency" # 红色区:紧急跨窗口保存
@dataclass
class ContextWindowManager:
"""上下文窗口管理器——整合所有子系统"""
# 六大子系统
pressure_monitor: object = None # ContextPressureMonitor
eviction_engine: object = None # EvictionEngine
compression_engine: object = None # CompressionEngine
budget_manager: object = None # TokenBudgetManager
state_manager: object = None # CrossWindowStateManager
health_monitor: object = None # ContextHealthMonitor
# 配置
eviction_policy: str = "hybrid" # 淘汰策略选择
max_context_tokens: int = 128000
task_goal: str = ""
# 运行时状态
status: WindowStatus = WindowStatus.HEALTHY
turn_count: int = 0
windows_created: int = 1
context_blocks: list = field(default_factory=list)
events_log: list = field(default_factory=list)
def on_turn_start(self) -> dict:
"""每轮推理开始前调用——返回上下文管理决策"""
self.turn_count += 1
utilization = self.pressure_monitor.utilization()
budget_ok = self.budget_manager.check_before_add(
"message_history", 4000) # 预估输出量
# 判断当前状态
if utilization >= 0.95:
self.status = WindowStatus.EMERGENCY
return self._handle_emergency()
elif utilization >= 0.75:
self.status = WindowStatus.ACTIVE_MANAGEMENT
return self._handle_active_management(utilization)
elif utilization >= 0.50:
self.status = WindowStatus.PREPARING
return self._handle_preparing()
else:
self.status = WindowStatus.HEALTHY
return dict(action="none", utilization=utilization)
def on_turn_end(self, tokens_added: int):
"""每轮推理结束后调用——更新追踪"""
self.pressure_monitor.update(tokens_added)
self.budget_manager.track_usage("message_history", tokens_added)
self.budget_manager.record_turn(tokens_added)
def on_tool_result(self, tool_name: str, result: str,
token_count: int):
"""工具调用完成后——追踪、摘要、淘汰"""
# 追踪预算
self.budget_manager.track_usage("tool_results", token_count)
# 工具结果摘要
if token_count > self.compression_engine.summarization_threshold_tokens:
result = self.compression_engine.summarize_tool_result(
tool_name, result)
# 添加到上下文块列表(用于淘汰决策)
from eviction_engine import ContextBlock, ContentType
block = ContextBlock(
block_id=f"tool_{self.turn_count}",
content_type=ContentType.TOOL_RESULT,
content=result[:500],
token_count=token_count,
turn_added=self.turn_count,
importance_score=0.5,
)
self.context_blocks.append(block)
# 更新压力监控
self.pressure_monitor.update(token_count)
def on_context_overflow(self):
"""上下文即将溢出——触发跨窗口保存"""
self._log_event("overflow", dict(turn=self.turn_count))
# 序列化当前状态
state = AgentState(
window_id=self.windows_created,
task_goal=self.task_goal,
compaction_summary=self.compression_engine.compact_conversation(
self.context_blocks, self.task_goal),
)
self.state_manager.save_state(state)
# 收集健康指标
metrics = self.health_monitor.collect_metrics(
self.pressure_monitor, self.eviction_engine,
self.budget_manager, self.compression_engine)
self.health_monitor.check_alerts(metrics)
return dict(
action="cross_window_save",
new_window_id=self.windows_created + 1,
metrics=metrics,
)
def _handle_preparing(self) -> dict:
"""黄色区——评估但不行动"""
candidates = self.eviction_engine.select_eviction_candidates(
self.context_blocks, self.turn_count,
target_tokens_to_free=0, # 只评估,不淘汰
policy=self.eviction_policy)
return dict(
action="evaluate_candidates",
candidate_count=len(candidates),
recommendation="准备淘汰——如果利用率继续上升",
)
def _handle_active_management(self, utilization: float) -> dict:
"""橙色区——主动淘汰 + 按需压缩"""
# 计算需要释放多少 token
target = self.max_context_tokens - (
self.max_context_tokens * 0.6)
target_to_free = self.pressure_monitor.current_tokens - target
# 1. 先尝试淘汰
evicted = self.eviction_engine.select_eviction_candidates(
self.context_blocks, self.turn_count,
target_tokens_to_free=target_to_free,
policy=self.eviction_policy)
# 2. 如果淘汰不够,启动压缩
strategy = self.compression_engine.select_strategy(utilization)
compressed = None
if strategy:
compressed = self.compression_engine.progressive_compress(
self.context_blocks, self.turn_count)
self._log_event("compression", dict(strategy=strategy.value))
return dict(
action="active_management",
evicted_count=len(evicted),
evicted_ids=evicted,
compression_strategy=strategy.value if strategy else None,
tokens_freed_estimate=sum(
b.token_count for b in self.context_blocks
if b.block_id in evicted) if evicted else 0,
)
def _handle_emergency(self) -> dict:
"""红色区——强制压缩或跨窗口保存"""
# 尝试最后一次对话压缩
summary = self.compression_engine.compact_conversation(
self.context_blocks, self.task_goal)
self._log_event("emergency_compaction", dict())
# 评估是否需要跨窗口保存
if self.pressure_monitor.turns_until_overflow() < 2:
return self.on_context_overflow()
return dict(
action="emergency_progressive_compression",
summary_length=len(summary),
)
def _log_event(self, event_type: str, metadata: dict):
self.events_log.append(dict(
turn=self.turn_count,
type=event_type,
timestamp=datetime.now().isoformat(),
metadata=metadata,
))
def get_audit_trail(self) -> list:
"""获取上下文管理的完整审计追踪"""
return self.events_log
# ─── 集成配置(YAML) ───
# context_window_config.yaml
"""
max_context_tokens: 128000
pressure_thresholds:
yellow: 0.50
orange: 0.75
red: 0.90
eviction:
policy: hybrid
weights:
freshness: 0.3
priority: 0.3
type_ttl: 0.2
reference: 0.2
type_ttl:
tool_result: 10
thinking: 3
memory_injection: 5
protected_types:
- system
- user_message_current_turn
compression:
summarization_threshold_tokens: 500
cascade:
0.75: tool_result_summarization
0.85: progressive
0.92: conversation_compaction
0.95: sub_agent
progressive:
l0_full: 1
l1_keypoints: 5
l2_oneline: 15
l3_title: 50
notes_file: agent_notes.md
budget:
allocations:
system_prompt: 0.06
tool_definitions: 0.04
message_history: 0.65
tool_results: 0.12
memory_injection: 0.08
output_reserved: 0.05
soft_limit_pct: 0.80
cross_window:
state_file: agent_state.json
progress_file: agent_progress.md
health_monitoring:
metrics:
- utilization
- eviction_frequency
- compression_ratio
- burn_rate
- fidelity_score
alerts:
utilization_critical: 0.90
eviction_warning: 5
fidelity_warning: 0.75
burn_rate_spike_ratio: 2.0
"""
ContextWindowManager 是整套上下文窗口管理系统的单一入口。Agent 开发者不需要分别调用六个子系统——只需在 Agent 主循环的关键钩子点(on_turn_start、on_turn_end、on_tool_result、on_context_overflow)调用 ContextWindowManager 的对应方法。内部的压力感知、淘汰选择、压缩级联、预算验证、状态序列化全部透明。
这份 YAML 配配置将所有可调参数集中在一处——淘汰策略和权重、压缩触发阈值、预算分配比例、跨窗口文件路径、健康监控告警阈值。针对不同任务类型(代码审查、数据分析、客户服务),只需切换配置文件即可获得不同的上下文管理行为。
上下文管理的所有事件(淘汰、压缩、预算超限、跨窗口保存)都应作为审计事件被记录。参见 Agent 审计日志设计——每条上下文管理事件都是一条不可篡改的审计记录,形成完整的证据链。
常见问题(FAQ)
上下文窗口管理和 agent-memory-design 那篇文章的关系是什么?
agent-memory-design 是「仓库建筑师」——定义 L0-L3 四层记忆架构、每层存什么、检索边界如何设计。本文是「仓库管理员」——管理 L0(上下文窗口)的大小和内容。memory-design 说 L0 应该存什么(task_goal、active_plan、observations),本文说 L0 装不下了怎么办(压缩、淘汰、预算)。两篇文章互补:先读 memory-design 理解全貌,再读本文获取 L0 层的操作细节。
什么时候该压缩,什么时候该淘汰,什么时候该委托给子 Agent?
这三个操作不是互斥的——它们构成一个级联:淘汰优先(成本最低,删除冗余信息)→ 压缩跟进(当淘汰不够时,浓缩信息而非删除)→ 委托兜底(当子任务需要大量独立上下文时,分发给子 Agent)。具体触发点:利用率 75% 开始淘汰 + 工具结果摘要,85% 渐进压缩,92% 对话压缩,95%+ 且子任务可隔离时触发子 Agent 委托。这个级联由 CompressionEngine.select_strategy() 自动决策。
怎么衡量压缩有没有丢失关键信息?
使用 LLM-as-Judge 保真度评估:准备 N 个关于原始上下文的测试问题 → 分别用原始上下文和压缩上下文回答 → 用 Judge LLM 判断答案是否一致 → 保真度 = 一致数 / N。目标 > 0.85。这个评估可以自动化跑在 CI 中,作为压缩提示词的回归测试。ContextHealthMonitor.evaluate_fidelity() 提供了完整实现。
FIFO、LRU、优先级淘汰,到底该用哪个?
按任务类型选择:对话式聊天用 FIFO(简单可预测)→ 探索性任务(代码搜索、研究调研)用 LRU(自动清除死胡同)→ 异构内容混合的生产 Agent 用混合策略(优先级 + 类型 TTL + 引用频率)。一个实用的经验法则:先用混合策略的默认权重运行,观察 ContextHealthMonitor 的指标(淘汰频率、压缩保真度),根据数据调整权重——没有「万能的最佳策略」,只有「针对你的任务调优后的最佳策略」。
跨窗口状态管理会不会导致状态不一致?怎么验证?
有这种风险,但可以通过以下措施防范:(1)状态文件使用递增的窗口编号和版本号——新窗口启动时只读取版本号 >= 当前版本的状态,防止过期检查点覆盖新状态;(2)在状态中明确区分「planned」(计划做但未执行)和「done」(已完成),防止孤儿状态被误认为已完成;(3)CrossWindowStateManager.validate_state() 在加载状态时进行完整性检查——缺少任务目标、有剩余子任务但无当前子任务等情况会触发验证警告;(4)引导序列包含环境验证步骤(git status、服务健康检查),确保状态和实际环境一致。
token 预算设多大合适?怎么根据任务类型调整?
没有绝对的「合适」——预算取决于任务类型和模型窗口大小。一个基于 128K 窗口的实用起点:系统提示词 6%(~7.5K)→ 工具定义 4%(~5K)→ 消息历史 65%(~83K)→ 工具结果 12%(~15K)→ 记忆注入 8%(~10K)→ 输出预留 5%(~6K)。按任务类型调整:工具密集型任务(代码审查、运维自动化)→ 增加工具结果预算(15-20%)、减少消息历史;对话密集型任务(客服、咨询)→ 增加消息历史预算(70-75%)、减少工具结果。核心原则:输出预留永远不能低于 4K token——否则 LLM 可能连一个完整的响应都生成不了。
继续阅读 / 下一步阅读
本文是 Agent Memory and Context Engineering 系列中 L0(上下文窗口)的操作手册。建议按以下路径继续:
- Agent 记忆系统设计 — L0-L3 四层记忆架构的完整设计,本文的架构基础(先读)
- Agent 上下文协议设计 — 上下文数据的流动管道和信封协议,与本文的淘汰/预算系统互补
- Agent 可观察性 — 将本文的上下文健康指标接入 Prometheus/Grafana,构建实时监控面板
- Agent 工具设计 — 从源头减少上下文压力:设计 token 高效的工具,降低工具结果膨胀比
- 多 Agent 编排 — 子 Agent 上下文隔离的完整编排模式,本文的委托策略在此展开
- Agent 错误恢复 — 上下文溢出作为特殊错误场景的恢复策略,与本文的跨窗口状态管理联动
- Agent 审计日志设计 — 上下文管理事件的审计追踪,确保淘汰/压缩/预算决策可追溯
- Agent 评估框架 — 压缩保真度评估的完整方法论,LLM-as-Judge 评分基础设施