Spring Boot SSE实战:SseEmitter实现多客户端事件广播与心跳保活
1. 添加依赖 (pom.xml)
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
2. 事件服务 (EventService.java)
import org.springframework.stereotype.Service;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;import java.util.Map;import java.util.UUID;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;@Servicepublic class EventService { // 线程安全的Emitter存储 private final ConcurrentMap emitters = new ConcurrentHashMap(); // 心跳调度器 private final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(); // 事件计数器 private final AtomicInteger eventCounter = new AtomicInteger(0); public EventService() { // 启动心跳任务 (每25秒发送一次) heartbeatExecutor.scheduleAtFixedRate(this::broadcastHeartbeat, 0, 25, TimeUnit.SECONDS); } // 客户端订阅 public SseEmitter subscribe() { String clientId = UUID.randomUUID().toString(); SseEmitter emitter = new SseEmitter(60_000L); // 1分钟超时 // 注册事件处理器 emitter.onCompletion(() -> removeEmitter(clientId)); emitter.onTimeout(() -> { removeEmitter(clientId); emitter.complete(); }); emitter.onError(ex -> removeEmitter(clientId)); emitters.put(clientId, emitter); return emitter; } // 广播事件 public void broadcast(String eventName, Object data) { emitters.forEach((clientId, emitter) -> { try { emitter.send(SseEmitter.event() .id(String.valueOf(eventCounter.incrementAndGet())) .name(eventName) .data(data) ); } catch (IOException | IllegalStateException e) { removeEmitter(clientId); // 发送失败则移除 } }); } // 广播心跳 private void broadcastHeartbeat() { emitters.forEach((clientId, emitter) -> { try { emitter.send(SseEmitter.event() .comment(\"heartbeat\") // 发送注释类型的心跳 ); } catch (Exception ignored) { // 心跳失败不移除,等待超时机制处理 } }); } // 移除客户端 private void removeEmitter(String clientId) { SseEmitter emitter = emitters.remove(clientId); if (emitter != null) { emitter.complete(); } } // 关闭服务 (资源清理) public void shutdown() { // 1. 停止心跳线程 heartbeatExecutor.shutdownNow(); // 2. 
关闭所有连接 emitters.forEach((id, emitter) -> { try { emitter.send(SseEmitter.event() .name(\"system\") .data(Map.of(\"action\", \"shutdown\")) ); } catch (Exception ignored) { } finally { emitter.complete(); } }); // 3. 清空集合 emitters.clear(); }}
3. 控制器 (EventController.java)
import org.springframework.http.MediaType;import org.springframework.web.bind.annotation.*;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;@RestController@RequestMapping(\"/events\")public class EventController { private final EventService eventService; public EventController(EventService eventService) { this.eventService = eventService; } // 客户端订阅入口 @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter subscribe() { return eventService.subscribe(); } // 广播消息入口(这里是模拟消息推送过来,会把该条消息都放入到已订阅的客户端) @PostMapping(\"/broadcast\") public void broadcast(@RequestParam String message) { eventService.broadcast(\"message\", Map.of( \"content\", message, \"timestamp\", System.currentTimeMillis() )); }}
4. 应用配置 (SseApplication.java)
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.ApplicationContext;@SpringBootApplicationpublic class SseApplication { public static void main(String[] args) { ApplicationContext context = SpringApplication.run(SseApplication.class, args); // 注册优雅关闭钩子 Runtime.getRuntime().addShutdownHook(new Thread(() -> { EventService eventService = context.getBean(EventService.class); eventService.shutdown(); System.out.println(\"SSE资源已清理完成\"); })); }}
5. 客户端示例 (JavaScript)
SSE客户端
// Element that receives one <p> per displayed message.
const messageContainer = document.getElementById('messages');
// The current connection; replaced on every (re)connect.
let eventSource;
// Id of the pending reconnect timer, or null when no reconnect is scheduled.
let reconnectTimer = null;

// Opens the SSE connection and installs the event handlers.
function connect() {
  reconnectTimer = null;
  const source = new EventSource('http://localhost:8080/events');
  eventSource = source;

  source.addEventListener('message', (e) => {
    const data = JSON.parse(e.data);
    addMessage(`消息: ${data.content} [${new Date(data.timestamp).toLocaleTimeString()}]`);
  });

  source.addEventListener('system', (e) => {
    const data = JSON.parse(e.data);
    if (data.action === 'shutdown') {
      addMessage('系统通知: 服务即将关闭');
      source.close();
    }
  });

  source.onerror = () => {
    // Close the failed source first. Otherwise the browser's built-in retry keeps
    // it alive next to the manually created one and connections pile up.
    source.close();
    if (reconnectTimer !== null) {
      return; // a reconnect is already scheduled
    }
    addMessage('连接错误,3秒后重连...');
    reconnectTimer = setTimeout(connect, 3000);
  };
}

// Appends one line of text to the message container and scrolls to the bottom.
function addMessage(text) {
  const p = document.createElement('p');
  p.textContent = text;
  messageContainer.appendChild(p);
  messageContainer.scrollTop = messageContainer.scrollHeight;
}

// Initial connection
connect();
关键机制说明
-
心跳机制:
-
每25秒发送一次注释类型的心跳(`:heartbeat`)
-
防止代理或负载均衡器关闭空闲连接
-
心跳以SSE注释行发送,浏览器的 EventSource 会忽略注释行、不会触发任何事件;如果客户端需要感知心跳,应改为发送具名事件(例如 `heartbeat`)并监听该事件
-
-
关闭流程:
-
客户端重连:
-
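每个事件都带有ID,EventSource 自动重连时会通过 `Last-Event-ID` 请求头带回最后收到的ID;要真正实现断线续传,服务端还需要读取该请求头并补发遗漏的事件(本示例的 subscribe() 尚未处理)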
-
客户端错误时自动重连
-
服务端关闭时发送系统通知
-