Agent 状态机设计:把不可控对话变成可恢复流程

30秒要点

  • 核心问题:对话驱动的 Agent 在长流程中会跳过步骤、重复执行副作用、重启后丢失所有进度——因为"对话历史"不是"任务状态"。LLM 是概率性的,不加以约束就会在对话流中漂移。你需要一个确定性的任务生命周期状态机来包装 Agent 的每一次执行。
  • 解决方案:七状态显式任务生命周期——planned → running → paused → blocked → retrying → done/failed。每个状态有明确的进入条件、允许的事件、合法的转移目标。无效转移被拒绝,不是尽力而为。
  • 关键实现:AgentTaskStateMachine 核心类(~250 行 Python)——TaskState/TaskEvent 枚举、TRANSITION_TABLE 转移表、InvalidTransitionError 守卫、SQLite 持久化、checkpoint/幂等键、恢复策略。含完整生命周期演示和重启恢复验证。
  • 读完能做什么:为你的 Agent 实现一个确定性的任务生命周期包装器——任务在任何时刻都知道自己是谁、处于什么状态、被什么事件驱动、下一步能去哪里、重启后如何恢复。告别"Agent 跑着跑着就不见了"的生产噩梦。

1. 为什么生产级 Agent 需要显式状态机

凌晨两点,一个自动退款审批 Agent 在处理一笔退款。流程很简单:校验订单 → 检查退款策略 → 请求主管审批 → 调用支付网关退款 → 发送通知。Agent 在第 3 步发出审批请求后,支付网关超时了。Agent 的对话历史里写着"审批已通过,继续执行",于是它重试——但这次它跳过了审批校验,直接从第 3 步的结果缓存中读到了"通过",执行了第 4 步退款。三秒后,主管拒绝了这笔退款。但钱已经退了。

同一天下午,同一个 Agent 在处理另一笔退款时,服务器因为内存不足被 OOM Killer 杀掉了。Agent 进程重启,对话历史清空——LLM 上下文窗口里什么都没有。Agent 从头开始:校验订单、检查策略、请求审批……但它不知道这笔退款已经退过了。它又退了一次。

这就是对话驱动 Agent 的三个根本性失败模式

  1. 上下文漂移导致跳步骤(Context Drift → Step Skip):Agent 的"状态"隐含在对话历史中。当对话历史变长、被压缩、被截断或 LLM 注意力稀释时,Agent 可能跳过关键步骤——"审批已通过"和"审批已请求但尚未回复"在对话文本中可能看起来很像,LLM 分不清。
  2. 重试导致重复副作用(Retry → Duplicate Side Effects):Agent 遇到网络超时重试,但它不知道上一次调用实际上已经成功了(响应在网络中丢失)。没有状态记录就没有去重依据。
  3. 重启丢失全部进度(Restart → Total Progress Loss):Agent 进程重启后,对话历史不在持久存储中。Agent 从零开始,不知道已经完成了哪些步骤。

关键洞察:LLM 是概率性的——同样的输入可能产生不同的输出,同样的对话历史可能被理解为不同的"当前状态"。你需要一个确定性的外壳来包裹 LLM 的执行。这个外壳就是状态机——它不依赖 LLM 来判断"我现在该做什么",而是通过显式的状态转移规则来裁定。LLM 在 running 状态内自由推理,但它不能改变任务状态。状态转移发生在 LLM 之外,由结构化事件(工具结果、审批回调、错误信号)驱动,而非由 LLM 生成的文本驱动。

对话驱动 vs 状态机驱动:两种 Agent 架构对比

理解这个区别不需要深入框架源码。看一个简单的对比:


  对话驱动 Agent(状态隐含在聊天历史中):
  ┌─────────────────────────────────────────────┐
  │ User: 帮我处理退款 #12345                   │
  │ Agent: 好的,我先校验订单…                   │
  │ Agent: 订单有效,金额 ¥150.00               │
  │ Agent: 需要主管审批,已发送请求             │
  │ Agent: [等待中…聊天历史说"已发送请求"]       │
  │ --- 聊天历史被截断/压缩/LLM 理解偏差 ---     │
  │ Agent: 审批已通过!执行退款…  ← 错了!       │
  └─────────────────────────────────────────────┘

  状态机驱动 Agent(状态显式存储和校验):
  ┌─────────────────────────────────────────────┐
  │ task.state = PLANNED                        │
  │   → START → task.state = RUNNING            │
  │   → PAUSE_FOR_APPROVAL → task.state = PAUSED│
  │   → [等待审批回调…]                         │
  │   → 审批回调到达 → 校验: PAUSED +            │
  │     APPROVAL_GRANTED → 合法 → task.state =   │
  │     RUNNING                                 │
  │   → COMPLETE → task.state = DONE            │
  │                                              │
  │ 任何时刻:task.state 精确告诉你任务在哪。    │
  │ 上下文丢了?从持久存储恢复 task.state。      │
  │ 进程重启?task.state 还在 SQLite 里。        │
  └─────────────────────────────────────────────┘
  

核心区别:对话驱动 Agent 的"状态"是 200 条聊天消息中的某个语义位置;状态机驱动 Agent 的状态是数据库中的一个确定值。前者依赖 LLM 从对话中推断;后者是一个不可变的、可验证的事实。

关于为什么上下文窗口不可靠,参见 Agent 上下文窗口管理——上下文衰减是物理规律,不是 bug。状态机正是在上下文消失后依然能恢复任务进度的机制。

2. 从对话流到任务生命周期:状态、事件与转移

在深入具体状态设计之前,先建立三个核心概念的精确定义。这三个概念构成了状态机的全部语义:

状态(State)
任务在某一时刻的确定位置。状态回答:"这个任务现在在哪里?"一个任务在任何时刻有且只有一个当前状态。状态是显式的、持久的、可查询的。
事件(Event)
触发状态变更的外部或内部信号。事件回答:"发生了什么,需要任务做出反应?"事件可以是工具调用的结果、人工审批的回调、超时信号、错误码——但不包括 LLM 的文本输出。事件是结构化的、命名的、可审计的。
转移(Transition)
从状态 A 到状态 B 的有向边,由事件触发。转移回答:"给定当前状态和发生的事件,任务应该去哪个状态?"转移规则被编码为转移表,每次转移前强制校验。不允许的转移会抛出错误,而不是静默降级。

转移表:状态机的核心数据结构

状态机的全部行为可以压缩为一张转移表——一个从 (当前状态, 事件)目标状态 的映射。这张表既是文档(一眼看清所有合法路径),也是执行规则(每次转移前查表校验)。


  TRANSITION_TABLE = {
      (PLANNED,  START):                 RUNNING,
      (RUNNING,  PAUSE_FOR_APPROVAL):    PAUSED,
      (RUNNING,  BLOCK_ON_DEPENDENCY):   BLOCKED,
      (RUNNING,  COMPLETE):              DONE,
      (RUNNING,  FATAL_ERROR):           FAILED,
      (RUNNING,  TRANSIENT_ERROR):       RETRYING,
      (PAUSED,   APPROVAL_GRANTED):      RUNNING,
      (PAUSED,   APPROVAL_DENIED):       FAILED,
      (PAUSED,   TIMEOUT):               FAILED,
      (BLOCKED,  DEPENDENCY_RESOLVED):   RUNNING,
      (BLOCKED,  FATAL_ERROR):           FAILED,
      (RETRYING, RETRY):                 RUNNING,
      (RETRYING, MAX_RETRIES_EXCEEDED):  FAILED,
      (RETRYING, FATAL_ERROR):           FAILED,
      # DONE 和 FAILED 是终态,没有任何出边
  }
  

