想象一下:早上 9:29。还有六十秒开盘。你手里有昨天的通胀数据、亚洲隔夜期货、中午的美联储讲话、以及向不同方向尖叫的板块轮动信号。你的直觉说一件事。头条说另一件事。而你那个单次 LLM 查询——「分析市场」——刚给你返回了一段平淡、两边下注的总结,跟任何财经新闻机器人都能写出来的东西无异。
现在想象另一个画面:八个专业化的 AI Agent——四个看涨,四个看跌——每个都装备了不同的分析视角,同时撕开同一份数据。技术分析师在解剖 VIX 期限结构。基本面分析师在计算盈利收益率差。宏观策略师在分析收益率曲线动态。情绪追踪师在解析资金流向。他们在三轮结构化辩论中互相揭露盲点。然后一个裁判 Agent——对结果没有利益牵扯——将他们的冲突综合成一份不含糊的分析,附带明确的推理链条。
这不是科幻小说。这就是我们在这个系列中要构建的东西。
这是一个新系列的第一篇文章:多 Agent 辩论 × 市场分析。我们将 L1-L4 辩论系统的理论框架(对抗协作、结构化协议、多裁判共识)应用到 AI 推理中最困难的领域:金融市场。市场本质上是对抗性的——每一个买家都需要一个卖家,每一个论点都有对立的论点。如果说有哪个领域天生需要辩论架构,那就是这里。
读完本文,你将得到:一个 8 Agent + 裁判辩论系统的清晰架构、一个从免费 API(Yahoo Finance、FRED)拉取真实市场数据的完整数据管道、以及一个你今天就能运行的可执行 Python 模块。更重要的是,你将理解为什么做了每个架构选择——而不只是代码做了什么。
在写任何一行代码之前,让我们面对那个区分真实系统与玩具的问题:为什么市场分析特别需要多 Agent 辩论架构?
问任何 LLM「你对标普 500 的展望如何?」,你会得到这样的回答:
"标普 500 面临复杂的展望。一方面,强劲的盈利增长和 AI 驱动的生产力提升提供了上行潜力。另一方面,持续的通胀、高企的估值和地缘政治风险构成逆风。投资者应保持平衡的配置……"
这就是对冲问题。一个被中性提示的单一 LLM,默认会两边都覆盖。它没错——它只是没用。模型没有任何利益相关,不为特定方向的错误负责,也没有机制来解决它刚刚识别出的矛盾。
但问题比对冲更深。单 Agent 分析有三个结构性缺陷:
这就是让辩论架构必需而非可选的洞察:市场,就其基本结构而言,是对抗系统。每笔交易都有两个持相反观点的对手方。价格发现本质上就是多头和空头通过持续的反对意见来收敛到清算价格的过程。
单 Agent 分析试图用合作推理过程来建模一个对抗系统。这就像试图让一个棋手"公平地"同时下两边来模拟一盘棋。它行不通——不是因为棋手不够聪明,而是因为对抗深度需要对抗过程。
你可能会想:如果 L1-L4 系列已经表明,即使两个 Agent 辩论也能提高可靠性,为什么我们需要八个?
因为市场是多维的。简单的多空辩论把所有市场因素坍缩成一个轴:涨还是跌。但真实市场有结构:
八个 Agent(4 多头 + 4 空头),沿两个轴组织——分析框架和时间维度——恰好给了我们这种阵营内多样性,同时保持了阵营间的对抗张力。
在深入数据管道之前,让我们先看整个系统的全景。架构如下:
┌─────────────────────────────────────────┐ │ 市场数据管道 │ │ ┌──────────┐ ┌──────────┐ ┌───────┐ │ │ │ Yahoo │ │ FRED │ │ 情绪 │ │ │ │ Finance │ │ 宏观 │ │ 数据 │ │ │ └────┬─────┘ └────┬─────┘ └───┬───┘ │ │ └──────────────┼────────────┘ │ │ ▼ │ │ ┌──────────────────┐ │ │ │ 知识库 │ │ │ │ (标准化) │ │ │ └────────┬─────────┘ │ └────────────────────┼────────────────────┘ │ ┌──────────────────────────┼──────────────────────────┐ │ ▼ │ │ 辩论编排器 │ │ │ │ 第一轮:开场论据 │ │ ┌─────────────────────┐ ┌─────────────────────┐ │ │ │ 4 个多头 AGENT │ │ 4 个空头 AGENT │ │ │ │ ┌────┐┌────┐┌────┐ │ │ ┌────┐┌────┐┌────┐ │ │ │ │ │技术││基本││宏观│ │ │ │技术││基本││宏观│ │ │ │ │ │ 🐂 ││ 🐂 ││ 🐂 │ │ │ │ 🐻 ││ 🐻 ││ 🐻 │ │ │ │ │ └────┘└────┘└────┘ │ │ └────┘└────┘└────┘ │ │ │ │ ┌────┐ │ │ ┌────┐ │ │ │ │ │情绪│ │ │ │情绪│ │ │ │ │ │ 🐂 │ │ │ │ 🐻 │ │ │ │ │ └────┘ │ │ └────┘ │ │ │ └─────────────────────┘ └─────────────────────┘ │ │ │ │ 第二轮:交叉质询 │ │ (每个 Agent 针对同一分析领域内的 │ │ 对手方进行挑战) │ │ │ │ 第三轮:总结陈词 │ │ (Agent 基于受到的批评完善论点 │ │ ——让步或加倍下注) │ │ │ │ ▼ │ │ ┌──────────────────┐ │ │ │ 裁判 AGENT ⚖️ │ │ │ │ 从辩论记录中综合 │ │ │ │ 最终分析报告 │ │ │ └──────────────────┘ │ └─────────────────────────────────────────────────────┘
辩论采用 3 轮协议,改编自 L2 的结构化辩论设计,但有一个关键升级:所有 8 个 Agent 在每轮内并行运行。
这就是架构变得有趣的地方。我们不只是把同一个提示词复制 8 份,然后改成"你是看涨的"和"你是看跌的"。每个 Agent 都有一个独特的分析视角——一套专门的方法论、数据源和推理框架。
这不是随意分配。专业化矩阵沿两个正交轴设计:
现在来看你能实际运行的部分。数据管道是原始市场现实与 Agent 可读分析上下文之间的桥梁。它的工作:从免费数据源拉取数据,标准化为结构化知识库,并为每个 Agent 的特定分析视角做切片。
yfinance
fredapi
管道输出一个结构化 JSON 对象——知识库——每个 Agent 都会收到(但每个 Agent 只读取与其分析视角相关的切片)。Schema 如下:
meta
indices
sectors
technicals
fundamentals
macro
sentiment
global
以下是完整的数据管道。保存为 market_data_pipeline.py。约 220 行,从 Yahoo Finance 和 FRED 拉取真实数据,输出结构化知识库。每个函数都有错误处理——如果某个 API 挂了,管道会优雅降级而不是崩溃。
market_data_pipeline.py
""" market_data_pipeline.py 多 Agent 辩论 × 市场分析 — 数据管道 ──────────────────────────────────── 从免费 API (Yahoo Finance, FRED) 拉取市场数据, 并结构化为辩论系统的知识库。 数据源: - Yahoo Finance (yfinance): 指数、行业 ETF、历史价格 - FRED (fredapi): 宏观经济指标 - 内部计算: 技术指标、衍生指标 输出: 结构化 JSON 知识库,供 8 个辩论 Agent 消费。 每个 Agent 只读取与其相关的切片。 依赖: pip install yfinance fredapi pandas numpy FRED API key (免费): https://fred.stlouisfed.org/docs/api/api_key.html """ import json import os import sys from dataclasses import asdict, dataclass, field from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional, Tuple import numpy as np import pandas as pd import yfinance as yf # 可选: FRED 宏观数据。如果 fredapi 未安装或无 key, # 宏观数据将标记为不可用而不是崩溃。 try: from fredapi import Fred FRED_AVAILABLE = True except ImportError: FRED_AVAILABLE = False # ═══════════════════════════════════════════════════════════ # 配置 # ═══════════════════════════════════════════════════════════ FRED_API_KEY=*** "your-fred-api-key-here") # — 追踪的指数 — INDICES: Dict[str, str] = { "SPX": "^GSPC", # 标普 500 "NASDAQ": "^IXIC", # 纳斯达克综合 "DJIA": "^DJI", # 道琼斯工业平均 "VIX": "^VIX", # CBOE 波动率指数 "HSI": "^HSI", # 恒生指数 "N225": "^N225", # 日经 225 "STOXX": "^STOXX50E", # 欧洲 STOXX 50 } # — 行业 ETF 用于广度分析 — SECTORS: Dict[str, str] = { "XLK": "科技", "XLF": "金融", "XLE": "能源", "XLV": "医疗健康", "XLI": "工业", "XLY": "可选消费", "XLP": "必需消费", "XLU": "公用事业", "XLB": "原材料", "XLRE": "房地产", } # — FRED 宏观序列 — MACRO_SERIES: Dict[str, str] = { "GDP": "GDP", # 国内生产总值 (季度) "CPI": "CPIAUCSL", # 消费者价格指数 (月度) "UNRATE": "UNRATE", # 失业率 (月度) "FEDFUNDS": "FEDFUNDS", # 有效联邦基金利率 (月度) "T10Y2Y": "T10Y2Y", # 10年-2年国债利差 (每日) "M2": "M2SL", # M2 货币供应量 (月度) "INDPRO": "INDPRO", # 工业产出指数 (月度) "RETAIL": "RSXFS", # 零售销售 (月度) "HOUST": "HOUST", # 新屋开工 (月度) "PAYEMS": "PAYEMS", # 非农就业总人数 (月度) } # — 技术指标参数 — TECHNICAL_CONFIG = { "ma_periods": [20, 50, 200], "rsi_period": 14, "macd_fast": 12, "macd_slow": 26, "macd_signal": 9, "atr_period": 14, }
(续——数据类定义)
# ═══════════════════════════════════════════════════════════ # 数据类 — 结构化知识库输出 # ═══════════════════════════════════════════════════════════ @dataclass class IndexSnapshot: """单个指数的快照""" ticker: str name: str price: float change_pct: float returns: Dict[str, float] = field(default_factory=dict) vs_52w_high_pct: float = 0.0 vs_52w_low_pct: float = 0.0 volume_ratio: float = 1.0 @dataclass class SectorSnapshot: """行业 ETF 的快照""" ticker: str name: str price: float change_5d_pct: float change_20d_pct: float relative_strength_vs_spx: float @dataclass class TechnicalSignals: """单个指数的技术指标""" ticker: str ma_status: Dict[str, str] = field(default_factory=dict) rsi_14: Optional[float] = None macd_signal: Optional[str] = None atr_14: Optional[float] = None volume_trend: Optional[str] = None @dataclass class MacroSnapshot: """宏观经济指标值""" indicator: str description: str latest_value: Optional[float] = None latest_date: Optional[str] = None yoy_change_pct: Optional[float] = None trend: Optional[str] = None @dataclass class KnowledgeBase: """辩论 Agent 的完整结构化知识库""" meta: Dict[str, Any] = field(default_factory=dict) indices: Dict[str, IndexSnapshot] = field(default_factory=dict) sectors: Dict[str, SectorSnapshot] = field(default_factory=dict) technicals: Dict[str, TechnicalSignals] = field(default_factory=dict) fundamentals: Dict[str, Any] = field(default_factory=dict) macro: Dict[str, MacroSnapshot] = field(default_factory=dict) sentiment: Dict[str, Any] = field(default_factory=dict) global_markets: Dict[str, IndexSnapshot] = field(default_factory=dict) # ═══════════════════════════════════════════════════════════ # 数据获取 # ═══════════════════════════════════════════════════════════ def fetch_index_data(ticker: str, name: str, period: str = "1y") -> Optional[IndexSnapshot]: """从 Yahoo Finance 获取指数价格数据。""" try: data = yf.download(ticker, period=period, progress=False) if data.empty: print(f" ⚠ 无数据 {ticker} ({name})", file=sys.stderr) return None close = data["Close"].squeeze() volume = data["Volume"].squeeze() current = float(close.iloc[-1]) prev = float(close.iloc[-2]) if len(close) > 1 else current change_pct = round((current - prev) / prev * 100, 2) returns = {} for label, days in [("5d", 5), ("20d", 20), ("50d", 50), ("200d", 200)]: if len(close) >= days: past = float(close.iloc[-(days + 1)]) returns[label] = round((current - past) / past * 100, 2) else: returns[label] = None high_52w = float(close.max()) low_52w = float(close.min()) vs_high = round((current - high_52w) / high_52w * 100, 2) vs_low = round((current - low_52w) / low_52w * 100, 2) avg_vol_20d = float(volume.tail(20).mean()) if len(volume) >= 20 else float(volume.mean()) vol_ratio = round(float(volume.iloc[-1]) / avg_vol_20d, 2) if avg_vol_20d > 0 else 1.0 return IndexSnapshot( ticker=ticker, name=name, price=current, change_pct=change_pct, returns=returns, vs_52w_high_pct=vs_high, vs_52w_low_pct=vs_low, volume_ratio=vol_ratio, ) except Exception as e: print(f" ✗ 获取 {ticker} 错误: {e}", file=sys.stderr) return None def compute_technical_signals(ticker: str, period: str = "6mo") -> Optional[TechnicalSignals]: """计算某个指数的技术指标。""" try: data = yf.download(ticker, period=period, progress=False) if data.empty: return None close = data["Close"].squeeze() volume = data["Volume"].squeeze() # 移动平均线 ma_status = {} for ma in TECHNICAL_CONFIG["ma_periods"]: if len(close) >= ma: ma_val = float(close.rolling(ma).mean().iloc[-1]) current = float(close.iloc[-1]) status = "above" if current > ma_val else "below" ma_status[f"ma{ma}"] = status # RSI(14) rsi = None rsi_period = TECHNICAL_CONFIG["rsi_period"] if len(close) >= rsi_period + 1: delta = close.diff() gain = delta.clip(lower=0) loss = (-delta).clip(lower=0) avg_gain = gain.rolling(rsi_period).mean() avg_loss = loss.rolling(rsi_period).mean() rs = avg_gain / avg_loss.replace(0, np.nan) rsi_series = 100 - (100 / (1 + rs)) rsi = round(float(rsi_series.iloc[-1]), 1) # MACD macd_sig = None cfg = TECHNICAL_CONFIG if len(close) >= cfg["macd_slow"] + cfg["macd_signal"]: ema_fast = close.ewm(span=cfg["macd_fast"]).mean() ema_slow = close.ewm(span=cfg["macd_slow"]).mean() macd_line = ema_fast - ema_slow signal_line = macd_line.ewm(span=cfg["macd_signal"]).mean() if macd_line.iloc[-1] > signal_line.iloc[-1]: macd_sig = "bullish" elif macd_line.iloc[-1] < signal_line.iloc[-1]: macd_sig = "bearish" else: macd_sig = "neutral" # ATR(14) atr = None if len(data) >= cfg["atr_period"] + 1: high = data["High"].squeeze() low = data["Low"].squeeze() tr = pd.concat([ high - low, (high - close.shift()).abs(), (low - close.shift()).abs() ], axis=1).max(axis=1) atr = round(float(tr.rolling(cfg["atr_period"]).mean().iloc[-1]), 2) # 成交量趋势 vol_trend = "flat" if len(volume) >= 20: recent_vol = float(volume.tail(5).mean()) prior_vol = float(volume.tail(20).head(15).mean()) if prior_vol > 0: ratio = recent_vol / prior_vol if ratio > 1.2: vol_trend = "increasing" elif ratio < 0.8: vol_trend = "decreasing" return TechnicalSignals( ticker=ticker, ma_status=ma_status, rsi_14=rsi, macd_signal=macd_sig, atr_14=atr, volume_trend=vol_trend, ) except Exception as e: print(f" ✗ 计算 {ticker} 技术指标错误: {e}", file=sys.stderr) return None def fetch_macro_data() -> Dict[str, MacroSnapshot]: """从 FRED 获取宏观经济数据。""" results: Dict[str, MacroSnapshot] = {} if not FRED_AVAILABLE: for key in MACRO_SERIES: results[key] = MacroSnapshot( indicator=key, description=f"FRED 序列 {MACRO_SERIES[key]}", latest_value=None, trend="unavailable", ) return results try: fred = Fred(api_key=FRED_API_KEY) for key, series_id in MACRO_SERIES.items(): try: series = fred.get_series(series_id) if series.empty: results[key] = MacroSnapshot( indicator=key, description=series_id, latest_value=None, trend="no_data", ) continue latest = float(series.dropna().iloc[-1]) latest_date = str(series.dropna().index[-1].date()) yoy = None trend = None if len(series.dropna()) >= 13: yoy_val = float(series.dropna().iloc[-13]) if yoy_val != 0: yoy = round((latest - yoy_val) / abs(yoy_val) * 100, 2) if len(series.dropna()) >= 6: recent_avg = float(series.dropna().tail(3).mean()) prior_avg = float(series.dropna().tail(6).head(3).mean()) if prior_avg != 0: delta_pct = (recent_avg - prior_avg) / abs(prior_avg) * 100 if delta_pct > 0.5: trend = "rising" elif delta_pct < -0.5: trend = "falling" else: trend = "flat" results[key] = MacroSnapshot( indicator=key, description=series_id, latest_value=round(latest, 4), latest_date=latest_date, yoy_change_pct=yoy, trend=trend, ) except Exception as e: results[key] = MacroSnapshot( indicator=key, description=series_id, latest_value=None, trend=f"错误: {str(e)[:80]}", ) except Exception as e: for key in MACRO_SERIES: results[key] = MacroSnapshot( indicator=key, description=MACRO_SERIES[key], latest_value=None, trend="connection_error", ) return results
(续——知识库组装、辅助函数、Agent 切片器)
# ═══════════════════════════════════════════════════════════ # 知识库组装 # ═══════════════════════════════════════════════════════════ def build_knowledge_base() -> KnowledgeBase: """主管道入口。获取所有数据源并组装知识库。""" kb = KnowledgeBase() now = datetime.now(timezone.utc) kb.meta = { "generated_at": now.isoformat(), "market_status": "open" if _is_market_hours(now) else "closed", "data_sources": ["yfinance", "fred"] if FRED_AVAILABLE else ["yfinance"], "warnings": [], } print("📊 获取指数数据...") for name, ticker in INDICES.items(): snap = fetch_index_data(ticker, name) if snap is None: kb.meta["warnings"].append(f"无数据 {name} ({ticker})") continue if name in ("HSI", "N225", "STOXX"): kb.global_markets[name] = snap else: kb.indices[name] = snap print("📈 计算技术指标...") for name, ticker in INDICES.items(): signals = compute_technical_signals(ticker) if signals: kb.technicals[name] = signals print("🏢 获取行业数据...") for ticker, sector_name in SECTORS.items(): snap = fetch_index_data(ticker, sector_name) if snap is None: continue spx = kb.indices.get("SPX") spx_ret = spx.returns.get("20d", 0) or 0 if spx else 0 sec_ret = snap.returns.get("20d", 0) or 0 rs = round(sec_ret - spx_ret, 2) kb.sectors[ticker] = SectorSnapshot( ticker=ticker, name=sector_name, price=snap.price, change_5d_pct=snap.returns.get("5d", 0) or 0, change_20d_pct=sec_ret, relative_strength_vs_spx=rs, ) print("🏛 获取宏观数据 (FRED)...") kb.macro = fetch_macro_data() if all(v.trend in ("unavailable", "no_data", "connection_error") for v in kb.macro.values()): kb.meta["warnings"].append( "FRED 宏观数据不可用 — 请检查 API key 或网络") # — 基本面(从指数/行业/FRED 数据衍生) — spx = kb.indices.get("SPX") kb.fundamentals = { "sp500_pe_approx": _estimate_pe(spx), "sp500_earnings_yield_approx": _estimate_earnings_yield(spx), "sector_rotation_signal": _detect_sector_rotation(kb.sectors), } # — 情绪面(从 VIX + 成交量 + 行业轮动衍生) — vix = kb.indices.get("VIX") kb.sentiment = { "vix_level": vix.price if vix else None, "vix_regime": _classify_vix_regime(vix), "volume_signal": _volume_sentiment_signal(kb.indices), "sector_breadth": _sector_breadth(kb.sectors), } if kb.meta["market_status"] == "closed": kb.meta["warnings"].append( "市场已休市 — 价格为上一收盘价,可能已过时") print(f"✅ 知识库就绪 ({len(kb.indices)} 个指数, " f"{len(kb.sectors)} 个行业, {len(kb.macro)} 个宏观指标)") if kb.meta["warnings"]: print(f"⚠ 警告: {kb.meta['warnings']}") return kb
(续——辅助函数、Agent 数据切片器、主函数)
# ═══════════════════════════════════════════════════════════ # 辅助函数 # ═══════════════════════════════════════════════════════════ def _is_market_hours(now: datetime) -> bool: """粗略判断美股是否在交易时间 (9:30-16:00 ET, 工作日)。""" et_hour = (now.hour - 4) % 24 # UTC-4 近似 EDT et_minute = now.minute weekday = now.weekday() if weekday >= 5: return False total_minutes = et_hour * 60 + et_minute return 570 <= total_minutes <= 960 def _estimate_pe(spx: Optional[IndexSnapshot]) -> Dict[str, Any]: """估算标普 500 PE(占位——生产环境需使用真实基本面 API)。""" if spx is None or spx.price == 0: return {"note": "PE 估算不可用 — 无 SPX 数据"} estimated_earnings = 240.0 # 近12个月近似值 pe = round(spx.price / estimated_earnings, 1) return { "current_pe_approx": pe, "long_term_avg_pe": 17.0, "note": "PE 基于 SPX 价格 / 近似近12个月盈利估算。生产环境请替换为真实基本面 API。" } def _estimate_earnings_yield(spx: Optional[IndexSnapshot]) -> Optional[float]: """盈利收益率 = 1 / PE (近似)。""" pe_data = _estimate_pe(spx) pe = pe_data.get("current_pe_approx") if pe and pe > 0: return round(100 / pe, 2) return None def _detect_sector_rotation(sectors: Dict[str, SectorSnapshot]) -> str: """基于相对强度变化的简单行业轮动信号。""" if not sectors: return "insufficient_data" defensive = ["XLP", "XLU", "XLV"] cyclical = ["XLK", "XLY", "XLI", "XLB"] def_rs = sum(sectors[s].relative_strength_vs_spx for s in defensive if s in sectors) cyc_rs = sum(sectors[s].relative_strength_vs_spx for s in cyclical if s in sectors) if def_rs > cyc_rs + 2: return "defensive_rotation" elif cyc_rs > def_rs + 2: return "cyclical_rotation" return "neutral" def _classify_vix_regime(vix: Optional[IndexSnapshot]) -> str: """基于 VIX 水平分类波动率区间。""" if vix is None: return "unknown" if vix.price < 15: return "low_volatility" elif vix.price < 20: return "normal" elif vix.price < 30: return "elevated" else: return "high_fear" def _volume_sentiment_signal(indices: Dict[str, IndexSnapshot]) -> str: """基于各指数成交量比率的情绪信号。""" if not indices: return "unknown" ratios = [idx.volume_ratio for idx in indices.values() if idx.volume_ratio > 0 and idx.ticker not in ("^VIX",)] if not ratios: return "unknown" avg_ratio = sum(ratios) / len(ratios) if avg_ratio > 1.3: return "high_volume_rally" elif avg_ratio < 0.7: return "low_volume_drift" return "normal_volume" def _sector_breadth(sectors: Dict[str, SectorSnapshot]) -> Dict[str, Any]: """统计 5 日和 20 日内上涨的行业数量。""" if not sectors: return {"breadth_5d": None, "breadth_20d": None} pos_5d = sum(1 for s in sectors.values() if s.change_5d_pct > 0) pos_20d = sum(1 for s in sectors.values() if s.change_20d_pct > 0) total = len(sectors) regime = ("broad_strength" if pos_20d >= 7 else "narrow_leadership" if pos_20d <= 3 else "mixed") return { "positive_5d": f"{pos_5d}/{total}", "positive_20d": f"{pos_20d}/{total}", "breadth_regime": regime, }
(续——Agent 数据切片器、主函数)
# ═══════════════════════════════════════════════════════════ # AGENT 数据切片器 — 每个 Agent 只获取与其相关的数据切片 # ═══════════════════════════════════════════════════════════ AGENT_SLICES = { "tech_bull": ["meta", "indices", "technicals"], "tech_bear": ["meta", "indices", "technicals"], "fund_bull": ["meta", "indices", "sectors", "fundamentals"], "fund_bear": ["meta", "indices", "sectors", "fundamentals"], "macro_bull": ["meta", "macro", "global_markets", "indices"], "macro_bear": ["meta", "macro", "global_markets", "indices"], "senti_bull": ["meta", "sentiment", "indices", "sectors"], "senti_bear": ["meta", "sentiment", "indices", "sectors"], "judge": ["meta", "indices", "sectors", "technicals", "fundamentals", "macro", "sentiment", "global_markets"], } def slice_for_agent(kb: KnowledgeBase, agent_id: str) -> Dict[str, Any]: """提取仅与特定 Agent 相关的数据部分。""" sections = AGENT_SLICES.get(agent_id, AGENT_SLICES["judge"]) result = {} kb_dict = asdict(kb) for section in sections: if section in kb_dict: result[section] = kb_dict[section] return result # ═══════════════════════════════════════════════════════════ # 主函数 # ═══════════════════════════════════════════════════════════ if __name__ == "__main__": print("=" * 60) print("📊 市场数据管道 — 多 Agent 辩论知识库") print("=" * 60) print() kb = build_knowledge_base() # 保存完整知识库 output_path = "market_knowledge_base.json" with open(output_path, "w", encoding="utf-8") as f: json.dump(asdict(kb), f, indent=2, ensure_ascii=False, default=str) print(f"\n💾 完整知识库已保存至: {output_path}") # 展示某个 Agent 的切片示例 print("\n── 示例: 技术多头 Agent 数据切片 ──") tech_bull_slice = slice_for_agent(kb, "tech_bull") print(json.dumps(tech_bull_slice, indent=2, ensure_ascii=False, default=str)[:1200]) print("... (已截断)") print("\n── Agent 数据切片 Schema ──") for agent_id, sections in AGENT_SLICES.items(): print(f" {agent_id:15s} ← {sections}") print(f"\n✅ 管道完成。{len(kb.meta.get('warnings', []))} 个警告。")
# 安装依赖 pip install yfinance fredapi pandas numpy # 获取免费的 FRED API key(可选但推荐): # https://fred.stlouisfed.org/docs/api/api_key.html # 运行管道 export FRED_API_KEY="***" python market_data_pipeline.py
预期输出:
============================================================ 📊 市场数据管道 — 多 Agent 辩论知识库 ============================================================ 📊 获取指数数据... 📈 计算技术指标... 🏢 获取行业数据... 🏛 获取宏观数据 (FRED)... ✅ 知识库就绪 (7 个指数, 10 个行业, 10 个宏观指标) 💾 完整知识库已保存至: market_knowledge_base.json ── 示例: 技术多头 Agent 数据切片 ── { "meta": { "generated_at": "2026-05-15T...", ... }, "indices": { "SPX": { "price": 5847.23, ... }, ... }, "technicals": { "SPX": { "rsi_14": 58.3, "macd_signal": "bullish" }, ... } } ── Agent 数据切片 Schema ── tech_bull ← ['meta', 'indices', 'technicals'] tech_bear ← ['meta', 'indices', 'technicals'] fund_bull ← ['meta', 'indices', 'sectors', 'fundamentals'] ... ✅ 管道完成。0 个警告。
build_knowledge_base()
让我们具体化这一点。管道运行后,每个 Agent 在辩论开始时收到的内容如下:
至此,我们有了一个可运行的数据管道。运行 python market_data_pipeline.py,你会得到 market_knowledge_base.json——一个任何 Agent 都能读取的结构化市场快照。但知识库不会辩论。数据是燃料,不是火焰。
python market_data_pipeline.py
market_knowledge_base.json
在下一篇文章中,我们将构建辩论协议——将这些数据转化为竞争性分析、交叉质询和综合结论的引擎。以下是即将到来的内容:
debate_protocol_market.py
但在此之前,我想让你用今天的代码做一件事。运行管道。查看知识库。问你自己:如果你是裁判,仅凭这些数据,你对市场的看法是什么?把它写下来。不要两边下注。选一个方向,写三条支持性的要点。
然后,当你阅读第二篇时,将你单人的裁判分析与 8 Agent 辩论的产出进行比较。这两者之间的差异——一个人看数据 vs. 八个专业化 Agent 把它撕开——正是我们要构建这个系统的原因。
📎 系列:多 Agent 辩论 × 市场分析。第 1 篇,共 4 篇。前一系列:多 Agent 辩论 L1-L4(对抗协作理论与生产部署)。下一篇:第二篇——辩论协议。
🔥 今天就运行管道。← 前一系列:多 Agent 辩论 L4 · 返回 AI 智能体探索 查看更多文章。
A: 市场是多维度的——时间尺度(短期超买 vs. 长期趋势增长)、分析框架(技术 vs. 基本面 vs. 宏观 vs. 情绪)、以及阵营内部的分歧(两个「看涨」的技术分析师可能因为关注不同指标而得出不同结论)。2 个 Agent 的辩论把所有因素坍缩成一个轴(涨还是跌),丢失了市场结构。8 个 Agent(4×2 矩阵)保留了阵营间的对抗张力,同时引入了阵营内的多样性——多头之间就「为什么看涨」产生的分歧,恰恰暴露了真正的不确定性。
A: 两个免费数据源:Yahoo Finance(通过 yfinance 库,无限调用,获取指数/行业 ETF/历史价格/成交量)和 FRED(通过 fredapi,免费 API key 支持 120 次/分钟,获取 GDP/CPI/失业率/收益率曲线等宏观指标)。管道设计为优雅降级——即使 FRED 不可用(无 key 或网络故障),系统不会崩溃,而是标记宏观数据不可用并继续运行。技术指标(RSI、MACD、ATR)和情绪指标(VIX 区间、行业广度)均为本地计算,不依赖第三方服务。
A: 数据切片是指每个 Agent 只收到知识库中与其分析视角相关的模块。例如,技术多头只看到 meta + indices + technicals,看不到宏观数据。这不是限制——是设计。如果每个 Agent 都看到一切,它们会收敛到相同的分析结论,失去专业化的价值。被迫评论 GDP 增长的技术分析师只能产出低质量分析,分析 K 线形态的基本面分析师同样超出其能力范围。约束创造深度:每个 Agent 在自己的维度上做深,裁判(看到全部数据)负责最终的综合。
A: 当前架构下,所有 8 个 Agent + 裁判使用相同的 LLM(如 GPT-4o 或 Claude),但各自拥有不同的系统提示词和知识库切片。设计约束是:如果技术空头用 GPT-4o 而技术多头用 Claude,你无法分辨辩论结果反映的是真实的分析差异还是模型能力差异。相同模型、不同提示词 = 干净的实验设计。多模型部署(不同 Agent 使用最匹配其分析任务的模型)作为后续鲁棒性升级出现在第四篇。
A: 三轮辩论的并行策略:第一轮(开场论据)——8 个 Agent 全部并行,约 12 秒;第二轮(交叉质询)——4 对 Agent 并行(技术多头 vs. 技术空头、基本多头 vs. 基本空头等),每对内串行,约 12 秒;第三轮(总结陈词)——8 个 Agent 全部并行,约 12 秒。加上裁判综合约 3-5 秒,完整辩论总耗时约 40 秒。质询采用配对而非自由混战的设计(避免 8×7=56 个攻击向量变成噪音),与 L2 系列「约束创造质量」的设计原则一致。