MCP的SSE的底层通信原理(给出了不使用SDK实现mcp服务器的示例)_mcp sse message
MCP通信原理
模型上下文协议( MCP
)定义了两种标准的通信传输协议:STDIO
和SSE
,用于连接大型语言模型(LLM
)与外部工具或数据源。
Stdio
)SSE
)stdin
)和标准输出(stdout
)进行通信HTTP POST
发送请求,服务器使用 SSE
推送响应SSE介绍
Server-Sent Events(SSE)
是一种基于 HTTP 协议的服务器推送技术,允许服务器通过单向的方式向客户端发送实时更新的数据流。SSE
与 WebSocket
作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息。
总体来说,WebSocket
更强大和灵活。因为它是全双工通道,可以双向通信;SSE
是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。
stdio
中可以使用stdin
来进行输入,使用stdout
来进行输出。
但是SSE
是单向通道,MCP
要如何实现双向通信呢?是建立两根SSE通道吗?带着这个疑问,我们来进行动手实践。
MCP的SSE通信流程
我们可以构建一个简单的MCP Server
,然后利用MCP官方提供的工具npx @modelcontextprotocol/inspector
可以比较方便地拉起一个验证MCP
的管理页。针对这个管理页抓包就能发现一些SSE
的通信端倪。
初始化项目
uv init mcp-server-democd mcp-server-demouv add \"mcp[cli]\"
构建一个简单的MCP server
# server.pyfrom mcp.server.fastmcp import FastMCP# Create an MCP servermcp = FastMCP(\"Demo\", port=3001)# Add an addition tool@mcp.tool()def add(a: int, b: int) -> int: \"\"\"Add two numbers\"\"\" return a + b# Add a dynamic greeting resource@mcp.resource(\"greeting://{name}\")def get_greeting(name: str) -> str: \"\"\"Get a personalized greeting\"\"\" return f\"Hello, {name}!\"if __name__ == \"__main__\": mcp.run(transport=\"sse\")
启动服务器以及启动mcp inspector
python server.pynpx @modelcontextprotocol/inspector
打开http://127.0.0.1:6274进入管理页,打开开发者模式,针对这个管理页抓包就能发现一些SSE的通信端倪。
-
当连接到
mcp server
时,会建立一条SSE
长连接,它只负责推送消息可以看到
Client
连接上/sse
这个地址的第一个Event
就是告诉Client
发送信息需要去哪个URL
发,这个URL
通常会带上唯一的会话ID。 -
当我调用工具时计算
2+7
时,client
会发送一个post
请求,结果会通过一开始的SSE
长连接返回
总结双向通信的实现方式为
-
只有一根
SSE
长连接,用来Server
向Client
推送数据,另外一个Client
向Server
发送请求的通道是使用普通的HTTP POST
请求。 -
Client
向Server
发送的HTTP POST
请求中只使用2xx
反馈是否收到指令,所有的数据返回是通过一开始的SSE
长连接来推送。
简单实现MCP,致敬这个优秀的设计
from fastapi import FastAPI, Requestfrom sse_starlette.sse import EventSourceResponsefrom pydantic import BaseModelfrom typing import Optional, Callable, Awaitable, Any, List, Dictimport asyncioimport uuidimport jsonimport inspectimport uvicornapp = FastAPI()mcpHub: Dict[str, \"MCPServer\"] = {}# 请求模型class McpRequest(BaseModel): id: Optional[int] = None jsonrpc: str method: str params: Optional[dict] = None# MCP 服务核心类class MCPServer: def __init__( self, name: str, message_path: str, tools: List[Callable[..., Awaitable[Any]]] ): \"\"\" 初始化函数,用于设置实例变量和启动必要的异步任务。 参数: - name (str): 服务器名称。 - message_path (str): 消息路径,用于指定消息的路由。 - tools (List[Callable[..., Awaitable[Any]]]): 工具函数列表,这些函数是异步的,并可接受不定数量的参数。 \"\"\" # 初始化一个异步队列,用于处理消息 self.queue: asyncio.Queue = asyncio.Queue() # 生成一个唯一的客户端ID self.client_id: str = str(uuid.uuid4()) # 存储消息路径 self.message_path: str = message_path # 存储提供的工具函数列表 self.tools = tools # 初始化信息字典,包含协议版本、功能和服务器信息 self.info = { \"protocolVersion\": \"2024-11-05\", \"capabilities\": {\"experimental\": {}, \"tools\": {\"listChanged\": False}}, \"serverInfo\": {\"name\": name, \"version\": \"1.6.0\"}, } def list_tool(self) -> List[Dict[str, Any]]: \"\"\" 生成工具列表的函数。 此函数遍历self.tools中的所有工具,并为每个工具生成一个包含名称、描述和输入模式的字典。 这些字典最终被收集到一个列表中并返回。该函数主要用于整理和提供工具的相关信息,以便于后续的处理或展示。 Returns: List[Dict[str, Any]]: 包含所有工具信息的列表,每个工具的信息包括名称、描述和输入模式。 \"\"\" # 初始化一个空列表,用于存储所有工具的信息 tool_list = [] # 遍历self.tools中的每个工具 for tool in self.tools: # 使用inspect模块获取工具的签名信息,以便后续获取参数信息 sig = inspect.signature(tool) # 将工具的名称、描述和输入模式添加到tool_list中 tool_list.append( { \"name\": tool.__name__, # 工具的名称 \"description\": tool.__doc__, # 工具的描述信息 \"inputSchema\": { \"type\": \"object\", \"properties\": { # 根据工具的签名信息,生成输入模式的属性 name: {\"title\": name, \"type\": \"string\"} for name in sig.parameters }, }, } ) # 返回包含所有工具信息的列表 return tool_list # 异步生成器函数:reader # 目的:持续从队列中读取事件并返回 async def reader(self): # 循环无限期地尝试从队列中获取下一个事件 while True: # 异步地从队列中获取事件 event = await self.queue.get() # 返回事件 yield event @staticmethod def response(result: Any, id: Optional[int]) -> str: \"\"\" 生成一个JSON-RPC格式的响应字符串。 参数: - result: Any 类型,代表JSON-RPC请求的结果,可以是任意类型。 - id: Optional[int] 类型,代表JSON-RPC请求的ID,可能为None,用于标识请求。 返回值: - str 类型,代表JSON-RPC格式的响应字符串,包括结果和可选的请求ID。 \"\"\" # 初始化JSON-RPC响应消息的基本结构,包含结果。 message = {\"jsonrpc\": \"2.0\", \"result\": result} # 如果请求ID不为空,则将其添加到响应消息中。 if id is not None: message[\"id\"] = id # 将响应消息序列化为JSON字符串并返回。 return json.dumps(message) async def handle_request(self, req: McpRequest): \"\"\" 处理请求函数,根据不同的请求方法执行相应的逻辑。 参数: - req (McpRequest): 请求对象,包含请求的方法、参数等信息。 此函数根据请求方法的不同,执行初始化、工具列表查询或工具调用等操作,并将结果通过队列返回。 \"\"\" if req.method == \"initialize\": # 当请求方法为initialize时,将包含info信息的响应放入队列 await self.queue.put( {\"event\": \"message\", \"data\": self.response(self.info, req.id)} ) elif req.method == \"tools/list\": # 当请求方法为tools/list时,列出所有工具信息并放入队列 tools_info = self.list_tool() await self.queue.put( { \"event\": \"message\", \"data\": self.response({\"tools\": tools_info}, req.id), } ) elif req.method == \"tools/call\": # 当请求方法为tools/call时,根据名称调用工具,并将结果放入队列 tool_name = req.params.get(\"name\") args = req.params.get(\"arguments\", {}) for tool in self.tools: if tool.__name__ == tool_name: try: # 尝试调用工具并处理结果 result = await tool(**args) await self.queue.put( { \"event\": \"message\", \"data\": self.response( {\"content\": result, \"isError\": False}, req.id ), } ) except Exception as e: # 如果工具调用出错,将错误信息放入队列 await self.queue.put( { \"event\": \"message\", \"data\": self.response( {\"content\": str(e), \"isError\": True}, req.id ), } ) break# 工具函数async def test(state: Optional[str] = None) -> str: \"\"\"Returns a simple greeting message\"\"\" await asyncio.sleep(1) return f\"hi {state}!\"# SSE 接收端:创建 MCPServer 并建立连接@app.get(\"/sse\")async def receive_test(): \"\"\" 创建并初始化一个MCPServer实例,将其添加到mcpHub中,并向其队列放入事件信息。 该函数主要完成以下任务: 1. 实例化MCPServer对象,指定名称、消息路径和工具函数。 2. 将新创建的MCPServer实例根据其client_id存储在mcpHub中。 3. 向MCPServer的队列放入包含endpoint信息的数据。 4. 返回一个EventSourceResponse对象,用于SSE连接的响应。 \"\"\" # 实例化MCPServer对象,参数包括名称、消息路径和工具函数列表 mcp = MCPServer(name=\"mcp-test\", message_path=\"/message\", tools=[test]) # 将MCPServer实例存储在mcpHub中,键为client_id mcpHub[mcp.client_id] = mcp # 向MCPServer的队列放入事件信息,包含endpoint的路径和client_id await mcp.queue.put( {\"event\": \"endpoint\", \"data\": f\"{mcp.message_path}?client_id={mcp.client_id}\"} ) # 返回EventSourceResponse对象,用于SSE连接的响应 return EventSourceResponse(mcp.reader())# SSE 发送端:接收 JSON-RPC 请求@app.post(\"/message\")async def send_test(request: Request, payload: McpRequest): \"\"\" 处理来自客户端的JSON-RPC请求。 该函数首先从请求的查询参数中获取client_id,检查其是否存在且有效。 如果client_id无效或不在mcpHub中,则返回错误信息。 否则,将payload转发给对应的mcpHub客户端处理。 参数: - request: Request对象,包含请求的相关信息。 - payload: McpRequest对象,承载着JSON-RPC请求的数据。 返回: - 如果client_id无效或不在mcpHub中,返回包含错误信息的字典。 - 如果请求被成功处理,返回包含状态信息的字典。 \"\"\" # 从请求的查询参数中获取client_id client_id = request.query_params.get(\"client_id\") # 检查client_id是否存在且有效 if not client_id or client_id not in mcpHub: # 如果client_id无效或不在mcpHub中,返回错误信息 return {\"error\": \"Invalid client_id\"} # 转发payload给对应的mcpHub客户端处理 await mcpHub[client_id].handle_request(payload) # 请求成功处理,返回状态信息 return {\"status\": \"ok\"}# 本地运行入口if __name__ == \"__main__\": uvicorn.run(app, host=\"0.0.0.0\", port=8001)
启动
我这里使用APIFOX
来发送请求
把这个client_id
复制下来去请求message
接口
sse
长连接成功返回信息
从我们的简单实现中可以看到,我们完全可以不依赖 /sse /message
这些默认路由地址,MCP
的URL
可以完全自定义。
参考链接:https://mp.weixin.qq.com/s/UM6PwoBGhRGvJbvUYggObw