Skip to content

转换流

🎯 学习目标

  • 深入理解转换流的工作原理和应用场景
  • 掌握自定义转换流的实现方法
  • 学会数据转换、过滤和聚合技术
  • 了解流水线处理和性能优化

📚 核心概念

转换流基础

javascript
// 转换流核心概念
const transformStreamConcepts = {
  definition: {
    description: '转换流是同时可读和可写的流',
    inheritance: '继承自Duplex流',
    purpose: '对通过流的数据进行转换处理'
  },
  methods: {
    _transform: '处理每个数据块的核心方法',
    _flush: '流结束前的最终处理方法',
    push: '向可读端推送转换后的数据',
    callback: '处理完成后的回调函数'
  },
  patterns: {
    oneToOne: '一对一转换 - 每个输入产生一个输出',
    oneToMany: '一对多转换 - 一个输入产生多个输出',
    manyToOne: '多对一转换 - 多个输入合并为一个输出',
    filter: '过滤模式 - 选择性输出数据',
    aggregate: '聚合模式 - 累积处理数据'
  }
};

console.log('转换流概念:', transformStreamConcepts);

🛠️ 基础转换流实现

数据转换流

javascript
// basic-transform-streams.js
const { Transform } = require('stream');

// JSON解析转换流
class JSONParseStream extends Transform {
  constructor(options = {}) {
    super({ 
      objectMode: true,
      ...options 
    });
    
    this.strict = options.strict !== false;
    this.skipErrors = options.skipErrors === true;
    this.errorCount = 0;
    this.successCount = 0;
  }

  _transform(chunk, encoding, callback) {
    try {
      const jsonString = chunk.toString().trim();
      
      if (!jsonString) {
        return callback();
      }
      
      const parsed = JSON.parse(jsonString);
      this.successCount++;
      
      this.push(parsed);
      callback();
      
    } catch (error) {
      this.errorCount++;
      
      if (this.skipErrors) {
        console.warn(`⚠️ JSON解析错误 (已跳过): ${error.message}`);
        this.emit('parseError', { chunk, error });
        callback();
      } else if (this.strict) {
        callback(error);
      } else {
        // 非严格模式,推送原始数据
        this.push({ 
          _parseError: error.message, 
          _rawData: chunk.toString() 
        });
        callback();
      }
    }
  }

  _flush(callback) {
    console.log(`📊 JSON解析完成: 成功=${this.successCount}, 错误=${this.errorCount}`);
    callback();
  }
}

// 数据验证转换流
class ValidationStream extends Transform {
  constructor(schema, options = {}) {
    super({ 
      objectMode: true,
      ...options 
    });
    
    this.schema = schema;
    this.validCount = 0;
    this.invalidCount = 0;
    this.onInvalid = options.onInvalid || 'error'; // 'error', 'skip', 'mark'
  }

  _transform(chunk, encoding, callback) {
    try {
      const validation = this.validateData(chunk);
      
      if (validation.valid) {
        this.validCount++;
        this.push(chunk);
        callback();
      } else {
        this.invalidCount++;
        this.handleInvalidData(chunk, validation, callback);
      }
    } catch (error) {
      callback(error);
    }
  }

  validateData(data) {
    const errors = [];
    
    for (const [field, rules] of Object.entries(this.schema)) {
      const value = data[field];
      
      if (rules.required && (value === undefined || value === null)) {
        errors.push(`字段 ${field} 是必需的`);
        continue;
      }
      
      if (value !== undefined && rules.type) {
        const actualType = Array.isArray(value) ? 'array' : typeof value;
        if (actualType !== rules.type) {
          errors.push(`字段 ${field} 类型错误: 期望 ${rules.type}, 实际 ${actualType}`);
        }
      }
      
      if (value !== undefined && rules.validate) {
        try {
          if (!rules.validate(value)) {
            errors.push(`字段 ${field} 验证失败`);
          }
        } catch (error) {
          errors.push(`字段 ${field} 验证函数错误: ${error.message}`);
        }
      }
    }
    
    return {
      valid: errors.length === 0,
      errors: errors
    };
  }

