We've come a long way.
In L1, you made two agents challenge each other — a simple idea, but already more reliable than a single answer. L2 gave it structure — a three-round protocol, multi-dimensional scoring, an argument trace table — turning "free debate" into "auditable debate." L3 tackled the hardest problem — what if the judge itself is unreliable — introducing a multi-judge expert panel, score calibration, Krippendorff's Alpha, and Fleiss' Kappa consensus metrics.
But all of this lived in scripts. You ran `python debate_consensus.py` in a terminal, watched it print a conclusion, then closed the terminal. That's not a product; that's a prototype.
This article's goal: turn everything from L1-L3 into a reliable service that can be deployed to production and depended on by real teams.
This isn't about writing more debate logic — it's about async orchestration, session management, error recovery, cost control, observability. In other words: it's about turning research code into engineering systems.
Before diving into architecture, let's answer the fundamental question: what real business scenarios would genuinely need a multi-agent debate system running in production?
An investment team needs to evaluate dozens of market signals daily. Traditionally, analysts read each one and form judgments. A debate system replaces this manual first pass: each signal becomes a debate topic, with a pro agent and a con agent arguing its implications and a judge panel scoring the exchange.
The debate result isn't "buy" or "sell" — it's a structured summary of key disagreements: which arguments both sides agree on (low divergence), which have fundamental disagreement (high divergence). Analysts don't need to accept AI conclusions wholesale — they only need to focus on "issues the AI itself can't reach consensus on."
An engineering team faces a technology choice — "monolith or microservices," "PostgreSQL or MongoDB," "build or buy." The traditional approach is hours of meetings, with decision quality heavily dependent on the loudest voice in the room.
A debate system can run the debate before the meeting: agents argue each option, judges score the arguments, and the output is a divergence report showing where the strongest disagreements lie. The meeting shifts from "should we use microservices" to "targeted discussion on the three core divergence points identified by AI", a radically different use of everyone's time.
For regulated industries (finance, healthcare, privacy), every policy change can trigger dozens of regulatory constraints. Traditionally, legal and compliance teams review each one. A debate system can automate this first pass, leaving humans to adjudicate only the constraints on which the agents themselves cannot reach consensus.
A platform's content moderation system automatically flags user content. The user appeals. Traditionally, a human reviewer re-examines — but this doesn't scale.
A debate system can have two agents debate: one representing platform moderation standards (defending the original decision), one representing the user (making an interpretive defense of the content). The judge panel scores against platform content policies. If the debate is clear (Alpha ≥ 0.80), auto-resolve. If highly divergent (Alpha < 0.50), escalate to human review.
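Those thresholds reduce to a small routing rule. A minimal sketch (handling of the middle band is our assumption; tune it to your review capacity):

```python
def route_appeal(alpha: float) -> str:
    """Route a moderation appeal by judge-panel agreement (Krippendorff's Alpha)."""
    if alpha >= 0.80:
        return "auto_resolve"    # panel agrees: apply its verdict automatically
    if alpha < 0.50:
        return "human_review"    # panel diverges: escalate to a human reviewer
    return "extended_review"     # middle band: e.g., re-run with a larger panel
```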
| Use Case | Debate Mode | Judge Panel | Key Production Req. |
|---|---|---|---|
| Market Analysis | L2 Structured | Tech + Business + Risk | Low latency, high throughput |
| Tech Decision Review | L3 Consensus | Tech + Business + General | Auditable, human fallback |
| Compliance Evaluation | L3 Consensus | Risk + Tech + General | Audit trail, immutability |
| Content Appeals | L2/L3 Hybrid | General + Risk | Large scale, auto-escalation |
A production-grade debate involves multiple LLM calls, multi-round orchestration, state persistence, and error recovery. It's not a Python script — it's an information-processing pipeline.
| Component | Responsibility | Tech Choice |
|---|---|---|
| Debate Orchestrator | Manage debate session lifecycle: create → execute → complete | asyncio + coroutines |
| Session Store | Persist all debate state, intermediate results, final conclusions | SQLite (startup) / PostgreSQL (scale) |
| Audit Log | Record complete timeline of each round, each agent call, each judge score | Structured log table + JSON columns |
| Cost Tracker | Real-time token tracking and cost estimation, with budget alerts | Token counting + model pricing table |
| LLM Gateway | Unified LLM call interface, multi-model support, load balancing, rate limiting | OpenAI SDK + retry middleware |
| Monitoring Dashboard | Debate success rate, avg duration, cost trends, Alpha distribution | Metrics collection + visualization |
Debate system LLM calls are inherently I/O-bound — waiting for API responses dominates CPU time. Asynchronous programming is therefore mandatory, not optional.
Core orchestration flow:
```python
async def run_debate_pipeline(session):
    """Async orchestration flow for a complete debate (illustrative sketch;
    the agents and helpers stand in for the L1-L3 components)."""
    async with asyncio.timeout(300):  # 5-minute global timeout (Python 3.11+)
        # Phase 1: Opening arguments (parallelizable)
        pro_args, con_args = await asyncio.gather(
            pro_agent.generate_opening(session.topic),
            con_agent.generate_opening(session.topic),
        )
        # Phase 2: Cross-examination (sequential: each rebuttal needs the opponent's argument)
        pro_cross = await pro_agent.cross_examine(con_args)
        con_cross = await con_agent.cross_examine(pro_args)
        # Phase 3: Closing statements (parallelizable)
        pro_close, con_close = await asyncio.gather(
            pro_agent.closing_statement(con_cross),
            con_agent.closing_statement(pro_cross),
        )
        # Phase 4: Judge evaluation (fully parallel: judges are independent)
        transcript = (pro_args, con_args, pro_cross,
                      con_cross, pro_close, con_close)  # everything the judges see
        judge_results = await asyncio.gather(
            *[judge.evaluate(transcript) for judge in judge_panel]
        )
        # Phase 5: Consensus calculation (CPU-bound, no async needed)
        consensus = compute_consensus(judge_results)
        return DebateResult(pro_args, con_args, consensus)
```
Key design decisions:
- Openings and closings run in parallel (`asyncio.gather`): in those phases, neither side depends on the other's output.
- Cross-examination stays sequential: each rebuttal depends on the opponent's prior argument.
- Judge evaluation is fully parallel: judges score independently.
- Consensus computation stays synchronous: it's CPU-bound, so async buys nothing.
- A single global timeout bounds the whole pipeline.
Debates aren't instantaneous. An L3 consensus debate from creation to conclusion can take 30-60 seconds. During this time, callers need non-blocking status queries.
Debate session state machine:
| Status | Meaning | Transitions To |
|---|---|---|
| `CREATED` | Session created, execution not yet started | `DEBATING`, `FAILED` |
| `DEBATING` | Debate flow in progress (agents interacting) | `JUDGING`, `FAILED`, `TIMED_OUT` |
| `JUDGING` | Debate complete, judge evaluation in progress | `COMPLETED`, `FAILED` |
| `COMPLETED` | Successfully completed, results stored | (terminal) |
| `FAILED` | Execution failed (API error, parse error, etc.) | `DEBATING` (retry) |
| `TIMED_OUT` | Exceeded global timeout limit | (terminal) |
Every state change is written to the audit log. If a debate fails, you can precisely reconstruct from the logs: at which phase, which agent, which API call went wrong.
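As a sketch of what that reconstruction looks like in practice, here's a query against the `audit_log` schema from the full listing later in this article (the helper name is ours):

```python
import aiosqlite

async def failure_trace(db_path: str, session_id: str) -> list:
    """Return one session's ordered event timeline for post-mortem analysis."""
    async with aiosqlite.connect(db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT timestamp, event_type, agent_name, round_number, data_json "
            "FROM audit_log WHERE session_id = ? ORDER BY id",
            (session_id,),
        )
        return [dict(row) for row in await cursor.fetchall()]
```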
Debate systems have significant redundant computation opportunities:
| Cache Tier | Content | TTL | Storage |
|---|---|---|---|
| L1: Session result cache | Full debate result (session_id → result) | 1 hour | In-memory LRU |
| L2: Topic hash cache | Normalized topic hash → session_id | 1 hour | Redis / SQLite |
| L3: Judge eval cache | (debate transcript hash + judge config hash) → eval results | 24 hours | SQLite |
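As an illustration, here's a minimal in-memory sketch of the L2 tier (the class name and normalization rule are our assumptions; swap the dict for Redis or SQLite in production):

```python
import hashlib
import time
from typing import Dict, Optional, Tuple

class TopicHashCache:
    """L2-tier sketch: normalized topic hash -> session_id, with a TTL."""
    def __init__(self, ttl_seconds: int = 3600):
        self.ttl = ttl_seconds
        self._entries: Dict[str, Tuple[str, float]] = {}

    @staticmethod
    def topic_key(topic: str) -> str:
        # Normalize (lowercase, collapse whitespace) so trivial rephrasings still hit
        normalized = " ".join(topic.lower().split())
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def get(self, topic: str) -> Optional[str]:
        hit = self._entries.get(self.topic_key(topic))
        if hit is None:
            return None
        session_id, stored_at = hit
        if time.time() - stored_at > self.ttl:
            return None  # expired; the next put() overwrites it
        return session_id

    def put(self, topic: str, session_id: str) -> None:
        self._entries[self.topic_key(topic)] = (session_id, time.time())
```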
Any agent call that doesn't depend on another's output should be parallel. Specific rules:
| Phase | Parallel Strategy | Latency Saved |
|---|---|---|
| Opening (both sides) | `asyncio.gather(pro_opening, con_opening)` | ~50% |
| Cross-examination | Sequential (data dependency) | N/A |
| Closing (both sides) | `asyncio.gather(pro_close, con_close)` | ~50% |
| Judging (N judges) | `asyncio.gather(*judges)` | ~75% (for 4 judges) |
| Consensus computation | Synchronous (CPU-bound) | N/A |
Two operating modes for different scenarios:
| Mode | Behavior | Use Case |
|---|---|---|
| Streaming (sync wait) | User submits topic, blocks until full result returns | Interactive analysis (user initiates debate from dashboard) |
| Batch (async submit) | User submits topic, immediately returns session_id, poll or webhook for result | Scheduled tasks (daily market analysis), large-scale batch evaluation |
Streaming mode requires SSE (Server-Sent Events) to push real-time progress for each debate phase — this not only improves UX but allows users to see key arguments mid-debate and intervene early.
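A minimal sketch of such an SSE endpoint, assuming FastAPI and placeholder phase events (real code would subscribe to the orchestrator's progress instead of sleeping):

```python
import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def debate_events(session_id: str):
    """Yield one SSE frame per completed debate phase (placeholder loop)."""
    for phase in ("opening", "cross_examination", "closing", "judging", "consensus"):
        await asyncio.sleep(1)  # stand-in for waiting on the actual phase
        payload = json.dumps({"session_id": session_id, "phase": phase})
        yield f"event: phase\ndata: {payload}\n\n"  # SSE wire format

@app.get("/debates/{session_id}/stream")
async def stream_debate(session_id: str):
    return StreamingResponse(
        debate_events(session_id), media_type="text/event-stream"
    )
```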
Running a debate system in production means you need to know roughly how much you'll spend before calling the LLM.
| Debate Mode | LLM Calls | Est. Tokens | Est. Cost (GPT-4o) | Est. Cost (DeepSeek) |
|---|---|---|---|---|
| L1 Simple Debate | ~6 | ~8,000 | $0.04 | $0.002 |
| L2 Structured Debate | ~12 | ~25,000 | $0.15 | $0.007 |
| L3 Consensus (4 judges) | ~20 | ~60,000 | $0.40 | $0.02 |
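To sanity-check the table, here's the arithmetic for the L3 row on GPT-4o, using the pricing and the 60/40 input/output heuristic from the `CostTracker` in the listing below (a heavier output share pushes the estimate toward the table's $0.40):

```python
# 60,000 tokens total, 60% input / 40% output, GPT-4o at $2.50 / $10.00 per 1M tokens
input_cost = 36_000 / 1_000_000 * 2.50     # $0.09
output_cost = 24_000 / 1_000_000 * 10.00   # $0.24
print(round(input_cost + output_cost, 2))  # ~$0.33
```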
Recommended cost control strategies:
- Default to `DebateMode.SIMPLE` for rapid exploration; only escalate truly critical questions to `CONSENSUS`.
- Estimate a debate's cost before executing it, and reject requests that would exceed the daily budget.
- Route high-volume, low-stakes debates to a cheaper model family (the table above shows roughly a 20x price gap between GPT-4o and DeepSeek).

A debate system involves 6-20 LLM API calls. Every single one can fail. You need error classification and differentiated handling:
| Error Type | Example | Strategy | Max Retries |
|---|---|---|---|
| Transient API Error | 429 (rate limit), 503 (unavailable) | Exponential backoff: 1s, 2s, 4s, 8s | 4 |
| Content Filter Error | API refuses to generate (safety filter) | Mark argument as "filtered," continue with remaining | 0 (no retry) |
| JSON Parse Failure | LLM output doesn't match expected JSON format | Retry with stricter format prompt | 2 |
| Timeout Error | Single call exceeds 60s with no response | Cancel call, retry once | 1 |
| Authentication Error | 401 (invalid API key) | No retry — alert immediately | 0 |
Three-layer timeout protection:
```python
# Per-layer timeout config
TIMEOUT_SINGLE_CALL = 60       # Single LLM API call
TIMEOUT_PER_PHASE = 120        # Single debate phase (e.g., cross-exam)
TIMEOUT_GLOBAL_SESSION = 300   # Entire debate
```
If a single agent call times out, the system can cancel it and retry once (per the table above), or degrade gracefully: mark that argument as unavailable and continue the debate with what it has, as the sketch below shows.
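A sketch of how the layers nest, using `asyncio.timeout` (Python 3.11+); the `bounded` helper and its fallback strings are our assumptions:

```python
import asyncio

TIMEOUT_SINGLE_CALL = 60
TIMEOUT_PER_PHASE = 120

async def bounded(coro, timeout: float, fallback=None):
    """Layer 1: cap a single LLM call; degrade to a fallback instead of raising."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return fallback

async def cross_exam_phase(pro_agent, con_agent, pro_args, con_args):
    """Layer 2: cap the whole phase; Layer 3 (global) wraps the pipeline the same way."""
    async with asyncio.timeout(TIMEOUT_PER_PHASE):
        pro_cross = await bounded(
            pro_agent.cross_examine(con_args), TIMEOUT_SINGLE_CALL,
            fallback="[cross-examination unavailable: call timed out]",
        )
        con_cross = await bounded(
            con_agent.cross_examine(pro_args), TIMEOUT_SINGLE_CALL,
            fallback="[cross-examination unavailable: call timed out]",
        )
    return pro_cross, con_cross
```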
A production debate system needs monitoring far beyond "API call success/failure":
| Metric Category | Specific Metrics | Suggested Alert Threshold |
|---|---|---|
| Availability | Debate success rate, failure rate, timeout rate | Success rate < 95% alert |
| Performance | P50/P95/P99 debate duration, per-phase time distribution | P95 > 120s alert |
| Cost | Daily/per-debate/per-user cost, token consumption trends | Daily cost > 80% budget alert |
| Consensus Quality | Alpha/Kappa distribution, irreconcilable divergence rate | Irreconcilable > 30% investigate |
| Judge Health | Per-judge score mean, std dev, deviation from other judges | Single judge sustained deviation > 2σ alert |
| Model Availability | Per-model error rate, latency, rate limit trigger frequency | Single model error rate > 10% switch |
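A sketch of wiring the table's thresholds against the `api_get_metrics()` output shown in the listing below (the helper and exact threshold handling are ours):

```python
def check_alerts(metrics: dict, daily_budget: float) -> list:
    """Compare api_get_metrics() output against the suggested alert thresholds."""
    alerts = []
    if metrics.get("completion_rate_pct", 100.0) < 95.0:
        alerts.append("availability: success rate below 95%")
    if metrics.get("p95_duration_seconds", 0.0) > 120.0:
        alerts.append("performance: P95 duration above 120s")
    if metrics.get("daily_spend_usd", 0.0) > 0.8 * daily_budget:
        alerts.append("cost: daily spend above 80% of budget")
    return alerts
```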
The code below wraps all L1-L3 components into a deployable production service. Core components:
- Debate orchestrator (`DebateOrchestrator`): manages the debate session lifecycle and controls parallel/serial execution flow.
- Session store (`SessionStore`): SQLite persistence with session and audit log tables.
- Cost tracker (`CostTracker`): real-time token counting and cost estimation.

Save as `debate_orchestrator.py`, in the same directory as the L1-L3 files.
"""
Production-Grade Multi-Agent Debate Orchestrator
────────────────────────────────────────────────
Wraps L1 (debate.py), L2 (debate_protocol.py), L3 (debate_consensus.py)
into a deployable production service. Provides:
- Async debate execution (asyncio)
- Session state persistence (SQLite)
- Audit logging
- Cost tracking & budget control
- Error recovery (retry + timeout + degradation)
- REST-ish API interface
- Monitoring metrics
Requires: pip install openai aiosqlite
"""
import asyncio
import json
import time
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Callable, Dict, Optional
import aiosqlite
from openai import AsyncOpenAI
# ──────────────────────────────────────────────
# Import L1-L3 components (assumes same directory)
# ──────────────────────────────────────────────
# from debate import SimpleDebateAgent, run_debate
# from debate_protocol import (
# Argument, StructuredDebateAgent, StructuredJudge,
# run_structured_debate
# )
# from debate_consensus import (
# JudgeProfile, ExpertiseDomain, MultiJudgePanel,
# ScoreCalibrator, WeightedVoter, ConsensusCalculator,
# PanelResult, run_consensus_debate
# )
# ──────────────────────────────────────────────
# Client Configuration
# ──────────────────────────────────────────────
async_client = AsyncOpenAI(
api_key="your-api-key",
base_url="https://api.example.com/v1",
timeout=60.0,
max_retries=2,
)
# ══════════════════════════════════════════════
# 1. Enums & Data Classes
# ══════════════════════════════════════════════
class DebateMode(str, Enum):
SIMPLE = "simple" # L1: Free-form debate
STRUCTURED = "structured" # L2: Structured protocol
CONSENSUS = "consensus" # L3: Multi-judge consensus
class SessionStatus(str, Enum):
CREATED = "created"
DEBATING = "debating"
JUDGING = "judging"
COMPLETED = "completed"
FAILED = "failed"
TIMED_OUT = "timed_out"
class EventType(str, Enum):
DEBATE_STARTED = "debate_started"
ROUND_START = "round_start"
ROUND_COMPLETE = "round_complete"
LLM_CALL = "llm_call"
LLM_ERROR = "llm_error"
RETRY = "retry"
JUDGE_SCORE = "judge_score"
CONSENSUS_CALC = "consensus_calc"
DEBATE_COMPLETED = "debate_completed"
DEBATE_FAILED = "debate_failed"
@dataclass
class CostRecord:
"""Per-model cost tracking record"""
model: str
prompt_tokens: int = 0
completion_tokens: int = 0
call_count: int = 0
estimated_cost_usd: float = 0.0
@dataclass
class DebateSession:
"""Debate session — full lifecycle data"""
session_id: str
topic: str
mode: DebateMode = DebateMode.CONSENSUS
status: SessionStatus = SessionStatus.CREATED
pro_model: str = "gpt-4o"
con_model: str = "gpt-4o"
judge_models: list = field(default_factory=lambda: ["gpt-4o"])
created_at: str = ""
completed_at: str = ""
elapsed_seconds: float = 0.0
costs: Dict[str, CostRecord] = field(default_factory=dict)
result: Optional[Dict] = None
error: Optional[str] = None
retry_count: int = 0
max_retries: int = 3
timeout_seconds: int = 300
# ══════════════════════════════════════════════
# 2. Session Store (SQLite)
# ══════════════════════════════════════════════
class SessionStore:
"""Persistent debate session storage + audit log"""
def __init__(self, db_path: str = "debate_sessions.db"):
self.db_path = db_path
async def init(self):
"""Initialize database tables"""
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
topic TEXT NOT NULL,
mode TEXT NOT NULL DEFAULT 'consensus',
status TEXT NOT NULL DEFAULT 'created',
pro_model TEXT DEFAULT 'gpt-4o',
con_model TEXT DEFAULT 'gpt-4o',
judge_models TEXT DEFAULT '["gpt-4o"]',
created_at TEXT NOT NULL,
completed_at TEXT,
elapsed_seconds REAL DEFAULT 0,
costs_json TEXT DEFAULT '{}',
result_json TEXT,
error TEXT,
retry_count INTEGER DEFAULT 0
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
event_type TEXT NOT NULL,
agent_name TEXT,
round_number INTEGER,
timestamp TEXT NOT NULL,
data_json TEXT,
FOREIGN KEY (session_id)
REFERENCES sessions(session_id)
)
""")
# Query indexes
await db.execute(
"CREATE INDEX IF NOT EXISTS idx_audit_session "
"ON audit_log(session_id)"
)
await db.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_status "
"ON sessions(status)"
)
await db.commit()
async def save_session(self, session: DebateSession):
"""Save or update a session"""
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
INSERT OR REPLACE INTO sessions
(session_id, topic, mode, status, pro_model, con_model,
judge_models, created_at, completed_at, elapsed_seconds,
costs_json, result_json, error, retry_count)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
session.session_id, session.topic,
session.mode.value, session.status.value,
session.pro_model, session.con_model,
json.dumps(session.judge_models),
session.created_at, session.completed_at,
session.elapsed_seconds,
json.dumps(
{k: asdict(v) for k, v in session.costs.items()}
),
json.dumps(session.result, ensure_ascii=False)
if session.result else None,
session.error, session.retry_count,
))
await db.commit()
async def get_session(self, session_id: str) -> Optional[Dict]:
"""Query a single session"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT * FROM sessions WHERE session_id = ?",
(session_id,)
)
row = await cursor.fetchone()
return dict(row) if row else None
    async def list_sessions(
        self, limit: int = 20, status: Optional[str] = None
    ) -> list:
"""List recent sessions"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
if status:
cursor = await db.execute(
"SELECT * FROM sessions WHERE status = ? "
"ORDER BY created_at DESC LIMIT ?",
(status, limit)
)
else:
cursor = await db.execute(
"SELECT * FROM sessions "
"ORDER BY created_at DESC LIMIT ?",
(limit,)
)
return [dict(row) for row in await cursor.fetchall()]
    async def log_event(
        self, session_id: str, event_type: EventType,
        agent_name: Optional[str] = None,
        round_number: Optional[int] = None,
        data: Optional[Dict] = None,
    ):
"""Write an audit event"""
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
INSERT INTO audit_log
(session_id, event_type, agent_name, round_number,
timestamp, data_json)
VALUES (?, ?, ?, ?, ?, ?)
""", (
session_id, event_type.value, agent_name,
round_number,
datetime.now(timezone.utc).isoformat(),
json.dumps(data, ensure_ascii=False) if data else None,
))
await db.commit()
# ══════════════════════════════════════════════
# 3. Cost Tracker
# ══════════════════════════════════════════════
class CostTracker:
"""Token counting + cost estimation (approximate pricing for budgeting)"""
# $/1M tokens (input, output)
PRICING: Dict[str, tuple] = {
"gpt-4o": (2.50, 10.00),
"gpt-4o-mini": (0.15, 0.60),
"gpt-4-turbo": (10.00, 30.00),
"claude-3-opus": (15.00, 75.00),
"claude-3-sonnet": (3.00, 15.00),
"deepseek-chat": (0.14, 0.28),
"deepseek-reasoner": (0.55, 2.19),
}
@classmethod
def estimate_cost(
cls, model: str, prompt_tokens: int, completion_tokens: int
) -> float:
"""Estimate cost for a single LLM call"""
in_price, out_price = cls.PRICING.get(model, (5.0, 15.0))
cost = (
(prompt_tokens / 1_000_000) * in_price +
(completion_tokens / 1_000_000) * out_price
)
return round(cost, 6)
@classmethod
def record_call(
cls, costs: Dict[str, CostRecord], model: str,
prompt_tokens: int, completion_tokens: int
):
"""Record one LLM call into the costs dict"""
if model not in costs:
costs[model] = CostRecord(model=model)
c = costs[model]
c.prompt_tokens += prompt_tokens
c.completion_tokens += completion_tokens
c.call_count += 1
c.estimated_cost_usd += cls.estimate_cost(
model, prompt_tokens, completion_tokens
)
@classmethod
def total_cost(cls, costs: Dict[str, CostRecord]) -> float:
"""Total estimated cost"""
return round(
sum(c.estimated_cost_usd for c in costs.values()), 4
)
# ══════════════════════════════════════════════
# 4. Error Handling & Retry
# ══════════════════════════════════════════════
async def with_retry(
fn: Callable,
session: DebateSession,
store: SessionStore,
label: str,
max_retries: int = 3,
base_delay: float = 1.0,
):
"""
Exponential backoff retry wrapper for LLM calls.
Differentiates error types for appropriate retry strategy.
"""
last_error = None
for attempt in range(max_retries + 1):
try:
return await fn()
except Exception as e:
last_error = e
error_str = str(e).lower()
# Non-retryable errors
if "401" in error_str or "403" in error_str:
await store.log_event(
session.session_id, EventType.LLM_ERROR,
agent_name=label,
data={"attempt": attempt + 1, "error": str(e),
"fatal": True}
)
raise
# Content filter — no retry
if "content_filter" in error_str or "safety" in error_str:
await store.log_event(
session.session_id, EventType.LLM_ERROR,
agent_name=label,
data={"attempt": attempt + 1, "error": str(e),
"type": "content_filter"}
)
return "[Content filtered by safety policy]"
if attempt >= max_retries:
break
delay = base_delay * (2 ** attempt)
await store.log_event(
session.session_id, EventType.RETRY,
agent_name=label,
data={"attempt": attempt + 1, "error": str(e),
"delay_seconds": delay}
)
await asyncio.sleep(delay)
raise last_error
# ══════════════════════════════════════════════
# 5. Debate Orchestrator (Core)
# ══════════════════════════════════════════════
class DebateOrchestrator:
"""
Production-grade debate orchestrator.
Manages session lifecycle, async execution, error recovery, cost tracking.
"""
def __init__(
self,
db_path: str = "debate_sessions.db",
default_model: str = "gpt-4o",
daily_budget_usd: float = 10.0,
):
self.store = SessionStore(db_path)
self.default_model = default_model
self.daily_budget = daily_budget_usd
self.tracker = CostTracker()
self.active_debates: Dict[str, asyncio.Task] = {}
self._daily_spend = 0.0
self._budget_reset_date = datetime.now(timezone.utc).date()
async def start(self):
"""Initialize the orchestrator (create DB tables)"""
await self.store.init()
def _check_budget(self, estimated_cost: float) -> bool:
"""Check if daily budget would be exceeded"""
today = datetime.now(timezone.utc).date()
if today != self._budget_reset_date:
self._daily_spend = 0.0
self._budget_reset_date = today
return (self._daily_spend + estimated_cost) <= self.daily_budget
def create_session(
self,
topic: str,
mode: DebateMode = DebateMode.CONSENSUS,
        pro_model: Optional[str] = None,
        con_model: Optional[str] = None,
        judge_models: Optional[list] = None,
timeout_seconds: int = 300,
) -> DebateSession:
"""Create a new debate session"""
return DebateSession(
session_id=str(uuid.uuid4())[:8],
topic=topic,
mode=mode,
pro_model=pro_model or self.default_model,
con_model=con_model or self.default_model,
judge_models=judge_models or [self.default_model],
created_at=datetime.now(timezone.utc).isoformat(),
timeout_seconds=timeout_seconds,
)
    async def run_debate(
        self, session: DebateSession
    ) -> DebateSession:
        """
        Execute a full debate.
        Includes timeout protection, error recovery, and budget check.
        Retries loop in place so finalization below runs exactly once
        (a recursive retry would re-run the finalization per level and
        double-count daily spend).
        """
        start_time = time.time()
        # Budget check
        est_cost = self._estimate_session_cost(session)
        if not self._check_budget(est_cost):
            session.status = SessionStatus.FAILED
            session.error = (
                f"Exceeded daily budget (${self.daily_budget}). "
                f"Est. cost ${est_cost:.4f} + spent ${self._daily_spend:.4f}"
            )
            await self.store.save_session(session)
            return session
        session.status = SessionStatus.DEBATING
        await self.store.save_session(session)
        await self.store.log_event(
            session.session_id, EventType.DEBATE_STARTED,
            data={"mode": session.mode.value, "topic": session.topic}
        )
        while True:
            try:
                result = await asyncio.wait_for(
                    self._execute(session),
                    timeout=session.timeout_seconds,
                )
                session.result = result
                session.status = SessionStatus.COMPLETED
                break
            except asyncio.TimeoutError:
                session.status = SessionStatus.TIMED_OUT
                session.error = f"Debate timed out ({session.timeout_seconds}s)"
                break
            except Exception as e:
                if session.retry_count < session.max_retries:
                    session.retry_count += 1
                    session.status = SessionStatus.DEBATING
                    session.error = str(e)
                    await self.store.save_session(session)
                    await self.store.log_event(
                        session.session_id, EventType.RETRY,
                        data={"retry": session.retry_count,
                              "error": str(e)}
                    )
                    continue  # re-run the whole debate
                session.status = SessionStatus.FAILED
                session.error = str(e)
                break
        # Finalize exactly once, whatever the outcome
        session.completed_at = datetime.now(timezone.utc).isoformat()
        session.elapsed_seconds = round(time.time() - start_time, 2)
        total_cost = self.tracker.total_cost(session.costs)
        self._daily_spend += total_cost
        await self.store.save_session(session)
        final_event = (
            EventType.DEBATE_COMPLETED
            if session.status == SessionStatus.COMPLETED
            else EventType.DEBATE_FAILED
        )
        await self.store.log_event(
            session.session_id, final_event,
            data={
                "status": session.status.value,
                "elapsed": session.elapsed_seconds,
                "cost": total_cost,
                "retries": session.retry_count,
            }
        )
        return session
async def _execute(self, session: DebateSession) -> Dict:
"""
Core execution — dispatches to mode-specific runner.
In production, this calls actual L1-L3 functions.
"""
methods = {
DebateMode.SIMPLE: self._run_simple,
DebateMode.STRUCTURED: self._run_structured,
DebateMode.CONSENSUS: self._run_consensus,
}
runner = methods.get(session.mode, self._run_consensus)
return await runner(session)
async def _run_simple(self, session: DebateSession) -> Dict:
"""L1: Free-form debate"""
# Production code:
# result = run_debate(
# topic=session.topic, rounds=3,
# pro_model=session.pro_model,
# con_model=session.con_model
# )
await self.store.log_event(
session.session_id, EventType.ROUND_START,
round_number=1
)
return {
"mode": "simple", "topic": session.topic,
"result": "L1 simple debate result placeholder",
"rounds_completed": 3,
}
async def _run_structured(self, session: DebateSession) -> Dict:
"""L2: Structured debate + single judge"""
# Production code:
# result = run_structured_debate(
# topic=session.topic,
# pro_model=session.pro_model,
# con_model=session.con_model,
# judge_model=session.judge_models[0]
# )
session.status = SessionStatus.JUDGING
await self.store.save_session(session)
return {
"mode": "structured", "topic": session.topic,
"result": "L2 structured debate result placeholder",
"trace_table": [],
}
async def _run_consensus(self, session: DebateSession) -> Dict:
"""L3: Multi-judge consensus debate"""
# Production code:
# pro_args = [...]
# con_args = [...]
# panel = MultiJudgePanel([
# JudgeProfile(name="Technical Judge",
# domain=ExpertiseDomain.TECHNICAL),
# JudgeProfile(name="Business Judge",
# domain=ExpertiseDomain.BUSINESS),
# JudgeProfile(name="Risk Judge",
# domain=ExpertiseDomain.RISK),
# JudgeProfile(name="General Judge",
# domain=ExpertiseDomain.GENERAL),
# ])
# result: PanelResult = panel.evaluate(
# topic=session.topic,
# pro_args=pro_args, con_args=con_args,
# pro_cross_text=..., con_cross_text=...,
# pro_closing=..., con_closing=...
# )
# return {
# "mode": "consensus",
# "alpha": result.alpha,
# "kappa": result.kappa,
# "weighted_pro": result.weighted_result["pro"],
# "weighted_con": result.weighted_result["con"],
# "irreconcilable": result.divergence["irreconcilable"],
# "recommendation": result.divergence["recommendation"],
# }
session.status = SessionStatus.JUDGING
await self.store.save_session(session)
return {
"mode": "consensus", "topic": session.topic,
"result": "L3 consensus debate result placeholder",
"alpha": 0.78, "kappa": 0.72,
"irreconcilable": False,
}
def _estimate_session_cost(self, session: DebateSession) -> float:
"""Estimate cost for a single debate session"""
base_tokens = {
DebateMode.SIMPLE: 8_000,
DebateMode.STRUCTURED: 25_000,
DebateMode.CONSENSUS: 60_000,
}
tokens = base_tokens.get(session.mode, 60_000)
model = session.pro_model
in_price, out_price = CostTracker.PRICING.get(
model, (5.0, 15.0)
)
# Rough estimate: 60% input, 40% output
return round(
(tokens * 0.6 / 1_000_000) * in_price +
(tokens * 0.4 / 1_000_000) * out_price, 4
)
# ── REST-ish API Methods ──
async def api_create_debate(
self, topic: str, mode: str = "consensus"
) -> Dict:
"""Create and asynchronously start a debate"""
debate_mode = DebateMode(mode)
session = self.create_session(topic=topic, mode=debate_mode)
await self.store.save_session(session)
# Execute in background
task = asyncio.create_task(self.run_debate(session))
self.active_debates[session.session_id] = task
task.add_done_callback(
lambda t: self.active_debates.pop(
session.session_id, None
)
)
return {
"session_id": session.session_id,
"status": "accepted",
"mode": session.mode.value,
"topic": session.topic,
"created_at": session.created_at,
"poll_url": f"/debates/{session.session_id}",
}
async def api_get_result(self, session_id: str) -> Dict:
"""Query debate status/result"""
data = await self.store.get_session(session_id)
if not data:
return {"error": "Session not found"}
return data
async def api_estimate_cost(
self, topic: str, mode: str = "consensus"
) -> Dict:
"""Estimate cost without executing the debate"""
session = self.create_session(
topic=topic, mode=DebateMode(mode)
)
cost = self._estimate_session_cost(session)
return {
"topic": topic, "mode": mode,
"estimated_cost_usd": cost,
"daily_budget_remaining": round(
self.daily_budget - self._daily_spend, 4
),
}
async def api_get_metrics(self) -> Dict:
"""Get monitoring metrics"""
sessions = await self.store.list_sessions(limit=200)
total = len(sessions)
if total == 0:
return {"total_sessions": 0}
completed = sum(
1 for s in sessions
if s["status"] == "completed"
)
failed = sum(
1 for s in sessions
if s["status"] == "failed"
)
timed_out = sum(
1 for s in sessions
if s["status"] == "timed_out"
)
times = [
s["elapsed_seconds"] for s in sessions
if s["elapsed_seconds"] and s["elapsed_seconds"] > 0
]
avg_time = sum(times) / len(times) if times else 0
sorted_times = sorted(times) if times else [0]
return {
"total_sessions": total,
"completed": completed,
"failed": failed,
"timed_out": timed_out,
"completion_rate_pct": round(
completed / total * 100, 1
),
"avg_duration_seconds": round(avg_time, 1),
"p95_duration_seconds": round(
sorted_times[
int(len(sorted_times) * 0.95)
] if len(sorted_times) >= 20
else sorted_times[-1] if sorted_times else 0,
1,
),
"active_debates": len(self.active_debates),
"daily_spend_usd": round(self._daily_spend, 4),
"budget_remaining": round(
self.daily_budget - self._daily_spend, 4
),
}
async def api_health(self) -> Dict:
"""Health check"""
return {
"status": "healthy",
"active_debates": len(self.active_debates),
"daily_spend": round(self._daily_spend, 4),
}
# ══════════════════════════════════════════════
# 6. Quick Start Demo
# ══════════════════════════════════════════════
async def quick_start_demo():
"""Demonstrate how to launch and use the orchestrator"""
orch = DebateOrchestrator(
db_path="debate_sessions.db",
default_model="gpt-4o",
daily_budget_usd=10.0,
)
await orch.start()
print("✅ Orchestrator started\n")
# ── 1. Estimate cost ──
est = await orch.api_estimate_cost(
"Should a startup adopt microservices from day one?",
mode="consensus"
)
print(f"💰 Cost estimate: ${est['estimated_cost_usd']}")
print(f" Daily budget remaining: ${est['daily_budget_remaining']}\n")
# ── 2. Create and launch debate ──
debate = await orch.api_create_debate(
topic="Should a startup adopt microservices from day one?",
mode="consensus"
)
print(f"🚀 Debate launched: {debate['session_id']} ({debate['mode']})")
# ── 3. Poll for results ──
for i in range(10):
await asyncio.sleep(3)
result = await orch.api_get_result(debate["session_id"])
status = result.get("status", "unknown")
print(f" [{i+1}] Status: {status}")
if status in ("completed", "failed", "timed_out"):
print(f" Elapsed: {result.get('elapsed_seconds', 0)}s")
if "error" in result and result["error"]:
print(f" Error: {result['error']}")
break
# ── 4. View metrics ──
metrics = await orch.api_get_metrics()
print(f"\n📊 System Metrics:")
print(f" Total debates: {metrics['total_sessions']}")
print(f" Success rate: {metrics['completion_rate_pct']}%")
print(f" Avg duration: {metrics['avg_duration_seconds']}s")
print(f" Daily spend: ${metrics['daily_spend_usd']}")
# ── 5. Health check ──
health = await orch.api_health()
print(f"Heartbeat: {health['status']}")
if __name__ == "__main__":
print("=" * 60)
print("Multi-Agent Debate System — Production Orchestrator")
print("=" * 60)
print()
print("To run the quick-start demo (requires valid LLM API credentials):")
print(" asyncio.run(quick_start_demo())")
print()
print("To deploy as a web service, wrap DebateOrchestrator methods")
print("in FastAPI/Flask routes. Example:")
print()
print(" from fastapi import FastAPI")
print(" app = FastAPI()")
print(" orch = DebateOrchestrator()")
print()
print(" @app.post('/debates')")
print(" async def create(topic: str, mode: str = 'consensus'):")
print(" return await orch.api_create_debate(topic, mode)")
print()
print(" @app.get('/debates/{session_id}')")
print(" async def get_result(session_id: str):")
print(" return await orch.api_get_result(session_id)")
print("=" * 60)
| Component | Function | Key Methods |
|---|---|---|
| `DebateSession` | Debate session data model: full lifecycle state | Fields: `session_id`, `topic`, `mode`, `status`, `costs`, `result`, `error` |
| `SessionStore` | SQLite persistence + audit log | `init()` / `save_session()` / `log_event()` |
| `CostTracker` | Multi-model pricing table + token counting + cost estimation | `record_call()` / `total_cost()` |
| `with_retry()` | Exponential backoff retry with error-type differentiation | Distinguishes transient, content filter, and auth errors |
| `DebateOrchestrator` | Core orchestrator: session lifecycle + L1-L3 integration + REST API | `run_debate()` / `api_create_debate()` / `api_get_metrics()` |
Note the three mode runners, `_run_simple()`, `_run_structured()`, and `_run_consensus()`: they currently return placeholder data. For production deployment, simply uncomment the imports and calls to wire in the full L1-L3 logic. The orchestrator layer (timeout, retry, logging, state management) is fully decoupled from the debate logic (L1-L3).
The simplest deployment: all agents and judges use the same model (e.g., GPT-4o), running on a single server. Suitable for internal team decision-support tools.
Different roles use different model providers:
| Role | Recommended Model | Reason |
|---|---|---|
| Pro Agent | Claude 3.5 Sonnet | Excels at building structured arguments with clear logic |
| Con Agent | GPT-4o | Excels at identifying flaws and raising counterexamples |
| Technical Judge | Claude 3.5 Sonnet | More precise on technical detail evaluation |
| Business Judge | GPT-4o | Stronger on business reasoning and data analysis |
| Risk Judge | Gemini 2.0 | Provides a different risk perspective, reducing homogeneous judgment |
Multi-model deployment's core value isn't "pick the best model for everything" — it's using model diversity to reduce systematic bias — the same principle as L3's differentiated multi-judge design.
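With the orchestrator above, a multi-model panel is pure session configuration. A sketch (model identifiers are illustrative; unknown models fall back to `CostTracker`'s default pricing):

```python
orch = DebateOrchestrator(default_model="gpt-4o")
session = orch.create_session(
    topic="Should we migrate the billing service to microservices?",
    mode=DebateMode.CONSENSUS,
    pro_model="claude-3-sonnet",  # builder: structured arguments
    con_model="gpt-4o",           # attacker: flaw-finding
    judge_models=["claude-3-sonnet", "gpt-4o", "gemini-2.0"],
)
```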
For critical decisions (budget > $100k, legal/compliance implications, affecting many users), the debate system shouldn't auto-output the final conclusion. Instead, it should deliver the structured argument summary and the divergence report, attach the full audit trail, and route the final call to a human decision-maker.
If you take away one core understanding from this article, let it be:
A production-grade debate system is not code — it's an information-processing pipeline where every stage must be observable, fault-tolerant, and cost-controlled.
Specifically:
- Observable: every phase, agent call, and judge score lands in the audit log and the metrics dashboard.
- Fault-tolerant: every LLM call has a retry policy, a timeout layer, and a degradation path.
- Cost-controlled: estimate before running, track per call, and alert before the budget is gone.
When you get these three right, the debate system transforms from "an interesting AI experiment" into "decision infrastructure an organization can depend on."
This is the fourth and final article in the Multi-Agent Debate series. A look back at the journey:
| Article | Title | Core Contribution | Output |
|---|---|---|---|
| L1 | Why Debate Beats a Single Answer | Revealed single-model cognitive biases (confirmation bias, anchoring, overconfidence), proved value of adversarial collaboration | debate.py — dual-agent free-form debate |
| L2 | Structured Debate Protocol | Designed 3-round debate protocol (Opening → Cross-Exam → Closing), introduced multi-dimensional scoring and argument trace table | debate_protocol.py — structured debate + judge agent |
| L3 | Debate Scoring & Consensus | Multi-judge expert panel, score calibration, weighted voting, Krippendorff Alpha + Fleiss Kappa consensus metrics | debate_consensus.py — multi-judge consensus system |
| L4 | Production Deployment (this article) | Wrapped L1-L3 into a deployable production service: async orchestration, session store, error recovery, cost control, monitoring | debate_orchestrator.py — production orchestrator |
Looking back, this series follows a natural progression: L1 asked why (debate beats a single answer), L2 showed how (a structured, auditable protocol), L3 turned the question inward (can the judges themselves be trusted?), and L4 landed it in production.
This "why → how → self-question → land" arc applies not just to debate systems — it applies to any journey from AI prototype to AI product.
Even after four articles, we still have important unsolved problems; they're beyond this series' scope, but worth pondering in your own practice.
📎 Series note: This is the final article (4 of 4) in the Multi-Agent Debate series. Recommended reading order: L1: Adversarial Collaboration Intro → L2: Structured Debate Protocol → L3: Debate Scoring & Consensus → This article (L4).
🏁 Series complete. Return to AI Agent Exploration for more articles.