Skip to content

可写流

🎯 学习目标

  • 深入理解可写流的工作原理和生命周期
  • 掌握自定义可写流的实现方法
  • 学会可写流的性能优化和背压处理
  • 了解流式数据写入和批量处理技术

📚 核心概念

可写流基础

javascript
// 可写流核心概念
const writableStreamConcepts = {
  states: {
    writable: '可写状态 - 可以接受数据写入',
    ending: '结束状态 - 调用end()后,等待所有数据写完',
    ended: '已结束状态 - 所有数据已写入完成',
    destroyed: '销毁状态 - 流已被销毁',
    error: '错误状态 - 发生写入错误'
  },
  methods: {
    write: '写入数据块',
    end: '结束写入并关闭流',
    destroy: '销毁流',
    cork: '暂停写入操作',
    uncork: '恢复写入操作'
  },
  events: {
    drain: '背压解除,可以继续写入',
    finish: '所有数据写入完成',
    error: '写入过程中发生错误',
    close: '流已关闭',
    pipe: '有可读流管道连接到此流',
    unpipe: '可读流断开管道连接'
  },
  backpressure: {
    description: '背压机制 - 当写入缓冲区满时暂停写入',
    handling: ['监听drain事件', '检查write返回值', '实现流控制']
  }
};

console.log('可写流概念:', writableStreamConcepts);

🛠️ 自定义可写流实现

基础可写流

javascript
// basic-writable-stream.js
const { Writable } = require('stream');
const fs = require('fs');
const path = require('path');

// 文件写入流
class FileWriterStream extends Writable {
  constructor(filePath, options = {}) {
    super(options);
    
    this.filePath = filePath;
    this.fd = null;
    this.bytesWritten = 0;
    this.encoding = options.encoding || 'utf8';
    
    // 确保目录存在
    this.ensureDirectory();
    
    // 打开文件
    this.openFile();
  }

  async ensureDirectory() {
    const dir = path.dirname(this.filePath);
    try {
      await fs.promises.mkdir(dir, { recursive: true });
    } catch (error) {
      this.destroy(error);
    }
  }

  async openFile() {
    try {
      this.fd = await fs.promises.open(this.filePath, 'w');
      console.log(`📝 文件已打开: ${this.filePath}`);
    } catch (error) {
      this.destroy(error);
    }
  }

  _write(chunk, encoding, callback) {
    if (!this.fd) {
      // 文件还未打开,延迟写入
      setTimeout(() => this._write(chunk, encoding, callback), 10);
      return;
    }

    this.writeToFile(chunk, encoding, callback);
  }

  async writeToFile(chunk, encoding, callback) {
    try {
      const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
      const { bytesWritten } = await this.fd.write(buffer);
      
      this.bytesWritten += bytesWritten;
      
      // 发出写入进度事件
      this.emit('progress', {
        bytesWritten: this.bytesWritten,
        chunkSize: bytesWritten
      });
      
      callback();
    } catch (error) {
      callback(error);
    }
  }

  _final(callback) {
    // 流结束时的清理工作
    console.log(`✅ 文件写入完成: ${this.bytesWritten} 字节`);
    callback();
  }

  _destroy(error, callback) {
    // 关闭文件描述符
    if (this.fd) {
      this.fd.close().finally(() => {
        console.log(`🔒 文件已关闭: ${this.filePath}`);
        callback(error);
      });
    } else {
      callback(error);
    }
  }

  getBytesWritten() {
    return this.bytesWritten;
  }
}

// 日志写入流
class LogWriterStream extends Writable {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    
    this.logLevel = options.logLevel || 'info';
    this.format = options.format || 'json';
    this.includeTimestamp = options.includeTimestamp !== false;
    this.includeLevel = options.includeLevel !== false;
    this.colorize = options.colorize && process.stdout.isTTY;
    
