Agent Resilience Patterns: Circuit Breakers, Rate Limiting, Bulkheads, and Graceful Degradation

30-Second Takeaway

  • The Core Problem: When agents run in production, they face four classes of failures: provider outages, API rate limits, tool breakdowns, and resource exhaustion. Simple retry logic amplifies these failures--each retry burns tokens, adds load, and prolongs the outage--rather than fixing them.
  • The Four-Layer Resilience Stack: Circuit Breaker - Stop calling dead dependencies. Token-Aware Rate Limiter - Protect API quotas from exhaustion. Bulkhead Isolation - Prevent a single task from draining global resources. Graceful Degradation - Keep the system running when core dependencies fail.
  • Key Design Principle: Resilience patterns must be injected at fixed positions in the agent request lifecycle--Bulkhead at entry, Rate Limiter before the call, Circuit Breaker wrapping every call, Degradation as the final safety net. Together they form a complete "request enters, resource check, call protection, failure fallback" pipeline.
  • What You'll Build: A four-layer resilience architecture for your agent system--implementing AgentCircuitBreaker, TokenAwareRateLimiter, AgentBulkhead, and GracefulDegradationEngine in Python, then composing them via ResilientAgentRunner into a unified protection layer for production agent runtimes.

1. Why Agents Need Resilience Beyond Retries

An e-commerce customer-support agent receives a query at 2:17 AM: “When will my order #38291 arrive?” The agent’s first step is calling the get_order tool to fetch order details. Behind get_order is the order microservice’s /api/v1/orders/38291 endpoint—which is returning HTTP 503 because its database connection pool is exhausted.

The agent’s retry logic triggers: 1st retry (2:17:01), 2nd retry (2:17:03, 2s backoff), 3rd retry (2:17:07, 4s backoff). All three fail. Between retries, the agent’s LLM keeps burning tokens to “think” about its retry strategy—“Order service returned 503, let me retry…” “Still 503, probably just transient…” “Third time’s the charm, let me change the query format…” By 2:17:10, a simple query that should have cost 800 tokens has burned 4,200 tokens—and the order service is still down, the user still has no answer.

This isn’t a code bug. It’s a systemic flaw in retry logic when applied to agents—every retry consumes LLM reasoning tokens, and that “thinking” burns the same scarce resources the system is already struggling with. Worse, retries don’t distinguish failure types: transient network jitter (worth retrying) and a dead downstream service (where retries only make things worse) are handled by the exact same logic.

The Retry Amplification Equation: 1 agent request × N retries × (LLM thinking tokens + tool call cost) = Original cost × N. If 100 concurrent agents all hit the same downstream failure, they collectively hammer the already-crashed service with 100 × N additional requests in 10 seconds—this isn’t recovery, it’s a self-inflicted DoS.

The Four-Layer Resilience Stack: Protection Above Retries

Retries are the first layer of resilience—they answer “maybe trying again will work.” But production agent resilience demands four additional layers on top:

  1. Circuit Breaker: When a dependency (LLM provider or tool API) fails repeatedly, stop calling it altogether and fail fast. “Don’t send more requests to a service that’s already down—wait for it to recover first.” The circuit breaker is the outer guard around retries: retries can run inside, but when the circuit opens, retries are skipped entirely.
  2. Token-Aware Rate Limiter: LLM API quotas are modeled as TPM (tokens per minute), not RPM (requests per minute). Traditional request-based rate limiters fail completely in agent scenarios—one request might consume 100 tokens (a simple classification) or 120,000 tokens (a long document analysis). A token-aware limiter lets the agent ask before every call: “Do I have enough token budget remaining to complete this call?”
  3. Bulkhead Isolation: The ship’s watertight compartments. One agent user shouldn’t exhaust the global token quota or thread pool just because another user is making heavy requests. Bulkheads isolate resources across three dimensions: per-user token budgets, per-tool thread pools, and per-session runtime sandboxes.
  4. Graceful Degradation: When all three layers above are penetrated—the provider really is down, the tool really is unreachable—the agent shouldn’t crash. The degradation engine provides a priority-ordered decision tree: switch to a fallback model → skip non-critical tools → return cached results → enqueue for async processing → return partial results with quality markers.

These four layers relate to retries in a nested hierarchy: retries sit at the innermost layer (for transient faults), the circuit breaker wraps retries (stopping retries against persistent failures), the rate limiter guards the call entrypoint (ensuring quota exists), and degradation provides the final safety net (what to do when every path fails). For deep coverage of retry design itself (exponential backoff, jitter, retry budgets), see Agent Error Recovery—this article focuses on the architectural resilience layers above retries.

Agent-Specific Failure Modes: Why Microservices Resilience Patterns Don’t Port Directly

If you’re familiar with Resilience4j, Polly, or Hystrix, you might be thinking: “I already use these patterns in my microservices—what’s different about agents?” Three fundamental differences:

Wow Moment: If your agent uses retries without a circuit breaker, every agent request that hits a down dependency independently runs N retries. 100 concurrent agents = 100 × N extra calls. Add the LLM tokens consumed during each retry’s “thinking” phase, and a 30-minute service outage can burn 2–3 days of your monthly API budget. The circuit breaker doesn’t need to be complex—it just needs to answer one question after detecting consecutive failures: “Is this dependency currently unhealthy, and should I stop trying?”

2. Circuit Breaker Pattern for Agent Tool Calls

The circuit breaker originates from electrical engineering: when current exceeds a safe threshold, the breaker trips and cuts the circuit, preventing fires. In software, it protects the system from being dragged down by continuous calls to a failing dependency.

The Three-State State Machine

The circuit breaker transitions between three states:

Agent-Specific Failure Detection: Signals Deeper Than HTTP Status Codes

Traditional circuit breakers define “failure” as HTTP 5xx or timeout. Agent circuit breakers need to detect three categories of failure:

  1. HTTP errors: 5xx (server failure), 429 (rate limited), connection timeout. This is the first layer, consistent with traditional breakers.
  2. Semantic failures: The LLM returned HTTP 200, but the response body contains a hallucinated tool call—for example, "tool_name": "get_odrer" (typo), "arguments": {"id": "not_an_integer"} (type error), or a reference to a nonexistent function. Semantic failures are just as destructive to an agent as HTTP 500.
  3. Timeout patterns: Beyond total call duration, also track token generation rate. If the LLM produces only 15 tokens in 30 seconds (normally 500+), the provider may be in a degraded state—the circuit breaker should treat this as a failure signal.

Per-Tool vs. Per-Provider Circuit Breakers

Agents need two granularity levels of circuit breaking:

Both coexist: provider breakers protect the LLM call path, tool breakers protect the tool call path. A typical agent request passes through at least two circuit breakers.

Token-Cost-Aware Circuit Breaking

An easily overlooked signal: when a single LLM call consumes far more tokens than normal, the circuit should trip. For example, a normal “check order status” LLM call uses 500–800 tokens. If a call burns 8,000 tokens (because the LLM got stuck in repetitive reasoning or tried multiple strategies), that is itself a failure signal—continuing to retry only amplifies the cost. The circuit breaker tracks per-call token consumption. When a single call exceeds a token threshold (e.g., > 5,000 tokens and still failed) or when wasted tokens in a rolling window exceed budget (e.g., > 20% of budget wasted in 60 seconds on failed retries), the breaker trips early.

Code: AgentCircuitBreaker Class

from __future__ import annotations

import time
import threading
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Optional, Any, Dict
import functools
import logging

logger = logging.getLogger("agent.circuit_breaker")


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Tripped, failing fast
    HALF_OPEN = "half_open"  # Probing recovery


@dataclass
class CircuitBreakerConfig:
    """Circuit breaker configuration--all thresholds tunable per agent scenario."""
    failure_threshold: int = 5          # Consecutive failures to trip
    success_threshold: int = 2          # Consecutive successes in Half-Open to close
    cooldown_seconds: float = 30.0      # Open state cooldown
    window_seconds: float = 60.0        # Sliding window for failure counting
    # Agent-specific config
    token_cost_threshold: int = 5000    # Per-call token threshold -> counted as failure
    max_half_open_probes: int = 2       # Max concurrent probes in Half-Open


@dataclass
class CallResult:
    """Encapsulates each call result with agent-specific failure signals."""
    success: bool
    latency_ms: float
    tokens_consumed: int = 0
    semantic_failure: Optional[str] = None  # e.g. "hallucinated tool name"
    http_status: Optional[int] = None
    error_message: str = ""


class CircuitBreakerOpenError(Exception):
    """Raised when circuit is open, carrying state info for the caller (e.g., trigger degradation)."""
    def __init__(self, breaker_name: str, state: CircuitState, failures: int):
        self.breaker_name = breaker_name
        self.state = state
        self.failures = failures
        super().__init__(
            f"Circuit breaker '{breaker_name}' is {state.value} "
            f"(failures: {failures})"
        )


