reactive streams与观察者模式
序
本文主要研究下java里头的reactive streams与观察者模式。
reactive streams
reactive编程范式是一个异步编程范式,主要涉及数据流及变化的传播,可以看做是观察者设计模式的扩展。
java里头的iterator是以pull模型,即订阅者使用next去拉取下一个数据;而reactive streams则是以push模型为主,订阅者调用subscribe方法订阅,发布者调用订阅者的onNext通知订阅者新消息。
reactive streams java api
reactive streams定义了4个java api,如下 Processor processor既是Subscriber也是Publisher,代表二者的处理阶段
Publisher
publisher是数据的提供者, 将数据发布给订阅者
Subscriber
在调用Publisher.subscribe(Subscriber)之后,Subscriber.onSubscribe(Subscription)将会被调用
Subscription
Subscription代表订阅者与发布者的一次订阅周期,一旦调用cancel去掉订阅,则发布者不会再推送消息。
观察者模式
观察者模式的实现有推模型和拉模型
- 拉模型 即发布者通知订阅有新消息,订阅者再去找发布者拉取
- 推模型 即发布者通知订阅者有消息,通知的时候已经带上了一个新消息
reactor实例
maven
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.2.RELEASE</version>
</dependency>
reactor 3 是java里头reactive streams的一个实现,基于reactive streams的java api,是spring 5反应式编程的基础。
Flux实例
@Test
public void testBackpressure(){
Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
private Subscription s;
int onNextAmount;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(2);
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
小结
从上面的代码看,reactive streams实际上是推拉结合的模式的结合。为什么还要拉呢?
rabbitmq vs kafka
rabbitmq是以推为主的,如果消费者消费能力跟不上,则消息会堆积在内存队列中(
必要时可能写磁盘
) kafka则是以拉为主的,生产者推送消息到broker,消费者自己根据自己的能力从broker拉取消息,由于消息是持久化的,因此无需关心生产消费速率的不平衡
backpressure
backpressure这个是为处理生产速率与消费速率不平衡这个问题而衍生出来的,订阅者可以在next方法里头根据自己的情况,使用request方法告诉发布者要取N个数据,发布者则向订阅者推送N个数据。通过request达到订阅者对发布者的反馈。而对于发布者而言,为了实现backpressure,则需要有一个缓存队列来缓冲订阅者没来得及消费的数据。涉及到缓冲,就涉及容量是有界还是无界,如果是有界则在缓冲慢的时候,处理策略是怎样等等。
doc
- reactive streams java api
- Java 9 揭秘(17. Reactive Streams)
- Java 9 Reactive Streams
- 我的数据访问函数库的源代码(四)—— 存储过程部分,包括存储过程的参数的封装
- [WCF 4.0新特性] 路由服务[实例篇]
- [WCF 4.0新特性] 默认终结点
- 三层架构之我见 —— 不同于您见过的三层架构。
- 来源于WCF的设计模式:可扩展对象模式[下篇]
- [WCF 4.0新特性] 标准终结点与无(.SVC)文件服务激活
- 我的数据访问类(第二版)—— for .net2.0 (二)
- 我的数据访问类(第二版)—— for .net2.0 (一)
- [WCF 4.0新特性] 路由服务[原理篇]
- 通过“访问多种数据库”的代码来学习多态!(.net2.0版)
- [WCF-Discovery] 客户端如何能够“探测”到可用的服务?
- WCF的安全审核——记录谁在敲打你的门
- Step By Step 一步一步写网站[1] —— 填加数据
- 五个解决方案让MongoDB拥有RDBMS的鲁棒性事务
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- php根据命令行参数生成配置文件详解
- PHP使用SOAP调用API操作示例
- 使用Zookeeper分布式部署PHP应用程序
- pytorch判断是否cuda 判断变量类型方式
- Keras搭建自编码器操作
- python程序如何进行保存
- Android Q之气泡弹窗的实现示例
- Python with语句用法原理详解
- pytorch 计算ConvTranspose1d输出特征大小方式
- Keras中 ImageDataGenerator函数的参数用法
- CI框架网页缓存简单用法分析
- 掌握PHP垃圾回收机制详解
- PHP基于面向对象封装的分页类示例
- PHP获取对象属性的三种方法实例分析
- PHP7内核之Reference详解