Skip to main content

[course] 即使通訊與傳輸(realtime、streaming、websocket)

keywords: realtime, HTTP/2, streaming, pooling, websocket

本篇非原創文章,內容整理自 Complete Intro to Real-Time @ Frontend Masters

練習用的 repo 即完整程式碼 @ Gitlab。

Long Polling(輪詢)

Long polling 並沒有使用什麼特別的技術,單純透過 AJAX 的方式不斷向 server 發送請求。要留意的是這種方式會在一定時間內對 server 發送請求,當網站的使用者人數越來越多時,對 server 的負擔也會逐漸增加,甚至變成自己對自己 DDOS。

使用 setTimeout

另外,不要使用 setInterval 來做 Polling,原因在於 setInternal 並不會等待 API 的 response 回來後才重新計時,如果 API response 較慢的話,有可能會變成在一個時間區間內打多次 API 的情況。比較好的方式是使用 setTimeout,當收到 API 的 response 後,才用 setTimeout 開始下一次的計時。

範例程式碼:Polling with setTimeout

// https://btholt.github.io/complete-intro-to-realtime/settimeout
async function getNewMsgs() {
let json;
try {
const res = await fetch('/poll');
json = await res.json();
} catch (e) {
// back off code would go here
console.error('polling error', e);
}
allChat = json.msg;
render();
setTimeout(getNewMsgs, INTERVAL);
}

然而,setTimeout 有一個缺點是,即使使用者切換到了其他頁籤(tab),但沒有關閉原本網站的話,這個 polling 的動作有可能還是會在背景持續運作,並持續消耗 server 和 client 的資源。

requestAnimationFrame

這裡 Brian Holt 提供了另一個方式是使用 requestAnimationFrame,他認為比起 setTimeoutrequestAnimationFrame 比較不會造成畫面的阻塞,因為它會在瀏覽器有空檔(idle)時才去執行,不會干擾到 repaints。另外,一旦使用者離開了當前的頁籤(tab),瀏覽器會自動停止該頁面的 requestAnimationFrame 直到使用者切換回原本的頁籤(tab)後才繼續執行。

// https://btholt.github.io/complete-intro-to-realtime/requestanimationframe

let timeToMakeNextRequest = 0;
async function rafTimer(time) {
if (timeToMakeNextRequest <= time) {
await getNewMsgs();
timeToMakeNextRequest = time + INTERVAL;
}

requestAnimationFrame(rafTimer);
}

requestAnimationFrame(rafTimer);

Backoff and Retry(放棄或重試)

這裡要問的是,如果發送請求後 API 回傳錯誤的話,要怎麼辦?

一種是失敗時馬上再重試,但不斷的重試有可能會造成 server 更大的負擔,特別是 server 可能已經故障的情況,這時候比較好的是有實作 backoff and retry 的機制。

Backoff and Retry 的機制會是類似當第一次 API 請求失敗後,過 10 秒後重試,如果還是失敗,則過 1 分鐘後再重試,如果還是失敗,則 5 分鐘後重試,如果還是失敗,則...(retry 的時間越拉越久、backoff、或提供使用者手動更新)。

// Modify from https://btholt.github.io/complete-intro-to-realtime/backoff-and-retry

const handleRetry = (BACK_OFF_TIME = 1000) => {
let numberOfRetries = 0;
return {
onSuccess: () => (numberOfRetries = 0),
onFail: () => numberOfRetries++,
getElapseTimes: () => BACK_OFF_TIME * numberOfRetries,
};
};

let retryHandler = handleRetry();
async function getNewMsgs() {
let json;
try {
const res = await fetch('/poll');
json = await res.json();

if (res.statusText !== 'OK') {
throw new Error(`request did not succeed: ${res.statusText}`);
}

allChat = json.msg;
render();

// API 成功時
retryHandler.onSuccess();
} catch (err) {
console.error(err);

// API 失敗時
retryHandler.onFail();
}
}

let timeToMakeNextRequest = 0;
async function rafTimer(time) {
if (timeToMakeNextRequest <= time) {
await getNewMsgs();

// 取得要延遲等待的時間
timeToMakeNextRequest = time + INTERVAL + retryHandler.getElapseTimes();
}

requestAnimationFrame(rafTimer);
}

requestAnimationFrame(rafTimer);

Brian 建議可以直接使用第三方的 library,例如 Axios、useSWR 等,這些套件都內建了 backoff and retry 的機制在內。

