Skip to main content

[note] RxJS 筆記

本篇內容大量整理自 RxJS 7 and Observables: Introduction @ Udemy。另外,由於 Operators 的種類眾多,建議可以搭配 Learn RxJSRxJS Guide 學習。

TL;DR;

Observable 會發送資料給 Observer,但它只管傳遞資料,並不會去管 Subscribers 拿資料去做什麼。

import { Observable, Observer } from 'rxjs';
const observable$ = new Observable()<string>((subscriber) => {
// Next
subscriber.next('foo');

// Error
subscriber.error(new Error('some error occurred'));

// Complete
subscriber.complete();

// Teardown
return () => {
console.log('observable teardown');
};
});

const observer: Observer<string> = {
next: (value: string) => console.log(value),
complete: () => console.log('complete'),
error: (err: Error) => console.log(err.message),
};

const subscription = observable$.subscribe(observer);
// 如果只需要 next handler 可以在參數的地方直接帶入 function 而非帶入物件
// const subscriptionWithOnlyNextHandler = observable$.subscribe((value: string) =>
// console.log(value),
// );

// 如果對於 Observable 提供的值不在感興趣,要 unsubscribe
subscription.unsubscribe();

常用的 Transformation Operator:

  • concatMap:一次一個,依序執行,前一個完成後下一個才會接著執行。是最安全的方式,可以避免 memory leak,前一次的 observable 被取消,或沒有依照 request 的順序處理 response,但也因此可能比較沒效率。
  • switchMap:當 inner observable 有新的 emit value 是,會取消(cancel)前一次的 inner subscription,也就是說,一次只會有一個最新的 inner subscription 在進行,總是會拿到最新的順序。
  • mergeMap:以併發的方式執行多個 inner subscription,且不會確保 response 被處理的順序,先 complete 的 inner subscription 會先往後 notify。

重要概念

Observables

Array vs Stream

  • array 在一開始就會被定義好裡面有哪些資料,開發者對於 array 中有哪些元素存在可以一目瞭然
  • stream 中的項目(items)則可能會在不同的時間點產生,stream 有時間的概念在內,開發者不知道下一個時間點會得到什麼資料,但可以先寫好當某個項目出現是對應要執行的程式邏輯,這也是 reactive programming 的概念
  • 在 JavaScript 中的許多事件都可以用 stream 的角度來思考,例如使用者輸入資料時,我們不知道下個時間點會得到什麼資料,但可以先把當資料進來後的邏輯寫好;HTTP Response 的事件等等也是類似的概念。

Observable, Subscription and Observer

  • Observable:本身不會做任何事,它只是保存了許多邏輯
    • Notification 包含
      • next:訂閱期間收到 value 都會被觸發,沒有觸發的次數限制
      • error:表示 Observable 有 error 發生且不會再 emit 任何 value,因此觸發一次後即結束訂閱,可以從 payload 取得 error 的資料
      • complete:表示不會再 emit 任何 value,因此觸發一次後即結束訂閱,沒有可以帶資料的 payload
    • Observable 中所定義的 callback 會當這個 Observable 被 subscribe() 時才會執行
  • Observer:定義收到 Observable 傳來的 value 時所要執行的行為
  • Subscription:透過 subscribe 的方式,讓 Observable 中 emitted value 被 Observer 所執行,如此才會實際執行 observable 中的 callback function,並且讓 observer 可以接收到 observable 所通知的值

整個執行的流程會像這樣:

  • 當 Observable 被訂閱(subscribe)時,會建立起一個 Subscription,並執行 Observable 中的 callback,
  • 需要留意的是,每次 Observable 中的 subscriber.next() 被呼叫時,Observer 中就會立即收到 emitted 的 value 並加以執行,而不是等到所有 Observable 中的內容執行完後才開始執行 Observer 的內容
tip

可以把 observer 中定義的 function 想成會在 observable 中 subscriber 的位置執行。

如果我們在裡面放一些非同步的邏輯:

import { Observable } from 'rxjs';

const observable$ = new Observable<string>((subscriber) => {
console.log('[Observable] executed');

subscriber.next('Hello');

setTimeout(() => subscriber.next('World'), 1000);

console.log('---');

setTimeout(() => {
subscriber.next('with RxJS');

// Complete
subscriber.complete();
}, 2000);

console.log('[Observable] finished');

// Teardown
return () => {
console.log('[Observable] unsubscribed (Teardown)');
};
});

const observer = {
next: (value: string) => console.log(value),
complete: () => console.log('[subscription] completed'),
};

console.log('App start');
const subscription = observable$.subscribe(observer);

可以思考一下,最終 console.log 的結果會是:

App start
[Observable] executed
Hello
---
[Observable] finished
World
with RxJS
[subscription] completed
[Observable] unsubscribed (Teardown)

但若我們在最後直接呼叫 subscription.unsubscribe(); 的話,則所有非同步的操作都不會被執行。

tip

subscribing 就像以 observer 爲參數去呼叫 Observable 中的函式,所以即使我們 subscribe 相同的 Observable 很多次,但每一個 subscription 都是獨立的 Observable 被執行。

