← AI 智能体探索 · ← 前一系列:多 Agent 辩论 L4

多 Agent 辩论 × 市场分析 — 架构与数据管道

2026年5月15日 · 中级

30秒结论

  • 解决什么问题:单一 LLM 分析市场会产生「两边下注」的对冲式结论——没有立场、没有责任、没有深度。市场本质上是对抗系统(每笔交易都有多空双方),需要对抗式推理架构才能真正提取信号。
  • 核心方法:8 个专业化 Agent(4 多头 + 4 空头,横跨技术/基本面/宏观/情绪四个分析维度)+ 1 个中立裁判,通过三轮结构化辩论(开场→交叉质询→总结)产生不含糊的市场分析。所有 Agent 使用相同模型但不同提示词,确保差异来自分析视角而非模型能力。
  • 关键结论:辩论架构的价值不在于「更多 Agent」,而在于每个 Agent 只能看到与其分析视角相关的数据切片——约束创造深度,而非广度创造深度。8 个 Agent 并行运行,总耗时约 36 秒即可完成完整辩论。
  • 读完能做什么:运行本文提供的 market_data_pipeline.py(~220 行),从 Yahoo Finance 和 FRED 拉取真实市场数据,生成结构化知识库 JSON。这是整个辩论系统的数据燃料——无需付费 API,免费额度完全够用。

想象一下:早上 9:29。还有六十秒开盘。你手里有昨天的通胀数据、亚洲隔夜期货、中午的美联储讲话、以及向不同方向尖叫的板块轮动信号。你的直觉说一件事。头条说另一件事。而你那个单次 LLM 查询——「分析市场」——刚给你返回了一段平淡、两边下注的总结,跟任何财经新闻机器人都能写出来的东西无异。

现在想象另一个画面:八个专业化的 AI Agent——四个看涨,四个看跌——每个都装备了不同的分析视角,同时撕开同一份数据。技术分析师在解剖 VIX 期限结构。基本面分析师在计算盈利收益率差。宏观策略师在分析收益率曲线动态。情绪追踪师在解析资金流向。他们在三轮结构化辩论中互相揭露盲点。然后一个裁判 Agent——对结果没有利益牵扯——将他们的冲突综合成一份不含糊的分析,附带明确的推理链条。

这不是科幻小说。这就是我们在这个系列中要构建的东西。

这是一个新系列的第一篇文章:多 Agent 辩论 × 市场分析。我们将 L1-L4 辩论系统的理论框架(对抗协作、结构化协议、多裁判共识)应用到 AI 推理中最困难的领域:金融市场。市场本质上是对抗性的——每一个买家都需要一个卖家,每一个论点都有对立的论点。如果说有哪个领域天生需要辩论架构,那就是这里。

读完本文,你将得到:一个 8 Agent + 裁判辩论系统的清晰架构、一个从免费 API(Yahoo Finance、FRED)拉取真实市场数据的完整数据管道、以及一个你今天就能运行的可执行 Python 模块。更重要的是,你将理解为什么做了每个架构选择——而不只是代码做了什么。

为什么市场分析需要辩论?

在写任何一行代码之前,让我们面对那个区分真实系统与玩具的问题:为什么市场分析特别需要多 Agent 辩论架构?

单 Agent 的问题

问任何 LLM「你对标普 500 的展望如何?」,你会得到这样的回答:

"标普 500 面临复杂的展望。一方面,强劲的盈利增长和 AI 驱动的生产力提升提供了上行潜力。另一方面,持续的通胀、高企的估值和地缘政治风险构成逆风。投资者应保持平衡的配置……"

这就是对冲问题。一个被中性提示的单一 LLM,默认会两边都覆盖。它没错——它只是没用。模型没有任何利益相关,不为特定方向的错误负责,也没有机制来解决它刚刚识别出的矛盾。

但问题比对冲更深。单 Agent 分析有三个结构性缺陷:

缺陷 描述 市场影响
确认偏误 模型天然倾向寻找支持提示词所蕴含框架的证据 问"有什么利好"得到一片乐观;问"有什么风险"得到末日景象。同一个模型,截然相反的结论。
叙事俘获 LLM 的训练语料以共识叙事为主。它们复述主流故事,而不是异常信号。 2020 年 3 月,基于疫情前文本训练的模型会说"市场总是会恢复"——而当时 VIX 冲到 82,熔断机制触发。
假等价 没有对抗压力,模型把所有论点都视为同等有效——"一方面 X,另一方面 Y"——没有权衡机制。 一个十年图表形态和一个美联储利率决议得到相同篇幅。真正的分析师不这样思考。

市场是对抗系统

这就是让辩论架构必需而非可选的洞察:市场,就其基本结构而言,是对抗系统。每笔交易都有两个持相反观点的对手方。价格发现本质上就是多头和空头通过持续的反对意见来收敛到清算价格的过程。

单 Agent 分析试图用合作推理过程来建模一个对抗系统。这就像试图让一个棋手"公平地"同时下两边来模拟一盘棋。它行不通——不是因为棋手不够聪明,而是因为对抗深度需要对抗过程。

💡 核心设计原则:市场分析的质量,与它所承受的最佳反驳论点的质量成正比。如果你的看涨论点无法解释为什么空头的最佳观点是错的,那你没有论点——你只有愿望。辩论架构就是为产生这种"幸存者质量"而设计的。

为什么是 8 个 Agent,而不是 2 个?

你可能会想:如果 L1-L4 系列已经表明,即使两个 Agent 辩论也能提高可靠性,为什么我们需要八个?

因为市场是多维的。简单的多空辩论把所有市场因素坍缩成一个轴:涨还是跌。但真实市场有结构:

  • 时间维度:短期超买信号和长期趋势增长都是"看涨",但在完全不同的时间尺度上。一个单一的看涨 Agent 无法区分"未来两周看涨"和"未来两年看涨",除非在两者之间做取舍。
  • 分析框架:技术分析、基本面估值、宏观经济和情绪分析使用完全不同的数据、假设和推理模式。一个 Agent 试图"考虑所有因素",最终只能平均它们——这正是你错失信号的方式。
  • 关联盲点:共享相同分析框架的两个 Agent 会有相同的盲点。你需要阵营内部的多样性——多头之间就为什么看涨产生分歧——才能暴露真正的不确定性。