HTTP/2 Push

Chat with HTTP/2 Push @ frontend master

Backend: Coding a HTTP/2 Push

server.js @ gitlab

雖然規格上沒有規定,但目前瀏覽器的實作上要做 HTTP/2 一定要有 SSL(HTTPs)。使用下述指令可以透過 openssl 產生 self-signed 的 certificates:

openssl req -new -newkey rsa:2048 -new -nodes -keyout key.pem -out csr.pem
openssl x509 -req -days 365 -in csr.pem -signkey key.pem -out server.crt

如此會產生對應的 certificates 和 key 可以給 server 使用:

import http2 from 'http2';
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';

const __dirname = path.dirname(fileURLToPath(import.meta.url));

// 使用 node 提供的 https 來啟動 server,並且提供對應的憑證
const server = http2.createSecureServer({
cert: fs.readFileSync(path.join(__dirname, '/../server.crt')),
key: fs.readFileSync(path.join(__dirname, '/../key.pem')),
});

接著即可使用 server.on('stream', (stream, headers) => {/* ... */}) 來對 stream 進行操作:

  • 每一個 http2 server 進來的 request 除了會觸發 "request" 事件之外,也會觸發 "stream" 事件
  • 透過 server.on('stream', [callback]) 可以在其 callback 中取得 stream 物件,並將它保存起來後續使用
  • 透過 stream.respond() 可以回傳 client 當前與 server 的連線狀態
  • 透過 stream.write() 則可以以 stream 的方式將內容傳給 client
let connections = [];

// ...

server.on('stream', (stream, headers) => {
const path = headers[':path'];
const method = headers[':method'];

// 每一個 requst 進來時,預設就會有 stream 的連線,這裡我們只要 "GET /msgs"
if (path === '/msgs' && method === 'GET') {
// 假設一開始有三個 client 連結,但其中一個斷連後,
// Node 會重複使用這個 id 給新連進來的使用者
console.log('connected a stream ' + stream.id);

// immediately reply with 200 OK and the coding
stream.respond({
':status': 200,
// 雖然回傳的是 JSON,但因為是透過 stream 的方式傳送
// 所以每個封包回去的並不會是完整的一個 JSON
'content-type': 'text/plain; charset=utf-8',
});

// 對於新連進來的使用者回傳現有的 messages
stream.write(JSON.stringify({ msg: getMsgs() }));

// 將這個使用者的 stream 保存在 connections 陣列中
connections.push(stream);

stream.on('close', () => {
// remove the stream from the list of connections
connections = connections.filter((s) => s.id !== stream.id);
console.log('disconnected a stream ' + stream.id);
});
}
});
info

如果是透過此 self-signed certificates 來 serve 前端的畫面時,瀏覽器會出現警告提示,可以在該畫面輸入 "thisisunsafe" 表示自己知道這個危險,並繞過警告頁面。

在上面的程式碼中,我們只處理了使用者初次連進網站時,以 stream 的方式回傳現有的 messages 給使用者,但還沒處理但使用者發送新的 message 時,要廣播給所有的使用者。要做到這個功能,可以加上下述程式碼:

server.on('request', async (req, res) => {
const path = req.headers[':path'];
const method = req.headers[':method'];

if (path !== '/msgs') {
// handle the static assets ...
} else if (method === 'POST') {
// ...

// save the message in the memory of server
msg.push({
user,
text,
time: Date.now(),
});

res.end();

// broadcast the message to all clients
connections.forEach((stream) => {
stream.write(JSON.stringify({ msg: getMsgs() }));
});
}
});

Frontend: Coding a HTTP/2 Push

如果 BE 傳過來的是 streaming 的資料,那麼前端也需要用 streaming 的方式來處理:

  • 前端一樣可以用 fetch API 來處理,但收到 response 時,要使用 response.body.getReader() 的方法來開啟 readable stream 來做後續的處理
  • 使用 reader.read() 來讀取每一個 chunk 的資料
  • 接著即可將取得的 chunk 透過 decoder 來解析內容
async function getNewMsgs() {
let reader;

try {
// 使用 fetch 一樣可以處理 server 回傳的 stream
// 但不能直接使用 res.json(),而是要用 res.body.getReader() 的方法來處理 stream
const res = await fetch('/msgs');

// opening a readable stream
// 邏輯上這個 response 並不會結束,可以一直接受後端吐回來的內容
reader = res.body.getReader();
} catch (err) {
console.log('connection error', err);
}

// decode the response "ONCE"
const utf8Decoder = new TextDecoder('utf-8');
try {
// read the next chunk of data
readerResponse = await reader.read();

// decode the response
const chunk = utf8Decoder.decode(readerResponse.value, { stream: true });
console.log(chunk);
} catch (error) {
console.error('reader failed ', error);
return;
}
}

