Skip to content

Node流式处理

流式处理

流式处理是一种处理数据的方式,它将数据分成小块,然后逐个处理这些小块。流式处理可以提高处理数据的效率,减少内存的使用。

Node.js 的流式处理是其处理大文件、网络数据的核心机制,通过事件驱动和非阻塞方式实现高效数据传输。理解流的工作原理对构建高性能应用至关重要。

Node流式处理

一、流的核心概念

1. 流的本质

流是数据的抽象序列,像“管道”一样让数据按顺序流动,无需一次性加载全部数据。常见场景:

  • 读取/写入大文件(如 300GB 日志)
  • 网络请求/响应(如 HTTP 流)
  • 数据转换(如压缩、加密)

2. 四大流类型

类型方向典型场景
可读流数据源 → 程序fs.createReadStream
可写流程序 → 目的地fs.createWriteStream
双工流双向net.Socket(网络连接)
转换流输入 → 转换 → 输出zlib(压缩/解压)、crypto(加密)

3. 核心特性

  • 背压(Backpressure):当写入速度慢于读取速度时,自动暂停读取,防止内存溢出。
  • 事件驱动:通过 on('data')on('end') 等事件处理数据。
  • 管道(Pipeline):用 stream.pipe() 连接多个流,形成数据处理链。

二、可读流(Readable Stream)

1. 两种模式

  • 流动模式(Flow Mode):数据自动流出,通过 data 事件触发。
  • 暂停模式(Paused Mode):需主动调用 stream.read() 获取数据。

2. 创建可读流

javascript
const { Readable } = require('stream');

// 自定义可读流(示例:生成1-10的数字)
const readable = new Readable({
  read(size) {
    if (this.current >= 10) {
      this.push(null); // 结束流
      return;
    }
    this.push(this.current++.toString());
  }
});

readable.current = 1;

// 监听数据事件(流动模式)
readable.on('data', (chunk) => {
  console.log('接收到数据:', chunk.toString());
});

readable.on('end', () => {
  console.log('流结束');
});

3. 关键事件

  • data:有新数据可读时触发。
  • end:数据全部读完时触发。
  • error:发生错误时触发。
  • pause/resume:控制流动模式的开关。

三、可写流(Writable Stream)

1. 核心方法

  • stream.write(chunk):写入数据块。
  • stream.end():结束写入,可选传入最后一块数据。

2. 创建可写流

javascript
const { Writable } = require('stream');

// 自定义可写流(示例:将数据写入数组)
const writable = new Writable({
  write(chunk, encoding, callback) {
    // 处理数据(示例:存入数组)
    this.dataArray.push(chunk.toString());
    callback(); // 必须调用回调通知写入完成
  }
});

writable.dataArray = [];

// 写入数据
writable.write('Hello');
writable.write('World');
writable.end(); // 结束写入

writable.on('finish', () => {
  console.log('写入完成:', writable.dataArray); // 输出: ['Hello', 'World']
});

3. 背压处理

write() 返回 false 时,表示缓冲区已满,需暂停写入:

javascript
if (!writable.write(chunk)) {
  readable.pause(); // 暂停读取
  writable.once('drain', () => {
    readable.resume(); // 恢复读取
  });
}

四、管道(Pipeline):连接流的桥梁

1. 基本用法

javascript
const fs = require('fs');

// 从文件读取 → 压缩 → 写入新文件
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt.gz');
const compressStream = require('zlib').createGzip();

readStream.pipe(compressStream).pipe(writeStream);

// 监听管道完成或错误
writeStream.on('finish', () => console.log('压缩完成'));
writeStream.on('error', (err) => console.error('管道错误:', err));

2. 优势

  • 自动处理背压,无需手动管理 pause/resume
  • 简化流的连接,避免复杂的事件监听。

3. 现代管道 API(Node 10+)

javascript
const { pipeline } = require('stream/promises');

async function run() {
  await pipeline(
    fs.createReadStream('input.txt'),
    zlib.createGzip(),
    fs.createWriteStream('output.txt.gz')
  );
  console.log('管道执行完成');
}

run().catch(console.error);

五、转换流(Transform Stream)

1. 典型场景

  • 数据格式转换(如 CSV → JSON)。
  • 内容处理(如替换文本、加密)。

2. 创建转换流

javascript
const { Transform } = require('stream');

// 示例:将文本转为大写
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// 使用转换流
process.stdin // 标准输入
  .pipe(upperCaseTransform)
  .pipe(process.stdout); // 标准输出

// 输入 "hello" → 输出 "HELLO"

六、实战:高效处理超大文件

1. 逐行读取日志文件

javascript
const fs = require('fs');
const readline = require('readline');

async function processLargeFile(filePath) {
  const fileStream = fs.createReadStream(filePath);
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity // 识别所有换行符
  });

  let lineCount = 0;
  for await (const line of rl) {
    // 逐行处理(示例:统计行数)
    lineCount++;
    if (lineCount % 100000 === 0) {
      console.log(`已处理 ${lineCount} 行`);
    }
  }

  console.log(`文件处理完成,共 ${lineCount} 行`);
}

processLargeFile('huge_log.txt');

2. 分块写入大文件

javascript
const fs = require('fs');

const writeStream = fs.createWriteStream('output.txt');

// 模拟生成大量数据(1GB)
const totalSize = 1024 * 1024 * 1024; // 1GB
const chunkSize = 1024 * 1024; // 1MB
const totalChunks = totalSize / chunkSize;

for (let i = 0; i < totalChunks; i++) {
  const chunk = Buffer.alloc(chunkSize, 'a'); // 生成1MB的'a'
  if (!writeStream.write(chunk)) {
    // 处理背压
    await new Promise(resolve => writeStream.once('drain', resolve));
  }
}

