JDK9特性-Reactive Stream 响应式流
初识Reactive Stream
Reactive Stream (响应式流/反应流) 是JDK9引入的一套标准,是一套基于发布/订阅模式的数据处理规范。响应式流从2013年开始,作为提供非阻塞背压的异步流处理标准的倡议。 它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。更确切地说,Reactive流目的是“找到最小的一组接口,方法和协议,用来描述必要的操作和实体以实现这样的目标:以非阻塞背压方式实现数据的异步流”。
“背压(反压)back pressure”概念很关键。首先异步消费者会向生产者订阅接收消息,然后当有新的信息可用时,消费者会通过之前订阅时提供的回调函数被再次激活调用。如果生产者发出的信息比消费者能够处理消息最大量还要多,消费者可能会被迫一直在抓消息,耗费越来越多的资源,埋下潜在的崩溃风险。为了防止这一点,需要有一种机制使消费者可以通知生产者,降低消息的生成速度。生产者可以采用多种策略来实现这一要求,这种机制称为背压。
响应式流模型非常简单——订阅者向发布者发送多个元素的异步请求,发布者向订阅者异步发送多个或稍少的元素。响应式流会在pull模型和push模型流处理机制之间动态切换。 当订阅者较慢时,它使用pull模型,当订阅者更快时使用push模型。
简单来说,在响应式流下订阅者可以与发布者沟通,如果使用JMS就应该知道,订阅者只能被动接收发布者所产生的消息数据。这就好比没有水龙头的水管一样,我只能被动接收水管里流过来的水,无法关闭也无法减少。而响应式流就相当于给水管加了个水龙头,在消费者这边可以控制水流的增加、减少及关闭。
响应式流模型图:
发布者(Publisher)是潜在的无限数量的有序元素的生产者。发布者可能有多个来自订阅者的待处理请求。
- 根据收到的要求向当前订阅者发布(或发送)元素。
订阅者(Subscriber)从发布者那里订阅并接收元素。订阅者可以请求更多的元素。
- 发布者向订阅者发送订阅令牌(Subscription)。
- 使用订阅令牌,订阅者从发布者那里请求多个元素。
- 当元素准备就绪时,发布者向订阅者发送多个或更少的元素。
Reactive Stream主要接口
JDK9 通过java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 类来实现响应式流。在JDK9里Reactive Stream的主要接口声明在Flow类里,Flow 类中定义了四个嵌套的静态接口,用于建立流量控制的组件,发布者在其中生成一个或多个供订阅者使用的数据项:
- Publisher:数据项发布者、生产者
- Subscriber:数据项订阅者、消费者
- Subscription:发布者与订阅者之间的关系纽带,订阅令牌
- Processor:数据处理器
Flow类结构如下:
Publisher
是能够发出元素的发布者,Subscriber
是接收元素并做出响应的订阅者。当执行Publisher
里的subscribe
方法时,发布者会回调订阅者的onSubscribe
方法,这个方法中,通常订阅者会借助传入的Subscription
向发布者请求n个数据。然后发布者通过不断调用订阅者的onNext
方法向订阅者发出最多n个数据。如果数据全部发完,则会调用onComplete
告知订阅者流已经发完;如果有错误发生,则通过onError
发出错误数据,同样也会终止流。
其中,Subscription
相当于是连接Publisher
和Subscriber
的“纽带”。因为当发布者调用subscribe
方法注册订阅者时,会通过订阅者的回调方法onSubscribe
传入Subscription
对象,之后订阅者就可以使用这个Subscription
对象的request
方法向发布者“要”数据了。背压机制正是基于此来实现的。
如下图:
Processor
则是集Publisher
和Subscriber
于一身,相当于是发布者与订阅者之间的一个”中间人“,可以通过Processor
进行一些中间操作:
/**
* A component that acts as both a Subscriber and Publisher.
*
* @param <T> the subscribed item type
* @param <R> the published item type
*/
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
如下图:
参考:
响应流使用示例
1.以下代码简单演示了SubmissionPublisher 和这套发布-订阅框架的基本使用方式:
package com.example.demo;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
* @program: demo
* @description: Flow Demo
* @author: 01
* @create: 2018-10-04 13:25
**/
public class FlowDemo {
public static void main(String[] args) throws Exception {
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>();
// 2. 定义订阅者
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者已经达到了目标, 可以调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
};
// 3. 发布者和订阅者 建立订阅关系
publiser.subscribe(subscriber);
// 4. 生产数据, 并发布
// 这里忽略数据生产过程
for (int i = 0; i < 3; i++) {
System.out.println("生成数据:" + i);
// submit是个block方法
publiser.submit(i);
}
// 5. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();
// 主线程延迟停止, 否则数据没有消费就会退出
Thread.currentThread().join(1000);
// debug的时候, 下面这行需要有断点
// 否则主线程结束无法debug
System.out.println();
}
}
运行结果如下:
上文中提到过可以调节发布者的数据产出速度,那么这个速度是如何调节的呢?关键就在于submit方法,该方法是一个阻塞方法。需要先说明的是SubmissionPublisher里有一个数据缓冲区,用于缓冲发布者产生的数据,而这个缓冲区是利用一个Object数组实现的,缓冲区最大长度为256。我们可以在onSubscribe方法里打上断点,查看到这个缓冲区:
当这个缓冲区的数据满了之后,submit方法就会进入阻塞状态,发布者数据的产生速度就会变慢,以此实现调节发布者的数据产出速度。
2.第二个例子演示了结合Processor的使用方式,代码如下:
package com.example.demo;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
/**
* Processor, 需要继承SubmissionPublisher并实现Processor接口
*
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
*/
class MyProcessor extends SubmissionPublisher<String>
implements Processor<Integer, String> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("处理器接受到数据: " + item);
// 过滤掉小于0的, 然后发布出去
if (item > 0) {
this.submit("转换后的数据:" + item);
}
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理器处理完了!");
// 关闭发布者
this.close();
}
}
/**
* 带 process 的 flow demo
* @author 01
*/
public class FlowDemo2 {
public static void main(String[] args) throws Exception {
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>();
// 2. 定义处理器, 对数据进行过滤, 并转换为String类型
MyProcessor processor = new MyProcessor();
// 3. 发布者 和 处理器 建立订阅关系
publiser.subscribe(processor);
// 4. 定义最终订阅者, 消费 String 类型数据
Subscriber<String> subscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
};
// 5. 处理器 和 最终订阅者 建立订阅关系
processor.subscribe(subscriber);
// 6. 生产数据, 并发布
// 这里忽略数据生产过程
publiser.submit(-111);
publiser.submit(111);
// 7. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();
// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
}
}
运行结果如下:
- Selenium2+python自动化63-二次封装(click/send_kesy)
- Selenium2+python自动化65-js定位几种方法总结
- HDU 2502 月之数(二进制,规律)
- Tensorflow实战系列:手把手教你使用CNN进行图像分类(附完整代码)
- HDU 2503 a/b + c/d(最大公约数与最小公倍数,板子题)
- python接口自动化6-重定向(Location)
- 2017广东工业大学程序设计竞赛初赛 题解&源码(A,水 B,数学 C,二分 D,枚举 E,dp F,思维题 G,字符串处理 H,枚举)
- python接口自动化7-参数关联
- 深度学习GPU环境Ubuntu16.04+GTX1080+CUDA9+cuDNN7+TensorFlow1.6环境配置
- python接口自动化8-参数化
- HDU 2037 今年暑假不AC(贪心,区间更新,板子题)
- “玲珑杯”ACM比赛 Round #13 题解&源码
- 回溯算法入门及经典案例剖析(初学者必备宝典)
- Selenium2+python自动化66-装饰器之运行失败截图
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Android自定义View倒计时圆
- Android开发实现的IntentUtil跳转多功能工具类
- Android端“被挤下线”功能的单点登录实现
- Android轻松实现多语言的方法示例
- Android开发实现去除bitmap无用白色边框的方法示例
- Android开发实现的内存管理工具类
- Android日期和时间选择器实现代码
- Android开发实现ImageView加载摄像头拍摄的大图功能
- Android开发实现的Intent跳转工具类实例
- Android开发中的文件操作工具类FileUtil完整实例
- Android开发中超好用的正则表达式工具类RegexUtil完整实例
- Android ijkplayer的使用方法解析
- Android开发实现查询远程服务器的工具类QueryUtils完整实例
- 解决android studio 3.0 加载项目过慢问题–maven仓库选择
- Android实现朋友圈点赞列表