> 技术文档 > 鸿蒙Next对接Deepseek,实现流式会话聊天_鸿蒙流式输出

鸿蒙Next对接Deepseek,实现流式会话聊天_鸿蒙流式输出


一、SSE流式支持

这是最基本的前提,不过搜了一下,发现官方例子有支持,但是只是GET,所以自己写了一个:

import http from \'@ohos.net.http\'import util from \'@ohos.util\'interface EventSourceOptions { method?: http.RequestMethod headers?: Record body?: string}interface EventSourceEvent { data: string type?: string}interface RequestOptions { method: http.RequestMethod header: Record readTimeout: number extraData?: string}type EventCallback = (event: EventSourceEvent) => voidexport default class CustomEventSource { private httpRequest: http.HttpRequest private url: string private options: EventSourceOptions private onmessage: EventCallback | null = null private onerror: ((error: Error) => void) | null = null private oncomplete: (() => void) | null = null private buffer: string = \'\' private isActive: boolean = false private decoder: util.TextDecoder = new util.TextDecoder() constructor(url: string, options: EventSourceOptions = {}) { this.url = url this.options = options this.httpRequest = http.createHttp() this.connect() } private connect() { this.isActive = true const headers: Record = { \'Accept\': \'text/event-stream\', \'Cache-Control\': \'no-cache\' } if (this.options.headers) { // 合并headers headers[\'Content-Type\'] = this.options.headers[\'Content-Type\'] || headers[\'Content-Type\'] || \'\' headers[\'Accept\'] = this.options.headers[\'Accept\'] || headers[\'Accept\'] || \'\' headers[\'Cache-Control\'] = this.options.headers[\'Cache-Control\'] || headers[\'Cache-Control\'] || \'\' headers[\'Connection\'] = this.options.headers[\'Connection\'] || \'\' } const requestOptions: RequestOptions = { method: this.options.method || http.RequestMethod.GET, header: headers, readTimeout: 0 } if (this.options.body) { requestOptions.extraData = this.options.body } // 设置数据接收监听 this.httpRequest.on(\'dataReceive\', (data: ArrayBuffer) => { if (!this.isActive) return try { const chunk = this.decoder.decode(new Uint8Array(data)) this.processChunk(chunk) } catch (e) { this.handleError(new Error(\'Failed to decode chunk\')) } }) // 发起请求 let promise = this.httpRequest.requestInStream( this.url, requestOptions ) promise.then((data: number) => { if (this.oncomplete) { this.oncomplete() } }).catch((err: Error) => { this.handleError(err) }); } private processChunk(chunk: string) { this.buffer += chunk const lines = this.buffer.split(\'\\n\') this.buffer = lines[lines.length - 1] || \'\' for (let i = 0; i  void) { this.onerror = callback } public set onComplete(callback: () => void) { this.oncomplete = callback }}

二、Deepseek服务端

Deepseek本地化部署方式很多,通过Ollama本地部署DeepSeek R1以及简单使用的教程(超详细) - Qubernet - 博客园

SSE目前服务端基本上都支持的了,不管是Java还是Python还是各种,都支持流式返回,下面是个Python的例子

2.1 使用FastApi创建api服务
from fastapi import FastAPI, Requestfrom fastapi.middleware.cors import CORSMiddlewarefrom sse_starlette.sse import EventSourceResponsefrom business.ollama.chat import OllamaChatfrom pydantic import BaseModelclass ChatRequest(BaseModel): query: strapp = FastAPI()# 配置CORSapp.add_middleware( CORSMiddleware, allow_origins=[\"*\"], # 在生产环境中应该设置具体的域名 allow_credentials=True, allow_methods=[\"*\"], allow_headers=[\"*\"],)# 初始化OllamaChat实例chat_instance = OllamaChat()@app.get(\"/\")async def root(): return {\"message\": \"Welcome to the JhmAi API\"}@app.post(\"/chat\")async def chat_endpoint(request: Request, chat_request: ChatRequest): \"\"\" 聊天端点,返回SSE流 \"\"\" return EventSourceResponse(chat_instance.stream_chat(chat_request.query))@app.get(\"/health\")async def health_check(): \"\"\" 健康检查端点 \"\"\" return {\"status\": \"healthy\"}if __name__ == \"__main__\": import uvicorn uvicorn.run(app, host=\"0.0.0.0\", port=6005)
2.2 使用Ollama库初始化模型
# 初始化 deepseek 模型,启用流式输出self.llm = Ollama( base_url=这里是Ollama服务地址, model=Deepseek模型, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]), temperature=0.7,)
2.3 构建提示词以及流式返回
def _build_prompt(self, query: str) -> str: \"\"\"构建提示词\"\"\" return f\"\"\"你是一个专业的AI助手。请回答用户的问题。 用户问题: {query} 规则: 1. 不用重复显示用户的问题是什么,只需要直接回答问题就行。\"\"\"async def stream_chat(self, query: str) -> AsyncGenerator[Dict[str, Any], None]: \"\"\" 流式聊天响应 \"\"\" try: # 构建提示词() prompt = self._build_prompt(query) print(f\"提示词: {prompt}\") # 使用 deepseek 模型生成回答 for chunk in self.llm.stream(prompt): yield {  \"event\": \"message\",  \"data\": json.dumps({ \"content\": chunk, \"type\": \"content\"  }, ensure_ascii=False) } await asyncio.sleep(0.1) except Exception as e: yield { \"event\": \"error\", \"data\": json.dumps({  \"error\": str(e),  \"type\": \"error\" }, ensure_ascii=False) }