注意这张表里没有什么:

这些"缺失的转移"不是 bug——它们是设计约束。转移表通过"不列出的即非法"来实现流程控制,比代码中的 if-else 检查更清晰、更不容易被绕过。

状态图:七状态的一目了然


                      ┌─────────┐
                      │ PLANNED │
                      └────┬────┘
                           │ START
                           ▼
            ┌──────────────┴──────────────────┐
            │            RUNNING              │◄──────────────────────┐
            └──┬──────┬──────┬──────┬────────┘                       │
               │      │      │      │                                 │
    PAUSE_FOR  │      │      │      │ COMPLETE                        │
    _APPROVAL  │      │      │      │                                 │
               ▼      ▼      ▼      ▼                                 │
          ┌────────┐ ┌───────┐┌─────────┐┌──────┐                     │
          │ PAUSED │ │BLOCKED││RETRYING ││ DONE │                     │
          └───┬──┬─┘ └───┬───┘└────┬────┘└──────┘                     │
              │  │        │         │                                  │
     APPROVAL │  │TIMEOUT │DEPENDENCY  │ RETRY                        │
     _GRANTED │  │        │_RESOLVED   └──────────────────────────────┘
              │  │        │
              │  ▼        ▼
              │ ┌──────────┐
              │ │  FAILED  │◄── MAX_RETRIES_EXCEEDED, FATAL_ERROR,
              │ └──────────┘    APPROVAL_DENIED
              │
              └──────────────────────────────────────────────────────► RUNNING
  

这张图展示了全部 14 条合法转移。注意几个关键结构特征:

无效转移是错误,不是降级行为

状态机最重要的设计决策之一:任何不在转移表中的转移尝试,必须抛出异常,不得静默忽略或降级处理。


  def transition(self, event: TaskEvent, metadata=None) -> TaskState:
      key = (self._state, event)
      next_state = TRANSITION_TABLE.get(key)
      if next_state is None:
          raise InvalidTransitionError(
              f"无效转移: {self._state.value} + {event.value}"
          )
      # ... 执行转移 ...
  

为什么必须是异常而不是 warn 日志?因为调用 transition() 的代码以为状态已经变了——如果静默忽略,调用方会继续基于错误的状态假设执行后续逻辑。异常迫使调用方处理这个情况:要么修 bug(为什么会出现非法事件?),要么处理边界(比如并发竞争导致的事件乱序)。

关于为什么结构化事件的类型和版本管理很重要,参见 Agent 消息 Schema 设计——状态转移事件和 Agent 消息共享相同的类型安全、版本演进和校验原则。

3. 核心状态设计:planned、running、paused、blocked、retrying、done、failed

七个状态覆盖了 Agent 任务从创建到终结的完整生命周期。每个状态有明确的语义、进入条件、允许的操作和检测方式。

planned(已规划)

含义:任务已创建,前置条件尚待验证。这是任务的初始状态。在这个状态下,Agent 不执行任何副作用——它只是在准备。可以校验参数、分配资源、检查依赖服务是否可达。

允许的事件:START。任何其他事件(包括 COMPLETEFATAL_ERROR)在 planned 状态下都是非法的。

为什么需要这个状态:区分"任务创建了"和"任务开始执行了"。在分布式系统中,任务可能被创建但从未被调度(调度器挂了、队列积压了)。planned 让你能监控"有多少任务在排队但还没开始"——这是容量规划的关键指标。

running(执行中)

含义:Agent 正在主动执行任务步骤。这是 LLM 唯一可以自由推理的状态。在这个状态内,Agent 调用工具、分析结果、生成中间输出——但它不能改变自己的任务状态。状态转移由结构化事件触发(工具返回特定结果、外部审批回调到达、错误信号产生),不是由 LLM 文本输出触发。

允许的事件:PAUSE_FOR_APPROVALBLOCK_ON_DEPENDENCYCOMPLETEFATAL_ERRORTRANSIENT_ERROR。从 running 出发有五条边,是所有状态中最多的。

关键约束:LLM 在 running 内可以产生任何输出——但它不能说"任务完成了"。只有 COMPLETE 事件(由工具调用或评估逻辑产生)才能触发到 done 的转移。这是状态机"确定性外壳"的核心体现。

paused(已暂停)

含义:Agent 正在等待外部输入——通常是人工审批、人工反馈或安全审查。这是自愿的等待。Agent 可以继续运行(监控超时、发送提醒),但不能推进任务本身。

允许的事件:APPROVAL_GRANTED(回到 running)、APPROVAL_DENIED(进入 failed)、TIMEOUT(进入 failed)。

与 blocked 的关键区别:paused 是主动等待——"我需要人的决定"。blocked 是被动受阻——"我需要的服务不可用"。前者是流程设计的正常环节;后者是异常情况。

blocked(已阻塞)

含义:Agent 无法继续,因为遇到了不可解决的依赖问题——依赖的 API 返回 503、需要的文件不存在、权限不足无法访问资源。这是非自愿的阻滞。

允许的事件:DEPENDENCY_RESOLVED(回到 running)、FATAL_ERROR(进入 failed)。

与 paused 的关键区别:blocked 不会超时直接进入 failed——依赖恢复的时间不可预测。但可以通过外部监控告警:"这个任务 blocked 了 4 小时了,需要人工介入"。paused 则有明确的超时策略——审批不能无限等待。

retrying(重试中)

含义:Agent 遇到了暂时性错误,正在重新尝试。这是 runningfailed 之间的缓冲状态。

允许的事件:RETRY(回到 running)、MAX_RETRIES_EXCEEDED(进入 failed)、FATAL_ERROR(进入 failed)。

与 running 的关键区别:retrying 意味着上一次尝试失败了。这个区别很重要:(1) 重试需要幂等性保证——不能重复执行已成功的步骤;(2) 重试有次数上限——无限重试等于死循环;(3) 监控需要区分"正常运行"和"正在重试"——这是完全不同的运维语义。

done(已完成)—— 终态

含义:任务成功完成。所有步骤已执行,所有副作用已提交,所有通知已发送。

允许的事件:无。任何事件在 done 状态下都是非法的。终态不可逆转。

failed(已失败)—— 终态

含义:任务无法完成。可能是审批被拒绝、重试耗尽、致命错误、依赖永久失败或暂停超时。

允许的事件:无。和 done 一样是不可逆终态。

但注意:failed 不等于"数据丢失"或"无法追溯"。失败任务的状态历史、checkpoint 数据、错误信息全部保留在持久存储中。操作者可以从失败任务中提取已完成步骤的结果,手动处理剩余部分,甚至创建一个新的任务实例从 checkpoint 继续——但新实例是一个新的 planned 任务,不是旧实例的状态转移。

状态计数:一个任务的生命周期统计

理解这七个状态的另一种方式是看一个典型任务的"状态停留":


  状态      停留时间      说明
  ─────────────────────────────────────────────
  planned   几毫秒~几秒   验证前置条件
  running   几秒~几分钟   Agent 执行任务步骤
  paused    几分钟~几小时 等待人类审批
  blocked   几分钟~几小时 等待依赖恢复
  retrying  几秒~几分钟   退避重试窗口
  done      永久          终态
  failed    永久          终态
  

