Agent 人工审批流设计:什么时候暂停、请求确认和继续执行

30秒要点

  • 核心问题:生产环境 Agent 在无刹车机制下会自动执行危险操作(删库、发邮件、改配置),团队需要在工具层之上引入人工审批层。
  • 解决方案:四级风险分级(AUTO/LOW_RISK/HIGH_RISK/CRITICAL)+ 审批状态机 + ApprovalRequest Schema + 超时升级链。框架无关,可与任何 Agent 框架集成。
  • 关键实现:状态机驱动(IDLE→PROPOSING→WAITING→APPROVED/REJECTED/MODIFIED/EXPIRED)、幂等性保护、审批请求上下文富化。
  • 读完能做什么:为你的 Agent 系统设计一套可投产的人工审批流,知道什么时候该让 Agent 暂停、如何设计审批请求、如何处理超时和升级。

1. 为什么全自动是陷阱

2025 年某天凌晨 2 点,一个运维 Agent 执行了一项例行清理任务:删除 30 天前的临时表。Agent 识别了目标表名——tmp_backup_2025_04_01——然后调用 DROP TABLE 执行了删除。一切看起来正常。

但 Agent 没有停下来。它发现这个清理模式很有效,于是继续扫描数据库中所有以 tmp_ 开头的表,逐个删除。其中包括一张叫 tmp_prod_migration 的表——那是 DBA 前一天手工创建、用于线上数据迁移的临时表,数据还没迁移完。

没有刹车。没有“你确定吗?”的对话框。Agent 的设计假设是“无人工干预地自动完成任务”——这个假设在所有操作都是安全的、可逆的情况下成立,但在现实的生产环境中几乎不成立。

关键洞察:正确的问题不是“如何去掉人?”,而是“人在哪里最能增加价值?”全自动不是目标——可控的自动才是。

复合错误:没有断路器时会发生什么

单个 Agent 操作出错已经够糟糕了。但 Agent 系统最危险的特质是复合错误:一个操作的输出成为下一个操作的输入,错误在链条上不断放大。

以上面的删表事故为例:Agent 的第一个 DROP TABLE 成功了,这让它“认为”后续的删除操作也是安全的。没有机制去检查“这个表是不是还在用”。每一步的成功都在增强它对下一步的“信心”,直到灾难发生。如果你熟悉自动驾驶里的“脱控”概念(车辆脱离驾驶员控制),Agent 的复合错误本质上是一回事——系统在没有外部校验的情况下不断自我强化。

这就是断路器(circuit breaker)的价值:在操作链的某些节点上强制插入一个外部判断——也就是人工审批。不是说每一行代码都需要老板签字,而是说在特定风险级别上,系统应该暂停、请求确认、然后继续。

信任的渐变:从紧到松

很多团队在引入人工审批时的第一反应是:“这太慢了,Agent 应该自动完成工作。”他们的直觉有一定道理——如果每个操作都要人等,Agent 的价值就大打折扣了。

正确的做法不是“审批 or 不审批”的二元选择,而是信任渐变

这套逻辑和数据库的查询优化器很像:统计信息越充分,自动决策越可靠;统计信息过时或缺失时,回退到保守策略。

反模式:用 System Prompt 做“软刹车”

一个很常见的做法是在 System Prompt 里写:“删除任何东西之前先问用户。”看起来简单优雅——不用写代码,改一行 Prompt 就行。但这是一个非常脆弱的方案:

这篇文章聚焦的是运行时强制:不是在 LLM 的“大脑”里加约束,而是在工具执行层之上建立一个独立的审批关闸。LLM 可以思考、推理、建议——但最终,高风险的执行必须经过独立的审批通道。这个通道本身不依赖 LLM,而是由确定性的状态机和规则驱动。

关于工具执行层的风险分级基础,参见 Agent 命令执行安全,那里定义了从安全查询到危险写操作的风险类别。本文在此基础上构建人工决策层。

2. 审批决策框架:什么时候该暂停

所有审批逻辑的起点是一个简单的问题:这个操作有多危险?但“危险”不是一个可以模糊感知的概念——需要被量化和结构化。下面是一个四级风险分级模型。

四级风险分级

级别定义审批策略典型操作
AUTO 只读查询、无害信息获取 全自动,无需审批 lscat、查询日志、查看 API 状态
LOW_RISK 非破坏性写入、内部 API 调用 首次出现时审批,同类操作可记忆降级为 AUTO 创建文件、创建 PR、写入日志、调用内部 REST API
HIGH_RISK 外部 API 调用、配置变更、数据修改 必须审批,支持批量审批(同类操作打包) 修改 Nginx 配置、UPDATE 生产数据库、调用第三方 API
CRITICAL 删除、金融交易、用户数据访问、凭证变更 必须审批,且需要双人确认(two-person rule) DROP TABLE、执行转账、修改 IAM 策略、轮换密钥

这个分级不是随意定义的。它的底层逻辑是影响半径 × 不可逆程度

风险评分矩阵

在实际系统中,风险级别不能靠人工给每个操作手动标注——那无法扩展。一个更工程化的做法是风险评分矩阵

┌─────────────────┬──────────┬───────────┬───────────┐
│                 │  DEV     │  STAGING  │  PROD     │
├─────────────────┼──────────┼───────────┼───────────┤
│ READ            │  AUTO    │  AUTO     │  AUTO     │
│ WRITE (new)     │  AUTO    │  LOW      │  LOW      │
│ WRITE (modify)  │  LOW     │  LOW      │  HIGH     │
│ DELETE          │  LOW     │  HIGH     │  CRITICAL │
│ EXTERNAL API    │  LOW     │  HIGH     │  HIGH     │
│ FINANCIAL       │  HIGH    │  CRITICAL │  CRITICAL │
│ CREDENTIALS     │  HIGH    │  CRITICAL │  CRITICAL │
└─────────────────┴──────────┴───────────┴───────────┘

矩阵的三个维度:

  1. 操作类型(action_type):READ / WRITE / DELETE / EXTERNAL_API / FINANCIAL / CREDENTIALS
  2. 目标环境(target_environment):DEV / STAGING / PROD
  3. 影响半径(blast_radius):SINGLE_RESOURCE / SERVICE / ACCOUNT

最终风险级别 = max(矩阵查找值, 影响半径修正)。例如:一个 PROD 环境的 DELETE 操作,矩阵给出 CRITICAL;如果影响半径是 SINGLE_RESOURCE(比如删除一个日志文件),可以降为 HIGH。如果影响半径是 ACCOUNT(比如删除整个 AWS 账号下的资源),CRITICAL 以上——需要双人审批。

决策树

如果你不想实现完整的矩阵,下面这个决策树覆盖了 90% 的场景:


