Agent Context Window Management: Compressing, Preserving, and Evicting Task State
⚡ 30-Second Takeaway
- Core Problem: Agents running 50+ tool calls or multi-hour sessions inevitably hit context window overflow. Even with 128K-token windows, context rot causes accuracy to degrade as attention dilutes across accumulated tokens. A bigger model is not the answer -- structural context management is.
- The Solution: A complete context window lifecycle -- detect pressure (when to intervene) → evict (what to remove) → compress (shrink without losing state) → delegate (isolate to sub-contexts). Every stage has decision criteria, trade-off analysis, and code implementation.
- Key Implementation:
ContextWindowManagerorchestrating 6 subsystems: pressure monitoring, 6 eviction policies, 5 compression strategies, token budgeting, cross-window state persistence, and health monitoring. Full Python implementation with pluggable strategy patterns. - What You'll Walk Away With: A production-grade context management system for your agent -- when to compress vs evict vs delegate to sub-agents, how to budget tokens across components, how to resume work cleanly after context reset, and how to measure whether your compression is losing critical state.
1. Why Context Window Management Is a First-Class Engineering Problem
A code-review agent embarks on a large refactoring task: "Migrate all REST API calls in user-service from axios to fetch, preserving error-handling logic." The agent starts methodically -- step 1 searches for axios references, step 5 analyzes error-handling patterns, step 15 begins file-by-file replacement, step 30 runs tests and finds breakages. By step 40, something has gone wrong: the agent keeps modifying the same file without advancing, has forgotten dependency analyses completed 25 steps earlier, and at step 50 throws a model_context_window_exceeded error.
This is not a model capability problem. It is the result of treating the context window as an infinite, append-only log. The axios search results from step 1, the error-pattern analysis from step 5, the per-file dependency map from step 15 -- every piece of output has been faithfully appended to the context window. A 128K window looks capacious at first, but after 40 turns of accumulation -- system prompt, tool definitions, conversation history, tool call results -- it is full. The agent begins losing critical context at step 41 and hits the wall at step 50.
Five Failure Modes of Unmanaged Context
In production, treating the context window as a passive message queue triggers five distinct failure classes:
- Context Rot (attention dilution): Transformer attention scales as O(n²) -- every token attends to every other token. As context grows, the attention budget per token shrinks. Chroma's research on 18 models confirmed this empirically: even models claiming 128K+ context windows show significant accuracy degradation past 64K tokens. GPT-4 begins hallucinating confident but incorrect inferences; Claude tends to abstain when uncertain. This is not a bug -- it is the physical limit of soft attention. More context means less attention per piece of information.
- Token cost inflation: Every turn sends the full accumulated history to the LLM. Turn 1 costs $0.01. Turn 50 costs $0.80. The cost curve is superlinear -- roughly 80% of tokens in a long-running agent task are spent on content the agent no longer needs to see. For teams running agents at scale, unmanaged context is not just a correctness problem -- it is a cost-control problem.
- Overflow crashes: The most visible failure. At step N the window hits its hard limit, the API returns an error, and the task terminates. All work from steps 1 through N-1 is lost because there is no cross-window state persistence. The agent must restart from zero, with zero memory of what it already did.
- Zombie information: Old tool results, resolved discussions, abandoned exploration paths -- these are never automatically removed. They sit in context forever, consuming both attention budget and token budget. Worse: the LLM can be misled by stale information. "But you decided to use axios back in step 3" -- a decision that was overturned 30 steps ago but whose text still lives in the history.
- Session amnesia: The agent is forcibly restarted after overflow, landing in a fresh context window with zero continuity. It must re-search for axios references, re-analyze error-handling patterns, re-build dependency maps -- re-deriving knowledge it already produced. If the original task required those 50+ steps to reach the halfway point, the agent is now in an infinite restart loop.
Three Axes of Context Management: Compress, Evict, Delegate
The context window management system proposed in this article operates along three axes:
Axis 1 -- Compress: Shrink context without losing critical state
Strategies: conversation compaction / structured note-taking /
tool result summarization / progressive summarization /
sub-agent delegation
Axis 2 -- Evict: Remove what is no longer useful
Policies: FIFO / LRU / Priority-based / Semantic similarity merge /
Type-based / Hybrid (weighted composite)
Axis 3 -- Delegate: Move work to isolated sub-contexts
Pattern: spawn sub-agent with clean context → focused subtask →
condensed summary returned to main agent
These three axes are not alternatives -- they are complementary layers that activate at different pressure levels. The decision framework in Sections 3 and 4 provides concrete guidance on when to use each.
The ContextWindowManager: Architecture Preview
Agent Loop → ContextWindowManager
|-- ContextPressureMonitor (when to act -- Section 2)
|-- EvictionEngine (what to remove -- Section 3)
| |-- FIFO / LRU / Priority / Semantic / Type / Hybrid
|-- CompressionEngine (how to shrink -- Section 4)
| |-- Compaction / Note-Taking / ToolResultSummarization
| |-- Progressive / SubAgent
|-- TokenBudgetManager (track, allocate, enforce -- Section 5)
|-- CrossWindowStateManager (serialize, resume, verify -- Section 6)
|-- ContextHealthMonitor (metrics, fidelity, alerts -- Section 7)
This article is the operational manual for L0 (the context window). The companion article Agent Memory System Design defines the full L0-L3 memory architecture -- what each layer stores, how retrieval boundaries work, how memory is scoped and hygienic. If agent-memory-design is the warehouse architect, this article is the warehouse operator managing shelf space. Both are necessary; they address complementary halves of the memory+context problem.
For the broader context protocol that governs how data flows into the context window in the first place, see Agent Context Protocol Design. For the observability infrastructure that consumes the health metrics emitted by the ContextWindowManager, see Agent Observability.
2. Understanding Context Pressure: When to Intervene
The first question in context management is not "what should I do?" -- it is "do I need to do anything right now?" Intervening too early wastes tokens on unnecessary compression. Intervening too late means the overflow has already happened, the error has already fired, and the recovery cost is much higher. The goal of context pressure monitoring is to detect the optimal intervention window -- early enough to act safely, late enough to avoid unnecessary work.
The Token Utilization Curve
Every agent loop produces a characteristic token utilization curve. Understanding its shape is the first step to predicting when intervention will be needed. The curve is typically linear in slope but can spike sharply when a tool returns unexpectedly large output (e.g., a web search returning a 20K-token page). Pressure monitoring must therefore track two signals: absolute level (current token count / max window) and velocity (tokens added per turn). Velocity tells you whether you have 10 turns or 2 turns before hitting the red zone.
Four Pressure Zones
| Zone | Utilization | Action | Why This Threshold? |
|---|---|---|---|
| 🟢 Green | 0--50% | No intervention needed. Full history is available and attention dilution is minimal. | Below 50%, the opportunity cost of compression (losing detail) almost always exceeds the benefit (freeing space). |
| 🟡 Yellow | 50--75% | Prepare eviction/compression strategy. No action yet, but evaluate: which blocks are eviction candidates? What is the burn rate? | This is the strategic planning zone. At 50%, you have time to make good decisions. At 90%, you have time only to panic. |
| 🟠 Orange | 75--90% | Execute eviction. Remove low-priority content. If eviction is insufficient, prepare compaction. | At 75%, attention dilution becomes measurable. Eviction restores attention quality while preserving recent context. |
| 🔴 Red | 90--100% | Force evict or compact immediately. The next LLM call risks overflow. Cross-window state should be serialized as a safety net. | At 90%, you are one large tool output away from a crash. Delay is not an option. |
The zone thresholds (50%, 75%, 90%) are defaults that work well for 128K-token windows. For smaller windows (8K--32K), tighten the thresholds -- the absolute token budget is smaller, so you have less margin for error. For very large windows (200K+), you can loosen the yellow/orange thresholds but must keep the red zone tight -- attention dilution is a function of both absolute token count and information density, and long contexts amplify rot even within "safe" zones.
Velocity Tracking: The Pressure Speed Problem
Absolute utilization tells you where you are. Velocity tells you how fast you're getting to the red zone. Not all agent loops add tokens at the same rate:
- Low-velocity agents (chat, Q&A): ~200--500 tokens/turn. Pressure builds slowly. The yellow zone offers plenty of planning time.
- Medium-velocity agents (code review, data analysis): ~1K--3K tokens/turn. Moderate pressure growth.
- High-velocity agents (web research, multi-tool pipelines): ~5K--20K tokens/turn. Pressure can spike from green to red in 2--3 turns. The monitoring system must detect this acceleration and trigger pre-emptive eviction before reaching the orange zone.
Velocity is computed as a rolling average over the last N turns (default N=5). A sudden velocity spike -- e.g., a tool returning 15K tokens when the average is 2K -- should trigger an immediate pressure re-evaluation, regardless of the current zone.
Context Awareness: Leveraging Model-Reported Token Budget
Some modern models (Claude Sonnet 4.5+) report their remaining token budget through system warnings. This is a powerful signal -- it is the ground truth of what the model actually sees, accounting for internal overhead that client-side token counting may miss. The ContextPressureMonitor should consume this signal when available, falling back to client-side estimation when not.
Trade-off: Relying solely on model-reported budgets creates vendor lock-in -- not all providers expose this. Relying solely on client-side counting risks underestimating usage. The recommended approach: prefer model-reported budget when available, validate against client-side count as a cross-check, and fall back to client-side when the model doesn't report.
Code: ContextPressureMonitor
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Callable
from collections import deque
class PressureZone(Enum):
GREEN = "green" # 0-50%: no action
YELLOW = "yellow" # 50-75%: prepare strategy
ORANGE = "orange" # 75-90%: execute eviction
RED = "red" # 90-100%: force compact
@dataclass
class PressureReading:
"""A single pressure measurement snapshot"""
current_tokens: int
max_tokens: int
utilization_pct: float
zone: PressureZone
velocity_tokens_per_turn: float
turns_until_red: Optional[float]
source: str # "model_reported" or "client_estimated"
@dataclass
class ContextPressureMonitor:
"""Monitors context window pressure with zone detection and velocity tracking.
Call .measure() before each LLM invocation in the agent loop.
The returned PressureReading drives decisions in the EvictionEngine
and CompressionEngine."""
max_context_tokens: int = 128_000
velocity_window_turns: int = 5
# Zone thresholds as fraction of max context
green_threshold: float = 0.50 # below this = green
yellow_threshold: float = 0.75 # below this = yellow
orange_threshold: float = 0.90 # below this = orange; above = red
# Velocity spike detection: flag when velocity exceeds baseline * N
spike_multiplier: float = 3.0
# Callbacks for integration with observability pipeline
on_zone_change: Optional[Callable] = None
on_velocity_spike: Optional[Callable] = None
# Internal tracking state
_token_history: deque = field(default_factory=deque)
_current_zone: PressureZone = PressureZone.GREEN
_baseline_velocity: float = 0.0
_turn_count: int = 0
def measure(self, current_tokens: int,
model_reported_tokens: Optional[int] = None) -> PressureReading:
"""Take a pressure reading before the next LLM call.
Prefer model-reported token count when available (ground truth);
fall back to client-side estimate otherwise."""
effective_tokens = (model_reported_tokens
if model_reported_tokens is not None
else current_tokens)
utilization = effective_tokens / self.max_context_tokens
zone = self._classify_zone(utilization)
velocity = self._compute_velocity(effective_tokens)
turns_until_red = self._estimate_turns_to_red(effective_tokens, velocity)
source = ("model_reported" if model_reported_tokens is not None
else "client_estimated")
reading = PressureReading(
current_tokens=effective_tokens,
max_tokens=self.max_context_tokens,
utilization_pct=round(utilization * 100, 1),
zone=zone,
velocity_tokens_per_turn=round(velocity, 1),
turns_until_red=turns_until_red,
source=source,
)
# Detect and emit events
self._detect_events(reading, velocity)
self._token_history.append(effective_tokens)
self._turn_count += 1
self._current_zone = zone
return reading
def _classify_zone(self, utilization: float) -> PressureZone:
if utilization >= self.orange_threshold:
return PressureZone.RED
elif utilization >= self.yellow_threshold:
return PressureZone.ORANGE
elif utilization >= self.green_threshold:
return PressureZone.YELLOW
return PressureZone.GREEN
def _compute_velocity(self, current_tokens: int) -> float:
"""Rolling average of tokens added per turn over recent window."""
if len(self._token_history) < 2:
return 0.0
recent = list(self._token_history)[-self.velocity_window_turns:]
if len(recent) < 2:
return 0.0
deltas = [recent[i+1] - recent[i] for i in range(len(recent)-1)]
deltas.append(current_tokens - recent[-1])
return sum(deltas) / len(deltas)
def _estimate_turns_to_red(self, tokens: int,
velocity: float) -> Optional[float]:
"""Estimate how many turns until context hits the red zone."""
if velocity <= 0:
return None
red_tokens = int(self.max_context_tokens * self.orange_threshold)
remaining = red_tokens - tokens
return max(0.0, remaining / velocity) if remaining > 0 else 0.0
def _detect_events(self, reading: PressureReading, velocity: float):
"""Fire callbacks on zone transitions and velocity spikes."""
if reading.zone != self._current_zone and self.on_zone_change:
self.on_zone_change(self._current_zone, reading.zone, reading)
if (self._baseline_velocity > 0
and velocity > self._baseline_velocity * self.spike_multiplier
and self.on_velocity_spike):
self.on_velocity_spike(velocity, self._baseline_velocity, reading)
# Exponential moving average for baseline velocity
alpha = 0.3
self._baseline_velocity = (
alpha * velocity + (1 - alpha) * self._baseline_velocity
if self._baseline_velocity > 0 else velocity
)
def reset(self):
"""Reset monitor state for a new task."""
self._token_history.clear()
self._current_zone = PressureZone.GREEN
self._baseline_velocity = 0.0
self._turn_count = 0
# -- Usage example --
def on_zone_change(old, new, reading):
print(f"[PRESSURE] {old.value} -> {new.value} "
f"(utilization: {reading.utilization_pct}%)")
def on_velocity_spike(current, baseline, reading):
print(f"[SPIKE] {current:.0f} t/turn vs baseline {baseline:.0f}")
monitor = ContextPressureMonitor(
max_context_tokens=128_000,
on_zone_change=on_zone_change,
on_velocity_spike=on_velocity_spike,
)
# Simulate growing context across 10 turns
token_counts = [5000, 8000, 12000, 18000, 28000,
45000, 68000, 90000, 105000, 118000]
for i, tokens in enumerate(token_counts):
r = monitor.measure(tokens)
icons = {"green": "G", "yellow": "Y", "orange": "O", "red": "R"}
print(f"Turn {i+1}: [{icons[r.zone.value]}] {r.utilization_pct:.0f}% "
f"| vel: {r.velocity_tokens_per_turn:.0f} t/t "
f"| red in: {r.turns_until_red}")
The ContextPressureMonitor is designed to be called before every LLM invocation in the agent loop. Its output -- current zone, velocity, estimated turns until red -- feeds directly into the decision logic of the EvictionEngine and CompressionEngine. The zone transition and velocity spike callbacks enable integration with the observability pipeline: every zone transition is an event worth logging, and every velocity spike is a signal worth alerting on (see Agent Observability for the metrics pipeline).
3. Eviction Policies: What to Remove When Context Is Full
When the pressure monitor reports orange or red, the first and cheapest intervention is eviction -- removing content that is no longer contributing to the agent's current task. Eviction is preferred over compression because it costs nothing (no LLM calls) and is reversible in principle (the evicted content may still exist in a lower memory layer). But the decision of what to evict is where the engineering challenge lies -- a wrong eviction choice silently degrades the agent's reasoning quality.
The Eviction Decision Space
Before designing eviction policies, you must understand what types of content exist in the context window. Each type has a different lifecycle, importance characteristics, and safe-eviction rules:
| Content Type | ~Token Share | Lifecycle | Safe to Evict? |
|---|---|---|---|
| System prompt | 5--15% | Static | ❌ Never. Partial eviction breaks agent behavior. |
| Tool definitions | 3--8% | Static | ⚠ Only if tool is no longer needed. |
| User messages | 2--5% | Per-turn | ⚠ Older ones evictable; original task goal never. |
| Assistant responses | 15--35% | Per-turn | ✅ Safe if covered by compaction summary. |
| Tool call results | 30--60% | Single-use | ✅ Primary eviction target. Most consumed once. |
| Memory injections | 5--20% | Per-session | ⚠ Evict stale; keep high-relevance recent. |
The key insight: tool call results are the dominant token consumer in agentic workloads (30--60% of context) and are the safest to evict. A web search result from 15 turns ago, a directory listing from the initial exploration phase, a verbose error log from a resolved issue -- these are dead weight. Evicting them costs nothing and frees substantial space.
Six Eviction Policies: A Comparative Analysis
Policy 1 -- FIFO (First-In-First-Out)
Strategy: Remove the oldest messages first. Maintain a rolling window of the last N turns.
- Pros: Predictable, zero computational overhead, preserves recency bias. Simple to implement and reason about during debugging.
- Cons: Loses critical early information -- the user's original task description, initial constraints, architectural decisions made in the first few turns. In agent tasks, "old" does not mean "unimportant."
- Best for: Chat-style agents where conversation naturally moves forward. Conversational customer support, Q&A agents.
- Worst for: Long-horizon task agents where the goal statement and early decisions remain critical throughout.
- Verdict: Too blunt for most agentic workloads. Use only when the task pattern is strictly linear with no backward references.
Policy 2 -- LRU (Least Recently Used)
Strategy: Track which content blocks are referenced in subsequent reasoning. Evict blocks that haven't been referenced for the longest time.
- Pros: Naturally keeps "active" information. Automatically removes dead-end explorations the agent never revisited.
- Cons: Requires content reference tracking -- detecting when the agent's response references a previous tool result or message. This is non-trivial: references can be implicit ("as discussed earlier") or fragmented. Cold-start problem: new content has no reference history.
- Best for: Long agent loops with exploratory phases where dead-end explorations accumulate.
- Verdict: Conceptually elegant but operationally complex. Worth implementing only if your agent has high exploration-to-convergence ratios.
Policy 3 -- Priority-Based Eviction
Strategy: Assign an importance score to each content block. Evict lowest-scoring blocks first. Scoring dimensions:
- Recency: Newer content scores higher (exponential decay).
- Semantic relevance: Cosine similarity between the content block and the current task goal.
- Source type: User messages > assistant responses > tool results.
- Reuse frequency: How many subsequent turns referenced this content.
- Explicit markers: Content flagged as IMPORTANT or DO_NOT_EVICT.
- Pros: Most tunable -- encodes domain-specific importance heuristics. Transparent and debuggable.
- Cons: Weight tuning is non-trivial and use-case-dependent. Semantic relevance requires embedding computation (cost).
- Best for: Agents with heterogeneous content importance. Production agents in well-understood domains.
- Verdict: The recommended default for production agents. Start with equal weights, monitor eviction decisions for a few runs, then tune.
Policy 4 -- Semantic Similarity Merge
Strategy: When two content blocks have high semantic similarity (cosine > 0.85), merge them into a single summarized block rather than evicting either.
- Pros: Preserves information while reducing tokens. Natural fit for agents receiving repeated similar outputs (status polling, iterative refinement).
- Cons: Requires an LLM call for merge summarization. Merging can lose nuanced differences. The similarity threshold is a critical tuning knob.
- Best for: Infrastructure monitoring agents, CI/CD pipeline agents, data collection agents.
- Verdict: A complementary policy, not standalone. Combine with priority-based eviction -- merge similar blocks first, then evict lowest-priority remaining.
Policy 5 -- Type-Based Eviction
Strategy: Apply different eviction rules per content type based on their natural lifecycle:
- Tool results older than N turns: clear automatically.
- Thinking blocks after tool cycle completes: clear.
- Memory injections: downweight and evict when relevance drops.
- User messages: never evict original task statement.
- Pros: Matches natural content lifecycle. Predictable behavior. Aligns with Anthropic's context editing API semantics.
- Cons: Static rules can be too aggressive -- a seemingly old tool result might contain a constraint critical later.
- Best for: Agentic workflows with predictable tool-use patterns.
- Verdict: Excellent as a safety layer -- enforce hard constraints ("never evict system prompt") while using priority scoring for nuanced decisions within evictable types.
Policy 6 -- Hybrid (Weighted Composite)
Strategy: Combine multiple policies into a single scoring function with configurable weights:
eviction_priority = (
0.30 * recency + 0.25 * priority
+ 0.20 * type_ttl + 0.15 * reference_freq
+ 0.10 * semantic_duplication
)
- Pros: Most flexible -- captures multiple dimensions. Weights tunable per agent type or task phase.
- Cons: Most complex to tune. Weight sensitivity means small changes can produce surprising eviction decisions.
- Best for: Production agents where the cost of a bad eviction is high and you're willing to invest in tuning.
- Verdict: The architecture implemented by the
EvictionEnginebelow -- pluggable scoring strategies let you start simple and add complexity as you understand your agent's eviction patterns.
What Must NEVER Be Evicted
- System prompt: Partial eviction creates undefined agent behavior.
- Current turn's user message: The agent must always see what it's being asked to do right now.
- Active tool call results: Evicting mid-cycle breaks the reasoning chain.
- Explicit preservation markers: Content flagged DO_NOT_EVICT by the agent or system.
Eviction Safety: Placeholder Messages
When a tool result is evicted, leave a placeholder so the agent knows what was removed:
[Evicted: tool_result from turn 7 (search_files "axios").
Finding: 23 files reference axios in user-service/src/.
Full result in session memory if needed.]
This costs ~30 tokens vs the original's ~3,000 -- a 100x compression -- while preserving the agent's awareness of past findings.
Decision Framework: Choosing an Eviction Policy
| If your agent... | Start with... | Then consider... |
|---|---|---|
| Does simple Q&A or chat | FIFO | Not needed |
| Runs long exploration-heavy loops | Type-based | LRU if dead-ends accumulate |
| Has heterogeneous content importance | Priority-based | Hybrid after tuning weights |
| Receives repeated similar outputs | Type-based + Semantic Merge | Add priority scoring |
| Is mission-critical production | Hybrid (conservative weights) | Tune based on eviction audit logs |
Code: EvictionEngine
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Callable
import math
class ContentType(Enum):
SYSTEM_PROMPT = "system_prompt"
TOOL_DEFINITION = "tool_definition"
USER_MESSAGE = "user_message"
ASSISTANT_RESPONSE = "assistant_response"
TOOL_RESULT = "tool_result"
MEMORY_INJECTION = "memory_injection"
class EvictionPolicy(Enum):
FIFO = "fifo"
LRU = "lru"
PRIORITY = "priority"
SEMANTIC_MERGE = "semantic_merge"
TYPE_BASED = "type_based"
HYBRID = "hybrid"
@dataclass
class ContentBlock:
"""A single block of content in the context window"""
block_id: str
content_type: ContentType
content: str
token_count: int
turn_created: int
last_referenced_turn: int = 0
priority_score: float = 0.5
embedding: Optional[list] = None
do_not_evict: bool = False
metadata: dict = field(default_factory=dict)
# ---- Pluggable Scoring Strategies ----
class EvictionScorer(ABC):
"""Abstract scorer: lower score = higher eviction priority."""
@abstractmethod
def score(self, block: ContentBlock, context: dict) -> float:
"""Return 0.0 (evict first) to 1.0 (keep)."""
...
class FIFOScorer(EvictionScorer):
"""Oldest blocks score lowest -> evicted first."""
def score(self, block: ContentBlock, context: dict) -> float:
current_turn = context.get("current_turn", 0)
age = current_turn - block.turn_created
half_life = context.get("fifo_half_life", 10)
return math.exp(-age / max(half_life, 1))
class LRUScorer(EvictionScorer):
"""Least-recently-referenced blocks score lowest."""
def score(self, block: ContentBlock, context: dict) -> float:
current_turn = context.get("current_turn", 0)
idle = current_turn - block.last_referenced_turn
if block.last_referenced_turn == 0:
return 0.3 # cold start: moderate suspicion
half_life = context.get("lru_half_life", 5)
return 1.0 - math.exp(-idle / max(half_life, 1))
class PriorityScorer(EvictionScorer):
"""Weighted multi-signal priority scoring."""
def score(self, block: ContentBlock, context: dict) -> float:
current_turn = context.get("current_turn", 0)
w = context.get("priority_weights", {
"recency": 0.30, "semantic": 0.25, "source": 0.20,
"reuse": 0.15, "explicit": 0.10,
})
# Recency: exponential decay
age = current_turn - block.turn_created
recency = math.exp(-age / max(context.get("recency_half_life", 15), 1))
# Source type prestige
type_scores = {
ContentType.USER_MESSAGE: 1.0,
ContentType.TOOL_DEFINITION: 0.9,
ContentType.MEMORY_INJECTION: 0.7,
ContentType.ASSISTANT_RESPONSE: 0.5,
ContentType.TOOL_RESULT: 0.3,
ContentType.SYSTEM_PROMPT: 1.0,
}
source_score = type_scores.get(block.content_type, 0.5)
# Reuse frequency
reuse = min(block.metadata.get("reference_count", 0) / 5.0, 1.0)
# Explicit DO_NOT_EVICT flag
explicit = 1.0 if block.do_not_evict else 0.0
# Semantic relevance (requires embedding)
semantic = 0.5
if block.embedding and context.get("task_embedding"):
task_emb = context["task_embedding"]
dot = sum(x * y for x, y in zip(block.embedding, task_emb))
n_a = math.sqrt(sum(x*x for x in block.embedding))
n_b = math.sqrt(sum(y*y for y in task_emb))
semantic = max(0.0, dot / max(n_a * n_b, 1e-10))
return (w["recency"] * recency + w["semantic"] * semantic
+ w["source"] * source_score + w["reuse"] * reuse
+ w["explicit"] * explicit)
class TypeBasedScorer(EvictionScorer):
"""Per-type TTL: blocks past their type's max age score zero."""
def score(self, block: ContentBlock, context: dict) -> float:
current_turn = context.get("current_turn", 0)
ttl = context.get("type_ttl", {
ContentType.TOOL_RESULT: 8,
ContentType.MEMORY_INJECTION: 20,
ContentType.ASSISTANT_RESPONSE: 30,
})
max_age = ttl.get(block.content_type, 50)
age = current_turn - block.turn_created
if age > max_age:
return 0.0
return 1.0 - (age / max(max_age, 1))
class HybridScorer(EvictionScorer):
"""Weighted composite of all scoring strategies."""
def __init__(self):
self._fifo = FIFOScorer()
self._lru = LRUScorer()
self._priority = PriorityScorer()
self._type = TypeBasedScorer()
def score(self, block: ContentBlock, context: dict) -> float:
w = context.get("hybrid_weights", {
"fifo": 0.15, "lru": 0.15,
"priority": 0.40, "type": 0.30,
})
return (w["fifo"] * self._fifo.score(block, context)
+ w["lru"] * self._lru.score(block, context)
+ w["priority"] * self._priority.score(block, context)
+ w["type"] * self._type.score(block, context))
# ---- Eviction Engine ----
@dataclass
class EvictionResult:
blocks_evicted: list
tokens_freed: int
remaining_tokens: int
policy_used: EvictionPolicy
placeholders_generated: int
@dataclass
class EvictionEngine:
"""Pluggable eviction engine with safe-eviction guardrails.
Usage:
engine = EvictionEngine(policy=EvictionPolicy.HYBRID)
result = engine.evict(blocks, target_free_tokens=5000, current_turn=12)
"""
scorer: EvictionScorer = field(default_factory=HybridScorer)
policy: EvictionPolicy = EvictionPolicy.HYBRID
NEVER_EVICT: tuple = (ContentType.SYSTEM_PROMPT,)
generate_placeholders: bool = True
placeholder_max_tokens: int = 50
def evict(self, blocks: list, target_free_tokens: int,
current_turn: int = 0, context: dict = None) -> EvictionResult:
"""Evict blocks to free at least target_free_tokens."""
if context is None:
context = {}
context["current_turn"] = current_turn
# Separate protected and evictable
protected, evictable = [], []
for b in blocks:
if b.content_type in self.NEVER_EVICT or b.do_not_evict:
protected.append(b)
else:
evictable.append(b)
# Score and sort (lowest first = evict first)
scored = [(self.scorer.score(b, context), b) for b in evictable]
scored.sort(key=lambda x: x[0])
# Evict until target met
freed, evicted = 0, []
for score, block in scored:
if freed >= target_free_tokens:
break
evicted.append(block)
freed += block.token_count
remaining = sum(b.token_count for b in protected) + sum(
b.token_count for b in evictable if b not in evicted)
return EvictionResult(
blocks_evicted=evicted,
tokens_freed=freed,
remaining_tokens=remaining,
policy_used=self.policy,
placeholders_generated=0,
)
def set_policy(self, policy: EvictionPolicy):
"""Switch eviction policy at runtime."""
self.policy = policy
scorers = {
EvictionPolicy.FIFO: FIFOScorer,
EvictionPolicy.LRU: LRUScorer,
EvictionPolicy.PRIORITY: PriorityScorer,
EvictionPolicy.TYPE_BASED: TypeBasedScorer,
EvictionPolicy.HYBRID: HybridScorer,
}
if policy in scorers:
self.scorer = scorers[policy]()
# -- Usage: hybrid eviction freeing 500 tokens --
engine = EvictionEngine(policy=EvictionPolicy.HYBRID)
blocks = [
ContentBlock("b1", ContentType.USER_MESSAGE,
"Migrate all axios calls to fetch", 12, 1, do_not_evict=True),
ContentBlock("b2", ContentType.TOOL_RESULT,
"Found 23 axios refs in user-service/src/...", 450, 2),
ContentBlock("b3", ContentType.TOOL_RESULT,
"Error handling: try/catch in 18 files, config in 5...", 2800, 5),
ContentBlock("b4", ContentType.TOOL_RESULT,
"LS output (irrelevant): total 48 files...", 120, 6),
]
result = engine.evict(blocks, target_free_tokens=500,
current_turn=8,
context={"hybrid_weights": {
"fifo": 0.15, "lru": 0.15,
"priority": 0.40, "type": 0.30}})
print(f"Evicted {len(result.blocks_evicted)} blocks, freed {result.tokens_freed} tokens")
for b in result.blocks_evicted:
print(f" -> {b.block_id} ({b.content_type.value})")
The eviction engine is designed to be called from the agent loop whenever the ContextPressureMonitor reports orange or red. The pluggable scorer architecture means you can start with HybridScorer and conservative weights, observe eviction decisions over multiple runs, and progressively tune. For the design of token-efficient tools that minimize the need for eviction, see Agent Tool Design.
4. Compression Strategies: Making Context Smaller Without Losing State
Eviction removes content entirely. Compression transforms it -- preserving information in a reduced form. When eviction alone cannot free enough tokens (or when the content slated for eviction contains information the agent still needs), compression is the next intervention in the cascade.
The fundamental distinction: eviction trades completeness for space; compression trades fidelity for space. Eviction says "this information is not needed." Compression says "this information is needed, but not at full resolution." The engineering challenge is maximizing the fidelity-to-token ratio -- preserving as much decision-critical information as possible per token of compressed output.
Five Compression Strategies
Strategy 1 -- Conversation Compaction
How it works: Send the conversation history to an LLM with a compaction prompt that produces a structured summary capturing: architectural decisions, unresolved issues, implementation state, next steps, key learnings. Replace the original conversation with the summary + the most recent N messages.
The compaction prompt is the most critical artifact. A poorly designed prompt produces summaries that omit the one detail that matters 20 turns later:
Compaction prompt structure:
1. PRESERVE (mandatory):
- All architectural decisions and rationale
- All unresolved bugs, errors, blockers -- include exact messages
- Current task progress: completed, current, remaining steps
- All user constraints and preferences (verbatim if short)
- Key file paths, function names, data structures
- Numbers: counts, measurements, config values, versions
2. DROP:
- Verbose tool output where the key finding is captured in 1-2 lines
- Redundant confirmations and status checks
- Dead-end explorations yielding no useful information
- Boilerplate error messages (keep type + key detail)
- Intermediate reasoning superseded by a final decision
3. FORMAT: Structured sections, bullet points. Not narrative -- a reference document.
Trade-off -- Server-side vs Client-side: Server-side (Anthropic's compaction API) is automatic and reliable but vendor-locked and black-box. Client-side (this implementation) is provider-agnostic and tunable but costs an extra LLM call and quality depends on your prompt design. Choose server-side when you're on a single provider and want zero-code integration. Choose client-side when you need provider independence, prompt transparency, or integration with custom compaction logic.
Strategy 2 -- Structured Note-Taking
How it works: The agent writes persistent notes to external storage during operation. On compression, notes -- not raw conversation -- are carried forward. This is distinct from compaction: compaction summarizes post-hoc; note-taking captures information at the moment of generation, when understanding is freshest.
Patterns from Claude Code and Claude Plays Pokemon:
- TODO lists: Structured task list, marked complete as items finish. On context reset, the TODO list is the single source of truth.
- Architecture Decision Records (ADRs): Every decision with rationale, alternatives considered, and implications -- written to persistent storage.
- Bug tracking notes: Unresolved issues, error reproduction steps, suspected root causes -- written in real time.
The "cold start" requirement: Notes must be written so a new agent session, reading only the notes, can become operational immediately. "Continuing investigation from earlier" is useless. "Investigating 503 error on /api/v2/users -- occurs at ~200 concurrent requests, suspected connection pool exhaustion, see /var/log/user-service/error.log" provides everything the new session needs.
Trade-off: Note-taking costs tokens during operation (every note is an LLM output). But those tokens are an investment -- they reduce later compression costs and improve context resumption quality. For long tasks (>20 turns), the investment pays off. For short tasks (<10 turns), it is overhead without benefit.
Strategy 3 -- Tool Result Summarization
How it works: When a tool returns large output, immediately summarize it into key findings. The original output is discarded; only the summary stays in context. Apply when tool_output_tokens > summarization_threshold (default: 500 tokens). Below this threshold, the compression benefit is marginal; above it, summarization can achieve 5--20x compression ratios.
Pattern from Claude Code: Instead of loading entire files, Claude Code uses grep for pattern matching, head/tail for snippets. This is tool-result summarization at the tool-design level. See Agent Tool Design for designing tools that produce token-efficient output by default.
Strategy 4 -- Progressive Summarization
How it works: Summarize at increasing compression ratios as information ages -- mirroring how human memory works:
L0 (current turn): Full text
L1 (last 1-5 turns): Key points, decisions (~30% of original)
L2 (5-20 turns ago): One-line summary (~5%)
L3 (>20 turns ago): Title/topic only (~1%)
Why this works: Information utility decays over time. The exact wording of a tool result from 25 turns ago is almost never needed -- but knowing that a tool was called and what its general finding was may still be relevant. Progressive summarization captures this natural decay curve.
Implementation challenge: The transition between compression levels requires re-summarization. A turn at L1 (key points) must be further compressed to L2 (one-line) when it ages past 5 turns. This distributes compression cost over time -- which is both a benefit (smoother cost) and a risk (accumulated drift as each level loses a bit more fidelity).
Strategy 5 -- Sub-Agent Delegation (Context Isolation)
How it works: Spawn a sub-agent with a clean context window for a focused subtask. The sub-agent explores, reasons, and produces a condensed summary (1K--2K tokens) returned to the main agent. The sub-agent's full context -- potentially 50K+ tokens of exploration -- is discarded after summarization.
This achieves extreme compression ratios (20--100x) by leveraging the sub-agent's intelligence to determine what matters. A sub-agent tasked with "find all authentication middleware bypasses" may search 50 files, read 30, and return a 1K-token summary of the 3 files that actually have issues. The main agent receives only the conclusion.
When to use sub-agent delegation vs local compression:
| Factor | Favor Local Compression | Favor Sub-Agent Delegation |
|---|---|---|
| Subtask independence | Tightly coupled to main context | Self-contained; specifiable in 1-2 sentences |
| Exploration volume | <5 tool calls needed | 10+ tool calls needed |
| Context dependence | Needs full conversation history | Only needs task spec + minimal context |
| Result complexity | Simple value or boolean | Structured analysis requiring synthesis |
| Cost sensitivity | Budget-conscious | Quality-conscious; worth paying for sub-agent |
Anti-pattern: Spawning a sub-agent and not summarizing its output. If the sub-agent returns its full 50K-token context, delegation increases context pressure. The sub-agent's result must always be a compressed summary. For orchestration patterns governing sub-agent lifecycle, see Multi-Agent Orchestration.
The Compression Cascade: Order of Operations
Compression strategies follow an escalation order minimizing cost and maximizing information preservation:
Context pressure detected
|
|-- 1. Tool result summarization (cheap, point intervention)
| Compress individual large tool outputs >500 tokens
|
|-- 2. Eviction of low-priority content (zero-cost, Section 3)
| Remove dead weight before spending on compression
|
|-- 3. Progressive summarization of aging turns
| Compress turns >5 old to key points, >20 old to titles
|
|-- 4. Conversation compaction (expensive but comprehensive)
| Full summarization of entire history window
|
|-- 5. Sub-agent delegation (most expensive, most powerful)
Spawn sub-agent for largest independent subtask
The cascade is ordered by cost per token freed. Tool result summarization and eviction are cheap first steps. Progressive summarization and compaction are more expensive but free more space. Sub-agent delegation is the most expensive but also the most powerful -- reserve for tasks that genuinely cannot fit in a single window.
Compression Fidelity: The Litmus Test
How do you know if your compression is working? Three dimensions (detailed in Section 7):
- Key fact retention: After compression, can the agent answer questions about the compressed content?
- State continuity: Does agent task state (current step, remaining steps, unresolved issues) match pre-compression?
- Decision recall: Can the agent recall architectural decisions and their rationale?
Code: CompressionEngine
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Callable
import time
class CompressionStrategy(Enum):
COMPACTION = "compaction"
NOTE_TAKING = "note_taking"
TOOL_RESULT = "tool_result_summarization"
PROGRESSIVE = "progressive"
SUB_AGENT = "sub_agent"
@dataclass
class CompressionResult:
"""Result of a single compression operation."""
strategy: CompressionStrategy
original_tokens: int
compressed_tokens: int
compression_ratio: float # original / compressed
compressed_content: str
fidelity_score: Optional[float] = None
llm_calls_used: int = 0
metadata: dict = field(default_factory=dict)
# ---- Compaction Prompt Templates ----
COMPACTION_SYSTEM = """You are a context-compaction engine. Compress conversation
history into a structured summary preserving all decision-critical information.
RULES:
1. PRESERVE (mandatory):
- Every architectural decision with rationale and alternatives
- Every unresolved bug, error, blocker -- include exact error messages
- Current task progress: completed steps, step in progress, remaining
- All user constraints and preferences (verbatim if short)
- Key file paths, function names, class names, data structures
- All numbers: counts, measurements, config values, versions
2. DROP:
- Verbose tool output where key finding fits in 1-2 sentences
- Redundant status confirmations
- Dead-end explorations yielding no useful information
- Boilerplate error messages (keep type + key detail)
- Intermediate reasoning superseded by a final decision
3. OUTPUT FORMAT:
Structured sections with clear headers. Bullet points preferred over prose.
This is a reference document, not a narrative."""
COMPACTION_USER = """Compress the following conversation. Task goal:
{task_goal}
Conversation to compress:
{conversation}
Produce the structured summary. A new agent session with only this summary
must be able to resume the task immediately."""
TOOL_SUMMARIZE_PROMPT = """Summarize the following tool output. Extract:
1. The key finding or result (1 sentence)
2. Any numbers, paths, names, or identifiers mentioned
3. Any errors or warnings (type + key detail)
4. Any actionable information
Tool: {tool_name}
Output ({output_tokens} tokens):
{output}
Summary (aim for <20% of original length):"""
# ---- Pluggable Compression Methods ----
class CompressionMethod(ABC):
"""Abstract compression strategy."""
@abstractmethod
def compress(self, content: str, context: dict,
llm_call: Callable) -> CompressionResult:
"""llm_call is a function: (system_prompt, user_prompt) -> str"""
...
class CompactionMethod(CompressionMethod):
"""Full conversation compaction via LLM summarization."""
def compress(self, content: str, context: dict,
llm_call: Callable) -> CompressionResult:
task_goal = context.get("task_goal", "Unknown task")
user = COMPACTION_USER.format(
task_goal=task_goal, conversation=content)
summary = llm_call(COMPACTION_SYSTEM, user)
orig = context.get("original_tokens", len(content) // 4)
comp = len(summary) // 4
return CompressionResult(
strategy=CompressionStrategy.COMPACTION,
original_tokens=orig,
compressed_tokens=comp,
compression_ratio=orig / max(comp, 1),
compressed_content=summary,
llm_calls_used=1,
)
class ToolResultSummarizationMethod(CompressionMethod):
"""Summarize individual large tool outputs."""
def __init__(self, min_tokens: int = 500):
self.min_tokens = min_tokens
def compress(self, content: str, context: dict,
llm_call: Callable) -> CompressionResult:
tool_name = context.get("tool_name", "unknown")
output_tokens = context.get("output_tokens", len(content) // 4)
# Below threshold: no summarization needed
if output_tokens < self.min_tokens:
return CompressionResult(
strategy=CompressionStrategy.TOOL_RESULT,
original_tokens=output_tokens,
compressed_tokens=output_tokens,
compression_ratio=1.0,
compressed_content=content,
)
user = TOOL_SUMMARIZE_PROMPT.format(
tool_name=tool_name, output_tokens=output_tokens, output=content)
summary = llm_call(
"You are a precise tool output summarizer.", user)
comp = len(summary) // 4
return CompressionResult(
strategy=CompressionStrategy.TOOL_RESULT,
original_tokens=output_tokens,
compressed_tokens=comp,
compression_ratio=output_tokens / max(comp, 1),
compressed_content=summary,
llm_calls_used=1,
)
class ProgressiveSummarizationMethod(CompressionMethod):
"""Multi-level summarization based on content age."""
# (max_age_turns, target_ratio, label)
LEVELS = [
(1, 1.0, "L0_full"),
(5, 0.30, "L1_key_points"),
(20, 0.05, "L2_one_line"),
(999, 0.01, "L3_title"),
]
def compress(self, content: str, context: dict,
llm_call: Callable) -> CompressionResult:
turns = context.get("turns", [])
compressed_parts = []
total_orig, llm_calls = 0, 0
for turn in turns:
age = turn.get("age", 0)
text = turn.get("text", "")
total_orig += len(text) // 4
target_ratio, label = 1.0, "L0_full"
for max_age, ratio, lbl in self.LEVELS:
if age <= max_age:
target_ratio, label = ratio, lbl
break
if target_ratio >= 1.0:
compressed_parts.append(
f"[Turn {turn['turn']}] {text}")
else:
target_tokens = max(
int((len(text) // 4) * target_ratio), 10)
prompt = (
f"Compress to ~{target_tokens} tokens ({label}). "
f"Preserve: decisions, errors, key findings, numbers.\n\n"
f"{text}")
summary = llm_call(
"You are a progressive summarizer.", prompt)
compressed_parts.append(
f"[Turn {turn['turn']}, {label}] {summary}")
llm_calls += 1
compressed = "\n\n".join(compressed_parts)
comp_tokens = len(compressed) // 4
return CompressionResult(
strategy=CompressionStrategy.PROGRESSIVE,
original_tokens=total_orig,
compressed_tokens=comp_tokens,
compression_ratio=total_orig / max(comp_tokens, 1),
compressed_content=compressed,
llm_calls_used=llm_calls,
)
class NoteTakingMethod(CompressionMethod):
"""Structured notes written during operation; notes carried forward."""
def __init__(self):
self.notes: dict = {} # category -> list of notes
def write_note(self, category: str, note: str,
importance: str = "normal"):
"""Called by the agent during operation to record a note."""
self.notes.setdefault(category, []).append({
"content": note,
"importance": importance,
"timestamp": time.time(),
})
def get_notes_for_context(self, max_tokens: int = 2000) -> str:
"""Generate a context-ready notes summary bounded by max_tokens."""
sections = []
budget = max_tokens
priority = ["task_progress", "decisions", "bugs",
"constraints", "learnings", "misc"]
for category in priority:
if category not in self.notes:
continue
cat_notes = sorted(
self.notes[category],
key=lambda n: (
0 if n["importance"] == "critical" else 1,
-n["timestamp"]))
lines = [f"## {category.replace('_', ' ').title()}"]
for n in cat_notes:
line = f"- [{n['importance'].upper()}] {n['content']}"
est = len(line) // 4
if budget - est < 50 and lines:
break
lines.append(line)
budget -= est
sections.append("\n".join(lines))
return "\n\n".join(sections)
def compress(self, content: str, context: dict,
llm_call: Callable) -> CompressionResult:
max_tokens = context.get("max_notes_tokens", 2000)
notes_text = self.get_notes_for_context(max_tokens)
orig = context.get("original_tokens", len(content) // 4)
comp = len(notes_text) // 4
return CompressionResult(
strategy=CompressionStrategy.NOTE_TAKING,
original_tokens=orig,
compressed_tokens=comp,
compression_ratio=orig / max(comp, 1),
compressed_content=notes_text,
llm_calls_used=0,
)
# ---- Compression Engine ----
@dataclass
class CompressionEngine:
"""Orchestrates compression strategies; executes the cascade."""
compaction: CompactionMethod = field(default_factory=CompactionMethod)
tool_summarizer: ToolResultSummarizationMethod = field(
default_factory=ToolResultSummarizationMethod)
progressive: ProgressiveSummarizationMethod = field(
default_factory=ProgressiveSummarizationMethod)
note_taking: NoteTakingMethod = field(default_factory=NoteTakingMethod)
# Sub-agent dispatcher (delegates to multi-agent-orchestration)
sub_agent_dispatcher: Optional[Callable] = None
# Cascade trigger thresholds
summarization_token_threshold: int = 500
progressive_age_threshold: int = 5
compaction_utilization_threshold: float = 0.85
def compress_tool_result(self, tool_name: str, output: str,
llm_call: Callable) -> CompressionResult:
"""Strategy 3: Compress a single large tool output."""
return self.tool_summarizer.compress(
output,
context={"tool_name": tool_name,
"output_tokens": len(output) // 4},
llm_call=llm_call)
def compact_conversation(self, history: str, task_goal: str,
token_count: int,
llm_call: Callable) -> CompressionResult:
"""Strategy 1: Full conversation compaction."""
return self.compaction.compress(
history,
context={"task_goal": task_goal,
"original_tokens": token_count},
llm_call=llm_call)
def progressive_summarize(self, turns: list,
llm_call: Callable) -> CompressionResult:
"""Strategy 4: Progressive summarization by age."""
total = sum(len(t.get("text", "")) // 4 for t in turns)
return self.progressive.compress(
"", context={"turns": turns, "original_tokens": total},
llm_call=llm_call)
def dispatch_to_sub_agent(self, task_spec: str,
required_context: str = "") -> CompressionResult:
"""Strategy 5: Delegate to sub-agent with clean context."""
if not self.sub_agent_dispatcher:
return CompressionResult(
strategy=CompressionStrategy.SUB_AGENT,
original_tokens=len(task_spec) // 4,
compressed_tokens=len(task_spec) // 4,
compression_ratio=1.0,
compressed_content="[Sub-agent delegation not configured]",
)
sub = self.sub_agent_dispatcher(task_spec, required_context)
return CompressionResult(
strategy=CompressionStrategy.SUB_AGENT,
original_tokens=sub.get("sub_agent_tokens_used", 0),
compressed_tokens=sub.get("summary_tokens", 0),
compression_ratio=sub.get("compression_ratio", 0.0),
compressed_content=sub.get("summary", ""),
llm_calls_used=sub.get("llm_calls", 0),
)
def execute_cascade(self, history: str, task_goal: str,
current_tokens: int, max_tokens: int,
llm_call: Callable,
turns: list = None) -> list:
"""Execute compression cascade based on pressure level.
Returns list of CompressionResult per step executed."""
results = []
utilization = current_tokens / max_tokens
# Step 2: Progressive summarization for aged turns
if turns and len(turns) > self.progressive_age_threshold:
results.append(self.progressive_summarize(turns, llm_call))
# Step 3: Full compaction at high utilization
if utilization >= self.compaction_utilization_threshold:
results.append(self.compact_conversation(
history, task_goal, current_tokens, llm_call))
return results
# -- Usage examples --
def mock_llm(system: str, user: str) -> str:
return "[Compressed: key decisions preserved, 3 unresolved issues, step 4/7]"
engine = CompressionEngine()
# 1. Tool result summarization
large = "Search results:\n" + "\n".join(
[f"File {i}: content line {j}" for i in range(200) for j in range(2)])
r = engine.compress_tool_result("search_files", large, mock_llm)
print(f"Tool: {r.original_tokens} -> {r.compressed_tokens} tokens "
f"({r.compression_ratio:.1f}x)")
# 2. Conversation compaction
history = "Turn 1-50: agent migration task..."
r = engine.compact_conversation(history, "Migrate axios to fetch",
12000, mock_llm)
print(f"Compaction: {r.original_tokens} -> {r.compressed_tokens} tokens "
f"({r.compression_ratio:.1f}x)")
# 3. Progressive summarization
turns = [
{"turn": 1, "text": "User requested migration of 23 axios calls...", "age": 25},
{"turn": 10, "text": "Began file-by-file replacement...", "age": 15},
{"turn": 20, "text": "Currently replacing file 12/23...", "age": 5},
{"turn": 25, "text": "Just fixed build error in api.ts...", "age": 0},
]
r = engine.progressive_summarize(turns, mock_llm)
print(f"Progressive: {r.original_tokens} -> {r.compressed_tokens} tokens "
f"({r.compression_ratio:.1f}x, {r.llm_calls_used} calls)")
# 4. Note-taking
engine.note_taking.write_note(
"decisions", "Use native fetch with custom error wrapper", "critical")
engine.note_taking.write_note(
"bugs", "api.ts:42 -- type error after migration, see build log", "high")
engine.note_taking.write_note(
"task_progress", "Completed 12/23 files, currently on api.ts", "normal")
notes = engine.note_taking.get_notes_for_context(max_tokens=500)
print(f"Notes:\n{notes}")
The CompressionEngine is the most architecturally complex component in the context management system because it must make cost-quality trade-offs at runtime. Each strategy has a different cost profile (LLM calls, tokens consumed) and fidelity profile (what information is preserved vs lost). The engine's design -- pluggable methods with a cascade execution order -- allows you to start conservative and progressively optimize as you gather fidelity metrics.
For the token-budgeting system that tracks how many tokens each compression strategy consumes and whether the cost is justified by space freed, continue to Section 5. For sub-agent orchestration patterns, see Multi-Agent Orchestration.
5. Token Budgeting: Tracking, Allocating, and Enforcing Limits
Eviction and compression answer the question: "the context is full — what do I do now?" Token budgeting answers the more fundamental question: "how much space should each component have in the first place, and am I tracking actual vs expected consumption?" Budgeting is fire prevention; eviction and compression are firefighting. Both are necessary, but a well-tuned budget reduces the frequency and severity of eviction+compression interventions.
Why Budgeting Matters: The Tragedy of the Commons
The context window is a shared resource. System prompt, tool definitions, message history, tool results, memory injections, and output reserve all compete for the same finite token pool. Without explicit budgets, the tragedy of the commons plays out predictably: tool results balloon to consume 50% of the window, message history grows unbounded, and the output reserve — the space the LLM needs to actually generate a response — shrinks below the minimum viable threshold. The agent doesn't crash because the model is bad; it crashes because no one was tracking who was consuming the window.
Trade-off — Static vs Dynamic Allocation: Static allocation (fixed percentages per component) is simple, predictable, and debuggable. Dynamic allocation (budget shifts based on task phase) can be more efficient but introduces complexity and tuning overhead. The recommendation: start with static allocation, monitor actual consumption patterns across 10+ real agent runs, and only then consider adding dynamism. Premature dynamic allocation is the leading cause of budget-tuning-death-spirals — where shifting budgets create feedback loops that make certain tasks unfinishable.
The Six Budget Components
| Component | Recommended % | 128K Tokens | Strategy | Overflow Action |
|---|---|---|---|---|
| System Prompt | 5--8% | ~6K--10K | Fixed, never evicted | 💀 Cannot overflow — prompt must be trimmed at design time |
| Tool Definitions | 3--5% | ~4K--6K | Fixed, lazy-load optional | Use tool search/retrieval instead of full injection |
| Message History | 60--70% | ~77K--90K | Dynamic, primary eviction target | Trigger compression cascade (Section 4) |
| Tool Results | 10--15% | ~13K--19K | Capped, auto-summarize | Summarize oldest results; evict if still over |
| Memory Injections | 5--10% | ~6K--13K | Capped, relevance-filtered | Raise relevance threshold; evict stale |
| Output Reserve | 4--8% | ~5K--10K | Always preserved | If <4K remaining, emergency compaction before next LLM call |
Trade-off — Generous vs Conservative Output Reserve: A generous output reserve (8%+) guarantees room for complex, multi-step reasoning chains — critical for coding and planning agents. A conservative reserve (4%) maximizes space for history but risks the LLM producing truncated responses when deep reasoning is required. The risk asymmetry is clear: a truncated response breaks the agent loop; slightly less history space means one extra compression cycle. Err on the side of generous — the output reserve is your circuit breaker.
Soft Limits vs Hard Limits: Designing Graceful Degradation
Budget enforcement is not binary. A hard limit that rejects content at 100% creates brittle failure — the agent hits a wall and stops. A soft limit that warns at 80% creates graceful degradation — the agent has time to compress, evict, or reallocate before hitting the wall.
Component budget lifecycle:
┌─────────────────────────────────────────────────────────────┐
│ 0% ─────────── 80% (soft) ─────── 100% (hard) │
│ │ │ │ │
│ │ Normal ops │ Pre-compress │ Force action │
│ │ No alerts │ WARNING emitted │ Block addition │
│ │ │ Prepare eviction │ Trigger cascade │
│ └─────────────────┴────────────────────┴───────────────────┘
Trade-off — Per-Component vs Global Budgeting: Per-component budgets (this design) give precise control but require tuning 6 allocation percentages. Global budgeting (one number: "keep utilization under 80%") is simpler but provides no insight into which component is the problem. The recommendation: implement per-component tracking (for observability) but enforce globally (for simplicity). If tool results are consuming 40% instead of the budgeted 12%, you want to know that — even if you're not blocking the addition. See Agent Observability for exporting these component-level metrics.
Burn Rate Prediction: From Reactive to Proactive Budgeting
The most valuable feature of a budget manager is not "how much is used now" — that's a snapshot. The most valuable feature is "how many turns until this budget is exhausted?" This transforms context management from reactive (responding to crises) to proactive (anticipating them):
turns_remaining = (component_budget - used) / avg_tokens_per_turn_for_component
urgency = "critical" if turns_remaining < 3 else "warning" if turns_remaining < 10 else "normal"
If the message history budget has 3 turns remaining but the task needs 15 more turns, the system knows to trigger compression now, not when the budget is exhausted. This is the difference between a controlled landing and a crash.
For the design of tools that minimize token consumption per result — reducing burn rate at the source — see Agent Tool Design.
Code: TokenBudgetManager
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Callable
from collections import deque
import math
class BudgetComponent(Enum):
SYSTEM_PROMPT = "system_prompt"
TOOL_DEFINITIONS = "tool_definitions"
MESSAGE_HISTORY = "message_history"
TOOL_RESULTS = "tool_results"
MEMORY_INJECTION = "memory_injection"
OUTPUT_RESERVED = "output_reserved"
@dataclass
class ComponentBudget:
"""Budget tracking for a single context-window component."""
hard_limit_tokens: int = 0
used: int = 0
soft_limit_pct: float = 0.80
@property
def soft_limit(self) -> int:
return int(self.hard_limit_tokens * self.soft_limit_pct)
@property
def utilization(self) -> float:
return self.used / self.hard_limit_tokens if self.hard_limit_tokens else 0.0
@property
def is_soft_exceeded(self) -> bool:
return self.used >= self.soft_limit
@property
def is_hard_exceeded(self) -> bool:
return self.used >= self.hard_limit_tokens
@property
def remaining(self) -> int:
return max(0, self.hard_limit_tokens - self.used)
@dataclass
class TokenBudgetManager:
"""Tracks token consumption per component, enforces soft/hard limits,
predicts exhaustion timelines via burn-rate analysis.
Integration: called before every LLM invocation (check_before_add) and
after every turn (record_turn). Emits events for observability pipeline."""
max_context_tokens: int = 128_000
# Default allocation: system prompt 6%, tools 4%, history 65%,
# tool results 12%, memory 8%, output reserve 5%
allocations: dict = field(default_factory=lambda: {
BudgetComponent.SYSTEM_PROMPT: 0.06,
BudgetComponent.TOOL_DEFINITIONS: 0.04,
BudgetComponent.MESSAGE_HISTORY: 0.65,
BudgetComponent.TOOL_RESULTS: 0.12,
BudgetComponent.MEMORY_INJECTION: 0.08,
BudgetComponent.OUTPUT_RESERVED: 0.05,
})
budgets: dict = field(default_factory=dict)
burn_history: deque = field(default_factory=lambda: deque(maxlen=20))
on_soft_warning: Optional[Callable] = None
on_hard_violation: Optional[Callable] = None
def __post_init__(self):
for comp, pct in self.allocations.items():
self.budgets[comp] = ComponentBudget(
hard_limit_tokens=int(self.max_context_tokens * pct))
# ── Tracking ──
def track(self, component: BudgetComponent, tokens: int):
"""Register token consumption against a component budget."""
budget = self.budgets.get(component)
if not budget:
return
budget.used += tokens
def record_turn(self, tokens_this_turn: int):
"""Record total token consumption for this turn (for burn-rate calc)."""
self.burn_history.append(tokens_this_turn)
# ── Enforcement ──
def check_before_add(self, component: BudgetComponent,
tokens_to_add: int) -> dict:
"""Gate check: can we add tokens_to_add to this component's budget?
Returns {"allowed": True} or {"allowed": False, "reason": ...}.
Soft-limit crossings emit warnings; hard-limit crossings block."""
budget = self.budgets.get(component)
if not budget:
return {"allowed": True}
projected = budget.used + tokens_to_add
if projected >= budget.hard_limit_tokens:
if self.on_hard_violation:
self.on_hard_violation(component, budget, projected)
return {
"allowed": False,
"reason": f"Hard limit exceeded: {component.value} "
f"({budget.used}/{budget.hard_limit_tokens}, "
f"+{tokens_to_add} would reach {projected})",
"component": component.value,
"current": budget.used,
"limit": budget.hard_limit_tokens,
"excess": projected - budget.hard_limit_tokens,
}
if projected >= budget.soft_limit:
if self.on_soft_warning:
self.on_soft_warning(component, budget, projected)
return {
"allowed": True,
"warning": f"Soft limit exceeded: {component.value} "
f"({budget.used}/{budget.hard_limit_tokens})",
"component": component.value,
"utilization_after": round(projected / budget.hard_limit_tokens, 2),
}
return {"allowed": True}
# ── Prediction ──
@property
def burn_rate(self) -> float:
"""Average tokens consumed per turn over recent history."""
if not self.burn_history:
return 0.0
return sum(self.burn_history) / len(self.burn_history)
def turns_until_exhausted(self) -> Optional[float]:
"""Estimate how many turns until total context is exhausted."""
rate = self.burn_rate
if rate <= 0:
return None
total_used = sum(b.used for b in self.budgets.values())
remaining = self.max_context_tokens - total_used
return max(0.0, remaining / rate)
# ── Reporting ──
@property
def total_used(self) -> int:
return sum(b.used for b in self.budgets.values())
@property
def total_utilization(self) -> float:
return self.total_used / self.max_context_tokens
def report(self) -> dict:
"""Full budget status report for observability pipeline."""
components = {}
for comp, budget in self.budgets.items():
components[comp.value] = {
"used": budget.used,
"limit": budget.hard_limit_tokens,
"remaining": budget.remaining,
"utilization": round(budget.utilization, 2),
"soft_exceeded": budget.is_soft_exceeded,
"hard_exceeded": budget.is_hard_exceeded,
}
return {
"components": components,
"total_used": self.total_used,
"total_utilization": round(self.total_utilization, 2),
"burn_rate_tokens_per_turn": round(self.burn_rate, 1),
"turns_until_exhausted": self.turns_until_exhausted(),
}
# ── Usage ──
def on_warning(comp, budget, projected):
print(f"[BUDGET WARN] {comp.value}: {budget.used}/{budget.hard_limit_tokens} "
f"→ {projected} ({budget.utilization:.0%}→{projected/budget.hard_limit_tokens:.0%})")
def on_violation(comp, budget, projected):
print(f"[BUDGET BLOCK] {comp.value}: would exceed hard limit "
f"({projected} > {budget.hard_limit_tokens})")
mgr = TokenBudgetManager(max_context_tokens=128_000,
on_soft_warning=on_warning,
on_hard_violation=on_violation)
# Track initial allocations
mgr.track(BudgetComponent.SYSTEM_PROMPT, 5_000)
mgr.track(BudgetComponent.TOOL_DEFINITIONS, 3_000)
mgr.track(BudgetComponent.MESSAGE_HISTORY, 45_000)
# Check before adding a large tool result
check = mgr.check_before_add(BudgetComponent.TOOL_RESULTS, 8_000)
print(f"Add 8K tool result: {check}")
# Simulate burn-rate tracking across turns
for turn_tokens in [2_000, 3_500, 2_800, 5_200, 3_100]:
mgr.record_turn(turn_tokens)
print(f"Total utilization: {mgr.total_utilization:.1%}")
print(f"Burn rate: {mgr.burn_rate:.0f} tokens/turn")
print(f"Turns until exhausted: {mgr.turns_until_exhausted()}")
The TokenBudgetManager is the financial controller of the context window. Its check_before_add method is the gate that prevents runaway consumption — called before every content addition, it either approves the addition, warns of approaching limits, or blocks outright. The design choice to separate tracking (track) from enforcement (check_before_add) is deliberate: tracking is always safe; enforcement involves policy decisions that may need to vary by task phase or severity level.
Trade-off — Precise Tracking vs Estimation Overhead: Token counting is inherently imprecise. Different tokenizers produce different counts for the same text. The budget manager should use the same tokenizer as the target model, but even then, API overhead (message formatting, role tokens) adds ~3--5% that client-side counting misses. The pragmatic approach: track client-side with a 5% safety margin. Running at 95% client-side utilization is effectively at 100% model-side. This safety margin is built into the soft-limit positioning at 80% — there's headroom for estimation error.
For the protocol layer that routes content into the correct budget bucket based on envelope type, see Agent Context Protocol Design.
6. Cross-Window State Management: Continue Work Across Context Windows
Compression and eviction extend the life of a context window, but they cannot make it infinite. Some tasks — multi-hour code migrations, exhaustive security audits, long-running research — will inevitably exhaust even a well-managed window. When that happens, the agent must cross the window boundary: serialize its state before the current window closes, and reconstruct it in a fresh window. This is the last line of defense — the mechanism that turns a finite context window into an effectively unbounded agent runtime.
The Serialization Contract: What Must Survive
Not everything in the context window is worth carrying across the boundary. The serialization contract defines exactly what state must survive a window reset for the agent to resume without re-deriving previous work:
| State Artifact | Serialized As | Loss Consequence | Priority |
|---|---|---|---|
| Task Goal + Constraints | Verbatim text | Agent forgets what it's building | 🔴 Critical |
| Progress State | Checklist (done/current/todo) | Duplicate work or skip steps | 🔴 Critical |
| Architectural Decisions | ADR log (decision, rationale, alternatives) | Re-debate settled questions | 🟡 High |
| Open Issues/Blockers | Issue list with reproduction steps | Lose awareness of unresolved problems | 🟡 High |
| Environment State | Paths, git status, service health | Operate in wrong context | 🟡 High |
| Key Learnings | Pattern log | Repeat known-bad approaches | 🟠 Medium |
| Intermediate Reasoning | Omitted (inferrable from decisions) | Minor re-derivation cost | 🟢 Low |
The crucial design decision: The serialized state object is not a compressed version of the context window — it is a structured, machine-parseable checkpoint. The difference is profound. A compressed summary says "the agent was working on X and found Y." A structured checkpoint says {"current_subtask": "replace axios in api.ts", "completed": ["scan references", "build error handler"], "blockers": []}. The summary is for human reading; the checkpoint is for programmatic resumption.
The Bootstrap Sequence: Cold-Starting a Resume
When a fresh context window opens and loads serialized state, the agent executes a standardized bootstrap sequence to re-establish operational readiness:
- Orient: Verify working directory, git branch, tool availability — the environment must match what the state expects.
- Load state: Parse the serialized state file. Validate schema completeness (task goal present? progress list well-formed?).
- Verify environment: Run
git status, check running services, confirm file paths still exist. Environment drift between windows is the most common cause of resume failures. - Inject context: Build the initial context window from state — task goal, current subtask, recent decisions, open issues, compaction summary from previous window.
- Resume: Begin execution from
current_subtask. The agent should not re-derive, re-search, or re-analyze anything captured in state.
Trade-off — Eager vs Lazy State Loading: Eager loading (inject all state into the new window immediately) gives the agent full awareness but consumes the new window's token budget upfront. Lazy loading (inject only the current subtask; retrieve other state on demand) conserves tokens but risks the agent making decisions without full context. For most agent tasks, eager loading with prioritization works best: inject the top-priority state (goal, current task, blockers) immediately, and append lower-priority state (learnings, archived decisions) as the window has room.
Three Failure Modes of Cross-Window State
Failure 1 — Orphan State
The agent writes "plan to refactor user-service next" to the state file but is interrupted before executing. The next window reads this as a completed plan and skips it — or worse, assumes the refactor was done. Fix: Distinguish planned, in_progress, and done statuses. On window resume, treat planned items as unexecuted intentions.
Failure 2 — Stale Checkpoints
Window 3 writes state. Window 4 starts, makes progress, writes updated state. Then a bug causes window 3's old state file to be read — reverting progress. Fix: Monotonically incrementing window ID + state version number. On load, reject any state with version < current version. This is a simple optimistic concurrency control.
Failure 3 — Implicit State
The agent "knows" something from earlier in the conversation but never writes it to the state object — because it assumed it would always be in context. After window reset, that knowledge is gone. Fix: The serialization contract must be exhaustive. If the agent learned it, it must serialize it. This is a discipline problem, not a technical one — the state manager can validate schema but cannot detect missing information.
Cross-window state management is the recovery path for context overflow, making it a natural integration point with Agent Error Recovery — the error recovery system detects the overflow, triggers state serialization before the crash, and hands off to the bootstrap sequence for the next window.
Code: CrossWindowStateManager
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Optional
import json
import os
@dataclass
class TaskCheckpoint:
"""Serializable agent state that survives context-window boundaries.
DESIGN RULE: Every field must be populated before serialization.
Missing fields = lost information = agent amnesia on resume."""
window_id: int = 1
state_version: int = 1
task_goal: str = ""
success_criteria: list = field(default_factory=list)
constraints: list = field(default_factory=list)
completed_subtasks: list = field(default_factory=list)
current_subtask: str = ""
remaining_subtasks: list = field(default_factory=list)
decisions: list = field(default_factory=list) # ADR entries
open_issues: list = field(default_factory=list) # blockers + bugs
learnings: list = field(default_factory=list) # patterns + pitfalls
compaction_summary: str = "" # previous window's compressed history
saved_at: str = ""
def increment_version(self):
self.state_version += 1
@dataclass
class CrossWindowStateManager:
"""Manages serialization, deserialization, validation, and bootstrap
prompt generation for cross-window agent state.
Integration: called by ContextWindowManager.on_context_overflow() to
serialize state before window close; called at window start to load
and validate checkpoint for resume."""
state_file: str = "agent_state.json"
progress_file: str = "agent_progress.md"
# ── Serialization ──
def save(self, checkpoint: TaskCheckpoint):
"""Serialize state to disk before context window closes.
Writes both JSON (machine-parseable, for automated resume) and
Markdown (human-readable, for debugging and audit)."""
checkpoint.saved_at = datetime.now(timezone.utc).isoformat()
checkpoint.increment_version()
# Machine-parseable checkpoint
with open(self.state_file, "w", encoding="utf-8") as f:
json.dump(asdict(checkpoint), f, ensure_ascii=False, indent=2)
# Human-readable progress log
self._write_progress_md(checkpoint)
def load(self) -> Optional[TaskCheckpoint]:
"""Load serialized state at new window start.
Returns None if no prior state exists (cold start).
Validates schema completeness before returning."""
if not os.path.exists(self.state_file):
return None
with open(self.state_file, "r", encoding="utf-8") as f:
data = json.load(f)
checkpoint = TaskCheckpoint(**{k: v for k, v in data.items()
if k in TaskCheckpoint.__dataclass_fields__})
validation = self.validate(checkpoint)
if validation["errors"]:
print(f"[STATE VALIDATION] Warnings: {validation['errors']}")
return checkpoint
# ── Bootstrap ──
def bootstrap_prompt(self, cp: TaskCheckpoint) -> str:
"""Generate the initial system injection for a resumed window.
The output text is inserted at the top of the new context window,
providing the agent with everything it needs to resume immediately
without re-deriving previous decisions."""
lines = [
"[CONTEXT_WINDOW_RESUME] Resuming from checkpoint.",
f"Window: {cp.window_id} | Version: {cp.state_version}",
f"Task: {cp.task_goal}",
"",
f"Completed ({len(cp.completed_subtasks)}):",
]
for t in cp.completed_subtasks[-10:]:
lines.append(f" [DONE] {t}")
lines.append(f"\nCurrent: {cp.current_subtask or '(none specified)'}")
lines.append(f"\nRemaining ({len(cp.remaining_subtasks)}):")
for t in cp.remaining_subtasks[:10]:
lines.append(f" [TODO] {t}")
if cp.open_issues:
lines.append(f"\nOpen Issues ({len(cp.open_issues)}):")
for issue in cp.open_issues:
lines.append(f" [!] {issue}")
if cp.decisions:
lines.append(f"\nKey Decisions ({len(cp.decisions)}):")
for d in cp.decisions[-5:]:
lines.append(f" [DEC] {d}")
if cp.compaction_summary:
lines.append(f"\nPrevious Window Summary:\n{cp.compaction_summary}")
return "\n".join(lines)
# ── Validation ──
def validate(self, cp: TaskCheckpoint) -> dict:
"""Schema and integrity validation before resuming from checkpoint.
Returns {"errors": [...], "warnings": [...]}. Errors indicate the
checkpoint is likely unusable; warnings indicate recoverable issues."""
errors, warnings = [], []
if not cp.task_goal:
errors.append("Missing task_goal — agent will have no objective")
if not cp.current_subtask and cp.remaining_subtasks:
warnings.append("Has remaining subtasks but no current_subtask — "
"agent may pick wrong starting point")
if cp.window_id < 1:
errors.append(f"Invalid window_id: {cp.window_id}")
if cp.state_version < 1:
errors.append(f"Invalid state_version: {cp.state_version}")
return {"errors": errors, "warnings": warnings, "valid": len(errors) == 0}
# ── Internal ──
def _write_progress_md(self, cp: TaskCheckpoint):
with open(self.progress_file, "w", encoding="utf-8") as f:
f.write(f"# Agent Progress — Window {cp.window_id} (v{cp.state_version})\n\n")
f.write(f"**Goal:** {cp.task_goal}\n\n")
f.write(f"**Saved:** {cp.saved_at}\n\n")
f.write("## ✅ Completed\n")
for t in cp.completed_subtasks:
f.write(f"- [x] {t}\n")
f.write(f"\n## 🔄 Current\n- {cp.current_subtask}\n\n")
f.write("## 📋 Remaining\n")
for t in cp.remaining_subtasks:
f.write(f"- [ ] {t}\n")
if cp.open_issues:
f.write(f"\n## ⚠️ Open Issues\n")
for issue in cp.open_issues:
f.write(f"- {issue}\n")
if cp.decisions:
f.write(f"\n## 📐 Decisions\n")
for d in cp.decisions:
f.write(f"- {d}\n")
# ── Usage: window boundary crossing ──
mgr = CrossWindowStateManager()
# Window 1: save before forced reset
cp = TaskCheckpoint(
window_id=1,
task_goal="Migrate user-service REST calls from axios to fetch",
completed_subtasks=["Scanned 23 axios references", "Built error-handling wrapper",
"Migrated 12/23 files"],
current_subtask="Migrate api.ts (file 13/23)",
remaining_subtasks=["Migrate remaining 10 files", "Integration tests",
"Canary deploy"],
decisions=["Use native fetch + custom error wrapper (not a library)",
"Keep response interceptor pattern for consistency"],
open_issues=["api.ts:42 — type incompatibility after migration"],
learnings=["v3 API uses /v2/ prefix, not /v1/"],
compaction_summary="Migration 52% complete. 12 files done, api.ts in progress. "
"One type error at api.ts:42 unresolved.",
)
mgr.save(cp)
# Window 2: load and resume
loaded = mgr.load()
if loaded:
bootstrap = mgr.bootstrap_prompt(loaded)
# Inject bootstrap into new context window as first system message
print(bootstrap[:300] + "...")
The CrossWindowStateManager is the safety net that makes long-running agents viable. Without it, every context overflow is a hard crash with total state loss. With it, an overflow becomes a minor checkpoint — a moment of serialization followed by clean resumption. The dual-format output (JSON for machines, Markdown for humans) is a deliberate design choice: JSON enables automated resumption without parsing ambiguity; Markdown enables debugging when something goes wrong and a human needs to inspect what the agent thought it knew.
Trade-off — Checkpoint Frequency: Saving after every turn guarantees minimal data loss but adds I/O overhead. Saving only at overflow risks losing turns of work if the overflow is sudden. The recommended approach: save at every major milestone (subtask completion, decision made, blocker found) plus a safety save whenever utilization crosses 85%. This balances overhead against data-loss risk. The checkpoint itself is tiny (~2--5KB) so I/O cost is negligible; the real cost is the discipline of keeping the state object current.
7. Context Health Monitoring: Metrics, Alerts, and Fidelity Evaluation
You've implemented pressure detection, eviction, compression, budgeting, and cross-window state. The systems are running. But a question remains: are they working? Is compression losing critical information? Is eviction removing content the agent needed three turns later? Is the budget allocation appropriate for this task type? Without monitoring, you're flying blind — you won't know about failures until the agent produces visibly wrong output, which in production can mean hours of wasted compute and incorrect results.
Six Metrics That Matter
Not all metrics are equal. The six below form the minimum viable set for context health monitoring — they answer the questions "is the window healthy?" and "are the management interventions working?":
| Metric | What It Measures | Healthy Range | Alert Threshold | Action If Breached |
|---|---|---|---|---|
| Utilization | Current tokens / max window | <60% | >90% for 3+ turns | Emergency compaction |
| Eviction Rate | Blocks evicted per turn | <2/turn | >5/turn sustained | Window may be undersized; review eviction policy |
| Compression Ratio | Original tokens / compressed tokens | 3x--15x | <2x consistently | Compaction prompt needs redesign |
| Burn Rate | Tokens consumed per turn | Stable or linear | Sudden 3x+ spike | Possible runaway loop; escalate to error recovery |
| Compression Fidelity | Key information retention post-compression | >0.85 | <0.75 | Compression too aggressive; add preservation rules |
| Tool Result Bloat | Tool result tokens / total context | 10--15% | >30% | Tools returning too much data; add output limits |
Trade-off — Metric Granularity vs Observability Cost: Emitting per-turn metrics with 6 dimensions costs negligible compute (~5ms per turn) but provides rich diagnostic data. Emitting only utilization is simpler but leaves you blind to why utilization is high. The recommendation: emit all six. The cost of not knowing why your agent is failing far exceeds the cost of collecting the metrics. For the observability infrastructure that ingests these metrics, see Agent Observability.
Compression Fidelity: The Hardest Metric
Compression ratio is easy to measure — it's just division. Fidelity is hard because it asks: "did the compressed version preserve the information needed for correct decision-making?" This cannot be measured by token counting alone. The standard approach is LLM-as-Judge evaluation:
- Prepare a test suite: For each agent task type, create N questions that probe critical information — "What was the decision about error handling?" "What file is currently being modified?" "What unresolved issue was found?"
- Query both contexts: Ask the same question against the original (pre-compression) context and the compressed context. Record both answers.
- Judge consistency: A separate LLM call (the "judge") compares the two answers and scores whether they convey the same key information. The judge prompt must be precise: "Do these two answers agree on the factual claims they make? Ignore phrasing differences. Answer YES or NO."
- Compute fidelity:
fidelity = consistent_answers / total_questions. Target >0.85.
Trade-off — LLM-as-Judge vs Human Evaluation: LLM-as-Judge is cheap, fast, and automatable — but imperfect. Judges can miss subtle information loss (a nuance that matters in context but not in isolation). Human evaluation is the gold standard but costs time and doesn't scale. The pragmatic approach: use LLM-as-Judge for CI regression testing (every prompt change), and periodically validate the judge itself with human spot-checks (every 10--20 runs). If the judge consistently gives high fidelity scores but human reviewers spot critical losses, the judge prompt needs refinement.
Alerting: When Metrics Become Actions
Metrics without alerts are dashboards people forget to check. Alerts without severity gradation create alert fatigue. The context health monitor emits alerts at three levels:
- CRITICAL: Immediate action required. Utilization >90% for 3+ turns — context will overflow within 1--2 turns. Trigger emergency compaction and prepare cross-window save.
- WARNING — Degradation: System is trending in the wrong direction. Eviction rate >5/turn sustained, fidelity <0.75, compression ratio <2x — the management strategies are losing effectiveness. Review prompt design and eviction weights.
- WARNING — Anomaly: Unusual behavior that may indicate a bug. Burn rate spike >3x historical average — possible runaway agent loop. Escalate to Agent Error Recovery for loop detection.
Trade-off — Alert Thresholds: Tight thresholds (alert at 85% utilization) create false positives — alerts that fire when no real problem exists, training operators to ignore them. Loose thresholds (alert at 98%) create false negatives — the overflow happens before the alert fires. The thresholds in this implementation (90% for CRITICAL, 75% fidelity floor) are calibrated for 128K windows with typical agent workloads. Tune them based on your agent's velocity profile and risk tolerance.
Code: ContextHealthMonitor
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Callable
from enum import Enum
class AlertLevel(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class HealthSnapshot:
"""A single point-in-time measurement of context window health."""
timestamp: str = ""
utilization: float = 0.0
eviction_rate: float = 0.0 # evictions per turn
compression_ratio: float = 1.0 # original / compressed (>1 = effective)
burn_rate: float = 0.0 # tokens per turn
fidelity_score: float = 1.0 # 0.0–1.0
tool_result_bloat: float = 0.0 # tool result tokens / total tokens
window_id: int = 1
@dataclass
class ContextHealthMonitor:
"""Collects health metrics from all subsystems, evaluates compression
fidelity via LLM-as-Judge, and emits graded alerts to observability.
Integration: called per-turn (collect) and post-compression (evaluate_fidelity).
Alerts feed into the observability pipeline for dashboards and notifications."""
metrics_history: list = field(default_factory=list)
max_history: int = 100
fidelity_test_questions: list = field(default_factory=list)
on_alert: Optional[Callable] = None
_eviction_counter: int = 0
_turn_counter: int = 0
# ── Collection ──
def collect(self, pressure_monitor, eviction_engine,
budget_manager) -> HealthSnapshot:
"""Gather metrics from all active subsystems into a health snapshot."""
self._turn_counter += 1
snap = HealthSnapshot(
timestamp=datetime.now(timezone.utc).isoformat(),
utilization=pressure_monitor.utilization(),
eviction_rate=self._eviction_counter / max(self._turn_counter, 1),
burn_rate=budget_manager.burn_rate,
window_id=getattr(pressure_monitor, '_turn_count', 0),
)
# Tool result bloat: what fraction of context is tool output?
total_used = budget_manager.total_used
tr_used = budget_manager.budgets.get(
type(budget_manager).__dataclass_fields__['budgets'].type.__args__[0]
if hasattr(budget_manager, 'budgets') else None, None)
snap.tool_result_bloat = (tr_used.used / max(total_used, 1)
if tr_used else 0.0)
self.metrics_history.append(snap)
if len(self.metrics_history) > self.max_history:
self.metrics_history.pop(0)
return snap
def record_eviction(self, count: int = 1):
"""Called by EvictionEngine after each eviction operation."""
self._eviction_counter += count
# ── Fidelity Evaluation ──
def evaluate_fidelity(self, original_context: str,
compressed_context: str,
llm_judge: Callable[[str], str]) -> float:
"""LLM-as-Judge compression fidelity evaluation.
llm_judge is a function (prompt: str) -> response: str.
Returns fidelity score 0.0–1.0."""
if not self.fidelity_test_questions:
return 1.0 # No test suite = assume fidelity is fine
consistent = 0
for i, question in enumerate(self.fidelity_test_questions):
# Query original context
a_orig = llm_judge(
f"Context:\n{original_context[:8000]}\n\n"
f"Question: {question}\nAnswer in 1-2 sentences.")
# Query compressed context
a_comp = llm_judge(
f"Context:\n{compressed_context[:8000]}\n\n"
f"Question: {question}\nAnswer in 1-2 sentences.")
# Judge consistency
verdict = llm_judge(
f"Compare these two answers. Do they convey the SAME key "
f"factual information? Ignore wording differences.\n\n"
f"Answer A: {a_orig}\nAnswer B: {a_comp}\n\n"
f"Reply ONLY 'yes' or 'no'.")
if "yes" in verdict.lower():
consistent += 1
return consistent / len(self.fidelity_test_questions)
# ── Alerting ──
def check_alerts(self, snap: HealthSnapshot) -> list:
"""Evaluate health snapshot against alert thresholds.
Returns list of alert dicts: {level, metric, message, value}."""
alerts = []
# CRITICAL: utilization > 90%
if snap.utilization > 0.90:
alerts.append({
"level": AlertLevel.CRITICAL,
"metric": "utilization",
"message": f"Context utilization at {snap.utilization:.1%} — "
f"overflow imminent",
"value": snap.utilization,
})
# WARNING: high eviction rate
if snap.eviction_rate > 5.0:
alerts.append({
"level": AlertLevel.WARNING,
"metric": "eviction_rate",
"message": f"Eviction rate {snap.eviction_rate:.1f}/turn — "
f"window may be undersized",
"value": snap.eviction_rate,
})
# WARNING: low compression fidelity
if snap.fidelity_score < 0.75:
alerts.append({
"level": AlertLevel.WARNING,
"metric": "fidelity_score",
"message": f"Compression fidelity {snap.fidelity_score:.2f} "
f"below 0.75 threshold — review compaction prompt",
"value": snap.fidelity_score,
})
# WARNING: ineffective compression
if 1.0 < snap.compression_ratio < 2.0:
alerts.append({
"level": AlertLevel.WARNING,
"metric": "compression_ratio",
"message": f"Compression ratio only {snap.compression_ratio:.1f}x "
f"— compaction prompt may need redesign",
"value": snap.compression_ratio,
})
# WARNING: tool result bloat
if snap.tool_result_bloat > 0.30:
alerts.append({
"level": AlertLevel.WARNING,
"metric": "tool_result_bloat",
"message": f"Tool results consuming {snap.tool_result_bloat:.0%} "
f"of context — add output limits to tools",
"value": snap.tool_result_bloat,
})
for alert in alerts:
if self.on_alert:
self.on_alert(alert)
return alerts
# ── Trend Analysis ──
def trend_report(self) -> dict:
"""Analyze metric trends over recent history for proactive detection."""
if len(self.metrics_history) < 5:
return {"status": "insufficient_data", "samples": len(self.metrics_history)}
recent = self.metrics_history[-10:]
first, last = recent[0], recent[-1]
return {
"samples": len(recent),
"utilization_delta": round(last.utilization - first.utilization, 3),
"burn_rate_delta": round(last.burn_rate - first.burn_rate, 1),
"avg_fidelity": round(
sum(m.fidelity_score for m in recent) / len(recent), 2),
"utilization_trend": "rising" if last.utilization > first.utilization
else "falling" if last.utilization < first.utilization
else "stable",
}
# ── Usage ──
def mock_judge(prompt: str) -> str:
"""Simulated LLM judge — in production, this calls an actual model."""
return "yes" if "error handling" in prompt.lower() else "no"
health = ContextHealthMonitor(
fidelity_test_questions=[
"What is the current task goal?",
"What step is currently in progress?",
"What architectural decision was made about error handling?",
"Are there any unresolved issues or blockers?",
],
on_alert=lambda a: print(f"[{a['level'].value.upper()}] {a['message']}"),
)
# Evaluate fidelity after a compaction
fidelity = health.evaluate_fidelity(
original_context="Decided to use native fetch with custom error wrapper. "
"Currently migrating api.ts (file 13/23). "
"One unresolved type error at api.ts:42.",
compressed_context="Using native fetch + error wrapper. Migrating api.ts. "
"Type error at api.ts:42 unresolved.",
llm_judge=mock_judge,
)
print(f"Compression fidelity: {fidelity:.2f}")
# Check alerts on a snapshot
snap = HealthSnapshot(
utilization=0.92, eviction_rate=6.0,
compression_ratio=3.5, burn_rate=2500.0,
fidelity_score=fidelity, tool_result_bloat=0.35
)
alerts = health.check_alerts(snap)
for a in alerts:
print(f" [{a['level'].value}] {a['metric']}: {a['message']}")
The ContextHealthMonitor closes the feedback loop. Every management decision — evict this block, compress this conversation, allocate this budget — produces measurable effects. The monitor captures those effects, evaluates them against thresholds, and surfaces problems before they become failures. The fidelity evaluation in particular is the quality gate for the entire compression subsystem: without it, you can endlessly tune compaction prompts for higher ratios with no awareness that you're silently losing critical state.
For the evaluation infrastructure that provides the LLM-as-Judge capabilities and test suite management, see Agent Evaluation Framework. For the audit trail that records every health snapshot as an immutable event, see Agent Audit Log Design.
8. Putting It All Together: The Complete ContextWindowManager Architecture
The previous seven sections designed six independent subsystems. But an agent doesn't call six subsystems — it calls one. The ContextWindowManager is the unified orchestrator that wires pressure monitoring, eviction, compression, budgeting, cross-window state, and health monitoring into a single integration surface. This section presents the complete architecture and its integration into the agent loop.
Architecture: Six Subsystems, One Orchestrator
┌──────────────────────────────────────────────────────────────────┐
│ Agent Loop │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Think │───▶│ Act │───▶│ Observe │───▶│ Think │ ... │
│ └────┬────┘ └────┬────┘ └────┬────┘ └─────────┘ │
│ │ │ │ │
└───────┼──────────────┼──────────────┼────────────────────────────┘
│ │ │
▼ ▼ ▼
┌───────────────────────────────────────────────────────────────────┐
│ ContextWindowManager │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌───────────────────┐ │
│ │ PressureMonitor │ │ EvictionEngine │ │ CompressionEngine │ │
│ │ │ │ │ │ │ │
│ │ • utilization │ │ • FIFO │ │ • Compaction │ │
│ │ • velocity │ │ • LRU │ │ • Note-Taking │ │
│ │ • zone detection │ │ • Priority │ │ • Tool Summarize │ │
│ │ • spike alerts │ │ • Semantic Merge │ │ • Progressive │ │
│ │ │ │ • Type-Based │ │ • Sub-Agent │ │
│ │ │ │ • Hybrid │ │ │ │
│ └────────┬─────────┘ └────────┬─────────┘ └─────────┬─────────┘ │
│ │ │ │ │
│ ┌────────┴─────────┐ ┌────────┴─────────┐ │
│ │ TokenBudgetMgr │ │ CrossWindowState │ │
│ │ │ │ │ │
│ │ • 6 components │ │ • Serialization │ │
│ │ • soft/hard lim. │ │ • Bootstrap │ │
│ │ • burn rate │ │ • Validation │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │
│ ┌────────┴─────────────────────┴─────────┐ │
│ │ ContextHealthMonitor │ │
│ │ • Metrics collection │ │
│ │ • Fidelity evaluation (LLM-as-Judge) │ │
│ │ • Alerting (CRITICAL/WARNING/INFO) │ │
│ └────────────────────┬───────────────────┘ │
│ │ │
└───────────────────────┼──────────────────────────────────────────────┘
│
▼
┌─────────────────────────┐
│ Observability Pipeline │
│ (Prometheus + Grafana) │
└─────────────────────────┘
Lifecycle: How an Agent Task Flows Through the Manager
From task start to completion, every phase of context management has a clear subsystem owner:
- Task Start:
TokenBudgetManagerallocates budgets from config.CrossWindowStateManagerchecks for a prior checkpoint (resume) or initializes fresh (cold start).ContextPressureMonitorbegins tracking utilization. - Before Each LLM Call (
on_turn_start): Monitor checks utilization → under 50%: no action; 50--75%: evaluate eviction candidates; 75--90%: execute eviction + tool-result summarization; 90--95%: conversation compaction; 95%+: emergency cross-window save. - After Each LLM Call (
on_turn_end): Budget manager records token consumption. Monitor updates utilization and velocity. Health monitor collects metrics snapshot. - After Tool Call (
on_tool_result): Budget manager tracks tool result tokens. If output exceeds summarization threshold, compress immediately. Eviction engine evaluates whether old tool results should be cleared. - On Context Overflow (
on_context_overflow): Cross-window state manager serializes checkpoint. Health monitor emits final metrics. Context window resets. Bootstrap sequence loads checkpoint into new window. Agent resumes. - Task Complete: Final compaction summary generated. Checkpoint archived. Health monitor emits completion snapshot with trend report.
Trade-off — Tight vs Loose Coupling: The orchestrator design intentionally does not hardwire the six subsystems together. Each subsystem is instantiated independently and passed to the manager. This loose coupling means you can swap the eviction engine's policy without touching anything else, replace the compression engine with a server-side API implementation, or run without the health monitor in environments where metrics aren't needed. The manager is a conductor, not a monolith. This design philosophy is shared with Multi-Agent Orchestration — each component has a clear interface contract; the orchestrator composes them.
Code: ContextWindowManager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Callable
from enum import Enum
class WindowStatus(Enum):
HEALTHY = "healthy" # Green: normal operation
PREPARING = "preparing" # Yellow: evaluating candidates
ACTIVE_MANAGEMENT = "active" # Orange: evicting + compressing
EMERGENCY = "emergency" # Red: cross-window imminent
@dataclass
class ContextWindowManager:
"""Unified orchestrator for all context window management subsystems.
This is the SINGLE integration point for the agent loop. The agent
calls four hooks per cycle:
on_turn_start() → before LLM call
on_turn_end() → after LLM call
on_tool_result() → after tool execution
on_overflow() → when context is about to overflow
Each hook delegates to the appropriate subsystem based on current
pressure zone and budget state."""
# ── Subsystem instances (injected, not created) ──
pressure_monitor: object = None # ContextPressureMonitor
eviction_engine: object = None # EvictionEngine
compression_engine: object = None # CompressionEngine
budget_manager: object = None # TokenBudgetManager
state_manager: object = None # CrossWindowStateManager
health_monitor: object = None # ContextHealthMonitor
# ── Configuration ──
max_context_tokens: int = 128_000
eviction_policy: str = "hybrid"
task_goal: str = ""
task_success_criteria: list = field(default_factory=list)
# ── Runtime state ──
status: WindowStatus = WindowStatus.HEALTHY
turn_count: int = 0
windows_created: int = 1
context_blocks: list = field(default_factory=list)
audit_log: list = field(default_factory=list)
# ── Callbacks for external integration ──
on_status_change: Optional[Callable] = None
on_overflow_detected: Optional[Callable] = None
# ═══════════════════════════════════════════════════════════════
# Hook 1: Before LLM Call
# ═══════════════════════════════════════════════════════════════
def on_turn_start(self, estimated_response_tokens: int = 4_000) -> dict:
"""Called before every LLM invocation.
Evaluates current context pressure, executes appropriate management
action, and returns a decision dict consumed by the agent loop."""
self.turn_count += 1
utilization = self.pressure_monitor.utilization()
# Verify output reserve
output_check = self.budget_manager.check_before_add(
type(self.budget_manager).__dataclass_fields__['budgets'].type
if hasattr(self.budget_manager, 'budgets') else None,
estimated_response_tokens)
# ── Zone-based decision routing ──
if utilization >= 0.95:
return self._emergency(utilization)
elif utilization >= 0.75:
return self._active_management(utilization)
elif utilization >= 0.50:
return self._preparing(utilization)
else:
self._set_status(WindowStatus.HEALTHY)
return {"action": "none", "utilization": utilization}
# ═══════════════════════════════════════════════════════════════
# Hook 2: After LLM Call
# ═══════════════════════════════════════════════════════════════
def on_turn_end(self, response_tokens: int):
"""Called after LLM response is received.
Updates pressure tracking, budget consumption, and health metrics."""
self.pressure_monitor.update(response_tokens)
self.budget_manager.track(
self._resolve_component("message_history"), response_tokens)
self.budget_manager.record_turn(response_tokens)
# Collect health snapshot
if self.health_monitor:
snap = self.health_monitor.collect(
self.pressure_monitor, self.eviction_engine,
self.budget_manager)
self.health_monitor.check_alerts(snap)
# ═══════════════════════════════════════════════════════════════
# Hook 3: After Tool Execution
# ═══════════════════════════════════════════════════════════════
def on_tool_result(self, tool_name: str, result: str,
token_count: int):
"""Called after a tool returns its output.
Tracks budget, triggers summarization for large outputs,
and evaluates eviction candidates for old tool results."""
# Budget tracking
self.budget_manager.track(
self._resolve_component("tool_results"), token_count)
# Summarize large tool outputs immediately
if token_count > 500:
if self.compression_engine:
result = self.compression_engine.compress_tool_result(
tool_name, result, self._get_llm_call())
# Register as context block for eviction scoring
# (simplified — in production, ContentBlock from EvictionEngine)
self.context_blocks.append({
"id": f"tool_{self.turn_count}_{tool_name}",
"type": "tool_result",
"tool": tool_name,
"tokens": token_count,
"turn": self.turn_count,
"content_snippet": result[:200],
})
# Pressure update
self.pressure_monitor.update(token_count)
self._log("tool_result", {"tool": tool_name, "tokens": token_count})
# ═══════════════════════════════════════════════════════════════
# Hook 4: Context Overflow
# ═══════════════════════════════════════════════════════════════
def on_overflow(self) -> dict:
"""Called when context is about to overflow.
Serializes agent state, prepares for window reset, and emits
final health metrics before the window closes."""
self._set_status(WindowStatus.EMERGENCY)
self._log("overflow", {"turn": self.turn_count})
# Build compaction summary for next window
summary = ""
if self.compression_engine:
history_text = self._format_blocks()
result = self.compression_engine.compact_conversation(
history_text, self.task_goal, len(history_text) // 4,
self._get_llm_call())
summary = result.compressed_content
# Serialize state for cross-window resume
if self.state_manager:
from cross_window_state_manager import TaskCheckpoint
cp = TaskCheckpoint(
window_id=self.windows_created,
task_goal=self.task_goal,
success_criteria=self.task_success_criteria,
compaction_summary=summary,
)
self.state_manager.save(cp)
# Final health snapshot
if self.health_monitor:
snap = self.health_monitor.collect(
self.pressure_monitor, self.eviction_engine,
self.budget_manager)
self.health_monitor.check_alerts(snap)
self.windows_created += 1
if self.on_overflow_detected:
self.on_overflow_detected({
"window": self.windows_created,
"turn": self.turn_count,
"summary_tokens": len(summary) // 4,
})
return {
"action": "cross_window_save",
"new_window_id": self.windows_created,
"compaction_summary_tokens": len(summary) // 4,
}
# ═══════════════════════════════════════════════════════════════
# Internal: Decision Routers
# ═══════════════════════════════════════════════════════════════
def _preparing(self, utilization: float) -> dict:
"""Yellow zone: evaluate eviction candidates without acting."""
self._set_status(WindowStatus.PREPARING)
# In production, this calls eviction_engine.select_eviction_candidates
# with target_free_tokens=0 to list candidates without removing them
return {
"action": "evaluate",
"utilization": utilization,
"recommendation": "Monitor closely — prepare eviction candidates",
}
def _active_management(self, utilization: float) -> dict:
"""Orange zone: execute eviction, then compression if needed."""
self._set_status(WindowStatus.ACTIVE_MANAGEMENT)
# Calculate target: aim to bring utilization down to ~60%
target_utilization = 0.60
target_tokens = int(self.max_context_tokens * target_utilization)
current_tokens = int(self.max_context_tokens * utilization)
target_free = current_tokens - target_tokens
# Step 1: Evict low-priority content
evicted_count = 0
if self.eviction_engine and target_free > 0:
# Convert context_blocks dicts to EvictionEngine ContentBlock objects
blocks = self._to_content_blocks()
result = self.eviction_engine.evict(
blocks, target_free_tokens=target_free,
current_turn=self.turn_count)
evicted_count = len(result.blocks_evicted)
if self.health_monitor:
self.health_monitor.record_eviction(evicted_count)
# Step 2: If eviction was insufficient, trigger compression cascade
strategy = None
if evicted_count == 0 and self.compression_engine:
strategy = self.compression_engine.select_strategy(utilization)
if strategy:
self.compression_engine.execute_cascade(
self._format_blocks(), self.task_goal,
current_tokens, self.max_context_tokens,
self._get_llm_call())
self._log("active_mgmt", {
"evicted": evicted_count,
"compression": strategy.value if strategy else None,
})
return {
"action": "evict_and_compress",
"utilization": utilization,
"evicted_blocks": evicted_count,
"compression_strategy": strategy.value if strategy else None,
}
def _emergency(self, utilization: float) -> dict:
"""Red zone: force compaction or trigger cross-window save."""
self._set_status(WindowStatus.EMERGENCY)
# Attempt emergency compaction
if self.compression_engine:
result = self.compression_engine.compact_conversation(
self._format_blocks(), self.task_goal,
int(self.max_context_tokens * utilization),
self._get_llm_call())
# If still at risk, trigger cross-window save
new_utilization = self.pressure_monitor.utilization()
if new_utilization >= 0.93:
return self.on_overflow()
self._log("emergency", {"utilization_before": utilization,
"utilization_after": new_utilization})
return {
"action": "emergency_compaction",
"utilization_before": utilization,
"utilization_after": new_utilization,
}
# ═══════════════════════════════════════════════════════════════
# Internal: Helpers
# ═══════════════════════════════════════════════════════════════
def _set_status(self, status: WindowStatus):
prev = self.status
self.status = status
if prev != status and self.on_status_change:
self.on_status_change(prev, status)
def _log(self, event_type: str, metadata: dict):
self.audit_log.append({
"turn": self.turn_count,
"type": event_type,
"timestamp": datetime.now(timezone.utc).isoformat(),
"metadata": metadata,
})
def _format_blocks(self) -> str:
return "\n".join(
f"[{b.get('type', '?')} T{b['turn']}] {b.get('content_snippet', '')[:500]}"
for b in self.context_blocks[-50:])
def _to_content_blocks(self) -> list:
"""Convert internal block dicts to EvictionEngine ContentBlock objects."""
blocks = []
for b in self.context_blocks:
try:
from eviction_engine import ContentBlock, ContentType
ct_map = {
"user_message": ContentType.USER_MESSAGE,
"assistant_response": ContentType.ASSISTANT_RESPONSE,
"tool_result": ContentType.TOOL_RESULT,
"memory_injection": ContentType.MEMORY_INJECTION,
}
blocks.append(ContentBlock(
block_id=b["id"],
content_type=ct_map.get(b.get("type", ""), ContentType.TOOL_RESULT),
content=b.get("content_snippet", ""),
token_count=b.get("tokens", 0),
turn_created=b.get("turn", 0)))
except Exception:
pass
return blocks
def _resolve_component(self, name: str):
"""Resolve a BudgetComponent by string name (bridge between subsystems)."""
try:
from token_budget_manager import BudgetComponent
return BudgetComponent(name.upper())
except Exception:
return None
def _get_llm_call(self) -> Callable:
"""Placeholder — in production, injected via dependency injection."""
def mock_llm(system: str, user: str) -> str:
return "[Compacted summary of conversation]"
return mock_llm
# ═══════════════════════════════════════════════════════════════
# Public API
# ═══════════════════════════════════════════════════════════════
def get_audit_trail(self) -> list:
"""Return complete audit log of all context management actions."""
return self.audit_log
def get_status_report(self) -> dict:
"""Aggregate status from all subsystems for dashboard display."""
return {
"window_status": self.status.value,
"turn": self.turn_count,
"windows_created": self.windows_created,
"utilization": (self.pressure_monitor.utilization()
if self.pressure_monitor else 0.0),
"budget_report": (self.budget_manager.report()
if self.budget_manager else {}),
"audit_events": len(self.audit_log),
}
# ═══════════════════════════════════════════════════════════════════
# Configuration (YAML)
# ═══════════════════════════════════════════════════════════════════
# context_window_config.yaml
"""
max_context_tokens: 128000
pressure:
thresholds:
yellow: 0.50
orange: 0.75
red: 0.90
velocity_window: 5
eviction:
policy: hybrid
weights:
recency: 0.30
priority: 0.25
type_ttl: 0.20
reference_freq: 0.15
semantic_duplication: 0.10
type_ttl:
tool_result: 8
thinking: 2
memory_injection: 20
protected_types: [system_prompt]
compression:
summarization_threshold_tokens: 500
compaction_utilization_threshold: 0.85
cascade:
0.75: tool_result_summarization
0.85: progressive
0.92: conversation_compaction
0.95: sub_agent
notes_file: agent_notes.md
budget:
allocations:
system_prompt: 0.06
tool_definitions: 0.04
message_history: 0.65
tool_results: 0.12
memory_injection: 0.08
output_reserved: 0.05
soft_limit_pct: 0.80
cross_window:
state_file: agent_state.json
progress_file: agent_progress.md
auto_save_threshold: 0.85
health_monitoring:
metrics:
- utilization
- eviction_rate
- compression_ratio
- burn_rate
- fidelity_score
- tool_result_bloat
alerts:
utilization_critical: 0.90
eviction_rate_warning: 5
fidelity_warning: 0.75
compression_ratio_min: 2.0
tool_bloat_warning: 0.30
"""
The ContextWindowManager is designed as a conductor, not a monolith. Each subsystem is instantiated independently and injected — you can swap the eviction policy, replace the compression engine with a server-side implementation, or omit the health monitor entirely without touching the orchestration logic. The four hook methods (on_turn_start, on_turn_end, on_tool_result, on_overflow) form the integration contract with the agent loop. Everything else is internal decision routing.
The YAML configuration consolidates every tunable parameter — pressure thresholds, eviction weights, budget allocations, compression cascade triggers, health alert thresholds — into a single file. For different task profiles (code review vs research vs customer support), you maintain different config files. The manager loads its config at startup; runtime tuning is possible through the public API (e.g., manager.eviction_engine.set_policy("fifo")).
Every management decision — eviction, compression, budget violation, cross-window save — is recorded in the audit_log. This forms an immutable evidence chain for debugging and compliance. For the audit infrastructure that stores these events durably, see Agent Audit Log Design.
FAQ
How does this article relate to agent-memory-design?
Agent Memory System Design is the warehouse architect — it defines the L0-L3 four-layer memory architecture, what each layer stores, how retrieval boundaries work, and how memory is scoped and hygienic. This article is the warehouse operator — it manages L0 (the context window): how to compress it, evict from it, budget its tokens, and resume work across window boundaries. agent-memory-design says "L0 should contain X, Y, Z." This article says "and when L0 is full, here's how to make room." Read memory-design first for the architecture; then read this for the operational playbook.
When should I compress vs evict vs delegate to a sub-agent?
These three are not alternatives — they form a cascade ordered by cost and information preservation:
- Evict first (cheapest, safest): remove tool results older than 8 turns, thinking blocks from completed cycles, stale memory injections. Cost: zero LLM calls. Risk: information is deleted, but it was low-priority by design.
- Compress next (medium cost): if eviction isn't enough, compress. Start with tool-result summarization (cheap), escalate to progressive summarization, and only use full conversation compaction at 90%+ utilization. Cost: 1--N LLM calls. Risk: some fidelity loss — monitor with health metrics.
- Delegate last (most expensive, most powerful): when a subtask is large, independent, and would consume 50K+ tokens of exploration, spawn a sub-agent with a clean context. The sub-agent returns a 1K--2K summary. Cost: sub-agent's full LLM costs. Risk: the sub-agent's summary may miss nuance — validate with fidelity evaluation.
The CompressionEngine.execute_cascade() method in Section 4 implements this ordering automatically. The key heuristic: if you can evict, don't compress. If you can compress locally, don't delegate.
How do I measure whether compression lost critical information?
Use compression fidelity evaluation — the single most important health metric (Section 7):
- Build a test suite: For your agent's task domain, create 10--20 questions that probe critical information — task goals, architectural decisions, unresolved issues, current progress, key learnings. These are your fidelity test cases.
- Run before/after queries: Ask each question against the original (pre-compression) context and the compressed context. Use a separate LLM call for each.
- Judge consistency: A third LLM call (the "judge") compares the two answers and scores whether they convey the same factual information. Fidelity = consistent answers / total questions.
- Set a quality gate: Target fidelity > 0.85. If it drops below 0.75, your compression is too aggressive — add more preservation rules to the compaction prompt.
This evaluation should run in CI as a regression test every time you modify the compaction prompt. A higher compression ratio is only an improvement if fidelity doesn't drop. The ContextHealthMonitor.evaluate_fidelity() method provides the implementation. See Agent Evaluation Framework for the full LLM-as-Judge methodology.
FIFO, LRU, or priority eviction — which one should I use?
The answer depends on your agent's task structure — there is no universal best policy:
| If your agent… | Start with… | Why |
|---|---|---|
| Does chat or Q&A (linear conversation) | FIFO | Old messages naturally lose relevance. Simple, predictable, zero overhead. |
| Runs exploration-heavy loops (code search, research) | Type-based + LRU | Dead-end explorations accumulate; LRU naturally clears them. Type-based rules handle tool results automatically. |
| Has heterogeneous content importance | Priority-based | Different content types have genuinely different importance. Tune weights per domain. |
| Is mission-critical production | Hybrid with conservative weights | Combines all signals. Start with equal weights, run 5--10 tasks, analyze eviction audit logs, then tune. |
The pragmatic approach: Start with Hybrid and conservative weights (equal contribution from all signals). Run your agent on real tasks and collect the eviction audit log. After 5--10 runs, analyze which blocks were evicted and whether any evictions caused downstream problems. Then tune the weights. Premature optimization of eviction weights is a common anti-pattern — you're tuning a system you haven't yet observed.
Can cross-window state become inconsistent? How do I verify it?
Yes, it can — and it will, if you don't guard against it. Three failure modes and their mitigations:
- Orphan state: The agent writes "plan to do X" to state but is interrupted before executing. The next window reads this as completed work. Mitigation: Schema distinguishes
planned,in_progress, anddonestatuses. On resume,planneditems are treated as unexecuted intentions. - Stale checkpoints: Multiple windows write state concurrently; an old checkpoint overwrites newer progress. Mitigation: Monotonically incrementing
window_idandstate_version. On load, reject any state with version < current version. - Environment drift: The state says "file X exists at path Y" but between windows, someone moved the file. Mitigation: The bootstrap sequence validates environment state —
git status, file existence checks, service health probes — before trusting the checkpoint.
Verification: CrossWindowStateManager.validate() performs schema-level checks (missing task goal? inconsistent progress?). Beyond schema, run resume tests: take a checkpoint from a completed task, load it into a fresh window, and verify the agent resumes at the correct subtask without re-deriving previous decisions. This should be part of your CI pipeline.
How large should the token budget be? How do I adjust by task type?
Start with these defaults for a 128K-token window and tune from observed behavior:
| Component | Default | Code Review | Research | Customer Support |
|---|---|---|---|---|
| System Prompt | 6% (7.7K) | 6% | 5% | 8% (more instructions) |
| Tool Definitions | 4% (5.1K) | 5% (more tools) | 5% | 3% (fewer tools) |
| Message History | 65% (83.2K) | 60% | 55% | 70% (conversation-heavy) |
| Tool Results | 12% (15.4K) | 18% (files, diffs) | 15% | 8% |
| Memory Injection | 8% (10.2K) | 6% | 15% (research context) | 6% |
| Output Reserve | 5% (6.4K) | 5% | 5% | 5% |
Core principles for tuning:
- Output reserve never below 4K tokens. If the LLM doesn't have room to generate a complete response, the agent loop breaks. This is a hard floor, not a guideline.
- Tool-heavy tasks need more tool-result budget. Code review agents reading files, research agents querying APIs — their tool budgets should be 15--20%. If tool results are constantly being evicted mid-task, the budget is too small.
- Conversation-heavy tasks need more message history. Customer support agents that need full conversation context for empathy and accuracy — allocate 70--75% to message history.
- Don't tune until you have data. Run your agent with defaults for 10+ real tasks. Review the
TokenBudgetManager.report()output. Identify which component is consistently hitting its soft limit. That's the component that needs a larger allocation.
Next Steps
This article is the L0 operational manual for the agent context window — part of the Agent Memory and Context Engineering series. Here's where to go next, ordered by dependency:
- Agent Memory System Design — The architectural foundation. Defines the L0-L3 four-layer memory architecture that this article's context manager plugs into. Understand what each layer stores before you manage how L0 is operated. (Read first if you haven't.)
- Agent Context Protocol Design — The data-flow protocol that governs how content enters the context window. Its envelope typing system directly feeds the eviction engine's type-based scoring and the budget manager's component tracking.
- Agent Observability — The metrics pipeline that consumes the health snapshots, zone transitions, velocity spikes, and budget reports emitted by the ContextWindowManager. Build dashboards, configure alerts, and track long-term trends.
- Agent Tool Design — Reduce context pressure at the source. Design tools that return concise, high-signal output — minimizing the need for tool-result summarization and eviction. Token-efficient tools are the best context management strategy.
- Multi-Agent Orchestration — The sub-agent delegation pattern (Section 4, Strategy 5) as a full orchestration framework. Spawn, manage, and collect results from sub-agents with isolated context windows.
- Agent Error Recovery — Context overflow as a recoverable error. The error recovery system detects overflows, triggers cross-window state serialization, and manages the bootstrap handoff. Runaway loop detection integrates with the pressure monitor's velocity spike alerts.
- Agent Audit Log Design — Every eviction, compression, budget violation, and cross-window save is an auditable event. The audit log infrastructure stores these events immutably for debugging, compliance, and policy tuning.
- Agent Evaluation Framework — The LLM-as-Judge methodology that powers compression fidelity evaluation. Build test suites, run automated evaluations in CI, and gate compression-prompt changes on fidelity scores.
End of Article — 8 sections, 6 FAQ items, 8 next-step links