Spring WebFlux in Depth: A Practical Guide

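Contents

      • 4.3.1 Building a Reactive REST API
        • Basic Project Setup
        • Reactive Controllers
        • Advanced Features
      • 4.3.2 Reactive Database Access (R2DBC)
        • R2DBC Configuration
        • Reactive Repositories
        • Complex Queries and Transactions
        • Performance Optimization
      • 4.3.3 Real-Time Communication with WebSocket
        • Basic WebSocket Configuration
        • Reactive WebSocket Handling
        • Advanced Features
        • Security Configuration
    • Performance Monitoring and Best Practices
      • Monitoring Endpoint Configuration
      • Monitoring Reactive Applications
      • Best Practices Summary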

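Spring WebFlux is the reactive web framework introduced in Spring Framework 5. It is built on Project Reactor and supports a non-blocking programming model with both annotated and functional endpoints. This section looks at its core capabilities: building REST APIs, reactive database access, and real-time communication.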

4.3.1 Building a Reactive REST API

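Basic Project Setup

Start by creating a Spring Boot WebFlux project (for example with Spring Initializr) and add the WebFlux starter:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

Reactive Controllers

Annotated controllers (similar to traditional Spring MVC, but with support for reactive types):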

    @RestController
    @RequestMapping("/products")
    public class ProductController {

        private final ProductService productService;

        // Constructor injection
        public ProductController(ProductService productService) {
            this.productService = productService;
        }

        @GetMapping
        public Flux<Product> getAllProducts() {
            return productService.findAll();
        }

        @GetMapping("/{id}")
        public Mono<Product> getProductById(@PathVariable String id) {
            return productService.findById(id);
        }

        @PostMapping
        @ResponseStatus(HttpStatus.CREATED)
        public Mono<Product> createProduct(@RequestBody Mono<Product> productMono) {
            return productService.save(productMono);
        }

        @PutMapping("/{id}")
        public Mono<Product> updateProduct(
                @PathVariable String id,
                @RequestBody Mono<Product> productMono) {
            return productService.update(id, productMono);
        }

        @DeleteMapping("/{id}")
        @ResponseStatus(HttpStatus.NO_CONTENT)
        public Mono<Void> deleteProduct(@PathVariable String id) {
            return productService.delete(id);
        }
    }

Functional endpoints (the RouterFunction style):

    @Configuration
    public class ProductRouter {

        @Bean
        public RouterFunction<ServerResponse> route(ProductHandler handler) {
            return RouterFunctions.route()
                    .GET("/fn/products", handler::getAll)
                    .GET("/fn/products/{id}", handler::getById)
                    .POST("/fn/products", handler::create)
                    .PUT("/fn/products/{id}", handler::update)
                    .DELETE("/fn/products/{id}", handler::delete)
                    .build();
        }
    }

    @Component
    public class ProductHandler {

        private final ProductService productService;

        public ProductHandler(ProductService productService) {
            this.productService = productService;
        }

        public Mono<ServerResponse> getAll(ServerRequest request) {
            return ServerResponse.ok()
                    .contentType(MediaType.APPLICATION_NDJSON)
                    .body(productService.findAll(), Product.class);
        }

        public Mono<ServerResponse> getById(ServerRequest request) {
            String id = request.pathVariable("id");
            return productService.findById(id)
                    .flatMap(product -> ServerResponse.ok().bodyValue(product))
                    .switchIfEmpty(ServerResponse.notFound().build());
        }

        public Mono<ServerResponse> create(ServerRequest request) {
            // save(...) takes a Mono<Product>, matching its use in ProductController
            return productService.save(request.bodyToMono(Product.class))
                    .flatMap(product -> ServerResponse
                            .created(URI.create("/fn/products/" + product.getId()))
                            .bodyValue(product));
        }

        public Mono<ServerResponse> update(ServerRequest request) {
            String id = request.pathVariable("id");
            return request.bodyToMono(Product.class)
                    .flatMap(product -> productService.update(id, Mono.just(product)))
                    .flatMap(product -> ServerResponse.ok().bodyValue(product))
                    .switchIfEmpty(ServerResponse.notFound().build());
        }

        public Mono<ServerResponse> delete(ServerRequest request) {
            String id = request.pathVariable("id");
            return productService.delete(id)
                    .then(ServerResponse.noContent().build());
        }
    }

Advanced Features

Streaming responses (Server-Sent Events):

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ProductEvent> streamProducts() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> new ProductEvent(
                        "product-" + sequence,
                        "Event at " + Instant.now()
                ));
    }
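For reference, a `text/event-stream` response is a sequence of text frames, each made of `field: value` lines and terminated by a blank line. The sketch below formats one such frame by hand using only the JDK. `SseFrame` and its `format` method are hypothetical names used for illustration; WebFlux performs this encoding itself when a handler produces `TEXT_EVENT_STREAM_VALUE`.

```java
final class SseFrame {

    private SseFrame() {}

    /**
     * Formats one Server-Sent Events frame.
     * id and event are optional (pass null to omit them);
     * multi-line data is written as one "data:" line per line of input.
     */
    static String format(String id, String event, String data) {
        StringBuilder frame = new StringBuilder();
        if (id != null) {
            frame.append("id: ").append(id).append('\n');
        }
        if (event != null) {
            frame.append("event: ").append(event).append('\n');
        }
        for (String line : data.split("\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        // A blank line marks the end of the frame.
        return frame.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(format("product-0", "product", "{\"name\":\"demo\"}"));
    }
}
```

A browser `EventSource` dispatches one message per frame, which is why each element of the `Flux` above reaches the client as soon as it is emitted.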

Request validation and exception handling

    // Registered as a regular bean; @Order(-2) places it ahead of Spring Boot's default error handler.
    @Component
    @Order(-2)
    public class GlobalErrorHandler extends AbstractErrorWebExceptionHandler {

        public GlobalErrorHandler(ErrorAttributes errorAttributes,
                                  WebProperties.Resources resources,
                                  ApplicationContext applicationContext,
                                  ServerCodecConfigurer serverCodecConfigurer) {
            super(errorAttributes, resources, applicationContext);
            this.setMessageWriters(serverCodecConfigurer.getWriters());
        }

        @Override
        protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
            return RouterFunctions.route(
                    RequestPredicates.all(),
                    request -> {
                        Map<String, Object> errorProperties =
                                getErrorAttributes(request, ErrorAttributeOptions.defaults());
                        HttpStatus status = getHttpStatus(errorProperties);
                        return ServerResponse.status(status)
                                .contentType(MediaType.APPLICATION_JSON)
                                .bodyValue(errorProperties);
                    }
            );
        }

        private HttpStatus getHttpStatus(Map<String, Object> errorProperties) {
            return HttpStatus.valueOf((Integer) errorProperties.get("status"));
        }
    }

    // Custom validation
    public class ProductValidator {

        public static Mono<Product> validate(Product product) {
            return Mono.just(product)
                    .flatMap(p -> {
                        List<String> errors = new ArrayList<>();
                        if (p.getName() == null || p.getName().isEmpty()) {
                            errors.add("Product name is required");
                        }
                        // price is a BigDecimal, so compare its sign rather than using <=
                        if (p.getPrice() == null || p.getPrice().signum() <= 0) {
                            errors.add("Price must be positive");
                        }
                        if (!errors.isEmpty()) {
                            // ValidationException is assumed to be an application-defined exception
                            return Mono.error(new ValidationException(errors));
                        }
                        return Mono.just(p);
                    });
        }
    }

4.3.2 Reactive Database Access (R2DBC)

R2DBC Configuration

Add the dependencies:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>
    <!-- Newer driver releases are published under the org.postgresql groupId -->
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>

Configure application.yml:

    spring:
      r2dbc:
        url: r2dbc:postgresql://localhost:5432/mydb
        username: user
        password: pass
        pool:
          enabled: true
          max-size: 20

Reactive Repositories

Define the entity:

    @Data
    @Table("products")
    public class Product {

        @Id
        private Long id;
        private String name;
        private String description;
        private BigDecimal price;
        private int stock; // read and written by the transferStock example in the transaction section
        private Instant createdAt;
    }

Create the repository interface:

    public interface ProductRepository extends ReactiveCrudRepository<Product, Long> {

        // Derived query: matches products whose name contains the given text
        Flux<Product> findByNameContaining(String name);

        @Query("SELECT * FROM products WHERE price > :minPrice")
        Flux<Product> findByPriceGreaterThan(BigDecimal minPrice);

        // @Modifying lets the method return the number of affected rows
        @Modifying
        @Query("UPDATE products SET price = price * :factor")
        Mono<Integer> updateAllPrices(BigDecimal factor);
    }

Complex Queries and Transactions

Custom query implementation

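    public class ProductRepositoryImpl implements CustomProductRepository {

        // Identifiers and keywords cannot be bound as parameters, so the sort column
        // is checked against a whitelist (column names here are assumed).
        private static final Set<String> SORTABLE_COLUMNS = Set.of("name", "price", "created_at");

        private final DatabaseClient databaseClient;

        public ProductRepositoryImpl(DatabaseClient databaseClient) {
            this.databaseClient = databaseClient;
        }

        @Override
        public Flux<Product> complexSearch(ProductCriteria criteria) {
            String sortField = SORTABLE_COLUMNS.contains(criteria.getSortField())
                    ? criteria.getSortField() : "name";
            String sortDirection = "DESC".equalsIgnoreCase(criteria.getSortDirection()) ? "DESC" : "ASC";

            String sql = """
                    SELECT * FROM products
                    WHERE name LIKE :name
                      AND price BETWEEN :minPrice AND :maxPrice
                    ORDER BY %s %s
                    LIMIT :limit OFFSET :offset
                    """.formatted(sortField, sortDirection);

            return databaseClient.sql(sql)
                    .bind("name", "%" + criteria.getName() + "%")
                    .bind("minPrice", criteria.getMinPrice())
                    .bind("maxPrice", criteria.getMaxPrice())
                    .bind("limit", criteria.getPageSize())
                    .bind("offset", (criteria.getPageNumber() - 1) * criteria.getPageSize())
                    .map((row, metadata) -> toProduct(row))
                    .all();
        }

        // Row-to-object mapping; column names are assumed to follow the entity fields
        private Product toProduct(Row row) {
            Product product = new Product();
            product.setId(row.get("id", Long.class));
            product.setName(row.get("name", String.class));
            product.setDescription(row.get("description", String.class));
            product.setPrice(row.get("price", BigDecimal.class));
            product.setCreatedAt(row.get("created_at", Instant.class));
            return product;
        }
    }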
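Column names and sort directions cannot be supplied as SQL bind parameters, so a dynamic ORDER BY clause has to be assembled as text from validated values. The following is a minimal JDK-only sketch of such a check. `SortClause`, its key-to-column map, and the `created_at` column name are assumptions made for illustration and are not part of the repository shown here.

```java
import java.util.Map;

final class SortClause {

    // Hypothetical mapping from API-facing sort keys to real column names.
    private static final Map<String, String> SORTABLE = Map.of(
            "name", "name",
            "price", "price",
            "createdAt", "created_at");

    private SortClause() {}

    /** Returns a safe ORDER BY fragment; unknown fields fall back to "name", unknown directions to ASC. */
    static String orderBy(String sortField, String sortDirection) {
        // Map.of rejects null keys, so null input is normalised first.
        String column = SORTABLE.getOrDefault(sortField == null ? "" : sortField, "name");
        String direction = "DESC".equalsIgnoreCase(sortDirection) ? "DESC" : "ASC";
        return "ORDER BY " + column + " " + direction;
    }

    public static void main(String[] args) {
        System.out.println(orderBy("price", "desc"));
        System.out.println(orderBy("price; DROP TABLE products", "asc"));
    }
}
```

Only values taken from the map or the two literal directions ever reach the SQL text, so user input is never concatenated into the statement; all other criteria can stay as bound parameters.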

Transaction management

    @Service
    @RequiredArgsConstructor
    public class ProductService {

        private final ProductRepository productRepository;
        private final TransactionalOperator transactionalOperator;

        // IDs are Long to match ReactiveCrudRepository<Product, Long>
        public Mono<Void> transferStock(Long fromId, Long toId, int quantity) {
            return productRepository.findById(fromId)
                    .flatMap(fromProduct -> {
                        if (fromProduct.getStock() < quantity) {
                            return Mono.error(new InsufficientStockException());
                        }
                        fromProduct.setStock(fromProduct.getStock() - quantity);
                        return productRepository.save(fromProduct)
                                .then(productRepository.findById(toId))
                                .flatMap(toProduct -> {
                                    toProduct.setStock(toProduct.getStock() + quantity);
                                    return productRepository.save(toProduct);
                                });
                    })
                    .then()
                    // Both saves commit together; any error signal rolls the transaction back
                    .as(transactionalOperator::transactional);
        }
    }
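Performance Optimization
  1. Connection pool configuration

    spring:
      r2dbc:
        pool:
          max-size: 20
          initial-size: 5
          max-idle-time: 30m

  2. Batch operations

    public Mono<Long> batchInsert(List<Product> products) {
        if (products.isEmpty()) {
            return Mono.just(0L);
        }
        return databaseClient.inConnection(connection -> {
            // Batch only accepts plain SQL strings; parameterized batching uses Statement.add()
            Statement statement = connection.createStatement(
                    "INSERT INTO products(name, price) VALUES($1, $2)");
            for (int i = 0; i < products.size(); i++) {
                Product product = products.get(i);
                statement.bind(0, product.getName())
                         .bind(1, product.getPrice());
                if (i < products.size() - 1) {
                    statement.add(); // start the next set of bindings
                }
            }
            return Flux.from(statement.execute())
                    .flatMap(Result::getRowsUpdated)
                    .map(Number::longValue)
                    .reduce(0L, Long::sum);
        });
    }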

4.3.3 Real-Time Communication with WebSocket

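Basic WebSocket Configuration

Configuration class. Note that STOMP messaging (WebSocketMessageBrokerConfigurer) is provided by the spring-websocket and spring-messaging modules and runs on the Servlet stack, so it needs spring-boot-starter-websocket and is not part of the WebFlux runtime. A WebFlux-native endpoint would instead implement WebSocketHandler and be registered through a HandlerMapping.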

    @Configuration
    @EnableWebSocketMessageBroker
    public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/ws")
                    .setHandshakeHandler(new DefaultHandshakeHandler())
                    .setAllowedOriginPatterns("*") // restrict to known origins in production
                    .withSockJS();                 // the client example connects through SockJS
        }

        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            registry.enableSimpleBroker("/topic");
            registry.setApplicationDestinationPrefixes("/app");
        }
    }

Reactive WebSocket Handling

Stock ticker push example

    @Controller
    public class StockTickerController {

        private final Flux<StockQuote> stockQuoteFlux;

        public StockTickerController(StockQuoteGenerator quoteGenerator) {
            this.stockQuoteFlux = Flux.interval(Duration.ofMillis(500))
                    .map(sequence -> quoteGenerator.generate())
                    .share(); // hot publisher: all subscribers share the same data
        }

        // Caution: STOMP handler methods may not stream a returned Flux element by element.
        // If that is the case in your setup, subscribe to the Flux yourself and push each
        // quote with SimpMessagingTemplate.convertAndSend("/topic/stocks", quote).
        @MessageMapping("stocks.subscribe")
        @SendTo("/topic/stocks")
        public Flux<StockQuote> subscribe() {
            return stockQuoteFlux;
        }

        @MessageMapping("stocks.filter")
        public Flux<StockQuote> filter(@Payload String symbol) {
            return stockQuoteFlux.filter(quote -> quote.getSymbol().equals(symbol));
        }
    }

Client connection example

    const socket = new SockJS('/ws');
    const stompClient = Stomp.over(socket);

    stompClient.connect({}, () => {
        // Receive every quote published to the topic
        stompClient.subscribe('/topic/stocks', (message) => {
            const quote = JSON.parse(message.body);
            updateStockTable(quote);
        });
        // Ask the server to filter by symbol
        stompClient.send("/app/stocks.filter", {}, "AAPL");
    });

Advanced Features

RSocket integration (a protocol with built-in reactive stream semantics: request-response, request-stream, and channel):

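    @Controller
    @MessageMapping("stock.service")
    public class RSocketStockController {

        private final StockService stockService;

        public RSocketStockController(StockService stockService) {
            this.stockService = stockService;
        }

        // Request-response: one symbol in, one quote out
        @MessageMapping("current")
        public Mono<StockQuote> current(String symbol) {
            return stockService.getCurrent(symbol);
        }

        // Request-stream: one symbol in, a stream of quotes out
        @MessageMapping("stream")
        public Flux<StockQuote> stream(String symbol) {
            return stockService.getStream(symbol);
        }

        // Channel: a stream of symbols in, a merged stream of quotes out
        @MessageMapping("channel")
        public Flux<StockQuote> channel(Flux<String> symbols) {
            return symbols.flatMap(stockService::getStream);
        }
    }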

Backpressure control

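    @MessageMapping("large.data.stream")
    public Flux<DataChunk> largeDataStream() {
        return dataService.streamLargeData()
                .onBackpressureBuffer(
                        50, // buffer size
                        chunk -> log.warn("Dropping chunk due to backpressure"),
                        // Without an explicit strategy, overflow ends the stream with an error
                        BufferOverflowStrategy.DROP_LATEST);
    }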
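To make the difference between overflow strategies concrete, the sketch below models a bounded buffer in plain Java, without Reactor. `BoundedBuffer` and its `Overflow` enum are hypothetical names; the three modes only mirror the intent of Reactor's `BufferOverflowStrategy` values (`DROP_LATEST`, `DROP_OLDEST`, `ERROR`) and are not the library's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class BoundedBuffer<T> {

    enum Overflow { DROP_LATEST, DROP_OLDEST, ERROR }

    private final Deque<T> queue = new ArrayDeque<>();
    private final List<T> dropped = new ArrayList<>();
    private final int capacity;
    private final Overflow strategy;

    BoundedBuffer(int capacity, Overflow strategy) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.strategy = strategy;
    }

    /** Called by a fast producer; applies the overflow strategy once the buffer is full. */
    void offer(T item) {
        if (queue.size() < capacity) {
            queue.addLast(item);
        } else if (strategy == Overflow.DROP_LATEST) {
            dropped.add(item);                 // discard the incoming element
        } else if (strategy == Overflow.DROP_OLDEST) {
            dropped.add(queue.removeFirst());  // evict the oldest buffered element
            queue.addLast(item);
        } else {
            throw new IllegalStateException("buffer overflow");
        }
    }

    /** Called by the slow consumer; returns null when nothing is buffered. */
    T poll() {
        return queue.pollFirst();
    }

    List<T> buffered() {
        return new ArrayList<>(queue);
    }

    List<T> dropped() {
        return new ArrayList<>(dropped);
    }

    public static void main(String[] args) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2, Overflow.DROP_OLDEST);
        for (int i = 1; i <= 4; i++) {
            buffer.offer(i);
        }
        System.out.println("buffered=" + buffer.buffered() + " dropped=" + buffer.dropped());
    }
}
```

Dropping the newest element preserves the order of what was already accepted, while dropping the oldest keeps the data fresh; which one fits depends on whether the consumer cares more about completeness or recency.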
Security Configuration

    @Configuration
    @EnableWebFluxSecurity
    public class SecurityConfig {

        @Bean
        public SecurityWebFilterChain securityFilterChain(ServerHttpSecurity http) {
            return http
                    .authorizeExchange()
                        .pathMatchers("/ws/**").authenticated()
                        .anyExchange().permitAll()
                    .and()
                    .httpBasic()
                    .and()
                    .csrf().disable()
                    .build();
        }

        @Bean
        public MapReactiveUserDetailsService userDetailsService() {
            // {noop} stores the password in plain text; suitable for demos only
            UserDetails user = User.withUsername("user")
                    .password("{noop}password")
                    .roles("USER")
                    .build();
            return new MapReactiveUserDetailsService(user);
        }
    }

Performance Monitoring and Best Practices

Monitoring Endpoint Configuration

    management:
      endpoints:
        web:
          exposure:
            include: health, metrics, prometheus
      metrics:
        tags:
          application: ${spring.application.name}

Monitoring Reactive Applications

    @Bean
    public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
        return registry -> registry.config()
                .commonTags("application", "reactive-demo");
    }

    // Custom metric
    @Bean
    public WebFilter metricsWebFilter(MeterRegistry registry) {
        return (exchange, chain) -> {
            // Raw paths can produce many distinct tag values; prefer a route template where available
            String path = exchange.getRequest().getPath().toString();
            Timer.Sample sample = Timer.start(registry);
            return chain.filter(exchange)
                    // doFinally runs on completion, error, and cancellation
                    // (doOnSuccessOrError is no longer available in recent Reactor releases)
                    .doFinally(signal -> sample.stop(registry.timer("http.requests",
                            "uri", path,
                            "status", String.valueOf(exchange.getResponse().getStatusCode()),
                            "method", String.valueOf(exchange.getRequest().getMethod()))));
        };
    }

Best Practices Summary

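  1. Understand the threading model

    • By default, WebFlux runs on Netty event-loop threads
    • Blocking operations must be moved off the event loop: wrap the blocking call and apply subscribeOn(Schedulers.boundedElastic()), or use publishOn to switch the downstream steps to an elastic thread pool
  2. Choose a backpressure strategy

    • UI clients: use onBackpressureDrop or onBackpressureLatest
    • Service-to-service communication: use onBackpressureBuffer with a sensibly sized buffer
  3. Error-handling principles

    • Handle errors as early as possible
    • Add error handling to every Flux/Mono chain
    • Distinguish business exceptions from system exceptions
  4. Testing strategy

    • Use StepVerifier to test reactive streams
    • Use WebTestClient to test controllers
    • Use virtual time to test long-running operations
  5. Performance tuning

    • Size connection pools appropriately
    • Monitor key metrics (latency, throughput, resource utilization)
    • Keep logging non-blocking (for example, Logback's AsyncAppender)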
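The threading advice in item 1 can be illustrated with a plain-JDK analogy: blocking work is handed to a dedicated pool so that the calling thread (the event loop, in WebFlux) is never parked. This sketch uses `CompletableFuture` and a fixed thread pool rather than Reactor's schedulers; `BlockingOffload`, `blockingLookup`, and the pool name are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingOffload {

    // Dedicated pool for blocking work, standing in for an elastic scheduler.
    private static final ExecutorService BLOCKING_POOL = Executors.newFixedThreadPool(4, runnable -> {
        Thread thread = new Thread(runnable, "blocking-pool");
        thread.setDaemon(true); // do not keep the JVM alive for idle workers
        return thread;
    });

    private BlockingOffload() {}

    /** Simulates a blocking call (such as JDBC or file I/O) and reports the thread it ran on. */
    static String blockingLookup(String id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id + "@" + Thread.currentThread().getName();
    }

    /** Runs the blocking call on the dedicated pool and returns immediately with a future. */
    static CompletableFuture<String> lookupAsync(String id) {
        return CompletableFuture.supplyAsync(() -> blockingLookup(id), BLOCKING_POOL);
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = lookupAsync("42");
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("lookup ran on: " + result.join());
    }
}
```

In Reactor the same idea is usually written as `Mono.fromCallable(...)` combined with `subscribeOn(Schedulers.boundedElastic())`, which keeps the blocking call off the event-loop threads in the same way.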

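With the practices above, you can build high-performance, scalable reactive web applications that take full advantage of WebFlux's non-blocking model to handle high-concurrency workloads.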