Agent Context Protocol Design: Passing State Across Tools, Memory, and Tasks
⚡ 30-Second Takeaway
- Core Problem: Without a unified context protocol, agent systems suffer five concurrent failures: prompt bloat, secret leakage, tool result confusion, memory contamination, and inter-task state loss.
- Solution: A four-layer context architecture — Message Bus, Tool Context, Memory Context, Task Context — with explicit data structures and routing rules at each boundary.
- Key Implementation: ToolResultEnvelope (trimming + redaction + routing) and MemoryContextGate (read/write gating + namespace isolation). Full Python code included.
- What You'll Walk Away With: A framework-agnostic context protocol you can drop into any agent system to eliminate the most common security and cost failure modes.
1. Why Context Protocols Break Without Explicit Design
Here's a story that actually happened. A team built a code-review agent to automatically scan GitHub repositories for security vulnerabilities. The agent pulled code through a GitHub API tool, fed the results to an LLM for analysis, and posted findings to a shared Slack channel. Everything seemed fine — until someone spotted a complete GitHub Personal Access Token sitting in the #general channel at 3:14 AM.
No PagerDuty alert. No on-call rotation. No indication anything was wrong — except the token, in plaintext, visible to everyone.
The root cause was deceptively simple. The agent called a list_repos tool. The tool's raw response contained an Authorization header logged by a middleware that wasn't stripped from the output. The developer's integration code did what nearly everyone does on their first agent build:
# The naive pattern: common in first-agent codebases
import json

result = execute_tool(tool_name, arguments)
messages.append({
    "role": "tool",
    "tool_call_id": call_id,
    "content": json.dumps(result)  # ← token, passwords, every raw field
})
No sanitization. No routing logic. No trimming. The raw tool result — with all its HTTP headers, API keys, and internal metadata — slid directly into the LLM's context window. The LLM then "helpfully" echoed the token in its analysis report. The report auto-posted to Slack. The token was public.
This isn't an edge case. This is the default behavior when you treat agent context as a passive message list instead of an actively managed protocol layer. Let's map out the five failure modes that fire simultaneously when you skip explicit context design:
① Prompt Bloat: Every LLM call carries the full conversation history plus raw tool outputs plus memory search results. A single tool call might return a 15KB JSON blob. A three-step reasoning workflow? The second call already hauls 30KB of tool debris. By step five, with a few parallel calls in the mix, you're past 100KB. By step ten, which is routine for a production agent doing code analysis or data extraction, every call is dragging along several hundred kilobytes of tool output, or tens of thousands of tokens. Per-call context grows linearly with the number of steps, so the total tokens billed across a run grow roughly quadratically, because every call re-sends everything accumulated so far. If a pipeline that starts at $0.02 per call balloons to $2.00 per call, then at 1,000 runs per day that's the difference between $20 and $2,000.
② Secret Leakage: API keys, tokens, passwords, internal hostnames, PII — they all live inside tool responses. Database connection strings in config files. Bearer tokens in HTTP response headers. SSH private keys in deploy scripts. If your tool result pipeline has zero filtering, all of this enters LLM context. Once in context, it can surface anywhere: in the assistant's response (echoed by the model), in error traces (logged to your observability stack), in customer-facing output (if the agent generates user-facing text), or in internal notifications (Slack, email, Jira). The blast radius is vast because you've injected secrets into a component — the LLM — whose output you cannot deterministically control.
③ Tool Result Confusion: Modern agents fire parallel tool calls. In a single reasoning step, the LLM might request search_code, get_file_contents, and run_tests simultaneously. Three results come back. If you blindly append all three to the message list without matching each one to its tool_call_id, the LLM has to reconstruct which result belongs to which call from context alone. Sometimes it gets it right. Sometimes run_tests output gets interpreted as file contents. The risk rises with concurrency: the more results arrive in one step, the more likely at least one is attributed to the wrong call unless routing is explicit.
④ Memory Contamination: Task A — a code review — leaves behind intermediate reasoning chains ("Let me check if this function has SQL injection... The parameter isn't sanitized..."). Those chains get persisted to shared memory. Task B — an unrelated feature request analysis — retrieves those chains, sees SQL injection concerns in its memory context, and now hallucinates security issues in a feature that never touches SQL. The agent makes decisions on stale, irrelevant, or outright wrong context because the memory namespace has no isolation boundaries between tasks.
⑤ Inter-Task State Loss: A multi-step workflow runs: step 1 pulls source code and builds a dependency graph. Step 2 runs static analysis across all files. Step 3 generates fix recommendations. But the dependency graph from step 1 — a structured JSON artifact that took 45 seconds to compute — has no structured propagation mechanism. By step 2, it's gone. Step 3 never sees it. Either step 2 recomputes it (doubling cost and latency) or, worse, proceeds without it and produces incorrect results. The agent loses its own work product between steps because state lives only in ephemeral chat messages.
These five failures share a common root cause: Agent context is not a passive data bucket. It is a protocol layer that requires explicit design, structured data contracts, and enforceable boundaries. The "just append everything to a message list" approach works for single-turn demos. It collapses systematically under production workloads involving multiple tools, parallel calls, persistent memory, and multi-step tasks.
Here's the cost difference, made concrete. A naive agent processing a 3-tool code review pipeline:
# NAIVE APPROACH: 3 steps, 3 tools each = 9 raw tool outputs in context
# Average tool output: 6,500 tokens
# Total tool tokens in final call: 9 × 6,500 = 58,500 tokens
# Plus conversation history: ~5,000 tokens
# Total context per call: ~63,500 tokens → ~$0.19/call (assuming roughly $3 per 1M input tokens, GPT-4o-class pricing)

# PROTOCOL APPROACH: each output trimmed to 800 tokens, secrets redacted
# Average trimmed output: 800 tokens
# Total tool tokens in final call: 9 × 800 = 7,200 tokens
# Plus conversation history: ~5,000 tokens
# Total context per call: ~12,200 tokens → ~$0.037/call (same pricing assumption)

# At 500 runs/day: $95 vs $18.50, an 81% cost reduction, with known secret patterns redacted before they reach the model
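The arithmetic above can be reproduced with a few lines of Python. This is only a sketch of the cost model: the price constant is an assumption chosen so that the figures come out as quoted, and the function name is illustrative.

```python
# Illustrative per-call cost model. The price is an assumption, not a quoted rate.
PRICE_PER_MILLION_INPUT_TOKENS = 3.00  # USD, assumed

def call_cost(tool_outputs: int, tokens_per_output: int,
              history_tokens: int = 5_000) -> float:
    """Input-token cost in USD for one LLM call that carries every tool output."""
    total_tokens = tool_outputs * tokens_per_output + history_tokens
    return total_tokens * PRICE_PER_MILLION_INPUT_TOKENS / 1_000_000

naive = call_cost(tool_outputs=9, tokens_per_output=6_500)
trimmed = call_cost(tool_outputs=9, tokens_per_output=800)
runs_per_day = 500

print(f"naive:   ${naive:.4f}/call, ${naive * runs_per_day:.2f}/day")
print(f"trimmed: ${trimmed:.4f}/call, ${trimmed * runs_per_day:.2f}/day")
print(f"reduction: {1 - trimmed / naive:.0%}")
```

Swapping in your own model's input price changes the dollar amounts but not the ratio, which depends only on the token counts.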
2. Context Protocol Architecture — A Four-Layer Model
Developers building agents face a fundamental confusion: Where does tool output go? Where do memory reads land? How does task state propagate? Without a shared mental model, everyone invents their own ad-hoc solution — some serialize everything into JSON and stuff it into the system prompt, others scatter temporary state across global variables, and a terrifying number do both.
After analyzing context management patterns across multiple production agent systems — code review agents, data extraction pipelines, multi-agent debate frameworks — we can extract a four-layer architecture model. Each layer has a clear responsibility boundary and a defined data contract. The layers aren't optional abstractions; they map directly to concrete code structures you'll implement.
Layer 1 — Message Bus: The foundation. This holds raw LLM conversation turns: system prompt, user messages, assistant responses, tool call requests, and tool call results. Everything eventually flows here — this is the data structure you pass to the LLM API. But what arrives here should be sanitized, trimmed, and budgeted by the layers above it. Think of this as the physical layer — it defines the wire format ({"role": "...", "content": "..."}), not the content policy. Directly mutating this layer is what the naive code does. The protocol approach is: never append to the message bus from raw tool output or raw memory read — always route through the upper layers first.
Layer 2 — Tool Context: The first defense layer. Every tool input and output passes through a structured envelope that enforces three policies: result trimming (truncate long outputs before they enter the message bus), secret redaction (regex-detect and replace API keys, tokens, passwords, PII), and call-ID routing (match each result to the correct tool_call_id so parallel results don't get confused). This layer answers the question: "What part of this tool's output should the LLM actually see?" The default answer must be "as little as necessary," not "everything."
Layer 3 — Memory Context: The read/write gating layer. Not all memories should be injected into the current reasoning context — a relevance filter (cosine similarity on embeddings, or BM25 for keyword matching) gates reads. Not all agent outputs should be persisted to memory — a content validator gates writes, rejecting intermediate reasoning chains, raw tool logs, and low-information-density text. Namespace isolation (user:session:task) prevents cross-task contamination. This layer answers two questions: "Which memories does the agent need right now?" and "What should the agent remember for later?"
Layer 4 — Task Context: The cross-step state propagation layer. Multi-step workflows produce large intermediate artifacts — file lists, dependency graphs, analysis results, generated code. Passing full copies between steps explodes the token budget. The solution: pass artifact references instead of full copies. A task state envelope carries pointers (file paths, S3 keys, database IDs) plus structured summaries, and downstream steps "declare" what fields they need. This layer answers: "How does step N access what step N-1 produced — without dragging the entire payload through the LLM?"
Here's how data flows through the four layers:
┌──────────────────┐
│   Task Context   │ ← Cross-step state envelopes, artifact pointers
│    (Layer 4)     │
└────────┬─────────┘
         │ Read/write state references
         ▼
┌──────────────────┐
│  Memory Context  │ ← Read/write gates, namespace isolation
│    (Layer 3)     │
└────────┬─────────┘
         │ Inject/extract memory chunks
         ▼
┌──────────────────┐
│   Tool Context   │ ← Trimming, redaction, call-ID routing
│    (Layer 2)     │
└────────┬─────────┘
         │ Safe, compact context fragments
         ▼
┌──────────────────┐
│   Message Bus    │ ← LLM conversation turns (final destination)
│    (Layer 1)     │
└──────────────────┘
The critical design principle: data converges on the message bus, and control flows down from the task layer. Raw information enters at Layer 2 (tool outputs) and Layer 3 (memory reads), gets filtered and structured, and ultimately lands in Layer 1 as safe, budgeted context fragments. Control decisions (what to filter, what threshold to apply, what budget to enforce) are declared at Layer 4 as task-level policies and enforced at Layer 2 and Layer 3. Each layer can be tested independently: you can change your secret redaction regex without touching memory gate logic, or adjust relevance thresholds without modifying the message bus format.
The corresponding core data structures:
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime


# Layer 1: Message Bus
@dataclass
class Message:
    role: str  # "system" | "user" | "assistant" | "tool"
    content: str
    tool_call_id: Optional[str] = None
    tool_calls: Optional[list] = None


# Layer 2: Tool Context
@dataclass
class ToolCallRequest:
    id: str
    name: str
    arguments: dict


@dataclass
class ToolResultEnvelope:
    tool_call_id: str
    raw_result: Any
    trimmed_result: Optional[str] = None
    redacted: bool = False
    redacted_fields: list = field(default_factory=list)


# Layer 3: Memory Context
@dataclass
class MemoryEntry:
    namespace: str  # "user_42:session_a:task_1"
    content: str
    embedding: Optional[list] = None
    relevance_score: float = 0.0
    timestamp: datetime = field(default_factory=datetime.now)


# Layer 4: Task Context
@dataclass
class TaskStateEnvelope:
    task_id: str
    parent_task_id: Optional[str] = None
    artifact_refs: dict = field(default_factory=dict)
    summary: str = ""
    metadata: dict = field(default_factory=dict)
You might wonder: how does this relate to the Model Context Protocol (MCP)? This is a common point of confusion. MCP defines how an agent connects to external tools and data sources: it's a client-server protocol (JSON-RPC 2.0 messages carried over stdio or Streamable HTTP) focused on the agent-to-external-world contract. This article's context protocol defines how the agent's internal components (the LLM, tools, memory, and task orchestrator) pass state to each other. They're complementary, not competing. MCP tells you how to call a tool; the context protocol tells you what shape the result must take before it enters the agent's reasoning loop. You can, and in production should, use both.
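To make the hand-off between the two concrete, here is a minimal sketch of the boundary: an MCP tool result is flattened to text, and that text is what the context protocol then trims and redacts. The field names (`content`, `type`, `text`, `isError`) follow the MCP tool-result shape as I understand it, so check them against the spec version you target. The helper name `mcp_result_to_text` is hypothetical.

```python
from typing import Any

def mcp_result_to_text(result: dict[str, Any]) -> str:
    """Flatten an MCP tools/call result into plain text for downstream filtering."""
    parts = []
    for item in result.get("content", []):
        if item.get("type") == "text":
            parts.append(item.get("text", ""))
        else:
            # Non-text content (images, embedded resources) is noted, not inlined.
            parts.append(f"[{item.get('type', 'unknown')} content omitted]")
    text = "\n".join(parts)
    if result.get("isError"):
        text = f"TOOL ERROR: {text}"
    return text

# Example payload shaped like the result of a tools/call response
example = {
    "content": [{"type": "text", "text": "3 open pull requests"}],
    "isError": False,
}
raw_text = mcp_result_to_text(example)
```

The returned string is still untrusted raw output at this point. It has crossed the MCP boundary but not yet the Tool Context layer.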
3. Tool Result Routing, Trimming, and Secret Redaction
A single reasoning step can trigger multiple parallel tool calls. Imagine an agent debugging a production incident: it calls search_logs (returns 80KB of log lines), check_service_status (returns JSON with health check data and internal hostnames), and list_recent_deploys (returns deploy metadata, possibly with API tokens in request traces). Three results arrive nearly simultaneously. Two hard problems: (1) each result must be correctly routed to its corresponding tool_call_id so the LLM doesn't confuse log output with deploy metadata; (2) none of these results should enter the LLM context raw — 80KB of logs would consume ~25,000 tokens by itself, and API tokens hidden in deploy traces create a security incident waiting to happen.
The solution is ToolResultEnvelope — a mandatory filtering layer that sits between tool execution and the message bus. It enforces three policies in sequence: routing (match result to call ID), trimming (reduce size), and redaction (strip secrets). Here's the full implementation:
import re
import json
from typing import Any, Optional, Literal


class ToolResultEnvelope:
    """Safe envelope for tool results: trim, redact, route.

    Every tool result must pass through this envelope before
    reaching the message bus. The envelope enforces three policies:
    1. Trimming: reduce result size to a token budget
    2. Secret redaction: detect and replace sensitive fields
    3. Routing: match result to the correct tool_call_id
    """

    DEFAULT_SECRET_PATTERNS = [
        # Generic patterns: catch most key/token/password formats
        (r'(?:api[_-]?key|apikey|api_token|access_token|secret)["\s:=]+["\x27]?([A-Za-z0-9_\-\.]{20,})', 'API_KEY'),
        (r'(?:password|passwd|pwd)["\s:=]+["\x27]?([^"\x27&\s]{4,})', 'PASSWORD'),
        (r'(?:token|auth)["\s:=]+["\x27]?([A-Za-z0-9_\-\.]{16,})', 'TOKEN'),
        (r'(?:private[_-]?key|privkey)["\s:=]+["\x27]?([A-Za-z0-9+/=]{32,})', 'PRIVATE_KEY'),
        (r'(?:bearer|basic)\s+([A-Za-z0-9_\-\.=]{16,})', 'AUTH_HEADER'),
        # Provider-specific patterns
        (r'ghp_[A-Za-z0-9]{36}', 'GITHUB_TOKEN'),
        (r'glpat-[A-Za-z0-9\-]{20,}', 'GITLAB_TOKEN'),
        (r'sk-[A-Za-z0-9]{32,}', 'OPENAI_KEY'),
    ]

    def __init__(self, tool_call_id: str, raw_result: Any, max_tokens: int = 2000):
        self.tool_call_id = tool_call_id
        self.raw_result = raw_result
        self.max_tokens = max_tokens
        self._trimmed: Optional[str] = None
        self._redacted: bool = False
        self._found_secrets: list = []

    def trim(self, strategy: Literal["first_n", "field_whitelist", "summary"] = "first_n",
             whitelist_fields: Optional[list] = None) -> 'ToolResultEnvelope':
        """Apply a trimming strategy to reduce result size.

        Strategies:
        - first_n: Keep roughly the first max_tokens tokens, drop the rest.
          Best for free-text results where the beginning contains the signal.
        - field_whitelist: Only retain fields in the whitelist.
          Best for structured JSON where you know which keys matter downstream.
        - summary: Generate a structural summary (type, size, key count).
          Best when the LLM only needs to know *what* the result contains,
          not the actual content.

        Returns self for chaining.
        """
        text = self._serialize(self.raw_result)
        if strategy == "first_n":
            # Whitespace-split words only approximate model tokens, so also cap
            # by characters (~4 per token) to bound long unbroken strings.
            words = text.split()
            self._trimmed = ' '.join(words[:self.max_tokens])[:self.max_tokens * 4]
        elif strategy == "field_whitelist":
            if not whitelist_fields:
                # Fail loudly instead of silently skipping the trim
                raise ValueError("field_whitelist strategy requires whitelist_fields")
            if isinstance(self.raw_result, dict):
                filtered = {k: v for k, v in self.raw_result.items() if k in whitelist_fields}
                self._trimmed = self._serialize(filtered)
            else:
                # Fallback: can't whitelist non-dict types
                self._trimmed = text[:self.max_tokens * 4]
        elif strategy == "summary":
            summary_parts = [
                f"type={type(self.raw_result).__name__}",
                f"size={len(text)} chars"
            ]
            if isinstance(self.raw_result, dict):
                summary_parts.append(f"keys={list(self.raw_result.keys())[:10]}")
            elif isinstance(self.raw_result, list):
                summary_parts.append(f"count={len(self.raw_result)}")
                if self.raw_result and isinstance(self.raw_result[0], dict):
                    summary_parts.append(f"item_keys={list(self.raw_result[0].keys())[:5]}")
            self._trimmed = ' | '.join(summary_parts)
        else:
            raise ValueError(f"Unknown trim strategy: {strategy!r}")
        return self

    def redact_secrets(self, patterns: Optional[list] = None) -> 'ToolResultEnvelope':
        """Detect and replace sensitive fields in the result.

        Scans the (possibly trimmed) result for patterns matching
        API keys, tokens, passwords, and other secrets. Replaces
        matched values with [LABEL:REDACTED] placeholders.
        Records all detected secrets in self._found_secrets for
        audit logging.

        Returns self for chaining.
        """
        if patterns is None:
            patterns = self.DEFAULT_SECRET_PATTERNS
        text = self._trimmed if self._trimmed is not None else self._serialize(self.raw_result)
        self._found_secrets = []
        for pattern, label in patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            if matches:
                for m in matches:
                    secret_val = m if isinstance(m, str) else m[0]
                    # Keep the preview short relative to the secret so the
                    # audit log never holds an entire short password.
                    preview_len = min(4, len(secret_val) // 4)
                    self._found_secrets.append({
                        'label': label,
                        'value_preview': secret_val[:preview_len] + '...'
                    })
                text = re.sub(pattern, f'[{label}:REDACTED]', text, flags=re.IGNORECASE)
        self._redacted = len(self._found_secrets) > 0
        self._trimmed = text
        return self

    def to_context_chunk(self) -> dict:
        """Convert the processed result into a message-bus-ready fragment.

        This is the only method that produces output suitable for
        appending to the messages list. It sets the tool_call_id, but it
        does NOT trim or redact on its own: call trim() and
        redact_secrets() first, otherwise the raw result passes through.

        Returns a dict matching the OpenAI tool message format.
        """
        content = self._trimmed if self._trimmed is not None else self._serialize(self.raw_result)
        return {
            "role": "tool",
            "tool_call_id": self.tool_call_id,
            "content": content
        }

    def _serialize(self, data: Any) -> str:
        """Convert any data type to a string for processing."""
        if isinstance(data, str):
            return data
        try:
            return json.dumps(data, ensure_ascii=False, default=str)
        except (TypeError, ValueError):
            return str(data)

    def audit_report(self) -> dict:
        """Generate an audit record of what was detected and redacted."""
        return {
            'tool_call_id': self.tool_call_id,
            'redacted': self._redacted,
            'found_secrets': self._found_secrets,
            'original_size': len(self._serialize(self.raw_result)),
            'trimmed_size': len(self._trimmed) if self._trimmed else 0
        }
Here's the complete usage flow — a tool call from raw result to safe context injection:
import logging

# 1. Agent executes a tool; the raw result comes back
# (execute_command and messages are assumed to exist in the surrounding agent code)
raw_result = execute_command(
    "curl -H 'Authorization: Bearer sk-abc123def456...' "
    "https://api.internal.service.com/logs?service=payment"
)
# raw_result contains: 80KB of log lines + the API key in request metadata

# 2. Wrap it in an envelope
envelope = ToolResultEnvelope(
    tool_call_id="call_7a3f",
    raw_result=raw_result,
    max_tokens=1500
)

# 3. Chain: trim first, then redact, then extract context
context_chunk = (envelope
                 .trim(strategy="first_n")
                 .redact_secrets()
                 .to_context_chunk())

# 4. Now it's safe to append to the message bus
messages.append(context_chunk)

# 5. Audit: log what was detected
if envelope._found_secrets:
    logging.warning(
        f"Redacted {len(envelope._found_secrets)} secrets "
        f"in tool call {envelope.tool_call_id}: "
        f"{[s['label'] for s in envelope._found_secrets]}"
    )
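The flow above handles one call. For the parallel case, the routing rule is that every result is paired with its tool_call_id at the moment the call is dispatched, never reconstructed afterwards. The sketch below illustrates this with asyncio; the two tool functions and the `TOOLS` registry are hypothetical stand-ins, and in a real pipeline the `str(result)` step would be replaced by the envelope's trim and redact chain.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical tools; real ones would perform I/O.
async def search_logs(service: str) -> str:
    await asyncio.sleep(0.02)  # slowest tool, finishes last
    return f"logs for {service}"

async def check_service_status(service: str) -> dict:
    await asyncio.sleep(0.01)
    return {"service": service, "healthy": True}

TOOLS: dict[str, Callable[..., Awaitable[Any]]] = {
    "search_logs": search_logs,
    "check_service_status": check_service_status,
}

async def run_tool_calls(tool_calls: list[dict]) -> list[dict]:
    """Run calls concurrently; return one tool message per call, tagged with its id."""
    async def run_one(call: dict) -> dict:
        try:
            result = await TOOLS[call["name"]](**call["arguments"])
        except Exception as exc:  # a failed call still needs a reply for its id
            result = f"error: {exc!r}"
        # The id travels with the call, so completion order cannot mix results up.
        return {"role": "tool", "tool_call_id": call["id"], "content": str(result)}

    # gather() returns results in input order, regardless of completion order
    return list(await asyncio.gather(*(run_one(c) for c in tool_calls)))

calls = [
    {"id": "call_1", "name": "search_logs", "arguments": {"service": "payment"}},
    {"id": "call_2", "name": "check_service_status", "arguments": {"service": "payment"}},
]
tool_messages = asyncio.run(run_tool_calls(calls))
```

Because each message is built inside `run_one`, a slow or failing tool cannot shift another tool's output onto the wrong id.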
Three design decisions worth understanding:
Explicit opt-in filtering: The envelope does nothing automatically. You must explicitly call redact_secrets(). A bare to_context_chunk() without prior redaction will pass the raw result through. This is intentional: it forces the developer to make a conscious decision about filtering. The flip side is that the unsafe path is also the shortest one, so in production, wrap this in a factory function that always applies redaction, so the default path is safe.
Chainable API: trim() and redact_secrets() both return self, enabling fluent chaining. The recommended order is trim-then-redact: trimming first reduces the text volume that regex must scan, making redaction faster. The trade-off is that trimming can cut a secret in half, leaving a fragment that no longer matches its pattern and so survives redaction. The order is not enforced, so redact first whenever that risk matters more than scan time or your secret detection needs the full context.
Auditability: _found_secrets carries a leading underscore, which by Python convention marks it as internal, but it is intentionally left readable, and audit_report() exposes the same data through a public method. Every detected secret is recorded with its label and a truncated preview. This feeds directly into your audit logging pipeline: if any tool call triggers secret detection, it should generate an observable event (log, metric increment, optionally an alert for high-severity patterns like private keys).
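The factory function suggested under the first design decision might look like the following sketch. It is a standalone simplification, not the envelope class itself: the name `safe_tool_message`, the `max_chars` parameter, and the two-entry pattern list are assumptions made for illustration. It redacts before trimming so that truncation cannot split a secret.

```python
import json
import re
from typing import Any

# Deliberately small pattern set for illustration; extend it for real use.
SECRET_PATTERNS = [
    (re.compile(r"ghp_[A-Za-z0-9]{36}"), "GITHUB_TOKEN"),
    (re.compile(r"(?i)bearer\s+[A-Za-z0-9_\-\.=]{16,}"), "AUTH_HEADER"),
]

def safe_tool_message(tool_call_id: str, raw_result: Any,
                      max_chars: int = 6000) -> dict:
    """Build a tool message that is always redacted, then trimmed."""
    if isinstance(raw_result, str):
        text = raw_result
    else:
        text = json.dumps(raw_result, ensure_ascii=False, default=str)
    # Redact on the full text first: a secret cut in half by truncation
    # would no longer match its pattern.
    for pattern, label in SECRET_PATTERNS:
        text = pattern.sub(f"[{label}:REDACTED]", text)
    if len(text) > max_chars:
        text = text[:max_chars] + "...[truncated]"
    return {"role": "tool", "tool_call_id": tool_call_id, "content": text}

token = "ghp_" + "a" * 36  # fake token in the GitHub format
msg = safe_tool_message(
    "call_7a3f",
    {"headers": {"Authorization": f"Bearer {token}"}},
    max_chars=200,
)
```

Callers that only ever see `safe_tool_message` have no way to skip redaction, which is the property the opt-in envelope API cannot give you by itself.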
4. Memory Access Patterns and Namespace Design
The relationship between an agent and its memory isn't "remember everything, search everything." Three decisions govern every memory interaction: when should the agent read from memory? When should it write to memory? How do you prevent Task A's memories from contaminating Task B's reasoning?
The naive approach, injecting every stored memory into every LLM call, scales catastrophically. An agent with 500 stored memories, each averaging 500 tokens, would inject 250,000 tokens of memory context alone. Add the system prompt, conversation history, and tool outputs, and you've blown past the context window of most production models. Even if the window is large enough, loading 500 memory chunks means the model's attention is diluted across irrelevant content, so accuracy degrades along with cost.
MemoryContextGate solves this with three mechanisms: read gating (relevance filtering with budget enforcement), write gating (content validation), and namespace isolation (multi-tenant memory partitioning). Here's the implementation:
from typing import Any, Optional
from dataclasses import dataclass


@dataclass
class MemoryQueryResult:
    """A single memory entry returned from a search query."""
    content: str
    relevance_score: float
    namespace: str
    memory_id: str


class MemoryContextGate:
    """Read/write gate for agent memory context.

    Controls *when* memories are read (relevance threshold),
    *what* gets written (content validation), and *how* memory
    namespaces are scoped (user → session → task hierarchy).

    Key design principle: default to NOT reading and NOT writing.
    The gate must be explicitly satisfied before any memory I/O
    impacts the agent's context.
    """

    def __init__(self, relevance_threshold: float = 0.75, max_memory_tokens: int = 1000):
        self.relevance_threshold = relevance_threshold
        self.max_memory_tokens = max_memory_tokens
        self._read_count = 0
        self._write_count = 0

    def should_read(self, query: str, namespace: str,
                    memories: list[MemoryQueryResult]) -> tuple[bool, float, list[MemoryQueryResult]]:
        """Decide whether to read memories and which chunks to return.

        Three-stage pipeline:
        1. Relevance filter: drop results below threshold
        2. Sort by score: highest relevance first
        3. Budget enforcement: stop when max_memory_tokens is reached

        Note: query and namespace are accepted for the caller's bookkeeping;
        this method does not filter on them, so candidates must already be
        scoped to the right namespace by the memory store or the caller.

        Returns:
            (should_read, best_score, filtered_memories)
        """
        if not memories:
            return False, 0.0, []
        relevant = [m for m in memories if m.relevance_score >= self.relevance_threshold]
        relevant.sort(key=lambda m: m.relevance_score, reverse=True)
        selected = []
        token_count = 0
        for mem in relevant:
            # Whitespace-split word count as a rough token estimate
            mem_tokens = len(mem.content.split())
            if token_count + mem_tokens <= self.max_memory_tokens:
                selected.append(mem)
                token_count += mem_tokens
            else:
                break
        if not selected:
            return False, 0.0, []
        self._read_count += 1
        best_score = selected[0].relevance_score
        return True, best_score, selected

    def should_write(self, content: str, namespace: str) -> bool:
        """Decide whether to persist content to memory.

        Only write explicitly validated outputs. Reject:
        - Fragments: too short to carry semantic meaning (< 50 chars)
        - Raw logs: too long, likely full tool output (> 50000 chars)
        - Low-information-density text (alpha_ratio < 0.3)
        """
        if len(content.strip()) < 50:
            return False
        if len(content) > 50000:
            return False
        alpha_ratio = sum(c.isalpha() for c in content) / max(len(content), 1)
        if alpha_ratio < 0.3:
            return False
        self._write_count += 1
        return True

    def resolve_namespace(self, user_id: str, session_id: Optional[str] = None,
                          task_id: Optional[str] = None) -> str:
        """Build a three-tier namespace: user → session → task."""
        parts = [f"user_{user_id}"]
        if session_id:
            parts.append(f"session_{session_id}")
        if task_id:
            parts.append(f"task_{task_id}")
        return ':'.join(parts)

    def get_namespace_prefix(self, user_id: str, session_id: Optional[str] = None) -> str:
        """Get a namespace prefix for scoped memory queries."""
        parts = [f"user_{user_id}"]
        if session_id:
            parts.append(f"session_{session_id}")
        return ':'.join(parts) + ':'

    @property
    def stats(self) -> dict:
        return {
            'reads': self._read_count,
            'writes': self._write_count,
            'read_threshold': self.relevance_threshold,
            'max_memory_tokens': self.max_memory_tokens
        }
Three design decisions explained in depth:
Read gating (relevance is continuous, not binary): Don't think of memory relevance as "relevant → inject, not relevant → skip." Treat it as a continuous score that interacts with your token budget. Candidates are ranked by score, so high-relevance memories (0.90+) claim the budget first, medium-relevance ones (0.75–0.89) get injected only if budget remains, and low-relevance ones (< 0.75) never get injected. The default threshold of 0.75 is a reasonable starting point rather than a universal constant, since score distributions vary between embedding models: drop it to around 0.6 and noise increases until the LLM starts making decisions on tangentially related context; raise it above 0.85 and you risk missing useful weakly-related memories. The relevance score can come from embedding cosine similarity (vector store) or BM25 keyword matching (text store). BM25 scores are unbounded, so normalize them to the 0–1 range before applying the threshold. The gate is scoring-method-agnostic: it just needs a float.
Write gating (most of what an agent produces isn't worth remembering): An agent might generate 50 reasoning steps in a single task ("Let me check if this function has SQL injection... The parameter isn't sanitized... Let me trace the call path...") but only a handful of outputs deserve persistence: the final conclusion, discovered facts, explicit user preferences. If intermediate reasoning chains leak into shared memory, they become context pollution for future tasks. The write gate's validation logic is deliberately simple: content too short (fragments), too long (raw tool output dumps), or too symbol-heavy (log output) doesn't get written. This isn't a sophisticated semantic filter. It's a coarse sieve that catches the most common classes of garbage before they reach your memory store.
Namespace isolation (three tiers, flexible usage): The user:session:task hierarchy isn't a rigid requirement. In practice, most memories live at user:session level (scoped to a conversation). Cross-session preferences live at user level. Task-level memories, such as intermediate artifacts, are the most granular and shortest-lived. The three-tier design gives you control over memory visibility: task-level memories are visible only within the current task; session-level memories are shared across tasks; user-level memories persist across sessions. Your memory store implements this with prefix queries: user_42:* returns all of user 42's memories, user_42:session_abc:* returns only session-abc memories. A broad prefix query also returns entries written by other tasks, so filter the results against the current namespace before they reach the gate.
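The visibility rules in the third point reduce to an ancestor check on namespace segments. The sketch below is one way to express it. The function name `is_visible` is hypothetical, and it assumes namespaces are colon-separated exactly as resolve_namespace() builds them.

```python
def is_visible(memory_namespace: str, current_namespace: str) -> bool:
    """True if a memory stored at memory_namespace may be read from current_namespace.

    A memory is visible when its namespace equals the current one or is an
    ancestor of it in the user -> session -> task hierarchy.
    """
    mem_parts = memory_namespace.split(":")
    cur_parts = current_namespace.split(":")
    # Compare whole segments, so "user_4" never matches "user_42".
    return len(mem_parts) <= len(cur_parts) and cur_parts[:len(mem_parts)] == mem_parts

current = "user_42:session_abc:task_1"
assert is_visible("user_42", current)                         # user-level preference
assert is_visible("user_42:session_abc", current)             # same session
assert is_visible("user_42:session_abc:task_1", current)      # same task
assert not is_visible("user_42:session_abc:task_2", current)  # sibling task
assert not is_visible("user_42:session_xyz", current)         # other session
assert not is_visible("user_4", current)                      # partial segment
```

Applied as a filter over search results, for example `[m for m in candidates if is_visible(m.namespace, namespace)]`, this keeps a sibling task's intermediate notes out of the read gate even when the store was queried with a broad user-level prefix.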
Putting it together — a complete read/write cycle in the agent's reasoning loop:
gate = MemoryContextGate(relevance_threshold=0.75, max_memory_tokens=1000)

# ── Read Phase ──
# (memory_store and system_prompt are assumed to exist in the surrounding agent code)
namespace = gate.resolve_namespace(user_id="42", session_id="abc", task_id="1")
query = "Python project dependency management preferences"
candidates = memory_store.search(
    query, namespace=gate.get_namespace_prefix("42")
)
should_read, best_score, selected = gate.should_read(query, namespace, candidates)
# e.g. the best 3 results scoring >= 0.75, total ~800 tokens
if should_read:
    memory_context = "\n".join([m.content for m in selected])
    system_prompt += f"\n\nUser preferences from memory:\n{memory_context}"

# ── Write Phase ──
agent_conclusion = (
    "Project X uses pip-tools for dependency management. "
    "User prefers requirements.in + requirements.txt pattern. "
    "Pinned dependencies with hashes enabled for security."
)
if gate.should_write(agent_conclusion, namespace):
    memory_store.save(namespace, agent_conclusion)

print(gate.stats)
# If the read phase returned usable results:
# {'reads': 1, 'writes': 1, 'read_threshold': 0.75, 'max_memory_tokens': 1000}
The gate processes roughly 1,000 tokens of memory context per read — enough for 3-4 high-quality chunks — and consumes approximately 0.8% of a 128K context window. Compare this to the naive approach of injecting all 15 candidate memories (potentially 7,500 tokens, or 5.9% of the window), and the cost-and-quality difference compounds with every read cycle.
5. Inter-Task State Propagation — The TaskStateEnvelope
Here's a scenario that breaks naive agent pipelines every time. An agent runs a three-step data analysis workflow: Step 1 searches the web for "2026 cloud computing market forecasts" and retrieves a 15KB JSON blob containing 40 analyst reports with titles, URLs, extract snippets, and metadata. Step 2 reads the full JSON, extracts specific statistics (CAGR, market size, regional breakdowns), and computes a structured summary. Step 3 takes that summary, generates a formatted report, and persists it.
Sounds straightforward, until you look at the token math. Step 1 produces 15KB (~4,700 tokens) of search results. Step 2 needs some of that data, so you shove the full JSON into the prompt: 4,700 tokens to extract maybe 200 tokens of useful numbers. Then Step 2 produces its own 3KB output. Step 3 needs both the original search context (so it can cite sources) AND Step 2's extracted statistics. That's 15KB + 3KB = 18KB of upstream artifacts flowing into Step 3's prompt. A three-step pipeline carrying full artifact payloads already costs several times what it should, and the gap widens with every step you add.
The naive solutions both fail. Pass everything: Token costs grow roughly quadratically with the number of steps, because each downstream step carries all upstream artifacts. Pass nothing: The agent loses its own work product. Step 3 can't cite sources it's never seen. Step 5 recomputes analysis Step 3 already did. The agent burns tokens on redundant work while producing lower-quality output.
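A small model makes the growth of the "pass everything" option visible. It is a simplification under stated assumptions: every step produces one artifact of the same size (the 4,700-token figure from the example), each later step carries all earlier artifacts, and a 200-token summary stands in for what a reference-based design would inject instead. The function name is illustrative.

```python
def cumulative_input_tokens(steps: int, artifact_tokens: int, carry_full: bool,
                            summary_tokens: int = 200) -> int:
    """Total upstream tokens injected across a pipeline run.

    Step k (1-based) sees the k-1 artifacts produced before it, either as
    full copies or as short summaries.
    """
    per_item = artifact_tokens if carry_full else summary_tokens
    return sum((k - 1) * per_item for k in range(1, steps + 1))

for n in (3, 10):
    full = cumulative_input_tokens(n, artifact_tokens=4_700, carry_full=True)
    refs = cumulative_input_tokens(n, artifact_tokens=4_700, carry_full=False)
    print(f"{n} steps: full copies={full:,} tokens, summaries={refs:,} tokens")
```

The sum is n(n-1)/2 artifacts in total, which is where the quadratic growth comes from: going from 3 to 10 steps multiplies the carried volume by 15, not by 3.3.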
The correct answer is artifact referencing: downstream steps receive pointers to upstream artifacts, not copies of them. A task state envelope carries structured summaries plus references (file paths, S3 keys, database IDs, in-memory handles). Downstream steps "declare" which fields they actually need, and only those fields are resolved and injected into context. The heavy payload stays external; the LLM sees exactly what it needs.
Here's the TaskStateEnvelope — the state propagation mechanism that carries references instead of copies:
import json
import uuid
from typing import Any, Optional
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class ArtifactRef:
    """A pointer to an externally stored artifact + structured summary."""
    artifact_id: str
    artifact_type: str  # "search_results" | "extraction" | "analysis" | "chart"
    pointer: str        # file path, S3 key, database ID, or in-memory key
    summary: str        # human-readable summary the LLM can reason about
    size_bytes: int
    schema_keys: list[str] = field(default_factory=list)  # top-level keys in the artifact
    metadata: dict = field(default_factory=dict)


class TaskStateEnvelope:
    """Carries state between agent workflow steps without data bloat.

    Instead of passing full artifact payloads between steps (token explosion),
    this envelope carries *pointers* to externally stored artifacts plus
    structured summaries. Downstream tasks declare which fields they need
    via declare_fields(), and only those fields are injected into context.

    Lifecycle:
      1. Upstream task produces artifacts → stores externally (file, DB, S3)
      2. add_artifact() → records pointer + summary in the envelope
      3. Pass envelope to downstream task
      4. Downstream task calls declare_fields([...]) → get exactly needed fields
    """

    def __init__(self, task_id: str, parent_task_id: Optional[str] = None):
        self.task_id = task_id
        self.parent_task_id = parent_task_id
        self.artifacts: list[ArtifactRef] = []
        self.metadata: dict = {
            'created_at': datetime.now().isoformat(),
            'version': 1,
            'producer': None  # set by the producing task
        }
        self._artifact_store = None  # injected externally for resolution
        self._resolved_cache: dict = {}

    def add_artifact(self, artifact_type: str, pointer: str,
                     summary: str, size_bytes: int,
                     schema_keys: Optional[list[str]] = None) -> str:
        """Register an artifact in the envelope. Returns artifact_id.

        The caller is responsible for actually storing the artifact externally.
        This method only records the pointer + metadata in the envelope.
        """
        artifact_id = f"{self.task_id}:{artifact_type}:{uuid.uuid4().hex[:8]}"
        ref = ArtifactRef(
            artifact_id=artifact_id,
            artifact_type=artifact_type,
            pointer=pointer,
            summary=summary,
            size_bytes=size_bytes,
            schema_keys=schema_keys or [],
        )
        self.artifacts.append(ref)
        return artifact_id

    def declare_fields(self, field_names: list[str]) -> dict:
        """Only return requested fields for downstream task context injection.

        This is the core mechanism that prevents token bloat across steps.
        Instead of injecting 15KB of search results, the downstream task
        calls declare_fields(["market_size", "cagr", "report_url"]) and
        receives only those 3 fields, perhaps 200 tokens total.

        Resolution strategy:
          1. Check in-memory cache first
          2. If not cached, read from artifact store (external)
          3. Extract only the requested fields
          4. Cache result for subsequent calls

        Only top-level keys of a dict artifact, or keys of the items in a
        top-level list artifact, are searched. Nested values are not traversed.

        Returns:
            dict mapping field_name → extracted value
        """
        result = {}
        for artifact in self.artifacts:
            cached = self._resolved_cache.get(artifact.artifact_id)
            if cached is None:
                if self._artifact_store is None:
                    continue  # no store injected: nothing to resolve against
                cached = self._artifact_store.read(artifact.pointer)
                if isinstance(cached, str):
                    try:
                        cached = json.loads(cached)
                    except json.JSONDecodeError:
                        cached = {'_raw': cached}
                self._resolved_cache[artifact.artifact_id] = cached
            # `name` rather than `field`, to avoid shadowing dataclasses.field
            if isinstance(cached, dict):
                for name in field_names:
                    if name in cached and name not in result:
                        # First artifact that has this field wins
                        result[name] = cached[name]
            elif isinstance(cached, list):
                # For list-type artifacts, search across all items
                for name in field_names:
                    if name not in result:
                        for item in cached:
                            if isinstance(item, dict) and name in item:
                                result[name] = item[name]
                                break
        return result

    def get_summaries(self) -> str:
        """Return a concatenated summary string suitable for context injection.

        This gives the LLM a high-level map of available artifacts without
        loading the full payloads. The LLM can then decide which fields to
        request via declare_fields().
        """
        lines = [f"Task {self.task_id} produced {len(self.artifacts)} artifact(s):"]
        for a in self.artifacts:
            size_kb = a.size_bytes / 1024
            lines.append(
                f" [{a.artifact_type}] {a.summary} "
                f"({size_kb:.1f} KB, keys: {a.schema_keys}) "
                f"[ref: {a.artifact_id}]"
            )
        return "\n".join(lines)

    def to_dict(self) -> dict:
        """Serialize envelope to JSON-serializable dict for transmission."""
        return {
            'task_id': self.task_id,
            'parent_task_id': self.parent_task_id,
            'artifacts': [
                {
                    'artifact_id': a.artifact_id,
                    'artifact_type': a.artifact_type,
                    'pointer': a.pointer,
                    'summary': a.summary,
                    'size_bytes': a.size_bytes,
                    'schema_keys': a.schema_keys,
                    'metadata': a.metadata,  # keep per-artifact metadata on round trip
                }
                for a in self.artifacts
            ],
            'metadata': self.metadata,
            'version': self.metadata['version']
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'TaskStateEnvelope':
        """Deserialize from dict (e.g., received from upstream task)."""
        envelope = cls(
            task_id=data['task_id'],
            parent_task_id=data.get('parent_task_id')
        )
        # Copy so that setting 'version' below does not mutate the caller's dict
        envelope.metadata = dict(data.get('metadata', {}))
        envelope.metadata['version'] = data.get('version', 1)
        for a in data.get('artifacts', []):
            envelope.artifacts.append(ArtifactRef(
                artifact_id=a['artifact_id'],
                artifact_type=a['artifact_type'],
                pointer=a['pointer'],
                summary=a['summary'],
                size_bytes=a['size_bytes'],
                schema_keys=a.get('schema_keys', []),
                metadata=a.get('metadata', {})
            ))
        return envelope

Let's walk through the key mechanisms that make this work:
Artifact Pointers: The envelope never carries full artifact data. Instead, each artifact is represented by a pointer (file path, S3 key, database ID, or an in-memory dictionary key) plus a summary (a human-readable digest the LLM can reason about). The actual payload — the 15KB JSON blob of search results, the 80KB log file, the 2MB dependency graph — stays in external storage. Only the pointer enters the context window. When a downstream task needs specific fields, it calls declare_fields(), which reads from external storage on demand and extracts only the requested keys.
State Vending Protocol: declare_fields() is the contract between tasks. The downstream task doesn't receive everything the upstream task produced — it receives only what it explicitly asks for. If Step 2 asks for ["market_size", "cagr"], it gets two values, not 40 analyst reports. If Step 3 asks for ["report_url", "analyst_name"], it gets citation sources, not raw statistics. Each downstream task pays only for the tokens it actually consumes.
Versioning: The envelope carries a version number in its metadata. A downstream task can check envelope.metadata['version'] before consuming state. If the upstream task changed its output schema between runs, the downstream task can detect the mismatch and handle it — fall back to a default, request a different field set, or raise a structured error — instead of silently corrupting its own reasoning with mis-typed fields.
Lazy Resolution with Caching: declare_fields() doesn't load artifacts eagerly. The first call resolves the pointer and caches the result. Subsequent calls for the same artifact hit the in-memory cache (_resolved_cache), avoiding redundant I/O. This matters when several steps in the same process query the same envelope: only the first read incurs a file or database access. The cache lives on the envelope instance and is not serialized by to_dict(), so an envelope rebuilt with from_dict() starts cold.
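On the consuming side, the version check might look like the following. This is a minimal sketch that works on the plain dict produced by to_dict(); the names `SUPPORTED_VERSIONS`, `EnvelopeVersionError`, and `check_envelope_version` are hypothetical and introduced only for illustration.

```python
SUPPORTED_VERSIONS = {1}  # assumption: schema versions this downstream task understands


class EnvelopeVersionError(ValueError):
    """Structured error raised when an envelope's schema version is unsupported."""

    def __init__(self, task_id: str, version: object):
        super().__init__(f"envelope {task_id!r} has unsupported version {version!r}")
        self.task_id = task_id
        self.version = version


def check_envelope_version(envelope_dict: dict) -> int:
    """Return the envelope version, or raise before any field is consumed."""
    version = envelope_dict.get("metadata", {}).get("version")
    if version not in SUPPORTED_VERSIONS:
        raise EnvelopeVersionError(envelope_dict.get("task_id", "?"), version)
    return version


ok = {"task_id": "market_research_01", "artifacts": [], "metadata": {"version": 1}}
print(check_envelope_version(ok))  # → 1

stale = {"task_id": "market_research_01", "artifacts": [], "metadata": {"version": 2}}
try:
    check_envelope_version(stale)
except EnvelopeVersionError as err:
    print(err.version)  # → 2
```

Raising a dedicated exception type keeps the mismatch visible to the orchestrator, which can then decide whether to fall back or abort.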
Here's the data flow visually:
Task A (Search)
┌──────────────────────────┐
│ 1. Executes search       │
│ 2. Gets 15KB JSON        │
│ 3. Stores to file:       │
│    /data/task_a.json     │
│ 4. Creates envelope with │
│    pointer + summary     │
└────────────┬─────────────┘
             │ TaskStateEnvelope { task_id, artifacts: [{pointer, summary, schema_keys}], version }
             ▼
Task B (Extract)
┌──────────────────────────┐
│ 1. Reads envelope        │
│ 2. Calls:                │
│    declare_fields([      │
│      "market_size",      │
│      "cagr"              │
│    ])                    │
│ 3. Receives:             │
│    {"market_size":       │
│       "$832B",           │
│     "cagr": "18.4%"}     │
│ 4. 15KB → 2 fields       │
│    = ~40 tokens          │
└──────────────────────────┘
A concrete example — the difference in token consumption between naive and protocol approaches across a 3-step pipeline:
# NAIVE: Full artifacts passed between steps
# Step 1 output (search results): 4,700 tokens → Step 2
# Step 2 output (extraction): 1,200 tokens → Step 3
# Total artifact tokens in Step 3: 4,700 + 1,200 = 5,900 tokens
# PROTOCOL: declare_fields() only
# Step 1 output: envelope with pointer (200 tokens) → Step 2
# Step 2 calls declare_fields(["market_size", "cagr"]): 40 tokens injected
# Step 2 output: envelope with pointer (300 tokens) → Step 3
# Step 3 calls declare_fields(["report_urls"]): 150 tokens injected
# Total artifact tokens in Step 3: 300 + 150 = 450 tokens
# Token reduction: 5,900 → 450 tokens (92.4% reduction)
# Cost at 1,000 runs/day (assuming $1 per 1M input tokens): $5.90 → $0.45 per day just on artifact transfer
The envelope doesn't just save tokens — it enforces a structured interface between pipeline steps. Every downstream step declares what it needs as an explicit contract. This makes pipelines auditable, testable, and debuggable: you can inspect the envelope at any pipeline boundary and know exactly what data flowed through, without hunting through hundreds of kilobytes of unstructured JSON.
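One way to make that contract testable is to compare a step's declared fields against the schema_keys the envelope advertises before the step runs. The sketch below does this on the dict form of the envelope; `missing_fields` is a hypothetical helper, not part of the class above.

```python
def missing_fields(envelope_dict: dict, required: list[str]) -> list[str]:
    """Return the required field names that no artifact advertises in schema_keys."""
    advertised = set()
    for artifact in envelope_dict.get("artifacts", []):
        advertised.update(artifact.get("schema_keys", []))
    return [name for name in required if name not in advertised]


envelope_dict = {
    "task_id": "market_research_01",
    "artifacts": [
        {"artifact_type": "search_results",
         "schema_keys": ["title", "url", "snippet", "market_size", "cagr", "region"]},
    ],
}

print(missing_fields(envelope_dict, ["market_size", "cagr"]))          # → []
print(missing_fields(envelope_dict, ["market_size", "analyst_name"]))  # → ['analyst_name']
```

A pipeline test can assert that this list is empty for every step, which catches a renamed upstream field before any LLM call is made.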
6. Complete Reference Implementation — The ContextProtocol Class
The previous sections explored each layer independently — tool result sanitization, memory access gating, task state propagation, and the message bus. Each layer has a clear responsibility and a clean API. But in a real agent, these layers don't operate in isolation. A single prepare_context() call must coordinate all four layers: route and sanitize tool results, query memory with relevance gates, resolve task state with field declarations, assemble everything into a token-budgeted message list, and enforce priorities when the budget is exceeded.
This section presents ContextProtocol — a single unified class that composes all four layers into one entry point. You call protocol.prepare_context() before every LLM invocation, and it returns a sanitized, trimmed, budgeted message list that's safe to pass to any model API.
Here's the full implementation:
from typing import Any, Optional
from dataclasses import dataclass, field
from collections import defaultdict


class TokenBudget:
    """Track and enforce token limits on context assembly."""

    def __init__(self, max_tokens: int = 8000):
        self.max_tokens = max_tokens
        self._allocations: dict[str, int] = defaultdict(int)
        self._priorities: dict[str, int] = {}

    def register(self, category: str, tokens: int, priority: int = 0) -> bool:
        """Record a token allocation for a category.

        Priority scale: 0 = critical (system prompt, current user message),
        1 = high (tool results, task state), 2 = medium (memory context),
        3 = low (conversation history suffix).

        Always returns True: registration is recorded unconditionally and
        enforcement happens at flush().
        """
        self._allocations[category] += tokens
        self._priorities[category] = priority
        return True

    def flush(self, messages: list[dict]) -> list[dict]:
        """Enforce budget: if over limit, drop lowest-priority categories.

        Trimming is coarse: whole categories are dropped, lowest priority
        (highest number) first, until the estimate fits. Priority-0
        categories are never dropped, so the result can still exceed the
        budget when the critical messages alone are too large.

        Returns a new list with budget-enforced messages.
        """
        total = sum(self._allocations.values())
        if total <= self.max_tokens:
            return messages
        # Only categories with priority > 0 are eligible for trimming
        droppable = sorted(
            (cat for cat in self._allocations
             if self._priorities.get(cat, 0) > 0),
            key=lambda cat: self._priorities[cat],
            reverse=True  # lowest priority (highest number) first
        )
        overflow = total - self.max_tokens
        trim_targets = set()
        for cat in droppable:
            if overflow <= 0:
                break
            trim_targets.add(cat)
            overflow -= self._allocations[cat]
        return [m for m in messages
                if self._get_category(m) not in trim_targets]

    @staticmethod
    def _get_category(msg: dict) -> str:
        """Map a message back to the category name used in register()."""
        role = msg.get('role', '')
        content = msg.get('content') or ''
        if role == 'system':
            # Memory and task-state blocks are injected as system messages;
            # tell them apart by the prefix prepare_context() writes
            if isinstance(content, str):
                if content.startswith('[Memory context'):
                    return 'memory'
                if content.startswith('[Task state from upstream]'):
                    return 'task_state'
            return 'system'
        if role == 'tool':
            return 'tool_results'
        if role == 'assistant':
            return 'history'
        if role == 'user':
            return 'user_messages'
        return 'other'

    @property
    def utilization(self) -> float:
        total = sum(self._allocations.values())
        return total / self.max_tokens if self.max_tokens > 0 else 0.0


class ContextProtocol:
    """Unified context management for LLM-based agents.

    Composes four layers into a single interface:
      1. Message Bus: conversation turn structure
      2. Tool Context: ToolResultEnvelope (trimming + redaction + routing)
      3. Memory Context: MemoryContextGate (read/write gating + namespaces)
      4. Task Context: TaskStateEnvelope (artifact pointers + field declarations)

    Usage:
        protocol = ContextProtocol(max_context_tokens=8000)
        envelope = protocol.handle_tool_result(call_id, raw_tool_output)
        messages = protocol.prepare_context(
            user_message="Find market data for Q3",
            tool_results=[envelope],
            task_state=upstream_envelope
        )
        response = llm.chat(messages)
    """

    def __init__(self, max_context_tokens: int = 8000,
                 memory_backend: Any = None,
                 artifact_store: Any = None):
        self.token_budget = TokenBudget(max_tokens=max_context_tokens)
        self.memory_gate = MemoryContextGate()
        self._tool_results: dict[str, ToolResultEnvelope] = {}
        self._memory_backend = memory_backend
        self._artifact_store = artifact_store
        self._audit_trail: list[dict] = []

    def prepare_context(self, user_message: str,
                        tool_results: Optional[list[ToolResultEnvelope]] = None,
                        task_state: Optional[TaskStateEnvelope] = None,
                        system_prompt: str = "",
                        memory_query: Optional[str] = None) -> list[dict]:
        """Main entry point: produce sanitized, budgeted context for LLM call.

        Assembly order:
          1. System prompt (always first, highest priority)
          2. User message (current turn)
          3. Memory context (if query provided, gate-controlled)
          4. Task state (if envelope provided, summaries only)
          5. Tool results (sanitized, trimmed, routed)

        All components are token-budgeted. If budget is exceeded,
        lowest-priority components are trimmed. Token counts are rough
        estimates (whitespace-separated words), not tokenizer output.
        """
        messages = []
        self._audit_trail = []
        # Start each call with a fresh budget so allocations from earlier
        # calls do not accumulate into this call's utilization
        self.token_budget = TokenBudget(max_tokens=self.token_budget.max_tokens)

        # 1. System prompt: always present, highest priority
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
            self.token_budget.register(
                "system", len(system_prompt.split()), priority=0
            )

        # 2. User message: the current reasoning trigger
        if user_message:
            messages.append({"role": "user", "content": user_message})
            self.token_budget.register(
                "user_messages", len(user_message.split()), priority=0
            )

        # 3. Memory context: gate-controlled read
        if memory_query and self._memory_backend:
            namespace = self.memory_gate.resolve_namespace(
                user_id="default", session_id="current"
            )
            candidates = self._memory_backend.search(
                memory_query,
                namespace=self.memory_gate.get_namespace_prefix("default")
            )
            should_read, best_score, selected = self.memory_gate.should_read(
                memory_query, namespace, candidates
            )
            if should_read:
                memory_text = "\n".join([m.content for m in selected])
                messages.append({
                    "role": "system",
                    "content": f"[Memory context, relevance {best_score:.2f}]:\n{memory_text}"
                })
                self.token_budget.register(
                    "memory", len(memory_text.split()), priority=2
                )
                self._audit_trail.append({
                    'action': 'memory_read',
                    'chunks': len(selected),
                    'best_score': best_score
                })

        # 4. Task state: summaries only (field resolution on demand)
        if task_state:
            summaries = task_state.get_summaries()
            messages.append({
                "role": "system",
                "content": f"[Task state from upstream]:\n{summaries}"
            })
            self.token_budget.register(
                "task_state", len(summaries.split()), priority=1
            )
            self._audit_trail.append({
                'action': 'task_state_injected',
                'task_id': task_state.task_id,
                'artifact_count': len(task_state.artifacts)
            })

        # 5. Tool results: sanitization pipeline
        if tool_results:
            for envelope in tool_results:
                # Ensure redaction is applied
                if not envelope._redacted:
                    envelope.redact_secrets()
                chunk = envelope.to_context_chunk()
                messages.append(chunk)
                self.token_budget.register(
                    "tool_results",
                    len(chunk.get("content", "").split()),
                    priority=1
                )
                self._tool_results[envelope.tool_call_id] = envelope
                # Audit: track what was redacted
                if envelope._found_secrets:
                    self._audit_trail.append({
                        'action': 'secret_redacted',
                        'tool_call_id': envelope.tool_call_id,
                        'secrets_found': len(envelope._found_secrets),
                        'labels': [s['label'] for s in envelope._found_secrets]
                    })

        # 6. Enforce token budget
        messages = self.token_budget.flush(messages)
        self._audit_trail.append({
            'action': 'context_prepared',
            'message_count': len(messages),
            'budget_utilization': self.token_budget.utilization
        })
        return messages

    def handle_tool_result(self, tool_call_id: str,
                           raw_result: Any,
                           max_tokens: int = 2000) -> ToolResultEnvelope:
        """Create a ToolResultEnvelope from a raw tool result.

        Applies default trimming + redaction pipeline.
        """
        envelope = ToolResultEnvelope(
            tool_call_id=tool_call_id,
            raw_result=raw_result,
            max_tokens=max_tokens
        )
        envelope.trim(strategy="first_n").redact_secrets()
        self._tool_results[tool_call_id] = envelope
        return envelope

    def query_memory(self, query_text: str,
                     namespace: Optional[str] = None) -> list[dict]:
        """Query memory through the gate. Returns filtered results."""
        if not self._memory_backend:
            return []
        prefix = namespace or self.memory_gate.get_namespace_prefix("default")
        candidates = self._memory_backend.search(query_text, namespace=prefix)
        _, _, selected = self.memory_gate.should_read(
            query_text, prefix, candidates
        )
        return [{'content': m.content, 'score': m.relevance_score,
                 'namespace': m.namespace} for m in selected]

    def persist_memory(self, content: str, namespace: str) -> bool:
        """Persist content to memory if gate allows it."""
        if not self._memory_backend:
            return False
        if self.memory_gate.should_write(content, namespace):
            self._memory_backend.save(namespace, content)
            return True
        return False

    def get_task_state(self, task_id: str,
                       requested_fields: list[str]) -> dict:
        """Resolve task state fields on demand."""
        # In practice, retrieve the TaskStateEnvelope from store
        # and call declare_fields(). Simplified here.
        return {}

    @property
    def audit_trail(self) -> list[dict]:
        return self._audit_trail

    @property
    def stats(self) -> dict:
        return {
            'budget_utilization': self.token_budget.utilization,
            'max_tokens': self.token_budget.max_tokens,
            'memory': self.memory_gate.stats,
            'tool_results_cached': len(self._tool_results)
        }

Now let's see the full protocol in action: a two-step agent workflow. The values shown in the output comments are illustrative.
# ═══════════════════════════════════════════════════════════
# End-to-End Demo: 2-step agent workflow with ContextProtocol
# ═══════════════════════════════════════════════════════════

# Setup: mock artifact store and memory backend
class InMemoryStore:
    def __init__(self):
        self._data = {}

    def read(self, key):
        return self._data.get(key)

    def save(self, key, value):
        self._data[key] = value

    def search(self, query, namespace=""):
        # Return mock results
        return []


artifact_store = InMemoryStore()
memory_backend = InMemoryStore()
protocol = ContextProtocol(
    max_context_tokens=8000,
    memory_backend=memory_backend,
    artifact_store=artifact_store
)

# ═══ Step 1: Search for cloud computing market data ═══
# Simulated raw search result (15KB of JSON in the real case)
raw_search_result = {
    "query": "2026 cloud computing market size",
    "results": [
        {"title": "Gartner Forecast 2026", "url": "https://...",
         "snippet": "The global cloud computing market is projected to reach $832B...",
         "market_size": "$832B", "cagr": "18.4%", "region": "Global",
         "metadata": {"api_key_used": "sk-abc123def456ghi789jkl"},
         },
        # ... 39 more results omitted for brevity
    ],
    "total_results": 40,
    "search_time_ms": 235,
}

# Wrap result in envelope: trim to 500 tokens, redact secrets
envelope = protocol.handle_tool_result(
    tool_call_id="call_search_01",
    raw_result=raw_search_result,
    max_tokens=500
)
print(f"Redacted secrets: {len(envelope._found_secrets)}")
# → e.g. "Redacted secrets: 1" if the envelope's patterns flag the sk-... key

# Create task state envelope for downstream.
# Store the list of result items, not the wrapper dict: declare_fields()
# only looks at top-level keys, or at keys of items in a top-level list.
# NOTE: this payload is raw and unredacted; redact before storing if
# downstream steps may request arbitrary fields such as "metadata".
task_state = TaskStateEnvelope(task_id="market_research_01")
artifact_store.save(
    "market_research_01:search_results",
    raw_search_result["results"]
)
task_state.add_artifact(
    artifact_type="search_results",
    pointer="market_research_01:search_results",
    summary="40 cloud market analyst reports (Gartner, IDC, Forbes)",
    size_bytes=15000,
    schema_keys=["title", "url", "snippet", "market_size", "cagr", "region"]
)

# ── Step 1 LLM call ──
messages = protocol.prepare_context(
    user_message="What is the 2026 cloud computing market size?",
    tool_results=[envelope],
    system_prompt="You are a market research analyst."
)
print(f"Step 1 context: {len(messages)} messages, "
      f"budget {protocol.token_budget.utilization:.1%}")
# → Step 1 context: 3 messages, budget 12.3%  (utilization figure is illustrative)

# ═══ Step 2: Extract specific fields using declared fields ═══
# Step 2 needs market_size and cagr, not the 15KB JSON
# Set up artifact_store so declare_fields() can resolve
task_state._artifact_store = artifact_store
requested = task_state.declare_fields(["market_size", "cagr"])
print(f"Resolved fields: {requested}")
# → Resolved fields: {'market_size': '$832B', 'cagr': '18.4%'}

# Build context for Step 2 with the resolved task state
task_context = "\n".join(f"{k}: {v}" for k, v in requested.items())

# ── Step 2 LLM call ──
messages = protocol.prepare_context(
    user_message="Compute the projected 5-year market size at 18.4% CAGR.",
    task_state=task_state,
    system_prompt=(
        f"You are a financial analyst.\n\n"
        f"Resolved market data:\n{task_context}"
    )
)
print(f"Step 2 context: {len(messages)} messages, "
      f"budget {protocol.token_budget.utilization:.1%}")
# → Step 2 context: 3 messages, budget 8.7%  (utilization figure is illustrative)

# Persist the final result
protocol.persist_memory(
    content="Cloud market projected at $1.94T by 2031 (18.4% CAGR from $832B in 2026).",
    namespace="user_default:session_current:task_market_research"
)

# ── Final stats (illustrative values) ──
print(protocol.stats)
# → {
#     'budget_utilization': 0.087,
#     'max_tokens': 8000,
#     'memory': {'reads': 0, 'writes': 1, ...},
#     'tool_results_cached': 1
#   }

# The audit trail is reset on every prepare_context() call, so at this
# point it holds only the Step 2 events. Read it after Step 1 as well
# if you need the secret_redacted event for the search result.
print("Audit trail:")
for entry in protocol.audit_trail:
    print(f"  {entry}")
# → {'action': 'task_state_injected', 'task_id': 'market_research_01',
#    'artifact_count': 1}
# → {'action': 'context_prepared', 'message_count': 3,
#    'budget_utilization': 0.087}
Three architectural decisions to highlight:
Single entry point: prepare_context() is the only function you call before every LLM invocation. It accepts optional parameters — tool results, task state, memory query, system prompt — and handles each one through the correct layer. If you don't provide tool results, it skips Layer 2. If you don't provide task state, it skips Layer 4. The interface is unified, but the layers are independently optional. You can use ContextProtocol for a simple single-turn agent (no memory, no tasks) just as easily as a complex multi-step pipeline.
Audit trail built-in: Every significant context decision (secret redactions, memory reads, task state injections, final budget utilization) is recorded in _audit_trail. This is not optional debugging output; it's structured data that flows directly into your observability pipeline. Check protocol.audit_trail after each prepare_context() call and before the next one, because the trail is reset at the start of every call. If any entry has action: "secret_redacted", you have a security event that needs logging.
Backend-agnostic: The protocol doesn't import any LLM SDK, vector database client, or framework. memory_backend and artifact_store are injected as plain objects; as long as they implement search() / save() and read() / save() respectively, the protocol works. Swap PostgreSQL for Pinecone or S3 for the local filesystem, and the protocol doesn't care. This is intentional: the context protocol is a data-management layer, not a storage layer.
Tip: wrap prepare_context() in a thin adapter that reads from your actual memory/artifact backends. The protocol class stays pure; the adapter handles backend specifics. This separation makes the protocol testable with mock backends.
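As one illustration of such an adapter, the sketch below backs the read() / save() contract with JSON files on the local filesystem. `FileArtifactStore` is a hypothetical class written for this example, and the key-to-filename mapping is an assumption that does not guard against two keys collapsing to the same file name.

```python
import json
import tempfile
from pathlib import Path


class FileArtifactStore:
    """Hypothetical artifact store: one JSON file per key under a root directory."""

    def __init__(self, root: str):
        self._root = Path(root)
        self._root.mkdir(parents=True, exist_ok=True)

    def _path(self, key: str) -> Path:
        # Keys such as "task:search_results" contain ':'; map to a safe file name.
        safe = "".join(c if c.isalnum() or c in "-_." else "_" for c in key)
        return self._root / f"{safe}.json"

    def save(self, key: str, value) -> None:
        self._path(key).write_text(json.dumps(value), encoding="utf-8")

    def read(self, key: str):
        path = self._path(key)
        if not path.exists():
            return None  # same "missing key" behaviour as dict.get()
        return json.loads(path.read_text(encoding="utf-8"))


with tempfile.TemporaryDirectory() as tmp:
    store = FileArtifactStore(tmp)
    store.save("market_research_01:search_results",
               [{"market_size": "$832B", "cagr": "18.4%"}])
    loaded = store.read("market_research_01:search_results")
    missing = store.read("unknown:key")
    print(loaded[0]["cagr"])  # → 18.4%
    print(missing)            # → None
```

Because the class exposes the same two methods as the in-memory mock, it can be swapped in wherever an artifact store is injected.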
7. Testing and Debugging Context Flow
Context is the most invisible part of an agent system. When you run an agent and get an answer, you see the final output. You don't see what the prompt actually contained at step 3. You don't see whether tool results were correctly routed or swapped. You don't see that a secret escaped redaction because the regex pattern missed a format you hadn't encountered yet. The only way you discover these failures is when something breaks visibly — a token shows up in Slack, a report cites the wrong source, the hallucination rate spikes inexplicably.
This is unacceptable for production systems. An agent's context before every LLM call IS the agent's world model — it determines what the agent sees, knows, and can do. If you can't test your context protocol, you can't trust your agent. If you can't trace what was in context at the moment of a failure, you can't debug it. An untested context protocol is indistinguishable from an unobservable failure mode.
Here's a testing and debugging toolkit designed specifically for context protocol validation:
import re
import json
import os
from datetime import datetime
from typing import Optional


class ContextDebugKit:
    """Testing and debugging utilities for agent context flows.

    Three core capabilities:
      1. assert_no_secrets: fail the build if secrets leak into context
      2. dump_context_snapshot: save context for post-mortem debugging
      3. trace_context_routing: record every routing/filtering decision
    """

    DEFAULT_SECRET_PATTERNS = [
        # Generic patterns
        (r'(?:api[_-]?key|apikey|api_token|access_token|secret)'
         r'["\s:=]+["\x27]?([A-Za-z0-9_\-\.]{20,})', 'API_KEY'),
        (r'(?:password|passwd|pwd)["\s:=]+["\x27]?([^"\x27&\s]{4,})',
         'PASSWORD'),
        (r'(?:token|auth)["\s:=]+["\x27]?([A-Za-z0-9_\-\.]{16,})',
         'TOKEN'),
        (r'(?:bearer|basic)\s+([A-Za-z0-9_\-\.=]{16,})', 'AUTH_HEADER'),
        # Provider-specific
        (r'ghp_[A-Za-z0-9]{36}', 'GITHUB_TOKEN'),
        (r'glpat-[A-Za-z0-9\-]{20,}', 'GITLAB_TOKEN'),
        (r'sk-[A-Za-z0-9]{32,}', 'OPENAI_KEY'),
        (r'AIza[A-Za-z0-9\-_]{35}', 'GOOGLE_API_KEY'),
        # PII
        (r'\b\d{3}-\d{2}-\d{4}\b', 'SSN'),
        (r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', 'CREDIT_CARD'),
    ]

    @staticmethod
    def assert_no_secrets(context: list[dict],
                          patterns: Optional[list] = None) -> bool:
        """Scan all context messages for API keys, tokens, passwords.

        Iterates over every message in the context list and applies
        all secret detection patterns. If any pattern matches in any
        message, the function logs the finding and returns False.

        Intended for CI/CD integration: call this in your test suite
        after every prepare_context(). If it returns False, fail the
        build.

        Args:
            context: List of message dicts (the output of prepare_context)
            patterns: Optional custom pattern list. Uses defaults if None.

        Returns:
            True if no secrets detected, False if any secret found.
        """
        if patterns is None:
            patterns = ContextDebugKit.DEFAULT_SECRET_PATTERNS
        clean = True
        for i, msg in enumerate(context):
            content = msg.get('content', '')
            if not content:
                continue
            if not isinstance(content, str):
                # Structured content (lists/dicts) is scanned in serialized form
                content = json.dumps(content, default=str)
            for pattern, label in patterns:
                matches = re.findall(pattern, content, re.IGNORECASE)
                if matches:
                    clean = False
                    for m in matches:
                        secret = m if isinstance(m, str) else m[0]
                        # Print only a short prefix so the report itself
                        # does not leak most of the secret into CI logs
                        print(
                            f"❌ SECRET LEAK in message[{i}] "
                            f"(role={msg.get('role')}): "
                            f"[{label}] preview={secret[:4]}..."
                        )
        if clean:
            print(f"✅ assert_no_secrets: {len(context)} messages, 0 secrets found")
        return clean

    @staticmethod
    def dump_context_snapshot(context: list[dict], step: int,
                              output_dir: str = "./context_snapshots"):
        """Save complete context snapshot for post-mortem debugging.

        After every reasoning step, call this to save the exact context
        that was passed to the LLM. When something goes wrong, you can
        replay the context and see exactly what the model saw.

        Creates files named: step_{step:04d}_{timestamp}.json
        """
        os.makedirs(output_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        filename = f"step_{step:04d}_{timestamp}.json"
        filepath = os.path.join(output_dir, filename)
        # str(... or '') tolerates messages whose content is None or structured
        texts = [str(m.get('content') or '') for m in context]
        snapshot = {
            'step': step,
            'timestamp': timestamp,
            'message_count': len(context),
            'total_chars': sum(len(t) for t in texts),
            'total_tokens_est': sum(len(t.split()) for t in texts),
            'messages': context
        }
        # Explicit UTF-8: ensure_ascii=False can emit non-ASCII characters
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(snapshot, f, indent=2, ensure_ascii=False, default=str)
        print(f"📸 Context snapshot saved: {filepath} "
              f"({snapshot['total_tokens_est']} tokens est.)")

    @staticmethod
    def trace_context_routing(context: list[dict],
                              audit_trail: list[dict],
                              step: int) -> dict:
        """Record every routing decision and produce a trace report.

        Combines the context structure with the audit trail to produce
        a comprehensive trace of what happened during context assembly:
          - Which tool results were included
          - What was redacted
          - Memory read decisions
          - Budget utilization (from the context_prepared event)

        Returns a dict suitable for logging/metric ingestion.
        """
        trace = {
            'step': step,
            'timestamp': datetime.now().isoformat(),
            'message_roles': [m.get('role') for m in context],
            'message_count': len(context),
            'tool_messages': sum(
                1 for m in context if m.get('role') == 'tool'
            ),
            'system_messages': sum(
                1 for m in context if m.get('role') == 'system'
            ),
            'audit_events': audit_trail,
            'active_decisions': {
                'redactions': [
                    e for e in audit_trail
                    if e.get('action') == 'secret_redacted'
                ],
                'memory_reads': [
                    e for e in audit_trail
                    if e.get('action') == 'memory_read'
                ],
                'task_state_injections': [
                    e for e in audit_trail
                    if e.get('action') == 'task_state_injected'
                ],
                'budget_events': [
                    e for e in audit_trail
                    if e.get('action') == 'context_prepared'
                ],
            }
        }
        return trace

Three testing patterns that integrate directly into your development workflow:
Pattern 1 — CI/CD Secret Leak Detection: Add assert_no_secrets to your test suite. After every prepare_context() call in your integration tests, assert that the context is clean. If any test data contains an API key pattern that slips through redaction, the build fails before the code reaches production. This catches regressions: you update a tool that starts returning credentials in a new format, the pattern misses it, the test fails immediately.
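# In your test suite:
import json


def test_context_no_secret_leakage():
    protocol = ContextProtocol(max_context_tokens=8000)
    # Obviously fake key, shaped so DEFAULT_SECRET_PATTERNS can detect it
    raw = {"response": "api_key: sk-" + "A1b2C3d4" * 5}
    # Sanity check: the detector must flag the raw payload, otherwise the
    # final assertion would pass even if redaction did nothing
    assert not ContextDebugKit.assert_no_secrets(
        [{"role": "tool", "content": json.dumps(raw)}]
    )
    envelope = protocol.handle_tool_result("call_test", raw)
    messages = protocol.prepare_context(
        "test query", tool_results=[envelope]
    )
    assert ContextDebugKit.assert_no_secrets(messages), \
        "SECRET LEAK DETECTED: build must fail"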
Pattern 2 — Snapshot-Based Debugging: In development, call dump_context_snapshot after every reasoning step. When the agent produces an unexpected output, open the snapshot files and inspect the exact context the model saw. Was a critical tool result trimmed too aggressively? Was memory context injected from the wrong namespace? Did the token budget drop an important message? The snapshot answers these questions definitively, without guesswork.
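When two consecutive snapshots are on disk, a small diff often answers "what changed between steps" faster than reading both files. The sketch below compares two already-loaded snapshot dicts; `diff_snapshots` is a hypothetical helper, and the sample messages and token estimates are made up for illustration.

```python
def diff_snapshots(before: dict, after: dict) -> dict:
    """Compare two snapshot dicts of the shape written by dump_context_snapshot."""
    old = [(m.get("role"), m.get("content")) for m in before.get("messages", [])]
    new = [(m.get("role"), m.get("content")) for m in after.get("messages", [])]
    return {
        "added": [m for m in new if m not in old],
        "removed": [m for m in old if m not in new],
        "token_delta": after.get("total_tokens_est", 0)
        - before.get("total_tokens_est", 0),
    }


step_1 = {"total_tokens_est": 9, "messages": [
    {"role": "system", "content": "You are a market research analyst."},
    {"role": "user", "content": "Cloud market size?"},
]}
step_2 = {"total_tokens_est": 11, "messages": [
    {"role": "system", "content": "You are a market research analyst."},
    {"role": "user", "content": "Project it forward five years."},
]}

report = diff_snapshots(step_1, step_2)
print(report["added"])        # → [('user', 'Project it forward five years.')]
print(report["removed"])      # → [('user', 'Cloud market size?')]
print(report["token_delta"])  # → 2
```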
Pattern 3 — Observability Integration: trace_context_routing produces structured data that feeds into your observability stack. Emit the trace as a metric event after every context preparation. Monitor: (a) secret redaction hit rate — if this spikes, a new tool or data source is exposing credentials; (b) token budget utilization — if this approaches 1.0, your trim strategies need adjustment; (c) context preparation latency — if this grows, your artifact store or memory backend is the bottleneck.
The observability integration isn't theoretical. Here's how these metrics connect to production monitoring:
# Integration with logging/metrics pipeline
import logging


def on_context_prepared(protocol: ContextProtocol, step: int,
                        messages: list[dict]):
    """Called after every prepare_context() in production."""
    trace = ContextDebugKit.trace_context_routing(
        messages, protocol.audit_trail, step
    )
    # Metric: secret redaction events → alert if > 0 in prod
    if trace['active_decisions']['redactions']:
        logging.warning(
            f"Secrets redacted in step {step}: "
            f"{trace['active_decisions']['redactions']}"
        )
        # Increment a counter metric for dashboards
        # metrics.increment('context.secrets_redacted')
    # Metric: budget utilization → alert if > 0.90
    utilization = protocol.token_budget.utilization
    if utilization > 0.90:
        logging.warning(
            f"Token budget near capacity in step {step}: "
            f"{utilization:.1%}"
        )
    # metrics.gauge('context.budget_utilization', utilization)
    # Optional: save snapshot for difficult-to-reproduce failures
    # ContextDebugKit.dump_context_snapshot(messages, step)
The key insight: context preparation is the highest-leverage place to instrument your agent. Every other component — tool execution, LLM invocation, response parsing — happens in the open. But the context that enters the LLM is a black box. Opening that black box with snapshots, assertions, and traces transforms the most opaque part of your agent into the most observable one.
FAQ
How does this context protocol relate to MCP?
MCP (Model Context Protocol) is an external connectivity protocol — it defines how agents discover and call external tools and data sources. This context protocol is an internal management protocol — it defines how agents pass state between their own components (tools, memory, tasks). They're complementary: MCP handles "what to connect to," the context protocol handles "what to pass and how to pass it." In a production agent, you'd use MCP to connect to a PostgreSQL database and this context protocol to ensure the query results are trimmed, redacted, and correctly routed before reaching the LLM.
Why not just dump everything into the prompt?
Three costs: ① Token cost: when every LLM call carries the full history, prompt size grows with each step, so a 10-step workflow can easily consume more than 1M prompt tokens over the run. ② Security risk: tool responses containing API keys, tokens, and passwords are exposed directly to the LLM, which can echo them anywhere. ③ Performance degradation: excessively long contexts reduce LLM attention quality on the most relevant information (the "lost in the middle" problem). The context protocol addresses all three: trimming reduces token count, redaction strips secrets, and structured routing keeps the model focused on what matters.
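To make the token-cost point concrete, here is a rough back-of-the-envelope sketch. It assumes each step adds about 20,000 tokens of tool output and model response (an illustrative figure, not a measurement), and compares re-sending the full history against keeping only the last few steps. The function names and the `window` parameter are hypothetical and not part of the protocol's API.

```python
def cumulative_prompt_tokens(steps: int, tokens_per_step: int,
                             system_tokens: int = 0) -> int:
    """Total prompt tokens sent when every call re-sends the full history."""
    total = 0
    for step in range(1, steps + 1):
        # Call number `step` carries the system prompt plus `step` steps of history.
        total += system_tokens + step * tokens_per_step
    return total


def trimmed_prompt_tokens(steps: int, tokens_per_step: int,
                          window: int, system_tokens: int = 0) -> int:
    """Total prompt tokens sent when only the last `window` steps are kept."""
    total = 0
    for step in range(1, steps + 1):
        total += system_tokens + min(step, window) * tokens_per_step
    return total


# Assumed workload: 10 steps, ~20,000 tokens added per step.
full = cumulative_prompt_tokens(steps=10, tokens_per_step=20_000)
trimmed = trimmed_prompt_tokens(steps=10, tokens_per_step=20_000, window=3)
print(full, trimmed)  # 1100000 540000
```

Under these assumptions the full-history run sends 1.1M prompt tokens, while a three-step window sends roughly half of that. Real savings depend on how aggressively results are trimmed, not only on how many steps are kept.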
Does this protocol depend on a specific LLM or framework?
No. ContextProtocol is a pure Python class that imports no LLM SDK or agent framework. It operates purely on data structures (dict/list) and is compatible with any LLM API (OpenAI, Anthropic, DeepSeek) and any agent framework (LangChain, CrewAI, raw implementation). The message format matches the OpenAI-compatible chat completion standard, which has become the de facto industry format. If your framework uses a different format, add a thin adapter — the protocol logic remains unchanged.
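As an illustration of what a thin adapter might look like, the sketch below converts OpenAI-style chat messages into the shape used by Anthropic's Messages API, as I understand it: the system prompt moves to a separate top-level value, and tool calls and tool results become `tool_use` and `tool_result` content blocks. The function name `to_anthropic_format` is hypothetical, and the sketch is deliberately simplified (for example, it does not merge consecutive tool results into a single user turn), so check it against the provider's current documentation before relying on it.

```python
import json


def to_anthropic_format(messages: list[dict]) -> tuple[str, list[dict]]:
    """Convert OpenAI-style chat messages to (system_prompt, messages)."""
    system_parts: list[str] = []
    converted: list[dict] = []
    for msg in messages:
        role = msg["role"]
        if role == "system":
            # System text is passed separately, not inside the message list.
            system_parts.append(msg["content"])
        elif role == "tool":
            # Tool results travel as a content block inside a user turn.
            converted.append({
                "role": "user",
                "content": [{
                    "type": "tool_result",
                    "tool_use_id": msg["tool_call_id"],
                    "content": msg["content"],
                }],
            })
        elif role == "assistant" and msg.get("tool_calls"):
            blocks = []
            if msg.get("content"):
                blocks.append({"type": "text", "text": msg["content"]})
            for call in msg["tool_calls"]:
                blocks.append({
                    "type": "tool_use",
                    "id": call["id"],
                    "name": call["function"]["name"],
                    # OpenAI-style arguments arrive as a JSON string.
                    "input": json.loads(call["function"]["arguments"]),
                })
            converted.append({"role": "assistant", "content": blocks})
        else:
            converted.append({"role": role, "content": msg["content"]})
    return "\n\n".join(system_parts), converted


openai_style = [
    {"role": "system", "content": "You are a code reviewer."},
    {"role": "user", "content": "List my repos."},
    {"role": "assistant", "content": None, "tool_calls": [{
        "id": "call_1", "type": "function",
        "function": {"name": "list_repos", "arguments": '{"org": "acme"}'},
    }]},
    {"role": "tool", "tool_call_id": "call_1", "content": '["api", "web"]'},
]
system_prompt, anthropic_messages = to_anthropic_format(openai_style)
```

Because the adapter runs after context preparation, trimming and redaction decisions stay in one place regardless of which provider receives the final payload.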
What are the best practices for namespace design?
A three-tier namespace of the form {user_id}:{session_id}:{task_id} works well. User-level memories (preferences, history) live in the {user_id} namespace, session-level memories (current conversation context) in {user_id}:{session_id}, and task-level memories (single-task artifacts) in {user_id}:{session_id}:{task_id}. Prefix queries enable efficient scoped retrieval: user_42:* returns all of Bob's memories; user_42:session_abc:* returns only the current conversation. This three-tier design prevents cross-task contamination while allowing cross-session preference persistence.
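A minimal sketch of the {user_id}:{session_id}:{task_id} scheme follows, using a plain dict as a stand-in for the memory store. The helpers `build_namespace` and `query_prefix` are hypothetical names introduced for illustration; they are not part of MemoryContextGate.

```python
from typing import Optional


def build_namespace(user_id: str, session_id: Optional[str] = None,
                    task_id: Optional[str] = None) -> str:
    """Join the tiers that are present into a colon-separated namespace."""
    if task_id is not None and session_id is None:
        raise ValueError("task_id requires session_id")
    parts = [user_id]
    if session_id is not None:
        parts.append(session_id)
    if task_id is not None:
        parts.append(task_id)
    return ":".join(parts)


def query_prefix(store: dict[str, list[str]], prefix: str) -> dict[str, list[str]]:
    """Return entries whose namespace equals `prefix` or sits beneath it."""
    return {
        ns: items for ns, items in store.items()
        # Match on the ":" boundary so "user_42" never matches "user_421".
        if ns == prefix or ns.startswith(prefix + ":")
    }


store = {
    "user_42": ["prefers concise answers"],
    "user_42:session_abc": ["discussing a repo scan"],
    "user_42:session_abc:task_1": ["scan artifact summary"],
    "user_421:session_x": ["belongs to a different user"],
}

all_user = query_prefix(store, build_namespace("user_42"))
this_session = query_prefix(store, build_namespace("user_42", "session_abc"))
print(sorted(all_user))      # three namespaces, none from user_421
print(sorted(this_session))  # the session and its task
```

Matching on the separator rather than on a bare string prefix is the important detail: a naive `startswith("user_42")` would leak another user's memories into the result.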
How do I determine the optimal memory relevance threshold?
0.75 is a good starting point (cosine similarity on embeddings). With a threshold above 0.85, only highly relevant memories are injected, and useful context may be missed. Below 0.6, too many irrelevant memories are injected, wasting tokens. A/B test in the 0.70-0.80 range for your specific use case, measuring (1) answer quality at each threshold and (2) token utilization per read cycle. The sweet spot balances information gain against cost. For keyword-based stores (BM25), raw scores are unbounded, so normalize them to the 0-1 range first; thresholds may then need to be lower (0.55-0.65) because the score distribution differs.
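The effect of the threshold can be seen in a small sketch. It uses toy two-dimensional vectors in place of real embeddings, and `select_memories` is a hypothetical helper, not the gate's actual read path.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is zero)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def select_memories(query_vec: list[float],
                    memories: list[tuple[str, list[float]]],
                    threshold: float = 0.75) -> list[tuple[float, str]]:
    """Keep memories scoring at or above `threshold`, best match first."""
    scored = [(cosine_similarity(query_vec, vec), text) for text, vec in memories]
    kept = [(score, text) for score, text in scored if score >= threshold]
    return sorted(kept, reverse=True)


# Toy vectors standing in for real embeddings.
query = [1.0, 0.0]
memories = [
    ("prefers Python", [0.9, 0.1]),  # similarity ~0.99
    ("uses pytest", [0.7, 0.7]),     # similarity ~0.71
    ("likes hiking", [0.1, 0.9]),    # similarity ~0.11
]

strict = [text for _, text in select_memories(query, memories, threshold=0.75)]
loose = [text for _, text in select_memories(query, memories, threshold=0.60)]
print(strict)  # ['prefers Python']
print(loose)   # ['prefers Python', 'uses pytest']
```

Lowering the threshold from 0.75 to 0.60 admits the borderline memory. That is the trade-off an A/B test would measure: one more piece of context against the extra tokens it costs.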
Can I use TaskStateEnvelope without an external artifact store?
Yes. If you don't have an external store available, keep artifacts in an in-memory dictionary and use the dictionary key as the pointer value. The envelope's _artifact_store is injected and can be any object with a read() method, including a simple dict wrapper. The architecture doesn't force external storage; it enables it when your artifacts are too large to keep in memory.
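A dict wrapper of this kind might look like the sketch below. The class name `InMemoryArtifactStore`, the `write()` method, and the `mem://` pointer format are assumptions made for illustration. The only requirement stated above is a read() method, and its exact signature here (a single pointer argument) is also assumed.

```python
from typing import Any


class InMemoryArtifactStore:
    """Dict-backed artifact store: pointers are keys into a private dict."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._counter = 0

    def write(self, artifact: Any) -> str:
        """Store an artifact and return the pointer to pass between tasks."""
        self._counter += 1
        pointer = f"mem://artifact/{self._counter}"  # hypothetical pointer scheme
        self._data[pointer] = artifact
        return pointer

    def read(self, pointer: str) -> Any:
        """Resolve a pointer back to the stored artifact."""
        try:
            return self._data[pointer]
        except KeyError:
            raise KeyError(f"Unknown artifact pointer: {pointer}") from None


store = InMemoryArtifactStore()
pointer = store.write({"findings": ["hardcoded token in config.py"]})

# Only the short pointer string needs to travel in the task state;
# the receiving task resolves it when it actually needs the data.
artifact = store.read(pointer)
```

Keeping the pointer indirection even for in-memory data means a later switch to external storage only requires swapping the store object, not changing what the envelope carries.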
Next Steps
This is Article 1 in the Agent Communication and Protocols series. Continue reading in order:
- Agent Tool Design — Tool interfaces, schema definitions, and error handling — the foundation for the tool context layer in this protocol
- MCP Protocol Primer — MCP is the standard for external agent connectivity, complementing this article's internal context protocol
- Agent Observability — The context protocol defines "what to pass"; observability defines "how to see it" — both are needed for debugging production agents
- Agent Audit Log Design — Every context protocol decision (why trimmed, why filtered) should be recorded in audit logs
- MCP Protocol Production Guide — Production MCP patterns that work alongside the context protocol
- Agent Error Recovery — Context state is what needs to be recovered after agent errors
If you're new to agent engineering, start with What Is an AI Agent.