  handleInvalidData(data, validation, callback) {
    switch (this.onInvalid) {
      case 'error':
        callback(new Error(`数据验证失败: ${validation.errors.join(', ')}`));
        break;
        
      case 'skip':
        console.warn(`⚠️ 跳过无效数据: ${validation.errors.join(', ')}`);
        this.emit('invalidData', { data, errors: validation.errors });
        callback();
        break;
        
      case 'mark':
        const markedData = {
          ...data,
          _validation: {
            valid: false,
            errors: validation.errors
          }
        };
        this.push(markedData);
        callback();
        break;
        
      default:
        callback(new Error(`未知的无效数据处理方式: ${this.onInvalid}`));
    }
  }

  _flush(callback) {
    const total = this.validCount + this.invalidCount;
    const successRate = total > 0 ? (this.validCount / total * 100).toFixed(2) : '0';
    
    console.log(`📊 数据验证完成: 有效=${this.validCount}, 无效=${this.invalidCount}, 成功率=${successRate}%`);
    callback();
  }
}

// 数据格式化转换流
class FormatStream extends Transform {
  constructor(formatter, options = {}) {
    super({ 
      objectMode: true,
      ...options 
    });
    
    this.formatter = formatter;
    this.formatCount = 0;
  }

  _transform(chunk, encoding, callback) {
    try {
      const formatted = this.formatter(chunk);
      this.formatCount++;
      
      if (formatted !== undefined && formatted !== null) {
        this.push(formatted);
      }
      
      callback();
    } catch (error) {
      callback(error);
    }
  }

  _flush(callback) {
    console.log(`📊 数据格式化完成: ${this.formatCount} 项`);
    callback();
  }
}

// CSV转换流
class CSVTransformStream extends Transform {
  constructor(options = {}) {
    super({
      objectMode: true,
      ...options
    });
    
    this.headers = options.headers || null;
    this.delimiter = options.delimiter || ',';
    this.quote = options.quote || '"';
    this.escape = options.escape || '"';
    this.skipHeader = options.skipHeader === true;
    this.autoDetectHeaders = options.autoDetectHeaders !== false;
    
    this.isFirstRow = true;
    this.rowCount = 0;
  }

  _transform(chunk, encoding, callback) {
    try {
      const line = chunk.toString().trim();
      
      if (!line) {
        return callback();
      }
      
      const fields = this.parseCSVLine(line);
      
      if (this.isFirstRow) {
        this.isFirstRow = false;
        
        if (this.autoDetectHeaders && !this.headers) {
          this.headers = fields;
          
          if (this.skipHeader) {
            return callback();
          }
        }
      }
      
      let result;
      
      if (this.headers) {
        result = {};
        fields.forEach((field, index) => {
          if (this.headers[index]) {
            result[this.headers[index]] = field;
          }
        });
      } else {
        result = fields;
      }
      
      this.rowCount++;
      this.push(result);
      callback();
      
    } catch (error) {
      callback(error);
    }
  }

  parseCSVLine(line) {
    const fields = [];
    let currentField = '';
    let inQuotes = false;
    let i = 0;
    
    while (i < line.length) {
      const char = line[i];
      const nextChar = line[i + 1];
      
      if (char === this.quote) {
        if (inQuotes && nextChar === this.quote) {
          // 转义引号
          currentField += this.quote;
          i += 2;
        } else {
          // 开始或结束引号
          inQuotes = !inQuotes;
          i++;
        }
      } else if (char === this.delimiter && !inQuotes) {
        // 字段分隔符
        fields.push(currentField);
        currentField = '';
        i++;
      } else {
        currentField += char;
        i++;
      }
    }
    
    fields.push(currentField);
    return fields;
  }

  _flush(callback) {
    console.log(`📊 CSV转换完成: ${this.rowCount} 行`);
    callback();
  }
}