八个 Agent(4 多头 + 4 空头),沿两个轴组织——分析框架和时间维度——恰好给了我们这种阵营内多样性,同时保持了阵营间的对抗张力。

系统架构

在深入数据管道之前,让我们先看整个系统的全景。架构如下:


                    ┌─────────────────────────────────────────┐
                    │          市场数据管道                     │
                    │  ┌──────────┐  ┌──────────┐  ┌───────┐  │
                    │  │  Yahoo   │  │   FRED   │  │ 情绪  │  │
                    │  │ Finance  │  │  宏观    │  │ 数据  │  │
                    │  └────┬─────┘  └────┬─────┘  └───┬───┘  │
                    │       └──────────────┼────────────┘      │
                    │                      ▼                   │
                    │           ┌──────────────────┐           │
                    │           │    知识库         │           │
                    │           │   (标准化)        │           │
                    │           └────────┬─────────┘           │
                    └────────────────────┼────────────────────┘
                                         │
              ┌──────────────────────────┼──────────────────────────┐
              │                          ▼                          │
              │               辩论编排器                             │
              │                                                     │
              │   第一轮:开场论据                                    │
              │   ┌─────────────────────┐ ┌─────────────────────┐   │
              │   │   4 个多头 AGENT    │ │   4 个空头 AGENT    │   │
              │   │ ┌────┐┌────┐┌────┐ │ │ ┌────┐┌────┐┌────┐ │   │
              │   │ │技术││基本││宏观│ │ │ │技术││基本││宏观│ │   │
              │   │ │ 🐂 ││ 🐂 ││ 🐂 │ │ │ │ 🐻 ││ 🐻 ││ 🐻 │ │   │
              │   │ └────┘└────┘└────┘ │ │ └────┘└────┘└────┘ │   │
              │   │ ┌────┐             │ │ ┌────┐             │   │
              │   │ │情绪│             │ │ │情绪│             │   │
              │   │ │ 🐂 │             │ │ │ 🐻 │             │   │
              │   │ └────┘             │ │ └────┘             │   │
              │   └─────────────────────┘ └─────────────────────┘   │
              │                                                     │
              │   第二轮:交叉质询                                    │
              │   (每个 Agent 针对同一分析领域内的                    │
              │    对手方进行挑战)                                   │
              │                                                     │
              │   第三轮:总结陈词                                    │
              │   (Agent 基于受到的批评完善论点                      │
              │    ——让步或加倍下注)                                 │
              │                                                     │
              │                         ▼                           │
              │              ┌──────────────────┐                   │
              │              │   裁判 AGENT ⚖️   │                   │
              │              │ 从辩论记录中综合   │                   │
              │              │ 最终分析报告      │                   │
              │              └──────────────────┘                   │
              └─────────────────────────────────────────────────────┘
  

组件分解

层级 组件 职责 本文覆盖
数据 市场数据管道 从免费 API 拉取指数、板块、宏观指标、情绪数据;标准化为结构化知识库 ✅ 完整代码
知识库 结构化 JSON 输出,包含元数据、数据新鲜度时间戳和 Agent 专属数据切片 ✅ 完整代码
辩论 辩论协议 3 轮结构化辩论(开场→质询→总结),针对市场分析适配 → 第二篇
Agent 专业化 8 个 Agent 各有独立提示词、分析视角和知识库切片 本文设计,第二篇代码
裁判 裁判 Agent 综合完整辩论记录,生成结构化最终分析,附置信度 → 第三篇
生产 编排器 + 调度 每日自动运行、异步执行、缓存、错误恢复 → 第四篇

辩论轮次结构

辩论采用 3 轮协议,改编自 L2 的结构化辩论设计,但有一个关键升级:所有 8 个 Agent 在每轮内并行运行。

轮次 耗时 多头 Agent 空头 Agent 并行?
第一轮 — 开场 ~12s 每个多头从自己的分析视角生成开场论点,引用具体数据 每个空头从自己的分析视角生成开场反论点 ✅ 8 个 Agent 全部并行
第二轮 — 质询 ~12s 每个多头直接挑战对应空头的开场论点(如技术多头攻击技术空头的论点) 每个空头直接挑战对应多头的开场论点 ⚠️ 每对串行,4 对并行
第三轮 — 总结 ~12s 完善原始论点——放弃薄弱环节,强化存活的论点 同样——基于质询反馈让步或加倍 ✅ 8 个 Agent 全部并行
💡 为什么质询是配对的,而不是自由混战:如果每个 Agent 都可以攻击其他所有 Agent,一轮就有 8×7 = 56 个攻击向量。那是噪音,不是信号。通过在同一分析领域内配对 Agent(技术多头 vs. 技术空头,宏观多头 vs. 宏观空头),每次质询都是聚焦的、深度的、跨对可比较的。这与 L2 的"一位裁判评估辩论的一个维度"是同一设计原则——约束创造质量。

Agent 专业化:8 角色矩阵

这就是架构变得有趣的地方。我们不只是把同一个提示词复制 8 份,然后改成"你是看涨的"和"你是看跌的"。每个 Agent 都有一个独特的分析视角——一套专门的方法论、数据源和推理框架。

专业化矩阵

Agent 分析视角 数据焦点 时间维度 核心问题
🐂 技术多头 技术分析 价格行为、均线、RSI、MACD、成交量、支撑/阻力 1 天 – 2 周 我们在上升趋势中吗?动量在加速吗?下一个阻力在哪?
🐻 技术空头 技术分析 超买信号、空头背离、派发形态、VIX 飙升 1 天 – 2 周 动量在消退吗?我们在筑顶吗?成交量确认走势吗?
🐂 基本多头 基本面分析 盈利收益率差、市盈率、行业 ROE、利润率、自由现金流 3 个月 – 2 年 估值有盈利支撑吗?盈利增长有动能吗?
🐻 基本空头 基本面分析 PE 扩张风险、利润率压缩、债务水平、盈利质量红旗 3 个月 – 2 年 倍数可持续吗?盈利质量在恶化吗?资产负债表杠杆多高?
🐂 宏观多头 宏观经济分析 GDP 增长、就业、收益率曲线、货币政策、全球 PMI、资本流动 6 个月 – 3 年 宏观环境支持吗?美联储转向了吗?全球资金流向正面吗?
🐻 宏观空头 宏观经济分析 通胀黏性、收益率曲线倒挂、流动性收紧、地缘政治风险、债务上限 6 个月 – 3 年 衰退被定价了吗?流动性在收缩吗?宏观失衡在积累吗?
🐂 情绪多头 情绪与资金流分析 看跌/看涨比率、资金流向、AAII 情绪、空头持仓、内部人买入 1 周 – 3 个月 情绪过度悲观了吗(反向买入)?聪明钱在流入吗?
🐻 情绪空头 情绪与资金流分析 极度看涨情绪、狂热指标、保证金债务水平、IPO 狂热 1 周 – 3 个月 狂热见顶了吗?散户资金流在发出顶部信号吗?保证金债务过度了吗?

