Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1537939
  • 博文数量: 113
  • 博客积分: 3526
  • 博客等级: 中校
  • 技术积分: 1815
  • 用 户 组: 普通用户
  • 注册时间: 2009-09-08 09:46
个人简介

记录总结自己的工作

文章分类

全部博文(113)

文章存档

2015年(19)

2014年(10)

2013年(6)

2012年(16)

2011年(24)

2010年(21)

2009年(17)

分类: Android平台

2015-09-07 16:56:45

    RxJava只是ReactiveX(Reactive Extensions)的一种java实现, ReactiveX是一种响应式扩展框架,有很多种实现,如RxAndroid, RxJS, RxSwift, RxRuby等等。RX采用一种类似于观察者的形式来实现各种功能,跟我们一般的写代码思路差别较大。刚开始接触可能会觉得难以理解,但是一旦掌握地话就会体会到其强大之处。其原理就是创建一个Observable对象来干活,然后使用各种操作符建立起来的链式操作,就如同流水线一样把你想要处理的数据一步一步地加工成你想要的成品然后发射(emit)给Subscriber。

    RxAndroid是对RxJava在Android上的扩展,如果你是做安卓开发的话,各种主线程和子线程的操作肯定会让你觉得头疼,RxAndroid可以很容易地解决你的这种困扰。为了方便测试和编译,本文的demo程序都是基于RxAdroid来实现的。

    RX的强大就在其丰富的操作符,所以要灵活地使用RX的话就必须要掌握这些操作符,让我们首先来看一下如何创建Observable的操作符。

    一、Create
    Create是最基本的创建Observable的操作符,其原理图如下所示(本文中的原理图都使用了官网的图片)

    创建一个Observable最重要的就是要和合适的时机调用Subscriber的onNext/onComplete/onError方法。onNext就是发射处理好的数据给Subscriber; onComplete用来告诉Subscriber所有的数据都已发射完毕;onError是在发生错误的时候发射一个Throwable对象给Subscriber。需要注意的一点就是Observable必须调用所有的Subscriber的onComplete方法并且只能调用一次,出错的时候调用onError方法也是一样的,并且一旦调用后就不能调用Subscriber的任何其他方法了。下面是Create操作符的使用:

点击(此处)折叠或打开

  1. private Observable<Integer> createObserver() {
  2.         return Observable.create(new Observable.OnSubscribe<Integer>() {
  3.             @Override
  4.             public void call(Subscriber<? super Integer> subscriber) {
  5.                 if (!subscriber.isUnsubscribed()) {
  6.                     for (int i = 0; i < 5; i++) {
  7.                         int temp = new Random().nextInt(10);
  8.                         if (temp > 8) {
  9.                             //if value>8, we make an error
  10.                             subscriber.onError(new Throwable("value >8"));
  11.                             break;
  12.                         } else {
  13.                             subscriber.onNext(temp);
  14.                         }
  15.                         // on error,complete the job
  16.                         if (i == 4) {
  17.                             subscriber.onCompleted();
  18.                         }
  19.                     }
  20.                 }
  21.             }
  22.         });
  23.     }
    在这个方法里,我们创建并返回了个Observable,这个Observable会产生5个小于10的随机数并且依次发射出去,如果随机数大于8,我们就认为是一个Error。下面是我们对这个Observable的使用:
点击(此处)折叠或打开
  1. mLButton.setOnClickListener(e -> createObserver().subscribe(new Subscriber<Integer>() {
  2.             @Override
  3.             public void onCompleted() {
  4.                 log("onComplete!");
  5.             }

  6.             @Override
  7.             public void onError(Throwable e) {
  8.                 log("onError:" + e.getMessage());
  9.             }

  10.             @Override
  11.             public void onNext(Integer integer) {
  12.                 log("onNext:" + integer);
  13.             }
  14.         }));
    当点击button的时候,我们就会建立一个Subscriber对象并将其注册给创建的Observable对象,然后接收其发射来的数据。测试的时候共点击了两次,第一次顺利发射完了5个数据,第二次在发射了2个数据后产生了错误。运行结果如下:
  

    二、Range
    Range操作符根据出入的初始值n和数目m发射一系列大于等于n的m个值
    
    其使用也非常方便,仅仅制定初始值和数目就可以了,不用自己去实现对Subscriber的调用

点击(此处)折叠或打开

  1. private Observable<Integer> rangeObserver() {
  2.         return Observable.range(10, 5);
  3.     }
    对其订阅:

点击(此处)折叠或打开

  1. mRButton.setOnClickListener(e -> rangeObserver().subscribe(i -> log(i)));

    运行结果输出了10-14的5个数:


    三、Defer、Just
    Defer操作符只有当有Subscriber来订阅的时候才会创建一个新的Observable对象,也就是说每次订阅都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的,其特点我们将在下面和Just进行对比理解。
    

    Just操作符将某个对象转化为Observable对象,并且将其发射出去,可以使一个数字、一个字符串、数组、Iterate对象等,是一种非常快捷的创建Observable对象的方法,在以后的例子里会大量使用。

    下面我们来分别使用defer和just创建一个Observable,来返回当前的毫秒数

点击(此处)折叠或打开

  1. private Observable<Long> DeferObserver() {
  2.         return Observable.defer(() -> Observable.just(System.currentTimeMillis()));
  3.     }

  4.     private Observable<Long> JustObserver() {
  5.         return Observable.just(System.currentTimeMillis());
  6.     }
    分别对其订阅:

点击(此处)折叠或打开

  1. mLButton.setOnClickListener(e -> deferObservable.subscribe(time -> log("defer:" + time)));
  2.         mRButton.setOnClickListener(e -> justObservable.subscribe(time -> log("just:" + time)));

    好了,来看一下运行结果吧,可以看到使用Defer操作符创建Observable对象每次调用我们都可以得到最新的的当前时间,而使用just只会返回同一个时间。


    四、From
    From操作符用来将某个对象转化为Observable对象,并且依次将其内容发射出去。这个类似于just,但是just会将这个对象整个发射出去。比如说一个含有10个数字的数组,使用from就会发射10次,每次发射一个数字,而使用just会发射一次来将整个的数组发射出去。

    使用from创建两个Observable对象,来源分别是一个数组和list

点击(此处)折叠或打开

  1. private Observable<Integer> FromArray() {
  2.         return Observable.from(arrays);
  3.     }

  4.     private Observable<Integer> FromIterable() {
  5.         return Observable.from(list);
  6.     }
    进行订阅

点击(此处)折叠或打开

  1. mLButton.setOnClickListener(e -> FromArray().subscribe(i -> log("FromArray:" + i)));
  2. mRButton.setOnClickListener(e -> FromIterable().subscribe(i -> log("FromIterable:" + i)));
    运行结果如下,可以看到数组和list中的数据被依次地发射出来。


    五、Interval
    Interval所创建的Observable对象会从0开始,每隔固定的时间发射一个数字。需要注意的是这个对象是运行在computation Scheduler,所以如果需要在view中显示结果,要在主线程中订阅。

    使用interval创建一个Observable对象,其间隔为1秒钟。

点击(此处)折叠或打开

  1. private Observable<Long> interval() {
  2.         return Observable.interval(1, TimeUnit.SECONDS)
  3.         //interva operates by default on the computation Scheduler,so observe on main Thread
  4.         .observeOn(AndroidSchedulers.mainThread());
  5.     }
    进行订阅和反订阅,反订阅后将不会再收到Observable发射来的数据。

点击(此处)折叠或打开

  1.         Observable<Long> observable = interval();

  2.         Subscriber<Long> subscriber = new Subscriber<Long>() {
  3.             @Override
  4.             public void onCompleted() {
  5.                 log("onCompleted" );
  6.             }

  7.             @Override
  8.             public void onError(Throwable e) {
  9.                 log("onError:" + e.getMessage());
  10.             }

  11.             @Override
  12.             public void onNext(Long aLong) {
  13.                 log("interval:" + aLong);
  14.             }

  15.         };
  16.         mLButton.setText("Interval");
  17.         mRButton.setText("UnSubsCribe");
  18.         mLButton.setOnClickListener(e -> observable.subscribe(subscriber));
  19.         mRButton.setOnClickListener(e -> subscriber.unsubscribe());
    运行结果


    六、Repeat、Timer
    Repeat会将一个Observable对象重复发射,我们可以指定其发射的次数

    Timer会在指定时间后发射一个数字0,注意其也是运行在computation Scheduler

    分别使用Repeat和Timer创建一个Observable对象

点击(此处)折叠或打开

  1. private Observable<Integer> repeatObserver() {
  2.         return Observable.just(1).repeat(5);
  3.     }

  4.     private Observable<Long> timerObserver() {
  5.         //timer by default operates on the computation Scheduler
  6.         return Observable.timer(1, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread());
  7.     }
    进行订阅

点击(此处)折叠或打开

  1.         mLButton.setOnClickListener(e -> repeatObserver().subscribe(i -> log("repeat:" + i)));
  2.         mRButton.setOnClickListener(e -> timerObserver().subscribe(i -> log("timer:" + i)));
    运行结果如下,可以看到Repeat创建的对象发射了5个1,Timer创建的对象在1秒钟后发射了一个0。


    创建操作符还有Nerver/Empty/Throw等,非常简单但是我感觉可能用到的机会不多,就不细说了。

    本文的demo程序见github:  />

阅读(17142) | 评论(8) | 转发(2) |
给主人留下些什么吧!~~

云少嘎嘎嘎2016-01-05 13:34:42

muximus3:嗯,通常我们用的主题都是一次性的传导,就像发射一次炮弹,但是如果我想长期观察一个变量的值一旦有改变就得到通知,这个该怎么用rxjava实现呢

可以考虑使用RxRelay来实现https://github.com/JakeWharton/RxRelay

回复 | 举报

muximus32016-01-04 12:16:47

云少嘎嘎嘎:Rxjava 本来就是一种观察者模式。你创建一个Observable对象,在这个对象里面进行主题的改变操作,完成后知订阅者的时候调用一下onNext就可以了啊。

嗯,通常我们用的主题都是一次性的传导,就像发射一次炮弹,但是如果我想长期观察一个变量的值一旦有改变就得到通知,这个该怎么用rxjava实现呢

回复 | 举报

云少嘎嘎嘎2015-12-16 10:04:43

muximus3:楼主,你好,我用rxjava有一段时间了,不过有个问题一直没弄清楚,就是怎么用rxjava实现传统的观察者模式,比如常驻一个观察者,当主题改变时才会发出数据?楼主有什么好的建议?

Rxjava 本来就是一种观察者模式。你创建一个Observable对象,在这个对象里面进行主题的改变操作,完成后知订阅者的时候调用一下onNext就可以了啊。

回复 | 举报

muximus32015-12-15 23:07:50

楼主,你好,我用rxjava有一段时间了,不过有个问题一直没弄清楚,就是怎么用rxjava实现传统的观察者模式,比如常驻一个观察者,当主题改变时才会发出数据?楼主有什么好的建议?

云少嘎嘎嘎2015-11-05 16:18:06

muximus3:应该是第一个比较完整的中文文档了,感谢楼主分享。弱弱问一句,可以转载吗

当然可以了,只要注明出处行了

回复 | 举报