跳至主要内容

[course] Fundamentals of Backend Engineering

Backend Communication Design Patterns

Long Polling

Server 收到請求後不會立刻回應,而是先保持連線,等到有結果時才回傳給 Client

// Express application instance for the long-polling demo server.
const app = require('express')();
// In-memory job table: job id -> progress value (written by updateJob, compared against 100).
const jobs = {};

// Create a job, start its simulated progress, and reply with the job id.
app.post('/submit', (req, res) => {
  const id = `job:${Date.now()}`;

  // Register the job at 0 before handing it to the progress updater.
  jobs[id] = 0;
  updateJob(id, 0);

  res.end(`\n\n${id}\n\n`);
});

// Long-polling status endpoint: the response is held open until the job
// reports completion, and only then is the final status sent.
app.get('/check-status', async (req, res) => {
  const jobId = req.query.jobId;
  console.log(jobs[jobId]);

  // An id that was never submitted has no numeric progress. Without this
  // guard `undefined < 100` is false and the job would be reported as
  // "Complete undefined%".
  if (typeof jobs[jobId] !== 'number') {
    res.status(404).end('\n\nJobStatus: Unknown job\n\n');
    return;
  }

  // Stop polling once the connection is gone, otherwise an abandoned request
  // keeps a timer loop alive until the job finishes.
  let clientGone = false;
  res.on('close', () => {
    clientGone = true;
  });

  // Long polling waits here; each checkJobComplete call resolves after its
  // polling delay while the job is still below 100.
  while (!(await checkJobComplete(jobId))) {
    if (clientGone) return;
  }

  res.end(`\n\nJobStatus: Complete ${jobs[jobId]}%\n\n`);
});

// Start the long-polling demo server on port 8080 and log once it is listening.
app.listen(8080, () => console.log('listening on 8080'));

/**
 * Reports whether a job has finished, delaying the "not yet" answer so that
 * callers can loop on it without spinning.
 *
 * @param {string} jobId - Key into the module-level `jobs` table.
 * @param {number} [intervalMs=1000] - Delay before resolving `false` while
 *   the job's progress is still below 100.
 * @returns {Promise<boolean>} Resolves `true` immediately when the progress
 *   is not below 100, otherwise `false` after `intervalMs` milliseconds.
 */
function checkJobComplete(jobId, intervalMs = 1000) {
  return new Promise((resolve) => {
    if (jobs[jobId] < 100) {
      // Use the global setTimeout: `this` is undefined in strict mode and ES
      // modules, so `this.setTimeout` would throw there.
      setTimeout(() => resolve(false), intervalMs);
    } else {
      resolve(true);
    }
  });
}

/**
 * Stores the progress of a job and schedules the next simulated step
 * (+10 after 10 seconds) until the job reaches 100.
 *
 * @param {string} jobId - Key into the module-level `jobs` table.
 * @param {number} prg - Progress value to record for the job.
 * @returns {void}
 */
function updateJob(jobId, prg) {
  jobs[jobId] = prg;
  console.log(`updated ${jobId} to ${prg}`);

  // `>=` rather than equality: a progress value that steps past 100 must
  // also stop the chain instead of rescheduling forever.
  if (prg >= 100) return;

  // Use the global setTimeout: `this` is undefined in strict mode and ES
  // modules, so `this.setTimeout` would throw there.
  setTimeout(() => updateJob(jobId, prg + 10), 10000);
}

Server-Sent Events

Server 收到請求後,不結束這個 TCP 的連線,而是以串流的方式持續回傳事件給 Client。和 WebSocket 不同的地方是,它是單向 Server 對 Client 的傳送,因此特別適合用在狀態的更新、訊息通知等等:

提示

需要特別留意的是,Server-Sent Events 是建立在 HTTP 長連線(底層為 TCP)之上,所以每一個串流都會佔用一個連線;如果使用的並非 HTTP/2 以上的版本,需要留意瀏覽器對同一個網域的連線數目上限(通常是 6 條)。

Server
const app = require('express')();

// Root route: replies with a fixed greeting.
app.get('/', (req, res) => {
  res.send('hello!');
});

// Streaming route: marks the response as an event stream and hands it to send().
app.get('/stream', (req, res) => {
  // The response Content-Type has to be "text/event-stream".
  res.setHeader('Content-Type', 'text/event-stream');

  // From here on, events are pushed to the client with res.write (see send).
  send(res);
});

// Event counter; module-level, so it is shared by every response passed to send().
let i = 0;

