欢迎光临
我们一直在努力

MCP (Model Context Protocol) 协议深度解析:从原理到生产级 Server 实现

什么是 MCP 协议?为什么它正在改变 AI 应用开发格局?

2024年底,Anthropic 发布了 Model Context Protocol (MCP)——一个开放标准协议,旨在为大型语言模型(LLM)提供统一、安全、标准化的外部工具和数据源访问方式。如果说 HTTP 协议改变了人类获取信息的方式,那么 MCP 协议正在从根本上改变 AI 模型与外部世界交互的方式。

在过去的一年多时间里,几乎所有主流的 AI Agent 框架(LangChain、CrewAI、AutoGen、Claude Code、Hermes Agent)都已经原生支持或通过插件支持 MCP 协议。截至2026年中,MCP 已经成为连接 LLM 与外部系统的”事实标准”。本文将深入解析 MCP 协议的核心原理,并通过完整的实战代码教你从零搭建一个生产级 MCP Server。

MCP 协议架构示意图

MCP 协议的核心架构与设计哲学

MCP 采用 客户端-服务器(Client-Server) 架构,但与传统 C/S 架构不同,这里的”客户端”是 LLM 应用本身(如 Claude Desktop、Hermes Agent 或自定义 AI 应用),而”服务器”则是提供工具、资源和能力的后端服务。

角色 说明 典型实现
MCP Host 运行 LLM 的主机程序,用户直接交互的界面 Claude Desktop、VS Code 插件、自定义 CLI
MCP Client 与 MCP Server 建立一对一连接的客户端 SDK 中的 Client 类(Python/TypeScript)
MCP Server 提供工具、资源和提示的轻量级服务 文件系统服务器、数据库查询服务器

协议设计的三项核心原则

  1. 零配置集成:MCP Server 通过标准 JSON-RPC 2.0 通信,任意语言实现的 Server 都可以即插即用。
  2. 安全优先:Server 显式声明其能力(工具列表、资源模板),Client 决定何时以及如何调用,模型无法越权。
  3. 可组合性:一个 Host 可以同时连接多个 Server,每个 Server 专注于一个能力域,类似微服务架构。

传输层:Stdio vs SSE 深度对比

MCP 当前支持两种传输方式,理解它们的区别对于正确设计部署架构至关重要。

stdio 传输(标准输入/输出)

Client 将 MCP Server 作为一个子进程启动,通过标准输入发送请求、从标准输出读取响应。这是最简单的模式,适用于本地部署。


1
2
3
# MCP Server 被作为子进程启动
$ npx @modelcontextprotocol/server-filesystem /path/to/allowed/dir
# Client 通过 stdin/stdout 进行 JSON-RPC 通信

优点:零网络开销,延迟最低;无需管理端口和认证;进程隔离天然安全。
缺点:Server 生命周期与 Client 绑定;无法远程访问;不适合分布式部署。

SSE 传输(Server-Sent Events)

Client 通过 HTTP 连接到远程 MCP Server,使用 SSE 接收服务器推送的事件。适用于生产环境的分布式架构。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Client 连接到远程 MCP Server
POST /mcp/v1/initialize
Host: mcp.example.com
Content-Type: application/json

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2025-03-26",
    "capabilities": {},
    "clientInfo": { "name": "my-agent", "version": "1.0.0" }
  }
}

优点:支持远程部署;Server 可以独立扩缩容;适合微服务架构。
缺点:增加网络延迟(通常 5-20ms);需要处理认证和授权;运维复杂度增加。

实战:从零搭建 Python MCP Server

接下来,我们将使用官方的

1
mcp

Python SDK 搭建一个生产级的 MCP Server,提供三个实用工具:

  • 获取实时股票价格
  • 计算两个日期之间的工作日天数
  • 执行 SQL 查询(带安全约束)

环境准备


1
2
3
4
5
6
7
8
9
10
11
12
# 创建项目目录
mkdir mcp-production-server && cd mcp-production-server