// 使用示例
async function demonstrateBasicTransforms() {
  console.log('🔄 基础转换流演示...\n');

  // 1. JSON解析流
  console.log('1. JSON解析转换:');
  const { Readable } = require('stream');
  
  const jsonData = [
    '{"name": "Alice", "age": 30}',
    '{"name": "Bob", "age": 25}',
    'invalid json',
    '{"name": "Charlie", "age": 35}'
  ];

  const jsonSource = Readable.from(jsonData);
  const jsonParser = new JSONParseStream({ skipErrors: true });

  jsonSource.pipe(jsonParser);

  jsonParser.on('data', (data) => {
    console.log('  解析结果:', data);
  });

  await new Promise(resolve => jsonParser.on('end', resolve));

  // 2. 数据验证流
  console.log('\n2. 数据验证转换:');
  const schema = {
    name: { required: true, type: 'string' },
    age: { required: true, type: 'number', validate: (age) => age > 0 && age < 150 },
    email: { type: 'string', validate: (email) => email.includes('@') }
  };

  const validationData = [
    { name: 'Alice', age: 30, email: 'alice@example.com' },
    { name: 'Bob', age: -5 }, // 无效年龄
    { age: 25 }, // 缺少name
    { name: 'Charlie', age: 35, email: 'invalid-email' }
  ];

  const validationSource = Readable.from(validationData);
  const validator = new ValidationStream(schema, { onInvalid: 'mark' });

  validationSource.pipe(validator);

  validator.on('data', (data) => {
    console.log('  验证结果:', data);
  });

  await new Promise(resolve => validator.on('end', resolve));

  // 3. 数据格式化流
  console.log('\n3. 数据格式化转换:');
  const formatter = (data) => {
    if (data.name && data.age) {
      return `${data.name} (${data.age}岁)`;
    }
    return null;
  };

  const formatData = [
    { name: 'Alice', age: 30 },
    { name: 'Bob', age: 25 },
    { invalid: 'data' }
  ];

  const formatSource = Readable.from(formatData);
  const formatStream = new FormatStream(formatter);

  formatSource.pipe(formatStream);

  formatStream.on('data', (data) => {
    console.log('  格式化结果:', data);
  });

  await new Promise(resolve => formatStream.on('end', resolve));
}

module.exports = {
  JSONParseStream,
  ValidationStream,
  FormatStream,
  CSVTransformStream,
  demonstrateBasicTransforms
};

高级转换流实现

javascript
// advanced-transform-streams.js
const { Transform } = require('stream');

// 聚合转换流
class AggregateStream extends Transform {
  constructor(options = {}) {
    super({ 
      objectMode: true,
      ...options 
    });
    
    this.windowSize = options.windowSize || 100;
    this.groupBy = options.groupBy || null;
    this.aggregators = options.aggregators || {};
    this.emitInterval = options.emitInterval || null;
    
    this.buffer = [];
    this.groups = new Map();
    this.processedCount = 0;
    
    if (this.emitInterval) {
      this.intervalTimer = setInterval(() => {
        this.emitAggregates();
      }, this.emitInterval);
    }
  }

  _transform(chunk, encoding, callback) {
    this.processedCount++;
    
    if (this.groupBy) {
      this.processGroupedData(chunk);
    } else {
      this.buffer.push(chunk);
      
      if (this.buffer.length >= this.windowSize) {
        this.processBuffer();
      }
    }
    
    callback();
  }

  processGroupedData(chunk) {
    const groupKey = typeof this.groupBy === 'function' 
      ? this.groupBy(chunk) 
      : chunk[this.groupBy];
    
    if (!this.groups.has(groupKey)) {
      this.groups.set(groupKey, []);
    }
    
    this.groups.get(groupKey).push(chunk);
    
    // 检查组大小
    if (this.groups.get(groupKey).length >= this.windowSize) {
      this.processGroup(groupKey);
    }
  }

  processGroup(groupKey) {
    const groupData = this.groups.get(groupKey);
    const aggregate = this.calculateAggregates(groupData);
    
    this.push({
      group: groupKey,
      count: groupData.length,
      ...aggregate,
      timestamp: Date.now()
    });
    
    this.groups.set(groupKey, []); // 清空组
  }

  processBuffer() {
    const aggregate = this.calculateAggregates(this.buffer);
    
    this.push({
      count: this.buffer.length,
      ...aggregate,
      timestamp: Date.now()
    });
    
    this.buffer = [];
  }

  calculateAggregates(data) {
    const result = {};
    
    for (const [name, aggregator] of Object.entries(this.aggregators)) {
      try {
        result[name] = aggregator(data);
      } catch (error) {
        console.error(`聚合器 ${name} 执行失败:`, error);
        result[name] = null;
      }
    }
    
    return result;
  }

  emitAggregates() {
    // 发射所有待处理的聚合
    if (this.buffer.length > 0) {
      this.processBuffer();
    }
    
    for (const groupKey of this.groups.keys()) {
      if (this.groups.get(groupKey).length > 0) {
        this.processGroup(groupKey);
      }
    }
  }