为什么这个矩阵有效

这不是随意分配。专业化矩阵沿两个正交轴设计:

  1. 分析框架(列):技术、基本面、宏观、情绪。这四个视角使用根本不同的数据和推理。一个技术分析师和一个基本面分析师可以看同一个市场而得出相反的结论,且都不算"错"——他们回答的是不同的问题。这迫使裁判权衡哪个框架对当前市场环境更相关,而不是简单地选一个赢家。
  2. 方向立场(行):每个框架内的多头 vs. 空头。这在每个分析维度内创造了对抗张力。技术多头和技术空头不是在争论技术分析是否有效——他们一致同意它是有效的。他们在争论的是技术数据此刻意味着什么。这比"技术面说买、基本面说卖"那种辩论高级得多——那是拿苹果和橘子比较。
⚠️ 重要设计约束:所有 Agent 使用相同的 LLM(模型),但各自有不同的系统提示词和不同的知识库切片。在这个架构中,我们不给不同的 Agent 分配不同的模型。为什么?因为如果技术空头用 GPT-4o 而技术多头用 Claude,你无法分辨辩论结果反映的是真实的分析差异还是单纯的模型能力差异。相同模型,不同提示词 = 干净的实验设计。多模型部署在后面(第四篇)作为一个鲁棒性升级出现,而不是基线。

市场数据管道:从原始 API 到辩论知识库

现在来看你能实际运行的部分。数据管道是原始市场现实与 Agent 可读分析上下文之间的桥梁。它的工作:从免费数据源拉取数据,标准化为结构化知识库,并为每个 Agent 的特定分析视角做切片。

数据源

数据源 API 我们拉取什么 免费额度 使用者
Yahoo Finance yfinance 指数(标普500、纳斯达克、道琼斯、VIX、恒生、日经、欧洲STOXX50)、行业ETF、历史价格、成交量 无限 技术多/空、基本多/空
FRED fredapi GDP、CPI、失业率、联邦基金利率、收益率曲线(10Y-2Y)、M2 货币供应、工业产出、零售销售、新屋开工、非农就业 120 次/分钟(免费 API key) 宏观多/空
内部计算 内置 衍生指标:均线、RSI、板块广度、收益率利差、波动率期限结构 N/A 所有 Agent

知识库 Schema

管道输出一个结构化 JSON 对象——知识库——每个 Agent 都会收到(但每个 Agent 只读取与其分析视角相关的切片)。Schema 如下:

模块 字段 消费 Agent
meta 时间戳、数据新鲜度、警告(数据过期提醒) 全部
indices 每个指数:价格、涨跌幅、5/20/50/200日回报、距52周高/低点、成交量比率 技术多/空、基本多/空
sectors 每个行业ETF:价格、5/20日回报、相对标普500强度、板块广度 技术多/空、基本多/空
technicals 每个指数:MA(20/50/200)状态(上方/下方)、RSI(14)、MACD信号、ATR(14)、成交量趋势 技术多/空
fundamentals 标普500:当前/5年/10年平均PE、盈利收益率、股息率、Shiller CAPE、行业PE分布 基本多/空
macro GDP增速、CPI同比、失业率、联邦基金利率、收益率曲线利差、M2增速、ISM制造业PMI 宏观多/空
sentiment VIX水平和期限结构、看跌/看涨比率、板块资金流向信号、波动率区间 情绪多/空
global 恒生、日经225、STOXX50:价格、5/20日回报、与标普500相关性 宏观多/空

代码:market_data_pipeline.py

以下是完整的数据管道。保存为 market_data_pipeline.py。约 220 行,从 Yahoo Finance 和 FRED 拉取真实数据,输出结构化知识库。每个函数都有错误处理——如果某个 API 挂了,管道会优雅降级而不是崩溃。

"""
market_data_pipeline.py
多 Agent 辩论 × 市场分析 — 数据管道
────────────────────────────────────
从免费 API (Yahoo Finance, FRED) 拉取市场数据,
并结构化为辩论系统的知识库。

数据源:
  - Yahoo Finance (yfinance): 指数、行业 ETF、历史价格
  - FRED (fredapi): 宏观经济指标
  - 内部计算: 技术指标、衍生指标

输出: 结构化 JSON 知识库,供 8 个辩论 Agent 消费。
       每个 Agent 只读取与其相关的切片。

依赖:
  pip install yfinance fredapi pandas numpy

FRED API key (免费): https://fred.stlouisfed.org/docs/api/api_key.html
"""

import json
import os
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import yfinance as yf

# 可选: FRED 宏观数据。如果 fredapi 未安装或无 key,
# 宏观数据将标记为不可用而不是崩溃。
try:
    from fredapi import Fred
    FRED_AVAILABLE = True
except ImportError:
    FRED_AVAILABLE = False


# ═══════════════════════════════════════════════════════════
# 配置
# ═══════════════════════════════════════════════════════════

FRED_API_KEY=*** "your-fred-api-key-here")

# — 追踪的指数 —
INDICES: Dict[str, str] = {
    "SPX":    "^GSPC",       # 标普 500
    "NASDAQ": "^IXIC",       # 纳斯达克综合
    "DJIA":   "^DJI",        # 道琼斯工业平均
    "VIX":    "^VIX",        # CBOE 波动率指数
    "HSI":    "^HSI",        # 恒生指数
    "N225":   "^N225",       # 日经 225
    "STOXX":  "^STOXX50E",   # 欧洲 STOXX 50
}

