Agent Message Schema Design: Making Multi-Agent Workflows Verifiable and Traceable

30-Second Takeaway

  • Core Problem: In multi-agent systems, field name mismatches cause silent failures, broken traceability chains across pipelines, and downstream crashes when schemas evolve — because there is no explicit message contract between agents.
  • Solution: A four-layer message schema model — Data Layer carries the business payload, Metadata Layer handles routing and identity, Verification Layer provides integrity proofs, and Routing Layer controls delivery semantics — each layer evolves independently.
  • Key Output: Complete JSON Schema definitions for five core message types (TaskHandoff, ToolResult, ApprovalRequest, StatusUpdate, ErrorReport) plus a discriminated union envelope — ready to drop into your project.
  • What You'll Walk Away With: The ability to design a versioned, verifiable, evolvable, framework-agnostic message schema for your multi-agent system — eliminating traceability breaks and version incompatibility at the architectural level.

1. Why Multi-Agent Workflows Need Explicit Message Schemas — Beyond "Just Use JSON"

Here's a production incident that actually happened. A team built a three-agent CI pipeline: Code Analyzer Agent scans pull requests for issues → Test Runner Agent executes the test suite → Report Generator Agent posts results to Slack. The pipeline ran smoothly for two weeks — until a critical PR with a security fix was merged without any tests running.

It took two hours to find the root cause:

# Agent A (Code Analyzer) produces this output:
msg = {
    "task_id": "pr-2024-0812",
    "result": "pass",                   # ← field name: result
    "analysis": {"issues_found": 0, "severity": "none"}
}

# Agent B (Test Runner) parses it like this:
def handle_analysis_result(msg):
    status = msg.get("output")          # ← expects field: output
    if status == "pass":                # status is None — always falsy
        run_full_test_suite()
    # No else branch. No error. Silent pass.

Agent A used a result field to communicate the analysis outcome. Agent B expected output. Because there was no schema definition anywhere in the message contract, the two agents formed what we call an implicit message contract — it existed only in the developers' heads, not in any verifiable structure. When the contract was violated, the system didn't error, didn't alert, didn't roll back — it just silently did the wrong thing.

This isn't just about field name mismatches. In multi-agent pipelines, implicit message contracts fail systematically in three ways:

