RxJava取消订阅的各种方式的实现
时间:2022-07-27
本文章向大家介绍RxJava取消订阅的各种方式的实现,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
手动取消订阅
Consumer类型
Observable创建返回Disposable取消
public class SecondActivity extends AppCompatActivity {
private static final String TAG = "SecondActivity";
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_second);
disposable = Observable.create(new ObservableOnSubscribe<String () {
@Override
public void subscribe(ObservableEmitter<String emitter) throws Exception {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String () {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: "+s);
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
Log.d(TAG, "onDestroy: ");
//取消订阅
if(disposable != null && !disposable.isDisposed()){
disposable.dispose();
Log.d(TAG, "onDestroy: dispose");
}
}
}
普通类型Observer
在Observer中获取Disposable然后取消
public class ThirdActivity extends AppCompatActivity {
private static final String TAG = "ThirdActivity";
Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_third);
Observable.create(new ObservableOnSubscribe<String () {
@Override
public void subscribe(ObservableEmitter<String emitter) throws Exception {
try {
Thread.sleep(5000);
emitter.onNext("testInfo");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String () {
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
Log.d(TAG, "onDestroy: ");
//然后在需要取消订阅的地方调用即可
if (disposable != null && !disposable.isDisposed()) {
Log.d(TAG, "dispose: ");
disposable.dispose();
}
}
}
DisposableObserver类型
利用DisposableObserver和SubscribeWith直接返回Disposable,然后取消
public class FourthActivity extends AppCompatActivity {
private static final String TAG = "FourthActivity";
private DisposableObserver<String observer;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_fourth);
observer = Observable.create(new ObservableOnSubscribe<String () {
@Override
public void subscribe(ObservableEmitter<String emitter) throws Exception {
try {
Thread.sleep(5000);
emitter.onNext("testInfo");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String () {
@Override
public void onNext(String o) {
Log.d(TAG, "onNext: "+o);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
if (observer != null && !observer.isDisposed()) {
Log.d(TAG, "dispose: ");
observer.dispose();
}
}
}
取消多个Observer
把多个Observer添加CompositeDisposable,一次取消
public class ComDisposableActivity extends AppCompatActivity {
private Disposable disposable1;
private Disposable disposable2;
private static final String TAG = "ComDisposableActivity";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_com_disposable);
Observable.create(new ObservableOnSubscribe<String () {
@Override
public void subscribe(ObservableEmitter<String emitter) throws Exception {
try {
Thread.sleep(5000);
emitter.onNext("testInfo");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "run: Unsubscribing subscription from onCreate()");
}
})
.subscribe(new Observer<String () {
@Override
public void onSubscribe(Disposable d) {
disposable1 = d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
Observable.create(new ObservableOnSubscribe<String () {
@Override
public void subscribe(ObservableEmitter<String emitter) throws Exception {
try {
Thread.sleep(5000);
emitter.onNext("testInfo");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String () {
@Override
public void onSubscribe(Disposable d) {
disposable2 = d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
CompositeDisposable compositeDisposable = new CompositeDisposable();
//批量添加
compositeDisposable.add(disposable1);
compositeDisposable.add(disposable2);
//最后一次性全部取消订阅
compositeDisposable.dispose();
}
}
RxLifecyle取消
OnDestory取消
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "Unsubscribing bindToLifecycle from onDestroy()");
}
})
.compose(this.<Long bindToLifecycle())
.subscribe(new Consumer<Long () {
@Override
public void accept(Long num) throws Exception {
Log.d(TAG, "accept: " + num);
}
});
指定生命周期取消
Observable.interval(1,TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "Unsubscribing UbindUntilEvent from onPause()");
}
}).compose(this.<Long bindUntilEvent(ActivityEvent.PAUSE))
.subscribe(new Consumer<Long () {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "bindUntilEvent accept: " + aLong);
}
});
以上就是本文的全部内容,希望对大家的学习有所帮助。
- P03_Hive 安装
- P06_flume-ng-1.5.0-cdh5.3.6安装
- P04_zookeeper-cdh5集群搭建
- P05_kafka_2.9.2-0.8.1集群搭建
- 再探matplotlib
- 分布式系统(Distributed System)资料
- Python性能提升20倍居然不是标题党?
- Spark开发电商日志分析用户行为聚合功能练习下面开始搭建开发环境注意Task表中最后一个列task_param中,Json的StartDate和EndDate需要设置成今天,因为mock数据的时候,
- HDU 3783 ZOJ
- HDU 1412 {A} + {B}
- HDU 2092 整数解
- Hadoop数据分析平台实战——020Hadoop Shell命令(初学跳过)离线数据分析平台实战——020Hadoop Shell命令(可跳过)
- HDU 2080 夹角有多大II
- 二分查找模版
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- Pinstaller(Python打包为exe文件
- pytest 测试框架学习(14):pytest.warns
- ImportError: /lib64/libm.so.6: version `CXXAB_1.3.8.' not found (required by /usr/local/python37/lib
- pytest 测试框架学习(15):pytest.freeze_includes
- Linux: scp文件,目录上传下载标准版
- Hibernate第三天:Hibernate的一对多配置、Hibernate的多对多的配置
- Git: 掉坑记 -- git reset 杀手
- ModuleNotFoundError: No module named 'phkit.pinyin'
- Hibernate第四天:Hibernate的查询方式、抓取策略
- 爬虫抓取博客园前10页标题带有Python关键字(不区分大小写)的文章
- Python爬虫抓取唐诗宋词
- ImportError: /lib64/libm.so.6: version `GLIBC_2.23' not found (required by /usr/local/python37/lib/p
- 数据分析与数据挖掘 - 02基础操练
- 数据分析与数据挖掘 - 03智能对话
- 【深度揭秘】为什么很多语言的数组下标是从0开始的?