Agent 韧性模式:熔断、限流、Bulkhead 与优雅降级设计

30秒要点

  • 核心问题:Agent 在生产环境中调用 LLM API 和工具时,会遇到 Provider 宕机、API 限流、工具故障和资源耗尽四类问题。简单的重试逻辑会放大这些故障——每次重试消耗 Token、施加负载、延长故障时间——而不是修复它们。
  • 四层韧性栈:熔断器(Circuit Breaker)→ 停止调用已故障的依赖;令牌感知限流(Token-Aware Rate Limiter)→ 保护 API 配额不被耗尽;Bulkhead 隔离 → 防止单个任务拖垮全局;优雅降级(Graceful Degradation)→ 核心依赖不可用时保持系统可运行。
  • 关键设计原则:韧性模式必须在 Agent 请求生命周期的固定位置注入——Bulkhead 在入口、Rate Limiter 在调用前、Circuit Breaker 包裹每次调用、Degradation 在事后兜底。这四者形成一条"请求进入 → 资源检查 → 调用保护 → 失败兜底"的完整链路。
  • 读完能做什么:为你的 Agent 系统设计四层韧性架构——在 Python 中实现完整的 AgentCircuitBreakerTokenAwareRateLimiterAgentBulkheadGracefulDegradationEngine,并通过 ResilientAgentRunner 将它们组合为生产级 Agent 运行时的统一保护层。

1. 为什么 Agent 需要超越重试的韧性设计

一个电商客服 Agent 在深夜 2:17 收到一条查询:"我的订单 #38291 什么时候到货?"Agent 的第一步是调用 get_order 工具获取订单详情。get_order 背后连接的是订单微服务的 /api/v1/orders/38291 端点——但这个端点因为数据库连接池耗尽,正在返回 HTTP 503。

Agent 的重试逻辑触发:第 1 次重试(2:17:01),第 2 次重试(2:17:03,退避 2 秒),第 3 次重试(2:17:07,退避 4 秒)。三次重试全部失败。Agent 的 LLM 在每次重试之间仍然在消耗 Token 来"思考"重试策略——"订单服务返回 503,我再试一次……""还是 503,可能只是临时波动……""第三次了,我换个 query 参数……"。到 2:17:10,这个本应在 800 Token 内完成的简单查询,已经烧掉了 4200 Token——而订单服务仍然不可用,用户仍然没有答案。

这并不是 Agent 代码写错了。这是重试逻辑在 Agent 场景下的系统性缺陷——每次重试都会消耗上游 LLM 的推理 Token,而 LLM 的"思考"本身是在消耗同一套已经紧张的资源。更致命的是,重试没有区分故障类型:瞬态网络抖动(值得重试)和下游服务宕机(重试只会加重故障)被同一种逻辑对待。

重试放大方程:1 个 Agent 请求 × N 次重试 × (LLM 思考 Token + 工具调用成本) = 原有成本 × N。如果 100 个并发 Agent 同时遇到同一个下游故障,它们会在 10 秒内对已经崩溃的服务施加 100 × N 次额外请求——这不是在解决问题,这是在制造 DoS。

四层韧性栈:重试之上的防护架构

重试是韧性体系的第一层——它解决"再试一次也许能成"的问题。但 Agent 的生产韧性要求在此基础上叠加四层防护:

  1. 熔断器(Circuit Breaker):当某个依赖(LLM Provider 或工具 API)连续失败达到阈值时,直接停止调用,快速失败。"不要再给已经崩溃的服务发请求了——先等它恢复。"熔断器是重试的外层守卫:内层可以重试,但当熔断器开启时,重试也被跳过。
  2. 令牌感知限流(Token-Aware Rate Limiter):LLM API 的配额模型是 TPM(每分钟 Token 数)而非 RPM(每分钟请求数)。传统限流器按请求计数,在 Agent 场景下完全失效——一个请求可能消耗 100 Token(简单分类),也可能消耗 120,000 Token(长文档分析)。令牌感知限流器让 Agent 在调用前就能预判"我还有没有足够的 Token 预算完成这次调用"。
  3. Bulkhead 隔离:船的水密隔舱。一个 Agent 用户不应该因为另一个用户的大量请求而耗尽全局 Token 配额或线程池。Bulkhead 在三个维度上隔离资源:用户级 Token 预算、工具级线程池、会话级运行时沙箱。
  4. 优雅降级(Graceful Degradation):当以上三层防护全部穿透——Provider 确实宕机了,工具真的不可用了——Agent 不应该崩溃。降级引擎给出一套优先级决策树:切换到备用模型 → 跳过非关键工具 → 返回缓存结果 → 入队异步处理 → 返回部分结果并标注质量。

这四层与重试的关系是分层的:重试在最内层(针对瞬态故障),熔断器包裹重试(停止对持续故障的重试),限流器在调用入口(确保有配额),降级在兜底(所有路径都失败后做什么)。关于重试本身的深度设计(指数退避、抖动、重试预算),参见 Agent 错误恢复——本文聚焦于重试之上的架构级韧性。

Agent 特有的故障模式:为什么微服务韧性模式不能原样照搬

如果你熟悉 Resilience4j、Polly 或 Hystrix,你可能会想:"这些模式我都在微服务里用过了,Agent 有什么不同?"答案是三个根本差异:

Wow Moment:如果你的 Agent 只用了重试而没有熔断器,当一个下游服务宕机时,每一个调用该服务的 Agent 请求都会独立执行 N 次重试。100 个并发 Agent = 100 × N 次额外调用。加上被消耗的 LLM Token 在每次重试中的"思考成本",你的月度 API 费用会在一次 30 分钟的服务中断中烧掉 2-3 天的预算。熔断器不需要很复杂——它只需要在检测到连续失败后回答一个问题:"这个依赖现在不可用,我是否应该停止尝试?"就够了。

2. 熔断器模式:为 Agent 工具调用加上断路器

熔断器(Circuit Breaker)的核心思路来自电气工程:当电路中的电流超过安全阈值时,断路器跳闸,切断电路,防止火灾。在软件中,它保护系统不被故障依赖的持续调用拖垮。

三态状态机

熔断器在三个状态之间转换:

Agent 特有的失败检测:比 HTTP 状态码更深的信号

传统熔断器把"失败"定义为 HTTP 5xx 或超时。Agent 的熔断器需要感知三类失败:

  1. HTTP 错误:5xx(服务器故障)、429(限流)、连接超时。这是第一层,与传统一致。
  2. 语义失败:LLM 返回了 HTTP 200,但响应体中的工具调用是幻觉——例如 "tool_name": "get_odrer"(拼写错误)、"arguments": {"id": "not_an_integer"}(类型错误)、或引用了一个不存在的函数名。语义失败对 Agent 的破坏力不亚于 HTTP 500。
  3. 超时模式:不只是总调用时长,还包括 Token 产出速率。如果 LLM 在 30 秒内只产出了 15 个 Token(正常应为 500+),说明 Provider 可能处于降级状态——熔断器应将此视为失败信号。

每工具 vs 每 Provider 熔断

Agent 有两种粒度的熔断策略:

两者并存:Provider 熔断器保护 LLM 调用路径,工具熔断器保护工具调用路径。一个典型的 Agent 请求会经过至少两个熔断器。

令牌成本感知熔断

一个容易被忽视的信号:当 LLM 单次调用的 Token 消耗远超正常范围时,应该触发熔断。例如,正常情况下一句"查询订单状态"的 LLM 调用消耗 500–800 Token。如果某次调用消耗了 8,000 Token(因为 LLM 陷入了重复思考或尝试了多种策略),这本身就是故障信号——继续重试只会放大成本。

熔断器记录每次调用的 Token 消耗,当单次 Token 超阈值(如 > 5,000 Token 且结果仍然失败)或滚动窗口 Token 浪费(过去 60 秒内因失败浪费的 Token > 预算的 20%)时,提前触发熔断。

代码:AgentCircuitBreaker

from __future__ import annotations

import time
import threading
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Optional, Any, Dict
import functools
import logging

logger = logging.getLogger("agent.circuit_breaker")


class CircuitState(Enum):
    CLOSED = "closed"        # 正常运行
    OPEN = "open"            # 熔断中,快速失败
    HALF_OPEN = "half_open"  # 探测中


@dataclass
class CircuitBreakerConfig:
    """熔断器配置——所有阈值均可按 Agent 场景调整。"""
    failure_threshold: int = 5          # 连续失败触发熔断的次数
    success_threshold: int = 2          # Half-Open 下需连续成功来闭合的次数
    cooldown_seconds: float = 30.0      # Open 状态冷却时间
    window_seconds: float = 60.0        # 失败计数滑动窗口
    # Agent 特有配置
    token_cost_threshold: int = 5000    # 单次调用 Token 超阈值 → 计为失败
    max_half_open_probes: int = 2       # Half-Open 状态下允许的并发探测数


@dataclass
class CallResult:
    """封装每次调用的结果,包含 Agent 特有的失败信号。"""
    success: bool
    latency_ms: float
    tokens_consumed: int = 0
    semantic_failure: Optional[str] = None  # e.g. "hallucinated tool name"
    http_status: Optional[int] = None
    error_message: str = ""


