我们走了很长一段路。
在 L1 中,你让两个 Agent 互相质疑——一个简单的想法,但已经比单一回答更可靠。L2 给它加了结构——三轮协议、多维度评分、论据追溯表,把「自由辩论」变成了「可审计的辩论」。L3 解决了最棘手的问题——当裁判自己不可靠时怎么办——引入了多裁判专家面板、评分校准、Krippendorff's Alpha 和 Fleiss' Kappa 共识度量。
但所有这些都是脚本。你在终端里跑 python debate_consensus.py,看它打印出结论,然后关掉终端。那不是产品——那是原型。
本文要做的是:把 L1-L3 的一切变成一个可以部署到生产环境、被真实团队依赖的可靠服务。
这不是关于写更多的辩论逻辑——这是关于异步编排、会话管理、错误恢复、成本控制、可观测性。换句话说:这是关于把研究代码变成工程系统。
在深入架构之前,先回答一个根本问题:什么样的真实业务场景,会真正需要把一个多 Agent 辩论系统跑在生产环境里?
一个投资团队每天需要评估数十条市场消息。传统做法是分析师逐一阅读、形成判断。用辩论系统替代:
辩论结果不是「买」或「卖」——而是对关键分歧的结构化总结:双方在哪条论据上达成了一致(低分歧),在哪条上存在根本性分歧(高分歧)。分析师不需要全盘接受 AI 的结论——他们只需要聚焦于「AI 内部也无法达成共识的问题」。
一个工程团队面临技术选型——比如「单体还是微服务」、「PostgreSQL 还是 MongoDB」、「自建还是上云」。传统做法是开会讨论几小时,决策质量高度依赖团队中嗓门最大的人。
辩论系统可以:
会议从「讨论要不要微服务」变成了「针对 AI 识别出的三个核心分歧点进行定向讨论」——效率完全不同。
对于受监管行业(金融、医疗、隐私),每个策略变更可能触发数十条监管约束。传统做法是法务和合规团队逐一审查。辩论系统可以:
一个平台的内容审核系统自动标记了用户内容。用户申诉。传统做法是人工审核员重新审查——但这不规模化。
辩论系统可以让两个 Agent 辩论:一个代表平台审核标准(辩护原决定),一个代表用户(为内容做解释性辩护)。裁判组根据平台的内容政策进行评分。如果辩论清晰(Alpha ≥ 0.80),自动化裁决。如果高度分歧(Alpha < 0.50),升级到人工审核。
| 场景 | 辩论模式 | 裁判组 | 关键生产要求 |
|---|---|---|---|
| 市场分析 | L2 结构化 | 技术 + 商业 + 风险 | 低延迟、高吞吐 |
| 技术决策评审 | L3 共识 | 技术 + 商业 + 综合 | 可审计、人工兜底 |
| 合规评估 | L3 共识 | 风险 + 技术 + 综合 | 审计跟踪、不可篡改 |
| 内容申诉 | L2/L3 混合 | 综合 + 风险 | 大规模、自动升级 |
一场生产级辩论涉及多个 LLM 调用、多轮编排、状态持久化和错误恢复。它不是一份 Python 脚本——它是一个信息处理流水线。
| 组件 | 职责 | 技术选型 |
|---|---|---|
| 辩论编排器 | 管理辩论会话生命周期:创建 → 执行 → 完成 | asyncio + 协程 |
| 会话存储 | 持久化所有辩论状态、中间结果、最终结论 | SQLite(初创)/ PostgreSQL(规模化) |
| 审计日志 | 记录每一轮、每一次 Agent 调用、每一次裁判评分的完整时间线 | 结构化日志表 + JSON 数据列 |
| 成本追踪器 | 实时追踪 token 消耗和预估费用,支持预算告警 | Token 计数 + 模型定价表 |
| LLM 网关 | 统一 LLM 调用接口,支持多模型、负载均衡、速率限制 | OpenAI SDK + 重试中间件 |
| 监控面板 | 辩论成功率、平均时长、成本趋势、Alpha 分布 | 指标收集 + 可视化 |
辩论系统的 LLM 调用天然是 I/O 密集型的——等待 API 响应的时间远多于 CPU 计算时间。因此异步编程是必须的,不是可选的。
核心编排流程:
async def run_debate_pipeline(session):
"""一场完整辩论的异步编排流程"""
with session_timeout(300): # 5 分钟全局超时
# 阶段 1: 双方开场论据 — 可并行
pro_args, con_args = await asyncio.gather(
pro_agent.generate_opening(topic),
con_agent.generate_opening(topic)
)
# 阶段 2: 交叉质询 — 串行(依赖对方内容)
pro_cross = await pro_agent.cross_examine(con_args)
con_cross = await con_agent.cross_examine(pro_args)
# 阶段 3: 总结 — 可并行
pro_close, con_close = await asyncio.gather(
pro_agent.closing_statement(con_cross),
con_agent.closing_statement(pro_cross)
)
# 阶段 4: 裁判评估 — 全部并行(独立评分)
judge_results = await asyncio.gather(*[
judge.evaluate(transcript)
for judge in judge_panel
])
# 阶段 5: 共识计算 — CPU 密集,不需要异步
consensus = compute_consensus(judge_results)
return DebateResult(pro_args, con_args, consensus)
关键设计决策:
辩论不是瞬间完成的。一场 L3 共识辩论从创建到结论可能需要 30-60 秒。在这期间,调用方需要非阻塞地查询状态。
辩论会话的状态机:
| 状态 | 含义 | 可转换到 |
|---|---|---|
CREATED |
会话已创建,尚未开始执行 | DEBATING, FAILED |
DEBATING |
正在执行辩论流程(Agent 交互中) | JUDGING, FAILED, TIMED_OUT |
JUDGING |
辩论完成,正在执行裁判评估 | COMPLETED, FAILED |
COMPLETED |
成功完成,结果已存储 | (终态) |
FAILED |
执行失败(API 错误、解析错误等) | DEBATING(重试) |
TIMED_OUT |
超过全局超时限制 | (终态) |
每个状态变化都写入审计日志。如果一场辩论失败,你可以从日志中精确重建:在哪个阶段、哪个 Agent、哪个 API 调用出了问题。
辩论系统存在大量重复计算的机会:
| 缓存层级 | 缓存内容 | TTL | 存储 |
|---|---|---|---|
| L1: 会话结果缓存 | 完整辩论结果(session_id → result) | 1 小时 | 内存 LRU |
| L2: 辩题哈希缓存 | 辩题标准化哈希 → session_id | 1 小时 | Redis / SQLite |
| L3: 裁判评估缓存 | (辩论记录哈希 + 裁判配置哈希) → 评估结果 | 24 小时 | SQLite |
任何不依赖他人输出的 Agent 调用都应该并行。具体规则:
| 阶段 | 并行策略 | 延迟节省 |
|---|---|---|
| 开场论据(双方) | asyncio.gather(pro_opening, con_opening) |
~50% |
| 交叉质询 | 串行(有数据依赖) | N/A |
| 总结陈词(双方) | asyncio.gather(pro_close, con_close) |
~50% |
| 裁判评估(N 位裁判) | asyncio.gather(*judges) |
~75%(对于 4 位裁判) |
| 共识计算 | 同步执行(CPU 密集) | N/A |
两种运行模式,适用不同场景:
| 模式 | 行为 | 适用场景 |
|---|---|---|
| 流式(同步等待) | 用户提交辩题,阻塞等待完整结果后返回 | 交互式分析(用户在仪表盘上发起辩论) |
| 批量(异步提交) | 用户提交辩题,立即返回 session_id,后续轮询或 webhook 通知 | 计划任务(每日市场分析)、大规模批量评估 |
流式模式需要 SSE(Server-Sent Events)来实时推送每个辩论阶段的进展——这不仅改善用户体验,还能让用户在辩论进行到一半时看到某些关键论据并提前介入。
在生产环境中运行辩论系统,你需要在调用 LLM 之前知道大概花多少钱。
| 辩论模式 | LLM 调用次数 | 预估 Token | 预估费用(GPT-4o) | 预估费用(DeepSeek) |
|---|---|---|---|---|
| L1 简单辩论 | ~6 次 | ~8,000 | $0.04 | $0.002 |
| L2 结构化辩论 | ~12 次 | ~25,000 | $0.15 | $0.007 |
| L3 共识辩论(4 裁判) | ~20 次 | ~60,000 | $0.40 | $0.02 |
建议的成本控制策略:
DebateMode.SIMPLE 做快速探索,确认真正关键的问题再升级到 CONSENSUS。辩论系统涉及 6-20 次 LLM API 调用。每一次都可能失败。你需要分类错误并做差异化处理:
| 错误类型 | 示例 | 处理策略 | 最大重试 |
|---|---|---|---|
| 瞬时 API 错误 | 429 (速率限制), 503 (服务不可用) | 指数退避重试: 1s, 2s, 4s, 8s | 4 次 |
| 内容过滤错误 | API 拒绝生成(安全过滤触发) | 标记论据为「被过滤」,继续使用剩余论据 | 0 次(不重试) |
| JSON 解析失败 | LLM 返回格式不符合预期 JSON | 重试(带更严格的格式要求提示词) | 2 次 |
| 超时错误 | 单次调用超过 60s 无响应 | 取消当前调用,重试一次 | 1 次 |
| 认证错误 | 401 (API key 无效) | 不重试——立即告警 | 0 次 |
三重超时保护:
# 每层超时配置
TIMEOUT_SINGLE_CALL = 60 # 单个 LLM API 调用
TIMEOUT_PER_PHASE = 120 # 辩论的单个阶段(如质询)
TIMEOUT_GLOBAL_SESSION = 300 # 整场辩论
如果单个 Agent 调用超时,系统可以:
生产辩论系统需要的监控指标,远超「API 调用成功/失败」:
| 指标类别 | 具体指标 | 告警阈值建议 |
|---|---|---|
| 可用性 | 辩论成功率、失败率、超时率 | 成功率 < 95% 告警 |
| 性能 | P50/P95/P99 辩论完成时间、各阶段耗时分布 | P95 > 120s 告警 |
| 成本 | 每日/每辩论/每用户费用、token 消耗趋势 | 日费用 > 预算 80% 告警 |
| 共识质量 | Alpha/Kappa 分布、不可调和分歧比例 | 不可调和分歧 > 30% 调查 |
| 裁判健康 | 每位裁判的评分均值、标准差、与其他裁判的偏差 | 单裁判连续偏差 > 2σ 告警 |
| 模型可用性 | 各模型的错误率、延迟、速率限制触发频率 | 单模型错误率 > 10% 切换 |
以下代码将 L1-L3 的所有组件包装成一个可部署的生产服务。核心组件:
DebateOrchestrator Optional[Dict]:
"""查询单个会话"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT * FROM sessions WHERE session_id = ?",
(session_id,)
)
row = await cursor.fetchone()
return dict(row) if row else None
async def list_sessions(
self, limit: int = 20, status: str = None
) -> list:
"""列出最近的会话"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
if status:
cursor = await db.execute(
"SELECT * FROM sessions WHERE status = ? "
"ORDER BY created_at DESC LIMIT ?",
(status, limit)
)
else:
cursor = await db.execute(
"SELECT * FROM sessions "
"ORDER BY created_at DESC LIMIT ?",
(limit,)
)
return [dict(row) for row in await cursor.fetchall()]
async def log_event(
self, session_id: str, event_type: EventType,
agent_name: str = None, round_number: int = None,
data: Dict = None
):
"""写入审计事件"""
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
INSERT INTO audit_log
(session_id, event_type, agent_name, round_number,
timestamp, data_json)
VALUES (?, ?, ?, ?, ?, ?)
""", (
session_id, event_type.value, agent_name,
round_number,
datetime.now(timezone.utc).isoformat(),
json.dumps(data, ensure_ascii=False) if data else None,
))
await db.commit()
# ══════════════════════════════════════════════
# 3. 成本追踪器
# ══════════════════════════════════════════════
class CostTracker:
"""Token 计数 + 费用估算 (近似定价,仅供预算参考)"""
# $/1M tokens (输入, 输出)
PRICING: Dict[str, tuple] = {
"gpt-4o": (2.50, 10.00),
"gpt-4o-mini": (0.15, 0.60),
"gpt-4-turbo": (10.00, 30.00),
"claude-3-opus": (15.00, 75.00),
"claude-3-sonnet": (3.00, 15.00),
"deepseek-chat": (0.14, 0.28),
"deepseek-reasoner": (0.55, 2.19),
}
@classmethod
def estimate_cost(
cls, model: str, prompt_tokens: int, completion_tokens: int
) -> float:
"""计算单次 LLM 调用的估算费用"""
in_price, out_price = cls.PRICING.get(model, (5.0, 15.0))
cost = (
(prompt_tokens / 1_000_000) * in_price +
(completion_tokens / 1_000_000) * out_price
)
return round(cost, 6)
@classmethod
def record_call(
cls, costs: Dict[str, CostRecord], model: str,
prompt_tokens: int, completion_tokens: int
):
"""记录一次 LLM 调用到 cost 字典中"""
if model not in costs:
costs[model] = CostRecord(model=model)
c = costs[model]
c.prompt_tokens += prompt_tokens
c.completion_tokens += completion_tokens
c.call_count += 1
c.estimated_cost_usd += cls.estimate_cost(
model, prompt_tokens, completion_tokens
)
@classmethod
def total_cost(cls, costs: Dict[str, CostRecord]) -> float:
"""计算总费用"""
return round(
sum(c.estimated_cost_usd for c in costs.values()), 4
)
# ══════════════════════════════════════════════
# 4. 错误处理与重试
# ══════════════════════════════════════════════
async def with_retry(
fn: Callable,
session: DebateSession,
store: SessionStore,
label: str,
max_retries: int = 3,
base_delay: float = 1.0,
):
"""
为 LLM 调用添加指数退避重试。
根据错误类型决定是否重试和重试次数。
"""
last_error = None
for attempt in range(max_retries + 1):
try:
return await fn()
except Exception as e:
last_error = e
error_str = str(e).lower()
# 不可重试的错误
if "401" in error_str or "403" in error_str:
await store.log_event(
session.session_id, EventType.LLM_ERROR,
agent_name=label,
data={"attempt": attempt + 1, "error": str(e),
"fatal": True}
)
raise
# 内容过滤 — 不重试
if "content_filter" in error_str or "safety" in error_str:
await store.log_event(
session.session_id, EventType.LLM_ERROR,
agent_name=label,
data={"attempt": attempt + 1, "error": str(e),
"type": "content_filter"}
)
return "[内容被安全策略过滤]"
if attempt >= max_retries:
break
delay = base_delay * (2 ** attempt)
await store.log_event(
session.session_id, EventType.RETRY,
agent_name=label,
data={"attempt": attempt + 1, "error": str(e),
"delay_seconds": delay}
)
await asyncio.sleep(delay)
raise last_error
# ══════════════════════════════════════════════
# 5. 辩论编排器 (核心)
# ══════════════════════════════════════════════
class DebateOrchestrator:
"""
生产级辩论编排器。
管理会话生命周期、异步执行、错误恢复、成本追踪。
"""
def __init__(
self,
db_path: str = "debate_sessions.db",
default_model: str = "gpt-4o",
daily_budget_usd: float = 10.0,
):
self.store = SessionStore(db_path)
self.default_model = default_model
self.daily_budget = daily_budget_usd
self.tracker = CostTracker()
self.active_debates: Dict[str, asyncio.Task] = {}
self._daily_spend = 0.0
self._budget_reset_date = datetime.now(timezone.utc).date()
async def start(self):
"""初始化编排器(创建数据库表)"""
await self.store.init()
def _check_budget(self, estimated_cost: float) -> bool:
"""检查是否超出日预算"""
today = datetime.now(timezone.utc).date()
if today != self._budget_reset_date:
self._daily_spend = 0.0
self._budget_reset_date = today
return (self._daily_spend + estimated_cost) <= self.daily_budget
def create_session(
self,
topic: str,
mode: DebateMode = DebateMode.CONSENSUS,
pro_model: str = None,
con_model: str = None,
judge_models: list = None,
timeout_seconds: int = 300,
) -> DebateSession:
"""创建一个新的辩论会话"""
return DebateSession(
session_id=str(uuid.uuid4())[:8],
topic=topic,
mode=mode,
pro_model=pro_model or self.default_model,
con_model=con_model or self.default_model,
judge_models=judge_models or [self.default_model],
created_at=datetime.now(timezone.utc).isoformat(),
timeout_seconds=timeout_seconds,
)
async def run_debate(
self, session: DebateSession
) -> DebateSession:
"""
执行一场完整的辩论。
包含超时保护、错误恢复和预算检查。
"""
start_time = time.time()
# 预算检查
est_cost = self._estimate_session_cost(session)
if not self._check_budget(est_cost):
session.status = SessionStatus.FAILED
session.error = (
f"超出日预算 (${self.daily_budget})。"
f"预估费用 ${est_cost:.4f} + 已用 ${self._daily_spend:.4f}"
)
await self.store.save_session(session)
return session
session.status = SessionStatus.DEBATING
await self.store.save_session(session)
await self.store.log_event(
session.session_id, EventType.DEBATE_STARTED,
data={"mode": session.mode.value, "topic": session.topic}
)
try:
result = await asyncio.wait_for(
self._execute(session),
timeout=session.timeout_seconds,
)
session.result = result
session.status = SessionStatus.COMPLETED
except asyncio.TimeoutError:
session.status = SessionStatus.TIMED_OUT
session.error = (
f"辩论超时 ({session.timeout_seconds}s)"
)
except Exception as e:
if session.retry_count < session.max_retries:
session.retry_count += 1
session.error = str(e)
await self.store.save_session(session)
await self.store.log_event(
session.session_id, EventType.RETRY,
data={"retry": session.retry_count,
"error": str(e)}
)
return await self.run_debate(session)
session.status = SessionStatus.FAILED
session.error = str(e)
finally:
session.completed_at = (
datetime.now(timezone.utc).isoformat()
)
session.elapsed_seconds = round(
time.time() - start_time, 2
)
total_cost = self.tracker.total_cost(session.costs)
self._daily_spend += total_cost
await self.store.save_session(session)
final_event = (
EventType.DEBATE_COMPLETED
if session.status == SessionStatus.COMPLETED
else EventType.DEBATE_FAILED
)
await self.store.log_event(
session.session_id, final_event,
data={
"status": session.status.value,
"elapsed": session.elapsed_seconds,
"cost": total_cost,
"retries": session.retry_count,
}
)
return session
async def _execute(self, session: DebateSession) -> Dict:
"""
核心执行流程 — 根据模式分发。
生产环境中,这里调用 L1-L3 的实际函数。
"""
methods = {
DebateMode.SIMPLE: self._run_simple,
DebateMode.STRUCTURED: self._run_structured,
DebateMode.CONSENSUS: self._run_consensus,
}
runner = methods.get(session.mode, self._run_consensus)
return await runner(session)
async def _run_simple(self, session: DebateSession) -> Dict:
"""L1: 自由式辩论"""
# 生产代码中:
# result = run_debate(
# topic=session.topic, rounds=3,
# pro_model=session.pro_model,
# con_model=session.con_model
# )
await self.store.log_event(
session.session_id, EventType.ROUND_START,
round_number=1
)
return {
"mode": "simple", "topic": session.topic,
"result": "L1 simple debate result placeholder",
"rounds_completed": 3,
}
async def _run_structured(self, session: DebateSession) -> Dict:
"""L2: 结构化辩论 + 单裁判"""
# 生产代码中:
# result = run_structured_debate(
# topic=session.topic,
# pro_model=session.pro_model,
# con_model=session.con_model,
# judge_model=session.judge_models[0]
# )
session.status = SessionStatus.JUDGING
await self.store.save_session(session)
return {
"mode": "structured", "topic": session.topic,
"result": "L2 structured debate result placeholder",
"trace_table": [],
}
async def _run_consensus(self, session: DebateSession) -> Dict:
"""L3: 多裁判共识辩论"""
# 生产代码中:
# pro_args = [...]
# con_args = [...]
# panel = MultiJudgePanel([
# JudgeProfile(name="技术裁判",
# domain=ExpertiseDomain.TECHNICAL),
# JudgeProfile(name="商业裁判",
# domain=ExpertiseDomain.BUSINESS),
# JudgeProfile(name="风险裁判",
# domain=ExpertiseDomain.RISK),
# JudgeProfile(name="综合裁判",
# domain=ExpertiseDomain.GENERAL),
# ])
# result: PanelResult = panel.evaluate(
# topic=session.topic,
# pro_args=pro_args, con_args=con_args,
# pro_cross_text=..., con_cross_text=...,
# pro_closing=..., con_closing=...
# )
# return {
# "mode": "consensus",
# "alpha": result.alpha,
# "kappa": result.kappa,
# "weighted_pro": result.weighted_result["pro"],
# "weighted_con": result.weighted_result["con"],
# "irreconcilable": result.divergence["irreconcilable"],
# "recommendation": result.divergence["recommendation"],
# }
session.status = SessionStatus.JUDGING
await self.store.save_session(session)
return {
"mode": "consensus", "topic": session.topic,
"result": "L3 consensus debate result placeholder",
"alpha": 0.78, "kappa": 0.72,
"irreconcilable": False,
}
def _estimate_session_cost(
self, session: DebateSession
) -> float:
"""预估单场辩论的费用"""
base_tokens = {
DebateMode.SIMPLE: 8_000,
DebateMode.STRUCTURED: 25_000,
DebateMode.CONSENSUS: 60_000,
}
tokens = base_tokens.get(session.mode, 60_000)
model = session.pro_model
in_price, out_price = CostTracker.PRICING.get(
model, (5.0, 15.0)
)
# 粗略估算: 60% 输入, 40% 输出
return round(
(tokens * 0.6 / 1_000_000) * in_price +
(tokens * 0.4 / 1_000_000) * out_price, 4
)
# ── REST-ish API 方法 ──
async def api_create_debate(
self, topic: str, mode: str = "consensus"
) -> Dict:
"""创建并异步启动一场辩论"""
debate_mode = DebateMode(mode)
session = self.create_session(topic=topic, mode=debate_mode)
await self.store.save_session(session)
# 后台执行
task = asyncio.create_task(self.run_debate(session))
self.active_debates[session.session_id] = task
task.add_done_callback(
lambda t: self.active_debates.pop(
session.session_id, None
)
)
return {
"session_id": session.session_id,
"status": "accepted",
"mode": session.mode.value,
"topic": session.topic,
"created_at": session.created_at,
"poll_url": f"/debates/{session.session_id}",
}
async def api_get_result(
self, session_id: str
) -> Dict:
"""查询辩论状态/结果"""
data = await self.store.get_session(session_id)
if not data:
return {"error": "Session not found"}
return data
async def api_estimate_cost(
self, topic: str, mode: str = "consensus"
) -> Dict:
"""预估费用(不执行辩论)"""
session = self.create_session(
topic=topic, mode=DebateMode(mode)
)
cost = self._estimate_session_cost(session)
return {
"topic": topic, "mode": mode,
"estimated_cost_usd": cost,
"daily_budget_remaining": round(
self.daily_budget - self._daily_spend, 4
),
}
async def api_get_metrics(self) -> Dict:
"""获取监控指标"""
sessions = await self.store.list_sessions(limit=200)
total = len(sessions)
if total == 0:
return {"total_sessions": 0}
completed = sum(
1 for s in sessions
if s["status"] == "completed"
)
failed = sum(
1 for s in sessions
if s["status"] == "failed"
)
timed_out = sum(
1 for s in sessions
if s["status"] == "timed_out"
)
times = [
s["elapsed_seconds"] for s in sessions
if s["elapsed_seconds"] and s["elapsed_seconds"] > 0
]
avg_time = (
sum(times) / len(times) if times else 0
)
sorted_times = sorted(times) if times else [0]
return {
"total_sessions": total,
"completed": completed,
"failed": failed,
"timed_out": timed_out,
"completion_rate_pct": round(
completed / total * 100, 1
),
"avg_duration_seconds": round(avg_time, 1),
"p95_duration_seconds": round(
sorted_times[
int(len(sorted_times) * 0.95)
] if len(sorted_times) >= 20
else sorted_times[-1] if sorted_times else 0,
1,
),
"active_debates": len(self.active_debates),
"daily_spend_usd": round(self._daily_spend, 4),
"budget_remaining": round(
self.daily_budget - self._daily_spend, 4
),
}
async def api_health(self) -> Dict:
"""健康检查"""
return {
"status": "healthy",
"active_debates": len(self.active_debates),
"daily_spend": round(self._daily_spend, 4),
}
# ══════════════════════════════════════════════
# 6. 快速启动示例
# ══════════════════════════════════════════════
async def quick_start_demo():
"""演示如何启动和使用编排器"""
orch = DebateOrchestrator(
db_path="debate_sessions.db",
default_model="gpt-4o",
daily_budget_usd=10.0,
)
await orch.start()
print("✅ 编排器已启动\n")
# ── 1. 预估费用 ──
est = await orch.api_estimate_cost(
"初创公司是否应从第一天就采用微服务?",
mode="consensus"
)
print(f"💰 费用预估: ${est['estimated_cost_usd']}")
print(f" 今日剩余预算: ${est['daily_budget_remaining']}\n")
# ── 2. 创建并启动辩论 ──
debate = await orch.api_create_debate(
topic="初创公司是否应从第一天就采用微服务?",
mode="consensus"
)
print(
f"🚀 辩论已启动: {debate['session_id']} "
f"({debate['mode']})"
)
# ── 3. 轮询等待结果 ──
for i in range(10):
await asyncio.sleep(3)
result = await orch.api_get_result(debate["session_id"])
status = result.get("status", "unknown")
print(f" [{i+1}] 状态: {status}")
if status in ("completed", "failed", "timed_out"):
print(f" 耗时: {result.get('elapsed_seconds', 0)}s")
if "error" in result and result["error"]:
print(f" 错误: {result['error']}")
break
# ── 4. 查看指标 ──
metrics = await orch.api_get_metrics()
print(f"\n📊 系统指标:")
print(f" 总辩论: {metrics['total_sessions']}")
print(f" 成功率: {metrics['completion_rate_pct']}%")
print(f" 平均时长: {metrics['avg_duration_seconds']}s")
print(f" 今日费用: ${metrics['daily_spend_usd']}")
# ── 5. 健康检查 ──
health = await orch.api_health()
print(f"🫀 健康: {health['status']}")
if __name__ == "__main__":
print("=" * 60)
print("🚀 多 Agent 辩论系统 — 生产编排器")
print("=" * 60)
print()
print("要运行快速演示 (需要有效的 LLM API 凭证):")
print(" asyncio.run(quick_start_demo())")
print()
print("要部署为 Web 服务,请将 DebateOrchestrator 的方法")
print("包装到 FastAPI/Flask 路由中。示例:")
print()
print(" from fastapi import FastAPI")
print(" app = FastAPI()")
print(" orch = DebateOrchestrator()")
print()
print(" @app.post('/debates')")
print(" async def create(topic: str, mode: str = 'consensus'):")
print(" return await orch.api_create_debate(topic, mode)")
print()
print(" @app.get('/debates/{session_id}')")
print(" async def get_result(session_id: str):")
print(" return await orch.api_get_result(session_id)")
print("=" * 60)
| 组件 | 功能 | 关键方法 |
|---|---|---|
DebateSession |
辩论会话数据模型——包含全部生命周期状态 | 字段: session_id, topic, mode, status, costs, result, error |
SessionStore |
SQLite 持久化 + 审计日志 | init() / save_session() / log_event() |
CostTracker |
多模型定价表 + token 计数 + 费用估算 | record_call() / total_cost() |
with_retry() |
指数退避重试,根据错误类型差异化处理 | 区分瞬时错误、内容过滤、认证错误 |
DebateOrchestrator |
核心编排器——管理会话生命周期 + L1-L3 集成 + REST API | run_debate() / api_create_debate() / api_get_metrics() |
DebateOrchestrator 的三个方法——_run_simple()、_run_structured()、_run_consensus()——它们目前返回占位数据。在生产部署时,只需取消注释其中的 import 和调用,即可将 L1-L3 的完整逻辑接入。编排器层(超时、重试、日志、状态管理)与辩论逻辑(L1-L3)是完全解耦的。
所有 Agent 和裁判使用同一个模型(如 GPT-4o),跑在一台服务器上。最简单,适合团队内部的决策辅助工具。
不同角色使用不同的模型提供商:
| 角色 | 推荐模型 | 原因 |
|---|---|---|
| 正方 Agent | Claude 3.5 Sonnet | 擅长构建结构化论证,逻辑清晰 |
| 反方 Agent | GPT-4o | 擅长识别漏洞和提出反例 |
| 技术裁判 | Claude 3.5 Sonnet | 在技术细节评估上更精准 |
| 商业裁判 | GPT-4o | 商业推理和数据分析能力更强 |
| 风险裁判 | Gemini 2.0 | 提供不同的风险视角,减少同质化判断 |
多模型部署的核心价值不是「选最好的模型做所有事」,而是通过模型多样性降低系统性偏误——这与 L3 的多裁判差异化设计是同一个原理。
对于关键决策(预算 > $100k、涉及法律合规、影响大量用户),辩论系统不应自动输出最终结论。它应该:
如果你想从本文带走一个核心认知,那就是:
一个生产级的辩论系统不是代码——它是一个信息处理流水线,其中每一阶段都必须是可观测的、可容错的和成本可控的。
具体来说:
当你把这三点做到位,辩论系统就从「一个有趣的 AI 实验」变成了「一个可以被组织依赖的决策基础设施」。
这是多 Agent 辩论系列的第四篇,也是最后一篇。回顾我们走过的路:
| 篇章 | 标题 | 核心贡献 | 产出 |
|---|---|---|---|
| L1 | 为什么辩论比单一回答更可靠 | 揭示单一模型的认知偏误(确认偏误、锚定效应、过度自信),证明对抗协作的价值 | debate.py — 双 Agent 自由式辩论 |
| L2 | 结构化辩论协议 | 设计 3 轮辩论协议(开场→质询→总结),引入多维度评分和论据追溯表 | debate_protocol.py — 结构化辩论 + 裁判 Agent |
| L3 | 辩论的评分与共识 | 多裁判专家面板、评分校准、加权投票、Krippendorff Alpha + Fleiss Kappa 共识度量 | debate_consensus.py — 多裁判共识系统 |
| L4 | 生产部署(本文) | 将 L1-L3 包装为可部署的生产服务:异步编排、会话存储、错误恢复、成本控制、监控 | debate_orchestrator.py — 生产级编排器 |
回头看,这个系列遵循了一个自然的进阶路径:
这个「为什么 → 怎么做 → 自我质疑 → 落地」的思维弧,不仅适用于辩论系统——它适用于任何从 AI 原型到 AI 产品的过程。
即使经过了四篇文章,我们仍然有重要的未解问题——它们超出了当前系列的范畴,但值得你在自己的实践中思考:
📎 系列说明:本文是多 Agent 辩论系列的终篇(第 4 篇)。建议按顺序阅读:L1:对抗协作入门 → L2:结构化辩论协议 → L3:辩论的评分与共识 → 本文 L4。
🏁 本系列完结。返回 AI 智能体探索 查看更多文章。