Agent Context Window Management: Compressing, Preserving, and Evicting Task State

⚡ 30-Second Takeaway

  • Core Problem: Agents running 50+ tool calls or multi-hour sessions inevitably hit context window overflow. Even with 128K-token windows, context rot causes accuracy to degrade as attention dilutes across accumulated tokens. A bigger model is not the answer -- structural context management is.
  • The Solution: A complete context window lifecycle -- detect pressure (when to intervene) → evict (what to remove) → compress (shrink without losing state) → delegate (isolate to sub-contexts). Every stage has decision criteria, trade-off analysis, and code implementation.
  • Key Implementation: ContextWindowManager orchestrating 6 subsystems: pressure monitoring, 6 eviction policies, 5 compression strategies, token budgeting, cross-window state persistence, and health monitoring. Full Python implementation with pluggable strategy patterns.
  • What You'll Walk Away With: A production-grade context management system for your agent -- when to compress vs evict vs delegate to sub-agents, how to budget tokens across components, how to resume work cleanly after context reset, and how to measure whether your compression is losing critical state.

1. Why Context Window Management Is a First-Class Engineering Problem

A code-review agent embarks on a large refactoring task: "Migrate all REST API calls in user-service from axios to fetch, preserving error-handling logic." The agent starts methodically -- step 1 searches for axios references, step 5 analyzes error-handling patterns, step 15 begins file-by-file replacement, step 30 runs tests and finds breakages. By step 40, something has gone wrong: the agent keeps modifying the same file without advancing, has forgotten dependency analyses completed 25 steps earlier, and at step 50 throws a model_context_window_exceeded error.

This is not a model capability problem. It is the result of treating the context window as an infinite, append-only log. The axios search results from step 1, the error-pattern analysis from step 5, the per-file dependency map from step 15 -- every piece of output has been faithfully appended to the context window. A 128K window looks capacious at first, but after 40 turns of accumulation -- system prompt, tool definitions, conversation history, tool call results -- it is full. The agent begins losing critical context at step 41 and hits the wall at step 50.

Five Failure Modes of Unmanaged Context

In production, treating the context window as a passive message queue triggers five distinct failure classes:

  1. Context Rot (attention dilution): Transformer attention scales as O(n²) -- every token attends to every other token. As context grows, the attention budget per token shrinks. Chroma's research on 18 models confirmed this empirically: even models claiming 128K+ context windows show significant accuracy degradation past 64K tokens. GPT-4 begins hallucinating confident but incorrect inferences; Claude tends to abstain when uncertain. This is not a bug -- it is the physical limit of soft attention. More context means less attention per piece of information.
  2. Token cost inflation: Every turn sends the full accumulated history to the LLM. Turn 1 costs $0.01. Turn 50 costs $0.80. The cost curve is superlinear -- roughly 80% of tokens in a long-running agent task are spent on content the agent no longer needs to see. For teams running agents at scale, unmanaged context is not just a correctness problem -- it is a cost-control problem.
  3. Overflow crashes: The most visible failure. At step N the window hits its hard limit, the API returns an error, and the task terminates. All work from steps 1 through N-1 is lost because there is no cross-window state persistence. The agent must restart from zero, with zero memory of what it already did.
  4. Zombie information: Old tool results, resolved discussions, abandoned exploration paths -- these are never automatically removed. They sit in context forever, consuming both attention budget and token budget. Worse: the LLM can be misled by stale information. "But you decided to use axios back in step 3" -- a decision that was overturned 30 steps ago but whose text still lives in the history.
  5. Session amnesia: The agent is forcibly restarted after overflow, landing in a fresh context window with zero continuity. It must re-search for axios references, re-analyze error-handling patterns, re-build dependency maps -- re-deriving knowledge it already produced. If the original task required those 50+ steps to reach the halfway point, the agent is now in an infinite restart loop.
📌 Key Insight: The context window is the agent's working memory -- finite, expensive, and requiring active management. Treating it as an infinite message queue is a fundamental misunderstanding of how LLMs operate. Switching to a larger model (128K → 200K) only delays the wall by a few hours -- it does nothing to solve attention dilution or cost inflation. The right response to context pressure is not "buy a bigger window" -- it is "manage the window you have."

Three Axes of Context Management: Compress, Evict, Delegate

The context window management system proposed in this article operates along three axes:


  Axis 1 -- Compress: Shrink context without losing critical state
      Strategies: conversation compaction / structured note-taking /
      tool result summarization / progressive summarization /
      sub-agent delegation

  Axis 2 -- Evict: Remove what is no longer useful
      Policies: FIFO / LRU / Priority-based / Semantic similarity merge /
      Type-based / Hybrid (weighted composite)

  Axis 3 -- Delegate: Move work to isolated sub-contexts
      Pattern: spawn sub-agent with clean context → focused subtask →
      condensed summary returned to main agent
  

These three axes are not alternatives -- they are complementary layers that activate at different pressure levels. The decision framework in Sections 3 and 4 provides concrete guidance on when to use each.

The ContextWindowManager: Architecture Preview


  Agent Loop → ContextWindowManager
                 |-- ContextPressureMonitor   (when to act -- Section 2)
                 |-- EvictionEngine           (what to remove -- Section 3)
                 |     |-- FIFO / LRU / Priority / Semantic / Type / Hybrid
                 |-- CompressionEngine        (how to shrink -- Section 4)
                 |     |-- Compaction / Note-Taking / ToolResultSummarization
                 |     |-- Progressive / SubAgent
                 |-- TokenBudgetManager       (track, allocate, enforce -- Section 5)
                 |-- CrossWindowStateManager  (serialize, resume, verify -- Section 6)
                 |-- ContextHealthMonitor     (metrics, fidelity, alerts -- Section 7)
  

This article is the operational manual for L0 (the context window). The companion article Agent Memory System Design defines the full L0-L3 memory architecture -- what each layer stores, how retrieval boundaries work, how memory is scoped and hygienic. If agent-memory-design is the warehouse architect, this article is the warehouse operator managing shelf space. Both are necessary; they address complementary halves of the memory+context problem.

For the broader context protocol that governs how data flows into the context window in the first place, see Agent Context Protocol Design. For the observability infrastructure that consumes the health metrics emitted by the ContextWindowManager, see Agent Observability.

2. Understanding Context Pressure: When to Intervene

The first question in context management is not "what should I do?" -- it is "do I need to do anything right now?" Intervening too early wastes tokens on unnecessary compression. Intervening too late means the overflow has already happened, the error has already fired, and the recovery cost is much higher. The goal of context pressure monitoring is to detect the optimal intervention window -- early enough to act safely, late enough to avoid unnecessary work.

The Token Utilization Curve

Every agent loop produces a characteristic token utilization curve. Understanding its shape is the first step to predicting when intervention will be needed. The curve is typically linear in slope but can spike sharply when a tool returns unexpectedly large output (e.g., a web search returning a 20K-token page). Pressure monitoring must therefore track two signals: absolute level (current token count / max window) and velocity (tokens added per turn). Velocity tells you whether you have 10 turns or 2 turns before hitting the red zone.

Four Pressure Zones

ZoneUtilizationActionWhy This Threshold?
🟢 Green 0--50% No intervention needed. Full history is available and attention dilution is minimal. Below 50%, the opportunity cost of compression (losing detail) almost always exceeds the benefit (freeing space).
🟡 Yellow 50--75% Prepare eviction/compression strategy. No action yet, but evaluate: which blocks are eviction candidates? What is the burn rate? This is the strategic planning zone. At 50%, you have time to make good decisions. At 90%, you have time only to panic.
🟠 Orange 75--90% Execute eviction. Remove low-priority content. If eviction is insufficient, prepare compaction. At 75%, attention dilution becomes measurable. Eviction restores attention quality while preserving recent context.
🔴 Red 90--100% Force evict or compact immediately. The next LLM call risks overflow. Cross-window state should be serialized as a safety net. At 90%, you are one large tool output away from a crash. Delay is not an option.

The zone thresholds (50%, 75%, 90%) are defaults that work well for 128K-token windows. For smaller windows (8K--32K), tighten the thresholds -- the absolute token budget is smaller, so you have less margin for error. For very large windows (200K+), you can loosen the yellow/orange thresholds but must keep the red zone tight -- attention dilution is a function of both absolute token count and information density, and long contexts amplify rot even within "safe" zones.

Velocity Tracking: The Pressure Speed Problem

Absolute utilization tells you where you are. Velocity tells you how fast you're getting to the red zone. Not all agent loops add tokens at the same rate:

Velocity is computed as a rolling average over the last N turns (default N=5). A sudden velocity spike -- e.g., a tool returning 15K tokens when the average is 2K -- should trigger an immediate pressure re-evaluation, regardless of the current zone.

Context Awareness: Leveraging Model-Reported Token Budget

Some modern models (Claude Sonnet 4.5+) report their remaining token budget through system warnings. This is a powerful signal -- it is the ground truth of what the model actually sees, accounting for internal overhead that client-side token counting may miss. The ContextPressureMonitor should consume this signal when available, falling back to client-side estimation when not.

Trade-off: Relying solely on model-reported budgets creates vendor lock-in -- not all providers expose this. Relying solely on client-side counting risks underestimating usage. The recommended approach: prefer model-reported budget when available, validate against client-side count as a cross-check, and fall back to client-side when the model doesn't report.

Code: ContextPressureMonitor

from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Callable
from collections import deque

class PressureZone(Enum):
    GREEN = "green"      # 0-50%: no action
    YELLOW = "yellow"    # 50-75%: prepare strategy
    ORANGE = "orange"    # 75-90%: execute eviction
    RED = "red"          # 90-100%: force compact

@dataclass
class PressureReading:
    """A single pressure measurement snapshot"""
    current_tokens: int
    max_tokens: int
    utilization_pct: float
    zone: PressureZone
    velocity_tokens_per_turn: float
    turns_until_red: Optional[float]
    source: str  # "model_reported" or "client_estimated"

@dataclass
class ContextPressureMonitor:
    """Monitors context window pressure with zone detection and velocity tracking.

    Call .measure() before each LLM invocation in the agent loop.
    The returned PressureReading drives decisions in the EvictionEngine
    and CompressionEngine."""

    max_context_tokens: int = 128_000
    velocity_window_turns: int = 5

    # Zone thresholds as fraction of max context
    green_threshold: float = 0.50   # below this = green
    yellow_threshold: float = 0.75  # below this = yellow
    orange_threshold: float = 0.90  # below this = orange; above = red

    # Velocity spike detection: flag when velocity exceeds baseline * N
    spike_multiplier: float = 3.0

    # Callbacks for integration with observability pipeline
    on_zone_change: Optional[Callable] = None
    on_velocity_spike: Optional[Callable] = None

    # Internal tracking state
    _token_history: deque = field(default_factory=deque)
    _current_zone: PressureZone = PressureZone.GREEN
    _baseline_velocity: float = 0.0
    _turn_count: int = 0

    def measure(self, current_tokens: int,
                model_reported_tokens: Optional[int] = None) -> PressureReading:
        """Take a pressure reading before the next LLM call.

        Prefer model-reported token count when available (ground truth);
        fall back to client-side estimate otherwise."""
        effective_tokens = (model_reported_tokens
                           if model_reported_tokens is not None
                           else current_tokens)

        utilization = effective_tokens / self.max_context_tokens
        zone = self._classify_zone(utilization)
        velocity = self._compute_velocity(effective_tokens)
        turns_until_red = self._estimate_turns_to_red(effective_tokens, velocity)

        source = ("model_reported" if model_reported_tokens is not None
                  else "client_estimated")

        reading = PressureReading(
            current_tokens=effective_tokens,
            max_tokens=self.max_context_tokens,
            utilization_pct=round(utilization * 100, 1),
            zone=zone,
            velocity_tokens_per_turn=round(velocity, 1),
            turns_until_red=turns_until_red,
            source=source,
        )

        # Detect and emit events
        self._detect_events(reading, velocity)

        self._token_history.append(effective_tokens)
        self._turn_count += 1
        self._current_zone = zone

        return reading

    def _classify_zone(self, utilization: float) -> PressureZone:
        if utilization >= self.orange_threshold:
            return PressureZone.RED
        elif utilization >= self.yellow_threshold:
            return PressureZone.ORANGE
        elif utilization >= self.green_threshold:
            return PressureZone.YELLOW
        return PressureZone.GREEN

    def _compute_velocity(self, current_tokens: int) -> float:
        """Rolling average of tokens added per turn over recent window."""
        if len(self._token_history) < 2:
            return 0.0
        recent = list(self._token_history)[-self.velocity_window_turns:]
        if len(recent) < 2:
            return 0.0
        deltas = [recent[i+1] - recent[i] for i in range(len(recent)-1)]
        deltas.append(current_tokens - recent[-1])
        return sum(deltas) / len(deltas)

    def _estimate_turns_to_red(self, tokens: int,
                                velocity: float) -> Optional[float]:
        """Estimate how many turns until context hits the red zone."""
        if velocity <= 0:
            return None
        red_tokens = int(self.max_context_tokens * self.orange_threshold)
        remaining = red_tokens - tokens
        return max(0.0, remaining / velocity) if remaining > 0 else 0.0

    def _detect_events(self, reading: PressureReading, velocity: float):
        """Fire callbacks on zone transitions and velocity spikes."""
        if reading.zone != self._current_zone and self.on_zone_change:
            self.on_zone_change(self._current_zone, reading.zone, reading)

        if (self._baseline_velocity > 0
                and velocity > self._baseline_velocity * self.spike_multiplier
                and self.on_velocity_spike):
            self.on_velocity_spike(velocity, self._baseline_velocity, reading)

        # Exponential moving average for baseline velocity
        alpha = 0.3
        self._baseline_velocity = (
            alpha * velocity + (1 - alpha) * self._baseline_velocity
            if self._baseline_velocity > 0 else velocity
        )

    def reset(self):
        """Reset monitor state for a new task."""
        self._token_history.clear()
        self._current_zone = PressureZone.GREEN
        self._baseline_velocity = 0.0
        self._turn_count = 0


