Agent Memory System Design: Short-Term Memory, Long-Term Memory, and Retrieval Boundaries
⚡ 30-Second Takeaway
- Core Problem: "Just add a vector DB" doesn't give your agent memory. Production agents forget mid-task context, accumulate memory pollution across sessions, and mix up multi-user data — you need a systems-design approach.
- The Solution: L0-L3 four-layer memory architecture — L0 working memory (task brain) → L1 session memory → L2 persistent memory → L3 external retrieval. Each layer has defined storage, lifecycle, and retrieval strategy.
- Key Implementation: MemoryManager orchestrator + retrieval boundaries (push/pull/hybrid) + memory hygiene (dedup/contradiction/PII scan) + multi-tenant scope isolation. 7 complete Python code examples.
- What You'll Walk Away With: A production-grade memory system design for your agent: what to remember, for how long, how to retrieve it, how to prevent pollution, and how to isolate multi-user data.
1. Why "Just Add a Vector DB" Isn't a Memory System
A customer-service agent goes to production. The team wires up a vector database — after each conversation, user preferences get embedded and stored. On the next query, the agent retrieves relevant preferences, injects them into the prompt. "The agent has memory." Everyone is satisfied.
Three weeks later, a user complaint lands: the agent keeps recommending a product the user explicitly rejected a month ago. Investigation reveals the problem: the user did say "I don't like this brand" a month ago, and the agent faithfully stored it. But a week later, the user also said "not considering this category right now" — two preferences stacked on top of each other. Vector search returned the first one (it was shorter, cosine similarity was higher), so the agent saw "I don't like this brand" instead of "not considering this category." Worse: in week three, the user updated their preference — "X brand's new line looks great" — but this new information was just appended as a fresh record, not replacing the old ones. The agent was now randomly retrieving from three contradictory preferences.
This is the fundamental reason a vector database ≠ a memory system: a vector database is a storage engine. A memory system is a management layer. The former handles "store and search." The latter handles "when to write, what to write, when to update, when to evict, and whether retrieved results are trustworthy."
Three Naive Memory Failure Modes
In production, simply "dumping things into a vector DB" triggers three classes of failure:
- Forgetting: The L0 context window overflows, and the agent loses mid-task state. Imagine an agent executing a 12-step database migration. By step 8, the LLM's context window is saturated with the output from steps 1–7. The critical constraint from step 1 ("do not modify the users table") has been pushed out of the window. The agent drops the users table at step 9. This isn't poor memory; it's the absence of structured working memory. Critical constraints should be pinned in independent slots, immune to context-window scrolling.
- Pollution: Old, wrong, and contradictory memories never get cleaned. An agent runs continuously for three months, accumulating thousands of "user preferences": 40% outdated, 15% mutually contradictory, 5% from test users with non-real data. Every retrieval is a gamble: newest or oldest? Most relevant or most similar? More dangerous still, the LLM trusts vector search results by default and won't question whether a retrieved preference might be stale.
- Conflation: All users' data sits in the same bucket. A SaaS company runs a single agent for multiple customer-service tenants. User 1 says "I love dark mode." User 2 says "I hate dark mode." Both preferences live in the same vector database, ranked by similarity. Which one gets returned depends on which is shorter and more "query-like." This isn't a bug — it's a design defect: no namespace isolation.
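The pollution and conflation failures can be reproduced in a few lines. The sketch below is illustrative only: `AppendOnlyStore` and `ScopedStore` are hypothetical names, and plain Python containers stand in for a real vector database.

```python
from collections import defaultdict


class AppendOnlyStore:
    """Naive store: one shared bucket, every write appended (hypothetical)."""

    def __init__(self):
        self.records = []

    def write(self, user_id, key, value):
        self.records.append((user_id, key, value))

    def read(self, key):
        # No user scoping: returns every user's value for this key.
        return [value for _, k, value in self.records if k == key]


class ScopedStore:
    """Store partitioned by user_id, holding one current value per key."""

    def __init__(self):
        self.by_user = defaultdict(dict)

    def write(self, user_id, key, value):
        # Upsert: a newer statement replaces the older one for the same key.
        self.by_user[user_id][key] = value

    def read(self, user_id, key):
        return self.by_user[user_id].get(key)


naive, scoped = AppendOnlyStore(), ScopedStore()
for store in (naive, scoped):
    store.write("user-1", "dark_mode", "love")
    store.write("user-2", "dark_mode", "hate")
    store.write("user-1", "dark_mode", "neutral")  # user-1 changes their mind

naive_view = naive.read("dark_mode")             # three stacked, mixed-user values
scoped_view = scoped.read("user-1", "dark_mode")  # one current value for user-1
print(naive_view)
print(scoped_view)
```

The naive store hands the caller all three values with no way to tell which user or which version is authoritative; the scoped store answers with a single current value per user.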
What We Need: A Four-Layer Architecture Preview
A production-grade agent memory system requires four layers, each solving a distinct problem:
L0 Working Memory → "Now"
    Structured slots inside the context window
    Lifecycle: reset each reasoning turn
L1 Session Memory → "This conversation"
    Redis / in-memory dictionary
    Lifecycle: duration of the session
L2 Persistent Memory → "Until evicted"
    SQLite / PostgreSQL + Vector DB
    Lifecycle: until explicitly evicted
L3 External Retrieval → "The outside world"
    RAG pipeline (docs, APIs, Web)
    Lifecycle: stateless fetch
This article decomposes these four layers from an architectural standpoint: what each layer stores, how it stores it, what its lifecycle is, how to retrieve from it, and how to keep it hygienic. This is not a vector-database tutorial — it's an architecture-level solution to the production problem of agent memory degrading, polluting, and conflating over time.
For the storage implementation details (vector DB selection, embedding model comparison), see Agent Memory Systems. For framework-agnostic design principles, see Model-Agnostic Agent Design.
2. Four-Layer Memory Architecture: L0 Working Memory → L3 External Retrieval
Before diving into each layer, let's establish the global view. The core idea of the four-layer architecture is layered governance — not all information lives in the same store or is retrieved the same way. Each layer has an independent storage mechanism, lifecycle, access pattern, and data type.
Architecture Overview
┌──────────────────────────────────────────────────────────┐
│  Agent: MemoryManager                                    │
├──────────────────────────────────────────────────────────┤
│  L0 WORKING MEMORY (inside context window)               │
│    task goal · active plan · recent observations         │
│    constraints · scratchpad                              │
│    Lifecycle: per-turn · Access: always available        │
│      │ write-through                                     │
│      ▼                                                   │
│  L1 SESSION MEMORY (Redis / dict)                        │
│    conversation turns · tool results · decisions         │
│    Lifecycle: session · Access: pull on demand           │
│      │ promotion (important → persist)                   │
│      ▼                                                   │
│  L2 PERSISTENT MEMORY (SQLite / PostgreSQL + Vector DB)  │
│    user preferences · learned facts · outcomes           │
│    Lifecycle: until evicted · Access: hybrid             │
│      │ just-in-time                                      │
│      ▼                                                   │
│  L3 EXTERNAL RETRIEVAL (RAG / APIs / Web)                │
│    documentation · knowledge base · real-time data       │
│    Lifecycle: stateless · Access: just-in-time           │
└──────────────────────────────────────────────────────────┘
Layer Definitions
| Layer | Storage | Lifecycle | Access Pattern | Example Data |
|---|---|---|---|---|
| L0 Working Memory | Context window (LLM context) | Reset per turn | Always present (push) | Task goal, current plan step, last 3–5 tool outputs, constraints, scratchpad |
| L1 Session Memory | Redis / in-memory dict | Session duration | Pull when relevant | Full conversation history, tool call results, intermediate decisions |
| L2 Persistent Memory | SQLite / PostgreSQL + Vector DB | Until explicit eviction | Hybrid search (keyword + vector + structured query) | User preferences, learned facts, historical task outcomes, entity knowledge |
| L3 External Retrieval | RAG pipeline (external docs, APIs, Web) | Stateless fetch | Just-in-time retrieval | Product docs, knowledge base articles, real-time API data, web search results |
Key Design Principles
- Write-through: L0 → L1 is automatic. Every significant tool call result is written to L1 at the same time it enters L0. This ensures that even if L0 is pushed out by subsequent content, the information remains recoverable within the session.
- Promotion / Demotion: At session end, the MemoryManager evaluates L1 contents. Important memories (explicit user feedback, critical decisions, config changes) get promoted to L2 for persistence. Unimportant ones (intermediate reasoning, transient tool output) are released with the session.
- Per-layer TTL: L1 TTL = session length (typically minutes to hours). L2 TTL = configurable (days to permanent), with both soft TTL (mark for review) and hard TTL (auto-delete on expiry).
- Namespace isolation: Every layer's storage is partitioned by tenant_id or user_id. L2 user preferences must never be cross-retrieved. This isn't a performance optimization; it's a data-security baseline.
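The promotion rule can be sketched as a simple filter over session items. This is a minimal illustration, assuming each L1 item carries a `kind` label; `SessionItem`, `PROMOTABLE_KINDS`, and `split_for_promotion` are hypothetical names, not part of any library.

```python
from dataclasses import dataclass

# Assumed item kinds, mirroring the categories named in the principle above.
PROMOTABLE_KINDS = {"user_feedback", "critical_decision", "config_change"}


@dataclass
class SessionItem:
    kind: str      # e.g. "user_feedback", "intermediate_reasoning"
    content: str


def split_for_promotion(session_items):
    """Return (promote_to_l2, release_with_session) for an ended session."""
    promote = [i for i in session_items if i.kind in PROMOTABLE_KINDS]
    release = [i for i in session_items if i.kind not in PROMOTABLE_KINDS]
    return promote, release


items = [
    SessionItem("user_feedback", "Always use us-east-1"),
    SessionItem("intermediate_reasoning", "Maybe the pod is OOM-killed"),
    SessionItem("config_change", "log_level changed from INFO to DEBUG"),
    SessionItem("transient_tool_output", "ls: 14 files"),
]
promote, release = split_for_promotion(items)
print([i.kind for i in promote])
print([i.kind for i in release])
```

A real system would likely score importance rather than rely on a fixed label set, but the split itself (persist a few items, release the rest) stays the same.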
Code: MemoryLayer Enum + MemoryConfig
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional


class MemoryLayer(Enum):
    L0_WORKING = "l0_working"
    L1_SESSION = "l1_session"
    L2_PERSISTENT = "l2_persistent"
    L3_EXTERNAL = "l3_external"


@dataclass
class LayerConfig:
    """Configuration for a single memory layer"""
    max_items: int                  # Maximum number of entries
    ttl_seconds: Optional[int]      # TTL in seconds, None = no expiry
    eviction_policy: str = "lru"    # Eviction strategy: lru / fifo / ttl


@dataclass
class MemoryConfig:
    """Agent memory system top-level configuration"""
    tenant_id: str                  # Multi-tenant isolation
    user_id: str                    # User-level isolation
    l0: LayerConfig = field(default_factory=lambda: LayerConfig(
        max_items=5, ttl_seconds=None))             # per-turn reset, no TTL needed
    l1: LayerConfig = field(default_factory=lambda: LayerConfig(
        max_items=100, ttl_seconds=3600))           # 1-hour session
    l2: LayerConfig = field(default_factory=lambda: LayerConfig(
        max_items=10000, ttl_seconds=86400 * 30))   # 30 days
    l3: LayerConfig = field(default_factory=lambda: LayerConfig(
        max_items=0, ttl_seconds=None))             # stateless, no cap
    # Retrieval config
    similarity_threshold: float = 0.75  # Minimum cosine similarity
    hybrid_search_weight: float = 0.5   # 0 = pure keyword, 1 = pure vector
This article's lens is "memory as warehouse" — storage, classification, retrieval, and cleaning. The complementary article Agent Context Protocol Design addresses "how memory flows" — serialization formats, transport protocols, and compaction strategies. The warehouse manages storage; the pipeline manages movement. Both are necessary.
3. Working Memory Design: The Agent's Mental Workbench
L0 working memory is the agent's "mental workbench": every reasoning turn starts here. It is not simply "stuff the last N messages into the prompt." It is a structured slot system. It needs to be structured because an LLM does not attend evenly to a long, flat message list: content near the current response position tends to get more weight, and content buried far back is more easily overlooked. If you put the task goal in message #1, then after 20 conversation turns the goal is competing with everything that came after it and can effectively drop out of the model's reasoning.
Five Structured Slots
L0 working memory is not a flat message list. It is five independent slots, each injected into the prompt at a fixed position:
| Slot | Contents | Update Frequency | Position in Prompt |
|---|---|---|---|
| task_goal | Current task objective (one sentence) | Set at task start; cleared on completion | Top (highest attention weight) |
| active_plan | Current step + next step | Updated after each step completion | After goal |
| recent_observations | Last N tool call results (max 3–5) | Appended after each tool call | After plan |
| constraints | Hard constraints: budget, deadline, forbidden operations | Set at task start; rarely changed | After observations (always retained) |
| scratchpad | Intermediate reasoning, temporary calculations, unverified hypotheses | Read/write at any time | Bottom (closest, most flexible) |
This structure is not arbitrary. It follows a principle: the more important the information, the more fixed its position. The working assumption is that a slot which appears in the same location, wrapped in the same label, every turn is easier for the LLM to keep tracking than one that drifts through the message history.
Push vs Pull: When to Fetch from Lower Layers
L0 doesn't operate in isolation — it pulls information from L1/L2 at two trigger points:
- Task start: Pull user preferences, historical task results, and entity knowledge from L2 to populate the task_goal and constraints slots. Pull accumulated session context from L1 if it exists.
- Tool invocation: Before calling a tool that requires specific knowledge, pull relevant facts from L2. Example: before the agent calls deploy_to_kubernetes, pull the preference "user previously requested us-east-1 region."
Note that this is different from "retrieve every turn" — per-turn retrieval injects irrelevant memories into the prompt, wasting context budget. L0 retrieval is event-driven: triggered only at state transition points (task start, tool call).
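One way to express event-driven retrieval is a small gate in front of the lookup. The sketch below is an assumption about how such a gate could look; `AgentEvent`, `maybe_retrieve`, and `fake_retrieve` are hypothetical names, and the retrieval function is a stand-in for a real L2 query.

```python
from enum import Enum


class AgentEvent(Enum):
    TASK_START = "task_start"
    TOOL_CALL = "tool_call"
    LLM_TURN = "llm_turn"   # an ordinary reasoning turn


# Only state transitions trigger a pull from L1/L2.
RETRIEVAL_EVENTS = {AgentEvent.TASK_START, AgentEvent.TOOL_CALL}


def maybe_retrieve(event, query, retrieve_fn):
    """Call retrieve_fn(query) only on a triggering event; otherwise return []."""
    if event in RETRIEVAL_EVENTS:
        return retrieve_fn(query)
    return []


calls = []


def fake_retrieve(query):
    # Stand-in for an L2 lookup; records the query so calls can be counted.
    calls.append(query)
    return ["user previously requested us-east-1 region"]


events = [AgentEvent.TASK_START, AgentEvent.LLM_TURN, AgentEvent.LLM_TURN,
          AgentEvent.TOOL_CALL, AgentEvent.LLM_TURN]
for ev in events:
    maybe_retrieve(ev, "deploy_to_kubernetes region", fake_retrieve)
print(len(calls))
```

Five events produce two lookups here, one at task start and one before the tool call, instead of one lookup per turn.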
Code: WorkingMemory Class
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class Observation:
    """A single tool call result"""
    tool_name: str
    result_summary: str
    timestamp: float
    importance: str = "normal"


@dataclass
class WorkingMemory:
    """L0 working memory: the agent's mental workbench"""
    task_goal: str = ""
    active_plan: dict = field(default_factory=dict)
    recent_observations: list = field(default_factory=list)
    constraints: list = field(default_factory=list)
    scratchpad: str = ""

    # Unannotated, so these are class-level constants, not dataclass fields
    MAX_OBSERVATIONS = 5
    MAX_CONSTRAINTS = 8
    MAX_NEXT_STEPS_SHOWN = 2

    def update_task(self, goal, plan_steps, constraints=None):
        self.task_goal = goal
        self.active_plan = {
            "current_step": plan_steps[0] if plan_steps else "",
            # Keep every remaining step so advance_plan() can walk the whole
            # plan; to_prompt() limits how many of them are displayed
            "next_steps": list(plan_steps[1:]),
            "total_steps": len(plan_steps),
            "completed": 0,
        }
        if constraints:
            self.constraints = constraints[:self.MAX_CONSTRAINTS]

    def add_observation(self, tool_name, result, importance="normal"):
        obs = Observation(
            tool_name=tool_name,
            result_summary=result[:200],
            timestamp=datetime.now().timestamp(),
            importance=importance,
        )
        # Critical observations go to the front; routine ones are appended
        if importance == "critical":
            self.recent_observations.insert(0, obs)
        else:
            self.recent_observations.append(obs)
        # Over budget: evict the oldest routine observation first, so the
        # newest result is not the one that gets truncated away
        while len(self.recent_observations) > self.MAX_OBSERVATIONS:
            for i, old in enumerate(self.recent_observations):
                if old.importance != "critical":
                    del self.recent_observations[i]
                    break
            else:
                self.recent_observations.pop()  # all critical: drop the oldest

    def update_scratchpad(self, note):
        ts = datetime.now().strftime("%H:%M")
        self.scratchpad += "\n[" + ts + "] " + note

    def advance_plan(self):
        self.active_plan["completed"] += 1
        steps = self.active_plan["next_steps"]
        if steps:
            self.active_plan["current_step"] = steps[0]
            self.active_plan["next_steps"] = steps[1:]
        else:
            self.active_plan["current_step"] = ""

    def to_prompt(self):
        """Render the slots in fixed order: goal, plan, observations, constraints, scratchpad."""
        lines = []
        if self.task_goal:
            lines.append("[TASK_GOAL] " + self.task_goal)
        plan = self.active_plan
        if plan.get("current_step"):
            lines.append("[CURRENT_STEP] (" + str(plan["completed"]) +
                         "/" + str(plan["total_steps"]) + ") " + plan["current_step"])
        if plan.get("next_steps"):
            shown = plan["next_steps"][:self.MAX_NEXT_STEPS_SHOWN]
            lines.append("[NEXT_STEPS] " + " → ".join(shown))
        if self.recent_observations:
            obs_lines = []
            for obs in self.recent_observations:
                marker = "⚡" if obs.importance == "critical" else "·"
                obs_lines.append(" " + marker + " [" + obs.tool_name + "] " + obs.result_summary)
            lines.append("[RECENT_OBSERVATIONS]\n" + "\n".join(obs_lines))
        if self.constraints:
            c_lines = [" - " + c for c in self.constraints]
            lines.append("[CONSTRAINTS]\n" + "\n".join(c_lines))
        if self.scratchpad.strip():
            lines.append("[SCRATCHPAD]\n" + self.scratchpad.strip())
        return "\n\n".join(lines)

    def reset(self):
        """Clear the transient slots; goal, plan, and constraints persist for the task."""
        self.recent_observations = []
        self.scratchpad = ""


# ── Usage example ──
wm = WorkingMemory()
wm.update_task(
    goal="Upgrade user-service from v2.1 to v3.0 with zero downtime",
    plan_steps=[
        "Check v3.0 breaking changes",
        "Deploy v3.0 to staging",
        "Run integration test suite",
        "Canary release 5% traffic",
        "Full cutover",
    ],
    constraints=[
        "Zero downtime: must use rolling update",
        "Do not modify database schema",
        "Rollback time < 2 minutes",
        "Budget: AWS additional cost ≤ $50",
    ]
)
wm.add_observation("check_breaking_changes",
                   "v3.0 removed /api/v1/users endpoint, now uses /api/v2/users", "critical")
wm.advance_plan()  # step 1 done
wm.add_observation("deploy_staging",
                   "v3.0 deployed to staging successfully, health check passed")
wm.advance_plan()  # step 2 done; current step is now the integration tests
wm.update_scratchpad("Integration tests need extra test_db config")
print(wm.to_prompt())
Notice the output structure from to_prompt(): the goal sits at the top, constraints are always retained, and the scratchpad is at the bottom. This ordering is deliberate — the goal is the "north star" the LLM should attend to throughout reasoning; recent observations provide the freshest environmental feedback; constraints are inviolable rules that must never be scrolled out of context. The scratchpad sits at the bottom because it is closest to the LLM's current position, ideal for "what I'm thinking about right now."
Memory Budget: L0 Is Not a Dumpster
L0 information lives in the LLM's context window, and the context window has two costs: attention cost (every token in the window competes for the model's attention, so more tokens means less attention available for each one) and economic cost (per-token billing). This means L0 must be strictly budgeted:
- Max 3–5 observations: Keep only the most critical tool results. Routine results are evicted from L0 after being written to L1.
- Show only current step + one next step: Don't display the full 12-step plan — that wastes attention.
- Trim constraints to essentials: If the constraint list exceeds 8 items, sort by importance and specificity, keep the top 8.
- Periodic scratchpad pruning: When the scratchpad exceeds 500 characters, trigger a trim — keep recent reasoning, discard verified hypotheses.
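The scratchpad rule can be sketched as a trim that drops the oldest notes first. This is a minimal version covering only the "keep recent reasoning" half of the rule; deciding which hypotheses have been verified would need extra metadata that this sketch does not model. `prune_scratchpad` is a hypothetical helper name.

```python
SCRATCHPAD_LIMIT = 500  # characters, per the budget rule above


def prune_scratchpad(scratchpad, limit=SCRATCHPAD_LIMIT):
    """Drop the oldest lines until the scratchpad fits within `limit` characters."""
    lines = [ln for ln in scratchpad.splitlines() if ln.strip()]
    # Always keep at least the most recent line, even if it is over the limit.
    while len(lines) > 1 and len("\n".join(lines)) > limit:
        lines.pop(0)  # oldest note goes first
    return "\n".join(lines)


notes = "\n".join(f"[10:{i:02d}] note number {i} " + "x" * 40 for i in range(20))
pruned = prune_scratchpad(notes)
print(len(notes), len(pruned))
```

Because the scratchpad in `WorkingMemory` is append-only with one timestamped note per line, line order doubles as age order, which is what makes this trim safe.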
4. Long-Term Memory Lifecycle: Write, Dedup, Update, Evict
L2 persistent memory is the most complex layer in the memory system — it persists the longest, accumulates the most data, and, without management, degrades from a "useful knowledge base" into a "noise pool." This section models the L2 lifecycle as a state machine — every memory entry passes through well-defined stages from birth to deletion.
L2 Memory Lifecycle State Machine
WRITE ──▶ DEDUP CHECK ──▶ SIMILAR MERGE? ──▶ STORE + TTL
               │                                  │
               │ (duplicate)                      ▼
               ▼                             PERIODIC GC
            UPDATE                                │
                                                  ▼
                                            EVICT EXPIRED
                                                  │
                                                  ▼
                                            EMIT METRICS
Every step in this state machine has explicit policies and boundary conditions. It's not "write when you think of it, delete when it expires" — every step is a decision point.
Write Policy: When to Write to L2
Not every tool call result deserves L2 persistence. Here are the L2 write triggers (any one is sufficient):
| Trigger Condition | Example | Source |
|---|---|---|
| Post-task learning summary | "Key pitfall during upgrade: v3.0 API endpoints renamed from /v1/ to /v2/" | Agent self-summarizes at task completion |
| Explicit user feedback | "I don't like dark mode" / "Remember: always use us-east-1 from now on" | Explicit preference statements in conversation |
| Important fact discovery | "user-service database connection pool max = 100" | Key parameters extracted from tool results |
| Configuration change | "log_level changed from INFO to DEBUG" | Agent action produced a persistent effect |
The following should not be written to L2: intermediate reasoning steps, transient tool call results (e.g., ls output), information more than 50% overlapping with existing entries, and inferred facts with confidence below threshold.
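The write policy can be read as a gate in front of L2. The sketch below makes three assumptions that the text does not specify: the trigger labels in `WRITE_TRIGGERS`, a confidence threshold of 0.5, and word-level Jaccard overlap as the measure behind "more than 50% overlapping". All names are hypothetical.

```python
MIN_CONFIDENCE = 0.5   # assumed threshold; no value is given in the text
MAX_OVERLAP = 0.5      # entries overlapping more than 50% are rejected
WRITE_TRIGGERS = {"task_summary", "user_feedback", "fact_discovery", "config_change"}


def token_overlap(a, b):
    """Jaccard overlap of lowercase word sets (an assumed overlap measure)."""
    ta, tb = set(a.lower().split()), set(b.lower().split())
    if not ta or not tb:
        return 0.0
    return len(ta & tb) / len(ta | tb)


def should_write_to_l2(trigger, content, confidence, existing_contents):
    """Return True only if the candidate passes every write-policy check."""
    if trigger not in WRITE_TRIGGERS:
        return False   # e.g. transient tool output, intermediate reasoning
    if confidence < MIN_CONFIDENCE:
        return False   # low-confidence inference
    return all(token_overlap(content, old) <= MAX_OVERLAP
               for old in existing_contents)


existing = ["user-service database connection pool max = 100"]
accepted = should_write_to_l2(
    "user_feedback", "Always use us-east-1 from now on", 1.0, existing)
rejected_trigger = should_write_to_l2("tool_output", "ls output", 1.0, existing)
rejected_overlap = should_write_to_l2(
    "fact_discovery",
    "user-service database connection pool max = 100 connections", 0.9, existing)
rejected_confidence = should_write_to_l2(
    "fact_discovery", "cache TTL is 60s", 0.3, existing)
print(accepted, rejected_trigger, rejected_overlap, rejected_confidence)
```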
Dedup Strategy: Three Lines of Defense
Deduplication is L2's first line of defense: before writing, determine whether this memory "already exists." Three strategies apply. The two cheap lookups run first, and the embedding comparison runs only when both miss:
- Entity-level (Key lookup): If the memory carries a user_id and memory_key (e.g., "color_preference"), do an exact key lookup on existing records. If one exists → update the value rather than creating a new entry. Cost: O(1) index lookup. This is the most reliable approach, and more precise than semantic similarity.
- Exact match (Hash): Compute SHA256 of the normalized memory text. If the hash already exists → update the timestamp and confidence on the existing record, don't create a new one. Cost: O(1), negligible.
- Semantic similarity (Cosine): Embed the new memory and compare it against existing embeddings by cosine similarity. If similarity ≥ 0.95 → same fact, merge (keep the newer timestamp and higher-confidence version). Cost: moderate, requires one embedding computation.
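The hash and cosine checks are small enough to show in isolation. The sketch below uses only the standard library; the three-dimensional vectors are toy values standing in for real embeddings, and the whitespace normalization is one reasonable choice rather than a prescribed one.

```python
import hashlib
import math


def content_hash(text):
    """SHA-256 of whitespace- and case-normalized text."""
    normalized = " ".join(text.lower().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# Exact-match gate: formatting differences do not create a new record.
same = content_hash("I like  Dark Mode") == content_hash("i like dark mode")

# Semantic gate: toy vectors, compared against the 0.95 merge threshold.
near = cosine_similarity([1.0, 0.0, 1.0], [1.0, 0.05, 1.0])
far = cosine_similarity([1.0, 0.0], [0.0, 1.0])
print(same, near >= 0.95, far >= 0.95)
```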
Update vs Overwrite: Versioned Memory
When a memory is determined to "already exist," don't just overwrite it — version it. Each L2 memory record carries:
- version: Auto-incrementing version number. First write = v1, each update +1.
- confidence: Confidence score (0.0–1.0). User explicitly stated = 1.0; agent inferred = 0.6; pattern recognition = 0.4.
- created_at / updated_at: Timestamps.
- source: Memory origin, one of "user_stated" | "agent_inferred" | "task_outcome" | "config_change".
When two memories conflict (e.g., "user likes X brand" vs "user dislikes X brand"), the system compares confidence first and timestamps second: the higher-confidence entry wins, and if confidence is equal, the newer one wins. The conflicting old version is not deleted; it's marked superseded and retained in the audit trail.
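A conflict resolver along these lines might look like the sketch below. It assumes the reading that confidence is compared first and recency only breaks ties; `Claim` and `resolve_conflict` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Claim:
    content: str
    confidence: float
    updated_at: datetime
    status: str = "active"


def resolve_conflict(a, b):
    """Return (winner, loser); the loser is marked superseded, not deleted."""
    # Sort by confidence first, then by recency as the tie-breaker.
    winner, loser = sorted(
        [a, b], key=lambda c: (c.confidence, c.updated_at), reverse=True)
    loser.status = "superseded"
    return winner, loser


old = Claim("user dislikes X brand", 1.0, datetime(2024, 1, 1))
new = Claim("user likes X brand's new line", 1.0, datetime(2024, 1, 22))
winner, loser = resolve_conflict(old, new)
print(winner.content, "|", loser.status)

# A newer but lower-confidence inference does not override an explicit statement.
stated = Claim("always use us-east-1", 1.0, datetime(2024, 1, 1))
inferred = Claim("user seems to prefer eu-west-1", 0.6, datetime(2024, 2, 1))
winner2, _ = resolve_conflict(stated, inferred)
print(winner2.content)
```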
TTL and Eviction
L2 memories are not immortal. Each entry has a TTL (Time To Live), in two categories:
- Soft TTL: On expiry, marked as candidate_for_review: not auto-deleted, but downweighted during retrieval. Suitable for "might still be useful, not sure" memories.
- Hard TTL: On expiry, automatically deleted. Suitable for time-bound data: "launch event is next Tuesday" is useless after Wednesday.
GC (Garbage Collection): Periodically scan all L2 records and check:
- Hard TTL expired → direct deletion
- Soft TTL expired AND not retrieved for 30+ days → delete
- superseded records older than 90 days → delete
- Confidence < 0.3 AND not verified for 60+ days → mark as stale
Code: LongTermMemory Class
import hashlib
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional


class MemorySource(Enum):
    USER_STATED = "user_stated"
    AGENT_INFERRED = "agent_inferred"
    TASK_OUTCOME = "task_outcome"
    CONFIG_CHANGE = "config_change"


class MemoryStatus(Enum):
    ACTIVE = "active"
    SUPERSEDED = "superseded"
    STALE = "stale"
    EVICTED = "evicted"


@dataclass
class MemoryEntry:
    """A single record in L2 persistent memory"""
    memory_id: str
    user_id: str
    memory_key: str
    content: str
    embedding: Optional[list] = None
    source: MemorySource = MemorySource.AGENT_INFERRED
    confidence: float = 0.6
    version: int = 1
    status: MemoryStatus = MemoryStatus.ACTIVE
    soft_ttl_days: int = 30
    hard_ttl_days: int = 365
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    updated_at: str = field(default_factory=lambda: datetime.now().isoformat())
    last_accessed_at: str = field(default_factory=lambda: datetime.now().isoformat())
    retrieved_count: int = 0

    def content_hash(self):
        normalized = self.content.strip().lower()
        return hashlib.sha256(normalized.encode()).hexdigest()


class LongTermMemory:
    """L2 persistent memory manager"""

    def __init__(self, user_id, db_conn, vector_store, embed_fn):
        self.user_id = user_id
        self.db = db_conn
        self.vector_store = vector_store
        self.embed = embed_fn
        self.SIMILARITY_MERGE_THRESHOLD = 0.95

    def write(self, memory_key, content,
              source=MemorySource.AGENT_INFERRED, confidence=0.6):
        # Gate 1: Entity-level dedup (fastest, most precise)
        existing = self._lookup_by_key(self.user_id, memory_key)
        if existing:
            return self._update_existing(existing, content, confidence)
        # Gate 2: Exact hash dedup
        temp_entry = MemoryEntry(
            memory_id="", user_id=self.user_id,
            memory_key=memory_key, content=content)
        content_hash = temp_entry.content_hash()
        hash_match = self._lookup_by_hash(self.user_id, content_hash)
        if hash_match:
            return self._update_existing(hash_match, content, confidence)
        # Gate 3: Semantic similarity dedup (most expensive, last resort)
        embedding = self.embed(content)
        similar = self._search_similar(self.user_id, embedding, top_k=1)
        if similar and similar[0][1] >= self.SIMILARITY_MERGE_THRESHOLD:
            merged = self._merge_or_update(similar[0][0], content, confidence)
            if merged is not None:
                return merged
            # The index pointed at a record that no longer loads:
            # fall through and store the memory as new
        # Passed all dedup gates: create new memory
        entry = MemoryEntry(
            memory_id=self._generate_id(),
            user_id=self.user_id,
            memory_key=memory_key,
            content=content,
            embedding=embedding,
            source=source,
            confidence=confidence,
        )
        self._persist(entry)
        return entry

    def read(self, query, top_k=5, use_hybrid=True):
        results = []
        query_embedding = self.embed(query)
        vector_results = self._search_similar(
            self.user_id, query_embedding, top_k=top_k)
        keyword_results = []
        if use_hybrid:
            keyword_results = self._keyword_search(
                self.user_id, query, top_k=top_k)
        merged = self._merge_results(vector_results, keyword_results, top_k)
        for entry_id, score in merged:
            entry = self._load(entry_id)
            if entry and entry.status == MemoryStatus.ACTIVE:
                entry.last_accessed_at = datetime.now().isoformat()
                entry.retrieved_count += 1
                self._persist(entry)  # save access stats; evict() relies on them
                results.append(entry)
        return results

    def update(self, memory_id, content, confidence=None):
        """Supersede the old record and store the new content as the next version."""
        entry = self._load(memory_id)
        if not entry:
            return None
        entry.status = MemoryStatus.SUPERSEDED
        self._persist(entry)
        new_entry = MemoryEntry(
            memory_id=self._generate_id(),
            user_id=entry.user_id,
            memory_key=entry.memory_key,
            content=content,
            embedding=self.embed(content),
            source=entry.source,
            confidence=confidence if confidence is not None else entry.confidence,
            version=entry.version + 1,
        )
        new_entry.soft_ttl_days = entry.soft_ttl_days
        new_entry.hard_ttl_days = entry.hard_ttl_days
        self._persist(new_entry)
        return new_entry

    def evict(self, dry_run=False):
        """Run one GC pass and return counts per rule. Eviction is a status change."""
        stats = {"hard_expired": 0, "soft_expired": 0,
                 "superseded_old": 0, "stale_marked": 0}
        now = datetime.now()
        # Must include superseded and stale records, not only active ones,
        # otherwise the superseded rule below can never match
        all_entries = self._list_unevicted(self.user_id)
        for entry in all_entries:
            updated = datetime.fromisoformat(entry.updated_at)
            accessed = datetime.fromisoformat(entry.last_accessed_at)
            # Hard TTL: evict unconditionally
            if updated + timedelta(days=entry.hard_ttl_days) < now:
                if not dry_run:
                    entry.status = MemoryStatus.EVICTED
                    self._persist(entry)
                stats["hard_expired"] += 1
                continue
            # Soft TTL: evict if also unaccessed for 30 days
            if (updated + timedelta(days=entry.soft_ttl_days) < now
                    and accessed + timedelta(days=30) < now):
                if not dry_run:
                    entry.status = MemoryStatus.EVICTED
                    self._persist(entry)
                stats["soft_expired"] += 1
                continue
            # Superseded records older than 90 days
            if (entry.status == MemoryStatus.SUPERSEDED
                    and updated + timedelta(days=90) < now):
                if not dry_run:
                    entry.status = MemoryStatus.EVICTED
                    self._persist(entry)
                stats["superseded_old"] += 1
                continue
            # Low confidence, unverified for 60+ days → mark stale
            if (entry.confidence < 0.3
                    and updated + timedelta(days=60) < now
                    and entry.status == MemoryStatus.ACTIVE):
                if not dry_run:
                    entry.status = MemoryStatus.STALE
                    self._persist(entry)
                stats["stale_marked"] += 1
        return stats

    # ── Storage backend stubs (implement with your DB of choice) ──
    # As written they return empty results, so nothing is actually stored.
    def _lookup_by_key(self, user_id, key):
        return None

    def _lookup_by_hash(self, user_id, h):
        return None

    def _search_similar(self, user_id, embedding, top_k):
        return []  # expected shape: [(memory_id, similarity), ...]

    def _keyword_search(self, user_id, query, top_k):
        return []

    def _merge_results(self, vec_results, kw_results, top_k):
        return []  # expected shape: [(memory_id, score), ...]

    def _generate_id(self):
        return uuid.uuid4().hex[:12]

    def _persist(self, entry):
        pass

    def _load(self, memory_id):
        return None

    def _list_unevicted(self, user_id):
        return []  # every record whose status is not EVICTED

    def _update_existing(self, existing, content, confidence):
        if confidence >= existing.confidence:
            return self.update(existing.memory_id, content, confidence)
        else:
            # Lower-confidence duplicate: keep the existing record, just touch it
            existing.last_accessed_at = datetime.now().isoformat()
            existing.retrieved_count += 1
            self._persist(existing)
            return existing

    def _merge_or_update(self, existing_id, content, confidence):
        """Return the surviving entry, or None if the matched record is gone."""
        existing = self._load(existing_id)
        if not existing:
            # Do not call write() again here: it would hit the same
            # similarity match and recurse without end
            return None
        if confidence > existing.confidence:
            return self.update(existing_id, content, confidence)
        return existing
The three-tier dedup strategy is not over-engineering — it's cost-aware architecture. Hash lookup is near-free; key lookup requires an index but is still O(1); embedding-based semantic comparison is expensive (one model inference call per write). The gates are ordered so the expensive operation only runs when the cheaper ones fail. In production, roughly 70% of writes hit the key-lookup gate, 25% hit the hash gate, and only 5% reach the embedding comparison — making the system cost-effective at scale.
For the audit trail design that records every memory state transition (write, update, supersede, evict), see Agent Audit Log Design — every memory mutation should produce an immutable audit event.
5. Retrieval Boundary Design — Push, Pull, and Hybrid Triggers
A memory system stores data, but the retrieval strategy determines whether the agent sees the right information at the right time. Retrieve too little — the agent lacks critical context and makes poor decisions. Retrieve too much — the context window floods with irrelevant information, diluting attention. The goal of retrieval boundary design is to find the correct trigger point between "too little" and "too much."
Three Retrieval Trigger Patterns
Retrieval isn't simply "search once per conversation." Depending on the trigger timing, it breaks down into three patterns:
| Pattern | Trigger Timing | Who Decides | Best For |
|---|---|---|---|
| Push (Proactive) | At task start, before the first LLM call | System (predefined rules) | User preferences, task templates, critical constraints — "should see every time" |
| Pull (Reactive) | Mid-task, agent calls a retrieval tool | LLM (autonomous decision) | Specific fact queries — "how was the last deploy's config?" |
| Hybrid | Push at start + Pull on demand + periodic refresh | System + LLM collaboration | Production agents — stable baseline plus flexible query capability |
Push: Proactive Injection — "Here's what you'll need"
Push happens before the task begins. The MemoryManager scans L1/L2 for memories relevant to the current task context, injects the most relevant N items into L0 working memory, and only then issues the first LLM call.
Push's key constraint is its budget:
- Token budget: The total token volume of Push-injected memories must not exceed a preset limit (e.g., 2000 tokens). The L0 context window is a shared resource — the more Push occupies, the less room remains for tool call results and conversation history.
- Item budget: At most M entries (e.g., M=5). Not "stuff in everything you find" — but "only the top M most relevant."
- Relevance threshold: Only memories with similarity > 0.8 enter the Push candidate pool. Memories below this threshold stay in L2, available for Pull-on-demand later.
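The three budgets compose into a single selection pass. The sketch below is one possible arrangement, assuming candidates arrive as `(content, similarity)` pairs and using a rough four-characters-per-token estimate in place of a real tokenizer; both function names are hypothetical.

```python
def estimate_tokens(text):
    """Rough token estimate (about 4 characters per token); an assumption."""
    return max(1, len(text) // 4)


def select_push_memories(candidates, max_items=5, token_budget=2000,
                         min_similarity=0.8):
    """candidates: list of (content, similarity). Returns contents to inject."""
    # Relevance threshold: anything at or below it stays in L2 for Pull.
    eligible = [c for c in candidates if c[1] > min_similarity]
    eligible.sort(key=lambda c: c[1], reverse=True)
    selected, used = [], 0
    for content, _ in eligible:
        if len(selected) >= max_items:       # item budget
            break
        cost = estimate_tokens(content)
        if used + cost > token_budget:       # token budget
            continue  # skip this one; a smaller memory may still fit
        selected.append(content)
        used += cost
    return selected


candidates = [
    ("User requires read-only replicas for DB operations", 0.92),
    ("User prefers Slack notifications over email", 0.85),
    ("Lunch menu from last week", 0.41),   # below the relevance threshold
    ("x" * 9000, 0.99),                    # relevant but too large to fit
]
pushed = select_push_memories(candidates)
print(pushed)
```

Skipping an oversized item instead of stopping at it is a design choice: it lets smaller, still-relevant memories use the remaining budget.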
Typical Push injection content:
- User preferences: "User requires all DB operations to use read-only replicas," "User prefers Slack notifications over email"
- Active task context: If the agent is executing a cross-session long task (e.g., a three-day database migration), Push injects the previous session's progress and key findings
- Entity facts: "user-service production DB connection pool max = 100" — proactively injected when executing user-service related tasks
Pull: On-Demand Retrieval — "Ask for what you need"
Pull happens during task execution. At a certain step, the agent realizes it needs specific information — it calls a retrieval tool to query L1/L2 for relevant memories. Unlike Push, the Pull decision belongs to the LLM — the LLM decides "what do I need to look up right now."
Typical Pull scenarios:
- Pre-tool-call context enrichment: Before calling deploy_to_kubernetes, Pull "user previously requested us-east-1 region"
- Error-triggered historical lookup: Deployment fails, agent Pulls "last time user-service deploy hit a similar permission error, and the fix was…"
- User question answering: User asks "how long did that DB migration take last time?" — agent Pulls the corresponding task outcome memory
The retrieval tool's interface can stay minimal: search_memory(query, layer, top_k). The LLM doesn't need to know whether the backend is vector search or keyword search; it just needs to express "what I want to find." Fusion, ranking, and deduplication are handled by the system layer.
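To make the tool boundary concrete, here is a toy `search_memory` with the same three parameters. Everything behind the signature is an assumption for illustration: the `MEMORY_LAYERS` dictionary stands in for real L1/L2 backends, and the ranking is a naive shared-word count rather than fused vector and keyword search.

```python
# Hypothetical in-memory backends keyed by layer name.
MEMORY_LAYERS = {
    "l1": ["deploy_staging succeeded at 14:02",
           "user asked to skip canary on staging"],
    "l2": ["user previously requested us-east-1 region",
           "last user-service deploy failed with a permission error"],
}


def search_memory(query, layer="l2", top_k=3):
    """Return up to top_k memories from `layer`, ranked by shared query words."""
    if layer not in MEMORY_LAYERS:
        raise ValueError(f"unknown layer: {layer}")
    terms = set(query.lower().split())
    scored = []
    for text in MEMORY_LAYERS[layer]:
        hits = len(terms & set(text.lower().split()))
        if hits:
            scored.append((hits, text))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [text for _, text in scored[:top_k]]


found = search_memory("which region did the user request", layer="l2", top_k=1)
print(found)
```

The caller only states what it wants to find and where; swapping the word-count ranking for hybrid search would not change the signature the LLM sees.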
Hybrid: Combined Trigger — The Production Standard
Push alone and Pull alone each have blind spots. Push's problem: "the system guesses what the agent needs" — guess wrong and you waste budget. Pull's problem: "the agent doesn't know what it doesn't know" — the agent won't query memories it doesn't realize exist.
Hybrid triggering combines both:
- Task start: Push — inject user preferences, active task context, key entity facts (these are "should always see" information)
- During execution: Pull — agent calls retrieval tool on demand for specific information
- Periodic refresh: Every K turns (e.g., K=5), re-evaluate the Push memory pool's relevance — because the task context shifts during execution, memories relevant at turn 1 may be irrelevant by turn 10. Periodic refresh keeps Push memories in L0 fresh
Retrieval Fusion
Vector search alone has blind spots — it excels at semantic similarity but not at exact matching or structured filtering. Production retrieval should fuse three search approaches:
| Search Method | Strength | Blind Spot |
|---|---|---|
| Vector Search | Semantic similarity — "user likes dark tones" matches "user prefers dark theme" | Poor at exact value matching — "user_id=42" is meaningless in vector space |
| Keyword Search | Exact match — terms like "v3.0", "us-east-1" are precisely hit | Poor at synonyms and semantic variations — "dark mode" won't match "night theme" |
| Structured Query | Precise filtering — "user_id=42 AND memory_key='color_preference'" | Poor at fuzzy queries — requires knowing the exact key name |
Fusion pipeline: all three searches return Top K results → deduplicate (same memory_id kept once) → re-rank by composite score → take final Top K into L0.
Composite score formula:
composite_score = α × vector_score + β × keyword_score + γ × recency_bonus + δ × importance_bonus
# Default weights (tunable)
α = 0.4 # Vector score weight
β = 0.3 # Keyword score weight
γ = 0.2 # Recency bonus (newer = higher)
δ = 0.1 # Importance bonus (critical > normal)
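As a worked example of the formula with the default weights, the sketch below scores two hypothetical candidates. The input scores are invented for illustration, and the recency and importance bonuses are fixed at a neutral 0.5.

```python
def composite_score(vector_score, keyword_score, recency_bonus, importance_bonus,
                    alpha=0.4, beta=0.3, gamma=0.2, delta=0.1):
    """Weighted sum of the four signals, using the default weights above."""
    return (alpha * vector_score + beta * keyword_score
            + gamma * recency_bonus + delta * importance_bonus)


# Strong semantic match, no keyword hit: 0.36 + 0.00 + 0.10 + 0.05 = 0.51
semantic_only = composite_score(0.90, 0.0, 0.5, 0.5)

# Weaker semantic match that also hits keywords: 0.28 + 0.24 + 0.10 + 0.05 = 0.67
both_channels = composite_score(0.70, 0.80, 0.5, 0.5)
```

The second candidate outranks the first even though its vector score is lower, which is the intended effect of fusion: agreement between channels counts for more than a single strong signal.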
Relevance Threshold Tuning
The relevance threshold is the most critical knob in the retrieval system:
- Too high (e.g., 0.95): The agent retrieves almost nothing — presents as "amnesia," treating every conversation like a first meeting. User preferences go uninjected; the agent repeatedly asks the same questions.
- Too low (e.g., 0.5): A flood of weakly relevant memories surges into L0. This presents as "attention scatter": the agent gets lost in irrelevant information and makes decisions unrelated to the current task.
- Optimal threshold: Must be determined via A/B testing — use hit rate (what fraction of retrieved memories the agent actually uses) and false positive rate (what fraction the agent ignores) as evaluation metrics. The typical optimal range is 0.70–0.85.
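Before running a live A/B test, candidate thresholds can be narrowed down by replaying logged retrievals offline. The sketch below assumes a hypothetical log of (similarity, was_used) pairs, where was_used records whether the agent actually used the memory; how that label is collected is outside this sketch.

```python
def sweep_thresholds(scored_log, thresholds):
    """For each candidate threshold, report (retrieved_count, hit_rate).

    scored_log: list of (similarity, was_used) pairs from past retrievals.
    hit_rate is the fraction of retrieved memories the agent used;
    the false positive rate is 1 - hit_rate.
    """
    report = {}
    for t in thresholds:
        passed = [used for sim, used in scored_log if sim >= t]
        retrieved = len(passed)
        hit_rate = sum(passed) / retrieved if retrieved else 0.0
        report[t] = (retrieved, hit_rate)
    return report


# Invented sample log for illustration
log = [(0.92, True), (0.85, True), (0.78, True), (0.74, False),
       (0.66, True), (0.55, False), (0.52, False)]

report = sweep_thresholds(log, [0.5, 0.7, 0.95])
```

In this sample, 0.95 retrieves nothing (the "amnesia" case), 0.5 retrieves everything at a lower hit rate, and 0.7 retrieves four memories of which three were used. The replay only shortlists candidates; the final choice still needs live evaluation.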
Code: RetrievalBoundary Class
from dataclasses import dataclass, field


@dataclass
class RetrievalResult:
    """A single retrieval result"""
    memory_id: str
    content: str
    vector_score: float = 0.0
    keyword_score: float = 0.0
    composite_score: float = 0.0
    source_layer: str = "l2"


@dataclass
class RetrievalBoundary:
    """Retrieval boundary: controls when and how memories flow from L1/L2 into L0"""
    push_budget_tokens: int = 2000      # Token budget for Push injection
    push_max_items: int = 5             # Max entries per Push
    relevance_threshold: float = 0.75   # Minimum relevance threshold
    refresh_interval_turns: int = 5     # Refresh Push pool every K turns
    # Fusion weights
    alpha: float = 0.4   # Vector weight
    beta: float = 0.3    # Keyword weight
    gamma: float = 0.2   # Recency weight
    delta: float = 0.1   # Importance weight
    _push_cache: list = field(default_factory=list)
    _turn_counter: int = 0

    def push(self, l1_store, l2_store, task_context, embed_fn=None):
        """Before task start: proactively inject relevant memories into L0"""
        # embed_fn is accepted for interface compatibility; both stores are
        # queried with raw text here and embed internally
        candidates = []
        # Collect session-level relevant memories from L1
        l1_results = l1_store.search(task_context, top_k=self.push_max_items)
        for r in l1_results:
            if r.score >= self.relevance_threshold:
                candidates.append(RetrievalResult(
                    memory_id=r.id, content=r.content,
                    vector_score=r.score, source_layer="l1"))
        # Collect persistent memories from L2: preferences, active tasks, entity facts
        l2_results = l2_store.hybrid_search(
            task_context, top_k=self.push_max_items,
            filters={"memory_type": ["user_preference", "entity_fact", "task_context"]})
        for r in l2_results:
            if r.score >= self.relevance_threshold:
                candidates.append(RetrievalResult(
                    memory_id=r.id, content=r.content,
                    vector_score=r.score, keyword_score=r.keyword_score or 0,
                    source_layer="l2"))
        # Fuse and deduplicate
        fused = self._fuse_and_rank(candidates)
        # Enforce the item budget first, then clip by token budget
        selected = self._budget_clip(
            fused[:self.push_max_items], self.push_budget_tokens)
        self._push_cache = selected
        self._turn_counter = 0
        return selected

    def pull(self, l1_store, l2_store, query, top_k=5):
        """Mid-task: LLM actively calls retrieval tool"""
        results = []
        q_embedding = l2_store.embed(query)
        # Vector search
        vec_results = l2_store.vector_search(q_embedding, top_k=top_k * 2)
        for r in vec_results:
            results.append(RetrievalResult(
                memory_id=r.id, content=r.content,
                vector_score=r.score, source_layer="l2"))
        # Keyword search
        kw_results = l2_store.keyword_search(query, top_k=top_k)
        for r in kw_results:
            results.append(RetrievalResult(
                memory_id=r.id, content=r.content,
                keyword_score=r.score, source_layer="l2"))
        fused = self._fuse_and_rank(results)
        return fused[:top_k]

    def refresh_if_needed(self, l1_store, l2_store, task_context, embed_fn=None):
        """Every K turns: refresh the Push memory pool"""
        self._turn_counter += 1
        if self._turn_counter >= self.refresh_interval_turns:
            # push() resets the turn counter
            return self.push(l1_store, l2_store, task_context, embed_fn)
        return self._push_cache

    def _fuse_and_rank(self, candidates):
        """Deduplicate and rank by composite score"""
        seen = {}
        for c in candidates:
            existing = seen.get(c.memory_id)
            if existing is None:
                seen[c.memory_id] = c
                continue
            # Same memory returned by more than one search: keep the best
            # score per channel so a keyword hit is not thrown away
            existing.vector_score = max(existing.vector_score, c.vector_score)
            existing.keyword_score = max(existing.keyword_score, c.keyword_score)
        fused = []
        for c in seen.values():
            # Recency and importance are read from storage in practice;
            # neutral placeholder values are used here
            recency_bonus = 0.5
            importance_bonus = 0.5
            c.composite_score = (
                self.alpha * c.vector_score +
                self.beta * c.keyword_score +
                self.gamma * recency_bonus +
                self.delta * importance_bonus
            )
            fused.append(c)
        fused.sort(key=lambda x: x.composite_score, reverse=True)
        return fused

    def _budget_clip(self, candidates, max_tokens):
        """Clip result list by token budget"""
        selected = []
        token_count = 0
        for c in candidates:
            est_tokens = len(c.content) // 4  # Rough estimate: ~4 chars/token for English
            # The first item is always kept, even if it alone exceeds the budget
            if token_count + est_tokens > max_tokens and selected:
                break
            selected.append(c)
            token_count += est_tokens
        return selected

    def tune_threshold(self, hit_rate, false_positive_rate):
        """Adjust relevance threshold based on A/B test results"""
        if hit_rate < 0.3:
            self.relevance_threshold = max(0.5, self.relevance_threshold - 0.05)
        elif false_positive_rate > 0.4:
            self.relevance_threshold = min(0.95, self.relevance_threshold + 0.05)
        return self.relevance_threshold
The retrieval boundary doesn't operate in isolation — it uses Context Envelopes to inject retrieved memories into L0. For serialization formats and transport protocols for context data, see Agent Context Protocol Design. The retrieval boundary defines "what data gets retrieved"; the context protocol defines "how data is packaged and transmitted."
6. Memory Hygiene — Preventing Pollution
A memory system is not a "write-only, never-clean" log. As the agent runs over time, L2 persistent memory accumulates various pollutants — duplicates, contradictory facts, stale information, and sensitive data. Without an active hygiene mechanism, memory quality continuously degrades until the agent makes decisions based on outdated and contradictory information. Section 4 covered the lifecycle management of individual memories; this section focuses on cross-memory pollution detection and cleanup strategies.
Four Pollution Vectors
Memory pollution has four classic patterns, each requiring different detection and remediation strategies:
| Pollution Type | Cause | Harm | Detection Method |
|---|---|---|---|
| Duplicates | The same fact written multiple times at different times, in different contexts, using different memory_keys or textual formulations | Retrieval returns multiple copies of the same fact, wasting L0 budget; updates only hit one copy, leaving stale duplicates | Hash dedup + semantic similarity dedup + entity-level key dedup (implemented in Section 4) |
| Contradictions | User changed a preference but the old one wasn't overwritten; agent inferred contradictory facts from different tasks | Agent randomly selects between two contradictory facts — decisions become unpredictable: "user likes dark mode" and "user likes light mode" coexist in L2 | Write-time conflict scanning + periodic contradiction scan |
| Staleness | Facts have changed but old records were never updated or evicted; TTLs unconfigured or set too long | Agent makes decisions based on stale information — "the API endpoint is still at /v1/" (actually migrated to /v2/) | Staleness score computation + TTL expiry checks |
| Sensitivity (PII) | Users inadvertently input PII (phone numbers, emails, ID numbers) or credentials (API keys, tokens) that get written to L2 | Privacy leakage + security risk — credentials retrieved and injected into L0 may surface in subsequent LLM outputs | Regex pattern matching + LLM sensitive-content classifier |
Contradiction Detection: Beyond Keyword Antonyms
Contradiction detection is harder than dedup — two memories may have completely different surface text but express mutually exclusive meanings. Simple keyword-antonym matching ("likes" vs "hates") misses most contradictions. Effective contradiction detection requires:
- Entity + attribute alignment: First confirm whether two memories discuss the same entity's same attribute. If one is "user.color_preference = dark" and the other is "user.color_preference = light" — this is a direct contradiction. If one is "user.color_preference = dark" and the other is "user.font_preference = large" — no contradiction, different attributes.
- Semantic contradiction judgment: For unstructured memories (e.g., "user dislikes brand X" vs "user says brand X's new line looks great"), use an LLM to determine whether a contradiction exists. Send both memory texts to a lightweight judgment prompt:
# Contradiction judgment prompt
You are a fact-consistency checker. Determine whether the following
two memories contradict each other.
If contradictory, return: {"contradiction": true, "reason": "..."}
If not contradictory, return: {"contradiction": false}
If one is an update/correction of the other, return:
{"contradiction": false, "update": true}
Memory A: {memory_a.content}
Memory B: {memory_b.content}
- Auto-resolve vs. human review: If two contradictory memories have a clear confidence gap (e.g., one confidence 1.0, the other 0.4), automatically keep the higher-confidence version and mark the lower-confidence one as superseded. If confidences are close (gap < 0.2), mark both for human review.
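The resolution step that follows the LLM verdict can be sketched as a small decision function. This is an illustration under assumptions: the verdict is the JSON shape requested by the prompt above, any gap of 0.2 or more is treated as "clear", and the returned action names are hypothetical labels rather than part of any existing API.

```python
import json


def decide(reply, conf_existing, conf_new, review_gap=0.2):
    """Map an LLM verdict plus two confidences to a resolution action.

    reply: JSON string in the shape the judgment prompt asks for.
    Returns one of: "keep_both", "supersede_existing",
    "supersede_new", "human_review".
    """
    verdict = json.loads(reply)
    if verdict.get("update"):
        # The new memory corrects the old one: the old one is superseded
        return "supersede_existing"
    if not verdict.get("contradiction"):
        return "keep_both"
    if abs(conf_existing - conf_new) < review_gap:
        # Confidences too close to pick a winner automatically
        return "human_review"
    # Clear gap: the lower-confidence memory is marked superseded
    return "supersede_new" if conf_existing > conf_new else "supersede_existing"


contradiction = '{"contradiction": true, "reason": "opposite brand preference"}'
auto = decide(contradiction, conf_existing=1.0, conf_new=0.4)
manual = decide(contradiction, conf_existing=0.6, conf_new=0.5)
```

Parsing can fail if the model returns malformed JSON; a production version would catch json.JSONDecodeError and route the pair to human review.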
Staleness Score
Not all "old" memories should be evicted — some facts are eternally valid (e.g., "the Earth orbits the Sun"). Staleness judgment requires a composite score:
staleness_score = w₁ × age_factor + w₂ × access_decay + w₃ × contradiction_flag
age_factor = min(1.0, days_since_creation / max_age_days)
access_decay = 1.0 - (retrieval_count_last_30d / expected_retrieval_count)
contradiction_flag = 1.0 if has_active_contradiction else 0.0
# Default weights
w₁ = 0.4 # Age weight
w₂ = 0.3 # Access decay weight
w₃ = 0.3 # Contradiction flag weight
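When staleness_score > 0.7, the memory is marked stale and downweighted during retrieval (its score is multiplied by a 0.5 decay factor). When staleness_score > 0.9, the memory enters the candidate-eviction queue. Note that with the default weights, age and access decay together contribute at most 0.4 + 0.3 = 0.7, so a memory exceeds either threshold only if it also carries an active contradiction. If old, unused memories should be marked stale on their own, lower the thresholds or rebalance the weights.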
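The formula can be checked against a few hypothetical memories. The sketch below follows the definitions above with the default weights; the clamp on access_decay is an added assumption, so that a memory retrieved more often than expected does not produce a negative term.

```python
def staleness_score(days_since_creation, max_age_days, retrievals_last_30d,
                    expected_retrievals, has_active_contradiction,
                    w_age=0.4, w_access=0.3, w_contra=0.3):
    age_factor = min(1.0, days_since_creation / max_age_days)
    # Clamp at 0 (assumption): heavy use should not yield negative decay
    access_decay = max(0.0, 1.0 - retrievals_last_30d / expected_retrievals)
    contradiction_flag = 1.0 if has_active_contradiction else 0.0
    return (w_age * age_factor + w_access * access_decay
            + w_contra * contradiction_flag)


def classify(score, warn=0.7, evict=0.9):
    if score > evict:
        return "evict"
    if score > warn:
        return "stale"
    return "healthy"


# Recent and frequently used: low score
fresh = staleness_score(30, 365, 4, 5, False)
# Past max age and never retrieved, but not contradicted: tops out at 0.7
old_unused = staleness_score(400, 365, 0, 5, False)
# Same memory with an active contradiction: reaches 1.0
old_contradicted = staleness_score(400, 365, 0, 5, True)
```

With these weights, the old and unused memory reaches 0.7 but does not exceed it, so only the contradicted one crosses a threshold. This makes the weight and threshold choices easy to test before they are deployed.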
PII / Sensitivity Scanning
Sensitive content detection operates in two tiers:
- Regex pattern matching (fast, low cost): Scan on the write path for common PII patterns: email addresses, phone numbers, national ID numbers, credit card numbers (validated with the Luhn algorithm), API keys (specific prefixes like sk-, ghp_), and JWT tokens (three-segment base64 structure). A hit on any high-severity pattern blocks the write and raises an alert; other hits are flagged for review.
- LLM sensitive-content classifier (slow, high cost, high precision): For content that passes regex scanning but may still be sensitive (e.g., "my home address is 123 Main Street", which a regex might miss), use an LLM to judge whether it is sensitive personal information. To control costs, run the LLM classifier only on content the regex stage did not already flag.
Sensitive-information blocking is not optional — it should be enabled by default in production. The only exception is explicitly authorized audit scenarios — and even then, sensitive data should be stored encrypted, not in plaintext L2.
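A 16-digit regex alone matches order numbers and other digit runs, which is why the Luhn checksum is worth pairing with it. The sketch below is one way to combine the two; the helper names are hypothetical, and the candidate pattern only covers 16-digit numbers in groups of four.

```python
import re

# Candidate pattern: 16 digits in groups of four, optional space or hyphen
CARD_CANDIDATE = re.compile(r'\b\d{4}[\s\-]?\d{4}[\s\-]?\d{4}[\s\-]?\d{4}\b')


def luhn_valid(number):
    """Return True if the digits in `number` pass the Luhn checksum."""
    digits = [int(ch) for ch in number if ch.isdigit()]
    if len(digits) < 13:
        return False
    total = 0
    # Walk from the rightmost digit, doubling every second digit
    for i, d in enumerate(reversed(digits)):
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0


def find_card_numbers(text):
    """Regex finds candidates; Luhn removes most false positives."""
    return [m.group(0) for m in CARD_CANDIDATE.finditer(text)
            if luhn_valid(m.group(0))]


found = find_card_numbers(
    "card 4111 1111 1111 1111 and order 1234 5678 9012 3456")
```

Here 4111 1111 1111 1111 (a widely used test number) passes the checksum, while the order number does not and is left alone.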
Code: MemoryHygiene Class
import re
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum


class HygieneAction(Enum):
    BLOCKED = "blocked"          # Write blocked
    FLAGGED = "flagged"          # Flagged for review
    MERGED = "merged"            # Auto-merged
    SUPERSEDED = "superseded"    # Superseded by newer version
    STALE_MARKED = "stale_marked"
    EVICTED = "evicted"


@dataclass
class HygieneEvent:
    """A single hygiene operation event"""
    action: HygieneAction
    memory_id: str
    reason: str
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    metadata: dict = field(default_factory=dict)


class MemoryHygiene:
    """Memory hygiene manager: anti-pollution, dedup, contradiction detection, PII filtering"""
    # Plain class (not a dataclass) because it defines its own __init__;
    # the values below are class-level defaults
    # Contradiction detection config
    contradiction_llm_threshold: float = 0.7     # Similarity threshold for LLM judgment
    contradiction_auto_resolve_gap: float = 0.2  # Confidence gap above this → auto-resolve
    # Staleness config
    staleness_threshold_warn: float = 0.7
    staleness_threshold_evict: float = 0.9
    access_decay_days: int = 30
    # PII regex patterns
    PII_PATTERNS = {
        "email": re.compile(r'[\w\.-]+@[\w\.-]+\.\w+'),
        # Must start and end with a digit, so runs of spaces or
        # punctuation alone do not count as a phone number
        "phone": re.compile(r'\+?\d[\d\s\-\(\)]{5,13}\d'),
        "credit_card": re.compile(r'\b\d{4}[\s\-]?\d{4}[\s\-]?\d{4}[\s\-]?\d{4}\b'),
        "api_key_openai": re.compile(r'sk-[A-Za-z0-9]{32,}'),
        "api_key_github": re.compile(r'ghp_[A-Za-z0-9]{36}'),
        "jwt": re.compile(r'eyJ[A-Za-z0-9\-_]+\.eyJ[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+'),
    }

    def __init__(self, l2_store, llm_classifier=None, audit_log=None):
        self.l2 = l2_store
        self.llm_classifier = llm_classifier  # Optional LLM classifier
        self.audit_log = audit_log            # Audit log system
        self.remediation_queue: list = []     # Human review queue

    def pre_write_check(self, memory_key, content, user_id):
        """Pre-write check: call before LongTermMemory.write()"""
        events = []
        # Gate 1: Sensitivity scan
        sens_result = self.sensitivity_scan(content)
        # Record only pattern names: the matched values are the sensitive
        # data themselves and must not be copied into events or the audit log
        matched = sorted(sens_result["matches"])
        if sens_result["severity"] == "high":
            events.append(HygieneEvent(
                action=HygieneAction.BLOCKED,
                memory_id="", reason=f"PII blocked: {matched}",
                metadata={"severity": "high", "patterns": matched}))
            self._emit_audit(events)
            return {"allowed": False, "reason": matched, "events": events}
        if sens_result["severity"] == "suspicious":
            # Don't block, but flag
            events.append(HygieneEvent(
                action=HygieneAction.FLAGGED,
                memory_id="", reason="Content flagged for sensitivity review",
                metadata={"severity": "suspicious", "patterns": matched}))
        # Gate 2: Contradiction detection
        contradictions = self.contradiction_scan(user_id, memory_key, content)
        for contra in contradictions:
            events.append(HygieneEvent(
                action=HygieneAction.FLAGGED,
                memory_id=contra.get("existing_id", ""),
                reason=f"Contradiction with {contra.get('existing_id', '')}: {contra.get('detail', '')}",
                metadata=contra))
        self._emit_audit(events)
        return {"allowed": True, "events": events}

    def sensitivity_scan(self, content):
        """Scan content for sensitive information"""
        matches = {}
        severity = "none"
        for pattern_name, pattern in self.PII_PATTERNS.items():
            found = pattern.findall(content)
            if found:
                matches[pattern_name] = found[:3]  # Keep first 3 matches
        if matches:
            # API keys, JWT → high severity
            high_severity_keys = {"api_key_openai", "api_key_github", "jwt"}
            if set(matches.keys()) & high_severity_keys:
                severity = "high"
            else:
                severity = "suspicious"
        # No regex hit: fall back to the LLM classifier if one is configured
        if severity == "none" and self.llm_classifier:
            llm_result = self.llm_classifier.classify(content)
            if llm_result.get("sensitive", False):
                severity = "suspicious"
                matches["llm_flagged"] = [llm_result.get("reason", "unknown")]
        return {"severity": severity, "matches": matches}

    def contradiction_scan(self, user_id, memory_key, content):
        """Detect whether new content contradicts existing memories"""
        contradictions = []
        # Exact key lookup
        existing = self.l2.lookup_by_key(user_id, memory_key)
        if existing and existing.content != content:
            # Same key, different value: likely an update, not a contradiction
            return []
        # Semantic search: find similar but potentially contradictory content
        embedding = self.l2.embed(content)
        similar = self.l2.vector_search(embedding, top_k=5)
        for sim_entry, sim_score in similar:
            if sim_score < 0.6:  # Not similar enough to be the same topic
                continue
            if sim_score > self.contradiction_llm_threshold:
                # Use LLM to judge contradiction
                if self.llm_classifier:
                    check = self.llm_classifier.check_contradiction(
                        content, sim_entry.content)
                    if check.get("contradiction"):
                        existing_conf = getattr(sim_entry, 'confidence', 0.5)
                        contradictions.append({
                            "existing_id": sim_entry.memory_id,
                            "existing_content": sim_entry.content[:200],
                            "detail": check.get("reason", ""),
                            "existing_confidence": existing_conf,
                            # The new memory's confidence is not passed in;
                            # 0.5 is used as a placeholder
                            "confidence_gap": abs(existing_conf - 0.5),
                        })
        return contradictions

    def staleness_score(self, entry):
        """Compute staleness score for a memory entry (0–1); returns (score, action)"""
        now = datetime.now()
        created = datetime.fromisoformat(entry.created_at)
        accessed = datetime.fromisoformat(entry.last_accessed_at)
        days_since_created = (now - created).days
        max_age = getattr(entry, 'hard_ttl_days', 365)
        age_factor = min(1.0, days_since_created / max(max_age, 1))
        # Retrieval count in last 30 days approximated by access recency
        days_since_access = (now - accessed).days
        access_decay = min(1.0, days_since_access / self.access_decay_days)
        # Contradiction flag; status may be an Enum or a plain string
        status = getattr(entry.status, "value", entry.status)
        contradiction_flag = 1.0 if status == "superseded" else 0.0
        score = 0.4 * age_factor + 0.3 * access_decay + 0.3 * contradiction_flag
        if score > self.staleness_threshold_evict:
            return score, "evict"
        elif score > self.staleness_threshold_warn:
            return score, "stale"
        return score, "healthy"

    def run_hygiene_cycle(self, user_id, dry_run=False):
        """Execute a full hygiene inspection cycle"""
        stats = {"duplicates_found": 0, "contradictions_found": 0,
                 "stale_marked": 0, "evicted": 0, "sensitivity_flagged": 0}
        all_entries = self.l2.list_active(user_id)
        # Scan for staleness
        for entry in all_entries:
            score, action = self.staleness_score(entry)
            if action == "evict":
                # Counted in dry-run mode too, so the report shows what would be evicted
                stats["evicted"] += 1
                if not dry_run:
                    entry.status = "evicted"
                    self.l2.persist(entry)
            elif action == "stale":
                stats["stale_marked"] += 1
        return stats

    def _emit_audit(self, events):
        """Write hygiene events to audit log"""
        if self.audit_log:
            for event in events:
                self.audit_log.record(
                    event_type="memory_hygiene",
                    action=event.action.value,
                    memory_id=event.memory_id,
                    reason=event.reason,
                    metadata=event.metadata)
Every hygiene operation — dedup merge, contradiction flag, staleness eviction, PII block — should be recorded as an audit event. The audit log provides an immutable chain of evidence for memory changes. See Agent Audit Log Design — every hygiene event is a link in the audit pipeline.
7. Multi-Tenant Memory Isolation and Scoping
If an agent serves multiple users or organizations, memory must be strictly isolated. User A's preferences must never leak into User B's context — this is not a performance optimization; it is a fundamental data-security requirement. In SaaS customer service, enterprise knowledge bases, and multi-tenant agent platforms, scope isolation is the security foundation of the memory system.
Scope Tree: Four Levels
Memory scope is not a simple "User A vs User B" binary division — it is a hierarchical tree:
/global/ ← Global shared read-only (e.g., product doc summaries, public knowledge)
├── /org/{org_id}/ ← Org shared read-write (org members can read/write)
│ ├── /user/{user_id}/ ← User-isolated read-write (only that user can read/write)
│ │ └── /task/{task_id}/← Task scope (temporary; can promote or clean up after task ends)
│ └── /user/{user_id2}/
└── /org/{org_id2}/
Each of the four levels has a distinct permission model:
| Scope | Read Permission | Write Permission | Lifecycle | Example |
|---|---|---|---|---|
| /global/ | All users | Admin / system only | Long-term | Product feature summaries, public FAQ knowledge |
| /org/{id}/ | All org members | Org members | Org lifetime | Team conventions, shared project context |
| /user/{id}/ | That user only | That user only | User lifetime | Personal preferences, conversation history, config |
| /task/{id}/ | Task executor | Task executor | Task duration | Intermediate step state, ephemeral tool outputs |
Cross-Scope Access Rules
The core principle of scope isolation: cross-scope access is denied by default; explicit authorization is the exception.
- Downward read: Child scopes can read parent scope content: /user/42/ can read /org/1/ and /global/ (inheritance). But /user/42/ cannot read /user/43/ (sibling isolation).
- Upward write: Denied by default. Child scopes cannot write to parent scopes: /user/42/ cannot write to /org/1/. This prevents ordinary users from polluting org-level knowledge.
- Sibling isolation: Strictly prohibited. Between two /user/ scopes, or two /org/ scopes, there is no direct access of any kind.
- Promotion: The sole exception. Memories from a task scope can be explicitly promoted to a user scope via promote(): "critical lessons from this task are worth long-term preservation." Promotion requires an explicit call; it cannot happen automatically, because promotion means data movement across a scope boundary.
Namespace Enforcement at the Storage Layer
Scope isolation cannot rely solely on application-layer checks — it must be enforced at the storage layer. All memory keys are prefixed with their scope path:
# L2 key format
/global/knowledge/product_faq_summary
/org/acme-corp/config/deployment_region
/user/42/preferences/color_scheme
/user/42/task/task-abc123/step_3_output
During vector retrieval, the search scope is always bounded within the scope_prefix. Even if vector search returns similar memories from other users, the scope filter removes them before results reach L0. Specifically:
- Vector metadata includes the scope_path
- Retrieval adds a filter: scope_path LIKE '/user/42/%' OR scope_path LIKE '/org/acme-corp/%' OR scope_path LIKE '/global/%'
- Filtered results are then deduplicated and ranked
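The same prefix filter can be sketched in application code, for backends that return metadata alongside each hit. The result dictionaries and helper names here are assumptions for illustration; the point is the trailing slash in each prefix, which keeps /user/42/ from matching /user/420/.

```python
def allowed_prefixes(user_id, org_id=None):
    """Scope prefixes a user may read: global, their org, their own scope."""
    prefixes = ["/global/", f"/user/{user_id}/"]
    if org_id:
        prefixes.append(f"/org/{org_id}/")
    return tuple(prefixes)


def scope_filter(results, prefixes):
    """Drop any hit whose scope_path is outside the allowed prefixes."""
    # str.startswith accepts a tuple and matches if any prefix matches
    return [r for r in results if r["scope_path"].startswith(prefixes)]


# Hypothetical raw vector-search hits, before scope filtering
raw_hits = [
    {"id": "a", "scope_path": "/user/42/preferences/color_scheme"},
    {"id": "b", "scope_path": "/user/43/preferences/color_scheme"},
    {"id": "c", "scope_path": "/user/420/preferences/color_scheme"},
    {"id": "d", "scope_path": "/org/acme-corp/config/deployment_region"},
    {"id": "e", "scope_path": "/global/knowledge/product_faq_summary"},
]

visible = scope_filter(raw_hits, allowed_prefixes("42", org_id="acme-corp"))
visible_ids = [r["id"] for r in visible]
```

Filtering after retrieval is the weaker form of enforcement; where the vector store supports metadata filters, pushing the same prefixes into the query keeps other tenants' rows from being read at all.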
Multi-User Isolation Verification
Before deploying to production, you must pass the cross-user memory leak test:
- User A writes a preference: "I like dark mode"
- User B starts a conversation, asking "what color mode do you like?"
- Verify: the agent must NOT return User A's preference. If it does — scope isolation has failed; this is a high-severity security defect.
This test should run automatically as a regression check in the CI/CD pipeline.
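A regression check for this scenario might look like the sketch below. ScopedMemoryStore is a hypothetical stand-in for the real memory stack, with naive word-overlap search; in CI the same assertions would run against the actual write and retrieval path.

```python
class ScopedMemoryStore:
    """Toy store that tags every row with its owner's scope prefix."""

    def __init__(self):
        self._rows = []

    def write(self, user_id, content):
        self._rows.append((f"/user/{user_id}/", content))

    def search(self, user_id, query):
        prefix = f"/user/{user_id}/"
        terms = set(query.lower().split())
        # The scope check comes first: only the requester's own rows are eligible
        return [content for scope, content in self._rows
                if scope == prefix and terms & set(content.lower().split())]


def test_no_cross_user_leak():
    store = ScopedMemoryStore()
    store.write("A", "I like dark mode")
    query = "what color mode do you like"
    # User B must not see User A's preference
    assert store.search("B", query) == []
    # Control: User A does see it, so the test cannot pass vacuously
    assert store.search("A", query) == ["I like dark mode"]


test_no_cross_user_leak()
```

The control assertion matters: without it, a retrieval path that returns nothing for everyone would pass the leak test.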
Code: MemoryScope Class
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class ScopeLevel(Enum):
    GLOBAL = "global"
    ORG = "org"
    USER = "user"
    TASK = "task"


class AccessType(Enum):
    READ = "read"
    WRITE = "write"


@dataclass
class ScopePath:
    """Parsed scope path result"""
    level: ScopeLevel = ScopeLevel.GLOBAL
    org_id: Optional[str] = None
    user_id: Optional[str] = None
    task_id: Optional[str] = None

    def to_prefix(self):
        parts = ["global"]
        if self.org_id:
            parts += ["org", self.org_id]
        if self.user_id:
            parts += ["user", self.user_id]
        if self.task_id:
            parts += ["task", self.task_id]
        # Single slashes plus a trailing slash, e.g. /global/org/acme/user/42/
        return "/" + "/".join(parts) + "/"

    def is_ancestor_of(self, other):
        """Is self an ancestor scope of other? (A scope also matches itself.)"""
        return other.to_prefix().startswith(self.to_prefix())

    def is_sibling_of(self, other):
        """Are self and other sibling scopes (same level, different instance)?"""
        if self.level != other.level:
            return False
        if self.level == ScopeLevel.GLOBAL:
            return False  # Only one global scope
        if self.level == ScopeLevel.ORG:
            return self.org_id != other.org_id
        if self.level == ScopeLevel.USER:
            return self.org_id == other.org_id and self.user_id != other.user_id
        if self.level == ScopeLevel.TASK:
            return self.user_id == other.user_id and self.task_id != other.task_id
        return False


class MemoryScope:
    """Multi-tenant memory scope manager"""

    def __init__(self, storage_backend):
        self.storage = storage_backend

    def check_access(self, requester_scope, target_scope, access_type):
        """Verify whether requester has access rights to target scope"""
        req = self._parse_scope(requester_scope)
        tgt = self._parse_scope(target_scope)
        # Global scope: everyone can read, only admin can write
        if tgt.level == ScopeLevel.GLOBAL:
            if access_type == AccessType.READ:
                return True, "global read allowed for all"
            return False, "global write requires admin"
        # Self-access: always allowed
        if req.to_prefix() == tgt.to_prefix():
            return True, "self access"
        # Ancestor access: child scopes can read parent scopes
        if tgt.is_ancestor_of(req):
            if access_type == AccessType.READ:
                return True, "ancestor read (inheritance)"
            return False, "cannot write to ancestor scope"
        # Descendant access: can read but not write descendants
        if req.is_ancestor_of(tgt):
            if access_type == AccessType.READ:
                return True, "descendant read"
            return False, "cannot write to descendant scope"
        # Sibling isolation
        if req.is_sibling_of(tgt) or (
                req.level == ScopeLevel.USER and tgt.level == ScopeLevel.USER
                and req.user_id != tgt.user_id):
            return False, "cross-user isolation"
        return False, "access denied"

    def promote(self, from_scope, to_scope, entry_id):
        """Promote a memory from child scope to parent scope"""
        from_path = self._parse_scope(from_scope)
        to_path = self._parse_scope(to_scope)
        # Must promote along ancestor/descendant axis
        if not from_path.is_ancestor_of(to_path) and not to_path.is_ancestor_of(from_path):
            raise PermissionError(
                f"Promotion must be between ancestor/descendant scopes. "
                f"Got {from_scope} → {to_scope}")
        # If 'to' is ancestor of 'from', this is an upward promotion
        if from_path.to_prefix().startswith(to_path.to_prefix()):
            # Validate promotion path
            valid_promotions = {
                (ScopeLevel.TASK, ScopeLevel.USER),
                (ScopeLevel.USER, ScopeLevel.ORG),
                (ScopeLevel.TASK, ScopeLevel.ORG),
            }
            if (from_path.level, to_path.level) not in valid_promotions:
                raise PermissionError(
                    f"Invalid promotion path: {from_path.level.value} → {to_path.level.value}")
        entry = self.storage.load(from_path.to_prefix(), entry_id)
        if not entry:
            raise ValueError(f"Entry {entry_id} not found in {from_scope}")
        # Copy to target scope
        new_id = self.storage.copy(
            entry, from_prefix=from_path.to_prefix(),
            to_prefix=to_path.to_prefix())
        return new_id

    def build_search_filter(self, scope):
        """Build scope filter conditions for vector/structured search"""
        path = self._parse_scope(scope)
        # Build list of allowed scope prefixes for search
        allowed_prefixes = ["/global/"]
        if path.org_id:
            allowed_prefixes.append(f"/org/{path.org_id}/")
        if path.user_id:
            allowed_prefixes.append(f"/user/{path.user_id}/")
        if path.task_id:
            allowed_prefixes.append(f"/task/{path.task_id}/")
        return {
            "scope_prefix": allowed_prefixes,
            "operator": "OR"
        }

    def _parse_scope(self, scope_str):
        """Parse scope string into ScopePath"""
        path = ScopePath()
        parts = [p for p in scope_str.split("/") if p]
        for i, part in enumerate(parts):
            if part == "global":
                path.level = ScopeLevel.GLOBAL
            elif part == "org" and i + 1 < len(parts):
                path.org_id = parts[i + 1]
                path.level = ScopeLevel.ORG
            elif part == "user" and i + 1 < len(parts):
                path.user_id = parts[i + 1]
                path.level = ScopeLevel.USER
            elif part == "task" and i + 1 < len(parts):
                path.task_id = parts[i + 1]
                path.level = ScopeLevel.TASK
        return path


# Usage example
scope_mgr = MemoryScope(storage_backend=None)
# User A reading a task scope nested under their own user scope → allowed
print(scope_mgr.check_access(
    "/global/org/acme/user/42/", "/global/org/acme/user/42/task/abc/",
    AccessType.READ))
# → (True, 'descendant read')
# User A attempting to read User B's memory → denied
print(scope_mgr.check_access(
    "/global/org/acme/user/42/", "/global/org/acme/user/43/",
    AccessType.READ))
# → (False, 'cross-user isolation')
Scope prefixes and Context Envelope namespaces have a one-to-one mapping — each scope path maps to a namespace prefix in the context protocol. For the specific mapping rules, see Agent Context Protocol Design — scope governs "who can see," protocol governs "how it's delivered."
8. Production Checklist and Observability
A memory system's design journey goes from architecture to code to deployment. The final mile is ensuring all subsystems collaborate properly in production. This section provides a production checklist, a monitoring metrics dashboard, and a MemoryManager orchestrator that integrates all previous code fragments into a unified entry point.
Production Checklist
Before going live with the agent memory system, verify each item on this checklist:
- L2 persistent storage has backup strategy configured — SQLite/PostgreSQL database + vector database both have scheduled backups, RPO < 1 hour
- TTLs configured per memory category — user preferences = 90 days, task learnings = 30 days, entity facts = permanent + periodic review
- Dedup pipeline activated: all three gates are active, in order: entity-level key lookup → exact hash → semantic similarity (threshold 0.95)
- Contradiction detection runs on write + periodic scan — pre-write hook scans for conflicts, daily full scan for stale contradictions
- PII / sensitivity filter activated — block-on-high-severity mode; API keys, national ID numbers, etc. directly blocked
- GC pipeline runs on schedule — daily scan for expired/stale/contradicted records; dry_run mode verified before execution
- Scope isolation tested — cross-user memory leak test passed (User A's preferences do not appear in User B's retrieval results)
- MemoryManager integrated and tested with agent main loop — end-to-end: task start → Push memories → task execution → Pull retrieval → session end → promotion to persistent → GC run
- Memory metrics connected to monitoring dashboard — write rate, read rate, hit ratio, dedup rate, contradiction rate, staleness distribution, scope count per user
Key Monitoring Metrics
| Metric | Meaning | Healthy Range | Alert Threshold |
|---|---|---|---|
| write_rate | Memory entries written to L2 per minute | 1–20/min | > 50/min (possible write loop or storm) |
| read_hit_ratio | Pull retrievals returning results / total Pull calls | > 0.3 | < 0.1 (relevance threshold may be too high) |
| dedup_rate | Writes intercepted by dedup / total writes | 0.1–0.5 | > 0.7 (dedup may be too aggressive, or write logic has a bug) |
| contradiction_count | New contradiction pairs detected per day | < 10 | > 50 (possible version management or overwrite logic anomaly) |
| staleness_p50 / p95 | Staleness score distribution across all active memories | p50 < 0.3, p95 < 0.7 | p50 > 0.5 (large fraction of memories near eviction; TTL or GC may not be running) |
| scope_count_per_user | Active scopes per user | 1–10 | > 50 (task scopes may not be getting cleaned up) |
| pii_block_count | Writes blocked by PII filter per day | 0–5 | > 20 (users may be inadvertently leaking credentials; needs upstream fix) |
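The alert thresholds in the table translate directly into a rule table. The sketch below is a minimal, hypothetical checker over a metrics snapshot; the snapshot format and function name are assumptions, and only the staleness p50 rule is included for the staleness row.

```python
# One predicate per metric, mirroring the Alert Threshold column above
ALERT_RULES = {
    "write_rate": lambda v: v > 50,
    "read_hit_ratio": lambda v: v < 0.1,
    "dedup_rate": lambda v: v > 0.7,
    "contradiction_count": lambda v: v > 50,
    "staleness_p50": lambda v: v > 0.5,
    "scope_count_per_user": lambda v: v > 50,
    "pii_block_count": lambda v: v > 20,
}


def check_alerts(snapshot):
    """Return the sorted names of metrics whose alert threshold is breached.

    Metrics missing from the snapshot are skipped rather than treated as zero.
    """
    return sorted(name for name, breached in ALERT_RULES.items()
                  if name in snapshot and breached(snapshot[name]))


# Invented sample snapshot for illustration
snapshot = {"write_rate": 12, "read_hit_ratio": 0.05,
            "dedup_rate": 0.8, "pii_block_count": 3}
alerts = check_alerts(snapshot)
```

Keeping the rules in one table makes it easy to review them against the dashboard definitions and to tune a threshold without touching the checking logic.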
Code: MemoryManager Orchestrator — Integrating All Subsystems
The following MemoryManager integrates all prior subsystems — WorkingMemory (Section 3), LongTermMemory (Section 4), RetrievalBoundary (Section 5), MemoryHygiene (Section 6), MemoryScope (Section 7) — into a unified orchestrator. It is the single memory entry point the agent main loop needs to call:
```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum


class TaskPhase(Enum):
    STARTUP = "startup"
    EXECUTING = "executing"
    TOOL_CALL = "tool_call"
    TURN_END = "turn_end"
    COMPLETE = "complete"


@dataclass
class MemoryManager:
    """Agent memory system orchestrator: integrates L0–L3 + retrieval boundary + hygiene + scope"""
    # Core components
    working_memory: object = None      # WorkingMemory instance (Section 3)
    session_memory: object = None      # L1 session store
    long_term_memory: object = None    # LongTermMemory instance (Section 4)
    retrieval_boundary: object = None  # RetrievalBoundary instance (Section 5)
    hygiene: object = None             # MemoryHygiene instance (Section 6)
    scope: object = None               # MemoryScope instance (Section 7)
    external_retrieval: object = None  # L3 RAG pipeline
    # Configuration
    tenant_id: str = ""
    user_id: str = ""
    task_id: str = ""
    # Runtime state
    current_phase: TaskPhase = TaskPhase.STARTUP
    turn_count: int = 0
    last_gc_date: str = ""             # "YYYY-MM-DD" of the last GC run in this process
    metrics: dict = field(default_factory=lambda: {
        "writes": 0, "reads": 0, "hits": 0, "dedups": 0,
        "contradictions": 0, "pii_blocks": 0, "evictions": 0,
    })

    # ─── Agent main loop entry points ───
    def on_task_start(self, task_goal, plan_steps, constraints=None):
        """Task startup: initialize L0 + Push memories"""
        self.current_phase = TaskPhase.STARTUP
        self.task_id = self._generate_task_id()
        # 1. Initialize working memory
        self.working_memory.update_task(task_goal, plan_steps, constraints)
        # 2. Push: inject relevant memories from L1/L2
        task_context = f"task: {task_goal}"
        push_results = self.retrieval_boundary.push(
            self.session_memory, self.long_term_memory,
            task_context, self.long_term_memory.embed)
        # Count one read per retrieval call and one hit per call that returns results
        self.metrics["reads"] += 1
        # 3. Append Push results to scratchpad (or constraints)
        if push_results:
            self.metrics["hits"] += 1
            self.working_memory.update_scratchpad(
                f"[Memory injection] Loaded {len(push_results)} relevant memories from L1/L2")
            for r in push_results:
                self.working_memory.update_scratchpad(
                    f" · {r.content[:100]}...")
        return self.working_memory.to_prompt()

    def on_pre_llm_call(self):
        """Before each LLM call: check if Push pool needs refresh"""
        if self.current_phase == TaskPhase.EXECUTING:
            self.retrieval_boundary.refresh_if_needed(
                self.session_memory, self.long_term_memory,
                self.working_memory.task_goal)
        return self.working_memory.to_prompt()

    def on_tool_call(self, tool_name, tool_input):
        """Before tool call: Pull relevant memories"""
        self.current_phase = TaskPhase.TOOL_CALL
        # Pull: retrieve memories relevant to this tool call
        query = f"{tool_name} {str(tool_input)[:200]}"
        pull_results = self.retrieval_boundary.pull(
            self.session_memory, self.long_term_memory, query, top_k=3)
        # Every Pull counts as a read; only non-empty Pulls count as hits,
        # otherwise hit_ratio would always be 1.0
        self.metrics["reads"] += 1
        if pull_results:
            self.metrics["hits"] += 1
            context = "\n".join(r.content for r in pull_results)
            self.working_memory.update_scratchpad(
                f"[Pull: {tool_name}] {context[:300]}")
        return pull_results

    def on_observation(self, tool_name, result, importance="normal"):
        """After tool call: write to L0 + L1"""
        # L0: working memory
        self.working_memory.add_observation(tool_name, result, importance)
        # L1: session memory (write-through)
        entry = {
            "tool_name": tool_name,
            "result": str(result)[:500],  # str() so non-string tool results can be truncated
            "importance": importance,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        self.session_memory.append(self.task_id, entry)

    def on_turn_end(self, step_completed=True):
        """End of reasoning turn: update counter + advance plan"""
        self.turn_count += 1
        self.current_phase = TaskPhase.EXECUTING
        # Advance the plan only when the caller reports the current step as done
        # (step_completed is an assumed flag; it defaults to True)
        if step_completed:
            self.working_memory.advance_plan()

    def on_task_complete(self, task_summary=""):
        """Task complete: promote important memories from L1 to L2 + cleanup"""
        self.current_phase = TaskPhase.COMPLETE
        # 1. Evaluate L1 important memories, promote to L2
        session_entries = self.session_memory.get_all(self.task_id)
        promoted = 0
        for i, entry in enumerate(session_entries):
            if entry.get("importance") not in ("critical", "high"):
                continue
            # One key per entry, so outcomes from the same task do not collide
            memory_key = f"task_{self.task_id}_outcome_{i}"
            # Hygiene check
            hygiene_check = self.hygiene.pre_write_check(
                memory_key=memory_key,
                content=entry["result"],
                user_id=self.user_id)
            if not hygiene_check["allowed"]:
                # Counts every rejected write under pii_blocks
                self.metrics["pii_blocks"] += 1
                continue
            # Write to L2
            self.long_term_memory.write(
                memory_key=memory_key,
                content=entry["result"],
                source="task_outcome",
                confidence=0.7)
            promoted += 1
            self.metrics["writes"] += 1
        # 2. Write task learning summary (same hygiene gate as promoted entries)
        if task_summary:
            summary_key = f"task_{self.task_id}_learning"
            summary_check = self.hygiene.pre_write_check(
                memory_key=summary_key,
                content=task_summary,
                user_id=self.user_id)
            if summary_check["allowed"]:
                self.long_term_memory.write(
                    memory_key=summary_key,
                    content=task_summary,
                    confidence=0.8)
                self.metrics["writes"] += 1
            else:
                self.metrics["pii_blocks"] += 1
        # 3. Clean up task scope
        self.session_memory.clear(self.task_id)
        # 4. Run GC (once per day, not every task completion)
        self._maybe_run_gc()
        return {"promoted": promoted, "task_id": self.task_id}

    def search(self, query, layer="l2", top_k=5):
        """Agent-initiated memory search (tool interface exposed to LLM)"""
        self.metrics["reads"] += 1
        if layer == "l2":
            results = self.long_term_memory.read(query, top_k=top_k)
        elif layer == "l1":
            results = self.session_memory.search(query, top_k=top_k)
        elif layer == "l3" and self.external_retrieval is not None:
            results = self.external_retrieval.search(query, top_k=top_k)
        else:
            results = []  # unknown layer, or L3 not configured
        if results:
            self.metrics["hits"] += 1
        return results

    # ─── Internal helpers ───
    def _generate_task_id(self):
        return f"task-{uuid.uuid4().hex[:8]}"

    def _maybe_run_gc(self):
        """Run garbage collection at most once per day (tracked per process)"""
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        if self.last_gc_date == today:
            return
        self.last_gc_date = today
        # Dry run first: skip the destructive pass when nothing would be evicted.
        # Assumes evict() returns a dict of counts, as the sum() below already does.
        dry_stats = self.long_term_memory.evict(dry_run=True)
        if not any(dry_stats.values()):
            return
        # Then execute
        gc_stats = self.long_term_memory.evict(dry_run=False)
        self.metrics["evictions"] += sum(gc_stats.values())

    def get_metrics(self):
        """Export current metrics"""
        return {
            **self.metrics,
            "hit_ratio": self.metrics["hits"] / max(self.metrics["reads"], 1),
            "turn_count": self.turn_count,
            "phase": self.current_phase.value,
        }


# ─── Agent main loop integration example ───
# Initialize. db, vs, embed, l2, audit, and store are placeholders for the
# backends built in earlier sections; they are not defined in this listing.
mm = MemoryManager(
    working_memory=WorkingMemory(),
    session_memory=SessionStore(),
    long_term_memory=LongTermMemory(
        user_id="user-42", db_conn=db, vector_store=vs, embed_fn=embed),
    retrieval_boundary=RetrievalBoundary(),
    hygiene=MemoryHygiene(l2_store=l2, audit_log=audit),
    scope=MemoryScope(storage_backend=store),
    tenant_id="acme-corp",
    user_id="user-42",
)

# Agent main loop
task_goal = "Upgrade user-service from v2.1 to v3.0"
plan = ["Check breaking changes", "Deploy to staging", "Run integration tests",
        "Canary release 5%", "Full cutover"]

# 1. Task start
context = mm.on_task_start(task_goal, plan, constraints=["Zero downtime", "Don't modify DB schema"])
# → L0 populated, Push memories injected

# 2. Reasoning + tool-calling loop
for turn in range(10):
    # LLM reasoning
    context = mm.on_pre_llm_call()
    # llm_response = llm.chat(messages=[system_msg, user_msg, context])
    # Simulate tool call
    tool_name = "check_breaking_changes"
    mm.on_tool_call(tool_name, {"target_version": "v3.0"})
    mm.on_observation(tool_name, "v3.0 removed /api/v1/users, now uses /api/v2/users", "critical")
    mm.on_turn_end()

# 3. Task complete
result = mm.on_task_complete(
    task_summary="v3.0 upgrade key: /api/v1/users → /api/v2/users, all callers must update")
print(f"Task complete: promoted {result['promoted']} memories to L2")
print(f"Metrics: {mm.get_metrics()}")
```
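This MemoryManager is the single entry point for the entire memory system. Agent developers don't need to call WorkingMemory, LongTermMemory, or RetrievalBoundary separately; they call the corresponding MemoryManager method at each key point in the task lifecycle. Retrieval boundaries and hygiene checks are applied inside those methods. Note that the listing only holds the MemoryScope instance and the tenant_id without calling them: scope isolation is assumed to be enforced inside the stores it wraps (for example, LongTermMemory is constructed with a user_id).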
For memory system metrics and observability, see Agent Observability — including how to connect these metrics to Grafana, Prometheus, Datadog, and other monitoring platforms, and how to configure alerting rules.
FAQ
What's the difference between Agent Memory and RAG?
RAG is a retrieval mechanism: tool call → search external docs → inject results into context. Memory is the entire persistence + retrieval system: deciding what to remember, how long, how to retrieve it, when to evict, how to isolate it. RAG can serve as the L3 external retrieval layer; the memory system encompasses all four layers (L0–L3) plus write policies, lifecycle management, pollution prevention, and scope isolation.
How does this article relate to the existing agent-memory-systems article?
agent-memory-systems covers "how to store" — SQLite schema, vector DB configuration, JSON persistence. This article covers "how to design" — four-layer architecture, retrieval boundaries, lifecycle management, pollution prevention. The former is the implementation manual; this is the architectural blueprint. We recommend reading the former first to understand storage fundamentals, then this article to master architectural design.
What's the difference between L0 Working Memory and the context window?
The context window is the LLM's physical limit (e.g., 128K tokens). L0 working memory is the structured content you choose to place inside that window: rather than stuffing in the entire message history, you select the task goal, active plan, recent observations, and constraints. L0 is your design decision; the window size is the LLM's physical constraint.
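To make the distinction concrete, here is a minimal sketch of assembling L0 under a token budget. It is not the WorkingMemory class from Section 3: `build_l0_prompt` and `rough_tokens` are hypothetical names, and token counts are approximated by whitespace-separated words, which is a crude stand-in for a real tokenizer.

```python
def rough_tokens(text: str) -> int:
    """Crude estimate (assumption: about one token per whitespace-separated word)."""
    return len(text.split())


def build_l0_prompt(goal, active_step, constraints, observations, budget=60):
    """Always keep goal, step, and constraints; fill what is left with the newest observations."""
    fixed = [f"Goal: {goal}", f"Active step: {active_step}"]
    fixed += [f"Constraint: {c}" for c in constraints]
    used = sum(rough_tokens(line) for line in fixed)

    kept = []
    for obs in reversed(observations):   # walk from newest to oldest
        cost = rough_tokens(obs) + 1     # +1 for the "Observation:" label
        if used + cost > budget:
            break                        # older observations are dropped
        kept.append(f"Observation: {obs}")
        used += cost

    # Restore chronological order for the observations that fit
    return "\n".join(fixed + list(reversed(kept)))


prompt = build_l0_prompt(
    goal="Upgrade user-service",
    active_step="Check breaking changes",
    constraints=["Zero downtime"],
    observations=["a b c d e", "f g h", "i j"],
    budget=18,
)
print(prompt)
```

With a budget of 18, the fixed lines use 11 estimated tokens, the two newest observations fit, and the oldest one is left out.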
Does a memory system require a vector database?
No. L1 session memory can use Redis or an in-memory dict. The structured portion of L2 persistent memory (entity facts, user preferences) can use SQLite or PostgreSQL. A vector database is only necessary when semantic search is required — typically for L2's unstructured memories (conversation snippets, task learnings) and L3 external retrieval. Small-scale agents can operate entirely without vector search.
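As an illustration of the in-memory option, the sketch below implements a dict-backed L1 store with plain keyword matching instead of vector search. `InMemorySessionStore` is a hypothetical class; its method names mirror the calls MemoryManager makes on `session_memory` (`append`, `get_all`, `clear`, `search`), and the scoring rule is an assumption chosen for simplicity.

```python
from collections import defaultdict


class InMemorySessionStore:
    """Hypothetical dict-backed L1 store; no embeddings or vector index involved."""

    def __init__(self):
        self._entries = defaultdict(list)  # task_id -> list of entry dicts

    def append(self, task_id, entry):
        self._entries[task_id].append(entry)

    def get_all(self, task_id):
        return list(self._entries.get(task_id, []))

    def clear(self, task_id):
        self._entries.pop(task_id, None)

    def search(self, query, top_k=5):
        """Rank entries by how many query words appear in their result text."""
        words = set(query.lower().split())
        scored = []
        for entries in self._entries.values():
            for entry in entries:
                text = str(entry.get("result", "")).lower()
                score = sum(1 for w in words if w in text)
                if score:
                    scored.append((score, entry))
        # Sort on the score only, so entry dicts are never compared
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [entry for _, entry in scored[:top_k]]


store = InMemorySessionStore()
store.append("task-1", {"result": "v3.0 removed /api/v1/users"})
store.append("task-1", {"result": "staging deploy succeeded"})
print(store.search("removed users", top_k=1))
```

Substring matching like this breaks down once memories are paraphrased rather than repeated verbatim, which is the point at which semantic search starts to pay for itself.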
In multi-tenant scenarios, how do you prevent User A's memories from leaking to User B?
Through scope prefix enforcement: all memory keys are prefixed with /user/{user_id}/. On retrieval, MemoryScope.check_access() verifies whether the requester's scope can read the target memory. Cross-scope queries are blocked by the firewall. Even if vector search returns similar memories from other users, the scope filter removes them before they reach L0.
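The post-retrieval scope filter can be sketched as follows. This is a simplified stand-in for MemoryScope.check_access(), not its actual implementation: `scope_prefix` and `filter_by_scope` are hypothetical names, and retrieved records are assumed to be dicts with a `key` field.

```python
def scope_prefix(user_id: str) -> str:
    """Key prefix for a user's scope; the trailing slash is what separates 42 from 420."""
    return f"/user/{user_id}/"


def filter_by_scope(results, requester_id):
    """Drop every retrieved record whose key lies outside the requester's scope."""
    prefix = scope_prefix(requester_id)
    return [r for r in results if r["key"].startswith(prefix)]


# Simulated vector search output that mixes three users' memories
raw_results = [
    {"key": "/user/42/pref/brand", "content": "dislikes brand X"},
    {"key": "/user/420/pref/brand", "content": "likes brand X"},
    {"key": "/user/7/pref/category", "content": "not buying shoes"},
]
visible = filter_by_scope(raw_results, "42")
print([r["key"] for r in visible])  # ['/user/42/pref/brand']
```

The same fixture works as a cross-user leak test: seed memories for several users, query as one of them, and assert that no key outside that user's prefix survives the filter.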
How much storage does a memory system need? How do I estimate it?
Rough estimate: each L2 memory entry takes ~2–5KB of structured fields, and an active user generates 10–50 memories per day. At the low end, 100 DAU × 10 memories/day × 30 days = 30,000 entries; at 5KB each, that is ≈ 150MB/month. Embedding vectors (1536 dimensions × 4 bytes ≈ 6KB per entry) add roughly another 180MB, for a total of about 330MB/month. At 50 memories per day, every figure is five times larger. With TTL eviction policies, the low-end case can be kept under 500MB.
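The arithmetic behind this estimate can be reproduced with a small helper. It is a sketch under stated assumptions: `estimate_monthly_storage_mb` is a hypothetical function, the default of 10 memories per user per day is the low end of the range quoted above, and sizes use decimal units (1MB = 1,000KB).

```python
def estimate_monthly_storage_mb(dau, memories_per_user_per_day=10, days=30,
                                entry_kb=5.0, embedding_dims=1536, bytes_per_dim=4):
    """Estimate monthly L2 growth, split into structured fields and embedding vectors."""
    entries = dau * memories_per_user_per_day * days
    structured_mb = entries * entry_kb / 1000
    vectors_mb = entries * embedding_dims * bytes_per_dim / 1_000_000
    return {
        "entries": entries,
        "structured_mb": structured_mb,
        "vectors_mb": vectors_mb,
        "total_mb": structured_mb + vectors_mb,
    }


low = estimate_monthly_storage_mb(dau=100)
high = estimate_monthly_storage_mb(dau=100, memories_per_user_per_day=50)
print(low["entries"], round(low["total_mb"]))    # 30000 334
print(high["entries"], round(high["total_mb"]))  # 150000 1672
```

The exact vector size is 6,144 bytes per entry, so the helper reports about 184MB of embeddings where the rounded 6KB figure gives 180MB.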
Continue Reading
This is the first article in the Agent Memory and Context Engineering series. We recommend the following reading path:
- Agent Memory Systems: From Short-Term Windows to Long-Term Vector Storage — Concrete storage implementations (SQLite/JSON/Vector DB), the foundation this architecture sits on
- Agent Context Protocol Design — The pipeline and envelope for context transmission, the data channel for the memory system
- Agent Audit Log Design — Memory mutations produce audit events, forming an immutable chain of evidence
- Multi-Agent Orchestration — Memory sharing and isolation strategies in multi-agent scenarios
- Model-Agnostic Agent Design — Provider-agnostic design principles for memory architecture