Agent 审计日志设计:如何追踪一次工具调用的完整链路
第 5 篇,共 6 篇⚡ 30 秒要点
- Agent 决策链与普通应用有本质区别——LLM 调用不确定、工具调用有副作用、审批链路是异步的,普通应用日志的三列式(时间+级别+消息)完全不够用
- 审计日志需要 5 种事件类型:decision / tool_call / tool_result / approval / error,每条事件携带 8 个通用字段(trace_id、span_id、agent_id 等)+ 5 个事件特定字段(tool_name、approver 等)
- 数据模型用 Pydantic 定义,序列化为 JSON 写入存储,trace_id 复用 OpenTelemetry 的 32 位 hex trace ID,可选 audit_event_id 用 UUID v7 实现时间可排序——为后续的回放、搜索和事故分析打下基础
📖 可引用的定义
Agent 审计日志是一套以 trace_id 为主键的结构化事件记录系统,用于完整追踪 Agent 在一次用户请求中的决策链(LLM 推理 → 工具选择 → 参数构造 → 人工审批 → 执行 → 结果返回)。与普通应用日志的区别在于:它记录的不是「代码做了什么」,而是「LLM 决定了什么、谁批准了、实际执行了什么」——即决策(why)与执行(what)的完整映射。
一、为什么 Agent 需要专门的审计日志
一个凌晨 2:37 的事故
凌晨 2:37,你的生产环境 Agent 服务的监控面板跳出一条告警——一条 DELETE 操作被记录了。没人审批过它,甚至没人醒着。第二天早上,团队花了 2 个小时翻 grep 日志、查数据库表变更、回放对话历史,才勉强拼凑出发生了什么:一位用户白天问了一句「帮我清理一下上周的临时数据」,LLM 在一次多步推理中把「清理」理解成了数据库 DELETE 操作,Agent 照做了——没有审批,没有二次确认,没有留下任何决策痕迹。
如果你的 Agent 有结构化审计日志——带 trace_id、带决策上下文、带完整的工具调用参数和审批链路——这个排查过程只需要 3 分钟:
# 3 分钟定位:通过审计日志搜索 + 关联
# Step 1: 搜索 DELETE 操作(10 秒)
$ audit-log search --event-type tool_call --tool-name delete_records --since "2026-05-21T00:00:00Z"
trace_id: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
agent_id: prod-agent-03
session_id: sess_8f3a2b1c
timestamp: 2026-05-22T02:37:14.231Z
parameters: {"table": "user_data", "filter": "created_at < '2026-05-15'"}
status: success
duration_ms: 847
# Step 2: 沿 trace_id 展开完整决策链(20 秒)
$ audit-log trace 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
[decision] LLM 选择工具: delete_records, 理由: "用户要求清理临时数据"
[approval] ❌ 无审批记录 — 未配置审批流
[tool_call] delete_records → success (847ms, 删除了 12,403 行)
# Step 3: 定位根因 → 审批流缺失(2.5 分钟)
这就是审计日志的核心价值:把不可观测的 LLM 决策链,变成可搜索、可关联、可回放的结构化数据。
Agent 决策链 vs 传统应用:本质差异
为什么普通的应用日志(INFO/WARN/ERROR + message)在 Agent 场景中不够用?因为 Agent 的决策链与传统应用有本质区别:
| 维度 | 传统应用 | AI Agent |
|---|---|---|
| 执行路径 | 确定性代码分支(if/else) | LLM 推理 + 动态工具选择(不确定) |
| 可复现性 | 相同输入 → 相同输出 | 相同输入 → 可能不同输出(temperature / 模型版本) |
| 副作用来源 | 明确的函数调用 | LLM 决定调用哪个工具、传什么参数 |
| 责任归属 | 代码作者 + Code Review | LLM 决策 + 人工审批 + 安全策略 |
| 日志需求 | stack trace + error log | trace_id + decision context + approval chain + tool parameters |
| 事故分析方式 | grep 错误日志 → 定位代码行 | trace_id 展开决策链 → 定位 LLM 决策 + 审批缺失 |
关键区别在于第三行和第四行:传统应用的副作用是代码写死的,Agent 的副作用是 LLM「决定」的。当一个 DELETE 被触发,传统应用中你能定位到 user_service.py:142 的代码行。但在 Agent 中,你需要回答的是:LLM 当时为什么选择了 delete_records 工具?它看到了什么上下文?有没有人审批过?
这三个问题,普通应用日志一个都回答不了。
为什么普通应用日志不够用
即使你已经建立了一套完善的应用日志体系(结构化日志 + ELK/Grafana),它们在 Agent 场景中至少存在三个结构性缺陷:
1. 缺少 trace_id 作为第一公民。在 Agent 场景中,一次用户请求会触发多次 LLM 调用,每次 LLM 调用可能触发多次工具调用。普通应用日志的 trace_id 通常只到请求级别——你知道「这次请求调了哪些服务」,但不知道「LLM 第 2 步推理后选择了工具 A,第 4 步推理后选择了工具 B」。Agent 需要的是请求 → LLM 推理 → 工具调用的三级嵌套追踪,trace_id 必须在每一步推理和每一次工具调用之间传递。
2. 缺少决策上下文。普通日志记录的是「发生了什么」——函数被调用了、数据库查询耗时 50ms。Agent 还需要记录「为什么发生」——LLM 的 tool_choice 理由、当时的 system prompt 摘要、触发审批的原因。这不是「nice to have」,而是事故分析的必需信息。凌晨 2:37 那个事故中,如果日志里有 LLM 的 decision rationale(「用户要求清理临时数据」),根因定位从 2 小时缩到 2 分钟。
3. 缺少审批链路。Agent 的高风险工具调用(DELETE、资金操作、配置变更)需要人工审批。普通日志没有「审批」这个语义——你最多看到一个 API 请求的 200 返回,但不知道这个请求是被审批通过的还是跳过了审批。审计日志需要将审批作为一等事件类型记录下来:谁审批的、什么时候、审批时看到了什么上下文。
审计日志的三个核心用途
理解了 Agent 的特殊性之后,审计日志的价值体现在三个核心场景中:
🔍 合规审计。当你的 Agent 处理用户数据、执行金融操作或涉及合规框架(SOC 2、HIPAA)时,审计员会问一个简单但致命的问题:「你能证明 Agent 的每一次 DELETE 操作都是被授权的吗?」没有审计日志,你的答案是「我们信任 LLM 的输出」——在大多数受监管或企业环境中,这个答案不太可能让审计员满意。结构化的审计追踪可以使授权、审批和事故证据的展示变得容易得多。有了审计日志,你可以拿出完整的 trace:哪次对话、哪个 decision、谁审批的、什么时间执行、操作了什么数据——一条链路,完整可验证。
这就是为什么像 Agent 运行时隔离中讨论的 VM 级隔离在审计中更有说服力——硬件 VM 边界是熟悉且可独立验证的,同样,结构化的审计日志让 Agent 的决策过程从「黑箱」变成了「可审计的白箱」。
🐛 调试排错。Agent 的 Bug 不是 NullPointerException,而是「LLM 不知为什么选了一个不该选的操作」。调试这种 Bug 的传统方法是:复现对话 → 观察 LLM 输出 → 猜测 → 修改 prompt → 再试。审计日志将这个循环从「小时级」缩到「分钟级」:用 trace_id 直接拉出完整的决策链,几步之内定位到 LLM 在哪一步做出了错误决策。
这与 Agent 错误恢复中讨论的错误分类体系直接衔接——审计日志是错误恢复的信息基础。你不知道 Agent 在哪一步犯了错,就无从设计恢复策略。
🔄 回放测试。审计日志记录了完整的工具调用参数和返回值,这使它成为回归测试的天然数据源。你可以:提取历史 trace 中的工具调用序列 → 作为测试用例输入 → 在新的 Agent 版本上重放 → 对比决策是否一致。这就是「golden dataset」的自动化生成——不用手工编写测试用例,直接从生产日志中提取。
这个能力与 Agent 评测框架设计中讨论的评测数据管道直接对应——审计日志为评测框架提供了真实的、生产级的测试数据,而非人工构造的 toy example。
审计日志不是什么
在继续深入之前,明确一下边界:
- 审计日志不是通用的应用日志系统(ELK/Grafana)的替代品——它是应用日志之上的一层结构化语义层
- 审计日志不是评测框架本身——它为评测提供数据,但不做评分和对比(那是评测框架的职责)
- 审计日志不是实时告警系统——它是事后分析、回放和合规的基础设施,实时告警可以建立在审计日志之上但需要额外的处理管道
二、审计日志的核心数据模型
数据模型是审计日志系统的根基。设计得过于简化,事故发生时查不到关键信息;设计得过于复杂,存储成本失控、写入性能受损。本节给出一个经过验证的五事件类型 + 8 个通用字段 + 5 个事件特定字段的最小可用模型。
五种事件类型
Agent 的一次工具调用链路可以分解为五个独立的事件类型,每个类型回答一个不同的问题:
| 事件类型 | 回答的问题 | 触发时机 | 典型字段 |
|---|---|---|---|
| decision | LLM 为什么选择了这个工具? | LLM 返回 tool_choice 后,实际调用前 | tool_name, rationale, prompt_summary, model, temperature |
| tool_call | 工具调用发生了什么? | 工具执行完成后 | tool_name, parameters, result_summary, duration_ms, status |
| tool_result | 工具返回了什么? | 工具返回结果后(可与 tool_call 合并或独立) | tool_name, result, result_size, is_error |
| approval | 谁批准了这次调用? | 人工审批完成后 | approver, approval_context, decision, timestamp |
| error | 什么出错了? | 任何阶段发生错误时 | error_type, error_message, stack_trace, recoverable |
重要设计决策:tool_call 和 tool_result 是否合并?在大多数实现中,tool_result 作为 tool_call 的一个字段(result)出现,而非独立事件。这样做的理由是:一次工具调用天然包含「请求 + 响应」,分开存储增加了关联成本。但在工具调用有显著异步特性的场景(如调用一个需要数秒才返回的外部 API),将 tool_result 独立为事件有助于记录中间状态。本文推荐的默认方案是合并为一个 tool_call 事件,在需要异步追踪时再拆分为 tool_call(启动)+ tool_result(完成)。
每条日志的必填字段
审计事件包含 8 个通用字段(所有事件类型必填)和 5 个事件特定字段(按 event_type 条件必填):
| ▸ 通用字段(8 个——所有事件必填) | ||||
|---|---|---|---|---|
| 字段 | 类型 | 必填 | 说明 | 示例 |
| timestamp | ISO 8601 | ✅ | 事件发生的精确时间(UTC) | 2026-05-22T02:37:14.231Z |
| trace_id | OTel 32-char hex | ✅ | 整个用户请求的唯一标识(OTel trace ID),贯穿所有事件 | 0af7651916cd43dd8448eb211c80319c |
| span_id | 64-bit hex | ✅ | 单次 LLM 推理 span 的唯一标识(用于关联 OTel Span) | a1b2c3d4e5f67890 |
| parent_span_id | 64-bit hex | null | ✅ | 父 span 的 ID(根 span 为 null);Span Event 无独立的 parent_span_id | 0000000000000001 |
| agent_id | string | ✅ | Agent 实例的唯一标识 | prod-agent-03 |
| session_id | string | ✅ | 用户会话的唯一标识(一次对话可能有多个 trace) | sess_8f3a2b1c |
| event_type | enum | ✅ | 事件类型:decision | tool_call | tool_result | approval | error | tool_call |
| status | enum | ✅ | 事件结果:success | failure | pending_approval | rejected | timeout | success |
| ▸ 事件特定字段(5 个——按 event_type 条件必填) | ||||
| 字段 | 类型 | 必填 | 说明 | 示例 |
| tool_name | string | 见说明 | 工具名称。decision / tool_call / tool_result 必填 | delete_records |
| parameters | JSON | 见说明 | 工具调用参数。tool_call 必填,decision 可选(LLM 的 proposed params) | {"table": "user_data", "filter": "..."} |
| result | JSON | null | 见说明 | 工具返回结果(截断/脱敏后)。tool_call / tool_result 必填 | {"deleted_rows": 12403} |
| approver | string | null | 见说明 | 审批人标识。approval 事件必填,其他事件为 null | user_zhang_wei |
| duration_ms | integer | 见说明 | 事件持续时间(毫秒)。tool_call 必填,decision / approval 可选 | 847 |
字段设计遵循两个原则:(1)trace_id + span_id + parent_span_id 三级嵌套足以重建完整的调用树——从用户请求 → 每次 LLM 推理 → 每次工具调用;(2)工具调用的核心对象(tool_name、parameters、result)作为一级字段存储,而非埋在 JSON blob 中——这使得搜索「所有 DELETE 操作」只需一个字段级别的查询,而不需要全文搜索。
这与 Agent 工具设计最佳实践中的工具注册机制直接对应——审计日志中的 tool_name 应该与工具注册表中的名称一致,确保日志可追溯到具体的工具定义。
JSON Schema 与 Pydantic 模型
下面是完整的 Pydantic 模型定义。使用 Pydantic 的好处是:内置类型校验、JSON 序列化/反序列化、自动生成 JSON Schema——这些特性使审计事件的生成和消费都有强类型保障。
from __future__ import annotations
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
from uuid import uuid4
from pydantic import BaseModel, Field
class EventType(str, Enum):
"""审计事件类型"""
DECISION = "decision"
TOOL_CALL = "tool_call"
TOOL_RESULT = "tool_result"
APPROVAL = "approval"
ERROR = "error"
class EventStatus(str, Enum):
"""事件执行状态"""
SUCCESS = "success"
FAILURE = "failure"
PENDING_APPROVAL = "pending_approval"
REJECTED = "rejected"
TIMEOUT = "timeout"
class AuditEvent(BaseModel):
"""Agent 审计日志的通用事件模型。
所有事件共享 8 个通用字段,另有 5 个按 event_type 条件必填的字段。
详见下方 validators。
"""
# ── 时间与追踪 ──
timestamp: str = Field(
default_factory=lambda: datetime.now(timezone.utc).isoformat(),
description="事件发生时间(ISO 8601, UTC)",
examples=["2026-05-22T02:37:14.231Z"],
)
trace_id: str = Field(
description="OTel trace ID(32-char hex),由 OTel SDK 生成,贯穿所有事件",
examples=["0af7651916cd43dd8448eb211c80319c"],
)
audit_event_id: Optional[str] = Field(
default=None,
description="审计事件唯一标识(UUID v7,时间可排序,用于业务查询)",
examples=["0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67"],
)
span_id: str = Field(
description="当前 span 的唯一标识(64-bit hex)",
examples=["a1b2c3d4e5f67890"],
)
parent_span_id: Optional[str] = Field(
default=None,
description="父 span ID。根 span 为 null",
examples=["0000000000000001"],
)
# ── 身份与会话 ──
agent_id: str = Field(
description="Agent 实例标识",
examples=["prod-agent-03"],
)
session_id: str = Field(
description="用户会话标识",
examples=["sess_8f3a2b1c"],
)
# ── 事件核心 ──
event_type: EventType = Field(description="事件类型")
status: EventStatus = Field(description="事件执行状态")
# ── 工具调用相关(按 event_type 条件必填)──
tool_name: Optional[str] = Field(
default=None,
description="工具名称。decision/tool_call/tool_result 必填",
examples=["delete_records"],
)
parameters: Optional[dict[str, Any]] = Field(
default=None,
description="工具调用参数(JSON)。tool_call 必填,decision 可选",
examples=[{"table": "user_data", "filter": "created_at < '2026-05-15'"}],
)
result: Optional[dict[str, Any]] = Field(
default=None,
description="工具返回结果(截断/脱敏后)。tool_call/tool_result 必填",
examples=[{"deleted_rows": 12403}],
)
# ── 审批与错误 ──
approver: Optional[str] = Field(
default=None,
description="审批人标识。approval 事件必填",
examples=["user_zhang_wei"],
)
error_message: Optional[str] = Field(
default=None,
description="错误信息。error 事件必填",
)
error_type: Optional[str] = Field(
default=None,
description="错误类型(如 TimeoutError, ValidationError)",
)
# ── 性能与上下文 ──
duration_ms: int = Field(
default=0,
ge=0,
description="事件持续时间(毫秒)",
examples=[847],
)
metadata: dict[str, Any] = Field(
default_factory=dict,
description="扩展元数据(model, temperature, prompt_summary 等)",
)
# ── 序列化与工厂方法 ──
def to_json(self, indent: int | None = None) -> str:
"""序列化为 JSON 字符串。"""
return self.model_dump_json(indent=indent, exclude_none=True)
@classmethod
def from_json(cls, data: str) -> "AuditEvent":
"""从 JSON 字符串反序列化。"""
return cls.model_validate_json(data)
@classmethod
def new_audit_event_id(cls) -> str:
"""生成 UUID v7 作为 audit_event_id(时间可排序,用于业务查询)。
实际项目应使用 uuid6 或 uuid7 库。
这里给出 UUID v4 作为可运行的简化版本。
trace_id 由 OTel SDK 生成(32-char hex),由 Agent 框架注入。
"""
return str(uuid4())
# ── 使用示例 ──
# 创建一个 tool_call 事件(trace_id 由 OTel SDK 注入)
event = AuditEvent(
trace_id="0af7651916cd43dd8448eb211c80319c", # OTel trace ID (32-char hex)
audit_event_id=AuditEvent.new_audit_event_id(), # UUID v7, for business queries
span_id="a1b2c3d4e5f67890",
parent_span_id="0000000000000001",
agent_id="prod-agent-03",
session_id="sess_8f3a2b1c",
event_type=EventType.TOOL_CALL,
status=EventStatus.SUCCESS,
tool_name="delete_records",
parameters={"table": "user_data", "filter": "created_at < '2026-05-15'"},
result={"deleted_rows": 12403},
duration_ms=847,
)
# 也可以直接用 OTel SDK 获取当前 trace_id:
# from opentelemetry import trace
# trace_id = format(span.get_span_context().trace_id, '032x')
# 序列化
json_str = event.to_json(indent=2)
print(json_str)
# 反序列化
restored = AuditEvent.from_json(json_str)
assert restored.tool_name == "delete_records"
assert restored.status == EventStatus.SUCCESS
模型设计的几个关键决策点:
1. timestamp 用字符串而非 datetime 对象。存储 ISO 8601 UTC 字符串可避免跨序列化器和下游存储系统的歧义。如果使用 datetime 对象,请显式配置序列化行为。
2. trace_id 复用 OTel trace ID。使用 OpenTelemetry SDK 生成的 32-char hex trace_id 作为审计日志的 trace_id,确保与 Jaeger/Tempo 中的 trace 完全一致,可跨系统关联。UUID v7 仍可作为可选的 audit_event_id(时间可排序,用于业务查询和分区)。UUID v7 的前 48 位是毫秒级时间戳,提升了写入局部性并使 ID 大致按时间可排序,但 timestamp 仍然是分区、保留策略和查询过滤的一等字段。生产环境生成 UUID v7 建议使用 uuid6 或 uuid7 Python 库。
3. Pydantic 条件必填校验。上述 AuditEvent 模型是存储 schema——它声明了所有字段供序列化使用。生产代码应为特定 event_type 添加条件校验。以下 model_validator 示例展示了核心模式:
from pydantic import model_validator
class AuditEvent(BaseModel):
# ... 字段定义同上 ...
@model_validator(mode="after")
def validate_conditional_fields(self):
"""根据 event_type 校验条件必填字段。"""
et = self.event_type
if et in (EventType.DECISION, EventType.TOOL_CALL, EventType.TOOL_RESULT):
if not self.tool_name:
raise ValueError(f"{et.value} 事件必须提供 tool_name")
if et == EventType.TOOL_CALL:
if self.parameters is None:
raise ValueError("tool_call 事件必须提供 parameters")
if self.duration_ms is None:
raise ValueError("tool_call 事件必须提供 duration_ms")
if et == EventType.APPROVAL:
if not self.approver:
raise ValueError("approval 事件必须提供 approver")
if et == EventType.ERROR:
if not self.metadata.get("error_type"):
raise ValueError("error 事件必须在 metadata 中提供 error_type")
return self
4. result 和 parameters 需要截断和脱敏。代码示例中的 result 和 parameters 是完整值,但在实际写入日志存储之前需要经过脱敏管道。下面的代码展示了脱敏的集成方式:
import re
from typing import Any
# 敏感字段模式(需要脱敏的 key 名)
_SENSITIVE_KEY_PATTERNS = re.compile(
r"(api_key|token|password|secret|credential|auth)",
re.IGNORECASE,
)
# 结果截断的最大字符数
_MAX_RESULT_LENGTH = 1024
def sanitize_parameters(params: dict[str, Any]) -> dict[str, Any]:
"""对工具调用参数进行字段级脱敏。"""
sanitized = {}
for key, value in params.items():
if _SENSITIVE_KEY_PATTERNS.search(key):
sanitized[key] = "REDACTED"
elif isinstance(value, dict):
sanitized[key] = sanitize_parameters(value)
elif isinstance(value, list):
sanitized[key] = [
sanitize_parameters(v) if isinstance(v, dict) else v
for v in value
]
else:
sanitized[key] = value
return sanitized
def truncate_result(result: Any, max_length: int = _MAX_RESULT_LENGTH) -> Any:
"""截断过长的工具返回结果。"""
if isinstance(result, str) and len(result) > max_length:
return result[:max_length] + f"... [truncated, total {len(result)} chars]"
if isinstance(result, dict):
return {k: truncate_result(v, max_length) for k, v in result.items()}
if isinstance(result, list):
return [truncate_result(v, max_length) for v in result[:10]]
return result
# 使用示例:在写入日志前脱敏
raw_params = {"api_key": "sk-abc123", "table": "user_data"}
safe_params = sanitize_parameters(raw_params)
# → {"api_key": "REDACTED", "table": "user_data"}
raw_result = "x" * 5000
safe_result = truncate_result(raw_result)
# → "xxxx...xxxx [truncated, total 5000 chars]"
这个脱敏管道应该在事件对象构建之后、写入存储之前执行。不要试图在工具调用层面做脱敏——工具返回的是业务需要的完整数据,审计日志的脱敏是审计层的职责。
与 MCP 协议的集成点
如果你的 Agent 使用 MCP 协议(Model Context Protocol)来管理工具调用,审计日志的事件模型可以直接映射到 MCP 的调用生命周期:MCP 的 tools/call 请求对应 audit 的 tool_call 事件,MCP 的 tools/list 可以提供当前可用工具的注册信息作为审计日志的参考元数据。
一个推荐的集成模式是:在 MCP server 的工具调用处理器中嵌入审计日志钩子,使得每一次 MCP 工具调用都自动产生一条审计事件——无需 Agent 应用层额外编码。
三、Trace ID 与分布式追踪
Section 2 给出了审计日志的数据模型,但模型中的 trace_id 和 span_id 字段只有在正确生成和传播时才有意义。本节深入这两个字段的工程细节:生成策略怎么选、Span 如何嵌套、以及如何用 OpenTelemetry 在 Python 中实现 trace context 的全程传递。
Trace ID 的生成策略对比
trace_id 的生成看似简单——调个 UUID 库就行——但选择错误的策略会导致分布式追踪断裂或存储效率下降。推荐模型:OTel-native。trace_id 使用 OpenTelemetry SDK 生成的 32-char hex trace ID(与 Jaeger/Tempo 完全兼容),同时用 UUID v7 作为可选的 audit_event_id(时间可排序,用于业务查询和分区)。
| 策略 | 结构 | 长度 | 时间可排序 | 全局唯一 | 冲突概率 | 适用场景 |
|---|---|---|---|---|---|---|
| OTel trace_id | 128-bit,32 位 hex 字符 | 32 字符 | ❌ 不可排序(随机的) | ✅ OTel SDK 保证唯一 | 10-36 | trace_id 的推荐方案——兼容 Jaeger/Tempo |
| UUID v7 | 前 48 位:毫秒时间戳 后 74 位:随机数 | 36 字符 | ✅ 时间可排序 | ✅ 极低冲突 | 10-17 | audit_event_id / correlation_id——用于业务查询和分区 |
| Snowflake | 41 位时间戳 + 10 位机器 ID + 12 位序列号 | 19 位整数 | ✅ 时间可排序 | ⚠️ 依赖机器 ID 唯一性 | 0(单机) | 超高吞吐、需要整数 ID 的场景 |
为什么推荐 OTel-native 模型:
1. 跨系统兼容。使用 OTel trace_id 可确保审计日志中的 trace_id 与 Jaeger、Grafana Tempo、ClickHouse 等后端中的 trace 完全一致——无需维护 ID 映射表。从 Jaeger 中发现问题 trace → 复制 trace_id → 在审计日志存储中直接检索——一步完成。
2. UUID v7 作为 audit_event_id 的补充价值。每条审计事件仍然需要一个独立的时间可排序 ID。UUID v7 的前 48 位是毫秒级时间戳,这意味着按 audit_event_id 排序的结果天然按时间排列——提升了存储的写入局部性。UUID v7 去中心化生成,无需协调,在多区域部署中优势明显。
3. 如果未使用 OTel。如果团队没有 OTel 基础设施,UUID v7 可作为自定义 trace_id 使用——它本身是一个很好的时间可排序相关 ID。但一旦后续引入 OTel,建议迁移到 OTel-native 模型。
下面的 Python 代码展示了 UUID v7 的生成和解析——用于 audit_event_id(每条审计事件的唯一 ID):
"""UUID v7 生成与解析示例。
用于生成 audit_event_id——每条审计事件的唯一时间可排序 ID。
实际生产环境推荐使用 'uuid7' 库(pip install uuid7)。
以下代码展示了核心原理——前 48 位毫秒时间戳的提取和验证。
"""
import time
import os
def generate_uuid_v7() -> str:
"""生成 UUID v7(用于 audit_event_id)。
格式: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
^^^^^^^^ ^^^^ 毫秒时间戳(前 48 位)
生产环境: pip install uuid7 → uuid7.uuid7()
"""
# 注意:这是原理演示,生产环境请使用标准库
try:
from uuid_extensions import uuid7 # 第三方库
return str(uuid7())
except ImportError:
# 回退方案:手动构造(仅用于演示理解结构)
ts = int(time.time() * 1000) # 毫秒时间戳
rand = os.urandom(10) # 74 位随机数
ts_hex = f"{ts:012x}" # 48 位 = 12 个 hex 字符
rand_hex = rand.hex()[:20] # 取前 20 个 hex 字符
# 格式化为标准 UUID v7(注入版本号和变体标记)
raw = ts_hex + rand_hex
return (
f"{raw[0:8]}-{raw[8:12]}-7{raw[13:16]}-"
f"{'8' if int(raw[16], 16) >= 8 else raw[16]}{raw[17:20]}-{raw[20:32]}"
)
def extract_timestamp_from_uuidv7(uuid_str: str) -> int:
"""从 UUID v7 中提取毫秒时间戳。
可用于:日志排序、时间范围查询、存储分区。
"""
# UUID v7: 前 8+4=12 个 hex 字符 = 48 位时间戳
ts_hex = uuid_str[:8] + uuid_str[9:13]
return int(ts_hex, 16)
# ── 使用示例 ──
audit_event_id = generate_uuid_v7()
print(f"Audit Event ID: {audit_event_id}")
# → Audit Event ID: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
ts = extract_timestamp_from_uuidv7(audit_event_id)
print(f"Timestamp: {ts} ms since epoch")
# → Timestamp: 1747888634231 ms since epoch
print(f"Readable: {time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(ts/1000))}")
# → Readable: 2026-05-22T02:37:14
UUID v7 提升了写入局部性并使 audit_event_id 大致按时间可排序,但 timestamp 仍然是分区、保留策略和查询过滤的一等字段——不要完全依赖 ID 排序替代时间索引。
Span 嵌套模型:三级调用树
有了 trace_id 之后,下一个问题是:如何用它组织调用树?Agent 的一次用户请求天然具备三级嵌套结构:
用户请求 (trace_id, root span)
│
├── LLM 推理 #1 (span_id=a1, parent_span_id=null)
│ ├── decision: 选择工具 search_docs
│ ├── tool_call: search_docs (span_id=a2, parent_span_id=a1)
│ │ └── tool_result: {"documents": [...]}
│ └── LLM 收到结果,继续推理
│
├── LLM 推理 #2 (span_id=a3, parent_span_id=null ← 同为 root 的子节点)
│ ├── decision: 选择工具 delete_records(需要审批)
│ ├── approval: user_zhang_wei 审批通过 (span_id=a4, parent_span_id=a3)
│ └── tool_call: delete_records (span_id=a5, parent_span_id=a3)
│ └── tool_result: {"deleted_rows": 12403}
│
└── LLM 最终回复给用户
这个三级结构映射到 Span 字段上:
| 层级 | span_id | parent_span_id | 对应事件类型 | 示例 |
|---|---|---|---|---|
| L1: 用户请求 | root span(如 0000000000000001) | null | —(trace 容器,不产生事件) | 用户发送「帮我清理临时数据」 |
| L2: LLM 推理 | 推理 span(如 a1b2c3d4e5f67891) | 0000000000000001 | decision | LLM 选择 delete_records,理由是「用户要求清理」 |
| L3: 工具调用/审批 | 工具 span(如 f1e2d3c4b5a67892) | a1b2c3d4e5f67891 | tool_call / approval | 实际执行 delete_records,或审批链路 |
关键设计决策:LLM 推理 Span 之间的兄弟关系。在上面的示例中,LLM 推理 #1 和 #2 的 parent_span_id 都是 null(或指向同一个 root span)——它们是兄弟关系,而非父子关系。为什么不是嵌套?因为每次 LLM 推理是顺序执行的:推理 #1 完成后,Agent 框架收到工具返回值,再进行推理 #2。它们是同一请求中的两个独立步骤,不是嵌套调用的关系。把它们设为兄弟关系保留了时间线的线性特征,也更利于在追踪 UI 中按时间顺序查看。
相比之下,工具调用的 Span 是 LLM 推理 Span 的子节点——因为工具调用是由那次推理触发的,它是对推理决策的执行,天然具有父子关系。
与 OpenTelemetry 的集成
OpenTelemetry(OTel)已经是分布式追踪的事实标准。Agent 审计日志不应该重新发明 trace context 传递机制——而是应该在 OTel 的基础上增加 Agent 特有的语义层。集成方案如下:
架构层次:
┌─────────────────────────────────────────┐
│ Agent 审计事件层 │
│ (AuditEvent: decision, tool_call, ...) │ ← 本文的核心
├─────────────────────────────────────────┤
│ OpenTelemetry Span 层 │
│ (trace_id, span_id, parent_span_id) │ ← 复用 OTel 基础设施
├─────────────────────────────────────────┤
│ OTel SDK / Exporter │
│ (OTLP → Jaeger / Tempo / 自定义后端) │ ← 标准导出管道
└─────────────────────────────────────────┘
具体来说:
- 复用 OTel 的 trace_id 和 span_id。审计事件的 trace_id 和 span_id 直接取自 OTel Span 的 context。这样可以确保审计日志和 OTel Span 追踪使用的是同一套标识符——在 Jaeger 中看到的 Span 和在审计日志存储中查到的 event 是同一个 ID。
- 用 Span Events 承载审计事件。每一次 decision、tool_call、approval 作为 OTel Span 的一个 Span Event 写入,携带完整的审计字段(tool_name、parameters、result、approver 等)作为 event attributes。
- 双写模式。同时将审计事件写入 OTel Span(用于可视化追踪)和独立的审计日志存储(用于合规检索和回放)。双写的原因:OTel 后端和导出器通常对属性、事件或 payload 大小有限制,而工具返回值和完整的参数 JSON 可能超过这些限制。应将大体积的工具参数和返回值存储在专用的审计日志后端中,Span 只携带 ID、摘要和少量关键属性。
代码实现:Python OTel SDK 集成
下面的代码展示了 Agent 审计日志与 OpenTelemetry 的完整集成——从初始化、到 Span 创建、到 trace_id 在工具调用中的全程传递。
Step 1: OpenTelemetry 初始化配置
"""OpenTelemetry 初始化模块。
在 Agent 服务启动时调用一次 init_otel(),
配置 TracerProvider、Span Processor 和 Exporter。
"""
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
def init_otel(
service_name: str = "agent-service",
otlp_endpoint: str = "http://localhost:4317",
console_debug: bool = False,
) -> trace.TracerProvider:
"""初始化 OpenTelemetry SDK。
Args:
service_name: 服务名称,会出现在所有 Span 的 resource 属性中。
otlp_endpoint: OTLP Collector 的 gRPC 地址。
console_debug: 开发环境启用 Console Exporter 便于调试。
Returns:
配置完成的 TracerProvider 实例。
生产环境最佳实践:
- 使用 OTLPSpanExporter 导出到 Jaeger / Grafana Tempo
- 使用 BatchSpanProcessor 避免每个 Span 都触发网络请求
- Resource 中附加 service.version、deployment.environment 用于区分环境
"""
# 创建 Resource:标识服务身份
resource = Resource.create({
SERVICE_NAME: service_name,
"service.version": "2.3.1",
"deployment.environment": "production",
})
# 创建 TracerProvider 并绑定 Resource
provider = TracerProvider(resource=resource)
# OTLP gRPC Exporter → Jaeger / Tempo / 任何 OTLP 兼容后端
otlp_exporter = OTLPSpanExporter(
endpoint=otlp_endpoint,
insecure=True, # 生产环境应使用 TLS
)
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
# 开发环境:同时输出到控制台
if console_debug:
provider.add_span_processor(
BatchSpanProcessor(ConsoleSpanExporter())
)
# 设置为全局 TracerProvider
trace.set_tracer_provider(provider)
return provider
# ── 获取 Tracer ──
def get_tracer(name: str = __name__) -> trace.Tracer:
"""获取 OpenTelemetry Tracer 实例。"""
return trace.get_tracer(name)
# ── 使用示例 ──
if __name__ == "__main__":
init_otel(console_debug=True)
tracer = get_tracer("agent-audit-demo")
print(f"OTel initialized. Tracer: {tracer}")
# → OTel initialized. Tracer: <opentelemetry.sdk.trace.Tracer ...>
Step 2: 自定义 Span 创建与审计属性注入
"""Agent 审计专用 Span 工具。
在 OTel Span 的基础上注入 Agent 特有的审计语义——
decision、tool_call、approval 每种事件对应一个 Span,
审计字段作为 Span Attributes 写入。
"""
from __future__ import annotations
import json
import time
from contextlib import contextmanager
from typing import Any, Optional
from opentelemetry import trace
from opentelemetry.trace import Span, SpanKind, Status, StatusCode
# ── 全局 Tracer ──
tracer = trace.get_tracer("agent-audit")
# ── 常量化审计属性 Key ──
class AuditAttr:
"""审计日志的 OTel Span 属性 Key 常量。
使用常量避免字符串硬编码导致的不一致。
"""
AGENT_ID = "agent.id"
SESSION_ID = "agent.session_id"
EVENT_TYPE = "agent.event_type"
TOOL_NAME = "agent.tool_name"
PARAMETERS = "agent.parameters"
RESULT = "agent.result"
RESULT_SIZE = "agent.result_size"
APPROVER = "agent.approver"
APPROVAL_CONTEXT = "agent.approval_context"
RATIONALE = "agent.rationale"
MODEL = "agent.model"
TEMPERATURE = "agent.temperature"
PROMPT_SUMMARY = "agent.prompt_summary"
DURATION_MS = "agent.duration_ms"
STATUS = "agent.status"
ERROR_TYPE = "agent.error_type"
ERROR_MESSAGE = "agent.error_message"
# ── Span 创建工厂 ──
def create_llm_span(
agent_id: str,
session_id: str,
model: str,
temperature: float,
prompt_summary: str = "",
parent_span: Optional[Span] = None,
) -> Span:
"""创建一个 LLM 推理 Span。
在 Agent 每次调用 LLM 时创建,作为 tool_call Span 的父 Span。
"""
ctx = trace.set_span_in_context(parent_span) if parent_span else None
span = tracer.start_span(
"agent.llm.reasoning",
context=ctx,
kind=SpanKind.INTERNAL,
attributes={
AuditAttr.AGENT_ID: agent_id,
AuditAttr.SESSION_ID: session_id,
AuditAttr.MODEL: model,
AuditAttr.TEMPERATURE: temperature,
AuditAttr.PROMPT_SUMMARY: prompt_summary,
},
)
return span
def record_decision(
span: Span,
tool_name: str,
rationale: str,
proposed_params: Optional[dict[str, Any]] = None,
):
"""在 LLM Span 上记录 decision 事件。
调用时机:LLM 返回 tool_choice 后、实际执行工具前。
Args:
span: 当前的 LLM 推理 Span。
tool_name: LLM 选择的工具名称。
rationale: LLM 的决策理由(从 tool_choice 中提取)。
proposed_params: LLM 提议的工具参数。
"""
attrs = {
AuditAttr.EVENT_TYPE: "decision",
AuditAttr.TOOL_NAME: tool_name,
AuditAttr.RATIONALE: rationale,
}
if proposed_params:
attrs[AuditAttr.PARAMETERS] = json.dumps(proposed_params, ensure_ascii=False)
span.add_event("agent.decision", attributes=attrs)
def record_tool_call(
span: Span,
tool_name: str,
parameters: dict[str, Any],
result: Any,
duration_ms: int,
status: str,
error_type: Optional[str] = None,
error_message: Optional[str] = None,
):
"""在 Span 上记录 tool_call 事件(合并 tool_result)。
调用时机:工具执行完成后。
Args:
span: 父 LLM 推理 Span(tool_call 作为其 Event 而非独立 Span)。
如需独立 Span,将 tool_call 创建为 span 的子 Span。
tool_name: 工具名称。
parameters: 实际执行的参数(已脱敏)。
result: 工具返回结果(已截断)。
duration_ms: 工具执行耗时(毫秒)。
status: success | failure | timeout。
"""
attrs = {
AuditAttr.EVENT_TYPE: "tool_call",
AuditAttr.TOOL_NAME: tool_name,
AuditAttr.PARAMETERS: json.dumps(parameters, ensure_ascii=False),
AuditAttr.RESULT: json.dumps(result, ensure_ascii=False),
AuditAttr.DURATION_MS: str(duration_ms),
AuditAttr.STATUS: status,
}
if error_type:
attrs[AuditAttr.ERROR_TYPE] = error_type
if error_message:
attrs[AuditAttr.ERROR_MESSAGE] = error_message
span.add_event("agent.tool_call", attributes=attrs)
def record_approval(
span: Span,
tool_name: str,
approver: str,
decision: str,
approval_context: str = "",
):
"""在 Span 上记录 approval 事件。
调用时机:人工审批完成后。
Args:
span: 当前的 LLM 推理 Span。
tool_name: 需要审批的工具名称。
approver: 审批人标识。
decision: approved | rejected。
approval_context: 审批时的上下文摘要。
"""
attrs = {
AuditAttr.EVENT_TYPE: "approval",
AuditAttr.TOOL_NAME: tool_name,
AuditAttr.APPROVER: approver,
AuditAttr.STATUS: decision,
}
if approval_context:
attrs[AuditAttr.APPROVAL_CONTEXT] = approval_context
span.add_event("agent.approval", attributes=attrs)
# ── 上下文管理器:自动 Span 生命周期管理 ──
@contextmanager
def agent_trace(
agent_id: str,
session_id: str,
model: str = "claude-sonnet-4-20250514",
temperature: float = 0.0,
):
"""Agent 请求的顶层 trace 上下文管理器。
用法:
with agent_trace("prod-agent-03", "sess_8f3a2b1c") as span:
trace_id = span.get_span_context().trace_id
# ... Agent 推理循环 ...
record_decision(span, "search_docs", "用户要求查询文档")
"""
span = tracer.start_span(
"agent.request",
kind=SpanKind.SERVER,
attributes={
AuditAttr.AGENT_ID: agent_id,
AuditAttr.SESSION_ID: session_id,
AuditAttr.MODEL: model,
AuditAttr.TEMPERATURE: temperature,
},
)
try:
yield span
span.set_status(Status(StatusCode.OK))
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
finally:
span.end()
Step 3: trace_id 全程传递——完整 Agent 推理示例
"""完整的 Agent 推理循环示例。
展示 trace_id 从 Agent 入口 → LLM 推理 → 工具调用 → 审批的全程传递。
每个步骤都产生对应的审计事件,并写入 OTel Span。
"""
import json
import time
from opentelemetry.trace import format_trace_id
def agent_reasoning_loop(
agent_id: str,
session_id: str,
user_message: str,
):
"""Agent 一次完整的推理循环。
模拟:用户请求 → LLM 推理 #1 → 工具调用 → LLM 推理 #2 → 审批 → 工具调用
trace_id 在 with agent_trace() 内部自动生成和传播——
所有 Span 和 Span Event 共享同一个 trace_id。
"""
with agent_trace(agent_id, session_id) as root_span:
# 获取 trace_id(16 字节 → 32 hex 字符)
trace_id_bytes = root_span.get_span_context().trace_id
trace_id_hex = format_trace_id(trace_id_bytes)
print(f"\n═══ Trace: {trace_id_hex} ═══")
print(f"User: {user_message}\n")
# ── LLM 推理 #1:选择工具 ──
llm_span_1 = create_llm_span(
agent_id=agent_id,
session_id=session_id,
model="claude-sonnet-4-20250514",
temperature=0.0,
prompt_summary="用户要求搜索产品文档",
parent_span=root_span,
)
# 模拟 LLM 决策
decision_start = time.time()
tool_choice = "search_docs"
rationale = "用户询问产品功能,需要搜索相关文档"
proposed_params = {"query": "产品功能介绍", "limit": 5}
time.sleep(0.3) # 模拟 LLM 推理耗时
decision_duration = int((time.time() - decision_start) * 1000)
record_decision(llm_span_1, tool_choice, rationale, proposed_params)
print(f" [decision] → {tool_choice}: {rationale} ({decision_duration}ms)")
# ── 工具调用 #1:search_docs ──
tool_start = time.time()
# 模拟工具执行
tool_result = {
"documents": [
{"id": "doc_001", "title": "产品功能介绍", "relevance": 0.95},
{"id": "doc_002", "title": "API 使用指南", "relevance": 0.87},
],
"total": 2,
}
time.sleep(0.15) # 模拟工具执行耗时
tool_duration = int((time.time() - tool_start) * 1000)
record_tool_call(
llm_span_1,
tool_name="search_docs",
parameters=proposed_params,
result=tool_result,
duration_ms=tool_duration,
status="success",
)
print(f" [tool_call] search_docs → success ({tool_duration}ms)")
llm_span_1.end()
# ── LLM 推理 #2:高风险操作,需审批 ──
llm_span_2 = create_llm_span(
agent_id=agent_id,
session_id=session_id,
model="claude-sonnet-4-20250514",
temperature=0.0,
prompt_summary="用户要求清理临时数据",
parent_span=root_span,
)
decision_start = time.time()
tool_choice_2 = "delete_records"
rationale_2 = "用户明确要求'清理临时数据',匹配到 delete_records 工具"
proposed_params_2 = {
"table": "user_data",
"filter": "created_at < '2026-05-15'",
}
time.sleep(0.25)
decision_duration_2 = int((time.time() - decision_start) * 1000)
record_decision(llm_span_2, tool_choice_2, rationale_2, proposed_params_2)
print(f" [decision] → {tool_choice_2}: {rationale_2} ({decision_duration_2}ms)")
# ── 审批环节 ──
print(f" ⏳ 等待审批: {tool_choice_2}...")
time.sleep(0.5) # 模拟审批等待
record_approval(
llm_span_2,
tool_name=tool_choice_2,
approver="user_zhang_wei",
decision="approved",
approval_context="确认删除的是 5 月 15 日前的临时数据,共 12,403 行",
)
print(f" [approval] → approved by user_zhang_wei")
# ── 工具调用 #2:delete_records ──
tool_start = time.time()
# 模拟工具执行
tool_result_2 = {
"deleted_rows": 12403,
"affected_tables": ["user_data"],
"duration_ms": 847,
}
time.sleep(0.2)
tool_duration_2 = int((time.time() - tool_start) * 1000)
record_tool_call(
llm_span_2,
tool_name=tool_choice_2,
parameters=proposed_params_2,
result=tool_result_2,
duration_ms=tool_duration_2,
status="success",
)
print(f" [tool_call] {tool_choice_2} → success ({tool_duration_2}ms)")
llm_span_2.end()
print(f"\n ✅ Agent 响应: 已清理 12,403 条临时数据记录。")
# 退出 with 块 → root_span.end() 自动执行
# BatchSpanProcessor 在后台将 Span 导出到 OTLP Collector
print(f"\n 📤 Trace {trace_id_hex} 已提交到 OTLP Collector")
# ── 运行示例 ──
if __name__ == "__main__":
# 初始化 OTel(开发模式:控制台输出)
init_otel(console_debug=True)
# 模拟一次完整的 Agent 请求
agent_reasoning_loop(
agent_id="prod-agent-03",
session_id="sess_8f3a2b1c",
user_message="帮我搜索产品功能介绍,然后清理上周的临时数据",
)
代码的三个关键设计点:
1. Span 的生命周期管理。agent_trace() 上下文管理器保证 root Span 一定会被 end()——即使 Agent 在执行过程中抛出异常。这是 OTel SDK 的最佳实践,避免 Span 泄漏导致追踪数据丢失。
2. Span Event vs Child Span 模型选择。两种建模方式各有适用场景:
| 维度 | Span Events 模型 | Child Span 模型 |
|---|---|---|
| 实现方式 | span.add_event("agent.decision", ...) 记录在 LLM span 上 | tracer.start_span("tool_call", ...) 创建独立子 span |
| span_id | 共享父 LLM span 的 span_id Span Event 无独立 span_id | 每个事件有独立 span_id / parent_span_id |
| Cardinality | 低——挂在一个 span 下 | 高——每个工具调用产生一个独立 span |
| Jaeger 可读性 | 好——不会 span 膨胀 | 差——一个 200ms 推理产生多个 span,层级过深 |
| 回放树重建 | 需要从 event attributes 提取 | 天然有清晰的父子关系 |
| 推荐场景 | decision / approval——决策和审批 | 长时间工具调用(如 > 5s 的外部 API) |
本文代码采用 Span Events 模型:decision、tool_call、approval 作为 LLM 推理 Span 的 add_event()。一次 LLM 推理可能在 200ms 内产生三个事件——如果每个都作为独立 Span,会在 Jaeger 中产生难以阅读的层级膨胀。Span Event 保持了追踪树的可读性,同时保留了完整的审计字段。对于长时间运行的工具调用(如调用外部 API 耗时 > 5s),可创建独立的 Child Span 以更好地展示调用树。
3. trace_id 的提取。format_trace_id() 将 OTel SDK 内部的 16 字节 trace_id 转换为标准的 32 字符 hex 字符串——这个字符串就是你写入 AuditEvent.trace_id 的值。它确保审计日志存储中的 trace_id 和 Jaeger/Tempo 中的 trace_id 完全一致,可以跨系统关联。
自建 trace vs OTel vs 商业方案:开销对比
最后,一个实际工程决策:trace 基础设施是自己从头建、用 OpenTelemetry 开源方案、还是采购商业平台?
| 维度 | 自建 trace | OpenTelemetry 开源 | 商业方案 (Datadog/LangSmith) |
|---|---|---|---|
| 初始投入 | 高——需要实现 trace ID 生成、传播、收集、存储、查询全链路 | 中——OTel SDK 开箱即用,但需自建 Jaeger/Tempo 后端 | 低——SDK 接入即可,后端托管 |
| 维护成本 | 极高——自己修的 Bug 自己扛,无社区支持 | 中——Jaeger/Tempo 的运维(存储、扩容、升级),社区活跃 | 低——运维外包,但费用随规模线性增长 |
| Agent 审计语义 | 完全灵活——想加什么字段加什么 | 需要自定义——Span Attributes + Span Events 承载审计字段,需自行设计 Key 规范 | 部分支持——LangSmith 原生支持 LLM tracing,Datadog 的 LLM Observability 有专门的 Span 类型 |
| 存储成本 | 取决于自建方案(PostgreSQL/ClickHouse) | 取决于后端选择(Elasticsearch 贵,ClickHouse 便宜) | 按 Span 数量或数据量计费,高吞吐下成本飙升 |
| 查询能力 | 完全自定义 | Jaeger UI 适合追踪可视化;复杂审计查询需要额外工具(如直接查存储) | 内置丰富的查询和可视化,但受限于平台能力边界 |
| 合规与数据主权 | 完全受控——数据不出自己的 VPC | 完全受控——开源方案私有化部署 | 受限于——数据存储在第三方平台,需评估 SOC 2 / GDPR 合规性 |
| 推荐场景 | 不推荐——除非有特殊合规需求且团队 > 10 人 | 推荐——大部分 Agent 团队的最佳平衡点 | 早期快速验证或需要 Turnkey 方案的大型团队 |
推荐路径:对于大多数构建生产级 Agent 的团队,OpenTelemetry 开源方案是最佳平衡点。它提供了标准化的 trace_id/span_id 生成和传播,生态成熟(Jaeger、Grafana Tempo、ClickHouse 后端均有现成集成),同时保留了数据的完全控制权。在此基础之上,本文描述的审计事件模型(Section 2 的 Pydantic AuditEvent + Section 3 的 Span Event 注入)作为 OTel 之上的语义扩展层——既复用了分布式追踪的基础设施,又填补了 LLM 决策审计的语义空白。
商业方案(LangSmith、Datadog LLM Observability)在早期原型阶段或团队缺乏基础设施运维能力时具有吸引力,但需仔细评估长期成本——一个中等规模的 Agent 系统每天可能产生数百万个 Span,商业平台的按量计费模型在规模化后可能成为意外的预算黑洞。此外,审计日志的合规要求(数据不能离开自有基础设施)在很多行业(金融、医疗)中是硬性约束,商业平台的 SaaS 部署模式天然不满足。
自建方案在 2026 年几乎不应该被考虑——OTel 已经成为行业标准,重新实现 trace context 传播相当于重新发明 TCP。唯一的例外是极端合规场景(如军方或情报系统),但这些场景不在本文的讨论范围内。
四、日志存储与保留策略
数据模型和 trace_id 基础设施就绪之后,下一个必须回答的工程问题是:审计日志写到哪里、存多久、花多少钱?与普通应用日志不同,审计日志包含结构化的工具参数和返回值,写入量和查询模式有独特特征——不是简单的「接个 ELK 就行」。本节从存储方案选型、分层保留、体积估算、脱敏写入四个维度给出生产级方案。
存储方案对比
Agent 审计日志的存储需求有几个关键约束:(1)需要支持对 tool_name、parameters 等 JSON 字段的结构化查询;(2)写入吞吐量高——每次工具调用产生 1-3 条事件,高并发 Agent 场景下写入 QPS 可能达到数万;(3)查询模式偏向时间范围扫描(「过去 24 小时所有 DELETE 操作」),而非随机点查;(4)需要合理的成本控制——审计日志的数据量增长很快,全量存 Elasticsearch 的账单会超出预期。
四种主流存储方案的对比:
| 存储方案 | 查询速度 | 存储成本 | JSON 查询 | 运维复杂度 | 适用场景 |
|---|---|---|---|---|---|
| Elasticsearch | ⭐⭐⭐⭐⭐ 全文搜索 + 聚合毫秒级 | ⭐⭐ 贵——索引开销为原始数据 1.5-3 倍 | ⭐⭐⭐⭐ 原生 JSON 支持,nested query 灵活 | ⭐⭐⭐ 集群调优需要经验;大规模下 JVM 调优是必修课 | 热数据层——近 7 天的实时检索和事故排查 |
| ClickHouse | ⭐⭐⭐⭐ 列存压缩,时间范围扫描极快 | ⭐⭐⭐⭐⭐ 便宜——压缩率 5-10 倍,单机可存 TB 级 | ⭐⭐⭐ 支持 JSON 字段函数,但嵌套查询不如 ES 灵活 | ⭐⭐⭐ 单机部署简单;集群模式运维中等 | 温数据层——7-90 天的结构化查询和回放提取 |
| Loki | ⭐⭐⭐ 标签检索快,全文搜索慢 | ⭐⭐⭐⭐ 只索引标签,日志本体存对象存储(S3) | ⭐⭐ 不支持结构化 JSON 查询——本质是文本日志 | ⭐⭐⭐⭐ 和 Grafana 集成零配置;轻量运维 | 不适合审计日志——缺少对 tool_name/parameters 的结构化查询能力 |
| PostgreSQL | ⭐⭐⭐ JSONB 索引 + GIN 支持良好查询 | ⭐⭐⭐ 中等——行存储,大数据量下磁盘占用显著 | ⭐⭐⭐⭐⭐ JSONB 类型 + GIN 索引,查询能力最强 | ⭐⭐⭐⭐ 团队通常已有运维经验 | 小规模场景(< 1 亿条)或作为元数据索引层 |
推荐组合:Elasticsearch(热)+ ClickHouse(温)+ S3/对象存储(冷归档)。这个分层方案在查询性能、存储成本和运维复杂度之间达到最佳平衡。Loki 在审计日志场景中不适用——它面向非结构化文本日志,无法有效利用审计日志中 tool_name、status、parameters 的结构化字段。PostgreSQL 的 JSONB 查询能力最强,但在每天千万级写入量下,行存储的磁盘和 VACUUM 开销会成为瓶颈。
分层保留策略
审计日志不需要——也不应该——永远以相同的成本和速度存储。不同时间段的查询模式差异显著:
| 层级 | 时间窗口 | 存储后端 | 查询 SLA | 典型用途 |
|---|---|---|---|---|
| 🔥 热数据 | 0-7 天 | Elasticsearch | 毫秒级全文搜索 | 实时事故排查、on-call 告警关联 |
| 🌤 温数据 | 7-90 天 | ClickHouse | 秒级聚合查询 | 周报分析、回放数据提取、异常模式挖掘 |
| ❄️ 冷归档 | 90 天 - 1 年+ | S3/对象存储(Parquet 压缩) | 分钟级(需先恢复到 ClickHouse) | 合规审计、年度安全审查、长期趋势分析 |
三层之间的数据流转:
┌───────────────────────────────────────────────────┐
│ Agent 审计事件 │
└─────────────────┬─────────────────────────────────┘
│ 双写
┌────────────┴────────────┐
▼ ▼
┌─────────┐ ┌─────────────┐
│ ES │ 7 天后 → │ ClickHouse │ 90 天后 → ┌─────────┐
│ (热数据) │ ILM 自动 │ (温数据) │ TTL 自动 │ S3 │
│ 0-7 天 │ 迁移删除 │ 7-90 天 │ 导出压缩 │ (冷归档) │
└─────────┘ └─────────────┘ │ 90天-1年│
└─────────┘
关键实现细节:
1. ES → ClickHouse 迁移。利用 Elasticsearch 的 ILM(Index Lifecycle Management)策略——索引满 7 天或达到 50GB 后自动 rollover,新索引承接新写入,老索引通过定时任务(如 Apache Spark / 自定义 Python 脚本)批量导出到 ClickHouse。ES 端在确认 ClickHouse 写入成功后删除老索引。
2. ClickHouse → S3 归档。ClickHouse 的 TTL(Time To Live)功能可按分区自动将过期数据导出为 Parquet 格式到 S3,并删除本地存储。Parquet 的列式压缩可将审计日志压缩到原始 JSON 体积的 10-15%。
3. 冷归档查询。合规审计通常按时间范围查询(「请提供 2025 年 Q4 所有的 DELETE 操作记录」),不需要毫秒级响应。冷归档数据通过 Athena / Presto 直接查询 S3 上的 Parquet 文件,或按需恢复到 ClickHouse 的临时表中进行复杂查询。
日志体积估算
在设计存储方案之前,需要先估算日志体积——它直接影响存储成本和技术选型。审计日志体积的三个核心变量:
日志体积(GB/天)= events_per_request × requests_per_day × avg_event_size
其中:
events_per_request 每次用户请求产生的平均审计事件数
requests_per_day 每天处理的用户请求总数
avg_event_size 单条审计事件的平均大小(JSON, KB)
以一个中等规模的 Agent 服务为例:
| 变量 | 典型值 | 说明 |
|---|---|---|
events_per_request | 4-8 | 每次请求通常经历 1-3 次 LLM 推理,每次推理触发 1-2 个工具调用,加上 decision 和 approval 事件 |
requests_per_day | 100,000 | 日均 10 万次用户请求(B2B SaaS 的典型量级) |
avg_event_size | 0.5 - 2 KB | 取决于工具的返回值大小。search_docs 的结果可能只有 200 字节;delete_records 的 result 摘要约 500 字节;带有大段 rationale 的 decision 事件约 1.5 KB。平均取 1 KB |
代入公式:6 events/req × 100,000 req/天 × 1 KB = 600 MB/天(原始 JSON)。
不同存储后端的实际磁盘占用:
- Elasticsearch:600 MB × 1.8(索引膨胀系数)= 约 1.1 GB/天,7 天热数据 = ~8 GB
- ClickHouse:600 MB × 0.15(列存压缩率)= 约 90 MB/天,90 天温数据 = ~8 GB
- S3 Parquet:600 MB × 0.1(Parquet 压缩率)= 约 60 MB/天,1 年冷归档 = ~22 GB
这个估算说明了两点:(1)ClickHouse 的列存压缩在审计日志场景中优势显著——同样的原始数据,存储成本仅为 ES 的约 1/12;(2)全量存 ES 一年的费用是 ClickHouse + S3 组合方案的 10 倍以上。注意:这些是容量估算的启发式参考,不是精确保证。实际占用取决于索引策略、压缩算法、payload 大小、分片/分区设计和保留设置。如果你的 Agent 日均请求量在百万级别,建议在生产环境的实际数据上压测后再确定存储方案。
如果工具的返回值包含大段文本(如 search_docs 返回完整文档内容),建议在审计日志中只记录摘要 + 文档 ID,完整内容通过 session_id 在应用日志中引用——避免审计日志被工具返回值撑爆。
敏感数据脱敏策略
审计日志记录了工具调用的完整参数和返回值,如果缺乏有效的脱敏机制,审计日志存储本身将成为系统中最危险的数据集——攻击者不需要攻破数据库,只需要拿到审计日志的读取权限。
Section 2 已经给出了字段级脱敏的代码实现(sanitize_parameters + truncate_result),此处从策略层面补充三个关键设计:
1. 脱敏在应用层完成,不依赖存储层。审计事件在写入任何存储之前,必须已经完成脱敏。理由:(a)数据传输过程中可能被拦截——如果脱敏在 ES 端做,传输中的数据是明文;(b)双写场景下(同时写 ES 和 ClickHouse),在应用层做一次脱敏比在两个存储端各做一次更可靠且一致;(c)应用层脱敏让你可以做到字段级别的精确控制——知道哪些 key 是敏感的(api_key、token、password),而存储层只能做正则匹配。
2. 自动识别敏感字段——不只是 key 名匹配。基于 key 名的脱敏(如匹配 api_key、token)覆盖了 80% 的场景,但不够。对于参数值中嵌入的敏感数据——如一个 query 参数的值是 "SELECT * FROM users WHERE token='abc123'"——需要额外的值级别扫描:用正则匹配已知的敏感数据格式(JWT、AWS Access Key ID、GitHub PAT)。但值级别扫描的误报率较高(正常 SQL 语句中也可能出现类似格式的字符串),建议作为可选的增强层而非默认开启。
3. 脱敏 vs 可审计性的平衡。完全脱敏(所有参数都替换为 REDACTED)虽然最安全,但会导致审计日志失去事故排查的价值。一个务实的平衡策略:
- 完全掩码:api_key、token、password、secret 等认证凭据——替换为
REDACTED - 部分掩码:email、phone、user_id——保留前缀或后缀用于关联,如
user***@example.com - 哈希保留:需要唯一标识但不需要显示原文的值(如 API key 的 hash 用于判断「这个 key 是否在事故中被使用过」)——用 SHA-256 哈希替代
- 完整保留:tool_name、status、duration_ms、table name 等非敏感的结构化字段
代码示例:Python 日志写入 ClickHouse
以下代码展示了从 AuditEvent 对象到 ClickHouse 的完整写入管道——包括脱敏、批量写入和错误重试。ClickHouse 推荐使用 clickhouse-connect 库(官方 Python 驱动),它原生支持 dict 类型的参数化插入,无需拼接 SQL 字符串。
"""AuditEvent → ClickHouse 写入管道。
功能:
- 批量写入:积累 BATCH_SIZE 条事件后一次性 flush,减少网络往返
- 自动脱敏:写入前对 parameters 和 result 执行 sanitize + truncate
- 错误重试:网络抖动时自动重试(最多 3 次)
- 优雅关闭:进程退出时 flush 剩余事件
依赖:pip install clickhouse-connect
"""
from __future__ import annotations
import atexit
import logging
import time
from typing import Any, Optional
import clickhouse_connect
# ── 复用 Section 2 的脱敏函数 ──
# from audit_log.model import AuditEvent, sanitize_parameters, truncate_result
logger = logging.getLogger(__name__)
# ClickHouse 表结构(建表 DDL 参考):
#
# CREATE TABLE audit_log (
# timestamp DateTime64(3) CODEC(Delta, ZSTD),
# trace_id String CODEC(ZSTD),
# span_id String CODEC(ZSTD),
# parent_span_id Nullable(String) CODEC(ZSTD),
# agent_id LowCardinality(String),
# session_id String CODEC(ZSTD),
# event_type LowCardinality(String),
# status LowCardinality(String),
# tool_name LowCardinality(String),
# parameters String CODEC(ZSTD), -- JSON string
# result String CODEC(ZSTD), -- JSON string
# approver Nullable(String),
# error_message Nullable(String),
# error_type Nullable(String),
# duration_ms UInt32,
# metadata String CODEC(ZSTD), -- JSON string
# date Date DEFAULT toDate(timestamp) -- 分区键
# ) ENGINE = MergeTree()
# PARTITION BY toYYYYMM(date)
# ORDER BY (agent_id, event_type, timestamp)
# TTL date + INTERVAL 90 DAY DELETE;
class ClickHouseAuditWriter:
"""审计事件 → ClickHouse 批量写入器。
Usage:
writer = ClickHouseAuditWriter(host="localhost", database="audit")
for event in agent_events:
writer.write(event)
# 进程退出时自动 flush(atexit),或显式调用:
writer.flush()
"""
_TABLE_COLUMNS = [
"timestamp", "trace_id", "span_id", "parent_span_id",
"agent_id", "session_id", "event_type", "status",
"tool_name", "parameters", "result", "approver",
"error_message", "error_type", "duration_ms", "metadata",
]
def __init__(
self,
host: str = "localhost",
port: int = 8123,
username: str = "default",
password: str = "",
database: str = "audit",
table: str = "audit_log",
batch_size: int = 1000,
flush_interval: float = 5.0,
max_retries: int = 3,
):
self._table = table
self._batch_size = batch_size
self._flush_interval = flush_interval
self._max_retries = max_retries
self._buffer: list[dict[str, Any]] = []
self._last_flush = time.monotonic()
# 建立连接
self._client = clickhouse_connect.get_client(
host=host,
port=port,
username=username,
password=password,
database=database,
)
logger.info(f"ClickHouse connected: {host}:{port}/{database}.{table}")
# 优雅关闭:进程退出时自动 flush
atexit.register(self.flush)
# ── 公开 API ──
def write(self, event: "AuditEvent") -> None:
"""写入一条审计事件(先进入缓冲区,满一批后批量写入)。
调用方不需要关心批量逻辑——只管逐条 write,
flush 由 batch_size 或 flush_interval 自动触发。
"""
row = self._event_to_row(event)
self._buffer.append(row)
# 条件触发 flush
if len(self._buffer) >= self._batch_size:
self._flush()
elif time.monotonic() - self._last_flush > self._flush_interval:
self._flush()
def flush(self) -> None:
"""强制 flush 缓冲区中的所有事件。"""
self._flush()
# ── 内部实现 ──
def _event_to_row(self, event: "AuditEvent") -> dict[str, Any]:
"""将 AuditEvent 转换为 ClickHouse 行 dict。
执行脱敏 + 截断后序列化为 JSON 字符串。
"""
# 脱敏处理(复用 Section 2 的脱敏函数)
safe_params = sanitize_parameters(event.parameters or {})
safe_result = truncate_result(event.result or {})
return {
"timestamp": event.timestamp,
"trace_id": event.trace_id,
"span_id": event.span_id,
"parent_span_id": event.parent_span_id,
"agent_id": event.agent_id,
"session_id": event.session_id,
"event_type": event.event_type.value,
"status": event.status.value,
"tool_name": event.tool_name or "",
"parameters": self._safe_json_dumps(safe_params),
"result": self._safe_json_dumps(safe_result),
"approver": event.approver,
"error_message": event.error_message,
"error_type": event.error_type,
"duration_ms": event.duration_ms,
"metadata": self._safe_json_dumps(event.metadata),
}
def _flush(self) -> None:
"""批量写入 ClickHouse,失败自动重试。"""
if not self._buffer:
return
rows = self._buffer
self._buffer = []
self._last_flush = time.monotonic()
for attempt in range(1, self._max_retries + 1):
try:
self._client.insert(
table=self._table,
data=rows,
column_names=self._TABLE_COLUMNS,
)
logger.debug(f"Flushed {len(rows)} events to ClickHouse")
return
except Exception as e:
if attempt < self._max_retries:
wait = 2 ** attempt # 指数退避: 2s, 4s, 8s
logger.warning(
f"ClickHouse flush failed (attempt {attempt}/{self._max_retries}): {e}. "
f"Retrying in {wait}s..."
)
time.sleep(wait)
else:
# 最终失败:记录到错误日志,不丢失数据
logger.error(
f"ClickHouse flush failed after {self._max_retries} attempts: {e}. "
f"Dropped {len(rows)} events. Last trace_id: {rows[-1].get('trace_id', 'N/A')}"
)
# 生产环境:应写入死信队列或本地 fallback 文件
raise
@staticmethod
def _safe_json_dumps(obj: Any) -> str:
"""安全 JSON 序列化——不抛异常。"""
import json
try:
return json.dumps(obj, ensure_ascii=False, default=str)
except Exception:
return "{}"
# ── 使用示例 ──
writer = ClickHouseAuditWriter(
host="localhost",
database="audit",
batch_size=500,
)
# 构造审计事件
# from audit_log.model import AuditEvent, EventType, EventStatus
# event = AuditEvent(
# trace_id=...,
# ...
# )
# writer.write(event)
# 批量写入 1000 条后自动 flush,或显式调用:
# writer.flush()
写入管道的三个关键设计点:
1. 批量写入 vs 逐条写入。ClickHouse 推荐每批次 1,000-10,000 行——太少则网络开销占主导,太多则内存压力和失败重试成本过高。batch_size=1000 是一个保守但安全的起点,可根据实际吞吐调高。
2. LowCardinality(String) 的 ClickHouse 优化。对 agent_id、event_type、status、tool_name 这些枚举值有限的字段使用 LowCardinality 类型——ClickHouse 会将其存储为字典编码,查询性能和压缩率都大幅优于普通 String。这是 ClickHouse 在审计日志场景中存储成本低的底层原因之一。
3. 死信队列(Dead Letter Queue)。上面代码在最终写入失败时只记录错误日志,生产环境应改为写入死信队列(如本地文件、Redis List、或专门的 Kafka topic)——确保在 ClickHouse 完全不可用时(如集群宕机)不丢失审计事件。恢复后从死信队列回放。这是审计日志写入管道中最容易被忽略但最重要的一环。
五、日志回放与事故事后分析
审计日志被写入了,存储方案也选好了——但真正证明审计日志价值的是:事故发生时,你能多快定位根因?本节从三个维度回答这个问题:通过 trace_id 快速重建调用链、将日志回放用于回归测试、以及一套结构化的事故分析工作流。
3 分钟重建完整调用链
返回 Section 1 的凌晨 2:37 事故——如果有了审计日志存储,具体的排查流程如下:
第 1 步(10 秒):搜索可疑操作。用 tool_name=delete_records 在 ES 或 ClickHouse 中搜索近 24 小时的事件,按时间降序排列。定位到一条 trace_id = 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67 的 tool_call 事件。
第 2 步(20 秒):展开完整决策链。以 trace_id 为主键,拉取所有相关事件,按 timestamp 排序。输出如下(通过 parent_span_id 还原调用树):
Trace: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
Session: sess_8f3a2b1c Agent: prod-agent-03
[02:37:13.100] LLM 推理 #1 (span: a1b2c3d4e5f67891) — 312ms
├── [decision] tool=search_docs, rationale="用户询问数据清理方式"
└── [tool_call] search_docs → success (145ms, 3 docs found)
[02:37:13.557] LLM 推理 #2 (span: a3b4c5d6e7f89012) — 248ms
├── [decision] tool=delete_records, rationale="用户要求'清理临时数据',
│ 匹配到 delete_records。识别到可能影响 user_data 表,
│ 2026-05-15 前创建的行。"
├── [approval] ❌ 无审批记录 — tool=delete_records 未配置审批流
│ (approval_required 策略检查: 未命中)
└── [tool_call] delete_records → success (847ms, 12,403 rows deleted)
[02:37:14.652] LLM 最终回复: "已为您清理了 12,403 条临时数据。"
第 3 步(2.5 分钟):根因分析。从时间线中可以直接看到:
- 直接原因:LLM 在推理 #2 中将「清理临时数据」理解为
delete_records操作,且正确识别了影响的表和筛选条件——不是模型的幻觉,而是策略缺失 - 根本原因:
delete_records工具未配置审批流(approval_required 策略未命中),导致高风险操作未经人工确认直接执行 - 修复措施:为
delete_records、drop_table、truncate等数据变更类工具启用强制审批流
核心价值在于:整个排查过程不需要联系用户复现对话、不需要 grep 搜索几十 GB 的纯文本日志、不需要猜测 LLM 当时在想什么——结构化审计日志已经给出了完整的决策链。
日志回放用于回归测试
审计日志不仅用于事故分析——它记录的完整工具调用序列(trace_id → decision → tool_call → result)是回归测试的天然输入。工作流程:
┌─────────────────┐ ┌────────────────┐ ┌─────────────────┐
│ 生产审计日志 │ → │ 提取测试用例 │ → │ Staging 环境重放 │
│ (ClickHouse/ES) │ │ (trace 序列化) │ │ (新 Agent 版本) │
└─────────────────┘ └────────────────┘ └────────┬────────┘
│
┌─────────▼─────────┐
│ 对比决策差异 │
│ (旧版本 vs 新版本) │
└───────────────────┘
具体步骤:
- 筛选 golden trace:从 ClickHouse 中筛选满足条件的 trace——如「状态为 success、有人工审批通过、涉及高风险工具调用」——这些是高质量的测试用例
- 提取测试输入:从 trace 的根 decision 事件中提取用户原始输入和初始上下文,从后续 tool_call 事件中提取工具调用的完整参数
- 构造测试用例:生成标准化的测试用例格式(如 JSON),包含:输入 prompt、期望的工具调用序列、期望的审批节点
- Staging 重放:输入相同的 prompt,观察新版本 Agent 的决策路径(选择的工具、参数、是否需要审批)
- 对比差异:新版本的决策路径是否与旧版本一致?如果不一致——是改进还是回归?审批流是否正确触发?
这种方法——从生产日志中自动提取 golden dataset——比手工编写测试用例有两个优势:(1)覆盖真实世界的边界情况,而非人工设想的 happy path;(2)随着生产流量增长持续自动扩充测试集。
事故分析工作流
将审计日志融入 on-call 工程师的标准事故分析流程:
| 阶段 | 操作 | 审计日志的作用 | 耗时 |
|---|---|---|---|
| 1. 搜索 | 按 tool_name、status、时间范围搜索可疑事件 | 结构化字段索引——ES 毫秒级, ClickHouse 秒级 | 10-30 秒 |
| 2. 关联 | 以 trace_id 拉取全链路事件,按 parent_span_id 还原调用树 | span 嵌套模型自动还原 LLM 推理 → 工具调用的层级关系 | 20-60 秒 |
| 3. 时间线重建 | 按 timestamp 排序所有事件,生成人类可读的时间线 | decision → approval → tool_call 序列清晰展示操作链 | 30-60 秒 |
| 4. 根因定位 | 分析时间线中的异常:缺少 approval?LLM 决策理由是否合理? | decision.rationale + approval 状态直接回答「为什么」和「谁批准了」 | 1-3 分钟 |
关键洞察:传统应用事故分析中最耗时的步骤是「重建时间线」——你需要从分散在各个服务、各个日志文件中的片段拼凑出完整的调用序列。Agent 审计日志通过 trace_id + span_id + parent_span_id 的三级嵌套,将这一步自动化为一个查询。
代码示例:trace_id 调用链重建脚本
以下 Python 脚本接收一个 trace_id,从 ClickHouse 查询所有相关事件,重建完整的时间线——同时输出到控制台和导出 JSON 文件,便于分享给团队成员。
"""审计日志调用链重建工具。
Usage:
python trace_replay.py <trace_id> [--output timeline.json]
功能:
- 从 ClickHouse 查询指定 trace_id 的所有审计事件
- 按 timestamp + parent_span_id 还原调用树
- 输出人类可读的控制台时间线
- 导出结构化 JSON 文件
- 自动检测缺失的关键事件(如缺失 approval 的高风险 tool_call)
"""
from __future__ import annotations
import argparse
import json
import sys
from collections import defaultdict
from datetime import datetime
from typing import Any, Optional
import clickhouse_connect
# ── 事件字段 ──
EVENT_FIELDS = [
"timestamp", "trace_id", "span_id", "parent_span_id",
"agent_id", "session_id", "event_type", "status",
"tool_name", "parameters", "result", "approver",
"error_message", "error_type", "duration_ms",
]
# 需要审批的工具列表(示例)
HIGH_RISK_TOOLS = {"delete_records", "drop_table", "truncate", "update_config", "execute_sql"}
class TraceReplay:
"""trace_id 调用链重建器。"""
def __init__(self, ch_host: str = "localhost", ch_port: int = 8123):
self._client = clickhouse_connect.get_client(
host=ch_host, port=ch_port, database="audit"
)
def replay(self, trace_id: str) -> dict[str, Any]:
"""查询并重建指定 trace 的完整调用链。"""
events = self._fetch_events(trace_id)
if not events:
raise ValueError(f"未找到 trace_id={trace_id} 的审计事件")
timeline = self._build_timeline(events)
warnings = self._detect_anomalies(events)
return {
"trace_id": trace_id,
"session_id": events[0].get("session_id", "N/A"),
"agent_id": events[0].get("agent_id", "N/A"),
"event_count": len(events),
"time_span_ms": self._calc_time_span(events),
"timeline": timeline,
"warnings": warnings,
"raw_events": events,
}
# ── 数据获取 ──
def _fetch_events(self, trace_id: str) -> list[dict]:
"""从 ClickHouse 获取指定 trace_id 的所有事件。"""
query = f"""
SELECT {', '.join(EVENT_FIELDS)}
FROM audit_log
WHERE trace_id = {{trace_id:String}}
ORDER BY timestamp ASC
"""
result = self._client.query(query, parameters={"trace_id": trace_id})
return [dict(zip(EVENT_FIELDS, row)) for row in result.result_rows]
# ── 时间线构建 ──
def _build_timeline(self, events: list[dict]) -> list[dict]:
"""重建调用链时间线。
策略:
1. 按 span_id 分组——每个 span 包含其下的 decision/tool_call/approval
2. 按 parent_span_id 还原 span 之间的父子/兄弟关系
3. 输出完整的层级时间线
"""
# —— 第一步:按 span_id 分组 ——
span_events: dict[str, list[dict]] = defaultdict(list)
for e in events:
sid = e.get("span_id", "unknown")
span_events[sid].append(e)
# —— 第二步:按 parent_span_id 建立 span 树 ——
span_tree: dict[str, list[str]] = defaultdict(list)
root_spans: list[str] = []
for sid, evts in span_events.items():
parent = evts[0].get("parent_span_id")
if parent and parent in span_events:
span_tree[parent].append(sid)
elif parent is None or parent == "":
root_spans.append(sid)
else:
# parent span 不在结果集中(可能被截断或在不同分区)
root_spans.append(sid)
# —— 第三步:按时间排序并生成时间线条目 ——
timeline = []
visited = set()
all_spans = sorted(root_spans, key=lambda s: min(
e["timestamp"] for e in span_events[s]
))
# BFS 遍历 span 树
queue = [(sid, 0) for sid in all_spans] # (span_id, depth)
while queue:
sid, depth = queue.pop(0)
if sid in visited:
continue
visited.add(sid)
evts = sorted(span_events[sid], key=lambda e: e["timestamp"])
# 尝试识别 span 类型
span_type = self._classify_span(evts)
# 添加 span 头部
first_ts = evts[0]["timestamp"]
timeline.append({
"type": "span_start",
"span_id": sid,
"span_type": span_type,
"depth": depth,
"timestamp": first_ts,
"event_count": len(evts),
})
# 添加 span 内的所有事件
for e in evts:
timeline.append({
"type": "event",
"span_id": sid,
"depth": depth + 1,
"timestamp": e["timestamp"],
"event_type": e["event_type"],
"tool_name": e.get("tool_name", ""),
"status": e["status"],
"approver": e.get("approver"),
"duration_ms": e.get("duration_ms", 0),
"error_message": e.get("error_message"),
})
# 将子 span 加入队列(按时间排序)
children = sorted(
span_tree.get(sid, []),
key=lambda s: min(e["timestamp"] for e in span_events[s]),
)
for child_sid in children:
queue.append((child_sid, depth + 1))
return timeline
def _classify_span(self, events: list[dict]) -> str:
"""通过 span 内的事件类型推断 span 类型。"""
event_types = {e["event_type"] for e in events}
if "decision" in event_types:
return "LLM 推理"
if "approval" in event_types:
return "审批节点"
if "tool_call" in event_types:
return "工具调用"
if "error" in event_types:
return "错误"
return "未知"
# ── 异常检测 ──
def _detect_anomalies(self, events: list[dict]) -> list[str]:
"""自动检测审计日志中的异常模式。"""
warnings = []
# 检测 1:高风险工具调用缺少 approval
high_risk_calls = [
e for e in events
if e["event_type"] == "tool_call"
and e.get("tool_name", "") in HIGH_RISK_TOOLS
]
for call in high_risk_calls:
span_id = call["span_id"]
has_approval = any(
e["event_type"] == "approval" and e["span_id"] == span_id
for e in events
)
if not has_approval:
warnings.append(
f"⚠️ 高风险工具调用缺少审批: tool={call['tool_name']}, "
f"span={span_id}, status={call['status']}"
)
# 检测 2:tool_call 执行失败
failed_calls = [
e for e in events
if e["event_type"] == "tool_call" and e["status"] != "success"
]
for call in failed_calls:
warnings.append(
f"❌ 工具调用失败: tool={call.get('tool_name', 'N/A')}, "
f"status={call['status']}, "
f"error={call.get('error_message', 'N/A')}"
)
# 检测 3:存在 error 事件
error_events = [e for e in events if e["event_type"] == "error"]
for err in error_events:
warnings.append(
f"💥 错误事件: type={err.get('error_type', 'N/A')}, "
f"message={err.get('error_message', 'N/A')}"
)
return warnings
# ── 工具函数 ──
def _calc_time_span(self, events: list[dict]) -> int:
"""计算 trace 的总耗时(毫秒)。"""
timestamps = [e["timestamp"] for e in events if e.get("timestamp")]
if len(timestamps) < 2:
return 0
min_ts = min(timestamps)
max_ts = max(timestamps)
if isinstance(min_ts, str):
min_ts = datetime.fromisoformat(min_ts.replace("Z", "+00:00"))
max_ts = datetime.fromisoformat(max_ts.replace("Z", "+00:00"))
return int((max_ts - min_ts).total_seconds() * 1000)
# ── 格式化输出 ──
def format_console(trace_data: dict) -> str:
"""生成人类可读的控制台时间线。"""
lines = []
lines.append(f"{'═' * 70}")
lines.append(f"Trace: {trace_data['trace_id']}")
lines.append(f"Session: {trace_data['session_id']} Agent: {trace_data['agent_id']}")
lines.append(f"Events: {trace_data['event_count']} Time Span: {trace_data['time_span_ms']}ms")
lines.append(f"{'═' * 70}")
for entry in trace_data["timeline"]:
ts = entry.get("timestamp", "")[:23] # 截断到毫秒
depth = entry.get("depth", 0)
indent = " " * depth
if entry["type"] == "span_start":
lines.append(f"\n{indent}[{ts}] {entry['span_type']} (span: {entry['span_id'][:12]}...) — {entry['event_count']} 事件")
elif entry["type"] == "event":
event_type = entry["event_type"]
tool = entry.get("tool_name", "")
status = entry["status"]
markers = []
if tool:
markers.append(f"tool={tool}")
if status:
markers.append(status)
extra = ", ".join(markers) if markers else ""
lines.append(f"{indent}├── [{event_type}] {extra} ({entry.get('duration_ms', 0)}ms)")
if trace_data["warnings"]:
lines.append(f"\n{'─' * 70}")
lines.append("⚠️ 异常检测:")
for w in trace_data["warnings"]:
lines.append(f" {w}")
lines.append(f"\n{'═' * 70}")
return "\n".join(lines)
# ── CLI 入口 ──
def main():
parser = argparse.ArgumentParser(description="Agent 审计日志调用链重建")
parser.add_argument("trace_id", help="要重建的 trace_id")
parser.add_argument("--output", "-o", help="JSON 输出文件路径")
parser.add_argument("--ch-host", default="localhost", help="ClickHouse 主机 (默认: localhost)")
parser.add_argument("--ch-port", type=int, default=8123, help="ClickHouse 端口 (默认: 8123)")
args = parser.parse_args()
replay = TraceReplay(ch_host=args.ch_host, ch_port=args.ch_port)
try:
trace_data = replay.replay(args.trace_id)
except ValueError as e:
print(f"错误: {e}", file=sys.stderr)
sys.exit(1)
# 控制台输出
print(format_console(trace_data))
# JSON 导出
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
json.dump(trace_data, f, ensure_ascii=False, indent=2, default=str)
print(f"\n✅ 时间线已导出: {args.output}")
if __name__ == "__main__":
main()
# ── 使用示例 ──
#
# $ python trace_replay.py 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67 -o timeline.json
# ══════════════════════════════════════════════════════════════════
# Trace: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
# Session: sess_8f3a2b1c Agent: prod-agent-03
# Events: 6 Time Span: 1552ms
# ══════════════════════════════════════════════════════════════════
#
# [2026-05-22T02:37:13.100] LLM 推理 (span: a1b2c3d4e5f6...) — 2 事件
# ├── [decision] tool=search_docs, success (0ms)
# ├── [tool_call] tool=search_docs, success (145ms)
#
# [2026-05-22T02:37:13.557] LLM 推理 (span: a3b4c5d6e7f8...) — 2 事件
# ├── [decision] tool=delete_records, success (0ms)
# ├── [tool_call] tool=delete_records, success (847ms)
#
# ──────────────────────────────────────────────────────────────────
# ⚠️ 异常检测:
# ⚠️ 高风险工具调用缺少审批: tool=delete_records, span=a3b4c5d6e7f8..., status=success
# ══════════════════════════════════════════════════════════════════
# ✅ 时间线已导出: timeline.json
代码的三个关键设计点:
1. Span 树的重建逻辑。_build_timeline 是核心——它通过 parent_span_id 还原 span 之间的层级关系,再通过 timestamp 排序保证时间线的正确性。这与 Section 3 中 OTEL 的 span 嵌套模型直接对应。
2. 自动异常检测。_detect_anomalies 在重建调用链的同时自动识别三类异常:高风险操作缺少审批、工具调用失败、以及显式的 error 事件。这让事故分析从「被动搜索」变为「主动发现」——on-call 工程师不需要猜测哪里出了问题,脚本直接标注。
3. 双输出模式。控制台输出用于快速人工阅读;JSON 导出用于集成到自动化管道——例如 CI/CD 在部署后自动重放关键 trace 并对比差异。
真实场景案例:隆升科技的审批流缺失事故
以下是基于真实事故模式构造的案例(公司名和细节已虚构):
背景:隆升科技(Longsheng Tech)是一家 B2B SaaS 公司,其客服 Agent 系统「智服助手」管理着约 50 万企业客户的工单数据。Agent 拥有 42 个工具,包括查询、创建、更新和删除工单。
事故经过:周二下午 3:12,一位客服主管发现上周创建的 3,200 条已关闭工单从数据库中消失了。经排查,这些工单在当天上午 10:45 被批量删除——但没有任何人手动执行过删除操作。
排查过程(使用审计日志):
- 10:48 AM — 搜索阶段(15 秒):on-call 工程师在 ClickHouse 中搜索
tool_name=delete_tickets AND timestamp > '2026-05-19T10:00:00'——返回 1 条记录,trace_id 为0199c3e1-... - 10:49 AM — 关联阶段(30 秒):以 trace_id 拉取全链路 8 条事件。时间线显示:
- 10:45:03 — 客服人员张三在对话中说「帮我归档上周已关闭的工单,清理一下数据库」
- 10:45:05 — LLM 推理 #1:选择
delete_tickets,理由是「用户要求清理数据库中的已关闭工单」 - 10:45:06 — ❌ 缺失 approval 事件。
delete_tickets工具应有审批流,但策略配置错误——审批规则中写的是delete_ticket(单数),而工具实际注册名是delete_tickets(复数) - 10:45:06 — tool_call: delete_tickets → success(删除了 3,200 条工单)
- 10:50 AM — 根因定位(1 分钟):审批流配置中的工具名拼写不匹配——
delete_ticketvsdelete_tickets——导致审批策略静默跳过。修复:统一工具命名规范 + 添加审批策略的告警机制(当配置引用的工具名在注册表中不存在时,告警而非静默忽略)。
如果没有审计日志:传统排查方式需要(a)从应用日志中 grep 所有包含「delete」的请求——可能几千条;(b)逐一检查每条请求是否经过了审批——审批记录在另一个独立的系统中;(c)回放对话历史还原上下文——需要找到对应 session 的完整对话。整个过程预计需要 2-4 小时,且需要多人协作。
关键教训:(1)工具名的精确匹配至关重要——审计日志中的 tool_name 应与工具注册表、审批策略中的引用完全一致;(2)审批策略的「静默跳过」是最危险的行为——策略引擎在找不到匹配规则时,应该默认拒绝而非默认放行;(3)trace_replay.py 这类工具的自动异常检测(缺失 approval)可以在事故扩大前就发出告警。
六、审计日志与评测系统的集成
审计日志的核心用途——事故分析、合规审计、日志回放——在与评测系统集成后,会产生一个额外的复合价值:审计日志为评测框架提供真实、持续、自动更新的评测数据,评测框架反过来验证 Agent 版本升级时决策质量是否退化。本节聚焦于这个双向集成。
审计日志如何为评测系统提供真实数据
在 Agent 评测框架设计中,评测数据的获取通常依赖于两种方式:手工编写测试用例(费时、覆盖面窄)或从生产日志中提取(需要额外的管道)。审计日志通过其结构化的 trace_id → decision → tool_call → result 序列,天然地解决了第二种方式的输入问题。
具体而言,审计日志可以为评测框架提供三类评测数据:
| 评测数据类型 | 从审计日志中如何提取 | 评测用途 |
|---|---|---|
| Golden Dataset (标准答案集) | 筛选 status=success + 有人工审批通过的 trace,提取用户输入 → tool_call 序列作为「正确解答」 | 回归测试——新版本 Agent 对相同输入是否产生相同/更好的工具调用序列 |
| 边界用例 (Edge Cases) | 筛选 status=failure 或 status=timeout 的 trace,提取导致失败的输入和上下文 | 鲁棒性测试——新版本 Agent 在已知失败场景中是否有所改进 |
| 异常行为样本 (Anomaly Samples) | 筛选「高风险工具调用缺少 approval」或「tool_call 序列偏离常规模式」的 trace | 安全性评测——新版本 Agent 的审批策略是否有效拦截了异常行为模式 |
关键设计原则:golden dataset 的质量取决于筛选条件。不是所有 status=success 的 trace 都适合作为标准答案——一个「成功」的调用可能包含了不合理的工具选择(如用户要求「查看」但 Agent 执行了「修改」)。人工审批通过的 trace 是更可靠的 golden 来源,因为它经过了人工验证。在实践中,建议用三层筛选来提取 golden dataset:
- 第一层——状态筛选:status=success + 审批状态=approved(如果工具需审批)
- 第二层——置信度筛选:LLM decision 的 rationale 中包含高置信度标记(如「明确」「确认」「匹配」等关键词)
- 第三层——结果验证:tool_call 的 result 与预期一致(如返回的文档相关度 > 阈值)
从日志中自动提取 Golden Dataset
以下代码展示了从 ClickHouse 中查询并生成评测数据的完整管道。输出格式兼容 Agent 评测框架设计中定义的评测数据集 schema:
"""从审计日志中生成 Agent 评测数据集。
将 ClickHouse 中的生产审计日志转换为评测框架可消费的评测用例。
输出格式兼容 agent-evaluation-framework 的 test_case schema。
Usage:
python generate_eval_dataset.py --days 7 --output eval_dataset.json
"""
from __future__ import annotations
import argparse
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
import clickhouse_connect
# ── 评测用例的数据结构 ──
class EvalTestCase:
"""单个评测用例——兼容 agent-evaluation-framework 格式。
Fields:
id: 唯一标识(使用 trace_id)
input: 用户原始输入 + 上下文
expected_tools: 期望的工具调用序列(名称 + 参数)
expected_approvals: 期望的审批节点
metadata: 来源 trace 的元信息
difficulty: 自动推断的难度等级
"""
def __init__(
self,
trace_id: str,
user_input: str,
expected_tools: list[dict[str, Any]],
expected_approvals: list[str],
metadata: dict[str, Any],
):
self.id = trace_id
self.input = user_input
self.expected_tools = expected_tools
self.expected_approvals = expected_approvals
self.metadata = metadata
self.difficulty = self._infer_difficulty(expected_tools)
def _infer_difficulty(self, tools: list[dict]) -> str:
"""根据工具调用链的复杂度自动推断难度。"""
high_risk = {"delete", "drop", "truncate", "update", "execute"}
tool_names = [t.get("tool_name", "") for t in tools]
if len(tools) > 5:
return "hard"
if any(any(risk in name for risk in high_risk) for name in tool_names):
return "hard"
if len(tools) > 2:
return "medium"
return "easy"
def to_json(self) -> dict:
return {
"id": self.id,
"input": self.input,
"expected_tools": self.expected_tools,
"expected_approvals": self.expected_approvals,
"metadata": self.metadata,
"difficulty": self.difficulty,
}
# ── Golden Dataset 生成器 ──
class GoldenDatasetGenerator:
"""从 ClickHouse 审计日志中提取 Golden Dataset。
三层筛选策略:
1. 状态筛选:status=success + 有审批的已通过
2. 置信度筛选:decision rationale 包含高置信度标记
3. 结果验证:tool_call 返回了有意义的结果
Usage:
gen = GoldenDatasetGenerator(ch_host="localhost")
cases = gen.generate(days=7, limit=500)
gen.export_json(cases, "golden_dataset.json")
"""
# 高置信度关键词——decision rationale 中包含这些词的 trace 更可靠
HIGH_CONFIDENCE_MARKERS = [
"明确", "确认", "匹配", "对应", "要求",
"指定", "直接", "精确",
]
# 需要人工审批的工具(高风险)
HIGH_RISK_TOOLS = {"delete_records", "drop_table", "truncate", "update_config", "execute_sql"}
def __init__(self, ch_host: str = "localhost", ch_port: int = 8123):
self._client = clickhouse_connect.get_client(
host=ch_host, port=ch_port, database="audit"
)
def generate(self, days: int = 7, limit: int = 500) -> list[EvalTestCase]:
"""生成 golden dataset。
Args:
days: 从最近多少天中提取数据。
limit: 最大生成的测试用例数。
"""
# Step 1: 获取候选 trace_id——成功的、有审批通过记录的
candidate_traces = self._get_candidate_traces(days, limit * 3) # 多取一些,后续筛选
if not candidate_traces:
return []
# Step 2: 逐个 trace 提取完整信息并评分
cases = []
for trace_id in candidate_traces:
if len(cases) >= limit:
break
test_case = self._build_test_case(trace_id)
if test_case:
cases.append(test_case)
# Step 3: 按难度分层抽样,确保数据集的多样性
return self._stratified_sample(cases, limit)
# ── 内部实现 ──
def _get_candidate_traces(self, days: int, limit: int) -> list[str]:
"""查询候选 trace_id——状态成功 + 包含审批记录。"""
since = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")
# 查找满足条件的 trace_id:
# - 有 tool_call 事件且 status=success
# - 如果是高风险工具,必须有 approval 事件且 status=approved
query = """
SELECT trace_id, count() as event_count
FROM audit_log
WHERE timestamp >= {since:String}
AND status = 'success'
GROUP BY trace_id
HAVING event_count >= 2
ORDER BY event_count DESC
LIMIT {limit:UInt32}
"""
result = self._client.query(
query,
parameters={"since": since, "limit": limit},
)
return [row[0] for row in result.result_rows]
def _build_test_case(self, trace_id: str) -> Optional[EvalTestCase]:
"""为一个 trace 构建评测用例。"""
events = self._fetch_trace_events(trace_id)
if len(events) < 2:
return None
# —— 三层筛选 ——
# 第一层:状态筛选
has_failed = any(e.get("status") not in ("success", "approved") for e in events)
if has_failed:
return None
# 第二层:置信度筛选——decision rationale 中是否包含高置信度关键词
decisions = [e for e in events if e.get("event_type") == "decision"]
if decisions:
# decision 的 rationale 存储在 metadata 或单独的 result 字段中
# 此处从 metadata JSON 中提取
high_confidence = False
for d in decisions:
metadata = self._parse_json_field(d.get("metadata", "{}"))
rationale = metadata.get("rationale", "")
if any(marker in rationale for marker in self.HIGH_CONFIDENCE_MARKERS):
high_confidence = True
break
if not high_confidence:
return None
# 第三层:结果验证——tool_call 有实际返回内容
tool_calls = [e for e in events if e.get("event_type") == "tool_call"]
for tc in tool_calls:
result = tc.get("result", "")
if not result or result in ("{}", "null", ""):
return None
# —— 提取评测用例信息 ——
# 工具调用序列
expected_tools = []
for tc in sorted(tool_calls, key=lambda e: e["timestamp"]):
expected_tools.append({
"tool_name": tc.get("tool_name", ""),
"parameters": self._parse_json_field(tc.get("parameters", "{}")),
"expected_status": tc.get("status", "success"),
})
# 审批节点
approvals = [e for e in events if e.get("event_type") == "approval"]
expected_approvals = [a.get("tool_name", "") for a in approvals]
# 用户输入(从第一个 decision 的 metadata 中提取)
first_decision = decisions[0] if decisions else None
metadata = self._parse_json_field(first_decision.get("metadata", "{}")) if first_decision else {}
user_input = metadata.get("prompt_summary", metadata.get("user_input", "N/A"))
return EvalTestCase(
trace_id=trace_id,
user_input=user_input,
expected_tools=expected_tools,
expected_approvals=expected_approvals,
metadata={
"source": "production_audit_log",
"trace_id": trace_id,
"session_id": events[0].get("session_id", ""),
"agent_id": events[0].get("agent_id", ""),
"timestamp": str(events[0].get("timestamp", "")),
"model": metadata.get("model", "N/A"),
"event_count": len(events),
"extraction_method": "golden_dataset_v1",
},
)
def _fetch_trace_events(self, trace_id: str) -> list[dict]:
"""获取指定 trace 的所有事件。"""
fields = [
"timestamp", "trace_id", "span_id", "event_type", "status",
"tool_name", "parameters", "result", "approver", "metadata",
]
query = f"""
SELECT {', '.join(fields)}
FROM audit_log
WHERE trace_id = {{trace_id:String}}
ORDER BY timestamp ASC
"""
result = self._client.query(query, parameters={"trace_id": trace_id})
return [dict(zip(fields, row)) for row in result.result_rows]
@staticmethod
def _parse_json_field(value: Any) -> Any:
"""安全解析 JSON 字段。ClickHouse 中的 JSON 存储为 String。"""
if value is None:
return {}
if isinstance(value, (dict, list)):
return value
try:
return json.loads(value)
except (json.JSONDecodeError, TypeError):
return {}
def _stratified_sample(
self, cases: list[EvalTestCase], limit: int
) -> list[EvalTestCase]:
"""按难度分层抽样,保证数据集多样性。"""
if len(cases) <= limit:
return cases
# 按难度分组
by_difficulty: dict[str, list[EvalTestCase]] = {"easy": [], "medium": [], "hard": []}
for c in cases:
by_difficulty[c.difficulty].append(c)
# 按比例抽样:easy 30%, medium 40%, hard 30%
ratios = {"easy": 0.3, "medium": 0.4, "hard": 0.3}
sampled = []
for level, ratio in ratios.items():
pool = by_difficulty[level]
n = max(1, int(limit * ratio))
sampled.extend(pool[:n])
return sampled[:limit]
def export_json(self, cases: list[EvalTestCase], filepath: str) -> None:
"""导出评测数据集为 JSON 文件。"""
dataset = {
"meta": {
"generator": "golden_dataset_from_audit_log",
"generated_at": datetime.now(timezone.utc).isoformat(),
"total_cases": len(cases),
"schema_version": "1.0",
"compatible_with": "agent-evaluation-framework",
},
"cases": [c.to_json() for c in cases],
}
with open(filepath, "w", encoding="utf-8") as f:
json.dump(dataset, f, ensure_ascii=False, indent=2)
print(f"✅ 导出了 {len(cases)} 条评测用例 → {filepath}")
def print_summary(self, cases: list[EvalTestCase]) -> None:
"""打印数据集摘要。"""
by_diff = {"easy": 0, "medium": 0, "hard": 0}
by_tool: dict[str, int] = {}
for c in cases:
by_diff[c.difficulty] += 1
for t in c.expected_tools:
name = t.get("tool_name", "unknown")
by_tool[name] = by_tool.get(name, 0) + 1
print(f"\n{'─' * 50}")
print(f"📊 Golden Dataset 摘要")
print(f" 总用例数: {len(cases)}")
print(f" 难度分布: Easy={by_diff['easy']}, Medium={by_diff['medium']}, Hard={by_diff['hard']}")
print(f" 工具分布 (Top 5):")
for name, count in sorted(by_tool.items(), key=lambda x: -x[1])[:5]:
print(f" - {name}: {count}")
print(f"{'─' * 50}")
# ── CLI 入口 ──
def main():
parser = argparse.ArgumentParser(description="从审计日志生成 Agent 评测数据集")
parser.add_argument("--days", type=int, default=7, help="提取最近 N 天的数据 (默认: 7)")
parser.add_argument("--limit", type=int, default=500, help="最大生成用例数 (默认: 500)")
parser.add_argument("--output", "-o", default="golden_dataset.json", help="输出文件路径")
parser.add_argument("--ch-host", default="localhost", help="ClickHouse 主机")
parser.add_argument("--ch-port", type=int, default=8123, help="ClickHouse 端口")
args = parser.parse_args()
gen = GoldenDatasetGenerator(ch_host=args.ch_host, ch_port=args.ch_port)
cases = gen.generate(days=args.days, limit=args.limit)
gen.print_summary(cases)
gen.export_json(cases, args.output)
if __name__ == "__main__":
main()
# ── 使用示例 ──
#
# $ python generate_eval_dataset.py --days 30 --limit 200 -o golden_v2.json
# ──────────────────────────────────────────────────
# 📊 Golden Dataset 摘要
# 总用例数: 200
# 难度分布: Easy=60, Medium=80, Hard=60
# 工具分布 (Top 5):
# - search_docs: 145
# - create_ticket: 87
# - delete_records: 34
# - update_config: 23
# - send_notification: 18
# ──────────────────────────────────────────────────
# ✅ 导出了 200 条评测用例 → golden_v2.json
异常模式自动检测
除了生成评测数据,审计日志的时间序列特性使其天然适合异常检测——不需要复杂的 ML 模型,简单的规则和统计就能发现大量有意义的异常:
| 异常类型 | 检测方法 | 触发条件示例 | 评测用途 |
|---|---|---|---|
| 工具调用序列偏离 | 马尔可夫链——统计 tool A → tool B 的转移概率,标记低概率转移 | search_docs → delete_records(正常转移概率 0.1%,某 trace 却出现了) | 输入到安全评测——新版本 Agent 是否仍在这些异常转移中触发? |
| 权限异常提升 | 工具权限等级矩阵——标记 tool_call 中调用了超出 agent_id 权限范围的工具 | readonly-agent-05 调用了 delete_records | 安全评测——权限控制策略是否在新版本中有效? |
| 审批异常静默 | 高风险工具 × 无 approval 事件 = 告警 | delete_records 的 tool_call 前后 60 秒内无 approval 事件 | 策略评测——新版本的审批流配置是否覆盖了所有高风险工具? |
| 耗时异常 | 滑动窗口 Z-score——工具调用 duration_ms 偏离均值 > 3σ | search_docs 平时 150ms,某次耗时 8,500ms | 性能评测——新版本 Agent 的工具调用延迟是否退化? |
| 结果异常短 | 工具返回值大小偏离均值(result_size 字段) | search_docs 通常返回 5-20 条结果,某次返回 0 条 | 质量评测——新版本是否更频繁地触发空结果? |
实现建议:异常检测管道应作为独立的后台任务运行(如每小时一次的定时任务),将检测到的异常 trace 写入专门的 audit_anomalies 表或推送到告警系统。不要在生产 Agent 的热路径上做异常检测——这会增加工具调用的延迟。
与评测流水线的集成架构
审计日志与评测系统的完整集成架构如下:
┌──────────────────────────────────────────────────────────────┐
│ 生产环境 │
│ ┌─────────┐ ┌──────────┐ ┌──────────────────────────┐ │
│ │ Agent │ → │ 审计日志 │ → │ ClickHouse / ES 日志存储 │ │
│ │ 服务 │ │ 写入管道 │ └───────────┬──────────────┘ │
│ └─────────┘ └──────────┘ │ │
└──────────────────────────────────────────────┼────────────────┘
│
┌──────────────────────────┘
│ 定时任务 (cron / Airflow)
▼
┌──────────────────────────────────────────────────────────────┐
│ 评测数据生成管道 │
│ │
│ ┌──────────────┐ ┌────────────────┐ ┌────────────────┐ │
│ │ Golden │ │ 异常检测 │ │ 边界用例提取 │ │
│ │ Dataset 提取 │ │ (序列偏离/权限) │ │ (failure trace)│ │
│ └──────┬───────┘ └───────┬────────┘ └───────┬────────┘ │
│ │ │ │ │
│ └───────────────────┼─────────────────────┘ │
│ ▼ │
│ ┌────────────────┐ │
│ │ 评测数据集存储 │ │
│ │ (JSON / DB) │ │
│ └───────┬────────┘ │
└────────────────────────────┼──────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ 评测执行流水线 │
│ │
│ ┌──────────────┐ ┌────────────┐ ┌────────────────────┐ │
│ │ Staging │ │ 评测框架 │ │ 评测结果 + 报告 │ │
│ │ Agent (新版本)│ → │ 运行用例 │ → │ (pass/fail/diff) │ │
│ └──────────────┘ └────────────┘ └────────┬───────────┘ │
│ │ │
│ 反馈到 ────────────┘ │
│ ┌────────────────┐ │
│ │ CI/CD 质量门禁 │ │
│ │ (回归检测) │ │
│ └────────────────┘ │
└──────────────────────────────────────────────────────────────┘
架构的关键设计点:
1. 数据生成与评测执行分离。Golden dataset 的提取是离线批处理(每小时/每天一次),不依赖 Agent 的实时请求。评测执行则可以在 CI/CD 管道中按需触发(每次部署前)。两个阶段的解耦保证了评测数据的新鲜度,同时不影响生产 Agent 的性能。
2. 三层数据源。不依赖单一数据源——golden dataset(正向验证)、异常检测(负向/安全验证)、边界用例(鲁棒性验证)三者组合,覆盖了评测框架需要的大部分数据类型。
3. CI/CD 质量门禁。评测结果直接反馈到部署管道——如果新版本 Agent 在 golden dataset 上的 pass rate 低于阈值(如 95%),或异常检测发现新的安全问题,部署自动阻断。这就是审计日志在工程实践中的最终价值落点:从生产日志到质量门禁的闭环。
三个集成阶段
审计日志与评测系统的集成不是一步到位的——建议分三个阶段逐步推进:
| 阶段 | 能力 | 技术产出 | 时间投入 |
|---|---|---|---|
| Phase 1: 数据导出 | 从 ClickHouse 手动导出 JSON → 评测框架消费 | generate_eval_dataset.py(如上) | 1-2 天 |
| Phase 2: 自动化管道 | 定时任务自动提取 + 异常检测 → 评测数据集自动更新 | Airflow DAG / cron + 上述脚本 | 3-5 天 |
| Phase 3: CI/CD 集成 | 评测结果 → 质量门禁 → 自动阻断部署 | GitHub Actions / GitLab CI 集成 | 3-5 天 |
三个阶段的推进逻辑是:先让数据流起来(Phase 1),再让流程自动化(Phase 2),最后让质量门禁生效(Phase 3)。每个阶段都独立产生价值——不需要等到 Phase 3 才开始使用评测数据。
关于评测框架本身的架构设计和评测指标的完整定义,请参见 Agent 评测框架设计——该文详细讨论了评测维度(正确性、安全性、效率)、评分算法、以及评测结果的可视化方案。本节聚焦于审计日志到评测框架的数据输入管道——两者配合构成了 Agent 质量保障的完整链条。
七、开源与商业方案对比
前面六节讲了「为什么需要审计日志」和「怎么设计数据模型与存储」(Section 1-3)、「怎么实现 SDK」(Section 4)、「怎么与评测框架集成」(Section 5-6)。现在到了落地前的最后一步:你该用什么基础设施来承载这套审计系统?——开源自建还是买商业方案?
7.1 挑战:审计日志基础设施的选择困境
审计日志的存储和查询需求与普通应用日志有显著差异:高写入吞吐(每次 Agent 推理循环产生 3-10 条事件)、结构化字段查询(按 trace_id、tool_name、approver、event_type 多维度检索)、长期保留(合规要求可能长达半年到数年)。同时,团队的能力、预算和合规约束决定了你是否有精力自建——还是有理由购买商业产品。
下面的对比覆盖当前主流的开源和商业方案,评估它们各自在 Agent 审计日志场景下的适配度。
7.2 开源方案:OpenTelemetry + 可观测性三件套
开源路线的标准组合是:OpenTelemetry(数据采集和导出)+ Jaeger/Zipkin(Trace 可视化)+ Elasticsearch / ClickHouse(结构化搜索和存储)。
- OpenTelemetry(OTel)——作为审计事件的数据采集层。通过自定义 Span Processor 或 Log Exporter,将 JSON 审计事件注入到 OTel 管道中。Span 的 trace_id 和 span_id 直接复用为审计日志的核心标识符。
- Jaeger——免费的分布式追踪 UI。可以通过 Span Tags 展示审计事件的 event_type、tool_name、duration_ms 等关键字段。但 Jaeger 的搜索能力有限——它是为追踪可视化设计的,不是为审计检索设计的。
- Elasticsearch / ClickHouse——审计日志的结构化存储和查询层。Elasticsearch 适合全文搜索和 Kibana 可视化;ClickHouse 适合高吞吐写入和聚合分析。对于审计日志场景,通常两者选其一:注重 UI 和告警选 ES+Kibana,注重查询性能和压缩率选 ClickHouse+Grafana。
开源方案 PoC:OpenTelemetry + Jaeger + Elasticsearch
以下是一个可直接运行的 docker-compose.yml,搭建一个最小化的审计日志基础设施——适合在本地或小规模生产环境验证。
# docker-compose.yml — Agent 审计日志最小可用基础设施
version: "3.9"
services:
# ── 1. OpenTelemetry Collector:接收审计事件并路由到后端 ──
otel-collector:
image: otel/opentelemetry-collector-contrib:0.102.0
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
ports:
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
depends_on:
- jaeger
- elasticsearch
# ── 2. Jaeger:Trace 可视化 ──
jaeger:
image: jaegertracing/all-in-one:1.57
environment:
- COLLECTOR_OTLP_ENABLED=true
- SPAN_STORAGE_TYPE=elasticsearch
- ES_SERVER_URLS=http://elasticsearch:9200
ports:
- "16686:16686" # Jaeger UI
- "4317" # OTLP gRPC (内部分发)
depends_on:
- elasticsearch
# ── 3. Elasticsearch + Kibana:结构化搜索与可视化 ──
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.13.4
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms1g -Xmx1g"
ports:
- "9200:9200"
kibana:
image: docker.elastic.co/kibana/kibana:8.13.4
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- "5601:5601"
depends_on:
- elasticsearch
# otel-collector-config.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
timeout: 5s
send_batch_size: 512
exporters:
# 路由到 Jaeger —— trace 可视化
otlp/jaeger:
endpoint: jaeger:4317
tls:
insecure: true
# 路由到 Elasticsearch —— 结构化审计日志存储
elasticsearch:
endpoints: ["http://elasticsearch:9200"]
logs_index: "agent-audit-logs"
# 按日期滚动索引,便于归档和清理
logs_dynamic_index:
enabled: true
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [otlp/jaeger]
logs:
receivers: [otlp]
processors: [batch]
exporters: [elasticsearch]
这套开源方案的核心优势是零许可成本和完全的数据控制权——所有数据留存在你自己的基础设施中。但代价也是显而易见的:你需要有人维护 Elasticsearch 集群、处理索引滚动、管理 Jaeger 的存储——对于缺乏 DevOps 人力的团队(一般 < 5 人),这个运维负担不可忽视。
7.3 商业方案对比
以下对比表覆盖当前主流的 Agent 可观测性商业产品,聚焦于它们对审计日志场景的适配度——不是评测谁更好,而是帮你判断哪种场景下该选哪种。
| 维度 | LangSmith | Weights & Biases | Datadog LLM | Arize |
|---|---|---|---|---|
| 核心定位 | LangChain 生态的 LLM 应用调试与评测平台 | ML 实验追踪 + LLM 链路追踪 | 全栈可观测性(APM + 日志 + LLM) | LLM 评测与生产监控 |
| 审计日志支持 | ✅ Trace + Run 事件,原生支持 Agent 决策链追踪 | ⚠️ 偏 ML 训练追踪,Agent 审计需手动集成 | ⚠️ LLM Span 支持较好,但缺乏 Agent 专用语义(decision/approval) | ⚠️ 侧重评测指标而非审计链路 |
| 集成难度 | 低——LangChain 原生集成,几行代码 | 中——需配置 W&B callback | 中——需安装 datadog-agent 并配置 ddtrace | 中——需使用 Arize SDK 手动插桩 |
| 审批链路 | ❌ 不支持原生审批事件 | ❌ 不支持 | ❌ 不支持 | ❌ 不支持 |
| 数据隐私 | ⚠️ 数据上传到 LangSmith 云端(有私有部署选项) | ⚠️ 默认云端,需额外配置私有化 | ⚠️ 数据发送到 Datadog 服务器 | ⚠️ 默认云端 |
| 成本(月/团队) | 有免费 tier;付费版按 seat 计费——采购前核实最新定价 | 有免费 tier;付费版按 seat 计费——采购前核实最新定价 | 按用量计费(per-host + per-span)——采购前核实最新定价 | 开源核心(Apache 2.0)免费自托管;云托管按用量计费 |
| vendor lock-in | 中——通过 LangChain 抽象层,可替换 | 中 | 高——深度绑定 Datadog 生态 | 低——通过 OpenTelemetry 可迁移 |
| 合规认证 | 支持企业安全选项——在供应商 Trust Center 核实最新 SOC 2/HIPAA/ISO 声明 | 支持企业安全选项——在供应商 Trust Center 核实最新 SOC 2/HIPAA/ISO 声明 | 支持企业安全选项——在供应商 Trust Center 核实最新 SOC 2/HIPAA/ISO 声明 | 开源(Apache 2.0)自托管;云托管版——在供应商 Trust Center 核实认证 |
| 适用场景 | LangChain 技术栈团队,快速迭代阶段的 Agent 应用 | ML 团队同时做模型训练和 Agent 应用 | 已有 Datadog 基础设施的企业,需 LLM 可观测扩展 | 评测驱动的团队,监控+评测一体化 |
注:以上信息基于 2026 年 5 月的公开产品能力和定价,具体情况请查阅各产品最新文档。
7.4 决策框架:三步定位你的方案
面对这么多选项,不要陷入「比较一切」的陷阱。用下面这个三步决策树,在 5 分钟内确定你该走哪条路。
| 决策条件 | 推荐方案 | 核心理由 |
|---|---|---|
| 团队 < 5 人 / 预算有限 | 开源自建(OTel + ES/Jaeger) | 零许可成本;小规模流量下运维负担可控。先用 Docker Compose 跑起来,Phase 2 再上 Elastic Cloud 避免自运维 ES |
| 需要合规认证(SOC 2 / HIPAA / 等保) | 自建 + 合规加固,或选 Datadog(已有 SOC 2/HIPAA) | 数据不出 VPC 是合规底线。在供应商 Trust Center 核实商业方案的最新认证状态后评估 |
| 快速迭代 / 验证阶段 | LangSmith(LangChain)或 OpenTelemetry Lite(非 LangChain) | 快速接入,把人力集中在产品本身。等审计需求成熟后,再评估是否迁移到自建方案 |
| 已有 Datadog / 可观测性基础设施 | Datadog LLM Observability | 复用现有基础设施、告警通道、权限体系。但需注意:审批链路仍需自定义开发 |
| Agent 框架非 LangChain | OpenTelemetry + 自建存储 | OTel 是框架无关的标准协议。本系列 SDK 通过 OTLP exporter 对接任何 OTel 后端 |
决策的核心原则:不要因为「以后可能需要」而提前购买商业方案。审计日志系统的升级路径是开放的——你完全可以从开源方案起步,Phase 1-2 跑通后再决定是否引入商业产品。参见下一节的实施路线图。
八、实施路线图
审计日志系统不需要一次性建成。和大多数基础设施系统一样,推荐渐进式交付——每个阶段独立产生价值,前一个阶段的沉淀自然成为下一个阶段的基础。
8.1 四阶段路线图总览
| 阶段 | 时间 | 人力 | 核心交付 | 到达什么状态 | 投入产出比 |
|---|---|---|---|---|---|
| Phase 1 | 1-2 周 | 1 人(后端) | trace_id + JSON 结构化日志 + 文件存储 | 能通过 grep / jq 排查单条 trace | ⭐⭐⭐⭐⭐ 极高 |
| Phase 2 | 2-4 周 | 1-2 人(后端 + DevOps) | Elasticsearch/Kibana 或 Grafana Loki | 按 trace_id/tool/event_type 搜索 + 仪表盘 | ⭐⭐⭐⭐ 高 |
| Phase 3 | 1-2 月 | 2-3 人(后端 + QA) | 日志回放引擎 + CI 集成 + regression test suite | CI 中自动运行回放测试,阻断回归 | ⭐⭐⭐ 中高 |
| Phase 4 | 持续 | 1 人(SRE + 后端) | 规则告警 + ML 异常检测 | 自动化发现异常调用模式并告警 | ⭐⭐⭐ 中 |
8.2 Phase 1(1-2 周):最小可用审计
目标:在 2 周内让每一笔 Agent 调用都有迹可循——哪怕只能用命令行查。
核心工作:
- 集成审计 SDK——将 Section 4 的 Python SDK 嵌入到 Agent 推理循环中。在每次 LLM 决策、工具调用、审批事件前后插入
sdk.record_decision、sdk.record_tool_call等调用。 - 生成 trace_id——复用 OTel SDK 生成的 32-char hex trace_id(与 Jaeger/Tempo 兼容)。如未使用 OTel,可用 UUID v7 作为自定义相关 ID。在 Agent 入口处初始化 trace_id,通过 context 传递到所有子调用。
- 文件存储——每个 trace 输出一行 JSON,按日期滚动文件(如
/var/log/agent-audit/2026-05-22.jsonl)。 - 验证——触发几笔典型的 Agent 调用,用
grep <trace_id> /var/log/agent-audit/*.jsonl | jq确认链路完整。
# Phase 1 验证:通过命令行排查一条 trace
$ grep "0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67" /var/log/agent-audit/2026-05-22.jsonl | jq '[.event_type, .tool_name, .status, .metadata.rationale]'
# 输出示例——完整的决策→审批→执行链一目了然
["decision","delete_records","success","User requested cleanup of temp data from last week"]
["approval","delete_records","approved",null]
["tool_call","delete_records","success",null]
["tool_result","delete_records","success",null]
投入产出评估:1 人 × 1-2 周,实现从「事故排查 2 小时」到「3 分钟定位」的效率飞跃。这是整个路线图中 ROI 最高的阶段——哪怕后续阶段不做,你已经获得了 95% 的事故排查效率提升。
8.3 Phase 2(2-4 周):日志可视化与搜索
目标:告别命令行,让非工程师(产品、合规、QA)也能通过 UI 搜索和可视化审计日志。
核心工作:
- 接入 Elasticsearch + Kibana(或 Grafana Loki + Grafana)。如果团队规模小、不想自运维 ES,用 Elastic Cloud 或 Grafana Cloud 的 managed 服务替代。
- 配置索引模板——为审计日志建立 ES index template,确保 event_type、tool_name、approver、status 等关键字段被正确索引(mapping 为 keyword 类型,支持精确搜索和聚合)。
- 搭建仪表盘——至少 3 个核心面板:
- 工具调用量趋势:按 tool_name 分组的调用次数时间序列,监控异常流量
- 审批通过率:按工具维度展示 approve/reject/pending 分布,发现过度宽松或过度严格的审批模式
- 错误率与延迟:按 tool_name 分组展示 status=failure 的比例和 P50/P95/P99 duration_ms
投入产出评估:1-2 人 × 2-4 周。从「只有后端能查」到「全团队可自助检索」,显著降低排查沟通成本。如果团队已经有 ES/Kibana 基础设施(很多公司都有),这个阶段可能缩短到 1 周——只需配置索引和仪表盘。
8.4 Phase 3(1-2 月):日志回放与自动化测试
目标:利用生产审计日志构建 golden test suite,在 CI 中自动回归——每次 Agent Prompt 或 Tool 变更,都知道是否破坏了已有行为。
核心工作:
- 回放引擎——从审计日志中提取成功的 trace,重建为测试用例(Section 6 已给出实现)。
- CI 集成——将回放测试写入 GitHub Actions / GitLab CI 流程。在 PR 合并前自动运行回归测试套件,对比新旧 Prompt/工具下的 Agent 行为差异。
- 用例管理——建立测试用例库(至少积累 50-100 个典型 trace),按场景分类(正常操作 / 边界情况 / 需要审批的操作)。每次事故修复后,将相关 trace 加入回归套件。
# Phase 3 .github/workflows/agent-regression-test.yml
name: Agent Regression Test
on:
pull_request:
paths:
- "agents/**" # Agent prompt 或工具定义变更
- "tools/**" # 工具实现变更
jobs:
regression:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run audit log replay tests
run: |
python -m audit_log.replay \
--test-suite tests/suites/regression_suite_v1.json \
--parallel 4 \
--output test-results/regression_report.json
- name: Validate regression results
run: |
python -m audit_log.validate \
--report test-results/regression_report.json \
--max-degradation-rate 0.05 # 不超过 5% 的 trace 产生行为差异
投入产出评估:2-3 人 × 1-2 月。这是投入最大的阶段——但收益也是结构性的:从此 Prompt 工程师不再靠「我感觉效果变好了」来判断变更是否安全,而是有数据支撑的质量门禁。
8.5 Phase 4(持续):异常检测与告警
目标:从「被动排查」到「主动发现」——在异常发生时就收到告警,而不是等用户报告。
核心工作:
- 规则告警(Week 1-2):
- 高风险工具(DELETE / WRITE / EXEC)无审批记录时告警
- 工具调用失败率在 5 分钟内超过阈值(如 10%)时告警
- 单个 Agent session 的 tool_call 次数超过上限(如 50 次),可能陷入循环
- ML 异常检测(持续迭代):
- 基于历史审计日志训练每个工具的「正常调用模式」——参数分布、调用频率、耗时基线
- 在线检测偏离——当 LLM 开始以异常模式调用工具(如突然给一个通常只读的工具传入写入参数),触发告警
- 简单实现可以用 ES 的 anomaly detection job 或简单的 Z-score 阈值——不需要一开始就上重型 ML 管道
# Phase 4 规则告警示例:高风险工具无审批
# 在 Kibana Alerting / Grafana Alerting 中定义
alert:
name: "高风险工具缺少审批"
query: |
event_type:"tool_call"
AND tool_name:("delete_records" OR "drop_table" OR "send_email" OR "execute_sql")
AND NOT (
event_type:"approval" AND approval_status:"approved"
AND trace_id:{matched_trace_id}
)
window: 5m
threshold: >= 1
severity: critical
notification: PagerDuty / 企业微信 / Slack #oncall
投入产出评估:1 人(SRE 背景)+ 持续迭代。规则告警部分 1-2 周即可上线并立即生效,ML 异常检测是锦上添花——在审计日志数据积累 3 个月以上后再启动,效果更佳。
8.6 路线图使用建议
四个阶段不是必须全部执行。一个务实的建议:
- 所有团队都应该完成 Phase 1——它是基础,ROI 极高,且为后续所有阶段提供数据。
- 团队 ≥ 3 人且有合规需求时,完成 Phase 2——可搜索的审计日志是合规审查的硬性需求。
- 当 Agent 行为开始频繁变更(每周改 Prompt / 工具)时,启动 Phase 3——没有回归测试的快速迭代终将导致线上事故。
- 当 Agent 承载核心业务(付费用户依赖)时,启动 Phase 4——从被动响应到主动发现是 SLA 保障的必备能力。
每阶段的成果独立可用——不会因为停在 Phase 2 而让 Phase 1 的白费,也不会因为没有启动 Phase 4 而否定前三阶段的价值。这是路线图设计最重要的原则:每一步都产生独立价值,每一步都是下一步的坚实基础。
常见问题
1. 普通应用日志加上 trace_id 不就够了吗?为什么 Agent 还需要专门的审计日志?
不够。普通应用日志的 trace_id 只能追踪确定性代码路径——函数 A 调用了函数 B,你记录下调用链即可。但 Agent 的决策路径不同:LLM 在每一步「选择」调用哪个工具、传什么参数——这个过程是不确定的。
即使有了 trace_id,你仍然不知道:
- LLM 为什么选了 tool_a 而不是 tool_b?——普通日志没有 decision 事件类型
- 它当时看到了什么上下文?——普通日志不记录 prompt_summary 和 tool_choice rationale
- 有没有经过人工审批?谁审批的?——普通日志没有 approval 事件类型和 approver 字段
Agent 审计日志的核心增值在于:在 trace_id 的基础上,增加了 decision(为什么)、approval(谁批准)、tool_call(做了什么)三个语义层。这三个语义层是普通应用日志在设计时根本没有考虑的维度。
2. 审计日志应该记录多详细?工具调用的参数和返回值全记录吗?
原则:记录足够事后重建决策链,但要对敏感字段脱敏、对大数据截断。
具体来说:
- 工具调用参数——记录完整的 JSON 参数,但执行字段级脱敏。api_key、token、password 等敏感字段用
REDACTED替代,保留参数结构用于审计。 - 工具返回值——记录 HTTP status code 和结果摘要(如前 1,024 字符)。超过阈值的截断,并附加原始长度标记。
- LLM 决策上下文——记录 system prompt 摘要和 LLM 输出的 tool_choice + rationale,不记录完整的对话历史(通过 session_id 引用,对话历史在应用日志中已有)。
- 审批记录——记录审批人 ID、审批时间、审批时的上下文摘要、审批决策(approve/reject)。
不要记录完整的对话历史和用户原始输入到审计日志中——这不仅使审计日志体积暴增,还可能引入 PII 合规风险。如果应用日志中存储了完整的用户输入,应用日志同样需要脱敏和保留策略——PII 风险不因为「存在应用日志里」就消失。审计日志应保持精简和高信噪比,通过 session_id 引用应用日志中的完整上下文。
3. 如何确保审计日志本身不成为安全风险?
审计日志记录了大量的工具调用参数和返回值,如果存储不当,本身就是高风险数据集。三条核心防护措施:
1. 写入时脱敏。在事件进入日志管道之前,用字段级脱敏规则过滤 API key、token、PII 等敏感信息。脱敏应在应用层完成——不要依赖存储层的后处理,因为攻击者可能在数据落地前就已经读取了传输中的数据。
2. 存储加密 + 物理隔离。使用存储层 at-rest 加密、传输层加密、严格的索引/表级访问控制,对特别敏感的字段可选字段级加密。加密不能替代脱敏——先脱敏,再加密。审计日志应与普通应用日志物理隔离。普通开发者可以访问应用日志排查 Bug,但不应该能访问审计日志中完整的工具调用记录。
3. 访问控制。审计日志的读取权限应独立于应用日志,仅限安全审计人员、合规团队和 on-call 工程师。参见本系列的 Agent 运行时隔离了解审计日志存储安全与隔离环境的关系——同样的隔离原则适用于日志基础设施。
4. decision 和 tool_call 事件类型的边界在哪里?
简单说:decision 回答「为什么」,tool_call 回答「做了什么」。
在一次 Agent 推理循环中,两者的顺序和职责是:
- LLM 推理 → 输出 tool_choice + rationale
- record decision 事件:记录 LLM 选择哪个工具、为什么这样选择(rationale)、当前的 prompt 上下文摘要、LLM 提议的参数
- (可选)人工审批:如果工具需要审批,在此插入 approval 事件
- Agent 框架执行工具调用
- record tool_call 事件:记录实际调用的工具名、参数、返回结果、耗时、成功/失败状态
一个常见的困惑是:decision 事件中的 parameters 和 tool_call 事件中的 parameters 有什么区别?区别在于:decision 记录的是 LLM 提议的参数(可能在审批阶段被修改),tool_call 记录的是实际执行的参数。在大多数简单实现中,两者相同——但一旦引入审批流或参数改写逻辑,这个区别就变得至关重要。
5. Agent 审计日志和 OpenTelemetry 的 Span 是什么关系?
OpenTelemetry Span 提供了分布式追踪的基础设施——trace_id 和 span_id 的生成、传播和可视化。Agent 审计日志可以建立在 OTel Span 之上,但需要语义扩展:
- OTel Span 覆盖的部分:操作名称(对应 event_type)、开始/结束时间(对应 duration_ms)、属性键值对(对应 metadata)
- OTel Span 缺失的部分:LLM 决策理由(decision rationale)、审批链路(approver + approval_context)、工具参数的字段级脱敏、完整的工具调用参数和返回值(OTel 后端和导出器通常对属性/事件大小有限制)
建模选择——Span Events vs Child Span:decision 和 approval 推荐作为 Llm Span 的 Span Events(add_event())——低 cardinality,不产生独立的 span_id。长时间运行的工具调用(如 > 5s 的外部 API)可创建独立的 Child Span,以更好地展示调用树。实践中推荐的做法:将审计事件作为 OTel Span 的 Span Events 写入(用于可视化追踪),同时单独持久化一份完整的结构化审计日志(用于合规检索和回放)。这样既复用了 OTel 的 trace 基础设施,又保留了审计所需的完整语义——Span 只携带 ID 和摘要,关键信息存储在专用的审计日志后端中。
下一步阅读
📚 相关阅读
- Agent 工具设计最佳实践:从接口定义到错误处理的完整规范——工具调用是审计日志的核心记录对象
- Agent 评测框架设计:如何系统化评估 AI Agent 质量——审计日志为评测框架提供真实的 golden dataset
- Agent 错误恢复与自愈:当 Agent 搞砸了怎么办——审计日志是错误恢复的信息基础
- MCP 协议生产指南:Model Context Protocol 的安全部署——MCP 工具调用的审计集成模式
- Agent 运行时隔离:Docker、Firecracker、VM Sandbox 怎么选——审计日志的存储安全与隔离环境的关系