Cold and Hot Observable

  • Cold Observable:每一個 subscription 間取得的值都是各自獨立時,就是屬於 cold observable
    • emit value 的來源會是在 observable 內部
    • 每一個 subscription 會獲取不同的值
    • 例如,HTTP Request、setInterval/setTimeout
  • Hot Observable:每一個 subscriptions 間共享相同的來源,並獲取相同的值時,就屬於 hot observable
    • emit value 的來源會是在 observable 外部,從這個相同的來源 multicast 資料
    • 不同的 subscription 即使在不同時間訂閱 observable,接受到通知時還是會得到相同的值
    • 例如,click event 發生時,多個不同的 subscription 都會被通知,它們的來源都是相同的。這種情況下,emit value 的來源會來自 Observable 外的邏輯(DOM 的 click event 本身並不在 Observable 內)
  • Observable 並不是非 Hot 即 Cold,有的 Observable 同時屬於 Code 和 Hot;有的一開始時 Cold 後來變成 Hot。

Cold Observable

/**
* Code Observable
**/
import { ajax } from 'rxjs/ajax';

// ajax$ 是一個 observable
const ajax$ = ajax<any>('https://random-data-api.com/api/name/random_name');

// 每一個 subscriptions 間取得的值都是各自獨立時,就是屬於 cold observable
// 這裡每個 subscription 拿到的 first_name 的值都會不同
ajax$.subscribe((data) => console.log('Sub 1: ', data.response.first_name));
ajax$.subscribe((data) => console.log('Sub 2: ', data.response.first_name));
ajax$.subscribe((data) => console.log('Sub 3: ', data.response.first_name));

Hot Observable

/**
* Hot Observable
**/
import { Observable } from 'rxjs';

const helloButton = document.getElementById('hello-btn');

const helloBtnClick$ = new Observable<MouseEvent>((subscriber) => {
helloButton.addEventListener('click', (e) => {
subscriber.next(e);
});
});

// 每一個 subscriptions 間會共享相同的來源,取得相同的值時,就屬於 hot observable
// 這裡每個 subscription 取得的值都來自 helloButton 的 click event,它們收到的值都會是相同的
helloBtnClick$.subscribe((e) => console.log('sub1 ', e.type, e.x, e.y));
helloBtnClick$.subscribe((e) => console.log('sub2 ', e.type, e.x, e.y));
helloBtnClick$.subscribe((e) => console.log('sub3 ', e.type, e.x, e.y));

Subscription LifeCycle

圖片來源:RxJS 7 and Observables: Introduction @ Udemy

Subscription LifeCycle

如果有監聽事件或使用 setInterval 等計數器,記得要在 teardown 時把它解除,否則它會持續執行,甚至導致 memory leak:

import { Observable, Observer } from 'rxjs';
const observable$ = new Observable<string>((subscriber) => {
let timerId;

setInterval(() => {
timerId = subscriber.next('foo');
}, 1000);

// Teardown
return () => {
clearInterval(timerId);
console.log('observable teardown');
};
});

const observer: Observer<string> = {
next: (value: string) => console.log(value),
complete: () => console.log('complete'),
error: (err: Error) => console.log(err.message),
};

const subscription = observable$.subscribe(observer);

// 如果對於 Observable 提供的值不在感興趣,要 unsubscribe
setTimeout(() => {
subscription.unsubscribe();
}, 3000);

Operators

Type of Operators

當 Observable 傳來資料後,在資料傳遞給 Observer 之前,可以透過 Operators 針對資料進行處理。Operators 可以分成幾類:

  • Creation operators:可以將幾乎所有東西變成 observable。常見的像是 of, from, fromEvent, timer, interval
  • Combination operators:可以一次收集來自多個 observable 的資料。常見的像是 combineLatest, concat, merge, startWithwithLatestFrom
  • Error handling operators:用來處理錯誤,重試請求等。最常見的是 catchError
  • Filtering operators:用來過濾資料。例如 debounceTime, distinctUntilChanged, filter, take, takeUntil
  • Multicasting operators:預設的情況下,RxJS 的 observable 是 cold / unicast 的,也就是一個 observable 只會有一個 subscriber。透過 multicasting operators 可以讓 observable 變成 hot / multicasting 的,可以將資料傳給多個 subscribers。最常見的是 shareReplay
  • Transformation operators:用來轉換資料,常見的如 concatMap, map, mergeMap, scanswitchMap

Creation Functions / Creation Operators

Creation Functions 提供了方便建立 Observable 的方法。

of

emit 一系列的值後 complete,會以一個一個參數的方式帶入資料:

import { of } from 'rxjs';

//emits any number of provided values in sequence
const numbers$ = of(1, 3, 6);

const observer = {
next: (value) => console.log(value),
complete: () => console.log('complete'),
};

const subscription = numbers$.subscribe(observer);

如果不用 of operator,而是自己建立 observable 的話,寫起來就會像這樣:

// 等同於 const numbers$ = of(1, 3, 6);
const numbers$ = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(3);
subscriber.next(6);
subscriber.complete();
});

如果知道 Observable 是怎麼被建立的邏輯後,我們也可自己建立成 Creation Function,例如 myOf

const myOf = (...args: number[]): Observable<number> => {
return new Observable((observer) => {
args.forEach((arg) => observer.next(arg));
observer.complete();
});
};

