> 技术文档 > 实现一个支持 Streamable HTTP 的 MCP 服务器及客户端_mcp streamable

实现一个支持 Streamable HTTP 的 MCP 服务器及客户端_mcp streamable

在现代的 Web 开发中,实时数据传输是一个常见的需求。本文将介绍如何使用 FastAPI 实现一个支持 Streamable HTTP 的 MCP(Model Context Protocol)服务器,并提供 Python 客户端和前端客户端的实现。

1. 什么是 Streamable HTTP 和 MCP?

Streamable HTTP 是一种允许服务器以流的形式向客户端发送数据的技术。这在处理长时间运行的操作或实时数据更新时非常有用。

MCP(Model Context Protocol) 是一种协议,用于在客户端和服务器之间传输模型上下文信息。它支持初始化、消息传输和进度跟踪等功能。

2. 服务器端实现

我们将使用 FastAPI 来实现一个支持 Streamable HTTP 的 MCP 服务器。FastAPI 是一个现代、快速的 Web 框架,基于 Python 类型提示,支持异步操作。

安装 FastAPI 和 Uvicorn

首先,确保你已经安装了 FastAPI 和 Uvicorn。运行以下命令进行安装:

pip install fastapi uvicorn

服务器端代码

以下是服务器端的完整代码:

from fastapi import FastAPI, Request, HTTPExceptionfrom fastapi.responses import StreamingResponse, JSONResponsefrom fastapi.middleware.cors import CORSMiddlewareimport asyncioimport uuidapp = FastAPI()# 配置 CORSapp.add_middleware( CORSMiddleware, allow_origins=[\"*\"], # 允许所有来源 allow_credentials=True, allow_methods=[\"*\"], # 允许所有 HTTP 方法 allow_headers=[\"*\"], # 允许所有头部 expose_headers=[\"Mcp-Session-Id\"], # 允许客户端访问的自定义头)# 模拟存储会话信息sessions = {}@app.post(\"/message\")async def handle_message(request: Request): session_id = request.headers.get(\"Mcp-Session-Id\") if not session_id: session_id = str(uuid.uuid4()) # 获取请求数据 data = await request.json() print(f\"Received message: {data}\") # 模拟处理请求并发送响应 response_data = {\"result\": \"Request processed\"} # 返回 JSON 响应,并在头中包含 Mcp-Session-Id return JSONResponse(content={\"jsonrpc\": \"2.0\", \"id\": data.get(\"id\"), \"result\": response_data}, headers={\"Mcp-Session-Id\": session_id})@app.get(\"/message\")async def handle_sse(request: Request): session_id = request.headers.get(\"Mcp-Session-Id\") or request.query_params.get(\"Mcp-Session-Id\") if not session_id: raise HTTPException(status_code=400, detail=\"Session ID is required\") async def event_generator(): for i in range(5): await asyncio.sleep(1) # 模拟延迟 yield f\"data: Message {i + 1}\\n\\n\" return StreamingResponse(event_generator(), media_type=\"text/event-stream\")if __name__ == \"__main__\": import uvicorn uvicorn.run(app, host=\"0.0.0.0\", port=8000)

服务器端说明

  1. CORS 配置

    • 使用 CORSMiddleware 允许跨域请求,确保前端页面可以访问服务器。
    • expose_headers 配置项允许客户端访问自定义头 Mcp-Session-Id
  2. POST 请求处理

    • 如果客户端未提供 Mcp-Session-Id,服务器会生成一个新的会话 ID 并返回。
    • 服务器处理请求并返回响应,同时在响应头中包含 Mcp-Session-Id
  3. GET 请求处理

    • 服务器通过 StreamingResponse 返回流式数据。
    • 每隔 1 秒发送一条消息,模拟实时数据。

3. Python 客户端实现

接下来,我们实现一个 Python 客户端,用于与服务器进行交互。

安装依赖

确保你已经安装了 requests 库。如果尚未安装,可以运行以下命令进行安装:

pip install requests

客户端代码

以下是 Python 客户端的完整代码:

import requestsserver_url = \"http://127.0.0.1:8000/message\"# 发送初始化请求init_data = { \"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"initialize\", \"params\": { \"protocolVersion\": \"2024-11-05\", \"capabilities\": { \"roots\": { \"listChanged\": True }, \"sampling\": {} }, \"clientInfo\": { \"name\": \"ExampleClient\", \"version\": \"1.0.0\" } }}response = requests.post(server_url, json=init_data)session_id = response.headers.get(\"Mcp-Session-Id\")print(f\"Session ID: {session_id}\")# 发送普通请求request_data = { \"jsonrpc\": \"2.0\", \"id\": 2, \"method\": \"some_method\", \"params\": { \"_meta\": { \"progressToken\": \"abc123\" } }}response = requests.post(server_url, json=request_data, headers={\"Mcp-Session-Id\": session_id})print(f\"Response: {response.json()}\")# 监听 SSE 流print(\"Listening for SSE messages...\")print(\"Mcp-Session-Id\", session_id)with requests.get(server_url, headers={\"Mcp-Session-Id\": session_id}, stream=True) as response: for line in response.iter_lines(): if line: decoded_line = line.decode(\"utf-8\") print(f\"SSE Message: {decoded_line}\")

客户端说明

  1. 初始化请求

    • 发送初始化请求,获取 Mcp-Session-Id
    • Mcp-Session-Id 保存在变量中,用于后续请求。
  2. 发送普通请求

    • 在请求头中包含 Mcp-Session-Id,发送普通请求并获取响应。
  3. 监听 SSE 流

    • 使用 requests.getstream=True 参数,监听服务器发送的 SSE 流。
    • 实时打印接收到的消息。

4. 前端客户端实现

最后,我们实现一个前端页面,用于与服务器进行交互并实时展示数据。

前端代码

以下是前端页面的完整代码:

<!DOCTYPE html><html lang=\"en\"><head> <meta charset=\"UTF-8\"> <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\"> <title>MCP Client</title> <style> body { font-family: Arial, sans-serif; margin: 20px; background-color: #f4f4f4; color: #333; } .container { max-width: 800px; margin: 0 auto; padding: 20px; background: #fff; border-radius: 8px; box-shadow: 0 0 10px rgba(0, 0, 0, 0.1); } h1 { text-align: center; color: #2c3e50; } .buttons { text-align: center; margin-bottom: 20px; } button { padding: 10px 20px; margin: 0 10px; font-size: 16px; color: #fff; background-color: #3498db; border: none; border-radius: 4px; cursor: pointer; transition: background-color 0.3s; } button:hover { background-color: #2980b9; } .messages { margin-top: 20px; padding: 10px; background: #ecf0f1; border-radius: 4px; } .message { margin-bottom: 10px; padding: 10px; background: #bdc3c7; border-radius: 4px; color: #2c3e50; } </style></head><body> <div class=\"container\"> <h1>MCP Client</h1> <div class=\"buttons\"> <button id=\"initButton\">Initialize</button> <button id=\"sendMessageButton\">Send Message</button> </div> <div class=\"messages\" id=\"messages\"></div> </div> <script> const serverUrl = \"http://127.0.0.1:8000/message\"; let sessionId = null; let eventSource = null; // 初始化按钮点击事件 document.getElementById(\'initButton\').addEventListener(\'click\', async () => { const initData = { \"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"initialize\", \"params\": {  \"protocolVersion\": \"2024-11-05\",  \"capabilities\": { \"roots\": { \"listChanged\": true }, \"sampling\": {}  },  \"clientInfo\": { \"name\": \"ExampleClient\", \"version\": \"1.0.0\"  } } }; const response = await fetch(serverUrl, { method: \'POST\', headers: {  \'Content-Type\': \'application/json\' }, body: JSON.stringify(initData) }); if (response.ok) { const data = await response.json(); sessionId = response.headers.get(\'Mcp-Session-Id\'); displayMessage(`Session ID: ${sessionId}`); startSSE(); } else { displayMessage(`Initialization failed: ${response.statusText}`); } }); // 发送消息按钮点击事件 document.getElementById(\'sendMessageButton\').addEventListener(\'click\', async () => { if (!sessionId) { displayMessage(\"Please initialize first.\"); return; } const messageData = { \"jsonrpc\": \"2.0\", \"id\": 2, \"method\": \"some_method\", \"params\": {  \"_meta\": { \"progressToken\": \"abc123\"  } } }; const response = await fetch(serverUrl, { method: \'POST\', headers: {  \'Content-Type\': \'application/json\',  \'Mcp-Session-Id\': sessionId }, body: JSON.stringify(messageData) }); if (response.ok) { const data = await response.json(); displayMessage(`Response: ${JSON.stringify(data)}`); } else { displayMessage(`Message sending failed: ${response.statusText}`); } }); // 启动 SSE 流 function startSSE() { if (!sessionId) { displayMessage(\"Please initialize first.\"); return; } if (eventSource) { eventSource.close(); } eventSource = new EventSource(`${serverUrl}?Mcp-Session-Id=${sessionId}`); eventSource.onmessage = (event) => { displayMessage(`SSE Message: ${event.data}`); }; eventSource.onerror = (error) => { displayMessage(`SSE Error: ${error.message}`); eventSource.close(); }; } // 显示消息 function displayMessage(message) { const messagesDiv = document.getElementById(\'messages\'); const messageElement = document.createElement(\'div\'); messageElement.className = \'message\'; messageElement.textContent = message; messagesDiv.appendChild(messageElement); } </script></body></html>

