可读流
🎯 学习目标
- 深入理解可读流的工作原理和生命周期
- 掌握自定义可读流的实现方法
- 学会可读流的性能优化和错误处理
- 了解背压处理和流控制机制
📚 核心概念
可读流基础
javascript
// 可读流的核心概念
const readableStreamConcepts = {
modes: {
flowing: {
description: '流动模式 - 数据自动流出',
triggers: ['data事件监听', 'pipe()调用', 'resume()调用'],
characteristics: ['自动读取', '事件驱动', '高吞吐量']
},
paused: {
description: '暂停模式 - 手动读取数据',
triggers: ['创建时默认', 'pause()调用', '移除data监听器'],
characteristics: ['手动控制', 'read()方法', '精确控制']
}
},
states: {
readable: '可读状态 - 有数据可读',
ended: '结束状态 - 没有更多数据',
destroyed: '销毁状态 - 流已被销毁'
},
buffers: {
internal: '内部缓冲区 - 存储待读取数据',
highWaterMark: '高水位标记 - 缓冲区大小限制'
}
};
console.log('可读流概念:', readableStreamConcepts);
🛠️ 自定义可读流实现
基础可读流
javascript
// basic-readable-stream.js
const { Readable } = require('stream');
const fs = require('fs');
// 数字序列生成器
class NumberSequenceStream extends Readable {
constructor(options = {}) {
super(options);
this.start = options.start || 1;
this.end = options.end || 100;
this.current = this.start;
this.step = options.step || 1;
this.delay = options.delay || 0;
}
_read() {
if (this.current <= this.end) {
const data = {
number: this.current,
timestamp: Date.now(),
isEven: this.current % 2 === 0
};
if (this.delay > 0) {
setTimeout(() => {
this.push(JSON.stringify(data) + '\n');
}, this.delay);
} else {
this.push(JSON.stringify(data) + '\n');
}
this.current += this.step;
} else {
// 结束流
this.push(null);
}
}
}
// 文件行读取器
class LineReaderStream extends Readable {
constructor(filePath, options = {}) {
super(options);
this.filePath = filePath;
this.fd = null;
this.position = 0;
this.buffer = Buffer.alloc(0);
this.encoding = options.encoding || 'utf8';
this.chunkSize = options.chunkSize || 64 * 1024; // 64KB
this.lineNumber = 0;
this._openFile();
}
async _openFile() {
try {
this.fd = await fs.promises.open(this.filePath, 'r');
} catch (error) {
this.destroy(error);
}
}
_read() {
if (!this.fd) {
// 文件还未打开,稍后重试
setTimeout(() => this._read(), 10);
return;
}
this._readChunk();
}
async _readChunk() {
try {
const chunk = Buffer.alloc(this.chunkSize);
const { bytesRead } = await this.fd.read(chunk, 0, this.chunkSize, this.position);
if (bytesRead === 0) {
// 文件读取完毕,处理剩余缓冲区
if (this.buffer.length > 0) {
this._emitLine(this.buffer.toString(this.encoding));
}
this.push(null);
return;
}
this.position += bytesRead;
this.buffer = Buffer.concat([this.buffer, chunk.slice(0, bytesRead)]);
this._processBuffer();
} catch (error) {
this.destroy(error);
}
}
_processBuffer() {
let lineStart = 0;
for (let i = 0; i < this.buffer.length; i++) {
if (this.buffer[i] === 0x0A) { // \n
const line = this.buffer.slice(lineStart, i).toString(this.encoding);
this._emitLine(line);
lineStart = i + 1;
}
}
// 保留未完成的行
if (lineStart < this.buffer.length) {
this.buffer = this.buffer.slice(lineStart);
} else {
this.buffer = Buffer.alloc(0);
}
}
_emitLine(line) {
this.lineNumber++;
const lineData = {
number: this.lineNumber,
content: line.replace(/\r$/, ''), // 移除\r
length: line.length
};
this.push(JSON.stringify(lineData) + '\n');
}
_destroy(error, callback) {
if (this.fd) {
this.fd.close().finally(() => callback(error));
} else {
callback(error);
}
}
}
// HTTP响应流包装器
class HTTPResponseStream extends Readable {
constructor(url, options = {}) {
super(options);
this.url = url;
this.response = null;
this.headers = null;
this.statusCode = null;
this._makeRequest();
}
_makeRequest() {
const https = require('https');
const http = require('http');
const { URL } = require('url');
const parsedUrl = new URL(this.url);
const client = parsedUrl.protocol === 'https:' ? https : http;
const request = client.get(this.url, (response) => {
this.response = response;
this.headers = response.headers;
this.statusCode = response.statusCode;
// 发出响应头信息
this.emit('response', {
statusCode: response.statusCode,
headers: response.headers
});
response.on('data', (chunk) => {
this.push(chunk);
});
response.on('end', () => {
this.push(null);
});
response.on('error', (error) => {
this.destroy(error);
});
});
request.on('error', (error) => {
this.destroy(error);
});
}
_read() {
// HTTP流是被动的,不需要主动读取
}
}
// 使用示例
async function demonstrateBasicReadableStreams() {
console.log('🔍 基础可读流演示...\n');
// 1. 数字序列生成器
console.log('1. 数字序列生成器:');
const numberStream = new NumberSequenceStream({
start: 1,
end: 5,
delay: 200
});
numberStream.on('data', (chunk) => {
const data = JSON.parse(chunk.toString().trim());
console.log(` 生成数字: ${data.number}, 偶数: ${data.isEven}`);
});
numberStream.on('end', () => {
console.log(' ✅ 数字序列生成完成\n');
});
// 等待数字流完成
await new Promise(resolve => numberStream.on('end', resolve));
// 2. 对象模式流
console.log('2. 对象模式可读流:');
const objectStream = new Readable({
objectMode: true,
read() {
const users = [
{ id: 1, name: 'Alice', role: 'admin' },
{ id: 2, name: 'Bob', role: 'user' },
{ id: 3, name: 'Charlie', role: 'moderator' },
null // 结束标记
];
this.push(users.shift());
}
});
objectStream.on('data', (user) => {
console.log(` 用户: ${user.name} (${user.role})`);
});
objectStream.on('end', () => {
console.log(' ✅ 对象流结束\n');
});
// 3. 可控流演示
console.log('3. 流控制演示:');
const controlledStream = new Readable({
read() {
// 不立即推送数据
}
});
let counter = 0;
const pushData = () => {
counter++;
if (counter <= 3) {
controlledStream.push(`数据块 ${counter}\n`);
setTimeout(pushData, 500);
} else {
controlledStream.push(null);
}
};
setTimeout(pushData, 100);
controlledStream.on('data', (chunk) => {
console.log(` 接收: ${chunk.toString().trim()}`);
});
controlledStream.on('end', () => {
console.log(' ✅ 控制流结束');
});
}
module.exports = {
NumberSequenceStream,
LineReaderStream,
HTTPResponseStream,
demonstrateBasicReadableStreams
};
高级可读流实现
javascript
// advanced-readable-stream.js
const { Readable } = require('stream');
const EventEmitter = require('events');
// 数据库查询流
class DatabaseQueryStream extends Readable {
constructor(query, options = {}) {
super({ objectMode: true, ...options });
this.query = query;
this.batchSize = options.batchSize || 100;
this.offset = 0;
this.isFinished = false;
this.totalRows = 0;
// 模拟数据库连接
this.db = options.db || this._createMockDB();
}
_createMockDB() {
// 模拟数据库
return {
query: async (sql, params) => {
// 模拟查询延迟
await new Promise(resolve => setTimeout(resolve, 50));
const offset = params[0] || 0;
const limit = params[1] || this.batchSize;
// 生成模拟数据
const rows = [];
for (let i = 0; i < limit && offset + i < 1000; i++) {
rows.push({
id: offset + i + 1,
name: `User ${offset + i + 1}`,
email: `user${offset + i + 1}@example.com`,
created_at: new Date()
});
}
return rows;
}
};
}
async _read() {
if (this.isFinished) {
return;
}
try {
const rows = await this.db.query(
`${this.query} LIMIT ? OFFSET ?`,
[this.batchSize, this.offset]
);
if (rows.length === 0) {
this.isFinished = true;
this.push(null);
return;
}
// 推送每一行数据
for (const row of rows) {
this.push(row);
this.totalRows++;
}
this.offset += rows.length;
// 如果返回的行数少于批次大小,说明已经到末尾
if (rows.length < this.batchSize) {
this.isFinished = true;
this.push(null);
}
} catch (error) {
this.destroy(error);
}
}
getTotalRows() {
return this.totalRows;
}
}
// 分页数据流
class PaginatedDataStream extends Readable {
constructor(dataSource, options = {}) {
super({ objectMode: true, ...options });
this.dataSource = dataSource;
this.pageSize = options.pageSize || 50;
this.currentPage = 1;
this.totalPages = 0;
this.isLoading = false;
}
async _read() {
if (this.isLoading) {
return;
}
if (this.totalPages > 0 && this.currentPage > this.totalPages) {
this.push(null);
return;
}
this.isLoading = true;
try {
const result = await this.dataSource.getPage(this.currentPage, this.pageSize);
if (this.totalPages === 0) {
this.totalPages = result.totalPages;
}
if (result.data && result.data.length > 0) {
for (const item of result.data) {
this.push(item);
}
}
this.currentPage++;
if (this.currentPage > this.totalPages || result.data.length === 0) {
this.push(null);
}
} catch (error) {
this.destroy(error);
} finally {
this.isLoading = false;
}
}
getCurrentPage() {
return this.currentPage - 1;
}
getTotalPages() {
return this.totalPages;
}
}
// 实时数据流
class RealTimeDataStream extends Readable {
constructor(dataSource, options = {}) {
super({ objectMode: true, ...options });
this.dataSource = dataSource;
this.interval = options.interval || 1000;
this.isActive = false;
this.timer = null;
this.buffer = [];
this.maxBufferSize = options.maxBufferSize || 1000;
}
_read() {
if (!this.isActive) {
this.startPolling();
}
// 从缓冲区推送数据
while (this.buffer.length > 0) {
const data = this.buffer.shift();
if (!this.push(data)) {
// 背压处理 - 停止推送
break;
}
}
}
startPolling() {
this.isActive = true;
const poll = async () => {
try {
const data = await this.dataSource.getLatestData();
if (data) {
if (this.buffer.length < this.maxBufferSize) {
this.buffer.push(data);
} else {
// 缓冲区满,丢弃最老的数据
this.buffer.shift();
this.buffer.push(data);
this.emit('bufferOverflow', { droppedData: true });
}
}
// 尝试推送数据
this._read();
} catch (error) {
this.emit('error', error);
}
if (this.isActive) {
this.timer = setTimeout(poll, this.interval);
}
};
poll();
}
stopPolling() {
this.isActive = false;
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
}
_destroy(error, callback) {
this.stopPolling();
callback(error);
}
setInterval(newInterval) {
this.interval = newInterval;
if (this.isActive) {
this.stopPolling();
this.startPolling();
}
}
}
// 多源合并流
class MergedReadableStream extends Readable {
constructor(sources, options = {}) {
super({ objectMode: true, ...options });
this.sources = sources;
this.activeSources = new Set(sources);
this.buffer = [];
this.mergeStrategy = options.mergeStrategy || 'round-robin'; // 'round-robin', 'priority', 'timestamp'
this.currentSourceIndex = 0;
this.setupSources();
}
setupSources() {
this.sources.forEach((source, index) => {
source.on('data', (data) => {
this.handleSourceData(data, index);
});
source.on('end', () => {
this.activeSources.delete(source);
if (this.activeSources.size === 0) {
this.push(null);
}
});
source.on('error', (error) => {
this.destroy(error);
});
});
}
handleSourceData(data, sourceIndex) {
const enrichedData = {
...data,
sourceIndex,
timestamp: Date.now()
};
this.buffer.push(enrichedData);
this._read();
}
_read() {
if (this.buffer.length === 0) {
return;
}
let dataToSend;
switch (this.mergeStrategy) {
case 'round-robin':
dataToSend = this.buffer.shift();
break;
case 'priority':
// 按源索引优先级排序
this.buffer.sort((a, b) => a.sourceIndex - b.sourceIndex);
dataToSend = this.buffer.shift();
break;
case 'timestamp':
// 按时间戳排序
this.buffer.sort((a, b) => a.timestamp - b.timestamp);
dataToSend = this.buffer.shift();
break;
default:
dataToSend = this.buffer.shift();
}
this.push(dataToSend);
}
_destroy(error, callback) {
this.sources.forEach(source => {
if (source.destroy) {
source.destroy();
}
});
callback(error);
}
}
module.exports = {
DatabaseQueryStream,
PaginatedDataStream,
RealTimeDataStream,
MergedReadableStream
};
⚡ 性能优化和背压处理
背压处理机制
javascript
// backpressure-handling.js
const { Readable, pipeline } = require('stream');
const { promisify } = require('util');
class BackpressureAwareStream extends Readable {
constructor(options = {}) {
super(options);
this.dataQueue = [];
this.isReading = false;
this.highWaterMark = options.highWaterMark || 16384;
this.backpressureCount = 0;
this.totalPushed = 0;
// 性能监控
this.stats = {
backpressureEvents: 0,
totalDataGenerated: 0,
bufferOverflows: 0,
avgProcessingTime: 0,
processingTimes: []
};
}
_read(size) {
if (this.isReading) {
return;
}
this.isReading = true;
this.processQueue();
}
processQueue() {
const startTime = Date.now();
while (this.dataQueue.length > 0) {
const data = this.dataQueue.shift();
// 尝试推送数据
const canPushMore = this.push(data);
this.totalPushed++;
if (!canPushMore) {
// 遇到背压,停止推送
this.backpressureCount++;
this.stats.backpressureEvents++;
console.log(`⚠️ 背压检测,暂停数据推送 (队列剩余: ${this.dataQueue.length})`);
break;
}
}
const processingTime = Date.now() - startTime;
this.stats.processingTimes.push(processingTime);
if (this.stats.processingTimes.length > 100) {
this.stats.processingTimes = this.stats.processingTimes.slice(-100);
}
this.stats.avgProcessingTime =
this.stats.processingTimes.reduce((a, b) => a + b, 0) / this.stats.processingTimes.length;
this.isReading = false;
}
addData(data) {
this.dataQueue.push(data);
this.stats.totalDataGenerated++;
// 检查缓冲区是否过载
if (this.dataQueue.length > this.highWaterMark * 2) {
this.stats.bufferOverflows++;
console.warn(`🔴 缓冲区过载,丢弃最旧数据 (队列长度: ${this.dataQueue.length})`);
// 丢弃一半的旧数据
const dropCount = Math.floor(this.dataQueue.length / 2);
this.dataQueue.splice(0, dropCount);
}
// 如果不在读取状态,尝试处理队列
if (!this.isReading) {
setImmediate(() => this.processQueue());
}
}
endStream() {
// 处理剩余数据
this.processQueue();
this.push(null);
}
getStats() {
return {
...this.stats,
queueLength: this.dataQueue.length,
totalPushed: this.totalPushed,
backpressureRate: (this.stats.backpressureEvents / this.totalPushed * 100).toFixed(2) + '%'
};
}
}
// 自适应流速控制
class AdaptiveRateStream extends Readable {
constructor(options = {}) {
super(options);
this.baseRate = options.baseRate || 100; // 每秒数据项
this.currentRate = this.baseRate;
this.minRate = options.minRate || 10;
this.maxRate = options.maxRate || 1000;
this.adaptationFactor = options.adaptationFactor || 0.1;
this.lastReadTime = Date.now();
this.dataCounter = 0;
this.backpressureCounter = 0;
this.startDataGeneration();
}
startDataGeneration() {
const generateData = () => {
if (this.destroyed) return;
const data = {
id: this.dataCounter++,
timestamp: Date.now(),
rate: this.currentRate,
data: `Data item ${this.dataCounter}`
};
const canPush = this.push(JSON.stringify(data) + '\n');
if (!canPush) {
// 遇到背压,降低速率
this.backpressureCounter++;
this.adaptRate(false);
} else {
// 成功推送,可以尝试提高速率
this.adaptRate(true);
}
// 根据当前速率计算下次生成间隔
const interval = Math.max(1, 1000 / this.currentRate);
setTimeout(generateData, interval);
};
generateData();
}
adaptRate(successful) {
if (successful) {
// 成功推送,适度提高速率
this.currentRate = Math.min(
this.maxRate,
this.currentRate * (1 + this.adaptationFactor)
);
} else {
// 遇到背压,降低速率
this.currentRate = Math.max(
this.minRate,
this.currentRate * (1 - this.adaptationFactor * 2)
);
}
}
_read() {
this.lastReadTime = Date.now();
}
getPerformanceStats() {
return {
currentRate: this.currentRate.toFixed(2),
baseRate: this.baseRate,
dataGenerated: this.dataCounter,
backpressureEvents: this.backpressureCounter,
adaptationRatio: (this.currentRate / this.baseRate).toFixed(2)
};
}
}
// 流性能分析器
class StreamPerformanceAnalyzer {
constructor() {
this.metrics = {
throughput: [],
latency: [],
backpressure: [],
memory: []
};
}
analyzeStream(stream, duration = 10000) {
return new Promise((resolve) => {
const startTime = Date.now();
let dataCount = 0;
let totalLatency = 0;
let backpressureEvents = 0;
const measureInterval = setInterval(() => {
const memUsage = process.memoryUsage();
this.metrics.memory.push({
timestamp: Date.now(),
heapUsed: memUsage.heapUsed,
heapTotal: memUsage.heapTotal
});
}, 1000);
stream.on('data', (chunk) => {
dataCount++;
// 尝试解析时间戳计算延迟
try {
const data = JSON.parse(chunk.toString().trim());
if (data.timestamp) {
const latency = Date.now() - data.timestamp;
totalLatency += latency;
this.metrics.latency.push(latency);
}
} catch (error) {
// 忽略解析错误
}
});
stream.on('drain', () => {
backpressureEvents++;
this.metrics.backpressure.push({
timestamp: Date.now(),
event: 'drain'
});
});
setTimeout(() => {
clearInterval(measureInterval);
const endTime = Date.now();
const actualDuration = endTime - startTime;
const analysis = {
duration: actualDuration,
throughput: {
itemsPerSecond: (dataCount / actualDuration * 1000).toFixed(2),
totalItems: dataCount
},
latency: {
average: totalLatency > 0 ? (totalLatency / dataCount).toFixed(2) + 'ms' : 'N/A',
samples: this.metrics.latency.length
},
backpressure: {
events: backpressureEvents,
rate: (backpressureEvents / dataCount * 100).toFixed(2) + '%'
},
memory: {
peak: Math.max(...this.metrics.memory.map(m => m.heapUsed)),
average: this.metrics.memory.reduce((sum, m) => sum + m.heapUsed, 0) / this.metrics.memory.length
}
};
resolve(analysis);
}, duration);
});
}
generateReport(analysis) {
console.log('\n📊 流性能分析报告:');
console.log('='.repeat(40));
console.log(`测试时长: ${analysis.duration}ms`);
console.log(`吞吐量: ${analysis.throughput.itemsPerSecond} items/sec`);
console.log(`总处理项目: ${analysis.throughput.totalItems}`);
console.log(`平均延迟: ${analysis.latency.average}`);
console.log(`背压事件: ${analysis.backpressure.events} (${analysis.backpressure.rate})`);
console.log(`峰值内存: ${(analysis.memory.peak / 1024 / 1024).toFixed(2)}MB`);
console.log(`平均内存: ${(analysis.memory.average / 1024 / 1024).toFixed(2)}MB`);
}
}
// 使用示例
async function demonstrateBackpressureHandling() {
console.log('⚡ 背压处理演示...\n');
const analyzer = new StreamPerformanceAnalyzer();
// 1. 背压感知流测试
console.log('1. 背压感知流测试:');
const backpressureStream = new BackpressureAwareStream({ highWaterMark: 1024 });
// 模拟快速数据生成
let counter = 0;
const dataGenerator = setInterval(() => {
if (counter < 1000) {
backpressureStream.addData(`数据项 ${counter++}\n`);
} else {
clearInterval(dataGenerator);
backpressureStream.endStream();
}
}, 1);
let receivedCount = 0;
backpressureStream.on('data', () => {
receivedCount++;
});
backpressureStream.on('end', () => {
const stats = backpressureStream.getStats();
console.log(' 背压统计:', stats);
console.log(` 接收数据: ${receivedCount}\n`);
});
// 等待背压流完成
await new Promise(resolve => backpressureStream.on('end', resolve));
// 2. 自适应速率流测试
console.log('2. 自适应速率流测试:');
const adaptiveStream = new AdaptiveRateStream({
baseRate: 50,
maxRate: 200,
minRate: 5
});
// 分析自适应流性能
const adaptiveAnalysis = await analyzer.analyzeStream(adaptiveStream, 5000);
console.log(' 自适应流统计:', adaptiveStream.getPerformanceStats());
analyzer.generateReport(adaptiveAnalysis);
adaptiveStream.destroy();
}
module.exports = {
BackpressureAwareStream,
AdaptiveRateStream,
StreamPerformanceAnalyzer,
demonstrateBackpressureHandling
};
可读流是Node.js流系统的基础,掌握其原理和优化技巧对构建高效的数据处理应用至关重要!