Agent 上下文窗口管理:如何压缩、保留和淘汰任务信息

30秒要点

  • 核心问题:Agent 在生产环境中运行超过 50 步工具调用或数小时后,上下文窗口必然溢出。即使模型支持 128K token,上下文衰减(context rot)也会让 Agent 在长任务中越跑越差——这不是换个更大的模型能解决的。
  • 解决方案:一套完整的上下文窗口生命周期管理系统——感知压力(何时介入)→ 淘汰(扔掉什么)→ 压缩(不丢状态地缩小)→ 跨窗口状态保持(重置后继续工作)。每一步都有代码实现和决策框架。
  • 关键实现:ContextWindowManager 编排器整合 6 个子系统——ContextPressureMonitor、EvictionEngine(6 种淘汰策略)、CompressionEngine(5 种压缩策略)、TokenBudgetManager、CrossWindowStateManager、ContextHealthMonitor。含 8 段完整 Python 代码。
  • 读完能做什么:为你的 Agent 实现一套生产级的上下文窗口管理——在上下文耗尽前主动压缩,在冗余产生时精准淘汰,在窗口重置后无缝恢复。告别「Agent 跑着跑着就崩了」的生产事故。

1. 为什么上下文窗口管理是第一优先级工程问题

一个代码审查 Agent 在处理一个大型重构任务。用户要求:"把 user-service 里所有 REST API 调用从 axios 迁移到 fetch,同时保持错误处理逻辑不变。"Agent 开始工作——第 1 步搜索所有 axios 引用,第 5 步分析错误处理模式,第 15 步开始逐文件替换,第 30 步运行测试发现 break。到第 40 步时,Agent 的行为开始变得奇怪——它反复修改同一个文件而不推进任务,忘记了自己在第 15 步已经分析过的依赖关系,甚至在 50 步后直接抛出 model_context_window_exceeded 错误。

这不是 Agent 能力的问题。这是上下文窗口被当作无限资源来使用的结果。第 1 步的 axios 搜索结果、第 5 步的错误处理模式分析、第 15 步的逐文件依赖关系——所有内容都被忠实地追加到上下文窗口中。128K 的窗口看似巨大,但经过 40 轮的累积,系统提示词、工具定义、对话历史、工具调用结果已经把窗口塞满。Agent 在第 41 步开始丢失关键上下文,在第 50 步撞墙。

五种「不管上下文窗口」的失败模式

在生产环境中,简单地"把所有历史消息塞进上下文"会触发五类问题:

  1. 上下文衰减(Context Rot):LLM 的注意力机制中,每对 token 之间都存在注意力关系(n² 复杂度)。当上下文扩展到数万 token 时,模型对每条信息的平均注意力被稀释。Chroma 的研究证实:即使模型声称支持 128K token,在 64K+ 的上下文长度下,GPT-4 开始产生幻觉性推断,Claude 倾向于过度谨慎地拒绝回答。这不是模型 bug,是注意力预算耗尽的物理规律。
  2. Token 成本线性膨胀:每轮对话都把历史完整发送给 LLM——第 1 轮成本 $0.01,第 50 轮成本 $0.80。一个 50 步的 Agent 任务的 token 成本是指数级增长的,其中 80% 的 token 都花在了已经不需要看的内容上。
  3. 溢出崩溃(Overflow Crash):最直接的失败模式。Agent 在第 N 步时上下文窗口被完全填满,LLM API 返回错误。任务中断,已执行的前 N-1 步工作全部丢失——因为没有跨窗口的状态保持机制。
  4. 僵尸信息污染(Zombie Information):旧的工具调用结果、已解决的讨论、被放弃的探索路径——这些内容永远不会被自动清理,一直占据上下文窗口。更糟的是,LLM 可能被这些过时信息误导——"你之前在第 3 步已经决定用 axios 了呀"(这是 30 步前被推翻的决定)。
  5. 会话失忆(Session Amnesia):Agent 因为上下文溢出被强制重启,新的上下文窗口从头开始——之前的分析、决策、进度全部丢失。它不得不在第 51 步重新搜索 axios 引用,重新分析错误处理模式,重新建立依赖关系图。

关键洞察:上下文窗口是 Agent 的「工作记忆」——有限、昂贵、需要主动管理。把它当成无限的消息队列是对 LLM 工作机制的根本性误解。换个更大的模型(从 128K 到 200K)只是把撞墙的时间推迟了几个小时,不能解决上下文衰减和注意力稀释的问题。解决上下文窗口问题的正确方法不是「买更大的窗口」,而是「在窗口内做更好的管理」。

三个管理维度:压缩、淘汰、委托

本文提出的上下文窗口管理系统围绕三个操作维度:


  维度 1 — 压缩 (Compress):缩小上下文而不丢失关键状态
      策略:对话压缩 / 结构化笔记 / 工具结果摘要 / 渐进压缩 / 子Agent委托

  维度 2 — 淘汰 (Evict):移除不再有用的内容
      策略:FIFO / LRU / 优先级 / 语义相似度合并 / 按类型 / 混合

  维度 3 — 委托 (Delegate):把工作分发到独立的上下文中
      策略:子Agent上下文隔离 → 浓缩结果返回 → 主Agent上下文保持清爽
  

这三个维度不对等——压缩保留信息(以浓缩形式),淘汰删除信息,委托移动信息到独立空间。一个成熟的上下文窗口管理系统在所有三个维度上都有策略,并在运行时根据上下文压力动态选择。

关于上下文窗口管理的上游和下游:Agent 记忆系统设计 定义了 L0-L3 四层记忆架构(L0 就是上下文窗口),本文是 L0 层的操作手册——管理 L0 的大小和内容。记忆系统设计是「仓库建筑师」,本文是「仓库管理员」——决定什么该留在货架上、什么该打包压缩、什么该搬走。

2. 感知上下文压力:什么时候该介入管理

上下文窗口管理的第一步不是「怎么管理」,而是「什么时候该管理」。过早介入浪费计算资源——把还不需要压缩的上下文提前压缩了;过晚介入则风险不可控——上下文已经接近溢出,任何一步额外的工具调用都可能触发崩溃。需要一个精确的压力感知系统。

上下文压力曲线:四个区间

把上下文利用率(当前 tokens / 最大窗口 tokens)映射为四个压力区间:


  利用率范围          压力等级      动作
  ───────────────────────────────────────────────
   0% – 50%          🟢 绿色      正常运行,无需干预
  50% – 75%          🟡 黄色      预备——评估淘汰候选、预计算压缩成本
  75% – 90%          🟠 橙色      行动——执行淘汰,准备压缩
  90% – 100%         🔴 红色      紧急——强制压缩或触发跨窗口保存
  

这些阈值不是随意的。绿色区提供了足够的缓冲——Agent 可以安全地执行多步工具调用而不必担心溢出。黄色区是「感知但不行动」——系统开始收集淘汰候选的优先级分数,但不执行任何修改。橙色区是「这里必须做点什么」——淘汰引擎启动,低优先级内容被移除。红色区是「再不处理就崩了」——如果压缩也来不及,则触发跨窗口状态序列化,在当前窗口崩溃前把状态保存下来。

压力速度:不只是看绝对值

上下文利用率的绝对值("当前 65%")是静态快照。更重要的是压力速度——每轮对话消耗多少 token。一个快速膨胀的 Agent(每轮消耗 3000+ token,因为工具返回大量输出)和一个缓慢膨胀的 Agent(每轮 500 token,因为工具输出简洁)即使在同一利用率下,处理策略也完全不同。

压力速度还暴露了一个隐藏的危险信号:runaway agent loop。如果压力速度突然从 500 token/轮飙升到 5000 token/轮,通常是 Agent 陷入了某种循环——反复重试失败的请求、重复调用返回大量数据的工具、或 LLM 开始生成越来越长的冗餘推理。这些情况下,上下文管理应该与错误恢复联动(见 Agent 错误恢复)。

代码:ContextPressureMonitor

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Callable
from enum import Enum

class PressureLevel(Enum):
    GREEN = "green"      # 0%-50%: 正常
    YELLOW = "yellow"    # 50%-75%: 预备
    ORANGE = "orange"    # 75%-90%: 行动
    RED = "red"          # 90%-100%: 紧急

