Project Reactor使用手册
简介
Project Reactor 是一个由 Pivotal(现为 VMware)开发的 响应式编程框架,它基于 Reactive Streams 规范,专为 JVM(Java虚拟机)平台上的 异步、非阻塞应用程序而设计,特别适合处理高吞吐、低延迟的场景,比如微服务、Web 应用、流处理等。
Project Reactor又叫Spring Reactor,因为它是 Spring 团队主导开发的响应式流框架。
它属于 Spring 生态,但它本身 不依赖 Spring,可以在纯java项目中单独引入。
Project Reactor与JUC.Flow的关系
- Project Reactor和JUC.Flow都实现了Reactive Streams 规范。拥有相同的接口,只是包名不一样。
- Reactive Streams:org.reactivestreams.*
- JDK 9:java.util.concurrent.Flow.*
- Project Reactor支持JDK8,不依赖于JUC.Flow,是单独实现的接口。JUC.Flow是JDK9引入的,java官方内置的Reactive Streams的实现。
- Spring WebFlux使用的是Project Reactor,所以Spring WebFlux支持JDK8。
- JDK 提供了Project Reactor和JUC.Flow的互转适配器,但不推荐在项目中同时使用两套接口,避免混淆。
执行逻辑
- Flux定义数据构建与流处理逻辑。
flux.subscribe(consumer)- 定义消费数据的逻辑,调用时触发Flux数据构建与流处理逻辑。
- 数据准备好后触发消费数据逻辑。
两个核心类型
Project Reactor 提供了两个响应式数据类型:
Mono<T>表示0个或1个元素的异步数据序列(可理解为异步版本的Optional<T>)
示例:
Mono<String> mono = Mono.just("Hello Reactor");Flux<T>表示0个或多个元素的异步数据流(可理解为异步版本的Stream<T>)
示例:
Flux<Integer> flux = Flux.range(1, 5); // 1 到 5常用流API
- just:将元素构建成流
- range:按数字范围构建流
- create:异步生成流
- generate:同步生成流
- fromIterable、fromArray、fromStream:从迭代器、数组、流中构建flux流
- concat、concatWith、concatMap:按顺序订阅流。当上一个流完成并且调用了onComplete方法后,才会开始订阅下一个流。
- mergeSequential:已极快的速度按顺序订阅流
- merge、mergewith:按元素时间顺序合并多个流
- zip、zipWith:从多个流中遍历取出单个元素组合成数组流。
- switchIfEmpty:如果流为空转为自定义流
- defaultIfEmpty:如果流为空则为流添加一个默认元素
- handle:自定义处理流,对元素进行操作。
- transform:自定义流转换逻辑,对整个流处理。只会执行一次转换,所有订阅共享同一个转换器实例。
- transformDeferred:自定义流转换逻辑,对整个流处理。所有订阅独享自己的转换实例,即每一次订阅都会重新转换。
- log:打印流处理日志
- blockLast:阻塞调用线程直到最后一个元素完成
- delayElements:指定元素发射的时间间隔
- buffer:定义订阅者每次拿到的元素数量,数组的形式。
- cache:缓存指定数量的数据,后面的订阅者可以直接获取到缓存对象。如果使用了延迟发射,后面的订阅者可以继续获取后继对象。
- contextWrite:数据共享,仅对上游流处理函数可见。使用支持context对象的方法时才能获取到数据。
两种订阅方式
- 消费者订阅。
java
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 10);
flux.subscribe(System.out::println, e -> e.printStackTrace(), () -> {
System.out.println("完成");
});
}- 订阅者订阅,继承BaseSubscriber,或实现Subscriber接口。
java
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 10);
flux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("订阅了");
super.hookOnSubscribe(subscription);
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("结束了");
super.hookFinally(type);
}
@Override
protected void hookOnCancel() {
System.out.println("取消了");
super.hookOnCancel();
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("抛异常了");
super.hookOnError(throwable);
}
@Override
protected void hookOnComplete() {
System.out.println("完成了");
super.hookOnComplete();
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("获取下一个");
super.hookOnNext(value);
}
});事件回调API
- doOnSubscribe:流被订阅时的回调函数
java
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 10)
.doOnSubscribe(s -> {
System.out.println("流被订阅1" + s);
}).doOnSubscribe(s -> {
System.out.println("流被订阅2" + s);
}).doOnSubscribe(s -> {
System.out.println("流被订阅3" + s);
});
flux.subscribe(System.out::println);
}- doOnNext:每个元素到达时触发
- doOnEach:每个信号到达时的回调函数
- doOnRequest:流被请求时的回调方法
- doOnComplete:流正常结束时的回调方法
- doOnTerminate:流被终止时的回调方法,先执行doOnTerminate后执行doOnComplete/doOnError
- doOnCancel:流被订阅者取消订阅时的回调方法
- doOnError:流发生异常时的回调方法,传入一个异常。
- doOnDisard:流中元素被丢弃时调用,通常在filter过滤时触发
自定义线程调度
在 Project Reactor中,默认所有的操作(包括生成、操作符处理、发布、订阅)都在调用 subscribe()的线程上执行,也就是同步、阻塞的方式,除非你显式地使用调度器(Scheduler)来切换线程。
可以通过publisherOn和subscribeOn方法改变调度线程,方法接受调度器。
publisherOn:影响流处理操作符执行的线程,作用于从当前位置开始往后的操作符,每次调用都会切换线程。用于某段流需切换线程处理时,比如从IO切换到计算线程时。subscribeOn:影响数据生成所在的线程,作用于整个流的最上游(数据源),只有第一次调用有效。用于IO 密集型的数据源,比如文件/网络读取。
Reactor提供了Schedulers类用于创建调度器。
| 调度器 | 适合场景 | 特点 |
|---|---|---|
Schedulers.parallel() | CPU 密集任务 | 固定线程池,线程数=CPU核心数 |
Schedulers.boundedElastic() | IO 阻塞型任务(如文件、数据库) | 自动增长线程池,线程总数为10xCPU核心数,最大100(默认),队列默认十万 |
Schedulers.single() | 单线程顺序处理 | 所有任务都在同一个线程上 |
Schedulers.immediate() | 当前线程(不切换) | 同步执行 |
Schedulers.fromExecutor(...) | 自定义线程池 | 手动控制 |
异常处理
当流在处理过程中出现异常时,可以选择将异常抛出到订阅者,也可以选择在流中拦截处理。
onErrorReturn:当流处理过程中发生异常,可以捕获异常,并返回一个值。默认捕获所有异常,也可以捕获指定的异常。onErrorResume:当流处理过程中发生异常,可以捕获异常,并返回一个流。默认捕获所有异常,也可以捕获指定的异常。onErrorMap:当流处理过程中发生异常,可以捕获异常,将异常转换为另一个异常并返回。doFinnal:流正常还是异常都能执行的方法,传入一个流信号。onErrorContinue:当流发生异常让流继续执行。onErrorComplete:当流发生异常时将异常信号转换为完成信号,并且结束流onErrorStop:当流发生异常时,终止订阅,返回onError异常结束信号。retry:当上游发生错误时,自动重新订阅并重试。参数设置重试次数,不设置则无限重试。
Sinks
Sinks 是 Reactor 提供的“事件推送器”,让你可以主动地将数据发送给下游流,非常适合事件驱动、异步桥接、回调转流等应用。
| 类型 | 方法组合 | 特点 | 适用场景 |
|---|---|---|---|
| 单值 | Sinks.one() | 只能发一个值 | 回调结果、单任务 |
| 空信号 | Sinks.empty() | 只发完成/错误 | 完成通知 |
| 多值-单订阅 | Sinks.many().unicast() | 只允许一个订阅者,有缓冲 | 单线程数据流 |
| 多值-广播 | Sinks.many().multicast() | 多个订阅者,只发给已订阅者 | 实时事件广播 |
| 多值-缓存 | Sinks.many().replay().all() | 多个订阅者,全部接收历史 | 状态同步 |
| 多值-最新 | Sinks.many().replay().latest() | 多个订阅者,最新值 | 监听状态变化 |
| 多值-限制数量 | Sinks.many().replay(n) | 多个订阅者,保留N条 | 小缓存场景 |
| 多值-时间缓存 | Sinks.many().replay(Duration) | 多个订阅者,保留最近N秒 | 延时通知系统 |
示例
以下示例,在订阅者订阅后,使用Sinks主动发送数据。最终结果是,在发送数据之前订阅的订阅者一接收到数据。在发送数据之后订阅的订阅者二没有接收到数据。
java
public static void main(String[] args) {
Sinks.Many<String> many = Sinks.many().multicast().onBackpressureBuffer();
many.asFlux().subscribe(v -> {
System.out.println("订阅者一:" + v);
});
for (int i = 0; i < 10; i++) {
many.tryEmitNext(String.valueOf(i));
}
many.asFlux().subscribe(v -> {
System.out.println("订阅者二:" + v);
});
}