본문 바로가기

개발/Javascript

Node.js Stream - 높은 퍼포먼스의 Node.js 애플리케이션 만들기

디스크에서 파일을 읽어오거나 인터넷에서 파일을 다운로드할 때 빠른 처리를 위해서 Stream을 사용하곤 한다.

 

유툽에서 동영상을 재생하거나 ott 프로그램으로 영상을 시청한 경험이 있을 것이다.
전체 영상 중 아주 조금만 불러와도 재생이 시작되며 나머지 부분은 시청동안 다운로드가 계속 진행된다.

바로 빠른 처리를 위해서다.

 

Node.js의 stream도 작용방식도 이와 같다.

 

Stream은 Node.js의 빌트인 모듈로, 데이터를 스트리밍할 수 있게 해 준다.

 

이 포스트에서 우리는 간단한 예제를 통해 stream 모듈 사용방법과
또한 높은 퍼포먼스를 위해 여러 개의 스트림을 연결해 파이프라인을 구성하는 법을 익혀볼 것이다.

 

아래로 고고! 👏

 

 

 


1. Node.js Stream의 종류

Node.js stream 모듈은 네 종류의 스트림을 제공한다.

  • Readable
  • Writable
  • Duplex
  • Transform

각 스트림에 대한 Node.js의 공식 설명은 여기에서 확인할 수 있다.

 

 

1.1 Readable Stream

Readable Stream은 특정 데이터 소스에서 데이터를 읽을 때 사용한다.

 

용례는 다음과 같다.

  • 파일 시스템 file system
  • process.stdin: 터미널에서 stdin을 통해 유저의 인풋값을 읽음
  • http.IncomingMessage: HTTP 서버에서 request의 content를 읽거나, HTTP 클라이언트에서 HTTP 서버의 response를 읽음

 

1.2 Writable Stream

파일과 같은 특정 대상에 데이터를 쓸 때 사용한다.

 

process.stdout은 표준 출력에 데이터를 쓸 때 사용되며, console.log 내부에서 사용되기도 한다.

 

1.3 Duplex Stream

읽기readable와 쓰기writable가 모두 가능한 하이브리드 스트림이다.

 

가장 널리 사용되는 예는 소켓에서 데이터를 읽고 쓰는 net.Socket이다.

 

중요한 것은 이 스트림에서는 읽기와 쓰기가 서로 독립적으로 운영된다는 점이다.
데이터가 읽기에서 쓰기, 혹은 그 반대로 흐르지 않는다는 점을 명심해야 한다.

 

1.4 Transform Stream

Duplex Stream과 동일하게 읽기readable와 쓰기writable가 모두 가능한 하이브리드 스트림이다.

하지만 Duplex Stream과는 다르게 읽기와 쓰기가 연결되어있다.

 

암호화 스트림을 implement하는 crypto.Cipher 클래스로 예를 들어보겠다.

crypto.Cipher 스트림을 사용하면 애플리케이션은 텍스트를 쓰기 쪽 스트림에 쓸 수 있고 암호화된 암호문을 읽기 쪽 스트림에서 읽어올 수 있다.

 

이렇게 변형이 가능한 특성이 이 스트림이 'Transform Stream'이라고 불리는 이유다.

 

또한, stream.PassThrough라는 스트림이 있는데, 아무런 변형 없이 쓰기 쪽에서 읽기 쪽으로 데이터를 넘기는 스트림이다.
파이프라인이나 커스텀 스트림 implementation을 만들 때 굉장히 유용하니 알아두면 좋다.

 

 

 


2. 읽기 스트림(Readable Stream)에서 데이터 읽어오기

읽기 스트림이 데이터 소스에 연결되면 이 스트림을 통해 여러 방법으로 데이터를 읽어올 수 있다.

 

먼저, sampleFile이라는 샘플 텍스트 파일을 만들어 보자.

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur fermentum rhoncus dolor, at cras.

 

이제 읽기 스트림을 사용해서 데이터를 읽어오는 두 가지 방법을 알아보자.

 

2.1 data 이벤트 리스닝하기

읽기 스트림에서 데이터를 읽어오는 가장 보편적인 방법은 스트림에서 발생하는 data 이벤트를 listen하는 것이다.

 

아래는 이 방법으로 데이터를 읽는 간단한 코드 내용이다.

const fs = require('fs');
const readStream = fs.createReadStream('./sampleFile.txt', { highWaterMark: 30});

readStream.on(`data`, chunk => {
  console.log(`${chunk.length} bytes 읽음ntxt: "${chunk.toString()}"n`);
});

 

 

fs.createReadStream에 옵션으로 들어간 highWaterMark 속성은 버퍼의 크기를 정하는 속성이다.

디폴트로 64kb로 설정되어 있으나, 여기서 data이벤트를 여러 번 발생시키기 위해 30으로 설정했다.

 

위 코드를 실행시키면 sampleFile.txt 파일에서 데이터를 읽어 콘솔에 아래와 같이 출력한다.

