Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1535045
  • 博文数量: 113
  • 博客积分: 3526
  • 博客等级: 中校
  • 技术积分: 1815
  • 用 户 组: 普通用户
  • 注册时间: 2009-09-08 09:46
个人简介

记录总结自己的工作

文章分类

全部博文(113)

文章存档

2015年(19)

2014年(10)

2013年(6)

2012年(16)

2011年(24)

2010年(21)

2009年(17)

分类: Android平台

2015-09-17 11:28:28

    上一篇文章中我们了解了如何对数据进行过滤,在这篇文章里我们来了解一下如何组装多个Observable的数据。

    一、CombineLatest
    CombineLatest操作符可以将2~9个Observable发射的数据组装起来然后再发射出来。不过还有两个前提:
    1.所有的Observable都发射过数据。
    2.满足条件1的时候任何一个Observable发射一个数据,就将所有Observable最新发射的数据按照提供的函数组装起来发射出去。
    
    Rxjava实现CombineLast操作符可以让我们直接将组装的Observable作为参数传值,也可以将所有的Observable装在一个List里面穿进去。

    下面我们创建几个Observable对象,分别直接传值和使用List传值将其组装起来

点击(此处)折叠或打开

  1. private Observable<Integer> createObserver(int index) {
  2.         return Observable.create(new Observable.OnSubscribe<Integer>() {
  3.             @Override
  4.             public void call(Subscriber<? super Integer> subscriber) {
  5.                 for (int i = 1; i < 6; i++) {
  6.                     subscriber.onNext(i * index);
  7.                     try {
  8.                         Thread.sleep(1000);
  9.                     } catch (InterruptedException e) {
  10.                         e.printStackTrace();
  11.                     }
  12.                 }
  13.             }
  14.         }).subscribeOn(Schedulers.newThread());
  15.     }

  16.     private Observable<Integer> combineLatestObserver() {
  17.         return Observable.combineLatest(createObserver(1), createObserver(2), (num1, num2) -> {
  18.             log("left:" + num1 + " right:" + num2);
  19.             return num1 + num2;
  20.         });
  21.     }

  22.     List<Observable<Integer>> list = new ArrayList<>();

  23.     private Observable<Integer> combineListObserver() {
  24.         for (int i = 1; i < 5; i++) {
  25.             list.add(createObserver(i));
  26.         }
  27.         return Observable.combineLatest(list, args -> {
  28.             int temp = 0;
  29.             for (Object i : args) {
  30.                 log(i);
  31.                 temp += (Integer) i;
  32.             }
  33.             return temp;
  34.         });
  35.     }
    对其进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("combineList");
  2.         mLButton.setOnClickListener(e -> combineListObserver().subscribe(i -> log("combineList:" + i)));
  3.         mRButton.setText("CombineLatest");
  4.         mRButton.setOnClickListener(e -> combineLatestObserver().subscribe(i -> log("CombineLatest:" + i)));
    运行结果如下


    二、Join
    Join操作符根据时间窗口来组合两个Observable发射的数据,每个Observable都有一个自己的时间窗口,要组合的时候,在这个时间窗口内的数据都有有效的,可以拿来组合。
    Rxjava还实现了groupJoin,基本和join相同,只是最后组合函数的参数不同。

    使用join操作符需要4个参数,分别是:
    1.源Observable所要组合的目标Observable
    2.一个函数,就收从源Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了源Observable发射出来数据的有效期
    3.一个函数,就收从目标Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了目标Observable发射出来数据的有效期
    4.一个函数,接收从源Observable和目标Observable发射来的数据,并返回最终组合完的数据。
      
    下面我们使用join和groupJoin操作符分别来组合两个Observable对象

点击(此处)折叠或打开

  1. private Observable<String> createObserver() {
  2.         return Observable.create(new Observable.OnSubscribe<String>() {
  3.             @Override
  4.             public void call(Subscriber<? super String> subscriber) {
  5.                 for (int i = 1; i < 5; i++) {
  6.                     subscriber.onNext("Right-" + i);
  7.                     try {
  8.                         Thread.sleep(1000);
  9.                     } catch (InterruptedException e) {
  10.                         e.printStackTrace();
  11.                     }
  12.                 }
  13.             }
  14.         }).subscribeOn(Schedulers.newThread());
  15.     }


  16.     private Observable<String> joinObserver() {
  17.         return Observable.just("Left-").join(createObserver(),
  18.                 integer -> Observable.timer(3000, TimeUnit.MILLISECONDS),
  19.                 integer -> Observable.timer(2000, TimeUnit.MILLISECONDS),
  20.                 (i, j) -> i + j
  21.         );
  22.     }

  23.     private Observable<Observable<String>> groupJoinObserver() {
  24.         return Observable.just("Left-")
  25.                 .groupJoin(createObserver(),
  26.                         s -> Observable.timer(3000, TimeUnit.MILLISECONDS),
  27.                         s -> Observable.timer(2000, TimeUnit.MILLISECONDS),
  28.                         (s, stringObservable) -> stringObservable.map(str -> s + str));
  29.     }
    分别进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("join");
  2. mLButton.setOnClickListener(e -> joinObserver().subscribe(i -> log("join:" + i + "\n")));
  3. mRButton.setText("groupJoin");
  4. mRButton.setOnClickListener(e -> groupJoinObserver().subscribe(i -> i.subscribe(j -> log("groupJoin:" + j + "\n"))));
    运行结果如下,可以看到虽然目标Observable发射了4个数据,但是源Observable只发射了一个有效期为3秒的数据,所以最终的组合结果也只有3个数据。


    三、Merege
    Merge操作符将多个Observable发射的数据整合起来发射,就如同是一个Observable发射的数据一样。但是其发射的数据有可能是交错的,如果想要没有交错,可以使用concat操作符。
    当某一个Observable发出onError的时候,merge的过程会被停止并将错误分发给Subscriber,如果不想让错误终止merge的过程,可以使用MeregeDelayError操作符,会将错误在merge结束后再分发。

    下面我们分别使用merge和mergeDelayError操作符来进行merge操作。

点击(此处)折叠或打开

  1. private Observable<Integer> mergeObserver() {
  2.         return Observable.merge(Observable.just(1, 2, 3), Observable.just(4, 5, 6));
  3.     }

  4.     private Observable<Integer> mergeDelayErrorObserver() {
  5.         return Observable.mergeDelayError(Observable.create(new Observable.OnSubscribe<Integer>() {
  6.             @Override
  7.             public void call(Subscriber<? super Integer> subscriber) {
  8.                 for (int i = 0; i < 5; i++) {
  9.                     if (i == 3) {
  10.                         subscriber.onError(new Throwable("error"));
  11.                     }
  12.                     subscriber.onNext(i);

  13.                 }
  14.             }
  15.         }), Observable.create(new Observable.OnSubscribe<Integer>() {
  16.             @Override
  17.             public void call(Subscriber<? super Integer> subscriber) {
  18.                 for (int i = 0; i < 5; i++) {
  19.                     subscriber.onNext(5 + i);
  20.                 }
  21.                 subscriber.onCompleted();
  22.             }
  23.         }));
  24.     }
    分别对其订阅
    

点击(此处)折叠或打开

  1. mLButton.setText("Merge");
  2.         mLButton.setOnClickListener(e -> mergeObserver().subscribe(i -> log("Merge:" + i)));
  3.         mRButton.setText("mergeDelayError");
  4.         mRButton.setOnClickListener(e -> mergeDelayErrorObserver().subscribe(new Subscriber<Integer>() {
  5.             @Override
  6.             public void onCompleted() {
  7.                 log("onCompleted");
  8.             }

  9.             @Override
  10.             public void onError(Throwable e) {
  11.                 log("mergeDelayError:" + e);

  12.             }

  13.             @Override
  14.             public void onNext(Integer integer) {
  15.                 log("mergeDelayError:" + integer);

  16.             }
  17.         }));
    运行结果如下。


    四、StartWith、Switch
    StartWith操作符会在源Observable发射的数据前面插上一些数据。不仅仅只可以插入一些数据,还可以将Iterable和Observable插入进入。如果插入的是Observable,则这个Observable发射的数据会插入到
源Observable发射数据的前面。

    switch操作符在Rxjava上的实现为switchOnNext,用来将一个发射多个小Observable的源Observable转化为一个Observable,然后发射这多个小Observable所发射的数据。
需要注意的就是,如果一个小的Observable正在发射数据的时候,源Observable又发射出一个新的小Observable,则前一个Observable发射的数据会被抛弃,直接发射新
的小Observable所发射的数据。可以看示意图中的黄色圆圈就被丢弃了。


    下面使用startWith和switchOnNext操作符来组合两个Observable

点击(此处)折叠或打开

  1. private Observable<Integer> startWithObserver() {
  2.         return Observable.just(1, 2, 3).startWith(-1, 0);
  3.     }

  4.     private Observable<String> switchObserver() {
  5.         return Observable.switchOnNext(Observable.create(
  6.                 new Observable.OnSubscribe<Observable<String>>() {
  7.                     @Override
  8.                     public void call(Subscriber<? super Observable<String>> subscriber) {
  9.                         for (int i = 1; i < 3; i++) {
  10.                             subscriber.onNext(createObserver(i));
  11.                             try {
  12.                                 Thread.sleep(2000);
  13.                             } catch (InterruptedException e) {
  14.                                 e.printStackTrace();
  15.                             }
  16.                         }
  17.                     }
  18.                 }
  19.         ));
  20.     }

  21.     private Observable<String> createObserver(int index) {
  22.         return Observable.create(new Observable.OnSubscribe<String>() {
  23.             @Override
  24.             public void call(Subscriber<? super String> subscriber) {
  25.                 for (int i = 1; i < 5; i++) {
  26.                     subscriber.onNext(index + "-" + i);
  27.                     try {
  28.                         Thread.sleep(1000);
  29.                     } catch (InterruptedException e) {
  30.                         e.printStackTrace();
  31.                     }
  32.                 }
  33.             }
  34.         }).subscribeOn(Schedulers.newThread());
  35.     }
    分别进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("StartWith");
  2. mLButton.setOnClickListener(e -> startWithObserver().subscribe(i -> log("StartWith:" + i)));
  3. mRButton.setText("switch");
  4. mRButton.setOnClickListener(e -> switchObserver().subscribe(i -> log("switch:" + i)));
    运行结果如下,可以看到startwith将-1和0插入到前面。使用siwtch的时候第一个小Observable只发射出了两个数据,第二个小Observable就被源Observable发射出来了,所以其接下来的两个数据被丢弃。


    五、Zip
    Zip操作符将多个Observable发射的数据按顺序组合起来,每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发射数据最少的Observable来决定。
    Rxjava实现了zip和zipWith两个操作符。

    下面我们使用zip和zipWith操作符来组合数据
    

点击(此处)折叠或打开

  1. private Observable<String> zipWithObserver() {
  2.         return createObserver(2).zipWith(createObserver(3), (s, s2) -> s + "-" + s2);
  3.     }

  4.     private Observable<String> zipWithIterableObserver() {
  5.         return Observable.zip(createObserver(2), createObserver(3), createObserver(4), (s, s2, s3) -> s + "-" + s2 + "-" + s3);
  6.     }

  7.     private Observable<String> createObserver(int index) {
  8.         return Observable.create(new Observable.OnSubscribe<String>() {
  9.             @Override
  10.             public void call(Subscriber<? super String> subscriber) {
  11.                 for (int i = 1; i <= index; i++) {
  12.                     log("emitted:" + index + "-" + i);
  13.                     subscriber.onNext(index + "-" + i);
  14.                     try {
  15.                         Thread.sleep(500);
  16.                     } catch (InterruptedException e) {
  17.                         e.printStackTrace();
  18.                     }
  19.                 }
  20.             }
  21.         }).subscribeOn(Schedulers.newThread());
  22.     }
    分别进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("zipWith");
  2. mLButton.setOnClickListener(e -> zipWithObserver().subscribe(i -> log("zipWith:" + i + "\n")));
  3. mRButton.setText("zip");
  4. mRButton.setOnClickListener(e -> zipWithIterableObserver().subscribe(i -> log("zip:" + i + "\n")));
    运行结果如下,可以看到,最终都发射出了两个数据,因为createObserver(2)所创建的Observable只会发射两个数据,所以其他Observable多余发射的数据都被丢弃了。
  

    Combning的操作符就到这了,本文中的源码见 style="color:#666666;font-family:宋体, Arial;line-height:26px;white-space:normal;background-color:#FFFFFF;"> 
阅读(11311) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~