# — 行业 ETF 用于广度分析 —
SECTORS: Dict[str, str] = {
    "XLK":  "科技",
    "XLF":  "金融",
    "XLE":  "能源",
    "XLV":  "医疗健康",
    "XLI":  "工业",
    "XLY":  "可选消费",
    "XLP":  "必需消费",
    "XLU":  "公用事业",
    "XLB":  "原材料",
    "XLRE": "房地产",
}

# — FRED 宏观序列 —
MACRO_SERIES: Dict[str, str] = {
    "GDP":        "GDP",         # 国内生产总值 (季度)
    "CPI":        "CPIAUCSL",    # 消费者价格指数 (月度)
    "UNRATE":     "UNRATE",      # 失业率 (月度)
    "FEDFUNDS":   "FEDFUNDS",    # 有效联邦基金利率 (月度)
    "T10Y2Y":     "T10Y2Y",      # 10年-2年国债利差 (每日)
    "M2":         "M2SL",        # M2 货币供应量 (月度)
    "INDPRO":     "INDPRO",      # 工业产出指数 (月度)
    "RETAIL":     "RSXFS",       # 零售销售 (月度)
    "HOUST":      "HOUST",       # 新屋开工 (月度)
    "PAYEMS":     "PAYEMS",      # 非农就业总人数 (月度)
}

# — 技术指标参数 —
TECHNICAL_CONFIG = {
    "ma_periods": [20, 50, 200],
    "rsi_period": 14,
    "macd_fast": 12,
    "macd_slow": 26,
    "macd_signal": 9,
    "atr_period": 14,
}

(续——数据类定义)


# ═══════════════════════════════════════════════════════════
# 数据类 — 结构化知识库输出
# ═══════════════════════════════════════════════════════════

@dataclass
class IndexSnapshot:
    """单个指数的快照"""
    ticker: str
    name: str
    price: float
    change_pct: float
    returns: Dict[str, float] = field(default_factory=dict)
    vs_52w_high_pct: float = 0.0
    vs_52w_low_pct: float = 0.0
    volume_ratio: float = 1.0


@dataclass
class SectorSnapshot:
    """行业 ETF 的快照"""
    ticker: str
    name: str
    price: float
    change_5d_pct: float
    change_20d_pct: float
    relative_strength_vs_spx: float


@dataclass
class TechnicalSignals:
    """单个指数的技术指标"""
    ticker: str
    ma_status: Dict[str, str] = field(default_factory=dict)
    rsi_14: Optional[float] = None
    macd_signal: Optional[str] = None
    atr_14: Optional[float] = None
    volume_trend: Optional[str] = None


@dataclass
class MacroSnapshot:
    """宏观经济指标值"""
    indicator: str
    description: str
    latest_value: Optional[float] = None
    latest_date: Optional[str] = None
    yoy_change_pct: Optional[float] = None
    trend: Optional[str] = None


@dataclass
class KnowledgeBase:
    """辩论 Agent 的完整结构化知识库"""
    meta: Dict[str, Any] = field(default_factory=dict)
    indices: Dict[str, IndexSnapshot] = field(default_factory=dict)
    sectors: Dict[str, SectorSnapshot] = field(default_factory=dict)
    technicals: Dict[str, TechnicalSignals] = field(default_factory=dict)
    fundamentals: Dict[str, Any] = field(default_factory=dict)
    macro: Dict[str, MacroSnapshot] = field(default_factory=dict)
    sentiment: Dict[str, Any] = field(default_factory=dict)
    global_markets: Dict[str, IndexSnapshot] = field(default_factory=dict)


# ═══════════════════════════════════════════════════════════
# 数据获取
# ═══════════════════════════════════════════════════════════

def fetch_index_data(ticker: str, name: str,
                     period: str = "1y") -> Optional[IndexSnapshot]:
    """从 Yahoo Finance 获取指数价格数据。"""
    try:
        data = yf.download(ticker, period=period, progress=False)
        if data.empty:
            print(f"  ⚠ 无数据 {ticker} ({name})", file=sys.stderr)
            return None

        close = data["Close"].squeeze()
        volume = data["Volume"].squeeze()

        current = float(close.iloc[-1])
        prev = float(close.iloc[-2]) if len(close) > 1 else current
        change_pct = round((current - prev) / prev * 100, 2)

        returns = {}
        for label, days in [("5d", 5), ("20d", 20), ("50d", 50), ("200d", 200)]:
            if len(close) >= days:
                past = float(close.iloc[-(days + 1)])
                returns[label] = round((current - past) / past * 100, 2)
            else:
                returns[label] = None

        high_52w = float(close.max())
        low_52w = float(close.min())
        vs_high = round((current - high_52w) / high_52w * 100, 2)
        vs_low = round((current - low_52w) / low_52w * 100, 2)

        avg_vol_20d = float(volume.tail(20).mean()) if len(volume) >= 20 else float(volume.mean())
        vol_ratio = round(float(volume.iloc[-1]) / avg_vol_20d, 2) if avg_vol_20d > 0 else 1.0

        return IndexSnapshot(
            ticker=ticker, name=name, price=current,
            change_pct=change_pct, returns=returns,
            vs_52w_high_pct=vs_high, vs_52w_low_pct=vs_low,
            volume_ratio=vol_ratio,
        )
    except Exception as e:
        print(f"  ✗ 获取 {ticker} 错误: {e}", file=sys.stderr)
        return None