# 使用 Python 3.11+
python3 -m venv .venv
source .venv/bin/activate

# 安装 MCP SDK
pip install "mcp[cli]" httpx aiosqlite

# 验证安装
mcp --version

实现 MCP Server


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# server.py
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.types import (
    GetPromptResult,
    Prompt,
    PromptMessage,
    TextContent,
    Tool,
    ImageContent,
    EmbeddedResource,
)
from typing import Any
import mcp.server.stdio
import httpx
import json
from datetime import datetime, date
import aiosqlite
import os

# 创建 Server 实例
server = Server("production-tools")

# 安全配置:仅允许查询语句
ALLOWED_SQL_PREFIXES = ("SELECT", "PRAGMA", "WITH")
DB_PATH = os.environ.get("MCP_DB_PATH", "./data.db")

@server.list_prompts()
async def handle_list_prompts() -> list[Prompt]:
    """返回可用的 Prompt 模板"""
    return [
        Prompt(
            name="analyze-market",
            description="分析市场数据并生成报告模板",
        )
    ]

@server.get_prompt()
async def handle_get_prompt(
    name: str, arguments: dict[str, str] | None
) -> GetPromptResult:
    if name == "analyze-market":
        ticker = (arguments or {}).get("ticker", "UNKNOWN")
        return GetPromptResult(
            description=f"分析 {ticker} 的市场表现",
            messages=[
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"请分析股票 {ticker} 的近期市场表现,"
                             f"包括价格走势、成交量变化和关键支撑位/阻力位分析。"
                             f"使用 stock_price 工具获取实时数据。"
                    ),
                )
            ],
        )
    raise ValueError(f"Unknown prompt: {name}")

@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """注册三个工具"""
    return [
        Tool(
            name="stock_price",
            description="获取指定股票代码的实时价格(支持美股和港股)",
            inputSchema={
                "type": "object",
                "properties": {
                    "symbol": {
                        "type": "string",
                        "description": "股票代码,如 AAPL、TSLA、0700.HK",
                    }
                },
                "required": ["symbol"],
            },
        ),
        Tool(
            name="business_days",
            description="计算两个日期之间的工作日天数(排除周末和通用节假日)",
            inputSchema={
                "type": "object",
                "properties": {
                    "start_date": {
                        "type": "string",
                        "description": "开始日期,YYYY-MM-DD 格式",
                    },
                    "end_date": {
                        "type": "string",
                        "description": "结束日期,YYYY-MM-DD 格式",
                    },
                },
                "required": ["start_date", "end_date"],
            },
        ),
        Tool(
            name="query_database",
            description="对本地 SQLite 数据库执行只读查询",
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "SQL 查询语句(仅支持 SELECT/PRAGMA/WITH)",
                    }
                },
                "required": ["sql"],
            },
        ),
    ]

