Streams基础
🎯 学习目标
- 深入理解Node.js Streams的核心概念
- 掌握四种基本Stream类型的使用
- 学会Stream的组合和管道操作
- 了解背压处理和错误管理
📚 核心概念
Stream类型概述
javascript
// Catalogue of the four fundamental Node.js stream kinds.
// Each row is [kind, description, example APIs, characteristics]; the rows are
// expanded into an object keyed by kind, in the same order as listed here.
const streamTypes = Object.fromEntries(
  [
    [
      'readable',
      '可读流 - 数据源',
      ['fs.createReadStream', 'http.IncomingMessage', 'process.stdin'],
      ['产生数据', '可以被消费', '支持暂停/恢复'],
    ],
    [
      'writable',
      '可写流 - 数据目标',
      ['fs.createWriteStream', 'http.ServerResponse', 'process.stdout'],
      ['消费数据', '可以写入', '支持结束信号'],
    ],
    [
      'duplex',
      '双工流 - 既可读又可写',
      ['net.Socket', 'tls.TLSSocket', 'crypto.Cipher'],
      ['独立的读写', '双向通信', '网络连接'],
    ],
    [
      'transform',
      '转换流 - 处理数据',
      ['zlib.createGzip', 'crypto.createHash', '自定义转换'],
      ['读写关联', '数据变换', '中间处理'],
    ],
  ].map(([kind, description, examples, characteristics]) => [
    kind,
    { description, examples, characteristics },
  ]),
);

console.log('Stream类型:', streamTypes);
🔍 可读流详解
基础可读流实现
javascript
// readable-stream.js
const { Readable } = require('stream');
/**
 * Readable stream that emits one newline-terminated JSON record per integer
 * in the inclusive range [start, max].
 *
 * Each record has the shape `{ number, timestamp, message }`.
 *
 * @param {object} [options] - Standard Readable options plus the fields below.
 * @param {number} [options.start=1] - First number to emit.
 * @param {number} [options.max=100] - Last number to emit (inclusive).
 * @param {number} [options.interval=100] - Delay in milliseconds before each
 *   record is produced; `0` (or a negative value) produces records
 *   synchronously.
 */
class NumberGenerator extends Readable {
  constructor(options = {}) {
    super(options);
    // `??` rather than `||` so that explicit zeros (start: 0, interval: 0)
    // are honoured instead of being replaced by the defaults.
    this.current = options.start ?? 1;
    this.max = options.max ?? 100;
    this.interval = options.interval ?? 100;
    this.timer = null;
  }

  /**
   * Produces the next record, delayed by `interval` milliseconds when an
   * interval is configured, or ends the stream once `max` has been passed.
   */
  _read() {
    // A record is already scheduled; it will be pushed when the timer fires.
    if (this.timer !== null) {
      return;
    }

    if (this.current > this.max) {
      this.push(null);
      return;
    }

    if (this.interval > 0) {
      this.timer = setTimeout(() => {
        this.timer = null;
        this._emitNext();
      }, this.interval);
    } else {
      this._emitNext();
    }
  }

  /** Builds the record for the current number and pushes it downstream. */
  _emitNext() {
    const number = this.current;
    // Advance before pushing so a re-entrant _read() cannot emit a duplicate.
    this.current += 1;

    const data = {
      number,
      timestamp: Date.now(),
      message: `Number ${number}`,
    };
    this.push(`${JSON.stringify(data)}\n`);
  }

  /** Cancels any pending timer so a destroyed stream does not keep running. */
  _destroy(error, callback) {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    callback(error);
  }
}
/**
 * Readable stream that reads a file in fixed-size chunks through a
 * `fs.promises` FileHandle.
 *
 * The file is opened in `_construct`, so the stream machinery does not call
 * `_read` or `_destroy` until the open has settled. No polling is needed, and
 * an open failure surfaces as a normal stream `'error'`.
 *
 * @param {string} filePath - Path of the file to read.
 * @param {object} [options] - Standard Readable options plus the field below.
 * @param {number} [options.chunkSize=65536] - Maximum bytes per read.
 */
class FileContentReader extends Readable {
  constructor(filePath, options = {}) {
    super(options);
    this.filePath = filePath;
    this.chunkSize = options.chunkSize || 64 * 1024; // 64KB
    this.position = 0;
    this.fd = null;
    this.fileSize = 0;
  }

  /** Stream construction hook: opens the file, then reports success/failure. */
  _construct(callback) {
    this._openFile().then(() => callback(), callback);
  }

  /**
   * Opens the file and records its size.
   *
   * The size is taken from the open handle rather than from the path, so the
   * size always belongs to the file that is actually being read.
   *
   * @returns {Promise<void>} Rejects if the file cannot be opened or stat'ed.
   */
  async _openFile() {
    const fs = require('fs').promises;
    this.fd = await fs.open(this.filePath, 'r');
    const stats = await this.fd.stat();
    this.fileSize = stats.size;
  }