writeStream.end();

七、性能优化与最佳实践

1. 内存控制

  • 设置合理的 highWaterMark(默认 16KB):
    javascript
    fs.createReadStream('file.txt', { highWaterMark: 64 * 1024 }); // 64KB缓冲区
  • 使用 pipeline() 自动处理背压。

2. 错误处理

  • 每个流都应监听 error 事件。
  • 使用 pipeline() 时,错误会自动传播并关闭所有流。

3. 并发处理

  • 对独立数据块,可并行处理(需注意顺序):
    javascript
    const { Transform } = require('stream');
    const { Worker } = require('worker_threads');
    
    const workerTransform = new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        const worker = new Worker('./process_chunk.js');
        worker.on('message', (result) => callback(null, result));
        worker.on('error', (err) => callback(err));
        worker.postMessage(chunk);
      }
    });

经典流式处理案例

Node.js 流式提取大日志文件方案

例子(面试题):

现在有个几百G的日志文件,内容里的每一行的开头都是日志写的日期,是按时间顺序的,如何高效读取某个时间范围内的日志内容。

对于 几百G 的超大日志文件,传统的一次性读取方式会导致内存溢出。

最佳方案是使用 Node.js 的流(Stream)接口进行高效处理。

一、核心思路

  1. 流式读取:使用 fs.createReadStream 逐行读取,避免内存溢出。

  2. 二分查找:利用日志按时间排序的特性,通过二分法快速定位起始位置。

  3. 范围过滤:读取时判断每行日志时间是否在目标范围内,匹配则输出。

二、实现步骤

1. 逐行解析日志文件

使用 readline 模块逐行处理,避免一次性加载整个文件:

javascript
const fs = require('fs');
const readline = require('readline');

async function streamLogFile(filePath, startDate, endDate, options = {}) {
  const { chunkSize = 1024 * 1024 } = options; // 默认每次读取1MB
  const fileStream = fs.createReadStream(filePath);
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity // 识别所有换行符
  });

  const results = [];
  let isInRange = false;

  for await (const line of rl) {
    const logDate = extractDateFromLine(line); // 自定义日期提取函数
    
    // 判断是否进入时间范围
    if (!isInRange && logDate >= startDate) {
      isInRange = true;
    }
    
    // 超出结束时间则停止读取
    if (isInRange && logDate > endDate) {
      break;
    }
    
    // 收集范围内的日志
    if (isInRange) {
      results.push(line);
      
      // 控制内存使用,分批处理
      if (results.length >= chunkSize) {
        yield results;
        results.length = 0; // 清空数组
      }
    }
  }

  // 输出剩余结果
  if (results.length > 0) {
    yield results;
  }

  rl.close();
}

// 从日志行中提取日期(需根据实际日志格式调整)
function extractDateFromLine(line) {
  const dateStr = line.substring(0, 19); // 假设前19个字符是日期(YYYY-MM-DD HH:MM:SS)
  return new Date(dateStr);
}

2. 优化:二分查找定位起始位置

对于超大型文件,从头逐行读取仍效率低下。可通过二分法快速定位起始日期:

javascript
async function findStartPosition(filePath, startDate) {
  const fileSize = fs.statSync(filePath).size;
  let low = 0;
  let high = fileSize;
  
  while (low < high) {
    const mid = Math.floor((low + high) / 2);
    const line = await readLineAtPosition(filePath, mid);
    const lineDate = extractDateFromLine(line);
    
    if (lineDate < startDate) {
      low = mid + 1;
    } else {
      high = mid;
    }
  }
  
  return low;
}

async function readLineAtPosition(filePath, position) {
  const fileStream = fs.createReadStream(filePath, {
    start: position,
    end: position + 1024, // 读取足够的字节以包含完整行
    encoding: 'utf8'
  });

  let data = '';
  for await (const chunk of fileStream) {
    data += chunk;
    const lineEnd = data.indexOf('\n');
    if (lineEnd !== -1) {
      return data.substring(0, lineEnd);
    }
  }
  
  return data;
}

3. 完整使用示例

结合上述方法,高效读取指定时间范围的日志:

javascript
async function main() {
  const filePath = '/path/to/huge.log';
  const startDate = new Date('2023-01-01 00:00:00');
  const endDate = new Date('2023-01-02 23:59:59');
  
  // 快速定位起始位置
  const startPosition = await findStartPosition(filePath, startDate);
  
  // 从起始位置开始流式读取
  const streamOptions = {
    start: startPosition,
    chunkSize: 10000 // 每批处理10000行
  };
  
  // 处理结果(示例:写入新文件)
  const writeStream = fs.createWriteStream('filtered.log');
  
  for await (const chunk of streamLogFile(filePath, startDate, endDate, streamOptions)) {
    writeStream.write(chunk.join('\n') + '\n');
  }
  
  writeStream.end();
  console.log('日志提取完成');
}

main().catch(console.error);

三、性能优化建议

  1. 并行处理:对于多TB级日志,可按文件偏移量分割,并行处理多个片段。

  2. 索引加速:预先创建时间索引文件(记录时间戳与文件位置的映射)。

  3. 内存控制

    • 使用 highWaterMark 参数控制读取缓冲区大小。

    • 通过 yield 分批处理结果,避免内存溢出。

  4. 压缩文件处理:若日志是压缩的(如 .gz),使用 zlib 模块流式解压。

四、注意事项

  • 日志格式适配extractDateFromLine 函数需根据实际日志格式调整。

  • 边界条件:处理日期格式错误、文件末尾不完整行等情况。

  • 错误处理:添加重试机制(如读取失败时回退1KB重新读取)。