> 技术文档 > 使用 UniApp 在微信小程序中实现 SSE 流式响应_uniapp sse

使用 UniApp 在微信小程序中实现 SSE 流式响应_uniapp sse

在这里插入图片描述

概述

服务端发送事件(Server-Sent Events, SSE)是一种允许服务器向客户端推送实时更新的技术。SSE 提供了一种单向的通信通道,服务器可以持续地向客户端发送数据,而不需要客户端频繁发起请求。这对于需要实时更新的应用场景非常有用。

流式传输的特点是将数据逐步传输给客户端,而不需要等待完整的响应生成。这意味着在传输过程中,数据会逐步发送给客户端,而不是一次性发送所有数据,对于基于文本的AI对话来说,这意味着每个单词或短语可以随着模型预测它们时即时显示出来,从而营造出一种更加自然和动态的交流体验。

最近在对接大模型对话生成接口,查找官方文档中并没有找到明确的实现说明,本文根据 Uniapp 及微信小程序开发文档,基于 uni.request 实现了一个简单的SSE客户端,微信小程序真机测试中可正常运行。

准备

服务端已提供SSE接口,可通过 Apifox 直接访问进行测试。

Uniapp 客户端实现

要实现在微信小程序中接收 SSE 流式响应,我们需要做几个关键步骤:

  1. 配置 HTTP 请求:设置适当的请求头和参数,以确保服务器知道我们期望的是流式响应。
  2. 处理分块数据:由于 SSE 是分块传输的,我们需要监听每个数据块,并适当地解析它们。
  3. 错误和完成处理:定义当遇到错误或完成时的行为。

下面是一个使用 uni.request API 实现 SSE 的例子:

  1. 解析数据
let buffer = \'\'function decode(data: ArrayBuffer): string { // 根据协议对数据进行解析,省略... // 注意数据可能是不连续的,需要通过 buffer 进行拼接 const text = safeDecodeUTF8(data) // 自己实现的解码方法,见文末附录 buffer += text const messages = buffer.split(\'\\n\\n\').filter((chunk) => chunk.trim().length > 0) if (!buffer.endsWith(\'\\n\\n\') && messages.length > 0) { buffer = messages.pop() || \'\' } else { buffer = \'\' } return messages .filter((chunk) => chunk.includes(\'data:\')) .map((chunk) => { const parsed = JSON.parse(chunk.replace(/^data:\\s*/, \'\')) return parsed.data }) .join(\'\')}```js2. 发送请求,`uni.request` 不行可以尝试替换为 `wx.request````jsfunction streamPost(url, data, onData, onError = null, onComplete = null) { function onChunkReceived(res) { onData(decode(res.data)) } function onHeadersReceived(res) { console.log(\'onHeadersReceived\', res) } const requestTask = uni.request({ url: baseUrl + apiPath + url, method: \'POST\', header: { Accept: \'text/event-stream\', // 确保服务器知道我们期望的是流式响应 Authorization: uni.getStorageSync(\'token\'), // ...其他参数 }, data, enableChunked: true, // onChunkReceived, 否则走success() responseType: \'arraybuffer\', success: (res) => { console.log(\'Data received:\', res.data) // 开启 enableChunked 时仅最后一次会走这个 }, fail: (error) => { // 错误处理 if (onError) { onError(error) } console.error(\'SSE failed:\', error) }, complete: () => { // 完成接收。 开启 enableChunked 开发者工具中测试未走 complete 回调,真机正常 if (onComplete) { onComplete() } if (onHeadersReceived) { requestTask?.offHeadersReceived(onHeadersReceived) } if (onChunkReceived) { // @ts-expect-error uni-app types lost requestTask?.offChunkReceived(onChunkReceived) } }, }) if (onHeadersReceived) { requestTask.onHeadersReceived(onHeadersReceived) } if (onChunkReceived) { // @ts-expect-error uni-app types lost requestTask.onChunkReceived(onChunkReceived) // 注册数据接收响应函数 } return requestTask // 外部可通过 requestTask.abort(); 主动结束}
  1. 组件中调用
const doAiGenerate = () => { state.form.text = \'\' task = MyApi.generateCommentStream( { workId: model.comment?.workId, prompt: prompt.value, type: type.value }, (res: string) => { if (res) state.form.text += res }, (err) => console.log(err), () => setLoading(false) )}// 手动终止const onStop = () => { task && task.abort() setLoading(false)}

在 nginx 中开启transfer_encoding, 同时关闭缓存 proxy_buffering。

location /ai/chat/stream { proxy_pass http://127.0.0.1:8080; proxy_set_header Transfer-Encoding \"\"; chunked_transfer_encoding on; proxy_buffering off;}

总结

  • 开启:enableChunked: true
  • 设置请求 Header:Accept: \'text/event-stream\'
  • 注册数据接收响应函数: requestTask.onChunkReceived(onChunkReceived)
  • 主动结束: requestTask.abort()
  • 分块数据解析:decode()

通过以上步骤,我们成功地在 UniApp 中实现了 SSE 流式响应,增强了应用程序的实时交互能力。希望这篇文章能为你在 UniApp 中集成实时数据更新功能提供有价值的参考。

参考

  • uniapp api 文档: https://uniapp.dcloud.net.cn/api/request/request.html
  • 小程序开发文档:wx.request https://developers.weixin.qq.com/miniprogram/dev/api/network/request/RequestTask.onChunkReceived.html

附录

分包体积过大,自己实现解码方法
//import { TextDecoder } from ‘text-encoding’
// const decoder = new TextDecoder(‘utf-8’)
// const text = decoder.decode(data)

function safeDecodeUTF8(arrayBuffer: ArrayBuffer) { const bytes = new Uint8Array(arrayBuffer) let str = \'\' let i = 0 while (i < bytes.length) { try { const byte = bytes[i] if (byte < 0x80) { str += String.fromCharCode(byte) i++ } else if (byte >= 0xc0 && byte < 0xe0) { if (i + 1 >= bytes.length) throw new Error() str += String.fromCharCode(((byte & 0x1f) << 6) | (bytes[i + 1] & 0x3f)) i += 2 } else if (byte >= 0xe0 && byte < 0xf0) { if (i + 2 >= bytes.length) throw new Error() str += String.fromCharCode(((byte & 0x0f) << 12) | ((bytes[i + 1] & 0x3f) << 6) | (bytes[i + 2] & 0x3f)) i += 3 } else if (byte >= 0xf0) { if (i + 3 >= bytes.length) throw new Error() const codePoint = ((byte & 0x07) << 18) | ((bytes[i + 1] & 0x3f) << 12) | ((bytes[i + 2] & 0x3f) << 6) | (bytes[i + 3] & 0x3f) str += String.fromCodePoint ? String.fromCodePoint(codePoint) : \'��\' i += 4 } else { str += \'�\' i++ } } catch (e) { str += \'�\' i++ } } return str}

欢迎合作

最近业余在做的个人项目:https://www.aaronzzh.cn

如果这篇文章对您有所帮助,欢迎点赞、分享和留言,让更多的人受益。感谢您的细心阅读,如果发现了任何错误或需要补充的地方,请随时告诉我,我会尽快处理 _