5장 Stream API (3/3) - Stream을 사용할 때에 순차 실행, 병렬 실행, 제한된 병렬 실행 구현하기

이 글은 Stream을 사용할 때에 순차 실행, 병렬 실행, 제한된 병렬 실행에 대해 다룬다. 또한 독자가 Node.js Stream에 대한 기초 지식이 있음을 전제로 작성되었음을 밝힌다.

참고 자료:


1. 여러 파일을 하나의 파일로 순차적으로 병합하는 방법

스트림은 당연하게도 비동기로 작동한다. 여러 개의 Redable Stream이 있고 하나의 Writable Stream이 있을 때, 각 작업들을 순차적으로 수행하는 방법이 있을까? 가능하다. 여러 개의 Readable 을 각각 Writable로 연결하고, Redable에 순서를 지정하면 된다. 아래 코드에 대한 설명은 주석으로 나타나 있으니 주석을 따라가기 바란다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
const fromArray = require('from2-array');
const through = require('through2');
const fs = require('fs');

function concatFiles(destination, files, callback) {
const destStream = fs.createWriteStream(destination);

// fromArray.obj: readableStream of param array.
fromArray.obj(files)
// readable 끼리 pipe 수행
// through.obj(fn) == through({ objectMode: true }, fn) => Transform 스트림 반환
// 현재는 through를 많이 사용하지 않아도 괜찮음.
.pipe(through.obj((file, enc, done) => {
const src = fs.createReadStream(file);
// 파일명을 file로 입력 받음
// src1 => dest 로 pipe 연결 (pipe 사용 시 자동으로 백 프래셔 수행. src에서 데이터 생산만 하면 됨.)
// src1.end
// src2 => dest 로 pipe 연결
// src2.end
// ...
// dest.end
// [끝]
// ---
// 연결을 요청함. 이벤트 핸들러 등록과 같은 느낌. 실제 스트림 간의 통신은 비동기로 수행됨.
src.pipe(destStream, {end: false});
// 이후 src에서 dest로 연결하려면, dest는 종료되지 않아야 함
// ---
// 이 파일에 대한 Read Stream이 끝나면, through.obj로 생성하는 Trasnform 스트림의
// callback인 'done' 함수를 호출하게 함. (단순히 params 이름만 바꾼 것임.)
src.on('end', done);
}))
.on('finish', () => {
// WritableStream을 종료함.
destStream.end();
// concatFiles 호출자에게 종료를 알림.
callback();
});
}

2. 순서에 상관 없이 결과를 비동기로, 병렬적으로 한 파일에 출력하는 방법

http://thiswillbedownforsure.com is down
https://www.naver.com is up
https://www.google.com is up

위와 같이 특정 사이트 목록들에 대해 health check를 하고 그 결과를 파일로 출력하는 프로그램을 만든다고 하자. 굳이 Stream으로 만들 필요는 없겠지만 그렇게 해본다면 다음과 같은 코드를 생각해볼 수 있다.

일단 Transform 기반의 스트림을 하나 정의한다. 이 스트림은 request의 콜백으로 스트림의 기능을 빌려주는 형태로 작동한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Transform 스트림을 하나 정의한다.
class ParallelStream extends stream.Transform {
constructor(userTransform) {
super({objectMode: true});
this.userTransform = userTransform;
// const userTransform = (chunk, enc, done, pushFn) => { ... }
this.running = 0;
this.terminateCallback = null;
}

_transform(chunk, enc, done) {
this.running++;
this.userTransform(chunk, enc, this._onComplete.bind(this), this.push.bind(this));
done();
}

// flush는 스트림 종료 직전에 호출되며 즉 done() 의 호출 여부를 결정할 수 있다.
_flush(done) {
// 작업이 모두 종료되기 전에 스트림이 종료되려고 하는 경우 done()을 호출하지 않는다.
// 그 대신 onComplete에서 곧바로 종료할 수 있도록 done 함수를
if(this.running > 0) {
this.terminateCallback = done;
} else {
done();
}
}

// userTransform에서 done이라는 이름으로 호출되는 함수. 이 때의 done은 각 단위 작업의 완료를 의미한다.
_onComplete(err) {
this.running--;
if(err) {
return this.emit('error', err);
}
// 실행 중인 작업이 모두 종료되었고 스트림 종류가 한 번 이상 보류된 경우 직접 스트림을 종료한다.
if(this.running === 0) {
this.terminateCallback && this.terminateCallback();
}
}
};

위에서 정의한 스트림을 사용해 구현한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
(ex)
1. process.argv[2]: urls.txt

