Agent 记忆系统设计:短期记忆、长期记忆与检索边界

30秒要点

  • 核心问题:"加个向量数据库"不等于有了记忆系统。Agent 在生产环境中会忘记任务中间状态、跨会话积累记忆污染、混淆多用户数据——需要一个架构层面的记忆系统设计。
  • 解决方案:L0-L3 四层记忆架构——L0 工作记忆(任务大脑)→ L1 会话记忆 → L2 持久记忆 → L3 外部检索。每层有明确的存储机制、生命周期和检索策略。
  • 关键实现:MemoryManager 编排器 + 检索边界(推/拉/混合触发)+ 记忆卫生(去重/矛盾检测/PII扫描)+ 多租户作用域隔离。含 7 段完整 Python 代码。
  • 读完能做什么:为你的 Agent 设计一套生产级记忆系统——知道什么该记住、记多久、怎么检索、如何防污染、怎么隔离多用户数据。

1. 为什么「加个向量数据库」不等于有了记忆系统

一个客服 Agent 上线了。团队给它配了一个向量数据库,每次对话结束后把用户偏好写入向量库。下一次用户提问时,Agent 从向量库检索相关偏好,注入到 Prompt 中。"Agent 有记忆了",团队很满意。

三周后,用户投诉:Agent 反复推荐一款用户在上个月明确拒绝过的产品。团队排查发现:用户确实在上个月说过"我不喜欢这个品牌",Agent 也忠实地把这句话存入了向量库。但用户第二周又说过一次"现在不考虑这个品类"——两条偏好叠加在一起,向量检索返回了第一条(因为它更短、余弦相似度更高),Agent 看到的是"我不喜欢这个品牌"而非"现在不考虑这个品类"。更糟的是,用户第三周改了偏好——"X 品牌的新系列不错"——这条新信息应该覆盖旧信息,但它只是作为一条新记录被追加上去。Agent 在三条互相矛盾的偏好中随机检索。

这就是向量数据库 ≠ 记忆系统的本质原因:向量数据库是一个存储引擎,记忆系统是一个管理层。前者负责"存和查",后者负责"什么时候写、写什么、什么时候更新、什么时候淘汰、查到了怎么判断是否可信"。

三种朴素的记忆失败模式

在生产环境中,简单地"把东西塞进向量库"会触发三类问题:

  1. 遗忘(Forgetting):L0 上下文窗口溢出,Agent 在处理多步任务时丢失中间状态。比如 Agent 执行一个 12 步的数据库迁移,第 8 步时 LLM 的上下文窗口已经被前 7 步的结果占满,第 1 步的关键约束"不要修改 users 表"被挤出窗口——Agent 在第 9 步把 users 表删了。这不是记忆力差,是没有结构化工作记忆——关键约束应该被锁定在独立的槽位中,不受上下文窗口滚动的影响。
  2. 污染(Pollution):旧的、错误的、矛盾的记忆从不清理。一个 Agent 连续运行了三个月,积累了上千条"用户偏好",其中 40% 已经过时、15% 互相矛盾、5% 是用户在测试阶段的非真实数据。每次检索都是一场赌博——返回的是最新的还是最旧的?是最相关的还是最相似的?更危险的是,LLM 对向量检索返回的内容默认信任——它不会质疑"这条偏好可能已经过期了"。
  3. 混淆(Conflation):所有用户的数据放在同一个桶里。Agent A 为一家 SaaS 公司服务,同时为多个客户提供客服。用户 1 说"我喜欢深色模式",用户 2 说"我讨厌深色模式"。两条偏好都在同一个向量库中,检索时按相似度排序——返回哪一个取决于谁写的更短、更"像查询"。这不是 bug,是设计缺陷:没有命名空间隔离

关键洞察:RAG(检索增强生成)和 Memory(记忆系统)不是一回事。RAG 是一个检索机制——工具调用 → 搜索 → 注入。Memory 是一套完整系统——写入策略、生命周期管理、检索边界、作用域隔离、记忆卫生。打个比方:RAG 是一个图书馆的搜索框;Memory 是图书管理员 + 编目规则 + 书架布局 + 剔旧策略 + 借阅规则。搜索框只知道"找书",不知道"这本书已经过时、应该下架了"。

所以我们需要什么:四层架构预览

一个生产级 Agent 记忆系统需要四个层次,每层解决不同的问题:


  L0  工作记忆(Working Memory)         → "现在"
      上下文窗口内的结构化槽位
      生命周期:每次推理轮次重置

  L1  会话记忆(Session Memory)        → "这次对话"
      Redis / 内存字典
      生命周期:会话持续期间

  L2  持久记忆(Persistent Memory)     → "直到被淘汰"
      SQLite/Postgres + 向量数据库
      生命周期:直到显式淘汰

  L3  外部检索(External Retrieval)    → "外面的世界"
      RAG pipeline(文档、API、Web)
      生命周期:无状态获取
  

这篇文章从架构层面解构这四层:每层存什么、怎么存、生命周期多长、如何检索、如何保持卫生。这不是一个向量数据库教程——这是针对"Agent 的记忆会随时间衰减、污染和混淆"这一生产问题,给出的架构级解决方案。

关于记忆的存储实现细节(向量数据库选型、嵌入模型对比),参见 Agent 记忆存储系统。关于框架无关的设计原则,参见 模型无关的 Agent 设计

2. 四层记忆架构:L0 工作记忆 → L3 外部检索

在深入每一层之前,先建立一个全局视图。四层记忆架构的核心思想是分层治理——不是所有信息都放在同一个存储里、用同一种方式检索。每一层有独立的存储机制、生命周期、访问模式和示例数据。

架构全景图


  ┌─────────────────────────────────────────────────────────┐
  │                   Agent — MemoryManager                  │
  │  ┌───────────────────────────────────────────────────┐  │
  │  │                                                     │  │
  │  │   L0  WORKING MEMORY          (上下文窗口内)        │  │
  │  │   任务目标 · 当前计划 · 最近的观察 · 约束 · 草稿纸  │  │
  │  │   生命周期: per-turn  ·  访问: 始终可用            │  │
  │  │                                                     │  │
  │  │   ┌──── write-through ──────────────────────────┐   │  │
  │  │   ▼                                              │   │  │
  │  │   L1  SESSION MEMORY         (Redis / 内存字典)   │   │  │
  │  │   对话轮次 · 工具结果 · 中间决策                  │   │  │
  │  │   生命周期: session  ·  访问: 按需拉取            │   │  │
  │  │                                                   │   │  │
  │  │   ┌──── promotion (重要记忆提升) ──────────────┐  │   │  │
  │  │   ▼                                             │  │   │  │
  │  │   L2  PERSISTENT MEMORY    (SQLite+PG / 向量DB) │  │   │  │
  │  │   用户偏好 · 学习到的知识 · 任务结果 · 实体信息 │  │   │  │
  │  │   生命周期: 直到显式淘汰  ·  访问: 混合检索     │  │   │  │
  │  │                                                  │  │   │  │
  │  │   ┌──── just-in-time ──────────────────────────┐ │  │   │  │
  │  │   ▼                                             ▼ │  │   │  │
  │  │   L3  EXTERNAL RETRIEVAL   (RAG / APIs / Web)    │  │   │  │
  │  │   文档 · 知识库 · 实时数据                        │  │   │  │
  │  │   生命周期: 无状态  ·  访问: 即时检索             │  │   │  │
  │  │                                                     │  │
  │  └───────────────────────────────────────────────────┘  │
  └─────────────────────────────────────────────────────────┘
  

各层定义

存储机制生命周期访问模式示例数据
L0 工作记忆 上下文窗口内(LLM context) 每轮推理后重置 始终可用(push) 任务目标、当前计划步骤、最近 3-5 条工具输出、约束条件、草稿纸
L1 会话记忆 Redis / 内存字典 会话持续期间 按需拉取(pull when relevant) 完整对话历史、工具调用结果、中间决策记录
L2 持久记忆 SQLite / PostgreSQL + 向量数据库 直到显式淘汰 混合检索(关键词 + 向量 + 结构化查询) 用户偏好、学习到的事实、历史任务结果、实体知识
L3 外部检索 RAG pipeline(外部文档、API、Web) 无状态获取 即时检索(just-in-time) 产品文档、知识库文章、实时 API 数据、Web 搜索结果

关键设计原则

  1. Write-through(写入穿透):L0 → L1 自动写入。每次重要的工具调用结果在注入 L0 的同时写入 L1。这确保了即使 L0 被后续内容挤出窗口,信息仍然在会话中可恢复。
  2. Promotion / Demotion(提升 / 降级):会话结束时,MemoryManager 评估 L1 中的记忆。重要的(用户明确反馈、关键决策、配置变更)提升到 L2 持久化;不重要的(中间推理、暂态工具输出)随会话结束而释放。
  3. TTL per layer(每层独立 TTL):L1 的 TTL = 会话时长(通常几分钟到几小时)。L2 的 TTL = 可配置(几天到永久),支持软 TTL(标记待审查)和硬 TTL(到期自动删除)。
  4. Namespace isolation(命名空间隔离):每一层的存储都按 tenant_iduser_id 划分命名空间。L2 的用户偏好不能跨用户检索——这不是一个性能优化,而是数据安全的基本要求。

代码:MemoryLayer 枚举 + MemoryConfig

from enum import Enum
from dataclasses import dataclass, field
from typing import Optional

class MemoryLayer(Enum):
    L0_WORKING = "l0_working"
    L1_SESSION = "l1_session"
    L2_PERSISTENT = "l2_persistent"
    L3_EXTERNAL = "l3_external"