// 等同於 const numbers$ = of(1, 3, 6);
const numbers$ = myOf(1, 3, 6);

from

帶入陣列作為參數

from 在帶入的參數是陣列時,它的效果和 of 的效果幾乎相同,差別只在於 from 是以一個「陣列」的方式當作參數傳入,而 of 是以多個參數的方式傳入。

import { from } from 'rxjs';

// emit array as a sequence of values
const numbers$ = from([1, 3, 6]);

const observer = {
next: (value: number) => console.log(value),
complete: () => console.log('complete'),
};

const subscription = numbers$.subscribe(observer);

帶入 Promise 作為參數

from 也可以傳入 Promise,是直接傳入 Promise,而不是 Promise[],from 會自動把

  • resolve 的值帶入 next(),並呼叫 complete
  • 或是把 reject 的值帶入 error() 後終止(不會執行到 complete):
import { Observable, from } from 'rxjs';

const sleep = (ms: number) =>
new Promise<number>((resolve) => setTimeout(() => resolve(ms), ms));

// 直接在 from 中帶入 Promise 而不是 Promise[]
const promise$ = from(sleep(1000));

const observer = {
next: (value) => console.log(value),
complete: () => console.log('complete'),
};

const subscription = promise$.subscribe(observer);

fromEvent

透過 fromEvent creation function 可以根據不同的 event 來源建立 Observable:

  • 在執行 subscribe / unsubscribe 時,RxJS 會分別執行對應的 addEventListener / removeEventListener
  • 不會執行 complete,需要自行呼叫 unsubscribe
  • Hot Observable
// fromEvent(element, 'click')
import { fromEvent } from 'rxjs';

const triggerButton = document.getElementById('hello-btn');

const clickEvent$ = fromEvent<MouseEvent>(triggerButton, 'click');

const subscription = clickEvent$.subscribe((event) =>
console.log(event.type, event.x, event.y),
);

同樣的,如果沒用 fromEvent 的話,我們也可以寫出類似的 Observable:

// 等同於
// const clickEvent$ = fromEvent<MouseEvent>(triggerButton, 'click');
const clickEvent$ = new Observable<MouseEvent>((subscriber) => {
const handler = (e: MouseEvent) => subscriber.next(e);

triggerButton.addEventListener('click', handler);

return () => triggerButton.removeEventListener('click', handler);
});

timer

  • 對應到的是 setTimeout
  • 時間到時會呼叫 complete,因此可以不用手動 unsubscribe
  • Cold Observable
// timer(dueTime: number | Date = 0)
import { timer, Observer } from 'rxjs';

const timer$ = timer(2000);

const observer: Observer<number> = {
next: (value) => console.log(value),
complete: () => console.log('completed !'),
error: () => console.log('error !'),
};

const subscription = timer$.subscribe(observer);

同樣的,如果不使用 RxJS 提供的 timer,也可以自己寫一個類似的 myTimer:

const myTimer = (ms) => {
return new Observable((subscriber) => {
const timerId = setTimeout(() => {
subscriber.next(0);
subscriber.complete();
return;
}, ms);

return () => clearTimeout(timerId);
});
};

interval

  • 對應到 setInterval
  • 需要主動呼叫 subscription.unsubscribe() 來結束它(且結束時 complete 不會被呼叫)
  • Cold Observable
// interval(period: number = 0)
import { interval, Observer, Observable } from 'rxjs';

const interval$ = interval(1000);

const observer: Observer<number> = {
next: (value) => console.log(value),
complete: () => console.log('completed !'),
error: () => console.log('error !'),
};

const subscription = interval$.subscribe(observer);

// complete 不會被呼叫
subscription.unsubscribe();

同樣的,可以寫出自己的 interval creation function,這裡稱作 myInterval

const myInterval = (ms) => {
return new Observable((subscriber) => {
let count = 0;
const timerId = setInterval(() => {
subscriber.next(count++);
return;
}, ms);

return () => clearTimeout(timerId);
});
};

Pipeable Operators 的概念

透過 Pipeable Operators,可以讓 Observable 在 emit value 後,該 notification 在抵達 Observer 之前,先透過 pipeable operator 進行一些轉換:

RxJS

圖片來源:RxJS 7 and Observables: Introduction @ Udemy

具體來說,Pipeable Operators 和原本的 Observable Source 間,組成了一個新的 Observable 被 Observer 所訂閱:

NewObservable$ = OriginalObservable$ + Pipeable Operator;

經過轉換後的得到的值或被通知到的時間點可能和一開始的 source 間有明顯的差異。

tip

Pipeable Operators 會和原本的 Observable 結合後,變成新的 Observable 可供訂閱。

filter

filter operator 可以過濾掉 next() 傳出的值,但如果是 error()complete() 則不會做任何過濾。

import { filter, Observable, Observer } from 'rxjs';

interface NewsItem {
category: 'Business' | 'Sports';
content: string;
}

const newsFeed$ = new Observable<NewsItem>((subscriber) => {
setTimeout(() => subscriber.next({ category: 'Business', content: 'A' }), 1000);
setTimeout(() => subscriber.next({ category: 'Sports', content: 'B' }), 2000);
setTimeout(() => subscriber.next({ category: 'Business', content: 'C' }), 3000);
setTimeout(() => subscriber.next({ category: 'Sports', content: 'D' }), 4000);
setTimeout(() => subscriber.next({ category: 'Business', content: 'E' }), 5000);
});