  /** Reads the next chunk at the current position, or ends the stream. */
  _read() {
    if (this.position >= this.fileSize) {
      // Everything up to the recorded size has been delivered.
      this.push(null);
      return;
    }

    const buffer = Buffer.allocUnsafe(this.chunkSize);
    this.fd
      .read(buffer, 0, this.chunkSize, this.position)
      .then(({ bytesRead }) => {
        if (bytesRead > 0) {
          this.position += bytesRead;
          // Only the filled part of the (uninitialised) buffer is exposed.
          this.push(buffer.subarray(0, bytesRead));
        } else {
          this.push(null);
        }
      })
      .catch((error) => {
        this.destroy(error);
      });
  }

  /** Closes the file handle, reporting a close failure if nothing else failed. */
  _destroy(error, callback) {
    if (!this.fd) {
      callback(error);
      return;
    }

    this.fd.close().then(
      () => callback(error),
      (closeError) => callback(error || closeError),
    );
  }
}
/**
 * Readable stream that issues an HTTP(S) GET request and re-emits the
 * response body.
 *
 * Backpressure is respected: when `push()` reports a full internal buffer the
 * response is paused, and it is resumed the next time the consumer asks for
 * data via `_read`.
 *
 * @param {string} url - Absolute URL; `https:` URLs use the https module,
 *   everything else uses http.
 * @param {object} [options] - Standard Readable options.
 */
class HttpDataStream extends Readable {
  constructor(url, options = {}) {
    super(options);
    this.url = url;
    this.request = null;
    this.response = null;
    this.isDestroyed = false;
    this._makeRequest();
  }

  /** Starts the GET request and wires the response into this stream. */
  _makeRequest() {
    const https = require('https');
    const http = require('http');
    const client = this.url.startsWith('https:') ? https : http;

    this.request = client.get(this.url, (response) => {
      this.response = response;

      if (this.isDestroyed) {
        // The stream was torn down while the request was in flight.
        response.destroy();
        return;
      }

      // Decode textual bodies to strings.
      if (response.headers['content-type']?.includes('text')) {
        response.setEncoding('utf8');
      }

      response.on('data', (chunk) => {
        if (this.isDestroyed) {
          return;
        }
        // push() returning false means the consumer is behind: stop the
        // source until _read() is called again.
        if (!this.push(chunk)) {
          response.pause();
        }
      });

      response.on('end', () => {
        if (!this.isDestroyed) {
          this.push(null);
        }
      });

      response.on('error', (error) => {
        this.destroy(error);
      });
    });

    this.request.on('error', (error) => {
      this.destroy(error);
    });
  }

  /** Resumes the response if it was paused for backpressure. */
  _read() {
    this.response?.resume();
  }

  /** Aborts the request and response so no socket outlives the stream. */
  _destroy(error, callback) {
    this.isDestroyed = true;
    if (this.response) {
      this.response.destroy();
    }
    if (this.request) {
      // Covers destruction before any response has arrived.
      this.request.destroy();
    }
    callback(error);
  }
}
/**
 * Runs three readable-stream demos in sequence: the NumberGenerator, an
 * object-mode stream, and a stream fed manually from timers.
 *
 * Each demo is awaited before the next one starts, so the console output is
 * not interleaved and the returned promise settles only when all three
 * streams have ended.
 *
 * @returns {Promise<void>} Rejects if any demo stream emits 'error'.
 */