  _flush(callback) {
    if (this.intervalTimer) {
      clearInterval(this.intervalTimer);
    }
    
    // 处理剩余数据
    this.emitAggregates();
    
    console.log(`📊 聚合转换完成: 处理 ${this.processedCount} 项`);
    callback();
  }
}

// 批量处理转换流
class BatchTransformStream extends Transform {
  constructor(processor, options = {}) {
    super({ 
      objectMode: true,
      ...options 
    });
    
    this.processor = processor;
    this.batchSize = options.batchSize || 10;
    this.batchTimeout = options.batchTimeout || 1000;
    this.parallel = options.parallel || false;
    this.maxConcurrency = options.maxConcurrency || 5;
    
    this.batch = [];
    this.batchTimer = null;
    this.activeBatches = 0;
    this.processedBatches = 0;
  }

  _transform(chunk, encoding, callback) {
    this.batch.push(chunk);
    
    if (this.batch.length >= this.batchSize) {
      this.processBatch();
    } else if (!this.batchTimer) {
      this.batchTimer = setTimeout(() => {
        this.processBatch();
      }, this.batchTimeout);
    }
    
    callback();
  }

  async processBatch() {
    if (this.batchTimer) {
      clearTimeout(this.batchTimer);
      this.batchTimer = null;
    }
    
    if (this.batch.length === 0) {
      return;
    }
    
    const currentBatch = this.batch.splice(0);
    
    if (this.parallel) {
      this.processParallelBatch(currentBatch);
    } else {
      await this.processSequentialBatch(currentBatch);
    }
  }

  async processParallelBatch(batch) {
    if (this.activeBatches >= this.maxConcurrency) {
      // 等待有空闲槽位
      await new Promise(resolve => {
        const checkSlot = () => {
          if (this.activeBatches < this.maxConcurrency) {
            resolve();
          } else {
            setTimeout(checkSlot, 10);
          }
        };
        checkSlot();
      });
    }
    
    this.activeBatches++;
    
    try {
      const results = await this.processor(batch);
      this.emitResults(results);
    } catch (error) {
      console.error('并行批处理失败:', error);
      this.emit('error', error);
    } finally {
      this.activeBatches--;
      this.processedBatches++;
    }
  }

  async processSequentialBatch(batch) {
    try {
      const results = await this.processor(batch);
      this.emitResults(results);
      this.processedBatches++;
    } catch (error) {
      console.error('顺序批处理失败:', error);
      this.emit('error', error);
    }
  }

  emitResults(results) {
    if (Array.isArray(results)) {
      results.forEach(result => this.push(result));
    } else if (results !== undefined) {
      this.push(results);
    }
  }

  async _flush(callback) {
    // 处理剩余批次
    await this.processBatch();
    
    // 等待所有并行批次完成
    while (this.activeBatches > 0) {
      await new Promise(resolve => setTimeout(resolve, 10));
    }
    
    console.log(`📊 批量处理完成: ${this.processedBatches} 个批次`);
    callback();
  }
}

// 流水线转换流
class PipelineTransformStream extends Transform {
  constructor(transformers, options = {}) {
    super({ 
      objectMode: true,
      ...options 
    });
    
    this.transformers = transformers;
    this.processedCount = 0;
    this.errorCount = 0;
    this.continueOnError = options.continueOnError === true;
  }

  _transform(chunk, encoding, callback) {
    this.processChunk(chunk, 0, callback);
  }

  async processChunk(data, transformerIndex, callback) {
    if (transformerIndex >= this.transformers.length) {
      // 所有转换器处理完成
      this.processedCount++;
      this.push(data);
      return callback();
    }
    
    try {
      const transformer = this.transformers[transformerIndex];
      const result = await this.applyTransformer(transformer, data);
      
      if (result !== undefined && result !== null) {
        this.processChunk(result, transformerIndex + 1, callback);
      } else {
        // 数据被过滤掉
        callback();
      }
    } catch (error) {
      this.errorCount++;
      
      if (this.continueOnError) {
        console.warn(`⚠️ 转换器 ${transformerIndex} 处理失败:`, error.message);
        this.emit('transformError', { data, transformerIndex, error });
        callback();
      } else {
        callback(error);
      }
    }
  }

  async applyTransformer(transformer, data) {
    if (typeof transformer === 'function') {
      return transformer(data);
    } else if (transformer && typeof transformer.transform === 'function') {
      return transformer.transform(data);
    } else {
      throw new Error('无效的转换器');
    }
  }