# -- Usage example --
def on_zone_change(old, new, reading):
    print(f"[PRESSURE] {old.value} -> {new.value} "
          f"(utilization: {reading.utilization_pct}%)")

def on_velocity_spike(current, baseline, reading):
    print(f"[SPIKE] {current:.0f} t/turn vs baseline {baseline:.0f}")

monitor = ContextPressureMonitor(
    max_context_tokens=128_000,
    on_zone_change=on_zone_change,
    on_velocity_spike=on_velocity_spike,
)

# Simulate growing context across 10 turns
token_counts = [5000, 8000, 12000, 18000, 28000,
                45000, 68000, 90000, 105000, 118000]
for i, tokens in enumerate(token_counts):
    r = monitor.measure(tokens)
    icons = {"green": "G", "yellow": "Y", "orange": "O", "red": "R"}
    print(f"Turn {i+1}: [{icons[r.zone.value]}] {r.utilization_pct:.0f}% "
          f"| vel: {r.velocity_tokens_per_turn:.0f} t/t "
          f"| red in: {r.turns_until_red}")

The ContextPressureMonitor is designed to be called before every LLM invocation in the agent loop. Its output -- current zone, velocity, estimated turns until red -- feeds directly into the decision logic of the EvictionEngine and CompressionEngine. The zone transition and velocity spike callbacks enable integration with the observability pipeline: every zone transition is an event worth logging, and every velocity spike is a signal worth alerting on (see Agent Observability for the metrics pipeline).

📌 Design Decision -- Thresholds vs ML Prediction: Why use static thresholds instead of training a predictor? Static thresholds are explainable -- you can reason about them during incidents. They are tunable -- one YAML change adapts to a new model. They are zero-cost -- no training data, no inference latency. The cost of getting a threshold wrong (intervening one turn early or late) is small. ML prediction makes sense only when the cost of misprediction is high and the relationship between features and pressure is nonlinear -- which is not the case for token accumulation, which is fundamentally linear.

3. Eviction Policies: What to Remove When Context Is Full

When the pressure monitor reports orange or red, the first and cheapest intervention is eviction -- removing content that is no longer contributing to the agent's current task. Eviction is preferred over compression because it costs nothing (no LLM calls) and is reversible in principle (the evicted content may still exist in a lower memory layer). But the decision of what to evict is where the engineering challenge lies -- a wrong eviction choice silently degrades the agent's reasoning quality.

The Eviction Decision Space

Before designing eviction policies, you must understand what types of content exist in the context window. Each type has a different lifecycle, importance characteristics, and safe-eviction rules:

Content Type~Token ShareLifecycleSafe to Evict?
System prompt5--15%Static❌ Never. Partial eviction breaks agent behavior.
Tool definitions3--8%Static⚠ Only if tool is no longer needed.
User messages2--5%Per-turn⚠ Older ones evictable; original task goal never.
Assistant responses15--35%Per-turn✅ Safe if covered by compaction summary.
Tool call results30--60%Single-use✅ Primary eviction target. Most consumed once.
Memory injections5--20%Per-session⚠ Evict stale; keep high-relevance recent.

The key insight: tool call results are the dominant token consumer in agentic workloads (30--60% of context) and are the safest to evict. A web search result from 15 turns ago, a directory listing from the initial exploration phase, a verbose error log from a resolved issue -- these are dead weight. Evicting them costs nothing and frees substantial space.

Six Eviction Policies: A Comparative Analysis

Policy 1 -- FIFO (First-In-First-Out)

Strategy: Remove the oldest messages first. Maintain a rolling window of the last N turns.

Policy 2 -- LRU (Least Recently Used)

Strategy: Track which content blocks are referenced in subsequent reasoning. Evict blocks that haven't been referenced for the longest time.

Policy 3 -- Priority-Based Eviction

Strategy: Assign an importance score to each content block. Evict lowest-scoring blocks first. Scoring dimensions:

Policy 4 -- Semantic Similarity Merge

Strategy: When two content blocks have high semantic similarity (cosine > 0.85), merge them into a single summarized block rather than evicting either.

Policy 5 -- Type-Based Eviction

Strategy: Apply different eviction rules per content type based on their natural lifecycle:

Policy 6 -- Hybrid (Weighted Composite)

Strategy: Combine multiple policies into a single scoring function with configurable weights:

eviction_priority = (
    0.30 * recency + 0.25 * priority
    + 0.20 * type_ttl + 0.15 * reference_freq
    + 0.10 * semantic_duplication
)

What Must NEVER Be Evicted

  1. System prompt: Partial eviction creates undefined agent behavior.
  2. Current turn's user message: The agent must always see what it's being asked to do right now.
  3. Active tool call results: Evicting mid-cycle breaks the reasoning chain.
  4. Explicit preservation markers: Content flagged DO_NOT_EVICT by the agent or system.

Eviction Safety: Placeholder Messages

When a tool result is evicted, leave a placeholder so the agent knows what was removed:

  [Evicted: tool_result from turn 7 (search_files "axios").
   Finding: 23 files reference axios in user-service/src/.
   Full result in session memory if needed.]

This costs ~30 tokens vs the original's ~3,000 -- a 100x compression -- while preserving the agent's awareness of past findings.

Decision Framework: Choosing an Eviction Policy

If your agent...Start with...Then consider...
Does simple Q&A or chatFIFONot needed
Runs long exploration-heavy loopsType-basedLRU if dead-ends accumulate
Has heterogeneous content importancePriority-basedHybrid after tuning weights
Receives repeated similar outputsType-based + Semantic MergeAdd priority scoring
Is mission-critical productionHybrid (conservative weights)Tune based on eviction audit logs

Code: EvictionEngine

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Callable
import math

class ContentType(Enum):
    SYSTEM_PROMPT = "system_prompt"
    TOOL_DEFINITION = "tool_definition"
    USER_MESSAGE = "user_message"
    ASSISTANT_RESPONSE = "assistant_response"
    TOOL_RESULT = "tool_result"
    MEMORY_INJECTION = "memory_injection"

class EvictionPolicy(Enum):
    FIFO = "fifo"
    LRU = "lru"
    PRIORITY = "priority"
    SEMANTIC_MERGE = "semantic_merge"
    TYPE_BASED = "type_based"
    HYBRID = "hybrid"

@dataclass
class ContentBlock:
    """A single block of content in the context window"""
    block_id: str
    content_type: ContentType
    content: str
    token_count: int
    turn_created: int
    last_referenced_turn: int = 0
    priority_score: float = 0.5
    embedding: Optional[list] = None
    do_not_evict: bool = False
    metadata: dict = field(default_factory=dict)


# ---- Pluggable Scoring Strategies ----

class EvictionScorer(ABC):
    """Abstract scorer: lower score = higher eviction priority."""

    @abstractmethod
    def score(self, block: ContentBlock, context: dict) -> float:
        """Return 0.0 (evict first) to 1.0 (keep)."""
        ...

class FIFOScorer(EvictionScorer):
    """Oldest blocks score lowest -> evicted first."""

    def score(self, block: ContentBlock, context: dict) -> float:
        current_turn = context.get("current_turn", 0)
        age = current_turn - block.turn_created
        half_life = context.get("fifo_half_life", 10)
        return math.exp(-age / max(half_life, 1))

class LRUScorer(EvictionScorer):
    """Least-recently-referenced blocks score lowest."""

    def score(self, block: ContentBlock, context: dict) -> float:
        current_turn = context.get("current_turn", 0)
        idle = current_turn - block.last_referenced_turn
        if block.last_referenced_turn == 0:
            return 0.3  # cold start: moderate suspicion
        half_life = context.get("lru_half_life", 5)
        return 1.0 - math.exp(-idle / max(half_life, 1))

class PriorityScorer(EvictionScorer):
    """Weighted multi-signal priority scoring."""

    def score(self, block: ContentBlock, context: dict) -> float:
        current_turn = context.get("current_turn", 0)
        w = context.get("priority_weights", {
            "recency": 0.30, "semantic": 0.25, "source": 0.20,
            "reuse": 0.15, "explicit": 0.10,
        })

        # Recency: exponential decay
        age = current_turn - block.turn_created
        recency = math.exp(-age / max(context.get("recency_half_life", 15), 1))

        # Source type prestige
        type_scores = {
            ContentType.USER_MESSAGE: 1.0,
            ContentType.TOOL_DEFINITION: 0.9,
            ContentType.MEMORY_INJECTION: 0.7,
            ContentType.ASSISTANT_RESPONSE: 0.5,
            ContentType.TOOL_RESULT: 0.3,
            ContentType.SYSTEM_PROMPT: 1.0,
        }
        source_score = type_scores.get(block.content_type, 0.5)

        # Reuse frequency
        reuse = min(block.metadata.get("reference_count", 0) / 5.0, 1.0)

        # Explicit DO_NOT_EVICT flag
        explicit = 1.0 if block.do_not_evict else 0.0

        # Semantic relevance (requires embedding)
        semantic = 0.5
        if block.embedding and context.get("task_embedding"):
            task_emb = context["task_embedding"]
            dot = sum(x * y for x, y in zip(block.embedding, task_emb))
            n_a = math.sqrt(sum(x*x for x in block.embedding))
            n_b = math.sqrt(sum(y*y for y in task_emb))
            semantic = max(0.0, dot / max(n_a * n_b, 1e-10))

        return (w["recency"] * recency + w["semantic"] * semantic
                + w["source"] * source_score + w["reuse"] * reuse
                + w["explicit"] * explicit)

class TypeBasedScorer(EvictionScorer):
    """Per-type TTL: blocks past their type's max age score zero."""

    def score(self, block: ContentBlock, context: dict) -> float:
        current_turn = context.get("current_turn", 0)
        ttl = context.get("type_ttl", {
            ContentType.TOOL_RESULT: 8,
            ContentType.MEMORY_INJECTION: 20,
            ContentType.ASSISTANT_RESPONSE: 30,
        })
        max_age = ttl.get(block.content_type, 50)
        age = current_turn - block.turn_created
        if age > max_age:
            return 0.0
        return 1.0 - (age / max(max_age, 1))

class HybridScorer(EvictionScorer):
    """Weighted composite of all scoring strategies."""

    def __init__(self):
        self._fifo = FIFOScorer()
        self._lru = LRUScorer()
        self._priority = PriorityScorer()
        self._type = TypeBasedScorer()

    def score(self, block: ContentBlock, context: dict) -> float:
        w = context.get("hybrid_weights", {
            "fifo": 0.15, "lru": 0.15,
            "priority": 0.40, "type": 0.30,
        })
        return (w["fifo"] * self._fifo.score(block, context)
                + w["lru"] * self._lru.score(block, context)
                + w["priority"] * self._priority.score(block, context)
                + w["type"] * self._type.score(block, context))


# ---- Eviction Engine ----

@dataclass
class EvictionResult:
    blocks_evicted: list
    tokens_freed: int
    remaining_tokens: int
    policy_used: EvictionPolicy
    placeholders_generated: int

@dataclass
class EvictionEngine:
    """Pluggable eviction engine with safe-eviction guardrails.

    Usage:
        engine = EvictionEngine(policy=EvictionPolicy.HYBRID)
        result = engine.evict(blocks, target_free_tokens=5000, current_turn=12)
    """

    scorer: EvictionScorer = field(default_factory=HybridScorer)
    policy: EvictionPolicy = EvictionPolicy.HYBRID
    NEVER_EVICT: tuple = (ContentType.SYSTEM_PROMPT,)
    generate_placeholders: bool = True
    placeholder_max_tokens: int = 50

    def evict(self, blocks: list, target_free_tokens: int,
              current_turn: int = 0, context: dict = None) -> EvictionResult:
        """Evict blocks to free at least target_free_tokens."""
        if context is None:
            context = {}
        context["current_turn"] = current_turn

        # Separate protected and evictable
        protected, evictable = [], []
        for b in blocks:
            if b.content_type in self.NEVER_EVICT or b.do_not_evict:
                protected.append(b)
            else:
                evictable.append(b)

        # Score and sort (lowest first = evict first)
        scored = [(self.scorer.score(b, context), b) for b in evictable]
        scored.sort(key=lambda x: x[0])

        # Evict until target met
        freed, evicted = 0, []
        for score, block in scored:
            if freed >= target_free_tokens:
                break
            evicted.append(block)
            freed += block.token_count

        remaining = sum(b.token_count for b in protected) + sum(
            b.token_count for b in evictable if b not in evicted)

        return EvictionResult(
            blocks_evicted=evicted,
            tokens_freed=freed,
            remaining_tokens=remaining,
            policy_used=self.policy,
            placeholders_generated=0,
        )

    def set_policy(self, policy: EvictionPolicy):
        """Switch eviction policy at runtime."""
        self.policy = policy
        scorers = {
            EvictionPolicy.FIFO: FIFOScorer,
            EvictionPolicy.LRU: LRUScorer,
            EvictionPolicy.PRIORITY: PriorityScorer,
            EvictionPolicy.TYPE_BASED: TypeBasedScorer,
            EvictionPolicy.HYBRID: HybridScorer,
        }
        if policy in scorers:
            self.scorer = scorers[policy]()


# -- Usage: hybrid eviction freeing 500 tokens --
engine = EvictionEngine(policy=EvictionPolicy.HYBRID)
blocks = [
    ContentBlock("b1", ContentType.USER_MESSAGE,
                 "Migrate all axios calls to fetch", 12, 1, do_not_evict=True),
    ContentBlock("b2", ContentType.TOOL_RESULT,
                 "Found 23 axios refs in user-service/src/...", 450, 2),
    ContentBlock("b3", ContentType.TOOL_RESULT,
                 "Error handling: try/catch in 18 files, config in 5...", 2800, 5),
    ContentBlock("b4", ContentType.TOOL_RESULT,
                 "LS output (irrelevant): total 48 files...", 120, 6),
]

