Agent 审计日志设计:如何追踪一次工具调用的完整链路

第 5 篇,共 6 篇

⚡ 30 秒要点

  • Agent 决策链与普通应用有本质区别——LLM 调用不确定、工具调用有副作用、审批链路是异步的,普通应用日志的三列式(时间+级别+消息)完全不够用
  • 审计日志需要 5 种事件类型:decision / tool_call / tool_result / approval / error,每条事件携带 8 个通用字段(trace_id、span_id、agent_id 等)+ 5 个事件特定字段(tool_name、approver 等)
  • 数据模型用 Pydantic 定义,序列化为 JSON 写入存储,trace_id 复用 OpenTelemetry 的 32 位 hex trace ID,可选 audit_event_id 用 UUID v7 实现时间可排序——为后续的回放、搜索和事故分析打下基础

📖 可引用的定义

Agent 审计日志是一套以 trace_id 为主键的结构化事件记录系统,用于完整追踪 Agent 在一次用户请求中的决策链(LLM 推理 → 工具选择 → 参数构造 → 人工审批 → 执行 → 结果返回)。与普通应用日志的区别在于:它记录的不是「代码做了什么」,而是「LLM 决定了什么、谁批准了、实际执行了什么」——即决策(why)与执行(what)的完整映射。

一、为什么 Agent 需要专门的审计日志

一个凌晨 2:37 的事故

凌晨 2:37,你的生产环境 Agent 服务的监控面板跳出一条告警——一条 DELETE 操作被记录了。没人审批过它,甚至没人醒着。第二天早上,团队花了 2 个小时翻 grep 日志、查数据库表变更、回放对话历史,才勉强拼凑出发生了什么:一位用户白天问了一句「帮我清理一下上周的临时数据」,LLM 在一次多步推理中把「清理」理解成了数据库 DELETE 操作,Agent 照做了——没有审批,没有二次确认,没有留下任何决策痕迹。

如果你的 Agent 有结构化审计日志——带 trace_id、带决策上下文、带完整的工具调用参数和审批链路——这个排查过程只需要 3 分钟

# 3 分钟定位:通过审计日志搜索 + 关联
# Step 1: 搜索 DELETE 操作(10 秒)
$ audit-log search --event-type tool_call --tool-name delete_records --since "2026-05-21T00:00:00Z"
trace_id: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
  agent_id: prod-agent-03
  session_id: sess_8f3a2b1c
  timestamp: 2026-05-22T02:37:14.231Z
  parameters: {"table": "user_data", "filter": "created_at < '2026-05-15'"}
  status: success
  duration_ms: 847

# Step 2: 沿 trace_id 展开完整决策链(20 秒)
$ audit-log trace 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
[decision]  LLM 选择工具: delete_records, 理由: "用户要求清理临时数据"
[approval]  ❌ 无审批记录 — 未配置审批流
[tool_call] delete_records → success (847ms, 删除了 12,403 行)

# Step 3: 定位根因 → 审批流缺失(2.5 分钟)

这就是审计日志的核心价值:把不可观测的 LLM 决策链,变成可搜索、可关联、可回放的结构化数据

Agent 决策链 vs 传统应用:本质差异

为什么普通的应用日志(INFO/WARN/ERROR + message)在 Agent 场景中不够用?因为 Agent 的决策链与传统应用有本质区别:

维度传统应用AI Agent
执行路径确定性代码分支(if/else)LLM 推理 + 动态工具选择(不确定)
可复现性相同输入 → 相同输出相同输入 → 可能不同输出(temperature / 模型版本)
副作用来源明确的函数调用LLM 决定调用哪个工具、传什么参数
责任归属代码作者 + Code ReviewLLM 决策 + 人工审批 + 安全策略
日志需求stack trace + error logtrace_id + decision context + approval chain + tool parameters
事故分析方式grep 错误日志 → 定位代码行trace_id 展开决策链 → 定位 LLM 决策 + 审批缺失

关键区别在于第三行和第四行:传统应用的副作用是代码写死的,Agent 的副作用是 LLM「决定」的。当一个 DELETE 被触发,传统应用中你能定位到 user_service.py:142 的代码行。但在 Agent 中,你需要回答的是:LLM 当时为什么选择了 delete_records 工具?它看到了什么上下文?有没有人审批过?

这三个问题,普通应用日志一个都回答不了。

为什么普通应用日志不够用

即使你已经建立了一套完善的应用日志体系(结构化日志 + ELK/Grafana),它们在 Agent 场景中至少存在三个结构性缺陷:

1. 缺少 trace_id 作为第一公民。在 Agent 场景中,一次用户请求会触发多次 LLM 调用,每次 LLM 调用可能触发多次工具调用。普通应用日志的 trace_id 通常只到请求级别——你知道「这次请求调了哪些服务」,但不知道「LLM 第 2 步推理后选择了工具 A,第 4 步推理后选择了工具 B」。Agent 需要的是请求 → LLM 推理 → 工具调用的三级嵌套追踪,trace_id 必须在每一步推理和每一次工具调用之间传递。

2. 缺少决策上下文。普通日志记录的是「发生了什么」——函数被调用了、数据库查询耗时 50ms。Agent 还需要记录「为什么发生」——LLM 的 tool_choice 理由、当时的 system prompt 摘要、触发审批的原因。这不是「nice to have」,而是事故分析的必需信息。凌晨 2:37 那个事故中,如果日志里有 LLM 的 decision rationale(「用户要求清理临时数据」),根因定位从 2 小时缩到 2 分钟。

3. 缺少审批链路。Agent 的高风险工具调用(DELETE、资金操作、配置变更)需要人工审批。普通日志没有「审批」这个语义——你最多看到一个 API 请求的 200 返回,但不知道这个请求是被审批通过的还是跳过了审批。审计日志需要将审批作为一等事件类型记录下来:谁审批的、什么时候、审批时看到了什么上下文。

审计日志的三个核心用途

理解了 Agent 的特殊性之后,审计日志的价值体现在三个核心场景中:

🔍 合规审计。当你的 Agent 处理用户数据、执行金融操作或涉及合规框架(SOC 2、HIPAA)时,审计员会问一个简单但致命的问题:「你能证明 Agent 的每一次 DELETE 操作都是被授权的吗?」没有审计日志,你的答案是「我们信任 LLM 的输出」——在大多数受监管或企业环境中,这个答案不太可能让审计员满意。结构化的审计追踪可以使授权、审批和事故证据的展示变得容易得多。有了审计日志,你可以拿出完整的 trace:哪次对话、哪个 decision、谁审批的、什么时间执行、操作了什么数据——一条链路,完整可验证

这就是为什么像 Agent 运行时隔离中讨论的 VM 级隔离在审计中更有说服力——硬件 VM 边界是熟悉且可独立验证的,同样,结构化的审计日志让 Agent 的决策过程从「黑箱」变成了「可审计的白箱」。

🐛 调试排错。Agent 的 Bug 不是 NullPointerException,而是「LLM 不知为什么选了一个不该选的操作」。调试这种 Bug 的传统方法是:复现对话 → 观察 LLM 输出 → 猜测 → 修改 prompt → 再试。审计日志将这个循环从「小时级」缩到「分钟级」:用 trace_id 直接拉出完整的决策链,几步之内定位到 LLM 在哪一步做出了错误决策。

这与 Agent 错误恢复中讨论的错误分类体系直接衔接——审计日志是错误恢复的信息基础。你不知道 Agent 在哪一步犯了错,就无从设计恢复策略。

🔄 回放测试。审计日志记录了完整的工具调用参数和返回值,这使它成为回归测试的天然数据源。你可以:提取历史 trace 中的工具调用序列 → 作为测试用例输入 → 在新的 Agent 版本上重放 → 对比决策是否一致。这就是「golden dataset」的自动化生成——不用手工编写测试用例,直接从生产日志中提取。

这个能力与 Agent 评测框架设计中讨论的评测数据管道直接对应——审计日志为评测框架提供了真实的、生产级的测试数据,而非人工构造的 toy example。

审计日志不是什么

在继续深入之前,明确一下边界:

二、审计日志的核心数据模型

数据模型是审计日志系统的根基。设计得过于简化,事故发生时查不到关键信息;设计得过于复杂,存储成本失控、写入性能受损。本节给出一个经过验证的五事件类型 + 8 个通用字段 + 5 个事件特定字段的最小可用模型。

五种事件类型

Agent 的一次工具调用链路可以分解为五个独立的事件类型,每个类型回答一个不同的问题:

事件类型回答的问题触发时机典型字段
decisionLLM 为什么选择了这个工具?LLM 返回 tool_choice 后,实际调用前tool_name, rationale, prompt_summary, model, temperature
tool_call工具调用发生了什么?工具执行完成后tool_name, parameters, result_summary, duration_ms, status
tool_result工具返回了什么?工具返回结果后(可与 tool_call 合并或独立)tool_name, result, result_size, is_error
approval谁批准了这次调用?人工审批完成后approver, approval_context, decision, timestamp
error什么出错了?任何阶段发生错误时error_type, error_message, stack_trace, recoverable

重要设计决策:tool_call 和 tool_result 是否合并?在大多数实现中,tool_result 作为 tool_call 的一个字段(result)出现,而非独立事件。这样做的理由是:一次工具调用天然包含「请求 + 响应」,分开存储增加了关联成本。但在工具调用有显著异步特性的场景(如调用一个需要数秒才返回的外部 API),将 tool_result 独立为事件有助于记录中间状态。本文推荐的默认方案是合并为一个 tool_call 事件,在需要异步追踪时再拆分为 tool_call(启动)+ tool_result(完成)。

每条日志的必填字段

审计事件包含 8 个通用字段(所有事件类型必填)和 5 个事件特定字段(按 event_type 条件必填):

▸ 通用字段(8 个——所有事件必填)
字段类型必填说明示例
timestampISO 8601事件发生的精确时间(UTC)2026-05-22T02:37:14.231Z
trace_idOTel 32-char hex整个用户请求的唯一标识(OTel trace ID),贯穿所有事件0af7651916cd43dd8448eb211c80319c
span_id64-bit hex单次 LLM 推理 span 的唯一标识(用于关联 OTel Span)a1b2c3d4e5f67890
parent_span_id64-bit hex | null父 span 的 ID(根 span 为 null);Span Event 无独立的 parent_span_id0000000000000001
agent_idstringAgent 实例的唯一标识prod-agent-03
session_idstring用户会话的唯一标识(一次对话可能有多个 trace)sess_8f3a2b1c
event_typeenum事件类型:decision | tool_call | tool_result | approval | errortool_call
statusenum事件结果:success | failure | pending_approval | rejected | timeoutsuccess
▸ 事件特定字段(5 个——按 event_type 条件必填)
字段类型必填说明示例
tool_namestring见说明工具名称。decision / tool_call / tool_result 必填delete_records
parametersJSON见说明工具调用参数。tool_call 必填,decision 可选(LLM 的 proposed params){"table": "user_data", "filter": "..."}
resultJSON | null见说明工具返回结果(截断/脱敏后)。tool_call / tool_result 必填{"deleted_rows": 12403}
approverstring | null见说明审批人标识。approval 事件必填,其他事件为 nulluser_zhang_wei
duration_msinteger见说明事件持续时间(毫秒)。tool_call 必填,decision / approval 可选847

字段设计遵循两个原则:(1)trace_id + span_id + parent_span_id 三级嵌套足以重建完整的调用树——从用户请求 → 每次 LLM 推理 → 每次工具调用;(2)工具调用的核心对象(tool_name、parameters、result)作为一级字段存储,而非埋在 JSON blob 中——这使得搜索「所有 DELETE 操作」只需一个字段级别的查询,而不需要全文搜索。

这与 Agent 工具设计最佳实践中的工具注册机制直接对应——审计日志中的 tool_name 应该与工具注册表中的名称一致,确保日志可追溯到具体的工具定义。

JSON Schema 与 Pydantic 模型

下面是完整的 Pydantic 模型定义。使用 Pydantic 的好处是:内置类型校验、JSON 序列化/反序列化、自动生成 JSON Schema——这些特性使审计事件的生成和消费都有强类型保障。

from __future__ import annotations

from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
from uuid import uuid4

from pydantic import BaseModel, Field


class EventType(str, Enum):
    """审计事件类型"""
    DECISION = "decision"
    TOOL_CALL = "tool_call"
    TOOL_RESULT = "tool_result"
    APPROVAL = "approval"
    ERROR = "error"


class EventStatus(str, Enum):
    """事件执行状态"""
    SUCCESS = "success"
    FAILURE = "failure"
    PENDING_APPROVAL = "pending_approval"
    REJECTED = "rejected"
    TIMEOUT = "timeout"


class AuditEvent(BaseModel):
    """Agent 审计日志的通用事件模型。

    所有事件共享 8 个通用字段,另有 5 个按 event_type 条件必填的字段。
    详见下方 validators。
    """

    # ── 时间与追踪 ──
    timestamp: str = Field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat(),
        description="事件发生时间(ISO 8601, UTC)",
        examples=["2026-05-22T02:37:14.231Z"],
    )
    trace_id: str = Field(
        description="OTel trace ID(32-char hex),由 OTel SDK 生成,贯穿所有事件",
        examples=["0af7651916cd43dd8448eb211c80319c"],
    )
    audit_event_id: Optional[str] = Field(
        default=None,
        description="审计事件唯一标识(UUID v7,时间可排序,用于业务查询)",
        examples=["0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67"],
    )
    span_id: str = Field(
        description="当前 span 的唯一标识(64-bit hex)",
        examples=["a1b2c3d4e5f67890"],
    )
    parent_span_id: Optional[str] = Field(
        default=None,
        description="父 span ID。根 span 为 null",
        examples=["0000000000000001"],
    )

    # ── 身份与会话 ──
    agent_id: str = Field(
        description="Agent 实例标识",
        examples=["prod-agent-03"],
    )
    session_id: str = Field(
        description="用户会话标识",
        examples=["sess_8f3a2b1c"],
    )

    # ── 事件核心 ──
    event_type: EventType = Field(description="事件类型")
    status: EventStatus = Field(description="事件执行状态")

    # ── 工具调用相关(按 event_type 条件必填)──
    tool_name: Optional[str] = Field(
        default=None,
        description="工具名称。decision/tool_call/tool_result 必填",
        examples=["delete_records"],
    )
    parameters: Optional[dict[str, Any]] = Field(
        default=None,
        description="工具调用参数(JSON)。tool_call 必填,decision 可选",
        examples=[{"table": "user_data", "filter": "created_at < '2026-05-15'"}],
    )
    result: Optional[dict[str, Any]] = Field(
        default=None,
        description="工具返回结果(截断/脱敏后)。tool_call/tool_result 必填",
        examples=[{"deleted_rows": 12403}],
    )

    # ── 审批与错误 ──
    approver: Optional[str] = Field(
        default=None,
        description="审批人标识。approval 事件必填",
        examples=["user_zhang_wei"],
    )
    error_message: Optional[str] = Field(
        default=None,
        description="错误信息。error 事件必填",
    )
    error_type: Optional[str] = Field(
        default=None,
        description="错误类型(如 TimeoutError, ValidationError)",
    )

    # ── 性能与上下文 ──
    duration_ms: int = Field(
        default=0,
        ge=0,
        description="事件持续时间(毫秒)",
        examples=[847],
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict,
        description="扩展元数据(model, temperature, prompt_summary 等)",
    )

    # ── 序列化与工厂方法 ──

    def to_json(self, indent: int | None = None) -> str:
        """序列化为 JSON 字符串。"""
        return self.model_dump_json(indent=indent, exclude_none=True)

    @classmethod
    def from_json(cls, data: str) -> "AuditEvent":
        """从 JSON 字符串反序列化。"""
        return cls.model_validate_json(data)

    @classmethod
    def new_audit_event_id(cls) -> str:
        """生成 UUID v7 作为 audit_event_id(时间可排序,用于业务查询)。

        实际项目应使用 uuid6 或 uuid7 库。
        这里给出 UUID v4 作为可运行的简化版本。
        trace_id 由 OTel SDK 生成(32-char hex),由 Agent 框架注入。
        """
        return str(uuid4())