操作类型?
  ├── 只读?(list, get, read, describe, cat, grep)
  │     └── YES → AUTO ✓(无需审批,直接执行)
  │
  ├── 写入新资源?(create, put, new, write)
  │     ├── 目标 = 生产环境?
  │     │     ├── YES → LOW_RISK ○(首次审批,后续自动)
  │     │     └── NO  → AUTO ✓
  │     └── 涉及外部 API?
  │           └── YES → HIGH_RISK ▲(必须审批)
  │
  ├── 修改现有资源?(update, modify, patch, change)
  │     └── 目标 = 生产环境?
  │           ├── YES → HIGH_RISK ▲(必须审批)
  │           └── NO  → LOW_RISK ○
  │
  ├── 删除?(delete, drop, remove, rm)
  │     └── HIGH_RISK ▲(必须审批)
  │     └── 目标 = 生产环境?→ CRITICAL ■(双人审批)
  │
  ├── 涉及资金?(charge, transfer, payment, bill)
  │     └── CRITICAL ■(双人审批,无论环境)
  │
  └── 涉及凭证?(rotate, generate_key, change_password)
        └── CRITICAL ■(双人审批,无论环境)
  

审批门控代码示例

下面是一个 Python 实现,把你刚刚看到的决策树变成一个可调用的门控函数:

from enum import Enum
from dataclasses import dataclass
from typing import Optional

class RiskLevel(Enum):
    AUTO = "auto"           # 全自动
    LOW_RISK = "low_risk"   # 首次审批
    HIGH_RISK = "high_risk" # 必须审批
    CRITICAL = "critical"   # 双人审批

class ActionType(Enum):
    READ = "read"
    WRITE_NEW = "write_new"
    WRITE_MODIFY = "write_modify"
    DELETE = "delete"
    EXTERNAL_API = "external_api"
    FINANCIAL = "financial"
    CREDENTIALS = "credentials"

class Environment(Enum):
    DEV = "dev"
    STAGING = "staging"
    PROD = "prod"

@dataclass
class ActionContext:
    action_type: ActionType
    environment: Environment
    resource: str
    # 影响半径:single / service / account
    blast_radius: str = "single"

class ApprovalGate:
    """审批门控——根据操作类型和环境决定风险级别"""

    # 风险矩阵:rows=操作类型, cols=环境
    RISK_MATRIX = {
        ActionType.READ: {
            Environment.DEV: RiskLevel.AUTO,
            Environment.STAGING: RiskLevel.AUTO,
            Environment.PROD: RiskLevel.AUTO,
        },
        ActionType.WRITE_NEW: {
            Environment.DEV: RiskLevel.AUTO,
            Environment.STAGING: RiskLevel.LOW_RISK,
            Environment.PROD: RiskLevel.LOW_RISK,
        },
        ActionType.WRITE_MODIFY: {
            Environment.DEV: RiskLevel.LOW_RISK,
            Environment.STAGING: RiskLevel.LOW_RISK,
            Environment.PROD: RiskLevel.HIGH_RISK,
        },
        ActionType.DELETE: {
            Environment.DEV: RiskLevel.LOW_RISK,
            Environment.STAGING: RiskLevel.HIGH_RISK,
            Environment.PROD: RiskLevel.CRITICAL,
        },
        ActionType.EXTERNAL_API: {
            Environment.DEV: RiskLevel.LOW_RISK,
            Environment.STAGING: RiskLevel.HIGH_RISK,
            Environment.PROD: RiskLevel.HIGH_RISK,
        },
        ActionType.FINANCIAL: {
            Environment.DEV: RiskLevel.HIGH_RISK,
            Environment.STAGING: RiskLevel.CRITICAL,
            Environment.PROD: RiskLevel.CRITICAL,
        },
        ActionType.CREDENTIALS: {
            Environment.DEV: RiskLevel.HIGH_RISK,
            Environment.STAGING: RiskLevel.CRITICAL,
            Environment.PROD: RiskLevel.CRITICAL,
        },
    }

    @classmethod
    def evaluate_risk(cls, ctx: ActionContext) -> RiskLevel:
        """评估操作风险级别"""
        base_risk = cls.RISK_MATRIX[ctx.action_type][ctx.environment]

        # 影响半径修正
        if ctx.blast_radius == "account" and base_risk != RiskLevel.CRITICAL:
            return RiskLevel.CRITICAL
        elif ctx.blast_radius == "service" and base_risk == RiskLevel.HIGH_RISK:
            return RiskLevel.CRITICAL

        return base_risk

    @classmethod
    def requires_approval(cls, ctx: ActionContext) -> bool:
        """是否需要人工审批"""
        return cls.evaluate_risk(ctx) != RiskLevel.AUTO


# --- 使用示例 ---
ctx = ActionContext(
    action_type=ActionType.DELETE,
    environment=Environment.PROD,
    resource="tmp_backup_2025_04_01",
    blast_radius="single"
)

risk = ApprovalGate.evaluate_risk(ctx)
print(f"风险级别: {risk.value}")           # CRITICAL
print(f"需要审批: {ApprovalGate.requires_approval(ctx)}")  # True

注意这个实现和 Agent 工具权限控制 的关系:工具权限控制负责”Agent 能不能调用这个工具”(调用前检查),而这里的审批门控负责”调用结果要不要执行”(调用后、执行前)。两者是互补的——一个在工具注册时生效,一个在工具执行路径上生效。当工具权限控制标记某个操作为 ASK_APPROVAL 时,本文的审批状态机接管后续流程。

3. 审批状态机设计

有了”什么时候该审批”的决策框架后,下一个问题是”怎么审批”——即审批请求的生命周期如何管理。这里需要的是一个形式化的状态机,而不是一堆 if-else。

状态定义与状态图

审批状态机有以下状态和转换路径:


                    ┌──────────┐
                    │   IDLE   │  ← Agent 正常工作状态
                    └────┬─────┘
                         │ 触发高风险操作
                         ▼
                    ┌───────────┐
                    │ PROPOSING │  ← Agent 构建审批请求
                    └─────┬─────┘
                          │ 提案完成
                          ▼
                   ┌────────────────┐
                   │ WAITING_APPROVAL│ ← 等待人工决策
                   └───┬──┬──┬──┬──┘
                       │  │  │  │
          ┌────────────┘  │  │  └────────────┐
          ▼               ▼  ▼               ▼
    ┌──────────┐  ┌──────────┐ ┌──────────┐ ┌──────────┐
    │ APPROVED │  │ REJECTED │ │ MODIFIED │ │ EXPIRED  │
    └────┬─────┘  └────┬─────┘ └────┬─────┘ └────┬─────┘
         │             │            │            │
         ▼             ▼            ▼            ▼
    ┌──────────┐  ┌──────────┐ ┌──────────┐ ┌──────────┐
    │ RESUMING │  │ ROLLBACK │ │RE-PROPOSE│ │ESCALATE │
    └────┬─────┘  └──────────┘ └──────────┘ └──────────┘
         │
         ▼
    ┌───────────┐
    │ EXECUTING │  ← Agent 恢复执行
    └─────┬─────┘
          │ 执行完成
          ▼
    ┌───────────┐
    │ COMPLETED │  ← 回到 IDLE
    └───────────┘
  