async function demonstrateReadableStreams() {
  console.log('🔍 可读流演示...\n');

  // Settles when `stream` ends (or rejects when it errors).
  const ended = (stream) =>
    new Promise((resolve, reject) => {
      stream.once('end', resolve);
      stream.once('error', reject);
    });

  // 1. Number generator
  console.log('1. 数字生成器:');
  const numberGen = new NumberGenerator({ start: 1, max: 5, interval: 200 });
  numberGen.on('data', (chunk) => {
    // A chunk may carry more than one newline-terminated record.
    for (const line of chunk.toString().split('\n')) {
      if (line.trim() === '') {
        continue;
      }
      const data = JSON.parse(line);
      console.log(` 生成: ${data.message} at ${new Date(data.timestamp).toISOString()}`);
    }
  });
  numberGen.on('end', () => {
    console.log(' ✅ 数字生成完成\n');
  });
  await ended(numberGen);

  // 2. Object-mode stream
  console.log('2. 对象模式流:');
  // The queue lives outside read(): rebuilding it on every call would emit
  // the first record forever and the stream would never end.
  const people = [
    { id: 1, name: 'Alice', age: 25 },
    { id: 2, name: 'Bob', age: 30 },
    { id: 3, name: 'Charlie', age: 35 },
  ];
  const objectStream = new Readable({
    objectMode: true,
    read() {
      // null signals end-of-stream once the queue is drained.
      this.push(people.length > 0 ? people.shift() : null);
    },
  });
  objectStream.on('data', (obj) => {
    console.log(' 对象:', obj);
  });
  objectStream.on('end', () => {
    console.log(' ✅ 对象流结束\n');
  });
  await ended(objectStream);

  // 3. Flow control: data is pushed from outside read()
  console.log('3. 流控制演示:');
  const controlledStream = new Readable({
    read() {
      // Intentionally empty: chunks are pushed by the timers below.
    },
  });

  setTimeout(() => {
    controlledStream.push('第一块数据\n');
  }, 500);
  setTimeout(() => {
    controlledStream.push('第二块数据\n');
  }, 1000);
  setTimeout(() => {
    controlledStream.push('第三块数据\n');
    controlledStream.push(null); // end of stream
  }, 1500);

  controlledStream.on('data', (chunk) => {
    console.log(` 接收: ${chunk.toString().trim()}`);
  });
  controlledStream.on('end', () => {
    console.log(' ✅ 控制流结束\n');
  });
  await ended(controlledStream);
}
// Public API of the readable-stream examples: the three Readable subclasses
// and the demo runner.
module.exports = {
NumberGenerator,
FileContentReader,
HttpDataStream,
demonstrateReadableStreams
};
✍️ 可写流详解
基础可写流实现
javascript
// writable-stream.js
const { Writable } = require('stream');
const fs = require('fs');
/**
 * Writable stream that prints each chunk to stdout, decorated with a prefix,
 * an optional ISO timestamp and optional ANSI colouring by log level.
 *
 * Single writes (`_write`) and batched writes (`_writev`) share one
 * formatter, so both paths produce identical output.
 *
 * @param {object} [options] - Standard Writable options plus the fields below.
 * @param {string} [options.prefix='[LOG]'] - Text placed before every message.
 * @param {boolean} [options.colorize=true] - Colour ERROR/WARN/INFO messages.
 * @param {boolean} [options.timestamps=true] - Prepend an ISO-8601 timestamp.
 */
class ConsoleWriter extends Writable {
  constructor(options = {}) {
    super(options);
    this.prefix = options.prefix || '[LOG]';
    this.colorize = options.colorize !== false;
    this.timestamps = options.timestamps !== false;
  }

  /**
   * Decorates one chunk: timestamp, then prefix, then colour.
   *
   * @param {Buffer|string} chunk - Raw chunk handed to the stream.
   * @returns {string} The text to print.
   */
  _formatMessage(chunk) {
    let message = chunk.toString();

    if (this.timestamps) {
      const timestamp = new Date().toISOString();
      message = `${timestamp} ${message}`;
    }

    message = `${this.prefix} ${message}`;

    if (this.colorize) {
      if (message.includes('ERROR')) {
        message = `\x1b[31m${message}\x1b[0m`; // red
      } else if (message.includes('WARN')) {
        message = `\x1b[33m${message}\x1b[0m`; // yellow
      } else if (message.includes('INFO')) {
        message = `\x1b[36m${message}\x1b[0m`; // cyan
      }
    }

    return message;
  }

  /** Prints a single chunk. */
  _write(chunk, encoding, callback) {
    try {
      process.stdout.write(this._formatMessage(chunk));
    } catch (error) {
      callback(error);
      return;
    }
    // Outside the try block so the callback can never be invoked twice.
    callback();
  }

  /** Prints several buffered chunks with one stdout write. */
  _writev(chunks, callback) {
    try {
      const messages = chunks.map(({ chunk }) => this._formatMessage(chunk));
      process.stdout.write(messages.join(''));
    } catch (error) {
      callback(error);
      return;
    }
    callback();
  }
}
/**
 * Writable stream that keeps every chunk in memory, up to a byte limit.
 *
 * @param {object} [options] - Standard Writable options plus the field below.
 * @param {number} [options.maxSize=10485760] - Maximum total bytes retained;
 *   a write that would exceed it fails with 'Memory buffer overflow'.
 */
class MemoryWriter extends Writable {
  constructor(options = {}) {
    super(options);
    this.buffer = [];
    this.totalSize = 0;
    this.maxSize = options.maxSize || 10 * 1024 * 1024; // 10MB
  }

  /** Stores the chunk, or fails the write if the limit would be exceeded. */
  _write(chunk, encoding, callback) {
    // Strings only reach here with `decodeStrings: false`; measure them in
    // bytes so the limit means the same thing for strings and Buffers.
    const size = Buffer.isBuffer(chunk) ? chunk.length : Buffer.byteLength(chunk, encoding);

    if (this.totalSize + size > this.maxSize) {
      callback(new Error('Memory buffer overflow'));
      return;
    }

    this.buffer.push({
      data: chunk,
      encoding: encoding,
      timestamp: Date.now(),
    });
    this.totalSize += size;
    callback();
  }