# ── 使用示例 ──

# 创建一个 tool_call 事件(trace_id 由 OTel SDK 注入)
event = AuditEvent(
    trace_id="0af7651916cd43dd8448eb211c80319c",  # OTel trace ID (32-char hex)
    audit_event_id=AuditEvent.new_audit_event_id(),  # UUID v7, for business queries
    span_id="a1b2c3d4e5f67890",
    parent_span_id="0000000000000001",
    agent_id="prod-agent-03",
    session_id="sess_8f3a2b1c",
    event_type=EventType.TOOL_CALL,
    status=EventStatus.SUCCESS,
    tool_name="delete_records",
    parameters={"table": "user_data", "filter": "created_at < '2026-05-15'"},
    result={"deleted_rows": 12403},
    duration_ms=847,
)

# 也可以直接用 OTel SDK 获取当前 trace_id:
# from opentelemetry import trace
# trace_id = format(span.get_span_context().trace_id, '032x')

# 序列化
json_str = event.to_json(indent=2)
print(json_str)

# 反序列化
restored = AuditEvent.from_json(json_str)
assert restored.tool_name == "delete_records"
assert restored.status == EventStatus.SUCCESS

模型设计的几个关键决策点:

1. timestamp 用字符串而非 datetime 对象。存储 ISO 8601 UTC 字符串可避免跨序列化器和下游存储系统的歧义。如果使用 datetime 对象,请显式配置序列化行为。

2. trace_id 复用 OTel trace ID。使用 OpenTelemetry SDK 生成的 32-char hex trace_id 作为审计日志的 trace_id,确保与 Jaeger/Tempo 中的 trace 完全一致,可跨系统关联。UUID v7 仍可作为可选的 audit_event_id(时间可排序,用于业务查询和分区)。UUID v7 的前 48 位是毫秒级时间戳,提升了写入局部性并使 ID 大致按时间可排序,但 timestamp 仍然是分区、保留策略和查询过滤的一等字段。生产环境生成 UUID v7 建议使用 uuid6uuid7 Python 库。

3. Pydantic 条件必填校验。上述 AuditEvent 模型是存储 schema——它声明了所有字段供序列化使用。生产代码应为特定 event_type 添加条件校验。以下 model_validator 示例展示了核心模式:

from pydantic import model_validator

class AuditEvent(BaseModel):
    # ... 字段定义同上 ...

    @model_validator(mode="after")
    def validate_conditional_fields(self):
        """根据 event_type 校验条件必填字段。"""
        et = self.event_type
        if et in (EventType.DECISION, EventType.TOOL_CALL, EventType.TOOL_RESULT):
            if not self.tool_name:
                raise ValueError(f"{et.value} 事件必须提供 tool_name")
        if et == EventType.TOOL_CALL:
            if self.parameters is None:
                raise ValueError("tool_call 事件必须提供 parameters")
            if self.duration_ms is None:
                raise ValueError("tool_call 事件必须提供 duration_ms")
        if et == EventType.APPROVAL:
            if not self.approver:
                raise ValueError("approval 事件必须提供 approver")
        if et == EventType.ERROR:
            if not self.metadata.get("error_type"):
                raise ValueError("error 事件必须在 metadata 中提供 error_type")
        return self

4. result 和 parameters 需要截断和脱敏。代码示例中的 resultparameters 是完整值,但在实际写入日志存储之前需要经过脱敏管道。下面的代码展示了脱敏的集成方式:

import re
from typing import Any

# 敏感字段模式(需要脱敏的 key 名)
_SENSITIVE_KEY_PATTERNS = re.compile(
    r"(api_key|token|password|secret|credential|auth)",
    re.IGNORECASE,
)

# 结果截断的最大字符数
_MAX_RESULT_LENGTH = 1024


def sanitize_parameters(params: dict[str, Any]) -> dict[str, Any]:
    """对工具调用参数进行字段级脱敏。"""
    sanitized = {}
    for key, value in params.items():
        if _SENSITIVE_KEY_PATTERNS.search(key):
            sanitized[key] = "REDACTED"
        elif isinstance(value, dict):
            sanitized[key] = sanitize_parameters(value)
        elif isinstance(value, list):
            sanitized[key] = [
                sanitize_parameters(v) if isinstance(v, dict) else v
                for v in value
            ]
        else:
            sanitized[key] = value
    return sanitized


def truncate_result(result: Any, max_length: int = _MAX_RESULT_LENGTH) -> Any:
    """截断过长的工具返回结果。"""
    if isinstance(result, str) and len(result) > max_length:
        return result[:max_length] + f"... [truncated, total {len(result)} chars]"
    if isinstance(result, dict):
        return {k: truncate_result(v, max_length) for k, v in result.items()}
    if isinstance(result, list):
        return [truncate_result(v, max_length) for v in result[:10]]
    return result


# 使用示例:在写入日志前脱敏
raw_params = {"api_key": "sk-abc123", "table": "user_data"}
safe_params = sanitize_parameters(raw_params)
# → {"api_key": "REDACTED", "table": "user_data"}

raw_result = "x" * 5000
safe_result = truncate_result(raw_result)
# → "xxxx...xxxx [truncated, total 5000 chars]"

这个脱敏管道应该在事件对象构建之后、写入存储之前执行。不要试图在工具调用层面做脱敏——工具返回的是业务需要的完整数据,审计日志的脱敏是审计层的职责。

与 MCP 协议的集成点

如果你的 Agent 使用 MCP 协议(Model Context Protocol)来管理工具调用,审计日志的事件模型可以直接映射到 MCP 的调用生命周期:MCP 的 tools/call 请求对应 audit 的 tool_call 事件,MCP 的 tools/list 可以提供当前可用工具的注册信息作为审计日志的参考元数据。

一个推荐的集成模式是:在 MCP server 的工具调用处理器中嵌入审计日志钩子,使得每一次 MCP 工具调用都自动产生一条审计事件——无需 Agent 应用层额外编码。

三、Trace ID 与分布式追踪

Section 2 给出了审计日志的数据模型,但模型中的 trace_idspan_id 字段只有在正确生成和传播时才有意义。本节深入这两个字段的工程细节:生成策略怎么选、Span 如何嵌套、以及如何用 OpenTelemetry 在 Python 中实现 trace context 的全程传递。

Trace ID 的生成策略对比

trace_id 的生成看似简单——调个 UUID 库就行——但选择错误的策略会导致分布式追踪断裂或存储效率下降。推荐模型:OTel-native。trace_id 使用 OpenTelemetry SDK 生成的 32-char hex trace ID(与 Jaeger/Tempo 完全兼容),同时用 UUID v7 作为可选的 audit_event_id(时间可排序,用于业务查询和分区)。

策略结构长度时间可排序全局唯一冲突概率适用场景
OTel trace_id128-bit,32 位 hex 字符32 字符❌ 不可排序(随机的)✅ OTel SDK 保证唯一10-36trace_id 的推荐方案——兼容 Jaeger/Tempo
UUID v7前 48 位:毫秒时间戳
后 74 位:随机数
36 字符✅ 时间可排序✅ 极低冲突10-17audit_event_id / correlation_id——用于业务查询和分区
Snowflake41 位时间戳 + 10 位机器 ID + 12 位序列号19 位整数✅ 时间可排序⚠️ 依赖机器 ID 唯一性0(单机)超高吞吐、需要整数 ID 的场景

为什么推荐 OTel-native 模型:

1. 跨系统兼容。使用 OTel trace_id 可确保审计日志中的 trace_id 与 Jaeger、Grafana Tempo、ClickHouse 等后端中的 trace 完全一致——无需维护 ID 映射表。从 Jaeger 中发现问题 trace → 复制 trace_id → 在审计日志存储中直接检索——一步完成。

2. UUID v7 作为 audit_event_id 的补充价值。每条审计事件仍然需要一个独立的时间可排序 ID。UUID v7 的前 48 位是毫秒级时间戳,这意味着按 audit_event_id 排序的结果天然按时间排列——提升了存储的写入局部性。UUID v7 去中心化生成,无需协调,在多区域部署中优势明显。

3. 如果未使用 OTel。如果团队没有 OTel 基础设施,UUID v7 可作为自定义 trace_id 使用——它本身是一个很好的时间可排序相关 ID。但一旦后续引入 OTel,建议迁移到 OTel-native 模型。

下面的 Python 代码展示了 UUID v7 的生成和解析——用于 audit_event_id(每条审计事件的唯一 ID):

"""UUID v7 生成与解析示例。

用于生成 audit_event_id——每条审计事件的唯一时间可排序 ID。
实际生产环境推荐使用 'uuid7' 库(pip install uuid7)。
以下代码展示了核心原理——前 48 位毫秒时间戳的提取和验证。
"""

import time
import os


def generate_uuid_v7() -> str:
    """生成 UUID v7(用于 audit_event_id)。

    格式: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
           ^^^^^^^^ ^^^^ 毫秒时间戳(前 48 位)
    生产环境: pip install uuid7 → uuid7.uuid7()
    """
    # 注意:这是原理演示,生产环境请使用标准库
    try:
        from uuid_extensions import uuid7  # 第三方库
        return str(uuid7())
    except ImportError:
        # 回退方案:手动构造(仅用于演示理解结构)
        ts = int(time.time() * 1000)  # 毫秒时间戳
        rand = os.urandom(10)         # 74 位随机数
        ts_hex = f"{ts:012x}"         # 48 位 = 12 个 hex 字符
        rand_hex = rand.hex()[:20]    # 取前 20 个 hex 字符
        # 格式化为标准 UUID v7(注入版本号和变体标记)
        raw = ts_hex + rand_hex
        return (
            f"{raw[0:8]}-{raw[8:12]}-7{raw[13:16]}-"
            f"{'8' if int(raw[16], 16) >= 8 else raw[16]}{raw[17:20]}-{raw[20:32]}"
        )


def extract_timestamp_from_uuidv7(uuid_str: str) -> int:
    """从 UUID v7 中提取毫秒时间戳。

    可用于:日志排序、时间范围查询、存储分区。
    """
    # UUID v7: 前 8+4=12 个 hex 字符 = 48 位时间戳
    ts_hex = uuid_str[:8] + uuid_str[9:13]
    return int(ts_hex, 16)


# ── 使用示例 ──

audit_event_id = generate_uuid_v7()
print(f"Audit Event ID:  {audit_event_id}")
# → Audit Event ID: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67

ts = extract_timestamp_from_uuidv7(audit_event_id)
print(f"Timestamp: {ts} ms since epoch")
# → Timestamp: 1747888634231 ms since epoch

print(f"Readable:  {time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(ts/1000))}")
# → Readable: 2026-05-22T02:37:14

UUID v7 提升了写入局部性并使 audit_event_id 大致按时间可排序,但 timestamp 仍然是分区、保留策略和查询过滤的一等字段——不要完全依赖 ID 排序替代时间索引。

Span 嵌套模型:三级调用树

有了 trace_id 之后,下一个问题是:如何用它组织调用树?Agent 的一次用户请求天然具备三级嵌套结构:

用户请求 (trace_id, root span)
│
├── LLM 推理 #1 (span_id=a1, parent_span_id=null)
│   ├── decision: 选择工具 search_docs
│   ├── tool_call: search_docs (span_id=a2, parent_span_id=a1)
│   │   └── tool_result: {"documents": [...]}
│   └── LLM 收到结果,继续推理
│
├── LLM 推理 #2 (span_id=a3, parent_span_id=null ← 同为 root 的子节点)
│   ├── decision: 选择工具 delete_records(需要审批)
│   ├── approval: user_zhang_wei 审批通过 (span_id=a4, parent_span_id=a3)
│   └── tool_call: delete_records (span_id=a5, parent_span_id=a3)
│       └── tool_result: {"deleted_rows": 12403}
│
└── LLM 最终回复给用户

这个三级结构映射到 Span 字段上:

层级span_idparent_span_id对应事件类型示例
L1: 用户请求root span(如 0000000000000001null—(trace 容器,不产生事件)用户发送「帮我清理临时数据」
L2: LLM 推理推理 span(如 a1b2c3d4e5f678910000000000000001decisionLLM 选择 delete_records,理由是「用户要求清理」
L3: 工具调用/审批工具 span(如 f1e2d3c4b5a67892a1b2c3d4e5f67891tool_call / approval实际执行 delete_records,或审批链路

关键设计决策:LLM 推理 Span 之间的兄弟关系。在上面的示例中,LLM 推理 #1 和 #2 的 parent_span_id 都是 null(或指向同一个 root span)——它们是兄弟关系,而非父子关系。为什么不是嵌套?因为每次 LLM 推理是顺序执行的:推理 #1 完成后,Agent 框架收到工具返回值,再进行推理 #2。它们是同一请求中的两个独立步骤,不是嵌套调用的关系。把它们设为兄弟关系保留了时间线的线性特征,也更利于在追踪 UI 中按时间顺序查看。

相比之下,工具调用的 Span 是 LLM 推理 Span 的子节点——因为工具调用是由那次推理触发的,它是对推理决策的执行,天然具有父子关系。

与 OpenTelemetry 的集成

OpenTelemetry(OTel)已经是分布式追踪的事实标准。Agent 审计日志不应该重新发明 trace context 传递机制——而是应该在 OTel 的基础上增加 Agent 特有的语义层。集成方案如下:

架构层次:

┌─────────────────────────────────────────┐
│           Agent 审计事件层               │
│  (AuditEvent: decision, tool_call, ...)  │  ← 本文的核心
├─────────────────────────────────────────┤
│         OpenTelemetry Span 层            │
│  (trace_id, span_id, parent_span_id)     │  ← 复用 OTel 基础设施
├─────────────────────────────────────────┤
│           OTel SDK / Exporter           │
│  (OTLP → Jaeger / Tempo / 自定义后端)    │  ← 标准导出管道
└─────────────────────────────────────────┘

具体来说:

代码实现:Python OTel SDK 集成

下面的代码展示了 Agent 审计日志与 OpenTelemetry 的完整集成——从初始化、到 Span 创建、到 trace_id 在工具调用中的全程传递。

Step 1: OpenTelemetry 初始化配置

"""OpenTelemetry 初始化模块。

在 Agent 服务启动时调用一次 init_otel(),
配置 TracerProvider、Span Processor 和 Exporter。
"""

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource


def init_otel(
    service_name: str = "agent-service",
    otlp_endpoint: str = "http://localhost:4317",
    console_debug: bool = False,
) -> trace.TracerProvider:
    """初始化 OpenTelemetry SDK。

    Args:
        service_name: 服务名称,会出现在所有 Span 的 resource 属性中。
        otlp_endpoint: OTLP Collector 的 gRPC 地址。
        console_debug: 开发环境启用 Console Exporter 便于调试。

    Returns:
        配置完成的 TracerProvider 实例。

    生产环境最佳实践:
    - 使用 OTLPSpanExporter 导出到 Jaeger / Grafana Tempo
    - 使用 BatchSpanProcessor 避免每个 Span 都触发网络请求
    - Resource 中附加 service.version、deployment.environment 用于区分环境
    """

    # 创建 Resource:标识服务身份
    resource = Resource.create({
        SERVICE_NAME: service_name,
        "service.version": "2.3.1",
        "deployment.environment": "production",
    })

    # 创建 TracerProvider 并绑定 Resource
    provider = TracerProvider(resource=resource)

    # OTLP gRPC Exporter → Jaeger / Tempo / 任何 OTLP 兼容后端
    otlp_exporter = OTLPSpanExporter(
        endpoint=otlp_endpoint,
        insecure=True,  # 生产环境应使用 TLS
    )
    provider.add_span_processor(BatchSpanProcessor(otlp_exporter))

    # 开发环境:同时输出到控制台
    if console_debug:
        provider.add_span_processor(
            BatchSpanProcessor(ConsoleSpanExporter())
        )

    # 设置为全局 TracerProvider
    trace.set_tracer_provider(provider)

    return provider


# ── 获取 Tracer ──

def get_tracer(name: str = __name__) -> trace.Tracer:
    """获取 OpenTelemetry Tracer 实例。"""
    return trace.get_tracer(name)


# ── 使用示例 ──

if __name__ == "__main__":
    init_otel(console_debug=True)
    tracer = get_tracer("agent-audit-demo")
    print(f"OTel initialized. Tracer: {tracer}")
    # → OTel initialized. Tracer: <opentelemetry.sdk.trace.Tracer ...>

Step 2: 自定义 Span 创建与审计属性注入

"""Agent 审计专用 Span 工具。

在 OTel Span 的基础上注入 Agent 特有的审计语义——
decision、tool_call、approval 每种事件对应一个 Span,
审计字段作为 Span Attributes 写入。
"""

from __future__ import annotations

import json
import time
from contextlib import contextmanager
from typing import Any, Optional

from opentelemetry import trace
from opentelemetry.trace import Span, SpanKind, Status, StatusCode


# ── 全局 Tracer ──
tracer = trace.get_tracer("agent-audit")


# ── 常量化审计属性 Key ──

class AuditAttr:
    """审计日志的 OTel Span 属性 Key 常量。

    使用常量避免字符串硬编码导致的不一致。
    """
    AGENT_ID = "agent.id"
    SESSION_ID = "agent.session_id"
    EVENT_TYPE = "agent.event_type"
    TOOL_NAME = "agent.tool_name"
    PARAMETERS = "agent.parameters"
    RESULT = "agent.result"
    RESULT_SIZE = "agent.result_size"
    APPROVER = "agent.approver"
    APPROVAL_CONTEXT = "agent.approval_context"
    RATIONALE = "agent.rationale"
    MODEL = "agent.model"
    TEMPERATURE = "agent.temperature"
    PROMPT_SUMMARY = "agent.prompt_summary"
    DURATION_MS = "agent.duration_ms"
    STATUS = "agent.status"
    ERROR_TYPE = "agent.error_type"
    ERROR_MESSAGE = "agent.error_message"


# ── Span 创建工厂 ──

def create_llm_span(
    agent_id: str,
    session_id: str,
    model: str,
    temperature: float,
    prompt_summary: str = "",
    parent_span: Optional[Span] = None,
) -> Span:
    """创建一个 LLM 推理 Span。

    在 Agent 每次调用 LLM 时创建,作为 tool_call Span 的父 Span。
    """
    ctx = trace.set_span_in_context(parent_span) if parent_span else None
    span = tracer.start_span(
        "agent.llm.reasoning",
        context=ctx,
        kind=SpanKind.INTERNAL,
        attributes={
            AuditAttr.AGENT_ID: agent_id,
            AuditAttr.SESSION_ID: session_id,
            AuditAttr.MODEL: model,
            AuditAttr.TEMPERATURE: temperature,
            AuditAttr.PROMPT_SUMMARY: prompt_summary,
        },
    )
    return span


def record_decision(
    span: Span,
    tool_name: str,
    rationale: str,
    proposed_params: Optional[dict[str, Any]] = None,
):
    """在 LLM Span 上记录 decision 事件。

    调用时机:LLM 返回 tool_choice 后、实际执行工具前。

    Args:
        span: 当前的 LLM 推理 Span。
        tool_name: LLM 选择的工具名称。
        rationale: LLM 的决策理由(从 tool_choice 中提取)。
        proposed_params: LLM 提议的工具参数。
    """
    attrs = {
        AuditAttr.EVENT_TYPE: "decision",
        AuditAttr.TOOL_NAME: tool_name,
        AuditAttr.RATIONALE: rationale,
    }
    if proposed_params:
        attrs[AuditAttr.PARAMETERS] = json.dumps(proposed_params, ensure_ascii=False)

    span.add_event("agent.decision", attributes=attrs)


def record_tool_call(
    span: Span,
    tool_name: str,
    parameters: dict[str, Any],
    result: Any,
    duration_ms: int,
    status: str,
    error_type: Optional[str] = None,
    error_message: Optional[str] = None,
):
    """在 Span 上记录 tool_call 事件(合并 tool_result)。

    调用时机:工具执行完成后。

    Args:
        span: 父 LLM 推理 Span(tool_call 作为其 Event 而非独立 Span)。
             如需独立 Span,将 tool_call 创建为 span 的子 Span。
        tool_name: 工具名称。
        parameters: 实际执行的参数(已脱敏)。
        result: 工具返回结果(已截断)。
        duration_ms: 工具执行耗时(毫秒)。
        status: success | failure | timeout。
    """
    attrs = {
        AuditAttr.EVENT_TYPE: "tool_call",
        AuditAttr.TOOL_NAME: tool_name,
        AuditAttr.PARAMETERS: json.dumps(parameters, ensure_ascii=False),
        AuditAttr.RESULT: json.dumps(result, ensure_ascii=False),
        AuditAttr.DURATION_MS: str(duration_ms),
        AuditAttr.STATUS: status,
    }
    if error_type:
        attrs[AuditAttr.ERROR_TYPE] = error_type
    if error_message:
        attrs[AuditAttr.ERROR_MESSAGE] = error_message

    span.add_event("agent.tool_call", attributes=attrs)


def record_approval(
    span: Span,
    tool_name: str,
    approver: str,
    decision: str,
    approval_context: str = "",
):
    """在 Span 上记录 approval 事件。

    调用时机:人工审批完成后。

    Args:
        span: 当前的 LLM 推理 Span。
        tool_name: 需要审批的工具名称。
        approver: 审批人标识。
        decision: approved | rejected。
        approval_context: 审批时的上下文摘要。
    """
    attrs = {
        AuditAttr.EVENT_TYPE: "approval",
        AuditAttr.TOOL_NAME: tool_name,
        AuditAttr.APPROVER: approver,
        AuditAttr.STATUS: decision,
    }
    if approval_context:
        attrs[AuditAttr.APPROVAL_CONTEXT] = approval_context

    span.add_event("agent.approval", attributes=attrs)


# ── 上下文管理器:自动 Span 生命周期管理 ──

@contextmanager
def agent_trace(
    agent_id: str,
    session_id: str,
    model: str = "claude-sonnet-4-20250514",
    temperature: float = 0.0,
):
    """Agent 请求的顶层 trace 上下文管理器。

    用法:
        with agent_trace("prod-agent-03", "sess_8f3a2b1c") as span:
            trace_id = span.get_span_context().trace_id
            # ... Agent 推理循环 ...
            record_decision(span, "search_docs", "用户要求查询文档")
    """
    span = tracer.start_span(
        "agent.request",
        kind=SpanKind.SERVER,
        attributes={
            AuditAttr.AGENT_ID: agent_id,
            AuditAttr.SESSION_ID: session_id,
            AuditAttr.MODEL: model,
            AuditAttr.TEMPERATURE: temperature,
        },
    )
    try:
        yield span
        span.set_status(Status(StatusCode.OK))
    except Exception as e:
        span.set_status(Status(StatusCode.ERROR, str(e)))
        span.record_exception(e)
        raise
    finally:
        span.end()

Step 3: trace_id 全程传递——完整 Agent 推理示例

"""完整的 Agent 推理循环示例。

展示 trace_id 从 Agent 入口 → LLM 推理 → 工具调用 → 审批的全程传递。
每个步骤都产生对应的审计事件,并写入 OTel Span。
"""

import json
import time
from opentelemetry.trace import format_trace_id


def agent_reasoning_loop(
    agent_id: str,
    session_id: str,
    user_message: str,
):
    """Agent 一次完整的推理循环。

    模拟:用户请求 → LLM 推理 #1 → 工具调用 → LLM 推理 #2 → 审批 → 工具调用

    trace_id 在 with agent_trace() 内部自动生成和传播——
    所有 Span 和 Span Event 共享同一个 trace_id。
    """
    with agent_trace(agent_id, session_id) as root_span:

        # 获取 trace_id(16 字节 → 32 hex 字符)
        trace_id_bytes = root_span.get_span_context().trace_id
        trace_id_hex = format_trace_id(trace_id_bytes)
        print(f"\n═══ Trace: {trace_id_hex} ═══")
        print(f"User: {user_message}\n")

        # ── LLM 推理 #1:选择工具 ──
        llm_span_1 = create_llm_span(
            agent_id=agent_id,
            session_id=session_id,
            model="claude-sonnet-4-20250514",
            temperature=0.0,
            prompt_summary="用户要求搜索产品文档",
            parent_span=root_span,
        )

        # 模拟 LLM 决策
        decision_start = time.time()
        tool_choice = "search_docs"
        rationale = "用户询问产品功能,需要搜索相关文档"
        proposed_params = {"query": "产品功能介绍", "limit": 5}
        time.sleep(0.3)  # 模拟 LLM 推理耗时
        decision_duration = int((time.time() - decision_start) * 1000)

        record_decision(llm_span_1, tool_choice, rationale, proposed_params)
        print(f"  [decision] → {tool_choice}: {rationale} ({decision_duration}ms)")

        # ── 工具调用 #1:search_docs ──
        tool_start = time.time()
        # 模拟工具执行
        tool_result = {
            "documents": [
                {"id": "doc_001", "title": "产品功能介绍", "relevance": 0.95},
                {"id": "doc_002", "title": "API 使用指南", "relevance": 0.87},
            ],
            "total": 2,
        }
        time.sleep(0.15)  # 模拟工具执行耗时
        tool_duration = int((time.time() - tool_start) * 1000)

        record_tool_call(
            llm_span_1,
            tool_name="search_docs",
            parameters=proposed_params,
            result=tool_result,
            duration_ms=tool_duration,
            status="success",
        )
        print(f"  [tool_call] search_docs → success ({tool_duration}ms)")

        llm_span_1.end()

        # ── LLM 推理 #2:高风险操作,需审批 ──
        llm_span_2 = create_llm_span(
            agent_id=agent_id,
            session_id=session_id,
            model="claude-sonnet-4-20250514",
            temperature=0.0,
            prompt_summary="用户要求清理临时数据",
            parent_span=root_span,
        )

        decision_start = time.time()
        tool_choice_2 = "delete_records"
        rationale_2 = "用户明确要求'清理临时数据',匹配到 delete_records 工具"
        proposed_params_2 = {
            "table": "user_data",
            "filter": "created_at < '2026-05-15'",
        }
        time.sleep(0.25)
        decision_duration_2 = int((time.time() - decision_start) * 1000)

        record_decision(llm_span_2, tool_choice_2, rationale_2, proposed_params_2)
        print(f"  [decision] → {tool_choice_2}: {rationale_2} ({decision_duration_2}ms)")

        # ── 审批环节 ──
        print(f"  ⏳ 等待审批: {tool_choice_2}...")
        time.sleep(0.5)  # 模拟审批等待

        record_approval(
            llm_span_2,
            tool_name=tool_choice_2,
            approver="user_zhang_wei",
            decision="approved",
            approval_context="确认删除的是 5 月 15 日前的临时数据,共 12,403 行",
        )
        print(f"  [approval] → approved by user_zhang_wei")

        # ── 工具调用 #2:delete_records ──
        tool_start = time.time()
        # 模拟工具执行
        tool_result_2 = {
            "deleted_rows": 12403,
            "affected_tables": ["user_data"],
            "duration_ms": 847,
        }
        time.sleep(0.2)
        tool_duration_2 = int((time.time() - tool_start) * 1000)

        record_tool_call(
            llm_span_2,
            tool_name=tool_choice_2,
            parameters=proposed_params_2,
            result=tool_result_2,
            duration_ms=tool_duration_2,
            status="success",
        )
        print(f"  [tool_call] {tool_choice_2} → success ({tool_duration_2}ms)")

        llm_span_2.end()

        print(f"\n  ✅ Agent 响应: 已清理 12,403 条临时数据记录。")

    # 退出 with 块 → root_span.end() 自动执行
    # BatchSpanProcessor 在后台将 Span 导出到 OTLP Collector
    print(f"\n  📤 Trace {trace_id_hex} 已提交到 OTLP Collector")


# ── 运行示例 ──

if __name__ == "__main__":
    # 初始化 OTel(开发模式:控制台输出)
    init_otel(console_debug=True)

    # 模拟一次完整的 Agent 请求
    agent_reasoning_loop(
        agent_id="prod-agent-03",
        session_id="sess_8f3a2b1c",
        user_message="帮我搜索产品功能介绍,然后清理上周的临时数据",
    )

代码的三个关键设计点:

1. Span 的生命周期管理。agent_trace() 上下文管理器保证 root Span 一定会被 end()——即使 Agent 在执行过程中抛出异常。这是 OTel SDK 的最佳实践,避免 Span 泄漏导致追踪数据丢失。

2. Span Event vs Child Span 模型选择。两种建模方式各有适用场景:

维度Span Events 模型Child Span 模型
实现方式span.add_event("agent.decision", ...) 记录在 LLM span 上tracer.start_span("tool_call", ...) 创建独立子 span
span_id共享父 LLM span 的 span_id
Span Event 无独立 span_id
每个事件有独立 span_id / parent_span_id
Cardinality低——挂在一个 span 下高——每个工具调用产生一个独立 span
Jaeger 可读性好——不会 span 膨胀差——一个 200ms 推理产生多个 span,层级过深
回放树重建需要从 event attributes 提取天然有清晰的父子关系
推荐场景decision / approval——决策和审批长时间工具调用(如 > 5s 的外部 API)

本文代码采用 Span Events 模型:decision、tool_call、approval 作为 LLM 推理 Span 的 add_event()。一次 LLM 推理可能在 200ms 内产生三个事件——如果每个都作为独立 Span,会在 Jaeger 中产生难以阅读的层级膨胀。Span Event 保持了追踪树的可读性,同时保留了完整的审计字段。对于长时间运行的工具调用(如调用外部 API 耗时 > 5s),可创建独立的 Child Span 以更好地展示调用树。

3. trace_id 的提取。format_trace_id() 将 OTel SDK 内部的 16 字节 trace_id 转换为标准的 32 字符 hex 字符串——这个字符串就是你写入 AuditEvent.trace_id 的值。它确保审计日志存储中的 trace_id 和 Jaeger/Tempo 中的 trace_id 完全一致,可以跨系统关联。

自建 trace vs OTel vs 商业方案:开销对比

最后,一个实际工程决策:trace 基础设施是自己从头建、用 OpenTelemetry 开源方案、还是采购商业平台?

维度自建 traceOpenTelemetry 开源商业方案
(Datadog/LangSmith)
初始投入高——需要实现 trace ID 生成、传播、收集、存储、查询全链路中——OTel SDK 开箱即用,但需自建 Jaeger/Tempo 后端低——SDK 接入即可,后端托管
维护成本极高——自己修的 Bug 自己扛,无社区支持中——Jaeger/Tempo 的运维(存储、扩容、升级),社区活跃低——运维外包,但费用随规模线性增长
Agent 审计语义完全灵活——想加什么字段加什么需要自定义——Span Attributes + Span Events 承载审计字段,需自行设计 Key 规范部分支持——LangSmith 原生支持 LLM tracing,Datadog 的 LLM Observability 有专门的 Span 类型
存储成本取决于自建方案(PostgreSQL/ClickHouse)取决于后端选择(Elasticsearch 贵,ClickHouse 便宜)按 Span 数量或数据量计费,高吞吐下成本飙升
查询能力完全自定义Jaeger UI 适合追踪可视化;复杂审计查询需要额外工具(如直接查存储)内置丰富的查询和可视化,但受限于平台能力边界
合规与数据主权完全受控——数据不出自己的 VPC完全受控——开源方案私有化部署受限于——数据存储在第三方平台,需评估 SOC 2 / GDPR 合规性
推荐场景不推荐——除非有特殊合规需求且团队 > 10 人推荐——大部分 Agent 团队的最佳平衡点早期快速验证或需要 Turnkey 方案的大型团队

推荐路径:对于大多数构建生产级 Agent 的团队,OpenTelemetry 开源方案是最佳平衡点。它提供了标准化的 trace_id/span_id 生成和传播,生态成熟(Jaeger、Grafana Tempo、ClickHouse 后端均有现成集成),同时保留了数据的完全控制权。在此基础之上,本文描述的审计事件模型(Section 2 的 Pydantic AuditEvent + Section 3 的 Span Event 注入)作为 OTel 之上的语义扩展层——既复用了分布式追踪的基础设施,又填补了 LLM 决策审计的语义空白。

商业方案(LangSmith、Datadog LLM Observability)在早期原型阶段或团队缺乏基础设施运维能力时具有吸引力,但需仔细评估长期成本——一个中等规模的 Agent 系统每天可能产生数百万个 Span,商业平台的按量计费模型在规模化后可能成为意外的预算黑洞。此外,审计日志的合规要求(数据不能离开自有基础设施)在很多行业(金融、医疗)中是硬性约束,商业平台的 SaaS 部署模式天然不满足。

自建方案在 2026 年几乎不应该被考虑——OTel 已经成为行业标准,重新实现 trace context 传播相当于重新发明 TCP。唯一的例外是极端合规场景(如军方或情报系统),但这些场景不在本文的讨论范围内。

四、日志存储与保留策略

数据模型和 trace_id 基础设施就绪之后,下一个必须回答的工程问题是:审计日志写到哪里、存多久、花多少钱?与普通应用日志不同,审计日志包含结构化的工具参数和返回值,写入量和查询模式有独特特征——不是简单的「接个 ELK 就行」。本节从存储方案选型、分层保留、体积估算、脱敏写入四个维度给出生产级方案。

存储方案对比

Agent 审计日志的存储需求有几个关键约束:(1)需要支持对 tool_nameparameters 等 JSON 字段的结构化查询;(2)写入吞吐量高——每次工具调用产生 1-3 条事件,高并发 Agent 场景下写入 QPS 可能达到数万;(3)查询模式偏向时间范围扫描(「过去 24 小时所有 DELETE 操作」),而非随机点查;(4)需要合理的成本控制——审计日志的数据量增长很快,全量存 Elasticsearch 的账单会超出预期。

四种主流存储方案的对比:

存储方案查询速度存储成本JSON 查询运维复杂度适用场景
Elasticsearch⭐⭐⭐⭐⭐
全文搜索 + 聚合毫秒级
⭐⭐
贵——索引开销为原始数据 1.5-3 倍
⭐⭐⭐⭐
原生 JSON 支持,nested query 灵活
⭐⭐⭐
集群调优需要经验;大规模下 JVM 调优是必修课
热数据层——近 7 天的实时检索和事故排查
ClickHouse⭐⭐⭐⭐
列存压缩,时间范围扫描极快
⭐⭐⭐⭐⭐
便宜——压缩率 5-10 倍,单机可存 TB 级
⭐⭐⭐
支持 JSON 字段函数,但嵌套查询不如 ES 灵活
⭐⭐⭐
单机部署简单;集群模式运维中等
温数据层——7-90 天的结构化查询和回放提取
Loki⭐⭐⭐
标签检索快,全文搜索慢
⭐⭐⭐⭐
只索引标签,日志本体存对象存储(S3)
⭐⭐
不支持结构化 JSON 查询——本质是文本日志
⭐⭐⭐⭐
和 Grafana 集成零配置;轻量运维
不适合审计日志——缺少对 tool_name/parameters 的结构化查询能力
PostgreSQL⭐⭐⭐
JSONB 索引 + GIN 支持良好查询
⭐⭐⭐
中等——行存储,大数据量下磁盘占用显著
⭐⭐⭐⭐⭐
JSONB 类型 + GIN 索引,查询能力最强
⭐⭐⭐⭐
团队通常已有运维经验
小规模场景(< 1 亿条)或作为元数据索引层

推荐组合:Elasticsearch(热)+ ClickHouse(温)+ S3/对象存储(冷归档)。这个分层方案在查询性能、存储成本和运维复杂度之间达到最佳平衡。Loki 在审计日志场景中不适用——它面向非结构化文本日志,无法有效利用审计日志中 tool_namestatusparameters 的结构化字段。PostgreSQL 的 JSONB 查询能力最强,但在每天千万级写入量下,行存储的磁盘和 VACUUM 开销会成为瓶颈。

分层保留策略

审计日志不需要——也不应该——永远以相同的成本和速度存储。不同时间段的查询模式差异显著:

层级时间窗口存储后端查询 SLA典型用途
🔥 热数据0-7 天Elasticsearch毫秒级全文搜索实时事故排查、on-call 告警关联
🌤 温数据7-90 天ClickHouse秒级聚合查询周报分析、回放数据提取、异常模式挖掘
❄️ 冷归档90 天 - 1 年+S3/对象存储(Parquet 压缩)分钟级(需先恢复到 ClickHouse)合规审计、年度安全审查、长期趋势分析

三层之间的数据流转:

┌───────────────────────────────────────────────────┐
│                  Agent 审计事件                      │
└─────────────────┬─────────────────────────────────┘
                  │ 双写
     ┌────────────┴────────────┐
     ▼                         ▼
┌─────────┐             ┌─────────────┐
│   ES    │  7 天后 →   │  ClickHouse │  90 天后 →  ┌─────────┐
│ (热数据) │  ILM 自动   │  (温数据)    │  TTL 自动   │   S3    │
│ 0-7 天  │  迁移删除   │  7-90 天    │  导出压缩   │ (冷归档) │
└─────────┘             └─────────────┘             │ 90天-1年│
                                                     └─────────┘

关键实现细节:

1. ES → ClickHouse 迁移。利用 Elasticsearch 的 ILM(Index Lifecycle Management)策略——索引满 7 天或达到 50GB 后自动 rollover,新索引承接新写入,老索引通过定时任务(如 Apache Spark / 自定义 Python 脚本)批量导出到 ClickHouse。ES 端在确认 ClickHouse 写入成功后删除老索引。

2. ClickHouse → S3 归档。ClickHouse 的 TTL(Time To Live)功能可按分区自动将过期数据导出为 Parquet 格式到 S3,并删除本地存储。Parquet 的列式压缩可将审计日志压缩到原始 JSON 体积的 10-15%。

3. 冷归档查询。合规审计通常按时间范围查询(「请提供 2025 年 Q4 所有的 DELETE 操作记录」),不需要毫秒级响应。冷归档数据通过 Athena / Presto 直接查询 S3 上的 Parquet 文件,或按需恢复到 ClickHouse 的临时表中进行复杂查询。

日志体积估算

在设计存储方案之前,需要先估算日志体积——它直接影响存储成本和技术选型。审计日志体积的三个核心变量:

日志体积(GB/天)= events_per_request × requests_per_day × avg_event_size

其中:
  events_per_request    每次用户请求产生的平均审计事件数
  requests_per_day      每天处理的用户请求总数
  avg_event_size        单条审计事件的平均大小(JSON, KB)

以一个中等规模的 Agent 服务为例:

变量典型值说明
events_per_request4-8每次请求通常经历 1-3 次 LLM 推理,每次推理触发 1-2 个工具调用,加上 decision 和 approval 事件
requests_per_day100,000日均 10 万次用户请求(B2B SaaS 的典型量级)
avg_event_size0.5 - 2 KB取决于工具的返回值大小。search_docs 的结果可能只有 200 字节;delete_records 的 result 摘要约 500 字节;带有大段 rationale 的 decision 事件约 1.5 KB。平均取 1 KB

代入公式:6 events/req × 100,000 req/天 × 1 KB = 600 MB/天(原始 JSON)

不同存储后端的实际磁盘占用:

这个估算说明了两点:(1)ClickHouse 的列存压缩在审计日志场景中优势显著——同样的原始数据,存储成本仅为 ES 的约 1/12;(2)全量存 ES 一年的费用是 ClickHouse + S3 组合方案的 10 倍以上。注意:这些是容量估算的启发式参考,不是精确保证。实际占用取决于索引策略、压缩算法、payload 大小、分片/分区设计和保留设置。如果你的 Agent 日均请求量在百万级别,建议在生产环境的实际数据上压测后再确定存储方案。

如果工具的返回值包含大段文本(如 search_docs 返回完整文档内容),建议在审计日志中只记录摘要 + 文档 ID,完整内容通过 session_id 在应用日志中引用——避免审计日志被工具返回值撑爆。

敏感数据脱敏策略

审计日志记录了工具调用的完整参数和返回值,如果缺乏有效的脱敏机制,审计日志存储本身将成为系统中最危险的数据集——攻击者不需要攻破数据库,只需要拿到审计日志的读取权限。

Section 2 已经给出了字段级脱敏的代码实现(sanitize_parameters + truncate_result),此处从策略层面补充三个关键设计:

1. 脱敏在应用层完成,不依赖存储层。审计事件在写入任何存储之前,必须已经完成脱敏。理由:(a)数据传输过程中可能被拦截——如果脱敏在 ES 端做,传输中的数据是明文;(b)双写场景下(同时写 ES 和 ClickHouse),在应用层做一次脱敏比在两个存储端各做一次更可靠且一致;(c)应用层脱敏让你可以做到字段级别的精确控制——知道哪些 key 是敏感的(api_key、token、password),而存储层只能做正则匹配。

2. 自动识别敏感字段——不只是 key 名匹配。基于 key 名的脱敏(如匹配 api_keytoken)覆盖了 80% 的场景,但不够。对于参数值中嵌入的敏感数据——如一个 query 参数的值是 "SELECT * FROM users WHERE token='abc123'"——需要额外的值级别扫描:用正则匹配已知的敏感数据格式(JWT、AWS Access Key ID、GitHub PAT)。但值级别扫描的误报率较高(正常 SQL 语句中也可能出现类似格式的字符串),建议作为可选的增强层而非默认开启。

3. 脱敏 vs 可审计性的平衡。完全脱敏(所有参数都替换为 REDACTED)虽然最安全,但会导致审计日志失去事故排查的价值。一个务实的平衡策略:

代码示例:Python 日志写入 ClickHouse

以下代码展示了从 AuditEvent 对象到 ClickHouse 的完整写入管道——包括脱敏、批量写入和错误重试。ClickHouse 推荐使用 clickhouse-connect 库(官方 Python 驱动),它原生支持 dict 类型的参数化插入,无需拼接 SQL 字符串。

"""AuditEvent → ClickHouse 写入管道。

功能:
- 批量写入:积累 BATCH_SIZE 条事件后一次性 flush,减少网络往返
- 自动脱敏:写入前对 parameters 和 result 执行 sanitize + truncate
- 错误重试:网络抖动时自动重试(最多 3 次)
- 优雅关闭:进程退出时 flush 剩余事件

依赖:pip install clickhouse-connect
"""

from __future__ import annotations

import atexit
import logging
import time
from typing import Any, Optional

import clickhouse_connect

# ── 复用 Section 2 的脱敏函数 ──
# from audit_log.model import AuditEvent, sanitize_parameters, truncate_result

logger = logging.getLogger(__name__)

# ClickHouse 表结构(建表 DDL 参考):
#
# CREATE TABLE audit_log (
#     timestamp DateTime64(3) CODEC(Delta, ZSTD),
#     trace_id String CODEC(ZSTD),
#     span_id String CODEC(ZSTD),
#     parent_span_id Nullable(String) CODEC(ZSTD),
#     agent_id LowCardinality(String),
#     session_id String CODEC(ZSTD),
#     event_type LowCardinality(String),
#     status LowCardinality(String),
#     tool_name LowCardinality(String),
#     parameters String CODEC(ZSTD),      -- JSON string
#     result String CODEC(ZSTD),           -- JSON string
#     approver Nullable(String),
#     error_message Nullable(String),
#     error_type Nullable(String),
#     duration_ms UInt32,
#     metadata String CODEC(ZSTD),         -- JSON string
#     date Date DEFAULT toDate(timestamp)  -- 分区键
# ) ENGINE = MergeTree()
# PARTITION BY toYYYYMM(date)
# ORDER BY (agent_id, event_type, timestamp)
# TTL date + INTERVAL 90 DAY DELETE;


class ClickHouseAuditWriter:
    """审计事件 → ClickHouse 批量写入器。

    Usage:
        writer = ClickHouseAuditWriter(host="localhost", database="audit")
        for event in agent_events:
            writer.write(event)
        # 进程退出时自动 flush(atexit),或显式调用:
        writer.flush()
    """

    _TABLE_COLUMNS = [
        "timestamp", "trace_id", "span_id", "parent_span_id",
        "agent_id", "session_id", "event_type", "status",
        "tool_name", "parameters", "result", "approver",
        "error_message", "error_type", "duration_ms", "metadata",
    ]

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8123,
        username: str = "default",
        password: str = "",
        database: str = "audit",
        table: str = "audit_log",
        batch_size: int = 1000,
        flush_interval: float = 5.0,
        max_retries: int = 3,
    ):
        self._table = table
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._max_retries = max_retries
        self._buffer: list[dict[str, Any]] = []
        self._last_flush = time.monotonic()

        # 建立连接
        self._client = clickhouse_connect.get_client(
            host=host,
            port=port,
            username=username,
            password=password,
            database=database,
        )
        logger.info(f"ClickHouse connected: {host}:{port}/{database}.{table}")

        # 优雅关闭:进程退出时自动 flush
        atexit.register(self.flush)

    # ── 公开 API ──

    def write(self, event: "AuditEvent") -> None:
        """写入一条审计事件(先进入缓冲区,满一批后批量写入)。

        调用方不需要关心批量逻辑——只管逐条 write,
        flush 由 batch_size 或 flush_interval 自动触发。
        """
        row = self._event_to_row(event)
        self._buffer.append(row)

        # 条件触发 flush
        if len(self._buffer) >= self._batch_size:
            self._flush()
        elif time.monotonic() - self._last_flush > self._flush_interval:
            self._flush()

    def flush(self) -> None:
        """强制 flush 缓冲区中的所有事件。"""
        self._flush()

    # ── 内部实现 ──

    def _event_to_row(self, event: "AuditEvent") -> dict[str, Any]:
        """将 AuditEvent 转换为 ClickHouse 行 dict。

        执行脱敏 + 截断后序列化为 JSON 字符串。
        """

        # 脱敏处理(复用 Section 2 的脱敏函数)
        safe_params = sanitize_parameters(event.parameters or {})
        safe_result = truncate_result(event.result or {})

        return {
            "timestamp": event.timestamp,
            "trace_id": event.trace_id,
            "span_id": event.span_id,
            "parent_span_id": event.parent_span_id,
            "agent_id": event.agent_id,
            "session_id": event.session_id,
            "event_type": event.event_type.value,
            "status": event.status.value,
            "tool_name": event.tool_name or "",
            "parameters": self._safe_json_dumps(safe_params),
            "result": self._safe_json_dumps(safe_result),
            "approver": event.approver,
            "error_message": event.error_message,
            "error_type": event.error_type,
            "duration_ms": event.duration_ms,
            "metadata": self._safe_json_dumps(event.metadata),
        }

    def _flush(self) -> None:
        """批量写入 ClickHouse,失败自动重试。"""
        if not self._buffer:
            return

        rows = self._buffer
        self._buffer = []
        self._last_flush = time.monotonic()

        for attempt in range(1, self._max_retries + 1):
            try:
                self._client.insert(
                    table=self._table,
                    data=rows,
                    column_names=self._TABLE_COLUMNS,
                )
                logger.debug(f"Flushed {len(rows)} events to ClickHouse")
                return
            except Exception as e:
                if attempt < self._max_retries:
                    wait = 2 ** attempt  # 指数退避: 2s, 4s, 8s
                    logger.warning(
                        f"ClickHouse flush failed (attempt {attempt}/{self._max_retries}): {e}. "
                        f"Retrying in {wait}s..."
                    )
                    time.sleep(wait)
                else:
                    # 最终失败:记录到错误日志,不丢失数据
                    logger.error(
                        f"ClickHouse flush failed after {self._max_retries} attempts: {e}. "
                        f"Dropped {len(rows)} events. Last trace_id: {rows[-1].get('trace_id', 'N/A')}"
                    )
                    # 生产环境:应写入死信队列或本地 fallback 文件
                    raise

    @staticmethod
    def _safe_json_dumps(obj: Any) -> str:
        """安全 JSON 序列化——不抛异常。"""
        import json
        try:
            return json.dumps(obj, ensure_ascii=False, default=str)
        except Exception:
            return "{}"


# ── 使用示例 ──

writer = ClickHouseAuditWriter(
    host="localhost",
    database="audit",
    batch_size=500,
)

# 构造审计事件
# from audit_log.model import AuditEvent, EventType, EventStatus

# event = AuditEvent(
#     trace_id=...,
#     ...
# )
# writer.write(event)

# 批量写入 1000 条后自动 flush,或显式调用:
# writer.flush()

写入管道的三个关键设计点:

1. 批量写入 vs 逐条写入。ClickHouse 推荐每批次 1,000-10,000 行——太少则网络开销占主导,太多则内存压力和失败重试成本过高。batch_size=1000 是一个保守但安全的起点,可根据实际吞吐调高。

2. LowCardinality(String) 的 ClickHouse 优化。agent_idevent_typestatustool_name 这些枚举值有限的字段使用 LowCardinality 类型——ClickHouse 会将其存储为字典编码,查询性能和压缩率都大幅优于普通 String。这是 ClickHouse 在审计日志场景中存储成本低的底层原因之一。

3. 死信队列(Dead Letter Queue)。上面代码在最终写入失败时只记录错误日志,生产环境应改为写入死信队列(如本地文件、Redis List、或专门的 Kafka topic)——确保在 ClickHouse 完全不可用时(如集群宕机)不丢失审计事件。恢复后从死信队列回放。这是审计日志写入管道中最容易被忽略但最重要的一环。

五、日志回放与事故事后分析

审计日志被写入了,存储方案也选好了——但真正证明审计日志价值的是:事故发生时,你能多快定位根因?本节从三个维度回答这个问题:通过 trace_id 快速重建调用链、将日志回放用于回归测试、以及一套结构化的事故分析工作流。

3 分钟重建完整调用链

返回 Section 1 的凌晨 2:37 事故——如果有了审计日志存储,具体的排查流程如下:

第 1 步(10 秒):搜索可疑操作。tool_name=delete_records 在 ES 或 ClickHouse 中搜索近 24 小时的事件,按时间降序排列。定位到一条 trace_id = 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67 的 tool_call 事件。

第 2 步(20 秒):展开完整决策链。trace_id 为主键,拉取所有相关事件,按 timestamp 排序。输出如下(通过 parent_span_id 还原调用树):

Trace: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
Session: sess_8f3a2b1c   Agent: prod-agent-03

[02:37:13.100] LLM 推理 #1 (span: a1b2c3d4e5f67891) — 312ms
  ├── [decision] tool=search_docs, rationale="用户询问数据清理方式"
  └── [tool_call] search_docs → success (145ms, 3 docs found)

[02:37:13.557] LLM 推理 #2 (span: a3b4c5d6e7f89012) — 248ms
  ├── [decision] tool=delete_records, rationale="用户要求'清理临时数据',
  │             匹配到 delete_records。识别到可能影响 user_data 表,
  │             2026-05-15 前创建的行。"
  ├── [approval]  ❌ 无审批记录 — tool=delete_records 未配置审批流
  │               (approval_required 策略检查: 未命中)
  └── [tool_call] delete_records → success (847ms, 12,403 rows deleted)

[02:37:14.652] LLM 最终回复: "已为您清理了 12,403 条临时数据。"

第 3 步(2.5 分钟):根因分析。从时间线中可以直接看到:

核心价值在于:整个排查过程不需要联系用户复现对话、不需要 grep 搜索几十 GB 的纯文本日志、不需要猜测 LLM 当时在想什么——结构化审计日志已经给出了完整的决策链。

日志回放用于回归测试

审计日志不仅用于事故分析——它记录的完整工具调用序列(trace_id → decision → tool_call → result)是回归测试的天然输入。工作流程:

┌─────────────────┐     ┌────────────────┐     ┌─────────────────┐
│ 生产审计日志      │ →   │ 提取测试用例    │ →   │ Staging 环境重放 │
│ (ClickHouse/ES) │     │ (trace 序列化)  │     │ (新 Agent 版本)  │
└─────────────────┘     └────────────────┘     └────────┬────────┘
                                                        │
                                              ┌─────────▼─────────┐
                                              │ 对比决策差异        │
                                              │ (旧版本 vs 新版本) │
                                              └───────────────────┘

具体步骤:

  1. 筛选 golden trace:从 ClickHouse 中筛选满足条件的 trace——如「状态为 success、有人工审批通过、涉及高风险工具调用」——这些是高质量的测试用例
  2. 提取测试输入:从 trace 的根 decision 事件中提取用户原始输入和初始上下文,从后续 tool_call 事件中提取工具调用的完整参数
  3. 构造测试用例:生成标准化的测试用例格式(如 JSON),包含:输入 prompt、期望的工具调用序列、期望的审批节点
  4. Staging 重放:输入相同的 prompt,观察新版本 Agent 的决策路径(选择的工具、参数、是否需要审批)
  5. 对比差异:新版本的决策路径是否与旧版本一致?如果不一致——是改进还是回归?审批流是否正确触发?

这种方法——从生产日志中自动提取 golden dataset——比手工编写测试用例有两个优势:(1)覆盖真实世界的边界情况,而非人工设想的 happy path;(2)随着生产流量增长持续自动扩充测试集。

事故分析工作流

将审计日志融入 on-call 工程师的标准事故分析流程:

阶段操作审计日志的作用耗时
1. 搜索按 tool_name、status、时间范围搜索可疑事件结构化字段索引——ES 毫秒级, ClickHouse 秒级10-30 秒
2. 关联以 trace_id 拉取全链路事件,按 parent_span_id 还原调用树span 嵌套模型自动还原 LLM 推理 → 工具调用的层级关系20-60 秒
3. 时间线重建按 timestamp 排序所有事件,生成人类可读的时间线decision → approval → tool_call 序列清晰展示操作链30-60 秒
4. 根因定位分析时间线中的异常:缺少 approval?LLM 决策理由是否合理?decision.rationale + approval 状态直接回答「为什么」和「谁批准了」1-3 分钟

关键洞察:传统应用事故分析中最耗时的步骤是「重建时间线」——你需要从分散在各个服务、各个日志文件中的片段拼凑出完整的调用序列。Agent 审计日志通过 trace_id + span_id + parent_span_id 的三级嵌套,将这一步自动化为一个查询。

代码示例:trace_id 调用链重建脚本

以下 Python 脚本接收一个 trace_id,从 ClickHouse 查询所有相关事件,重建完整的时间线——同时输出到控制台和导出 JSON 文件,便于分享给团队成员。

"""审计日志调用链重建工具。

Usage:
    python trace_replay.py <trace_id> [--output timeline.json]

功能:
    - 从 ClickHouse 查询指定 trace_id 的所有审计事件
    - 按 timestamp + parent_span_id 还原调用树
    - 输出人类可读的控制台时间线
    - 导出结构化 JSON 文件
    - 自动检测缺失的关键事件(如缺失 approval 的高风险 tool_call)
"""

from __future__ import annotations

import argparse
import json
import sys
from collections import defaultdict
from datetime import datetime
from typing import Any, Optional

import clickhouse_connect

# ── 事件字段 ──

EVENT_FIELDS = [
    "timestamp", "trace_id", "span_id", "parent_span_id",
    "agent_id", "session_id", "event_type", "status",
    "tool_name", "parameters", "result", "approver",
    "error_message", "error_type", "duration_ms",
]

# 需要审批的工具列表(示例)
HIGH_RISK_TOOLS = {"delete_records", "drop_table", "truncate", "update_config", "execute_sql"}


class TraceReplay:
    """trace_id 调用链重建器。"""

    def __init__(self, ch_host: str = "localhost", ch_port: int = 8123):
        self._client = clickhouse_connect.get_client(
            host=ch_host, port=ch_port, database="audit"
        )

    def replay(self, trace_id: str) -> dict[str, Any]:
        """查询并重建指定 trace 的完整调用链。"""
        events = self._fetch_events(trace_id)
        if not events:
            raise ValueError(f"未找到 trace_id={trace_id} 的审计事件")

        timeline = self._build_timeline(events)
        warnings = self._detect_anomalies(events)

        return {
            "trace_id": trace_id,
            "session_id": events[0].get("session_id", "N/A"),
            "agent_id": events[0].get("agent_id", "N/A"),
            "event_count": len(events),
            "time_span_ms": self._calc_time_span(events),
            "timeline": timeline,
            "warnings": warnings,
            "raw_events": events,
        }

    # ── 数据获取 ──

    def _fetch_events(self, trace_id: str) -> list[dict]:
        """从 ClickHouse 获取指定 trace_id 的所有事件。"""
        query = f"""
            SELECT {', '.join(EVENT_FIELDS)}
            FROM audit_log
            WHERE trace_id = {{trace_id:String}}
            ORDER BY timestamp ASC
        """
        result = self._client.query(query, parameters={"trace_id": trace_id})
        return [dict(zip(EVENT_FIELDS, row)) for row in result.result_rows]

    # ── 时间线构建 ──

    def _build_timeline(self, events: list[dict]) -> list[dict]:
        """重建调用链时间线。

        策略:
        1. 按 span_id 分组——每个 span 包含其下的 decision/tool_call/approval
        2. 按 parent_span_id 还原 span 之间的父子/兄弟关系
        3. 输出完整的层级时间线
        """
        # —— 第一步:按 span_id 分组 ——
        span_events: dict[str, list[dict]] = defaultdict(list)
        for e in events:
            sid = e.get("span_id", "unknown")
            span_events[sid].append(e)

        # —— 第二步:按 parent_span_id 建立 span 树 ——
        span_tree: dict[str, list[str]] = defaultdict(list)
        root_spans: list[str] = []
        for sid, evts in span_events.items():
            parent = evts[0].get("parent_span_id")
            if parent and parent in span_events:
                span_tree[parent].append(sid)
            elif parent is None or parent == "":
                root_spans.append(sid)
            else:
                # parent span 不在结果集中(可能被截断或在不同分区)
                root_spans.append(sid)

        # —— 第三步:按时间排序并生成时间线条目 ——
        timeline = []
        visited = set()
        all_spans = sorted(root_spans, key=lambda s: min(
            e["timestamp"] for e in span_events[s]
        ))

        # BFS 遍历 span 树
        queue = [(sid, 0) for sid in all_spans]  # (span_id, depth)
        while queue:
            sid, depth = queue.pop(0)
            if sid in visited:
                continue
            visited.add(sid)

            evts = sorted(span_events[sid], key=lambda e: e["timestamp"])

            # 尝试识别 span 类型
            span_type = self._classify_span(evts)

            # 添加 span 头部
            first_ts = evts[0]["timestamp"]
            timeline.append({
                "type": "span_start",
                "span_id": sid,
                "span_type": span_type,
                "depth": depth,
                "timestamp": first_ts,
                "event_count": len(evts),
            })

            # 添加 span 内的所有事件
            for e in evts:
                timeline.append({
                    "type": "event",
                    "span_id": sid,
                    "depth": depth + 1,
                    "timestamp": e["timestamp"],
                    "event_type": e["event_type"],
                    "tool_name": e.get("tool_name", ""),
                    "status": e["status"],
                    "approver": e.get("approver"),
                    "duration_ms": e.get("duration_ms", 0),
                    "error_message": e.get("error_message"),
                })

            # 将子 span 加入队列(按时间排序)
            children = sorted(
                span_tree.get(sid, []),
                key=lambda s: min(e["timestamp"] for e in span_events[s]),
            )
            for child_sid in children:
                queue.append((child_sid, depth + 1))

        return timeline

    def _classify_span(self, events: list[dict]) -> str:
        """通过 span 内的事件类型推断 span 类型。"""
        event_types = {e["event_type"] for e in events}
        if "decision" in event_types:
            return "LLM 推理"
        if "approval" in event_types:
            return "审批节点"
        if "tool_call" in event_types:
            return "工具调用"
        if "error" in event_types:
            return "错误"
        return "未知"

    # ── 异常检测 ──

    def _detect_anomalies(self, events: list[dict]) -> list[str]:
        """自动检测审计日志中的异常模式。"""
        warnings = []

        # 检测 1:高风险工具调用缺少 approval
        high_risk_calls = [
            e for e in events
            if e["event_type"] == "tool_call"
            and e.get("tool_name", "") in HIGH_RISK_TOOLS
        ]
        for call in high_risk_calls:
            span_id = call["span_id"]
            has_approval = any(
                e["event_type"] == "approval" and e["span_id"] == span_id
                for e in events
            )
            if not has_approval:
                warnings.append(
                    f"⚠️ 高风险工具调用缺少审批: tool={call['tool_name']}, "
                    f"span={span_id}, status={call['status']}"
                )

        # 检测 2:tool_call 执行失败
        failed_calls = [
            e for e in events
            if e["event_type"] == "tool_call" and e["status"] != "success"
        ]
        for call in failed_calls:
            warnings.append(
                f"❌ 工具调用失败: tool={call.get('tool_name', 'N/A')}, "
                f"status={call['status']}, "
                f"error={call.get('error_message', 'N/A')}"
            )

        # 检测 3:存在 error 事件
        error_events = [e for e in events if e["event_type"] == "error"]
        for err in error_events:
            warnings.append(
                f"💥 错误事件: type={err.get('error_type', 'N/A')}, "
                f"message={err.get('error_message', 'N/A')}"
            )

        return warnings

    # ── 工具函数 ──

    def _calc_time_span(self, events: list[dict]) -> int:
        """计算 trace 的总耗时(毫秒)。"""
        timestamps = [e["timestamp"] for e in events if e.get("timestamp")]
        if len(timestamps) < 2:
            return 0
        min_ts = min(timestamps)
        max_ts = max(timestamps)
        if isinstance(min_ts, str):
            min_ts = datetime.fromisoformat(min_ts.replace("Z", "+00:00"))
            max_ts = datetime.fromisoformat(max_ts.replace("Z", "+00:00"))
        return int((max_ts - min_ts).total_seconds() * 1000)


# ── 格式化输出 ──

def format_console(trace_data: dict) -> str:
    """生成人类可读的控制台时间线。"""
    lines = []
    lines.append(f"{'═' * 70}")
    lines.append(f"Trace: {trace_data['trace_id']}")
    lines.append(f"Session: {trace_data['session_id']}   Agent: {trace_data['agent_id']}")
    lines.append(f"Events: {trace_data['event_count']}   Time Span: {trace_data['time_span_ms']}ms")
    lines.append(f"{'═' * 70}")

    for entry in trace_data["timeline"]:
        ts = entry.get("timestamp", "")[:23]  # 截断到毫秒
        depth = entry.get("depth", 0)
        indent = "  " * depth

        if entry["type"] == "span_start":
            lines.append(f"\n{indent}[{ts}] {entry['span_type']} (span: {entry['span_id'][:12]}...) — {entry['event_count']} 事件")
        elif entry["type"] == "event":
            event_type = entry["event_type"]
            tool = entry.get("tool_name", "")
            status = entry["status"]
            markers = []
            if tool:
                markers.append(f"tool={tool}")
            if status:
                markers.append(status)
            extra = ", ".join(markers) if markers else ""
            lines.append(f"{indent}├── [{event_type}] {extra} ({entry.get('duration_ms', 0)}ms)")

    if trace_data["warnings"]:
        lines.append(f"\n{'─' * 70}")
        lines.append("⚠️  异常检测:")
        for w in trace_data["warnings"]:
            lines.append(f"  {w}")

    lines.append(f"\n{'═' * 70}")
    return "\n".join(lines)


# ── CLI 入口 ──

def main():
    parser = argparse.ArgumentParser(description="Agent 审计日志调用链重建")
    parser.add_argument("trace_id", help="要重建的 trace_id")
    parser.add_argument("--output", "-o", help="JSON 输出文件路径")
    parser.add_argument("--ch-host", default="localhost", help="ClickHouse 主机 (默认: localhost)")
    parser.add_argument("--ch-port", type=int, default=8123, help="ClickHouse 端口 (默认: 8123)")
    args = parser.parse_args()

    replay = TraceReplay(ch_host=args.ch_host, ch_port=args.ch_port)

    try:
        trace_data = replay.replay(args.trace_id)
    except ValueError as e:
        print(f"错误: {e}", file=sys.stderr)
        sys.exit(1)

    # 控制台输出
    print(format_console(trace_data))

    # JSON 导出
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(trace_data, f, ensure_ascii=False, indent=2, default=str)
        print(f"\n✅ 时间线已导出: {args.output}")


if __name__ == "__main__":
    main()

# ── 使用示例 ──
#
# $ python trace_replay.py 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67 -o timeline.json
# ══════════════════════════════════════════════════════════════════
# Trace: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
# Session: sess_8f3a2b1c   Agent: prod-agent-03
# Events: 6   Time Span: 1552ms
# ══════════════════════════════════════════════════════════════════
#
# [2026-05-22T02:37:13.100] LLM 推理 (span: a1b2c3d4e5f6...) — 2 事件
#   ├── [decision] tool=search_docs, success (0ms)
#   ├── [tool_call] tool=search_docs, success (145ms)
#
# [2026-05-22T02:37:13.557] LLM 推理 (span: a3b4c5d6e7f8...) — 2 事件
#   ├── [decision] tool=delete_records, success (0ms)
#   ├── [tool_call] tool=delete_records, success (847ms)
#
# ──────────────────────────────────────────────────────────────────
# ⚠️  异常检测:
#   ⚠️ 高风险工具调用缺少审批: tool=delete_records, span=a3b4c5d6e7f8..., status=success
# ══════════════════════════════════════════════════════════════════
# ✅ 时间线已导出: timeline.json

代码的三个关键设计点:

1. Span 树的重建逻辑。_build_timeline 是核心——它通过 parent_span_id 还原 span 之间的层级关系,再通过 timestamp 排序保证时间线的正确性。这与 Section 3 中 OTEL 的 span 嵌套模型直接对应。

2. 自动异常检测。_detect_anomalies 在重建调用链的同时自动识别三类异常:高风险操作缺少审批、工具调用失败、以及显式的 error 事件。这让事故分析从「被动搜索」变为「主动发现」——on-call 工程师不需要猜测哪里出了问题,脚本直接标注。

3. 双输出模式。控制台输出用于快速人工阅读;JSON 导出用于集成到自动化管道——例如 CI/CD 在部署后自动重放关键 trace 并对比差异。

真实场景案例:隆升科技的审批流缺失事故

以下是基于真实事故模式构造的案例(公司名和细节已虚构):

背景:隆升科技(Longsheng Tech)是一家 B2B SaaS 公司,其客服 Agent 系统「智服助手」管理着约 50 万企业客户的工单数据。Agent 拥有 42 个工具,包括查询、创建、更新和删除工单。

事故经过:周二下午 3:12,一位客服主管发现上周创建的 3,200 条已关闭工单从数据库中消失了。经排查,这些工单在当天上午 10:45 被批量删除——但没有任何人手动执行过删除操作。

排查过程(使用审计日志):

  1. 10:48 AM — 搜索阶段(15 秒):on-call 工程师在 ClickHouse 中搜索 tool_name=delete_tickets AND timestamp > '2026-05-19T10:00:00'——返回 1 条记录,trace_id 为 0199c3e1-...
  2. 10:49 AM — 关联阶段(30 秒):以 trace_id 拉取全链路 8 条事件。时间线显示:
    • 10:45:03 — 客服人员张三在对话中说「帮我归档上周已关闭的工单,清理一下数据库」
    • 10:45:05 — LLM 推理 #1:选择 delete_tickets,理由是「用户要求清理数据库中的已关闭工单」
    • 10:45:06 — ❌ 缺失 approval 事件。delete_tickets 工具应有审批流,但策略配置错误——审批规则中写的是 delete_ticket(单数),而工具实际注册名是 delete_tickets(复数)
    • 10:45:06 — tool_call: delete_tickets → success(删除了 3,200 条工单)
  3. 10:50 AM — 根因定位(1 分钟):审批流配置中的工具名拼写不匹配——delete_ticket vs delete_tickets——导致审批策略静默跳过。修复:统一工具命名规范 + 添加审批策略的告警机制(当配置引用的工具名在注册表中不存在时,告警而非静默忽略)。

如果没有审计日志:传统排查方式需要(a)从应用日志中 grep 所有包含「delete」的请求——可能几千条;(b)逐一检查每条请求是否经过了审批——审批记录在另一个独立的系统中;(c)回放对话历史还原上下文——需要找到对应 session 的完整对话。整个过程预计需要 2-4 小时,且需要多人协作。

关键教训:(1)工具名的精确匹配至关重要——审计日志中的 tool_name 应与工具注册表、审批策略中的引用完全一致;(2)审批策略的「静默跳过」是最危险的行为——策略引擎在找不到匹配规则时,应该默认拒绝而非默认放行;(3)trace_replay.py 这类工具的自动异常检测(缺失 approval)可以在事故扩大前就发出告警。

六、审计日志与评测系统的集成

审计日志的核心用途——事故分析、合规审计、日志回放——在与评测系统集成后,会产生一个额外的复合价值:审计日志为评测框架提供真实、持续、自动更新的评测数据,评测框架反过来验证 Agent 版本升级时决策质量是否退化。本节聚焦于这个双向集成。

审计日志如何为评测系统提供真实数据

Agent 评测框架设计中,评测数据的获取通常依赖于两种方式:手工编写测试用例(费时、覆盖面窄)或从生产日志中提取(需要额外的管道)。审计日志通过其结构化的 trace_id → decision → tool_call → result 序列,天然地解决了第二种方式的输入问题。

具体而言,审计日志可以为评测框架提供三类评测数据:

评测数据类型从审计日志中如何提取评测用途
Golden Dataset
(标准答案集)
筛选 status=success + 有人工审批通过的 trace,提取用户输入 → tool_call 序列作为「正确解答」回归测试——新版本 Agent 对相同输入是否产生相同/更好的工具调用序列
边界用例
(Edge Cases)
筛选 status=failure 或 status=timeout 的 trace,提取导致失败的输入和上下文鲁棒性测试——新版本 Agent 在已知失败场景中是否有所改进
异常行为样本
(Anomaly Samples)
筛选「高风险工具调用缺少 approval」或「tool_call 序列偏离常规模式」的 trace安全性评测——新版本 Agent 的审批策略是否有效拦截了异常行为模式

关键设计原则:golden dataset 的质量取决于筛选条件。不是所有 status=success 的 trace 都适合作为标准答案——一个「成功」的调用可能包含了不合理的工具选择(如用户要求「查看」但 Agent 执行了「修改」)。人工审批通过的 trace 是更可靠的 golden 来源,因为它经过了人工验证。在实践中,建议用三层筛选来提取 golden dataset:

  1. 第一层——状态筛选:status=success + 审批状态=approved(如果工具需审批)
  2. 第二层——置信度筛选:LLM decision 的 rationale 中包含高置信度标记(如「明确」「确认」「匹配」等关键词)
  3. 第三层——结果验证:tool_call 的 result 与预期一致(如返回的文档相关度 > 阈值)

从日志中自动提取 Golden Dataset

以下代码展示了从 ClickHouse 中查询并生成评测数据的完整管道。输出格式兼容 Agent 评测框架设计中定义的评测数据集 schema:

"""从审计日志中生成 Agent 评测数据集。

将 ClickHouse 中的生产审计日志转换为评测框架可消费的评测用例。
输出格式兼容 agent-evaluation-framework 的 test_case schema。

Usage:
    python generate_eval_dataset.py --days 7 --output eval_dataset.json
"""

from __future__ import annotations

import argparse
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

import clickhouse_connect


# ── 评测用例的数据结构 ──

class EvalTestCase:
    """单个评测用例——兼容 agent-evaluation-framework 格式。

    Fields:
        id: 唯一标识(使用 trace_id)
        input: 用户原始输入 + 上下文
        expected_tools: 期望的工具调用序列(名称 + 参数)
        expected_approvals: 期望的审批节点
        metadata: 来源 trace 的元信息
        difficulty: 自动推断的难度等级
    """

    def __init__(
        self,
        trace_id: str,
        user_input: str,
        expected_tools: list[dict[str, Any]],
        expected_approvals: list[str],
        metadata: dict[str, Any],
    ):
        self.id = trace_id
        self.input = user_input
        self.expected_tools = expected_tools
        self.expected_approvals = expected_approvals
        self.metadata = metadata
        self.difficulty = self._infer_difficulty(expected_tools)

    def _infer_difficulty(self, tools: list[dict]) -> str:
        """根据工具调用链的复杂度自动推断难度。"""
        high_risk = {"delete", "drop", "truncate", "update", "execute"}
        tool_names = [t.get("tool_name", "") for t in tools]

        if len(tools) > 5:
            return "hard"
        if any(any(risk in name for risk in high_risk) for name in tool_names):
            return "hard"
        if len(tools) > 2:
            return "medium"
        return "easy"

    def to_json(self) -> dict:
        return {
            "id": self.id,
            "input": self.input,
            "expected_tools": self.expected_tools,
            "expected_approvals": self.expected_approvals,
            "metadata": self.metadata,
            "difficulty": self.difficulty,
        }


# ── Golden Dataset 生成器 ──

class GoldenDatasetGenerator:
    """从 ClickHouse 审计日志中提取 Golden Dataset。

    三层筛选策略:
    1. 状态筛选:status=success + 有审批的已通过
    2. 置信度筛选:decision rationale 包含高置信度标记
    3. 结果验证:tool_call 返回了有意义的结果

    Usage:
        gen = GoldenDatasetGenerator(ch_host="localhost")
        cases = gen.generate(days=7, limit=500)
        gen.export_json(cases, "golden_dataset.json")
    """

    # 高置信度关键词——decision rationale 中包含这些词的 trace 更可靠
    HIGH_CONFIDENCE_MARKERS = [
        "明确", "确认", "匹配", "对应", "要求",
        "指定", "直接", "精确",
    ]

    # 需要人工审批的工具(高风险)
    HIGH_RISK_TOOLS = {"delete_records", "drop_table", "truncate", "update_config", "execute_sql"}

    def __init__(self, ch_host: str = "localhost", ch_port: int = 8123):
        self._client = clickhouse_connect.get_client(
            host=ch_host, port=ch_port, database="audit"
        )

    def generate(self, days: int = 7, limit: int = 500) -> list[EvalTestCase]:
        """生成 golden dataset。

        Args:
            days: 从最近多少天中提取数据。
            limit: 最大生成的测试用例数。
        """
        # Step 1: 获取候选 trace_id——成功的、有审批通过记录的
        candidate_traces = self._get_candidate_traces(days, limit * 3)  # 多取一些,后续筛选
        if not candidate_traces:
            return []

        # Step 2: 逐个 trace 提取完整信息并评分
        cases = []
        for trace_id in candidate_traces:
            if len(cases) >= limit:
                break

            test_case = self._build_test_case(trace_id)
            if test_case:
                cases.append(test_case)

        # Step 3: 按难度分层抽样,确保数据集的多样性
        return self._stratified_sample(cases, limit)

    # ── 内部实现 ──

    def _get_candidate_traces(self, days: int, limit: int) -> list[str]:
        """查询候选 trace_id——状态成功 + 包含审批记录。"""
        since = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")

        # 查找满足条件的 trace_id:
        # - 有 tool_call 事件且 status=success
        # - 如果是高风险工具,必须有 approval 事件且 status=approved
        query = """
            SELECT trace_id, count() as event_count
            FROM audit_log
            WHERE timestamp >= {since:String}
              AND status = 'success'
            GROUP BY trace_id
            HAVING event_count >= 2
            ORDER BY event_count DESC
            LIMIT {limit:UInt32}
        """
        result = self._client.query(
            query,
            parameters={"since": since, "limit": limit},
        )
        return [row[0] for row in result.result_rows]

    def _build_test_case(self, trace_id: str) -> Optional[EvalTestCase]:
        """为一个 trace 构建评测用例。"""
        events = self._fetch_trace_events(trace_id)
        if len(events) < 2:
            return None

        # —— 三层筛选 ——

        # 第一层:状态筛选
        has_failed = any(e.get("status") not in ("success", "approved") for e in events)
        if has_failed:
            return None

        # 第二层:置信度筛选——decision rationale 中是否包含高置信度关键词
        decisions = [e for e in events if e.get("event_type") == "decision"]
        if decisions:
            # decision 的 rationale 存储在 metadata 或单独的 result 字段中
            # 此处从 metadata JSON 中提取
            high_confidence = False
            for d in decisions:
                metadata = self._parse_json_field(d.get("metadata", "{}"))
                rationale = metadata.get("rationale", "")
                if any(marker in rationale for marker in self.HIGH_CONFIDENCE_MARKERS):
                    high_confidence = True
                    break
            if not high_confidence:
                return None

        # 第三层:结果验证——tool_call 有实际返回内容
        tool_calls = [e for e in events if e.get("event_type") == "tool_call"]
        for tc in tool_calls:
            result = tc.get("result", "")
            if not result or result in ("{}", "null", ""):
                return None

        # —— 提取评测用例信息 ——

        # 工具调用序列
        expected_tools = []
        for tc in sorted(tool_calls, key=lambda e: e["timestamp"]):
            expected_tools.append({
                "tool_name": tc.get("tool_name", ""),
                "parameters": self._parse_json_field(tc.get("parameters", "{}")),
                "expected_status": tc.get("status", "success"),
            })

        # 审批节点
        approvals = [e for e in events if e.get("event_type") == "approval"]
        expected_approvals = [a.get("tool_name", "") for a in approvals]

        # 用户输入(从第一个 decision 的 metadata 中提取)
        first_decision = decisions[0] if decisions else None
        metadata = self._parse_json_field(first_decision.get("metadata", "{}")) if first_decision else {}
        user_input = metadata.get("prompt_summary", metadata.get("user_input", "N/A"))

        return EvalTestCase(
            trace_id=trace_id,
            user_input=user_input,
            expected_tools=expected_tools,
            expected_approvals=expected_approvals,
            metadata={
                "source": "production_audit_log",
                "trace_id": trace_id,
                "session_id": events[0].get("session_id", ""),
                "agent_id": events[0].get("agent_id", ""),
                "timestamp": str(events[0].get("timestamp", "")),
                "model": metadata.get("model", "N/A"),
                "event_count": len(events),
                "extraction_method": "golden_dataset_v1",
            },
        )

    def _fetch_trace_events(self, trace_id: str) -> list[dict]:
        """获取指定 trace 的所有事件。"""
        fields = [
            "timestamp", "trace_id", "span_id", "event_type", "status",
            "tool_name", "parameters", "result", "approver", "metadata",
        ]
        query = f"""
            SELECT {', '.join(fields)}
            FROM audit_log
            WHERE trace_id = {{trace_id:String}}
            ORDER BY timestamp ASC
        """
        result = self._client.query(query, parameters={"trace_id": trace_id})
        return [dict(zip(fields, row)) for row in result.result_rows]

    @staticmethod
    def _parse_json_field(value: Any) -> Any:
        """安全解析 JSON 字段。ClickHouse 中的 JSON 存储为 String。"""
        if value is None:
            return {}
        if isinstance(value, (dict, list)):
            return value
        try:
            return json.loads(value)
        except (json.JSONDecodeError, TypeError):
            return {}

    def _stratified_sample(
        self, cases: list[EvalTestCase], limit: int
    ) -> list[EvalTestCase]:
        """按难度分层抽样,保证数据集多样性。"""
        if len(cases) <= limit:
            return cases

        # 按难度分组
        by_difficulty: dict[str, list[EvalTestCase]] = {"easy": [], "medium": [], "hard": []}
        for c in cases:
            by_difficulty[c.difficulty].append(c)

        # 按比例抽样:easy 30%, medium 40%, hard 30%
        ratios = {"easy": 0.3, "medium": 0.4, "hard": 0.3}
        sampled = []
        for level, ratio in ratios.items():
            pool = by_difficulty[level]
            n = max(1, int(limit * ratio))
            sampled.extend(pool[:n])

        return sampled[:limit]

    def export_json(self, cases: list[EvalTestCase], filepath: str) -> None:
        """导出评测数据集为 JSON 文件。"""
        dataset = {
            "meta": {
                "generator": "golden_dataset_from_audit_log",
                "generated_at": datetime.now(timezone.utc).isoformat(),
                "total_cases": len(cases),
                "schema_version": "1.0",
                "compatible_with": "agent-evaluation-framework",
            },
            "cases": [c.to_json() for c in cases],
        }
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(dataset, f, ensure_ascii=False, indent=2)
        print(f"✅ 导出了 {len(cases)} 条评测用例 → {filepath}")

    def print_summary(self, cases: list[EvalTestCase]) -> None:
        """打印数据集摘要。"""
        by_diff = {"easy": 0, "medium": 0, "hard": 0}
        by_tool: dict[str, int] = {}
        for c in cases:
            by_diff[c.difficulty] += 1
            for t in c.expected_tools:
                name = t.get("tool_name", "unknown")
                by_tool[name] = by_tool.get(name, 0) + 1

        print(f"\n{'─' * 50}")
        print(f"📊 Golden Dataset 摘要")
        print(f"  总用例数: {len(cases)}")
        print(f"  难度分布: Easy={by_diff['easy']}, Medium={by_diff['medium']}, Hard={by_diff['hard']}")
        print(f"  工具分布 (Top 5):")
        for name, count in sorted(by_tool.items(), key=lambda x: -x[1])[:5]:
            print(f"    - {name}: {count}")
        print(f"{'─' * 50}")


# ── CLI 入口 ──

def main():
    parser = argparse.ArgumentParser(description="从审计日志生成 Agent 评测数据集")
    parser.add_argument("--days", type=int, default=7, help="提取最近 N 天的数据 (默认: 7)")
    parser.add_argument("--limit", type=int, default=500, help="最大生成用例数 (默认: 500)")
    parser.add_argument("--output", "-o", default="golden_dataset.json", help="输出文件路径")
    parser.add_argument("--ch-host", default="localhost", help="ClickHouse 主机")
    parser.add_argument("--ch-port", type=int, default=8123, help="ClickHouse 端口")
    args = parser.parse_args()

    gen = GoldenDatasetGenerator(ch_host=args.ch_host, ch_port=args.ch_port)
    cases = gen.generate(days=args.days, limit=args.limit)
    gen.print_summary(cases)
    gen.export_json(cases, args.output)


if __name__ == "__main__":
    main()


# ── 使用示例 ──
#
# $ python generate_eval_dataset.py --days 30 --limit 200 -o golden_v2.json
# ──────────────────────────────────────────────────
# 📊 Golden Dataset 摘要
#   总用例数: 200
#   难度分布: Easy=60, Medium=80, Hard=60
#   工具分布 (Top 5):
#     - search_docs: 145
#     - create_ticket: 87
#     - delete_records: 34
#     - update_config: 23
#     - send_notification: 18
# ──────────────────────────────────────────────────
# ✅ 导出了 200 条评测用例 → golden_v2.json

异常模式自动检测

除了生成评测数据,审计日志的时间序列特性使其天然适合异常检测——不需要复杂的 ML 模型,简单的规则和统计就能发现大量有意义的异常:

异常类型检测方法触发条件示例评测用途
工具调用序列偏离马尔可夫链——统计 tool A → tool B 的转移概率,标记低概率转移search_docs → delete_records(正常转移概率 0.1%,某 trace 却出现了)输入到安全评测——新版本 Agent 是否仍在这些异常转移中触发?
权限异常提升工具权限等级矩阵——标记 tool_call 中调用了超出 agent_id 权限范围的工具readonly-agent-05 调用了 delete_records安全评测——权限控制策略是否在新版本中有效?
审批异常静默高风险工具 × 无 approval 事件 = 告警delete_records 的 tool_call 前后 60 秒内无 approval 事件策略评测——新版本的审批流配置是否覆盖了所有高风险工具?
耗时异常滑动窗口 Z-score——工具调用 duration_ms 偏离均值 > 3σsearch_docs 平时 150ms,某次耗时 8,500ms性能评测——新版本 Agent 的工具调用延迟是否退化?
结果异常短工具返回值大小偏离均值(result_size 字段)search_docs 通常返回 5-20 条结果,某次返回 0 条质量评测——新版本是否更频繁地触发空结果?

实现建议:异常检测管道应作为独立的后台任务运行(如每小时一次的定时任务),将检测到的异常 trace 写入专门的 audit_anomalies 表或推送到告警系统。不要在生产 Agent 的热路径上做异常检测——这会增加工具调用的延迟。

与评测流水线的集成架构

审计日志与评测系统的完整集成架构如下:

┌──────────────────────────────────────────────────────────────┐
│                        生产环境                                │
│  ┌─────────┐    ┌──────────┐    ┌──────────────────────────┐ │
│  │ Agent   │ →  │ 审计日志  │ →  │ ClickHouse / ES 日志存储  │ │
│  │ 服务    │    │ 写入管道  │    └───────────┬──────────────┘ │
│  └─────────┘    └──────────┘                │                │
└──────────────────────────────────────────────┼────────────────┘
                                               │
                    ┌──────────────────────────┘
                    │ 定时任务 (cron / Airflow)
                    ▼
┌──────────────────────────────────────────────────────────────┐
│                    评测数据生成管道                             │
│                                                               │
│  ┌──────────────┐   ┌────────────────┐   ┌────────────────┐  │
│  │ Golden       │   │ 异常检测        │   │ 边界用例提取    │  │
│  │ Dataset 提取 │   │ (序列偏离/权限) │   │ (failure trace)│  │
│  └──────┬───────┘   └───────┬────────┘   └───────┬────────┘  │
│         │                   │                     │           │
│         └───────────────────┼─────────────────────┘           │
│                             ▼                                 │
│                    ┌────────────────┐                         │
│                    │ 评测数据集存储   │                         │
│                    │ (JSON / DB)    │                         │
│                    └───────┬────────┘                         │
└────────────────────────────┼──────────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────────┐
│                    评测执行流水线                               │
│                                                               │
│  ┌──────────────┐   ┌────────────┐   ┌────────────────────┐  │
│  │ Staging      │   │ 评测框架    │   │ 评测结果 + 报告     │  │
│  │ Agent (新版本)│ → │ 运行用例    │ → │ (pass/fail/diff)  │  │
│  └──────────────┘   └────────────┘   └────────┬───────────┘  │
│                                                │              │
│                              反馈到 ────────────┘              │
│                              ┌────────────────┐               │
│                              │ CI/CD 质量门禁  │               │
│                              │ (回归检测)      │               │
│                              └────────────────┘               │
└──────────────────────────────────────────────────────────────┘

架构的关键设计点:

1. 数据生成与评测执行分离。Golden dataset 的提取是离线批处理(每小时/每天一次),不依赖 Agent 的实时请求。评测执行则可以在 CI/CD 管道中按需触发(每次部署前)。两个阶段的解耦保证了评测数据的新鲜度,同时不影响生产 Agent 的性能。

2. 三层数据源。不依赖单一数据源——golden dataset(正向验证)、异常检测(负向/安全验证)、边界用例(鲁棒性验证)三者组合,覆盖了评测框架需要的大部分数据类型。

3. CI/CD 质量门禁。评测结果直接反馈到部署管道——如果新版本 Agent 在 golden dataset 上的 pass rate 低于阈值(如 95%),或异常检测发现新的安全问题,部署自动阻断。这就是审计日志在工程实践中的最终价值落点:从生产日志到质量门禁的闭环

三个集成阶段

审计日志与评测系统的集成不是一步到位的——建议分三个阶段逐步推进:

阶段能力技术产出时间投入
Phase 1: 数据导出从 ClickHouse 手动导出 JSON → 评测框架消费generate_eval_dataset.py(如上)1-2 天
Phase 2: 自动化管道定时任务自动提取 + 异常检测 → 评测数据集自动更新Airflow DAG / cron + 上述脚本3-5 天
Phase 3: CI/CD 集成评测结果 → 质量门禁 → 自动阻断部署GitHub Actions / GitLab CI 集成3-5 天

三个阶段的推进逻辑是:先让数据流起来(Phase 1),再让流程自动化(Phase 2),最后让质量门禁生效(Phase 3)。每个阶段都独立产生价值——不需要等到 Phase 3 才开始使用评测数据。

关于评测框架本身的架构设计和评测指标的完整定义,请参见 Agent 评测框架设计——该文详细讨论了评测维度(正确性、安全性、效率)、评分算法、以及评测结果的可视化方案。本节聚焦于审计日志到评测框架的数据输入管道——两者配合构成了 Agent 质量保障的完整链条。

七、开源与商业方案对比

前面六节讲了「为什么需要审计日志」和「怎么设计数据模型与存储」(Section 1-3)、「怎么实现 SDK」(Section 4)、「怎么与评测框架集成」(Section 5-6)。现在到了落地前的最后一步:你该用什么基础设施来承载这套审计系统?——开源自建还是买商业方案?

7.1 挑战:审计日志基础设施的选择困境

审计日志的存储和查询需求与普通应用日志有显著差异:高写入吞吐(每次 Agent 推理循环产生 3-10 条事件)、结构化字段查询(按 trace_id、tool_name、approver、event_type 多维度检索)、长期保留(合规要求可能长达半年到数年)。同时,团队的能力、预算和合规约束决定了你是否有精力自建——还是有理由购买商业产品。

下面的对比覆盖当前主流的开源和商业方案,评估它们各自在 Agent 审计日志场景下的适配度。

7.2 开源方案:OpenTelemetry + 可观测性三件套

开源路线的标准组合是:OpenTelemetry(数据采集和导出)+ Jaeger/Zipkin(Trace 可视化)+ Elasticsearch / ClickHouse(结构化搜索和存储)

开源方案 PoC:OpenTelemetry + Jaeger + Elasticsearch

以下是一个可直接运行的 docker-compose.yml,搭建一个最小化的审计日志基础设施——适合在本地或小规模生产环境验证。

# docker-compose.yml — Agent 审计日志最小可用基础设施
version: "3.9"

services:
  # ── 1. OpenTelemetry Collector:接收审计事件并路由到后端 ──
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.102.0
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317"   # OTLP gRPC
      - "4318:4318"   # OTLP HTTP
    depends_on:
      - jaeger
      - elasticsearch

  # ── 2. Jaeger:Trace 可视化 ──
  jaeger:
    image: jaegertracing/all-in-one:1.57
    environment:
      - COLLECTOR_OTLP_ENABLED=true
      - SPAN_STORAGE_TYPE=elasticsearch
      - ES_SERVER_URLS=http://elasticsearch:9200
    ports:
      - "16686:16686"  # Jaeger UI
      - "4317"         # OTLP gRPC (内部分发)
    depends_on:
      - elasticsearch

  # ── 3. Elasticsearch + Kibana:结构化搜索与可视化 ──
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.13.4
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    ports:
      - "9200:9200"

  kibana:
    image: docker.elastic.co/kibana/kibana:8.13.4
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 5s
    send_batch_size: 512

exporters:
  # 路由到 Jaeger —— trace 可视化
  otlp/jaeger:
    endpoint: jaeger:4317
    tls:
      insecure: true

  # 路由到 Elasticsearch —— 结构化审计日志存储
  elasticsearch:
    endpoints: ["http://elasticsearch:9200"]
    logs_index: "agent-audit-logs"
    # 按日期滚动索引,便于归档和清理
    logs_dynamic_index:
      enabled: true

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp/jaeger]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [elasticsearch]

这套开源方案的核心优势是零许可成本完全的数据控制权——所有数据留存在你自己的基础设施中。但代价也是显而易见的:你需要有人维护 Elasticsearch 集群、处理索引滚动、管理 Jaeger 的存储——对于缺乏 DevOps 人力的团队(一般 < 5 人),这个运维负担不可忽视。

7.3 商业方案对比

以下对比表覆盖当前主流的 Agent 可观测性商业产品,聚焦于它们对审计日志场景的适配度——不是评测谁更好,而是帮你判断哪种场景下该选哪种

维度 LangSmith Weights & Biases Datadog LLM Arize
核心定位 LangChain 生态的 LLM 应用调试与评测平台 ML 实验追踪 + LLM 链路追踪 全栈可观测性(APM + 日志 + LLM) LLM 评测与生产监控
审计日志支持 ✅ Trace + Run 事件,原生支持 Agent 决策链追踪 ⚠️ 偏 ML 训练追踪,Agent 审计需手动集成 ⚠️ LLM Span 支持较好,但缺乏 Agent 专用语义(decision/approval) ⚠️ 侧重评测指标而非审计链路
集成难度 低——LangChain 原生集成,几行代码 中——需配置 W&B callback 中——需安装 datadog-agent 并配置 ddtrace 中——需使用 Arize SDK 手动插桩
审批链路 ❌ 不支持原生审批事件 ❌ 不支持 ❌ 不支持 ❌ 不支持
数据隐私 ⚠️ 数据上传到 LangSmith 云端(有私有部署选项) ⚠️ 默认云端,需额外配置私有化 ⚠️ 数据发送到 Datadog 服务器 ⚠️ 默认云端
成本(月/团队) 有免费 tier;付费版按 seat 计费——采购前核实最新定价 有免费 tier;付费版按 seat 计费——采购前核实最新定价 按用量计费(per-host + per-span)——采购前核实最新定价 开源核心(Apache 2.0)免费自托管;云托管按用量计费
vendor lock-in 中——通过 LangChain 抽象层,可替换 高——深度绑定 Datadog 生态 低——通过 OpenTelemetry 可迁移
合规认证 支持企业安全选项——在供应商 Trust Center 核实最新 SOC 2/HIPAA/ISO 声明 支持企业安全选项——在供应商 Trust Center 核实最新 SOC 2/HIPAA/ISO 声明 支持企业安全选项——在供应商 Trust Center 核实最新 SOC 2/HIPAA/ISO 声明 开源(Apache 2.0)自托管;云托管版——在供应商 Trust Center 核实认证
适用场景 LangChain 技术栈团队,快速迭代阶段的 Agent 应用 ML 团队同时做模型训练和 Agent 应用 已有 Datadog 基础设施的企业,需 LLM 可观测扩展 评测驱动的团队,监控+评测一体化

注:以上信息基于 2026 年 5 月的公开产品能力和定价,具体情况请查阅各产品最新文档。

7.4 决策框架:三步定位你的方案

面对这么多选项,不要陷入「比较一切」的陷阱。用下面这个三步决策树,在 5 分钟内确定你该走哪条路。

决策条件 推荐方案 核心理由
团队 < 5 人 / 预算有限 开源自建(OTel + ES/Jaeger) 零许可成本;小规模流量下运维负担可控。先用 Docker Compose 跑起来,Phase 2 再上 Elastic Cloud 避免自运维 ES
需要合规认证(SOC 2 / HIPAA / 等保) 自建 + 合规加固,或选 Datadog(已有 SOC 2/HIPAA) 数据不出 VPC 是合规底线。在供应商 Trust Center 核实商业方案的最新认证状态后评估
快速迭代 / 验证阶段 LangSmith(LangChain)或 OpenTelemetry Lite(非 LangChain) 快速接入,把人力集中在产品本身。等审计需求成熟后,再评估是否迁移到自建方案
已有 Datadog / 可观测性基础设施 Datadog LLM Observability 复用现有基础设施、告警通道、权限体系。但需注意:审批链路仍需自定义开发
Agent 框架非 LangChain OpenTelemetry + 自建存储 OTel 是框架无关的标准协议。本系列 SDK 通过 OTLP exporter 对接任何 OTel 后端

决策的核心原则:不要因为「以后可能需要」而提前购买商业方案。审计日志系统的升级路径是开放的——你完全可以从开源方案起步,Phase 1-2 跑通后再决定是否引入商业产品。参见下一节的实施路线图。


八、实施路线图

审计日志系统不需要一次性建成。和大多数基础设施系统一样,推荐渐进式交付——每个阶段独立产生价值,前一个阶段的沉淀自然成为下一个阶段的基础。

8.1 四阶段路线图总览

阶段 时间 人力 核心交付 到达什么状态 投入产出比
Phase 1 1-2 周 1 人(后端) trace_id + JSON 结构化日志 + 文件存储 能通过 grep / jq 排查单条 trace ⭐⭐⭐⭐⭐ 极高
Phase 2 2-4 周 1-2 人(后端 + DevOps) Elasticsearch/Kibana 或 Grafana Loki 按 trace_id/tool/event_type 搜索 + 仪表盘 ⭐⭐⭐⭐ 高
Phase 3 1-2 月 2-3 人(后端 + QA) 日志回放引擎 + CI 集成 + regression test suite CI 中自动运行回放测试,阻断回归 ⭐⭐⭐ 中高
Phase 4 持续 1 人(SRE + 后端) 规则告警 + ML 异常检测 自动化发现异常调用模式并告警 ⭐⭐⭐ 中

8.2 Phase 1(1-2 周):最小可用审计

目标:在 2 周内让每一笔 Agent 调用都有迹可循——哪怕只能用命令行查。

核心工作:

  1. 集成审计 SDK——将 Section 4 的 Python SDK 嵌入到 Agent 推理循环中。在每次 LLM 决策、工具调用、审批事件前后插入 sdk.record_decisionsdk.record_tool_call 等调用。
  2. 生成 trace_id——复用 OTel SDK 生成的 32-char hex trace_id(与 Jaeger/Tempo 兼容)。如未使用 OTel,可用 UUID v7 作为自定义相关 ID。在 Agent 入口处初始化 trace_id,通过 context 传递到所有子调用。
  3. 文件存储——每个 trace 输出一行 JSON,按日期滚动文件(如 /var/log/agent-audit/2026-05-22.jsonl)。
  4. 验证——触发几笔典型的 Agent 调用,用 grep <trace_id> /var/log/agent-audit/*.jsonl | jq 确认链路完整。
# Phase 1 验证:通过命令行排查一条 trace
$ grep "0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67" /var/log/agent-audit/2026-05-22.jsonl | jq '[.event_type, .tool_name, .status, .metadata.rationale]'

# 输出示例——完整的决策→审批→执行链一目了然
["decision","delete_records","success","User requested cleanup of temp data from last week"]
["approval","delete_records","approved",null]
["tool_call","delete_records","success",null]
["tool_result","delete_records","success",null]

投入产出评估:1 人 × 1-2 周,实现从「事故排查 2 小时」到「3 分钟定位」的效率飞跃。这是整个路线图中 ROI 最高的阶段——哪怕后续阶段不做,你已经获得了 95% 的事故排查效率提升。

8.3 Phase 2(2-4 周):日志可视化与搜索

目标:告别命令行,让非工程师(产品、合规、QA)也能通过 UI 搜索和可视化审计日志。

核心工作:

投入产出评估:1-2 人 × 2-4 周。从「只有后端能查」到「全团队可自助检索」,显著降低排查沟通成本。如果团队已经有 ES/Kibana 基础设施(很多公司都有),这个阶段可能缩短到 1 周——只需配置索引和仪表盘。

8.4 Phase 3(1-2 月):日志回放与自动化测试

目标:利用生产审计日志构建 golden test suite,在 CI 中自动回归——每次 Agent Prompt 或 Tool 变更,都知道是否破坏了已有行为。

核心工作:

  1. 回放引擎——从审计日志中提取成功的 trace,重建为测试用例(Section 6 已给出实现)。
  2. CI 集成——将回放测试写入 GitHub Actions / GitLab CI 流程。在 PR 合并前自动运行回归测试套件,对比新旧 Prompt/工具下的 Agent 行为差异。
  3. 用例管理——建立测试用例库(至少积累 50-100 个典型 trace),按场景分类(正常操作 / 边界情况 / 需要审批的操作)。每次事故修复后,将相关 trace 加入回归套件。
# Phase 3 .github/workflows/agent-regression-test.yml
name: Agent Regression Test

on:
  pull_request:
    paths:
      - "agents/**"        # Agent prompt 或工具定义变更
      - "tools/**"         # 工具实现变更

jobs:
  regression:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run audit log replay tests
        run: |
          python -m audit_log.replay \
            --test-suite tests/suites/regression_suite_v1.json \
            --parallel 4 \
            --output test-results/regression_report.json

      - name: Validate regression results
        run: |
          python -m audit_log.validate \
            --report test-results/regression_report.json \
            --max-degradation-rate 0.05  # 不超过 5% 的 trace 产生行为差异

投入产出评估:2-3 人 × 1-2 月。这是投入最大的阶段——但收益也是结构性的:从此 Prompt 工程师不再靠「我感觉效果变好了」来判断变更是否安全,而是有数据支撑的质量门禁。

8.5 Phase 4(持续):异常检测与告警

目标:从「被动排查」到「主动发现」——在异常发生时就收到告警,而不是等用户报告。

核心工作:

# Phase 4 规则告警示例:高风险工具无审批
# 在 Kibana Alerting / Grafana Alerting 中定义
alert:
  name: "高风险工具缺少审批"
  query: |
    event_type:"tool_call"
    AND tool_name:("delete_records" OR "drop_table" OR "send_email" OR "execute_sql")
    AND NOT (
      event_type:"approval" AND approval_status:"approved"
      AND trace_id:{matched_trace_id}
    )
  window: 5m
  threshold: >= 1
  severity: critical
  notification: PagerDuty / 企业微信 / Slack #oncall

投入产出评估:1 人(SRE 背景)+ 持续迭代。规则告警部分 1-2 周即可上线并立即生效,ML 异常检测是锦上添花——在审计日志数据积累 3 个月以上后再启动,效果更佳。

8.6 路线图使用建议

四个阶段不是必须全部执行。一个务实的建议:

每阶段的成果独立可用——不会因为停在 Phase 2 而让 Phase 1 的白费,也不会因为没有启动 Phase 4 而否定前三阶段的价值。这是路线图设计最重要的原则:每一步都产生独立价值,每一步都是下一步的坚实基础。


常见问题

1. 普通应用日志加上 trace_id 不就够了吗?为什么 Agent 还需要专门的审计日志?

不够。普通应用日志的 trace_id 只能追踪确定性代码路径——函数 A 调用了函数 B,你记录下调用链即可。但 Agent 的决策路径不同:LLM 在每一步「选择」调用哪个工具、传什么参数——这个过程是不确定的。

即使有了 trace_id,你仍然不知道:

  • LLM 为什么选了 tool_a 而不是 tool_b?——普通日志没有 decision 事件类型
  • 它当时看到了什么上下文?——普通日志不记录 prompt_summary 和 tool_choice rationale
  • 有没有经过人工审批?谁审批的?——普通日志没有 approval 事件类型和 approver 字段

Agent 审计日志的核心增值在于:在 trace_id 的基础上,增加了 decision(为什么)、approval(谁批准)、tool_call(做了什么)三个语义层。这三个语义层是普通应用日志在设计时根本没有考虑的维度。

2. 审计日志应该记录多详细?工具调用的参数和返回值全记录吗?

原则:记录足够事后重建决策链,但要对敏感字段脱敏、对大数据截断。

具体来说:

  • 工具调用参数——记录完整的 JSON 参数,但执行字段级脱敏。api_key、token、password 等敏感字段用 REDACTED 替代,保留参数结构用于审计。
  • 工具返回值——记录 HTTP status code 和结果摘要(如前 1,024 字符)。超过阈值的截断,并附加原始长度标记。
  • LLM 决策上下文——记录 system prompt 摘要和 LLM 输出的 tool_choice + rationale,不记录完整的对话历史(通过 session_id 引用,对话历史在应用日志中已有)。
  • 审批记录——记录审批人 ID、审批时间、审批时的上下文摘要、审批决策(approve/reject)。

不要记录完整的对话历史和用户原始输入到审计日志中——这不仅使审计日志体积暴增,还可能引入 PII 合规风险。如果应用日志中存储了完整的用户输入,应用日志同样需要脱敏和保留策略——PII 风险不因为「存在应用日志里」就消失。审计日志应保持精简和高信噪比,通过 session_id 引用应用日志中的完整上下文。

3. 如何确保审计日志本身不成为安全风险?

审计日志记录了大量的工具调用参数和返回值,如果存储不当,本身就是高风险数据集。三条核心防护措施:

1. 写入时脱敏。在事件进入日志管道之前,用字段级脱敏规则过滤 API key、token、PII 等敏感信息。脱敏应在应用层完成——不要依赖存储层的后处理,因为攻击者可能在数据落地前就已经读取了传输中的数据。

2. 存储加密 + 物理隔离。使用存储层 at-rest 加密、传输层加密、严格的索引/表级访问控制,对特别敏感的字段可选字段级加密。加密不能替代脱敏——先脱敏,再加密。审计日志应与普通应用日志物理隔离。普通开发者可以访问应用日志排查 Bug,但不应该能访问审计日志中完整的工具调用记录。

3. 访问控制。审计日志的读取权限应独立于应用日志,仅限安全审计人员、合规团队和 on-call 工程师。参见本系列的 Agent 运行时隔离了解审计日志存储安全与隔离环境的关系——同样的隔离原则适用于日志基础设施。

4. decision 和 tool_call 事件类型的边界在哪里?

简单说:decision 回答「为什么」,tool_call 回答「做了什么」。

在一次 Agent 推理循环中,两者的顺序和职责是:

  1. LLM 推理 → 输出 tool_choice + rationale
  2. record decision 事件:记录 LLM 选择哪个工具、为什么这样选择(rationale)、当前的 prompt 上下文摘要、LLM 提议的参数
  3. (可选)人工审批:如果工具需要审批,在此插入 approval 事件
  4. Agent 框架执行工具调用
  5. record tool_call 事件:记录实际调用的工具名、参数、返回结果、耗时、成功/失败状态

一个常见的困惑是:decision 事件中的 parameters 和 tool_call 事件中的 parameters 有什么区别?区别在于:decision 记录的是 LLM 提议的参数(可能在审批阶段被修改),tool_call 记录的是实际执行的参数。在大多数简单实现中,两者相同——但一旦引入审批流或参数改写逻辑,这个区别就变得至关重要。

5. Agent 审计日志和 OpenTelemetry 的 Span 是什么关系?

OpenTelemetry Span 提供了分布式追踪的基础设施——trace_id 和 span_id 的生成、传播和可视化。Agent 审计日志可以建立在 OTel Span 之上,但需要语义扩展:

  • OTel Span 覆盖的部分:操作名称(对应 event_type)、开始/结束时间(对应 duration_ms)、属性键值对(对应 metadata)
  • OTel Span 缺失的部分:LLM 决策理由(decision rationale)、审批链路(approver + approval_context)、工具参数的字段级脱敏、完整的工具调用参数和返回值(OTel 后端和导出器通常对属性/事件大小有限制)

建模选择——Span Events vs Child Span:decision 和 approval 推荐作为 Llm Span 的 Span Events(add_event())——低 cardinality,不产生独立的 span_id。长时间运行的工具调用(如 > 5s 的外部 API)可创建独立的 Child Span,以更好地展示调用树。实践中推荐的做法:将审计事件作为 OTel Span 的 Span Events 写入(用于可视化追踪),同时单独持久化一份完整的结构化审计日志(用于合规检索和回放)。这样既复用了 OTel 的 trace 基础设施,又保留了审计所需的完整语义——Span 只携带 ID 和摘要,关键信息存储在专用的审计日志后端中。


下一步阅读

⬅️ 上一篇

Agent 运行时隔离:Docker、Firecracker、VM Sandbox 怎么选

隔离光谱与决策框架——按风险等级选择 Agent 代码执行的运行时边界。

➡️ 下一篇 · 即将发布

MCP 协议生产指南:Model Context Protocol 的安全部署

工具协议层的安全实践——MCP 在生产环境中的隔离、认证与传输安全。

📚 相关阅读