NodeJS搭建SSE接口服务
下面是一个基于Node.js的SSE(Server-Sent Events)接口服务的实现。这个示例使用Express框架搭建服务器,支持客户端订阅事件流并实时接收服务器推送的数据。
const express = require(\'express\');const cors = require(\'cors\');const { v4: uuidv4 } = require(\'uuid\');const app = express();const PORT = 3000;// 中间件app.use(cors());app.use(express.json());// 存储客户端连接const clients = new Map();// SSE 接口app.get(\'/sse\', (req, res) => { // 设置 SSE 响应头 res.setHeader(\'Content-Type\', \'text/event-stream\'); res.setHeader(\'Cache-Control\', \'no-cache\'); res.setHeader(\'Connection\', \'keep-alive\'); res.flushHeaders(); // 生成客户端ID并存储连接 const clientId = uuidv4(); const client = { id: clientId, res }; clients.set(clientId, client); // 发送初始消息 sendEvent(clientId, { type: \'connected\', data: { clientId } }); // 处理客户端断开连接 req.on(\'close\', () => { clients.delete(clientId); console.log(`Client ${clientId} disconnected`); });});// 向特定客户端发送事件function sendEvent(clientId, event) { const client = clients.get(clientId); if (client) { client.res.write(`data: ${JSON.stringify(event)}\\n\\n`); }}// 向所有客户端广播事件function broadcastEvent(event) { clients.forEach(client => { client.res.write(`data: ${JSON.stringify(event)}\\n\\n`); });}// 模拟定时推送数据setInterval(() => { broadcastEvent({ type: \'update\', data: { timestamp: new Date().toISOString(), message: \'定时更新数据\' } });}, 5000);// 手动触发推送接口app.post(\'/trigger\', (req, res) => { const { message } = req.body; broadcastEvent({ type: \'manual\', data: { timestamp: new Date().toISOString(), message } }); res.json({ success: true });});// 启动服务器app.listen(PORT, () => { console.log(`Server running on port ${PORT}`);});
代码说明:
-
依赖安装:
npm install express cors uuid
-
核心功能:
- SSE连接管理:使用
Map
存储所有客户端连接,每个连接有唯一ID。 - 事件发送:
sendEvent(clientId, event)
:向特定客户端发送事件。broadcastEvent(event)
:向所有客户端广播事件。
- 响应头设置:
Content-Type: text/event-streamCache-Control: no-cacheConnection: keep-alive
- SSE连接管理:使用
-
接口说明:
GET /sse
:客户端订阅SSE流的入口。POST /trigger
:手动触发向所有客户端推送消息(需提供message
字段)。
-
客户端示例:
const eventSource = new EventSource(\'http://localhost:3000/sse\');eventSource.onmessage = (event) => { const data = JSON.parse(event.data); console.log(\'收到消息:\', data);};eventSource.onerror = (error) => { console.error(\'SSE连接错误:\', error);};
特性与扩展:
- 心跳机制:可添加定期发送心跳消息以保持连接。
- 错误处理:包含基本的连接断开处理。
- 事件类型:支持不同类型的事件(
connected
,update
,manual
)。 - 安全增强:可添加身份验证、权限控制等。
这个实现提供了SSE服务的基础框架,你可以根据具体需求扩展其功能,如添加用户认证、事件过滤、持久化存储等。