@dataclass
class ContextPressureMonitor:
    """上下文压力感知器——检测利用率、跟踪膨胀速度、发出预警"""

    max_context_tokens: int = 128000       # 模型最大上下文窗口(token)
    current_tokens: int = 0                # 当前上下文中的 token 数

    # 压力区间阈值(可配置)
    threshold_yellow: float = 0.50         # 50% → 黄色
    threshold_orange: float = 0.75         # 75% → 橙色
    threshold_red: float = 0.90            # 90% → 红色

    # 速度跟踪(tokens/轮)
    velocity_window_size: int = 5          # 取最近 N 轮的平均值
    _token_history: list = field(default_factory=list)
    _turn_count: int = 0

    # 预警回调
    on_yellow: Optional[Callable] = None   # 进入黄色区时调用
    on_orange: Optional[Callable] = None   # 进入橙色区时调用
    on_red: Optional[Callable] = None      # 进入红色区时调用
    on_velocity_spike: Optional[Callable] = None  # 膨胀速度异常时调用

    _current_level: PressureLevel = PressureLevel.GREEN
    _previous_level: PressureLevel = PressureLevel.GREEN

    def update(self, tokens_added_this_turn: int):
        """每轮推理结束后调用——更新当前 token 计数和压力评估"""
        self.current_tokens += tokens_added_this_turn
        self._turn_count += 1
        self._token_history.append(tokens_added_this_turn)

        # 只保留最近 N 轮的速度历史
        if len(self._token_history) > self.velocity_window_size:
            self._token_history.pop(0)

        # 评估当前压力等级
        utilization = self.utilization()
        if utilization >= self.threshold_red:
            self._current_level = PressureLevel.RED
        elif utilization >= self.threshold_orange:
            self._current_level = PressureLevel.ORANGE
        elif utilization >= self.threshold_yellow:
            self._current_level = PressureLevel.YELLOW
        else:
            self._current_level = PressureLevel.GREEN

        # 检测等级变化并触发回调
        if self._current_level != self._previous_level:
            self._trigger_level_change()

        # 检测膨胀速度异常
        self._check_velocity_spike()

        self._previous_level = self._current_level

    def utilization(self) -> float:
        """当前上下文利用率(0.0 - 1.0)"""
        if self.max_context_tokens <= 0:
            return 0.0
        return min(1.0, self.current_tokens / self.max_context_tokens)

    def velocity(self) -> float:
        """最近 N 轮的平均 token 消耗速度(tokens/轮)"""
        if not self._token_history:
            return 0.0
        return sum(self._token_history) / len(self._token_history)

    def turns_until_overflow(self) -> Optional[int]:
        """预测距离溢出还有多少轮(-1 表示已溢出)"""
        if self.current_tokens >= self.max_context_tokens:
            return -1
        vel = self.velocity()
        if vel <= 0:
            return None  # 无法预测
        remaining = self.max_context_tokens - self.current_tokens
        return int(remaining / vel)

    def tokens_remaining(self) -> int:
        """剩余可用 token 数"""
        return max(0, self.max_context_tokens - self.current_tokens)

    def pressure_summary(self) -> dict:
        """生成当前压力状态的摘要报告"""
        return {
            "utilization": round(self.utilization(), 3),
            "current_tokens": self.current_tokens,
            "tokens_remaining": self.tokens_remaining(),
            "pressure_level": self._current_level.value,
            "velocity": round(self.velocity(), 1),
            "turns_until_overflow": self.turns_until_overflow(),
            "turn_count": self._turn_count,
        }

    def _trigger_level_change(self):
        """压力等级发生变化——触发对应的回调"""
        callbacks = {
            PressureLevel.YELLOW: self.on_yellow,
            PressureLevel.ORANGE: self.on_orange,
            PressureLevel.RED: self.on_red,
        }
        cb = callbacks.get(self._current_level)
        if cb:
            cb(self.pressure_summary())

    def _check_velocity_spike(self):
        """检测膨胀速度是否异常飙升(> 3x 历史平均)"""
        if len(self._token_history) < self.velocity_window_size:
            return
        current = self._token_history[-1]
        historical_avg = sum(self._token_history[:-1]) / max(len(self._token_history) - 1, 1)
        if historical_avg > 0 and current > historical_avg * 3:
            if self.on_velocity_spike:
                self.on_velocity_spike({
                    "current_turn_tokens": current,
                    "historical_avg": round(historical_avg, 1),
                    "spike_ratio": round(current / historical_avg, 2),
                })


# ─── 使用示例 ───
monitor = ContextPressureMonitor(
    max_context_tokens=128000,
    on_yellow=lambda s: print(f"[YELLOW] 利用率 {s['utilization']:.1%}, 预估 {s['turns_until_overflow']} 轮后溢出"),
    on_orange=lambda s: print(f"[ORANGE] 触发淘汰,利用率 {s['utilization']:.1%}"),
    on_red=lambda s: print(f"[RED] 紧急压缩,利用率 {s['utilization']:.1%}"),
    on_velocity_spike=lambda s: print(f"[SPIKE] 膨胀速度异常: {s['spike_ratio']}x"),
)

# 模拟 20 轮 Agent 循环,每轮消耗 2000 token
for turn in range(20):
    monitor.update(2000)
    if turn % 5 == 0:
        print(monitor.pressure_summary())

这个监控器的核心价值在于预测性——它不只告诉你「现在用了多少」,还告诉你「按当前速度,还有 N 轮就会溢出」。这给了上级系统充足的反应时间。当 velocity_spike 被触发时,它可以与 Agent 错误恢复 中的循环检测机制联动——识别 Agent 是否陷入了 runaway loop。

3. 淘汰策略:上下文装不下了,扔掉什么

当上下文压力进入橙色区(75%+),淘汰引擎启动。这是上下文管理中最需要判断力的环节——扔错了,Agent 后续推理失去关键依据;扔少了,剩余空间不够用。淘汰不是一个「找到就删」的简单操作,而是一个多维度的排序和选择问题

上下文内容的分类

在讨论「淘汰什么」之前,先明确上下文中有哪些类型的内容:

内容类型典型占比淘汰风险说明
系统提示词 5-10% 🚫 绝不淘汰 定义了 Agent 的行为框架。部分淘汰会导致 Agent 行为异常
用户消息 5-15% ⚠️ 极高风险 包含初始任务目标和最新指令。早期用户消息可以压缩但不应彻底删除
Assistant 响应 20-40% ⚠️ 中高风险 包含推理链和决策记录。旧的、已完成的推理可以压缩或淘汰
工具调用结果 30-50% 🟢 主要淘汰目标 文件内容、API 响应、搜索结果——通常体量最大、时效性最强
思考块(Thinking) 5-15% 🟢 次要淘汰目标 扩展思考模式的中间推理——完成工具调用周期后可以安全清除
记忆注入(Memory) 5-10% ⚠️ 中低风险 从 L1/L2 拉取的相关记忆——有明确 TTL,过期后可安全移除

六种淘汰策略

策略 1 — FIFO(先进先出)

核心思想:最早进入上下文的消息最先被淘汰。经典滚动窗口。

优点:实现极简(O(1)),行为可预测。
缺点:无视内容重要性——Agent 运行 50 步后初始任务目标被淘汰。
适用:对话式聊天。不适用:需要始终牢记初始目标的 Agent 任务。

策略 2 — LRU(最近最少引用)

核心思想:跟踪哪些内容块在后续推理中被引用。未被重新引用的先淘汰。

优点:自然清除「死胡同探索」。
缺点:需要引用跟踪基础设施,冷启动问题。
适用:探索性 Agent 任务——代码搜索、数据分析。

策略 3 — 优先级淘汰

核心思想:给每个内容块打重要性分数,淘汰最低分的。评分维度包括来源权重(用户消息 > 工具结果)、新鲜度、语义相关性、显式标记。

优点:最灵活。
缺点:评分函数需要调优。
适用:异构内容混合的复杂 Agent 任务。

策略 4 — 语义相似度合并

核心思想:相似度超过 0.85 的两个块合并为一条摘要,token 减半。

优点:保留信息(而非删除),自然去重。
缺点:需要额外 LLM 调用生成合并摘要。
适用:监控/轮询类 Agent——反复获取相似信息。

策略 5 — 按类型淘汰

核心思想:按内容类型的预设规则淘汰——「工具结果超过 10 轮清除」「思考块每轮清除」。

优点:反映自然的内容生命周期。
缺点:规则静态,可能误删关键工具结果。
适用:工具密集型 Agent,配合显式保护标记使用。

策略 6 — 混合淘汰

核心思想:多维度加权评分——总分 = 0.3 × freshness + 0.3 × priority + 0.2 × type_ttl + 0.2 × references

优点:最鲁棒。
缺点:权重调优复杂。
适用:生产级 Agent——追求稳定可靠。

绝对不能淘汰的内容

代码:EvictionEngine — 可插拔的淘汰策略

from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
import math

class ContentType(Enum):
    SYSTEM = "system"
    USER_MESSAGE = "user_message"
    ASSISTANT = "assistant"
    TOOL_RESULT = "tool_result"
    THINKING = "thinking"
    MEMORY_INJECTION = "memory_injection"

@dataclass
class ContextBlock:
    """上下文中的一个内容块"""
    block_id: str
    content_type: ContentType
    content: str
    token_count: int
    turn_added: int           # 在哪一轮加入的
    importance_score: float = 0.5
    reference_count: int = 0  # 被后续推理引用的次数
    embedding: Optional[list] = None
    protected: bool = False   # 是否在保护名单上