    this.colors = {
      error: '\x1b[31m',   // 红色
      warn: '\x1b[33m',    // 黄色
      info: '\x1b[36m',    // 青色
      debug: '\x1b[37m',   // 白色
      reset: '\x1b[0m'     // 重置
    };
  }

  _write(chunk, encoding, callback) {
    try {
      const logEntry = this.processLogEntry(chunk);
      const formatted = this.formatLogEntry(logEntry);
      
      // 写入到标准输出或标准错误
      const output = logEntry.level === 'error' ? process.stderr : process.stdout;
      output.write(formatted + '\n');
      
      callback();
    } catch (error) {
      callback(error);
    }
  }

  processLogEntry(chunk) {
    let logEntry;
    
    if (typeof chunk === 'string') {
      logEntry = {
        level: this.logLevel,
        message: chunk
      };
    } else if (typeof chunk === 'object') {
      logEntry = {
        level: chunk.level || this.logLevel,
        message: chunk.message || JSON.stringify(chunk),
        ...chunk
      };
    } else {
      logEntry = {
        level: this.logLevel,
        message: String(chunk)
      };
    }
    
    if (this.includeTimestamp) {
      logEntry.timestamp = new Date().toISOString();
    }
    
    return logEntry;
  }

  formatLogEntry(logEntry) {
    switch (this.format) {
      case 'json':
        return JSON.stringify(logEntry);
        
      case 'text':
        return this.formatTextEntry(logEntry);
        
      case 'pretty':
        return this.formatPrettyEntry(logEntry);
        
      default:
        return JSON.stringify(logEntry);
    }
  }

  formatTextEntry(logEntry) {
    const parts = [];
    
    if (logEntry.timestamp) {
      parts.push(`[${logEntry.timestamp}]`);
    }
    
    if (this.includeLevel) {
      parts.push(`${logEntry.level.toUpperCase()}`);
    }
    
    parts.push(logEntry.message);
    
    return parts.join(' ');
  }

  formatPrettyEntry(logEntry) {
    let formatted = this.formatTextEntry(logEntry);
    
    if (this.colorize && this.colors[logEntry.level]) {
      formatted = this.colors[logEntry.level] + formatted + this.colors.reset;
    }
    
    return formatted;
  }
}

// 数据验证写入流
class ValidatingWriterStream extends Writable {
  constructor(validator, options = {}) {
    super({ objectMode: true, ...options });
    
    this.validator = validator;
    this.validCount = 0;
    this.invalidCount = 0;
    this.errors = [];
    
    this.onInvalidData = options.onInvalidData || 'error'; // 'error', 'skip', 'log'
  }

  _write(chunk, encoding, callback) {
    try {
      const validationResult = this.validateChunk(chunk);
      
      if (validationResult.valid) {
        this.validCount++;
        this.emit('validData', chunk, validationResult);
        callback();
      } else {
        this.invalidCount++;
        this.handleInvalidData(chunk, validationResult, callback);
      }
    } catch (error) {
      callback(error);
    }
  }

  validateChunk(chunk) {
    try {
      const result = this.validator(chunk);
      
      if (typeof result === 'boolean') {
        return { valid: result, errors: result ? [] : ['Validation failed'] };
      } else if (typeof result === 'object') {
        return result;
      } else {
        return { valid: false, errors: ['Invalid validator result'] };
      }
    } catch (error) {
      return { valid: false, errors: [error.message] };
    }
  }

  handleInvalidData(chunk, validationResult, callback) {
    const error = new Error(`数据验证失败: ${validationResult.errors.join(', ')}`);
    error.chunk = chunk;
    error.validationResult = validationResult;
    
    this.errors.push({
      timestamp: Date.now(),
      chunk,
      errors: validationResult.errors
    });

    switch (this.onInvalidData) {
      case 'error':
        callback(error);
        break;
        
      case 'skip':
        console.warn(`⚠️ 跳过无效数据: ${validationResult.errors.join(', ')}`);
        this.emit('invalidData', chunk, validationResult);
        callback();
        break;
        
      case 'log':
        console.error(`❌ 无效数据: ${validationResult.errors.join(', ')}`);
        this.emit('invalidData', chunk, validationResult);
        callback();
        break;
        
      default:
        callback(error);
    }
  }

  getStats() {
    return {
      valid: this.validCount,
      invalid: this.invalidCount,
      total: this.validCount + this.invalidCount,
      successRate: (this.validCount / (this.validCount + this.invalidCount) * 100).toFixed(2) + '%',
      errors: this.errors.slice(-10) // 最近10个错误
    };
  }
}