def compute_technical_signals(ticker: str,
                              period: str = "6mo") -> Optional[TechnicalSignals]:
    """计算某个指数的技术指标。"""
    try:
        data = yf.download(ticker, period=period, progress=False)
        if data.empty:
            return None

        close = data["Close"].squeeze()
        volume = data["Volume"].squeeze()

        # 移动平均线
        ma_status = {}
        for ma in TECHNICAL_CONFIG["ma_periods"]:
            if len(close) >= ma:
                ma_val = float(close.rolling(ma).mean().iloc[-1])
                current = float(close.iloc[-1])
                status = "above" if current > ma_val else "below"
                ma_status[f"ma{ma}"] = status

        # RSI(14)
        rsi = None
        rsi_period = TECHNICAL_CONFIG["rsi_period"]
        if len(close) >= rsi_period + 1:
            delta = close.diff()
            gain = delta.clip(lower=0)
            loss = (-delta).clip(lower=0)
            avg_gain = gain.rolling(rsi_period).mean()
            avg_loss = loss.rolling(rsi_period).mean()
            rs = avg_gain / avg_loss.replace(0, np.nan)
            rsi_series = 100 - (100 / (1 + rs))
            rsi = round(float(rsi_series.iloc[-1]), 1)

        # MACD
        macd_sig = None
        cfg = TECHNICAL_CONFIG
        if len(close) >= cfg["macd_slow"] + cfg["macd_signal"]:
            ema_fast = close.ewm(span=cfg["macd_fast"]).mean()
            ema_slow = close.ewm(span=cfg["macd_slow"]).mean()
            macd_line = ema_fast - ema_slow
            signal_line = macd_line.ewm(span=cfg["macd_signal"]).mean()
            if macd_line.iloc[-1] > signal_line.iloc[-1]:
                macd_sig = "bullish"
            elif macd_line.iloc[-1] < signal_line.iloc[-1]:
                macd_sig = "bearish"
            else:
                macd_sig = "neutral"

        # ATR(14)
        atr = None
        if len(data) >= cfg["atr_period"] + 1:
            high = data["High"].squeeze()
            low = data["Low"].squeeze()
            tr = pd.concat([
                high - low,
                (high - close.shift()).abs(),
                (low - close.shift()).abs()
            ], axis=1).max(axis=1)
            atr = round(float(tr.rolling(cfg["atr_period"]).mean().iloc[-1]), 2)

        # 成交量趋势
        vol_trend = "flat"
        if len(volume) >= 20:
            recent_vol = float(volume.tail(5).mean())
            prior_vol = float(volume.tail(20).head(15).mean())
            if prior_vol > 0:
                ratio = recent_vol / prior_vol
                if ratio > 1.2:
                    vol_trend = "increasing"
                elif ratio < 0.8:
                    vol_trend = "decreasing"

        return TechnicalSignals(
            ticker=ticker, ma_status=ma_status,
            rsi_14=rsi, macd_signal=macd_sig,
            atr_14=atr, volume_trend=vol_trend,
        )
    except Exception as e:
        print(f"  ✗ 计算 {ticker} 技术指标错误: {e}", file=sys.stderr)
        return None


def fetch_macro_data() -> Dict[str, MacroSnapshot]:
    """从 FRED 获取宏观经济数据。"""
    results: Dict[str, MacroSnapshot] = {}

    if not FRED_AVAILABLE:
        for key in MACRO_SERIES:
            results[key] = MacroSnapshot(
                indicator=key, description=f"FRED 序列 {MACRO_SERIES[key]}",
                latest_value=None, trend="unavailable",
            )
        return results

    try:
        fred = Fred(api_key=FRED_API_KEY)
        for key, series_id in MACRO_SERIES.items():
            try:
                series = fred.get_series(series_id)
                if series.empty:
                    results[key] = MacroSnapshot(
                        indicator=key, description=series_id,
                        latest_value=None, trend="no_data",
                    )
                    continue

                latest = float(series.dropna().iloc[-1])
                latest_date = str(series.dropna().index[-1].date())

                yoy = None
                trend = None
                if len(series.dropna()) >= 13:
                    yoy_val = float(series.dropna().iloc[-13])
                    if yoy_val != 0:
                        yoy = round((latest - yoy_val) / abs(yoy_val) * 100, 2)

                if len(series.dropna()) >= 6:
                    recent_avg = float(series.dropna().tail(3).mean())
                    prior_avg = float(series.dropna().tail(6).head(3).mean())
                    if prior_avg != 0:
                        delta_pct = (recent_avg - prior_avg) / abs(prior_avg) * 100
                        if delta_pct > 0.5:
                            trend = "rising"
                        elif delta_pct < -0.5:
                            trend = "falling"
                        else:
                            trend = "flat"

                results[key] = MacroSnapshot(
                    indicator=key, description=series_id,
                    latest_value=round(latest, 4), latest_date=latest_date,
                    yoy_change_pct=yoy, trend=trend,
                )
            except Exception as e:
                results[key] = MacroSnapshot(
                    indicator=key, description=series_id,
                    latest_value=None, trend=f"错误: {str(e)[:80]}",
                )
    except Exception as e:
        for key in MACRO_SERIES:
            results[key] = MacroSnapshot(
                indicator=key, description=MACRO_SERIES[key],
                latest_value=None, trend="connection_error",
            )

    return results

(续——知识库组装、辅助函数、Agent 切片器)


# ═══════════════════════════════════════════════════════════
# 知识库组装
# ═══════════════════════════════════════════════════════════

def build_knowledge_base() -> KnowledgeBase:
    """主管道入口。获取所有数据源并组装知识库。"""
    kb = KnowledgeBase()
    now = datetime.now(timezone.utc)

    kb.meta = {
        "generated_at": now.isoformat(),
        "market_status": "open" if _is_market_hours(now) else "closed",
        "data_sources": ["yfinance", "fred"] if FRED_AVAILABLE else ["yfinance"],
        "warnings": [],
    }

    print("📊 获取指数数据...")
    for name, ticker in INDICES.items():
        snap = fetch_index_data(ticker, name)
        if snap is None:
            kb.meta["warnings"].append(f"无数据 {name} ({ticker})")
            continue
        if name in ("HSI", "N225", "STOXX"):
            kb.global_markets[name] = snap
        else:
            kb.indices[name] = snap

    print("📈 计算技术指标...")
    for name, ticker in INDICES.items():
        signals = compute_technical_signals(ticker)
        if signals:
            kb.technicals[name] = signals

    print("🏢 获取行业数据...")
    for ticker, sector_name in SECTORS.items():
        snap = fetch_index_data(ticker, sector_name)
        if snap is None:
            continue
        spx = kb.indices.get("SPX")
        spx_ret = spx.returns.get("20d", 0) or 0 if spx else 0
        sec_ret = snap.returns.get("20d", 0) or 0
        rs = round(sec_ret - spx_ret, 2)
        kb.sectors[ticker] = SectorSnapshot(
            ticker=ticker, name=sector_name,
            price=snap.price,
            change_5d_pct=snap.returns.get("5d", 0) or 0,
            change_20d_pct=sec_ret,
            relative_strength_vs_spx=rs,
        )

    print("🏛  获取宏观数据 (FRED)...")
    kb.macro = fetch_macro_data()
    if all(v.trend in ("unavailable", "no_data", "connection_error")
           for v in kb.macro.values()):
        kb.meta["warnings"].append(
            "FRED 宏观数据不可用 — 请检查 API key 或网络")

    # — 基本面(从指数/行业/FRED 数据衍生) —
    spx = kb.indices.get("SPX")
    kb.fundamentals = {
        "sp500_pe_approx": _estimate_pe(spx),
        "sp500_earnings_yield_approx": _estimate_earnings_yield(spx),
        "sector_rotation_signal": _detect_sector_rotation(kb.sectors),
    }

    # — 情绪面(从 VIX + 成交量 + 行业轮动衍生) —
    vix = kb.indices.get("VIX")
    kb.sentiment = {
        "vix_level": vix.price if vix else None,
        "vix_regime": _classify_vix_regime(vix),
        "volume_signal": _volume_sentiment_signal(kb.indices),
        "sector_breadth": _sector_breadth(kb.sectors),
    }

    if kb.meta["market_status"] == "closed":
        kb.meta["warnings"].append(
            "市场已休市 — 价格为上一收盘价,可能已过时")

    print(f"✅ 知识库就绪 ({len(kb.indices)} 个指数, "
          f"{len(kb.sectors)} 个行业, {len(kb.macro)} 个宏观指标)")
    if kb.meta["warnings"]:
        print(f"⚠  警告: {kb.meta['warnings']}")

    return kb