注意 running 的实际时间占比可能远小于你预期——在生产环境中,Agent 大量时间花在等待(paused/blocked)而非计算(running)。这意味着状态机的主要价值不是控制 running 内的行为,而是管理等待期间的语义和恢复。

4. 暂停与继续:把人工审批、人类反馈和安全门禁放进状态机

大多数 Agent 框架把人工审批当作一个特殊的工具调用或对话消息。"Agent 发了一条消息说'请审批',然后继续执行。"这是危险的——Agent 可能在人类响应之前就基于对话上下文中的"审批通过"跳过等待。

状态机把人工审批提升为一等状态paused。Agent 进入 paused 后,除非收到明确的 APPROVAL_GRANTEDAPPROVAL_DENIED 事件,否则物理上无法进入 running。LLM 无法通过生成"看起来像审批通过"的文本来绕过——因为状态转移不经过 LLM。

审批流程的三条路径


  路径 A: 审批通过(正常流程)
  RUNNING → PAUSE_FOR_APPROVAL → PAUSED → APPROVAL_GRANTED → RUNNING

  路径 B: 审批拒绝(终止流程)
  RUNNING → PAUSE_FOR_APPROVAL → PAUSED → APPROVAL_DENIED → FAILED

  路径 C: 审批超时(保护性终止)
  RUNNING → PAUSE_FOR_APPROVAL → PAUSED → TIMEOUT → FAILED
  

每条路径都不可绕过。Agent 不能从 paused 直接跳到 done;不能在 paused 状态中"继续工作"(在 paused 中你是无法执行任务步骤的——你只能等待事件)。

超时处理:审批不能无限等待

paused 状态有明确的超时策略。典型配置:


  审批超时策略:
    初次等待: 30 分钟
    超时提醒: 15 分钟时发送提醒(Slack / 邮件 / 企业微信)
    超时动作: 标记为 FAILED,原因: "approval_timeout"
    宽限期: 超时后 5 分钟内如果审批到达,可人工从 FAILED 中恢复
            (创建新任务实例,从 checkpoint 继续)
  

注意这里不自动重试——审批是人做的决定,不是网络超时。人的决策不会因为等待时间变长而自动变成"同意"。超时后终止是安全的默认行为。

安全门禁:不止是审批

paused 状态不仅用于人工审批。任何需要"在执行副作用之前暂停并验证"的场景都可以用:

与外部审批系统的集成模式

状态机不绑定特定的审批 UI。它只定义事件接口。以下是集成模式:


  # Webhook 接收审批回调
  @app.route("/approval-callback", methods=["POST"])
  def handle_approval():
      data = request.json
      task_id = data["task_id"]
      decision = data["decision"]  # "approved" 或 "denied"

      sm = load_state_machine(task_id)  # 从持久存储恢复
      event = APPROVAL_GRANTED if decision == "approved" else APPROVAL_DENIED
      sm.transition(event, metadata={
          "approver": data["approver"],
          "comment": data.get("comment", ""),
          "timestamp": data["timestamp"]
      })
  

审批源可以是 Slack 按钮、Jira 状态变更、企业微信审批、自定义 Web 面板——只要它能发送一个带 task_iddecision 的 HTTP 请求,就能驱动状态机。

关于审批工作流的 UI/UX 设计模式和更完整的审批集成方案,参见 Agent 人工审批工作流——本文提供底层状态机,那篇文章提供上层交互设计。

重申:paused 不是失败。很多团队看到 Agent 停在 paused 就报警——这是对状态机语义的误解。paused 是正常的、设计的、预期内的等待状态。你的监控面板应该把 pausedblocked 分开显示,而不是统一归类为"非 running"。

5. 失败与重试:避免重复执行、跳步骤和无限循环

重试是分布式系统中最容易出错的操作——不是因为重试本身复杂,而是因为重试时的状态假设经常是错的。状态机的重试设计围绕三个保证:

  1. 去重(No Duplicate Effects):重试不能重复执行已经成功的步骤。
  2. 不跳步(No Step Skip):重试不能越过未完成的步骤直接跳到后面。
  3. 有界重试(Bounded Retries):重试有次数上限和退避策略,不会无限循环。

重试状态机:一个完整的重试生命周期


  RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=0)
  RETRYING → [退避等待 2^0 = 1秒] → RETRY → RUNNING (retry_count=1)
  RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=1)
  RETRYING → [退避等待 2^1 = 2秒] → RETRY → RUNNING (retry_count=2)
  RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=2)
  RETRYING → [退避等待 2^2 = 4秒] → RETRY → RUNNING (retry_count=3)
  RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=3)
  RETRYING → MAX_RETRIES_EXCEEDED → FAILED
  

注意关键设计:

重试退避策略


  import time

  def retry_with_backoff(sm: AgentTaskStateMachine, max_retries=3):
      """执行重试循环,带指数退避。"""
      while sm.state == TaskState.RETRYING:
          backoff = 2 ** sm.retry_count  # 指数退避: 2, 4, 8...
          print(f"[retry] 退避等待 {backoff}s (第 {sm.retry_count} 次重试)")
          time.sleep(backoff)

          try:
              sm.transition(TaskEvent.RETRY)
              return  # 回到 RUNNING,调用方继续执行
          except InvalidTransitionError:
              # 超过最大重试次数,使用 MAX_RETRIES_EXCEEDED
              sm.transition(TaskEvent.MAX_RETRIES_EXCEEDED)
              raise
  

幂等性:状态机只能防止重复触发,不能保证外部幂等

这是本文最重要的诚实声明之一:

状态机保证 Agent 不会对同一个步骤发起两次调用。但它不能保证外部系统不会处理两次——如果第一次调用的响应在网络中丢失,而第二次调用恰好被外部系统当作新请求处理。

为了实现更强的幂等性,需要两层的配合:

  1. Agent 侧(状态机层):在执行副作用前写入幂等键 checkpoint,执行前检查该键是否已存在。
  2. 外部系统侧:外部 API 支持幂等键(如 Stripe 的 Idempotency-Key header),在服务端去重。

  # Agent 侧幂等键模式
  def execute_with_idempotency(sm, step_name, action):
      idempotency_key = f"{sm.task_id}:{step_name}"

      # 检查是否已执行过
      if sm.get_checkpoint(idempotency_key) == "done":
          print(f"[idempotency] 步骤 '{step_name}' 已完成,跳过")
          return sm.get_checkpoint(f"{idempotency_key}:result")

      # 标记为"执行中"(防止并发重复)
      sm.set_checkpoint(idempotency_key, "executing")

      try:
          result = action(idempotency_key)  # 将幂等键传递给外部系统
          sm.set_checkpoint(idempotency_key, "done")
          sm.set_checkpoint(f"{idempotency_key}:result", result)
          return result
      except Exception as e:
          # 执行失败,不标记为 done,下次重试会重新执行
          sm.set_checkpoint(idempotency_key, "failed")
          raise
  

关键模式:先标记 executing,再执行操作,成功后标记 done。如果执行中崩溃,下次恢复时看到 executing 就知道那次执行的结果不确定——需要查询外部系统确认,而不是盲目重试或盲目跳过。

可重试 vs 不可重试的错误

