采用LangGraph集成多个MCP服务器的应用
现代人工智能应用程序通常需要对不同的语言模型和专门的服务器进行复杂的编排,每个服务器在更大的工作流中处理特定的任务。然而,这种分布式方法引入了一个关键的挑战: 保持上下文的连续性。
当会话或任务在不同的模型或服务器之间转换时,上下文信息很容易丢失。用户体验到的是,人工智能系统似乎 “忘记” 了对话的早期部分,导致体验支离破碎,效用降低。
1. MCP 的核心
模型上下文协议(Model Context Protocol, MCP)为在分布式人工智能系统中各组件之间一致地保存和高效传输丰富的上下文信息,提供了一个标准化的框架。MCP 的核心设计围绕以下三大要素展开:
-
它定义了一种统一的上下文容器格式,用于封装包括会话历史记录、用户偏好设置以及任务相关元数据在内的多种上下文信息。这种结构化的容器使得上下文具备良好的可读性和可操作性,能够准确反映交互过程中的状态与意图。
-
MCP 提供了一套规范的序列化标准,确保上下文能够在不同的系统、服务或 Agent 之间高效、可靠地传输。该标准不仅支持多种数据格式的转换与解析,还兼顾了性能与兼容性,适用于从本地调用到跨网络通信的多种场景。
-
MCP 还包含一套完整的上下文管理操作,支持上下文的创建、更新、合并与裁剪等关键操作。这些操作使得系统可以根据实际需求动态调整上下文内容,在保障信息完整性的同时避免冗余,提升处理效率与资源利用率。
MCP 架构的核心由上下文容器组成:
class ContextContainer: \"\"\"Container for model context information.\"\"\" def __init__(self, conversation_id=None): self.conversation_id = conversation_id or str(uuid.uuid4()) self.messages = [] self.metadata = {} self.created_at = datetime.now() self.updated_at = datetime.now() def add_message(self, role, content, metadata=None): \"\"\"Add a message to the context.\"\"\" message = { \"id\": str(uuid.uuid4()), \"role\": role, \"content\": content, \"timestamp\": datetime.now().isoformat(), \"metadata\": metadata or {} } self.messages.append(message) self.updated_at = datetime.now() def to_dict(self): \"\"\"Serialize the container to a dictionary.\"\"\" return { \"conversation_id\": self.conversation_id, \"messages\": self.messages, \"metadata\": self.metadata, \"created_at\": self.created_at.isoformat(), \"updated_at\": self.updated_at.isoformat() } @classmethod def from_dict(cls, data): \"\"\"Create a container from a dictionary.\"\"\" container = cls(data.get(\"conversation_id\")) container.messages = data.get(\"messages\", []) container.metadata = data.get(\"metadata\", {}) container.created_at = datetime.fromisoformat(data.get(\"created_at\")) container.updated_at = datetime.fromisoformat(data.get(\"updated_at\")) return container
2. LangGraph 集成
LangGraph 已逐渐成为构建复杂人工智能工作流的首选框架,尤其适用于需要多步骤推理、状态管理以及多 Agent 协作的场景。通过将模型上下文协议(MCP)与 LangGraph 深度集成,开发者能够构建起结构更复杂、分布更广泛的多服务系统,同时保持上下文在各组件之间的无缝流转与一致性。
以下是 MCP 在 LangGraph 环境中的实现方式:
import osfrom typing import Dict, List, Anyimport requestsfrom langchain_core.language_models import BaseLLMfrom langchain_core.messages import HumanMessage, AIMessagefrom langgraph.graph import END, StateGraphfrom langgraph.prebuilt import ToolNode# MCP Context Managerclass MCPContextManager: def __init__(self, mcp_server_url): self.mcp_server_url = mcp_server_url def create_context(self, initial_data=None): \"\"\"Create a new context container.\"\"\" response = requests.post( f\"{self.mcp_server_url}/contexts\", json=initial_data or {} ) return response.json() def get_context(self, context_id): \"\"\"Retrieve a context container.\"\"\" response = requests.get( f\"{self.mcp_server_url}/contexts/{context_id}\" ) return response.json() def update_context(self, context_id, data): \"\"\"Update a context container.\"\"\" response = requests.patch( f\"{self.mcp_server_url}/contexts/{context_id}\", json=data ) return response.json()# MCP-aware LLM Nodeclass MCPLLMNode: def __init__(self, llm: BaseLLM, context_manager: MCPContextManager): self.llm = llm self.context_manager = context_manager def __call__(self, state: Dict[str, Any]) -> Dict[str, Any]: # Extract the context ID from state context_id = state.get(\"context_id\") if not context_id: # Create new context if none exists context = self.context_manager.create_context() context_id = context[\"conversation_id\"] state[\"context_id\"] = context_id else: # Retrieve existing context context = self.context_manager.get_context(context_id) # Convert MCP messages to LangChain messages messages = [] for msg in context.get(\"messages\", []): if msg[\"role\"] == \"user\": messages.append(HumanMessage(content=msg[\"content\"])) elif msg[\"role\"] == \"assistant\": messages.append(AIMessage(content=msg[\"content\"])) # Add the current message if present if \"current_input\" in state: messages.append(HumanMessage(content=state[\"current_input\"])) # Update MCP context with the user message self.context_manager.update_context( context_id, {\"add_message\": { \"role\": \"user\", \"content\": state[\"current_input\"] }} ) # Generate response response = self.llm.generate_response(messages) # Update MCP context with the assistant response self.context_manager.update_context( context_id, {\"add_message\": { \"role\": \"assistant\", \"content\": response.content }} ) # Update state state[\"current_output\"] = response.content return state# Building a multi-server LangGraph with MCPdef build_mcp_graph(llm_servers: List[Dict], mcp_server_url: str): \"\"\" Build a LangGraph using multiple LLM servers with MCP integration. llm_servers: List of server configurations with name, url, and routing_criteria mcp_server_url: URL of the MCP server \"\"\" context_manager = MCPContextManager(mcp_server_url) # Create LLM nodes for each server llm_nodes = {} for server in llm_servers: llm = RemoteLLM(server[\"url\"]) llm_nodes[server[\"name\"]] = MCPLLMNode(llm, context_manager) # Router function to determine which LLM to use def router(state: Dict[str, Any]) -> str: input_text = state.get(\"current_input\", \"\") for server in llm_servers: criteria = server.get(\"routing_criteria\", \"\") if criteria in input_text: return server[\"name\"] # Default to the first server return llm_servers[0][\"name\"] # Build the graph workflow = StateGraph() # Add nodes workflow.add_node(\"router\", router) for name, node in llm_nodes.items(): workflow.add_node(name, node) # Add edges workflow.add_edge(\"router\", dynamic=True) for name in llm_nodes.keys(): workflow.add_edge(name, END) # Set the entry point workflow.set_entry_point(\"router\") return workflow.compile()
这段代码实现了一个基于 LangGraph 和 MCP(Model Context Protocol,模型上下文协议)的多服务器 AI 工作流系统。它展示了如何将 MCP 与 LangGraph 集成,从而在多个 LLM(语言模型)服务之间协调任务,并保持统一的会话上下文状态。
3. 创建多服务器部署
现在,让我们看看如何在现实世界中部署这个架构:
from fastapi import FastAPI, HTTPExceptionfrom pydantic import BaseModelimport uvicornapp = FastAPI()# Configuration for our LLM serversllm_servers = [ { \"name\": \"general_llm\", \"url\": \"http://general-llm-server:8000/generate\", \"routing_criteria\": \"\" # Default server }, { \"name\": \"code_llm\", \"url\": \"http://code-llm-server:8001/generate\", \"routing_criteria\": \"code\" }, { \"name\": \"creative_llm\", \"url\": \"http://creative-llm-server:8002/generate\", \"routing_criteria\": \"write\" }]# MCP server URLmcp_server_url = \"http://mcp-server:9000\"# Build our graphgraph = build_mcp_graph(llm_servers, mcp_server_url)class ChatRequest(BaseModel): message: str context_id: str = Noneclass ChatResponse(BaseModel): response: str context_id: str@app.post(\"/chat\", response_model=ChatResponse)async def chat(request: ChatRequest): # Initialize state state = { \"current_input\": request.message } # Add context ID if provided if request.context_id: state[\"context_id\"] = request.context_id # Process through the graph result = graph.invoke(state) return ChatResponse( response=result[\"current_output\"], context_id=result[\"context_id\"] )if __name__ == \"__main__\": uvicorn.run(app, host=\"0.0.0.0\", port=8080)
4. MCP 服务器的实现
这种架构的一个关键组件是管理上下文容器的专用 MCP 服务器:
from fastapi import FastAPI, HTTPExceptionfrom pydantic import BaseModelimport uvicornfrom typing import Dict, List, Optional, Anyimport uuidfrom datetime import datetimeimport jsonimport redisapp = FastAPI()# Use Redis for persistenceredis_client = redis.Redis(host=\"redis\", port=6379, db=0)class Message(BaseModel): role: str content: str metadata: Optional[Dict[str, Any]] = {}class CreateContextRequest(BaseModel): metadata: Optional[Dict[str, Any]] = {} messages: Optional[List[Dict[str, Any]]] = []class UpdateContextRequest(BaseModel): metadata: Optional[Dict[str, Any]] = None add_message: Optional[Dict[str, Any]] = None replace_messages: Optional[List[Dict[str, Any]]] = None@app.post(\"/contexts\")async def create_context(request: CreateContextRequest): context_id = str(uuid.uuid4()) now = datetime.now().isoformat() context = { \"conversation_id\": context_id, \"messages\": request.messages, \"metadata\": request.metadata, \"created_at\": now, \"updated_at\": now } redis_client.set(f\"context:{context_id}\", json.dumps(context)) return context@app.get(\"/contexts/{context_id}\")async def get_context(context_id: str): context_data = redis_client.get(f\"context:{context_id}\") if not context_data: raise HTTPException(status_code=404, detail=\"Context not found\") return json.loads(context_data)@app.patch(\"/contexts/{context_id}\")async def update_context(context_id: str, request: UpdateContextRequest): context_data = redis_client.get(f\"context:{context_id}\") if not context_data: raise HTTPException(status_code=404, detail=\"Context not found\") context = json.loads(context_data) context[\"updated_at\"] = datetime.now().isoformat() # Update metadata if provided if request.metadata is not None: context[\"metadata\"].update(request.metadata) # Add a message if provided if request.add_message is not None: message = { \"id\": str(uuid.uuid4()), \"role\": request.add_message[\"role\"], \"content\": request.add_message[\"content\"], \"timestamp\": datetime.now().isoformat(), \"metadata\": request.add_message.get(\"metadata\", {}) } context[\"messages\"].append(message) # Replace messages if provided if request.replace_messages is not None: context[\"messages\"] = request.replace_messages redis_client.set(f\"context:{context_id}\", json.dumps(context)) return contextif __name__ == \"__main__\": uvicorn.run(app, host=\"0.0.0.0\", port=9000)
5. 为部署撰写 Docker
为了把一切联系起来,可以使用 Docker Compose 来部署我们的多服务器系统:
version: \'3\'services: # Main API Gateway api_gateway: build: ./api_gateway ports: - \"8080:8080\" environment: - MCP_SERVER_URL=http://mcp_server:9000 depends_on: - mcp_server - general_llm - code_llm - creative_llm # MCP Server mcp_server: build: ./mcp_server ports: - \"9000:9000\" depends_on: - redis # Redis for MCP persistence redis: image: redis:alpine ports: - \"6379:6379\" # LLM Servers general_llm: build: ./llm_servers/general ports: - \"8000:8000\" code_llm: build: ./llm_servers/code ports: - \"8001:8000\" creative_llm: build: ./llm_servers/creative ports: - \"8002:8000\"
6. 性能关注
在生产环境中部署模型上下文协议(MCP)时,性能优化是确保系统高效稳定运行的关键。以下是几个需要重点关注的性能考量及应对策略:
上下文大小管理
随着会话持续进行,上下文容器可能会不断增长,导致存储开销和传输延迟增加。为避免这一问题,应实施有效的剪枝策略,例如仅保留最近的几轮对话或通过语义分析提取关键信息,从而保持上下文的精简与相关性。
序列化效率
在高吞吐量的系统中,频繁的上下文序列化与反序列化可能成为性能瓶颈。建议采用更高效的序列化格式,如 Protocol Buffers、MessagePack 或 FlatBuffers,以替代传统的 JSON,从而减少数据体积并加快编解码速度,提升整体处理效率。
缓存策略
为了降低对后端 MCP 服务的访问频率,减少重复查询带来的延迟,应在多个层级(如客户端、网关或服务端)引入缓存机制。可缓存当前活跃的上下文片段,仅在必要时同步更新或拉取最新状态,从而显著提升响应速度并减轻系统负载。
通过合理设计这些性能优化策略,可以有效保障 MCP 在大规模、高并发 AI 应用场景下的稳定性与可扩展性,使其更好地服务于复杂的智能工作流需求。
class CachedMCPContextManager(MCPContextManager): def __init__(self, mcp_server_url, cache_ttl=300): super().__init__(mcp_server_url) self.cache = {} self.cache_ttl = cache_ttl self.last_access = {} def get_context(self, context_id): current_time = time.time() # Check cache if context_id in self.cache: self.last_access[context_id] = current_time return self.cache[context_id] # Retrieve from server context = super().get_context(context_id) # Update cache self.cache[context_id] = context self.last_access[context_id] = current_time # Prune cache if needed self._prune_cache() return context def _prune_cache(self): current_time = time.time() expired_keys = [ k for k, v in self.last_access.items() if current_time - v > self.cache_ttl ] for key in expired_keys: if key in self.cache: del self.cache[key] if key in self.last_access: del self.last_access[key]
7. 安全性考量
在实施模型上下文协议(MCP)的过程中,安全性是一个不可忽视的关键环节。由于上下文容器可能包含用户的敏感信息,如对话历史、个人偏好或业务数据,因此必须采取全面的安全措施来保障系统的可靠性和用户隐私。
在数据保护方面,应确保上下文数据在存储和传输过程中始终处于加密状态。同时,建立严格的访问控制机制,仅允许经过身份验证和授权的实体读取或修改上下文内容。此外,还需制定合理的数据保留策略,明确上下文信息的存储时限,并在任务完成后及时清理或归档,以降低数据泄露的风险。
身份验证与授权是保障系统安全的核心环节。所有试图访问或更新上下文的请求都必须通过可信的身份认证流程,例如使用 API 密钥、OAuth 令牌或基于角色的访问控制(RBAC)。这可以有效防止未经授权的系统或用户篡改上下文内容,从而保障整个 MCP 系统的数据完整性和操作合法性。
为防范潜在的攻击行为,还必须重视输入验证与内容消毒。任何进入上下文的数据都应经过严格校验,避免恶意构造的内容引发注入攻击或其他安全漏洞。建议在数据写入上下文之前进行格式检查、转义处理或使用沙箱机制,从源头上杜绝安全隐患。
通过在数据保护、访问控制和输入过滤等方面构建多层次的安全防护体系,可以有效提升 MCP 在实际应用中的安全性,为其在复杂、高要求的生产环境中的部署提供坚实保障。
class SecureMCPContextManager(MCPContextManager): def __init__(self, mcp_server_url, api_key): super().__init__(mcp_server_url) self.api_key = api_key def _get_headers(self): return { \"Authorization\": f\"Bearer {self.api_key}\", \"Content-Type\": \"application/json\" } def create_context(self, initial_data=None): \"\"\"Create a new context container with authentication.\"\"\" response = requests.post( f\"{self.mcp_server_url}/contexts\", headers=self._get_headers(), json=initial_data or {} ) return response.json() # Similarly update other methods to use authentication
小结
MCP的出现,标志着分布式人工智能系统体系架构取得了重大的突破与进展。它提供了一种标准化的方法,用于管理和传输不同组件之间的上下文信息,从而为创建能够保持上下文连续性的复杂多服务器系统提供了有力支持。
在当今时代,人工智能系统正变得愈发复杂。在这样的背景下,像MCP这样的协议对于打造无缝的用户体验而言,其重要性将日益凸显。开发人员通过实现本文所阐述的模式和代码,便能够构建出健壮的分布式AI系统,进而确保跨服务器和模型的上下文得以有效维护。
MCP的真正优势在于它的简洁性与灵活性。它将上下文管理与单个AI组件相分离,这使得更模块化、易于维护且具备可伸缩性的体系架构成为可能。随着人工智能生态系统的持续发展,我们有理由期待看到更多类似的标准化协议出现。
【关联阅读】
-
什么可能会定义人工智能的下一个十年?
-
大模型应用系列:两万字解读MCP
-
拆解OpenAI最大对手的杀手锏:为什么会是MCP?
-
万字解读:8种常见框架,选择哪一种来开发MCP呢?
-
MCP规范完整中译稿:2025-3-26版
-
智能体间协作的\"巴别塔困境\"如何破解?解读Agent通信4大协议:MCP/ACP/A2A/ANP
-
大模型应用的10种架构模式
-
7B?13B?175B?解读大模型的参数
-
大模型应用系列:从Ranking到Reranking
-
大模型应用系列:Query 变换的示例浅析
-
从零构建大模型之Transformer公式解读
-
如何选择Embedding Model?关于嵌入模型的10个思考
-
解读文本嵌入:语义表达的练习
-
解读知识图谱的自动构建
-
“提示工程”的技术分类
-
大模型系列:提示词管理
-
提示工程中的10个设计模式
-
解读:基于图的大模型提示技术
-
大模型微调:RHLF与DPO浅析
-
Chunking:基于大模型RAG系统中的文档分块
-
大模型应用框架:LangChain与LlamaIndex的对比选择
-
解读大模型应用的可观测性
-
大模型系列之解读MoE
-
在大模型RAG系统中应用知识图谱
-
面向知识图谱的大模型应用
-
让知识图谱成为大模型的伴侣
-
如何构建基于大模型的App
-
Qcon2023: 大模型时代的技术人成长(简)
-
论文学习笔记:增强学习应用于OS调度
-
《深入浅出Embedding》随笔
-
LLM的工程实践思考
-
大模型应用设计的10个思考
-
基于大模型(LLM)的Agent 应用开发
-
解读大模型的微调
-
解读向量数据库
-
解读向量索引
-
解读ChatGPT中的RLHF
-
解读大模型(LLM)的token
-
解读提示词工程(Prompt Engineering)
-
解读Toolformer
-
解读TaskMatrix.AI
-
解读LangChain
-
解读LoRA
-
解读RAG
-
大模型应用框架之Semantic Kernel
-
浅析多模态机器学习
-
大模型应用于数字人
-
深度学习架构的对比分析
-
老码农眼中的大模型(LLM)