// 原本的 Observable + Pipeable Observable = 新的 Observable
// 使用 filter operator 後,可以產生新的 Observable
const businessNewsFeed$ = newsFeed$.pipe(filter((item) => item.category === 'Business'));
const sportsNewsFeed$ = newsFeed$.pipe(filter((item) => item.category === 'Sports'));

const subscription = businessNewsFeed$.subscribe((value) => console.log(value));

map

map operator 會針對 next() 收到的值進行處理,但不會管 error()complete() 的內容。

import { forkJoin, map } from 'rxjs';

import { ajax } from 'rxjs/ajax';

const randomName$ = ajax<any>('https://random-data-api.com/api/name/random_name');
const randomFirstName$ = randomName$.pipe(
map((ajaxResponse) => ajaxResponse.response.first_name),
);

const randomNation$ = ajax<any>('https://random-data-api.com/api/nation/random_nation');
const randomNationCapital$ = randomNation$.pipe(
map((ajaxResponse) => ajaxResponse.response.capital),
);

const randomFood$ = ajax<any>('https://random-data-api.com/api/food/random_food');
const randomFoodDish$ = randomFood$.pipe(
map((ajaxResponse) => ajaxResponse.response.dish),
);

forkJoin([randomFirstName$, randomNationCapital$, randomFoodDish$]).subscribe(
([firstName, capital, dish]) =>
console.log(`${firstName} is from ${capital} and likes to eat ${dish}.`),
);

tap / do

在 tap 中可以使用帶有 side effect 的操作。

tap 接收的參數和 subscribe() 很像,參數可以直接是 function 會取得 next 傳來的參數,也可以是 object,放入的是 observer({ next, error, complete }),差別在於它是 pipeable operator,如果該 observable 最終沒有被 subscribe() 的話,該 Observable 不會被執行:

// https://www.learnrxjs.io/learn-rxjs/operators/utility/do#examples
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';

const subscription = of(1, 2, 3, 4, 5)
.pipe(
tap((val) => console.log(`BEFORE MAP: ${val}`)),
map((val) => val + 10),
tap((val) => console.log(`AFTER MAP: ${val}`)),
)
.subscribe(console.log);

可以接收 Observer 物件當作參數,tap({next, error, complete})

import { filter, map, of, tap } from 'rxjs';

of(1, 7, 3, 6, 2)
.pipe(
map((value) => value * 2),
tap({
next: (value) => console.log('tap next: ', value),
complete: () => console.log('tap complete'),
error: (err) => console.log('tap error', err),
}),
filter((value) => value > 5),
)
.subscribe((value) => console.log('receive: ', value));

另外在 RxJS 7.3.0 之後,tap 還多了幾個屬性可以使用:

  • subscribe:subscription 被建立時
  • unsubscribe:subscription 被 unsubscribe 時,但收到 error 或 complete 時不會觸發
  • finalize:subscription 結束時,不論是 unsubscribe、error 或 complete 都會觸發
import { filter, map, of, tap } from 'rxjs';

of(1, 7, 3, 6, 2)
.pipe(
map((value) => value * 2),
tap({
next: (value) => console.log('tap next: ', value),
complete: () => console.log('tap complete'),
error: (err) => console.log('tap error', err),
}),
filter((value) => value > 5),
tap({
subscribe: () => console.log('subscription made'),
unsubscribe: () => console.log('subscription unsubscribed'),
finalize: () => console.log('subscription finalized'),
}),
)
.subscribe((value) => console.log('receive: ', value));

debounceTime

超過一段時間沒有 emit value 後,才會實際發出 notification。例如,debounceTime(1000) 指的是該 Observable 在 1000ms 內沒有 emit 新的值的話,才會把該 value 通知給 observer:

import { debounceTime, fromEvent, map, tap } from 'rxjs';

const sliderInput = document.querySelector('input#slider');

const onSliderChange$ = fromEvent(sliderInput, 'input');

// 在一秒內如果沒有 emit 新的 value 的話,才會通知 onSlideChangeWithDebounce$ 的 subscriber
const onSlideChangeWithDebounce$ = onSliderChange$.pipe(
// 每一次 input 都會觸發
tap(() => console.log('slider changed')),

// 在一秒內如果沒有 emit 新的 value 的話,才會往後面的 pipeline 走
debounceTime(1000),
map((event) => event.target['value']),
);

const subscription = onSlideChangeWithDebounce$.subscribe((event) => console.log(event));

withLatestFrom

會跟著一個主要的 Observable(例如 click),但需要在另一個 Observable 有值的前提下,才會通知 subscriber。

withLatestFrom

// https://rxjs.dev/api/operators/withLatestFrom
import { fromEvent, interval } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const timer = interval(1000);
const result = clicks.pipe(withLatestFrom(timer));
result.subscribe((x) => console.log(x));

Combination Operators

