> 技术文档 > 一文读懂WebFlux框架和WebClient响应式http客户端

一文读懂WebFlux框架和WebClient响应式http客户端

前言:WebClient 是 Spring WebFlux 模块提供的一个非阻塞的基于响应式编程的进行 Http 请求的客户端工具。WebFlux 对标 SpringMVC,WebClient 相当于 RestTemplate,同时也是 Spring 官方的 Http 请求工具。截止当前时间,目前Spring框架的最新版本是6.2.0。


一、Spring WebFlux简介

Spring WebFlux 是 Spring 5 引入的一个新的响应式 Web 框架,它与 Spring MVC 并行存在,旨在支持构建非阻塞异步通信和响应式流处理的应用程序。WebFlux 基于 Reactive Streams 规范,并使用了 Project Reactor 库作为其实现的基础。

以下是 WebFlux 的一些关键特性和概念:

  1.1 响应式编程模型

WebFlux 使用了 Mono 和 Flux 这两种反应类型来表示单值和多值的异步数据序列。它允许开发者以声明式的方式组合异步逻辑,并且能够处理复杂的场景如背压(backpressure)。

   1.2 非阻塞 I/O

WebFlux 支持非阻塞输入输出操作,这使得在高并发场景下可以更高效地利用系统资源。默认情况下,WebFlux 使用 Netty 服务器作为其非阻塞服务器,但也可以运行在其他支持 Servlet 3.1+ 规范的容器上

  1.3 事件驱动架构

WebFlux 利用了事件驱动的架构来提高系统的吞吐量和响应性。当有数据准备好时才进行处理,而不是为每个请求分配一个线程等待 I/O 操作完成。

1.4 支持多种编程模型

  • 注解驱动:类似于 Spring MVC 的注解方式,可以使用 @RestController@RequestMapping 等注解定义控制器。

  • 函数式编程:提供了函数式端点和路由定义的方式,允许以编程方式定义路由和处理器函数。

1.5 适用场景

  • 高并发 Web 应用:对于需要处理大量并发请求的应用来说,WebFlux 提供了一种有效的解决方案。

  • 微服务架构中的异步服务:适合微服务间的异步调用,特别是在涉及 I/O 操作时。

  • 实时数据流应用:例如 WebSocket 通信、消息推送等实时数据处理场景。

1.6 WebClient

Spring WebFlux 提供了一个新的 WebClient 类,这是一个非阻塞的 HTTP 客户端,可用于发起 HTTP 请求并处理响应。

1.7 兼容性和灵活性

WebFlux 可以与现有的 Spring 生态系统无缝集成,包括 Spring Data、Spring Security 等模块。

如果你正在考虑是否要在项目中采用 WebFlux,那么应该基于你的具体需求来决定。如果你的应用需要处理大量的并发连接,或者你希望利用响应式编程的优势来构建高效的微服务或实时应用程序,那么 WebFlux 可能是一个合适的选择。然而,如果是一个相对简单的 Web 应用,传统的 Spring MVC 可能就足够了。


二、WebClient 简介

WebClient 是 Spring WebFlux 模块提供的一个非阻塞的基于响应式编程的进行 Http 请求的客户端工具。WebFlux 对标 SpringMvc,WebClient 相当于 RestTemplate,同时也是 Spring 官方的 Http 请求工具。

2.1 传统阻塞IO模型 VS 响应式IO模型

  • 传统阻塞IO模型 RestTemplate

  Spring3.0引入了RestTemplate,SpringMVC或Struct等框架都是基于Servlet的,其底层IO模型是阻塞IO模型。采用阻塞IO模式获取输入数据。每个连接都需要独立的线程,完成数据输入、业务处理、返回。传统阻塞IO模型的问题是,当并发数很大时,就要创建大量线程,占用很大的系统资源。连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费。

  • 响应式IO模型 WebClient

  Spring5中引入了WebClient作为非阻塞式Reactive Http客户端。Spring社区为了解决SpringMVC的阻塞模型在高并发场景下的性能瓶颈,推出了Spring WebFlux,WebFlux底层实现是久经考验的Netty非阻塞IO通信框架。其实WebClient处理单个HTTP请求的响应时长并不比RestTemplate更快,但是它处理并发的能力更强,非阻塞的方式可以使用较少的线程以及硬件资源来处理更多的并发。

  所以响应式非阻塞IO模型的核心意义在于,提高了单位时间内有限资源下的服务请求的并发处理能力,而不是缩短了单个服务请求的响应时长。

  • 与RestTemplate相比,WebClient的优势
    • 非阻塞响应式IO,单位时间内有限资源下支持更高的并发量。

    • 支持使用Java8 Lambda表达式函数。

    • 支持同步、异步、Stream流式传输。