@dataclass
class LayerConfig:
    """单层记忆的配置"""
    max_items: int           # 最大条目数
    ttl_seconds: Optional[int]  # TTL(秒),None 表示不过期
    eviction_policy: str = "lru"  # 淘汰策略:lru / fifo / ttl

@dataclass
class MemoryConfig:
    """Agent 记忆系统总配置"""
    tenant_id: str  # 多租户隔离
    user_id: str    # 用户级隔离

    l0: LayerConfig = field(default_factory=lambda: LayerConfig(
        max_items=5, ttl_seconds=None))  # per-turn reset,不需要 TTL

    l1: LayerConfig = field(default_factory=lambda: LayerConfig(
        max_items=100, ttl_seconds=3600))  # 1 小时会话

    l2: LayerConfig = field(default_factory=lambda: LayerConfig(
        max_items=10000, ttl_seconds=86400 * 30))  # 30 天

    l3: LayerConfig = field(default_factory=lambda: LayerConfig(
        max_items=0, ttl_seconds=None))  # 无状态,无上限

    # 检索配置
    similarity_threshold: float = 0.75  # 向量检索最低相似度
    hybrid_search_weight: float = 0.5   # 向量 vs 关键词权重(0=纯关键词, 1=纯向量)

这篇架构的视角是"记忆作为仓库"——存储、分类、检索、清理。与之对应的另一篇文章 Agent 上下文协议设计 讨论的是"记忆如何流动"——信息的序列化格式、传输协议、压缩策略。仓库管存储,管道管输送,两者互补。

3. 工作记忆设计:Agent 的「大脑工作台」

L0 工作记忆是 Agent 的"大脑工作台"——每一次推理轮次从这里开始。它不是简单的"把前 N 条消息塞进 Prompt",而是结构化的槽位系统。之所以需要结构化,是因为 LLM 对扁平消息列表的注意力分布不均匀——距离当前响应位置越近的内容,注意力权重越高;越远的内容,越容易被"遗忘"。如果你把任务目标放在第 1 条消息中,经过 20 轮对话后,LLM 对这条目标的有效注意力已经衰减到接近零。

五个结构化槽位

L0 工作记忆不是一条扁平的消息列表,而是五个独立的槽位,每个槽位以固定的格式注入 Prompt:

槽位存储内容更新频率Prompt 中的位置
task_goal 当前任务的目标(一句话) 任务开始设定,任务完成清除 最顶部(最高注意力权重)
active_plan 当前步骤 + 下一步骤 每个步骤完成后更新 目标之后
recent_observations 最近 N 条工具调用结果(max 3-5 条) 每次工具调用后追加 计划之后
constraints 硬约束:预算、截止时间、禁止操作 任务开始设定,极少变更 观察之后(但始终保留)
scratchpad 中间推理、临时计算、待验证假设 任意时刻读写 最后(最近、最灵活)

这个结构不是任意的。它遵循一个原则:越重要的信息位置越固定。LLM 的注意力机制对固定位置的重复模式最敏感——如果每轮 Prompt 中目标都在同一个位置、用同一个标签包裹,LLM 对这个槽位的注意力保持更稳定。

Push vs Pull:什么时候从下层拉取记忆

L0 不是孤立的——它在两个时间点从 L1/L2 拉取信息:

  1. 任务启动时:从 L2 拉取用户偏好、历史任务结果、实体知识,填充 task_goal 和 constraints 槽位。从 L1 拉取当前会话的累积上下文(如果有的话)。
  2. 工具调用时:在调用需要特定知识的工具之前,从 L2 拉取相关事实。例如:Agent 准备调用 deploy_to_kubernetes 之前,从 L2 拉取"用户上次要求使用 us-east-1 区域"的偏好。

注意这跟"每轮都检索"不同——每轮检索会把不相关的记忆注入 Prompt,浪费上下文预算。L0 的检索是事件驱动的:只在状态转换点(任务开始、工具调用)触发。

代码:WorkingMemory 类

from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime

@dataclass
class Observation:
    """单条工具调用结果"""
    tool_name: str
    result_summary: str
    timestamp: float
    importance: str = "normal"

@dataclass
class WorkingMemory:
    """L0 工作记忆——Agent 的大脑工作台"""

    task_goal: str = ""
    active_plan: dict = field(default_factory=dict)
    recent_observations: list = field(default_factory=list)
    constraints: list = field(default_factory=list)
    scratchpad: str = ""

    MAX_OBSERVATIONS = 5
    MAX_CONSTRAINTS = 8

    def update_task(self, goal, plan_steps, constraints=None):
        self.task_goal = goal
        self.active_plan = {
            "current_step": plan_steps[0] if plan_steps else "",
            "next_steps": plan_steps[1:3] if len(plan_steps) > 1 else [],
            "total_steps": len(plan_steps),
            "completed": 0,
        }
        if constraints:
            self.constraints = constraints[:self.MAX_CONSTRAINTS]

    def add_observation(self, tool_name, result, importance="normal"):
        obs = Observation(
            tool_name=tool_name,
            result_summary=result[:200],
            timestamp=datetime.now().timestamp(),
            importance=importance,
        )
        if importance == "critical":
            self.recent_observations.insert(0, obs)
        else:
            self.recent_observations.append(obs)
        self.recent_observations = self.recent_observations[:self.MAX_OBSERVATIONS]

    def update_scratchpad(self, note):
        ts = datetime.now().strftime("%H:%M")
        self.scratchpad += "\n[" + ts + "] " + note

    def advance_plan(self):
        self.active_plan["completed"] += 1
        steps = self.active_plan["next_steps"]
        if steps:
            self.active_plan["current_step"] = steps[0]
            self.active_plan["next_steps"] = steps[1:]
        else:
            self.active_plan["current_step"] = ""

    def to_prompt(self):
        lines = []
        if self.task_goal:
            lines.append("[TASK_GOAL] " + self.task_goal)
        plan = self.active_plan
        if plan.get("current_step"):
            lines.append("[CURRENT_STEP] (" + str(plan["completed"]) +
                         "/" + str(plan["total_steps"]) + ") " + plan["current_step"])
        if plan.get("next_steps"):
            lines.append("[NEXT_STEPS] " + " → ".join(plan["next_steps"]))
        if self.recent_observations:
            obs_lines = []
            for obs in self.recent_observations:
                marker = "⚡" if obs.importance == "critical" else "·"
                obs_lines.append("  " + marker + " [" + obs.tool_name + "] " + obs.result_summary)
            lines.append("[RECENT_OBSERVATIONS]\n" + "\n".join(obs_lines))
        if self.constraints:
            c_lines = ["  - " + c for c in self.constraints]
            lines.append("[CONSTRAINTS]\n" + "\n".join(c_lines))
        if self.scratchpad.strip():
            lines.append("[SCRATCHPAD]\n" + self.scratchpad.strip())
        return "\n\n".join(lines)

    def reset(self):
        self.recent_observations = []
        self.scratchpad = ""


wm = WorkingMemory()
wm.update_task(
    goal="将 user-service 从 v2.1 升级到 v3.0,零停机",
    plan_steps=[
        "检查 v3.0 的 breaking changes",
        "在 staging 环境部署 v3.0",
        "运行集成测试套件",
        "金丝雀发布 5% 流量",
        "全量切换",
    ],
    constraints=[
        "零停机——必须使用滚动更新",
        "不要修改数据库 schema",
        "回滚时间 < 2 分钟",
        "预算:AWS 额外费用不超过 $50",
    ]
)
wm.add_observation("check_breaking_changes",
    "v3.0 移除了 /api/v1/users 端点,改用 /api/v2/users", "critical")
wm.add_observation("deploy_staging",
    "v3.0 在 staging 成功部署,健康检查通过")
wm.advance_plan()
wm.update_scratchpad("集成测试需要额外配置 test_db 连接")
print(wm.to_prompt())

注意 to_prompt() 的输出结构:目标在最顶部,约束始终保留,草稿纸在最后。这不是随意排列的——目标是 LLM 在整个推理过程中应该持续关注的"北极星";最近的观察提供最新的环境反馈;约束是铁律,绝不能因为上下文滚动而被遗忘。草稿纸在最底部,因为它距离 LLM 的当前位置最近,适合存放"现在正在想的事情"。

记忆预算:L0 不是垃圾桶

L0 的信息存储在 LLM 的上下文窗口中,而上下文窗口有两个成本:注意力成本(LLM 对每个 token 平均分布注意力,token 越多每个 token 获得的平均注意力越低)和经济成本(按 token 计费)。这意味着 L0 必须严格预算:

记忆预算的核心准则:不是所有信息都应该进入 L0。L0 是 Agent 的"注意力焦点"——只有当前任务最需要的信息才值得占据这个焦点。其他信息在 L1/L2 中安静等待,在需要时被拉取。

4. 长期记忆生命周期:写入、去重、更新、淘汰

L2 持久记忆是记忆系统中最复杂的层次——它停留时间最长,积累的数据最多,如果不加管理,会从一个"有用的知识库"退化为一个"噪音池"。这一节把 L2 的生命周期定义为一个状态机——每一条记忆从诞生到消亡都经历明确的阶段。