前端说明

  1. 初始化按钮点击事件

    • 发送初始化请求,获取 Mcp-Session-Id
    • Mcp-Session-Id 保存在变量中,用于后续请求。
  2. 发送消息按钮点击事件

    • 在请求头中包含 Mcp-Session-Id,发送普通请求并获取响应。
  3. 监听 SSE 流

    • 使用 EventSource 监听服务器发送的 SSE 流。
    • 每当接收到新的消息时,调用 displayMessage 函数动态展示消息。
  4. 显示消息

    • 动态创建一个新的 div 元素,并将其添加到页面的 messages 区域。

5. 运行步骤

启动服务器

将服务器端代码保存为 server.py,然后运行以下命令启动服务器:

uvicorn server:app --reload

运行 Python 客户端

将 Python 客户端代码保存为 client.py,然后运行以下命令启动客户端:

python client.py

运行效果:

(.venv) (base) ➜ pop git:(main) ✗ python client.pySession ID: 587bb6ad-08f5-4102-8b27-4c276e9d7815Response: {\'jsonrpc\': \'2.0\', \'id\': 2, \'result\': {\'result\': \'Request processed\'}}Listening for SSE messages...Mcp-Session-Id 587bb6ad-08f5-4102-8b27-4c276e9d7815SSE Message: data: Message 1SSE Message: data: Message 2SSE Message: data: Message 3SSE Message: data: Message 4SSE Message: data: Message 5

运行前端页面

将前端代码保存为 index.html,然后在浏览器中打开该文件。

操作步骤

  1. 点击“Initialize”按钮,初始化会话并获取 Mcp-Session-Id
  2. 点击“Send Message”按钮,发送普通请求并查看服务器的响应。
  3. 页面会自动监听 SSE 流,并实时显示服务器发送的实时消息。

运行效果:
实现一个支持 Streamable HTTP 的 MCP 服务器及客户端_mcp streamable

6. 调试

如果遇到问题,可以使用以下方法进行调试:

  1. 检查服务器日志

    • 查看服务器日志,确认是否生成了 Mcp-Session-Id 并返回给客户端。
    • 确认 StreamingResponse 是否正确返回了流式数据。
  2. 检查浏览器开发者工具

    • 打开浏览器的开发者工具(F12),查看网络请求的响应头,确认是否包含 Mcp-Session-Id
    • 查看 EventSource 的网络请求,确认是否正确接收到了流式数据。
  3. 检查跨域问题

    • 确保服务器正确配置了 CORS,允许前端页面的域名和端口。
    • 确保 expose_headers 配置项正确,允许客户端访问自定义头 Mcp-Session-Id

通过以上步骤,你可以实现一个支持 Streamable HTTP 的 MCP 服务器,并通过 Python 客户端和前端页面与服务器进行交互。希望这篇文章对你有所帮助