2.2. WebClient  Api

WebClient 在 spring 提供的 WebFlux 中

org.springframework.bootspring-boot-starter-webflux
(1)创建实例
  • create() 创建实例
WebClient cli = WebClient.create();
  • create(String baseUrl) 创建实例并指定 baseURL
WebClient webClient = WebClient.create(\"http://localhost:8080\");

(2)构建器

  • WebClient.builder().build() 使用构建器
WebClient build = WebClient.builder().build();

WebClient.builder 的额外配置:

  • 创建副本
WebClient client1 = WebClient.builder() .filter(filterA).filter(filterB).build(); WebClient client2 = client1.mutate() .filter(filterC).filter(filterD).build();

一旦构建,WebClient是不可变的。但是,您可以按如下方式克隆它并构建修改后的副本

  • 编码器
//默认 256 kbWebClient webClient = WebClient.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024)).build();
(3)获取响应

retrieve() 检索方法用于声明如何提取响应。

WebClient client = WebClient.create(\"https://example.org\"); Mono<ResponseEntity> result = client.get() .uri(\"/persons/{id}\", id).accept(MediaType.APPLICATION_JSON) .retrieve() .toEntity(Person.class);

或者只获得 body

WebClient client = WebClient.create(\"https://example.org\"); Mono result = client.get() .uri(\"/persons/{id}\", id).accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(Person.class);

要获取解码对象流,请执行以下操作:

Flux result = client.get() .uri(\"/quotes\").accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(Quote.class);
bodyToFlux 和 bodyToMono:
  • bodyToFlux 方法用于将响应结果处理为 Flux 对象,Flux 是 Reactor 框架中表示包含零个、一个或多个元素的异步序列的类。这意味着响应结果可能是一个包含多个元素的流,而不是单个值。

  • bodyToMono 方法用于将响应结果处理为 Mono 对象,Mono 是 Reactor 框架中表示包含零个或一个元素的异步序列的类。这意味着响应结果是一个单个值或者没有值。

accept(MediaType... var1)

  响应数据类型

acceptCharset(Charset... var1)

  响应字符集

(4)RequestBody
@RequestMapping(\"/body\")public void test5(){ WebClient webClient = WebClient.create(\"http://localhost:8080/api/restful\"); MultiValueMap formData = new LinkedMultiValueMap(); formData.add(\"name\", \"张和\"); formData.add(\"color\", \"blue\"); Mono dogMono = webClient.post() .contentType(MediaType.APPLICATION_FORM_URLENCODED) .bodyValue(formData) .retrieve() .bodyToMono(Dog.class); System.out.println(dogMono.block());} @RequestMapping(\"/body1\")public void test6(){ WebClient webClient = WebClient.create(\"http://localhost:8080/api/json\"); Mono mono = Mono.just(new Dog(\"和\", \"33\")); Mono dogMono = webClient.post() .contentType(MediaType.APPLICATION_JSON) .body(mono, Dog.class) .retrieve() .bodyToMono(Dog.class); System.out.println(dogMono.block());}
  • contentType()

    设置请求参数格式

  • body()/bodyValue()

    请求参数

(5)过滤器

filter() 方法添加过滤器

public void test7(){ Mono mono = Mono.just(new Dog(\"和\", \"33\")); WebClient webClient = WebClient.builder().filter(new ExchangeFilterFunction() { @Override public Mono filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) { ClientRequest build = ClientRequest.from(clientRequest).body(mono, Dog.class).build(); return exchangeFunction.exchange(build); } }).build(); Mono dogMono = webClient.post() .uri(\"http://localhost:8080/api/json\") .contentType(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(Dog.class); System.out.println(dogMono.block());}
(6) 同步

WebClient 的 block() 方法可以通过在末尾阻止结果以同步方式使用

@RequestMapping(\"/filter\")public void test7(){ Mono mono = Mono.just(new Dog(\"和\", \"33\")); WebClient webClient = WebClient.builder().filter(new ExchangeFilterFunction() { @Override public Mono filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) { ClientRequest build = ClientRequest.from(clientRequest).body(mono, Dog.class).build(); return exchangeFunction.exchange(build); } }).build(); Mono dogMono = webClient.post() .uri(\"http://localhost:8080/api/json\") .contentType(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(Dog.class); //非阻塞 System.out.println(dogMono.subscribe(dog -> { System.out.println(dog); })); //阻塞 System.out.println(dogMono.block());}
  • subscribe() 非阻塞方式,用于异步处理数据流,非阻塞,适合构建高性能、响应式的应用程序。

  • block() 阻塞方式,用于同步获取数据流的结果,会阻塞线程,适用于简单的测试或需要在同步环境中获取数据的场景。


    三、不调用subscribe直接返回flux的区别

    在使用响应式编程模型时,比如通过 Project Reactor 的 FluxMono,直接返回一个 Flux 而不调用 subscribe() 方法和实际订阅(调用 subscribe())之间有着重要的区别。理解这些差异对于构建高效的响应式应用程序至关重要。

    3.1、直接返回 Flux

    当你直接返回一个 Flux 对象而不进行订阅时,实际上你只是创建了一个数据流的定义或者说是一个“蓝图”。这个数据流尚未开始执行,也没有任何数据被生成或消费。

    优点:

    • 延迟计算:因为数据流未被订阅,所以相关的计算也不会被执行。这可以节省资源直到真正需要数据的时候。

    • 组合性:可以在不同的层次上将多个操作组合起来,而不需要立即执行它们。

    • 声明性:专注于描述要做什么而不是如何做,使得代码更加简洁、清晰。

    示例

    @GetMapping(value = \"/events\", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux getEvents() { return eventService.getEventStream();}

    在这个例子中,getEvents() 方法返回一个 Flux,但并没有调用 subscribe()。WebFlux 框架会自动处理这个 Flux 并为每个客户端请求订阅它。这种做法常见于Web控制器中,例如Spring WebFlux应用,返回的Flux被动的由框架负责对象进行订阅,并处理结果。

    3.2 subscribe订阅 Flux

    调用 subscribe() 方法意味着你主动开始执行这个数据流,开始产生数据并消费它。这是触发数据流的实际执行点。

    作用

    • 启动数据流:只有当您调用 subscribe() 时,数据流才会开始产生数据并传递给订阅者。

    • 处理结果:你可以提供回调来处理产生的数据项、错误或者完成信号。

    示例

    eventService.getEventStream().subscribe(event -> { System.out.println(\"Received event: \" + event);});

    在这个例子中,subscribe() 方法被调用以开始接收来自 getEventStream() 的事件,并在控制台打印出来。

    3.3 区别总结

    • 执行时机:直接返回 Flux 只是定义了数据流并未执行;调用 subscribe() 则触发了数据流的执行。

    • 应用场景:直接返回 Flux 常用于构建响应式的API接口,让框架层去管理订阅和数据流的执行;而在需要直接处理数据流的地方(如业务逻辑层),则可能需要显式调用 subscribe() 来主动获取数据。

    • 资源管理:由于Flux是惰性的,只有在订阅后才会开始消耗资源,因此直接返回 Flux 可以更好地控制资源的使用。


    四、exchange()和retrieve()方法的区别

    exchange() 是 Spring WebFlux框架 中 WebClient 提供的一个方法,用于手动控制 HTTP 请求的响应处理流程。它返回一个 Mono,允许你对响应进行更细粒度的操作,比如检查状态码、头信息、选择不同的响应体解析方式等。与 .retrieve() 不同,.exchange() 不会自动处理响应体或抛出异常,而是让你手动控制整个响应过程。

    4.1 基本概念

    在使用 WebClient 发起请求时,有两种常见方式:

    方法 描述 .retrieve() 自动处理成功响应(2xx),适用于直接获取响应体内容,简单易用但灵活性差。 .exchange() 返回 Mono,让你手动处理整个响应过程,包括状态码、错误处理、响应体解析等,更加灵活。

    4.2 exchange() 的作用和优势,更精细地控制响应处理

    • 可以根据响应的状态码做不同的处理。

    • 支持自定义错误处理逻辑。

    • 支持流式处理响应体(如 text/event-stream)。

    • 可以读取响应头、cookie 等元数据。


    4.3 使用示例

    示例 1:基本使用 .exchange()

    WebClient webClient = WebClient.create(\"https://api.example.com\");webClient.get() .uri(\"/users/1\") .exchange() .flatMap(response -> { if (response.statusCode().is2xxSuccessful()) { return response.bodyToMono(User.class); } else { return Mono.error(new RuntimeException(\"Request failed with status code: \" + response.statusCode())); } }) .subscribe(user -> System.out.println(\"User: \" + user));
        (1)总结流程图
        ​GET https://api.example.com/users/1 ↓ exchange() → Mono ↓ flatMap → 判断状态码 ↓ bodyToMono 或 抛出错误 ↓ subscribe → 异步获取 User 对象并打印
        (2)解释代码
        • 使用 .exchange() 获取完整响应

        • flatMap()处理响应:使用 .flatMap() 对 Mono 进行转换。检查响应状态码是否是 2xx 成功状态:如果是成功响应,调用 bodyToMono(User.class) 将响应体反序列化为 User 对象。如果不是成功状态码,则返回一个错误信号 Mono.error(...),中断流并通知订阅者发生了错误。

        • 调用 .subscribe() 表示启动整个响应式流,真正发送请求。这是一个异步非阻塞操作,当服务器返回数据后,回调函数 user -> System.out.println(...) 会被执行。如果请求失败(如返回非 2xx 状态码),会触发错误信号,可以通过 .onErrorResume() 或 .doOnError() 来捕获。

        示例 2:处理流式响应(如 Server-Sent Events)

        流式处理方式接收并打印来自服务端的 Server-Sent Events(SSE) 数据。accept() 方法是 Spring WebFlux 中 WebClient 请求构建器(WebClient.RequestHeadersSpec)的一个方法,用于设置 HTTP 请求头中的 Accept 字段(这是 HTTP 协议中标准的请求头字段之一,通常用于 内容协商(Content Negotiation))。设置客户端期望从服务器接收的响应内容类型(即告诉服务器:“我接受什么样的数据格式”)。

        webClient.get() .uri(\"/stream\") .contentType(MediaType.APPLICATION_JSON) // 发送的是 JSON 数据 .accept(MediaType.TEXT_EVENT_STREAM) // 希望返回流式数据 .exchange() .flatMapMany(clientResponse -> clientResponse.bodyToFlux(String.class)) .subscribe(data -> System.out.println(\"Received: \" + data));

        示例 3:读取响应头信息

        webClient.get() .uri(\"/info\") .exchange() .flatMap(response -> { String contentType = response.headers().contentType().map(MediaType::toString).orElse(\"unknown\"); return response.bodyToMono(String.class) .map(body -> \"Content-Type: \" + contentType + \", Body: \" + body); }) .subscribe(System.out::println);

        4.4 注意事项

        • 必须调用 subscribe() 启动流.exchange() 返回的是 Mono,是惰性的,只有订阅后才会真正发送请求。

        • 避免阻塞操作:如果你使用 .block() 来等待结果,会失去响应式编程的优势。

        • 异常处理需显式处理:不像 .retrieve() 那样自动抛出异常,你需要自己判断状态码或使用 .onErrorResume() 处理错误。

        4.5 与 .retrieve() 的对比

        特性 .retrieve() .exchange() 响应处理 自动处理 2xx 成功响应 手动处理所有响应 错误处理 默认抛出异常 需要手动判断状态码或错误 获取响应头 不支持 支持 流式响应 支持但不够灵活 完全支持,可自由控制 使用难度 简单 复杂但更强大

        4.6 适用场景

        场景 推荐方法 普通 GET 请求获取 JSON 数据 .retrieve() 需要判断不同状态码 .exchange() 处理 SSE 或流式响应 .exchange() 需要访问响应头信息 .exchange() 自定义错误逻辑(如重试、降级) .exchange()

        4.7 总结

        .exchange()WebClient 提供的一种更底层、更灵活的方式来处理 HTTP 响应。它适用于需要精确控制响应处理流程的场景,尤其是在处理非标准响应、流式传输、自定义错误逻辑时非常有用。

        如果你只需要简单的响应体提取,推荐使用 .retrieve();如果需要更强的控制能力,请使用 .exchange()


        五、Webclient使用示例

        这段代码是典型的 Reactor 操作链,使用了 Flux 或类似响应式类型来处理一个叫 ChatR

        esponse 的流式响应对象。它对每一个“块”(chunk)进行处理,并在完成时执行某些操作,同时处理异常情况。

        响应式编程框架(如 Project Reactor 的 Flux)来处理流式数据的逻辑,常用于处理异步或非阻塞操作,比如网络请求、事件流等。

        //Flux<CommonResult> return streamResponse.map(chunk -> { String newContent = chunk.getResult() != null ? chunk.getResult().getOutput().getContent() : null; newContent = StrUtil.nullToDefault(newContent, \"\"); // 避免 null 的 情况 contentBuffer.append(newContent); // 响应结果 return success(new AiChatMessageSendRespVO()  .setSend(BeanUtils.toBean(userMessage, AiChatMessageSendRespVO.Message.class))  .setReceive(BeanUtils.toBean(assistantMessage, AiChatMessageSendRespVO.Message.class) .setContent(newContent)) ); }).doOnComplete(() -> { // 忽略租户,因为 Flux 异步无法透传租户 TenantUtils.executeIgnore(() ->  chatMessageMapper.updateById(new AiChatMessageDO() .setId(assistantMessage.getId()) .setSegmentIds(convertList(segmentList, AiKnowledgeSegmentDO::getId)) .setContent(contentBuffer.toString()))); }).doOnError(throwable -> { log.error(\"[sendChatMessageStream][userId({}) sendReqVO({}) 发生异常]\", finalUserId, sendReqVO, throwable); // 忽略租户,因为 Flux 异步无法透传租户 TenantUtils.executeIgnore(() ->  chatMessageMapper.updateById(new AiChatMessageDO().setId(assistantMessage.getId()).setContent(throwable.getMessage()))); }).onErrorResume(error -> Flux.just(error(AiErrorCodeConstants.CHAT_STREAM_ERROR)));

        5.1  Chatresponse.map(chunk -> { ... })

        作用chunk 是流中的一部分数据,可以理解为一次网络请求返回的一个片段。对 Chatresponse 流中的每一个 chunk 数据进行转换。将最终内容包装成一个“成功”的结果对象,可能是一个自定义的封装类(比如 ResponseEntity 或你项目中的统一返回格式)。

        内部逻辑

        String newContent = chunk.getResult() != null ?  chunk.getResult().getOutput().getContent() :  null;
        newContent = StrUtil.nullToDefault(newContent, \"\"); // 避免 null 的情况
        return success(newContent);
        • 这里是在安全地获取 chunk 中的内容:

          • 先检查 chunk.getResult() 是否不为 null

          • 如果不为 null,再继续获取 .getOutput().getContent()

          • 否则返回 null,防止空指针异常。

        • 使用了工具类 StrUtil(可能是 Hutool 工具库)将 null 转换为空字符串 \"\",确保不会出现 null 值。

        5.2 .doOnComplete(() -> { })

        • 作用:当整个 Flux 流处理完成后触发这个回调。可以在这里添加一些清理资源、日志记录或通知用户任务完成的逻辑。

        ​error -> Flux.just(error(AiErrorCodeConstants.CHAT_STREAM_ERROR))

        5.3.doOnError(throwable -> { log.error(...); ... })

        作用throwable 是发生的异常对象。当流处理过程中发生错误时,会调用这个回调。打印错误信息和堆栈跟踪,方便调试和日志记录。

        5.4.onErrorResume(error -> Flux.just(error(...))

        当发生错误时,不是直接抛出异常结束流,而是“恢复”并返回一个新的错误响应。创建一个包含错误响应的新 Flux,让流程继续下去而不是中断。

        AiErrorCodeConstants.CHAT_STREAM_ERROR 可能是你项目中定义的错误码常量,表示“聊天流错误”。

        error -> Flux.just(error(AiErrorCodeConstants.CHAT_STREAM_ERROR))

        5.5 总结一下整段代码做了什么?

        • 接收一个 Chatresponse 类型的流式响应;

        • 对每个 chunk 进行内容提取和转换;

        • 当流处理完成时,执行可选的后续操作(当前为空);

        • 遇到错误时,打印错误信息并输出一个错误响应;

        • 保证即使出错,也不会中断整个流,而是优雅地返回错误提示。


        六、just()方法

        在 WebFlux 中,just() 方法是 Reactor 框架(Spring WebFlux 的底层实现)用于创建包含固定元素的响应式流的核心方法。其主要作用是为 Flux(多元素流)或 Mono(单元素流)预定义静态数据序列

        6.1 核心作用

        • 同步创建固定数据流

          just() 通过直接指定元素值,创建一个立即发出预设元素的响应式流。例如:

          Flux.just(\"A\", \"B\", \"C\") // 创建包含3个字符串的流 Mono.just(\"Hello\") // 创建包含单个字符串的 Mono 

          这些元素会在订阅时按顺序同步发出,无需等待异步操作。

        • 支持可变参数

          可接受多个参数(Flux.just(T...) )或单个参数(Mono.just(T) ),覆盖单元素和多元素场景3。

        • 创建冷序列(Cold Sequence)

          通过 just() 创建的流属于冷序列:每次订阅都会重新发送全部元素。例如两次订阅同一 Flux.just(1,2,3) 会各自收到独立的 1→2→3 序列。

        6.2 关键特性

        • 数据流未执行直到订阅

          调用 just() 仅是声明数据流,不会触发元素发送。必须通过 subscribe() 订阅后才会实际发出数据:

          Mono.just(\"Data\").subscribe(System.out::println); // 输出 \"Data\"
        • 空序列的特殊处理

          若要创建空流,需使用 Mono.empty() 或 Flux.empty() 。而 just() 至少包含一个元素(如 Mono.just(null) 会发送 null 值)。

        • 适用简单场景,不涉及复杂逻辑

          适合已知静态数据的场景。若需动态生成元素(如循环、异步操作),应使用 create()generate() 或 fromSupplier() 等方法。

        6.3 对比其他创建方法

        方法 适用场景 示例 just() 固定元素序列 Flux.just(1, 2, 3) empty() 空序列(仅发送完成信号) Mono.empty() fromArray()/fromIterable() 从集合/数组创建流 Flux.fromArray(new int[]{1,2}) create() 动态生成元素(支持多次发送) 复杂异步逻辑

        6.4 注意事项

        • 非阻塞但可能阻塞线程just() 本身是同步的,若元素生成涉及阻塞操作(如 Thread.sleep ),会破坏响应式非阻塞特性。

        • 调试支持:结合 StepVerifier 测试工具可验证 just() 流的元素是否符合预期。

        just() 是 WebFlux 中快速创建静态数据流的基础方法,适用于已知元素的简单场景。其核心价值在于声明式构造响应式序列,但需注意订阅机制和冷序列特性。对于动态数据或复杂逻辑,建议结合其他操作符(如 flatMapgenerate)实现更灵活的流处理

        在响应式编程库 Project Reactor(如 reactor-core)中,.just() 是一个非常常用的操作符,用于创建一个 包含一个或多个元素的、立即发射数据的响应式流(MonoFlux

        6.5 使用方式及示例

        类型 方法 含义 Mono Mono.just(T data) 创建一个包含单个元素的 Mono,该 Mono 在被订阅时会立即发射这个元素并完成。 Flux Flux.just(T... data) 或 Flux.just(T first, T... rest) 创建一个按顺序发射多个元素的 Flux,每个元素都会被依次发射,并在最后完成。

        (1)使用 Mono.just()

        Mono mono = Mono.just(\"Hello\");mono.subscribe(System.out::println);
        输出:Hello
        • Mono.just(\"Hello\") 表示一个包含单个值 \"Hello\" 的异步结果。

        • 当调用 .subscribe() 时,它会立即发射这个值。

        ⚠️ 注意:

        如果传入的是 null,会抛出异常:NullPointerException

        Mono.just(null); // ❌ 抛出 NullPointerException

        如果需要支持 null 值,请使用 Mono.justOrEmpty()(适用于可能为 null 的情况):
        java

        Mono mono = Mono.justOrEmpty(null); // 不会发射任何数据,直接 onComplete

        (2)使用 Flux.just()

        Flux flux = Flux.just(\"Apple\", \"Banana\", \"Cherry\");flux.subscribe(System.out::println);

        输出:

        AppleBananaCherry

        每个参数都会作为流中的一个元素被发射。支持多个参数,也可以传数组:

        String[] fruits = {\"Apple\", \"Banana\", \"Cherry\"};Flux flux = Flux.just(fruits);

        (3)和其它方法的区别

        操作符 描述 示例 Mono.just(T) 发射一个元素,然后完成 Mono.just(\"OK\") Mono.empty() 立即完成,不发射任何数据 Mono.empty() Mono.fromSupplier(() -> ...) 懒加载,每次订阅时计算一次 Mono.fromSupplier(() -> calculate()) Mono.defer(() -> Mono.just(...)) 每次订阅都重新创建一个新的 Mono Mono.defer(() -> Mono.just(new Date())) Flux.just(...) 发射多个元素,按顺序发送 Flux.just(1, 2, 3) Flux.fromIterable(list) 从集合创建 Flux Flux.fromIterable(Arrays.asList(1,2,3))

        (4)适用场景

        场景 说明 返回静态数据 如返回固定的配置信息、常量值等 测试/模拟数据 快速构造测试用的 Mono 或 Flux 组合响应式流 将同步数据嵌入到响应式链中,例如与 flatMapmap 等组合使用 链式处理起点 作为响应式链的起始点,例如:Mono.just(user).map(...)

        (5)完整示例

        在 Spring WebFlux 中,这些 Mono / Flux 会被框架自动订阅并写入 HTTP 响应体中。

        @GetMapping(\"/user\")public Mono getUser() { User user = new User(\"Alice\"); return Mono.just(user); // 包装成响应式类型返回}@GetMapping(\"/numbers\")public Flux getNumbers() { return Flux.just(1, 2, 3, 4, 5); // 返回一个数字序列}
        方法 类型 特点 Mono.just(T) 单一值 被自动订阅后,立即发射一个值并完成 Flux.just(...) 多值 被自动订阅后,立即按顺序发射多个值并完成 是否懒加载? ❌ 否 数据是立即确定的 是否可为空? ❌ 不可为 null(会抛异常) 可使用 justOrEmpty() 替代

        在响应式编程中,比如使用 Project Reactor 的 MonoFlux需要订阅(subscribe)才会触发数据流的执行和发射。这种机制被称为“惰性求值”或“延迟执行”,意味着直到你调用 .subscribe() 方法之前,任何操作符链都不会真正开始执行。

        (6)惰性求值

        • 定义:在响应式流中,除非有订阅者订阅了这个流,否则不会执行任何操作。

        • 目的:这种设计有助于优化资源使用,避免不必要的计算或网络请求等操作,直到确实需要处理这些数据为止。

        使用 Mono.just() 创建一个 Mono 并订阅
        Mono mono = Mono.just(\"Hello World\");// 在此之前,\"Hello World\" 并没有被打印出来,因为还没有订阅mono.subscribe(System.out::println); // 订阅后,输出 \"Hello World\"

        在这个例子中,只有当 mono.subscribe() 被调用时,\"Hello World\" 才会被打印出来。如果没有调用 .subscribe(),即使你创建了一个包含数据的 Mono,也不会有任何输出。

        使用 Flux.just() 创建一个 Flux 并订阅
        Flux flux = Flux.just(\"A\", \"B\", \"C\");// 同样地,这里不会立即打印任何内容flux.subscribe(System.out::println); // 订阅后,依次输出 \"A\", \"B\", \"C\"

        同样,对于 Flux 来说,也需要通过 .subscribe() 方法来启动数据流,并开始接收并处理元素。

        (7)不主动订阅的情况

        如果你只是构建了一个 MonoFlux 而不进行订阅,那么这个流将永远不会被执行。这对于某些场景来说是有用的,例如当你想返回一个响应式类型从一个方法中(如 Spring WebFlux 控制器),让框架本身来负责订阅和处理响应式流。

        (8)总结

        • 必须订阅:为了触发数据的产生和流动,你需要对 Mono 或 Flux 进行订阅。

        • 惰性求值:这种机制确保了只有在实际需要的时候才执行操作,从而提高了效率和性能。

        • 应用场景:不仅限于简单的数据发射,还包括复杂的异步操作、网络请求等,所有这些都是基于订阅机制来驱动的


        七、​flatMap、generate、 create()、 fromSupplier()方法的作用

        在响应式编程中(如 Project Reactor 的 reactor-core),flatMapgeneratecreate()fromSupplier() 是非常常用的几个操作符或创建方法。它们各自有不同的用途和适用场景,下面是它们的详细解释和使用方式。


        7.1 flatMap()方法

        将每个元素映射为一个新的 Publisher(如 MonoFlux),然后并发地扁平化合并这些 Publisher 的结果,最终返回一个统一的 FluxMono

        Flux flux = Flux.just(1, 2, 3) .flatMap(i -> Flux.range(i * 10, 3)); // 每个数字变成一个范围流// 输出:10, 11, 12, 20, 21, 22, 30, 31, 32flux.subscribe(System.out::println);

        (1)flatMap特点

        • 异步非阻塞

        • 并发执行多个内部流

        • 常用于处理异步数据转换、数据库查询、HTTP 请求等

        • 可控制并发数(通过 flatMap(..., concurrency)

        (2)对比 map()方法

        • map() 是同步转换。

        • flatMap() 是异步映射 + 合并。

        7.2 generate()方法

        同步地生成数据流,适用于需要手动控制发射逻辑的场景。是 Flux 提供的一个工厂方法。

        Flux flux = Flux.generate( () -> 0, // 初始状态 (state, sink) -> { sink.next(state); // 发送当前状态 if (state == 10) { sink.complete(); // 完成流 } return state + 1; // 返回新的状态 });flux.subscribe(System.out::println);

        输出:

        01...10

         特点:

        • 同步生成器

        • 非常适合用于模拟序列、有限状态机、自定义算法等

        • 必须显式调用 sink.next() 和 sink.complete() / sink.error()


        7.3 create()方法

        提供一个更灵活的方式创建 Flux,可以异步/同步、多线程、背压支持良好,适用于桥接非响应式代码与响应式流之间的桥梁。

        Flux flux = Flux.create(sink -> { sink.next(\"Hello\"); sink.next(\"World\"); sink.complete();});flux.subscribe(System.out::println);

        输出

        HelloWorld
        • 支持同步和异步发射

        • 支持背压(可通过 onRequest 控制)

        • 适合封装第三方库、事件驱动系统、Socket 等原始数据源

        7.4 create与 generate() 的区别

        方法 是否支持异步 是否支持背压 用途 generate() ❌ 同步 ✅ 支持 手动控制同步数据流 create() ✅ 支持 ✅ 支持 更灵活,适合复杂场景

        7.5 fromSupplier()

        从一个 Supplier 创建一个 Mono延迟执行 Supplier 并发射其结果

        Mono mono = Mono.fromSupplier(() -> \"Hello from supplier\");mono.subscribe(System.out::println);

        输出:

        Hello from supplier

        特点:

        • 懒加载:只有订阅时才会执行 Supplier

        • 不会多次执行 Supplier(除非重新订阅)

        • 常用于包装同步计算任务,使其适应响应式链

        对比 just()方法

        方法 是否懒加载 是否接受 null Mono.just(T) ❌ 否,值立即确定 ❌ 不允许 null Mono.fromSupplier(Supplier) ✅ 是,订阅时才执行 ✅ 允许返回 null

        7.6 总结对比表

        方法 类型 是否异步 是否懒加载 是否支持背压 用途 flatMap() 操作符 ✅ ✅ ✅ 异步映射 + 合并多个流 generate() 创建方法 ❌ ✅ ✅ 同步生成有限数据流 create() 创建方法 ✅ ✅ ✅ 自定义任意数据源(同步/异步) fromSupplier() 创建方法 ❌ ✅ ❌ 封装同步计算任务,延迟执行

         使用建议(常见场景)

        场景 推荐方法 调用 HTTP API、DB 查询后映射结果 flatMap() 生成固定范围的数据流(如 0~10) generate() 接入事件监听器、Socket、队列等外部源 create() 包装同步计算任务,使其成为 Mono fromSupplier()

        如果你正在构建响应式应用(如 Spring WebFlux),掌握这些操作符和创建方法可以让你更好地组合异步流程、封装业务逻辑、提升程序灵活性和性能。