這時候當我們用瀏覽器的開發者工具檢視此 request 和 response 時,會看到「request is not finished yet」的字樣,並且在 preview 頁籤和 response 頁籤都看不到內容。

http/2 push

實際上,上面那段程式碼只會 decode 一次 server 從 stream 回傳的內容,我們可以使用 do...while loop 來持續處理 server 回傳的 streaming,只到 server 告知 streaming 已經結束後才停止。因此我們可以把程式碼改成:

async function getNewMsgs() {
let reader;

// ...same as above..

const utf8Decoder = new TextDecoder('utf-8');
// decode the response
// if the stream is not done, than keep decoding the response
do {
let readerResponse;
try {
// read the next chunk of data
readerResponse = await reader.read();
} catch (error) {
console.error('reader failed ', error);
return;
}

// parse the chunk data
const chunk = utf8Decoder.decode(readerResponse.value, { stream: true });
done = readerResponse.done; // 看 server 是否告知 streaming 已經結束
if (chunk) {
try {
console.log(chunk);
const json = JSON.parse(chunk);
allChat = json.msg;
render();
} catch (error) {
console.log('parse the response failed ', error);
}
}
} while (!done);
}

自己實作 WebSocket

WebSocket 和 HTTP 是兩種不同的通訊協定,但 WebSocket 在一開始會先透過 HTTP 來建立連線後,才升級成 WebSocket 這種通訊協定。

具體來說, client 會先向 server 發送一般的 HTTP GET:

GET /chat HTTP/1.1
Host: example.com:8000
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

當 server 收到請求後,如果 server 是有支援 websocket 的話,則會回傳:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

從上面 server 的 response 中可以看到,Server 會在 header 中回傳 Sec-WebSocket-Aceept,這個 header 的目的是確保雙方都認得 WebSocket 通訊協定。

Server 要產生 "Sec-WebSocket-Accept" 的方式,是需要先把 client 在 header 中的 Sec-WebSocket-Key 取出,然後將這個 key 和 "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"(magic string)接在一起後,進行 SHA1 的雜湊並以 base64 回傳,即可得到 "Sec-WebSocket-Accept"(參考 Writing WebSocket servers):

import crypto from 'crypto';

function generateAcceptValue(acceptKey) {
return (
crypto
.createHash('sha1')
// this magic string key is actually in the spec
.update(acceptKey + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', 'binary')
.digest('base64')
);
}
info

由於 WebSocket 在一開始仍需要靠 HTTP 來建立連線,因此 client 需要支援 HTTP 1.1 以上的版本,且必須是 GET 方法。

Frontend

Writing WebSocket client applications @ MDN

由於瀏覽器針對 WebSockets 已經實作好許多不同的功能,並且有 WebSocket 物件可以直接使用,因此前端的工比較小:

  • 使用 new WebSocket() 可以和 server 建立 WebSockets 連線
  • 事件
    • open: 當與 server 成功建立 WebSocket 連線後,會觸發 open 事件
    • message: 當 server 傳來資料時,會觸發 message 事件
  • 使用 ws.send() 可以將資料傳給 server
// client

// create a connection to websocket server
// ws:// 表示建立的是 WebSocket 的通訊協定
const ws = new WebSocket('ws://localhost:8080', ['json']);

// when the connection is opened
ws.addEventListener('open', () => {
console.log('connected');
presence.textContent = '🟢';
});

// every time socket receives a message
ws.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
allChat = data.msg;
render();
});

// when the connection is closed
ws.addEventListener('close', () => {
presence.textContent = '🔴';
});

// send message to the server
async function postNewMsg(user, text) {
const data = { user, text };
ws.send(JSON.stringify(data));
}

Backend: Writing WebSocket server

Writing WebSocket servers @ MDN

