RxJava的一些入门学习分享

时间:2022-04-25
本文章向大家介绍RxJava的一些入门学习分享,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

最近在Android编程领域,RxJava这个框架的技术热度相当的高。团队对使用RxJava改进目前的项目代码也表示很有兴趣,所以我特地和团队伙伴一起对这套框架做了一下调研。现在就把目前对RxJava的一些理解和粗浅的分析做一下分享,欢迎大家拍砖。

1.什么是RxJava

要了解什么是RxJava,先需要了解一下它的理念来源:ReactiveX。

ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。RxJava就是Rx的Java语言库。

RxJava官方所给的定义是“a library for composing asynchronous and event-based programs using observable sequences for the Java VM”,即“Java虚拟机上的使用可观测序列进行可组合可异步的基于事件的编程的库”。这个定义概括虽然精准但是略微有点难懂。简单的说,RxJava采用的是观察者模式,代表被观察的数据源的类叫Observable,而代表观察Observable并作出响应的观察者的类叫Subscriber(其实Subscriber是一个实现了Observer接口的抽象类,严格来说Observer才是实现观察者功能的最基本单元,但由于Subcriber实现了一些观察者的基本功能,使用较为方便,一般就使用Subscriber类作为最基本的观察者单元)。

Subscriber通过“订阅”的方式观察Observable,所观察的Observable会依次发出一个数据序列,这个数据序列在被响应之前可以先进行各种处理,比如过滤序列中的数据,通过定义映射方法把原数据映射成新的数据,控制数据的发出时机,组合若干个数据序列成为一个新序列等等,这种处理在RxJava被称作“变换”,实现变换的方法被称作“操作符”。最后得到的序列上就只有我们感兴趣的数据,观察者无需等待数据生成,创建并订阅后只需响应序列上传来的最新数据即可,因此使用RxJava的代码是异步的。

同时RxJava采用了函数式编程的风格,在序列的变换方法和响应事件的方法,都大量使用了Java的函数式接口,并把变换中的要处理的线程同步,IO阻塞,异常处理等逻辑都封装进操作符方法里,不同的变换方法可以链式连续调用,变换的详情可以从链式调用的顺序得知,因此整体代码显得特别的简洁易读和容易拓展,使用这套框架开发能够有效节省阅读代码带来的精力和时间成本。

我们来看一段使用RxJava编写的简单代码,先来了解一下这套框架的基本知识:

public static void helloWorldRxJava() {
     Observable<String> observable = Observable.create(new  Observable.OnSubscribe<String>() {
     @Override
     public void call(Subscriber<? super String> subscriber) {        
           subscriber.onNext("Hello");
           subscriber.onNext("World");
           subscriber.onNext("RxJava");
           subscriber.onCompleted();
        }
   });
   Subscriber<String> subscriber = new Subscriber<String>() {
       @Override
           public void onNext(String arg0) {
            System.out.println(arg0);
       };
       @Override
       public void onError(Throwable arg0) {
       }
       @Override
       public void onCompleted() {
           System.out.println("onCompleted!!");
       }
      };
    observable
   .subscribeOn(Schedulers.newThread())
   .observeOn(Schedulers.newThread())
   .subscribe(subscriber);
}

这段代码实现的功能很简单:Observable依次发出三个字符串,Subscriber接到发出的字符串将其依次打印出来。当发送响应都完成的时候打印字符串“onCompleted!!”。

代码运行后在console的打印结果如下:

Hello 
World 
RxJava 
onCompleted!!

Observable在创建的时候会传入一个OnSubscribe对象作为成员。OnSubscribe是一个函数式接口,它唯一的方法call传入订阅Observable的Subscriber做参数,在里面定义了如何向Subscriber发出数据序列的逻辑。而Subscriber则有三个回调方法:onNext,onError,onCompleted。 RxJava除了使用观察者模式之外,同时也应用并拓展了迭代器模式,对数据序列是迭代遍历依次处理的,而Subscriber的onNext方法正对应Java的Iterable中的next方法。其他两个方法则是拓展迭代器模式新增的方法。onError方法用于响应数据序列发出过程中出现的异常的处理,当这个方法被回调之后对数据序列的响应就会强制终止。onCompleted方法在数据序列上的所有数据都发送响应完毕之后回调,通知所有工作已完成。这个拓展的迭代器模式相比传统还有一个优点在于,onNext方法、onError方法和onCompleted方法可异步回调,传统的next方法只能阻塞回调,在数据的遍历上会更有效率。