(续——辅助函数、Agent 数据切片器、主函数)


# ═══════════════════════════════════════════════════════════
# 辅助函数
# ═══════════════════════════════════════════════════════════

def _is_market_hours(now: datetime) -> bool:
    """粗略判断美股是否在交易时间 (9:30-16:00 ET, 工作日)。"""
    et_hour = (now.hour - 4) % 24  # UTC-4 近似 EDT
    et_minute = now.minute
    weekday = now.weekday()
    if weekday >= 5:
        return False
    total_minutes = et_hour * 60 + et_minute
    return 570 <= total_minutes <= 960


def _estimate_pe(spx: Optional[IndexSnapshot]) -> Dict[str, Any]:
    """估算标普 500 PE(占位——生产环境需使用真实基本面 API)。"""
    if spx is None or spx.price == 0:
        return {"note": "PE 估算不可用 — 无 SPX 数据"}
    estimated_earnings = 240.0  # 近12个月近似值
    pe = round(spx.price / estimated_earnings, 1)
    return {
        "current_pe_approx": pe,
        "long_term_avg_pe": 17.0,
        "note": "PE 基于 SPX 价格 / 近似近12个月盈利估算。生产环境请替换为真实基本面 API。"
    }


def _estimate_earnings_yield(spx: Optional[IndexSnapshot]) -> Optional[float]:
    """盈利收益率 = 1 / PE (近似)。"""
    pe_data = _estimate_pe(spx)
    pe = pe_data.get("current_pe_approx")
    if pe and pe > 0:
        return round(100 / pe, 2)
    return None


def _detect_sector_rotation(sectors: Dict[str, SectorSnapshot]) -> str:
    """基于相对强度变化的简单行业轮动信号。"""
    if not sectors:
        return "insufficient_data"
    defensive = ["XLP", "XLU", "XLV"]
    cyclical = ["XLK", "XLY", "XLI", "XLB"]
    def_rs = sum(sectors[s].relative_strength_vs_spx
                 for s in defensive if s in sectors)
    cyc_rs = sum(sectors[s].relative_strength_vs_spx
                 for s in cyclical if s in sectors)
    if def_rs > cyc_rs + 2:
        return "defensive_rotation"
    elif cyc_rs > def_rs + 2:
        return "cyclical_rotation"
    return "neutral"


def _classify_vix_regime(vix: Optional[IndexSnapshot]) -> str:
    """基于 VIX 水平分类波动率区间。"""
    if vix is None:
        return "unknown"
    if vix.price < 15:
        return "low_volatility"
    elif vix.price < 20:
        return "normal"
    elif vix.price < 30:
        return "elevated"
    else:
        return "high_fear"


def _volume_sentiment_signal(indices: Dict[str, IndexSnapshot]) -> str:
    """基于各指数成交量比率的情绪信号。"""
    if not indices:
        return "unknown"
    ratios = [idx.volume_ratio for idx in indices.values()
              if idx.volume_ratio > 0 and idx.ticker not in ("^VIX",)]
    if not ratios:
        return "unknown"
    avg_ratio = sum(ratios) / len(ratios)
    if avg_ratio > 1.3:
        return "high_volume_rally"
    elif avg_ratio < 0.7:
        return "low_volume_drift"
    return "normal_volume"


def _sector_breadth(sectors: Dict[str, SectorSnapshot]) -> Dict[str, Any]:
    """统计 5 日和 20 日内上涨的行业数量。"""
    if not sectors:
        return {"breadth_5d": None, "breadth_20d": None}
    pos_5d = sum(1 for s in sectors.values() if s.change_5d_pct > 0)
    pos_20d = sum(1 for s in sectors.values() if s.change_20d_pct > 0)
    total = len(sectors)
    regime = ("broad_strength" if pos_20d >= 7 else
              "narrow_leadership" if pos_20d <= 3 else "mixed")
    return {
        "positive_5d": f"{pos_5d}/{total}",
        "positive_20d": f"{pos_20d}/{total}",
        "breadth_regime": regime,
    }

(续——Agent 数据切片器、主函数)


# ═══════════════════════════════════════════════════════════
# AGENT 数据切片器 — 每个 Agent 只获取与其相关的数据切片
# ═══════════════════════════════════════════════════════════

AGENT_SLICES = {
    "tech_bull":   ["meta", "indices", "technicals"],
    "tech_bear":   ["meta", "indices", "technicals"],
    "fund_bull":   ["meta", "indices", "sectors", "fundamentals"],
    "fund_bear":   ["meta", "indices", "sectors", "fundamentals"],
    "macro_bull":  ["meta", "macro", "global_markets", "indices"],
    "macro_bear":  ["meta", "macro", "global_markets", "indices"],
    "senti_bull":  ["meta", "sentiment", "indices", "sectors"],
    "senti_bear":  ["meta", "sentiment", "indices", "sectors"],
    "judge":       ["meta", "indices", "sectors", "technicals",
                    "fundamentals", "macro", "sentiment", "global_markets"],
}


