5장 Stream API (1/3) - 스트림 개요 및 Readable Stream

이 글은 Node.js 디자인 패턴 CH 05 스트림 코딩의 일부를 참고해서 작성하였다. 이번 글은 Stream API에 대해 깊이 다루기보다 스트림 자체에 대해 다룬다.


스트림 개요

스트림은 파일을 버퍼 단위로 옮겨서 전부 옮길 때까지 기다린 후 처리하기보다 매 버퍼 단위로 전송하는 방식이다.

스트림은 본질적으로 비동기 패러다임으로, 기다린 후 처리하는 Sync 방식에 대비된다. 물론 fs.readFile 역시 Node.js 런타임에서 I/O를 처리해주니 스레드가 Block 되진 않겠지만, 애초에 I/O 수준에서도 기다릴 일이 없게 하는 것이 처리량에서 우위이지 않을까?

(처리량에서 정말 우위일지는 잘 모르겠다. 스트리밍 오버헤드에 대해 공부해본 적이 없기 때문.)

스트림의 공간 효율성

스트림은 메모리에 파일의 전체 내용을 올리지 않고 버퍼의 크기만큼만 메모리를 할당하기 때문에 공간 효율적이다. 더 좋은 점은 파일의 크기에 상관 없이 일정한 양의 메모리를 점유한다는 점이다.

이것과 별개로 V8 엔진은 32bit 기준 ~1GB, 64bit 기준 ~1.7GB 정도의 메모리만 사용하도록 설정돼있어(더 높이려면 빌드해야 함.) 파일이 큰 경우 전체 파일을 한 번에 메모리에 올릴 수 없음.


공간 비효율적인 파일 압축 코드 (ex: example.tar -> example.tar.gz)

1
2
3
4
5
6
7
8
9
10
11
12
const fs = require('fs');
const zlib = require('zlib');

const file = process.argv[2];

fs.readFile(file, (err, buffer) => {
zlib.gzip(buffer, (err, buffer) => {
fs.writeFile(file + '.gz', buffer, err => {
console.log('File successfully compressed');
});
});
});

공간 효율적인 파일 압축 코드 (Stream API)

1
2
3
4
5
6
7
8
9
10
11
const fs = require('fs');
const zlib = require('zlib');

const file = process.argv[2];

// 파일을 읽는데에 buffer 크기만큼만 메모리를 점유하기 때문에 공간 효율적
// pipe 체이닝으로 각 chunk에 대해 이런 저런 처리를 할 수 있음.
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'))
.on('finish', () => console.log('File successfully compressed'));

참고로 gzip이 어떻게 스트림에 대해 동작하는지 궁금하다면 아래 글들을 참고해보면 좋을 것 같다.

How is it possible to GZIP a stream before the entire contents are known? | StackOverFlow

How does gzip compression rate change when streaming data? | StackOverFlow


스트림의 시간 효율성

Stream은 TTFB(Time to First Byte)에 강점이 있는데, TTFB는 파일의 크기에 비례하여 빠를 수 밖에 없다. 파일의 크기가 클 수록 읽는 데 대기시간이 필요하지만 Stream은 곧바로 응답을 보내기 시작하기 때문이다.

웹에서 TTFB는 매우 중요하다.

자세한 건 Next.js의 재밌는 이슈(Stream rendering to reduce TTFB and CPU load) 참고.

파일을 단위로 전송하는 Server-Client 모델

다음의 사이클을 단 1회 거치게 된다: read > compress > send > receive > decompress > write


chunk 단위로 전송하는 Server-Client 모델

위의 사이클을 매 chunk마다 거치게 되므로 파이프라이닝과 같은 형태로 병렬 처리가 가능하다. 물론 chunk의 크기마다 다르겠지만 각 단계를 거치는 만큼 오버헤드가 있을 것이다. (HTTP header 등. 이 부분에 대해선 잘 알지 못한다.)

Node.js 동시성을 활용하는 것이므로 순서를 맞춰줘야 하는데 Stream API가 알아서 처리한다고 한다.


아래는 파일을 전송하는 스트림 예제 코드이다.

client: 파일을 받아 디스크에 쓰는 역할

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const http = require('http');
const fs = require('fs');
const zlib = require('zlib');

const server = http.createServer((req, res) => {
const filename = req.headers.filename;
console.log('File request received: ' + filename);
req
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream(filename))
.on('finish', () => {
res.writeHead(201 /* CREATED */, {'Content-Type': 'text/plain'});
res.end('That\'s it\n');
console.log(`File saved: ${filename}`);
});
});