class CircuitBreakerOpenError(Exception):
    """熔断器断开时抛出,承载状态信息供调用方决策(如触发降级)。"""
    def __init__(self, breaker_name: str, state: CircuitState, failures: int):
        self.breaker_name = breaker_name
        self.state = state
        self.failures = failures
        super().__init__(
            f"Circuit breaker '{breaker_name}' is {state.value} "
            f"(failures: {failures})"
        )


class AgentCircuitBreaker:
    """Agent 场景的熔断器实现。

    支持:
    - 三态状态机 + 滑动窗口
    - HTTP 失败 + 语义失败 + Token 异常三种失败检测
    - 线程安全(可在多工具并发场景下使用)
    - Half-Open 探测并发控制
    """

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self._failures: list[float] = []       # 失败时间戳列表(滑动窗口)
        self._successes_in_half_open: int = 0
        self._last_failure_time: float = 0.0
        self._open_since: float = 0.0
        self._half_open_probe_count: int = 0   # 当前探测中的请求数
        self._lock = threading.RLock()
        self.last_result: Optional[CallResult] = None

    # ---- 状态机核心 ----

    def before_call(self) -> None:
        """调用前检查:如果熔断器 Open 且在冷却期,直接拒绝。"""
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return
            if self.state == CircuitState.OPEN:
                elapsed = time.monotonic() - self._open_since
                if elapsed >= self.config.cooldown_seconds:
                    self.state = CircuitState.HALF_OPEN
                    self._successes_in_half_open = 0
                    self._half_open_probe_count = 0
                    logger.info(
                        "CB [%s] transitioning OPEN -> HALF_OPEN", self.name
                    )
                else:
                    raise CircuitBreakerOpenError(
                        self.name, self.state, len(self._failures)
                    )
            if self.state == CircuitState.HALF_OPEN:
                if self._half_open_probe_count >= self.config.max_half_open_probes:
                    raise CircuitBreakerOpenError(
                        self.name, self.state, len(self._failures)
                    )
                self._half_open_probe_count += 1

    def after_call(self, result: CallResult) -> None:
        """调用后更新状态:成功或失败推进状态机。"""
        with self._lock:
            self.last_result = result
            now = time.monotonic()
            cutoff = now - self.config.window_seconds
            self._failures = [t for t in self._failures if t > cutoff]
            if self.state == CircuitState.HALF_OPEN:
                self._half_open_probe_count = max(
                    0, self._half_open_probe_count - 1
                )

            is_failure = (
                not result.success
                or result.semantic_failure is not None
                or result.tokens_consumed > self.config.token_cost_threshold
            )

            if is_failure:
                self._failures.append(now)
                self._last_failure_time = now
                if self.state == CircuitState.HALF_OPEN:
                    self.state = CircuitState.OPEN
                    self._open_since = now
                    self._successes_in_half_open = 0
                    logger.warning(
                        "CB [%s] HALF_OPEN probe FAILED -> OPEN", self.name
                    )
                elif self.state == CircuitState.CLOSED:
                    if len(self._failures) >= self.config.failure_threshold:
                        self.state = CircuitState.OPEN
                        self._open_since = now
                        logger.error(
                            "CB [%s] CLOSED -> OPEN! %d failures",
                            self.name, len(self._failures),
                        )
            else:
                if self.state == CircuitState.HALF_OPEN:
                    self._successes_in_half_open += 1
                    if self._successes_in_half_open >= self.config.success_threshold:
                        self.state = CircuitState.CLOSED
                        self._failures.clear()
                        logger.info(
                            "CB [%s] HALF_OPEN -> CLOSED (%d successes)",
                            self.name, self._successes_in_half_open,
                        )
                elif self.state == CircuitState.CLOSED:
                    self._failures.clear()

    @staticmethod
    def is_semantic_failure(response_data: dict) -> Optional[str]:
        """检测 LLM 工具调用响应中的语义失败。"""
        tool_calls = response_data.get("tool_calls", [])
        if not tool_calls:
            return None
        for tc in tool_calls:
            func_name = tc.get("function", {}).get("name", "")
            if not func_name or len(func_name) < 2:
                return f"Malformed tool name: '{func_name}'"
        return None

    def get_metrics(self) -> dict:
        with self._lock:
            return {
                "name": self.name,
                "state": self.state.value,
                "failures_in_window": len(self._failures),
                "open_since": self._open_since,
                "half_open_probes": self._half_open_probe_count,
            }


# ---- 装饰器 API ----

