RxAndroid完全教程

时间:2022-04-26
本文章向大家介绍RxAndroid完全教程,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

不知现在的编辑,能否好用一些呢?希望大家有什么意见,在公众号直接回复即可,想了解哪些知识点,也可以直接留言了。 炎炎夏日,清凉一夏 这章节主要介绍下Rx系列的经典内容,Rxandroid。 Rx是响应式编程的意思, 本质是观察者模式, 是以观察者(Observer)和订阅者(Subscriber)为基础的异步响应方式. 更好的体验,点击阅读原文查看

RxAndroid 常见用法

在项目里面添加

     compile 'io.reactivex:rxandroid:1.1.0'// Because RxAndroid releases are few and far between, it is recommended you also
     // explicitly depend on RxJava's latest version for bug fixes and new features.
    compile 'io.reactivex:rxjava:1.1.0'

例子1:

    Observable.just("one", "two", "three", "four", "five")
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {                    
                @Override
                    public void call(String s) {
                        Log.i("lxm0",s);
                    }
                }, new Action1<Throwable>() {                    
                @Override
                    public void call(Throwable throwable) {
                        Log.i("lxm0","eeee");
                    }
                }, new Action0() {                    
                @Override
                    public void call() {
                        Log.i("lxm0","wwwwwww");
                    }
                });

map对数据作为对输入数据做一次处理,然后再传给Action1

        Observable.just(1, 2, 3, 4, 4)
                .map(new Func1<Integer, String>() {                    
                @Override
                    public String call(Integer  num) {                        
                    return num+"";
                    }
                    ;}).subscribe(new Action1<String>() {            
                    @Override
            public void call(String s) {
                Log.i("lxm",s);
            }
        });

从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。

