5장 Stream API 디자인 패턴 - Pipe, Fork, Merge, Mux/Demux

이 글은 Stream에서의 Pipe, Fork, Merge, Mux/Demux 패턴에 대해 소개하고 Mux/Demux는 예를 제공한다.

참고 자료:


1. Pipe 패턴

여기서 말하는 Pipe 패턴이란 스트림의 조합으로 이루어진 하나의 파이프라인을 모듈화하고 재사용하는 방법을 말한다.

Pipe 패턴 구현 시 주의할 점

  • 첫 Stream에 Write하고, 마지막 Stream에서 Read해야 한다.
  • 내부의 모든 Stream에서 발생하는 오류를 포착할 수 있어야 한다. Error Listener 하나로 Pipeline에서 발생하는 모든 오류를 구독할 수 있도록 한다.

Combined-Stream 패키지를 이용한다. (사용량은 압도적이나 Stream v1 - Flowing 모드만 지원한다.)

(Pumpify가 더 좋은 것 같은데 사용법을 잘 모르겠다.)


2. Fork 패턴

서로 다른 대상에 동일한 데이터를 보내는 경우, 즉 하나의 Readable에 2개 이상의 스트림을 연결하는 패턴이다.

Fork 패턴 구현 시 주의할 점

  • .pipe 사용 시 {end: false} 옵션이 필수가 된다. 한 쪽의 작업이 끝나는 경우 다른 쪽도 닫히기 때문
  • 백 프레셔 때문에 제일 느린 스트림에 속도가 맞춰지게 된다.
  • 같은 프로세스 내에 두 스트림이 있는 경우 chunk가 공유되므로 한 쪽의 스트림에서 해당 chunk의 내용을 직접 수정하게 되면 다른 스트림도 그 영향을 받게 된다.

3. Merge 패턴

일련의 Readable을 하나의 스트림으로 연결하는 패턴이다. .pipe({end: false})로 연결해야 한다. Auto End 옵션은 하나의 Redable만 종료되더라도 연결된 스트림까지 종료시키기 때문이다.

Merge-Stream 패키지를 사용한다.

  • multistream 패키지보다 훨씬 사용량이 많다.

4. Mux/Demux 패턴

(직접 구현한다.) 여러 스트림에서 들어오는 데이터를 한 스트림(이 예에서는 net 패키지의 도움을 받아 TCP Socket을 사용한다.)으로 내보내고, 같은 방식으로 데이터를 받아들인 후 여러 스트림으로 다시 분류하는 멀티플렉싱/디멀티플렉싱을 스트림 수준에서 구현한다.

아키텍처 요약

긴 설명은 하지 않고, 코드에 주석을 달아 놓았으니 흐름을 따라가면 쉽게 이해할 수 있을 것이다.

generateData.js

표준 출력, 오류 스트림에서 데이터를 생성하기 위한 코드이다. Client에서 실행하게 된다.

1
2
3
4
5
console.log("out1");
console.log("out2");
console.error("err1");
console.log("out3");
console.error("err2");

Client.js

