5장 Stream API (2/3) - Node.js의 4가지 스트림 소개와 사용법

이 글은 Node.js 디자인 패턴 CH 05 스트림 코딩의 일부를 참고해서 작성하였으며, Node.js에서 코어 모듈로 제공하는 Stream 4종류를 다룬다. Node.js에서의 스트림 자체에 대해서는 5장 Stream API (1/3) - 스트림 개요 및 Readable Stream를 참고하라.


Node.js 스트림 객체

Node.js에서는 4가지의 추상 스트림 클래스를 제공하여 쉽게 스트림을 구현할 수 있게 한다. 이 클래스들은 core 모듈에서 제공하므로 추가 의존성이 필요하지 않다.

Name 목적 dataSource 가능
stream.Readable 외부 데이터 읽기 (dataSource에서 꺼내는 형태) True
stream.Writable 내부 데이터 외부로 전송하기 (dataSource로 써주는 형태) False
stream.Duplex Readable + Writable 스트림. True
stream.Transform 외부 데이터 읽기 => 데이터 변조하기 => 외부로 전송하기 True

Node.js의 두 버전의 스트림 API

Node.js에는 두 가지의 Stream API가 있다.

API Version Name Event Name Description
Stream v1 Flowing Mode on('data') 무조건 해당 데이터를 처리해야 함. 버퍼 크기 등의 문제로 처리하지 못 하는 경우 해당 데이터를 되살릴 방법이 없음.
Stream v2 Non-Flowing Mode on('readable') 곧바로 데이터를 처리하지 않아도 됨. 백 프래셔를 지원함.

Back Pressure: Event 송신자의 처리량이 Event를 수신하는 측의 처리량을 넘기는 경우 송신자의 전송 속도를 줄여야 하는 경우가 생기는 데 이를 해결하는 메커니즘을 Back Pressure라고 한다.

송신자-수신자 처리량 차이 발생 오류 백프래셔 필요
송신자 전송량 < 수신자 처리량 없음 False
송신자 전송량 > 수신자 처리량 처리하지 못하는 데이터에 대한 정의되지 않은 동작 등 손실 발생 가능 True

Stream v2의 백 프래셔:
Node.js의 버퍼가 알아서 버퍼링을 해주며, 버퍼 한계치를 넘으면 OS에서 패킷을 drop시켜 sender 입장에서 전송 속도를 늦추게 함. 이 기능을 자동으로 지원. (v1도 가능하다고 함. 다만 더 어렵다고 함.)

출처: What are the differences between readable and data event of process.stdin stream?

추가 참고:

[RxJava2]Flowable에서의 Backpressure

[RxJava2]Backpressure와 함께 Flowable를 만들어 보자

1. Readable 스트림

Readable 스트림은 데이터를 읽어들이는 게 목적이다.

1-1. 사용 예시:

stream.read() 함수를 사용하면 chunk를 반환한다.

1
2
3
4
5
6
7
8
9
10
11
const RandomStream = require("./randomStream");
const randomStream = new RandomStream();

// readable 스트림:
// stream.read() 로 내용을 읽는 것을 의미한다.
randomStream.on("readable", () => {
let chunk;
while ((chunk = randomStream.read()) !== null) {
console.log(`Chunk received: ${chunk.toString()}`);
}
});

1-2. Readable 구현 코드

Readable Stream은 _read 함수를 구현하면 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const stream = require('stream'); // 코어 모듈 (stream)
const chance = new require('chance')(); // 랜덤 (외부 의존성)

// [1] Readable 구현체
class RandomStream extends stream.Readable {
constructor(options) {
super(options);
}

// Readable에서 구현해야 하는 함수는 _read 하나임
//
_read(size) {
const chunk = chance.string(); //[1] 랜덤값 생성
console.log(`Pushing chunk of size: ${chunk.length}`);
this.push(chunk, 'utf8'); //[2] Encoding을 설정하면 String으로 읽음
if(chance.bool({likelihood: 5})) { //[3] null을 보내면 종료하기로 약속함
this.push(null);
}
}
}

// [2] 스트림 사용
const randomStream = new RandomStream();

randomStream.on('readable', () => {
let chunk;
while((chunk = randomStream.read()) !== null) { // [1] 약속한대로 null 이면 읽기 종료
console.log(`Chunk received: ${chunk.toString()}`);
}
});

2. Writable 스트림

Writable 스트림은 데이터를 생성하는 게 목적이다. (ex) HTTP response 생성

2-1. 사용 예시