错误类型示例重试?处理
暂时性网络错误连接超时、DNS 解析失败✓ 是TRANSIENT_ERROR → RETRYING
服务暂时不可用503 Service Unavailable、429 Rate Limited✓ 是TRANSIENT_ERROR → RETRYING(注意退避)
响应丢失请求成功但响应在网络中丢失✓ 谨慎用幂等键查询外部系统状态再决定
参数校验失败422 Unprocessable Entity✗ 否FATAL_ERROR → FAILED(重试不会改变结果)
权限不足403 Forbidden✗ 否FATAL_ERROR → FAILED
数据损坏必填字段为空、格式错误✗ 否FATAL_ERROR → FAILED

核心判断标准:如果重试时所有输入不变,结果会改变吗?如果答案不变(同一请求永远返回 422),则不应重试。如果答案可能改变(网络恢复、服务恢复),则可以重试。

6. 状态存储与恢复:让 Agent 在重启、断线和上下文清空后继续工作

这是状态机最核心的价值——任务状态是持久的。Agent 进程可以被杀、服务器可以重启、LLM 上下文可以清空,但任务状态还在。

什么必须持久化

一个完整的状态存储方案需要持久化以下数据:


  ┌─────────────────────────────────────────────────┐
  │              task_checkpoints 表                  │
  ├──────────────┬──────────────────────────────────┤
  │ task_id      │ 任务唯一标识                       │
  │ state        │ 当前状态 (planned/running/...)     │
  │ retry_count  │ 当前重试次数                       │
  │ checkpoint   │ JSON: {步骤结果, 中间数据, 幂等键} │
  │ updated_at   │ 最后更新时间                       │
  └──────────────┴──────────────────────────────────┘

  ┌─────────────────────────────────────────────────┐
  │            state_transitions 表                  │
  ├──────────────┬──────────────────────────────────┤
  │ id           │ 自增主键                           │
  │ task_id      │ 任务唯一标识                       │
  │ from_state   │ 转移前状态                         │
  │ to_state     │ 转移后状态                         │
  │ event        │ 触发事件                           │
  │ timestamp    │ ISO 8601 时间戳                    │
  │ metadata     │ JSON: {错误信息, 审批人, ...}      │
  └──────────────┴──────────────────────────────────┘
  

注意状态转移表是追加写的——每行是一个转移事件,从不更新或删除。这意味着你拥有完整的审计轨迹:在任何时刻,你可以回溯一个任务从 planneddone/failed 的每一步。

存储后端选择

后端适用场景注意事项
SQLite单机 Agent、Demo、本地工具简单、零依赖、事务安全。不适合并发写入——多 Agent 实例共享一个 SQLite 文件会导致锁竞争。生产环境使用 WAL 模式。
PostgreSQL生产 Agent、多实例部署支持并发、连接池、主从复制。任务状态存储是关键基础设施——如果状态库挂了,所有 Agent 都无法转移状态(fail closed,安全)。
Redis + AOF低延迟场景内存快,但持久化弱于 PostgreSQL。如果使用 Redis,必须开启 AOF 持久化(appendonly yes),并接受重启时可能丢失最后几秒的转移记录。
文件 JSON原型、Demo、单文件工具最简单但不适合生产——无事务、无并发控制、无恢复保证。

恢复策略:处理四种残留状态

Agent 重启后,从持久存储加载任务状态,然后根据当前状态执行恢复策略:


  def recover(self):
      saved = self.persistence.load_checkpoint(self.task_id)
      if saved:
          self._state = TaskState(saved["state"])
          self._retry_count = saved.get("retry_count", 0)
          self._checkpoint = saved.get("checkpoint_data", {})
      self._history = self.persistence.load_history(self.task_id)

      # 处理残留状态
      if self._state == TaskState.RUNNING:
          # 进程在 RUNNING 时崩溃了 → 标记为需要重试
          self.transition(TaskEvent.TRANSIENT_ERROR,
              metadata={"reason": "recovery_stale_running"})

      elif self._state == TaskState.RETRYING:
          # 进程在 RETRYING 时崩溃了 → 检查重试次数后重试
          if self._retry_count < self._max_retries:
              self.transition(TaskEvent.RETRY)
          else:
              self.transition(TaskEvent.MAX_RETRIES_EXCEEDED)

      elif self._state == TaskState.PAUSED:
          # 检查审批是否超时
          paused_at = self._get_last_transition_time()
          if time_since(paused_at) > APPROVAL_TIMEOUT:
              self.transition(TaskEvent.TIMEOUT,
                  metadata={"reason": "recovery_approval_timeout"})

      elif self._state == TaskState.BLOCKED:
          # 检查依赖是否恢复(重新探测依赖服务)
          if self._check_dependency():
              self.transition(TaskEvent.DEPENDENCY_RESOLVED)
          # 依赖仍然不可用 → 保持 BLOCKED,等待外部监控告警

      return self
  

恢复策略的默认行为应偏向安全:

任务状态恢复 vs LLM 上下文恢复:两件不同的事

关键区分:状态机恢复的是任务级别的状态——任务进行到哪一步了、检查点数据是什么、哪些步骤已完成。它恢复 LLM 的对话历史。Agent 重启后,LLM 面对一个空的上下文窗口。Agent 从状态机读取任务状态和 checkpoint 数据,构建一个新的上下文摘要("你正在执行任务 X,已完成步骤 1-3,步骤 3 的结果是 Y,现在需要执行步骤 4"),然后让 LLM 从那个点继续推理。对话历史丢失了,但任务进度没有丢失。

关于上下文窗口的恢复策略,参见 Agent 上下文窗口管理——跨窗口状态管理部分详细讨论了上下文重置后如何用 checkpoint 重建 LLM 的工作上下文。关于记忆系统的持久化架构(L0-L3 各层如何存储),参见 Agent 记忆系统设计——任务 checkpoint 和语义记忆是不同的存储层次。

7. 可观测状态机:日志、指标、审计轨迹与告警

状态机不仅是一个执行模型——它本身就是一个可观测性原语。每一次状态转移都是一条不可变的事件记录,天然适合构建审计轨迹、指标面板和告警规则。

转移事件:观测的基本单位

每次状态转移产生一条结构化事件:


  {
      "task_id": "task_a1b2c3d4",
      "from_state": "running",
      "to_state": "paused",
      "event": "pause_for_approval",
      "timestamp": "2026-06-05T14:22:31.123456+00:00",
      "metadata": {
          "step": "refund_approval",
          "amount": 150.00,
          "requested_by": "agent_refund_processor"
      }
  }
  

这条事件记录了什么、为什么、何时发生。所有转移事件按时间排序就是任务的完整审计轨迹——不需要额外的"审计日志系统",状态转移事件就是审计日志。

可观测性钩子:在每次转移时触发


  # on_transition 钩子:每次状态转移后调用
  def on_transition(transition: StateTransition):
      # 1. 结构化日志
      logger.info(json.dumps({
          "event": "state_transition",
          "task_id": transition.task_id,
          "from": transition.from_state.value,
          "to": transition.to_state.value,
          "trigger": transition.event.value,
          "timestamp": transition.timestamp,
          "metadata": transition.metadata
      }))

      # 2. 指标埋点
      metrics.increment(f"transition.{transition.to_state.value}")
      metrics.gauge(f"task.{transition.task_id}.state",
                    transition.to_state.value)

      # 3. 告警检查
      if transition.to_state == TaskState.RETRYING:
          retry_count = transition.metadata.get("retry_count", 0)
          if retry_count >= 3:
              alerts.send("high_retry_count",
                  f"Task {transition.task_id} retried {retry_count} times")
  

