Agent Audit Log Design: Tracing a Complete Tool-Call Chain

Part 5 of 6

⚡ TL;DR — 30 Seconds

  • Agent decision chains are fundamentally different from traditional apps — LLM calls are non-deterministic, tool calls are side-effects, and approval chains are asynchronous. The standard three-column log format (time + level + message) is completely inadequate.
  • Audit logs need 5 event types: decision / tool_call / tool_result / approval / error, each carrying 8 universal fields (trace_id, span_id, agent_id, etc.) + 5 event-specific fields (tool_name, approver, etc.)
  • The data model is defined with Pydantic, serialized to JSON for storage, with trace_id reusing the OpenTelemetry 32-char hex trace ID and optional UUID v7 audit_event_id for time-sortable business queries — laying the foundation for replay, search, and post-incident analysis.

📖 Citable Definition

Agent audit log: a structured event-recording system keyed on trace_id that captures the complete decision chain of an agent during a single user request — LLM inference → tool selection → parameter construction → human approval → execution → result. Unlike regular application logs, an agent audit log records not "what the code did" but "what the LLM decided, who approved it, and what was actually executed" — the full mapping of decision (why) to execution (what).

1. Why Agents Need Specialized Audit Logs

An Incident at 2:37 AM

2:37 AM. A monitoring alert fires on your production agent service — a DELETE operation has been recorded. Nobody approved it. Nobody was even awake. The next morning, the team spends two hours grepping through application logs, checking database table history, and replaying conversation transcripts, barely piecing together what happened: a user had asked earlier that day, "clean up last week's temporary data for me." During a multi-step reasoning loop, the LLM interpreted "clean up" as a database DELETE. The agent executed it — no approval, no confirmation, no trace of the decision left behind.

If your agent had structured audit logs — with trace_id, decision context, complete tool-call parameters, and an approval chain — that same investigation would take three minutes:

# 3-minute root cause: audit log search + correlation
# Step 1: Find the DELETE operation (10 seconds)
$ audit-log search --event-type tool_call --tool-name delete_records --since "2026-05-21T00:00:00Z"
trace_id: 0af7651916cd43dd8448eb211c80319c
  agent_id: prod-agent-03
  session_id: sess_8f3a2b1c
  timestamp: 2026-05-22T02:37:14.231Z
  parameters: {"table": "user_data", "filter": "created_at < '2026-05-15'"}
  status: success
  duration_ms: 847

# Step 2: Expand the full decision chain along trace_id (20 seconds)
$ audit-log trace 0af7651916cd43dd8448eb211c80319c
[decision]  LLM chose tool: delete_records, rationale: "user asked to clean up temporary data"
[approval]  ❌ No approval record — approval flow not configured
[tool_call] delete_records → success (847ms, 12,403 rows deleted)

# Step 3: Root cause identified → missing approval flow (2.5 minutes)

This is the core value proposition of audit logs: turn the unobservable LLM decision chain into structured data you can search, correlate, and replay.

Agent Decision Chains vs. Traditional Applications: The Fundamental Difference

Why are regular application logs (INFO/WARN/ERROR + message) insufficient in an agent context? Because an agent's decision chain differs from a traditional application at a structural level:

| Dimension | Traditional App | AI Agent |
| --- | --- | --- |
| Execution path | Deterministic code branches (if/else) | LLM reasoning + dynamic tool selection (non-deterministic) |
| Reproducibility | Same input → same output | Same input → potentially different output (temperature / model version) |
| Source of side effects | Explicit function calls in code | LLM decides which tool to call and with what parameters |
| Accountability | Code author + code review | LLM decision + human approval + safety policy |
| Logging needs | Stack trace + error log | trace_id + decision context + approval chain + tool parameters |
| Incident analysis | grep error logs → locate code line | trace_id expand decision chain → locate LLM decision + missing approval |

The critical difference is in rows 3 and 4: a traditional app's side effects are hard-coded; an agent's side effects are decided by the LLM. When a DELETE fires in a traditional app, you trace it to user_service.py:142. In an agent, you need to answer: why did the LLM choose the delete_records tool? What context did it see? Did anyone approve it?

Regular application logs can answer none of these three questions.

Why Standard App Logs Fall Short

Even if you already have a mature application logging pipeline — structured logs + ELK/Grafana — it has at least three structural gaps in an agent context:

1. trace_id is not a first-class citizen. In an agent scenario, a single user request triggers multiple LLM calls, and each LLM call may trigger multiple tool calls. Standard app logs typically scope trace_id to the request level — you know "which services this request touched," but not "the LLM chose tool A after reasoning step 2, then tool B after step 4." Agents need three-level nested tracing: request → LLM inference → tool call. The trace_id must propagate through every reasoning step and every tool invocation.

2. Decision context is missing. Regular logs record "what happened": a function was called, a DB query took 50ms. Agents need to record "why it happened": the LLM's tool_choice rationale, a summary of the system prompt at that moment, what triggered the approval requirement. This isn't a nice-to-have; it's essential for incident analysis. In the 2:37 AM incident, if the logs had contained the LLM's decision rationale ("user asked to clean up temporary data"), root-cause analysis would have shrunk from two hours to about three minutes.

3. The approval chain is absent. High-risk agent tool calls — DELETEs, financial operations, configuration changes — require human approval. Standard logs have no "approval" semantic. You might see an API returning 200, but you won't know whether that request was approved by a human or slipped through without approval. Audit logs need to treat approval as a first-class event type: who approved, when, and what context they saw at the time.
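The three-level tracing described in point 1 (request → LLM inference → tool call) can be sketched with Python's standard contextvars module. This is a minimal illustration, not a replacement for a tracing SDK: TraceContext, start_span, and the in-memory EVENTS list are hypothetical names, and the IDs are random hex strings rather than IDs issued by OpenTelemetry.

```python
import contextvars
import secrets
from contextlib import contextmanager
from dataclasses import asdict, dataclass
from typing import Iterator, Optional


@dataclass(frozen=True)
class TraceContext:
    trace_id: str                  # 32 hex chars, shared by the whole request
    span_id: str                   # 16 hex chars, unique per step
    parent_span_id: Optional[str]  # None only for the request-level root span


# Holds the span that is currently active in this thread / async task.
_current = contextvars.ContextVar("audit_trace_ctx", default=None)

# In-memory stand-in for the audit log store (assumption for the demo).
EVENTS = []


@contextmanager
def start_span(name: str) -> Iterator[TraceContext]:
    """Open a child of the active span, or a new root span if none is active."""
    parent = _current.get()
    ctx = TraceContext(
        trace_id=parent.trace_id if parent else secrets.token_hex(16),
        span_id=secrets.token_hex(8),
        parent_span_id=parent.span_id if parent else None,
    )
    token = _current.set(ctx)
    EVENTS.append({"name": name, **asdict(ctx)})
    try:
        yield ctx
    finally:
        _current.reset(token)  # restore the parent as the active span


# One request, two sequential reasoning steps, one tool call.
with start_span("request") as root:
    with start_span("llm_reasoning_1") as r1:
        with start_span("tool_call:search_docs") as t1:
            pass
    with start_span("llm_reasoning_2") as r2:
        pass
```

Every recorded event carries the same trace_id, and each parent_span_id points one level up, so the nesting can be rebuilt later from the flat event list alone.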

Three Core Purposes of Audit Logs

Once you understand what makes agents different, the value of audit logs crystallizes around three scenarios:

🔍 Compliance auditing. When your agent handles user data, executes financial operations, or operates under a compliance framework (SOC 2, HIPAA), auditors will ask one simple but devastating question: "Can you prove every DELETE your agent performed was authorized?" Without audit logs, your answer is "we trust the LLM's output," which is unlikely to satisfy auditors in many regulated or enterprise environments. With audit logs, you produce the full trace: which conversation, which decision, who approved it, when it executed, what data was affected. That structured trail makes authorization, approval, and incident evidence much easier to demonstrate as one complete, verifiable chain.

This is exactly why VM-level isolation, as discussed in Agent Runtime Isolation, carries more weight in audits — a hardware VM boundary is familiar and independently verifiable. Similarly, structured audit logs transform the agent's decision process from a "black box" into an "auditable white box."
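The auditor's question ("was every DELETE authorized?") can be turned into a mechanical check over the event stream. The sketch below is one possible shape for that check under stated assumptions: HIGH_RISK_TOOLS is a hypothetical policy list, an approved approval event is assumed to have status "success", an approval and the call it covers are assumed to share trace_id and parent_span_id, and timestamps are assumed to be same-format UTC strings.

```python
HIGH_RISK_TOOLS = {"delete_records"}  # hypothetical policy list


def unauthorized_calls(events):
    """Return high-risk tool_call events that lack a prior successful approval."""
    # Earliest successful approval per (trace, reasoning span).
    first_approval = {}
    for e in events:
        if e["event_type"] == "approval" and e["status"] == "success":
            key = (e["trace_id"], e["parent_span_id"])
            ts = e["timestamp"]
            if key not in first_approval or ts < first_approval[key]:
                first_approval[key] = ts

    missing = []
    for e in events:
        if e["event_type"] != "tool_call":
            continue
        if e.get("tool_name") not in HIGH_RISK_TOOLS:
            continue
        approved_at = first_approval.get((e["trace_id"], e["parent_span_id"]))
        # Same-format UTC strings sort chronologically, so string comparison works.
        if approved_at is None or approved_at > e["timestamp"]:
            missing.append(e)
    return missing


events = [
    {"trace_id": "t1", "span_id": "a4", "parent_span_id": "a3",
     "event_type": "approval", "status": "success",
     "approver": "user_zhang_wei", "timestamp": "2026-05-22T02:36:50.000Z"},
    {"trace_id": "t1", "span_id": "a5", "parent_span_id": "a3",
     "event_type": "tool_call", "tool_name": "delete_records",
     "status": "success", "timestamp": "2026-05-22T02:37:14.231Z"},
    {"trace_id": "t2", "span_id": "b5", "parent_span_id": "b3",
     "event_type": "tool_call", "tool_name": "delete_records",
     "status": "success", "timestamp": "2026-05-22T03:00:00.000Z"},
]

flagged = unauthorized_calls(events)
```

Here only the call in trace t2 is flagged, because no approval event exists for it. A production check would also need to decide how rejected or timed-out approvals are treated.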

🐛 Debugging. An agent bug isn't a NullPointerException — it's "the LLM inexplicably chose an operation it shouldn't have." The traditional debugging cycle for this is: reproduce the conversation → observe LLM output → guess → adjust the prompt → try again. Audit logs shrink this loop from hours to minutes: pull the full decision chain by trace_id, and within a few steps you've located exactly which reasoning step produced the wrong decision.

This connects directly to the error-classification framework in Agent Error Recovery — audit logs are the information foundation for error recovery. If you don't know which step the agent got wrong, you can't design a recovery strategy.

🔄 Replay testing. Audit logs record complete tool-call parameters and return values, making them a natural data source for regression testing. You can: extract the tool-call sequence from historical traces → feed them as test inputs → replay against a new agent version → compare whether decisions remain consistent. This is automated "golden dataset" generation — no manual test-case authoring, sourced directly from production logs.

This capability maps directly to the evaluation data pipeline discussed in Agent Evaluation Framework — audit logs supply the evaluation framework with real, production-grade test data, not hand-crafted toy examples.
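The replay loop described above (extract the recorded tool-call sequence, run a new agent version, compare) can be sketched as two small functions. This is an illustrative outline only: tool_sequence and diff_sequences are hypothetical helper names, events are plain dicts using the field names from this article, and the candidate sequence is hard-coded rather than produced by a real agent.

```python
from itertools import zip_longest


def tool_sequence(events, trace_id):
    """Ordered (tool_name, parameters) pairs recorded for one trace."""
    calls = [
        e for e in events
        if e["trace_id"] == trace_id and e["event_type"] == "tool_call"
    ]
    calls.sort(key=lambda e: e["timestamp"])
    return [(e["tool_name"], e["parameters"]) for e in calls]


def diff_sequences(golden, candidate):
    """Step-by-step differences between a recorded and a replayed sequence."""
    diffs = []
    for step, (expected, actual) in enumerate(zip_longest(golden, candidate)):
        if expected != actual:
            diffs.append({"step": step, "expected": expected, "actual": actual})
    return diffs


recorded = [
    {"trace_id": "t1", "event_type": "tool_call", "tool_name": "delete_records",
     "parameters": {"table": "user_data"},
     "timestamp": "2026-05-22T02:37:14.231Z"},
    {"trace_id": "t1", "event_type": "tool_call", "tool_name": "search_docs",
     "parameters": {"query": "temporary data"},
     "timestamp": "2026-05-22T02:36:02.000Z"},
]

golden = tool_sequence(recorded, "t1")

# Hypothetical output of a new agent version for the same user request.
candidate = [
    ("search_docs", {"query": "temporary data"}),
    ("request_approval", {"tool": "delete_records"}),
]

regressions = diff_sequences(golden, candidate)
```

A difference is not automatically a regression: in this example the new version asks for approval instead of deleting, which is the desired change. The diff only tells a reviewer where behavior moved.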

What Audit Logs Are Not

Before going deeper, let's define the boundaries:

2. Core Data Model for Audit Logs

The data model is the foundation of any audit log system. Under-design it, and you can't retrieve critical information during an incident. Over-design it, and storage costs spiral while write performance tanks. This section presents a minimal viable model: five event types, each carrying 8 universal fields plus 5 conditionally required, event-specific fields.

The Five Event Types

A single agent tool-call lifecycle decomposes into five independent event types, each answering a distinct question:

| Event Type | Question It Answers | When It Fires | Typical Fields |
| --- | --- | --- | --- |
| decision | Why did the LLM choose this tool? | After LLM returns tool_choice, before execution | tool_name, rationale, prompt_summary, model, temperature |
| tool_call | What happened during execution? | After tool execution completes | tool_name, parameters, result_summary, duration_ms, status |
| tool_result | What did the tool return? | After tool returns a result (may be merged with or separate from tool_call) | tool_name, result, result_size, is_error |
| approval | Who approved this call? | After human approval completes | approver, approval_context, decision, timestamp |
| error | What went wrong? | When any error occurs at any stage | error_type, error_message, stack_trace, recoverable |

Key design decision: should tool_call and tool_result be merged? In most implementations, tool_result appears as a field (result) inside the tool_call event, rather than as a separate event. The rationale: a single tool invocation naturally bundles "request + response," and splitting them adds correlation overhead. However, if your tools have significant asynchronous behavior — say, calling an external API that takes several seconds — keeping tool_result as an independent event helps record intermediate state. The default recommendation in this article: merge into a single tool_call event. Split into tool_call (start) + tool_result (completion) only when you need asynchronous tracing.

Required Fields for Every Event

Each audit event carries 8 universal fields (required for all event types) and 5 event-specific fields (conditionally required by event_type):

▸ Universal Fields (8, required for all events)

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| timestamp | ISO 8601 | Yes | Precise event time (UTC) | 2026-05-22T02:37:14.231Z |
| trace_id | OTel 32-char hex | Yes | Unique identifier for the entire user request (OTel trace ID), spans all events | 0af7651916cd43dd8448eb211c80319c |
| span_id | 64-bit hex | Yes | Unique identifier for the LLM reasoning span (used to correlate with OTel Spans) | a1b2c3d4e5f67890 |
| parent_span_id | 64-bit hex or null | Yes (nullable) | Parent span ID (null for root span); Span Events do not have independent parent_span_id | 0000000000000001 |
| agent_id | string | Yes | Unique identifier for the agent instance | prod-agent-03 |
| session_id | string | Yes | User session identifier (one conversation may span multiple traces) | sess_8f3a2b1c |
| event_type | enum | Yes | Event type: decision, tool_call, tool_result, approval, or error | tool_call |
| status | enum | Yes | Event outcome: success, failure, pending_approval, rejected, or timeout | success |

▸ Event-Specific Fields (5, conditionally required by event_type)

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| tool_name | string | Conditional | Tool name. Required for decision / tool_call / tool_result | delete_records |
| parameters | JSON | Conditional | Tool call parameters. Required for tool_call; optional for decision (LLM's proposed params) | {"table": "user_data", "filter": "..."} |
| result | JSON or null | Conditional | Tool return value (truncated/sanitized). Required for tool_call / tool_result | {"deleted_rows": 12403} |
| approver | string or null | Conditional | Approver identity. Required for approval events; null for all others | user_zhang_wei |
| duration_ms | integer | Conditional | Event duration in milliseconds. Required for tool_call; optional for decision / approval | 847 |

The field design follows two principles: (1) trace_id + span_id + parent_span_id — the three-level nesting is sufficient to reconstruct the full call tree, from user request → each LLM inference → each tool call; (2) tool call core objects (tool_name, parameters, result) live as top-level fields, not buried inside a JSON blob — this means searching "all DELETE operations" requires a field-level query rather than a full-text scan.

This directly aligns with the tool registration mechanism in Agent Tool Design Best Practices — the tool_name in audit logs should match the name in the tool registry, ensuring every log entry is traceable to a specific tool definition.
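The second design principle (tool fields at the top level) is what makes a query like the `audit-log search` command from Section 1 a plain field comparison. A minimal sketch over in-memory dicts follows; search_tool_calls and parse_ts are hypothetical helpers, and a real store would express the same filter in its own query language.

```python
from datetime import datetime


def parse_ts(ts: str) -> datetime:
    """Parse an ISO 8601 UTC timestamp; 'Z' is rewritten for older Pythons."""
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def search_tool_calls(events, tool_name, since):
    """Field-level filter: comparisons on top-level keys, no text scanning."""
    cutoff = parse_ts(since)
    return [
        e for e in events
        if e["event_type"] == "tool_call"
        and e.get("tool_name") == tool_name
        and parse_ts(e["timestamp"]) >= cutoff
    ]


events = [
    {"event_type": "tool_call", "tool_name": "delete_records",
     "timestamp": "2026-05-22T02:37:14.231Z"},
    {"event_type": "tool_call", "tool_name": "search_docs",
     "timestamp": "2026-05-22T02:36:02.000Z"},
    {"event_type": "tool_call", "tool_name": "delete_records",
     "timestamp": "2026-05-20T09:00:00.000Z"},
    {"event_type": "decision", "tool_name": "delete_records",
     "timestamp": "2026-05-22T02:37:13.900Z"},
]

hits = search_tool_calls(events, "delete_records", "2026-05-21T00:00:00Z")
```

If tool_name lived inside an opaque JSON blob, the same question would require parsing or text-matching every record.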

JSON Schema and Pydantic Model

Below is the complete Pydantic model definition. Using Pydantic provides built-in type validation, JSON serialization/deserialization, and automatic JSON Schema generation — giving audit event production and consumption strong typing guarantees.

from __future__ import annotations

from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
from uuid import uuid4

from pydantic import BaseModel, Field


class EventType(str, Enum):
    """Audit event types."""
    DECISION = "decision"
    TOOL_CALL = "tool_call"
    TOOL_RESULT = "tool_result"
    APPROVAL = "approval"
    ERROR = "error"


class EventStatus(str, Enum):
    """Event execution status."""
    SUCCESS = "success"
    FAILURE = "failure"
    PENDING_APPROVAL = "pending_approval"
    REJECTED = "rejected"
    TIMEOUT = "timeout"


class AuditEvent(BaseModel):
    """Generic event model for agent audit logs.

    All events share 8 universal fields, with 5 more conditionally required by event_type.
    See field descriptions for conditional requirements.
    """

    # ── Time & Tracing ──
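    timestamp: str = Field(
        # Millisecond precision with a trailing "Z", matching the documented example
        default_factory=lambda: datetime.now(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z"),
        description="Event time (ISO 8601, UTC)",
        examples=["2026-05-22T02:37:14.231Z"],
    )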
    trace_id: str = Field(
        description="OTel trace ID (32-char hex), generated by OTel SDK, spans all events",
        examples=["0af7651916cd43dd8448eb211c80319c"],
    )
    audit_event_id: Optional[str] = Field(
        default=None,
        description="Audit event unique identifier (UUID v7, time-sortable, for business queries)",
        examples=["0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67"],
    )
    span_id: str = Field(
        description="Current span unique identifier (64-bit hex)",
        examples=["a1b2c3d4e5f67890"],
    )
    parent_span_id: Optional[str] = Field(
        default=None,
        description="Parent span ID. Null for root spans",
        examples=["0000000000000001"],
    )

    # ── Identity & Session ──
    agent_id: str = Field(
        description="Agent instance identifier",
        examples=["prod-agent-03"],
    )
    session_id: str = Field(
        description="User session identifier",
        examples=["sess_8f3a2b1c"],
    )

    # ── Event Core ──
    event_type: EventType = Field(description="Event type")
    status: EventStatus = Field(description="Event execution status")

    # ── Tool Call Related (conditionally required by event_type) ──
    tool_name: Optional[str] = Field(
        default=None,
        description="Tool name. Required for decision/tool_call/tool_result",
        examples=["delete_records"],
    )
    parameters: Optional[dict[str, Any]] = Field(
        default=None,
        description="Tool call parameters (JSON). Required for tool_call, optional for decision",
        examples=[{"table": "user_data", "filter": "created_at < '2026-05-15'"}],
    )
    result: Optional[dict[str, Any]] = Field(
        default=None,
        description="Tool return value (truncated/sanitized). Required for tool_call/tool_result",
        examples=[{"deleted_rows": 12403}],
    )

    # ── Approval & Error ──
    approver: Optional[str] = Field(
        default=None,
        description="Approver identity. Required for approval events",
        examples=["user_zhang_wei"],
    )
    error_message: Optional[str] = Field(
        default=None,
        description="Error message. Required for error events",
    )
    error_type: Optional[str] = Field(
        default=None,
        description="Error type (e.g., TimeoutError, ValidationError)",
    )

    # ── Performance & Context ──
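    duration_ms: Optional[int] = Field(
        default=None,  # None means "not measured", so validators can detect a missing value
        ge=0,
        description="Event duration in milliseconds. Required for tool_call",
        examples=[847],
    )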
    metadata: dict[str, Any] = Field(
        default_factory=dict,
        description="Extended metadata (model, temperature, prompt_summary, etc.)",
    )

    # ── Serialization & Factory Methods ──

    def to_json(self, indent: int | None = None) -> str:
        """Serialize to JSON string."""
        return self.model_dump_json(indent=indent, exclude_none=True)

    @classmethod
    def from_json(cls, data: str) -> "AuditEvent":
        """Deserialize from JSON string."""
        return cls.model_validate_json(data)

    @classmethod
    def new_audit_event_id(cls) -> str:
        """Generate a UUID v7 as audit_event_id (time-sortable, for business queries).

        In production, use the uuid6 or uuid7 library.
        Shown here with uuid4 for a zero-dependency runnable version.
        trace_id is generated by the OTel SDK (32-char hex) and injected by the agent framework.
        """
        return str(uuid4())


# ── Usage Example ──

# Create a tool_call event (trace_id injected by OTel SDK)
event = AuditEvent(
    trace_id="0af7651916cd43dd8448eb211c80319c",  # OTel trace ID (32-char hex)
    audit_event_id=AuditEvent.new_audit_event_id(),  # UUID v7, for business queries
    span_id="a1b2c3d4e5f67890",
    parent_span_id="0000000000000001",
    agent_id="prod-agent-03",
    session_id="sess_8f3a2b1c",
    event_type=EventType.TOOL_CALL,
    status=EventStatus.SUCCESS,
    tool_name="delete_records",
    parameters={"table": "user_data", "filter": "created_at < '2026-05-15'"},
    result={"deleted_rows": 12403},
    duration_ms=847,
)

# You can also read the current trace_id from the OTel SDK:
# from opentelemetry import trace
# span = trace.get_current_span()
# trace_id = format(span.get_span_context().trace_id, "032x")

# Serialize
json_str = event.to_json(indent=2)
print(json_str)

# Deserialize
restored = AuditEvent.from_json(json_str)
assert restored.tool_name == "delete_records"
assert restored.status == EventStatus.SUCCESS

Key design decisions in the model:

1. timestamp as string, not datetime object. Storing ISO 8601 UTC strings avoids ambiguity across serializers and downstream storage systems. If using datetime objects, configure serialization explicitly.

2. trace_id reuses the OTel trace ID. Use the 32-char hex trace_id generated by the OpenTelemetry SDK as the audit log's trace_id, ensuring full compatibility with Jaeger/Tempo for cross-system correlation. UUID v7 remains as an optional audit_event_id (time-sortable, for business queries and partitioning). UUID v7's first 48 bits are a millisecond-precision timestamp — this improves write locality and makes IDs roughly time-sortable, but timestamp remains a first-class field for partitioning, retention, and query filters. For UUID v7 generation in production, use the uuid6 or uuid7 Python library.

3. Pydantic conditional validation. The AuditEvent model above is a storage schema — it declares all fields for serialization. Production code should add event-type-specific validation. Here is a model_validator example showing the core pattern:

from pydantic import model_validator

class AuditEvent(BaseModel):
    # ... field definitions as above ...

    @model_validator(mode="after")
    def validate_conditional_fields(self):
        """Enforce conditional required fields based on event_type."""
        et = self.event_type
        if et in (EventType.DECISION, EventType.TOOL_CALL, EventType.TOOL_RESULT):
            if not self.tool_name:
                raise ValueError(f"{et.value} event requires tool_name")
        if et == EventType.TOOL_CALL:
            if self.parameters is None:
                raise ValueError("tool_call event requires parameters")
            if self.duration_ms is None:
                raise ValueError("tool_call event requires duration_ms")
        if et == EventType.APPROVAL:
            if not self.approver:
                raise ValueError("approval event requires approver")
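        if et == EventType.ERROR:
            # error_type and error_message are top-level fields, not metadata keys
            if not self.error_type:
                raise ValueError("error event requires error_type")
            if not self.error_message:
                raise ValueError("error event requires error_message")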
        return self

4. result and parameters require truncation and sanitization. The code example above shows full values, but before writing to log storage, every event must pass through a sanitization pipeline. Below is how sanitization integrates:

import re
from typing import Any

# Sensitive field patterns (key names that need redaction)
_SENSITIVE_KEY_PATTERNS = re.compile(
    r"(api_key|token|password|secret|credential|auth)",
    re.IGNORECASE,
)

# Maximum character length for result truncation
_MAX_RESULT_LENGTH = 1024


def sanitize_parameters(params: dict[str, Any]) -> dict[str, Any]:
    """Apply field-level redaction to tool call parameters."""
    sanitized = {}
    for key, value in params.items():
        if _SENSITIVE_KEY_PATTERNS.search(key):
            sanitized[key] = "REDACTED"
        elif isinstance(value, dict):
            sanitized[key] = sanitize_parameters(value)
        elif isinstance(value, list):
            sanitized[key] = [
                sanitize_parameters(v) if isinstance(v, dict) else v
                for v in value
            ]
        else:
            sanitized[key] = value
    return sanitized


def truncate_result(result: Any, max_length: int = _MAX_RESULT_LENGTH) -> Any:
    """Truncate overly long tool return values."""
    if isinstance(result, str) and len(result) > max_length:
        return result[:max_length] + f"... [truncated, total {len(result)} chars]"
    if isinstance(result, dict):
        return {k: truncate_result(v, max_length) for k, v in result.items()}
    if isinstance(result, list):
        return [truncate_result(v, max_length) for v in result[:10]]
    return result


# Usage example: sanitize before writing to log storage
raw_params = {"api_key": "sk-abc123", "table": "user_data"}
safe_params = sanitize_parameters(raw_params)
# → {"api_key": "REDACTED", "table": "user_data"}

raw_result = "x" * 5000
safe_result = truncate_result(raw_result)
# → "xxxx...xxxx [truncated, total 5000 chars]"

This sanitization pipeline should execute after event object construction and before writing to storage. Don't attempt sanitization at the tool-call layer — tools need to return complete business data. Audit-log sanitization is the responsibility of the audit layer.

Integration Point with the MCP Protocol

If your agent uses the MCP Protocol (Model Context Protocol) to manage tool calls, the audit event model maps directly onto MCP's invocation lifecycle: MCP's tools/call request corresponds to the audit tool_call event, and MCP's tools/list provides the current tool registry as reference metadata for audit logs.

A recommended integration pattern: embed audit log hooks inside the MCP server's tool-call handler, so every MCP tool invocation automatically produces an audit event — no extra coding required at the agent application layer.
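The hook pattern can be sketched without committing to any particular MCP SDK. The decorator below is a framework-agnostic illustration: audited, AUDIT_SINK, and the sample delete_records handler are hypothetical, trace fields (trace_id, span_id) are omitted for brevity, and a real server would attach the wrapper wherever its tools/call handler dispatches to tool functions.

```python
import functools
import time

AUDIT_SINK = []  # stand-in for the real audit log writer (assumption)


def audited(tool_name):
    """Wrap a tool handler so each invocation appends one tool_call event."""
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(**parameters):
            event = {
                "event_type": "tool_call",
                "tool_name": tool_name,
                "parameters": parameters,
            }
            started = time.perf_counter()
            try:
                result = handler(**parameters)
                event.update(status="success", result=result)
                return result
            except Exception as exc:
                event.update(
                    status="failure",
                    error_type=type(exc).__name__,
                    error_message=str(exc),
                )
                raise  # the caller still sees the original error
            finally:
                # Runs on both paths, so failed calls are audited too.
                event["duration_ms"] = int((time.perf_counter() - started) * 1000)
                AUDIT_SINK.append(event)
        return wrapper
    return decorator


@audited("delete_records")
def delete_records(table, filter):
    if table != "user_data":
        raise ValueError(f"unknown table: {table}")
    return {"deleted_rows": 0}  # placeholder; no real database is touched
```

Because the event is appended in the finally block, a tool that raises still leaves an audit record with status "failure", which is the case incident analysis depends on most.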

3. Trace ID and Distributed Tracing

Section 2 defined the audit log data model, but the trace_id and span_id fields in that model only deliver value when they are correctly generated and propagated. This section dives into the engineering details behind these two fields: how to choose a generation strategy, how spans nest into a call tree, and how to use OpenTelemetry to propagate trace context end-to-end in Python.

Trace ID Generation Strategies Compared

Generating a trace_id looks trivial — just call a UUID library. But the wrong choice leads to broken tracing or storage inefficiencies. Recommended model: OTel-native. Use the OpenTelemetry SDK's 32-char hex trace ID as the canonical trace_id (fully compatible with Jaeger/Tempo), with UUID v7 as an optional audit_event_id (time-sortable, for business queries and partitioning).

| Strategy | Structure | Length | Time-Sortable | Globally Unique | Collision Risk | Use Case |
| --- | --- | --- | --- | --- | --- | --- |
| OTel trace_id | 128-bit, 32 hex chars | 32 chars | ❌ Not sortable (random) | ✅ Random 128-bit value generated by the OTel SDK (unique in practice) | ~10^-36 | Recommended for trace_id; compatible with Jaeger/Tempo |
| UUID v7 | First 48 bits: ms timestamp; 74 random bits (plus 6 version/variant bits) | 36 chars | ✅ Time-sortable | ✅ Extremely low collision | ~10^-17 | audit_event_id / correlation_id; for business queries and partitioning |
| Snowflake | 41-bit timestamp + 10-bit machine ID + 12-bit sequence | 19-digit integer | ✅ Time-sortable | ⚠️ Depends on machine ID uniqueness | 0 (single machine) | Ultra-high throughput; requires integer IDs |

Why the OTel-native model is recommended:

1. Cross-system compatibility. Using OTel trace_id ensures that the trace_id in your audit log is exactly the same as the trace ID in Jaeger, Grafana Tempo, or ClickHouse — no mapping table needed. Find a problematic trace in Jaeger → copy the trace_id → search directly in your audit log store — one step, done.

2. UUID v7's complementary role as audit_event_id. Each audit event still needs its own time-sortable ID. UUID v7's first 48 bits are a millisecond timestamp, so sorting by audit_event_id naturally sorts by time — improving write locality. UUID v7 requires no external coordination, making it ideal for multi-region deployments.

3. If you're not using OTel. If your team has no OTel infrastructure, UUID v7 can serve as a custom trace_id — it's an excellent time-sortable correlation ID on its own. However, if you later adopt OTel, plan to migrate to the OTel-native model.
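Because the OTel Python SDK exposes trace and span IDs as integers, the audit layer has to format them before storing them, and it is worth rejecting anything that is not the canonical form. A small sketch using only the standard library follows; the helper names are illustrative and not part of the OTel API.

```python
import re

_TRACE_ID_RE = re.compile(r"[0-9a-f]{32}")
_SPAN_ID_RE = re.compile(r"[0-9a-f]{16}")


def format_trace_id(trace_id: int) -> str:
    """128-bit integer → 32 lowercase hex chars, zero-padded on the left."""
    return format(trace_id, "032x")


def format_span_id(span_id: int) -> str:
    """64-bit integer → 16 lowercase hex chars, zero-padded on the left."""
    return format(span_id, "016x")


def is_valid_trace_id(value: str) -> bool:
    """Canonical form only; the all-zero ID is reserved as invalid."""
    return bool(_TRACE_ID_RE.fullmatch(value)) and value != "0" * 32


def is_valid_span_id(value: str) -> bool:
    return bool(_SPAN_ID_RE.fullmatch(value)) and value != "0" * 16


canonical = format_trace_id(0x0AF7651916CD43DD8448EB211C80319C)
```

A check like is_valid_trace_id at write time catches the common mistake of storing a dashed UUID (for example an audit_event_id) in the trace_id column, which would silently break lookups from Jaeger or Tempo.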

Below is a Python implementation for generating and parsing UUID v7 — used for audit_event_id (the unique ID for each persisted audit event):

"""UUID v7 generation and parsing demo.

For production, use the 'uuid7' library (pip install uuid7).
The code below demonstrates the core principle — extracting
and verifying the first 48-bit millisecond timestamp.
"""

import time
import os


def generate_uuid_v7() -> str:
    """Generate a UUID v7.

    Format: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
            ^^^^^^^^ ^^^^ millisecond timestamp (first 48 bits)
    Production: pip install uuid7 → from uuid_extensions import uuid7
    """
    # Note: this is a principle demo; prefer a maintained library in production
    try:
        from uuid_extensions import uuid7  # module installed by the 'uuid7' package
        return str(uuid7())
    except ImportError:
        # Fallback: manual construction (for understanding the structure only)
        ts = int(time.time() * 1000)     # milliseconds since epoch
        rand_hex = os.urandom(10).hex()  # 80 random bits; 74 remain after version/variant
        ts_hex = f"{ts:012x}"            # 48 bits = 12 hex chars
        raw = ts_hex + rand_hex          # 32 hex chars = 128 bits
        # The variant nibble must be 8, 9, a, or b (top two bits = 10)
        variant = format(0x8 | (int(raw[16], 16) & 0x3), "x")
        # Format as standard UUID v7 (inject version and variant markers)
        return (
            f"{raw[0:8]}-{raw[8:12]}-7{raw[13:16]}-"
            f"{variant}{raw[17:20]}-{raw[20:32]}"
        )


def extract_timestamp_from_uuidv7(uuid_str: str) -> int:
    """Extract the millisecond timestamp from a UUID v7.

    Useful for: log sorting, time-range queries, storage partitioning.
    """
    # UUID v7: first 8+4=12 hex chars = 48-bit timestamp
    ts_hex = uuid_str[:8] + uuid_str[9:13]
    return int(ts_hex, 16)


# ── Usage ──

audit_event_id = generate_uuid_v7()
print(f"Audit Event ID:  {audit_event_id}")
# → e.g. Audit Event ID:  0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67 (varies per run)

ts = extract_timestamp_from_uuidv7(audit_event_id)
print(f"Timestamp: {ts} ms since epoch")
# → the generation time as Unix milliseconds (varies per run)

print(f"Readable:  {time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(ts/1000))}")
# → the same instant as a UTC date-time string

UUID v7 improves write locality and makes audit_event_id roughly time-sortable, but timestamp remains a first-class field for partitioning, retention, and query filters — don't rely solely on ID ordering to replace time-based indexing.
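One way to keep timestamp as the partitioning field, rather than deriving partitions from the ID, is to compute the partition key directly from the event's timestamp string. The sketch below assumes daily or hourly UTC partitions; partition_key and its granularity names are illustrative choices, not a requirement of any storage engine.

```python
from datetime import datetime, timezone

_FORMATS = {"day": "%Y-%m-%d", "hour": "%Y-%m-%dT%H"}


def partition_key(timestamp: str, granularity: str = "day") -> str:
    """Map an ISO 8601 timestamp to a UTC partition label."""
    # 'Z' is rewritten so the parse also works on Python versions before 3.11.
    parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    return parsed.astimezone(timezone.utc).strftime(_FORMATS[granularity])


daily = partition_key("2026-05-22T02:37:14.231Z")
hourly = partition_key("2026-05-22T02:37:14.231Z", "hour")
# An event stamped with a +09:00 offset lands in the previous UTC day.
shifted = partition_key("2026-05-22T01:30:00+09:00")
```

Normalizing to UTC before formatting matters: an event recorded with a local offset would otherwise land in the wrong partition and be missed by time-range queries and retention jobs.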

Span Nesting Model: A Three-Level Call Tree

With trace_id sorted, the next question is: how do you organize the call tree using it? A single agent user request has a natural three-level nesting structure:

User Request (trace_id, root span)
│
├── LLM Reasoning #1 (span_id=a1, parent_span_id=root)
│   ├── decision: tool selected = search_docs
│   ├── tool_call: search_docs (span_id=a2, parent_span_id=a1)
│   │   └── tool_result: {"documents": [...]}
│   └── LLM receives result, continues reasoning
│
├── LLM Reasoning #2 (span_id=a3, parent_span_id=root ← sibling of #1, not its child)
│   ├── decision: tool selected = delete_records (requires approval)
│   ├── approval: user_zhang_wei approved (span_id=a4, parent_span_id=a3)
│   └── tool_call: delete_records (span_id=a5, parent_span_id=a3)
│       └── tool_result: {"deleted_rows": 12403}
│
└── LLM final response to user

This three-level structure maps to span fields as follows:

| Level | span_id | parent_span_id | Corresponding Event Type | Example |
| --- | --- | --- | --- | --- |
| L1: User Request | root span (e.g., 0000000000000001) | null | none (trace container, emits no event) | User sends "clean up temp data for me" |
| L2: LLM Reasoning | reasoning span (e.g., a1b2c3d4e5f67891) | 0000000000000001 | decision | LLM selects delete_records, rationale: "user asked to clean up" |
| L3: Tool Call / Approval | tool span (e.g., f1e2d3c4b5a67892) | a1b2c3d4e5f67891 | tool_call / approval | Actual execution of delete_records, or the approval chain |

Key design decision: sibling relationship between LLM reasoning spans. LLM Reasoning #1 and #2 share the same parent, the root span of the request, so they are siblings rather than parent and child. Why not nest them? Because the reasoning steps execute sequentially: Reasoning #1 completes, the agent framework receives the tool result, then Reasoning #2 begins. They are two independent steps within the same request, not a nested invocation. Modeling them as siblings preserves the linear timeline and makes them easier to view chronologically in a tracing UI.

In contrast, tool-call spans are children of the LLM reasoning span — because the tool call is triggered by that reasoning step and is the execution of its decision, a natural parent-child relationship exists.
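The nesting rules above are enough to rebuild the call tree from a flat list of span records. The sketch below is illustrative only: render_call_tree is a hypothetical helper, the short span IDs (root, a1 … a5) follow the diagram rather than real 64-bit hex values, and siblings are ordered by their timestamp strings, which assumes a uniform UTC format.

```python
from collections import defaultdict


def render_call_tree(spans):
    """Return indented lines, one per span, following parent_span_id links."""
    children = defaultdict(list)
    for span in spans:
        children[span["parent_span_id"]].append(span)
    for siblings in children.values():
        siblings.sort(key=lambda s: s["timestamp"])  # chronological order

    lines = []

    def walk(parent_id, depth):
        for span in children.get(parent_id, []):
            lines.append("  " * depth + f'{span["name"]} [{span["span_id"]}]')
            walk(span["span_id"], depth + 1)

    walk(None, 0)  # the root span is the one whose parent is None
    return lines


spans = [
    {"span_id": "a5", "parent_span_id": "a3", "name": "tool_call delete_records",
     "timestamp": "2026-05-22T02:37:14.231Z"},
    {"span_id": "a3", "parent_span_id": "root", "name": "llm_reasoning_2",
     "timestamp": "2026-05-22T02:36:10.000Z"},
    {"span_id": "a1", "parent_span_id": "root", "name": "llm_reasoning_1",
     "timestamp": "2026-05-22T02:36:01.000Z"},
    {"span_id": "a4", "parent_span_id": "a3", "name": "approval",
     "timestamp": "2026-05-22T02:36:50.000Z"},
    {"span_id": "root", "parent_span_id": None, "name": "request",
     "timestamp": "2026-05-22T02:36:00.000Z"},
    {"span_id": "a2", "parent_span_id": "a1", "name": "tool_call search_docs",
     "timestamp": "2026-05-22T02:36:02.000Z"},
]

tree = render_call_tree(spans)
```

The input order does not matter: the two reasoning spans come out as siblings under the request, each with its own tool or approval children, which is the layout a tracing UI would show.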

OpenTelemetry Integration

OpenTelemetry (OTel) is already the de facto standard for distributed tracing. Agent audit logs should not reinvent trace context propagation — instead, they should build an agent-specific semantic layer on top of OTel. The integration architecture is:

Architecture layers:

┌─────────────────────────────────────────┐
│        Agent Audit Event Layer           │
│  (AuditEvent: decision, tool_call, ...)   │  ← Core of this article
├─────────────────────────────────────────┤
│       OpenTelemetry Span Layer           │
│  (trace_id, span_id, parent_span_id)      │  ← Reuse OTel infrastructure
├─────────────────────────────────────────┤
│         OTel SDK / Exporter              │
│  (OTLP → Jaeger / Tempo / custom backend) │  ← Standard export pipeline
└─────────────────────────────────────────┘

Specifically:

Code: Python OTel SDK Integration

The code below demonstrates a complete integration of agent audit logging with OpenTelemetry — from initialization, to Span creation, to trace_id propagation through tool calls.

Step 1: OpenTelemetry Initialization

"""OpenTelemetry initialization module.

Call init_otel() once at agent service startup
to configure TracerProvider, Span Processor, and Exporter.
"""

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource


def init_otel(
    service_name: str = "agent-service",
    otlp_endpoint: str = "http://localhost:4317",
    console_debug: bool = False,
) -> trace.TracerProvider:
    """Initialize the OpenTelemetry SDK.

    Args:
        service_name: Service name — appears in resource attributes of all spans.
        otlp_endpoint: gRPC address of the OTLP Collector.
        console_debug: Enable Console Exporter in dev for easy debugging.

    Returns:
        A configured TracerProvider instance.

    Production best practices:
    - Use OTLPSpanExporter to export to Jaeger / Grafana Tempo
    - Use BatchSpanProcessor to avoid a network call per span
    - Attach service.version and deployment.environment in Resource
    """

    # Create Resource: identifies the service
    resource = Resource.create({
        SERVICE_NAME: service_name,
        "service.version": "2.3.1",
        "deployment.environment": "production",
    })

    # Create TracerProvider and bind Resource
    provider = TracerProvider(resource=resource)

    # OTLP gRPC Exporter → Jaeger / Tempo / any OTLP-compatible backend
    otlp_exporter = OTLPSpanExporter(
        endpoint=otlp_endpoint,
        insecure=True,  # use TLS in production
    )
    provider.add_span_processor(BatchSpanProcessor(otlp_exporter))

    # Dev mode: also output to console
    if console_debug:
        provider.add_span_processor(
            BatchSpanProcessor(ConsoleSpanExporter())
        )

    # Set as the global TracerProvider
    trace.set_tracer_provider(provider)

    return provider


# ── Obtain a Tracer ──

def get_tracer(name: str = __name__) -> trace.Tracer:
    """Get an OpenTelemetry Tracer instance."""
    return trace.get_tracer(name)


# ── Usage ──

if __name__ == "__main__":
    init_otel(console_debug=True)
    tracer = get_tracer("agent-audit-demo")
    print(f"OTel initialized. Tracer: {tracer}")
    # → OTel initialized. Tracer: <opentelemetry.sdk.trace.Tracer ...>

Step 2: Custom Span Creation with Audit Attributes

"""Agent-audit-specific Span utilities.

Injects agent-specific audit semantics on top of OTel Spans —
each decision, tool_call, and approval event maps to a Span Event,
with audit fields written as Span Attributes.
"""

from __future__ import annotations

import json
import time
from contextlib import contextmanager
from typing import Any, Optional

from opentelemetry import trace
from opentelemetry.trace import Span, SpanKind, Status, StatusCode


# ── Global Tracer ──
tracer = trace.get_tracer("agent-audit")


# ── Audit Attribute Key Constants ──

class AuditAttr:
    """Constants for audit-log OTel Span attribute keys.

    Using constants avoids inconsistency from hardcoded strings.
    """
    AGENT_ID = "agent.id"
    SESSION_ID = "agent.session_id"
    EVENT_TYPE = "agent.event_type"
    TOOL_NAME = "agent.tool_name"
    PARAMETERS = "agent.parameters"
    RESULT = "agent.result"
    RESULT_SIZE = "agent.result_size"
    APPROVER = "agent.approver"
    APPROVAL_CONTEXT = "agent.approval_context"
    RATIONALE = "agent.rationale"
    MODEL = "agent.model"
    TEMPERATURE = "agent.temperature"
    PROMPT_SUMMARY = "agent.prompt_summary"
    DURATION_MS = "agent.duration_ms"
    STATUS = "agent.status"
    ERROR_TYPE = "agent.error_type"
    ERROR_MESSAGE = "agent.error_message"


# ── Span Creation Factory ──

def create_llm_span(
    agent_id: str,
    session_id: str,
    model: str,
    temperature: float,
    prompt_summary: str = "",
    parent_span: Optional[Span] = None,
) -> Span:
    """Create an LLM reasoning Span.

    Called each time the agent invokes the LLM; serves as the
    parent span for subsequent tool_call Span Events.
    """
    ctx = trace.set_span_in_context(parent_span) if parent_span else None
    span = tracer.start_span(
        "agent.llm.reasoning",
        context=ctx,
        kind=SpanKind.INTERNAL,
        attributes={
            AuditAttr.AGENT_ID: agent_id,
            AuditAttr.SESSION_ID: session_id,
            AuditAttr.MODEL: model,
            AuditAttr.TEMPERATURE: temperature,
            AuditAttr.PROMPT_SUMMARY: prompt_summary,
        },
    )
    return span


def record_decision(
    span: Span,
    tool_name: str,
    rationale: str,
    proposed_params: Optional[dict[str, Any]] = None,
):
    """Record a decision event on the LLM Span.

    Call timing: after LLM returns tool_choice, before tool execution.

    Args:
        span: The current LLM reasoning Span.
        tool_name: The tool the LLM chose.
        rationale: The LLM's decision rationale (extracted from tool_choice).
        proposed_params: The parameters the LLM proposed.
    """
    attrs = {
        AuditAttr.EVENT_TYPE: "decision",
        AuditAttr.TOOL_NAME: tool_name,
        AuditAttr.RATIONALE: rationale,
    }
    if proposed_params:
        attrs[AuditAttr.PARAMETERS] = json.dumps(proposed_params, ensure_ascii=False)

    span.add_event("agent.decision", attributes=attrs)


def record_tool_call(
    span: Span,
    tool_name: str,
    parameters: dict[str, Any],
    result: Any,
    duration_ms: int,
    status: str,
    error_type: Optional[str] = None,
    error_message: Optional[str] = None,
):
    """Record a tool_call event on the Span (merged with tool_result).

    Call timing: after tool execution completes.

    Args:
        span: The parent LLM reasoning Span (tool_call as a Span Event,
              not an independent Span). For an independent Span, create
              tool_call as a child span of the reasoning span.
        tool_name: Tool name.
        parameters: Actually executed parameters (already sanitized).
        result: Tool return value (already truncated).
        duration_ms: Tool execution time in milliseconds.
        status: success | failure | timeout.
    """
    attrs = {
        AuditAttr.EVENT_TYPE: "tool_call",
        AuditAttr.TOOL_NAME: tool_name,
        AuditAttr.PARAMETERS: json.dumps(parameters, ensure_ascii=False),
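        # default=str keeps non-JSON-native values (e.g. datetime) from raising
        AuditAttr.RESULT: json.dumps(result, ensure_ascii=False, default=str),
        # Keep the duration numeric so backends can filter and aggregate on it
        AuditAttr.DURATION_MS: duration_ms,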
        AuditAttr.STATUS: status,
    }
    if error_type:
        attrs[AuditAttr.ERROR_TYPE] = error_type
    if error_message:
        attrs[AuditAttr.ERROR_MESSAGE] = error_message

    span.add_event("agent.tool_call", attributes=attrs)


def record_approval(
    span: Span,
    tool_name: str,
    approver: str,
    decision: str,
    approval_context: str = "",
):
    """Record an approval event on the Span.

    Call timing: after human approval completes.

    Args:
        span: The current LLM reasoning Span.
        tool_name: The tool requiring approval.
        approver: Approver identity.
        decision: approved | rejected.
        approval_context: Context summary shown during approval.
    """
    attrs = {
        AuditAttr.EVENT_TYPE: "approval",
        AuditAttr.TOOL_NAME: tool_name,
        AuditAttr.APPROVER: approver,
        AuditAttr.STATUS: decision,
    }
    if approval_context:
        attrs[AuditAttr.APPROVAL_CONTEXT] = approval_context

    span.add_event("agent.approval", attributes=attrs)


# ── Context Manager: Automatic Span Lifecycle ──

@contextmanager
def agent_trace(
    agent_id: str,
    session_id: str,
    model: str = "claude-sonnet-4-20250514",
    temperature: float = 0.0,
):
    """Top-level trace context manager for an agent request.

    Usage:
        with agent_trace("prod-agent-03", "sess_8f3a2b1c") as span:
            trace_id = span.get_span_context().trace_id
            # ... agent reasoning loop ...
            record_decision(span, "search_docs", "User asked to search docs")
    """
    span = tracer.start_span(
        "agent.request",
        kind=SpanKind.SERVER,
        attributes={
            AuditAttr.AGENT_ID: agent_id,
            AuditAttr.SESSION_ID: session_id,
            AuditAttr.MODEL: model,
            AuditAttr.TEMPERATURE: temperature,
        },
    )
    try:
        yield span
        span.set_status(Status(StatusCode.OK))
    except Exception as e:
        span.set_status(Status(StatusCode.ERROR, str(e)))
        span.record_exception(e)
        raise
    finally:
        span.end()

Step 3: trace_id End-to-End Propagation — Complete Agent Reasoning Example

"""Complete agent reasoning loop example.

Demonstrates trace_id propagation through the full chain:
agent entry → LLM reasoning → tool call → approval.
Every step emits the corresponding audit event into the OTel Span.
"""

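import time

from opentelemetry.trace import format_trace_id

# Assumes init_otel (Step 1) and agent_trace, create_llm_span, record_decision,
# record_tool_call, record_approval (Step 2) are defined in or imported into
# this module.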


def agent_reasoning_loop(
    agent_id: str,
    session_id: str,
    user_message: str,
):
    """A single complete agent reasoning loop.

    Simulates: user request → LLM reasoning #1 → tool call →
               LLM reasoning #2 → approval → tool call

    trace_id is automatically generated and propagated inside
    the with agent_trace() block — all Spans and Span Events
    share the same trace_id.
    """
    with agent_trace(agent_id, session_id) as root_span:

        # Get trace_id (128-bit int → 32 hex chars)
        trace_id_int = root_span.get_span_context().trace_id
        trace_id_hex = format_trace_id(trace_id_int)
        print(f"\n═══ Trace: {trace_id_hex} ═══")
        print(f"User: {user_message}\n")

        # ── LLM Reasoning #1: Tool Selection ──
        llm_span_1 = create_llm_span(
            agent_id=agent_id,
            session_id=session_id,
            model="claude-sonnet-4-20250514",
            temperature=0.0,
            prompt_summary="User asking to search product documentation",
            parent_span=root_span,
        )

        # Simulate LLM decision
        decision_start = time.time()
        tool_choice = "search_docs"
        rationale = "User is asking about product features; need to search related docs"
        proposed_params = {"query": "product features", "limit": 5}
        time.sleep(0.3)  # simulate LLM inference time
        decision_duration = int((time.time() - decision_start) * 1000)

        record_decision(llm_span_1, tool_choice, rationale, proposed_params)
        print(f"  [decision] → {tool_choice}: {rationale} ({decision_duration}ms)")

        # ── Tool Call #1: search_docs ──
        tool_start = time.time()
        # Simulate tool execution
        tool_result = {
            "documents": [
                {"id": "doc_001", "title": "Product Feature Overview", "relevance": 0.95},
                {"id": "doc_002", "title": "API Usage Guide", "relevance": 0.87},
            ],
            "total": 2,
        }
        time.sleep(0.15)  # simulate tool execution time
        tool_duration = int((time.time() - tool_start) * 1000)

        record_tool_call(
            llm_span_1,
            tool_name="search_docs",
            parameters=proposed_params,
            result=tool_result,
            duration_ms=tool_duration,
            status="success",
        )
        print(f"  [tool_call] search_docs → success ({tool_duration}ms)")

        llm_span_1.end()

        # ── LLM Reasoning #2: High-Risk Operation, Requires Approval ──
        llm_span_2 = create_llm_span(
            agent_id=agent_id,
            session_id=session_id,
            model="claude-sonnet-4-20250514",
            temperature=0.0,
            prompt_summary="User asked to clean up temporary data",
            parent_span=root_span,
        )

        decision_start = time.time()
        tool_choice_2 = "delete_records"
        rationale_2 = "User explicitly asked to 'clean up temporary data', matched delete_records tool"
        proposed_params_2 = {
            "table": "user_data",
            "filter": "created_at < '2026-05-15'",
        }
        time.sleep(0.25)
        decision_duration_2 = int((time.time() - decision_start) * 1000)

        record_decision(llm_span_2, tool_choice_2, rationale_2, proposed_params_2)
        print(f"  [decision] → {tool_choice_2}: {rationale_2} ({decision_duration_2}ms)")

        # ── Approval ──
        print(f"  ⏳ Awaiting approval: {tool_choice_2}...")
        time.sleep(0.5)  # simulate approval wait time

        record_approval(
            llm_span_2,
            tool_name=tool_choice_2,
            approver="user_zhang_wei",
            decision="approved",
            approval_context="Confirmed: deleting temp data before May 15, 12,403 rows total",
        )
        print(f"  [approval] → approved by user_zhang_wei")

        # ── Tool Call #2: delete_records ──
        tool_start = time.time()
        # Simulate tool execution
        tool_result_2 = {
            "deleted_rows": 12403,
            "affected_tables": ["user_data"],
            "duration_ms": 847,
        }
        time.sleep(0.2)
        tool_duration_2 = int((time.time() - tool_start) * 1000)

        record_tool_call(
            llm_span_2,
            tool_name=tool_choice_2,
            parameters=proposed_params_2,
            result=tool_result_2,
            duration_ms=tool_duration_2,
            status="success",
        )
        print(f"  [tool_call] {tool_choice_2} → success ({tool_duration_2}ms)")

        llm_span_2.end()

        print(f"\n  ✅ Agent response: Cleaned up 12,403 temporary data records.")

    # Exiting the with block → root_span.end() called automatically
    # BatchSpanProcessor exports spans to OTLP Collector in the background
    print(f"\n  📤 Trace {trace_id_hex} submitted to OTLP Collector")


# ── Run the Example ──

if __name__ == "__main__":
    # Initialize OTel (dev mode: console output)
    init_otel(console_debug=True)

    # Simulate a complete agent request
    agent_reasoning_loop(
        agent_id="prod-agent-03",
        session_id="sess_8f3a2b1c",
        user_message="Search for product feature docs, then clean up last week's temp data",
    )

Three key design points in the code:

1. Span lifecycle management. The agent_trace() context manager guarantees the root span will always be end()-ed — even if the agent throws an exception mid-execution. This is an OTel SDK best practice that prevents span leaks and lost trace data.

2. Span Event vs Child Span model choice. Two modeling approaches, each with different tradeoffs:

| Dimension | Span Events Model | Child Span Model |
|---|---|---|
| Implementation | span.add_event("agent.decision", ...) on the LLM span | tracer.start_span("tool_call", ...) creates an independent child span |
| span_id | Shares the parent LLM span's span_id; Span Events have no independent span_id | Each event has its own span_id / parent_span_id |
| Cardinality | Low: all events under one span | High: each tool call produces a separate span |
| Jaeger readability | Good: no span explosion | Poor: a 200ms reasoning step produces multiple spans, deep hierarchy |
| Replay tree reconstruction | Requires extracting from event attributes | Naturally has clear parent-child relationships |
| Recommended for | decision / approval (choices and approvals) | Long-running tool calls (e.g., > 5s external API) |

This article's code uses the Span Events model: decision, tool_call, and approval are recorded as add_event() on the LLM reasoning Span. A single LLM reasoning step may produce three events in under 200ms — treating each as an independent span would balloon the hierarchy and make Jaeger traces unreadable. Span Events keep the trace tree readable while preserving all audit fields. For long-running tool calls (e.g., an external API taking > 5s), create an independent Child Span for clearer call-tree visibility.

3. trace_id extraction. The OTel API exposes trace_id as a 128-bit integer; format_trace_id() renders it as the standard 32-character lowercase hex string, which is the exact value you write into AuditEvent.trace_id. This ensures the trace_id in your audit log store is identical to the one in Jaeger/Tempo, enabling cross-system correlation.
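For point 3, the conversion can be sketched with the standard library alone. In the OTel Python API, format_trace_id(trace_id) produces the same string as format(trace_id, "032x"); the helper names below and the zero-ID check are this sketch's own assumptions, not part of the article's code.

```python
import re

_TRACE_ID_HEX = re.compile(r"[0-9a-f]{32}")


def trace_id_to_hex(trace_id: int) -> str:
    """Render a 128-bit trace ID as 32 lowercase hex characters."""
    if not 0 < trace_id < 2**128:
        # W3C Trace Context treats the all-zero ID as invalid.
        raise ValueError("trace_id must be a non-zero 128-bit integer")
    return format(trace_id, "032x")


def trace_id_from_hex(trace_id_hex: str) -> int:
    """Parse the 32-char hex form back into the integer form."""
    if not _TRACE_ID_HEX.fullmatch(trace_id_hex):
        raise ValueError("trace_id must be 32 lowercase hex characters")
    return int(trace_id_hex, 16)


# Example ID taken from the W3C Trace Context specification.
example_int = 0x4BF92F3577B34DA6A3CE929D0E0E4736
example_hex = trace_id_to_hex(example_int)
print(example_hex)
print(trace_id_from_hex(example_hex) == example_int)
```

Storing the hex form in the audit store, and validating it on read, keeps lookups symmetric: the same string can be pasted into Jaeger or Tempo without conversion.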

DIY Tracing vs. OTel vs. Commercial Solutions: Overhead Comparison

Finally, a practical engineering decision: should you build trace infrastructure from scratch, use the OpenTelemetry open-source stack, or buy a commercial platform?

| Dimension | DIY Tracing | OpenTelemetry (Open Source) | Commercial (Datadog / LangSmith) |
|---|---|---|---|
| Initial investment | High: must build trace ID generation, propagation, collection, storage, and query from scratch | Medium: OTel SDK is ready out of the box, but you must self-host Jaeger/Tempo | Low: just integrate the SDK; backend is fully managed |
| Maintenance cost | Extremely high: you fix your own bugs, no community support | Medium: operating Jaeger/Tempo (storage, scaling, upgrades); active community | Low: ops are outsourced, but costs scale linearly with volume |
| Agent audit semantics | Fully flexible: add any field you want | Requires customization: Span Attributes + Span Events carry audit fields; you design your own key convention | Partial: LangSmith natively supports LLM tracing; Datadog LLM Observability has dedicated Span types |
| Storage cost | Depends on your choice (PostgreSQL / ClickHouse) | Depends on backend choice (Elasticsearch is expensive; ClickHouse is cheap) | Billed by span count or data volume; costs spike at high throughput |
| Query capability | Fully customizable | Jaeger UI is good for trace visualization; complex audit queries need additional tooling (e.g., query storage directly) | Rich built-in query and visualization, but bound to platform capabilities |
| Compliance & data sovereignty | Full control: data never leaves your VPC | Full control: open-source, self-hosted deployment | Limited: data stored on third-party infrastructure; must evaluate SOC 2 / GDPR compliance |
| Recommended for | Not recommended, unless you have special compliance needs and a 10+ person team | Recommended: the sweet spot for most agent teams | Early rapid validation, or large teams needing a turnkey solution |

Recommended path: For the majority of teams building production-grade agents, the OpenTelemetry open-source stack offers the best balance. It provides standardized trace_id/span_id generation and propagation, has a mature ecosystem (Jaeger, Grafana Tempo, ClickHouse backends all have ready-made integrations), and keeps your data fully under your control. On top of this foundation, the audit event model described in this article (Section 2's Pydantic AuditEvent + Section 3's Span Event injection) serves as a semantic extension layer — it reuses the distributed tracing infrastructure while filling the semantic gap for LLM decision auditing.

Commercial solutions (LangSmith, Datadog LLM Observability) can be attractive during early prototyping or when the team lacks infrastructure operations capacity, but require careful evaluation of long-term cost. A mid-sized agent system can generate millions of spans per day, and billing by span count or data volume can become an unexpected budget black hole at scale. Moreover, in many industries (finance, healthcare) it is a hard constraint that audit data must not leave your own infrastructure, and a vendor-hosted SaaS deployment cannot satisfy that constraint unless the vendor offers a self-hosted option.

DIY tracing should almost never be considered in 2026 — OTel has become the industry standard, and reimplementing trace context propagation is effectively reinventing TCP. The only exception is extreme compliance scenarios (military or intelligence systems), which are outside the scope of this article.

4. Log Storage and Retention Policies

With the data model and trace_id infrastructure in place, the next unavoidable engineering question is: where do audit logs get written, how long do they live, and what does it cost? Unlike regular application logs, audit logs contain structured tool parameters and return values — their write volume and query patterns have unique characteristics. It's not as simple as "just pipe everything to ELK." This section addresses the problem from four angles: storage backend selection, tiered retention, volume estimation, and sanitized writes — a production-grade end-to-end plan.

Storage Backend Comparison

Agent audit log storage has several key constraints: (1) must support structured queries on JSON fields like tool_name and parameters; (2) write throughput is high — each tool call produces 1–3 events, and under high agent concurrency, write QPS can reach tens of thousands; (3) query patterns are heavily time-range scans ("all DELETE operations in the last 24 hours"), not random point lookups; (4) costs must be reasonable — audit log volume grows quickly, and storing everything in Elasticsearch will produce an unexpectedly high bill.

Comparison of four mainstream storage backends:

| Backend | Query Speed | Storage Cost | JSON Query | Ops Complexity | Best For |
|---|---|---|---|---|---|
| Elasticsearch | ⭐⭐⭐⭐⭐ Full-text search + aggregations in milliseconds | ⭐⭐ Expensive: index overhead is 1.5–3× raw data | ⭐⭐⭐⭐ Native JSON support; nested queries are flexible | ⭐⭐⭐ Cluster tuning requires experience; JVM tuning is mandatory at scale | Hot tier: real-time search and incident investigation within the last 7 days |
| ClickHouse | ⭐⭐⭐⭐ Columnar compression; time-range scans are extremely fast | ⭐⭐⭐⭐⭐ Cheap: 5–10× compression ratio; a single node can store TBs | ⭐⭐⭐ JSON functions supported, but nested queries are less flexible than ES | ⭐⭐⭐ Single-node deployment is simple; cluster-mode ops are moderate | Warm tier: structured queries and replay extraction for 7–90 days |
| Loki | ⭐⭐⭐ Label-based retrieval is fast; full-text search is slow | ⭐⭐⭐⭐ Only indexes labels; log bodies are stored in object storage (S3) | ⭐⭐ No structured JSON query support; it treats logs as text | ⭐⭐⭐⭐ Zero-config integration with Grafana; lightweight ops | Not suitable for audit logs: lacks structured query capability on tool_name / parameters |
| PostgreSQL | ⭐⭐⭐ JSONB indexes + GIN support decent queries | ⭐⭐⭐ Moderate: row storage; disk usage becomes significant at large volumes | ⭐⭐⭐⭐⭐ JSONB type + GIN index; strongest query capability | ⭐⭐⭐⭐ Teams typically already have operational experience | Small-scale scenarios (< 100M events) or as a metadata index layer |

Recommended combination: Elasticsearch (hot) + ClickHouse (warm) + S3/object storage (cold archive). This tiered approach achieves the best balance of query performance, storage cost, and operational complexity. Loki is unsuitable for audit logs — it is designed for unstructured text logs and cannot effectively exploit the structured fields (tool_name, status, parameters) that give audit logs their power. PostgreSQL has the strongest JSONB query capability, but at tens of millions of writes per day, row-storage disk overhead and VACUUM costs become a bottleneck.

Tiered Retention Strategy

Audit logs do not — and should not — all be stored at the same cost and speed forever. Query patterns vary significantly by time window:

| Tier | Time Window | Storage Backend | Query SLA | Typical Use |
|---|---|---|---|---|
| 🔥 Hot | 0–7 days | Elasticsearch | Millisecond full-text search | Real-time incident investigation; on-call alert correlation |
| 🌤 Warm | 7–90 days | ClickHouse | Second-level aggregation queries | Weekly report analysis; replay data extraction; anomaly pattern mining |
| ❄️ Cold Archive | 90 days – 1 year+ | S3/Object Storage (Parquet compressed) | Minute-level (requires restoration to ClickHouse) | Compliance auditing; annual security reviews; long-term trend analysis |
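The tier boundaries in the table can be expressed as a small routing function, for example to decide which backend a query for a given event should hit. This is a sketch: the function name, the tier labels, and the choice to treat the boundaries as exclusive upper bounds are assumptions.

```python
from datetime import datetime, timedelta, timezone

HOT_WINDOW = timedelta(days=7)    # Elasticsearch
WARM_WINDOW = timedelta(days=90)  # ClickHouse


def storage_tier(event_time: datetime, now: datetime) -> str:
    """Return 'hot', 'warm', or 'cold' based on the event's age."""
    age = now - event_time
    if age < HOT_WINDOW:
        return "hot"
    if age < WARM_WINDOW:
        return "warm"
    return "cold"


# A fixed "now" keeps the example deterministic.
now = datetime(2026, 6, 1, tzinfo=timezone.utc)
for days_old in (1, 30, 200):
    print(days_old, storage_tier(now - timedelta(days=days_old), now))
```

Passing now explicitly, rather than calling datetime.now() inside the function, makes the boundary behavior easy to unit-test.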

Data flow across the three tiers:

┌───────────────────────────────────────────────────┐
│                Agent Audit Events                   │
└─────────────────┬─────────────────────────────────┘
                  │ Dual-write
     ┌────────────┴────────────┐
     ▼                         ▼
┌─────────┐             ┌─────────────┐
│   ES    │  After 7d → │  ClickHouse │  After 90d →  ┌─────────┐
│ (Hot)   │  ILM auto   │  (Warm)      │  TTL auto    │   S3    │
│ 0–7d    │  migrate &  │  7–90d      │  export &    │ (Cold)  │
│         │  delete     │             │  compress    │ 90d–1yr │
└─────────┘             └─────────────┘               └─────────┘

Key implementation details:

1. ES → ClickHouse migration. Use Elasticsearch's ILM (Index Lifecycle Management) policy — indices automatically roll over after 7 days or when they reach 50 GB; new indices receive incoming writes, and old indices are bulk-exported to ClickHouse via a scheduled job (e.g., Apache Spark or a custom Python script). The ES side deletes the old indices only after confirming a successful ClickHouse write.

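2. ClickHouse → S3 archival. ClickHouse's TTL (Time To Live) rules can delete expired rows or move data parts to an S3-backed disk, but they do not produce Parquet files on their own. To archive as Parquet, run a scheduled job that exports each expiring partition to S3 (for example with INSERT INTO FUNCTION s3(...) in Parquet format) before the TTL removes the local copy. Parquet's columnar compression can shrink audit logs to roughly 10–15% of their raw JSON volume, depending on the data.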

3. Cold archive queries. Compliance audits typically query by time range ("please provide all DELETE operations from Q4 2025"); millisecond response is unnecessary. Cold archive data is queried directly on S3 Parquet files via Athena / Presto, or restored on-demand into a temporary ClickHouse table for complex queries.

Volume Estimation

Before designing a storage solution, estimate the log volume — it directly affects storage cost and technology choices. The three core variables of audit log volume:

Log volume (MB/day) = events_per_request × requests_per_day × avg_event_size (KB) / 1,000

Where:
  events_per_request    Average number of audit events per user request
  requests_per_day      Total user requests processed per day
  avg_event_size        Average size of a single audit event (JSON, KB)

Using a mid-sized agent service as an example:

| Variable | Typical Value | Explanation |
|---|---|---|
| events_per_request | 4–8 | Each request typically goes through 1–3 LLM reasoning steps; each reasoning step triggers 1–2 tool calls, plus decision and approval events |
| requests_per_day | 100,000 | Daily average of 100K user requests (typical B2B SaaS volume) |
| avg_event_size | 0.5–2 KB | Depends on tool return value size. A search_docs result may be ~200 bytes; a delete_records result summary ~500 bytes; a decision event with a lengthy rationale ~1.5 KB. Average: 1 KB |

Plugging into the formula: 6 events/req × 100,000 req/day × 1 KB = 600 MB/day (raw JSON).

Actual disk usage across backends:
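As a rough sketch, the raw volume can be combined with the ratios quoted in the backend comparison above (Elasticsearch index overhead of 1.5–3×, ClickHouse compression of 5–10×). The function name and the decimal unit convention (1 MB = 1,000 KB) are assumptions of this sketch, and the resulting ranges are heuristics rather than measurements.

```python
def raw_volume_mb_per_day(
    events_per_request: float,
    requests_per_day: int,
    avg_event_kb: float,
) -> float:
    """Raw JSON volume in MB/day, using decimal units (1 MB = 1,000 KB)."""
    return events_per_request * requests_per_day * avg_event_kb / 1000


# Midpoint and the two extremes of the typical-value ranges.
baseline = raw_volume_mb_per_day(6, 100_000, 1.0)
low = raw_volume_mb_per_day(4, 100_000, 0.5)
high = raw_volume_mb_per_day(8, 100_000, 2.0)

# Apply the ratios from the backend comparison to the midpoint figure.
es_mb = (baseline * 1.5, baseline * 3.0)  # index overhead 1.5-3x
ch_mb = (baseline / 10, baseline / 5)     # compression 5-10x

print(f"raw: {baseline:.0f} MB/day (range {low:.0f} to {high:.0f})")
print(f"Elasticsearch: {es_mb[0]:.0f} to {es_mb[1]:.0f} MB/day")
print(f"ClickHouse: {ch_mb[0]:.0f} to {ch_mb[1]:.0f} MB/day")
```

The spread between the low and high raw estimates is a factor of eight, which is why measuring events_per_request and avg_event_size on real traffic matters more than refining the backend ratios.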

This estimation reveals two things: (1) ClickHouse's columnar compression is dramatically advantageous for audit logs — the same raw data costs ~1/12th of Elasticsearch's storage; (2) storing everything in Elasticsearch for a year costs 10× more than the ClickHouse + S3 combination. Important: these are sizing heuristics, not guarantees. Actual footprint depends on mapping, indexing policy, compression codec, payload size, shard/partition design, and retention settings. If your agent handles millions of requests per day, benchmark on real workload data before finalizing your storage backend.

If tool return values contain large text blocks (e.g., search_docs returns full document content), record only a summary + document ID in audit logs and reference the full content via session_id in application logs — preventing audit log volume from being inflated by tool return values.

Sensitive Data Masking Strategy

Audit logs record complete tool-call parameters and return values. Without an effective masking mechanism, the audit log store itself becomes the most dangerous dataset in your system — an attacker doesn't need to breach the database; they only need read access to the audit logs.

Section 2 already provided the code implementation for field-level sanitization (sanitize_parameters + truncate_result). Here we supplement with three strategic design principles:

1. Masking happens at the application layer — do not rely on the storage layer. Audit events must be fully sanitized before entering any storage backend. Rationale: (a) data in transit can be intercepted — if masking is done on the ES side, the data in transit is plaintext; (b) in the dual-write scenario (writing to both ES and ClickHouse), sanitizing once at the application layer is more reliable and consistent than sanitizing at each storage endpoint; (c) application-layer masking gives you field-level precision — you know which keys are sensitive (api_key, token, password), whereas the storage layer can only do regex matching.

2. Automatically identify sensitive fields — beyond key-name matching. Key-name-based masking (matching api_key, token) covers 80% of cases, but it's not enough. For sensitive data embedded in parameter values — e.g., a query parameter whose value is "SELECT * FROM users WHERE token='abc123'" — you need an additional value-level scan: use regex to match known sensitive data formats (JWTs, AWS Access Key IDs, GitHub PATs). However, value-level scanning has a higher false-positive rate (normal SQL statements may contain strings that resemble these formats), so it's recommended as an optional enhancement layer, not enabled by default.

3. Balancing masking vs. auditability. Fully masking every field (replacing all values with REDACTED) is the safest approach but renders audit logs useless for incident investigation. A pragmatic balancing strategy:
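One way to make principles 2 and 3 concrete, offered as a sketch rather than a complete policy: scan string values for known secret formats and replace only the matched span with a typed placeholder, so the surrounding query text stays readable for investigators. The pattern set, the placeholder format, and the function name are assumptions; the regexes follow commonly documented shapes for these token types and will need tuning against your own data.

```python
import re

# Illustrative patterns only; extend or tighten them for your environment.
SECRET_PATTERNS = {
    # Three base64url segments; JWT headers start with "eyJ" (encoded '{"').
    "jwt": re.compile(r"\beyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+"),
    # AWS access key IDs: a 4-letter prefix plus 16 uppercase alphanumerics.
    "aws_access_key_id": re.compile(r"\b(?:AKIA|ASIA)[A-Z0-9]{16}\b"),
    # Classic GitHub personal access tokens.
    "github_pat": re.compile(r"\bghp_[A-Za-z0-9]{36}\b"),
}


def mask_secrets_in_value(value: str) -> tuple[str, list[str]]:
    """Replace recognized secrets with typed placeholders.

    Returns the masked string plus the labels of the patterns that matched,
    so the audit event can record that masking happened and why.
    """
    matched: list[str] = []
    for label, pattern in SECRET_PATTERNS.items():
        value, count = pattern.subn(f"[REDACTED:{label}]", value)
        if count:
            matched.append(label)
    return value, matched


# AKIAIOSFODNN7EXAMPLE is the placeholder key used in AWS documentation.
query = "SELECT * FROM users WHERE token='AKIAIOSFODNN7EXAMPLE'"
masked_query, labels = mask_secrets_in_value(query)
print(masked_query)
print(labels)
```

Keeping the statement structure and the placeholder type lets an investigator see that a credential was passed in a WHERE clause without the log ever storing the credential itself.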

Code: Python ClickHouse Batch Write Pipeline

The code below demonstrates a complete write pipeline from AuditEvent objects to ClickHouse, including sanitization, batch writes, and error retries. It uses clickhouse-connect, the Python client maintained by ClickHouse, whose client.insert() takes rows of column values plus a list of column names, so no manual SQL string concatenation is needed.

"""AuditEvent → ClickHouse write pipeline.

Features:
- Batch writes: accumulate BATCH_SIZE events before flushing to reduce
  network round-trips
- Auto-sanitization: sanitize + truncate parameters and result before writing
- Error retry: automatic retry on network hiccups (up to 3 attempts)
- Graceful shutdown: flush remaining events on process exit

Dependency: pip install clickhouse-connect
"""

from __future__ import annotations

import atexit
import logging
import time
from typing import Any, Optional

import clickhouse_connect

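# ── Reuse sanitization functions from Section 2 ──
# Uncomment and adjust the module path to match your project layout; without
# this import, sanitize_parameters / truncate_result below raise NameError.
# from audit_log.model import AuditEvent, sanitize_parameters, truncate_result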

logger = logging.getLogger(__name__)

# ClickHouse table schema (DDL reference):
#
# CREATE TABLE audit_log (
#     timestamp DateTime64(3) CODEC(Delta, ZSTD),
#     trace_id String CODEC(ZSTD),
#     span_id String CODEC(ZSTD),
#     parent_span_id Nullable(String) CODEC(ZSTD),
#     agent_id LowCardinality(String),
#     session_id String CODEC(ZSTD),
#     event_type LowCardinality(String),
#     status LowCardinality(String),
#     tool_name LowCardinality(String),
#     parameters String CODEC(ZSTD),      -- JSON string
#     result String CODEC(ZSTD),           -- JSON string
#     approver Nullable(String),
#     error_message Nullable(String),
#     error_type Nullable(String),
#     duration_ms UInt32,
#     metadata String CODEC(ZSTD),         -- JSON string
#     date Date DEFAULT toDate(timestamp)  -- partition key
# ) ENGINE = MergeTree()
# PARTITION BY toYYYYMM(date)
# ORDER BY (agent_id, event_type, timestamp)
# TTL date + INTERVAL 90 DAY DELETE;


class ClickHouseAuditWriter:
    """Audit event → ClickHouse batch writer.

    Usage:
        writer = ClickHouseAuditWriter(host="localhost", database="audit")
        for event in agent_events:
            writer.write(event)
        # Auto-flush on process exit (atexit), or explicit:
        writer.flush()
    """

    _TABLE_COLUMNS = [
        "timestamp", "trace_id", "span_id", "parent_span_id",
        "agent_id", "session_id", "event_type", "status",
        "tool_name", "parameters", "result", "approver",
        "error_message", "error_type", "duration_ms", "metadata",
    ]

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8123,
        username: str = "default",
        password: str = "",
        database: str = "audit",
        table: str = "audit_log",
        batch_size: int = 1000,
        flush_interval: float = 5.0,
        max_retries: int = 3,
    ):
        self._table = table
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._max_retries = max_retries
        self._buffer: list[dict[str, Any]] = []
        self._last_flush = time.monotonic()

        # Establish connection
        self._client = clickhouse_connect.get_client(
            host=host,
            port=port,
            username=username,
            password=password,
            database=database,
        )
        logger.info(f"ClickHouse connected: {host}:{port}/{database}.{table}")

        # Graceful shutdown: auto-flush on process exit
        atexit.register(self.flush)

    # ── Public API ──

    def write(self, event: "AuditEvent") -> None:
        """Write a single audit event (buffered; batch-inserted on flush).

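        Callers don't need to worry about batching; just call write()
        per event. A flush is triggered when the buffer reaches batch_size
        or when flush_interval has elapsed since the last flush. Both checks
        run only inside write(), so an idle writer holds its buffer until
        the next write(), an explicit flush(), or process exit.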
        """
        row = self._event_to_row(event)
        self._buffer.append(row)

        # Conditional flush triggers
        if len(self._buffer) >= self._batch_size:
            self._flush()
        elif time.monotonic() - self._last_flush > self._flush_interval:
            self._flush()

    def flush(self) -> None:
        """Force-flush all buffered events."""
        self._flush()

    # ── Internal Implementation ──

    def _event_to_row(self, event: "AuditEvent") -> dict[str, Any]:
        """Convert an AuditEvent to a ClickHouse row dict.

        Applies sanitization + truncation before serializing to JSON strings.
        """

        # Sanitization (reuses Section 2 sanitization functions)
        safe_params = sanitize_parameters(event.parameters or {})
        safe_result = truncate_result(event.result or {})

        return {
            "timestamp": event.timestamp,
            "trace_id": event.trace_id,
            "span_id": event.span_id,
            "parent_span_id": event.parent_span_id,
            "agent_id": event.agent_id,
            "session_id": event.session_id,
            "event_type": event.event_type.value,
            "status": event.status.value,
            "tool_name": event.tool_name or "",
            "parameters": self._safe_json_dumps(safe_params),
            "result": self._safe_json_dumps(safe_result),
            "approver": event.approver,
            "error_message": event.error_message,
            "error_type": event.error_type,
            "duration_ms": event.duration_ms,
            "metadata": self._safe_json_dumps(event.metadata),
        }

    def _flush(self) -> None:
        """Batch-insert into ClickHouse with automatic retry on failure."""
        if not self._buffer:
            return

        rows = self._buffer
        self._buffer = []
        self._last_flush = time.monotonic()

        for attempt in range(1, self._max_retries + 1):
            try:
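                self._client.insert(
                    table=self._table,
                    # insert() expects a sequence of row sequences, not dicts,
                    # so order each row's values to match column_names
                    data=[[row[col] for col in self._TABLE_COLUMNS] for row in rows],
                    column_names=self._TABLE_COLUMNS,
                )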
                logger.debug(f"Flushed {len(rows)} events to ClickHouse")
                return
            except Exception as e:
                if attempt < self._max_retries:
                    wait = 2 ** attempt  # exponential backoff: 2s, 4s with the default max_retries=3
                    logger.warning(
                        f"ClickHouse flush failed (attempt {attempt}/{self._max_retries}): {e}. "
                        f"Retrying in {wait}s..."
                    )
                    time.sleep(wait)
                else:
                    # Final failure: log at error level, then re-raise. The batch was
                    # already removed from the buffer, so these events are lost here.
                    logger.error(
                        f"ClickHouse flush failed after {self._max_retries} attempts: {e}. "
                        f"Dropped {len(rows)} events. Last trace_id: {rows[-1].get('trace_id', 'N/A')}"
                    )
                    # Production: should write to dead-letter queue or local fallback file
                    raise

    @staticmethod
    def _safe_json_dumps(obj: Any) -> str:
        """Safe JSON serialization — never throws."""
        import json
        try:
            return json.dumps(obj, ensure_ascii=False, default=str)
        except Exception:
            return "{}"


# ── Usage Example ──

writer = ClickHouseAuditWriter(
    host="localhost",
    database="audit",
    batch_size=500,
)

# Construct audit events
# from audit_log.model import AuditEvent, EventType, EventStatus

# event = AuditEvent(
#     trace_id=...,
#     ...
# )
# writer.write(event)

# Flushes automatically once 500 events are buffered (batch_size above), or explicitly:
# writer.flush()

Three key design points in the write pipeline:

1. Batch writes vs. individual writes. ClickHouse's documentation recommends inserting at least 1,000 rows per batch, ideally 10,000–100,000. With too few rows, per-insert overhead dominates (network round trips plus the many small parts ClickHouse must merge); with too many, client memory pressure and retry cost grow. batch_size=1000 is a conservative starting point; tune upward based on actual throughput.

2. LowCardinality(String) ClickHouse optimization. For fields with limited enumeration values — agent_id, event_type, status, tool_name — use the LowCardinality type. ClickHouse stores these as dictionary-encoded values, delivering dramatically better query performance and compression ratios than plain String. This is one of the low-level reasons ClickHouse's storage cost is so low for audit log workloads.

3. Dead Letter Queue (DLQ). On final write failure, the code above logs an error and re-raises; the failed batch has already been removed from the buffer, so those events are lost. In production, write the failed batch to a dead-letter queue (e.g., a local file, a Redis list, or a dedicated Kafka topic) so that no audit events are lost when ClickHouse is completely unavailable (e.g., a cluster outage), and replay from the DLQ once ClickHouse recovers. This is the most easily overlooked, and most critical, component of an audit log write pipeline.
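A minimal sketch of the local-file variant of that dead-letter path, assuming the rows are the dicts produced by _event_to_row. The function names write_dead_letter and replay_dead_letter and the JSON Lines file format are illustrative assumptions, not part of the writer above.

```python
import json
from pathlib import Path
from typing import Any, Callable


def write_dead_letter(rows: list[dict[str, Any]], path: str) -> int:
    """Append rows that could not be inserted to a JSON Lines file."""
    with open(path, "a", encoding="utf-8") as f:
        for row in rows:
            # default=str keeps datetimes and other non-JSON types writable
            f.write(json.dumps(row, ensure_ascii=False, default=str) + "\n")
    return len(rows)


def replay_dead_letter(
    path: str, insert_fn: Callable[[list[dict[str, Any]]], None]
) -> int:
    """Re-insert parked rows; remove the file only if the insert succeeds."""
    file = Path(path)
    if not file.exists():
        return 0
    with file.open(encoding="utf-8") as f:
        rows = [json.loads(line) for line in f if line.strip()]
    if rows:
        insert_fn(rows)  # raises on failure, which leaves the file in place
    file.unlink()
    return len(rows)
```

One way to wire this in would be to call write_dead_letter(rows, "audit_dlq.jsonl") in the final-failure branch of _flush before re-raising. Note that timestamps come back from the file as strings, so a replay function would need to parse them before inserting into a DateTime64 column.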

5. Log Replay and Post-Incident Analysis

Audit logs are written and storage is configured, but the true value of an audit log system is proven by a single question: when an incident hits, how fast can you find the root cause? This section addresses that question from three angles: rapidly reconstructing the call chain via trace_id, repurposing logs for regression testing, and a structured incident analysis workflow.

3-Minute Trace Reconstruction by trace_id

Let's return to the 2:37 AM incident from Section 1. With audit log storage in place, the investigation proceeds as follows:

Step 1 (10 seconds): Search for the suspicious operation. Query for tool_name=delete_records in ES or ClickHouse within the last 24 hours, sorted by descending time. Locate a tool_call event with trace_id = 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67.

Step 2 (20 seconds): Expand the full decision chain. Use trace_id as the primary key to pull all related events, sorted by timestamp. The output (reconstructing the call tree via parent_span_id) looks like this:

Trace: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
Session: sess_8f3a2b1c   Agent: prod-agent-03

[02:37:13.100] LLM Reasoning #1 (span: a1b2c3d4e5f67891) — 312ms
  ├── [decision] tool=search_docs, rationale="User is asking about data cleanup methods"
  └── [tool_call] search_docs → success (145ms, 3 docs found)

[02:37:13.557] LLM Reasoning #2 (span: a3b4c5d6e7f89012) — 248ms
  ├── [decision] tool=delete_records, rationale="User requested 'clean up temp data',
  │              matched to delete_records. Identified potential impact on user_data table,
  │              rows created before 2026-05-15."
  ├── [approval]  ❌ No approval record — tool=delete_records has no approval flow configured
  │               (approval_required policy check: no match)
  └── [tool_call] delete_records → success (847ms, 12,403 rows deleted)

[02:37:14.652] LLM Final Response: "Cleaned up 12,403 temporary records for you."

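Step 3 (2.5 minutes): Root cause analysis. The timeline makes the diagnosis immediate: in LLM Reasoning #2 the model mapped "clean up temp data" to delete_records, the approval_required policy check found no match for that tool, and the call executed without any approval record, deleting 12,403 rows.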

The core value: the entire investigation requires no contact with the user to replay the conversation, no grepping through tens of gigabytes of plain-text logs, and no guessing what the LLM was thinking at the time. The structured audit log has already delivered the complete decision chain.

Log Replay for Regression Testing

Audit logs are not just for incident analysis — the complete tool-call sequences they record (trace_id → decision → tool_call → result) are natural inputs for regression testing. The workflow:

┌─────────────────┐     ┌────────────────┐     ┌─────────────────┐
│ Production       │ →   │ Extract Test   │ →   │ Replay in        │
│ Audit Logs       │     │ Cases          │     │ Staging          │
│ (ClickHouse/ES)  │     │ (trace → JSON) │     │ (new Agent ver.) │
└─────────────────┘     └────────────────┘     └────────┬────────┘
                                                        │
                                              ┌─────────▼─────────┐
                                              │ Compare Decisions  │
                                              │ (old vs. new ver.) │
                                              └───────────────────┘

Concrete steps:

  1. Filter golden traces: From ClickHouse, select traces matching quality criteria — e.g., "status=success, with human approval, involving high-risk tool calls" — these are high-quality test cases
  2. Extract test inputs: From the trace's root decision event, extract the original user input and initial context; from subsequent tool_call events, extract full tool-call parameters
  3. Construct test cases: Generate a standardized test case format (e.g., JSON) containing: input prompt, expected tool-call sequence, expected approval nodes
  4. Replay in staging: Feed the same prompt to the new Agent version and observe its decision path (chosen tools, parameters, whether approval was triggered)
  5. Compare differences: Does the new version's decision path match the old version? If different — is it an improvement or a regression? Was the approval flow correctly triggered?

This approach — automatically extracting golden datasets from production logs — has two advantages over hand-written test cases: (1) it covers real-world edge cases, not just human-imagined happy paths; (2) the test set continuously and automatically expands as production traffic grows.
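Steps 2, 3, and 5 above can be sketched as two small functions. This is an illustration under stated assumptions: the events are dicts for one trace sorted by timestamp, metadata has already been parsed from its JSON string into a dict, and the user prompt is stored under a hypothetical metadata["user_input"] key. The names extract_test_case and compare_decisions are likewise hypothetical.

```python
from typing import Any


def extract_test_case(events: list[dict[str, Any]]) -> dict[str, Any]:
    """Turn one trace's events (sorted by timestamp) into a replayable test case."""
    tool_calls = [e for e in events if e["event_type"] == "tool_call"]
    approved_spans = {e["span_id"] for e in events if e["event_type"] == "approval"}
    root_metadata = events[0].get("metadata") or {}
    return {
        "trace_id": events[0]["trace_id"],
        # Assumption: the original prompt lives in metadata["user_input"]
        "input_prompt": root_metadata.get("user_input", ""),
        "expected_tools": [e["tool_name"] for e in tool_calls],
        "expected_approvals": [
            e["tool_name"] for e in tool_calls if e["span_id"] in approved_spans
        ],
    }


def compare_decisions(
    case: dict[str, Any],
    actual_tools: list[str],
    actual_approvals: list[str],
) -> list[str]:
    """Return human-readable differences between the golden and replayed runs."""
    diffs = []
    if actual_tools != case["expected_tools"]:
        diffs.append(
            f"tool sequence changed: {case['expected_tools']} -> {actual_tools}"
        )
    missing = set(case["expected_approvals"]) - set(actual_approvals)
    if missing:
        diffs.append(f"approval no longer triggered for: {sorted(missing)}")
    return diffs
```

An empty list from compare_decisions means the new agent version took the same decision path. A non-empty list still needs human judgment, since a changed tool sequence can be an improvement as easily as a regression.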

4-Phase Incident Analysis Workflow

Integrating audit logs into the standard on-call engineer incident analysis process:

Phase | Action | Role of Audit Logs | Time
1. Search | Query suspicious events by tool_name, status, time range | Structured field indexes: ES in milliseconds, ClickHouse in seconds | 10–30 sec
2. Correlate | Pull the full trace by trace_id; reconstruct the call tree via parent_span_id | Span nesting model automatically restores LLM reasoning → tool call hierarchy | 20–60 sec
3. Timeline | Sort all events by timestamp; generate a human-readable timeline | The decision → approval → tool_call sequence clearly presents the operation chain | 30–60 sec
4. Root Cause | Analyze anomalies in the timeline: missing approval? Does the LLM rationale make sense? | decision.rationale + approval status directly answers "why" and "who approved" | 1–3 min

Key insight: in traditional application incident analysis, the most time-consuming step is "timeline reconstruction" — you must piece together the complete call sequence from fragments scattered across different services and log files. Agent audit logs, through the three-level nesting of trace_id + span_id + parent_span_id, automate this step into a single query.
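Phase 1 (Search) can be sketched as a small query builder. It assumes the audit_log table from the DDL reference and the {name:Type} placeholder style that _fetch_events in trace_replay.py also uses. The function name build_search_query, its defaults, and the LIMIT of 100 are illustrative assumptions.

```python
from typing import Any, Optional


def build_search_query(
    tool_name: str,
    hours: int = 24,
    status: Optional[str] = None,
) -> tuple[str, dict[str, Any]]:
    """Build the Phase 1 search: recent tool_call events for one tool."""
    params: dict[str, Any] = {"tool_name": tool_name, "hours": hours}
    conditions = [
        "event_type = 'tool_call'",
        "tool_name = {tool_name:String}",
        "timestamp >= now() - toIntervalHour({hours:UInt32})",
    ]
    if status is not None:
        conditions.append("status = {status:String}")
        params["status"] = status
    # Values travel as bound parameters, never spliced into the SQL text
    sql = (
        "SELECT timestamp, trace_id, span_id, agent_id, status "
        "FROM audit_log WHERE " + " AND ".join(conditions) +
        " ORDER BY timestamp DESC LIMIT 100"
    )
    return sql, params
```

The returned pair can be passed to client.query(sql, parameters=params); each trace_id in the result then feeds Phase 2.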

Code: trace_replay.py — Trace Call-Chain Reconstruction Script

The following Python script accepts a trace_id, queries all related events from ClickHouse, and reconstructs the full timeline. It prints the timeline to the console and, when --output is given, also writes a JSON file for sharing with team members.

"""Audit log call-chain reconstruction tool.

Usage:
    python trace_replay.py <trace_id> [--output timeline.json]

Features:
    - Queries ClickHouse for all audit events matching a trace_id
    - Reconstructs the call tree via timestamp + parent_span_id
    - Outputs a human-readable console timeline
    - Exports a structured JSON file
    - Auto-detects missing critical events (e.g., high-risk tool_call
      without an approval record)
"""

from __future__ import annotations

import argparse
import json
import sys
from collections import defaultdict
from datetime import datetime
from typing import Any, Optional

import clickhouse_connect

# ── Event fields ──

EVENT_FIELDS = [
    "timestamp", "trace_id", "span_id", "parent_span_id",
    "agent_id", "session_id", "event_type", "status",
    "tool_name", "parameters", "result", "approver",
    "error_message", "error_type", "duration_ms",
]

# Tools that require approval (example)
HIGH_RISK_TOOLS = {"delete_records", "drop_table", "truncate", "update_config", "execute_sql"}


class TraceReplay:
    """trace_id call-chain reconstructor."""

    def __init__(self, ch_host: str = "localhost", ch_port: int = 8123):
        self._client = clickhouse_connect.get_client(
            host=ch_host, port=ch_port, database="audit"
        )

    def replay(self, trace_id: str) -> dict[str, Any]:
        """Query and reconstruct the complete call chain for a trace."""
        events = self._fetch_events(trace_id)
        if not events:
            raise ValueError(f"No audit events found for trace_id={trace_id}")

        timeline = self._build_timeline(events)
        warnings = self._detect_anomalies(events)

        return {
            "trace_id": trace_id,
            "session_id": events[0].get("session_id", "N/A"),
            "agent_id": events[0].get("agent_id", "N/A"),
            "event_count": len(events),
            "time_span_ms": self._calc_time_span(events),
            "timeline": timeline,
            "warnings": warnings,
            "raw_events": events,
        }

    # ── Data Fetching ──

    def _fetch_events(self, trace_id: str) -> list[dict]:
        """Fetch all events for a trace_id from ClickHouse."""
        query = f"""
            SELECT {', '.join(EVENT_FIELDS)}
            FROM audit_log
            WHERE trace_id = {{trace_id:String}}
            ORDER BY timestamp ASC
        """
        result = self._client.query(query, parameters={"trace_id": trace_id})
        return [dict(zip(EVENT_FIELDS, row)) for row in result.result_rows]

    # ── Timeline Construction ──

    def _build_timeline(self, events: list[dict]) -> list[dict]:
        """Reconstruct the call-chain timeline.

        Strategy:
        1. Group events by span_id — each span contains decision/tool_call/approval
        2. Reconstruct span parent/child/sibling relationships via parent_span_id
        3. Output the full hierarchical timeline
        """
        # ── Step 1: Group by span_id ──
        span_events: dict[str, list[dict]] = defaultdict(list)
        for e in events:
            sid = e.get("span_id", "unknown")
            span_events[sid].append(e)

        # ── Step 2: Build span tree via parent_span_id ──
        span_tree: dict[str, list[str]] = defaultdict(list)
        root_spans: list[str] = []
        for sid, evts in span_events.items():
            parent = evts[0].get("parent_span_id")
            if parent and parent in span_events:
                span_tree[parent].append(sid)
            elif parent is None or parent == "":
                root_spans.append(sid)
            else:
                # parent span not in result set (possibly truncated or in a different partition)
                root_spans.append(sid)

        # ── Step 3: Sort by time and generate timeline entries ──
        timeline = []
        visited = set()
        all_spans = sorted(root_spans, key=lambda s: min(
            e["timestamp"] for e in span_events[s]
        ))

        # Depth-first (pre-order) traversal so each child span is listed
        # directly under its parent in the timeline
        stack = [(sid, 0) for sid in reversed(all_spans)]  # (span_id, depth)
        while stack:
            sid, depth = stack.pop()
            if sid in visited:
                continue
            visited.add(sid)

            evts = sorted(span_events[sid], key=lambda e: e["timestamp"])

            # Classify span type
            span_type = self._classify_span(evts)

            # Add span header
            first_ts = evts[0]["timestamp"]
            timeline.append({
                "type": "span_start",
                "span_id": sid,
                "span_type": span_type,
                "depth": depth,
                "timestamp": first_ts,
                "event_count": len(evts),
            })

            # Add all events within the span
            for e in evts:
                timeline.append({
                    "type": "event",
                    "span_id": sid,
                    "depth": depth + 1,
                    "timestamp": e["timestamp"],
                    "event_type": e["event_type"],
                    "tool_name": e.get("tool_name", ""),
                    "status": e["status"],
                    "approver": e.get("approver"),
                    "duration_ms": e.get("duration_ms", 0),
                    "error_message": e.get("error_message"),
                })

            # Push child spans in reverse time order so the earliest is popped first
            children = sorted(
                span_tree.get(sid, []),
                key=lambda s: min(e["timestamp"] for e in span_events[s]),
            )
            for child_sid in reversed(children):
                stack.append((child_sid, depth + 1))

        return timeline

    def _classify_span(self, events: list[dict]) -> str:
        """Infer span type from the event types within a span."""
        event_types = {e["event_type"] for e in events}
        if "decision" in event_types:
            return "LLM Reasoning"
        if "approval" in event_types:
            return "Approval Node"
        if "tool_call" in event_types:
            return "Tool Call"
        if "error" in event_types:
            return "Error"
        return "Unknown"

    # ── Anomaly Detection ──

    def _detect_anomalies(self, events: list[dict]) -> list[str]:
        """Auto-detect anomalous patterns in audit logs."""
        warnings = []

        # Detection 1: High-risk tool call missing approval
        high_risk_calls = [
            e for e in events
            if e["event_type"] == "tool_call"
            and e.get("tool_name", "") in HIGH_RISK_TOOLS
        ]
        for call in high_risk_calls:
            span_id = call["span_id"]
            has_approval = any(
                e["event_type"] == "approval" and e["span_id"] == span_id
                for e in events
            )
            if not has_approval:
                warnings.append(
                    f"⚠️ High-risk tool call missing approval: tool={call['tool_name']}, "
                    f"span={span_id}, status={call['status']}"
                )

        # Detection 2: Failed tool_call
        failed_calls = [
            e for e in events
            if e["event_type"] == "tool_call" and e["status"] != "success"
        ]
        for call in failed_calls:
            warnings.append(
                f"❌ Tool call failed: tool={call.get('tool_name', 'N/A')}, "
                f"status={call['status']}, "
                f"error={call.get('error_message', 'N/A')}"
            )

        # Detection 3: Explicit error events
        error_events = [e for e in events if e["event_type"] == "error"]
        for err in error_events:
            warnings.append(
                f"💥 Error event: type={err.get('error_type', 'N/A')}, "
                f"message={err.get('error_message', 'N/A')}"
            )

        return warnings

    # ── Utility ──

    def _calc_time_span(self, events: list[dict]) -> int:
        """Calculate total trace duration (milliseconds)."""
        timestamps = [e["timestamp"] for e in events if e.get("timestamp")]
        if len(timestamps) < 2:
            return 0
        min_ts = min(timestamps)
        max_ts = max(timestamps)
        if isinstance(min_ts, str):
            min_ts = datetime.fromisoformat(min_ts.replace("Z", "+00:00"))
            max_ts = datetime.fromisoformat(max_ts.replace("Z", "+00:00"))
        return int((max_ts - min_ts).total_seconds() * 1000)


# ── Formatted Output ──

def format_console(trace_data: dict) -> str:
    """Generate a human-readable console timeline."""
    lines = []
    lines.append(f"{'═' * 70}")
    lines.append(f"Trace: {trace_data['trace_id']}")
    lines.append(f"Session: {trace_data['session_id']}   Agent: {trace_data['agent_id']}")
    lines.append(f"Events: {trace_data['event_count']}   Time Span: {trace_data['time_span_ms']}ms")
    lines.append(f"{'═' * 70}")

    for entry in trace_data["timeline"]:
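        # ClickHouse returns DateTime64 as datetime objects; convert to ISO text
        # before truncating to milliseconds
        raw_ts = entry.get("timestamp", "")
        ts = (raw_ts.isoformat() if isinstance(raw_ts, datetime) else str(raw_ts))[:23]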
        depth = entry.get("depth", 0)
        indent = "  " * depth

        if entry["type"] == "span_start":
            lines.append(f"\n{indent}[{ts}] {entry['span_type']} "
                         f"(span: {entry['span_id'][:12]}...) — {entry['event_count']} events")
        elif entry["type"] == "event":
            event_type = entry["event_type"]
            tool = entry.get("tool_name", "")
            status = entry["status"]
            markers = []
            if tool:
                markers.append(f"tool={tool}")
            if status:
                markers.append(status)
            extra = ", ".join(markers) if markers else ""
            lines.append(f"{indent}├── [{event_type}] {extra} "
                         f"({entry.get('duration_ms', 0)}ms)")

    if trace_data["warnings"]:
        lines.append(f"\n{'─' * 70}")
        lines.append("⚠️  Anomaly Detection:")
        for w in trace_data["warnings"]:
            lines.append(f"  {w}")

    lines.append(f"\n{'═' * 70}")
    return "\n".join(lines)


# ── CLI Entry Point ──

def main():
    parser = argparse.ArgumentParser(description="Agent audit log call-chain reconstruction")
    parser.add_argument("trace_id", help="trace_id to reconstruct")
    parser.add_argument("--output", "-o", help="JSON output file path")
    parser.add_argument("--ch-host", default="localhost", help="ClickHouse host (default: localhost)")
    parser.add_argument("--ch-port", type=int, default=8123, help="ClickHouse port (default: 8123)")
    args = parser.parse_args()

    replay = TraceReplay(ch_host=args.ch_host, ch_port=args.ch_port)

    try:
        trace_data = replay.replay(args.trace_id)
    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    # Console output
    print(format_console(trace_data))

    # JSON export
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(trace_data, f, ensure_ascii=False, indent=2, default=str)
        print(f"\n✅ Timeline exported: {args.output}")


if __name__ == "__main__":
    main()

# ── Usage Example ──
#
# $ python trace_replay.py 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67 -o timeline.json
# ══════════════════════════════════════════════════════════════════
# Trace: 0199c2d5-8a7f-7000-9b3e-1f2d3c4b5a67
# Session: sess_8f3a2b1c   Agent: prod-agent-03
# Events: 6   Time Span: 1552ms
# ══════════════════════════════════════════════════════════════════
#
# [2026-05-22T02:37:13.100] LLM Reasoning (span: a1b2c3d4e5f6...) — 2 events
#   ├── [decision] tool=search_docs, success (0ms)
#   ├── [tool_call] tool=search_docs, success (145ms)
#
# [2026-05-22T02:37:13.557] LLM Reasoning (span: a3b4c5d6e7f8...) — 2 events
#   ├── [decision] tool=delete_records, success (0ms)
#   ├── [tool_call] tool=delete_records, success (847ms)
#
# ──────────────────────────────────────────────────────────────────
# ⚠️  Anomaly Detection:
#   ⚠️ High-risk tool call missing approval: tool=delete_records, span=a3b4c5d6e7f89012, status=success
# ══════════════════════════════════════════════════════════════════
# ✅ Timeline exported: timeline.json

Three key design points in the code:

1. Span tree reconstruction logic. _build_timeline is the core — it reconstructs the hierarchical relationships between spans via parent_span_id, then sorts by timestamp to ensure timeline correctness. This directly maps to the OTEL span nesting model from Section 3.

2. Automatic anomaly detection. _detect_anomalies identifies three categories of anomaly while reconstructing the call chain: high-risk operations missing approval, failed tool calls, and explicit error events. This transforms incident analysis from "passive search" to "active discovery" — the on-call engineer doesn't need to guess where the problem is; the script flags it directly.

3. Dual output mode. Console output enables rapid human review; JSON export integrates into automated pipelines — for example, CI/CD automatically replays critical traces after deployment and compares differences.

Real Scenario: Longrise Tech — The Missing Approval Flow Incident

The following case is constructed from real-world incident patterns (company name and details fictionalized):

Background: Longrise Tech is a B2B SaaS company whose customer-service Agent system "SmartService Assistant" manages ~500,000 enterprise customer ticket records. The Agent has 42 tools, including query, create, update, and delete operations on tickets.

Incident: On Tuesday at 3:12 PM, a customer service supervisor discovered that 3,200 closed tickets from the previous week had vanished from the database. Investigation revealed these tickets were bulk-deleted at 10:45 AM that morning — but no human had manually executed any delete operation.

Investigation (using audit logs):

  1. Search phase (15 seconds): The on-call engineer queries ClickHouse for tool_name=delete_tickets AND timestamp > '2026-05-19T10:00:00', which returns 1 record, trace_id 0199c3e1-...
  2. Correlation phase (30 seconds): Pulling the full trace by trace_id yields 8 events. The timeline shows:
    • 10:45:03 — Support agent Zhang San says in conversation: "Help me archive last week's closed tickets and clean up the database"
    • 10:45:05 — LLM Reasoning #1: selects delete_tickets, rationale: "User requested cleanup of closed tickets in database"
    • 10:45:06 — ❌ Missing approval event. The delete_tickets tool should have an approval flow, but the policy is misconfigured — the approval rule references delete_ticket (singular), while the tool's registered name is delete_tickets (plural)
    • 10:45:06 — tool_call: delete_tickets → success (3,200 tickets deleted)
  3. Root cause (1 minute): A tool-name mismatch in the approval flow configuration (delete_ticket vs delete_tickets) caused the approval policy to be skipped silently. Remediation: enforce unified tool naming conventions and add an alerting mechanism to the approval policy engine, so that a configured tool name missing from the registry raises an alert rather than being silently ignored.

Without audit logs: Traditional investigation would require (a) grepping application logs for all requests containing "delete" — potentially thousands; (b) checking each request individually for approval records — which live in a separate, independent system; (c) replaying conversation history to reconstruct context — requiring the full dialogue for the relevant session. Estimated total effort: 2–4 hours, requiring coordination across multiple people.

Key lessons: (1) Exact tool-name matching is critical — the tool_name in audit logs must be identical to the name in the tool registry and the approval policy references; (2) "Silent skip" in approval policies is the most dangerous behavior — when the policy engine cannot find a matching rule, it should default to deny, not allow; (3) Tools like trace_replay.py with automatic anomaly detection (missing approval) can surface alerts before an incident escalates.
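Lessons (1) and (2) can be sketched as two checks in the policy engine. This is a minimal illustration, assuming tool names are plain strings held in sets. The function names and the explicit no-approval allow-list are hypothetical, not Longrise's actual design.

```python
def find_unknown_policy_tools(policy_tools: set[str], registry: set[str]) -> set[str]:
    """Policy entries that match no registered tool (e.g. a singular/plural typo)."""
    return policy_tools - registry


def requires_approval(
    tool_name: str,
    policy_tools: set[str],
    no_approval_tools: set[str],
) -> bool:
    """Default-deny: a tool skips approval only if it is explicitly allow-listed."""
    if tool_name in policy_tools:
        return True
    # No matching rule: require approval unless the tool was marked safe
    return tool_name not in no_approval_tools


registry = {"query_tickets", "delete_tickets"}
policy = {"delete_ticket"}  # the misconfigured (singular) rule from the incident
print(find_unknown_policy_tools(policy, registry))
print(requires_approval("delete_tickets", policy, {"query_tickets"}))
```

Running the first check at startup would surface the delete_ticket typo as a configuration alert. The second check means that even with the typo in place, delete_tickets still waits for approval because it was never allow-listed.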

6. Integrating Audit Logs with Evaluation Systems

Audit logs serve a dual purpose: they are an incident forensics tool and a high-quality data source for agent evaluation. While Section 5 showed how to replay logs for post-incident analysis, this section addresses a different, forward-looking question: how do you turn accumulated audit logs into a systematic evaluation pipeline that continuously measures and improves agent quality?

The answer lies in treating audit logs as a living dataset factory. Every production agent interaction — every decision, every tool call, every approval — is a labeled data point. The challenge is not collecting enough data; it's extracting the right data and structuring it for an evaluation framework. This section covers the data extraction pipeline, the anomaly patterns that surface regression-test candidates, and the CI/CD integration architecture that closes the loop from production observation to quality gate enforcement.

6.1 Three Data Types Audit Logs Provide for Evaluation

When you connect audit logs to an agent evaluation framework, three distinct data types emerge, each serving a different evaluation purpose:

Data Type | Source in Audit Logs | Evaluation Use | Extraction Criteria
Golden Dataset | Traces where all tool_calls returned success, approval flows were satisfied, and the final outcome was verified as correct | Regression testing: "does the agent still handle this correctly after a model or prompt change?" | All events in trace have status=success; no anomaly warnings; human-verified outcome (optional label)
Edge Cases | Traces where the LLM selected an unusual tool, used non-default parameters, or the execution took significantly longer than the median | Coverage testing: "does the agent handle rare but valid scenarios?" | Tool choice is a statistical outlier vs. historical frequency; duration_ms > P95; rare parameter combination
Anomaly Samples | Traces containing failed tool calls, missing approval events, LLM error responses, or unexpected state transitions | Error-recovery testing: "when things go wrong, does the agent recover gracefully?" | At least one event with status=failure or missing mandatory approval; auto-detected by anomaly patterns (Section 6.4)

The key insight: you don't need to manually curate evaluation datasets. The audit log pipeline can automatically classify traces into these three buckets, producing a continuously refreshed evaluation dataset that reflects real production behavior — not synthetic, hand-crafted test cases that drift from reality over time.
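A minimal sketch of that automatic bucketing, assuming each trace is a list of event dicts with the audit log's field names. The function name classify_trace, the high-risk prefix list, and the caller-supplied P95 threshold and rare-tool set are illustrative assumptions. As a simplification, any approval event in the same span counts as approval, whatever its outcome.

```python
from typing import Any

FAILURE_STATUSES = {"failure", "timeout"}
HIGH_RISK_PREFIXES = ("delete_", "drop_", "execute_")  # assumed naming convention


def classify_trace(
    events: list[dict[str, Any]],
    p95_duration_ms: int,
    rare_tools: set[str],
) -> str:
    """Assign one trace to the 'anomaly', 'edge_case', or 'golden' bucket."""
    tool_calls = [e for e in events if e["event_type"] == "tool_call"]
    approved_spans = {e["span_id"] for e in events if e["event_type"] == "approval"}

    failed = any(e["status"] in FAILURE_STATUSES for e in events)
    unapproved = any(
        e["tool_name"].startswith(HIGH_RISK_PREFIXES)
        and e["span_id"] not in approved_spans
        for e in tool_calls
    )
    if failed or unapproved:
        return "anomaly"

    slow = any((e.get("duration_ms") or 0) > p95_duration_ms for e in tool_calls)
    unusual = any(e["tool_name"] in rare_tools for e in tool_calls)
    if slow or unusual:
        return "edge_case"
    return "golden"
```

The anomaly check runs first so that a slow trace with a missing approval lands in the error-recovery bucket rather than in coverage testing.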

6.2 3-Layer Filtering for Golden Dataset Extraction

Extracting a golden dataset from raw audit logs is not as simple as WHERE status = 'success'. You need progressively stricter filters to ensure the extracted data genuinely represents correct agent behavior:

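Layer 1 — Technical Correctness Filter. All events in the trace must pass basic structural validation: no event with status failure or timeout, and at least one tool_call event. The query below encodes both conditions.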

Filtering query (ClickHouse):

-- Layer 1: Find traces with zero failure events
SELECT trace_id
FROM agent_audit_log
WHERE timestamp >= now() - INTERVAL 7 DAY
GROUP BY trace_id
HAVING countIf(status = 'failure' OR status = 'timeout') = 0
   AND countIf(event_type = 'tool_call') > 0
LIMIT 10000;

Layer 2 — Approval Integrity Filter. For traces that passed Layer 1, verify that all high-risk operations had proper approval flows. This catches the "silent skip" pattern from Section 5:

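-- Layer 2: Of the traces that call high-risk tools, keep only those with an approved approval event.
-- Traces with no high-risk tool calls are not returned here; they pass Layer 2 by default.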
WITH high_risk_tools AS (
    SELECT trace_id, tool_name
    FROM agent_audit_log
    WHERE event_type = 'tool_call'
      AND (tool_name LIKE 'delete_%' OR tool_name LIKE 'drop_%'
           OR tool_name LIKE 'write_%' OR tool_name LIKE 'execute_%')
      AND timestamp >= now() - INTERVAL 7 DAY
),
approved_traces AS (
    SELECT DISTINCT trace_id
    FROM agent_audit_log
    WHERE event_type = 'approval' AND decision = 'approved'
)
SELECT DISTINCT h.trace_id
FROM high_risk_tools h
INNER JOIN approved_traces a ON h.trace_id = a.trace_id;

Layer 3 — Behavioral Quality Filter. Even technically correct traces can represent poor agent behavior. This layer applies heuristic quality checks:

After passing all three layers, the remaining traces form a high-confidence golden dataset — real production interactions where the agent behaved correctly, efficiently, and with proper safety controls.

6.3 Python: generate_eval_dataset.py with Stratified Sampling

The following script implements the extraction pipeline described above. It connects to ClickHouse, applies the 3-layer filter, and outputs an evaluation dataset with stratified sampling by difficulty — ensuring your eval set represents the full spectrum of production workload complexity, not just the easy cases.

#!/usr/bin/env python3
"""
generate_eval_dataset.py — Extract evaluation datasets from agent audit logs.

Reads from ClickHouse audit log storage, applies the 3-layer golden dataset
filter (technical correctness → approval integrity → behavioral quality),
and exports a stratified eval dataset for the agent evaluation framework.
"""

import argparse
import hashlib
import json
import sys
from collections import defaultdict
from datetime import datetime

import clickhouse_connect


# ── Difficulty Classification ──

def classify_difficulty(trace: dict) -> str:
    """
    Classify trace difficulty based on structural complexity.

    Rules (applied in order, first match wins):
      - HARD: >= 5 tool calls OR involves high-risk tools (delete/drop/execute)
              OR approval required
      - MEDIUM: 3-4 tool calls OR non-default tool parameters
      - EASY: 1-2 tool calls, all low-risk, no approval
    """
    tool_calls = [e for e in trace["events"]
                  if e.get("event_type") == "tool_call"]

    n_tools = len(tool_calls)
    high_risk = any(
        e.get("tool_name", "").startswith(("delete_", "drop_", "execute_"))
        for e in tool_calls
    )
    has_approval = any(
        e.get("event_type") == "approval" for e in trace["events"]
    )
    has_non_default_params = any(
        len(e.get("metadata", {}).get("params", {})) > 2
        for e in tool_calls
    )

    if n_tools >= 5 or high_risk or has_approval:
        return "HARD"
    if n_tools >= 3 or has_non_default_params:
        return "MEDIUM"
    return "EASY"


# ── 3-Layer Filter ──

class GoldenDatasetExtractor:
    """
    Extracts golden evaluation datasets from ClickHouse audit logs.

    Implements the 3-layer filtering pipeline:
      1. Technical correctness (no failures, no timeouts)
      2. Approval integrity (high-risk tools have approval records)
      3. Behavioral quality (no excessive tool calls, no redundant ops)
    """

    def __init__(self, ch_host="localhost", ch_port=8123,
                 ch_user="default", ch_password=""):
        self.client = clickhouse_connect.get_client(
            host=ch_host, port=ch_port,
            username=ch_user, password=ch_password
        )

    def extract_trace_ids_layer1(self, days=7, limit=10000) -> list:
        """Layer 1: Technical correctness — zero failures."""
        query = f"""
        SELECT trace_id
        FROM agent_audit_log
        WHERE timestamp >= now() - INTERVAL {days} DAY
        GROUP BY trace_id
        HAVING countIf(status IN ('failure', 'timeout')) = 0
           AND countIf(event_type = 'tool_call') > 0
           AND countIf(event_type = 'tool_call') <= 20
        ORDER BY rand()
        LIMIT {limit}
        """
        result = self.client.query(query)
        return [row[0] for row in result.result_rows]

    def filter_layer2_approval(self, trace_ids: list) -> list:
        """Layer 2: High-risk tools must have approval records."""
        if not trace_ids:
            return []

        # trace_ids come from Layer 1 (our own table), so they are inlined
        # here; use bound query parameters if they can ever be user-supplied.
        placeholders = ",".join([f"'{tid}'" for tid in trace_ids])
        # One aggregate query instead of one lookup per trace: for each
        # trace, count high-risk tool calls and approved approval events.
        query = f"""
        SELECT trace_id,
               countIf(event_type = 'tool_call'
                       AND (tool_name LIKE 'delete_%'
                            OR tool_name LIKE 'drop_%'
                            OR tool_name LIKE 'execute_%')) AS n_high_risk,
               countIf(event_type = 'approval'
                       AND metadata LIKE '%"decision":"approved"%') AS n_approved
        FROM agent_audit_log
        WHERE trace_id IN ({placeholders})
        GROUP BY trace_id
        """
        result = self.client.query(query)

        # Keep traces that either (a) don't use high-risk tools at all OR
        # (b) use them and have at least one approved approval event.
        # This is a trace-level check, not a per-call match.
        passed = {
            row[0] for row in result.result_rows
            if row[1] == 0 or row[2] > 0
        }
        return [tid for tid in trace_ids if tid in passed]

    def fetch_full_traces(self, trace_ids: list) -> list:
        """Fetch complete event data for a list of trace_ids."""
        if not trace_ids:
            return []

        placeholders = ",".join([f"'{tid}'" for tid in trace_ids])
        query = f"""
        SELECT trace_id, session_id, agent_id, span_id,
               parent_span_id, event_type, tool_name,
               status, duration_ms, timestamp, metadata
        FROM agent_audit_log
        WHERE trace_id IN ({placeholders})
        ORDER BY trace_id, timestamp ASC
        """
        result = self.client.query(query)
        rows = result.result_rows

        # Group by trace_id
        traces = defaultdict(lambda: {
            "trace_id": None, "session_id": None, "agent_id": None,
            "events": []
        })
        for row in rows:
            tid = row[0]
            traces[tid]["trace_id"] = tid
            traces[tid]["session_id"] = row[1]
            traces[tid]["agent_id"] = row[2]
            traces[tid]["events"].append({
                "span_id": row[3],
                "parent_span_id": row[4],
                "event_type": row[5],
                "tool_name": row[6],
                "status": row[7],
                "duration_ms": row[8],
                "timestamp": str(row[9]),
                "metadata": json.loads(row[10] or "{}")
            })

        return list(traces.values())

    def filter_layer3_behavioral(self, traces: list) -> list:
        """
        Layer 3: Behavioral quality filters.

        Removes traces with:
          - Excessive tool calls (> 10)
          - Redundant consecutive identical tool calls
          - Missing final agent response (incomplete trace)
          - Total duration > 60 seconds
        """
        filtered = []
        for trace in traces:
            events = trace["events"]

            # Excessive tool calls
            n_tool_calls = sum(
                1 for e in events if e["event_type"] == "tool_call")
            if n_tool_calls > 10:
                continue

            # Redundant consecutive identical calls. Compare adjacent
            # tool_call events only: tool_result and approval events
            # normally sit between two calls in the raw event stream.
            calls = [e for e in events if e["event_type"] == "tool_call"]
            has_redundant = any(
                a["tool_name"] == b["tool_name"]
                and a["metadata"] == b["metadata"]
                for a, b in zip(calls, calls[1:])
            )
            if has_redundant:
                continue

            # Incomplete trace: the last recorded event is a decision,
            # so nothing was logged after the agent chose its next step
            if events and events[-1]["event_type"] == "decision":
                continue

            # Duration outlier
            if len(events) >= 2:
                try:
                    t0 = datetime.fromisoformat(
                        events[0]["timestamp"].replace("Z", "+00:00"))
                    t1 = datetime.fromisoformat(
                        events[-1]["timestamp"].replace("Z", "+00:00"))
                    duration_s = (t1 - t0).total_seconds()
                    if duration_s > 60:
                        continue
                except (ValueError, KeyError):
                    pass

            filtered.append(trace)

        return filtered

    def extract(self, days=7, limit=1000) -> list:
        """Run the full 3-layer extraction pipeline."""
        print(f"Layer 1: Extracting candidate trace_ids "
              f"(last {days} days, limit {limit})...")
        tids = self.extract_trace_ids_layer1(days=days, limit=limit)
        print(f"  → {len(tids)} candidates passed Layer 1")

        print("Layer 2: Approval integrity check...")
        tids = self.filter_layer2_approval(tids)
        print(f"  → {len(tids)} candidates passed Layer 2")

        print("Layer 3: Fetching full traces + behavioral quality...")
        traces = self.fetch_full_traces(tids)
        golden = self.filter_layer3_behavioral(traces)
        print(f"  → {len(golden)} traces passed Layer 3 (golden dataset)")

        return golden


# ── Stratified Sampling ──

def stratified_sample(traces: list, target_per_class: int = 100) -> list:
    """
    Stratified sampling by difficulty class.

    Caps each of EASY, MEDIUM, and HARD at target_per_class so that easy
    traces, which dominate production traffic, cannot crowd out the harder
    ones. Each trace is annotated in place with its "difficulty" label.
    """
    buckets = defaultdict(list)
    for trace in traces:
        difficulty = classify_difficulty(trace)
        trace["difficulty"] = difficulty
        buckets[difficulty].append(trace)

    sampled = []
    for cls in ["EASY", "MEDIUM", "HARD"]:
        pool = buckets.get(cls, [])
        n = min(len(pool), target_per_class)
        # Use hash-based deterministic sampling for reproducibility
        pool.sort(key=lambda t: hashlib.md5(
            t["trace_id"].encode()).hexdigest())
        sampled.extend(pool[:n])
        print(f"  {cls}: sampled {n}/{len(pool)}")

    return sampled


# ── CLI Entry Point ──

def main():
    parser = argparse.ArgumentParser(
        description="Extract golden evaluation dataset from agent audit logs")
    parser.add_argument("--days", type=int, default=7,
                        help="Lookback window in days (default: 7)")
    parser.add_argument("--limit", type=int, default=1000,
                        help="Max candidates from Layer 1 (default: 1000)")
    parser.add_argument("--per-class", type=int, default=100,
                        help="Max samples per difficulty class (default: 100)")
    parser.add_argument("--output", "-o", default="eval_dataset.json",
                        help="Output file path")
    parser.add_argument("--ch-host", default="localhost",
                        help="ClickHouse host")
    parser.add_argument("--ch-port", type=int, default=8123,
                        help="ClickHouse port")
    args = parser.parse_args()

    extractor = GoldenDatasetExtractor(
        ch_host=args.ch_host, ch_port=args.ch_port)
    golden = extractor.extract(days=args.days, limit=args.limit)

    if not golden:
        print("No traces passed all filters. "
              "Try widening --days or increasing --limit.")
        sys.exit(0)

    sampled = stratified_sample(golden, target_per_class=args.per_class)

    # Export
    output = {
        "generated_at": datetime.utcnow().isoformat() + "Z",
        "source": "agent_audit_log",
        "pipeline": "3-layer golden dataset extraction",
        # args.limit is the Layer 1 cap, not the number of candidates found
        "layer1_candidate_limit": args.limit,
        "total_golden": len(golden),
        "total_sampled": len(sampled),
        "trace_count": len(sampled),
        "traces": sampled
    }

    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2, default=str)

    print(f"\n✅ Eval dataset exported: {args.output}")
    print(f"   {len(sampled)} traces ({sum(1 for t in sampled if t['difficulty'] == 'EASY')} easy, "
          f"{sum(1 for t in sampled if t['difficulty'] == 'MEDIUM')} medium, "
          f"{sum(1 for t in sampled if t['difficulty'] == 'HARD')} hard)")


if __name__ == "__main__":
    main()


# ── Usage Example ──
#
# $ python generate_eval_dataset.py --days 14 --limit 5000 --per-class 200
# Layer 1: Extracting candidate trace_ids (last 14 days, limit 5000)...
#   → 4847 candidates passed Layer 1
# Layer 2: Approval integrity check...
#   → 4123 candidates passed Layer 2
# Layer 3: Fetching full traces + behavioral quality...
#   → 3811 traces passed Layer 3 (golden dataset)
#   EASY: sampled 200/2147
#   MEDIUM: sampled 200/1288
#   HARD: sampled 200/376
#
# ✅ Eval dataset exported: eval_dataset.json
#    600 traces (200 easy, 200 medium, 200 hard)

Three key design points:

1. Stratified sampling prevents eval set skew. Without stratification, a naive extraction tends to produce 80%+ EASY traces — because most production interactions are simple queries. But if your eval set is 80% easy, it won't catch regressions in complex multi-tool reasoning. Stratification by difficulty class ensures coverage across the complexity spectrum, matching the 3-layer evaluation model in the evaluation framework.

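2. Hash-based deterministic sampling. The stratified_sample function sorts each difficulty bucket by hashlib.md5(trace_id) and takes the first N entries. Given the same set of golden traces, it always selects the same sample, which is critical for reproducibility in CI/CD pipelines where you need consistent eval sets between runs. One caveat: Layer 1 selects candidates with ORDER BY rand(), so when more traces qualify than --limit allows, the candidate set itself changes between runs. For fully reproducible output, replace rand() with a deterministic ordering such as a hash of trace_id.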

3. Layer 3 is the most subjective — and the most valuable. Layers 1 and 2 are purely mechanical (technical correctness, approval integrity). Layer 3 encodes your team's definition of "good agent behavior" — the thresholds for excessive tool calls, what constitutes redundant operations, what's an acceptable completion time. These thresholds should be tuned per agent and revisited quarterly as your agents evolve.
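To make point 2 concrete, here is a minimal standalone sketch of the same hash-sort idea. It is not part of generate_eval_dataset.py; the function name hash_sample and the trace IDs are made up for illustration. It shows that the selected sample does not depend on input order, and that adding new traces only displaces a sampled trace when a new ID happens to hash lower.

```python
import hashlib
import random


def hash_sample(trace_ids, n):
    """Pick n IDs by sorting on the MD5 digest of each ID."""
    ranked = sorted(trace_ids, key=lambda t: hashlib.md5(t.encode()).hexdigest())
    return ranked[:n]


# Hypothetical trace IDs, for illustration only.
ids = [f"trace-{i:04d}" for i in range(500)]

shuffled = ids[:]
random.shuffle(shuffled)

first = hash_sample(ids, 50)
second = hash_sample(shuffled, 50)
print("same sample regardless of input order:", first == second)

# Grow the pool by 100 new traces and see how much of the sample survives.
grown = ids + [f"trace-{i:04d}" for i in range(500, 600)]
overlap = len(set(first) & set(hash_sample(grown, 50)))
print(f"{overlap}/50 sampled traces survive after adding 100 new traces")
```

The digest acts as a fixed pseudo-random rank per trace, so the sample is stable without storing a seed or a list of previously chosen IDs.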

6.4 5 Anomaly Patterns to Auto-Detect

Anomaly detection in audit logs serves two purposes simultaneously: it flags production incidents for on-call response, and it identifies high-value regression-test candidates for the evaluation pipeline. The following five patterns are derived from real production observations and can be implemented as automated queries or streaming rules:

| # | Anomaly Pattern | Detection Rule | Severity | Eval Dataset Value |
|---|---|---|---|---|
| 1 | Missing Approval: high-risk tool call without a corresponding approval event | event_type=tool_call AND tool_name IN (delete_*, drop_*, execute_*) AND trace_id NOT IN (SELECT trace_id WHERE event_type=approval) | 🔴 Critical | Tests approval policy enforcement: does the agent's safety net work? |
| 2 | Tool Call Loops: the same tool called ≥ 5 times in a single trace with identical or near-identical parameters | Group by trace_id, tool_name; count ≥ 5; normalized Levenshtein distance between parameters < 0.1 | 🟠 High | Tests whether the agent can break out of repetitive reasoning loops |
| 3 | Decision/Tool Mismatch: a decision event selects tool_a but the subsequent tool_call executes tool_b | Within same span: decision.tool_name != tool_call.tool_name | 🟠 High | Tests tool-routing correctness: did the agent execute what it intended? |
| 4 | Latency Spikes: tool_call duration_ms exceeds the P99 of that tool's historical baseline by 5x or more | duration_ms > 5 * historical_p99(tool_name, last_7d) | 🟡 Medium | Tests timeout handling and user experience under degraded performance |
| 5 | Silent Partial Failure: tool_call.status=success but return value metadata indicates a partial or degraded result (e.g., "3 of 5 records updated") | Parse metadata.result_summary for partial-success indicators: "X of Y", "partially", "some failed" | 🟡 Medium | Tests whether the agent detects and responds to partial failures vs. silently proceeding |

Implementation approach: These five patterns can run as either (a) a scheduled batch query that runs every 5 minutes and writes anomaly traces to a dedicated agent_audit_anomalies table, or (b) a streaming rule engine that evaluates each event at write time. For most teams, the batch approach is simpler to implement and maintain — a 5-minute detection delay is acceptable for anomaly detection, whereas write-time streaming evaluation requires careful latency management to avoid blocking the audit log write pipeline.
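As a rough illustration of the batch approach, three of the patterns can be expressed as plain Python checks over the event list of one trace, using the same event shape that fetch_full_traces returns. This is a sketch under assumptions, not the article's detector: the function names, the metadata keys params and result_summary, and the regular expression are hypothetical, and pattern 2 is simplified to exact parameter equality rather than a near-identical match.

```python
import json
import re
from collections import Counter

HIGH_RISK_PREFIXES = ("delete_", "drop_", "execute_")
# Assumed phrasing of partial-success summaries; adjust to your tools' output.
PARTIAL_RE = re.compile(r"\b(\d+) of (\d+)\b|partially|some failed", re.IGNORECASE)


def missing_approval(events):
    """Pattern 1: high-risk tool_call in a trace that has no approval event."""
    risky = any(
        e["event_type"] == "tool_call"
        and (e.get("tool_name") or "").startswith(HIGH_RISK_PREFIXES)
        for e in events
    )
    return risky and not any(e["event_type"] == "approval" for e in events)


def tool_call_loop(events, threshold=5):
    """Pattern 2, simplified: same tool with identical params >= threshold times."""
    counts = Counter(
        (e.get("tool_name"),
         json.dumps(e.get("metadata", {}).get("params", {}), sort_keys=True))
        for e in events
        if e["event_type"] == "tool_call"
    )
    return any(n >= threshold for n in counts.values())


def silent_partial_failure(events):
    """Pattern 5: status=success but the result summary signals a partial result."""
    for e in events:
        if e.get("status") != "success":
            continue
        summary = e.get("metadata", {}).get("result_summary", "")
        for m in PARTIAL_RE.finditer(summary):
            # "5 of 5" is a full success; flag only when X < Y or a keyword matched.
            if m.group(1) is None or int(m.group(1)) < int(m.group(2)):
                return True
    return False


# Hypothetical trace: an unapproved delete that reports a partial result.
events = [
    {"event_type": "decision", "tool_name": "delete_records", "metadata": {}},
    {"event_type": "tool_call", "tool_name": "delete_records", "status": "success",
     "metadata": {"params": {"table": "tmp_data"},
                  "result_summary": "3 of 5 records updated"}},
]

flags = {
    "missing_approval": missing_approval(events),
    "tool_call_loop": tool_call_loop(events),
    "silent_partial_failure": silent_partial_failure(events),
}
print(flags)
```

Keeping each pattern as a small pure function makes it easy to run the same checks in a scheduled job and in unit tests for the detector itself.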

Each detected anomaly trace is automatically tagged and fed into the evaluation framework as a regression test candidate — ensuring that when you fix the root cause, the fix is verified against the exact production scenario that exposed the issue.
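Pattern 4 is the only one that needs a historical baseline, so it is sketched separately here together with the tagging step. Everything in this example is an assumption made for illustration: the function names, the shape of the tag record, and the baseline values. The P99 is computed with the standard library's statistics.quantiles.

```python
import statistics


def p99(durations_ms):
    """99th percentile of a tool's historical durations (needs >= 2 samples)."""
    return statistics.quantiles(durations_ms, n=100)[98]


def tag_latency_spike(event, baseline_ms, factor=5):
    """Return an anomaly tag record if the call exceeds factor x P99, else None."""
    threshold = factor * p99(baseline_ms)
    if event["duration_ms"] <= threshold:
        return None
    return {
        "trace_id": event["trace_id"],
        "anomaly": "latency_spike",
        "tool_name": event["tool_name"],
        "duration_ms": event["duration_ms"],
        "threshold_ms": round(threshold, 1),
        "regression_candidate": True,  # picked up later by the eval dataset job
    }


# Hypothetical 7-day baseline for one tool: 100 calls between 100 and 199 ms.
baseline = list(range(100, 200))

slow = {"trace_id": "t-1", "tool_name": "search_orders", "duration_ms": 1200}
normal = {"trace_id": "t-2", "tool_name": "search_orders", "duration_ms": 400}

print(tag_latency_spike(slow, baseline))
print(tag_latency_spike(normal, baseline))  # None: 400 ms is below 5 x P99
```

In a real deployment the baseline would come from an aggregate query over the last 7 days rather than an in-memory list.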

6.5 Integration Architecture: Audit Logs → CI/CD Quality Gate

The end-to-end architecture closes the loop from production observation to quality gate enforcement:

┌─────────────────────────────────────────────────────────────────────┐
│                    PRODUCTION                                       │
│  ┌─────────┐    ┌──────────────┐    ┌─────────────────────────────┐ │
│  │ Agent    │───▶│ Audit Log    │───▶│ Anomaly Detector (5 rules)  │ │
│  │ Runtime  │    │ Pipeline     │    │ - Writes to anomaly table   │ │
│  └─────────┘    └──────────────┘    └──────────┬──────────────────┘ │
│                                                │                     │
└────────────────────────────────────────────────┼─────────────────────┘
                                                 │
                    ┌────────────────────────────▼──────────────────┐
                    │            DATA EXTRACTION (Weekly Cron)       │
                    │                                              │
                    │  generate_eval_dataset.py                     │
                    │  ┌──────────────────────────────────────┐     │
                    │  │ 3-Layer Golden Dataset Filter        │     │
                    │  │ + Anomaly Sample Collection          │     │
                    │  │ + Stratified Sampling (E/M/H)        │     │
                    │  └──────────────┬───────────────────────┘     │
                    └─────────────────┼─────────────────────────────┘
                                      │
                    ┌─────────────────▼─────────────────────────────┐
                    │           EVALUATION DATASET STORE             │
                    │  eval_dataset.json (versioned in Git)         │
                    └─────────────────┬─────────────────────────────┘
                                      │
                    ┌─────────────────▼─────────────────────────────┐
                    │             CI/CD PIPELINE                     │
                    │                                              │
                    │  ┌──────────┐   ┌──────────────┐             │
                    │  │ PR Open  │──▶│ Run Eval     │             │
                    │  │ (prompt  │   │ Framework    │             │
                    │  │  change) │   │ against      │             │
                    │  │          │   │ dataset      │             │
                    │  └──────────┘   └──────┬───────┘             │
                    │                       │                       │
                    │          ┌────────────▼──────────┐            │
                    │          │ Quality Gate Check     │            │
                    │          │ - Golden pass rate     │            │
                    │          │   must be ≥ 95%        │            │
                    │          │ - Anomaly recovery     │            │
                    │          │   rate must be ≥ 80%   │            │
                    │          │ - No regression in     │            │
                    │          │   any difficulty class │            │
                    │          └────────────┬──────────┘            │
                    │                       │                       │
                    │              ┌────────▼────────┐              │
                    │              │ PASS → Merge PR  │              │
                    │              │ FAIL → Block +   │              │
                    │              │ Report Diff      │              │
                    │              └─────────────────┘              │
                    └──────────────────────────────────────────────┘

Pipeline stages explained:

1. Production observation. The agent runtime writes audit events through the pipeline described in Section 4. The anomaly detector runs as a scheduled job (e.g., every 5 minutes via cron or a workflow scheduler), evaluating each new trace against the 5 anomaly patterns. Detected anomalies are written to an agent_audit_anomalies table for investigation and dataset inclusion.

2. Weekly data extraction. generate_eval_dataset.py runs as a weekly cron job (or triggered manually before a major release). It applies the 3-layer golden dataset filter to the trailing 7–14 days of audit logs and combines golden traces with anomaly samples to produce a comprehensive evaluation dataset. The output — eval_dataset.json — is committed to the agent's Git repository, versioned alongside the prompt templates and tool definitions it evaluates.

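3. CI/CD quality gate. When a developer opens a PR that modifies the agent's system prompt, tool definitions, or model configuration, the CI pipeline runs the evaluation framework against the versioned dataset. The quality gate enforces three metrics, as shown in the diagram above: the golden pass rate must be ≥ 95%, the anomaly recovery rate must be ≥ 80%, and no difficulty class may regress.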

4. Merge or block. If all three quality gates pass, the PR can merge. If any gate fails, the pipeline reports a detailed diff — which specific traces regressed, in which difficulty class, and what the agent did differently — enabling the developer to understand and fix the regression before merging.
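The gate logic in stages 3 and 4 can be sketched in a few lines. This is a hypothetical illustration, not the evaluation framework's actual API: the EvalResult record, the quality_gate function, and the rule that an empty group counts as passing are all assumptions. The thresholds (95% and 80%) come from the diagram above.

```python
from dataclasses import dataclass


@dataclass
class EvalResult:
    trace_id: str
    kind: str        # "golden" or "anomaly"
    difficulty: str  # "EASY", "MEDIUM", or "HARD"
    passed: bool


def pass_rate(results):
    """Fraction of passed results; an empty group counts as passing (assumption)."""
    return sum(r.passed for r in results) / len(results) if results else 1.0


def quality_gate(current, baseline, golden_min=0.95, anomaly_min=0.80):
    """Check the three gates and return (ok, list_of_failure_messages)."""
    failures = []
    golden = pass_rate([r for r in current if r.kind == "golden"])
    anomaly = pass_rate([r for r in current if r.kind == "anomaly"])
    if golden < golden_min:
        failures.append(f"golden pass rate {golden:.1%} < {golden_min:.0%}")
    if anomaly < anomaly_min:
        failures.append(f"anomaly recovery rate {anomaly:.1%} < {anomaly_min:.0%}")
    for cls in ("EASY", "MEDIUM", "HARD"):
        cur = pass_rate([r for r in current if r.difficulty == cls])
        base = pass_rate([r for r in baseline if r.difficulty == cls])
        if cur < base:
            failures.append(f"{cls} regressed: {base:.1%} -> {cur:.1%}")
    return not failures, failures


def make_run(failing_ids=()):
    """Build a hypothetical eval run: 40 golden traces plus 5 anomaly traces."""
    spec = [("golden", "EASY", 20), ("golden", "MEDIUM", 10),
            ("golden", "HARD", 10), ("anomaly", "HARD", 5)]
    run = []
    for kind, difficulty, count in spec:
        for i in range(count):
            tid = f"{kind}-{difficulty}-{i}"
            run.append(EvalResult(tid, kind, difficulty, tid not in failing_ids))
    return run


baseline = make_run()
candidate = make_run(failing_ids={"golden-HARD-3"})

ok, failures = quality_gate(candidate, baseline)
print("PASS" if ok else "FAIL", failures)
```

In this example a single failing HARD trace leaves the overall golden pass rate above 95% but still blocks the PR through the per-class regression gate, which is the case the third metric exists to catch.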

6.6 Connecting to the Agent Evaluation Framework

This article focuses on the data supply side — how audit logs produce structured evaluation datasets. For the evaluation execution side — the 3-layer scoring model (tool-use accuracy, reasoning-chain quality, production-quality metrics), the evaluation runner architecture, and the CI/CD integration patterns — see the dedicated article: AI Agent Evaluation Framework: A 3-Layer System for Measuring Tool Use, Reasoning Chains, and Production Quality.

The two systems are designed to work together: audit logs supply continuously refreshed, production-derived evaluation data; the evaluation framework consumes that data to measure and gate agent quality. The pipeline described in this section — generate_eval_dataset.py → versioned dataset → CI/CD quality gate — is the bridge between them.

Summary of Section 6:

  1. Three data types flow from audit logs to evaluation: golden datasets (regression testing), edge cases (coverage testing), and anomaly samples (error-recovery testing).
  2. 3-layer filtering (technical correctness → approval integrity → behavioral quality) ensures golden datasets represent genuinely correct agent behavior, not just technically successful traces.
  3. Stratified sampling by difficulty prevents eval set skew — if your dataset is 80% easy traces, you won't catch regressions in complex multi-tool scenarios.
  4. 5 anomaly patterns (missing approval, tool loops, decision/tool mismatch, latency spikes, silent partial failure) serve dual purpose: alerting on production incidents and feeding regression-test candidates into the evaluation pipeline.
  5. CI/CD integration closes the loop — versioned eval datasets combined with quality gates (golden pass rate, anomaly recovery rate, no per-class regression) ensure every prompt or tool change is validated against production-derived scenarios before deployment.

7. Open-Source vs Commercial Solutions

By this point, you have a clear picture of what to log (Section 2), how to trace it (Section 3), where to store it (Section 4), and how to use it for incident analysis (Section 5) and evaluation (Section 6). The remaining question is practical: what infrastructure should you actually deploy to build this system?

The answer depends on your team's size, compliance requirements, budget, and iteration speed. This section provides a side-by-side comparison of open-source and commercial stacks, a decision framework to guide your choice, and a ready-to-run docker-compose setup for the most common self-hosted path: OpenTelemetry + Jaeger + ClickHouse.

7.1 The Open-Source Stack

The recommended open-source stack consists of three layers:

| Layer | Technology | Role | Why This Choice |
|---|---|---|---|
| Instrumentation | OpenTelemetry | Generate spans and traces from agent code; export to collector | CNCF standard, vendor-neutral, SDKs in all major languages, supports custom span attributes for agent-specific fields (decision, approval, tool_call) |
| Trace Storage + Visualization | Jaeger | Ingest traces from OpenTelemetry Collector; provide trace search and waterfall visualization | Native OpenTelemetry support, mature UI for trace inspection, lightweight deployment, active community (graduated CNCF project) |
| Log Storage + Analytics | ClickHouse (or Elasticsearch) | Store structured audit log events as JSON; run analytical queries for anomaly detection, golden dataset extraction, and retention policy enforcement | ClickHouse: columnar storage, sub-second aggregation on billions of rows, excellent compression (10:1 typical). Elasticsearch: better full-text search, Kibana dashboards, but higher infrastructure cost at scale |

How the layers connect. Your agent code uses the OpenTelemetry SDK to create spans, one span per event (decision, tool_call, tool_result, approval, error). Each span carries the trace_id from the Section 2 design. Spans are exported via OTLP to the OpenTelemetry Collector, which forwards them to Jaeger (for trace visualization) and to ClickHouse (for structured log storage and analytical queries). This dual-write architecture gives you both real-time trace debugging (Jaeger) and long-term analytical querying (ClickHouse) from a single instrumentation point.
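The span-per-event shape can be illustrated without any dependencies. The sketch below uses only the standard library and does not call the OpenTelemetry SDK; in a real deployment the SDK generates these IDs for you. The helper names and the particular parent chain (decision, then approval, then tool_call, then tool_result) are assumptions for illustration. The ID sizes match OpenTelemetry: 16 bytes for a trace ID and 8 bytes for a span ID.

```python
import secrets
import time


def new_trace_id():
    """16 random bytes as 32 hex chars, the OpenTelemetry trace ID size."""
    return secrets.token_hex(16)


def new_span_id():
    """8 random bytes as 16 hex chars, the OpenTelemetry span ID size."""
    return secrets.token_hex(8)


def make_event(trace_id, event_type, parent_span_id=None, **fields):
    """Build one audit event; every event gets its own span_id."""
    return {
        "trace_id": trace_id,
        "span_id": new_span_id(),
        "parent_span_id": parent_span_id,
        "event_type": event_type,
        "timestamp": time.time(),
        **fields,
    }


# One user request = one trace_id shared by every event in the chain.
trace_id = new_trace_id()
decision = make_event(trace_id, "decision", tool_name="delete_records")
approval = make_event(trace_id, "approval",
                      parent_span_id=decision["span_id"], decision="approved")
call = make_event(trace_id, "tool_call",
                  parent_span_id=approval["span_id"], tool_name="delete_records")
result = make_event(trace_id, "tool_result",
                    parent_span_id=call["span_id"], status="success")

for e in (decision, approval, call, result):
    print(f'{e["event_type"]:<12} span={e["span_id"]} parent={e["parent_span_id"]}')
```

Following parent_span_id from the tool_result back to the root decision reconstructs the chain that Jaeger renders as a waterfall and that ClickHouse queries group by trace_id.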

Elasticsearch alternative. If your team already operates an ELK stack, you can use Elasticsearch in place of ClickHouse. Elasticsearch provides Kibana dashboards and stronger full-text search on prompt_summary and llm_response fields. However, at high volume (millions of events per day), ClickHouse's columnar compression and aggregation performance are a better fit than Elasticsearch for the analytical workloads described in Sections 4–6. A pragmatic compromise: store the most recent 30 days in Elasticsearch for operational search, and archive older data to ClickHouse for long-term analytics.

7.2 Commercial Comparison Table

For teams that prefer managed solutions, four major platforms offer LLM observability specifically relevant to agent audit logging:

| Dimension | LangSmith | Weights & Biases | Datadog LLM | Arize Phoenix |
|---|---|---|---|---|
| Cost | Free tier available; paid plans per-seat; verify current pricing before procurement | Free tier available; paid plans per-seat; verify current pricing before procurement | Usage-based pricing (per-host + per-span); verify current pricing before procurement | Open-source core, free to self-host (verify current license terms); managed cloud usage-based; verify current pricing |
| Integration Effort | Low: native Python SDK; 2-line decorator to trace any function; built for LangChain/LangGraph | Low: wandb.init() plus auto-instrumentation; strong ML experiment tracking lineage | Medium: requires Datadog Agent; LLM observability is an add-on to existing APM; broader setup needed | Medium: OpenTelemetry-based; drop-in for existing OTel setups; requires Phoenix server deployment |
| Agent-Specific Features | Strong. Built-in trace types for chain/agent/tool; native support for tool_call and decision events; eval dataset creation from traces | Moderate. General trace + metrics; LLM-specific prompts/tokens tracked; limited agent decision-path modeling | Moderate. LLM span types; prompt/response capture; token usage monitoring; less native support for approval flows | Moderate. OpenTelemetry standard spans; flexible attribute schema; no built-in agent-specific event types |
| Data Privacy | Cloud-hosted SaaS; supports enterprise security options; verify current compliance claims in vendor trust center | Cloud-hosted SaaS; supports enterprise security options; verify current compliance claims in vendor trust center | Cloud-hosted SaaS; supports enterprise security options; verify current compliance claims in vendor trust center | Best for data sovereignty. Full self-hosting is supported; data never leaves your infrastructure; ideal for regulated industries |
| Vendor Lock-In | High. Trace format, eval datasets, and prompt management are LangSmith-specific; migration requires significant rework | Moderate. Traces exportable via API; eval logic tied to W&B run model; migration is feasible but non-trivial | Moderate. Underlying traces use OpenTelemetry; dashboards and alerts are Datadog-specific; data export via API | Low. OpenTelemetry-native; data stored in standard formats (Parquet, JSON); zero lock-in for self-hosted deployments |

LangSmith is the furthest along in agent-specific observability — if your stack is LangChain/LangGraph-based, its native trace types map almost 1:1 to the five event types defined in Section 2. The downside is vendor lock-in: your trace format, evaluation datasets, and prompt registry all live inside LangSmith's ecosystem.

Weights & Biases excels if your team already uses it for ML experiment tracking. Adding agent traces to existing W&B runs provides lineage from model training to production agent behavior — useful for A/B testing model changes. However, W&B's agent-specific features are less mature than LangSmith's.

Datadog LLM Observability is the natural choice for teams already on Datadog APM. It unifies agent observability with infrastructure monitoring (CPU, memory, GPU utilization during LLM calls). The trade-off: LLM observability is priced as an add-on, and the cost can escalate quickly if you trace every agent interaction at high volume.

Arize Phoenix occupies a unique position: a self-hostable open-source core with a managed cloud option (check the repository for the current license terms before redistributing it). It uses OpenTelemetry as its native ingestion format, meaning you can start with the self-hosted version and migrate to cloud later (or vice versa) without converting trace formats. For teams in regulated industries that require data to stay on-premises, Phoenix is the strongest commercial-adjacent option.

7.3 Decision Framework

Use the following decision table to select your stack. The framework prioritizes practical constraints over feature checklists because, in practice, the "best" tool is the one your team will actually deploy and maintain:

Your Situation Recommended Stack Rationale
Team < 5 engineers Open-source: OTel + Jaeger + ClickHouse Single docker-compose up (Section 7.4) gives you a working stack in 5 minutes. No per-seat licensing cost. Small teams benefit from understanding the internals rather than abstracting them away.
Compliance requires self-hosted Open-source OR Arize Phoenix (self-hosted) Neither LangSmith, W&B, nor Datadog offer self-hosted deployment for non-enterprise tiers. If data must stay on-premises, your choice is pure open-source or Phoenix's Apache 2.0 distribution. Phoenix adds a polished UI on top of OpenTelemetry, reducing the need for custom dashboards.
Need fast iteration (days, not weeks) Commercial: LangSmith or Datadog LLM Managed solutions eliminate infrastructure toil — no Elasticsearch cluster tuning, no ClickHouse schema migrations, no Jaeger retention config. Your team spends time on agent behavior analysis, not on maintaining observability infrastructure. LangSmith is the faster path for LangChain teams; Datadog is faster for teams already on Datadog.
Already on Datadog APM Datadog LLM Observability Unify agent traces with existing service traces, infrastructure metrics, and alerting. The marginal integration cost is minimal if the Datadog Agent is already deployed. Be mindful of ingestion pricing at high trace volumes — consider sampling non-critical traces.
Hybrid: sensitive data on-prem, telemetry in cloud OTel + ClickHouse (on-prem) + Grafana (cloud or on-prem) Keep prompt_summary, tool_params, and llm_response fields on-premises in ClickHouse. Export only anonymized trace metadata (trace_id, duration_ms, tool_name, status) to a cloud dashboard. The OpenTelemetry Collector's filter processor can strip sensitive fields before export.
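
The hybrid row can be sketched as an allowlist filter: only the four metadata fields named in the table leave the premises. The function name `to_cloud_metadata` and the sample event are hypothetical; in a real deployment the same policy would more likely live in the Collector pipeline than in application code.

```python
# Fields the hybrid setup allows to leave the on-prem boundary.
CLOUD_SAFE_FIELDS = ("trace_id", "duration_ms", "tool_name", "status")


def to_cloud_metadata(event: dict) -> dict:
    """Return a copy of `event` that contains only allowlisted metadata."""
    return {key: event[key] for key in CLOUD_SAFE_FIELDS if key in event}


# Sample audit event (values made up for illustration).
event = {
    "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
    "duration_ms": 182,
    "tool_name": "shipping_lookup",
    "status": "success",
    "prompt_summary": "User asked about order status",
    "tool_params": '{"order_id": "ORD-12345"}',
    "llm_response": "I will look up the shipment.",
}
cloud_view = to_cloud_metadata(event)
print(cloud_view)
```

An allowlist is used instead of a blocklist so that a newly added sensitive field stays on-premises by default.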

Pragmatic guidance. Most teams should start with the open-source stack (Section 7.4) during development and initial production deployment. It costs nothing, gives you complete control, and builds genuine understanding of agent observability. If and when infrastructure maintenance becomes a bottleneck — typically around 3–6 months into production, when the team is spending more time managing ClickHouse partitions than analyzing agent behavior — evaluate LangSmith or Datadog as a managed upgrade. The OpenTelemetry instrumentation code you write for the open-source stack is reusable: switching to LangSmith or Datadog later is a configuration change, not a rewrite.
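
As a rough illustration of "a configuration change, not a rewrite", the exporter endpoint can be read from the environment instead of being hard-coded. `OTEL_EXPORTER_OTLP_ENDPOINT` is the standard OpenTelemetry environment variable; the helper name `resolve_otlp_endpoint` and the vendor URL below are placeholders, not real endpoints.

```python
import os

# Default: the self-hosted Collector from Section 7.4.
DEFAULT_ENDPOINT = "http://localhost:4317"


def resolve_otlp_endpoint(env=None) -> str:
    """Pick the OTLP endpoint from the environment, falling back to local."""
    env = os.environ if env is None else env
    return env.get("OTEL_EXPORTER_OTLP_ENDPOINT", DEFAULT_ENDPOINT)


# Development: nothing set, spans go to the local Collector.
local = resolve_otlp_endpoint({})

# Later: point the same instrumentation at a managed backend (placeholder URL).
managed = resolve_otlp_endpoint(
    {"OTEL_EXPORTER_OTLP_ENDPOINT": "https://otlp.vendor.example:4317"}
)
print(local, managed)
```

The instrumentation code itself does not change between the two cases; only the value handed to the exporter does.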

7.4 docker-compose: Self-Hosted OpenTelemetry + Jaeger Stack

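Below is a complete starter docker-compose.yml that launches the entire open-source stack: OpenTelemetry Collector for span ingestion, Jaeger for trace storage and visualization, and ClickHouse for structured audit log storage and analytics. Copy this file together with the companion files it mounts (otel-collector-config.yaml and clickhouse-init.sql, both shown after it), run docker-compose up -d, and you have a working agent audit log backend in under 5 minutes. The credentials in the file (audit_secret, admin) are samples; replace them before running the stack anywhere beyond a development machine.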

# docker-compose.yml — Self-hosted Agent Audit Log Stack
# OpenTelemetry Collector + Jaeger + ClickHouse
#
# Usage:
#   docker-compose up -d
#   # Jaeger UI:    http://localhost:16686
#   # ClickHouse:   http://localhost:8123/play
#   # OTLP gRPC:    localhost:4317
#   # OTLP HTTP:    localhost:4318
#
# Your agent code exports spans to the OTel Collector at localhost:4317.
# The collector forwards traces to Jaeger and structured logs to ClickHouse.

version: "3.8"

services:
  # ── OpenTelemetry Collector ──────────────────────────────────
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.102.0
    container_name: otel-collector
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro
    ports:
      - "4317:4317"   # OTLP gRPC receiver
      - "4318:4318"   # OTLP HTTP receiver
      - "8888:8888"   # Prometheus metrics (collector health)
    depends_on:
      - jaeger
      - clickhouse
    restart: unless-stopped

  # ── Jaeger (Trace Storage + Visualization) ───────────────────
  jaeger:
    image: jaegertracing/all-in-one:1.57
    container_name: jaeger
    environment:
      - COLLECTOR_OTLP_ENABLED=true
      - SPAN_STORAGE_TYPE=badger
      - BADGER_EPHEMERAL=false
      - BADGER_DIRECTORY_KEY=/badger/key
      - BADGER_DIRECTORY_VALUE=/badger/data
      - QUERY_MAX_TRACES_PER_QUERY=10000
    volumes:
      - jaeger_data:/badger
    ports:
      - "16686:16686"  # Jaeger Query UI
      - "16685:16685"  # gRPC Query (used by Grafana)
    restart: unless-stopped

  # ── ClickHouse (Structured Audit Log Storage) ────────────────
  clickhouse:
    image: clickhouse/clickhouse-server:24.3
    container_name: clickhouse
    environment:
      CLICKHOUSE_DB: audit_logs
      CLICKHOUSE_USER: audit
      CLICKHOUSE_PASSWORD: audit_secret
    volumes:
      - clickhouse_data:/var/lib/clickhouse
      - ./clickhouse-init.sql:/docker-entrypoint-initdb.d/init.sql:ro
    ports:
      - "8123:8123"   # HTTP interface
      - "9000:9000"   # Native protocol
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    restart: unless-stopped

  # ── (Optional) Grafana for Unified Dashboards ────────────────
  grafana:
    image: grafana/grafana:10.4.0
    container_name: grafana
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
      - GF_INSTALL_PLUGINS=grafana-clickhouse-datasource
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml:ro
    ports:
      - "3000:3000"
    depends_on:
      - jaeger
      - clickhouse
    restart: unless-stopped
    profiles:
      - monitoring   # Start only with: docker-compose --profile monitoring up

volumes:
  jaeger_data:
  clickhouse_data:
  grafana_data:

OpenTelemetry Collector configuration (otel-collector-config.yaml):

# otel-collector-config.yaml
# Place this file alongside docker-compose.yml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 5s
    send_batch_size: 512

  # Optional: strip sensitive fields before export to external systems
  # attributes/redact:
  #   actions:
  #     - key: prompt_summary
  #       action: delete
  #     - key: llm_response
  #       action: delete

exporters:
  # Export traces to Jaeger via OTLP
  otlp/jaeger:
    endpoint: jaeger:4317
    tls:
      insecure: true

  # Export structured logs to ClickHouse via the native TCP protocol (port 9000)
  clickhouse:
    endpoint: tcp://clickhouse:9000?database=audit_logs
    username: audit
    password: audit_secret
    ttl: 2160h   # 90 days; the value is a Go duration, which has no "d" unit
    create_schema: true
    logs_table_name: agent_audit_log

  # Optional: debug exporter for local testing
  # (replaces the deprecated "logging" exporter)
  # debug:
  #   verbosity: detailed

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp/jaeger]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [clickhouse]

  telemetry:
    metrics:
      address: 0.0.0.0:8888

ClickHouse initialization (clickhouse-init.sql):

-- clickhouse-init.sql
-- Automatically executed on first container start.
-- Creates the audit log table matching the data model from Section 2.

CREATE TABLE IF NOT EXISTS audit_logs.agent_audit_log (
    timestamp       DateTime64(3) CODEC(DoubleDelta, ZSTD(3)),
    trace_id        String,
    span_id         String,
    parent_span_id  String,
    event_type      LowCardinality(String),  -- decision, tool_call, tool_result, approval, error
    tool_name       String,
    tool_params     String CODEC(ZSTD(5)),
    tool_result     String CODEC(ZSTD(5)),
    status          LowCardinality(String),  -- success, failure, timeout, pending
    duration_ms     UInt32,
    prompt_summary  String CODEC(ZSTD(5)),
    llm_response    String CODEC(ZSTD(5)),
    approver        String,
    approval_decision LowCardinality(String),
    error_message   String CODEC(ZSTD(3)),
    error_type      LowCardinality(String),
    metadata        String CODEC(ZSTD(5)),   -- JSON blob for extensibility
    agent_id        LowCardinality(String),
    session_id      String,

    -- Data-skipping indexes for the most common filters
    INDEX idx_trace_id trace_id TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_event_type event_type TYPE set(0) GRANULARITY 1,
    INDEX idx_status status TYPE set(0) GRANULARITY 1
)
ENGINE = MergeTree()
-- Partition by month for efficient retention management (Section 4.2)
PARTITION BY toYYYYMM(timestamp)
ORDER BY (agent_id, timestamp, trace_id, span_id)
-- toDateTime() keeps the TTL valid on versions that reject DateTime64 in TTL expressions
TTL toDateTime(timestamp) + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;

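-- Pre-aggregated materialized view: hourly event stats by agent
-- Supports the operational dashboards described in Section 4.3
--
-- AggregatingMergeTree stores partial aggregate states, so averages and
-- percentiles stay correct when parts merge (SummingMergeTree would add
-- the avg and p95 values together). status is part of the sorting key so
-- rows with different statuses are never collapsed into one.
-- Read with: countMerge(event_count), avgMerge(avg_duration_ms),
--            quantileMerge(0.95)(p95_duration_ms) ... GROUP BY the key columns.
CREATE MATERIALIZED VIEW IF NOT EXISTS audit_logs.agent_event_stats_hourly
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (agent_id, event_type, status, hour)
AS SELECT
    toStartOfHour(timestamp) AS hour,
    agent_id,
    event_type,
    status,
    countState() AS event_count,
    avgState(duration_ms) AS avg_duration_ms,
    quantileState(0.95)(duration_ms) AS p95_duration_ms
FROM audit_logs.agent_audit_log
GROUP BY hour, agent_id, event_type, status;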

Instrumenting your agent code (Python example using OpenTelemetry SDK):

# agent_tracing.py — OpenTelemetry instrumentation for agent audit logs
#
# pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp
#
# This module creates OTel spans that map to the audit event types
# defined in Section 2. Spans are automatically exported to the OTel
# Collector (localhost:4317), whose traces pipeline forwards them to Jaeger.
# ClickHouse is fed by the Collector's separate logs pipeline.

import uuid
from contextlib import contextmanager

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# ── One-time setup ─────────────────────────────────────────────
resource = Resource(attributes={
    SERVICE_NAME: "my-agent-service",
    "agent.id": "shipping-agent-v2",
})
provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

# ── Audit Span Helpers ─────────────────────────────────────────
# Each helper creates a span for one of the five audit event types.
# Span attributes follow the data model from Section 2.

def create_audit_span(
    event_type: str,
    trace_id: str,
    span_id: str,
    parent_span_id: str = "",
    **attrs
) -> trace.Span:
    """Create an OTel span with agent-audit-specific attributes."""
    span = tracer.start_span(
        name=f"agent.{event_type}",
        attributes={
            "audit.event_type": event_type,
            "audit.trace_id": trace_id,
            "audit.span_id": span_id,
            "audit.parent_span_id": parent_span_id,
            **attrs,
        }
    )
    return span


@contextmanager
def decision_span(trace_id: str, parent_span_id: str = ""):
    """Wrap an LLM decision point in an audit span (event_type=decision)."""
    span_id = uuid.uuid4().hex[:16]
    span = create_audit_span("decision", trace_id, span_id, parent_span_id)
    try:
        # Make the span current so spans started inside the block become its
        # children in the OTel trace; end_on_exit=False because `finally` ends it.
        with trace.use_span(span, end_on_exit=False):
            yield span
    except Exception as e:
        span.set_attribute("audit.status", "failure")
        span.set_attribute("audit.error_message", str(e))
        span.set_attribute("audit.error_type", type(e).__name__)
        raise
    finally:
        span.end()


@contextmanager
def tool_call_span(
    trace_id: str,
    parent_span_id: str,
    tool_name: str,
    tool_params: str = ""
):
    """Wrap a tool execution in an audit span (event_type=tool_call)."""
    span_id = uuid.uuid4().hex[:16]
    span = create_audit_span(
        "tool_call", trace_id, span_id, parent_span_id,
        **{"audit.tool_name": tool_name, "audit.tool_params": tool_params}
    )
    try:
        with trace.use_span(span, end_on_exit=False):
            yield span
    except Exception as e:
        span.set_attribute("audit.status", "failure")
        span.set_attribute("audit.error_message", str(e))
        span.set_attribute("audit.error_type", type(e).__name__)
        raise
    finally:
        span.end()


@contextmanager
def approval_span(
    trace_id: str,
    parent_span_id: str,
    approver: str,
    approval_decision: str
):
    """Wrap an approval event in an audit span (event_type=approval)."""
    span_id = uuid.uuid4().hex[:16]
    span = create_audit_span(
        "approval", trace_id, span_id, parent_span_id,
        **{
            "audit.approver": approver,
            "audit.approval_decision": approval_decision
        }
    )
    try:
        with trace.use_span(span, end_on_exit=False):
            yield span
    finally:
        span.end()


# ── Example Usage ──────────────────────────────────────────────
if __name__ == "__main__":
    # format_trace_id lives in opentelemetry.trace, imported above as `trace`
    trace_id = trace.format_trace_id(uuid.uuid4().int)  # OTel 32-char hex

    with decision_span(trace_id) as dec_span:
        dec_span.set_attribute("audit.tool_choice", "shipping_lookup")
        dec_span.set_attribute("audit.tool_choice_rationale",
                               "User asked about order status")
        dec_span.set_attribute("audit.status", "success")

        with tool_call_span(
            trace_id,
            dec_span.attributes.get("audit.span_id", ""),
            tool_name="shipping_lookup",
            tool_params='{"order_id": "ORD-12345"}'
        ) as tc_span:
            tc_span.set_attribute("audit.status", "success")
            tc_span.set_attribute("audit.tool_result",
                                  '{"status": "in_transit", "eta": "2 days"}')

Validating your stack. After running docker-compose up -d, verify the setup with three checks:

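  1. Jaeger UI: Open http://localhost:16686. Run the example agent code above (python agent_tracing.py), then search for service my-agent-service. You should see the agent.decision and agent.tool_call spans. The example records the tool result as an attribute on the tool_call span rather than as a separate tool_result span.
  2. ClickHouse: Open http://localhost:8123/play and run SELECT * FROM audit_logs.agent_audit_log LIMIT 10. Once your agent emits OTLP log records, you should see structured audit log rows with the same trace_id as the Jaeger trace. The example script above exports spans only, so the table stays empty until a log exporter is added.
  3. Grafana: If you started with docker-compose --profile monitoring up, open http://localhost:3000 (login: admin/admin). The Jaeger and ClickHouse data sources are provisioned from grafana-datasources.yaml, which the compose file mounts but this section does not list, so create that file first. You can then build dashboards combining trace latency from Jaeger with event-type distributions from ClickHouse.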

This stack handles up to ~50,000 events/second on a single 8-core/32GB machine with the default configuration. For higher throughput, scale ClickHouse to a cluster and add Jaeger with Kafka + Elasticsearch backend (beyond the scope of this starter docker-compose).

8. Implementation Roadmap

You've now seen the full picture: what to log (Section 2), how to trace it (Section 3), where to store it (Section 4), how to use it for incident analysis (Section 5) and evaluation (Section 6), and what infrastructure to deploy (Section 7). The remaining question is: how do you get started, and in what order should you build each capability?

This section presents a four-phase implementation roadmap, designed so that each phase delivers independently useful value. You don't need to wait until Phase 4 to realize benefits — Phase 1 alone can cut incident investigation time from hours to minutes. Each phase builds on the previous one, and each has a clear ROI estimation so you can prioritize based on your team's constraints.

8.1 Phase 1: Minimal Viable Audit (1–2 Weeks)

Goal: Turn the agent's decision chain from a black box into searchable, structured data. At this stage, you don't need dashboards, alerting, or replay — you need the ability to answer "what happened?" when something goes wrong.

What to build:

What you can do at this stage:

# Search for all tool_call events in the last 24 hours
$ grep '"event_type":"tool_call"' /var/log/agent/audit-2026-05-22.jsonl | jq '.'

# Find all events for a specific trace_id (OTel 32-char hex, no hyphens).
# -h suppresses the "filename:" prefix grep adds when searching several files,
# which would otherwise break jq's JSON parsing.
$ grep -h '0199c2d58a7f70009b3e1f2d3c4b5a67' /var/log/agent/audit-*.jsonl | jq '.'

# Extract the decision chain for a trace_id, sorted by timestamp
$ grep -h '0199c2d58a7f70009b3e1f2d3c4b5a67' /var/log/agent/audit-*.jsonl \
  | jq -s 'sort_by(.timestamp) | .[] | {event_type, tool_name, status, timestamp}'
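
The same lookup can be done in Python when jq is not available. This is a minimal sketch that assumes each JSONL line carries trace_id, event_type, and an ISO-8601 UTC timestamp in one fixed format; the function name `decision_chain` and the sample events are made up for illustration.

```python
import json
from typing import Iterable

CHAIN_FIELDS = ("event_type", "tool_name", "status", "timestamp")


def decision_chain(lines: Iterable[str], trace_id: str) -> list:
    """Return the events of one trace, oldest first, reduced to CHAIN_FIELDS."""
    events = []
    for line in lines:
        if not line.strip():
            continue  # tolerate blank lines in the JSONL file
        event = json.loads(line)
        if event.get("trace_id") == trace_id:
            events.append(event)
    # ISO-8601 UTC timestamps in one fixed format sort correctly as strings.
    events.sort(key=lambda e: e["timestamp"])
    return [{k: e.get(k) for k in CHAIN_FIELDS} for e in events]


# Sample data (values invented for this example).
TRACE = "4bf92f3577b34da6a3ce929d0e0e4736"
sample_lines = [
    json.dumps({"trace_id": TRACE, "event_type": "tool_call",
                "tool_name": "shipping_lookup", "status": "success",
                "timestamp": "2026-05-22T02:37:05.120Z"}),
    json.dumps({"trace_id": "0" * 32, "event_type": "decision",
                "timestamp": "2026-05-22T02:37:04.000Z"}),
    json.dumps({"trace_id": TRACE, "event_type": "decision",
                "status": "success",
                "timestamp": "2026-05-22T02:37:04.900Z"}),
]
chain = decision_chain(sample_lines, TRACE)
for step in chain:
    print(step["timestamp"], step["event_type"], step["tool_name"])
```

Because the function accepts any iterable of lines, an open file handle can be passed directly in place of `sample_lines`.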

Team effort: One engineer, 1–2 weeks. The instrumentation code is straightforward — the bulk of the work is integrating trace_id propagation into your existing agent loop and ensuring you don't accidentally create a new trace_id mid-conversation.
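
One way to avoid minting a second trace_id mid-request is to create it in exactly one place and have every other code path read it. The sketch below uses the standard-library `contextvars` module; `handle_request`, `run_agent_loop`, and `audit_event` are hypothetical names, and the two-step loop stands in for a real LLM and tool cycle.

```python
import contextvars
import uuid

# Holds the trace_id of the request being handled (None outside a request).
_trace_id_var = contextvars.ContextVar("audit_trace_id", default=None)


def current_trace_id() -> str:
    """Return the active trace_id; fail loudly instead of minting a new one."""
    trace_id = _trace_id_var.get()
    if trace_id is None:
        raise RuntimeError("no active trace: enter via handle_request()")
    return trace_id


def audit_event(event_type: str, **fields) -> dict:
    """Build an audit event stamped with the request's trace_id."""
    return {"trace_id": current_trace_id(), "event_type": event_type, **fields}


def run_agent_loop(user_message: str) -> list:
    """Hypothetical two-step loop; a real one would call the LLM and tools."""
    return [
        audit_event("decision", tool_choice="shipping_lookup"),
        audit_event("tool_call", tool_name="shipping_lookup"),
    ]


def handle_request(user_message: str) -> list:
    """Entry point: the only place where a trace_id is created."""
    def _run():
        _trace_id_var.set(uuid.uuid4().hex)  # 32 hex chars, OTel-shaped
        return run_agent_loop(user_message)
    # Running in a copied context keeps the id from leaking across requests.
    return contextvars.copy_context().run(_run)


events = handle_request("Where is order ORD-12345?")
print(events[0]["trace_id"] == events[1]["trace_id"])
```

Raising when no trace is active is a deliberate choice here: a silent fallback that generates a fresh id is exactly how a decision chain ends up split across two traces.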

Infrastructure cost: Near-zero. JSONL files on existing disk. At 50,000 events/day (moderate agent traffic), you'll produce ~50MB daily, or ~1.5GB per month. Rotate aggressively and this fits in any production environment.
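
The storage figures follow from simple arithmetic. The text does not state an average event size; the ~1 KB per event used below is the value implied by 50,000 events producing ~50MB, so treat it as an assumption and substitute your own measurement.

```python
EVENTS_PER_DAY = 50_000
AVG_EVENT_BYTES = 1_000  # assumption: ~1 KB per JSONL line, implied by the text
DAYS_PER_MONTH = 30

daily_mb = EVENTS_PER_DAY * AVG_EVENT_BYTES / 1_000_000
monthly_gb = daily_mb * DAYS_PER_MONTH / 1_000

print(f"{daily_mb:.0f} MB/day, {monthly_gb:.1f} GB/month")
```

If your events carry long prompt summaries or tool results, measure a day of real output before settling on a rotation schedule.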

8.2 Phase 2: Visualization & Search (2–4 Weeks)

Goal: Replace grep-and-jq with a proper search UI and dashboards. Phase 1's file-based approach works for single-incident debugging, but it breaks down when you need to answer aggregate questions: "how many approvals were bypassed this week?" or "which agent has the highest tool-call failure rate?"

What to build:

What you can do at this stage:

# Kibana/Elasticsearch DSL: find all DELETE tool_calls without approval
GET audit-logs-*/_search
{
  "query": {
    "bool": {
      "must": [
        { "term": { "event_type": "tool_call" } },
        { "term": { "tool_name": "delete_records" } }
      ],
      "must_not": [
        { "term": { "has_approval": true } }
      ]
    }
  }
}
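
The query above filters on a has_approval field, which has to be computed before indexing because approval lives in a separate event. One possible enrichment step is sketched below, assuming a tool call counts as approved when the same trace contains an earlier approval event whose approval_decision is "approve". The function name `annotate_has_approval` and that matching rule are assumptions, not part of the data model.

```python
from collections import defaultdict


def annotate_has_approval(events: list) -> list:
    """Copy events, adding has_approval to every tool_call event."""
    approved_at = defaultdict(list)  # trace_id -> timestamps of approvals
    for e in events:
        if e["event_type"] == "approval" and e.get("approval_decision") == "approve":
            approved_at[e["trace_id"]].append(e["timestamp"])

    annotated = []
    for e in events:
        doc = dict(e)
        if e["event_type"] == "tool_call":
            # Approved only if an approval preceded the call in the same trace.
            doc["has_approval"] = any(
                t <= e["timestamp"] for t in approved_at.get(e["trace_id"], [])
            )
        annotated.append(doc)
    return annotated


# Sample events (invented): t1 approved, t2 never reviewed, t3 rejected.
events = [
    {"trace_id": "t1", "event_type": "approval", "approval_decision": "approve",
     "timestamp": "2026-05-22T10:00:01Z"},
    {"trace_id": "t1", "event_type": "tool_call", "tool_name": "delete_records",
     "timestamp": "2026-05-22T10:00:02Z"},
    {"trace_id": "t2", "event_type": "tool_call", "tool_name": "delete_records",
     "timestamp": "2026-05-22T10:05:00Z"},
    {"trace_id": "t3", "event_type": "approval", "approval_decision": "reject",
     "timestamp": "2026-05-22T10:06:00Z"},
    {"trace_id": "t3", "event_type": "tool_call", "tool_name": "delete_records",
     "timestamp": "2026-05-22T10:06:01Z"},
]
docs = annotate_has_approval(events)
unapproved = [d["trace_id"] for d in docs if d.get("has_approval") is False]
print(unapproved)
```

Computing the flag once at ingestion keeps the search query a plain term filter instead of a join across event types.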

Team effort: 1–2 engineers, 2–4 weeks. The main work is standing up the ingestion pipeline, mapping the audit log fields to the search backend's index schema, and building the dashboards. If your team already operates Elasticsearch or Grafana, this phase can be completed at the lower end of the range.

Infrastructure cost: Moderate. A single-node Elasticsearch or Loki instance for agent audit data runs comfortably on 4 vCPU / 8GB RAM. If you're already running these tools for other purposes, the incremental cost is the storage volume for audit log data.

8.3 Phase 3: Log Replay & Automated Testing (1–2 Months)

Goal: Turn audit log data into a regression test suite. Every time you upgrade the LLM model, change the system prompt, or modify tool definitions, you replay historical tool-call sequences to verify nothing breaks. This closes the loop between production observation (Phases 1–2) and pre-deployment validation.

What to build:

What you can do at this stage:

# Run replay suite against a new agent version
$ agent-replay run \
    --golden-dataset ./golden/top-200-traces.jsonl \
    --agent-version v2.3.0 \
    --pass-threshold 0.90

Results:
  Total traces replayed: 200
  Passed: 187 (93.5%)        ← above threshold
  Failed: 13 (6.5%)
  Drift report: ./replay-results/v2.3.0-drift.md

# CI check (fails if below threshold)
$ agent-replay ci-check --threshold 0.90
✓ Pass rate 93.5% ≥ threshold 90% — merge allowed

Team effort: 2 engineers, 1–2 months. The replay harness requires careful design — you're essentially building a deterministic comparison layer on top of a non-deterministic system. Start with sequence-matching only (simplest, highest signal), add parameter similarity and outcome equivalence iteratively. The golden dataset curation is an ongoing activity, not a one-time task.
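
Sequence matching, the suggested starting point, can be sketched in a few lines: a replayed trace passes when it calls the same tools in the same order as its golden counterpart. The helper names and the four sample traces are hypothetical, and the agent-replay tool in the listing above is not modeled here.

```python
def tool_sequence(trace: list) -> list:
    """Ordered tool names of the tool_call events in one trace."""
    return [e["tool_name"] for e in trace if e["event_type"] == "tool_call"]


def replay_pass_rate(golden: dict, replayed: dict) -> float:
    """Fraction of golden traces whose tool sequence was reproduced exactly."""
    passed = sum(
        1 for trace_id, golden_trace in golden.items()
        if tool_sequence(golden_trace) == tool_sequence(replayed.get(trace_id, []))
    )
    return passed / len(golden)


def calls(*tool_names: str) -> list:
    """Build a minimal trace made only of tool_call events (test helper)."""
    return [{"event_type": "tool_call", "tool_name": n} for n in tool_names]


# Invented sample: the new agent version drifts on trace t3 only.
golden = {
    "t1": calls("lookup_order", "shipping_lookup"),
    "t2": calls("lookup_order"),
    "t3": calls("lookup_order", "refund"),
    "t4": calls("shipping_lookup"),
}
replayed = {
    "t1": calls("lookup_order", "shipping_lookup"),
    "t2": calls("lookup_order"),
    "t3": calls("lookup_order", "shipping_lookup"),
    "t4": calls("shipping_lookup"),
}
rate = replay_pass_rate(golden, replayed)
PASS_THRESHOLD = 0.90
print(f"pass rate {rate:.1%}, gate {'passes' if rate >= PASS_THRESHOLD else 'fails'}")
```

Exact sequence equality is strict by design. Parameter similarity and outcome equivalence can later relax it for traces where several tool orders are acceptable.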

Infrastructure cost: Low to moderate. The golden dataset itself is small (a few hundred traces is ~10MB). The replay execution uses your existing agent runtime — you're just feeding it historical inputs and capturing outputs. CI integration uses your existing CI pipeline (GitHub Actions, GitLab CI, Jenkins).

8.4 Phase 4: Anomaly Detection & Alerting (Ongoing)

Goal: Shift from reactive ("we discovered a problem when someone complained") to proactive ("the system told us something changed"). This phase never really ends — you continuously refine detection rules and add new signals.

What to build:

What you can do at this stage:

# Example: Prometheus alert rule for approval bypass rate
- alert: AgentApprovalBypassHigh
  expr: |
    rate(agent_tool_calls_without_approval_total[5m])
    /
    rate(agent_tool_calls_total[5m])
    > 0.05
  for: 5m
  labels:
    severity: p1
  annotations:
    summary: "Agent {{ $labels.agent_id }} approval bypass rate > 5%"
    description: "{{ $value | humanizePercentage }} of tool calls in the last 5 minutes
                  were executed without approval. Trace sample: run audit-log search
                  --event-type tool_call --has-approval false --since 10m"
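
The expression in the rule above is a ratio of two counters compared against 5%. The sketch below reproduces that comparison for a single window so the threshold behavior is easy to check in a unit test. The function names are hypothetical, and the rule's for: 5m persistence condition is not modeled.

```python
BYPASS_THRESHOLD = 0.05  # same 5% limit as the alert rule


def approval_bypass_rate(tool_calls_total: int, without_approval: int) -> float:
    """Share of tool calls in a window that ran without approval."""
    if tool_calls_total == 0:
        return 0.0  # no traffic means nothing was bypassed
    return without_approval / tool_calls_total


def should_alert(tool_calls_total: int, without_approval: int) -> bool:
    """True when the bypass rate is strictly above the threshold."""
    return approval_bypass_rate(tool_calls_total, without_approval) > BYPASS_THRESHOLD


# Counter increases over one 5-minute window (invented numbers).
print(should_alert(200, 12))  # 6% of calls bypassed approval
print(should_alert(200, 10))  # exactly 5%, not strictly above the limit
```

Guarding the zero-traffic case matters in the real rule too: a window with no tool calls makes the division undefined, so no alert sample is produced for it.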

Team effort: Ongoing. 1 engineer can set up the initial rule-based detectors in 1–2 weeks. ML-based detection requires a data scientist or ML engineer for the initial model training and an ongoing commitment to model maintenance (retraining as agent behavior evolves, tuning false-positive rates).

Infrastructure cost: Moderate to high, depending on ML depth. Rule-based detectors run on your existing monitoring stack (Prometheus + Alertmanager, Datadog, Grafana). ML-based detection may require a model training pipeline, feature store, and model serving infrastructure — budget 2–4 additional vCPUs and a GPU instance if training custom models.

8.5 ROI Estimation by Phase

The table below provides a realistic estimate of the investment and return for each phase, based on a mid-size engineering team running 3–5 production agents at moderate scale (~50,000 tool calls/day). Numbers are approximate and should be calibrated to your team's specific context.

| Phase | Time | Effort | Infra Cost | Value Delivered | When It Pays Off |
| --- | --- | --- | --- | --- | --- |
| Phase 1: Minimal Viable Audit | 1–2 weeks | 1 engineer | ~$0 (existing disk) | Incident investigation: 2 hours → 3 minutes per incident. 40× time savings on every post-mortem. Compliance: you can now produce a trace for any tool execution. | First production incident |
| Phase 2: Visualization & Search | 2–4 weeks | 1–2 engineers | ~$50–150/month (Elasticsearch or Loki instance) | Aggregate queries in seconds (vs. grep across files). Live dashboards for team situational awareness. 10× faster bulk investigation. | Second incident requiring cross-agent analysis; first audit or compliance review |
| Phase 3: Log Replay & Testing | 1–2 months | 2 engineers | ~$0–50/month (CI runner + storage) | Catch prompt/model regressions before deployment, not after. Automated regression suite replaces manual ad-hoc testing. Prevents production incidents. | First model upgrade or prompt change that would have caused a silent regression |
| Phase 4: Anomaly Detection | Ongoing | 1 engineer (sustaining) | ~$100–500/month (monitoring + ML infra) | Proactive detection: you know about issues before users report them. Mean Time To Detect (MTTD) drops from hours/days to minutes. | First silent failure that rule-based detection catches before users notice |

Key insight from the roadmap. Each phase is independently valuable. Phase 1 alone — trace_id + JSONL files — is the highest-ROI investment you can make in agent observability. It costs almost nothing, takes days, and pays off on the very first production incident. Every subsequent phase amplifies that value: Phase 2 makes it accessible to the whole team, Phase 3 prevents regressions, and Phase 4 catches problems before humans do. The most common mistake is trying to skip to Phase 4 without Phase 1 — anomaly detection is useless if you can't trace the anomaly back to a specific decision chain. Build the foundation first.

For teams just starting out: schedule Phase 1 for your next sprint. It's a two-week task that will fundamentally change how you debug agent behavior. The trace_id propagation you set up in Phase 1 is forward-compatible with every subsequent phase — you won't need to re-instrument your agent code.


Frequently Asked Questions

1. Isn't adding trace_id to regular app logs enough? Why do agents need specialized audit logs?

No. trace_id in regular app logs can only trace deterministic code paths — function A called function B, you record the chain. But an agent's decision path is different: at each step, the LLM chooses which tool to call and what parameters to pass — and that choice is non-deterministic.

Even with trace_id, you still can't answer:

  • Why did the LLM pick tool_a over tool_b? — regular logs have no decision event type
  • What context did it see at that moment? — regular logs don't record prompt_summary or tool_choice rationale
  • Was there a human approval? Who approved it? — regular logs have no approval event type or approver field

Agent audit logs add value on top of trace_id by introducing three semantic layers: decision (why), approval (who approved), and tool_call (what happened). These three layers are dimensions that regular application logs were never designed to capture.

2. How detailed should audit logs be? Do you record full tool-call parameters and return values?

The principle: record enough to reconstruct the decision chain afterward, but sanitize sensitive fields and truncate large payloads.

Specifically:

  • Tool call parameters — record the full JSON, but apply field-level redaction. api_key, token, password, and similar keys should be replaced with REDACTED, preserving the parameter structure for audit purposes.
  • Tool return values — record HTTP status codes and a result summary (e.g., first 1,024 characters). Truncate longer values and append a marker that states the original length.
  • LLM decision context — record a system prompt summary and the LLM's tool_choice + rationale. Don't record the full conversation history (reference it via session_id; the full history lives in application logs).
  • Approval records — record approver ID, approval time, a summary of the approval context, and the decision (approve/reject).
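
The first two rules in the list can be sketched as a pair of helpers: one that redacts sensitive keys while keeping the parameter structure, and one that truncates long results with a length marker. The key set, the 1,024-character limit, and the marker format follow the text where it is specific; the function names and the exact marker wording are assumptions.

```python
SENSITIVE_KEYS = {"api_key", "token", "password"}
MAX_RESULT_CHARS = 1024


def redact(value):
    """Recursively replace sensitive values, preserving the structure."""
    if isinstance(value, dict):
        return {
            k: "REDACTED" if k.lower() in SENSITIVE_KEYS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value


def truncate(text: str, limit: int = MAX_RESULT_CHARS) -> str:
    """Keep the first `limit` characters and note the original length."""
    if len(text) <= limit:
        return text
    return f"{text[:limit]}...[truncated, original_length={len(text)}]"


# Sample tool parameters and result (invented values).
params = {"order_id": "ORD-12345", "auth": {"api_key": "sk-live-123"}}
safe_params = redact(params)
summary = truncate("x" * 2000)
print(safe_params)
print(len(summary))
```

`redact` returns a new structure and leaves the original untouched, so the tool still receives the real credentials while only the sanitized copy reaches the log pipeline.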

Do not record full conversation histories or raw user input in audit logs. Not only does this bloat audit log volume, it can introduce PII compliance risk. If application logs store the full user input, they too need redaction and retention policies — PII risk doesn't disappear just because it's in app logs. Audit logs should stay lean and high-signal, referencing full context in application logs via session_id.

3. How do you keep the audit log itself from becoming a security risk?

Audit logs contain detailed tool-call parameters and return values. If stored carelessly, they're a high-value target. Three core protections:

1. Sanitize on write. Apply field-level redaction rules before events enter the log pipeline. Do this at the application layer — don't rely on storage-layer post-processing, since an attacker could read data in transit before it reaches storage.

2. Encrypt at rest + physical isolation. Use storage-layer at-rest encryption, transport encryption, strict index/table-level access control, and optional field-level encryption for especially sensitive values. Encryption does not replace redaction — sanitize first, then encrypt. Audit logs should be physically separated from regular application logs. Regular developers may need access to app logs for debugging, but they shouldn't have access to full tool-call records in audit logs.

3. Access control. Audit log read permissions should be independent of app log access, restricted to security auditors, compliance teams, and on-call engineers. See Agent Runtime Isolation in this series for the relationship between log storage security and isolation environments — the same isolation principles apply to log infrastructure.

4. What's the boundary between decision and tool_call event types?

In short: decision answers why, tool_call answers what.

In a single agent reasoning loop, the sequence and responsibilities are:

  1. LLM inference → outputs tool_choice + rationale
  2. Record decision event: which tool the LLM chose, why (rationale), the current prompt context summary, and the LLM's proposed parameters
  3. (Optional) Human approval: if the tool requires approval, insert an approval event here
  4. Agent framework executes the tool
  5. Record tool_call event: the actual tool name called, the executed parameters, the return value, duration, and success/failure status

A common point of confusion is how the parameters in a decision event differ from those in a tool_call event. The decision event records the LLM's proposed parameters (which could be modified during approval), while the tool_call event records the parameters that were actually executed. In most simple implementations the two are identical, but once you introduce approval flows or parameter-rewriting logic, the distinction becomes critical.
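
A small comparison makes the proposed-versus-executed distinction concrete. In this sketch the decision event stores the LLM's proposal under a hypothetical proposed_params key, the tool_call event stores what actually ran under tool_params, and `params_diff` reports every key where the two disagree. The scenario (an approver adding a row limit to a delete) is invented.

```python
def params_diff(decision_event: dict, tool_call_event: dict) -> dict:
    """Map each differing key to a (proposed, executed) pair."""
    proposed = decision_event["proposed_params"]
    executed = tool_call_event["tool_params"]
    keys = sorted(set(proposed) | set(executed))
    return {
        k: (proposed.get(k), executed.get(k))
        for k in keys
        if proposed.get(k) != executed.get(k)
    }


# The LLM proposes a delete; the approver adds a safety limit before it runs.
decision = {
    "event_type": "decision",
    "tool_choice": "delete_records",
    "proposed_params": {"table": "temp_data", "older_than_days": 7},
}
tool_call = {
    "event_type": "tool_call",
    "tool_name": "delete_records",
    "tool_params": {"table": "temp_data", "older_than_days": 7, "limit": 1000},
}
diff = params_diff(decision, tool_call)
print(diff)
```

An empty diff is the common case; a non-empty one is worth surfacing in incident review because it shows where a human or a rewriting rule changed what the LLM asked for.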

5. How do agent audit logs relate to OpenTelemetry Spans?

OpenTelemetry Spans provide the distributed tracing infrastructure — trace_id and span_id generation, propagation, and visualization. Agent audit logs can be built on top of OTel Spans but need semantic extensions:

  • What OTel Spans cover: operation name (maps to event_type), start/end time (maps to duration_ms), attribute key-value pairs (maps to metadata)
  • What OTel Spans lack: LLM decision rationale, approval chains (approver + approval_context), field-level sanitization of tool parameters, and complete tool-call parameters and return values (OTel backends and exporters often impose attribute/event size limits)

Model choice — Span Events vs Child Spans: Use Span Events (add_event()) for decision and approval — low cardinality, no independent span_id. For long-running tool calls (e.g., > 5s external API), create independent Child Spans for clearer call-tree visibility. The recommended approach in practice: write audit events as OTel Span Events (for trace visualization), while also persisting a separate, complete, structured audit log in dedicated storage (for compliance search and replay). This way you reuse OTel's trace infrastructure without losing the audit semantics — the Span carries IDs and summaries, while critical information lives in the dedicated audit log backend.


Next Steps

⬅️ Previous

Agent Runtime Isolation: Docker, Firecracker, VM Sandbox — How to Choose

The isolation spectrum and decision framework — choose the runtime boundary for agent code execution by risk level.

➡️ Next · Coming Soon

MCP Protocol Production Guide: Secure Deployment of the Model Context Protocol

Security practices at the tool protocol layer — MCP isolation, authentication, and transport security in production.

📚 Related Reading