30 bytes 읽음
txt: "Lorem ipsum dolor sit amet, co"

30 bytes 읽음
txt: "nsectetur adipiscing elit. Cur"

30 bytes 읽음
txt: "abitur fermentum rhoncus dolor"

10 bytes 읽음
txt: ", at cras."

 

2.2 Async 반복자 이용하기

두 번째 방법은 async를 이용하는 것이다.

 

예제는 다음과 같다.

const fs = require('fs');
const readStream = fs.createReadStream('./sampleFile.txt', { highWaterMark: 30});

(async () => {
  for await (const chunk of readStream) {
    console.log(`${chunk.length} bytes 읽음ntxt: "${chunk.toString()}"n`);
  }
})();

출력 결과는 1.1과 동일하므로 생략한다.

 

 

 


3. 읽기 스트림의 상태

리스너가 읽기 스트림의 data 이벤트에 등록되면 스트림은 flowing 상태로 바뀐다.

스트림의 상태는 readableFlowing 속성을 통해서 확인할 수 있다.

 

2.1의 코드를 수정해서 스트림의 상태가 언제 변하는지 확인해보자.

const fs = require('fs');
const readStream = fs.createReadStream('./sampleFile.txt', { highWaterMark: 30});

let bytesRead = 0;

console.log(`'data' 이벤트 핸들러 등록 전. isFlowing: ${readStream.readableFlowing}`);

readStream.on('data', chunk => {
  console.log(`${chunk.length} bytes 읽음n`);
  bytesRead += chunk.length;

  // 90bytes를 읽었으면 스트림을 잠시 중지한다.
  if (bytesRead === 90) {
    readStream.pause();
    console.log(`pause() 호출 후. isFlowing: ${readStream.readableFlowing}`);

    // 스트림을 1초 뒤 다시 시작한다.
    setTimeout(() => {
      readStream.resume();
      console.log(`resume() 호출 후. isFlowing: ${readStream.readableFlowing}`);
    }, 1000);
  }
});

console.log(`'data' 이벤트 핸들러 등록 후. isFlowing: ${readStream.readableFlowing}`);

 

위의 예제는  읽기스트림을 통해 sampleFile.txt 파일을 90byte까지 읽다가 pause()를 호출하여 1초동안 일시정지한다.

스트림의 상태가 어떻게 변하는지 관찰하기 위해서 readableFlowing 속성 또한 같이 출력했다.

 

위 코드를 실행시키면 다음과 같이 출력된다.

'data' 이벤트 핸들러 등록 전. isFlowing: null
'data' 이벤트 핸들러 등록 후. isFlowing: true
30 bytes 읽음

30 bytes 읽음

30 bytes 읽음

pause() 호출 후. isFlowing: false
resume() 호출 후. isFlowing: true
10 bytes 읽음

 

출력결과를 우리는 다음과 같이 해석할 수 있다.

  1. 스트림과 관련된 아무런 작업을 하지 않았기 때문에 프로그램 시작시 readableFlowingnull의 값을 갖는다.
  2. data 핸들러가 등록된 후, 읽기스트림은 'flowing' 상태로 들어가게 되며readableFlowing은 true가 된다.
  3. 90byte를 읽고 pause()로 일시정지를 시켰을 때, readableFlowing은 false가 된다.
  4. 1초를 기다린 후, resume() 호출로 인해 스트림은 다시 flowing 상태가 되고, readableFlowing 또한 true의 값을 갖는다. 그리고 파일의 남은 부분을 계속해서 읽는다.

 

 

 


4. 스트림을 이용해 큰 용량의 데이터 처리하기

스트림 덕분에 애플리케이션은 메모리에 큰 용량의 정보를 들고 있을 필요가 없다.

데이터를 받는 대로 처리할 수 있기 때문이다.

 

이번엔 스트림 여러 개를 연결하여 큰 용량의 데이터를 처리하는 앱을 만들어 보자.

파일의 SHA256를 생성하는 앱을 만들어 볼 것이다.

 

4.1 스트림 없이 큰 용량의 데이터 처리해보기

우선, 5GB의 더미 파일을 만들어보자. 아래 쉘 명령어를 이용해서 쉽게 만들 수 있다.

mkfile -n 5g hugeFile

 

더미 파일을 만들었으면 우선 stream없이 SHA256을 다음과 같이 생성해 볼 것이다.

const fs = require('fs');
const crypto = require('crypto');

fs.readFile('./hugeFile', (readErr, data) => {
  if (readErr) {
    console.log(readErr);
    return;
  }

  const hash = crypto.createHash('sha256').update(data).digest('base64');

  fs.writeFile('./checksum.txt', hash, writeErr => {
    if (writeErr) {
      console.log(writeErr);
      return;
    }
  });
});

 

위 코드를 실행하면 다음과 같은 결과를 얻을 수 있다.

RangeError [ERR_FS_FILE_TOO_LARGE]: File size (5368709120) is greater than possible Buffer: 2147483647 bytes
    at FSReqWrap.readFileAfterStat [as oncomplete] (fs.js:264:11)

 

