Multi-Agent Debate System: Production Deployment

We've come a long way.

In L1, you made two agents challenge each other — a simple idea, but already more reliable than a single answer. L2 gave it structure — a three-round protocol, multi-dimensional scoring, an argument trace table — turning "free debate" into "auditable debate." L3 tackled the hardest problem — what if the judge itself is unreliable — introducing a multi-judge expert panel, score calibration, Krippendorff's Alpha, and Fleiss' Kappa consensus metrics.

But all of this was scripts. You ran python debate_consensus.py in a terminal, watched it print a conclusion, then closed the terminal. That's not a product — that's a prototype.

This article's goal: turn everything from L1-L3 into a reliable service that can be deployed to production and depended on by real teams.

This isn't about writing more debate logic — it's about async orchestration, session management, error recovery, cost control, observability. In other words: it's about turning research code into engineering systems.

Real-World Use Cases

Before diving into architecture, let's answer the fundamental question: what real business scenarios would genuinely need a multi-agent debate system running in production?

Use Case 1: Market Analysis (Bull vs. Bear)

An investment team needs to evaluate dozens of market signals daily. Traditionally, analysts read each one and form judgments. A debate system replaces this with a Bull agent and a Bear agent that argue each signal from opposite sides.

The debate result isn't "buy" or "sell" — it's a structured summary of key disagreements: which arguments both sides agree on (low divergence), which have fundamental disagreement (high divergence). Analysts don't need to accept AI conclusions wholesale — they only need to focus on "issues the AI itself can't reach consensus on."

💡 Production characteristics: Market analysis needs low latency (results within minutes of news) and high throughput (dozens of debates per day). This drives architectural choices: parallel agent calls + lightweight message queues.

Use Case 2: Technical Decision Review

An engineering team faces a technology choice — "monolith or microservices," "PostgreSQL or MongoDB," "build or buy." The traditional approach is hours of meetings, with decision quality heavily dependent on the loudest voice in the room.

A debate system can:

  1. Before the meeting, run a structured debate to map out all key arguments.
  2. Use historical accuracy weights (L3's calibration mechanism) to assess each argument's credibility.
  3. If the judge panel Alpha drops below 0.50 — flag before the meeting that this is "a highly contentious issue requiring deep human discussion."

The meeting shifts from "should we use microservices" to "targeted discussion on the three core divergence points identified by AI" — radically different efficiency.

Use Case 3: Policy / Compliance Evaluation

For regulated industries (finance, healthcare, privacy), every policy change can trigger dozens of regulatory constraints. Traditionally, legal and compliance teams review each one. A debate system can:

⚠️ Compliance-specific requirement: Debate records must be immutable — every argument, every judge decision needs timestamp and version tracking. This is why our architecture includes full audit log tables.

Use Case 4: Content Moderation Appeals

A platform's content moderation system automatically flags user content. The user appeals. Traditionally, a human reviewer re-examines — but this doesn't scale.

A debate system can have two agents debate: one representing platform moderation standards (defending the original decision), one representing the user (making an interpretive defense of the content). The judge panel scores against platform content policies. If the debate is clear (Alpha ≥ 0.80), auto-resolve. If highly divergent (Alpha < 0.50), escalate to human review.
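The routing rule above can be sketched as a small function. This is a minimal illustration, not the article's implementation: the names `Resolution` and `route_appeal` are hypothetical, and the middle band (0.50 ≤ Alpha < 0.80) is an assumption, since the text only specifies the two outer thresholds.

```python
from enum import Enum


class Resolution(str, Enum):
    AUTO_RESOLVE = "auto_resolve"
    SPOT_CHECK = "spot_check"      # assumed middle band, not specified in the text
    HUMAN_REVIEW = "human_review"


def route_appeal(alpha: float,
                 auto_threshold: float = 0.80,
                 escalate_threshold: float = 0.50) -> Resolution:
    """Map judge-panel agreement (Krippendorff's Alpha) to a resolution path."""
    if alpha >= auto_threshold:
        return Resolution.AUTO_RESOLVE
    if alpha < escalate_threshold:
        return Resolution.HUMAN_REVIEW
    return Resolution.SPOT_CHECK


for alpha in (0.91, 0.64, 0.32):
    print(alpha, route_appeal(alpha).value)
```

Keeping the thresholds as parameters lets each use case tune them without touching the routing logic.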

| Use Case | Debate Mode | Judge Panel | Key Production Req. |
|---|---|---|---|
| Market Analysis | L2 Structured | Tech + Business + Risk | Low latency, high throughput |
| Tech Decision Review | L3 Consensus | Tech + Business + General | Auditable, human fallback |
| Compliance Evaluation | L3 Consensus | Risk + Tech + General | Audit trail, immutability |
| Content Appeals | L2/L3 Hybrid | General + Risk | Large scale, auto-escalation |

System Architecture

A production-grade debate involves multiple LLM calls, multi-round orchestration, state persistence, and error recovery. It's not a Python script — it's an information-processing pipeline.

Core Components

| Component | Responsibility | Tech Choice |
|---|---|---|
| Debate Orchestrator | Manage debate session lifecycle: create → execute → complete | asyncio + coroutines |
| Session Store | Persist all debate state, intermediate results, final conclusions | SQLite (startup) / PostgreSQL (scale) |
| Audit Log | Record complete timeline of each round, each agent call, each judge score | Structured log table + JSON columns |
| Cost Tracker | Real-time token tracking and cost estimation, with budget alerts | Token counting + model pricing table |
| LLM Gateway | Unified LLM call interface, multi-model support, load balancing, rate limiting | OpenAI SDK + retry middleware |
| Monitoring Dashboard | Debate success rate, avg duration, cost trends, Alpha distribution | Metrics collection + visualization |

Async Orchestration Pattern

Debate system LLM calls are inherently I/O-bound — waiting for API responses dominates CPU time. Asynchronous programming is therefore mandatory, not optional.

Core orchestration flow:

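import asyncio

# DebateResult and the agent/judge objects are assumed to come from the
# L1-L3 modules; they are passed in rather than read from globals.
async def run_debate_pipeline(session, pro_agent, con_agent, judge_panel):
    """Async orchestration flow for a complete debate"""
    topic = session.topic
    # 5-minute global timeout; asyncio.timeout() requires Python 3.11+
    async with asyncio.timeout(300):
        # Phase 1: Opening arguments (parallelizable)
        pro_args, con_args = await asyncio.gather(
            pro_agent.generate_opening(topic),
            con_agent.generate_opening(topic)
        )

        # Phase 2: Cross-examination, sequential (depends on opponent)
        pro_cross = await pro_agent.cross_examine(con_args)
        con_cross = await con_agent.cross_examine(pro_args)

        # Phase 3: Closing (parallelizable)
        pro_close, con_close = await asyncio.gather(
            pro_agent.closing_statement(con_cross),
            con_agent.closing_statement(pro_cross)
        )

        # Phase 4: Judge evaluation, fully parallel (independent).
        # The transcript layout below is illustrative; use whatever
        # structure your judges expect.
        transcript = {
            "topic": topic,
            "openings": (pro_args, con_args),
            "cross_examinations": (pro_cross, con_cross),
            "closings": (pro_close, con_close),
        }
        judge_results = await asyncio.gather(*[
            judge.evaluate(transcript)
            for judge in judge_panel
        ])

        # Phase 5: Consensus calc (CPU-bound, no async needed)
        consensus = compute_consensus(judge_results)

        return DebateResult(pro_args, con_args, consensus)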

Key design decisions:

💡 Latency math: Assume 3 seconds per LLM call and a 4-judge panel. The pipeline makes 10 calls (2 openings, 2 cross-examinations, 2 closings, 4 judge evaluations), so fully sequential execution takes about 30 seconds. With parallel optimization: Phase 1 (parallel) = 3s → Phase 2 (serial, two calls) = 6s → Phase 3 (parallel) = 3s → Phase 4 (parallel) = 3s → Phase 5 (sync) = 0.2s, roughly 15 seconds in total. That is about a 50% saving, and the saving grows with the number of judges.

Session State Management

Debates aren't instantaneous. An L3 consensus debate from creation to conclusion can take 30-60 seconds. During this time, callers need non-blocking status queries.

Debate session state machine:

| Status | Meaning | Transitions To |
|---|---|---|
| CREATED | Session created, execution not yet started | DEBATING, FAILED |
| DEBATING | Debate flow in progress (agents interacting) | JUDGING, FAILED, TIMED_OUT |
| JUDGING | Debate complete, judge evaluation in progress | COMPLETED, FAILED |
| COMPLETED | Successfully completed, results stored | (terminal) |
| FAILED | Execution failed (API error, parse error, etc.) | DEBATING (retry) |
| TIMED_OUT | Exceeded global timeout limit | (terminal) |

Every state change is written to the audit log. If a debate fails, you can precisely reconstruct from the logs: at which phase, which agent, which API call went wrong.
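One way to enforce the state machine is a transition table that rejects any move it does not list. The sketch below mirrors the status table; `ALLOWED_TRANSITIONS`, `InvalidTransition`, and `transition` are illustrative names, not part of the orchestrator code.

```python
# Allowed moves, copied from the session state machine table.
ALLOWED_TRANSITIONS = {
    "created":   {"debating", "failed"},
    "debating":  {"judging", "failed", "timed_out"},
    "judging":   {"completed", "failed"},
    "completed": set(),            # terminal
    "failed":    {"debating"},     # retry
    "timed_out": set(),            # terminal
}


class InvalidTransition(ValueError):
    """Raised when a status change is not listed in the table."""


def transition(current: str, new: str) -> str:
    """Return the new status if the move is legal, otherwise raise."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise InvalidTransition(f"{current} -> {new} is not allowed")
    return new


status = transition("created", "debating")
status = transition(status, "judging")
print(status)
```

Routing every status change through one function also gives a single place to write the audit log entry.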

Caching Strategy

Debate systems have significant redundant computation opportunities:

  1. Identical topic caching: If the same topic is submitted again within a short window — return cached result (configurable TTL, e.g., 1 hour).
  2. Judge evaluation caching: If the same debate's judge evaluations have been computed — don't recompute for every query.
  3. Cross-session argument reuse: If the Pro agent generated an argument for Topic A, and Topic B is a variant of A — that argument can be injected as warmup context, reducing generation latency.

| Cache Tier | Content | TTL | Storage |
|---|---|---|---|
| L1: Session result cache | Full debate result (session_id → result) | 1 hour | In-memory LRU |
| L2: Topic hash cache | Normalized topic hash → session_id | 1 hour | Redis / SQLite |
| L3: Judge eval cache | (debate transcript hash + judge config hash) → eval results | 24 hours | SQLite |
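The topic hash tier could look roughly like this. It is a sketch under stated assumptions: "normalization" here means case-folding and collapsing whitespace, which the article does not define, and `TopicCache` is a hypothetical in-memory stand-in for the Redis / SQLite storage named in the table.

```python
import hashlib
import time
from typing import Callable, Dict, Optional, Tuple


def topic_key(topic: str) -> str:
    """Case-fold, collapse whitespace, then hash the topic text."""
    normalized = " ".join(topic.casefold().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


class TopicCache:
    """Maps a normalized topic hash to a session_id, with a TTL."""

    def __init__(self, ttl_seconds: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock            # injectable so tests can fake time
        self._entries: Dict[str, Tuple[float, str]] = {}

    def put(self, topic: str, session_id: str) -> None:
        expires_at = self._clock() + self._ttl
        self._entries[topic_key(topic)] = (expires_at, session_id)

    def get(self, topic: str) -> Optional[str]:
        key = topic_key(topic)
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, session_id = entry
        if self._clock() >= expires_at:
            del self._entries[key]     # expired: drop and report a miss
            return None
        return session_id


cache = TopicCache(ttl_seconds=3600)
cache.put("Monolith  or Microservices?", "session-123")
print(cache.get("monolith or microservices?"))
```

A stricter normalizer (stripping punctuation, for example) raises the hit rate but also the risk of serving a cached result for a subtly different question.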

Performance Optimization

Parallel Agent Calls

Any agent call that doesn't depend on another's output should be parallel. Specific rules:

| Phase | Parallel Strategy | Latency Saved |
|---|---|---|
| Opening (both sides) | asyncio.gather(pro_opening, con_opening) | ~50% |
| Cross-examination | Sequential (data dependency) | N/A |
| Closing (both sides) | asyncio.gather(pro_close, con_close) | ~50% |
| Judging (N judges) | asyncio.gather(*judges) | ~75% (for 4 judges) |
| Consensus computation | Synchronous (CPU-bound) | N/A |
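The judging row is easy to check with a toy benchmark. The sketch below replaces each judge's LLM call with `asyncio.sleep`, so the timings only illustrate the shape of the saving; `fake_judge` and the 0.05s delay are stand-ins, not real measurements.

```python
import asyncio
import time


async def fake_judge(name: str, delay: float) -> str:
    """Stand-in for one judge's LLM call; sleeps instead of calling an API."""
    await asyncio.sleep(delay)
    return f"{name}: scored"


async def run_sequential(names, delay):
    return [await fake_judge(name, delay) for name in names]


async def run_parallel(names, delay):
    return await asyncio.gather(*(fake_judge(name, delay) for name in names))


async def timed(coro):
    """Await a coroutine and return its wall-clock duration in seconds."""
    start = time.perf_counter()
    await coro
    return time.perf_counter() - start


async def main():
    judges = ["tech", "business", "risk", "general"]
    seq = await timed(run_sequential(judges, 0.05))
    par = await timed(run_parallel(judges, 0.05))
    print(f"sequential {seq:.2f}s, parallel {par:.2f}s, "
          f"saved {1 - par / seq:.0%}")
    return seq, par


seq, par = asyncio.run(main())
```

With four equal-length calls the parallel run takes roughly one call's duration, which is where the ~75% figure comes from. Real calls vary in length, so the slowest judge sets the floor.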

Streaming vs. Batch Debate

Two operating modes for different scenarios:

| Mode | Behavior | Use Case |
|---|---|---|
| Streaming (sync wait) | User submits topic, blocks until full result returns | Interactive analysis (user initiates debate from dashboard) |
| Batch (async submit) | User submits topic, immediately returns session_id, poll or webhook for result | Scheduled tasks (daily market analysis), large-scale batch evaluation |

Streaming mode should push real-time progress for each debate phase, for example over SSE (Server-Sent Events). This not only improves UX but also lets users see key arguments mid-debate and intervene early.
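On the wire, an SSE frame is plain text: an `event:` line, a `data:` line, and a blank line as terminator. A minimal encoder might look like the sketch below; the event names (`phase_complete`, `done`) and payload fields are assumptions chosen for illustration.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def sse_event(event: str, data: Dict[str, Any]) -> str:
    """Encode one SSE frame: event name, single-line JSON data, blank line."""
    payload = json.dumps(data, ensure_ascii=False)
    return f"event: {event}\ndata: {payload}\n\n"


def progress_frames(session_id: str, phases: Iterable[str]) -> Iterator[str]:
    """Yield one frame per finished phase, then a final 'done' frame."""
    phases = list(phases)
    for step, phase in enumerate(phases, start=1):
        yield sse_event("phase_complete", {
            "session_id": session_id,
            "phase": phase,
            "step": step,
            "of": len(phases),
        })
    yield sse_event("done", {"session_id": session_id})


frames = list(progress_frames(
    "demo-session", ["opening", "cross_exam", "closing", "judging"]
))
print(frames[0], end="")
```

A web framework would stream these strings with the `text/event-stream` content type; that wiring is left out here because it depends on the framework.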

Cost Estimation & Budgeting

Running a debate system in production means you need to know roughly how much you'll spend before calling the LLM.

| Debate Mode | LLM Calls | Est. Tokens | Est. Cost (GPT-4o) | Est. Cost (DeepSeek) |
|---|---|---|---|---|
| L1 Simple Debate | ~6 | ~8,000 | $0.04 | $0.002 |
| L2 Structured Debate | ~12 | ~25,000 | $0.15 | $0.007 |
| L3 Consensus (4 judges) | ~20 | ~60,000 | $0.40 | $0.02 |

Recommended cost control strategies:

  1. Topic tiering: Not every topic needs L3 consensus mode. Use DebateMode.SIMPLE for rapid exploration; only escalate truly critical questions to CONSENSUS.
  2. Daily budget cap: Set a global daily budget at the orchestrator level (e.g., $10/day). Once cumulative daily spend exceeds the threshold, auto-downgrade all new debates to cheaper modes.
  3. Model sharding: Different agents use different models. Pro and Con agents use cheaper models (e.g., GPT-4o-mini or DeepSeek) to generate arguments; the judge panel uses stronger models (e.g., GPT-4o) for evaluation. Argument generation needs breadth and creativity; evaluation needs precision and consistency.
💡 Counterintuitive cost-saving strategy: Don't reduce judge count to save money. Two judges can end up costing more than four: a two-judge panel is more likely to produce high divergence, which requires human intervention, and human time is far more expensive than API calls. Four judges give better consensus signals, reducing downstream human costs.
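Strategies 1 and 2 combine naturally into a mode selector. The sketch below is one possible policy, not the article's: it downgrades as soon as the requested mode's estimate no longer fits the remaining daily budget, using the GPT-4o column of the cost table. `choose_mode` and both constants are hypothetical names.

```python
from typing import Optional

# Per-debate estimates from the GPT-4o column of the cost table.
EST_COST_USD = {"simple": 0.04, "structured": 0.15, "consensus": 0.40}
# Most capable first; a request may only move toward cheaper modes.
DOWNGRADE_ORDER = ["consensus", "structured", "simple"]


def choose_mode(requested: str, spent_usd: float,
                daily_budget_usd: float = 10.0) -> Optional[str]:
    """Return the most capable affordable mode, or None if nothing fits."""
    remaining = daily_budget_usd - spent_usd
    start = DOWNGRADE_ORDER.index(requested)
    for mode in DOWNGRADE_ORDER[start:]:
        if EST_COST_USD[mode] <= remaining:
            return mode
    return None


for spent in (0.0, 9.8, 9.9, 9.99):
    print(spent, choose_mode("consensus", spent))
```

Returning `None` rather than raising leaves the caller free to queue the debate for the next day or reject it outright.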

Production Operations

Error Handling Matrix

A debate system involves 6-20 LLM API calls. Every single one can fail. You need error classification and differentiated handling:

| Error Type | Example | Strategy | Max Retries |
|---|---|---|---|
| Transient API Error | 429 (rate limit), 503 (unavailable) | Exponential backoff: 1s, 2s, 4s, 8s | 4 |
| Content Filter Error | API refuses to generate (safety filter) | Mark argument as "filtered," continue with remaining | 0 (no retry) |
| JSON Parse Failure | LLM output doesn't match expected JSON format | Retry with stricter format prompt | 2 |
| Timeout Error | Single call exceeds 60s with no response | Cancel call, retry once | 1 |
| Authentication Error | 401 (invalid API key) | No retry; alert immediately | 0 |
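The matrix can be expressed as data plus a small classifier. This is an illustrative sketch: `RetryPolicy`, `POLICIES`, and `classify` are hypothetical names, and treating unrecognized errors as transient is an assumption the table does not make.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class RetryPolicy:
    category: str
    max_retries: int
    base_delay: float = 0.0   # seconds; 0 means retry without waiting

    def delays(self) -> List[float]:
        """Exponential backoff schedule: base, 2x base, 4x base, ..."""
        return [self.base_delay * (2 ** i) for i in range(self.max_retries)]


POLICIES = {
    "transient":      RetryPolicy("transient", max_retries=4, base_delay=1.0),
    "content_filter": RetryPolicy("content_filter", max_retries=0),
    "json_parse":     RetryPolicy("json_parse", max_retries=2),
    "timeout":        RetryPolicy("timeout", max_retries=1),
    "auth":           RetryPolicy("auth", max_retries=0),
}


def classify(status_code: Optional[int] = None, *, timed_out: bool = False,
             parse_failed: bool = False, filtered: bool = False) -> RetryPolicy:
    """Pick a policy from the signals available about a failed call."""
    if status_code == 401:
        return POLICIES["auth"]
    if filtered:
        return POLICIES["content_filter"]
    if parse_failed:
        return POLICIES["json_parse"]
    if timed_out:
        return POLICIES["timeout"]
    # 429 / 503 and, by assumption, anything unrecognized.
    return POLICIES["transient"]


print(classify(429).delays())
```

Keeping the policies in a table means a change to the matrix is a one-line data edit rather than a change to control flow.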

Tiered Timeout Management

Three-layer timeout protection:

# Per-layer timeout config
TIMEOUT_SINGLE_CALL = 60      # Single LLM API call
TIMEOUT_PER_PHASE = 120        # Single debate phase (e.g., cross-exam)
TIMEOUT_GLOBAL_SESSION = 300   # Entire debate

If a single agent call times out, the system can:

  1. Replace that agent's output with default/placeholder content (degraded mode).
  2. Mark that argument as "timeout fill" in the audit log, ensuring downstream decision-makers know it's not a real debate result.
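Degraded mode can be sketched with `asyncio.wait_for`: on timeout, return a placeholder that carries an explicit flag so it can never be mistaken for a real argument. `AgentOutput` and `call_with_fallback` are hypothetical names, and the short delays exist only to keep the demo fast.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class AgentOutput:
    text: str
    degraded: bool = False   # True when text is a placeholder
    note: str = ""           # reason to record in the audit log


async def call_with_fallback(make_call: Callable[[], Awaitable[str]],
                             timeout_s: float,
                             placeholder: str) -> AgentOutput:
    """Run one agent call; on timeout, return a flagged placeholder."""
    try:
        text = await asyncio.wait_for(make_call(), timeout=timeout_s)
        return AgentOutput(text=text)
    except asyncio.TimeoutError:
        return AgentOutput(text=placeholder, degraded=True,
                           note="timeout fill")


async def slow_agent() -> str:
    await asyncio.sleep(0.2)          # simulates a hung LLM call
    return "real argument"


result = asyncio.run(call_with_fallback(
    slow_agent, timeout_s=0.01,
    placeholder="[no argument: agent timed out]",
))
print(result)
```

The `degraded` flag is what makes the second point workable: downstream code can exclude placeholder arguments from scoring or surface them in the final report.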

Audit & Monitoring

A production debate system needs monitoring far beyond "API call success/failure":

| Metric Category | Specific Metrics | Suggested Alert Threshold |
|---|---|---|
| Availability | Debate success rate, failure rate, timeout rate | Success rate < 95% alert |
| Performance | P50/P95/P99 debate duration, per-phase time distribution | P95 > 120s alert |
| Cost | Daily/per-debate/per-user cost, token consumption trends | Daily cost > 80% budget alert |
| Consensus Quality | Alpha/Kappa distribution, irreconcilable divergence rate | Irreconcilable > 30% investigate |
| Judge Health | Per-judge score mean, std dev, deviation from other judges | Single judge sustained deviation > 2σ alert |
| Model Availability | Per-model error rate, latency, rate limit trigger frequency | Single model error rate > 10% switch |

💡 Dashboard philosophy: Don't just watch "is the system up" — watch "is debate quality declining." If the Alpha distribution suddenly shifts downward (e.g., from avg 0.75 to 0.55), it could be due to a model update or increased topic difficulty — this is a signal to investigate, not something you discover only after the system crashes.
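A first version of that quality signal can be very simple: compare the mean Alpha of a recent window against a baseline window. The sketch below assumes both windows are already collected, and the 0.15 drop threshold is an illustrative choice, not a value from the article.

```python
from statistics import mean
from typing import Dict, Sequence, Union


def alpha_shift(baseline: Sequence[float], recent: Sequence[float],
                max_drop: float = 0.15) -> Dict[str, Union[float, bool]]:
    """Flag a downward shift in average Alpha between two windows."""
    base_avg = mean(baseline)
    recent_avg = mean(recent)
    drop = base_avg - recent_avg
    return {
        "baseline": round(base_avg, 3),
        "recent": round(recent_avg, 3),
        "drop": round(drop, 3),
        "alert": drop > max_drop,
    }


last_month = [0.75, 0.77, 0.73, 0.76, 0.74]
this_week = [0.55, 0.57, 0.53, 0.56, 0.54]
print(alpha_shift(last_month, this_week))
```

A mean comparison will miss changes in spread, so a fuller version would also track the share of debates below the 0.50 escalation line.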

Code: Production-Grade Debate Orchestrator

The code below wraps all L1-L3 components into a deployable production service. Core components:

  1. Async Orchestrator (DebateOrchestrator): Manages debate session lifecycle, controls parallel/serial execution flow.
  2. Session Store (SessionStore): SQLite persistence with session and audit log tables.
  3. Cost Tracker (CostTracker): Real-time token counting and cost estimation.
  4. REST API (function-based): Create debates, query status, fetch results, estimate costs.

Save as debate_orchestrator.py, in the same directory as the L1-L3 files.

"""
Production-Grade Multi-Agent Debate Orchestrator
────────────────────────────────────────────────
Wraps L1 (debate.py), L2 (debate_protocol.py), L3 (debate_consensus.py)
into a deployable production service. Provides:
  - Async debate execution (asyncio)
  - Session state persistence (SQLite)
  - Audit logging
  - Cost tracking & budget control
  - Error recovery (retry + timeout + degradation)
  - REST-ish API interface
  - Monitoring metrics

Requires: pip install openai aiosqlite
"""
import asyncio
import hashlib
import json
import math
import os
import time
import uuid
from collections import defaultdict
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

import aiosqlite
from openai import AsyncOpenAI

# ──────────────────────────────────────────────
# Import L1-L3 components (assumes same directory)
# ──────────────────────────────────────────────
# from debate import SimpleDebateAgent, run_debate
# from debate_protocol import (
#     Argument, StructuredDebateAgent, StructuredJudge,
#     run_structured_debate
# )
# from debate_consensus import (
#     JudgeProfile, ExpertiseDomain, MultiJudgePanel,
#     ScoreCalibrator, WeightedVoter, ConsensusCalculator,
#     PanelResult, run_consensus_debate
# )

# ──────────────────────────────────────────────
# Client Configuration
# ──────────────────────────────────────────────
async_client = AsyncOpenAI(
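    # Read the key from the environment; the literal is only a placeholder.
    api_key=os.environ.get("OPENAI_API_KEY", "your-api-key"),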
    base_url="https://api.example.com/v1",
    timeout=60.0,
    max_retries=2,
)


# ══════════════════════════════════════════════
# 1. Enums & Data Classes
# ══════════════════════════════════════════════

class DebateMode(str, Enum):
    SIMPLE = "simple"          # L1: Free-form debate
    STRUCTURED = "structured"  # L2: Structured protocol
    CONSENSUS = "consensus"    # L3: Multi-judge consensus


class SessionStatus(str, Enum):
    CREATED = "created"
    DEBATING = "debating"
    JUDGING = "judging"
    COMPLETED = "completed"
    FAILED = "failed"
    TIMED_OUT = "timed_out"


class EventType(str, Enum):
    DEBATE_STARTED = "debate_started"
    ROUND_START = "round_start"
    ROUND_COMPLETE = "round_complete"
    LLM_CALL = "llm_call"
    LLM_ERROR = "llm_error"
    RETRY = "retry"
    JUDGE_SCORE = "judge_score"
    CONSENSUS_CALC = "consensus_calc"
    DEBATE_COMPLETED = "debate_completed"
    DEBATE_FAILED = "debate_failed"


@dataclass
class CostRecord:
    """Per-model cost tracking record"""
    model: str
    prompt_tokens: int = 0
    completion_tokens: int = 0
    call_count: int = 0
    estimated_cost_usd: float = 0.0


@dataclass
class DebateSession:
    """Debate session — full lifecycle data"""
    session_id: str
    topic: str
    mode: DebateMode = DebateMode.CONSENSUS
    status: SessionStatus = SessionStatus.CREATED
    pro_model: str = "gpt-4o"
    con_model: str = "gpt-4o"
    judge_models: list = field(default_factory=lambda: ["gpt-4o"])
    created_at: str = ""
    completed_at: str = ""
    elapsed_seconds: float = 0.0
    costs: Dict[str, CostRecord] = field(default_factory=dict)
    result: Optional[Dict] = None
    error: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3
    timeout_seconds: int = 300


# ══════════════════════════════════════════════
# 2. Session Store (SQLite)
# ══════════════════════════════════════════════

class SessionStore:
    """Persistent debate session storage + audit log"""

    def __init__(self, db_path: str = "debate_sessions.db"):
        self.db_path = db_path

    async def init(self):
        """Initialize database tables"""
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute("""
                CREATE TABLE IF NOT EXISTS sessions (
                    session_id      TEXT PRIMARY KEY,
                    topic           TEXT NOT NULL,
                    mode            TEXT NOT NULL DEFAULT 'consensus',
                    status          TEXT NOT NULL DEFAULT 'created',
                    pro_model       TEXT DEFAULT 'gpt-4o',
                    con_model       TEXT DEFAULT 'gpt-4o',
                    judge_models    TEXT DEFAULT '["gpt-4o"]',
                    created_at      TEXT NOT NULL,
                    completed_at    TEXT,
                    elapsed_seconds REAL DEFAULT 0,
                    costs_json      TEXT DEFAULT '{}',
                    result_json     TEXT,
                    error           TEXT,
                    retry_count     INTEGER DEFAULT 0
                )
            """)
            await db.execute("""
                CREATE TABLE IF NOT EXISTS audit_log (
                    id            INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id    TEXT NOT NULL,
                    event_type    TEXT NOT NULL,
                    agent_name    TEXT,
                    round_number  INTEGER,
                    timestamp     TEXT NOT NULL,
                    data_json     TEXT,
                    FOREIGN KEY (session_id)
                        REFERENCES sessions(session_id)
                )
            """)
            # Query indexes
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_audit_session "
                "ON audit_log(session_id)"
            )
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_sessions_status "
                "ON sessions(status)"
            )
            await db.commit()

    async def save_session(self, session: DebateSession):
        """Save or update a session"""
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute("""
                INSERT OR REPLACE INTO sessions
                (session_id, topic, mode, status, pro_model, con_model,
                 judge_models, created_at, completed_at, elapsed_seconds,
                 costs_json, result_json, error, retry_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                session.session_id, session.topic,
                session.mode.value, session.status.value,
                session.pro_model, session.con_model,
                json.dumps(session.judge_models),
                session.created_at, session.completed_at,
                session.elapsed_seconds,
                json.dumps(
                    {k: asdict(v) for k, v in session.costs.items()}
                ),
                json.dumps(session.result, ensure_ascii=False)
                    if session.result else None,
                session.error, session.retry_count,
            ))
            await db.commit()

    async def get_session(self, session_id: str) -> Optional[Dict]:
        """Query a single session"""
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute(
                "SELECT * FROM sessions WHERE session_id = ?",
                (session_id,)
            )
            row = await cursor.fetchone()
            return dict(row) if row else None

    async def list_sessions(
        self, limit: int = 20, status: str = None
    ) -> list:
        """List recent sessions"""
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            if status:
                cursor = await db.execute(
                    "SELECT * FROM sessions WHERE status = ? "
                    "ORDER BY created_at DESC LIMIT ?",
                    (status, limit)
                )
            else:
                cursor = await db.execute(
                    "SELECT * FROM sessions "
                    "ORDER BY created_at DESC LIMIT ?",
                    (limit,)
                )
            return [dict(row) for row in await cursor.fetchall()]

    async def log_event(
        self, session_id: str, event_type: EventType,
        agent_name: str = None, round_number: int = None,
        data: Dict = None
    ):
        """Write an audit event"""
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute("""
                INSERT INTO audit_log
                (session_id, event_type, agent_name, round_number,
                 timestamp, data_json)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                session_id, event_type.value, agent_name,
                round_number,
                datetime.now(timezone.utc).isoformat(),
                json.dumps(data, ensure_ascii=False) if data else None,
            ))
            await db.commit()


# ══════════════════════════════════════════════
# 3. Cost Tracker
# ══════════════════════════════════════════════

class CostTracker:
    """Token counting + cost estimation (approximate pricing for budgeting)"""

    # $/1M tokens (input, output)
    PRICING: Dict[str, tuple] = {
        "gpt-4o":          (2.50,  10.00),
        "gpt-4o-mini":     (0.15,   0.60),
        "gpt-4-turbo":    (10.00,  30.00),
        "claude-3-opus":  (15.00,  75.00),
        "claude-3-sonnet": (3.00,  15.00),
        "deepseek-chat":   (0.14,   0.28),
        "deepseek-reasoner": (0.55, 2.19),
    }

    @classmethod
    def estimate_cost(
        cls, model: str, prompt_tokens: int, completion_tokens: int
    ) -> float:
        """Estimate cost for a single LLM call"""
        in_price, out_price = cls.PRICING.get(model, (5.0, 15.0))
        cost = (
            (prompt_tokens / 1_000_000) * in_price +
            (completion_tokens / 1_000_000) * out_price
        )
        return round(cost, 6)

    @classmethod
    def record_call(
        cls, costs: Dict[str, CostRecord], model: str,
        prompt_tokens: int, completion_tokens: int
    ):
        """Record one LLM call into the costs dict"""
        if model not in costs:
            costs[model] = CostRecord(model=model)
        c = costs[model]
        c.prompt_tokens += prompt_tokens
        c.completion_tokens += completion_tokens
        c.call_count += 1
        c.estimated_cost_usd += cls.estimate_cost(
            model, prompt_tokens, completion_tokens
        )

    @classmethod
    def total_cost(cls, costs: Dict[str, CostRecord]) -> float:
        """Total estimated cost"""
        return round(
            sum(c.estimated_cost_usd for c in costs.values()), 4
        )


# ══════════════════════════════════════════════
# 4. Error Handling & Retry
# ══════════════════════════════════════════════

async def with_retry(
    fn: Callable,
    session: DebateSession,
    store: SessionStore,
    label: str,
    max_retries: int = 4,  # gives the 1s, 2s, 4s, 8s backoff schedule
    base_delay: float = 1.0,
):
    """
    Exponential backoff retry wrapper for LLM calls.
    Differentiates error types for appropriate retry strategy.
    """
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except Exception as e:
            last_error = e
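            error_str = str(e).lower()
            # OpenAI SDK status errors expose an HTTP status_code attribute;
            # fall back to message matching for other exception types.
            status = getattr(e, "status_code", None)

            # Non-retryable errors
            if status in (401, 403) or (
                status is None
                and ("401" in error_str or "403" in error_str)
            ):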
                await store.log_event(
                    session.session_id, EventType.LLM_ERROR,
                    agent_name=label,
                    data={"attempt": attempt + 1, "error": str(e),
                          "fatal": True}
                )
                raise

            # Content filter — no retry
            if "content_filter" in error_str or "safety" in error_str:
                await store.log_event(
                    session.session_id, EventType.LLM_ERROR,
                    agent_name=label,
                    data={"attempt": attempt + 1, "error": str(e),
                          "type": "content_filter"}
                )
                return "[Content filtered by safety policy]"

            if attempt >= max_retries:
                break

            delay = base_delay * (2 ** attempt)
            await store.log_event(
                session.session_id, EventType.RETRY,
                agent_name=label,
                data={"attempt": attempt + 1, "error": str(e),
                      "delay_seconds": delay}
            )
            await asyncio.sleep(delay)

    raise last_error


# ══════════════════════════════════════════════
# 5. Debate Orchestrator (Core)
# ══════════════════════════════════════════════

class DebateOrchestrator:
    """
    Production-grade debate orchestrator.
    Manages session lifecycle, async execution, error recovery, cost tracking.
    """

    def __init__(
        self,
        db_path: str = "debate_sessions.db",
        default_model: str = "gpt-4o",
        daily_budget_usd: float = 10.0,
    ):
        self.store = SessionStore(db_path)
        self.default_model = default_model
        self.daily_budget = daily_budget_usd
        self.tracker = CostTracker()
        self.active_debates: Dict[str, asyncio.Task] = {}
        self._daily_spend = 0.0
        self._budget_reset_date = datetime.now(timezone.utc).date()

    async def start(self):
        """Initialize the orchestrator (create DB tables)"""
        await self.store.init()

    def _check_budget(self, estimated_cost: float) -> bool:
        """Check if daily budget would be exceeded"""
        today = datetime.now(timezone.utc).date()
        if today != self._budget_reset_date:
            self._daily_spend = 0.0
            self._budget_reset_date = today
        return (self._daily_spend + estimated_cost) <= self.daily_budget

    def create_session(
        self,
        topic: str,
        mode: DebateMode = DebateMode.CONSENSUS,
        pro_model: str = None,
        con_model: str = None,
        judge_models: list = None,
        timeout_seconds: int = 300,
    ) -> DebateSession:
        """Create a new debate session"""
        return DebateSession(
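            # Full 128-bit ID: a truncated ID risks collisions, and
            # save_session's INSERT OR REPLACE would overwrite silently.
            session_id=uuid.uuid4().hex,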
            topic=topic,
            mode=mode,
            pro_model=pro_model or self.default_model,
            con_model=con_model or self.default_model,
            judge_models=judge_models or [self.default_model],
            created_at=datetime.now(timezone.utc).isoformat(),
            timeout_seconds=timeout_seconds,
        )

    async def run_debate(
        self, session: DebateSession
    ) -> DebateSession:
        """
        Execute a full debate.
        Includes timeout protection, error recovery, and budget check.
        """
        start_time = time.time()

        # Budget check
        est_cost = self._estimate_session_cost(session)
        if not self._check_budget(est_cost):
            session.status = SessionStatus.FAILED
            session.error = (
                f"Exceeded daily budget (${self.daily_budget}). "
                f"Est. cost ${est_cost:.4f} + spent ${self._daily_spend:.4f}"
            )
            await self.store.save_session(session)
            return session

        session.status = SessionStatus.DEBATING
        await self.store.save_session(session)
        await self.store.log_event(
            session.session_id, EventType.DEBATE_STARTED,
            data={"mode": session.mode.value, "topic": session.topic}
        )

        try:
            # Retry in a loop rather than recursively, so the finally block
            # (cost accounting, final audit event) runs exactly once.
            while True:
                # All attempts share one deadline measured from start_time.
                elapsed = time.time() - start_time
                remaining = session.timeout_seconds - elapsed
                try:
                    result = await asyncio.wait_for(
                        self._execute(session),
                        timeout=max(remaining, 0),
                    )
                    session.result = result
                    session.error = None
                    session.status = SessionStatus.COMPLETED
                    break

                except asyncio.TimeoutError:
                    session.status = SessionStatus.TIMED_OUT
                    session.error = (
                        f"Debate timed out ({session.timeout_seconds}s)"
                    )
                    break

                except Exception as e:
                    session.error = str(e)
                    if session.retry_count >= session.max_retries:
                        session.status = SessionStatus.FAILED
                        break
                    session.retry_count += 1
                    # A failed attempt may have left the status at JUDGING.
                    session.status = SessionStatus.DEBATING
                    await self.store.save_session(session)
                    await self.store.log_event(
                        session.session_id, EventType.RETRY,
                        data={"retry": session.retry_count,
                              "error": str(e)}
                    )

        finally:
            session.completed_at = datetime.now(timezone.utc).isoformat()
            session.elapsed_seconds = round(time.time() - start_time, 2)
            # session.costs accumulates across attempts; count it once.
            total_cost = self.tracker.total_cost(session.costs)
            self._daily_spend += total_cost

            await self.store.save_session(session)
            final_event = (
                EventType.DEBATE_COMPLETED
                if session.status == SessionStatus.COMPLETED
                else EventType.DEBATE_FAILED
            )
            await self.store.log_event(
                session.session_id, final_event,
                data={
                    "status": session.status.value,
                    "elapsed": session.elapsed_seconds,
                    "cost": total_cost,
                    "retries": session.retry_count,
                }
            )

        return session

    async def _execute(self, session: DebateSession) -> Dict:
        """
        Core execution — dispatches to mode-specific runner.
        In production, this calls actual L1-L3 functions.
        """
        methods = {
            DebateMode.SIMPLE: self._run_simple,
            DebateMode.STRUCTURED: self._run_structured,
            DebateMode.CONSENSUS: self._run_consensus,
        }
        runner = methods.get(session.mode, self._run_consensus)
        return await runner(session)

    async def _run_simple(self, session: DebateSession) -> Dict:
        """L1: Free-form debate"""
        # Production code:
        # result = run_debate(
        #     topic=session.topic, rounds=3,
        #     pro_model=session.pro_model,
        #     con_model=session.con_model
        # )
        await self.store.log_event(
            session.session_id, EventType.ROUND_START,
            round_number=1
        )
        return {
            "mode": "simple", "topic": session.topic,
            "result": "L1 simple debate result placeholder",
            "rounds_completed": 3,
        }

    async def _run_structured(self, session: DebateSession) -> Dict:
        """L2: Structured debate + single judge"""
        # Production code:
        # result = run_structured_debate(
        #     topic=session.topic,
        #     pro_model=session.pro_model,
        #     con_model=session.con_model,
        #     judge_model=session.judge_models[0]
        # )
        session.status = SessionStatus.JUDGING
        await self.store.save_session(session)
        return {
            "mode": "structured", "topic": session.topic,
            "result": "L2 structured debate result placeholder",
            "trace_table": [],
        }

    async def _run_consensus(self, session: DebateSession) -> Dict:
        """L3: Multi-judge consensus debate"""
        # Production code:
        # pro_args = [...]
        # con_args = [...]
        # panel = MultiJudgePanel([
        #     JudgeProfile(name="Technical Judge",
        #                  domain=ExpertiseDomain.TECHNICAL),
        #     JudgeProfile(name="Business Judge",
        #                  domain=ExpertiseDomain.BUSINESS),
        #     JudgeProfile(name="Risk Judge",
        #                  domain=ExpertiseDomain.RISK),
        #     JudgeProfile(name="General Judge",
        #                  domain=ExpertiseDomain.GENERAL),
        # ])
        # result: PanelResult = panel.evaluate(
        #     topic=session.topic,
        #     pro_args=pro_args, con_args=con_args,
        #     pro_cross_text=..., con_cross_text=...,
        #     pro_closing=..., con_closing=...
        # )
        # return {
        #     "mode": "consensus",
        #     "alpha": result.alpha,
        #     "kappa": result.kappa,
        #     "weighted_pro": result.weighted_result["pro"],
        #     "weighted_con": result.weighted_result["con"],
        #     "irreconcilable": result.divergence["irreconcilable"],
        #     "recommendation": result.divergence["recommendation"],
        # }
        session.status = SessionStatus.JUDGING
        await self.store.save_session(session)
        return {
            "mode": "consensus", "topic": session.topic,
            "result": "L3 consensus debate result placeholder",
            "alpha": 0.78, "kappa": 0.72,
            "irreconcilable": False,
        }

    def _estimate_session_cost(self, session: DebateSession) -> float:
        """Estimate cost for a single debate session"""
        base_tokens = {
            DebateMode.SIMPLE:     8_000,
            DebateMode.STRUCTURED: 25_000,
            DebateMode.CONSENSUS:  60_000,
        }
        tokens = base_tokens.get(session.mode, 60_000)
        model = session.pro_model
        in_price, out_price = CostTracker.PRICING.get(
            model, (5.0, 15.0)
        )
        # Rough estimate: 60% input, 40% output
        return round(
            (tokens * 0.6 / 1_000_000) * in_price +
            (tokens * 0.4 / 1_000_000) * out_price, 4
        )

    # ── REST-ish API Methods ──

    async def api_create_debate(
        self, topic: str, mode: str = "consensus"
    ) -> Dict:
        """Create and asynchronously start a debate"""
        debate_mode = DebateMode(mode)
        session = self.create_session(topic=topic, mode=debate_mode)
        await self.store.save_session(session)

        # Execute in background
        task = asyncio.create_task(self.run_debate(session))
        self.active_debates[session.session_id] = task
        task.add_done_callback(
            lambda t: self.active_debates.pop(
                session.session_id, None
            )
        )

        return {
            "session_id": session.session_id,
            "status": "accepted",
            "mode": session.mode.value,
            "topic": session.topic,
            "created_at": session.created_at,
            "poll_url": f"/debates/{session.session_id}",
        }

    async def api_get_result(self, session_id: str) -> Dict:
        """Query debate status/result"""
        data = await self.store.get_session(session_id)
        if not data:
            return {"error": "Session not found"}
        return data

    async def api_estimate_cost(
        self, topic: str, mode: str = "consensus"
    ) -> Dict:
        """Estimate cost without executing the debate"""
        session = self.create_session(
            topic=topic, mode=DebateMode(mode)
        )
        cost = self._estimate_session_cost(session)
        return {
            "topic": topic, "mode": mode,
            "estimated_cost_usd": cost,
            "daily_budget_remaining": round(
                self.daily_budget - self._daily_spend, 4
            ),
        }

    async def api_get_metrics(self) -> Dict:
        """Get monitoring metrics"""
        sessions = await self.store.list_sessions(limit=200)
        total = len(sessions)
        if total == 0:
            return {"total_sessions": 0}

        completed = sum(
            1 for s in sessions
            if s["status"] == "completed"
        )
        failed = sum(
            1 for s in sessions
            if s["status"] == "failed"
        )
        timed_out = sum(
            1 for s in sessions
            if s["status"] == "timed_out"
        )
        times = [
            s["elapsed_seconds"] for s in sessions
            if s["elapsed_seconds"] and s["elapsed_seconds"] > 0
        ]
        avg_time = sum(times) / len(times) if times else 0
        sorted_times = sorted(times) if times else [0]

        return {
            "total_sessions": total,
            "completed": completed,
            "failed": failed,
            "timed_out": timed_out,
            "completion_rate_pct": round(
                completed / total * 100, 1
            ),
            "avg_duration_seconds": round(avg_time, 1),
            "p95_duration_seconds": round(
                sorted_times[
                    int(len(sorted_times) * 0.95)
                ] if len(sorted_times) >= 20
                else sorted_times[-1] if sorted_times else 0,
                1,
            ),
            "active_debates": len(self.active_debates),
            "daily_spend_usd": round(self._daily_spend, 4),
            "budget_remaining": round(
                self.daily_budget - self._daily_spend, 4
            ),
        }

    async def api_health(self) -> Dict:
        """Health check"""
        return {
            "status": "healthy",
            "active_debates": len(self.active_debates),
            "daily_spend": round(self._daily_spend, 4),
        }


# ══════════════════════════════════════════════
# 6. Quick Start Demo
# ══════════════════════════════════════════════

async def quick_start_demo():
    """Demonstrate how to launch and use the orchestrator"""

    orch = DebateOrchestrator(
        db_path="debate_sessions.db",
        default_model="gpt-4o",
        daily_budget_usd=10.0,
    )
    await orch.start()
    print("✅ Orchestrator started\n")

    # ── 1. Estimate cost ──
    est = await orch.api_estimate_cost(
        "Should a startup adopt microservices from day one?",
        mode="consensus"
    )
    print(f"💰 Cost estimate: ${est['estimated_cost_usd']}")
    print(f"   Daily budget remaining: ${est['daily_budget_remaining']}\n")

    # ── 2. Create and launch debate ──
    debate = await orch.api_create_debate(
        topic="Should a startup adopt microservices from day one?",
        mode="consensus"
    )
    print(f"🚀 Debate launched: {debate['session_id']} ({debate['mode']})")

    # ── 3. Poll for results ──
    for i in range(10):
        await asyncio.sleep(3)
        result = await orch.api_get_result(debate["session_id"])
        status = result.get("status", "unknown")
        print(f"  [{i+1}] Status: {status}")
        if status in ("completed", "failed", "timed_out"):
            print(f"  Elapsed: {result.get('elapsed_seconds', 0)}s")
            if "error" in result and result["error"]:
                print(f"  Error: {result['error']}")
            break

    # ── 4. View metrics ──
    metrics = await orch.api_get_metrics()
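    print("\n📊 System Metrics:")
    print(f"  Total debates: {metrics['total_sessions']}")
    # api_get_metrics() returns only total_sessions when the store is
    # empty, so read the remaining keys defensively.
    print(f"  Success rate: {metrics.get('completion_rate_pct', 0)}%")
    print(f"  Avg duration: {metrics.get('avg_duration_seconds', 0)}s")
    print(f"  Daily spend: ${metrics.get('daily_spend_usd', 0)}")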

    # ── 5. Health check ──
    health = await orch.api_health()
    print(f"Heartbeat: {health['status']}")


if __name__ == "__main__":
    print("=" * 60)
    print("Multi-Agent Debate System — Production Orchestrator")
    print("=" * 60)
    print()
    print("To run the quick-start demo (requires valid LLM API credentials):")
    print("  asyncio.run(quick_start_demo())")
    print()
    print("To deploy as a web service, wrap DebateOrchestrator methods")
    print("in FastAPI/Flask routes. Example:")
    print()
    print("  from fastapi import FastAPI")
    print("  app = FastAPI()")
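    print("  orch = DebateOrchestrator()")
    print()
    print("  @app.on_event('startup')  # or a lifespan handler in newer FastAPI")
    print("  async def startup():")
    print("      await orch.start()  # same initialization the demo performs")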
    print()
    print("  @app.post('/debates')")
    print("  async def create(topic: str, mode: str = 'consensus'):")
    print("      return await orch.api_create_debate(topic, mode)")
    print()
    print("  @app.get('/debates/{session_id}')")
    print("  async def get_result(session_id: str):")
    print("      return await orch.api_get_result(session_id)")
    print("=" * 60)
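
The p95 figure in api_get_metrics() indexes into the sorted durations and falls back to the slowest run when fewer than 20 samples exist. A nearest-rank percentile is one common alternative that behaves the same way at any sample size. The sketch below is illustrative only; the helper name nearest_rank_percentile is hypothetical and not part of the orchestrator.

```python
import math
from typing import Sequence


def nearest_rank_percentile(values: Sequence[float], pct: float) -> float:
    """Return the nearest-rank percentile of values, with pct in (0, 100]."""
    if not values:
        return 0.0
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    # Nearest-rank: the smallest value such that at least pct% of the
    # samples are less than or equal to it.
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


durations = [float(s) for s in range(1, 21)]  # 1.0 .. 20.0 seconds
print(nearest_rank_percentile(durations, 95))   # 19.0
print(nearest_rank_percentile([4.2, 1.0], 95))  # 4.2
```

With exactly 20 samples the index-based version in the listing returns the maximum (index 19), while nearest-rank returns the 19th value. Either is defensible; what matters is using one definition consistently in dashboards and alerts.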

Code Structure Breakdown

| Component | Function | Key Methods |
|---|---|---|
| DebateSession | Debate session data model with full lifecycle state | Fields: session_id, topic, mode, status, costs, result, error |
| SessionStore | SQLite persistence + audit log | init() / save_session() / log_event() |
| CostTracker | Multi-model pricing table + token counting + cost estimation | record_call() / total_cost() |
| with_retry() | Exponential backoff retry with error-type differentiation | Distinguishes transient, content filter, and auth errors |
| DebateOrchestrator | Core orchestrator: session lifecycle + L1-L3 integration + REST API | run_debate() / api_create_debate() / api_get_metrics() |

💡 Key difference from prototype to production: Notice the three methods _run_simple(), _run_structured(), and _run_consensus(). They currently return placeholder data. For production deployment, uncomment the imports and calls to wire in the full L1-L3 logic. If those functions are blocking, run them off the event loop (for example with asyncio.to_thread) so other sessions keep progressing. The orchestrator layer (timeout, retry, logging, state management) is fully decoupled from the debate logic (L1-L3).
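
A minimal sketch of that wiring, assuming the L1 entry point is an ordinary synchronous function. Here run_debate_blocking is a hypothetical stand-in, not the real L1 code, and the timeout value is arbitrary.

```python
import asyncio
import time
from typing import Dict, List


def run_debate_blocking(topic: str, rounds: int = 3) -> Dict:
    """Hypothetical stand-in for a synchronous L1 debate function."""
    time.sleep(0.05)  # simulates blocking LLM calls
    return {"mode": "simple", "topic": topic, "rounds_completed": rounds}


async def run_simple(topic: str, timeout_s: float = 5.0) -> Dict:
    # asyncio.to_thread runs the blocking call in a worker thread so the
    # event loop stays free; wait_for bounds how long we wait for it.
    return await asyncio.wait_for(
        asyncio.to_thread(run_debate_blocking, topic, 3),
        timeout=timeout_s,
    )


async def main() -> List[Dict]:
    # The two debates overlap instead of running back to back.
    return await asyncio.gather(
        run_simple("Monolith first?"),
        run_simple("Adopt microservices from day one?"),
    )


results = asyncio.run(main())
print([r["rounds_completed"] for r in results])  # [3, 3]
```

One limitation to keep in mind: on timeout, wait_for stops waiting, but the worker thread keeps running until the blocking call returns. Per-request timeouts inside the LLM client are still needed to cap real spend.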

Deployment Patterns

Pattern 1: Single-Machine (Starter)

All agents and judges use the same model (e.g., GPT-4o), running on a single server. This is the simplest pattern, suitable for internal team decision-support tools.

Pattern 2: Multi-Model (Recommended)

Different roles use different model providers:

| Role | Recommended Model | Reason |
|---|---|---|
| Pro Agent | Claude 3.5 Sonnet | Excels at building structured arguments with clear logic |
| Con Agent | GPT-4o | Excels at identifying flaws and raising counterexamples |
| Technical Judge | Claude 3.5 Sonnet | More precise on technical detail evaluation |
| Business Judge | GPT-4o | Stronger on business reasoning and data analysis |
| Risk Judge | Gemini 2.0 | Provides a different risk perspective, reducing homogeneous judgment |

Multi-model deployment's core value isn't "pick the best model for everything." It's using model diversity to reduce systematic bias, the same principle as L3's differentiated multi-judge design.
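
The role assignments above can be captured as configuration, with a small check that the judge panel really spans more than one provider. This is a sketch under assumptions: RoleAssignment, the provider labels, and the two-provider minimum are illustrative choices, and the model strings are display names rather than exact API model IDs.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class RoleAssignment:
    role: str
    provider: str  # illustrative label, e.g. "anthropic", "openai", "google"
    model: str     # display name only; real API model IDs will differ


ASSIGNMENTS: List[RoleAssignment] = [
    RoleAssignment("pro_agent", "anthropic", "Claude 3.5 Sonnet"),
    RoleAssignment("con_agent", "openai", "GPT-4o"),
    RoleAssignment("technical_judge", "anthropic", "Claude 3.5 Sonnet"),
    RoleAssignment("business_judge", "openai", "GPT-4o"),
    RoleAssignment("risk_judge", "google", "Gemini 2.0"),
]


def judge_provider_counts(assignments: List[RoleAssignment]) -> Dict[str, int]:
    """Count how many judges each provider supplies."""
    counts: Dict[str, int] = {}
    for a in assignments:
        if a.role.endswith("_judge"):
            counts[a.provider] = counts.get(a.provider, 0) + 1
    return counts


def is_diverse(assignments: List[RoleAssignment], min_providers: int = 2) -> bool:
    """True when the judges come from at least min_providers providers."""
    return len(judge_provider_counts(assignments)) >= min_providers


print(judge_provider_counts(ASSIGNMENTS))
print(is_diverse(ASSIGNMENTS))  # True
```

Running a check like this at startup makes a silent fallback to a single provider (for example, after a key expires) visible before it skews the panel.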

Pattern 3: Human-in-the-Loop (Hybrid)

For critical decisions (budget > $100k, legal/compliance implications, affecting many users), the debate system shouldn't auto-output the final conclusion. It should:

  1. Complete L3-level debate and consensus evaluation.
  2. If Alpha ≥ 0.80: auto-generate a decision recommendation, marked as "high confidence."
  3. If Alpha < 0.67 or irreconcilable divergence triggered: pause the pipeline, push the most divergent arguments and judge commentary to a human decision-maker.
  4. The human decision-maker makes the final judgment based on the AI-provided structured divergence summary — but what they see isn't the raw debate transcript; it's a divergence heatmap already curated by the AI judge panel.
⚠️ Human-in-the-loop trap: Don't treat the human decision-maker as the "final judge" — this creates the illusion of "I decide in the end anyway, the AI analysis is just reference," leading to insufficient review. The right approach: the human reviews only what the AI couldn't reach consensus on, not what the AI already agreed on.
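
The routing rule in steps 2 and 3 can be sketched as a small function. Two points are assumptions rather than part of the protocol above: the text does not say what happens when Alpha falls between 0.67 and 0.80, so the sketch returns a separate "tentative" route for that band, and it lets irreconcilable divergence escalate to a human even when Alpha is high.

```python
from enum import Enum


class Route(Enum):
    AUTO_HIGH_CONFIDENCE = "auto_high_confidence"
    HUMAN_REVIEW = "human_review"
    TENTATIVE = "tentative"  # assumption: the middle band is not defined in the article


def route_decision(alpha: float, irreconcilable: bool,
                   high: float = 0.80, low: float = 0.67) -> Route:
    """Map a panel's Alpha and divergence flag to a handling route."""
    # Assumption: irreconcilable divergence always escalates, whatever Alpha is.
    if irreconcilable or alpha < low:
        return Route.HUMAN_REVIEW
    if alpha >= high:
        return Route.AUTO_HIGH_CONFIDENCE
    return Route.TENTATIVE


print(route_decision(0.85, False).value)  # auto_high_confidence
print(route_decision(0.78, False).value)  # tentative
print(route_decision(0.60, False).value)  # human_review
print(route_decision(0.90, True).value)   # human_review
```

Keeping the thresholds as parameters makes it easy to tighten them for higher-stakes decision categories without touching the debate logic.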

Key Insight: The Debate System Is an Information-Processing Pipeline

If you take away one core understanding from this article, let it be:

A production-grade debate system is not just code. It's an information-processing pipeline where every stage must be observable, fault-tolerant, and cost-controlled.

Specifically:

  1. Observable: Every debate round, every LLM call, every judge score has a timestamp and audit record. You can trace exactly how any decision was made. When someone asks "why did the AI reach this conclusion," you don't need to say "the model said so" — you can show them the complete debate transcript and judge scorecards.
  2. Fault-tolerant: LLM calls will fail, time out, and return malformed content. Every stage of the pipeline has independent error handling — not "the entire debate failed," but "one argument in this round was degraded."
  3. Cost-controlled: Not every question needs L3-level consensus debate. Debate mode tiering + daily budget cap + model sharding ensure you get high-quality decisions without going bankrupt.
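
As one way to combine mode tiering with the daily budget cap, the sketch below reuses the token figures and the 60/40 input/output split from _estimate_session_cost() and downgrades the requested mode until the estimate fits the remaining budget. The downgrade policy and the names pick_mode and TIERS are assumptions for illustration; the orchestrator in this article does not implement automatic downgrading.

```python
from typing import Dict, List, Optional, Tuple

# Token budgets per mode, copied from _estimate_session_cost().
BASE_TOKENS: Dict[str, int] = {
    "simple": 8_000,
    "structured": 25_000,
    "consensus": 60_000,
}
# Richest mode first; downgrading walks toward the cheaper end.
TIERS: List[str] = ["consensus", "structured", "simple"]


def estimate_cost(mode: str, in_price: float = 5.0,
                  out_price: float = 15.0) -> float:
    """USD estimate: 60% input / 40% output tokens, prices per 1M tokens."""
    tokens = BASE_TOKENS[mode]
    return round(tokens * 0.6 / 1_000_000 * in_price
                 + tokens * 0.4 / 1_000_000 * out_price, 4)


def pick_mode(requested: str,
              remaining_budget: float) -> Optional[Tuple[str, float]]:
    """Return the richest mode at or below `requested` that fits the budget."""
    for mode in TIERS[TIERS.index(requested):]:
        cost = estimate_cost(mode)
        if cost <= remaining_budget:
            return mode, cost
    return None  # nothing fits: reject or queue the request


print(pick_mode("consensus", 10.0))  # ('consensus', 0.54)
print(pick_mode("consensus", 0.30))  # ('structured', 0.225)
print(pick_mode("consensus", 0.05))  # None
```

At the fallback prices of $5 and $15 per million tokens, a consensus debate is estimated at 36,000 input tokens ($0.18) plus 24,000 output tokens ($0.36), or $0.54, so a $10 daily budget covers roughly 18 of them.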

When you get these three right, the debate system transforms from "an interesting AI experiment" into "decision infrastructure an organization can depend on."

Series Retrospective

This is the fourth and final article in the Multi-Agent Debate series. A look back at the journey:

| Article | Title | Core Contribution | Output |
|---|---|---|---|
| L1 | Why Debate Beats a Single Answer | Revealed single-model cognitive biases (confirmation bias, anchoring, overconfidence), proved value of adversarial collaboration | debate.py: dual-agent free-form debate |
| L2 | Structured Debate Protocol | Designed 3-round debate protocol (Opening → Cross-Exam → Closing), introduced multi-dimensional scoring and argument trace table | debate_protocol.py: structured debate + judge agent |
| L3 | Debate Scoring & Consensus | Multi-judge expert panel, score calibration, weighted voting, Krippendorff Alpha + Fleiss Kappa consensus metrics | debate_consensus.py: multi-judge consensus system |
| L4 | Production Deployment (this article) | Wrapped L1-L3 into a deployable production service: async orchestration, session store, error recovery, cost control, monitoring | debate_orchestrator.py: production orchestrator |

From L1 to L4: An Arc of Thinking

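Looking back, this series follows a natural progression:

  1. L1 (why): two agents challenging each other are more reliable than a single answer.
  2. L2 (how): a three-round protocol with scoring and an argument trace table makes the debate auditable.
  3. L3 (self-question): what if the judge itself is unreliable? A multi-judge panel and consensus metrics address that.
  4. L4 (land): the debate logic is wrapped in a deployable production service.
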
This "why → how → self-question → land" arc applies not just to debate systems — it applies to any journey from AI prototype to AI product.

Open Questions

Even after four articles, we still have important unsolved problems — they're beyond this series' scope but worth pondering in your own practice:

  1. Automatic topic discovery: Currently, topics are human-provided. A truly autonomous debate system should automatically identify "contentious issues worth debating" from data streams. This requires combining anomaly detection and controversy mining.
  2. Cross-debate knowledge accumulation: Each debate is siloed. But "microservices operational cost" comes up repeatedly across debates — the system should accumulate knowledge across sessions, forming a "controversy knowledge graph."
  3. Debate strategy evolution: Currently, Pro and Con have fixed prompts. But if Pro always loses on the same argument (e.g., "operational cost"), the system should automatically adjust Pro's strategy on that point.
  4. Real-time debate intervention: In streaming mode, a human observer could inject new evidence or questions mid-debate. This requires designing an elegant "human intervention protocol."

Key Takeaways

  1. Debate systems are productionizable: With an async orchestrator, session store, error recovery, and cost control, L1-L3's debate capabilities can be packaged into a reliable production service for daily team use.
  2. Observability is the foundation of trust: When you can precisely trace "why the system reached this conclusion" — not "the model said" but "Judges A, B, and C evaluated which arguments how" — the debate system transforms from a black box into a trusted decision tool.
  3. Budget control is not optional: In production, LLM costs are real and ongoing. Three-layer cost control — topic tiering, daily budget caps, and model sharding — lets you improve decision quality without losing control of costs.
  4. Deployment pattern determines system quality: Single-model deployment is simple but introduces systematic bias; multi-model deployment improves robustness through diversity; human-in-the-loop preserves human final judgment for critical decisions.
  5. A debate system is an information-processing pipeline: Carve this into the first page of your project docs — it reminds your team that you're building not just another LLM app, but a complex information-processing system where every stage must be monitored, fault-tolerant, and cost-managed.

📎 Series note: This is the final article (4 of 4) in the Multi-Agent Debate series. Recommended reading order: L1: Adversarial Collaboration Intro → L2: Structured Debate Protocol → L3: Debate Scoring & Consensus → This article (L4).

🏁 Series complete. Return to AI Agent Exploration for more articles.