[note] RxJS 筆記
本篇內容大量整理自 RxJS 7 and Observables: Introduction @ Udemy。另外,由於 Operators 的種類眾多,建議可以搭配 Learn RxJS 和 RxJS Guide 學習。
TL;DR
Observable 會發送資料給 Observer,但它只管傳遞資料,並不會去管 Subscribers 拿資料去做什麼。
import { Observable, Observer } from 'rxjs';
const observable$ = new Observable()<string>((subscriber) => {
// Next
subscriber.next('foo');
// Error
subscriber.error(new Error('some error occurred'));
// Complete
subscriber.complete();
// Teardown
return () => {
console.log('observable teardown');
};
});
const observer: Observer<string> = {
next: (value: string) => console.log(value),
complete: () => console.log('complete'),
error: (err: Error) => console.log(err.message),
};
const subscription = observable$.subscribe(observer);
// 如果只需要 next handler 可以在參數的地方直接帶入 function 而非帶入物件
// const subscriptionWithOnlyNextHandler = observable$.subscribe((value: string) =>
// console.log(value),
// );
// 如果對於 Observable 提供的值不在感興趣,要 unsubscribe
subscription.unsubscribe();
常用的 Transformation Operator:
concatMap
:一次一個,依序執行,前一個完成後下一個才會接著執行。是最安全的方式,可以避免 memory leak,前一次的 observable 被取消,或沒有依照 request 的順序處理 response,但也因此可能比較沒效率。switchMap
:當 inner observable 有新的 emit value 是,會取消(cancel)前一次的 inner subscription,也就是說,一次只會有一個最新的 inner subscription 在進行,總是會拿到最新的順序。mergeMap
:以併發的方式執行多個 inner subscription,且不會確保 response 被處理的順序,先 complete 的 inner subscription 會先往後 notify。
重要概念
Observables
Array vs Stream
- array 在一開始就會被定義好裡面有哪些資料,開發者對於 array 中有哪些元素存在可以一目瞭然
- stream 中的項目(items)則可能會在不同的時間點產生,stream 有時間的概念在內,開發者不知道下一個時間點會得到什麼資料,但可以先寫好當某個項目出現是對應要執行的程式邏輯,這也是 reactive programming 的概念
- 在 JavaScript 中的許多事件都可以用 stream 的角度來思考,例如使用者輸入資料時,我們不知道下個時間點會得到什麼資料,但可以先把當資料進來後的邏輯寫好;HTTP Response 的事件等等也是類似的概念。
Observable, Subscription and Observer
- Observable:本身不會做任何事,它只是保存了許多邏輯
- Notification 包含
next
:訂閱期間收到 value 都會被觸發,沒有觸發的次數限制error
:表示 Observable 有 error 發生且不會再 emit 任何 value,因此觸發一次後即結束訂閱,可以從 payload 取得 error 的資料complete
:表示不會再 emit 任何 value,因此觸發一次後即結束訂閱,沒有可以帶資料的 payload
- Observable 中所定義的 callback 會當這個 Observable 被
subscribe()
時才會執行
- Notification 包含
- Observer:定義收到 Observable 傳來的 value 時所要執行的行為
- Subscription:透過
subscribe
的方式,讓 Observable 中 emitted value 被 Observer 所執行,如此才會實際執行 observable 中的 callback function,並且讓 observer 可以接收到 observable 所通知的值
整個執行的流程會像這樣:
- 當 Observable 被訂閱(subscribe)時,會建立起一個 Subscription,並執行 Observable 中的 callback,
- 需要留意的是,每次 Observable 中的
subscriber.next()
被呼叫時,Observer 中就會立即收到 emitted 的 value 並加以執行,而不是等到所有 Observable 中的內容執行完後才開始執行 Observer 的內容。
可以把 observer 中定義的 function 想成會在 observable 中 subscriber 的位置執行。
如果我們在裡面放一些非同步的邏輯:
import { Observable } from 'rxjs';
const observable$ = new Observable<string>((subscriber) => {
console.log('[Observable] executed');
subscriber.next('Hello');
setTimeout(() => subscriber.next('World'), 1000);
console.log('---');
setTimeout(() => {
subscriber.next('with RxJS');
// Complete
subscriber.complete();
}, 2000);
console.log('[Observable] finished');
// Teardown
return () => {
console.log('[Observable] unsubscribed (Teardown)');
};
});
const observer = {
next: (value: string) => console.log(value),
complete: () => console.log('[subscription] completed'),
};
console.log('App start');
const subscription = observable$.subscribe(observer);
可以思考一下,最終 console.log 的結果會是:
App start
[Observable] executed
Hello
---
[Observable] finished
World
with RxJS
[subscription] completed
[Observable] unsubscribed (Teardown)
但若我們在最後直接呼叫 subscription.unsubscribe();
的話,則所有非同步的操作都不會被執行。
subscribing 就像以 observer
爲參數去呼叫 Observable 中的函式,所以即使我們 subscribe 相同的 Observable 很多次,但每一個 subscription 都是獨立的 Observable 被執行。
Cold and Hot Observable
- Cold Observable:每一個 subscription 間取得的值都是各自獨立時,就是屬於 cold observable
- emit value 的來源會是在 observable 內部
- 每一個 subscription 會獲取不同的值
- 例如,HTTP Request、setInterval/setTimeout
- Hot Observable:每一個 subscriptions 間共享相同的來源,並獲取相同的值時,就屬於 hot observable
- emit value 的來源會是在 observable 外部,從這個相同的來源 multicast 資料
- 不同的 subscription 即使在不同時間訂閱 observable,接受到通知時還是會得到相同的值
- 例如,click event 發生時,多個不同的 subscription 都會被通知,它們的來源都是相同的。這種情況下,emit value 的來源會來自 Observable 外的邏輯(DOM 的 click event 本身並不在 Observable 內)
- Observable 並不是非 Hot 即 Cold,有的 Observable 同時屬於 Code 和 Hot;有的一開始時 Cold 後來變成 Hot。
Cold Observable
/**
* Code Observable
**/
import { ajax } from 'rxjs/ajax';
// ajax$ 是一個 observable
const ajax$ = ajax<any>('https://random-data-api.com/api/name/random_name');
// 每一個 subscriptions 間取得的值都是各自獨立時,就是屬於 cold observable
// 這裡每個 subscription 拿到的 first_name 的值都會不同
ajax$.subscribe((data) => console.log('Sub 1: ', data.response.first_name));
ajax$.subscribe((data) => console.log('Sub 2: ', data.response.first_name));
ajax$.subscribe((data) => console.log('Sub 3: ', data.response.first_name));
Hot Observable
/**
* Hot Observable
**/
import { Observable } from 'rxjs';
const helloButton = document.getElementById('hello-btn');
const helloBtnClick$ = new Observable<MouseEvent>((subscriber) => {
helloButton.addEventListener('click', (e) => {
subscriber.next(e);
});
});
// 每一個 subscriptions 間會共享相同的來源,取得相同的值時,就屬於 hot observable
// 這裡每個 subscription 取得的值都來自 helloButton 的 click event,它們收到的值都會是相同的
helloBtnClick$.subscribe((e) => console.log('sub1 ', e.type, e.x, e.y));
helloBtnClick$.subscribe((e) => console.log('sub2 ', e.type, e.x, e.y));
helloBtnClick$.subscribe((e) => console.log('sub3 ', e.type, e.x, e.y));
Subscription LifeCycle
圖片來源:RxJS 7 and Observables: Introduction @ Udemy
如果有監聽事件 或使用 setInterval 等計數器,記得要在 teardown 時把它解除,否則它會持續執行,甚至導致 memory leak:
import { Observable, Observer } from 'rxjs';
const observable$ = new Observable<string>((subscriber) => {
let timerId;
setInterval(() => {
timerId = subscriber.next('foo');
}, 1000);
// Teardown
return () => {
clearInterval(timerId);
console.log('observable teardown');
};
});
const observer: Observer<string> = {
next: (value: string) => console.log(value),
complete: () => console.log('complete'),
error: (err: Error) => console.log(err.message),
};
const subscription = observable$.subscribe(observer);
// 如果對於 Observable 提供的值不在感興趣,要 unsubscribe
setTimeout(() => {
subscription.unsubscribe();
}, 3000);
Operators
Type of Operators
當 Observable 傳來資料後,在資料傳遞給 Observer 之前,可以透過 Operators 針對資料進行處理。Operators 可以分成幾類:
- Creation operators:可以將幾乎所有東西變成 observable。常見的像是
of
,from
,fromEvent
,timer
,interval
。 - Combination operators:可以一次收集來自多個 observable 的資料。常見的像是
combineLatest
,concat
,merge
,startWith
和withLatestFrom
。 - Error handling operators:用來處理錯誤,重試請求等。最常見的是
catchError
。 - Filtering operators:用來過濾資料。例如
debounceTime
,distinctUntilChanged
,filter
,take
,takeUntil
。 - Multicasting operators:預設的情況下,RxJS 的 observable 是 cold / unicast 的,也就是一個 observable 只會有一個 subscriber。透過 multicasting operators 可以讓 observable 變成 hot / multicasting 的,可以將資料傳給多個 subscribers。最常見的是
shareReplay
。 - Transformation operators:用來轉換資料,常見的如
concatMap
,map
,mergeMap
,scan
和switchMap
。
Creation Functions / Creation Operators
Creation Functions 提供了方便建立 Observable 的方法。
of
emit 一系列的值後 complete,會以一個一個參數的方式帶入資料:
import { of } from 'rxjs';
//emits any number of provided values in sequence
const numbers$ = of(1, 3, 6);
const observer = {
next: (value) => console.log(value),
complete: () => console.log('complete'),
};
const subscription = numbers$.subscribe(observer);
如果不用 of
operator,而是自己建立 observable 的話,寫起來就會像這樣:
// 等同於 const numbers$ = of(1, 3, 6);
const numbers$ = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(3);
subscriber.next(6);
subscriber.complete();
});
如果知道 Observable 是怎麼被建立的邏輯後,我們也可自己建立成 Creation Function,例如 myOf
:
const myOf = (...args: number[]): Observable<number> => {
return new Observable((observer) => {
args.forEach((arg) => observer.next(arg));
observer.complete();
});
};
// 等同於 const numbers$ = of(1, 3, 6);
const numbers$ = myOf(1, 3, 6);
from
帶入陣列作為參數
from
在帶入的參數是陣列時,它的效果和 of
的效果幾乎相同,差別只在於 from
是以一個「陣列」的方式當作參數傳入,而 of
是以多個參數的方式傳入。
import { from } from 'rxjs';
// emit array as a sequence of values
const numbers$ = from([1, 3, 6]);
const observer = {
next: (value: number) => console.log(value),
complete: () => console.log('complete'),
};
const subscription = numbers$.subscribe(observer);
帶入 Promise 作為參數
from
也可以傳入 Promise,是直接傳入 Promise,而不是 Promise[],from 會自動把
resolve
的值帶入next()
,並呼叫complete
- 或是把
reject
的值帶入error()
後終止(不會執行到complete
):
import { Observable, from } from 'rxjs';
const sleep = (ms: number) => new Promise<number>((resolve) => setTimeout(() => resolve(ms), ms));
// 直接在 from 中帶入 Promise 而不是 Promise[]
const promise$ = from(sleep(1000));
const observer = {
next: (value) => console.log(value),
complete: () => console.log('complete'),
};
const subscription = promise$.subscribe(observer);
fromEvent
透過 fromEvent creation function 可以根據不同的 event 來源建立 Observable:
- 在執行
subscribe / unsubscribe
時,RxJS 會分別執行對應的addEventListener / removeEventListener
- 不會執行 complete,需要自行呼叫 unsubscribe
- Hot Observable
// fromEvent(element, 'click')
import { fromEvent } from 'rxjs';
const triggerButton = document.getElementById('hello-btn');
const clickEvent$ = fromEvent<MouseEvent>(triggerButton, 'click');
const subscription = clickEvent$.subscribe((event) => console.log(event.type, event.x, event.y));
同樣的,如果沒用 fromEvent
的話,我們也可以寫出類似的 Observable:
// 等同於
// const clickEvent$ = fromEvent<MouseEvent>(triggerButton, 'click');
const clickEvent$ = new Observable<MouseEvent>((subscriber) => {
const handler = (e: MouseEvent) => subscriber.next(e);
triggerButton.addEventListener('click', handler);
return () => triggerButton.removeEventListener('click', handler);
});
timer
- 對應到的是
setTimeout
- 時間到時會呼叫 complete,因此可以不用手動 unsubscribe
- Cold Observable
// timer(dueTime: number | Date = 0)
import { timer, Observer } from 'rxjs';
const timer$ = timer(2000);
const observer: Observer<number> = {
next: (value) => console.log(value),
complete: () => console.log('completed !'),
error: () => console.log('error !'),
};
const subscription = timer$.subscribe(observer);
同樣的,如果不使用 RxJS 提供的 timer
,也可以自己寫一個類似的 myTimer:
const myTimer = (ms) => {
return new Observable((subscriber) => {
const timerId = setTimeout(() => {
subscriber.next(0);
subscriber.complete();
return;
}, ms);
return () => clearTimeout(timerId);
});
};
interval
- 對應到
setInterval
- 需要主動呼叫
subscription.unsubscribe()
來結束它(且結束時 complete 不會被呼叫) - Cold Observable
// interval(period: number = 0)
import { interval, Observer, Observable } from 'rxjs';
const interval$ = interval(1000);
const observer: Observer<number> = {
next: (value) => console.log(value),
complete: () => console.log('completed !'),
error: () => console.log('error !'),
};
const subscription = interval$.subscribe(observer);
// complete 不會被呼叫
subscription.unsubscribe();
同樣的,可以寫出自己的 interval
creation function,這裡稱作 myInterval
:
const myInterval = (ms) => {
return new Observable((subscriber) => {
let count = 0;
const timerId = setInterval(() => {
subscriber.next(count++);
return;
}, ms);
return () => clearTimeout(timerId);
});
};
Pipeable Operators 的概念
透過 Pipeable Operators,可以讓 Observable 在 emit value 後,該 notification 在抵達 Observer 之前,先透過 pipeable operator 進行一些轉換:
圖片來源:RxJS 7 and Observables: Introduction @ Udemy
具體來說,Pipeable Operators 和原本的 Observable Source 間,組成了一個新的 Observable 被 Observer 所訂閱:
NewObservable$ = OriginalObservable$ + Pipeable Operator;
經過轉換後的得到的值或被通知到的時間點可能和一開始的 source 間有明顯的差異。
Pipeable Operators 會和原本的 Observable 結合後,變成新的 Observable 可供訂閱。
filter
filter operator 可以過濾掉 next()
傳出的值,但如果是 error()
或 complete()
則不會做任何過濾。
import { filter, Observable, Observer } from 'rxjs';
interface NewsItem {
category: 'Business' | 'Sports';
content: string;
}
const newsFeed$ = new Observable<NewsItem>((subscriber) => {
setTimeout(() => subscriber.next({ category: 'Business', content: 'A' }), 1000);
setTimeout(() => subscriber.next({ category: 'Sports', content: 'B' }), 2000);
setTimeout(() => subscriber.next({ category: 'Business', content: 'C' }), 3000);
setTimeout(() => subscriber.next({ category: 'Sports', content: 'D' }), 4000);
setTimeout(() => subscriber.next({ category: 'Business', content: 'E' }), 5000);
});
// 原本的 Observable + Pipeable Observable = 新的 Observable
// 使用 filter operator 後,可以產生新的 Observable
const businessNewsFeed$ = newsFeed$.pipe(filter((item) => item.category === 'Business'));
const sportsNewsFeed$ = newsFeed$.pipe(filter((item) => item.category === 'Sports'));
const subscription = businessNewsFeed$.subscribe((value) => console.log(value));
map
map
operator 會針對 next()
收到的值進行處理,但不會管 error()
或 complete()
的內容。
import { forkJoin, map } from 'rxjs';
import { ajax } from 'rxjs/ajax';
const randomName$ = ajax<any>('https://random-data-api.com/api/name/random_name');
const randomFirstName$ = randomName$.pipe(map((ajaxResponse) => ajaxResponse.response.first_name));
const randomNation$ = ajax<any>('https://random-data-api.com/api/nation/random_nation');
const randomNationCapital$ = randomNation$.pipe(
map((ajaxResponse) => ajaxResponse.response.capital),
);
const randomFood$ = ajax<any>('https://random-data-api.com/api/food/random_food');
const randomFoodDish$ = randomFood$.pipe(map((ajaxResponse) => ajaxResponse.response.dish));
forkJoin([randomFirstName$, randomNationCapital$, randomFoodDish$]).subscribe(
([firstName, capital, dish]) =>
console.log(`${firstName} is from ${capital} and likes to eat ${dish}.`),
);
tap / do
在 tap 中可以使用帶有 side effect 的操作。
tap 接收的參數和 subscribe()
很像,參數可以直接是 function 會取得 next
傳來的參數,也可以是 object,放入的是 observer({ next, error, complete }
),差別在於它是 pipeable operator,如果該 observable 最終沒有被 subscribe()
的話,該 Observable 不會被執行:
// https://www.learnrxjs.io/learn-rxjs/operators/utility/do#examples
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';
const subscription = of(1, 2, 3, 4, 5)
.pipe(
tap((val) => console.log(`BEFORE MAP: ${val}`)),
map((val) => val + 10),
tap((val) => console.log(`AFTER MAP: ${val}`)),
)
.subscribe(console.log);
可以接收 Observer 物件當作參數,tap({next, error, complete})
:
import { filter, map, of, tap } from 'rxjs';
of(1, 7, 3, 6, 2)
.pipe(
map((value) => value * 2),
tap({
next: (value) => console.log('tap next: ', value),
complete: () => console.log('tap complete'),
error: (err) => console.log('tap error', err),
}),
filter((value) => value > 5),
)
.subscribe((value) => console.log('receive: ', value));
另外在 RxJS 7.3.0 之後,tap 還多了幾個屬性可以使用:
subscribe
:subscription 被建立時unsubscribe
:subscription 被 unsubscribe 時,但收到 error 或 complete 時不會觸發finalize
:subscription 結束時,不論是 unsubscribe、error 或 complete 都會觸發
import { filter, map, of, tap } from 'rxjs';
of(1, 7, 3, 6, 2)
.pipe(
map((value) => value * 2),
tap({
next: (value) => console.log('tap next: ', value),
complete: () => console.log('tap complete'),
error: (err) => console.log('tap error', err),
}),
filter((value) => value > 5),
tap({
subscribe: () => console.log('subscription made'),
unsubscribe: () => console.log('subscription unsubscribed'),
finalize: () => console.log('subscription finalized'),
}),
)
.subscribe((value) => console.log('receive: ', value));
debounceTime
超過一段時間沒有 emit value 後,才會實際發出 notification。例如,debounceTime(1000)
指的是該 Observable 在 1000ms 內沒有 emit 新的值的話,才會把該 value 通知給 observer:
import { debounceTime, fromEvent, map, tap } from 'rxjs';
const sliderInput = document.querySelector('input#slider');
const onSliderChange$ = fromEvent(sliderInput, 'input');
// 在一秒內如果沒有 emit 新的 value 的話,才會通知 onSlideChangeWithDebounce$ 的 subscriber
const onSlideChangeWithDebounce$ = onSliderChange$.pipe(
// 每一次 input 都會觸發
tap(() => console.log('slider changed')),
// 在一秒內如果沒有 emit 新的 value 的話,才會往後面的 pipeline 走
debounceTime(1000),
map((event) => event.target['value']),
);
const subscription = onSlideChangeWithDebounce$.subscribe((event) => console.log(event));
withLatestFrom
會跟著一個主要的 Observable(例如 click
),但需要在另一個 Observable 有值的前提下,才會通知 subscriber。
// https://rxjs.dev/api/operators/withLatestFrom
import { fromEvent, interval } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const timer = interval(1000);
const result = clicks.pipe(withLatestFrom(timer));
result.subscribe((x) => console.log(x));
Combination Operators
forkJoin:類似 Promise.all
- 接受其他的 Observable 作為來源(
Observable[]
),當這個 observable 被訂閱時,它會為裡面每一個 observable 建立 subscription,直到所有的 observable 都取得結果後,會以陣列的方式回傳每一個 observable emit 的值,此外,只有有其中一個 observable 呼叫了error()
,forkJoin$
的 subscription 就 會立即結束並取得錯誤,整體的行為很類似Promise.all()
。 - 有一點可以留意的是,即使
forkJoin$
的 subscription 是以為 error 而提前結束,每一個裡面的 observable 依然會執行的 teardown 的邏輯(這點可以留意 Subscription LifeCycle 的圖),透過這個方式,可以讓其中一個 request 失敗時,就取消所有其他的 observable。 - Cold Observable
import { forkJoin } from 'rxjs';
import { ajax } from 'rxjs/ajax';
const BASE_URL = 'https://random-data-api.com/api';
// 建立多個 observable
const randomName$ = ajax(`${BASE_URL}/name/random_name`);
const randomNation$ = ajax(`${BASE_URL}/nation/random_nation`);
const randomFood$ = ajax(`${BASE_URL}/food/random_food`);
// 使用 forkJoin 需等到所有 observable 都取得值時,才會 emit
const forkJoin$ = forkJoin([randomName$, randomNation$, randomFood$]);
const observer = {
next: ([name, nation, food]) => {
console.log(
`${name.response.first_name} is from ${nation.response.capital} and likes to eat ${food.response.dish}.`,
);
},
error: (err) => console.log(err),
complete: () => console.log('All done'),
};
const subscription = forkJoin$.subscribe(observer);
combineLatest
Observable 都有值之後,不論那個 Observable emit value,都會通知。
- 和
forkJoin
類似,combineLatest
也可以就接收Observable[]
作為參數,差別在於 combineLatest 中,只要在每個 observable 都至少有一個 value 後,後續任一個 Observable 只要有 emit value 時,combineLatest 的 subscription 就可以立即收到值,並且以陣列的方式加以通知。 - 每一個 observable 都至少要有 emit 過一次 value,才會觸發後續
combineLatest$
的 emit value,所以從下圖中可以看到,當其中一個 observable 有 emit1
時,combineLatest$
還不會 emit,需要等到每個 observable 都至少 emit 過時,combineLatest$
才會 emit,所以當 emitA
時,combineLatest$
才會 emit1A
:
圖片來源:RxJS Marbles。
要特別留意的是,combineLatest$
中的任何 observable 有 emit value 就會發送通知的前提是:「每一個 observable 都至少要有 emit 過一次 value」。
程式碼範例
combineLatest$
subscription 要在temperatureInputEvent$
和conversionInputEvent$
都有之後才會 emit value
import { combineLatest, fromEvent } from 'rxjs';
/**
* Utils
*/
const temperatureConversion = (temperature: number, conversion: 'f-to-c' | 'c-to-f') => {
let result: number;
if (conversion === 'f-to-c') {
result = ((temperature - 32) * 5) / 9;
} else {
result = (temperature * 9) / 5 + 32;
}
return result;
};
const renderTemperature = (temperature: number, element: HTMLElement) => {
element.innerText = temperature.toFixed(2);
};
/**
* Select DOM Elements
*/
const temperatureInput = document.getElementById('temperature-input');
const conversionDropdown = document.getElementById('conversion-dropdown');
const resultText = document.getElementById('result-text');
/**
* Create Observables
*/
const temperatureInputEvent$ = fromEvent<InputEvent>(temperatureInput, 'input');
const conversionInputEvent$ = fromEvent<InputEvent>(conversionDropdown, 'input');
// 使用 combineLatest,當兩個 observable 都有值時
// 任一個 observable emit value 即會送出通知
const subscription = combineLatest([temperatureInputEvent$, conversionInputEvent$]).subscribe(
([temperatureInputEvent, conversionInputEvent]) => {
const temperature = temperatureConversion(
temperatureInputEvent.target['value'],
conversionInputEvent.target['value'],
);
renderTemperature(temperature, resultText);
},
);
catchError & EMPTY
- catchError 可以取得錯誤訊息後,回傳一個 Observable,這個 Observable emit 的 value 則會以 next 來通知。
- 如果收到 error 後不希望收到任何值,可以使用
EMPTY
這個 Creation Function(Creation Operators) - 即使有 catchError 並回傳 fallback value,但只要 Observable 有 error 發生,該 subscription 依然會被 complete。
圖片來源:RxJS 7 and Observables: Introduction @ Udemy
import { catchError, EMPTY, Observable, Observer, of } from 'rxjs';
const failingHttpRequest$ = new Observable((subscriber) => {
setTimeout(() => {
subscriber.error(new Error('Timeout 1500'));
}, 2000);
setInterval(() => {
subscriber.next(new Date().getTime());
}, 500);
});
const failingHttpRequestWithCatchError$ = failingHttpRequest$.pipe(
// catchError(err => observable$)
// catch 到 error 後,會回傳 Observable(inner observable)
// 該 Observable emit 的 value 會以 next 通知 subscriber
catchError((error) => of('Fallback value')),
);
// EMPTY 被訂閱時不會回傳任何內容,而是直接通知 complete
const failingHttpRequestWithMuteError$ = failingHttpRequest$.pipe(catchError(() => EMPTY));
const observer: Observer<number> = {
next: (value) => console.log(value),
error: (error) => console.log(error),
complete: () => console.log('complete'),
};
const subscription = failingHttpRequestWithCatchError$.subscribe(observer);
Transformation Operators & Flattening Operators
Flattening Operators 有一個重點是,在這個 operator 內,還可以有 inner observable 所建立出來的 inner subscription,且這個 inner observable complete 時,不會終止外部原本的 subscription。
concatMap
、mergeMap
、switchMap
和 exhaustMap
的差別主要在於處理 concurrency 時的差異,也就是說,如果前一個 inner subscription 還沒 complete 的話,要以什麼「模式」來處理:
concatMap
:一次一個,且會等到前一個執行完成(complete)後,才會執行下一個(即,以 queue 的方式進行)。好處是它可以確保 request 和 response 的順序,不會因為前一個 response 比較晚回來,而導致後面的 response 被比較早發出的 response 給覆蓋掉。由於前一個 subscription 沒有完成的話,將不會發動下一次的 subscription,所以如果有 memory leak 的風險是最容易被發現的。switchMap
:不會等前一個 inner subscription 完成,而是會取消前一個 subscription,並總是用最後一次 request 取得的 response。但要留意的是雖然它會取消前一次的 subscription,但並不代表 HTTP request 就不會被送出(因為請求已經發出),只是我們不理 會該次的 response。switchMap 的好處是在像 server 取得資料是,可以確保拿到的是最新一次的 response。mergeMap
:可以同時有多個 subscription 在運作(concurrently),且先回來的 response 就會通知到 main subscription,所以不會確保 request 對應到的 response 順序。如果開發者沒有確保每一個 inner subscription 都被 complete 的話,是最容易導致 memory leak 的。
範例程式碼
下面關於 concatMap、switchMap 和 mergeMap 的範例可以參考連結:
RxJS Flattening Operators @ pjchender stackblitz
concatMap
- 每次點擊都會觸發,但會被排入 queue 不會馬上執行,需要等前一個 complete 後,才會執行下一個
import { fromEvent, map, concatMap, tap, timer } from 'rxjs';
const randomNum = () => Math.floor(Math.random() * 2000);
let timerId = 0;
const fetchButton = document.querySelector('button#fetch');
const handleClick$ = fromEvent<MouseEvent>(fetchButton, 'click').pipe(
map(() => ({
timerId: timerId++,
timeout: randomNum(),
})),
// main subscription 每次收到點擊時都會被觸發
tap(({ timeout, timerId: _timerId }) =>
console.log(`receive click timerId: ${_timerId} (${timeout})`),
),
concatMap(({ timerId: _timerId, timeout: _timeout }) =>
// 當 concatMap 中的 observable(inner subscription)連續 emit value 時
// 會全部進入 cue 中,等前一個 complete 後才會開始執行下一個
timer(_timeout).pipe(
map(() => ({ _timerId, _timeout })),
tap({
next: (value) => console.log('[inner subscription] timerId: ', value),
complete: () => console.log('[inner subscription] complete'),
}),
),
),
);
const subscription = handleClick$.subscribe({
next: (value) => console.log('[main subscription] value: ', value),
error: (err) => console.log('err', err),
complete: () => console.log('[main subscription] complete'),
});
switchMap
- switchMap 內的 observable(inner subscription)被多次觸發時,前一次的如果還沒完成則會被 cancel
- 但對於 main subscription 來說,是沒錯點擊時都會被觸發
import { fromEvent, map, concatMap, tap, timer, switchMap } from 'rxjs';
const randomNum = () => Math.floor(Math.random() * 2000);
let timerId = 0;
const fetchButton = document.querySelector('button#fetch');
const handleClick$ = fromEvent<MouseEvent>(fetchButton, 'click').pipe(
map(() => ({
timerId: timerId++,
timeout: randomNum(),
})),
// main subscription 每次收到點擊時都會被觸發
tap(({ timeout, timerId: _timerId }) =>
console.log(`receive click timerId: ${_timerId} (${timeout})`),
),
switchMap(({ timerId: _timerId, timeout: _timeout }) =>
// switchMap 中的 observable(inner subscription)連續 emit value 時
// 前一次的 timer 會被 cancel
timer(_timeout).pipe(
map(() => ({ _timerId, _timeout })),
tap({
next: (value) => console.log('[inner subscription] timerId: ', value),
complete: () => console.log('[inner subscription] complete'),
}),
),
),
);
const subscription = handleClick$.subscribe({
next: (value) => console.log('[main subscription] value: ', value),
error: (err) => console.log('err', err),
complete: () => console.log('[main subscription] complete'),
});
mergeMap
以併發的方式發出請求,點幾次就會發生幾次,且不會確保資料回來的順序**,先完成的就先往後通知**:
import { fromEvent, map, concatMap, tap, timer, switchMap, mergeMap } from 'rxjs';
const randomNum = () => Math.floor(Math.random() * 2000);
let timerId = 0;
const fetchButton = document.querySelector('button#fetch');
const handleClick$ = fromEvent<MouseEvent>(fetchButton, 'click').pipe(
map(() => ({
timerId: timerId++,
timeout: randomNum(),
})),
// main subscription 每次收到點擊時都會被觸發
tap(({ timeout, timerId: _timerId }) =>
console.log(`receive click timerId: ${_timerId} (${timeout})`),
),
mergeMap(({ timerId: _timerId, timeout: _timeout }) =>
// 當 mergeMap 中的 observable(inner subscription)連續 emit value 時
// 會以併發的方式發出,且先 complete
timer(_timeout).pipe(
map(() => ({ _timerId, _timeout })),
tap({
next: (value) => console.log('[inner subscription] timerId: ', value),
complete: () => console.log('[inner subscription] complete'),
}),
),
),
);
const subscription = handleClick$.subscribe({
next: (value) => console.log('[main subscription] value: ', value),
error: (err) => console.log('err', err),
complete: () => console.log('[main subscription] complete'),
});
以實際的 API Request 做說明
當 concatMap 內部的 observable 結束時,它的 complete 通知只會在內部的 subscription,並不會傳到傳到外面(原本)的 subscription,如此原本的 subscription 才不會被終止。可 以看到下圖中灰色的部分是屬於 concatMap 內的 observable,當該 observable complete 時,並不會傳到外部的 subscription:
然而,如果原本的 observable 發生 error 時,這個 error 的發生會終止 main(outer) subscription:
如果我們希望在發生錯誤後,main subscription 還能持續運作的話,需要在 inner subscription 中使用 catchError 讓它在 inner subscription 被終止:
// concatMap(() => observable$)
const handleClick$ = fromEvent<MouseEvent>(fetchButton, 'click').pipe(
map(() => endpointInput.value),
concatMap((value) =>
ajax(`${BASE_URL}/${value}/random_${value}`).pipe(
// 如果我們希望在發生錯誤後,main subscription 還能持續運作
// 則要在 inner subscription 中使用 catchError 與 EMPTY
// 讓它在 inner subscription 被 complete 而不會把 error 傳到外層的 main subscription
catchError(() => EMPTY),
),
),
map((ajaxResp) => ajaxResp.response),
// catchError(() => EMPTY),
);
const subscription = handleClick$.subscribe({
next: console.log,
error: (err) => console.log('err', err),
complete: () => console.log('complete'),
});
如此,即使 ajax 發生錯誤後,main subscription 仍然可以持續運作。如下圖所示:
然而,如果不是在 inner subscription 使用 catchError 而是在 main subscription 的話,當 error 發生時,main subscription 仍然會被終止:
const handleClick$ = fromEvent<MouseEvent>(fetchButton, 'click').pipe(
map(() => endpointInput.value),
concatMap((value) => ajax(`${BASE_URL}/${value}/random_${value}`).pipe(catchError(() => EMPTY))),
map((ajaxResp) => ajaxResp.response),
// 如果是在 main subscription 的話
// 當 error 發生時 EMPTY 會通知 complete 到 main subscription
// 進而終止 main subscription
catchError(() => EMPTY),
);
如圖:
圖片來源:RxJS 7 and Observables: Introduction @ Udemy
import { concatMap, Observable, of, tap } from 'rxjs';
const source$ = new Observable((subscriber) => {
setTimeout(() => subscriber.next('A'), 1000);
setTimeout(() => subscriber.next('B'), 2000);
});
console.log('App has started');
source$
.pipe(
concatMap((value) =>
// here is an inner subscription
of(value).pipe(
tap({
next: (value) => console.log(`[inner] Value: ${value}`),
complete: () => console.log('[inner] Complete'),
}),
),
),
tap({
next: (value) => console.log(`Value: ${value}`),
complete: () => console.log('Complete'),
}),
)
.subscribe((value) => console.log(value));
Subject
透過 Subject,可以使用群播(multicasting)的方式,同時通知多個訂閱者。
Subject 同時會是 Observable 和 Observer:
- Observable:它可以被訂閱
- Observer:它可以呼叫
next()
、error()
和complete()
圖片來源:RxJS 7 and Observables: Introduction @ Udemy