① Silent Failure Propagation: Agent B parses Agent A's output. Field names don't match, types are wrong, or the nesting structure shifted — but the parsing code has no validation (or validation that's too weak to catch it). Bad data propagates all the way to the end of the pipeline. You see the final output is wrong, but you cannot trace back to which agent introduced the corruption. A 5-agent CI pipeline fails — you spend hours bisecting to find the single agent that corrupted the message.

② Traceability Collapse: Five agents collaborate on a task. Agent 3 produces a malformed message. Because each agent is independently deployed and independently versioned, you have no way to answer: which agent, running which version, produced this message? Without sender_agent_id, agent_version, and schema_version embedded in the message itself, the traceability chain breaks at the first hop. You're left reconstructing timelines from scattered logs across five different services.

③ Version Coupling: You add three new fields to Agent A's output and deploy. Agent B is still running the old version. When it encounters fields it doesn't recognize, it throws an unhandled exception. The entire pipeline halts. You realize there was never a compatibility agreement between Agent A and Agent B — adding a field = crashing downstream.

All three failures share a common root cause: agent-to-agent message formats are treated as a "just pass JSON" implementation detail rather than an architectural contract that requires explicit design.

An explicit message schema provides three foundational properties:

① Contract (Enforcement): A schema is a formal contract between Agent A and Agent B — "I promise to output these fields with these types; you can depend on their existence." Schema contracts are machine-verifiable: CI checks can validate them, message validators can enforce them at runtime, and documentation can be auto-generated from them. The contract moves from the developer's head into executable code.

② Traceability (Provenance): By embedding sender_agent_id, agent_version, schema_version, and content_hash directly in each message, any message can be independently interrogated: "Which agent, running which version, produced this? Is the content identical to what was originally sent?" You don't need to correlate logs across five services — the message carries its own provenance.

③ Evolvability (Independent Change): With an explicit schema_version field and documented compatibility rules, Agent A can upgrade its schema while Agent B decides how to interpret it: "This message uses schema v1.2.0. I know v1.0.0 fields. New optional fields? I ignore them. My contract still holds."

Consider the parallel: in the microservices world, REST APIs got OpenAPI/Swagger. gRPC got protobuf service definitions. Developers would never connect two services without an API contract. But in agent systems, the overwhelming majority of developers pass bare {"key": "value"} dicts between agents — no schema, no validation, no versioning. You wouldn't wire up two production microservices with undocumented JSON payloads. Why do it for agents?

📌 Core Insight: A multi-agent system is not "a few functions calling each other inside one process." It's a distributed messaging system composed of independently deployed, independently evolving intelligent agents. Without schemas, there is no reliability. This aligns with the golden rule of agent-to-agent communication: agents communicate via structured data (JSON), not natural language. But structured data alone isn't enough — it needs a schema to define the structure itself.

2. The Four-Layer Schema Design Model: Data, Metadata, Verification, Routing

When you decide "I need to design a schema for agent messages," the first question is: what fields should the message body actually contain?

Developers tend to land at one of two extremes. Either they use a single content field and serialize everything into a string — dangerously under-designed. Or they dump every field they can think of into a flat object — trace_id, span_id, parent_span_id, timestamp, created_at, updated_at, version, api_version — until a basic task handoff message has 40 fields. Dangerously over-designed.

Neither approach works. Agent message schemas are not flat tables — their design should be layered, with each layer addressing a different dimension of concern. After reverse-engineering the message formats of existing multi-agent frameworks (Google A2A, OpenAI Agents SDK, CrewAI, AutoGen), we've distilled a four-layer model:

LayerPurposeKey FieldsWhen Required
Data LayerCarries the actual business payload — "what to do, with what parameters, producing what result"content, parameters, results, artifactsEvery message
Metadata LayerIdentifies the message — "who sent it, to whom, when, belonging to which task"task_id, sender_agent_id, receiver_agent_id, timestamp, correlation_id≥ 2 agents collaborating
Verification LayerProvides integrity proof — "has this message been tampered with? Does it match what the sender produced?"content_hash, schema_version, agent_signatureCross-process / cross-host communication in production
Routing LayerControls delivery semantics — "how many retries? What's the timeout? Where do replies go?"priority, ttl, max_retries, idempotency_key, reply_toAsync message queues / event-driven architectures

Let's ground this in a concrete scenario: a code review pipeline with three agents — Research Agent → Writer Agent → Reviewer Agent. Each step passes structured messages to the next:

Layer 1 — Data Layer: The Business Payload

This is the "what" of the message — what business content is being conveyed. The Data Layer exists in every message, regardless of deployment architecture.

{
  "data": {
    "content": {
      "action": "review_pr",
      "repository": "org/backend-service",
      "pr_number": 1842,
      "source_branch": "feat/add-rate-limiting",
      "target_branch": "main",
      "changed_files": ["middleware/rate_limiter.go", "config/rate_limits.yaml"],
      "author": "alice-dev"
    },
    "parameters": {
      "review_depth": "full",
      "focus_areas": ["security", "performance"],
      "max_files_to_review": 50
    },
    "results": null,
    "artifacts": []
  }
}

The Data Layer describes only business semantics — no infrastructure concerns leak in. The design follows a straightforward principle: content holds the core payload of this interaction, parameters carries constraints, results holds execution output (null when the message initiates an action), and artifacts holds references to large binary objects (ObjRefs, not the blobs themselves).

Layer 2 — Metadata Layer: Identity and Routing Context

This is the "who, when, where" of the message. As soon as your system has ≥ 2 agents, the Metadata Layer is non-negotiable. Without it, you cannot associate messages with tasks in any logging or tracing system.

{
  "metadata": {
    "message_id": "msg-8f3a1b2c",
    "task_id": "task-2024-1205-001",
    "correlation_id": "corr-9b2c3d4e",
    "sender_agent_id": "research-agent",
    "sender_agent_version": "2.1.0",
    "receiver_agent_id": "writer-agent",
    "timestamp": "2024-12-05T14:23:11.482Z",
    "ttl_seconds": 300
  }
}

Critical design decision: message_id and task_id operate at two different levels. message_id identifies this specific message (unique, used for deduplication). task_id identifies the task's full lifecycle (one task may produce N messages). correlation_id is an optional field for linking multiple tasks into a workflow — e.g., a "review PR" task belongs to a "CI pipeline run" that spans multiple independent tasks.

Layer 3 — Verification Layer: Content Integrity Proof

This is the "proof" layer. When agents cross process boundaries — especially over message queues or HTTP between hosts — each message must carry integrity proof.

{
  "verification": {
    "schema_version": "1.2.0",
    "content_hash": "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
    "content_hash_algorithm": "sha256",
    "agent_signature": null
  }
}

content_hash is the SHA-256 hash of the Data Layer. A downstream agent can recompute the hash of the received data and compare it to this value — confirming that "what the upstream agent sent has not been modified in transit." schema_version tells the downstream agent which schema version this message uses, enabling correct parsing. agent_signature is an optional digital signature for cross-trust-domain authentication (strongly recommended in production).

Layer 4 — Routing Layer: Delivery Semantics

This is the "how to deliver" layer. When messages are not synchronous calls but flow through asynchronous message queues, you need delivery control.

{
  "routing": {
    "priority": "high",
    "max_retries": 3,
    "retry_backoff": "exponential",
    "idempotency_key": "idem-msg-8f3a1b2c",
    "reply_to": "queue:research-agent-responses",
    "dead_letter_queue": "queue:dlq-reviews"
  }
}

idempotency_key is the most important field in the Routing Layer. Message queues can deliver the same message more than once due to network jitter. The receiver uses the key to deduplicate — ensuring "this message is processed at most once." reply_to enables request-response patterns: the sender specifies which queue the reply should be delivered to.

Combined: All Four Layers in One Message

Here's a complete task handoff message with all four layers active:

{
  "data": {
    "content": {
      "action": "draft_code_review",
      "findings": [
        {"file": "rate_limiter.go:42", "issue": "unbounded goroutine spawn", "severity": "high"},
        {"file": "rate_limits.yaml", "issue": "missing rate limit for admin endpoint", "severity": "medium"}
      ]
    },
    "parameters": {"target_length": "concise", "format": "markdown"},
    "results": null,
    "artifacts": []
  },
  "metadata": {
    "message_id": "msg-8f3a1b2c",
    "task_id": "task-review-pr-1842",
    "correlation_id": "corr-ci-pipeline-2024-1205",
    "sender_agent_id": "research-agent",
    "sender_agent_version": "2.1.0",
    "receiver_agent_id": "writer-agent",
    "timestamp": "2024-12-05T14:23:11.482Z",
    "ttl_seconds": 600
  },
  "verification": {
    "schema_version": "2.1.0",
    "content_hash": "sha256:a7f3c9e1...",
    "content_hash_algorithm": "sha256",
    "agent_signature": null
  },
  "routing": {
    "priority": "high",
    "max_retries": 2,
    "retry_backoff": "exponential",
    "idempotency_key": "idem-msg-8f3a1b2c",
    "reply_to": "queue:writer-agent-responses",
    "dead_letter_queue": "queue:dlq-reviews"
  }
}

When to use how many layers?

Not every message needs all four layers. The number of layers depends on your deployment architecture:

The core design principle: upper layers do not depend on lower layers; lower layers provide services upward. You can change the retry strategy in the Routing Layer without touching the Data Layer or Metadata Layer at all. This decoupling lets each layer evolve independently.

📌 Relationship to Context Protocol: This four-layer message schema model is complementary to the agent context protocol's four-layer data architecture (Message Bus → Tool Context → Memory Context → Task Context). The context protocol addresses how state flows between components inside an agent. The message schema addresses how messages flow between agents. Both use a four-layer design philosophy, but their scopes differ: internal component communication vs. inter-agent communication.

3. Core Message Types: Task Handoff, Tool Result, Approval, Status, Error

With the four-layer model as your design framework, the next step is defining concrete message types. A common mistake in multi-agent systems: all messages share a single {"type": "message", "data": {...}} envelope — the type field is so generic it can't distinguish a task handoff from a tool result, can't drive type-safe routing, and forces every downstream agent to use if "some_field" in msg guesswork to figure out what it received.

A well-designed agent messaging system needs an explicit message type taxonomy — each type with its own independent schema. After analyzing cross-framework interaction patterns, agent-to-agent communication boils down to five core message types:

Message TypeScenarioDirectionmessage_type Value
TaskHandoffAgent A delegates a task to Agent BOne-way (A → B)task_handoff
ToolResultAgent A receives tool execution outputResponse (Tool → A)tool_result
ApprovalRequestAgent A requests human or supervisor agent approvalRequest (A → Human/B)approval_request
StatusUpdateAgent A reports task progress to orchestratorPush (A → Orchestrator)status_update
ErrorReportAgent A reports its own or downstream errorsBroadcast (A → Monitor/Orchestrator)error_report

Below are the complete JSON Schema definitions for each type. Each schema includes required fields, optional fields, usage context, and anti-patterns — you can adopt them directly or trim to fit your system.

3.1 TaskHandoff — Task Delegation

Definition: Agent A formally hands a task to Agent B, transmitting the task specification and inherited context. This is the foundational message type in any multi-agent system.

Required fields: task_id | from_agent | to_agent | task_spec

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "TaskHandoff",
  "type": "object",
  "required": ["task_id", "from_agent", "to_agent", "task_spec", "schema_version"],
  "properties": {
    "message_type": {
      "const": "task_handoff"
    },
    "task_id": {
      "type": "string",
      "description": "Globally unique task identifier, persists across the full task lifecycle"
    },
    "from_agent": {
      "type": "string",
      "description": "Sender agent identifier"
    },
    "to_agent": {
      "type": "string",
      "description": "Receiver agent identifier"
    },
    "task_spec": {
      "type": "object",
      "required": ["action", "input"],
      "properties": {
        "action": {
          "type": "string",
          "description": "Operation to perform, e.g. 'analyze_pr', 'draft_response'"
        },
        "input": {
          "type": "object",
          "description": "Input parameters required by the operation"
        },
        "context": {
          "type": "object",
          "description": "Context inherited from upstream tasks (optional)"
        },
        "constraints": {
          "type": "object",
          "properties": {
            "max_duration_seconds": {"type": "integer"},
            "required_confidence": {"type": "number", "minimum": 0, "maximum": 1}
          }
        }
      }
    },
    "priority": {
      "type": "string",
      "enum": ["low", "normal", "high", "critical"],
      "default": "normal"
    },
    "schema_version": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    }
  }
}

When to use: The Research Agent completes its source analysis and hands findings to the Writer Agent for draft generation — carrying action: "draft_review" plus the structured analysis results.

Common anti-pattern: Hardcoding from_agent and to_agent as string literals. Use identifiers from an agent registry instead — this keeps agent identity decoupled from routing logic.

3.2 ToolResult — Tool Execution Output

Definition: The result returned after an agent invokes an external tool (API, database, filesystem). This is the "return leg" of the agent → tool → agent loop.