  /**
   * Returns the stored chunks as text.
   *
   * Buffer chunks are decoded as UTF-8: the stream reports their encoding as
   * the placeholder 'buffer', which is not a valid argument for
   * `Buffer#toString` and would throw.
   *
   * @returns {{data: string, timestamp: string}[]} One entry per chunk, with
   *   an ISO-8601 timestamp of when it was written.
   */
  getBuffer() {
    return this.buffer.map((item) => ({
      data: Buffer.isBuffer(item.data) ? item.data.toString('utf8') : String(item.data),
      timestamp: new Date(item.timestamp).toISOString(),
    }));
  }

  /**
   * Discards all stored chunks.
   *
   * @returns {number} The number of bytes that were held before clearing.
   */
  clearBuffer() {
    const oldSize = this.totalSize;
    this.buffer = [];
    this.totalSize = 0;
    return oldSize;
  }

  /**
   * @returns {{chunks: number, totalSize: number, maxSize: number, utilization: string}}
   *   Current usage; `utilization` is a percentage with two decimals.
   */
  getStats() {
    return {
      chunks: this.buffer.length,
      totalSize: this.totalSize,
      maxSize: this.maxSize,
      utilization: `${((this.totalSize / this.maxSize) * 100).toFixed(2)}%`,
    };
  }
}
/**
 * Writable stream that appends to a file, collecting chunks in memory and
 * flushing them when the buffer reaches `bufferSize`, on a periodic timer,
 * and when the stream ends.
 *
 * The file is opened in `_construct`, so writes are held back by the stream
 * machinery until the handle exists. Flushes are serialised, and each flush
 * takes ownership of the buffered chunks before awaiting, so chunks that
 * arrive while a flush is in progress are never discarded.
 *
 * @param {string} filePath - File to append to (created if missing).
 * @param {object} [options] - Standard Writable options plus the fields below.
 * @param {number} [options.bufferSize=65536] - Buffered bytes that trigger a flush.
 * @param {number} [options.flushInterval=5000] - Milliseconds between timer flushes.
 */
class BufferedFileWriter extends Writable {
  constructor(filePath, options = {}) {
    super(options);
    this.filePath = filePath;
    this.bufferSize = options.bufferSize || 64 * 1024; // 64KB
    this.flushInterval = options.flushInterval || 5000; // 5 seconds
    this.buffer = [];
    this.bufferLength = 0;
    this.fileHandle = null;
    this.flushTimer = null;
    // Tail of the flush queue; never rejects, so it can always be chained.
    this.flushChain = Promise.resolve();
  }

  /** Stream construction hook: opens the file, then starts the flush timer. */
  _construct(callback) {
    this._openFile().then(() => {
      this._startFlushTimer();
      callback();
    }, callback);
  }

  /**
   * Opens the target file in append mode.
   *
   * @returns {Promise<void>} Rejects if the file cannot be opened.
   */
  async _openFile() {
    this.fileHandle = await fs.promises.open(this.filePath, 'a');
  }

  /** Buffers the chunk and flushes once the size threshold is reached. */
  _write(chunk, encoding, callback) {
    // Strings only arrive with `decodeStrings: false`; normalise to Buffer so
    // lengths are in bytes and Buffer.concat() works.
    const data = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    this.buffer.push(data);
    this.bufferLength += data.length;

    if (this.bufferLength >= this.bufferSize) {
      this._flush(callback);
    } else {
      callback();
    }
  }

  /**
   * Writes all buffered chunks to the file and syncs it to disk.
   *
   * @param {(error?: Error) => void} [callback] - Called when the flush has
   *   finished. When omitted (timer-driven flush), a failure destroys the
   *   stream instead of being silently dropped.
   * @returns {Promise<void>} Settles after the callback has been invoked.
   */
  _flush(callback) {
    const writePending = async () => {
      if (this.buffer.length === 0) {
        return;
      }
      // Detach the chunks before the first await: anything written while the
      // disk I/O is in flight lands in the fresh array and is kept.
      const pending = this.buffer;
      this.buffer = [];
      this.bufferLength = 0;

      await this.fileHandle.write(Buffer.concat(pending));
      await this.fileHandle.sync(); // force the data to disk
    };

    const result = this.flushChain.then(writePending);
    // Keep the queue usable after a failed flush.
    this.flushChain = result.catch(() => {});

    return result.then(
      () => {
        if (callback) callback();
      },
      (error) => {
        if (callback) {
          callback(error);
        } else {
          this.destroy(error);
        }
      },
    );
  }

  /** Starts the periodic background flush. */
  _startFlushTimer() {
    this.flushTimer = setInterval(() => {
      this._flush();
    }, this.flushInterval);
  }

