Streaming Output from a Java Backend HTTP Endpoint to the Frontend


Contents

  • Prerequisites
  • Streaming output from the backend
    • Using Spring Boot SseEmitter
    • Using Spring Boot WebFlux
    • Using a servlet
  • Extensions

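Prerequisites

Explanation:
Server-Sent Events (SSE) is a lightweight protocol built on HTTP. Once the client has opened the connection, the server can push text data (JSON, plain text, and so on) to it at any time.
Characteristics:

  • One-way communication: server → client only
  • Based on HTTP/HTTPS: no special protocol is needed
  • Automatic reconnection: the browser reconnects on its own after a dropped connection
  • Easy to use: the frontend uses the EventSource API directly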
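
To make the text format concrete, here is a minimal sketch of how one event looks on the wire: an optional `event:` line, one `data:` line per payload line, and a blank line that ends the event. The class name `SseFrames` and its `format` method are hypothetical helpers written for illustration, not part of any library, and the sketch assumes payload lines are separated by `\n`.

```java
// Hypothetical helper (not from any library): builds one SSE event as text.
final class SseFrames {
    private SseFrames() {}

    /**
     * Formats a single event. If eventName is null or empty, no "event:" line
     * is written and browsers deliver it as the default "message" event.
     */
    static String format(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null && !eventName.isEmpty()) {
            sb.append("event: ").append(eventName).append('\n');
        }
        // A payload containing line breaks becomes one "data:" line per payload line.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        sb.append('\n'); // the blank line terminates the event
        return sb.toString();
    }
}
```

For example, `SseFrames.format(null, "hi")` yields `data: hi` followed by a blank line, which is the smallest complete event a browser will accept.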

Flow:

Frontend (Vue) → opens SSE request → Spring Boot (Controller)
      ↓                                      ↓
(EventSource)  ←  streamed data  ←  WebClient → LLM API (streaming HTTP)

Streaming output from the backend

Using Spring Boot SseEmitter

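Small-scale streaming in an existing project → SseEmitter (low cost of change)

Lightweight SSE push, compatible with existing Spring MVC projects.
Each open stream ties up a worker thread for as long as it is producing data, so resource consumption grows quickly under high concurrency.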
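
One way to keep that per-stream thread cost under control is to run all streams on a single bounded pool, so that excess requests are rejected instead of spawning more threads. The sketch below is only an illustration under that assumption: `StreamExecutors.bounded` is a hypothetical helper, and the sizes passed to it would need tuning for a real service.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a pool that caps how many streams can run at once.
final class StreamExecutors {
    private StreamExecutors() {}

    /**
     * At most maxStreams tasks run concurrently and queueCapacity more may wait.
     * Anything beyond that is rejected with RejectedExecutionException.
     */
    static ThreadPoolExecutor bounded(int maxStreams, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxStreams, maxStreams,
                30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
        pool.allowCoreThreadTimeOut(true); // let idle threads exit when traffic drops
        return pool;
    }
}
```

A controller could hold one such pool in a field and, when `execute` throws `RejectedExecutionException`, complete the emitter with an error rather than queueing indefinitely.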

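import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class StreamingController {

    @GetMapping("/stream")
    public SseEmitter streamText() {
        SseEmitter emitter = new SseEmitter(60_000L); // timeout in milliseconds
        // One executor per request keeps the demo short; it is shut down in finally.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    // SseEmitter adds the SSE framing, so no trailing newline is needed.
                    String data = "This is chunk " + i;
                    emitter.send(SseEmitter.event().name("message").data(data));
                    Thread.sleep(1000); // simulate latency
                }
                emitter.complete(); // end the stream
            } catch (IOException | InterruptedException e) {
                emitter.completeWithError(e);
            } finally {
                executor.shutdown();
            }
        });
        return emitter;
    }
}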

The handler can also call the LLM API itself and stream the response on to the frontend:

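import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class ModelStreamController {

    // Thread pool shared by all requests
    private final ExecutorService executor = Executors.newCachedThreadPool();

    @GetMapping("/model-stream")
    public SseEmitter streamModel() {
        // Create the SSE emitter (60-second timeout)
        SseEmitter emitter = new SseEmitter(60_000L);
        // Submit the work to the thread pool
        executor.execute(() -> {
            // 1. Build the request to the LLM API
            HttpGet request = new HttpGet("https://api.open-model.com/stream");
            try (CloseableHttpClient httpClient = HttpClients.createDefault();
                 // 2. Execute the request (the response is closed automatically)
                 CloseableHttpResponse response = httpClient.execute(request)) {
                // 3. Check the response status
                int status = response.getStatusLine().getStatusCode();
                if (status != 200) {
                    throw new RuntimeException("API error: " + status);
                }
                // 4. Open the response stream
                try (InputStream is = response.getEntity().getContent();
                     BufferedReader reader = new BufferedReader(
                             new InputStreamReader(is, StandardCharsets.UTF_8))) {
                    String line;
                    // 5. Read line by line and forward
                    while ((line = reader.readLine()) != null) {
                        if (line.isEmpty()) {
                            continue; // assumption: blank lines only separate upstream events
                        }
                        // 6. Send to the frontend
                        emitter.send(line);
                    }
                    // 7. Close the SSE stream once the upstream ends
                    emitter.complete();
                }
            } catch (Exception e) {
                // 8. Error handling
                emitter.completeWithError(e);
            }
        });
        return emitter;
    }
}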
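
When the upstream API itself speaks SSE, each line read from it still carries its `data:` prefix, and passing that line to `emitter.send` wraps it a second time. A possible way around this is to extract the payload of each upstream event before re-emitting it. The sketch below assumes the upstream sends standard `data:` lines and ends with the `[DONE]` marker used elsewhere in this article; `UpstreamSseReader` and `PayloadSink` are hypothetical names introduced for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;

// Hypothetical helper: pulls event payloads out of an upstream SSE stream.
final class UpstreamSseReader {
    private UpstreamSseReader() {}

    /** Callback that may throw IOException, e.g. payload -> emitter.send(payload). */
    interface PayloadSink {
        void accept(String payload) throws IOException;
    }

    /** Forwards each event's payload to sink; stops early at the "[DONE]" marker. */
    static void forward(BufferedReader reader, PayloadSink sink) throws IOException {
        StringBuilder current = new StringBuilder();
        boolean hasData = false;
        String line;
        while ((line = reader.readLine()) != null) {
            if (line.isEmpty()) {
                // A blank line marks the end of one event.
                if (hasData) {
                    String payload = current.toString();
                    if ("[DONE]".equals(payload)) {
                        return;
                    }
                    sink.accept(payload);
                }
                current.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                if (value.startsWith(" ")) {
                    value = value.substring(1); // drop the single optional space
                }
                if (hasData) {
                    current.append('\n'); // multi-line payloads are joined with \n
                }
                current.append(value);
                hasData = true;
            }
            // Other fields (event:, id:, retry:) and ":" comments are ignored here.
        }
        // An event without its closing blank line is incomplete and is dropped.
    }
}
```

Inside the worker task, the read loop could then become `UpstreamSseReader.forward(reader, emitter::send)` followed by `emitter.complete()`.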

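Using Spring Boot WebFlux

New high-concurrency services, or proxying an external streaming API → Flux + WebFlux (better performance and scalability)

Reactive programming (Reactive Streams).
Requests are not tied to a dedicated thread: a small number of event-loop threads serve many connections, so high concurrency costs far fewer resources.
Non-blocking end to end, which suits proxying an external streaming API.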

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

The following example forwards the LLM's streaming response:

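import java.time.Duration;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

@RestController
public class WebFluxController {

    // Reuse one WebClient instead of creating a new one per request
    private final WebClient webClient = WebClient.create("https://api.open-model.com/stream");

    @GetMapping(value = "/flux-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> stream() {
        // Forward the LLM's streaming response directly (non-blocking)
        return webClient.get()
                .retrieve()
                .bodyToFlux(String.class)
                // No manual "data: ...\n\n" wrapping here: with text/event-stream,
                // Spring already writes each element as an SSE data event.
                .delayElements(Duration.ofMillis(100)); // non-blocking delay
    }
}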

Using a servlet

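// Needs java.io.IOException, java.io.OutputStream, java.nio.charset.StandardCharsets
// and the servlet API (javax.servlet or jakarta.servlet, depending on the container).
protected void doGet(HttpServletRequest request, HttpServletResponse response)
        throws IOException {
    // Set the response headers and declare the content type as SSE
    response.setContentType("text/event-stream");
    response.setCharacterEncoding("UTF-8");
    response.setHeader("Cache-Control", "no-cache");
    response.setHeader("Connection", "keep-alive");

    // try-with-resources closes the stream on success and on failure,
    // which tells the frontend that the response has ended
    try (OutputStream outputStream = response.getOutputStream()) {
        // Simulate streaming data
        for (int i = 0; i < 5; i++) {
            String data = "event: message\ndata: {" + i + "}\n\n";
            outputStream.write(data.getBytes(StandardCharsets.UTF_8)); // write
            outputStream.flush(); // flush after every event so it is sent immediately
            Thread.sleep(1000);   // simulate latency
        }
        // Send a custom end marker
        String endEvent = "data: [DONE]\n\n";
        outputStream.write(endEvent.getBytes(StandardCharsets.UTF_8));
        outputStream.flush();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
    } catch (IOException e) {
        e.printStackTrace(); // e.g. the client disconnected mid-stream
    }
}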

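What if your service sits in front of an LLM API that already streams its output? Read the upstream stream and write it straight through to the frontend: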

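import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Use jakarta.servlet.* instead on containers that implement Servlet 5 or later
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

@WebServlet("/proxy-stream")
public class StreamingProxyServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        // Set the response headers
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("UTF-8");
        response.setHeader("Cache-Control", "no-cache");
        response.setHeader("Connection", "keep-alive");

        // Build the JSON request body
        String jsonBody = "{\"param1\": \"value1\", \"param2\": \"value2\"}";

        // Create the HttpClient instance
        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
            HttpPost httpPost = new HttpPost("https://third-party.com/stream"); // replace with your streaming endpoint
            // Set the request header
            httpPost.setHeader("Content-Type", "application/json; charset=UTF-8");
            // Set the request body
            httpPost.setEntity(new StringEntity(jsonBody, StandardCharsets.UTF_8));

            // Execute the request
            try (CloseableHttpResponse backendResponse = httpClient.execute(httpPost)) {
                // Check the upstream status code
                int statusCode = backendResponse.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    response.sendError(HttpServletResponse.SC_BAD_GATEWAY,
                            "Upstream returned status code: " + statusCode);
                    return;
                }
                // Input stream of the upstream streaming endpoint
                InputStream backendStream = backendResponse.getEntity().getContent();
                // Output stream of the response to the frontend
                OutputStream frontendStream = response.getOutputStream();
                byte[] buffer = new byte[4096];
                int bytesRead;
                // Read and write chunk by chunk, passing data through in real time
                while ((bytesRead = backendStream.read(buffer)) != -1) {
                    frontendStream.write(buffer, 0, bytesRead);
                    frontendStream.flush(); // flush immediately so data reaches the frontend in real time
                }
                // End the streaming connection
                frontendStream.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
            // Once data has been flushed the response is committed and sendError would throw
            if (!response.isCommitted()) {
                response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                        "Streaming failed: " + e.getMessage());
            }
        }
    }
}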

Extensions

Custom event types:

Backend sends: event: update\ndata: {…}\n\n

Frontend listens on its EventSource instance: .addEventListener("update", handler)
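
The same dispatch-by-name behaviour can be sketched in Java, for instance for a non-browser consumer or a test client. This is only an illustration of how the `event:` field selects a handler: `SseDispatcher` is a hypothetical class, its `addEventListener` merely mirrors the browser method name, and events without an `event:` line are assumed to fall back to the default type `message`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical consumer-side sketch: routes events to handlers by event name.
final class SseDispatcher {
    private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();
    private final StringBuilder data = new StringBuilder();
    private String eventName = "message"; // default type when no "event:" line is present
    private boolean hasData = false;

    void addEventListener(String name, Consumer<String> handler) {
        listeners.computeIfAbsent(name, k -> new ArrayList<>()).add(handler);
    }

    /** Feeds one line of the stream, without its line terminator. */
    void feed(String line) {
        if (line.isEmpty()) {
            // Blank line: the event is complete, so deliver it and reset the state.
            if (hasData) {
                String payload = data.toString();
                for (Consumer<String> handler : listeners.getOrDefault(eventName, List.of())) {
                    handler.accept(payload);
                }
            }
            eventName = "message";
            data.setLength(0);
            hasData = false;
        } else if (line.startsWith("event:")) {
            eventName = stripLeadingSpace(line.substring(6));
        } else if (line.startsWith("data:")) {
            if (hasData) {
                data.append('\n');
            }
            data.append(stripLeadingSpace(line.substring(5)));
            hasData = true;
        }
    }

    private static String stripLeadingSpace(String s) {
        return s.startsWith(" ") ? s.substring(1) : s;
    }
}
```

A handler registered for `update` only sees events that carry `event: update`; everything else goes to the `message` handlers, if any are registered.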