In L2: Structured Debate Protocol, we gave the judge a "ruler" — four scoring dimensions (logic, evidence, responsiveness, honesty), each argument scored independently. That's far more reliable than L1's free-form judging.
But it introduces a new problem: what if the judge itself is unreliable?
More specifically: how do we know two judges' scores are even comparable? How do we measure how much a panel of judges actually agrees? And when agreement is too low, when should the system refuse to declare a winner?
These questions share a common name: inter-rater reliability. In human evaluation domains (clinical diagnosis, academic peer review, judicial decisions), this has been studied for over half a century — but for AI agent debate systems, it's only beginning to be taken seriously.
This article's goal: transplant mature methodologies from human evaluation into the judging layer of multi-agent debate systems.
Don't get me wrong — L2's StructuredJudge works well in single-judge scenarios. The problem is single point of failure. No matter how detailed your scoring rubric, the judge is still an LLM — with its own knowledge blind spots, preferences, and randomness.
| Problem | Description | Consequence |
|---|---|---|
| Calibration Bias | Each judge has their own "scoring habits" — some favor 7-9, others 4-7 | Raw scores can't be compared across judges directly |
| Domain Blind Spots | Judges lack deep knowledge in certain technical domains and can't assess technical accuracy | Technical arguments get surface-level scoring instead of substantive evaluation |
| Single Perspective | One judge can only view the problem from one angle (technical, business, risk, ethics) | Important cross-dimensional trade-offs are missed |
These three problems aren't LLM-specific. Human judges have exactly the same issues — that's why academic journals use 2-4 reviewers, courts use juries, and competitive sports use multiple judges with trimmed extremes.
Before discussing multi-judge systems, let's solve the fundamental problem: how do we make different judges' scores comparable?
Say you have two judges scoring the same arguments:
| Argument | Judge A (Strict) | Judge B (Lenient) |
|---|---|---|
| PRO-1: Logic | 5 | 8 |
| PRO-2: Logic | 6 | 9 |
| PRO-3: Logic | 7 | 10 |
Judge A's scores are consistently 2-3 points lower. Yet both judges' rank ordering is identical: PRO-3 > PRO-2 > PRO-1. This tells us:
The absolute value of raw scores doesn't matter — what matters is relative ordering and standardized gaps.
Standardize each judge's scores independently:
z_score = (raw_score - judge_mean) / judge_stddev
After standardization, every judge's score distribution has mean = 0 and standard deviation = 1. You can now directly compare: a Z-score of -1.5 always means "significantly below this judge's average," regardless of which judge it came from.
Pros: Eliminates individual scoring-habit differences.
Cons: If a judge only scored a few arguments (e.g., 3), the mean estimate is unreliable.
Compress scores to [0, 1]:
normalized = (raw_score - judge_min) / (judge_max - judge_min)
Pros: Simple, intuitive, no distribution assumptions.
Cons: Extreme values severely affect normalization — a single 10 and a single 1 will squeeze all middle scores together.
| Scenario | Recommended | Reason |
|---|---|---|
| Short debate (≤ 3 arguments per side) | Min-Max | Too few scores for reliable mean/std estimation |
| Long debate (≥ 5 arguments per side) | Z-Score | Sufficient samples for accurate distribution estimation |
| Cross-debate comparison | Z-Score | Different debates have different score ranges; Z-Score enables cross-debate comparison |
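As a quick illustration, here is a minimal sketch using the Judge A/B scores from the table above. It needs only the standard library; the full `ScoreCalibrator` later in this article does the same thing with NumPy.

```python
# Minimal calibration sketch: the strict and lenient judges from the
# table above become directly comparable after Z-Score normalization.
from statistics import mean, stdev

def z_score(scores):
    m, s = mean(scores), stdev(scores)
    return [(x - m) / s for x in scores] if s else [0.0] * len(scores)

def min_max(scores):
    lo, hi = min(scores), max(scores)
    return [(x - lo) / (hi - lo) for x in scores] if hi != lo else [0.5] * len(scores)

judge_a = [5, 6, 7]    # strict judge (raw logic scores)
judge_b = [8, 9, 10]   # lenient judge (raw logic scores)

print(z_score(judge_a))   # [-1.0, 0.0, 1.0]
print(z_score(judge_b))   # [-1.0, 0.0, 1.0]  -> identical after calibration
print(min_max(judge_a))   # [0.0, 0.5, 1.0]
```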
After calibration, the next question: how much do these judges actually agree?
If you have three judges scoring the same set of arguments, you need a single number to quantify their agreement. This is what inter-rater reliability answers.
The two most commonly used metrics:
| Metric | Data Type | Notes |
|---|---|---|
| Krippendorff's Alpha | Interval/ratio data (e.g., 1-10 scores) | The most general reliability metric — supports multiple raters, multiple data types, and missing values |
| Fleiss' Kappa | Categorical data (e.g., UPHELD/REFUTED/UNCERTAIN) | Measures categorical agreement — ideal for argument trace-table status judgments |
For our debate scenario, we need both: Alpha for numerical scores (logic/evidence/responsiveness/honesty), Kappa for categorical judgments (is the argument UPHELD, REFUTED, or UNCERTAIN?).
Interpreting Alpha:
| α Range | Interpretation | Action |
|---|---|---|
| α ≥ 0.80 | High agreement | Reliable — aggregate scores and decide |
| 0.67 ≤ α < 0.80 | Moderate agreement | Acceptable — aggregate but flag high-variance items |
| 0.50 ≤ α < 0.67 | Low agreement | Caution — analyze divergence sources; don't blindly average |
| α < 0.50 | Unacceptable | Flag as irreconcilable — don't rush to conclusions; need more info or human intervention |
One judge isn't enough — you need a judge panel. But a panel isn't just running the same judge three times — you need differentiated judge roles.
Based on decision scenarios, we define four complementary judge roles:
| Role | Expertise Domain | Focus | Dimension Emphasis |
|---|---|---|---|
| Technical Judge | Implementation, architecture | Technical accuracy, implementation feasibility | Logic 40%, Evidence 35% |
| Business Judge | Business model, ROI, cost-benefit | Business soundness and cost implications | Evidence 40%, Logic 20% |
| Risk Judge | Risk assessment, edge cases, failure modes | Whether arguments overlook hidden risks and boundary conditions | Responsiveness 35%, Honesty 30% |
| General Judge | Holistic evaluation, balancing all factors | Overall debate quality and information completeness | Standard weights (L2 defaults) |
Different dimension emphasis means: the Technical judge weights logically rigorous arguments higher, while the Business judge weights arguments with concrete ROI data higher. This isn't favoritism — it's targeted differentiation.
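To make "dimension emphasis" concrete, here is a minimal sketch. The raw scores are made up for illustration; the weights follow the Technical and Business judge profiles used in the code later in this article.

```python
# The same raw scores produce different weighted totals under different
# judge profiles: a logic-heavy, evidence-light argument fares better
# with the Technical judge than with the Business judge.
raw = {"logic": 9, "evidence": 5, "responsiveness": 6, "honesty": 7}

technical_weights = {"logic": 0.40, "evidence": 0.35, "responsiveness": 0.15, "honesty": 0.10}
business_weights  = {"logic": 0.20, "evidence": 0.40, "responsiveness": 0.20, "honesty": 0.20}

def weighted_total(scores: dict, weights: dict) -> float:
    return sum(scores[d] * weights[d] for d in scores)

print(weighted_total(raw, technical_weights))  # 6.95
print(weighted_total(raw, business_weights))   # 6.40
```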
Multi-judge evaluation isn't "run them in parallel and average." It follows a strict execution order:

1. Independent scoring: each judge evaluates the full debate without seeing the others' scores.
2. Score calibration: each judge's raw scores are normalized (Z-Score by default).
3. Weighted voting: calibrated scores are combined using per-judge weights.
4. Consensus calculation: Krippendorff's Alpha and Fleiss' Kappa quantify how much the judges agree.
5. Divergence detection: irreconcilable disagreement is flagged instead of papered over.
Not all judges are equal. We introduce two layers of weighting:
Different judges have different authority for different types of debates:
| Debate Topic Type | Technical Weight | Business Weight | Risk Weight | General Weight |
|---|---|---|---|---|
| Tech Stack Decisions | 0.35 | 0.20 | 0.25 | 0.20 |
| Business Decisions | 0.20 | 0.40 | 0.20 | 0.20 |
| Security/Compliance | 0.20 | 0.15 | 0.45 | 0.20 |
These weights shouldn't be arbitrary — they should be automatically matched by keywords in the debate topic. For instance, when a topic contains "architecture," "tech stack," or "performance," the Technical judge's weight is automatically raised.
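Here is a stripped-down sketch of that keyword matching; the full `WeightedVoter.detect_topic_type()` in the code below uses longer keyword lists, and the abbreviated lists here are for illustration only.

```python
TOPIC_KEYWORDS = {
    "architecture": ["architecture", "tech stack", "microservice", "kubernetes"],
    "business": ["roi", "cost", "revenue", "pricing"],
    "security": ["security", "compliance", "privacy", "vulnerability"],
}

def detect_topic_type(topic: str) -> str:
    """Count keyword hits per topic type; fall back to 'default' on no hits."""
    topic_lower = topic.lower()
    hits = {t: sum(kw in topic_lower for kw in kws) for t, kws in TOPIC_KEYWORDS.items()}
    best = max(hits, key=hits.get)
    return best if hits[best] > 0 else "default"

print(detect_topic_type("Should a 10-person startup adopt a microservice architecture?"))
# -> "architecture": the Technical judge's weight is raised to 0.35
```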
This is a more advanced mechanism — tracking each judge's performance across historical debates and dynamically adjusting weights.
How do we assess judge accuracy? A practical approach: run occasional calibration debates whose correct answer is already known (for example, a past decision whose outcome you have since validated), record each judge's vote, and keep a running accuracy rate of total_correct / calibrations_completed. This is exactly what the update_judge_accuracy() helper in the code below does, and the resulting accuracy feeds into each judge's final weight.
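In the code below, the two weight layers are blended with a fixed 0.7/0.3 split:

```
final_weight = 0.7 * domain_relevance + 0.3 * historical_accuracy
```

For example, a Technical judge on an architecture topic (domain weight 0.35) with 8 of 10 calibration debates judged correctly (accuracy 0.80) gets final_weight = 0.7 × 0.35 + 0.3 × 0.80 = 0.485.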
This is the most technically substantive section of the article. Let's break down the mathematical principles and code implementations of both metrics.
Alpha's core idea:
α = 1 - (observed disagreement / expected random disagreement)
α = 1 - (D_o / D_e)
Where:
D_o = actual observed disagreement between judges (sum of weighted squared differences)
D_e = expected disagreement if judges scored randomly
Alpha = 1.0 means perfect agreement (observed disagreement is 0). Alpha = 0 means agreement is no better than random. Alpha can be negative — indicating systematic disagreement (judges have an inverse relationship).
For interval data (our 1-10 scores), disagreement is measured by squared difference: if two judges score the same argument N points apart, that contributes N² to the disagreement weight. A 1-point gap is a small disagreement; a 5-point gap is severe.
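A small worked example makes the formula concrete (two judges, two arguments; numbers chosen for illustration):

```
Judge A scores: [7, 3]    Judge B scores: [8, 4]   (B is consistently 1 point higher)

D_o = mean squared gap over matched pairs
    = ((7-8)^2 + (3-4)^2) / 2 = 1.0
D_e = mean squared gap over all pairs of distinct observations {7, 8, 3, 4}
    = 2*(1 + 16 + 9 + 25 + 16 + 1) / (4*3) ≈ 11.33
α   = 1 - D_o / D_e = 1 - 1.0 / 11.33 ≈ 0.91   (high agreement despite the constant offset)
```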
Kappa is for categorical data. In our scenario, it measures how consistently multiple judges classify arguments into final statuses (UPHELD / PARTIALLY_UPHELD / REFUTED / UNCERTAIN).
κ = (P_o - P_e) / (1 - P_e)
Where:
P_o = observed proportion of agreement between judges
P_e = expected proportion of agreement if judges classified randomly
Unlike Alpha, Kappa only cares about "is the classification the same?" and not "how different are they?" Two judges classifying the same argument as UPHELD vs. PARTIALLY_UPHELD counts the same as UPHELD vs. REFUTED — both are "disagreement."
This aligns with real-world decision-making: for most decision-makers, the key question is whether judges agree on the conclusion — is this argument "upheld" or not?
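A tiny worked example (three judges, two arguments) shows how the chance correction works:

```
Argument 1: UPHELD, UPHELD, UPHELD      → P_1 = (3·2) / (3·2) = 1.00
Argument 2: UPHELD, REFUTED, REFUTED    → P_2 = (1·0 + 2·1) / (3·2) ≈ 0.33

P_o = mean(P_1, P_2) ≈ 0.67
p(UPHELD) = 4/6, p(REFUTED) = 2/6
P_e = (4/6)^2 + (2/6)^2 ≈ 0.56
κ   = (P_o - P_e) / (1 - P_e) ≈ (0.67 - 0.56) / 0.44 ≈ 0.25
```

Even though the judges agree on two-thirds of pairings, κ is only 0.25: with just two dominant categories, much of that agreement is expected by chance.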
| Metric | Data Type | Disagreement Measure | Usage in Our Code |
|---|---|---|---|
| Krippendorff's α | 1-10 numeric scores | Squared difference (degree-sensitive) | Measures scoring consistency (logic/evidence/responsiveness/honesty) |
| Fleiss' κ | Categories (UPHELD / REFUTED / …) | Equality (binary) | Measures argument-trace classification consistency |
This is the multi-judge system's most critical capability: the courage to say "I don't know."
Conditions that trigger irreconcilable disagreement (thresholds are the defaults used in the code below):

- Both Alpha and Kappa fall below their thresholds (α < 0.50 and κ < 0.40): judges disagree on both numeric scoring and categorical classification.
- One or more arguments show excessive inter-judge score variance (variance ≥ 3.0 on calibrated scores).
- The weighted score gap between Pro and Con is below the decision threshold (gap < 0.05): the sides are too close to call.
When irreconcilable disagreement is triggered, the system must NOT force a "winner." It should:

- report the disagreement explicitly, naming the specific arguments the judges diverge on;
- recommend acquiring additional data or external information;
- escalate to human domain experts instead of basing a critical decision on the debate result.
A system's honesty matters more than its confidence. A system bold enough to say "our judges disagree significantly — this question requires your personal judgment" is far more reliable than one that forcibly averages and outputs "Pro wins 5.67 vs. 5.32."
The code below extends L2's debate_protocol.py. It adds four core components:
- Judge panel (`MultiJudgePanel`): manages multiple differentiated judges; scores independently, then synthesizes results.
- Score calibrator (`ScoreCalibrator`): Z-Score and Min-Max normalization.
- Weighted voter (`WeightedVoter`): dual-layer weighting (domain relevance + historical accuracy).
- Consensus calculator (`ConsensusCalculator`): Krippendorff's Alpha + Fleiss' Kappa + divergence detection.

Save as `debate_consensus.py`, in the same directory as L1's `debate.py` and L2's `debate_protocol.py`.
"""
Debate Consensus System — Multi-Judge Panel + Score Calibration
+ Weighted Voting + Consensus Metrics
Extends L2's debate_protocol.py with multi-judge orchestration
and consensus calculation.
Requires: pip install openai numpy
"""
import os
import json
import math
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from enum import Enum
import numpy as np
from openai import OpenAI
# ──────────────────────────────────────────────
# Import from L2 (if debate_protocol.py is in the same directory)
# ──────────────────────────────────────────────
# from debate_protocol import (
# Argument, CrossExamResponse, ScoringResult,
# StructuredDebateAgent, StructuredJudge,
# run_structured_debate
# )
#
# The following redefines core types for self-contained execution.
# ──────────────────────────────────────────────
client = OpenAI(
api_key="your-api-key",
base_url="https://api.example.com/v1"
)
@dataclass
class Argument:
"""Structured argument (reusing L2 definition)"""
id: str
claim: str
reasoning: str
evidence: str
def to_dict(self) -> dict:
return {
"id": self.id, "claim": self.claim,
"reasoning": self.reasoning, "evidence": self.evidence
}
def to_text(self) -> str:
return (
f"[{self.id}] Claim: {self.claim}\n"
f"Reasoning: {self.reasoning}\n"
f"Evidence: {self.evidence}"
)
# ──────────────────────────────────────────────
# 1. Judge Profile — defines differentiated roles
# ──────────────────────────────────────────────
class ExpertiseDomain(Enum):
TECHNICAL = "technical"
BUSINESS = "business"
RISK = "risk"
GENERAL = "general"
@dataclass
class JudgeProfile:
"""Defines a judge's expertise domain and scoring preferences"""
name: str
domain: ExpertiseDomain
# Custom dimension weights for this judge
dimension_weights: dict = field(default_factory=lambda: {
"logic": 0.30, "evidence": 0.30,
"responsiveness": 0.25, "honesty": 0.15
})
# Historical accuracy tracking
historical_accuracy: float = 1.0 # defaults to 1.0 (uncalibrated)
calibrations_completed: int = 0
total_correct: int = 0
def get_system_prompt(self, topic: str) -> str:
"""Generate a role-specific system prompt"""
base = (
f"You are a strictly impartial debate judge. Your area of "
f"expertise is [{self._domain_label()}].\n"
f"Topic: \"{topic}\"\n\n"
)
domain_instructions = {
ExpertiseDomain.TECHNICAL: (
"You focus especially on technical feasibility and "
"architectural soundness. You scrutinize technical "
"details and implementation paths. You do not accept "
"vague technical promises without concrete "
"implementation plans."
),
ExpertiseDomain.BUSINESS: (
"You focus especially on business soundness and "
"cost-benefit analysis. You rigorously examine ROI "
"data, market arguments, and resource efficiency. "
"You do not accept business claims without "
"quantitative support."
),
ExpertiseDomain.RISK: (
"You focus especially on risks and boundary conditions. "
"You actively look for overlooked risk factors, "
"assumptions, and failure modes. Your concern is not "
"\"the optimal solution under ideal conditions,\" but "
"\"whether the worst case is acceptable.\""
),
ExpertiseDomain.GENERAL: (
"You perform a comprehensive evaluation, balancing "
"technical, business, and risk factors. You focus on "
"overall debate quality and argument completeness."
),
}
base += domain_instructions[self.domain]
base += (
"\n\n### Scoring Rules\n"
"Score each opening argument on these four dimensions "
"(1-10 integer):\n"
f"1. logic_score: Is the reasoning chain self-consistent? "
f"(Your weight for this dimension: "
f"{self.dimension_weights['logic']})\n"
f"2. evidence_score: Is evidence specific and verifiable? "
f"(Weight: {self.dimension_weights['evidence']})\n"
f"3. responsiveness_score: Response quality to "
f"cross-examination? "
f"(Weight: {self.dimension_weights['responsiveness']})\n"
f"4. honesty_score: Honesty level? "
f"(Weight: {self.dimension_weights['honesty']})\n\n"
"For each argument, also provide a final standing: "
"UPHELD | PARTIALLY_UPHELD | REFUTED | UNCERTAIN\n\n"
"### Output Format\n"
"Output strictly as the following JSON, no other text:\n"
'{\n'
' "scores": [\n'
' {\n'
' "argument_id": "PRO-1",\n'
' "logic_score": 8,\n'
' "evidence_score": 7,\n'
' "responsiveness_score": 6,\n'
' "honesty_score": 8,\n'
' "standing": "PARTIALLY_UPHELD",\n'
' "notes": "Brief comment"\n'
' }\n'
' ],\n'
' "overall": {\n'
' "pro_total_raw": 0.0,\n'
' "con_total_raw": 0.0,\n'
' "key_finding": "Most important finding (1-2 sentences)"\n'
' }\n'
'}'
)
return base
def _domain_label(self) -> str:
labels = {
ExpertiseDomain.TECHNICAL: "Technical Implementation",
ExpertiseDomain.BUSINESS: "Business Analysis",
ExpertiseDomain.RISK: "Risk Assessment",
ExpertiseDomain.GENERAL: "General Evaluation",
}
return labels[self.domain]
# ──────────────────────────────────────────────
# 2. Score Calibrator
# ──────────────────────────────────────────────
class ScoreCalibrator:
"""Normalizes raw scores across multiple judges"""
@staticmethod
def z_score_normalize(scores: list[float]) -> list[float]:
"""
Z-Score normalization: center at 0, scale to unit variance.
If all scores are identical (std = 0), returns all zeros.
"""
if len(scores) < 2:
return [0.0] * len(scores)
mean = np.mean(scores)
std = np.std(scores, ddof=1) # sample standard deviation
if std == 0:
return [0.0] * len(scores)
return [(s - mean) / std for s in scores]
@staticmethod
def minmax_normalize(scores: list[float]) -> list[float]:
"""
Min-Max normalization: compress to [0, 1] interval.
If all scores are identical, returns all 0.5.
"""
if len(scores) < 2:
return [0.5] * len(scores)
min_val = min(scores)
max_val = max(scores)
if max_val == min_val:
return [0.5] * len(scores)
return [(s - min_val) / (max_val - min_val) for s in scores]
@classmethod
def calibrate_all(
cls,
judge_raw_scores: dict[str, list[float]],
method: str = "zscore"
) -> dict[str, list[float]]:
"""
Calibrate raw scores for all judges.
Args:
judge_raw_scores: {judge_name: [raw scores for all arguments]}
method: "zscore" or "minmax"
Returns:
{judge_name: [calibrated scores]}
"""
calibrator = (
cls.z_score_normalize if method == "zscore"
else cls.minmax_normalize
)
return {
judge_name: calibrator(scores)
for judge_name, scores in judge_raw_scores.items()
}
# ──────────────────────────────────────────────
# 3. Weighted Voter
# ──────────────────────────────────────────────
class WeightedVoter:
"""Computes weighted scores using domain relevance + historical accuracy"""
# Debate topic type → domain weights for each judge role
TOPIC_WEIGHTS = {
"architecture": {
ExpertiseDomain.TECHNICAL: 0.35,
ExpertiseDomain.BUSINESS: 0.20,
ExpertiseDomain.RISK: 0.25,
ExpertiseDomain.GENERAL: 0.20,
},
"business": {
ExpertiseDomain.TECHNICAL: 0.20,
ExpertiseDomain.BUSINESS: 0.40,
ExpertiseDomain.RISK: 0.20,
ExpertiseDomain.GENERAL: 0.20,
},
"security": {
ExpertiseDomain.TECHNICAL: 0.20,
ExpertiseDomain.BUSINESS: 0.15,
ExpertiseDomain.RISK: 0.45,
ExpertiseDomain.GENERAL: 0.20,
},
"default": {
ExpertiseDomain.TECHNICAL: 0.25,
ExpertiseDomain.BUSINESS: 0.25,
ExpertiseDomain.RISK: 0.25,
ExpertiseDomain.GENERAL: 0.25,
},
}
# Keywords → topic type mapping
TOPIC_KEYWORDS = {
"architecture": [
"architecture", "tech stack", "microservice", "monolith",
"framework", "database", "kubernetes", "container",
"deployment", "scalability"
],
"business": [
"ROI", "cost", "revenue", "business", "pricing",
"market", "investment", "profit", "budget", "valuation"
],
"security": [
"security", "compliance", "risk", "privacy", "data protection",
"incident", "vulnerability", "encryption", "audit",
"regulatory"
],
}
@classmethod
def detect_topic_type(cls, topic: str) -> str:
"""Auto-detect debate topic type from keywords"""
topic_lower = topic.lower()
scores = {}
for topic_type, keywords in cls.TOPIC_KEYWORDS.items():
scores[topic_type] = sum(
1 for kw in keywords if kw.lower() in topic_lower
)
best = max(scores, key=scores.get)
return best if scores[best] > 0 else "default"
@classmethod
def get_domain_weight(
cls, profile: JudgeProfile, topic: str
) -> float:
"""
Compute a judge's domain relevance weight for a given topic.
Returns:
Float between 0.0 and 1.0, combining domain relevance (70%)
and historical accuracy (30%).
"""
topic_type = cls.detect_topic_type(topic)
weights = cls.TOPIC_WEIGHTS.get(
topic_type, cls.TOPIC_WEIGHTS["default"]
)
domain_weight = weights.get(profile.domain, 0.20)
# Incorporate historical accuracy (if calibration data exists)
accuracy_weight = profile.historical_accuracy
if profile.calibrations_completed == 0:
accuracy_weight = 1.0 # no history → no adjustment
# Final weight = 0.7 × domain relevance + 0.3 × historical accuracy
return 0.7 * domain_weight + 0.3 * accuracy_weight
@classmethod
def compute_weighted_scores(
cls,
judges: list[JudgeProfile],
calibrated_scores: dict[str, dict[str, float]],
topic: str
) -> dict:
"""
Compute weighted scores across all judges.
Returns:
{"pro": pro_weighted_total, "con": con_weighted_total,
"pro_details": {...}, "con_details": {...}}
"""
judge_name_to_profile = {j.name: j for j in judges}
pro_weighted = 0.0
con_weighted = 0.0
total_weight = 0.0
pro_details = defaultdict(float)
con_details = defaultdict(float)
for judge_name, arg_scores in calibrated_scores.items():
profile = judge_name_to_profile.get(judge_name)
if not profile:
continue
weight = cls.get_domain_weight(profile, topic)
for arg_id, score in arg_scores.items():
weighted = score * weight
total_weight += weight
if arg_id.startswith("PRO"):
pro_weighted += weighted
pro_details[judge_name] += weighted
elif arg_id.startswith("CON"):
con_weighted += weighted
con_details[judge_name] += weighted
# Normalize
if total_weight > 0:
pro_weighted /= total_weight
con_weighted /= total_weight
return {
"pro": round(pro_weighted, 3),
"con": round(con_weighted, 3),
"pro_details": dict(pro_details),
"con_details": dict(con_details),
"topic_type": WeightedVoter.detect_topic_type(topic),
}
# ──────────────────────────────────────────────
# 4. Consensus Calculator
# ──────────────────────────────────────────────
class ConsensusCalculator:
"""Computes Krippendorff's Alpha and Fleiss' Kappa"""
@staticmethod
def krippendorff_alpha(
reliability_data: list[list[float]],
metric: str = "interval"
) -> float:
"""
Compute Krippendorff's Alpha.
Args:
reliability_data: Matrix of shape (n_judges, n_items).
Each row is one judge's scores for all arguments.
NaN indicates missing values.
metric: Distance metric.
"interval" - squared difference (for 1-10 interval scores)
"nominal" - equality (for categorical data)
Returns:
Alpha value. 1.0 = perfect agreement, 0 = chance agreement,
can be negative.
"""
data = np.array(reliability_data, dtype=float)
n_raters, n_items = data.shape
if n_raters < 2 or n_items < 2:
return 1.0
# Distance function
if metric == "nominal":
def delta(a, b):
return 0.0 if a == b else 1.0
else: # interval
def delta(a, b):
return (a - b) ** 2
# Compute observed disagreement D_o
D_o = 0.0
n_pairs = 0
for i in range(n_items):
for r1 in range(n_raters):
if np.isnan(data[r1, i]):
continue
for r2 in range(r1 + 1, n_raters):
if np.isnan(data[r2, i]):
continue
D_o += delta(data[r1, i], data[r2, i])
n_pairs += 1
if n_pairs == 0:
return 1.0
D_o /= n_pairs
# Compute expected disagreement D_e (average over all value pairs)
all_values = []
for i in range(n_items):
for r in range(n_raters):
if not np.isnan(data[r, i]):
all_values.append(data[r, i])
if len(all_values) < 2:
return 1.0
        D_e = 0.0
        n_val_pairs = 0
        # Average over all ordered pairs of distinct observations:
        # Krippendorff's expected disagreement uses n*(n-1) pairs,
        # so a value is never paired with itself.
        for idx1, v1 in enumerate(all_values):
            for idx2, v2 in enumerate(all_values):
                if idx1 == idx2:
                    continue
                D_e += delta(v1, v2)
                n_val_pairs += 1
if n_val_pairs == 0 or D_e == 0:
return 1.0 if D_o == 0 else 0.0
D_e /= n_val_pairs
alpha = 1.0 - (D_o / D_e)
return round(alpha, 4)
@staticmethod
def fleiss_kappa(
classifications: list[list[str]]
) -> float:
"""
Compute Fleiss' Kappa (for categorical data).
Args:
classifications: Matrix of shape (n_items, n_raters).
Each row is one argument, each column is one judge's
classification.
E.g.: [["UPHELD", "UPHELD", "PARTIALLY_UPHELD"], ...]
Returns:
Kappa value.
"""
# Transpose to (n_raters, n_items) for easier reading
data = np.array(classifications, dtype=str).T
n_raters, n_items = data.shape
if n_raters < 2 or n_items < 2:
return 1.0
# Collect all categories
all_categories = sorted(set(
c for row in classifications for c in row
))
n_categories = len(all_categories)
if n_categories < 2:
return 1.0
cat_to_idx = {c: i for i, c in enumerate(all_categories)}
# count_matrix[i, j] = number of raters who assigned
# argument i to category j
count_matrix = np.zeros((n_items, n_categories))
for i in range(n_items):
for r in range(n_raters):
j = cat_to_idx[data[r, i]]
count_matrix[i, j] += 1
# P_i: agreement among raters on argument i
P_i = np.zeros(n_items)
for i in range(n_items):
total = 0.0
for j in range(n_categories):
total += count_matrix[i, j] * (count_matrix[i, j] - 1)
if n_raters > 1:
P_i[i] = total / (n_raters * (n_raters - 1))
# P_bar: mean agreement across all arguments
P_bar = np.mean(P_i)
# p_j: overall proportion of each category
p_j = np.sum(count_matrix, axis=0) / (n_items * n_raters)
# P_e: expected agreement
P_e = np.sum(p_j ** 2)
if P_e >= 1.0:
return 1.0 if P_bar >= 1.0 else 0.0
kappa = (P_bar - P_e) / (1.0 - P_e)
return round(kappa, 4)
@staticmethod
def detect_irreconcilable(
alpha: float,
kappa: float,
per_arg_variances: dict[str, float],
weighted_scores: dict,
alpha_threshold: float = 0.50,
kappa_threshold: float = 0.40,
variance_threshold: float = 3.0,
score_gap_threshold: float = 0.05
) -> dict:
"""
Detect irreconcilable disagreement among judges.
Returns:
{
"irreconcilable": bool,
"reasons": [...],
"high_variance_args": [...],
"recommendation": str
}
"""
reasons = []
high_variance_args = [
arg_id for arg_id, var in per_arg_variances.items()
if var >= variance_threshold
]
score_gap = abs(
weighted_scores.get("pro", 0) -
weighted_scores.get("con", 0)
)
irreconcilable = False
if alpha < alpha_threshold and kappa < kappa_threshold:
irreconcilable = True
reasons.append(
f"Alpha ({alpha}) and Kappa ({kappa}) both below "
f"threshold — severe disagreement on both numeric "
f"scoring and categorical classification"
)
elif alpha < alpha_threshold:
reasons.append(
f"Alpha ({alpha}) below threshold ({alpha_threshold}) "
f"— insufficient scoring consistency"
)
elif kappa < kappa_threshold:
reasons.append(
f"Kappa ({kappa}) below threshold ({kappa_threshold}) "
f"— insufficient classification consistency"
)
if high_variance_args:
reasons.append(
f"Arguments with excessive inter-judge variance: "
f"{', '.join(high_variance_args)}"
)
irreconcilable = True
if score_gap < score_gap_threshold:
reasons.append(
f"Weighted score gap ({score_gap:.3f}) below threshold "
f"({score_gap_threshold}) — both sides too close to call"
)
irreconcilable = True
if not irreconcilable:
recommendation = (
"The judge panel reached acceptable consensus. You may "
"decide based on weighted scores, but manual review of "
"high-variance arguments is recommended."
)
else:
recommendation = (
"The judge panel has irreconcilable disagreement. "
"Recommendations: (1) manually review high-variance "
"arguments; (2) acquire additional data or external "
"information; (3) consult human domain experts. "
"Do not base critical decisions on this debate result "
"until the disagreement is resolved."
)
return {
"irreconcilable": irreconcilable,
"reasons": reasons,
"high_variance_args": high_variance_args,
"alpha": alpha,
"kappa": kappa,
"score_gap": round(score_gap, 4),
"recommendation": recommendation,
}
# ──────────────────────────────────────────────
# 5. Multi-Judge Panel — orchestrates all judges
# ──────────────────────────────────────────────
@dataclass
class PanelResult:
"""Output of the multi-judge panel"""
raw_scores: dict[str, list[dict]] = field(default_factory=dict)
calibrated_scores: dict[str, dict[str, float]] = field(
default_factory=dict
)
weighted_result: dict = field(default_factory=dict)
alpha: float = 0.0
kappa: float = 0.0
divergence: dict = field(default_factory=dict)
per_arg_stats: dict[str, dict] = field(default_factory=dict)
class MultiJudgePanel:
"""
Multi-Judge Panel — manages multiple differentiated judges.
Independent scoring → Calibration → Weighted voting
→ Consensus calculation → Divergence detection.
"""
def __init__(self, judges: list[JudgeProfile]):
"""
Args:
judges: List of judge profiles, at least 2 required
"""
if len(judges) < 2:
raise ValueError("Multi-judge panel requires at least 2 judges")
self.judges = judges
self.calibrator = ScoreCalibrator()
self.voter = WeightedVoter()
self.consensus = ConsensusCalculator()
def _single_judge_evaluate(
self, profile: JudgeProfile,
pro_args: list[Argument],
con_args: list[Argument],
pro_cross_text: str,
con_cross_text: str,
pro_closing: str,
con_closing: str,
topic: str
) -> dict:
"""Call the LLM for a single judge to evaluate the debate."""
user_prompt = (
f"## Topic\n{topic}\n\n"
f"## Pro Opening Arguments\n" +
"\n\n".join(a.to_text() for a in pro_args) +
f"\n\n## Con Opening Arguments\n" +
"\n\n".join(a.to_text() for a in con_args) +
f"\n\n## Pro Cross-Examination\n{pro_cross_text}\n\n"
f"## Con Cross-Examination\n{con_cross_text}\n\n"
f"## Pro Closing\n{pro_closing}\n\n"
f"## Con Closing\n{con_closing}\n\n"
f"Please output your evaluation in the JSON format "
f"specified in the system prompt."
)
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system",
"content": profile.get_system_prompt(topic)},
{"role": "user", "content": user_prompt}
],
temperature=0.2,
max_tokens=3000
)
reply = response.choices[0].message.content
import re
cleaned = re.sub(r'```(?:json)?\s*', '', reply).strip()
try:
return json.loads(cleaned)
except json.JSONDecodeError:
return {
"error": "JSON parse failed",
"raw_response": reply,
"scores": [],
"overall": {"pro_total_raw": 0, "con_total_raw": 0}
}
def evaluate(
self,
topic: str,
pro_args: list[Argument],
con_args: list[Argument],
pro_cross_text: str,
con_cross_text: str,
pro_closing: str,
con_closing: str
) -> PanelResult:
"""Execute the complete multi-judge evaluation pipeline."""
result = PanelResult()
# ── Phase 1: Independent Scoring ──
print(f"\n{'=' * 60}")
print(f"Multi-Judge Panel — {len(self.judges)} judges "
f"evaluating independently")
print(f"{'=' * 60}")
for judge in self.judges:
print(f"\n {judge.name} ({judge._domain_label()}) "
f"evaluating...")
evaluation = self._single_judge_evaluate(
judge, pro_args, con_args,
pro_cross_text, con_cross_text,
pro_closing, con_closing, topic
)
result.raw_scores[judge.name] = evaluation.get("scores", [])
# ── Phase 2: Score Calibration ──
print(f"\n{'─' * 60}")
print(f"Phase 2: Score Calibration (Z-Score Normalization)")
print(f"{'─' * 60}")
# Extract each judge's weighted average per argument
raw_per_judge = {}
for judge in self.judges:
scores = result.raw_scores.get(judge.name, [])
if not scores:
continue
dw = judge.dimension_weights
raw_per_judge[judge.name] = []
for s in scores:
weighted = (
s.get("logic_score", 5) * dw["logic"] +
s.get("evidence_score", 5) * dw["evidence"] +
s.get("responsiveness_score", 5) * dw["responsiveness"]
+
s.get("honesty_score", 5) * dw["honesty"]
)
raw_per_judge[judge.name].append(weighted)
# Collect all argument IDs (in order)
all_arg_ids = []
for judge in self.judges:
for s in result.raw_scores.get(judge.name, []):
aid = s.get("argument_id", "")
if aid and aid not in all_arg_ids:
all_arg_ids.append(aid)
# Z-Score normalize each judge's scores
calibrated = self.calibrator.calibrate_all(raw_per_judge)
# Map to argument IDs
result.calibrated_scores = {}
for judge in self.judges:
jname = judge.name
if jname not in calibrated or jname not in raw_per_judge:
continue
cal_list = calibrated[jname]
j_scores = result.raw_scores.get(jname, [])
result.calibrated_scores[jname] = {}
for i, s in enumerate(j_scores):
aid = s.get("argument_id", f"UNKNOWN-{i}")
result.calibrated_scores[jname][aid] = (
cal_list[i] if i < len(cal_list) else 0.0
)
# ── Phase 3: Weighted Voting ──
print(f"\n{'─' * 60}")
print(f"Phase 3: Weighted Voting")
print(f"{'─' * 60}")
result.weighted_result = self.voter.compute_weighted_scores(
self.judges, result.calibrated_scores, topic
)
print(f" Topic type: {result.weighted_result['topic_type']}")
for j in self.judges:
w = self.voter.get_domain_weight(j, topic)
print(f" {j.name}: final weight = {w:.3f}")
# ── Phase 4: Consensus Calculation ──
print(f"\n{'─' * 60}")
print(f"Phase 4: Consensus Metrics")
print(f"{'─' * 60}")
# Build Krippendorff's Alpha data matrix (n_judges, n_items)
alpha_data = []
for judge in self.judges:
jname = judge.name
row = []
for aid in all_arg_ids:
row.append(
result.calibrated_scores.get(jname, {}).get(aid, np.nan)
)
alpha_data.append(row)
result.alpha = self.consensus.krippendorff_alpha(
alpha_data, metric="interval"
)
# Build Fleiss' Kappa classification data (n_items, n_raters)
kappa_data = []
for i, aid in enumerate(all_arg_ids):
standings = []
for judge in self.judges:
j_scores = result.raw_scores.get(judge.name, [])
if i < len(j_scores):
standings.append(
j_scores[i].get("standing", "UNCERTAIN")
)
else:
standings.append("UNCERTAIN")
kappa_data.append(standings)
result.kappa = self.consensus.fleiss_kappa(kappa_data)
print(f" Krippendorff's Alpha: {result.alpha}")
print(f" Fleiss' Kappa: {result.kappa}")
# ── Phase 5: Divergence Detection ──
print(f"\n{'─' * 60}")
print(f"Phase 5: Divergence Detection")
print(f"{'─' * 60}")
# Compute per-argument score variance (across judges)
per_arg_var = {}
for i, aid in enumerate(all_arg_ids):
vals = []
for judge in self.judges:
jname = judge.name
val = result.calibrated_scores.get(jname, {}).get(aid)
if val is not None and not (
isinstance(val, float) and math.isnan(val)
):
vals.append(val)
if len(vals) >= 2:
per_arg_var[aid] = float(np.var(vals, ddof=1))
else:
per_arg_var[aid] = 0.0
result.divergence = self.consensus.detect_irreconcilable(
alpha=result.alpha,
kappa=result.kappa,
per_arg_variances=per_arg_var,
weighted_scores=result.weighted_result,
)
flag = "YES" if result.divergence['irreconcilable'] else "NO"
print(f" Irreconcilable: ⚠️ {flag}")
for reason in result.divergence.get("reasons", []):
print(f" - {reason}")
# ── Phase 6: Per-Argument Statistics ──
result.per_arg_stats = {}
for i, aid in enumerate(all_arg_ids):
standings = []
for judge in self.judges:
j_scores = result.raw_scores.get(judge.name, [])
if i < len(j_scores):
standings.append(
j_scores[i].get("standing", "UNCERTAIN")
)
counter = Counter(standings)
result.per_arg_stats[aid] = {
"variance": per_arg_var.get(aid, 0.0),
"standings": dict(counter),
"majority": counter.most_common(1)[0][0]
if counter else "UNCERTAIN",
}
return result
def print_report(self, result: PanelResult, topic: str):
"""Print a human-readable comprehensive report."""
print(f"\n{'=' * 60}")
print(f"Multi-Judge Comprehensive Report")
print(f"{'=' * 60}")
print(f"\nTopic: {topic}")
print(f"Judges: {len(self.judges)}")
print(f"Topic type: {result.weighted_result.get('topic_type', 'N/A')}")
print(f"\n── Consensus Metrics ──")
a_label = (
"High agreement" if result.alpha >= 0.80
else "Needs attention" if result.alpha < 0.67
else "Moderate agreement"
)
k_label = (
"High agreement" if result.kappa >= 0.80
else "Needs attention" if result.kappa < 0.67
else "Moderate agreement"
)
print(f" Krippendorff's Alpha: {result.alpha} ({a_label})")
print(f" Fleiss' Kappa: {result.kappa} ({k_label})")
print(f"\n── Weighted Scores ──")
print(f" Pro: {result.weighted_result.get('pro', 'N/A')}")
print(f" Con: {result.weighted_result.get('con', 'N/A')}")
gap = abs(
result.weighted_result.get("pro", 0) -
result.weighted_result.get("con", 0)
)
print(f" Gap: {gap:.3f}")
print(f"\n── Divergence Status ──")
flag = "YES" if result.divergence.get('irreconcilable') else "NO"
print(f" Irreconcilable: ⚠️ {flag}")
for reason in result.divergence.get("reasons", []):
print(f" - {reason}")
print(f" Recommendation: "
f"{result.divergence.get('recommendation', 'N/A')}")
# ──────────────────────────────────────────────
# 6. Usage Example
# ──────────────────────────────────────────────
def run_consensus_debate(topic: str) -> PanelResult:
"""
Run a complete debate with multi-judge consensus calculation.
This function assumes you already have debate records from
L2's debate_protocol.py. It uses mock data here to demo the
judge panel workflow.
"""
# ── Create judge panel ──
judges = [
JudgeProfile(
name="Technical Judge",
domain=ExpertiseDomain.TECHNICAL,
dimension_weights={
"logic": 0.40, "evidence": 0.35,
"responsiveness": 0.15, "honesty": 0.10
}
),
JudgeProfile(
name="Business Judge",
domain=ExpertiseDomain.BUSINESS,
dimension_weights={
"logic": 0.20, "evidence": 0.40,
"responsiveness": 0.20, "honesty": 0.20
}
),
JudgeProfile(
name="Risk Judge",
domain=ExpertiseDomain.RISK,
dimension_weights={
"logic": 0.15, "evidence": 0.20,
"responsiveness": 0.35, "honesty": 0.30
}
),
JudgeProfile(
name="General Judge",
domain=ExpertiseDomain.GENERAL,
dimension_weights={
"logic": 0.30, "evidence": 0.30,
"responsiveness": 0.25, "honesty": 0.15
}
),
]
# ── Create panel ──
panel = MultiJudgePanel(judges)
# ── Prepare debate data (mock — in practice, get this from
# L2's run_structured_debate()) ──
pro_args = [
Argument("PRO-1", "Independent deployment shortens release cycles",
"Microservices allow independent build/test/deploy, "
"avoiding monolithic full-deployment bottlenecks",
"Benchmarks: monolithic 3.8h vs microservices 0.7h"),
Argument("PRO-2", "Team tech stack flexibility improves",
"Each service can independently choose the best tech stack",
"Startup case: core in Go + analytics in Python"),
Argument("PRO-3", "Fault isolation reduces system risk",
"Single service failure doesn't affect others",
"AWS practice: blast radius reduced from full cluster "
"to single service"),
]
con_args = [
Argument("CON-1", "Operational complexity increases significantly",
"Microservices introduce inherent distributed system "
"complexity: network latency, service discovery, "
"distributed transactions",
"Research shows ops cost increases 40-60%"),
Argument("CON-2", "Team cognitive load is too high",
"A 10-person team maintaining 8+ services — each "
"developer must understand multiple service interactions",
"Small team survey: efficiency drops after >5 services"),
Argument("CON-3", "Initial development velocity decreases",
"Microservices require additional infrastructure setup "
"and DevOps investment",
"Startups typically see ROI only after 6-12 months"),
]
pro_cross_text = (
"To CON-1: Challenge — do the ops cost figures account for "
"modern container orchestration automation?\n"
"To CON-2: Partial — cognitive load exists but can be "
"mitigated by unified API gateways and documentation\n"
"To CON-3: Concede — initial velocity does drop, but "
"long-term gains justify the investment"
)
con_cross_text = (
"To PRO-1: Challenge — benchmark conditions are idealized, "
"ignoring network latency and CI/CD pipeline time\n"
"To PRO-2: Challenge — tech stack diversity in small teams "
"actually increases hiring and maintenance burden\n"
"To PRO-3: Refute — fault isolation has costs; distributed "
"systems introduce new failure modes"
)
pro_closing = (
"We acknowledge microservices' shortcomings in operational "
"complexity and initial velocity. But our core position stands: "
"for startups expecting long-term growth, microservices' "
"independent deployment and fault isolation advantages "
"win in the long run."
)
con_closing = (
"Pro failed to effectively address the core challenges of "
"operational cost and team cognitive load. For teams under 10, "
"microservices introduce complexity disproportionate to team "
"size. We recommend starting with a modular monolith and "
"splitting only when the team and business have grown "
"to a necessary scale."
)
# ── Run multi-judge evaluation ──
result = panel.evaluate(
topic=topic,
pro_args=pro_args,
con_args=con_args,
pro_cross_text=pro_cross_text,
con_cross_text=con_cross_text,
pro_closing=pro_closing,
con_closing=con_closing,
)
# ── Print report ──
panel.print_report(result, topic)
return result
# ──────────────────────────────────────────────
# 7. Helper: calibrate judge accuracy
# ──────────────────────────────────────────────
def update_judge_accuracy(
profile: JudgeProfile,
ground_truth: str, # "PRO" | "CON" | "TIE"
judge_vote: str # "PRO" | "CON" | "TIE"
):
"""
Update a judge's historical accuracy based on known ground truth.
Only use for calibration debates with known correct answers.
"""
profile.calibrations_completed += 1
if judge_vote == ground_truth:
profile.total_correct += 1
profile.historical_accuracy = (
profile.total_correct / profile.calibrations_completed
)
# ──────────────────────────────────────────────
# 8. LLM-free statistical test (quick algorithm verification)
# ──────────────────────────────────────────────
def test_consensus_without_llm():
"""Verify consensus algorithms with mock data — no LLM needed."""
print("=" * 60)
print("Statistical Test — Verifying Consensus Algorithms (no LLM)")
print("=" * 60)
# Mock: 4 judges × 6 arguments
mock_scores = [
[7.5, 8.0, 6.5, 4.0, 3.5, 5.0], # Judge 1
[8.0, 8.5, 7.0, 3.5, 3.0, 4.5], # Judge 2
[6.0, 7.0, 5.5, 5.0, 4.5, 6.0], # Judge 3 (more divergent)
[np.nan, 8.0, 6.0, 4.0, np.nan, 5.0], # Judge 4 (with missing)
]
calc = ConsensusCalculator()
alpha = calc.krippendorff_alpha(mock_scores)
print(f"\nKrippendorff's Alpha (mock data): {alpha}")
print(f"Expected: 0.70-0.90 range (moderate divergence)")
# Mock classification data
mock_classifications = [
["UPHELD", "UPHELD", "PARTIALLY_UPHELD",
"REFUTED", "REFUTED", "PARTIALLY_UPHELD"],
["UPHELD", "UPHELD", "UPHELD",
"REFUTED", "REFUTED", "REFUTED"],
["UPHELD", "PARTIALLY_UPHELD", "PARTIALLY_UPHELD",
"PARTIALLY_UPHELD", "REFUTED", "UNCERTAIN"],
["UPHELD", "UPHELD", "PARTIALLY_UPHELD",
"REFUTED", "REFUTED", "REFUTED"],
]
# Fleiss' Kappa expects (n_items, n_raters)
kappa_data = list(zip(*mock_classifications))
kappa_data = [list(row) for row in kappa_data]
kappa = calc.fleiss_kappa(kappa_data)
print(f"\nFleiss' Kappa (mock data): {kappa}")
print(f"Expected: 0.60-0.90 range (mostly agree, some diverge)")
# Test calibration
cal = ScoreCalibrator()
raw = {
"JudgeA": [5.0, 6.0, 7.0, 4.0, 3.0, 5.0],
"JudgeB": [8.0, 9.0, 10.0, 7.0, 6.0, 8.0],
}
calibrated = cal.calibrate_all(raw, method="zscore")
print(f"\nZ-Score Calibration:")
for name, scores in calibrated.items():
print(f" {name}: {[round(s, 3) for s in scores]}")
print(f" Expected: both judges' Z-Score distributions should be "
f"nearly identical")
if __name__ == "__main__":
# Run statistical test first (no LLM, no API needed)
test_consensus_without_llm()
print(f"\n{'=' * 60}")
print(f"To run a full multi-judge debate evaluation, call "
f"run_consensus_debate()")
print(f"Requires valid API credentials "
f"(your-api-key + api.example.com)")
print(f"{'=' * 60}")
# Uncomment to run full evaluation:
# result = run_consensus_debate(
# topic="Should a small startup (under 10 people) "
# "adopt microservices architecture from day one?"
# )
# with open("/tmp/consensus_debate_result.json", "w") as f:
    #     json.dump(result.__dict__, f, ensure_ascii=False, indent=2, default=str)
Compared to L2's debate_protocol.py, L3 adds these core components:
| Component | Function | Key Methods |
|---|---|---|
| `JudgeProfile` | Defines judge role, expertise domain, scoring preferences, historical accuracy | `get_system_prompt()` — generates role-specific prompts |
| `ScoreCalibrator` | Z-Score / Min-Max normalization to eliminate scoring habit differences | `calibrate_all()` — batch-calibrates all judges' scores |
| `WeightedVoter` | Dual-layer weighting (domain relevance + historical accuracy); auto topic-type detection | `detect_topic_type()` / `compute_weighted_scores()` |
| `ConsensusCalculator` | Krippendorff's Alpha + Fleiss' Kappa + irreconcilable divergence detection | `krippendorff_alpha()` / `fleiss_kappa()` / `detect_irreconcilable()` |
| `MultiJudgePanel` | Orchestrates the full pipeline: independent scoring → calibration → weighting → consensus → divergence | `evaluate()` — complete 5-phase evaluation pipeline |
Although L2's `debate_protocol.py` already defines the core types (`Argument`, etc.), we redefine them here so `debate_consensus.py` can run standalone. In a real project, you should import these types from `debate_protocol.py` rather than redefining them; the correct import is noted in the code comments.
Connecting L1, L2, and L3:

- L1 (Adversarial Collaboration Intro): two agents argue, a single judge gives a free-form verdict.
- L2 (Structured Debate Protocol): adds a structured protocol and a four-dimension scoring rubric for the single judge.
- L3 (this article): replaces the single judge with a calibrated, weighted, consensus-checked judge panel.
You can adopt progressively: try L1 first; if depth is insufficient, upgrade to L2; if single-judge conclusions feel unreliable, upgrade to L3.
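If you do wire the levels together, the handoff from L2 to L3 looks roughly like this. It is a hedged sketch: the exact return shape of `run_structured_debate()` depends on your L2 implementation, so the field names on `debate` below are assumptions for illustration.

```python
from debate_protocol import run_structured_debate                       # L2 module
from debate_consensus import MultiJudgePanel, JudgeProfile, ExpertiseDomain  # L3 module

debate = run_structured_debate(
    topic="Should a small startup adopt microservices from day one?"
)

panel = MultiJudgePanel([
    JudgeProfile("Technical Judge", ExpertiseDomain.TECHNICAL),
    JudgeProfile("Risk Judge", ExpertiseDomain.RISK),
    JudgeProfile("General Judge", ExpertiseDomain.GENERAL),
])

result = panel.evaluate(
    topic=debate.topic,                  # assumed field names on the L2 record
    pro_args=debate.pro_args,
    con_args=debate.con_args,
    pro_cross_text=debate.pro_cross_text,
    con_cross_text=debate.con_cross_text,
    pro_closing=debate.pro_closing,
    con_closing=debate.con_closing,
)
panel.print_report(result, debate.topic)
```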
📎 Series note: This is article 3 of the Multi-Agent Debate series. Recommended reading order: L1: Adversarial Collaboration Intro → L2: Structured Debate Protocol → This article (L3). The next article (L4) will explore production deployment and real-world applications.
📖 Next: Multi-Agent Debate System: Production Deployment — real-world use cases, system architecture, performance optimization (coming soon)