关于结构化事件 schema 的类型和版本管理,参见 Agent 消息 Schema 设计——状态转移事件应该像 Agent 消息一样有 typed、versioned、validated 的 schema。

核心指标

以下是应该从状态转移流中计算的关键指标:

指标计算方式用途
time_in_state当前时间 - 进入当前状态的时间戳SLA 监控:任务在 running 中多长时间了?在 paused 中等了多久?
transition_counts按事件类型聚合计数异常检测:重试次数突增?审批拒绝率异常?
state_distribution各状态下的任务数量容量规划:多少任务在排队(planned)?多少在等待(paused/blocked)?
retry_rateretrying 转移数 / 总转移数健康评估:重试率升高说明依赖服务不稳定
mean_time_to_recovery从 paused/blocked/retrying 回到 running 的平均时间恢复效率:审批响应时间、依赖恢复时间、重试退避窗口
invalid_transition_attemptsInvalidTransitionError 抛出次数Bug 检测:事件乱序、并发竞争、调用方逻辑错误

告警规则

基于状态机指标的告警应该检测以下条件:


  告警规则:
    1. task_in_running_too_long:
       条件: time_in_state(RUNNING) > 30min
       动作: 通知 oncall,附 task_id 和最后 checkpoint
       含义: Agent 可能卡住了(死循环、等待无响应的工具调用)

    2. task_retry_flapping:
       条件: retry_count >= 3 且持续在 RETRYING ↔ RUNNING 间摆动
       动作: 告警升级,人工介入
       含义: 重试退避太短,或依赖问题不是暂时的

    3. task_paused_abandoned:
       条件: time_in_state(PAUSED) > 4h
       动作: 通知审批人 + oncall
       含义: 审批可能被遗忘,需要提醒或升级

    4. task_blocked_prolonged:
       条件: time_in_state(BLOCKED) > 2h
       动作: 通知 oncall
       含义: 依赖服务长时间不可用,需要人工介入

    5. too_many_failed_tasks:
       条件: failed_count / total_tasks > 0.3(滑动窗口)
       动作: 紧急告警
       含义: 系统性问题——可能是依赖故障或配置错误

    6. invalid_transition_spike:
       条件: invalid_transition_attempts > 10/min
       动作: 通知 oncall
       含义: 调用方代码 bug 或并发竞争导致的事件乱序
  

关于更完整的 Agent 可观测性体系(指标收集、面板构建、告警管道),参见 Agent 可观察性——本文侧重状态转移产生的观测信号,那篇文章覆盖整个 Agent 的监控基础设施。关于审计日志的不可变性、保留策略和合规要求,参见 Agent 审计日志设计——状态转移历史就是一种审计日志的实现形式。

日志级别指南


  日志级别:
    INFO:   每次成功的状态转移
            示例: "Task abc123: RUNNING → PAUSED (pause_for_approval)"

    WARN:   非法转移尝试(被状态机拦截)
            示例: "Task abc123: 拒绝转移 PAUSED + COMPLETE"
            重要——这是 bug 或攻击的信号

    ERROR:  状态持久化失败
            示例: "Task abc123: 无法写入转移记录到数据库"
            此时转移应失败(fail closed),任务停留在当前状态

    DEBUG:  状态机内部细节(转移表查找、checkpoint 读写)
            仅开发环境启用
  

8. 完整示例:一个可恢复 AgentTaskStateMachine 的 Python 骨架

以下是一个可运行的 Python 参考实现,展示了本文讨论的全部概念——状态枚举、转移表、无效转移守卫、SQLite 持久化、checkpoint、恢复策略和生命周期演示。这不是一个生产级库(缺少连接池、迁移、水平扩展等),而是一个生产模式骨架,你可以据此适配自己的 Agent。