  /** Flushes whatever is still buffered when the stream is ended. */
  _final(callback) {
    this._flush(callback);
  }

  /** Stops the timer, waits for any running flush, then closes the file. */
  _destroy(error, callback) {
    if (this.flushTimer) {
      clearInterval(this.flushTimer);
      this.flushTimer = null;
    }

    if (!this.fileHandle) {
      callback(error);
      return;
    }

    this.flushChain
      .then(() => this.fileHandle.close())
      .then(
        () => callback(error),
        (closeError) => callback(error || closeError),
      );
  }
}
/**
 * Writable stream that forwards every chunk to other writable streams.
 *
 * Strategies:
 * - `'all'`: write to every target; complete when all have completed.
 * - `'first'`: write to the first target only.
 * - `'round-robin'`: write to one target per chunk, cycling through them.
 *
 * @param {import('stream').Writable[]} [writers] - Initial targets. The array
 *   is copied, so later changes to the caller's array have no effect here.
 * @param {object} [options] - Standard Writable options plus the field below.
 * @param {'all'|'first'|'round-robin'} [options.strategy='all'] - Dispatch strategy.
 */
class MultiWriter extends Writable {
  constructor(writers, options = {}) {
    super(options);
    this.writers = [...(writers || [])];
    this.writeStrategy = options.strategy || 'all'; // 'all', 'first', 'round-robin'
    this.currentIndex = 0;
  }

  /** Dispatches the chunk according to the configured strategy. */
  _write(chunk, encoding, callback) {
    switch (this.writeStrategy) {
      case 'all':
        this._writeToAll(chunk, encoding, callback);
        break;
      case 'first':
        this._writeToFirst(chunk, encoding, callback);
        break;
      case 'round-robin':
        this._writeRoundRobin(chunk, encoding, callback);
        break;
      default:
        callback(new Error('Unknown write strategy'));
    }
  }

  /**
   * Writes the chunk to every target. Completes once all targets have
   * accepted it, or fails with the first error reported by any target.
   */
  _writeToAll(chunk, encoding, callback) {
    // Snapshot the targets: addWriter()/removeWriter() may run while writes
    // are in flight, and completion must be counted against the set that was
    // actually written to.
    const targets = [...this.writers];
    if (targets.length === 0) {
      callback();
      return;
    }

    let pending = targets.length;
    let failed = false;

    for (const writer of targets) {
      writer.write(chunk, encoding, (error) => {
        if (failed) {
          return;
        }
        if (error) {
          failed = true;
          callback(error);
          return;
        }
        pending -= 1;
        if (pending === 0) {
          callback();
        }
      });
    }
  }

  /** Writes the chunk to the first target only. */
  _writeToFirst(chunk, encoding, callback) {
    if (this.writers.length === 0) {
      callback(new Error('No writers available'));
      return;
    }
    this.writers[0].write(chunk, encoding, callback);
  }

  /** Writes the chunk to the next target in rotation. */
  _writeRoundRobin(chunk, encoding, callback) {
    if (this.writers.length === 0) {
      callback(new Error('No writers available'));
      return;
    }
    // The modulo on read keeps the index valid after removeWriter() has
    // shrunk the list.
    const index = this.currentIndex % this.writers.length;
    this.currentIndex = (index + 1) % this.writers.length;
    this.writers[index].write(chunk, encoding, callback);
  }

  /** Adds a target. */
  addWriter(writer) {
    this.writers.push(writer);
  }

  /** Removes a target; does nothing if it is not registered. */
  removeWriter(writer) {
    const index = this.writers.indexOf(writer);
    if (index > -1) {
      this.writers.splice(index, 1);
    }
  }
}
/**
 * Runs three writable-stream demos: the colourising ConsoleWriter, the
 * in-memory MemoryWriter, and batched writes through `writev`.
 *
 * @returns {Promise<void>} Resolves once the batch writer has finished.
 */
