[course] Fundamentals of Backend Engineering
資料來源
Backend Communication Design Patterns
Long Polling
Server 收到請求後,過好一陣子才回傳結果給 Client
const app = require('express')();
const jobs = {};
// 建立一個 job 並回傳 job id
app.post('/submit', (req, res) => {
const jobId = `job:${Date.now()}`;
jobs[jobId] = 0;
updateJob(jobId, 0);
res.end('\n\n' + jobId + '\n\n');
});
app.get('/check-status', async (req, res) => {
console.log(jobs[req.query.jobId]);
// Long Polling 會一直等在這裡,有結果是才會給回應
while ((await checkJobComplete(req.query.jobId)) == false);
res.end('\n\nJobStatus: Complete ' + jobs[req.query.jobId] + '%\n\n');
});
app.listen(8080, () => console.log('listening on 8080'));
async function checkJobComplete(jobId) {
return new Promise((resolve, reject) => {
if (jobs[jobId] < 100) this.setTimeout(() => resolve(false), 1000);
else resolve(true);
});
}
function updateJob(jobId, prg) {
jobs[jobId] = prg;
console.log(`updated ${jobId} to ${prg}`);
if (prg == 100) return;
this.setTimeout(() => updateJob(jobId, prg + 10), 10000);
}
Server Send Events
Server 收到請求後,不結束這個 TCP 的連線,而是以串流的方式持續回傳事件給 Client。和 WebSocket 不同的地方是,它是單向 Server 對 Client 的傳送,因此特別適合用在狀態的更新、訊息通知等等:
提示
需要特別留意的是,Server Send Events 是基於 TCP 協議,所以它一樣會佔用一個連線數,如果使用的並非 HTTP/2 以上的版本,需要留意瀏覽器預設的連線數目上限。
Server
const app = require('express')();
app.get('/', (req, res) => res.send('hello!'));
app.get('/stream', (req, res) => {
// 回傳的 Content-Type 是 "text/event-stream"
res.setHeader('Content-Type', 'text/event-stream');
// 接著使用 res.write 來回拋事件
send(res);
});
let i = 0;
function send(res) {
res.write('data: ' + `hello from server ---- [${i++}]\n\n`);
setTimeout(() => send(res), 1000);
}
const port = process.env.PORT || 8888;
app.listen(port);
console.log(`Listening on ${port}`);
Client 端如果是 Browser 的話可以使用 EventSource API:
Client
let sse = new EventSource('http://localhost:8080/stream');
// 收到訊息時
sse.onmessage = (e) => {
console.log(e.data);
};
// 監聽有名稱的事件
sse.addEventListener('notice', (e) => {
console.log(e.data);
});
Pub/Sub
使用 RabbitMQ 做示範:
Publisher
// publisher.js
/* RabbitMQ */
const amqp = require('amqplib');
const msg = { number: process.argv[2] };
connect();
async function connect() {
try {
const amqpServer = 'amqp://localhost:5672';
const connection = await amqp.connect(amqpServer);
const channel = await connection.createChannel();
await channel.assertQueue('jobs');
await channel.sendToQueue('jobs', Buffer.from(JSON.stringify(msg)));
console.log(`Job sent successfully ${msg.number}`);
await channel.close();
await connection.close();
} catch (ex) {
console.error(ex);
}
}
Consumer
// consumer.js
const amqp = require('amqplib');
connect();
async function connect() {
try {
const amqpServer = 'amqp://localhost:5672';
const connection = await amqp.connect(amqpServer);
const channel = await connection.createChannel();
await channel.assertQueue('jobs');
channel.consume('jobs', (message) => {
const input = JSON.parse(message.content.toString());
console.log(`Received job with input ${input.number}`);
//"7" == 7 true
//"7" === 7 false
if (input.number == 7) channel.ack(message);
});
console.log('Waiting for messages...');
} catch (ex) {
console.error(ex);
}
}