这个状态机的设计遵循一个原则:每个中间状态都有明确的退出路径。没有任何状态能把 Agent 永久卡住。即使是 WAITING_APPROVAL,也有 EXPIRED 作为兜底出口——超时不是系统故障,是预期的控制路径。

状态转换规则

当前状态触发事件目标状态守卫条件
IDLEtool_call_risk ≥ HIGH_RISKPROPOSINGrisk_level != AUTO
PROPOSINGrequest_readyWAITING_APPROVAL请求格式有效
WAITING_APPROVALhuman_approveAPPROVED审批人在授权名单中
WAITING_APPROVALhuman_rejectREJECTED审批人有权拒绝
WAITING_APPROVALhuman_modifyMODIFIED请求允许修改
WAITING_APPROVALtimeoutEXPIRED超过 deadline
APPROVEDresume_agentRESUMING检查点状态有效
MODIFIEDre_proposePROPOSING将修改后的参数重提案
EXPIREDescalateWAITING_APPROVAL升级链中还有下一级审批人
EXPIREDfallback_denyREJECTED升级链耗尽
RESUMINGcheck_idempotentEXECUTING 或 IDLE检查幂等性键
EXECUTINGexecution_doneCOMPLETED操作完成(成功或失败)
COMPLETEDcleanupIDLE释放资源

边界情况:超时之后的审批到了

一个容易被忽略但实际经常发生的情况:审批请求超时了(EXPIRED),系统触发了升级链或自动拒绝。但就在这之后几秒,原始审批人姗姗来迟地点击了”批准”。

处理策略:拒绝迟到的审批。状态机已经离开 WAITING_APPROVAL 状态,迟到的审批是一个无效事件——系统应该记录这个事件并通知审批人”操作已超时处理,您的批准未被采纳”。

同理:如果审批到达时 Agent 的状态已经发生了变化(例如用户手动取消了原始操作),审批也应该是无效的。这就是为什么需要状态机而不是简单的回调函数——状态的线性演进确保了审批在正确的时间窗口内有效。

检查点持久化:暂停和恢复的基石

Agent 在进入 PROPOSING 状态之前必须保存当前状态——这就是检查点(checkpoint)。没有检查点,Agent 在审批通过后无法准确恢复执行。

检查点至少包含:

@dataclass
class AgentCheckpoint:
    “””Agent 暂停点——审批通过后从这里恢复”””
    checkpoint_id: str
    agent_id: str
    pending_action: dict          # 待审批的操作
    conversation_snapshot: list   # 对话历史快照
    tool_call_stack: list         # 工具调用栈
    task_state: dict              # 任务进度
    created_at: float             # Unix 时间戳
    ttl_seconds: int = 600        # 检查点有效期

恢复时,Agent 从检查点加载状态,不等 LLM 重新推理——直接跳转到执行步骤。这避免了”审批通过后 LLM 又纠结了半天要不要执行”的问题。恢复逻辑是确定性的。

幂等性:同样的批准不会执行两次

幂等性是分布式系统的基本要求,审批系统也不例外。场景:操作 A 被批准执行了;由于网络延迟,审批通道又推送了一次”已批准”事件。

幂等性键的设计:

import hashlib

def generate_action_id(agent_id, action, params):
    canonical = “%s:%s:%s” % (agent_id, action, sorted(params.items()))
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]

# 使用
action_id = generate_action_id(
    “agent-01”, “DROP TABLE”,
    {“table”: “tmp_backup_2025_04_01”}
)

# 在执行前检查
if execution_log.contains(action_id):
    print(“操作 %s 已执行过,跳过” % action_id)
    return  # 幂等性保护

执行日志是一个持久化的集合(Redis Set、数据库表都可以),用于记录所有已执行的操作 ID。在执行前检查、执行后写入。这两个操作应该在同一事务中(或者使用 Redis 的 SETNX 实现原子性)。

状态机完整实现

from enum import Enum
from datetime import datetime, timedelta
from typing import Optional
import uuid
import hashlib

class ApprovalState(Enum):
    IDLE = "idle"
    PROPOSING = "proposing"
    WAITING_APPROVAL = "waiting_approval"
    APPROVED = "approved"
    REJECTED = "rejected"
    MODIFIED = "modified"
    EXPIRED = "expired"
    RESUMING = "resuming"
    EXECUTING = "executing"
    COMPLETED = "completed"


def generate_action_id(agent_id: str, action: str, params: dict) -> str:
    canonical = f"{agent_id}:{action}:{sorted(params.items())}"
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]


class ExecutionLog:
    def __init__(self):
        self._executed: set[str] = set()
    def contains(self, action_id: str) -> bool:
        return action_id in self._executed
    def record(self, action_id: str):
        self._executed.add(action_id)

execution_log = ExecutionLog()


class ApprovalStateMachine:

    def __init__(self, request_id: str = None):
        self.request_id = request_id or str(uuid.uuid4())[:8]
        self.state = ApprovalState.IDLE
        self.checkpoint: Optional[dict] = None
        self.action_id: Optional[str] = None
        self.deadline: Optional[datetime] = None
        self.modified_params: Optional[dict] = None
        self.escalation_chain: list[str] = []
        self.current_escalation: int = 0

    def propose(self, action: dict, deadline_seconds: int = 300,
                escalation: list[str] = None):
        assert self.state == ApprovalState.IDLE
        self.checkpoint = {
            "action": action,
            "timestamp": datetime.now().isoformat(),
        }
        self.deadline = datetime.now() + timedelta(seconds=deadline_seconds)
        self.escalation_chain = escalation or []
        self.current_escalation = 0
        self.action_id = generate_action_id(
            "agent-01", action["tool"], action["params"])
        self.state = ApprovalState.PROPOSING
        self.state = ApprovalState.WAITING_APPROVAL
        return self

    def approve(self, approver: str) -> bool:
        if self.state != ApprovalState.WAITING_APPROVAL:
            return False
        if self._is_expired():
            return self._handle_expired()
        self.state = ApprovalState.APPROVED
        return True

    def reject(self, approver: str, reason: str = "") -> bool:
        if self.state != ApprovalState.WAITING_APPROVAL:
            return False
        self.state = ApprovalState.REJECTED
        return True

    def modify(self, approver: str, new_params: dict) -> bool:
        if self.state != ApprovalState.WAITING_APPROVAL:
            return False
        if self._is_expired():
            return self._handle_expired()
        self.modified_params = new_params
        self.state = ApprovalState.MODIFIED
        self.state = ApprovalState.PROPOSING
        self.state = ApprovalState.WAITING_APPROVAL
        self.deadline = datetime.now() + timedelta(seconds=300)
        return True

    def handle_timeout(self) -> bool:
        if self.state != ApprovalState.WAITING_APPROVAL:
            return False
        if not self._is_expired():
            return False
        self.state = ApprovalState.EXPIRED
        if self.current_escalation < len(self.escalation_chain):
            next_approver = self.escalation_chain[self.current_escalation]
            self.current_escalation += 1
            self.deadline = datetime.now() + timedelta(seconds=300)
            self.state = ApprovalState.WAITING_APPROVAL
            return True
        else:
            self.state = ApprovalState.REJECTED
            return False

    def resume(self) -> Optional[dict]:
        if self.state != ApprovalState.APPROVED:
            return None
        if execution_log.contains(self.action_id):
            self.state = ApprovalState.IDLE
            return None
        self.state = ApprovalState.RESUMING
        self.state = ApprovalState.EXECUTING
        return self.checkpoint["action"]

    def complete(self):
        assert self.state == ApprovalState.EXECUTING
        execution_log.record(self.action_id)
        self.state = ApprovalState.COMPLETED
        self.state = ApprovalState.IDLE

    def _is_expired(self) -> bool:
        return datetime.now() > self.deadline

    def _handle_expired(self) -> bool:
        self.state = ApprovalState.EXPIRED
        return self.handle_timeout()