  _flush(callback) {
    const successRate = this.processedCount + this.errorCount > 0 
      ? (this.processedCount / (this.processedCount + this.errorCount) * 100).toFixed(2)
      : '0';
    
    console.log(`📊 流水线处理完成: 成功=${this.processedCount}, 错误=${this.errorCount}, 成功率=${successRate}%`);
    callback();
  }
}

// 缓存转换流
class CachedTransformStream extends Transform {
  constructor(transformer, options = {}) {
    super({ 
      objectMode: true,
      ...options 
    });
    
    this.transformer = transformer;
    this.cache = new Map();
    this.maxCacheSize = options.maxCacheSize || 1000;
    this.ttl = options.ttl || 300000; // 5分钟
    this.keyGenerator = options.keyGenerator || JSON.stringify;
    
    this.cacheHits = 0;
    this.cacheMisses = 0;
    
    // 定期清理过期缓存
    this.cleanupInterval = setInterval(() => {
      this.cleanupExpiredCache();
    }, 60000); // 每分钟清理一次
  }

  _transform(chunk, encoding, callback) {
    const cacheKey = this.keyGenerator(chunk);
    const cached = this.cache.get(cacheKey);
    
    if (cached && Date.now() - cached.timestamp < this.ttl) {
      // 缓存命中
      this.cacheHits++;
      this.push(cached.result);
      callback();
    } else {
      // 缓存未命中,执行转换
      this.cacheMisses++;
      this.executeTransform(chunk, cacheKey, callback);
    }
  }

  async executeTransform(chunk, cacheKey, callback) {
    try {
      const result = await this.transformer(chunk);
      
      // 存储到缓存
      this.storeInCache(cacheKey, result);
      
      if (result !== undefined && result !== null) {
        this.push(result);
      }
      
      callback();
    } catch (error) {
      callback(error);
    }
  }

  storeInCache(key, result) {
    // LRU清理
    if (this.cache.size >= this.maxCacheSize) {
      const firstKey = this.cache.keys().next().value;
      this.cache.delete(firstKey);
    }
    
    this.cache.set(key, {
      result: result,
      timestamp: Date.now()
    });
  }

  cleanupExpiredCache() {
    const now = Date.now();
    const expiredKeys = [];
    
    for (const [key, value] of this.cache) {
      if (now - value.timestamp > this.ttl) {
        expiredKeys.push(key);
      }
    }
    
    expiredKeys.forEach(key => this.cache.delete(key));
    
    if (expiredKeys.length > 0) {
      console.log(`🧹 清理过期缓存: ${expiredKeys.length} 项`);
    }
  }

  _destroy(error, callback) {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
    }
    callback(error);
  }

  _flush(callback) {
    const totalRequests = this.cacheHits + this.cacheMisses;
    const hitRate = totalRequests > 0 ? (this.cacheHits / totalRequests * 100).toFixed(2) : '0';
    
    console.log(`📊 缓存转换完成: 命中=${this.cacheHits}, 未命中=${this.cacheMisses}, 命中率=${hitRate}%`);
    callback();
  }

  getCacheStats() {
    return {
      size: this.cache.size,
      maxSize: this.maxCacheSize,
      hits: this.cacheHits,
      misses: this.cacheMisses,
      hitRate: this.cacheHits + this.cacheMisses > 0 
        ? (this.cacheHits / (this.cacheHits + this.cacheMisses) * 100).toFixed(2) + '%'
        : '0%'
    };
  }
}

module.exports = {
  AggregateStream,
  BatchTransformStream,
  PipelineTransformStream,
  CachedTransformStream
};

🔧 实用转换流工具

通用转换工具集

javascript
// transform-utilities.js
const { Transform, pipeline } = require('stream');