L2 记忆生命周期状态机


  ┌────────┐    ┌─────────────┐    ┌──────────┐    ┌──────────────┐
  │  WRITE │───▶│ DEDUP CHECK │───▶│ SIMILAR  │───▶│ STORE + TTL  │
  └────────┘    └──────┬──────┘    │  MERGE?  │    └──────┬───────┘
                       │           └──────────┘           │
                       │ (重复)                           ▼
                       ▼                          ┌──────────────┐
                  ┌──────────┐                     │ PERIODIC GC  │
                  │  UPDATE  │                     └──────┬───────┘
                  └──────────┘                            │
                                                          ▼
                                                  ┌──────────────┐
                                                  │ EVICT EXPIRED│
                                                  └──────┬───────┘
                                                         │
                                                         ▼
                                                  ┌──────────────┐
                                                  │ EMIT METRICS │
                                                  └──────────────┘
  

这个状态机的每一步都有明确的策略和边界条件。它不是"想到了就写,过期了就删"——每一步都是一个决策点。

写入策略:什么时候向 L2 写入

不是每个工具调用结果都值得写入 L2。以下是 L2 写入的触发条件(满足任一即可):

触发条件示例来源
任务完成后学习总结 "升级时遇到的主要坑:v3.0 API 端点名称全部从 /v1/ 改为 /v2/" Agent 在任务结束时自我总结
用户明确反馈 "我不喜欢深色模式"、"记住:以后都用 us-east-1" 用户对话中的显式偏好陈述
重要事实发现 "user-service 的数据库连接池上限是 100" 工具调用结果中提取的关键参数
配置变更 "log_level 从 INFO 改为 DEBUG" Agent 执行的操作产生了持久效果

以下情况不应写入 L2:中间推理步骤、临时工具调用结果(如 ls 输出)、超过一半以上内容相同的重复信息、置信度低于阈值的事实推断。

去重策略:三条防线

去重是 L2 的第一道防线——在写入之前判断这条记忆是否"已经存在"。三条策略按成本从低到高排列:

  1. 精确匹配(Hash):对记忆的规范化文本计算 SHA256 哈希。如果哈希已存在 → 更新已有记录的时间戳和置信度,不新建。成本极低,O(1)。
  2. 语义相似(Cosine Similarity):对新记忆和现有记忆分别计算嵌入向量,计算余弦相似度。如果 similarity > 0.95 → 同一事实,合并(保留更新时间和更高置信度的版本)。成本中等,需要一次向量计算。
  3. 实体级(Entity-level):如果记忆带有 user_idmemory_key(如 "color_preference"),按 key 精确查找已有记录。如果存在 → 更新值而非新建。成本低,O(1) 索引查找。这是最可靠的方式——比语义相似度更精确。

更新 vs 覆盖:版本化记忆

当一条记忆被判定为"已存在"时,不应该直接覆盖——应该版本化。每条 L2 记忆记录包含:

当两条记忆矛盾时(如 "user 喜欢 X 品牌" vs "user 不喜欢 X 品牌"),系统比较它们的置信度和时间戳:更新且置信度更高的获胜;如果置信度相同,更新的获胜。矛盾的旧版本不删除——标记为 superseded 并保留在审计日志中。

TTL 与淘汰

L2 记忆不是永生的。每条记忆都有 TTL(Time To Live),分为两类:

GC(垃圾回收):定期扫描 L2 中所有记录,检查:

  1. 硬 TTL 已过期 → 直接删除
  2. 软 TTL 已过期且超过 30 天未被检索 → 删除
  3. 与更新版本矛盾的 superseded 记录超过 90 天 → 删除
  4. 置信度 < 0.3 且超过 60 天未被验证 → 标记为 stale

代码:LongTermMemory 类

import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
from enum import Enum

class MemorySource(Enum):
    USER_STATED = "user_stated"
    AGENT_INFERRED = "agent_inferred"
    TASK_OUTCOME = "task_outcome"
    CONFIG_CHANGE = "config_change"

class MemoryStatus(Enum):
    ACTIVE = "active"
    SUPERSEDED = "superseded"
    STALE = "stale"
    EVICTED = "evicted"

@dataclass
class MemoryEntry:
    """L2 持久记忆中的一条记录"""
    memory_id: str
    user_id: str
    memory_key: str
    content: str
    embedding: Optional[list] = None
    source: MemorySource = MemorySource.AGENT_INFERRED
    confidence: float = 0.6
    version: int = 1
    status: MemoryStatus = MemoryStatus.ACTIVE
    soft_ttl_days: int = 30
    hard_ttl_days: int = 365
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    updated_at: str = field(default_factory=lambda: datetime.now().isoformat())
    last_accessed_at: str = field(default_factory=lambda: datetime.now().isoformat())
    retrieved_count: int = 0

    def content_hash(self):
        normalized = self.content.strip().lower()
        return hashlib.sha256(normalized.encode()).hexdigest()

class LongTermMemory:
    """L2 持久记忆管理器"""

    def __init__(self, user_id, db_conn, vector_store, embed_fn):
        self.user_id = user_id
        self.db = db_conn
        self.vector_store = vector_store
        self.embed = embed_fn
        self.SIMILARITY_MERGE_THRESHOLD = 0.95

    def write(self, memory_key, content,
              source=MemorySource.AGENT_INFERRED, confidence=0.6):
        # 第 1 关:实体级去重(最快、最精确)
        existing = self._lookup_by_key(self.user_id, memory_key)
        if existing:
            return self._update_existing(existing, content, confidence)

        # 第 2 关:精确哈希去重
        temp_entry = MemoryEntry(
            memory_id="", user_id=self.user_id,
            memory_key=memory_key, content=content)
        content_hash = temp_entry.content_hash()
        hash_match = self._lookup_by_hash(self.user_id, content_hash)
        if hash_match:
            return self._update_existing(hash_match, content, confidence)

        # 第 3 关:语义相似去重(最贵,最后用)
        embedding = self.embed(content)
        similar = self._search_similar(self.user_id, embedding, top_k=1)
        if similar and similar[0][1] >= self.SIMILARITY_MERGE_THRESHOLD:
            return self._merge_or_update(similar[0][0], content, confidence)

        # 通过所有去重关——创建新记忆
        entry = MemoryEntry(
            memory_id=self._generate_id(),
            user_id=self.user_id,
            memory_key=memory_key,
            content=content,
            embedding=embedding,
            source=source,
            confidence=confidence,
        )
        self._persist(entry)
        return entry

    def read(self, query, top_k=5, use_hybrid=True):
        results = []
        query_embedding = self.embed(query)
        vector_results = self._search_similar(
            self.user_id, query_embedding, top_k=top_k)

        keyword_results = []
        if use_hybrid:
            keyword_results = self._keyword_search(
                self.user_id, query, top_k=top_k)

        merged = self._merge_results(vector_results, keyword_results, top_k)
        for entry_id, score in merged:
            entry = self._load(entry_id)
            if entry and entry.status == MemoryStatus.ACTIVE:
                entry.last_accessed_at = datetime.now().isoformat()
                entry.retrieved_count += 1
                results.append(entry)

        return results

    def update(self, memory_id, content, confidence=None):
        entry = self._load(memory_id)
        if not entry:
            return None
        entry.status = MemoryStatus.SUPERSEDED
        self._persist(entry)

        new_entry = MemoryEntry(
            memory_id=self._generate_id(),
            user_id=entry.user_id,
            memory_key=entry.memory_key,
            content=content,
            embedding=self.embed(content),
            source=entry.source,
            confidence=confidence if confidence is not None else entry.confidence,
            version=entry.version + 1,
        )
        new_entry.soft_ttl_days = entry.soft_ttl_days
        new_entry.hard_ttl_days = entry.hard_ttl_days
        self._persist(new_entry)
        return new_entry

    def evict(self, dry_run=False):
        stats = {"hard_expired": 0, "soft_expired": 0,
                 "superseded_old": 0, "stale_marked": 0}

        now = datetime.now()
        all_entries = self._list_active(self.user_id)

        for entry in all_entries:
            updated = datetime.fromisoformat(entry.updated_at)
            accessed = datetime.fromisoformat(entry.last_accessed_at)

            if updated + timedelta(days=entry.hard_ttl_days) < now:
                if not dry_run:
                    entry.status = MemoryStatus.EVICTED
                    self._persist(entry)
                stats["hard_expired"] += 1
                continue

            if (updated + timedelta(days=entry.soft_ttl_days) < now
                    and accessed + timedelta(days=30) < now):
                if not dry_run:
                    entry.status = MemoryStatus.EVICTED
                    self._persist(entry)
                stats["soft_expired"] += 1
                continue

            if (entry.status == MemoryStatus.SUPERSEDED
                    and updated + timedelta(days=90) < now):
                if not dry_run:
                    entry.status = MemoryStatus.EVICTED
                    self._persist(entry)
                stats["superseded_old"] += 1
                continue

            if (entry.confidence < 0.3
                    and updated + timedelta(days=60) < now
                    and entry.status == MemoryStatus.ACTIVE):
                if not dry_run:
                    entry.status = MemoryStatus.STALE
                    self._persist(entry)
                stats["stale_marked"] += 1

        return stats

    def _lookup_by_key(self, user_id, key):
        return None

    def _lookup_by_hash(self, user_id, h):
        return None

    def _search_similar(self, user_id, embedding, top_k):
        return []

    def _keyword_search(self, user_id, query, top_k):
        return []

    def _merge_results(self, vec_results, kw_results, top_k):
        return []

    def _generate_id(self):
        import uuid
        return str(uuid.uuid4())[:12]

    def _persist(self, entry):
        pass

    def _load(self, memory_id):
        return None

    def _list_active(self, user_id):
        return []

    def _update_existing(self, existing, content, confidence):
        if confidence >= existing.confidence:
            return self.update(existing.memory_id, content, confidence)
        else:
            existing.last_accessed_at = datetime.now().isoformat()
            existing.retrieved_count += 1
            self._persist(existing)
            return existing

    def _merge_or_update(self, existing_id, content, confidence):
        existing = self._load(existing_id)
        if not existing:
            return self.write("", content)
        if confidence > existing.confidence:
            return self.update(existing_id, content, confidence)
        return existing