Required fields: tool_call_id | tool_name | output

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "ToolResult",
  "type": "object",
  "required": ["tool_call_id", "tool_name", "output", "schema_version"],
  "properties": {
    "message_type": {
      "const": "tool_result"
    },
    "tool_call_id": {
      "type": "string",
      "description": "Corresponding tool call ID — used to match request with response"
    },
    "tool_name": {
      "type": "string",
      "description": "Name of the invoked tool, e.g. 'git_diff', 'run_linter'"
    },
    "output": {
      "oneOf": [
        {"type": "object"},
        {"type": "array"},
        {"type": "string"},
        {"type": "number"},
        {"type": "boolean"},
        {"type": "null"}
      ],
      "description": "Tool execution result — can be any JSON type"
    },
    "error": {
      "type": "object",
      "description": "Error information when tool execution fails (optional, required when present)",
      "properties": {
        "code": {"type": "string"},
        "message": {"type": "string"},
        "details": {"type": "object"}
      }
    },
    "duration_ms": {
      "type": "integer",
      "description": "Tool execution duration in milliseconds"
    },
    "is_truncated": {
      "type": "boolean",
      "default": false,
      "description": "Whether the output was truncated due to size limits"
    },
    "schema_version": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    }
  }
}

When to use: The Research Agent calls git_diff to retrieve code changes for a PR, and the tool returns the diff output. The result wraps around the raw output with metadata about duration and truncation.

Common anti-pattern: Stuffing the entire raw API response (100KB+) into the output field verbatim. This doesn't just blow up the LLM context window — it can leak secrets (tokens, keys) from HTTP headers or internal metadata. Always trim and sanitize tool output at the context boundary before placing it into output.

3.3 ApprovalRequest — Human-in-the-Loop Gate

Definition: Before executing a high-risk operation (deleting data, modifying production config, issuing a large refund), the agent requests approval from a human or supervisor agent.

Required fields: request_id | action | resource | reason

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "ApprovalRequest",
  "type": "object",
  "required": ["request_id", "action", "resource", "reason", "schema_version"],
  "properties": {
    "message_type": {
      "const": "approval_request"
    },
    "request_id": {
      "type": "string",
      "description": "Unique approval request identifier"
    },
    "action": {
      "type": "string",
      "description": "Requested operation, e.g. 'delete_repository', 'merge_to_main', 'deploy_hotfix'"
    },
    "resource": {
      "type": "object",
      "description": "Target resource affected by this operation",
      "properties": {
        "type": {"type": "string"},
        "id": {"type": "string"},
        "summary": {"type": "string"}
      }
    },
    "reason": {
      "type": "string",
      "description": "Agent's reasoning summary for this operation — the approver needs to understand why the agent made this decision"
    },
    "context": {
      "type": "object",
      "description": "Supplementary context to help the approver decide (optional)"
    },
    "timeout_seconds": {
      "type": "integer",
      "default": 3600,
      "description": "Approval timeout — auto-reject after this duration"
    },
    "risk_level": {
      "type": "string",
      "enum": ["low", "medium", "high"],
      "description": "Risk assessment for this operation"
    },
    "schema_version": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    }
  }
}

When to use: The Reviewer Agent detects a proposed change that would delete a production database index. Before proceeding, it sends an ApprovalRequest — action: "drop_index" + resource: {type: "database_index", id: "idx_orders_created_at"} — with the reasoning behind the recommendation and a risk level of "high."

Common anti-pattern: Sending an approval request with only the operation and resource — no reason or context. The human approver then has to spend time in another system piecing together why the agent wants to do this. You've shifted the reasoning burden from the agent to the human.

3.4 StatusUpdate — Progress Reporting

Definition: An agent proactively reports its state and progress to the orchestrator or monitoring system during execution.

Required fields: task_id | new_status | timestamp

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "StatusUpdate",
  "type": "object",
  "required": ["task_id", "new_status", "timestamp", "schema_version"],
  "properties": {
    "message_type": {
      "const": "status_update"
    },
    "task_id": {
      "type": "string",
      "description": "Associated task ID"
    },
    "new_status": {
      "type": "string",
      "enum": ["pending", "running", "waiting_for_approval", "waiting_for_tool",
               "completed", "failed", "cancelled", "timed_out"],
      "description": "New task status"
    },
    "previous_status": {
      "type": "string",
      "description": "Previous task status (optional, for state machine validation)"
    },
    "progress_pct": {
      "type": "integer",
      "minimum": 0,
      "maximum": 100,
      "description": "Task completion percentage"
    },
    "message": {
      "type": "string",
      "description": "Human-readable status description"
    },
    "timestamp": {
      "type": "string",
      "format": "date-time"
    },
    "next_expected_status": {
      "type": "string",
      "description": "Expected next status (optional, for predictive monitoring)"
    },
    "schema_version": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    }
  }
}

When to use: The CI orchestrator tracks three sub-agents' progress. Each sub-agent sends a StatusUpdate after completing key phases — the orchestrator aggregates them into a dashboard: "PR #1842: Research ✅ → Drafting ⏳ 85% → Review pending."

Common anti-pattern: Making new_status a free-form string (e.g., "almost there", "working on it..."). Unconstrained status values prevent any downstream automation. The correct approach: use enumerated values for machine processing, with an optional message field carrying human-readable detail.

3.5 ErrorReport — Failure Escalation

Definition: When an agent encounters an error it cannot self-recover from, it sends an error report to the monitoring system or orchestrator.

Required fields: error_code | error_message | source_agent | timestamp

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "ErrorReport",
  "type": "object",
  "required": ["error_code", "error_message", "source_agent", "timestamp", "schema_version"],
  "properties": {
    "message_type": {
      "const": "error_report"
    },
    "error_code": {
      "type": "string",
      "description": "Machine-readable error code, e.g. 'TOOL_TIMEOUT', 'SCHEMA_VALIDATION_FAILED'",
      "examples": ["TOOL_TIMEOUT", "UPSTREAM_UNREACHABLE", "SCHEMA_VALIDATION_FAILED",
                   "LLM_RATE_LIMITED", "INVALID_TASK_SPEC"]
    },
    "error_message": {
      "type": "string",
      "description": "Human-readable error description"
    },
    "severity": {
      "type": "string",
      "enum": ["warning", "error", "critical"],
      "default": "error"
    },
    "source_agent": {
      "type": "string",
      "description": "Agent that produced this error"
    },
    "source_task_id": {
      "type": "string",
      "description": "Task being executed when the error occurred"
    },
    "stack_trace": {
      "type": "string",
      "description": "Technical stack trace (optional, only enabled in debug mode)"
    },
    "recovery_hint": {
      "type": "string",
      "description": "Agent's suggested recovery strategy (optional but strongly recommended)",
      "examples": ["Retry with exponential backoff", "Escalate to human operator",
                   "Skip this item and continue with next batch"]
    },
    "timestamp": {
      "type": "string",
      "format": "date-time"
    },
    "schema_version": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    }
  }
}

When to use: The Writer Agent calls the LLM API and gets rate-limited after three retries. It sends an ErrorReport — error_code: "LLM_RATE_LIMITED" + recovery_hint: "retry_with_exponential_backoff_and_jitter". The orchestrator receives this and decides whether to retry or fail the task.

Common anti-pattern: Error reports with only an error_message string — "something went wrong." Without an error_code, downstream systems can't automate routing (which errors are retryable? which require escalation? which can be safely ignored?). Without a recovery_hint, every error requires human triage.

3.6 Discriminated Union: Unified Message Envelope

With five message types defined, you need a unified envelope to wrap them — enabling the message router to dispatch based on message_type automatically:

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "AgentMessage",
  "description": "Unified message envelope — discriminated union via message_type",
  "type": "object",
  "required": ["message_id", "message_type", "payload", "schema_version"],
  "properties": {
    "message_id": {
      "type": "string",
      "description": "Globally unique message ID"
    },
    "message_type": {
      "type": "string",
      "enum": ["task_handoff", "tool_result", "approval_request",
               "status_update", "error_report"],
      "description": "Message type discriminator — the router uses this to dispatch"
    },
    "payload": {
      "description": "Concrete message payload — type determined by message_type",
      "oneOf": [
        {"$ref": "#/$defs/TaskHandoffPayload"},
        {"$ref": "#/$defs/ToolResultPayload"},
        {"$ref": "#/$defs/ApprovalRequestPayload"},
        {"$ref": "#/$defs/StatusUpdatePayload"},
        {"$ref": "#/$defs/ErrorReportPayload"}
      ]
    },
    "metadata": {
      "type": "object",
      "description": "Metadata layer fields — task_id, correlation_id, etc."
    },
    "schema_version": {
      "type": "string",
      "pattern": "^\\d+\\.\\d+\\.\\d+$"
    }
  }
}