// 转换流工厂
class TransformFactory {
  // 创建映射转换流
  static map(mapper) {
    return new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        try {
          const result = mapper(chunk);
          if (result !== undefined) {
            this.push(result);
          }
          callback();
        } catch (error) {
          callback(error);
        }
      }
    });
  }

  // 创建过滤转换流
  static filter(predicate) {
    return new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        try {
          if (predicate(chunk)) {
            this.push(chunk);
          }
          callback();
        } catch (error) {
          callback(error);
        }
      }
    });
  }

  // 创建归约转换流
  static reduce(reducer, initialValue) {
    let accumulator = initialValue;
    
    return new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        try {
          accumulator = reducer(accumulator, chunk);
          callback();
        } catch (error) {
          callback(error);
        }
      },
      flush(callback) {
        this.push(accumulator);
        callback();
      }
    });
  }

  // 创建去重转换流
  static unique(keySelector) {
    const seen = new Set();
    
    return new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        try {
          const key = keySelector ? keySelector(chunk) : chunk;
          
          if (!seen.has(key)) {
            seen.add(key);
            this.push(chunk);
          }
          
          callback();
        } catch (error) {
          callback(error);
        }
      }
    });
  }

  // 创建排序转换流
  static sort(compareFn) {
    const items = [];
    
    return new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        items.push(chunk);
        callback();
      },
      flush(callback) {
        try {
          const sorted = items.sort(compareFn);
          sorted.forEach(item => this.push(item));
          callback();
        } catch (error) {
          callback(error);
        }
      }
    });
  }

  // 创建限制转换流
  static take(count) {
    let taken = 0;
    
    return new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        if (taken < count) {
          this.push(chunk);
          taken++;
        }
        
        if (taken >= count) {
          this.push(null); // 结束流
        }
        
        callback();
      }
    });
  }

  // 创建跳过转换流
  static skip(count) {
    let skipped = 0;
    
    return new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        if (skipped < count) {
          skipped++;
        } else {
          this.push(chunk);
        }
        callback();
      }
    });
  }
}

// 流水线构建器
class StreamPipelineBuilder {
  constructor() {
    this.transforms = [];
  }

  map(mapper) {
    this.transforms.push(TransformFactory.map(mapper));
    return this;
  }

  filter(predicate) {
    this.transforms.push(TransformFactory.filter(predicate));
    return this;
  }

  unique(keySelector) {
    this.transforms.push(TransformFactory.unique(keySelector));
    return this;
  }

  sort(compareFn) {
    this.transforms.push(TransformFactory.sort(compareFn));
    return this;
  }

  take(count) {
    this.transforms.push(TransformFactory.take(count));
    return this;
  }

  skip(count) {
    this.transforms.push(TransformFactory.skip(count));
    return this;
  }

  custom(transform) {
    this.transforms.push(transform);
    return this;
  }

  build() {
    return this.transforms;
  }

  // 执行流水线
  async execute(source, destination) {
    return new Promise((resolve, reject) => {
      const streams = [source, ...this.transforms];
      
      if (destination) {
        streams.push(destination);
      }

      pipeline(...streams, (error) => {
        if (error) {
          reject(error);
        } else {
          resolve();
        }
      });
    });
  }
}

// 使用示例
async function demonstrateTransformUtilities() {
  console.log('🔧 转换流工具演示...\n');

  const { Readable } = require('stream');

  // 示例数据
  const data = [
    { id: 1, name: 'Alice', age: 30, city: 'New York' },
    { id: 2, name: 'Bob', age: 25, city: 'London' },
    { id: 3, name: 'Alice', age: 35, city: 'Paris' },
    { id: 4, name: 'Charlie', age: 40, city: 'Tokyo' },
    { id: 5, name: 'David', age: 28, city: 'New York' },
    { id: 6, name: 'Eve', age: 32, city: 'London' }
  ];

  const source = Readable.from(data);

  // 构建流水线
  const pipeline = new StreamPipelineBuilder()
    .filter(person => person.age > 25)  // 过滤年龄大于25的
    .map(person => ({                   // 转换数据格式
      ...person,
      ageGroup: person.age < 30 ? 'young' : 'adult',
      displayName: `${person.name} (${person.age})`
    }))
    .unique(person => person.name)      // 按姓名去重
    .sort((a, b) => b.age - a.age)      // 按年龄降序排序
    .take(3);                           // 只取前3个

  // 收集结果
  const results = [];
  const collector = new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      results.push(chunk);
      callback();
    }
  });

  pipeline.custom(collector);

  // 执行流水线
  await pipeline.execute(source);

  console.log('流水线处理结果:');
  results.forEach((result, index) => {
    console.log(`${index + 1}. ${JSON.stringify(result)}`);
  });
}

module.exports = {
  TransformFactory,
  StreamPipelineBuilder,
  demonstrateTransformUtilities
};

转换流是Node.js流处理中最灵活和强大的组件,通过合理的设计和组合可以构建出高效的数据处理流水线!