2. urls.txt:
http://thiswillbedownforsure.com
https://www.naver.com
https://www.google.com
*/
fs.createReadStream(process.argv[2]) //[1] 파일로 readable 스트림 생성
.pipe(split()) //[2] 파일의 라인 단위로 chunk를 잡아 출력하는 Transform 스트림 생성 (파일 내용은 url 단위로 줄바꿈 돼있음)
.pipe(
//[3] pipe로 전달되는 데이터(각 URL) 마다 Transform Stream의 _transform 함수에서 아래의 콜백 함수가 호출된다.
// 생성자로 이 콜백(userTransform이라고 불리는)을 등록한다.
new ParallelStream((url, enc, done, push) => {
if (!url) return done(); // 더 이상 데이터가 없는 경우 (null인 경우) 스트림 종료하도록 (this.running == 0)
request.head(url, (err, response) => {
push(url + " is " + (err ? "down" : "up") + "\n");
done();
});
})
)
.pipe(fs.createWriteStream("results.txt"))
.on("finish", () => console.log("All urls were checked"));
/*
result:
http://thiswillbedownforsure.com is down
https://www.naver.com is up
https://www.google.com is up
*/

3. (2)의 동시 실행 수를 제한하는 방법

비동기 요청 여러 개를 처리하는 일은 Node.js에선 매우 간단하다. Run to Completion이기 때문에 변수 하나로 비동기 작업의 개수를 정확히 세고 이 값에 기반해 의사 결정을 할 수 있다.

따라서 this.running의 개수가 동시 실행 제한 개수에 도달한 경우 처리하지 않으면 된다. 좀 더 정확하게는, _transform 함수에서 해당 chunk의 처리가 완료됐음을 알리는 콜백을 호출하지 않고 보류하면 된다.

이 경우 해당 chunk를 처리한 결과는 다음 스트림으로 넘어가지 않으며 현재 chunk가 처리되지 않았기 때문에 추가적인 chunk가 스트림으로 전달되지도 않는다(스트림 내부 버퍼에 쌓인다).

만약 ReadableStream이 chunk를 생성하고 내보내는 속도가 우리의 스트림의 처리 속도보다 빠르다면 처리되지 않는 chunk는 Transform의 버퍼에 쌓이며 이내 백 프레셔가 발동되고 알아서 처리될 것이다. - pipe로 연결하면 Node.js에서 자동으로 처리한다. 백 프래셔에 대해선 5장 Stream API (2/3) - Node.js의 4가지 스트림 소개와 사용법을 참고하라.

따라서 추가적으로 신경써야 하는 부분은 출력을 할 지 여부를 결정하는 것이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
constructor(concurrency, userTransform) {
// ... 아래 두 멤버 필드만 추가된다.
this.concurrency = concurrency;
this.continueCallback = null;
}

// 입력은 직접 제한하지 않고 계속 받는다.
// 완료를 의미하는 콜백을 호출해서 다음 chunk를 처리하지 않으면, 스트림 내부의 버퍼에 쌓이게 된다.
// 그러면 Node.js 런타임이 자동으로 백 프레셔를 수행한다.
_transform(chunk, enc, done) {
this.running++;
this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));
if (this.running < this.concurrency) {
done();
} else {
// 만약 현재 running의 최대치에 도달한 경우 완료 콜백을 수행하지 않는다. 이는 자연스럽게 백 프래셔 발동으로 이어진다.
this.continueCallback = done;
}
}

_onComplete(err) {
this.running--;
if (err) {
return this.emit("error", err);
}

// continueCallback이 할당되어 있으면 호출한다.
// 이 시점에서 앞 chunk들은 모두 처리됐음이 보장된다.
// 왜냐하면 입력이 출력보다 충분히 빨라 버퍼링이 되는 시점에서는 항상 continueCallback으로 done()이 호출되게 된다.
// 항상 this.running == this.concurrency여서 꽉 차 있는 상태이기 때문이다.
// (설명이 부드럽지 못한데 실행 흐름을 보고 설명을 다시 읽어보면 이해가 될 것이다.)
const tmpCallback = this.continueCallback;
this.continueCallback = null;
tmpCallback && tmpCallback();

if (this.running === 0) {
this.terminateCallback && this.terminateCallback();
}
}

참고 자료 (이번 글만 특별히 도움이 됐는지와는 별개로 읽은 몇 개의 글을 링크한다.):

What’s the proper way to handle back-pressure in a node.js Transform stream?

Awesome Nodejs#Streams (Github Repo)


TODO:

  1. Stream 관련해서 자세한 자료보단 내부 구조를 코드 수준에서 확인하는 게 가장 좋을 것 같다.
  2. Back Pressure의 효과를 제대로 확인하기 위해선 디버거를 키고 스트림 객체를 살펴봐야 할 것 같다.
  3. Stream의 추상하된 구현체들을 가져다 쓸 수록 더욱 더 이해하기 어려워지는 것 같다.
  4. Stream을 3부작으로 나누어 작성하려고 했는데 한 10부작 까지는 나올 수도 있을 것 같다. 그만큼 부족하고, 글 쓰는 데도 매우 오래 걸린다.

5장 Stream API (3/3) - Stream을 사용할 때에 순차 실행, 병렬 실행, 제한된 병렬 실행 구현하기

https://jsqna.com/ndp-5-stream-3/

Author

Seongbin Kim

Posted on

21-03-12

Updated on

21-03-25

Licensed under

댓글