Node流式处理
流式处理
流式处理是一种处理数据的方式,它将数据分成小块,然后逐个处理这些小块。流式处理可以提高处理数据的效率,减少内存的使用。
Node.js 的流式处理是其处理大文件、网络数据的核心机制,通过事件驱动和非阻塞方式实现高效数据传输。理解流的工作原理对构建高性能应用至关重要。
Node流式处理
一、流的核心概念
1. 流的本质
流是数据的抽象序列,像“管道”一样让数据按顺序流动,无需一次性加载全部数据。常见场景:
- 读取/写入大文件(如 300GB 日志)
- 网络请求/响应(如 HTTP 流)
- 数据转换(如压缩、加密)
2. 四大流类型
| 类型 | 方向 | 典型场景 |
|---|---|---|
| 可读流 | 数据源 → 程序 | fs.createReadStream |
| 可写流 | 程序 → 目的地 | fs.createWriteStream |
| 双工流 | 双向 | net.Socket(网络连接) |
| 转换流 | 输入 → 转换 → 输出 | zlib(压缩/解压)、crypto(加密) |
3. 核心特性
- 背压(Backpressure):当写入速度慢于读取速度时,自动暂停读取,防止内存溢出。
- 事件驱动:通过
on('data')、on('end')等事件处理数据。 - 管道(Pipeline):用
stream.pipe()连接多个流,形成数据处理链。
二、可读流(Readable Stream)
1. 两种模式
- 流动模式(Flow Mode):数据自动流出,通过
data事件触发。 - 暂停模式(Paused Mode):需主动调用
stream.read()获取数据。
2. 创建可读流
const { Readable } = require('stream');
// 自定义可读流(示例:生成1-10的数字)
const readable = new Readable({
read(size) {
if (this.current >= 10) {
this.push(null); // 结束流
return;
}
this.push(this.current++.toString());
}
});
readable.current = 1;
// 监听数据事件(流动模式)
readable.on('data', (chunk) => {
console.log('接收到数据:', chunk.toString());
});
readable.on('end', () => {
console.log('流结束');
});3. 关键事件
data:有新数据可读时触发。end:数据全部读完时触发。error:发生错误时触发。pause/resume:控制流动模式的开关。
三、可写流(Writable Stream)
1. 核心方法
stream.write(chunk):写入数据块。stream.end():结束写入,可选传入最后一块数据。
2. 创建可写流
const { Writable } = require('stream');
// 自定义可写流(示例:将数据写入数组)
const writable = new Writable({
write(chunk, encoding, callback) {
// 处理数据(示例:存入数组)
this.dataArray.push(chunk.toString());
callback(); // 必须调用回调通知写入完成
}
});
writable.dataArray = [];
// 写入数据
writable.write('Hello');
writable.write('World');
writable.end(); // 结束写入
writable.on('finish', () => {
console.log('写入完成:', writable.dataArray); // 输出: ['Hello', 'World']
});3. 背压处理
write() 返回 false 时,表示缓冲区已满,需暂停写入:
if (!writable.write(chunk)) {
readable.pause(); // 暂停读取
writable.once('drain', () => {
readable.resume(); // 恢复读取
});
}四、管道(Pipeline):连接流的桥梁
1. 基本用法
const fs = require('fs');
// 从文件读取 → 压缩 → 写入新文件
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt.gz');
const compressStream = require('zlib').createGzip();
readStream.pipe(compressStream).pipe(writeStream);
// 监听管道完成或错误
writeStream.on('finish', () => console.log('压缩完成'));
writeStream.on('error', (err) => console.error('管道错误:', err));2. 优势
- 自动处理背压,无需手动管理
pause/resume。 - 简化流的连接,避免复杂的事件监听。
3. 现代管道 API(Node 10+)
const { pipeline } = require('stream/promises');
async function run() {
await pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('output.txt.gz')
);
console.log('管道执行完成');
}
run().catch(console.error);五、转换流(Transform Stream)
1. 典型场景
- 数据格式转换(如 CSV → JSON)。
- 内容处理(如替换文本、加密)。
2. 创建转换流
const { Transform } = require('stream');
// 示例:将文本转为大写
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
// 使用转换流
process.stdin // 标准输入
.pipe(upperCaseTransform)
.pipe(process.stdout); // 标准输出
// 输入 "hello" → 输出 "HELLO"六、实战:高效处理超大文件
1. 逐行读取日志文件
const fs = require('fs');
const readline = require('readline');
async function processLargeFile(filePath) {
const fileStream = fs.createReadStream(filePath);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity // 识别所有换行符
});
let lineCount = 0;
for await (const line of rl) {
// 逐行处理(示例:统计行数)
lineCount++;
if (lineCount % 100000 === 0) {
console.log(`已处理 ${lineCount} 行`);
}
}
console.log(`文件处理完成,共 ${lineCount} 行`);
}
processLargeFile('huge_log.txt');2. 分块写入大文件
const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt');
// 模拟生成大量数据(1GB)
const totalSize = 1024 * 1024 * 1024; // 1GB
const chunkSize = 1024 * 1024; // 1MB
const totalChunks = totalSize / chunkSize;
for (let i = 0; i < totalChunks; i++) {
const chunk = Buffer.alloc(chunkSize, 'a'); // 生成1MB的'a'
if (!writeStream.write(chunk)) {
// 处理背压
await new Promise(resolve => writeStream.once('drain', resolve));
}
}
writeStream.end();七、性能优化与最佳实践
1. 内存控制
- 设置合理的
highWaterMark(默认 16KB):javascriptfs.createReadStream('file.txt', { highWaterMark: 64 * 1024 }); // 64KB缓冲区 - 使用
pipeline()自动处理背压。
2. 错误处理
- 每个流都应监听
error事件。 - 使用
pipeline()时,错误会自动传播并关闭所有流。
3. 并发处理
- 对独立数据块,可并行处理(需注意顺序):javascript
const { Transform } = require('stream'); const { Worker } = require('worker_threads'); const workerTransform = new Transform({ objectMode: true, transform(chunk, encoding, callback) { const worker = new Worker('./process_chunk.js'); worker.on('message', (result) => callback(null, result)); worker.on('error', (err) => callback(err)); worker.postMessage(chunk); } });
经典流式处理案例
Node.js 流式提取大日志文件方案
例子(面试题):
现在有个几百G的日志文件,内容里的每一行的开头都是日志写的日期,是按时间顺序的,如何高效读取某个时间范围内的日志内容。
对于 几百G 的超大日志文件,传统的一次性读取方式会导致内存溢出。
最佳方案是使用 Node.js 的流(Stream)接口进行高效处理。
一、核心思路
流式读取:使用
fs.createReadStream逐行读取,避免内存溢出。二分查找:利用日志按时间排序的特性,通过二分法快速定位起始位置。
范围过滤:读取时判断每行日志时间是否在目标范围内,匹配则输出。
二、实现步骤
1. 逐行解析日志文件
使用 readline 模块逐行处理,避免一次性加载整个文件:
const fs = require('fs');
const readline = require('readline');
async function streamLogFile(filePath, startDate, endDate, options = {}) {
const { chunkSize = 1024 * 1024 } = options; // 默认每次读取1MB
const fileStream = fs.createReadStream(filePath);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity // 识别所有换行符
});
const results = [];
let isInRange = false;
for await (const line of rl) {
const logDate = extractDateFromLine(line); // 自定义日期提取函数
// 判断是否进入时间范围
if (!isInRange && logDate >= startDate) {
isInRange = true;
}
// 超出结束时间则停止读取
if (isInRange && logDate > endDate) {
break;
}
// 收集范围内的日志
if (isInRange) {
results.push(line);
// 控制内存使用,分批处理
if (results.length >= chunkSize) {
yield results;
results.length = 0; // 清空数组
}
}
}
// 输出剩余结果
if (results.length > 0) {
yield results;
}
rl.close();
}
// 从日志行中提取日期(需根据实际日志格式调整)
function extractDateFromLine(line) {
const dateStr = line.substring(0, 19); // 假设前19个字符是日期(YYYY-MM-DD HH:MM:SS)
return new Date(dateStr);
}2. 优化:二分查找定位起始位置
对于超大型文件,从头逐行读取仍效率低下。可通过二分法快速定位起始日期:
async function findStartPosition(filePath, startDate) {
const fileSize = fs.statSync(filePath).size;
let low = 0;
let high = fileSize;
while (low < high) {
const mid = Math.floor((low + high) / 2);
const line = await readLineAtPosition(filePath, mid);
const lineDate = extractDateFromLine(line);
if (lineDate < startDate) {
low = mid + 1;
} else {
high = mid;
}
}
return low;
}
async function readLineAtPosition(filePath, position) {
const fileStream = fs.createReadStream(filePath, {
start: position,
end: position + 1024, // 读取足够的字节以包含完整行
encoding: 'utf8'
});
let data = '';
for await (const chunk of fileStream) {
data += chunk;
const lineEnd = data.indexOf('\n');
if (lineEnd !== -1) {
return data.substring(0, lineEnd);
}
}
return data;
}3. 完整使用示例
结合上述方法,高效读取指定时间范围的日志:
async function main() {
const filePath = '/path/to/huge.log';
const startDate = new Date('2023-01-01 00:00:00');
const endDate = new Date('2023-01-02 23:59:59');
// 快速定位起始位置
const startPosition = await findStartPosition(filePath, startDate);
// 从起始位置开始流式读取
const streamOptions = {
start: startPosition,
chunkSize: 10000 // 每批处理10000行
};
// 处理结果(示例:写入新文件)
const writeStream = fs.createWriteStream('filtered.log');
for await (const chunk of streamLogFile(filePath, startDate, endDate, streamOptions)) {
writeStream.write(chunk.join('\n') + '\n');
}
writeStream.end();
console.log('日志提取完成');
}
main().catch(console.error);三、性能优化建议
并行处理:对于多TB级日志,可按文件偏移量分割,并行处理多个片段。
索引加速:预先创建时间索引文件(记录时间戳与文件位置的映射)。
内存控制:
使用
highWaterMark参数控制读取缓冲区大小。通过
yield分批处理结果,避免内存溢出。
压缩文件处理:若日志是压缩的(如
.gz),使用zlib模块流式解压。
四、注意事项
日志格式适配:
extractDateFromLine函数需根据实际日志格式调整。边界条件:处理日期格式错误、文件末尾不完整行等情况。
错误处理:添加重试机制(如读取失败时回退1KB重新读取)。