[course] 即使通訊與傳輸(realtime、streaming、websocket)
keywords: realtime
, HTTP/2
, streaming
, pooling
, websocket
本篇非原創文章,內容整理自 Complete Intro to Real-Time @ Frontend Masters
練習用的 repo 即完整程式碼 @ Gitlab。
Long Polling(輪詢)
Long polling 並沒有使用什麼特別的技術,單純透過 AJAX 的方式不斷向 server 發送請求。要留意的是這種方式會在一定時間內 對 server 發送請求,當網站的使用者人數越來越多時,對 server 的負擔也會逐漸增加,甚至變成自己對自己 DDOS。
使用 setTimeout
另外,不要使用 setInterval
來做 Polling,原因在於 setInternal 並不會等待 API 的 response 回來後才重新計時,如果 API response 較慢的話,有可能會變成在一個時間區間內打多次 API 的情況。比較好的方式是使用 setTimeout
,當收到 API 的 response 後,才用 setTimeout 開始下一次的計時。
範例程式碼:Polling with setTimeout
// https://btholt.github.io/complete-intro-to-realtime/settimeout
async function getNewMsgs() {
let json;
try {
const res = await fetch('/poll');
json = await res.json();
} catch (e) {
// back off code would go here
console.error('polling error', e);
}
allChat = json.msg;
render();
setTimeout(getNewMsgs, INTERVAL);
}
然而,setTimeout
有一個缺點是,即使使用者切換到了其他頁籤(tab),但沒有關閉原本網站的話,這個 polling 的動作有可能還是會在背景持續 運作,並持續消耗 server 和 client 的資源。
requestAnimationFrame
這裡 Brian Holt 提供了另一個方式是使用 requestAnimationFrame
,他認為比起 setTimeout
,requestAnimationFrame
比較不會造成畫面的阻塞,因為它會在瀏覽器有空檔(idle)時才去執行,不會干擾到 repaints。另外,一旦使用者離開了當前的頁籤(tab),瀏覽器會自動停止該頁面的 requestAnimationFrame
直到使用者切換回原本的頁籤(tab)後才繼續執行。
// https://btholt.github.io/complete-intro-to-realtime/requestanimationframe
let timeToMakeNextRequest = 0;
async function rafTimer(time) {
if (timeToMakeNextRequest <= time) {
await getNewMsgs();
timeToMakeNextRequest = time + INTERVAL;
}
requestAnimationFrame(rafTimer);
}
requestAnimationFrame(rafTimer);
Backoff and Retry(放棄或重試)
這裡要問的是,如果發送請求後 API 回傳錯誤的話,要怎麼辦?
一種是失敗時馬上再重試,但不斷的重試有可能會造成 server 更大的負擔,特別是 server 可能已經故障的情況,這時候比較好的是有實作 backoff and retry 的機制。
Backoff and Retry 的機制會是類似當第一次 API 請求失敗後,過 10 秒後重試,如果還是失敗,則過 1 分鐘後再重試,如果還是失敗,則 5 分鐘後重試,如果還是失敗,則...(retry 的時間越拉越久、backoff、或提供使用者手動更新)。
// Modify from https://btholt.github.io/complete-intro-to-realtime/backoff-and-retry
const handleRetry = (BACK_OFF_TIME = 1000) => {
let numberOfRetries = 0;
return {
onSuccess: () => (numberOfRetries = 0),
onFail: () => numberOfRetries++,
getElapseTimes: () => BACK_OFF_TIME * numberOfRetries,
};
};
let retryHandler = handleRetry();
async function getNewMsgs() {
let json;
try {
const res = await fetch('/poll');
json = await res.json();
if (res.statusText !== 'OK') {
throw new Error(`request did not succeed: ${res.statusText}`);
}
allChat = json.msg;
render();
// API 成功時
retryHandler.onSuccess();
} catch (err) {
console.error(err);
// API 失敗時
retryHandler.onFail();
}
}
let timeToMakeNextRequest = 0;
async function rafTimer(time) {
if (timeToMakeNextRequest <= time) {
await getNewMsgs();
// 取得要延遲等待的時間
timeToMakeNextRequest = time + INTERVAL + retryHandler.getElapseTimes();
}
requestAnimationFrame(rafTimer);
}
requestAnimationFrame(rafTimer);
Brian 建議可以直接使用第三方的 library,例如 Axios、useSWR 等,這些套件都內建了 backoff and retry 的機制在內。
HTTP/2 Push
Chat with HTTP/2 Push @ frontend master
Backend: Coding a HTTP/2 Push
server.js @ gitlab
雖然規格上沒有規定,但目前瀏覽器的實作上要做 HTTP/2 一定要有 SSL(HTTPs)。使用下述指令可以透過 openssl 產生 self-signed 的 certificates:
openssl req -new -newkey rsa:2048 -new -nodes -keyout key.pem -out csr.pem
openssl x509 -req -days 365 -in csr.pem -signkey key.pem -out server.crt
如此會產生對應的 certificates 和 key 可以給 server 使用:
import http2 from 'http2';
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
// 使用 node 提供的 https 來啟動 server,並且提供對應的憑證
const server = http2.createSecureServer({
cert: fs.readFileSync(path.join(__dirname, '/../server.crt')),
key: fs.readFileSync(path.join(__dirname, '/../key.pem')),
});
接著即可使用 server.on('stream', (stream, headers) => {/* ... */})
來對 stream 進行操作:
- 每一個 http2 server 進來的 request 除了會觸發 "request" 事件之外,也會觸發 "stream" 事件
- 透過
server.on('stream', [callback])
可以在其 callback 中取得 stream 物件,並將它保存起來後續使用 - 透過
stream.respond()
可以回傳 client 當前與 server 的連線狀態 - 透過
stream.write()
則可以以 stream 的方式將內容傳給 client
let connections = [];
// ...
server.on('stream', (stream, headers) => {
const path = headers[':path'];
const method = headers[':method'];
// 每一個 requst 進來時,預設就會有 stream 的連線,這裡我們只要 "GET /msgs"
if (path === '/msgs' && method === 'GET') {
// 假設一開始有三個 client 連結,但其中一個斷連後,
// Node 會重複使用這個 id 給新連進來的使用者
console.log('connected a stream ' + stream.id);
// immediately reply with 200 OK and the coding
stream.respond({
':status': 200,
// 雖然回傳的是 JSON,但因為是透過 stream 的方式傳送
// 所以每個封包回去的並不會是完整的一個 JSON
'content-type': 'text/plain; charset=utf-8',
});
// 對於新連進來的使用者回傳現有的 messages
stream.write(JSON.stringify({ msg: getMsgs() }));
// 將這個使用者的 stream 保存在 connections 陣列中
connections.push(stream);
stream.on('close', () => {
// remove the stream from the list of connections
connections = connections.filter((s) => s.id !== stream.id);
console.log('disconnected a stream ' + stream.id);
});
}
});
如果是透過此 self-signed certificates 來 serve 前端的畫面時,瀏覽器會出現警告提示,可以在該畫面輸入 "thisisunsafe"
表示自己知道這個危險,並繞 過警告頁面。
在上面的程式碼中,我們只處理了使用者初次連進網站時,以 stream 的方式回傳現有的 messages 給使用者,但還沒處理但使用者發送新的 message 時,要廣播給所有的使用者。要做到這個功能,可以加上下述程式碼:
server.on('request', async (req, res) => {
const path = req.headers[':path'];
const method = req.headers[':method'];
if (path !== '/msgs') {
// handle the static assets ...
} else if (method === 'POST') {
// ...
// save the message in the memory of server
msg.push({
user,
text,
time: Date.now(),
});
res.end();
// broadcast the message to all clients
connections.forEach((stream) => {
stream.write(JSON.stringify({ msg: getMsgs() }));
});
}
});
Frontend: Coding a HTTP/2 Push
- http2-chat.js @ gitlab
- response.body.getReader() @ MDN
如果 BE 傳過來的是 streaming 的資料,那麼前端也需要用 streaming 的方式來處理:
- 前端一樣可以用 fetch API 來處理,但收到 response 時,要使用
response.body.getReader()
的方法來開啟 readable stream 來做後續的處理 - 使用
reader.read()
來讀取每一個 chunk 的資料 - 接著即可將取得的 chunk 透過 decoder 來解析內容
async function getNewMsgs() {
let reader;
try {
// 使用 fetch 一樣可以處理 server 回傳的 stream
// 但不能直接使用 res.json(),而是要用 res.body.getReader() 的方法來處理 stream
const res = await fetch('/msgs');
// opening a readable stream
// 邏輯上這個 response 並不會結束,可以一直接受後端吐回來的內容
reader = res.body.getReader();
} catch (err) {
console.log('connection error', err);
}
// decode the response "ONCE"
const utf8Decoder = new TextDecoder('utf-8');
try {
// read the next chunk of data
readerResponse = await reader.read();
// decode the response
const chunk = utf8Decoder.decode(readerResponse.value, { stream: true });
console.log(chunk);
} catch (error) {
console.error('reader failed ', error);
return;
}
}
這時候當我們用瀏覽器的開發者工具檢視此 request 和 response 時,會看到「request is not finished yet」的字樣,並且在 preview 頁籤和 response 頁籤都看不到內容。
實際上,上面那段程式碼只會 decode 一次 server 從 stream 回傳的內容,我們可以使用 do...while loop 來持續處理 server 回傳的 streaming,只到 server 告知 streaming 已經結束後才停止。因此我們可以把程式碼改成:
async function getNewMsgs() {
let reader;
// ...same as above..
const utf8Decoder = new TextDecoder('utf-8');
// decode the response
// if the stream is not done, than keep decoding the response
do {
let readerResponse;
try {
// read the next chunk of data
readerResponse = await reader.read();
} catch (error) {
console.error('reader failed ', error);
return;
}
// parse the chunk data
const chunk = utf8Decoder.decode(readerResponse.value, { stream: true });
done = readerResponse.done; // 看 server 是否告知 streaming 已經結束
if (chunk) {
try {
console.log(chunk);
const json = JSON.parse(chunk);
allChat = json.msg;
render();
} catch (error) {
console.log('parse the response failed ', error);
}
}
} while (!done);
}
自己實作 WebSocket
- WebSocket @ MDN
- WebSockets Backend @ Complete Intro to Realtime
WebSocket 和 HTTP 是兩種不同的通訊協定,但 WebSocket 在一開始會先透過 HTTP 來建立連線後,才升級成 WebSocket 這種通訊協定。
具體來說, client 會先向 server 發送一般的 HTTP GET:
GET /chat HTTP/1.1
Host: example.com:8000
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
當 server 收到請求後,如果 server 是有支援 websocket 的話,則會回傳:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
從上面 server 的 response 中可以看到,Server 會在 header 中回傳 Sec-WebSocket-Aceept
,這個 header 的目的是確保雙方都認得 WebSocket 通訊協定。
Server 要產生 "Sec-WebSocket-Accept" 的方式,是需要先把 client 在 header 中的 Sec-WebSocket-Key
取出,然後將這個 key 和 "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
(magic string)接在一起後,進行 SHA1 的雜湊並以 base64 回傳,即可得到 "Sec-WebSocket-Accept"(參考 Writing WebSocket servers):
import crypto from 'crypto';
function generateAcceptValue(acceptKey) {
return (
crypto
.createHash('sha1')
// this magic string key is actually in the spec
.update(acceptKey + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', 'binary')
.digest('base64')
);
}
由於 WebSocket 在一開始仍需要靠 HTTP 來建立連線,因此 client 需要支援 HTTP 1.1 以上的版本,且必須是 GET 方法。
Frontend
由於瀏覽器針對 WebSockets 已經實作好許多不同的功能,並且有 WebSocket 物件可以直接使用,因此前端的工比較小:
- 使用
new WebSocket()
可以和 server 建立 WebSockets 連線 - 事件
- open: 當與 server 成功建立 WebSocket 連線後,會觸發
open
事件 - message: 當 server 傳來資料時,會觸發
message
事件
- open: 當與 server 成功建立 WebSocket 連線後,會觸發
- 使用
ws.send()
可以將資料傳給 server
// client
// create a connection to websocket server
// ws:// 表示建立的是 WebSocket 的通訊協定
const ws = new WebSocket('ws://localhost:8080', ['json']);
// when the connection is opened
ws.addEventListener('open', () => {
console.log('connected');
presence.textContent = '🟢';
});
// every time socket receives a message
ws.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
allChat = data.msg;
render();
});
// when the connection is closed
ws.addEventListener('close', () => {
presence.textContent = '🔴';
});
// send message to the server
async function postNewMsg(user, text) {
const data = { user, text };
ws.send(JSON.stringify(data));
}
Backend: Writing WebSocket server
upgrading to WebSocket connection with client
-
使用
server.on('upgrade', (req, socket) => {/* ... */})
可以取得 socket -
方法
- 使用
socket.write()
可以將資料傳給 client - 使用
socket.end()
可以中斷連線
- 使用
-
事件
- data:使用
socket.on("data", (buffer) => {/* ... */})
可以用來接收 client 傳來的資料 - end:當連線中斷時會觸發
socket.on("end", [callback])
- data:使用
-
server 在收到 client 傳來 upgrade 的請求後
-
根據 client header 中的
'sec-websocket-key'
來計算要回傳的Sec-WebSocket-Accept
-
最後要加上
\r\n
來告知這是 headers 的結尾
-
// server
import objToResponse from './obj-to-response.js';
import generateAcceptValue from './generate-accept-value.js';
import parseMessage from './parse-message.js';
// 將所有的 socket 連線保存在記憶體中
const connections = [];
// 當 server 收到要 upgrade 的請求
server.on('upgrade', (req, socket) => {
// 如果不是 websocket 請求,則回覆 bad request
if (req.headers['upgrade'] !== 'websocket') {
socket.end('HTTP/1.1 400 Bad Request');
return;
}
const acceptKey = req.headers['sec-websocket-key'];
const acceptValue = generateAcceptValue(acceptKey);
const headers = [
'HTTP/1.1 101 Switching Protocols',
'Upgrade: WebSocket',
'Connection: Upgrade',
`Sec-WebSocket-Accept: ${acceptValue}`,
`Sec-WebSocket-Protocol: json`,
'\r\n', // indicate that here is the end of header
];
socket.write(headers.join('\r\n'));
// When the client connects to the server at the first time,
// send all msgs back to the client
socket.write(objToResponse({ msg: getMsgs() }));
// save the "socket" of the user into the connects for future use
connections.push(socket);
// when receiving the message from the client
socket.on('data', (buffer) => {
const message = parseMessage(buffer);
if (message) {
// save the message in the memory
msg.push({
user: message.user,
text: message.text,
time: Date.now(),
});
// broadcast the message to all clients
connections.forEach((s) => {
s.write(objToResponse({ msg: getMsgs() }));
});
} else if (message === null) {
// the message will be "null" if the client is closing the connection
// so we close the connection with the client
socket.end();
}
});
// handle user disconnection
socket.on('end', () => {
connections = connections.filter((s) => s !== socket);
});
});
完成 client 和 server 間的溝通後,用瀏覽器的開發者工具即可看到成功建立了 WebSocket 的連線:
Exchanging data frames
接著要來處理實際的資料交換,在 WebSockets 中,每個傳輸的資料都稱作 frame,這些 frame 都有相同的格式(format),並且會以 XOR encryption 來進行 mask 的動作。在 MDN 的物件中詳細說明了如何解析 WebSocket 的 frame (Exchaning data frames)。
使用 Socket.IO
- Intro to Socket.IO @ Complete Intro to Realtime
WebSockets 在 Node.js 中有兩個常用的套件,一個是 Socket.IO,另一個這是 ws。可以把 ws 想成是一個輕量的 WebSockets 套件,它提供的功能較為精簡,適合不需要複 雜情境時使用;相較之下,Socket.IO 則提供了豐富的功能,可以管理多個 chat room 等。因此,應該要看專案的需要來選擇。
- 即使 server 和 client 在過程中有斷線的情況,Socket.IO 也會自動幫忙處理重新連線的機制。
- 如果 client 的瀏覽器不支援 WebSockets,Socket.IO 會自動切換成使用 polling 的機制。
FrontEnd
import { io } from 'socket.io-client';
const socket = io('http://localhost:8080');
socket.on('connect', () => {
console.log('connected');
presence.innerText = '🟢';
});
socket.on('disconnect', () => {
presence.innerText = '🔴';
});
// receive the data corresponding to the event name defined in the server
socket.on('msg:get', (data) => {
allChat = data.msg;
render();
});
async function postNewMsg(user, text) {
const data = { user, text };
socket.emit('msg:post', data);
}
BackEnd
import { Server } from 'socket.io';
// start socket server
const io = new Server(8080);
// when the socket connects
io.on('connection', (socket) => {
console.log(`connected: ${socket.id}`);
// emit a event to the client with data
socket.emit('msg:get', { msg: getMsgs() });
socket.on('msg:post', (data) => {
msg.push({
user: data.user,
text: data.text,
time: Date.now(),
});
// 這裡很重要的是,socket 指的是「某一個」與 server 連線的 client
// 如果是想要廣播讓所有 clients 收到,要用 io,因為 io 表示的是整個 server
io.emit('msg:get', { msg: getMsgs() });
});
socket.on('disconnect', () => {
console.log(`disconnected: ${socket.id}`);
});
});
資料來源
- Complete Intro to Real-Time @ frontend-masters