이 에러는 자바스크립트 런타임이 마음대로 큰 버퍼를 핸들할 수 없기 때문에 발생한 에러이다.

에러 메시지 내용에 보이는 버퍼의 최대 크기는 환경에 따라 다르다.

최댓값은 빌트인 buffer 모듈의 buffer.constants.MAX_LENGTH를 찍어보면 확인해볼 수 있다.

 

만약 위와 같은 에러가 발생하지 않고 처리가 잘 완료되었다고 하더라도, 메모리에 큰 용량의 파일을 들고 있는 것은 문제가 될 소지가 다분하다.

사용 가능한 물리적인 메모리는 우리의 앱이 사용할 수 있는 메모리의 크기를 제한하기 때문이다.

 

높은 메모리 사용량은 CPU 사용량을 잡아먹는 저성능의 앱을 만들어 낸다. 이런 앱을 만들고 싶은 개발자는 없을 것이다.

 

 

4.2 pipeline()을 이용하여 메모리 사용 줄이기

이제 스트림을 사용해서 위의 에러가 발생하지 않게 해보자.

const fs = require('fs');
const crypto = require('crypto');
const { pipeline } = require('stream');

const hashStream = crypto.createHash('sha256'),
      inputStream = fs.createReadStream('./hugeFile'),
      outputStream = fs.createWriteStream('./checksum.txt');

hashStream.setEncoding('base64');

pipeline(inputStream, hashStream, outputStream,
  err => {
    if (err) {
      console.log(err);
      return;
    }

    console.log('입출력 완료');
  }
)

위 예제에서는 crypto.createHash 함수를 통해 제공되는 스트리밍을 사용했다.

"Transform Stream" 객체인 hashStream이 리턴되는데, 이 객체로 큰 파일을 위한 해쉬를 만들 것이다.

 

여기서 만들어진 transform stream hashStream에 파일내용을 전달하기 위해,

fs.createReadStream을 사용하여 hugeFile에 읽기 스트림 inputStream을 생성한 다음,

fs.createWriteStream을 사용하여 생성한 outputStream 쓰기 스트림에 transform stream인 hashStream의 출력을 연결한다.

 

위 코드를 실행하면 checksum.txt 파일이 생성되고, 처음에 생성한 5GB 파일에 대한 해쉬값이 담겨 있는 것을 확인할 수 있다! 오류 없이!

 

 

 


5. pipeline() vs pipe()

위 예제에서 우리는 여러 개의 스트림을 연결하기 위해 pipeline()을 사용했다.

이와 유사한 방법은 아래와 같이 pipe()을 사용하는 것이다.

inputStream
  .pipe(hashStream)
  .pipe(outputStream);

 

하지만 실제 앱에서 pipe()을 사용하는 건 몇 가지 이유로 권장되지 않는다.

 

만약 연결된 스트림 중 하나가 어떤 이유로 닫히거나 오류를 던진다면 pipe()은 연결된 스트림들을 destroy하지 않을 것이다.

이는 앱에서 메모리 누수를 유발할 수 있다.

 

또한, pipe()은 한 곳에서 에러를 핸들하도록 다른 스트림에 에러를 자동으로 전달하지 않는다.

 

pipeine()은 이런 문제점들을 보완하기 위해 나왔다.

그러므로 여러 개의 스트림을 연결하기 위해서 pipe()보다 pipeline() 사용하기를 권장하는 바이다.

pipe()을 사용한 위의 예제는 아래와 같이 pipeline()을 사용하도록 변경할 수 있다.

pipeline(inputStream, hashStream, outputStream,
  err => {
    err && console.error(err);
  }
)

 

pipeline()은 마지막 파라미터로 콜백함수를 받고 있다.

연결된 스트림 중 어떤 스트림에서 발생된 에러라도 이 콜백함수를 호출하며, 이 때문에 모든 스트림의 에러를 한 군데에서 효율적으로 처리할 수 있게 된다.

 

 

 


6. 마치며

: 메모리를 줄이고 퍼포먼스 향상을 하고 싶다면 Node.js의 Stream을 사용하세요!

 

Node.js의 스트림은 큰 용량의 데이터를 잘 처리할 수 있는 우수한 퍼포먼스의 앱을 만들 수 있게 도와준다.

이 포스트에서 우리는 다음을 알아봤다.

  • Node.js 스트림의 네 가지 종류 (Readable, Writable, Duplex, Transform)
  • data 이벤트를 리스닝 혹은 async 반복자를 활용하여 읽기 스트림에서 데이터를 읽는 방법
  • pipeline으로 여러 개의 스트림을 연결하여 memory footprint를 줄이는 방법

 

스트림 이용에 앞서 주의할 점:

스트림이 필요하지 않거나, 스트림 기반 접근이 오히려 앱의 복잡성을 증가시키기는 상황이 그렇게 많지는 않을 것이다.

하지만 사용에 앞서 스트림을 이용함으로써 증가하는 복잡성보다 이득이 더 큰지 확인을 하기 바란다.