result = engine.evict(blocks, target_free_tokens=500,
                      current_turn=8,
                      context={"hybrid_weights": {
                          "fifo": 0.15, "lru": 0.15,
                          "priority": 0.40, "type": 0.30}})
print(f"Evicted {len(result.blocks_evicted)} blocks, freed {result.tokens_freed} tokens")
for b in result.blocks_evicted:
    print(f"  -> {b.block_id} ({b.content_type.value})")

The eviction engine is designed to be called from the agent loop whenever the ContextPressureMonitor reports orange or red. The pluggable scorer architecture means you can start with HybridScorer and conservative weights, observe eviction decisions over multiple runs, and progressively tune. For the design of token-efficient tools that minimize the need for eviction, see Agent Tool Design.

4. Compression Strategies: Making Context Smaller Without Losing State

Eviction removes content entirely. Compression transforms it -- preserving information in a reduced form. When eviction alone cannot free enough tokens (or when the content slated for eviction contains information the agent still needs), compression is the next intervention in the cascade.

The fundamental distinction: eviction trades completeness for space; compression trades fidelity for space. Eviction says "this information is not needed." Compression says "this information is needed, but not at full resolution." The engineering challenge is maximizing the fidelity-to-token ratio -- preserving as much decision-critical information as possible per token of compressed output.

Five Compression Strategies

Strategy 1 -- Conversation Compaction

How it works: Send the conversation history to an LLM with a compaction prompt that produces a structured summary capturing: architectural decisions, unresolved issues, implementation state, next steps, key learnings. Replace the original conversation with the summary + the most recent N messages.

The compaction prompt is the most critical artifact. A poorly designed prompt produces summaries that omit the one detail that matters 20 turns later:

  Compaction prompt structure:
  1. PRESERVE (mandatory):
     - All architectural decisions and rationale
     - All unresolved bugs, errors, blockers -- include exact messages
     - Current task progress: completed, current, remaining steps
     - All user constraints and preferences (verbatim if short)
     - Key file paths, function names, data structures
     - Numbers: counts, measurements, config values, versions
  2. DROP:
     - Verbose tool output where the key finding is captured in 1-2 lines
     - Redundant confirmations and status checks
     - Dead-end explorations yielding no useful information
     - Boilerplate error messages (keep type + key detail)
     - Intermediate reasoning superseded by a final decision
  3. FORMAT: Structured sections, bullet points. Not narrative -- a reference document.