The router's consumption logic is straightforward — no isinstance(payload, TaskHandoff) type checks. Dispatch by message_type string:

# Message router — dispatching by message_type
ROUTING_TABLE = {
    "task_handoff": handle_task_handoff,
    "tool_result": handle_tool_result,
    "approval_request": handle_approval_request,
    "status_update": handle_status_update,
    "error_report": handle_error_report,
}

def dispatch(message: dict) -> None:
    msg_type = message["message_type"]
    handler = ROUTING_TABLE.get(msg_type)
    if handler is None:
        raise ValueError(f"Unknown message type: {msg_type}")
    handler(message["payload"])
📌 Not every system needs all five types: If you have two agents doing simple tool calls, you may only need TaskHandoff and ToolResult. As system complexity grows, progressively introduce ApprovalRequest (for human-in-the-loop gates), StatusUpdate (for progress visibility), and ErrorReport (for structured failure escalation). The schema should adapt to your system's complexity — not the other way around.

4. Schema Versioning and Forward Compatibility — Evolving Messages Without Breaking Pipelines

In the real world, schemas will change. You'll add fields (task_spec gains a priority field). You'll rename fields (receiver_agent_id becomes target_agent_id because it reads better). You'll restructure nested objects (a flat content blob gets split into content + parameters + context).

No versioning strategy = every change is a breaking change. Agent A deploys v2.0 — its output messages now contain three new fields. Agent B is still running old code. When it encounters a field it doesn't recognize, it throws an unhandled exception. The entire pipeline halts.

4.1 Three Versioning Strategies

There are three mainstream strategies for agent message schema versioning, each with its own tradeoffs:

StrategyApproachProsConsBest For
Additive Only add new optional fields. Never rename or delete. Mark deprecated fields with @deprecated. 100% forward-compatible. Old agents are completely unaffected. Schema bloats over time. Deprecated field accumulation. Internal systems, ≤ 5 agents
Migratory Assign a new MAJOR version to the new schema. Maintain parsers for both versions simultaneously. Old version has an explicit EOL date. Can redesign without historical baggage. Clean break possible. Must maintain two code paths during the transition. Higher cost. Public APIs, cross-team agent systems
Dual-Write Agent A outputs both v1 and v2 format messages. Agent B migrates to v2 incrementally. Message volume doubles during dual-write. Zero-downtime migration. Old and new agents consume their respective versions independently. Message traffic doubles. Storage costs increase. Large-scale systems, zero-downtime production environments

Recommended starting point: begin with Additive. The vast majority of schema changes in agent systems are just adding fields — Additive covers this completely. Only graduate to Migratory or Dual-Write when you need a fundamental redesign that can't be achieved through additive changes alone.

4.2 The schema_version Field

Every message must carry a schema_version field using semantic versioning (MAJOR.MINOR.PATCH):

{
  "message_type": "task_handoff",
  "message_id": "msg-9a2b3c4d",
  "schema_version": "2.1.3",
  "payload": {
    "task_id": "task-review-pr-1842",
    "from_agent": "research-agent",
    "to_agent": "writer-agent",
    "task_spec": {
      "action": "draft_review",
      "input": {"pr_number": 1842, "findings": [...]},
      "priority": "high"                // ← new optional field added in v2.1.0
    }
  }
}

schema_version tells the downstream agent how to parse this message. Without it, you're left guessing — you know the message is a task_handoff type, but you don't know whether its schema is v1.0 or v2.1.

4.3 Forward Compatibility Rules

Forward compatibility means: a consumer agent built against an older schema version can correctly read messages produced by a newer schema version.

Change TypeExampleVersion BumpForward Compatible?
Add optional fieldtask_spec gains priority (default "normal")MINOR++✅ Yes — old agents ignore unknown optional fields
Expand enum valuespriority enum adds "critical"MINOR++✅ Yes — old agents can treat unknown enum values as default
Add required fieldtask_spec gains required trace_idMAJOR++❌ No — old agents can't parse messages missing the required field
Rename fieldreceiver_agent_idtarget_agent_idMAJOR++❌ No — old agents look for the old field name
Change field typeduration_ms from int to stringMAJOR++❌ No — type mismatch causes parse failure
Delete fieldRemove legacy_fieldMAJOR++❌ No — any agent depending on that field breaks

The core rule is straightforward: if the change doesn't break existing agents' ability to parse and interpret already-known fields, it's a MINOR change. If it does break that ability, it's a MAJOR change.

4.4 Deprecation Lifecycle: @deprecated Annotation + Grace Period

When a field needs to be removed, don't delete it from the schema outright — that instantly breaks every agent still using it. Follow a three-phase deprecation lifecycle:

PhaseTimelineActionAgent Behavior
① Mark DeprecatedT+0Add "deprecated": true to the field in the schema. Update documentation. Introduce replacement field while keeping the old one.
Schema version: MINOR++.
All agents still use the old field. Log deprecation warnings.
② Grace PeriodT+30 daysRequire all agents to migrate to the new field. Old field remains valid but may have degraded performance (dual-write).New agents use the new field. Legacy agents continue using the old field.
③ RemoveT+60 daysDelete the old field from the schema.
Schema version: MAJOR++.
Only agents migrated to the new schema version continue functioning.

The deprecation marker in JSON Schema:

{
  "old_field_name": {
    "type": "string",
    "description": "[DEPRECATED] Use 'new_field_name' instead. Will be removed on 2026-08-01.",
    "deprecated": true,
    "deprecation_message": "Use 'new_field_name' instead — this field will be removed in v3.0.0"
  },
  "new_field_name": {
    "type": "string",
    "description": "Replacement for old_field_name"
  }
}

4.5 Schema Registry Pattern for Agent Systems

As your agent system scales, you'll need a centralized place to store, version, and serve schemas — analogous to Confluent Schema Registry for Kafka but adapted for agent messages. A lightweight schema registry for agents:

# Agent Message Schema Registry — minimal reference implementation
import json
from pathlib import Path
from typing import Optional

class AgentSchemaRegistry:
    """Central registry for agent message schemas with version history."""

    def __init__(self, schema_dir: str = "./schemas"):
        self.schema_dir = Path(schema_dir)
        self._cache: dict[str, dict] = {}

    def register(self, message_type: str, version: str, schema: dict) -> None:
        """Register a new schema version for a message type."""
        key = f"{message_type}:{version}"
        self._cache[key] = schema
        path = self.schema_dir / message_type / f"{version}.json"
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(schema, indent=2))

    def get_schema(self, message_type: str, version: Optional[str] = None) -> dict:
        """Retrieve a schema. If no version specified, returns latest."""
        if version:
            key = f"{message_type}:{version}"
            if key in self._cache:
                return self._cache[key]
            # Fallback: load from disk
            path = self.schema_dir / message_type / f"{version}.json"
            return json.loads(path.read_text())
        # Return latest version
        versions = sorted(
            (p.stem for p in (self.schema_dir / message_type).glob("*.json")),
            key=lambda v: tuple(map(int, v.split(".")))
        )
        return self.get_schema(message_type, versions[-1])

    def list_versions(self, message_type: str) -> list[str]:
        """List all registered versions for a message type."""
        path = self.schema_dir / message_type
        return sorted(p.stem for p in path.glob("*.json"))