@dataclass
class EvictionEngine:
    """淘汰引擎——6 种可插拔策略"""
    # 混合策略权重
    weight_freshness: float = 0.3
    weight_priority: float = 0.3
    weight_type: float = 0.2
    weight_reference: float = 0.2
    # 语义相似度合并阈值
    similarity_merge_threshold: float = 0.85
    # 类型级 TTL(保留轮数)
    type_ttl: dict = field(default_factory=lambda: {
        ContentType.TOOL_RESULT: 10,
        ContentType.THINKING: 3,
        ContentType.MEMORY_INJECTION: 5,
    })
    # 保护名单
    protected_types: set = field(default_factory=lambda: {
        ContentType.SYSTEM,
    })

    def score_fifo(self, block: ContextBlock, current_turn: int) -> float:
        """FIFO——越早加入的分数越低"""
        age = current_turn - block.turn_added
        return 1.0 / (1.0 + age)

    def score_lru(self, block: ContextBlock) -> float:
        """LRU——被引用越多次分数越高"""
        if block.reference_count == 0:
            return 0.1  # 冷启动保护
        return min(1.0, block.reference_count / 5.0)

    def score_priority(self, block: ContextBlock,
                        current_task_embedding=None) -> float:
        """优先级——来源权重 + 语义相关性"""
        s = block.importance_score
        source_bonus = {
            ContentType.USER_MESSAGE: 0.3,
            ContentType.SYSTEM: 1.0,
            ContentType.ASSISTANT: 0.1,
            ContentType.TOOL_RESULT: 0.0,
            ContentType.THINKING: -0.1,
            ContentType.MEMORY_INJECTION: 0.05,
        }
        s += source_bonus.get(block.content_type, 0.0)
        if current_task_embedding and block.embedding:
            similarity = self._cosine_similarity(
                current_task_embedding, block.embedding)
            s += similarity * 0.2
        return max(0.0, min(1.0, s))

    def score_type_based(self, block: ContextBlock,
                          current_turn: int) -> float:
        """按类型——超过 TTL 则分数归零"""
        ttl = self.type_ttl.get(block.content_type)
        if ttl is None:
            return 0.5
        age = current_turn - block.turn_added
        if age > ttl:
            return 0.0
        return 1.0 - (age / ttl)

    def score_hybrid(self, block: ContextBlock, current_turn: int,
                      current_task_embedding=None) -> float:
        """混合策略——加权组合所有维度"""
        return (
            self.weight_freshness * self.score_fifo(block, current_turn) +
            self.weight_priority * self.score_priority(
                block, current_task_embedding) +
            self.weight_type * self.score_type_based(block, current_turn) +
            self.weight_reference * self.score_lru(block)
        )

    def select_eviction_candidates(
            self, blocks: list, current_turn: int,
            target_tokens_to_free: int, policy: str = "hybrid",
            current_task_embedding=None) -> list:
        """选择淘汰候选——返回应被淘汰的 block_id 列表"""
        evictable = [b for b in blocks
                     if b.content_type not in self.protected_types
                     and not b.protected]
        if not evictable:
            return []

        score_fn = {
            "fifo": lambda b: self.score_fifo(b, current_turn),
            "lru": lambda b: self.score_lru(b),
            "priority": lambda b: self.score_priority(
                b, current_task_embedding),
            "type_based": lambda b: self.score_type_based(b, current_turn),
            "hybrid": lambda b: self.score_hybrid(
                b, current_turn, current_task_embedding),
        }.get(policy, lambda b: self.score_hybrid(
            b, current_turn, current_task_embedding))

        evictable.sort(key=score_fn)  # 低分优先淘汰

        selected = []
        freed = 0
        for block in evictable:
            if freed >= target_tokens_to_free:
                break
            selected.append(block.block_id)
            freed += block.token_count
        return selected

    def _cosine_similarity(self, a: list, b: list) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        na = math.sqrt(sum(x * x for x in a))
        nb = math.sqrt(sum(y * y for y in b))
        return dot / (na * nb) if na and nb else 0.0

# ─── 使用示例 ───
engine = EvictionEngine()
blocks = [
    ContextBlock("sys1", ContentType.SYSTEM, "你是代码审查 Agent", 200, 0,
                 importance_score=1.0, protected=True),
    ContextBlock("usr1", ContentType.USER_MESSAGE, "迁移 axios→fetch", 50, 0,
                 importance_score=0.9),
    ContextBlock("tool1", ContentType.TOOL_RESULT, "grep: 45 个 axios 引用...", 1500, 1,
                 importance_score=0.5, reference_count=3),
    ContextBlock("think1", ContentType.THINKING, "考虑拦截器...", 200, 4,
                 importance_score=0.2),
]

candidates = engine.select_eviction_candidates(
    blocks, current_turn=10, target_tokens_to_free=2000, policy="hybrid")
print(f"淘汰候选: {candidates}")
# → ['think1', 'tool1'](分数最低的两个,sys1 受保护不受影响)

淘汰引擎的关键设计决策:贪心选择而非全局最优。每个内容块独立评分,从低到高累加直到满足目标释放量。这不是数学上的最优解(没有考虑淘汰 A 后 B 的语义相关性变化),但它是工程上的可行解——计算复杂度 O(n log n),在大规模上下文中可以实时执行。

关于内容类型与上下文信封的对应关系:Agent 上下文协议设计 定义了每种内容如何被打包成上下文信封(Envelope)。淘汰引擎操作在信封粒度——决定哪些信封保留、哪些从窗口中移除。

4. 压缩策略:不丢状态地缩小上下文

淘汰是「删除」——信息永久消失。压缩是「浓缩」——信息减少体积但保留关键内容。在上下文压力达到橙色区时,应该先尝试压缩再淘汰——因为压缩有概率保留信息,淘汰则确定丢失。压缩也不只是「把对话摘要一下」——针对不同类型的内容和不同的压缩需求,有五种策略。

策略 1 — 对话压缩(Conversation Compaction)

核心思想:将整个对话历史发给 LLM,要求其生成一份浓缩摘要。然后清空上下文窗口,以摘要 + 最近 N 条消息重新开始。

压缩提示词的核心原则:不是「总结这段对话」,而是「从这段对话中提取什么必须保留」。一个生产级的压缩提示词需要明确区分:

压缩提示词的设计陷阱:最常见的错误是使用通用摘要提示词(如「请总结以下对话」)。LLM 会把对话总结成自然语言叙述——「用户要求迁移 axios 到 fetch,Agent 搜索了代码库,发现了 45 个引用……」——这种摘要可读不可操作。新窗口启动后,Agent 读了这个摘要仍然不知道下一步该干什么。正确的压缩提示词应该以结构化的方式要求输出:任务状态、架构决策、开放问题、下一步。

策略 2 — 结构化笔记(Structured Note-Taking)

核心思想:Agent 在任务执行过程中主动向外部文件(如 NOTES.mdprogress.json)写入结构化笔记。压缩时,对话历史被丢弃,只保留笔记文件的内容注入新窗口。

这个模式源自生产级 Agent 的实践经验。Anthropic 的 Claude Code 使用 to-do 列表和架构决策记录;Claude Plays Pokémon 使用训练进度日志和已探索地图。笔记文件的本质是「冷启动就绪」——一个新 Agent 会话读取笔记后应该能立即知道状态并继续工作。

笔记写入的时机:

  1. 每个关键决策之后——「决定不修改 users 表 schema」
  2. 每个子任务完成后——「已完成 breaking changes 检查,发现 3 个不兼容 API」
  3. 发现重要教训时——「v3.0 移除了 /api/v1/users,改用 /api/v2/users」
  4. 压缩前——在触发压缩之前,先把所有未写入笔记的关键信息写入

笔记模式与 Agent 记忆系统设计 中 L1 → L2 的提升机制互补——笔记是 Agent 主动选择记录的内容,提升是系统自动评估并持久化的内容。两者叠加形成双重保障。

策略 3 — 工具结果摘要(Tool Result Summarization)

核心思想:对于体积大的工具输出(如 5000 token 的文件内容),不直接删除,而是先用 LLM 提取关键发现,将 5000 token 的输出压缩为 200 token 的摘要。原始输出被丢弃,但摘要保留了关键信息。

触发条件:工具输出 > 摘要阈值(如 500 token)且 Agent 已完成对该输出的推理。如果 Agent 还在引用这个工具输出做决策——不压缩,等推理完成后再压缩。

局限:摘要丢失细节。对于要求精度的任务(如代码审查中需要检查具体的行号),摘要可能不够——需要保留原始引用(如「原始文件路径 + 行号」),让 Agent 在需要细节时重新读取。