class AgentCircuitBreaker:
    """Circuit breaker implementation for agent scenarios.

    Features:
    - Three-state machine with sliding window
    - HTTP failure + semantic failure + token anomaly detection
    - Thread-safe (usable in multi-tool concurrent scenarios)
    - Half-Open probe concurrency control
    """

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self._failures: list[float] = []       # Failure timestamp list (sliding window)
        self._successes_in_half_open: int = 0
        self._last_failure_time: float = 0.0
        self._open_since: float = 0.0
        self._half_open_probe_count: int = 0   # Current active probes
        self._lock = threading.RLock()
        self.last_result: Optional[CallResult] = None

    # ---- State machine core ----

    def before_call(self) -> None:
        """Pre-call check: if breaker is Open and within cooldown, reject immediately."""
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return
            if self.state == CircuitState.OPEN:
                elapsed = time.monotonic() - self._open_since
                if elapsed >= self.config.cooldown_seconds:
                    self.state = CircuitState.HALF_OPEN
                    self._successes_in_half_open = 0
                    self._half_open_probe_count = 0
                    logger.info(
                        "CB [%s] transitioning OPEN -> HALF_OPEN", self.name
                    )
                else:
                    raise CircuitBreakerOpenError(
                        self.name, self.state, len(self._failures)
                    )
            if self.state == CircuitState.HALF_OPEN:
                if self._half_open_probe_count >= self.config.max_half_open_probes:
                    raise CircuitBreakerOpenError(
                        self.name, self.state, len(self._failures)
                    )
                self._half_open_probe_count += 1

    def after_call(self, result: CallResult) -> None:
        """Post-call state update: success or failure advances the state machine."""
        with self._lock:
            self.last_result = result
            now = time.monotonic()
            cutoff = now - self.config.window_seconds
            self._failures = [t for t in self._failures if t > cutoff]
            if self.state == CircuitState.HALF_OPEN:
                self._half_open_probe_count = max(
                    0, self._half_open_probe_count - 1
                )

            is_failure = (
                not result.success
                or result.semantic_failure is not None
                or result.tokens_consumed > self.config.token_cost_threshold
            )

            if is_failure:
                self._failures.append(now)
                self._last_failure_time = now
                if self.state == CircuitState.HALF_OPEN:
                    self.state = CircuitState.OPEN
                    self._open_since = now
                    self._successes_in_half_open = 0
                    logger.warning(
                        "CB [%s] HALF_OPEN probe FAILED -> OPEN", self.name
                    )
                elif self.state == CircuitState.CLOSED:
                    if len(self._failures) >= self.config.failure_threshold:
                        self.state = CircuitState.OPEN
                        self._open_since = now
                        logger.error(
                            "CB [%s] CLOSED -> OPEN! %d failures",
                            self.name, len(self._failures),
                        )
            else:
                if self.state == CircuitState.HALF_OPEN:
                    self._successes_in_half_open += 1
                    if self._successes_in_half_open >= self.config.success_threshold:
                        self.state = CircuitState.CLOSED
                        self._failures.clear()
                        logger.info(
                            "CB [%s] HALF_OPEN -> CLOSED (%d successes)",
                            self.name, self._successes_in_half_open,
                        )
                elif self.state == CircuitState.CLOSED:
                    self._failures.clear()

    @staticmethod
    def is_semantic_failure(response_data: dict) -> Optional[str]:
        """Detect semantic failures in LLM tool-call responses."""
        tool_calls = response_data.get("tool_calls", [])
        if not tool_calls:
            return None
        for tc in tool_calls:
            func_name = tc.get("function", {}).get("name", "")
            if not func_name or len(func_name) < 2:
                return f"Malformed tool name: '{func_name}'"
        return None

    def get_metrics(self) -> dict:
        with self._lock:
            return {
                "name": self.name,
                "state": self.state.value,
                "failures_in_window": len(self._failures),
                "open_since": self._open_since,
                "half_open_probes": self._half_open_probe_count,
            }


# ---- Decorator API ----

