Agent 状态机设计:把不可控对话变成可恢复流程
30秒要点
- 核心问题:对话驱动的 Agent 在长流程中会跳过步骤、重复执行副作用、重启后丢失所有进度——因为"对话历史"不是"任务状态"。LLM 是概率性的,不加以约束就会在对话流中漂移。你需要一个确定性的任务生命周期状态机来包装 Agent 的每一次执行。
- 解决方案:七状态显式任务生命周期——planned → running → paused → blocked → retrying → done/failed。每个状态有明确的进入条件、允许的事件、合法的转移目标。无效转移被拒绝,不是尽力而为。
- 关键实现:AgentTaskStateMachine 核心类(~250 行 Python)——TaskState/TaskEvent 枚举、TRANSITION_TABLE 转移表、InvalidTransitionError 守卫、SQLite 持久化、checkpoint/幂等键、恢复策略。含完整生命周期演示和重启恢复验证。
- 读完能做什么:为你的 Agent 实现一个确定性的任务生命周期包装器——任务在任何时刻都知道自己是谁、处于什么状态、被什么事件驱动、下一步能去哪里、重启后如何恢复。告别"Agent 跑着跑着就不见了"的生产噩梦。
1. 为什么生产级 Agent 需要显式状态机
凌晨两点,一个自动退款审批 Agent 在处理一笔退款。流程很简单:校验订单 → 检查退款策略 → 请求主管审批 → 调用支付网关退款 → 发送通知。Agent 在第 3 步发出审批请求后,支付网关超时了。Agent 的对话历史里写着"审批已通过,继续执行",于是它重试——但这次它跳过了审批校验,直接从第 3 步的结果缓存中读到了"通过",执行了第 4 步退款。三秒后,主管拒绝了这笔退款。但钱已经退了。
同一天下午,同一个 Agent 在处理另一笔退款时,服务器因为内存不足被 OOM Killer 杀掉了。Agent 进程重启,对话历史清空——LLM 上下文窗口里什么都没有。Agent 从头开始:校验订单、检查策略、请求审批……但它不知道这笔退款已经退过了。它又退了一次。
这就是对话驱动 Agent 的三个根本性失败模式:
- 上下文漂移导致跳步骤(Context Drift → Step Skip):Agent 的"状态"隐含在对话历史中。当对话历史变长、被压缩、被截断或 LLM 注意力稀释时,Agent 可能跳过关键步骤——"审批已通过"和"审批已请求但尚未回复"在对话文本中可能看起来很像,LLM 分不清。
- 重试导致重复副作用(Retry → Duplicate Side Effects):Agent 遇到网络超时重试,但它不知道上一次调用实际上已经成功了(响应在网络中丢失)。没有状态记录就没有去重依据。
- 重启丢失全部进度(Restart → Total Progress Loss):Agent 进程重启后,对话历史不在持久存储中。Agent 从零开始,不知道已经完成了哪些步骤。
关键洞察:LLM 是概率性的——同样的输入可能产生不同的输出,同样的对话历史可能被理解为不同的"当前状态"。你需要一个确定性的外壳来包裹 LLM 的执行。这个外壳就是状态机——它不依赖 LLM 来判断"我现在该做什么",而是通过显式的状态转移规则来裁定。LLM 在 running 状态内自由推理,但它不能改变任务状态。状态转移发生在 LLM 之外,由结构化事件(工具结果、审批回调、错误信号)驱动,而非由 LLM 生成的文本驱动。
对话驱动 vs 状态机驱动:两种 Agent 架构对比
理解这个区别不需要深入框架源码。看一个简单的对比:
对话驱动 Agent(状态隐含在聊天历史中):
┌─────────────────────────────────────────────┐
│ User: 帮我处理退款 #12345 │
│ Agent: 好的,我先校验订单… │
│ Agent: 订单有效,金额 ¥150.00 │
│ Agent: 需要主管审批,已发送请求 │
│ Agent: [等待中…聊天历史说"已发送请求"] │
│ --- 聊天历史被截断/压缩/LLM 理解偏差 --- │
│ Agent: 审批已通过!执行退款… ← 错了! │
└─────────────────────────────────────────────┘
状态机驱动 Agent(状态显式存储和校验):
┌─────────────────────────────────────────────┐
│ task.state = PLANNED │
│ → START → task.state = RUNNING │
│ → PAUSE_FOR_APPROVAL → task.state = PAUSED│
│ → [等待审批回调…] │
│ → 审批回调到达 → 校验: PAUSED + │
│ APPROVAL_GRANTED → 合法 → task.state = │
│ RUNNING │
│ → COMPLETE → task.state = DONE │
│ │
│ 任何时刻:task.state 精确告诉你任务在哪。 │
│ 上下文丢了?从持久存储恢复 task.state。 │
│ 进程重启?task.state 还在 SQLite 里。 │
└─────────────────────────────────────────────┘
核心区别:对话驱动 Agent 的"状态"是 200 条聊天消息中的某个语义位置;状态机驱动 Agent 的状态是数据库中的一个确定值。前者依赖 LLM 从对话中推断;后者是一个不可变的、可验证的事实。
关于为什么上下文窗口不可靠,参见 Agent 上下文窗口管理——上下文衰减是物理规律,不是 bug。状态机正是在上下文消失后依然能恢复任务进度的机制。
2. 从对话流到任务生命周期:状态、事件与转移
在深入具体状态设计之前,先建立三个核心概念的精确定义。这三个概念构成了状态机的全部语义:
- 状态(State)
- 任务在某一时刻的确定位置。状态回答:"这个任务现在在哪里?"一个任务在任何时刻有且只有一个当前状态。状态是显式的、持久的、可查询的。
- 事件(Event)
- 触发状态变更的外部或内部信号。事件回答:"发生了什么,需要任务做出反应?"事件可以是工具调用的结果、人工审批的回调、超时信号、错误码——但不包括 LLM 的文本输出。事件是结构化的、命名的、可审计的。
- 转移(Transition)
- 从状态 A 到状态 B 的有向边,由事件触发。转移回答:"给定当前状态和发生的事件,任务应该去哪个状态?"转移规则被编码为转移表,每次转移前强制校验。不允许的转移会抛出错误,而不是静默降级。
转移表:状态机的核心数据结构
状态机的全部行为可以压缩为一张转移表——一个从 (当前状态, 事件) 到 目标状态 的映射。这张表既是文档(一眼看清所有合法路径),也是执行规则(每次转移前查表校验)。
TRANSITION_TABLE = {
(PLANNED, START): RUNNING,
(RUNNING, PAUSE_FOR_APPROVAL): PAUSED,
(RUNNING, BLOCK_ON_DEPENDENCY): BLOCKED,
(RUNNING, COMPLETE): DONE,
(RUNNING, FATAL_ERROR): FAILED,
(RUNNING, TRANSIENT_ERROR): RETRYING,
(PAUSED, APPROVAL_GRANTED): RUNNING,
(PAUSED, APPROVAL_DENIED): FAILED,
(PAUSED, TIMEOUT): FAILED,
(BLOCKED, DEPENDENCY_RESOLVED): RUNNING,
(BLOCKED, FATAL_ERROR): FAILED,
(RETRYING, RETRY): RUNNING,
(RETRYING, MAX_RETRIES_EXCEEDED): FAILED,
(RETRYING, FATAL_ERROR): FAILED,
# DONE 和 FAILED 是终态,没有任何出边
}
注意这张表里没有什么:
- 没有
(PAUSED, COMPLETE)——你不能跳过审批直接完成任务。 - 没有
(FAILED, RETRY)——失败是终态,不能直接重试。必须从running进入retrying,而不是从failed重试。 - 没有
(DONE, 任何事件)——终态不接受任何事件。
这些"缺失的转移"不是 bug——它们是设计约束。转移表通过"不列出的即非法"来实现流程控制,比代码中的 if-else 检查更清晰、更不容易被绕过。
状态图:七状态的一目了然
┌─────────┐
│ PLANNED │
└────┬────┘
│ START
▼
┌──────────────┴──────────────────┐
│ RUNNING │◄──────────────────────┐
└──┬──────┬──────┬──────┬────────┘ │
│ │ │ │ │
PAUSE_FOR │ │ │ │ COMPLETE │
_APPROVAL │ │ │ │ │
▼ ▼ ▼ ▼ │
┌────────┐ ┌───────┐┌─────────┐┌──────┐ │
│ PAUSED │ │BLOCKED││RETRYING ││ DONE │ │
└───┬──┬─┘ └───┬───┘└────┬────┘└──────┘ │
│ │ │ │ │
APPROVAL │ │TIMEOUT │DEPENDENCY │ RETRY │
_GRANTED │ │ │_RESOLVED └──────────────────────────────┘
│ │ │
│ ▼ ▼
│ ┌──────────┐
│ │ FAILED │◄── MAX_RETRIES_EXCEEDED, FATAL_ERROR,
│ └──────────┘ APPROVAL_DENIED
│
└──────────────────────────────────────────────────────► RUNNING
这张图展示了全部 14 条合法转移。注意几个关键结构特征:
- RUNNING 是枢纽状态:六条边从它发出,三条边指向它。几乎所有工作都在
running中进行;其他状态是"等待条件"的暂停态。 - PAUSED 和 BLOCKED 是 RUNNING 的两种"暂停":两者都等待外部条件满足后回到
running。区别在于暂停的原因和恢复的条件。 - FAILED 是汇聚终态:五条边指向
failed——审批拒绝、依赖失败、重试耗尽、致命错误、阻塞超时。所有不可恢复的路径都汇聚于此。 - RETRYING → RUNNING 形成一个循环:只要还有重试次数,错误就在这个回路内消化,不泄露到终态。
无效转移是错误,不是降级行为
状态机最重要的设计决策之一:任何不在转移表中的转移尝试,必须抛出异常,不得静默忽略或降级处理。
def transition(self, event: TaskEvent, metadata=None) -> TaskState:
key = (self._state, event)
next_state = TRANSITION_TABLE.get(key)
if next_state is None:
raise InvalidTransitionError(
f"无效转移: {self._state.value} + {event.value}"
)
# ... 执行转移 ...
为什么必须是异常而不是 warn 日志?因为调用 transition() 的代码以为状态已经变了——如果静默忽略,调用方会继续基于错误的状态假设执行后续逻辑。异常迫使调用方处理这个情况:要么修 bug(为什么会出现非法事件?),要么处理边界(比如并发竞争导致的事件乱序)。
关于为什么结构化事件的类型和版本管理很重要,参见 Agent 消息 Schema 设计——状态转移事件和 Agent 消息共享相同的类型安全、版本演进和校验原则。
3. 核心状态设计:planned、running、paused、blocked、retrying、done、failed
七个状态覆盖了 Agent 任务从创建到终结的完整生命周期。每个状态有明确的语义、进入条件、允许的操作和检测方式。
planned(已规划)
含义:任务已创建,前置条件尚待验证。这是任务的初始状态。在这个状态下,Agent 不执行任何副作用——它只是在准备。可以校验参数、分配资源、检查依赖服务是否可达。
允许的事件:仅 START。任何其他事件(包括 COMPLETE、FATAL_ERROR)在 planned 状态下都是非法的。
为什么需要这个状态:区分"任务创建了"和"任务开始执行了"。在分布式系统中,任务可能被创建但从未被调度(调度器挂了、队列积压了)。planned 让你能监控"有多少任务在排队但还没开始"——这是容量规划的关键指标。
running(执行中)
含义:Agent 正在主动执行任务步骤。这是 LLM 唯一可以自由推理的状态。在这个状态内,Agent 调用工具、分析结果、生成中间输出——但它不能改变自己的任务状态。状态转移由结构化事件触发(工具返回特定结果、外部审批回调到达、错误信号产生),不是由 LLM 文本输出触发。
允许的事件:PAUSE_FOR_APPROVAL、BLOCK_ON_DEPENDENCY、COMPLETE、FATAL_ERROR、TRANSIENT_ERROR。从 running 出发有五条边,是所有状态中最多的。
关键约束:LLM 在 running 内可以产生任何输出——但它不能说"任务完成了"。只有 COMPLETE 事件(由工具调用或评估逻辑产生)才能触发到 done 的转移。这是状态机"确定性外壳"的核心体现。
paused(已暂停)
含义:Agent 正在等待外部输入——通常是人工审批、人工反馈或安全审查。这是自愿的等待。Agent 可以继续运行(监控超时、发送提醒),但不能推进任务本身。
允许的事件:APPROVAL_GRANTED(回到 running)、APPROVAL_DENIED(进入 failed)、TIMEOUT(进入 failed)。
与 blocked 的关键区别:paused 是主动等待——"我需要人的决定"。blocked 是被动受阻——"我需要的服务不可用"。前者是流程设计的正常环节;后者是异常情况。
blocked(已阻塞)
含义:Agent 无法继续,因为遇到了不可解决的依赖问题——依赖的 API 返回 503、需要的文件不存在、权限不足无法访问资源。这是非自愿的阻滞。
允许的事件:DEPENDENCY_RESOLVED(回到 running)、FATAL_ERROR(进入 failed)。
与 paused 的关键区别:blocked 不会超时直接进入 failed——依赖恢复的时间不可预测。但可以通过外部监控告警:"这个任务 blocked 了 4 小时了,需要人工介入"。paused 则有明确的超时策略——审批不能无限等待。
retrying(重试中)
含义:Agent 遇到了暂时性错误,正在重新尝试。这是 running 和 failed 之间的缓冲状态。
允许的事件:RETRY(回到 running)、MAX_RETRIES_EXCEEDED(进入 failed)、FATAL_ERROR(进入 failed)。
与 running 的关键区别:retrying 意味着上一次尝试失败了。这个区别很重要:(1) 重试需要幂等性保证——不能重复执行已成功的步骤;(2) 重试有次数上限——无限重试等于死循环;(3) 监控需要区分"正常运行"和"正在重试"——这是完全不同的运维语义。
done(已完成)—— 终态
含义:任务成功完成。所有步骤已执行,所有副作用已提交,所有通知已发送。
允许的事件:无。任何事件在 done 状态下都是非法的。终态不可逆转。
failed(已失败)—— 终态
含义:任务无法完成。可能是审批被拒绝、重试耗尽、致命错误、依赖永久失败或暂停超时。
允许的事件:无。和 done 一样是不可逆终态。
但注意:failed 不等于"数据丢失"或"无法追溯"。失败任务的状态历史、checkpoint 数据、错误信息全部保留在持久存储中。操作者可以从失败任务中提取已完成步骤的结果,手动处理剩余部分,甚至创建一个新的任务实例从 checkpoint 继续——但新实例是一个新的 planned 任务,不是旧实例的状态转移。
状态计数:一个任务的生命周期统计
理解这七个状态的另一种方式是看一个典型任务的"状态停留":
状态 停留时间 说明
─────────────────────────────────────────────
planned 几毫秒~几秒 验证前置条件
running 几秒~几分钟 Agent 执行任务步骤
paused 几分钟~几小时 等待人类审批
blocked 几分钟~几小时 等待依赖恢复
retrying 几秒~几分钟 退避重试窗口
done 永久 终态
failed 永久 终态
注意 running 的实际时间占比可能远小于你预期——在生产环境中,Agent 大量时间花在等待(paused/blocked)而非计算(running)。这意味着状态机的主要价值不是控制 running 内的行为,而是管理等待期间的语义和恢复。
4. 暂停与继续:把人工审批、人类反馈和安全门禁放进状态机
大多数 Agent 框架把人工审批当作一个特殊的工具调用或对话消息。"Agent 发了一条消息说'请审批',然后继续执行。"这是危险的——Agent 可能在人类响应之前就基于对话上下文中的"审批通过"跳过等待。
状态机把人工审批提升为一等状态:paused。Agent 进入 paused 后,除非收到明确的 APPROVAL_GRANTED 或 APPROVAL_DENIED 事件,否则物理上无法进入 running。LLM 无法通过生成"看起来像审批通过"的文本来绕过——因为状态转移不经过 LLM。
审批流程的三条路径
路径 A: 审批通过(正常流程)
RUNNING → PAUSE_FOR_APPROVAL → PAUSED → APPROVAL_GRANTED → RUNNING
路径 B: 审批拒绝(终止流程)
RUNNING → PAUSE_FOR_APPROVAL → PAUSED → APPROVAL_DENIED → FAILED
路径 C: 审批超时(保护性终止)
RUNNING → PAUSE_FOR_APPROVAL → PAUSED → TIMEOUT → FAILED
每条路径都不可绕过。Agent 不能从 paused 直接跳到 done;不能在 paused 状态中"继续工作"(在 paused 中你是无法执行任务步骤的——你只能等待事件)。
超时处理:审批不能无限等待
paused 状态有明确的超时策略。典型配置:
审批超时策略:
初次等待: 30 分钟
超时提醒: 15 分钟时发送提醒(Slack / 邮件 / 企业微信)
超时动作: 标记为 FAILED,原因: "approval_timeout"
宽限期: 超时后 5 分钟内如果审批到达,可人工从 FAILED 中恢复
(创建新任务实例,从 checkpoint 继续)
注意这里不自动重试——审批是人做的决定,不是网络超时。人的决策不会因为等待时间变长而自动变成"同意"。超时后终止是安全的默认行为。
安全门禁:不止是审批
paused 状态不仅用于人工审批。任何需要"在执行副作用之前暂停并验证"的场景都可以用:
- 安全门禁:Agent 即将执行一个高危操作(删除数据库表、修改生产配置、发送批量通知)。在
running→paused→ 安全审查 →running的流程中,安全团队可以在paused状态下审查参数。 - 合规确认:在受监管行业中,某些操作需要合规官的明确批准。Agent 在
paused状态下等待合规确认,审批通过后才能继续。 - 外部系统回调:Agent 触发了一个异步的外部流程(比如提交了一个工单),需要等待外部系统完成后的回调。这不是"人工审批",但语义相同:暂停 → 等待外部事件 → 继续。
与外部审批系统的集成模式
状态机不绑定特定的审批 UI。它只定义事件接口。以下是集成模式:
# Webhook 接收审批回调
@app.route("/approval-callback", methods=["POST"])
def handle_approval():
data = request.json
task_id = data["task_id"]
decision = data["decision"] # "approved" 或 "denied"
sm = load_state_machine(task_id) # 从持久存储恢复
event = APPROVAL_GRANTED if decision == "approved" else APPROVAL_DENIED
sm.transition(event, metadata={
"approver": data["approver"],
"comment": data.get("comment", ""),
"timestamp": data["timestamp"]
})
审批源可以是 Slack 按钮、Jira 状态变更、企业微信审批、自定义 Web 面板——只要它能发送一个带 task_id 和 decision 的 HTTP 请求,就能驱动状态机。
关于审批工作流的 UI/UX 设计模式和更完整的审批集成方案,参见 Agent 人工审批工作流——本文提供底层状态机,那篇文章提供上层交互设计。
重申:paused 不是失败。很多团队看到 Agent 停在 paused 就报警——这是对状态机语义的误解。paused 是正常的、设计的、预期内的等待状态。你的监控面板应该把 paused 和 blocked 分开显示,而不是统一归类为"非 running"。
5. 失败与重试:避免重复执行、跳步骤和无限循环
重试是分布式系统中最容易出错的操作——不是因为重试本身复杂,而是因为重试时的状态假设经常是错的。状态机的重试设计围绕三个保证:
- 去重(No Duplicate Effects):重试不能重复执行已经成功的步骤。
- 不跳步(No Step Skip):重试不能越过未完成的步骤直接跳到后面。
- 有界重试(Bounded Retries):重试有次数上限和退避策略,不会无限循环。
重试状态机:一个完整的重试生命周期
RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=0)
RETRYING → [退避等待 2^0 = 1秒] → RETRY → RUNNING (retry_count=1)
RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=1)
RETRYING → [退避等待 2^1 = 2秒] → RETRY → RUNNING (retry_count=2)
RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=2)
RETRYING → [退避等待 2^2 = 4秒] → RETRY → RUNNING (retry_count=3)
RUNNING → TRANSIENT_ERROR → RETRYING (retry_count=3)
RETRYING → MAX_RETRIES_EXCEEDED → FAILED
注意关键设计:
retry_count在RETRYING状态中递增(在RETRY事件处理时 +1),而不是在TRANSIENT_ERROR时递增。这保证了retry_count只在真正尝试重试时才增加。- 每次
RETRY事件处理前,检查retry_count <= max_retries。超过上限则必须使用MAX_RETRIES_EXCEEDED事件而非RETRY事件——这两个事件不可混用。 - 退避策略在转移表之外实现——状态机负责"能不能重试",调用方负责"什么时候重试"。
重试退避策略
import time
def retry_with_backoff(sm: AgentTaskStateMachine, max_retries=3):
"""执行重试循环,带指数退避。"""
while sm.state == TaskState.RETRYING:
backoff = 2 ** sm.retry_count # 指数退避: 2, 4, 8...
print(f"[retry] 退避等待 {backoff}s (第 {sm.retry_count} 次重试)")
time.sleep(backoff)
try:
sm.transition(TaskEvent.RETRY)
return # 回到 RUNNING,调用方继续执行
except InvalidTransitionError:
# 超过最大重试次数,使用 MAX_RETRIES_EXCEEDED
sm.transition(TaskEvent.MAX_RETRIES_EXCEEDED)
raise
幂等性:状态机只能防止重复触发,不能保证外部幂等
这是本文最重要的诚实声明之一:
状态机保证 Agent 不会对同一个步骤发起两次调用。但它不能保证外部系统不会处理两次——如果第一次调用的响应在网络中丢失,而第二次调用恰好被外部系统当作新请求处理。
为了实现更强的幂等性,需要两层的配合:
- Agent 侧(状态机层):在执行副作用前写入幂等键 checkpoint,执行前检查该键是否已存在。
- 外部系统侧:外部 API 支持幂等键(如 Stripe 的
Idempotency-Keyheader),在服务端去重。
# Agent 侧幂等键模式
def execute_with_idempotency(sm, step_name, action):
idempotency_key = f"{sm.task_id}:{step_name}"
# 检查是否已执行过
if sm.get_checkpoint(idempotency_key) == "done":
print(f"[idempotency] 步骤 '{step_name}' 已完成,跳过")
return sm.get_checkpoint(f"{idempotency_key}:result")
# 标记为"执行中"(防止并发重复)
sm.set_checkpoint(idempotency_key, "executing")
try:
result = action(idempotency_key) # 将幂等键传递给外部系统
sm.set_checkpoint(idempotency_key, "done")
sm.set_checkpoint(f"{idempotency_key}:result", result)
return result
except Exception as e:
# 执行失败,不标记为 done,下次重试会重新执行
sm.set_checkpoint(idempotency_key, "failed")
raise
关键模式:先标记 executing,再执行操作,成功后标记 done。如果执行中崩溃,下次恢复时看到 executing 就知道那次执行的结果不确定——需要查询外部系统确认,而不是盲目重试或盲目跳过。
可重试 vs 不可重试的错误
| 错误类型 | 示例 | 重试? | 处理 |
|---|---|---|---|
| 暂时性网络错误 | 连接超时、DNS 解析失败 | ✓ 是 | TRANSIENT_ERROR → RETRYING |
| 服务暂时不可用 | 503 Service Unavailable、429 Rate Limited | ✓ 是 | TRANSIENT_ERROR → RETRYING(注意退避) |
| 响应丢失 | 请求成功但响应在网络中丢失 | ✓ 谨慎 | 用幂等键查询外部系统状态再决定 |
| 参数校验失败 | 422 Unprocessable Entity | ✗ 否 | FATAL_ERROR → FAILED(重试不会改变结果) |
| 权限不足 | 403 Forbidden | ✗ 否 | FATAL_ERROR → FAILED |
| 数据损坏 | 必填字段为空、格式错误 | ✗ 否 | FATAL_ERROR → FAILED |
核心判断标准:如果重试时所有输入不变,结果会改变吗?如果答案不变(同一请求永远返回 422),则不应重试。如果答案可能改变(网络恢复、服务恢复),则可以重试。
6. 状态存储与恢复:让 Agent 在重启、断线和上下文清空后继续工作
这是状态机最核心的价值——任务状态是持久的。Agent 进程可以被杀、服务器可以重启、LLM 上下文可以清空,但任务状态还在。
什么必须持久化
一个完整的状态存储方案需要持久化以下数据:
┌─────────────────────────────────────────────────┐
│ task_checkpoints 表 │
├──────────────┬──────────────────────────────────┤
│ task_id │ 任务唯一标识 │
│ state │ 当前状态 (planned/running/...) │
│ retry_count │ 当前重试次数 │
│ checkpoint │ JSON: {步骤结果, 中间数据, 幂等键} │
│ updated_at │ 最后更新时间 │
└──────────────┴──────────────────────────────────┘
┌─────────────────────────────────────────────────┐
│ state_transitions 表 │
├──────────────┬──────────────────────────────────┤
│ id │ 自增主键 │
│ task_id │ 任务唯一标识 │
│ from_state │ 转移前状态 │
│ to_state │ 转移后状态 │
│ event │ 触发事件 │
│ timestamp │ ISO 8601 时间戳 │
│ metadata │ JSON: {错误信息, 审批人, ...} │
└──────────────┴──────────────────────────────────┘
注意状态转移表是追加写的——每行是一个转移事件,从不更新或删除。这意味着你拥有完整的审计轨迹:在任何时刻,你可以回溯一个任务从 planned 到 done/failed 的每一步。
存储后端选择
| 后端 | 适用场景 | 注意事项 |
|---|---|---|
| SQLite | 单机 Agent、Demo、本地工具 | 简单、零依赖、事务安全。不适合并发写入——多 Agent 实例共享一个 SQLite 文件会导致锁竞争。生产环境使用 WAL 模式。 |
| PostgreSQL | 生产 Agent、多实例部署 | 支持并发、连接池、主从复制。任务状态存储是关键基础设施——如果状态库挂了,所有 Agent 都无法转移状态(fail closed,安全)。 |
| Redis + AOF | 低延迟场景 | 内存快,但持久化弱于 PostgreSQL。如果使用 Redis,必须开启 AOF 持久化(appendonly yes),并接受重启时可能丢失最后几秒的转移记录。 |
| 文件 JSON | 原型、Demo、单文件工具 | 最简单但不适合生产——无事务、无并发控制、无恢复保证。 |
恢复策略:处理四种残留状态
Agent 重启后,从持久存储加载任务状态,然后根据当前状态执行恢复策略:
def recover(self):
saved = self.persistence.load_checkpoint(self.task_id)
if saved:
self._state = TaskState(saved["state"])
self._retry_count = saved.get("retry_count", 0)
self._checkpoint = saved.get("checkpoint_data", {})
self._history = self.persistence.load_history(self.task_id)
# 处理残留状态
if self._state == TaskState.RUNNING:
# 进程在 RUNNING 时崩溃了 → 标记为需要重试
self.transition(TaskEvent.TRANSIENT_ERROR,
metadata={"reason": "recovery_stale_running"})
elif self._state == TaskState.RETRYING:
# 进程在 RETRYING 时崩溃了 → 检查重试次数后重试
if self._retry_count < self._max_retries:
self.transition(TaskEvent.RETRY)
else:
self.transition(TaskEvent.MAX_RETRIES_EXCEEDED)
elif self._state == TaskState.PAUSED:
# 检查审批是否超时
paused_at = self._get_last_transition_time()
if time_since(paused_at) > APPROVAL_TIMEOUT:
self.transition(TaskEvent.TIMEOUT,
metadata={"reason": "recovery_approval_timeout"})
elif self._state == TaskState.BLOCKED:
# 检查依赖是否恢复(重新探测依赖服务)
if self._check_dependency():
self.transition(TaskEvent.DEPENDENCY_RESOLVED)
# 依赖仍然不可用 → 保持 BLOCKED,等待外部监控告警
return self
恢复策略的默认行为应偏向安全:
- 残留
running→ 默认进入重试。这是最安全的默认——因为 Agent 在running中崩溃,你不知道它执行了多少。重试时幂等键会防止重复执行。 - 残留
paused→ 检查超时。如果在超时窗口内,留在paused继续等待。超时则进入failed。 - 残留
retrying→ 检查重试次数。未耗尽则继续重试,已耗尽则进入failed。 - 不做的事:不自动从
failed或done恢复。终态就是终态。
任务状态恢复 vs LLM 上下文恢复:两件不同的事
关键区分:状态机恢复的是任务级别的状态——任务进行到哪一步了、检查点数据是什么、哪些步骤已完成。它不恢复 LLM 的对话历史。Agent 重启后,LLM 面对一个空的上下文窗口。Agent 从状态机读取任务状态和 checkpoint 数据,构建一个新的上下文摘要("你正在执行任务 X,已完成步骤 1-3,步骤 3 的结果是 Y,现在需要执行步骤 4"),然后让 LLM 从那个点继续推理。对话历史丢失了,但任务进度没有丢失。
关于上下文窗口的恢复策略,参见 Agent 上下文窗口管理——跨窗口状态管理部分详细讨论了上下文重置后如何用 checkpoint 重建 LLM 的工作上下文。关于记忆系统的持久化架构(L0-L3 各层如何存储),参见 Agent 记忆系统设计——任务 checkpoint 和语义记忆是不同的存储层次。
7. 可观测状态机:日志、指标、审计轨迹与告警
状态机不仅是一个执行模型——它本身就是一个可观测性原语。每一次状态转移都是一条不可变的事件记录,天然适合构建审计轨迹、指标面板和告警规则。
转移事件:观测的基本单位
每次状态转移产生一条结构化事件:
{
"task_id": "task_a1b2c3d4",
"from_state": "running",
"to_state": "paused",
"event": "pause_for_approval",
"timestamp": "2026-06-05T14:22:31.123456+00:00",
"metadata": {
"step": "refund_approval",
"amount": 150.00,
"requested_by": "agent_refund_processor"
}
}
这条事件记录了什么、为什么、何时发生。所有转移事件按时间排序就是任务的完整审计轨迹——不需要额外的"审计日志系统",状态转移事件就是审计日志。
可观测性钩子:在每次转移时触发
# on_transition 钩子:每次状态转移后调用
def on_transition(transition: StateTransition):
# 1. 结构化日志
logger.info(json.dumps({
"event": "state_transition",
"task_id": transition.task_id,
"from": transition.from_state.value,
"to": transition.to_state.value,
"trigger": transition.event.value,
"timestamp": transition.timestamp,
"metadata": transition.metadata
}))
# 2. 指标埋点
metrics.increment(f"transition.{transition.to_state.value}")
metrics.gauge(f"task.{transition.task_id}.state",
transition.to_state.value)
# 3. 告警检查
if transition.to_state == TaskState.RETRYING:
retry_count = transition.metadata.get("retry_count", 0)
if retry_count >= 3:
alerts.send("high_retry_count",
f"Task {transition.task_id} retried {retry_count} times")
关于结构化事件 schema 的类型和版本管理,参见 Agent 消息 Schema 设计——状态转移事件应该像 Agent 消息一样有 typed、versioned、validated 的 schema。
核心指标
以下是应该从状态转移流中计算的关键指标:
| 指标 | 计算方式 | 用途 |
|---|---|---|
| time_in_state | 当前时间 - 进入当前状态的时间戳 | SLA 监控:任务在 running 中多长时间了?在 paused 中等了多久? |
| transition_counts | 按事件类型聚合计数 | 异常检测:重试次数突增?审批拒绝率异常? |
| state_distribution | 各状态下的任务数量 | 容量规划:多少任务在排队(planned)?多少在等待(paused/blocked)? |
| retry_rate | retrying 转移数 / 总转移数 | 健康评估:重试率升高说明依赖服务不稳定 |
| mean_time_to_recovery | 从 paused/blocked/retrying 回到 running 的平均时间 | 恢复效率:审批响应时间、依赖恢复时间、重试退避窗口 |
| invalid_transition_attempts | InvalidTransitionError 抛出次数 | Bug 检测:事件乱序、并发竞争、调用方逻辑错误 |
告警规则
基于状态机指标的告警应该检测以下条件:
告警规则:
1. task_in_running_too_long:
条件: time_in_state(RUNNING) > 30min
动作: 通知 oncall,附 task_id 和最后 checkpoint
含义: Agent 可能卡住了(死循环、等待无响应的工具调用)
2. task_retry_flapping:
条件: retry_count >= 3 且持续在 RETRYING ↔ RUNNING 间摆动
动作: 告警升级,人工介入
含义: 重试退避太短,或依赖问题不是暂时的
3. task_paused_abandoned:
条件: time_in_state(PAUSED) > 4h
动作: 通知审批人 + oncall
含义: 审批可能被遗忘,需要提醒或升级
4. task_blocked_prolonged:
条件: time_in_state(BLOCKED) > 2h
动作: 通知 oncall
含义: 依赖服务长时间不可用,需要人工介入
5. too_many_failed_tasks:
条件: failed_count / total_tasks > 0.3(滑动窗口)
动作: 紧急告警
含义: 系统性问题——可能是依赖故障或配置错误
6. invalid_transition_spike:
条件: invalid_transition_attempts > 10/min
动作: 通知 oncall
含义: 调用方代码 bug 或并发竞争导致的事件乱序
关于更完整的 Agent 可观测性体系(指标收集、面板构建、告警管道),参见 Agent 可观察性——本文侧重状态转移产生的观测信号,那篇文章覆盖整个 Agent 的监控基础设施。关于审计日志的不可变性、保留策略和合规要求,参见 Agent 审计日志设计——状态转移历史就是一种审计日志的实现形式。
日志级别指南
日志级别:
INFO: 每次成功的状态转移
示例: "Task abc123: RUNNING → PAUSED (pause_for_approval)"
WARN: 非法转移尝试(被状态机拦截)
示例: "Task abc123: 拒绝转移 PAUSED + COMPLETE"
重要——这是 bug 或攻击的信号
ERROR: 状态持久化失败
示例: "Task abc123: 无法写入转移记录到数据库"
此时转移应失败(fail closed),任务停留在当前状态
DEBUG: 状态机内部细节(转移表查找、checkpoint 读写)
仅开发环境启用
8. 完整示例:一个可恢复 AgentTaskStateMachine 的 Python 骨架
以下是一个可运行的 Python 参考实现,展示了本文讨论的全部概念——状态枚举、转移表、无效转移守卫、SQLite 持久化、checkpoint、恢复策略和生命周期演示。这不是一个生产级库(缺少连接池、迁移、水平扩展等),而是一个生产模式骨架,你可以据此适配自己的 Agent。
完整代码
"""
AgentTaskStateMachine — 可恢复的 Agent 任务状态机参考实现
==========================================================
这是一个生产模式骨架,不是可直接部署的库。
缺少:连接池、数据库迁移、水平扩展、安全加固。
提供了:完整的七状态生命周期、转移校验、持久化、恢复。
依赖:Python 3.9+(仅标准库 + sqlite3)
"""
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Dict, List, Any
import json
import sqlite3
import time
import uuid
# ── 状态枚举 ──────────────────────────────────────────
class TaskState(str, Enum):
"""Agent 任务的显式生命周期状态。"""
PLANNED = "planned" # 任务已创建,前置条件尚未验证
RUNNING = "running" # Agent 正在执行任务步骤
PAUSED = "paused" # 等待人工审批或外部输入
BLOCKED = "blocked" # 无法继续——依赖不可用
RETRYING = "retrying" # 重试中(上一次尝试失败)
DONE = "done" # 终态:成功
FAILED = "failed" # 终态:不可恢复的失败
# ── 事件枚举 ──────────────────────────────────────────
class TaskEvent(str, Enum):
"""触发状态转移的事件。"""
START = "start"
PAUSE_FOR_APPROVAL = "pause_for_approval"
APPROVAL_GRANTED = "approval_granted"
APPROVAL_DENIED = "approval_denied"
BLOCK_ON_DEPENDENCY = "block_on_dependency"
DEPENDENCY_RESOLVED = "dependency_resolved"
TRANSIENT_ERROR = "transient_error"
RETRY = "retry"
MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
COMPLETE = "complete"
FATAL_ERROR = "fatal_error"
TIMEOUT = "timeout"
# ── 转移表 ────────────────────────────────────────────
# 格式: (当前状态, 事件) → 目标状态
# 不在表中的转移 = 非法 → 抛出 InvalidTransitionError
TRANSITION_TABLE: Dict[tuple, TaskState] = {
(TaskState.PLANNED, TaskEvent.START): TaskState.RUNNING,
(TaskState.RUNNING, TaskEvent.PAUSE_FOR_APPROVAL): TaskState.PAUSED,
(TaskState.RUNNING, TaskEvent.BLOCK_ON_DEPENDENCY): TaskState.BLOCKED,
(TaskState.RUNNING, TaskEvent.COMPLETE): TaskState.DONE,
(TaskState.RUNNING, TaskEvent.FATAL_ERROR): TaskState.FAILED,
(TaskState.RUNNING, TaskEvent.TRANSIENT_ERROR): TaskState.RETRYING,
(TaskState.PAUSED, TaskEvent.APPROVAL_GRANTED): TaskState.RUNNING,
(TaskState.PAUSED, TaskEvent.APPROVAL_DENIED): TaskState.FAILED,
(TaskState.PAUSED, TaskEvent.TIMEOUT): TaskState.FAILED,
(TaskState.BLOCKED, TaskEvent.DEPENDENCY_RESOLVED): TaskState.RUNNING,
(TaskState.BLOCKED, TaskEvent.FATAL_ERROR): TaskState.FAILED,
(TaskState.RETRYING, TaskEvent.RETRY): TaskState.RUNNING,
(TaskState.RETRYING, TaskEvent.MAX_RETRIES_EXCEEDED): TaskState.FAILED,
(TaskState.RETRYING, TaskEvent.FATAL_ERROR): TaskState.FAILED,
# DONE 和 FAILED 是终态:没有任何出边
}
# ── 数据类 ────────────────────────────────────────────
@dataclass
class StateTransition:
"""一次状态转移的不可变记录。"""
task_id: str
from_state: TaskState
to_state: TaskState
event: TaskEvent
timestamp: str
metadata: Dict[str, Any] = field(default_factory=dict)
class InvalidTransitionError(Exception):
"""当尝试的转移不在转移表中时抛出。"""
pass
# ── 状态持久化(SQLite) ──────────────────────────────
class StatePersistence:
"""基于 SQLite 的状态持久化。
生产环境注意事项:
- 多实例部署应使用 PostgreSQL
- SQLite 仅适用于单机 demo 或单进程 Agent
- 使用 WAL 模式减少锁竞争
"""
def __init__(self, db_path: str = "agent_state.db"):
self.conn = sqlite3.connect(db_path)
self.conn.execute("PRAGMA journal_mode=WAL")
self._init_schema()
def _init_schema(self):
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS task_checkpoints (
task_id TEXT PRIMARY KEY,
state TEXT NOT NULL,
retry_count INTEGER DEFAULT 0,
checkpoint_data TEXT DEFAULT '{}',
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS state_transitions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
from_state TEXT NOT NULL,
to_state TEXT NOT NULL,
event TEXT NOT NULL,
timestamp TEXT NOT NULL,
metadata TEXT DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_transitions_task
ON state_transitions(task_id, timestamp);
""")
self.conn.commit()
def save_transition(self, t: StateTransition) -> None:
self.conn.execute(
"INSERT INTO state_transitions "
"(task_id, from_state, to_state, event, timestamp, metadata) "
"VALUES (?, ?, ?, ?, ?, ?)",
(t.task_id, t.from_state.value, t.to_state.value,
t.event.value, t.timestamp, json.dumps(t.metadata))
)
self.conn.commit()
def save_checkpoint(self, task_id: str, state: TaskState,
retry_count: int, checkpoint_data: dict = None) -> None:
self.conn.execute(
"""INSERT OR REPLACE INTO task_checkpoints
(task_id, state, retry_count, checkpoint_data, updated_at)
VALUES (?, ?, ?, ?, ?)""",
(task_id, state.value, retry_count,
json.dumps(checkpoint_data or {}),
datetime.now(timezone.utc).isoformat())
)
self.conn.commit()
def load_checkpoint(self, task_id: str) -> Optional[dict]:
row = self.conn.execute(
"SELECT state, retry_count, checkpoint_data "
"FROM task_checkpoints WHERE task_id = ?",
(task_id,)
).fetchone()
if row:
return {
"state": row[0],
"retry_count": row[1],
"checkpoint_data": json.loads(row[2])
}
return None
def load_history(self, task_id: str) -> List[StateTransition]:
rows = self.conn.execute(
"SELECT task_id, from_state, to_state, event, timestamp, metadata "
"FROM state_transitions WHERE task_id = ? ORDER BY id",
(task_id,)
).fetchall()
return [
StateTransition(
task_id=r[0],
from_state=TaskState(r[1]),
to_state=TaskState(r[2]),
event=TaskEvent(r[3]),
timestamp=r[4],
metadata=json.loads(r[5])
) for r in rows
]
# ── 状态机核心类 ──────────────────────────────────────
class AgentTaskStateMachine:
"""
可恢复的 Agent 任务状态机。
保证:
- 每次状态转移经过转移表校验
- 每次转移被持久化(可审计、可恢复)
- 重启后可恢复任务状态
"""
def __init__(self, task_id: str, persistence: StatePersistence,
max_retries: int = 3):
self.task_id = task_id
self.persistence = persistence
self._state: TaskState = TaskState.PLANNED
self._history: List[StateTransition] = []
self._retry_count: int = 0
self._max_retries: int = max_retries
self._checkpoint: Dict[str, Any] = {}
self._on_transition_callbacks: List[callable] = []
# ── 属性 ──────────────────────────────────────────
@property
def state(self) -> TaskState:
return self._state
@property
def retry_count(self) -> int:
return self._retry_count
@property
def history(self) -> List[StateTransition]:
return list(self._history)
# ── 可观测性钩子 ──────────────────────────────────
def on_transition(self, callback: callable) -> None:
"""注册转移回调(日志、指标、告警)。"""
self._on_transition_callbacks.append(callback)
def _notify_observers(self, transition: StateTransition) -> None:
for cb in self._on_transition_callbacks:
try:
cb(transition)
except Exception:
pass # 观测失败不应影响转移本身
# ── 核心转移方法 ──────────────────────────────────
def transition(self, event: TaskEvent,
metadata: Dict[str, Any] = None) -> TaskState:
"""
尝试状态转移。不在转移表中的转移抛出 InvalidTransitionError。
"""
key = (self._state, event)
next_state = TRANSITION_TABLE.get(key)
if next_state is None:
raise InvalidTransitionError(
f"无效转移: {self._state.value} + {event.value}"
)
# 重试计数检查
if event == TaskEvent.RETRY:
self._retry_count += 1
if self._retry_count > self._max_retries:
raise InvalidTransitionError(
f"超过最大重试次数 ({self._max_retries})——"
f"请使用 MAX_RETRIES_EXCEEDED 事件"
)
# 构建转移记录
transition_record = StateTransition(
task_id=self.task_id,
from_state=self._state,
to_state=next_state,
event=event,
timestamp=datetime.now(timezone.utc).isoformat(),
metadata=metadata or {},
)
# 执行转移
self._history.append(transition_record)
self._state = next_state
# 持久化
self.persistence.save_transition(transition_record)
self.persistence.save_checkpoint(
self.task_id, self._state, self._retry_count,
self._checkpoint
)
# 通知观测者
self._notify_observers(transition_record)
return self._state
# ── Checkpoint 管理 ───────────────────────────────
def set_checkpoint(self, key: str, value: Any) -> None:
"""存储中间数据供恢复使用。"""
self._checkpoint[key] = value
def get_checkpoint(self, key: str) -> Optional[Any]:
"""读取 checkpoint 数据。"""
return self._checkpoint.get(key)
def has_checkpoint(self, key: str) -> bool:
"""检查 checkpoint 是否存在。"""
return key in self._checkpoint
# ── 幂等执行辅助 ──────────────────────────────────
def execute_idempotent(self, step_name: str,
action: callable) -> Any:
"""
幂等执行一个步骤。
action 应接收 idempotency_key 作为参数,并传递给外部系统。
如果步骤已完成(checkpoint 中存在),直接返回缓存结果。
"""
idempotency_key = f"{self.task_id}:{step_name}"
# 已完成?直接返回缓存结果
if self.get_checkpoint(idempotency_key) == "done":
return self.get_checkpoint(f"{idempotency_key}:result")
# 标记为执行中
self.set_checkpoint(idempotency_key, "executing")
try:
result = action(idempotency_key)
self.set_checkpoint(idempotency_key, "done")
self.set_checkpoint(f"{idempotency_key}:result", result)
return result
except Exception:
raise
# ── 恢复 ──────────────────────────────────────────
def recover(self) -> "AgentTaskStateMachine":
"""从持久存储恢复任务状态。"""
saved = self.persistence.load_checkpoint(self.task_id)
if saved:
self._state = TaskState(saved["state"])
self._retry_count = saved.get("retry_count", 0)
self._checkpoint = saved.get("checkpoint_data", {})
self._history = self.persistence.load_history(self.task_id)
# 处理残留状态
if self._state == TaskState.RUNNING:
self.transition(TaskEvent.TRANSIENT_ERROR,
metadata={"reason": "recovery_stale_running"})
elif self._state == TaskState.RETRYING:
if self._retry_count < self._max_retries:
self.transition(TaskEvent.RETRY)
else:
self.transition(TaskEvent.MAX_RETRIES_EXCEEDED)
elif self._state == TaskState.PAUSED:
last_t = self._history[-1] if self._history else None
if last_t and last_t.metadata.get("timeout_after"):
elapsed = (datetime.now(timezone.utc) -
datetime.fromisoformat(last_t.timestamp))
if elapsed.total_seconds() > last_t.metadata["timeout_after"]:
self.transition(TaskEvent.TIMEOUT,
metadata={"reason": "recovery_approval_timeout"})
elif self._state == TaskState.BLOCKED:
pass # 保持 blocked,依赖外部告警
return self
# ── 诊断方法 ──────────────────────────────────────
def summary(self) -> dict:
"""返回任务当前状态的摘要。"""
return {
"task_id": self.task_id,
"state": self._state.value,
"retry_count": self._retry_count,
"transition_count": len(self._history),
"checkpoint_keys": list(self._checkpoint.keys()),
"is_terminal": self._state in (TaskState.DONE, TaskState.FAILED),
}
# ── 使用演示 ──────────────────────────────────────────
def demo_full_lifecycle():
"""演示完整的任务生命周期,包括暂停、重试和恢复。"""
print("=" * 60)
print("AgentTaskStateMachine 生命周期演示")
print("=" * 60)
persistence = StatePersistence(":memory:")
task_id = f"task_{uuid.uuid4().hex[:8]}"
sm = AgentTaskStateMachine(task_id=task_id, persistence=persistence)
sm.on_transition(lambda t: print(
f" [观测] {t.from_state.value} → {t.to_state.value} "
f"({t.event.value})"
))
# 1. planned → running
sm.transition(TaskEvent.START)
print(f"[{sm.state.value}] 任务开始\n")
# 2. running → paused(需要人工审批)
sm.transition(TaskEvent.PAUSE_FOR_APPROVAL,
metadata={"step": "refund_approval", "amount": 150.00})
print(f"[{sm.state.value}] 等待人工审批\n")
# 3. paused → running(审批通过)
sm.transition(TaskEvent.APPROVAL_GRANTED,
metadata={"approver": "[email protected]"})
print(f"[{sm.state.value}] 审批通过,继续执行\n")
# 4. running → retrying(暂时性错误)
sm.transition(TaskEvent.TRANSIENT_ERROR,
metadata={"error": "rate_limit", "step": "send_notification"})
print(f"[{sm.state.value}] 暂时性错误,"
f"重试 {sm.retry_count}/{sm._max_retries}\n")
# 5. retrying → running(重试)
sm.transition(TaskEvent.RETRY)
print(f"[{sm.state.value}] 重试中...\n")
# 6. running → done
sm.transition(TaskEvent.COMPLETE,
metadata={"result": "refund_processed"})
print(f"[{sm.state.value}] 任务完成\n")
# 7. 测试非法转移
print("--- 测试非法转移 ---")
try:
sm.transition(TaskEvent.START)
except InvalidTransitionError as e:
print(f"[守卫] 非法转移被拦截: {e}\n")
# 8. 查看转移历史
print("--- 转移历史 ---")
for i, t in enumerate(sm.history, 1):
print(f" {i}. {t.from_state.value} → {t.to_state.value} "
f"({t.event.value}) @ {t.timestamp[:19]}")
print(f"\n--- 摘要 ---")
for k, v in sm.summary().items():
print(f" {k}: {v}")
def demo_recovery():
"""演示重启恢复。"""
print("\n" + "=" * 60)
print("重启恢复演示")
print("=" * 60)
persistence = StatePersistence(":memory:")
task_id = "recovered_task_001"
sm1 = AgentTaskStateMachine(task_id=task_id, persistence=persistence)
sm1.transition(TaskEvent.START)
sm1.transition(TaskEvent.COMPLETE)
print(f"实例1: 任务执行完毕,状态 = {sm1.state.value}")
sm2 = AgentTaskStateMachine(task_id=task_id, persistence=persistence)
sm2.recover()
print(f"实例2: 恢复后状态 = {sm2.state.value}")
assert sm2.state == TaskState.DONE, "恢复失败!"
print("✓ 恢复验证通过")
print(f" 恢复的历史记录数: {len(sm2.history)}")
def demo_idempotent_execution():
"""演示幂等执行。"""
print("\n" + "=" * 60)
print("幂等执行演示")
print("=" * 60)
persistence = StatePersistence(":memory:")
sm = AgentTaskStateMachine(task_id="idempotent_demo", persistence=persistence)
sm.transition(TaskEvent.START)
call_count = [0]
def external_call(idempotency_key: str) -> str:
call_count[0] += 1
print(f" → 外部调用 #{call_count[0]} "
f"(idempotency_key={idempotency_key})")
return f"result_{call_count[0]}"
r1 = sm.execute_idempotent("step_refund", external_call)
print(f" 第一次: {r1} (外部调用次数: {call_count[0]})")
r2 = sm.execute_idempotent("step_refund", external_call)
print(f" 第二次: {r2} (外部调用次数: {call_count[0]})")
assert call_count[0] == 1, "重复执行了!应该只调用一次"
assert r1 == r2, "两次结果不一致!"
print("✓ 幂等验证通过")
if __name__ == "__main__":
demo_full_lifecycle()
demo_recovery()
demo_idempotent_execution()
运行输出
============================================================
AgentTaskStateMachine 生命周期演示
============================================================
[观测] planned → running (start)
[running] 任务开始
[观测] running → paused (pause_for_approval)
[paused] 等待人工审批
[观测] paused → running (approval_granted)
[running] 审批通过,继续执行
[观测] running → retrying (transient_error)
[retrying] 暂时性错误,重试 0/3
[观测] retrying → running (retry)
[running] 重试中...
[观测] running → done (complete)
[done] 任务完成
--- 测试非法转移 ---
[守卫] 非法转移被拦截: 无效转移: done + start
--- 转移历史 ---
1. planned → running (start)
2. running → paused (pause_for_approval)
3. paused → running (approval_granted)
4. running → retrying (transient_error)
5. retrying → running (retry)
6. running → done (complete)
--- 摘要 ---
task_id: task_a1b2c3d4
state: done
retry_count: 1
transition_count: 6
checkpoint_keys: []
is_terminal: True
============================================================
重启恢复演示
============================================================
实例1: 任务执行完毕,状态 = done
实例2: 恢复后状态 = done
✓ 恢复验证通过
恢复的历史记录数: 2
============================================================
幂等执行演示
============================================================
→ 外部调用 #1 (idempotency_key=idempotent_demo:step_refund)
第一次: result_1 (外部调用次数: 1)
第二次: result_1 (外部调用次数: 1)
✓ 幂等验证通过
生产化改造清单
这个骨架要在生产环境中使用,需要以下改造:
- PostgreSQL 替代 SQLite:多实例 Agent 需要支持并发写入。使用连接池(如
psycopg2pool)、事务管理和主从复制。 - 数据库迁移:使用 Alembic 或类似工具管理 schema 版本。任务 checkpoint schema 会演进——需要迁移支持。
- 连接池和事务安全:每个
transition()调用必须是原子的——如果持久化失败,转移必须回滚。当前 SQLite 实现使用简单 commit,生产环境需要 proper 事务管理。 - 并发控制:多个进程对同一任务的状态机操作需要乐观锁或分布式锁。最简单的方案:在
task_checkpoints上使用updated_at进行乐观并发控制(compare-and-swap)。 - 水平扩展:状态存储成为瓶颈时,按
task_id分片。每个分片独立的 PostgreSQL 实例。转移历史可以异步写入,checkpoint 数据必须同步写入。 - 安全加固:状态机操作需要认证和授权——不是任何人都能改变任务状态。集成你的 IAM 系统。
- 重试策略可配置化:将指数退避参数(base、max_wait、jitter)提取为配置。不同任务类型可能需要不同的重试策略。
- 恢复策略可配置化:不是所有残留
running都应该进入retrying——某些任务应该留在running等待心跳检查。恢复策略应根据任务类型配置。
局限性——诚实的说明
这个状态机做了三件事:
- 保证Agent 内部的状态转移是确定性和可恢复的。
- 防止 Agent 对同一操作重复发起调用(通过 checkpoint 和幂等键)。
- 为每个任务提供完整的审计轨迹。
它不能做的事:
- 让 LLM 的输出变成确定性的——LLM 在
running状态内仍然可能产生不同的推理结果。 - 保证外部系统的 exactly-once 效果——如果外部 API 不支持幂等键,重复调用仍可能产生重复副作用。
- 提供分布式事务——状态机内部状态和外部系统的状态不是原子性一致的。
- 替代 Temporal/LangGraph 等编排引擎——它们解决的是不同层面的问题(外部工作流编排 vs Agent 内部任务生命周期)。
一句话总结:状态机让你对 Agent 在做什么有确定性答案。它不会让你对 Agent 在想什么有确定性答案。前者是工程问题,后者是 AI 问题。
FAQ
状态机和普通工作流引擎(Temporal、Airflow)有什么区别?
状态机是 Agent 内部的执行逻辑——Agent 自己知道自己处于哪个任务状态、下一步能去哪里。Temporal 和 Airflow 是外部的编排系统——它们告诉 Agent 什么该做、什么时候做。两者互补:Agent 内部的状态机确保它不跳步骤、不重复执行;外部编排引擎确保 Agent 在正确的时机被调用。本文演示的是内置到 Agent 中的状态机模式,你可以根据需要再决定是否用 Temporal 做外层调度。
如果已经用了 LangGraph checkpoint,还需要显式状态机吗?
LangGraph 的 checkpoint 记录的是图遍历状态(当前在哪个节点、边条件如何),不是任务生命周期状态。如果你的任务需要区分"等待审批的暂停"和"依赖故障的阻塞"和"错误后的重试",你需要显式状态机。好消息是:LangGraph 的 checkpoint 是一个很好的持久化后端,你可以把本文的七状态机器建立在 LangGraph 的 checkpoint 存储之上。
状态机如何防止 Agent 重复执行同一个操作?
通过幂等键 + checkpoint 双重保护。在执行任何副作用操作前:(1) 检查 checkpoint 中是否已有该操作的幂等键且状态为 done——如果有,直接返回缓存结果,不执行;(2) 如果没有,先写入幂等键(状态为 executing),再执行操作,成功后改为 done。如果在 executing 状态时崩溃,恢复时看到 executing 就知道那次执行的结果不确定——需要查询外部系统确认,而不是盲目重试。参见本文 H2-5 详细讲解。
Agent 重启后,LLM 对话上下文丢失怎么办?
状态机不恢复完整的对话历史——它只恢复任务状态和关键 checkpoint 数据。Agent 重启后:(1) 从状态机读取"任务进行到哪一步了、之前的结果是什么";(2) 用这些信息构建一个新的上下文摘要(不是完整对话历史);(3) 将摘要注入 LLM 的新上下文窗口,让 LLM 从那个点继续推理。对话历史丢失了,但任务进度没有丢失。关于如何用 checkpoint 重建工作上下文,参见 Agent 上下文窗口管理 的跨窗口状态管理部分。
这个状态机适合哪些 Agent 场景?
适合任何需要可靠性的长流程 Agent:自动退款审批(需要人工审批暂停)、CI/CD 运维 Agent(需要安全门禁)、客服升级流程(需要经理审批)、合规审计 Agent(需要合规官确认)、多步骤数据 ETL Agent(需要重试去重)。不适合:一次性简单问答 Agent——如果 Agent 只做一轮工具调用就返回结果,状态机就是过度设计。一个实用判断标准:如果你的 Agent 任务超过 3 个步骤、涉及人工审批、或者需要在重启后恢复,状态机的复杂度远低于没有状态机时处理异常、去重和恢复的调试成本。
状态机会不会过度设计?什么时候才值得引入?
本文的完整 Python 骨架约 250 行核心代码(不包括注释和演示代码)。这个复杂度对应的是以下能力:任务状态在任何时刻可查询、人工审批不会跳过、重试不会重复执行、重启后可以恢复。如果你的 Agent 是对话式问答——一次调用,一个回答——状态机确实是过度设计。但如果你的 Agent 执行多步骤、有副作用、有人工审批、运行时间超过 LLM 的上下文窗口,那么没有状态机的调试成本(排查"为什么退了两遍"、修复"审批为什么没生效"、恢复"重启后该怎么办")远高于 250 行代码。结论:状态机是长流程 Agent 的最低可靠性层,不是可选的奢侈品。
继续阅读
本文是 Agent Memory and Context Engineering 系列中"确定性任务生命周期"的核心章节。建议按以下路径继续:
- Agent 上下文窗口管理 — 状态机解决"任务做到哪了";上下文管理解决"LLM 还记得什么"。两者是 Agent 可靠性的左右腿——状态机在上下文消失后依然知道进度,上下文管理在窗口内维护 LLM 需要的信息。(先读)
- Agent 记忆系统设计 — L0-L3 四层记忆架构。任务状态和 checkpoint 是 L1/L2 层的关键数据——理解每层存什么,才能决定状态机的 checkpoint 属于哪一层。
- Agent 人工审批工作流 — 本文提供底层
paused状态机;那篇文章提供上层审批 UI/UX 设计和交互模式。 - Agent 可观察性 — 本文的状态转移事件是可观测性的核心数据源。将转移流接入 Prometheus/Grafana,构建任务状态面板和告警。
- Agent 审计日志设计 — 转移历史就是审计日志。那篇文章讨论不可变性、保留策略和合规要求。
- Agent 消息 Schema 设计 — 状态转移事件的结构化 schema 应该遵循和 Agent 消息相同的类型安全、版本演进和校验原则。