操作符之功能性操作符

时间:2019-08-06
本文章向大家介绍操作符之功能性操作符,主要包括操作符之功能性操作符使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

     1、作用

  • 辅助被观察者(Observable) 在发送事件时实现一些功能性需求

     2、类型

     3、详解

          3.1   subscribe() /  subscribeOn() /  observeOn()

       作用:订阅  /  设置被观察者线程  /  设置观察者线程

          3.2   delay() ----- 见rxdocs.pdf第157页

       作用:使得被观察者延迟一段时间再发送事件

    public static void delay() {
        Observable.just(1, 2, 3)
                .delay(3, TimeUnit.SECONDS)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

        输出:

08-06 19:38:09.177 15886 15886 D Operation: onSubscribe
08-06 19:38:12.178 15886 15904 D Operation: onNext: value = 1
08-06 19:38:12.179 15886 15904 D Operation: onNext: value = 2
08-06 19:38:12.179 15886 15904 D Operation: onNext: value = 3
08-06 19:38:12.179 15886 15904 D Operation: onComplete

          3.3   do() ----- 见rxdocs.pdf第161页

       作用:在事件发送 & 接收的整个生命周期过程中进行操作

    public static void doOperation() {
        Observable.just(1, 2)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doOnNext: value = " + integer);
                    }
                })
                .doOnLifecycle(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.d(TAG, "doOnLifecycle Consumer: disposable  = " + disposable.isDisposed());
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG, "doOnLifecycle Action");
                    }
                })
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG, "doFinally");
                    }
                })
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG, "doAfterTerminate");
                    }
                })
                .doOnTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG, "doOnTerminate");
                    }
                })
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.d(TAG, "doOnSubscribe: disposable = " + disposable.isDisposed());
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d(TAG, "doOnError");
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG, "doOnComplete");
                    }
                })
                .doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doAfterNext: value = " + integer);
                    }
                })
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        Log.d(TAG, "doOnEach: notification = " + integerNotification.toString());
                    }
                })
                .doOnEach(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "doOnEach: onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "doOnEach: onNext = " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "doOnEach: onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "doOnEach: onComplete");
                    }
                })
                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG, "doOnDispose");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "real onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "real onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "real onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "real onComplete");
                    }
                });
    }

          输出:

08-06 20:10:46.205 18692 18692 D Operation: doOnLifecycle Consumer: disposable  = false
08-06 20:10:46.205 18692 18692 D Operation: doOnSubscribe: disposable = false
08-06 20:10:46.205 18692 18692 D Operation: real onSubscribe
08-06 20:10:46.205 18692 18692 D Operation: doOnNext: value = 1
08-06 20:10:46.206 18692 18692 D Operation: doOnEach: notification = OnNextNotification[1]
08-06 20:10:46.207 18692 18692 D Operation: doOnEach: onNext = 1
08-06 20:10:46.207 18692 18692 D Operation: real onNext: value = 1
08-06 20:10:46.207 18692 18692 D Operation: doAfterNext: value = 1
08-06 20:10:46.207 18692 18692 D Operation: doOnNext: value = 2
08-06 20:10:46.207 18692 18692 D Operation: doOnEach: notification = OnNextNotification[2]
08-06 20:10:46.207 18692 18692 D Operation: doOnEach: onNext = 2
08-06 20:10:46.207 18692 18692 D Operation: real onNext: value = 2
08-06 20:10:46.207 18692 18692 D Operation: doAfterNext: value = 2
08-06 20:10:46.207 18692 18692 D Operation: doOnTerminate
08-06 20:10:46.207 18692 18692 D Operation: doOnComplete
08-06 20:10:46.207 18692 18692 D Operation: doOnEach: notification = OnCompleteNotification
08-06 20:10:46.207 18692 18692 D Operation: doOnEach: onComplete
08-06 20:10:46.208 18692 18692 D Operation: real onComplete
08-06 20:10:46.208 18692 18692 D Operation: doAfterTerminate
08-06 20:10:46.208 18692 18692 D Operation: doFinally

          3.4   onErrorReturn() ----- 见rxdocs.pdf第151页

       作用:遇到错误时,发送1个特殊事件 & 正常终止

原文地址:https://www.cnblogs.com/andy-loong/p/11311577.html