
Model Context Protocol (MCP): The Universal Connector for the AI Era - A Source-Level Look at 15 Open-Source MCP Projects



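Introduction

Large language models (LLMs) are growing more capable at a rapid pace, yet they often exist as isolated islands that struggle to interact with external systems. To address this, Anthropic introduced the Model Context Protocol (MCP), an open standard documented at modelcontextprotocol.io.

MCP marks a shift from bespoke API integrations to a shared standard. Championed by Anthropic and supported by OpenAI, it offers a USB-C-like standard interface for connecting models to an ever-growing range of data sources.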

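What Is MCP

The Model Context Protocol is an emerging open protocol that standardizes how applications provide context and tools to large language models. Put simply, MCP acts as a universal connector for the AI world: different applications and services can interact with AI models through one standardized interface.

MCP is more than an incremental step in API technology; it changes how AI systems understand and interact with the digital world.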

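MCP Core Architecture

MCP uses a client-server architecture in which one host application can connect to multiple servers. The system has the following core components:

  1. MCP Hosts: applications such as Claude Desktop, Cursor, and Windsurf, or any AI tool that wants to access data through MCP
  2. MCP Clients: protocol clients that each maintain a one-to-one connection with an MCP server and act as the communication bridge
  3. MCP Servers: lightweight programs that each expose specific capabilities through the standardized protocol
  4. Local Data Sources: files, databases, and services on your computer that MCP servers can access securely
  5. Remote Services: external APIs and cloud services that MCP servers can connect to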
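
The relationship between hosts, clients, and servers can be sketched in a few lines. This is a minimal illustration, not part of any MCP SDK: the `Host`, `Client`, and `Server` classes and their fields are hypothetical names chosen here to show that a host owns one client per connected server.

```python
from dataclasses import dataclass, field


@dataclass
class Server:
    """Hypothetical stand-in for an MCP server: a name plus the tools it exposes."""
    name: str
    tools: list[str]


@dataclass
class Client:
    """Holds exactly one server connection (the one-to-one rule)."""
    server: Server


@dataclass
class Host:
    """An application that owns one Client per connected Server."""
    clients: dict[str, Client] = field(default_factory=dict)

    def connect(self, server: Server) -> Client:
        if server.name in self.clients:
            raise ValueError(f"already connected to {server.name}")
        client = Client(server)
        self.clients[server.name] = client
        return client

    def all_tools(self) -> dict[str, list[str]]:
        """Tools visible to the host, grouped by the server that provides them."""
        return {name: c.server.tools for name, c in self.clients.items()}


host = Host()
host.connect(Server("files", ["read_file"]))
host.connect(Server("github", ["create_issue", "list_repositories"]))
print(host.all_tools())
```

Keeping one client per server means a failure or reconnect on one connection does not disturb the others.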

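Why MCP Matters

1. A unified, standardized interface

Traditional AI application development requires a separate integration for every external service, which adds both development complexity and maintenance cost. MCP provides a single standard, much like a USB port: any service that supports MCP can plug into an AI system.

2. Improved security

Because interactions between AI systems and external services go through one standardized protocol, security measures can be applied consistently. A service provider implements one set of security requirements and can rely on it for every AI application that supports MCP.

3. Better interoperability

Different AI applications can share the same services and tools through MCP, which greatly improves interoperability across the AI ecosystem.

4. Higher development efficiency

Developers no longer need to build a separate integration for each AI application: one MCP server can serve every compatible AI client.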
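
The efficiency argument comes down to simple counting. As a rough sketch (the function names and the example figures are illustrative assumptions, not measurements): with bespoke integrations every application needs its own connector to every service, while with a shared protocol each application implements a client once and each service implements a server once.

```python
def bespoke_integrations(apps: int, services: int) -> int:
    """Every application integrates separately with every service."""
    return apps * services


def mcp_integrations(apps: int, services: int) -> int:
    """Each application ships one client, each service ships one server."""
    return apps + services


apps, services = 5, 8
print(bespoke_integrations(apps, services), mcp_integrations(apps, services))
```

The gap widens as either side grows, which is why a shared protocol pays off most in a large ecosystem.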

MCP System Components in Detail

MCP Client Architecture

    import itertools


    class MCPClient:
        """Illustrative MCP client. The transport itself is left unimplemented."""

        def __init__(self, server_url, auth_token=None):
            self.server_url = server_url
            self.auth_token = auth_token
            self.connection = None
            self.capabilities = {}
            self._request_ids = itertools.count(1)  # JSON-RPC request ids

        async def connect(self):
            """Establish a connection to the MCP server."""
            try:
                self.connection = await self._establish_connection()
                await self._perform_handshake()
                self.capabilities = await self._get_server_capabilities()
                return True
            except Exception as e:
                print(f"Connection failed: {e}")
                return False

        async def _establish_connection(self):
            """Establish the underlying network connection."""
            # WebSocket or HTTP connection implementation goes here
            raise NotImplementedError("transport not implemented")

        async def _perform_handshake(self):
            """Perform the protocol handshake."""
            handshake_message = {
                "jsonrpc": "2.0",
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {
                        "roots": {"listChanged": True},
                        "sampling": {},
                    },
                    "clientInfo": {"name": "MCP Client", "version": "1.0.0"},
                },
            }
            response = await self._send_message(handshake_message)
            return response

        async def _get_server_capabilities(self):
            """Fetch what the server offers (here: its tool list)."""
            request = {"jsonrpc": "2.0", "method": "tools/list", "params": {}}
            response = await self._send_message(request)
            return response.get("result", {})

        async def call_tool(self, tool_name, arguments):
            """Call a tool."""
            request = {
                "jsonrpc": "2.0",
                "method": "tools/call",
                "params": {"name": tool_name, "arguments": arguments},
            }
            return await self._send_message(request)

        async def get_resources(self):
            """List the available resources."""
            request = {"jsonrpc": "2.0", "method": "resources/list", "params": {}}
            return await self._send_message(request)

        async def read_resource(self, uri):
            """Read a specific resource."""
            request = {
                "jsonrpc": "2.0",
                "method": "resources/read",
                "params": {"uri": uri},
            }
            return await self._send_message(request)

        async def _send_message(self, message):
            """Send a message to the server and wait for the response."""
            if not self.connection:
                raise Exception("Not connected")
            # A JSON-RPC message without an id is a notification and receives
            # no response, so every request is given a unique id here.
            message.setdefault("id", next(self._request_ids))
            # Attach the auth token (carried in the message body in this sketch)
            if self.auth_token:
                message["auth"] = self.auth_token
            # Send the message and wait for the response
            response = await self.connection.send_and_wait(message)
            return response
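
JSON-RPC 2.0, which MCP builds on, distinguishes requests (they carry an `id` and receive a response) from notifications (no `id`, no response). The sketch below shows that framing under stated assumptions: `make_request`, `make_notification`, and `unwrap` are hypothetical helper names introduced here, not part of any MCP SDK.

```python
import itertools

_ids = itertools.count(1)


def make_request(method, params=None):
    """Build a JSON-RPC request; the id lets the reply be matched later."""
    return {"jsonrpc": "2.0", "id": next(_ids), "method": method,
            "params": params or {}}


def make_notification(method, params=None):
    """Build a JSON-RPC notification: no id, so no reply is expected."""
    message = {"jsonrpc": "2.0", "method": method}
    if params:
        message["params"] = params
    return message


def unwrap(request, response):
    """Return the result of a response, or raise if it does not match or failed."""
    if response.get("id") != request["id"]:
        raise ValueError("response id does not match request id")
    if "error" in response:
        error = response["error"]
        raise RuntimeError(f"{error['code']}: {error['message']}")
    return response["result"]


request = make_request("tools/list")
reply = {"jsonrpc": "2.0", "id": request["id"], "result": {"tools": []}}
print(unwrap(request, reply))
```

In the MCP lifecycle, for example, the client follows a successful `initialize` exchange with a `notifications/initialized` notification, which is built with `make_notification` rather than `make_request`.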

MCP Server Architecture

    class MCPServer:
        def __init__(self, name, version="1.0.0"):
            self.name = name
            self.version = version
            self.tools = {}
            self.resources = {}
            self.prompts = {}
            self.clients = set()

        def register_tool(self, name, description, parameters, handler):
            """Register a tool."""
            self.tools[name] = {
                "name": name,
                "description": description,
                "inputSchema": {
                    "type": "object",
                    "properties": parameters,
                    # Parameters that declare a default are optional
                    "required": [
                        key for key, schema in parameters.items()
                        if "default" not in schema
                    ],
                },
                "handler": handler,
            }

        def register_resource(self, uri, name, description, handler):
            """Register a resource."""
            self.resources[uri] = {
                "uri": uri,
                "name": name,
                "description": description,
                "mimeType": "text/plain",
                "handler": handler,
            }

        def register_prompt(self, name, description, arguments, handler):
            """Register a prompt template."""
            self.prompts[name] = {
                "name": name,
                "description": description,
                "arguments": arguments,
                "handler": handler,
            }

        async def handle_initialize(self, params):
            """Handle the initialize request."""
            client_info = params.get("clientInfo", {})
            print(f"Client connected: {client_info.get('name', 'Unknown')}")
            return {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {"listChanged": True},
                    "resources": {"subscribe": True, "listChanged": True},
                    "prompts": {"listChanged": True},
                },
                "serverInfo": {"name": self.name, "version": self.version},
            }

        async def handle_tools_list(self, params):
            """Handle the tools/list request."""
            return {
                "tools": [
                    {
                        "name": tool["name"],
                        "description": tool["description"],
                        "inputSchema": tool["inputSchema"],
                    }
                    for tool in self.tools.values()
                ]
            }

        async def handle_tools_call(self, params):
            """Handle the tools/call request."""
            tool_name = params["name"]
            arguments = params.get("arguments", {})
            if tool_name not in self.tools:
                raise Exception(f"Tool not found: {tool_name}")
            handler = self.tools[tool_name]["handler"]
            try:
                result = await handler(arguments)
                return {"content": [{"type": "text", "text": str(result)}]}
            except Exception as e:
                # Tool failures are reported inside the result, flagged by isError
                return {
                    "content": [{"type": "text", "text": f"Error: {str(e)}"}],
                    "isError": True,
                }

        async def handle_resources_list(self, params):
            """Handle the resources/list request."""
            return {
                "resources": [
                    {
                        "uri": resource["uri"],
                        "name": resource["name"],
                        "description": resource["description"],
                        "mimeType": resource["mimeType"],
                    }
                    for resource in self.resources.values()
                ]
            }

        async def handle_resources_read(self, params):
            """Handle the resources/read request."""
            uri = params["uri"]
            if uri not in self.resources:
                raise Exception(f"Resource not found: {uri}")
            resource = self.resources[uri]
            handler = resource["handler"]
            try:
                content = await handler(uri)
                return {
                    "contents": [
                        {"uri": uri, "mimeType": resource["mimeType"], "text": content}
                    ]
                }
            except Exception as e:
                raise Exception(f"Failed to read resource: {str(e)}")

        async def handle_prompts_list(self, params):
            """Handle the prompts/list request."""
            return {
                "prompts": [
                    {
                        "name": prompt["name"],
                        "description": prompt["description"],
                        "arguments": prompt["arguments"],
                    }
                    for prompt in self.prompts.values()
                ]
            }

        async def handle_prompts_get(self, params):
            """Handle the prompts/get request."""
            prompt_name = params["name"]
            arguments = params.get("arguments", {})
            if prompt_name not in self.prompts:
                raise Exception(f"Prompt not found: {prompt_name}")
            prompt = self.prompts[prompt_name]
            handler = prompt["handler"]
            try:
                messages = await handler(arguments)
                return {"description": prompt["description"], "messages": messages}
            except Exception as e:
                raise Exception(f"Failed to generate prompt: {str(e)}")

        async def process_message(self, message):
            """Dispatch an incoming message to its handler."""
            method = message.get("method")
            params = message.get("params", {})
            handler_map = {
                "initialize": self.handle_initialize,
                "tools/list": self.handle_tools_list,
                "tools/call": self.handle_tools_call,
                "resources/list": self.handle_resources_list,
                "resources/read": self.handle_resources_read,
                "prompts/list": self.handle_prompts_list,
                "prompts/get": self.handle_prompts_get,
            }
            if method in handler_map:
                try:
                    result = await handler_map[method](params)
                    return {"jsonrpc": "2.0", "id": message.get("id"), "result": result}
                except Exception as e:
                    return {
                        "jsonrpc": "2.0",
                        "id": message.get("id"),
                        "error": {"code": -32000, "message": str(e)},
                    }
            else:
                return {
                    "jsonrpc": "2.0",
                    "id": message.get("id"),
                    "error": {"code": -32601, "message": f"Method not found: {method}"},
                }

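A message handler such as `process_message` still needs a transport around it. MCP's stdio transport exchanges newline-delimited JSON-RPC messages, so one plausible wrapper handles a line at a time. The sketch below makes several assumptions: `handle_line` and `echo` are hypothetical names, and `echo` merely stands in for a real server's dispatcher so the example runs on its own.

```python
import asyncio
import json


def _error(code, message, request_id=None):
    """Serialize a JSON-RPC error reply."""
    return json.dumps({"jsonrpc": "2.0", "id": request_id,
                       "error": {"code": code, "message": message}})


async def handle_line(line, process_message):
    """Handle one newline-delimited JSON-RPC message.

    Returns the serialized reply, or None when no reply is due.
    """
    try:
        message = json.loads(line)
    except json.JSONDecodeError:
        return _error(-32700, "Parse error")
    if not isinstance(message, dict):
        return _error(-32600, "Invalid Request")
    reply = await process_message(message)
    if "id" not in message:
        return None  # notifications are processed but never answered
    return json.dumps(reply)


async def echo(message):
    """Hypothetical stand-in for a server's process_message coroutine."""
    return {"jsonrpc": "2.0", "id": message.get("id"),
            "result": {"method": message.get("method")}}


print(asyncio.run(handle_line('{"jsonrpc": "2.0", "id": 1, "method": "ping"}', echo)))
```

A real stdio server would read lines from standard input in a loop and write each non-None reply to standard output followed by a newline, keeping any logging on standard error.
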
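15 Open-Source MCP Projects in Detail

The following are 15+ of the most important open-source MCP projects in the current ecosystem. Each one shows what MCP can do in a different domain:

Development and Integration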

1. GitHub Official MCP

GitHub: https://github.com/github/github-mcp-server

GitHub's official MCP server, providing seamless integration with the GitHub platform.

    import base64

    import aiohttp


    class GitHubMCPServer(MCPServer):
        def __init__(self, github_token):
            super().__init__("GitHub MCP Server")
            self.github_token = github_token
            self.github_client = None
            self._register_github_tools()

        def _headers(self):
            """Headers shared by every GitHub REST API call."""
            return {
                "Authorization": f"token {self.github_token}",
                "Accept": "application/vnd.github.v3+json",
            }

        def _register_github_tools(self):
            """Register the GitHub tools."""
            self.register_tool(
                "create_issue",
                "Create a new issue in a repository",
                {
                    "owner": {"type": "string", "description": "Repository owner"},
                    "repo": {"type": "string", "description": "Repository name"},
                    "title": {"type": "string", "description": "Issue title"},
                    "body": {"type": "string", "description": "Issue body"},
                },
                self._create_issue,
            )
            self.register_tool(
                "list_repositories",
                "List a user's repositories",
                {"username": {"type": "string", "description": "Username"}},
                self._list_repositories,
            )
            self.register_tool(
                "get_file_content",
                "Get the content of a file",
                {
                    "owner": {"type": "string", "description": "Repository owner"},
                    "repo": {"type": "string", "description": "Repository name"},
                    "path": {"type": "string", "description": "File path"},
                },
                self._get_file_content,
            )

        async def _create_issue(self, args):
            """Create an issue."""
            url = f"https://api.github.com/repos/{args['owner']}/{args['repo']}/issues"
            data = {"title": args["title"], "body": args["body"]}
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=self._headers(), json=data) as response:
                    if response.status == 201:
                        result = await response.json()
                        return f"Issue created: {result['html_url']}"
                    error = await response.text()
                    raise Exception(f"Failed to create issue: {error}")

        async def _list_repositories(self, args):
            """List repositories."""
            url = f"https://api.github.com/users/{args['username']}/repos"
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=self._headers()) as response:
                    if response.status == 200:
                        repos = await response.json()
                        return [{"name": repo["name"], "url": repo["html_url"]} for repo in repos]
                    error = await response.text()
                    raise Exception(f"Failed to list repositories: {error}")

        async def _get_file_content(self, args):
            """Get the content of a file."""
            url = (
                f"https://api.github.com/repos/{args['owner']}/{args['repo']}"
                f"/contents/{args['path']}"
            )
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=self._headers()) as response:
                    if response.status == 200:
                        content = await response.json()
                        # A directory path returns a JSON array instead of an object
                        if isinstance(content, list) or content["type"] != "file":
                            return "This is a directory, not a file"
                        return base64.b64decode(content["content"]).decode("utf-8")
                    error = await response.text()
                    raise Exception(f"Failed to get file content: {error}")
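
Decoding a file from the GitHub contents endpoint has two details that are easy to miss: a directory path yields a JSON array rather than an object, and the base64 `content` field is wrapped with newlines. A minimal offline sketch, where `decode_contents_payload` is a hypothetical helper name and the payload is a hand-built sample rather than a real API response:

```python
import base64


def decode_contents_payload(payload):
    """Turn a GitHub 'contents' response body into the file's text."""
    if isinstance(payload, list):
        raise IsADirectoryError("path refers to a directory")
    if payload.get("type") != "file":
        raise ValueError(f"unsupported content type: {payload.get('type')!r}")
    if payload.get("encoding") != "base64":
        raise ValueError(f"unsupported encoding: {payload.get('encoding')!r}")
    # b64decode ignores the line breaks embedded in the encoded content
    return base64.b64decode(payload["content"]).decode("utf-8")


sample = {
    "type": "file",
    "encoding": "base64",
    "content": base64.encodebytes(b"hello\nworld").decode("ascii"),
}
print(decode_contents_payload(sample))
```

Raising distinct exceptions for the directory and encoding cases lets the tool handler report a precise error instead of failing on a missing key.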

Creative and Design

2. Figma MCP

GitHub: https://github.com/sonnylazuardi/cursor-talk-to-figma-mcp

Lets Cursor read and modify Figma designs programmatically.

    import aiohttp


    class FigmaMCPServer(MCPServer):
        def __init__(self, figma_token):
            super().__init__("Figma MCP Server")
            self.figma_token = figma_token
            self._register_figma_tools()

        def _register_figma_tools(self):
            """Register the Figma tools."""
            self.register_tool(
                "get_file",
                "Get information about a Figma file",
                {"file_key": {"type": "string", "description": "Figma file key"}},
                self._get_file,
            )
            self.register_tool(
                "get_file_nodes",
                "Get information about nodes in a file",
                {
                    "file_key": {"type": "string", "description": "Figma file key"},
                    "node_ids": {"type": "array", "description": "List of node IDs"},
                },
                self._get_file_nodes,
            )
            self.register_tool(
                "export_image",
                "Export images",
                {
                    "file_key": {"type": "string", "description": "Figma file key"},
                    "node_ids": {"type": "array", "description": "List of node IDs"},
                    "format": {
                        "type": "string",
                        "description": "Format (png, jpg, svg)",
                        "default": "png",
                    },
                },
                self._export_image,
            )

        async def _get_file(self, args):
            """Get a Figma file."""
            file_key = args["file_key"]
            url = f"https://api.figma.com/v1/files/{file_key}"
            headers = {"X-Figma-Token": self.figma_token}
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status == 200:
                        data = await response.json()
                        return {
                            "name": data["name"],
                            "lastModified": data["lastModified"],
                            "thumbnailUrl": data["thumbnailUrl"],
                            "pages": [page["name"] for page in data["document"]["children"]],
                        }
                    error = await response.text()
                    raise Exception(f"Failed to get Figma file: {error}")

        async def _get_file_nodes(self, args):
            """Get nodes from a file."""
            file_key = args["file_key"]
            url = f"https://api.figma.com/v1/files/{file_key}/nodes"
            headers = {"X-Figma-Token": self.figma_token}
            # Pass ids as query parameters so they are URL-encoded
            params = {"ids": ",".join(args["node_ids"])}
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        return data["nodes"]
                    error = await response.text()
                    raise Exception(f"Failed to get node information: {error}")

        async def _export_image(self, args):
            """Export images."""
            file_key = args["file_key"]
            url = f"https://api.figma.com/v1/images/{file_key}"
            headers = {"X-Figma-Token": self.figma_token}
            params = {
                "ids": ",".join(args["node_ids"]),
                "format": args.get("format", "png"),
            }
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        return data["images"]
                    error = await response.text()
                    raise Exception(f"Failed to export images: {error}")
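
Figma node IDs typically contain a colon (for example `1:2`), so a query string assembled by plain string formatting leaves those characters unencoded. The sketch below builds the image-export URL with the standard library instead. `images_url` is a hypothetical helper, and the set of accepted formats is limited to the three named in the tool description above.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

FIGMA_API = "https://api.figma.com/v1"
FORMATS = {"png", "jpg", "svg"}


def images_url(file_key, node_ids, fmt="png"):
    """Build the image-export URL with a properly encoded query string."""
    if fmt not in FORMATS:
        raise ValueError(f"format must be one of {sorted(FORMATS)}")
    if not node_ids:
        raise ValueError("at least one node id is required")
    query = urlencode({"ids": ",".join(node_ids), "format": fmt})
    return f"{FIGMA_API}/images/{file_key}?{query}"


url = images_url("FILEKEY", ["1:2", "3:4"])
# Round-trip the query to confirm the ids survive encoding intact
print(parse_qs(urlsplit(url).query))
```

Validating the format before the request is sent turns a remote API error into an immediate, local one.
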

3. Blender MCP

GitHub: https://github.com/ahujasid/blender-mcp

A Blender integration that creates 3D scenes from prompts alone.

    import asyncio
    import os
    import subprocess
    import tempfile


    class BlenderMCPServer(MCPServer):
        def __init__(self):
            super().__init__("Blender MCP Server")
            self._register_blender_tools()

        def _register_blender_tools(self):
            """Register the Blender tools."""
            self.register_tool(
                "create_cube",
                "Create a cube",
                {
                    "location": {"type": "array", "description": "Position [x,y,z]", "default": [0, 0, 0]},
                    "scale": {"type": "array", "description": "Scale [x,y,z]", "default": [1, 1, 1]},
                    "name": {"type": "string", "description": "Object name", "default": "Cube"},
                },
                self._create_cube,
            )
            self.register_tool(
                "create_sphere",
                "Create a sphere",
                {
                    "location": {"type": "array", "description": "Position [x,y,z]", "default": [0, 0, 0]},
                    "radius": {"type": "number", "description": "Radius", "default": 1.0},
                    "name": {"type": "string", "description": "Object name", "default": "Sphere"},
                },
                self._create_sphere,
            )
            self.register_tool(
                "set_material",
                "Set a material",
                {
                    "object_name": {"type": "string", "description": "Object name"},
                    "material_name": {"type": "string", "description": "Material name"},
                    "color": {"type": "array", "description": "RGBA color", "default": [1, 1, 1, 1]},
                },
                self._set_material,
            )
            self.register_tool(
                "render_scene",
                "Render the scene",
                {
                    "output_path": {"type": "string", "description": "Output path"},
                    "resolution": {
                        "type": "array",
                        "description": "Resolution [width,height]",
                        "default": [1920, 1080],
                    },
                },
                self._render_scene,
            )

        async def _create_cube(self, args):
            """Create a cube."""
            location = tuple(args.get("location", [0, 0, 0]))
            scale = tuple(args.get("scale", [1, 1, 1]))
            name = args.get("name", "Cube")
            # !r (repr) writes each value as a valid Python literal in the script
            blender_script = "\n".join([
                "import bpy",
                "# Create the cube",
                f"bpy.ops.mesh.primitive_cube_add(location={location!r})",
                "# Set the name",
                f"bpy.context.active_object.name = {name!r}",
                "# Set the scale",
                f"bpy.context.active_object.scale = {scale!r}",
            ])
            await self._execute_blender_script(blender_script)
            return f"Cube '{name}' created"

        async def _create_sphere(self, args):
            """Create a sphere."""
            location = tuple(args.get("location", [0, 0, 0]))
            radius = args.get("radius", 1.0)
            name = args.get("name", "Sphere")
            blender_script = "\n".join([
                "import bpy",
                "# Create the sphere",
                f"bpy.ops.mesh.primitive_uv_sphere_add(radius={radius!r}, location={location!r})",
                "# Set the name",
                f"bpy.context.active_object.name = {name!r}",
            ])
            await self._execute_blender_script(blender_script)
            return f"Sphere '{name}' created"

        async def _set_material(self, args):
            """Set a material."""
            object_name = args["object_name"]
            material_name = args["material_name"]
            color = tuple(args.get("color", [1, 1, 1, 1]))
            not_found = f"Object not found: {object_name}"
            blender_script = "\n".join([
                "import bpy",
                "# Look up the object",
                f"obj = bpy.data.objects.get({object_name!r})",
                "if obj is None:",
                f"    raise Exception({not_found!r})",
                "# Create the material",
                f"mat = bpy.data.materials.new(name={material_name!r})",
                "mat.use_nodes = True",
                "# Set the color",
                'bsdf = mat.node_tree.nodes["Principled BSDF"]',
                f'bsdf.inputs["Base Color"].default_value = {color!r}',
                "# Apply the material",
                "if obj.data.materials:",
                "    obj.data.materials[0] = mat",
                "else:",
                "    obj.data.materials.append(mat)",
            ])
            await self._execute_blender_script(blender_script)
            return f"Material '{material_name}' applied to object '{object_name}'"

        async def _render_scene(self, args):
            """Render the scene."""
            output_path = args["output_path"]
            resolution = args.get("resolution", [1920, 1080])
            blender_script = "\n".join([
                "import bpy",
                "# Set the render resolution",
                f"bpy.context.scene.render.resolution_x = {int(resolution[0])}",
                f"bpy.context.scene.render.resolution_y = {int(resolution[1])}",
                "# Set the output path",
                f"bpy.context.scene.render.filepath = {output_path!r}",
                "# Render",
                "bpy.ops.render.render(write_still=True)",
            ])
            await self._execute_blender_script(blender_script)
            return f"Scene rendered to: {output_path}"

        async def _execute_blender_script(self, script):
            """Run a script in a background Blender process."""
            # Write the script to a temporary file
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".py", delete=False, encoding="utf-8"
            ) as f:
                f.write(script)
                script_path = f.name
            try:
                # Run Blender in a worker thread so the event loop is not blocked
                result = await asyncio.to_thread(
                    subprocess.run,
                    ["blender", "--background", "--python", script_path],
                    capture_output=True,
                    text=True,
                    check=True,
                )
                return result.stdout
            except subprocess.CalledProcessError as e:
                raise Exception(f"Blender script failed: {e.stderr}")
            finally:
                # Clean up the temporary file
                os.unlink(script_path)
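
Because tool arguments are pasted into a Python script that Blender then executes, a crafted object name could inject code. One way to reduce that risk, sketched here under stated assumptions, is to emit every value through `repr()` and to coerce coordinates to finite floats. `build_cube_script` is a hypothetical helper, and the sketch only generates and parses the script; it does not run Blender.

```python
import ast
import math


def _vector(values, size=3):
    """Coerce a sequence to a tuple of finite floats of the expected length."""
    vector = tuple(float(v) for v in values)
    if len(vector) != size or not all(math.isfinite(v) for v in vector):
        raise ValueError(f"expected {size} finite numbers, got {values!r}")
    return vector


def build_cube_script(name="Cube", location=(0, 0, 0), scale=(1, 1, 1)):
    """Generate a Blender script in which every argument is a plain literal."""
    return "\n".join([
        "import bpy",
        f"bpy.ops.mesh.primitive_cube_add(location={_vector(location)!r})",
        f"bpy.context.active_object.name = {str(name)!r}",
        f"bpy.context.active_object.scale = {_vector(scale)!r}",
    ])


hostile = 'Cube"; import os; os.system("echo pwned") #'
script = build_cube_script(name=hostile)
# The hostile text stays inside a single string constant in the parsed script
assignment = ast.parse(script).body[2]
print(isinstance(assignment.value, ast.Constant))
```

Parsing the generated script with `ast` is a cheap check that the name ended up as data, not as extra statements.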

Media and Content

4. ElevenLabs MCP

GitHub: https://github.com/elevenlabs/elevenlabs-mcp

A service for generating speech and custom AI voices.

    import os
    import tempfile

    import aiohttp


    class ElevenLabsMCPServer(MCPServer):
        def __init__(self, api_key):
            super().__init__("ElevenLabs MCP Server")
            self.api_key = api_key
            self._register_elevenlabs_tools()

        def _register_elevenlabs_tools(self):
            """Register the ElevenLabs tools."""
            self.register_tool(
                "text_to_speech",
                "Convert text to speech",
                {
                    "text": {"type": "string", "description": "Text to convert"},
                    "voice_id": {"type": "string", "description": "Voice ID"},
                    "stability": {"type": "number", "description": "Stability", "default": 0.5},
                    "similarity_boost": {
                        "type": "number",
                        "description": "Similarity boost",
                        "default": 0.5,
                    },
                },
                self._text_to_speech,
            )
            self.register_tool(
                "get_voices",
                "List the available voices",
                {},
                self._get_voices,
            )
            self.register_tool(
                "clone_voice",
                "Clone a voice",
                {
                    "name": {"type": "string", "description": "Voice name"},
                    "description": {"type": "string", "description": "Voice description"},
                    "files": {"type": "array", "description": "List of audio file paths"},
                },
                self._clone_voice,
            )

        async def _text_to_speech(self, args):
            """Convert text to speech."""
            url = f"https://api.elevenlabs.io/v1/text-to-speech/{args['voice_id']}"
            headers = {
                "Accept": "audio/mpeg",
                "Content-Type": "application/json",
                "xi-api-key": self.api_key,
            }
            data = {
                "text": args["text"],
                "model_id": "eleven_monolingual_v1",
                "voice_settings": {
                    "stability": args.get("stability", 0.5),
                    "similarity_boost": args.get("similarity_boost", 0.5),
                },
            }
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=data) as response:
                    if response.status == 200:
                        audio = await response.read()
                        # Save the audio to a temporary file
                        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
                            f.write(audio)
                        return f"Speech generated, saved to: {f.name}"
                    error = await response.text()
                    raise Exception(f"Speech generation failed: {error}")

        async def _get_voices(self, args):
            """List the available voices."""
            url = "https://api.elevenlabs.io/v1/voices"
            headers = {"xi-api-key": self.api_key}
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status == 200:
                        data = await response.json()
                        return [
                            {
                                "voice_id": voice["voice_id"],
                                "name": voice["name"],
                                "category": voice["category"],
                                "description": voice.get("description", ""),
                            }
                            for voice in data["voices"]
                        ]
                    error = await response.text()
                    raise Exception(f"Failed to list voices: {error}")

        async def _clone_voice(self, args):
            """Clone a voice."""
            url = "https://api.elevenlabs.io/v1/voices/add"
            headers = {"xi-api-key": self.api_key}
            # Prepare the multipart upload
            data = aiohttp.FormData()
            data.add_field("name", args["name"])
            data.add_field("description", args["description"])
            for file_path in args["files"]:
                # Read the bytes up front: a handle opened in a with-block
                # would already be closed by the time the request is sent.
                with open(file_path, "rb") as f:
                    data.add_field("files", f.read(), filename=os.path.basename(file_path))
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, data=data) as response:
                    if response.status == 200:
                        result = await response.json()
                        return f"Voice cloned, voice ID: {result['voice_id']}"
                    error = await response.text()
                    raise Exception(f"Voice cloning failed: {error}")
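
Since `stability` and `similarity_boost` arrive straight from the model's tool call, it can help to normalize them before building the request body. The sketch below assumes both settings are meant to lie between 0 and 1; `build_tts_payload` is a hypothetical helper, and the default `model_id` simply mirrors the value used in the code above.

```python
def _clamp_unit(value):
    """Clamp a number into the closed interval [0.0, 1.0]."""
    return max(0.0, min(1.0, float(value)))


def build_tts_payload(text, stability=0.5, similarity_boost=0.5,
                      model_id="eleven_monolingual_v1"):
    """Build a text-to-speech request body with normalized voice settings."""
    if not text or not text.strip():
        raise ValueError("text must not be empty")
    return {
        "text": text,
        "model_id": model_id,
        "voice_settings": {
            "stability": _clamp_unit(stability),
            "similarity_boost": _clamp_unit(similarity_boost),
        },
    }


print(build_tts_payload("Hello there", stability=1.7, similarity_boost=-0.2))
```

Clamping rather than rejecting keeps the tool forgiving; a stricter server could raise instead so the model learns the valid range.
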

5. Spotify MCP

GitHub: https://github.com/varunneal/spotify-mcp

Starts playback, searches, and retrieves specific details from Spotify.

    import aiohttp


    class SpotifyMCPServer(MCPServer):
        def __init__(self, client_id, client_secret):
            super().__init__("Spotify MCP Server")
            self.client_id = client_id
            self.client_secret = client_secret
            self.access_token = None
            self._register_spotify_tools()

        def _register_spotify_tools(self):
            """Register the Spotify tools."""
            self.register_tool(
                "search_tracks",
                "Search for tracks",
                {
                    "query": {"type": "string", "description": "Search query"},
                    "limit": {"type": "integer", "description": "Maximum number of results", "default": 10},
                },
                self._search_tracks,
            )
            self.register_tool(
                "get_track_info",
                "Get information about a track",
                {"track_id": {"type": "string", "description": "Track ID"}},
                self._get_track_info,
            )
            self.register_tool(
                "create_playlist",
                "Create a playlist",
                {
                    "name": {"type": "string", "description": "Playlist name"},
                    "description": {"type": "string", "description": "Playlist description", "default": ""},
                    # Python's False, not JSON's false
                    "public": {"type": "boolean", "description": "Whether the playlist is public", "default": False},
                },
                self._create_playlist,
            )
            self.register_tool(
                "add_tracks_to_playlist",
                "Add tracks to a playlist",
                {
                    "playlist_id": {"type": "string", "description": "Playlist ID"},
                    "track_uris": {"type": "array", "description": "List of track URIs"},
                },
                self._add_tracks_to_playlist,
            )

        async def _get_access_token(self):
            """Get an access token."""
            if self.access_token:
                return self.access_token
            url = "https://accounts.spotify.com/api/token"
            headers = {"Content-Type": "application/x-www-form-urlencoded"}
            data = {
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
            }
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, data=data) as response:
                    if response.status == 200:
                        result = await response.json()
                        self.access_token = result["access_token"]
                        return self.access_token
                    raise Exception("Failed to get a Spotify access token")

        async def _search_tracks(self, args):
            """Search for tracks."""
            token = await self._get_access_token()
            url = "https://api.spotify.com/v1/search"
            headers = {"Authorization": f"Bearer {token}"}
            params = {"q": args["query"], "type": "track", "limit": args.get("limit", 10)}
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        return [
                            {
                                "id": track["id"],
                                "name": track["name"],
                                "artists": [artist["name"] for artist in track["artists"]],
                                "album": track["album"]["name"],
                                "uri": track["uri"],
                                "preview_url": track["preview_url"],
                            }
                            for track in data["tracks"]["items"]
                        ]
                    error = await response.text()
                    raise Exception(f"Track search failed: {error}")

        async def _get_track_info(self, args):
            """Get information about a track."""
            token = await self._get_access_token()
            track_id = args["track_id"]
            url = f"https://api.spotify.com/v1/tracks/{track_id}"
            headers = {"Authorization": f"Bearer {token}"}
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status == 200:
                        track = await response.json()
                        return {
                            "id": track["id"],
                            "name": track["name"],
                            "artists": [artist["name"] for artist in track["artists"]],
                            "album": track["album"]["name"],
                            "duration_ms": track["duration_ms"],
                            "popularity": track["popularity"],
                            "explicit": track["explicit"],
                            "preview_url": track["preview_url"],
                        }
                    error = await response.text()
                    raise Exception(f"Failed to get track information: {error}")

        async def _create_playlist(self, args):
            """Create a playlist."""
            # Note: creating a playlist requires user authorization; this only
            # shows the structure of the API call.
            token = await self._get_access_token()
            # A user ID is needed here; a real application obtains it via OAuth
            user_id = "user_id_placeholder"
            url = f"https://api.spotify.com/v1/users/{user_id}/playlists"
            headers = {
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            }
            data = {
                "name": args["name"],
                "description": args.get("description", ""),
                "public": args.get("public", False),
            }
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=data) as response:
                    if response.status == 201:
                        playlist = await response.json()
                        return {
                            "id": playlist["id"],
                            "name": playlist["name"],
                            "external_urls": playlist["external_urls"],
                        }
                    error = await response.text()
                    raise Exception(f"Failed to create playlist: {error}")

        async def _add_tracks_to_playlist(self, args):
            """Add tracks to a playlist."""
            token = await self._get_access_token()
            playlist_id = args["playlist_id"]
            url = f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks"
            headers = {
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            }
            data = {"uris": args["track_uris"]}
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=data) as response:
                    if response.status == 201:
                        return f"Added {len(args['track_uris'])} tracks to the playlist"
                    error = await response.text()
                    raise Exception(f"Failed to add tracks: {error}")
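
Caching an access token forever eventually fails, because the token response also carries an `expires_in` lifetime in seconds. A minimal sketch of an expiry-aware cache follows. `TokenCache`, its `fetch` callback, and the 30-second safety margin are assumptions made for illustration, and the fetch is synchronous here only to keep the example short.

```python
import time


class TokenCache:
    """Cache a token and refresh it shortly before it expires."""

    def __init__(self, fetch, clock=time.monotonic, skew=30.0):
        self._fetch = fetch    # callable returning {"access_token", "expires_in"}
        self._clock = clock    # injectable so the behavior can be tested
        self._skew = skew      # refresh this many seconds early
        self._token = None
        self._expires_at = 0.0

    def get(self):
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            payload = self._fetch()
            self._token = payload["access_token"]
            self._expires_at = now + payload["expires_in"] - self._skew
        return self._token


def fake_fetch():
    """Hypothetical stand-in for the HTTP call to the token endpoint."""
    return {"access_token": "example-token", "expires_in": 3600}


cache = TokenCache(fake_fetch)
print(cache.get())
```

Injecting the clock keeps the refresh logic testable without waiting for a real token to expire.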

Browser Automation

6. Browser MCP

GitHub: https://github.com/browsermcp/mcp

Automates the browser with coding agents.

    from playwright.async_api import async_playwright


    class BrowserMCPServer(MCPServer):
        def __init__(self):
            super().__init__("Browser MCP Server")
            self.playwright = None
            self.browser = None
            self.page = None
            self._register_browser_tools()

        def _register_browser_tools(self):
            """Register the browser tools."""
            self.register_tool(
                "launch_browser",
                "Launch the browser",
                {
                    "headless": {"type": "boolean", "description": "Run in headless mode", "default": True},
                    "browser_type": {"type": "string", "description": "Browser type", "default": "chromium"},
                },
                self._launch_browser,
            )
            self.register_tool(
                "navigate_to",
                "Navigate to a URL",
                {"url": {"type": "string", "description": "Target URL"}},
                self._navigate_to,
            )
            self.register_tool(
                "click_element",
                "Click an element",
                {"selector": {"type": "string", "description": "CSS selector"}},
                self._click_element,
            )
            self.register_tool(
                "type_text",
                "Type text",
                {
                    "selector": {"type": "string", "description": "CSS selector"},
                    "text": {"type": "string", "description": "Text to type"},
                },
                self._type_text,
            )
            self.register_tool(
                "get_text",
                "Get text",
                {"selector": {"type": "string", "description": "CSS selector"}},
                self._get_text,
            )
            self.register_tool(
                "screenshot",
                "Take a screenshot",
                {"path": {"type": "string", "description": "Save path", "default": "screenshot.png"}},
                self._screenshot,
            )
            self.register_tool(
                "wait_for_element",
                "Wait for an element",
                {
                    "selector": {"type": "string", "description": "CSS selector"},
                    "timeout": {"type": "number", "description": "Timeout (milliseconds)", "default": 30000},
                },
                self._wait_for_element,
            )

        def _require_page(self):
            """Raise unless the browser has been launched."""
            if not self.page:
                raise Exception("Browser not launched")

        async def _launch_browser(self, args):
            """Launch the browser."""
            browser_type = args.get("browser_type", "chromium")
            headless = args.get("headless", True)
            if browser_type not in ("chromium", "firefox", "webkit"):
                raise Exception(f"Unsupported browser type: {browser_type}")
            self.playwright = await async_playwright().start()
            launcher = getattr(self.playwright, browser_type)
            self.browser = await launcher.launch(headless=headless)
            self.page = await self.browser.new_page()
            return f"{browser_type} browser launched"

        async def _navigate_to(self, args):
            """Navigate to a URL."""
            self._require_page()
            url = args["url"]
            await self.page.goto(url)
            return f"Navigated to: {url}"

        async def _click_element(self, args):
            """Click an element."""
            self._require_page()
            selector = args["selector"]
            await self.page.click(selector)
            return f"Clicked element: {selector}"

        async def _type_text(self, args):
            """Type text."""
            self._require_page()
            selector = args["selector"]
            text = args["text"]
            await self.page.fill(selector, text)
            return f"Typed into {selector}: {text}"

        async def _get_text(self, args):
            """Get text."""
            self._require_page()
            text = await self.page.text_content(args["selector"])
            return text or ""

        async def _screenshot(self, args):
            """Take a screenshot."""
            self._require_page()
            path = args.get("path", "screenshot.png")
            await self.page.screenshot(path=path)
            return f"Screenshot saved to: {path}"

        async def _wait_for_element(self, args):
            """Wait for an element."""
            self._require_page()
            selector = args["selector"]
            timeout = args.get("timeout", 30000)
            await self.page.wait_for_selector(selector, timeout=timeout)
            return f"Element {selector} appeared"
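
Every page-level tool begins with the same "has the browser been launched?" check, which is a natural fit for a decorator. The sketch below is one possible shape, not the project's code: `requires_page`, `BrowserTools`, and `FakePage` are hypothetical names, and `FakePage` replaces a real Playwright page so the example runs without a browser.

```python
import asyncio
import functools


def requires_page(method):
    """Reject a tool call with a clear error until a page exists."""
    @functools.wraps(method)
    async def wrapper(self, args):
        if self.page is None:
            raise RuntimeError("browser not launched")
        return await method(self, args)
    return wrapper


class FakePage:
    """Hypothetical stand-in for a Playwright page, so the sketch runs offline."""

    def __init__(self):
        self.visited = []

    async def goto(self, url):
        self.visited.append(url)


class BrowserTools:
    def __init__(self):
        self.page = None

    @requires_page
    async def navigate_to(self, args):
        await self.page.goto(args["url"])
        return f"navigated to {args['url']}"


tools = BrowserTools()
tools.page = FakePage()
print(asyncio.run(tools.navigate_to({"url": "https://example.com"})))
```

With the guard in one place, adding a new tool cannot accidentally skip the check.
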

7. Playwright MCP

GitHub: https://github.com/microsoft/playwright-mcp

Professional-grade browser automation built on Playwright.

class PlaywrightMCPServer(MCPServer):
    """MCP server that exposes Playwright browser automation as tools."""

    def __init__(self):
        super().__init__("Playwright MCP Server")
        self.playwright = None
        self.browser = None
        self.context = None
        self.page = None
        self._register_playwright_tools()

    def _register_playwright_tools(self):
        """Register the Playwright tools."""
        self.register_tool(
            "start_playwright", "Start Playwright",
            {
                "browser": {"type": "string", "description": "Browser type", "default": "chromium"},
                "headless": {"type": "boolean", "description": "Headless mode", "default": True},
            },
            self._start_playwright,
        )
        self.register_tool(
            "create_context", "Create a browser context",
            {
                "viewport": {"type": "object", "description": "Viewport size",
                             "default": {"width": 1280, "height": 720}},
                "user_agent": {"type": "string", "description": "User agent", "default": ""},
            },
            self._create_context,
        )
        self.register_tool("new_page", "Create a new page", {}, self._new_page)
        self.register_tool(
            "goto", "Navigate to a page",
            {
                "url": {"type": "string", "description": "Target URL"},
                "wait_until": {"type": "string", "description": "Wait condition", "default": "load"},
            },
            self._goto,
        )
        self.register_tool(
            "fill", "Fill a form field",
            {
                "selector": {"type": "string", "description": "Element selector"},
                "value": {"type": "string", "description": "Value to fill in"},
            },
            self._fill,
        )
        self.register_tool(
            "click", "Click an element",
            {
                "selector": {"type": "string", "description": "Element selector"},
                "button": {"type": "string", "description": "Mouse button", "default": "left"},
            },
            self._click,
        )
        self.register_tool(
            "wait_for_selector", "Wait for a selector",
            {
                "selector": {"type": "string", "description": "Element selector"},
                "state": {"type": "string", "description": "State to wait for", "default": "visible"},
                "timeout": {"type": "number", "description": "Timeout in milliseconds", "default": 30000},
            },
            self._wait_for_selector,
        )
        self.register_tool(
            "evaluate", "Execute JavaScript",
            {"expression": {"type": "string", "description": "JavaScript expression"}},
            self._evaluate,
        )
        self.register_tool(
            "screenshot", "Take a page screenshot",
            {
                "path": {"type": "string", "description": "Output path"},
                "full_page": {"type": "boolean", "description": "Capture the full page", "default": False},
            },
            self._screenshot,
        )
        self.register_tool(
            "pdf", "Generate a PDF",
            {
                "path": {"type": "string", "description": "Output path"},
                "format": {"type": "string", "description": "Paper format", "default": "A4"},
            },
            self._pdf,
        )

    async def _start_playwright(self, args):
        """Start Playwright and launch the requested browser."""
        from playwright.async_api import async_playwright

        self.playwright = await async_playwright().start()
        browser_type = args.get("browser", "chromium")
        headless = args.get("headless", True)

        if browser_type == "chromium":
            self.browser = await self.playwright.chromium.launch(headless=headless)
        elif browser_type == "firefox":
            self.browser = await self.playwright.firefox.launch(headless=headless)
        elif browser_type == "webkit":
            self.browser = await self.playwright.webkit.launch(headless=headless)
        else:
            raise Exception(f"Unsupported browser: {browser_type}")
        return f"Playwright {browser_type} started"

    async def _create_context(self, args):
        """Create a browser context."""
        if not self.browser:
            raise Exception("Browser has not been started")
        viewport = args.get("viewport", {"width": 1280, "height": 720})
        user_agent = args.get("user_agent", "")

        context_options = {"viewport": viewport}
        if user_agent:
            context_options["user_agent"] = user_agent
        self.context = await self.browser.new_context(**context_options)
        return "Browser context created"

    async def _new_page(self, args):
        """Create a new page in the current context."""
        if not self.context:
            raise Exception("Browser context has not been created")
        self.page = await self.context.new_page()
        return "New page created"

    async def _goto(self, args):
        """Navigate to a URL."""
        if not self.page:
            raise Exception("Page has not been created")
        url = args["url"]
        wait_until = args.get("wait_until", "load")
        await self.page.goto(url, wait_until=wait_until)
        return f"Navigated to: {url}"

    async def _fill(self, args):
        """Fill a form field."""
        if not self.page:
            raise Exception("Page has not been created")
        selector = args["selector"]
        value = args["value"]
        await self.page.fill(selector, value)
        return f"Filled {selector}: {value}"

    async def _click(self, args):
        """Click an element."""
        if not self.page:
            raise Exception("Page has not been created")
        selector = args["selector"]
        button = args.get("button", "left")
        await self.page.click(selector, button=button)
        return f"Clicked: {selector}"

    async def _wait_for_selector(self, args):
        """Wait until a selector reaches the requested state."""
        if not self.page:
            raise Exception("Page has not been created")
        selector = args["selector"]
        state = args.get("state", "visible")
        timeout = args.get("timeout", 30000)
        await self.page.wait_for_selector(selector, state=state, timeout=timeout)
        return f"Selector {selector} is {state}"

    async def _evaluate(self, args):
        """Evaluate a JavaScript expression in the page."""
        if not self.page:
            raise Exception("Page has not been created")
        expression = args["expression"]
        result = await self.page.evaluate(expression)
        return str(result)

    async def _screenshot(self, args):
        """Save a screenshot of the page."""
        if not self.page:
            raise Exception("Page has not been created")
        path = args["path"]
        full_page = args.get("full_page", False)
        await self.page.screenshot(path=path, full_page=full_page)
        return f"Screenshot saved: {path}"

    async def _pdf(self, args):
        """Render the page to a PDF file."""
        if not self.page:
            raise Exception("Page has not been created")
        path = args["path"]
        format_type = args.get("format", "A4")
        await self.page.pdf(path=path, format=format_type)
        return f"PDF generated: {path}"
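Each handler above repeats defaults that are already declared in its tool schema (for example `args.get("headless", True)`). The following is a minimal sketch of resolving those defaults in one place. `apply_schema_defaults` is a hypothetical helper, not part of Playwright or of any MCP SDK, and it assumes the flat `{name: {"type", "description", "default"}}` schema shape used in this article, treating parameters without a `default` as required.

```python
import copy


def apply_schema_defaults(schema, args):
    """Return a copy of args with schema defaults filled in.

    Hypothetical helper: assumes the flat schema shape used in this article.
    Parameters without a "default" entry are treated as required.
    """
    resolved = dict(args)
    for name, spec in schema.items():
        if name in resolved:
            continue
        if "default" in spec:
            # Deep-copy so mutable defaults (such as a viewport dict) are not shared.
            resolved[name] = copy.deepcopy(spec["default"])
        else:
            raise ValueError(f"missing required argument: {name}")
    return resolved


# Schema copied from the "goto" tool registration.
goto_schema = {
    "url": {"type": "string", "description": "Target URL"},
    "wait_until": {"type": "string", "description": "Wait condition", "default": "load"},
}

resolved = apply_schema_defaults(goto_schema, {"url": "https://example.com"})
print(resolved)
```

With a helper of this kind, a handler could read `args["wait_until"]` directly, and a missing `url` would surface as a clear error instead of a `KeyError` inside the handler.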

Database Servers

8. Supabase MCP

GitHub: https://github.com/supabase-community/supabase-mcp

Connects Supabase to AI assistants.

class SupabaseMCPServer(MCPServer):
    """MCP server that exposes Supabase table and RPC operations as tools."""

    def __init__(self, supabase_url, supabase_key):
        super().__init__("Supabase MCP Server")
        self.supabase_url = supabase_url
        self.supabase_key = supabase_key
        self.client = None
        self._register_supabase_tools()

    def _register_supabase_tools(self):
        """Register the Supabase tools."""
        self.register_tool(
            "query_table", "Query table data",
            {
                "table": {"type": "string", "description": "Table name"},
                "columns": {"type": "array", "description": "Columns to select", "default": ["*"]},
                "filters": {"type": "object", "description": "Filter conditions", "default": {}},
                "limit": {"type": "integer", "description": "Maximum number of rows", "default": 100},
            },
            self._query_table,
        )
        self.register_tool(
            "insert_data", "Insert data",
            {
                "table": {"type": "string", "description": "Table name"},
                "data": {"type": "object", "description": "Data to insert"},
            },
            self._insert_data,
        )
        self.register_tool(
            "update_data", "Update data",
            {
                "table": {"type": "string", "description": "Table name"},
                "data": {"type": "object", "description": "Data to update"},
                "filters": {"type": "object", "description": "Filter conditions"},
            },
            self._update_data,
        )
        self.register_tool(
            "delete_data", "Delete data",
            {
                "table": {"type": "string", "description": "Table name"},
                "filters": {"type": "object", "description": "Filter conditions"},
            },
            self._delete_data,
        )
        self.register_tool(
            "call_function", "Call a database function",
            {
                "function_name": {"type": "string", "description": "Function name"},
                "params": {"type": "object", "description": "Function parameters", "default": {}},
            },
            self._call_function,
        )

    async def _get_client(self):
        """Return the Supabase client, creating it on first use."""
        if not self.client:
            try:
                from supabase import create_client
            except ImportError:
                raise Exception("Please install supabase-py: pip install supabase")
            self.client = create_client(self.supabase_url, self.supabase_key)
        return self.client

    async def _query_table(self, args):
        """Query table data."""
        client = await self._get_client()
        table = args["table"]
        columns = args.get("columns", ["*"])
        filters = args.get("filters", {})
        limit = args.get("limit", 100)

        try:
            query = client.table(table).select(",".join(columns))

            # Apply filter conditions
            for key, value in filters.items():
                if isinstance(value, dict):
                    # Operator form, e.g. {"gt": 10}; unknown operators are ignored
                    for op, val in value.items():
                        if op == "eq":
                            query = query.eq(key, val)
                        elif op == "gt":
                            query = query.gt(key, val)
                        elif op == "lt":
                            query = query.lt(key, val)
                        elif op == "like":
                            query = query.like(key, val)
                else:
                    query = query.eq(key, value)

            query = query.limit(limit)
            result = query.execute()
            return {"data": result.data, "count": len(result.data)}
        except Exception as e:
            raise Exception(f"Query failed: {str(e)}")

    async def _insert_data(self, args):
        """Insert data."""
        client = await self._get_client()
        table = args["table"]
        data = args["data"]

        try:
            result = client.table(table).insert(data).execute()
            return {
                "success": True,
                "data": result.data,
                "message": f"Inserted {len(result.data)} record(s)",
            }
        except Exception as e:
            raise Exception(f"Insert failed: {str(e)}")

    async def _update_data(self, args):
        """Update data."""
        client = await self._get_client()
        table = args["table"]
        data = args["data"]
        filters = args["filters"]

        try:
            query = client.table(table).update(data)

            # Apply filter conditions (equality only)
            for key, value in filters.items():
                query = query.eq(key, value)

            result = query.execute()
            return {
                "success": True,
                "data": result.data,
                "message": f"Updated {len(result.data)} record(s)",
            }
        except Exception as e:
            raise Exception(f"Update failed: {str(e)}")

    async def _delete_data(self, args):
        """Delete data."""
        client = await self._get_client()
        table = args["table"]
        filters = args["filters"]

        try:
            query = client.table(table).delete()

            # Apply filter conditions (equality only)
            for key, value in filters.items():
                query = query.eq(key, value)

            result = query.execute()
            return {
                "success": True,
                "message": f"Deleted {len(result.data)} record(s)",
            }
        except Exception as e:
            raise Exception(f"Delete failed: {str(e)}")

    async def _call_function(self, args):
        """Call a database function through RPC."""
        client = await self._get_client()
        function_name = args["function_name"]
        params = args.get("params", {})

        try:
            result = client.rpc(function_name, params).execute()
            return {"success": True, "data": result.data}
        except Exception as e:
            raise Exception(f"Function call failed: {str(e)}")
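In the class above, `_query_table` understands operator filters such as `{"gt": 10}`, while `_update_data` and `_delete_data` only apply equality, and unknown operators are silently skipped. One way to keep the three handlers consistent is a shared filter helper. The sketch below is an illustration under assumptions: `apply_filters` and `RecordingQuery` are hypothetical names, and the stand-in query class only records calls so the example runs without the Supabase client installed.

```python
# Operators taken from _query_table above; anything else is rejected.
SUPPORTED_OPERATORS = ("eq", "gt", "lt", "like")


def apply_filters(query, filters):
    """Apply {column: value} or {column: {op: value}} filters to a query builder."""
    for column, condition in filters.items():
        if not isinstance(condition, dict):
            condition = {"eq": condition}  # a bare value means equality
        for op, value in condition.items():
            if op not in SUPPORTED_OPERATORS:
                raise ValueError(f"unsupported filter operator: {op}")
            query = getattr(query, op)(column, value)
    return query


class RecordingQuery:
    """Stand-in for a query builder (hypothetical, for illustration only)."""

    def __init__(self, calls=()):
        self.calls = list(calls)

    def _record(self, op, column, value):
        return RecordingQuery(self.calls + [(op, column, value)])

    def eq(self, column, value):
        return self._record("eq", column, value)

    def gt(self, column, value):
        return self._record("gt", column, value)

    def lt(self, column, value):
        return self._record("lt", column, value)

    def like(self, column, value):
        return self._record("like", column, value)


query = apply_filters(RecordingQuery(), {"status": "active", "age": {"gt": 18, "lt": 65}})
print(query.calls)
```

Raising on an unknown operator is a deliberate choice here: for update and delete operations, a filter that is silently dropped widens the set of affected rows.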
9. MySQL MCP Server

GitHub: https://github.com/benborla/mcp-server-mysql

A Model Context Protocol server that provides read-only access to MySQL databases, allowing LLMs to inspect database schemas and run read-only queries.

class MySQLMCPServer(MCPServer): def __init__(self, host, port, username, password, database): super().__init__(\"MySQL MCP Server\") self.connection_config = { \'host\': host, \'port\': port, \'user\': username, \'password\': password, \'database\': database, \'charset\': \'utf8mb4\' } self.connection = None self._register_mysql_tools() def _register_mysql_tools(self): \"\"\"注册MySQL工具\"\"\" self.register_tool( \"describe_database\", \"描述数据库结构\", {}, self._describe_database ) self.register_tool( \"list_tables\", \"列出所有表\", {}, self._list_tables ) self.register_tool( \"describe_table\", \"描述表结构\", { \"table_name\": {\"type\": \"string\", \"description\": \"表名\"} }, self._describe_table ) self.register_tool( \"execute_query\", \"执行只读查询\", { \"query\": {\"type\": \"string\", \"description\": \"SQL查询语句\"}, \"limit\": {\"type\": \"integer\", \"description\": \"结果限制\", \"default\": 100} }, self._execute_query ) self.register_tool( \"get_table_sample\", \"获取表样本数据\", { \"table_name\": {\"type\": \"string\", \"description\": \"表名\"}, \"limit\": {\"type\": \"integer\", \"description\": \"样本数量\", \"default\": 10} }, self._get_table_sample ) self.register_tool( \"analyze_table\", \"分析表统计信息\", { \"table_name\": {\"type\": \"string\", \"description\": \"表名\"} }, self._analyze_table ) self.register_tool( \"search_columns\", \"搜索包含特定列名的表\", { \"column_pattern\": {\"type\": \"string\", \"description\": \"列名模式\"} }, self._search_columns ) self.register_tool( \"get_foreign_keys\", \"获取外键关系\", { \"table_name\": {\"type\": \"string\", \"description\": \"表名\", \"default\": \"\"} }, self._get_foreign_keys ) async def _get_connection(self): \"\"\"获取数据库连接\"\"\" if not self.connection: try: import aiomysql self.connection = await aiomysql.connect(**self.connection_config) except ImportError: raise Exception(\"请安装aiomysql: pip install aiomysql\") except Exception as e: raise Exception(f\"数据库连接失败: {str(e)}\") return self.connection async def _execute_sql(self, sql, params=None): 
\"\"\"执行SQL语句\"\"\" connection = await self._get_connection() cursor = await connection.cursor(aiomysql.DictCursor) try: await cursor.execute(sql, params) result = await cursor.fetchall() return result except Exception as e: raise Exception(f\"SQL执行失败: {str(e)}\") finally: await cursor.close() async def _describe_database(self, args): \"\"\"描述数据库结构\"\"\" # 获取数据库基本信息 db_info_sql = \"\"\" SELECT SCHEMA_NAME as database_name, DEFAULT_CHARACTER_SET_NAME as charset, DEFAULT_COLLATION_NAME as collation FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = %s \"\"\" db_info = await self._execute_sql(db_info_sql, (self.connection_config[\'database\'],)) # 获取表统计信息 table_stats_sql = \"\"\" SELECT COUNT(*) as total_tables, SUM(TABLE_ROWS) as total_rows, ROUND(SUM(DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024, 2) as total_size_mb FROM information_schema.TABLES WHERE TABLE_SCHEMA = %s AND TABLE_TYPE = \'BASE TABLE\' \"\"\" table_stats = await self._execute_sql(table_stats_sql, (self.connection_config[\'database\'],)) # 获取表类型分布 table_types_sql = \"\"\" SELECT ENGINE, COUNT(*) as count FROM information_schema.TABLES WHERE TABLE_SCHEMA = %s AND TABLE_TYPE = \'BASE TABLE\' GROUP BY ENGINE \"\"\" table_types = await self._execute_sql(table_types_sql, (self.connection_config[\'database\'],)) return { \"database_info\": db_info[0] if db_info else {}, \"statistics\": table_stats[0] if table_stats else {}, \"storage_engines\": table_types, \"connection_info\": { \"host\": self.connection_config[\'host\'], \"port\": self.connection_config[\'port\'], \"database\": self.connection_config[\'database\'] } } async def _list_tables(self, args): \"\"\"列出所有表\"\"\" sql = \"\"\" SELECT TABLE_NAME as table_name, TABLE_TYPE as table_type, ENGINE as engine, TABLE_ROWS as estimated_rows, ROUND((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024, 2) as size_mb, TABLE_COMMENT as comment, CREATE_TIME as created_at, UPDATE_TIME as updated_at FROM information_schema.TABLES WHERE TABLE_SCHEMA = %s ORDER BY TABLE_NAME 
\"\"\" tables = await self._execute_sql(sql, (self.connection_config[\'database\'],)) return { \"tables\": tables, \"total_count\": len(tables) } async def _describe_table(self, args): \"\"\"描述表结构\"\"\" table_name = args[\"table_name\"] # 获取列信息 columns_sql = \"\"\" SELECT COLUMN_NAME as column_name, DATA_TYPE as data_type, IS_NULLABLE as is_nullable, COLUMN_DEFAULT as default_value, COLUMN_KEY as key_type, EXTRA as extra, COLUMN_COMMENT as comment, CHARACTER_MAXIMUM_LENGTH as max_length, NUMERIC_PRECISION as precision, NUMERIC_SCALE as scale FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s ORDER BY ORDINAL_POSITION \"\"\" columns = await self._execute_sql(columns_sql, (self.connection_config[\'database\'], table_name)) # 获取索引信息 indexes_sql = \"\"\" SELECT INDEX_NAME as index_name, COLUMN_NAME as column_name, SEQ_IN_INDEX as sequence, NON_UNIQUE as non_unique, INDEX_TYPE as index_type FROM information_schema.STATISTICS WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s ORDER BY INDEX_NAME, SEQ_IN_INDEX \"\"\" indexes = await self._execute_sql(indexes_sql, (self.connection_config[\'database\'], table_name)) # 获取表信息 table_info_sql = \"\"\" SELECT TABLE_NAME as table_name, ENGINE as engine, TABLE_ROWS as estimated_rows, AVG_ROW_LENGTH as avg_row_length, ROUND((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024, 2) as size_mb, TABLE_COMMENT as comment, CREATE_TIME as created_at, UPDATE_TIME as updated_at FROM information_schema.TABLES WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s \"\"\" table_info = await self._execute_sql(table_info_sql, (self.connection_config[\'database\'], table_name)) return { \"table_info\": table_info[0] if table_info else {}, \"columns\": columns, \"indexes\": indexes, \"column_count\": len(columns), \"index_count\": len(set(idx[\'index_name\'] for idx in indexes)) } async def _execute_query(self, args): \"\"\"执行只读查询\"\"\" query = args[\"query\"].strip() limit = args.get(\"limit\", 100) # 安全检查:只允许SELECT语句 if not 
query.upper().startswith(\'SELECT\'): raise Exception(\"只允许执行SELECT查询\") # 检查是否包含危险关键词 dangerous_keywords = [\'DELETE\', \'UPDATE\', \'INSERT\', \'DROP\', \'CREATE\', \'ALTER\', \'TRUNCATE\'] query_upper = query.upper() for keyword in dangerous_keywords: if keyword in query_upper: raise Exception(f\"查询包含禁止的关键词: {keyword}\") # 添加LIMIT限制 if \'LIMIT\' not in query_upper: query += f\" LIMIT {limit}\" try: start_time = time.time() result = await self._execute_sql(query) execution_time = time.time() - start_time return { \"query\": query, \"results\": result, \"row_count\": len(result), \"execution_time_seconds\": round(execution_time, 3), \"columns\": list(result[0].keys()) if result else [] } except Exception as e: raise Exception(f\"查询执行失败: {str(e)}\") async def _get_table_sample(self, args): \"\"\"获取表样本数据\"\"\" table_name = args[\"table_name\"] limit = args.get(\"limit\", 10) # 验证表名(防止SQL注入) tables = await self._list_tables({}) table_names = [t[\'table_name\'] for t in tables[\'tables\']] if table_name not in table_names: raise Exception(f\"表 \'{table_name}\' 不存在\") sql = f\"SELECT * FROM `{table_name}` LIMIT %s\" result = await self._execute_sql(sql, (limit,)) return { \"table_name\": table_name, \"sample_data\": result, \"sample_size\": len(result), \"columns\": list(result[0].keys()) if result else [] } async def _analyze_table(self, args): \"\"\"分析表统计信息\"\"\" table_name = args[\"table_name\"] # 获取基本统计信息 basic_stats_sql = f\"\"\" SELECT COUNT(*) as total_rows, COUNT(DISTINCT *) as unique_rows FROM `{table_name}` \"\"\" basic_stats = await self._execute_sql(basic_stats_sql) # 获取数值列的统计信息 numeric_columns_sql = \"\"\" SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s AND DATA_TYPE IN (\'int\', \'bigint\', \'decimal\', \'float\', \'double\', \'tinyint\', \'smallint\', \'mediumint\') \"\"\" numeric_columns = await self._execute_sql(numeric_columns_sql, (self.connection_config[\'database\'], table_name)) column_stats = {} for 
col in numeric_columns: col_name = col[\'COLUMN_NAME\'] stats_sql = f\"\"\" SELECT  MIN(`{col_name}`) as min_value, MAX(`{col_name}`) as max_value, AVG(`{col_name}`) as avg_value, COUNT(DISTINCT `{col_name}`) as distinct_count, COUNT(`{col_name}`) as non_null_count FROM `{table_name}` \"\"\" col_stats = await self._execute_sql(stats_sql) column_stats[col_name] = col_stats[0] if col_stats else {} return { \"table_name\": table_name, \"basic_statistics\": basic_stats[0] if basic_stats else {}, \"column_statistics\": column_stats } async def _search_columns(self, args): \"\"\"搜索包含特定列名的表\"\"\" pattern = args[\"column_pattern\"] sql = \"\"\" SELECT TABLE_NAME as table_name, COLUMN_NAME as column_name, DATA_TYPE as data_type, IS_NULLABLE as is_nullable, COLUMN_DEFAULT as default_value, COLUMN_COMMENT as comment FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = %s AND COLUMN_NAME LIKE %s ORDER BY TABLE_NAME, COLUMN_NAME \"\"\" search_pattern = f\"%{pattern}%\" results = await self._execute_sql(sql, (self.connection_config[\'database\'], search_pattern)) # 按表分组 tables_with_columns = {} for row in results: table_name = row[\'table_name\'] if table_name not in tables_with_columns: tables_with_columns[table_name] = [] tables_with_columns[table_name].append({ \'column_name\': row[\'column_name\'], \'data_type\': row[\'data_type\'], \'is_nullable\': row[\'is_nullable\'], \'default_value\': row[\'default_value\'], \'comment\': row[\'comment\'] }) return { \"search_pattern\": pattern, \"matching_tables\": tables_with_columns, \"total_matches\": len(results), \"tables_found\": len(tables_with_columns) } async def _get_foreign_keys(self, args): \"\"\"获取外键关系\"\"\" table_name = args.get(\"table_name\", \"\") if table_name: # 获取特定表的外键 sql = \"\"\" SELECT  TABLE_NAME as table_name, COLUMN_NAME as column_name, CONSTRAINT_NAME as constraint_name, REFERENCED_TABLE_NAME as referenced_table, REFERENCED_COLUMN_NAME as referenced_column FROM information_schema.KEY_COLUMN_USAGE WHERE 
TABLE_SCHEMA = %s AND TABLE_NAME = %s AND REFERENCED_TABLE_NAME IS NOT NULL ORDER BY TABLE_NAME, CONSTRAINT_NAME \"\"\" params = (self.connection_config[\'database\'], table_name) else: # 获取所有外键关系 sql = \"\"\" SELECT  TABLE_NAME as table_name, COLUMN_NAME as column_name, CONSTRAINT_NAME as constraint_name, REFERENCED_TABLE_NAME as referenced_table, REFERENCED_COLUMN_NAME as referenced_column FROM information_schema.KEY_COLUMN_USAGE WHERE TABLE_SCHEMA = %s AND REFERENCED_TABLE_NAME IS NOT NULL ORDER BY TABLE_NAME, CONSTRAINT_NAME \"\"\" params = (self.connection_config[\'database\'],) foreign_keys = await self._execute_sql(sql, params) return { \"table_name\": table_name or \"all_tables\", \"foreign_keys\": foreign_keys, \"relationship_count\": len(foreign_keys) }
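The read-only check in `_execute_query` matches forbidden keywords as plain substrings, so a harmless query that selects a column named `created_at` or `updated_at` is rejected because it contains `CREATE` or `UPDATE`. The sketch below shows one possible tightening that uses word boundaries and rejects multi-statement input. `prepare_read_only_query` and `is_allowed` are hypothetical names. The check is deliberately conservative: it still rejects keywords or semicolons that appear inside string literals. A filter of this kind is only a first line of defence, and a database account limited to `SELECT` privileges remains the more reliable control.

```python
import re

FORBIDDEN = ("DELETE", "UPDATE", "INSERT", "DROP", "CREATE", "ALTER", "TRUNCATE")
_FORBIDDEN_RE = re.compile(r"\b(" + "|".join(FORBIDDEN) + r")\b", re.IGNORECASE)
_LIMIT_RE = re.compile(r"\bLIMIT\b", re.IGNORECASE)


def prepare_read_only_query(query, limit=100):
    """Validate a single SELECT statement and append a LIMIT if none is present."""
    statement = query.strip().rstrip(";").strip()
    if ";" in statement:
        raise ValueError("multiple statements are not allowed")
    if not re.match(r"SELECT\b", statement, re.IGNORECASE):
        raise ValueError("only SELECT queries are allowed")
    match = _FORBIDDEN_RE.search(statement)
    if match:
        raise ValueError(f"forbidden keyword: {match.group(1).upper()}")
    if not _LIMIT_RE.search(statement):
        statement += f" LIMIT {int(limit)}"
    return statement


def is_allowed(query):
    """Return True if the query passes the read-only check."""
    try:
        prepare_read_only_query(query)
        return True
    except ValueError:
        return False


print(prepare_read_only_query("SELECT created_at, updated_at FROM orders;"))
print(is_allowed("SELECT 1; DROP TABLE orders"))
```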
10. PostgreSQL MCP Server

GitHub: https://github.com/modelcontextprotocol/servers/tree/main/src/postgres

An MCP server implementation for PostgreSQL that provides secure access to PostgreSQL databases.

class PostgreSQLMCPServer(MCPServer): def __init__(self, host, port, username, password, database): super().__init__(\"PostgreSQL MCP Server\") self.connection_config = { \'host\': host, \'port\': port, \'user\': username, \'password\': password, \'database\': database } self.connection_pool = None self._register_postgres_tools() def _register_postgres_tools(self): \"\"\"注册PostgreSQL工具\"\"\" self.register_tool( \"describe_database\", \"描述数据库结构\", {}, self._describe_database ) self.register_tool( \"list_schemas\", \"列出所有模式\", {}, self._list_schemas ) self.register_tool( \"list_tables\", \"列出表\", { \"schema\": {\"type\": \"string\", \"description\": \"模式名\", \"default\": \"public\"} }, self._list_tables ) self.register_tool( \"describe_table\", \"描述表结构\", { \"table_name\": {\"type\": \"string\", \"description\": \"表名\"}, \"schema\": {\"type\": \"string\", \"description\": \"模式名\", \"default\": \"public\"} }, self._describe_table ) self.register_tool( \"execute_query\", \"执行只读查询\", { \"query\": {\"type\": \"string\", \"description\": \"SQL查询语句\"}, \"limit\": {\"type\": \"integer\", \"description\": \"结果限制\", \"default\": 100} }, self._execute_query ) self.register_tool( \"get_table_sample\", \"获取表样本数据\", { \"table_name\": {\"type\": \"string\", \"description\": \"表名\"}, \"schema\": {\"type\": \"string\", \"description\": \"模式名\", \"default\": \"public\"}, \"limit\": {\"type\": \"integer\", \"description\": \"样本数量\", \"default\": 10} }, self._get_table_sample ) self.register_tool( \"analyze_table\", \"分析表统计信息\", { \"table_name\": {\"type\": \"string\", \"description\": \"表名\"}, \"schema\": {\"type\": \"string\", \"description\": \"模式名\", \"default\": \"public\"} }, self._analyze_table ) self.register_tool( \"get_table_relationships\", \"获取表关系\", { \"schema\": {\"type\": \"string\", \"description\": \"模式名\", \"default\": \"public\"} }, self._get_table_relationships ) self.register_tool( \"get_database_stats\", \"获取数据库统计信息\", {}, self._get_database_stats ) 
self.register_tool( \"search_in_tables\", \"在表中搜索数据\", { \"search_term\": {\"type\": \"string\", \"description\": \"搜索词\"}, \"schema\": {\"type\": \"string\", \"description\": \"模式名\", \"default\": \"public\"}, \"table_names\": {\"type\": \"array\", \"description\": \"指定表名列表\", \"default\": []} }, self._search_in_tables ) async def _get_connection_pool(self): \"\"\"获取连接池\"\"\" if not self.connection_pool: try: import asyncpg self.connection_pool = await asyncpg.create_pool(**self.connection_config) except ImportError: raise Exception(\"请安装asyncpg: pip install asyncpg\") except Exception as e: raise Exception(f\"数据库连接失败: {str(e)}\") return self.connection_pool async def _execute_sql(self, sql, params=None): \"\"\"执行SQL语句\"\"\" pool = await self._get_connection_pool() async with pool.acquire() as connection: try: if params:  result = await connection.fetch(sql, *params) else:  result = await connection.fetch(sql) # 转换为字典列表 return [dict(row) for row in result] except Exception as e: raise Exception(f\"SQL执行失败: {str(e)}\") async def _describe_database(self, args): \"\"\"描述数据库结构\"\"\" # 获取数据库基本信息 db_info_sql = \"\"\" SELECT current_database() as database_name, current_user as current_user, version() as postgresql_version \"\"\" db_info = await self._execute_sql(db_info_sql) # 获取数据库大小 size_sql = \"\"\" SELECT pg_size_pretty(pg_database_size(current_database())) as database_size \"\"\" size_info = await self._execute_sql(size_sql) # 获取表统计信息 table_stats_sql = \"\"\" SELECT schemaname as schema_name, COUNT(*) as table_count, SUM(n_tup_ins) as total_inserts, SUM(n_tup_upd) as total_updates, SUM(n_tup_del) as total_deletes FROM pg_stat_user_tables GROUP BY schemaname ORDER BY schema_name \"\"\" table_stats = await self._execute_sql(table_stats_sql) return { \"database_info\": {**db_info[0], **size_info[0]} if db_info and size_info else {}, \"schema_statistics\": table_stats, \"connection_info\": { \"host\": self.connection_config[\'host\'], \"port\": 
self.connection_config[\'port\'], \"database\": self.connection_config[\'database\'] } } async def _list_schemas(self, args): \"\"\"列出所有模式\"\"\" sql = \"\"\" SELECT schema_name, schema_owner FROM information_schema.schemata WHERE schema_name NOT IN (\'information_schema\', \'pg_catalog\', \'pg_toast\') ORDER BY schema_name \"\"\" schemas = await self._execute_sql(sql) # 获取每个模式的表数量 for schema in schemas: count_sql = \"\"\" SELECT COUNT(*) as table_count FROM information_schema.tables WHERE table_schema = $1 AND table_type = \'BASE TABLE\' \"\"\" count_result = await self._execute_sql(count_sql, (schema[\'schema_name\'],)) schema[\'table_count\'] = count_result[0][\'table_count\'] if count_result else 0 return { \"schemas\": schemas, \"total_schemas\": len(schemas) } async def _list_tables(self, args): \"\"\"列出表\"\"\" schema = args.get(\"schema\", \"public\") sql = \"\"\" SELECT t.table_name, t.table_type, pg_size_pretty(pg_total_relation_size(c.oid)) as size, obj_description(c.oid) as comment, s.n_tup_ins as insert_count, s.n_tup_upd as update_count, s.n_tup_del as delete_count, s.n_live_tup as live_tuples, s.n_dead_tup as dead_tuples FROM information_schema.tables t LEFT JOIN pg_class c ON c.relname = t.table_name LEFT JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = t.table_schema LEFT JOIN pg_stat_user_tables s ON s.relname = t.table_name AND s.schemaname = t.table_schema WHERE t.table_schema = $1 AND t.table_type = \'BASE TABLE\' ORDER BY t.table_name \"\"\" tables = await self._execute_sql(sql, (schema,)) return { \"schema\": schema, \"tables\": tables, \"total_count\": len(tables) } async def _describe_table(self, args): \"\"\"描述表结构\"\"\" table_name = args[\"table_name\"] schema = args.get(\"schema\", \"public\") # 获取列信息 columns_sql = \"\"\" SELECT column_name, data_type, is_nullable, column_default, character_maximum_length, numeric_precision, numeric_scale, ordinal_position FROM information_schema.columns WHERE table_schema = $1 AND table_name = 
$2 ORDER BY ordinal_position \"\"\" columns = await self._execute_sql(columns_sql, (schema, table_name)) # 获取约束信息 constraints_sql = \"\"\" SELECT tc.constraint_name, tc.constraint_type, kcu.column_name, ccu.table_name AS foreign_table_name, ccu.column_name AS foreign_column_name FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema LEFT JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name AND ccu.table_schema = tc.table_schema WHERE tc.table_schema = $1 AND tc.table_name = $2 ORDER BY tc.constraint_type, tc.constraint_name \"\"\" constraints = await self._execute_sql(constraints_sql, (schema, table_name)) # 获取索引信息 indexes_sql = \"\"\" SELECT indexname as index_name, indexdef as index_definition FROM pg_indexes WHERE schemaname = $1 AND tablename = $2 ORDER BY indexname \"\"\" indexes = await self._execute_sql(indexes_sql, (schema, table_name)) # 获取表统计信息 stats_sql = \"\"\" SELECT n_live_tup as live_tuples, n_dead_tup as dead_tuples, n_tup_ins as insert_count, n_tup_upd as update_count, n_tup_del as delete_count, last_vacuum, last_autovacuum, last_analyze, last_autoanalyze FROM pg_stat_user_tables WHERE schemaname = $1 AND relname = $2 \"\"\" stats = await self._execute_sql(stats_sql, (schema, table_name)) return { \"schema\": schema, \"table_name\": table_name, \"columns\": columns, \"constraints\": constraints, \"indexes\": indexes, \"statistics\": stats[0] if stats else {}, \"column_count\": len(columns), \"constraint_count\": len(constraints), \"index_count\": len(indexes) } async def _execute_query(self, args): \"\"\"执行只读查询\"\"\" query = args[\"query\"].strip() limit = args.get(\"limit\", 100) # 安全检查:只允许SELECT语句 if not query.upper().startswith(\'SELECT\'): raise Exception(\"只允许执行SELECT查询\") # 检查是否包含危险关键词 dangerous_keywords = [\'DELETE\', \'UPDATE\', \'INSERT\', \'DROP\', \'CREATE\', \'ALTER\', 
\'TRUNCATE\'] query_upper = query.upper() for keyword in dangerous_keywords: if keyword in query_upper: raise Exception(f\"查询包含禁止的关键词: {keyword}\") # 添加LIMIT限制 if \'LIMIT\' not in query_upper: query += f\" LIMIT {limit}\" try: start_time = time.time() result = await self._execute_sql(query) execution_time = time.time() - start_time return { \"query\": query, \"results\": result, \"row_count\": len(result), \"execution_time_seconds\": round(execution_time, 3), \"columns\": list(result[0].keys()) if result else [] } except Exception as e: raise Exception(f\"查询执行失败: {str(e)}\") async def _get_table_sample(self, args): \"\"\"获取表样本数据\"\"\" table_name = args[\"table_name\"] schema = args.get(\"schema\", \"public\") limit = args.get(\"limit\", 10) # 验证表是否存在 verify_sql = \"\"\" SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2 AND table_type = \'BASE TABLE\' \"\"\" verify_result = await self._execute_sql(verify_sql, (schema, table_name)) if not verify_result or verify_result[0][\'count\'] == 0: raise Exception(f\"表 \'{schema}.{table_name}\' 不存在\") sql = f\'SELECT * FROM \"{schema}\".\"{table_name}\" LIMIT $1\' result = await self._execute_sql(sql, (limit,)) return { \"schema\": schema, \"table_name\": table_name, \"sample_data\": result, \"sample_size\": len(result), \"columns\": list(result[0].keys()) if result else [] } async def _analyze_table(self, args): \"\"\"分析表统计信息\"\"\" table_name = args[\"table_name\"] schema = args.get(\"schema\", \"public\") # 获取表的基本统计信息 basic_stats_sql = f\"\"\" SELECT COUNT(*) as total_rows, pg_size_pretty(pg_total_relation_size(\'\"{schema}\".\"{table_name}\"\')) as total_size, pg_size_pretty(pg_relation_size(\'\"{schema}\".\"{table_name}\"\')) as table_size, pg_size_pretty(pg_total_relation_size(\'\"{schema}\".\"{table_name}\"\') - pg_relation_size(\'\"{schema}\".\"{table_name}\"\')) as index_size FROM \"{schema}\".\"{table_name}\" \"\"\" basic_stats = await 
self._execute_sql(basic_stats_sql) # 获取数值列的统计信息 numeric_columns_sql = \"\"\" SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 AND data_type IN (\'integer\', \'bigint\', \'decimal\', \'numeric\', \'real\', \'double precision\', \'smallint\') \"\"\" numeric_columns = await self._execute_sql(numeric_columns_sql, (schema, table_name)) column_stats = {} for col in numeric_columns: col_name = col[\'column_name\'] stats_sql = f\"\"\" SELECT  MIN(\"{col_name}\") as min_value, MAX(\"{col_name}\") as max_value, AVG(\"{col_name}\") as avg_value, COUNT(DISTINCT \"{col_name}\") as distinct_count, COUNT(\"{col_name}\") as non_null_count, COUNT(*) - COUNT(\"{col_name}\") as null_count FROM \"{schema}\".\"{table_name}\" \"\"\" col_stats = await self._execute_sql(stats_sql) column_stats[col_name] = col_stats[0] if col_stats else {} # 获取文本列的统计信息 text_columns_sql = \"\"\" SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 AND data_type IN (\'character varying\', \'varchar\', \'text\', \'char\', \'character\') \"\"\" text_columns = await self._execute_sql(text_columns_sql, (schema, table_name)) text_stats = {} for col in text_columns: col_name = col[\'column_name\'] stats_sql = f\"\"\" SELECT  COUNT(DISTINCT \"{col_name}\") as distinct_count, COUNT(\"{col_name}\") as non_null_count, COUNT(*) - COUNT(\"{col_name}\") as null_count, AVG(LENGTH(\"{col_name}\")) as avg_length, MAX(LENGTH(\"{col_name}\")) as max_length, MIN(LENGTH(\"{col_name}\")) as min_length FROM \"{schema}\".\"{table_name}\" WHERE \"{col_name}\" IS NOT NULL \"\"\" col_stats = await self._execute_sql(stats_sql) text_stats[col_name] = col_stats[0] if col_stats else {} return { \"schema\": schema, \"table_name\": table_name, \"basic_statistics\": basic_stats[0] if basic_stats else {}, \"numeric_column_statistics\": column_stats, \"text_column_statistics\": text_stats } async def _get_table_relationships(self, 
args):
        """Return the foreign-key relationships of a schema."""
        schema = args.get("schema", "public")
        sql = """
            SELECT
                tc.table_name,
                kcu.column_name,
                ccu.table_name AS foreign_table_name,
                ccu.column_name AS foreign_column_name,
                tc.constraint_name
            FROM information_schema.table_constraints AS tc
            JOIN information_schema.key_column_usage AS kcu
                ON tc.constraint_name = kcu.constraint_name
                AND tc.table_schema = kcu.table_schema
            JOIN information_schema.constraint_column_usage AS ccu
                ON ccu.constraint_name = tc.constraint_name
                AND ccu.table_schema = tc.table_schema
            WHERE tc.constraint_type = 'FOREIGN KEY'
                AND tc.table_schema = $1
            ORDER BY tc.table_name, tc.constraint_name
        """
        relationships = await self._execute_sql(sql, (schema,))

        # Build the relationship graph
        relationship_map = {}
        for rel in relationships:
            table = rel['table_name']
            foreign_table = rel['foreign_table_name']
            for name in (table, foreign_table):
                relationship_map.setdefault(
                    name, {'outgoing_relations': [], 'incoming_relations': []}
                )

            relationship_map[table]['outgoing_relations'].append({
                'local_column': rel['column_name'],
                'foreign_table': foreign_table,
                'foreign_column': rel['foreign_column_name'],
                'constraint_name': rel['constraint_name']
            })
            # Record the reverse relation on the referenced table
            relationship_map[foreign_table]['incoming_relations'].append({
                'foreign_table': table,
                'foreign_column': rel['column_name'],
                'local_column': rel['foreign_column_name'],
                'constraint_name': rel['constraint_name']
            })

        return {
            "schema": schema,
            "relationships": relationships,
            "relationship_map": relationship_map,
            "total_foreign_keys": len(relationships)
        }

    async def _get_database_stats(self, args):
        """Return connection, size, and activity statistics for the database."""
        # Connection statistics
        connection_stats_sql = """
            SELECT
                COUNT(*) AS total_connections,
                COUNT(*) FILTER (WHERE state = 'active') AS active_connections,
                COUNT(*) FILTER (WHERE state = 'idle') AS idle_connections
            FROM pg_stat_activity
            WHERE datname = current_database()
        """
        connection_stats = await self._execute_sql(connection_stats_sql)

        # Database size and object counts
        size_stats_sql = """
            SELECT
                pg_size_pretty(pg_database_size(current_database())) AS database_size,
                (SELECT COUNT(*) FROM information_schema.tables
                 WHERE table_schema NOT IN ('information_schema', 'pg_catalog')) AS total_tables,
                (SELECT COUNT(*) FROM information_schema.views
                 WHERE table_schema NOT IN ('information_schema', 'pg_catalog')) AS total_views
        """
        size_stats = await self._execute_sql(size_stats_sql)

        # The ten most active tables
        active_tables_sql = """
            SELECT
                schemaname,
                relname AS table_name,
                seq_scan,
                seq_tup_read,
                idx_scan,
                idx_tup_fetch,
                n_tup_ins AS inserts,
                n_tup_upd AS updates,
                n_tup_del AS deletes
            FROM pg_stat_user_tables
            ORDER BY (seq_tup_read + idx_tup_fetch + n_tup_ins + n_tup_upd + n_tup_del) DESC
            LIMIT 10
        """
        active_tables = await self._execute_sql(active_tables_sql)

        return {
            "connection_statistics": connection_stats[0] if connection_stats else {},
            "size_statistics": size_stats[0] if size_stats else {},
            "most_active_tables": active_tables
        }

    async def _search_in_tables(self, args):
        """Search the text columns of one or more tables for a term."""
        search_term = args["search_term"]
        schema = args.get("schema", "public")
        table_names = args.get("table_names", [])

        if not table_names:
            # Default to every base table in the schema
            tables_sql = """
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = $1 AND table_type = 'BASE TABLE'
            """
            tables_result = await self._execute_sql(tables_sql, (schema,))
            table_names = [t['table_name'] for t in tables_result]

        def quote_ident(name):
            # Double any embedded quotes so a caller-supplied name cannot
            # break out of the quoted identifier
            return '"' + name.replace('"', '""') + '"'

        search_results = {}
        for table_name in table_names:
            # Find the text columns of this table
            columns_sql = """
                SELECT column_name
                FROM information_schema.columns
                WHERE table_schema = $1 AND table_name = $2
                    AND data_type IN ('character varying', 'varchar', 'text', 'char', 'character')
            """
            columns = await self._execute_sql(columns_sql, (schema, table_name))
            if not columns:
                continue

            # Build the search query. The pattern is the only bound parameter,
            # so every condition references $1 and exactly one value is passed.
            where_conditions = [
                f'{quote_ident(col["column_name"])} ILIKE $1' for col in columns
            ]
            search_sql = f"""
                SELECT * FROM {quote_ident(schema)}.{quote_ident(table_name)}
                WHERE {' OR '.join(where_conditions)}
                LIMIT 20
            """
            try:
                results = await self._execute_sql(search_sql, (f'%{search_term}%',))
                if results:
                    search_results[table_name] = {
                        'matches': results,
                        'match_count': len(results),
                        'searched_columns': [col['column_name'] for col in columns]
                    }
            except Exception as e:
                search_results[table_name] = {
                    'error': str(e),
                    'matches': [],
                    'match_count': 0
                }

        return {
            "search_term": search_term,
            "schema": schema,
            "search_results": search_results,
            "tables_searched": len(table_names),
            "tables_with_matches": len(
                [t for t in search_results.values() if t.get('match_count', 0) > 0]
            )
        }
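To make the shape of `relationship_map` concrete, here is a minimal sketch of the same grouping logic that runs without a database. The `build_relationship_map` helper name and the sample row are assumptions made for illustration; only the row keys mirror the foreign-key query above.

```python
def build_relationship_map(relationships):
    """Group foreign-key rows by table into outgoing and incoming relations."""
    relationship_map = {}
    for rel in relationships:
        table = rel["table_name"]
        foreign_table = rel["foreign_table_name"]
        for name in (table, foreign_table):
            relationship_map.setdefault(
                name, {"outgoing_relations": [], "incoming_relations": []}
            )
        # The referencing table records an outgoing relation...
        relationship_map[table]["outgoing_relations"].append({
            "local_column": rel["column_name"],
            "foreign_table": foreign_table,
            "foreign_column": rel["foreign_column_name"],
            "constraint_name": rel["constraint_name"],
        })
        # ...and the referenced table records the matching incoming relation.
        relationship_map[foreign_table]["incoming_relations"].append({
            "foreign_table": table,
            "foreign_column": rel["column_name"],
            "local_column": rel["foreign_column_name"],
            "constraint_name": rel["constraint_name"],
        })
    return relationship_map


# Hypothetical sample row: orders.customer_id references customers.id
sample_rows = [{
    "table_name": "orders",
    "column_name": "customer_id",
    "foreign_table_name": "customers",
    "foreign_column_name": "id",
    "constraint_name": "orders_customer_id_fkey",
}]
relationship_map = build_relationship_map(sample_rows)
print(relationship_map["customers"]["incoming_relations"][0]["foreign_table"])  # prints "orders"
```

Each foreign key therefore appears twice: once under the table that owns the column and once under the table it points to, which lets a client walk the schema in either direction.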

Chart Generation

11. Chart MCP Server

GitHub: https://github.com/modelcontextprotocol/servers/tree/main/src/chart

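A basic chart-generation MCP server. The code below registers six tools: bar chart, line chart, pie chart, scatter plot, histogram, and heatmap. Each tool returns the rendered image as a base64-encoded PNG.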

class ChartMCPServer(MCPServer):
    def __init__(self):
        super().__init__("Chart MCP Server")
        self._register_chart_tools()

    def _register_chart_tools(self):
        """Register the chart tools."""
        # Parameter groups shared by several tools
        title = {"title": {"type": "string", "description": "Chart title", "default": ""}}
        x_label = {"x_label": {"type": "string", "description": "X-axis label", "default": ""}}
        y_label = {"y_label": {"type": "string", "description": "Y-axis label", "default": ""}}
        size = {
            "width": {"type": "integer", "description": "Chart width", "default": 800},
            "height": {"type": "integer", "description": "Chart height", "default": 600},
        }
        data = {"data": {"type": "object", "description": "Chart data"}}
        xy_chart = {**data, **title, **x_label, **y_label, **size}

        self.register_tool("create_bar_chart", "Create a bar chart",
                           dict(xy_chart), self._create_bar_chart)
        self.register_tool("create_line_chart", "Create a line chart",
                           dict(xy_chart), self._create_line_chart)
        self.register_tool("create_pie_chart", "Create a pie chart",
                           {**data, **title, **size}, self._create_pie_chart)
        self.register_tool("create_scatter_plot", "Create a scatter plot",
                           dict(xy_chart), self._create_scatter_plot)
        self.register_tool(
            "create_histogram",
            "Create a histogram",
            {
                "data": {"type": "array", "description": "Array of values"},
                "bins": {"type": "integer", "description": "Number of bins", "default": 20},
                **title,
                **x_label,
                "y_label": {"type": "string", "description": "Y-axis label", "default": "Frequency"},
                **size,
            },
            self._create_histogram
        )
        self.register_tool(
            "create_heatmap",
            "Create a heatmap",
            {
                "data": {"type": "array", "description": "Two-dimensional array of values"},
                "x_labels": {"type": "array", "description": "X-axis labels", "default": []},
                "y_labels": {"type": "array", "description": "Y-axis labels", "default": []},
                **title,
                "colormap": {"type": "string", "description": "Colormap", "default": "viridis"},
                **size,
            },
            self._create_heatmap
        )

    def _new_figure(self, args):
        """Create a figure sized from the width/height arguments (in pixels)."""
        try:
            import matplotlib
            matplotlib.use("Agg")  # headless backend: render without a display
            import matplotlib.pyplot as plt
        except ImportError:
            raise Exception("Please install matplotlib: pip install matplotlib")
        width = args.get("width", 800)
        height = args.get("height", 600)
        # figsize is in inches; the figure is saved at dpi=100 below
        fig, ax = plt.subplots(figsize=(width / 100, height / 100))
        return plt, fig, ax

    def _apply_labels(self, ax, args, default_y_label=""):
        """Set the title and axis labels that were supplied."""
        title = args.get("title", "")
        x_label = args.get("x_label", "")
        y_label = args.get("y_label", default_y_label)
        if title:
            ax.set_title(title, fontsize=16, fontweight='bold')
        if x_label:
            ax.set_xlabel(x_label, fontsize=12)
        if y_label:
            ax.set_ylabel(y_label, fontsize=12)

    def _encode_png(self, plt, fig):
        """Render the figure to PNG, close it, and return the base64 string."""
        import base64
        from io import BytesIO

        fig.tight_layout()
        buffer = BytesIO()
        fig.savefig(buffer, format='png', dpi=100, bbox_inches='tight')
        plt.close(fig)  # close this figure explicitly so figures do not accumulate
        return base64.b64encode(buffer.getvalue()).decode()

    async def _create_bar_chart(self, args):
        """Create a bar chart."""
        data = args["data"]
        if isinstance(data, dict):
            # Mapping form: {label: value}
            labels = list(data.keys())
            values = list(data.values())
        elif isinstance(data, list) and all(isinstance(item, dict) for item in data):
            # List form: [{"x": label, "y": value}]
            labels = [item.get('x', str(i)) for i, item in enumerate(data)]
            values = [item.get('y', 0) for item in data]
        else:
            raise Exception("Invalid data: expected a dict or a list of dicts with x and y keys")

        plt, fig, ax = self._new_figure(args)
        bars = ax.bar(labels, values)
        self._apply_labels(ax, args)

        # Print each value above its bar
        for bar, value in zip(bars, values):
            ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height(),
                    f'{value}', ha='center', va='bottom')

        # Rotate the x tick labels so they do not overlap
        plt.xticks(rotation=45, ha='right')

        return {
            "chart_type": "bar_chart",
            "image_base64": self._encode_png(plt, fig),
            "image_format": "png",
            "data_points": len(values),
            "title": args.get("title", "")
        }

    async def _create_line_chart(self, args):
        """Create a line chart."""
        data = args["data"]
        if not isinstance(data, (dict, list)):
            raise Exception("Invalid data: expected a dict or a list")

        plt, fig, ax = self._new_figure(args)
        if isinstance(data, dict):
            if 'x' in data and 'y' in data:
                # Single line: {"x": [x_values], "y": [y_values]}
                ax.plot(data['x'], data['y'], marker='o', linewidth=2)
            else:
                # Mapping form: {x_value: y_value}
                ax.plot(list(data.keys()), list(data.values()), marker='o', linewidth=2)
        elif all(isinstance(item, dict) for item in data):
            # Several lines: [{"name": "line1", "x": [x_values], "y": [y_values]}]
            for line_data in data:
                name = line_data.get('name', f'Line {len(ax.lines) + 1}')
                y_vals = line_data.get('y', [])
                x_vals = line_data.get('x', range(len(y_vals)))
                ax.plot(x_vals, y_vals, marker='o', linewidth=2, label=name)
            if len(data) > 1:
                ax.legend()
        else:
            # Plain list: [y_values]
            ax.plot(range(len(data)), data, marker='o', linewidth=2)

        self._apply_labels(ax, args)
        ax.grid(True, alpha=0.3)

        return {
            "chart_type": "line_chart",
            "image_base64": self._encode_png(plt, fig),
            "image_format": "png",
            "title": args.get("title", "")
        }

    async def _create_pie_chart(self, args):
        """Create a pie chart."""
        data = args["data"]
        title = args.get("title", "")
        if isinstance(data, dict):
            labels = list(data.keys())
            values = list(data.values())
        elif isinstance(data, list) and all(isinstance(item, dict) for item in data):
            labels = [item.get('label', f'Item {i}') for i, item in enumerate(data)]
            values = [item.get('value', 0) for item in data]
        else:
            raise Exception("Invalid data: expected a dict or a list of dicts with label and value keys")

        plt, fig, ax = self._new_figure(args)
        wedges, texts, autotexts = ax.pie(values, labels=labels, autopct='%1.1f%%',
                                          startangle=90, explode=[0.05] * len(values))
        if title:
            ax.set_title(title, fontsize=16, fontweight='bold')

        # Style the percentage labels
        for autotext in autotexts:
            autotext.set_color('white')
            autotext.set_fontweight('bold')

        return {
            "chart_type": "pie_chart",
            "image_base64": self._encode_png(plt, fig),
            "image_format": "png",
            "data_points": len(values),
            "title": title
        }

    async def _create_scatter_plot(self, args):
        """Create a scatter plot."""
        data = args["data"]
        if isinstance(data, dict) and 'x' in data and 'y' in data:
            x_values = data['x']
            y_values = data['y']
            sizes = data.get('size', [50] * len(x_values))
            colors = data.get('color', ['blue'] * len(x_values))
        elif isinstance(data, list) and all(isinstance(item, dict) for item in data):
            x_values = [item.get('x', 0) for item in data]
            y_values = [item.get('y', 0) for item in data]
            sizes = [item.get('size', 50) for item in data]
            colors = [item.get('color', 'blue') for item in data]
        else:
            raise Exception("Invalid data: expected x and y arrays or a list of dicts with x and y keys")

        plt, fig, ax = self._new_figure(args)
        ax.scatter(x_values, y_values, s=sizes, c=colors, alpha=0.7,
                   edgecolors='black', linewidth=0.5)
        self._apply_labels(ax, args)
        ax.grid(True, alpha=0.3)

        return {
            "chart_type": "scatter_plot",
            "image_base64": self._encode_png(plt, fig),
            "image_format": "png",
            "data_points": len(x_values),
            "title": args.get("title", "")
        }

    async def _create_histogram(self, args):
        """Create a histogram with a marker at the mean."""
        data = args["data"]
        bins = args.get("bins", 20)
        if not isinstance(data, list):
            raise Exception("Invalid data: expected a list of numbers")

        plt, fig, ax = self._new_figure(args)
        import numpy as np  # installed as a matplotlib dependency

        ax.hist(data, bins=bins, alpha=0.7, color='skyblue', edgecolor='black')
        self._apply_labels(ax, args, default_y_label="Frequency")

        # Add summary statistics
        mean_val = np.mean(data)
        std_val = np.std(data)
        ax.axvline(mean_val, color='red', linestyle='--', linewidth=2,
                   label=f'Mean: {mean_val:.2f}')
        ax.legend()
        ax.grid(True, alpha=0.3)

        return {
            "chart_type": "histogram",
            "image_base64": self._encode_png(plt, fig),
            "image_format": "png",
            "data_points": len(data),
            "bins": bins,
            "mean": float(mean_val),
            "std": float(std_val),
            "title": args.get("title", "")
        }

    async def _create_heatmap(self, args):
        """Create a heatmap with the value printed in each cell."""
        data = args["data"]
        x_labels = args.get("x_labels", [])
        y_labels = args.get("y_labels", [])
        title = args.get("title", "")
        colormap = args.get("colormap", "viridis")
        if not isinstance(data, list) or not all(isinstance(row, list) for row in data):
            raise Exception("Invalid data: expected a two-dimensional array")

        plt, fig, ax = self._new_figure(args)
        import numpy as np  # installed as a matplotlib dependency

        data_array = np.array(data)
        im = ax.imshow(data_array, cmap=colormap, aspect='auto')

        if x_labels:
            ax.set_xticks(range(len(x_labels)))
            ax.set_xticklabels(x_labels, rotation=45, ha='right')
        if y_labels:
            ax.set_yticks(range(len(y_labels)))
            ax.set_yticklabels(y_labels)
        if title:
            ax.set_title(title, fontsize=16, fontweight='bold')

        # Add a colour bar
        cbar = fig.colorbar(im, ax=ax)
        cbar.set_label('Value', rotation=270, labelpad=15)

        # Annotate each cell; below-mean cells get white text
        mean_val = np.mean(data_array)
        for i in range(data_array.shape[0]):
            for j in range(data_array.shape[1]):
                ax.text(j, i, f'{data_array[i, j]:.2f}', ha="center", va="center",
                        color="white" if data_array[i, j] < mean_val else "black")

        return {
            "chart_type": "heatmap",
            "image_base64": self._encode_png(plt, fig),
            "image_format": "png",
            "data_shape": list(data_array.shape),  # a list serializes cleanly to JSON
            "colormap": colormap,
            "title": title
        }

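Every chart tool above returns its image in an `image_base64` field. As a minimal sketch of the consuming side, the snippet below decodes such a result and checks that the payload starts with the PNG file signature. The `decode_chart_image` helper and the stand-in result are assumptions for illustration; a real result would come from a tool call such as `create_bar_chart`.

```python
import base64

# Every PNG file begins with this fixed 8-byte signature
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"


def decode_chart_image(result):
    """Decode the base64 payload of a chart result and check it looks like a PNG."""
    if result.get("image_format") != "png":
        raise ValueError("expected a PNG result")
    image_bytes = base64.b64decode(result["image_base64"])
    if not image_bytes.startswith(PNG_SIGNATURE):
        raise ValueError("payload does not start with the PNG signature")
    return image_bytes


# Hypothetical result: the signature plus filler stands in for a rendered chart
fake_result = {
    "chart_type": "bar_chart",
    "image_format": "png",
    "image_base64": base64.b64encode(PNG_SIGNATURE + b"demo").decode(),
}
image_bytes = decode_chart_image(fake_result)
print(len(image_bytes))  # prints 12
```

The decoded bytes can then be written to a file opened in binary mode or passed on to an image viewer.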
12. QuickChart MCP Server

GitHub: https://github.com/GongRzhe/Quickchart-MCP-Server

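According to docs.cloudbase.net, this is a chart-generation MCP server built on the QuickChart.io service. It supports many chart types, including bar, line, pie, doughnut, radar, polar area, scatter, bubble, radial gauge, and speedometer charts.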

import base64
import json
from urllib.parse import urlencode

import aiohttp


class QuickChartMCPServer(MCPServer):
    # Default colours as RGB triples. Dataset-based charts cycle through the
    # first six; pie and doughnut charts use all ten.
    PALETTE = [
        (255, 99, 132), (54, 162, 235), (255, 205, 86), (75, 192, 192), (153, 102, 255),
        (255, 159, 64), (201, 203, 207), (255, 99, 255), (99, 255, 132), (132, 99, 255),
    ]

    def __init__(self, quickchart_url="https://quickchart.io"):
        super().__init__("QuickChart MCP Server")
        self.quickchart_url = quickchart_url
        self._register_quickchart_tools()

    def _register_quickchart_tools(self):
        """Register the QuickChart tools."""
        # Parameter groups shared by several tools
        title = {"title": {"type": "string", "description": "Chart title", "default": ""}}
        size = {
            "width": {"type": "integer", "description": "Chart width", "default": 500},
            "height": {"type": "integer", "description": "Chart height", "default": 300},
        }
        labels = {"labels": {"type": "array", "description": "Array of labels"}}
        datasets = {"datasets": {"type": "array", "description": "Array of datasets"}}
        axis_labels = {
            "x_axis_label": {"type": "string", "description": "X-axis label", "default": ""},
            "y_axis_label": {"type": "string", "description": "Y-axis label", "default": ""},
        }
        values = {"data": {"type": "array", "description": "Array of values"}}
        colors = {"colors": {"type": "array", "description": "Array of colors", "default": []}}
        cartesian = {**labels, **datasets, **title, **axis_labels, **size}
        circular = {**labels, **values, **title, **colors, **size}

        self.register_tool(
            "create_chart",
            "Create a chart",
            {
                "type": {"type": "string", "description": "Chart type (bar, line, pie, doughnut, radar, polarArea, scatter, bubble, radialGauge, speedometer)"},
                "data": {"type": "object", "description": "Chart data"},
                "options": {"type": "object", "description": "Chart options", "default": {}},
                **size,
                "background_color": {"type": "string", "description": "Background color", "default": "white"},
                "device_pixel_ratio": {"type": "number", "description": "Device pixel ratio", "default": 1.0},
            },
            self._create_chart
        )
        self.register_tool("create_bar_chart", "Create a bar chart",
                           dict(cartesian), self._create_bar_chart)
        self.register_tool("create_line_chart", "Create a line chart",
                           dict(cartesian), self._create_line_chart)
        self.register_tool("create_pie_chart", "Create a pie chart",
                           dict(circular), self._create_pie_chart)
        self.register_tool("create_doughnut_chart", "Create a doughnut chart",
                           dict(circular), self._create_doughnut_chart)
        self.register_tool("create_radar_chart", "Create a radar chart",
                           {**labels, **datasets, **title, **size}, self._create_radar_chart)
        self.register_tool(
            "create_speedometer",
            "Create a speedometer",
            {
                "value": {"type": "number", "description": "Current value"},
                "min": {"type": "number", "description": "Minimum value", "default": 0},
                "max": {"type": "number", "description": "Maximum value", "default": 100},
                **title,
                "unit": {"type": "string", "description": "Unit", "default": ""},
                **size,
            },
            self._create_speedometer
        )
        self.register_tool(
            "get_chart_url",
            "Get a chart URL",
            {"chart_config": {"type": "object", "description": "Chart configuration"}, **size},
            self._get_chart_url
        )

    @classmethod
    def _color(cls, index, alpha=None, palette_size=6):
        """Return a CSS color from the palette, as rgb(...) or rgba(...)."""
        r, g, b = cls.PALETTE[index % palette_size]
        return f"rgb({r}, {g}, {b})" if alpha is None else f"rgba({r}, {g}, {b}, {alpha})"

    @staticmethod
    def _normalize_datasets(datasets, make_defaults):
        """Merge each dataset with its defaults; a bare list becomes the data."""
        normalized = []
        for i, dataset in enumerate(datasets):
            overrides = dataset if isinstance(dataset, dict) else {"data": dataset}
            defaults = make_defaults(i)
            # Only the keys that have a default are forwarded to Chart.js
            normalized.append({key: overrides.get(key, value) for key, value in defaults.items()})
        return normalized

    # The option helpers below use the Chart.js v3+ layout (plugins.title,
    # scales.x). QuickChart picks the Chart.js version per request, so check
    # which version your instance renders with.
    @staticmethod
    def _plugin_options(title, show_legend, legend_position=None):
        legend = {"display": show_legend}
        if legend_position:
            legend["position"] = legend_position
        return {"title": {"display": bool(title), "text": title}, "legend": legend}

    @staticmethod
    def _axis(label):
        return {"display": True, "title": {"display": bool(label), "text": label}}

    async def _fetch_chart(self, chart_config, width, height, **extra_params):
        """Request a rendered chart and return (base64 PNG, request URL)."""
        params = {"c": json.dumps(chart_config), "w": width, "h": height, **extra_params}
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.quickchart_url}/chart", params=params) as response:
                if response.status != 200:
                    raise Exception(f"Chart generation failed: {await response.text()}")
                image_data = await response.read()
                return base64.b64encode(image_data).decode(), str(response.url)

    async def _create_chart(self, args):
        """Render an arbitrary Chart.js configuration."""
        chart_type = args["type"]
        width = args.get("width", 500)
        height = args.get("height", 300)
        chart_config = {
            "type": chart_type,
            "data": args["data"],
            "options": args.get("options", {})
        }
        image_base64, chart_url = await self._fetch_chart(
            chart_config, width, height,
            bkg=args.get("background_color", "white"),
            devicePixelRatio=args.get("device_pixel_ratio", 1.0),
        )
        return {
            "chart_type": chart_type,
            "image_base64": image_base64,
            "image_format": "png",
            "chart_url": chart_url,
            "width": width,
            "height": height
        }

    async def _create_cartesian_chart(self, chart_type, args, make_defaults):
        """Shared implementation for bar and line charts."""
        datasets = args["datasets"]
        title = args.get("title", "")
        options = {
            "responsive": True,
            "plugins": self._plugin_options(title, show_legend=len(datasets) > 1),
            "scales": {
                "x": self._axis(args.get("x_axis_label", "")),
                "y": self._axis(args.get("y_axis_label", ""))
            }
        }
        return await self._create_chart({
            "type": chart_type,
            "data": {
                "labels": args["labels"],
                "datasets": self._normalize_datasets(datasets, make_defaults)
            },
            "options": options,
            "width": args.get("width", 500),
            "height": args.get("height", 300)
        })

    async def _create_bar_chart(self, args):
        """Create a bar chart."""
        def defaults(i):
            return {
                "label": f"Dataset {i+1}",
                "data": [],
                "backgroundColor": self._color(i, 0.8),
                "borderColor": self._color(i, 1),
                "borderWidth": 1
            }
        return await self._create_cartesian_chart("bar", args, defaults)

    async def _create_line_chart(self, args):
        """Create a line chart."""
        def defaults(i):
            return {
                "label": f"Dataset {i+1}",
                "data": [],
                "borderColor": self._color(i),
                # A translucent fill; appending a hex suffix to rgb(...) is not a valid color
                "backgroundColor": self._color(i, 0.125),
                "borderWidth": 2,
                "fill": False,
                "tension": 0.1
            }
        return await self._create_cartesian_chart("line", args, defaults)

    async def _create_circular_chart(self, chart_type, args):
        """Shared implementation for pie and doughnut charts."""
        data = args["data"]
        title = args.get("title", "")
        colors = args.get("colors", [])
        if colors:
            # Cycle through the supplied colors if there are fewer than data points
            background = [colors[i % len(colors)] for i in range(len(data))]
        else:
            background = [self._color(i, palette_size=len(self.PALETTE)) for i in range(len(data))]

        chart_data = {
            "labels": args["labels"],
            "datasets": [{
                "data": data,
                "backgroundColor": background,
                "borderColor": ['white'] * len(data),
                "borderWidth": 2
            }]
        }
        options = {
            "responsive": True,
            "plugins": self._plugin_options(title, show_legend=True, legend_position="bottom")
        }
        return await self._create_chart({
            "type": chart_type,
            "data": chart_data,
            "options": options,
            "width": args.get("width", 500),
            "height": args.get("height", 300)
        })

    async def _create_pie_chart(self, args):
        """Create a pie chart."""
        return await self._create_circular_chart("pie", args)

    async def _create_doughnut_chart(self, args):
        """Create a doughnut chart (same data as a pie chart, different type)."""
        return await self._create_circular_chart("doughnut", args)

    async def _create_radar_chart(self, args):
        """Create a radar chart."""
        datasets = args["datasets"]
        title = args.get("title", "")

        def defaults(i):
            return {
                "label": f"Dataset {i+1}",
                "data": [],
                "backgroundColor": self._color(i, 0.6),
                "borderColor": self._color(i, 1),
                "borderWidth": 2,
                "pointRadius": 3,
                "pointHoverRadius": 5
            }

        options = {
            "responsive": True,
            "plugins": self._plugin_options(title, show_legend=len(datasets) > 1),
            "scales": {"r": {"beginAtZero": True, "grid": {"circular": True}}}
        }
        return await self._create_chart({
            "type": "radar",
            "data": {
                "labels": args["labels"],
                "datasets": self._normalize_datasets(datasets, defaults)
            },
            "options": options,
            "width": args.get("width", 500),
            "height": args.get("height", 300)
        })

    async def _create_speedometer(self, args):
        """Create a speedometer."""
        value = args["value"]
        min_val = args.get("min", 0)
        max_val = args.get("max", 100)
        title = args.get("title", "")
        unit = args.get("unit", "")
        width = args.get("width", 500)
        height = args.get("height", 300)
        if max_val == min_val:
            raise Exception("max must differ from min")

        # Position of the value within the range, as a percentage
        percentage = (value - min_val) / (max_val - min_val) * 100

        # Speedometer configuration (uses the radialGauge plugin)
        chart_config = {
            "type": "radialGauge",
            "data": {
                "datasets": [{
                    "data": [value],
                    "backgroundColor": self._get_speedometer_color(percentage),
                    "borderWidth": 0
                }]
            },
            "options": {
                "responsive": True,
                "plugins": {"title": {"display": bool(title), "text": title}},
                "trackColor": '#e0e0e0',
                "centerPercentage": 80,
                "centerArea": {
                    "text": f"{value}{unit}",
                    "fontStyle": "Arial",
                    "fontSize": 20,
                    "fontColor": "#000"
                },
                "domain": [min_val, max_val]
            }
        }
        image_base64, chart_url = await self._fetch_chart(chart_config, width, height)
        return {
            "chart_type": "speedometer",
            "image_base64": image_base64,
            "image_format": "png",
            "chart_url": chart_url,
            "width": width,
            "height": height,
            "value": value,
            "min": min_val,
            "max": max_val,
            "percentage": round(percentage, 2)
        }

    def _get_speedometer_color(self, percentage):
        """Pick the gauge color from the percentage."""
        if percentage < 30:
            return '#4CAF50'  # green
        elif percentage < 70:
            return '#FF9800'  # orange
        else:
            return '#F44336'  # red

    async def _get_chart_url(self, args):
        """Build a chart URL without fetching the image."""
        chart_config = args["chart_config"]
        width = args.get("width", 500)
        height = args.get("height", 300)

        # urlencode percent-escapes the JSON so the query string stays valid
        params = {"c": json.dumps(chart_config), "w": width, "h": height}
        full_url = f"{self.quickchart_url}/chart?{urlencode(params)}"

        return {
            "chart_url": full_url,
            "width": width,
            "height": height,
            "chart_config": chart_config
        }
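The chart URL is a base address plus a query string that carries the Chart.js configuration as JSON. A minimal sketch of that encoding follows, assuming the `/chart` endpoint and the `c`, `w`, and `h` query parameters used in the code above; the `build_chart_url` helper and the sample configuration are hypothetical. The JSON contains spaces, quotes, and an ampersand, so it has to be percent-encoded, which `urllib.parse.urlencode` does.

```python
import json
from urllib.parse import parse_qs, urlencode, urlparse


def build_chart_url(chart_config, width=500, height=300, base_url="https://quickchart.io"):
    """Return a chart URL with the configuration percent-encoded in the query string."""
    params = {"c": json.dumps(chart_config), "w": width, "h": height}
    return f"{base_url}/chart?{urlencode(params)}"


# Hypothetical configuration; the "&" in the label would corrupt an unencoded URL
config = {
    "type": "bar",
    "data": {
        "labels": ["Q1", "Q2"],
        "datasets": [{"label": "Sales & Returns", "data": [10, 20]}],
    },
}
url = build_chart_url(config)

# Round trip: parsing the query string recovers the original configuration
query = parse_qs(urlparse(url).query)
decoded = json.loads(query["c"][0])
print(decoded == config)  # prints True
```

Joining `key=value` pairs by hand would leave the ampersand inside the label unescaped, and a server would then see it as a parameter separator.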

Communication and Collaboration

13. Gmail MCP

GitHub: https://github.com/GongRzhe/Gmail-MCP-Server

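Gmail integration with automatic authentication support. The code below caches the OAuth credentials in a local token file and refreshes them when they expire.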

import base64
import os
import pickle
from email.mime.text import MIMEText


class GmailMCPServer(MCPServer):
    def __init__(self, credentials_file):
        super().__init__("Gmail MCP Server")
        self.credentials_file = credentials_file
        self.service = None
        self._register_gmail_tools()

    def _register_gmail_tools(self):
        """Register the Gmail tools."""
        self.register_tool(
            "send_email",
            "Send an email",
            {
                "to": {"type": "string", "description": "Recipient address"},
                "subject": {"type": "string", "description": "Subject line"},
                "body": {"type": "string", "description": "Message body"},
                "cc": {"type": "string", "description": "Cc", "default": ""},
                "bcc": {"type": "string", "description": "Bcc", "default": ""},
            },
            self._send_email,
        )
        self.register_tool(
            "list_emails",
            "List emails",
            {
                "query": {"type": "string", "description": "Search query", "default": ""},
                "max_results": {"type": "integer", "description": "Maximum number of results", "default": 10},
                "label_ids": {"type": "array", "description": "Label IDs", "default": ["INBOX"]},
            },
            self._list_emails,
        )
        self.register_tool(
            "read_email",
            "Read an email",
            {"message_id": {"type": "string", "description": "Message ID"}},
            self._read_email,
        )
        self.register_tool(
            "search_emails",
            "Search emails",
            {
                "query": {"type": "string", "description": "Search query"},
                "max_results": {"type": "integer", "description": "Maximum number of results", "default": 10},
            },
            self._search_emails,
        )
        self.register_tool(
            "mark_as_read",
            "Mark as read",
            {"message_id": {"type": "string", "description": "Message ID"}},
            self._mark_as_read,
        )

    async def _get_service(self):
        """Build the Gmail API client on first use and cache it."""
        if self.service:
            return self.service
        try:
            from google.auth.transport.requests import Request
            from google_auth_oauthlib.flow import InstalledAppFlow
            from googleapiclient.discovery import build
        except ImportError as e:
            raise Exception(
                "Please install the Google API libraries: "
                "pip install google-api-python-client google-auth google-auth-oauthlib"
            ) from e

        SCOPES = ["https://www.googleapis.com/auth/gmail.modify"]
        creds = None
        token_file = "token.pickle"
        # Reuse a cached token if one was saved by an earlier run.
        if os.path.exists(token_file):
            with open(token_file, "rb") as token:
                creds = pickle.load(token)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                # First run: open the browser-based OAuth consent flow.
                flow = InstalledAppFlow.from_client_secrets_file(self.credentials_file, SCOPES)
                creds = flow.run_local_server(port=0)
            with open(token_file, "wb") as token:
                pickle.dump(creds, token)
        self.service = build("gmail", "v1", credentials=creds)
        return self.service

    @staticmethod
    def _header(headers, name, default):
        """Return the value of the first header called `name`, or `default`."""
        return next((h["value"] for h in headers if h["name"] == name), default)

    async def _send_email(self, args):
        """Send an email."""
        service = await self._get_service()
        message = MIMEText(args["body"])
        message["to"] = args["to"]
        message["subject"] = args["subject"]
        if args.get("cc"):
            message["cc"] = args["cc"]
        if args.get("bcc"):
            message["bcc"] = args["bcc"]
        # The Gmail API expects the RFC 2822 message as URL-safe base64.
        raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
        try:
            # Note: the Google client is synchronous, so this call blocks the event loop.
            result = service.users().messages().send(
                userId="me", body={"raw": raw_message}
            ).execute()
            return f"Email sent, ID: {result['id']}"
        except Exception as e:
            raise Exception(f"Failed to send email: {e}") from e

    async def _list_emails(self, args):
        """List emails matching a query and label set."""
        service = await self._get_service()
        query = args.get("query", "")
        max_results = args.get("max_results", 10)
        label_ids = args.get("label_ids", ["INBOX"])
        try:
            results = service.users().messages().list(
                userId="me", q=query, maxResults=max_results, labelIds=label_ids
            ).execute()
            email_list = []
            # list() returns only IDs, so each message is fetched separately.
            for message in results.get("messages", []):
                msg = service.users().messages().get(userId="me", id=message["id"]).execute()
                headers = msg["payload"].get("headers", [])
                email_list.append({
                    "id": message["id"],
                    "subject": self._header(headers, "Subject", "No Subject"),
                    "from": self._header(headers, "From", "Unknown Sender"),
                    "date": self._header(headers, "Date", "Unknown Date"),
                    "snippet": msg.get("snippet", ""),
                })
            return email_list
        except Exception as e:
            raise Exception(f"Failed to list emails: {e}") from e

    async def _read_email(self, args):
        """Read a single email, including its plain-text body."""
        service = await self._get_service()
        message_id = args["message_id"]
        try:
            message = service.users().messages().get(userId="me", id=message_id).execute()
            headers = message["payload"].get("headers", [])
            return {
                "id": message_id,
                "subject": self._header(headers, "Subject", "No Subject"),
                "from": self._header(headers, "From", "Unknown Sender"),
                "date": self._header(headers, "Date", "Unknown Date"),
                "body": self._get_message_body(message["payload"]),
                "snippet": message.get("snippet", ""),
            }
        except Exception as e:
            raise Exception(f"Failed to read email: {e}") from e

    def _get_message_body(self, payload):
        """Extract the plain-text body (top-level parts only)."""
        candidates = payload["parts"] if "parts" in payload else [payload]
        for part in candidates:
            # .get() avoids a KeyError on parts that carry no inline data.
            data = part.get("body", {}).get("data")
            if part.get("mimeType") == "text/plain" and data:
                return base64.urlsafe_b64decode(data).decode("utf-8")
        return ""

    async def _search_emails(self, args):
        """Search emails (delegates to _list_emails)."""
        return await self._list_emails({
            "query": args["query"],
            "max_results": args.get("max_results", 10),
        })

    async def _mark_as_read(self, args):
        """Mark a message as read by removing the UNREAD label."""
        service = await self._get_service()
        message_id = args["message_id"]
        try:
            service.users().messages().modify(
                userId="me", id=message_id, body={"removeLabelIds": ["UNREAD"]}
            ).execute()
            return f"Email {message_id} marked as read"
        except Exception as e:
            raise Exception(f"Failed to mark as read: {e}") from e

14. WhatsApp MCP

GitHub: https://github.com/lharries/whatsapp-mcp

Search, send, and read WhatsApp messages and media.

import aiohttp


class WhatsAppMCPServer(MCPServer):
    # The calls below target the WhatsApp Cloud API through the Graph API.
    GRAPH_API_BASE = "https://graph.facebook.com/v17.0"

    def __init__(self, whatsapp_token, phone_number_id):
        super().__init__("WhatsApp MCP Server")
        self.whatsapp_token = whatsapp_token
        self.phone_number_id = phone_number_id
        self._register_whatsapp_tools()

    def _register_whatsapp_tools(self):
        """Register the WhatsApp tools."""
        self.register_tool(
            "send_message",
            "Send a message",
            {
                "to": {"type": "string", "description": "Recipient phone number"},
                "message": {"type": "string", "description": "Message text"},
            },
            self._send_message,
        )
        self.register_tool(
            "send_template",
            "Send a template message",
            {
                "to": {"type": "string", "description": "Recipient phone number"},
                "template_name": {"type": "string", "description": "Template name"},
                "language": {"type": "string", "description": "Language code", "default": "en"},
                "parameters": {"type": "array", "description": "Template parameters", "default": []},
            },
            self._send_template,
        )
        self.register_tool(
            "send_media",
            "Send media",
            {
                "to": {"type": "string", "description": "Recipient phone number"},
                "media_type": {"type": "string", "description": "Media type (image, document, audio, video)"},
                "media_url": {"type": "string", "description": "Media URL"},
                "caption": {"type": "string", "description": "Media caption", "default": ""},
            },
            self._send_media,
        )
        self.register_tool(
            "get_media",
            "Get media",
            {"media_id": {"type": "string", "description": "Media ID"}},
            self._get_media,
        )
        self.register_tool(
            "mark_as_read",
            "Mark as read",
            {"message_id": {"type": "string", "description": "Message ID"}},
            self._mark_as_read,
        )

    def _auth_headers(self, json_body=True):
        """Bearer-token headers shared by every request."""
        headers = {"Authorization": f"Bearer {self.whatsapp_token}"}
        if json_body:
            headers["Content-Type"] = "application/json"
        return headers

    async def _post_message(self, data, error_label):
        """POST a payload to the messages endpoint and return the parsed JSON."""
        url = f"{self.GRAPH_API_BASE}/{self.phone_number_id}/messages"
        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=self._auth_headers(), json=data) as response:
                if response.status != 200:
                    error = await response.text()
                    raise Exception(f"{error_label}: {error}")
                return await response.json()

    @staticmethod
    def _first_message_id(result):
        """Return the ID of the first message in an API response, or 'unknown'."""
        return result.get("messages", [{}])[0].get("id", "unknown")

    async def _send_message(self, args):
        """Send a text message."""
        data = {
            "messaging_product": "whatsapp",
            "to": args["to"],
            "type": "text",
            "text": {"body": args["message"]},
        }
        result = await self._post_message(data, "Failed to send message")
        return f"Message sent, ID: {self._first_message_id(result)}"

    async def _send_template(self, args):
        """Send a template message."""
        template_data = {
            "name": args["template_name"],
            "language": {"code": args.get("language", "en")},
        }
        if args.get("parameters"):
            # Each parameter fills one placeholder in the template body.
            template_data["components"] = [{
                "type": "body",
                "parameters": [{"type": "text", "text": param} for param in args["parameters"]],
            }]
        data = {
            "messaging_product": "whatsapp",
            "to": args["to"],
            "type": "template",
            "template": template_data,
        }
        result = await self._post_message(data, "Failed to send template message")
        return f"Template message sent, ID: {self._first_message_id(result)}"

    async def _send_media(self, args):
        """Send media by URL."""
        media_type = args["media_type"]
        media_data = {"link": args["media_url"]}
        # Audio messages do not take a caption.
        if args.get("caption") and media_type in ["image", "video", "document"]:
            media_data["caption"] = args["caption"]
        data = {
            "messaging_product": "whatsapp",
            "to": args["to"],
            "type": media_type,
            media_type: media_data,
        }
        result = await self._post_message(data, "Failed to send media")
        return f"Media sent, ID: {self._first_message_id(result)}"

    async def _get_media(self, args):
        """Look up media metadata and check that the file can be downloaded."""
        media_id = args["media_id"]
        url = f"{self.GRAPH_API_BASE}/{media_id}"
        headers = self._auth_headers(json_body=False)
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as response:
                if response.status != 200:
                    error = await response.text()
                    raise Exception(f"Failed to get media info: {error}")
                media_info = await response.json()
            media_url = media_info.get("url")
            if not media_url:
                raise Exception("Media URL is not available")
            # Download the actual media file.
            async with session.get(media_url, headers=headers) as media_response:
                if media_response.status != 200:
                    raise Exception("Failed to download media")
                # The file could be saved here; this version returns metadata only.
                return {
                    "media_id": media_id,
                    "url": media_url,
                    "mime_type": media_info.get("mime_type"),
                    "file_size": media_info.get("file_size"),
                    "sha256": media_info.get("sha256"),
                }

    async def _mark_as_read(self, args):
        """Mark a message as read."""
        data = {
            "messaging_product": "whatsapp",
            "status": "read",
            "message_id": args["message_id"],
        }
        await self._post_message(data, "Failed to mark as read")
        return f"Message {args['message_id']} marked as read"
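The request bodies built inside the send methods above can also be written as pure functions, which makes them easy to check without a token or network access. The sketch below is an illustration under assumptions rather than part of the original server: the function names are hypothetical, the phone number and URL are placeholders, and rejecting unknown media types is an added check that the listing above does not perform.

```python
# Media types named in the send_media tool description above.
CAPTION_TYPES = {"image", "video", "document"}
MEDIA_TYPES = CAPTION_TYPES | {"audio"}


def build_text_payload(to: str, message: str) -> dict:
    """Request body for a plain text message."""
    return {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "text",
        "text": {"body": message},
    }


def build_template_payload(to: str, name: str, language: str = "en", parameters=None) -> dict:
    """Request body for a template message; parameters fill the body placeholders."""
    template = {"name": name, "language": {"code": language}}
    if parameters:
        template["components"] = [{
            "type": "body",
            "parameters": [{"type": "text", "text": p} for p in parameters],
        }]
    return {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "template",
        "template": template,
    }


def build_media_payload(to: str, media_type: str, media_url: str, caption: str = "") -> dict:
    """Request body for a media message sent by URL."""
    if media_type not in MEDIA_TYPES:
        # Assumption: fail early instead of letting the API reject the request.
        raise ValueError(f"unsupported media type: {media_type}")
    media = {"link": media_url}
    if caption and media_type in CAPTION_TYPES:
        media["caption"] = caption
    return {
        "messaging_product": "whatsapp",
        "to": to,
        "type": media_type,
        media_type: media,
    }


# Placeholder recipient and URL, for illustration only.
payload = build_media_payload(
    "15551234567", "image", "https://example.com/cat.png", caption="A cat"
)
print(payload["image"])
```

Separating payload construction from the HTTP call means the async methods only handle transport and error reporting, while the message shapes can be covered by ordinary unit tests.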

Data Processing and Search

15. Brave Search MCP

GitHub: https://github.com/modelcontextprotocol/servers/tree/main/src/brave-search

Web search through the Brave Search API.

import aiohttp


class BraveSearchMCPServer(MCPServer):
    API_BASE = "https://api.search.brave.com/res/v1"

    def __init__(self, brave_api_key):
        super().__init__("Brave Search MCP Server")
        self.brave_api_key = brave_api_key
        self._register_brave_tools()

    def _register_brave_tools(self):
        """Register the Brave search tools."""
        # Parameters shared by all four tools.
        common = {
            "query": {"type": "string", "description": "Search query"},
            "count": {"type": "integer", "description": "Number of results", "default": 10},
            "country": {"type": "string", "description": "Country code", "default": "US"},
            "search_lang": {"type": "string", "description": "Search language", "default": "en"},
        }
        safesearch = {
            "safesearch": {"type": "string", "description": "Safe search level", "default": "moderate"},
        }
        self.register_tool("web_search", "Web search", {**common, **safesearch}, self._web_search)
        # News search takes no safesearch parameter.
        self.register_tool("news_search", "News search", dict(common), self._news_search)
        self.register_tool("image_search", "Image search", {**common, **safesearch}, self._image_search)
        self.register_tool("video_search", "Video search", {**common, **safesearch}, self._video_search)

    @staticmethod
    def _build_params(args, with_safesearch=True):
        """Translate tool arguments into Brave query parameters."""
        params = {
            "q": args["query"],
            "count": args.get("count", 10),
            "country": args.get("country", "US"),
            "search_lang": args.get("search_lang", "en"),
        }
        if with_safesearch:
            params["safesearch"] = args.get("safesearch", "moderate")
        return params

    async def _get_json(self, path, params, error_label):
        """GET an endpoint under API_BASE and return the parsed JSON."""
        headers = {
            "Accept": "application/json",
            "Accept-Encoding": "gzip",
            "X-Subscription-Token": self.brave_api_key,
        }
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.API_BASE}/{path}", headers=headers, params=params) as response:
                if response.status != 200:
                    error = await response.text()
                    raise Exception(f"{error_label}: {error}")
                return await response.json()

    @staticmethod
    def _wrap(args, results):
        """Common response envelope for every search tool."""
        return {"query": args["query"], "results": results, "total_results": len(results)}

    async def _web_search(self, args):
        """Web search."""
        data = await self._get_json("web/search", self._build_params(args), "Search failed")
        # Web results are nested under data["web"]["results"].
        results = [
            {
                "title": r.get("title", ""),
                "url": r.get("url", ""),
                "description": r.get("description", ""),
                "meta_url": r.get("meta_url", {}).get("netloc", ""),
                "published": r.get("age", ""),
                "family_friendly": r.get("family_friendly", True),
            }
            for r in data.get("web", {}).get("results", [])
        ]
        return self._wrap(args, results)

    async def _news_search(self, args):
        """News search."""
        params = self._build_params(args, with_safesearch=False)
        data = await self._get_json("news/search", params, "News search failed")
        results = [
            {
                "title": r.get("title", ""),
                "url": r.get("url", ""),
                "description": r.get("description", ""),
                "meta_url": r.get("meta_url", {}).get("netloc", ""),
                "published": r.get("age", ""),
                "breaking": r.get("breaking", False),
            }
            for r in data.get("results", [])
        ]
        return self._wrap(args, results)

    async def _image_search(self, args):
        """Image search."""
        data = await self._get_json("images/search", self._build_params(args), "Image search failed")
        results = [
            {
                "title": r.get("title", ""),
                "url": r.get("url", ""),
                "thumbnail": r.get("thumbnail", {}).get("src", ""),
                "properties": {
                    "url": r.get("properties", {}).get("url", ""),
                    "width": r.get("properties", {}).get("width", 0),
                    "height": r.get("properties", {}).get("height", 0),
                    "format": r.get("properties", {}).get("format", ""),
                },
            }
            for r in data.get("results", [])
        ]
        return self._wrap(args, results)

    async def _video_search(self, args):
        """Video search."""
        data = await self._get_json("videos/search", self._build_params(args), "Video search failed")
        results = [
            {
                "title": r.get("title", ""),
                "url": r.get("url", ""),
                "description": r.get("description", ""),
                "thumbnail": r.get("thumbnail", {}).get("src", ""),
                "duration": r.get("video", {}).get("duration", ""),
                "views": r.get("video", {}).get("views", 0),
                "published": r.get("age", ""),
            }
            for r in data.get("results", [])
        ]
        return self._wrap(args, results)
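The search methods above return a dictionary with `query`, `results`, and `total_results`. Before that reaches a model, a server typically has to wrap it in a `tools/call` result. The sketch below shows one way to do so, assuming the MCP result shape of a `content` list holding typed items plus an `isError` flag; `to_tool_result` is a hypothetical helper, and the sample data is invented for illustration.

```python
def to_tool_result(search_result: dict) -> dict:
    """Render a search result dict as a text-only tools/call result."""
    lines = [f"Results for: {search_result['query']}"]
    for index, item in enumerate(search_result["results"], start=1):
        lines.append(f"{index}. {item.get('title', '')}")
        lines.append(f"   {item.get('url', '')}")
        if item.get("description"):
            lines.append(f"   {item['description']}")
    if not search_result["results"]:
        lines.append("No results.")
    return {
        # Assumed result shape: a list of typed content items.
        "content": [{"type": "text", "text": "\n".join(lines)}],
        "isError": False,
    }


# Invented sample in the shape returned by _web_search above.
sample = {
    "query": "model context protocol",
    "results": [
        {
            "title": "Example page",
            "url": "https://example.com/mcp",
            "description": "A placeholder description.",
        },
    ],
    "total_results": 1,
}

print(to_tool_result(sample)["content"][0]["text"])
```

Flattening to plain text keeps the result compact for the model; a server that needs structured output could instead serialize the dictionary as JSON inside the same text item.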

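Advantages and Challenges of MCP

Advantages

  1. Standardization: MCP defines one interface standard, which simplifies integrating AI applications with external services
  2. Security and reliability: a standardized protocol gives service providers a single, consistent place to implement secure data exchange
  3. Easy to extend: a developer implements the MCP interface once, and every MCP-capable AI application can use it
  4. Shared ecosystem: different AI applications can share the same MCP services, which helps the ecosystem grow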

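Challenges

  1. Learning curve: developers need to learn a new protocol standard and how to implement it
  2. Ecosystem maturity: as an emerging standard, its ecosystem is still being built
  3. Performance: the extra protocol layer may add some overhead
  4. Compatibility: integrations must remain compatible with existing systems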

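Future Outlook

MCP is more than an evolution of API technology; it changes how AI systems understand and interact with the digital world. As more service providers adopt the MCP standard, we can expect:

  1. A richer ecosystem: more services and tools will support MCP
  2. Smarter applications: AI applications will connect seamlessly to a wide range of data sources and services
  3. Higher development efficiency: developers will be able to build complex AI applications faster
  4. A better user experience: users will be able to interact with multiple systems through natural language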

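Conclusion

The Model Context Protocol (MCP) is an important step forward for AI application development. By providing a standardized interface, MCP makes it easier for AI models to connect to external services and data sources. The protocol is still at an early stage, but as its ecosystem matures it could become core infrastructure for AI application development.

For developers, now is a good time to start learning and experimenting with MCP. The 15 open-source projects covered in this article offer a quick way to get started and begin building your own MCP applications. Whether the task is a simple data query or the automation of a complex workflow, MCP gives your AI application a strong extension mechanism.

In the long run, MCP will help AI applications evolve from isolated tools into genuine intelligent assistants that understand what users need and complete complex cross-platform tasks automatically. That will improve productivity and open up new modes of human-computer interaction.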