forkJoin:類似 Promise.all

  • 接受其他的 Observable 作為來源(Observable[]),當這個 observable 被訂閱時,它會為裡面每一個 observable 建立 subscription,直到所有的 observable 都取得結果後,會以陣列的方式回傳每一個 observable emit 的值,此外,只有有其中一個 observable 呼叫了 error()forkJoin$ 的 subscription 就會立即結束並取得錯誤,整體的行為很類似 Promise.all()
  • 有一點可以留意的是,即使 forkJoin$ 的 subscription 是以為 error 而提前結束,每一個裡面的 observable 依然會執行的 teardown 的邏輯(這點可以留意 Subscription LifeCycle 的圖),透過這個方式,可以讓其中一個 request 失敗時,就取消所有其他的 observable。
  • Cold Observable
import { forkJoin } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const BASE_URL = 'https://random-data-api.com/api';

// 建立多個 observable
const randomName$ = ajax(`${BASE_URL}/name/random_name`);
const randomNation$ = ajax(`${BASE_URL}/nation/random_nation`);
const randomFood$ = ajax(`${BASE_URL}/food/random_food`);

// 使用 forkJoin 需等到所有 observable 都取得值時,才會 emit
const forkJoin$ = forkJoin([randomName$, randomNation$, randomFood$]);

const observer = {
next: ([name, nation, food]) => {
console.log(
`${name.response.first_name} is from ${nation.response.capital} and likes to eat ${food.response.dish}.`,
);
},
error: (err) => console.log(err),
complete: () => console.log('All done'),
};

const subscription = forkJoin$.subscribe(observer);

combineLatest

Observable 都有值之後,不論那個 Observable emit value,都會通知。

  • forkJoin 類似,combineLatest 也可以就接收 Observable[] 作為參數,差別在於 combineLatest 中,只要在每個 observable 都至少有一個 value 後,後續任一個 Observable 只要有 emit value 時,combineLatest 的 subscription 就可以立即收到值,並且以陣列的方式加以通知。
  • 每一個 observable 都至少要有 emit 過一次 value,才會觸發後續 combineLatest$ 的 emit value,所以從下圖中可以看到,當其中一個 observable 有 emit 1 時,combineLatest$ 還不會 emit,需要等到每個 observable 都至少 emit 過時,combineLatest$ 才會 emit,所以當 emit A 時,combineLatest$ 才會 emit 1A

combineLatest

圖片來源:RxJS Marbles

caution

要特別留意的是,combineLatest$ 中的任何 observable 有 emit value 就會發送通知的前提是:「每一個 observable 都至少要有 emit 過一次 value」。

程式碼範例

  • combineLatest$ subscription 要在 temperatureInputEvent$conversionInputEvent$ 都有之後才會 emit value
import { combineLatest, fromEvent } from 'rxjs';

/**
* Utils
*/
const temperatureConversion = (temperature: number, conversion: 'f-to-c' | 'c-to-f') => {
let result: number;
if (conversion === 'f-to-c') {
result = ((temperature - 32) * 5) / 9;
} else {
result = (temperature * 9) / 5 + 32;
}
return result;
};

const renderTemperature = (temperature: number, element: HTMLElement) => {
element.innerText = temperature.toFixed(2);
};

/**
* Select DOM Elements
*/
const temperatureInput = document.getElementById('temperature-input');
const conversionDropdown = document.getElementById('conversion-dropdown');
const resultText = document.getElementById('result-text');

/**
* Create Observables
*/
const temperatureInputEvent$ = fromEvent<InputEvent>(temperatureInput, 'input');
const conversionInputEvent$ = fromEvent<InputEvent>(conversionDropdown, 'input');

// 使用 combineLatest,當兩個 observable 都有值時
// 任一個 observable emit value 即會送出通知
const subscription = combineLatest([
temperatureInputEvent$,
conversionInputEvent$,
]).subscribe(([temperatureInputEvent, conversionInputEvent]) => {
const temperature = temperatureConversion(
temperatureInputEvent.target['value'],
conversionInputEvent.target['value'],
);

renderTemperature(temperature, resultText);
});

catchError & EMPTY

  • catchError 可以取得錯誤訊息後,回傳一個 Observable,這個 Observable emit 的 value 則會以 next 來通知。
  • 如果收到 error 後不希望收到任何值,可以使用 EMPTY 這個 Creation Function(Creation Operators)
  • 即使有 catchError 並回傳 fallback value,但只要 Observable 有 error 發生,該 subscription 依然會被 complete

rxjs catchError

圖片來源:RxJS 7 and Observables: Introduction @ Udemy

import { catchError, EMPTY, Observable, Observer, of } from 'rxjs';

const failingHttpRequest$ = new Observable((subscriber) => {
setTimeout(() => {
subscriber.error(new Error('Timeout 1500'));
}, 2000);

setInterval(() => {
subscriber.next(new Date().getTime());
}, 500);
});

const failingHttpRequestWithCatchError$ = failingHttpRequest$.pipe(
// catchError(err => observable$)
// catch 到 error 後,會回傳 Observable(inner observable)
// 該 Observable emit 的 value 會以 next 通知 subscriber
catchError((error) => of('Fallback value')),
);

// EMPTY 被訂閱時不會回傳任何內容,而是直接通知 complete
const failingHttpRequestWithMuteError$ = failingHttpRequest$.pipe(
catchError(() => EMPTY),
);

