Spring WebFlux 入门指南(初学者快速上手)
# 1. 简介
Spring WebFlux 是 Spring 在 5.x 引入的响应式(Reactive)Web 框架,用于构建非阻塞、异步的网络应用。它基于 Reactive Streams 规范,并与 Project Reactor(Mono、Flux)紧密集成。WebFlux 支持运行在 Netty 等非阻塞服务器上,也可以在 Servlet 容器中以非阻塞方式运行。适合高并发、I/O 密集型场景。
# 2. 核心概念与原理
- 响应式编程(Reactive Programming):通过声明式的数据流和异步处理,把同步阻塞变为异步非阻塞,常用接口是
Publisher、Subscriber(Reactive Streams)。 - Project Reactor:Spring 官方推荐的响应式库,提供两种主要类型:
Mono<T>:表示 0 或 1 个元素的异步序列。Flux<T>:表示 0..N 个元素的异步序列。
- 背压(Backpressure):下游可以控制上游生产数据的速率,避免消费者被淹没。
- 非阻塞 I/O:底层使用 NIO/Netty 实现,线程不会因为等待 I/O 而被阻塞,从而用更少的线程处理更多并发连接。
- 事件驱动 & 组合算子:通过
map、flatMap、filter、retry等操作符组合异步链条。
# 3. 编程模型(注解式 vs 函数式)
- 注解式(注解 + 控制器):类似于 Spring MVC 的风格,使用
@RestController、@GetMapping等注解,但方法返回Mono/Flux。 - 函数式(Router + Handler):使用
RouterFunction、HandlerFunction,更轻量、适合构建函数式 API 或做网关/代理场景。
# 4. 关键类与模块
Mono,Flux(Reactor 核心)WebClient:非阻塞 HTTP 客户端,替代RestTemplate(在响应式场景中)。WebTestClient:用于测试 WebFlux 应用。RouterFunction,HandlerFunction:函数式路由处理。@RestController+ 注解映射:熟悉 MVC 的人更易上手。- 数据访问:配合 R2DBC(响应式关系型驱动)或 Reactive MongoDB(reactive repository)。
# 5. 实战:常见后端能力的 WebFlux 实践示例
本节聚焦真实业务中最常用的三类基础设施:数据库、缓存、消息队列,全部以 WebFlux + 响应式方式示例,便于直接在项目中落地。
# 5.1 连接数据库(响应式数据访问)
# 5.1.1 为什么不用 JDBC
- JDBC 是阻塞式的,在 WebFlux 中直接使用会阻塞 Netty 事件循环线程
- 正确姿势:
- 优先选择响应式驱动(如 R2DBC、Reactive MongoDB)
- 或将 JDBC 放入
boundedElastic线程池(仅作为过渡方案)
# 5.1.2 R2DBC + WebFlux 示例(关系型数据库)
实体定义
@Table("user")
public class User {
@Id
private Long id;
private String name;
private Integer age;
}
2
3
4
5
6
7
Repository(响应式)
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByAgeGreaterThan(int age);
}
2
3
Service 层
@Service
public class UserService {
private final UserRepository userRepository;
public UserService(UserRepository userRepository) {
this.userRepository = userRepository;
}
public Mono<User> getUser(Long id) {
return userRepository.findById(id);
}
public Flux<User> listAdults() {
return userRepository.findByAgeGreaterThan(18);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Controller
@RestController
@RequestMapping("/users")
public class UserController {
private final UserService userService;
public UserController(UserService userService) {
this.userService = userService;
}
@GetMapping("/{id}")
public Mono<User> get(@PathVariable Long id) {
return userService.getUser(id);
}
@GetMapping
public Flux<User> list() {
return userService.listAdults();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
⚠️ 注意:事务需要使用
R2dbcTransactionManager,不能再用传统@Transactional(JDBC 版本)。
# 5.2 连接缓存(Redis 响应式)
# 5.2.1 使用场景
- 热点数据缓存
- 接口防刷 / 限流
- 会话、Token、临时状态存储
# 5.2.2 Reactive Redis 示例
@Service
public class UserCacheService {
private final ReactiveStringRedisTemplate redisTemplate;
public UserCacheService(ReactiveStringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public Mono<String> getUserName(Long userId) {
return redisTemplate.opsForValue()
.get("user:name:" + userId);
}
public Mono<Boolean> cacheUserName(Long userId, String name) {
return redisTemplate.opsForValue()
.set("user:name:" + userId, name, Duration.ofMinutes(10));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
缓存 + 数据库组合示例(Cache Aside)
public Mono<User> getUserWithCache(Long id) {
String key = "user:" + id;
return redisTemplate.opsForValue().get(key)
.flatMap(json -> Mono.just(deserialize(json)))
.switchIfEmpty(
userRepository.findById(id)
.flatMap(user -> redisTemplate.opsForValue()
.set(key, serialize(user), Duration.ofMinutes(5))
.thenReturn(user))
);
}
2
3
4
5
6
7
8
9
10
11
# 5.3 连接消息队列(事件驱动)
# 5.3.1 典型应用场景
- 异步解耦(下单 → 通知 → 积分)
- 削峰填谷
- 日志、审计、行为采集
# 5.3.2 Kafka(响应式 Producer / Consumer)
发送消息(Producer)
@Service
public class EventProducer {
private final KafkaSender<String, String> sender;
public EventProducer(KafkaSender<String, String> sender) {
this.sender = sender;
}
public Mono<Void> send(String topic, String msg) {
return sender.send(
Mono.just(SenderRecord.create(topic, null, null, msg, null))
)
.then();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
消费消息(Consumer)
@Component
public class EventConsumer {
@PostConstruct
public void consume() {
KafkaReceiver.create(receiverOptions)
.receive()
.flatMap(record -> {
// 业务处理
return Mono.fromRunnable(() -> handle(record.value()))
.then(record.receiverOffset().commit());
})
.subscribe();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
对于 RabbitMQ,可使用
spring-rabbit-stream或 Reactor RabbitMQ,模型类似。
# 5.4 混用阻塞资源的正确姿势(兜底方案)
如果必须调用阻塞接口(如老 JDBC / SDK):
Mono.fromCallable(() -> legacyService.query())
.subscribeOn(Schedulers.boundedElastic());
2
⚠️ 这是妥协方案,不是推荐方案,应逐步迁移为响应式组件。
# 6. WebFlux 架构选型指南
# 6.1 什么场景值得用 WebFlux
✅ 非常适合的场景:
I/O 密集型系统
- 网关 / BFF / API 聚合层
- 一个请求需要调用多个下游 HTTP / RPC / DB / MQ
高并发 + 长连接
- SSE(Server-Sent Events)
- WebSocket / 实时推送
外部依赖多、慢且不可控
- 第三方 API
- 跨网络调用(云 API、跨地域)
天然事件驱动模型
- 消息消费、流式处理
- 异步任务编排
👉 核心判断标准:
线程是否经常“等 I/O”? 如果是,WebFlux 值得考虑。
# 6.2 什么场景千万别用 WebFlux(面试必问)
❌ 不推荐 / 慎用场景:
CPU 密集型计算
- 大量复杂计算、图像处理
- WebFlux 无法让 CPU 算得更快
强依赖阻塞生态
- 只能使用 JDBC / 老 SDK
- 无法改造为响应式
小并发、CRUD 系统
- 管理后台
- 内部系统(QPS 很低)
团队响应式经验不足
- 调试困难
- 心智负担明显高于 MVC
# 7. 完整业务链路示例(强实践)
一个接口串联 Controller → Cache → DB → MQ → 下游 HTTP
# 7.1 业务场景说明
- 查询用户信息
- 先查 Redis 缓存
- 缓存未命中查 DB
- 写入访问日志到 MQ
- 并行调用下游风控服务
# 7.2 Service 编排示例
public Mono<UserDTO> getUserDetail(Long userId) {
String cacheKey = "user:" + userId;
Mono<User> userMono = redisTemplate.opsForValue()
.get(cacheKey)
.map(this::deserialize)
.switchIfEmpty(
userRepository.findById(userId)
.flatMap(user -> redisTemplate.opsForValue()
.set(cacheKey, serialize(user), Duration.ofMinutes(5))
.thenReturn(user))
);
Mono<Void> logMono = eventProducer.send("user-log", "query:" + userId);
Mono<RiskResult> riskMono = webClient.get()
.uri("/risk/{id}", userId)
.retrieve()
.bodyToMono(RiskResult.class)
.timeout(Duration.ofSeconds(2))
.onErrorReturn(RiskResult.DEFAULT);
return Mono.zip(userMono, riskMono)
.flatMap(tuple -> logMono.thenReturn(
new UserDTO(tuple.getT1(), tuple.getT2())
));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 7.3 Controller
@GetMapping("/{id}/detail")
public Mono<UserDTO> detail(@PathVariable Long id) {
return userService.getUserDetail(id);
}
2
3
4
👉 加分点:
- 缓存 / DB / HTTP 并行
- MQ 解耦非核心流程
- 超时 + 降级清晰
# 8. FAQ
# Q1:map 和 flatMap 有什么区别?
标准回答:
map:同步转换,一个元素变一个元素flatMap:异步展开,用于返回Mono / Flux
mono.map(x -> x + 1);
mono.flatMap(x -> service.call(x));
2
口诀:返回值是不是 Mono/Flux?是就用 flatMap
# Q2:subscribeOn vs publishOn
| 对比 | subscribeOn | publishOn |
|---|---|---|
| 作用范围 | 整个链路 | 之后的操作 |
| 使用场景 | 切换数据源线程 | 中途切线程 |
Mono.fromCallable(this::blockCall)
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel());
2
3
# Q3:为什么 WebFlux 不等于高性能?
标准回答(非常重要):
WebFlux 提升的是 并发能力和资源利用率,而不是单请求性能。
- 单次请求耗时 ≈ MVC
- 高并发下 WebFlux 用更少线程
- 如果没有 I/O 阻塞,WebFlux 甚至可能更慢
# 9. 最佳工程实践
- 避免阻塞调用:不要在响应式链中调用阻塞 API(JDBC、文件 I/O 等)。如果必须调用,使用
Schedulers.boundedElastic()并把阻塞调用放在publishOn/subscribeOn上。尽量使用 R2DBC、Reactive MongoDB 等响应式驱动。 - 限制并发与连接数:给 WebClient / Netty 配置合适的连接池、超时、重试策略,避免资源耗尽。
- 合理使用
flatMapvsconcatMap:并发合并时用flatMap(并行),需要顺序时用concatMap。 - 处理错误:在链路的合适位置用
onErrorResume、onErrorMap做降级与埋点。 - 测试:使用
WebTestClient+StepVerifier测试响应式流行为。 - 指标与监控:结合 Micrometer、Prometheus、日志链路追踪(如 Sleuth 或 OpenTelemetry)观察延迟与背压情况。
- 上下文传递:使用 Reactor 的
Context传递元信息(用户、traceId),注意不要滥用影响性能。 - 线程亲和性:不要在 Reactor 的事件循环线程上执行耗时或阻塞工作。
# 7. 常见问题与排查
症状:响应卡住 / 线程耗尽
- 排查点:是否有阻塞调用(例如 JDBC)出现在响应式链?是否错误地在事件循环线程上执行阻塞逻辑?检查堆栈、监控线程池使用。
症状:吞吐下降 / 高延迟
- 排查点:背压未正确使用、外部服务慢、连接数不足、Netty 事件循环被阻塞。
症状:和 Spring MVC 混用出现问题
- 排查点:同一应用混合阻塞与非阻塞处理会带来复杂性,注意 servlet 容器下行为和 Netty 下行为的差异。
常见 NPE / 数据流为空
- 检查
Mono.empty()、switchIfEmpty()用法,以及bodyToMono/bodyToFlux的类型匹配。
- 检查
# 8. 常用方法与操作符速查表
- 创建:
Mono.just(),Mono.empty(),Mono.fromCallable(),Flux.just(),Flux.fromIterable(),Flux.range() - 转换:
map(),flatMap(),flatMapSequential(),concatMap() - 过滤/组合:
filter(),zipWith(),merge(),concat() - 错误处理:
onErrorResume(),onErrorReturn(),retryWhen() - 调度:
subscribeOn(),publishOn() - 延时/节流:
delayElements(),throttleFirst()(自定义操作) - 流终结:
collectList(),block()(仅在测试或特殊场景下谨慎使用)
# 9. 注意事项 & 性能陷阱
- 不要在生产代码中使用
block(),除非在启动逻辑或测试场景;block()会阻塞线程,破坏非阻塞模型。 - R2DBC vs JDBC:JDBC 是阻塞的,若在 WebFlux 中使用 JDBC,必须放到专用线程池。优先选用 R2DBC 等响应式驱动。
- 耗时计算要迁移到合适线程池:CPU 密集型或阻塞工作不要放在 Netty 事件循环上。
- 合理设置超时和限流:防止单个慢请求耗尽资源。