/**
 * Starts streaming events on a response: writes one event immediately and
 * then one per second until the response is closed.
 *
 * @param {object} res - Response to stream on; must provide `write` and `on`
 *   (e.g. an http.ServerResponse).
 * @returns {void}
 */
function send(res) {
  const writeEvent = () => {
    // An SSE message is a "data: ..." line terminated by a blank line.
    res.write(`data: hello from server ---- [${i++}]\n\n`);
  };

  writeEvent();
  const timer = setInterval(writeEvent, 1000);

  // Without this the timer would keep firing, and writing to a dead
  // connection, forever after the client disconnects.
  res.on('close', () => clearInterval(timer));
}

// Port comes from the PORT environment variable, falling back to 8888 when
// it is unset or empty.
const port = process.env.PORT || 8888;
// Log from the listen callback so the message only appears once the server
// is actually accepting connections (not when binding the port fails).
app.listen(port, () => console.log(`Listening on ${port}`));

Client 端如果是 Browser 的話可以使用 EventSource API

Client
// Open the event stream. The port has to match the SSE server, which listens
// on 8888 unless the PORT environment variable overrides it.
const sse = new EventSource('http://localhost:8888/stream');

// Called for events that carry no `event:` field (the default "message" type).
sse.onmessage = (e) => {
  console.log(e.data);
};

// Listen for named events: called only for events sent with `event: notice`.
sse.addEventListener('notice', (e) => {
  console.log(e.data);
});

Pub/Sub

使用 RabbitMQ 做示範:

Publisher
// publisher.js
/* RabbitMQ */
const amqp = require('amqplib');

// Job payload; `number` is taken from process.argv[2], so it is a string
// (or undefined when no argument is given).
const msg = { number: process.argv[2] };

// connect is a function declaration, so it is hoisted and can be called here.
connect();

/**
 * Publishes the module-level `msg` as JSON to the "jobs" queue, then closes
 * the channel and the connection.
 *
 * Errors are logged with console.error and not rethrown. The connection is
 * closed even when a step after connecting fails.
 *
 * @returns {Promise<void>}
 */
async function connect() {
  let connection;
  try {
    const amqpServer = 'amqp://localhost:5672';
    connection = await amqp.connect(amqpServer);
    const channel = await connection.createChannel();

    await channel.assertQueue('jobs');
    // sendToQueue is synchronous (it returns a boolean, not a promise), so
    // there is nothing to await here.
    channel.sendToQueue('jobs', Buffer.from(JSON.stringify(msg)));
    console.log(`Job sent successfully ${msg.number}`);
    await channel.close();
  } catch (ex) {
    console.error(ex);
  } finally {
    // Always release the connection; otherwise a failure above leaves the
    // socket open and keeps the process alive.
    if (connection) {
      try {
        await connection.close();
      } catch (closeErr) {
        console.error(closeErr);
      }
    }
  }
}
Consumer
// consumer.js
const amqp = require('amqplib');

// connect is a function declaration, so it is hoisted and can be called here.
connect();
/**
 * Connects to RabbitMQ and subscribes to the "jobs" queue.
 *
 * Every received job is logged; only jobs whose `number` is 7 are
 * acknowledged, all other valid jobs are left unacknowledged. Connection and
 * subscription errors are logged with console.error and not rethrown.
 *
 * @returns {Promise<void>}
 */
async function connect() {
  try {
    const amqpServer = 'amqp://localhost:5672';
    const connection = await amqp.connect(amqpServer);
    const channel = await connection.createChannel();
    await channel.assertQueue('jobs');

    // Awaited so that a failed subscription is handled by the catch below
    // instead of becoming an unhandled rejection.
    await channel.consume('jobs', (message) => {
      // amqplib invokes the callback with null when the server cancels the
      // consumer; there is no content to read in that case.
      if (!message) {
        console.error('Consumer was cancelled by the server');
        return;
      }

      let input;
      try {
        input = JSON.parse(message.content.toString());
      } catch (parseErr) {
        // A payload that is not valid JSON can never succeed, so reject it
        // without requeueing rather than letting the callback throw.
        console.error(parseErr);
        channel.nack(message, false, false);
        return;
      }
      console.log(`Received job with input ${input.number}`);

      // The number may arrive as a string ("7"). Converting explicitly gives
      // the same result as the loose comparison "7" == 7 while keeping a
      // strict equality check.
      if (Number(input.number) === 7) channel.ack(message);
    });

    console.log('Waiting for messages...');
  } catch (ex) {
    console.error(ex);
  }
}