一文读懂WebFlux框架和WebClient响应式http客户端
前言:WebClient 是 Spring WebFlux 模块提供的一个非阻塞的基于响应式编程的进行 Http 请求的客户端工具。WebFlux 对标 SpringMVC,WebClient 相当于 RestTemplate,同时也是 Spring 官方的 Http 请求工具。截止当前时间,目前Spring框架的最新版本是6.2.0。
一、Spring WebFlux简介
Spring WebFlux 是 Spring 5 引入的一个新的响应式 Web 框架,它与 Spring MVC 并行存在,旨在支持构建非阻塞异步通信和响应式流处理的应用程序。WebFlux 基于 Reactive Streams 规范,并使用了 Project Reactor 库作为其实现的基础。
以下是 WebFlux 的一些关键特性和概念:
1.1 响应式编程模型
WebFlux 使用了 Mono 和 Flux 这两种反应类型来表示单值和多值的异步数据序列。它允许开发者以声明式的方式组合异步逻辑,并且能够处理复杂的场景如背压(backpressure)。
1.2 非阻塞 I/O
WebFlux 支持非阻塞输入输出操作,这使得在高并发场景下可以更高效地利用系统资源。默认情况下,WebFlux 使用 Netty 服务器作为其非阻塞服务器,但也可以运行在其他支持 Servlet 3.1+ 规范的容器上
1.3 事件驱动架构
WebFlux 利用了事件驱动的架构来提高系统的吞吐量和响应性。当有数据准备好时才进行处理,而不是为每个请求分配一个线程等待 I/O 操作完成。
1.4 支持多种编程模型
-
注解驱动:类似于 Spring MVC 的注解方式,可以使用
@RestController
、@RequestMapping
等注解定义控制器。 -
函数式编程:提供了函数式端点和路由定义的方式,允许以编程方式定义路由和处理器函数。
1.5 适用场景
-
高并发 Web 应用:对于需要处理大量并发请求的应用来说,WebFlux 提供了一种有效的解决方案。
-
微服务架构中的异步服务:适合微服务间的异步调用,特别是在涉及 I/O 操作时。
-
实时数据流应用:例如 WebSocket 通信、消息推送等实时数据处理场景。
1.6 WebClient
Spring WebFlux 提供了一个新的 WebClient 类,这是一个非阻塞的 HTTP 客户端,可用于发起 HTTP 请求并处理响应。
1.7 兼容性和灵活性
WebFlux 可以与现有的 Spring 生态系统无缝集成,包括 Spring Data、Spring Security 等模块。
如果你正在考虑是否要在项目中采用 WebFlux,那么应该基于你的具体需求来决定。如果你的应用需要处理大量的并发连接,或者你希望利用响应式编程的优势来构建高效的微服务或实时应用程序,那么 WebFlux 可能是一个合适的选择。然而,如果是一个相对简单的 Web 应用,传统的 Spring MVC 可能就足够了。
二、WebClient 简介
WebClient 是 Spring WebFlux 模块提供的一个非阻塞的基于响应式编程的进行 Http 请求的客户端工具。WebFlux 对标 SpringMvc,WebClient 相当于 RestTemplate,同时也是 Spring 官方的 Http 请求工具。
2.1 传统阻塞IO模型 VS 响应式IO模型
-
传统阻塞IO模型 RestTemplate
Spring3.0引入了RestTemplate,SpringMVC或Struct等框架都是基于Servlet的,其底层IO模型是阻塞IO模型。采用阻塞IO模式获取输入数据。每个连接都需要独立的线程,完成数据输入、业务处理、返回。传统阻塞IO模型的问题是,当并发数很大时,就要创建大量线程,占用很大的系统资源。连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费。
-
响应式IO模型 WebClient
Spring5中引入了WebClient作为非阻塞式Reactive Http客户端。Spring社区为了解决SpringMVC的阻塞模型在高并发场景下的性能瓶颈,推出了Spring WebFlux,WebFlux底层实现是久经考验的Netty非阻塞IO通信框架。其实WebClient处理单个HTTP请求的响应时长并不比RestTemplate更快,但是它处理并发的能力更强,非阻塞的方式可以使用较少的线程以及硬件资源来处理更多的并发。
所以响应式非阻塞IO模型的核心意义在于,提高了单位时间内有限资源下的服务请求的并发处理能力,而不是缩短了单个服务请求的响应时长。
-
与RestTemplate相比,WebClient的优势
-
非阻塞响应式IO,单位时间内有限资源下支持更高的并发量。
-
支持使用Java8 Lambda表达式函数。
-
支持同步、异步、Stream流式传输。
-
2.2. WebClient Api
WebClient 在 spring 提供的 WebFlux 中
org.springframework.bootspring-boot-starter-webflux
(1)创建实例
- create() 创建实例
WebClient cli = WebClient.create();
- create(String baseUrl) 创建实例并指定 baseURL
WebClient webClient = WebClient.create(\"http://localhost:8080\");
(2)构建器
- WebClient.builder().build() 使用构建器
WebClient build = WebClient.builder().build();
WebClient.builder 的额外配置:
- 创建副本
WebClient client1 = WebClient.builder() .filter(filterA).filter(filterB).build(); WebClient client2 = client1.mutate() .filter(filterC).filter(filterD).build();
一旦构建,WebClient是不可变的。但是,您可以按如下方式克隆它并构建修改后的副本
- 编码器
//默认 256 kbWebClient webClient = WebClient.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024)).build();
(3)获取响应
retrieve() 检索方法用于声明如何提取响应。
WebClient client = WebClient.create(\"https://example.org\"); Mono<ResponseEntity> result = client.get() .uri(\"/persons/{id}\", id).accept(MediaType.APPLICATION_JSON) .retrieve() .toEntity(Person.class);
或者只获得 body
WebClient client = WebClient.create(\"https://example.org\"); Mono result = client.get() .uri(\"/persons/{id}\", id).accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(Person.class);
要获取解码对象流,请执行以下操作:
Flux result = client.get() .uri(\"/quotes\").accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(Quote.class);
bodyToFlux 和 bodyToMono:
-
bodyToFlux 方法用于将响应结果处理为 Flux 对象,Flux 是 Reactor 框架中表示包含零个、一个或多个元素的异步序列的类。这意味着响应结果可能是一个包含多个元素的流,而不是单个值。
-
bodyToMono 方法用于将响应结果处理为 Mono 对象,Mono 是 Reactor 框架中表示包含零个或一个元素的异步序列的类。这意味着响应结果是一个单个值或者没有值。
accept(MediaType... var1)
响应数据类型
acceptCharset(Charset... var1)
响应字符集
(4)RequestBody
@RequestMapping(\"/body\")public void test5(){ WebClient webClient = WebClient.create(\"http://localhost:8080/api/restful\"); MultiValueMap formData = new LinkedMultiValueMap(); formData.add(\"name\", \"张和\"); formData.add(\"color\", \"blue\"); Mono dogMono = webClient.post() .contentType(MediaType.APPLICATION_FORM_URLENCODED) .bodyValue(formData) .retrieve() .bodyToMono(Dog.class); System.out.println(dogMono.block());} @RequestMapping(\"/body1\")public void test6(){ WebClient webClient = WebClient.create(\"http://localhost:8080/api/json\"); Mono mono = Mono.just(new Dog(\"和\", \"33\")); Mono dogMono = webClient.post() .contentType(MediaType.APPLICATION_JSON) .body(mono, Dog.class) .retrieve() .bodyToMono(Dog.class); System.out.println(dogMono.block());}
-
contentType()
设置请求参数格式
-
body()/bodyValue()
请求参数
(5)过滤器
filter() 方法添加过滤器
public void test7(){ Mono mono = Mono.just(new Dog(\"和\", \"33\")); WebClient webClient = WebClient.builder().filter(new ExchangeFilterFunction() { @Override public Mono filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) { ClientRequest build = ClientRequest.from(clientRequest).body(mono, Dog.class).build(); return exchangeFunction.exchange(build); } }).build(); Mono dogMono = webClient.post() .uri(\"http://localhost:8080/api/json\") .contentType(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(Dog.class); System.out.println(dogMono.block());}
(6) 同步
WebClient 的 block() 方法可以通过在末尾阻止结果以同步方式使用
@RequestMapping(\"/filter\")public void test7(){ Mono mono = Mono.just(new Dog(\"和\", \"33\")); WebClient webClient = WebClient.builder().filter(new ExchangeFilterFunction() { @Override public Mono filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) { ClientRequest build = ClientRequest.from(clientRequest).body(mono, Dog.class).build(); return exchangeFunction.exchange(build); } }).build(); Mono dogMono = webClient.post() .uri(\"http://localhost:8080/api/json\") .contentType(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(Dog.class); //非阻塞 System.out.println(dogMono.subscribe(dog -> { System.out.println(dog); })); //阻塞 System.out.println(dogMono.block());}
-
subscribe() 非阻塞方式,用于异步处理数据流,非阻塞,适合构建高性能、响应式的应用程序。
-
block() 阻塞方式,用于同步获取数据流的结果,会阻塞线程,适用于简单的测试或需要在同步环境中获取数据的场景。
三、不调用subscribe直接返回flux的区别
在使用响应式编程模型时,比如通过 Project Reactor 的
Flux
或Mono
,直接返回一个Flux
而不调用subscribe()
方法和实际订阅(调用subscribe()
)之间有着重要的区别。理解这些差异对于构建高效的响应式应用程序至关重要。
3.1、直接返回 Flux
当你直接返回一个
Flux
对象而不进行订阅时,实际上你只是创建了一个数据流的定义或者说是一个“蓝图”。这个数据流尚未开始执行,也没有任何数据被生成或消费。
优点:
-
延迟计算:因为数据流未被订阅,所以相关的计算也不会被执行。这可以节省资源直到真正需要数据的时候。
-
组合性:可以在不同的层次上将多个操作组合起来,而不需要立即执行它们。
-
声明性:专注于描述要做什么而不是如何做,使得代码更加简洁、清晰。
示例:
@GetMapping(value = \"/events\", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux getEvents() { return eventService.getEventStream();}
在这个例子中,getEvents()
方法返回一个 Flux
,但并没有调用 subscribe()
。WebFlux 框架会自动处理这个 Flux
并为每个客户端请求订阅它。这种做法常见于Web控制器中,例如Spring WebFlux应用,返回的Flux
被动的由框架负责对象进行订阅,并处理结果。
3.2 subscribe
订阅 Flux
调用
subscribe()
方法意味着你主动开始执行这个数据流,开始产生数据并消费它。这是触发数据流的实际执行点。
作用:
-
启动数据流:只有当您调用
subscribe()
时,数据流才会开始产生数据并传递给订阅者。 -
处理结果:你可以提供回调来处理产生的数据项、错误或者完成信号。
示例:
eventService.getEventStream().subscribe(event -> { System.out.println(\"Received event: \" + event);});
在这个例子中,subscribe()
方法被调用以开始接收来自 getEventStream()
的事件,并在控制台打印出来。
3.3 区别总结
-
执行时机:直接返回
Flux
只是定义了数据流并未执行;调用subscribe()
则触发了数据流的执行。 -
应用场景:直接返回
Flux
常用于构建响应式的API接口,让框架层去管理订阅和数据流的执行;而在需要直接处理数据流的地方(如业务逻辑层),则可能需要显式调用subscribe()
来主动获取数据。 -
资源管理:由于
Flux
是惰性的,只有在订阅后才会开始消耗资源,因此直接返回Flux
可以更好地控制资源的使用。
四、exchange()和retrieve()方法的区别
exchange()
是 Spring WebFlux框架 中WebClient
提供的一个方法,用于手动控制 HTTP 请求的响应处理流程。它返回一个Mono
,允许你对响应进行更细粒度的操作,比如检查状态码、头信息、选择不同的响应体解析方式等。与.retrieve()
不同,.exchange()
不会自动处理响应体或抛出异常,而是让你手动控制整个响应过程。
4.1 基本概念
在使用 WebClient
发起请求时,有两种常见方式:
.retrieve()
.exchange()
Mono
,让你手动处理整个响应过程,包括状态码、错误处理、响应体解析等,更加灵活。4.2 exchange() 的作用和优势,更精细地控制响应处理
-
可以根据响应的状态码做不同的处理。
-
支持自定义错误处理逻辑。
-
支持流式处理响应体(如
text/event-stream
)。 -
可以读取响应头、cookie 等元数据。
4.3 使用示例
示例 1:基本使用 .exchange()
WebClient webClient = WebClient.create(\"https://api.example.com\");webClient.get() .uri(\"/users/1\") .exchange() .flatMap(response -> { if (response.statusCode().is2xxSuccessful()) { return response.bodyToMono(User.class); } else { return Mono.error(new RuntimeException(\"Request failed with status code: \" + response.statusCode())); } }) .subscribe(user -> System.out.println(\"User: \" + user));
(1)总结流程图
GET https://api.example.com/users/1 ↓ exchange() → Mono ↓ flatMap → 判断状态码 ↓ bodyToMono 或 抛出错误 ↓ subscribe → 异步获取 User 对象并打印
(2)解释代码
使用
.exchange()
获取完整响应flatMap()处理响应:使用
.flatMap()
对Mono
进行转换。检查响应状态码是否是 2xx 成功状态:如果是成功响应,调用bodyToMono(User.class)
将响应体反序列化为User
对象。如果不是成功状态码,则返回一个错误信号Mono.error(...)
,中断流并通知订阅者发生了错误。
调用
.subscribe()
表示启动整个响应式流,真正发送请求。这是一个异步非阻塞操作,当服务器返回数据后,回调函数user -> System.out.println(...)
会被执行。如果请求失败(如返回非 2xx 状态码),会触发错误信号,可以通过.onErrorResume()
或.doOnError()
来捕获。
示例 2:处理流式响应(如 Server-Sent Events)
以流式处理方式接收并打印来自服务端的 Server-Sent Events(SSE) 数据。
accept()
方法是 Spring WebFlux 中WebClient
请求构建器(WebClient.RequestHeadersSpec
)的一个方法,用于设置 HTTP 请求头中的Accept
字段(这是 HTTP 协议中标准的请求头字段之一,通常用于 内容协商(Content Negotiation))。设置客户端期望从服务器接收的响应内容类型(即告诉服务器:“我接受什么样的数据格式”)。
webClient.get() .uri(\"/stream\") .contentType(MediaType.APPLICATION_JSON) // 发送的是 JSON 数据 .accept(MediaType.TEXT_EVENT_STREAM) // 希望返回流式数据 .exchange() .flatMapMany(clientResponse -> clientResponse.bodyToFlux(String.class)) .subscribe(data -> System.out.println(\"Received: \" + data));
示例 3:读取响应头信息
webClient.get() .uri(\"/info\") .exchange() .flatMap(response -> { String contentType = response.headers().contentType().map(MediaType::toString).orElse(\"unknown\"); return response.bodyToMono(String.class) .map(body -> \"Content-Type: \" + contentType + \", Body: \" + body); }) .subscribe(System.out::println);
4.4 注意事项
-
必须调用
subscribe()
启动流:.exchange()
返回的是Mono
,是惰性的,只有订阅后才会真正发送请求。 -
避免阻塞操作:如果你使用
.block()
来等待结果,会失去响应式编程的优势。 -
异常处理需显式处理:不像
.retrieve()
那样自动抛出异常,你需要自己判断状态码或使用.onErrorResume()
处理错误。
4.5 与 .retrieve()
的对比
.retrieve()
.exchange()
4.6 适用场景
.retrieve()
.exchange()
.exchange()
.exchange()
.exchange()
4.7 总结
.exchange()
是WebClient
提供的一种更底层、更灵活的方式来处理 HTTP 响应。它适用于需要精确控制响应处理流程的场景,尤其是在处理非标准响应、流式传输、自定义错误逻辑时非常有用。
如果你只需要简单的响应体提取,推荐使用 .retrieve()
;如果需要更强的控制能力,请使用 .exchange()
。
五、Webclient使用示例
这段代码是典型的 Reactor 操作链,使用了
Flux
或类似响应式类型来处理一个叫ChatR
esponse
的流式响应对象。它对每一个“块”(chunk)进行处理,并在完成时执行某些操作,同时处理异常情况。
响应式编程框架(如 Project Reactor 的 Flux
)来处理流式数据的逻辑,常用于处理异步或非阻塞操作,比如网络请求、事件流等。
//Flux<CommonResult> return streamResponse.map(chunk -> { String newContent = chunk.getResult() != null ? chunk.getResult().getOutput().getContent() : null; newContent = StrUtil.nullToDefault(newContent, \"\"); // 避免 null 的 情况 contentBuffer.append(newContent); // 响应结果 return success(new AiChatMessageSendRespVO() .setSend(BeanUtils.toBean(userMessage, AiChatMessageSendRespVO.Message.class)) .setReceive(BeanUtils.toBean(assistantMessage, AiChatMessageSendRespVO.Message.class) .setContent(newContent)) ); }).doOnComplete(() -> { // 忽略租户,因为 Flux 异步无法透传租户 TenantUtils.executeIgnore(() -> chatMessageMapper.updateById(new AiChatMessageDO() .setId(assistantMessage.getId()) .setSegmentIds(convertList(segmentList, AiKnowledgeSegmentDO::getId)) .setContent(contentBuffer.toString()))); }).doOnError(throwable -> { log.error(\"[sendChatMessageStream][userId({}) sendReqVO({}) 发生异常]\", finalUserId, sendReqVO, throwable); // 忽略租户,因为 Flux 异步无法透传租户 TenantUtils.executeIgnore(() -> chatMessageMapper.updateById(new AiChatMessageDO().setId(assistantMessage.getId()).setContent(throwable.getMessage()))); }).onErrorResume(error -> Flux.just(error(AiErrorCodeConstants.CHAT_STREAM_ERROR)));
5.1 Chatresponse.map(chunk -> { ... })
作用:
chunk
是流中的一部分数据,可以理解为一次网络请求返回的一个片段。对Chatresponse
流中的每一个chunk
数据进行转换。将最终内容包装成一个“成功”的结果对象,可能是一个自定义的封装类(比如ResponseEntity
或你项目中的统一返回格式)。
内部逻辑:
String newContent = chunk.getResult() != null ? chunk.getResult().getOutput().getContent() : null;
newContent = StrUtil.nullToDefault(newContent, \"\"); // 避免 null 的情况
return success(newContent);
-
这里是在安全地获取
chunk
中的内容:-
先检查
chunk.getResult()
是否不为null
; -
如果不为
null
,再继续获取.getOutput().getContent()
; -
否则返回
null
,防止空指针异常。
-
-
使用了工具类
StrUtil
(可能是 Hutool 工具库)将null
转换为空字符串\"\"
,确保不会出现null
值。
5.2 .doOnComplete(() -> { })
-
作用:当整个
Flux
流处理完成后触发这个回调。可以在这里添加一些清理资源、日志记录或通知用户任务完成的逻辑。
error -> Flux.just(error(AiErrorCodeConstants.CHAT_STREAM_ERROR))
5.3.doOnError(throwable -> { log.error(...); ... })
作用:throwable
是发生的异常对象。当流处理过程中发生错误时,会调用这个回调。打印错误信息和堆栈跟踪,方便调试和日志记录。
5.4.onErrorResume(error -> Flux.just(error(...))
当发生错误时,不是直接抛出异常结束流,而是“恢复”并返回一个新的错误响应。创建一个包含错误响应的新
Flux
,让流程继续下去而不是中断。
AiErrorCodeConstants.CHAT_STREAM_ERROR
可能是你项目中定义的错误码常量,表示“聊天流错误”。
error -> Flux.just(error(AiErrorCodeConstants.CHAT_STREAM_ERROR))
5.5 总结一下整段代码做了什么?
-
接收一个
Chatresponse
类型的流式响应; -
对每个
chunk
进行内容提取和转换; -
当流处理完成时,执行可选的后续操作(当前为空);
-
遇到错误时,打印错误信息并输出一个错误响应;
-
保证即使出错,也不会中断整个流,而是优雅地返回错误提示。
六、just()方法
在 WebFlux 中,
just()
方法是 Reactor 框架(Spring WebFlux 的底层实现)用于创建包含固定元素的响应式流的核心方法。其主要作用是为Flux
(多元素流)或Mono
(单元素流)预定义静态数据序列。
6.1 核心作用
-
同步创建固定数据流
just()
通过直接指定元素值,创建一个立即发出预设元素的响应式流。例如:Flux.just(\"A\", \"B\", \"C\") // 创建包含3个字符串的流 Mono.just(\"Hello\") // 创建包含单个字符串的 Mono
这些元素会在订阅时按顺序同步发出,无需等待异步操作。
-
支持可变参数
可接受多个参数(
Flux.just(T...)
)或单个参数(Mono.just(T)
),覆盖单元素和多元素场景3。 -
创建冷序列(Cold Sequence)
通过
just()
创建的流属于冷序列:每次订阅都会重新发送全部元素。例如两次订阅同一Flux.just(1,2,3)
会各自收到独立的1→2→3
序列。
6.2 关键特性
-
数据流未执行直到订阅
调用
just()
仅是声明数据流,不会触发元素发送。必须通过subscribe()
订阅后才会实际发出数据:Mono.just(\"Data\").subscribe(System.out::println); // 输出 \"Data\"
-
空序列的特殊处理
若要创建空流,需使用
Mono.empty()
或Flux.empty()
。而just()
至少包含一个元素(如Mono.just(null)
会发送null
值)。 -
适用简单场景,不涉及复杂逻辑
适合已知静态数据的场景。若需动态生成元素(如循环、异步操作),应使用
create()
、generate()
或fromSupplier()
等方法。
6.3 对比其他创建方法
just()
Flux.just(1, 2, 3)
empty()
Mono.empty()
fromArray()
/fromIterable()
Flux.fromArray(new int[]{1,2})
create()
6.4 注意事项
-
非阻塞但可能阻塞线程:
just()
本身是同步的,若元素生成涉及阻塞操作(如Thread.sleep
),会破坏响应式非阻塞特性。 -
调试支持:结合
StepVerifier
测试工具可验证just()
流的元素是否符合预期。
just()
是 WebFlux 中快速创建静态数据流的基础方法,适用于已知元素的简单场景。其核心价值在于声明式构造响应式序列,但需注意订阅机制和冷序列特性。对于动态数据或复杂逻辑,建议结合其他操作符(如flatMap
、generate
)实现更灵活的流处理在响应式编程库 Project Reactor(如
reactor-core
)中,.just()
是一个非常常用的操作符,用于创建一个 包含一个或多个元素的、立即发射数据的响应式流(Mono
或Flux
)。
6.5 使用方式及示例
Mono
Mono.just(T data)
Mono
,该 Mono
在被订阅时会立即发射这个元素并完成。Flux
Flux.just(T... data)
或 Flux.just(T first, T... rest)
Flux
,每个元素都会被依次发射,并在最后完成。(1)使用 Mono.just()
Mono mono = Mono.just(\"Hello\");mono.subscribe(System.out::println);
输出:Hello
-
Mono.just(\"Hello\")
表示一个包含单个值\"Hello\"
的异步结果。 -
当调用
.subscribe()
时,它会立即发射这个值。
⚠️ 注意:
如果传入的是 null
,会抛出异常:NullPointerException
。
Mono.just(null); // ❌ 抛出 NullPointerException
如果需要支持 null 值,请使用 Mono.justOrEmpty()
(适用于可能为 null 的情况):
java
Mono mono = Mono.justOrEmpty(null); // 不会发射任何数据,直接 onComplete
(2)使用 Flux.just()
Flux flux = Flux.just(\"Apple\", \"Banana\", \"Cherry\");flux.subscribe(System.out::println);
输出:
AppleBananaCherry
每个参数都会作为流中的一个元素被发射。支持多个参数,也可以传数组:
String[] fruits = {\"Apple\", \"Banana\", \"Cherry\"};Flux flux = Flux.just(fruits);
(3)和其它方法的区别
Mono.just(T)
Mono.just(\"OK\")
Mono.empty()
Mono.empty()
Mono.fromSupplier(() -> ...)
Mono.fromSupplier(() -> calculate())
Mono.defer(() -> Mono.just(...))
Mono.defer(() -> Mono.just(new Date()))
Flux.just(...)
Flux.just(1, 2, 3)
Flux.fromIterable(list)
Flux.fromIterable(Arrays.asList(1,2,3))
(4)适用场景
Mono
或 Flux
flatMap
, map
等组合使用Mono.just(user).map(...)
(5)完整示例
在 Spring WebFlux 中,这些
Mono
/Flux
会被框架自动订阅并写入 HTTP 响应体中。
@GetMapping(\"/user\")public Mono getUser() { User user = new User(\"Alice\"); return Mono.just(user); // 包装成响应式类型返回}@GetMapping(\"/numbers\")public Flux getNumbers() { return Flux.just(1, 2, 3, 4, 5); // 返回一个数字序列}
Mono.just(T)
Flux.just(...)
justOrEmpty()
替代在响应式编程中,比如使用 Project Reactor 的 Mono
或 Flux
,需要订阅(subscribe)才会触发数据流的执行和发射。这种机制被称为“惰性求值”或“延迟执行”,意味着直到你调用 .subscribe()
方法之前,任何操作符链都不会真正开始执行。
(6)惰性求值
-
定义:在响应式流中,除非有订阅者订阅了这个流,否则不会执行任何操作。
-
目的:这种设计有助于优化资源使用,避免不必要的计算或网络请求等操作,直到确实需要处理这些数据为止。
使用 Mono.just()
创建一个 Mono 并订阅
Mono mono = Mono.just(\"Hello World\");// 在此之前,\"Hello World\" 并没有被打印出来,因为还没有订阅mono.subscribe(System.out::println); // 订阅后,输出 \"Hello World\"
在这个例子中,只有当 mono.subscribe()
被调用时,\"Hello World\" 才会被打印出来。如果没有调用 .subscribe()
,即使你创建了一个包含数据的 Mono
,也不会有任何输出。
使用 Flux.just()
创建一个 Flux 并订阅
Flux flux = Flux.just(\"A\", \"B\", \"C\");// 同样地,这里不会立即打印任何内容flux.subscribe(System.out::println); // 订阅后,依次输出 \"A\", \"B\", \"C\"
同样,对于 Flux
来说,也需要通过 .subscribe()
方法来启动数据流,并开始接收并处理元素。
(7)不主动订阅的情况
如果你只是构建了一个
Mono
或Flux
而不进行订阅,那么这个流将永远不会被执行。这对于某些场景来说是有用的,例如当你想返回一个响应式类型从一个方法中(如 Spring WebFlux 控制器),让框架本身来负责订阅和处理响应式流。
(8)总结
-
必须订阅:为了触发数据的产生和流动,你需要对
Mono
或Flux
进行订阅。 -
惰性求值:这种机制确保了只有在实际需要的时候才执行操作,从而提高了效率和性能。
-
应用场景:不仅限于简单的数据发射,还包括复杂的异步操作、网络请求等,所有这些都是基于订阅机制来驱动的
七、flatMap、generate、 create()、 fromSupplier()方法的作用
在响应式编程中(如 Project Reactor 的
reactor-core
),flatMap
、generate
、create()
和fromSupplier()
是非常常用的几个操作符或创建方法。它们各自有不同的用途和适用场景,下面是它们的详细解释和使用方式。
7.1 flatMap()方法
将每个元素映射为一个新的
Publisher
(如Mono
或Flux
),然后并发地扁平化合并这些 Publisher 的结果,最终返回一个统一的Flux
或Mono
。
Flux flux = Flux.just(1, 2, 3) .flatMap(i -> Flux.range(i * 10, 3)); // 每个数字变成一个范围流// 输出:10, 11, 12, 20, 21, 22, 30, 31, 32flux.subscribe(System.out::println);
(1)flatMap
特点
-
异步非阻塞
-
并发执行多个内部流
-
常用于处理异步数据转换、数据库查询、HTTP 请求等
-
可控制并发数(通过
flatMap(..., concurrency)
)
(2)对比 map()方法
-
map()
是同步转换。 -
flatMap()
是异步映射 + 合并。
7.2 generate()方法
同步地生成数据流,适用于需要手动控制发射逻辑的场景。是
Flux
提供的一个工厂方法。
Flux flux = Flux.generate( () -> 0, // 初始状态 (state, sink) -> { sink.next(state); // 发送当前状态 if (state == 10) { sink.complete(); // 完成流 } return state + 1; // 返回新的状态 });flux.subscribe(System.out::println);
输出:
01...10
特点:
-
同步生成器
-
非常适合用于模拟序列、有限状态机、自定义算法等
-
必须显式调用
sink.next()
和sink.complete()
/sink.error()
7.3 create()方法
提供一个更灵活的方式创建 Flux,可以异步/同步、多线程、背压支持良好,适用于桥接非响应式代码与响应式流之间的桥梁。
Flux flux = Flux.create(sink -> { sink.next(\"Hello\"); sink.next(\"World\"); sink.complete();});flux.subscribe(System.out::println);
输出
HelloWorld
-
支持同步和异步发射
-
支持背压(可通过
onRequest
控制) -
适合封装第三方库、事件驱动系统、Socket 等原始数据源
7.4 create
与 generate()
的区别
generate()
create()
7.5 fromSupplier()
从一个
Supplier
创建一个Mono
,延迟执行 Supplier 并发射其结果。
Mono mono = Mono.fromSupplier(() -> \"Hello from supplier\");mono.subscribe(System.out::println);
输出:
Hello from supplier
特点:
-
懒加载:只有订阅时才会执行 Supplier
-
不会多次执行 Supplier(除非重新订阅)
-
常用于包装同步计算任务,使其适应响应式链
对比 just()方法
:
Mono.just(T)
Mono.fromSupplier(Supplier)
7.6 总结对比表
flatMap()
generate()
create()
fromSupplier()
使用建议(常见场景)
flatMap()
generate()
create()
fromSupplier()
如果你正在构建响应式应用(如 Spring WebFlux),掌握这些操作符和创建方法可以让你更好地组合异步流程、封装业务逻辑、提升程序灵活性和性能。