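Agent Cost Observability: How to Track Token, Tool-Call, and Failed-Retry Costs
30-Second Summary
- The core problem: once an agent runs in production, the API bill keeps climbing but nobody can say how much a given task cost, whether the money went to tokens, tool calls, or retries, or which user consumed the most. Without cost observability you are burning money blind.
- Four tracking dimensions: token cost (exact pricing per provider and model) → tool-call cost (LLM function-call token overhead plus third-party API fees) → retry waste (15-30% of total cost in an illustrative scenario, and the hardest loss to spot) → per-user and per-tenant attribution (who is spending the money in a multi-tenant system).
- Key design decision: cost tracking must match the granularity of the agent loop. Every LLM call, every tool execution, and every retry is its own cost record. With the Provider Pricing Registry pattern, a future model price change means editing exactly one table.
- What you can do after reading: build a complete cost observability layer for your agent system, from token pricing to a tool cost registry, from quantifying retry waste to per-user attribution, from hard budget stops to cost-aware model routing, and finally a unified cost dashboard built on OpenTelemetry and Prometheus.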
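1. Why Agent Cost Observability Is Different
In June 2026, a production code-review agent processes 800 PRs a day. Its monthly bill is $4,200. The team knows that number but cannot answer three questions: how much of the $4,200 paid for reviews that delivered value, and how much was swallowed by retries? How is the spend distributed across customers? If traffic doubles next month, is the budget $8,000 or $12,000?
These are not "the dashboard isn't pretty enough" problems. They are problems of missing cost visibility. The three pillars of traditional observability (metrics, traces, logs) tell you about latency and error rates, but not where the dollars go. Cost is the systematically neglected fourth dimension of observability.
Why Traditional Cost Tracking Does Not Work for Agents
If you only run single LLM calls ("generate a summary", "translate a document"), cost tracking is simple: one cost record per request, summed at month end. An agent is entirely different. It is a multi-step loop: think → call a tool → observe the result → think again → call another tool → ... → final output. A single task may trigger 5-30 LLM calls, 3-15 tool calls, and 0-5 retries.
So an agent's cost is not "how much did one request cost" but a multi-dimensional cost vector:
- Token cost: the input/output token spend of each LLM call, which varies sharply by provider and model. The same prompt might cost $0.0003 on DeepSeek-V4-Flash and $0.015 on GPT-5.5 (a 50x difference).
- Tool-call cost: the agent calls search APIs, database queries, and code-execution containers. Each has its own pricing model, unrelated to the LLM provider.
- Retry cost: the agent called the wrong tool, hit a timeout, or hallucinated. These failed steps consumed tokens and tool fees without producing any value.
- Attribution cost: in a multi-tenant SaaS, one agent pipeline serves requests from 200 customers, and each customer's cost has to be tracked and billed separately.
Agent cost observability complements traditional observability rather than replacing it: latency tells you which step was slow, cost tells you which step was expensive, and only by crossing the two can you answer whether the money was well spent. For the full agent observability framework (the metrics/traces/logs triad), see Agent Observability. Cost is the fourth dimension of that framework: its data still comes from traces and metrics, but its output is dollars rather than milliseconds.
The Cost Structure of an Agent Loop: A Concrete Example
Below is the cost breakdown of a single task for a typical code-review agent (using GPT-5.4):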
| Step | Operation | Input Tokens | Output Tokens | Token Cost | Tool Cost | Cumulative |
|---|---|---|---|---|---|---|
| 1 | LLM reasoning (analyze PR diff) | 8,200 | 450 | $0.0273 | n/a | $0.0273 |
| 2 | Tool call: git blame (find author) | 10,500 | 180 | $0.0290 | $0.0001 | $0.0564 |
| 3 | LLM reasoning (analyze blame result) | 11,200 | 320 | $0.0328 | n/a | $0.0892 |
| 4 | Tool call: search related issues | 12,800 | 200 | $0.0350 | $0.003 | $0.1272 |
| 5 | Retry (Step 4 timed out) | 12,800 | 200 | $0.0350 | $0.003 | $0.1652 |
| 6 | LLM reasoning (synthesize findings) | 15,000 | 500 | $0.0450 | n/a | $0.2102 |
| 7 | LLM writes the review comments | 15,800 | 1,200 | $0.0575 | n/a | $0.2677 |
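Key finding: in a $0.27 task, $0.038 (14.2%) went to the retry in Step 5. The retry succeeded, so everything "looks fine", but at 800 tasks a day that 14.2% is about $30 of waste per day, or roughly $900 per month. Without step-level cost tracking you would only see "each task costs about $0.27" and never learn that this money exists.
Core insight: agent cost tracking has to reach step-level granularity. Treating the whole task as one cost unit hides three questions: which steps burn the most money, what share goes to retry waste, and whether tool calls cost more than LLM reasoning. None of the three can be answered without step-level data, and cost optimization without those answers is guesswork.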
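To make the step-level arithmetic reproducible, here is a minimal sketch that recomputes the table from its token counts. It assumes the GPT-5.4 prices quoted in this article ($2.50 input, $15.00 output per 1M tokens); the `STEPS` list and the `step_cost` helper are illustrative names, not part of any library.

```python
from decimal import Decimal

# GPT-5.4 prices in USD per 1M tokens, as quoted in this article.
INPUT_PRICE = Decimal("2.50")
OUTPUT_PRICE = Decimal("15.00")
MTOK = Decimal(1_000_000)

# (step, input_tokens, output_tokens, tool_cost_usd, is_retry), copied from the table.
STEPS = [
    (1, 8_200, 450, Decimal("0"), False),
    (2, 10_500, 180, Decimal("0.0001"), False),
    (3, 11_200, 320, Decimal("0"), False),
    (4, 12_800, 200, Decimal("0.003"), False),
    (5, 12_800, 200, Decimal("0.003"), True),
    (6, 15_000, 500, Decimal("0"), False),
    (7, 15_800, 1_200, Decimal("0"), False),
]

def step_cost(input_tokens, output_tokens, tool_cost):
    """Token cost plus tool cost for a single step, in USD."""
    token_cost = (input_tokens * INPUT_PRICE + output_tokens * OUTPUT_PRICE) / MTOK
    return token_cost + tool_cost

total = sum(step_cost(i, o, t) for _, i, o, t, _ in STEPS)
retry = sum(step_cost(i, o, t) for _, i, o, t, is_retry in STEPS if is_retry)
retry_share = retry / total

print(f"total=${total:.4f} retry=${retry:.4f} share={retry_share:.1%}")
```

Summed without per-row rounding the total comes to $0.2676, one ten-thousandth below the table's running total, which rounds each row to four decimals first. The retry share is 14.2% either way.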
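A Maturity Model for Cost Observability
You do not have to build every dimension at once. Start from a minimum viable version and add dimensions over time. The table below defines four maturity levels:
| Level | What is tracked | Questions it can answer | Engineering effort |
|---|---|---|---|
| L1: Token counting | Input/output tokens of every LLM call | "How many tokens did I use this month?" | Read the usage field from the API response; about 10 lines of code |
| L2: Cost pricing | L1 + Provider Pricing Registry → dollar amounts | "Which is cheaper, GPT-5.4 or DeepSeek-V4?" | Maintain a model pricing table plus a cost calculator function |
| L3: Multi-dimensional attribution | L2 + tool-call cost + retry waste + per-user allocation | "How much did customer A spend last month? How much was retry waste?" | CostRecord data model + trace context propagation + SQL aggregation |
| L4: Closed-loop cost control | L3 + budget alerts + cost-aware routing + automatic cost reduction | "The budget is nearly exhausted; switch to a cheaper model automatically" | BudgetController + CostAwareRouter + Prometheus alerts |
This article covers L2-L4 and assumes you already have L1 (if you do not, the code in Section 2 gets you there in one step).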
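For readers still at level L1, the sketch below shows roughly what "about 10 lines" of token counting could look like. It assumes responses have already been parsed into dicts; `token_totals` and `record_usage` are hypothetical names, and the two key styles it accepts are the ones this article mentions (`prompt_tokens`/`completion_tokens` and `input_tokens`/`output_tokens`).

```python
from collections import Counter

token_totals = Counter()  # (model, direction) -> running token count

def record_usage(model, usage):
    """Accumulate token counts from one response's usage dict."""
    input_tokens = usage.get("prompt_tokens", usage.get("input_tokens", 0))
    output_tokens = usage.get("completion_tokens", usage.get("output_tokens", 0))
    token_totals[(model, "input")] += input_tokens
    token_totals[(model, "output")] += output_tokens

# Two calls on one model and one on another (token counts are made up).
record_usage("gpt-5.4", {"prompt_tokens": 8200, "completion_tokens": 450})
record_usage("claude-sonnet-4-6", {"input_tokens": 1000, "output_tokens": 200})
record_usage("gpt-5.4", {"prompt_tokens": 10500, "completion_tokens": 180})

print(token_totals[("gpt-5.4", "input")])  # 18700
```

This answers "how many tokens did I use" but nothing about dollars. Pricing is the job of the registry in Section 2.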
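2. Cross-Provider Token Cost Tracking
Token cost is where cost observability starts. If you do not know how many tokens each call consumed, tool cost, retry analysis, and budget control are all out of reach. The goal of this section is a single interface for tracking token cost across OpenAI, Anthropic, DeepSeek, and any other provider.
Provider Pricing Comparison (June 2026)
Before choosing a model, look at the numbers. Here are the per-million-token prices of the current mainstream models: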
| Provider | Model | Input ($/MTok) | Output ($/MTok) | Cache Read ($/MTok) | Batch Discount | Notes |
|---|---|---|---|---|---|---|
| OpenAI | GPT-5.5 | $5.00 | $30.00 | $0.50 | 50% | Strongest reasoning, most expensive |
| OpenAI | GPT-5.4 | $2.50 | $15.00 | $0.25 | 50% | Production workhorse |
| OpenAI | GPT-5.4-mini | $0.75 | $4.50 | $0.075 | 50% | Simple routing |
| OpenAI | GPT-5.4-nano | $0.20 | $1.25 | $0.02 | 50% | Cheapest GPT (account-specific pricing) |
| Anthropic | Claude Opus 4.8 | $5.00 | $25.00 | $0.50 | 50% | Complex agents |
| Anthropic | Claude Sonnet 4.6 | $3.00 | $15.00 | $0.30 | 50% | Recommended production model |
| Anthropic | Claude Haiku 4.5 | $1.00 | $5.00 | $0.10 | 50% | Lightweight tasks |
| DeepSeek | DeepSeek-V4-Flash | $0.14 | $0.28 | $0.0028 | none | Best price/performance |
| DeepSeek | DeepSeek-V4-Pro | $0.435 | $0.87 | $0.003625 | none | DeepSeek flagship |
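Key takeaways from the table:
- The price gap is huge: GPT-5.5's input price is 35.7x that of DeepSeek-V4-Flash, and its output price is 107x. For a 10K-token prompt, GPT-5.5 charges $0.05 and DeepSeek-V4-Flash charges $0.0014.
- Prompt caching is powerful: the cache-read price is 1/10 of the regular input price for OpenAI and Anthropic, and 1/50 for DeepSeek-V4-Flash (1/120 for DeepSeek-V4-Pro). If your agent has a fixed system prompt (most do), prompt caching can cut input cost by more than half once the cached prefix makes up most of each request.
- Batch API 50% discount: work that does not need real-time results (offline evaluation, batch analysis) can go through the OpenAI or Anthropic batch API at half price. It is the most commonly overlooked way to cut cost.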
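How much prompt caching saves depends on what fraction of each request is served from cache. The sketch below works that out for GPT-5.4 using the prices in the table; `blended_input_price` is an illustrative helper, and the cached fractions are assumed values rather than measurements.

```python
from decimal import Decimal

def blended_input_price(base_price, cache_read_price, cached_fraction):
    """Effective USD per 1M input tokens when part of the prompt is read from cache."""
    f = Decimal(str(cached_fraction))
    return (1 - f) * base_price + f * cache_read_price

# GPT-5.4 figures from the table: $2.50 input, $0.25 cache read.
base, cached = Decimal("2.50"), Decimal("0.25")

for fraction in (0.3, 0.6, 0.9):
    price = blended_input_price(base, cached, fraction)
    saving = 1 - price / base
    print(f"cached={fraction:.0%} effective=${price:.3f}/MTok saving={saving:.0%}")
```

With a cache-read price of 1/10 the base price, the saving is 0.9 times the cached fraction: 27% at 30% cached, 54% at 60%, and 81% at 90%. "More than half" therefore needs a bit over 55% of input tokens to hit the cache.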
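The Provider Pricing Registry Pattern
Providers do not agree on how they report usage. OpenAI returns usage.prompt_tokens, Anthropic returns usage.input_tokens, and DeepSeek returns usage.prompt_tokens in an OpenAI-compatible shape but reports cache hits under its own key. Worse, prompt-caching counts sit at a different field path in each provider's usage response.
The solution is a Provider Pricing Registry: a centralized price table plus a standardized cost calculator. Its core design principle is that when a vendor changes prices you edit a single dictionary, and every cost calculation from then on picks up the new price.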
from __future__ import annotations
from dataclasses import dataclass
from decimal import Decimal
from typing import Dict, Tuple, Optional
from enum import Enum

# ---------------------------------------------------------------------------
# Provider Pricing Registry: single source of truth for all model costs.
# Updated June 2026 with current pricing from OpenAI, Anthropic, DeepSeek.
#
# Price constants use Decimal because binary floats cannot represent most
# dollar amounts exactly. One DeepSeek cache-hit token costs $0.0000000028;
# a float stores only an approximation of that value, and the error
# accumulates when millions of records are summed. Decimal keeps the exact value.
# ---------------------------------------------------------------------------
class CostCategory(str, Enum):
    TOKEN_INPUT = "token_input"
    TOKEN_OUTPUT = "token_output"
    TOKEN_CACHE_READ = "token_cache_read"
    TOOL_CALL = "tool_call"
    RETRY_WASTE = "retry_waste"
# Prices in USD per 1M tokens, built as Decimal from string literals (2026-06)
MODEL_PRICING: Dict[Tuple[str, str], Dict[str, Decimal]] = {
    # OpenAI
    ("openai", "gpt-5.5"): {"input": Decimal("5.00"), "output": Decimal("30.00"), "cache_read": Decimal("0.50")},
    ("openai", "gpt-5.4"): {"input": Decimal("2.50"), "output": Decimal("15.00"), "cache_read": Decimal("0.25")},
    ("openai", "gpt-5.4-mini"): {"input": Decimal("0.75"), "output": Decimal("4.50"), "cache_read": Decimal("0.075")},
    ("openai", "gpt-5.4-nano"): {"input": Decimal("0.20"), "output": Decimal("1.25"), "cache_read": Decimal("0.02")},  # varies by account
    # Anthropic
    ("anthropic", "claude-opus-4-8"): {"input": Decimal("5.00"), "output": Decimal("25.00"), "cache_read": Decimal("0.50")},
    ("anthropic", "claude-sonnet-4-6"): {"input": Decimal("3.00"), "output": Decimal("15.00"), "cache_read": Decimal("0.30")},
    ("anthropic", "claude-haiku-4-5"): {"input": Decimal("1.00"), "output": Decimal("5.00"), "cache_read": Decimal("0.10")},
    # DeepSeek: roughly 11-36x cheaper than GPT-5.5 on input (5.00/0.435 to 5.00/0.14)
    ("deepseek", "deepseek-v4-flash"): {"input": Decimal("0.14"), "output": Decimal("0.28"), "cache_read": Decimal("0.0028")},
    ("deepseek", "deepseek-v4-pro"): {"input": Decimal("0.435"), "output": Decimal("0.87"), "cache_read": Decimal("0.003625")},
}

# Batch processing gives 50% discount for OpenAI and Anthropic
BATCH_DISCOUNT = Decimal("0.5")
_ONE_MILLION = Decimal("1_000_000")
def _cost_to_display(c: Decimal) -> str:
    """Format a Decimal cost for display. No rounding until display time."""
    return f"${float(c):.8f}"
@dataclass
class TokenCost:
    """Result of a token cost calculation.

    Token counts stay integers (they are counts, not money). All cost fields
    use Decimal to avoid binary floating-point error on tiny dollar amounts.
    """
    provider: str
    model: str
    input_tokens: int
    output_tokens: int
    cache_read_tokens: int
    input_cost_usd: Decimal
    output_cost_usd: Decimal
    total_cost_usd: Decimal

    def __repr__(self) -> str:
        return (f"TokenCost({self.provider}/{self.model}: "
                f"in={self.input_tokens} out={self.output_tokens} cache={self.cache_read_tokens} "
                f"→ {_cost_to_display(self.total_cost_usd)})")
def calculate_token_cost(
    provider: str,
    model: str,
    input_tokens: int,
    output_tokens: int,
    cache_read_tokens: int = 0,
    *,
    is_batch: bool = False,
) -> TokenCost:
    """
    Compute the dollar cost of a call from its token usage, using the central price registry.

    All money arithmetic uses Decimal: aggregation never rounds, and formatting
    happens only at display time. Token counts stay plain ints (they are counts,
    not currency). input_tokens is expected to INCLUDE cache_read_tokens.
    """
    prices = MODEL_PRICING.get((provider.lower(), model.lower()))
    if prices is None:
        # Unknown model: record zero cost rather than failing the agent task.
        return TokenCost(
            provider=provider, model=model,
            input_tokens=input_tokens, output_tokens=output_tokens,
            cache_read_tokens=cache_read_tokens,
            input_cost_usd=Decimal("0"), output_cost_usd=Decimal("0"),
            total_cost_usd=Decimal("0"),
        )
    # Cached tokens can never exceed the total input; price them at the cache rate
    # and the remainder at the regular input rate.
    cache_read = min(cache_read_tokens, input_tokens)
    uncached_input = max(0, input_tokens - cache_read)
    input_cost = (
        uncached_input * prices["input"] + cache_read * prices["cache_read"]
    ) / _ONE_MILLION
    output_cost = output_tokens * prices["output"] / _ONE_MILLION
    # The caller decides whether batch pricing applies; this function does not
    # check that the provider actually offers a batch discount.
    discount = BATCH_DISCOUNT if is_batch else Decimal("1")
    total = (input_cost + output_cost) * discount
    return TokenCost(
        provider=provider, model=model,
        input_tokens=input_tokens, output_tokens=output_tokens,
        cache_read_tokens=cache_read,
        input_cost_usd=input_cost * discount,
        output_cost_usd=output_cost * discount,
        total_cost_usd=total,
    )
# ---------------------------------------------------------------------------
# Usage: the calculator works with all providers through a single interface
# ---------------------------------------------------------------------------
# Example: GPT-5.4 call with 8000 input, 2000 output, 3000 cache hits
cost_gpt = calculate_token_cost("openai", "gpt-5.4", 8000, 2000, 3000)
print(cost_gpt)
# TokenCost(openai/gpt-5.4: in=8000 out=2000 cache=3000 → $0.04325000)

# Example: same tokens on DeepSeek-V4-Flash, about 34x cheaper
cost_ds = calculate_token_cost("deepseek", "deepseek-v4-flash", 8000, 2000, 3000)
print(cost_ds)
# TokenCost(deepseek/deepseek-v4-flash: in=8000 out=2000 cache=3000 → $0.00126840)

# Batch mode: additional 50% off for OpenAI/Anthropic
cost_gpt_batch = calculate_token_cost("openai", "gpt-5.4", 8000, 2000, 3000, is_batch=True)
print(f"Batch: {_cost_to_display(cost_gpt_batch.total_cost_usd)}")
# Batch: $0.02162500
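A few design choices in this registry are worth calling out:
- The lookup key is a (provider, model) tuple: a short model name such as "mini" or "flash" is not guaranteed to be unique across vendors, and the tuple key removes the ambiguity.
- How cache_read_tokens is handled: the cached tokens are subtracted first and the remainder is priced at the regular rate, so nothing is double-counted. This assumes input_tokens includes the cached tokens, so any provider that reports cached tokens separately from its input count has to be normalized to that convention before the function is called. Anthropic and OpenAI put cache reads at different field paths in the usage response, but after parsing they all feed the same parameter.
- Unknown model → zero cost: this is intentional, and in production you should emit a warning whenever it happens. Silently recording zero is better than raising, because a bug in cost tracking must not fail the agent task (cost tracking is a non-functional requirement and should never block the functional path).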
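The "zero cost plus a warning" behavior can be sketched as follows. This is a simplified stand-in rather than the calculator above: the two-entry `INPUT_PRICES` table, the `unpriced_models` set, and `input_cost_or_zero` are hypothetical names introduced for illustration.

```python
import logging
from decimal import Decimal

logger = logging.getLogger("cost_tracking")

# Hypothetical two-entry price table (USD per 1M input tokens), for illustration only.
INPUT_PRICES = {
    ("openai", "gpt-5.4"): Decimal("2.50"),
    ("deepseek", "deepseek-v4-flash"): Decimal("0.14"),
}
unpriced_models = set()  # models seen without a price, e.g. for an alert or dashboard

def input_cost_or_zero(provider, model, input_tokens):
    """Return the input cost, or zero plus a one-time warning if the model has no price."""
    key = (provider.lower(), model.lower())
    price = INPUT_PRICES.get(key)
    if price is None:
        if key not in unpriced_models:  # warn once per model to avoid log spam
            logger.warning("no price registered for %s/%s; recording $0", *key)
            unpriced_models.add(key)
        return Decimal("0")
    return input_tokens * price / Decimal(1_000_000)

print(input_cost_or_zero("openai", "gpt-5.4", 10_000))
print(input_cost_or_zero("acme", "experimental-1", 10_000))
```

The task keeps running in both cases. The difference from a silent zero is that the gap in the price table becomes visible, so the under-reported spend can be corrected later.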
Parsing Each Provider's Usage Response
The calculator above assumes you already have a uniform (provider, model, input_tokens, output_tokens, cache_read_tokens) tuple. Each provider's API response is shaped differently, so a thin adapter layer is needed:
def extract_usage_openai(response: dict) -> tuple[int, int, int]:
    """Extract (input_tokens, output_tokens, cache_read_tokens) from an OpenAI Chat Completions response."""
    usage = response.get("usage", {})
    # Chat Completions reports cache hits under usage.prompt_tokens_details.cached_tokens;
    # prompt_tokens already includes those cached tokens.
    details = usage.get("prompt_tokens_details") or {}
    return (
        usage.get("prompt_tokens", 0),
        usage.get("completion_tokens", 0),
        details.get("cached_tokens", 0),
    )
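def extract_usage_anthropic(response: dict) -> tuple[int, int, int]:
    """Extract (input_tokens, output_tokens, cache_read_tokens) from Anthropic response."""
    usage = response.get("usage", {})
    cache_read = usage.get("cache_read_input_tokens", 0) or 0
    # Anthropic reports input_tokens EXCLUDING cached tokens, so add the cache
    # reads back to match the calculator's "input includes cache" convention.
    # Cache-write tokens (cache_creation_input_tokens) are billed at their own
    # rate and are not modeled here.
    return (
        usage.get("input_tokens", 0) + cache_read,
        usage.get("output_tokens", 0),
        cache_read,
    )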
def extract_usage_deepseek(response: dict) -> tuple[int, int, int]:
    """Extract (input_tokens, output_tokens, cache_read_tokens) from DeepSeek response."""
    usage = response.get("usage", {})
    # DeepSeek follows OpenAI-compatible format
    return (
        usage.get("prompt_tokens", 0),
        usage.get("completion_tokens", 0),
        usage.get("prompt_cache_hit_tokens", 0),
    )
# Provider-specific extractors registry
USAGE_EXTRACTORS = {
    "openai": extract_usage_openai,
    "anthropic": extract_usage_anthropic,
    "deepseek": extract_usage_deepseek,
}
def track_llm_call(
    provider: str, model: str, response: dict, *, is_batch: bool = False
) -> TokenCost:
    """Unified entry point: extract usage from any provider response and calculate cost."""
    extractor = USAGE_EXTRACTORS.get(provider.lower())
    if extractor is None:
        raise ValueError(f"Unknown provider: {provider}")
    in_tok, out_tok, cache_tok = extractor(response)
    return calculate_token_cost(provider, model, in_tok, out_tok, cache_tok, is_batch=is_batch)
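For how prompt caching and context compression reduce token cost further, see Agent Context Window Management. The two sit upstream and downstream of each other: context management reduces the number of tokens consumed, and the price calculator turns the remaining tokens into dollars.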
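3. Tool-Call Cost Accounting
Many teams track token cost and consider cost observability done. It is not. Tool calls carry three layers of cost, and the first layer (the token overhead of function definitions) is overlooked by most teams.
The Three-Layer Cost Model for Tool Calls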
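| Cost layer | Source | Typical magnitude | Billed by |
|---|---|---|---|
| L1: Function definition tokens | Tool schemas occupy tokens in the system prompt of every LLM request | 290-806 tokens per tool (including parameter descriptions) | LLM provider |
| L2: Function call + result tokens | The LLM generates a tool_use block, and the tool result is fed back into the context | 200-2000 tokens per call | LLM provider |
| L3: External API / infrastructure | Search APIs charge per call, database queries hold connections, containers consume CPU | $0.001-0.05 per call | Third-party vendor / cloud platform |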
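L1 is the biggest blind spot. Suppose your agent defines 8 tools and each tool's JSON Schema plus description averages 450 tokens. In the agent loop these definitions are sent with every LLM request, in the system prompt or in the request's tools parameter (as with Anthropic). If the agent makes 12 LLM calls in one task, 8 tools × 450 tokens × 12 calls = 43,200 tokens go to tool definitions. These tokens produce no reasoning value; they are fixed overhead that exists only so the LLM knows what it can do. On GPT-5.4, those 43,200 input tokens cost $0.108.
The key engineering decision: the granularity of your tool definitions affects cost. Eight separate tools and one "multi-purpose tool" (which selects the operation through a parameter) can be functionally equivalent, but if the merged schema is about the size of one tool, the eight-tool design sends 3,150 more tokens per request (the schemas of 7 extra tools). At 10,000 LLM calls a day on GPT-5.4, those extra definitions burn $0.007875 × 10,000 = $78.75 per day (about $2,362.50 per month). Before adding a new tool, ask whether it must be described in every request or can act as a fallback that is added only when needed. See Agent Tool Design Best Practices for a detailed discussion of tool-definition optimization.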
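Both overhead figures follow from the same formula: schema tokens per request, times the number of requests, times the input price. A minimal sketch, assuming GPT-5.4's $2.50 per 1M input tokens from the pricing table; `definition_overhead_usd` is an illustrative helper name.

```python
from decimal import Decimal

def definition_overhead_usd(tokens_per_request, llm_calls, input_price_per_mtok):
    """Cost of resending tool schemas: tokens per request x requests x input price."""
    return Decimal(tokens_per_request) * llm_calls * input_price_per_mtok / Decimal(1_000_000)

GPT_5_4_INPUT = Decimal("2.50")  # USD per 1M input tokens, from the pricing table

# 8 tools x 450 tokens, resent on each of the 12 LLM calls in one task.
per_task = definition_overhead_usd(8 * 450, 12, GPT_5_4_INPUT)

# 7 extra schemas x 450 tokens, resent on 10,000 LLM calls per day.
extra_per_day = definition_overhead_usd(7 * 450, 10_000, GPT_5_4_INPUT)
extra_per_month = extra_per_day * 30

print(per_task, extra_per_day, extra_per_month)
```

The results are $0.108 per task, $78.75 per day, and $2,362.50 per 30-day month. The calculation ignores prompt caching; if the tool definitions sit in a cached prefix, the cache-read price applies to them instead.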
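The Tool Cost Registry Pattern
Like the Provider Pricing Registry, tool-call cost needs a centralized registry. Tool pricing is more heterogeneous than token pricing, though: some tools bill per call, some bill by time, and some are free.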
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from typing import Dict, Optional, List

@dataclass
class ToolCostRecord:
    """A cost record for a single tool invocation.

    All monetary fields use Decimal for exact sub-cent precision.
    Token counts remain int (counts, not money).
    """
    tool_name: str
    call_duration_s: float
    llm_tokens_consumed: int          # L1 + L2 token overhead
    llm_cost_usd: Decimal             # dollar cost of those tokens
    external_api_cost_usd: Decimal    # L3: third-party API fee
    infrastructure_cost_usd: Decimal  # L3: compute/runtime cost
    total_cost_usd: Decimal = Decimal("0")
    metadata: Dict = field(default_factory=dict)
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())

    def __post_init__(self):
        # The total is always derived from the three components.
        self.total_cost_usd = self.llm_cost_usd + self.external_api_cost_usd + self.infrastructure_cost_usd
# Tool Cost Registry: pricing models for every tool in the agent's arsenal
# Key design: each tool declares its pricing model type (per_call, per_second, free)
# All monetary amounts use Decimal from string literals to avoid binary float
# precision loss on sub-cent amounts.
TOOL_COST_REGISTRY: Dict[str, Dict] = {
    # --- Third-party API tools ---
    "web_search_serpapi": {
        "model": "per_call",
        "cost_per_call": Decimal("0.01"),
        "unit": "api_call",
    },
    "web_search_bing": {
        "model": "per_call",
        "cost_per_call": Decimal("0.003"),
        "unit": "api_call",
    },
    "web_search_google_custom": {
        "model": "per_call",
        "cost_per_call": Decimal("0.005"),
        "unit": "api_call",
    },
    # --- Infrastructure tools ---
    "db_query_postgres": {
        "model": "per_call",
        "cost_per_call": Decimal("0.0001"),
        "unit": "query",
    },
    "code_execution_docker": {
        "model": "per_second",
        "cost_per_second": Decimal("0.000014"),  # ~$0.05/hour container runtime
        "unit": "second",
    },
    # --- Free tools (local operations) ---
    "file_read": {"model": "free", "cost_per_call": Decimal("0"), "unit": "free"},
    "file_write": {"model": "free", "cost_per_call": Decimal("0"), "unit": "free"},
    "bash_exec": {"model": "free", "cost_per_call": Decimal("0"), "unit": "free"},
    "regex_search": {"model": "free", "cost_per_call": Decimal("0"), "unit": "free"},
}
# Canonical tool definition sizes (tokens consumed per LLM call just for the schema)
# These are measured with tiktoken/gpt-4o encoding; your numbers may vary by encoding.
TOOL_DEFINITION_TOKENS: Dict[str, int] = {
    "web_search_serpapi": 520,
    "web_search_bing": 480,
    "db_query_postgres": 350,
    "code_execution_docker": 620,
    "file_read": 290,
    "file_write": 320,
    "bash_exec": 450,
    "regex_search": 380,
    # Default for unlisted tools
    "__default__": 400,
}
def calculate_tool_cost(
    tool_name: str,
    call_duration_s: float = 0.0,
    num_llm_calls_in_task: int = 1,
    llm_input_price_per_mtok: Decimal = Decimal("0.75"),  # default to GPT-5.4-mini input price
) -> ToolCostRecord:
    """
    Calculate the full cost of a tool invocation.

    Three cost sources:
    1. LLM token overhead: the tool's JSON schema definition consumed tokens in
       every LLM call of this task (L1), plus the tool_use block + result (L2).
       L2 varies per invocation, so it's passed in via a running tracker.
    2. External API cost: from the Tool Cost Registry (L3).
    3. Infrastructure cost: container runtime, DB connections, etc. (L3).

    Args:
        tool_name: Name of the tool (must be in TOOL_COST_REGISTRY).
        call_duration_s: How long the tool execution took.
        num_llm_calls_in_task: Number of LLM calls that included this tool's definition.
        llm_input_price_per_mtok: Price per 1M input tokens for the current model.
    """
    pricing = TOOL_COST_REGISTRY.get(tool_name)
    if pricing is None:
        pricing = {"model": "per_call", "cost_per_call": Decimal("0")}  # unknown = free
    # L3: External API / infrastructure cost
    if pricing["model"] == "per_call":
        external_cost = pricing.get("cost_per_call", Decimal("0"))
        infra_cost = Decimal("0")
    elif pricing["model"] == "per_second":
        external_cost = Decimal("0")
        infra_cost = pricing.get("cost_per_second", Decimal("0")) * Decimal(str(call_duration_s))
    else:  # free
        external_cost = Decimal("0")
        infra_cost = Decimal("0")
    # L1: Tool definition token overhead (charged on EVERY LLM call in the task)
    tool_def_tokens = TOOL_DEFINITION_TOKENS.get(
        tool_name, TOOL_DEFINITION_TOKENS["__default__"]
    )
    # Total tokens consumed by this tool's definition across all LLM calls
    total_definition_tokens = tool_def_tokens * num_llm_calls_in_task
    # Note: L2 tokens (tool_use block + result) are tracked per-invocation by
    # the caller and added to the record before insertion.
    llm_cost = (Decimal(total_definition_tokens) * llm_input_price_per_mtok) / Decimal("1_000_000")
    return ToolCostRecord(
        tool_name=tool_name,
        call_duration_s=call_duration_s,
        llm_tokens_consumed=total_definition_tokens,
        llm_cost_usd=llm_cost,
        external_api_cost_usd=external_cost,
        infrastructure_cost_usd=infra_cost,
    )
# ---------------------------------------------------------------------------
# Usage examples
# ---------------------------------------------------------------------------
# A serpapi search call in a task with 5 LLM calls (GPT-5.4-mini)
cost_search = calculate_tool_cost("web_search_serpapi", call_duration_s=1.2, num_llm_calls_in_task=5)
print(f"Search tool total: ${float(cost_search.total_cost_usd):.6f}")
# Search tool total: $0.011950
#   L1: 520 tokens × 5 calls × $0.75/MTok = $0.001950
#   L3: $0.01 per call
# Notice: L1 is 16.3% of the tool's total cost (19.5% of the L3 fee), which is not negligible.

# Docker code execution for 30 seconds, single LLM call task
cost_docker = calculate_tool_cost("code_execution_docker", call_duration_s=30.0, num_llm_calls_in_task=1)
print(f"Code execution total: ${float(cost_docker.total_cost_usd):.8f}")
# Code execution total: $0.00088500
#   L1: 620 tokens × 1 × $0.75/MTok = $0.000465
#   L3: 30s × $0.000014/s = $0.000420
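This registry brings out two important facts about tracking tool-call cost:
- High-frequency small tools can still dominate cost. bash_exec is free (it runs locally), but 450 tokens × 20 LLM calls per task × 800 tasks per day = 7,200,000 tokens per day. On GPT-5.4 that is $18 a day of pure tool-definition overhead. A free tool is not a zero-cost tool.
- For search tools, L3 cost far exceeds L1. SerpAPI costs $0.01 per call; if the agent searches 6 times in one task, the L3 cost is $0.06, while L1 may be only about $0.002. The thing to optimize is the number of unnecessary search rounds, not the size of the tool definition.
For cross-process cost tracking of MCP tools, see MCP Protocol in Production. MCP tool calls add serialization and network overhead, which needs its own cost accounting.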
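4. Retry and Waste Cost: The Hidden Money Pit in Agent Systems
In production, retry waste typically accounts for 15-30% of an agent's total token cost. This is a commonly cited range (an illustrative scenario; the actual share depends on agent design, retry strategy, and model reliability). If you spend $10,000 a month on agent API calls, $1,500-$3,000 of it is pure waste: tokens were consumed and tools were called, but no successful output came of it.
That is the "wow moment" of this section: the vast majority of teams do not know how much they burn on retries. Once you quantify it, the number will push you to start optimizing your retry strategy right away.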
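Four Sources of Retry Waste
Not every retry is waste. Some retries are reasonable fault tolerance, but many are pure loss. The table below classifies them by whether they can be avoided:
| Retry type | Trigger | Avoidable? | Cost per occurrence (typical) | Frequency |
|---|---|---|---|---|
| Timeout retry | LLM API response timed out (network jitter) | Partly (raise the timeout or switch endpoints) | $0.005-0.05 | 2-8% of calls |
| Rate-limit retry | Provider concurrency limit exceeded | Yes (tune the concurrency strategy, add reserved capacity) | $0.001-0.02 | 1-5% of calls |
| Hallucinated tool-call retry | The LLM called a tool that does not exist or passed the wrong arguments | Partly (better tool descriptions, few-shot examples) | $0.01-0.10 | 3-12% of tool calls |
| Multi-turn failure retry | The task is too complex for the agent to finish within the allowed turns | Partly (raise max_turns or decompose the task) | $0.10-2.00 | 5-15% of tasks |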
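Hallucinated tool calls are among the most damaging kinds of waste: they consume the tokens the LLM spent generating the tool_use block and also incur real cost at the tool layer (search API fees, database query overhead), and the result is worthless. In one analysis of 50,000 agent tasks (an illustrative scenario), 7.2% of tool calls were "hallucinated calls": the right tool was called, but with arguments that made no semantic sense (searching for an empty string, querying a table that does not exist, reading a deprecated file). These calls cost $0.023 each on average, and their total waste was 11.4% of all tool-call cost (illustrative figures; the actual share depends on agent design).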
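One inexpensive mitigation is to reject obviously invalid calls before they reach a paid API. The sketch below is a hypothetical pre-flight check, not something the article's tracker already does: the `REQUIRED_ARGS` registry, the tool names, and `preflight_check` are all assumptions made for illustration. It catches only the simplest cases, namely unknown tools and empty string arguments.

```python
# Hypothetical registry: tool name -> arguments that must be non-empty strings.
REQUIRED_ARGS = {
    "web_search": ["query"],
    "db_query": ["sql"],
    "file_read": ["path"],
}

def preflight_check(tool_name, args):
    """Return a list of problems; an empty list means the call may proceed."""
    if tool_name not in REQUIRED_ARGS:
        return [f"unknown tool: {tool_name}"]
    problems = []
    for name in REQUIRED_ARGS[tool_name]:
        value = args.get(name)
        if not isinstance(value, str) or not value.strip():
            problems.append(f"{tool_name}.{name} is missing or empty")
    return problems

print(preflight_check("web_search", {"query": "   "}))
print(preflight_check("web_search", {"query": "opentelemetry baggage"}))
print(preflight_check("send_email", {"to": "a@example.com"}))
```

A rejected call still cost the tokens the LLM spent producing it, but it avoids the L3 fee and gives the model a precise error message to correct itself with. Semantic problems such as a query against a table that does not exist need tool-specific checks beyond this sketch.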
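RetryCost Tracker Implementation
The code below shows how to track the token waste and dollar waste of every retry inside the agent loop and compute waste_ratio, the most striking number on the cost dashboard: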
from dataclasses import dataclass, field
from typing import List, Optional
from datetime import datetime

@dataclass
class RetryRecord:
    """A single retry attempt's cost footprint."""
    attempt_number: int
    reason: str                    # timeout, rate_limit, wrong_tool, hallucination, model_error
    tokens_wasted: int             # total tokens consumed in this failed attempt
    cost_wasted_usd: float         # dollar cost of the wasted tokens
    tool_calls_wasted: int         # number of tool invocations in this failed attempt
    tool_cost_wasted_usd: float    # L3 tool costs incurred
    was_recoverable: bool = False  # Did the task eventually succeed after this retry?
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
@dataclass
class AgentRunCostTracker:
    """
    Tracks all costs for a single agent task execution, with special attention
    to retry waste. Every LLM call, tool invocation, and retry attempt is
    recorded so you can answer: "How much of this task's cost was wasted?"
    """
    task_id: str
    user_id: str
    task_type: str = "unknown"
    # Accumulators
    _total_token_cost: float = 0.0
    _total_tool_cost: float = 0.0
    _total_tokens: int = 0
    _retries: List[RetryRecord] = field(default_factory=list)
    _succeeded: bool = False
    _attempt_number: int = 0

    def start_attempt(self) -> int:
        """Called at the beginning of each attempt. Returns the attempt number."""
        self._attempt_number += 1
        return self._attempt_number

    def record_retry(
        self,
        reason: str,
        tokens: int,
        token_cost_usd: float,
        tool_calls: int = 0,
        tool_cost_usd: float = 0.0,
    ) -> None:
        """Record a failed attempt that will trigger a retry."""
        self._retries.append(RetryRecord(
            attempt_number=self._attempt_number,
            reason=reason,
            tokens_wasted=tokens,
            cost_wasted_usd=token_cost_usd,
            tool_calls_wasted=tool_calls,
            tool_cost_wasted_usd=tool_cost_usd,
            was_recoverable=False,
        ))

    def mark_success(self) -> None:
        """Mark the task as successful: all prior retries become 'recoverable'."""
        self._succeeded = True
        for r in self._retries:
            r.was_recoverable = True

    def add_cost(self, token_cost_usd: float, tool_cost_usd: float, tokens: int) -> None:
        """Record the actual cost of one operation, whether it succeeded or failed.

        Call this for every LLM call and every tool call. A failed operation is
        then classified as retry waste through record_retry(), which only tags
        cost that is already recorded and never adds extra spend.
        """
        self._total_token_cost += token_cost_usd
        self._total_tool_cost += tool_cost_usd
        self._total_tokens += tokens

    # ---- Computed properties ----
    @property
    def total_retry_waste_usd(self) -> float:
        """Total USD wasted on retries (regardless of eventual success).

        This is a CLASSIFICATION of already-recorded costs, not additional spend.
        Costs are recorded via add_cost() first; record_retry() tags a subset
        as wasted. Summing both would double-count.
        """
        return sum(r.cost_wasted_usd + r.tool_cost_wasted_usd for r in self._retries)

    @property
    def unrecoverable_waste_usd(self) -> float:
        """Cost of retries where the task ultimately FAILED: pure waste."""
        return sum(
            r.cost_wasted_usd + r.tool_cost_wasted_usd
            for r in self._retries if not r.was_recoverable
        )

    @property
    def total_cost_usd(self) -> float:
        """Actual LLM + tool spend, recorded exactly once per operation.

        total_retry_waste_usd is NOT added here; it is a classification of
        cost already tracked in _total_token_cost and _total_tool_cost.
        """
        return self._total_token_cost + self._total_tool_cost

    @property
    def waste_ratio(self) -> float:
        """
        Ratio of wasted cost to total cost.
        0.0 = no waste. 0.30 = 30% of all money was burned on retries.
        This is THE single most important metric on your cost dashboard.
        """
        total = self.total_cost_usd
        if total == 0:
            return 0.0
        return self.total_retry_waste_usd / total

    @property
    def retry_count(self) -> int:
        return len(self._retries)

    def summary(self) -> str:
        """Human-readable cost summary."""
        return (
            f"Task {self.task_id} ({self.task_type}) — "
            f"Status: {'✓' if self._succeeded else '✗'} | "
            f"Total: ${self.total_cost_usd:.6f} | "
            f"Waste: ${self.total_retry_waste_usd:.6f} "
            f"({self.waste_ratio:.1%}) | "
            f"Retries: {self.retry_count}"
        )
# ---------------------------------------------------------------------------
# Integration example: instrumented agent loop
# ---------------------------------------------------------------------------
def run_agent_task_with_cost_tracking(
    task_id: str, user_id: str, task_input: str, max_attempts: int = 5
) -> AgentRunCostTracker:
    """Simulated agent loop with full cost tracking.

    Design rule: every operation first records its actual spend via add_cost().
    Failed operations are then tagged as waste via record_retry(). Retry waste
    is a classification of already-recorded cost, not extra spend.
    """
    tracker = AgentRunCostTracker(task_id=task_id, user_id=user_id, task_type="code_review")
    for attempt in range(1, max_attempts + 1):
        tracker.start_attempt()
        try:
            # ... agent loop logic: LLM call, tool invocation, etc. ...
            # Simulation: the first two attempts fail, the third succeeds
            success = attempt >= 3
            # === Record the actual cost first (success or failure) ===
            attempt_cost = 0.0125  # 5,000 input tokens at GPT-5.4's $2.50/MTok
            tracker.add_cost(token_cost_usd=attempt_cost, tool_cost_usd=0.003, tokens=5000)
            if not success:
                # === Tag this cost as retry waste ===
                tracker.record_retry(
                    reason="model_error" if attempt == 1 else "wrong_tool",
                    tokens=5000,
                    token_cost_usd=attempt_cost,
                    tool_calls=1,
                    tool_cost_usd=0.003,
                )
                continue
            # Success: add the remaining productive cost
            tracker.add_cost(token_cost_usd=0.0225, tool_cost_usd=0.007, tokens=9000)
            tracker.mark_success()
            return tracker
        except Exception:
            # Assumes the failure happened before add_cost() above ran;
            # otherwise this attempt's cost would be counted twice.
            tracker.add_cost(token_cost_usd=0.0125, tool_cost_usd=0.0, tokens=5000)
            tracker.record_retry(
                reason="timeout", tokens=5000, token_cost_usd=0.0125
            )
    # All attempts exhausted
    return tracker

# Run it
tracker = run_agent_task_with_cost_tracking("task-0042", "user-7", "Review PR #342")
print(tracker.summary())
# Task task-0042 (code_review) — Status: ✓ | Total: $0.076000 |
# Waste: $0.031000 (40.8%) | Retries: 2
# That 40.8% is the "wow moment" number.
# Total cost ($0.076) = actual LLM/tool spend, recorded exactly once.
# Retry waste ($0.031) = the subset of that total tagged as waste.
# If every task looked like this, 40.8% of your API bill would be landfill.
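Working out the wow moment: if your agent system handles 1,000 tasks a day at an average cost of $0.25 per task with a waste_ratio of 20%, you waste $50 a day, $1,500 a month, and $18,250 a year. That is only a mid-sized system. One that handles 10,000 tasks a day at the same ratio wastes $182,500 a year. None of this shows up as a line item on any provider's bill; only your own cost tracker can see it. Go and compute your waste_ratio now.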
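The projection is simple enough to keep as a helper and rerun with your own numbers. A minimal sketch, assuming a 30-day month and a 365-day year as in the figures above; `projected_waste` is an illustrative name.

```python
from decimal import Decimal

def projected_waste(tasks_per_day, avg_task_cost_usd, waste_ratio):
    """Project retry waste per day, per 30-day month, and per 365-day year."""
    daily = tasks_per_day * Decimal(avg_task_cost_usd) * Decimal(waste_ratio)
    return {"day": daily, "month": daily * 30, "year": daily * 365}

mid_size = projected_waste(1_000, "0.25", "0.20")
large = projected_waste(10_000, "0.25", "0.20")
print(mid_size)
print(large)
```

Passing the cost and ratio as strings keeps the arithmetic in Decimal from the start, so the projected totals come out as exact values.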
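Waste Analysis by Retry Reason
waste_ratio tells you how much was wasted, but you need a breakdown by reason to decide what to optimize. The SQL query below runs that analysis on a retry_records table, which holds the retry records that each AgentRunCostTracker writes to the database when its task ends: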
-- Retry waste breakdown by reason (last 7 days)
-- Use this to identify the #1 source of waste in your agent system
SELECT
    reason,
    COUNT(*) AS retry_count,
    SUM(cost_wasted_usd + tool_cost_wasted_usd) AS total_waste_usd,
    ROUND(AVG(cost_wasted_usd + tool_cost_wasted_usd), 6) AS avg_waste_per_retry,
    ROUND(100.0 * SUM(cost_wasted_usd + tool_cost_wasted_usd) /
        (SELECT SUM(cost_wasted_usd + tool_cost_wasted_usd) FROM retry_records
         WHERE timestamp >= NOW() - INTERVAL '7 days'), 1) AS pct_of_total_waste
FROM retry_records
WHERE timestamp >= NOW() - INTERVAL '7 days'
GROUP BY reason
ORDER BY total_waste_usd DESC;

-- Illustrative result (avg_waste_per_retry column omitted):
-- reason         | retry_count | total_waste_usd | pct_of_total_waste
-- wrong_tool     | 12,430      | $1,243.00       | 38.2%
-- timeout        | 8,210       | $820.50         | 25.2%
-- model_error    | 6,540       | $654.00         | 20.1%
-- rate_limit     | 3,890       | $389.00         | 12.0%
-- hallucination  | 1,460       | $146.00         | 4.5%
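This table supports clear optimization decisions: if wrong_tool accounts for 38% of the waste, improve the tool descriptions and few-shot examples; if timeout accounts for 25%, adjust the timeout strategy or switch to a faster model.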
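If the retry records are still in memory, for example in a test or a notebook, the same breakdown can be sketched in plain Python. This assumes each retry has been reduced to a `(reason, wasted_usd)` pair; `waste_by_reason` and the sample data are illustrative.

```python
from collections import defaultdict

def waste_by_reason(retries):
    """Group (reason, wasted_usd) pairs; return rows sorted by total waste, largest first."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for reason, wasted_usd in retries:
        totals[reason] += wasted_usd
        counts[reason] += 1
    grand_total = sum(totals.values())
    rows = [
        (reason, counts[reason], total, (100 * total / grand_total) if grand_total else 0.0)
        for reason, total in totals.items()
    ]
    return sorted(rows, key=lambda row: row[2], reverse=True)

# Made-up sample: two wrong_tool retries, one timeout, one rate_limit.
sample = [("wrong_tool", 0.10), ("timeout", 0.05), ("wrong_tool", 0.10), ("rate_limit", 0.05)]
rows = waste_by_reason(sample)
for reason, count, total, pct in rows:
    print(f"{reason:<12} n={count} waste=${total:.2f} share={pct:.1f}%")
```

Each row carries the same columns as the SQL result: reason, retry count, total waste, and share of all waste.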
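Abnormal retry cost can also serve as a leading indicator of system health: a sudden spike in the retry rate may mean an upstream service is failing or model behavior has regressed. In that case the surge in retry cost can trigger an automated rollback decision. See Agent Rollback Design for how to switch back to the last stable version automatically when a cost anomaly is detected.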
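5. Task-Level and User-Level Cost Attribution
With per-step cost data in hand, the next question is how to attribute those costs to the right task and user. This matters most in a multi-tenant SaaS system: you need to know that customer A spent $340 and customer B spent $12 in order to bill correctly and assess customer health.
Trace Context Propagation: The Infrastructure of Cost Attribution
The key to cost attribution is not labeling records at the last step. It is injecting user_id, tenant_id, and task_id at the request entry point and propagating them through the trace context to every downstream span. However many layers of LLM calls and tool calls your agent goes through, every cost record carries its attribution from the moment it is created.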
from contextlib import contextmanager
from opentelemetry import baggage, context, trace
from opentelemetry.trace import SpanKind
import uuid

tracer = trace.get_tracer("agent-cost-attribution")

class CostAttributionContext:
    """
    Propagates attribution metadata (user_id, tenant_id, task_id) across all
    spans in an agent task's trace tree.

    Usage:
        ctx = CostAttributionContext(user_id="user-42", tenant_id="tenant-7")
        with ctx.span("agent.task", task_type="code_review"):
            # Spans opened via ctx.span() carry the attribution attributes,
            # and nested code can read the same values back from baggage
            ...
    """
    def __init__(self, user_id: str, tenant_id: str = "default"):
        self.user_id = user_id
        self.tenant_id = tenant_id
        self.task_id = str(uuid.uuid4())[:8]

    def _attribution(self) -> dict:
        return {
            "cost.user_id": self.user_id,
            "cost.tenant_id": self.tenant_id,
            "cost.task_id": self.task_id,
        }

    @contextmanager
    def span(self, name: str, **attrs):
        """Open a current span with attribution attributes and baggage attached."""
        # set_baggage() returns a NEW Context; it has no effect until that
        # context is attached, so build it up and attach it explicitly.
        ctx = context.get_current()
        for key, value in self._attribution().items():
            ctx = baggage.set_baggage(key, value, context=ctx)
        token = context.attach(ctx)
        try:
            # start_as_current_span makes the span the parent of nested spans
            with tracer.start_as_current_span(name, kind=SpanKind.INTERNAL) as span:
                for key, value in self._attribution().items():
                    span.set_attribute(key, value)
                for k, v in attrs.items():
                    span.set_attribute(k, str(v))
                yield span
        finally:
            context.detach(token)
# ---------------------------------------------------------------------------
# Cost record data model: every record carries attribution info
# ---------------------------------------------------------------------------
@dataclass
class CostRecord:
    """Single cost event, attributed to a task and user."""
    trace_id: str
    span_id: str
    task_id: str
    user_id: str
    tenant_id: str
    provider: str
    model: str
    category: str  # token_input, token_output, tool_call, retry_waste
    cost_usd: float
    tokens: int = 0
    tool_name: str = ""
    metadata: dict = field(default_factory=dict)
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
# The cost tracker from Section 4 now emits CostRecords with attribution
class AttributedCostTracker(AgentRunCostTracker):
    """Extends AgentRunCostTracker with attribution-aware record emission."""
    def __init__(self, task_id: str, user_id: str, tenant_id: str = "default",
                 task_type: str = "unknown"):
        super().__init__(task_id=task_id, user_id=user_id, task_type=task_type)
        self.tenant_id = tenant_id

    def emit_cost_record(
        self, provider: str, model: str, category: str,
        cost_usd: float, tokens: int = 0, tool_name: str = ""
    ) -> CostRecord:
        """Create an attributed cost record (ready for DB insert or metric export)."""
        span_ctx = trace.get_current_span().get_span_context()
        return CostRecord(
            # OpenTelemetry exposes IDs as ints; store the usual hex strings
            trace_id=format(span_ctx.trace_id, "032x"),
            span_id=format(span_ctx.span_id, "016x"),
            task_id=self.task_id,
            user_id=self.user_id,
            tenant_id=self.tenant_id,
            provider=provider, model=model,
            category=category, cost_usd=cost_usd,
            tokens=tokens, tool_name=tool_name,
        )
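The idea of "inject at the entry point, read anywhere downstream" can also be shown without any tracing dependency. The sketch below uses the standard library's `contextvars` as a simplified in-process stand-in for baggage propagation; it is not the OpenTelemetry API, and `attributed_task`, `make_cost_record`, and `call_tool` are hypothetical names.

```python
import contextvars
from contextlib import contextmanager

# Hypothetical context variable holding the attribution of the current task.
_attribution = contextvars.ContextVar("cost_attribution", default=None)

@contextmanager
def attributed_task(user_id, tenant_id, task_id):
    """Set attribution at the request entry point; restore the previous value on exit."""
    token = _attribution.set(
        {"user_id": user_id, "tenant_id": tenant_id, "task_id": task_id}
    )
    try:
        yield
    finally:
        _attribution.reset(token)

def make_cost_record(category, cost_usd):
    """Build a cost record deep in the call stack without threading IDs through every call."""
    attribution = _attribution.get() or {
        "user_id": "unknown", "tenant_id": "unknown", "task_id": "unknown",
    }
    return {**attribution, "category": category, "cost_usd": cost_usd}

def call_tool():
    # Stands in for a tool wrapper several layers below the entry point.
    return make_cost_record("tool_call", 0.003)

with attributed_task("user-42", "tenant-7", "task-0001"):
    record = call_tool()
print(record)
```

A context variable only covers one process. Once a tool runs in a separate service, the same values have to travel in request headers, which is what trace context and baggage propagation provide.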
SQL Aggregation Queries: From Raw Records to Per-User Bills
Once every cost record carries user_id and tenant_id, aggregation is plain SQL:
-- 1. Per-user cost breakdown (multi-tenant billing)
SELECT
    tenant_id,
    user_id,
    COUNT(DISTINCT task_id) AS task_count,
    SUM(cost_usd) AS total_cost_usd,
    ROUND(AVG(cost_usd), 6) AS avg_cost_per_record,
    SUM(CASE WHEN category = 'retry_waste' THEN cost_usd ELSE 0 END) AS waste_cost_usd,
    ROUND(100.0 * SUM(CASE WHEN category = 'retry_waste' THEN cost_usd ELSE 0 END)
        / NULLIF(SUM(cost_usd), 0), 2) AS waste_pct
FROM cost_records
WHERE timestamp >= NOW() - INTERVAL '30 days'
GROUP BY tenant_id, user_id
ORDER BY total_cost_usd DESC;

-- 2. Cost per successful vs failed task (cost efficiency metric)
-- Assumes a task_outcome column has been written or joined onto cost_records;
-- the CostRecord dataclass above does not define it.
SELECT
    task_outcome,  -- 'success' or 'failure'
    COUNT(DISTINCT task_id) AS task_count,
    SUM(cost_usd) AS total_cost_usd,
    ROUND(SUM(cost_usd) / COUNT(DISTINCT task_id), 6) AS cost_per_task
FROM cost_records
WHERE timestamp >= NOW() - INTERVAL '7 days'
GROUP BY task_outcome;

-- 3. Model cost efficiency comparison (cost per successful task by model)
SELECT
    model,
    COUNT(DISTINCT CASE WHEN task_outcome = 'success' THEN task_id END) AS successful_tasks,
    SUM(CASE WHEN task_outcome = 'success' THEN cost_usd ELSE 0 END) AS success_cost,
    ROUND(SUM(CASE WHEN task_outcome = 'success' THEN cost_usd ELSE 0 END) /
        NULLIF(COUNT(DISTINCT CASE WHEN task_outcome = 'success' THEN task_id END), 0), 6)
        AS cost_per_successful_task
FROM cost_records
WHERE timestamp >= NOW() - INTERVAL '30 days'
GROUP BY model
ORDER BY cost_per_successful_task ASC;
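The third query (cost_per_successful_task) is the core metric for judging whether the spend is worth it. Suppose GPT-5.5 costs $0.15 per successful task and DeepSeek-V4-Flash costs $0.008, which makes GPT-5.5 18.75 times more expensive. The question then becomes: is GPT-5.5's success rate or output quality 18.75 times better than DeepSeek's? If not, more traffic should go to DeepSeek. Note that this query counts only the spend of successful tasks, so the cost of failed tasks has to be read from the second query. Acting on the comparison is the job of the cost-aware routing in Section 6.
Audit logs are the underlying data source for cost attribution: immutable operation records give you a way to cross-check. If a cost record shows an abnormal tool-call cost, the audit log tells you what actually happened in that call. See Agent 审计日志设计 (Agent Audit Log Design).
Whether a cost is reasonable ultimately has to be judged against task quality: if a task type succeeds often but each success is very expensive, the model choice may need to be revisited. See the discussion of cost-to-quality ratio in Agent 评测框架设计 (Agent Evaluation Framework Design).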
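One way to make the model comparison concrete is to fold failed attempts into the price of a success. The sketch below assumes a failed task costs about as much as a successful one and is retried until it succeeds, so the expected spend per success is the per-task cost divided by the success rate. The success rate used here is a placeholder, not a measurement; only the two per-task costs come from the example above, and both function names are hypothetical.

```python
from decimal import Decimal


def effective_cost_per_success(cost_per_task: Decimal, success_rate: Decimal) -> Decimal:
    """Expected spend to obtain one success when failures cost as much as successes."""
    if not (Decimal("0") < success_rate <= Decimal("1")):
        raise ValueError("success_rate must be in (0, 1]")
    return cost_per_task / success_rate


def break_even_success_rate(cheap_cost: Decimal, cheap_rate: Decimal,
                            expensive_cost: Decimal) -> Decimal:
    """Success rate the expensive model would need to match the cheap model's
    effective cost. A result above 1 means it cannot win on cost alone."""
    return expensive_cost * cheap_rate / cheap_cost


gpt_cost = Decimal("0.15")        # per-task cost from the example above
deepseek_cost = Decimal("0.008")  # per-task cost from the example above
assumed_deepseek_rate = Decimal("0.5")  # placeholder success rate

price_ratio = gpt_cost / deepseek_cost
needed_rate = break_even_success_rate(deepseek_cost, assumed_deepseek_rate, gpt_cost)
print(price_ratio, needed_rate)
```

Even if the cheaper model succeeded only half the time in this scenario, the break-even rate for the expensive model comes out above 1, so the case for the expensive model would have to rest on output quality rather than on cost.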
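6. Budget alerts and cost control
The point of tracking cost is to control it. Without budget constraints, an Agent system is like a credit card with no limit: a single runaway loop (the Agent cycling endlessly through "think, call tool, fail, retry") can burn hundreds of dollars in minutes.
Two budget control modes: Hard Stop vs Soft Warning
| Mode | Behavior | Best suited for | User experience |
|---|---|---|---|
| Hard Stop | Terminates Agent execution as soon as the budget is exhausted and raises BudgetExceededError | Internal Agent pipelines, batch jobs, anything not facing end users | The task fails, but it stops burning money |
| Soft Warning | Sends an alert at 80% of budget but lets execution continue; at 100%, blocks new expensive operations | Paid end-user products, high-value tasks | The user is notified and can choose to continue, knowing the cost |
Most production systems should use both: a task-level hard stop (so no single task can run away) plus a user-level soft warning (to give paying users some headroom). The BudgetController below keeps soft scopes at WARN even past 100%; refusing expensive operations at that point is left to the caller. When running on DeepSeek, set the budget thresholds lower: DeepSeek is so cheap that by the time the bill looks alarming, the Agent has usually already processed millions of tokens.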
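The two modes in the table can be reduced to a small decision function. The sketch below is only an illustration of the table, not the controller used in this article: the names `BudgetMode` and `decide`, the `expensive` flag, and the 0.8 default threshold are assumptions introduced here.

```python
from enum import Enum


class BudgetMode(Enum):
    HARD_STOP = "hard_stop"
    SOFT_WARNING = "soft_warning"


def decide(spend: float, limit: float, mode: BudgetMode,
           expensive: bool = False, warn_at: float = 0.8) -> str:
    """Return "allow", "warn", or "block" for the next operation."""
    if spend >= limit:
        if mode is BudgetMode.HARD_STOP:
            return "block"  # budget exhausted: stop the task
        # Soft mode: only refuse operations flagged as expensive.
        return "block" if expensive else "warn"
    if spend >= limit * warn_at:
        return "warn"       # nearing the limit in either mode
    return "allow"


print(decide(0.50, 1.00, BudgetMode.HARD_STOP))
print(decide(0.85, 1.00, BudgetMode.SOFT_WARNING))
print(decide(1.20, 1.00, BudgetMode.SOFT_WARNING, expensive=True))
```

A task-level budget would typically call this with `HARD_STOP`, and a user-level budget with `SOFT_WARNING`.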
BudgetController implementation
from enum import Enum
from typing import Dict, Optional, Callable
import time
import threading
class BudgetAction(Enum):
ALLOW = "allow" # proceed normally
WARN = "warn" # near limit — log warning but allow
BLOCK = "block" # over limit — reject operation
class BudgetExceededError(Exception):
"""Raised when a hard budget limit is exceeded."""
def __init__(self, user_id: str, limit_name: str, current: float, limit: float):
self.user_id = user_id
self.limit_name = limit_name
self.current = current
self.limit = limit
super().__init__(
f"Budget exceeded for {user_id}: {limit_name} "
f"(${current:.4f} > ${limit:.4f})"
)
class BudgetController:
"""
Enforces per-user and per-task budget limits.
Supports three limit scopes:
- per_task: max USD per individual task execution
- daily: max USD per user per calendar day
- monthly: max USD per user per calendar month
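Key design decisions (production-grade):
- Pre-authorization: estimate the maximum possible cost before an operation runs and reserve budget for it
- >= (not >): refuse further spend once the limit is exactly reached
- Thread safety: a Lock makes check-and-reserve atomic, so concurrent tasks cannot all pass the check and then overspend together
- Reconciliation: after the operation, settle the difference between the estimated and the actual cost
"""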
def __init__(self, cost_store: "CostStore"):
self._store = cost_store
self._limits: Dict[str, Dict[str, float]] = {}
self._actions: Dict[str, Dict[str, BudgetAction]] = {}
self._warning_handler: Optional[Callable] = None
self._lock = threading.Lock()
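# Estimated costs that are reserved but not yet reconciled.
# NOTE: keyed by user_id, so the "per_task" bucket is shared by all of a user's in-flight tasks.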
self._reserved: Dict[str, Dict[str, float]] = {}
def set_limit(
self, user_id: str, scope: str, limit_usd: float,
action: BudgetAction = BudgetAction.BLOCK,
) -> None:
self._limits.setdefault(user_id, {})[scope] = limit_usd
self._actions.setdefault(user_id, {})[scope] = action
def set_warning_handler(self, handler: Callable) -> None:
self._warning_handler = handler
def request_budget(
self, user_id: str, task_id: str, estimated_cost: float,
) -> tuple[BudgetAction, str]:
"""Pre-authorize an operation by checking budgets and reserving cost.
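Callers should invoke this before starting a new operation. It performs the
limit check and the cost reservation under a single lock, so concurrent tasks
cannot all pass and then overspend. After the operation, call reconcile() to
settle the estimate against the actual cost.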
"""
return self.check(user_id, task_id, estimated_next_cost=estimated_cost)
def check(self, user_id: str, task_id: str,
estimated_next_cost: float = 0.0) -> tuple[BudgetAction, str]:
"""
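Check every applicable budget limit before an operation runs.
Pre-authorization: pass estimated_next_cost to reserve budget up front,
then call reconcile() once the operation finishes.
Every limit comparison uses store_spend + existing_reservations +
estimated_next_cost, so reservations held by concurrent tasks are visible.
Atomicity: outstanding reservations (_reserved) are included in the
comparison under a single lock, so two concurrent tasks cannot both pass
the gate and overspend together; the second check sees the first task's
reservation. This guarantee holds within one process only. Across processes,
use an atomic backend primitive such as Redis INCRBY or SELECT ... FOR UPDATE.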
"""
with self._lock:
limits = self._limits.get(user_id, {})
actions = self._actions.get(user_id, {})
# Includes amounts reserved by earlier check() calls that have not been reconciled yet
reserved = self._reserved.get(user_id, {})
# 1. Per-task limit (always hard stop)
task_limit = limits.get("per_task", float("inf"))
task_spend = self._store.get_task_cost(task_id)
task_reserved = reserved.get("per_task", 0.0)
task_projected = task_spend + task_reserved + estimated_next_cost
if task_projected >= task_limit:
raise BudgetExceededError(
user_id, "per_task",
task_projected, task_limit)
# 2. Daily limit
daily_limit = limits.get("daily", float("inf"))
daily_action = actions.get("daily", BudgetAction.BLOCK)
daily_spend = self._store.get_user_daily_cost(user_id)
daily_reserved = reserved.get("daily", 0.0)
daily_projected = daily_spend + daily_reserved + estimated_next_cost
if daily_projected >= daily_limit and daily_action == BudgetAction.BLOCK:
raise BudgetExceededError(
user_id, "daily",
daily_projected, daily_limit)
# 3. Monthly limit
monthly_limit = limits.get("monthly", float("inf"))
monthly_action = actions.get("monthly", BudgetAction.BLOCK)
monthly_spend = self._store.get_user_monthly_cost(user_id)
monthly_reserved = reserved.get("monthly", 0.0)
monthly_projected = monthly_spend + monthly_reserved + estimated_next_cost
if monthly_projected >= monthly_limit and monthly_action == BudgetAction.BLOCK:
raise BudgetExceededError(
user_id, "monthly",
monthly_projected, monthly_limit)
# --- Decide the action: BLOCK cases have already raised, so only ALLOW or WARN remain ---
# Every WARN condition uses the projection that includes estimated_next_cost.
action = BudgetAction.ALLOW
msg = "OK"
# Per-task at 80% (hard scope, always WARN)
if task_projected >= task_limit * 0.8:
action = BudgetAction.WARN
msg = (f"Task budget at {task_projected/task_limit:.0%}: "
f"${task_projected:.4f} / ${task_limit:.4f}")
# Daily: the 100% soft cap takes precedence over the 80% warning
if daily_projected >= daily_limit:
self._send_warning(user_id, "daily", daily_projected, daily_limit)
action = BudgetAction.WARN
msg = (f"Daily budget exceeded (soft): "
f"${daily_projected:.4f} >= ${daily_limit:.4f}")
elif daily_projected >= daily_limit * 0.8:
self._send_warning(user_id, "daily", daily_projected, daily_limit)
action = BudgetAction.WARN
msg = (f"Daily budget at {daily_projected/daily_limit:.0%}: "
f"${daily_projected:.4f} / ${daily_limit:.4f}")
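# Monthly: the 100% soft cap takes precedence over the 80% warning
if monthly_projected >= monthly_limit:
self._send_warning(user_id, "monthly", monthly_projected, monthly_limit)
action = BudgetAction.WARN
msg = (f"Monthly budget exceeded (soft): "
f"${monthly_projected:.4f} >= ${monthly_limit:.4f}")
elif monthly_projected >= monthly_limit * 0.8:
self._send_warning(user_id, "monthly", monthly_projected, monthly_limit)
action = BudgetAction.WARN
msg = (f"Monthly budget at {monthly_projected/monthly_limit:.0%}: "
f"${monthly_projected:.4f} / ${monthly_limit:.4f}")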
# Pre-authorization: reserve the estimated cost for both ALLOW and WARN
# (BLOCK has already raised, so only ALLOW/WARN reach this point)
self._reserved.setdefault(user_id, {})
for scope in ["per_task", "daily", "monthly"]:
self._reserved[user_id][scope] = (
self._reserved[user_id].get(scope, 0.0) + estimated_next_cost)
return (action, msg)
def reconcile(self, user_id: str, task_id: str,
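actual_cost: float, estimated_cost: float) -> None:
"""Settle after the operation: reconcile the estimate against the actual cost.
Must be called after every check(), whether the operation succeeded or failed.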
"""
with self._lock:
self._store.record_cost(task_id, user_id, actual_cost)
for scope in ["per_task", "daily", "monthly"]:
reserved = self._reserved.get(user_id, {}).get(scope, 0.0)
self._reserved.setdefault(user_id, {})[scope] = max(
0.0, reserved - estimated_cost)
def _send_warning(self, user_id: str, scope: str, current: float, limit: float) -> None:
if self._warning_handler:
self._warning_handler(user_id, scope, current, limit)
# ---------------------------------------------------------------------------
# Abstract cost store — plug in your backend (Postgres, Redis, in-memory)
# ---------------------------------------------------------------------------
class CostStore:
"""Abstract interface for querying current spend. Implement for your backend."""
def get_task_cost(self, task_id: str) -> float:
raise NotImplementedError
def get_user_daily_cost(self, user_id: str) -> float:
raise NotImplementedError
def get_user_monthly_cost(self, user_id: str) -> float:
raise NotImplementedError
def record_cost(self, task_id: str, user_id: str, cost: float) -> None:
raise NotImplementedError
class InMemoryCostStore(CostStore):
"""Simple in-memory store for development/testing."""
def __init__(self):
self._task_cost: Dict[str, float] = {}
self._user_daily: Dict[str, float] = {}
self._user_monthly: Dict[str, float] = {}
def add_cost(self, task_id: str, user_id: str, cost_usd: float) -> None:
self._task_cost[task_id] = self._task_cost.get(task_id, 0.0) + cost_usd
self._user_daily[user_id] = self._user_daily.get(user_id, 0.0) + cost_usd
self._user_monthly[user_id] = self._user_monthly.get(user_id, 0.0) + cost_usd
def record_cost(self, task_id: str, user_id: str, cost: float) -> None:
self.add_cost(task_id, user_id, cost)
def get_task_cost(self, task_id: str) -> float:
return self._task_cost.get(task_id, 0.0)
def get_user_daily_cost(self, user_id: str) -> float:
return self._user_daily.get(user_id, 0.0)
def get_user_monthly_cost(self, user_id: str) -> float:
return self._user_monthly.get(user_id, 0.0)
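The atomic check-and-reserve idea can be exercised in isolation. The sketch below is a deliberately reduced, single-limit ledger and not the controller above: the class name `ReservationLedger` is hypothetical, and it keys reservations by task id so that reconciling one task releases exactly that task's reservation. Ten threads each try to reserve $0.30 against a $1.00 limit.

```python
import threading
from decimal import Decimal


class ReservationLedger:
    """Toy single-process ledger: atomic check-and-reserve against one limit."""

    def __init__(self, limit: Decimal):
        self._limit = limit
        self._spent = Decimal("0")
        self._reserved = {}  # task_id -> reserved estimate
        self._lock = threading.Lock()

    def reserve(self, task_id: str, estimate: Decimal) -> bool:
        """Reserve the estimate if spent + reserved + estimate stays below the limit."""
        with self._lock:
            pending = sum(self._reserved.values(), Decimal("0"))
            if self._spent + pending + estimate >= self._limit:
                return False
            self._reserved[task_id] = self._reserved.get(task_id, Decimal("0")) + estimate
            return True

    def reconcile(self, task_id: str, actual: Decimal) -> None:
        """Release the task's reservation and record what it really cost."""
        with self._lock:
            self._reserved.pop(task_id, None)
            self._spent += actual


ledger = ReservationLedger(Decimal("1.00"))
granted = []


def worker(i: int) -> None:
    if ledger.reserve(f"task-{i}", Decimal("0.30")):
        granted.append(i)  # list.append is atomic in CPython


threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(granted))
```

Whatever order the threads run in, only three reservations fit under the limit, because each check sees the reservations granted before it. Without the lock, several threads could read the same pending total and all pass.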
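Cost-aware model routing: automatic downgrade when the budget is tight
Budget control is more than "stop when you go over". A smarter approach switches to a cheaper model automatically when the budget runs low. This pattern, Cost-Aware Model Routing, is especially valuable for teams that mix DeepSeek with GPT or Claude: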
from decimal import Decimal
from typing import Dict, Optional, Tuple
class BlockedDecision(Exception):
"""Raised when the budget cannot cover this operation on any model."""
pass
class CostAwareRouter:
"""
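Routes an Agent task to the most suitable model based on:
1. Task complexity (low/medium/high)
2. Remaining budget
3. Estimated full cost (input, cached input, output, retry headroom, tool headroom)
Key safety rules:
- If the budget cannot cover any model, raise BlockedDecision
(do not silently fall back to the cheapest model, which still spends money)
- Estimate every cost component, not just input tokens
- Unknown complexity is treated as HIGH (overestimating is safer than underestimating)
- The budget gate (BudgetController.check) is primary;
routing is a secondary decision made after the gate and must not bypass it
"""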
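# Full cost model: (input, output, cache_read) price per million tokens,
# combined with an assumed output ratio (~20%) and cache-hit ratio (~30%).
# Prices are Decimal to avoid binary floating-point precision loss.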
_PRICE_PER_1M: Dict[Tuple[str, str], Tuple[Decimal, Decimal, Decimal]] = {
("openai", "gpt-5.5"): (Decimal("5.00"), Decimal("30.00"), Decimal("0.50")),
("openai", "gpt-5.4"): (Decimal("2.50"), Decimal("15.00"), Decimal("0.25")),
("openai", "gpt-5.4-mini"): (Decimal("0.75"), Decimal("4.50"), Decimal("0.075")),
("anthropic", "claude-sonnet-4-6"): (Decimal("3.00"), Decimal("15.00"), Decimal("0.30")),
("anthropic", "claude-haiku-4-5"): (Decimal("1.00"), Decimal("5.00"), Decimal("0.10")),
("deepseek", "deepseek-v4-flash"): (Decimal("0.14"), Decimal("0.28"), Decimal("0.0028")),
("deepseek", "deepseek-v4-pro"): (Decimal("0.435"), Decimal("0.87"), Decimal("0.003625")),
}
_OUTPUT_RATIO = Decimal("0.20") # assumes output tokens are ~20% of input tokens
_CACHE_RATIO = Decimal("0.30") # assumes ~30% of input hits the cache (system prompt)
_RETRY_HEADROOM = Decimal("0.15") # 15% headroom for potential retries
_TOOL_HEADROOM = Decimal("0.10") # 10% headroom for tool-call overhead
def __init__(self, budget_controller: BudgetController):
self.budget = budget_controller
def estimate_full_cost(self, provider: str, model: str,
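estimated_input_tokens: int) -> Decimal:
"""Estimate the full cost, including every component."""
prices = self._PRICE_PER_1M.get((provider, model))
if prices is None:
# Unknown model: fail loudly instead of pricing it at $0,
# which would make it look free to the router.
raise KeyError(f"No pricing entry for {provider}/{model}")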
inp_p, out_p, cache_p = prices
cached = int(estimated_input_tokens * self._CACHE_RATIO)
uncached = estimated_input_tokens - cached
output_tokens = int(estimated_input_tokens * self._OUTPUT_RATIO)
input_cost = (Decimal(uncached) * inp_p + Decimal(cached) * cache_p) / Decimal("1_000_000")
output_cost = Decimal(output_tokens) * out_p / Decimal("1_000_000")
subtotal = input_cost + output_cost
# Add safety headroom for retries and tool calls
total = subtotal * (Decimal("1") + self._RETRY_HEADROOM + self._TOOL_HEADROOM)
return total
def select_model(
self,
user_id: str,
task_complexity: str,
estimated_input_tokens: int,
daily_budget_remaining: Decimal,
) -> Tuple[str, str]:
"""
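Return a (provider, model) tuple, or raise BlockedDecision.
Gate check: the caller must pass BudgetController.check() before calling this.
Routing is a secondary decision and must not bypass the hard budget gate.
"""
# Unknown complexity defaults to HIGH: overestimating is safer than underestimating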
complexity = (
task_complexity if task_complexity in ("low", "medium", "high")
else "high"
)
# Candidate models in priority order, by complexity
if complexity == "high":
candidates = [
("openai", "gpt-5.5"),
("openai", "gpt-5.4"),
("deepseek", "deepseek-v4-pro"),
("deepseek", "deepseek-v4-flash"),
]
elif complexity == "medium":
candidates = [
("openai", "gpt-5.4-mini"),
("deepseek", "deepseek-v4-flash"),
]
else: # low
candidates = [("deepseek", "deepseek-v4-flash")]
# Walk the candidates in priority order
for provider, model in candidates:
est = self.estimate_full_cost(provider, model, estimated_input_tokens)
if est <= daily_budget_remaining:
return (provider, model)
# No model fits: block the operation
cheapest = candidates[-1]
cheapest_est = self.estimate_full_cost(
cheapest[0], cheapest[1], estimated_input_tokens)
raise BlockedDecision(
f"Insufficient budget ({user_id}): "
f"cheapest option ({cheapest[0]}/{cheapest[1]}) "
f"estimated at ${float(cheapest_est):.6f} > ${float(daily_budget_remaining):.6f}"
)
# ---------------------------------------------------------------------------
# Agent step that combines the budget check with model routing
# ---------------------------------------------------------------------------
def agent_step_with_budget(
    router: CostAwareRouter,
    budget_ctrl: BudgetController,
    user_id: str, task_id: str, task_complexity: str,
    estimated_tokens: int, daily_remaining: Decimal,
) -> Optional[str]:
    """Single agent step: pass the budget gate first, then route."""
    # Step 1: budget gate (hard gate). check() signals a hard stop by raising
    # BudgetExceededError; it never returns BudgetAction.BLOCK.
    try:
        action, reason = budget_ctrl.check(user_id, task_id)
    except BudgetExceededError:
        return None
    # Step 2: model routing (secondary decision, only after the gate has passed)
    try:
        provider, model = router.select_model(
            user_id, task_complexity, estimated_tokens, daily_remaining
        )
    except BlockedDecision:
        return None
    label = "Budget WARN" if action == BudgetAction.WARN else "Budget OK"
    print(f"[{label}] {reason} | Routing to {provider}/{model}")
    return f"Step completed with {provider}/{model}"
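For a deeper discussion of cost-aware routing, including dynamic routing on quality signals and model failover, see 构建模型无关的 AI Agent (Building Model-Agnostic AI Agents).
Adding a cost budget check to the release gate is another key practice: if a new version's cost per task is more than 30% higher than the previous version's, the release should be blocked. See Agent 发布 Gate 设计 (Agent Release Gate Design).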
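The 30% rule is simple enough to express as a single check. The sketch below is one possible shape for it, under the assumption that both versions were measured on the same task set; the function name `cost_gate` and its parameters are hypothetical and not part of any release tooling described here.

```python
from decimal import Decimal


def cost_gate(baseline_cost_per_task: Decimal, candidate_cost_per_task: Decimal,
              max_increase: Decimal = Decimal("0.30")) -> tuple[bool, Decimal]:
    """Return (passed, relative_increase) for a candidate release.

    The gate fails only when the candidate is MORE than max_increase above
    the baseline, matching the "more than 30%" wording.
    """
    if baseline_cost_per_task <= 0:
        raise ValueError("baseline cost per task must be positive")
    increase = (candidate_cost_per_task - baseline_cost_per_task) / baseline_cost_per_task
    return (increase <= max_increase, increase)


passed, increase = cost_gate(Decimal("0.10"), Decimal("0.14"))
print(passed, increase)
```

Using Decimal keeps a candidate that sits exactly on the threshold from being rejected by floating-point rounding.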
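7. Integrating with observability platforms
The code in the previous sections produces cost records, but they are still trapped in the Python process's memory or in database tables. To make cost data truly observable, inject it into your observability infrastructure: OpenTelemetry traces (span-level cost context) and Prometheus metrics (real-time aggregation and alerting).
OpenTelemetry span attributes: put cost on every span
Attach cost-related span attributes to every LLM call and tool call, so that when you inspect a trace in Jaeger, Tempo, or Datadog, each span shows its dollar amount directly: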
from opentelemetry import trace
from opentelemetry.trace import SpanKind, Status, StatusCode
import time
tracer = trace.get_tracer("agent-cost-instrumentation")
"""
Custom attribute schema for Agent cost observability spans:
Standard OTel GenAI semantic conventions:
gen_ai.provider.name: provider name (e.g. "openai", "anthropic")
gen_ai.request.model: model name (e.g. "gpt-5.4")
gen_ai.operation.name: operation type ("chat", "tool_use")
gen_ai.usage.input_tokens: number of input tokens
gen_ai.usage.output_tokens: number of output tokens
Application-namespaced cost attributes (app.agent_cost.*):
app.agent_cost.usd: total cost of this operation (USD)
app.agent_cost.input_usd: input cost
app.agent_cost.output_usd: output cost
app.agent_cost.task_id: task identifier
app.agent_cost.user_id: user identifier
app.agent_cost.tool_name: tool name (tool spans only)
app.agent_cost.tool_usd: tool L3 cost (tool spans only)
Latency: span duration is the standard way to measure latency.
The custom app.agent_cost.duration_ms attribute set below is illustrative only;
in production, rely on the start and end timestamps that the OTel SDK
records on every span, from which backends derive the duration.
"""
def instrumented_llm_call(
provider: str, model: str,
messages: list, tools: list | None = None,
task_id: str = "", user_id: str = "",
) -> dict:
"""Wraps an LLM API call with cost-aware OpenTelemetry instrumentation."""
with tracer.start_as_current_span(
f"agent.llm.{provider}.{model}",
kind=SpanKind.CLIENT,
) as span:
# Standard GenAI semantic conventions
span.set_attribute("gen_ai.provider.name", provider)
span.set_attribute("gen_ai.request.model", model)
span.set_attribute("gen_ai.operation.name", "chat")
if tools:
span.set_attribute("gen_ai.tool.count", len(tools))
# Application-namespaced attributes (app.agent_cost.*)
span.set_attribute("app.agent_cost.task_id", task_id)
span.set_attribute("app.agent_cost.user_id", user_id)
start = time.time()
try:
response = _call_llm_api(provider, model, messages, tools)
usage = response.get("usage", {})
input_tokens = usage.get("prompt_tokens", 0)
output_tokens = usage.get("completion_tokens", 0)
# OpenAI Chat Completions reports cached tokens under usage.prompt_tokens_details
cache_read = (
usage.get("prompt_tokens_details", {}).get("cached_tokens", 0)
if provider == "openai"
else usage.get("cache_read_input_tokens", 0)
)
cost = calculate_token_cost(provider, model, input_tokens, output_tokens, cache_read)
span.set_attribute("gen_ai.usage.input_tokens", input_tokens)
span.set_attribute("gen_ai.usage.output_tokens", output_tokens)
span.set_attribute("gen_ai.usage.cache_read_tokens", cache_read)
span.set_attribute("app.agent_cost.usd", float(cost.total_cost_usd))
span.set_attribute("app.agent_cost.input_usd", float(cost.input_cost_usd))
span.set_attribute("app.agent_cost.output_usd", float(cost.output_cost_usd))
span.set_attribute("app.agent_cost.provider", provider)
span.set_attribute("app.agent_cost.model", model)
span.set_status(Status(StatusCode.OK))
return response
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
finally:
duration_ms = (time.time() - start) * 1000
span.set_attribute("app.agent_cost.duration_ms", duration_ms)
def instrumented_tool_call(
tool_name: str, tool_input: dict,
task_id: str = "", user_id: str = "",
) -> dict:
"""Wraps a tool call with cost-aware instrumentation."""
with tracer.start_as_current_span(
f"agent.tool.{tool_name}",
kind=SpanKind.INTERNAL,
) as span:
# Standard semantic conventions
span.set_attribute("gen_ai.operation.name", "tool_use")
# Application-namespaced attributes
span.set_attribute("app.agent_cost.tool_name", tool_name)
span.set_attribute("app.agent_cost.task_id", task_id)
span.set_attribute("app.agent_cost.user_id", user_id)
start = time.time()
try:
result = _execute_tool(tool_name, tool_input)
span.set_status(Status(StatusCode.OK))
tool_pricing = TOOL_COST_REGISTRY.get(tool_name, {})
if tool_pricing.get("model") == "per_call":
span.set_attribute("app.agent_cost.tool_usd", tool_pricing.get("cost_per_call", 0.0))
return result
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
finally:
span.set_attribute("app.agent_cost.tool_duration_ms", (time.time() - start) * 1000)
# Stub implementations — replace with your actual API clients
def _call_llm_api(provider: str, model: str, messages: list, tools: list | None = None) -> dict:
"""Replace with actual provider API call."""
return {"usage": {"prompt_tokens": 0, "completion_tokens": 0}}
def _execute_tool(tool_name: str, tool_input: dict) -> dict:
"""Replace with actual tool execution."""
return {"result": "ok"}
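Providers report token usage in different shapes, so it can help to normalize the payload before pricing it. The sketch below reflects my understanding of two response formats and should be checked against the current API references: OpenAI Chat Completions reports `prompt_tokens` (cached tokens included) with the cached count under `prompt_tokens_details.cached_tokens`, while Anthropic Messages reports `input_tokens` excluding cached tokens, with `cache_read_input_tokens` and `cache_creation_input_tokens` alongside. The names `NormalizedUsage` and `normalize_usage` are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NormalizedUsage:
    input_tokens: int       # total input tokens, cached ones included
    output_tokens: int
    cache_read_tokens: int  # subset of input_tokens served from cache


def normalize_usage(provider: str, usage: dict) -> NormalizedUsage:
    """Map a provider-specific usage dict onto one shape (assumed field names)."""
    if provider == "openai":
        details = usage.get("prompt_tokens_details") or {}
        return NormalizedUsage(
            input_tokens=usage.get("prompt_tokens", 0) or 0,
            output_tokens=usage.get("completion_tokens", 0) or 0,
            cache_read_tokens=details.get("cached_tokens", 0) or 0,
        )
    if provider == "anthropic":
        cache_read = usage.get("cache_read_input_tokens", 0) or 0
        # Simplification: cache writes are counted as ordinary input here,
        # although they may be priced differently.
        cache_write = usage.get("cache_creation_input_tokens", 0) or 0
        uncached = usage.get("input_tokens", 0) or 0
        return NormalizedUsage(
            input_tokens=uncached + cache_read + cache_write,
            output_tokens=usage.get("output_tokens", 0) or 0,
            cache_read_tokens=cache_read,
        )
    # Refuse to guess for providers whose payload shape is not known.
    raise ValueError(f"no usage mapping for provider {provider!r}")


openai_usage = {"prompt_tokens": 1000, "completion_tokens": 200,
                "prompt_tokens_details": {"cached_tokens": 400}}
anthropic_usage = {"input_tokens": 600, "output_tokens": 200,
                   "cache_read_input_tokens": 400}
print(normalize_usage("openai", openai_usage))
print(normalize_usage("anthropic", anthropic_usage))
```

With a normalized shape, the same pricing function can serve every provider, and an unmapped provider fails loudly instead of being recorded with zero tokens.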
Prometheus metrics: real-time cost aggregation and alerting
OTel spans give you per-request detail; Prometheus metrics give you aggregation over time. The two complement each other:
from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry
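# Note: Prometheus labels must be bounded dimensions.
# user_id is deliberately excluded. Per-user attribution belongs in traces
# (span attributes), logs, or the SQL cost ledger, not in Prometheus label cardinality.
# Including user_id would create a label set per user, causing a cardinality explosion and memory bloat.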
cost_registry = CollectorRegistry()
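# Counter: cumulative cost by dimension. It is monotonic within a process but resets on
# restart (rate() and increase() account for that), so bill from the SQL ledger, not from this counter.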
agent_cost_counter = Counter(
"agent_cost_usd_total",
"Total agent cost in USD",
labelnames=["provider", "model", "team", "tenant_id", "category"],
registry=cost_registry,
)
# Histogram: per-task cost distribution — identify outliers
task_cost_histogram = Histogram(
"agent_task_cost_usd",
"Cost per agent task in USD",
labelnames=["task_type", "team", "outcome"],
buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 50.0],
registry=cost_registry,
)
# Gauge: current spend rate — "are we burning money right now?"
spend_rate_gauge = Gauge(
"agent_spend_rate_usd_per_hour",
"Current agent spending rate in USD per hour",
labelnames=["team", "tenant_id"],
registry=cost_registry,
)
# Counter: retry waste — THE metric that creates the "wow moment"
retry_waste_counter = Counter(
"agent_retry_waste_usd_total",
"Total USD wasted on retries",
labelnames=["team", "tenant_id", "reason"],
registry=cost_registry,
)
# Gauge: waste ratio — real-time percentage of spend that's wasted
waste_ratio_gauge = Gauge(
"agent_waste_ratio",
"Ratio of retry waste to total cost (0.0-1.0)",
labelnames=["team", "tenant_id"],
registry=cost_registry,
)
# Histogram: per-tool cost — identify expensive tools
tool_cost_histogram = Histogram(
"agent_tool_cost_usd",
"Cost per tool invocation in USD",
labelnames=["tool_name", "team"],
buckets=[0.0001, 0.001, 0.005, 0.01, 0.05, 0.1],
registry=cost_registry,
)
def record_cost_metrics(tracker: AgentRunCostTracker, provider: str, model: str,
user_id: str, tenant_id: str, team: str = "default") -> None:
"""Export accumulated cost data from a tracker to Prometheus metrics.
user_id is passed for trace/log attribution but NOT included in metric labels.
"""
# Task-level histogram
task_cost_histogram.labels(
task_type=tracker.task_type,
team=team,
outcome="success" if tracker._succeeded else "failure",
).observe(tracker.total_cost_usd)
# Cost counter by category
agent_cost_counter.labels(
provider=provider, model=model,
team=team, tenant_id=tenant_id,
category="token",
).inc(tracker._total_token_cost)
agent_cost_counter.labels(
provider=provider, model=model,
team=team, tenant_id=tenant_id,
category="tool",
).inc(tracker._total_tool_cost)
# Retry waste — the most actionable metric
if tracker.total_retry_waste_usd > 0:
retry_waste_counter.labels(
team=team, tenant_id=tenant_id,
reason="all", # you can also emit per-reason
).inc(tracker.total_retry_waste_usd)
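# Waste ratio gauge: last write wins, so it reflects only the most recent task.
# For an aggregate ratio, divide agent_retry_waste_usd_total by agent_cost_usd_total at query time.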
waste_ratio_gauge.labels(
team=team, tenant_id=tenant_id,
).set(tracker.waste_ratio)
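The `spend_rate_gauge` defined above needs a value from somewhere. One option is a sliding window over recent cost events, as in the sketch below. It is stdlib-only, takes timestamps as explicit arguments so the behavior is deterministic, and the class name `SpendRateWindow` is an assumption made for illustration.

```python
from collections import deque


class SpendRateWindow:
    """Tracks spend over a trailing window and reports it as USD per hour."""

    def __init__(self, window_seconds: float = 3600.0):
        self._window = window_seconds
        self._events = deque()  # (timestamp, cost_usd), oldest first
        self._total = 0.0

    def _evict(self, now: float) -> None:
        # Drop events that are at or beyond the trailing edge of the window.
        cutoff = now - self._window
        while self._events and self._events[0][0] <= cutoff:
            _, cost = self._events.popleft()
            self._total -= cost

    def add(self, cost_usd: float, now: float) -> None:
        self._events.append((now, cost_usd))
        self._total += cost_usd
        self._evict(now)

    def usd_per_hour(self, now: float) -> float:
        self._evict(now)
        return self._total * 3600.0 / self._window


window = SpendRateWindow(window_seconds=3600.0)
window.add(0.25, now=0.0)
window.add(0.5, now=1800.0)
print(window.usd_per_hour(now=1800.0))
```

In the exporter, the result could be passed to `spend_rate_gauge.labels(team=..., tenant_id=...).set(...)` with `time.time()` as `now`. An alternative that needs no extra state is to apply `rate()` to the cost counter in the query layer.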
Dashboard SQL queries
The queries below can be used directly in Grafana or any other SQL-capable dashboard, assuming your cost_records table has the structure defined in Section 5:
-- Panel 1: Cost trend (time series) — last 24h, 1h granularity
SELECT
date_trunc('hour', timestamp) as hour,
SUM(cost_usd) as cost_per_hour
FROM cost_records
WHERE timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY hour
ORDER BY hour;
-- Panel 2: Cost breakdown by model (pie chart)
SELECT
model,
SUM(cost_usd) as total_spend,
COUNT(*) as call_count,
ROUND(AVG(cost_usd), 6) as avg_cost
FROM cost_records
WHERE timestamp >= NOW() - INTERVAL '7 days'
AND category IN ('token_input', 'token_output')
GROUP BY model
ORDER BY total_spend DESC;
-- Panel 3: Top spenders (bar chart) — multi-tenant view
SELECT
tenant_id,
user_id,
SUM(cost_usd) as total_cost,
COUNT(DISTINCT task_id) as task_count,
ROUND(SUM(cost_usd) / NULLIF(COUNT(DISTINCT task_id), 0), 6) as cost_per_task
FROM cost_records
WHERE timestamp >= NOW() - INTERVAL '7 days'
GROUP BY tenant_id, user_id
ORDER BY total_cost DESC
LIMIT 20;
-- Panel 4: Waste ratio gauge (single value)
SELECT
ROUND(100.0 * SUM(CASE WHEN category = 'retry_waste' THEN cost_usd ELSE 0 END)
/ NULLIF(SUM(cost_usd), 0), 2) as waste_pct
FROM cost_records
WHERE timestamp >= NOW() - INTERVAL '24 hours';
-- Panel 5: Cost per successful task (by model)
SELECT
model,
COUNT(DISTINCT task_id) FILTER (WHERE task_outcome = 'success') as successful,
SUM(cost_usd) FILTER (WHERE task_outcome = 'success') as success_cost,
ROUND(SUM(cost_usd) FILTER (WHERE task_outcome = 'success')
/ NULLIF(COUNT(DISTINCT task_id) FILTER (WHERE task_outcome = 'success'), 0), 6)
as cost_per_success
FROM cost_records
WHERE timestamp >= NOW() - INTERVAL '30 days'
GROUP BY model
ORDER BY cost_per_success ASC;
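A quick way to sanity-check a panel query before wiring it into a dashboard is to run it against a few hand-made rows. The sketch below does that for the waste-ratio panel using Python's built-in `sqlite3`. It assumes a cut-down `cost_records` table with only the columns the query needs, and it drops the time filter because `NOW()` and `INTERVAL` are PostgreSQL syntax that SQLite does not share.

```python
import sqlite3

WASTE_PCT_SQL = """
SELECT ROUND(100.0 * SUM(CASE WHEN category = 'retry_waste' THEN cost_usd ELSE 0 END)
             / NULLIF(SUM(cost_usd), 0), 2) AS waste_pct
FROM cost_records
"""


def waste_pct(conn: sqlite3.Connection):
    """Return the waste percentage, or None when there is no spend at all."""
    return conn.execute(WASTE_PCT_SQL).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cost_records (task_id TEXT, category TEXT, cost_usd REAL)")
conn.executemany(
    "INSERT INTO cost_records VALUES (?, ?, ?)",
    [
        ("t1", "token_input", 0.5),
        ("t1", "token_output", 0.25),
        ("t1", "retry_waste", 0.25),
    ],
)
result = waste_pct(conn)
print(result)
```

The `NULLIF` guard matters for quiet periods: with no rows in the window the query yields NULL instead of a division error, which a dashboard can render as "no data".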
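8. Complete example: end-to-end instrumented Agent loop in Python
This section combines the concepts from all previous sections into one complete, runnable Agent loop. It covers token cost tracking (Section 2), tool call cost (Section 3), retry waste recording (Section 4), cost attribution (Section 5), budget control (Section 6), and Prometheus metrics export (Section 7).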
"""
End-to-end instrumented agent loop with full cost observability.
Integrates:
- Provider Pricing Registry (Section 2)
- Tool Cost Registry (Section 3)
- Retry Waste Tracking (Section 4)
- Cost Attribution (Section 5)
- Budget Control (Section 6)
- Prometheus Metrics Export (Section 7)
Run this as a single Python file to see cost observability in action.
"""
from __future__ import annotations
import time
import uuid
from dataclasses import dataclass, field
from decimal import Decimal
from typing import List, Dict, Optional, Tuple
from enum import Enum
# ============================================================================
# SECTION 2: Provider Pricing Registry + Token Cost Calculator
# ============================================================================
_ONE_MILLION = Decimal("1_000_000")
MODEL_PRICING: Dict[Tuple[str, str], Dict[str, Decimal]] = {
("openai", "gpt-5.4"): {"input": Decimal("2.50"), "output": Decimal("15.00"), "cache_read": Decimal("0.25")},
("openai", "gpt-5.4-mini"): {"input": Decimal("0.75"), "output": Decimal("4.50"), "cache_read": Decimal("0.075")},
("anthropic", "claude-sonnet-4-6"): {"input": Decimal("3.00"), "output": Decimal("15.00"), "cache_read": Decimal("0.30")},
("anthropic", "claude-haiku-4-5"): {"input": Decimal("1.00"), "output": Decimal("5.00"), "cache_read": Decimal("0.10")},
("deepseek", "deepseek-v4-flash"): {"input": Decimal("0.14"), "output": Decimal("0.28"), "cache_read": Decimal("0.0028")},
("deepseek", "deepseek-v4-pro"): {"input": Decimal("0.435"), "output": Decimal("0.87"), "cache_read": Decimal("0.003625")},
}
def calc_token_cost(provider: str, model: str, input_tokens: int,
output_tokens: int, cache_read_tokens: int = 0) -> Decimal:
prices = MODEL_PRICING.get((provider, model), {})
if not prices:
return Decimal("0")
cache_read = min(cache_read_tokens, input_tokens)
uncached = max(0, input_tokens - cache_read)
input_cost = (uncached * prices["input"] + cache_read * prices["cache_read"]) / _ONE_MILLION
output_cost = output_tokens * prices["output"] / _ONE_MILLION
return input_cost + output_cost # aggregate unrounded, display rounds later
# ============================================================================
# SECTION 3: Tool Cost Registry
# ============================================================================
TOOL_COST_REGISTRY = {
"web_search": {"cost_per_call": Decimal("0.01")},
"db_query": {"cost_per_call": Decimal("0.0001")},
"code_exec": {"cost_per_second": Decimal("0.000014")},
"file_read": {"cost_per_call": Decimal("0")},
}
TOOL_DEF_TOKENS = {"web_search": 500, "db_query": 350, "code_exec": 600, "file_read": 290}
def calc_tool_cost(tool_name: str, duration_s: float = 0.0) -> Decimal:
pricing = TOOL_COST_REGISTRY.get(tool_name, {"cost_per_call": Decimal("0")})
if "cost_per_call" in pricing:
return pricing["cost_per_call"]
elif "cost_per_second" in pricing:
return pricing["cost_per_second"] * Decimal(str(duration_s))
return Decimal("0")
# ============================================================================
# SECTION 4: Retry Waste Tracker
# ============================================================================
@dataclass
class RetryRecord:
attempt: int
reason: str
tokens_wasted: int
cost_wasted: Decimal
tool_cost_wasted: Decimal = Decimal("0")
recoverable: bool = False
@dataclass
class TaskCostTracker:
task_id: str
user_id: str
tenant_id: str = "default"
task_type: str = "unknown"
_token_cost: Decimal = Decimal("0")
_tool_cost: Decimal = Decimal("0")
_retries: List[RetryRecord] = field(default_factory=list)
_succeeded: bool = False
def add_llm_cost(self, provider: str, model: str, input_tok: int,
output_tok: int, cache_tok: int = 0) -> Decimal:
c = calc_token_cost(provider, model, input_tok, output_tok, cache_tok)
self._token_cost += c
return c
def add_tool_cost(self, tool_name: str, duration_s: float = 0.0) -> Decimal:
c = calc_tool_cost(tool_name, duration_s)
self._tool_cost += c
return c
def record_retry(self, attempt: int, reason: str, tokens: int, token_cost: Decimal,
tool_cost: Decimal = Decimal("0")) -> None:
self._retries.append(RetryRecord(
attempt=attempt, reason=reason, tokens_wasted=tokens,
cost_wasted=token_cost, tool_cost_wasted=tool_cost,
))
def mark_success(self) -> None:
self._succeeded = True
for r in self._retries:
r.recoverable = True
@property
def waste_usd(self) -> Decimal:
"""Classification of already-recorded costs that were wasted on retries."""
return sum((r.cost_wasted + r.tool_cost_wasted for r in self._retries), Decimal("0"))
@property
def total_cost(self) -> Decimal:
"""Actual LLM + tool spend, recorded exactly once."""
return self._token_cost + self._tool_cost
@property
def waste_ratio(self) -> float:
total = self.total_cost
if total == Decimal("0"):
return 0.0
return float(self.waste_usd / total)
def summary(self) -> str:
return (f"[{self.task_id}] {self.task_type} | "
f"Total: ${float(self.total_cost):.6f} | "
f"Waste: ${float(self.waste_usd):.6f} ({self.waste_ratio:.1%}) | "
f"Retries: {len(self._retries)} | "
f"Outcome: {'success' if self._succeeded else 'failed'}")
# ============================================================================
# SECTION 5 + 6: Budget Controller (simplified — pre-authorization + reconcile)
# ============================================================================
# NOTE: This is a single-process illustrative BudgetController with
# pre-authorization (request_budget) and reconciliation (reconcile).
# The full production-hardened version (Section 6) adds threading.Lock
# and multi-scope (per_task/daily/monthly) reservations.
# For multi-process deployments, replace with Redis INCR/INCRBY or
# DB-level SELECT ... FOR UPDATE.
class BudgetAction(Enum):
ALLOW = "allow"
WARN = "warn"
BLOCK = "block"
class BudgetController:
def __init__(self):
self._daily: Dict[str, Decimal] = {}
self._limits: Dict[str, Decimal] = {}
self._reserved: Dict[str, Decimal] = {}
def set_limit(self, user_id: str, daily_usd: float) -> None:
self._limits[user_id] = Decimal(str(daily_usd))
def request_budget(self, user_id: str, estimated_cost: Decimal) -> Tuple[BudgetAction, str]:
"""Pre-authorize: check limits including pending reservations, then reserve."""
limit = self._limits.get(user_id, Decimal("inf"))
spent = self._daily.get(user_id, Decimal("0"))
reserved = self._reserved.get(user_id, Decimal("0"))
projected = spent + reserved + estimated_cost
if projected >= limit:
return (BudgetAction.BLOCK,
f"Budget exceeded: ${float(projected):.4f} >= ${float(limit):.4f}")
# Reserve the estimated cost
self._reserved[user_id] = reserved + estimated_cost
if projected >= limit * Decimal("0.8"):
return (BudgetAction.WARN,
f"WARNING: {float(projected/limit):.0%} of daily budget used")
return (BudgetAction.ALLOW, "OK")
def reconcile(self, user_id: str, estimated_cost: Decimal, actual_cost: Decimal) -> None:
"""Settle reservation with actual cost. Call after task completes."""
self._daily[user_id] = self._daily.get(user_id, Decimal("0")) + actual_cost
reserved = self._reserved.get(user_id, Decimal("0"))
self._reserved[user_id] = max(Decimal("0"), reserved - estimated_cost)
# ============================================================================
# SECTION 8: The complete instrumented agent loop
# ============================================================================
class InstrumentedAgent:
    """
    A simulated agent loop that demonstrates end-to-end cost observability.
    In a real system, you would replace the simulated API calls with actual
    OpenAI/Anthropic/DeepSeek client calls and the simulated tool executions
    with real tool implementations.
    """

    def __init__(self):
        self.budget = BudgetController()

    def run_task(
        self,
        task_input: str,
        user_id: str,
        tenant_id: str = "default",
        provider: str = "openai",
        model: str = "gpt-5.4-mini",
        max_turns: int = 5,
        max_retries: int = 3,
    ) -> TaskCostTracker:
        task_id = str(uuid.uuid4())[:8]
        tracker = TaskCostTracker(
            task_id=task_id, user_id=user_id,
            tenant_id=tenant_id, task_type="general",
        )
        context_messages = [{"role": "user", "content": task_input}]

        # Estimate worst-case cost for the entire task
        est_max_input = 20000  # worst-case total input tokens
        est_max_output = 4000  # worst-case total output tokens
        est_max_cost = calc_token_cost(provider, model, est_max_input, est_max_output)
        # Add worst-case tool call estimates (one search per two turns)
        est_max_cost += calc_tool_cost("web_search", duration_s=2.0) * Decimal(max_turns // 2)

        # Pre-authorize budget
        action, reason = self.budget.request_budget(user_id, est_max_cost)
        if action == BudgetAction.BLOCK:
            print(f"[BUDGET STOP] {reason}")
            return tracker
        if action == BudgetAction.WARN:
            print(f"[BUDGET WARN] {reason}")

        actual_cost = Decimal("0")
        turn = 0
        while turn < max_turns:
            turn += 1
            # ---- LLM call with retry loop ----
            for attempt in range(1, max_retries + 1):
                try:
                    # Simulated LLM call with "realistic" token counts
                    input_tokens = len(str(context_messages)) // 4 + 2000  # rough estimate
                    output_tokens = 300 + (turn * 50)
                    cache_tokens = min(1500, input_tokens) if turn > 1 else 0
                    cost = tracker.add_llm_cost(provider, model, input_tokens, output_tokens, cache_tokens)
                    actual_cost += cost
                    # Simulated: sometimes the LLM call fails
                    if attempt < max_retries and turn == 2:
                        raise TimeoutError("Simulated LLM timeout")
                    break  # success: exit retry loop
                except TimeoutError:
                    wasted_tokens = input_tokens // 2
                    wasted_cost = calc_token_cost(provider, model, wasted_tokens, 0)
                    tracker.record_retry(attempt, "timeout", wasted_tokens, wasted_cost)
                    if attempt == max_retries:
                        self.budget.reconcile(user_id, est_max_cost, actual_cost)
                        print(f"[FAIL] Task {task_id} exhausted retries")
                        return tracker

            # ---- Tool call simulation ----
            if turn % 2 == 0:
                tool_name = "web_search"
                tool_cost = tracker.add_tool_cost(tool_name, duration_s=1.5)
                actual_cost += tool_cost
                # Simulated: search result appended to context
                context_messages.append({"role": "tool", "content": "search result..."})

            # Simulated: task completes on turn 3
            if turn >= 3:
                tracker.mark_success()
                break

        # Reconcile actual cost vs. estimated
        self.budget.reconcile(user_id, est_max_cost, actual_cost)
        print(tracker.summary())
        return tracker
# ============================================================================
# Demo: run 10 tasks and see the cost observability in action
# ============================================================================
if __name__ == "__main__":
    agent = InstrumentedAgent()
    agent.budget.set_limit("user-42", daily_usd=0.50)
    agent.budget.set_limit("user-7", daily_usd=5.00)
    tasks = [
        ("Review PR #342 for security issues", "user-42", "openai", "gpt-5.4-mini"),
        ("Generate unit tests for auth module", "user-7", "anthropic", "claude-haiku-4-5"),
        ("Analyze database query performance", "user-42", "deepseek", "deepseek-v4-flash"),
        ("Refactor payment service error handling", "user-7", "openai", "gpt-5.4"),
        ("Audit access control in middleware", "user-42", "deepseek", "deepseek-v4-flash"),
        ("Optimize Docker image build pipeline", "user-7", "anthropic", "claude-sonnet-4-6"),
        ("Fix race condition in WebSocket handler", "user-42", "openai", "gpt-5.4-mini"),
        ("Write migration for user preferences schema", "user-42", "deepseek", "deepseek-v4-flash"),
        ("Benchmark new caching layer with Redis", "user-7", "openai", "gpt-5.4-mini"),
        ("Document API rate limiting strategy", "user-42", "deepseek", "deepseek-v4-flash"),
    ]
    total_cost = Decimal("0")
    total_waste = Decimal("0")
    for task_input, user_id, provider, model in tasks:
        tracker = agent.run_task(task_input, user_id=user_id, provider=provider, model=model)
        total_cost += tracker.total_cost
        total_waste += tracker.waste_usd
    print(f"\n{'='*60}")
    print(f"TOTAL COST (10 tasks): ${float(total_cost):.6f}")
    print(f"TOTAL WASTE: ${float(total_waste):.6f}")
    # Guard against division by zero (e.g. every task was blocked by the budget)
    if total_cost > 0:
        print(f"OVERALL WASTE RATIO: {float(total_waste / total_cost):.1%}")
    else:
        print("OVERALL WASTE RATIO: N/A")
    print(f"{'='*60}")
    # Illustrative output (placeholder figures; a real run will differ):
    # TOTAL COST (10 tasks): $0.123456
    # TOTAL WASTE: $0.024691
    # OVERALL WASTE RATIO: 20.0%
    # ============================================================
    #
    # A 20% waste ratio is real money. At $0.024691 of waste per 10 tasks,
    # scaling to 10,000 tasks/day means about $24.69/day, or roughly
    # $9,012/year, thrown away on retries.
    # This is what cost observability reveals that token counting alone doesn't.
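This complete example demonstrates the core design principle of cost observability: every LLM call and tool execution is wrapped in cost tracking, every cent has an owner, and every bit of waste is quantified. Use it as a starting point in your own Agent system and fill in real API clients and tool implementations step by step.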
FAQ
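1. My Agent runs a few hundred times a day. What is the simplest way to start tracking cost?
Extract the usage field (input_tokens/output_tokens; exact field names vary by provider and API) from each API response, multiply by the price table for that model, and write the result to a structured log. This step needs no new tooling: a roughly 10-line wrapper around your existing LLM call code is enough to see the approximate dollar cost of every call. Run it for a week to learn the order of magnitude of your total spend, then refine toward per-task attribution and tool-call cost tracking. The simplest starting point is the calculate_token_cost() function from Section 2; you only need to feed it four arguments: (provider, model, input_tokens, output_tokens). Hold off on multi-tenant allocation and budget control for now. Those belong to L3/L4, and L2 is enough to deliver your first "wow moment".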
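The wrapper described above can be sketched roughly as follows. This is a minimal illustration, not the article's Section 2 implementation: the helper name log_call_cost, the PRICES_PER_MTOK table, and the prices in it are all hypothetical placeholders, and the usage dict is assumed to expose input_tokens and output_tokens.

```python
import json
import logging
from decimal import Decimal

logger = logging.getLogger("llm_cost")

# Hypothetical price table in USD per 1M tokens. Placeholder values only:
# replace them with your provider's current list prices.
PRICES_PER_MTOK = {
    ("example-provider", "example-model"): {
        "input": Decimal("2.50"),
        "output": Decimal("15.00"),
    },
}


def log_call_cost(provider: str, model: str, usage: dict) -> Decimal:
    """Compute the approximate USD cost of one LLM call and log it as JSON."""
    price = PRICES_PER_MTOK[(provider, model)]
    cost = (
        Decimal(usage["input_tokens"]) * price["input"]
        + Decimal(usage["output_tokens"]) * price["output"]
    ) / Decimal("1000000")
    # One structured log line per call; aggregate these later per task or per user.
    logger.info(json.dumps({
        "provider": provider,
        "model": model,
        "input_tokens": usage["input_tokens"],
        "output_tokens": usage["output_tokens"],
        "cost_usd": str(cost),
    }))
    return cost


# Usage: pass the token counts reported in the API response.
example_cost = log_call_cost(
    "example-provider", "example-model",
    {"input_tokens": 8200, "output_tokens": 450},
)
```

Using Decimal rather than float keeps per-call amounts exact when thousands of small costs are summed.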
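2. Should token cost and tool-call cost be calculated separately?
Yes, and this is not optional. Token cost follows the LLM provider (OpenAI, Anthropic, DeepSeek; each provider bills independently), while tool-call cost may come from entirely different vendors (search API usage, database queries, container runtime). Tracking them separately is what lets you localize a billing anomaly quickly: is it a surge in model calls or an explosion in third-party API fees? If your SerpAPI bill suddenly jumps from $200 to $2,000 a month but is buried inside a single "total cost" number, you may spend hours tracking it down. Separate tracking also leads to better optimization decisions: if tool calls account for 40% of cost, the lever is fewer search rounds; if tokens account for 80%, the lever is prompt caching or a cheaper model.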
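3. Is the money wasted on retries really worth dedicated tracking and metrics?
Absolutely. This is the core argument of Section 4: in production, retry waste accounts for 15-30% of total Agent cost (an illustrative range; the actual share depends on Agent design and retry policy). If your team is not tracking this number, you may well be burning a lot of money without knowing it. A concrete case (a scenario whose figures are for reference only and do not come from published research): a team set the Agent timeout to 5 seconds on the theory that "failing fast and retrying fast is good". Once they implemented retry waste tracking, they found that 22% of LLM calls triggered a timeout retry every day (their LLM provider's response time often exceeded 5 seconds at peak hours), wasting $2,800 a month. After raising the timeout from 5 to 15 seconds, the retry rate dropped to 4% and monthly waste fell to $510, a roughly 5.5x reduction that saves $2,290 a month. Without a retry waste metric they would never have known the problem existed. The implementation cost is low too: the AgentRunCostTracker in Section 4 is under 80 lines, and integrating it into an existing Agent loop only takes one tracker.record_retry() line on the exception path of each LLM call and tool call.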
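4. DeepSeek or Claude/GPT? With such a large cost gap, is the quality really good enough?
There is no absolute answer, but there is a clear decision framework. For tasks such as structured extraction, simple classification, code completion, and text summarization, DeepSeek-V4-Flash costs 1/36 of GPT-5.5 (measured by input tokens), while the quality gap on such tasks is small (typically under 5% difference in accuracy). For complex reasoning, multi-step tool calling, and scenarios that require strict format adherence, Claude Sonnet or GPT-5.4 is more reliable: their tool-call success rate is about 8-12 percentage points higher than DeepSeek's (an illustrative range; actual numbers depend on task type). The best practice is the cost-aware routing from Section 6: send simple tasks to DeepSeek, complex tasks to Claude/GPT, and downgrade everything to DeepSeek when budget is tight. Monitor continuously with the cost-per-successful-task metric from Section 5: if DeepSeek's cost-per-successful-task is 1/30 of GPT-5.5's while its success rate is only 3 percentage points lower, route more traffic to DeepSeek. For a general cross-model Agent architecture, see Building Model-Agnostic AI Agents.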
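5. Hard stop or soft warning for budget control? When should I use which?
It depends on the scenario. The two are not mutually exclusive, and most production systems should use both. Hard stops suit internal Agent pipelines and batch jobs: there is no "user experience" to protect, and exceeding the budget simply means the task fails. A hard stop prevents the worst case, a runaway Agent loop burning hundreds of dollars overnight with nobody noticing. Soft warnings suit paid, end-user-facing products: when 80% of the budget is consumed, tell the user "your AI usage for this period is approaching the limit" and let them decide whether to continue. A hard stop here makes for a terrible user experience (an in-flight task is cut off abruptly), and the user's willingness to pay may far exceed your default limit. You can also combine them as a task-level hard stop plus a user-level soft warning: a single task may not exceed $5 (guarding against runaway loops), but a user may exceed the $500 monthly budget (because some high-value tasks are simply expensive). The key is to run the budget check before every LLM call (BudgetController.check() in Section 6) rather than reconciling after the task completes; the latter can only tell you that you went over, not stop you from going over.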
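The hybrid policy (task-level hard stop plus user-level soft warning) might look like the sketch below. It is a standalone illustration and not the article's BudgetController: the Decision enum, the check_before_call function, and the constants are hypothetical names, with the $5, $500, and 80% figures taken from the example in the text.

```python
from decimal import Decimal
from enum import Enum


class Decision(Enum):
    ALLOW = "allow"
    WARN = "warn"
    BLOCK = "block"


# Hypothetical limits, mirroring the example figures in the text.
TASK_HARD_LIMIT_USD = Decimal("5")
USER_MONTHLY_BUDGET_USD = Decimal("500")
WARN_THRESHOLD = Decimal("0.8")


def check_before_call(
    task_spent: Decimal,
    user_month_spent: Decimal,
    next_call_estimate: Decimal,
) -> Decision:
    """Run before every LLM call: hard stop per task, soft warning per user."""
    # Task-level hard stop: one task may never exceed its cap.
    if task_spent + next_call_estimate > TASK_HARD_LIMIT_USD:
        return Decision.BLOCK
    # User-level soft warning: notify from 80% of the monthly budget onward,
    # but still let the call proceed, even past 100%.
    projected = user_month_spent + next_call_estimate
    if projected >= USER_MONTHLY_BUDGET_USD * WARN_THRESHOLD:
        return Decision.WARN
    return Decision.ALLOW
```

Because the check runs on the projected spend (current spend plus the estimate for the next call), it can refuse a call before the money is spent, which after-the-fact reconciliation cannot do.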
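6. How do I measure whether the money was well spent? How is the cost-to-quality ratio calculated?
The core idea is to tie cost to task outcome. For each task, record cost_usd, outcome (success/failure), and quality_score (0-1, if you have an evaluation framework). Key metrics: cost-per-successful-task (the dollar cost of successful tasks, excluding the waste from failed ones), waste-ratio (cost of failed tasks / total cost), and cost-per-quality-point (total cost / total quality score). Example: Model A has a cost-per-successful-task of $0.05 and a 94% success rate; Model B has $0.12 and 97%. Model B costs 2.4x as much but raises the success rate by only 3 percentage points, which is not worth it in most scenarios. The exception is when failures are expensive (compliance violations, customer churn): as a rough break-even, paying $0.07 more per task to avoid about 3 failures per 100 tasks pays off only if each avoided failure costs more than roughly $0.07 / 0.03 ≈ $2.33. For how to obtain quality_score from an evaluation framework, see Agent Evaluation Framework Design.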
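The three metrics can be computed from per-task records along these lines. This is a sketch under assumptions: TaskOutcome and summarize are hypothetical names rather than part of the article's CostRecord model, and the definitions follow the wording in the answer above.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass
class TaskOutcome:
    cost_usd: Decimal
    success: bool
    quality_score: Decimal = Decimal("0")  # 0-1, from an evaluation framework


def summarize(tasks: list[TaskOutcome]) -> dict:
    """Aggregate per-task records into the three cost-to-quality metrics."""
    total = sum((t.cost_usd for t in tasks), Decimal("0"))
    failed = sum((t.cost_usd for t in tasks if not t.success), Decimal("0"))
    successes = sum(1 for t in tasks if t.success)
    quality = sum((t.quality_score for t in tasks), Decimal("0"))
    return {
        # Cost of successful tasks only, per successful task.
        "cost_per_successful_task": (total - failed) / successes if successes else None,
        # Share of total spend that went to failed tasks.
        "waste_ratio": failed / total if total else None,
        # Total spend per unit of quality delivered.
        "cost_per_quality_point": total / quality if quality else None,
    }


metrics = summarize([
    TaskOutcome(Decimal("0.04"), True, Decimal("0.8")),
    TaskOutcome(Decimal("0.06"), True, Decimal("0.7")),
    TaskOutcome(Decimal("0.10"), False),
])
```

Returning None instead of dividing by zero keeps dashboards honest when a window contains no successful tasks or no scored tasks.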
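7. Should I implement cost tracking myself or use off-the-shelf tools such as LangSmith / LangFuse?
The two are not mutually exclusive. If your team already uses LangChain/LangSmith or a similar framework, their automatic token counting gets you to L2 (dollar pricing) quickly. These platforms have two limitations, though: (1) tool-call cost usually has to be reported manually: your search API bill will not show up in LangSmith automatically, and you need to inject it by hand through the usage_metadata.total_cost field; (2) cross-platform cost aggregation is hard: if you use both OpenAI and DeepSeek, or part of the Agent's calls go through LangChain while the rest use native SDKs, cost ends up scattered across several systems. The Provider Pricing Registry and CostRecord data model advocated in this article are vendor-neutral: you can run them independently of these platforms, or import the platforms' data into a unified cost_records table. The build-vs-buy criterion: if you only need L1-L2 tracking, off-the-shelf tools are the least effort; if you need L3 (multi-dimensional attribution) and L4 (closed-loop cost control), building your own or adding an abstraction layer on top of an existing tool is the more flexible choice.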
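8. Prompt caching, batch API, or context compression: which should I do first to cut cost?
Ranked by ROI:
(1) Prompt caching: immediate payoff with little or no code change (automatic on some providers, an explicit opt-in on others). Most Agents have a fixed system prompt (role definition, tool list, output format instructions); with prompt caching enabled, the input token cost of that portion drops by 90% (OpenAI/Anthropic) to 98% (DeepSeek). If your system prompt is 5,000 tokens and you make 10,000 calls a day, that saves about $112 a day on GPT-5.4. The only price is that the system prompt must sit at the very front of the message list, because caching generally matches from the start of the prompt.
(2) Context compression: moderate investment, high return. The later in the Agent loop, the longer the context, so cost grows at an accelerating rate. Summarizing or truncating tool results before they enter the context can cut input token growth by 30-50%. See Agent Context Window Management.
(3) Batch API: the most overlooked option. Any task that does not need a real-time response (offline evaluation, bulk analysis, nightly report generation) saves 50% outright by going through a batch API. If your Agent has an offline pipeline, enabling batch mode costs almost no extra engineering.
Recommended order: turn on prompt caching first (doable today), then plan context compression (it requires designing a summarization strategy), and finally migrate offline tasks to the batch API.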
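The prompt caching figure can be checked with a few lines of arithmetic. The sketch below assumes an input price of $2.50 per 1M tokens for GPT-5.4; that price is an assumption (it is the value that reproduces the roughly $112/day figure), and the function name daily_caching_savings is hypothetical. It also ignores cache misses and any cache-write surcharge, so it is an upper bound rather than a forecast.

```python
from decimal import Decimal


def daily_caching_savings(
    prompt_tokens: int,
    calls_per_day: int,
    input_price_per_mtok: Decimal,
    cache_discount: Decimal,
) -> Decimal:
    """Upper-bound daily savings from caching a fixed system prompt."""
    cached_tokens_per_day = Decimal(prompt_tokens) * Decimal(calls_per_day)
    # What the fixed prefix would cost per day at the full input price.
    full_price_cost = cached_tokens_per_day * input_price_per_mtok / Decimal("1000000")
    # The discount applies only to the cached prefix, not to the rest of the prompt.
    return full_price_cost * cache_discount


# 5,000-token system prompt, 10,000 calls/day, assumed $2.50 per 1M input tokens,
# 90% discount on cached tokens.
savings = daily_caching_savings(5000, 10000, Decimal("2.50"), Decimal("0.90"))
print(f"${savings:.2f} per day")
```

Swapping in a 98% discount and a different input price gives the corresponding figure for other providers.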
Further reading
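- Agent Observability: cost is the fourth dimension beyond the classic observability trio (metrics/traces/logs); understanding latency and error-rate observability is a prerequisite for cost observability
- Agent Evaluation Framework Design: whether a cost is reasonable has to be judged against task quality; the cost-per-quality-point metric connects cost observability to the evaluation system
- Agent Audit Log Design: immutable audit logs are the underlying data source for cost attribution and provide a cross-check; every cost record can be matched to an operation in the audit log
- Agent Release Gate Design: add a cost budget check to the release process; if a new version's cost-per-task is 30% higher than the old version's, block the release automatically
- Agent Rollback Design: an abnormal surge in retry cost can trigger an automatic rollback decision; integrate cost spike detection into the rollback trigger conditions
- Agent Context Window Management: context compression and prompt caching are the most direct ways to cut cost; they work upstream and downstream of this article's price calculator
- Agent Tool Design Best Practices: the token overhead of tool definitions is a hidden cost; optimizing tool schemas directly reduces the fixed cost of every LLM call
- Building Model-Agnostic AI Agents: multi-model routing is the foundation of cost-aware model selection; you need a general cross-model Agent architecture before you can implement cost-aware routing
- MCP Protocol in Production: the cross-process cost of MCP tool calls needs unified tracking and attribution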