跳至主要内容

[Node] Streams and Pipes

在使用 Node 的過程中,除非有其他的需要,否則盡可能使用 async 的方法和使用 Stream,因為這才能善用到 Node 的效能。

Streams

Streams 是 Node.js 中將 input 轉換成 output 的資料處理方法,它可以有效率的處理檔案的讀寫、網路傳輸、或任何資料交換。

Streams 之所以這麼特別,主要是因為它不是一次將整個檔案讀進記憶體中,而是將檔案切成一個片段一個片段,如此再處理大檔的時候會變得非常強大

What makes streams unique, is that instead of a program reading a file into memory all at once like in the traditional way, streams read chunks of data piece by piece, processing its content without keeping it all in memory.

This makes streams really powerful when working with large amounts of data, for example, a file size can be larger than your free memory space, making it impossible to read the whole file into the memory in order to process it. That’s where streams come to the rescue!

img

當我們在 Node 中談論 Stream 的時候主要有兩種不同的狀況:

  • implementing the streams.
  • consuming the streams.

Event and Function About Stream

img

readable streams 中有幾個比較重要的事件,像是

  • data event, 每當 stream 傳遞一個 chunk 給使用者時會觸發(emit)。
  • end event, 當 stream 沒有更多的 data 要傳遞時會觸發(emit)。

writable streams 中有幾個比較重要的事件,像是

  • drain event, 表示 writable stream 可以接收更多的資料。
  • finish event,當所有的資料都被傳送到系統內時會觸發。

Readable Stream

img

包含pausedflowing 這兩個狀態(mode)。所有的 readable streams 預設都是以 paused mode 開始,當有需要的時候可以輕易地於 paused 和 flowing 間切換,而且這常常是自動發生的。我們也可以透過 resume()pause() 來手動切換。

當 readable stream 是在 paused mode 時,有需要的話我們可以使用 read() 這個方法來讀取 stream。

Example Code - 讀檔與寫檔

const fs = require('fs');
const path = require('path');

// 這是同步讀檔的方法,除非有特定需要,否則建議都是用非同步的方式。
const greet = fs.readFileSync(path.join(__dirname, 'greetr.txt'), 'utf8');

const readable = fs.createReadStream(path.join(__dirname, 'greetr.txt'), {
encoding: 'utf8',
highWaterMark: 20 * 1024, // 20kb as a buffer size, default is 64kb
});

const writable = fs.createWriteStream(path.join(__dirname, 'greetrcopy.txt'));

// a file will split into several chunks depends on the buffer size
readable.on('data', function (chunk) {
console.log(chunk);
writable.write(chunk);
});

Example Code - 將 Buffer 轉成 ReadableStream

const { Readable: ReadableStream } = require('stream');

const toReadableStream = (bufferData) =>
new ReadableStream({
read() {
this.push(bufferData);
this.push(null);
},
});

to-readable-stream @ github

Pipe

Source Code

pipe 這個方法最後會回傳 dest,如果這個 dest 是 Readable 的話,則可以用 chain 的方式繼續使用 pipe

//  source code: /lib/_stream_readable.
util.inherits(Readable, Stream);

Readable.prototype.pipe = function (dest, pipeOpts) {
// ...
return dest;
};

Example Code

const fs = require('fs');
const path = require('path');
const zlib = require('zlib');

const readable = fs.createReadStream(path.join(__dirname, 'greetr.txt'));
const writable = fs.createWriteStream(path.join(__dirname, 'greetrcopy.txt'));

// pipe readable data into writable data
readable.pipe(writable);

const gzip = zlib.createGzip();
const compressed = fs.createWriteStream(path.join(__dirname, 'greetrcopy.txt.gz'));

/**
* Because gzip is readable, it can pipe again. From Stream to Stream.
* The first pipe will transfer origin streams into gzip streams.
* The second pipe will save gzip stream as a file.
*/
readable.pipe(gzip).pipe(compressed);

參考資料