完整代码


  """
  AgentTaskStateMachine — 可恢复的 Agent 任务状态机参考实现
  ==========================================================

  这是一个生产模式骨架,不是可直接部署的库。
  缺少:连接池、数据库迁移、水平扩展、安全加固。
  提供了:完整的七状态生命周期、转移校验、持久化、恢复。

  依赖:Python 3.9+(仅标准库 + sqlite3)
  """

  from enum import Enum
  from dataclasses import dataclass, field
  from datetime import datetime, timezone
  from typing import Optional, Dict, List, Any
  import json
  import sqlite3
  import time
  import uuid


  # ── 状态枚举 ──────────────────────────────────────────
  class TaskState(str, Enum):
      """Agent 任务的显式生命周期状态。"""
      PLANNED = "planned"      # 任务已创建,前置条件尚未验证
      RUNNING = "running"      # Agent 正在执行任务步骤
      PAUSED = "paused"        # 等待人工审批或外部输入
      BLOCKED = "blocked"      # 无法继续——依赖不可用
      RETRYING = "retrying"    # 重试中(上一次尝试失败)
      DONE = "done"            # 终态:成功
      FAILED = "failed"        # 终态:不可恢复的失败


  # ── 事件枚举 ──────────────────────────────────────────
  class TaskEvent(str, Enum):
      """触发状态转移的事件。"""
      START = "start"
      PAUSE_FOR_APPROVAL = "pause_for_approval"
      APPROVAL_GRANTED = "approval_granted"
      APPROVAL_DENIED = "approval_denied"
      BLOCK_ON_DEPENDENCY = "block_on_dependency"
      DEPENDENCY_RESOLVED = "dependency_resolved"
      TRANSIENT_ERROR = "transient_error"
      RETRY = "retry"
      MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
      COMPLETE = "complete"
      FATAL_ERROR = "fatal_error"
      TIMEOUT = "timeout"


  # ── 转移表 ────────────────────────────────────────────
  # 格式: (当前状态, 事件) → 目标状态
  # 不在表中的转移 = 非法 → 抛出 InvalidTransitionError
  TRANSITION_TABLE: Dict[tuple, TaskState] = {
      (TaskState.PLANNED,   TaskEvent.START):                  TaskState.RUNNING,
      (TaskState.RUNNING,   TaskEvent.PAUSE_FOR_APPROVAL):     TaskState.PAUSED,
      (TaskState.RUNNING,   TaskEvent.BLOCK_ON_DEPENDENCY):    TaskState.BLOCKED,
      (TaskState.RUNNING,   TaskEvent.COMPLETE):               TaskState.DONE,
      (TaskState.RUNNING,   TaskEvent.FATAL_ERROR):            TaskState.FAILED,
      (TaskState.RUNNING,   TaskEvent.TRANSIENT_ERROR):        TaskState.RETRYING,
      (TaskState.PAUSED,    TaskEvent.APPROVAL_GRANTED):       TaskState.RUNNING,
      (TaskState.PAUSED,    TaskEvent.APPROVAL_DENIED):        TaskState.FAILED,
      (TaskState.PAUSED,    TaskEvent.TIMEOUT):                TaskState.FAILED,
      (TaskState.BLOCKED,   TaskEvent.DEPENDENCY_RESOLVED):    TaskState.RUNNING,
      (TaskState.BLOCKED,   TaskEvent.FATAL_ERROR):            TaskState.FAILED,
      (TaskState.RETRYING,  TaskEvent.RETRY):                  TaskState.RUNNING,
      (TaskState.RETRYING,  TaskEvent.MAX_RETRIES_EXCEEDED):   TaskState.FAILED,
      (TaskState.RETRYING,  TaskEvent.FATAL_ERROR):            TaskState.FAILED,
      # DONE 和 FAILED 是终态:没有任何出边
  }


  # ── 数据类 ────────────────────────────────────────────
  @dataclass
  class StateTransition:
      """一次状态转移的不可变记录。"""
      task_id: str
      from_state: TaskState
      to_state: TaskState
      event: TaskEvent
      timestamp: str
      metadata: Dict[str, Any] = field(default_factory=dict)


  class InvalidTransitionError(Exception):
      """当尝试的转移不在转移表中时抛出。"""
      pass


  # ── 状态持久化(SQLite) ──────────────────────────────
  class StatePersistence:
      """基于 SQLite 的状态持久化。

      生产环境注意事项:
      - 多实例部署应使用 PostgreSQL
      - SQLite 仅适用于单机 demo 或单进程 Agent
      - 使用 WAL 模式减少锁竞争
      """

      def __init__(self, db_path: str = "agent_state.db"):
          self.conn = sqlite3.connect(db_path)
          self.conn.execute("PRAGMA journal_mode=WAL")
          self._init_schema()

      def _init_schema(self):
          self.conn.executescript("""
              CREATE TABLE IF NOT EXISTS task_checkpoints (
                  task_id TEXT PRIMARY KEY,
                  state TEXT NOT NULL,
                  retry_count INTEGER DEFAULT 0,
                  checkpoint_data TEXT DEFAULT '{}',
                  updated_at TEXT NOT NULL
              );
              CREATE TABLE IF NOT EXISTS state_transitions (
                  id INTEGER PRIMARY KEY AUTOINCREMENT,
                  task_id TEXT NOT NULL,
                  from_state TEXT NOT NULL,
                  to_state TEXT NOT NULL,
                  event TEXT NOT NULL,
                  timestamp TEXT NOT NULL,
                  metadata TEXT DEFAULT '{}'
              );
              CREATE INDEX IF NOT EXISTS idx_transitions_task
                  ON state_transitions(task_id, timestamp);
          """)
          self.conn.commit()

      def save_transition(self, t: StateTransition) -> None:
          self.conn.execute(
              "INSERT INTO state_transitions "
              "(task_id, from_state, to_state, event, timestamp, metadata) "
              "VALUES (?, ?, ?, ?, ?, ?)",
              (t.task_id, t.from_state.value, t.to_state.value,
               t.event.value, t.timestamp, json.dumps(t.metadata))
          )
          self.conn.commit()

      def save_checkpoint(self, task_id: str, state: TaskState,
                          retry_count: int, checkpoint_data: dict = None) -> None:
          self.conn.execute(
              """INSERT OR REPLACE INTO task_checkpoints
                 (task_id, state, retry_count, checkpoint_data, updated_at)
                 VALUES (?, ?, ?, ?, ?)""",
              (task_id, state.value, retry_count,
               json.dumps(checkpoint_data or {}),
               datetime.now(timezone.utc).isoformat())
          )
          self.conn.commit()

      def load_checkpoint(self, task_id: str) -> Optional[dict]:
          row = self.conn.execute(
              "SELECT state, retry_count, checkpoint_data "
              "FROM task_checkpoints WHERE task_id = ?",
              (task_id,)
          ).fetchone()
          if row:
              return {
                  "state": row[0],
                  "retry_count": row[1],
                  "checkpoint_data": json.loads(row[2])
              }
          return None

      def load_history(self, task_id: str) -> List[StateTransition]:
          rows = self.conn.execute(
              "SELECT task_id, from_state, to_state, event, timestamp, metadata "
              "FROM state_transitions WHERE task_id = ? ORDER BY id",
              (task_id,)
          ).fetchall()
          return [
              StateTransition(
                  task_id=r[0],
                  from_state=TaskState(r[1]),
                  to_state=TaskState(r[2]),
                  event=TaskEvent(r[3]),
                  timestamp=r[4],
                  metadata=json.loads(r[5])
              ) for r in rows
          ]


  # ── 状态机核心类 ──────────────────────────────────────
  class AgentTaskStateMachine:
      """
      可恢复的 Agent 任务状态机。

      保证:
      - 每次状态转移经过转移表校验
      - 每次转移被持久化(可审计、可恢复)
      - 重启后可恢复任务状态
      """

      def __init__(self, task_id: str, persistence: StatePersistence,
                   max_retries: int = 3):
          self.task_id = task_id
          self.persistence = persistence
          self._state: TaskState = TaskState.PLANNED
          self._history: List[StateTransition] = []
          self._retry_count: int = 0
          self._max_retries: int = max_retries
          self._checkpoint: Dict[str, Any] = {}
          self._on_transition_callbacks: List[callable] = []

      # ── 属性 ──────────────────────────────────────────
      @property
      def state(self) -> TaskState:
          return self._state

      @property
      def retry_count(self) -> int:
          return self._retry_count

      @property
      def history(self) -> List[StateTransition]:
          return list(self._history)

      # ── 可观测性钩子 ──────────────────────────────────
      def on_transition(self, callback: callable) -> None:
          """注册转移回调(日志、指标、告警)。"""
          self._on_transition_callbacks.append(callback)

      def _notify_observers(self, transition: StateTransition) -> None:
          for cb in self._on_transition_callbacks:
              try:
                  cb(transition)
              except Exception:
                  pass  # 观测失败不应影响转移本身

      # ── 核心转移方法 ──────────────────────────────────
      def transition(self, event: TaskEvent,
                     metadata: Dict[str, Any] = None) -> TaskState:
          """
          尝试状态转移。不在转移表中的转移抛出 InvalidTransitionError。
          """
          key = (self._state, event)
          next_state = TRANSITION_TABLE.get(key)
          if next_state is None:
              raise InvalidTransitionError(
                  f"无效转移: {self._state.value} + {event.value}"
              )

          # 重试计数检查
          if event == TaskEvent.RETRY:
              self._retry_count += 1
              if self._retry_count > self._max_retries:
                  raise InvalidTransitionError(
                      f"超过最大重试次数 ({self._max_retries})——"
                      f"请使用 MAX_RETRIES_EXCEEDED 事件"
                  )

          # 构建转移记录
          transition_record = StateTransition(
              task_id=self.task_id,
              from_state=self._state,
              to_state=next_state,
              event=event,
              timestamp=datetime.now(timezone.utc).isoformat(),
              metadata=metadata or {},
          )

          # 执行转移
          self._history.append(transition_record)
          self._state = next_state

          # 持久化
          self.persistence.save_transition(transition_record)
          self.persistence.save_checkpoint(
              self.task_id, self._state, self._retry_count,
              self._checkpoint
          )

          # 通知观测者
          self._notify_observers(transition_record)

          return self._state

      # ── Checkpoint 管理 ───────────────────────────────
      def set_checkpoint(self, key: str, value: Any) -> None:
          """存储中间数据供恢复使用。"""
          self._checkpoint[key] = value

      def get_checkpoint(self, key: str) -> Optional[Any]:
          """读取 checkpoint 数据。"""
          return self._checkpoint.get(key)

      def has_checkpoint(self, key: str) -> bool:
          """检查 checkpoint 是否存在。"""
          return key in self._checkpoint

      # ── 幂等执行辅助 ──────────────────────────────────
      def execute_idempotent(self, step_name: str,
                             action: callable) -> Any:
          """
          幂等执行一个步骤。

          action 应接收 idempotency_key 作为参数,并传递给外部系统。
          如果步骤已完成(checkpoint 中存在),直接返回缓存结果。
          """
          idempotency_key = f"{self.task_id}:{step_name}"

          # 已完成?直接返回缓存结果
          if self.get_checkpoint(idempotency_key) == "done":
              return self.get_checkpoint(f"{idempotency_key}:result")

          # 标记为执行中
          self.set_checkpoint(idempotency_key, "executing")

          try:
              result = action(idempotency_key)
              self.set_checkpoint(idempotency_key, "done")
              self.set_checkpoint(f"{idempotency_key}:result", result)
              return result
          except Exception:
              raise

      # ── 恢复 ──────────────────────────────────────────
      def recover(self) -> "AgentTaskStateMachine":
          """从持久存储恢复任务状态。"""
          saved = self.persistence.load_checkpoint(self.task_id)
          if saved:
              self._state = TaskState(saved["state"])
              self._retry_count = saved.get("retry_count", 0)
              self._checkpoint = saved.get("checkpoint_data", {})

          self._history = self.persistence.load_history(self.task_id)

          # 处理残留状态
          if self._state == TaskState.RUNNING:
              self.transition(TaskEvent.TRANSIENT_ERROR,
                  metadata={"reason": "recovery_stale_running"})

          elif self._state == TaskState.RETRYING:
              if self._retry_count < self._max_retries:
                  self.transition(TaskEvent.RETRY)
              else:
                  self.transition(TaskEvent.MAX_RETRIES_EXCEEDED)

          elif self._state == TaskState.PAUSED:
              last_t = self._history[-1] if self._history else None
              if last_t and last_t.metadata.get("timeout_after"):
                  elapsed = (datetime.now(timezone.utc) -
                             datetime.fromisoformat(last_t.timestamp))
                  if elapsed.total_seconds() > last_t.metadata["timeout_after"]:
                      self.transition(TaskEvent.TIMEOUT,
                          metadata={"reason": "recovery_approval_timeout"})

          elif self._state == TaskState.BLOCKED:
              pass  # 保持 blocked,依赖外部告警

          return self

      # ── 诊断方法 ──────────────────────────────────────
      def summary(self) -> dict:
          """返回任务当前状态的摘要。"""
          return {
              "task_id": self.task_id,
              "state": self._state.value,
              "retry_count": self._retry_count,
              "transition_count": len(self._history),
              "checkpoint_keys": list(self._checkpoint.keys()),
              "is_terminal": self._state in (TaskState.DONE, TaskState.FAILED),
          }


  # ── 使用演示 ──────────────────────────────────────────
  def demo_full_lifecycle():
      """演示完整的任务生命周期,包括暂停、重试和恢复。"""
      print("=" * 60)
      print("AgentTaskStateMachine 生命周期演示")
      print("=" * 60)

      persistence = StatePersistence(":memory:")
      task_id = f"task_{uuid.uuid4().hex[:8]}"
      sm = AgentTaskStateMachine(task_id=task_id, persistence=persistence)

      sm.on_transition(lambda t: print(
          f"  [观测] {t.from_state.value} → {t.to_state.value} "
          f"({t.event.value})"
      ))

      # 1. planned → running
      sm.transition(TaskEvent.START)
      print(f"[{sm.state.value}] 任务开始\n")

      # 2. running → paused(需要人工审批)
      sm.transition(TaskEvent.PAUSE_FOR_APPROVAL,
                    metadata={"step": "refund_approval", "amount": 150.00})
      print(f"[{sm.state.value}] 等待人工审批\n")

      # 3. paused → running(审批通过)
      sm.transition(TaskEvent.APPROVAL_GRANTED,
                    metadata={"approver": "[email protected]"})
      print(f"[{sm.state.value}] 审批通过,继续执行\n")

      # 4. running → retrying(暂时性错误)
      sm.transition(TaskEvent.TRANSIENT_ERROR,
                    metadata={"error": "rate_limit", "step": "send_notification"})
      print(f"[{sm.state.value}] 暂时性错误,"
            f"重试 {sm.retry_count}/{sm._max_retries}\n")

      # 5. retrying → running(重试)
      sm.transition(TaskEvent.RETRY)
      print(f"[{sm.state.value}] 重试中...\n")

      # 6. running → done
      sm.transition(TaskEvent.COMPLETE,
                    metadata={"result": "refund_processed"})
      print(f"[{sm.state.value}] 任务完成\n")

      # 7. 测试非法转移
      print("--- 测试非法转移 ---")
      try:
          sm.transition(TaskEvent.START)
      except InvalidTransitionError as e:
          print(f"[守卫] 非法转移被拦截: {e}\n")

      # 8. 查看转移历史
      print("--- 转移历史 ---")
      for i, t in enumerate(sm.history, 1):
          print(f"  {i}. {t.from_state.value} → {t.to_state.value} "
                f"({t.event.value}) @ {t.timestamp[:19]}")

      print(f"\n--- 摘要 ---")
      for k, v in sm.summary().items():
          print(f"  {k}: {v}")


  def demo_recovery():
      """演示重启恢复。"""
      print("\n" + "=" * 60)
      print("重启恢复演示")
      print("=" * 60)

      persistence = StatePersistence(":memory:")
      task_id = "recovered_task_001"

      sm1 = AgentTaskStateMachine(task_id=task_id, persistence=persistence)
      sm1.transition(TaskEvent.START)
      sm1.transition(TaskEvent.COMPLETE)
      print(f"实例1: 任务执行完毕,状态 = {sm1.state.value}")

      sm2 = AgentTaskStateMachine(task_id=task_id, persistence=persistence)
      sm2.recover()
      print(f"实例2: 恢复后状态 = {sm2.state.value}")

      assert sm2.state == TaskState.DONE, "恢复失败!"
      print("✓ 恢复验证通过")
      print(f"  恢复的历史记录数: {len(sm2.history)}")


  def demo_idempotent_execution():
      """演示幂等执行。"""
      print("\n" + "=" * 60)
      print("幂等执行演示")
      print("=" * 60)

      persistence = StatePersistence(":memory:")
      sm = AgentTaskStateMachine(task_id="idempotent_demo", persistence=persistence)
      sm.transition(TaskEvent.START)

      call_count = [0]

      def external_call(idempotency_key: str) -> str:
          call_count[0] += 1
          print(f"  → 外部调用 #{call_count[0]} "
                f"(idempotency_key={idempotency_key})")
          return f"result_{call_count[0]}"

      r1 = sm.execute_idempotent("step_refund", external_call)
      print(f"  第一次: {r1} (外部调用次数: {call_count[0]})")

      r2 = sm.execute_idempotent("step_refund", external_call)
      print(f"  第二次: {r2} (外部调用次数: {call_count[0]})")

      assert call_count[0] == 1, "重复执行了!应该只调用一次"
      assert r1 == r2, "两次结果不一致!"
      print("✓ 幂等验证通过")


  if __name__ == "__main__":
      demo_full_lifecycle()
      demo_recovery()
      demo_idempotent_execution()
  