class circuit_breaker:
    """Decorator: auto-wrap agent tool functions with a circuit breaker.

    Usage:
        breaker = AgentCircuitBreaker("get_order")

        @circuit_breaker(breaker)
        def get_order(order_id: str) -> dict:
            ...
    """

    def __init__(self, breaker: AgentCircuitBreaker):
        self.breaker = breaker

    def __call__(self, func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self.breaker.before_call()
            start = time.monotonic()
            try:
                result = func(*args, **kwargs)
                elapsed = time.monotonic() - start
                self.breaker.after_call(CallResult(
                    success=True, latency_ms=elapsed * 1000))
                return result
            except Exception as e:
                elapsed = time.monotonic() - start
                self.breaker.after_call(CallResult(
                    success=False, latency_ms=elapsed * 1000,
                    error_message=str(e)))
                raise
        return wrapper


# ---- Usage example ----
if __name__ == "__main__":
    breaker = AgentCircuitBreaker("search_docs")

    # Simulate consecutive failures -> circuit trips
    for i in range(6):
        try:
            breaker.before_call()
            raise ConnectionError("Backend unreachable")
        except CircuitBreakerOpenError:
            print(f"[Call {i}] Circuit OPEN -- fast failing")
            break
        except ConnectionError:
            breaker.after_call(CallResult(
                success=False, latency_ms=50,
                error_message="Backend unreachable"))
            print(f"[Call {i}] Failure recorded")

    print("Metrics:", breaker.get_metrics())

Design notes:

MCP (Model Context Protocol) tool servers benefit from the same circuit breaker patterns. See MCP Protocol Production Guide for MCP server resilience integration.

3. Token-Aware Rate Limiting: Protecting LLM API Quotas

Why Traditional Request-Based Rate Limiters Fail

Most API rate limiters are request-count-based: at most N requests per minute. This works fine in the REST API world—each request costs roughly the same. But LLM API billing is nothing like that:

Call ScenarioInput TokensOutput TokensTotal TokensEquivalent "Standard Requests"
Simple classification ("Is this spam?")505551x
Customer support reply (with context)8002001,00018x
Code review (full PR diff)12,0003,00015,000273x
Long document summary (50-page PDF)80,0005,00085,0001,545x

If you rate-limit by RPM (requests per minute) to 100 RPM, then 100 simple classifications (5,500 tokens) and 100 long document summaries (8,500,000 tokens) are treated identically—but the latter will instantly exhaust your TPM quota. Traditional rate limiters are completely blind to this discrepancy.

Token Bucket Refactored: From Request Buckets to Token Buckets

The classic Token Bucket algorithm maintains a bucket with N tokens; each request consumes 1 token, and tokens refill at a fixed rate. The agent adaptation is simple but critical: bucket capacity = provider TPM limit; each request consumes not 1 but the estimated token count for that call.

This means:

Provider Rate Limit Profiles—Especially DeepSeek

Each provider has a different rate limit model:

Response Header Dynamic Adjustment

A client-side rate limiter can never be perfectly accurate. The limiter must read provider response headers after every call to calibrate:

Client-Side Token Estimation: Decide Before You Know

The solution is estimation + calibration: Use historical data and character-based heuristics to estimate before the call, then calibrate from response headers after the call.

Code: TokenAwareRateLimiter Class (with asyncio support)

from __future__ import annotations

import time
import asyncio
import threading
from dataclasses import dataclass, field
from typing import Optional, Dict
from enum import Enum
import logging

logger = logging.getLogger("agent.rate_limiter")


class Provider(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    DEEPSEEK = "deepseek"


@dataclass
class ProviderRateProfile:
    """Provider rate limit profile--based on provider docs and empirical data."""
    provider: Provider
    tpm_limit: int                   # Tokens per minute upper bound
    rpm_limit: int = 500
    max_concurrency: int = 50        # Max concurrent connections (critical for DeepSeek)
    chars_per_token: float = 3.5     # Mixed CN/EN default


PROVIDER_PROFILES: Dict[Provider, ProviderRateProfile] = {
    Provider.OPENAI: ProviderRateProfile(
        provider=Provider.OPENAI, tpm_limit=1_000_000, rpm_limit=500,
        chars_per_token=4.0,
    ),
    Provider.ANTHROPIC: ProviderRateProfile(
        provider=Provider.ANTHROPIC, tpm_limit=400_000, rpm_limit=400,
        chars_per_token=4.0,
    ),
    Provider.DEEPSEEK: ProviderRateProfile(
        provider=Provider.DEEPSEEK, tpm_limit=500_000, rpm_limit=300,
        max_concurrency=10,  # DeepSeek's core constraint
        chars_per_token=1.8,  # Primarily Chinese
    ),
}


@dataclass
class TokenBudget:
    """Token bucket: capacity = TPM limit, refills at token/sec rate."""
    capacity: float
    tokens: float
    refill_rate_per_sec: float
    last_refill: float = field(default_factory=time.monotonic)

    def refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity,
                          self.tokens + elapsed * self.refill_rate_per_sec)
        self.last_refill = now

    def consume(self, amount: float) -> bool:
        self.refill()
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False


class TokenAwareRateLimiter:
    """Token-aware rate limiter.

    Features: Token Bucket refactoring, per-provider independent buckets, response header dynamic calibration,
    asyncio async support, DeepSeek concurrency awareness, queue-based backpressure.
    """

    def __init__(self, profiles=None):
        self.profiles = profiles or PROVIDER_PROFILES
        self._budgets: Dict[Provider, TokenBudget] = {}
        self._semaphores: Dict[Provider, asyncio.Semaphore] = {}
        self._lock = threading.RLock()
        self._active_requests: Dict[Provider, int] = {}

        for provider, profile in self.profiles.items():
            self._budgets[provider] = TokenBudget(
                capacity=float(profile.tpm_limit),
                tokens=float(profile.tpm_limit),
                refill_rate_per_sec=profile.tpm_limit / 60.0,
            )
            self._semaphores[provider] = asyncio.Semaphore(
                profile.max_concurrency)
            self._active_requests[provider] = 0

        self._aggregate_tokens_consumed: float = 0.0
        self._aggregate_cap: float = sum(
            p.tpm_limit for p in self.profiles.values())

    def estimate_tokens(self, provider: Provider, messages: list) -> int:
        """Estimate token consumption--used pre-call."""
        profile = self.profiles.get(provider)
        if not profile:
            return 1000
        total_chars = 0
        for msg in messages:
            content = msg.get("content", "")
            if isinstance(content, str):
                total_chars += len(content)
            elif isinstance(content, list):
                for part in content:
                    if isinstance(part, dict) and "text" in part:
                        total_chars += len(part["text"])
        return max(1, int(total_chars / profile.chars_per_token) + 500)

    def acquire(self, provider: Provider, estimated_tokens: int,
                timeout_ms: float = 5000) -> bool:
        """Synchronously acquire token quota."""
        budget = self._budgets.get(provider)
        if not budget:
            return False
        deadline = time.monotonic() + timeout_ms / 1000
        while time.monotonic() < deadline:
            with self._lock:
                if budget.consume(float(estimated_tokens)):
                    self._active_requests[provider] += 1
                    return True
            time.sleep(0.05)
        return False

    def release(self, provider: Provider, actual_tokens: int,
                estimated_tokens: int, response_headers=None):
        """Post-call release/calibrate quota."""
        budget = self._budgets.get(provider)
        if not budget:
            return
        with self._lock:
            diff = actual_tokens - estimated_tokens
            if diff > 0:
                budget.tokens = max(0, budget.tokens - diff)
            elif diff < 0:
                budget.tokens = min(budget.capacity, budget.tokens - diff)
            self._active_requests[provider] = max(
                0, self._active_requests[provider] - 1)
            if response_headers:
                remaining_hdr = (
                    response_headers.get("x-ratelimit-remaining-tokens")
                    or response_headers.get(
                        "anthropic-ratelimit-tokens-remaining")
                )
                if remaining_hdr is not None:
                    budget.tokens = min(budget.tokens, float(remaining_hdr))
                # Retry-After header -> freeze token refill (429 rate-limit protection)
                retry_after = response_headers.get("retry-after")
                if retry_after is not None:
                    freeze_sec = float(retry_after)
                    budget.last_refill = time.monotonic() + freeze_sec
                    logger.info(
                        "Provider %s: freezing token refill for %.1fs (Retry-After)",
                        provider.value, freeze_sec,
                    )
            self._aggregate_tokens_consumed += actual_tokens

    async def aacquire(self, provider: Provider, estimated_tokens: int,
                       timeout_ms: float = 5000) -> bool:
        """Async token quota acquisition + semaphore (critical for DeepSeek)."""
        sem = self._semaphores.get(provider)
        if not sem:
            return False
        try:
            await asyncio.wait_for(sem.acquire(),
                                   timeout=timeout_ms / 1000)
        except asyncio.TimeoutError:
            return False
        budget = self._budgets.get(provider)
        if not budget:
            sem.release()
            return False
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout_ms / 1000
        while loop.time() < deadline:
            with self._lock:
                if budget.consume(float(estimated_tokens)):
                    self._active_requests[provider] += 1
                    return True
            await asyncio.sleep(0.05)
        sem.release()
        return False

    async def arelease(self, provider: Provider, actual_tokens: int,
                       estimated_tokens: int, response_headers=None):
        """Async quota release."""
        self.release(provider, actual_tokens, estimated_tokens,
                     response_headers)
        sem = self._semaphores.get(provider)
        if sem:
            sem.release()

    def is_near_limit(self, provider: Provider,
                      threshold_pct: float = 0.85) -> bool:
        """Check if near limit (to trigger backpressure)."""
        budget = self._budgets.get(provider)
        if not budget:
            return False
        budget.refill()
        ratio = (budget.tokens / budget.capacity
                 if budget.capacity > 0 else 1.0)
        return ratio < (1.0 - threshold_pct)

    @property
    def aggregate_consumption_pct(self) -> float:
        return (self._aggregate_tokens_consumed / self._aggregate_cap
                if self._aggregate_cap > 0 else 0.0)

    def get_metrics(self) -> dict:
        metrics = {}
        for provider, budget in self._budgets.items():
            budget.refill()
            metrics[provider.value] = {
                "tokens_remaining": budget.tokens,
                "capacity": budget.capacity,
                "usage_pct": round(
                    (1 - budget.tokens / budget.capacity) * 100, 1
                ) if budget.capacity > 0 else 0,
                "active_requests": self._active_requests.get(provider, 0),
            }
        metrics["aggregate"] = {
            "tokens_consumed": self._aggregate_tokens_consumed,
            "total_cap": self._aggregate_cap,
            "usage_pct": round(self.aggregate_consumption_pct * 100, 1),
        }
        return metrics


# ---- Usage example ----
if __name__ == "__main__":
    deepseek_profile = ProviderRateProfile(
        provider=Provider.DEEPSEEK,
        tpm_limit=500_000, rpm_limit=300,
        max_concurrency=10, chars_per_token=1.8,
    )
    limiter = TokenAwareRateLimiter({
        Provider.DEEPSEEK: deepseek_profile,
        Provider.OPENAI: PROVIDER_PROFILES[Provider.OPENAI],
    })
    messages = [{"role": "user",
                 "content": "Summarize this 10-page contract for me."}]
    estimated = limiter.estimate_tokens(Provider.DEEPSEEK, messages)
    print(f"Estimated tokens: {estimated}")
    if limiter.acquire(Provider.DEEPSEEK, estimated, timeout_ms=5000):
        print("Token acquired -- proceed with API call")
        actual_tokens = 1200
        headers = {"x-ratelimit-remaining-tokens": "498000"}
        limiter.release(Provider.DEEPSEEK, actual_tokens, estimated,
                        headers)
    else:
        print("Rate limited -- must wait or use fallback")
    print("Metrics:", limiter.get_metrics())

Design notes:

4. Bulkhead Pattern: Isolating Failures Across Agent Components

The circuit breaker protects you from failing dependencies. The rate limiter protects you from exhausting quotas. But there's another failure mode: a component inside the system consumes all shared resources, starving other components. This is what the Bulkhead pattern addresses.

Analogy: A ship's hull is divided into watertight compartments. One compartment floods, but the ship stays afloat. Software bulkheads work the same way: partition system resources (threads, connections, memory, token budgets) into isolated pools so that exhausting one pool does not affect others.

Three-Dimensional Agent Isolation

Agent systems need three layers of bulkhead isolation:

  1. Per-user token budgets: Each user/tenant gets an independent token consumption cap (e.g., 50,000 tokens per minute). When one user triggers many agent tasks, other users' quotas are unaffected. Hard limit = reject call; soft warning = 80% threshold.
  2. Per-tool thread pools: Each tool type (search, database_query, send_email) gets its own dedicated thread pool. A slow database query does not block all email sends.
  3. Per-session sandboxes: Each agent session gets an independent temp directory, memory cap, and file descriptor quota.

Bulkhead vs. Rate Limiting: The Key Distinction

Rate Limiter is flow control—it decides whether a request should pass through. Bulkhead is resource isolation—it allocates independent pools from physical resources. Resources are partitioned, not requests. They work together: the Rate Limiter at the Bulkhead entry ensures requests comply with rate limits, and the Bulkhead executes requests within allocated resource pools.

Code: AgentBulkhead Class

from __future__ import annotations

import time
import threading
from dataclasses import dataclass, field
from typing import Dict, Optional
from concurrent.futures import ThreadPoolExecutor
import logging

logger = logging.getLogger("agent.bulkhead")


class BulkheadCapacityExceeded(Exception):
    """Bulkhead resource exhaustion exception."""
    def __init__(self, bulkhead_name: str, limit_type: str,
                 current: float, limit: float):
        self.bulkhead_name = bulkhead_name
        self.limit_type = limit_type
        self.current = current
        self.limit = limit
        super().__init__(
            f"Bulkhead '{bulkhead_name}' {limit_type} exceeded: "
            f"{current}/{limit}"
        )


@dataclass
class UserTokenBudget:
    """Per-user token budget controller."""
    user_id: str
    token_limit: int              # Hard limit
    warning_threshold_pct: float = 0.80  # 80% soft warning
    window_seconds: float = 60.0
    tokens_used: float = 0.0
    last_reset: float = field(default_factory=time.monotonic)

    def _maybe_reset(self) -> None:
        now = time.monotonic()
        if now - self.last_reset >= self.window_seconds:
            self.tokens_used = 0.0
            self.last_reset = now

    def try_consume(self, tokens: int) -> bool:
        self._maybe_reset()
        if self.tokens_used + tokens > self.token_limit:
            return False
        self.tokens_used += tokens
        return True

    @property
    def usage_pct(self) -> float:
        self._maybe_reset()
        return (self.tokens_used / self.token_limit
                if self.token_limit > 0 else 0.0)

    @property
    def is_near_limit(self) -> bool:
        return self.usage_pct >= self.warning_threshold_pct


class AgentBulkhead:
    """Agent Bulkhead isolator.

    Three isolation dimensions:
    1. Per-user token budget -- prevents one user from draining global quota
    2. Per-tool thread pool -- isolates slow tools from fast tools
    3. Per-session resource quotas -- file descriptor / memory limits
    """

    def __init__(
        self,
        default_token_limit: int = 100_000,
        max_file_descriptors_per_session: int = 100,
    ):
        self.default_token_limit = default_token_limit
        self.max_fds_per_session = max_file_descriptors_per_session
        self._user_budgets: Dict[str, UserTokenBudget] = {}
        self._budget_lock = threading.RLock()
        self._tool_pools: Dict[str, ThreadPoolExecutor] = {}
        self._pool_lock = threading.RLock()
        self._session_resources: Dict[str, dict] = {}
        self._session_lock = threading.RLock()

    # ---- 1. Per-user token budget ----

    def check_user_budget(self, user_id: str,
                          estimated_tokens: int) -> bool:
        """Check user token budget. Returns False = rejected."""
        with self._budget_lock:
            if user_id not in self._user_budgets:
                self._user_budgets[user_id] = UserTokenBudget(
                    user_id=user_id,
                    token_limit=self.default_token_limit,
                )
            budget = self._user_budgets[user_id]

            if budget.is_near_limit:
                logger.warning(
                    "User %s approaching token limit: %.1f%%",
                    user_id, budget.usage_pct * 100,
                )

            if not budget.try_consume(estimated_tokens):
                raise BulkheadCapacityExceeded(
                    f"user_budget:{user_id}", "token_budget",
                    budget.tokens_used, budget.token_limit,
                )
        return True

    def get_user_usage(self, user_id: str) -> dict:
        with self._budget_lock:
            budget = self._user_budgets.get(user_id)
            if not budget:
                return {"user_id": user_id, "usage_pct": 0.0}
            return {
                "user_id": user_id,
                "usage_pct": round(budget.usage_pct * 100, 1),
                "tokens_used": budget.tokens_used,
                "token_limit": budget.token_limit,
                "near_limit": budget.is_near_limit,
            }

    # ---- 2. Per-tool thread pool isolation ----

    def get_tool_executor(
        self, tool_name: str, max_workers: int = 8, queue_depth: int = 32
    ) -> ThreadPoolExecutor:
        """Get or create a tool-specific thread pool."""
        with self._pool_lock:
            if tool_name not in self._tool_pools:
                self._tool_pools[tool_name] = ThreadPoolExecutor(
                    max_workers=max_workers,
                    thread_name_prefix=f"agent-tool-{tool_name}",
                )
                logger.info(
                    "Created ThreadPool for tool '%s': workers=%d",
                    tool_name, max_workers,
                )
            return self._tool_pools[tool_name]

    def tool_pool_metrics(self) -> dict:
        with self._pool_lock:
            return {
                name: {
                    "max_workers": pool._max_workers,
                }
                for name, pool in self._tool_pools.items()
            }

    # ---- 3. Per-session resource tracking ----

    def register_session(self, session_id: str) -> None:
        with self._session_lock:
            self._session_resources[session_id] = {
                "files_open": 0,
                "created_at": time.monotonic(),
            }

    def check_session_resource(self, session_id: str,
                               resource_type: str) -> bool:
        """Check if session resource limit is exceeded."""
        with self._session_lock:
            if session_id not in self._session_resources:
                self._session_resources[session_id] = {
                    "files_open": 0,
                    "created_at": time.monotonic(),
                }
            resources = self._session_resources[session_id]

        if resource_type == "file_descriptor":
            if resources.get("files_open", 0) >= self.max_fds_per_session:
                raise BulkheadCapacityExceeded(
                    f"session:{session_id}", "file_descriptors",
                    resources["files_open"], self.max_fds_per_session,
                )
            with self._session_lock:
                self._session_resources[session_id]["files_open"] += 1
            return True
        return True

    def release_session_resource(self, session_id: str,
                                 resource_type: str) -> None:
        with self._session_lock:
            if session_id in self._session_resources:
                if resource_type == "file_descriptor":
                    self._session_resources[session_id]["files_open"] = max(
                        0, self._session_resources[session_id].get(
                            "files_open", 1) - 1
                    )

    def cleanup_session(self, session_id: str) -> None:
        with self._session_lock:
            self._session_resources.pop(session_id, None)

    # ---- Observability ----

    def get_metrics(self) -> dict:
        return {
            "user_budgets": {
                uid: self.get_user_usage(uid)
                for uid in list(self._user_budgets.keys())[:20]
            },
            "tool_pools": self.tool_pool_metrics(),
            "active_sessions": len(self._session_resources),
        }


# ---- Usage example ----
if __name__ == "__main__":
    bulkhead = AgentBulkhead(default_token_limit=50_000)

    # Per-user budget check
    try:
        bulkhead.check_user_budget("user-42", estimated_tokens=3000)
        print("[User-42] Budget OK")
        bulkhead.check_user_budget("user-42", estimated_tokens=48000)
        print("[User-42] Budget near limit -- soft warning triggered")
    except BulkheadCapacityExceeded as e:
        print(f"[User-42] Budget exceeded: {e}")

    # Per-tool thread pools
    search_pool = bulkhead.get_tool_executor("search_docs", max_workers=4)
    db_pool = bulkhead.get_tool_executor("database_query", max_workers=8)
    print("Tool pools:", bulkhead.tool_pool_metrics())

    # Per-session resources
    bulkhead.register_session("sess-abc")
    bulkhead.check_session_resource("sess-abc", "file_descriptor")
    print("Session resource check passed")

    print("Bulkhead metrics:", bulkhead.get_metrics())

Design notes:

5. Graceful Degradation: Survival Strategies When Core Dependencies Fail

The circuit breaker has opened. The rate limiter has rejected. The bulkhead is near saturation. What does the agent do next? The answer cannot be "crash." Graceful degradation is the last line of defense: when dependencies are unavailable, the agent continues operating at reduced quality but without loss of function.

The Six-Level Degradation Decision Tree

Degradation strategies are priority-ordered—the agent tries the best option first and retreats step by step:

  1. Model fallback: Primary model unavailable (circuit open / rate limited) → switch to a backup model. For example, GPT-5.5 → GPT-5.4-mini, Claude Opus → Claude Haiku, or cross-provider failover (Claude → DeepSeek).
  2. Tool skip: A non-critical tool is unavailable → skip it and complete the task with remaining tools. Mark the result as partial.
  3. Cache return: Same or similar query was answered recently → return the cached result with a stale: true marker.
  4. Async defer: Synchronous completion impossible → enqueue for background processing, return an acknowledgment.
  5. Partial result: Return what you have with degradation metadata: which components are missing, why, and estimated quality impact.
  6. Graceful failure: Every path blocked → return clear, actionable error guidance—not a stack trace.

Degradation Metadata: Telling Callers the Result Is Incomplete

Degradation is not a boolean—it is a structured description. Every degradation response should carry:

Circuit Breaker Integration

The degradation engine integrates deeply with circuit breakers: when a tool's circuit breaker is Open, the degradation engine skips that tool directly rather than waiting for the call to fail. This means degradation can make decisions at the request-planning stage—saving unnecessary wait time and token burn.

Code: GracefulDegradationEngine Class

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Any, Dict, List
import logging
import hashlib

logger = logging.getLogger("agent.degradation")


class DegradationLevel(Enum):
    NONE = "none"           # Full result
    PARTIAL = "partial"     # Some components unavailable
    FALLBACK = "fallback"   # Used backup model / strategy
    DEFERRED = "deferred"   # Routed to async processing
    FAILED = "failed"       # Graceful failure


@dataclass
class DegradationMetadata:
    """Degradation response metadata for upstream quality assessment."""
    level: DegradationLevel = DegradationLevel.NONE
    missing_components: List[str] = field(default_factory=list)
    quality_estimate: float = 1.0          # 0.0–1.0
    fallback_chain: List[str] = field(default_factory=list)
    user_message: Optional[str] = None


class DegradationStrategy:
    """Abstract base class for degradation strategies."""
    def try_execute(self, context: dict) -> Optional[Any]:
        raise NotImplementedError


class ModelFallbackStrategy(DegradationStrategy):
    """Model fallback: try backup models in priority order."""
    def __init__(self, models: List[str]):
        self.models = models

    def try_execute(self, context: dict) -> Optional[Any]:
        current_model = context.get("current_model")
        for model in self.models:
            if model != current_model:
                logger.info("Degradation: trying fallback model %s", model)
                return {"model": model, "result": "fallback response"}
        return None


class SkipToolStrategy(DegradationStrategy):
    """Tool skip: allow skipping specified non-critical tools."""
    def __init__(self, optional_tools: List[str]):
        self.optional_tools = set(optional_tools)

    def try_execute(self, context: dict) -> Optional[Any]:
        failed_tool = context.get("failed_tool")
        if failed_tool and failed_tool in self.optional_tools:
            logger.info("Degradation: skipping optional tool %s", failed_tool)
            return {"skipped_tool": failed_tool,
                    "partial_results": context.get("partial_results", {})}
        return None


class CacheReturnStrategy(DegradationStrategy):
    """Cache return: look up similar queries in cache."""
    def __init__(self, cache: Optional[dict] = None):
        self._cache = cache or {}

    def _query_fingerprint(self, messages: list) -> str:
        content = "".join(m.get("content", "")[:500] for m in messages)
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def try_execute(self, context: dict) -> Optional[Any]:
        messages = context.get("messages", [])
        fp = self._query_fingerprint(messages)
        cached = self._cache.get(fp)
        if cached is not None:
            logger.info("Degradation: returning cached result (fp=%s)", fp)
            return {"cache_hit": True, "stale": True, "result": cached}
        return None

    def store(self, messages: list, result: Any) -> None:
        fp = self._query_fingerprint(messages)
        self._cache[fp] = result


class AsyncDeferStrategy(DegradationStrategy):
    """Async defer: enqueue task, return processing acknowledgment."""
    def __init__(self, queue: Optional[Any] = None):
        self._queue = queue or []

    def try_execute(self, context: dict) -> Optional[Any]:
        task = {
            "messages": context.get("messages"),
            "metadata": context.get("metadata", {}),
        }
        self._queue.append(task)
        logger.info("Degradation: task deferred to async queue (depth=%d)",
                     len(self._queue))
        return {
            "deferred": True,
            "queue_position": len(self._queue),
            "estimated_wait_seconds": len(self._queue) * 15,
        }


class GracefulFailureStrategy(DegradationStrategy):
    """Graceful failure: return actionable error guidance."""
    def __init__(self, fallback_message: str = ""):
        self.fallback_message = fallback_message

    def try_execute(self, context: dict) -> Optional[Any]:
        msg = self.fallback_message or (
            "The service is temporarily unavailable. Please try again later or visit the self-service portal."
        )
        return {
            "error": "graceful_failure",
            "user_message": msg,
            "support_contact": "[email protected]",
        }


class GracefulDegradationEngine:
    """Graceful degradation engine.

    Attempts strategies in priority order:
    model_fallback -> tool_skip -> cache_return ->
    async_defer -> partial_result -> graceful_failure

    Integrated with circuit breakers: tool circuit open -> auto tool_skip.
    """

    def __init__(self):
        self._strategies: List[tuple] = []
        self._circuit_breakers: Dict[str, Any] = {}
        self._fallback_chain: List[str] = []

    def register_model_fallback(self, models: List[str]):
        self._strategies.append(
            ("model_fallback", ModelFallbackStrategy(models)))

    def register_optional_tools(self, tool_names: List[str]):
        self._strategies.append(
            ("tool_skip", SkipToolStrategy(tool_names)))

    def register_cache(self, cache=None):
        self._strategies.append(
            ("cache_return", CacheReturnStrategy(cache)))

    def register_async_queue(self, queue=None):
        self._strategies.append(
            ("async_defer", AsyncDeferStrategy(queue)))

    def register_graceful_failure(self, message: str = ""):
        self._strategies.append(
            ("graceful_failure", GracefulFailureStrategy(message)))

    def link_circuit_breakers(self, breakers: Dict[str, Any]) -> None:
        """Link circuit breakers: tool open -> degradation auto-skips it."""
        self._circuit_breakers = breakers

    def execute(self, context: dict) -> tuple:
        """Execute degradation chain, return (result, DegradationMetadata)."""
        meta = DegradationMetadata()
        self._fallback_chain = []

        for strategy_name, strategy in self._strategies:
            self._fallback_chain.append(strategy_name)
            try:
                result = strategy.try_execute(context)
                if result is not None:
                    meta.fallback_chain = self._fallback_chain

                    if strategy_name == "model_fallback":
                        meta.level = DegradationLevel.FALLBACK
                        meta.quality_estimate = 0.85
                    elif strategy_name == "tool_skip":
                        meta.level = DegradationLevel.PARTIAL
                        meta.missing_components.append(
                            context.get("failed_tool", "unknown"))
                        meta.quality_estimate = 0.75
                    elif strategy_name == "cache_return":
                        meta.level = DegradationLevel.PARTIAL
                        meta.quality_estimate = 0.70
                    elif strategy_name == "async_defer":
                        meta.level = DegradationLevel.DEFERRED
                        meta.quality_estimate = 0.60
                    elif strategy_name == "graceful_failure":
                        meta.level = DegradationLevel.FAILED
                        meta.quality_estimate = 0.0
                        meta.user_message = result.get("user_message", "")

                    logger.info(
                        "Degradation chain succeeded at '%s' (level=%s)",
                        strategy_name, meta.level.value)
                    return result, meta
            except Exception as e:
                logger.warning("Degradation strategy '%s' failed: %s",
                               strategy_name, e)
                continue

        # All strategies failed
        meta.level = DegradationLevel.FAILED
        meta.quality_estimate = 0.0
        meta.user_message = "All degradation strategies exhausted. Contact support."
        meta.fallback_chain = self._fallback_chain
        return {"error": "total_degradation_failure"}, meta

    def should_skip_tool(self, tool_name: str) -> bool:
        """Check if a tool should be skipped due to open circuit breaker."""
        breaker = self._circuit_breakers.get(tool_name)
        if breaker and hasattr(breaker, "state"):
            cb_state = breaker.state
            if hasattr(cb_state, "value") and cb_state.value == "open":
                return True
        return False

    def get_metrics(self) -> dict:
        return {
            "strategies_registered": len(self._strategies),
            "circuit_breakers_linked": len(self._circuit_breakers),
            "last_fallback_chain": self._fallback_chain,
        }


# ---- Usage example ----
if __name__ == "__main__":
    engine = GracefulDegradationEngine()

    engine.register_model_fallback(
        models=["claude-opus", "claude-haiku",
                "deepseek-chat", "gpt-5.4-mini"])
    engine.register_optional_tools(
        ["send_notification", "log_analytics", "enrich_profile"])
    engine.register_cache({})
    engine.register_async_queue([])
    engine.register_graceful_failure(
        "Service temporarily unavailable. Please try again or contact [email protected].")

    context = {
        "current_model": "claude-opus",
        "failed_tool": "enrich_profile",
        "messages": [{"role": "user", "content": "Check my order status"}],
        "partial_results": {"order_id": "38291", "status_hint": "shipping"},
        "metadata": {},
    }

    result, meta = engine.execute(context)
    print(f"Result: {result}")
    print(f"Degradation: level={meta.level.value}, "
          f"quality={meta.quality_estimate}")
    print(f"Fallback chain: {meta.fallback_chain}")
    print(f"Missing: {meta.missing_components}")

Design notes:

6. Composing the Patterns: A Production Resilience Architecture

Sections 2–5 introduced four independent resilience patterns. But in production they are not used independently—a single agent request passes through all four layers simultaneously. This section shows how they compose.

Complete Request Lifecycle (7 Layers)

An agent request passes through seven resilience checkpoints from entry to exit:

  1. Bulkhead entry check: User token budget? Correct tool thread pool? Session resources within limits?
  2. Rate Limiter token budget check: Does the target provider have sufficient TPM quota for this call?
  3. LLM call (Circuit Breaker wrapped): Is the provider circuit breaker Closed? Post-call token calibration and circuit state update.
  4. LLM failure → Degradation engine decision: Provider unavailable? Select model fallback. Circuit breaker Open? Skip the failed tool.
  5. Before tool call → Circuit Breaker check: Is the tool's circuit breaker Closed? Token cost within normal range?
  6. Tool call failure → Degradation engine decision: Non-critical tool → skip. Critical tool → try cache → async defer → partial result.
  7. Response exit → Bulkhead release: Release user budget, thread pool resources, session resources.

Pattern Interaction Matrix

Pattern APattern BInteractionPotential Conflict
Circuit BreakerRate LimiterComplementary. CB stops dead calls; RL protects quota.Over-throttle if RL too conservative.
Circuit BreakerBulkheadComplementary. CB faces external; Bulkhead faces internal.Low conflict.
Circuit BreakerDegradationDeeply integrated. CB Open → Degradation auto-skips tool.Degradation loop possible; enforce max depth.
Rate LimiterBulkheadComplementary. RL is flow control; Bulkhead is resource isolation.Double-counting; clarify boundaries.
Rate LimiterDegradationRL rejection → Degradation tries alternate provider.All providers rate-limited → graceful failure.
BulkheadDegradationBulkhead exhaustion → Degradation async_defer or graceful failure.Low conflict.

Configuration Defaults

These are reasonable starting thresholds drawn from production experience:

Config ItemDefaultNotes
CB failure threshold5 failures / 60sLower to 3 for high-frequency agents; raise to 10 for low-frequency
CB cooldown30 secondsTool APIs typically recover within this window; LLM providers suggest 60s
RL token bucket capacityProvider TPM limitRead from provider tier information
RL token estimation factor3.5 chars/token (CN/EN mix)Consider integrating tiktoken or Anthropic tokenizer
Bulkhead user budget100,000 tokens/min/userDepends on user base and global quota
Bulkhead tool thread pool4–8 workers/toolI/O-bound → higher; CPU-bound → lower
Degradation max depth6 levelsPrevents degradation loops

Code: ResilientAgentRunner Class

This is the core code of the article—composing all four patterns into a runnable agent pipeline:

from __future__ import annotations

import time
import logging
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field

logger = logging.getLogger("agent.resilient_runner")


@dataclass
class CallResult:
    """Encapsulates each call result (inline version for ResilientAgentRunner)."""
    success: bool = False
    latency_ms: float = 0.0
    tokens_consumed: int = 0
    semantic_failure: Optional[str] = None


@dataclass
class ResilientAgentConfig:
    """Resilient agent configuration--all pattern thresholds centralized."""
    cb_failure_threshold: int = 5
    cb_cooldown_seconds: float = 30.0
    rl_acquire_timeout_ms: float = 5000
    bulkhead_user_token_limit: int = 100_000
    bulkhead_tool_workers: int = 6
    fallback_models: List[str] = field(default_factory=lambda: [
        "claude-opus", "claude-haiku", "deepseek-chat", "gpt-5.4-mini"
    ])
    optional_tools: List[str] = field(default_factory=lambda: [
        "send_notification", "log_analytics"
    ])
    max_degradation_depth: int = 6


@dataclass
class ResilientRunResult:
    """Complete result from a resilient agent run."""
    success: bool
    result: Any
    circuit_breaker_events: List[dict] = field(default_factory=list)
    rate_limiter_metrics: dict = field(default_factory=dict)
    bulkhead_metrics: dict = field(default_factory=dict)
    degradation_metadata: Optional[Any] = None
    total_latency_ms: float = 0.0
    tokens_consumed: int = 0
    resilience_events: List[str] = field(default_factory=list)


class ResilientAgentRunner:
    """Production-grade agent resilience runner.

    Weaves all four patterns into a 7-layer request lifecycle.
    """

    def __init__(self, circuit_breakers, rate_limiter, bulkhead,
                 degradation_engine, config=None):
        self.circuit_breakers = circuit_breakers
        self.rate_limiter = rate_limiter
        self.bulkhead = bulkhead
        self.degradation_engine = degradation_engine
        self.config = config or ResilientAgentConfig()
        self.degradation_engine.link_circuit_breakers(circuit_breakers)

    def run(self, user_id, session_id, provider, messages,
            tools_to_call) -> ResilientRunResult:
        """Run a complete resilience-protected agent request."""
        start_time = time.monotonic()
        result = ResilientRunResult(success=False, result=None)
        estimated_tokens = 0

        # LAYER 1: Bulkhead entry check
        try:
            estimated_tokens = self.rate_limiter.estimate_tokens(
                provider, messages)
            self.bulkhead.check_user_budget(user_id, estimated_tokens)
            self.bulkhead.register_session(session_id)
            self.bulkhead.check_session_resource(
                session_id, "file_descriptor")
            result.resilience_events.append("bulkhead:entry_passed")
        except Exception as e:
            result.resilience_events.append(f"bulkhead:entry_failed:{e}")
            return result

        # LAYER 2: Rate Limiter token budget check
        token_acquired = self.rate_limiter.acquire(
            provider, estimated_tokens,
            timeout_ms=self.config.rl_acquire_timeout_ms)
        if not token_acquired:
            result.resilience_events.append("rate_limiter:rejected")
            ctx = {"messages": messages, "error": "rate_limited"}
            deg_result, deg_meta = self.degradation_engine.execute(ctx)
            result.result = deg_result
            result.degradation_metadata = deg_meta
            return result
        result.resilience_events.append("rate_limiter:acquired")

        # LAYER 3: LLM call (Circuit Breaker wrapped)
        provider_cb = self.circuit_breakers.get(
            f"provider:{provider.value}")
        llm_response = None

        if provider_cb:
            try:
                provider_cb.before_call()
                # === Actual LLM call (replace with real call in production) ===
                llm_response = {
                    "content": "The order #38291 is in transit.",
                    "tool_calls": [
                        {"function": {"name": "get_order",
                         "arguments": '{"order_id": "38291"}'}},
                    ],
                    "usage": {"total_tokens": 1200},
                }
                # ================================================
                actual_tokens = llm_response.get("usage", {}).get(
                    "total_tokens", estimated_tokens)
                result.tokens_consumed += actual_tokens
                self.rate_limiter.release(
                    provider, actual_tokens, estimated_tokens)

                # Create CallResult for circuit breaker update
                cr = CallResult(
                    success=True, latency_ms=100,
                    tokens_consumed=actual_tokens)
                provider_cb.after_call(cr)
                result.resilience_events.append(
                    "circuit_breaker:llm_passed")
            except Exception as e:
                result.resilience_events.append(
                    f"circuit_breaker:llm_failed:{e}")
                ctx = {"messages": messages,
                       "current_model": provider.value,
                       "error": str(e)}
                deg_result, deg_meta = self.degradation_engine.execute(ctx)
                result.result = deg_result
                result.degradation_metadata = deg_meta
                return result

        # LAYER 5-6: Tool call pre-CB check + degradation
        tool_results = {}
        skipped_tools = []
        # Guard against None when provider_cb was also None
        if llm_response is None:
            llm_response = {}
        tool_calls = llm_response.get("tool_calls", [])

        for tc in tool_calls:
            tool_name = tc.get("function", {}).get("name", "")
            if not tool_name:
                continue
            if self.degradation_engine.should_skip_tool(tool_name):
                result.resilience_events.append(
                    f"degradation:skipping_tool:{tool_name}")
                skipped_tools.append(tool_name)
                continue

            tool_cb = self.circuit_breakers.get(f"tool:{tool_name}")
            if tool_cb:
                try:
                    tool_cb.before_call()
                    tool_results[tool_name] = {
                        "status": "success",
                        "data": f"Result from {tool_name}"}
                    cr = CallResult(
                        success=True, latency_ms=50,
                        tokens_consumed=0)
                    tool_cb.after_call(cr)
                    result.resilience_events.append(
                        f"circuit_breaker:tool_passed:{tool_name}")
                except Exception as e:
                    result.resilience_events.append(
                        f"circuit_breaker:tool_failed:{tool_name}")
                    ctx = {"failed_tool": tool_name,
                           "partial_results": tool_results,
                           "messages": messages}
                    deg_result, deg_meta = (
                        self.degradation_engine.execute(ctx))
                    if deg_meta.level.value != "failed":
                        tool_results[tool_name] = {
                            "degraded": True, "reason": str(e)}
                        skipped_tools.append(tool_name)
                    result.degradation_metadata = deg_meta

        # LAYER 7: Response exit -> Bulkhead release
        self.bulkhead.release_session_resource(
            session_id, "file_descriptor")
        result.resilience_events.append("bulkhead:exit_released")

        result.success = True
        result.result = {
            "content": llm_response.get("content"),
            "tool_results": tool_results,
            "skipped_tools": skipped_tools,
            "resilience": {"events": result.resilience_events},
        }
        result.total_latency_ms = (
            time.monotonic() - start_time) * 1000
        result.bulkhead_metrics = self.bulkhead.get_metrics()
        return result

    def get_metrics(self) -> dict:
        """Aggregate metrics from all resilience layers--for observability consumption."""
        return {
            "circuit_breakers": {
                name: cb.get_metrics()
                for name, cb in self.circuit_breakers.items()
                if hasattr(cb, "get_metrics")
            },
            "rate_limiter": self.rate_limiter.get_metrics(),
            "bulkhead": self.bulkhead.get_metrics(),
            "degradation": self.degradation_engine.get_metrics(),
        }


# ---- Usage example ----
if __name__ == "__main__":
    # In a real project, import these from their respective modules:
    # from circuit_breaker import AgentCircuitBreaker
    # from rate_limiter import TokenAwareRateLimiter, Provider
    # from bulkhead import AgentBulkhead
    # from degradation import GracefulDegradationEngine

    # ---- Initialize four patterns ----
    # 1. Circuit Breakers (Provider + Tool level)
    provider_cb = AgentCircuitBreaker("provider:deepseek")
    tool_cb_get_order = AgentCircuitBreaker("tool:get_order")
    tool_cb_lookup = AgentCircuitBreaker("tool:lookup_shipping")
    circuit_breakers = {
        "provider:deepseek": provider_cb,
        "tool:get_order": tool_cb_get_order,
        "tool:lookup_shipping": tool_cb_lookup,
    }

    # 2. Rate Limiter
    rl = TokenAwareRateLimiter()

    # 3. Bulkhead
    bulkhead = AgentBulkhead(default_token_limit=100_000)

    # 4. Degradation Engine
    degradation = GracefulDegradationEngine()
    degradation.register_model_fallback([
        "claude-opus", "claude-haiku", "deepseek-chat", "gpt-5.4-mini"])
    degradation.register_optional_tools(
        ["send_notification", "log_analytics"])
    degradation.register_cache({})
    degradation.register_async_queue([])
    degradation.register_graceful_failure(
        "Service temporarily unavailable. Try again later or contact [email protected].")

    # ---- Create Runner and execute ----
    runner = ResilientAgentRunner(
        circuit_breakers, rl, bulkhead, degradation)

    messages = [{"role": "user", "content": "Check order #38291 shipping status"}]
    result = runner.run(
        user_id="user-42",
        session_id="sess-abc",
        provider=Provider.DEEPSEEK,
        messages=messages,
        tools_to_call=["get_order", "lookup_shipping"],
    )

    print("=" * 50)
    print(f"Success: {result.success}")
    print(f"Content: {result.result['content']}")
    print(f"Tool results: {result.result['tool_results']}")
    print(f"Skipped tools: {result.result['skipped_tools']}")
    print(f"Total latency: {result.total_latency_ms:.0f}ms")
    print(f"Tokens consumed: {result.tokens_consumed}")
    print(f"Resilience events: {result.resilience_events}")
    print("=" * 50)
    print("Runner metrics:", runner.get_metrics())

Design notes:

7. Multi-Provider Resilience: Unified Protection Across OpenAI, Claude, and DeepSeek

Most production agents use multiple LLM providers to balance cost, latency, and availability. But each provider has different failure modes and rate limit models. Multi-provider resilience architecture abstracts these differences into a unified protection layer.

The Provider Abstraction Layer

Multi-provider resilience requires a provider abstraction layer that lets agents operate without caring whether the underlying provider is OpenAI, Anthropic, or DeepSeek. This aligns with the core principle of Model-Agnostic Agent Design—the abstraction layer is what makes cross-provider failover possible.

Per-Provider Independent Circuit Breakers + Rate Limiters

Each provider gets its own independent circuit breaker and rate limiter instance:

When the OpenAI circuit breaker opens, only OpenAI calls are blocked—Anthropic and DeepSeek calls continue normally.

Cross-Provider Failover Chain

The failover chain is a configurable priority list (e.g., ["openai", "anthropic", "deepseek"]). Routing decisions consider availability, cost (under budget pressure, prefer cheaper providers), and latency.

DeepSeek: Central Focus for Chinese Teams

For Chinese engineering teams, DeepSeek plays a dual role:

Code: MultiProviderResilienceManager Class

from __future__ import annotations

import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
import logging

logger = logging.getLogger("agent.multi_provider")


class ProviderTier(Enum):
    PREMIUM = "premium"     # OpenAI GPT-5.5, Claude Opus
    STANDARD = "standard"   # GPT-5.4, Claude Sonnet
    BUDGET = "budget"       # GPT-5.4-mini, Claude Haiku, DeepSeek


@dataclass
class ProviderInfo:
    """Provider profile: availability, cost, and capability metadata."""
    name: str
    tier: ProviderTier
    cost_per_1m_input: float = 0.0
    cost_per_1m_output: float = 0.0
    max_context: int = 128_000
    supports_streaming: bool = True
    circuit_breaker: Optional[Any] = None     # AgentCircuitBreaker
    rate_limiter: Optional[Any] = None        # TokenAwareRateLimiter


# Provider cost profiles (2026 Q2 approximate; check provider pricing pages)
PROVIDER_REGISTRY: Dict[str, ProviderInfo] = {
    "openai-gpt55": ProviderInfo(
        name="openai-gpt55", tier=ProviderTier.PREMIUM,
        cost_per_1m_input=5.00, cost_per_1m_output=16.00,
        max_context=256_000,
    ),
    "openai-gpt54": ProviderInfo(
        name="openai-gpt54", tier=ProviderTier.STANDARD,
        cost_per_1m_input=2.50, cost_per_1m_output=10.00,
    ),
    "anthropic-opus": ProviderInfo(
        name="anthropic-opus", tier=ProviderTier.PREMIUM,
        cost_per_1m_input=15.00, cost_per_1m_output=75.00,
    ),
    "anthropic-haiku": ProviderInfo(
        name="anthropic-haiku", tier=ProviderTier.BUDGET,
        cost_per_1m_input=0.80, cost_per_1m_output=4.00,
    ),
    "deepseek-chat": ProviderInfo(
        name="deepseek-chat", tier=ProviderTier.BUDGET,
        cost_per_1m_input=0.14, cost_per_1m_output=0.28,   # Ultra-low cost
        max_context=128_000,
    ),
}


class MultiProviderResilienceManager:
    """Multi-provider resilience manager.

    Core responsibilities:
    1. Per-provider independent circuit breaker + rate limiter management
    2. Cross-provider failover chain (availability + cost + latency)
    3. Unified token budget tracking
    4. Cost-aware routing
    """

    def __init__(self, provider_registry=None):
        self.providers = provider_registry or PROVIDER_REGISTRY
        self._token_usage: Dict[str, int] = {}
        self._budget_pressure_threshold: float = 0.80  # 80% triggers cost routing

    def attach_circuit_breaker(self, provider_name: str, cb: Any) -> None:
        """Bind an independent circuit breaker to a provider."""
        if provider_name in self.providers:
            self.providers[provider_name].circuit_breaker = cb

    def attach_rate_limiter(self, provider_name: str, rl: Any) -> None:
        """Bind an independent rate limiter to a provider."""
        if provider_name in self.providers:
            self.providers[provider_name].rate_limiter = rl

    def is_provider_available(self, provider_name: str) -> bool:
        """Check if provider is available (CB Closed + RL has quota)."""
        info = self.providers.get(provider_name)
        if not info:
            return False
        if info.circuit_breaker:
            cb_state = info.circuit_breaker.state
            if hasattr(cb_state, "value") and cb_state.value != "closed":
                return False
        if info.rate_limiter:
            metrics = info.rate_limiter.get_metrics()
            provider_metrics = metrics.get(provider_name, {})
            if provider_metrics.get("usage_pct", 0) > 95:
                return False
        return True

    def select_provider(
        self, preferred: List[str], context=None
    ) -> Optional[ProviderInfo]:
        """Select an available provider by priority.

        Decision logic:
        1. Follow preferred list order
        2. Skip unavailable providers
        3. If global budget pressure > 80%, BUDGET tier gets priority
        """
        context = context or {}
        total_used = sum(self._token_usage.values())
        total_cap = 0
        for info in self.providers.values():
            if info.rate_limiter:
                total_cap = max(total_cap, info.rate_limiter._aggregate_cap)
        budget_pressure = (total_used / total_cap) if total_cap > 0 else 0.0

        if budget_pressure > self._budget_pressure_threshold:
            logger.info(
                "Budget pressure %.1f%% > %.0f%%, cost-aware routing",
                budget_pressure * 100, self._budget_pressure_threshold * 100)
            preferred = sorted(
                preferred,
                key=lambda p: (
                    self.providers[p].tier != ProviderTier.BUDGET,
                    self.providers[p].cost_per_1m_input,
                ),
            )

        for name in preferred:
            if self.is_provider_available(name):
                return self.providers[name]

        for name, info in self.providers.items():
            if info.tier == ProviderTier.BUDGET and self.is_provider_available(name):
                logger.warning("Falling back to budget provider %s", name)
                return info
        return None

    def record_usage(self, provider_name: str, tokens: int) -> None:
        self._token_usage[provider_name] = (
            self._token_usage.get(provider_name, 0) + tokens)

    def get_usage_summary(self) -> dict:
        total = sum(self._token_usage.values())
        return {
            "per_provider": dict(self._token_usage),
            "total_tokens": total,
        }

    def get_failover_chain(self, preferred: List[str]) -> List[str]:
        chain = []
        for name in preferred:
            if self.is_provider_available(name):
                chain.append(name)
        for name in self.providers:
            if name not in chain and self.is_provider_available(name):
                chain.append(name)
        return chain

    def get_metrics(self) -> dict:
        metrics = {"providers": {}, "aggregate": self.get_usage_summary()}
        for name, info in self.providers.items():
            cb_metrics = {}
            if info.circuit_breaker and hasattr(info.circuit_breaker, "get_metrics"):
                cb_metrics = info.circuit_breaker.get_metrics()
            rl_metrics = {}
            if info.rate_limiter and hasattr(info.rate_limiter, "get_metrics"):
                rl_metrics = info.rate_limiter.get_metrics()
            metrics["providers"][name] = {
                "tier": info.tier.value,
                "available": self.is_provider_available(name),
                "cost_per_1m": {"input": info.cost_per_1m_input,
                                "output": info.cost_per_1m_output},
                "circuit_breaker": cb_metrics,
                "rate_limiter": rl_metrics,
                "tokens_used": self._token_usage.get(name, 0),
            }
        return metrics


# ---- Usage example ----
if __name__ == "__main__":
    manager = MultiProviderResilienceManager()

    class FakeCB:
        def __init__(self, name):
            self.name = name
            self.state = type("S", (), {"value": "closed"})()
        def get_metrics(self):
            return {"state": self.state.value}

    class FakeRL:
        def __init__(self):
            self._aggregate_cap = 2_000_000
        def get_metrics(self):
            return {"aggregate": {"usage_pct": 35.0}}

    for name in PROVIDER_REGISTRY:
        manager.attach_circuit_breaker(name, FakeCB(name))
        manager.attach_rate_limiter(name, FakeRL())

    # Simulate OpenAI circuit open
    manager.providers["openai-gpt55"].circuit_breaker.state.value = "open"

    preferred = ["openai-gpt55", "anthropic-haiku", "deepseek-chat"]
    selected = manager.select_provider(preferred)
    if selected:
        print(f"Selected: {selected.name} (tier={selected.tier.value})")
    else:
        print("No provider available!")

    chain = manager.get_failover_chain(preferred)
    print(f"Failover chain: {chain}")

    manager.record_usage("anthropic-haiku", 5000)
    manager.record_usage("deepseek-chat", 15000)
    print("Usage summary:", manager.get_usage_summary())

Design notes:

8. FAQ + Next Steps

Frequently Asked Questions

1. What is the difference between a circuit breaker and a retry, and when should I use each?

Retry and circuit breaker solve two different classes of problems. Retries are for transient faults—network jitter, temporary connection timeouts, occasional 503s—failures that may self-resolve within seconds, where trying again is likely to succeed. Circuit breakers are for persistent faults—the downstream service is confirmed down, an endpoint has failed 5 times in 30 seconds—where retries only waste time and tokens while adding load to an already-crashed service. In deployment, they are nested: retries execute inside the circuit breaker. When the breaker is Closed, retry logic runs normally; when the breaker is Open, before_call() raises immediately and retries never fire—exactly the behavior we want.

2. What is the core difference between token-aware rate limiting and traditional request-based rate limiting?

The core difference is the unit of consumption. Traditional rate limiters consume "requests"—N requests per minute, each consuming 1. This assumes each request costs roughly the same—an assumption that is false in the LLM world. A 50-token classification request and an 80,000-token document analysis request look identical to a traditional limiter—both consume 1 request slot. But provider billing and rate limiting is token-based (TPM), not request-based. Token-aware rate limiting changes the consumption unit: each call consumes not 1 but the estimated token count for that call. Additionally, post-call calibration from response headers is essential—because clients can never precisely estimate tokens (especially output tokens), they must rely on the actual counts the provider returns in its response to keep the client bucket accurate.

3. Bulkhead isolation and rate limiting seem similar—how do I distinguish them?

They are similar on the "counting and limiting" dimension but fundamentally different: Rate Limiting is flow control—it decides "should this request pass through?" based on rate and time windows. Rejected requests can usually be retried later. Bulkhead is resource isolation—it allocates independent pools from physical resources (threads, memory, file handles) and ensures exhausting one pool does not affect others. The bulkhead's user-level token budget looks like rate limiting, but its motivation is not "this user is calling too fast"—it is "if this user drains the global token quota, will other users be affected?" Bulkheads partition by isolation domain (users, tools, sessions); Rate Limiters partition by rate and provider. In the architecture: the Rate Limiter sits at the call entry; the Bulkhead sits at the resource allocation layer.

4. Will graceful degradation make users perceive quality degradation?

Yes, but perceiving quality degradation beats perceiving service unavailability. This is the core philosophy of degradation design—"better than nothing." The key is managing user perception: the quality_estimate field in degradation metadata lets the frontend adjust the UI based on degradation level—for example, a partial result can show a yellow info banner "Some information temporarily unavailable," and a fallback result can display "Using backup model, results may differ slightly." More importantly, degraded results often exceed user expectations—a cached answer from DeepSeek (quality 0.7) may be perfectly sufficient for "Where is my order?" and the user may never notice it is incomplete. What truly frustrates users is complete unresponsiveness—degradation prevents exactly that worst case.

5. Do I need to implement all four patterns, or can I adopt them incrementally?

You can and should adopt them incrementally. The recommended adoption order is: Circuit Breaker → Token-Aware Rate Limiter → Graceful Degradation → Bulkhead Isolation. Why this order? The circuit breaker solves the most immediate problem—preventing agents from burning tokens on dead dependencies (one code change, instantly visible benefit). The rate limiter is second priority—preventing 429 errors and quota exhaustion (since 429s are among the most common production agent failures). The degradation engine becomes especially valuable once circuit breakers and rate limiters are in place—they provide clear trigger signals for degradation. Bulkhead comes last—it solves the "shared resources causing cascading failures" problem, which typically only becomes a visible pain point after the agent is deployed to multi-user environments. Each pattern can be deployed independently and produce independent benefits—you do not need everything in place to start seeing results.

6. Will a multi-provider resilience architecture add too much complexity?

If you only use one LLM provider and your traffic is modest, multi-provider resilience may indeed be over-engineering. But several signals indicate you need it: (1) your agent has already experienced at least one production outage due to a provider going down; (2) your monthly LLM API bill exceeds $500 and a single provider accounts for 90%+; (3) you have cost-sensitive scenarios (e.g., customer support agent handling 10,000+ daily calls) where provider price differences directly impact margins. For most teams, a practical starting point is "OpenAI/Claude as primary + DeepSeek as fallback"—managing resilience for just 2 providers keeps complexity manageable while covering the vast majority of failure scenarios. Complexity is real—but measured against the revenue impact of a single provider outage, the implementation cost of multi-provider resilience (estimated 3–5 days of engineering effort) is typically a very worthwhile investment.