async function demonstrateWritableStreams() {
  console.log('✍️ 可写流演示...\n');

  // 1. Console writer
  console.log('1. 控制台写入器:');
  const consoleWriter = new ConsoleWriter({
    prefix: '[DEMO]',
    colorize: true,
  });
  consoleWriter.write('INFO: 这是一条信息\n');
  consoleWriter.write('WARN: 这是一条警告\n');
  consoleWriter.write('ERROR: 这是一条错误\n');

  // 2. Memory writer
  console.log('\n2. 内存写入器:');
  const memoryWriter = new MemoryWriter({ maxSize: 1024 });
  memoryWriter.write('第一条数据\n');
  memoryWriter.write('第二条数据\n');
  memoryWriter.write('第三条数据\n');
  console.log(' 缓冲区内容:', memoryWriter.getBuffer());
  console.log(' 统计信息:', memoryWriter.getStats());

  // 3. Batched writes
  console.log('\n3. 批量写入演示:');
  const batchWriter = new Writable({
    write(chunk, encoding, callback) {
      console.log(` 单次写入: ${chunk.toString().trim()}`);
      callback();
    },
    writev(chunks, callback) {
      console.log(` 批量写入 ${chunks.length} 个块:`);
      chunks.forEach((chunk, index) => {
        console.log(` ${index + 1}: ${chunk.chunk.toString().trim()}`);
      });
      callback();
    },
  });

  // write() here completes synchronously, so back-to-back writes are never
  // queued on their own. Corking holds them in the internal buffer, and
  // uncork() then hands all of them to writev() in a single call.
  batchWriter.cork();
  batchWriter.write('消息1\n');
  batchWriter.write('消息2\n');
  batchWriter.write('消息3\n');
  batchWriter.uncork();

  // Wait for the writer to finish.
  await new Promise((resolve) => {
    batchWriter.end(() => resolve());
  });

  console.log(' ✅ 可写流演示完成');
}
// Public API of the writable-stream examples: the four Writable subclasses
// and the demo runner.
module.exports = {
ConsoleWriter,
MemoryWriter,
BufferedFileWriter,
MultiWriter,
demonstrateWritableStreams
};
🔄 Transform流实现
数据转换流
javascript
// transform-stream.js
const { Transform } = require('stream');
/**
 * Transform stream that parses newline-delimited JSON text into objects.
 *
 * Input may arrive as strings or Buffers and may be split at arbitrary
 * positions, including in the middle of a line or of a multi-byte UTF-8
 * character. Blank lines are ignored. A line that is not valid JSON fails the
 * stream with an error.
 *
 * @param {object} [options] - Standard Transform options; `objectMode` is
 *   always forced to `true`.
 */
class JSONParser extends Transform {
  constructor(options = {}) {
    super({
      ...options,
      objectMode: true, // emit parsed objects
    });
    const { StringDecoder } = require('string_decoder');
    // Holds back incomplete multi-byte sequences between Buffer chunks.
    this.decoder = new StringDecoder('utf8');
    this.buffer = '';
  }

  /** Parses every complete line received so far; keeps the partial tail. */
  _transform(chunk, encoding, callback) {
    this.buffer += Buffer.isBuffer(chunk) ? this.decoder.write(chunk) : chunk.toString();

    const lines = this.buffer.split('\n');
    this.buffer = lines.pop() || ''; // last element is the incomplete line

    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed) {
        continue;
      }
      try {
        this.push(JSON.parse(trimmed));
      } catch (error) {
        // Report through the callback so the stream is properly destroyed
        // and pipeline()/finished() observe the failure.
        callback(new Error(`JSON解析失败: ${error.message}, 行: ${trimmed}`));
        return;
      }
    }

    callback();
  }

  /** Parses whatever is left after the final chunk (no trailing newline). */
  _flush(callback) {
    const remaining = (this.buffer + this.decoder.end()).trim();
    this.buffer = '';

    if (remaining) {
      try {
        this.push(JSON.parse(remaining));
      } catch (error) {
        callback(new Error(`最后一行JSON解析失败: ${error.message}`));
        return;
      }
    }

    callback();
  }
}
/**
 * Object-mode Transform that forwards only the items accepted by a predicate
 * and, when the input ends, emits one final `{ _stats: {...} }` summary
 * object.
 *
 * @param {(item: *) => boolean} filterFn - Returns truthy for items to keep.
 *   An exception thrown by it fails the stream.
 * @param {object} [options] - Standard Transform options; `objectMode` is
 *   always forced to `true`.
 */
class DataFilter extends Transform {
  constructor(filterFn, options = {}) {
    super({
      ...options,
      objectMode: true,
    });
    this.filterFn = filterFn;
    this.filteredCount = 0;
    this.totalCount = 0;
  }

  /** Counts the item and forwards it when the predicate accepts it. */
  _transform(chunk, encoding, callback) {
    this.totalCount++;

    let keep;
    try {
      keep = this.filterFn(chunk);
    } catch (error) {
      callback(error);
      return;
    }

    if (keep) {
      this.push(chunk);
    } else {
      this.filteredCount++;
    }
    callback();
  }

  /** Emits the summary object after the last item. */
  _flush(callback) {
    this.push({
      _stats: {
        total: this.totalCount,
        filtered: this.filteredCount,
        passed: this.totalCount - this.filteredCount,
      },
    });
    callback();
  }