策略 4 — 渐进压缩(Progressive Summarization)

核心思想:随信息老化,逐渐提高压缩比。不需要一次性把整个对话摘要——而是分层处理:

  L0 — 当前轮(0 轮前)   → 完整原文
  L1 — 最近 5 轮           → 关键要点摘要
  L2 — 5-20 轮前           → 一句话摘要
  L3 — 20+ 轮前            → 标题(如「axios 迁移:已确认 45 个文件需修改」)
  

渐进压缩的哲学:信息越老,精确细节的价值越低,但概括性的「这件事做完了,结果是 X」仍然有价值。不需要等到 20 轮时再把前 20 轮一起压缩——而是在每轮推进时把最老的信息从 L1 推到 L2、从 L2 推到 L3。

策略 5 — 子 Agent 委托(Sub-Agent Delegation)

核心思想:把一个需要大量上下文的子任务分发给一个独立的子 Agent(拥有干净的上下文窗口),子 Agent 完成任务后返回一份浓缩摘要(1K-2K token)给主 Agent。主 Agent 的上下文保持清爽,子 Agent 在隔离的环境中工作。

何时使用子 Agent 委托:

反模式:不压缩子 Agent 的输出就直接注入主 Agent 上下文。如果子 Agent 执行了 50 步搜索,把所有结果原样返回——主 Agent 的上下文窗口被污染,委托失去了意义。子 Agent 的价值在于浓缩——把 50 步的工作浓缩为一段关键结论。

子 Agent 委托和 多 Agent 编排 的差异:本文讨论的是「用子 Agent 解决上下文压力」这一单一动机;多 Agent 编排覆盖更广的动机——并行、专门化、投票、层级控制等。

压缩触发策略

压缩的触发不能只是「利用率到了 90%」——那个时间点太晚。有效的触发策略是主动 + 分级

  1. 利用率达 75%(橙色):启动工具结果摘要——这是最低成本的压缩,对 Agent 行为影响最小。
  2. 利用率达 85%:执行渐进压缩——把 10+ 轮前的内容层推高。
  3. 利用率达 92%(红色):对话压缩——将整个对话浓缩为摘要并重启上下文。
  4. 利用率达 95% + 子任务可隔离:子 Agent 委托——把可独立的部分分发给子 Agent。

这是一个压缩级联——从低成本低影响的策略开始,逐步升级到高成本高影响的策略。永远不会在橙色区就执行对话压缩(成本过高),也不会在红色区才开始工具结果摘要(效果不足)。

代码:CompressionEngine

代码:CompressionEngine

from dataclasses import dataclass, field
from typing import Optional, Callable

class CompactionStrategy(Enum):
    CONVERSATION_COMPACTION = "conversation_compaction"
    NOTE_TAKING = "note_taking"
    TOOL_RESULT_SUMMARIZATION = "tool_result_summarization"
    PROGRESSIVE = "progressive"
    SUB_AGENT = "sub_agent"

@dataclass
class CompressionEngine:
    """5 种可插拔压缩策略"""

    llm_call: Callable = None
    notes_file_path: str = "agent_notes.md"
    summarization_threshold_tokens: int = 500

    progressive_window_sizes: dict = field(default_factory=lambda: dict(
        l0_full=1, l1_keypoints=5, l2_oneline=15, l3_title=50
    ))

    cascade_thresholds: dict = field(default_factory=lambda: {
        0.75: CompactionStrategy.TOOL_RESULT_SUMMARIZATION,
        0.85: CompactionStrategy.PROGRESSIVE,
        0.92: CompactionStrategy.CONVERSATION_COMPACTION,
        0.95: CompactionStrategy.SUB_AGENT,
    })

    def compact_conversation(self, messages, current_task_goal):
        """对话压缩——生成结构化摘要"""
        if not self.llm_call:
            return ""
        history_text = self._format_messages(messages)
        prompt = (
            "从对话历史提取必须保留的信息。任务目标:"
            + current_task_goal + "。"
            "规则:架构决策、未解决问题、进度、教训。"
        )
        return self.llm_call(prompt)

    def take_structured_note(self, note_type, content):
        """写入结构化笔记"""
        from datetime import datetime
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with open(self.notes_file_path, "a", encoding="utf-8") as f:
            f.write("## [" + ts + "] " + note_type + "\n")
            f.write(content + "\n---\n")

    def read_notes(self):
        """读取笔记用于新窗口启动"""
        try:
            with open(self.notes_file_path, "r", encoding="utf-8") as f:
                return f.read()
        except FileNotFoundError:
            return ""

    def summarize_tool_result(self, tool_name, raw_output):
        """工具结果摘要"""
        if len(raw_output) < self.summarization_threshold_tokens:
            return raw_output
        if not self.llm_call:
            return raw_output[:self.summarization_threshold_tokens] + "..."
        prompt = "从 " + tool_name + " 输出提取关键发现"
        summary = self.llm_call(prompt)
        return "[摘要] " + summary + "\n[来源: " + tool_name + "]"

    def progressive_compress(self, blocks, current_turn):
        """渐进压缩——按轮次年龄提升压缩层级"""
        result = []
        for block in blocks:
            age = current_turn - block.turn_added
            ws = self.progressive_window_sizes
            if age <= ws["l0_full"]:
                block.compression_level = "l0"
            elif age <= ws["l1_keypoints"]:
                block.content = block.content[:300]
                block.compression_level = "l1"
            elif age <= ws["l2_oneline"]:
                fl = block.content.split("\n")[0][:100]
                block.content = str(block.turn_added) + " " + fl + "..."
                block.compression_level = "l2"
            else:
                block.content = "轮" + str(block.turn_added) + ": " + block.content_type.value
                block.compression_level = "l3"
            result.append(block)
        return result

    def delegate_to_sub_agent(self, task_spec, relevant_context):
        """子 Agent 委托"""
        if not self.llm_call:
            return "[子Agent不可用] 任务: " + task_spec
        prompt = "子任务:" + task_spec + "。返回浓缩摘要。"
        return self.llm_call(prompt)

    def select_strategy(self, utilization):
        """根据利用率选择压缩策略"""
        for t in sorted(self.cascade_thresholds.keys(), reverse=True):
            if utilization >= t:
                return self.cascade_thresholds[t]
        return None

    def _format_messages(self, messages):
        lines = []
        for msg in messages[-100:]:
            r = getattr(msg, "role", "unknown")
            c = getattr(msg, "content", "")[:500]
            lines.append("[" + r + "] " + c)
        return "\n".join(lines)


# --- 使用示例 ---
def mock_llm(prompt):
    return "v3 API 端点变更,45个文件受影响"

engine = CompressionEngine(llm_call=mock_llm)
summary = engine.compact_conversation([], "迁移 axios 到 fetch")
engine.take_structured_note("ARCH_DECISION", "使用原生 fetch")
tool_out = "ERROR at line 142" * 10
comp = engine.summarize_tool_result("grep_axios", tool_out)
print("75% 利用率策略:", engine.select_strategy(0.75))
print("95% 利用率策略:", engine.select_strategy(0.95))

压缩引擎的 select_strategy 方法是级联决策的核心。ContextWindowManager 的上层只需调用它就能获取正确的下一步操作。压缩质量评估将在 第 7 章:上下文健康监控 中详细讨论。

5. Token 预算管理:追踪、分配与限额执行

淘汰和压缩解决了「上下文装不下了怎么办」。但一个更根本的问题是:「每个组件在上下文中该占多少空间?」如果没有预算管理,淘汰和压缩都是在救火——而预算管理是防火。

Token 预算的数据模型

上下文窗口是一个共享空间——系统提示词、工具定义、消息历史、工具调用结果、思考块、记忆注入都在争抢同一个窗口。每类组件的 token 消耗需要被独立追踪:

组件建议占比策略超额处理
系统提示词5-8%固定分配,永不淘汰💀 不可超额——提示词必须精简
工具定义3-5%固定分配,可延迟加载使用工具搜索代替全量注入
消息历史60-70%动态分配,主要淘汰目标触发压缩/淘汰
工具调用结果10-15%上限分配,超限摘要摘要后淘汰旧的
记忆注入5-10%上限分配,相关性过滤降低相关性门槛或淘汰旧记忆
输出预留4-8%始终保留如果剩余不足,触发紧急压缩

软限额 vs 硬限额

预算管理不是「超了就报错」那么简单。软限额和硬限额的区分让系统在严格和灵活之间找到平衡:

Token 消耗速度(Burn Rate)预测

预算管理器不只是记录「当前用了多少」,还预测「按当前速度,什么时候用完」。这个预测能力让系统可以在临界点之前主动采取行动。预测公式:


  turns_remaining = (total_budget - used_tokens) / avg_tokens_per_turn
  minutes_remaining = turns_remaining * avg_seconds_per_turn / 60
  