@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict | None
) -> list[TextContent | ImageContent | EmbeddedResource]:
    if not arguments:
        arguments = {}

    if name == "stock_price":
        symbol = arguments.get("symbol", "AAPL")
        async with httpx.AsyncClient() as client:
            # 使用免费 API 获取实时价格
            url = f"https://query1.finance.yahoo.com/v8/finance/chart/{symbol}"
            headers = {"User-Agent": "Mozilla/5.0"}
            resp = await client.get(url, headers=headers, timeout=10)
            data = resp.json()
            result = data["chart"]["result"][0]
            meta = result["meta"]
            timestamp = result["timestamp"][0]
            price_time = datetime.fromtimestamp(timestamp).isoformat()

            return [
                TextContent(
                    type="text",
                    text=json.dumps({
                        "symbol": symbol,
                        "price": meta["regularMarketPrice"],
                        "previous_close": meta["chartPreviousClose"],
                        "currency": meta["currency"],
                        "time": price_time,
                    }, indent=2, ensure_ascii=False)
                )
            ]

    elif name == "business_days":
        start = date.fromisoformat(arguments["start_date"])
        end = date.fromisoformat(arguments["end_date"])
        # 简化计算:排除周末
        business_days = 0
        current = start
        while current <= end:
            if current.weekday() < 5:  # 周一到周五
                business_days += 1
            current = date.fromordinal(current.toordinal() + 1)

        return [
            TextContent(
                type="text",
                text=json.dumps({
                    "start_date": arguments["start_date"],
                    "end_date": arguments["end_date"],
                    "business_days": business_days,
                    "total_days": (end - start).days + 1,
                }, indent=2, ensure_ascii=False)
            )
        ]

    elif name == "query_database":
        sql = arguments["sql"].strip()
        # 安全校验:仅允许只读查询
        if not sql.upper().startswith(ALLOWED_SQL_PREFIXES):
            return [
                TextContent(
                    type="text",
                    text=json.dumps({
                        "error": "仅允许执行 SELECT、PRAGMA 和 WITH 查询",
                        "code": "ACCESS_DENIED"
                    }, indent=2)
                )
            ]

        async with aiosqlite.connect(DB_PATH) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute(sql)
            rows = await cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
            result = [dict(zip(columns, row)) for row in rows]

            return [
                TextContent(
                    type="text",
                    text=json.dumps({
                        "row_count": len(result),
                        "columns": columns,
                        "data": result[:100],  # 限制返回行数
                    }, indent=2, ensure_ascii=False)
                )
            ]

    else:
        raise ValueError(f"未知工具: {name}")

async def main():
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="production-tools",
                server_version="1.0.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

初始化测试数据库


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# init_db.py
import sqlite3
import os

DB_PATH = os.environ.get("MCP_DB_PATH", "./data.db")
os.makedirs(os.path.dirname(DB_PATH) or ".", exist_ok=True)

conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()

cursor.executescript("""
CREATE TABLE IF NOT EXISTS products (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    category TEXT,
    price REAL,
    stock INTEGER,
    created_at TEXT DEFAULT (datetime('now'))
);

INSERT OR IGNORE INTO products VALUES
(1, '高性能GPU服务器 A100', '计算硬件', 15999.99, 12, '2026-06-01'),
(2, 'MCP 协议开发入门', '电子书', 39.99, 200, '2026-05-15'),
(3, 'AI Agent 架构设计指南', '课程', 299.99, 45, '2026-06-10'),
(4, '云原生部署工具包', '软件', 0, 9999, '2026-04-20');
""")

conn.commit()
conn.close()
print(f"数据库已初始化: {DB_PATH}")
print("包含 4 条产品记录")

配置 MCP Client(以 Claude Desktop 为例)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# ~/.config/Claude/claude_desktop_config.json
{
  "mcpServers": {
    "production-tools": {
      "command": "python3",
      "args": ["/path/to/mcp-production-server/server.py"],
      "env": {
        "MCP_DB_PATH": "/path/to/mcp-production-server/data.db"
      }
    },
    "filesystem": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-filesystem",
        "/path/to/allowed/directory"
      ]
    }
  }
}

生产级部署:Docker + Supervisord 方案

对于生产环境,推荐使用 Docker 容器化部署,配合 SSE 传输模式实现远程访问。

Dockerfile


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用
COPY server.py init_db.py ./
RUN python3 init_db.py

# 暴露 SSE 端口
EXPOSE 8000

# 使用 uvicorn 启动 FastMCP SSE 服务
CMD ["python3", "-m", "mcp.cli", "run", "server:server", "--transport", "sse", "--port", "8000"]

1
2
3
4
# requirements.txt
mcp>=1.0.0
httpx>=0.27.0
aiosqlite>=0.19.0

docker-compose.yml(带健康检查和自动重启)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
version: "3.8"

services:
  mcp-server:
    build: .
    container_name: mcp-production
    ports:
      - "8000:8000"
    environment:
      - MCP_DB_PATH=/app/data.db
    volumes:
      - mcp-data:/app/data
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 10s
    restart: unless-stopped
    deploy:
      resources:
        limits:
          cpus: "0.5"
          memory: "256M"