  /**
   * @returns {{total: number, filtered: number, passed: number, filterRate: string}}
   *   Counters so far; `filterRate` is the rejected share as a percentage
   *   with two decimals ('0.00%' before any item has been seen).
   */
  getStats() {
    // Guard the division: 0 / 0 would render as 'NaN%'.
    const rate = this.totalCount === 0 ? 0 : (this.filteredCount / this.totalCount) * 100;
    return {
      total: this.totalCount,
      filtered: this.filteredCount,
      passed: this.totalCount - this.filteredCount,
      filterRate: `${rate.toFixed(2)}%`,
    };
  }
}
/**
 * Object-mode Transform that aggregates items over a sliding window.
 *
 * Whenever the window holds `windowSize` items it is aggregated and the
 * result is emitted; the window then advances by half its size (at least one
 * item), so consecutive windows overlap. Items still in the window when the
 * input ends are aggregated once more.
 *
 * @param {object} [options] - Standard Transform options plus the fields
 *   below; `objectMode` is always forced to `true`.
 * @param {number} [options.windowSize=10] - Items per window.
 * @param {(window: Array) => *} [options.aggregateFn] - Receives a copy of
 *   the window; a `null` result is not emitted. Defaults to count/sum/avg/
 *   min/max over the numeric items.
 */
class DataAggregator extends Transform {
  constructor(options = {}) {
    super({
      ...options,
      objectMode: true,
    });
    this.windowSize = options.windowSize || 10;
    this.window = [];
    this.aggregateFn = options.aggregateFn || this._defaultAggregate;
  }

  /**
   * Default aggregation.
   *
   * @param {Array} window - Items of the current window.
   * @returns {object|null} `null` for an empty window; `{ count }` when the
   *   window has no numbers; otherwise `{ count, sum, avg, min, max }`, where
   *   `count` covers all items and the other fields only the numeric ones.
   */
  _defaultAggregate(window) {
    if (window.length === 0) return null;

    const numbers = window.filter((item) => typeof item === 'number');
    if (numbers.length === 0) return { count: window.length };

    const sum = numbers.reduce((a, b) => a + b, 0);
    return {
      count: window.length,
      sum,
      avg: sum / numbers.length,
      min: Math.min(...numbers),
      max: Math.max(...numbers),
    };
  }

  /** Aggregates a copy of the current window and emits a non-null result. */
  _emitAggregate() {
    const aggregated = this.aggregateFn([...this.window]);
    if (aggregated !== null) {
      this.push(aggregated);
    }
  }

  /** Adds the item and emits an aggregate each time the window is full. */
  _transform(chunk, encoding, callback) {
    this.window.push(chunk);

    if (this.window.length >= this.windowSize) {
      try {
        this._emitAggregate();
      } catch (error) {
        callback(error);
        return;
      }
      // Always advance by at least one item: with windowSize 1 a step of
      // floor(1 / 2) = 0 would never shrink the window.
      const step = Math.max(1, Math.floor(this.windowSize / 2));
      this.window = this.window.slice(step);
    }

    callback();
  }

  /** Aggregates the items left in the window when the input ends. */
  _flush(callback) {
    if (this.window.length > 0) {
      try {
        this._emitAggregate();
      } catch (error) {
        callback(error);
        return;
      }
    }
    callback();
  }
}
/**
 * Object-mode Transform that validates each item against a field schema.
 *
 * Supported rules per field: `required`, `type` (compared with `typeof`),
 * `min`/`max`, `minLength`/`maxLength` and `pattern` (a RegExp).
 *
 * In strict mode (the default) the first invalid item fails the stream. In
 * non-strict mode invalid items are forwarded as a copy with an added
 * `_validationErrors` array.
 *
 * @param {Object<string, object>} schema - Rules keyed by field name.
 * @param {object} [options] - Standard Transform options plus the field
 *   below; `objectMode` is always forced to `true`.
 * @param {boolean} [options.strictMode=true] - Fail on the first invalid item.
 */
class DataValidator extends Transform {
  constructor(schema, options = {}) {
    super({
      ...options,
      objectMode: true,
    });
    this.schema = schema;
    this.validCount = 0;
    this.invalidCount = 0;
    this.strictMode = options.strictMode !== false;
  }

  /**
   * Checks one object against the schema.
   *
   * @param {object} obj - Item to validate.
   * @returns {string[]} One message per violated rule; empty when valid.
   */
  _validateObject(obj) {
    const errors = [];

    for (const [field, rules] of Object.entries(this.schema)) {
      const value = obj[field];
      const isMissing = value === undefined || value === null;

      if (isMissing) {
        if (rules.required) {
          errors.push(`字段 '${field}' 是必需的`);
        }
        // The remaining rules only apply to present values.
        continue;
      }

      // Type check
      if (rules.type && typeof value !== rules.type) {
        errors.push(`字段 '${field}' 类型应为 ${rules.type},实际为 ${typeof value}`);
      }

      // Range checks
      if (rules.min !== undefined && value < rules.min) {
        errors.push(`字段 '${field}' 值 ${value} 小于最小值 ${rules.min}`);
      }
      if (rules.max !== undefined && value > rules.max) {
        errors.push(`字段 '${field}' 值 ${value} 大于最大值 ${rules.max}`);
      }

      // Length checks
      if (rules.minLength !== undefined && value.length < rules.minLength) {
        errors.push(`字段 '${field}' 长度 ${value.length} 小于最小长度 ${rules.minLength}`);
      }
      if (rules.maxLength !== undefined && value.length > rules.maxLength) {
        errors.push(`字段 '${field}' 长度 ${value.length} 大于最大长度 ${rules.maxLength}`);
      }

      // Pattern check
      if (rules.pattern) {
        // RegExps with the g or y flag keep their position between test()
        // calls; rewind so every item is matched from the start.
        rules.pattern.lastIndex = 0;
        if (!rules.pattern.test(value)) {
          errors.push(`字段 '${field}' 值 '${value}' 不匹配模式 ${rules.pattern}`);
        }
      }
    }

    return errors;
  }

