Hyperf WebSocket开发:实时通信服务器与客户端实现
Hyperf WebSocket开发:实时通信服务器与客户端实现
【免费下载链接】hyperf 🚀 A coroutine framework that focuses on hyperspeed and flexibility. Building microservice or middleware with ease. 项目地址: https://gitcode.com/hyperf/hyperf
引言:为什么选择WebSocket?
在现代Web应用开发中,实时通信已成为不可或缺的功能需求。无论是聊天应用、实时数据监控、在线协作工具还是游戏应用,都需要低延迟、双向通信的能力。传统的HTTP协议基于请求-响应模式,无法满足真正的实时通信需求。
Hyperf框架基于Swoole扩展,提供了强大的WebSocket支持,让开发者能够轻松构建高性能的实时通信应用。本文将深入探讨Hyperf WebSocket的开发实践,涵盖服务器端和客户端的完整实现方案。
WebSocket基础概念
WebSocket协议简介
WebSocket是一种在单个TCP连接上进行全双工通信的协议,于2011年被IETF标准化为RFC 6455。与HTTP相比,WebSocket具有以下优势:
- 低延迟:建立连接后无需重复握手
- 双向通信:服务器可以主动向客户端推送数据
- 减少带宽:相比HTTP轮询,数据传输更高效
- 持久连接:连接建立后保持活跃状态
WebSocket通信流程
Hyperf WebSocket服务器开发
环境准备与安装
首先确保已安装Hyperf框架和Swoole扩展:
# 创建Hyperf项目composer create-project hyperf/hyperf-skeleton websocket-app# 进入项目目录cd websocket-app# 安装WebSocket服务器组件composer require hyperf/websocket-server# 检查Swoole扩展php --ri swoole
服务器配置
修改 config/autoload/server.php
配置文件:
SWOOLE_PROCESS, \'servers\' => [ [ \'name\' => \'http\', \'type\' => Server::SERVER_HTTP, \'host\' => \'0.0.0.0\', \'port\' => 9501, \'sock_type\' => SWOOLE_SOCK_TCP, \'callbacks\' => [ Event::ON_REQUEST => [Hyperf\\HttpServer\\Server::class, \'onRequest\'], ], \'settings\' => [ \'open_websocket_protocol\' => false, ] ], [ \'name\' => \'ws\', \'type\' => Server::SERVER_WEBSOCKET, \'host\' => \'0.0.0.0\', \'port\' => 9502, \'sock_type\' => SWOOLE_SOCK_TCP, \'callbacks\' => [ Event::ON_HAND_SHAKE => [Hyperf\\WebSocketServer\\Server::class, \'onHandShake\'], Event::ON_MESSAGE => [Hyperf\\WebSocketServer\\Server::class, \'onMessage\'], Event::ON_CLOSE => [Hyperf\\WebSocketServer\\Server::class, \'onClose\'], ], \'settings\' => [ \'heartbeat_check_interval\' => 60, \'heartbeat_idle_time\' => 600, ] ], ], \'settings\' => [ \'enable_coroutine\' => true, \'worker_num\' => swoole_cpu_num(), \'pid_file\' => BASE_PATH . \'/runtime/hyperf.pid\', \'open_tcp_nodelay\' => true, \'max_coroutine\' => 100000, \'open_http2_protocol\' => true, \'max_request\' => 1000000, \'socket_buffer_size\' => 2 * 1024 * 1024, \'buffer_output_size\' => 2 * 1024 * 1024, ], \'callbacks\' => [ Event::ON_WORKER_START => [Hyperf\\Framework\\Bootstrap\\WorkerStartCallback::class, \'onWorkerStart\'], Event::ON_PIPE_MESSAGE => [Hyperf\\Framework\\Bootstrap\\PipeMessageCallback::class, \'onPipeMessage\'], Event::ON_WORKER_EXIT => [Hyperf\\Framework\\Bootstrap\\WorkerExitCallback::class, \'onWorkerExit\'], ],];
路由配置
在 config/routes.php
中配置WebSocket路由:
<?phpuse Hyperf\\HttpServer\\Router\\Router;Router::addServer(\'ws\', function () { // 基础WebSocket连接 Router::get(\'/\', \'App\\Controller\\WebSocketController\'); // 多路径支持 Router::get(\'/chat\', \'App\\Controller\\ChatController\'); Router::get(\'/notify\', \'App\\Controller\\NotificationController\'); // 带参数的路由 Router::get(\'/room/{roomId}\', \'App\\Controller\\RoomController\');});
WebSocket控制器实现
创建完整的WebSocket控制器:
fd; // 存储连接信息到上下文 Context::set(\'user_id\', $this->getUserIdFromRequest($request)); Context::set(\'connected_at\', time()); Context::set(\'client_ip\', $request->server[\'remote_addr\']); // 发送欢迎消息 $response = (new Response($server))->init($request); $response->push(new Frame( payloadData: json_encode([ \'type\' => \'welcome\', \'message\' => \'连接成功\', \'fd\' => $fd, \'timestamp\' => time() ]) )); // 记录连接日志 $this->logConnection(\'open\', $fd, $request); } /** * 收到消息时处理 */ public function onMessage($server, $frame): void { $response = (new Response($server))->init($frame); // 处理PING帧 if ($frame->opcode == Opcode::PING) { $response->push(new Frame(opcode: Opcode::PONG)); return; } try { $data = json_decode($frame->data, true); $messageType = $data[\'type\'] ?? \'unknown\'; switch ($messageType) { case \'chat\': $this->handleChatMessage($server, $frame->fd, $data); break; case \'broadcast\': $this->handleBroadcastMessage($server, $data); break; case \'ping\': $response->push(new Frame( payloadData: json_encode([\'type\' => \'pong\', \'timestamp\' => time()]) )); break; default: $response->push(new Frame( payloadData: json_encode([ \'type\' => \'error\', \'message\' => \'未知的消息类型\' ]) )); } } catch (\\Exception $e) { $response->push(new Frame( payloadData: json_encode([ \'type\' => \'error\', \'message\' => \'消息处理失败: \' . $e->getMessage() ]) )); } } /** * 连接关闭时处理 */ public function onClose($server, int $fd, int $reactorId): void { // 清理连接上下文 $userId = Context::get(\'user_id\'); $this->logConnection(\'close\', $fd, [\'user_id\' => $userId]); // 通知其他用户该用户下线 $this->broadcastUserStatus($server, $userId, \'offline\'); } /** * 处理聊天消息 */ private function handleChatMessage($server, $fd, array $data): void { $message = [ \'type\' => \'chat\', \'from\' => Context::get(\'user_id\'), \'content\' => $data[\'content\'] ?? \'\', \'timestamp\' => time(), \'message_id\' => uniqid() ]; // 如果是私聊 if (isset($data[\'to\'])) { $this->sendToUser($server, $data[\'to\'], $message); } else { // 群发 $this->broadcastMessage($server, $message, [$fd]); } } /** * 广播消息处理 */ private function handleBroadcastMessage($server, array $data): void { $message = [ \'type\' => \'broadcast\', \'content\' => $data[\'content\'] ?? \'\', \'timestamp\' => time(), \'from\' => \'system\' ]; $this->broadcastMessage($server, $message); } /** * 向指定用户发送消息 */ private function sendToUser($server, $userId, array $message): void { // 这里需要实现用户ID到FD的映射 // 实际项目中可以使用Redis等存储映射关系 $targetFd = $this->getFdByUserId($userId); if ($targetFd) { $this->sender->push($targetFd, json_encode($message)); } } /** * 广播消息给所有连接(可选排除某些FD) */ private function broadcastMessage($server, array $message, array $excludeFds = []): void { $messageJson = json_encode($message); foreach ($server->connections as $fd) { if ($server->isEstablished($fd) && !in_array($fd, $excludeFds)) { $this->sender->push($fd, $messageJson); } } } /** * 广播用户状态变化 */ private function broadcastUserStatus($server, $userId, $status): void { if ($userId) { $message = [ \'type\' => \'user_status\', \'user_id\' => $userId, \'status\' => $status, \'timestamp\' => time() ]; $this->broadcastMessage($server, $message); } } /** * 从请求中获取用户ID(示例) */ private function getUserIdFromRequest($request): string { // 实际项目中可以从token、cookie等获取用户信息 return $request->cookie[\'user_id\'] ?? \'guest_\' . $request->fd; } /** * 根据用户ID获取FD(示例) */ private function getFdByUserId($userId): ?int { // 实际项目中需要维护用户ID和FD的映射关系 return null; } /** * 记录连接日志 */ private function logConnection(string $action, int $fd, $data): void { // 实际项目中可以记录到日志文件或数据库 echo sprintf( \"[%s] FD %d %s: %s\\n\", date(\'Y-m-d H:i:s\'), $fd, $action, json_encode($data) ); }}
中间件配置
在 config/autoload/middlewares.php
中配置WebSocket中间件:
[ Hyperf\\Validation\\Middleware\\ValidationMiddleware::class, Hyperf\\Session\\Middleware\\SessionMiddleware::class, ], \'ws\' => [ // WebSocket认证中间件 App\\Middleware\\WebSocketAuthMiddleware::class, // 跨域中间件 Hyperf\\Cors\\Middleware\\CorsMiddleware::class, // 请求日志中间件 App\\Middleware\\WebSocketLogMiddleware::class, ],];
Hyperf WebSocket客户端开发
客户端组件安装
composer require hyperf/websocket-client
客户端工厂类使用
3, \'headers\' => [ \'User-Agent\' => \'Hyperf-WebSocket-Client/1.0\', ], ]; $options = array_merge($defaultOptions, $options); return $this->clientFactory->create($url, true, $options); } /** * 发送消息并获取响应 */ public function sendMessage(string $url, array $message, float $timeout = 2): ?array { $client = $this->createConnection($url); try { // 发送消息 $client->push(json_encode($message)); // 接收响应 /** @var Frame $frame */ $frame = $client->recv($timeout); if ($frame && $frame->data) { return json_decode($frame->data, true); } return null; } finally { $client->close(); } } /** * 保持长连接监听消息 */ public function keepAliveConnection(string $url, callable $messageHandler, array $initialMessage = null): void { $client = $this->createConnection($url, [\'auto_close\' => false]); try { // 发送初始消息 if ($initialMessage) { $client->push(json_encode($initialMessage)); } // 持续监听消息 while (true) { /** @var Frame $frame */ $frame = $client->recv(1); if ($frame && $frame->data) { $data = json_decode($frame->data, true); $messageHandler($data, $client); } // 每秒发送心跳 static $lastPing = 0; if (time() - $lastPing >= 5) { $client->push(json_encode([\'type\' => \'ping\'])); $lastPing = time(); } usleep(100000); // 100ms } } finally { $client->close(); } }}
在HTTP控制器中使用WebSocket客户端
webSocketService->sendMessage($url, [ \'type\' => \'chat\', \'content\' => \'Hello from HTTP client!\', \'timestamp\' => time() ]); return [ \'success\' => true, \'response\' => $response, \'server_time\' => date(\'Y-m-d H:i:s\') ]; } /** * 广播消息示例 */ #[GetMapping(\"broadcast\")] public function broadcastMessage() { $url = \"ws://127.0.0.1:9502\"; $response = $this->webSocketService->sendMessage($url, [ \'type\' => \'broadcast\', \'content\' => \'系统广播消息\', \'from\' => \'admin\', \'timestamp\' => time() ]); return [ \'success\' => true, \'message\' => \'广播消息已发送\', \'response\' => $response ]; }}
高级特性与最佳实践
连接管理与状态维护
redis->hMSet($key, $data); $this->redis->expire($key, 86400); // 24小时过期 // 维护用户ID到FD的映射 if (isset($data[\'user_id\'])) { $userKey = self::USER_MAPPING_PREFIX . $data[\'user_id\']; $this->redis->sAdd($userKey, $fd); $this->redis->expire($userKey, 86400); } } /** * 获取连接信息
【免费下载链接】hyperf 🚀 A coroutine framework that focuses on hyperspeed and flexibility. Building microservice or middleware with ease. 项目地址: https://gitcode.com/hyperf/hyperf
创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考