这个实现中有几个值得强调的设计决策:

关于记忆的每一次变更(写入、更新、淘汰、合并)都应该作为一个审计事件被记录,参见 Agent 审计日志设计——每一条 L2 记忆的 mutation 都对应一条审计记录,形成不可篡改的变更链。

5. 检索边界设计:推、拉与混合触发

记忆系统存储了数据,但检索策略决定了 Agent 在正确的时间看到正确的信息。如果检索得太少——Agent 缺少关键上下文,做错决策。如果检索得太多——上下文窗口被无关信息淹没,注意力稀释。检索边界设计的目标是在"太少"和"太多"之间找到正确的触发位置

三种检索触发模式

检索不是"每次对话都搜一次"那么简单。根据触发时机,检索分为三种模式:

模式触发时机谁决定适用场景
Push(主动推送) 任务开始时,在首次 LLM 调用之前 系统(预设规则) 用户偏好、任务模板、关键约束——"每次都应该看到"的信息
Pull(按需拉取) 任务执行中,Agent 调用检索工具 LLM(自主决策) 特定事实查询——"这个 API 的上一个版本是怎么配置的?"
Hybrid(混合触发) Push 开局 + Pull 按需 + 定时刷新 系统 + LLM 协作 生产级 Agent——既有稳定基线,又有灵活查询能力

Push:主动注入——"先给你准备好"

Push 发生在任务开始之前。MemoryManager 扫描 L1/L2 中与当前任务上下文相关的记忆,将相关性最高的 N 条注入 L0 工作记忆,然后才发出第一次 LLM 调用。

Push 的关键约束是预算

Push 的典型注入内容:

  1. 用户偏好:"用户要求所有数据库操作使用 read-only 副本"、"用户偏好收到 Slack 通知而非邮件"
  2. 活跃任务上下文:如果 Agent 正在执行一个跨会话的长任务(如为期三天的数据库迁移),Push 注入上一会话的进度和关键发现
  3. 实体事实:"user-service 的生产数据库连接池上限是 100"——在执行与 user-service 相关的任务时主动注入

Pull:按需拉取——"需要什么自己查"

Pull 发生在任务执行过程中。Agent 执行到某一步时,发现自己需要某个特定信息——它调用一个检索工具,从 L1/L2 中查询相关记忆。与 Push 不同,Pull 的决策权在 LLM——LLM 判断"我现在需要查什么"。

Pull 的典型场景:

  1. 工具调用前的上下文补充:Agent 准备调用 deploy_to_kubernetes 时,Pull "用户上次要求的区域是 us-east-1"
  2. 遇到错误时的历史查找:部署失败,Agent Pull "上一次部署 user-service 时遇到过类似的权限错误,解决方案是……"
  3. 用户问题应答:用户问"上次那个数据库迁移用了多久?",Agent Pull 对应的任务结果记忆

Pull 的工具设计要点:Pull 检索工具应该暴露给 LLM 一个清晰的接口——search_memory(query, layer, top_k)。LLM 不需要知道底层是向量搜索还是关键词搜索——它只需要表达"我想找什么"。检索的融合、排序、去重由系统层处理。

Hybrid:混合触发——生产环境的标配

单独的 Push 或 Pull 在各自场景下都有盲区。Push 的问题是"系统猜 Agent 需要什么"——猜错了就浪费预算。Pull 的问题是"Agent 不知道它不知道什么"——Agent 不会主动查询它不知道存在的记忆。

混合触发结合两者:

  1. 任务启动时:Push——注入用户偏好、活跃任务上下文、关键实体事实(这些都是"每次都应该看到"的信息)
  2. 执行过程中:Pull——Agent 按需调用检索工具获取特定信息
  3. 定时刷新:每隔 K 轮(如 K=5),重新评估 Push 记忆池的相关性——因为任务执行过程中上下文在变化,第 1 轮相关的记忆到第 10 轮可能已经无关了。定时刷新确保 L0 中的 Push 记忆保持新鲜

检索融合(Retrieval Fusion)

单靠向量检索有盲区——它擅长语义相似但不擅长精确匹配和结构化过滤。生产级检索应该融合三种搜索:

搜索方式擅长盲区
向量搜索(Vector) 语义相似——"用户喜欢暗色调"能匹配"用户偏好深色主题" 不擅长精确值匹配——"user_id=42" 在向量空间中没有意义
关键词搜索(Keyword) 精确匹配——"v3.0"、"us-east-1" 等术语精确命中 不擅长同义词和语义变化——"暗色模式"不会匹配"深色主题"
结构化查询(Structured) 精确过滤——"user_id=42 AND memory_key='color_preference'" 不擅长模糊查询——需要知道确切的 key 名称

融合流程:三种搜索各自返回 Top K 结果 → 去重(同一 memory_id 只保留一次)→ 按复合分数重新排序 → 取最终 Top K 注入 L0。

复合分数公式:

composite_score = α × vector_score + β × keyword_score + γ × recency_bonus + δ × importance_bonus

  # 默认权重(可调)
  α = 0.4   # 向量分数权重
  β = 0.3   # 关键词分数权重
  γ = 0.2   # 新近度加分(越新越高)
  δ = 0.1   # 重要性加分(critical > normal)
  

相关性门槛调优

相关性门槛是检索系统最关键的旋钮:

代码:RetrievalBoundary 类

from dataclasses import dataclass, field
  from typing import Optional
  from datetime import datetime
  import math

  @dataclass
  class RetrievalResult:
      """单条检索结果"""
      memory_id: str
      content: str
      vector_score: float = 0.0
      keyword_score: float = 0.0
      composite_score: float = 0.0
      source_layer: str = "l2"

  @dataclass
  class RetrievalBoundary:
      """检索边界——控制记忆何时、如何从 L1/L2 注入 L0"""

      push_budget_tokens: int = 2000          # Push 注入的 token 预算
      push_max_items: int = 5                 # Push 注入的最大条目数
      relevance_threshold: float = 0.75       # 相关性最低门槛
      refresh_interval_turns: int = 5         # 每 K 轮刷新 Push 记忆

      # 融合权重
      alpha: float = 0.4    # 向量权重
      beta: float = 0.3     # 关键词权重
      gamma: float = 0.2    # 新近度权重
      delta: float = 0.1    # 重要性权重

      _push_cache: list = field(default_factory=list)
      _turn_counter: int = 0

      def push(self, l1_store, l2_store, task_context, embed_fn):
          """任务启动前:主动注入相关记忆到 L0"""
          candidates = []

          # 从 L1 收集会话级相关记忆
          l1_results = l1_store.search(task_context, top_k=self.push_max_items)
          for r in l1_results:
              if r.score >= self.relevance_threshold:
                  candidates.append(RetrievalResult(
                      memory_id=r.id, content=r.content,
                      vector_score=r.score, source_layer="l1"))

          # 从 L2 收集持久记忆——用户偏好、活跃任务、实体事实
          l2_results = l2_store.hybrid_search(
              task_context, top_k=self.push_max_items,
              filters={"memory_type": ["user_preference", "entity_fact", "task_context"]})
          for r in l2_results:
              if r.score >= self.relevance_threshold:
                  candidates.append(RetrievalResult(
                      memory_id=r.id, content=r.content,
                      vector_score=r.score, keyword_score=r.keyword_score or 0,
                      source_layer="l2"))

          # 融合去重
          fused = self._fuse_and_rank(candidates)
          # 按 token 预算裁剪
          selected = self._budget_clip(fused, self.push_budget_tokens)
          self._push_cache = selected
          self._turn_counter = 0
          return selected

      def pull(self, l1_store, l2_store, query, top_k=5):
          """任务执行中:LLM 主动调用检索工具"""
          results = []
          q_embedding = l2_store.embed(query)

          # 向量搜索
          vec_results = l2_store.vector_search(q_embedding, top_k=top_k * 2)
          for r in vec_results:
              results.append(RetrievalResult(
                  memory_id=r.id, content=r.content,
                  vector_score=r.score, source_layer="l2"))

          # 关键词搜索
          kw_results = l2_store.keyword_search(query, top_k=top_k)
          for r in kw_results:
              results.append(RetrievalResult(
                  memory_id=r.id, content=r.content,
                  keyword_score=r.score, source_layer="l2"))

          fused = self._fuse_and_rank(results)
          return fused[:top_k]

      def refresh_if_needed(self, l1_store, l2_store, task_context):
          """每 K 轮刷新 Push 记忆池"""
          self._turn_counter += 1
          if self._turn_counter >= self.refresh_interval_turns:
              return self.push(l1_store, l2_store, task_context)
          return self._push_cache

      def _fuse_and_rank(self, candidates):
          """融合去重并按复合分数排序"""
          seen = {}
          fused = []
          now = datetime.now().timestamp()

          for c in candidates:
              if c.memory_id in seen:
                  # 保留分数更高的
                  existing = seen[c.memory_id]
                  if c.vector_score > existing.vector_score:
                      seen[c.memory_id] = c
                  continue
              seen[c.memory_id] = c

          for m_id, c in seen.items():
              # 新近度加分:假设内存中可获取 last_updated
              recency_bonus = 0.5  # 默认
              importance_bonus = 0.5

              c.composite_score = (
                  self.alpha * c.vector_score +
                  self.beta * c.keyword_score +
                  self.gamma * recency_bonus +
                  self.delta * importance_bonus
              )
              fused.append(c)

          fused.sort(key=lambda x: x.composite_score, reverse=True)
          return fused

      def _budget_clip(self, candidates, max_tokens):
          """按 token 预算裁剪结果列表"""
          selected = []
          token_count = 0
          for c in candidates:
              est_tokens = len(c.content) // 3  # 粗略估算:中文 ~1.5 字符/token
              if token_count + est_tokens > max_tokens and selected:
                  break
              selected.append(c)
              token_count += est_tokens
          return selected

      def tune_threshold(self, hit_rate, false_positive_rate):
          """根据 A/B 测试结果调整相关性门槛"""
          if hit_rate < 0.3:
              self.relevance_threshold = max(0.5, self.relevance_threshold - 0.05)
          elif false_positive_rate > 0.4:
              self.relevance_threshold = min(0.95, self.relevance_threshold + 0.05)
          return self.relevance_threshold
  