运行输出


  ============================================================
  AgentTaskStateMachine 生命周期演示
  ============================================================
    [观测] planned → running (start)
  [running] 任务开始

    [观测] running → paused (pause_for_approval)
  [paused] 等待人工审批

    [观测] paused → running (approval_granted)
  [running] 审批通过,继续执行

    [观测] running → retrying (transient_error)
  [retrying] 暂时性错误,重试 0/3

    [观测] retrying → running (retry)
  [running] 重试中...

    [观测] running → done (complete)
  [done] 任务完成

  --- 测试非法转移 ---
  [守卫] 非法转移被拦截: 无效转移: done + start

  --- 转移历史 ---
    1. planned → running (start)
    2. running → paused (pause_for_approval)
    3. paused → running (approval_granted)
    4. running → retrying (transient_error)
    5. retrying → running (retry)
    6. running → done (complete)

  --- 摘要 ---
    task_id: task_a1b2c3d4
    state: done
    retry_count: 1
    transition_count: 6
    checkpoint_keys: []
    is_terminal: True

  ============================================================
  重启恢复演示
  ============================================================
  实例1: 任务执行完毕,状态 = done
  实例2: 恢复后状态 = done
  ✓ 恢复验证通过
    恢复的历史记录数: 2

  ============================================================
  幂等执行演示
  ============================================================
    → 外部调用 #1 (idempotency_key=idempotent_demo:step_refund)
    第一次: result_1 (外部调用次数: 1)
    第二次: result_1 (外部调用次数: 1)
  ✓ 幂等验证通过
  