sm = ApprovalStateMachine()
sm.propose(
    action={"tool": "DROP TABLE",
            "params": {"table": "tmp_backup_2025_04_01"}},
    deadline_seconds=300,
    escalation=["[email protected]", "[email protected]"]
)
sm.approve("[email protected]")
action = sm.resume()
if action:
    print(f"执行: {action}")
    sm.complete()

这个实现覆盖了完整的生命周期。生产环境中,状态应该持久化到数据库或 Redis 中(而不是内存),以支持进程重启和跨进程审批。handle_timeout() 应由定时任务驱动,而不是在 approve 时才检测。

4. 审批请求 Schema:让人能做出正确决策

审批流的前半段讲的是“什么时候让 Agent 停”,这一节讲的是“停下来之后怎么办”。审批请求不是一个简单的 yes/no 弹窗——它是人工决策的界面。界面设计得好,审批人能快速做出正确决策;设计得不好,审批人要么盲目批准,要么因信息不全而卡住。

为什么需要结构化 Schema

想象你收到两条审批通知:

通知 A:“是否删除 tmp_backup_2025_04_01?[批准/拒绝]”

通知 B:“Agent-01 请求在生产数据库 db-prod-1 上执行 DROP TABLE tmp_backup_2025_04_01。该表由 DBA 昨日创建(数据迁移中,预计还需 2 小时完成)。操作风险等级:CRITICAL。Agent 推断此表为 30 天前创建,符合清理策略,但实际创建时间为昨天。建议:拒绝并更新清理策略。5 分钟内无响应将升级至 oncall-dba。修改参数:允许。[批准/拒绝/修改参数]”

通知 A 的审批人只能基于自己的知识做判断——如果她不知道这个表是昨天创建的,她会批准。通知 B 的审批人拥有足够的上下文来做出正确决策。两者之间的差异就是结构化 Schema 的力量

ApprovalRequest JSON Schema

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://xslyl.com/schemas/approval-request-v1.json",
  "title": "Agent Approval Request",
  "description": "Agent 请求人工审批时提交的结构化数据",
  "type": "object",
  "required": [
    "action",
    "parameters",
    "risk_level",
    "proposed_by",
    "context_window",
    "deadline",
    "idempotency_key"
  ],
  "properties": {
    "action": {
      "type": "object",
      "description": "Agent 请求执行的操作",
      "required": ["tool_name", "description"],
      "properties": {
        "tool_name": {
          "type": "string",
          "description": "工具名称,如 DROP_TABLE, SEND_EMAIL, CREATE_PR"
        },
        "description": {
          "type": "string",
          "description": "人类可读的描述"
        }
      }
    },
    "parameters": {
      "type": "object",
      "description": "工具调用的参数(key-value)"
    },
    "risk_level": {
      "type": "string",
      "enum": ["AUTO", "LOW_RISK", "HIGH_RISK", "CRITICAL"],
      "description": "风险级别"
    },
    "proposed_by": {
      "type": "object",
      "description": "提案 Agent 信息",
      "required": ["agent_id", "agent_name"],
      "properties": {
        "agent_id": {"type": "string"},
        "agent_name": {"type": "string"},
        "agent_version": {"type": "string"}
      }
    },
    "context_window": {
      "type": "object",
      "description": "帮助审批人理解上下文的信息",
      "required": ["reasoning", "relevant_tool_outputs"],
      "properties": {
        "reasoning": {
          "type": "string",
          "description": "Agent 的推理链"
        },
        "relevant_tool_outputs": {
          "type": "array",
          "description": "相关工具输出摘要",
          "items": {"type": "object"}
        },
        "risk_justification": {
          "type": "string",
          "description": "风险级别判定依据"
        },
        "conversation_snippet": {
          "type": "string",
          "description": "触发此操作的用户对话片段"
        }
      }
    },
    "deadline": {
      "type": "string",
      "format": "date-time",
      "description": "审批截止时间(ISO 8601)"
    },
    "idempotency_key": {
      "type": "string",
      "description": "幂等性键——防止重复执行"
    },
    "fallback_on_timeout": {
      "type": "string",
      "enum": ["auto_deny", "auto_approve", "escalate"],
      "default": "escalate",
      "description": "超时默认行为"
    },
    "fallback_on_reject": {
      "type": "string",
      "enum": ["rollback", "notify", "log_only"],
      "default": "notify",
      "description": "拒绝后 Agent 的后续动作"
    },
    "escalation_chain": {
      "type": "array",
      "items": {"type": "string"},
      "description": "升级链:按顺序的审批人列表"
    },
    "modification_allowed": {
      "type": "boolean",
      "default": true,
      "description": "审批人是否可以修改参数后批准"
    }
  }
}

核心字段解析

context_window(上下文窗口)——这是整个 Schema 中最重要的字段。没有它,审批人无法做出有意义的决策。它包括:

fallback 字段——定义了“没人回应时怎么办”和“被拒绝后怎么办”。这两个都不是可选项;没有明确定义的 fallback,系统会在异常路径上表现出未定义行为。

默认原则:超时回退默认 auto_deny(而非 auto_approve),拒绝回退默认 notify(记录事件并通知 Agent 终止当前任务)。安全默认值比便利默认值更重要——宁可多拒绝一次,不可误执行一次。

Python 实现:ApprovalRequest 数据结构

from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from typing import Any, Optional
import json
import hashlib

def generate_action_id(agent_id, action, params):
    canonical = "%s:%s:%s" % (agent_id, action, sorted(params.items()))
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]


