实现一个支持 Streamable HTTP 的 MCP 服务器及客户端_mcp streamable
在现代的 Web 开发中,实时数据传输是一个常见的需求。本文将介绍如何使用 FastAPI 实现一个支持 Streamable HTTP 的 MCP(Model Context Protocol)服务器,并提供 Python 客户端和前端客户端的实现。
1. 什么是 Streamable HTTP 和 MCP?
Streamable HTTP 是一种允许服务器以流的形式向客户端发送数据的技术。这在处理长时间运行的操作或实时数据更新时非常有用。
MCP(Model Context Protocol) 是一种协议,用于在客户端和服务器之间传输模型上下文信息。它支持初始化、消息传输和进度跟踪等功能。
2. 服务器端实现
我们将使用 FastAPI 来实现一个支持 Streamable HTTP 的 MCP 服务器。FastAPI 是一个现代、快速的 Web 框架,基于 Python 类型提示,支持异步操作。
安装 FastAPI 和 Uvicorn
首先,确保你已经安装了 FastAPI 和 Uvicorn。运行以下命令进行安装:
pip install fastapi uvicorn
服务器端代码
以下是服务器端的完整代码:
from fastapi import FastAPI, Request, HTTPExceptionfrom fastapi.responses import StreamingResponse, JSONResponsefrom fastapi.middleware.cors import CORSMiddlewareimport asyncioimport uuidapp = FastAPI()# 配置 CORSapp.add_middleware( CORSMiddleware, allow_origins=[\"*\"], # 允许所有来源 allow_credentials=True, allow_methods=[\"*\"], # 允许所有 HTTP 方法 allow_headers=[\"*\"], # 允许所有头部 expose_headers=[\"Mcp-Session-Id\"], # 允许客户端访问的自定义头)# 模拟存储会话信息sessions = {}@app.post(\"/message\")async def handle_message(request: Request): session_id = request.headers.get(\"Mcp-Session-Id\") if not session_id: session_id = str(uuid.uuid4()) # 获取请求数据 data = await request.json() print(f\"Received message: {data}\") # 模拟处理请求并发送响应 response_data = {\"result\": \"Request processed\"} # 返回 JSON 响应,并在头中包含 Mcp-Session-Id return JSONResponse(content={\"jsonrpc\": \"2.0\", \"id\": data.get(\"id\"), \"result\": response_data}, headers={\"Mcp-Session-Id\": session_id})@app.get(\"/message\")async def handle_sse(request: Request): session_id = request.headers.get(\"Mcp-Session-Id\") or request.query_params.get(\"Mcp-Session-Id\") if not session_id: raise HTTPException(status_code=400, detail=\"Session ID is required\") async def event_generator(): for i in range(5): await asyncio.sleep(1) # 模拟延迟 yield f\"data: Message {i + 1}\\n\\n\" return StreamingResponse(event_generator(), media_type=\"text/event-stream\")if __name__ == \"__main__\": import uvicorn uvicorn.run(app, host=\"0.0.0.0\", port=8000)
服务器端说明
-
CORS 配置:
- 使用
CORSMiddleware
允许跨域请求,确保前端页面可以访问服务器。 expose_headers
配置项允许客户端访问自定义头Mcp-Session-Id
。
- 使用
-
POST 请求处理:
- 如果客户端未提供
Mcp-Session-Id
,服务器会生成一个新的会话 ID 并返回。 - 服务器处理请求并返回响应,同时在响应头中包含
Mcp-Session-Id
。
- 如果客户端未提供
-
GET 请求处理:
- 服务器通过
StreamingResponse
返回流式数据。 - 每隔 1 秒发送一条消息,模拟实时数据。
- 服务器通过
3. Python 客户端实现
接下来,我们实现一个 Python 客户端,用于与服务器进行交互。
安装依赖
确保你已经安装了 requests
库。如果尚未安装,可以运行以下命令进行安装:
pip install requests
客户端代码
以下是 Python 客户端的完整代码:
import requestsserver_url = \"http://127.0.0.1:8000/message\"# 发送初始化请求init_data = { \"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"initialize\", \"params\": { \"protocolVersion\": \"2024-11-05\", \"capabilities\": { \"roots\": { \"listChanged\": True }, \"sampling\": {} }, \"clientInfo\": { \"name\": \"ExampleClient\", \"version\": \"1.0.0\" } }}response = requests.post(server_url, json=init_data)session_id = response.headers.get(\"Mcp-Session-Id\")print(f\"Session ID: {session_id}\")# 发送普通请求request_data = { \"jsonrpc\": \"2.0\", \"id\": 2, \"method\": \"some_method\", \"params\": { \"_meta\": { \"progressToken\": \"abc123\" } }}response = requests.post(server_url, json=request_data, headers={\"Mcp-Session-Id\": session_id})print(f\"Response: {response.json()}\")# 监听 SSE 流print(\"Listening for SSE messages...\")print(\"Mcp-Session-Id\", session_id)with requests.get(server_url, headers={\"Mcp-Session-Id\": session_id}, stream=True) as response: for line in response.iter_lines(): if line: decoded_line = line.decode(\"utf-8\") print(f\"SSE Message: {decoded_line}\")
客户端说明
-
初始化请求:
- 发送初始化请求,获取
Mcp-Session-Id
。 - 将
Mcp-Session-Id
保存在变量中,用于后续请求。
- 发送初始化请求,获取
-
发送普通请求:
- 在请求头中包含
Mcp-Session-Id
,发送普通请求并获取响应。
- 在请求头中包含
-
监听 SSE 流:
- 使用
requests.get
的stream=True
参数,监听服务器发送的 SSE 流。 - 实时打印接收到的消息。
- 使用
4. 前端客户端实现
最后,我们实现一个前端页面,用于与服务器进行交互并实时展示数据。
前端代码
以下是前端页面的完整代码:
<!DOCTYPE html><html lang=\"en\"><head> <meta charset=\"UTF-8\"> <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\"> <title>MCP Client</title> <style> body { font-family: Arial, sans-serif; margin: 20px; background-color: #f4f4f4; color: #333; } .container { max-width: 800px; margin: 0 auto; padding: 20px; background: #fff; border-radius: 8px; box-shadow: 0 0 10px rgba(0, 0, 0, 0.1); } h1 { text-align: center; color: #2c3e50; } .buttons { text-align: center; margin-bottom: 20px; } button { padding: 10px 20px; margin: 0 10px; font-size: 16px; color: #fff; background-color: #3498db; border: none; border-radius: 4px; cursor: pointer; transition: background-color 0.3s; } button:hover { background-color: #2980b9; } .messages { margin-top: 20px; padding: 10px; background: #ecf0f1; border-radius: 4px; } .message { margin-bottom: 10px; padding: 10px; background: #bdc3c7; border-radius: 4px; color: #2c3e50; } </style></head><body> <div class=\"container\"> <h1>MCP Client</h1> <div class=\"buttons\"> <button id=\"initButton\">Initialize</button> <button id=\"sendMessageButton\">Send Message</button> </div> <div class=\"messages\" id=\"messages\"></div> </div> <script> const serverUrl = \"http://127.0.0.1:8000/message\"; let sessionId = null; let eventSource = null; // 初始化按钮点击事件 document.getElementById(\'initButton\').addEventListener(\'click\', async () => { const initData = { \"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"initialize\", \"params\": { \"protocolVersion\": \"2024-11-05\", \"capabilities\": { \"roots\": { \"listChanged\": true }, \"sampling\": {} }, \"clientInfo\": { \"name\": \"ExampleClient\", \"version\": \"1.0.0\" } } }; const response = await fetch(serverUrl, { method: \'POST\', headers: { \'Content-Type\': \'application/json\' }, body: JSON.stringify(initData) }); if (response.ok) { const data = await response.json(); sessionId = response.headers.get(\'Mcp-Session-Id\'); displayMessage(`Session ID: ${sessionId}`); startSSE(); } else { displayMessage(`Initialization failed: ${response.statusText}`); } }); // 发送消息按钮点击事件 document.getElementById(\'sendMessageButton\').addEventListener(\'click\', async () => { if (!sessionId) { displayMessage(\"Please initialize first.\"); return; } const messageData = { \"jsonrpc\": \"2.0\", \"id\": 2, \"method\": \"some_method\", \"params\": { \"_meta\": { \"progressToken\": \"abc123\" } } }; const response = await fetch(serverUrl, { method: \'POST\', headers: { \'Content-Type\': \'application/json\', \'Mcp-Session-Id\': sessionId }, body: JSON.stringify(messageData) }); if (response.ok) { const data = await response.json(); displayMessage(`Response: ${JSON.stringify(data)}`); } else { displayMessage(`Message sending failed: ${response.statusText}`); } }); // 启动 SSE 流 function startSSE() { if (!sessionId) { displayMessage(\"Please initialize first.\"); return; } if (eventSource) { eventSource.close(); } eventSource = new EventSource(`${serverUrl}?Mcp-Session-Id=${sessionId}`); eventSource.onmessage = (event) => { displayMessage(`SSE Message: ${event.data}`); }; eventSource.onerror = (error) => { displayMessage(`SSE Error: ${error.message}`); eventSource.close(); }; } // 显示消息 function displayMessage(message) { const messagesDiv = document.getElementById(\'messages\'); const messageElement = document.createElement(\'div\'); messageElement.className = \'message\'; messageElement.textContent = message; messagesDiv.appendChild(messageElement); } </script></body></html>
前端说明
-
初始化按钮点击事件:
- 发送初始化请求,获取
Mcp-Session-Id
。 - 将
Mcp-Session-Id
保存在变量中,用于后续请求。
- 发送初始化请求,获取
-
发送消息按钮点击事件:
- 在请求头中包含
Mcp-Session-Id
,发送普通请求并获取响应。
- 在请求头中包含
-
监听 SSE 流:
- 使用
EventSource
监听服务器发送的 SSE 流。 - 每当接收到新的消息时,调用
displayMessage
函数动态展示消息。
- 使用
-
显示消息:
- 动态创建一个新的
div
元素,并将其添加到页面的messages
区域。
- 动态创建一个新的
5. 运行步骤
启动服务器
将服务器端代码保存为 server.py
,然后运行以下命令启动服务器:
uvicorn server:app --reload
运行 Python 客户端
将 Python 客户端代码保存为 client.py
,然后运行以下命令启动客户端:
python client.py
运行效果:
(.venv) (base) ➜ pop git:(main) ✗ python client.pySession ID: 587bb6ad-08f5-4102-8b27-4c276e9d7815Response: {\'jsonrpc\': \'2.0\', \'id\': 2, \'result\': {\'result\': \'Request processed\'}}Listening for SSE messages...Mcp-Session-Id 587bb6ad-08f5-4102-8b27-4c276e9d7815SSE Message: data: Message 1SSE Message: data: Message 2SSE Message: data: Message 3SSE Message: data: Message 4SSE Message: data: Message 5
运行前端页面
将前端代码保存为 index.html
,然后在浏览器中打开该文件。
操作步骤
- 点击“Initialize”按钮,初始化会话并获取
Mcp-Session-Id
。 - 点击“Send Message”按钮,发送普通请求并查看服务器的响应。
- 页面会自动监听 SSE 流,并实时显示服务器发送的实时消息。
运行效果:
6. 调试
如果遇到问题,可以使用以下方法进行调试:
-
检查服务器日志:
- 查看服务器日志,确认是否生成了
Mcp-Session-Id
并返回给客户端。 - 确认
StreamingResponse
是否正确返回了流式数据。
- 查看服务器日志,确认是否生成了
-
检查浏览器开发者工具:
- 打开浏览器的开发者工具(F12),查看网络请求的响应头,确认是否包含
Mcp-Session-Id
。 - 查看
EventSource
的网络请求,确认是否正确接收到了流式数据。
- 打开浏览器的开发者工具(F12),查看网络请求的响应头,确认是否包含
-
检查跨域问题:
- 确保服务器正确配置了 CORS,允许前端页面的域名和端口。
- 确保
expose_headers
配置项正确,允许客户端访问自定义头Mcp-Session-Id
。
通过以上步骤,你可以实现一个支持 Streamable HTTP 的 MCP 服务器,并通过 Python 客户端和前端页面与服务器进行交互。希望这篇文章对你有所帮助