检索边界不是孤立的——它使用上下文信封(Context Envelope)将检索到的记忆注入 L0。关于上下文数据的序列化和传输协议,参见 Agent 上下文协议设计。检索边界定义了"什么数据被检索",上下文协议定义了"数据如何打包和传递"。

6. 记忆卫生与防污染

记忆系统不是"只写不清理"的日志。随着 Agent 运行时间增长,L2 持久记忆中会积累各种"污染物"——重复数据、矛盾事实、过期信息、敏感数据。如果没有主动的卫生机制,记忆质量会持续下降,直到 Agent 的决策基于过时和矛盾的信息。第四章讲了单条记忆的生命周期管理;这一章聚焦跨记忆的污染检测和清理策略

四种污染向量

记忆污染有四种典型模式,每种都需要不同的检测和修复策略:

污染类型成因危害检测方式
重复(Duplicates) 同一事实在不同时间、不同上下文下被多次写入,使用了不同的 memory_key 或文本表述 检索时同一事实返回多条,浪费 L0 预算;更新时只更新了其中一条,其他副本变成过期数据 哈希去重 + 语义相似去重 + 实体级 key 去重(已在第 4 章实现)
矛盾(Contradictions) 用户改了偏好但旧偏好没被覆盖;Agent 在不同任务中推断出矛盾的事实 Agent 在两条矛盾信息中随机选择,决策不可预测——"用户喜欢深色模式" vs "用户喜欢浅色模式"同时存在于 L2 写入时搜索冲突事实 + 定期矛盾扫描
过期(Staleness) 事实已经改变但旧记录从未被更新或淘汰;TTL 未配置或配置过长 Agent 基于过时信息做决策——"API 端点还在 /v1/"(实际已迁移到 /v2/) 过期分数计算 + TTL 到期检测
敏感信息泄露(Sensitivity) 用户无意中输入了 PII(电话号码、邮箱、身份证号)或凭据(API key、token),被写入了 L2 隐私泄露 + 安全风险——凭据在检索时被注入 L0,可能在后续对话中被 LLM 输出 正则模式匹配 + LLM 敏感内容分类器

矛盾检测:不止是关键词相反

矛盾检测比去重更难——两条记忆的文本可能完全不同,但表达的意思互相排斥。简单的关键词反义词匹配("喜欢" vs "讨厌")覆盖不了大部分矛盾。有效的矛盾检测需要:

  1. 实体 + 属性对齐:先确认两条记忆是否谈论同一实体的同一属性。如果一条是 "user.color_preference = dark" 另一条是 "user.color_preference = light"——这是直接矛盾。如果一条是 "user.color_preference = dark" 另一条是 "user.font_preference = large"——这不矛盾,属性不同。
  2. 语义矛盾判断:对于非结构化记忆(如"用户不喜欢 X 品牌" vs "用户说 X 品牌的新系列不错"),使用 LLM 判断是否存在矛盾。将两条记忆文本发送给一个轻量级的判断 prompt:
  # 矛盾判断 Prompt
  你是一个事实一致性检查器。判断以下两条记忆是否存在矛盾。
  如果矛盾,返回 {"contradiction": true, "reason": "..."}
  如果不矛盾,返回 {"contradiction": false}
  如果一条是对另一条的更新/修正,返回 {"contradiction": false, "update": true}

  记忆 A: {memory_a.content}
  记忆 B: {memory_b.content}
  
  1. 自动解决 vs 人工审查:如果两条矛盾的记忆都有明确的置信度差异(如一条置信度 1.0,另一条 0.4),自动保留高置信度的一方,将低置信度方标记为 superseded。如果置信度相近(差异 < 0.2),两者都标记为待审查,加入人工审查队列。

过期分数(Staleness Score)

不是所有"旧"记忆都应该被淘汰——有些事实是永不过期的(如"地球绕太阳转")。过期判断需要一个复合分数:

staleness_score = w₁ × age_factor + w₂ × access_decay + w₃ × contradiction_flag

  age_factor = min(1.0, days_since_creation / max_age_days)
  access_decay = 1.0 - (retrieval_count_last_30d / expected_retrieval_count)
  contradiction_flag = 1.0 if has_active_contradiction else 0.0

  # 默认权重
  w₁ = 0.4  # 年龄权重
  w₂ = 0.3  # 访问衰减权重
  w₃ = 0.3  # 矛盾标记权重
  

staleness_score > 0.7 时,记忆被标记为 stale——在检索时降权(乘以 0.5 的衰减系数)。当 staleness_score > 0.9 时,记忆进入候选淘汰队列。

PII / 敏感信息扫描

敏感信息检测分两层:

  1. 正则模式匹配(快速、低成本):在写入路径上扫描常见 PII 模式——邮箱地址、手机号(中国 11 位、国际格式)、身份证号(18 位)、信用卡号(Luhn 算法)、API key(特定前缀如 sk-ghp_)、JWT token(三段 base64 结构)。命中任一模式 → 阻断写入 + 告警。
  2. LLM 敏感内容分类器(慢、高成本、高精度):对于通过正则扫描但仍有嫌疑的内容(如"我家地址是朝阳区建国路 88 号"——正则可能漏掉),使用 LLM 判断是否为敏感个人信息。只在正则阶段标记为 suspicious 时才触发 LLM 分类器,控制成本。

敏感信息阻断不是可选的——生产环境中应该默认启用。唯一的例外是经过明确授权的审计场景——即便如此,敏感数据应加密存储而非明文写入 L2。

代码:MemoryHygiene 类