upgrading to WebSocket connection with client

  • 使用 server.on('upgrade', (req, socket) => {/* ... */}) 可以取得 socket

  • 方法

    • 使用 socket.write() 可以將資料傳給 client
    • 使用 socket.end() 可以中斷連線
  • 事件

    • data:使用 socket.on("data", (buffer) => {/* ... */}) 可以用來接收 client 傳來的資料
    • end:當連線中斷時會觸發 socket.on("end", [callback])
  • server 在收到 client 傳來 upgrade 的請求後

    • 根據 client header 中的 'sec-websocket-key' 來計算要回傳的 Sec-WebSocket-Accept

    • 最後要加上 \r\n 來告知這是 headers 的結尾

// server
import objToResponse from './obj-to-response.js';
import generateAcceptValue from './generate-accept-value.js';
import parseMessage from './parse-message.js';

// 將所有的 socket 連線保存在記憶體中
const connections = [];

// 當 server 收到要 upgrade 的請求
server.on('upgrade', (req, socket) => {
// 如果不是 websocket 請求,則回覆 bad request
if (req.headers['upgrade'] !== 'websocket') {
socket.end('HTTP/1.1 400 Bad Request');
return;
}

const acceptKey = req.headers['sec-websocket-key'];
const acceptValue = generateAcceptValue(acceptKey);
const headers = [
'HTTP/1.1 101 Switching Protocols',
'Upgrade: WebSocket',
'Connection: Upgrade',
`Sec-WebSocket-Accept: ${acceptValue}`,
`Sec-WebSocket-Protocol: json`,
'\r\n', // indicate that here is the end of header
];

socket.write(headers.join('\r\n'));

// When the client connects to the server at the first time,
// send all msgs back to the client
socket.write(objToResponse({ msg: getMsgs() }));

// save the "socket" of the user into the connects for future use
connections.push(socket);

// when receiving the message from the client
socket.on('data', (buffer) => {
const message = parseMessage(buffer);

if (message) {
// save the message in the memory
msg.push({
user: message.user,
text: message.text,
time: Date.now(),
});

// broadcast the message to all clients
connections.forEach((s) => {
s.write(objToResponse({ msg: getMsgs() }));
});
} else if (message === null) {
// the message will be "null" if the client is closing the connection
// so we close the connection with the client
socket.end();
}
});

// handle user disconnection
socket.on('end', () => {
connections = connections.filter((s) => s !== socket);
});
});

完成 client 和 server 間的溝通後,用瀏覽器的開發者工具即可看到成功建立了 WebSocket 的連線:

Exchanging data frames

接著要來處理實際的資料交換,在 WebSockets 中,每個傳輸的資料都稱作 frame,這些 frame 都有相同的格式(format),並且會以 XOR encryption 來進行 mask 的動作。在 MDN 的物件中詳細說明了如何解析 WebSocket 的 frame (Exchaning data frames)。

使用 Socket.IO

WebSockets 在 Node.js 中有兩個常用的套件,一個是 Socket.IO,另一個這是 ws。可以把 ws 想成是一個輕量的 WebSockets 套件,它提供的功能較為精簡,適合不需要複雜情境時使用;相較之下,Socket.IO 則提供了豐富的功能,可以管理多個 chat room 等。因此,應該要看專案的需要來選擇。

  • 即使 server 和 client 在過程中有斷線的情況,Socket.IO 也會自動幫忙處理重新連線的機制。
  • 如果 client 的瀏覽器不支援 WebSockets,Socket.IO 會自動切換成使用 polling 的機制。

FrontEnd

import { io } from 'socket.io-client';

const socket = io('http://localhost:8080');

socket.on('connect', () => {
console.log('connected');
presence.innerText = '🟢';
});

socket.on('disconnect', () => {
presence.innerText = '🔴';
});

// receive the data corresponding to the event name defined in the server
socket.on('msg:get', (data) => {
allChat = data.msg;
render();
});

async function postNewMsg(user, text) {
const data = { user, text };
socket.emit('msg:post', data);
}

BackEnd

import { Server } from 'socket.io';

// start socket server
const io = new Server(8080);

// when the socket connects
io.on('connection', (socket) => {
console.log(`connected: ${socket.id}`);

// emit a event to the client with data
socket.emit('msg:get', { msg: getMsgs() });

socket.on('msg:post', (data) => {
msg.push({
user: data.user,
text: data.text,
time: Date.now(),
});

// 這裡很重要的是,socket 指的是「某一個」與 server 連線的 client
// 如果是想要廣播讓所有 clients 收到,要用 io,因為 io 表示的是整個 server
io.emit('msg:get', { msg: getMsgs() });
});

socket.on('disconnect', () => {
console.log(`disconnected: ${socket.id}`);
});
});

資料來源