stream.write 함수를 사용하면 스트림에 내용을 쓸 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// res가 Writable Stream 이다 :)
// "Writable Stream은 데이터의 목적지를 나타낸다"는 뜻은
// stream.write(내용) 을 쓰면 해당 stream으로 전달된다는 뜻이다.
// 전송하는 입장에선 writable이지만, 받는 입장에선 readable로 취급하면 쓰기, 읽기가 각각 되는 것이다.
require("http")
.createServer((req, res) => {
res.writeHead(200, { "Content-Type": "text/plain" });
while (chance.bool({ likelihood: 95 })) { // 5% 확률로 루프 빠져나오는 코드.
res.write(chance.string() + "\n");
}
res.end("\nThe end...\n");
res.on("finish", () => console.log("All data was sent")); // 스트림에 finish 이벤트 리스너 등록 후 종료
})
.listen(8080, () => console.log("Listening on http://localhost:8080"));

2-2. Writable 구현 코드

(윗 코드와는 상관 없음.) Writable Stream은 _write 함수를 구현하면 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class ToFileStream extends stream.Writable {
constructor() {
super({objectMode: true});
}

_write (chunk, encoding, callback) {
mkdirp(path.dirname(chunk.path), err => {
if (err) {
return callback(err);
}
fs.writeFile(chunk.path, chunk.content, callback);
});
}
}

2-3. 백 프래셔 예제

백 프래셔란 Read보다 Write가 빠를 때 병목이 생기는 것을 방지하는 메커니즘이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
require("http")
.createServer((req, res) => {
res.writeHead(200, { "Content-Type": "text/plain" });

function generateMore() {
while (chance.bool({ likelihood: 95 })) {
const shouldContinue = res.write(
// res.write가 Falsy를 반환하면 내부 버퍼를 다 사용한 것
// (자세한 내용은 나중에 포스팅할 예정)
chance.string({ length: 16 * 1024 - 1 })
);
if (!shouldContinue) {
console.log("Backpressure"); // 백 프래셔를 수행해야 하는 시점
return res.once("drain", generateMore); // once로 drain 이벤트 핸들러를 등록해 재시작 대기
}
}
res.end("\nThe end...\n", () => console.log("All data was sent"));
}
generateMore();
})
.listen(8080, () => console.log("Listening on http://localhost:8080"));


3. Duplex Stream

Duplex Stream은 Readable + Writable 그 이상 그 이하도 아니며 따라서 설명을 생략한다.


4. Transform Stream

Transform 스트림은 읽어들인 데이터를 변조해 내보내는 스트림이다. 스트림이니만큼 chunk 단위로 데이터가 오므로 변환에 유의해야 한다.

4-1. 사용 예시

1
2
3
4
5
6
7
8
9
const ReplaceStream = require("./replaceStream");

const rs = new ReplaceStream("World", "Node.js");
rs.on("data", (chunk) => console.log(chunk.toString()));

rs.write("Hello W");
rs.write("orld!");
rs.end();
// Hello Node.js

4-2. Transform 구현 코드

스트림 상에서 문자열 일부를 치환하는 코드이다. (어렵다.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class ReplaceStream extends stream.Transform {
constructor(searchString, replaceString) {
super();
this.searchString = searchString;
this.replaceString = replaceString;
this.tailPiece = '';
}

_transform(chunk, encoding, callback) {
const pieces = (this.tailPiece + chunk)
.split(this.searchString);
const lastPiece = pieces[pieces.length - 1];
const tailPieceLen = this.searchString.length - 1;

this.tailPiece = lastPiece.slice(-tailPieceLen);
pieces[pieces.length - 1] = lastPiece.slice(0,-tailPieceLen);

this.push(pieces.join(this.replaceString));
// 여기서의 callback은 각 chunk의 처리가 완료됐음을 알리는 함수이다.
callback();
}

// 여기서의 callback은 스트림을 종료시키는 함수이다.
_flush(callback) {
// flush라는 이름 답게 출력되지 않은 데이터의 출력을 수행한다.
this.push(this.tailPiece);
// 스트림을 종료한다.
callback();
}
}

TODO: (전부 다 책에서 나온 내용)

스트림 간의 Pipelining(조합) 소개

스트림 기반 비동기 제어 소개 (순차/비순차/제한된 비순차)

스트림 fork, merge

스트림 멀티플렉싱, 디멀티플렉싱


소스 코드 출처: Node.js 디자인 패턴

스트림 파트는 내가 스트림에 대한 경험도 거의 없고 책에서 설명하는 내용이 어려워서 내 생각을 넣어 포스팅하기가 매우 어려웠다. 내용을 간략히 정리하는 선에서 마쳐야 할 것 같아 아쉽다.

5장 Stream API (2/3) - Node.js의 4가지 스트림 소개와 사용법

https://jsqna.com/ndp-5-stream-2/

Author

Seongbin Kim

Posted on

21-03-05

Updated on

21-03-12

Licensed under

댓글