const observer: Observer<number> = {
next: (value) => console.log(value),
error: (error) => console.log(error),
complete: () => console.log('complete'),
};

const subscription = failingHttpRequestWithCatchError$.subscribe(observer);

Transformation Operators & Flattening Operators

Flattening Operators 有一個重點是,在這個 operator 內,還可以有 inner observable 所建立出來的 inner subscription,且這個 inner observable complete 時,不會終止外部原本的 subscription。

concatMapmergeMapswitchMapexhaustMap 的差別主要在於處理 concurrency 時的差異,也就是說,如果前一個 inner subscription 還沒 complete 的話,要以什麼「模式」來處理:

  • concatMap:一次一個,且會等到前一個執行完成(complete)後,才會執行下一個(即,以 queue 的方式進行)。好處是它可以確保 request 和 response 的順序,不會因為前一個 response 比較晚回來,而導致後面的 response 被比較早發出的 response 給覆蓋掉。由於前一個 subscription 沒有完成的話,將不會發動下一次的 subscription,所以如果有 memory leak 的風險是最容易被發現的。
  • switchMap:不會等前一個 inner subscription 完成,而是會取消前一個 subscription,並總是用最後一次 request 取得的 response。但要留意的是雖然它會取消前一次的 subscription,但並不代表 HTTP request 就不會被送出(因為請求已經發出),只是我們不理 會該次的 response。switchMap 的好處是在像 server 取得資料是,可以確保拿到的是最新一次的 response。
  • mergeMap:可以同時有多個 subscription 在運作(concurrently),且先回來的 response 就會通知到 main subscription,所以不會確保 request 對應到的 response 順序。如果開發者沒有確保每一個 inner subscription 都被 complete 的話,是最容易導致 memory leak 的。

範例程式碼

下面關於 concatMap、switchMap 和 mergeMap 的範例可以參考連結:

RxJS Flattening Operators @ pjchender stackblitz

concatMap

  • 每次點擊都會觸發,但會被排入 queue 不會馬上執行,需要等前一個 complete 後,才會執行下一個

concatMap

import { fromEvent, map, concatMap, tap, timer } from 'rxjs';

const randomNum = () => Math.floor(Math.random() * 2000);

let timerId = 0;

const fetchButton = document.querySelector('button#fetch');

const handleClick$ = fromEvent<MouseEvent>(fetchButton, 'click').pipe(
map(() => ({
timerId: timerId++,
timeout: randomNum(),
})),

// main subscription 每次收到點擊時都會被觸發
tap(({ timeout, timerId: _timerId }) =>
console.log(`receive click timerId: ${_timerId} (${timeout})`),
),

concatMap(({ timerId: _timerId, timeout: _timeout }) =>
// 當 concatMap 中的 observable(inner subscription)連續 emit value 時
// 會全部進入 cue 中,等前一個 complete 後才會開始執行下一個
timer(_timeout).pipe(
map(() => ({ _timerId, _timeout })),
tap({
next: (value) => console.log('[inner subscription] timerId: ', value),
complete: () => console.log('[inner subscription] complete'),
}),
),
),
);

const subscription = handleClick$.subscribe({
next: (value) => console.log('[main subscription] value: ', value),
error: (err) => console.log('err', err),
complete: () => console.log('[main subscription] complete'),
});

switchMap

switchMap

  • switchMap 內的 observable(inner subscription)被多次觸發時,前一次的如果還沒完成則會被 cancel
  • 但對於 main subscription 來說,是沒錯點擊時都會被觸發
import { fromEvent, map, concatMap, tap, timer, switchMap } from 'rxjs';

const randomNum = () => Math.floor(Math.random() * 2000);

let timerId = 0;

const fetchButton = document.querySelector('button#fetch');

const handleClick$ = fromEvent<MouseEvent>(fetchButton, 'click').pipe(
map(() => ({
timerId: timerId++,
timeout: randomNum(),
})),

// main subscription 每次收到點擊時都會被觸發
tap(({ timeout, timerId: _timerId }) =>
console.log(`receive click timerId: ${_timerId} (${timeout})`),
),

switchMap(({ timerId: _timerId, timeout: _timeout }) =>
// switchMap 中的 observable(inner subscription)連續 emit value 時
// 前一次的 timer 會被 cancel
timer(_timeout).pipe(
map(() => ({ _timerId, _timeout })),
tap({
next: (value) => console.log('[inner subscription] timerId: ', value),
complete: () => console.log('[inner subscription] complete'),
}),
),
),
);

const subscription = handleClick$.subscribe({
next: (value) => console.log('[main subscription] value: ', value),
error: (err) => console.log('err', err),
complete: () => console.log('[main subscription] complete'),
});

mergeMap

以併發的方式發出請求,點幾次就會發生幾次,且不會確保資料回來的順序,先完成的就先往後通知

mergeMap

import { fromEvent, map, concatMap, tap, timer, switchMap, mergeMap } from 'rxjs';

const randomNum = () => Math.floor(Math.random() * 2000);

let timerId = 0;

const fetchButton = document.querySelector('button#fetch');