from从里面拿出每一个,调用flatmap将数据再次包裹起来,变换后再次进入filter进入过滤条件,过滤条件有前后顺序, flatMap 将里面的元素进行每个重新包装,再次作为一个被观察者发送出去。AndroidSchedulers.mainThread() 主线程。 onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。 3.将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe(). 需要撤销

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
    @Override
        public Observable<File> call(File file) {            
        return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {        
    @Override
        public Boolean call(File file) {            
        return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {        
    @Override
        public Bitmap call(File file) {            
        return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread(
    ))
    .subscribe(new Action1<Bitmap>() {        
    @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

构造一个被观察者

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。 Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。 Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread()更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。 Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }    @Override
    public void onCompleted() {
    }    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

继续来看

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

关键的东西,将被观察者的一些东西执行在主线程.doOnSubscribe的内容运行在下面的subscribeOn指定的线程上

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程  
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

RxJava 的适用场景和使用方式

  1. 与 Retrofit 的结合

Retrofit 是 Square 的一个著名的网络请求库。没有用过 Retrofit 的可以选择跳过这一小节也没关系,我举的每种场景都只是个例子,而且例子之间并无前后关联,只是个抛砖引玉的作用,所以你跳过这里看别的场景也可以的。

Retrofit 除了提供了传统的 Callback 形式的 API,还有 RxJava 版本的 Observable 形式 API。下面我用对比的方式来介绍 Retrofit 的 RxJava 版 API 和传统版本的区别。

以获取一个 User 对象的接口作为例子。使用Retrofit 的传统 API,你可以用这样的方式来定义请求:

@GET("/user")public void getUser(@Query("userId") String userId, Callback<User> callback);

在程序的构建过程中, Retrofit 会把自动把方法实现并生成代码,然后开发者就可以利用下面的方法来获取特定用户并处理响应:

getUser(userId, new Callback<User>() {    @Override
    public void success(User user) {
        userView.setUser(user);
    }    @Override
    public void failure(RetrofitError error) {        // Error handling
        ...
    }
};

而使用 RxJava 形式的 API,定义同样的请求是这样的:

@GET("/user")public Observable<User> getUser(@Query("userId") String userId);

代码变为:

getUser(userId)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }        @Override
        public void onCompleted() {
        }        @Override
        public void onError(Throwable error) {            // Error handling
            ...
        }
    });

看到区别了吗?

当 RxJava 形式的时候,Retrofit 把请求封装进 Observable ,在请求结束后调用 onNext() 或在请求失败后调用 onError()。

对比来看, Callback 形式和 Observable 形式长得不太一样,但本质都差不多,而且在细节上 Observable 形式似乎还比Callback 形式要差点。那 Retrofit 为什么还要提供 RxJava 的支持呢?

因为它好用啊!从这个例子看不出来是因为这只是最简单的情况。而一旦情景复杂起来, Callback 形式马上就会开始让人头疼。比如:

假设这么一种情况:你的程序取到的 User 并不应该直接显示,而是需要先与数据库中的数据进行比对和修正后再显示。使用Callback 方式大概可以这么写:

getUser(userId, new Callback<User>() {    @Override
    public void success(User user) {
        processUser(user); // 尝试修正 User 数据
        userView.setUser(user);
    }    @Override
    public void failure(RetrofitError error) {        // Error handling
        ...
    }
};

有问题吗?

很简便,但不要这样做。为什么?因为这样做会影响性能。数据库的操作很重,一次读写操作花费 10~20ms 是很常见的,这样的耗时很容易造成界面的卡顿。所以通常情况下,如果可以的话一定要避免在主线程中处理数据库。所以为了提升性能,这段代码可以优化一下:

getUser(userId, new Callback<User>() {    @Override
    public void success(User user) {        new Thread() {            @Override
            public void run() {
                processUser(user); // 尝试修正 User 数据
                runOnUiThread(new Runnable() { // 切回 UI 线程
                    @Override
                    public void run() {
                        userView.setUser(user);
                    }
                });
            }).start();
    }    @Override
    public void failure(RetrofitError error) {        // Error handling
        ...
    }
};

性能问题解决,但……这代码实在是太乱了,迷之缩进啊!杂乱的代码往往不仅仅是美观问题,因为代码越乱往往就越难读懂,而如果项目中充斥着杂乱的代码,无疑会降低代码的可读性,造成团队开发效率的降低和出错率的升高。

这时候,如果用 RxJava 的形式,就好办多了。 RxJava 形式的代码是这样的:

getUser(userId)
    .doOnNext(new Action1<User>() {        @Override
        public void call(User user) {
            processUser(user);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }        @Override
        public void onCompleted() {
        }        @Override
        public void onError(Throwable error) {            // Error handling
            ...
        }
    });

后台代码和前台代码全都写在一条链中,明显清晰了很多。

再举一个例子:假设 /user 接口并不能直接访问,而需要填入一个在线获取的 token ,代码应该怎么写?

Callback 方式,可以使用嵌套的 Callback:

@GET("/token")public void getToken(Callback<String> callback);@GET("/user")public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);

...

getToken(new Callback<String>() {    @Override
    public void success(String token) {
        getUser(userId, new Callback<User>() {            @Override
            public void success(User user) {
                userView.setUser(user);
            }            @Override
            public void failure(RetrofitError error) {                // Error handling
                ...
            }
        };
    }    @Override
    public void failure(RetrofitError error) {        // Error handling
        ...
    }
});

倒是没有什么性能问题,可是迷之缩进毁一生,你懂我也懂,做过大项目的人应该更懂。

而使用 RxJava 的话,代码是这样的:

@GET("/token")public Observable<String> getToken();@GET("/user")public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);

...

getToken()
    .flatMap(new Func1<String, Observable<User>>() {        @Override
        public Observable<User> onNext(String token) {            return getUser(token, userId);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }        @Override
        public void onCompleted() {
        }        @Override
        public void onError(Throwable error) {            // Error handling
            ...
        }
    });

用一个 flatMap() 就搞定了逻辑,依然是一条链。看着就很爽,是吧?

好,Retrofit 部分就到这里。

  1. RxBinding

RxBinding 是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API。所谓 Binding,就是类似设置 OnClickListener 、设置 TextWatcher 这样的注册绑定对象的 API。

举个设置点击监听的例子。使用 RxBinding ,可以把事件监听用这样的方法来设置:

Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件
    .subscribe(new Action1<ViewClickEvent>() {        @Override
        public void call(ViewClickEvent event) {            // Click handling
        }
    });

看起来除了形式变了没什么区别,实质上也是这样。甚至如果你看一下它的源码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的 setOnClickListener() 来实现的。然而,仅仅这一个形式的改变,却恰好就是 RxBinding 的目的:扩展性。通过 RxBinding 把点击监听转换成 Observable 之后,就有了对它进行扩展的可能。扩展的方式有很多,根据需求而定。一个例子是前面提到过的 throttleFirst() ,用于去抖动,也就是消除手抖导致的快速连环点击