// 使用示例
async function demonstrateBasicWritableStreams() {
  console.log('📝 基础可写流演示...\n');

  // 1. 文件写入流演示
  console.log('1. 文件写入流:');
  const fileWriter = new FileWriterStream('./output/test.txt');
  
  fileWriter.on('progress', (progress) => {
    console.log(`  写入进度: ${progress.bytesWritten} 字节 (+${progress.chunkSize})`);
  });

  fileWriter.on('finish', () => {
    console.log(`  ✅ 文件写入完成: ${fileWriter.getBytesWritten()} 字节\n`);
  });

  fileWriter.write('第一行数据\n');
  fileWriter.write('第二行数据\n');
  fileWriter.write('第三行数据\n');
  fileWriter.end();

  // 等待文件写入完成
  await new Promise(resolve => fileWriter.on('finish', resolve));

  // 2. 日志写入流演示
  console.log('2. 日志写入流:');
  const logger = new LogWriterStream({
    format: 'pretty',
    colorize: true
  });

  logger.write({ level: 'info', message: '应用启动' });
  logger.write({ level: 'warn', message: '配置文件缺少某些选项' });
  logger.write({ level: 'error', message: '数据库连接失败', error: 'ECONNREFUSED' });
  logger.write('这是一个普通的日志消息');

  // 3. 数据验证流演示
  console.log('\n3. 数据验证流:');
  const validator = (data) => {
    if (typeof data !== 'object') {
      return { valid: false, errors: ['数据必须是对象'] };
    }
    
    if (!data.id || typeof data.id !== 'number') {
      return { valid: false, errors: ['缺少有效的id字段'] };
    }
    
    if (!data.name || typeof data.name !== 'string') {
      return { valid: false, errors: ['缺少有效的name字段'] };
    }
    
    return { valid: true, errors: [] };
  };

  const validatingWriter = new ValidatingWriterStream(validator, {
    onInvalidData: 'log'
  });

  validatingWriter.on('validData', (data) => {
    console.log(`  ✅ 有效数据: ${JSON.stringify(data)}`);
  });

  validatingWriter.on('invalidData', (data, result) => {
    console.log(`  ❌ 无效数据: ${JSON.stringify(data)} - ${result.errors.join(', ')}`);
  });

  validatingWriter.on('finish', () => {
    const stats = validatingWriter.getStats();
    console.log(`  📊 验证统计: 有效=${stats.valid}, 无效=${stats.invalid}, 成功率=${stats.successRate}\n`);
  });

  // 写入测试数据
  validatingWriter.write({ id: 1, name: 'Alice' }); // 有效
  validatingWriter.write({ id: 2 }); // 无效 - 缺少name
  validatingWriter.write({ name: 'Bob' }); // 无效 - 缺少id
  validatingWriter.write('invalid'); // 无效 - 不是对象
  validatingWriter.write({ id: 3, name: 'Charlie' }); // 有效
  validatingWriter.end();

  // 等待验证完成
  await new Promise(resolve => validatingWriter.on('finish', resolve));
}

module.exports = {
  FileWriterStream,
  LogWriterStream,
  ValidatingWriterStream,
  demonstrateBasicWritableStreams
};

高级可写流实现

javascript
// advanced-writable-stream.js
const { Writable } = require('stream');
const EventEmitter = require('events');

// 批量处理写入流
class BatchWriterStream extends Writable {
  constructor(processor, options = {}) {
    super({ objectMode: true, ...options });
    
    this.processor = processor; // 批处理函数
    this.batchSize = options.batchSize || 100;
    this.flushInterval = options.flushInterval || 1000;
    this.maxRetries = options.maxRetries || 3;
    
    this.batch = [];
    this.flushTimer = null;
    this.processedCount = 0;
    this.failedCount = 0;
    this.retryQueue = [];
    
    this.stats = {
      batchesProcessed: 0,
      itemsProcessed: 0,
      itemsFailed: 0,
      retriesAttempted: 0,
      avgBatchSize: 0,
      avgProcessingTime: 0,
      processingTimes: []
    };
  }

  _write(chunk, encoding, callback) {
    this.batch.push({
      data: chunk,
      timestamp: Date.now(),
      retries: 0
    });

    // 检查是否需要立即处理批次
    if (this.batch.length >= this.batchSize) {
      this.flushBatch();
    } else if (!this.flushTimer) {
      // 设置定时刷新
      this.flushTimer = setTimeout(() => {
        this.flushBatch();
      }, this.flushInterval);
    }

    callback();
  }

  async flushBatch() {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }

    if (this.batch.length === 0) {
      return;
    }

    const currentBatch = this.batch.splice(0);
    console.log(`📦 处理批次: ${currentBatch.length} 项`);