const handleClick$ = fromEvent<MouseEvent>(fetchButton, 'click').pipe(
map(() => ({
timerId: timerId++,
timeout: randomNum(),
})),

// main subscription 每次收到點擊時都會被觸發
tap(({ timeout, timerId: _timerId }) =>
console.log(`receive click timerId: ${_timerId} (${timeout})`),
),

mergeMap(({ timerId: _timerId, timeout: _timeout }) =>
// 當 mergeMap 中的 observable(inner subscription)連續 emit value 時
// 會以併發的方式發出,且先 complete
timer(_timeout).pipe(
map(() => ({ _timerId, _timeout })),
tap({
next: (value) => console.log('[inner subscription] timerId: ', value),
complete: () => console.log('[inner subscription] complete'),
}),
),
),
);

const subscription = handleClick$.subscribe({
next: (value) => console.log('[main subscription] value: ', value),
error: (err) => console.log('err', err),
complete: () => console.log('[main subscription] complete'),
});

以實際的 API Request 做說明

當 concatMap 內部的 observable 結束時,它的 complete 通知只會在內部的 subscription,並不會傳到傳到外面(原本)的 subscription,如此原本的 subscription 才不會被終止。可以看到下圖中灰色的部分是屬於 concatMap 內的 observable,當該 observable complete 時,並不會傳到外部的 subscription:

Flattening Operators

然而,如果原本的 observable 發生 error 時,這個 error 的發生會終止 main(outer) subscription:

handle error in Flattening operators

如果我們希望在發生錯誤後,main subscription 還能持續運作的話,需要在 inner subscription 中使用 catchError 讓它在 inner subscription 被終止:

// concatMap(() => observable$)

const handleClick$ = fromEvent<MouseEvent>(fetchButton, 'click').pipe(
map(() => endpointInput.value),
concatMap((value) =>
ajax(`${BASE_URL}/${value}/random_${value}`).pipe(
// 如果我們希望在發生錯誤後,main subscription 還能持續運作
// 則要在 inner subscription 中使用 catchError 與 EMPTY
// 讓它在 inner subscription 被 complete 而不會把 error 傳到外層的 main subscription
catchError(() => EMPTY),
),
),
map((ajaxResp) => ajaxResp.response),
// catchError(() => EMPTY),
);

const subscription = handleClick$.subscribe({
next: console.log,
error: (err) => console.log('err', err),
complete: () => console.log('complete'),
});

如此,即使 ajax 發生錯誤後,main subscription 仍然可以持續運作。如下圖所示:

Flattening operators with catch error

然而,如果不是在 inner subscription 使用 catchError 而是在 main subscription 的話,當 error 發生時,main subscription 仍然會被終止:

const handleClick$ = fromEvent<MouseEvent>(fetchButton, 'click').pipe(
map(() => endpointInput.value),
concatMap((value) =>
ajax(`${BASE_URL}/${value}/random_${value}`).pipe(catchError(() => EMPTY)),
),
map((ajaxResp) => ajaxResp.response),
// 如果是在 main subscription 的話
// 當 error 發生時 EMPTY 會通知 complete 到 main subscription
// 進而終止 main subscription
catchError(() => EMPTY),
);

如圖:

Flattening operators

圖片來源:RxJS 7 and Observables: Introduction @ Udemy

import { concatMap, Observable, of, tap } from 'rxjs';

const source$ = new Observable((subscriber) => {
setTimeout(() => subscriber.next('A'), 1000);
setTimeout(() => subscriber.next('B'), 2000);
});

console.log('App has started');
source$
.pipe(
concatMap((value) =>
// here is an inner subscription
of(value).pipe(
tap({
next: (value) => console.log(`[inner] Value: ${value}`),
complete: () => console.log('[inner] Complete'),
}),
),
),
tap({
next: (value) => console.log(`Value: ${value}`),
complete: () => console.log('Complete'),
}),
)
.subscribe((value) => console.log(value));

Subject

透過 Subject,可以使用群播(multicasting)的方式,同時通知多個訂閱者。

Subject 同時會是 Observable 和 Observer:

  • Observable:它可以被訂閱
  • Observer:它可以呼叫 next()error()complete()

RxJS Subject

圖片來源:RxJS 7 and Observables: Introduction @ Udemy

範例程式碼

以下面的程式碼來說,概念上會是:

RxJS-Subject @ pjchender StackBlitz

  1. 建立一個 Subject
  2. Subject 是 Observer
    • 註冊 input Event,當 input 事件發生時會透過 next 通知 Subject(input event -> Subject)
    • Subject 在收到通知後,會再透過「它自己的 next」,把資料通知給它的 subscriber,但此時因為 Subject 還沒被訂閱,所以不會看到任何訊息
  3. Subject 是 Observable:
    • 註冊 subscribeButton 的 click event,當按鈕被點擊時,會新增一個 Subject 的 subscriber,這個 subscriber 收到 subject 通知時,會把資料 console 出來(Subject -> Subject's subscriber)
// https://stackblitz.com/edit/rxjs-jw-course-7d-subject-pvpevh?devtoolsheight=33&file=index.ts

import { fromEvent, Subject } from 'rxjs';
import { map } from 'rxjs/operators';

const emitButton = document.querySelector('button#emit');
const inputElement: HTMLInputElement = document.querySelector('#value-input');
const subscribeButton = document.querySelector('button#subscribe');

// STEP 1:建立 subject
const value$ = new Subject<string>();

/**
* STEP 2:監聽 inputElement 的 input event
**/
// 當 Input 事件觸發時,會通知 Subject
// Subject 收到通知後則會再透過 next 通知它的 subscribers
// 此時 Subject 是 Observer
const inputEventObserver = {
next: (event) => value$.next(event.target.value),
};
fromEvent(inputElement, 'input').subscribe(inputEventObserver);

// 2-1 由於 Subject 本身就是 observer,所以也可以把 Subject
// 直接當成參數傳入 .subscribe(),像是下面這樣
// fromEvent(inputElement, 'input')
// .pipe(map((event) => event.currentTarget['value']))
// .subscribe(value$);

/**
* STEP 3:為 Subject 添加訂閱者,此時 Subject 是 Observable
**/
// 3-3:定義 value$ 收到 next 通知時要做的事
const valueObserver = {
next: (value) => console.log('subscription get the value: ', value),
};

// 3-2:當 subscribeButton 被點擊時,訂閱 value$,並帶入 value$ 的 observer
const clickEventObserver = {
next: () => {
console.log('create a new subscription to the Subject');
value$.subscribe(valueObserver);
},
};
// 3-1:監聽 subscribeButton 的點擊事件
fromEvent(subscribeButton, 'click').subscribe(clickEventObserver);

由於 Subject 就是 Observer,所以也可以直接把它作為 observer 帶入 Observable 的參數:

// 原本
const inputEventObserver = {
next: (event) => value$.next(event.target.value),
};
fromEvent(inputElement, 'input').subscribe(inputEventObserver);

// 可以寫出
fromEvent(inputElement, 'input')
.pipe(map((event) => event.currentTarget['value']))
.subscribe(value$);

Behavior Subject

在一般的 Subject 中,新的訂閱者只會收到後續通知,但他並不會知道先前發生了什麼,可以想像成你加入了一個了聊天室,但是看不到歷史訊息,所以雖然你可以開始參與聊天,但你不知道先前的人在聊什麼話題;透過 Behavior Subject,他會紀錄最後一次 emit 的 value 在記憶體當中,所以當有新的訂閱產生時,Subject 會先把這個值通知給新的訂閱者,就好像是你加入聊天室後,會先給你一部分過去的歷史訊息一樣,如此你就能知道,前一段時間大家在聊些什麼。

具體來說,Behavior Subject 就是帶有 initial value 的 subject,但新的訂閱產生時,會先提供 initial value 給 subscriber:

Behavior Subject

圖片來源:RxJS 7 and Observables: Introduction @ Udemy

範例程式碼

RxJS Behavior Subject @ PJCHENder stackblitz

如果使用一般的 Subject 時,雖然可以達到狀態改變時就更新 UI,但因為 isLoggedIn$ 在第一次被訂閱時沒有初始值的緣故,所以使用者一進到頁面時會看同時看到 Login 和 Logout 的按鈕:

import { fromEvent, Subject } from 'rxjs';

const loggedInSpan: HTMLElement = document.querySelector('span#logged-in');
const loginButton: HTMLElement = document.querySelector('button#login');
const logoutButton: HTMLElement = document.querySelector('button#logout');
const printStateButton: HTMLElement = document.querySelector('button#print-state');

// STEP 1:建立 subject
const isLoggedIn$ = new Subject<boolean>();

// STEP 2:時間發生時通知 subject 改變
fromEvent(loginButton, 'click').subscribe(() => isLoggedIn$.next(true));
fromEvent(logoutButton, 'click').subscribe(() => isLoggedIn$.next(false));

// STEP 3:當 subject 的狀態改變時,要做的後續處理
isLoggedIn$.subscribe((isLoggedIn) => {
loginButton.style.display = !isLoggedIn ? 'block' : 'none';
logoutButton.style.display = isLoggedIn ? 'block' : 'none';
});

RxJs Subject

為了要讓使用者不會一進到頁面時就看到兩個按鈕,需要提供 subscriber 在一開始訂閱時的起始值,這時候可以使用 BehaviorSubject,方式很簡單:

import { BehaviorSubject } from 'rxjs';

// STEP 1:建立 BehaviorSubject,並且帶入預設值
const isLoggedIn$ = new BehaviorSubject<boolean>(false);

當我們要取得最新的 state 時,可以使用 isLoggedIn$.valueisLoggerIn$.getValue()

// STEP 4:點擊 Print state 時列印 state
fromEvent(printStateButton, 'click').subscribe(() => {
const isLoggedIn = isLoggedIn$.value; // 或 isLoggedIn$.value
console.log('isLoggedIn: ', isLoggedIn);
});

或者也可以用 combination operator 中的 withLatestForm 方式:

// STEP 4:點擊 Print state 時列印 state
fromEvent(printStateButton, 'click')
.pipe(withLatestFrom(isLoggedIn$))
.subscribe(([event, isLoggedIn]) => console.log('isLoggedIn: ', isLoggedIn));

withLatest

圖片來源:https://rxmarbles.com/#withLatestFrom

參考資料