@dataclass
class ApprovalRequest:
    """Approval request -- structured data for human decision"""

    # Required fields
    action: dict
    parameters: dict
    risk_level: str
    proposed_by: dict
    context_window: dict
    idempotency_key: str

    # Optional fields with safe defaults
    deadline: str = field(default_factory=lambda:
        (datetime.now() + timedelta(seconds=300)).isoformat())
    fallback_on_timeout: str = "escalate"
    fallback_on_reject: str = "notify"
    escalation_chain: list = field(default_factory=list)
    modification_allowed: bool = True

    def to_json(self):
        return json.dumps(asdict(self), ensure_ascii=False, indent=2)

    def to_notification(self, platform="slack"):
        risk_icon = "[CRITICAL]" if self.risk_level == "CRITICAL" else "[HIGH]"
        lines = [
            "*Agent Approval Request*",
            "*Action:* %s" % self.action.get("description", "N/A"),
            "*Risk Level:* %s %s" % (risk_icon, self.risk_level),
            "*Proposed By:* %s" % self.proposed_by.get("agent_name", "N/A"),
            "*Deadline:* %s" % self.deadline,
            "*Reasoning:* %s..." % (
                self.context_window.get("reasoning", "N/A")[:200]),
        ]
        if self.escalation_chain:
            chain = " -> ".join(self.escalation_chain)
            lines.append("*Escalation Chain:* %s" % chain)
        return "\n".join(lines)

    @classmethod
    def build(cls, action, parameters, risk_level, agent_info,
              reasoning, tool_outputs, risk_justification="",
              conversation_snippet="", **kwargs):
        context = dict(
            reasoning=reasoning,
            relevant_tool_outputs=tool_outputs,
            risk_justification=risk_justification or (
                "Evaluated by ApprovalGate: %s" % risk_level),
            conversation_snippet=conversation_snippet,
        )
        idemp_key = generate_action_id(
            agent_info.get("agent_id", "unknown"),
            action.get("tool_name", ""),
            parameters
        )
        return cls(
            action=action,
            parameters=parameters,
            risk_level=risk_level,
            proposed_by=agent_info,
            context_window=context,
            idempotency_key=idemp_key,
            **kwargs
        )


# Usage example
req = ApprovalRequest.build(
    action=dict(
        tool_name="DROP_TABLE",
        description="Drop tmp_backup_2025_04_01 in production"
    ),
    parameters=dict(database="db-prod-1", table="tmp_backup_2025_04_01"),
    risk_level="CRITICAL",
    agent_info=dict(
        agent_id="agent-01",
        agent_name="Ops Cleanup Agent",
        agent_version="v2.3.1"
    ),
    reasoning=(
        "1. User requested cleanup of temp tables older than 30 days. "
        "2. Scanned DB, found table matching tmp_ pattern. "
        "3. Inferred from name date that table is 30+ days old. "
        "NOTE: Did NOT check actual creation time via DESCRIBE TABLE."
    ),
    tool_outputs=[
        dict(tool="SCAN_TABLES",
             output=["tmp_backup_2025_04_01", "tmp_prod_migration"]),
        dict(tool="DESCRIBE_TABLE",
             output=dict(created_at="yesterday",
                         description="data migration in progress"))
    ],
    risk_justification=(
        "DELETE x PROD = CRITICAL. "
        "Mitigation: single table only, "
        "but table is actively in use -- maintain CRITICAL."
    ),
    escalation_chain=["[email protected]", "[email protected]"]
)

print(req.to_json())
print(req.to_notification())

上下文富化的实战策略

上面这个示例展示了上下文富化的核心价值:Agent 的推理链中有一个明显的逻辑错误(基于表名推断创建时间,而非实际查询),而且 DESCRIBE TABLE 的输出明确显示该表是昨天创建、正在使用中。审批请求把这些信息全部暴露了出来,审批人一眼就能发现问题。

几个让上下文更有用的策略:

反模式:发送太少上下文

最危险的审批请求就是那个只写了一句“删除这个?[Y/n]”的。心理学上,当信息太少时,人会倾向于默认批准——因为拒绝的成本比批准高(拒绝意味着需要去搞清楚为什么)。这被称为“审批疲劳”。

对抗审批疲劳的唯一方法是提前消化复杂性:Agent 先做推理和分析,把关键信息提取出来呈现给审批人。审批人的工作不是“搞懂发生了什么”,而是“在一个已经整理好的案卷上做判断”。这两者的认知负荷差异是巨大的。

一个好的审批请求应该让审批人在 30 秒内完成决策——如果超过了,说明上下文还不够富化。

5. 超时、升级与降级策略

审批请求发出去了,但审批人在哪里?真实世界不是理想世界——审批人会休假、生病、换团队,或者只是忽略了通知。一个不处理超时的审批系统等于没有审批系统。

审批 TTL:按风险级别配置超时

不是所有操作都能等同样长的时间。CRITICAL 操作(比如删除生产数据库)如果不能立即得到审批,应该快速自动拒绝,而不是让 Agent 无限等待。相反,LOW_RISK 操作可以给审批人更充裕的响应时间:

风险级别默认 TTL超时动作理由
LOW_RISK 24 小时 可配置:auto_deny 或 auto_approve 影响小,可逆,可以给审批人充裕时间;特定场景可自动批准以保持流程速度
HIGH_RISK 4 小时 escalate(升级) 影响中等,需要审批但窗口不必太紧;超时后自动升级而非直接拒绝
CRITICAL 30 分钟 auto_deny(自动拒绝) 不可逆操作,宁可不执行也不能在无审批的情况下执行

TTL 的设计原则是风险越高,窗口越短。这不是为了给审批人施加压力,而是因为高风险操作不应该长时间处于“悬而未决”的状态——Agent 被卡住,任务的上下文在流失,用户的等待成本在累积。

升级链:从审批人到团队负责人到值班工程师

升级链(escalation chain)定义了“主审批人没有响应时,审批请求应该转给谁”:


审批人(0-4h)
  └─ 超时 → 团队负责人(4-8h)
        └─ 超时 → 值班工程师(8-12h)
              └─ 超时 → 自动拒绝 + 通知所有上游
  

升级链不是一个静态的字符串列表——它应该和组织的值班排班(on-call rotation)集成。下面是升级策略的核心实现:

from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional, Callable
from enum import Enum

class TimeoutAction(Enum):
    AUTO_DENY = "auto_deny"
    AUTO_APPROVE = "auto_approve"
    ESCALATE = "escalate"

@dataclass
class EscalationPolicy:
    """升级策略——定义审批超时后的行为链"""

    risk_level: str
    ttl_seconds: int
    timeout_action: str  # "escalate" | "auto_deny" | "auto_approve"
    escalation_contacts: list  # 从主审批人到最终兜底
    max_escalations: int = 3

    # 预定义策略
    POLICIES = {
        "LOW_RISK": dict(ttl=86400, action="auto_deny"),
        "HIGH_RISK": dict(ttl=14400, action="escalate"),
        "CRITICAL": dict(ttl=1800, action="auto_deny"),
    }

    @classmethod
    def for_risk(cls, risk_level: str,
                 contacts: list) -> "EscalationPolicy":
        cfg = cls.POLICIES.get(risk_level, cls.POLICIES["HIGH_RISK"])
        return cls(
            risk_level=risk_level,
            ttl_seconds=cfg["ttl"],
            timeout_action=cfg["action"],
            escalation_contacts=contacts,
        )

    def get_current_approver(self, escalation_level: int) -> Optional[str]:
        if escalation_level < len(self.escalation_contacts):
            return self.escalation_contacts[escalation_level]
        return None  # 升级链耗尽

    def next_escalation(self, current_level: int):
        """推进到下一个升级级别"""
        next_level = current_level + 1
        if next_level >= self.max_escalations:
            return -1, None  # 耗尽,触发 final_timeout_action
        contact = self.get_current_approver(next_level)
        return next_level, contact

    def final_timeout_action(self) -> str:
        """升级链耗尽后的最终动作"""
        if self.timeout_action == "escalate":
            return "auto_deny"  # 升级链耗尽→安全兜底
        return self.timeout_action


