Agent 记忆系统设计:短期记忆、长期记忆与检索边界
30秒要点
- 核心问题:"加个向量数据库"不等于有了记忆系统。Agent 在生产环境中会忘记任务中间状态、跨会话积累记忆污染、混淆多用户数据——需要一个架构层面的记忆系统设计。
- 解决方案:L0-L3 四层记忆架构——L0 工作记忆(任务大脑)→ L1 会话记忆 → L2 持久记忆 → L3 外部检索。每层有明确的存储机制、生命周期和检索策略。
- 关键实现:MemoryManager 编排器 + 检索边界(推/拉/混合触发)+ 记忆卫生(去重/矛盾检测/PII扫描)+ 多租户作用域隔离。含 7 段完整 Python 代码。
- 读完能做什么:为你的 Agent 设计一套生产级记忆系统——知道什么该记住、记多久、怎么检索、如何防污染、怎么隔离多用户数据。
1. 为什么「加个向量数据库」不等于有了记忆系统
一个客服 Agent 上线了。团队给它配了一个向量数据库,每次对话结束后把用户偏好写入向量库。下一次用户提问时,Agent 从向量库检索相关偏好,注入到 Prompt 中。"Agent 有记忆了",团队很满意。
三周后,用户投诉:Agent 反复推荐一款用户在上个月明确拒绝过的产品。团队排查发现:用户确实在上个月说过"我不喜欢这个品牌",Agent 也忠实地把这句话存入了向量库。但用户第二周又说过一次"现在不考虑这个品类"——两条偏好叠加在一起,向量检索返回了第一条(因为它更短、余弦相似度更高),Agent 看到的是"我不喜欢这个品牌"而非"现在不考虑这个品类"。更糟的是,用户第三周改了偏好——"X 品牌的新系列不错"——这条新信息应该覆盖旧信息,但它只是作为一条新记录被追加上去。Agent 在三条互相矛盾的偏好中随机检索。
这就是向量数据库 ≠ 记忆系统的本质原因:向量数据库是一个存储引擎,记忆系统是一个管理层。前者负责"存和查",后者负责"什么时候写、写什么、什么时候更新、什么时候淘汰、查到了怎么判断是否可信"。
三种朴素的记忆失败模式
在生产环境中,简单地"把东西塞进向量库"会触发三类问题:
- 遗忘(Forgetting):L0 上下文窗口溢出,Agent 在处理多步任务时丢失中间状态。比如 Agent 执行一个 12 步的数据库迁移,第 8 步时 LLM 的上下文窗口已经被前 7 步的结果占满,第 1 步的关键约束"不要修改 users 表"被挤出窗口——Agent 在第 9 步把 users 表删了。这不是记忆力差,是没有结构化工作记忆——关键约束应该被锁定在独立的槽位中,不受上下文窗口滚动的影响。
- 污染(Pollution):旧的、错误的、矛盾的记忆从不清理。一个 Agent 连续运行了三个月,积累了上千条"用户偏好",其中 40% 已经过时、15% 互相矛盾、5% 是用户在测试阶段的非真实数据。每次检索都是一场赌博——返回的是最新的还是最旧的?是最相关的还是最相似的?更危险的是,LLM 对向量检索返回的内容默认信任——它不会质疑"这条偏好可能已经过期了"。
- 混淆(Conflation):所有用户的数据放在同一个桶里。Agent A 为一家 SaaS 公司服务,同时为多个客户提供客服。用户 1 说"我喜欢深色模式",用户 2 说"我讨厌深色模式"。两条偏好都在同一个向量库中,检索时按相似度排序——返回哪一个取决于谁写的更短、更"像查询"。这不是 bug,是设计缺陷:没有命名空间隔离。
关键洞察:RAG(检索增强生成)和 Memory(记忆系统)不是一回事。RAG 是一个检索机制——工具调用 → 搜索 → 注入。Memory 是一套完整系统——写入策略、生命周期管理、检索边界、作用域隔离、记忆卫生。打个比方:RAG 是一个图书馆的搜索框;Memory 是图书管理员 + 编目规则 + 书架布局 + 剔旧策略 + 借阅规则。搜索框只知道"找书",不知道"这本书已经过时、应该下架了"。
所以我们需要什么:四层架构预览
一个生产级 Agent 记忆系统需要四个层次,每层解决不同的问题:
L0 工作记忆(Working Memory) → "现在"
上下文窗口内的结构化槽位
生命周期:每次推理轮次重置
L1 会话记忆(Session Memory) → "这次对话"
Redis / 内存字典
生命周期:会话持续期间
L2 持久记忆(Persistent Memory) → "直到被淘汰"
SQLite/Postgres + 向量数据库
生命周期:直到显式淘汰
L3 外部检索(External Retrieval) → "外面的世界"
RAG pipeline(文档、API、Web)
生命周期:无状态获取
这篇文章从架构层面解构这四层:每层存什么、怎么存、生命周期多长、如何检索、如何保持卫生。这不是一个向量数据库教程——这是针对"Agent 的记忆会随时间衰减、污染和混淆"这一生产问题,给出的架构级解决方案。
关于记忆的存储实现细节(向量数据库选型、嵌入模型对比),参见 Agent 记忆存储系统。关于框架无关的设计原则,参见 模型无关的 Agent 设计。
2. 四层记忆架构:L0 工作记忆 → L3 外部检索
在深入每一层之前,先建立一个全局视图。四层记忆架构的核心思想是分层治理——不是所有信息都放在同一个存储里、用同一种方式检索。每一层有独立的存储机制、生命周期、访问模式和示例数据。
架构全景图
┌─────────────────────────────────────────────────────────┐
│ Agent — MemoryManager │
│ ┌───────────────────────────────────────────────────┐ │
│ │ │ │
│ │ L0 WORKING MEMORY (上下文窗口内) │ │
│ │ 任务目标 · 当前计划 · 最近的观察 · 约束 · 草稿纸 │ │
│ │ 生命周期: per-turn · 访问: 始终可用 │ │
│ │ │ │
│ │ ┌──── write-through ──────────────────────────┐ │ │
│ │ ▼ │ │ │
│ │ L1 SESSION MEMORY (Redis / 内存字典) │ │ │
│ │ 对话轮次 · 工具结果 · 中间决策 │ │ │
│ │ 生命周期: session · 访问: 按需拉取 │ │ │
│ │ │ │ │
│ │ ┌──── promotion (重要记忆提升) ──────────────┐ │ │ │
│ │ ▼ │ │ │ │
│ │ L2 PERSISTENT MEMORY (SQLite+PG / 向量DB) │ │ │ │
│ │ 用户偏好 · 学习到的知识 · 任务结果 · 实体信息 │ │ │ │
│ │ 生命周期: 直到显式淘汰 · 访问: 混合检索 │ │ │ │
│ │ │ │ │ │
│ │ ┌──── just-in-time ──────────────────────────┐ │ │ │ │
│ │ ▼ ▼ │ │ │ │
│ │ L3 EXTERNAL RETRIEVAL (RAG / APIs / Web) │ │ │ │
│ │ 文档 · 知识库 · 实时数据 │ │ │ │
│ │ 生命周期: 无状态 · 访问: 即时检索 │ │ │ │
│ │ │ │
│ └───────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
各层定义
| 层 | 存储机制 | 生命周期 | 访问模式 | 示例数据 |
|---|---|---|---|---|
| L0 工作记忆 | 上下文窗口内(LLM context) | 每轮推理后重置 | 始终可用(push) | 任务目标、当前计划步骤、最近 3-5 条工具输出、约束条件、草稿纸 |
| L1 会话记忆 | Redis / 内存字典 | 会话持续期间 | 按需拉取(pull when relevant) | 完整对话历史、工具调用结果、中间决策记录 |
| L2 持久记忆 | SQLite / PostgreSQL + 向量数据库 | 直到显式淘汰 | 混合检索(关键词 + 向量 + 结构化查询) | 用户偏好、学习到的事实、历史任务结果、实体知识 |
| L3 外部检索 | RAG pipeline(外部文档、API、Web) | 无状态获取 | 即时检索(just-in-time) | 产品文档、知识库文章、实时 API 数据、Web 搜索结果 |
关键设计原则
- Write-through(写入穿透):L0 → L1 自动写入。每次重要的工具调用结果在注入 L0 的同时写入 L1。这确保了即使 L0 被后续内容挤出窗口,信息仍然在会话中可恢复。
- Promotion / Demotion(提升 / 降级):会话结束时,MemoryManager 评估 L1 中的记忆。重要的(用户明确反馈、关键决策、配置变更)提升到 L2 持久化;不重要的(中间推理、暂态工具输出)随会话结束而释放。
- TTL per layer(每层独立 TTL):L1 的 TTL = 会话时长(通常几分钟到几小时)。L2 的 TTL = 可配置(几天到永久),支持软 TTL(标记待审查)和硬 TTL(到期自动删除)。
- Namespace isolation(命名空间隔离):每一层的存储都按
tenant_id或user_id划分命名空间。L2 的用户偏好不能跨用户检索——这不是一个性能优化,而是数据安全的基本要求。
代码:MemoryLayer 枚举 + MemoryConfig
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
class MemoryLayer(Enum):
L0_WORKING = "l0_working"
L1_SESSION = "l1_session"
L2_PERSISTENT = "l2_persistent"
L3_EXTERNAL = "l3_external"
@dataclass
class LayerConfig:
"""单层记忆的配置"""
max_items: int # 最大条目数
ttl_seconds: Optional[int] # TTL(秒),None 表示不过期
eviction_policy: str = "lru" # 淘汰策略:lru / fifo / ttl
@dataclass
class MemoryConfig:
"""Agent 记忆系统总配置"""
tenant_id: str # 多租户隔离
user_id: str # 用户级隔离
l0: LayerConfig = field(default_factory=lambda: LayerConfig(
max_items=5, ttl_seconds=None)) # per-turn reset,不需要 TTL
l1: LayerConfig = field(default_factory=lambda: LayerConfig(
max_items=100, ttl_seconds=3600)) # 1 小时会话
l2: LayerConfig = field(default_factory=lambda: LayerConfig(
max_items=10000, ttl_seconds=86400 * 30)) # 30 天
l3: LayerConfig = field(default_factory=lambda: LayerConfig(
max_items=0, ttl_seconds=None)) # 无状态,无上限
# 检索配置
similarity_threshold: float = 0.75 # 向量检索最低相似度
hybrid_search_weight: float = 0.5 # 向量 vs 关键词权重(0=纯关键词, 1=纯向量)
这篇架构的视角是"记忆作为仓库"——存储、分类、检索、清理。与之对应的另一篇文章 Agent 上下文协议设计 讨论的是"记忆如何流动"——信息的序列化格式、传输协议、压缩策略。仓库管存储,管道管输送,两者互补。
3. 工作记忆设计:Agent 的「大脑工作台」
L0 工作记忆是 Agent 的"大脑工作台"——每一次推理轮次从这里开始。它不是简单的"把前 N 条消息塞进 Prompt",而是结构化的槽位系统。之所以需要结构化,是因为 LLM 对扁平消息列表的注意力分布不均匀——距离当前响应位置越近的内容,注意力权重越高;越远的内容,越容易被"遗忘"。如果你把任务目标放在第 1 条消息中,经过 20 轮对话后,LLM 对这条目标的有效注意力已经衰减到接近零。
五个结构化槽位
L0 工作记忆不是一条扁平的消息列表,而是五个独立的槽位,每个槽位以固定的格式注入 Prompt:
| 槽位 | 存储内容 | 更新频率 | Prompt 中的位置 |
|---|---|---|---|
| task_goal | 当前任务的目标(一句话) | 任务开始设定,任务完成清除 | 最顶部(最高注意力权重) |
| active_plan | 当前步骤 + 下一步骤 | 每个步骤完成后更新 | 目标之后 |
| recent_observations | 最近 N 条工具调用结果(max 3-5 条) | 每次工具调用后追加 | 计划之后 |
| constraints | 硬约束:预算、截止时间、禁止操作 | 任务开始设定,极少变更 | 观察之后(但始终保留) |
| scratchpad | 中间推理、临时计算、待验证假设 | 任意时刻读写 | 最后(最近、最灵活) |
这个结构不是任意的。它遵循一个原则:越重要的信息位置越固定。LLM 的注意力机制对固定位置的重复模式最敏感——如果每轮 Prompt 中目标都在同一个位置、用同一个标签包裹,LLM 对这个槽位的注意力保持更稳定。
Push vs Pull:什么时候从下层拉取记忆
L0 不是孤立的——它在两个时间点从 L1/L2 拉取信息:
- 任务启动时:从 L2 拉取用户偏好、历史任务结果、实体知识,填充 task_goal 和 constraints 槽位。从 L1 拉取当前会话的累积上下文(如果有的话)。
- 工具调用时:在调用需要特定知识的工具之前,从 L2 拉取相关事实。例如:Agent 准备调用
deploy_to_kubernetes之前,从 L2 拉取"用户上次要求使用 us-east-1 区域"的偏好。
注意这跟"每轮都检索"不同——每轮检索会把不相关的记忆注入 Prompt,浪费上下文预算。L0 的检索是事件驱动的:只在状态转换点(任务开始、工具调用)触发。
代码:WorkingMemory 类
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
@dataclass
class Observation:
"""单条工具调用结果"""
tool_name: str
result_summary: str
timestamp: float
importance: str = "normal"
@dataclass
class WorkingMemory:
"""L0 工作记忆——Agent 的大脑工作台"""
task_goal: str = ""
active_plan: dict = field(default_factory=dict)
recent_observations: list = field(default_factory=list)
constraints: list = field(default_factory=list)
scratchpad: str = ""
MAX_OBSERVATIONS = 5
MAX_CONSTRAINTS = 8
def update_task(self, goal, plan_steps, constraints=None):
self.task_goal = goal
self.active_plan = {
"current_step": plan_steps[0] if plan_steps else "",
"next_steps": plan_steps[1:3] if len(plan_steps) > 1 else [],
"total_steps": len(plan_steps),
"completed": 0,
}
if constraints:
self.constraints = constraints[:self.MAX_CONSTRAINTS]
def add_observation(self, tool_name, result, importance="normal"):
obs = Observation(
tool_name=tool_name,
result_summary=result[:200],
timestamp=datetime.now().timestamp(),
importance=importance,
)
if importance == "critical":
self.recent_observations.insert(0, obs)
else:
self.recent_observations.append(obs)
self.recent_observations = self.recent_observations[:self.MAX_OBSERVATIONS]
def update_scratchpad(self, note):
ts = datetime.now().strftime("%H:%M")
self.scratchpad += "\n[" + ts + "] " + note
def advance_plan(self):
self.active_plan["completed"] += 1
steps = self.active_plan["next_steps"]
if steps:
self.active_plan["current_step"] = steps[0]
self.active_plan["next_steps"] = steps[1:]
else:
self.active_plan["current_step"] = ""
def to_prompt(self):
lines = []
if self.task_goal:
lines.append("[TASK_GOAL] " + self.task_goal)
plan = self.active_plan
if plan.get("current_step"):
lines.append("[CURRENT_STEP] (" + str(plan["completed"]) +
"/" + str(plan["total_steps"]) + ") " + plan["current_step"])
if plan.get("next_steps"):
lines.append("[NEXT_STEPS] " + " → ".join(plan["next_steps"]))
if self.recent_observations:
obs_lines = []
for obs in self.recent_observations:
marker = "⚡" if obs.importance == "critical" else "·"
obs_lines.append(" " + marker + " [" + obs.tool_name + "] " + obs.result_summary)
lines.append("[RECENT_OBSERVATIONS]\n" + "\n".join(obs_lines))
if self.constraints:
c_lines = [" - " + c for c in self.constraints]
lines.append("[CONSTRAINTS]\n" + "\n".join(c_lines))
if self.scratchpad.strip():
lines.append("[SCRATCHPAD]\n" + self.scratchpad.strip())
return "\n\n".join(lines)
def reset(self):
self.recent_observations = []
self.scratchpad = ""
wm = WorkingMemory()
wm.update_task(
goal="将 user-service 从 v2.1 升级到 v3.0,零停机",
plan_steps=[
"检查 v3.0 的 breaking changes",
"在 staging 环境部署 v3.0",
"运行集成测试套件",
"金丝雀发布 5% 流量",
"全量切换",
],
constraints=[
"零停机——必须使用滚动更新",
"不要修改数据库 schema",
"回滚时间 < 2 分钟",
"预算:AWS 额外费用不超过 $50",
]
)
wm.add_observation("check_breaking_changes",
"v3.0 移除了 /api/v1/users 端点,改用 /api/v2/users", "critical")
wm.add_observation("deploy_staging",
"v3.0 在 staging 成功部署,健康检查通过")
wm.advance_plan()
wm.update_scratchpad("集成测试需要额外配置 test_db 连接")
print(wm.to_prompt())
注意 to_prompt() 的输出结构:目标在最顶部,约束始终保留,草稿纸在最后。这不是随意排列的——目标是 LLM 在整个推理过程中应该持续关注的"北极星";最近的观察提供最新的环境反馈;约束是铁律,绝不能因为上下文滚动而被遗忘。草稿纸在最底部,因为它距离 LLM 的当前位置最近,适合存放"现在正在想的事情"。
记忆预算:L0 不是垃圾桶
L0 的信息存储在 LLM 的上下文窗口中,而上下文窗口有两个成本:注意力成本(LLM 对每个 token 平均分布注意力,token 越多每个 token 获得的平均注意力越低)和经济成本(按 token 计费)。这意味着 L0 必须严格预算:
- observation 不超过 3-5 条:只保留最重要的工具结果。普通结果在写入 L1 后从 L0 移出。
- plan 中只显示当前步骤和一个下一步:不展示完整的 12 步计划——那会浪费注意力。
- constraints 裁剪到关键项:如果约束列表超过 8 条,按重要性和特异性排序,保留前 8 条。
- scratchpad 定期裁剪:每当 scratchpad 超过 500 字符,触发一个裁剪操作——保留最近的推理,删除被验证的假设。
记忆预算的核心准则:不是所有信息都应该进入 L0。L0 是 Agent 的"注意力焦点"——只有当前任务最需要的信息才值得占据这个焦点。其他信息在 L1/L2 中安静等待,在需要时被拉取。
4. 长期记忆生命周期:写入、去重、更新、淘汰
L2 持久记忆是记忆系统中最复杂的层次——它停留时间最长,积累的数据最多,如果不加管理,会从一个"有用的知识库"退化为一个"噪音池"。这一节把 L2 的生命周期定义为一个状态机——每一条记忆从诞生到消亡都经历明确的阶段。
L2 记忆生命周期状态机
┌────────┐ ┌─────────────┐ ┌──────────┐ ┌──────────────┐
│ WRITE │───▶│ DEDUP CHECK │───▶│ SIMILAR │───▶│ STORE + TTL │
└────────┘ └──────┬──────┘ │ MERGE? │ └──────┬───────┘
│ └──────────┘ │
│ (重复) ▼
▼ ┌──────────────┐
┌──────────┐ │ PERIODIC GC │
│ UPDATE │ └──────┬───────┘
└──────────┘ │
▼
┌──────────────┐
│ EVICT EXPIRED│
└──────┬───────┘
│
▼
┌──────────────┐
│ EMIT METRICS │
└──────────────┘
这个状态机的每一步都有明确的策略和边界条件。它不是"想到了就写,过期了就删"——每一步都是一个决策点。
写入策略:什么时候向 L2 写入
不是每个工具调用结果都值得写入 L2。以下是 L2 写入的触发条件(满足任一即可):
| 触发条件 | 示例 | 来源 |
|---|---|---|
| 任务完成后学习总结 | "升级时遇到的主要坑:v3.0 API 端点名称全部从 /v1/ 改为 /v2/" | Agent 在任务结束时自我总结 |
| 用户明确反馈 | "我不喜欢深色模式"、"记住:以后都用 us-east-1" | 用户对话中的显式偏好陈述 |
| 重要事实发现 | "user-service 的数据库连接池上限是 100" | 工具调用结果中提取的关键参数 |
| 配置变更 | "log_level 从 INFO 改为 DEBUG" | Agent 执行的操作产生了持久效果 |
以下情况不应写入 L2:中间推理步骤、临时工具调用结果(如 ls 输出)、超过一半以上内容相同的重复信息、置信度低于阈值的事实推断。
去重策略:三条防线
去重是 L2 的第一道防线——在写入之前判断这条记忆是否"已经存在"。三条策略按成本从低到高排列:
- 精确匹配(Hash):对记忆的规范化文本计算 SHA256 哈希。如果哈希已存在 → 更新已有记录的时间戳和置信度,不新建。成本极低,O(1)。
- 语义相似(Cosine Similarity):对新记忆和现有记忆分别计算嵌入向量,计算余弦相似度。如果 similarity > 0.95 → 同一事实,合并(保留更新时间和更高置信度的版本)。成本中等,需要一次向量计算。
- 实体级(Entity-level):如果记忆带有
user_id和memory_key(如 "color_preference"),按 key 精确查找已有记录。如果存在 → 更新值而非新建。成本低,O(1) 索引查找。这是最可靠的方式——比语义相似度更精确。
更新 vs 覆盖:版本化记忆
当一条记忆被判定为"已存在"时,不应该直接覆盖——应该版本化。每条 L2 记忆记录包含:
- version:版本号(自增)。第一次写入 = v1,每次更新 +1。
- confidence:置信度(0.0–1.0)。用户明确陈述 = 1.0;Agent 推断 = 0.6;模式识别 = 0.4。
- created_at / updated_at:时间戳。
- source:记忆来源——"user_stated" | "agent_inferred" | "task_outcome" | "config_change"。
当两条记忆矛盾时(如 "user 喜欢 X 品牌" vs "user 不喜欢 X 品牌"),系统比较它们的置信度和时间戳:更新且置信度更高的获胜;如果置信度相同,更新的获胜。矛盾的旧版本不删除——标记为 superseded 并保留在审计日志中。
TTL 与淘汰
L2 记忆不是永生的。每条记忆都有 TTL(Time To Live),分为两类:
- 软 TTL(Soft TTL):到期后标记为
candidate_for_review——不会自动删除,但在检索时降低权重。适合"可能还有用,但不确定"的记忆。 - 硬 TTL(Hard TTL):到期后自动删除。适合明确的时效性数据——"下周二是发布会"过了下周三就没用了。
GC(垃圾回收):定期扫描 L2 中所有记录,检查:
- 硬 TTL 已过期 → 直接删除
- 软 TTL 已过期且超过 30 天未被检索 → 删除
- 与更新版本矛盾的
superseded记录超过 90 天 → 删除 - 置信度 < 0.3 且超过 60 天未被验证 → 标记为
stale
代码:LongTermMemory 类
import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
from enum import Enum
class MemorySource(Enum):
USER_STATED = "user_stated"
AGENT_INFERRED = "agent_inferred"
TASK_OUTCOME = "task_outcome"
CONFIG_CHANGE = "config_change"
class MemoryStatus(Enum):
ACTIVE = "active"
SUPERSEDED = "superseded"
STALE = "stale"
EVICTED = "evicted"
@dataclass
class MemoryEntry:
"""L2 持久记忆中的一条记录"""
memory_id: str
user_id: str
memory_key: str
content: str
embedding: Optional[list] = None
source: MemorySource = MemorySource.AGENT_INFERRED
confidence: float = 0.6
version: int = 1
status: MemoryStatus = MemoryStatus.ACTIVE
soft_ttl_days: int = 30
hard_ttl_days: int = 365
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
updated_at: str = field(default_factory=lambda: datetime.now().isoformat())
last_accessed_at: str = field(default_factory=lambda: datetime.now().isoformat())
retrieved_count: int = 0
def content_hash(self):
normalized = self.content.strip().lower()
return hashlib.sha256(normalized.encode()).hexdigest()
class LongTermMemory:
"""L2 持久记忆管理器"""
def __init__(self, user_id, db_conn, vector_store, embed_fn):
self.user_id = user_id
self.db = db_conn
self.vector_store = vector_store
self.embed = embed_fn
self.SIMILARITY_MERGE_THRESHOLD = 0.95
def write(self, memory_key, content,
source=MemorySource.AGENT_INFERRED, confidence=0.6):
# 第 1 关:实体级去重(最快、最精确)
existing = self._lookup_by_key(self.user_id, memory_key)
if existing:
return self._update_existing(existing, content, confidence)
# 第 2 关:精确哈希去重
temp_entry = MemoryEntry(
memory_id="", user_id=self.user_id,
memory_key=memory_key, content=content)
content_hash = temp_entry.content_hash()
hash_match = self._lookup_by_hash(self.user_id, content_hash)
if hash_match:
return self._update_existing(hash_match, content, confidence)
# 第 3 关:语义相似去重(最贵,最后用)
embedding = self.embed(content)
similar = self._search_similar(self.user_id, embedding, top_k=1)
if similar and similar[0][1] >= self.SIMILARITY_MERGE_THRESHOLD:
return self._merge_or_update(similar[0][0], content, confidence)
# 通过所有去重关——创建新记忆
entry = MemoryEntry(
memory_id=self._generate_id(),
user_id=self.user_id,
memory_key=memory_key,
content=content,
embedding=embedding,
source=source,
confidence=confidence,
)
self._persist(entry)
return entry
def read(self, query, top_k=5, use_hybrid=True):
results = []
query_embedding = self.embed(query)
vector_results = self._search_similar(
self.user_id, query_embedding, top_k=top_k)
keyword_results = []
if use_hybrid:
keyword_results = self._keyword_search(
self.user_id, query, top_k=top_k)
merged = self._merge_results(vector_results, keyword_results, top_k)
for entry_id, score in merged:
entry = self._load(entry_id)
if entry and entry.status == MemoryStatus.ACTIVE:
entry.last_accessed_at = datetime.now().isoformat()
entry.retrieved_count += 1
results.append(entry)
return results
def update(self, memory_id, content, confidence=None):
entry = self._load(memory_id)
if not entry:
return None
entry.status = MemoryStatus.SUPERSEDED
self._persist(entry)
new_entry = MemoryEntry(
memory_id=self._generate_id(),
user_id=entry.user_id,
memory_key=entry.memory_key,
content=content,
embedding=self.embed(content),
source=entry.source,
confidence=confidence if confidence is not None else entry.confidence,
version=entry.version + 1,
)
new_entry.soft_ttl_days = entry.soft_ttl_days
new_entry.hard_ttl_days = entry.hard_ttl_days
self._persist(new_entry)
return new_entry
def evict(self, dry_run=False):
stats = {"hard_expired": 0, "soft_expired": 0,
"superseded_old": 0, "stale_marked": 0}
now = datetime.now()
all_entries = self._list_active(self.user_id)
for entry in all_entries:
updated = datetime.fromisoformat(entry.updated_at)
accessed = datetime.fromisoformat(entry.last_accessed_at)
if updated + timedelta(days=entry.hard_ttl_days) < now:
if not dry_run:
entry.status = MemoryStatus.EVICTED
self._persist(entry)
stats["hard_expired"] += 1
continue
if (updated + timedelta(days=entry.soft_ttl_days) < now
and accessed + timedelta(days=30) < now):
if not dry_run:
entry.status = MemoryStatus.EVICTED
self._persist(entry)
stats["soft_expired"] += 1
continue
if (entry.status == MemoryStatus.SUPERSEDED
and updated + timedelta(days=90) < now):
if not dry_run:
entry.status = MemoryStatus.EVICTED
self._persist(entry)
stats["superseded_old"] += 1
continue
if (entry.confidence < 0.3
and updated + timedelta(days=60) < now
and entry.status == MemoryStatus.ACTIVE):
if not dry_run:
entry.status = MemoryStatus.STALE
self._persist(entry)
stats["stale_marked"] += 1
return stats
def _lookup_by_key(self, user_id, key):
return None
def _lookup_by_hash(self, user_id, h):
return None
def _search_similar(self, user_id, embedding, top_k):
return []
def _keyword_search(self, user_id, query, top_k):
return []
def _merge_results(self, vec_results, kw_results, top_k):
return []
def _generate_id(self):
import uuid
return str(uuid.uuid4())[:12]
def _persist(self, entry):
pass
def _load(self, memory_id):
return None
def _list_active(self, user_id):
return []
def _update_existing(self, existing, content, confidence):
if confidence >= existing.confidence:
return self.update(existing.memory_id, content, confidence)
else:
existing.last_accessed_at = datetime.now().isoformat()
existing.retrieved_count += 1
self._persist(existing)
return existing
def _merge_or_update(self, existing_id, content, confidence):
existing = self._load(existing_id)
if not existing:
return self.write("", content)
if confidence > existing.confidence:
return self.update(existing_id, content, confidence)
return existing
这个实现中有几个值得强调的设计决策:
- 去重顺序:按成本从低到高——实体级 key 查找(O(1))→ 哈希匹配(O(1))→ 语义相似(O(n) 向量计算 + ANN 搜索)。把最贵的操作放在最后,在前两个关口能拦截大部分重复。
- 版本化而非覆盖:每一版记忆都保留。这样如果用户说"我之前告诉过你我喜欢的颜色",Agent 可以追溯完整的变更历史——"你在 3 月说喜欢深色,5 月改成了浅色"。这对审计和调试至关重要。
- dry_run 模式:
evict()支持 dry_run 参数——先看会删除什么,确认后再实际执行。生产环境中,GC 操作不应该是一个黑盒。 - 置信度作为写入保护:当新记忆的置信度低于已有记忆时,不覆盖已有记忆。这防止了 LLM 的低置信度推断污染用户明确陈述的事实。
关于记忆的每一次变更(写入、更新、淘汰、合并)都应该作为一个审计事件被记录,参见 Agent 审计日志设计——每一条 L2 记忆的 mutation 都对应一条审计记录,形成不可篡改的变更链。
5. 检索边界设计:推、拉与混合触发
记忆系统存储了数据,但检索策略决定了 Agent 在正确的时间看到正确的信息。如果检索得太少——Agent 缺少关键上下文,做错决策。如果检索得太多——上下文窗口被无关信息淹没,注意力稀释。检索边界设计的目标是在"太少"和"太多"之间找到正确的触发位置。
三种检索触发模式
检索不是"每次对话都搜一次"那么简单。根据触发时机,检索分为三种模式:
| 模式 | 触发时机 | 谁决定 | 适用场景 |
|---|---|---|---|
| Push(主动推送) | 任务开始时,在首次 LLM 调用之前 | 系统(预设规则) | 用户偏好、任务模板、关键约束——"每次都应该看到"的信息 |
| Pull(按需拉取) | 任务执行中,Agent 调用检索工具 | LLM(自主决策) | 特定事实查询——"这个 API 的上一个版本是怎么配置的?" |
| Hybrid(混合触发) | Push 开局 + Pull 按需 + 定时刷新 | 系统 + LLM 协作 | 生产级 Agent——既有稳定基线,又有灵活查询能力 |
Push:主动注入——"先给你准备好"
Push 发生在任务开始之前。MemoryManager 扫描 L1/L2 中与当前任务上下文相关的记忆,将相关性最高的 N 条注入 L0 工作记忆,然后才发出第一次 LLM 调用。
Push 的关键约束是预算:
- Token 预算:Push 注入的记忆总量不超过预设的 token 上限(如 2000 tokens)。因为 L0 的上下文窗口是共享资源,Push 占用越多,留给工具调用结果和对话历史的空间越少。
- 条目预算:最多注入 M 条记忆(如 M=5)。不是"找到多少塞多少",而是"只取最相关的 M 条"。
- 相关性门槛:只有相似度 > 0.8 的记忆才被纳入 Push 候选池。低于这个门槛的记忆留在 L2,等 Pull 时再按需查询。
Push 的典型注入内容:
- 用户偏好:"用户要求所有数据库操作使用 read-only 副本"、"用户偏好收到 Slack 通知而非邮件"
- 活跃任务上下文:如果 Agent 正在执行一个跨会话的长任务(如为期三天的数据库迁移),Push 注入上一会话的进度和关键发现
- 实体事实:"user-service 的生产数据库连接池上限是 100"——在执行与 user-service 相关的任务时主动注入
Pull:按需拉取——"需要什么自己查"
Pull 发生在任务执行过程中。Agent 执行到某一步时,发现自己需要某个特定信息——它调用一个检索工具,从 L1/L2 中查询相关记忆。与 Push 不同,Pull 的决策权在 LLM——LLM 判断"我现在需要查什么"。
Pull 的典型场景:
- 工具调用前的上下文补充:Agent 准备调用
deploy_to_kubernetes时,Pull "用户上次要求的区域是 us-east-1" - 遇到错误时的历史查找:部署失败,Agent Pull "上一次部署 user-service 时遇到过类似的权限错误,解决方案是……"
- 用户问题应答:用户问"上次那个数据库迁移用了多久?",Agent Pull 对应的任务结果记忆
Pull 的工具设计要点:Pull 检索工具应该暴露给 LLM 一个清晰的接口——search_memory(query, layer, top_k)。LLM 不需要知道底层是向量搜索还是关键词搜索——它只需要表达"我想找什么"。检索的融合、排序、去重由系统层处理。
Hybrid:混合触发——生产环境的标配
单独的 Push 或 Pull 在各自场景下都有盲区。Push 的问题是"系统猜 Agent 需要什么"——猜错了就浪费预算。Pull 的问题是"Agent 不知道它不知道什么"——Agent 不会主动查询它不知道存在的记忆。
混合触发结合两者:
- 任务启动时:Push——注入用户偏好、活跃任务上下文、关键实体事实(这些都是"每次都应该看到"的信息)
- 执行过程中:Pull——Agent 按需调用检索工具获取特定信息
- 定时刷新:每隔 K 轮(如 K=5),重新评估 Push 记忆池的相关性——因为任务执行过程中上下文在变化,第 1 轮相关的记忆到第 10 轮可能已经无关了。定时刷新确保 L0 中的 Push 记忆保持新鲜
检索融合(Retrieval Fusion)
单靠向量检索有盲区——它擅长语义相似但不擅长精确匹配和结构化过滤。生产级检索应该融合三种搜索:
| 搜索方式 | 擅长 | 盲区 |
|---|---|---|
| 向量搜索(Vector) | 语义相似——"用户喜欢暗色调"能匹配"用户偏好深色主题" | 不擅长精确值匹配——"user_id=42" 在向量空间中没有意义 |
| 关键词搜索(Keyword) | 精确匹配——"v3.0"、"us-east-1" 等术语精确命中 | 不擅长同义词和语义变化——"暗色模式"不会匹配"深色主题" |
| 结构化查询(Structured) | 精确过滤——"user_id=42 AND memory_key='color_preference'" | 不擅长模糊查询——需要知道确切的 key 名称 |
融合流程:三种搜索各自返回 Top K 结果 → 去重(同一 memory_id 只保留一次)→ 按复合分数重新排序 → 取最终 Top K 注入 L0。
复合分数公式:
composite_score = α × vector_score + β × keyword_score + γ × recency_bonus + δ × importance_bonus
# 默认权重(可调)
α = 0.4 # 向量分数权重
β = 0.3 # 关键词分数权重
γ = 0.2 # 新近度加分(越新越高)
δ = 0.1 # 重要性加分(critical > normal)
相关性门槛调优
相关性门槛是检索系统最关键的旋钮:
- 门槛太高(如 0.95):Agent 几乎检索不到任何记忆——表现为"健忘",每次对话都像第一次见面。用户偏好不被注入,Agent 反复问同样的问题。
- 门槛太低(如 0.5):大量弱相关记忆涌入 L0——表现为"注意力涣散",Agent 在无关信息中迷失,做出与当前任务无关的决策。
- 最优门槛:需要通过 A/B 测试确定——用命中率(检索到的记忆中有多少被 Agent 实际使用)和误报率(检索到的记忆中有多少被 Agent 忽略)作为评估指标。典型的最优区间在 0.70–0.85。
代码:RetrievalBoundary 类
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
import math
@dataclass
class RetrievalResult:
"""单条检索结果"""
memory_id: str
content: str
vector_score: float = 0.0
keyword_score: float = 0.0
composite_score: float = 0.0
source_layer: str = "l2"
@dataclass
class RetrievalBoundary:
"""检索边界——控制记忆何时、如何从 L1/L2 注入 L0"""
push_budget_tokens: int = 2000 # Push 注入的 token 预算
push_max_items: int = 5 # Push 注入的最大条目数
relevance_threshold: float = 0.75 # 相关性最低门槛
refresh_interval_turns: int = 5 # 每 K 轮刷新 Push 记忆
# 融合权重
alpha: float = 0.4 # 向量权重
beta: float = 0.3 # 关键词权重
gamma: float = 0.2 # 新近度权重
delta: float = 0.1 # 重要性权重
_push_cache: list = field(default_factory=list)
_turn_counter: int = 0
def push(self, l1_store, l2_store, task_context, embed_fn):
"""任务启动前:主动注入相关记忆到 L0"""
candidates = []
# 从 L1 收集会话级相关记忆
l1_results = l1_store.search(task_context, top_k=self.push_max_items)
for r in l1_results:
if r.score >= self.relevance_threshold:
candidates.append(RetrievalResult(
memory_id=r.id, content=r.content,
vector_score=r.score, source_layer="l1"))
# 从 L2 收集持久记忆——用户偏好、活跃任务、实体事实
l2_results = l2_store.hybrid_search(
task_context, top_k=self.push_max_items,
filters={"memory_type": ["user_preference", "entity_fact", "task_context"]})
for r in l2_results:
if r.score >= self.relevance_threshold:
candidates.append(RetrievalResult(
memory_id=r.id, content=r.content,
vector_score=r.score, keyword_score=r.keyword_score or 0,
source_layer="l2"))
# 融合去重
fused = self._fuse_and_rank(candidates)
# 按 token 预算裁剪
selected = self._budget_clip(fused, self.push_budget_tokens)
self._push_cache = selected
self._turn_counter = 0
return selected
def pull(self, l1_store, l2_store, query, top_k=5):
"""任务执行中:LLM 主动调用检索工具"""
results = []
q_embedding = l2_store.embed(query)
# 向量搜索
vec_results = l2_store.vector_search(q_embedding, top_k=top_k * 2)
for r in vec_results:
results.append(RetrievalResult(
memory_id=r.id, content=r.content,
vector_score=r.score, source_layer="l2"))
# 关键词搜索
kw_results = l2_store.keyword_search(query, top_k=top_k)
for r in kw_results:
results.append(RetrievalResult(
memory_id=r.id, content=r.content,
keyword_score=r.score, source_layer="l2"))
fused = self._fuse_and_rank(results)
return fused[:top_k]
def refresh_if_needed(self, l1_store, l2_store, task_context):
"""每 K 轮刷新 Push 记忆池"""
self._turn_counter += 1
if self._turn_counter >= self.refresh_interval_turns:
return self.push(l1_store, l2_store, task_context)
return self._push_cache
def _fuse_and_rank(self, candidates):
"""融合去重并按复合分数排序"""
seen = {}
fused = []
now = datetime.now().timestamp()
for c in candidates:
if c.memory_id in seen:
# 保留分数更高的
existing = seen[c.memory_id]
if c.vector_score > existing.vector_score:
seen[c.memory_id] = c
continue
seen[c.memory_id] = c
for m_id, c in seen.items():
# 新近度加分:假设内存中可获取 last_updated
recency_bonus = 0.5 # 默认
importance_bonus = 0.5
c.composite_score = (
self.alpha * c.vector_score +
self.beta * c.keyword_score +
self.gamma * recency_bonus +
self.delta * importance_bonus
)
fused.append(c)
fused.sort(key=lambda x: x.composite_score, reverse=True)
return fused
def _budget_clip(self, candidates, max_tokens):
"""按 token 预算裁剪结果列表"""
selected = []
token_count = 0
for c in candidates:
est_tokens = len(c.content) // 3 # 粗略估算:中文 ~1.5 字符/token
if token_count + est_tokens > max_tokens and selected:
break
selected.append(c)
token_count += est_tokens
return selected
def tune_threshold(self, hit_rate, false_positive_rate):
"""根据 A/B 测试结果调整相关性门槛"""
if hit_rate < 0.3:
self.relevance_threshold = max(0.5, self.relevance_threshold - 0.05)
elif false_positive_rate > 0.4:
self.relevance_threshold = min(0.95, self.relevance_threshold + 0.05)
return self.relevance_threshold
检索边界不是孤立的——它使用上下文信封(Context Envelope)将检索到的记忆注入 L0。关于上下文数据的序列化和传输协议,参见 Agent 上下文协议设计。检索边界定义了"什么数据被检索",上下文协议定义了"数据如何打包和传递"。
6. 记忆卫生与防污染
记忆系统不是"只写不清理"的日志。随着 Agent 运行时间增长,L2 持久记忆中会积累各种"污染物"——重复数据、矛盾事实、过期信息、敏感数据。如果没有主动的卫生机制,记忆质量会持续下降,直到 Agent 的决策基于过时和矛盾的信息。第四章讲了单条记忆的生命周期管理;这一章聚焦跨记忆的污染检测和清理策略。
四种污染向量
记忆污染有四种典型模式,每种都需要不同的检测和修复策略:
| 污染类型 | 成因 | 危害 | 检测方式 |
|---|---|---|---|
| 重复(Duplicates) | 同一事实在不同时间、不同上下文下被多次写入,使用了不同的 memory_key 或文本表述 | 检索时同一事实返回多条,浪费 L0 预算;更新时只更新了其中一条,其他副本变成过期数据 | 哈希去重 + 语义相似去重 + 实体级 key 去重(已在第 4 章实现) |
| 矛盾(Contradictions) | 用户改了偏好但旧偏好没被覆盖;Agent 在不同任务中推断出矛盾的事实 | Agent 在两条矛盾信息中随机选择,决策不可预测——"用户喜欢深色模式" vs "用户喜欢浅色模式"同时存在于 L2 | 写入时搜索冲突事实 + 定期矛盾扫描 |
| 过期(Staleness) | 事实已经改变但旧记录从未被更新或淘汰;TTL 未配置或配置过长 | Agent 基于过时信息做决策——"API 端点还在 /v1/"(实际已迁移到 /v2/) | 过期分数计算 + TTL 到期检测 |
| 敏感信息泄露(Sensitivity) | 用户无意中输入了 PII(电话号码、邮箱、身份证号)或凭据(API key、token),被写入了 L2 | 隐私泄露 + 安全风险——凭据在检索时被注入 L0,可能在后续对话中被 LLM 输出 | 正则模式匹配 + LLM 敏感内容分类器 |
矛盾检测:不止是关键词相反
矛盾检测比去重更难——两条记忆的文本可能完全不同,但表达的意思互相排斥。简单的关键词反义词匹配("喜欢" vs "讨厌")覆盖不了大部分矛盾。有效的矛盾检测需要:
- 实体 + 属性对齐:先确认两条记忆是否谈论同一实体的同一属性。如果一条是 "user.color_preference = dark" 另一条是 "user.color_preference = light"——这是直接矛盾。如果一条是 "user.color_preference = dark" 另一条是 "user.font_preference = large"——这不矛盾,属性不同。
- 语义矛盾判断:对于非结构化记忆(如"用户不喜欢 X 品牌" vs "用户说 X 品牌的新系列不错"),使用 LLM 判断是否存在矛盾。将两条记忆文本发送给一个轻量级的判断 prompt:
# 矛盾判断 Prompt
你是一个事实一致性检查器。判断以下两条记忆是否存在矛盾。
如果矛盾,返回 {"contradiction": true, "reason": "..."}
如果不矛盾,返回 {"contradiction": false}
如果一条是对另一条的更新/修正,返回 {"contradiction": false, "update": true}
记忆 A: {memory_a.content}
记忆 B: {memory_b.content}
- 自动解决 vs 人工审查:如果两条矛盾的记忆都有明确的置信度差异(如一条置信度 1.0,另一条 0.4),自动保留高置信度的一方,将低置信度方标记为
superseded。如果置信度相近(差异 < 0.2),两者都标记为待审查,加入人工审查队列。
过期分数(Staleness Score)
不是所有"旧"记忆都应该被淘汰——有些事实是永不过期的(如"地球绕太阳转")。过期判断需要一个复合分数:
staleness_score = w₁ × age_factor + w₂ × access_decay + w₃ × contradiction_flag
age_factor = min(1.0, days_since_creation / max_age_days)
access_decay = 1.0 - (retrieval_count_last_30d / expected_retrieval_count)
contradiction_flag = 1.0 if has_active_contradiction else 0.0
# 默认权重
w₁ = 0.4 # 年龄权重
w₂ = 0.3 # 访问衰减权重
w₃ = 0.3 # 矛盾标记权重
当 staleness_score > 0.7 时,记忆被标记为 stale——在检索时降权(乘以 0.5 的衰减系数)。当 staleness_score > 0.9 时,记忆进入候选淘汰队列。
PII / 敏感信息扫描
敏感信息检测分两层:
- 正则模式匹配(快速、低成本):在写入路径上扫描常见 PII 模式——邮箱地址、手机号(中国 11 位、国际格式)、身份证号(18 位)、信用卡号(Luhn 算法)、API key(特定前缀如
sk-、ghp_)、JWT token(三段 base64 结构)。命中任一模式 → 阻断写入 + 告警。 - LLM 敏感内容分类器(慢、高成本、高精度):对于通过正则扫描但仍有嫌疑的内容(如"我家地址是朝阳区建国路 88 号"——正则可能漏掉),使用 LLM 判断是否为敏感个人信息。只在正则阶段标记为
suspicious时才触发 LLM 分类器,控制成本。
敏感信息阻断不是可选的——生产环境中应该默认启用。唯一的例外是经过明确授权的审计场景——即便如此,敏感数据应加密存储而非明文写入 L2。
代码:MemoryHygiene 类
import re
import hashlib
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
class HygieneAction(Enum):
BLOCKED = "blocked" # 写入被阻断
FLAGGED = "flagged" # 标记待审查
MERGED = "merged" # 自动合并
SUPERSEDED = "superseded" # 被新版本取代
STALE_MARKED = "stale_marked"
EVICTED = "evicted"
@dataclass
class HygieneEvent:
"""一次卫生操作事件"""
action: HygieneAction
memory_id: str
reason: str
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
metadata: dict = field(default_factory=dict)
@dataclass
class MemoryHygiene:
"""记忆卫生管理器——防污染、去重、矛盾检测、敏感信息过滤"""
# 矛盾检测配置
contradiction_llm_threshold: float = 0.7 # 触发 LLM 判断的相似度门槛
contradiction_auto_resolve_gap: float = 0.2 # 置信度差异 > 此值则自动解决
# 过期配置
staleness_threshold_warn: float = 0.7
staleness_threshold_evict: float = 0.9
access_decay_days: int = 30
# 敏感信息正则模式
PII_PATTERNS = {
"email": re.compile(r'[\w\.-]+@[\w\.-]+\.\w+'),
"phone_cn": re.compile(r'1[3-9]\d{9}'),
"id_card_cn": re.compile(r'\d{17}[\dXx]'),
"api_key_openai": re.compile(r'sk-[A-Za-z0-9]{32,}'),
"api_key_github": re.compile(r'ghp_[A-Za-z0-9]{36}'),
"jwt": re.compile(r'eyJ[A-Za-z0-9\-_]+\.eyJ[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+'),
"credit_card": re.compile(r'\b\d{4}[\s\-]?\d{4}[\s\-]?\d{4}[\s\-]?\d{4}\b'),
}
def __init__(self, l2_store, llm_classifier=None, audit_log=None):
self.l2 = l2_store
self.llm_classifier = llm_classifier # 可选的 LLM 分类器
self.audit_log = audit_log # 审计日志系统
self.remediation_queue: list = [] # 人工审查队列
def pre_write_check(self, memory_key, content, user_id):
"""写入前检查——在 LongTermMemory.write() 之前调用"""
events = []
# 第 1 关:敏感信息扫描
sens_result = self.sensitivity_scan(content)
if sens_result["severity"] == "high":
events.append(HygieneEvent(
action=HygieneAction.BLOCKED,
memory_id="", reason=f"PII blocked: {sens_result['matches']}",
metadata=sens_result))
self._emit_audit(events)
return {"allowed": False, "reason": sens_result["matches"], "events": events}
if sens_result["severity"] == "suspicious":
events.append(HygieneEvent(
action=HygieneAction.FLAGGED,
memory_id="", reason="Content flagged for sensitivity review",
metadata=sens_result))
# 不阻断,但标记
# 第 2 关:矛盾检测
contradictions = self.contradiction_scan(
user_id, memory_key, content)
if contradictions:
for contra in contradictions:
events.append(HygieneEvent(
action=HygieneAction.FLAGGED,
memory_id=contra.get("existing_id", ""),
reason=f"Contradiction with {contra.get('existing_id','')}: {contra.get('detail','')}",
metadata=contra))
self._emit_audit(events)
return {"allowed": True, "events": events}
def sensitivity_scan(self, content):
"""扫描内容中的敏感信息"""
matches = {}
severity = "none"
for pattern_name, pattern in self.PII_PATTERNS.items():
found = pattern.findall(content)
if found:
matches[pattern_name] = found[:3] # 只保留前 3 个匹配
if matches:
# API key、JWT、身份证 → 高危
high_severity_keys = {"api_key_openai", "api_key_github", "jwt", "id_card_cn"}
if set(matches.keys()) & high_severity_keys:
severity = "high"
else:
severity = "suspicious"
# 如果正则没命中但内容可疑,且 LLM 分类器可用
if severity == "none" and self.llm_classifier:
llm_result = self.llm_classifier.classify(content)
if llm_result.get("sensitive", False):
severity = "suspicious"
matches["llm_flagged"] = [llm_result.get("reason", "unknown")]
return {"severity": severity, "matches": matches}
def contradiction_scan(self, user_id, memory_key, content):
"""检测新内容是否与已有记忆矛盾"""
contradictions = []
# 精确 key 查找
existing = self.l2.lookup_by_key(user_id, memory_key)
if existing and existing.content != content:
# 同 key 不同值——可能是更新,不是矛盾(由 LongTermMemory.update 处理)
return []
# 语义搜索——找相似但可能矛盾的内容
embedding = self.l2.embed(content)
similar = self.l2.vector_search(embedding, top_k=5)
for sim_entry, sim_score in similar:
if sim_score < 0.6: # 不够相似,不可能是同一话题
continue
if sim_score > self.contradiction_llm_threshold:
# 使用 LLM 判断是否矛盾
if self.llm_classifier:
check = self.llm_classifier.check_contradiction(
content, sim_entry.content)
if check.get("contradiction"):
contradictions.append({
"existing_id": sim_entry.memory_id,
"existing_content": sim_entry.content[:200],
"detail": check.get("reason", ""),
"existing_confidence": sim_entry.confidence,
"confidence_gap": abs(
getattr(sim_entry, 'confidence', 0.5) - 0.5)
})
return contradictions
def staleness_score(self, entry):
"""计算一条记忆的过期分数(0-1)"""
now = datetime.now()
created = datetime.fromisoformat(entry.created_at)
accessed = datetime.fromisoformat(entry.last_accessed_at)
days_since_created = (now - created).days
max_age = getattr(entry, 'hard_ttl_days', 365)
age_factor = min(1.0, days_since_created / max(max_age, 1))
# 过去 30 天内的检索次数
days_since_access = (now - accessed).days
access_decay = min(1.0, days_since_access / self.access_decay_days)
# 矛盾标记
contradiction_flag = 1.0 if entry.status.value == "superseded" else 0.0
score = 0.4 * age_factor + 0.3 * access_decay + 0.3 * contradiction_flag
if score > self.staleness_threshold_evict:
return score, "evict"
elif score > self.staleness_threshold_warn:
return score, "stale"
return score, "healthy"
def run_hygiene_cycle(self, user_id, dry_run=False):
"""执行一次完整的卫生检查周期"""
stats = {"duplicates_found": 0, "contradictions_found": 0,
"stale_marked": 0, "evicted": 0, "sensitivity_flagged": 0}
all_entries = self.l2.list_active(user_id)
# 扫描过期
for entry in all_entries:
score, action = self.staleness_score(entry)
if action == "evict" and not dry_run:
entry.status = "evicted"
self.l2.persist(entry)
stats["evicted"] += 1
elif action == "stale":
stats["stale_marked"] += 1
return stats
def _emit_audit(self, events):
"""将卫生事件写入审计日志"""
if self.audit_log:
for event in events:
self.audit_log.record(
event_type="memory_hygiene",
action=event.action.value,
memory_id=event.memory_id,
reason=event.reason,
metadata=event.metadata)
每一次卫生操作——去重合并、矛盾标记、过期淘汰、PII 阻断——都应该作为一个审计事件被记录。审计日志提供了记忆变更的不可篡改的证据链。参见 Agent 审计日志设计——每一条卫生事件都是审计流水线上的一环。
7. 多租户记忆隔离与作用域
如果 Agent 为多个用户或组织服务,记忆必须被严格隔离。用户 A 的偏好绝不能泄露到用户 B 的上下文中——这不是性能优化,而是数据安全的基本要求。在 SaaS 客服、企业知识库、多租户 Agent 平台等场景中,作用域隔离是记忆系统的安全基石。
作用域树:四个层级
记忆的作用域不是简单的"用户 A vs 用户 B"二元划分,而是一个层级树:
/global/ ← 全局共享只读(如产品文档摘要、公共知识)
├── /org/{org_id}/ ← 组织共享读写(同组织成员可读写)
│ ├── /user/{user_id}/ ← 用户隔离读写(仅该用户可读写)
│ │ └── /task/{task_id}/← 任务作用域(临时,任务结束后可提升或清理)
│ └── /user/{user_id2}/
└── /org/{org_id2}/
四个层级各有不同的权限模型:
| 作用域 | 读权限 | 写权限 | 生命周期 | 示例 |
|---|---|---|---|---|
| /global/ | 所有用户 | 仅管理员 / 系统 | 长期 | 产品功能摘要、公共 FAQ 知识 |
| /org/{id}/ | 组织内所有成员 | 组织内成员 | 组织存续期间 | 团队规范、共享项目上下文 |
| /user/{id}/ | 仅该用户 | 仅该用户 | 用户存续期间 | 个人偏好、对话历史、配置 |
| /task/{id}/ | 任务执行者 | 任务执行者 | 任务持续期间 | 中间步骤状态、暂态工具输出 |
跨作用域访问规则
作用域隔离的核心原则:默认禁止跨作用域访问,明确授权例外。
- 向下读取:子作用域可以读取父作用域的内容——
/user/42/可以读取/org/1/和/global/(继承)。但/user/42/不能读取/user/43/(兄弟隔离)。 - 向上写入:默认禁止。子作用域不能向父作用域写入——
/user/42/不能写入/org/1/。防止普通用户污染组织级知识。 - 同级隔离:严格禁止。两个
/user/作用域之间、两个/org/作用域之间,不允许任何直接访问。 - 提升(Promotion):唯一的例外。任务作用域中的记忆可以通过显式的
promote()调用提升到用户作用域——"这次任务中学到的关键经验值得长期保存"。提升需要显式调用,不能自动发生,因为提升意味着跨作用域边界的数据移动。
存储层的命名空间强制执行
作用域隔离不能只依赖应用层检查——必须在存储层强制执行。所有记忆的 key 以作用域路径为前缀:
# L2 中的 key 格式
/global/knowledge/product_faq_summary
/org/acme-corp/config/deployment_region
/user/42/preferences/color_scheme
/user/42/task/task-abc123/step_3_output
向量检索时,搜索范围始终限定在 scope_prefix 内。即使向量搜索返回了其他用户的相似记忆,scope filter 在结果注入 L0 之前就过滤掉了。具体做法:
- 向量的 metadata 中包含
scope_path - 检索时添加 filter:
scope_path LIKE '/user/42/%' OR scope_path LIKE '/org/acme-corp/%' OR scope_path = '/global/%' - 过滤后的结果再进行去重和排序
多用户隔离验证
在生产环境部署前,必须通过交叉用户记忆泄漏测试:
- 用户 A 写入一条偏好:"我喜欢深色模式"
- 用户 B 发起一次对话,询问"你喜欢什么颜色模式?"
- 验证:Agent 不应返回用户 A 的偏好。如果返回了——作用域隔离失效,这是高危安全缺陷。
这个测试应该在 CI/CD 流水线中作为回归测试自动运行。
代码:MemoryScope 类
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class ScopeLevel(Enum):
GLOBAL = "global"
ORG = "org"
USER = "user"
TASK = "task"
class AccessType(Enum):
READ = "read"
WRITE = "write"
@dataclass
class ScopePath:
"""作用域路径解析结果"""
level: ScopeLevel
global_part: str = "/global/"
org_id: Optional[str] = None
user_id: Optional[str] = None
task_id: Optional[str] = None
def to_prefix(self):
parts = ["/global/"]
if self.org_id:
parts.append(f"/org/{self.org_id}/")
if self.user_id:
parts.append(f"/user/{self.user_id}/")
if self.task_id:
parts.append(f"/task/{self.task_id}/")
return "".join(parts)
def is_ancestor_of(self, other):
"""self 是否是 other 的祖先作用域"""
return other.to_prefix().startswith(self.to_prefix())
def is_sibling_of(self, other):
"""self 和 other 是否是同级作用域的不同实例"""
if self.level != other.level:
return False
if self.level == ScopeLevel.GLOBAL:
return False # 全局只有一个
if self.level == ScopeLevel.ORG:
return self.org_id != other.org_id
if self.level == ScopeLevel.USER:
return self.org_id == other.org_id and self.user_id != other.user_id
if self.level == ScopeLevel.TASK:
return self.user_id == other.user_id and self.task_id != other.task_id
return False
class MemoryScope:
"""多租户记忆作用域管理器"""
def __init__(self, storage_backend):
self.storage = storage_backend
def check_access(self, requester_scope, target_scope, access_type):
"""验证请求者是否有权访问目标作用域"""
req = self._parse_scope(requester_scope)
tgt = self._parse_scope(target_scope)
# 全局作用域:所有人可读,仅管理员可写
if tgt.level == ScopeLevel.GLOBAL:
if access_type == AccessType.READ:
return True, "global read allowed for all"
return False, "global write requires admin"
# 自己访问自己:始终允许
if req.to_prefix() == tgt.to_prefix():
return True, "self access"
# 祖先访问:子作用域可以读父作用域
if tgt.is_ancestor_of(req):
if access_type == AccessType.READ:
return True, "ancestor read (inheritance)"
return False, "cannot write to ancestor scope"
# 后代访问:可以读但不能写后代
if req.is_ancestor_of(tgt):
if access_type == AccessType.READ:
return True, "descendant read"
return False, "cannot write to descendant scope"
# 兄弟隔离
if req.is_sibling_of(tgt) or (
req.level == ScopeLevel.USER and tgt.level == ScopeLevel.USER
and req.user_id != tgt.user_id):
return False, "cross-user isolation"
return False, "access denied"
def promote(self, from_scope, to_scope, entry_id):
"""将记忆从子作用域提升到父作用域"""
from_path = self._parse_scope(from_scope)
to_path = self._parse_scope(to_scope)
# 只能向祖先作用域提升
if not from_path.is_ancestor_of(to_path) and not to_path.is_ancestor_of(from_path):
raise PermissionError(
f"Promotion must be between ancestor/descendant scopes. "
f"Got {from_scope} → {to_scope}")
# 如果 to 是 from 的祖先,这是向上提升
if from_path.to_prefix().startswith(to_path.to_prefix()):
# 验证:只能从 task 提升到 user,或 user 提升到 org
valid_promotions = {
(ScopeLevel.TASK, ScopeLevel.USER),
(ScopeLevel.USER, ScopeLevel.ORG),
(ScopeLevel.TASK, ScopeLevel.ORG),
}
if (from_path.level, to_path.level) not in valid_promotions:
raise PermissionError(
f"Invalid promotion path: {from_path.level.value} → {to_path.level.value}")
entry = self.storage.load(from_path.to_prefix(), entry_id)
if not entry:
raise ValueError(f"Entry {entry_id} not found in {from_scope}")
# 复制到目标作用域
new_id = self.storage.copy(
entry, from_prefix=from_path.to_prefix(),
to_prefix=to_path.to_prefix())
return new_id
def build_search_filter(self, scope):
"""为向量/结构化搜索构建作用域过滤条件"""
path = self._parse_scope(scope)
# 构造允许搜索的作用域前缀列表
allowed_prefixes = ["/global/"]
if path.org_id:
allowed_prefixes.append(f"/org/{path.org_id}/")
if path.user_id:
allowed_prefixes.append(f"/user/{path.user_id}/")
if path.task_id:
allowed_prefixes.append(f"/task/{path.task_id}/")
return {
"scope_prefix": allowed_prefixes,
"operator": "OR"
}
def _parse_scope(self, scope_str):
"""解析作用域字符串为 ScopePath"""
path = ScopePath()
parts = [p for p in scope_str.split("/") if p]
for i, part in enumerate(parts):
if part == "global":
path.level = ScopeLevel.GLOBAL
elif part == "org" and i + 1 < len(parts):
path.org_id = parts[i + 1]
path.level = ScopeLevel.ORG
elif part == "user" and i + 1 < len(parts):
path.user_id = parts[i + 1]
path.level = ScopeLevel.USER
elif part == "task" and i + 1 < len(parts):
path.task_id = parts[i + 1]
path.level = ScopeLevel.TASK
return path
# 使用示例
scope_mgr = MemoryScope(storage_backend=None)
# 用户 A 访问自己的记忆 → 允许
print(scope_mgr.check_access(
"/global/org/acme/user/42/", "/global/org/acme/user/42/task/abc/",
AccessType.READ))
# → (True, 'ancestor read (inheritance)')
# 用户 A 尝试读用户 B 的记忆 → 拒绝
print(scope_mgr.check_access(
"/global/org/acme/user/42/", "/global/org/acme/user/43/",
AccessType.READ))
# → (False, 'cross-user isolation')
作用域前缀和上下文信封(Context Envelope)的命名空间是一一对应的——每个作用域路径映射到上下文协议中的一个命名空间前缀。具体映射规则参见 Agent 上下文协议设计——作用域管"谁能看到",协议管"怎么传递"。
8. 投产 Checklist 与监控
记忆系统的设计从架构到代码再到部署,最后一公里是确保所有子系统在生产环境中协同工作。本章提供一份投产 Checklist 和监控指标清单,以及一个将前面所有代码片段整合在一起的 MemoryManager 编排器。
投产 Checklist
在 Agent 记忆系统上线前,逐项确认以下清单:
- L2 持久存储已配置备份策略——SQLite/PostgreSQL 数据库 + 向量数据库均有定期备份,RPO < 1 小时
- TTL 已按记忆类别配置——用户偏好 = 90 天,任务学习 = 30 天,实体事实 = 永久 + 定期审查
- 去重流水线已激活——实体级 key 查找 → 精确哈希 → 语义相似(阈值 0.95)三道防线全开
- 矛盾检测在写入时运行 + 定期扫描——Pre-write hook 扫描冲突,每日全量扫描过期矛盾
- PII / 敏感信息过滤器已激活——block-on-high-severity 模式,API key、身份证号等直接阻断
- GC 流水线按计划运行——每日扫描过期/过期/矛盾记录,dry_run 模式验证后再执行
- 作用域隔离已测试——跨用户记忆泄漏测试通过(用户 A 的偏好不出现在用户 B 的检索结果中)
- MemoryManager 与 Agent 主循环已集成测试——端到端:任务启动 → Push 记忆 → 任务执行 → Pull 检索 → 会话结束 → 提升持久化 → GC 运行
- 记忆指标已接入监控面板——写入速率、读取速率、命中率、去重率、矛盾率、过期分布、每用户作用域数量
关键监控指标
| 指标 | 含义 | 健康范围 | 告警阈值 |
|---|---|---|---|
| write_rate | 每分钟向 L2 写入的记忆条目数 | 1–20/min | > 50/min(可能写入循环或风暴) |
| read_hit_ratio | Pull 检索中返回结果次数 / 总 Pull 次数 | > 0.3 | < 0.1(相关性门槛可能过高) |
| dedup_rate | 被去重拦截的写入 / 总写入数 | 0.1–0.5 | > 0.7(可能去重过于激进或写入逻辑有 bug) |
| contradiction_count | 每日新检测到的矛盾记忆对数 | < 10 | > 50(可能版本管理或覆盖逻辑异常) |
| staleness_p50 / p95 | 所有活跃记忆的过期分数分布 | p50 < 0.3, p95 < 0.7 | p50 > 0.5(大量记忆接近淘汰,TTL 或 GC 可能未运行) |
| scope_count_per_user | 每用户的活跃作用域数量 | 1–10 | > 50(可能任务作用域未清理) |
| pii_block_count | 每日被 PII 过滤器阻断的写入次数 | 0–5 | > 20(用户可能在无意中泄露凭据,需要上游修复) |
代码:MemoryManager 编排器——整合所有子系统
以下 MemoryManager 将前面所有的子系统——WorkingMemory(第 3 章)、LongTermMemory(第 4 章)、RetrievalBoundary(第 5 章)、MemoryHygiene(第 6 章)、MemoryScope(第 7 章)——整合为一个统一的编排器。它也是 Agent 主循环中唯一需要调用的记忆入口:
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum
class TaskPhase(Enum):
STARTUP = "startup"
EXECUTING = "executing"
TOOL_CALL = "tool_call"
TURN_END = "turn_end"
COMPLETE = "complete"
@dataclass
class MemoryManager:
"""Agent 记忆系统编排器——整合 L0-L3 四层 + 检索边界 + 卫生 + 作用域"""
# 核心组件
working_memory: object = None # WorkingMemory 实例(第 3 章)
session_memory: object = None # L1 会话存储
long_term_memory: object = None # LongTermMemory 实例(第 4 章)
retrieval_boundary: object = None # RetrievalBoundary 实例(第 5 章)
hygiene: object = None # MemoryHygiene 实例(第 6 章)
scope: object = None # MemoryScope 实例(第 7 章)
external_retrieval: object = None # L3 RAG pipeline
# 配置
tenant_id: str = ""
user_id: str = ""
task_id: str = ""
# 运行时状态
current_phase: TaskPhase = TaskPhase.STARTUP
turn_count: int = 0
metrics: dict = field(default_factory=lambda: {
"writes": 0, "reads": 0, "hits": 0, "dedups": 0,
"contradictions": 0, "pii_blocks": 0, "evictions": 0,
})
# ─── Agent 主循环入口 ───
def on_task_start(self, task_goal, plan_steps, constraints=None):
"""任务启动:初始化 L0 + Push 记忆"""
self.current_phase = TaskPhase.STARTUP
self.task_id = self._generate_task_id()
# 1. 初始化工作记忆
self.working_memory.update_task(task_goal, plan_steps, constraints)
# 2. Push:从 L1/L2 注入相关记忆
task_context = f"task: {task_goal}"
push_results = self.retrieval_boundary.push(
self.session_memory, self.long_term_memory,
task_context, self.long_term_memory.embed)
# 3. 将 Push 结果追加到 scratchpad(或 constraints)
if push_results:
self.working_memory.update_scratchpad(
f"[记忆注入] 从 L1/L2 加载了 {len(push_results)} 条相关记忆")
for r in push_results:
self.working_memory.update_scratchpad(
f" · {r.content[:100]}...")
self.metrics["reads"] += len(push_results)
self.metrics["hits"] += len(push_results)
return self.working_memory.to_prompt()
def on_pre_llm_call(self):
"""每次 LLM 调用前:检查是否需要刷新 Push 记忆"""
if self.current_phase == TaskPhase.EXECUTING:
self.retrieval_boundary.refresh_if_needed(
self.session_memory, self.long_term_memory,
self.working_memory.task_goal)
return self.working_memory.to_prompt()
def on_tool_call(self, tool_name, tool_input):
"""工具调用前:Pull 相关记忆"""
self.current_phase = TaskPhase.TOOL_CALL
# Pull:检索与工具调用相关的记忆
query = f"{tool_name} {str(tool_input)[:200]}"
pull_results = self.retrieval_boundary.pull(
self.session_memory, self.long_term_memory, query, top_k=3)
if pull_results:
self.metrics["reads"] += 1
self.metrics["hits"] += 1
context = "\n".join([r.content for r in pull_results])
self.working_memory.update_scratchpad(
f"[Pull: {tool_name}] {context[:300]}")
return pull_results
def on_observation(self, tool_name, result, importance="normal"):
"""工具调用完成:写入 L0 + L1"""
# L0: 工作记忆
self.working_memory.add_observation(tool_name, result, importance)
# L1: 会话记忆(write-through)
entry = {
"tool_name": tool_name,
"result": result[:500],
"importance": importance,
"timestamp": datetime.now().isoformat(),
}
self.session_memory.append(self.task_id, entry)
def on_turn_end(self):
"""每轮推理结束:推进计划 + 更新计数器"""
self.turn_count += 1
self.current_phase = TaskPhase.EXECUTING
# 如果当前步骤完成,推进计划
if self.turn_count > 0:
self.working_memory.advance_plan()
def on_task_complete(self, task_summary=""):
"""任务完成:从 L1 提升重要记忆到 L2 + 清理"""
self.current_phase = TaskPhase.COMPLETE
# 1. 评估 L1 中的重要记忆,提升到 L2
session_entries = self.session_memory.get_all(self.task_id)
promoted = 0
for entry in session_entries:
if entry.get("importance") in ("critical", "high"):
# 卫生检查
hygiene_check = self.hygiene.pre_write_check(
memory_key=f"task_{self.task_id}_outcome",
content=entry["result"],
user_id=self.user_id)
if not hygiene_check["allowed"]:
self.metrics["pii_blocks"] += 1
continue
# 写入 L2
result = self.long_term_memory.write(
memory_key=f"task_{self.task_id}_outcome",
content=entry["result"],
source="task_outcome" if hasattr(
self.long_term_memory, 'MemorySource')
else "task_outcome",
confidence=0.7)
promoted += 1
self.metrics["writes"] += 1
# 2. 写入任务学习总结
if task_summary:
self.long_term_memory.write(
memory_key=f"task_{self.task_id}_learning",
content=task_summary,
confidence=0.8)
self.metrics["writes"] += 1
# 3. 清理任务作用域
self.session_memory.clear(self.task_id)
# 4. 执行 GC(每日一次,避免每次任务结束都跑)
self._maybe_run_gc()
return {"promoted": promoted, "task_id": self.task_id}
def search(self, query, layer="l2", top_k=5):
"""Agent 主动搜索记忆(暴露给 LLM 的工具接口)"""
self.metrics["reads"] += 1
if layer == "l2":
results = self.long_term_memory.read(query, top_k=top_k)
if results:
self.metrics["hits"] += 1
return results
elif layer == "l1":
return self.session_memory.search(query, top_k=top_k)
elif layer == "l3":
return self.external_retrieval.search(query, top_k=top_k)
return []
# ─── 内部辅助 ───
def _generate_task_id(self):
import uuid
return f"task-{str(uuid.uuid4())[:8]}"
def _maybe_run_gc(self):
"""每日运行一次垃圾回收"""
today = datetime.now().strftime("%Y-%m-%d")
last_gc = getattr(self, "_last_gc_date", "")
if last_gc == today:
return
self._last_gc_date = today
# Dry run 先看
dry_stats = self.long_term_memory.evict(dry_run=True)
# 确认后执行
gc_stats = self.long_term_memory.evict(dry_run=False)
self.metrics["evictions"] += sum(gc_stats.values())
def get_metrics(self):
"""导出当前指标"""
return {
**self.metrics,
"hit_ratio": (self.metrics["hits"] / max(self.metrics["reads"], 1)),
"turn_count": self.turn_count,
"phase": self.current_phase.value,
}
# ─── Agent 主循环集成示例 ───
# 初始化
mm = MemoryManager(
working_memory=WorkingMemory(),
session_memory=SessionStore(),
long_term_memory=LongTermMemory(
user_id="user-42", db_conn=db, vector_store=vs, embed_fn=embed),
retrieval_boundary=RetrievalBoundary(),
hygiene=MemoryHygiene(l2_store=l2, audit_log=audit),
scope=MemoryScope(storage_backend=store),
tenant_id="acme-corp",
user_id="user-42",
)
# Agent 主循环
task_goal = "将 user-service 从 v2.1 升级到 v3.0"
plan = ["检查 breaking changes", "在 staging 部署", "运行集成测试", "金丝雀发布", "全量切换"]
# 1. 任务启动
context = mm.on_task_start(task_goal, plan, constraints=["零停机", "不改 schema"])
# → L0 已填充,Push 记忆已注入
# 2. 推理 + 工具调用循环
for turn in range(10):
# LLM 推理
context = mm.on_pre_llm_call()
# llm_response = llm.chat(messages=[system_msg, user_msg, context])
# 模拟工具调用
tool_name = "check_breaking_changes"
mm.on_tool_call(tool_name, {"target_version": "v3.0"})
mm.on_observation(tool_name, "v3.0 移除了 /api/v1/users,改用 /api/v2/users", "critical")
mm.on_turn_end()
# 3. 任务完成
result = mm.on_task_complete(
task_summary="v3.0 升级关键:/api/v1/users → /api/v2/users,需更新所有调用方")
print(f"任务完成:提升了 {result['promoted']} 条记忆到 L2")
print(f"指标:{mm.get_metrics()}")
这个 MemoryManager 是整个记忆系统的单一入口。Agent 开发者不需要分别调用 WorkingMemory、LongTermMemory、RetrievalBoundary——只需在任务生命周期的关键节点调用 MemoryManager 的对应方法。内部的检索边界、卫生检查、作用域隔离全部透明。
关于记忆系统的指标和可观察性,参见 Agent 可观察性——包括如何将这些指标接入 Grafana、Prometheus、Datadog 等监控平台,以及如何配置告警规则。
常见问题(FAQ)
Agent 记忆和 RAG 到底有什么区别?
RAG 是一个检索机制:工具调用 → 搜索外部文档 → 注入结果到上下文。记忆是整个持久化 + 检索系统:决定什么该记、记多久、怎么查、何时淘汰、如何隔离。RAG 可以做 L3 外部检索层;记忆系统包含 L0-L3 全部四层以及写入策略、生命周期、防污染和隔离。
已有的 agent-memory-systems 文章和本文是什么关系?
agent-memory-systems 讲「怎么存」——SQLite schema、向量数据库配置、JSON 持久化。本文讲「怎么设计」——四层架构、检索边界、生命周期管理、防污染。前者是实现手册,后者是建筑蓝图。建议先读前者了解存储基础,再读本文掌握架构设计。
L0 工作记忆和上下文窗口有什么区别?
上下文窗口是 LLM 的物理限制(如 128K tokens)。L0 工作记忆是你选择放进窗口的结构化内容——不是把所有历史消息都塞进去,而是精选 task goal + active plan + recent observations + constraints。L0 是你的设计决策;窗口大小是 LLM 的物理约束。
记忆系统必须要有向量数据库吗?
不必须。L1 会话记忆可以用 Redis/dict,L2 持久记忆的结构化部分(entity facts、user preferences)可以用 SQLite/Postgres。向量数据库只在需要语义搜索时才必要——通常用于 L2 的非结构化记忆(conversation snippets、task learnings)和 L3 外部检索。小规模 Agent 可以完全不用向量搜索。
多租户场景下怎么防止用户 A 的记忆泄露给用户 B?
通过作用域前缀强制隔离:所有记忆 key 以 /user/{user_id}/ 为前缀。检索时,MemoryScope.check_access() 验证请求者的 scope 是否能读目标 memory。跨 scope 查询被防火墙拦截。即使向量搜索返回了其他用户的相似记忆,scope filter 也会在注入 L0 前过滤掉。
记忆系统需要多少存储空间?怎么估算?
粗略估算:每条 L2 记忆 ~2-5KB(结构化字段 + embedding 向量)。一个活跃用户每天产生 10-50 条记忆。100 个日活用户 × 30 天 × 5KB ≈ 150MB/月。加上 embedding 向量(1536 维 × 4 bytes = 6KB/条)≈ 额外 180MB。总计约 330MB/月。配合 TTL 淘汰策略可控制在 500MB 以内。
继续阅读 / 下一步阅读
本文是 Agent Memory and Context Engineering 系列的第一篇。建议按以下路径继续:
- Agent 记忆系统:从短期窗口到长期向量存储 — 具体的存储实现(SQLite/JSON/向量DB),本文的建筑地基
- Agent 上下文协议设计 — 上下文传递的管道和 envelope,记忆系统的数据通道
- Agent 审计日志设计 — 记忆变更产生审计事件,证据链不可篡改
- 多 Agent 编排 — 多 Agent 场景下的记忆共享与隔离策略
- 模型无关的 Agent 设计 — 记忆架构的 provider-agnostic 设计原则