Agent 韧性模式:熔断、限流、Bulkhead 与优雅降级设计
30秒要点
- 核心问题:Agent 在生产环境中调用 LLM API 和工具时,会遇到 Provider 宕机、API 限流、工具故障和资源耗尽四类问题。简单的重试逻辑会放大这些故障——每次重试消耗 Token、施加负载、延长故障时间——而不是修复它们。
- 四层韧性栈:熔断器(Circuit Breaker)→ 停止调用已故障的依赖;令牌感知限流(Token-Aware Rate Limiter)→ 保护 API 配额不被耗尽;Bulkhead 隔离 → 防止单个任务拖垮全局;优雅降级(Graceful Degradation)→ 核心依赖不可用时保持系统可运行。
- 关键设计原则:韧性模式必须在 Agent 请求生命周期的固定位置注入——Bulkhead 在入口、Rate Limiter 在调用前、Circuit Breaker 包裹每次调用、Degradation 在事后兜底。这四者形成一条"请求进入 → 资源检查 → 调用保护 → 失败兜底"的完整链路。
- 读完能做什么:为你的 Agent 系统设计四层韧性架构——在 Python 中实现完整的
AgentCircuitBreaker、TokenAwareRateLimiter、AgentBulkhead和GracefulDegradationEngine,并通过ResilientAgentRunner将它们组合为生产级 Agent 运行时的统一保护层。
1. 为什么 Agent 需要超越重试的韧性设计
一个电商客服 Agent 在深夜 2:17 收到一条查询:"我的订单 #38291 什么时候到货?"Agent 的第一步是调用 get_order 工具获取订单详情。get_order 背后连接的是订单微服务的 /api/v1/orders/38291 端点——但这个端点因为数据库连接池耗尽,正在返回 HTTP 503。
Agent 的重试逻辑触发:第 1 次重试(2:17:01),第 2 次重试(2:17:03,退避 2 秒),第 3 次重试(2:17:07,退避 4 秒)。三次重试全部失败。Agent 的 LLM 在每次重试之间仍然在消耗 Token 来"思考"重试策略——"订单服务返回 503,我再试一次……""还是 503,可能只是临时波动……""第三次了,我换个 query 参数……"。到 2:17:10,这个本应在 800 Token 内完成的简单查询,已经烧掉了 4200 Token——而订单服务仍然不可用,用户仍然没有答案。
这并不是 Agent 代码写错了。这是重试逻辑在 Agent 场景下的系统性缺陷——每次重试都会消耗上游 LLM 的推理 Token,而 LLM 的"思考"本身是在消耗同一套已经紧张的资源。更致命的是,重试没有区分故障类型:瞬态网络抖动(值得重试)和下游服务宕机(重试只会加重故障)被同一种逻辑对待。
重试放大方程:1 个 Agent 请求 × N 次重试 × (LLM 思考 Token + 工具调用成本) = 原有成本 × N。如果 100 个并发 Agent 同时遇到同一个下游故障,它们会在 10 秒内对已经崩溃的服务施加 100 × N 次额外请求——这不是在解决问题,这是在制造 DoS。
四层韧性栈:重试之上的防护架构
重试是韧性体系的第一层——它解决"再试一次也许能成"的问题。但 Agent 的生产韧性要求在此基础上叠加四层防护:
- 熔断器(Circuit Breaker):当某个依赖(LLM Provider 或工具 API)连续失败达到阈值时,直接停止调用,快速失败。"不要再给已经崩溃的服务发请求了——先等它恢复。"熔断器是重试的外层守卫:内层可以重试,但当熔断器开启时,重试也被跳过。
- 令牌感知限流(Token-Aware Rate Limiter):LLM API 的配额模型是 TPM(每分钟 Token 数)而非 RPM(每分钟请求数)。传统限流器按请求计数,在 Agent 场景下完全失效——一个请求可能消耗 100 Token(简单分类),也可能消耗 120,000 Token(长文档分析)。令牌感知限流器让 Agent 在调用前就能预判"我还有没有足够的 Token 预算完成这次调用"。
- Bulkhead 隔离:船的水密隔舱。一个 Agent 用户不应该因为另一个用户的大量请求而耗尽全局 Token 配额或线程池。Bulkhead 在三个维度上隔离资源:用户级 Token 预算、工具级线程池、会话级运行时沙箱。
- 优雅降级(Graceful Degradation):当以上三层防护全部穿透——Provider 确实宕机了,工具真的不可用了——Agent 不应该崩溃。降级引擎给出一套优先级决策树:切换到备用模型 → 跳过非关键工具 → 返回缓存结果 → 入队异步处理 → 返回部分结果并标注质量。
这四层与重试的关系是分层的:重试在最内层(针对瞬态故障),熔断器包裹重试(停止对持续故障的重试),限流器在调用入口(确保有配额),降级在兜底(所有路径都失败后做什么)。关于重试本身的深度设计(指数退避、抖动、重试预算),参见 Agent 错误恢复——本文聚焦于重试之上的架构级韧性。
Agent 特有的故障模式:为什么微服务韧性模式不能原样照搬
如果你熟悉 Resilience4j、Polly 或 Hystrix,你可能会想:"这些模式我都在微服务里用过了,Agent 有什么不同?"答案是三个根本差异:
- 故障类型不同:微服务只区分 HTTP 5xx / 4xx / timeout。Agent 还需要检测语义失败——LLM 返回了格式正确的 JSON,但里面的工具名是幻觉(hallucination)、参数类型不匹配、调用了一个不存在的 API。这种失败在 HTTP 层返回 200 OK,在传统熔断器看来是"成功"的。
- 成本模型不同:微服务的重试成本是 CPU 和网络带宽,Agent 的重试成本是 LLM Token——美元计费。一次冗余重试在微服务中成本趋近于零,在 Agent 中可能烧掉 $0.02–$0.50(取决于模型和上下文长度)。Token 感知不是优化——是生存需求。
- 多 Provider 依赖:微服务通常调用自己的服务。Agent 同时依赖多个 LLM Provider(OpenAI、Anthropic、DeepSeek)和多个工具 API。每个 Provider 有独立的故障模式、限流模型和退避策略。韧性必须跨 Provider 统一管理。
Wow Moment:如果你的 Agent 只用了重试而没有熔断器,当一个下游服务宕机时,每一个调用该服务的 Agent 请求都会独立执行 N 次重试。100 个并发 Agent = 100 × N 次额外调用。加上被消耗的 LLM Token 在每次重试中的"思考成本",你的月度 API 费用会在一次 30 分钟的服务中断中烧掉 2-3 天的预算。熔断器不需要很复杂——它只需要在检测到连续失败后回答一个问题:"这个依赖现在不可用,我是否应该停止尝试?"就够了。
2. 熔断器模式:为 Agent 工具调用加上断路器
熔断器(Circuit Breaker)的核心思路来自电气工程:当电路中的电流超过安全阈值时,断路器跳闸,切断电路,防止火灾。在软件中,它保护系统不被故障依赖的持续调用拖垮。
三态状态机
熔断器在三个状态之间转换:
- Closed(闭合/正常):所有请求正常通过。每次失败都会递增失败计数器。当失败次数在时间窗口内超过阈值时,状态切换到 Open。
- Open(断开/熔断):所有请求被立即拒绝,抛出
CircuitBreakerOpenError,不执行实际调用。Open 状态持续一个冷却期(cooldown),之后切换到 Half-Open。 - Half-Open(半开/探测):允许有限数量的探测请求通过(通常 1–3 个)。如果探测请求成功,熔断器复位到 Closed。如果探测请求失败,熔断器重新回到 Open,冷却期重置。
Agent 特有的失败检测:比 HTTP 状态码更深的信号
传统熔断器把"失败"定义为 HTTP 5xx 或超时。Agent 的熔断器需要感知三类失败:
- HTTP 错误:5xx(服务器故障)、429(限流)、连接超时。这是第一层,与传统一致。
- 语义失败:LLM 返回了 HTTP 200,但响应体中的工具调用是幻觉——例如
"tool_name": "get_odrer"(拼写错误)、"arguments": {"id": "not_an_integer"}(类型错误)、或引用了一个不存在的函数名。语义失败对 Agent 的破坏力不亚于 HTTP 500。 - 超时模式:不只是总调用时长,还包括 Token 产出速率。如果 LLM 在 30 秒内只产出了 15 个 Token(正常应为 500+),说明 Provider 可能处于降级状态——熔断器应将此视为失败信号。
每工具 vs 每 Provider 熔断
Agent 有两种粒度的熔断策略:
- 每工具熔断器:为
get_order、search_docs、send_email等每个工具独立维护熔断器。当一个工具故障时,其他工具不受影响——Agent 可以继续使用健康工具完成任务(配合降级引擎跳过故障工具)。 - 每 Provider 熔断器:为
openai、anthropic、deepseek每个 LLM Provider 独立维护熔断器。当一个 Provider 全局宕机时,流量切换到备用 Provider。
两者并存:Provider 熔断器保护 LLM 调用路径,工具熔断器保护工具调用路径。一个典型的 Agent 请求会经过至少两个熔断器。
令牌成本感知熔断
一个容易被忽视的信号:当 LLM 单次调用的 Token 消耗远超正常范围时,应该触发熔断。例如,正常情况下一句"查询订单状态"的 LLM 调用消耗 500–800 Token。如果某次调用消耗了 8,000 Token(因为 LLM 陷入了重复思考或尝试了多种策略),这本身就是故障信号——继续重试只会放大成本。
熔断器记录每次调用的 Token 消耗,当单次 Token 超阈值(如 > 5,000 Token 且结果仍然失败)或滚动窗口 Token 浪费(过去 60 秒内因失败浪费的 Token > 预算的 20%)时,提前触发熔断。
代码:AgentCircuitBreaker 类
from __future__ import annotations
import time
import threading
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Optional, Any, Dict
import functools
import logging
logger = logging.getLogger("agent.circuit_breaker")
class CircuitState(Enum):
CLOSED = "closed" # 正常运行
OPEN = "open" # 熔断中,快速失败
HALF_OPEN = "half_open" # 探测中
@dataclass
class CircuitBreakerConfig:
"""熔断器配置——所有阈值均可按 Agent 场景调整。"""
failure_threshold: int = 5 # 连续失败触发熔断的次数
success_threshold: int = 2 # Half-Open 下需连续成功来闭合的次数
cooldown_seconds: float = 30.0 # Open 状态冷却时间
window_seconds: float = 60.0 # 失败计数滑动窗口
# Agent 特有配置
token_cost_threshold: int = 5000 # 单次调用 Token 超阈值 → 计为失败
max_half_open_probes: int = 2 # Half-Open 状态下允许的并发探测数
@dataclass
class CallResult:
"""封装每次调用的结果,包含 Agent 特有的失败信号。"""
success: bool
latency_ms: float
tokens_consumed: int = 0
semantic_failure: Optional[str] = None # e.g. "hallucinated tool name"
http_status: Optional[int] = None
error_message: str = ""
class CircuitBreakerOpenError(Exception):
"""熔断器断开时抛出,承载状态信息供调用方决策(如触发降级)。"""
def __init__(self, breaker_name: str, state: CircuitState, failures: int):
self.breaker_name = breaker_name
self.state = state
self.failures = failures
super().__init__(
f"Circuit breaker '{breaker_name}' is {state.value} "
f"(failures: {failures})"
)
class AgentCircuitBreaker:
"""Agent 场景的熔断器实现。
支持:
- 三态状态机 + 滑动窗口
- HTTP 失败 + 语义失败 + Token 异常三种失败检测
- 线程安全(可在多工具并发场景下使用)
- Half-Open 探测并发控制
"""
def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
self.name = name
self.config = config or CircuitBreakerConfig()
self.state = CircuitState.CLOSED
self._failures: list[float] = [] # 失败时间戳列表(滑动窗口)
self._successes_in_half_open: int = 0
self._last_failure_time: float = 0.0
self._open_since: float = 0.0
self._half_open_probe_count: int = 0 # 当前探测中的请求数
self._lock = threading.RLock()
self.last_result: Optional[CallResult] = None
# ---- 状态机核心 ----
def before_call(self) -> None:
"""调用前检查:如果熔断器 Open 且在冷却期,直接拒绝。"""
with self._lock:
if self.state == CircuitState.CLOSED:
return
if self.state == CircuitState.OPEN:
elapsed = time.monotonic() - self._open_since
if elapsed >= self.config.cooldown_seconds:
self.state = CircuitState.HALF_OPEN
self._successes_in_half_open = 0
self._half_open_probe_count = 0
logger.info(
"CB [%s] transitioning OPEN -> HALF_OPEN", self.name
)
else:
raise CircuitBreakerOpenError(
self.name, self.state, len(self._failures)
)
if self.state == CircuitState.HALF_OPEN:
if self._half_open_probe_count >= self.config.max_half_open_probes:
raise CircuitBreakerOpenError(
self.name, self.state, len(self._failures)
)
self._half_open_probe_count += 1
def after_call(self, result: CallResult) -> None:
"""调用后更新状态:成功或失败推进状态机。"""
with self._lock:
self.last_result = result
now = time.monotonic()
cutoff = now - self.config.window_seconds
self._failures = [t for t in self._failures if t > cutoff]
if self.state == CircuitState.HALF_OPEN:
self._half_open_probe_count = max(
0, self._half_open_probe_count - 1
)
is_failure = (
not result.success
or result.semantic_failure is not None
or result.tokens_consumed > self.config.token_cost_threshold
)
if is_failure:
self._failures.append(now)
self._last_failure_time = now
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
self._open_since = now
self._successes_in_half_open = 0
logger.warning(
"CB [%s] HALF_OPEN probe FAILED -> OPEN", self.name
)
elif self.state == CircuitState.CLOSED:
if len(self._failures) >= self.config.failure_threshold:
self.state = CircuitState.OPEN
self._open_since = now
logger.error(
"CB [%s] CLOSED -> OPEN! %d failures",
self.name, len(self._failures),
)
else:
if self.state == CircuitState.HALF_OPEN:
self._successes_in_half_open += 1
if self._successes_in_half_open >= self.config.success_threshold:
self.state = CircuitState.CLOSED
self._failures.clear()
logger.info(
"CB [%s] HALF_OPEN -> CLOSED (%d successes)",
self.name, self._successes_in_half_open,
)
elif self.state == CircuitState.CLOSED:
self._failures.clear()
@staticmethod
def is_semantic_failure(response_data: dict) -> Optional[str]:
"""检测 LLM 工具调用响应中的语义失败。"""
tool_calls = response_data.get("tool_calls", [])
if not tool_calls:
return None
for tc in tool_calls:
func_name = tc.get("function", {}).get("name", "")
if not func_name or len(func_name) < 2:
return f"Malformed tool name: '{func_name}'"
return None
def get_metrics(self) -> dict:
with self._lock:
return {
"name": self.name,
"state": self.state.value,
"failures_in_window": len(self._failures),
"open_since": self._open_since,
"half_open_probes": self._half_open_probe_count,
}
# ---- 装饰器 API ----
class circuit_breaker:
"""装饰器:为 Agent 工具函数自动包装熔断器。
用法:
breaker = AgentCircuitBreaker("get_order")
@circuit_breaker(breaker)
def get_order(order_id: str) -> dict:
...
"""
def __init__(self, breaker: AgentCircuitBreaker):
self.breaker = breaker
def __call__(self, func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
self.breaker.before_call()
start = time.monotonic()
try:
result = func(*args, **kwargs)
elapsed = time.monotonic() - start
self.breaker.after_call(CallResult(
success=True, latency_ms=elapsed * 1000))
return result
except Exception as e:
elapsed = time.monotonic() - start
self.breaker.after_call(CallResult(
success=False, latency_ms=elapsed * 1000,
error_message=str(e)))
raise
return wrapper
# ---- 使用示例 ----
if __name__ == "__main__":
breaker = AgentCircuitBreaker("search_docs")
# 模拟连续失败 -> 熔断触发
for i in range(6):
try:
breaker.before_call()
raise ConnectionError("Backend unreachable")
except CircuitBreakerOpenError:
print(f"[Call {i}] Circuit OPEN -- fast failing")
break
except ConnectionError:
breaker.after_call(CallResult(
success=False, latency_ms=50,
error_message="Backend unreachable"))
print(f"[Call {i}] Failure recorded")
print("Metrics:", breaker.get_metrics())
设计要点:
- 线程安全:
threading.RLock确保在多工具并发调用时状态机一致性。Half-Open 状态下的并发探测数由max_half_open_probes限制——防止"雪崩探测"(大量请求同时探测,其中一个失败可能导致其他探测也被误判)。 - 为什么不用 pybreaker:pybreaker 提供了基础的 HTTP 熔断,但它不感知语义失败、Token 消耗、也不支持 Half-Open 探测并发控制。在 Agent 场景下,你需要的是能理解 LLM 响应内容(不只是 HTTP 状态码)的熔断器。pybreaker 可以作为底层状态机的参考实现,但 Agent 熔断器的逻辑必须自定义。
- 与 Retry 的组合:Retry 逻辑在熔断器内部。熔断器 Closed 时,Retry 正常执行;熔断器 Open 时,
before_call()直接抛异常,Retry 不会触发。这避免了"熔断器断了但 Retry 还在重试"的矛盾。具体组合方式见第 6 节。
MCP(Model Context Protocol)中的工具服务端也可以受益于相同的熔断模式——MCP 服务端的健康状态(连接、超时、响应格式)可以直接映射到熔断器的失败信号。参见 MCP 协议生产指南 了解 MCP 服务端的韧性集成。
3. 令牌感知限流:保护 LLM API 配额不被耗尽
为什么传统请求限流失效
大多数 API 限流器是基于请求数的:每分钟最多 N 个请求。这在 REST API 的世界里工作良好——每个请求的成本大致相当。但 LLM API 的计费模型完全不是这样:
| 调用场景 | 输入 Token | 输出 Token | 总 Token | 相当于多少个"标准请求"? |
|---|---|---|---|---|
| 简单分类("这是垃圾邮件吗?") | 50 | 5 | 55 | 1× |
| 客服回复(带上下文) | 800 | 200 | 1,000 | 18× |
| 代码审查(完整 PR diff) | 12,000 | 3,000 | 15,000 | 273× |
| 长文档摘要(50 页 PDF) | 80,000 | 5,000 | 85,000 | 1,545× |
如果你按 RPM(请求数/分钟)限流,设 100 RPM,那么 100 次简单分类(5,500 Token)和 100 次长文档摘要(8,500,000 Token)被同等对待——但后者会瞬间耗尽你的 TPM 配额。传统限流器对这种差异完全失明。
Token Bucket 改造:从请求桶到 Token 桶
经典的 Token Bucket 算法维护一个"桶",桶中有 N 个 Token,每次请求消耗 1 个 Token,Token 以固定速率补充。Agent 的改造很简单但关键:桶容量 = Provider 的 TPM 上限,每次请求消耗的不再是 1,而是此次调用的预估 Token 数。
这意味着:
- 调用前:预估本次调用将消耗的 Token(基于系统提示 + 上下文 + 预估值)
- 调用中:如果桶中 Token 不足,请求被拒绝或入队等待
- 调用后:从 Provider 响应头(
x-ratelimit-remaining-tokens或anthropic-ratelimit-tokens-remaining)读取实际消耗,校准桶的计数
Provider 限流画像——尤其是 DeepSeek
每个 Provider 的限流模型不同,限流器必须感知这些差异:
- OpenAI:RPM(请求数/分钟)+ TPM(Token/分钟)+ RPD(请求数/天)。响应头提供
x-ratelimit-remaining-requests和x-ratelimit-remaining-tokens。 - Anthropic:RPM + ITPM(输入 Token/分钟)+ OTPM(输出 Token/分钟)。缓存命中的 Token 不计入 ITPM。响应头极为丰富:
anthropic-ratelimit-requests-remaining、anthropic-ratelimit-tokens-remaining。 - DeepSeek:与 OpenAI 和 Anthropic 不同,DeepSeek 的核心限制是并发连接数(而非 TPM/RPM 的绝对值)。同时,DeepSeek 的 Token 吞吐受限于每个并发连接的速率。这意味着 DeepSeek 的限流器需要跟踪活跃请求数和每个请求的预计 Token 消耗——一个纯粹的 TPM 桶对 DeepSeek 不够准确。此外,DeepSeek 的价格是 OpenAI 的 1/10–1/30,中文读者大量依赖 DeepSeek 来保持成本可控,但这也意味着单个 DeepSeek API Key 被更多 Agent 共享——并发冲突的概率更高,限流器必须在客户端做好协调。
响应头动态调整
客户端限流器的状态永远不可能是完全准确的——Provider 端可能有其他客户端在同时消耗配额。因此,限流器必须在每次调用后读取 Provider 的响应头来校准:
- 剩余 Token:如果
x-ratelimit-remaining-tokens显示仅剩 1,000 Token,而客户端桶显示还有 5,000——以 Provider 响应头为准,立即下修客户端桶。 - Retry-After:如果返回 429 且携带
Retry-After: 15,限流器应冻结桶的 Token 补充 15 秒,而非继续按固定速率补充。 - 重置时间:
x-ratelimit-reset-requests或anthropic-ratelimit-reset-tokens指示配额重置的时间窗口。限流器应在接近重置点时释放被保守扣押的 Token。
客户端 Token 预估:在不知道确数之前就做判断
限流器的核心难题:你需要在调用之前就知道要消耗多少 Token,但你只有调用之后才能知道确数。解决方案是预估 + 校准:
- 预估:基于历史数据:同类调用的平均 Token 消耗。系统提示 + 用户消息的字符数除以每 Token 字符数(中文约 1.5 字符/Token,英文约 4 字符/Token)。加上
max_tokens参数作为输出侧的上限。 - 校准:调用后从响应头获取实际消耗,更新预估模型。如果连续 10 次调用的预估值偏离实际超过 30%,调整估算因子。
代码:TokenAwareRateLimiter 类(支持 asyncio)
from __future__ import annotations
import time
import asyncio
import threading
from dataclasses import dataclass, field
from typing import Optional, Dict
from enum import Enum
import logging
logger = logging.getLogger("agent.rate_limiter")
class Provider(Enum):
OPENAI = "openai"
ANTHROPIC = "anthropic"
DEEPSEEK = "deepseek"
@dataclass
class ProviderRateProfile:
"""Provider 限流画像——基于各 Provider 的文档和实测。"""
provider: Provider
tpm_limit: int # Token 数/分钟上限
rpm_limit: int = 500
max_concurrency: int = 50 # 最大并发连接数(DeepSeek 关键)
chars_per_token: float = 3.5 # 中英混合默认值
PROVIDER_PROFILES: Dict[Provider, ProviderRateProfile] = {
Provider.OPENAI: ProviderRateProfile(
provider=Provider.OPENAI, tpm_limit=1_000_000, rpm_limit=500,
chars_per_token=4.0,
),
Provider.ANTHROPIC: ProviderRateProfile(
provider=Provider.ANTHROPIC, tpm_limit=400_000, rpm_limit=400,
chars_per_token=4.0,
),
Provider.DEEPSEEK: ProviderRateProfile(
provider=Provider.DEEPSEEK, tpm_limit=500_000, rpm_limit=300,
max_concurrency=10, # DeepSeek 的核心限制!
chars_per_token=1.8, # 中文为主
),
}
@dataclass
class TokenBudget:
"""Token 桶:容量 = TPM 上限,以 Token/秒 速率补充。"""
capacity: float
tokens: float
refill_rate_per_sec: float
last_refill: float = field(default_factory=time.monotonic)
def refill(self) -> None:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(self.capacity,
self.tokens + elapsed * self.refill_rate_per_sec)
self.last_refill = now
def consume(self, amount: float) -> bool:
self.refill()
if self.tokens >= amount:
self.tokens -= amount
return True
return False
class TokenAwareRateLimiter:
"""令牌感知限流器。
特性:Token Bucket 改造、多 Provider 独立桶、响应头动态校准、
asyncio 异步支持、DeepSeek 并发感知、队列式背压。
"""
def __init__(self, profiles=None):
self.profiles = profiles or PROVIDER_PROFILES
self._budgets: Dict[Provider, TokenBudget] = {}
self._semaphores: Dict[Provider, asyncio.Semaphore] = {}
self._lock = threading.RLock()
self._active_requests: Dict[Provider, int] = {}
for provider, profile in self.profiles.items():
self._budgets[provider] = TokenBudget(
capacity=float(profile.tpm_limit),
tokens=float(profile.tpm_limit),
refill_rate_per_sec=profile.tpm_limit / 60.0,
)
self._semaphores[provider] = asyncio.Semaphore(
profile.max_concurrency)
self._active_requests[provider] = 0
self._aggregate_tokens_consumed: float = 0.0
self._aggregate_cap: float = sum(
p.tpm_limit for p in self.profiles.values())
def estimate_tokens(self, provider: Provider, messages: list) -> int:
"""预估 Token 消耗——调用前使用。"""
profile = self.profiles.get(provider)
if not profile:
return 1000
total_chars = 0
for msg in messages:
content = msg.get("content", "")
if isinstance(content, str):
total_chars += len(content)
elif isinstance(content, list):
for part in content:
if isinstance(part, dict) and "text" in part:
total_chars += len(part["text"])
return max(1, int(total_chars / profile.chars_per_token) + 500)
def acquire(self, provider: Provider, estimated_tokens: int,
timeout_ms: float = 5000) -> bool:
"""同步获取 Token 配额。"""
budget = self._budgets.get(provider)
if not budget:
return False
deadline = time.monotonic() + timeout_ms / 1000
while time.monotonic() < deadline:
with self._lock:
if budget.consume(float(estimated_tokens)):
self._active_requests[provider] += 1
return True
time.sleep(0.05)
return False
def release(self, provider: Provider, actual_tokens: int,
estimated_tokens: int, response_headers=None):
"""调用后释放/校准配额。"""
budget = self._budgets.get(provider)
if not budget:
return
with self._lock:
diff = actual_tokens - estimated_tokens
if diff > 0:
budget.tokens = max(0, budget.tokens - diff)
elif diff < 0:
budget.tokens = min(budget.capacity, budget.tokens - diff)
self._active_requests[provider] = max(
0, self._active_requests[provider] - 1)
if response_headers:
remaining_hdr = (
response_headers.get("x-ratelimit-remaining-tokens")
or response_headers.get(
"anthropic-ratelimit-tokens-remaining")
)
if remaining_hdr is not None:
budget.tokens = min(budget.tokens, float(remaining_hdr))
# Retry-After 头 → 冻结 Token 补充(429 限流保护)
retry_after = response_headers.get("retry-after")
if retry_after is not None:
freeze_sec = float(retry_after)
budget.last_refill = time.monotonic() + freeze_sec
logger.info(
"Provider %s: freezing token refill for %.1fs (Retry-After)",
provider.value, freeze_sec,
)
self._aggregate_tokens_consumed += actual_tokens
async def aacquire(self, provider: Provider, estimated_tokens: int,
timeout_ms: float = 5000) -> bool:
"""异步获取 Token 配额 + 信号量(对 DeepSeek 关键)。"""
sem = self._semaphores.get(provider)
if not sem:
return False
try:
await asyncio.wait_for(sem.acquire(),
timeout=timeout_ms / 1000)
except asyncio.TimeoutError:
return False
budget = self._budgets.get(provider)
if not budget:
sem.release()
return False
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout_ms / 1000
while loop.time() < deadline:
with self._lock:
if budget.consume(float(estimated_tokens)):
self._active_requests[provider] += 1
return True
await asyncio.sleep(0.05)
sem.release()
return False
async def arelease(self, provider: Provider, actual_tokens: int,
estimated_tokens: int, response_headers=None):
"""异步释放配额。"""
self.release(provider, actual_tokens, estimated_tokens,
response_headers)
sem = self._semaphores.get(provider)
if sem:
sem.release()
def is_near_limit(self, provider: Provider,
threshold_pct: float = 0.85) -> bool:
"""是否接近限额(用于触发背压)。"""
budget = self._budgets.get(provider)
if not budget:
return False
budget.refill()
ratio = (budget.tokens / budget.capacity
if budget.capacity > 0 else 1.0)
return ratio < (1.0 - threshold_pct)
@property
def aggregate_consumption_pct(self) -> float:
return (self._aggregate_tokens_consumed / self._aggregate_cap
if self._aggregate_cap > 0 else 0.0)
def get_metrics(self) -> dict:
metrics = {}
for provider, budget in self._budgets.items():
budget.refill()
metrics[provider.value] = {
"tokens_remaining": budget.tokens,
"capacity": budget.capacity,
"usage_pct": round(
(1 - budget.tokens / budget.capacity) * 100, 1
) if budget.capacity > 0 else 0,
"active_requests": self._active_requests.get(provider, 0),
}
metrics["aggregate"] = {
"tokens_consumed": self._aggregate_tokens_consumed,
"total_cap": self._aggregate_cap,
"usage_pct": round(self.aggregate_consumption_pct * 100, 1),
}
return metrics
# ---- 使用示例 ----
if __name__ == "__main__":
deepseek_profile = ProviderRateProfile(
provider=Provider.DEEPSEEK,
tpm_limit=500_000, rpm_limit=300,
max_concurrency=10, chars_per_token=1.8,
)
limiter = TokenAwareRateLimiter({
Provider.DEEPSEEK: deepseek_profile,
Provider.OPENAI: PROVIDER_PROFILES[Provider.OPENAI],
})
messages = [{"role": "user",
"content": "帮我总结这份 10 页的合同。"}]
estimated = limiter.estimate_tokens(Provider.DEEPSEEK, messages)
print(f"Estimated tokens: {estimated}")
if limiter.acquire(Provider.DEEPSEEK, estimated, timeout_ms=5000):
print("Token acquired -- proceed with API call")
actual_tokens = 1200
headers = {"x-ratelimit-remaining-tokens": "498000"}
limiter.release(Provider.DEEPSEEK, actual_tokens, estimated,
headers)
else:
print("Rate limited -- must wait or use fallback")
print("Metrics:", limiter.get_metrics())
设计要点:
- DeepSeek 并发感知:限流器不只看 TPM,还通过
asyncio.Semaphore控制并发连接数。这对 DeepSeek 用户尤为重要——DeepSeek 的并发限制可能比 TPM 限制更早被触发。10 个并发连接每个消耗 10,000 Token 的调用 = 100,000 Token,远低于 500,000 TPM,但已经达到并发上限。 - 令牌感知限流与成本可观察性的关系:限流器实时追踪 Token 消耗,而成本可观察性将这些 Token 消耗转化为美元金额。限流器是事前保护(阻止超出预算),成本可观察性是事后核算(归因到团队/项目/用户)。参见 Agent 成本可观察性。
- 为什么不用 backoff 库的限流:backoff 和 Tenacity 提供了退避重试(当遇到 429 时等待),但它们不会在客户端预测并预防 429。令牌感知限流的目标是不让 429 发生——在请求发出前就判断是否有配额。
4. Bulkhead 隔离模式:防止 Agent 故障扩散
熔断器保护你免受故障依赖的伤害。限流器保护你免于耗尽配额。但还有一种失败模式:系统内部的一个组件消耗了所有共享资源,导致其他组件无法运行。这就是 Bulkhead(隔舱)模式要解决的问题。
类比:船体被分隔成多个水密隔舱。一个隔舱进水,船仍然能浮。软件的 Bulkhead 同理:将系统资源(线程、连接、内存、Token 配额)划分到隔离的池中,一个池的耗尽不影响其他池。
Agent 的三维隔离
Agent 系统需要三层 Bulkhead 隔离:
- 用户级 Token 预算(User-Level Budget):每个用户/租户有独立的 Token 消费上限(如每分钟 50,000 Token)。当一个用户触发大量 Agent 任务时,其他用户的配额不受影响。硬限制为拒绝调用,软警告为 80% 阈值。
- 工具级线程池(Per-Tool ThreadPool):每类工具(
search、database_query、send_email)分配独立的线程池。一个慢速的数据库查询不会阻塞所有邮件发送——两个工具在线程层面隔离。 - 会话级沙箱(Per-Session Sandbox):每个 Agent 会话拥有独立的临时目录、内存上限和文件描述符配额。一个会话创建了 10,000 个临时文件不会耗尽其他会话的文件系统资源。
Bulkhead vs Rate Limiting:关键区别
很多人混淆这两个模式。区别是清晰的:
- Rate Limiter 是流控:它决定"这个请求是否应该通过?"——基于速率、Token 预算、时间窗口。拒绝的请求通常可以稍后重试。
- Bulkhead 是资源隔离:它决定"这部分系统能用多少资源?"——基于线程数、内存、文件句柄的物理分配。一个 Bulkhead 池满了,不代表其他池也满了——隔离确保了独立。资源被分区,而非请求被限速。
二者可以协作:Rate Limiter 在 Bulkhead 入口处确保请求符合速率限制,Bulkhead 在分配的资源池内执行请求。
代码:AgentBulkhead 类
from __future__ import annotations
import time
import threading
from dataclasses import dataclass, field
from typing import Dict, Optional
from concurrent.futures import ThreadPoolExecutor
import logging
logger = logging.getLogger("agent.bulkhead")
class BulkheadCapacityExceeded(Exception):
"""Bulkhead 资源耗尽异常。"""
def __init__(self, bulkhead_name: str, limit_type: str,
current: float, limit: float):
self.bulkhead_name = bulkhead_name
self.limit_type = limit_type
self.current = current
self.limit = limit
super().__init__(
f"Bulkhead '{bulkhead_name}' {limit_type} exceeded: "
f"{current}/{limit}"
)
@dataclass
class UserTokenBudget:
"""用户级 Token 预算控制器。"""
user_id: str
token_limit: int # 硬限制
warning_threshold_pct: float = 0.80 # 80% 软警告
window_seconds: float = 60.0
tokens_used: float = 0.0
last_reset: float = field(default_factory=time.monotonic)
def _maybe_reset(self) -> None:
now = time.monotonic()
if now - self.last_reset >= self.window_seconds:
self.tokens_used = 0.0
self.last_reset = now
def try_consume(self, tokens: int) -> bool:
self._maybe_reset()
if self.tokens_used + tokens > self.token_limit:
return False
self.tokens_used += tokens
return True
@property
def usage_pct(self) -> float:
self._maybe_reset()
return (self.tokens_used / self.token_limit
if self.token_limit > 0 else 0.0)
@property
def is_near_limit(self) -> bool:
return self.usage_pct >= self.warning_threshold_pct
class AgentBulkhead:
"""Agent Bulkhead 隔离器。
三维隔离:
1. 用户级 Token 预算 —— 防止单一用户耗尽全局配额
2. 工具级线程池 —— 每类工具独立池,防止慢工具阻塞快工具
3. 会话级资源配额 —— 文件描述符/内存上限
"""
def __init__(
self,
default_token_limit: int = 100_000,
max_file_descriptors_per_session: int = 100,
):
self.default_token_limit = default_token_limit
self.max_fds_per_session = max_file_descriptors_per_session
self._user_budgets: Dict[str, UserTokenBudget] = {}
self._budget_lock = threading.RLock()
self._tool_pools: Dict[str, ThreadPoolExecutor] = {}
self._pool_lock = threading.RLock()
self._session_resources: Dict[str, dict] = {}
self._session_lock = threading.RLock()
# ---- 1. 用户级 Token 预算 ----
def check_user_budget(self, user_id: str,
estimated_tokens: int) -> bool:
"""检查用户 Token 预算。返回 False = 拒绝。"""
with self._budget_lock:
if user_id not in self._user_budgets:
self._user_budgets[user_id] = UserTokenBudget(
user_id=user_id,
token_limit=self.default_token_limit,
)
budget = self._user_budgets[user_id]
if budget.is_near_limit:
logger.warning(
"User %s approaching token limit: %.1f%%",
user_id, budget.usage_pct * 100,
)
if not budget.try_consume(estimated_tokens):
raise BulkheadCapacityExceeded(
f"user_budget:{user_id}", "token_budget",
budget.tokens_used, budget.token_limit,
)
return True
def get_user_usage(self, user_id: str) -> dict:
with self._budget_lock:
budget = self._user_budgets.get(user_id)
if not budget:
return {"user_id": user_id, "usage_pct": 0.0}
return {
"user_id": user_id,
"usage_pct": round(budget.usage_pct * 100, 1),
"tokens_used": budget.tokens_used,
"token_limit": budget.token_limit,
"near_limit": budget.is_near_limit,
}
# ---- 2. 工具级线程池隔离 ----
def get_tool_executor(
self, tool_name: str, max_workers: int = 8, queue_depth: int = 32
) -> ThreadPoolExecutor:
"""获取或创建工具专用线程池。"""
with self._pool_lock:
if tool_name not in self._tool_pools:
self._tool_pools[tool_name] = ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix=f"agent-tool-{tool_name}",
)
logger.info(
"Created ThreadPool for tool '%s': workers=%d",
tool_name, max_workers,
)
return self._tool_pools[tool_name]
def tool_pool_metrics(self) -> dict:
with self._pool_lock:
return {
name: {
"max_workers": pool._max_workers,
}
for name, pool in self._tool_pools.items()
}
# ---- 3. 会话级资源追踪 ----
def register_session(self, session_id: str) -> None:
with self._session_lock:
self._session_resources[session_id] = {
"files_open": 0,
"created_at": time.monotonic(),
}
def check_session_resource(self, session_id: str,
resource_type: str) -> bool:
"""检查会话资源是否超限。"""
with self._session_lock:
if session_id not in self._session_resources:
self._session_resources[session_id] = {
"files_open": 0,
"created_at": time.monotonic(),
}
resources = self._session_resources[session_id]
if resource_type == "file_descriptor":
if resources.get("files_open", 0) >= self.max_fds_per_session:
raise BulkheadCapacityExceeded(
f"session:{session_id}", "file_descriptors",
resources["files_open"], self.max_fds_per_session,
)
with self._session_lock:
self._session_resources[session_id]["files_open"] += 1
return True
return True
def release_session_resource(self, session_id: str,
resource_type: str) -> None:
with self._session_lock:
if session_id in self._session_resources:
if resource_type == "file_descriptor":
self._session_resources[session_id]["files_open"] = max(
0, self._session_resources[session_id].get(
"files_open", 1) - 1
)
def cleanup_session(self, session_id: str) -> None:
with self._session_lock:
self._session_resources.pop(session_id, None)
# ---- 可观测性 ----
def get_metrics(self) -> dict:
return {
"user_budgets": {
uid: self.get_user_usage(uid)
for uid in list(self._user_budgets.keys())[:20]
},
"tool_pools": self.tool_pool_metrics(),
"active_sessions": len(self._session_resources),
}
# ---- 使用示例 ----
if __name__ == "__main__":
bulkhead = AgentBulkhead(default_token_limit=50_000)
# 用户级预算检查
try:
bulkhead.check_user_budget("user-42", estimated_tokens=3000)
print("[User-42] Budget OK")
bulkhead.check_user_budget("user-42", estimated_tokens=48000)
print("[User-42] Budget near limit -- soft warning triggered")
except BulkheadCapacityExceeded as e:
print(f"[User-42] Budget exceeded: {e}")
# 工具级线程池
search_pool = bulkhead.get_tool_executor("search_docs", max_workers=4)
db_pool = bulkhead.get_tool_executor("database_query", max_workers=8)
print("Tool pools:", bulkhead.tool_pool_metrics())
# 会话级资源
bulkhead.register_session("sess-abc")
bulkhead.check_session_resource("sess-abc", "file_descriptor")
print("Session resource check passed")
print("Bulkhead metrics:", bulkhead.get_metrics())
设计要点:
- 用户级预算是成本控制的第一道防线:它阻止单一用户或租户通过大量并发请求消耗全局 Token 配额。与限流器不同,用户预算按用户身份分区——而非按 Provider。在 SaaS Agent 场景中,这直接影响到你的利润率。
- 工具级线程池是 Agent 特有的需求:微服务 Bulkhead 通常隔离的是服务(如"订单服务"与"支付服务"分配不同的连接池)。Agent 需要隔离的是工具——同一 Agent 实例内部的 search 和 database_query 工具。因为 Agent 在一次推理中可能并发调用 3-4 个工具,如果一个工具的线程被耗尽,其他工具不受影响。
- 与运行时隔离的关系:Bulkhead 提供的是逻辑资源隔离(Token 预算、线程池、文件描述符),而运行时隔离(容器、Firecracker、gVisor)提供的是物理进程隔离。这两层共同构成 Agent 的"纵深防御"——参见 Agent 运行时隔离。
5. 优雅降级:当核心依赖不可用时的生存策略
熔断器 Open 了,限流器拒绝了,Bulkhead 接近饱和——Agent 接下来做什么?答案不能是"崩溃"。优雅降级是韧性栈的最后一道防线:当依赖不可用时,Agent 以降低质量但不丧失功能的方式继续运行。
六级降级决策树
降级策略按优先级排序——Agent 从最佳选项开始尝试,逐级后退:
- 模型回退(Model Fallback):主模型不可用(熔断/限流)→ 切换到备用模型。例如 GPT-5.5 → GPT-5.4-mini,Claude Opus → Claude Haiku,或跨 Provider 回退(Claude → DeepSeek)。备用模型可能质量略低,但任务可以继续。
- 工具跳过(Tool Skip):某个非关键工具不可用 → 跳过它,用剩余工具完成任务。例如
get_order挂了,但lookup_shipping还能用——从运输信息推断订单状态。在结果中标注"部分信息"。 - 缓存返回(Cache Return):相同或相似的查询在近期被成功回答过 → 返回缓存结果,附上
stale: true标记。例如"我的订单什么时候到货?"在 10 分钟前刚有人问过——缓存时效性足够,直接返回。 - 异步延迟(Async Defer):同步无法完成 → 将任务入队,后台异步处理,向用户返回"正在处理中,预计 X 分钟内完成"。
- 部分结果(Partial Result):返回当前已有的信息,附上降级元数据:缺少哪些组件、为什么缺少、对结果质量的预估影响。
- 优雅失败(Graceful Failure):所有路径都不通 → 返回清晰的错误指引,而非堆栈追踪。例如:"目前订单查询系统正在维护中(预计 30 分钟内恢复)。您可以:1) 拨打客服电话 400-xxx;2) 访问 self-service 门户查看订单状态。"
降级元数据:让调用方知道"结果不完整"
降级不是一个布尔值——它是一个结构化的描述,告诉上游系统(或最终用户)结果的质量状态。每个降级响应都应携带:
degradation_level:none|partial|fallback|deferred|failedmissing_components:哪些组件不可用(如["get_order", "order_db"])quality_estimate:对结果质量的预估分数(0.0–1.0)。例如只有缓存答案 = 0.7,模型回退 = 0.85,完整结果 = 1.0。fallback_chain:本次请求尝试了哪些降级路径(用于事后分析和改进降级策略)
熔断器联动
降级引擎与熔断器深度集成:当某个工具的熔断器处于 Open 状态时,降级引擎直接跳过该工具(执行工具跳过策略),而不是等到调用失败后再反应。这意味着降级可以在请求规划阶段就做出决策——节省了不必要的等待和 Token 消耗。
代码:GracefulDegradationEngine 类
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Any, Dict, List
import logging
import hashlib
logger = logging.getLogger("agent.degradation")
class DegradationLevel(Enum):
NONE = "none" # 完整结果
PARTIAL = "partial" # 部分组件不可用
FALLBACK = "fallback" # 使用了备用模型/策略
DEFERRED = "deferred" # 转入异步处理
FAILED = "failed" # 优雅失败
@dataclass
class DegradationMetadata:
"""降级响应的元数据,供上游系统判断结果质量。"""
level: DegradationLevel = DegradationLevel.NONE
missing_components: List[str] = field(default_factory=list)
quality_estimate: float = 1.0 # 0.0–1.0
fallback_chain: List[str] = field(default_factory=list)
user_message: Optional[str] = None
class DegradationStrategy:
"""降级策略的抽象基类。"""
def try_execute(self, context: dict) -> Optional[Any]:
raise NotImplementedError
class ModelFallbackStrategy(DegradationStrategy):
"""模型回退:按优先级尝试备用模型。"""
def __init__(self, models: List[str]):
self.models = models
def try_execute(self, context: dict) -> Optional[Any]:
current_model = context.get("current_model")
for model in self.models:
if model != current_model:
logger.info("Degradation: trying fallback model %s", model)
return {"model": model, "result": "fallback response"}
return None
class SkipToolStrategy(DegradationStrategy):
"""工具跳过:允许跳过指定的非关键工具。"""
def __init__(self, optional_tools: List[str]):
self.optional_tools = set(optional_tools)
def try_execute(self, context: dict) -> Optional[Any]:
failed_tool = context.get("failed_tool")
if failed_tool and failed_tool in self.optional_tools:
logger.info("Degradation: skipping optional tool %s", failed_tool)
return {"skipped_tool": failed_tool,
"partial_results": context.get("partial_results", {})}
return None
class CacheReturnStrategy(DegradationStrategy):
"""缓存返回:查找相似查询的缓存结果。"""
def __init__(self, cache: Optional[dict] = None):
self._cache = cache or {}
def _query_fingerprint(self, messages: list) -> str:
content = "".join(m.get("content", "")[:500] for m in messages)
return hashlib.sha256(content.encode()).hexdigest()[:16]
def try_execute(self, context: dict) -> Optional[Any]:
messages = context.get("messages", [])
fp = self._query_fingerprint(messages)
cached = self._cache.get(fp)
if cached is not None:
logger.info("Degradation: returning cached result (fp=%s)", fp)
return {"cache_hit": True, "stale": True, "result": cached}
return None
def store(self, messages: list, result: Any) -> None:
fp = self._query_fingerprint(messages)
self._cache[fp] = result
class AsyncDeferStrategy(DegradationStrategy):
"""异步延迟:将任务入队,返回处理确认。"""
def __init__(self, queue: Optional[Any] = None):
self._queue = queue or []
def try_execute(self, context: dict) -> Optional[Any]:
task = {
"messages": context.get("messages"),
"metadata": context.get("metadata", {}),
}
self._queue.append(task)
logger.info("Degradation: task deferred to async queue (depth=%d)",
len(self._queue))
return {
"deferred": True,
"queue_position": len(self._queue),
"estimated_wait_seconds": len(self._queue) * 15,
}
class GracefulFailureStrategy(DegradationStrategy):
"""优雅失败:返回可执行的错误指引。"""
def __init__(self, fallback_message: str = ""):
self.fallback_message = fallback_message
def try_execute(self, context: dict) -> Optional[Any]:
msg = self.fallback_message or (
"当前服务暂时不可用。请稍后重试,"
"或通过 self-service 门户获取帮助。"
)
return {
"error": "graceful_failure",
"user_message": msg,
"support_contact": "[email protected]",
}
class GracefulDegradationEngine:
"""优雅降级引擎。
按优先级依次尝试降级策略:
model_fallback -> tool_skip -> cache_return ->
async_defer -> partial_result -> graceful_failure
与熔断器集成:工具熔断 → 自动触发 tool_skip。
"""
def __init__(self):
self._strategies: List[tuple] = []
self._circuit_breakers: Dict[str, Any] = {}
self._fallback_chain: List[str] = []
def register_model_fallback(self, models: List[str]):
self._strategies.append(
("model_fallback", ModelFallbackStrategy(models)))
def register_optional_tools(self, tool_names: List[str]):
self._strategies.append(
("tool_skip", SkipToolStrategy(tool_names)))
def register_cache(self, cache=None):
self._strategies.append(
("cache_return", CacheReturnStrategy(cache)))
def register_async_queue(self, queue=None):
self._strategies.append(
("async_defer", AsyncDeferStrategy(queue)))
def register_graceful_failure(self, message: str = ""):
self._strategies.append(
("graceful_failure", GracefulFailureStrategy(message)))
def link_circuit_breakers(self, breakers: Dict[str, Any]) -> None:
"""关联熔断器:工具熔断时,降级引擎自动跳过该工具。"""
self._circuit_breakers = breakers
def execute(self, context: dict) -> tuple:
"""执行降级链,返回 (结果, 降级元数据)。"""
meta = DegradationMetadata()
self._fallback_chain = []
for strategy_name, strategy in self._strategies:
self._fallback_chain.append(strategy_name)
try:
result = strategy.try_execute(context)
if result is not None:
meta.fallback_chain = self._fallback_chain
if strategy_name == "model_fallback":
meta.level = DegradationLevel.FALLBACK
meta.quality_estimate = 0.85
elif strategy_name == "tool_skip":
meta.level = DegradationLevel.PARTIAL
meta.missing_components.append(
context.get("failed_tool", "unknown"))
meta.quality_estimate = 0.75
elif strategy_name == "cache_return":
meta.level = DegradationLevel.PARTIAL
meta.quality_estimate = 0.70
elif strategy_name == "async_defer":
meta.level = DegradationLevel.DEFERRED
meta.quality_estimate = 0.60
elif strategy_name == "graceful_failure":
meta.level = DegradationLevel.FAILED
meta.quality_estimate = 0.0
meta.user_message = result.get("user_message", "")
logger.info(
"Degradation chain succeeded at '%s' (level=%s)",
strategy_name, meta.level.value)
return result, meta
except Exception as e:
logger.warning("Degradation strategy '%s' failed: %s",
strategy_name, e)
continue
# 所有策略都失败了
meta.level = DegradationLevel.FAILED
meta.quality_estimate = 0.0
meta.user_message = "所有降级策略均已失败。请联系支持团队。"
meta.fallback_chain = self._fallback_chain
return {"error": "total_degradation_failure"}, meta
def should_skip_tool(self, tool_name: str) -> bool:
"""检查工具是否因熔断器开启而应被跳过。"""
breaker = self._circuit_breakers.get(tool_name)
if breaker and hasattr(breaker, "state"):
cb_state = breaker.state
if hasattr(cb_state, "value") and cb_state.value == "open":
return True
return False
def get_metrics(self) -> dict:
return {
"strategies_registered": len(self._strategies),
"circuit_breakers_linked": len(self._circuit_breakers),
"last_fallback_chain": self._fallback_chain,
}
# ---- 使用示例 ----
if __name__ == "__main__":
engine = GracefulDegradationEngine()
engine.register_model_fallback(
models=["claude-opus", "claude-haiku",
"deepseek-chat", "gpt-5.4-mini"])
engine.register_optional_tools(
["send_notification", "log_analytics", "enrich_profile"])
engine.register_cache({})
engine.register_async_queue([])
engine.register_graceful_failure(
"当前服务暂时不可用。请稍后重试或联系 [email protected]。")
context = {
"current_model": "claude-opus",
"failed_tool": "enrich_profile",
"messages": [{"role": "user", "content": "查询我的订单状态"}],
"partial_results": {"order_id": "38291", "status_hint": "shipping"},
"metadata": {},
}
result, meta = engine.execute(context)
print(f"Result: {result}")
print(f"Degradation: level={meta.level.value}, "
f"quality={meta.quality_estimate}")
print(f"Fallback chain: {meta.fallback_chain}")
print(f"Missing: {meta.missing_components}")
设计要点:
- 降级链的顺序不能随意调整:模型回退(切换到一个可用模型)优于返回不完整结果(跳过工具/返回缓存),后者优于完全失败。为什么?因为用户期望的是答案——即便是用稍差模型生成的答案,也好过"我们正在处理中,请等待"。
- 降级不是质量妥协——是质量声明:降级元数据中的
quality_estimate让上游系统(或用户界面)能够向用户诚实地说明结果的可信度。如果结果是缓存的且过期了 10 分钟,UI 可以显示"结果可能不是最新的(更新于 10 分钟前)"。 - 与回滚的配合:降级和回滚是互补的——降级让 Agent 在没有依赖的情况下继续运行(质量折中),而回滚让 Agent 撤销已经造成的破坏(安全兜底)。两者配合才完整:韧性模式在故障发生前兜底,回滚在故障发生后恢复。参见 Agent 回滚设计。
6. 模式组合:构建生产级 Agent 韧性架构
前面的 2–5 节展示了四个独立的韧性模式。但在生产环境中,它们不是独立使用的——一个 Agent 请求同时经过所有四层保护。这一节展示它们如何协作。
完整请求生命周期(7 层)
一个 Agent 请求从进入到返回,经过七个韧性检查点:
- Bulkhead 入口检查:用户的 Token 预算是否有余量?请求是否放入正确工具线程池?会话资源(文件描述符)是否在限制内?
- Rate Limiter Token 预算检查:目标 Provider 的 TPM 配额是否足够容纳本次调用的预估 Token?需要等待多久?
- LLM 调用(Circuit Breaker 包装):Provider 熔断器是否 Closed?调用成功?调用后进行 Token 校准和熔断状态更新。
- LLM 失败 → Degradation 引擎决策:Provider 不可用?选择模型回退(fallback chain)。熔断器 Open?降级引擎跳过对故障工具的计划。
- 工具调用前 → Circuit Breaker 检查:目标工具熔断器是否 Closed?Token 成本是否正常?
- 工具调用失败 → Degradation 引擎决策:非关键工具 → 跳过。关键工具不可跳 → 尝试缓存 → 异步延迟 → 部分结果。
- 响应出口 → Bulkhead 释放:Release 用户预算、线程池资源、会话资源。
模式交互矩阵
| 模式 A | 模式 B | 交互关系 | 潜在冲突 |
|---|---|---|---|
| Circuit Breaker | Rate Limiter | 互补。CB 停止无效调用,RL 保护配额。CB Open → RL 无需再检查。 | 如果 RL 极度保守而 CB 未触发,可能导致过度限流。 |
| Circuit Breaker | Bulkhead | 互补。CB 面向外部依赖,Bulkhead 面向内部资源。 | 低冲突。 |
| Circuit Breaker | Degradation | 深度集成。CB Open → Degradation 自动跳过对应工具。 | Degradation fallback 也可能触发 CB(循环降级),需设置最大降级深度。 |
| Rate Limiter | Bulkhead | 互补。RL 是流控(入口),Bulkhead 是资源隔离(执行层)。 | Bulkhead 的用户预算与 RL 的 Token 桶可能重复计算,需明确职责边界。 |
| Rate Limiter | Degradation | RL 拒绝 → Degradation 尝试备用 Provider(不同配额池)。 | 所有 Provider 都限流时,降级链走到 graceful failure。 |
| Bulkhead | Degradation | Bulkhead 资源耗尽 → Degradation 执行 async_defer 或 graceful failure。 | 低冲突。 |
配置默认值
以下是从生产经验中总结的合理起步阈值——这些值不是"正确答案",而是"合理起点",需要根据实际流量和错误模式调整:
| 配置项 | 默认值 | 说明 |
|---|---|---|
| CB 失败阈值 | 5 次 / 60 秒 | 高频 Agent 可下调至 3;低频可上调至 10 |
| CB 冷却时间 | 30 秒 | 工具 API 通常在此时间内可恢复;LLM Provider 建议 60 秒 |
| RL Token 桶容量 | Provider TPM 上限 | 从 Provider 的 Tier 信息获取 |
| RL 令牌预估算因子 | 中英文混合 3.5 字符/Token | 建议集成 tiktoken(OpenAI)或 Anthropic tokenizer |
| Bulkhead 用户预算 | 100,000 Token/分钟/用户 | 取决于用户基数和全局配额 |
| Bulkhead 工具线程池 | 4–8 workers/工具 | I/O 密集 → 偏高;CPU 密集 → 偏低 |
| 降级最大深度 | 6 级 | 防止降级循环(A→B→C→A) |
代码:ResilientAgentRunner 类
这是全文的核心代码——将所有模式组合为一个可运行的 Agent 管道:
from __future__ import annotations
import time
import logging
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
logger = logging.getLogger("agent.resilient_runner")
@dataclass
class CallResult:
"""封装每次调用的结果(ResilientAgentRunner 内联版本)。"""
success: bool = False
latency_ms: float = 0.0
tokens_consumed: int = 0
semantic_failure: Optional[str] = None
@dataclass
class ResilientAgentConfig:
"""韧性 Agent 的配置——所有模式阈值集中管理。"""
cb_failure_threshold: int = 5
cb_cooldown_seconds: float = 30.0
rl_acquire_timeout_ms: float = 5000
bulkhead_user_token_limit: int = 100_000
bulkhead_tool_workers: int = 6
fallback_models: List[str] = field(default_factory=lambda: [
"claude-opus", "claude-haiku", "deepseek-chat", "gpt-5.4-mini"
])
optional_tools: List[str] = field(default_factory=lambda: [
"send_notification", "log_analytics"
])
max_degradation_depth: int = 6
@dataclass
class ResilientRunResult:
"""韧性 Agent 的完整运行结果。"""
success: bool
result: Any
circuit_breaker_events: List[dict] = field(default_factory=list)
rate_limiter_metrics: dict = field(default_factory=dict)
bulkhead_metrics: dict = field(default_factory=dict)
degradation_metadata: Optional[Any] = None
total_latency_ms: float = 0.0
tokens_consumed: int = 0
resilience_events: List[str] = field(default_factory=list)
class ResilientAgentRunner:
"""生产级 Agent 韧性运行器。
将四大模式编织为 7 层请求生命周期。
"""
def __init__(self, circuit_breakers, rate_limiter, bulkhead,
degradation_engine, config=None):
self.circuit_breakers = circuit_breakers
self.rate_limiter = rate_limiter
self.bulkhead = bulkhead
self.degradation_engine = degradation_engine
self.config = config or ResilientAgentConfig()
self.degradation_engine.link_circuit_breakers(circuit_breakers)
def run(self, user_id, session_id, provider, messages,
tools_to_call) -> ResilientRunResult:
"""运行一个完整的、带韧性保护的 Agent 请求。"""
start_time = time.monotonic()
result = ResilientRunResult(success=False, result=None)
estimated_tokens = 0
# LAYER 1: Bulkhead 入口检查
try:
estimated_tokens = self.rate_limiter.estimate_tokens(
provider, messages)
self.bulkhead.check_user_budget(user_id, estimated_tokens)
self.bulkhead.register_session(session_id)
self.bulkhead.check_session_resource(
session_id, "file_descriptor")
result.resilience_events.append("bulkhead:entry_passed")
except Exception as e:
result.resilience_events.append(f"bulkhead:entry_failed:{e}")
return result
# LAYER 2: Rate Limiter Token 预算检查
token_acquired = self.rate_limiter.acquire(
provider, estimated_tokens,
timeout_ms=self.config.rl_acquire_timeout_ms)
if not token_acquired:
result.resilience_events.append("rate_limiter:rejected")
ctx = {"messages": messages, "error": "rate_limited"}
deg_result, deg_meta = self.degradation_engine.execute(ctx)
result.result = deg_result
result.degradation_metadata = deg_meta
return result
result.resilience_events.append("rate_limiter:acquired")
# LAYER 3: LLM 调用(Circuit Breaker 包装)
provider_cb = self.circuit_breakers.get(
f"provider:{provider.value}")
llm_response = None
if provider_cb:
try:
provider_cb.before_call()
# === 实际 LLM 调用(生产环境替换为真实调用)===
llm_response = {
"content": "The order #38291 is in transit.",
"tool_calls": [
{"function": {"name": "get_order",
"arguments": '{"order_id": "38291"}'}},
],
"usage": {"total_tokens": 1200},
}
# ================================================
actual_tokens = llm_response.get("usage", {}).get(
"total_tokens", estimated_tokens)
result.tokens_consumed += actual_tokens
self.rate_limiter.release(
provider, actual_tokens, estimated_tokens)
# 创建 CallResult 用于熔断器更新
cr = CallResult(
success=True, latency_ms=100,
tokens_consumed=actual_tokens)
provider_cb.after_call(cr)
result.resilience_events.append(
"circuit_breaker:llm_passed")
except Exception as e:
result.resilience_events.append(
f"circuit_breaker:llm_failed:{e}")
ctx = {"messages": messages,
"current_model": provider.value,
"error": str(e)}
deg_result, deg_meta = self.degradation_engine.execute(ctx)
result.result = deg_result
result.degradation_metadata = deg_meta
return result
# LAYER 5-6: 工具调用前 CB 检查 + 降级
tool_results = {}
skipped_tools = []
# 防止 provider_cb 为 None 时 llm_response 也为 None
if llm_response is None:
llm_response = {}
tool_calls = llm_response.get("tool_calls", [])
for tc in tool_calls:
tool_name = tc.get("function", {}).get("name", "")
if not tool_name:
continue
if self.degradation_engine.should_skip_tool(tool_name):
result.resilience_events.append(
f"degradation:skipping_tool:{tool_name}")
skipped_tools.append(tool_name)
continue
tool_cb = self.circuit_breakers.get(f"tool:{tool_name}")
if tool_cb:
try:
tool_cb.before_call()
tool_results[tool_name] = {
"status": "success",
"data": f"Result from {tool_name}"}
cr = CallResult(
success=True, latency_ms=50,
tokens_consumed=0)
tool_cb.after_call(cr)
result.resilience_events.append(
f"circuit_breaker:tool_passed:{tool_name}")
except Exception as e:
result.resilience_events.append(
f"circuit_breaker:tool_failed:{tool_name}")
ctx = {"failed_tool": tool_name,
"partial_results": tool_results,
"messages": messages}
deg_result, deg_meta = (
self.degradation_engine.execute(ctx))
if deg_meta.level.value != "failed":
tool_results[tool_name] = {
"degraded": True, "reason": str(e)}
skipped_tools.append(tool_name)
result.degradation_metadata = deg_meta
# LAYER 7: 响应出口 → Bulkhead 释放
self.bulkhead.release_session_resource(
session_id, "file_descriptor")
result.resilience_events.append("bulkhead:exit_released")
result.success = True
result.result = {
"content": llm_response.get("content"),
"tool_results": tool_results,
"skipped_tools": skipped_tools,
"resilience": {"events": result.resilience_events},
}
result.total_latency_ms = (
time.monotonic() - start_time) * 1000
result.bulkhead_metrics = self.bulkhead.get_metrics()
return result
def get_metrics(self) -> dict:
"""聚合所有韧性层的指标——供可观测性系统消费。"""
return {
"circuit_breakers": {
name: cb.get_metrics()
for name, cb in self.circuit_breakers.items()
if hasattr(cb, "get_metrics")
},
"rate_limiter": self.rate_limiter.get_metrics(),
"bulkhead": self.bulkhead.get_metrics(),
"degradation": self.degradation_engine.get_metrics(),
}
# ---- 使用示例 ----
if __name__ == "__main__":
# 在实际项目中,这些类从前面各节定义的模块导入:
# from circuit_breaker import AgentCircuitBreaker
# from rate_limiter import TokenAwareRateLimiter, Provider
# from bulkhead import AgentBulkhead
# from degradation import GracefulDegradationEngine
# ---- 初始化四大模式 ----
# 1. Circuit Breakers(Provider + Tool 级别)
provider_cb = AgentCircuitBreaker("provider:deepseek")
tool_cb_get_order = AgentCircuitBreaker("tool:get_order")
tool_cb_lookup = AgentCircuitBreaker("tool:lookup_shipping")
circuit_breakers = {
"provider:deepseek": provider_cb,
"tool:get_order": tool_cb_get_order,
"tool:lookup_shipping": tool_cb_lookup,
}
# 2. Rate Limiter
rl = TokenAwareRateLimiter()
# 3. Bulkhead
bulkhead = AgentBulkhead(default_token_limit=100_000)
# 4. Degradation Engine
degradation = GracefulDegradationEngine()
degradation.register_model_fallback([
"claude-opus", "claude-haiku", "deepseek-chat", "gpt-5.4-mini"])
degradation.register_optional_tools(
["send_notification", "log_analytics"])
degradation.register_cache({})
degradation.register_async_queue([])
degradation.register_graceful_failure(
"当前服务暂时不可用。请稍后重试或联系 [email protected]。")
# ---- 创建 Runner 并执行 ----
runner = ResilientAgentRunner(
circuit_breakers, rl, bulkhead, degradation)
messages = [{"role": "user", "content": "查询订单 #38291 的物流状态"}]
result = runner.run(
user_id="user-42",
session_id="sess-abc",
provider=Provider.DEEPSEEK,
messages=messages,
tools_to_call=["get_order", "lookup_shipping"],
)
print("=" * 50)
print(f"Success: {result.success}")
print(f"Content: {result.result['content']}")
print(f"Tool results: {result.result['tool_results']}")
print(f"Skipped tools: {result.result['skipped_tools']}")
print(f"Total latency: {result.total_latency_ms:.0f}ms")
print(f"Tokens consumed: {result.tokens_consumed}")
print(f"Resilience events: {result.resilience_events}")
print("=" * 50)
print("Runner metrics:", runner.get_metrics())
设计要点:
- 每个层的失败都有明确的下游响应:Bulkhead 拒绝 → 立即返回错误(不消耗 Token 预算)。Rate Limiter 拒绝 → 触发降级到备用 Provider。Circuit Breaker Open → 降级引擎自动跳过该依赖。降级引擎兜底 → 确保永远有响应返回给用户(即使是最坏的 graceful failure)。
- 可观测性是韧性模式的"眼睛":每个层都通过
get_metrics()和resilience_events暴露状态。熔断器状态变化、限流器水位、降级事件频率——这些信号直接驱动告警和容量规划。韧性模式不只是在故障时起作用——它们产生的指标和事件是系统健康的"生命体征"。参见 Agent 可观察性。 - 单一入口模式:
ResilientAgentRunner.run()是 Agent 请求的唯一入口。Agent 的业务逻辑无需关心韧性层——它在run()的框架内执行,所有韧性检查由框架透明处理。这种"框架级韧性注入"是生产 Agent 系统的标准模式。
7. 多模型韧性:跨 OpenAI、Claude 和 DeepSeek 的统一保护
大多数生产 Agent 不依赖单一 LLM Provider——它们使用多个 Provider 来平衡成本、延迟和可用性。但每个 Provider 有不同的故障模式和限流模型。多模型韧性架构将这些差异抽象为统一的保护层。
Provider 抽象层
多模型韧性需要第一块基石:一个 Provider 抽象层,让 Agent 无需关心底层是 OpenAI、Anthropic 还是 DeepSeek。这与 模型无关的 Agent 设计 的核心原则一致——抽象层使得跨 Provider 回退成为可能。
每 Provider 独立熔断 + 独立限流
每个 Provider 有自己独立的熔断器和限流器实例:
- OpenAI 熔断器:检测 5xx + 429(用
retry-after头) + 语义失败 - Anthropic 熔断器:检测 5xx + 429(用
anthropic-ratelimit-*头) + 缓存感知失败 - DeepSeek 熔断器:检测连接超时 + 并发耗尽 + 响应截断(DeepSeek 特有:在高并发下可能返回不完整的流式响应)
当 OpenAI 熔断器 Open 时,只有 OpenAI 的调用被阻断——Anthropic 和 DeepSeek 的调用仍然正常通过。
跨 Provider 回退链
回退链是可配置的优先级列表(例如 ["openai", "anthropic", "deepseek"])。当主 Provider 不可用时,MultiProviderResilienceManager 自动将请求路由到下一个 Provider。路由决策考虑三个因素:
- 可用性:Provider 的熔断器是否 Closed?限流器是否有足够配额?
- 成本:在预算压力下(聚合使用率 > 80%),优先路由到更便宜的 Provider(DeepSeek 通常是成本最低的选择)。
- 延迟:如果 SLA 对延迟敏感,跳过响应慢的 Provider(DeepSeek 可能在高并发下延迟升高)。
DeepSeek:中文读者的核心关注
对于中文团队,DeepSeek 的独特地位使得多模型韧性中 DeepSeek 的策略需要特殊设计:
- DeepSeek 作为成本锚点:DeepSeek 的价格是 OpenAI 的 1/10–1/30。在高频 Agent 场景(如客服、文档处理),将尽可能多的流量路由到 DeepSeek 是成本优化的关键。但 DeepSeek 的并发限制意味着所有请求不能同时落到 DeepSeek——需要一个智能路由层在"优先便宜"和"避免超并发"之间平衡。
- DeepSeek 作为回退锚点:当 OpenAI 和 Anthropic 同时不可用(这在中小规模团队中并不罕见——两个 Provider 都可能因为账单问题、配额耗尽或区域故障而不可用),DeepSeek 是可靠的第三选择。对于中文内容,DeepSeek 的中文理解和生成能力在某些场景下甚至优于 GPT。
- DeepSeek 特有的故障信号:连接被拒绝(非 HTTP 错误码——是 TCP 层拒绝)、响应截断(在高并发下,流式响应可能在中途断开)、速度衰减(响应 Token 速率从正常 50 t/s 降到 5 t/s)。这些信号在传统的 5xx/429 检测中不可见,需要专门的处理逻辑。
代码:MultiProviderResilienceManager 类
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
import logging
logger = logging.getLogger("agent.multi_provider")
class ProviderTier(Enum):
PREMIUM = "premium" # OpenAI GPT-5.5, Claude Opus
STANDARD = "standard" # GPT-5.4, Claude Sonnet
BUDGET = "budget" # GPT-5.4-mini, Claude Haiku, DeepSeek
@dataclass
class ProviderInfo:
"""Provider 画像:包含可用性、成本和能力元数据。"""
name: str
tier: ProviderTier
cost_per_1m_input: float = 0.0
cost_per_1m_output: float = 0.0
max_context: int = 128_000
supports_streaming: bool = True
circuit_breaker: Optional[Any] = None # AgentCircuitBreaker
rate_limiter: Optional[Any] = None # TokenAwareRateLimiter
# Provider 成本画像(2026 Q2 近似值,实际以 Provider 定价页为准)
PROVIDER_REGISTRY: Dict[str, ProviderInfo] = {
"openai-gpt55": ProviderInfo(
name="openai-gpt55", tier=ProviderTier.PREMIUM,
cost_per_1m_input=5.00, cost_per_1m_output=16.00,
max_context=256_000,
),
"openai-gpt54": ProviderInfo(
name="openai-gpt54", tier=ProviderTier.STANDARD,
cost_per_1m_input=2.50, cost_per_1m_output=10.00,
),
"anthropic-opus": ProviderInfo(
name="anthropic-opus", tier=ProviderTier.PREMIUM,
cost_per_1m_input=15.00, cost_per_1m_output=75.00,
),
"anthropic-haiku": ProviderInfo(
name="anthropic-haiku", tier=ProviderTier.BUDGET,
cost_per_1m_input=0.80, cost_per_1m_output=4.00,
),
"deepseek-chat": ProviderInfo(
name="deepseek-chat", tier=ProviderTier.BUDGET,
cost_per_1m_input=0.14, cost_per_1m_output=0.28, # 极低成本
max_context=128_000,
),
}
class MultiProviderResilienceManager:
"""多模型韧性管理器。
核心职责:
1. 每 Provider 独立熔断器 + 限流器管理
2. 跨 Provider 回退链(可用性 + 成本 + 延迟)
3. 统一 Token 预算追踪
4. 成本感知路由
"""
def __init__(self, provider_registry=None):
self.providers = provider_registry or PROVIDER_REGISTRY
self._token_usage: Dict[str, int] = {}
self._budget_pressure_threshold: float = 0.80 # 80% 触发成本路由
def attach_circuit_breaker(self, provider_name: str, cb: Any) -> None:
"""为 Provider 绑定独立熔断器。"""
if provider_name in self.providers:
self.providers[provider_name].circuit_breaker = cb
def attach_rate_limiter(self, provider_name: str, rl: Any) -> None:
"""为 Provider 绑定独立限流器。"""
if provider_name in self.providers:
self.providers[provider_name].rate_limiter = rl
def is_provider_available(self, provider_name: str) -> bool:
"""检查 Provider 是否可用(熔断器 Closed + 限流器有配额)。"""
info = self.providers.get(provider_name)
if not info:
return False
if info.circuit_breaker:
cb_state = info.circuit_breaker.state
if hasattr(cb_state, "value") and cb_state.value != "closed":
return False
if info.rate_limiter:
metrics = info.rate_limiter.get_metrics()
provider_metrics = metrics.get(provider_name, {})
if provider_metrics.get("usage_pct", 0) > 95:
return False
return True
def select_provider(
self, preferred: List[str], context=None
) -> Optional[ProviderInfo]:
"""按优先级选择可用 Provider。
决策逻辑:
1. 按 preferred 列表顺序
2. 跳过不可用 Provider
3. 如果全局预算压力 > 80%,BUDGET tier 优先
"""
context = context or {}
total_used = sum(self._token_usage.values())
total_cap = 0
for info in self.providers.values():
if info.rate_limiter:
total_cap = max(total_cap, info.rate_limiter._aggregate_cap)
budget_pressure = (total_used / total_cap) if total_cap > 0 else 0.0
if budget_pressure > self._budget_pressure_threshold:
logger.info(
"Budget pressure %.1f%% > %.0f%%, cost-aware routing",
budget_pressure * 100, self._budget_pressure_threshold * 100)
preferred = sorted(
preferred,
key=lambda p: (
self.providers[p].tier != ProviderTier.BUDGET,
self.providers[p].cost_per_1m_input,
),
)
for name in preferred:
if self.is_provider_available(name):
return self.providers[name]
for name, info in self.providers.items():
if info.tier == ProviderTier.BUDGET and self.is_provider_available(name):
logger.warning("Falling back to budget provider %s", name)
return info
return None
def record_usage(self, provider_name: str, tokens: int) -> None:
self._token_usage[provider_name] = (
self._token_usage.get(provider_name, 0) + tokens)
def get_usage_summary(self) -> dict:
total = sum(self._token_usage.values())
return {
"per_provider": dict(self._token_usage),
"total_tokens": total,
}
def get_failover_chain(self, preferred: List[str]) -> List[str]:
chain = []
for name in preferred:
if self.is_provider_available(name):
chain.append(name)
for name in self.providers:
if name not in chain and self.is_provider_available(name):
chain.append(name)
return chain
def get_metrics(self) -> dict:
metrics = {"providers": {}, "aggregate": self.get_usage_summary()}
for name, info in self.providers.items():
cb_metrics = {}
if info.circuit_breaker and hasattr(info.circuit_breaker, "get_metrics"):
cb_metrics = info.circuit_breaker.get_metrics()
rl_metrics = {}
if info.rate_limiter and hasattr(info.rate_limiter, "get_metrics"):
rl_metrics = info.rate_limiter.get_metrics()
metrics["providers"][name] = {
"tier": info.tier.value,
"available": self.is_provider_available(name),
"cost_per_1m": {"input": info.cost_per_1m_input,
"output": info.cost_per_1m_output},
"circuit_breaker": cb_metrics,
"rate_limiter": rl_metrics,
"tokens_used": self._token_usage.get(name, 0),
}
return metrics
# ---- 使用示例 ----
if __name__ == "__main__":
manager = MultiProviderResilienceManager()
class FakeCB:
def __init__(self, name):
self.name = name
self.state = type("S", (), {"value": "closed"})()
def get_metrics(self):
return {"state": self.state.value}
class FakeRL:
def __init__(self):
self._aggregate_cap = 2_000_000
def get_metrics(self):
return {"aggregate": {"usage_pct": 35.0}}
for name in PROVIDER_REGISTRY:
manager.attach_circuit_breaker(name, FakeCB(name))
manager.attach_rate_limiter(name, FakeRL())
# 模拟 OpenAI 熔断
manager.providers["openai-gpt55"].circuit_breaker.state.value = "open"
preferred = ["openai-gpt55", "anthropic-haiku", "deepseek-chat"]
selected = manager.select_provider(preferred)
if selected:
print(f"Selected: {selected.name} (tier={selected.tier.value})")
else:
print("No provider available!")
chain = manager.get_failover_chain(preferred)
print(f"Failover chain: {chain}")
manager.record_usage("anthropic-haiku", 5000)
manager.record_usage("deepseek-chat", 15000)
print("Usage summary:", manager.get_usage_summary())
设计要点:
- Provider 抽象层是跨 Provider 回退的基础:没有统一接口,每个 Provider 的差异(认证方式、请求格式、响应格式、错误码)会渗透到韧性逻辑中,使回退变得脆弱。关于 Provider 抽象层的设计,参见 模型无关的 Agent 设计。
- 成本感知路由不是优化——是韧性的一部分:当一个 Provider 不可用但另外两个 Provider 可用时,Agent 仍然可以运行。但如果可用 Provider 的成本是通常的 5-10 倍(例如只有 Claude Opus 可用而没有 DeepSeek/Haiku),账单会在一次中断中大幅飙升。成本感知路由确保在韧性回退时也考虑成本。
- DeepSeek 在中文场景中的双重角色:既是成本锚点(日常路由优先目标),又是韧性锚点(其他 Provider 不可用时的兜底)。这使得 DeepSeek 的并发限制管理尤为重要——如果所有流量单向涌入 DeepSeek,它自己的并发限制将成为新的单点故障。
8. 常见问题 + 下一步阅读
常见问题
1. 熔断器和重试有什么区别,什么时候用哪个?
重试(Retry)和熔断器(Circuit Breaker)解决的是两类不同的问题。重试适用于瞬态故障——网络抖动、临时的连接超时、偶尔的 503——这些故障在短时间内可能自行恢复,再试一次就能成功。熔断器适用于持续故障——下游服务已经确认宕机、一个端点在 30 秒内连续失败了 5 次——此时重试只是在浪费时间和 Token,并且在给已经崩溃的服务施加额外负载。在实际部署中,两者是嵌套关系:重试在熔断器内部执行。当熔断器 Closed 时,函数内部的 Retry 逻辑正常工作;当熔断器 Open 时,before_call() 直接抛异常,Retry 还没开始就被打断了——这正是我们想要的行为。
2. 令牌感知限流和传统请求限流的核心区别是什么?
核心区别在消耗的单位。传统限流器消耗"请求数"——每分钟 N 个请求,每个请求消耗 1。这假设每个请求的成本大致相当——在 LLM 的世界里这个假设是错的。一个 50 Token 的分类请求和一个 80,000 Token 的文档分析请求在传统限流器看来完全一样——都消耗 1 个请求槽位。但 Provider 的计费和限流是基于 Token 的(TPM),不是基于请求数的。令牌感知限流改动了这个消耗单位:每次调用消耗的不再是 1,而是此次调用的预估 Token 数。另外,令牌感知限流还需要在调用后从响应头校准——因为客户端不可能精确预估 Token(尤其是输出 Token),所以必须依赖 Provider 在响应中提供的实际计数来保持客户端桶的准确性。
3. Bulkhead 隔离和 Rate Limiting 看起来很像,怎么区分?
二者在"计数和限制"的维度上确实相似,但本质不同:Rate Limiting 是流控——它基于速率和时间窗口决定"这个请求是否应该通过"。被拒绝的请求通常可以稍后重试。Bulkhead 是资源隔离——它基于物理资源(线程、内存、文件句柄)分配独立池,确保一个池的耗尽不影响其他池。Bulkhead 的用户级 Token 预算看起来像限流(限制每分钟 Token 数),但它的动机不是"这个用户调用太快"——而是"如果这个用户耗尽全局 Token 配额,其他用户会受影响吗?"Bulkhead 按隔离域分区(用户、工具、会话),Rate Limiter 按速率和Provider限制。实际系统中的位置:Rate Limiter 在调用入口("有配额吗?"),Bulkhead 在资源分配层("你有自己的专用池")。
4. 优雅降级会不会让用户感知到质量下降?
会的,但感知到质量下降好过感知到服务不可用。这是降级设计的核心哲学——"比没有要好"(better than nothing)。关键是如何管理用户感知:降级元数据中的 quality_estimate 字段让前端可以根据降级级别调整 UI——例如 partial 结果可以加一个黄色信息条"部分信息暂时不可用",fallback 结果可以显示"使用了备用模型,结果可能略有差异"。更重要的是,降级结果的质量通常超过用户的预期——一个从 DeepSeek 返回的缓存答案(quality 0.7)可能已经足够满足"我的订单到哪了"这种查询,用户甚至不会注意到它不完整。真正会让用户不满的是服务完全无响应——降级防止的正是这种最坏情况。
5. 这些模式必须全部实现吗,还是可以逐步落地?
可以也应该逐步落地。推荐的落地顺序是:熔断器 → 令牌感知限流 → 优雅降级 → Bulkhead 隔离。为什么这个顺序?熔断器最直接解决问题——防止 Agent 在故障依赖上烧 Token(只改一个调用点,收益立即可见)。限流器是第二个优先级——防止 429 错误和配额耗尽(因为 429 是 Agent 最常见的生产故障之一)。降级引擎在熔断和限流就位后变得特别有价值——它们为降级提供了明确的触发信号。Bulkhead 最后加——它解决的是"资源共享导致级联故障"的问题,这通常在 Agent 部署到多用户环境后才成为显性痛感。每个模式都可以独立部署并产生独立收益——不需要全部就位才开始看到效果。
6. 多模型韧性架构会不会增加太多复杂度?
如果你只使用一个 LLM Provider 且流量不高,多模型韧性的确可能是过度设计。但有几个信号表明你需要多模型韧性:(1)你的 Agent 已经在生产环境中因为 Provider 宕机而中断过至少一次;(2)你的月度 LLM API 账单超过 $500,且单一 Provider 占 90% 以上;(3)你有成本敏感场景(如客服 Agent 日均 10,000+ 调用),不同 Provider 的价格差直接影响利润率。对于大多数中文团队,一个实用的起步方案是"OpenAI/Claude 为主 + DeepSeek 为回退"——这意味着只需要 2 个 Provider 的韧性管理,复杂度可控,但已经覆盖了绝大多数故障场景。复杂度是真实存在的——但与一次 Provider 中断导致的收入损失相比,多模型韧性的实现成本(估计 3-5 天的工程投入)通常是很划算的投资。
下一步阅读
- Agent 错误恢复 — 重试是韧性体系的第一层,向上还有熔断、限流、Bulkhead 和降级。本文的四大模式与重试形成分层防护:重试处理瞬态故障,韧性模式防护持续故障。
- Agent 可观察性 — 韧性模式的状态变化(熔断器开关、限流器水位、降级事件)是可观测性的关键信号。将这些指标接入 Prometheus/Grafana 是实现主动运维的基础。
- Agent 回滚设计 — 韧性模式在故障发生前兜底,回滚在故障发生后恢复。两者配合才完整:韧性预防故障扩散,回滚撤销已发生的损害。
- Agent 成本可观察性 — 令牌感知限流和用户级预算是成本保护的第一道防线。限流器阻止超额消耗,成本可观察性将消耗归因到团队和项目。
- Agent 运行时隔离 — Bulkhead 资源隔离与运行时沙箱共同构成 Agent 的多层隔离体系。Bulkhead 保护逻辑资源,沙箱保护进程边界。
- 模型无关的 Agent 设计 — 多模型韧性和模型切换需要 Provider 抽象层。模型无关设计是跨 Provider 回退的基础——没有统一接口就没有统一的韧性管理。
- MCP 协议生产指南 — MCP 工具调用受益于相同的熔断/限流模式。MCP 服务端健康状态可直接映射到熔断器的失败信号。