import re
  import hashlib
  from dataclasses import dataclass, field
  from datetime import datetime, timedelta
  from enum import Enum
  from typing import Optional

  class HygieneAction(Enum):
      BLOCKED = "blocked"           # 写入被阻断
      FLAGGED = "flagged"           # 标记待审查
      MERGED = "merged"             # 自动合并
      SUPERSEDED = "superseded"     # 被新版本取代
      STALE_MARKED = "stale_marked"
      EVICTED = "evicted"

  @dataclass
  class HygieneEvent:
      """一次卫生操作事件"""
      action: HygieneAction
      memory_id: str
      reason: str
      timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
      metadata: dict = field(default_factory=dict)

  @dataclass
  class MemoryHygiene:
      """记忆卫生管理器——防污染、去重、矛盾检测、敏感信息过滤"""

      # 矛盾检测配置
      contradiction_llm_threshold: float = 0.7   # 触发 LLM 判断的相似度门槛
      contradiction_auto_resolve_gap: float = 0.2  # 置信度差异 > 此值则自动解决

      # 过期配置
      staleness_threshold_warn: float = 0.7
      staleness_threshold_evict: float = 0.9
      access_decay_days: int = 30

      # 敏感信息正则模式
      PII_PATTERNS = {
          "email": re.compile(r'[\w\.-]+@[\w\.-]+\.\w+'),
          "phone_cn": re.compile(r'1[3-9]\d{9}'),
          "id_card_cn": re.compile(r'\d{17}[\dXx]'),
          "api_key_openai": re.compile(r'sk-[A-Za-z0-9]{32,}'),
          "api_key_github": re.compile(r'ghp_[A-Za-z0-9]{36}'),
          "jwt": re.compile(r'eyJ[A-Za-z0-9\-_]+\.eyJ[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+'),
          "credit_card": re.compile(r'\b\d{4}[\s\-]?\d{4}[\s\-]?\d{4}[\s\-]?\d{4}\b'),
      }

      def __init__(self, l2_store, llm_classifier=None, audit_log=None):
          self.l2 = l2_store
          self.llm_classifier = llm_classifier   # 可选的 LLM 分类器
          self.audit_log = audit_log             # 审计日志系统
          self.remediation_queue: list = []      # 人工审查队列

      def pre_write_check(self, memory_key, content, user_id):
          """写入前检查——在 LongTermMemory.write() 之前调用"""
          events = []

          # 第 1 关:敏感信息扫描
          sens_result = self.sensitivity_scan(content)
          if sens_result["severity"] == "high":
              events.append(HygieneEvent(
                  action=HygieneAction.BLOCKED,
                  memory_id="", reason=f"PII blocked: {sens_result['matches']}",
                  metadata=sens_result))
              self._emit_audit(events)
              return {"allowed": False, "reason": sens_result["matches"], "events": events}

          if sens_result["severity"] == "suspicious":
              events.append(HygieneEvent(
                  action=HygieneAction.FLAGGED,
                  memory_id="", reason="Content flagged for sensitivity review",
                  metadata=sens_result))
              # 不阻断,但标记

          # 第 2 关:矛盾检测
          contradictions = self.contradiction_scan(
              user_id, memory_key, content)
          if contradictions:
              for contra in contradictions:
                  events.append(HygieneEvent(
                      action=HygieneAction.FLAGGED,
                      memory_id=contra.get("existing_id", ""),
                      reason=f"Contradiction with {contra.get('existing_id','')}: {contra.get('detail','')}",
                      metadata=contra))

          self._emit_audit(events)
          return {"allowed": True, "events": events}

      def sensitivity_scan(self, content):
          """扫描内容中的敏感信息"""
          matches = {}
          severity = "none"

          for pattern_name, pattern in self.PII_PATTERNS.items():
              found = pattern.findall(content)
              if found:
                  matches[pattern_name] = found[:3]  # 只保留前 3 个匹配

          if matches:
              # API key、JWT、身份证 → 高危
              high_severity_keys = {"api_key_openai", "api_key_github", "jwt", "id_card_cn"}
              if set(matches.keys()) & high_severity_keys:
                  severity = "high"
              else:
                  severity = "suspicious"

          # 如果正则没命中但内容可疑,且 LLM 分类器可用
          if severity == "none" and self.llm_classifier:
              llm_result = self.llm_classifier.classify(content)
              if llm_result.get("sensitive", False):
                  severity = "suspicious"
                  matches["llm_flagged"] = [llm_result.get("reason", "unknown")]

          return {"severity": severity, "matches": matches}

      def contradiction_scan(self, user_id, memory_key, content):
          """检测新内容是否与已有记忆矛盾"""
          contradictions = []

          # 精确 key 查找
          existing = self.l2.lookup_by_key(user_id, memory_key)
          if existing and existing.content != content:
              # 同 key 不同值——可能是更新,不是矛盾(由 LongTermMemory.update 处理)
              return []

          # 语义搜索——找相似但可能矛盾的内容
          embedding = self.l2.embed(content)
          similar = self.l2.vector_search(embedding, top_k=5)

          for sim_entry, sim_score in similar:
              if sim_score < 0.6:  # 不够相似,不可能是同一话题
                  continue
              if sim_score > self.contradiction_llm_threshold:
                  # 使用 LLM 判断是否矛盾
                  if self.llm_classifier:
                      check = self.llm_classifier.check_contradiction(
                          content, sim_entry.content)
                      if check.get("contradiction"):
                          contradictions.append({
                              "existing_id": sim_entry.memory_id,
                              "existing_content": sim_entry.content[:200],
                              "detail": check.get("reason", ""),
                              "existing_confidence": sim_entry.confidence,
                              "confidence_gap": abs(
                                  getattr(sim_entry, 'confidence', 0.5) - 0.5)
                          })

          return contradictions

      def staleness_score(self, entry):
          """计算一条记忆的过期分数(0-1)"""
          now = datetime.now()
          created = datetime.fromisoformat(entry.created_at)
          accessed = datetime.fromisoformat(entry.last_accessed_at)

          days_since_created = (now - created).days
          max_age = getattr(entry, 'hard_ttl_days', 365)

          age_factor = min(1.0, days_since_created / max(max_age, 1))

          # 过去 30 天内的检索次数
          days_since_access = (now - accessed).days
          access_decay = min(1.0, days_since_access / self.access_decay_days)

          # 矛盾标记
          contradiction_flag = 1.0 if entry.status.value == "superseded" else 0.0

          score = 0.4 * age_factor + 0.3 * access_decay + 0.3 * contradiction_flag

          if score > self.staleness_threshold_evict:
              return score, "evict"
          elif score > self.staleness_threshold_warn:
              return score, "stale"
          return score, "healthy"

      def run_hygiene_cycle(self, user_id, dry_run=False):
          """执行一次完整的卫生检查周期"""
          stats = {"duplicates_found": 0, "contradictions_found": 0,
                   "stale_marked": 0, "evicted": 0, "sensitivity_flagged": 0}

          all_entries = self.l2.list_active(user_id)

          # 扫描过期
          for entry in all_entries:
              score, action = self.staleness_score(entry)
              if action == "evict" and not dry_run:
                  entry.status = "evicted"
                  self.l2.persist(entry)
                  stats["evicted"] += 1
              elif action == "stale":
                  stats["stale_marked"] += 1

          return stats

      def _emit_audit(self, events):
          """将卫生事件写入审计日志"""
          if self.audit_log:
              for event in events:
                  self.audit_log.record(
                      event_type="memory_hygiene",
                      action=event.action.value,
                      memory_id=event.memory_id,
                      reason=event.reason,
                      metadata=event.metadata)
  

每一次卫生操作——去重合并、矛盾标记、过期淘汰、PII 阻断——都应该作为一个审计事件被记录。审计日志提供了记忆变更的不可篡改的证据链。参见 Agent 审计日志设计——每一条卫生事件都是审计流水线上的一环。

7. 多租户记忆隔离与作用域

如果 Agent 为多个用户或组织服务,记忆必须被严格隔离。用户 A 的偏好绝不能泄露到用户 B 的上下文中——这不是性能优化,而是数据安全的基本要求。在 SaaS 客服、企业知识库、多租户 Agent 平台等场景中,作用域隔离是记忆系统的安全基石。

作用域树:四个层级

记忆的作用域不是简单的"用户 A vs 用户 B"二元划分,而是一个层级树:

  /global/                    ← 全局共享只读(如产品文档摘要、公共知识)
  ├── /org/{org_id}/          ← 组织共享读写(同组织成员可读写)
  │   ├── /user/{user_id}/    ← 用户隔离读写(仅该用户可读写)
  │   │   └── /task/{task_id}/← 任务作用域(临时,任务结束后可提升或清理)
  │   └── /user/{user_id2}/
  └── /org/{org_id2}/
  

四个层级各有不同的权限模型:

作用域读权限写权限生命周期示例
/global/ 所有用户 仅管理员 / 系统 长期 产品功能摘要、公共 FAQ 知识
/org/{id}/ 组织内所有成员 组织内成员 组织存续期间 团队规范、共享项目上下文
/user/{id}/ 仅该用户 仅该用户 用户存续期间 个人偏好、对话历史、配置
/task/{id}/ 任务执行者 任务执行者 任务持续期间 中间步骤状态、暂态工具输出

跨作用域访问规则

作用域隔离的核心原则:默认禁止跨作用域访问,明确授权例外

  1. 向下读取:子作用域可以读取父作用域的内容——/user/42/ 可以读取 /org/1//global/(继承)。但 /user/42/ 不能读取 /user/43/(兄弟隔离)。
  2. 向上写入:默认禁止。子作用域不能向父作用域写入——/user/42/ 不能写入 /org/1/。防止普通用户污染组织级知识。
  3. 同级隔离:严格禁止。两个 /user/ 作用域之间、两个 /org/ 作用域之间,不允许任何直接访问。
  4. 提升(Promotion):唯一的例外。任务作用域中的记忆可以通过显式的 promote() 调用提升到用户作用域——"这次任务中学到的关键经验值得长期保存"。提升需要显式调用,不能自动发生,因为提升意味着跨作用域边界的数据移动。

存储层的命名空间强制执行

作用域隔离不能只依赖应用层检查——必须在存储层强制执行。所有记忆的 key 以作用域路径为前缀:

  # L2 中的 key 格式
  /global/knowledge/product_faq_summary
  /org/acme-corp/config/deployment_region
  /user/42/preferences/color_scheme
  /user/42/task/task-abc123/step_3_output
  

向量检索时,搜索范围始终限定在 scope_prefix 内。即使向量搜索返回了其他用户的相似记忆,scope filter 在结果注入 L0 之前就过滤掉了。具体做法:

  1. 向量的 metadata 中包含 scope_path
  2. 检索时添加 filter:scope_path LIKE '/user/42/%' OR scope_path LIKE '/org/acme-corp/%' OR scope_path = '/global/%'
  3. 过滤后的结果再进行去重和排序

多用户隔离验证

在生产环境部署前,必须通过交叉用户记忆泄漏测试:

  1. 用户 A 写入一条偏好:"我喜欢深色模式"
  2. 用户 B 发起一次对话,询问"你喜欢什么颜色模式?"
  3. 验证:Agent 不应返回用户 A 的偏好。如果返回了——作用域隔离失效,这是高危安全缺陷。

这个测试应该在 CI/CD 流水线中作为回归测试自动运行。

代码:MemoryScope 类

