> 技术文档 > 流式接口,断点续传解决方案及实现

流式接口,断点续传解决方案及实现

背景:用户刷新页面或者切换tab页后断链的续流问题

目前有两种方案:

方案一:后端:Mongo + 实时存 (数据库压力大,每个流都要进行一次入库操作)    前端轮询

方案二:后端:Redis + Mongo (推荐, 流的过程中使用Redis,流数据结束后再去入一次库)

              前端:调新接口

方案一实现:

public SseEmitter sendinfo (QuestionnaireDTO dto) { Flux flux = algorithmUtils.code_bswj(dto, webClient); flux .doOnError(e -> {  try { completeEmitter(emitter, e, isCompleted);  } catch (IOException ex) { throw new RuntimeException(ex);  } })// 处理客户端断开连接 .doOnComplete(() -> {  try { completeEmitter(emitter, null, isCompleted);  } catch (IOException e) { throw new RuntimeException(e);  } }) // 传null表示正常完成 .subscribe(data -> {  try { sendDataToEmitter(emitter, data, isCompleted);  } catch (IOException e) { throw new RuntimeException(e);  } });// 订阅 Flux 并发送数据到 SseEmitter return emitter;}private void sendDataToEmitter(SseEmitter emitter, String data, AtomicBoolean isCompleted) throws IOException { if (!isCompleted.get()) { try { processChunk(data, questionnaire,isCompleted, traceId); } catch (IOException | JSONException e) { if (e instanceof ClientAbortException) {  this.emitter = new SseEmitterUTF8(1000000L);  return; } completeEmitter(emitter, e, isCompleted,questionnaire); // 处理发送过程中的异常 } } }private void processChunk( String chunk, Questionnaire questionnaire, AtomicBoolean isCompleted) throws IOException, JSONException { System.out.println(\"chunk handing before =========>:\" + chunk); if (!StringUtils.hasText(chunk)) return; JSONObject jsonObject = new JSONObject(chunk); if (!jsonObject.getBoolean(\"is_success\")) { emitter.send(jsonObject.getString(\"err_msg\")); return; } String result = jsonObject.getString(\"results\"); JSONObject jsonObjectRes = new JSONObject(result); String type = jsonObjectRes.getString(\"type\");//answer String value = jsonObjectRes.getString(\"value\"); JSONObject response = new JSONObject(); JSONObject results = new JSONObject(); if (type.equals(\"progress_indicator\")) { JSONObject valueJson = new JSONObject(value); String text = valueJson.getString(\"text\"); if (text.equals(\"[DONE]\")) { questionnaireRepository.save(questionnaire); completeEmitter(emitter, null, isCompleted, questionnaire); return; } results.put(\"type\", \"progress_indicator\"); results.put(\"data\", text); if (isBase) { String progressBase = questionnaire.getProgressBase(); progressBase = progressBase == null ? \"\" : progressBase; progressBase += text; questionnaire.setProgressBase(progressBase); } else { String progressCustom = questionnaire.getProgressCustom(); progressCustom = progressCustom == null ? \"\" : progressCustom; progressCustom += text; questionnaire.setProgressCustom(progressCustom); } questionnaireRepository.save(questionnaire); } else if (type.equals(\"survey\")) { JSONObject valueJson = new JSONObject(value); String customSurvey = valueJson.getString(\"custom_survey\"); questionnaire.setCustomQuestion(customSurvey); questionnaireRepository.save(questionnaire); results.put(\"type\", \"survey\"); results.put(\"data\", customSurvey); }else if (type.equals(\"finished_thinking\")) { JSONObject valueJson = new JSONObject(value); String text = valueJson.getString(\"text\"); results.put(\"type\", \"finished_thinking\"); results.put(\"data\", text); if (isBase) { questionnaire.setStatus(\"SurveyGenerating\"); }else { questionnaire.setStatus(\"MapGenerating\"); } questionnaireRepository.save(questionnaire); } response.put(\"results\", results); if (!isCompleted.get()){ System.out.println(\"后端发送给前端时间:\" + TimeUtil.getCurrentTime()); emitter.send(response.toString()); } }

通过实时插入数据库,前端感知到刷新或者切换tab后,根据状态轮询调用历史记录接口,因为此时后端与算法的流还没有断开,所以是在实时保存的。前端此时轮询历史接口可以伪造出流式输出的形式,使用户无感知。

方案二实现:(代码几乎同上,就是在处理流式的方法里做了改动)

private void processChunk( String chunk, Questionnaire questionnaire, AtomicBoolean isCompleted, String traceId) throws IOException, JSONException { System.out.println(\"chunk handing before =========>:\" + chunk); if (!StringUtils.hasText(chunk)) return; JSONObject jsonObject = new JSONObject(chunk); if (!jsonObject.getBoolean(\"is_success\")) { emitter.send(jsonObject.getString(\"err_msg\")); return; } String result = jsonObject.getString(\"results\"); JSONObject jsonObjectRes = new JSONObject(result); String type = jsonObjectRes.getString(\"type\");//answer String value = jsonObjectRes.getString(\"value\"); JSONObject response = new JSONObject(); JSONObject results = new JSONObject(); if (type.equals(\"progress_indicator\")) { JSONObject valueJson = new JSONObject(value); String text = valueJson.getString(\"text\"); if (text.equals(\"[DONE]\")) { questionnaireRepository.save(questionnaire); //设置redis中key为questionnaire.getId()的过期时间为5分钟,目前redis中是有这个key的// redisService.expire(questionnaire.getId(), 1); completeEmitter(emitter, null, isCompleted, questionnaire); return; } results.put(\"type\", \"progress_indicator\"); results.put(\"data\", text); if (isBase) { String progressBase = questionnaire.getProgressBase(); progressBase = progressBase == null ? \"\" : progressBase; progressBase += text; questionnaire.setProgressBase(progressBase); } else { String progressCustom = questionnaire.getProgressCustom(); progressCustom = progressCustom == null ? \"\" : progressCustom; progressCustom += text; questionnaire.setProgressCustom(progressCustom); } } else if (type.equals(\"survey\")) { JSONObject valueJson = new JSONObject(value); String customSurvey = valueJson.getString(\"custom_survey\"); questionnaire.setCustomQuestion(customSurvey); questionnaireRepository.save(questionnaire); accompanyLearningLog.uploadLogByTranceId(\"processChunk\", \"后端处理定制问卷\", \"INFO\", JsonUtil.object2Json(customSurvey),traceId); results.put(\"type\", \"survey\"); results.put(\"data\", customSurvey); } else if (type.equals(\"text\")) { JSONObject valueJson = new JSONObject(value); String text = valueJson.getString(\"text\"); results.put(\"type\", \"text\"); results.put(\"data\", text); questionnaire.setPlanCustom(text); accompanyLearningLog.uploadLogByTranceId(\"processChunk\", \"后端处理多链路地图\", \"INFO\", JsonUtil.object2Json(text),traceId); questionnaireRepository.save(questionnaire); }else if (type.equals(\"finished_thinking\")) { JSONObject valueJson = new JSONObject(value); String text = valueJson.getString(\"text\"); results.put(\"type\", \"finished_thinking\"); results.put(\"data\", text); if (isBase) { questionnaire.setStatus(\"SurveyGenerating\"); }else { questionnaire.setStatus(\"MapGenerating\"); } questionnaireRepository.save(questionnaire); } response.put(\"results\", results); //这里有了一个新的逻辑,把思维链保存的redis的队列中,然后如果前端断开连接,想要做断点续传,就从队列中取出,然后继续,队列的key是questionnaire.getId() // 把思维链保存到 Redis 队列中 redisService.rightPush(questionnaire.getId(), response.toString()); if (!isCompleted.get()){ System.out.println(\"后端发送给前端时间:\" + TimeUtil.getCurrentTime()); emitter.send(response.toString()); } }

前端刷新页面或者来回切换了tab后会调用我的新接口,也是一个流式输出的接口

 /** * 断点续传方法,用于从 Redis 中获取数据并通过 SseEmitter 发送给前端。 * 如果数据库中有符合条件的数据则直接处理,否则从 Redis 中获取数据。 */ @Override public SseEmitter refreshStream(QuestionnaireDTO dto, String userId, String traceId) { // 创建一个 SseEmitter 对象,设置超时时间为 1000000 毫秒 SseEmitter emitter = new SseEmitterUTF8(1000000L); // 先从数据库取数据,如果取到了就不查redis了 /*Questionnaire questionnaire = questionnaireRepository.findByUserId(userId); if (questionnaire != null) { try { if(questionnaire.getStatus().equals(\"baseCompleted\")||questionnaire.getStatus().equals(\"customCompleted\")){  // 若问卷状态为 baseCompleted 或 customCompleted,发送 [DONE] 并完成 SseEmitter  emitter.send(\"[DONE]\");  emitter.complete(); } return emitter; } catch (Exception e) { // 处理异常,将错误信息记录到日志并完成 SseEmitter emitter.completeWithError(e); accompanyLearningLog.uploadLogByTranceId(\"refreshStream\", \"从数据库获取数据发送失败\", \"ERROR\", JsonUtil.object2Json(e), traceId); return emitter; } }*/ // 获取 Redis 中数据列表的键 String key = dto.getId(); // 从redis中取key为dto.getId()的列表的长度 Long listLength = redisService.size(key); // 若 Redis 列表长度不为空且大于 0 if (listLength != null && listLength > 0) { // 记录这个长度 final long[] initialLength = {listLength}; // 从redis中取列表中的数据 List redisDataList = redisService.range(key, 0, initialLength[0] - 1); // 用于拼接 type 为 progress_indicator 的 data 数据 StringBuilder progressData = new StringBuilder(); // 标记 SseEmitter 是否已经完成 AtomicBoolean isEmitterCompleted = new AtomicBoolean(false); // 遍历 Redis 数据列表 for (Object data : redisDataList) { try {  // 将数据转换为 JSONObject  JSONObject jsonData = new JSONObject(data.toString());  // 获取 results 字段  JSONObject results = jsonData.getJSONObject(\"results\");  // 获取 type 字段  String type = results.getString(\"type\");  if(\"generate_indicator\".equals(type)){ // 若 type 为 generate_indicator,直接发送数据 emitter.send(data.toString());  }else if (\"progress_indicator\".equals(type)) { // 若 type 为 progress_indicator,拼接 data 数据 progressData.append(results.getString(\"data\"));  } else { // 若 SseEmitter 未完成,发送非 progress_indicator 类型的数据 if (!isEmitterCompleted.get()) { System.out.println(\"data:\" + data.toString()); emitter.send(data.toString()); }  } } catch (JSONException e) { }catch (IOException e) {  // 处理发送数据时的 IO 异常,完成 SseEmitter 并记录日志  emitter.completeWithError(e); } } // 以{\"results\":{\"type\":\"progress_indicator\",\"data\":\"所有的data\"}}格式推给前端 try { // 若 progressData 不为空 if (progressData.length() > 0) {  // 创建响应 JSON 对象  JSONObject response = new JSONObject();  JSONObject results = new JSONObject();  results.put(\"type\", \"progress_indicator\");  results.put(\"data\", progressData.toString());  response.put(\"results\", results);  System.out.println(\"response:\"+response.toString());  // 若 SseEmitter 未完成,发送拼接后的 progress_indicator 数据  if (!isEmitterCompleted.get()) { emitter.send(response.toString());  } } // 从记录的长度开始,后面的就不需要拼接了,直接取到后推给前端就可以了 // 使用数组包装 subscription final Disposable[] subscription = new Disposable[1]; // 每秒检查一次 Redis 列表长度是否有变化 subscription[0] = Flux.interval(Duration.ofSeconds(1)) .subscribe(interval -> { // 获取当前 Redis 列表长度 Long currentLength = redisService.size(key); if (currentLength != null && currentLength > initialLength[0]) { // 获取新增的数据 List newDataList = redisService.range(key, initialLength[0], currentLength - 1); for (Object newData : newDataList) {  try {  if(newData.toString().equals(\"[DONE]\")){// 若数据为 [DONE],发送数据,完成 SseEmitter,取消订阅并删除 Redis 键if (!isEmitterCompleted.get()) { emitter.send(newData.toString()); emitter.complete(); isEmitterCompleted.set(true); // 取消订阅 if (subscription[0] != null) { subscription[0].dispose(); } redisService.delete(key);}return;  }  System.out.println(\"newData:\"+newData.toString());  // 若 SseEmitter 未完成,发送新增数据  if (!isEmitterCompleted.get()) {emitter.send(newData.toString());try { // 线程休眠 50 毫秒 Thread.sleep(50);} catch (InterruptedException e) { // 处理线程休眠中断异常,完成 SseEmitter Thread.currentThread().interrupt(); emitter.completeWithError(e); }  }  } catch (IOException e) {  emitter.completeWithError(e); } } // 更新初始长度 initialLength[0] = currentLength; } }); } catch (Exception e) { // 处理异常,完成 SseEmitter 并记录日志 emitter.completeWithError(e); } } else { try { // 若 Redis 列表长度为空或为 0,完成 SseEmitter emitter.complete(); } catch (Exception e) { // 处理异常,完成 SseEmitter 并记录日志 emitter.completeWithError(e); } } return emitter; }