    try {
      await this.processBatch(currentBatch);
    } catch (error) {
      console.error('❌ 批次处理失败:', error);
      this.handleBatchFailure(currentBatch, error);
    }
  }

  async processBatch(batch) {
    const startTime = Date.now();
    
    try {
      const data = batch.map(item => item.data);
      await this.processor(data);
      
      const processingTime = Date.now() - startTime;
      this.updateStats(batch.length, processingTime, true);
      
      this.processedCount += batch.length;
      console.log(`✅ 批次处理成功: ${batch.length} 项, 耗时 ${processingTime}ms`);
      
      this.emit('batchProcessed', {
        size: batch.length,
        processingTime,
        totalProcessed: this.processedCount
      });
      
    } catch (error) {
      this.updateStats(batch.length, Date.now() - startTime, false);
      throw error;
    }
  }

  handleBatchFailure(batch, error) {
    // 将失败的批次加入重试队列
    batch.forEach(item => {
      if (item.retries < this.maxRetries) {
        item.retries++;
        this.retryQueue.push(item);
        this.stats.retriesAttempted++;
      } else {
        this.failedCount++;
        this.stats.itemsFailed++;
        console.error(`💀 项目最终失败: ${JSON.stringify(item.data)}`);
        
        this.emit('itemFailed', {
          data: item.data,
          error: error.message,
          retries: item.retries
        });
      }
    });

    // 处理重试队列
    this.processRetryQueue();
  }

  async processRetryQueue() {
    if (this.retryQueue.length === 0) {
      return;
    }

    console.log(`🔄 处理重试队列: ${this.retryQueue.length} 项`);
    
    const retryBatch = this.retryQueue.splice(0, this.batchSize);
    
    // 延迟重试
    setTimeout(async () => {
      try {
        await this.processBatch(retryBatch);
      } catch (error) {
        console.error('🔄 重试批次失败:', error);
        this.handleBatchFailure(retryBatch, error);
      }
    }, 1000 * Math.min(retryBatch[0]?.retries || 1, 5)); // 指数退避
  }

  updateStats(batchSize, processingTime, success) {
    this.stats.batchesProcessed++;
    
    if (success) {
      this.stats.itemsProcessed += batchSize;
    }
    
    this.stats.processingTimes.push(processingTime);
    if (this.stats.processingTimes.length > 100) {
      this.stats.processingTimes = this.stats.processingTimes.slice(-100);
    }
    
    this.stats.avgBatchSize = this.stats.itemsProcessed / this.stats.batchesProcessed;
    this.stats.avgProcessingTime = 
      this.stats.processingTimes.reduce((a, b) => a + b, 0) / this.stats.processingTimes.length;
  }

  _final(callback) {
    console.log('🏁 流结束,处理剩余批次...');
    
    // 处理剩余的批次
    this.flushBatch().then(() => {
      // 等待重试队列处理完成
      const waitForRetries = () => {
        if (this.retryQueue.length > 0) {
          setTimeout(waitForRetries, 100);
        } else {
          console.log(`✅ 批量写入完成: 处理=${this.processedCount}, 失败=${this.failedCount}`);
          callback();
        }
      };
      
      waitForRetries();
    }).catch(callback);
  }

  getStats() {
    return {
      ...this.stats,
      successRate: (this.stats.itemsProcessed / (this.stats.itemsProcessed + this.stats.itemsFailed) * 100).toFixed(2) + '%',
      retryRate: (this.stats.retriesAttempted / this.stats.itemsProcessed * 100).toFixed(2) + '%'
    };
  }
}

// 多目标写入流
class MultiTargetWriterStream extends Writable {
  constructor(targets, options = {}) {
    super({ objectMode: true, ...options });
    
    this.targets = targets; // 目标写入流数组
    this.strategy = options.strategy || 'all'; // 'all', 'any', 'majority'
    this.continueOnError = options.continueOnError !== false;
    
    this.targetStats = new Map();
    this.initializeTargets();
  }

  initializeTargets() {
    this.targets.forEach((target, index) => {
      this.targetStats.set(index, {
        writes: 0,
        errors: 0,
        lastError: null,
        isHealthy: true
      });

      target.on('error', (error) => {
        const stats = this.targetStats.get(index);
        stats.errors++;
        stats.lastError = error;
        stats.isHealthy = false;
        
        console.error(`❌ 目标 ${index} 写入错误:`, error.message);
        
        if (!this.continueOnError) {
          this.destroy(error);
        }
      });

      target.on('drain', () => {
        this.emit('drain');
      });
    });
  }