# --- 使用示例 ---
policy = EscalationPolicy.for_risk(
    "HIGH_RISK",
    contacts=["[email protected]", "[email protected]", "[email protected]"]
)

level = 0
approver = policy.get_current_approver(level)
print(f"当前审批人: {approver}")

# 模拟超时后升级
level, approver = policy.next_escalation(level)
print(f"升级后审批人: {approver}")  # [email protected]

# 再次超时,继续升级
level, approver = policy.next_escalation(level)
print(f"再次升级: {approver}")  # [email protected]

# 第三次超时,升级链耗尽
level, approver = policy.next_escalation(level)
print(f"升级链耗尽: level={level}, 最终动作={policy.final_timeout_action()}")
# → level=-1, 最终动作=auto_deny

死信队列:过期审批的归宿

审批请求超时后发生了什么?如果只是把它从等待队列中移除,那就是信息丢失——审批请求携带着 Agent 的完整推理链和上下文,这些都是有价值的审计数据。

死信队列(Dead Letter Queue, DLQ)的设计:

死信队列不是垃圾桶——它是审批系统的历史存档。关于审批决策的完整审计日志设计,参见 Agent 审计日志设计

断路器:当审批系统本身挂了

审批服务不可用时的选择是这个系统设计中最棘手的决策之一——Agent 正在请求审批,但审批 API 返回 503。此时有两个选择:

模式行为适用场景
Fail-Open(允许执行) 审批系统不可用时,默认批准低风险操作并记录审计日志 非生产环境、LOW_RISK 操作、已建立高度信任的 Agent
Fail-Closed(拒绝执行) 审批系统不可用时,拒绝所有需要审批的操作 生产环境、HIGH_RISK 和 CRITICAL 操作——默认安全

推荐默认:Fail-Closed。审批系统不可用是一个异常事件,应该触发 PagerDuty 告警,而不是悄悄让 Agent 绕过审批。唯一的例外是 LOW_RISK 操作在非生产环境——可以配置 Fail-Open 以保持开发流程顺畅,但操作必须被记录为“无审批执行”。

审批人离职:待审批的请求怎么办

当审批人离开公司时,他名下的待审批请求不能跟着消失。审批人 offboarding 流程必须包含:

值班排班集成

生产环境中的审批路由不应该硬编码邮件地址——它应该和 PagerDuty、Opsgenie 或其他值班系统集成。审批请求的 routing 逻辑:

def resolve_approver(risk_level: str, context: dict) -> str:
    """根据风险级别和上下文决定审批人"""
    if risk_level == "CRITICAL":
        # CRITICAL: 当前值班 DBA + 安全负责人(双人审批)
        return resolve_oncall("dba-primary"), resolve_oncall("security")
    elif risk_level == "HIGH_RISK":
        # HIGH_RISK: 按影响域路由
        domain = context.get("domain", "general")
        return resolve_oncall(f"{domain}-oncall")
    else:
        # LOW_RISK: 团队负责人
        return context.get("team_lead", "[email protected]")


def resolve_oncall(schedule: str) -> str:
    """查询 PagerDuty/Opsgenie 获取当前值班人"""
    # 实际实现中调用 PagerDuty REST API 或 Opsgenie API
    oncall = pagerduty_client.get_oncall(schedule_id=schedule)
    return oncall.email

关于审批队列健康监控和告警,参见 Agent 可观测性。关于审批被拒绝后 Agent 如何恢复,参见 Agent 错误恢复

6. 框架对比:LangGraph vs AgentGraph vs AutoGen vs CrewAI

前面四节讨论的审批流设计是框架无关的——你可以用任何 Agent 框架实现。但不同的框架对 HITL(Human-in-the-Loop)的原生支持程度差异巨大。下面是一次全面的横向对比。

HITL 能力矩阵

特性LangGraphAgentGraph (trpc-agent-go)AutoGenCrewAI
中断机制 interrupt() + Command(resume=...) graph.Interrupt() + ResumeCommand UserProxyAgent + human_input_mode human_input flag + ask_human_input
检查点持久化 内置 SqliteSaver / PostgresSaver 内置 checkpoint 存储(Redis / DB) 无内置持久化 无内置 checkpoint
审批超时 需手动实现 graph.Interrupt() 配合 context deadline 通过 timeout context 手动实现 非原生支持,需手动实现
多节点审批 任意节点可 interrupt() 任意节点可 graph.Interrupt() 每个 workflow 一个 UserProxyAgent 每个 Task 可设置 human_input=True
升级链支持 无内置 无内置 无内置 无内置
生态成熟度 成熟,文档广泛(v0.2+) 快速增长,腾讯生态(v0.6+) 成熟,微软支持 成熟,API 简洁

一个关键的观察:没有框架原生支持完整的审批流——升级链、死信队列、断路器、审批人 offboarding 这些都需要自己实现。框架提供的是基础的“暂停-恢复”原语。本文的状态机和 Schema 设计填补了这些框架在上层审批逻辑上的空白。

各框架 HITL 模式代码示例

LangGraph (Python):

from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command

def high_risk_node(state):
    # 暂停并等待人工审批
    decision = interrupt("Need approval: DELETE table tmp_prod")
    return state

graph = StateGraph(State)
graph.add_node("high_risk_op", high_risk_node)
app = graph.compile(checkpointer=SqliteSaver.from_conn_string(":memory:"))

# 审批通过后恢复
app.invoke(Command(resume={"approved": True}))

AgentGraph / trpc-agent-go (Go):

func highRiskNode(ctx context.Context, state graph.State) (*graph.State, error) {
    // 暂停并等待人工审批
    newState, err := graph.Interrupt(ctx, state,
        "approval_key", ApprovalPayload{Action: "DELETE", Table: "tmp_prod"})
    if err != nil {
        return nil, err
    }
    return &newState, nil
}

// 审批通过后恢复
resumeCmd := graph.NewResumeCommand("approval_key", map[string]any{"approved": true})
app.Resume(ctx, resumeCmd)

AutoGen (Python):

from autogen import UserProxyAgent, AssistantAgent

user_proxy = UserProxyAgent(
    name="human",
    human_input_mode="TERMINATE",  # 每次工具调用前请求人工输入
)

