Spring WebFlux 深度实践指南
文章目录
-
-
- 4.3.1 构建 Reactive REST API
-
- 基础项目搭建
- 响应式控制器
- 高级特性
- 4.3.2 响应式数据库访问(R2DBC)
-
- R2DBC 配置
- 响应式Repository
- 复杂查询与事务
- 性能优化
- 4.3.3 WebSocket 实时通信
-
- 基础WebSocket配置
- 响应式WebSocket处理
- 高级特性
- 安全配置
- 性能监控与最佳实践
-
- 监控端点配置
- 响应式应用监控
- 最佳实践总结
-
Spring WebFlux 是 Spring Framework 5 引入的响应式 Web 框架,基于 Project Reactor 实现,支持非阻塞、函数式编程模型。本节将深入探讨 WebFlux 的核心功能,包括 REST API 构建、响应式数据库访问和实时通信。
4.3.1 构建 Reactive REST API
基础项目搭建
首先创建 Spring Boot WebFlux 项目(基于 Spring Initializr):
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId></dependency>
响应式控制器
注解式控制器(与传统Spring MVC类似但支持响应式类型):
@RestController@RequestMapping(\"/products\")public class ProductController { private final ProductService productService; // 构造函数注入 public ProductController(ProductService productService) { this.productService = productService; } @GetMapping public Flux<Product> getAllProducts() { return productService.findAll(); } @GetMapping(\"/{id}\") public Mono<Product> getProductById(@PathVariable String id) { return productService.findById(id); } @PostMapping @ResponseStatus(HttpStatus.CREATED) public Mono<Product> createProduct(@RequestBody Mono<Product> productMono) { return productService.save(productMono); } @PutMapping(\"/{id}\") public Mono<Product> updateProduct( @PathVariable String id, @RequestBody Mono<Product> productMono) { return productService.update(id, productMono); } @DeleteMapping(\"/{id}\") @ResponseStatus(HttpStatus.NO_CONTENT) public Mono<Void> deleteProduct(@PathVariable String id) { return productService.delete(id); }}
函数式端点(RouterFunction方式):
@Configurationpublic class ProductRouter { @Bean public RouterFunction<ServerResponse> route(ProductHandler handler) { return RouterFunctions.route() .GET(\"/fn/products\", handler::getAll) .GET(\"/fn/products/{id}\", handler::getById) .POST(\"/fn/products\", handler::create) .PUT(\"/fn/products/{id}\", handler::update) .DELETE(\"/fn/products/{id}\", handler::delete) .build(); }}@Componentpublic class ProductHandler { private final ProductService productService; public ProductHandler(ProductService productService) { this.productService = productService; } public Mono<ServerResponse> getAll(ServerRequest request) { return ServerResponse.ok() .contentType(MediaType.APPLICATION_NDJSON) .body(productService.findAll(), Product.class); } public Mono<ServerResponse> getById(ServerRequest request) { String id = request.pathVariable(\"id\"); return productService.findById(id) .flatMap(product -> ServerResponse.ok().bodyValue(product)) .switchIfEmpty(ServerResponse.notFound().build()); } public Mono<ServerResponse> create(ServerRequest request) { return request.bodyToMono(Product.class) .flatMap(productService::save) .flatMap(product -> ServerResponse .created(URI.create(\"/fn/products/\" + product.getId())) .bodyValue(product)); } public Mono<ServerResponse> update(ServerRequest request) { String id = request.pathVariable(\"id\"); return request.bodyToMono(Product.class) .flatMap(product -> productService.update(id, Mono.just(product))) .flatMap(product -> ServerResponse.ok().bodyValue(product)) .switchIfEmpty(ServerResponse.notFound().build()); } public Mono<ServerResponse> delete(ServerRequest request) { String id = request.pathVariable(\"id\"); return productService.delete(id) .then(ServerResponse.noContent().build()); }}
高级特性
流式响应(Server-Sent Events):
@GetMapping(value = \"/stream\", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<ProductEvent> streamProducts() { return Flux.interval(Duration.ofSeconds(1)) .map(sequence -> new ProductEvent( \"product-\" + sequence, \"Event at \" + Instant.now() ));}
请求验证与异常处理:
@ControllerAdvicepublic class GlobalErrorHandler extends AbstractErrorWebExceptionHandler { public GlobalErrorHandler(ErrorAttributes errorAttributes, WebProperties.Resources resources, ApplicationContext applicationContext, ServerCodecConfigurer serverCodecConfigurer) { super(errorAttributes, resources, applicationContext); this.setMessageWriters(serverCodecConfigurer.getWriters()); } @Override protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) { return RouterFunctions.route( RequestPredicates.all(), request -> { Map<String, Object> errorProperties = getErrorAttributes(request, ErrorAttributeOptions.defaults()); HttpStatus status = getHttpStatus(errorProperties); return ServerResponse.status(status) .contentType(MediaType.APPLICATION_JSON) .bodyValue(errorProperties); } ); } private HttpStatus getHttpStatus(Map<String, Object> errorProperties) { return HttpStatus.valueOf((Integer)errorProperties.get(\"status\")); }}// 自定义验证public class ProductValidator { public static Mono<Product> validate(Product product) { return Mono.just(product) .flatMap(p -> { List<String> errors = new ArrayList<>(); if (p.getName() == null || p.getName().isEmpty()) { errors.add(\"Product name is required\"); } if (p.getPrice() <= 0) { errors.add(\"Price must be positive\"); } if (!errors.isEmpty()) { return Mono.error(new ValidationException(errors)); } return Mono.just(p); }); }}
4.3.2 响应式数据库访问(R2DBC)
R2DBC 配置
添加依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-r2dbc</artifactId></dependency><dependency> <groupId>io.r2dbc</groupId> <artifactId>r2dbc-postgresql</artifactId> <scope>runtime</scope></dependency>
配置 application.yml:
spring: r2dbc: url: r2dbc:postgresql://localhost:5432/mydb username: user password: pass pool: enabled: true max-size: 20
响应式Repository
定义实体:
@Data@Table(\"products\")public class Product { @Id private Long id; private String name; private String description; private BigDecimal price; private Instant createdAt;}
创建Repository接口:
public interface ProductRepository extends ReactiveCrudRepository<Product, Long> { Flux<Product> findByNameContaining(String name); @Query(\"SELECT * FROM products WHERE price > :minPrice\") Flux<Product> findByPriceGreaterThan(BigDecimal minPrice); @Modifying @Query(\"UPDATE products SET price = price * :factor\") Mono<Integer> updateAllPrices(BigDecimal factor);}
复杂查询与事务
自定义查询实现:
public class ProductRepositoryImpl implements CustomProductRepository { private final DatabaseClient databaseClient; public ProductRepositoryImpl(DatabaseClient databaseClient) { this.databaseClient = databaseClient; } @Override public Flux<Product> complexSearch(ProductCriteria criteria) { return databaseClient.sql(\"\"\" SELECT * FROM products WHERE name LIKE :name AND price BETWEEN :minPrice AND :maxPrice ORDER BY :sortField :sortDirection LIMIT :limit OFFSET :offset \"\"\") .bind(\"name\", \"%\" + criteria.getName() + \"%\") .bind(\"minPrice\", criteria.getMinPrice()) .bind(\"maxPrice\", criteria.getMaxPrice()) .bind(\"sortField\", criteria.getSortField()) .bind(\"sortDirection\", criteria.getSortDirection()) .bind(\"limit\", criteria.getPageSize()) .bind(\"offset\", (criteria.getPageNumber() - 1) * criteria.getPageSize()) .map((row, metadata) -> toProduct(row)) .all(); } private Product toProduct(Row row) { // 行到对象的转换逻辑 }}
事务管理:
@Service@RequiredArgsConstructorpublic class ProductService { private final ProductRepository productRepository; private final TransactionalOperator transactionalOperator; public Mono<Void> transferStock(String fromId, String toId, int quantity) { return transactionalOperator.execute(status -> productRepository.findById(fromId) .flatMap(fromProduct -> { if (fromProduct.getStock() < quantity) { return Mono.error(new InsufficientStockException()); } fromProduct.setStock(fromProduct.getStock() - quantity); return productRepository.save(fromProduct) .then(productRepository.findById(toId)) .flatMap(toProduct -> { toProduct.setStock(toProduct.getStock() + quantity); return productRepository.save(toProduct); }); }) ); }}
性能优化
- 连接池配置:
spring: r2dbc: pool: max-size: 20 initial-size: 5 max-idle-time: 30m
- 批处理操作:
public Mono<Integer> batchInsert(List<Product> products) { return databaseClient.inConnectionMany(connection -> { Batch batch = connection.createBatch(); products.forEach(product -> batch.add(\"INSERT INTO products(name, price) VALUES($1, $2)\") .bind(0, product.getName()) .bind(1, product.getPrice()) ); return Flux.from(batch.execute()) .reduce(0, (count, result) -> count + result.getRowsUpdated()); });}
4.3.3 WebSocket 实时通信
基础WebSocket配置
配置类:
@Configuration@EnableWebFluxpublic class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint(\"/ws\") .setHandshakeHandler(new DefaultHandshakeHandler()) .setAllowedOrigins(\"*\"); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker(\"/topic\"); registry.setApplicationDestinationPrefixes(\"/app\"); }}
响应式WebSocket处理
股票行情推送示例:
@Controllerpublic class StockTickerController { private final Flux<StockQuote> stockQuoteFlux; public StockTickerController(StockQuoteGenerator quoteGenerator) { this.stockQuoteFlux = Flux.interval(Duration.ofMillis(500)) .map(sequence -> quoteGenerator.generate()) .share(); // 热发布,多个订阅者共享数据 } @MessageMapping(\"stocks.subscribe\") @SendTo(\"/topic/stocks\") public Flux<StockQuote> subscribe() { return stockQuoteFlux; } @MessageMapping(\"stocks.filter\") public Flux<StockQuote> filter(@Payload String symbol) { return stockQuoteFlux.filter(quote -> quote.getSymbol().equals(symbol)); }}
客户端连接示例:
const socket = new SockJS(\'/ws\');const stompClient = Stomp.over(socket);stompClient.connect({}, () => { stompClient.subscribe(\'/topic/stocks\', (message) => { const quote = JSON.parse(message.body); updateStockTable(quote); }); stompClient.send(\"/app/stocks.filter\", {}, \"AAPL\");});
高级特性
RSocket集成(更强大的响应式协议):
@Controller@MessageMapping(\"stock.service\")public class RSocketStockController { @MessageMapping(\"current\") public Mono<StockQuote> current(String symbol) { return stockService.getCurrent(symbol); } @MessageMapping(\"stream\") public Flux<StockQuote> stream(String symbol) { return stockService.getStream(symbol); } @MessageMapping(\"channel\") public Flux<StockQuote> channel(Flux<String> symbols) { return symbols.flatMap(stockService::getStream); }}
背压控制:
@MessageMapping(\"large.data.stream\")public Flux<DataChunk> largeDataStream() { return dataService.streamLargeData() .onBackpressureBuffer(50, // 缓冲区大小 chunk -> log.warn(\"Dropping chunk due to backpressure\"));}
安全配置
@Configuration@EnableWebFluxSecuritypublic class SecurityConfig { @Bean public SecurityWebFilterChain securityFilterChain(ServerHttpSecurity http) { return http .authorizeExchange() .pathMatchers(\"/ws/**\").authenticated() .anyExchange().permitAll() .and() .httpBasic() .and() .csrf().disable() .build(); } @Bean public MapReactiveUserDetailsService userDetailsService() { UserDetails user = User.withUsername(\"user\") .password(\"{noop}password\") .roles(\"USER\") .build(); return new MapReactiveUserDetailsService(user); }}
性能监控与最佳实践
监控端点配置
management: endpoints: web: exposure: include: health, metrics, prometheus metrics: tags: application: ${spring.application.name}
响应式应用监控
@Beanpublic MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() { return registry -> registry.config() .commonTags(\"application\", \"reactive-demo\");}// 自定义指标@Beanpublic WebFilter metricsWebFilter(MeterRegistry registry) { return (exchange, chain) -> { String path = exchange.getRequest().getPath().toString(); Timer.Sample sample = Timer.start(registry); return chain.filter(exchange) .doOnSuccessOrError((done, ex) -> { sample.stop(registry.timer(\"http.requests\", \"uri\", path, \"status\", exchange.getResponse().getStatusCode().toString(), \"method\", exchange.getRequest().getMethodValue())); }); };}
最佳实践总结
-
线程模型理解:
- WebFlux 默认使用 Netty 事件循环线程
- 阻塞操作必须使用
publishOn
切换到弹性线程池
-
背压策略选择:
- UI 客户端:使用
onBackpressureDrop
或onBackpressureLatest
- 服务间通信:使用
onBackpressureBuffer
配合合理缓冲区大小
- UI 客户端:使用
-
错误处理原则:
- 尽早处理错误
- 为每个 Flux/Mono 链添加错误处理
- 区分业务异常和系统异常
-
测试策略:
- 使用
StepVerifier
测试响应式流 - 使用
WebTestClient
测试控制器 - 虚拟时间测试长时间操作
- 使用
-
性能调优:
- 合理配置连接池
- 监控关键指标(延迟、吞吐量、资源使用率)
- 使用响应式日志框架(如 Logback 异步Appender)
通过以上全面实践,您将能够构建高性能、可扩展的响应式 Web 应用,充分利用 WebFlux 的非阻塞特性,处理高并发场景下的各种挑战。