  _write(chunk, encoding, callback) {
    const promises = [];
    const healthyTargets = this.targets.filter((_, index) => 
      this.targetStats.get(index).isHealthy
    );

    if (healthyTargets.length === 0) {
      return callback(new Error('没有健康的目标流'));
    }

    healthyTargets.forEach((target, originalIndex) => {
      const promise = new Promise((resolve, reject) => {
        const canWrite = target.write(chunk, encoding, (error) => {
          if (error) {
            reject(error);
          } else {
            const stats = this.targetStats.get(originalIndex);
            stats.writes++;
            resolve(originalIndex);
          }
        });

        if (!canWrite) {
          // 处理背压
          target.once('drain', () => {
            this.emit('drain');
          });
        }
      });
      
      promises.push(promise);
    });

    this.handleWriteStrategy(promises, callback);
  }

  async handleWriteStrategy(promises, callback) {
    try {
      switch (this.strategy) {
        case 'all':
          await Promise.all(promises);
          callback();
          break;
          
        case 'any':
          await Promise.race(promises);
          callback();
          break;
          
        case 'majority':
          const results = await Promise.allSettled(promises);
          const successful = results.filter(r => r.status === 'fulfilled').length;
          
          if (successful > promises.length / 2) {
            callback();
          } else {
            callback(new Error('多数目标写入失败'));
          }
          break;
          
        default:
          callback(new Error(`未知策略: ${this.strategy}`));
      }
    } catch (error) {
      if (this.continueOnError) {
        console.warn(`⚠️ 写入策略失败,继续处理:`, error.message);
        callback();
      } else {
        callback(error);
      }
    }
  }

  _final(callback) {
    const endPromises = this.targets.map(target => {
      return new Promise((resolve) => {
        if (target.writable) {
          target.end(() => resolve());
        } else {
          resolve();
        }
      });
    });

    Promise.all(endPromises).then(() => {
      console.log('✅ 所有目标流已结束');
      callback();
    }).catch(callback);
  }

  getTargetStats() {
    const stats = {};
    
    this.targetStats.forEach((stat, index) => {
      stats[`target_${index}`] = {
        ...stat,
        successRate: stat.writes / (stat.writes + stat.errors) * 100
      };
    });
    
    return stats;
  }

  // 手动标记目标为不健康
  markTargetUnhealthy(targetIndex) {
    const stats = this.targetStats.get(targetIndex);
    if (stats) {
      stats.isHealthy = false;
      console.log(`🔴 目标 ${targetIndex} 已标记为不健康`);
    }
  }

  // 手动恢复目标健康状态
  markTargetHealthy(targetIndex) {
    const stats = this.targetStats.get(targetIndex);
    if (stats) {
      stats.isHealthy = true;
      stats.errors = 0;
      stats.lastError = null;
      console.log(`🟢 目标 ${targetIndex} 已恢复健康`);
    }
  }
}

// 缓冲写入流
class BufferedWriterStream extends Writable {
  constructor(writer, options = {}) {
    super(options);
    
    this.writer = writer; // 实际写入函数
    this.bufferSize = options.bufferSize || 64 * 1024; // 64KB
    this.flushInterval = options.flushInterval || 5000; // 5秒
    this.autoFlush = options.autoFlush !== false;
    
    this.buffer = Buffer.alloc(0);
    this.flushTimer = null;
    this.bytesBuffered = 0;
    this.totalBytesWritten = 0;
    this.flushCount = 0;
  }

  _write(chunk, encoding, callback) {
    const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    
    // 添加到缓冲区
    this.buffer = Buffer.concat([this.buffer, buffer]);
    this.bytesBuffered += buffer.length;

    // 检查是否需要刷新
    if (this.buffer.length >= this.bufferSize) {
      this.flush().then(() => callback()).catch(callback);
    } else {
      if (this.autoFlush && !this.flushTimer) {
        this.flushTimer = setTimeout(() => {
          this.flush().catch(error => {
            console.error('自动刷新失败:', error);
            this.emit('error', error);
          });
        }, this.flushInterval);
      }
      callback();
    }
  }

  async flush() {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }

    if (this.buffer.length === 0) {
      return;
    }

    const dataToWrite = this.buffer;
    this.buffer = Buffer.alloc(0);
    this.bytesBuffered = 0;