  /** Validates the item and forwards, annotates or rejects it. */
  _transform(chunk, encoding, callback) {
    const errors = this._validateObject(chunk);

    if (errors.length === 0) {
      this.validCount++;
      this.push(chunk);
      callback();
      return;
    }

    this.invalidCount++;
    if (this.strictMode) {
      callback(new Error(`数据验证失败: ${errors.join(', ')}`));
      return;
    }

    // Non-strict mode: pass the item on together with its error list.
    this.push({
      ...chunk,
      _validationErrors: errors,
    });
    callback();
  }

  /**
   * @returns {{valid: number, invalid: number, total: number, validRate: string}}
   *   Counters so far; `validRate` is the valid share as a percentage with
   *   two decimals ('0.00%' before any item has been seen).
   */
  getValidationStats() {
    const total = this.validCount + this.invalidCount;
    // Guard the division: 0 / 0 would render as 'NaN%'.
    const rate = total === 0 ? 0 : (this.validCount / total) * 100;
    return {
      valid: this.validCount,
      invalid: this.invalidCount,
      total,
      validRate: `${rate.toFixed(2)}%`,
    };
  }
}
/**
 * Runs three transform-stream demos through `pipeline()`: JSON parsing,
 * filtering and windowed aggregation.
 *
 * Every source is built with `Readable.from()` over a fixed array, so each
 * one ends after its last element. Every pipeline ends in a plain Writable
 * that only prints what it receives.
 *
 * @returns {Promise<void>} Resolves when all demos have run; a failing
 *   pipeline is logged and does not reject.
 */
async function demonstrateTransformStreams() {
  console.log('🔄 Transform流演示...\n');
  const { Readable, Writable } = require('stream');
  const { pipeline } = require('stream/promises');

  // Builds an object-mode sink that hands every item to `print`.
  const printingSink = (print) =>
    new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        print(chunk);
        callback();
      },
    });

  // 1. JSON parsing
  console.log('1. JSON解析转换流:');
  const jsonData = Readable.from([
    '{"id": 1, "name": "Alice", "age": 25}\n',
    '{"id": 2, "name": "Bob", "age": 30}\n',
    '{"id": 3, "name": "Charlie", "age": 35}\n',
  ]);
  try {
    await pipeline(
      jsonData,
      new JSONParser(),
      printingSink((chunk) => {
        console.log(` 解析对象:`, chunk);
      }),
    );
    console.log(' ✅ JSON解析完成\n');
  } catch (error) {
    console.error(' ❌ JSON解析失败:', error.message);
  }

  // 2. Filtering
  console.log('2. 数据过滤转换流:');
  const numberStream = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
  const evenFilter = new DataFilter((num) => typeof num === 'number' && num % 2 === 0);
  try {
    await pipeline(
      numberStream,
      evenFilter,
      printingSink((chunk) => {
        // The filter ends its output with a { _stats } summary object.
        if (chunk._stats) {
          console.log(' 过滤统计:', chunk._stats);
        } else {
          console.log(` 通过过滤的数字: ${chunk}`);
        }
      }),
    );
    console.log(' ✅ 数据过滤完成\n');
  } catch (error) {
    console.error(' ❌ 数据过滤失败:', error.message);
  }

  // 3. Aggregation
  console.log('3. 数据聚合转换流:');
  const dataStream = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]);
  const aggregator = new DataAggregator({ windowSize: 5 });
  try {
    await pipeline(
      dataStream,
      aggregator,
      printingSink((chunk) => {
        console.log(' 聚合结果:', chunk);
      }),
    );
    console.log(' ✅ 数据聚合完成');
  } catch (error) {
    console.error(' ❌ 数据聚合失败:', error.message);
  }

  console.log('\n✅ Transform流演示完成');
}
// Public API of the transform-stream examples: the four Transform subclasses
// and the demo runner.
module.exports = {
JSONParser,
DataFilter,
DataAggregator,
DataValidator,
demonstrateTransformStreams
};
Node.js Streams提供了强大的数据处理能力,是构建高效I/O应用的基础!