Spring WebFlux 入门指南(初学者快速上手)

2026/1/6 笔记WebFlux

# 1. 简介

Spring WebFlux 是 Spring 在 5.x 引入的响应式(Reactive)Web 框架,用于构建非阻塞、异步的网络应用。它基于 Reactive Streams 规范,并与 Project Reactor(MonoFlux)紧密集成。WebFlux 支持运行在 Netty 等非阻塞服务器上,也可以在 Servlet 容器中以非阻塞方式运行。适合高并发、I/O 密集型场景。

# 2. 核心概念与原理

  • 响应式编程(Reactive Programming):通过声明式的数据流和异步处理,把同步阻塞变为异步非阻塞,常用接口是 PublisherSubscriber(Reactive Streams)。
  • Project Reactor:Spring 官方推荐的响应式库,提供两种主要类型:
    • Mono<T>:表示 0 或 1 个元素的异步序列。
    • Flux<T>:表示 0..N 个元素的异步序列。
  • 背压(Backpressure):下游可以控制上游生产数据的速率,避免消费者被淹没。
  • 非阻塞 I/O:底层使用 NIO/Netty 实现,线程不会因为等待 I/O 而被阻塞,从而用更少的线程处理更多并发连接。
  • 事件驱动 & 组合算子:通过 mapflatMapfilterretry 等操作符组合异步链条。

# 3. 编程模型(注解式 vs 函数式)

  • 注解式(注解 + 控制器):类似于 Spring MVC 的风格,使用 @RestController@GetMapping 等注解,但方法返回 Mono/Flux
  • 函数式(Router + Handler):使用 RouterFunctionHandlerFunction,更轻量、适合构建函数式 API 或做网关/代理场景。

# 4. 关键类与模块

  • Mono, Flux(Reactor 核心)
  • WebClient:非阻塞 HTTP 客户端,替代 RestTemplate(在响应式场景中)。
  • WebTestClient:用于测试 WebFlux 应用。
  • RouterFunction, HandlerFunction:函数式路由处理。
  • @RestController + 注解映射:熟悉 MVC 的人更易上手。
  • 数据访问:配合 R2DBC(响应式关系型驱动)或 Reactive MongoDB(reactive repository)。

# 5. 实战:常见后端能力的 WebFlux 实践示例

本节聚焦真实业务中最常用的三类基础设施:数据库、缓存、消息队列,全部以 WebFlux + 响应式方式示例,便于直接在项目中落地。

# 5.1 连接数据库(响应式数据访问)

# 5.1.1 为什么不用 JDBC

  • JDBC 是阻塞式的,在 WebFlux 中直接使用会阻塞 Netty 事件循环线程
  • 正确姿势:
    • 优先选择响应式驱动(如 R2DBC、Reactive MongoDB)
    • 或将 JDBC 放入 boundedElastic 线程池(仅作为过渡方案)

# 5.1.2 R2DBC + WebFlux 示例(关系型数据库)

实体定义

@Table("user")
public class User {
    @Id
    private Long id;
    private String name;
    private Integer age;
}
1
2
3
4
5
6
7

Repository(响应式)

public interface UserRepository extends ReactiveCrudRepository<User, Long> {
    Flux<User> findByAgeGreaterThan(int age);
}
1
2
3

Service 层

@Service
public class UserService {
    private final UserRepository userRepository;

    public UserService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public Mono<User> getUser(Long id) {
        return userRepository.findById(id);
    }