    try {
      await this.writer(dataToWrite);
      this.totalBytesWritten += dataToWrite.length;
      this.flushCount++;
      
      console.log(`💾 缓冲区刷新: ${dataToWrite.length} 字节`);
      
      this.emit('flush', {
        bytesWritten: dataToWrite.length,
        totalBytes: this.totalBytesWritten,
        flushCount: this.flushCount
      });
    } catch (error) {
      // 写入失败,恢复缓冲区
      this.buffer = Buffer.concat([dataToWrite, this.buffer]);
      this.bytesBuffered += dataToWrite.length;
      throw error;
    }
  }

  _final(callback) {
    this.flush().then(() => {
      console.log(`✅ 缓冲写入完成: ${this.totalBytesWritten} 字节, ${this.flushCount} 次刷新`);
      callback();
    }).catch(callback);
  }

  // 强制刷新缓冲区
  forceFlush() {
    return this.flush();
  }

  // 获取缓冲区状态
  getBufferStatus() {
    return {
      bufferedBytes: this.buffer.length,
      bufferUtilization: (this.buffer.length / this.bufferSize * 100).toFixed(2) + '%',
      totalBytesWritten: this.totalBytesWritten,
      flushCount: this.flushCount
    };
  }
}

module.exports = {
  BatchWriterStream,
  MultiTargetWriterStream,
  BufferedWriterStream
};

⚡ 背压处理和性能优化

背压管理器

javascript
// backpressure-manager.js
const { Writable } = require('stream');
const EventEmitter = require('events');

class BackpressureManager extends EventEmitter {
  constructor(options = {}) {
    super();
    
    this.options = {
      highWaterMark: options.highWaterMark || 16384,
      maxBackpressureTime: options.maxBackpressureTime || 10000,
      backpressureThreshold: options.backpressureThreshold || 0.8,
      ...options
    };
    
    this.streams = new Map();
    this.backpressureStates = new Map();
    this.stats = {
      totalBackpressureEvents: 0,
      totalBackpressureTime: 0,
      activeBackpressureStreams: 0
    };
  }

  registerStream(streamId, stream) {
    this.streams.set(streamId, stream);
    this.backpressureStates.set(streamId, {
      isBackpressured: false,
      backpressureStartTime: null,
      backpressureCount: 0,
      totalBackpressureTime: 0
    });

    // 监听drain事件
    stream.on('drain', () => {
      this.handleDrainEvent(streamId);
    });

    // 监听写入事件
    const originalWrite = stream.write.bind(stream);
    stream.write = (chunk, encoding, callback) => {
      const result = originalWrite(chunk, encoding, callback);
      
      if (!result) {
        this.handleBackpressure(streamId);
      }
      
      return result;
    };

    console.log(`📊 流已注册到背压管理器: ${streamId}`);
  }

  handleBackpressure(streamId) {
    const state = this.backpressureStates.get(streamId);
    
    if (!state.isBackpressured) {
      state.isBackpressured = true;
      state.backpressureStartTime = Date.now();
      state.backpressureCount++;
      
      this.stats.totalBackpressureEvents++;
      this.stats.activeBackpressureStreams++;
      
      console.log(`⚠️ 流 ${streamId} 进入背压状态`);
      this.emit('backpressureStart', { streamId, count: state.backpressureCount });
      
      // 设置背压超时警告
      setTimeout(() => {
        if (state.isBackpressured) {
          console.warn(`🚨 流 ${streamId} 背压时间过长: ${Date.now() - state.backpressureStartTime}ms`);
          this.emit('backpressureTimeout', { streamId, duration: Date.now() - state.backpressureStartTime });
        }
      }, this.options.maxBackpressureTime);
    }
  }

  handleDrainEvent(streamId) {
    const state = this.backpressureStates.get(streamId);
    
    if (state.isBackpressured) {
      const backpressureDuration = Date.now() - state.backpressureStartTime;
      
      state.isBackpressured = false;
      state.backpressureStartTime = null;
      state.totalBackpressureTime += backpressureDuration;
      
      this.stats.totalBackpressureTime += backpressureDuration;
      this.stats.activeBackpressureStreams--;
      
      console.log(`✅ 流 ${streamId} 背压解除,持续时间: ${backpressureDuration}ms`);
      this.emit('backpressureEnd', { streamId, duration: backpressureDuration });
    }
  }

  getStreamStats(streamId) {
    const state = this.backpressureStates.get(streamId);
    
    if (!state) {
      return null;
    }
    
    return {
      streamId,
      isBackpressured: state.isBackpressured,
      backpressureCount: state.backpressureCount,
      totalBackpressureTime: state.totalBackpressureTime,
      avgBackpressureTime: state.backpressureCount > 0 
        ? state.totalBackpressureTime / state.backpressureCount 
        : 0
    };
  }