class circuit_breaker:
    """装饰器:为 Agent 工具函数自动包装熔断器。

    用法:
        breaker = AgentCircuitBreaker("get_order")

        @circuit_breaker(breaker)
        def get_order(order_id: str) -> dict:
            ...
    """

    def __init__(self, breaker: AgentCircuitBreaker):
        self.breaker = breaker

    def __call__(self, func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self.breaker.before_call()
            start = time.monotonic()
            try:
                result = func(*args, **kwargs)
                elapsed = time.monotonic() - start
                self.breaker.after_call(CallResult(
                    success=True, latency_ms=elapsed * 1000))
                return result
            except Exception as e:
                elapsed = time.monotonic() - start
                self.breaker.after_call(CallResult(
                    success=False, latency_ms=elapsed * 1000,
                    error_message=str(e)))
                raise
        return wrapper


# ---- 使用示例 ----
if __name__ == "__main__":
    breaker = AgentCircuitBreaker("search_docs")

    # 模拟连续失败 -> 熔断触发
    for i in range(6):
        try:
            breaker.before_call()
            raise ConnectionError("Backend unreachable")
        except CircuitBreakerOpenError:
            print(f"[Call {i}] Circuit OPEN -- fast failing")
            break
        except ConnectionError:
            breaker.after_call(CallResult(
                success=False, latency_ms=50,
                error_message="Backend unreachable"))
            print(f"[Call {i}] Failure recorded")

    print("Metrics:", breaker.get_metrics())

设计要点:

MCP(Model Context Protocol)中的工具服务端也可以受益于相同的熔断模式——MCP 服务端的健康状态(连接、超时、响应格式)可以直接映射到熔断器的失败信号。参见 MCP 协议生产指南 了解 MCP 服务端的韧性集成。

3. 令牌感知限流:保护 LLM API 配额不被耗尽

为什么传统请求限流失效

大多数 API 限流器是基于请求数的:每分钟最多 N 个请求。这在 REST API 的世界里工作良好——每个请求的成本大致相当。但 LLM API 的计费模型完全不是这样:

调用场景输入 Token输出 Token总 Token相当于多少个"标准请求"?
简单分类("这是垃圾邮件吗?")50555
客服回复(带上下文)8002001,00018×
代码审查(完整 PR diff)12,0003,00015,000273×
长文档摘要(50 页 PDF)80,0005,00085,0001,545×

如果你按 RPM(请求数/分钟)限流,设 100 RPM,那么 100 次简单分类(5,500 Token)和 100 次长文档摘要(8,500,000 Token)被同等对待——但后者会瞬间耗尽你的 TPM 配额。传统限流器对这种差异完全失明

Token Bucket 改造:从请求桶到 Token 桶

经典的 Token Bucket 算法维护一个"桶",桶中有 N 个 Token,每次请求消耗 1 个 Token,Token 以固定速率补充。Agent 的改造很简单但关键:桶容量 = Provider 的 TPM 上限,每次请求消耗的不再是 1,而是此次调用的预估 Token 数

这意味着:

Provider 限流画像——尤其是 DeepSeek

每个 Provider 的限流模型不同,限流器必须感知这些差异:

响应头动态调整

客户端限流器的状态永远不可能是完全准确的——Provider 端可能有其他客户端在同时消耗配额。因此,限流器必须在每次调用后读取 Provider 的响应头来校准:

客户端 Token 预估:在不知道确数之前就做判断

限流器的核心难题:你需要在调用之前就知道要消耗多少 Token,但你只有调用之后才能知道确数。解决方案是预估 + 校准

代码:TokenAwareRateLimiter 类(支持 asyncio)

from __future__ import annotations

import time
import asyncio
import threading
from dataclasses import dataclass, field
from typing import Optional, Dict
from enum import Enum
import logging

logger = logging.getLogger("agent.rate_limiter")


class Provider(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    DEEPSEEK = "deepseek"


@dataclass
class ProviderRateProfile:
    """Provider 限流画像——基于各 Provider 的文档和实测。"""
    provider: Provider
    tpm_limit: int                   # Token 数/分钟上限
    rpm_limit: int = 500
    max_concurrency: int = 50        # 最大并发连接数(DeepSeek 关键)
    chars_per_token: float = 3.5     # 中英混合默认值


PROVIDER_PROFILES: Dict[Provider, ProviderRateProfile] = {
    Provider.OPENAI: ProviderRateProfile(
        provider=Provider.OPENAI, tpm_limit=1_000_000, rpm_limit=500,
        chars_per_token=4.0,
    ),
    Provider.ANTHROPIC: ProviderRateProfile(
        provider=Provider.ANTHROPIC, tpm_limit=400_000, rpm_limit=400,
        chars_per_token=4.0,
    ),
    Provider.DEEPSEEK: ProviderRateProfile(
        provider=Provider.DEEPSEEK, tpm_limit=500_000, rpm_limit=300,
        max_concurrency=10,  # DeepSeek 的核心限制!
        chars_per_token=1.8,  # 中文为主
    ),
}


@dataclass
class TokenBudget:
    """Token 桶:容量 = TPM 上限,以 Token/秒 速率补充。"""
    capacity: float
    tokens: float
    refill_rate_per_sec: float
    last_refill: float = field(default_factory=time.monotonic)

    def refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity,
                          self.tokens + elapsed * self.refill_rate_per_sec)
        self.last_refill = now

    def consume(self, amount: float) -> bool:
        self.refill()
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False


class TokenAwareRateLimiter:
    """令牌感知限流器。

    特性:Token Bucket 改造、多 Provider 独立桶、响应头动态校准、
    asyncio 异步支持、DeepSeek 并发感知、队列式背压。
    """

    def __init__(self, profiles=None):
        self.profiles = profiles or PROVIDER_PROFILES
        self._budgets: Dict[Provider, TokenBudget] = {}
        self._semaphores: Dict[Provider, asyncio.Semaphore] = {}
        self._lock = threading.RLock()
        self._active_requests: Dict[Provider, int] = {}

        for provider, profile in self.profiles.items():
            self._budgets[provider] = TokenBudget(
                capacity=float(profile.tpm_limit),
                tokens=float(profile.tpm_limit),
                refill_rate_per_sec=profile.tpm_limit / 60.0,
            )
            self._semaphores[provider] = asyncio.Semaphore(
                profile.max_concurrency)
            self._active_requests[provider] = 0

        self._aggregate_tokens_consumed: float = 0.0
        self._aggregate_cap: float = sum(
            p.tpm_limit for p in self.profiles.values())

    def estimate_tokens(self, provider: Provider, messages: list) -> int:
        """预估 Token 消耗——调用前使用。"""
        profile = self.profiles.get(provider)
        if not profile:
            return 1000
        total_chars = 0
        for msg in messages:
            content = msg.get("content", "")
            if isinstance(content, str):
                total_chars += len(content)
            elif isinstance(content, list):
                for part in content:
                    if isinstance(part, dict) and "text" in part:
                        total_chars += len(part["text"])
        return max(1, int(total_chars / profile.chars_per_token) + 500)

    def acquire(self, provider: Provider, estimated_tokens: int,
                timeout_ms: float = 5000) -> bool:
        """同步获取 Token 配额。"""
        budget = self._budgets.get(provider)
        if not budget:
            return False
        deadline = time.monotonic() + timeout_ms / 1000
        while time.monotonic() < deadline:
            with self._lock:
                if budget.consume(float(estimated_tokens)):
                    self._active_requests[provider] += 1
                    return True
            time.sleep(0.05)
        return False

    def release(self, provider: Provider, actual_tokens: int,
                estimated_tokens: int, response_headers=None):
        """调用后释放/校准配额。"""
        budget = self._budgets.get(provider)
        if not budget:
            return
        with self._lock:
            diff = actual_tokens - estimated_tokens
            if diff > 0:
                budget.tokens = max(0, budget.tokens - diff)
            elif diff < 0:
                budget.tokens = min(budget.capacity, budget.tokens - diff)
            self._active_requests[provider] = max(
                0, self._active_requests[provider] - 1)
            if response_headers:
                remaining_hdr = (
                    response_headers.get("x-ratelimit-remaining-tokens")
                    or response_headers.get(
                        "anthropic-ratelimit-tokens-remaining")
                )
                if remaining_hdr is not None:
                    budget.tokens = min(budget.tokens, float(remaining_hdr))
                # Retry-After 头 → 冻结 Token 补充(429 限流保护)
                retry_after = response_headers.get("retry-after")
                if retry_after is not None:
                    freeze_sec = float(retry_after)
                    budget.last_refill = time.monotonic() + freeze_sec
                    logger.info(
                        "Provider %s: freezing token refill for %.1fs (Retry-After)",
                        provider.value, freeze_sec,
                    )
            self._aggregate_tokens_consumed += actual_tokens

    async def aacquire(self, provider: Provider, estimated_tokens: int,
                       timeout_ms: float = 5000) -> bool:
        """异步获取 Token 配额 + 信号量(对 DeepSeek 关键)。"""
        sem = self._semaphores.get(provider)
        if not sem:
            return False
        try:
            await asyncio.wait_for(sem.acquire(),
                                   timeout=timeout_ms / 1000)
        except asyncio.TimeoutError:
            return False
        budget = self._budgets.get(provider)
        if not budget:
            sem.release()
            return False
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout_ms / 1000
        while loop.time() < deadline:
            with self._lock:
                if budget.consume(float(estimated_tokens)):
                    self._active_requests[provider] += 1
                    return True
            await asyncio.sleep(0.05)
        sem.release()
        return False

    async def arelease(self, provider: Provider, actual_tokens: int,
                       estimated_tokens: int, response_headers=None):
        """异步释放配额。"""
        self.release(provider, actual_tokens, estimated_tokens,
                     response_headers)
        sem = self._semaphores.get(provider)
        if sem:
            sem.release()

    def is_near_limit(self, provider: Provider,
                      threshold_pct: float = 0.85) -> bool:
        """是否接近限额(用于触发背压)。"""
        budget = self._budgets.get(provider)
        if not budget:
            return False
        budget.refill()
        ratio = (budget.tokens / budget.capacity
                 if budget.capacity > 0 else 1.0)
        return ratio < (1.0 - threshold_pct)

    @property
    def aggregate_consumption_pct(self) -> float:
        return (self._aggregate_tokens_consumed / self._aggregate_cap
                if self._aggregate_cap > 0 else 0.0)

    def get_metrics(self) -> dict:
        metrics = {}
        for provider, budget in self._budgets.items():
            budget.refill()
            metrics[provider.value] = {
                "tokens_remaining": budget.tokens,
                "capacity": budget.capacity,
                "usage_pct": round(
                    (1 - budget.tokens / budget.capacity) * 100, 1
                ) if budget.capacity > 0 else 0,
                "active_requests": self._active_requests.get(provider, 0),
            }
        metrics["aggregate"] = {
            "tokens_consumed": self._aggregate_tokens_consumed,
            "total_cap": self._aggregate_cap,
            "usage_pct": round(self.aggregate_consumption_pct * 100, 1),
        }
        return metrics


# ---- 使用示例 ----
if __name__ == "__main__":
    deepseek_profile = ProviderRateProfile(
        provider=Provider.DEEPSEEK,
        tpm_limit=500_000, rpm_limit=300,
        max_concurrency=10, chars_per_token=1.8,
    )
    limiter = TokenAwareRateLimiter({
        Provider.DEEPSEEK: deepseek_profile,
        Provider.OPENAI: PROVIDER_PROFILES[Provider.OPENAI],
    })
    messages = [{"role": "user",
                 "content": "帮我总结这份 10 页的合同。"}]
    estimated = limiter.estimate_tokens(Provider.DEEPSEEK, messages)
    print(f"Estimated tokens: {estimated}")
    if limiter.acquire(Provider.DEEPSEEK, estimated, timeout_ms=5000):
        print("Token acquired -- proceed with API call")
        actual_tokens = 1200
        headers = {"x-ratelimit-remaining-tokens": "498000"}
        limiter.release(Provider.DEEPSEEK, actual_tokens, estimated,
                        headers)
    else:
        print("Rate limited -- must wait or use fallback")
    print("Metrics:", limiter.get_metrics())

设计要点:

4. Bulkhead 隔离模式:防止 Agent 故障扩散

熔断器保护你免受故障依赖的伤害。限流器保护你免于耗尽配额。但还有一种失败模式:系统内部的一个组件消耗了所有共享资源,导致其他组件无法运行。这就是 Bulkhead(隔舱)模式要解决的问题。

类比:船体被分隔成多个水密隔舱。一个隔舱进水,船仍然能浮。软件的 Bulkhead 同理:将系统资源(线程、连接、内存、Token 配额)划分到隔离的池中,一个池的耗尽不影响其他池。

Agent 的三维隔离

Agent 系统需要三层 Bulkhead 隔离:

  1. 用户级 Token 预算(User-Level Budget):每个用户/租户有独立的 Token 消费上限(如每分钟 50,000 Token)。当一个用户触发大量 Agent 任务时,其他用户的配额不受影响。硬限制为拒绝调用,软警告为 80% 阈值。
  2. 工具级线程池(Per-Tool ThreadPool):每类工具(searchdatabase_querysend_email)分配独立的线程池。一个慢速的数据库查询不会阻塞所有邮件发送——两个工具在线程层面隔离。
  3. 会话级沙箱(Per-Session Sandbox):每个 Agent 会话拥有独立的临时目录、内存上限和文件描述符配额。一个会话创建了 10,000 个临时文件不会耗尽其他会话的文件系统资源。

Bulkhead vs Rate Limiting:关键区别

很多人混淆这两个模式。区别是清晰的:

二者可以协作:Rate Limiter 在 Bulkhead 入口处确保请求符合速率限制,Bulkhead 在分配的资源池内执行请求。

代码:AgentBulkhead

from __future__ import annotations

import time
import threading
from dataclasses import dataclass, field
from typing import Dict, Optional
from concurrent.futures import ThreadPoolExecutor
import logging

logger = logging.getLogger("agent.bulkhead")


class BulkheadCapacityExceeded(Exception):
    """Bulkhead 资源耗尽异常。"""
    def __init__(self, bulkhead_name: str, limit_type: str,
                 current: float, limit: float):
        self.bulkhead_name = bulkhead_name
        self.limit_type = limit_type
        self.current = current
        self.limit = limit
        super().__init__(
            f"Bulkhead '{bulkhead_name}' {limit_type} exceeded: "
            f"{current}/{limit}"
        )


@dataclass
class UserTokenBudget:
    """用户级 Token 预算控制器。"""
    user_id: str
    token_limit: int              # 硬限制
    warning_threshold_pct: float = 0.80  # 80% 软警告
    window_seconds: float = 60.0
    tokens_used: float = 0.0
    last_reset: float = field(default_factory=time.monotonic)

    def _maybe_reset(self) -> None:
        now = time.monotonic()
        if now - self.last_reset >= self.window_seconds:
            self.tokens_used = 0.0
            self.last_reset = now

    def try_consume(self, tokens: int) -> bool:
        self._maybe_reset()
        if self.tokens_used + tokens > self.token_limit:
            return False
        self.tokens_used += tokens
        return True

    @property
    def usage_pct(self) -> float:
        self._maybe_reset()
        return (self.tokens_used / self.token_limit
                if self.token_limit > 0 else 0.0)

    @property
    def is_near_limit(self) -> bool:
        return self.usage_pct >= self.warning_threshold_pct


class AgentBulkhead:
    """Agent Bulkhead 隔离器。

    三维隔离:
    1. 用户级 Token 预算 —— 防止单一用户耗尽全局配额
    2. 工具级线程池 —— 每类工具独立池,防止慢工具阻塞快工具
    3. 会话级资源配额 —— 文件描述符/内存上限
    """

    def __init__(
        self,
        default_token_limit: int = 100_000,
        max_file_descriptors_per_session: int = 100,
    ):
        self.default_token_limit = default_token_limit
        self.max_fds_per_session = max_file_descriptors_per_session
        self._user_budgets: Dict[str, UserTokenBudget] = {}
        self._budget_lock = threading.RLock()
        self._tool_pools: Dict[str, ThreadPoolExecutor] = {}
        self._pool_lock = threading.RLock()
        self._session_resources: Dict[str, dict] = {}
        self._session_lock = threading.RLock()

    # ---- 1. 用户级 Token 预算 ----

    def check_user_budget(self, user_id: str,
                          estimated_tokens: int) -> bool:
        """检查用户 Token 预算。返回 False = 拒绝。"""
        with self._budget_lock:
            if user_id not in self._user_budgets:
                self._user_budgets[user_id] = UserTokenBudget(
                    user_id=user_id,
                    token_limit=self.default_token_limit,
                )
            budget = self._user_budgets[user_id]

            if budget.is_near_limit:
                logger.warning(
                    "User %s approaching token limit: %.1f%%",
                    user_id, budget.usage_pct * 100,
                )

            if not budget.try_consume(estimated_tokens):
                raise BulkheadCapacityExceeded(
                    f"user_budget:{user_id}", "token_budget",
                    budget.tokens_used, budget.token_limit,
                )
        return True

    def get_user_usage(self, user_id: str) -> dict:
        with self._budget_lock:
            budget = self._user_budgets.get(user_id)
            if not budget:
                return {"user_id": user_id, "usage_pct": 0.0}
            return {
                "user_id": user_id,
                "usage_pct": round(budget.usage_pct * 100, 1),
                "tokens_used": budget.tokens_used,
                "token_limit": budget.token_limit,
                "near_limit": budget.is_near_limit,
            }

    # ---- 2. 工具级线程池隔离 ----

    def get_tool_executor(
        self, tool_name: str, max_workers: int = 8, queue_depth: int = 32
    ) -> ThreadPoolExecutor:
        """获取或创建工具专用线程池。"""
        with self._pool_lock:
            if tool_name not in self._tool_pools:
                self._tool_pools[tool_name] = ThreadPoolExecutor(
                    max_workers=max_workers,
                    thread_name_prefix=f"agent-tool-{tool_name}",
                )
                logger.info(
                    "Created ThreadPool for tool '%s': workers=%d",
                    tool_name, max_workers,
                )
            return self._tool_pools[tool_name]

    def tool_pool_metrics(self) -> dict:
        with self._pool_lock:
            return {
                name: {
                    "max_workers": pool._max_workers,
                }
                for name, pool in self._tool_pools.items()
            }

    # ---- 3. 会话级资源追踪 ----

    def register_session(self, session_id: str) -> None:
        with self._session_lock:
            self._session_resources[session_id] = {
                "files_open": 0,
                "created_at": time.monotonic(),
            }

    def check_session_resource(self, session_id: str,
                               resource_type: str) -> bool:
        """检查会话资源是否超限。"""
        with self._session_lock:
            if session_id not in self._session_resources:
                self._session_resources[session_id] = {
                    "files_open": 0,
                    "created_at": time.monotonic(),
                }
            resources = self._session_resources[session_id]

        if resource_type == "file_descriptor":
            if resources.get("files_open", 0) >= self.max_fds_per_session:
                raise BulkheadCapacityExceeded(
                    f"session:{session_id}", "file_descriptors",
                    resources["files_open"], self.max_fds_per_session,
                )
            with self._session_lock:
                self._session_resources[session_id]["files_open"] += 1
            return True
        return True

    def release_session_resource(self, session_id: str,
                                 resource_type: str) -> None:
        with self._session_lock:
            if session_id in self._session_resources:
                if resource_type == "file_descriptor":
                    self._session_resources[session_id]["files_open"] = max(
                        0, self._session_resources[session_id].get(
                            "files_open", 1) - 1
                    )

    def cleanup_session(self, session_id: str) -> None:
        with self._session_lock:
            self._session_resources.pop(session_id, None)

    # ---- 可观测性 ----

    def get_metrics(self) -> dict:
        return {
            "user_budgets": {
                uid: self.get_user_usage(uid)
                for uid in list(self._user_budgets.keys())[:20]
            },
            "tool_pools": self.tool_pool_metrics(),
            "active_sessions": len(self._session_resources),
        }


# ---- 使用示例 ----
if __name__ == "__main__":
    bulkhead = AgentBulkhead(default_token_limit=50_000)

    # 用户级预算检查
    try:
        bulkhead.check_user_budget("user-42", estimated_tokens=3000)
        print("[User-42] Budget OK")
        bulkhead.check_user_budget("user-42", estimated_tokens=48000)
        print("[User-42] Budget near limit -- soft warning triggered")
    except BulkheadCapacityExceeded as e:
        print(f"[User-42] Budget exceeded: {e}")

    # 工具级线程池
    search_pool = bulkhead.get_tool_executor("search_docs", max_workers=4)
    db_pool = bulkhead.get_tool_executor("database_query", max_workers=8)
    print("Tool pools:", bulkhead.tool_pool_metrics())

    # 会话级资源
    bulkhead.register_session("sess-abc")
    bulkhead.check_session_resource("sess-abc", "file_descriptor")
    print("Session resource check passed")

    print("Bulkhead metrics:", bulkhead.get_metrics())

设计要点:

5. 优雅降级:当核心依赖不可用时的生存策略

熔断器 Open 了,限流器拒绝了,Bulkhead 接近饱和——Agent 接下来做什么?答案不能是"崩溃"。优雅降级是韧性栈的最后一道防线:当依赖不可用时,Agent 以降低质量但不丧失功能的方式继续运行。

六级降级决策树

降级策略按优先级排序——Agent 从最佳选项开始尝试,逐级后退:

  1. 模型回退(Model Fallback):主模型不可用(熔断/限流)→ 切换到备用模型。例如 GPT-5.5 → GPT-5.4-mini,Claude Opus → Claude Haiku,或跨 Provider 回退(Claude → DeepSeek)。备用模型可能质量略低,但任务可以继续。
  2. 工具跳过(Tool Skip):某个非关键工具不可用 → 跳过它,用剩余工具完成任务。例如 get_order 挂了,但 lookup_shipping 还能用——从运输信息推断订单状态。在结果中标注"部分信息"。
  3. 缓存返回(Cache Return):相同或相似的查询在近期被成功回答过 → 返回缓存结果,附上 stale: true 标记。例如"我的订单什么时候到货?"在 10 分钟前刚有人问过——缓存时效性足够,直接返回。
  4. 异步延迟(Async Defer):同步无法完成 → 将任务入队,后台异步处理,向用户返回"正在处理中,预计 X 分钟内完成"。
  5. 部分结果(Partial Result):返回当前已有的信息,附上降级元数据:缺少哪些组件、为什么缺少、对结果质量的预估影响。
  6. 优雅失败(Graceful Failure):所有路径都不通 → 返回清晰的错误指引,而非堆栈追踪。例如:"目前订单查询系统正在维护中(预计 30 分钟内恢复)。您可以:1) 拨打客服电话 400-xxx;2) 访问 self-service 门户查看订单状态。"

降级元数据:让调用方知道"结果不完整"

降级不是一个布尔值——它是一个结构化的描述,告诉上游系统(或最终用户)结果的质量状态。每个降级响应都应携带:

熔断器联动

降级引擎与熔断器深度集成:当某个工具的熔断器处于 Open 状态时,降级引擎直接跳过该工具(执行工具跳过策略),而不是等到调用失败后再反应。这意味着降级可以在请求规划阶段就做出决策——节省了不必要的等待和 Token 消耗。

代码:GracefulDegradationEngine

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Any, Dict, List
import logging
import hashlib

logger = logging.getLogger("agent.degradation")


class DegradationLevel(Enum):
    NONE = "none"           # 完整结果
    PARTIAL = "partial"     # 部分组件不可用
    FALLBACK = "fallback"   # 使用了备用模型/策略
    DEFERRED = "deferred"   # 转入异步处理
    FAILED = "failed"       # 优雅失败


@dataclass
class DegradationMetadata:
    """降级响应的元数据,供上游系统判断结果质量。"""
    level: DegradationLevel = DegradationLevel.NONE
    missing_components: List[str] = field(default_factory=list)
    quality_estimate: float = 1.0          # 0.0–1.0
    fallback_chain: List[str] = field(default_factory=list)
    user_message: Optional[str] = None


class DegradationStrategy:
    """降级策略的抽象基类。"""
    def try_execute(self, context: dict) -> Optional[Any]:
        raise NotImplementedError


class ModelFallbackStrategy(DegradationStrategy):
    """模型回退:按优先级尝试备用模型。"""
    def __init__(self, models: List[str]):
        self.models = models

    def try_execute(self, context: dict) -> Optional[Any]:
        current_model = context.get("current_model")
        for model in self.models:
            if model != current_model:
                logger.info("Degradation: trying fallback model %s", model)
                return {"model": model, "result": "fallback response"}
        return None


class SkipToolStrategy(DegradationStrategy):
    """工具跳过:允许跳过指定的非关键工具。"""
    def __init__(self, optional_tools: List[str]):
        self.optional_tools = set(optional_tools)

    def try_execute(self, context: dict) -> Optional[Any]:
        failed_tool = context.get("failed_tool")
        if failed_tool and failed_tool in self.optional_tools:
            logger.info("Degradation: skipping optional tool %s", failed_tool)
            return {"skipped_tool": failed_tool,
                    "partial_results": context.get("partial_results", {})}
        return None


class CacheReturnStrategy(DegradationStrategy):
    """缓存返回:查找相似查询的缓存结果。"""
    def __init__(self, cache: Optional[dict] = None):
        self._cache = cache or {}

    def _query_fingerprint(self, messages: list) -> str:
        content = "".join(m.get("content", "")[:500] for m in messages)
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def try_execute(self, context: dict) -> Optional[Any]:
        messages = context.get("messages", [])
        fp = self._query_fingerprint(messages)
        cached = self._cache.get(fp)
        if cached is not None:
            logger.info("Degradation: returning cached result (fp=%s)", fp)
            return {"cache_hit": True, "stale": True, "result": cached}
        return None

    def store(self, messages: list, result: Any) -> None:
        fp = self._query_fingerprint(messages)
        self._cache[fp] = result


class AsyncDeferStrategy(DegradationStrategy):
    """异步延迟:将任务入队,返回处理确认。"""
    def __init__(self, queue: Optional[Any] = None):
        self._queue = queue or []

    def try_execute(self, context: dict) -> Optional[Any]:
        task = {
            "messages": context.get("messages"),
            "metadata": context.get("metadata", {}),
        }
        self._queue.append(task)
        logger.info("Degradation: task deferred to async queue (depth=%d)",
                     len(self._queue))
        return {
            "deferred": True,
            "queue_position": len(self._queue),
            "estimated_wait_seconds": len(self._queue) * 15,
        }


class GracefulFailureStrategy(DegradationStrategy):
    """优雅失败:返回可执行的错误指引。"""
    def __init__(self, fallback_message: str = ""):
        self.fallback_message = fallback_message

    def try_execute(self, context: dict) -> Optional[Any]:
        msg = self.fallback_message or (
            "当前服务暂时不可用。请稍后重试,"
            "或通过 self-service 门户获取帮助。"
        )
        return {
            "error": "graceful_failure",
            "user_message": msg,
            "support_contact": "[email protected]",
        }


class GracefulDegradationEngine:
    """优雅降级引擎。

    按优先级依次尝试降级策略:
    model_fallback -> tool_skip -> cache_return ->
    async_defer -> partial_result -> graceful_failure

    与熔断器集成:工具熔断 → 自动触发 tool_skip。
    """

    def __init__(self):
        self._strategies: List[tuple] = []
        self._circuit_breakers: Dict[str, Any] = {}
        self._fallback_chain: List[str] = []

    def register_model_fallback(self, models: List[str]):
        self._strategies.append(
            ("model_fallback", ModelFallbackStrategy(models)))

    def register_optional_tools(self, tool_names: List[str]):
        self._strategies.append(
            ("tool_skip", SkipToolStrategy(tool_names)))

    def register_cache(self, cache=None):
        self._strategies.append(
            ("cache_return", CacheReturnStrategy(cache)))

    def register_async_queue(self, queue=None):
        self._strategies.append(
            ("async_defer", AsyncDeferStrategy(queue)))

    def register_graceful_failure(self, message: str = ""):
        self._strategies.append(
            ("graceful_failure", GracefulFailureStrategy(message)))

    def link_circuit_breakers(self, breakers: Dict[str, Any]) -> None:
        """关联熔断器:工具熔断时,降级引擎自动跳过该工具。"""
        self._circuit_breakers = breakers

    def execute(self, context: dict) -> tuple:
        """执行降级链,返回 (结果, 降级元数据)。"""
        meta = DegradationMetadata()
        self._fallback_chain = []

        for strategy_name, strategy in self._strategies:
            self._fallback_chain.append(strategy_name)
            try:
                result = strategy.try_execute(context)
                if result is not None:
                    meta.fallback_chain = self._fallback_chain

                    if strategy_name == "model_fallback":
                        meta.level = DegradationLevel.FALLBACK
                        meta.quality_estimate = 0.85
                    elif strategy_name == "tool_skip":
                        meta.level = DegradationLevel.PARTIAL
                        meta.missing_components.append(
                            context.get("failed_tool", "unknown"))
                        meta.quality_estimate = 0.75
                    elif strategy_name == "cache_return":
                        meta.level = DegradationLevel.PARTIAL
                        meta.quality_estimate = 0.70
                    elif strategy_name == "async_defer":
                        meta.level = DegradationLevel.DEFERRED
                        meta.quality_estimate = 0.60
                    elif strategy_name == "graceful_failure":
                        meta.level = DegradationLevel.FAILED
                        meta.quality_estimate = 0.0
                        meta.user_message = result.get("user_message", "")

                    logger.info(
                        "Degradation chain succeeded at '%s' (level=%s)",
                        strategy_name, meta.level.value)
                    return result, meta
            except Exception as e:
                logger.warning("Degradation strategy '%s' failed: %s",
                               strategy_name, e)
                continue

        # 所有策略都失败了
        meta.level = DegradationLevel.FAILED
        meta.quality_estimate = 0.0
        meta.user_message = "所有降级策略均已失败。请联系支持团队。"
        meta.fallback_chain = self._fallback_chain
        return {"error": "total_degradation_failure"}, meta

    def should_skip_tool(self, tool_name: str) -> bool:
        """检查工具是否因熔断器开启而应被跳过。"""
        breaker = self._circuit_breakers.get(tool_name)
        if breaker and hasattr(breaker, "state"):
            cb_state = breaker.state
            if hasattr(cb_state, "value") and cb_state.value == "open":
                return True
        return False

    def get_metrics(self) -> dict:
        return {
            "strategies_registered": len(self._strategies),
            "circuit_breakers_linked": len(self._circuit_breakers),
            "last_fallback_chain": self._fallback_chain,
        }


# ---- 使用示例 ----
if __name__ == "__main__":
    engine = GracefulDegradationEngine()

    engine.register_model_fallback(
        models=["claude-opus", "claude-haiku",
                "deepseek-chat", "gpt-5.4-mini"])
    engine.register_optional_tools(
        ["send_notification", "log_analytics", "enrich_profile"])
    engine.register_cache({})
    engine.register_async_queue([])
    engine.register_graceful_failure(
        "当前服务暂时不可用。请稍后重试或联系 [email protected]。")

    context = {
        "current_model": "claude-opus",
        "failed_tool": "enrich_profile",
        "messages": [{"role": "user", "content": "查询我的订单状态"}],
        "partial_results": {"order_id": "38291", "status_hint": "shipping"},
        "metadata": {},
    }

    result, meta = engine.execute(context)
    print(f"Result: {result}")
    print(f"Degradation: level={meta.level.value}, "
          f"quality={meta.quality_estimate}")
    print(f"Fallback chain: {meta.fallback_chain}")
    print(f"Missing: {meta.missing_components}")

设计要点:

6. 模式组合:构建生产级 Agent 韧性架构

前面的 2–5 节展示了四个独立的韧性模式。但在生产环境中,它们不是独立使用的——一个 Agent 请求同时经过所有四层保护。这一节展示它们如何协作。

完整请求生命周期(7 层)

一个 Agent 请求从进入到返回,经过七个韧性检查点:

  1. Bulkhead 入口检查:用户的 Token 预算是否有余量?请求是否放入正确工具线程池?会话资源(文件描述符)是否在限制内?
  2. Rate Limiter Token 预算检查:目标 Provider 的 TPM 配额是否足够容纳本次调用的预估 Token?需要等待多久?
  3. LLM 调用(Circuit Breaker 包装):Provider 熔断器是否 Closed?调用成功?调用后进行 Token 校准和熔断状态更新。
  4. LLM 失败 → Degradation 引擎决策:Provider 不可用?选择模型回退(fallback chain)。熔断器 Open?降级引擎跳过对故障工具的计划。
  5. 工具调用前 → Circuit Breaker 检查:目标工具熔断器是否 Closed?Token 成本是否正常?
  6. 工具调用失败 → Degradation 引擎决策:非关键工具 → 跳过。关键工具不可跳 → 尝试缓存 → 异步延迟 → 部分结果。
  7. 响应出口 → Bulkhead 释放:Release 用户预算、线程池资源、会话资源。

模式交互矩阵

模式 A模式 B交互关系潜在冲突
Circuit BreakerRate Limiter 互补。CB 停止无效调用,RL 保护配额。CB Open → RL 无需再检查。 如果 RL 极度保守而 CB 未触发,可能导致过度限流。
Circuit BreakerBulkhead 互补。CB 面向外部依赖,Bulkhead 面向内部资源。 低冲突。
Circuit BreakerDegradation 深度集成。CB Open → Degradation 自动跳过对应工具。 Degradation fallback 也可能触发 CB(循环降级),需设置最大降级深度。
Rate LimiterBulkhead 互补。RL 是流控(入口),Bulkhead 是资源隔离(执行层)。 Bulkhead 的用户预算与 RL 的 Token 桶可能重复计算,需明确职责边界。
Rate LimiterDegradation RL 拒绝 → Degradation 尝试备用 Provider(不同配额池)。 所有 Provider 都限流时,降级链走到 graceful failure。
BulkheadDegradation Bulkhead 资源耗尽 → Degradation 执行 async_defer 或 graceful failure。 低冲突。

配置默认值

以下是从生产经验中总结的合理起步阈值——这些值不是"正确答案",而是"合理起点",需要根据实际流量和错误模式调整:

配置项默认值说明
CB 失败阈值5 次 / 60 秒高频 Agent 可下调至 3;低频可上调至 10
CB 冷却时间30 秒工具 API 通常在此时间内可恢复;LLM Provider 建议 60 秒
RL Token 桶容量Provider TPM 上限从 Provider 的 Tier 信息获取
RL 令牌预估算因子中英文混合 3.5 字符/Token建议集成 tiktoken(OpenAI)或 Anthropic tokenizer
Bulkhead 用户预算100,000 Token/分钟/用户取决于用户基数和全局配额
Bulkhead 工具线程池4–8 workers/工具I/O 密集 → 偏高;CPU 密集 → 偏低
降级最大深度6 级防止降级循环(A→B→C→A)

代码:ResilientAgentRunner

这是全文的核心代码——将所有模式组合为一个可运行的 Agent 管道:

from __future__ import annotations

import time
import logging
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field

logger = logging.getLogger("agent.resilient_runner")


@dataclass
class CallResult:
    """封装每次调用的结果(ResilientAgentRunner 内联版本)。"""
    success: bool = False
    latency_ms: float = 0.0
    tokens_consumed: int = 0
    semantic_failure: Optional[str] = None


@dataclass
class ResilientAgentConfig:
    """韧性 Agent 的配置——所有模式阈值集中管理。"""
    cb_failure_threshold: int = 5
    cb_cooldown_seconds: float = 30.0
    rl_acquire_timeout_ms: float = 5000
    bulkhead_user_token_limit: int = 100_000
    bulkhead_tool_workers: int = 6
    fallback_models: List[str] = field(default_factory=lambda: [
        "claude-opus", "claude-haiku", "deepseek-chat", "gpt-5.4-mini"
    ])
    optional_tools: List[str] = field(default_factory=lambda: [
        "send_notification", "log_analytics"
    ])
    max_degradation_depth: int = 6


@dataclass
class ResilientRunResult:
    """韧性 Agent 的完整运行结果。"""
    success: bool
    result: Any
    circuit_breaker_events: List[dict] = field(default_factory=list)
    rate_limiter_metrics: dict = field(default_factory=dict)
    bulkhead_metrics: dict = field(default_factory=dict)
    degradation_metadata: Optional[Any] = None
    total_latency_ms: float = 0.0
    tokens_consumed: int = 0
    resilience_events: List[str] = field(default_factory=list)


class ResilientAgentRunner:
    """生产级 Agent 韧性运行器。

    将四大模式编织为 7 层请求生命周期。
    """

    def __init__(self, circuit_breakers, rate_limiter, bulkhead,
                 degradation_engine, config=None):
        self.circuit_breakers = circuit_breakers
        self.rate_limiter = rate_limiter
        self.bulkhead = bulkhead
        self.degradation_engine = degradation_engine
        self.config = config or ResilientAgentConfig()
        self.degradation_engine.link_circuit_breakers(circuit_breakers)

    def run(self, user_id, session_id, provider, messages,
            tools_to_call) -> ResilientRunResult:
        """运行一个完整的、带韧性保护的 Agent 请求。"""
        start_time = time.monotonic()
        result = ResilientRunResult(success=False, result=None)
        estimated_tokens = 0

        # LAYER 1: Bulkhead 入口检查
        try:
            estimated_tokens = self.rate_limiter.estimate_tokens(
                provider, messages)
            self.bulkhead.check_user_budget(user_id, estimated_tokens)
            self.bulkhead.register_session(session_id)
            self.bulkhead.check_session_resource(
                session_id, "file_descriptor")
            result.resilience_events.append("bulkhead:entry_passed")
        except Exception as e:
            result.resilience_events.append(f"bulkhead:entry_failed:{e}")
            return result

        # LAYER 2: Rate Limiter Token 预算检查
        token_acquired = self.rate_limiter.acquire(
            provider, estimated_tokens,
            timeout_ms=self.config.rl_acquire_timeout_ms)
        if not token_acquired:
            result.resilience_events.append("rate_limiter:rejected")
            ctx = {"messages": messages, "error": "rate_limited"}
            deg_result, deg_meta = self.degradation_engine.execute(ctx)
            result.result = deg_result
            result.degradation_metadata = deg_meta
            return result
        result.resilience_events.append("rate_limiter:acquired")

        # LAYER 3: LLM 调用(Circuit Breaker 包装)
        provider_cb = self.circuit_breakers.get(
            f"provider:{provider.value}")
        llm_response = None

        if provider_cb:
            try:
                provider_cb.before_call()
                # === 实际 LLM 调用(生产环境替换为真实调用)===
                llm_response = {
                    "content": "The order #38291 is in transit.",
                    "tool_calls": [
                        {"function": {"name": "get_order",
                         "arguments": '{"order_id": "38291"}'}},
                    ],
                    "usage": {"total_tokens": 1200},
                }
                # ================================================
                actual_tokens = llm_response.get("usage", {}).get(
                    "total_tokens", estimated_tokens)
                result.tokens_consumed += actual_tokens
                self.rate_limiter.release(
                    provider, actual_tokens, estimated_tokens)

                # 创建 CallResult 用于熔断器更新
                cr = CallResult(
                    success=True, latency_ms=100,
                    tokens_consumed=actual_tokens)
                provider_cb.after_call(cr)
                result.resilience_events.append(
                    "circuit_breaker:llm_passed")
            except Exception as e:
                result.resilience_events.append(
                    f"circuit_breaker:llm_failed:{e}")
                ctx = {"messages": messages,
                       "current_model": provider.value,
                       "error": str(e)}
                deg_result, deg_meta = self.degradation_engine.execute(ctx)
                result.result = deg_result
                result.degradation_metadata = deg_meta
                return result

        # LAYER 5-6: 工具调用前 CB 检查 + 降级
        tool_results = {}
        skipped_tools = []
        # 防止 provider_cb 为 None 时 llm_response 也为 None
        if llm_response is None:
            llm_response = {}
        tool_calls = llm_response.get("tool_calls", [])

        for tc in tool_calls:
            tool_name = tc.get("function", {}).get("name", "")
            if not tool_name:
                continue
            if self.degradation_engine.should_skip_tool(tool_name):
                result.resilience_events.append(
                    f"degradation:skipping_tool:{tool_name}")
                skipped_tools.append(tool_name)
                continue

            tool_cb = self.circuit_breakers.get(f"tool:{tool_name}")
            if tool_cb:
                try:
                    tool_cb.before_call()
                    tool_results[tool_name] = {
                        "status": "success",
                        "data": f"Result from {tool_name}"}
                    cr = CallResult(
                        success=True, latency_ms=50,
                        tokens_consumed=0)
                    tool_cb.after_call(cr)
                    result.resilience_events.append(
                        f"circuit_breaker:tool_passed:{tool_name}")
                except Exception as e:
                    result.resilience_events.append(
                        f"circuit_breaker:tool_failed:{tool_name}")
                    ctx = {"failed_tool": tool_name,
                           "partial_results": tool_results,
                           "messages": messages}
                    deg_result, deg_meta = (
                        self.degradation_engine.execute(ctx))
                    if deg_meta.level.value != "failed":
                        tool_results[tool_name] = {
                            "degraded": True, "reason": str(e)}
                        skipped_tools.append(tool_name)
                    result.degradation_metadata = deg_meta

        # LAYER 7: 响应出口 → Bulkhead 释放
        self.bulkhead.release_session_resource(
            session_id, "file_descriptor")
        result.resilience_events.append("bulkhead:exit_released")

        result.success = True
        result.result = {
            "content": llm_response.get("content"),
            "tool_results": tool_results,
            "skipped_tools": skipped_tools,
            "resilience": {"events": result.resilience_events},
        }
        result.total_latency_ms = (
            time.monotonic() - start_time) * 1000
        result.bulkhead_metrics = self.bulkhead.get_metrics()
        return result

    def get_metrics(self) -> dict:
        """聚合所有韧性层的指标——供可观测性系统消费。"""
        return {
            "circuit_breakers": {
                name: cb.get_metrics()
                for name, cb in self.circuit_breakers.items()
                if hasattr(cb, "get_metrics")
            },
            "rate_limiter": self.rate_limiter.get_metrics(),
            "bulkhead": self.bulkhead.get_metrics(),
            "degradation": self.degradation_engine.get_metrics(),
        }


# ---- 使用示例 ----
if __name__ == "__main__":
    # 在实际项目中,这些类从前面各节定义的模块导入:
    # from circuit_breaker import AgentCircuitBreaker
    # from rate_limiter import TokenAwareRateLimiter, Provider
    # from bulkhead import AgentBulkhead
    # from degradation import GracefulDegradationEngine

    # ---- 初始化四大模式 ----
    # 1. Circuit Breakers(Provider + Tool 级别)
    provider_cb = AgentCircuitBreaker("provider:deepseek")
    tool_cb_get_order = AgentCircuitBreaker("tool:get_order")
    tool_cb_lookup = AgentCircuitBreaker("tool:lookup_shipping")
    circuit_breakers = {
        "provider:deepseek": provider_cb,
        "tool:get_order": tool_cb_get_order,
        "tool:lookup_shipping": tool_cb_lookup,
    }

    # 2. Rate Limiter
    rl = TokenAwareRateLimiter()

    # 3. Bulkhead
    bulkhead = AgentBulkhead(default_token_limit=100_000)

    # 4. Degradation Engine
    degradation = GracefulDegradationEngine()
    degradation.register_model_fallback([
        "claude-opus", "claude-haiku", "deepseek-chat", "gpt-5.4-mini"])
    degradation.register_optional_tools(
        ["send_notification", "log_analytics"])
    degradation.register_cache({})
    degradation.register_async_queue([])
    degradation.register_graceful_failure(
        "当前服务暂时不可用。请稍后重试或联系 [email protected]。")

    # ---- 创建 Runner 并执行 ----
    runner = ResilientAgentRunner(
        circuit_breakers, rl, bulkhead, degradation)

    messages = [{"role": "user", "content": "查询订单 #38291 的物流状态"}]
    result = runner.run(
        user_id="user-42",
        session_id="sess-abc",
        provider=Provider.DEEPSEEK,
        messages=messages,
        tools_to_call=["get_order", "lookup_shipping"],
    )

    print("=" * 50)
    print(f"Success: {result.success}")
    print(f"Content: {result.result['content']}")
    print(f"Tool results: {result.result['tool_results']}")
    print(f"Skipped tools: {result.result['skipped_tools']}")
    print(f"Total latency: {result.total_latency_ms:.0f}ms")
    print(f"Tokens consumed: {result.tokens_consumed}")
    print(f"Resilience events: {result.resilience_events}")
    print("=" * 50)
    print("Runner metrics:", runner.get_metrics())

设计要点:

7. 多模型韧性:跨 OpenAI、Claude 和 DeepSeek 的统一保护

大多数生产 Agent 不依赖单一 LLM Provider——它们使用多个 Provider 来平衡成本、延迟和可用性。但每个 Provider 有不同的故障模式和限流模型。多模型韧性架构将这些差异抽象为统一的保护层。

Provider 抽象层

多模型韧性需要第一块基石:一个 Provider 抽象层,让 Agent 无需关心底层是 OpenAI、Anthropic 还是 DeepSeek。这与 模型无关的 Agent 设计 的核心原则一致——抽象层使得跨 Provider 回退成为可能。

每 Provider 独立熔断 + 独立限流

每个 Provider 有自己独立的熔断器和限流器实例:

当 OpenAI 熔断器 Open 时,只有 OpenAI 的调用被阻断——Anthropic 和 DeepSeek 的调用仍然正常通过。

跨 Provider 回退链

回退链是可配置的优先级列表(例如 ["openai", "anthropic", "deepseek"])。当主 Provider 不可用时,MultiProviderResilienceManager 自动将请求路由到下一个 Provider。路由决策考虑三个因素:

  1. 可用性:Provider 的熔断器是否 Closed?限流器是否有足够配额?
  2. 成本:在预算压力下(聚合使用率 > 80%),优先路由到更便宜的 Provider(DeepSeek 通常是成本最低的选择)。
  3. 延迟:如果 SLA 对延迟敏感,跳过响应慢的 Provider(DeepSeek 可能在高并发下延迟升高)。

DeepSeek:中文读者的核心关注

对于中文团队,DeepSeek 的独特地位使得多模型韧性中 DeepSeek 的策略需要特殊设计:

代码:MultiProviderResilienceManager

from __future__ import annotations

import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
import logging

logger = logging.getLogger("agent.multi_provider")


class ProviderTier(Enum):
    PREMIUM = "premium"     # OpenAI GPT-5.5, Claude Opus
    STANDARD = "standard"   # GPT-5.4, Claude Sonnet
    BUDGET = "budget"       # GPT-5.4-mini, Claude Haiku, DeepSeek


@dataclass
class ProviderInfo:
    """Provider 画像:包含可用性、成本和能力元数据。"""
    name: str
    tier: ProviderTier
    cost_per_1m_input: float = 0.0
    cost_per_1m_output: float = 0.0
    max_context: int = 128_000
    supports_streaming: bool = True
    circuit_breaker: Optional[Any] = None     # AgentCircuitBreaker
    rate_limiter: Optional[Any] = None        # TokenAwareRateLimiter


# Provider 成本画像(2026 Q2 近似值,实际以 Provider 定价页为准)
PROVIDER_REGISTRY: Dict[str, ProviderInfo] = {
    "openai-gpt55": ProviderInfo(
        name="openai-gpt55", tier=ProviderTier.PREMIUM,
        cost_per_1m_input=5.00, cost_per_1m_output=16.00,
        max_context=256_000,
    ),
    "openai-gpt54": ProviderInfo(
        name="openai-gpt54", tier=ProviderTier.STANDARD,
        cost_per_1m_input=2.50, cost_per_1m_output=10.00,
    ),
    "anthropic-opus": ProviderInfo(
        name="anthropic-opus", tier=ProviderTier.PREMIUM,
        cost_per_1m_input=15.00, cost_per_1m_output=75.00,
    ),
    "anthropic-haiku": ProviderInfo(
        name="anthropic-haiku", tier=ProviderTier.BUDGET,
        cost_per_1m_input=0.80, cost_per_1m_output=4.00,
    ),
    "deepseek-chat": ProviderInfo(
        name="deepseek-chat", tier=ProviderTier.BUDGET,
        cost_per_1m_input=0.14, cost_per_1m_output=0.28,   # 极低成本
        max_context=128_000,
    ),
}


class MultiProviderResilienceManager:
    """多模型韧性管理器。

    核心职责:
    1. 每 Provider 独立熔断器 + 限流器管理
    2. 跨 Provider 回退链(可用性 + 成本 + 延迟)
    3. 统一 Token 预算追踪
    4. 成本感知路由
    """

    def __init__(self, provider_registry=None):
        self.providers = provider_registry or PROVIDER_REGISTRY
        self._token_usage: Dict[str, int] = {}
        self._budget_pressure_threshold: float = 0.80  # 80% 触发成本路由

    def attach_circuit_breaker(self, provider_name: str, cb: Any) -> None:
        """为 Provider 绑定独立熔断器。"""
        if provider_name in self.providers:
            self.providers[provider_name].circuit_breaker = cb

    def attach_rate_limiter(self, provider_name: str, rl: Any) -> None:
        """为 Provider 绑定独立限流器。"""
        if provider_name in self.providers:
            self.providers[provider_name].rate_limiter = rl

    def is_provider_available(self, provider_name: str) -> bool:
        """检查 Provider 是否可用(熔断器 Closed + 限流器有配额)。"""
        info = self.providers.get(provider_name)
        if not info:
            return False
        if info.circuit_breaker:
            cb_state = info.circuit_breaker.state
            if hasattr(cb_state, "value") and cb_state.value != "closed":
                return False
        if info.rate_limiter:
            metrics = info.rate_limiter.get_metrics()
            provider_metrics = metrics.get(provider_name, {})
            if provider_metrics.get("usage_pct", 0) > 95:
                return False
        return True

    def select_provider(
        self, preferred: List[str], context=None
    ) -> Optional[ProviderInfo]:
        """按优先级选择可用 Provider。

        决策逻辑:
        1. 按 preferred 列表顺序
        2. 跳过不可用 Provider
        3. 如果全局预算压力 > 80%,BUDGET tier 优先
        """
        context = context or {}
        total_used = sum(self._token_usage.values())
        total_cap = 0
        for info in self.providers.values():
            if info.rate_limiter:
                total_cap = max(total_cap, info.rate_limiter._aggregate_cap)
        budget_pressure = (total_used / total_cap) if total_cap > 0 else 0.0

        if budget_pressure > self._budget_pressure_threshold:
            logger.info(
                "Budget pressure %.1f%% > %.0f%%, cost-aware routing",
                budget_pressure * 100, self._budget_pressure_threshold * 100)
            preferred = sorted(
                preferred,
                key=lambda p: (
                    self.providers[p].tier != ProviderTier.BUDGET,
                    self.providers[p].cost_per_1m_input,
                ),
            )

        for name in preferred:
            if self.is_provider_available(name):
                return self.providers[name]

        for name, info in self.providers.items():
            if info.tier == ProviderTier.BUDGET and self.is_provider_available(name):
                logger.warning("Falling back to budget provider %s", name)
                return info
        return None

    def record_usage(self, provider_name: str, tokens: int) -> None:
        self._token_usage[provider_name] = (
            self._token_usage.get(provider_name, 0) + tokens)

    def get_usage_summary(self) -> dict:
        total = sum(self._token_usage.values())
        return {
            "per_provider": dict(self._token_usage),
            "total_tokens": total,
        }

    def get_failover_chain(self, preferred: List[str]) -> List[str]:
        chain = []
        for name in preferred:
            if self.is_provider_available(name):
                chain.append(name)
        for name in self.providers:
            if name not in chain and self.is_provider_available(name):
                chain.append(name)
        return chain

    def get_metrics(self) -> dict:
        metrics = {"providers": {}, "aggregate": self.get_usage_summary()}
        for name, info in self.providers.items():
            cb_metrics = {}
            if info.circuit_breaker and hasattr(info.circuit_breaker, "get_metrics"):
                cb_metrics = info.circuit_breaker.get_metrics()
            rl_metrics = {}
            if info.rate_limiter and hasattr(info.rate_limiter, "get_metrics"):
                rl_metrics = info.rate_limiter.get_metrics()
            metrics["providers"][name] = {
                "tier": info.tier.value,
                "available": self.is_provider_available(name),
                "cost_per_1m": {"input": info.cost_per_1m_input,
                                "output": info.cost_per_1m_output},
                "circuit_breaker": cb_metrics,
                "rate_limiter": rl_metrics,
                "tokens_used": self._token_usage.get(name, 0),
            }
        return metrics


# ---- 使用示例 ----
if __name__ == "__main__":
    manager = MultiProviderResilienceManager()

    class FakeCB:
        def __init__(self, name):
            self.name = name
            self.state = type("S", (), {"value": "closed"})()
        def get_metrics(self):
            return {"state": self.state.value}

    class FakeRL:
        def __init__(self):
            self._aggregate_cap = 2_000_000
        def get_metrics(self):
            return {"aggregate": {"usage_pct": 35.0}}

    for name in PROVIDER_REGISTRY:
        manager.attach_circuit_breaker(name, FakeCB(name))
        manager.attach_rate_limiter(name, FakeRL())

    # 模拟 OpenAI 熔断
    manager.providers["openai-gpt55"].circuit_breaker.state.value = "open"

    preferred = ["openai-gpt55", "anthropic-haiku", "deepseek-chat"]
    selected = manager.select_provider(preferred)
    if selected:
        print(f"Selected: {selected.name} (tier={selected.tier.value})")
    else:
        print("No provider available!")

    chain = manager.get_failover_chain(preferred)
    print(f"Failover chain: {chain}")

    manager.record_usage("anthropic-haiku", 5000)
    manager.record_usage("deepseek-chat", 15000)
    print("Usage summary:", manager.get_usage_summary())

设计要点:

8. 常见问题 + 下一步阅读

常见问题

1. 熔断器和重试有什么区别,什么时候用哪个?

重试(Retry)和熔断器(Circuit Breaker)解决的是两类不同的问题。重试适用于瞬态故障——网络抖动、临时的连接超时、偶尔的 503——这些故障在短时间内可能自行恢复,再试一次就能成功。熔断器适用于持续故障——下游服务已经确认宕机、一个端点在 30 秒内连续失败了 5 次——此时重试只是在浪费时间和 Token,并且在给已经崩溃的服务施加额外负载。在实际部署中,两者是嵌套关系:重试在熔断器内部执行。当熔断器 Closed 时,函数内部的 Retry 逻辑正常工作;当熔断器 Open 时,before_call() 直接抛异常,Retry 还没开始就被打断了——这正是我们想要的行为。

2. 令牌感知限流和传统请求限流的核心区别是什么?

核心区别在消耗的单位。传统限流器消耗"请求数"——每分钟 N 个请求,每个请求消耗 1。这假设每个请求的成本大致相当——在 LLM 的世界里这个假设是错的。一个 50 Token 的分类请求和一个 80,000 Token 的文档分析请求在传统限流器看来完全一样——都消耗 1 个请求槽位。但 Provider 的计费和限流是基于 Token 的(TPM),不是基于请求数的。令牌感知限流改动了这个消耗单位:每次调用消耗的不再是 1,而是此次调用的预估 Token 数。另外,令牌感知限流还需要在调用后从响应头校准——因为客户端不可能精确预估 Token(尤其是输出 Token),所以必须依赖 Provider 在响应中提供的实际计数来保持客户端桶的准确性。

3. Bulkhead 隔离和 Rate Limiting 看起来很像,怎么区分?

二者在"计数和限制"的维度上确实相似,但本质不同:Rate Limiting 是流控——它基于速率和时间窗口决定"这个请求是否应该通过"。被拒绝的请求通常可以稍后重试。Bulkhead 是资源隔离——它基于物理资源(线程、内存、文件句柄)分配独立池,确保一个池的耗尽不影响其他池。Bulkhead 的用户级 Token 预算看起来像限流(限制每分钟 Token 数),但它的动机不是"这个用户调用太快"——而是"如果这个用户耗尽全局 Token 配额,其他用户会受影响吗?"Bulkhead 按隔离域分区(用户、工具、会话),Rate Limiter 按速率Provider限制。实际系统中的位置:Rate Limiter 在调用入口("有配额吗?"),Bulkhead 在资源分配层("你有自己的专用池")。

4. 优雅降级会不会让用户感知到质量下降?

会的,但感知到质量下降好过感知到服务不可用。这是降级设计的核心哲学——"比没有要好"(better than nothing)。关键是如何管理用户感知:降级元数据中的 quality_estimate 字段让前端可以根据降级级别调整 UI——例如 partial 结果可以加一个黄色信息条"部分信息暂时不可用",fallback 结果可以显示"使用了备用模型,结果可能略有差异"。更重要的是,降级结果的质量通常超过用户的预期——一个从 DeepSeek 返回的缓存答案(quality 0.7)可能已经足够满足"我的订单到哪了"这种查询,用户甚至不会注意到它不完整。真正会让用户不满的是服务完全无响应——降级防止的正是这种最坏情况。

5. 这些模式必须全部实现吗,还是可以逐步落地?

可以也应该逐步落地。推荐的落地顺序是:熔断器 → 令牌感知限流 → 优雅降级 → Bulkhead 隔离。为什么这个顺序?熔断器最直接解决问题——防止 Agent 在故障依赖上烧 Token(只改一个调用点,收益立即可见)。限流器是第二个优先级——防止 429 错误和配额耗尽(因为 429 是 Agent 最常见的生产故障之一)。降级引擎在熔断和限流就位后变得特别有价值——它们为降级提供了明确的触发信号。Bulkhead 最后加——它解决的是"资源共享导致级联故障"的问题,这通常在 Agent 部署到多用户环境后才成为显性痛感。每个模式都可以独立部署并产生独立收益——不需要全部就位才开始看到效果。

6. 多模型韧性架构会不会增加太多复杂度?

如果你只使用一个 LLM Provider 且流量不高,多模型韧性的确可能是过度设计。但有几个信号表明你需要多模型韧性:(1)你的 Agent 已经在生产环境中因为 Provider 宕机而中断过至少一次;(2)你的月度 LLM API 账单超过 $500,且单一 Provider 占 90% 以上;(3)你有成本敏感场景(如客服 Agent 日均 10,000+ 调用),不同 Provider 的价格差直接影响利润率。对于大多数中文团队,一个实用的起步方案是"OpenAI/Claude 为主 + DeepSeek 为回退"——这意味着只需要 2 个 Provider 的韧性管理,复杂度可控,但已经覆盖了绝大多数故障场景。复杂度是真实存在的——但与一次 Provider 中断导致的收入损失相比,多模型韧性的实现成本(估计 3-5 天的工程投入)通常是很划算的投资。