This minimal registry gives you: versioned schema storage on disk, latest-version retrieval for new agents, and explicit version pinning for legacy agents. In production, you'd extend this with an HTTP API and caching layer.

4.6 Forward Compatibility Checker

Here's a runnable Python compatibility checker — it takes a message and a consumer's expected schema version and returns whether they're compatible:

from packaging import version
from typing import Optional

def is_forward_compatible(
    message_schema_version: str,
    consumer_expected_version: str,
    required_fields: Optional[set] = None
) -> tuple[bool, str]:
    """Check whether a message is forward-compatible with the consumer's expected schema version.

    Args:
        message_schema_version: The message's schema_version field value, e.g. "2.1.3"
        consumer_expected_version: The consumer's expected MAJOR version, e.g. "2.0.0"
        required_fields: Set of field names the consumer depends on (optional)

    Returns:
        (is_compatible, reason) — whether compatible + explanation
    """
    msg_ver = version.parse(message_schema_version)
    exp_ver = version.parse(consumer_expected_version)

    # Rule 1: MAJOR version mismatch = potentially incompatible
    if msg_ver.major != exp_ver.major:
        return False, (
            f"Major version mismatch: message v{msg_ver.major}.x "
            f"vs consumer expected v{exp_ver.major}.x"
        )

    # Rule 2: Same MAJOR, newer MINOR/PATCH → forward-compatible (additive changes)
    return True, (
        f"Compatible: message v{message_schema_version} >= "
        f"consumer expected v{consumer_expected_version} "
        f"(same major, newer minor/patch)"
    )

