RxJava取消订阅的各种方式的实现

时间:2022-07-27
本文章向大家介绍RxJava取消订阅的各种方式的实现,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

手动取消订阅

Consumer类型

订阅Observable时会返回Disposable,用它来取消订阅

public class SecondActivity extends AppCompatActivity {

  private static final String TAG = "SecondActivity";
  private Disposable disposable;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_second);
    disposable = Observable.create(new ObservableOnSubscribe<String () {
      @Override
      public void subscribe(ObservableEmitter<String  emitter) throws Exception {
        try {
          Thread.sleep(5000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String () {
          @Override
          public void accept(String s) throws Exception {
            Log.d(TAG, "accept: "+s);
          }
        });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    Log.d(TAG, "onDestroy: ");
    //取消订阅
    if(disposable != null && !disposable.isDisposed()){
      disposable.dispose();
      Log.d(TAG, "onDestroy: dispose");
    }
  }
}

普通类型Observer

在Observer中获取Disposable然后取消

public class ThirdActivity extends AppCompatActivity {
  private static final String TAG = "ThirdActivity";
  Disposable disposable;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_third);
    Observable.create(new ObservableOnSubscribe<String () {
      @Override
      public void subscribe(ObservableEmitter<String  emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String () {
          @Override
          public void onSubscribe(Disposable d) {
            disposable = d;
          }

          @Override
          public void onNext(String s) {
            Log.d(TAG, "onNext: "+s);
          }

          @Override
          public void onError(Throwable e) {
            Log.d(TAG, "onError: ");
          }

          @Override
          public void onComplete() {
            Log.d(TAG, "onComplete: ");
          }
        });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    Log.d(TAG, "onDestroy: ");
    //然后在需要取消订阅的地方调用即可
    if (disposable != null && !disposable.isDisposed()) {
      Log.d(TAG, "dispose: ");
      disposable.dispose();
    }
  }
}

DisposableObserver类型

利用DisposableObserver和subscribeWith:subscribeWith会直接返回传入的DisposableObserver(它本身实现了Disposable),然后用它取消

public class FourthActivity extends AppCompatActivity {
  private static final String TAG = "FourthActivity";
  private DisposableObserver<String  observer;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_fourth);
    observer = Observable.create(new ObservableOnSubscribe<String () {
      @Override
      public void subscribe(ObservableEmitter<String  emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<String () {
      @Override
      public void onNext(String o) {
        Log.d(TAG, "onNext: "+o);
      }

      @Override
      public void onError(Throwable e) {
        Log.d(TAG, "onError: ");
      }

      @Override
      public void onComplete() {
        Log.d(TAG, "onComplete: ");
      }
    });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    if (observer != null && !observer.isDisposed()) {
      Log.d(TAG, "dispose: ");
      observer.dispose();
    }
  }
}

取消多个Observer

把多个Observer对应的Disposable添加到CompositeDisposable中,一次性全部取消

public class ComDisposableActivity extends AppCompatActivity {
private Disposable disposable1;
private Disposable disposable2;
private static final String TAG = "ComDisposableActivity";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_com_disposable);
Observable.create(new ObservableOnSubscribe<String () {
@Override
public void subscribe(ObservableEmitter<String  emitter) throws Exception {
try {
Thread.sleep(5000);
emitter.onNext("testInfo");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "run: Unsubscribing subscription from onCreate()");
}
})
.subscribe(new Observer<String () {
@Override
public void onSubscribe(Disposable d) {
disposable1 = d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
Observable.create(new ObservableOnSubscribe<String () {
@Override
public void subscribe(ObservableEmitter<String  emitter) throws Exception {
try {
Thread.sleep(5000);
emitter.onNext("testInfo");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String () {
@Override
public void onSubscribe(Disposable d) {
disposable2 = d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
CompositeDisposable compositeDisposable = new CompositeDisposable();
//批量添加
compositeDisposable.add(disposable1);
compositeDisposable.add(disposable2);
//最后一次性全部取消订阅
compositeDisposable.dispose();
}
}

RxLifecycle取消

onDestroy时取消

// Emits an increasing Long once per second and ties the subscription to the host
// component through bindToLifecycle().
// NOTE(review): the log text implies disposal happens in onDestroy(), which presumably
// requires this code to run in onCreate() of an RxLifecycle-aware component — confirm.
Observable.interval(1, TimeUnit.SECONDS)
    .doOnDispose(new Action() {
      @Override
      public void run() throws Exception {
        Log.d(TAG, "Unsubscribing bindToLifecycle from onDestroy()");
      }
    })
    .compose(this.<Long>bindToLifecycle())
    .subscribe(new Consumer<Long>() {
      @Override
      public void accept(Long num) throws Exception {
        Log.d(TAG, "accept: " + num);
      }
    });

指定生命周期取消

// Emits an increasing Long once per second and binds the subscription to
// ActivityEvent.PAUSE via bindUntilEvent(); doOnDispose logs when it is disposed.
Observable.interval(1, TimeUnit.SECONDS)
    .doOnDispose(new Action() {
      @Override
      public void run() throws Exception {
        Log.d(TAG, "Unsubscribing UbindUntilEvent from onPause()");
      }
    })
    .compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
    .subscribe(new Consumer<Long>() {
      @Override
      public void accept(Long aLong) throws Exception {
        Log.d(TAG, "bindUntilEvent accept: " + aLong);
      }
    });

以上就是本文的全部内容,希望对大家的学习有所帮助。