使用了 RxJava2 有一段時(shí)間了,深深感受到了其“牛逼”之處。下面,就從 RxJava2 的基礎(chǔ)開始,一步步與大家分享一下這個(gè)強(qiáng)大的異步庫的用法!RxJava 是 一個(gè)在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫,也就是用于實(shí)現(xiàn)異步操作的庫。
RxJava可以濃縮為異步兩個(gè)字,其核心的東西不外乎兩個(gè), Observables(被觀察者) 和 Observable(觀察者)。Observables可以發(fā)出一系列的 事件(例如網(wǎng)絡(luò)請求、復(fù)雜計(jì)算、數(shù)據(jù)庫操作、文件讀取等),事件執(zhí)行結(jié)束后交給Observable 的回調(diào)處理。
觀察者模式是對(duì)象的行為模式,也叫做發(fā)布-訂閱(Publish/Subscribe)模式、模型-視圖(Model/View)模式、源-監(jiān)聽器(Source/Listener)模式或從屬者(Dependents)模式。
什么是觀察者模式?舉個(gè)栗子,Android中View的點(diǎn)擊監(jiān)聽器的實(shí)現(xiàn),View是被觀察者,OnClickListener對(duì)象是觀察者,Activity要如何知道View被點(diǎn)擊了?那就是派一個(gè)OnClickListener對(duì)象,入駐View,與View達(dá)成一個(gè)訂閱關(guān)系,一旦View被點(diǎn)擊了,就通過OnClickListener對(duì)象的OnClick方法傳達(dá)給Activity。采用觀察者模式可以避免去輪詢檢查,節(jié)約有限的cpu資源。
RxJava 作為一個(gè)工具庫,使用的便是通用形式的觀察者模式:
普通事件:onNext(),相當(dāng)于 onClick()、onEvent();特殊事件:onCompleted() 和 onError()
如圖所示,RxJava 的基本概念分別為:Observable(被觀察者,事件源),Observer(觀察者,訂閱者),subscribe (訂閱)、事件;不同的是,RxJava 把多個(gè)事件看做一個(gè)隊(duì)列,并對(duì)每個(gè)事件單獨(dú)處理。在一個(gè)隊(duì)列中 onCompleted() 和 onError(),只有一個(gè)會(huì)被調(diào)用。如果調(diào)用了 onCompleted() 就說明隊(duì)列執(zhí)行完畢,沒有出現(xiàn)異常,否則調(diào)用 onError() 方法并終止隊(duì)列。
什么是響應(yīng)式編程?舉個(gè)栗子,a = b + c; 這句代碼將b+c的值賦給a,而之后如果b和c的值改變了不會(huì)影響到a,然而,對(duì)于響應(yīng)式編程,之后b和c的值的改變也動(dòng)態(tài)影響著a,意味著a會(huì)隨著b和c的變化而變化。
響應(yīng)式編程的組成為Observable/Operator/Subscriber,RxJava在響應(yīng)式編程中的基本流程如下:
這個(gè)流程,可以簡單的理解為:Observable -> Operator1 -> Operator2 -> Operator3 -> Subscriber
Observable發(fā)出一系列事件,他是事件的產(chǎn)生者;
Subscriber負(fù)責(zé)處理事件,他是事件的消費(fèi)者;
Operator是對(duì)Observable發(fā)出的事件進(jìn)行修改和變換;
若事件從產(chǎn)生到消費(fèi)不需要其他處理,則可以省略掉中間的Operator,從而流程變?yōu)镺bsevable -> Subscriber;
Subscriber通常在主線程執(zhí)行,所以原則上不要去處理太多的事務(wù),而這些復(fù)雜的處理則交給Operator;
首先需要添加 RxJava2 在 Android 中的 Gradle 依賴:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.8'
RxJava2 可以通過下面這幾種方法創(chuàng)建被觀察者:
// 發(fā)送對(duì)應(yīng)的方法
Observable.create(new ObservableOnSubscribeString>() {
// 默認(rèn)在主線程里執(zhí)行該方法
@Override
public void subscribe(@NonNull ObservableEmitterString> e) throws Exception {
e.onNext('Hello');
e.onNext('World');
// 結(jié)束標(biāo)識(shí)
e.onComplete();
}
});
// 發(fā)送多個(gè)數(shù)據(jù)
Observable.just('Hello', 'World');
// 發(fā)送數(shù)組
Observable.fromArray('Hello', 'World');
// 發(fā)送一個(gè)數(shù)據(jù)
Observable.fromCallable(new CallableString>() {
@Override
public String call() throws Exception {
return 'Hello';
}
});
RxJava2 支持鏈?zhǔn)骄幊?,下來我們?chuàng)建被觀察者,然后創(chuàng)建觀察者并訂閱:
// 創(chuàng)建被觀察者
Observable.just('Hello', 'World')
// 將被觀察者切換到子線程
.subscribeOn(Schedulers.io())
// 將觀察者切換到主線程
.observeOn(AndroidSchedulers.mainThread())
// 創(chuàng)建觀察者并訂閱
.subscribe(new ObserverString>() {
// Disposable 相當(dāng)于RxJava1.x中的 Subscription,用于解除訂閱
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(String s) {
Log.i('JAVA', '被觀察者向觀察者發(fā)送的數(shù)據(jù):' + s);
if (s == '-1') { // '-1' 時(shí)為異常數(shù)據(jù),解除訂閱
disposable.dispose();
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
一旦 Observer 訂閱了 Observable,Observable 就會(huì)調(diào)用 Observer 的 onNext()、onCompleted()、onError() 等方法。至此一個(gè)完整的 RxJava 調(diào)用就完成了??匆幌螺敵龅腖og:
I/JAVA: 被觀察者向觀察者發(fā)送的數(shù)據(jù):Hello
I/JAVA: 被觀察者向觀察者發(fā)送的數(shù)據(jù):World
若喜歡簡潔、定制服務(wù),那么可以實(shí)現(xiàn)的方法跟上面的實(shí)現(xiàn)方法是對(duì)應(yīng)起來的,大家看參數(shù)就知道哪個(gè)對(duì)應(yīng)哪個(gè)了,你可以通過new Consumer(不需要實(shí)現(xiàn)的方法你可以不寫,看上去更簡潔),Consumer就是消費(fèi)者的意思,可以理解為消費(fèi)了 onNext 等事件:
Observable.just('Hello', 'World')
.subscribe(new ConsumerString>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i('JAVA', '被觀察者向觀察者發(fā)送的數(shù)據(jù):' + s);
}
}, new Consumer() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
}
}, new Consumer() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
}
});
RxJava中提供了大量不同種類,不同場景的Operators(操作符),RxJava的強(qiáng)大性就來自于它所定義的操作符。主要分類:
調(diào)度器 Scheduler 用于控制操作符和被觀察者事件所執(zhí)行的線程,不同的調(diào)度器對(duì)應(yīng)不同的線程。RxJava提供了5種調(diào)度器:
可以使用 subscribeOn() 和 ObserveOn() 操作符進(jìn)行線程調(diào)度,讓 Observable 在一個(gè)特定的調(diào)度器上執(zhí)行。subscribeOn() 指定 subscribe() 所發(fā)生的線程,事件產(chǎn)生的線程。ObserveOn() 指定 Observer 所運(yùn)行在的線程,事件消費(fèi)的線程。
public void onCodeClick() {
final long count = 60; // 設(shè)置60秒
Observable.interval(0, 1, TimeUnit.SECONDS)
.take(count + 1)
.map(new Function() {
@Override
public Long apply(@NonNull Long aLong) throws Exception {
return count - aLong; // 由于是倒計(jì)時(shí),需要將倒計(jì)時(shí)的數(shù)字反過來
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(new Consumer() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
button.setEnabled(false);
button.setTextColor(Color.GRAY);
}
})
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
button.setText(aLong + '秒后重發(fā)');
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
button.setEnabled(true);
button.setTextColor(Color.RED);
button.setText('發(fā)送驗(yàn)證碼');
}
});
}
RxJava 與 Retrofit 的使用,更像我們的 AsyncTask,通過網(wǎng)絡(luò)獲取數(shù)據(jù)然后通過 Handler 更新UI。首先需要導(dǎo)入依賴:
compile 'com.squareup.retrofit2:retrofit:2.2.0'
compile 'com.squareup.retrofit2:converter-gson:2.2.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.2.0'
1.Bean對(duì)象:
public class UserParam {
private String param1;
private String param2;
public UserParam(String param1, String param2) {
this.param1 = param1;
this.param2 = param2;
}
// 省略了 getter setter
}
public class NetBean {
private FormBean form;
// 省略了 getter setter
public static class FormBean {
private String username;
private String password;
// 省略了 getter setter
}
}
public class UserBean {
private String username;
private String password;
public UserBean(String username, String password) {
this.username = username;
this.password = password;
}
// 省略了 getter setter
}
2.ApiService,這里返回Observable對(duì)象,也就是我們RxJava的被觀察者
public interface ApiService {
@FormUrlEncoded
@POST('/post')
Observable getUserInfo(@Field('username')String username,
@Field('password')String password);
}
3.RxJava + Retrofit 的實(shí)現(xiàn)
// 構(gòu)建Retrofit
ApiService apiService = new Retrofit.Builder()
.baseUrl('http://httpbin.org/')
.addConverterFactory(GsonConverterFactory.create()) // RxJava2與Gson混用
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // RxJava2與Retrofit混用
.build()
.create(ApiService.class);
// 構(gòu)建RxJava
UserParam param = new UserParam('zhangsan', '123');
// 發(fā)送param參數(shù)
Observable.just(param)
// flatMap方法是用于數(shù)據(jù)格式轉(zhuǎn)換的方法,參數(shù)一表示原數(shù)據(jù),
// 參數(shù)二表示轉(zhuǎn)換的數(shù)據(jù),那么就是通過發(fā)送網(wǎng)絡(luò)參數(shù),轉(zhuǎn)換成網(wǎng)絡(luò)返回的數(shù)據(jù),調(diào)用Retrofit
.flatMap(new Function>() {
@Override
public ObservableSource apply(@NonNull UserParam userParam)
throws Exception {
// 1.發(fā)送網(wǎng)絡(luò)請求,獲取NetBean
return apiService.getUserInfo(userParam.getParam1(), userParam.getParam2());
}
})
.flatMap(new Function>() {
@Override
public ObservableSource apply(@NonNull NetBean netBean)
throws Exception {
UserBean user = new UserBean(netBean.getForm().getUsername(),
netBean.getForm().getPassword());
// 2.轉(zhuǎn)換NetBean數(shù)據(jù)為我們需要的UserBean數(shù)據(jù)
return Observable.just(user);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(@NonNull UserBean userBean) throws Exception {
Log.i('JAVA', '' + '用戶名:' + userBean.getUsername()
+ ', 密碼:' + userBean.getPassword());
}
});
這個(gè)案例其實(shí)就是用戶添加購物車的時(shí)候,首先會(huì)在本地存儲(chǔ)一份,然后發(fā)現(xiàn)如果沒有網(wǎng)絡(luò),那么沒辦法提交到服務(wù)器上,只能等下一次有網(wǎng)絡(luò)的時(shí)候采用本地?cái)?shù)據(jù)庫和服務(wù)器數(shù)據(jù)的合并來實(shí)現(xiàn)上傳到服務(wù)器。
首先需要準(zhǔn)備 Retrofit 對(duì)象和獲取本地?cái)?shù)據(jù)、網(wǎng)絡(luò)數(shù)據(jù)的方法:
private ApiService apiService;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
// 省略
// 構(gòu)建Retrofit
apiService = new Retrofit.Builder()
.baseUrl('http://httpbin.org/')
.addConverterFactory(GsonConverterFactory.create()) // RxJava2與Gson混用
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // RxJava2與Retrofit混用
.build()
.create(ApiService.class);
}
/**
* 獲取本地?cái)?shù)據(jù)
*/
private Observable<>String>> getDataForLocal() {
ListString> list = new ArrayList<>();
list.add('購物車的商品1');
list.add('購物車的商品2');
return Observable.just(list);}
/**
* 獲取網(wǎng)絡(luò)數(shù)據(jù)
*/
private Observable<>String>> getDataForNet() {
return Observable.just('shopName')
// flatMap方法是用于數(shù)據(jù)格式轉(zhuǎn)換的方法,參數(shù)一表示原數(shù)據(jù),
// 參數(shù)二表示轉(zhuǎn)換的數(shù)據(jù),那么就是通過發(fā)送網(wǎng)絡(luò)參數(shù),轉(zhuǎn)換成網(wǎng)絡(luò)返回的數(shù)據(jù),調(diào)用Retrofit
.flatMap(new FunctionString, ObservableSource>() {
@Override
public ObservableSource apply(@NonNull String s) throws Exception {
// 1.發(fā)送網(wǎng)絡(luò)請求,獲取數(shù)據(jù)
return apiService.getCartList(s);
}
}).flatMap(new Function <>String>>>() {
@Override
public ObservableSource<>String>> apply(@NonNull NetBean netBean) throws Exception {
// String shop = netBean.get_$Args257().getShopName();
String shop = '購物車的商品3';
ListString> list = new ArrayList<>();
list.add(shop);
// 2.轉(zhuǎn)換NetBean數(shù)據(jù)為我們需要的List數(shù)據(jù)
return Observable.just(list);
}
}).subscribeOn(Schedulers.io());
}
然后我們就可以創(chuàng)建被觀察者并訂閱了,來完成合并本地與服務(wù)器購物車列表操作:
// merge操作符: 將兩個(gè)ObservableSource合并為一個(gè)ObservableSource
Observable.merge(getDataForLocal(), getDataForNet())
.subscribe(new Observer<>String>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(ListString> strings) {
for (String str: strings) { Log.i('JAVA', str); }
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.i('JAVA', 'onComplete');
}
});
最后的打印結(jié)果是:
I/JAVA: 購物車的商品1
I/JAVA: 購物車的商品2
I/JAVA: 購物車的商品3
I/JAVA: onComplete
當(dāng)我們在 EditText 打字時(shí)搜索的時(shí)候,可能用戶會(huì)打字很會(huì)快,那么我們就沒有必要一直發(fā)送網(wǎng)絡(luò)請求,請求搜索結(jié)果,我們可以通過當(dāng)用戶打字停止后的延時(shí)500毫秒再發(fā)送搜索請求:
// RxTextView.textChanges(edittext): Rxbinding用法
RxTextView.textChanges(editText)
// 表示延時(shí)多少秒后執(zhí)行,當(dāng)你敲完字之后停下來的半秒就會(huì)執(zhí)行下面語句
.debounce(500, TimeUnit.MILLISECONDS)
// 數(shù)據(jù)轉(zhuǎn)換 flatMap: 當(dāng)同時(shí)多個(gè)數(shù)據(jù)請求訪問的時(shí)候,前面的網(wǎng)絡(luò)數(shù)據(jù)會(huì)覆蓋后面的網(wǎng)絡(luò)數(shù)據(jù)
// 數(shù)據(jù)轉(zhuǎn)換 switchMap: 當(dāng)同時(shí)多個(gè)網(wǎng)絡(luò)請求訪問的時(shí)候,會(huì)以最后一個(gè)發(fā)送請求為準(zhǔn),前面網(wǎng)絡(luò)數(shù)據(jù)會(huì)被最后一個(gè)覆蓋
.switchMap(new Function <>String>>>() {
@Override
public ObservableSource<>String>> apply(
@NonNull CharSequence charSequence) throws Exception {
// 網(wǎng)絡(luò)請求操作,獲取我們需要的數(shù)據(jù)
ListString> list = new ArrayListString>();
list.add('2017');
list.add('2018');
return Observable.just(list);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<>String>>() {
@Override
public void accept(@NonNull ListString> strings) throws Exception {
// 更新UI
Log.i('JAVA', strings.toString());
}
});
當(dāng)用戶一直點(diǎn)擊一個(gè)按鈕的時(shí)候,我們不應(yīng)該一直調(diào)用訪問網(wǎng)絡(luò)請求,而是 1秒內(nèi),只執(zhí)行一次網(wǎng)絡(luò)請求。
RxView.clicks(button).throttleFirst(1, TimeUnit.SECONDS)
.subscribe(new ObserverObject>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.i('JAVA', 'onClick');
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
舉個(gè)例子,對(duì)于前面常用操作符 interval 做周期性操作的例子,并沒有使之停下來的,沒有去控制訂閱的生命周期,這樣,就有可能引發(fā)內(nèi)存泄漏。所以,在 Activity 的 onDestroy() 方法執(zhí)行的時(shí)候或者不需要繼續(xù)執(zhí)行的時(shí)候應(yīng)該解除訂閱。
聯(lián)系客服