def slice_for_agent(kb: KnowledgeBase, agent_id: str) -> Dict[str, Any]:
    """提取仅与特定 Agent 相关的数据部分。"""
    sections = AGENT_SLICES.get(agent_id, AGENT_SLICES["judge"])
    result = {}
    kb_dict = asdict(kb)
    for section in sections:
        if section in kb_dict:
            result[section] = kb_dict[section]
    return result


# ═══════════════════════════════════════════════════════════
# 主函数
# ═══════════════════════════════════════════════════════════

if __name__ == "__main__":
    print("=" * 60)
    print("📊 市场数据管道 — 多 Agent 辩论知识库")
    print("=" * 60)
    print()

    kb = build_knowledge_base()

    # 保存完整知识库
    output_path = "market_knowledge_base.json"
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(asdict(kb), f, indent=2, ensure_ascii=False, default=str)
    print(f"\n💾 完整知识库已保存至: {output_path}")

    # 展示某个 Agent 的切片示例
    print("\n── 示例: 技术多头 Agent 数据切片 ──")
    tech_bull_slice = slice_for_agent(kb, "tech_bull")
    print(json.dumps(tech_bull_slice, indent=2, ensure_ascii=False, default=str)[:1200])
    print("... (已截断)")

    print("\n── Agent 数据切片 Schema ──")
    for agent_id, sections in AGENT_SLICES.items():
        print(f"  {agent_id:15s} ← {sections}")

    print(f"\n✅ 管道完成。{len(kb.meta.get('warnings', []))} 个警告。")

运行管道

# 安装依赖
pip install yfinance fredapi pandas numpy

# 获取免费的 FRED API key(可选但推荐):
# https://fred.stlouisfed.org/docs/api/api_key.html

# 运行管道
export FRED_API_KEY="***"
python market_data_pipeline.py

预期输出:

============================================================
📊 市场数据管道 — 多 Agent 辩论知识库
============================================================

📊 获取指数数据...
📈 计算技术指标...
🏢 获取行业数据...
🏛  获取宏观数据 (FRED)...
✅ 知识库就绪 (7 个指数, 10 个行业, 10 个宏观指标)

💾 完整知识库已保存至: market_knowledge_base.json

── 示例: 技术多头 Agent 数据切片 ──
{
  "meta": { "generated_at": "2026-05-15T...", ... },
  "indices": { "SPX": { "price": 5847.23, ... }, ... },
  "technicals": { "SPX": { "rsi_14": 58.3, "macd_signal": "bullish" }, ... }
}

── Agent 数据切片 Schema ──
  tech_bull       ← ['meta', 'indices', 'technicals']
  tech_bear       ← ['meta', 'indices', 'technicals']
  fund_bull       ← ['meta', 'indices', 'sectors', 'fundamentals']
  ...

✅ 管道完成。0 个警告。

管道的关键设计决策

  1. 优雅降级:如果 FRED 不可用(没有 API key、网络故障),管道不会崩溃——它把宏观数据标记为不可用并继续运行。辩论仍然可以进行(宏观 Agent 数据较少,但可以从现有数据中推理)。
  2. Agent 数据切片:每个 Agent 只能看到与其分析视角相关的数据。技术多头看不到宏观数据——不是因为宏观不重要,而是因为专业化需要聚焦。如果每个 Agent 都能看到一切,它们都会收敛到相同的分析。约束创造多样性。
  3. 衍生指标:技术信号(RSI、MACD、ATR)和情绪指标(VIX 区间、行业广度)是本地计算的,而不是从外部 API 拉取的。这确保了结果可复现,避免了对第三方指标服务的依赖。
  4. 新鲜度元数据:meta 模块追踪数据获取时间和市场是否开盘。这对辩论协议至关重要——Agent 需要知道它们分析的是实时数据还是昨日收盘数据。
💡 管道是模块,不是服务:这段代码被设计为由辩论编排器(第二篇)导入,而不是作为独立的 Web 服务运行。编排器将为每场辩论调用一次 build_knowledge_base(),或每天为定时任务调用一次。缓存机制(第四篇)将在其上添加 TTL 层,这样我们不会为每场辩论重新拉取数据。

每个 Agent 看到什么:切片原则

让我们具体化这一点。管道运行后,每个 Agent 在辩论开始时收到的内容如下:

Agent 数据模块 关键数据点示例
🐂 技术多头 meta, indices, technicals SPX: 价格在 MA50/MA200 上方,RSI 58(未超买),MACD 金叉,成交量放大
🐻 技术空头 meta, indices, technicals SPX: 接近 5900 阻力位,RSI 背离形成,VIX 期货升水收窄
🐂 基本多头 meta, indices, sectors, fundamentals SPX PE 24.3 vs. 10年国债收益率,利差有利股票;行业盈利广度正面
🐻 基本空头 meta, indices, sectors, fundamentals PE 高于 5 年均值;利润率处于峰值;盈利收益率与债券利差收窄
🐂 宏观多头 meta, macro, global_markets, indices GDP 增长为正,失业率低位,美联储可能暂停加息,全球 PMI 扩张
🐻 宏观空头 meta, macro, global_markets, indices CPI 仍高于目标,收益率曲线倒挂,M2 收缩,地缘政治风险上升
🐂 情绪多头 meta, sentiment, indices, sectors VIX 处于正常区间,看跌/看涨比率偏高(反向买入),周期板块轮动进行中
🐻 情绪空头 meta, sentiment, indices, sectors AAII 看涨比例偏高(反向卖出),保证金债务高企,行业广度收窄
⚠️ 切片约束是经过设计的,不是限制:给每个 Agent 完整知识库的想法很诱人——"更多数据 = 更好的分析"。但这恰恰违背了专业化的目的。被迫评论 GDP 增长的技术分析师只会产出低质量分析。分析 K 线形态的基本面分析师超出其能力范围。通过将每个 Agent 的数据限制在其分析视角内,我们在每个维度上得到更深入的分析,而不是在所有维度上得到更肤浅的分析。看到一切的裁判负责最终的合成。

从数据到辩论:前方的路

