Agent Resilience Patterns: Circuit Breakers, Rate Limiting, Bulkheads, and Graceful Degradation
30-Second Takeaway
- The Core Problem: When agents run in production, they face four classes of failures: provider outages, API rate limits, tool breakdowns, and resource exhaustion. Simple retry logic amplifies these failures--each retry burns tokens, adds load, and prolongs the outage--rather than fixing them.
- The Four-Layer Resilience Stack: Circuit Breaker - Stop calling dead dependencies. Token-Aware Rate Limiter - Protect API quotas from exhaustion. Bulkhead Isolation - Prevent a single task from draining global resources. Graceful Degradation - Keep the system running when core dependencies fail.
- Key Design Principle: Resilience patterns must be injected at fixed positions in the agent request lifecycle--Bulkhead at entry, Rate Limiter before the call, Circuit Breaker wrapping every call, Degradation as the final safety net. Together they form a complete "request enters, resource check, call protection, failure fallback" pipeline.
- What You'll Build: A four-layer resilience architecture for your agent system: implement AgentCircuitBreaker, TokenAwareRateLimiter, AgentBulkhead, and GracefulDegradationEngine in Python, then compose them via ResilientAgentRunner into a unified protection layer for production agent runtimes.
1. Why Agents Need Resilience Beyond Retries
An e-commerce customer-support agent receives a query at 2:17 AM: “When will my order #38291 arrive?” The agent’s first step is calling the get_order tool to fetch order details. Behind get_order is the order microservice’s /api/v1/orders/38291 endpoint—which is returning HTTP 503 because its database connection pool is exhausted.
The agent’s retry logic triggers: 1st retry (2:17:01), 2nd retry (2:17:03, 2s backoff), 3rd retry (2:17:07, 4s backoff). All three fail. Between retries, the agent’s LLM keeps burning tokens to “think” about its retry strategy—“Order service returned 503, let me retry…” “Still 503, probably just transient…” “Third time’s the charm, let me change the query format…” By 2:17:10, a simple query that should have cost 800 tokens has burned 4,200 tokens—and the order service is still down, the user still has no answer.
This isn’t a code bug. It’s a systemic flaw in retry logic when applied to agents—every retry consumes LLM reasoning tokens, and that “thinking” burns the same scarce resources the system is already struggling with. Worse, retries don’t distinguish failure types: transient network jitter (worth retrying) and a dead downstream service (where retries only make things worse) are handled by the exact same logic.
The Retry Amplification Equation: 1 agent request × N retries × (LLM thinking tokens + tool call cost) = Original cost × N. If 100 concurrent agents all hit the same downstream failure, they collectively hammer the already-crashed service with 100 × N additional requests in 10 seconds—this isn’t recovery, it’s a self-inflicted DoS.
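To make the amplification concrete, the arithmetic can be run for a given fleet size. This is a minimal sketch: the function name and the per-attempt token figures (900 thinking tokens, 200 tool-call tokens) are illustrative assumptions, not measurements from the incident described above.

```python
def retry_amplification(concurrent_agents: int, retries: int,
                        thinking_tokens: int, tool_call_tokens: int) -> dict:
    """Estimate the extra load created by naive retries during an outage."""
    per_attempt = thinking_tokens + tool_call_tokens
    return {
        # Extra calls that land on the already-failing dependency
        "extra_requests": concurrent_agents * retries,
        # Tokens spent on the retries alone, summed across all agents
        "wasted_tokens": concurrent_agents * retries * per_attempt,
    }


# Assumed figures: 100 agents, 3 retries each, 1,100 tokens per attempt
impact = retry_amplification(concurrent_agents=100, retries=3,
                             thinking_tokens=900, tool_call_tokens=200)
print(impact)
```

Under these assumed figures, 100 agents with 3 retries each add 300 calls against the failing service and spend 330,000 tokens without producing a single answer.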
The Four-Layer Resilience Stack: Protection Above Retries
Retries are the first layer of resilience—they answer “maybe trying again will work.” But production agent resilience demands four additional layers on top:
- Circuit Breaker: When a dependency (LLM provider or tool API) fails repeatedly, stop calling it altogether and fail fast. “Don’t send more requests to a service that’s already down—wait for it to recover first.” The circuit breaker is the outer guard around retries: retries can run inside, but when the circuit opens, retries are skipped entirely.
- Token-Aware Rate Limiter: LLM API quotas are bounded by TPM (tokens per minute) as well as RPM (requests per minute), and TPM is often the limit an agent hits first. Traditional request-based rate limiters break down in agent scenarios because one request might consume 100 tokens (a simple classification) or 120,000 tokens (a long document analysis). A token-aware limiter lets the agent ask before every call: “Do I have enough token budget remaining to complete this call?”
- Bulkhead Isolation: The ship’s watertight compartments. One agent user shouldn’t exhaust the global token quota or thread pool just because another user is making heavy requests. Bulkheads isolate resources across three dimensions: per-user token budgets, per-tool thread pools, and per-session runtime sandboxes.
- Graceful Degradation: When all three layers above are penetrated—the provider really is down, the tool really is unreachable—the agent shouldn’t crash. The degradation engine provides a priority-ordered decision tree: switch to a fallback model → skip non-critical tools → return cached results → enqueue for async processing → return partial results with quality markers.
These four layers relate to retries in a nested hierarchy: retries sit at the innermost layer (for transient faults), the circuit breaker wraps retries (stopping retries against persistent failures), the rate limiter guards the call entrypoint (ensuring quota exists), and degradation provides the final safety net (what to do when every path fails). For deep coverage of retry design itself (exponential backoff, jitter, retry budgets), see Agent Error Recovery—this article focuses on the architectural resilience layers above retries.
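The nesting order described above can be sketched as a single function. This is only an illustration: `call_with_layers` and its callable parameters are hypothetical stand-ins for the real limiter, breaker, and degradation engine, and backoff between retries is omitted for brevity.

```python
from typing import Callable


def call_with_layers(
    call: Callable[[], str],
    has_quota: Callable[[], bool],
    breaker_is_open: Callable[[], bool],
    fallback: Callable[[str], str],
    max_attempts: int = 3,
) -> str:
    """Hypothetical composition: limiter -> breaker -> retries -> degradation."""
    if not has_quota():            # rate limiter guards the entrypoint
        return fallback("no_quota")
    if breaker_is_open():          # an open breaker skips retries entirely
        return fallback("breaker_open")
    for _ in range(max_attempts):  # retries are the innermost layer
        try:
            return call()
        except ConnectionError:
            continue               # treated as transient: try again
    return fallback("retries_exhausted")  # degradation is the final net


attempts = []


def flaky_tool() -> str:
    attempts.append(1)
    raise ConnectionError("503 from order service")


result = call_with_layers(
    flaky_tool,
    has_quota=lambda: True,
    breaker_is_open=lambda: False,
    fallback=lambda reason: f"degraded:{reason}",
)
print(result, len(attempts))
```

When `breaker_is_open` returns True, `flaky_tool` is never invoked, which is the property that stops retry amplification.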
Agent-Specific Failure Modes: Why Microservices Resilience Patterns Don’t Port Directly
If you’re familiar with Resilience4j, Polly, or Hystrix, you might be thinking: “I already use these patterns in my microservices—what’s different about agents?” Three fundamental differences:
- Failure types are different: Microservices only distinguish HTTP 5xx / 4xx / timeout. Agents also need to detect semantic failures—the LLM returns well-formed JSON at HTTP 200, but the tool name inside is hallucinated, the parameter types are wrong, or it references a nonexistent function. These failures look like “success” to a traditional circuit breaker.
- The cost model is different: Microservice retry costs are CPU and network bandwidth. Agent retry costs are LLM tokens—denominated in dollars. A redundant retry is near-zero cost in microservices; in agents it can burn $0.02–$0.50 (depending on model and context length). Token awareness isn’t an optimization—it’s a survival requirement.
- Multi-provider dependency: Microservices typically call your own services. Agents depend simultaneously on multiple LLM providers (OpenAI, Anthropic, DeepSeek) and multiple tool APIs. Each provider has independent failure modes, rate limit models, and backoff strategies. Resilience must be managed across all providers uniformly.
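The first difference, a failure that arrives as HTTP 200, is the least familiar one, so a small validator may help. This is a sketch under assumptions: `TOOL_SCHEMAS` is a hypothetical registry, and the `tool_name` / `arguments` response shape is chosen for illustration rather than taken from any specific provider API.

```python
from typing import Optional

# Hypothetical tool registry: tool name -> expected argument types
TOOL_SCHEMAS = {
    "get_order": {"id": int},
    "search_docs": {"query": str},
}


def detect_semantic_failure(tool_call: dict) -> Optional[str]:
    """Return a reason string if the tool call is semantically invalid, else None."""
    name = tool_call.get("tool_name", "")
    schema = TOOL_SCHEMAS.get(name)
    if schema is None:
        return f"unknown tool: {name!r}"
    args = tool_call.get("arguments", {})
    for arg, expected in schema.items():
        if arg not in args:
            return f"missing argument: {arg!r}"
        if not isinstance(args[arg], expected):
            return f"argument {arg!r} should be {expected.__name__}"
    return None


# A well-formed response whose tool name is misspelled
print(detect_semantic_failure({"tool_name": "get_odrer", "arguments": {"id": 1}}))
# A valid tool name with a wrongly typed argument
print(detect_semantic_failure({"tool_name": "get_order",
                               "arguments": {"id": "not_an_integer"}}))
```

A breaker that only inspects status codes would count both of these calls as successes; feeding the returned reason into the breaker as a failure signal closes that gap.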
Wow Moment: If your agent uses retries without a circuit breaker, every agent request that hits a down dependency independently runs N retries. 100 concurrent agents = 100 × N extra calls. Add the LLM tokens consumed during each retry’s “thinking” phase, and a 30-minute service outage can burn 2–3 days of your monthly API budget. The circuit breaker doesn’t need to be complex—it just needs to answer one question after detecting consecutive failures: “Is this dependency currently unhealthy, and should I stop trying?”
2. Circuit Breaker Pattern for Agent Tool Calls
The circuit breaker originates from electrical engineering: when current exceeds a safe threshold, the breaker trips and cuts the circuit, preventing fires. In software, it protects the system from being dragged down by continuous calls to a failing dependency.
The Three-State State Machine
The circuit breaker transitions between three states:
- Closed (normal operation): All requests pass through. Each failure increments a counter. When failures exceed the threshold within a time window, the state transitions to Open.
- Open (tripped): All requests are immediately rejected with CircuitBreakerOpenError, without making the actual call. The Open state persists for a cooldown period, then transitions to Half-Open.
- Half-Open (probing): A limited number of probe requests are allowed through (typically 1–3). If probes succeed, the breaker resets to Closed. If any probe fails, the breaker returns to Open and the cooldown resets.
Agent-Specific Failure Detection: Signals Deeper Than HTTP Status Codes
Traditional circuit breakers define “failure” as HTTP 5xx or timeout. Agent circuit breakers need to detect three categories of failure:
- HTTP errors: 5xx (server failure), 429 (rate limited), connection timeout. This is the first layer, consistent with traditional breakers.
- Semantic failures: The LLM returned HTTP 200, but the response body contains a hallucinated tool call, for example "tool_name": "get_odrer" (typo), "arguments": {"id": "not_an_integer"} (type error), or a reference to a nonexistent function. Semantic failures are just as destructive to an agent as HTTP 500.
- Timeout patterns: Beyond total call duration, also track token generation rate. If the LLM produces only 15 tokens in 30 seconds (normally 500+), the provider may be in a degraded state, and the circuit breaker should treat this as a failure signal.
Per-Tool vs. Per-Provider Circuit Breakers
Agents need two granularity levels of circuit breaking:
- Per-tool circuit breakers: Maintain an independent breaker for each tool: get_order, search_docs, send_email. When one tool fails, other tools are unaffected, so the agent can continue using healthy tools to complete the task (with the degradation engine skipping failed tools).
- Per-provider circuit breakers: Maintain an independent breaker for each LLM provider: openai, anthropic, deepseek. When one provider goes down globally, traffic shifts to a backup provider.
Both coexist: provider breakers protect the LLM call path, tool breakers protect the tool call path. A typical agent request passes through at least two circuit breakers.
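The two granularities can share one registry keyed by kind and name. The sketch below is deliberately simplified: `BreakerRegistry` is a hypothetical helper that only counts consecutive failures, with no cooldown or Half-Open state, to show that tripping one key leaves the others untouched.

```python
from collections import defaultdict


class BreakerRegistry:
    """Hypothetical registry: one consecutive-failure counter per dependency."""

    def __init__(self, failure_threshold: int = 3):
        self.failure_threshold = failure_threshold
        self._failures = defaultdict(int)  # (kind, name) -> consecutive failures

    def record(self, kind: str, name: str, ok: bool) -> None:
        key = (kind, name)
        # A success resets the counter; a failure increments it
        self._failures[key] = 0 if ok else self._failures[key] + 1

    def is_open(self, kind: str, name: str) -> bool:
        return self._failures[(kind, name)] >= self.failure_threshold


registry = BreakerRegistry(failure_threshold=3)
for _ in range(3):
    registry.record("tool", "get_order", ok=False)   # tool path fails repeatedly
registry.record("provider", "openai", ok=True)        # LLM path stays healthy

print(registry.is_open("tool", "get_order"))     # breaker for this tool only
print(registry.is_open("tool", "search_docs"))
print(registry.is_open("provider", "openai"))
```

In a real system each key would map to a full three-state breaker rather than a bare counter.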
Token-Cost-Aware Circuit Breaking
An easily overlooked signal: when a single LLM call consumes far more tokens than normal, the circuit should trip. For example, a normal “check order status” LLM call uses 500–800 tokens. If a call burns 8,000 tokens (because the LLM got stuck in repetitive reasoning or tried multiple strategies), that is itself a failure signal—continuing to retry only amplifies the cost. The circuit breaker tracks per-call token consumption. When a single call exceeds a token threshold (e.g., > 5,000 tokens and still failed) or when wasted tokens in a rolling window exceed budget (e.g., > 20% of budget wasted in 60 seconds on failed retries), the breaker trips early.
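One way to track the rolling-window signal is a small tracker fed by failed calls. This is a sketch, not the article's implementation: `WastedTokenTracker` and its parameters are hypothetical, the 5,000-token and 20% thresholds are the example values from the paragraph above, and timestamps are passed in explicitly so the behavior is easy to follow.

```python
from collections import deque


class WastedTokenTracker:
    """Hypothetical tracker for tokens burned by failed calls in a rolling window."""

    def __init__(self, window_budget: int, window_seconds: float = 60.0,
                 waste_ratio: float = 0.20, per_call_limit: int = 5000):
        self.window_budget = window_budget
        self.window_seconds = window_seconds
        self.waste_ratio = waste_ratio
        self.per_call_limit = per_call_limit
        self._failed = deque()  # (timestamp, tokens) of failed calls

    def record_failed_call(self, tokens: int, now: float) -> bool:
        """Record a failed call; return True if the breaker should trip early."""
        self._failed.append((now, tokens))
        cutoff = now - self.window_seconds
        # Drop failures that have aged out of the window
        while self._failed and self._failed[0][0] <= cutoff:
            self._failed.popleft()
        wasted = sum(t for _, t in self._failed)
        return (tokens > self.per_call_limit
                or wasted > self.waste_ratio * self.window_budget)


tracker = WastedTokenTracker(window_budget=50_000)
print(tracker.record_failed_call(4_000, now=0.0))
print(tracker.record_failed_call(4_000, now=10.0))
print(tracker.record_failed_call(4_000, now=20.0))  # 12,000 wasted > 20% of 50,000
```

In production the `now` argument would typically come from `time.monotonic()`.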
Code: AgentCircuitBreaker Class
```python
from __future__ import annotations

import functools
import logging
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional

logger = logging.getLogger("agent.circuit_breaker")


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Tripped, failing fast
    HALF_OPEN = "half_open"  # Probing recovery


@dataclass
class CircuitBreakerConfig:
    """Circuit breaker configuration--all thresholds tunable per agent scenario."""
    failure_threshold: int = 5        # Consecutive failures to trip
    success_threshold: int = 2        # Consecutive successes in Half-Open to close
    cooldown_seconds: float = 30.0    # Open state cooldown
    window_seconds: float = 60.0      # Sliding window for failure counting
    # Agent-specific config
    token_cost_threshold: int = 5000  # Per-call token threshold -> counted as failure
    max_half_open_probes: int = 2     # Max concurrent probes in Half-Open


@dataclass
class CallResult:
    """Encapsulates each call result with agent-specific failure signals."""
    success: bool
    latency_ms: float
    tokens_consumed: int = 0
    semantic_failure: Optional[str] = None  # e.g. "hallucinated tool name"
    http_status: Optional[int] = None
    error_message: str = ""


class CircuitBreakerOpenError(Exception):
    """Raised when circuit is open, carrying state info for the caller (e.g., trigger degradation)."""

    def __init__(self, breaker_name: str, state: CircuitState, failures: int):
        self.breaker_name = breaker_name
        self.state = state
        self.failures = failures
        super().__init__(
            f"Circuit breaker '{breaker_name}' is {state.value} "
            f"(failures: {failures})"
        )


class AgentCircuitBreaker:
    """Circuit breaker implementation for agent scenarios.

    Features:
    - Three-state machine with sliding window
    - HTTP failure + semantic failure + token anomaly detection
    - Thread-safe (usable in multi-tool concurrent scenarios)
    - Half-Open probe concurrency control
    """

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self._failures: list[float] = []      # Failure timestamp list (sliding window)
        self._successes_in_half_open: int = 0
        self._last_failure_time: float = 0.0
        self._open_since: float = 0.0
        self._half_open_probe_count: int = 0  # Current active probes
        self._lock = threading.RLock()
        self.last_result: Optional[CallResult] = None

    # ---- State machine core ----
    def before_call(self) -> None:
        """Pre-call check: if breaker is Open and within cooldown, reject immediately."""
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return
            if self.state == CircuitState.OPEN:
                elapsed = time.monotonic() - self._open_since
                if elapsed >= self.config.cooldown_seconds:
                    self.state = CircuitState.HALF_OPEN
                    self._successes_in_half_open = 0
                    self._half_open_probe_count = 0
                    logger.info(
                        "CB [%s] transitioning OPEN -> HALF_OPEN", self.name
                    )
                else:
                    raise CircuitBreakerOpenError(
                        self.name, self.state, len(self._failures)
                    )
            # Reached directly, or right after the OPEN -> HALF_OPEN transition above
            if self.state == CircuitState.HALF_OPEN:
                if self._half_open_probe_count >= self.config.max_half_open_probes:
                    raise CircuitBreakerOpenError(
                        self.name, self.state, len(self._failures)
                    )
                self._half_open_probe_count += 1

    def after_call(self, result: CallResult) -> None:
        """Post-call state update: success or failure advances the state machine."""
        with self._lock:
            self.last_result = result
            now = time.monotonic()
            # Drop failures that have aged out of the sliding window
            cutoff = now - self.config.window_seconds
            self._failures = [t for t in self._failures if t > cutoff]
            if self.state == CircuitState.HALF_OPEN:
                self._half_open_probe_count = max(
                    0, self._half_open_probe_count - 1
                )
            is_failure = (
                not result.success
                or result.semantic_failure is not None
                or result.tokens_consumed > self.config.token_cost_threshold
            )
            if is_failure:
                self._failures.append(now)
                self._last_failure_time = now
                if self.state == CircuitState.HALF_OPEN:
                    self.state = CircuitState.OPEN
                    self._open_since = now
                    self._successes_in_half_open = 0
                    logger.warning(
                        "CB [%s] HALF_OPEN probe FAILED -> OPEN", self.name
                    )
                elif self.state == CircuitState.CLOSED:
                    if len(self._failures) >= self.config.failure_threshold:
                        self.state = CircuitState.OPEN
                        self._open_since = now
                        logger.error(
                            "CB [%s] CLOSED -> OPEN! %d failures",
                            self.name, len(self._failures),
                        )
            else:
                if self.state == CircuitState.HALF_OPEN:
                    self._successes_in_half_open += 1
                    if self._successes_in_half_open >= self.config.success_threshold:
                        self.state = CircuitState.CLOSED
                        self._failures.clear()
                        logger.info(
                            "CB [%s] HALF_OPEN -> CLOSED (%d successes)",
                            self.name, self._successes_in_half_open,
                        )
                elif self.state == CircuitState.CLOSED:
                    # A success while Closed resets the failure streak
                    self._failures.clear()

    @staticmethod
    def is_semantic_failure(response_data: dict) -> Optional[str]:
        """Detect semantic failures in LLM tool-call responses."""
        tool_calls = response_data.get("tool_calls", [])
        if not tool_calls:
            return None
        for tc in tool_calls:
            func_name = tc.get("function", {}).get("name", "")
            if not func_name or len(func_name) < 2:
                return f"Malformed tool name: '{func_name}'"
        return None

    def get_metrics(self) -> dict:
        with self._lock:
            return {
                "name": self.name,
                "state": self.state.value,
                "failures_in_window": len(self._failures),
                "open_since": self._open_since,
                "half_open_probes": self._half_open_probe_count,
            }


# ---- Decorator API ----
class circuit_breaker:
    """Decorator: auto-wrap agent tool functions with a circuit breaker.

    Usage:
        breaker = AgentCircuitBreaker("get_order")

        @circuit_breaker(breaker)
        def get_order(order_id: str) -> dict:
            ...
    """

    def __init__(self, breaker: AgentCircuitBreaker):
        self.breaker = breaker

    def __call__(self, func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self.breaker.before_call()
            start = time.monotonic()
            try:
                result = func(*args, **kwargs)
                elapsed = time.monotonic() - start
                self.breaker.after_call(CallResult(
                    success=True, latency_ms=elapsed * 1000))
                return result
            except Exception as e:
                elapsed = time.monotonic() - start
                self.breaker.after_call(CallResult(
                    success=False, latency_ms=elapsed * 1000,
                    error_message=str(e)))
                raise
        return wrapper


# ---- Usage example ----
if __name__ == "__main__":
    breaker = AgentCircuitBreaker("search_docs")
    # Simulate consecutive failures -> circuit trips
    for i in range(6):
        try:
            breaker.before_call()
            raise ConnectionError("Backend unreachable")
        except CircuitBreakerOpenError:
            print(f"[Call {i}] Circuit OPEN -- fast failing")
            break
        except ConnectionError:
            breaker.after_call(CallResult(
                success=False, latency_ms=50,
                error_message="Backend unreachable"))
            print(f"[Call {i}] Failure recorded")
    print("Metrics:", breaker.get_metrics())
```
Design notes:
- Thread safety: threading.RLock ensures state machine consistency during concurrent multi-tool calls.
- Why not pybreaker: pybreaker provides basic exception-based circuit breaking, but does not understand semantic failures or token consumption. Agent circuit breaker logic must be custom.
- Integration with retries: Retry logic sits inside the circuit breaker. When the breaker is Open, before_call() raises immediately and retries never trigger. See Section 6 for concrete composition patterns.
MCP (Model Context Protocol) tool servers benefit from the same circuit breaker patterns. See MCP Protocol Production Guide for MCP server resilience integration.
3. Token-Aware Rate Limiting: Protecting LLM API Quotas
Why Traditional Request-Based Rate Limiters Fail
Most API rate limiters are request-count-based: at most N requests per minute. This works fine in the REST API world—each request costs roughly the same. But LLM API billing is nothing like that:
| Call Scenario | Input Tokens | Output Tokens | Total Tokens | Equivalent "Standard Requests" |
|---|---|---|---|---|
| Simple classification ("Is this spam?") | 50 | 5 | 55 | 1x |
| Customer support reply (with context) | 800 | 200 | 1,000 | 18x |
| Code review (full PR diff) | 12,000 | 3,000 | 15,000 | 273x |
| Long document summary (50-page PDF) | 80,000 | 5,000 | 85,000 | 1,545x |
If you rate-limit by RPM (requests per minute) to 100 RPM, then 100 simple classifications (5,500 tokens) and 100 long document summaries (8,500,000 tokens) are treated identically—but the latter will instantly exhaust your TPM quota. Traditional rate limiters are completely blind to this discrepancy.
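The gap between the two limits can be checked with the table's own token counts. The sketch below assumes a 1,000,000 TPM quota purely for illustration; `max_calls_per_minute` is a hypothetical helper, not part of any provider SDK.

```python
def max_calls_per_minute(tpm_limit: int, tokens_per_call: int) -> int:
    """How many calls of a given size fit into one minute of TPM quota."""
    return tpm_limit // tokens_per_call


TPM_LIMIT = 1_000_000  # assumed quota, for illustration only
RPM_LIMIT = 100        # the request-based limit from the paragraph above

# Total tokens per call, taken from the table above
scenarios = {
    "simple_classification": 55,
    "support_reply": 1_000,
    "code_review": 15_000,
    "document_summary": 85_000,
}

for name, tokens in scenarios.items():
    fits = max_calls_per_minute(TPM_LIMIT, tokens)
    verdict = "RPM binds first" if fits >= RPM_LIMIT else "TPM binds first"
    print(f"{name}: {fits} calls/min fit the TPM quota ({verdict})")
```

Under this assumed quota, only 11 document summaries fit into a minute, so a limiter that admits 100 of them has already overshot the token budget several times over.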
Token Bucket Refactored: From Request Buckets to Token Buckets
The classic Token Bucket algorithm maintains a bucket with N tokens; each request consumes 1 token, and tokens refill at a fixed rate. The agent adaptation is simple but critical: bucket capacity = provider TPM limit; each request consumes not 1 but the estimated token count for that call.
This means:
- Before call: Estimate tokens this call will consume (system prompt + context + baseline estimate)
- During call: If the bucket has insufficient tokens, the request is rejected or queued
- After call: Read actual consumption from provider response headers to calibrate the client-side bucket
Provider Rate Limit Profiles—Especially DeepSeek
Each provider has a different rate limit model:
- OpenAI: RPM + TPM + RPD (requests per day). Response headers include x-ratelimit-remaining-requests and x-ratelimit-remaining-tokens.
- Anthropic: RPM + ITPM + OTPM (input and output tokens per minute). Cache-hit tokens do not count toward ITPM. Rich headers: anthropic-ratelimit-requests-remaining, anthropic-ratelimit-tokens-remaining.
- DeepSeek: Core limit is concurrent connections (not absolute TPM/RPM). The limiter needs to track active request count and per-request estimated token consumption. DeepSeek pricing is 1/10-1/30 of OpenAI, so a single API key is shared among more agents, and concurrency conflicts are more likely.
Response Header Dynamic Adjustment
A client-side rate limiter can never be perfectly accurate. The limiter must read provider response headers after every call to calibrate:
- Remaining tokens: Trust the provider header over the client bucket estimate.
- Retry-After: If a 429 arrives with Retry-After: 15, freeze token refill for 15 seconds.
- Reset time: Use x-ratelimit-reset-requests or anthropic-ratelimit-tokens-reset to release conservatively held tokens near reset time.
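Header values need parsing before they can drive the limiter. The sketch below covers two cases: `Retry-After`, which HTTP allows as either delta-seconds or an HTTP-date, and a compact duration string such as `6m0s`. The helper names are hypothetical, and the duration format is an assumption about how reset headers are encoded; verify it against the provider's current documentation.

```python
import re
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: str, now: Optional[datetime] = None) -> float:
    """Seconds to wait, from a Retry-After value (delta-seconds or HTTP-date)."""
    value = value.strip()
    if value.isdigit():
        return float(value)
    now = now or datetime.now(timezone.utc)
    target = parsedate_to_datetime(value)  # e.g. "Wed, 21 Oct 2015 07:28:15 GMT"
    return max(0.0, (target - now).total_seconds())


_DURATION_PART = re.compile(r"(\d+(?:\.\d+)?)(ms|h|m|s)")
_UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 0.001}


def parse_reset_duration(value: str) -> float:
    """Seconds until reset, from a compact duration such as '6m0s' (assumed format)."""
    return sum(float(num) * _UNIT_SECONDS[unit]
               for num, unit in _DURATION_PART.findall(value))


print(parse_retry_after("15"))
print(parse_reset_duration("6m0s"))
```

Passing `now` explicitly keeps the HTTP-date branch deterministic in tests; production code can rely on the default.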
Client-Side Token Estimation: Decide Before You Know
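The exact token count is known only after the provider responds, yet the limiter has to admit or reject the call before it is sent. The solution is estimation + calibration: use historical data and character-based heuristics to estimate before the call, then calibrate from response headers after the call.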
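A minimal version of the estimate-then-calibrate loop keeps a running characters-per-token ratio and nudges it toward each observed value. `TokenEstimator` and its smoothing factor are hypothetical; the exponential moving average is one reasonable calibration choice among several, not a method prescribed by the article.

```python
class TokenEstimator:
    """Hypothetical estimator: character heuristic calibrated by observed usage."""

    def __init__(self, chars_per_token: float = 4.0, smoothing: float = 0.2):
        self.chars_per_token = chars_per_token
        self.smoothing = smoothing  # weight given to each new observation

    def estimate(self, text: str) -> int:
        """Pre-call estimate from text length."""
        return max(1, round(len(text) / self.chars_per_token))

    def calibrate(self, text: str, actual_tokens: int) -> None:
        """Post-call update using the token count reported by the provider."""
        if not text or actual_tokens <= 0:
            return
        observed = len(text) / actual_tokens
        self.chars_per_token = ((1 - self.smoothing) * self.chars_per_token
                                + self.smoothing * observed)


estimator = TokenEstimator(chars_per_token=4.0, smoothing=0.5)
prompt = "a" * 400
print(estimator.estimate(prompt))           # estimate before the call
estimator.calibrate(prompt, actual_tokens=200)  # provider reported twice as many
print(estimator.chars_per_token)            # ratio moved toward the observation
```

A lower smoothing factor makes the estimator slower to react but less sensitive to a single unusual prompt.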
Code: TokenAwareRateLimiter Class (with asyncio support)
```python
from __future__ import annotations

import asyncio
import logging
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict

logger = logging.getLogger("agent.rate_limiter")


class Provider(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    DEEPSEEK = "deepseek"


@dataclass
class ProviderRateProfile:
    """Provider rate limit profile--based on provider docs and empirical data."""
    provider: Provider
    tpm_limit: int                # Tokens per minute upper bound
    rpm_limit: int = 500
    max_concurrency: int = 50     # Max concurrent connections (critical for DeepSeek)
    chars_per_token: float = 3.5  # Mixed CN/EN default


PROVIDER_PROFILES: Dict[Provider, ProviderRateProfile] = {
    Provider.OPENAI: ProviderRateProfile(
        provider=Provider.OPENAI, tpm_limit=1_000_000, rpm_limit=500,
        chars_per_token=4.0,
    ),
    Provider.ANTHROPIC: ProviderRateProfile(
        provider=Provider.ANTHROPIC, tpm_limit=400_000, rpm_limit=400,
        chars_per_token=4.0,
    ),
    Provider.DEEPSEEK: ProviderRateProfile(
        provider=Provider.DEEPSEEK, tpm_limit=500_000, rpm_limit=300,
        max_concurrency=10,   # DeepSeek's core constraint
        chars_per_token=1.8,  # Primarily Chinese
    ),
}


@dataclass
class TokenBudget:
    """Token bucket: capacity = TPM limit, refills at token/sec rate."""
    capacity: float
    tokens: float
    refill_rate_per_sec: float
    last_refill: float = field(default_factory=time.monotonic)

    def refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.last_refill
        if elapsed <= 0:
            # last_refill was pushed into the future by a Retry-After freeze:
            # add nothing and keep the freeze in place until it expires.
            return
        self.tokens = min(self.capacity,
                          self.tokens + elapsed * self.refill_rate_per_sec)
        self.last_refill = now

    def consume(self, amount: float) -> bool:
        self.refill()
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False


class TokenAwareRateLimiter:
    """Token-aware rate limiter.

    Features: Token Bucket refactoring, per-provider independent buckets,
    response header dynamic calibration, asyncio async support,
    DeepSeek concurrency awareness, queue-based backpressure.
    """

    def __init__(self, profiles=None):
        self.profiles = profiles or PROVIDER_PROFILES
        self._budgets: Dict[Provider, TokenBudget] = {}
        self._semaphores: Dict[Provider, asyncio.Semaphore] = {}
        self._lock = threading.RLock()
        self._active_requests: Dict[Provider, int] = {}
        for provider, profile in self.profiles.items():
            self._budgets[provider] = TokenBudget(
                capacity=float(profile.tpm_limit),
                tokens=float(profile.tpm_limit),
                refill_rate_per_sec=profile.tpm_limit / 60.0,
            )
            # On Python < 3.10 a Semaphore binds to the event loop at creation,
            # so construct the limiter inside the loop that will use it.
            self._semaphores[provider] = asyncio.Semaphore(
                profile.max_concurrency)
            self._active_requests[provider] = 0
        self._aggregate_tokens_consumed: float = 0.0
        self._aggregate_cap: float = sum(
            p.tpm_limit for p in self.profiles.values())

    def estimate_tokens(self, provider: Provider, messages: list) -> int:
        """Estimate token consumption--used pre-call."""
        profile = self.profiles.get(provider)
        if not profile:
            return 1000
        total_chars = 0
        for msg in messages:
            content = msg.get("content", "")
            if isinstance(content, str):
                total_chars += len(content)
            elif isinstance(content, list):
                for part in content:
                    if isinstance(part, dict) and "text" in part:
                        total_chars += len(part["text"])
        # +500 is a flat allowance for the response and message overhead
        return max(1, int(total_chars / profile.chars_per_token) + 500)

    def acquire(self, provider: Provider, estimated_tokens: int,
                timeout_ms: float = 5000) -> bool:
        """Synchronously acquire token quota."""
        budget = self._budgets.get(provider)
        if not budget:
            return False
        deadline = time.monotonic() + timeout_ms / 1000
        while time.monotonic() < deadline:
            with self._lock:
                if budget.consume(float(estimated_tokens)):
                    self._active_requests[provider] += 1
                    return True
            time.sleep(0.05)
        return False

    def release(self, provider: Provider, actual_tokens: int,
                estimated_tokens: int, response_headers=None):
        """Post-call release/calibrate quota."""
        budget = self._budgets.get(provider)
        if not budget:
            return
        with self._lock:
            diff = actual_tokens - estimated_tokens
            if diff > 0:
                # Under-estimated: charge the difference
                budget.tokens = max(0, budget.tokens - diff)
            elif diff < 0:
                # Over-estimated: refund the difference
                budget.tokens = min(budget.capacity, budget.tokens - diff)
            self._active_requests[provider] = max(
                0, self._active_requests[provider] - 1)
            if response_headers:
                remaining_hdr = (
                    response_headers.get("x-ratelimit-remaining-tokens")
                    or response_headers.get(
                        "anthropic-ratelimit-tokens-remaining")
                )
                if remaining_hdr is not None:
                    budget.tokens = min(budget.tokens, float(remaining_hdr))
                # Retry-After header -> freeze token refill (429 rate-limit protection)
                retry_after = response_headers.get("retry-after")
                if retry_after is not None:
                    try:
                        freeze_sec = float(retry_after)
                    except ValueError:
                        freeze_sec = 0.0  # HTTP-date form is not handled here
                    if freeze_sec > 0:
                        budget.last_refill = time.monotonic() + freeze_sec
                        logger.info(
                            "Provider %s: freezing token refill for %.1fs (Retry-After)",
                            provider.value, freeze_sec,
                        )
            self._aggregate_tokens_consumed += actual_tokens

    async def aacquire(self, provider: Provider, estimated_tokens: int,
                       timeout_ms: float = 5000) -> bool:
        """Async token quota acquisition + semaphore (critical for DeepSeek)."""
        sem = self._semaphores.get(provider)
        budget = self._budgets.get(provider)
        if sem is None or budget is None:
            return False
        loop = asyncio.get_running_loop()
        # One deadline covers both the semaphore wait and the token wait
        deadline = loop.time() + timeout_ms / 1000
        try:
            await asyncio.wait_for(sem.acquire(),
                                   timeout=timeout_ms / 1000)
        except asyncio.TimeoutError:
            return False
        while True:
            with self._lock:
                if budget.consume(float(estimated_tokens)):
                    self._active_requests[provider] += 1
                    return True
            if loop.time() >= deadline:
                break
            await asyncio.sleep(0.05)
        sem.release()
        return False

    async def arelease(self, provider: Provider, actual_tokens: int,
                       estimated_tokens: int, response_headers=None):
        """Async quota release."""
        self.release(provider, actual_tokens, estimated_tokens,
                     response_headers)
        sem = self._semaphores.get(provider)
        if sem:
            sem.release()

    def is_near_limit(self, provider: Provider,
                      threshold_pct: float = 0.85) -> bool:
        """Check if near limit (to trigger backpressure)."""
        budget = self._budgets.get(provider)
        if not budget:
            return False
        budget.refill()
        ratio = (budget.tokens / budget.capacity
                 if budget.capacity > 0 else 1.0)
        return ratio < (1.0 - threshold_pct)

    @property
    def aggregate_consumption_pct(self) -> float:
        return (self._aggregate_tokens_consumed / self._aggregate_cap
                if self._aggregate_cap > 0 else 0.0)

    def get_metrics(self) -> dict:
        metrics = {}
        for provider, budget in self._budgets.items():
            budget.refill()
            metrics[provider.value] = {
                "tokens_remaining": budget.tokens,
                "capacity": budget.capacity,
                "usage_pct": round(
                    (1 - budget.tokens / budget.capacity) * 100, 1
                ) if budget.capacity > 0 else 0,
                "active_requests": self._active_requests.get(provider, 0),
            }
        metrics["aggregate"] = {
            "tokens_consumed": self._aggregate_tokens_consumed,
            "total_cap": self._aggregate_cap,
            "usage_pct": round(self.aggregate_consumption_pct * 100, 1),
        }
        return metrics


# ---- Usage example ----
if __name__ == "__main__":
    deepseek_profile = ProviderRateProfile(
        provider=Provider.DEEPSEEK,
        tpm_limit=500_000, rpm_limit=300,
        max_concurrency=10, chars_per_token=1.8,
    )
    limiter = TokenAwareRateLimiter({
        Provider.DEEPSEEK: deepseek_profile,
        Provider.OPENAI: PROVIDER_PROFILES[Provider.OPENAI],
    })
    messages = [{"role": "user",
                 "content": "Summarize this 10-page contract for me."}]
    estimated = limiter.estimate_tokens(Provider.DEEPSEEK, messages)
    print(f"Estimated tokens: {estimated}")
    if limiter.acquire(Provider.DEEPSEEK, estimated, timeout_ms=5000):
        print("Token acquired -- proceed with API call")
        actual_tokens = 1200
        headers = {"x-ratelimit-remaining-tokens": "498000"}
        limiter.release(Provider.DEEPSEEK, actual_tokens, estimated,
                        headers)
    else:
        print("Rate limited -- must wait or use fallback")
    print("Metrics:", limiter.get_metrics())
```
Design notes:
- DeepSeek concurrency awareness: The limiter uses asyncio.Semaphore to control concurrent connections. Ten concurrent connections at 10,000 tokens each = 100,000 tokens, well below 500,000 TPM but already at the concurrency ceiling.
- Token-aware rate limiting and cost observability: The rate limiter provides prevention (stopping over-budget calls), while cost observability provides attribution. See Agent Cost Observability.
- Why not use the backoff library: backoff and Tenacity provide backoff after a 429, but do not predict and prevent 429s. Token-aware rate limiting checks quota before the request leaves your system.
4. Bulkhead Pattern: Isolating Failures Across Agent Components
The circuit breaker protects you from failing dependencies. The rate limiter protects you from exhausting quotas. But there's another failure mode: a component inside the system consumes all shared resources, starving other components. This is what the Bulkhead pattern addresses.
Analogy: A ship's hull is divided into watertight compartments. One compartment floods, but the ship stays afloat. Software bulkheads work the same way: partition system resources (threads, connections, memory, token budgets) into isolated pools so that exhausting one pool does not affect others.
Three-Dimensional Agent Isolation
Agent systems need three layers of bulkhead isolation:
- Per-user token budgets: Each user/tenant gets an independent token consumption cap (e.g., 50,000 tokens per minute). When one user triggers many agent tasks, other users' quotas are unaffected. Hard limit = reject call; soft warning = 80% threshold.
- Per-tool thread pools: Each tool type (search, database_query, send_email) gets its own dedicated thread pool. A slow database query does not block all email sends.
- Per-session sandboxes: Each agent session gets an independent temp directory, memory cap, and file descriptor quota.
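The per-tool dimension is the easiest to see in isolation. The sketch below uses a hypothetical `ToolPools` helper that lazily creates one `ThreadPoolExecutor` per tool name; the pool size of 2 is an arbitrary assumption, and bounded queueing is left out.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict


class ToolPools:
    """Hypothetical helper: one dedicated thread pool per tool name."""

    def __init__(self, workers_per_tool: int = 2):
        self._workers = workers_per_tool
        self._pools: Dict[str, ThreadPoolExecutor] = {}
        self._lock = threading.Lock()

    def submit(self, tool_name: str, fn: Callable, *args) -> Future:
        with self._lock:
            pool = self._pools.get(tool_name)
            if pool is None:
                # Lazily create an isolated pool the first time a tool is used
                pool = ThreadPoolExecutor(
                    max_workers=self._workers,
                    thread_name_prefix=f"tool-{tool_name}",
                )
                self._pools[tool_name] = pool
        return pool.submit(fn, *args)

    def shutdown(self) -> None:
        with self._lock:
            for pool in self._pools.values():
                pool.shutdown(wait=True)


def current_thread_name() -> str:
    return threading.current_thread().name


pools = ToolPools(workers_per_tool=2)
search_thread = pools.submit("search", current_thread_name).result()
email_thread = pools.submit("send_email", current_thread_name).result()
print(search_thread, email_thread)  # each call ran on its own tool's pool
pools.shutdown()
```

Because each tool owns its workers, a tool whose calls all block can saturate only its own two threads; submissions to other tools keep running.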
Bulkhead vs. Rate Limiting: The Key Distinction
Rate Limiter is flow control—it decides whether a request should pass through. Bulkhead is resource isolation—it allocates independent pools from physical resources. Resources are partitioned, not requests. They work together: the Rate Limiter at the Bulkhead entry ensures requests comply with rate limits, and the Bulkhead executes requests within allocated resource pools.
Code: AgentBulkhead Class
from __future__ import annotations
import time
import threading
from dataclasses import dataclass, field
from typing import Dict, Optional
from concurrent.futures import ThreadPoolExecutor
import logging
logger = logging.getLogger("agent.bulkhead")
class BulkheadCapacityExceeded(Exception):
"""Bulkhead resource exhaustion exception."""
def __init__(self, bulkhead_name: str, limit_type: str,
current: float, limit: float):
self.bulkhead_name = bulkhead_name
self.limit_type = limit_type
self.current = current
self.limit = limit
super().__init__(
f"Bulkhead '{bulkhead_name}' {limit_type} exceeded: "
f"{current}/{limit}"
)


@dataclass
class UserTokenBudget:
    """Per-user token budget controller (fixed window)."""
    user_id: str
    token_limit: int  # Hard limit
    warning_threshold_pct: float = 0.80  # 80% soft warning
    window_seconds: float = 60.0
    tokens_used: float = 0.0
    last_reset: float = field(default_factory=time.monotonic)

    def _maybe_reset(self) -> None:
        # Start a fresh window once the current one has elapsed.
        now = time.monotonic()
        if now - self.last_reset >= self.window_seconds:
            self.tokens_used = 0.0
            self.last_reset = now

    def try_consume(self, tokens: int) -> bool:
        self._maybe_reset()
        if self.tokens_used + tokens > self.token_limit:
            return False
        self.tokens_used += tokens
        return True

    @property
    def usage_pct(self) -> float:
        # Despite the name, this is a fraction in the range 0.0-1.0.
        self._maybe_reset()
        return (self.tokens_used / self.token_limit
                if self.token_limit > 0 else 0.0)

    @property
    def is_near_limit(self) -> bool:
        return self.usage_pct >= self.warning_threshold_pct


class AgentBulkhead:
    """Agent Bulkhead isolator.

    Three isolation dimensions:
    1. Per-user token budget -- prevents one user from draining global quota
    2. Per-tool thread pool -- isolates slow tools from fast tools
    3. Per-session resource quotas -- file descriptor / memory limits
    """

    def __init__(
        self,
        default_token_limit: int = 100_000,
        max_file_descriptors_per_session: int = 100,
    ):
        self.default_token_limit = default_token_limit
        self.max_fds_per_session = max_file_descriptors_per_session
        self._user_budgets: Dict[str, UserTokenBudget] = {}
        self._budget_lock = threading.RLock()
        self._tool_pools: Dict[str, ThreadPoolExecutor] = {}
        self._pool_lock = threading.RLock()
        self._session_resources: Dict[str, dict] = {}
        self._session_lock = threading.RLock()

    # ---- 1. Per-user token budget ----
    def check_user_budget(self, user_id: str,
                          estimated_tokens: int) -> bool:
        """Reserve tokens from the user's budget.

        Returns True when the reservation succeeds and raises
        BulkheadCapacityExceeded when it would exceed the hard limit.
        """
        with self._budget_lock:
            if user_id not in self._user_budgets:
                self._user_budgets[user_id] = UserTokenBudget(
                    user_id=user_id,
                    token_limit=self.default_token_limit,
                )
            budget = self._user_budgets[user_id]
            # Soft warning reflects usage before this request is counted.
            if budget.is_near_limit:
                logger.warning(
                    "User %s approaching token limit: %.1f%%",
                    user_id, budget.usage_pct * 100,
                )
            if not budget.try_consume(estimated_tokens):
                raise BulkheadCapacityExceeded(
                    f"user_budget:{user_id}", "token_budget",
                    budget.tokens_used, budget.token_limit,
                )
            return True

    def get_user_usage(self, user_id: str) -> dict:
        with self._budget_lock:
            budget = self._user_budgets.get(user_id)
            if not budget:
                return {"user_id": user_id, "usage_pct": 0.0}
            return {
                "user_id": user_id,
                "usage_pct": round(budget.usage_pct * 100, 1),
                "tokens_used": budget.tokens_used,
                "token_limit": budget.token_limit,
                "near_limit": budget.is_near_limit,
            }

    # ---- 2. Per-tool thread pool isolation ----
    def get_tool_executor(
        self, tool_name: str, max_workers: int = 8, queue_depth: int = 32
    ) -> ThreadPoolExecutor:
        """Get or create a tool-specific thread pool.

        Note: queue_depth is accepted but not enforced here, because
        ThreadPoolExecutor uses an unbounded internal work queue.
        max_workers only applies when the pool is first created.
        """
        with self._pool_lock:
            if tool_name not in self._tool_pools:
                self._tool_pools[tool_name] = ThreadPoolExecutor(
                    max_workers=max_workers,
                    thread_name_prefix=f"agent-tool-{tool_name}",
                )
                logger.info(
                    "Created ThreadPool for tool '%s': workers=%d",
                    tool_name, max_workers,
                )
            return self._tool_pools[tool_name]

    def tool_pool_metrics(self) -> dict:
        with self._pool_lock:
            return {
                name: {
                    # _max_workers is a private attribute of
                    # ThreadPoolExecutor; read for reporting only.
                    "max_workers": pool._max_workers,
                }
                for name, pool in self._tool_pools.items()
            }

    # ---- 3. Per-session resource tracking ----
    def register_session(self, session_id: str) -> None:
        with self._session_lock:
            # setdefault keeps the counters of an already-registered
            # session instead of resetting them to zero.
            self._session_resources.setdefault(session_id, {
                "files_open": 0,
                "created_at": time.monotonic(),
            })

    def check_session_resource(self, session_id: str,
                               resource_type: str) -> bool:
        """Reserve one unit of a session resource.

        Raises BulkheadCapacityExceeded when the session is at its limit.
        """
        with self._session_lock:
            resources = self._session_resources.setdefault(session_id, {
                "files_open": 0,
                "created_at": time.monotonic(),
            })
            if resource_type == "file_descriptor":
                if resources["files_open"] >= self.max_fds_per_session:
                    raise BulkheadCapacityExceeded(
                        f"session:{session_id}", "file_descriptors",
                        resources["files_open"], self.max_fds_per_session,
                    )
                resources["files_open"] += 1
            return True

    def release_session_resource(self, session_id: str,
                                 resource_type: str) -> None:
        with self._session_lock:
            resources = self._session_resources.get(session_id)
            if resources is not None and resource_type == "file_descriptor":
                # Never drop below zero on a double release.
                resources["files_open"] = max(
                    0, resources.get("files_open", 1) - 1)

    def cleanup_session(self, session_id: str) -> None:
        with self._session_lock:
            self._session_resources.pop(session_id, None)

    # ---- Observability ----
    def get_metrics(self) -> dict:
        return {
            "user_budgets": {
                uid: self.get_user_usage(uid)
                # Report at most 20 users to keep the payload small.
                for uid in list(self._user_budgets.keys())[:20]
            },
            "tool_pools": self.tool_pool_metrics(),
            "active_sessions": len(self._session_resources),
        }


# ---- Usage example ----
if __name__ == "__main__":
    bulkhead = AgentBulkhead(default_token_limit=50_000)

    # Per-user budget check
    try:
        bulkhead.check_user_budget("user-42", estimated_tokens=3000)
        print("[User-42] Budget OK")
        bulkhead.check_user_budget("user-42", estimated_tokens=40000)
        print("[User-42] Budget near limit (86% used)")
        # 43,000 + 10,000 > 50,000: logs the soft warning, then raises.
        bulkhead.check_user_budget("user-42", estimated_tokens=10000)
    except BulkheadCapacityExceeded as e:
        print(f"[User-42] Budget exceeded: {e}")

    # Per-tool thread pools
    search_pool = bulkhead.get_tool_executor("search_docs", max_workers=4)
    db_pool = bulkhead.get_tool_executor("database_query", max_workers=8)
    print("Tool pools:", bulkhead.tool_pool_metrics())

    # Per-session resources
    bulkhead.register_session("sess-abc")
    bulkhead.check_session_resource("sess-abc", "file_descriptor")
    print("Session resource check passed")
    print("Bulkhead metrics:", bulkhead.get_metrics())
Design notes:
- Per-user budgets are the first line of cost defense: They prevent a single user from draining the global token quota. Unlike the rate limiter, user budgets are partitioned by user identity.
- Per-tool thread pools are an agent-specific requirement: Microservice bulkheads isolate services; agents need to isolate tools within the same agent instance.
- Relationship with runtime isolation: Bulkheads provide logical resource isolation, while runtime isolation provides physical process isolation. See Agent Runtime Isolation.
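One gap worth illustrating: Python's ThreadPoolExecutor queues submitted work without an upper bound, so a per-tool pool limits concurrency but not backlog. The sketch below is one possible way to add backpressure, assuming a hypothetical BoundedToolPool wrapper and ToolPoolSaturated exception (neither is part of AgentBulkhead). It hands out one permit per running or queued task and rejects submissions once the permits run out.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class ToolPoolSaturated(Exception):
    """Raised when a tool's workers and queue are both full (hypothetical)."""


class BoundedToolPool:
    def __init__(self, tool_name: str, max_workers: int = 4,
                 queue_depth: int = 8):
        self.tool_name = tool_name
        self._executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=f"agent-tool-{tool_name}")
        # One permit per task that is either running or waiting.
        self._permits = threading.BoundedSemaphore(max_workers + queue_depth)

    def submit(self, fn, *args, **kwargs) -> Future:
        if not self._permits.acquire(blocking=False):
            raise ToolPoolSaturated(self.tool_name)
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except Exception:
            self._permits.release()
            raise
        # Return the permit when the task finishes, whatever the outcome.
        future.add_done_callback(lambda _f: self._permits.release())
        return future

    def shutdown(self) -> None:
        self._executor.shutdown(wait=True)


gate = threading.Event()
pool = BoundedToolPool("database_query", max_workers=1, queue_depth=1)
running = pool.submit(gate.wait)         # occupies the single worker
queued = pool.submit(lambda: "queued")   # waits in the one queue slot
try:
    pool.submit(lambda: "rejected")
    saturated = False
except ToolPoolSaturated:
    saturated = True
gate.set()                               # let the blocked task finish
queued_result = queued.result(timeout=5)
pool.shutdown()
```

Rejecting early gives the degradation layer a clear signal (the tool is saturated) instead of letting latency grow silently behind a long queue.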
5. Graceful Degradation: Survival Strategies When Core Dependencies Fail
The circuit breaker has opened. The rate limiter has rejected the request. The bulkhead is near saturation. What does the agent do next? The answer cannot be "crash." Graceful degradation is the last line of defense: when dependencies are unavailable, the agent keeps responding at reduced quality instead of failing outright.
The Six-Level Degradation Decision Tree
Degradation strategies are priority-ordered—the agent tries the best option first and retreats step by step:
- Model fallback: Primary model unavailable (circuit open / rate limited) → switch to a backup model. For example, GPT-5.5 → GPT-5.4-mini, Claude Opus → Claude Haiku, or cross-provider failover (Claude → DeepSeek).
- Tool skip: A non-critical tool is unavailable → skip it and complete the task with remaining tools. Mark the result as partial.
- Cache return: Same or similar query was answered recently → return the cached result with a stale: true marker.
- Async defer: Synchronous completion impossible → enqueue for background processing, return an acknowledgment.
- Partial result: Return what you have with degradation metadata: which components are missing, why, and estimated quality impact.
- Graceful failure: Every path blocked → return clear, actionable error guidance—not a stack trace.
Degradation Metadata: Telling Callers the Result Is Incomplete
Degradation is not a boolean—it is a structured description. Every degradation response should carry:
- degradation_level: none | partial | fallback | deferred | failed
- missing_components: Which components are unavailable (e.g., ["get_order", "order_db"])
- quality_estimate: An estimated quality score (0.0–1.0). For example, cached answer = 0.7, model fallback = 0.85, full result = 1.0.
- fallback_chain: Which degradation paths this request attempted (for post-hoc analysis).
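As an illustration of how these four fields might travel with a response, here is a small sketch. The DegradationInfo dataclass and the envelope shape produced by build_response are assumptions made for this example, not a fixed wire format.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List

ALLOWED_LEVELS = {"none", "partial", "fallback", "deferred", "failed"}


@dataclass
class DegradationInfo:
    degradation_level: str = "none"
    missing_components: List[str] = field(default_factory=list)
    quality_estimate: float = 1.0
    fallback_chain: List[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        # Reject values outside the documented vocabulary and range.
        if self.degradation_level not in ALLOWED_LEVELS:
            raise ValueError(
                f"unknown degradation_level: {self.degradation_level}")
        if not 0.0 <= self.quality_estimate <= 1.0:
            raise ValueError("quality_estimate must be within 0.0-1.0")


def build_response(answer: Any, info: DegradationInfo) -> Dict[str, Any]:
    """Place the metadata next to the answer (hypothetical envelope)."""
    return {"answer": answer, "degradation": asdict(info)}


response = build_response(
    "cached answer text",
    DegradationInfo(
        degradation_level="partial",
        missing_components=["get_order", "order_db"],
        quality_estimate=0.7,
        fallback_chain=["model_fallback", "tool_skip", "cache_return"],
    ),
)
print(json.dumps(response, indent=2))
```

Validating the level and score at construction time keeps malformed metadata from reaching callers that branch on it.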
Circuit Breaker Integration
The degradation engine integrates deeply with circuit breakers: when a tool's circuit breaker is Open, the degradation engine skips that tool directly rather than waiting for the call to fail. This means degradation can make decisions at the request-planning stage—saving unnecessary wait time and token burn.
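A minimal sketch of that planning-stage check, using stand-in types: BreakerState and StubBreaker are placeholders for whatever state enum and breaker class the system actually uses, and plan_tools is a hypothetical helper.

```python
from enum import Enum
from typing import Dict, Iterable, List, Tuple


class BreakerState(Enum):  # stand-in for the breaker's real state enum
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class StubBreaker:
    """Minimal stand-in that exposes only a `state` attribute."""

    def __init__(self, state: BreakerState):
        self.state = state


def plan_tools(
    planned: Iterable[str], breakers: Dict[str, StubBreaker]
) -> Tuple[List[str], List[str]]:
    """Split planned tools into (callable, skipped) before any call is made."""
    callable_tools: List[str] = []
    skipped: List[str] = []
    for name in planned:
        breaker = breakers.get(name)
        if breaker is not None and breaker.state is BreakerState.OPEN:
            skipped.append(name)          # no call, no wait, no tokens
        else:
            callable_tools.append(name)   # closed, half-open, or untracked
    return callable_tools, skipped


breakers = {
    "get_order": StubBreaker(BreakerState.OPEN),
    "lookup_shipping": StubBreaker(BreakerState.CLOSED),
}
to_call, to_skip = plan_tools(
    ["get_order", "lookup_shipping", "send_notification"], breakers)
```

Half-open breakers are left callable here so that probe requests can still reach the tool and close the circuit again.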
Code: GracefulDegradationEngine Class
from __future__ import annotations

import hashlib
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional

logger = logging.getLogger("agent.degradation")


class DegradationLevel(Enum):
    NONE = "none"          # Full result
    PARTIAL = "partial"    # Some components unavailable
    FALLBACK = "fallback"  # Used backup model / strategy
    DEFERRED = "deferred"  # Routed to async processing
    FAILED = "failed"      # Graceful failure


@dataclass
class DegradationMetadata:
    """Degradation response metadata for upstream quality assessment."""
    level: DegradationLevel = DegradationLevel.NONE
    missing_components: List[str] = field(default_factory=list)
    quality_estimate: float = 1.0  # 0.0–1.0
    fallback_chain: List[str] = field(default_factory=list)
    user_message: Optional[str] = None


class DegradationStrategy:
    """Abstract base class for degradation strategies."""

    def try_execute(self, context: dict) -> Optional[Any]:
        """Return a result, or None when the strategy does not apply."""
        raise NotImplementedError


class ModelFallbackStrategy(DegradationStrategy):
    """Model fallback: try backup models in priority order."""

    def __init__(self, models: List[str]):
        self.models = models

    def try_execute(self, context: dict) -> Optional[Any]:
        # A different model cannot repair a failed tool, so leave tool
        # failures to the strategies further down the chain.
        if context.get("failed_tool"):
            return None
        current_model = context.get("current_model")
        for model in self.models:
            if model != current_model:
                logger.info("Degradation: trying fallback model %s", model)
                # Placeholder: a real implementation would call `model` here.
                return {"model": model, "result": "fallback response"}
        return None


class SkipToolStrategy(DegradationStrategy):
    """Tool skip: allow skipping specified non-critical tools."""

    def __init__(self, optional_tools: List[str]):
        self.optional_tools = set(optional_tools)

    def try_execute(self, context: dict) -> Optional[Any]:
        failed_tool = context.get("failed_tool")
        if failed_tool and failed_tool in self.optional_tools:
            logger.info("Degradation: skipping optional tool %s", failed_tool)
            return {"skipped_tool": failed_tool,
                    "partial_results": context.get("partial_results", {})}
        return None


class CacheReturnStrategy(DegradationStrategy):
    """Cache return: look up an identical recent query by fingerprint."""

    def __init__(self, cache: Optional[dict] = None):
        # `cache or {}` would silently replace an empty dict passed in by
        # the caller, so compare against None explicitly.
        self._cache = cache if cache is not None else {}

    def _query_fingerprint(self, messages: list) -> str:
        # Hash the first 500 characters of each message's content.
        content = "".join(
            str(m.get("content") or "")[:500] for m in messages)
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def try_execute(self, context: dict) -> Optional[Any]:
        messages = context.get("messages", [])
        fp = self._query_fingerprint(messages)
        cached = self._cache.get(fp)
        if cached is not None:
            logger.info("Degradation: returning cached result (fp=%s)", fp)
            return {"cache_hit": True, "stale": True, "result": cached}
        return None

    def store(self, messages: list, result: Any) -> None:
        fp = self._query_fingerprint(messages)
        self._cache[fp] = result


class AsyncDeferStrategy(DegradationStrategy):
    """Async defer: enqueue task, return processing acknowledgment."""

    def __init__(self, queue: Optional[list] = None):
        # Keep the caller's list even when it is empty (see above).
        self._queue = queue if queue is not None else []

    def try_execute(self, context: dict) -> Optional[Any]:
        task = {
            "messages": context.get("messages"),
            "metadata": context.get("metadata", {}),
        }
        self._queue.append(task)
        logger.info("Degradation: task deferred to async queue (depth=%d)",
                    len(self._queue))
        return {
            "deferred": True,
            "queue_position": len(self._queue),
            # Rough estimate: assumes about 15 seconds per queued task.
            "estimated_wait_seconds": len(self._queue) * 15,
        }


class GracefulFailureStrategy(DegradationStrategy):
    """Graceful failure: return actionable error guidance."""

    def __init__(self, fallback_message: str = ""):
        self.fallback_message = fallback_message

    def try_execute(self, context: dict) -> Optional[Any]:
        msg = self.fallback_message or (
            "The service is temporarily unavailable. Please try again "
            "later or visit the self-service portal."
        )
        return {
            "error": "graceful_failure",
            "user_message": msg,
            "support_contact": "[email protected]",
        }


class GracefulDegradationEngine:
    """Graceful degradation engine.

    Attempts strategies in registration order, so register them in
    priority order:
    model_fallback -> tool_skip -> cache_return ->
    async_defer -> partial_result -> graceful_failure
    (partial_result has no dedicated strategy class here; tool_skip and
    cache_return are the paths that return partial results.)
    Integrated with circuit breakers: tool circuit open -> auto tool_skip.
    """

    def __init__(self):
        self._strategies: List[tuple] = []
        self._circuit_breakers: Dict[str, Any] = {}
        self._fallback_chain: List[str] = []  # chain of the last execute()

    def register_model_fallback(self, models: List[str]):
        self._strategies.append(
            ("model_fallback", ModelFallbackStrategy(models)))

    def register_optional_tools(self, tool_names: List[str]):
        self._strategies.append(
            ("tool_skip", SkipToolStrategy(tool_names)))

    def register_cache(self, cache=None):
        self._strategies.append(
            ("cache_return", CacheReturnStrategy(cache)))

    def register_async_queue(self, queue=None):
        self._strategies.append(
            ("async_defer", AsyncDeferStrategy(queue)))

    def register_graceful_failure(self, message: str = ""):
        self._strategies.append(
            ("graceful_failure", GracefulFailureStrategy(message)))

    def link_circuit_breakers(self, breakers: Dict[str, Any]) -> None:
        """Link circuit breakers: tool open -> degradation auto-skips it."""
        self._circuit_breakers = breakers

    def execute(self, context: dict) -> tuple:
        """Execute degradation chain, return (result, DegradationMetadata)."""
        meta = DegradationMetadata()
        # Build the chain locally so concurrent requests do not share it.
        chain: List[str] = []
        for strategy_name, strategy in self._strategies:
            chain.append(strategy_name)
            try:
                result = strategy.try_execute(context)
            except Exception as e:
                logger.warning("Degradation strategy '%s' failed: %s",
                               strategy_name, e)
                continue
            if result is None:
                continue
            meta.fallback_chain = chain
            if strategy_name == "model_fallback":
                meta.level = DegradationLevel.FALLBACK
                meta.quality_estimate = 0.85
            elif strategy_name == "tool_skip":
                meta.level = DegradationLevel.PARTIAL
                meta.missing_components.append(
                    context.get("failed_tool", "unknown"))
                meta.quality_estimate = 0.75
            elif strategy_name == "cache_return":
                meta.level = DegradationLevel.PARTIAL
                meta.quality_estimate = 0.70
            elif strategy_name == "async_defer":
                meta.level = DegradationLevel.DEFERRED
                meta.quality_estimate = 0.60
            elif strategy_name == "graceful_failure":
                meta.level = DegradationLevel.FAILED
                meta.quality_estimate = 0.0
                meta.user_message = result.get("user_message", "")
            logger.info(
                "Degradation chain succeeded at '%s' (level=%s)",
                strategy_name, meta.level.value)
            self._fallback_chain = list(chain)
            return result, meta
        # All strategies failed
        meta.level = DegradationLevel.FAILED
        meta.quality_estimate = 0.0
        meta.user_message = (
            "All degradation strategies exhausted. Contact support.")
        meta.fallback_chain = chain
        self._fallback_chain = list(chain)
        return {"error": "total_degradation_failure"}, meta

    def should_skip_tool(self, tool_name: str) -> bool:
        """Check if a tool should be skipped due to open circuit breaker."""
        # Breakers may be registered under the bare tool name or under a
        # "tool:<name>" key (the convention ResilientAgentRunner uses).
        breaker = self._circuit_breakers.get(tool_name)
        if breaker is None:
            breaker = self._circuit_breakers.get(f"tool:{tool_name}")
        state = getattr(breaker, "state", None)
        return getattr(state, "value", None) == "open"

    def get_metrics(self) -> dict:
        return {
            "strategies_registered": len(self._strategies),
            "circuit_breakers_linked": len(self._circuit_breakers),
            "last_fallback_chain": self._fallback_chain,
        }


# ---- Usage example ----
if __name__ == "__main__":
    engine = GracefulDegradationEngine()
    engine.register_model_fallback(
        models=["claude-opus", "claude-haiku",
                "deepseek-chat", "gpt-5.4-mini"])
    engine.register_optional_tools(
        ["send_notification", "log_analytics", "enrich_profile"])
    engine.register_cache({})
    engine.register_async_queue([])
    engine.register_graceful_failure(
        "Service temporarily unavailable. Please try again or contact [email protected].")

    context = {
        "current_model": "claude-opus",
        "failed_tool": "enrich_profile",
        "messages": [{"role": "user", "content": "Check my order status"}],
        "partial_results": {"order_id": "38291", "status_hint": "shipping"},
        "metadata": {},
    }
    result, meta = engine.execute(context)
    print(f"Result: {result}")
    print(f"Degradation: level={meta.level.value}, "
          f"quality={meta.quality_estimate}")
    print(f"Fallback chain: {meta.fallback_chain}")
    print(f"Missing: {meta.missing_components}")
Design notes:
- The degradation chain order is not arbitrary: Model fallback beats returning incomplete results, which beats total failure. Users expect an answer.
- Degradation is a quality declaration: The quality_estimate in degradation metadata lets UIs honestly inform users about result reliability.
- Relationship with rollback: Degradation lets the agent keep running without dependencies; rollback undoes damage already caused. See Agent Rollback Design.
6. Composing the Patterns: A Production Resilience Architecture
Sections 2–5 introduced four independent resilience patterns. But in production they are not used independently: a single agent request passes through all four layers in sequence, and some layers more than once. This section shows how they compose.
Complete Request Lifecycle (7 Layers)
An agent request passes through seven resilience checkpoints from entry to exit:
- Bulkhead entry check: User token budget? Correct tool thread pool? Session resources within limits?
- Rate Limiter token budget check: Does the target provider have sufficient TPM quota for this call?
- LLM call (Circuit Breaker wrapped): Is the provider circuit breaker Closed? Post-call token calibration and circuit state update.
- LLM failure → Degradation engine decision: Provider unavailable? Select model fallback. Circuit breaker Open? Skip the failed tool.
- Before tool call → Circuit Breaker check: Is the tool's circuit breaker Closed? Token cost within normal range?
- Tool call failure → Degradation engine decision: Non-critical tool → skip. Critical tool → try cache → async defer → partial result.
- Response exit → Bulkhead release: Release user budget, thread pool resources, session resources.
Pattern Interaction Matrix
| Pattern A | Pattern B | Interaction | Potential Conflict |
|---|---|---|---|
| Circuit Breaker | Rate Limiter | Complementary. CB stops dead calls; RL protects quota. | Over-throttle if RL too conservative. |
| Circuit Breaker | Bulkhead | Complementary. CB faces external; Bulkhead faces internal. | Low conflict. |
| Circuit Breaker | Degradation | Deeply integrated. CB Open → Degradation auto-skips tool. | Degradation loop possible; enforce max depth. |
| Rate Limiter | Bulkhead | Complementary. RL is flow control; Bulkhead is resource isolation. | Double-counting; clarify boundaries. |
| Rate Limiter | Degradation | RL rejection → Degradation tries alternate provider. | All providers rate-limited → graceful failure. |
| Bulkhead | Degradation | Bulkhead exhaustion → Degradation async_defer or graceful failure. | Low conflict. |
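The "degradation loop" conflict in the matrix arises when a fallback path fails and triggers degradation again. One possible guard, sketched below, carries a depth counter in the request context. The degrade function, the degradation_depth key, and DegradationDepthExceeded are hypothetical names for this example; the limit of 6 follows the article's default.

```python
from typing import Any, Callable, Dict, List

MAX_DEGRADATION_DEPTH = 6  # mirrors the "Degradation max depth" default


class DegradationDepthExceeded(Exception):
    """Raised when nested degradation attempts exceed the allowed depth."""


def degrade(
    context: Dict[str, Any],
    attempt: Callable[[Dict[str, Any]], Any],
    max_depth: int = MAX_DEGRADATION_DEPTH,
) -> Any:
    """Run one degradation attempt, refusing to nest beyond max_depth."""
    depth = context.get("degradation_depth", 0)
    if depth >= max_depth:
        raise DegradationDepthExceeded(f"stopped at depth {depth}")
    # Pass a copy downstream so sibling attempts do not share the counter.
    child = {**context, "degradation_depth": depth + 1}
    return attempt(child)


seen_depths: List[int] = []


def always_fails(ctx: Dict[str, Any]) -> Any:
    # Simulates a fallback that fails and asks for degradation again.
    seen_depths.append(ctx["degradation_depth"])
    return degrade(ctx, always_fails)


try:
    degrade({}, always_fails)
    stopped = False
except DegradationDepthExceeded:
    stopped = True
```

When the guard trips, the caller can go straight to graceful failure rather than cycling through fallbacks that keep failing.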
Configuration Defaults
These are reasonable starting thresholds drawn from production experience; treat them as defaults to tune against your own traffic and provider limits:
| Config Item | Default | Notes |
|---|---|---|
| CB failure threshold | 5 failures / 60s | Lower to 3 for high-frequency agents; raise to 10 for low-frequency |
| CB cooldown | 30 seconds | Tool APIs typically recover within this window; for LLM providers, 60s is suggested |
| RL token bucket capacity | Provider TPM limit | Read from provider tier information |
| RL token estimation factor | 3.5 chars/token (CN/EN mix) | Consider integrating tiktoken or Anthropic tokenizer |
| Bulkhead user budget | 100,000 tokens/min/user | Depends on user base and global quota |
| Bulkhead tool thread pool | 4–8 workers/tool | I/O-bound → higher; CPU-bound → lower |
| Degradation max depth | 6 levels | Prevents degradation loops |
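To make the token estimation factor concrete, here is a rough character-based estimator built on the 3.5 chars/token default. The function name and the per_message_overhead allowance (a few tokens per message for role and formatting) are assumptions for this sketch; a real tokenizer will be more accurate.

```python
import math
from typing import Dict, List

CHARS_PER_TOKEN = 3.5  # the estimation factor from the defaults table


def estimate_tokens(
    messages: List[Dict[str, str]],
    chars_per_token: float = CHARS_PER_TOKEN,
    per_message_overhead: int = 4,  # hypothetical allowance, not a spec value
) -> int:
    """Estimate prompt tokens from character counts, rounding up."""
    chars = sum(len(m.get("content") or "") for m in messages)
    return math.ceil(chars / chars_per_token) + per_message_overhead * len(messages)


exact = estimate_tokens([{"role": "user", "content": "a" * 35}],
                        per_message_overhead=0)
rounded_up = estimate_tokens([{"role": "user", "content": "a" * 36}],
                             per_message_overhead=0)
with_overhead = estimate_tokens([{"role": "user", "content": "a" * 35}])
```

Rounding up biases the estimate toward over-reservation, which is the safer direction for a quota check; the difference can be returned once the provider reports actual usage.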
Code: ResilientAgentRunner Class
This is the core code of the article—composing all four patterns into a runnable agent pipeline:
from __future__ import annotations

import logging
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

logger = logging.getLogger("agent.resilient_runner")


@dataclass
class CallResult:
    """Encapsulates each call result (inline version for ResilientAgentRunner)."""
    success: bool = False
    latency_ms: float = 0.0
    tokens_consumed: int = 0
    semantic_failure: Optional[str] = None


@dataclass
class ResilientAgentConfig:
    """Resilient agent configuration--all pattern thresholds centralized."""
    cb_failure_threshold: int = 5
    cb_cooldown_seconds: float = 30.0
    rl_acquire_timeout_ms: float = 5000
    bulkhead_user_token_limit: int = 100_000
    bulkhead_tool_workers: int = 6
    fallback_models: List[str] = field(default_factory=lambda: [
        "claude-opus", "claude-haiku", "deepseek-chat", "gpt-5.4-mini"
    ])
    optional_tools: List[str] = field(default_factory=lambda: [
        "send_notification", "log_analytics"
    ])
    max_degradation_depth: int = 6


@dataclass
class ResilientRunResult:
    """Complete result from a resilient agent run."""
    success: bool
    result: Any
    circuit_breaker_events: List[dict] = field(default_factory=list)
    rate_limiter_metrics: dict = field(default_factory=dict)
    bulkhead_metrics: dict = field(default_factory=dict)
    degradation_metadata: Optional[Any] = None
    total_latency_ms: float = 0.0
    tokens_consumed: int = 0
    resilience_events: List[str] = field(default_factory=list)


class ResilientAgentRunner:
    """Production-grade agent resilience runner.

    Weaves all four patterns into a 7-layer request lifecycle.
    """

    def __init__(self, circuit_breakers, rate_limiter, bulkhead,
                 degradation_engine, config=None):
        self.circuit_breakers = circuit_breakers
        self.rate_limiter = rate_limiter
        self.bulkhead = bulkhead
        self.degradation_engine = degradation_engine
        self.config = config or ResilientAgentConfig()
        self.degradation_engine.link_circuit_breakers(circuit_breakers)

    def run(self, user_id, session_id, provider, messages,
            tools_to_call) -> ResilientRunResult:
        """Run a complete resilience-protected agent request.

        Note: tools_to_call is not used in this sketch; the tool calls
        are taken from the LLM response.
        """
        start_time = time.monotonic()
        result = ResilientRunResult(success=False, result=None)

        # LAYER 1: Bulkhead entry check
        try:
            estimated_tokens = self.rate_limiter.estimate_tokens(
                provider, messages)
            self.bulkhead.check_user_budget(user_id, estimated_tokens)
            self.bulkhead.register_session(session_id)
            self.bulkhead.check_session_resource(
                session_id, "file_descriptor")
            result.resilience_events.append("bulkhead:entry_passed")
        except Exception as e:
            result.resilience_events.append(f"bulkhead:entry_failed:{e}")
            result.total_latency_ms = (
                time.monotonic() - start_time) * 1000
            return result

        try:
            return self._run_protected(
                result, provider, messages, estimated_tokens)
        finally:
            # LAYER 7: Response exit -> Bulkhead release. Runs on every
            # path, including the early degradation returns, so the
            # session resource reserved in Layer 1 cannot leak.
            self.bulkhead.release_session_resource(
                session_id, "file_descriptor")
            result.resilience_events.append("bulkhead:exit_released")
            result.total_latency_ms = (
                time.monotonic() - start_time) * 1000
            result.bulkhead_metrics = self.bulkhead.get_metrics()
            result.rate_limiter_metrics = self.rate_limiter.get_metrics()

    @staticmethod
    def _record(cb, **fields) -> None:
        """Report a call outcome to a breaker, if one is registered."""
        if cb:
            cb.after_call(CallResult(**fields))

    def _degrade(self, result, ctx) -> ResilientRunResult:
        """Run the degradation chain and attach its output to the result."""
        deg_result, deg_meta = self.degradation_engine.execute(ctx)
        result.result = deg_result
        result.degradation_metadata = deg_meta
        return result

    def _run_protected(self, result, provider, messages,
                       estimated_tokens) -> ResilientRunResult:
        """Layers 2-6; run() releases bulkhead resources afterwards."""
        # LAYER 2: Rate Limiter token budget check
        token_acquired = self.rate_limiter.acquire(
            provider, estimated_tokens,
            timeout_ms=self.config.rl_acquire_timeout_ms)
        if not token_acquired:
            result.resilience_events.append("rate_limiter:rejected")
            return self._degrade(
                result, {"messages": messages, "error": "rate_limited"})
        result.resilience_events.append("rate_limiter:acquired")

        # LAYER 3: LLM call (Circuit Breaker wrapped). The call is made
        # even when no breaker is registered for the provider.
        provider_cb = self.circuit_breakers.get(
            f"provider:{provider.value}")
        try:
            if provider_cb:
                # Assumed to raise when the circuit is Open.
                provider_cb.before_call()
            try:
                # === Actual LLM call (replace with real call in production) ===
                llm_response = {
                    "content": "The order #38291 is in transit.",
                    "tool_calls": [
                        {"function": {"name": "get_order",
                                      "arguments": '{"order_id": "38291"}'}},
                    ],
                    "usage": {"total_tokens": 1200},
                }
                # ================================================
            except Exception:
                # Record the failed call so the breaker can trip.
                self._record(provider_cb, success=False)
                raise
            actual_tokens = llm_response.get("usage", {}).get(
                "total_tokens", estimated_tokens)
            result.tokens_consumed += actual_tokens
            self.rate_limiter.release(
                provider, actual_tokens, estimated_tokens)
            # latency_ms is a placeholder; measure it around the real call.
            self._record(provider_cb, success=True, latency_ms=100,
                         tokens_consumed=actual_tokens)
            result.resilience_events.append("circuit_breaker:llm_passed")
        except Exception as e:
            # LAYER 4: LLM failure -> Degradation engine decision
            result.resilience_events.append(
                f"circuit_breaker:llm_failed:{e}")
            return self._degrade(result, {
                "messages": messages,
                "current_model": provider.value,
                "error": str(e),
            })

        # LAYER 5-6: Tool call pre-CB check + degradation
        tool_results = {}
        skipped_tools = []
        for tc in llm_response.get("tool_calls", []):
            tool_name = tc.get("function", {}).get("name", "")
            if not tool_name:
                continue
            # LAYER 5: skip tools whose breaker is already Open
            if self.degradation_engine.should_skip_tool(tool_name):
                result.resilience_events.append(
                    f"degradation:skipping_tool:{tool_name}")
                skipped_tools.append(tool_name)
                continue
            tool_cb = self.circuit_breakers.get(f"tool:{tool_name}")
            try:
                if tool_cb:
                    tool_cb.before_call()
                try:
                    # === Actual tool call (replace in production) ===
                    tool_results[tool_name] = {
                        "status": "success",
                        "data": f"Result from {tool_name}"}
                except Exception:
                    self._record(tool_cb, success=False)
                    raise
                self._record(tool_cb, success=True, latency_ms=50,
                             tokens_consumed=0)
                result.resilience_events.append(
                    f"circuit_breaker:tool_passed:{tool_name}")
            except Exception as e:
                # LAYER 6: Tool failure -> Degradation engine decision
                result.resilience_events.append(
                    f"circuit_breaker:tool_failed:{tool_name}")
                ctx = {"failed_tool": tool_name,
                       "partial_results": tool_results,
                       "messages": messages}
                _, deg_meta = self.degradation_engine.execute(ctx)
                if deg_meta.level.value != "failed":
                    tool_results[tool_name] = {
                        "degraded": True, "reason": str(e)}
                    skipped_tools.append(tool_name)
                result.degradation_metadata = deg_meta

        result.success = True
        result.result = {
            "content": llm_response.get("content"),
            "tool_results": tool_results,
            "skipped_tools": skipped_tools,
            "resilience": {"events": result.resilience_events},
        }
        return result

    def get_metrics(self) -> dict:
        """Aggregate metrics from all resilience layers--for observability consumption."""
        return {
            "circuit_breakers": {
                name: cb.get_metrics()
                for name, cb in self.circuit_breakers.items()
                if hasattr(cb, "get_metrics")
            },
            "rate_limiter": self.rate_limiter.get_metrics(),
            "bulkhead": self.bulkhead.get_metrics(),
            "degradation": self.degradation_engine.get_metrics(),
        }


# ---- Usage example ----
if __name__ == "__main__":
    # In a real project, import these from their respective modules:
    # from circuit_breaker import AgentCircuitBreaker
    # from rate_limiter import TokenAwareRateLimiter, Provider
    # from bulkhead import AgentBulkhead
    # from degradation import GracefulDegradationEngine

    # ---- Initialize four patterns ----
    # 1. Circuit Breakers (Provider + Tool level)
    provider_cb = AgentCircuitBreaker("provider:deepseek")
    tool_cb_get_order = AgentCircuitBreaker("tool:get_order")
    tool_cb_lookup = AgentCircuitBreaker("tool:lookup_shipping")
    circuit_breakers = {
        "provider:deepseek": provider_cb,
        "tool:get_order": tool_cb_get_order,
        "tool:lookup_shipping": tool_cb_lookup,
    }

    # 2. Rate Limiter
    rl = TokenAwareRateLimiter()

    # 3. Bulkhead
    bulkhead = AgentBulkhead(default_token_limit=100_000)

    # 4. Degradation Engine
    degradation = GracefulDegradationEngine()
    degradation.register_model_fallback([
        "claude-opus", "claude-haiku", "deepseek-chat", "gpt-5.4-mini"])
    degradation.register_optional_tools(
        ["send_notification", "log_analytics"])
    degradation.register_cache({})
    degradation.register_async_queue([])
    degradation.register_graceful_failure(
        "Service temporarily unavailable. Try again later or contact [email protected].")

    # ---- Create Runner and execute ----
    runner = ResilientAgentRunner(
        circuit_breakers, rl, bulkhead, degradation)
    messages = [{"role": "user", "content": "Check order #38291 shipping status"}]
    result = runner.run(
        user_id="user-42",
        session_id="sess-abc",
        provider=Provider.DEEPSEEK,
        messages=messages,
        tools_to_call=["get_order", "lookup_shipping"],
    )

    print("=" * 50)
    print(f"Success: {result.success}")
    if result.success:
        print(f"Content: {result.result['content']}")
        print(f"Tool results: {result.result['tool_results']}")
        print(f"Skipped tools: {result.result['skipped_tools']}")
    else:
        # On rejection or degradation, result.result may be None or a
        # degradation payload rather than the normal response dict.
        print(f"Degraded result: {result.result}")
    print(f"Total latency: {result.total_latency_ms:.0f}ms")
    print(f"Tokens consumed: {result.tokens_consumed}")
    print(f"Resilience events: {result.resilience_events}")
    print("=" * 50)
    print("Runner metrics:", runner.get_metrics())
Design notes:
- Every layer failure has a clear downstream response: Bulkhead rejection → immediate error. Rate Limiter rejection → degradation to alternate provider. Circuit Breaker Open → degradation auto-skips. Degradation backstop → guarantees a response.
- Observability is the "eyes" of resilience: Every layer exposes state through get_metrics() and resilience_events. See Agent Observability.
- The single-entry pattern: ResilientAgentRunner.run() is the sole entry point. Agent business logic executes within the framework; all resilience checks are transparently handled.
7. Multi-Provider Resilience: Unified Protection Across OpenAI, Claude, and DeepSeek
Most production agents use multiple LLM providers to balance cost, latency, and availability. But each provider has different failure modes and rate limit models. Multi-provider resilience architecture abstracts these differences into a unified protection layer.
The Provider Abstraction Layer
Multi-provider resilience requires a provider abstraction layer that lets agents operate without caring whether the underlying provider is OpenAI, Anthropic, or DeepSeek. This aligns with the core principle of Model-Agnostic Agent Design—the abstraction layer is what makes cross-provider failover possible.
Per-Provider Independent Circuit Breakers + Rate Limiters
Each provider gets its own independent circuit breaker and rate limiter instance:
- OpenAI circuit breaker: Detects 5xx + 429 (with retry-after header) + semantic failures
- Anthropic circuit breaker: Detects 5xx + 429 + cache-aware failures
- DeepSeek circuit breaker: Detects connection timeout + concurrency exhaustion + response truncation
When the OpenAI circuit breaker opens, only OpenAI calls are blocked—Anthropic and DeepSeek calls continue normally.
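The shared part of that detection (5xx and 429 with a retry-after hint) can be sketched as a small classifier that each provider's breaker consults. FailureSignal and classify_http_failure are hypothetical names, header keys are assumed to be lower-cased already, and only the numeric (seconds) form of retry-after is parsed.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class FailureSignal:
    counts_toward_breaker: bool
    reason: str
    retry_after_seconds: Optional[float] = None


def classify_http_failure(
    status: int, headers: Mapping[str, str]
) -> Optional[FailureSignal]:
    """Map an HTTP status to a breaker signal; None means no failure."""
    if status == 429:
        retry_after = None
        raw = headers.get("retry-after")
        if raw is not None:
            try:
                retry_after = float(raw)
            except ValueError:
                retry_after = None  # HTTP-date form is not handled here
        return FailureSignal(True, "rate_limited", retry_after)
    if 500 <= status <= 599:
        return FailureSignal(True, "server_error")
    if 400 <= status <= 499:
        # Caller-side errors say nothing about provider health.
        return FailureSignal(False, "client_error")
    return None


throttled = classify_http_failure(429, {"retry-after": "12"})
outage = classify_http_failure(503, {})
bad_request = classify_http_failure(400, {})
healthy = classify_http_failure(200, {})
```

Keeping the classifier separate from the breaker lets each provider add its own signals (semantic failures, truncation, and so on) without changing the shared state machine.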
Cross-Provider Failover Chain
The failover chain is a configurable priority list (e.g., ["openai", "anthropic", "deepseek"]). Routing decisions consider availability, cost (under budget pressure, prefer cheaper providers), and latency.
DeepSeek: Central Focus for Chinese Teams
For Chinese engineering teams, DeepSeek plays a dual role, and it comes with failure signals of its own:
- DeepSeek as cost anchor: DeepSeek pricing is 1/10–1/30 of OpenAI. Intelligent routing balances "prefer cheapest" with "avoid over-concurrency."
- DeepSeek as resilience anchor: When both OpenAI and Anthropic are unavailable, DeepSeek is the reliable third option. For Chinese content, DeepSeek can even exceed GPT in some scenarios.
- DeepSeek-specific failure signals: Connection refused (TCP-level), response truncation, speed degradation. These require specialized handling logic beyond traditional 5xx/429 detection.
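A sketch of how those three signals might be detected. It assumes an OpenAI-compatible response dict in which finish_reason == "length" marks a cut-off completion and usage.completion_tokens reports output size; the function name and the throughput floor are hypothetical and should be tuned from observed traffic.

```python
from typing import Any, Dict, List, Optional

MIN_TOKENS_PER_SECOND = 5.0  # hypothetical floor, not a provider figure


def detect_failure_signals(
    error: Optional[BaseException],
    response: Optional[Dict[str, Any]],
    elapsed_seconds: float,
) -> List[str]:
    """Return the provider-specific failure signals found in one call."""
    signals: List[str] = []
    # TCP-level refusal surfaces as an exception, not an HTTP status.
    if isinstance(error, ConnectionRefusedError):
        signals.append("connection_refused")
    if response is not None:
        choices = response.get("choices") or [{}]
        if choices[0].get("finish_reason") == "length":
            signals.append("response_truncated")
        completion_tokens = response.get("usage", {}).get(
            "completion_tokens", 0)
        if elapsed_seconds > 0 and completion_tokens > 0:
            if completion_tokens / elapsed_seconds < MIN_TOKENS_PER_SECOND:
                signals.append("speed_degraded")
    return signals


refused = detect_failure_signals(ConnectionRefusedError(), None, 0.1)
slow_and_cut = detect_failure_signals(
    None,
    {"choices": [{"finish_reason": "length"}],
     "usage": {"completion_tokens": 100}},
    elapsed_seconds=50.0,  # 2 tokens/s, below the floor
)
normal = detect_failure_signals(
    None,
    {"choices": [{"finish_reason": "stop"}],
     "usage": {"completion_tokens": 100}},
    elapsed_seconds=2.0,   # 50 tokens/s
)
```

A "length" finish can also mean the caller's own max_tokens was reached, so a production check would compare against the requested limit before counting it as a provider failure.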
Code: MultiProviderResilienceManager Class
from __future__ import annotations

import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional

logger = logging.getLogger("agent.multi_provider")


class ProviderTier(Enum):
    PREMIUM = "premium"    # OpenAI GPT-5.5, Claude Opus
    STANDARD = "standard"  # GPT-5.4, Claude Sonnet
    BUDGET = "budget"      # GPT-5.4-mini, Claude Haiku, DeepSeek


@dataclass
class ProviderInfo:
    """Provider profile: availability, cost, and capability metadata."""
    name: str
    tier: ProviderTier
    cost_per_1m_input: float = 0.0
    cost_per_1m_output: float = 0.0
    max_context: int = 128_000
    supports_streaming: bool = True
    circuit_breaker: Optional[Any] = None  # AgentCircuitBreaker
    rate_limiter: Optional[Any] = None     # TokenAwareRateLimiter


# Provider cost profiles (2026 Q2 approximate; check provider pricing pages)
PROVIDER_REGISTRY: Dict[str, ProviderInfo] = {
    "openai-gpt55": ProviderInfo(
        name="openai-gpt55", tier=ProviderTier.PREMIUM,
        cost_per_1m_input=5.00, cost_per_1m_output=16.00,
        max_context=256_000,
    ),
    "openai-gpt54": ProviderInfo(
        name="openai-gpt54", tier=ProviderTier.STANDARD,
        cost_per_1m_input=2.50, cost_per_1m_output=10.00,
    ),
    "anthropic-opus": ProviderInfo(
        name="anthropic-opus", tier=ProviderTier.PREMIUM,
        cost_per_1m_input=15.00, cost_per_1m_output=75.00,
    ),
    "anthropic-haiku": ProviderInfo(
        name="anthropic-haiku", tier=ProviderTier.BUDGET,
        cost_per_1m_input=0.80, cost_per_1m_output=4.00,
    ),
    "deepseek-chat": ProviderInfo(
        name="deepseek-chat", tier=ProviderTier.BUDGET,
        cost_per_1m_input=0.14, cost_per_1m_output=0.28,  # Ultra-low cost
        max_context=128_000,
    ),
}
class MultiProviderResilienceManager:
    """Multi-provider resilience manager.

    Core responsibilities:
    1. Per-provider independent circuit breaker + rate limiter management
    2. Cross-provider failover chain (availability + cost + latency)
    3. Unified token budget tracking
    4. Cost-aware routing
    """

    def __init__(self, provider_registry=None):
        # Note: the default registry is a shared module-level dict, so
        # breakers and limiters attached here are visible to other managers.
        self.providers = provider_registry or PROVIDER_REGISTRY
        self._token_usage: Dict[str, int] = {}
        self._budget_pressure_threshold: float = 0.80  # 80% triggers cost routing

    def attach_circuit_breaker(self, provider_name: str, cb: Any) -> None:
        """Bind an independent circuit breaker to a provider."""
        if provider_name in self.providers:
            self.providers[provider_name].circuit_breaker = cb

    def attach_rate_limiter(self, provider_name: str, rl: Any) -> None:
        """Bind an independent rate limiter to a provider."""
        if provider_name in self.providers:
            self.providers[provider_name].rate_limiter = rl

    def is_provider_available(self, provider_name: str) -> bool:
        """Check if provider is available (CB Closed + RL has quota)."""
        info = self.providers.get(provider_name)
        if not info:
            return False
        if info.circuit_breaker:
            cb_state = info.circuit_breaker.state
            # Accept both Enum-style states (.value) and plain strings.
            if getattr(cb_state, "value", cb_state) != "closed":
                return False
        if info.rate_limiter:
            metrics = info.rate_limiter.get_metrics()
            provider_metrics = metrics.get(provider_name, {})
            if provider_metrics.get("usage_pct", 0) > 95:
                return False
        return True

    def select_provider(
        self, preferred: List[str], context=None
    ) -> Optional[ProviderInfo]:
        """Select an available provider by priority.

        Decision logic:
        1. Follow preferred list order
        2. Skip unavailable providers
        3. If global budget pressure > 80%, BUDGET tier gets priority
        """
        context = context or {}
        # Drop unregistered names so the cost sort below cannot raise KeyError.
        preferred = [p for p in preferred if p in self.providers]
        total_used = sum(self._token_usage.values())
        total_cap = 0
        for info in self.providers.values():
            if info.rate_limiter:
                total_cap = max(
                    total_cap, getattr(info.rate_limiter, "_aggregate_cap", 0))
        budget_pressure = (total_used / total_cap) if total_cap > 0 else 0.0
        if budget_pressure > self._budget_pressure_threshold:
            logger.info(
                "Budget pressure %.1f%% > %.0f%%, cost-aware routing",
                budget_pressure * 100, self._budget_pressure_threshold * 100)
            # BUDGET tier first, then cheapest input price.
            preferred = sorted(
                preferred,
                key=lambda p: (
                    self.providers[p].tier != ProviderTier.BUDGET,
                    self.providers[p].cost_per_1m_input,
                ),
            )
        for name in preferred:
            if self.is_provider_available(name):
                return self.providers[name]
        # Nothing on the preferred list is usable: try any budget provider.
        for name, info in self.providers.items():
            if info.tier == ProviderTier.BUDGET and self.is_provider_available(name):
                logger.warning("Falling back to budget provider %s", name)
                return info
        return None

    def record_usage(self, provider_name: str, tokens: int) -> None:
        self._token_usage[provider_name] = (
            self._token_usage.get(provider_name, 0) + tokens)

    def get_usage_summary(self) -> dict:
        total = sum(self._token_usage.values())
        return {
            "per_provider": dict(self._token_usage),
            "total_tokens": total,
        }

    def get_failover_chain(self, preferred: List[str]) -> List[str]:
        """Available providers: preferred ones first, then the rest."""
        chain = []
        for name in preferred:
            if self.is_provider_available(name):
                chain.append(name)
        for name in self.providers:
            if name not in chain and self.is_provider_available(name):
                chain.append(name)
        return chain

    def get_metrics(self) -> dict:
        metrics = {"providers": {}, "aggregate": self.get_usage_summary()}
        for name, info in self.providers.items():
            cb_metrics = {}
            if info.circuit_breaker and hasattr(info.circuit_breaker, "get_metrics"):
                cb_metrics = info.circuit_breaker.get_metrics()
            rl_metrics = {}
            if info.rate_limiter and hasattr(info.rate_limiter, "get_metrics"):
                rl_metrics = info.rate_limiter.get_metrics()
            metrics["providers"][name] = {
                "tier": info.tier.value,
                "available": self.is_provider_available(name),
                "cost_per_1m": {"input": info.cost_per_1m_input,
                                "output": info.cost_per_1m_output},
                "circuit_breaker": cb_metrics,
                "rate_limiter": rl_metrics,
                "tokens_used": self._token_usage.get(name, 0),
            }
        return metrics
# ---- Usage example ----
if __name__ == "__main__":
    manager = MultiProviderResilienceManager()

    class FakeCB:
        def __init__(self, name):
            self.name = name
            self.state = type("S", (), {"value": "closed"})()

        def get_metrics(self):
            return {"state": self.state.value}

    class FakeRL:
        def __init__(self):
            self._aggregate_cap = 2_000_000

        def get_metrics(self):
            return {"aggregate": {"usage_pct": 35.0}}

    for name in PROVIDER_REGISTRY:
        manager.attach_circuit_breaker(name, FakeCB(name))
        manager.attach_rate_limiter(name, FakeRL())

    # Simulate OpenAI circuit open
    manager.providers["openai-gpt55"].circuit_breaker.state.value = "open"

    preferred = ["openai-gpt55", "anthropic-haiku", "deepseek-chat"]
    selected = manager.select_provider(preferred)
    if selected:
        print(f"Selected: {selected.name} (tier={selected.tier.value})")
    else:
        print("No provider available!")

    chain = manager.get_failover_chain(preferred)
    print(f"Failover chain: {chain}")

    manager.record_usage("anthropic-haiku", 5000)
    manager.record_usage("deepseek-chat", 15000)
    print("Usage summary:", manager.get_usage_summary())
Design notes:
- The provider abstraction layer is the foundation of cross-provider failover: Without a unified interface, each provider's differences leak into resilience logic, making failover brittle. See Model-Agnostic Agent Design.
- Cost-aware routing is not optimization; it is resilience: When one provider is down and the only alternative costs 5–10x as much, your bill spikes during the outage. Cost-aware routing makes price an explicit input to failover, so traffic shifts to the cheapest available provider once budget pressure crosses the threshold.
- DeepSeek dual role: Both cost anchor and resilience anchor. Concurrency management is critical to prevent DeepSeek from becoming a single point of failure.
8. FAQ + Next Steps
Frequently Asked Questions
1. What is the difference between a circuit breaker and a retry, and when should I use each?
Retries and circuit breakers solve two different classes of problems. Retries are for transient faults (network jitter, temporary connection timeouts, occasional 503s): failures that may resolve within seconds, where trying again is likely to succeed. Circuit breakers are for persistent faults, such as a downstream service that is confirmed down or an endpoint that has failed 5 times in 30 seconds, where retries only waste time and tokens while adding load to a service that is already down. In deployment, the two are nested: retries execute inside the circuit breaker. When the breaker is Closed, retry logic runs normally; when the breaker is Open, before_call() raises immediately and retries never fire, which is exactly the behavior we want.
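The nesting can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the AgentCircuitBreaker from earlier sections: `TinyBreaker`, `CircuitOpenError`, `call_with_retry`, and the `record_success()` / `record_failure()` method names are hypothetical stand-ins; only `before_call()` comes from the text.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when the breaker refuses a call."""


class TinyBreaker:
    """Stand-in breaker: opens after `threshold` consecutive failures."""

    def __init__(self, threshold: int = 3, cooldown_s: float = 30.0):
        self.threshold = threshold
        self.cooldown_s = cooldown_s
        self.failures = 0
        self.opened_at = None  # monotonic timestamp while open

    def before_call(self) -> None:
        if self.opened_at is None:
            return
        if time.monotonic() - self.opened_at < self.cooldown_s:
            raise CircuitOpenError("circuit open; skipping call")
        # Cooldown elapsed: let one probe through; a single failure reopens.
        self.opened_at = None
        self.failures = self.threshold - 1

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()


def call_with_retry(breaker, fn, max_attempts: int = 3, base_delay_s: float = 0.0):
    """Retry loop that runs inside the breaker: every attempt asks first."""
    last_error = None
    for attempt in range(max_attempts):
        breaker.before_call()  # raises CircuitOpenError when the circuit is open
        try:
            result = fn()
        except Exception as exc:
            breaker.record_failure()
            last_error = exc
            if attempt < max_attempts - 1:
                time.sleep(base_delay_s * (2 ** attempt))  # exponential backoff
        else:
            breaker.record_success()
            return result
    raise last_error
```

Because `before_call()` runs before every attempt, a breaker that opens partway through a retry sequence also cancels the remaining attempts, and the next request fails fast without touching the dependency.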
2. What is the core difference between token-aware rate limiting and traditional request-based rate limiting?
The core difference is the unit of consumption. Traditional rate limiters count requests: N requests per minute, each consuming 1. This assumes every request costs roughly the same, an assumption that does not hold for LLM calls. A 50-token classification request and an 80,000-token document analysis request look identical to a traditional limiter; both consume 1 request slot. But provider billing is token-based, and provider rate limits include tokens per minute (TPM), not just request counts. Token-aware rate limiting changes the consumption unit: each call consumes not 1 but the estimated token count for that call. Post-call calibration from response headers is also essential: clients cannot estimate tokens precisely in advance (especially output tokens), so they must rely on the actual counts the provider returns in its response to keep the client-side bucket accurate.
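A minimal sketch of that change of unit, assuming a simple per-minute token bucket. `TokenBudgetBucket`, `try_acquire`, and `calibrate` are illustrative names rather than the TokenAwareRateLimiter API, and the actual token count is assumed to have been extracted from the provider response by the caller.

```python
import time


class TokenBudgetBucket:
    """Per-minute bucket where each call consumes its estimated token count."""

    def __init__(self, tokens_per_minute: int, clock=time.monotonic):
        self.capacity = float(tokens_per_minute)
        self.available = float(tokens_per_minute)
        self.refill_per_s = tokens_per_minute / 60.0
        self._clock = clock          # injectable so tests can control time
        self._last = clock()

    def _refill(self) -> None:
        now = self._clock()
        gained = (now - self._last) * self.refill_per_s
        self.available = min(self.capacity, self.available + gained)
        self._last = now

    def try_acquire(self, estimated_tokens: int) -> bool:
        """Reserve the estimate up front; refuse if the bucket cannot cover it."""
        self._refill()
        if estimated_tokens > self.available:
            return False
        self.available -= estimated_tokens
        return True

    def calibrate(self, estimated_tokens: int, actual_tokens: int) -> None:
        """After the call, replace the estimate with the provider-reported count."""
        self._refill()
        delta = actual_tokens - estimated_tokens  # > 0 means we under-reserved
        self.available = min(self.capacity, self.available - delta)
```

An underestimate can push `available` below zero, which is intentional in this sketch: the bucket then refuses new calls until refill has paid back the overrun.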
3. Bulkhead isolation and rate limiting seem similar—how do I distinguish them?
Both count and limit, but they answer different questions. Rate limiting is flow control: it decides "should this request pass through?" based on rate and time windows, and rejected requests can usually be retried later. A bulkhead is resource isolation: it carves independent pools out of physical resources (threads, memory, file handles) and ensures that exhausting one pool does not affect the others. The bulkhead's user-level token budget looks like rate limiting, but its motivation is not "this user is calling too fast"; it is "if this user drains the global token quota, will other users be affected?" Bulkheads partition by isolation domain (users, tools, sessions); rate limiters partition by rate and provider. In the request lifecycle, the Bulkhead sits at request entry, where resources are allocated, and the Rate Limiter sits immediately before each provider call.
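The isolation side of that distinction can be illustrated with one bounded pool per domain. This is a sketch only: `SimpleBulkhead`, `BulkheadFullError`, and the domain names are hypothetical, and it isolates concurrency slots rather than the token budgets that AgentBulkhead also tracks.

```python
import threading
from contextlib import contextmanager
from typing import Dict


class BulkheadFullError(RuntimeError):
    """Raised when a domain's pool has no free slot."""


class SimpleBulkhead:
    """One bounded pool per isolation domain (for example, per tool)."""

    def __init__(self, limits: Dict[str, int]):
        self._pools = {
            name: threading.BoundedSemaphore(size)
            for name, size in limits.items()
        }

    @contextmanager
    def slot(self, domain: str):
        pool = self._pools[domain]
        # Fail fast instead of queueing: a full pool must not block the caller.
        if not pool.acquire(blocking=False):
            raise BulkheadFullError(f"{domain} pool exhausted")
        try:
            yield
        finally:
            pool.release()
```

There is no time window anywhere in this class. A rejected call is not "too fast"; its domain is simply full, and other domains are unaffected.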
4. Will graceful degradation make users perceive quality degradation?
Yes, but perceiving quality degradation beats perceiving service unavailability. This is the core philosophy of degradation design: "better than nothing." The key is managing user perception. The quality_estimate field in the degradation metadata lets the frontend adjust the UI to the degradation level: for example, a partial result can show a yellow info banner, "Some information temporarily unavailable," and a fallback result can display "Using backup model, results may differ slightly." Degraded results are also often good enough: a cached answer from DeepSeek (quality 0.7) may be perfectly sufficient for "Where is my order?" and the user may never notice it is incomplete. What truly frustrates users is complete unresponsiveness, and degradation prevents exactly that worst case.
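On the frontend side, the mapping from degradation metadata to a banner can be very small. The sketch below assumes the metadata is a dict with a `level` key alongside `quality_estimate`; the `level` key, the 0.9 quality floor, and the generic fallback message are illustrative assumptions, while the two banner texts are the ones quoted above.

```python
from typing import Optional

# Banner text per degradation level (levels are assumed names).
BANNERS = {
    "partial": "Some information temporarily unavailable",
    "fallback": "Using backup model, results may differ slightly",
}


def banner_for(metadata: dict, quality_floor: float = 0.9) -> Optional[str]:
    """Return the banner to show for a response, or None for a full answer."""
    level = metadata.get("level", "full")
    quality = metadata.get("quality_estimate", 1.0)
    if level == "full" and quality >= quality_floor:
        return None
    # Unknown levels or low-quality "full" answers get a generic notice.
    return BANNERS.get(level, "Results may be incomplete")
```

Keeping this decision in one function means the degradation engine only has to emit metadata, and the wording shown to users can change without touching the agent runtime.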
5. Do I need to implement all four patterns, or can I adopt them incrementally?
You can and should adopt them incrementally. The recommended adoption order is: Circuit Breaker → Token-Aware Rate Limiter → Graceful Degradation → Bulkhead Isolation. Why this order? The circuit breaker solves the most immediate problem—preventing agents from burning tokens on dead dependencies (one code change, instantly visible benefit). The rate limiter is second priority—preventing 429 errors and quota exhaustion (since 429s are among the most common production agent failures). The degradation engine becomes especially valuable once circuit breakers and rate limiters are in place—they provide clear trigger signals for degradation. Bulkhead comes last—it solves the "shared resources causing cascading failures" problem, which typically only becomes a visible pain point after the agent is deployed to multi-user environments. Each pattern can be deployed independently and produce independent benefits—you do not need everything in place to start seeing results.
6. Will a multi-provider resilience architecture add too much complexity?
If you only use one LLM provider and your traffic is modest, multi-provider resilience may indeed be over-engineering. But several signals indicate you need it: (1) your agent has already experienced at least one production outage due to a provider going down; (2) your monthly LLM API bill exceeds $500 and a single provider accounts for 90%+; (3) you have cost-sensitive scenarios (e.g., customer support agent handling 10,000+ daily calls) where provider price differences directly impact margins. For most teams, a practical starting point is "OpenAI/Claude as primary + DeepSeek as fallback"—managing resilience for just 2 providers keeps complexity manageable while covering the vast majority of failure scenarios. Complexity is real—but measured against the revenue impact of a single provider outage, the implementation cost of multi-provider resilience (estimated 3–5 days of engineering effort) is typically a very worthwhile investment.
Next Steps / Further Reading
- Agent Error Recovery — Retries are the first layer of the resilience stack; above them sit circuit breakers, rate limiting, bulkheads, and degradation.
- Agent Observability — Resilience pattern state changes (circuit breaker transitions, rate limiter watermarks, degradation events) are key observability signals.
- Agent Rollback Design — Resilience patterns prevent failures from spreading; rollback recovers from failures that did occur.
- Agent Cost Observability — Token-aware rate limiting and per-user budgets are the first line of cost defense.
- Agent Runtime Isolation — Bulkhead resource isolation and runtime sandboxes together form the multi-layer isolation architecture.
- Model-Agnostic Agent Design — Multi-provider resilience and model switching require a provider abstraction layer.
- MCP Protocol Production Guide — MCP tool calls benefit from the same circuit breaker and rate limiting patterns.