assistant = AssistantAgent(name="agent", llm_config=llm_config)
user_proxy.initiate_chat(assistant, message="Clean up temp tables")

CrewAI (Python):

from crewai import Task, Agent

task = Task(
    description="Drop temp tables in production",
    agent=db_agent,
    human_input=True,  # 此任务需要人工审批
)
crew.kickoff()

框架选择决策指南

基于 HITL 需求选择框架:

关于 Agent 协作中的编排模式(Supervisor、Hierarchical、Swarm),参见 多 Agent 编排。关于工具接口设计,参见 Agent 工具设计

7. 多 Agent 审批与 CLI 交互

前面讨论的场景是“一个 Agent 请求一个审批人”。但在多 Agent 系统中,情况更复杂:Agent A 委托 Agent B 执行任务,Agent B 需要审批——审批请求如何传播?审批界面(CLI、Web、Slack)怎么设计才不打断人的工作流?

多 Agent 场景:审批请求的传播

考虑这个场景:用户对协调 Agent 说“清理数据库中的临时表”。协调 Agent 将任务分解为两个子任务——一个委托给扫描 Agent(扫描表),一个委托给执行 Agent(删除表)。执行 Agent 在准备删除表时触发了 HIGH_RISK 审批。

关键问题:审批请求应该由谁发出?


用户 → 协调 Agent → 扫描 Agent(AUTO — 直接执行)
                  → 执行 Agent(HIGH_RISK — 需要审批)
                        │
                        ▼
                  审批请求(包含委托链: 协调 Agent → 执行 Agent)
                        │
                        ▼
                  审批人(看到完整上下文:谁委托的、为什么、要删什么)
  

设计原则:由执行危险操作的 Agent 发出审批请求。审批请求的 proposed_by 字段记录发出者,delegation_chain 字段记录完整的委托链:

{
  "proposed_by": {
    "agent_id": "exec-agent-03",
    "agent_name": "Table Cleanup Executor"
  },
  "delegation_chain": [
    {"agent_id": "coord-agent-01", "role": "User Request Handler"},
    {"agent_id": "exec-agent-03", "role": "Table Cleanup Executor"}
  ],
  "context_window": {
    "reasoning": "协调 Agent 委托我删除 30 天前的临时表...",
    "original_user_request": "清理数据库中的临时表"
  }
}

这样审批人看到的不只是“某个 Agent 要删表”,而是完整的上下文——“用户要求清理数据库 → 协调 Agent 分解任务 → 执行 Agent 识别出目标表 → 现在请求删除”。

集中式审批队列 vs 分布式审批处理

在多 Agent 系统中,有两种审批架构:

架构描述优点缺点
集中式审批队列 所有 Agent 的审批请求进入一个统一队列,由审批服务统一管理状态机 单一视图、统一超时管理、易审计 审批服务成为单点、耦合度高
分布式审批处理 每个 Agent 管理自己的审批状态机,审批服务只做通知路由 低耦合、Agent 自治、容错性好 审批视图分散、全局超时策略难统一

推荐:集中式队列 + 分布式执行。审批状态机集中在审批服务中管理(统一超时、升级、审计),但 Agent 保留自己的 checkpoint(不依赖审批服务来恢复执行)。审批服务挂了,Agent 可以等待恢复;Agent 挂了,审批服务保留了完整的审批状态。

CLI 审批 UX:终端里的审批请求

对于在终端中使用 Agent 的开发者(OpenCode、Claude Code、Codex 等),审批请求的呈现方式直接影响体验:

模式 1:内联提示(Inline Prompt)——同步,简单:

Agent: 需要删除生产数据库中的表 tmp_prod_migration
       风险级别: CRITICAL ■
       推理: 该表匹配清理策略(名称含 tmp_,日期推断超过 30 天)
       注意: DESCRIBE TABLE 显示该表昨天创建,数据迁移进行中

       批准此操作? [Y=批准 / n=拒绝 / m=修改参数] Y

适合交互式 CLI,审批过程阻塞终端但直观。缺点是审批人在终端里等——不适合长时间等待的场景。

模式 2:后台通知(Background Notification)——异步,非阻塞:

Agent: 已将操作加入审批队列。审批 ID: apr-42a9
       查看所有待审批操作: /approvals
       批准: /approve apr-42a9
       拒绝: /reject apr-42a9 "表还在使用中"

Agent 不阻塞,用户可以继续其他工作,稍后回来处理审批。

模式 3:Web 控制台(Web Dashboard)——全功能,团队友好:

Slack / 邮件 / 飞书通知集成

审批请求不应该要求审批人去某个特定平台查看——它应该主动触达审批人所在的通信渠道:

import requests

def send_approval_webhook(request, platform="slack"):
    """通过 webhook 向通信平台发送审批通知"""

    payload = {
        "text": f"*Agent 审批请求*",
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": (
                        f"*操作:* {request.action.get('description')}\n"
                        f"*风险级别:* {request.risk_level}\n"
                        f"*Agent:* {request.proposed_by.get('agent_name')}\n"
                        f"*截止时间:* {request.deadline}"
                    )
                }
            },
            {
                "type": "actions",
                "elements": [
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "批准"},
                        "style": "primary",
                        "value": request.idempotency_key,
                        "action_id": "approve_action"
                    },
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "拒绝"},
                        "style": "danger",
                        "value": request.idempotency_key,
                        "action_id": "reject_action"
                    }
                ]
            }
        ]
    }

    webhook_url = config.get(f"{platform}_webhook_url")
    requests.post(webhook_url, json=payload)

审批人可以在 Slack 里直接点击“批准”或“拒绝”按钮完成决策——不需要切换到另一个系统。这是降低审批摩擦的关键:让审批在审批人已经在的地方发生

审计轨迹:不可否认的审批证据

每一次审批决策都必须留下不可否认的审计记录。审计事件至少包含:

{
  "event_id": "audit-ev-8f3a2",
  "timestamp": "2026-05-31T14:22:10Z",
  "event_type": "approval.decided",
  "subject": {
    "approval_request_id": "apr-42a9",
    "action": "DROP_TABLE",
    "risk_level": "CRITICAL"
  },
  "actor": {
    "approver_email": "[email protected]",
    "approver_name": "Alice Zhang",
    "authentication_method": "SSO + 2FA"
  },
  "decision": {
    "outcome": "rejected",
    "reason": "表仍在数据迁移使用中",
    "time_to_decide_seconds": 47
  },
  "evidence_hash": "sha256:abcd1234...",
  "evidence_chain": ["prev-hash-xyz", "sha256:abcd1234..."]
}

evidence_chain 是一个哈希链——每个审计事件包含前一个事件的哈希,形成一条不可篡改的证据链。关于审计日志的完整设计,参见 Agent 审计日志设计。关于审批上下文如何在 Agent 之间传递,参见 Agent 上下文协议设计

8. 测试与投产 Checklist

审批系统和其他基础设施一样,需要在投产前经过严格的测试。但审批系统的测试有它的特殊性——核心路径涉及人,而人在自动化测试中不存在。