至此,我们有了一个可运行的数据管道。运行 python market_data_pipeline.py,你会得到 market_knowledge_base.json——一个任何 Agent 都能读取的结构化市场快照。但知识库不会辩论。数据是燃料,不是火焰。

在下一篇文章中,我们将构建辩论协议——将这些数据转化为竞争性分析、交叉质询和综合结论的引擎。以下是即将到来的内容:

第二篇预览:辩论协议

  • Agent 提示词工程:所有 8 个 Agent 的精确系统提示词,旨在产生结构化、有证据支撑的论点,而非自由形态的意见。每个提示词约束 Agent 引用其知识库切片中的具体数据,减少幻觉。
  • 3 轮协议实现:完整的辩论编排代码——开场论据(并行)、交叉质询(配对)、总结陈词(并行)。继承 L4 编排器的异步执行模式。
  • 论点格式规范:每个 Agent 输出遵循严格的 JSON schema:主张、证据(附知识库引用)、置信度和关键假设。这使得论点是机器可读且跨 Agent 可比较的。
  • 辩论记录生成:裁判将收到的完整记录——结构化、带时间戳,清晰标注每个论点归属于哪个 Agent。
  • 约 300 行可运行代码:debate_protocol_market.py——实例化 Agent、运行辩论轮次、收集记录。

但在此之前,我想让你用今天的代码做一件事。运行管道。查看知识库。问你自己:如果你是裁判,仅凭这些数据,你对市场的看法是什么?把它写下来。不要两边下注。选一个方向,写三条支持性的要点。

然后,当你阅读第二篇时,将你单人的裁判分析与 8 Agent 辩论的产出进行比较。这两者之间的差异——一个人看数据 vs. 八个专业化 Agent 把它撕开——正是我们要构建这个系统的原因。


📎 系列:多 Agent 辩论 × 市场分析。第 1 篇,共 4 篇。前一系列:多 Agent 辩论 L1-L4(对抗协作理论与生产部署)。下一篇:第二篇——辩论协议。

🔥 今天就运行管道。← 前一系列:多 Agent 辩论 L4 · 返回 AI 智能体探索 查看更多文章。

下一步阅读

  • 📖 同系列下一篇:辩论协议设计 — 8 个 AI Agent 如何进行结构化对抗与交叉质询 — 将本文的数据管道接入完整的 3 轮辩论引擎,包含 8 个 Agent 的精确系统提示词和 JSON 论据格式规范。
  • 📖 辩论理论基础:多 Agent 辩论 L3:评分与共识理论 — 理解本文裁判评分体系的理论来源:多维度评分、加权综合、以及为什么辩论需要结构化的胜负判定而非简单的「谁赢了」。
  • 📖 基础技能:多 Agent 编排 — 从单 Agent 到 Agent 团队 — 掌握多 Agent 系统的异步执行、消息传递和结果聚合模式,这些是辩论编排器的底层工程基础。

常见问题

Q: 为什么是 8 个 Agent 而不是 2 个(一多一空)?简单的多空辩论不够吗?

A: 市场是多维度的——时间尺度(短期超买 vs. 长期趋势增长)、分析框架(技术 vs. 基本面 vs. 宏观 vs. 情绪)、以及阵营内部的分歧(两个「看涨」的技术分析师可能因为关注不同指标而得出不同结论)。2 个 Agent 的辩论把所有因素坍缩成一个轴(涨还是跌),丢失了市场结构。8 个 Agent(4×2 矩阵)保留了阵营间的对抗张力,同时引入了阵营内的多样性——多头之间就「为什么看涨」产生的分歧,恰恰暴露了真正的不确定性。

Q: 数据管道用了哪些免费 API?需要付费吗?

A: 两个免费数据源:Yahoo Finance(通过 yfinance 库,无限调用,获取指数/行业 ETF/历史价格/成交量)和 FRED(通过 fredapi,免费 API key 支持 120 次/分钟,获取 GDP/CPI/失业率/收益率曲线等宏观指标)。管道设计为优雅降级——即使 FRED 不可用(无 key 或网络故障),系统不会崩溃,而是标记宏观数据不可用并继续运行。技术指标(RSI、MACD、ATR)和情绪指标(VIX 区间、行业广度)均为本地计算,不依赖第三方服务。

Q: 「Agent 数据切片」是什么意思?为什么不让每个 Agent 看到全部数据?

A: 数据切片是指每个 Agent 只收到知识库中与其分析视角相关的模块。例如,技术多头只看到 meta + indices + technicals,看不到宏观数据。这不是限制——是设计。如果每个 Agent 都看到一切,它们会收敛到相同的分析结论,失去专业化的价值。被迫评论 GDP 增长的技术分析师只能产出低质量分析,分析 K 线形态的基本面分析师同样超出其能力范围。约束创造深度:每个 Agent 在自己的维度上做深,裁判(看到全部数据)负责最终的综合。

Q: 辩论系统用的是什么模型?不同 Agent 可以用不同模型吗?

A: 当前架构下,所有 8 个 Agent + 裁判使用相同的 LLM(如 GPT-4o 或 Claude),但各自拥有不同的系统提示词和知识库切片。设计约束是:如果技术空头用 GPT-4o 而技术多头用 Claude,你无法分辨辩论结果反映的是真实的分析差异还是模型能力差异。相同模型、不同提示词 = 干净的实验设计。多模型部署(不同 Agent 使用最匹配其分析任务的模型)作为后续鲁棒性升级出现在第四篇。

Q: 辩论轮次是并行的还是串行的?总耗时多久?

A: 三轮辩论的并行策略:第一轮(开场论据)——8 个 Agent 全部并行,约 12 秒;第二轮(交叉质询)——4 对 Agent 并行(技术多头 vs. 技术空头、基本多头 vs. 基本空头等),每对内串行,约 12 秒;第三轮(总结陈词)——8 个 Agent 全部并行,约 12 秒。加上裁判综合约 3-5 秒,完整辩论总耗时约 40 秒。质询采用配对而非自由混战的设计(避免 8×7=56 个攻击向量变成噪音),与 L2 系列「约束创造质量」的设计原则一致。

© 2026 xslyl.com — 多 Agent 辩论 × 市场分析系列 · 第 1 篇

关于 · 联系 · 隐私政策 · Sitemap