server.listen(3000, () => console.log('Listening'));

server: 파일을 읽고 전송하는 역할

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
const fs = require('fs');
const zlib = require('zlib');
const http = require('http');
const path = require('path');

const file = process.argv[2];
const server = process.argv[3];

const options = {
hostname: server,
port: 3000,
path: '/',
method: 'PUT',
headers: {
filename: path.basename(file),
'Content-Type': 'application/octet-stream',
'Content-Encoding': 'gzip'
}
};

const req = http.request(options, res => {
console.log('Server response: ' + res.statusCode);
});

fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(req)
.on('finish', () => {
console.log('File successfully sent');
})
;

스트림의 문제 해결력

스트림은 Composition으로 문제 해결을 한다. Express Middleware와 같이 마음껏 파이프라인을 만들어낼 수 있다.

  • 파이프라인은 각 기능 간에 결합이 없기 때문에 항상 1차원으로 코드가 표현된다. (분기가 없다는 게 아니라 가독성이 좋다는 것.)
  • 선언형으로 프로그래밍하기 수월하다. 선언형 패러다임은 코드를 요약해서 바라볼 수 있기 때문에 쉽게 이해하기 좋다.

스트림을 기반으로 비동기 이벤트를 처리하는 패러다임을 Reactive라고 하고 이를 위한 RxJS가 있다.

(ex) 암호화 기능 추가

1
2
3
4
5
// 복호화
.pipe(crypto.createDecipheriv("aes-192-gcm", "a_shared_secret"))

// 암호화
.pipe(crypto.createCipheriv("aes-192-gcm", "a_shared_secret"))

Node.js에서 지원하는 스트림

Node.js가 지원하는 스트림은 EventEmitter 객체를 상속하며 binary, 문자열 뿐만 아니라 거의 모든 Javascript의 값을 읽을 수 있다. 이러한 스트림에는 크게 네 종류가 있는데 이번 글에서는 (글이 길어지는 관계로) Readable만 다룬다.

  • Readable, Writable, Duplex, Transform

1. Readable

Readable 스트림은 외부에서 읽기 위한 스트림으로, 자신이 가진 값을 chunk로 써서 내보내는 역할이다.

사용 예:

readable 이벤트에 listener를 등록하고 이벤트 발생 시 버퍼에 있는 내용을 모두 읽기

API로는 아래의 함수가 있다.

readable.read([size]) // read는 동기 함수이다.

(ex) 표준 입력(stdin) 받아서 표준 출력(console.log, stdout.write)하기

1
2
3
4
5
6
7
8
9
10
11
process.stdin
.on('readable', () => {
let chunk;
console.log('New data available');
while((chunk = process.stdin.read()) !== null) {
console.log(
`Chunk read: (${chunk.length}) "${chunk.toString()}"`
);
}
})
.on('end', () => process.stdout.write('End of stream'));

Stream v1, v2에 따라 non-flowing mode, flowing mode 로 나뉘는데 어차피 v1은 사용되지 않으므로 설명을 생략한다.


ReadableStream을 하나 새로 만드는 예제

지금까지는 fs, http의 스트림을 그대로 사용했지만 직접 ReadableStream을 만들어 활용할 수도 있다.

stream.Readable을 상속해 abstract function인 _read([size])(public 인터페이스인 read와 헷갈리면 안 된다)를 구현하면 ReadableStream 객체를 하나 만들 수 있다.

구현을 위해 push(data[, encoding]) 함수를 호출해 내부 버퍼에 값을 쓸 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
const stream = require('stream');
const Chance = require('chance');

const chance = new Chance();

// [1] 생성
class RandomStream extends stream.Readable {
constructor(options) {
super(options);
}

_read(size) {
const chunk = chance.string();
console.log(`Pushing chunk of size: ${chunk.length}`);
this.push(chunk, 'utf8');
if(chance.bool({likelihood: 5})) {
this.push(null);
}
}
}

// [2] 사용
const RandomStream = require('./randomStream');
const randomStream = new RandomStream();

randomStream.on('readable', () => {
let chunk;
while((chunk = randomStream.read()) !== null) {
console.log(`Chunk received: ${chunk.toString()}`);
}
});

TODO

나머지 스트림 종류 다루기

백 프래셔

스트림 기반 비동기 제어

Pipe Composition

멀티 플렉싱, 디멀티 플렉싱

5장 Stream API (1/3) - 스트림 개요 및 Readable Stream

https://jsqna.com/ndp-5-stream-1/

Author

Seongbin Kim

Posted on

21-02-01

Updated on

21-03-03

Licensed under

댓글