  getAllStats() {
    const streamStats = {};
    
    for (const streamId of this.streams.keys()) {
      streamStats[streamId] = this.getStreamStats(streamId);
    }
    
    return {
      global: this.stats,
      streams: streamStats
    };
  }

  // 自适应写入速率控制
  createAdaptiveWriter(streamId, baseRate = 1000) {
    const stream = this.streams.get(streamId);
    const state = this.backpressureStates.get(streamId);
    
    if (!stream || !state) {
      throw new Error(`流 ${streamId} 未注册`);
    }

    let currentRate = baseRate;
    let lastWriteTime = 0;

    return {
      write: (chunk, encoding) => {
        return new Promise((resolve, reject) => {
          const now = Date.now();
          const timeSinceLastWrite = now - lastWriteTime;
          const targetInterval = 1000 / currentRate;
          
          const delay = Math.max(0, targetInterval - timeSinceLastWrite);
          
          setTimeout(() => {
            const result = stream.write(chunk, encoding, (error) => {
              if (error) {
                reject(error);
              } else {
                resolve(result);
              }
            });
            
            lastWriteTime = Date.now();
            
            // 根据背压状态调整写入速率
            if (state.isBackpressured) {
              currentRate = Math.max(baseRate * 0.1, currentRate * 0.8);
            } else {
              currentRate = Math.min(baseRate, currentRate * 1.1);
            }
            
            if (!result) {
              // 等待drain事件
              stream.once('drain', () => resolve(true));
            }
          }, delay);
        });
      },
      
      getCurrentRate: () => currentRate,
      getBaseRate: () => baseRate
    };
  }
}

// 性能优化的可写流
class PerformantWritableStream extends Writable {
  constructor(options = {}) {
    super(options);
    
    this.options = {
      enableCompression: options.enableCompression,
      compressionThreshold: options.compressionThreshold || 1024,
      enableBatching: options.enableBatching,
      batchSize: options.batchSize || 10,
      batchTimeout: options.batchTimeout || 100,
      enableCaching: options.enableCaching,
      cacheSize: options.cacheSize || 100,
      ...options
    };
    
    this.batch = [];
    this.batchTimer = null;
    this.cache = new Map();
    this.stats = {
      writes: 0,
      batches: 0,
      compressions: 0,
      cacheHits: 0,
      cacheMisses: 0,
      totalBytes: 0,
      compressedBytes: 0
    };
  }

  _write(chunk, encoding, callback) {
    this.stats.writes++;
    
    if (this.options.enableBatching) {
      this.addToBatch(chunk, encoding, callback);
    } else {
      this.processChunk(chunk, encoding, callback);
    }
  }

  addToBatch(chunk, encoding, callback) {
    this.batch.push({ chunk, encoding, callback });
    
    if (this.batch.length >= this.options.batchSize) {
      this.processBatch();
    } else if (!this.batchTimer) {
      this.batchTimer = setTimeout(() => {
        this.processBatch();
      }, this.options.batchTimeout);
    }
  }

  processBatch() {
    if (this.batchTimer) {
      clearTimeout(this.batchTimer);
      this.batchTimer = null;
    }
    
    if (this.batch.length === 0) {
      return;
    }
    
    const currentBatch = this.batch.splice(0);
    this.stats.batches++;
    
    console.log(`📦 处理批次: ${currentBatch.length} 项`);
    
    // 合并数据块
    const chunks = currentBatch.map(item => 
      Buffer.isBuffer(item.chunk) ? item.chunk : Buffer.from(item.chunk, item.encoding)
    );
    
    const combinedChunk = Buffer.concat(chunks);
    
    this.processChunk(combinedChunk, 'buffer', (error) => {
      // 调用所有回调
      currentBatch.forEach(item => item.callback(error));
    });
  }

  async processChunk(chunk, encoding, callback) {
    try {
      let processedChunk = chunk;
      
      // 缓存检查
      if (this.options.enableCaching) {
        const cacheKey = this.generateCacheKey(chunk);
        const cached = this.cache.get(cacheKey);
        
        if (cached) {
          this.stats.cacheHits++;
          processedChunk = cached;
        } else {
          this.stats.cacheMisses++;
        }
      }
      
      // 压缩处理
      if (this.options.enableCompression && chunk.length > this.options.compressionThreshold) {
        processedChunk = await this.compressChunk(chunk);
        
        if (this.options.enableCaching) {
          this.updateCache(this.generateCacheKey(chunk), processedChunk);
        }
      }
      
      // 执行实际写入
      await this.performWrite(processedChunk, encoding);
      
      this.stats.totalBytes += chunk.length;
      if (processedChunk !== chunk) {
        this.stats.compressedBytes += processedChunk.length;
      }
      
      callback();
    } catch (error) {
      callback(error);
    }
  }

