> 技术文档 > 使用 OkHttp 或者 WebClient 对接大模型的流式接口_webclient调用流式接口

使用 OkHttp 或者 WebClient 对接大模型的流式接口_webclient调用流式接口

随着大模型(如 GPT、BERT 等)的流行,越来越多的 API 提供商开始提供流式接口,这使得我们能够更高效地处理大规模数据,尤其是在响应体非常大的情况下。本文将探讨如何使用 OkHttpSpring WebClient 两种方式对接大模型的流式接口,实现高效的异步请求处理。

一、流式接口概述

流式接口是一种允许客户端逐步接收服务器响应数据的机制。相比传统的“一次性返回”方式,流式接口通常用于处理大体积数据或者需要长时间处理的请求。客户端通过建立持久连接,持续从服务器获取数据,而无需等待整个响应体加载完成。

大模型 API 的流式接口通常采用 HTTP 2.0WebSocket 协议进行数据流传输,能够以流的方式发送大量的数据块或结果,这对实现低延迟、高吞吐量的请求处理至关重要。

二、使用 OkHttp 对接流式接口

2.1 配置 OkHttp 客户端

OkHttp 是一个强大的 HTTP 客户端,支持异步请求、连接池和流式响应等特性。下面是如何使用 OkHttp 实现对接流式接口的示例。

<dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>4.12.0</version>在这里插入代码片</dependency>
package com.dolphin.bootstrap.agent.streamchat;import com.alibaba.fastjson.JSONObject;import com.dolphin.bootstrap.agent.BaseAgent;import io.micrometer.common.util.StringUtils;import lombok.extern.slf4j.Slf4j;import okhttp3.*;import okio.BufferedSource;import org.jetbrains.annotations.NotNull;import org.springframework.beans.factory.annotation.Value;import org.springframework.http.HttpHeaders;import org.springframework.stereotype.Component;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;import java.util.Collections;import java.util.concurrent.TimeUnit;/** * 利用 OKHttp 库,实现 Stream 流式调用 */@Component@Slf4jpublic class OKHttpStream implements BaseAgent { private static final Integer TIME_OUT = 5 * 60; @Value(\"ai.agent.streamChatUrl\") private String streamChatUrl; @Override public void chatStream(SseEmitter sseEmitter, String question) { // 1. 创建 OkHttpClient 对象,并设置超时时间 OkHttpClient client = new OkHttpClient.Builder() .connectTimeout(TIME_OUT, TimeUnit.SECONDS) .readTimeout(TIME_OUT, TimeUnit.SECONDS) .writeTimeout(TIME_OUT, TimeUnit.SECONDS) .build(); // 封装请求体参数 JSONObject params = new JSONObject(); params.put(\"question\", question); RequestBody requestBody = RequestBody.create(params.toJSONString(), MediaType.parse(\"application/json; charset=utf-8\")); // 封装请求头 Headers headers = new Headers.Builder() .set(\"Content-Type\",\"application/json\") .set(\"Accept\",\"text/event-stream\") .build(); // 2. 构建 Request 对象 Request request = new Request.Builder() .url(streamChatUrl) .headers(headers) .post(requestBody) .build(); // 3. 创建 Call 对象 Call call = client.newCall(request); // 4. 监听回调 call.enqueue(new Callback() { @Override public void onFailure(@NotNull Call call, @NotNull IOException e) { log.error(\"进入 onFailure方法 {}\", e.getMessage(), e); sseEmitter.completeWithError(e); } @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { if (response.isSuccessful()) {  String chunkMessage = \"\";  try (ResponseBody responseBody = response.body();) { BufferedSource source = responseBody.source(); while (!source.exhausted()) { chunkMessage = source.readUtf8Line(); if (StringUtils.isBlank(chunkMessage)) { continue; } JSONObject jsonObject = JSONObject.parseObject(chunkMessage); if (null != jsonObject && null != jsonObject.getJSONObject(\"data\")) { String answer = jsonObject.getJSONObject(\"data\").getString(\"answer\"); sseEmitter.send(answer); } }  } catch (Exception e) { log.error(\"解析失败: {}\", e.getMessage(), e); sseEmitter.completeWithError(e);  } } else {  log.error(\"onResponse 方法请求失败 {}\", response.message());  // TODO 重新发起请求 } } }); BaseAgent.super.chatStream(sseEmitter, question); }}

2.2 代码解释

​•call.enqueue(new Callback() {…});:异步执行请求,回调函数用于处理响应。

这种方式非常适合大模型 API,因为它能够实时获取并处理每一个数据块,而不需要等待整个响应体加载完成。

三、使用 WebClient 对接流式接口

Spring 5 引入的 WebClient 是基于响应式编程的 HTTP 客户端,特别适用于流式数据处理和高并发场景。WebClient 支持通过 MonoFlux 来处理异步数据流,非常适合与大模型的流式接口对接。

3.1 配置 WebClient

首先,需要在 Spring Boot 项目中配置 WebClient。可以通过 WebClient.Builder 来定制客户端配置。

<dependency> <groupId>org.springframework</groupId> <artifactId>spring-webflux</artifactId> <version>6.1.4</version></dependency>
package com.dolphin.bootstrap.agent.streamchat;import com.alibaba.fastjson.JSONObject;import com.dolphin.bootstrap.agent.BaseAgent;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.http.HttpHeaders;import org.springframework.http.HttpStatus;import org.springframework.http.MediaType;import org.springframework.web.reactive.function.client.WebClient;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import reactor.core.publisher.Flux;import java.io.IOException;import java.util.Collections;/** * 利用 WebClient 类,实现 Stream 流式调用 */public class WebClientStream implements BaseAgent { private static final Logger log = LoggerFactory.getLogger(WebClientStream.class); @Value(\"ai.agent.streamChatUrl\") private String streamChatUrl; @Override public void chatStream(SseEmitter sseEmitter, String question) { // 创建 WebClient 客户端 WebClient webClient = WebClient.builder().baseUrl(streamChatUrl).build(); // 封装参数 JSONObject params = new JSONObject(); params.put(\"question\", question); // 封装请求头 HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON_UTF8); // accept 一定要设置为 TEXT_EVENT_STREAM headers.setAccept(Collections.singletonList(MediaType.TEXT_EVENT_STREAM)); Flux<String> eventStream = webClient .post() .uri(\"/stream\") .accept(MediaType.valueOf(\"text/event-stream;charset=UTF-8\")) // 一定要设置 .headers(httpHeaders -> httpHeaders.addAll(headers)) .bodyValue(params.toJSONString()) .retrieve() .bodyToFlux(String.class); eventStream.subscribe( data -> { // data 有什么key,value。具体看你对接agent的文档 try {  JSONObject bodyJson = JSONObject.parseObject(data);  if (bodyJson.getIntValue(\"code\") == HttpStatus.OK.value()) { // 事件类型 String event = bodyJson.getString(\"event\"); // agent回答 String answer = bodyJson.getJSONObject(\"data\").getString(\"answer\"); if (\"message\".equals(event)) { // 中间一段一段的消息 sseEmitter.send(answer); } else if (\"end\".equals(event)) { // 最后会全部返回 sseEmitter.send(answer); }  } } catch (IOException e) {  throw new RuntimeException(e); } }, error -> { sseEmitter.completeWithError(error); log.error(\"报错了:{}\", error.getMessage(), error); }, sseEmitter::complete ); }}

3.2 代码解释

​•bodyValue :添加请求体。

​•retrieve :执行请求。

​•bodyToFlux :将响应体处理为 Flux,Flux 代表一个可以包含多个元素的数据流。。

​•subscribe :非阻塞,注册回调函数。第一个回调:正常数据返回。第二个回调:出错。第三个回调:请求完成。

3.3 错误处理

WebClient 还提供了完善的错误处理机制,可以通过 .onStatus() 或 .onErrorResume() 方法捕获 HTTP 错误或其他异常情况。

webClient.get() .uri(\"/\") .retrieve() .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new RuntimeException(\"客户端错误\"))) .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new RuntimeException(\"服务器错误\"))) .bodyToFlux(String.class) .doOnNext(data -> System.out.println(\"接收到数据块:\" + data)) .subscribe();

四、OkHttp 与 WebClient 的对比

特性 OkHttp WebClient 编程模型 阻塞或异步操作 响应式编程,支持异步和流式处理 并发处理 支持异步请求,但不具备响应式特性 支持高并发,且更适合流式数据处理 适用场景 传统的 HTTP 请求,适合简单的客户端 高并发、响应式编程,流式接口等 集成生态 单独的 HTTP 客户端库 与 Spring WebFlux 集成,适用于微服务架构

选择建议:

​•如果你已经在使用 Spring Boot,且需要处理流式接口,WebClient 是更推荐的选择。它与 Spring WebFlux 配合默契,能处理高并发和复杂的数据流场景。

​•如果你只是需要处理简单的流式 HTTP 请求,OkHttp 是一个非常轻量且功能强大的库,适合用于快速集成。

五、总结

无论是 OkHttp 还是 WebClient,它们都提供了高效的流式数据处理能力,能够帮助我们在与大模型 API 对接时减少等待时间,提高响应速度和吞吐量。选择哪个工具取决于你的项目需求和技术栈,如果你在使用 Spring Boot 或 WebFlux,WebClient 无疑是一个理想的选择。

酷家家居商城