> 技术文档 > MCP的SSE的底层通信原理(给出了不使用SDK实现mcp服务器的示例)_mcp sse message

MCP的SSE的底层通信原理(给出了不使用SDK实现mcp服务器的示例)_mcp sse message


MCP通信原理

模型上下文协议( MCP)定义了两种标准的通信传输协议:STDIOSSE,用于连接大型语言模型(LLM)与外部工具或数据源。

特性 标准输入/输出(Stdio) HTTP + 服务器发送事件(SSE通信方式 通过标准输入(stdin)和标准输出(stdout)进行通信 客户端通过 HTTP POST 发送请求,服务器使用 SSE 推送响应 典型场景 本地开发、插件集成、命令行工具等 分布式部署、远程服务调用、需要实时数据更新的应用等 优势 无需网络连接,通信延迟低;部署简单;数据在本地传输,有助于保障数据隐私 支持跨网络通信;服务器可主动推送数据;兼容现有的 HTTP 基础设施,易于集成和扩展 限制 仅适用于同一台机器上的进程间通信,无法支持分布式部署 需要网络连接,可能涉及更复杂的安全配置和网络管理 认证机制 不适用 支持 JWT 和 API 密钥等认证机制 可扩展性 适用于单一进程通信 支持多客户端连接,适合分布式系统 配置复杂度 配置简单,适合快速开发和调试 配置灵活,可自定义端口、端点、认证等参数 适用场景 构建命令行工具、实现本地集成、需要简单的进程通信、使用 shell 脚本 构建 Web 应用程序、需要网络通信、需要认证、支持多个客户端、需要水平扩展

SSE介绍

Server-Sent Events(SSE)是一种基于 HTTP 协议的服务器推送技术,允许服务器通过单向的方式向客户端发送实时更新的数据流。SSEWebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息

总体来说,WebSocket 更强大和灵活。因为它是全双工通道,可以双向通信;SSE单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。

stdio中可以使用stdin来进行输入,使用stdout来进行输出。

但是SSE是单向通道,MCP要如何实现双向通信呢?是建立两根SSE通道吗?带着这个疑问,我们来进行动手实践。

MCP的SSE通信流程

我们可以构建一个简单的MCP Server,然后利用MCP官方提供的工具npx @modelcontextprotocol/inspector 可以比较方便地拉起一个验证MCP的管理页。针对这个管理页抓包就能发现一些SSE的通信端倪。

初始化项目

uv init mcp-server-democd mcp-server-demouv add \"mcp[cli]\"

构建一个简单的MCP server

# server.pyfrom mcp.server.fastmcp import FastMCP# Create an MCP servermcp = FastMCP(\"Demo\", port=3001)# Add an addition tool@mcp.tool()def add(a: int, b: int) -> int: \"\"\"Add two numbers\"\"\" return a + b# Add a dynamic greeting resource@mcp.resource(\"greeting://{name}\")def get_greeting(name: str) -> str: \"\"\"Get a personalized greeting\"\"\" return f\"Hello, {name}!\"if __name__ == \"__main__\": mcp.run(transport=\"sse\")

启动服务器以及启动mcp inspector

python server.pynpx @modelcontextprotocol/inspector

打开http://127.0.0.1:6274进入管理页,打开开发者模式,针对这个管理页抓包就能发现一些SSE的通信端倪。

  1. 当连接到mcp server时,会建立一条SSE长连接,它只负责推送消息
    在这里插入图片描述

    可以看到Client连接上/sse这个地址的第一个Event就是告诉Client发送信息需要去哪个URL,这个URL通常会带上唯一的会话ID

  2. 当我调用工具时计算2+7时,client会发送一个post请求,结果会通过一开始的SSE长连接返回

    在这里插入图片描述
    在这里插入图片描述

总结双向通信的实现方式为

  1. 只有一根SSE长连接,用来ServerClient推送数据,另外一个ClientServer发送请求的通道是使用普通的HTTP POST请求。

  2. ClientServer发送的HTTP POST请求中只使用2xx反馈是否收到指令,所有的数据返回是通过一开始的SSE长连接来推送。

简单实现MCP,致敬这个优秀的设计

from fastapi import FastAPI, Requestfrom sse_starlette.sse import EventSourceResponsefrom pydantic import BaseModelfrom typing import Optional, Callable, Awaitable, Any, List, Dictimport asyncioimport uuidimport jsonimport inspectimport uvicornapp = FastAPI()mcpHub: Dict[str, \"MCPServer\"] = {}# 请求模型class McpRequest(BaseModel): id: Optional[int] = None jsonrpc: str method: str params: Optional[dict] = None# MCP 服务核心类class MCPServer: def __init__( self, name: str, message_path: str, tools: List[Callable[..., Awaitable[Any]]] ): \"\"\" 初始化函数,用于设置实例变量和启动必要的异步任务。 参数: - name (str): 服务器名称。 - message_path (str): 消息路径,用于指定消息的路由。 - tools (List[Callable[..., Awaitable[Any]]]): 工具函数列表,这些函数是异步的,并可接受不定数量的参数。 \"\"\" # 初始化一个异步队列,用于处理消息 self.queue: asyncio.Queue = asyncio.Queue() # 生成一个唯一的客户端ID self.client_id: str = str(uuid.uuid4()) # 存储消息路径 self.message_path: str = message_path # 存储提供的工具函数列表 self.tools = tools # 初始化信息字典,包含协议版本、功能和服务器信息 self.info = { \"protocolVersion\": \"2024-11-05\", \"capabilities\": {\"experimental\": {}, \"tools\": {\"listChanged\": False}}, \"serverInfo\": {\"name\": name, \"version\": \"1.6.0\"}, } def list_tool(self) -> List[Dict[str, Any]]: \"\"\" 生成工具列表的函数。 此函数遍历self.tools中的所有工具,并为每个工具生成一个包含名称、描述和输入模式的字典。 这些字典最终被收集到一个列表中并返回。该函数主要用于整理和提供工具的相关信息,以便于后续的处理或展示。 Returns: List[Dict[str, Any]]: 包含所有工具信息的列表,每个工具的信息包括名称、描述和输入模式。 \"\"\" # 初始化一个空列表,用于存储所有工具的信息 tool_list = [] # 遍历self.tools中的每个工具 for tool in self.tools: # 使用inspect模块获取工具的签名信息,以便后续获取参数信息 sig = inspect.signature(tool) # 将工具的名称、描述和输入模式添加到tool_list中 tool_list.append( {  \"name\": tool.__name__, # 工具的名称  \"description\": tool.__doc__, # 工具的描述信息  \"inputSchema\": { \"type\": \"object\", \"properties\": { # 根据工具的签名信息,生成输入模式的属性 name: {\"title\": name, \"type\": \"string\"} for name in sig.parameters },  }, } ) # 返回包含所有工具信息的列表 return tool_list # 异步生成器函数:reader # 目的:持续从队列中读取事件并返回 async def reader(self): # 循环无限期地尝试从队列中获取下一个事件 while True: # 异步地从队列中获取事件 event = await self.queue.get() # 返回事件 yield event @staticmethod def response(result: Any, id: Optional[int]) -> str: \"\"\" 生成一个JSON-RPC格式的响应字符串。 参数: - result: Any 类型,代表JSON-RPC请求的结果,可以是任意类型。 - id: Optional[int] 类型,代表JSON-RPC请求的ID,可能为None,用于标识请求。 返回值: - str 类型,代表JSON-RPC格式的响应字符串,包括结果和可选的请求ID。 \"\"\" # 初始化JSON-RPC响应消息的基本结构,包含结果。 message = {\"jsonrpc\": \"2.0\", \"result\": result} # 如果请求ID不为空,则将其添加到响应消息中。 if id is not None: message[\"id\"] = id # 将响应消息序列化为JSON字符串并返回。 return json.dumps(message) async def handle_request(self, req: McpRequest): \"\"\" 处理请求函数,根据不同的请求方法执行相应的逻辑。 参数: - req (McpRequest): 请求对象,包含请求的方法、参数等信息。 此函数根据请求方法的不同,执行初始化、工具列表查询或工具调用等操作,并将结果通过队列返回。 \"\"\" if req.method == \"initialize\": # 当请求方法为initialize时,将包含info信息的响应放入队列 await self.queue.put( {\"event\": \"message\", \"data\": self.response(self.info, req.id)} ) elif req.method == \"tools/list\": # 当请求方法为tools/list时,列出所有工具信息并放入队列 tools_info = self.list_tool() await self.queue.put( {  \"event\": \"message\",  \"data\": self.response({\"tools\": tools_info}, req.id), } ) elif req.method == \"tools/call\": # 当请求方法为tools/call时,根据名称调用工具,并将结果放入队列 tool_name = req.params.get(\"name\") args = req.params.get(\"arguments\", {}) for tool in self.tools: if tool.__name__ == tool_name:  try: # 尝试调用工具并处理结果 result = await tool(**args) await self.queue.put( { \"event\": \"message\", \"data\": self.response(  {\"content\": result, \"isError\": False}, req.id ), } )  except Exception as e: # 如果工具调用出错,将错误信息放入队列 await self.queue.put( { \"event\": \"message\", \"data\": self.response(  {\"content\": str(e), \"isError\": True}, req.id ), } )  break# 工具函数async def test(state: Optional[str] = None) -> str: \"\"\"Returns a simple greeting message\"\"\" await asyncio.sleep(1) return f\"hi {state}!\"# SSE 接收端:创建 MCPServer 并建立连接@app.get(\"/sse\")async def receive_test(): \"\"\" 创建并初始化一个MCPServer实例,将其添加到mcpHub中,并向其队列放入事件信息。 该函数主要完成以下任务: 1. 实例化MCPServer对象,指定名称、消息路径和工具函数。 2. 将新创建的MCPServer实例根据其client_id存储在mcpHub中。 3. 向MCPServer的队列放入包含endpoint信息的数据。 4. 返回一个EventSourceResponse对象,用于SSE连接的响应。 \"\"\" # 实例化MCPServer对象,参数包括名称、消息路径和工具函数列表 mcp = MCPServer(name=\"mcp-test\", message_path=\"/message\", tools=[test]) # 将MCPServer实例存储在mcpHub中,键为client_id mcpHub[mcp.client_id] = mcp # 向MCPServer的队列放入事件信息,包含endpoint的路径和client_id await mcp.queue.put( {\"event\": \"endpoint\", \"data\": f\"{mcp.message_path}?client_id={mcp.client_id}\"} ) # 返回EventSourceResponse对象,用于SSE连接的响应 return EventSourceResponse(mcp.reader())# SSE 发送端:接收 JSON-RPC 请求@app.post(\"/message\")async def send_test(request: Request, payload: McpRequest): \"\"\" 处理来自客户端的JSON-RPC请求。 该函数首先从请求的查询参数中获取client_id,检查其是否存在且有效。 如果client_id无效或不在mcpHub中,则返回错误信息。 否则,将payload转发给对应的mcpHub客户端处理。 参数: - request: Request对象,包含请求的相关信息。 - payload: McpRequest对象,承载着JSON-RPC请求的数据。 返回: - 如果client_id无效或不在mcpHub中,返回包含错误信息的字典。 - 如果请求被成功处理,返回包含状态信息的字典。 \"\"\" # 从请求的查询参数中获取client_id client_id = request.query_params.get(\"client_id\") # 检查client_id是否存在且有效 if not client_id or client_id not in mcpHub: # 如果client_id无效或不在mcpHub中,返回错误信息 return {\"error\": \"Invalid client_id\"} # 转发payload给对应的mcpHub客户端处理 await mcpHub[client_id].handle_request(payload) # 请求成功处理,返回状态信息 return {\"status\": \"ok\"}# 本地运行入口if __name__ == \"__main__\": uvicorn.run(app, host=\"0.0.0.0\", port=8001)

启动
在这里插入图片描述

我这里使用APIFOX来发送请求
在这里插入图片描述

把这个client_id复制下来去请求message接口
在这里插入图片描述
在这里插入图片描述

sse长连接成功返回信息

从我们的简单实现中可以看到,我们完全可以不依赖 /sse /message 这些默认路由地址,MCPURL可以完全自定义。

参考链接:https://mp.weixin.qq.com/s/UM6PwoBGhRGvJbvUYggObw