  async compressChunk(chunk) {
    const zlib = require('zlib');
    
    try {
      const compressed = await new Promise((resolve, reject) => {
        zlib.gzip(chunk, (error, result) => {
          if (error) reject(error);
          else resolve(result);
        });
      });
      
      this.stats.compressions++;
      console.log(`🗜️ 数据压缩: ${chunk.length} -> ${compressed.length} 字节 (${((1 - compressed.length / chunk.length) * 100).toFixed(1)}% 压缩率)`);
      
      return compressed;
    } catch (error) {
      console.warn('压缩失败,使用原始数据:', error.message);
      return chunk;
    }
  }

  generateCacheKey(chunk) {
    const crypto = require('crypto');
    return crypto.createHash('md5').update(chunk).digest('hex');
  }

  updateCache(key, value) {
    if (this.cache.size >= this.options.cacheSize) {
      // LRU清理
      const firstKey = this.cache.keys().next().value;
      this.cache.delete(firstKey);
    }
    
    this.cache.set(key, value);
  }

  async performWrite(chunk, encoding) {
    // 子类实现具体的写入逻辑
    console.log(`✍️ 写入数据: ${chunk.length} 字节`);
    
    // 模拟异步写入
    await new Promise(resolve => setTimeout(resolve, 1));
  }

  _final(callback) {
    // 处理剩余批次
    if (this.batch.length > 0) {
      this.processBatch();
    }
    
    // 等待所有写入完成
    setTimeout(() => {
      console.log('📊 写入统计:', this.getStats());
      callback();
    }, 100);
  }

  getStats() {
    return {
      ...this.stats,
      compressionRatio: this.stats.compressedBytes > 0 
        ? ((this.stats.totalBytes - this.stats.compressedBytes) / this.stats.totalBytes * 100).toFixed(1) + '%'
        : '0%',
      cacheHitRate: this.stats.cacheHits + this.stats.cacheMisses > 0
        ? (this.stats.cacheHits / (this.stats.cacheHits + this.stats.cacheMisses) * 100).toFixed(1) + '%'
        : '0%',
      avgBatchSize: this.stats.batches > 0 ? (this.stats.writes / this.stats.batches).toFixed(1) : '0'
    };
  }
}

// 使用示例
async function demonstrateBackpressureHandling() {
  console.log('⚡ 背压处理演示...\n');

  const backpressureManager = new BackpressureManager();
  
  // 创建一个慢速写入流来模拟背压
  const slowWriter = new Writable({
    highWaterMark: 1024,
    write(chunk, encoding, callback) {
      // 模拟慢速写入
      setTimeout(() => {
        console.log(`慢速写入: ${chunk.length} 字节`);
        callback();
      }, 100);
    }
  });

  backpressureManager.registerStream('slow-writer', slowWriter);

  backpressureManager.on('backpressureStart', ({ streamId, count }) => {
    console.log(`🚨 背压开始: 流=${streamId}, 第${count}次`);
  });

  backpressureManager.on('backpressureEnd', ({ streamId, duration }) => {
    console.log(`✅ 背压结束: 流=${streamId}, 持续=${duration}ms`);
  });

  // 创建自适应写入器
  const adaptiveWriter = backpressureManager.createAdaptiveWriter('slow-writer', 50);

  // 快速写入大量数据
  for (let i = 0; i < 20; i++) {
    const data = `数据块 ${i}: ${'x'.repeat(200)}\n`;
    await adaptiveWriter.write(data);
    console.log(`当前写入速率: ${adaptiveWriter.getCurrentRate().toFixed(1)} writes/sec`);
  }

  slowWriter.end();
  
  // 等待完成
  await new Promise(resolve => slowWriter.on('finish', resolve));

  console.log('\n📊 最终统计:');
  console.log(JSON.stringify(backpressureManager.getAllStats(), null, 2));
}

module.exports = {
  BackpressureManager,
  PerformantWritableStream,
  demonstrateBackpressureHandling
};

可写流是Node.js数据处理的重要组件,掌握其高级用法和优化技巧对构建高效的数据写入系统至关重要!