生产化改造清单

这个骨架要在生产环境中使用,需要以下改造:

  1. PostgreSQL 替代 SQLite:多实例 Agent 需要支持并发写入。使用连接池(如 psycopg2 pool)、事务管理和主从复制。
  2. 数据库迁移:使用 Alembic 或类似工具管理 schema 版本。任务 checkpoint schema 会演进——需要迁移支持。
  3. 连接池和事务安全:每个 transition() 调用必须是原子的——如果持久化失败,转移必须回滚。当前 SQLite 实现使用简单 commit,生产环境需要 proper 事务管理。
  4. 并发控制:多个进程对同一任务的状态机操作需要乐观锁或分布式锁。最简单的方案:在 task_checkpoints 上使用 updated_at 进行乐观并发控制(compare-and-swap)。
  5. 水平扩展:状态存储成为瓶颈时,按 task_id 分片。每个分片独立的 PostgreSQL 实例。转移历史可以异步写入,checkpoint 数据必须同步写入。
  6. 安全加固:状态机操作需要认证和授权——不是任何人都能改变任务状态。集成你的 IAM 系统。
  7. 重试策略可配置化:将指数退避参数(base、max_wait、jitter)提取为配置。不同任务类型可能需要不同的重试策略。
  8. 恢复策略可配置化:不是所有残留 running 都应该进入 retrying——某些任务应该留在 running 等待心跳检查。恢复策略应根据任务类型配置。

局限性——诚实的说明

这个状态机做了三件事:

  1. 保证Agent 内部的状态转移是确定性和可恢复的。
  2. 防止 Agent 对同一操作重复发起调用(通过 checkpoint 和幂等键)。
  3. 为每个任务提供完整的审计轨迹。

不能做的事:

  1. 让 LLM 的输出变成确定性的——LLM 在 running 状态内仍然可能产生不同的推理结果。
  2. 保证外部系统的 exactly-once 效果——如果外部 API 不支持幂等键,重复调用仍可能产生重复副作用。
  3. 提供分布式事务——状态机内部状态和外部系统的状态不是原子性一致的。
  4. 替代 Temporal/LangGraph 等编排引擎——它们解决的是不同层面的问题(外部工作流编排 vs Agent 内部任务生命周期)。

一句话总结:状态机让你对 Agent 在做什么有确定性答案。它不会让你对 Agent 在想什么有确定性答案。前者是工程问题,后者是 AI 问题。

FAQ

状态机和普通工作流引擎(Temporal、Airflow)有什么区别?

状态机是 Agent 内部的执行逻辑——Agent 自己知道自己处于哪个任务状态、下一步能去哪里。Temporal 和 Airflow 是外部的编排系统——它们告诉 Agent 什么该做、什么时候做。两者互补:Agent 内部的状态机确保它不跳步骤、不重复执行;外部编排引擎确保 Agent 在正确的时机被调用。本文演示的是内置到 Agent 中的状态机模式,你可以根据需要再决定是否用 Temporal 做外层调度。

如果已经用了 LangGraph checkpoint,还需要显式状态机吗?

LangGraph 的 checkpoint 记录的是图遍历状态(当前在哪个节点、边条件如何),不是任务生命周期状态。如果你的任务需要区分"等待审批的暂停"和"依赖故障的阻塞"和"错误后的重试",你需要显式状态机。好消息是:LangGraph 的 checkpoint 是一个很好的持久化后端,你可以把本文的七状态机器建立在 LangGraph 的 checkpoint 存储之上。

状态机如何防止 Agent 重复执行同一个操作?

通过幂等键 + checkpoint 双重保护。在执行任何副作用操作前:(1) 检查 checkpoint 中是否已有该操作的幂等键且状态为 done——如果有,直接返回缓存结果,不执行;(2) 如果没有,先写入幂等键(状态为 executing),再执行操作,成功后改为 done。如果在 executing 状态时崩溃,恢复时看到 executing 就知道那次执行的结果不确定——需要查询外部系统确认,而不是盲目重试。参见本文 H2-5 详细讲解。

Agent 重启后,LLM 对话上下文丢失怎么办?

状态机不恢复完整的对话历史——它只恢复任务状态和关键 checkpoint 数据。Agent 重启后:(1) 从状态机读取"任务进行到哪一步了、之前的结果是什么";(2) 用这些信息构建一个新的上下文摘要(不是完整对话历史);(3) 将摘要注入 LLM 的新上下文窗口,让 LLM 从那个点继续推理。对话历史丢失了,但任务进度没有丢失。关于如何用 checkpoint 重建工作上下文,参见 Agent 上下文窗口管理 的跨窗口状态管理部分。

这个状态机适合哪些 Agent 场景?

适合任何需要可靠性的长流程 Agent:自动退款审批(需要人工审批暂停)、CI/CD 运维 Agent(需要安全门禁)、客服升级流程(需要经理审批)、合规审计 Agent(需要合规官确认)、多步骤数据 ETL Agent(需要重试去重)。不适合:一次性简单问答 Agent——如果 Agent 只做一轮工具调用就返回结果,状态机就是过度设计。一个实用判断标准:如果你的 Agent 任务超过 3 个步骤、涉及人工审批、或者需要在重启后恢复,状态机的复杂度远低于没有状态机时处理异常、去重和恢复的调试成本。

状态机会不会过度设计?什么时候才值得引入?

本文的完整 Python 骨架约 250 行核心代码(不包括注释和演示代码)。这个复杂度对应的是以下能力:任务状态在任何时刻可查询、人工审批不会跳过、重试不会重复执行、重启后可以恢复。如果你的 Agent 是对话式问答——一次调用,一个回答——状态机确实是过度设计。但如果你的 Agent 执行多步骤、有副作用、有人工审批、运行时间超过 LLM 的上下文窗口,那么没有状态机的调试成本(排查"为什么退了两遍"、修复"审批为什么没生效"、恢复"重启后该怎么办")远高于 250 行代码。结论:状态机是长流程 Agent 的最低可靠性层,不是可选的奢侈品。

继续阅读

本文是 Agent Memory and Context Engineering 系列中"确定性任务生命周期"的核心章节。建议按以下路径继续: