Java MCP 服务器开发详细指南(小白入门版)_java mcp服务
Java MCP 服务器开发详细指南(小白入门版)
本指南提供了使用Java开发MCP(消息控制协议)服务器的详细步骤教程。本文专为初学者设计,包含环境搭建、功能开发、代码配置的完整流程和详细说明。
目录
-
介绍
- 什么是MCP服务器?
- 前置要求
-
开发环境搭建
- 安装Java开发工具包(JDK)
- 选择集成开发环境(IDE)
- 配置构建工具(Maven)
-
基础服务器结构创建
- 项目设置(Maven示例)
- 核心服务器类
- 处理客户端连接
-
实现MCP协议逻辑
- 消息解析
- 处理不同消息类型
- 发送响应
-
配置管理
- 服务器端口和地址配置
- 日志配置
- 外部配置文件
-
高级主题(可选)
- 多线程与并发
- 错误处理
- 安全性考虑
- 测试
-
部署
- 构建应用程序
- 运行服务器
-
总结
1. 介绍
什么是MCP服务器?
MCP(消息控制协议)服务器是一种用于管理客户端和服务器之间通信的中央服务器,它使用特定的消息协议来处理各种请求和响应。在实际应用中,MCP服务器可以用于:
- 实时数据同步
- 命令执行和控制
- 集中式消息处理
- 多客户端协调
下图展示了一个典型的MCP服务器架构:
客户端A 客户端B 客户端C ↓ ↓ ↓ │ │ │ └───────┬──────┴─────────────┘ │ ┌─────▼─────┐ │ MCP服务器 │ └───────────┘
前置要求
开始开发前,请确保你具备以下条件:
- 基础Java知识:理解Java核心概念(类、对象、方法、循环、条件判断、基本I/O操作)
- Java开发工具包(JDK):版本8或更高(本指南将介绍安装步骤)
- 集成开发环境(IDE):如IntelliJ IDEA、Eclipse或VS Code。本指南将使用IntelliJ IDEA示例
- 构建自动化工具:Maven或Gradle。本指南主要使用Maven示例
- 基本网络知识:IP地址、端口、TCP/IP等概念(有助于理解,但不是严格要求)
2. 开发环境搭建
安装Java开发工具包(JDK)
-
下载:访问Oracle JDK下载页面或使用开源替代品如OpenJDK(例如Adoptium Temurin)。下载适合你操作系统(Windows、macOS、Linux)的安装包。
-
安装:运行安装程序并按照屏幕上的说明进行操作。
-
验证安装:打开终端或命令提示符并输入以下命令:
java -versionjavac -version
你应该看到显示已安装的Java版本信息。例如:
java version \"1.8.0_291\"Java(TM) SE Runtime Environment (build 1.8.0_291-b10)Java HotSpot(TM) 64-Bit Server VM (build 25.291-b10, mixed mode)
-
设置JAVA_HOME环境变量(建议但非必须):配置
JAVA_HOME
环境变量指向你的JDK安装目录。这通常是Maven/Gradle等工具所需的。在Windows上:
- 右键点击\"此电脑\",选择\"属性\"
- 点击\"高级系统设置\"
- 点击\"环境变量\"
- 在\"系统变量\"部分,点击\"新建\"
- 变量名输入
JAVA_HOME
- 变量值输入JDK安装路径(例如:
C:\\Program Files\\Java\\jdk1.8.0_291
) - 点击\"确定\"保存
然后,编辑系统变量
Path
,添加%JAVA_HOME%\\bin
选择集成开发环境(IDE)
使用集成开发环境可以使开发过程更加轻松。我们推荐使用IntelliJ IDEA社区版(免费版本)。
-
下载:前往JetBrains网站下载IntelliJ IDEA社区版。
-
安装:运行安装程序并按照说明进行操作。
-
首次启动配置:首次启动时,IDEA会引导你完成初始设置,包括UI主题、插件和其他偏好设置。
配置构建工具(Maven)
构建工具帮助管理依赖项、编译代码、运行测试和打包应用程序。
-
Maven安装:
- IntelliJ IDEA通常已经捆绑了Maven
- 如需单独安装,可从Apache Maven网站下载
-
验证安装:
- 打开终端,运行:
mvn -version
- 应显示Maven版本信息,例如:
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)Maven home: /usr/local/Cellar/maven/3.8.1/libexecJava version: 1.8.0_291, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre
- 打开终端,运行:
3. 基础服务器结构创建
让我们开始创建一个新项目并设置基本的服务器代码。
项目设置(Maven示例)
-
打开IntelliJ IDEA
-
创建新项目:
- 选择文件->新建->项目…
- 在左侧选择Maven
- 确保在项目SDK下选择了你的JDK
- 点击下一步
-
设置项目坐标:
- 输入GroupId(例如,
com.example
) - 输入ArtifactId(例如,
mcp-server
) - 点击下一步,然后点击完成
- 输入GroupId(例如,
IntelliJ将创建一个标准的Maven项目结构,包括pom.xml
文件。
pom.xml
(初始):
<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd\"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>mcp-server</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.32</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.13.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <configuration> <archive> <manifest> <mainClass>com.example.McpServer</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build></project>
说明:
groupId
:项目组织唯一标识符,通常使用公司/组织的反向域名artifactId
:项目唯一标识符version
:项目版本properties
:设置源代码和目标Java版本dependencies
:项目依赖项(这里引入了SLF4J日志API和Logback实现,以及Jackson JSON库)build
:配置了Maven Assembly插件以创建包含所有依赖的可执行JAR文件
核心服务器类
现在,让我们创建启动服务器的主类。
- 导航到
src/main/java
- 右键点击包(例如
com.example
)->新建->Java类 - 命名为
McpServer
src/main/java/com/example/McpServer.java
:
package com.example;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;/** * MCP服务器主类 - 负责启动服务器并接受客户端连接 */public class McpServer { private static final Logger log = LoggerFactory.getLogger(McpServer.class); private final int port; /** * 构造函数 - 初始化服务器端口 * @param port 服务器监听的端口号 */ public McpServer(int port) { this.port = port; } /** * 启动服务器并开始接受客户端连接 */ public void start() { log.info(\"正在启动MCP服务器,端口: {}...\", port); try (ServerSocket serverSocket = new ServerSocket(port)) { log.info(\"服务器启动成功。等待客户端连接...\"); while (true) { // 无限循环,持续接受连接 try { Socket clientSocket = serverSocket.accept(); // 等待客户端连接 log.info(\"客户端已连接,来源: {}\", clientSocket.getRemoteSocketAddress()); // 处理客户端连接(稍后实现) handleClient(clientSocket); } catch (IOException e) { log.error(\"接受客户端连接时出错\", e); // 根据错误情况决定是继续循环还是中断 } } } catch (IOException e) { log.error(\"无法在端口 {} 上启动服务器\", port, e); // 处理服务器启动失败(例如,端口已被占用) } finally { log.info(\"MCP服务器正在关闭。\"); } } /** * 处理单个客户端连接 * @param clientSocket 客户端套接字 */ private void handleClient(Socket clientSocket) { // TODO: 实现客户端处理逻辑(读取消息,写入响应) log.info(\"处理客户端: {}\", clientSocket.getRemoteSocketAddress()); // 暂时仅关闭连接 try { clientSocket.close(); } catch (IOException e) { log.error(\"关闭客户端套接字时出错\", e); } } /** * 应用程序入口点 * @param args 命令行参数(可选第一个参数:端口号) */ public static void main(String[] args) { int serverPort = 8080; // 默认端口,考虑使其可配置 if (args.length > 0) { try { serverPort = Integer.parseInt(args[0]); } catch (NumberFormatException e) { log.error(\"提供的端口号无效: {}。使用默认端口 {}。\", args[0], serverPort); } } McpServer server = new McpServer(serverPort); server.start(); }}
代码说明:
- 我们创建了一个简单的Java服务器,使用
ServerSocket
监听指定端口 accept()
方法阻塞等待客户端连接- 当客户端连接时,我们调用
handleClient()
方法处理连接 - 目前,
handleClient()
方法只是关闭连接,稍后我们会完善它 - 我们使用SLF4J/Logback记录服务器活动
main
方法允许通过命令行参数自定义端口号
处理客户端连接
我们现在需要修改handleClient
方法以实际处理客户端连接。为实现这一点,我们将:
- 为每个客户端连接创建一个新线程
- 读取客户端消息
- 解析消息并生成响应
- 将响应发送回客户端
首先,让我们定义一个简单的ClientHandler
类来处理单个客户端连接:
src/main/java/com/example/ClientHandler.java
:
package com.example;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.Socket;/** * 客户端处理器 - 处理单个客户端的消息交换 */public class ClientHandler implements Runnable { private static final Logger log = LoggerFactory.getLogger(ClientHandler.class); private final Socket clientSocket; /** * 构造函数 * @param socket 客户端套接字 */ public ClientHandler(Socket socket) { this.clientSocket = socket; } /** * 当线程启动时执行的主方法 */ @Override public void run() { log.info(\"开始处理来自 {} 的客户端连接\", clientSocket.getRemoteSocketAddress()); try ( // 创建输入输出流 BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true) ) { String inputLine; // 持续读取客户端消息,直到客户端关闭连接或发送特定的退出消息 while ((inputLine = in.readLine()) != null) { log.info(\"收到消息: {}\", inputLine); // 解析消息(实际实现应根据您的协议规范) String response = processMessage(inputLine); // 发送响应给客户端 out.println(response); // 检查是否需要关闭连接(例如,\"EXIT\"命令) if (\"EXIT\".equals(inputLine.trim().toUpperCase())) { log.info(\"客户端请求断开连接\"); break; } } } catch (IOException e) { log.error(\"处理客户端连接时发生I/O错误\", e); } finally { try { clientSocket.close(); log.info(\"已关闭与 {} 的连接\", clientSocket.getRemoteSocketAddress()); } catch (IOException e) { log.error(\"关闭客户端套接字时出错\", e); } } } /** * 处理传入的消息并生成响应 * @param message 客户端发送的消息 * @return 服务器响应 */ private String processMessage(String message) { // 这是一个非常简单的实现,实际应用中会更复杂 // 根据您的协议规范扩展此方法 if (message == null || message.trim().isEmpty()) { return \"ERROR: 空消息\"; } // 简单的消息格式:COMMAND:PARAMETER String[] parts = message.split(\":\", 2); String command = parts[0].trim().toUpperCase(); String parameter = (parts.length > 1) ? parts[1].trim() : \"\"; switch (command) { case \"HELLO\": return \"你好,\" + (parameter.isEmpty() ? \"客户端\" : parameter) + \"!\"; case \"TIME\": return \"当前服务器时间是: \" + java.time.LocalDateTime.now(); case \"ECHO\": return parameter; case \"EXIT\": return \"再见!\"; default: return \"未知命令: \" + command + \". 可用命令: HELLO, TIME, ECHO, EXIT\"; } }}
现在,让我们更新McpServer
类中的handleClient
方法来使用这个ClientHandler
:
更新McpServer.java
中的handleClient
方法:
/** * 处理单个客户端连接 * @param clientSocket 客户端套接字 */private void handleClient(Socket clientSocket) { // 创建一个新的客户端处理器 ClientHandler clientHandler = new ClientHandler(clientSocket); // 创建一个新线程来处理这个客户端 // 这使得服务器可以同时处理多个客户端 Thread clientThread = new Thread(clientHandler); clientThread.start(); log.info(\"已为客户端 {} 创建新线程\", clientSocket.getRemoteSocketAddress());}
代码说明:
ClientHandler
类实现了Runnable
接口,这样它可以在单独的线程中运行- 在
run()
方法中,我们:- 创建输入流(使用
BufferedReader
读取客户端消息) - 创建输出流(使用
PrintWriter
发送响应) - 持续读取并处理消息,直到连接关闭
- 创建输入流(使用
processMessage()
方法解析消息并生成响应。它:- 将消息分割为命令和参数部分
- 根据命令类型执行不同的动作
- 返回适当的响应
- 更新后的
handleClient()
方法在McpServer
类中:- 为每个客户端创建一个新的
ClientHandler
- 在新线程中启动处理器
- 记录线程创建情况
- 为每个客户端创建一个新的
以上代码提供了一个简单但功能完整的MCP服务器实现,它可以:
- 接受多个客户端的连接
- 为每个客户端创建一个专用线程
- 处理简单的文本协议命令
- 根据命令生成并发送响应
这是一个基础实现,你可以根据实际需求扩展它,添加更多功能。
4. 实现MCP协议逻辑
在前面部分中,我们实现了一个简单的基于文本的协议。在实际应用中,你可能需要一个更复杂的协议。本节将介绍如何实现一个基于JSON的MCP协议,它更有结构性和灵活性。
消息解析
首先,让我们定义我们的消息结构。我们将使用JSON格式,因为它易于读写,并且有很好的库支持。
src/main/java/com/example/message/McpMessage.java
:
package com.example.message;import com.fasterxml.jackson.annotation.JsonInclude;import com.fasterxml.jackson.annotation.JsonProperty;/** * MCP消息类 - 定义客户端和服务器之间交换的消息格式 */@JsonInclude(JsonInclude.Include.NON_NULL)public class McpMessage { @JsonProperty(\"type\") private String type; @JsonProperty(\"command\") private String command; @JsonProperty(\"data\") private Object data; @JsonProperty(\"status\") private String status; @JsonProperty(\"message\") private String message; @JsonProperty(\"requestId\") private String requestId; // 无参构造函数(Jackson需要) public McpMessage() { } /** * 创建一个请求消息 * @param command 命令名称 * @param data 与命令关联的数据 * @param requestId 请求标识符 * @return 请求消息对象 */ public static McpMessage createRequest(String command, Object data, String requestId) { McpMessage message = new McpMessage(); message.setType(\"REQUEST\"); message.setCommand(command); message.setData(data); message.setRequestId(requestId); return message; } /** * 创建一个成功响应消息 * @param data 响应数据 * @param requestId 对应请求的标识符 * @return 响应消息对象 */ public static McpMessage createSuccessResponse(Object data, String requestId) { McpMessage message = new McpMessage(); message.setType(\"RESPONSE\"); message.setStatus(\"SUCCESS\"); message.setData(data); message.setRequestId(requestId); return message; } /** * 创建一个错误响应消息 * @param errorMessage 错误消息 * @param requestId 对应请求的标识符 * @return 响应消息对象 */ public static McpMessage createErrorResponse(String errorMessage, String requestId) { McpMessage message = new McpMessage(); message.setType(\"RESPONSE\"); message.setStatus(\"ERROR\"); message.setMessage(errorMessage); message.setRequestId(requestId); return message; } // Getter和Setter方法 public String getType() { return type; } public void setType(String type) { this.type = type; } public String getCommand() { return command; } public void setCommand(String command) { this.command = command; } public Object getData() { return data; } public void setData(Object data) { this.data = data; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } @Override public String toString() { return \"McpMessage{\" + \"type=\'\" + type + \'\\\'\' + \", command=\'\" + command + \'\\\'\' + \", data=\" + data + \", status=\'\" + status + \'\\\'\' + \", message=\'\" + message + \'\\\'\' + \", requestId=\'\" + requestId + \'\\\'\' + \'}\'; }}
接下来,创建一个JSON消息解析器类:
src/main/java/com/example/message/MessageParser.java
:
package com.example.message;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * 消息解析器 - 转换JSON字符串和McpMessage对象 */public class MessageParser { private static final Logger log = LoggerFactory.getLogger(MessageParser.class); private final ObjectMapper objectMapper; /** * 构造函数 */ public MessageParser() { this.objectMapper = new ObjectMapper(); } /** * 将JSON字符串解析为McpMessage对象 * @param jsonString JSON格式的消息字符串 * @return 解析后的McpMessage对象,如果解析失败则返回null */ public McpMessage parse(String jsonString) { try { return objectMapper.readValue(jsonString, McpMessage.class); } catch (JsonProcessingException e) { log.error(\"解析消息失败: {}\", jsonString, e); return null; } } /** * 将McpMessage对象转换为JSON字符串 * @param message 要转换的McpMessage对象 * @return JSON格式的字符串,如果转换失败则返回null */ public String toJson(McpMessage message) { try { return objectMapper.writeValueAsString(message); } catch (JsonProcessingException e) { log.error(\"将消息转换为JSON失败: {}\", message, e); return null; } }}
处理不同消息类型
现在我们的消息解析器已经准备好了,让我们创建一个更强大的客户端处理器,它可以处理我们的JSON消息格式:
src/main/java/com/example/JsonClientHandler.java
:
package com.example;import com.example.message.McpMessage;import com.example.message.MessageParser;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.Socket;import java.time.LocalDateTime;import java.util.HashMap;import java.util.Map;import java.util.UUID;/** * JSON客户端处理器 - 处理基于JSON的MCP协议消息 */public class JsonClientHandler implements Runnable { private static final Logger log = LoggerFactory.getLogger(JsonClientHandler.class); private final Socket clientSocket; private final MessageParser messageParser; /** * 构造函数 * @param socket 客户端套接字 */ public JsonClientHandler(Socket socket) { this.clientSocket = socket; this.messageParser = new MessageParser(); } /** * 当线程启动时执行的主方法 */ @Override public void run() { log.info(\"开始处理来自 {} 的JSON客户端连接\", clientSocket.getRemoteSocketAddress()); try ( BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true) ) { String inputLine; while ((inputLine = in.readLine()) != null) { log.info(\"收到JSON消息: {}\", inputLine); // 解析收到的JSON消息 McpMessage request = messageParser.parse(inputLine); if (request == null) { // 解析失败,发送错误响应 McpMessage errorResponse = McpMessage.createErrorResponse( \"无效的JSON格式\", UUID.randomUUID().toString()); out.println(messageParser.toJson(errorResponse)); continue; } // 处理消息 McpMessage response = processMessage(request); // 发送响应 String jsonResponse = messageParser.toJson(response); log.info(\"发送响应: {}\", jsonResponse); out.println(jsonResponse); // 检查是否需要关闭连接 if (\"EXIT\".equals(request.getCommand())) { log.info(\"客户端请求断开连接\"); break; } } } catch (IOException e) { log.error(\"处理客户端连接时发生I/O错误\", e); } finally { try { clientSocket.close(); log.info(\"已关闭与 {} 的连接\", clientSocket.getRemoteSocketAddress()); } catch (IOException e) { log.error(\"关闭客户端套接字时出错\", e); } } } /** * 处理传入的MCP消息并生成响应 * @param request 客户端发送的请求消息 * @return 服务器响应消息 */ private McpMessage processMessage(McpMessage request) { // 确保有一个requestId,如果请求中没有提供,则创建一个新的 String requestId = (request.getRequestId() != null) ? request.getRequestId() : UUID.randomUUID().toString(); // 检查请求类型 if (!\"REQUEST\".equals(request.getType())) { return McpMessage.createErrorResponse( \"无效的消息类型: \" + request.getType() + \", 预期: REQUEST\", requestId); } // 根据命令处理请求 String command = request.getCommand(); if (command == null) { return McpMessage.createErrorResponse(\"缺少命令\", requestId); } switch (command.toUpperCase()) { case \"HELLO\": return handleHelloCommand(request, requestId); case \"TIME\": return handleTimeCommand(requestId); case \"ECHO\": return handleEchoCommand(request, requestId); case \"EXIT\": return handleExitCommand(requestId); default: return McpMessage.createErrorResponse( \"未知命令: \" + command + \". 可用命令: HELLO, TIME, ECHO, EXIT\", requestId); } } /** * 处理HELLO命令 */ private McpMessage handleHelloCommand(McpMessage request, String requestId) { // 尝试从data中提取名称 String name = \"客户端\"; if (request.getData() instanceof String) { name = (String) request.getData(); } else if (request.getData() instanceof Map) { @SuppressWarnings(\"unchecked\") Map<String, Object> data = (Map<String, Object>) request.getData(); if (data.containsKey(\"name\")) { name = data.get(\"name\").toString(); } } Map<String, Object> responseData = new HashMap<>(); responseData.put(\"greeting\", \"你好,\" + name + \"!\"); responseData.put(\"timestamp\", System.currentTimeMillis()); return McpMessage.createSuccessResponse(responseData, requestId); } /** * 处理TIME命令 */ private McpMessage handleTimeCommand(String requestId) { Map<String, Object> responseData = new HashMap<>(); responseData.put(\"serverTime\", LocalDateTime.now().toString()); responseData.put(\"timestamp\", System.currentTimeMillis()); return McpMessage.createSuccessResponse(responseData, requestId); } /** * 处理ECHO命令 */ private McpMessage handleEchoCommand(McpMessage request, String requestId) { // 简单地回显收到的数据 return McpMessage.createSuccessResponse(request.getData(), requestId); } /** * 处理EXIT命令 */ private McpMessage handleExitCommand(String requestId) { Map<String, Object> responseData = new HashMap<>(); responseData.put(\"message\", \"再见!\"); return McpMessage.createSuccessResponse(responseData, requestId); }}
现在,让我们再次更新McpServer
类,使其使用我们的新JsonClientHandler
:
更新McpServer.java
中的handleClient
方法:
/** * 处理单个客户端连接 * @param clientSocket 客户端套接字 */private void handleClient(Socket clientSocket) { // 创建一个新的JSON客户端处理器 JsonClientHandler clientHandler = new JsonClientHandler(clientSocket); // 创建一个新线程来处理这个客户端 Thread clientThread = new Thread(clientHandler); clientThread.start(); log.info(\"已为客户端 {} 创建JSON处理线程\", clientSocket.getRemoteSocketAddress());}
发送响应
现在我们的服务器能够处理基于JSON的MCP消息,让我们创建一个简单的客户端示例,展示如何与服务器通信:
src/main/java/com/example/client/McpClient.java
:
package com.example.client;import com.example.message.McpMessage;import com.example.message.MessageParser;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.Socket;import java.util.HashMap;import java.util.Map;import java.util.UUID;import java.util.concurrent.atomic.AtomicBoolean;/** * MCP客户端 - 连接到MCP服务器并发送/接收消息 */public class McpClient { private static final Logger log = LoggerFactory.getLogger(McpClient.class); private final String serverAddress; private final int serverPort; private final MessageParser messageParser; private Socket socket; private PrintWriter out; private BufferedReader in; private final AtomicBoolean running = new AtomicBoolean(false); private Thread listenerThread; /** * 构造函数 * @param serverAddress 服务器地址 * @param serverPort 服务器端口 */ public McpClient(String serverAddress, int serverPort) { this.serverAddress = serverAddress; this.serverPort = serverPort; this.messageParser = new MessageParser(); } /** * 连接到服务器 * @throws IOException 如果连接失败 */ public void connect() throws IOException { log.info(\"连接到服务器 {}:{}...\", serverAddress, serverPort); socket = new Socket(serverAddress, serverPort); out = new PrintWriter(socket.getOutputStream(), true); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); running.set(true); // 启动监听线程 listenerThread = new Thread(this::listen); listenerThread.start(); log.info(\"已连接到服务器\"); } /** * 断开与服务器的连接 */ public void disconnect() { log.info(\"断开与服务器的连接...\"); running.set(false); try { if (out != null) out.close(); if (in != null) in.close(); if (socket != null) socket.close(); if (listenerThread != null) { listenerThread.interrupt(); listenerThread.join(2000); // 等待最多2秒 } } catch (IOException | InterruptedException e) { log.error(\"断开连接时出错\", e); } log.info(\"已断开连接\"); } /** * 发送命令到服务器 * @param command 命令名称 * @param data 命令数据 * @return 请求ID */ public String sendCommand(String command, Object data) { if (!running.get() || socket == null || socket.isClosed()) { log.error(\"客户端未连接,无法发送命令\"); return null; } String requestId = UUID.randomUUID().toString(); McpMessage request = McpMessage.createRequest(command, data, requestId); String jsonRequest = messageParser.toJson(request); log.info(\"发送命令: {}\", jsonRequest); out.println(jsonRequest); return requestId; } /** * 监听并处理服务器响应的后台线程 */ private void listen() { log.info(\"启动响应监听器\"); try { String inputLine; while (running.get() && (inputLine = in.readLine()) != null) { log.info(\"收到响应: {}\", inputLine); McpMessage response = messageParser.parse(inputLine); if (response == null) { log.error(\"收到无效的JSON响应\"); continue; } handleResponse(response); } } catch (IOException e) { if (running.get()) { log.error(\"读取服务器响应时出错\", e); } } log.info(\"响应监听器已停止\"); } /** * 处理服务器响应 * @param response 服务器响应消息 */ private void handleResponse(McpMessage response) { // 在实际应用中,你可能会将响应转发给特定的回调或处理程序 // 这里我们只是记录它们 if (\"SUCCESS\".equals(response.getStatus())) { log.info(\"成功响应 (ID: {}): {}\", response.getRequestId(), response.getData()); } else if (\"ERROR\".equals(response.getStatus())) { log.error(\"错误响应 (ID: {}): {}\", response.getRequestId(), response.getMessage()); } else { log.warn(\"未知响应状态 (ID: {}): {}\", response.getRequestId(), response.getStatus()); } } /** * 示例客户端用法 */ public static void main(String[] args) { McpClient client = new McpClient(\"localhost\", 8080); try { client.connect(); // 发送HELLO命令 Map<String, String> helloData = new HashMap<>(); helloData.put(\"name\", \"小明\"); client.sendCommand(\"HELLO\", helloData); // 发送TIME命令 client.sendCommand(\"TIME\", null); // 发送ECHO命令 Map<String, Object> echoData = new HashMap<>(); echoData.put(\"message\", \"测试消息\"); echoData.put(\"timestamp\", System.currentTimeMillis()); client.sendCommand(\"ECHO\", echoData); // 等待片刻以接收响应 Thread.sleep(2000); // 发送EXIT命令 client.sendCommand(\"EXIT\", null); // 等待片刻以接收最后的响应 Thread.sleep(1000); } catch (IOException | InterruptedException e) { log.error(\"客户端操作出错\", e); } finally { client.disconnect(); } }}
代码说明:
-
JsonClientHandler
类处理基于JSON的MCP协议消息:- 解析收到的JSON消息
- 根据命令类型调用适当的处理方法
- 生成并发送JSON格式的响应
-
McpClient
类提供一个客户端实现:- 连接到MCP服务器
- 发送命令/请求
- 接收和处理响应
- 提供一个演示所有功能的
main
方法
以上代码提供了一个更完整的MCP服务器和客户端实现,使用了:
- 基于JSON的消息格式
- 请求和响应消息类型
- 请求ID用于关联请求和响应
- 多线程客户端处理
- 异步响应处理
5. 配置管理
目前,我们的代码中包含一些硬编码的设置,如服务器端口。在实际应用中,这些值应该是可配置的。本节将介绍如何使用配置文件和环境变量来使我们的应用程序更加灵活。
服务器端口和地址配置
让我们创建一个简单的配置类:
src/main/java/com/example/config/ServerConfig.java
:
package com.example.config;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.FileInputStream;import java.io.IOException;import java.io.InputStream;import java.util.Properties;/** * 服务器配置 - 从属性文件和环境变量加载配置 */public class ServerConfig { private static final Logger log = LoggerFactory.getLogger(ServerConfig.class); private final Properties properties = new Properties(); // 默认配置值 private static final int DEFAULT_PORT = 8080; private static final int DEFAULT_BACKLOG = 50; private static final int DEFAULT_READ_TIMEOUT = 30000; // 30 seconds /** * 构造函数 * @param configPath 配置文件路径(可以为null,使用默认值) */ public ServerConfig(String configPath) { // 首先加载默认属性 loadDefaultProperties(); // 尝试从配置文件加载属性 if (configPath != null) { loadFromFile(configPath); } // 用环境变量覆盖属性 loadFromEnvironment(); // 记录配置 log.info(\"服务器配置: port={}, backlog={}, readTimeout={}ms\", getPort(), getBacklog(), getReadTimeout()); } private void loadDefaultProperties() { properties.setProperty(\"server.port\", String.valueOf(DEFAULT_PORT)); properties.setProperty(\"server.backlog\", String.valueOf(DEFAULT_BACKLOG)); properties.setProperty(\"server.readTimeout\", String.valueOf(DEFAULT_READ_TIMEOUT)); } private void loadFromFile(String configPath) { try (InputStream input = new FileInputStream(configPath)) { properties.load(input); log.info(\"从 {} 加载配置\", configPath); } catch (IOException e) { log.warn(\"无法加载配置文件: {}. 使用默认值\", configPath, e); } } private void loadFromEnvironment() { // 环境变量名通常是大写的,带有下划线 String portEnv = System.getenv(\"SERVER_PORT\"); if (portEnv != null && !portEnv.isEmpty()) { properties.setProperty(\"server.port\", portEnv); log.info(\"从环境变量设置端口: {}\", portEnv); } String backlogEnv = System.getenv(\"SERVER_BACKLOG\"); if (backlogEnv != null && !backlogEnv.isEmpty()) { properties.setProperty(\"server.backlog\", backlogEnv); } String readTimeoutEnv = System.getenv(\"SERVER_READ_TIMEOUT\"); if (readTimeoutEnv != null && !readTimeoutEnv.isEmpty()) { properties.setProperty(\"server.readTimeout\", readTimeoutEnv); } } /** * 获取服务器端口 * @return 端口号 */ public int getPort() { return Integer.parseInt(properties.getProperty(\"server.port\")); } /** * 获取服务器连接积压数量 * @return 积压数量 */ public int getBacklog() { return Integer.parseInt(properties.getProperty(\"server.backlog\")); } /** * 获取读取超时(毫秒) * @return 超时时间 */ public int getReadTimeout() { return Integer.parseInt(properties.getProperty(\"server.readTimeout\")); }}
现在,让我们更新McpServer
类以使用这个配置:
更新McpServer.java
:
package com.example;import com.example.config.ServerConfig;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;import java.net.SocketException;/** * MCP服务器主类 - 负责启动服务器并接受客户端连接 */public class McpServer { private static final Logger log = LoggerFactory.getLogger(McpServer.class); private final ServerConfig config; /** * 构造函数 * @param config 服务器配置 */ public McpServer(ServerConfig config) { this.config = config; } /** * 启动服务器并开始接受客户端连接 */ public void start() { log.info(\"正在启动MCP服务器,端口: {}...\", config.getPort()); try (ServerSocket serverSocket = new ServerSocket(config.getPort(), config.getBacklog())) { log.info(\"服务器启动成功。等待客户端连接...\"); // 设置读取超时 serverSocket.setSoTimeout(0); // 无限超时,accept会无限阻塞 while (true) { // 无限循环,持续接受连接 try { Socket clientSocket = serverSocket.accept(); // 等待客户端连接 log.info(\"客户端已连接,来源: {}\", clientSocket.getRemoteSocketAddress()); // 设置客户端套接字超时 try { clientSocket.setSoTimeout(config.getReadTimeout()); } catch (SocketException e) { log.warn(\"无法设置客户端套接字超时\", e); } // 处理客户端连接 handleClient(clientSocket); } catch (IOException e) { log.error(\"接受客户端连接时出错\", e); // 根据错误情况决定是继续循环还是中断 } } } catch (IOException e) { log.error(\"无法在端口 {} 上启动服务器\", config.getPort(), e); // 处理服务器启动失败(例如,端口已被占用) } finally { log.info(\"MCP服务器正在关闭。\"); } } /** * 处理单个客户端连接 * @param clientSocket 客户端套接字 */ private void handleClient(Socket clientSocket) { // 创建一个新的JSON客户端处理器 JsonClientHandler clientHandler = new JsonClientHandler(clientSocket); // 创建一个新线程来处理这个客户端 Thread clientThread = new Thread(clientHandler); clientThread.start(); log.info(\"已为客户端 {} 创建JSON处理线程\", clientSocket.getRemoteSocketAddress()); } /** * 应用程序入口点 * @param args 命令行参数(可选第一个参数:配置文件路径) */ public static void main(String[] args) { String configPath = args.length > 0 ? args[0] : \"server.properties\"; ServerConfig config = new ServerConfig(configPath); McpServer server = new McpServer(config); server.start(); }}
日志配置
我们已经使用了SLF4J和Logback进行日志记录,但尚未配置它们。让我们创建一个Logback配置文件:
src/main/resources/logback.xml
:
<configuration> <appender name=\"CONSOLE\" class=\"ch.qos.logback.core.ConsoleAppender\"> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <appender name=\"FILE\" class=\"ch.qos.logback.core.rolling.RollingFileAppender\"> <file>logs/mcp-server.log</file> <rollingPolicy class=\"ch.qos.logback.core.rolling.TimeBasedRollingPolicy\"> <fileNamePattern>logs/mcp-server.%d{yyyy-MM-dd}.log</fileNamePattern> <maxHistory>30</maxHistory> </rollingPolicy> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <root level=\"info\"> <appender-ref ref=\"CONSOLE\" /> <appender-ref ref=\"FILE\" /> </root> <logger name=\"com.example\" level=\"debug\" /></configuration>
外部配置文件
让我们创建一个示例配置文件,可以放在项目根目录中:
server.properties
:
# MCP服务器配置# 服务器将监听的端口server.port=8080# 服务器连接积压队列大小server.backlog=50# 读取超时(毫秒)server.readTimeout=30000
代码说明:
-
ServerConfig
类用于加载和提供服务器配置:- 首先设置默认值
- 然后从属性文件加载配置
- 最后使用环境变量覆盖配置
- 提供方法来访问各种配置选项
-
更新后的
McpServer
类使用这个配置类:- 在构造函数中接受一个
ServerConfig
实例 - 使用配置值来设置服务器端口、积压和超时
- 在
main
方法中创建配置实例并传递给服务器
- 在构造函数中接受一个
-
Logback配置文件
logback.xml
:- 设置日志模式和级别
- 配置控制台和文件日志输出
- 为不同的包设置不同的日志级别
这样,我们的应用程序现在完全可配置,可以适应不同的环境和需求。
6. 高级主题(可选)
多线程与并发
在我们当前的实现中,每个客户端连接都在一个新线程中处理。虽然这种方法对于小型应用程序来说很简单,但对于需要处理大量并发连接的生产服务器来说,它可能不是最佳选择。
线程池
考虑使用线程池而不是为每个连接创建新线程:
private final ExecutorService threadPool = Executors.newFixedThreadPool(20); // 20个线程private void handleClient(Socket clientSocket) { JsonClientHandler clientHandler = new JsonClientHandler(clientSocket); threadPool.submit(clientHandler); log.info(\"客户端 {} 的处理提交到线程池\", clientSocket.getRemoteSocketAddress());}
非阻塞I/O
对于高度并发的应用程序,可以考虑使用Java NIO(非阻塞I/O):
import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.channels.Selector;// ...public void startNio() throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.socket().bind(new InetSocketAddress(config.getPort())); serverChannel.configureBlocking(false); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 实现选择器循环...}
错误处理
在健壮的服务器实现中,错误处理至关重要。考虑以下策略:
优雅的客户端断开连接处理:
try { // 客户端处理代码...} catch (IOException e) { if (isConnectionReset(e)) { log.info(\"客户端断开连接: {}\", clientSocket.getRemoteSocketAddress()); } else { log.error(\"客户端处理时发生I/O错误\", e); }} finally { closeClientResourcesSafely();}private boolean isConnectionReset(IOException e) { return e instanceof SocketException && (\"Connection reset\".equals(e.getMessage()) || \"Socket closed\".equals(e.getMessage()));}
使用更复杂的异常层次结构:
package com.example.exception;public class McpException extends Exception { private final String errorCode; public McpException(String message, String errorCode) { super(message); this.errorCode = errorCode; } // 子类可以定义特定类型的异常...}
安全性考虑
在生产环境中,安全性是必不可少的:
TLS/SSL加密:
import javax.net.ssl.SSLServerSocketFactory;// ...SSLServerSocketFactory sslServerSocketFactory = (SSLServerSocketFactory) SSLServerSocketFactory.getDefault();ServerSocket serverSocket = sslServerSocketFactory.createServerSocket(config.getPort());
身份验证:
private boolean authenticate(McpMessage message) { String authToken = extractAuthToken(message); return securityService.validateToken(authToken);}
测试
对服务器进行充分测试至关重要:
单元测试:
import org.junit.jupiter.api.Test;import static org.junit.jupiter.api.Assertions.*;class MessageParserTest { @Test void testParseValidMessage() { MessageParser parser = new MessageParser(); String json = \"{\\\"type\\\":\\\"REQUEST\\\",\\\"command\\\":\\\"HELLO\\\"}\"; McpMessage message = parser.parse(json); assertNotNull(message); assertEquals(\"REQUEST\", message.getType()); assertEquals(\"HELLO\", message.getCommand()); }}
集成测试:
class ServerIntegrationTest { private McpServer server; private McpClient client; @BeforeEach void setup() { // 启动测试服务器和客户端... } @Test void testClientServerCommunication() { // 测试客户端-服务器通信... } @AfterEach void tearDown() { // 关闭服务器和客户端... }}
7. 部署
构建应用程序
我们已经在pom.xml
中配置了Maven Assembly插件,它可以创建一个包含所有依赖的可执行JAR文件。要构建应用程序,请运行:
mvn clean package
这将在target
目录中创建一个文件,如mcp-server-1.0-SNAPSHOT-jar-with-dependencies.jar
。
运行服务器
可以使用以下命令运行服务器:
java -jar target/mcp-server-1.0-SNAPSHOT-jar-with-dependencies.jar [配置文件路径]
要使用环境变量设置配置,可以这样做:
SERVER_PORT=9090 java -jar target/mcp-server-1.0-SNAPSHOT-jar-with-dependencies.jar
创建服务(Linux/systemd):
[Unit]Description=MCP ServerAfter=network.target[Service]Type=simpleUser=mcp-userWorkingDirectory=/opt/mcp-serverExecStart=/usr/bin/java -jar mcp-server-1.0-SNAPSHOT-jar-with-dependencies.jarRestart=on-failure[Install]WantedBy=multi-user.target
Docker部署:
Dockerfile
:
FROM openjdk:8-jre-alpineWORKDIR /appCOPY target/mcp-server-1.0-SNAPSHOT-jar-with-dependencies.jar /app/mcp-server.jarCOPY server.properties /app/server.propertiesEXPOSE 8080CMD [\"java\", \"-jar\", \"mcp-server.jar\", \"server.properties\"]
构建并运行Docker容器:
docker build -t mcp-server .docker run -p 8080:8080 mcp-server
8. 总结
恭喜!你已经完成了一个功能完整的MCP服务器的开发过程。在本指南中,我们涵盖了:
- 设置Java开发环境
- 创建基本的服务器结构
- 实现客户端连接处理
- 设计和实现基于JSON的消息协议
- 添加可配置的服务器设置
- 探讨了高级主题,如多线程、错误处理和安全性
- 展示了如何构建和部署应用程序