generateData로 생성된 데이터가 표준 출력, 오류 스트림으로 들어오게 되고, 아래 코드에서 헤더로 포장한 후 Socket으로 Server에 전송한다. (참고로 Client 코드가 이 case에서 가장 어렵다. 이 코드만 이해하면 다 했다고 볼 수 있다.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
const child_process = require("child_process");
const net = require("net");

function multiplexChannels(sources, destination) {
let totalChannels = sources.length;

for (let i = 0; i < sources.length; i++) {
sources[i]
.on("readable", function () {
let chunk;
while ((chunk = this.read()) !== null) {
// 5+chunk byte (Node.js는 바이트 스트림(Octet Stream)만 지원하는건지, Buffer도 최소 단위가 bit이 아니라 byte이다.)
const outBuff = Buffer.alloc(1 + 4 + chunk.length); // Buffer.alloc(size); === new Buffer(size);
outBuff.writeUInt8(i, 0); // write(data, idx) - 이 경우에는 idx=0
outBuff.writeUInt32BE(chunk.length, 1); // write (data, idx) 이 경우에는 idx=1 (앞 데이터는 8bit 이므로, 한 칸만 사용)
chunk.copy(outBuff, 5); // 앞에서 40bit를 사용해서 다음 데이터의 offset=5
// chunk가 무슨 타입인지 모르겠지만 Readable이 제공하는 chunk는 copy 메소드가 있는 듯.
console.log("Sending packet to channel: " + i);
destination.write(outBuff); // 대상 스트림으로 쓰기
}
})
.on("end", () => {
// 모든 Readable이 닫힌 후 대상 스트림 종료
if (--totalChannels === 0) {
destination.end();
}
});
}
}

// net.connect: (port, host?, callback)
const socket = net.connect(3000, () => {
//현재 프로세스는 소켓을 열고 끝. net.connect는 Non-blocking call 이다.
const child = child_process.fork(
// child_process.fork로 새 프로세스에서 JS 파일을 실행한다. (이 경우 generateData.js)
process.argv[2],
process.argv.slice(3), // fork로 실행할 JS파일
{ silent: true } // silent 옵션: Child 프로세스가 독립적인 표준 스트림을 갖도록 (상속받지 않도록)
);

multiplexChannels([child.stdout, child.stderr], socket); // 대상 스트림으로 Socket 생성해 전달
});

Server.js

클라이언트로부터 데이터를 파싱한 후 각 스트림에 대응되는 파일에 내용을 쓴다. 헤더 격인 앞 1바이트를 읽어 채널을 구분한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
const net = require("net");
const fs = require("fs");

// 소켓마다 한 번 호출됨 (상태 관리 필요)
// 디멀티플렉싱 수행
function demultiplexChannel(source, destinations) {
// 상태 관리 필드
let currentChannel = null;
let currentLength = null;
source
.on("readable", () => {
// 1. 표준 입력/오류 구분
let chunk;
if (currentChannel === null) {
chunk = source.read(1);
currentChannel = chunk && chunk.readUInt8(0);
}

// 2. 데이터 길이 파싱
if (currentLength === null) {
chunk = source.read(4);
currentLength = chunk && chunk.readUInt32BE(0);
if (currentLength === null) {
return;
}
}

// 3. 데이터 길이만큼 읽기
chunk = source.read(currentLength);
if (chunk === null) {
return;
}

// 4. 읽은 데이터(chunk)를 대상 스트림에 작성
console.log("Received packet from: " + currentChannel);
destinations[currentChannel].write(chunk);

// chunk 순서대로 호출되므로 여기서 다시 null을 할당하면 됨 :)
currentChannel = null;
currentLength = null;
})
.on("end", () => {
// 소켓에서 받은 데이터가 끝난 경우 대상 스트림 모두 종료
destinations.forEach((destination) => destination.end());
console.log("Source channel closed");
});
}

// 소켓 3000번으로 서버 열기
net
.createServer((socket) => {
// 연결 수립 시 수행할 Callback
const stdoutStream = fs.createWriteStream("stdout.log");
const stderrStream = fs.createWriteStream("stderr.log");
// Source: 소켓을 통해 전달된 octet-stream, 대상 스트림 2개: 표준 출력, 표준 오류
demultiplexChannel(socket, [stdoutStream, stderrStream]);
})
.listen(3000, () => console.log("Server started"));

TODO:

  1. 스트림을 제대로 써봐야 제대로 이해할 수 있을 것 같다.
  2. 스트림 생태계가 좀 엉망인데 직접 사용해보고 정리하는 기회가 필요할 것 같다.
  3. 이 글도 예제를 제대로 추가해 영양가 있는 글로 만들어야 한다.

5장 Stream API 디자인 패턴 - Pipe, Fork, Merge, Mux/Demux

https://jsqna.com/ndp-5-stream-4/

Author

Seongbin Kim

Posted on

21-03-25

Updated on

21-03-25

Licensed under

댓글