单元测试:状态机的每一条路径

审批状态机是确定性的,非常适合单元测试。核心测试场景:

import pytest
from approval_sm import ApprovalStateMachine, ApprovalState

class MockApprover:
    """模拟审批人——返回 approve / reject / timeout 按需"""
    def __init__(self, response="approve", delay=0):
        self.response = response
        self.delay = delay

    def decide(self, request):
        if self.delay:
            time.sleep(self.delay)
        return self.response

def test_approve_happy_path():
    """正常批准路径"""
    sm = ApprovalStateMachine()
    sm.propose(
        action={"tool": "DELETE_FILE", "params": {"path": "/tmp/test"}},
        deadline_seconds=300
    )
    assert sm.state == ApprovalState.WAITING_APPROVAL

    result = sm.approve("[email protected]")
    assert result is True
    assert sm.state == ApprovalState.APPROVED

    action = sm.resume()
    assert action is not None
    assert sm.state == ApprovalState.EXECUTING

def test_timeout_escalation():
    """超时升级路径"""
    sm = ApprovalStateMachine()
    sm.propose(
        action={"tool": "DROP_TABLE", "params": {"table": "x"}},
        deadline_seconds=-1,  # 立即过期
        escalation=["alice", "bob"]
    )
    # 此时已经过期,调用 approve 应触发过期处理
    sm.state = ApprovalState.WAITING_APPROVAL
    sm.deadline = datetime.now() - timedelta(seconds=1)

    sm.handle_timeout()
    assert sm.state == ApprovalState.WAITING_APPROVAL  # 升级到 bob
    assert sm.current_escalation == 1

def test_idempotency():
    """幂等性:同一操作批准两次只执行一次"""
    sm1 = ApprovalStateMachine()
    sm1.propose(
        action={"tool": "RESTART_SERVICE", "params": {"name": "nginx"}}
    )
    sm1.approve("alice")
    sm1.resume()
    sm1.complete()

    # 第二次“批准”同一操作
    sm2 = ApprovalStateMachine()
    sm2.propose(
        action={"tool": "RESTART_SERVICE", "params": {"name": "nginx"}}
    )
    sm2.approve("alice")
    result = sm2.resume()
    assert result is None  # 幂等性保护:不重复执行

def test_reject_path():
    """拒绝路径"""
    sm = ApprovalStateMachine()
    sm.propose(action={"tool": "SEND_EMAIL", "params": {"to": "[email protected]"}})
    sm.reject("alice", reason="不要群发全员邮件")
    assert sm.state == ApprovalState.REJECTED

def test_modify_path():
    """修改参数后重新提案"""
    sm = ApprovalStateMachine()
    sm.propose(action={"tool": "UPDATE_CONFIG", "params": {"key": "db.host", "value": "0.0.0.0"}})
    sm.modify("alice", new_params={"key": "db.host", "value": "10.0.1.5"})
    assert sm.state == ApprovalState.WAITING_APPROVAL
    # 新参数已覆盖原参数

集成测试:时间和人的行为

单元测试覆盖了状态机的逻辑路径。集成测试需要覆盖时间相关的场景:

混沌测试:审批服务挂了之后

分布式系统的现实是:服务会挂。审批服务的混沌测试场景:

  1. 审批服务在审批中途宕机:Agent 已进入 WAITING_APPROVAL,审批服务进程被 kill。重启后从持久化存储恢复所有审批状态——不应丢失任何请求。
  2. 网络分区:Agent 可以提交审批请求,但审批人的批准回调无法到达 Agent。验证 Agent 在超时后的行为符合 Fail-Closed 策略。
  3. 数据库主从切换:审批状态存储在数据库中,主库宕机触发切换。验证状态一致性和幂等性键在主从切换后仍然有效。

投产 Checklist

在将审批系统部署到生产环境之前,逐项检查:

告警规则

以下指标应该触发告警:

指标告警阈值严重级别
审批队列积压待审批请求 > 10 个WARNING
审批队列积压待审批请求 > 50 个CRITICAL
MTTD(平均决策时间)HIGH_RISK 操作 > 2 小时WARNING
MTTDCRITICAL 操作 > 15 分钟CRITICAL
升级率升级率 > 30%(10 个请求中 3 个触发升级)WARNING
死信队列增长死信队列 > 0 条(非零即告警)INFO → WARNING
审批服务可用性可用性 < 99.9%CRITICAL

关于审批队列监控指标的完整设计,参见 Agent 可观测性。关于安全测试,参见 Agent 安全评估

常见问题(FAQ)

人工审批和工具权限控制(agent-tool-permission-control)是什么关系?

工具权限控制是前置层:它决定一个操作是 ALLOW(直接执行)、DENY(拒绝)还是 ASK_APPROVAL(需要审批)。本文聚焦 ASK_APPROVAL 之后的事——审批请求怎么构造、状态怎么管理、超时怎么处理。两者是上下游关系。

审批状态机必须持久化吗?不持久化会有什么问题?

如果不持久化,Agent 进程重启后所有等待中的审批都会丢失——Agent 不知道自己在等什么,审批人不知道要批什么。生产环境必须持久化:用数据库(PostgreSQL/Redis)或框架内置的 checkpoint(LangGraph SqliteSaver/PostgresSaver)。

怎么避免审批疲劳?每种操作都要审批吗?

不。四级风险分级的核心目的就是减少不必要的审批:AUTO 和 LOW_RISK 操作不需要审批。只有 HIGH_RISK 和 CRITICAL 操作才触发审批。进一步优化:采样审批(只审批 10% 的 HIGH_RISK 操作)、信任累积(Agent 连续 N 次 LOW_RISK 无误后提升权限)。

审批超时后应该自动拒绝还是自动批准?

默认自动拒绝(fail-safe)。CRITICAL 操作绝对不应自动批准。LOW_RISK 操作在特定场景下可配置为超时自动批准(fail-open),但需要记录审计日志并事后通知。策略应该可配置:每个 RiskLevel 有独立的 timeout_action。

多个 Agent 协作时,审批请求由谁发出?

由执行危险操作的 Agent 发出。如果 Agent A 委托 Agent B 执行一个 HIGH_RISK 操作,Agent B 应该发出审批请求。审批请求中包含委托链信息(delegated_by: Agent A),这样审批人能看到完整的上下文。

这套审批流设计能和现有的企业 OA 审批系统(飞书/钉钉/企业微信)集成吗?

可以。ApprovalRequest Schema 是框架无关的数据结构。你可以实现一个 OA 适配器:把 ApprovalRequest 转换成 OA 审批单,OA 审批结果回调更新状态机。需要处理双向同步:OA 审批超时、OA 审批人转交、OA 审批撤回等场景。

本文是 Agent Communication and Protocols 系列的一部分。建议按以下路径继续阅读:

如果你还没有读过任何 Agent 工程文章,建议从 什么是 AI Agent 开始。