当Observable调用subscribe方法被Subscriber订阅,就会调用OnSubscribe的call方法,启动数据序列的发射。在调用subscribe方法之前,可以调用操作符方法在数据序列发送的时候做变换处理,其中有两个操作符方法observeOn和subscribeOn比较特别。subscribeOn方法指定数据将在哪个线程发出,observeOn方法指定数据将在哪个线程响应。线程将有Scheduler这个类指定。上述代码中,字符串的发出和响应打印都新建一个线程完成。通过使用observeOn和subscribeOn两个方法,可以轻松指定工作的线程,而无需关注线程间要如何通信,线程同步如何解决等问题,因为这些问题都会在RxJava框架内部解决。

同时,Observable的操作符方法的订阅方法的调用,都带有函数式编程的风格,没有任何外部变量的干扰,操作符变换的顺序相当清晰,代码显得格外简洁,相当容易阅读。

通过上面的介绍,我们大概明白了RxJava的两大优点:异步实现简单,代码简洁易读。对于平常经常碰到的繁杂的业务逻辑和线程并发问题,使用RxJava可以让我们摆脱那些为了解决这些问题所写的晦涩难读的代码。有效的提高开发效率。

2.创建Observable的一些常用API

对于Observable如何发送事件,一般是在创建Observable的create方法里面传入的OnSubscribe成员去定义。但是在很多使用Observable的情景里,都是给定了一个数据列表作为所有要发送的事件,如果依然使用OnSubscribe的call方法,在里面通过迭代列表发送数据,代码不免显得有点啰嗦冗余,为此Observable提供了几个直接传入事件列表做参数的创建方法,通过这些方法创建的Observable,迭代事件列表发送事件的逻辑被封装了,这样就能避免啰嗦的循环代码,保持代码的简洁。

Observable.from()

这个方法传入的就是一个实现了Iterable接口的对象(最常见的就是Java的各种List对象),创建出来的Observable会逐个发送这个对象里的元素给订阅的Subscriber响应。以下是from方法的简单的使用实例:

List<String> list = new ArrayList<String>();
list.add("Hello");
list.add("World");
list.add("RxJava");
Observable<String> observable = Observable.from(list);
Subscriber<String> subscriber = new Subscriber<String>() {

    public void onNext(String arg){
        System.out.println(arg);
    };

    @Override
    public void onError(        Throwable arg0) {

    }

    @Override
    public void onCompleted() {
        System.out.println(        "onCompleted!!");
    }
};
observable
.subscribe(subscriber);

上述代码的from方法传入了一个String类型的ArrayList,创建的Observable在调用subscribe方法就会把这个ArrayList里的所有元素逐个发送,订阅的Subscriber就会将其逐个打印。

Observable.just()

just方法同样是一个传入事件列表的Observable的创建方法,但是just方法的参数是多个的,且数目不定,可以有1到10个参数,但这些参数必须同类型。这里的参数将作为一个列表传给Observable作事件列表。以下是just方法的简单的使用实例:

Observable<String> observable = Observable.just("Hello", "World", "RxJava");
Subscriber<String> subscriber = new Subscriber<String>() {
   public void onNext(String arg){
       System.out.println(arg);
   };

   @Override
   public void onError(   Throwable arg) {
      }

   @Override
   public void onCompleted() {       System.out.println(       "onCompleted!!");
   }};observable
subscribe(subscriber);

3.Observable的一些基本变换操作符

变换是Observable的一个很重要的功能,我们可以通过变换对事件序列进行各种各样的处理。最基本的变换一个是映射,通过定义映射规则把原事件映射为新类型的事件;一个是过滤,通过定义过滤规则只保留我们感兴趣的事件,其余的全部丢弃。

Observable.map()

上图是map方法的一个基本的使用示意图,如图所示,上方和下方各表示一个事件序列,上方表示原序列,下方表示新序列。中间方框表示map方法使用的映射规则,上图表示的是原序列上的整数数据通过映射(x -> 10 * x)转换成数值是原数值乘以10的新数据放到新序列中。

在代码中,map方法通过传入一个实现Func1接口的对象,能把原事件序列上的事件一对一映射成新类型的事件,Func1接口是一个函数式接口,只有一个回调方法call,回调方法有一个参数和一个返回值(除此之外还有Func2,Func3等接口,对应有两个参数和三个参数等等),call方法定义的就是映射规则,传入原序列的数据作为参数,返回新序列的数据。以下是map方法的简单的使用示例:

Observable<String> observable = Observable.just("Hello", "World", "RxJava");
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
   public void onNext(Integer arg) {
       System.out.println(arg);
   };

   @Override
   public void onError(   Throwable arg) {

   }

   @Override
   public void onCompleted() {
       System.out.println(       "onCompleted!!");
   }
};

observable.map(new Func1<String,    Integer>() {
   public Integer call(String arg) {
       return arg0.length();
   };
})
.subscribe(subscriber);

上述代码中,原序列的数据类型是String,新序列的数据类型是Integer,这里想获得原序列上的每个String的长度放到新序列当中,所以使用map方法做数据的变换,subscriber订阅新序列,响应新序列上的数据后就将其打印。代码执行的结果如下:

5
5
6
onCompleted!!

Observable.flatmap()

flatmap方法也是用来做数据映射的变换,与map一对一映射变换不同的是,flatmap是一个一对多的映射变换,原序列上的一个数据将被映射为新序列上的多个数据,它的映射方法返回的是一个Observable,这个Observable要发送的数据就是原数据一对多映射返回的所有数据,对每个原序列上的数据映射后返回的每个Observable,在新序列上会把这些Observable要发送的数据铺平发送,如上图所示。

以下代码是flatmap方法的简单使用实例,通过这代码再详细说明下flatmap的用法:

Observable<String> observable = Observable.just("Hello", "World", "RxJava");    
Subscriber<Character> subscriber = new Subscriber<Character>() {   public void onNext(Character arg){
       System.out.print(arg);
   };

   @Override
   public void onError(   Throwable arg) {

   }

   @Override
   public void onCompleted() {
       System.out.println();
       System.out.println(       "onCompleted!!");
   }
};

observable.flatMap(new Func1<String, Observable<Character>>() {
   public Observable<Character> call(       String arg0) {
       char[] array = arg0.toCharArray();
       ArrayList<Character> list = new ArrayList<Character>();
       for (char c : array) {
           list.add(c);
       }
      return Observable.from(list);
   };
})
.subscribe(subscriber);

上述代码中,对原序列的每个String对象,使用flatmap方法,把String对象里的所有字符转换成char类型的ArrayList,在映射方法的最后返回一个把转换得到的ArrayList作为发送事件的列表的Observable,这样就实现了一对多的变换,一个String对象对应char类型ArrayList里的所有数据。这些映射返回的新Observable会在新序列里发射数据,在代码中就是把原序列的所有String对象里的所有char数据在新序列发送出去,订阅的subscriber响应这些char类型数据并将其打印,当所有数据都发送完毕,打印onCompleted!!通知。

这段代码的执行结果如下:

H e l l o W o r l d R x J a v a 
onCompleted!!

Observable.filter()

filter实现的变换很简单:通过定义过滤规则,过滤掉原序列上不需要响应的事件,把真正感兴趣的事件放在新序列中发送。上图是filter方法的一个基本的使用示意图,如图所示,原序列发送的数据是int类型数据,定义的过滤规则是只保留原序列中数值大于10的数据,其余都丢弃。

以下是filter方法的代码使用实例:

Observable<String> observable = Observable.just("Hello", "World", "RxJava");    
Subscriber<String> subscriber = new Subscriber<String>() {        
   public void onNext(String arg){
       System.out.println(arg);
   };

   @Override
   public void onError(   Throwable arg) {            
          }

   @Override
   public void onCompleted() {
       System.out.println(       "onCompleted!!");
   } 
};

observable
.filter(new Func1<String,Boolean>(){
   public Boolean call(String arg){
       return arg0.length() < 6;
   };
})
.subscribe(subscriber);

上述代码中,filter方法传入了一个Func1对象作为过滤规则, 这个Func1对象传入的参数表示原序列上的当前要处理的数据,返回一个boolean值,表示这个数据是否要过滤,这里判断是否过滤的条件是传入的String对象长度是否小于6,是则保留,否则过滤。因此这里会把原序列中的"RxJava"这个String对象过滤掉,因为只有这个String对象的长度不小于6。代码执行的结果如下:

Hello 
World 
onCompleted!!

4.使用RxJava的线程切换

之前介绍过,使用RxJava之所以能够简单的编写异步代码,是因为其提供了相当便利的线程切换的操作符:subcribeOn和observerOn。这两个方法通过传入指定线程的Scheduler作为参数,分别指定后台处理然后发送事件的线程和响应事件的线程,线程间的通信同步等问题全交由RxJava框架内部去处理,我们只需要专注于业务的实现即可。因此使用RxJava处理异步问题相当简单。

Scheduler是RxJava用于线程调度的工具类,它提供了几个线程种类,分别用于不同的业务情景,如下表:

种类

效果

Schedulers.computation( )

用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量

Schedulers.from(executor)

使用指定的Executor作为调度器

Schedulers.immedate( )

在当前线程立即开始执行任务

Schedulers.io( )

用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器

Schedulers.newThread( )

为每个任务创建一个新线程

Schedulers.trampoline( )

当其它排队的任务完成后,在当前线程排队开始执行

下图是GitHub上的android开发应用了RxJava的一个demo:RxJava-Android-Samples的其中一个应用情景。这是一个简单的“后台处理完成后由前台响应”的情景,点击按钮启动后台处理,这里所谓后台处理其实是让异步线程睡眠3秒来模拟耗时的异步处理,过程中打印一些日志表示进度,结束后前台主线程响应打印日志。

以下是相关的实现代码,这里的线程切换全部使用了RxJava:

public void startLongOperation() {
     _progress.setVisibility(View.VISIBLE);
     _log("Button Clicked");
     _subscription = _getObservable()
       .subscribeOn(Schedulers.io())
       .bserveOn(AndroidSchedulers.mainThread())
       .subscribe(_getObserver());                            
 }

 private Observable<Boolean> _getObservable() {
    return Observable.just(true).map(       new Func1<Boolean, Boolean>() {
       @Override
       public Boolean call(Boolean aBoolean) {
           _log("Within Observable");
           _doSomeLongOperation_thatBlocksCurrentThread();
           return aBoolean;
       }
   });
}

 private Observer<Boolean> _getObserver() {
     return new Observer<Boolean>() {
       @Override
       public void onCompleted() {
           _log("On complete");
           _progress.setVisibility(View.INVISIBLE);
       }

       @Override
       public void onError(Throwable e) {
         Timber.e(e, "Error in demo");
         _log(String.format("Boo! Error %s",            e.getMessage()));
        _progress.setVisibility(View.INVISIBLE);
       }

       @Override
       public void onNext(Boolean bool) {
           _log(String.format("onNext with return                value "%b"", bool));
       }
   };
 }

 private void _doSomeLongOperation_thatBlocksCurrentThread() {
     _log("performing long operation");
     try {
         Thread.sleep(3000);
     } catch (InterruptedException e) {
         Timber.d("Operation was interrupted");
     }
 }

上述代码除了使用RxJava的基础库,还使用了RxJava的Android拓展库RxAndroid。这是一个针对Android API的RxJava拓展,例如上述代码中的AndroidSchedulers.mainThread(),就是这个库里的返回Android的UI主线程的Scheduler的方法,利用这个拓展库,可以更方便的使用RxJava开发Android的应用程序(RxAndroid其实只提供了几个类和方法,源码也是用RxJava实现的,比较简单,真正使用的话,可以只用RxJava,对RxAndroid里面用到的类可以仿照其源码自己实现)。

上述代码中,按钮点击是调用了startLongOperation方法。这个方法通过创建一个只发送一个boolean数据的Observable,并且在发送前让当前线程沉睡3秒,藉此模拟耗时处理。这个Observable被订阅之前调用了subscribeOn方法,传入的参数Schedulers.io()表示处理业务并生成发送事件都在io线程完成,然后调用observeOn方法,指定在UI主线程响应事件。线程切换就是这样使用这两个方法很简单的实现了。

以上就是本人入门RxJava的一些较为粗浅的学习分享,志在先做一个简单的科普,如果深入研究就会发现RxJava还有不少强大的功能。Android版QQ音乐项目将在6.1版本尝试接入RxJava,到时我将会再总结一下使用的经验分享出来,欢迎大家指教。

参考资料:

ReactiveX官网

RxJava Essential CN

给Android开发者的RxJava的详解