> 技术文档 > uniapp在微信小程序中实现 SSE 流式响应_uniapp sse

uniapp在微信小程序中实现 SSE 流式响应_uniapp sse


前言

最近需要使用uniapp开发一个智能对话页面,其中就需要使用SSE进行通信。

本文介绍下在uniapp中如何基于uni.request实现SSE流式处理。

在线体验

#小程序:yinuosnowball

SSE传输格式

返回输出的流式块:

  1. Content-Type为text/event-stream

  2. 每个流式块均为 data: 开头,块之间以 \\n\\n 即两个换行符分隔, 如下所示:
    uniapp在微信小程序中实现 SSE 流式响应_uniapp sse

  3. 后端接口定义的数据如下: - event为message,开始接收数据,answer为返回的结果 - event为message_end结束

uniapp在微信小程序中实现 SSE 流式响应_uniapp sse

接口数据已约定完成,下一步使用uniapp开始接收处理数据。

uniapp处理数据

客户端实现在微信小程序中接收 SSE 流式响应,需要以下步骤:

  1. 配置 HTTP 请求:设置适当的请求头和参数,以确保服务器返回流式响应。
  2. 处理分块数据:由于SSE是分块传输的,我们需要监听每个数据块,并解析它们。
  3. 错误处理:当每一次返回的最后出现不是完整的响应时,需要进行特殊处理。
  4. 完成时:可以进行追问等额外处理

下面使用 uni.request实现SSE的案例:

基本框架:

const requestTask = uni.request({ url, method: \'POST\', header: { Accept: \'text/event-stream\', Authorization, }, data, enableChunked: true, responseType: \'arraybuffer\', success: (res) => { console.log(\'Data received 数据接受完毕:\', res.data) }, fail: (error) => { console.log(\'打印***error 错误处理\', error) }, complete: (complete) => { console.log(\'打印***complete 完成接收\', complete) }})requestTask.onChunkReceived((res)=>{ // 处理数据})

通过对requestTaskonChunkReceived监听就可以得到数据块,通过打印我们可以看到数据返回是ArrayBuffer,我们需要进行处理。

uniapp在微信小程序中实现 SSE 流式响应_uniapp sse

 const uint8Array = new Uint8Array(res.data); let text = String.fromCharCode.apply(null, uint8Array);

解析后得到以data:data:返回的格式,

此处需要注意解析后是data: 还是 data:data:格式

uniapp在微信小程序中实现 SSE 流式响应_uniapp sse

进一步处理:由于返回的数据块不是一段一段,而是很多段都返回,因此我们需要进行\\n\\n进行拆分,然后逐个解析:

uniapp在微信小程序中实现 SSE 流式响应_uniapp sse

const arr = text.split(\'\\n\\n\').filter(Boolean)arr.forEach(msg => { const jsonStr = msg.substring(11); // 去掉 \'data:data: \' 前缀 const data = JSON.parse(jsonStr); switch (data.event) { case \'message\': { // 拼接返回文本 this.dialogueList[existingMessageIndex].answer += data.answer; break; } case \'message_end\': // 消息结束 break; }});

至此,我们就可以接收到消息,如果就这样那就最好,但对接的过程发现,每一次返回的文本最后一段不是完整的,导致解析出现失败,如下图所示:

uniapp在微信小程序中实现 SSE 流式响应_uniapp sse

解决方案是: 定义一个变量,当解析出现失败时,肯定是最后一段,进行存储,下一次接收到数据将上一次存储的进行拼接,然后解析:

uniapp在微信小程序中实现 SSE 流式响应_uniapp sse

具体代码:

// 每次发送存储数据const msgObj = { query, answer: \"\", conversationId: null, isDone: false}this.dialogueList.push(msgObj)// 保存上一次失败的textlet lastText = \'\'requestTask.onChunkReceived((res) => { // 第一步:获取 字符串 数组 const uint8Array = new Uint8Array(res.data); let text = lastText + String.fromCharCode.apply(null, uint8Array); lastText = \'\'; let arr = text.split(\'\\n\\n\').filter(Boolean) let lastIndex = arr.length - 1 // 第二步:是否可以直接进行解析 try { // 判断是否可以全部解析完成 arr.every(item => JSON.parse(item.substring(11))) } catch (error) { // 如果报错截取最后一项 lastText = arr[lastIndex] arr = arr.filter((_, i) => i !== lastIndex) } // 处理数据块 if (arr.length) { try { arr.forEach(msg => { const jsonStr = msg.substring(11); // 去掉 \'data: \' 前缀 const data = JSON.parse(jsonStr); const existingMessageIndex = this.dialogueList.findIndex(item => item === msgObj); switch (data.event) { case \'message\': { // 查找是否存在相同ID的消息 this.dialogueList[existingMessageIndex].answer += data.answer; break; }  case \'message_end\': // 消息结束 break; } }); } catch (error) { console.error(\'解析数据失败:\', error); } }});

这样就完成了对不连续返回的错误处理。

如果需要直接结束请求,可以直接使用requestTask.abort()

完整代码

function sendMsg(query) { const msgObj = { query, // 问题 answer: \"\", // 回答的结果 conversationId: null, feedback: null, isDone: false // 自定义格式,用于加载处理 } this.dialogueList.push(msgObj) // 请求参数 const data = {} this.requestTask = uni.request({ url, method: \'POST\', header: { Accept: \'text/event-stream\', Authorization: getStorage(tokenKeyEnum.zhonglv), }, data, enableChunked: true, responseType: \'arraybuffer\', success: (res) => { console.log(\'Data received 数据接受完毕:\', res.data) }, fail: (error) => { console.log(\'打印***error 错误处理\', error) }, complete: (complete) => { console.log(\'打印***complete 完成接收\', complete) } }) let lastText = \'\' this.requestTask.onChunkReceived((res) => { // 第一步:获取 字符串 数组 const uint8Array = new Uint8Array(res.data); let text = lastText + String.fromCharCode.apply(null, uint8Array); lastText = \'\'; let arr = text.split(\'\\n\\n\').filter(Boolean) let lastIndex = arr.length - 1 // 第二步:是否可以直接进行解析 try { let isCanResolve = arr.every(item => JSON.parse(item.substring(11))) console.log(\'打印***isCanResolve\', isCanResolve) } catch (error) { // 如果报错截取最后一项 lastText = arr[lastIndex] arr = arr.filter((_, i) => i !== lastIndex) console.log(\'打印***error\', error) } // 处理数据块 if (arr.length) { try { arr.forEach(msg => { const jsonStr = msg.substring(11); // 去掉 \'data:data: \' 前缀 const data = JSON.parse(jsonStr); const existingMessageIndex = this.dialogueList.findIndex(item => item === msgObj); switch (data.event) { case \'message\': {  // 查找是否存在相同ID的消息  this.dialogueList[existingMessageIndex].answer += data.answer;  break; } case \'message_end\':  // 消息结束  this.dialogueList[existingMessageIndex].isDone = true;  break; } }); } catch (error) { console.error(\'解析数据失败:\', error); } } });}

总结

最后总结一下,在uniapp中使用uni.request处理流式响应,主要步骤有:

  • 开启:enableChunked: true
  • 设置请求Header:Accept: \'text/event-stream\'
  • 注册数据接收响应函数: requestTask.onChunkReceived(onChunkReceived)
  • 分块数据解析 String.fromCharCode
  • 处理不连续返回问题
  • 结束:requestTask.abort()

希望对你有所帮助,如有错误,请指正 O^O!


参考文档

  • uni.request文档: uniapp.dcloud.net.cn/api/request…