    public Flux<User> listAdults() {
        return userRepository.findByAgeGreaterThan(18);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

Controller

@RestController
@RequestMapping("/users")
public class UserController {

    private final UserService userService;

    public UserController(UserService userService) {
        this.userService = userService;
    }

    @GetMapping("/{id}")
    public Mono<User> get(@PathVariable Long id) {
        return userService.getUser(id);
    }

    @GetMapping
    public Flux<User> list() {
        return userService.listAdults();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

⚠️ 注意:事务需要使用 R2dbcTransactionManager,不能再用传统 @Transactional(JDBC 版本)。

# 5.2 连接缓存(Redis 响应式)

# 5.2.1 使用场景

  • 热点数据缓存
  • 接口防刷 / 限流
  • 会话、Token、临时状态存储

# 5.2.2 Reactive Redis 示例

@Service
public class UserCacheService {

    private final ReactiveStringRedisTemplate redisTemplate;

    public UserCacheService(ReactiveStringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public Mono<String> getUserName(Long userId) {
        return redisTemplate.opsForValue()
            .get("user:name:" + userId);
    }

    public Mono<Boolean> cacheUserName(Long userId, String name) {
        return redisTemplate.opsForValue()
            .set("user:name:" + userId, name, Duration.ofMinutes(10));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

缓存 + 数据库组合示例(Cache Aside)

public Mono<User> getUserWithCache(Long id) {
    String key = "user:" + id;
    return redisTemplate.opsForValue().get(key)
        .flatMap(json -> Mono.just(deserialize(json)))
        .switchIfEmpty(
            userRepository.findById(id)
                .flatMap(user -> redisTemplate.opsForValue()
                    .set(key, serialize(user), Duration.ofMinutes(5))
                    .thenReturn(user))
        );
}
1
2
3
4
5
6
7
8
9
10
11

# 5.3 连接消息队列(事件驱动)

# 5.3.1 典型应用场景

  • 异步解耦(下单 → 通知 → 积分)
  • 削峰填谷
  • 日志、审计、行为采集

# 5.3.2 Kafka(响应式 Producer / Consumer)

发送消息(Producer)

@Service
public class EventProducer {

    private final KafkaSender<String, String> sender;

    public EventProducer(KafkaSender<String, String> sender) {
        this.sender = sender;
    }

    public Mono<Void> send(String topic, String msg) {
        return sender.send(
                Mono.just(SenderRecord.create(topic, null, null, msg, null))
            )
            .then();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

消费消息(Consumer)

@Component
public class EventConsumer {

    @PostConstruct
    public void consume() {
        KafkaReceiver.create(receiverOptions)
            .receive()
            .flatMap(record -> {
                // 业务处理
                return Mono.fromRunnable(() -> handle(record.value()))
                           .then(record.receiverOffset().commit());
            })
            .subscribe();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

对于 RabbitMQ,可使用 spring-rabbit-stream 或 Reactor RabbitMQ,模型类似。

# 5.4 混用阻塞资源的正确姿势(兜底方案)

如果必须调用阻塞接口(如老 JDBC / SDK):

Mono.fromCallable(() -> legacyService.query())
    .subscribeOn(Schedulers.boundedElastic());
1
2

⚠️ 这是妥协方案,不是推荐方案,应逐步迁移为响应式组件。

# 6. WebFlux 架构选型指南

# 6.1 什么场景值得用 WebFlux

非常适合的场景

  1. I/O 密集型系统

    • 网关 / BFF / API 聚合层
    • 一个请求需要调用多个下游 HTTP / RPC / DB / MQ
  2. 高并发 + 长连接

    • SSE(Server-Sent Events)
    • WebSocket / 实时推送
  3. 外部依赖多、慢且不可控

    • 第三方 API
    • 跨网络调用(云 API、跨地域)
  4. 天然事件驱动模型

    • 消息消费、流式处理
    • 异步任务编排

👉 核心判断标准:

线程是否经常“等 I/O”? 如果是,WebFlux 值得考虑。

# 6.2 什么场景千万别用 WebFlux(面试必问)

不推荐 / 慎用场景

  1. CPU 密集型计算

    • 大量复杂计算、图像处理
    • WebFlux 无法让 CPU 算得更快
  2. 强依赖阻塞生态

    • 只能使用 JDBC / 老 SDK
    • 无法改造为响应式
  3. 小并发、CRUD 系统

    • 管理后台
    • 内部系统(QPS 很低)
  4. 团队响应式经验不足

    • 调试困难
    • 心智负担明显高于 MVC

# 7. 完整业务链路示例(强实践)

一个接口串联 Controller → Cache → DB → MQ → 下游 HTTP

# 7.1 业务场景说明

  • 查询用户信息
  • 先查 Redis 缓存
  • 缓存未命中查 DB
  • 写入访问日志到 MQ
  • 并行调用下游风控服务

# 7.2 Service 编排示例

public Mono<UserDTO> getUserDetail(Long userId) {
    String cacheKey = "user:" + userId;

    Mono<User> userMono = redisTemplate.opsForValue()
        .get(cacheKey)
        .map(this::deserialize)
        .switchIfEmpty(
            userRepository.findById(userId)
                .flatMap(user -> redisTemplate.opsForValue()
                    .set(cacheKey, serialize(user), Duration.ofMinutes(5))
                    .thenReturn(user))
        );

    Mono<Void> logMono = eventProducer.send("user-log", "query:" + userId);

    Mono<RiskResult> riskMono = webClient.get()
        .uri("/risk/{id}", userId)
        .retrieve()
        .bodyToMono(RiskResult.class)
        .timeout(Duration.ofSeconds(2))
        .onErrorReturn(RiskResult.DEFAULT);

    return Mono.zip(userMono, riskMono)
        .flatMap(tuple -> logMono.thenReturn(
            new UserDTO(tuple.getT1(), tuple.getT2())
        ));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 7.3 Controller

@GetMapping("/{id}/detail")
public Mono<UserDTO> detail(@PathVariable Long id) {
    return userService.getUserDetail(id);
}
1
2
3
4

👉 加分点

  • 缓存 / DB / HTTP 并行
  • MQ 解耦非核心流程
  • 超时 + 降级清晰

# 8. FAQ

# Q1:mapflatMap 有什么区别?

标准回答

  • map:同步转换,一个元素变一个元素
  • flatMap:异步展开,用于返回 Mono / Flux
mono.map(x -> x + 1);
mono.flatMap(x -> service.call(x));
1
2

口诀:返回值是不是 Mono/Flux?是就用 flatMap

# Q2:subscribeOn vs publishOn

对比 subscribeOn publishOn
作用范围 整个链路 之后的操作
使用场景 切换数据源线程 中途切线程
Mono.fromCallable(this::blockCall)
    .subscribeOn(Schedulers.boundedElastic())
    .publishOn(Schedulers.parallel());
1
2
3

# Q3:为什么 WebFlux 不等于高性能?

标准回答(非常重要)

WebFlux 提升的是 并发能力和资源利用率,而不是单请求性能。

  • 单次请求耗时 ≈ MVC
  • 高并发下 WebFlux 用更少线程
  • 如果没有 I/O 阻塞,WebFlux 甚至可能更慢

# 9. 最佳工程实践

  1. 避免阻塞调用:不要在响应式链中调用阻塞 API(JDBC、文件 I/O 等)。如果必须调用,使用 Schedulers.boundedElastic() 并把阻塞调用放在 publishOn/subscribeOn 上。尽量使用 R2DBC、Reactive MongoDB 等响应式驱动。
  2. 限制并发与连接数:给 WebClient / Netty 配置合适的连接池、超时、重试策略,避免资源耗尽。
  3. 合理使用 flatMap vs concatMap:并发合并时用 flatMap(并行),需要顺序时用 concatMap
  4. 处理错误:在链路的合适位置用 onErrorResumeonErrorMap 做降级与埋点。
  5. 测试:使用 WebTestClient + StepVerifier 测试响应式流行为。
  6. 指标与监控:结合 Micrometer、Prometheus、日志链路追踪(如 Sleuth 或 OpenTelemetry)观察延迟与背压情况。
  7. 上下文传递:使用 Reactor 的 Context 传递元信息(用户、traceId),注意不要滥用影响性能。
  8. 线程亲和性:不要在 Reactor 的事件循环线程上执行耗时或阻塞工作。

# 7. 常见问题与排查

  • 症状:响应卡住 / 线程耗尽

    • 排查点:是否有阻塞调用(例如 JDBC)出现在响应式链?是否错误地在事件循环线程上执行阻塞逻辑?检查堆栈、监控线程池使用。
  • 症状:吞吐下降 / 高延迟

    • 排查点:背压未正确使用、外部服务慢、连接数不足、Netty 事件循环被阻塞。
  • 症状:和 Spring MVC 混用出现问题

    • 排查点:同一应用混合阻塞与非阻塞处理会带来复杂性,注意 servlet 容器下行为和 Netty 下行为的差异。
  • 常见 NPE / 数据流为空

    • 检查 Mono.empty()switchIfEmpty() 用法,以及 bodyToMono/bodyToFlux 的类型匹配。

# 8. 常用方法与操作符速查表

  • 创建:Mono.just(), Mono.empty(), Mono.fromCallable(), Flux.just(), Flux.fromIterable(), Flux.range()
  • 转换:map(), flatMap(), flatMapSequential(), concatMap()
  • 过滤/组合:filter(), zipWith(), merge(), concat()
  • 错误处理:onErrorResume(), onErrorReturn(), retryWhen()
  • 调度:subscribeOn(), publishOn()
  • 延时/节流:delayElements(), throttleFirst()(自定义操作)
  • 流终结:collectList(), block()(仅在测试或特殊场景下谨慎使用)

# 9. 注意事项 & 性能陷阱

  • 不要在生产代码中使用 block(),除非在启动逻辑或测试场景;block() 会阻塞线程,破坏非阻塞模型。
  • R2DBC vs JDBC:JDBC 是阻塞的,若在 WebFlux 中使用 JDBC,必须放到专用线程池。优先选用 R2DBC 等响应式驱动。
  • 耗时计算要迁移到合适线程池:CPU 密集型或阻塞工作不要放在 Netty 事件循环上。
  • 合理设置超时和限流:防止单个慢请求耗尽资源。