三、鸿蒙Next聊天页面

import { BaseConstants, MainButton, PageTitleBuilder } from \"jbase\"import { inputMethod } from \'@kit.IMEKit\'import { AppConfig } from \'../../config/Index\'import { http } from \'@kit.NetworkKit\'import CustomEventSource from \'./EventSource\'interface SSEData { type: string content: string}// 消息数据模型class ChatMessage { id: string content: string isUser: boolean isThinking: boolean isGenerating: boolean constructor(id: string, content: string, isUser: boolean) { this.id = id this.content = content this.isUser = isUser this.isThinking = !isUser this.isGenerating = false }}class Header { public contentType: string public accept: string constructor(contentType: string, accept: string) { this.contentType = contentType this.accept = accept }}@Builderexport function ChatIndexBuilder(name: string, param: Object) { ChatIndex()}// 聊天页面@Componentexport struct ChatIndex { @State private messages: ChatMessage[] = [] @State private inputText: string = \'\' private eventSource: CustomEventSource | null = null private scroller: Scroller = new Scroller() @Builder CustomKeyboardBuilder(){} aboutToCreate() { } aboutToDisappear() { if (this.eventSource) { this.eventSource.close() } } // 更新AI消息内容 private updateAiMessage(messageId: string, content: string) { const index = this.messages.findIndex(msg => msg.id === messageId) if (index !== -1) { const currentMessage = this.messages[index] const newMessage = new ChatMessage( currentMessage.id, currentMessage.content + content, currentMessage.isUser ) // 设置状态 newMessage.isThinking = false newMessage.isGenerating = true // 更新消息 this.messages = this.messages.map((msg, i) => i === index ? newMessage : msg ) } } // 完成AI消息生成 private completeAiMessage(messageId: string) { const index = this.messages.findIndex(msg => msg.id === messageId) if (index !== -1) { const currentMessage = this.messages[index] // 创建新的消息对象,保持内容不变 this.messages = this.messages.map((msg, i) => i === index ? new ChatMessage( currentMessage.id, currentMessage.content, currentMessage.isUser ) : msg ) // 更新消息状态 this.messages[index].isGenerating = false this.messages[index].isThinking = false setTimeout(() => this.scrollToBottom, 10) } } // 发送消息 sendMessage() { if (this.inputText.trim() === \'\') return // 添加用户消息 this.messages = [ ...this.messages, new ChatMessage( Date.now().toString(), this.inputText, true ) ] const userInput = this.inputText // 清空输入框 this.inputText = \'\' // 创建新的AI消息 const aiMessageId = Date.now().toString() this.messages = [ ...this.messages, new ChatMessage(aiMessageId, \'\', false) ] // 滚动到底部 this.scrollToBottom() // 关闭之前的连接 if (this.eventSource) { this.eventSource.close() } // 创建SSE连接 this.eventSource = new CustomEventSource(AppConfig.aiRootUrl + \'/chat\', { method: http.RequestMethod.POST, headers: { \'Content-Type\': \'application/json\' }, body: JSON.stringify({ query: userInput }) }) // 监听消息 this.eventSource.onMessage = (event) => { try { const data: SSEData = JSON.parse(event.data) if (data.type === \'content\') { this.updateAiMessage(aiMessageId, data.content) // 每次更新后滚动到底部 this.scrollToBottom() } } catch (e) { console.error(\'Failed to parse SSE data:\', e.message) } } // 监听错误 this.eventSource.onError = (error: Error) => { console.error(\'SSE Error:\', error.message) this.updateAiMessage(aiMessageId, \'抱歉,连接出错了\') this.eventSource?.close() // 出错时也标记为完成 this.completeAiMessage(aiMessageId) } // 监听完成 this.eventSource.onComplete = () => { console.info(\'SSE completed\') this.completeAiMessage(aiMessageId) this.eventSource?.close() } } // 滚动到底部 private scrollToBottom() { this.scroller.scrollEdge(Edge.Bottom); } @Builder LoadingIndicator() { Row() { LoadingProgress() .width(20) .height(20) .margin({ right: 8 }) Text(\'思考中...\') .fontSize(14) .fontColor(\'#999999\') } .margin({ left: 52, top: 10 }) } @Builder GeneratingIndicator() { Row() { LoadingProgress() .width(16) .height(16) .margin({ right: 4 }) Text(\'生成中...\') .fontSize(14) .fontColor(\'#999999\') } .margin({ left: 52, top: 10 }) } @Builder MessageItem(message: ChatMessage) { Row() { if (message.isUser) { Blank() Column() { Text(message.content) .fontSize(16) .backgroundColor($r(\'app.color.main_color\')) .fontColor(\'#ffffff\') .padding(BaseConstants.MODULE_PADDING) .borderRadius(BaseConstants.BORDER_RADIUS) .margin({ bottom: 4 }) } .alignItems(HorizontalAlign.End) } else { Image($r(\'app.media.avatar12\')) .width(40) .height(40) .borderRadius(BaseConstants.BORDER_RADIUS) .margin({ right: 12 }) .objectFit(ImageFit.Cover) .markAnchor({ x: 0, y: 0 }) .position({ x: 0, y: 0 })  Column() { if (message.isThinking) { this.LoadingIndicator() } else { Text(message.content)  .fontSize(16)  .backgroundColor($r(\'app.color.back_color\'))  .padding({ left: 18, right: 18, top: 14, bottom: 14 })  .borderRadius(BaseConstants.BORDER_RADIUS)  .margin({ bottom: 4, left: 52 })  .wordBreak(WordBreak.BREAK_ALL)  .lineHeight(24) if (message.isGenerating) {  this.GeneratingIndicator() } } } Blank() } } .width(BaseConstants.FULL_WIDTH) .padding({ left: BaseConstants.MODULE_PADDING, right: BaseConstants.MODULE_PADDING, top: 8, bottom: 8 }) .alignItems(VerticalAlign.Top) } build() { NavDestination() { Column() { Scroll(this.scroller) { // 消息列表 List({ scroller: this.scroller, space: 8 }) { ForEach(this.messages, (message: ChatMessage) => {  ListItem() { this.MessageItem(message)  } }) } .width(BaseConstants.FULL_WIDTH) .height(BaseConstants.FULL_HEIGHT) } .scrollBar(BarState.Off) .edgeEffect(EdgeEffect.Spring) .layoutWeight(1) .onClick(() => { inputMethod.getController().hideTextInput() }) // 输入区域 Flex({ alignItems: ItemAlign.Center }) { Column() { SymbolGlyph($r(\'sys.symbol.mic_circle\')).fontSize(38).fontColor([$r(\'app.color.black\')]) } .padding({ right: 12 }) Column() { TextArea({ placeholder: \'请输入消息\', text: this.inputText })  .enterKeyType(EnterKeyType.NEW_LINE)  .borderRadius(0)  .fontSize(16)  .lineHeight(24)  .maxLength(500)  .backgroundColor($r(\'app.color.back_color\'))  .padding({ left: 0, right: 0 })  .heightAdaptivePolicy(TextHeightAdaptivePolicy.LAYOUT_CONSTRAINT_FIRST)  .onChange((value: string) => { this.inputText = value  }) } .backgroundColor($r(\'app.color.back_color\')) .borderRadius(BaseConstants.BORDER_RADIUS) .padding({ left: 16, right: 16, top: 8, bottom: 8 }) .margin({ left: 12, right: 12 }) Column() { Button({ type: ButtonType.Circle, stateEffect: true }) {  SymbolGlyph($r(\'sys.symbol.paperplane_right_fill\')).fontSize(20).fontColor([\'#ffffff\']) }.height(42).width(42) .onClick(() => {  inputMethod.getController().hideTextInput()  this.sendMessage() }) } .padding({ left: 6 }) } .width(BaseConstants.FULL_WIDTH) .padding(BaseConstants.MODULE_PADDING + 2) } .width(BaseConstants.FULL_WIDTH) .height(BaseConstants.FULL_HEIGHT) } .title(PageTitleBuilder($r(\'app.string.new_chat\'))) }}

完整代码:

鸿蒙Next前端源码:

https://gitee.com/476743842/jhm

服务端源码:

https://gitee.com/476743842/jhm-ai