A look at Mono and Flux in Reactive Streams
Date: 2022-06-11
Preface
This article looks at Mono and Flux, the two abstract classes in Reactor that implement the Reactive Streams Publisher interface.
Publisher
reactive-streams-1.0.1-sources.jar!/org/reactivestreams/Publisher.java
/**
* A {@link Publisher} is a provider of a potentially unbounded number of sequenced elements, publishing them according to
* the demand received from its {@link Subscriber}(s).
* <p>
* A {@link Publisher} can serve multiple {@link Subscriber}s subscribed {@link #subscribe(Subscriber)} dynamically
* at various points in time.
*
* @param <T> the type of element signaled.
*/
public interface Publisher<T> {
/**
* Request {@link Publisher} to start streaming data.
* <p>
* This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
* <p>
* Each {@link Subscription} will work for only a single {@link Subscriber}.
* <p>
* A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
* <p>
* If the {@link Publisher} rejects the subscription attempt or otherwise fails it will
* signal the error via {@link Subscriber#onError}.
*
* @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
*/
public void subscribe(Subscriber<? super T> s);
}
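The demand-driven contract described in the Javadoc above can be illustrated with a small sketch. To stay free of external dependencies it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams ones method for method; that substitution is an assumption of this example, not something the original source does. DemandSketch, range and consume are hypothetical names, and the publisher is single-threaded and deliberately simplified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandSketch {

    /** Hypothetical publisher: emits 1..count, but never more than the subscriber requested. */
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 1;       // next value to emit
            long demand = 0;    // items requested but not yet delivered
            boolean draining;   // guards against re-entrant request() calls from onNext
            boolean done;       // set on completion, error, or cancel

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be > 0"));
                    return;
                }
                demand += n;
                if (draining) return;   // the loop already running picks up the new demand
                draining = true;
                while (demand > 0 && !done) {
                    if (next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }
                draining = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes, requesting `batch` items at a time, and returns a log of the exchanged signals. */
    static List<String> consume(Flow.Publisher<Integer> source, int batch) {
        List<String> log = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription subscription;
            int received;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(" + batch + ")");
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                if (++received % batch == 0) {   // batch used up: ask for more
                    log.add("request(" + batch + ")");
                    subscription.request(batch);
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        consume(range(3), 2).forEach(System.out::println);
    }
}
```

With range(3) and a batch size of 2, the log reads request(2), onNext(1), onNext(2), request(2), onNext(3), onComplete: nothing is emitted until the subscriber asks for it.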
Mono
reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/Mono.java
public abstract class Mono<T> implements Publisher<T> {
//...
/**
* Expose the specified {@link Publisher} with the {@link Mono} API, and ensure it will emit 0 or 1 item.
* The source emitter will be cancelled on the first `onNext`.
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/from1.png" alt="">
* <p>
* @param source the {@link Publisher} source
* @param <T> the source type
*
* @return the next item emitted as a {@link Mono}
*/
public static <T> Mono<T> from(Publisher<? extends T> source) {
if (source instanceof Mono) {
@SuppressWarnings("unchecked")
Mono<T> casted = (Mono<T>) source;
return casted;
}
if (source instanceof Flux) {
@SuppressWarnings("unchecked")
Flux<T> casted = (Flux<T>) source;
return casted.next();
}
return onAssembly(new MonoFromPublisher<>(source));
}
/**
* Create a new {@link Mono} that emits the specified item, which is captured at
* instantiation time.
*
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/just.png" alt="">
* <p>
* @param data the only item to onNext
* @param <T> the type of the produced item
*
* @return a {@link Mono}.
*/
public static <T> Mono<T> just(T data) {
return onAssembly(new MonoJust<>(data));
}
//...
}
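A short sketch of how the two factory methods above behave from the caller's side, assuming reactor-core is on the classpath. MonoFromSketch and firstOf are hypothetical names used only for illustration.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class MonoFromSketch {

    /** With a Flux source, Mono.from keeps only the first item. */
    static String firstOf(Flux<String> flux) {
        return Mono.from(flux).block();
    }

    public static void main(String[] args) {
        Mono<String> mono = Mono.just("a");

        // The source is already a Mono, so from() hands back the same instance.
        System.out.println(Mono.from(mono) == mono);               // true

        // Flux source: only the first element survives.
        System.out.println(firstOf(Flux.just("a", "b", "c")));     // a

        // An empty source yields an empty Mono; block() then returns null.
        System.out.println(firstOf(Flux.empty()));                 // null
    }
}
```

block() is used here only to make the result visible in a plain main method; in reactive code the Mono would normally be composed further or subscribed to.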
Flux
reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/Flux.java
public abstract class Flux<T> implements Publisher<T> {
//......
/**
* Programmatically create a {@link Flux} with the capability of emitting multiple
* elements in a synchronous or asynchronous manner through the {@link FluxSink} API.
* <p>
* This Flux factory is useful if one wants to adapt some other multi-valued async API
* and not worry about cancellation and backpressure (which is handled by buffering
* all signals if the downstream can't keep up).
* <p>
* For example:
*
* <pre><code>
* Flux.<String>create(emitter -> {
*
* ActionListener al = e -> {
* emitter.next(textField.getText());
* };
* // without cleanup support:
*
* button.addActionListener(al);
*
* // with cleanup support:
*
* button.addActionListener(al);
* emitter.onDispose(() -> {
* button.removeListener(al);
* });
* }, FluxSink.OverflowStrategy.LATEST);
* </code></pre>
*
* @param <T> The type of values in the sequence
* @param backpressure the backpressure mode, see {@link OverflowStrategy} for the
* available backpressure modes
* @param emitter Consume the {@link FluxSink} provided per-subscriber by Reactor to generate signals.
* @return a {@link Flux}
*/
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL));
}
/**
* Decorate the specified {@link Publisher} with the {@link Flux} API.
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/from.png" alt="">
* <p>
* @param source the source to decorate
* @param <T> The type of values in both source and output sequences
*
* @return a new {@link Flux}
*/
public static <T> Flux<T> from(Publisher<? extends T> source) {
if (source instanceof Flux) {
@SuppressWarnings("unchecked")
Flux<T> casted = (Flux<T>) source;
return casted;
}
if (source instanceof Fuseable.ScalarCallable) {
try {
@SuppressWarnings("unchecked") T t =
((Fuseable.ScalarCallable<T>) source).call();
if (t != null) {
return just(t);
}
return empty();
}
catch (Exception e) {
return error(e);
}
}
return wrap(source);
}
/**
* Programmatically create a {@link Flux} by generating signals one-by-one via a
* consumer callback and some state, with a final cleanup callback. The
* {@code stateSupplier} may return {@literal null} but your cleanup {@code stateConsumer}
* will need to handle the null case.
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/generate.png" alt="">
* <p>
*
* @param <T> the value type emitted
* @param <S> the per-subscriber custom state type
* @param stateSupplier called for each incoming Subscriber to provide the initial state for the generator bifunction
* @param generator Consume the {@link SynchronousSink} provided per-subscriber by Reactor
* as well as the current state to generate a <strong>single</strong> signal on each pass
* and return a (new) state.
* @param stateConsumer called after the generator has terminated or the downstream cancelled, receiving the last
* state to be handled (i.e., release resources or do other cleanup).
*
* @return a {@link Flux}
*/
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer) {
return onAssembly(new FluxGenerate<>(stateSupplier, generator, stateConsumer));
}
}
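The difference between generate (one signal per callback, with explicit state) and create (a sink that can be pushed to many times) may be easier to see in a small sketch. It assumes reactor-core is on the classpath; FluxFactoriesSketch, squares and fromCallbacks are hypothetical names, and the list of events merely stands in for a real listener-based API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

class FluxFactoriesSketch {

    /** generate: the state (a counter) is threaded through the generator's return value. */
    static Flux<Integer> squares(int count, AtomicBoolean cleanedUp) {
        return Flux.generate(
                () -> 1,                        // stateSupplier: initial state for each subscriber
                (i, sink) -> {                  // generator: at most one signal per pass
                    if (i > count) {
                        sink.complete();
                    } else {
                        sink.next(i * i);
                    }
                    return i + 1;               // state for the next pass
                },
                last -> cleanedUp.set(true));   // stateConsumer: runs after termination or cancel
    }

    /** create: the sink may be called any number of times, here once per recorded event. */
    static Flux<String> fromCallbacks(List<String> events) {
        return Flux.create(sink -> {
            events.forEach(sink::next);         // stand-in for listener callbacks
            sink.complete();
        }, FluxSink.OverflowStrategy.BUFFER);
    }

    public static void main(String[] args) {
        AtomicBoolean cleanedUp = new AtomicBoolean();
        System.out.println(squares(4, cleanedUp).collectList().block());   // [1, 4, 9, 16]
        System.out.println(cleanedUp.get());                               // true
        System.out.println(fromCallbacks(Arrays.asList("click", "key")).collectList().block());
    }
}
```

generate suits sources that can be pulled step by step, while create is the usual choice for bridging callbacks that fire on their own schedule, which is why it takes an OverflowStrategy.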
Examples
Mono
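@Test
public void testMonoBasic() {
    // Three ways to build a Mono that emits "Hello"; each subscribe() prints it.
    Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);           // value computed lazily, per subscriber
    Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);     // an empty Optional would give an empty Mono
    Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println); // programmatic emission through MonoSink
}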
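A Mono emits at most one item. It corresponds to the Single and Maybe types in RxJava and is loosely analogous to Java's Optional. An asynchronous task that only needs to signal that it has finished can therefore be modeled as a Mono (typically Mono<Void>). Calling single() on a Flux returns a Mono, which signals an error if the source is empty or holds more than one item, while joining two Monos with concatWith produces a Flux.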
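The conversions between the two types can be tried out with a sketch like the following, assuming reactor-core is on the classpath. MonoFluxBridgeSketch and its methods are hypothetical names; block() is used only to surface results in a plain program.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class MonoFluxBridgeSketch {

    /** Mono + Mono via concatWith gives a Flux that emits both values in order. */
    static List<String> concatTwo() {
        Flux<String> both = Mono.just("Hello").concatWith(Mono.just("World"));
        return both.collectList().block();
    }

    /** single() narrows a Flux to a Mono; on failure this returns the error's class name instead. */
    static String singleOrFailure(Flux<String> flux) {
        Mono<String> only = flux.single();
        try {
            return only.block();
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    /** A completion-only task: a Mono<Void> never emits a value, it just completes. */
    static boolean completionOnly() {
        Mono<Void> done = Mono.fromRunnable(() -> { /* side effect would go here */ });
        return done.block() == null;
    }

    public static void main(String[] args) {
        System.out.println(concatTwo());                              // [Hello, World]
        System.out.println(singleOrFailure(Flux.just("only")));       // only
        System.out.println(singleOrFailure(Flux.empty()));            // NoSuchElementException
        System.out.println(singleOrFailure(Flux.just("a", "b")));     // IndexOutOfBoundsException
        System.out.println(completionOnly());                         // true
    }
}
```

When an empty source is acceptable, Flux also offers singleOrEmpty(), which completes without a value instead of signalling NoSuchElementException.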
Flux
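@Test
public void testBasic() {
    Flux.just("Hello", "World").subscribe(System.out::println);              // fixed set of items
    Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);  // items taken from an array
    Flux.empty().subscribe(System.out::println);                             // completes without emitting anything
    Flux.range(1, 10).subscribe(System.out::println);                        // 1 through 10
    // interval ticks on a background scheduler thread and the first tick is due after 10 seconds,
    // so this line prints nothing unless the test keeps running that long.
    Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
}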
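A Flux is the counterpart of an RxJava Observable (or, since it supports backpressure, more precisely of Flowable in RxJava 2): it emits 0 to N items and then, optionally, completes or errors. Use it to process multiple items as a stream.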
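The three kinds of signal (items, an error, completion) can be observed by subscribing with one callback for each. This is a minimal sketch assuming reactor-core is on the classpath; FluxSignalsSketch, signals and firstTicks are hypothetical names. It also shows one way to actually see values from Flux.interval, by limiting the sequence with take and blocking until it finishes.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class FluxSignalsSketch {

    /** Records every signal of a synchronous Flux: items, then either an error or completion. */
    static List<String> signals(Flux<Integer> flux) {
        List<String> log = new ArrayList<>();
        flux.subscribe(
                v -> log.add("next:" + v),
                e -> log.add("error:" + e.getMessage()),
                () -> log.add("complete"));
        return log;
    }

    /** interval runs on another thread, so block until the first n ticks have arrived. */
    static List<Long> firstTicks(int n) {
        return Flux.interval(Duration.ofMillis(10)).take(n).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(signals(Flux.range(1, 3)));                          // [next:1, next:2, next:3, complete]
        System.out.println(signals(Flux.empty()));                              // [complete]
        System.out.println(signals(Flux.range(1, 3).map(i -> 10 / (i - 2))));   // [next:-10, error:/ by zero]
        System.out.println(firstTicks(3));                                      // [0, 1, 2]
    }
}
```

Note that the error case ends the sequence: after the division by zero on the second item, no further items and no completion signal are delivered.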
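Summary
Mono and Flux are both abstract classes that implement the Publisher interface: one is roughly the counterpart of Optional, the other of a stream of 0..N items. Both are fundamental concepts for reactive programming in Spring 5.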
doc
- mono
- flux