Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1573626
  • 博文数量: 113
  • 博客积分: 3526
  • 博客等级: 中校
  • 技术积分: 1815
  • 用 户 组: 普通用户
  • 注册时间: 2009-09-08 09:46
个人简介

记录总结自己的工作

文章分类

全部博文(113)

文章存档

2015年(19)

2014年(10)

2013年(6)

2012年(16)

2011年(24)

2010年(21)

2009年(17)

分类: Android平台

2015-09-14 16:49:06

    在上一篇文章里,我们了解了转化操作符,能将数据转化为我们想要的格式,但是如果数据集合里面有一些我们想要过滤掉的数据怎么办?这时候我们就需要使用过滤操作符了,有点类似于sql里的where,让Observable只返回满足我们条件的数据。

    一、debounce
    debounce操作符就是起到了限流的作用,可以理解为阀门,当你半开阀门的时候,水会以较慢的速度流出来。不同之处就是阀门里的水不会浪费掉,而debounce过滤掉的数据会被丢弃掉。在Rxjava中,将这个操作符氛围了throttleWithTimeoutdebounce两个操作符。先来看一下throttleWithTimeOut吧,如下图所示,这个操作符通过时间来限流,源Observable每次发射出来一个数据后就会进行计时,如果在设定好的时间结束前源Observable有新的数据发射出来,这个数据就会被丢弃,同时重新开始计时。如果每次都是在计时结束前发射数据,那么这个限流就会走向极端:只会发射最后一个数据。


    首先我们来创建一个Observable,每隔100毫秒发射一个数据,当要发射的数据是3的倍数的时候,下一个数据就延迟到300毫秒再发射。

点击(此处)折叠或打开

  1. private Observable<Integer> createObserver() {
  2.         return Observable.create(new Observable.OnSubscribe<Integer>() {
  3.             @Override
  4.             public void call(Subscriber<? super Integer> subscriber) {
  5.                 for (int i = 0; i < 10; i++) {
  6.                     if (!subscriber.isUnsubscribed()) {
  7.                         subscriber.onNext(i);
  8.                     }
  9.                     int sleep = 100;
  10.                     if (i % 3 == 0) {
  11.                         sleep = 300;
  12.                     }
  13.                     try {
  14.                         Thread.sleep(sleep);
  15.                     } catch (InterruptedException e) {
  16.                         e.printStackTrace();
  17.                     }

  18.                 }
  19.                 subscriber.onCompleted();
  20.             }
  21.         }).subscribeOn(Schedulers.computation());
  22.     }
    下面使用throttleWithTimeOut来过滤一下这个Observable,我们设定的过滤时间是200毫秒,也就说发射间隔小于200毫秒的数据会被过滤掉。

点击(此处)折叠或打开

  1. private Observable<Integer> throttleWithTimeoutObserver() {
  2.         return createObserver().throttleWithTimeout(200, TimeUnit.MILLISECONDS)
  3.                 .observeOn(AndroidSchedulers.mainThread());

  4.     }
    对其进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("throttleWithTimeout");
  2. mLButton.setOnClickListener(e -> throttleWithTimeoutObserver().subscribe(i -> log("throttleWithTimeout:" + i)));
    运行结果如下,可以看到,不是3的倍数的数据在发射后200毫秒内会发射出新的数据,所以会被过滤掉。

    debounce操作符也可以使用时间来进行过滤,这时它跟throttleWithTimeOut使用起来是一样,但是deounce操作符还可以根据一个函数来进行限流。这个函数的返回值是一个临时Observable,如果源Observable在发射一个新的数据的时候,上一个数据根据函数所生成的临时Observable还没有结束,那么上一个数据就会被过滤掉。

    生成一个Observable并使用debounce对其进行过滤,只有发射来的数据为偶数的时候才会调用onCompleted方法来表示这个临时的Observable已经终止。

点击(此处)折叠或打开

  1. private Observable<Integer> debounceObserver() {
  2.         return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).debounce(integer -> {
  3.             log(integer);
  4.             return Observable.create(new Observable.OnSubscribe<Integer>() {
  5.                 @Override
  6.                 public void call(Subscriber<? super Integer> subscriber) {
  7.                     if (integer % 2 == 0 && !subscriber.isUnsubscribed()) {
  8.                         log("complete:" + integer);
  9.                         subscriber.onNext(integer);
  10.                         subscriber.onCompleted();
  11.                     }
  12.                 }
  13.             });
  14.         })
  15.                 .observeOn(AndroidSchedulers.mainThread());
  16.     }
    对其进行订阅

点击(此处)折叠或打开

  1. mRButton.setOnClickListener(e -> debounceObserver().subscribe(i -> log("debounce:" + i)));
    运行结果如下,可以看到,只有那些调用了onCompleted方法的数据才会被发射出来,其他的都过滤掉了。


    二、Distinct
    Distinct操作符的用处就是用来去重,非常好理解。如下图所示,所有重复的数据都会被过滤掉。还有一个操作符distinctUntilChanged,是用来过滤掉连续的重复数据。

    创建两个Observable,并使用Distinct和DistinctUtilChanged操作符分别对其进行过滤

点击(此处)折叠或打开

  1. private Observable<Integer> distinctObserver() {
  2.         return Observable.just(1, 2, 3, 4, 5, 4, 3, 2, 1).distinct();

  3.     }

  4.     private Observable<Integer> distinctUntilChangedObserver() {
  5.         return Observable.just(1, 2, 3, 3, 3, 1, 2, 3, 3).distinctUntilChanged();

  6.     }
    进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("distinct");
  2. mLButton.setOnClickListener(e -> distinctObserver().subscribe(i -> log("distinct:" + i)));
  3. mRButton.setText("UntilChanged");
  4. mRButton.setOnClickListener(e -> distinctUntilChangedObserver().subscribe(i -> log("UntilChanged:" + i)));
    运行结果如下所示,可以看到Distinct过滤掉了所有重复的数字,二DistinctUtilChanged只过滤掉重复的数字


    三、ElementAt、Filter
    这两个操作符都很好理解,ElementAt只会返回指定位置的数据,而Filter只会返回满足过滤条件的数据,其示意图分别如下所示

    创建两个Observable对象并分别使用ElementAt和Filter操作符对其进行过滤

点击(此处)折叠或打开

  1. private Observable<Integer> elementAtObserver() {
  2.         return Observable.just(0, 1, 2, 3, 4, 5).elementAt(2);
  3.     }

  4.     private Observable<Integer> FilterObserver() {
  5.         return Observable.just(0, 1, 2, 3, 4, 5).filter(i -> i < 3);
  6.     }
    分别对其进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("elementAt");
  2. mLButton.setOnClickListener(e -> elementAtObserver().subscribe(i -> log("elementAt:" + i)));
  3. mRButton.setText("Filter");
  4. mRButton.setOnClickListener(e -> FilterObserver().subscribe(i -> log("Filter:" + i)));
    运行结果如下


    四、First、Last
    First操作符只会返回第一条数据,并且还可以返回满足条件的第一条数据。如果你看过我以前的博客,就会发现在我们使用Rxjava实现三级缓存的例子里,就是使用first操作符来选择所要使用的缓存。与First相反,Last操作符只返回最后一条满足条件的数据。

    另外还有一个BlockingObservable方法,这个方法不会对Observable做任何处理,只会阻塞住,当满足条件的数据发射出来的时候才会返回一个BlockingObservable对象。可以使用Observable.toBlocking或者BlockingObservable.from方法来将一个Observable对象转化为BlockingObservable对象。BlockingObservable可以和first操作符进行配合使用。

    创建两个Observable对象并分别使用first操作符进行处理

点击(此处)折叠或打开

  1. private Observable<Integer> FirstObserver() {
  2.         return Observable.just(0, 1, 2, 3, 4, 5).first(i -> i > 1);
  3.     }

  4.     private BlockingObservable<Integer> FilterObserver() {
  5.         return Observable.create(new Observable.OnSubscribe<Integer>() {
  6.             @Override
  7.             public void call(Subscriber<? super Integer> subscriber) {
  8.                 for (int i = 0; i < 5; i++) {
  9.                     try {
  10.                         Thread.sleep(500);
  11.                     } catch (InterruptedException e) {
  12.                         e.printStackTrace();
  13.                     }
  14.                     if (!subscriber.isUnsubscribed()) {
  15.                         log("onNext:" + i);
  16.                         subscriber.onNext(i);
  17.                     }
  18.                 }
  19.                 if (!subscriber.isUnsubscribed()) {
  20.                     subscriber.onCompleted();
  21.                 }
  22.             }
  23.         }).toBlocking();
  24.     }
    分别进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("First");
  2.         mLButton.setOnClickListener(e -> FirstObserver().subscribe(i -> log("First:" + i)));
  3.         mRButton.setText(" Blocking");
  4.         mRButton.setOnClickListener(e -> {
  5.             log("blocking:" + FilterObserver().first(i -> i > 1));
  6.         });
    运行结果如下。可以看到first操作符返回了第一个大于1的数2,而BlockingObservable则一直阻塞着,直到第一个大于1的数据发射出来。

    
    五、Skip、Take
    Skip操作符将源Observable发射的数据过滤掉前n项,而Take操作符则只取前n项,理解和使用起来都很容易,但是用处很大。另外还有SkipLast和TakeLast操作符,分别是从后面进行过滤操作。

    创建两个Observable并分别使用skip和take操作符对其进行过滤操作

点击(此处)折叠或打开

  1. private Observable<Integer> skipObserver() {
  2.         return Observable.just(0, 1, 2, 3, 4, 5).skip(2);
  3.     }
  4.     private Observable<Integer> takeObserver() {
  5.         return Observable.just(0, 1, 2, 3, 4, 5).take(2);
  6.     }
    分别进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("Skip");
  2. mLButton.setOnClickListener(e -> skipObserver().subscribe(i -> log("Skip:" + i)));
  3. mRButton.setText("Take");
  4. mRButton.setOnClickListener(e -> takeObserver().subscribe(i -> log("Take:" + i)));
    运行结果如下,可以看到skip过滤掉了前两项,而take则过滤掉了除了前两项的其他所有项。


    六、Sample、ThrottleFirst
    Sample操作符会定时地发射源Observable最近发射的数据,其他的都会被过滤掉,等效于ThrottleLast操作符,而ThrottleFirst操作符则会定期发射这个时间段里源Observable发射的第一个数据

    我们创建一个Observable每隔200毫秒发射一个数据,然后分别使用sample和throttleFirst操作符对其进行过滤

点击(此处)折叠或打开

  1. private Observable<Integer> sampleObserver() {
  2.         return createObserver().sample(1000, TimeUnit.MILLISECONDS);
  3.     }

  4.     private Observable<Integer> throttleFirstObserver() {
  5.         return createObserver().throttleFirst(1000, TimeUnit.MILLISECONDS);
  6.     }


  7.     private Observable<Integer> createObserver() {
  8.         return Observable.create(new Observable.OnSubscribe<Integer>() {
  9.             @Override
  10.             public void call(Subscriber<? super Integer> subscriber) {
  11.                 for (int i = 0; i < 20; i++) {
  12.                     try {
  13.                         Thread.sleep(200);
  14.                     } catch (InterruptedException e) {
  15.                         e.printStackTrace();
  16.                     }
  17.                     subscriber.onNext(i);
  18.                 }
  19.                 subscriber.onCompleted();
  20.             }
  21.         });
  22.     }
    分别进行订阅

点击(此处)折叠或打开

  1. mLButton.setText("sample");
  2. mLButton.setOnClickListener(e -> sampleObserver().subscribe(i -> log("sample:" + i)));
  3. mRButton.setText("throttleFirst");
  4. mRButton.setOnClickListener(e -> throttleFirstObserver().subscribe(i -> log("throttleFirst:" + i)));
    运行结果如下,可以看到sample操作符会每隔5个数字发射出一个数据来,而throttleFirst则会每隔5个数据发射第一个数据。


    本文的demo程序见github:  />
阅读(13747) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~