如果预测显示 3 轮后就会溢出而当前任务还需要 10 轮——必须立即启动压缩或委托,不能等到 3 轮后再救火。

Token 成本是与工具设计强相关的——工具返回的 token 越多,消息历史的预算消耗越快。有效的工具应该返回精简、高信号的结果。参见 Agent 工具设计 中关于 token 高效工具的设计原则。

代码:TokenBudgetManager

from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

class BudgetComponent(Enum):
    SYSTEM_PROMPT = "system_prompt"
    TOOL_DEFINITIONS = "tool_definitions"
    MESSAGE_HISTORY = "message_history"
    TOOL_RESULTS = "tool_results"
    MEMORY_INJECTION = "memory_injection"
    OUTPUT_RESERVED = "output_reserved"

@dataclass
class ComponentBudget:
    """单个组件的 token 预算"""
    used: int = 0
    soft_limit_pct: float = 0.80   # 80% 软限额
    hard_limit_tokens: int = 0       # 硬限额(绝对上限)

    def soft_limit(self):
        return int(self.hard_limit_tokens * self.soft_limit_pct)

    def utilization(self):
        if self.hard_limit_tokens == 0:
            return 0.0
        return self.used / self.hard_limit_tokens

    def is_soft_exceeded(self):
        return self.used >= self.soft_limit()

    def is_hard_exceeded(self):
        return self.used >= self.hard_limit_tokens


@dataclass
class TokenBudgetManager:
    """Token 预算管理器——追踪、分配、限额执行"""

    max_context_tokens: int = 128000
    # 每个组件在总预算中的占比
    allocation_pct: dict = field(default_factory=lambda: {
        BudgetComponent.SYSTEM_PROMPT: 0.06,
        BudgetComponent.TOOL_DEFINITIONS: 0.04,
        BudgetComponent.MESSAGE_HISTORY: 0.65,
        BudgetComponent.TOOL_RESULTS: 0.12,
        BudgetComponent.MEMORY_INJECTION: 0.08,
        BudgetComponent.OUTPUT_RESERVED: 0.05,
    })
    # 各组件当前状态
    budgets: dict = field(default_factory=dict)
    # 速度追踪(tokens/轮)
    burn_history: list = field(default_factory=list)
    burn_window_size: int = 10

    def __post_init__(self):
        for comp, pct in self.allocation_pct.items():
            self.budgets[comp] = ComponentBudget(
                hard_limit_tokens=int(self.max_context_tokens * pct))

    def track_usage(self, component, tokens_added):
        """追踪组件 token 消耗"""
        budget = self.budgets.get(component)
        if not budget:
            return
        budget.used += tokens_added

    def check_before_add(self, component, tokens_to_add):
        """添加内容前检查——是否超限"""
        budget = self.budgets.get(component)
        if not budget:
            return dict(allowed=True)
        after = budget.used + tokens_to_add
        if after >= budget.hard_limit_tokens:
            return dict(
                allowed=False,
                reason="硬限额超出",
                component=component.value,
                current=budget.used,
                limit=budget.hard_limit_tokens,
                excess=after - budget.hard_limit_tokens,
            )
        if after >= budget.soft_limit():
            return dict(
                allowed=True,
                warning="软限额超出——建议压缩",
                component=component.value,
            )
        return dict(allowed=True)

    def total_used(self):
        return sum(b.used for b in self.budgets.values())

    def total_utilization(self):
        return self.total_used() / self.max_context_tokens

    def burn_rate(self):
        """平均每轮 token 消耗速度"""
        if not self.burn_history:
            return 0.0
        return sum(self.burn_history) / len(self.burn_history)

    def turns_until_exhausted(self):
        """预测距离预算耗尽还有多少轮"""
        rate = self.burn_rate()
        if rate <= 0:
            return None
        remaining = self.max_context_tokens - self.total_used()
        return int(remaining / rate)

    def record_turn(self, tokens_this_turn):
        """每轮结束后记录消耗"""
        self.burn_history.append(tokens_this_turn)
        if len(self.burn_history) > self.burn_window_size:
            self.burn_history.pop(0)

    def budget_report(self):
        """生成预算报告"""
        report = []
        for comp, budget in self.budgets.items():
            report.append(dict(
                component=comp.value,
                used=budget.used,
                limit=budget.hard_limit_tokens,
                utilization=round(budget.utilization(), 2),
                exceeded=budget.is_hard_exceeded(),
            ))
        return dict(
            components=report,
            total_used=self.total_used(),
            total_utilization=round(self.total_utilization(), 2),
            burn_rate=round(self.burn_rate()),
            turns_remaining=self.turns_until_exhausted(),
        )


# --- 使用示例 ---
budget_mgr = TokenBudgetManager(max_context_tokens=128000)

# 追踪各组件消耗
budget_mgr.track_usage(BudgetComponent.SYSTEM_PROMPT, 5000)
budget_mgr.track_usage(BudgetComponent.TOOL_DEFINITIONS, 3000)
budget_mgr.track_usage(BudgetComponent.MESSAGE_HISTORY, 45000)

# 添加前检查
check = budget_mgr.check_before_add(
    BudgetComponent.MESSAGE_HISTORY, 20000)
print("添加检查:", check)

# 预算报告
report = budget_mgr.budget_report()
print("总利用率:", report["total_utilization"])
print("预估剩余轮数:", report["turns_remaining"])

TokenBudgetManager 的核心价值在于事前预防。在每次向上下文添加内容之前,check_before_add 评估「加进去会不会超限」。如果会——不是拒绝添加,而是触发压缩/淘汰/委托。这样上下文管理从「被动救火」变成了「主动防火」。

预算管理器与上下文协议的集成:Agent 上下文协议设计 中的信封路由系统根据组件类型将内容分配到正确的预算桶中。

6. 跨窗口状态管理:让 Agent 在上下文重置后继续工作

压缩和淘汰能延长上下文窗口的寿命,但有些任务实在太长——即使经过多轮压缩,压缩摘要 + 关键上下文 + 新输出仍然会填满一个窗口。此时必须跨越上下文窗口边界:在当前窗口关闭前保存状态,在下一个窗口启动时恢复状态。

跨窗口状态序列化合约

上下文窗口之间传递的不是「对话历史」,而是一份结构化状态对象。这份对象必须包含新窗口立即可用的所有信息:

启动引导序列

新上下文窗口启动时,Agent 执行标准化的引导序列来重建工作状态:

  1. 定位:执行 pwd 确认工作目录
  2. 读取状态文件:加载上次窗口序列化的 JSON 状态
  3. 验证环境:运行 git status 等基本检查确认环境一致
  4. 注入摘要:将上次窗口的压缩摘要注入新窗口的工作记忆
  5. 确认下一步:从进度状态提取当前子任务,开始执行

「干净状态」要求

每次上下文窗口关闭时,Agent 必须确保外部环境处于可接力的状态:没有未提交的代码变更、没有遗留的运行中服务、进度文件已更新到最新。

反模式:跨窗口状态的常见陷阱

孤儿状态:状态文件写入了「计划做」但 Agent 未执行就被中断——新窗口误以为已完成。修复:在状态中区分「planned」和「done」标记。

过期检查点:多个窗口并行操作同一状态文件,旧检查点覆盖了新状态。修复:使用单调递增的窗口编号和状态版本号。

隐式状态:Agent 依赖「我记得上次看到了 X」而未写入状态——跨窗口时「记忆」不存在。修复:所有关键发现必须显式序列化。

跨窗口状态管理和 Agent 错误恢复 互补:上下文溢出是一种特殊的错误场景,错误恢复检测到溢出时应触发跨窗口状态保存,在崩溃前持久化进度。

代码:CrossWindowStateManager

from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Optional
import json, os

@dataclass
class AgentState:
    """跨窗口传递的 Agent 状态对象"""
    window_id: int = 1
    task_goal: str = ""
    success_criteria: list = field(default_factory=list)
    constraints: list = field(default_factory=list)
    completed_subtasks: list = field(default_factory=list)
    current_subtask: str = ""
    remaining_subtasks: list = field(default_factory=list)
    decisions: list = field(default_factory=list)
    open_issues: list = field(default_factory=list)
    learnings: list = field(default_factory=list)
    compaction_summary: str = ""
    saved_at: str = ""