from dataclasses import dataclass
  from enum import Enum
  from typing import Optional

  class ScopeLevel(Enum):
      GLOBAL = "global"
      ORG = "org"
      USER = "user"
      TASK = "task"

  class AccessType(Enum):
      READ = "read"
      WRITE = "write"

  @dataclass
  class ScopePath:
      """作用域路径解析结果"""
      level: ScopeLevel
      global_part: str = "/global/"
      org_id: Optional[str] = None
      user_id: Optional[str] = None
      task_id: Optional[str] = None

      def to_prefix(self):
          parts = ["/global/"]
          if self.org_id:
              parts.append(f"/org/{self.org_id}/")
          if self.user_id:
              parts.append(f"/user/{self.user_id}/")
          if self.task_id:
              parts.append(f"/task/{self.task_id}/")
          return "".join(parts)

      def is_ancestor_of(self, other):
          """self 是否是 other 的祖先作用域"""
          return other.to_prefix().startswith(self.to_prefix())

      def is_sibling_of(self, other):
          """self 和 other 是否是同级作用域的不同实例"""
          if self.level != other.level:
              return False
          if self.level == ScopeLevel.GLOBAL:
              return False  # 全局只有一个
          if self.level == ScopeLevel.ORG:
              return self.org_id != other.org_id
          if self.level == ScopeLevel.USER:
              return self.org_id == other.org_id and self.user_id != other.user_id
          if self.level == ScopeLevel.TASK:
              return self.user_id == other.user_id and self.task_id != other.task_id
          return False

  class MemoryScope:
      """多租户记忆作用域管理器"""

      def __init__(self, storage_backend):
          self.storage = storage_backend

      def check_access(self, requester_scope, target_scope, access_type):
          """验证请求者是否有权访问目标作用域"""
          req = self._parse_scope(requester_scope)
          tgt = self._parse_scope(target_scope)

          # 全局作用域:所有人可读,仅管理员可写
          if tgt.level == ScopeLevel.GLOBAL:
              if access_type == AccessType.READ:
                  return True, "global read allowed for all"
              return False, "global write requires admin"

          # 自己访问自己:始终允许
          if req.to_prefix() == tgt.to_prefix():
              return True, "self access"

          # 祖先访问:子作用域可以读父作用域
          if tgt.is_ancestor_of(req):
              if access_type == AccessType.READ:
                  return True, "ancestor read (inheritance)"
              return False, "cannot write to ancestor scope"

          # 后代访问:可以读但不能写后代
          if req.is_ancestor_of(tgt):
              if access_type == AccessType.READ:
                  return True, "descendant read"
              return False, "cannot write to descendant scope"

          # 兄弟隔离
          if req.is_sibling_of(tgt) or (
              req.level == ScopeLevel.USER and tgt.level == ScopeLevel.USER
              and req.user_id != tgt.user_id):
              return False, "cross-user isolation"

          return False, "access denied"

      def promote(self, from_scope, to_scope, entry_id):
          """将记忆从子作用域提升到父作用域"""
          from_path = self._parse_scope(from_scope)
          to_path = self._parse_scope(to_scope)

          # 只能向祖先作用域提升
          if not from_path.is_ancestor_of(to_path) and not to_path.is_ancestor_of(from_path):
              raise PermissionError(
                  f"Promotion must be between ancestor/descendant scopes. "
                  f"Got {from_scope} → {to_scope}")

          # 如果 to 是 from 的祖先,这是向上提升
          if from_path.to_prefix().startswith(to_path.to_prefix()):
              # 验证:只能从 task 提升到 user,或 user 提升到 org
              valid_promotions = {
                  (ScopeLevel.TASK, ScopeLevel.USER),
                  (ScopeLevel.USER, ScopeLevel.ORG),
                  (ScopeLevel.TASK, ScopeLevel.ORG),
              }
              if (from_path.level, to_path.level) not in valid_promotions:
                  raise PermissionError(
                      f"Invalid promotion path: {from_path.level.value} → {to_path.level.value}")

          entry = self.storage.load(from_path.to_prefix(), entry_id)
          if not entry:
              raise ValueError(f"Entry {entry_id} not found in {from_scope}")

          # 复制到目标作用域
          new_id = self.storage.copy(
              entry, from_prefix=from_path.to_prefix(),
              to_prefix=to_path.to_prefix())
          return new_id

      def build_search_filter(self, scope):
          """为向量/结构化搜索构建作用域过滤条件"""
          path = self._parse_scope(scope)

          # 构造允许搜索的作用域前缀列表
          allowed_prefixes = ["/global/"]

          if path.org_id:
              allowed_prefixes.append(f"/org/{path.org_id}/")
          if path.user_id:
              allowed_prefixes.append(f"/user/{path.user_id}/")
          if path.task_id:
              allowed_prefixes.append(f"/task/{path.task_id}/")

          return {
              "scope_prefix": allowed_prefixes,
              "operator": "OR"
          }

      def _parse_scope(self, scope_str):
          """解析作用域字符串为 ScopePath"""
          path = ScopePath()
          parts = [p for p in scope_str.split("/") if p]

          for i, part in enumerate(parts):
              if part == "global":
                  path.level = ScopeLevel.GLOBAL
              elif part == "org" and i + 1 < len(parts):
                  path.org_id = parts[i + 1]
                  path.level = ScopeLevel.ORG
              elif part == "user" and i + 1 < len(parts):
                  path.user_id = parts[i + 1]
                  path.level = ScopeLevel.USER
              elif part == "task" and i + 1 < len(parts):
                  path.task_id = parts[i + 1]
                  path.level = ScopeLevel.TASK

          return path

  # 使用示例
  scope_mgr = MemoryScope(storage_backend=None)

  # 用户 A 访问自己的记忆 → 允许
  print(scope_mgr.check_access(
      "/global/org/acme/user/42/", "/global/org/acme/user/42/task/abc/",
      AccessType.READ))
  # → (True, 'ancestor read (inheritance)')

  # 用户 A 尝试读用户 B 的记忆 → 拒绝
  print(scope_mgr.check_access(
      "/global/org/acme/user/42/", "/global/org/acme/user/43/",
      AccessType.READ))
  # → (False, 'cross-user isolation')
  

作用域前缀和上下文信封(Context Envelope)的命名空间是一一对应的——每个作用域路径映射到上下文协议中的一个命名空间前缀。具体映射规则参见 Agent 上下文协议设计——作用域管"谁能看到",协议管"怎么传递"。

8. 投产 Checklist 与监控

记忆系统的设计从架构到代码再到部署,最后一公里是确保所有子系统在生产环境中协同工作。本章提供一份投产 Checklist 和监控指标清单,以及一个将前面所有代码片段整合在一起的 MemoryManager 编排器。

投产 Checklist

在 Agent 记忆系统上线前,逐项确认以下清单:

关键监控指标

指标含义健康范围告警阈值
write_rate 每分钟向 L2 写入的记忆条目数 1–20/min > 50/min(可能写入循环或风暴)
read_hit_ratio Pull 检索中返回结果次数 / 总 Pull 次数 > 0.3 < 0.1(相关性门槛可能过高)
dedup_rate 被去重拦截的写入 / 总写入数 0.1–0.5 > 0.7(可能去重过于激进或写入逻辑有 bug)
contradiction_count 每日新检测到的矛盾记忆对数 < 10 > 50(可能版本管理或覆盖逻辑异常)
staleness_p50 / p95 所有活跃记忆的过期分数分布 p50 < 0.3, p95 < 0.7 p50 > 0.5(大量记忆接近淘汰,TTL 或 GC 可能未运行)
scope_count_per_user 每用户的活跃作用域数量 1–10 > 50(可能任务作用域未清理)
pii_block_count 每日被 PII 过滤器阻断的写入次数 0–5 > 20(用户可能在无意中泄露凭据,需要上游修复)

代码:MemoryManager 编排器——整合所有子系统

以下 MemoryManager 将前面所有的子系统——WorkingMemory(第 3 章)、LongTermMemory(第 4 章)、RetrievalBoundary(第 5 章)、MemoryHygiene(第 6 章)、MemoryScope(第 7 章)——整合为一个统一的编排器。它也是 Agent 主循环中唯一需要调用的记忆入口:

from dataclasses import dataclass, field
  from datetime import datetime
  from typing import Optional
  from enum import Enum

  class TaskPhase(Enum):
      STARTUP = "startup"
      EXECUTING = "executing"
      TOOL_CALL = "tool_call"
      TURN_END = "turn_end"
      COMPLETE = "complete"

  @dataclass
  class MemoryManager:
      """Agent 记忆系统编排器——整合 L0-L3 四层 + 检索边界 + 卫生 + 作用域"""

      # 核心组件
      working_memory: object = None        # WorkingMemory 实例(第 3 章)
      session_memory: object = None        # L1 会话存储
      long_term_memory: object = None      # LongTermMemory 实例(第 4 章)
      retrieval_boundary: object = None    # RetrievalBoundary 实例(第 5 章)
      hygiene: object = None               # MemoryHygiene 实例(第 6 章)
      scope: object = None                 # MemoryScope 实例(第 7 章)
      external_retrieval: object = None    # L3 RAG pipeline

      # 配置
      tenant_id: str = ""
      user_id: str = ""
      task_id: str = ""

      # 运行时状态
      current_phase: TaskPhase = TaskPhase.STARTUP
      turn_count: int = 0
      metrics: dict = field(default_factory=lambda: {
          "writes": 0, "reads": 0, "hits": 0, "dedups": 0,
          "contradictions": 0, "pii_blocks": 0, "evictions": 0,
      })

      # ─── Agent 主循环入口 ───

      def on_task_start(self, task_goal, plan_steps, constraints=None):
          """任务启动:初始化 L0 + Push 记忆"""
          self.current_phase = TaskPhase.STARTUP
          self.task_id = self._generate_task_id()

          # 1. 初始化工作记忆
          self.working_memory.update_task(task_goal, plan_steps, constraints)

          # 2. Push:从 L1/L2 注入相关记忆
          task_context = f"task: {task_goal}"
          push_results = self.retrieval_boundary.push(
              self.session_memory, self.long_term_memory,
              task_context, self.long_term_memory.embed)

          # 3. 将 Push 结果追加到 scratchpad(或 constraints)
          if push_results:
              self.working_memory.update_scratchpad(
                  f"[记忆注入] 从 L1/L2 加载了 {len(push_results)} 条相关记忆")
              for r in push_results:
                  self.working_memory.update_scratchpad(
                      f"  · {r.content[:100]}...")
              self.metrics["reads"] += len(push_results)
              self.metrics["hits"] += len(push_results)

          return self.working_memory.to_prompt()

      def on_pre_llm_call(self):
          """每次 LLM 调用前:检查是否需要刷新 Push 记忆"""
          if self.current_phase == TaskPhase.EXECUTING:
              self.retrieval_boundary.refresh_if_needed(
                  self.session_memory, self.long_term_memory,
                  self.working_memory.task_goal)
          return self.working_memory.to_prompt()

      def on_tool_call(self, tool_name, tool_input):
          """工具调用前:Pull 相关记忆"""
          self.current_phase = TaskPhase.TOOL_CALL

          # Pull:检索与工具调用相关的记忆
          query = f"{tool_name} {str(tool_input)[:200]}"
          pull_results = self.retrieval_boundary.pull(
              self.session_memory, self.long_term_memory, query, top_k=3)

          if pull_results:
              self.metrics["reads"] += 1
              self.metrics["hits"] += 1
              context = "\n".join([r.content for r in pull_results])
              self.working_memory.update_scratchpad(
                  f"[Pull: {tool_name}] {context[:300]}")

          return pull_results

      def on_observation(self, tool_name, result, importance="normal"):
          """工具调用完成:写入 L0 + L1"""
          # L0: 工作记忆
          self.working_memory.add_observation(tool_name, result, importance)

          # L1: 会话记忆(write-through)
          entry = {
              "tool_name": tool_name,
              "result": result[:500],
              "importance": importance,
              "timestamp": datetime.now().isoformat(),
          }
          self.session_memory.append(self.task_id, entry)

      def on_turn_end(self):
          """每轮推理结束:推进计划 + 更新计数器"""
          self.turn_count += 1
          self.current_phase = TaskPhase.EXECUTING

          # 如果当前步骤完成,推进计划
          if self.turn_count > 0:
              self.working_memory.advance_plan()

      def on_task_complete(self, task_summary=""):
          """任务完成:从 L1 提升重要记忆到 L2 + 清理"""
          self.current_phase = TaskPhase.COMPLETE

          # 1. 评估 L1 中的重要记忆,提升到 L2
          session_entries = self.session_memory.get_all(self.task_id)
          promoted = 0
          for entry in session_entries:
              if entry.get("importance") in ("critical", "high"):
                  # 卫生检查
                  hygiene_check = self.hygiene.pre_write_check(
                      memory_key=f"task_{self.task_id}_outcome",
                      content=entry["result"],
                      user_id=self.user_id)
                  if not hygiene_check["allowed"]:
                      self.metrics["pii_blocks"] += 1
                      continue

                  # 写入 L2
                  result = self.long_term_memory.write(
                      memory_key=f"task_{self.task_id}_outcome",
                      content=entry["result"],
                      source="task_outcome" if hasattr(
                          self.long_term_memory, 'MemorySource')
                          else "task_outcome",
                      confidence=0.7)
                  promoted += 1
                  self.metrics["writes"] += 1

          # 2. 写入任务学习总结
          if task_summary:
              self.long_term_memory.write(
                  memory_key=f"task_{self.task_id}_learning",
                  content=task_summary,
                  confidence=0.8)
              self.metrics["writes"] += 1

          # 3. 清理任务作用域
          self.session_memory.clear(self.task_id)

          # 4. 执行 GC(每日一次,避免每次任务结束都跑)
          self._maybe_run_gc()

          return {"promoted": promoted, "task_id": self.task_id}

      def search(self, query, layer="l2", top_k=5):
          """Agent 主动搜索记忆(暴露给 LLM 的工具接口)"""
          self.metrics["reads"] += 1

          if layer == "l2":
              results = self.long_term_memory.read(query, top_k=top_k)
              if results:
                  self.metrics["hits"] += 1
              return results

          elif layer == "l1":
              return self.session_memory.search(query, top_k=top_k)

          elif layer == "l3":
              return self.external_retrieval.search(query, top_k=top_k)

          return []

      # ─── 内部辅助 ───

      def _generate_task_id(self):
          import uuid
          return f"task-{str(uuid.uuid4())[:8]}"

      def _maybe_run_gc(self):
          """每日运行一次垃圾回收"""
          today = datetime.now().strftime("%Y-%m-%d")
          last_gc = getattr(self, "_last_gc_date", "")
          if last_gc == today:
              return
          self._last_gc_date = today

          # Dry run 先看
          dry_stats = self.long_term_memory.evict(dry_run=True)
          # 确认后执行
          gc_stats = self.long_term_memory.evict(dry_run=False)
          self.metrics["evictions"] += sum(gc_stats.values())

      def get_metrics(self):
          """导出当前指标"""
          return {
              **self.metrics,
              "hit_ratio": (self.metrics["hits"] / max(self.metrics["reads"], 1)),
              "turn_count": self.turn_count,
              "phase": self.current_phase.value,
          }


  # ─── Agent 主循环集成示例 ───

  # 初始化
  mm = MemoryManager(
      working_memory=WorkingMemory(),
      session_memory=SessionStore(),
      long_term_memory=LongTermMemory(
          user_id="user-42", db_conn=db, vector_store=vs, embed_fn=embed),
      retrieval_boundary=RetrievalBoundary(),
      hygiene=MemoryHygiene(l2_store=l2, audit_log=audit),
      scope=MemoryScope(storage_backend=store),
      tenant_id="acme-corp",
      user_id="user-42",
  )

  # Agent 主循环
  task_goal = "将 user-service 从 v2.1 升级到 v3.0"
  plan = ["检查 breaking changes", "在 staging 部署", "运行集成测试", "金丝雀发布", "全量切换"]

  # 1. 任务启动
  context = mm.on_task_start(task_goal, plan, constraints=["零停机", "不改 schema"])
  # → L0 已填充,Push 记忆已注入

  # 2. 推理 + 工具调用循环
  for turn in range(10):
      # LLM 推理
      context = mm.on_pre_llm_call()
      # llm_response = llm.chat(messages=[system_msg, user_msg, context])

      # 模拟工具调用
      tool_name = "check_breaking_changes"
      mm.on_tool_call(tool_name, {"target_version": "v3.0"})
      mm.on_observation(tool_name, "v3.0 移除了 /api/v1/users,改用 /api/v2/users", "critical")
      mm.on_turn_end()

  # 3. 任务完成
  result = mm.on_task_complete(
      task_summary="v3.0 升级关键:/api/v1/users → /api/v2/users,需更新所有调用方")
  print(f"任务完成:提升了 {result['promoted']} 条记忆到 L2")
  print(f"指标:{mm.get_metrics()}")
  

这个 MemoryManager 是整个记忆系统的单一入口。Agent 开发者不需要分别调用 WorkingMemoryLongTermMemoryRetrievalBoundary——只需在任务生命周期的关键节点调用 MemoryManager 的对应方法。内部的检索边界、卫生检查、作用域隔离全部透明。

关于记忆系统的指标和可观察性,参见 Agent 可观察性——包括如何将这些指标接入 Grafana、Prometheus、Datadog 等监控平台,以及如何配置告警规则。

常见问题(FAQ)

Agent 记忆和 RAG 到底有什么区别?

RAG 是一个检索机制:工具调用 → 搜索外部文档 → 注入结果到上下文。记忆是整个持久化 + 检索系统:决定什么该记、记多久、怎么查、何时淘汰、如何隔离。RAG 可以做 L3 外部检索层;记忆系统包含 L0-L3 全部四层以及写入策略、生命周期、防污染和隔离。

已有的 agent-memory-systems 文章和本文是什么关系?

agent-memory-systems 讲「怎么存」——SQLite schema、向量数据库配置、JSON 持久化。本文讲「怎么设计」——四层架构、检索边界、生命周期管理、防污染。前者是实现手册,后者是建筑蓝图。建议先读前者了解存储基础,再读本文掌握架构设计。

L0 工作记忆和上下文窗口有什么区别?

上下文窗口是 LLM 的物理限制(如 128K tokens)。L0 工作记忆是你选择放进窗口的结构化内容——不是把所有历史消息都塞进去,而是精选 task goal + active plan + recent observations + constraints。L0 是你的设计决策;窗口大小是 LLM 的物理约束。

记忆系统必须要有向量数据库吗?

不必须。L1 会话记忆可以用 Redis/dict,L2 持久记忆的结构化部分(entity facts、user preferences)可以用 SQLite/Postgres。向量数据库只在需要语义搜索时才必要——通常用于 L2 的非结构化记忆(conversation snippets、task learnings)和 L3 外部检索。小规模 Agent 可以完全不用向量搜索。

多租户场景下怎么防止用户 A 的记忆泄露给用户 B?

通过作用域前缀强制隔离:所有记忆 key 以 /user/{user_id}/ 为前缀。检索时,MemoryScope.check_access() 验证请求者的 scope 是否能读目标 memory。跨 scope 查询被防火墙拦截。即使向量搜索返回了其他用户的相似记忆,scope filter 也会在注入 L0 前过滤掉。

记忆系统需要多少存储空间?怎么估算?

粗略估算:每条 L2 记忆 ~2-5KB(结构化字段 + embedding 向量)。一个活跃用户每天产生 10-50 条记忆。100 个日活用户 × 30 天 × 5KB ≈ 150MB/月。加上 embedding 向量(1536 维 × 4 bytes = 6KB/条)≈ 额外 180MB。总计约 330MB/月。配合 TTL 淘汰策略可控制在 500MB 以内。

本文是 Agent Memory and Context Engineering 系列的第一篇。建议按以下路径继续: