又一年要過(guò)去了,回顧2017,rxjs始終是我在項(xiàng)目里使用最頻繁的庫(kù),在我看來(lái),它是一個(gè)非常優(yōu)秀的數(shù)據(jù)處理工具。年初的時(shí)候就計(jì)劃寫點(diǎn)什么,礙于目前公司的項(xiàng)目實(shí)在抽不出時(shí)間,這一拖就到了年底。
臨近新年,總算忙里偷閑,但又不知道從何寫起,于是乎偷了個(gè)懶拿起了官方文檔開(kāi)始翻譯。以下的文字僅供各位參考,還是強(qiáng)烈建議去看官方文檔。
rxjs是一個(gè)使用觀察者模式來(lái)整合異步操作和事件系統(tǒng)的js庫(kù),通過(guò)一系列可觀測(cè)的流(observable)將它們串聯(lián)起來(lái)。Observable是這個(gè)庫(kù)的核心類型,此外還包括諸如Observer,Schedulers,Subjects等類型。還包括一些和數(shù)組方法類似或經(jīng)過(guò)演化的操作符,用來(lái)協(xié)助處理數(shù)據(jù)。
響應(yīng)式設(shè)計(jì)結(jié)合了觀察者模式,迭代模式和基于集合的函數(shù)式編程風(fēng)格,從而提供了一種處理事件序列的理想方式。
在rxjs中用來(lái)處理異步事件的核心概念包括:
在javascript中通常使用以下方式來(lái)注冊(cè)事件:
var button = document.querySelector('button');button.addEventListener('click', () => console.log('Clicked'));
使用rxjs的方式實(shí)現(xiàn)如下:
var button = document.querySelector('button');Rx.Observable.fromEvent(button, 'click') .subscribe(() => console.log('Clicked'));
使用不純的函數(shù)時(shí),函數(shù)可能會(huì)改變外部數(shù)據(jù)的狀態(tài),比如:
var count = 0;var button = document.querySelector('button');button.addEventListener('click', () => console.log(`Clicked ${++count} times`));
當(dāng)使用rxjs時(shí),必須把這些狀態(tài)隔離出去,比如:
var button = document.querySelect('button');Rx.Observable.fromEvent(button, 'click') .scan(count => count + 1, 0) .subscribe(count => console.log(`Clicked ${count} times`));
這里的scan操作符的行為和數(shù)組的reduce方法的行為非常類似,它提供一個(gè)初始值給迭代函數(shù),迭代函數(shù)運(yùn)行后的值將作為下一次迭代的初始值。
rxjs提供了一套非常豐富的操作符來(lái)幫助用戶控制數(shù)據(jù)或事件如何在observable中進(jìn)行流動(dòng)。假如我們需要控制用戶在一個(gè)按鈕上每秒最多只能點(diǎn)擊一次
使用普通的javascript代碼:
var count = 0;var rate = 1000;var lastClick = Date.now() - rate;var button = document.querySelector('button');button.addEventListener('click', () => { if(Date.now() - lastClick >= rate) { console.log(`Clicked ${++count} timers`); lastClick = Date.now(); }});
使用rxjs:
var button = button.querySelector('button');Rx.Observable.fromEvent(button, 'click') .throttleTime(1000) .scan(count => count + 1, 0) .subscribe(count => console.log(`Click ${count} times`));
諸如此類對(duì)流進(jìn)行控制的操作符還有:filter, delay, debounceTime, take, takeUntil, distinct, distinctUntilChanged等等。
你可以對(duì)流中的數(shù)據(jù)進(jìn)行轉(zhuǎn)換,當(dāng)我們需要得到每一次鼠標(biāo)點(diǎn)擊時(shí)的x軸坐標(biāo)時(shí),
使用普通的javascript代碼:
var count = 0;var rate = 1000;var lastClick = Date.now() - rate;var button = document.querySelector('button');button.addEventListener('click', (event) => { if(Date.now() - lastClick >= rate) { count += event.clientX; console.log(count); lastClick = Date.now(); }});
使用rxjs:
var button = button.querySelector('button');Rx.Observable.fromEvent(button, 'click') .throttleTime(1000) .map(event => event.ClientX) .scan((count,clientX) => count + clientX, 0) .subscribe(count => console.log(count));
諸如此類可以產(chǎn)生新值的操作符還有:pluck,pairwise,sample等等。
可觀測(cè)序列可以推送多個(gè)值,并且它的推送方式是‘懶’的,它滿足下面表格中列出的條件
| single | multiple |
---|---|---|
pull | function | iterator |
push | promise | observable |
下面這個(gè)例子中的observable在被訂閱時(shí)可以立即推送出1,2,3這個(gè)三個(gè)值,1秒以后推送第四的值4,然后立即結(jié)束。
var observable = Rx.Observable.create(function(observer) { observer.next(1); observer.next(2); observer.next(3); setTimeout(() => { observer.next(4); observer.complete(); },1000);});
為了獲取到這個(gè)observable上的值, 我們需要對(duì)它進(jìn)行訂閱:
var observable = Rx.Observable.create(function(observer) { observer.next(1); observer.next(2); observer.next(3); setTimeout(() => { observer.next(4); observer.complete(); },1000);});console.log('just before subscribe');observable.subscribe({ next: x => console.log('got value ' + x), error: err => console.error('something wrong occurred: ' + err), complete: () => console.log('done'),});console.log('just after subscribe);
這段代碼執(zhí)行后將會(huì)得到如下結(jié)果:
just before subscribegot value 1got value 2got value 3just after subscribegot value 4done
這里使用 pull 和 push 來(lái)描述值的生產(chǎn)者和消費(fèi)者之間是如何發(fā)生聯(lián)系的,它們是兩種完全不同的協(xié)議。
在pull的系統(tǒng)中,值的消費(fèi)決定什么時(shí)間從生產(chǎn)者上獲取數(shù)據(jù),生產(chǎn)者本身并不關(guān)心數(shù)據(jù)什么時(shí)間分發(fā)給消費(fèi)者。
每一個(gè)javascript函數(shù)都可以看作一個(gè) pull 類型的系統(tǒng)。函數(shù)可以產(chǎn)生值,但這是通過(guò)調(diào)用者主動(dòng)調(diào)用函數(shù),函數(shù)運(yùn)行產(chǎn)生出值返回給調(diào)用者的方式進(jìn)行的,所以可以理解為調(diào)用者主動(dòng)去函數(shù)上拉取了值。
ES2015中介紹另外兩種 pull 類型的系統(tǒng),generator函數(shù) 和 iterator。對(duì)于它們來(lái)講,遍歷器對(duì)象的 next 方法可以視作值的消費(fèi)者,通過(guò)iterator.next()可以獲取到多個(gè)值。
| Producer | Consumer |
---|---|---|
pull | 被動(dòng):當(dāng)被調(diào)用時(shí)產(chǎn)生值 | 主動(dòng):決定何時(shí)發(fā)起調(diào)用 |
push | 主動(dòng):按自身的設(shè)置產(chǎn)生值 | 被動(dòng):響應(yīng)接收到的值 |
在push的系統(tǒng)中,生產(chǎn)者決定什么時(shí)候發(fā)送值給消費(fèi)者,消費(fèi)者并知道什么時(shí)候可以接收到值。
ES6中的 promise 就是一個(gè)非常典型的 push 系統(tǒng),promise 將 resolve 的結(jié)果傳遞給注冊(cè)在它內(nèi)部的回調(diào)函數(shù),這與普通的函數(shù)有很大的不同,回調(diào)函數(shù)何時(shí)可以接收到數(shù)據(jù)完全取決于 promise 什么時(shí)間向它傳遞數(shù)據(jù)。
rxjs的 Observable 也是一種 push 類型的系統(tǒng),一個(gè) Observable 可以產(chǎn)生多個(gè)值,然后推送給它的訂閱者。
很多人認(rèn)為Observable就是一個(gè)事件發(fā)射器或可以產(chǎn)生多個(gè)值的promise,實(shí)際上這種觀點(diǎn)是不正確的。在某些情況下Observable的行為可能與事件發(fā)射器的行為類似,例如使用Subject來(lái)發(fā)射值時(shí),但通常情況下它們的行為與事件發(fā)射器有很大的不同。
請(qǐng)思考以下代碼:
function foo() { console.log('hello'); return 42;}var x = foo.call();console.log(x);var y = foo.call();console.log(y);
運(yùn)行后的輸出:
"hello"42"hello"42
使用rxjs的方式:
var foo = Rx.Observable.create(function(observer) { console.log('hello'); observable.next(42);});foo.subscribe(function (x) { console.log(x);});foo.subscribe(function (y) { console.log(y);});
運(yùn)行后的輸出依然是:
"hello"42"hello"42
首先,函數(shù)和Observable的求值過(guò)程都是‘懶’的,如果你不調(diào)用foo函數(shù),console.log('hello')將不會(huì)執(zhí)行,這對(duì)于Observable也是一樣的,只要不訂閱,console.log('hello')同樣也不會(huì)執(zhí)行。另外,每一次的訂閱(subscribe)和調(diào)用(call)都是互不干擾的,兩次函數(shù)調(diào)用會(huì)觸發(fā)兩次執(zhí)行過(guò)程,產(chǎn)生兩次副作用,同樣,兩次訂閱也會(huì)觸發(fā)兩次訂閱過(guò)程。對(duì)于事件發(fā)射器來(lái)說(shuō)卻不同,它產(chǎn)生的副作用會(huì)傳遞給每一個(gè)事件接收者,它并不理會(huì)有沒(méi)有接收者接收事件,都會(huì)把事件發(fā)射出去,這與Observable有很大的不同。
你可能會(huì)認(rèn)為Observable都是異步的,事實(shí)上并不是這樣,我們看一下函數(shù)的調(diào)用過(guò)程:
console.log('before');console.log('foo.call()');console.log('after');
執(zhí)行后的輸出:
"before""hello"42"after"
然后以同樣的順序,但是使用Observable:
console.log('before');foo.subscribe(function(x) { console.log(x);});console.log('after');
你會(huì)發(fā)現(xiàn)結(jié)果是相同的:
"before""hello"42"after"
這個(gè)結(jié)果證明了foo這個(gè)Observable完全是同步執(zhí)行的,和調(diào)用一個(gè)函數(shù)沒(méi)有什么兩樣。
那么Observable與函數(shù)之間有什么樣的區(qū)別?Observable被訂閱后可以返回多個(gè)值,這是普通的函數(shù)所無(wú)法做到的。例如,你無(wú)法像下面這樣去實(shí)現(xiàn)一個(gè)函數(shù):
function foo() { console.log('hello'); return 42; return 100; // 這句永遠(yuǎn)不會(huì)執(zhí)行}
因?yàn)楹瘮?shù)只能返回一個(gè)值,return 100; 會(huì)被解釋器忽略,如果是在typescript中,編譯器會(huì)告訴你這里有一個(gè)錯(cuò)誤。然而這對(duì)于Observable來(lái)說(shuō)卻不是問(wèn)題:
var foo = Rx.Observable.create(function (observer) { console.log('hello'); observer.next(42); observer.next(100); // '返回‘另外一個(gè)值 observer.next(200); // 同上});console.log('before');foo.subscribe(function(x) { console.log(x);});console.log('after');
上面的代碼執(zhí)行后會(huì)以同步的方式輸出:
"before""Hello"10042200"after"
當(dāng)然, 你也可以讓它以異步的方式來(lái)返回值:
var foo = Rx.Observable.create(function (observer) { console.log('hello'); observer.next(42); observer.next(100); // '返回‘另外一個(gè)值 observer.next(200); // 同上 setTimeout(() => { observer.next(300); },1000);});console.log('before');foo.subscribe(function(x) { console.log(x);});console.log('after');
執(zhí)行后的輸出:
"before""Hello"10042200"after"300
總結(jié):
我們可以使用Rx.Observable.create方法或者那些可以創(chuàng)建 Observable 的操作符來(lái)創(chuàng)建 Observable,然后使用一個(gè)觀察者來(lái)觀察 Observable,當(dāng) Observable 被訂閱時(shí)它就會(huì)通過(guò) next/error/complete/ 方法來(lái)通知觀察者,當(dāng)觀察者放棄觀察行為時(shí)如何處理 Observable 運(yùn)行產(chǎn)生的數(shù)據(jù)。這四個(gè)動(dòng)作都被編碼在一個(gè)可觀察的實(shí)例中,但其中一些動(dòng)作與其它動(dòng)作是相關(guān)的,如觀察和訂閱。
使用Observable時(shí)要關(guān)注的核心問(wèn)題:
Rx.Observable.create方法實(shí)際是 Observable 類的構(gòu)造函數(shù)的別名,它接受一個(gè)參數(shù)——訂閱函數(shù)。下面這個(gè)示例會(huì)創(chuàng)建一個(gè)observable,它會(huì)每秒發(fā)一個(gè) string 類型的值‘hi’ 給它的觀察者。
var observable = Rx.Observable.create(function subscribe(observer) { var id = setInterval(() => { observer.next('hi'); },1000);});
上面代碼中的 subscribe 函數(shù)對(duì)于解釋 Observable 來(lái)說(shuō)是非常重要的一部分,接下來(lái)解釋如何去理解它。
上面例子中的observable可以這樣被訂閱:
observable.subscribe(x => console.log(x));
observable變量上有 subscribe 方法,同樣 Observable.create(function subscribe(observer) { ... }) 也有subscribe 方法,這并不是一個(gè)巧合。在庫(kù)的實(shí)現(xiàn)中,這兩個(gè)subscribe是不一樣的,但在實(shí)際使用中你可以認(rèn)為它們?cè)诟拍钌鲜窍嗟鹊摹?/p>
這個(gè)示例很好的展示了為什么訂閱行為在多個(gè)訂閱者之間是不會(huì)共享的。當(dāng)我們使用一個(gè)觀察者來(lái)訂閱 observable 時(shí),傳入 create 方法的 subscribe 函數(shù)就會(huì)為這個(gè)觀察者運(yùn)行一次。每次調(diào)用 observable.subscribe 都會(huì)為給定的觀察者觸發(fā)它自己的獨(dú)立設(shè)置。
這個(gè)過(guò)程與諸如 addEventListener/removeEventListener 等事件處理API有很大的區(qū)別。在 observable.subscribe 的過(guò)程中,給定的觀察者并沒(méi)有注冊(cè)成為 Observable 上的監(jiān)聽(tīng)者。Observable 甚至不需要維護(hù)一個(gè)觀察者名單。
通過(guò) subscribe 這種簡(jiǎn)單的方式就可以使一個(gè) Observable 開(kāi)始執(zhí)行,并且將產(chǎn)生的數(shù)據(jù)或事件傳遞給當(dāng)前 Observable 環(huán)境上的觀察者。
Observable.create(function subscribe(observer) { ... })內(nèi)部的代碼代表了 Observable 的執(zhí)行,一個(gè)‘懶’執(zhí)行過(guò)程——僅在有觀察者訂閱它的時(shí)候執(zhí)行,當(dāng)它被執(zhí)行后就可以以同步或異步的方式不斷的生產(chǎn)值。
Observable 執(zhí)行后可以產(chǎn)生三種類型的值:
Next類型的通知是最重要也是最常用的一種,它代表了將要傳遞給觀察者的數(shù)據(jù)。Error 和 Complete 類型的通知在整個(gè)Observable的執(zhí)行過(guò)程中只可以發(fā)生一次,并且它們是互斥的,只有一個(gè)可以發(fā)生。
以正則表達(dá)式來(lái)表述三者之間的關(guān)系的話,應(yīng)該是:
next*(error|complete)?
下面的代碼展示了一個(gè)可以產(chǎn)生三個(gè) Next 類型通知的 Observable,最后發(fā)送一個(gè) Complete 通知:
var observable = Rx.Observable.create(function subscribe(observer) { observer.next(1); observer.next(2); observer.next(3); observer.complete();});
Observable 將嚴(yán)格遵守上述規(guī)則,所以試圖在Observable完成后繼續(xù)傳遞值的行為是無(wú)效的。
var observable = Rx.Observable.create(function subscribe(observer) { observer.next(1); observer.next(2); observer.next(3); observer.complete(); observer.next(4); // 不會(huì)傳遞成功});
如果Observable在執(zhí)行過(guò)程中發(fā)生異常,我們可以在subscribe函數(shù)內(nèi)部使用 try/catch 塊來(lái)捕獲它:
var observable = Rx.Observable.create(function subscribe(observer) { try { observer.next(1); observer.next(2); observer.next(3); observer.complete(); } catch(error) { observer.error(error); // 傳遞錯(cuò)誤 }});
由于 Observable 可能產(chǎn)生出無(wú)限多個(gè)值,但觀察者可能在某時(shí)間點(diǎn)想停止觀察行為,這就需要有一種方法來(lái)取消 Observable 的執(zhí)行。由于每個(gè)執(zhí)行只對(duì)應(yīng)一個(gè)觀察者,一旦觀察者完成接收值,它就需要知道如何停止執(zhí)行,以避免浪費(fèi)計(jì)算或內(nèi)存資源。
當(dāng)我們調(diào)用 observable.subscribe 時(shí),觀察者會(huì)被附加在新創(chuàng)建的可觀察上下文中,同時(shí)這次調(diào)用會(huì)返回一個(gè)對(duì)象,它就是 Subscription 。
var subscription = observable.subscribe(x => console.log(x));
Subscription 代表了正在執(zhí)行的上下文,它擁有一個(gè)可以取消 Observable 執(zhí)行的方法——unsubscribe。調(diào)用這個(gè)方法 subscription.unsubscribe() 就可以取消執(zhí)行。
var observable = Rx.Observable.from([10,20,30]);var subscription = observable.subscribe(x => console.log(x));// 一段時(shí)間以后subscription.unsubscribe();
當(dāng)我們使用 Observable.create 方法來(lái)創(chuàng)建 Observable 時(shí),必須定義當(dāng)取消訂閱時(shí)如何去釋放執(zhí)行時(shí)的資源,此時(shí)可以返回一個(gè)自定義的 unsubscribe 函數(shù)。
我們可以給之前的例子添加上它的 unsubscribe 方法:
var observable Rx.Observable.create(function subscribe(observer) { var intervalID = setInterval(() => { observer.next('hi'); },1000); return function unsubscribe() { clearInterval(intervalID); }});
與 observable.unsubscribe 一樣,我們可通過(guò)調(diào)用 unsubscribe 方法對(duì)這個(gè)流取消訂閱,這兩個(gè) unsubscribe 在概念上是完全相同的。
事實(shí)上在這個(gè)例子中,假如我們把響應(yīng)式的外殼剝離的話,它完全就是一個(gè)普普通通的javascript函數(shù):
function subscribe(observer) { var intervalID = setInterval(() => { observer.next('hi'); },1000); return function unsubscribe() { clearInterval(intervalID); }}var unsubscribe = subscribe({next: (x) => console.log(x)});
盡管如此,我們依然有理由去使用Rx這種響應(yīng)式的編程,通過(guò) Observable,Observer,Subscription,配合各種操作符來(lái)實(shí)現(xiàn)更高效安全的數(shù)據(jù)處理。
觀察者實(shí)際就是可觀察序列——Observable的消費(fèi)者。Observer 會(huì)設(shè)置一些回調(diào)函數(shù)來(lái)對(duì)應(yīng) Observable 的各個(gè)通知類型(next/error/complete),從這些通知中接收值,然后對(duì)它們進(jìn)行處理。
這是一個(gè)很典型的 Observer:
var observer = { next: x => console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification')};
我們可以使用它來(lái)觀察一個(gè) Observable:
observable.subscribe(observer);
Rxjs中的觀察者允許只實(shí)現(xiàn)這三個(gè)方法中的某幾個(gè),這對(duì) Observable 的執(zhí)行過(guò)程不會(huì)產(chǎn)生影響,僅僅是被忽略方法的對(duì)應(yīng)通知無(wú)法得到處理,因?yàn)橛^察者想要處理相應(yīng)通知上的數(shù)據(jù)時(shí)必須實(shí)現(xiàn)對(duì)應(yīng)的方法。
下面這個(gè)示例的 Observer 就沒(méi)有實(shí)現(xiàn)處理 complete 方法:
var observer = { next: x => console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err),};
當(dāng)需要訂閱一個(gè) Observable 時(shí),可以只傳入一個(gè)回調(diào)函數(shù)作為參數(shù),而不必傳入觀察者對(duì)象:
observable.subscribe(x => console.log('Observer got a next value: ' + x));
實(shí)際上在庫(kù)的內(nèi)部實(shí)現(xiàn)中會(huì)根據(jù)傳入的參數(shù)為我們創(chuàng)建出一個(gè) Observer 對(duì)象,此時(shí),傳入的第一個(gè)函數(shù)作為next方法的回調(diào),第二個(gè)是error回調(diào),第三個(gè)是complete回調(diào)。也就是說(shuō)我們可以這樣去訂閱一個(gè) Observable :
observable.subscribe( x => console.log('Observer got a next value: ' + x), err => console.error('Observer got an error: ' + err), () => console.log('Observer got a complete notification'));
Subscription是一個(gè)代表著當(dāng)前正在執(zhí)行的 Observable 的對(duì)象。它有一個(gè)非常重要的方法 unsubscribe ,這個(gè)方法不接受任何參數(shù),僅僅是用來(lái)釋放 Observable 執(zhí)行時(shí)的資源。在舊版本的Rxjs中,它也叫做 Disposable 。
var observable = Rx.Observable.interval(1000);var subscription = observable.subscribe(x => console.log(x));// 執(zhí)行一段時(shí)間后// 下面的操作將會(huì)取消上面已經(jīng)訂閱的 Observable 的執(zhí)行。subscription.unsubscribe();
Subscription 可以被組合在一起,這樣只要調(diào)用一次 unsubscribe 方法就可以將多個(gè) Observable 的執(zhí)行取消掉。我們可以通過(guò) subscription 的 add 方法將它們添加在一起。
var observable1 = Rx.Observable.interval(400);var observable2 = Rx.Observable.interval(300);var subscription = observable1.subscribe(x => console.log('first: ' + x));var childSubscription = observable2.subscribe(x => console.log('second: ' + x));subscription.add(childSubscription);setTimeout(() => { // 取消上面的兩次訂閱 subscription.unsubscribe();},1000);
執(zhí)行后將輸出:
second:0first:0second:1first:1second:2
當(dāng)然,可以添加,就可以移除,Subscription同樣擁有一個(gè)remove方法,用來(lái)從一組subscription 中移除指定的子 subscription。
Subject 是 Observable 的一種特殊類型,它允許把值同時(shí)傳播給多個(gè) Observer,也就是說(shuō)它是多播的。相比之下,普通的Observable 是單播的,每一個(gè) Observer 在訂閱 Observable 時(shí)都有其獨(dú)立的執(zhí)行上下文。
每一個(gè) Subject 都是一個(gè) Observable。你可以提供一個(gè)訂閱者來(lái)訂閱 Subject,從而在它上面取值。從觀察者的角度來(lái)看,并不能區(qū)分出數(shù)據(jù)是從 Subject 上獲取的還是從 Observable 上獲取的。
在 Subject 的內(nèi)部實(shí)現(xiàn)中,并不會(huì)產(chǎn)生新的執(zhí)行上下文來(lái)傳遞數(shù)據(jù),它僅僅是簡(jiǎn)單的將 Observer 注冊(cè)在自己的監(jiān)聽(tīng)者列表中,這與其它的庫(kù)或語(yǔ)言中添加事件的機(jī)制是類似的。
每一個(gè) Subject 都是一個(gè) Observer。每一個(gè) Subject 上都有自己的 next,error,complete方法。當(dāng)需要給 Subject 上添加一個(gè)值時(shí),只要調(diào)用它的next方法,接下來(lái)它就會(huì)把這個(gè)值廣播給注冊(cè)在監(jiān)聽(tīng)者列表中的多個(gè)監(jiān)聽(tīng)者。
下面的例子中,我們給 Subject 添加了兩個(gè)觀察者,然后給它添加一些值:
var subject = new Rx.Subject();subject.subscribe({ next: v => console.log('observerA: ' + v)});subject.subscribe({ next: v => console.log('observerB: ' + v)});subject.next(1);subject.next(2);
執(zhí)行后的輸出:
observerA: 1observerB: 1observerA: 2observerB: 2
由于 Subject 也是一個(gè) Observer ,所以你可以把他傳遞給一個(gè) Observable 的subscribe方法:
var subject = new Rx.Subject();subject.subscribe({ next: v => console.log('observerA: ' + v)});subject.subscribe({ next: v => console.log('observerB: ' + v)});var observable = Rx.Observable.from([1,2,3]);observable.subscribe(subject); // 通過(guò) Subject 來(lái)訂閱這條 Observable
執(zhí)行后輸出:
observerA: 1observerB: 1observerA: 2observerB: 2observerA: 3observerB: 3
通過(guò)上面的方法,我們基本上就借助 Subject 把一個(gè)單播的 Observable 轉(zhuǎn)換成了多播的。這個(gè)示例僅僅是演示了一種將一個(gè)執(zhí)行上下文分享給多個(gè)觀察者的方法。
在 Subject 類型下,還有一些比較特殊的 Subject 類型:BehaviorSubject,ReplaySubject,AsyncSubject。
一個(gè)‘多播’的 Observable 可以借助于 Subject 實(shí)現(xiàn)將數(shù)據(jù)通知給多個(gè)觀察者,然而單播的 Observable 僅僅只能把通知發(fā)送給一個(gè)觀察者。
這就是那些多播的操作符實(shí)現(xiàn)時(shí)的真實(shí)情況:觀察者訂閱底層的一個(gè) Subject,使用這個(gè) Subject 來(lái)訂閱產(chǎn)生源數(shù)據(jù)的 Observable。
下面這個(gè)例子和之前那個(gè)使用 subject 來(lái)訂閱 Observable 的例子非常類似:
var source = Rx.Observable.from([1,2,3]);var subject = new Rx.Subject();var multicasted = source.multicast(subject);
// 使用 subject.subscribe({...}) 實(shí)現(xiàn)
multicasted.subscribe({
next: v => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: v => console.log('observerB: ' + v)
});
//使用 source.subscribe(subject) 實(shí)現(xiàn)
multicasted.connect();
上面代碼中的multicast方法返回的 Observable 看起來(lái)像一個(gè)普通的 Observable,但是當(dāng)它被訂閱時(shí),執(zhí)行的方式卻和 Subject 一樣是多播的,同時(shí)這個(gè) Observable 還有一個(gè)connect方法,可以將多個(gè)流串聯(lián)在一起。
何時(shí)調(diào)用這個(gè)connect方法非常重要,因?yàn)樗鼪Q定了這條可以共享的流什么時(shí)間開(kāi)始執(zhí)行。由于connect方法在內(nèi)部執(zhí)行了 source.subscribe(subject) ,因此它會(huì)返回一個(gè) Subscription ,可以通過(guò)它來(lái)取消這個(gè)共享的 Observable 的執(zhí)行。
通過(guò)手動(dòng)調(diào)用connect方法來(lái)得到 Subscription 多少顯得有些笨重,通常情況下,我們希望當(dāng)?shù)谝粋€(gè)觀察者抵達(dá)時(shí)可以自動(dòng)的調(diào)用connect方法,當(dāng)最后一個(gè)觀察者取消訂閱時(shí)自動(dòng)的取消這個(gè)共享 Observable 的執(zhí)行。
請(qǐng)考慮實(shí)現(xiàn)如以下列表所概述的訂閱:
為了達(dá)到這個(gè)效果,我們需要顯式的調(diào)用connect方法:
var source = Rx.Observable.interval(500);var subject = new Rx.Subject();var multicasted = source.multicast(subject);var subscription1, subscription2, subscriptionConnect;subscription1 = multicasted.subscribe({ next: v => console.log('observerA: ' + v)});// 在這里我們需要顯示的調(diào)用connect方法以使第一個(gè)訂閱者可以開(kāi)始接收值subscriptionConnect = multicasted.connect();setTimeout(() => { subscription2 = multicasted.subscribe({ next: v => console.log('observerB: ' + v) });}, 600);setTimeout(() => { subscription1.unsubscribe();},1200);// 在這里我們需要把多播流的訂閱取消掉,因?yàn)閺拇艘院笤僖矝](méi)有訂閱者訂閱它了。setTimeout(() => { subscription2.unsubscribe(); subscriptionConnect.unsubscribe();},2000);
我們可以通過(guò)使用 ConnectableObservable 的refCount方法來(lái)避免顯式的調(diào)用connect方法,這個(gè)方法會(huì)返回一個(gè) Observable 用來(lái)追蹤當(dāng)前有多少訂閱者正在訂閱多播流。當(dāng)訂閱者的數(shù)量從0增加到1時(shí),它幫助我們調(diào)用connect方法以開(kāi)始訂閱,當(dāng)訂閱者的數(shù)量從1變?yōu)?時(shí),它幫助我們?nèi)∠嗛喴酝V苟嗖チ鞯睦^續(xù)執(zhí)行。
請(qǐng)看下面的例子:
var source = Rx.Observable.interval(500);var subject = new Rx.Subject();var refCount = source.multicast(subject).refCount();var subscription1, subscription2, subscriptionConnect;// 這次調(diào)用會(huì)執(zhí)行connect方法console.log('observerA subscribed');subscription1 = refCounted.subscribe({ next: v => console.log('observerA: ' + v)});setTimeout(() => { console.log('observerB subscribed'); subscription2 = refCounted.subscribe({ next: v => console.log('observerB: ' + v) })},600);setTimeout(() => { console.log('observerA unsubscribed'); subscription1.unsubscribe();}, 1200);// 這里會(huì)調(diào)用多播流的unsubscribe方法setTimeout(() => { console.log('observerB unsubscribed'); subscription2.unsubscribe();},2000);
執(zhí)行后的輸出:
observerA subscribedobserverA: 0observerB subscribedobserverA: 1observerB: 1observerA unsubscribedobserverB: 2observerB unsubscribed
refCount方法僅存在于 ConnectableObservable 上,它返回的是一個(gè)新的 Observable 而不是另一個(gè) ConnectableObservable 。
BehaviorSubject 是 Subject 的一個(gè)變種上,關(guān)于它的一個(gè)重要概念是’當(dāng)前值‘。它會(huì)把最后一次發(fā)送給訂閱者的值保存下來(lái),當(dāng)另外一個(gè)訂閱者開(kāi)始訂閱時(shí),它會(huì)把這個(gè)值立即發(fā)送給新的訂閱者。
下面的例子中,BehaviorSubject初始化了一個(gè)值0,當(dāng)?shù)谝粋€(gè)訂閱者開(kāi)始訂閱時(shí),這個(gè)值被發(fā)送出去。接下來(lái)當(dāng)?shù)诙€(gè)訂閱者開(kāi)始訂閱時(shí),將會(huì)接收到值2,即使這個(gè)值已經(jīng)被發(fā)送過(guò)了。
var subject = new Rx.BehaviorSubject(0);subject.subscribe({ next: v => console.log('observerA: ' + v)});subject.next(1);subject.next(2);subject.subscribe({ next: v => console.log('observerB: ' + v)});subject.next(3);
執(zhí)行后的輸出:
observerA: 0observerA: 1observerA: 2observerB: 2observerA: 3observerB: 3
ReplaySubject 和 BehaviorSubject 很類似,它們都可以把過(guò)去的值發(fā)送給新的訂閱者,不同的是它可以把 Observable 執(zhí)行時(shí)產(chǎn)生的一部分值記錄下來(lái)。
在創(chuàng)建 ReplaySubject 時(shí),我們可以指定需要重復(fù)發(fā)送的值的數(shù)量:
var subject = new Rx.ReplaySubject(3); // 傳遞3個(gè)值給新加入的訂閱者subject.subscribe({ next: (v) => console.log('observerA: ' + v)});subject.next(1);subject.next(2);subject.next(3);subject.next(4);subject.subscribe({ next: (v) => console.log('observerB: ' + v)});subject.next(5);
執(zhí)行后的輸出:
observerA: 1observerA: 2observerA: 3observerA: 4observerB: 2observerB: 3observerB: 4observerA: 5observerB: 5
還可以指定一個(gè)以毫秒為單位的時(shí)間,配合記錄的數(shù)量共同決定究竟有個(gè)多少個(gè)值需要被重復(fù)發(fā)送。
下面的這個(gè)例子,使用了一個(gè)比較大的數(shù)量,但是時(shí)間上只限制了只發(fā)送500毫秒內(nèi)的值:
var subject = new ReplaySubject(100, 500/* windowTime */);subject.subscribe({ next: v => console.log('observerA: ' + v)});var i = 1;setInterval(() => subject.next(i++), 200);setTimeout(() => { subject.subscribe({ next: v => console.log('observerB: ' + v) });},1000);
第二個(gè)訂閱者只能接收到最近的500毫秒內(nèi)發(fā)出的值,下面是執(zhí)行后的輸出,
observerA: 1observerA: 2observerA: 3observerA: 4observerA: 5observerB: 3observerB: 4observerB: 5observerA: 6observerB: 6...
它也是 Subject 的一個(gè)變種,AsyncSubject僅在流執(zhí)行結(jié)束后把最后一個(gè)值發(fā)送給它的訂閱者。
var subject = new Rx.AsyncSubject();subject.subscribe({ next: v => console.log('observerA: ' + v)});subject.next(1);subject.next(2);subject.next(3);subject.next(4);subject.subscribe({ next: v => console.log('observerB: ' + v);});subject.next(5);subject.complete();
執(zhí)行后的輸出:
observerA: 5observerB: 5
AsyncSubject 的行為與last操作符的行為非常相似,都是在等待完成后再發(fā)送一個(gè)值。你應(yīng)該還記得之前提到的當(dāng) Observable 發(fā)出Complete通知或Error通知后就不能再發(fā)送值,AsyncSubject看起來(lái)違背了這個(gè)原則,其實(shí)不然,我們可以看一下它的源碼:
constructor() { super(...arguments); this.value = null; this.hasNext = false; this.hasCompleted = false;}complete() { this.hasCompleted = true; if (this.hasNext) { super.next(this.value); } super.complete();}
這里只摘錄出了AsyncSubject的構(gòu)造函數(shù)和它的complete方法,首先AsyncSubject是繼承自Subject的,所以這里的super類就是Subject,那么就很明顯了,在AsyncSubject實(shí)例上調(diào)用complete方法時(shí)并沒(méi)有違背之前提到的原則,依然是先發(fā)出了Next通知,最后才發(fā)出Complete通知。
雖然 Observable 是構(gòu)建rxjs的基石,但是真正精彩的部分應(yīng)該是操作符。操作符的組合使用使得以聲明式的方式解決復(fù)雜的異步場(chǎng)景變得非常簡(jiǎn)單,它構(gòu)成rxjs必不可少的一部分。
操作符是 Observable 類上的方法,例如: .map(...),.filter(...),.merge(...)等。當(dāng)我們調(diào)用它們時(shí),并不會(huì)去改變已有 Observable 實(shí)例,取而代之的是創(chuàng)建一個(gè)全新的 Observable ,它是訂閱邏輯是基于第一個(gè) Observable 的。
理解操作符時(shí)非常重要的一點(diǎn)是時(shí)刻謹(jǐn)記它是一個(gè)純函數(shù),它接受一個(gè) Observable 作為輸入然后生成一個(gè)新的 Observable 作為輸出。當(dāng)訂閱輸出 Observable 的同時(shí)也就訂閱了輸入 Observable 。
下面的例子中,我們自定義了一個(gè)操作函數(shù),它將輸入的每一個(gè)值都乘以10:
function multiplyByTen(input) { var output = Rx.Observable.create(function subscribe(observer) { input.subscribe({ next: v => observer.next(10 * v), error: err => observer.error(error), complete: () => observer.complete() }); }); return output;}var input = Rx.Observable.from([1,2,3,4]);var output = multiplyByTen(input);output.subscribe(x => console.log(x));
執(zhí)行后的輸出:
10203040
注意:我們只是訂閱了輸出-——output,但這同時(shí)導(dǎo)致input被訂閱,這稱之為’操作符的鏈?zhǔn)接嗛啞?/p>
通常在提到操作符時(shí),我們指的是 Observable 實(shí)例上的操作符。假如上面例子中我們定義的操作函數(shù) multiplyByTen 是一個(gè)操作符的話,它看起來(lái)應(yīng)該是這樣:
Rx.Observable.prototype.multiplyByTen = function multiplyByTen() { var input = this; return Rx.Observable.create(function subscribe(observer) { input.subscribe({ next: v => observer.next(10 * v), error: err => observer.error(err), complete: () => observer.complete() }); });}
在這里輸入的 Observable 不再通過(guò) multiplyByTen 的參數(shù)獲得,而是通過(guò)this關(guān)鍵字來(lái)獲取,這就允許我們可以像下面這樣鏈?zhǔn)降恼{(diào)用:
var observable = Rx.Observable.from([1,2,3,4]).multiplyByTen();observable.subscribe(x = console.log(x));
除了實(shí)例操作符,還有靜態(tài)操作符,它們是直接定義在 Observable 類上的靜態(tài)方法。靜態(tài)操作符不會(huì)使用this關(guān)鍵字來(lái)推斷輸入,它的輸入完全依賴于輸入的參數(shù)。
常見(jiàn)的靜態(tài)操作符都是一些可創(chuàng)建類型的操作符,與通常情況下操作符接收輸入流,輸出輸出流不同,它們大多接收一些非 Observable 類型的參數(shù),例如一個(gè)數(shù)字,然后創(chuàng)建出一條流。
var observable = Rx.Observable.interval(1000 /* 毫秒數(shù) */);
另外的例子就是create,我們?cè)谥暗睦又幸呀?jīng)多次使用過(guò)了。
然后,靜態(tài)操作符并非只是那些簡(jiǎn)單創(chuàng)建流的操作符,一些組合類的操作符也可以有它的靜態(tài)類型,例如:merge,combineLatest,concat等。這樣做的意義在于我們接收多個(gè)流作為輸入而不僅僅是一個(gè)。
var observable1 = Rx.Observable.interval(1000);var observable2 = Rx.Observable.interval(400);var merge = Rx.Observable.merge(observable1, observable2);
單純靠文字可能還不足解釋的清楚操作符是如何工作的,許多操作符是與時(shí)間相關(guān)的,它們可能以不同的方式來(lái)影響值的發(fā)射,比如:delay,throttle,sample等。對(duì)于這些操作符來(lái)說(shuō)圖表的形式會(huì)更加直觀。彈珠圖可以模擬出操作符是如何工作的,可以直觀的表現(xiàn)出輸入流,操作符,參數(shù)與輸出流之間的聯(lián)系。
下面來(lái)詳細(xì)解釋彈珠圖各部分的含義:
// 這條從左到右的橫線代表隨時(shí)間的推移,輸入流的執(zhí)行過(guò)程。// 橫線上的值代表從流上發(fā)射出的值// 橫線尾部的豎線代表complete通知執(zhí)行的時(shí)間點(diǎn),表示這條流已經(jīng)成功的執(zhí)行完成。----------4------6--------------a-------8-------------|----> multipleByTen // 使用的操作符// 這條從左到右的橫線代表經(jīng)過(guò)操作符轉(zhuǎn)換后的輸出流。// 橫線尾部的X代表在這個(gè)時(shí)間點(diǎn)上流發(fā)生了錯(cuò)誤,至此之后不應(yīng)該再有 Next 通知或 Complete 通知從流上發(fā)出。---------40-----60--------------X--------------------------->
在整個(gè)文檔中,我們都使用這種彈珠圖來(lái)解釋操作符是如何工作的。這種彈珠圖在其它場(chǎng)景中也是非常有用的,比如在白板演示或者單元測(cè)試中。
如果你明確自己需要解決的問(wèn)題,但是不清楚應(yīng)該使用哪一個(gè)操作符,可以在官網(wǎng)的 Manual > Operators > Choose an operator 中通過(guò)選擇符合問(wèn)題的描述來(lái)找到你所需要的操作符。
為了解決不同的問(wèn)題,rxjs針對(duì)性的設(shè)計(jì)了各種各樣的操作符,大體上可以分為幾類:創(chuàng)建型、轉(zhuǎn)換型、過(guò)濾型、組合型、多播型、錯(cuò)誤處理型、工具類等等。
關(guān)于操作符,將會(huì)在專門的文檔中進(jìn)行解釋。
Scheduler決定了subscription什么時(shí)候開(kāi)始以及什么時(shí)候開(kāi)始分發(fā)值。它由3個(gè)部分組成:
可以傳入scheduler的操作符:
聯(lián)系客服