The core logic is two rules: same MAJOR = compatible (new optional fields don't affect old agents). Different MAJOR = incompatible (may have changed required fields or renamed fields).

In production, integrate this checker into your message router — before delivering a message to any consumer, verify the consumer's expected version against the message's actual version. Incompatible messages route to the dead letter queue instead of crashing the consumer.

📌 The Golden Rule of Versioning: Never make the consumer (downstream agent) guess the schema version. Every message explicitly carries schema_version. The downstream agent checks the version before doing any parsing. This is like a TCP handshake — confirm both sides "speak the same language" before data transmission begins. When the agent communication channel carries messages at scale, this single field prevents more production incidents than any other design decision in this article.

5. Making Messages Verifiable — Content Hashes, Agent Identity, Integrity Proofs

Here's the scenario: your three-agent pipeline has been running for weeks. One day, the Reviewer Agent starts producing nonsensical review scores — a PR with zero issues gets flagged as "high risk," while a PR touching critical auth logic sails through with a clean bill. You spend an hour debugging the Reviewer Agent's logic before realizing the problem isn't there. The Writer Agent received corrupted findings from the Research Agent, and neither agent knew it.

This is the message integrity gap: in a multi-agent pipeline, Agent C receives a broken result. Was it Agent A's fault (produced bad data)? Agent B's fault (transformed it incorrectly)? Or did something corrupt the message in transit (network glitch, queue truncation, serialization bug)? Without verification built into the message itself, you can't answer that question — you're stuck correlating external traces across three different services and guessing.

The Verification Layer (Layer 3 of the four-layer model) solves this by embedding integrity proofs directly in every message. Let's build it field by field.

5.1 content_hash — Cryptographic Fingerprint of the Data Layer

The simplest and most powerful verification mechanism: hash the data layer and include the hash in the message. The receiver recomputes the hash on the data they received and compares. If the hashes match, the data is intact. If they don't, something modified it in transit.

import hashlib
import json

def compute_content_hash(data: dict, algorithm: str = "sha256") -> str:
    """Compute deterministic content hash of the data layer.

    The key detail: canonical JSON serialization (sorted keys, no whitespace)
    ensures the same data always produces the same hash — regardless of
    which programming language or JSON library produced it.
    """
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    h = hashlib.new(algorithm)
    h.update(canonical.encode("utf-8"))
    return f"{algorithm}:{h.hexdigest()}"

Why sort_keys=True and separators=(",", ":")? Because {"a":1,"b":2} and {"b":2,"a":1} are semantically identical JSON but produce different byte strings for hashing. Canonical serialization eliminates that ambiguity — the same data always hashes to the same value regardless of key ordering or whitespace.

5.2 sender_agent_id + agent_version — Identity and Provenance

A hash tells you whether the data changed. Identity fields tell you who produced it and which version of their code was running. This is essential for debugging: if the Research Agent v2.1.0 started producing malformed findings at 14:23 UTC, you need to know both the agent and the version to correlate with deployments.

{
  "metadata": {
    "sender_agent_id": "research-agent",
    "sender_agent_version": "2.1.0",
    "sender_host": "node-7.internal",     // optional, for cross-host debugging
    "timestamp": "2024-12-05T14:23:11.482Z"
  }
}

These fields live in the Metadata Layer — they describe the message's origin, not its content. But they're critical for verification: if two agents claim to have produced the same content hash, the identity fields let you distinguish them.

5.3 signature — Cryptographic Attestation Across Trust Boundaries

content_hash proves integrity (data wasn't modified). signature proves authenticity (data came from the claimed sender, not an impersonator). Not every scenario needs signatures — internal message queues within a VPC don't — but any message crossing a trust boundary (different hosts, different teams, different organizations) should carry one.

import hmac

def sign_message(message: dict, secret_key: str) -> str:
    """HMAC-SHA256 signature over the canonical data layer.

    The signature covers only the data layer — metadata and routing
    layers may change in transit (e.g., a proxy adds a hop).
    """
    data_canonical = json.dumps(
        message["data"], sort_keys=True, separators=(",", ":")
    )
    sig = hmac.new(
        secret_key.encode("utf-8"),
        data_canonical.encode("utf-8"),
        hashlib.sha256
    )
    return f"hmac-sha256:{sig.hexdigest()}"

def verify_signature(message: dict, secret_key: str) -> bool:
    """Verify the HMAC signature on a received message."""
    expected = message["verification"]["signature"]
    actual = sign_message(message, secret_key)
    return hmac.compare_digest(expected, actual)

Use hmac.compare_digest — never == for signature comparison. Constant-time comparison prevents timing side-channel attacks that could leak bits of the expected signature.

5.4 proof_chain — Multi-Hop Integrity Trail

In a pipeline like Research Agent → Writer Agent → Reviewer Agent, each hop transforms the data. A single content_hash can only verify the final step. To verify every step, each agent appends a (agent_id, content_hash) pair to a cumulative proof chain:

def append_to_proof_chain(message: dict, agent_id: str) -> dict:
    """Append current agent's identity and the data hash to the proof chain.

    This creates an immutable audit trail: every agent that touched the
    message records its identity and the hash of the data it produced.
    """
    if "proof_chain" not in message["verification"]:
        message["verification"]["proof_chain"] = []

    message["verification"]["proof_chain"].append({
        "agent_id": agent_id,
        "content_hash": compute_content_hash(message["data"]),
        "timestamp": message["metadata"]["timestamp"]
    })
    return message

After all three agents process the message, the proof chain tells the complete story:

{
  "verification": {
    "schema_version": "2.1.0",
    "content_hash": "sha256:f9e2d4...",
    "proof_chain": [
      {
        "agent_id": "research-agent",
        "content_hash": "sha256:a7f3c9...",
        "timestamp": "2024-12-05T14:23:11.482Z"
      },
      {
        "agent_id": "writer-agent",
        "content_hash": "sha256:b8e4d1...",
        "timestamp": "2024-12-05T14:24:03.117Z"
      },
      {
        "agent_id": "reviewer-agent",
        "content_hash": "sha256:f9e2d4...",
        "timestamp": "2024-12-05T14:25:41.839Z"
      }
    ]
  }
}

Now, when the Reviewer Agent produces an anomalous score, you don't have to guess which agent introduced the corruption. You walk the proof chain: Research Agent's hash → Writer Agent's hash → Reviewer Agent's hash. If Writer Agent's hash doesn't match what Research Agent claims to have sent, you know exactly where the pipeline broke.

5.5 End-to-End Verification Function

Here's a single verify_message function that checks all three integrity properties:

def verify_message(message: dict, secret_key: str = None) -> tuple[bool, list[str]]:
    """Verify a message's integrity. Returns (is_valid, issues).

    Checks performed:
      1. content_hash matches the actual data layer
      2. proof_chain entries are internally consistent (each references the prior hash)
      3. signature is valid (only if secret_key is provided)
    """
    issues = []

    # Check 1: content_hash
    expected_hash = message["verification"]["content_hash"]
    algorithm = message["verification"].get("content_hash_algorithm", "sha256")
    actual_hash = compute_content_hash(message["data"], algorithm)
    if not hmac.compare_digest(expected_hash, actual_hash):
        issues.append(
            f"Content hash mismatch: expected {expected_hash}, got {actual_hash}"
        )

    # Check 2: proof_chain consistency
    chain = message["verification"].get("proof_chain", [])
    for i in range(1, len(chain)):
        prev_hash = chain[i-1]["content_hash"]
        # In a full implementation, you'd verify each step's hash against
        # the data that agent actually received. This simplified check
        # confirms the chain references are all present and non-empty.
        if not prev_hash:
            issues.append(f"Proof chain entry {i} missing prior hash")

    # Check 3: signature (only if key provided)
    if secret_key and message["verification"].get("signature"):
        if not verify_signature(message, secret_key):
            issues.append("Signature verification failed")

    return (len(issues) == 0, issues)

When to include which verification fields? Not every message needs all of them. Use a graduated approach:

Make verification fields optional in your base schema. Use the message type's inherent risk profile — or an explicit risk_level field — to determine which integrity checks to apply at runtime.

📌 Verification is not just about security: The primary value of content hashes and proof chains in agent systems is debuggability, not security. When a 5-agent pipeline produces a wrong result at 2 AM, the proof chain cuts diagnosis time from hours (bisecting agents by replaying inputs) to minutes (reading the chain). The security benefit — detecting tampering — is secondary but becomes critical when agents cross trust boundaries.

6. Hands-On — Complete Message Schema for a Three-Agent Code Review Pipeline

Theory is valuable. A complete, runnable schema you can adapt is more valuable. This section builds a production-style message schema for a Code Review Pipeline with three agents — a scenario familiar to every English-speaking developer who's worked with CI/CD systems.

6.1 The Scenario

Three agents collaborate to review a pull request:

  1. ResearchAgent — scans the PR diff, identifies issues, gathers context from the codebase
  2. WriterAgent — takes the research findings and drafts a human-readable review
  3. ReviewerAgent — validates the draft, checks for consistency with findings, assigns a final score

Each agent produces a typed output message and passes it to the next. Each message carries the same trace_id across the entire pipeline, and each step appends to the proof_chain.

6.2 Pydantic Models — Complete Type-Safe Definitions

Python developers overwhelmingly use Pydantic for data validation. Here are complete, copy-pasteable model definitions for all three message types:

from __future__ import annotations

import hashlib
import json
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from uuid import uuid4

from pydantic import BaseModel, Field, field_validator


# ── Shared Enums ──────────────────────────────────────────────

class Severity(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class Verdict(str, Enum):
    APPROVED = "approved"
    CHANGES_REQUESTED = "changes_requested"
    NEEDS_DISCUSSION = "needs_discussion"


# ── Shared Mixins ─────────────────────────────────────────────

class MessageMeta(BaseModel):
    """Metadata shared across all message types in this pipeline."""
    trace_id: str = Field(default_factory=lambda: uuid4().hex)
    sender_agent_id: str
    sender_agent_version: str
    timestamp: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc)
    )


class VerifiableMessage(BaseModel):
    """Adds verification fields. Inherited by all message types."""
    schema_version: str = "1.0.0"
    content_hash: Optional[str] = None
    proof_chain: list[dict] = Field(default_factory=list)

    def compute_content_hash(self) -> str:
        """Hash the data fields (subclass must define _data_fields)."""
        data = self.model_dump(include=self._data_fields(), mode="json")
        canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
        h = hashlib.sha256(canonical.encode("utf-8"))
        return f"sha256:{h.hexdigest()}"

    def seal(self, agent_id: str) -> None:
        """Finalize: compute hash, append proof chain entry."""
        self.content_hash = self.compute_content_hash()
        self.proof_chain.append({
            "agent_id": agent_id,
            "content_hash": self.content_hash,
            "timestamp": datetime.now(timezone.utc).isoformat()
        })

    @staticmethod
    def _data_fields() -> set[str]:
        raise NotImplementedError


# ── Message Type 1: ResearchOutput ────────────────────────────

class Finding(BaseModel):
    file: str
    line_range: Optional[str] = None
    issue: str
    severity: Severity
    suggestion: Optional[str] = None


class ResearchOutput(MessageMeta, VerifiableMessage):
    """Produced by ResearchAgent — structured analysis of the PR."""
    message_type: str = Field(default="research_output", frozen=True)
    pr_number: int
    repository: str
    findings: list[Finding]
    sources_analyzed: list[str]
    gaps: list[str] = Field(
        default_factory=list,
        description="Areas the agent couldn't fully analyze"
    )
    keywords: list[str] = Field(
        default_factory=list,
        description="Key themes extracted for the writer"
    )

    @staticmethod
    def _data_fields() -> set[str]:
        return {"pr_number", "repository", "findings", "sources_analyzed",
                "gaps", "keywords"}


# ── Message Type 2: DraftOutput ───────────────────────────────

class Section(BaseModel):
    heading: str
    content: str
    findings_referenced: list[int] = Field(
        default_factory=list,
        description="Indexes into the ResearchOutput.findings list"
    )


class DraftOutput(MessageMeta, VerifiableMessage):
    """Produced by WriterAgent — human-readable review draft."""
    message_type: str = Field(default="draft_output", frozen=True)
    pr_number: int
    title: str
    summary: str
    sections: list[Section]
    word_count: int
    requires_human_review: bool = False

    @staticmethod
    def _data_fields() -> set[str]:
        return {"pr_number", "title", "summary", "sections",
                "word_count", "requires_human_review"}


# ── Message Type 3: ReviewOutput ──────────────────────────────

class Issue(BaseModel):
    severity: Severity
    description: str
    location: Optional[str] = None  # e.g., "Draft section 2, paragraph 3"


class ReviewOutput(MessageMeta, VerifiableMessage):
    """Produced by ReviewerAgent — final validation and scoring."""
    message_type: str = Field(default="review_output", frozen=True)
    pr_number: int
    verdict: Verdict
    score: int = Field(ge=0, le=100)
    issues: list[Issue] = Field(default_factory=list)
    suggestions: list[str] = Field(default_factory=list)
    reviewer_notes: Optional[str] = None

    @field_validator("score")
    @classmethod
    def score_must_match_verdict(cls, v, info):
        verdict = info.data.get("verdict")
        if verdict == Verdict.APPROVED and v < 70:
            raise ValueError("Approved reviews must score >= 70")
        return v

    @staticmethod
    def _data_fields() -> set[str]:
        return {"pr_number", "verdict", "score", "issues",
                "suggestions", "reviewer_notes"}

6.3 Complete End-to-End Pipeline

Here's the pipeline orchestrator that wires everything together — trace_id propagates through all three messages, and the proof_chain accumulates at each step:

def run_code_review_pipeline(pr_number: int, repository: str) -> ReviewOutput:
    """Execute the full three-agent code review pipeline.

    trace_id is generated once and propagated through all messages.
    proof_chain accumulates as each agent seals its output.
    """
    trace_id = uuid4().hex

    # ── Step 1: Research Agent ──
    research_output = ResearchOutput(
        trace_id=trace_id,
        sender_agent_id="research-agent",
        sender_agent_version="2.1.0",
        pr_number=pr_number,
        repository=repository,
        findings=[
            Finding(
                file="middleware/rate_limiter.go",
                line_range="42-67",
                issue="Unbounded goroutine spawn under high concurrency",
                severity=Severity.HIGH,
                suggestion="Add worker pool with max concurrency limit"
            ),
            Finding(
                file="config/rate_limits.yaml",
                issue="Missing rate limit for admin endpoint",
                severity=Severity.MEDIUM,
                suggestion="Add /admin/* rate limit block"
            ),
        ],
        sources_analyzed=["git_diff", "static_analysis", "test_coverage_report"],
        gaps=["No integration test coverage for rate limiter edge cases"],
        keywords=["goroutine leak", "rate limiting", "admin security",
                  "concurrency safety"]
    )
    research_output.seal(agent_id="research-agent")
    print(f"[Research] {len(research_output.findings)} findings, "
          f"hash={research_output.content_hash[:20]}...")

    # ── Step 2: Writer Agent ──
    draft_output = DraftOutput(
        trace_id=trace_id,
        sender_agent_id="writer-agent",
        sender_agent_version="1.5.0",
        pr_number=pr_number,
        title=f"Code Review: PR #{pr_number} — {repository}",
        summary=(
            "This PR introduces rate limiting middleware with two issues: "
            "a high-severity goroutine leak risk and a medium-severity "
            "missing admin endpoint rate limit."
        ),
        sections=[
            Section(
                heading="Critical: Goroutine Spawn Safety",
                content=(
                    "The rate limiter in middleware/rate_limiter.go:42-67 "
                    "spawns a new goroutine per request under burst traffic. "
                    "At 10k concurrent requests, this creates 10k goroutines — "
                    "likely exceeding the scheduler's capacity and causing OOM. "
                    "Mitigation: add a bounded worker pool."
                ),
                findings_referenced=[0]
            ),
            Section(
                heading="Configuration Gap: Admin Endpoint",
                content=(
                    "config/rate_limits.yaml defines rate limits for all public "
                    "endpoints but omits the /admin/* namespace. The admin "
                    "endpoint is unauthenticated in the current setup, making it "
                    "a vector for resource exhaustion. Add a /admin/* block with "
                    "stricter limits."
                ),
                findings_referenced=[1]
            ),
        ],
        word_count=378,
        requires_human_review=True
    )
    # Carry forward the proof chain from the research step
    draft_output.proof_chain = research_output.proof_chain.copy()
    draft_output.seal(agent_id="writer-agent")
    print(f"[Writer] {draft_output.word_count} words, "
          f"hash={draft_output.content_hash[:20]}...")

    # ── Step 3: Reviewer Agent ──
    review_output = ReviewOutput(
        trace_id=trace_id,
        sender_agent_id="reviewer-agent",
        sender_agent_version="3.0.1",
        pr_number=pr_number,
        verdict=Verdict.CHANGES_REQUESTED,
        score=55,
        issues=[
            Issue(
                severity=Severity.HIGH,
                description=(
                    "Draft correctly identifies the goroutine leak but the "
                    "suggested fix (bounded worker pool) lacks specifics — "
                    "pool size, queue depth, rejection policy. The writer "
                    "should include concrete configuration values."
                ),
                location="Draft section 1, paragraph 1"
            ),
            Issue(
                severity=Severity.LOW,
                description=(
                    "Draft mentions 'OOM' without explaining to the PR author "
                    "what OOM is. Add a brief definition or link."
                ),
                location="Draft section 1, paragraph 1"
            ),
        ],
        suggestions=[
            "Specify worker pool parameters: max_workers=100, queue_size=1000",
            "Add OOM definition footnote for non-systems readers",
            "Consider adding a positive note — the PR's architecture is solid "
            "aside from these two issues"
        ],
        reviewer_notes=(
            "Overall good draft. The writer correctly identified both issues. "
            "The goroutine fix recommendation needs more engineering detail "
            "before this review is actionable for the PR author."
        )
    )
    review_output.proof_chain = draft_output.proof_chain.copy()
    review_output.seal(agent_id="reviewer-agent")
    print(f"[Reviewer] Verdict={review_output.verdict.value}, "
          f"score={review_output.score}, hash={review_output.content_hash[:20]}...")

    print(f"\n── Proof Chain ({len(review_output.proof_chain)} entries) ──")
    for entry in review_output.proof_chain:
        print(f"  {entry['agent_id']}: {entry['content_hash'][:20]}...")

    return review_output

6.4 Alternative: TypeScript + Zod

If your agent system runs in Node.js/TypeScript, here's the equivalent schema using Zod — the de facto validation library for English-speaking TypeScript developers:

import { z } from "zod";
import { createHash } from "node:crypto";

// ── Shared ───────────────────────────────────────────────────

const SeverityEnum = z.enum(["low", "medium", "high", "critical"]);
const VerdictEnum = z.enum(["approved", "changes_requested", "needs_discussion"]);

const MessageMeta = z.object({
  trace_id: z.string(),
  sender_agent_id: z.string(),
  sender_agent_version: z.string(),
  timestamp: z.string().datetime(),
});

// ── ResearchOutput ───────────────────────────────────────────

const FindingSchema = z.object({
  file: z.string(),
  line_range: z.string().optional(),
  issue: z.string(),
  severity: SeverityEnum,
  suggestion: z.string().optional(),
});

const ResearchOutputSchema = MessageMeta.extend({
  message_type: z.literal("research_output"),
  pr_number: z.number().int().positive(),
  repository: z.string(),
  findings: z.array(FindingSchema),
  sources_analyzed: z.array(z.string()),
  gaps: z.array(z.string()).default([]),
  keywords: z.array(z.string()).default([]),
  schema_version: z.string().default("1.0.0"),
  content_hash: z.string().optional(),
  proof_chain: z.array(z.record(z.unknown())).default([]),
});

// ── DraftOutput ──────────────────────────────────────────────

const SectionSchema = z.object({
  heading: z.string(),
  content: z.string(),
  findings_referenced: z.array(z.number().int()).default([]),
});

const DraftOutputSchema = MessageMeta.extend({
  message_type: z.literal("draft_output"),
  pr_number: z.number().int().positive(),
  title: z.string(),
  summary: z.string(),
  sections: z.array(SectionSchema),
  word_count: z.number().int().positive(),
  requires_human_review: z.boolean().default(false),
  schema_version: z.string().default("1.0.0"),
  content_hash: z.string().optional(),
  proof_chain: z.array(z.record(z.unknown())).default([]),
});

// ── ReviewOutput ─────────────────────────────────────────────

const IssueSchema = z.object({
  severity: SeverityEnum,
  description: z.string(),
  location: z.string().optional(),
});

const ReviewOutputSchema = MessageMeta.extend({
  message_type: z.literal("review_output"),
  pr_number: z.number().int().positive(),
  verdict: VerdictEnum,
  score: z.number().int().min(0).max(100),
  issues: z.array(IssueSchema).default([]),
  suggestions: z.array(z.string()).default([]),
  reviewer_notes: z.string().optional(),
  schema_version: z.string().default("1.0.0"),
  content_hash: z.string().optional(),
  proof_chain: z.array(z.record(z.unknown())).default([]),
});

// ── Serialization helpers ────────────────────────────────────

function computeContentHash(data: Record<string, unknown>): string {
  const canonical = JSON.stringify(data, Object.keys(data).sort());
  const h = createHash("sha256");
  h.update(canonical);
  return `sha256:${h.digest("hex")}`;
}

// Type inference — use these in your handler signatures
type ResearchOutput = z.infer<typeof ResearchOutputSchema>;
type DraftOutput = z.infer<typeof DraftOutputSchema>;
type ReviewOutput = z.infer<typeof ReviewOutputSchema>;

6.5 Key Design Decisions in This Schema

trace_id propagates through all messages. Generated once at pipeline start, the same trace_id appears in ResearchOutput, DraftOutput, and ReviewOutput. This is how you link all three messages to the same logical pipeline execution — a single query in your observability tool returns the full pipeline trace.

proof_chain accumulates at each step. Each agent copies the previous agent's proof chain and appends its own entry. The final ReviewOutput contains three entries — one per agent. If the reviewer's score seems off, you can verify the research findings hash and the draft hash independently.

findings_referenced creates an explicit link between Draft and Research. The WriterAgent's sections reference specific findings by index. This creates a verifiable trail: "Which research finding did this draft section come from?" — essential when the reviewer challenges a claim in the draft.

gaps and requires_human_review provide escape hatches. Agents should know what they don't know. The ResearchAgent's gaps field tells downstream agents what wasn't analyzed, preventing overconfident conclusions. The WriterAgent's requires_human_review flag escalates ambiguous drafts before they reach the PR author.

📌 Adapt this schema to your pipeline: This three-agent pattern generalizes to any linear pipeline — replace "code review" with "customer support ticket triage," "content moderation," or "lead qualification." The structure is the same: upstream agent analyzes → midstream agent synthesizes → downstream agent validates. Change the field names to match your domain; keep the trace_id + proof_chain skeleton intact.

7. Frequently Asked Questions

Q: How is agent message schema design different from REST API design?

REST APIs are request-response with a clear client/server boundary — the client sends a request, the server sends a response, the interaction ends. Agent messages are multi-hop and asynchronous: a message may pass through three agents, each transforming the data before passing it on. This has two direct schema implications:

① Embedded provenance: REST APIs rely on external systems for tracing — request IDs in HTTP headers, OpenTelemetry spans, separate log aggregation. Agent messages must carry their own provenance (proof_chain, sender_agent_id, content_hash) because there's no external tracing infrastructure guaranteed to be available at every hop.

② Versioning model: REST APIs version at the endpoint level (/v1/tasks vs /v2/tasks) — the server controls which version to use. Agent messages version at the message level (schema_version in every message) — because each agent in the chain may be running a different version, and messages may be stored in queues where they're read by agents with different version expectations.

TL;DR: REST API design optimizes for synchronous client/server contracts. Agent message design optimizes for asynchronous, multi-hop data flows with embedded integrity and versioning.

Q: JSON or Protobuf for agent messages?

In-process or same-host agent systems: JSON. Readability beats microsecond serialization gains when you're debugging at 2 AM and need to inspect a message payload. JSON's human-readability is not a luxury — it's a debugging tool. The serialization overhead is negligible compared to the LLM inference time that dominates agent execution.

Cross-host or cross-language systems: consider Protobuf with JSON compatibility mode. Google's A2A (Agent-to-Agent) protocol chose Proto-first with JSON wire compatibility for exactly this reason: Protobuf gives you schema enforcement and compact serialization at the wire level, while JSON compatibility means you can still inspect messages during development.

If you're unsure, start with JSON. The schema design principles in this article — four-layer model, message type taxonomy, versioning — are format-agnostic. They apply equally to JSON, Protobuf, MessagePack, or any serialization format. Only switch to Protobuf when serialization overhead becomes measurable in your profiling (i.e., when your agent pipeline processes thousands of messages per second and the JSON encode/decode shows up in flame graphs).

Q: How does this relate to MCP and A2A?

MCP (Model Context Protocol) defines agent-to-tool communication. It uses JSON-RPC 2.0 as the wire format, with typed ToolResult responses. MCP's tools/list, tools/call, and resources/read methods are concrete implementations of the message types defined in Section 3 of this article.

A2A (Agent-to-Agent) defines agent-to-agent communication at the wire level. It uses Protobuf with streaming support, with typed Task and Message structures. A2A's task lifecycle management (pending → working → completed/failed) maps directly to the StatusUpdate message type in Section 3.4.

How this article fits: Think of this article as the design rationale that MCP and A2A implement. MCP's ToolResult is a concrete instance of the ToolResult type in Section 3.2. A2A's Task type maps to TaskHandoff in Section 3.1, with added streaming semantics. The four-layer model (Section 2) explains why both protocols separate data from metadata from verification — it's not arbitrary; it's architectural.

If you're building on MCP or A2A, this article helps you understand the design decisions behind them. If you're building your own agent protocol, this article gives you the patterns to follow.

Q: What happens to in-flight tasks when schema_version goes from 1.0 to 2.0?

This is the most common production question for schema versioning. There are three strategies, ordered by complexity:

Strategy 1 — Additive (zero-downtime): New tasks use v2 schemas. Tasks already in-flight continue with v1 until they complete. No migration needed, no dual processing. This works when v1 and v2 differ only by optional fields. Production recommendation: use this for all non-breaking changes.

Strategy 2 — Migratory (requires upgrade code): Deploy an upgrade proxy that converts v1 messages to v2 by adding default values for new required fields. During the transition window, the proxy intercepts all v1 messages and upgrades them. Once all producers emit v2 natively, remove the proxy. Production recommendation: use this when v2 requires new fields that v1 didn't have.

Strategy 3 — Dual-Write (highest overhead): Run both v1 and v2 handlers simultaneously. Route a percentage of traffic to v2, monitor for errors, increase the percentage gradually. This doubles processing overhead during migration. Production recommendation: only for systems where even a brief outage during strategy 2 proxy deployment is unacceptable.

The golden rule: In-flight tasks should never be disrupted by a schema change. The task started with a contract — let it finish under that contract. Schema version changes only apply to new tasks.

Q: Do I need content_hash and signature on every message?

No. The verification fields are graduated — apply them based on risk, not uniformly.

  • Low-risk messages (internal heartbeats, log entries, status updates between co-located agents): omit both. The overhead isn't justified.
  • Medium-risk messages (task handoffs, tool results that affect state, approval requests): include content_hash only. This is the minimum bar for any message where corruption would cause incorrect behavior.
  • High-risk messages (cross-trust-domain messages, financial operations, production config changes): include content_hash + signature + proof_chain (if multi-hop).

Make verification fields optional in your base schema — don't force every message to carry them. Use the message type's inherent risk profile (or an explicit risk_level field) to gate verification at runtime. The most expensive verification field is the one you include but never check.

Q: How do I avoid over-designing the schema?

Start minimal and add fields only when real problems arise. Over-designed schemas are the #1 reason teams abandon schema discipline entirely — they try to design the "perfect" schema upfront, get bogged down in hypothetical field discussions, and revert to bare {"key": "value"} dicts.

Here's a concrete, problem-driven approach:

  • Debugging is painful (can't tell which agent produced which output) → add sender_agent_id and correlation_id to the Metadata Layer.
  • Agent count grows beyond 3 (routing logic gets complex) → add the full Metadata Layer with receiver_agent_id and task_id.
  • Schema changes break downstream agents (deployment coordination headaches) → add schema_version and implement the forward compatibility checker from Section 4.6.
  • Messages cross process boundaries (serialization, network, queue issues) → add content_hash to the Verification Layer.
  • Security audit requirement (compliance, SOC 2, cross-org communication) → add signature and proof_chain.
  • Messages flow through async queues (retries, deduplication, TTL) → add the Routing Layer.

Never pre-design fields for "we might need this someday." The most expensive field is the one nobody uses but everyone has to maintain — it appears in every message payload, every test fixture, every documentation example, every onboarding session. If you can't point to a specific, current problem that a field solves, don't add it.

The schema should grow with your system's complexity, not dictate it.

This article is part of the Agent Communication and Protocols series. Here's where to go next:

New to agent engineering? Start with What Is an AI Agent?