Rxjava使用讲解

使用Rxjava进行输入框搜索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Observable.just(keyword)
//EditText内容输入后,500毫秒后才发送事件
.debounce(500, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
//过滤掉空字符串
.filter(text -> !TextUtils.isEmpty(text))
//保证发送的数据是最新的
.switchMap((Function<String, Observable<CommonResp<List<SchoolPojo>>>>) text -> query(text.trim()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.compose(bindToLifecycle())
.subscribe(new BaseCommonObserver<CommonResp<List<SchoolPojo>>>() {
@Override
protected void onError(String error) {
ToastUtils.showShortToast("网络异常");
relatedSearchAdapter.setNewData(Collections.EMPTY_LIST);
showRelatedSearchRv();
}

@Override
protected void onSuccess(CommonResp<List<SchoolPojo>> data) {
if (CollectionUtils.isListEmpty(data.getData())) {
relatedSearchAdapter.setNewData(Collections.EMPTY_LIST);
} else {
relatedSearchAdapter.setNewData(data.getData());
}
showRelatedSearchRv();
}
});

Single/Maybe/Completable

Single:只发送一个事件:onSuccess(T t)或者onError(Throwable e),适合网络请求,

Completable:不关心数据,只关心结果,只有onComplete()和onError(Throwable e)方法,通常会配合andThen一起使用

Maybe: 发送0个或1个事件,onSuccess(T t)/onError(Throwable e)/onComplete()

combineLatest

combineLatest()和zip()都是对observableA和observableB按照Func2中制定的规则进行组合,二者最大的不同在于,zip()的组合顺序是observableA和observableB中的元素有一一对应的关系,相同位置的元素按照Func2中制定的规则进行组合,combineLatest()就没有这种所谓的一一对应的关系,而是observableA或者observableB发射一个元素时,这个元素会向前去寻找另一个observable发射出来的元素,直到寻找到一个为止,然后再按照Func2中制定的规则进行组合

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable<List<Entry>> purpleFeedObservable =
FeedObservable.getFeed("https://news.google.com/?output=atom");
Observable<List<Entry>> yellowFeedObservable =
FeedObservable.getFeed("http://www.theregister.co.uk/software/headlines.atom");

Observable<List<Entry>> combinedObservable = Observable.combineLatest(purpleFeedObservable, yellowFeedObservable,(purpleList, yellowList) -> {
final List<Entry> list = new ArrayList<>();
list.addAll(purpleList);
list.addAll(yellowList);
return list;
}
);

combinedObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(this::drawList);

zip

zip(observableA, observableB, Func2)用来合并两个Observable对象发射的数据项并合成一个新Observable对象,根据Func2函数生成一个新的值并发射出去,在这里Func2就相当于observableA和observableB的合并规则,当其中一个Observable对象发送数据结束或者出现异常后,另一个Observable对象也将停止发射数据。

img

Merge

merge(Observable, Observable)将两个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。你可以简单的将它理解为两个Observable合并成了一个Observable,合并后的数据是无序的。

img

startWith

startWith(T)用于在源Observable发射的数据前插入数据。使用startWith(Iterator)我们还可以在源Observable发射的数据前插入Iterator。

img

Concat

concat(Observable<? extends T>, Observable<? extends T>)和concat(Observable<? extends Observable>)用于将多个Observable发射的的数据进行合并发射,concat严格按照顺序发射数据,前一个Observable没发射玩是不会发射后一个Observable的数据的。它和merge、startWitch和相似,不同之处在于merge合并后发射的数据是无序的,startWitch只能在源Observable发射的数据前插入数据,而concat是在另一个Observable上进行合并并且合并的发射数据是有序的。

img

onErrorReturn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Observable.just(1)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return null;
}
}).onErrorReturn(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) throws Exception {
System.out.println("onErrorReturn--->" + throwable.getMessage());
//如果正常返回,那么不会走onError逻辑,只会走onNext逻辑
return "";
//如果返回null或者抛出异常,那么会走onError逻辑,不会走onNext逻辑
//throw new Exception("11");
//return null;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {
System.out.println("onNext--->" + s);
}

@Override
public void onError(Throwable e) {
System.out.println("onError--->" + e.getMessage());
}

@Override
public void onComplete() {

}
});

flatMap

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Flowable.fromIterable(MockData.getAllStudentInfoById(0))
.flatMap(new Function<Student, Publisher<Source>>() {
@Override
public Publisher<Source> apply(@NonNull Student student) throws Exception {
return Flowable.fromIterable(student.mSources);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Source>() {
@Override
public void accept(@NonNull Source source) throws Exception {
String content = "sourceName:"+source.name +" source score:"+source.score;
mTextView.setText(content);
Log.i(TAG,content);

}
});

img

Compose

使用一个Transformer将一种类型的Observable转换为另一种类型Observable.通过compose我们可以实现一系列操作符的复用,并且还可以保证链式调用不被打断

1
2
observable.subscribeOn(Schedulers.io).observerOn(AndroidSchedulers.mainThread())

Author

jiangyao

Posted on

2021-06-12

Updated on

2022-10-12

Licensed under