volumes:
  mcp-data:

Nginx 反向代理配置(生产环境)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
server {
    listen 443 ssl;
    server_name mcp.example.com;

    ssl_certificate /etc/letsencrypt/live/mcp.example.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/mcp.example.com/privkey.pem;

    # SSE 需要长连接
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 86400s;
    proxy_send_timeout 86400s;

    location /mcp/ {
        proxy_pass http://127.0.0.1:8000/;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }

    # 健康检查端点
    location /health {
        proxy_pass http://127.0.0.1:8000/health;
        access_log off;
    }
}

安全最佳实践

MCP Server 生产部署时,必须考虑以下安全要点:

1. 认证与授权


1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 简单的 API Key 验证中间件
from fastapi import FastAPI, Header, HTTPException

app = FastAPI()

VALID_API_KEYS = set(os.environ.get("MCP_API_KEYS", "").split(","))

async def verify_auth(authorization: str = Header(None)):
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing API key")
    token = authorization.replace("Bearer ", "")
    if token not in VALID_API_KEYS:
        raise HTTPException(status_code=403, detail="Invalid API key")
    return token

2. 工具调用审计日志


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import logging
import json
from datetime import datetime

logging.basicConfig(level=logging.INFO)
audit_logger = logging.getLogger("mcp_audit")

def log_tool_call(tool_name: str, arguments: dict, client_id: str, result_status: str):
    audit_logger.info(json.dumps({
        "timestamp": datetime.utcnow().isoformat(),
        "tool": tool_name,
        "arguments": arguments,
        "client_id": client_id,
        "status": result_status,
    }))

3. 速率限制


1
2
3
4
5
6
7
8
9
10
11
12
13
# 使用 slowapi 实现速率限制
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(429, _rate_limit_exceeded_handler)

@server.call_tool()
@limiter.limit("60/minute")  # 每分钟最多 60 次调用
async def handle_call_tool(name, arguments):
    # ... 工具实现
    pass

MCP 生态与未来展望

截至2026年中,MCP 生态已相当成熟:

  • 官方 SDK:Python 和 TypeScript SDK 均已稳定发布,支持全部协议特性
  • 社区 Server 市场:GitHub 上有超过 1000 个开源 MCP Server,覆盖数据库、文件系统、浏览器、Git、Slack、邮件等常见场景
  • 框架集成:LangChain、CrewAI、AutoGen、Hermes Agent、Claude Code 等主流 Agent 框架均已原生支持 MCP
  • 企业级特性:支持分布式追踪(OpenTelemetry)、Prometheus 指标暴露、结构化日志

未来值得关注的方向包括:

  1. MCP Registry:类似 Docker Hub 的中央 Server 注册中心,实现一键安装发现
  2. 双向通信增强:Server 主动推送事件通知 Client(如数据变更回调)
  3. 流式工具调用:大模型可以流式接收工具调用结果,边处理边输出
  4. 联邦 MCP:跨组织的 MCP Server 发现和信任传播机制

总结

MCP 协议正在快速成为 AI Agent 工具箱中的”HTTP 协议”——它标准化了 LLM 与外部世界的交互方式,使得工具开发者和 AI 应用开发者可以解耦工作。通过本文的学习,你应该已经掌握了:

  • MCP 协议的核心架构和设计哲学
  • stdio 和 SSE 两种传输方式的适用场景
  • 如何使用 Python SDK 开发自定义 MCP Server
  • 如何将 MCP Server Docker 化并部署到生产环境
  • 生产级安全最佳实践

无论你是 AI 应用开发者、后端工程师还是平台架构师,掌握 MCP 协议都将在 AI 原生应用开发中占据先机。建议从本地 stdio 模式开始体验,逐步迁移到 SSE 生产部署。

如果你有任何问题或实践经验,欢迎在评论区交流讨论!

【本站文章皆为原创,未经允许不得转载】:汤不热吧 » MCP (Model Context Protocol) 协议深度解析:从原理到生产级 Server 实现
分享到: 更多 (0)