Trade-off -- Server-side vs Client-side: Server-side (Anthropic's compaction API) is automatic and reliable but vendor-locked and black-box. Client-side (this implementation) is provider-agnostic and tunable but costs an extra LLM call and quality depends on your prompt design. Choose server-side when you're on a single provider and want zero-code integration. Choose client-side when you need provider independence, prompt transparency, or integration with custom compaction logic.

Strategy 2 -- Structured Note-Taking

How it works: The agent writes persistent notes to external storage during operation. On compression, notes -- not raw conversation -- are carried forward. This is distinct from compaction: compaction summarizes post-hoc; note-taking captures information at the moment of generation, when understanding is freshest.

Patterns from Claude Code and Claude Plays Pokemon:

The "cold start" requirement: Notes must be written so a new agent session, reading only the notes, can become operational immediately. "Continuing investigation from earlier" is useless. "Investigating 503 error on /api/v2/users -- occurs at ~200 concurrent requests, suspected connection pool exhaustion, see /var/log/user-service/error.log" provides everything the new session needs.

Trade-off: Note-taking costs tokens during operation (every note is an LLM output). But those tokens are an investment -- they reduce later compression costs and improve context resumption quality. For long tasks (>20 turns), the investment pays off. For short tasks (<10 turns), it is overhead without benefit.

Strategy 3 -- Tool Result Summarization

How it works: When a tool returns large output, immediately summarize it into key findings. The original output is discarded; only the summary stays in context. Apply when tool_output_tokens > summarization_threshold (default: 500 tokens). Below this threshold, the compression benefit is marginal; above it, summarization can achieve 5--20x compression ratios.

Pattern from Claude Code: Instead of loading entire files, Claude Code uses grep for pattern matching, head/tail for snippets. This is tool-result summarization at the tool-design level. See Agent Tool Design for designing tools that produce token-efficient output by default.

Strategy 4 -- Progressive Summarization

How it works: Summarize at increasing compression ratios as information ages -- mirroring how human memory works:

  L0 (current turn):      Full text
  L1 (last 1-5 turns):    Key points, decisions (~30% of original)
  L2 (5-20 turns ago):    One-line summary (~5%)
  L3 (>20 turns ago):     Title/topic only (~1%)

Why this works: Information utility decays over time. The exact wording of a tool result from 25 turns ago is almost never needed -- but knowing that a tool was called and what its general finding was may still be relevant. Progressive summarization captures this natural decay curve.

Implementation challenge: The transition between compression levels requires re-summarization. A turn at L1 (key points) must be further compressed to L2 (one-line) when it ages past 5 turns. This distributes compression cost over time -- which is both a benefit (smoother cost) and a risk (accumulated drift as each level loses a bit more fidelity).

Strategy 5 -- Sub-Agent Delegation (Context Isolation)

How it works: Spawn a sub-agent with a clean context window for a focused subtask. The sub-agent explores, reasons, and produces a condensed summary (1K--2K tokens) returned to the main agent. The sub-agent's full context -- potentially 50K+ tokens of exploration -- is discarded after summarization.

This achieves extreme compression ratios (20--100x) by leveraging the sub-agent's intelligence to determine what matters. A sub-agent tasked with "find all authentication middleware bypasses" may search 50 files, read 30, and return a 1K-token summary of the 3 files that actually have issues. The main agent receives only the conclusion.

When to use sub-agent delegation vs local compression:

FactorFavor Local CompressionFavor Sub-Agent Delegation
Subtask independenceTightly coupled to main contextSelf-contained; specifiable in 1-2 sentences
Exploration volume<5 tool calls needed10+ tool calls needed
Context dependenceNeeds full conversation historyOnly needs task spec + minimal context
Result complexitySimple value or booleanStructured analysis requiring synthesis
Cost sensitivityBudget-consciousQuality-conscious; worth paying for sub-agent

Anti-pattern: Spawning a sub-agent and not summarizing its output. If the sub-agent returns its full 50K-token context, delegation increases context pressure. The sub-agent's result must always be a compressed summary. For orchestration patterns governing sub-agent lifecycle, see Multi-Agent Orchestration.

The Compression Cascade: Order of Operations

Compression strategies follow an escalation order minimizing cost and maximizing information preservation:


  Context pressure detected
      |
      |-- 1. Tool result summarization (cheap, point intervention)
      |      Compress individual large tool outputs >500 tokens
      |
      |-- 2. Eviction of low-priority content (zero-cost, Section 3)
      |      Remove dead weight before spending on compression
      |
      |-- 3. Progressive summarization of aging turns
      |      Compress turns >5 old to key points, >20 old to titles
      |
      |-- 4. Conversation compaction (expensive but comprehensive)
      |      Full summarization of entire history window
      |
      |-- 5. Sub-agent delegation (most expensive, most powerful)
             Spawn sub-agent for largest independent subtask
  

The cascade is ordered by cost per token freed. Tool result summarization and eviction are cheap first steps. Progressive summarization and compaction are more expensive but free more space. Sub-agent delegation is the most expensive but also the most powerful -- reserve for tasks that genuinely cannot fit in a single window.

Compression Fidelity: The Litmus Test

How do you know if your compression is working? Three dimensions (detailed in Section 7):

  1. Key fact retention: After compression, can the agent answer questions about the compressed content?
  2. State continuity: Does agent task state (current step, remaining steps, unresolved issues) match pre-compression?
  3. Decision recall: Can the agent recall architectural decisions and their rationale?
📌 Design Principle -- Fidelity vs Token Savings: Every compression strategy involves a fidelity-savings trade-off. A compaction prompt preserving every detail has a ratio of 1.0x (no savings). An overly aggressive prompt may achieve 20x but lose critical state. Start conservative (preserve more than you think needed), run fidelity evaluations, and only then increase aggressiveness.

Code: CompressionEngine

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Callable
import time

class CompressionStrategy(Enum):
    COMPACTION = "compaction"
    NOTE_TAKING = "note_taking"
    TOOL_RESULT = "tool_result_summarization"
    PROGRESSIVE = "progressive"
    SUB_AGENT = "sub_agent"

@dataclass
class CompressionResult:
    """Result of a single compression operation."""
    strategy: CompressionStrategy
    original_tokens: int
    compressed_tokens: int
    compression_ratio: float       # original / compressed
    compressed_content: str
    fidelity_score: Optional[float] = None
    llm_calls_used: int = 0
    metadata: dict = field(default_factory=dict)


# ---- Compaction Prompt Templates ----

COMPACTION_SYSTEM = """You are a context-compaction engine. Compress conversation
history into a structured summary preserving all decision-critical information.

RULES:
1. PRESERVE (mandatory):
   - Every architectural decision with rationale and alternatives
   - Every unresolved bug, error, blocker -- include exact error messages
   - Current task progress: completed steps, step in progress, remaining
   - All user constraints and preferences (verbatim if short)
   - Key file paths, function names, class names, data structures
   - All numbers: counts, measurements, config values, versions
2. DROP:
   - Verbose tool output where key finding fits in 1-2 sentences
   - Redundant status confirmations
   - Dead-end explorations yielding no useful information
   - Boilerplate error messages (keep type + key detail)
   - Intermediate reasoning superseded by a final decision
3. OUTPUT FORMAT:
   Structured sections with clear headers. Bullet points preferred over prose.
   This is a reference document, not a narrative."""

COMPACTION_USER = """Compress the following conversation. Task goal:

{task_goal}

Conversation to compress:
{conversation}

Produce the structured summary. A new agent session with only this summary
must be able to resume the task immediately."""

TOOL_SUMMARIZE_PROMPT = """Summarize the following tool output. Extract:
1. The key finding or result (1 sentence)
2. Any numbers, paths, names, or identifiers mentioned
3. Any errors or warnings (type + key detail)
4. Any actionable information

Tool: {tool_name}
Output ({output_tokens} tokens):
{output}

Summary (aim for <20% of original length):"""


# ---- Pluggable Compression Methods ----

class CompressionMethod(ABC):
    """Abstract compression strategy."""

    @abstractmethod
    def compress(self, content: str, context: dict,
                 llm_call: Callable) -> CompressionResult:
        """llm_call is a function: (system_prompt, user_prompt) -> str"""
        ...

class CompactionMethod(CompressionMethod):
    """Full conversation compaction via LLM summarization."""

    def compress(self, content: str, context: dict,
                 llm_call: Callable) -> CompressionResult:
        task_goal = context.get("task_goal", "Unknown task")
        user = COMPACTION_USER.format(
            task_goal=task_goal, conversation=content)
        summary = llm_call(COMPACTION_SYSTEM, user)

        orig = context.get("original_tokens", len(content) // 4)
        comp = len(summary) // 4

        return CompressionResult(
            strategy=CompressionStrategy.COMPACTION,
            original_tokens=orig,
            compressed_tokens=comp,
            compression_ratio=orig / max(comp, 1),
            compressed_content=summary,
            llm_calls_used=1,
        )

class ToolResultSummarizationMethod(CompressionMethod):
    """Summarize individual large tool outputs."""

    def __init__(self, min_tokens: int = 500):
        self.min_tokens = min_tokens

    def compress(self, content: str, context: dict,
                 llm_call: Callable) -> CompressionResult:
        tool_name = context.get("tool_name", "unknown")
        output_tokens = context.get("output_tokens", len(content) // 4)

        # Below threshold: no summarization needed
        if output_tokens < self.min_tokens:
            return CompressionResult(
                strategy=CompressionStrategy.TOOL_RESULT,
                original_tokens=output_tokens,
                compressed_tokens=output_tokens,
                compression_ratio=1.0,
                compressed_content=content,
            )

        user = TOOL_SUMMARIZE_PROMPT.format(
            tool_name=tool_name, output_tokens=output_tokens, output=content)
        summary = llm_call(
            "You are a precise tool output summarizer.", user)

        comp = len(summary) // 4
        return CompressionResult(
            strategy=CompressionStrategy.TOOL_RESULT,
            original_tokens=output_tokens,
            compressed_tokens=comp,
            compression_ratio=output_tokens / max(comp, 1),
            compressed_content=summary,
            llm_calls_used=1,
        )

class ProgressiveSummarizationMethod(CompressionMethod):
    """Multi-level summarization based on content age."""

    # (max_age_turns, target_ratio, label)
    LEVELS = [
        (1,   1.0,  "L0_full"),
        (5,   0.30, "L1_key_points"),
        (20,  0.05, "L2_one_line"),
        (999, 0.01, "L3_title"),
    ]

    def compress(self, content: str, context: dict,
                 llm_call: Callable) -> CompressionResult:
        turns = context.get("turns", [])
        compressed_parts = []
        total_orig, llm_calls = 0, 0

        for turn in turns:
            age = turn.get("age", 0)
            text = turn.get("text", "")
            total_orig += len(text) // 4

            target_ratio, label = 1.0, "L0_full"
            for max_age, ratio, lbl in self.LEVELS:
                if age <= max_age:
                    target_ratio, label = ratio, lbl
                    break

            if target_ratio >= 1.0:
                compressed_parts.append(
                    f"[Turn {turn['turn']}] {text}")
            else:
                target_tokens = max(
                    int((len(text) // 4) * target_ratio), 10)
                prompt = (
                    f"Compress to ~{target_tokens} tokens ({label}). "
                    f"Preserve: decisions, errors, key findings, numbers.\n\n"
                    f"{text}")
                summary = llm_call(
                    "You are a progressive summarizer.", prompt)
                compressed_parts.append(
                    f"[Turn {turn['turn']}, {label}] {summary}")
                llm_calls += 1

        compressed = "\n\n".join(compressed_parts)
        comp_tokens = len(compressed) // 4

        return CompressionResult(
            strategy=CompressionStrategy.PROGRESSIVE,
            original_tokens=total_orig,
            compressed_tokens=comp_tokens,
            compression_ratio=total_orig / max(comp_tokens, 1),
            compressed_content=compressed,
            llm_calls_used=llm_calls,
        )

class NoteTakingMethod(CompressionMethod):
    """Structured notes written during operation; notes carried forward."""

    def __init__(self):
        self.notes: dict = {}  # category -> list of notes

    def write_note(self, category: str, note: str,
                    importance: str = "normal"):
        """Called by the agent during operation to record a note."""
        self.notes.setdefault(category, []).append({
            "content": note,
            "importance": importance,
            "timestamp": time.time(),
        })

    def get_notes_for_context(self, max_tokens: int = 2000) -> str:
        """Generate a context-ready notes summary bounded by max_tokens."""
        sections = []
        budget = max_tokens
        priority = ["task_progress", "decisions", "bugs",
                    "constraints", "learnings", "misc"]

        for category in priority:
            if category not in self.notes:
                continue
            cat_notes = sorted(
                self.notes[category],
                key=lambda n: (
                    0 if n["importance"] == "critical" else 1,
                    -n["timestamp"]))
            lines = [f"## {category.replace('_', ' ').title()}"]
            for n in cat_notes:
                line = f"- [{n['importance'].upper()}] {n['content']}"
                est = len(line) // 4
                if budget - est < 50 and lines:
                    break
                lines.append(line)
                budget -= est
            sections.append("\n".join(lines))
        return "\n\n".join(sections)

    def compress(self, content: str, context: dict,
                 llm_call: Callable) -> CompressionResult:
        max_tokens = context.get("max_notes_tokens", 2000)
        notes_text = self.get_notes_for_context(max_tokens)
        orig = context.get("original_tokens", len(content) // 4)
        comp = len(notes_text) // 4
        return CompressionResult(
            strategy=CompressionStrategy.NOTE_TAKING,
            original_tokens=orig,
            compressed_tokens=comp,
            compression_ratio=orig / max(comp, 1),
            compressed_content=notes_text,
            llm_calls_used=0,
        )


# ---- Compression Engine ----

@dataclass
class CompressionEngine:
    """Orchestrates compression strategies; executes the cascade."""

    compaction: CompactionMethod = field(default_factory=CompactionMethod)
    tool_summarizer: ToolResultSummarizationMethod = field(
        default_factory=ToolResultSummarizationMethod)
    progressive: ProgressiveSummarizationMethod = field(
        default_factory=ProgressiveSummarizationMethod)
    note_taking: NoteTakingMethod = field(default_factory=NoteTakingMethod)

    # Sub-agent dispatcher (delegates to multi-agent-orchestration)
    sub_agent_dispatcher: Optional[Callable] = None

    # Cascade trigger thresholds
    summarization_token_threshold: int = 500
    progressive_age_threshold: int = 5
    compaction_utilization_threshold: float = 0.85

    def compress_tool_result(self, tool_name: str, output: str,
                             llm_call: Callable) -> CompressionResult:
        """Strategy 3: Compress a single large tool output."""
        return self.tool_summarizer.compress(
            output,
            context={"tool_name": tool_name,
                     "output_tokens": len(output) // 4},
            llm_call=llm_call)

    def compact_conversation(self, history: str, task_goal: str,
                             token_count: int,
                             llm_call: Callable) -> CompressionResult:
        """Strategy 1: Full conversation compaction."""
        return self.compaction.compress(
            history,
            context={"task_goal": task_goal,
                     "original_tokens": token_count},
            llm_call=llm_call)

    def progressive_summarize(self, turns: list,
                               llm_call: Callable) -> CompressionResult:
        """Strategy 4: Progressive summarization by age."""
        total = sum(len(t.get("text", "")) // 4 for t in turns)
        return self.progressive.compress(
            "", context={"turns": turns, "original_tokens": total},
            llm_call=llm_call)

    def dispatch_to_sub_agent(self, task_spec: str,
                               required_context: str = "") -> CompressionResult:
        """Strategy 5: Delegate to sub-agent with clean context."""
        if not self.sub_agent_dispatcher:
            return CompressionResult(
                strategy=CompressionStrategy.SUB_AGENT,
                original_tokens=len(task_spec) // 4,
                compressed_tokens=len(task_spec) // 4,
                compression_ratio=1.0,
                compressed_content="[Sub-agent delegation not configured]",
            )
        sub = self.sub_agent_dispatcher(task_spec, required_context)
        return CompressionResult(
            strategy=CompressionStrategy.SUB_AGENT,
            original_tokens=sub.get("sub_agent_tokens_used", 0),
            compressed_tokens=sub.get("summary_tokens", 0),
            compression_ratio=sub.get("compression_ratio", 0.0),
            compressed_content=sub.get("summary", ""),
            llm_calls_used=sub.get("llm_calls", 0),
        )

    def execute_cascade(self, history: str, task_goal: str,
                        current_tokens: int, max_tokens: int,
                        llm_call: Callable,
                        turns: list = None) -> list:
        """Execute compression cascade based on pressure level.

        Returns list of CompressionResult per step executed."""
        results = []
        utilization = current_tokens / max_tokens

        # Step 2: Progressive summarization for aged turns
        if turns and len(turns) > self.progressive_age_threshold:
            results.append(self.progressive_summarize(turns, llm_call))

        # Step 3: Full compaction at high utilization
        if utilization >= self.compaction_utilization_threshold:
            results.append(self.compact_conversation(
                history, task_goal, current_tokens, llm_call))

        return results


# -- Usage examples --

def mock_llm(system: str, user: str) -> str:
    return "[Compressed: key decisions preserved, 3 unresolved issues, step 4/7]"

engine = CompressionEngine()

# 1. Tool result summarization
large = "Search results:\n" + "\n".join(
    [f"File {i}: content line {j}" for i in range(200) for j in range(2)])
r = engine.compress_tool_result("search_files", large, mock_llm)
print(f"Tool: {r.original_tokens} -> {r.compressed_tokens} tokens "
      f"({r.compression_ratio:.1f}x)")

# 2. Conversation compaction
history = "Turn 1-50: agent migration task..."
r = engine.compact_conversation(history, "Migrate axios to fetch",
                                 12000, mock_llm)
print(f"Compaction: {r.original_tokens} -> {r.compressed_tokens} tokens "
      f"({r.compression_ratio:.1f}x)")

# 3. Progressive summarization
turns = [
    {"turn": 1, "text": "User requested migration of 23 axios calls...", "age": 25},
    {"turn": 10, "text": "Began file-by-file replacement...", "age": 15},
    {"turn": 20, "text": "Currently replacing file 12/23...", "age": 5},
    {"turn": 25, "text": "Just fixed build error in api.ts...", "age": 0},
]
r = engine.progressive_summarize(turns, mock_llm)
print(f"Progressive: {r.original_tokens} -> {r.compressed_tokens} tokens "
      f"({r.compression_ratio:.1f}x, {r.llm_calls_used} calls)")

# 4. Note-taking
engine.note_taking.write_note(
    "decisions", "Use native fetch with custom error wrapper", "critical")
engine.note_taking.write_note(
    "bugs", "api.ts:42 -- type error after migration, see build log", "high")
engine.note_taking.write_note(
    "task_progress", "Completed 12/23 files, currently on api.ts", "normal")
notes = engine.note_taking.get_notes_for_context(max_tokens=500)
print(f"Notes:\n{notes}")

The CompressionEngine is the most architecturally complex component in the context management system because it must make cost-quality trade-offs at runtime. Each strategy has a different cost profile (LLM calls, tokens consumed) and fidelity profile (what information is preserved vs lost). The engine's design -- pluggable methods with a cascade execution order -- allows you to start conservative and progressively optimize as you gather fidelity metrics.

For the token-budgeting system that tracks how many tokens each compression strategy consumes and whether the cost is justified by space freed, continue to Section 5. For sub-agent orchestration patterns, see Multi-Agent Orchestration.

5. Token Budgeting: Tracking, Allocating, and Enforcing Limits

Eviction and compression answer the question: "the context is full — what do I do now?" Token budgeting answers the more fundamental question: "how much space should each component have in the first place, and am I tracking actual vs expected consumption?" Budgeting is fire prevention; eviction and compression are firefighting. Both are necessary, but a well-tuned budget reduces the frequency and severity of eviction+compression interventions.

Why Budgeting Matters: The Tragedy of the Commons

The context window is a shared resource. System prompt, tool definitions, message history, tool results, memory injections, and output reserve all compete for the same finite token pool. Without explicit budgets, the tragedy of the commons plays out predictably: tool results balloon to consume 50% of the window, message history grows unbounded, and the output reserve — the space the LLM needs to actually generate a response — shrinks below the minimum viable threshold. The agent doesn't crash because the model is bad; it crashes because no one was tracking who was consuming the window.

Trade-off — Static vs Dynamic Allocation: Static allocation (fixed percentages per component) is simple, predictable, and debuggable. Dynamic allocation (budget shifts based on task phase) can be more efficient but introduces complexity and tuning overhead. The recommendation: start with static allocation, monitor actual consumption patterns across 10+ real agent runs, and only then consider adding dynamism. Premature dynamic allocation is the leading cause of budget-tuning-death-spirals — where shifting budgets create feedback loops that make certain tasks unfinishable.

The Six Budget Components

ComponentRecommended %128K TokensStrategyOverflow Action
System Prompt5--8%~6K--10KFixed, never evicted💀 Cannot overflow — prompt must be trimmed at design time
Tool Definitions3--5%~4K--6KFixed, lazy-load optionalUse tool search/retrieval instead of full injection
Message History60--70%~77K--90KDynamic, primary eviction targetTrigger compression cascade (Section 4)
Tool Results10--15%~13K--19KCapped, auto-summarizeSummarize oldest results; evict if still over
Memory Injections5--10%~6K--13KCapped, relevance-filteredRaise relevance threshold; evict stale
Output Reserve4--8%~5K--10KAlways preservedIf <4K remaining, emergency compaction before next LLM call

Trade-off — Generous vs Conservative Output Reserve: A generous output reserve (8%+) guarantees room for complex, multi-step reasoning chains — critical for coding and planning agents. A conservative reserve (4%) maximizes space for history but risks the LLM producing truncated responses when deep reasoning is required. The risk asymmetry is clear: a truncated response breaks the agent loop; slightly less history space means one extra compression cycle. Err on the side of generous — the output reserve is your circuit breaker.

Soft Limits vs Hard Limits: Designing Graceful Degradation

Budget enforcement is not binary. A hard limit that rejects content at 100% creates brittle failure — the agent hits a wall and stops. A soft limit that warns at 80% creates graceful degradation — the agent has time to compress, evict, or reallocate before hitting the wall.


  Component budget lifecycle:
  ┌─────────────────────────────────────────────────────────────┐
  │  0% ─────────── 80% (soft) ─────── 100% (hard)             │
  │  │                 │                    │                   │
  │  │  Normal ops     │  Pre-compress      │  Force action     │
  │  │  No alerts      │  WARNING emitted   │  Block addition   │
  │  │                 │  Prepare eviction  │  Trigger cascade  │
  │  └─────────────────┴────────────────────┴───────────────────┘
  

Trade-off — Per-Component vs Global Budgeting: Per-component budgets (this design) give precise control but require tuning 6 allocation percentages. Global budgeting (one number: "keep utilization under 80%") is simpler but provides no insight into which component is the problem. The recommendation: implement per-component tracking (for observability) but enforce globally (for simplicity). If tool results are consuming 40% instead of the budgeted 12%, you want to know that — even if you're not blocking the addition. See Agent Observability for exporting these component-level metrics.

Burn Rate Prediction: From Reactive to Proactive Budgeting

The most valuable feature of a budget manager is not "how much is used now" — that's a snapshot. The most valuable feature is "how many turns until this budget is exhausted?" This transforms context management from reactive (responding to crises) to proactive (anticipating them):


  turns_remaining = (component_budget - used) / avg_tokens_per_turn_for_component
  urgency = "critical" if turns_remaining < 3 else "warning" if turns_remaining < 10 else "normal"
  

If the message history budget has 3 turns remaining but the task needs 15 more turns, the system knows to trigger compression now, not when the budget is exhausted. This is the difference between a controlled landing and a crash.

For the design of tools that minimize token consumption per result — reducing burn rate at the source — see Agent Tool Design.

Code: TokenBudgetManager

from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Callable
from collections import deque
import math

class BudgetComponent(Enum):
    SYSTEM_PROMPT = "system_prompt"
    TOOL_DEFINITIONS = "tool_definitions"
    MESSAGE_HISTORY = "message_history"
    TOOL_RESULTS = "tool_results"
    MEMORY_INJECTION = "memory_injection"
    OUTPUT_RESERVED = "output_reserved"

@dataclass
class ComponentBudget:
    """Budget tracking for a single context-window component."""
    hard_limit_tokens: int = 0
    used: int = 0
    soft_limit_pct: float = 0.80

    @property
    def soft_limit(self) -> int:
        return int(self.hard_limit_tokens * self.soft_limit_pct)

    @property
    def utilization(self) -> float:
        return self.used / self.hard_limit_tokens if self.hard_limit_tokens else 0.0

    @property
    def is_soft_exceeded(self) -> bool:
        return self.used >= self.soft_limit

    @property
    def is_hard_exceeded(self) -> bool:
        return self.used >= self.hard_limit_tokens

    @property
    def remaining(self) -> int:
        return max(0, self.hard_limit_tokens - self.used)


@dataclass
class TokenBudgetManager:
    """Tracks token consumption per component, enforces soft/hard limits,
    predicts exhaustion timelines via burn-rate analysis.

    Integration: called before every LLM invocation (check_before_add) and
    after every turn (record_turn). Emits events for observability pipeline."""

    max_context_tokens: int = 128_000

    # Default allocation: system prompt 6%, tools 4%, history 65%,
    # tool results 12%, memory 8%, output reserve 5%
    allocations: dict = field(default_factory=lambda: {
        BudgetComponent.SYSTEM_PROMPT: 0.06,
        BudgetComponent.TOOL_DEFINITIONS: 0.04,
        BudgetComponent.MESSAGE_HISTORY: 0.65,
        BudgetComponent.TOOL_RESULTS: 0.12,
        BudgetComponent.MEMORY_INJECTION: 0.08,
        BudgetComponent.OUTPUT_RESERVED: 0.05,
    })

    budgets: dict = field(default_factory=dict)
    burn_history: deque = field(default_factory=lambda: deque(maxlen=20))
    on_soft_warning: Optional[Callable] = None
    on_hard_violation: Optional[Callable] = None

    def __post_init__(self):
        for comp, pct in self.allocations.items():
            self.budgets[comp] = ComponentBudget(
                hard_limit_tokens=int(self.max_context_tokens * pct))

    # ── Tracking ──

    def track(self, component: BudgetComponent, tokens: int):
        """Register token consumption against a component budget."""
        budget = self.budgets.get(component)
        if not budget:
            return
        budget.used += tokens

    def record_turn(self, tokens_this_turn: int):
        """Record total token consumption for this turn (for burn-rate calc)."""
        self.burn_history.append(tokens_this_turn)

    # ── Enforcement ──

    def check_before_add(self, component: BudgetComponent,
                         tokens_to_add: int) -> dict:
        """Gate check: can we add tokens_to_add to this component's budget?

        Returns {"allowed": True} or {"allowed": False, "reason": ...}.
        Soft-limit crossings emit warnings; hard-limit crossings block."""
        budget = self.budgets.get(component)
        if not budget:
            return {"allowed": True}

        projected = budget.used + tokens_to_add

        if projected >= budget.hard_limit_tokens:
            if self.on_hard_violation:
                self.on_hard_violation(component, budget, projected)
            return {
                "allowed": False,
                "reason": f"Hard limit exceeded: {component.value} "
                          f"({budget.used}/{budget.hard_limit_tokens}, "
                          f"+{tokens_to_add} would reach {projected})",
                "component": component.value,
                "current": budget.used,
                "limit": budget.hard_limit_tokens,
                "excess": projected - budget.hard_limit_tokens,
            }

        if projected >= budget.soft_limit:
            if self.on_soft_warning:
                self.on_soft_warning(component, budget, projected)
            return {
                "allowed": True,
                "warning": f"Soft limit exceeded: {component.value} "
                           f"({budget.used}/{budget.hard_limit_tokens})",
                "component": component.value,
                "utilization_after": round(projected / budget.hard_limit_tokens, 2),
            }

        return {"allowed": True}

    # ── Prediction ──

    @property
    def burn_rate(self) -> float:
        """Average tokens consumed per turn over recent history."""
        if not self.burn_history:
            return 0.0
        return sum(self.burn_history) / len(self.burn_history)

    def turns_until_exhausted(self) -> Optional[float]:
        """Estimate how many turns until total context is exhausted."""
        rate = self.burn_rate
        if rate <= 0:
            return None
        total_used = sum(b.used for b in self.budgets.values())
        remaining = self.max_context_tokens - total_used
        return max(0.0, remaining / rate)

    # ── Reporting ──

    @property
    def total_used(self) -> int:
        return sum(b.used for b in self.budgets.values())

    @property
    def total_utilization(self) -> float:
        return self.total_used / self.max_context_tokens

    def report(self) -> dict:
        """Full budget status report for observability pipeline."""
        components = {}
        for comp, budget in self.budgets.items():
            components[comp.value] = {
                "used": budget.used,
                "limit": budget.hard_limit_tokens,
                "remaining": budget.remaining,
                "utilization": round(budget.utilization, 2),
                "soft_exceeded": budget.is_soft_exceeded,
                "hard_exceeded": budget.is_hard_exceeded,
            }
        return {
            "components": components,
            "total_used": self.total_used,
            "total_utilization": round(self.total_utilization, 2),
            "burn_rate_tokens_per_turn": round(self.burn_rate, 1),
            "turns_until_exhausted": self.turns_until_exhausted(),
        }


# ── Usage ──

def on_warning(comp, budget, projected):
    print(f"[BUDGET WARN] {comp.value}: {budget.used}/{budget.hard_limit_tokens} "
          f"→ {projected} ({budget.utilization:.0%}→{projected/budget.hard_limit_tokens:.0%})")

def on_violation(comp, budget, projected):
    print(f"[BUDGET BLOCK] {comp.value}: would exceed hard limit "
          f"({projected} > {budget.hard_limit_tokens})")

mgr = TokenBudgetManager(max_context_tokens=128_000,
                         on_soft_warning=on_warning,
                         on_hard_violation=on_violation)

# Track initial allocations
mgr.track(BudgetComponent.SYSTEM_PROMPT, 5_000)
mgr.track(BudgetComponent.TOOL_DEFINITIONS, 3_000)
mgr.track(BudgetComponent.MESSAGE_HISTORY, 45_000)

# Check before adding a large tool result
check = mgr.check_before_add(BudgetComponent.TOOL_RESULTS, 8_000)
print(f"Add 8K tool result: {check}")

# Simulate burn-rate tracking across turns
for turn_tokens in [2_000, 3_500, 2_800, 5_200, 3_100]:
    mgr.record_turn(turn_tokens)

print(f"Total utilization: {mgr.total_utilization:.1%}")
print(f"Burn rate: {mgr.burn_rate:.0f} tokens/turn")
print(f"Turns until exhausted: {mgr.turns_until_exhausted()}")

The TokenBudgetManager is the financial controller of the context window. Its check_before_add method is the gate that prevents runaway consumption — called before every content addition, it either approves the addition, warns of approaching limits, or blocks outright. The design choice to separate tracking (track) from enforcement (check_before_add) is deliberate: tracking is always safe; enforcement involves policy decisions that may need to vary by task phase or severity level.

Trade-off — Precise Tracking vs Estimation Overhead: Token counting is inherently imprecise. Different tokenizers produce different counts for the same text. The budget manager should use the same tokenizer as the target model, but even then, API overhead (message formatting, role tokens) adds ~3--5% that client-side counting misses. The pragmatic approach: track client-side with a 5% safety margin. Running at 95% client-side utilization is effectively at 100% model-side. This safety margin is built into the soft-limit positioning at 80% — there's headroom for estimation error.

For the protocol layer that routes content into the correct budget bucket based on envelope type, see Agent Context Protocol Design.

6. Cross-Window State Management: Continue Work Across Context Windows

Compression and eviction extend the life of a context window, but they cannot make it infinite. Some tasks — multi-hour code migrations, exhaustive security audits, long-running research — will inevitably exhaust even a well-managed window. When that happens, the agent must cross the window boundary: serialize its state before the current window closes, and reconstruct it in a fresh window. This is the last line of defense — the mechanism that turns a finite context window into an effectively unbounded agent runtime.

The Serialization Contract: What Must Survive

Not everything in the context window is worth carrying across the boundary. The serialization contract defines exactly what state must survive a window reset for the agent to resume without re-deriving previous work:

State ArtifactSerialized AsLoss ConsequencePriority
Task Goal + ConstraintsVerbatim textAgent forgets what it's building🔴 Critical
Progress StateChecklist (done/current/todo)Duplicate work or skip steps🔴 Critical
Architectural DecisionsADR log (decision, rationale, alternatives)Re-debate settled questions🟡 High
Open Issues/BlockersIssue list with reproduction stepsLose awareness of unresolved problems🟡 High
Environment StatePaths, git status, service healthOperate in wrong context🟡 High
Key LearningsPattern logRepeat known-bad approaches🟠 Medium
Intermediate ReasoningOmitted (inferrable from decisions)Minor re-derivation cost🟢 Low

The crucial design decision: The serialized state object is not a compressed version of the context window — it is a structured, machine-parseable checkpoint. The difference is profound. A compressed summary says "the agent was working on X and found Y." A structured checkpoint says {"current_subtask": "replace axios in api.ts", "completed": ["scan references", "build error handler"], "blockers": []}. The summary is for human reading; the checkpoint is for programmatic resumption.

📌 Design Principle — State vs Context: State is what the agent knows and has decided. Context is what the agent sees right now. The serialization contract carries state across windows; context is rebuilt from state during the bootstrap sequence. Confusing the two — trying to serialize the full context — leads to unbounded state objects that defeat the purpose of the window reset. Carry state, rebuild context.

The Bootstrap Sequence: Cold-Starting a Resume

When a fresh context window opens and loads serialized state, the agent executes a standardized bootstrap sequence to re-establish operational readiness:

  1. Orient: Verify working directory, git branch, tool availability — the environment must match what the state expects.
  2. Load state: Parse the serialized state file. Validate schema completeness (task goal present? progress list well-formed?).
  3. Verify environment: Run git status, check running services, confirm file paths still exist. Environment drift between windows is the most common cause of resume failures.
  4. Inject context: Build the initial context window from state — task goal, current subtask, recent decisions, open issues, compaction summary from previous window.
  5. Resume: Begin execution from current_subtask. The agent should not re-derive, re-search, or re-analyze anything captured in state.

Trade-off — Eager vs Lazy State Loading: Eager loading (inject all state into the new window immediately) gives the agent full awareness but consumes the new window's token budget upfront. Lazy loading (inject only the current subtask; retrieve other state on demand) conserves tokens but risks the agent making decisions without full context. For most agent tasks, eager loading with prioritization works best: inject the top-priority state (goal, current task, blockers) immediately, and append lower-priority state (learnings, archived decisions) as the window has room.

Three Failure Modes of Cross-Window State

Failure 1 — Orphan State

The agent writes "plan to refactor user-service next" to the state file but is interrupted before executing. The next window reads this as a completed plan and skips it — or worse, assumes the refactor was done. Fix: Distinguish planned, in_progress, and done statuses. On window resume, treat planned items as unexecuted intentions.

Failure 2 — Stale Checkpoints

Window 3 writes state. Window 4 starts, makes progress, writes updated state. Then a bug causes window 3's old state file to be read — reverting progress. Fix: Monotonically incrementing window ID + state version number. On load, reject any state with version < current version. This is a simple optimistic concurrency control.

Failure 3 — Implicit State

The agent "knows" something from earlier in the conversation but never writes it to the state object — because it assumed it would always be in context. After window reset, that knowledge is gone. Fix: The serialization contract must be exhaustive. If the agent learned it, it must serialize it. This is a discipline problem, not a technical one — the state manager can validate schema but cannot detect missing information.

Cross-window state management is the recovery path for context overflow, making it a natural integration point with Agent Error Recovery — the error recovery system detects the overflow, triggers state serialization before the crash, and hands off to the bootstrap sequence for the next window.

Code: CrossWindowStateManager

from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Optional
import json
import os

@dataclass
class TaskCheckpoint:
    """Serializable agent state that survives context-window boundaries.

    DESIGN RULE: Every field must be populated before serialization.
    Missing fields = lost information = agent amnesia on resume."""
    window_id: int = 1
    state_version: int = 1
    task_goal: str = ""
    success_criteria: list = field(default_factory=list)
    constraints: list = field(default_factory=list)
    completed_subtasks: list = field(default_factory=list)
    current_subtask: str = ""
    remaining_subtasks: list = field(default_factory=list)
    decisions: list = field(default_factory=list)       # ADR entries
    open_issues: list = field(default_factory=list)     # blockers + bugs
    learnings: list = field(default_factory=list)       # patterns + pitfalls
    compaction_summary: str = ""                         # previous window's compressed history
    saved_at: str = ""

    def increment_version(self):
        self.state_version += 1


@dataclass
class CrossWindowStateManager:
    """Manages serialization, deserialization, validation, and bootstrap
    prompt generation for cross-window agent state.

    Integration: called by ContextWindowManager.on_context_overflow() to
    serialize state before window close; called at window start to load
    and validate checkpoint for resume."""

    state_file: str = "agent_state.json"
    progress_file: str = "agent_progress.md"

    # ── Serialization ──

    def save(self, checkpoint: TaskCheckpoint):
        """Serialize state to disk before context window closes.

        Writes both JSON (machine-parseable, for automated resume) and
        Markdown (human-readable, for debugging and audit)."""
        checkpoint.saved_at = datetime.now(timezone.utc).isoformat()
        checkpoint.increment_version()

        # Machine-parseable checkpoint
        with open(self.state_file, "w", encoding="utf-8") as f:
            json.dump(asdict(checkpoint), f, ensure_ascii=False, indent=2)

        # Human-readable progress log
        self._write_progress_md(checkpoint)

    def load(self) -> Optional[TaskCheckpoint]:
        """Load serialized state at new window start.

        Returns None if no prior state exists (cold start).
        Validates schema completeness before returning."""
        if not os.path.exists(self.state_file):
            return None
        with open(self.state_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        checkpoint = TaskCheckpoint(**{k: v for k, v in data.items()
                                        if k in TaskCheckpoint.__dataclass_fields__})
        validation = self.validate(checkpoint)
        if validation["errors"]:
            print(f"[STATE VALIDATION] Warnings: {validation['errors']}")
        return checkpoint

    # ── Bootstrap ──

    def bootstrap_prompt(self, cp: TaskCheckpoint) -> str:
        """Generate the initial system injection for a resumed window.

        The output text is inserted at the top of the new context window,
        providing the agent with everything it needs to resume immediately
        without re-deriving previous decisions."""
        lines = [
            "[CONTEXT_WINDOW_RESUME] Resuming from checkpoint.",
            f"Window: {cp.window_id}  |  Version: {cp.state_version}",
            f"Task: {cp.task_goal}",
            "",
            f"Completed ({len(cp.completed_subtasks)}):",
        ]
        for t in cp.completed_subtasks[-10:]:
            lines.append(f"  [DONE] {t}")

        lines.append(f"\nCurrent: {cp.current_subtask or '(none specified)'}")

        lines.append(f"\nRemaining ({len(cp.remaining_subtasks)}):")
        for t in cp.remaining_subtasks[:10]:
            lines.append(f"  [TODO] {t}")

        if cp.open_issues:
            lines.append(f"\nOpen Issues ({len(cp.open_issues)}):")
            for issue in cp.open_issues:
                lines.append(f"  [!] {issue}")

        if cp.decisions:
            lines.append(f"\nKey Decisions ({len(cp.decisions)}):")
            for d in cp.decisions[-5:]:
                lines.append(f"  [DEC] {d}")

        if cp.compaction_summary:
            lines.append(f"\nPrevious Window Summary:\n{cp.compaction_summary}")

        return "\n".join(lines)

    # ── Validation ──

    def validate(self, cp: TaskCheckpoint) -> dict:
        """Schema and integrity validation before resuming from checkpoint.

        Returns {"errors": [...], "warnings": [...]}. Errors indicate the
        checkpoint is likely unusable; warnings indicate recoverable issues."""
        errors, warnings = [], []

        if not cp.task_goal:
            errors.append("Missing task_goal — agent will have no objective")
        if not cp.current_subtask and cp.remaining_subtasks:
            warnings.append("Has remaining subtasks but no current_subtask — "
                           "agent may pick wrong starting point")
        if cp.window_id < 1:
            errors.append(f"Invalid window_id: {cp.window_id}")
        if cp.state_version < 1:
            errors.append(f"Invalid state_version: {cp.state_version}")

        return {"errors": errors, "warnings": warnings, "valid": len(errors) == 0}

    # ── Internal ──

    def _write_progress_md(self, cp: TaskCheckpoint):
        with open(self.progress_file, "w", encoding="utf-8") as f:
            f.write(f"# Agent Progress — Window {cp.window_id} (v{cp.state_version})\n\n")
            f.write(f"**Goal:** {cp.task_goal}\n\n")
            f.write(f"**Saved:** {cp.saved_at}\n\n")
            f.write("## ✅ Completed\n")
            for t in cp.completed_subtasks:
                f.write(f"- [x] {t}\n")
            f.write(f"\n## 🔄 Current\n- {cp.current_subtask}\n\n")
            f.write("## 📋 Remaining\n")
            for t in cp.remaining_subtasks:
                f.write(f"- [ ] {t}\n")
            if cp.open_issues:
                f.write(f"\n## ⚠️ Open Issues\n")
                for issue in cp.open_issues:
                    f.write(f"- {issue}\n")
            if cp.decisions:
                f.write(f"\n## 📐 Decisions\n")
                for d in cp.decisions:
                    f.write(f"- {d}\n")


# ── Usage: window boundary crossing ──

mgr = CrossWindowStateManager()

# Window 1: save before forced reset
cp = TaskCheckpoint(
    window_id=1,
    task_goal="Migrate user-service REST calls from axios to fetch",
    completed_subtasks=["Scanned 23 axios references", "Built error-handling wrapper",
                        "Migrated 12/23 files"],
    current_subtask="Migrate api.ts (file 13/23)",
    remaining_subtasks=["Migrate remaining 10 files", "Integration tests",
                        "Canary deploy"],
    decisions=["Use native fetch + custom error wrapper (not a library)",
               "Keep response interceptor pattern for consistency"],
    open_issues=["api.ts:42 — type incompatibility after migration"],
    learnings=["v3 API uses /v2/ prefix, not /v1/"],
    compaction_summary="Migration 52% complete. 12 files done, api.ts in progress. "
                       "One type error at api.ts:42 unresolved.",
)
mgr.save(cp)

# Window 2: load and resume
loaded = mgr.load()
if loaded:
    bootstrap = mgr.bootstrap_prompt(loaded)
    # Inject bootstrap into new context window as first system message
    print(bootstrap[:300] + "...")

The CrossWindowStateManager is the safety net that makes long-running agents viable. Without it, every context overflow is a hard crash with total state loss. With it, an overflow becomes a minor checkpoint — a moment of serialization followed by clean resumption. The dual-format output (JSON for machines, Markdown for humans) is a deliberate design choice: JSON enables automated resumption without parsing ambiguity; Markdown enables debugging when something goes wrong and a human needs to inspect what the agent thought it knew.

Trade-off — Checkpoint Frequency: Saving after every turn guarantees minimal data loss but adds I/O overhead. Saving only at overflow risks losing turns of work if the overflow is sudden. The recommended approach: save at every major milestone (subtask completion, decision made, blocker found) plus a safety save whenever utilization crosses 85%. This balances overhead against data-loss risk. The checkpoint itself is tiny (~2--5KB) so I/O cost is negligible; the real cost is the discipline of keeping the state object current.

7. Context Health Monitoring: Metrics, Alerts, and Fidelity Evaluation

You've implemented pressure detection, eviction, compression, budgeting, and cross-window state. The systems are running. But a question remains: are they working? Is compression losing critical information? Is eviction removing content the agent needed three turns later? Is the budget allocation appropriate for this task type? Without monitoring, you're flying blind — you won't know about failures until the agent produces visibly wrong output, which in production can mean hours of wasted compute and incorrect results.

Six Metrics That Matter

Not all metrics are equal. The six below form the minimum viable set for context health monitoring — they answer the questions "is the window healthy?" and "are the management interventions working?":

MetricWhat It MeasuresHealthy RangeAlert ThresholdAction If Breached
UtilizationCurrent tokens / max window<60%>90% for 3+ turnsEmergency compaction
Eviction RateBlocks evicted per turn<2/turn>5/turn sustainedWindow may be undersized; review eviction policy
Compression RatioOriginal tokens / compressed tokens3x--15x<2x consistentlyCompaction prompt needs redesign
Burn RateTokens consumed per turnStable or linearSudden 3x+ spikePossible runaway loop; escalate to error recovery
Compression FidelityKey information retention post-compression>0.85<0.75Compression too aggressive; add preservation rules
Tool Result BloatTool result tokens / total context10--15%>30%Tools returning too much data; add output limits

Trade-off — Metric Granularity vs Observability Cost: Emitting per-turn metrics with 6 dimensions costs negligible compute (~5ms per turn) but provides rich diagnostic data. Emitting only utilization is simpler but leaves you blind to why utilization is high. The recommendation: emit all six. The cost of not knowing why your agent is failing far exceeds the cost of collecting the metrics. For the observability infrastructure that ingests these metrics, see Agent Observability.

Compression Fidelity: The Hardest Metric

Compression ratio is easy to measure — it's just division. Fidelity is hard because it asks: "did the compressed version preserve the information needed for correct decision-making?" This cannot be measured by token counting alone. The standard approach is LLM-as-Judge evaluation:

  1. Prepare a test suite: For each agent task type, create N questions that probe critical information — "What was the decision about error handling?" "What file is currently being modified?" "What unresolved issue was found?"
  2. Query both contexts: Ask the same question against the original (pre-compression) context and the compressed context. Record both answers.
  3. Judge consistency: A separate LLM call (the "judge") compares the two answers and scores whether they convey the same key information. The judge prompt must be precise: "Do these two answers agree on the factual claims they make? Ignore phrasing differences. Answer YES or NO."
  4. Compute fidelity: fidelity = consistent_answers / total_questions. Target >0.85.
📌 Fidelity Evaluation Is a Compiler for Your Compression Prompt: Treat fidelity evaluation like a unit test — it should run in CI whenever the compaction prompt changes. A prompt change that improves compression ratio from 5x to 8x but drops fidelity from 0.90 to 0.70 is a regression, not an improvement. The fidelity score is the constraint that allows you to safely tune for higher compression ratios.

Trade-off — LLM-as-Judge vs Human Evaluation: LLM-as-Judge is cheap, fast, and automatable — but imperfect. Judges can miss subtle information loss (a nuance that matters in context but not in isolation). Human evaluation is the gold standard but costs time and doesn't scale. The pragmatic approach: use LLM-as-Judge for CI regression testing (every prompt change), and periodically validate the judge itself with human spot-checks (every 10--20 runs). If the judge consistently gives high fidelity scores but human reviewers spot critical losses, the judge prompt needs refinement.

Alerting: When Metrics Become Actions

Metrics without alerts are dashboards people forget to check. Alerts without severity gradation create alert fatigue. The context health monitor emits alerts at three levels:

Trade-off — Alert Thresholds: Tight thresholds (alert at 85% utilization) create false positives — alerts that fire when no real problem exists, training operators to ignore them. Loose thresholds (alert at 98%) create false negatives — the overflow happens before the alert fires. The thresholds in this implementation (90% for CRITICAL, 75% fidelity floor) are calibrated for 128K windows with typical agent workloads. Tune them based on your agent's velocity profile and risk tolerance.

Code: ContextHealthMonitor

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Callable
from enum import Enum

class AlertLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class HealthSnapshot:
    """A single point-in-time measurement of context window health."""
    timestamp: str = ""
    utilization: float = 0.0
    eviction_rate: float = 0.0           # evictions per turn
    compression_ratio: float = 1.0        # original / compressed (>1 = effective)
    burn_rate: float = 0.0               # tokens per turn
    fidelity_score: float = 1.0           # 0.0–1.0
    tool_result_bloat: float = 0.0        # tool result tokens / total tokens
    window_id: int = 1


@dataclass
class ContextHealthMonitor:
    """Collects health metrics from all subsystems, evaluates compression
    fidelity via LLM-as-Judge, and emits graded alerts to observability.

    Integration: called per-turn (collect) and post-compression (evaluate_fidelity).
    Alerts feed into the observability pipeline for dashboards and notifications."""

    metrics_history: list = field(default_factory=list)
    max_history: int = 100
    fidelity_test_questions: list = field(default_factory=list)
    on_alert: Optional[Callable] = None
    _eviction_counter: int = 0
    _turn_counter: int = 0

    # ── Collection ──

    def collect(self, pressure_monitor, eviction_engine,
                budget_manager) -> HealthSnapshot:
        """Gather metrics from all active subsystems into a health snapshot."""
        self._turn_counter += 1
        snap = HealthSnapshot(
            timestamp=datetime.now(timezone.utc).isoformat(),
            utilization=pressure_monitor.utilization(),
            eviction_rate=self._eviction_counter / max(self._turn_counter, 1),
            burn_rate=budget_manager.burn_rate,
            window_id=getattr(pressure_monitor, '_turn_count', 0),
        )

        # Tool result bloat: what fraction of context is tool output?
        total_used = budget_manager.total_used
        tr_used = budget_manager.budgets.get(
            type(budget_manager).__dataclass_fields__['budgets'].type.__args__[0]
            if hasattr(budget_manager, 'budgets') else None, None)
        snap.tool_result_bloat = (tr_used.used / max(total_used, 1)
                                  if tr_used else 0.0)

        self.metrics_history.append(snap)
        if len(self.metrics_history) > self.max_history:
            self.metrics_history.pop(0)

        return snap

    def record_eviction(self, count: int = 1):
        """Called by EvictionEngine after each eviction operation."""
        self._eviction_counter += count

    # ── Fidelity Evaluation ──

    def evaluate_fidelity(self, original_context: str,
                          compressed_context: str,
                          llm_judge: Callable[[str], str]) -> float:
        """LLM-as-Judge compression fidelity evaluation.

        llm_judge is a function (prompt: str) -> response: str.
        Returns fidelity score 0.0–1.0."""
        if not self.fidelity_test_questions:
            return 1.0  # No test suite = assume fidelity is fine

        consistent = 0
        for i, question in enumerate(self.fidelity_test_questions):
            # Query original context
            a_orig = llm_judge(
                f"Context:\n{original_context[:8000]}\n\n"
                f"Question: {question}\nAnswer in 1-2 sentences.")

            # Query compressed context
            a_comp = llm_judge(
                f"Context:\n{compressed_context[:8000]}\n\n"
                f"Question: {question}\nAnswer in 1-2 sentences.")

            # Judge consistency
            verdict = llm_judge(
                f"Compare these two answers. Do they convey the SAME key "
                f"factual information? Ignore wording differences.\n\n"
                f"Answer A: {a_orig}\nAnswer B: {a_comp}\n\n"
                f"Reply ONLY 'yes' or 'no'.")
            if "yes" in verdict.lower():
                consistent += 1

        return consistent / len(self.fidelity_test_questions)

    # ── Alerting ──

    def check_alerts(self, snap: HealthSnapshot) -> list:
        """Evaluate health snapshot against alert thresholds.

        Returns list of alert dicts: {level, metric, message, value}."""
        alerts = []

        # CRITICAL: utilization > 90%
        if snap.utilization > 0.90:
            alerts.append({
                "level": AlertLevel.CRITICAL,
                "metric": "utilization",
                "message": f"Context utilization at {snap.utilization:.1%} — "
                           f"overflow imminent",
                "value": snap.utilization,
            })

        # WARNING: high eviction rate
        if snap.eviction_rate > 5.0:
            alerts.append({
                "level": AlertLevel.WARNING,
                "metric": "eviction_rate",
                "message": f"Eviction rate {snap.eviction_rate:.1f}/turn — "
                           f"window may be undersized",
                "value": snap.eviction_rate,
            })

        # WARNING: low compression fidelity
        if snap.fidelity_score < 0.75:
            alerts.append({
                "level": AlertLevel.WARNING,
                "metric": "fidelity_score",
                "message": f"Compression fidelity {snap.fidelity_score:.2f} "
                           f"below 0.75 threshold — review compaction prompt",
                "value": snap.fidelity_score,
            })

        # WARNING: ineffective compression
        if 1.0 < snap.compression_ratio < 2.0:
            alerts.append({
                "level": AlertLevel.WARNING,
                "metric": "compression_ratio",
                "message": f"Compression ratio only {snap.compression_ratio:.1f}x "
                           f"— compaction prompt may need redesign",
                "value": snap.compression_ratio,
            })

        # WARNING: tool result bloat
        if snap.tool_result_bloat > 0.30:
            alerts.append({
                "level": AlertLevel.WARNING,
                "metric": "tool_result_bloat",
                "message": f"Tool results consuming {snap.tool_result_bloat:.0%} "
                           f"of context — add output limits to tools",
                "value": snap.tool_result_bloat,
            })

        for alert in alerts:
            if self.on_alert:
                self.on_alert(alert)

        return alerts

    # ── Trend Analysis ──

    def trend_report(self) -> dict:
        """Analyze metric trends over recent history for proactive detection."""
        if len(self.metrics_history) < 5:
            return {"status": "insufficient_data", "samples": len(self.metrics_history)}

        recent = self.metrics_history[-10:]
        first, last = recent[0], recent[-1]

        return {
            "samples": len(recent),
            "utilization_delta": round(last.utilization - first.utilization, 3),
            "burn_rate_delta": round(last.burn_rate - first.burn_rate, 1),
            "avg_fidelity": round(
                sum(m.fidelity_score for m in recent) / len(recent), 2),
            "utilization_trend": "rising" if last.utilization > first.utilization
                                 else "falling" if last.utilization < first.utilization
                                 else "stable",
        }


# ── Usage ──

def mock_judge(prompt: str) -> str:
    """Simulated LLM judge — in production, this calls an actual model."""
    return "yes" if "error handling" in prompt.lower() else "no"

health = ContextHealthMonitor(
    fidelity_test_questions=[
        "What is the current task goal?",
        "What step is currently in progress?",
        "What architectural decision was made about error handling?",
        "Are there any unresolved issues or blockers?",
    ],
    on_alert=lambda a: print(f"[{a['level'].value.upper()}] {a['message']}"),
)

# Evaluate fidelity after a compaction
fidelity = health.evaluate_fidelity(
    original_context="Decided to use native fetch with custom error wrapper. "
                     "Currently migrating api.ts (file 13/23). "
                     "One unresolved type error at api.ts:42.",
    compressed_context="Using native fetch + error wrapper. Migrating api.ts. "
                        "Type error at api.ts:42 unresolved.",
    llm_judge=mock_judge,
)
print(f"Compression fidelity: {fidelity:.2f}")

# Check alerts on a snapshot
snap = HealthSnapshot(
    utilization=0.92, eviction_rate=6.0,
    compression_ratio=3.5, burn_rate=2500.0,
    fidelity_score=fidelity, tool_result_bloat=0.35
)
alerts = health.check_alerts(snap)
for a in alerts:
    print(f"  [{a['level'].value}] {a['metric']}: {a['message']}")

The ContextHealthMonitor closes the feedback loop. Every management decision — evict this block, compress this conversation, allocate this budget — produces measurable effects. The monitor captures those effects, evaluates them against thresholds, and surfaces problems before they become failures. The fidelity evaluation in particular is the quality gate for the entire compression subsystem: without it, you can endlessly tune compaction prompts for higher ratios with no awareness that you're silently losing critical state.

For the evaluation infrastructure that provides the LLM-as-Judge capabilities and test suite management, see Agent Evaluation Framework. For the audit trail that records every health snapshot as an immutable event, see Agent Audit Log Design.

8. Putting It All Together: The Complete ContextWindowManager Architecture

The previous seven sections designed six independent subsystems. But an agent doesn't call six subsystems — it calls one. The ContextWindowManager is the unified orchestrator that wires pressure monitoring, eviction, compression, budgeting, cross-window state, and health monitoring into a single integration surface. This section presents the complete architecture and its integration into the agent loop.

Architecture: Six Subsystems, One Orchestrator


  ┌──────────────────────────────────────────────────────────────────┐
  │                        Agent Loop                                │
  │  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐      │
  │  │  Think  │───▶│   Act   │───▶│ Observe │───▶│  Think  │ ...  │
  │  └────┬────┘    └────┬────┘    └────┬────┘    └─────────┘      │
  │       │              │              │                            │
  └───────┼──────────────┼──────────────┼────────────────────────────┘
          │              │              │
          ▼              ▼              ▼
  ┌───────────────────────────────────────────────────────────────────┐
  │                    ContextWindowManager                            │
  │                                                                   │
  │  ┌─────────────────┐  ┌─────────────────┐  ┌───────────────────┐  │
  │  │ PressureMonitor  │  │  EvictionEngine  │  │ CompressionEngine │  │
  │  │                  │  │                  │  │                   │  │
  │  │ • utilization    │  │ • FIFO           │  │ • Compaction       │  │
  │  │ • velocity       │  │ • LRU            │  │ • Note-Taking     │  │
  │  │ • zone detection │  │ • Priority       │  │ • Tool Summarize  │  │
  │  │ • spike alerts   │  │ • Semantic Merge │  │ • Progressive     │  │
  │  │                  │  │ • Type-Based     │  │ • Sub-Agent       │  │
  │  │                  │  │ • Hybrid         │  │                   │  │
  │  └────────┬─────────┘  └────────┬─────────┘  └─────────┬─────────┘  │
  │           │                     │                       │            │
  │  ┌────────┴─────────┐  ┌────────┴─────────┐                           │
  │  │ TokenBudgetMgr   │  │ CrossWindowState │                           │
  │  │                  │  │                  │                           │
  │  │ • 6 components   │  │ • Serialization  │                           │
  │  │ • soft/hard lim. │  │ • Bootstrap      │                           │
  │  │ • burn rate      │  │ • Validation     │                           │
  │  └────────┬─────────┘  └────────┬─────────┘                           │
  │           │                     │                                     │
  │  ┌────────┴─────────────────────┴─────────┐                          │
  │  │          ContextHealthMonitor          │                          │
  │  │  • Metrics collection                  │                          │
  │  │  • Fidelity evaluation (LLM-as-Judge)  │                          │
  │  │  • Alerting (CRITICAL/WARNING/INFO)    │                          │
  │  └────────────────────┬───────────────────┘                          │
  │                       │                                              │
  └───────────────────────┼──────────────────────────────────────────────┘
                          │
                          ▼
            ┌─────────────────────────┐
            │   Observability Pipeline │
            │   (Prometheus + Grafana) │
            └─────────────────────────┘
  

Lifecycle: How an Agent Task Flows Through the Manager

From task start to completion, every phase of context management has a clear subsystem owner:

  1. Task Start: TokenBudgetManager allocates budgets from config. CrossWindowStateManager checks for a prior checkpoint (resume) or initializes fresh (cold start). ContextPressureMonitor begins tracking utilization.
  2. Before Each LLM Call (on_turn_start): Monitor checks utilization → under 50%: no action; 50--75%: evaluate eviction candidates; 75--90%: execute eviction + tool-result summarization; 90--95%: conversation compaction; 95%+: emergency cross-window save.
  3. After Each LLM Call (on_turn_end): Budget manager records token consumption. Monitor updates utilization and velocity. Health monitor collects metrics snapshot.
  4. After Tool Call (on_tool_result): Budget manager tracks tool result tokens. If output exceeds summarization threshold, compress immediately. Eviction engine evaluates whether old tool results should be cleared.
  5. On Context Overflow (on_context_overflow): Cross-window state manager serializes checkpoint. Health monitor emits final metrics. Context window resets. Bootstrap sequence loads checkpoint into new window. Agent resumes.
  6. Task Complete: Final compaction summary generated. Checkpoint archived. Health monitor emits completion snapshot with trend report.

Trade-off — Tight vs Loose Coupling: The orchestrator design intentionally does not hardwire the six subsystems together. Each subsystem is instantiated independently and passed to the manager. This loose coupling means you can swap the eviction engine's policy without touching anything else, replace the compression engine with a server-side API implementation, or run without the health monitor in environments where metrics aren't needed. The manager is a conductor, not a monolith. This design philosophy is shared with Multi-Agent Orchestration — each component has a clear interface contract; the orchestrator composes them.

Code: ContextWindowManager

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Callable
from enum import Enum


class WindowStatus(Enum):
    HEALTHY = "healthy"               # Green: normal operation
    PREPARING = "preparing"           # Yellow: evaluating candidates
    ACTIVE_MANAGEMENT = "active"      # Orange: evicting + compressing
    EMERGENCY = "emergency"           # Red: cross-window imminent


@dataclass
class ContextWindowManager:
    """Unified orchestrator for all context window management subsystems.

    This is the SINGLE integration point for the agent loop. The agent
    calls four hooks per cycle:
      on_turn_start()  → before LLM call
      on_turn_end()    → after LLM call
      on_tool_result() → after tool execution
      on_overflow()    → when context is about to overflow

    Each hook delegates to the appropriate subsystem based on current
    pressure zone and budget state."""

    # ── Subsystem instances (injected, not created) ──
    pressure_monitor: object = None      # ContextPressureMonitor
    eviction_engine: object = None       # EvictionEngine
    compression_engine: object = None    # CompressionEngine
    budget_manager: object = None        # TokenBudgetManager
    state_manager: object = None         # CrossWindowStateManager
    health_monitor: object = None        # ContextHealthMonitor

    # ── Configuration ──
    max_context_tokens: int = 128_000
    eviction_policy: str = "hybrid"
    task_goal: str = ""
    task_success_criteria: list = field(default_factory=list)

    # ── Runtime state ──
    status: WindowStatus = WindowStatus.HEALTHY
    turn_count: int = 0
    windows_created: int = 1
    context_blocks: list = field(default_factory=list)
    audit_log: list = field(default_factory=list)

    # ── Callbacks for external integration ──
    on_status_change: Optional[Callable] = None
    on_overflow_detected: Optional[Callable] = None

    # ═══════════════════════════════════════════════════════════════
    #  Hook 1: Before LLM Call
    # ═══════════════════════════════════════════════════════════════

    def on_turn_start(self, estimated_response_tokens: int = 4_000) -> dict:
        """Called before every LLM invocation.

        Evaluates current context pressure, executes appropriate management
        action, and returns a decision dict consumed by the agent loop."""
        self.turn_count += 1
        utilization = self.pressure_monitor.utilization()

        # Verify output reserve
        output_check = self.budget_manager.check_before_add(
            type(self.budget_manager).__dataclass_fields__['budgets'].type
            if hasattr(self.budget_manager, 'budgets') else None,
            estimated_response_tokens)

        # ── Zone-based decision routing ──
        if utilization >= 0.95:
            return self._emergency(utilization)
        elif utilization >= 0.75:
            return self._active_management(utilization)
        elif utilization >= 0.50:
            return self._preparing(utilization)
        else:
            self._set_status(WindowStatus.HEALTHY)
            return {"action": "none", "utilization": utilization}

    # ═══════════════════════════════════════════════════════════════
    #  Hook 2: After LLM Call
    # ═══════════════════════════════════════════════════════════════

    def on_turn_end(self, response_tokens: int):
        """Called after LLM response is received.

        Updates pressure tracking, budget consumption, and health metrics."""
        self.pressure_monitor.update(response_tokens)
        self.budget_manager.track(
            self._resolve_component("message_history"), response_tokens)
        self.budget_manager.record_turn(response_tokens)

        # Collect health snapshot
        if self.health_monitor:
            snap = self.health_monitor.collect(
                self.pressure_monitor, self.eviction_engine,
                self.budget_manager)
            self.health_monitor.check_alerts(snap)

    # ═══════════════════════════════════════════════════════════════
    #  Hook 3: After Tool Execution
    # ═══════════════════════════════════════════════════════════════

    def on_tool_result(self, tool_name: str, result: str,
                       token_count: int):
        """Called after a tool returns its output.

        Tracks budget, triggers summarization for large outputs,
        and evaluates eviction candidates for old tool results."""
        # Budget tracking
        self.budget_manager.track(
            self._resolve_component("tool_results"), token_count)

        # Summarize large tool outputs immediately
        if token_count > 500:
            if self.compression_engine:
                result = self.compression_engine.compress_tool_result(
                    tool_name, result, self._get_llm_call())

        # Register as context block for eviction scoring
        # (simplified — in production, ContentBlock from EvictionEngine)
        self.context_blocks.append({
            "id": f"tool_{self.turn_count}_{tool_name}",
            "type": "tool_result",
            "tool": tool_name,
            "tokens": token_count,
            "turn": self.turn_count,
            "content_snippet": result[:200],
        })

        # Pressure update
        self.pressure_monitor.update(token_count)

        self._log("tool_result", {"tool": tool_name, "tokens": token_count})

    # ═══════════════════════════════════════════════════════════════
    #  Hook 4: Context Overflow
    # ═══════════════════════════════════════════════════════════════

    def on_overflow(self) -> dict:
        """Called when context is about to overflow.

        Serializes agent state, prepares for window reset, and emits
        final health metrics before the window closes."""
        self._set_status(WindowStatus.EMERGENCY)
        self._log("overflow", {"turn": self.turn_count})

        # Build compaction summary for next window
        summary = ""
        if self.compression_engine:
            history_text = self._format_blocks()
            result = self.compression_engine.compact_conversation(
                history_text, self.task_goal, len(history_text) // 4,
                self._get_llm_call())
            summary = result.compressed_content

        # Serialize state for cross-window resume
        if self.state_manager:
            from cross_window_state_manager import TaskCheckpoint
            cp = TaskCheckpoint(
                window_id=self.windows_created,
                task_goal=self.task_goal,
                success_criteria=self.task_success_criteria,
                compaction_summary=summary,
            )
            self.state_manager.save(cp)

        # Final health snapshot
        if self.health_monitor:
            snap = self.health_monitor.collect(
                self.pressure_monitor, self.eviction_engine,
                self.budget_manager)
            self.health_monitor.check_alerts(snap)

        self.windows_created += 1

        if self.on_overflow_detected:
            self.on_overflow_detected({
                "window": self.windows_created,
                "turn": self.turn_count,
                "summary_tokens": len(summary) // 4,
            })

        return {
            "action": "cross_window_save",
            "new_window_id": self.windows_created,
            "compaction_summary_tokens": len(summary) // 4,
        }

    # ═══════════════════════════════════════════════════════════════
    #  Internal: Decision Routers
    # ═══════════════════════════════════════════════════════════════

    def _preparing(self, utilization: float) -> dict:
        """Yellow zone: evaluate eviction candidates without acting."""
        self._set_status(WindowStatus.PREPARING)
        # In production, this calls eviction_engine.select_eviction_candidates
        # with target_free_tokens=0 to list candidates without removing them
        return {
            "action": "evaluate",
            "utilization": utilization,
            "recommendation": "Monitor closely — prepare eviction candidates",
        }

    def _active_management(self, utilization: float) -> dict:
        """Orange zone: execute eviction, then compression if needed."""
        self._set_status(WindowStatus.ACTIVE_MANAGEMENT)

        # Calculate target: aim to bring utilization down to ~60%
        target_utilization = 0.60
        target_tokens = int(self.max_context_tokens * target_utilization)
        current_tokens = int(self.max_context_tokens * utilization)
        target_free = current_tokens - target_tokens

        # Step 1: Evict low-priority content
        evicted_count = 0
        if self.eviction_engine and target_free > 0:
            # Convert context_blocks dicts to EvictionEngine ContentBlock objects
            blocks = self._to_content_blocks()
            result = self.eviction_engine.evict(
                blocks, target_free_tokens=target_free,
                current_turn=self.turn_count)
            evicted_count = len(result.blocks_evicted)
            if self.health_monitor:
                self.health_monitor.record_eviction(evicted_count)

        # Step 2: If eviction was insufficient, trigger compression cascade
        strategy = None
        if evicted_count == 0 and self.compression_engine:
            strategy = self.compression_engine.select_strategy(utilization)
            if strategy:
                self.compression_engine.execute_cascade(
                    self._format_blocks(), self.task_goal,
                    current_tokens, self.max_context_tokens,
                    self._get_llm_call())

        self._log("active_mgmt", {
            "evicted": evicted_count,
            "compression": strategy.value if strategy else None,
        })

        return {
            "action": "evict_and_compress",
            "utilization": utilization,
            "evicted_blocks": evicted_count,
            "compression_strategy": strategy.value if strategy else None,
        }

    def _emergency(self, utilization: float) -> dict:
        """Red zone: force compaction or trigger cross-window save."""
        self._set_status(WindowStatus.EMERGENCY)

        # Attempt emergency compaction
        if self.compression_engine:
            result = self.compression_engine.compact_conversation(
                self._format_blocks(), self.task_goal,
                int(self.max_context_tokens * utilization),
                self._get_llm_call())

        # If still at risk, trigger cross-window save
        new_utilization = self.pressure_monitor.utilization()
        if new_utilization >= 0.93:
            return self.on_overflow()

        self._log("emergency", {"utilization_before": utilization,
                                "utilization_after": new_utilization})

        return {
            "action": "emergency_compaction",
            "utilization_before": utilization,
            "utilization_after": new_utilization,
        }

    # ═══════════════════════════════════════════════════════════════
    #  Internal: Helpers
    # ═══════════════════════════════════════════════════════════════

    def _set_status(self, status: WindowStatus):
        prev = self.status
        self.status = status
        if prev != status and self.on_status_change:
            self.on_status_change(prev, status)

    def _log(self, event_type: str, metadata: dict):
        self.audit_log.append({
            "turn": self.turn_count,
            "type": event_type,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "metadata": metadata,
        })

    def _format_blocks(self) -> str:
        return "\n".join(
            f"[{b.get('type', '?')} T{b['turn']}] {b.get('content_snippet', '')[:500]}"
            for b in self.context_blocks[-50:])

    def _to_content_blocks(self) -> list:
        """Convert internal block dicts to EvictionEngine ContentBlock objects."""
        blocks = []
        for b in self.context_blocks:
            try:
                from eviction_engine import ContentBlock, ContentType
                ct_map = {
                    "user_message": ContentType.USER_MESSAGE,
                    "assistant_response": ContentType.ASSISTANT_RESPONSE,
                    "tool_result": ContentType.TOOL_RESULT,
                    "memory_injection": ContentType.MEMORY_INJECTION,
                }
                blocks.append(ContentBlock(
                    block_id=b["id"],
                    content_type=ct_map.get(b.get("type", ""), ContentType.TOOL_RESULT),
                    content=b.get("content_snippet", ""),
                    token_count=b.get("tokens", 0),
                    turn_created=b.get("turn", 0)))
            except Exception:
                pass
        return blocks

    def _resolve_component(self, name: str):
        """Resolve a BudgetComponent by string name (bridge between subsystems)."""
        try:
            from token_budget_manager import BudgetComponent
            return BudgetComponent(name.upper())
        except Exception:
            return None

    def _get_llm_call(self) -> Callable:
        """Placeholder — in production, injected via dependency injection."""
        def mock_llm(system: str, user: str) -> str:
            return "[Compacted summary of conversation]"
        return mock_llm

    # ═══════════════════════════════════════════════════════════════
    #  Public API
    # ═══════════════════════════════════════════════════════════════

    def get_audit_trail(self) -> list:
        """Return complete audit log of all context management actions."""
        return self.audit_log

    def get_status_report(self) -> dict:
        """Aggregate status from all subsystems for dashboard display."""
        return {
            "window_status": self.status.value,
            "turn": self.turn_count,
            "windows_created": self.windows_created,
            "utilization": (self.pressure_monitor.utilization()
                           if self.pressure_monitor else 0.0),
            "budget_report": (self.budget_manager.report()
                             if self.budget_manager else {}),
            "audit_events": len(self.audit_log),
        }


# ═══════════════════════════════════════════════════════════════════
#  Configuration (YAML)
# ═══════════════════════════════════════════════════════════════════

# context_window_config.yaml
"""
max_context_tokens: 128000

pressure:
  thresholds:
    yellow: 0.50
    orange: 0.75
    red: 0.90
  velocity_window: 5

eviction:
  policy: hybrid
  weights:
    recency: 0.30
    priority: 0.25
    type_ttl: 0.20
    reference_freq: 0.15
    semantic_duplication: 0.10
  type_ttl:
    tool_result: 8
    thinking: 2
    memory_injection: 20
  protected_types: [system_prompt]

compression:
  summarization_threshold_tokens: 500
  compaction_utilization_threshold: 0.85
  cascade:
    0.75: tool_result_summarization
    0.85: progressive
    0.92: conversation_compaction
    0.95: sub_agent
  notes_file: agent_notes.md

budget:
  allocations:
    system_prompt: 0.06
    tool_definitions: 0.04
    message_history: 0.65
    tool_results: 0.12
    memory_injection: 0.08
    output_reserved: 0.05
  soft_limit_pct: 0.80

cross_window:
  state_file: agent_state.json
  progress_file: agent_progress.md
  auto_save_threshold: 0.85

health_monitoring:
  metrics:
    - utilization
    - eviction_rate
    - compression_ratio
    - burn_rate
    - fidelity_score
    - tool_result_bloat
  alerts:
    utilization_critical: 0.90
    eviction_rate_warning: 5
    fidelity_warning: 0.75
    compression_ratio_min: 2.0
    tool_bloat_warning: 0.30
"""

The ContextWindowManager is designed as a conductor, not a monolith. Each subsystem is instantiated independently and injected — you can swap the eviction policy, replace the compression engine with a server-side implementation, or omit the health monitor entirely without touching the orchestration logic. The four hook methods (on_turn_start, on_turn_end, on_tool_result, on_overflow) form the integration contract with the agent loop. Everything else is internal decision routing.

The YAML configuration consolidates every tunable parameter — pressure thresholds, eviction weights, budget allocations, compression cascade triggers, health alert thresholds — into a single file. For different task profiles (code review vs research vs customer support), you maintain different config files. The manager loads its config at startup; runtime tuning is possible through the public API (e.g., manager.eviction_engine.set_policy("fifo")).

Every management decision — eviction, compression, budget violation, cross-window save — is recorded in the audit_log. This forms an immutable evidence chain for debugging and compliance. For the audit infrastructure that stores these events durably, see Agent Audit Log Design.

FAQ

How does this article relate to agent-memory-design?

Agent Memory System Design is the warehouse architect — it defines the L0-L3 four-layer memory architecture, what each layer stores, how retrieval boundaries work, and how memory is scoped and hygienic. This article is the warehouse operator — it manages L0 (the context window): how to compress it, evict from it, budget its tokens, and resume work across window boundaries. agent-memory-design says "L0 should contain X, Y, Z." This article says "and when L0 is full, here's how to make room." Read memory-design first for the architecture; then read this for the operational playbook.

When should I compress vs evict vs delegate to a sub-agent?

These three are not alternatives — they form a cascade ordered by cost and information preservation:

  1. Evict first (cheapest, safest): remove tool results older than 8 turns, thinking blocks from completed cycles, stale memory injections. Cost: zero LLM calls. Risk: information is deleted, but it was low-priority by design.
  2. Compress next (medium cost): if eviction isn't enough, compress. Start with tool-result summarization (cheap), escalate to progressive summarization, and only use full conversation compaction at 90%+ utilization. Cost: 1--N LLM calls. Risk: some fidelity loss — monitor with health metrics.
  3. Delegate last (most expensive, most powerful): when a subtask is large, independent, and would consume 50K+ tokens of exploration, spawn a sub-agent with a clean context. The sub-agent returns a 1K--2K summary. Cost: sub-agent's full LLM costs. Risk: the sub-agent's summary may miss nuance — validate with fidelity evaluation.

The CompressionEngine.execute_cascade() method in Section 4 implements this ordering automatically. The key heuristic: if you can evict, don't compress. If you can compress locally, don't delegate.

How do I measure whether compression lost critical information?

Use compression fidelity evaluation — the single most important health metric (Section 7):

  1. Build a test suite: For your agent's task domain, create 10--20 questions that probe critical information — task goals, architectural decisions, unresolved issues, current progress, key learnings. These are your fidelity test cases.
  2. Run before/after queries: Ask each question against the original (pre-compression) context and the compressed context. Use a separate LLM call for each.
  3. Judge consistency: A third LLM call (the "judge") compares the two answers and scores whether they convey the same factual information. Fidelity = consistent answers / total questions.
  4. Set a quality gate: Target fidelity > 0.85. If it drops below 0.75, your compression is too aggressive — add more preservation rules to the compaction prompt.

This evaluation should run in CI as a regression test every time you modify the compaction prompt. A higher compression ratio is only an improvement if fidelity doesn't drop. The ContextHealthMonitor.evaluate_fidelity() method provides the implementation. See Agent Evaluation Framework for the full LLM-as-Judge methodology.

FIFO, LRU, or priority eviction — which one should I use?

The answer depends on your agent's task structure — there is no universal best policy:

If your agent…Start with…Why
Does chat or Q&A (linear conversation)FIFOOld messages naturally lose relevance. Simple, predictable, zero overhead.
Runs exploration-heavy loops (code search, research)Type-based + LRUDead-end explorations accumulate; LRU naturally clears them. Type-based rules handle tool results automatically.
Has heterogeneous content importancePriority-basedDifferent content types have genuinely different importance. Tune weights per domain.
Is mission-critical productionHybrid with conservative weightsCombines all signals. Start with equal weights, run 5--10 tasks, analyze eviction audit logs, then tune.

The pragmatic approach: Start with Hybrid and conservative weights (equal contribution from all signals). Run your agent on real tasks and collect the eviction audit log. After 5--10 runs, analyze which blocks were evicted and whether any evictions caused downstream problems. Then tune the weights. Premature optimization of eviction weights is a common anti-pattern — you're tuning a system you haven't yet observed.

Can cross-window state become inconsistent? How do I verify it?

Yes, it can — and it will, if you don't guard against it. Three failure modes and their mitigations:

  1. Orphan state: The agent writes "plan to do X" to state but is interrupted before executing. The next window reads this as completed work. Mitigation: Schema distinguishes planned, in_progress, and done statuses. On resume, planned items are treated as unexecuted intentions.
  2. Stale checkpoints: Multiple windows write state concurrently; an old checkpoint overwrites newer progress. Mitigation: Monotonically incrementing window_id and state_version. On load, reject any state with version < current version.
  3. Environment drift: The state says "file X exists at path Y" but between windows, someone moved the file. Mitigation: The bootstrap sequence validates environment state — git status, file existence checks, service health probes — before trusting the checkpoint.

Verification: CrossWindowStateManager.validate() performs schema-level checks (missing task goal? inconsistent progress?). Beyond schema, run resume tests: take a checkpoint from a completed task, load it into a fresh window, and verify the agent resumes at the correct subtask without re-deriving previous decisions. This should be part of your CI pipeline.

How large should the token budget be? How do I adjust by task type?

Start with these defaults for a 128K-token window and tune from observed behavior:

ComponentDefaultCode ReviewResearchCustomer Support
System Prompt6% (7.7K)6%5%8% (more instructions)
Tool Definitions4% (5.1K)5% (more tools)5%3% (fewer tools)
Message History65% (83.2K)60%55%70% (conversation-heavy)
Tool Results12% (15.4K)18% (files, diffs)15%8%
Memory Injection8% (10.2K)6%15% (research context)6%
Output Reserve5% (6.4K)5%5%5%

Core principles for tuning:

  • Output reserve never below 4K tokens. If the LLM doesn't have room to generate a complete response, the agent loop breaks. This is a hard floor, not a guideline.
  • Tool-heavy tasks need more tool-result budget. Code review agents reading files, research agents querying APIs — their tool budgets should be 15--20%. If tool results are constantly being evicted mid-task, the budget is too small.
  • Conversation-heavy tasks need more message history. Customer support agents that need full conversation context for empathy and accuracy — allocate 70--75% to message history.
  • Don't tune until you have data. Run your agent with defaults for 10+ real tasks. Review the TokenBudgetManager.report() output. Identify which component is consistently hitting its soft limit. That's the component that needs a larger allocation.

Next Steps

This article is the L0 operational manual for the agent context window — part of the Agent Memory and Context Engineering series. Here's where to go next, ordered by dependency:

End of Article — 8 sections, 6 FAQ items, 8 next-step links