@dataclass
class CrossWindowStateManager:
    """跨窗口状态管理器——序列化、恢复、验证"""

    state_file_path: str = "agent_state.json"
    progress_file_path: str = "agent_progress.md"

    def save_state(self, state: AgentState):
        """保存状态到文件——在窗口关闭前调用"""
        state.saved_at = datetime.now().isoformat()
        # JSON 格式——机器可解析
        with open(self.state_file_path, "w", encoding="utf-8") as f:
            json.dump(asdict(state), f, ensure_ascii=False, indent=2)
        # Markdown 格式——人类可读
        self._write_progress_md(state)

    def load_state(self) -> Optional[AgentState]:
        """从文件加载状态——在新窗口启动时调用"""
        if not os.path.exists(self.state_file_path):
            return None
        with open(self.state_file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        state = AgentState(**data)
        # 验证状态完整性
        validation = self.validate_state(state)
        if validation["has_errors"]:
            print("状态验证警告:", validation["errors"])
        return state

    def bootstrap_prompt(self, state: AgentState) -> str:
        """生成新窗口的启动引导提示词"""
        lines = []
        lines.append("[CONTEXT_WINDOW_RESUME] 从上一个上下文窗口恢复")
        lines.append(f"任务目标: {state.task_goal}")
        lines.append(f"已完成: {len(state.completed_subtasks)} 个子任务")
        for task in state.completed_subtasks[-5:]:
            lines.append(f"  [done] {task}")
        if state.current_subtask:
            lines.append(f"当前: {state.current_subtask}")
        if state.remaining_subtasks:
            lines.append(f"剩余: {len(state.remaining_subtasks)} 个子任务")
            for task in state.remaining_subtasks[:5]:
                lines.append(f"  [todo] {task}")
        if state.open_issues:
            lines.append("未解决问题:")
            for issue in state.open_issues:
                lines.append(f"  [!] {issue}")
        if state.decisions:
            lines.append("关键决策:")
            for d in state.decisions[-5:]:
                lines.append(f"  [dec] {d}")
        if state.compaction_summary:
            lines.append(f"上一窗口摘要: {state.compaction_summary}")
        return "\n".join(lines)

    def validate_state(self, state: AgentState) -> dict:
        """验证状态对象的完整性"""
        errors = []
        if not state.task_goal:
            errors.append("缺少任务目标")
        if not state.current_subtask and state.remaining_subtasks:
            errors.append("有剩余子任务但未指定当前子任务")
        if state.window_id <= 0:
            errors.append("无效的窗口 ID")
        return dict(
            has_errors=len(errors) > 0,
            errors=errors,
        )

    def _write_progress_md(self, state: AgentState):
        """写入人类可读的 Markdown 进度文件"""
        with open(self.progress_file_path, "w", encoding="utf-8") as f:
            f.write(f"# Agent Progress - Window {state.window_id}\n\n")
            f.write(f"**Goal:** {state.task_goal}\n\n")
            f.write(f"**Saved:** {state.saved_at}\n\n")
            f.write("## Completed\n")
            for task in state.completed_subtasks:
                f.write(f"- [x] {task}\n")
            f.write(f"\n## Current: {state.current_subtask}\n\n")
            f.write("## Remaining\n")
            for task in state.remaining_subtasks:
                f.write(f"- [ ] {task}\n")
            if state.open_issues:
                f.write("\n## Open Issues\n")
                for issue in state.open_issues:
                    f.write(f"- [!] {issue}\n")


# --- 使用示例 ---
manager = CrossWindowStateManager()

# 窗口 1 结束时保存状态
state = AgentState(
    window_id=1,
    task_goal="将 user-service 从 v2.1 升级到 v3.0",
    completed_subtasks=["检查 breaking changes", "staging 部署成功"],
    current_subtask="运行集成测试",
    remaining_subtasks=["金丝雀发布", "全量切换"],
    decisions=["使用滚动更新保证零停机"],
    open_issues=[],
    compaction_summary="v3.0 移除了 /api/v1/users 端点",
)
manager.save_state(state)

# 窗口 2 启动时恢复
loaded = manager.load_state()
if loaded:
    bootstrap = manager.bootstrap_prompt(loaded)
    print(bootstrap[:200])

跨窗口状态管理是上下文窗口管理系统的最后一道防线——当压缩和淘汰都无法避免窗口重置时,它确保 Agent 不是从零开始,而是从上一个窗口的精确检查点继续。这是长任务 Agent 能够在生产环境中运行数小时的关键。

7. 上下文健康监控:指标、告警与压缩质量评估

你实现了压缩、淘汰、预算管理——但它们真的在工作吗?压缩有没有丢失关键信息?淘汰是不是太激进了?上下文衰减有没有在暗中恶化?没有监控的上下文管理是盲飞——你需要定量指标来验证每个子系统的效果。

核心健康指标

指标含义健康范围告警阈值
利用率(%) 当前 tokens / 最大窗口 tokens < 60% > 90% 持续 3 轮 → CRITICAL
淘汰频率 每轮淘汰的事件数 < 2/轮 > 5/轮 → WARNING(窗口可能太小)
压缩比 原始 tokens / 压缩后 tokens 3x–10x < 2x → 压缩效果差,需优化提示词
Token 消耗速度 tokens/轮 稳定或线性增长 突然 3x+ → 可能 runaway loop
压缩保真度 压缩后关键信息保留率(0-1) > 0.85 < 0.75 → 压缩丢了重要信息
工具结果膨胀比 工具结果 tokens / 总上下文 tokens 10–15% > 30% → 工具返回过多数据

压缩保真度评估

压缩质量评估是上下文监控中最难也最关键的环节。你怎么知道压缩后的摘要是否保留了关键信息?答案是LLM-as-Judge——用另一个 LLM 调用来评估压缩质量。

评估方法:

  1. 准备测试问题集:针对原始对话内容,准备 N 个问题(如「任务当前在第几步?」「用户在第 3 轮提了什么关键要求?」)
  2. 对压缩前和压缩后的上下文分别提问:记录两个答案
  3. LLM-as-Judge 评分:将两个答案和正确答案发送给 Judge LLM,让它评分答案是否一致
  4. 保真度 = 一致答案数 / 总问题数:目标 > 0.85

这个评估流程可以自动化——在 CI 中作为压缩提示词的回归测试,每次修改压缩提示词后自动运行保真度评估。

保真度评估与 Agent 评估框架 紧密相关——评估框架提供了 LLM-as-Judge 评分的基础设施,保真度评估是其在压缩领域的特化应用。

告警规则

告警不应该「响了再说」——上下文管理的告警应该是分级的:

这些告警指标应该作为 Prometheus 指标发射,与 Agent 可观察性 的基础设施集成——通过 Grafana 仪表盘可视化、AlertManager 触发通知。

代码:ContextHealthMonitor

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Callable
from enum import Enum

class AlertLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class HealthMetrics:
    """上下文健康指标快照"""
    utilization: float = 0.0
    eviction_frequency: float = 0.0        # 每轮淘汰次数
    compression_ratio: float = 1.0         # 原始/压缩后(> 1 表示压缩有效)
    burn_rate: float = 0.0                 # tokens/轮
    fidelity_score: float = 1.0            # 压缩保真度(0-1)
    tool_result_bloat: float = 0.0         # 工具结果占比
    timestamp: str = ""

@dataclass
class ContextHealthMonitor:
    """上下文健康监控——指标收集、保真度评估、告警"""

    # 指标历史(用于趋势分析)
    metrics_history: list = field(default_factory=list)
    max_history_size: int = 100

    # 告警回调
    on_alert: Optional[Callable] = None

    # 保真度评估
    fidelity_test_questions: list = field(default_factory=list)

    def collect_metrics(self, pressure_monitor, eviction_engine,
                         budget_manager, compression_engine) -> HealthMetrics:
        """从各子系统收集指标并计算健康状态"""
        m = HealthMetrics()
        m.timestamp = datetime.now().isoformat()
        m.utilization = pressure_monitor.utilization()
        m.burn_rate = budget_manager.burn_rate()
        # 工具结果膨胀比
        tr_budget = budget_manager.budgets.get("tool_results")
        if tr_budget:
            total = budget_manager.total_used()
            m.tool_result_bloat = tr_budget.used / max(total, 1)
        self.metrics_history.append(m)
        if len(self.metrics_history) > self.max_history_size:
            self.metrics_history.pop(0)
        return m

    def evaluate_fidelity(self, original_context: str,
                            compressed_context: str,
                            llm_judge: Callable) -> float:
        """评估压缩保真度——LLM-as-Judge 方法"""
        if not self.fidelity_test_questions:
            return 1.0  # 没有测试问题,跳过评估

        consistent_count = 0
        for question in self.fidelity_test_questions:
            # 对原始上下文提问
            answer_orig = llm_judge(
                f"上下文:{original_context[:5000]}\n"
                f"问题:{question}\n请用一句话回答。")
            # 对压缩上下文提问
            answer_comp = llm_judge(
                f"上下文:{compressed_context[:5000]}\n"
                f"问题:{question}\n请用一句话回答。")
            # Judge 判断是否一致
            judge_result = llm_judge(
                f"判断以下两个答案是否传达相同的关键信息:\n"
                f"A: {answer_orig}\nB: {answer_comp}\n"
                f"只回答 yes 或 no。")
            if "yes" in judge_result.lower():
                consistent_count += 1

        fidelity = consistent_count / len(self.fidelity_test_questions)
        return fidelity

    def check_alerts(self, metrics: HealthMetrics):
        """根据指标检查告警条件"""
        alerts = []

        if metrics.utilization > 0.90:
            alerts.append(dict(
                level=AlertLevel.CRITICAL,
                message=f"上下文利用率 {metrics.utilization:.1%} 超过 90%",
                metric="utilization",
            ))

        if metrics.eviction_frequency > 5:
            alerts.append(dict(
                level=AlertLevel.WARNING,
                message=f"淘汰频率过高: {metrics.eviction_frequency}/轮",
                metric="eviction_frequency",
            ))

        if metrics.fidelity_score < 0.75:
            alerts.append(dict(
                level=AlertLevel.WARNING,
                message=f"压缩保真度过低: {metrics.fidelity_score:.2f}",
                metric="fidelity_score",
            ))

        if metrics.compression_ratio < 2.0 and metrics.compression_ratio > 1.0:
            alerts.append(dict(
                level=AlertLevel.WARNING,
                message=f"压缩比偏低: {metrics.compression_ratio:.1f}x",
                metric="compression_ratio",
            ))

        for alert in alerts:
            if self.on_alert:
                self.on_alert(alert)

        return alerts

    def trend_report(self) -> dict:
        """生成趋势报告——对比最近 N 个指标快照"""
        if len(self.metrics_history) < 2:
            return dict(status="insufficient_data")

        recent = self.metrics_history[-10:]
        first = recent[0]
        last = recent[-1]

        return dict(
            utilization_change=last.utilization - first.utilization,
            burn_rate_change=last.burn_rate - first.burn_rate,
            avg_fidelity=sum(
                m.fidelity_score for m in recent) / len(recent),
            data_points=len(recent),
        )


# --- 使用示例 ---
monitor = ContextHealthMonitor(
    fidelity_test_questions=[
        "当前任务的目标是什么?",
        "已经完成了哪些步骤?",
        "有什么未解决的问题?",
    ]
)

def judge_fn(prompt):
    return "yes, the answers convey the same information"

# 评估压缩保真度
fidelity = monitor.evaluate_fidelity(
    "我们决定迁移 axios 到 fetch。已完成 7/12 任务...",
    "迁移 axios 到 fetch,完成 7/12 任务",
    llm_judge=judge_fn,
)
print(f"压缩保真度: {fidelity:.2f}")

上下文健康监控将「上下文管理是否有效」从一个定性感受变成了一个定量事实。每个压缩操作、每次淘汰决策都产生可测量的影响。这套指标不仅用于实时告警,也为后续的权重调优和策略选择提供了数据基础。

8. 整合:完整的 ContextWindowManager 架构

前面七章分别实现了压力感知、淘汰引擎、压缩引擎、预算管理、跨窗口状态和健康监控。这些子系统不是独立运行的——它们需要在一个统一的编排器中协同工作。本章给出完整的 ContextWindowManager 整合架构。

架构全景图


  Agent Loop  →  ContextWindowManager(统一编排器)
                      │
       ┌──────────────┼──────────────┬──────────────┬──────────────┐
       ▼              ▼              ▼              ▼              ▼
  PressureMonitor  EvictionEngine  Compression    TokenBudget    CrossWindow
  (何时介入)     (扔掉什么)    Engine        Manager        StateManager
                      │         (缩小上下文)  (限额执行)   (持久化状态)
                 ┌────┴────┐
                 ▼         ▼
            6种淘汰策略  5种压缩策略
            (FIFO/LRU/  (Compaction/
             Priority/   Notes/Summary/
             Semantic/   Progressive/
             Type/Hybrid) SubAgent)

       ┌──────────────────────────────────────────────────┐
       ▼                                                  ▼
  ContextHealthMonitor                           可观察性管道
  (指标/保真度/告警)                          (Prometheus + Grafana)
  

生命周期:一个 Agent 任务如何流经 ContextWindowManager

从任务启动到完成,上下文窗口管理的每一步都有明确的子系统负责:

  1. 任务启动:TokenBudgetManager 根据配置分配预算 → CrossWindowStateManager 检查是否有上一个窗口的残留状态(如果是恢复任务,加载状态并执行引导序列)→ ContextPressureMonitor 开始追踪利用率
  2. 每次 LLM 调用前:ContextPressureMonitor 检查利用率 → 如果进入黄色/橙色区,EvictionEngine 选择淘汰候选并执行 → 如果压力持续,CompressionEngine 执行级联压缩 → TokenBudgetManager 验证下一轮是否有足够预算
  3. 工具调用后:TokenBudgetManager 追踪工具结果的 token 消耗 → 如果工具结果膨胀比超标,CompressionEngine.summarize_tool_result 进行摘要 → EvictionEngine 按类型 TTL 淘汰旧工具结果
  4. 上下文溢出(红色区):如果利用率达到 95%+ 且压缩不足以缓解 → CrossWindowStateManager 序列化当前状态 → 上下文窗口重置 → 引导序列启动 → Agent 从检查点恢复
  5. 任务完成:CompressionEngine 生成最终压缩摘要 → CrossWindowStateManager 归档状态 → ContextHealthMonitor 发射最终指标快照

代码:ContextWindowManager(完整整合)

from dataclasses import dataclass, field
from typing import Optional, Callable
from enum import Enum

class WindowStatus(Enum):
    HEALTHY = "healthy"       # 绿色区:正常
    PREPARING = "preparing"   # 黄色区:准备淘汰/压缩
    ACTIVE_MANAGEMENT = "active_management"  # 橙色区:主动淘汰+压缩
    EMERGENCY = "emergency"   # 红色区:紧急跨窗口保存

@dataclass
class ContextWindowManager:
    """上下文窗口管理器——整合所有子系统"""

    # 六大子系统
    pressure_monitor: object = None     # ContextPressureMonitor
    eviction_engine: object = None      # EvictionEngine
    compression_engine: object = None   # CompressionEngine
    budget_manager: object = None       # TokenBudgetManager
    state_manager: object = None        # CrossWindowStateManager
    health_monitor: object = None       # ContextHealthMonitor

    # 配置
    eviction_policy: str = "hybrid"     # 淘汰策略选择
    max_context_tokens: int = 128000
    task_goal: str = ""

    # 运行时状态
    status: WindowStatus = WindowStatus.HEALTHY
    turn_count: int = 0
    windows_created: int = 1
    context_blocks: list = field(default_factory=list)
    events_log: list = field(default_factory=list)

    def on_turn_start(self) -> dict:
        """每轮推理开始前调用——返回上下文管理决策"""
        self.turn_count += 1
        utilization = self.pressure_monitor.utilization()
        budget_ok = self.budget_manager.check_before_add(
            "message_history", 4000)  # 预估输出量

        # 判断当前状态
        if utilization >= 0.95:
            self.status = WindowStatus.EMERGENCY
            return self._handle_emergency()

        elif utilization >= 0.75:
            self.status = WindowStatus.ACTIVE_MANAGEMENT
            return self._handle_active_management(utilization)

        elif utilization >= 0.50:
            self.status = WindowStatus.PREPARING
            return self._handle_preparing()

        else:
            self.status = WindowStatus.HEALTHY
            return dict(action="none", utilization=utilization)

    def on_turn_end(self, tokens_added: int):
        """每轮推理结束后调用——更新追踪"""
        self.pressure_monitor.update(tokens_added)
        self.budget_manager.track_usage("message_history", tokens_added)
        self.budget_manager.record_turn(tokens_added)

    def on_tool_result(self, tool_name: str, result: str,
                        token_count: int):
        """工具调用完成后——追踪、摘要、淘汰"""
        # 追踪预算
        self.budget_manager.track_usage("tool_results", token_count)

        # 工具结果摘要
        if token_count > self.compression_engine.summarization_threshold_tokens:
            result = self.compression_engine.summarize_tool_result(
                tool_name, result)

        # 添加到上下文块列表(用于淘汰决策)
        from eviction_engine import ContextBlock, ContentType
        block = ContextBlock(
            block_id=f"tool_{self.turn_count}",
            content_type=ContentType.TOOL_RESULT,
            content=result[:500],
            token_count=token_count,
            turn_added=self.turn_count,
            importance_score=0.5,
        )
        self.context_blocks.append(block)

        # 更新压力监控
        self.pressure_monitor.update(token_count)

    def on_context_overflow(self):
        """上下文即将溢出——触发跨窗口保存"""
        self._log_event("overflow", dict(turn=self.turn_count))

        # 序列化当前状态
        state = AgentState(
            window_id=self.windows_created,
            task_goal=self.task_goal,
            compaction_summary=self.compression_engine.compact_conversation(
                self.context_blocks, self.task_goal),
        )
        self.state_manager.save_state(state)

        # 收集健康指标
        metrics = self.health_monitor.collect_metrics(
            self.pressure_monitor, self.eviction_engine,
            self.budget_manager, self.compression_engine)
        self.health_monitor.check_alerts(metrics)

        return dict(
            action="cross_window_save",
            new_window_id=self.windows_created + 1,
            metrics=metrics,
        )

    def _handle_preparing(self) -> dict:
        """黄色区——评估但不行动"""
        candidates = self.eviction_engine.select_eviction_candidates(
            self.context_blocks, self.turn_count,
            target_tokens_to_free=0,  # 只评估,不淘汰
            policy=self.eviction_policy)
        return dict(
            action="evaluate_candidates",
            candidate_count=len(candidates),
            recommendation="准备淘汰——如果利用率继续上升",
        )

    def _handle_active_management(self, utilization: float) -> dict:
        """橙色区——主动淘汰 + 按需压缩"""
        # 计算需要释放多少 token
        target = self.max_context_tokens - (
            self.max_context_tokens * 0.6)
        target_to_free = self.pressure_monitor.current_tokens - target

        # 1. 先尝试淘汰
        evicted = self.eviction_engine.select_eviction_candidates(
            self.context_blocks, self.turn_count,
            target_tokens_to_free=target_to_free,
            policy=self.eviction_policy)

        # 2. 如果淘汰不够,启动压缩
        strategy = self.compression_engine.select_strategy(utilization)
        compressed = None
        if strategy:
            compressed = self.compression_engine.progressive_compress(
                self.context_blocks, self.turn_count)
            self._log_event("compression", dict(strategy=strategy.value))

        return dict(
            action="active_management",
            evicted_count=len(evicted),
            evicted_ids=evicted,
            compression_strategy=strategy.value if strategy else None,
            tokens_freed_estimate=sum(
                b.token_count for b in self.context_blocks
                if b.block_id in evicted) if evicted else 0,
        )

    def _handle_emergency(self) -> dict:
        """红色区——强制压缩或跨窗口保存"""
        # 尝试最后一次对话压缩
        summary = self.compression_engine.compact_conversation(
            self.context_blocks, self.task_goal)
        self._log_event("emergency_compaction", dict())

        # 评估是否需要跨窗口保存
        if self.pressure_monitor.turns_until_overflow() < 2:
            return self.on_context_overflow()

        return dict(
            action="emergency_progressive_compression",
            summary_length=len(summary),
        )

    def _log_event(self, event_type: str, metadata: dict):
        self.events_log.append(dict(
            turn=self.turn_count,
            type=event_type,
            timestamp=datetime.now().isoformat(),
            metadata=metadata,
        ))

    def get_audit_trail(self) -> list:
        """获取上下文管理的完整审计追踪"""
        return self.events_log


# ─── 集成配置(YAML) ───

# context_window_config.yaml
"""
max_context_tokens: 128000

pressure_thresholds:
  yellow: 0.50
  orange: 0.75
  red: 0.90

eviction:
  policy: hybrid
  weights:
    freshness: 0.3
    priority: 0.3
    type_ttl: 0.2
    reference: 0.2
  type_ttl:
    tool_result: 10
    thinking: 3
    memory_injection: 5
  protected_types:
    - system
    - user_message_current_turn

compression:
  summarization_threshold_tokens: 500
  cascade:
    0.75: tool_result_summarization
    0.85: progressive
    0.92: conversation_compaction
    0.95: sub_agent
  progressive:
    l0_full: 1
    l1_keypoints: 5
    l2_oneline: 15
    l3_title: 50
  notes_file: agent_notes.md

budget:
  allocations:
    system_prompt: 0.06
    tool_definitions: 0.04
    message_history: 0.65
    tool_results: 0.12
    memory_injection: 0.08
    output_reserved: 0.05
  soft_limit_pct: 0.80

cross_window:
  state_file: agent_state.json
  progress_file: agent_progress.md

health_monitoring:
  metrics:
    - utilization
    - eviction_frequency
    - compression_ratio
    - burn_rate
    - fidelity_score
  alerts:
    utilization_critical: 0.90
    eviction_warning: 5
    fidelity_warning: 0.75
    burn_rate_spike_ratio: 2.0
"""

ContextWindowManager 是整套上下文窗口管理系统的单一入口。Agent 开发者不需要分别调用六个子系统——只需在 Agent 主循环的关键钩子点(on_turn_starton_turn_endon_tool_resulton_context_overflow)调用 ContextWindowManager 的对应方法。内部的压力感知、淘汰选择、压缩级联、预算验证、状态序列化全部透明。

这份 YAML 配配置将所有可调参数集中在一处——淘汰策略和权重、压缩触发阈值、预算分配比例、跨窗口文件路径、健康监控告警阈值。针对不同任务类型(代码审查、数据分析、客户服务),只需切换配置文件即可获得不同的上下文管理行为。

上下文管理的所有事件(淘汰、压缩、预算超限、跨窗口保存)都应作为审计事件被记录。参见 Agent 审计日志设计——每条上下文管理事件都是一条不可篡改的审计记录,形成完整的证据链。

常见问题(FAQ)

上下文窗口管理和 agent-memory-design 那篇文章的关系是什么?

agent-memory-design 是「仓库建筑师」——定义 L0-L3 四层记忆架构、每层存什么、检索边界如何设计。本文是「仓库管理员」——管理 L0(上下文窗口)的大小和内容。memory-design 说 L0 应该存什么(task_goal、active_plan、observations),本文说 L0 装不下了怎么办(压缩、淘汰、预算)。两篇文章互补:先读 memory-design 理解全貌,再读本文获取 L0 层的操作细节。

什么时候该压缩,什么时候该淘汰,什么时候该委托给子 Agent?

这三个操作不是互斥的——它们构成一个级联:淘汰优先(成本最低,删除冗余信息)→ 压缩跟进(当淘汰不够时,浓缩信息而非删除)→ 委托兜底(当子任务需要大量独立上下文时,分发给子 Agent)。具体触发点:利用率 75% 开始淘汰 + 工具结果摘要,85% 渐进压缩,92% 对话压缩,95%+ 且子任务可隔离时触发子 Agent 委托。这个级联由 CompressionEngine.select_strategy() 自动决策。

怎么衡量压缩有没有丢失关键信息?

使用 LLM-as-Judge 保真度评估:准备 N 个关于原始上下文的测试问题 → 分别用原始上下文和压缩上下文回答 → 用 Judge LLM 判断答案是否一致 → 保真度 = 一致数 / N。目标 > 0.85。这个评估可以自动化跑在 CI 中,作为压缩提示词的回归测试。ContextHealthMonitor.evaluate_fidelity() 提供了完整实现。

FIFO、LRU、优先级淘汰,到底该用哪个?

按任务类型选择:对话式聊天用 FIFO(简单可预测)→ 探索性任务(代码搜索、研究调研)用 LRU(自动清除死胡同)→ 异构内容混合的生产 Agent 用混合策略(优先级 + 类型 TTL + 引用频率)。一个实用的经验法则:先用混合策略的默认权重运行,观察 ContextHealthMonitor 的指标(淘汰频率、压缩保真度),根据数据调整权重——没有「万能的最佳策略」,只有「针对你的任务调优后的最佳策略」。

跨窗口状态管理会不会导致状态不一致?怎么验证?

有这种风险,但可以通过以下措施防范:(1)状态文件使用递增的窗口编号和版本号——新窗口启动时只读取版本号 >= 当前版本的状态,防止过期检查点覆盖新状态;(2)在状态中明确区分「planned」(计划做但未执行)和「done」(已完成),防止孤儿状态被误认为已完成;(3)CrossWindowStateManager.validate_state() 在加载状态时进行完整性检查——缺少任务目标、有剩余子任务但无当前子任务等情况会触发验证警告;(4)引导序列包含环境验证步骤(git status、服务健康检查),确保状态和实际环境一致。

token 预算设多大合适?怎么根据任务类型调整?

没有绝对的「合适」——预算取决于任务类型和模型窗口大小。一个基于 128K 窗口的实用起点:系统提示词 6%(~7.5K)→ 工具定义 4%(~5K)→ 消息历史 65%(~83K)→ 工具结果 12%(~15K)→ 记忆注入 8%(~10K)→ 输出预留 5%(~6K)。按任务类型调整:工具密集型任务(代码审查、运维自动化)→ 增加工具结果预算(15-20%)、减少消息历史;对话密集型任务(客服、咨询)→ 增加消息历史预算(70-75%)、减少工具结果。核心原则:输出预留永远不能低于 4K token——否则 LLM 可能连一个完整的响应都生成不了。

本文是 Agent Memory and Context Engineering 系列中 L0(上下文窗口)的操作手册。建议按以下路径继续: