Skip to content

子进程管理

🎯 学习目标

  • 深入理解Node.js子进程的创建和管理机制
  • 掌握不同子进程API的使用场景和最佳实践
  • 学会进程间通信和数据传输技术
  • 了解子进程的监控、调度和资源管理

📚 核心概念

子进程基础

javascript
// 子进程核心概念
const childProcessConcepts = {
  types: {
    spawn: {
      description: '启动新进程执行命令',
      characteristics: ['流式I/O', '实时输出', '长时间运行'],
      use_cases: ['执行系统命令', '启动服务', '实时数据处理']
    },
    exec: {
      description: '在shell中执行命令',
      characteristics: ['缓冲输出', '简单接口', 'shell特性'],
      use_cases: ['简单命令执行', 'shell脚本', '管道操作']
    },
    execFile: {
      description: '直接执行文件',
      characteristics: ['不通过shell', '更安全', '更快'],
      use_cases: ['执行可执行文件', '安全执行', '性能敏感场景']
    },
    fork: {
      description: '创建Node.js子进程',
      characteristics: ['IPC通道', 'Node.js环境', '模块加载'],
      use_cases: ['CPU密集任务', '并行处理', '微服务架构']
    }
  },
  communication: {
    stdio: 'stdin/stdout/stderr流通信',
    ipc: '进程间通信通道',
    signals: '信号通信',
    files: '文件系统通信',
    network: '网络通信'
  },
  lifecycle: {
    creation: '进程创建',
    execution: '进程执行',
    communication: '进程通信',
    monitoring: '进程监控',
    termination: '进程终止'
  }
};

console.log('子进程概念:', childProcessConcepts);

🚀 基础子进程操作

子进程创建和管理

javascript
// basic-child-process.js
const { spawn, exec, execFile, fork } = require('child_process');
const path = require('path');
const fs = require('fs');

class ChildProcessManager {
  constructor() {
    this.processes = new Map();
    this.processId = 0;
  }

  // 使用spawn创建子进程
  createSpawnProcess(command, args = [], options = {}) {
    const processId = ++this.processId;
    console.log(`🚀 创建spawn进程 ${processId}: ${command} ${args.join(' ')}`);

    const child = spawn(command, args, {
      stdio: ['pipe', 'pipe', 'pipe'],
      ...options
    });

    const processInfo = {
      id: processId,
      type: 'spawn',
      command: command,
      args: args,
      child: child,
      startTime: Date.now(),
      status: 'running',
      output: [],
      errors: []
    };

    // 监听输出
    child.stdout.on('data', (data) => {
      const output = data.toString();
      processInfo.output.push(output);
      console.log(`📤 进程 ${processId} 输出: ${output.trim()}`);
    });

    child.stderr.on('data', (data) => {
      const error = data.toString();
      processInfo.errors.push(error);
      console.error(`❌ 进程 ${processId} 错误: ${error.trim()}`);
    });

    // 监听进程事件
    child.on('close', (code, signal) => {
      processInfo.status = 'closed';
      processInfo.exitCode = code;
      processInfo.signal = signal;
      processInfo.endTime = Date.now();
      
      console.log(`🏁 进程 ${processId} 关闭: 代码=${code}, 信号=${signal}`);
    });

    child.on('error', (error) => {
      processInfo.status = 'error';
      processInfo.error = error.message;
      console.error(`💥 进程 ${processId} 启动失败:`, error.message);
    });

    this.processes.set(processId, processInfo);
    return { processId, child, processInfo };
  }

  // 使用exec执行命令
  async executeCommand(command, options = {}) {
    const processId = ++this.processId;
    console.log(`⚡ 执行命令 ${processId}: ${command}`);

    return new Promise((resolve, reject) => {
      const startTime = Date.now();
      
      const child = exec(command, {
        timeout: 30000,
        maxBuffer: 1024 * 1024, // 1MB
        ...options
      }, (error, stdout, stderr) => {
        const endTime = Date.now();
        const duration = endTime - startTime;

        const result = {
          processId: processId,
          command: command,
          duration: duration,
          stdout: stdout,
          stderr: stderr,
          error: error
        };

        if (error) {
          console.error(`❌ 命令 ${processId} 执行失败 (${duration}ms):`, error.message);
          reject(result);
        } else {
          console.log(`✅ 命令 ${processId} 执行成功 (${duration}ms)`);
          resolve(result);
        }
      });

      const processInfo = {
        id: processId,
        type: 'exec',
        command: command,
        child: child,
        startTime: startTime,
        status: 'running'
      };

      this.processes.set(processId, processInfo);
    });
  }

  // 使用execFile执行文件
  async executeFile(file, args = [], options = {}) {
    const processId = ++this.processId;
    console.log(`📁 执行文件 ${processId}: ${file} ${args.join(' ')}`);

    return new Promise((resolve, reject) => {
      const startTime = Date.now();
      
      const child = execFile(file, args, {
        timeout: 30000,
        maxBuffer: 1024 * 1024,
        ...options
      }, (error, stdout, stderr) => {
        const endTime = Date.now();
        const duration = endTime - startTime;

        const result = {
          processId: processId,
          file: file,
          args: args,
          duration: duration,
          stdout: stdout,
          stderr: stderr,
          error: error
        };

        if (error) {
          console.error(`❌ 文件 ${processId} 执行失败 (${duration}ms):`, error.message);
          reject(result);
        } else {
          console.log(`✅ 文件 ${processId} 执行成功 (${duration}ms)`);
          resolve(result);
        }
      });

      const processInfo = {
        id: processId,
        type: 'execFile',
        file: file,
        args: args,
        child: child,
        startTime: startTime,
        status: 'running'
      };

      this.processes.set(processId, processInfo);
    });
  }

  // 使用fork创建Node.js子进程
  createNodeProcess(modulePath, args = [], options = {}) {
    const processId = ++this.processId;
    console.log(`🍴 创建Node进程 ${processId}: ${modulePath}`);

    const child = fork(modulePath, args, {
      silent: false,
      ...options
    });

    const processInfo = {
      id: processId,
      type: 'fork',
      modulePath: modulePath,
      args: args,
      child: child,
      startTime: Date.now(),
      status: 'running',
      messages: []
    };

    // 监听消息
    child.on('message', (message) => {
      processInfo.messages.push({
        timestamp: Date.now(),
        message: message
      });
      console.log(`💬 进程 ${processId} 消息:`, message);
    });

    // 监听进程事件
    child.on('close', (code, signal) => {
      processInfo.status = 'closed';
      processInfo.exitCode = code;
      processInfo.signal = signal;
      processInfo.endTime = Date.now();
      
      console.log(`🏁 Node进程 ${processId} 关闭: 代码=${code}, 信号=${signal}`);
    });

    child.on('error', (error) => {
      processInfo.status = 'error';
      processInfo.error = error.message;
      console.error(`💥 Node进程 ${processId} 启动失败:`, error.message);
    });

    this.processes.set(processId, processInfo);
    return { processId, child, processInfo };
  }

  // 发送消息给Node子进程
  sendMessage(processId, message) {
    const processInfo = this.processes.get(processId);
    
    if (!processInfo) {
      throw new Error(`进程 ${processId} 不存在`);
    }

    if (processInfo.type !== 'fork') {
      throw new Error(`进程 ${processId} 不是Node子进程,无法发送消息`);
    }

    if (processInfo.status !== 'running') {
      throw new Error(`进程 ${processId} 未运行`);
    }

    processInfo.child.send(message);
    console.log(`📨 向进程 ${processId} 发送消息:`, message);
  }

  // 终止进程
  terminateProcess(processId, signal = 'SIGTERM') {
    const processInfo = this.processes.get(processId);
    
    if (!processInfo) {
      throw new Error(`进程 ${processId} 不存在`);
    }

    if (processInfo.status !== 'running') {
      console.warn(`进程 ${processId} 已经停止`);
      return;
    }

    console.log(`🛑 终止进程 ${processId} (信号: ${signal})`);
    processInfo.child.kill(signal);
    
    // 设置超时强制杀死
    setTimeout(() => {
      if (processInfo.status === 'running') {
        console.log(`💀 强制杀死进程 ${processId}`);
        processInfo.child.kill('SIGKILL');
      }
    }, 5000);
  }

  // 获取进程信息
  getProcessInfo(processId) {
    return this.processes.get(processId);
  }

  // 获取所有进程信息
  getAllProcesses() {
    return Array.from(this.processes.values());
  }

  // 获取运行中的进程
  getRunningProcesses() {
    return Array.from(this.processes.values()).filter(p => p.status === 'running');
  }

  // 清理已结束的进程
  cleanup() {
    const toRemove = [];
    
    for (const [id, processInfo] of this.processes) {
      if (processInfo.status === 'closed' || processInfo.status === 'error') {
        toRemove.push(id);
      }
    }
    
    for (const id of toRemove) {
      this.processes.delete(id);
    }
    
    console.log(`🧹 清理了 ${toRemove.length} 个已结束的进程`);
  }

  // 终止所有进程
  terminateAll() {
    const runningProcesses = this.getRunningProcesses();
    
    console.log(`🛑 终止所有进程 (${runningProcesses.length} 个)`);
    
    for (const processInfo of runningProcesses) {
      this.terminateProcess(processInfo.id);
    }
  }
}

// 使用示例
async function demonstrateBasicChildProcess() {
  console.log('🚀 基础子进程操作演示...\n');

  const manager = new ChildProcessManager();

  try {
    // 1. 使用spawn执行长时间运行的命令
    console.log('1. Spawn进程 - 执行ping命令:');
    const { processId: pingId } = manager.createSpawnProcess('ping', ['-c', '3', 'localhost']);
    
    // 等待ping完成
    await new Promise(resolve => {
      const checkPing = () => {
        const info = manager.getProcessInfo(pingId);
        if (info.status !== 'running') {
          resolve();
        } else {
          setTimeout(checkPing, 100);
        }
      };
      checkPing();
    });

    // 2. 使用exec执行简单命令
    console.log('\n2. Exec命令 - 获取系统信息:');
    try {
      const result = await manager.executeCommand('node --version');
      console.log('  Node.js版本:', result.stdout.trim());
    } catch (error) {
      console.error('  执行失败:', error.stderr);
    }

    // 3. 使用execFile执行文件
    console.log('\n3. ExecFile - 执行Node脚本:');
    try {
      const result = await manager.executeFile('node', ['-e', 'console.log("Hello from execFile!")']);
      console.log('  输出:', result.stdout.trim());
    } catch (error) {
      console.error('  执行失败:', error.stderr);
    }

    // 4. 创建简单的worker脚本并fork
    console.log('\n4. Fork进程 - 创建Node子进程:');
    
    // 创建临时worker文件
    const workerScript = `
      console.log('Worker进程启动, PID:', process.pid);
      
      process.on('message', (message) => {
        console.log('Worker收到消息:', message);
        
        if (message.type === 'calculate') {
          const result = message.a + message.b;
          process.send({ type: 'result', result: result });
        } else if (message.type === 'exit') {
          process.exit(0);
        }
      });
      
      // 发送就绪消息
      process.send({ type: 'ready', pid: process.pid });
    `;
    
    const workerPath = path.join(__dirname, 'temp-worker.js');
    fs.writeFileSync(workerPath, workerScript);

    const { processId: workerId, child: workerChild } = manager.createNodeProcess(workerPath);
    
    // 等待worker就绪
    await new Promise(resolve => {
      const messageHandler = (message) => {
        if (message.type === 'ready') {
          workerChild.removeListener('message', messageHandler);
          resolve();
        }
      };
      workerChild.on('message', messageHandler);
    });

    // 发送计算任务
    manager.sendMessage(workerId, { type: 'calculate', a: 10, b: 20 });
    
    // 等待结果
    await new Promise(resolve => {
      const messageHandler = (message) => {
        if (message.type === 'result') {
          console.log('  计算结果:', message.result);
          workerChild.removeListener('message', messageHandler);
          resolve();
        }
      };
      workerChild.on('message', messageHandler);
    });

    // 终止worker
    manager.sendMessage(workerId, { type: 'exit' });
    
    // 清理临时文件
    fs.unlinkSync(workerPath);

    // 5. 显示进程统计
    console.log('\n5. 进程统计:');
    const allProcesses = manager.getAllProcesses();
    console.log(`  总进程数: ${allProcesses.length}`);
    
    allProcesses.forEach(proc => {
      const duration = proc.endTime ? proc.endTime - proc.startTime : Date.now() - proc.startTime;
      console.log(`  进程 ${proc.id} (${proc.type}): ${proc.status}, 运行时间: ${duration}ms`);
    });

  } catch (error) {
    console.error('演示过程中发生错误:', error);
  } finally {
    // 清理所有进程
    manager.terminateAll();
    
    // 等待清理完成
    setTimeout(() => {
      manager.cleanup();
    }, 1000);
  }
}

module.exports = {
  ChildProcessManager,
  demonstrateBasicChildProcess
};

高级子进程管理

javascript
// advanced-child-process.js
const { spawn, fork } = require('child_process');
const EventEmitter = require('events');
const path = require('path');
const fs = require('fs');

// 进程池管理器
class ProcessPool extends EventEmitter {
  constructor(options = {}) {
    super();
    
    this.options = {
      min: options.min || 2,
      max: options.max || 10,
      idleTimeout: options.idleTimeout || 300000, // 5分钟
      maxTasks: options.maxTasks || 1000,
      workerScript: options.workerScript,
      ...options
    };
    
    this.workers = [];
    this.availableWorkers = [];
    this.busyWorkers = new Set();
    this.taskQueue = [];
    this.totalTasks = 0;
    this.completedTasks = 0;
    this.failedTasks = 0;
    
    this.initialize();
  }

  // 初始化进程池
  async initialize() {
    console.log(`🏊 初始化进程池 (${this.options.min}-${this.options.max} 个进程)`);
    
    // 创建最小数量的worker
    for (let i = 0; i < this.options.min; i++) {
      await this.createWorker();
    }
    
    // 启动任务处理
    this.startTaskProcessing();
    
    console.log(`✅ 进程池初始化完成,${this.workers.length} 个进程就绪`);
  }

  // 创建worker进程
  async createWorker() {
    return new Promise((resolve, reject) => {
      const worker = fork(this.options.workerScript, [], {
        silent: true
      });

      const workerInfo = {
        id: worker.pid,
        process: worker,
        busy: false,
        tasks: 0,
        errors: 0,
        startTime: Date.now(),
        lastUsed: Date.now(),
        idleTimer: null
      };

      // 监听worker消息
      worker.on('message', (message) => {
        this.handleWorkerMessage(workerInfo, message);
      });

      // 监听worker错误
      worker.on('error', (error) => {
        console.error(`❌ Worker ${workerInfo.id} 错误:`, error);
        workerInfo.errors++;
        this.handleWorkerError(workerInfo, error);
      });

      // 监听worker退出
      worker.on('exit', (code, signal) => {
        console.log(`🚪 Worker ${workerInfo.id} 退出: 代码=${code}, 信号=${signal}`);
        this.handleWorkerExit(workerInfo);
      });

      // 等待worker就绪
      const readyHandler = (message) => {
        if (message.type === 'ready') {
          worker.removeListener('message', readyHandler);
          
          this.workers.push(workerInfo);
          this.availableWorkers.push(workerInfo);
          this.setupWorkerIdleTimeout(workerInfo);
          
          console.log(`🔧 Worker ${workerInfo.id} 创建成功`);
          resolve(workerInfo);
        }
      };

      worker.on('message', readyHandler);
      
      // 设置创建超时
      setTimeout(() => {
        worker.removeListener('message', readyHandler);
        reject(new Error('Worker创建超时'));
      }, 10000);
    });
  }

  // 处理worker消息
  handleWorkerMessage(workerInfo, message) {
    switch (message.type) {
      case 'ready':
        // Worker就绪消息在createWorker中处理
        break;
        
      case 'taskComplete':
        this.handleTaskComplete(workerInfo, message);
        break;
        
      case 'taskError':
        this.handleTaskError(workerInfo, message);
        break;
        
      case 'status':
        this.handleWorkerStatus(workerInfo, message);
        break;
        
      default:
        console.log(`📨 Worker ${workerInfo.id} 消息:`, message);
    }
  }

  // 处理任务完成
  handleTaskComplete(workerInfo, message) {
    const { taskId, result, duration } = message;
    
    workerInfo.tasks++;
    this.completedTasks++;
    
    // 找到对应的任务
    const taskIndex = this.taskQueue.findIndex(task => task.id === taskId);
    if (taskIndex > -1) {
      const task = this.taskQueue.splice(taskIndex, 1)[0];
      task.resolve(result);
    }
    
    // 释放worker
    this.releaseWorker(workerInfo);
    
    console.log(`✅ 任务 ${taskId} 完成,耗时 ${duration}ms`);
  }

  // 处理任务错误
  handleTaskError(workerInfo, message) {
    const { taskId, error } = message;
    
    workerInfo.errors++;
    this.failedTasks++;
    
    // 找到对应的任务
    const taskIndex = this.taskQueue.findIndex(task => task.id === taskId);
    if (taskIndex > -1) {
      const task = this.taskQueue.splice(taskIndex, 1)[0];
      task.reject(new Error(error));
    }
    
    // 释放worker
    this.releaseWorker(workerInfo);
    
    console.error(`❌ 任务 ${taskId} 失败:`, error);
  }

  // 处理worker状态
  handleWorkerStatus(workerInfo, message) {
    console.log(`📊 Worker ${workerInfo.id} 状态:`, message.status);
  }

  // 处理worker错误
  handleWorkerError(workerInfo, error) {
    // 如果错误过多,重启worker
    if (workerInfo.errors > 5) {
      console.log(`🔄 重启错误频繁的Worker ${workerInfo.id}`);
      this.restartWorker(workerInfo);
    }
  }

  // 处理worker退出
  handleWorkerExit(workerInfo) {
    // 从各个集合中移除worker
    const workerIndex = this.workers.indexOf(workerInfo);
    if (workerIndex > -1) {
      this.workers.splice(workerIndex, 1);
    }
    
    const availableIndex = this.availableWorkers.indexOf(workerInfo);
    if (availableIndex > -1) {
      this.availableWorkers.splice(availableIndex, 1);
    }
    
    this.busyWorkers.delete(workerInfo);
    
    // 如果进程数低于最小值,创建新的worker
    if (this.workers.length < this.options.min) {
      console.log('🔄 创建新worker以维持最小数量');
      this.createWorker().catch(error => {
        console.error('创建替换worker失败:', error);
      });
    }
  }

  // 重启worker
  async restartWorker(workerInfo) {
    // 终止旧worker
    workerInfo.process.kill();
    
    // 创建新worker
    try {
      await this.createWorker();
    } catch (error) {
      console.error('重启worker失败:', error);
    }
  }

  // 设置worker空闲超时
  setupWorkerIdleTimeout(workerInfo) {
    const resetIdleTimeout = () => {
      if (workerInfo.idleTimer) {
        clearTimeout(workerInfo.idleTimer);
      }
      
      workerInfo.idleTimer = setTimeout(() => {
        if (!workerInfo.busy && this.workers.length > this.options.min) {
          console.log(`⏰ 终止空闲Worker ${workerInfo.id}`);
          workerInfo.process.kill();
        }
      }, this.options.idleTimeout);
    };
    
    resetIdleTimeout();
    
    // 每次使用后重置超时
    workerInfo.resetIdleTimeout = resetIdleTimeout;
  }

  // 获取可用worker
  getAvailableWorker() {
    if (this.availableWorkers.length > 0) {
      return this.availableWorkers.pop();
    }
    
    // 如果没有可用worker且未达到最大数量,创建新worker
    if (this.workers.length < this.options.max) {
      return this.createWorker();
    }
    
    return null;
  }

  // 释放worker
  releaseWorker(workerInfo) {
    workerInfo.busy = false;
    workerInfo.lastUsed = Date.now();
    
    this.busyWorkers.delete(workerInfo);
    this.availableWorkers.push(workerInfo);
    
    if (workerInfo.resetIdleTimeout) {
      workerInfo.resetIdleTimeout();
    }
    
    // 处理等待的任务
    this.processTaskQueue();
  }

  // 启动任务处理
  startTaskProcessing() {
    setInterval(() => {
      this.processTaskQueue();
    }, 100);
  }

  // 处理任务队列
  async processTaskQueue() {
    while (this.taskQueue.length > 0) {
      const worker = this.getAvailableWorker();
      
      if (!worker) {
        break; // 没有可用worker
      }
      
      // 如果worker是Promise(新创建的),等待创建完成
      const workerInfo = await Promise.resolve(worker);
      
      const task = this.taskQueue.find(t => !t.assigned);
      if (!task) {
        // 没有未分配的任务,将worker放回
        this.availableWorkers.push(workerInfo);
        break;
      }
      
      // 分配任务
      task.assigned = true;
      workerInfo.busy = true;
      this.busyWorkers.add(workerInfo);
      
      // 从可用列表中移除
      const index = this.availableWorkers.indexOf(workerInfo);
      if (index > -1) {
        this.availableWorkers.splice(index, 1);
      }
      
      // 发送任务给worker
      workerInfo.process.send({
        type: 'task',
        taskId: task.id,
        data: task.data
      });
      
      console.log(`📋 任务 ${task.id} 分配给Worker ${workerInfo.id}`);
    }
  }

  // 提交任务
  submitTask(data) {
    return new Promise((resolve, reject) => {
      const taskId = `task_${++this.totalTasks}`;
      
      const task = {
        id: taskId,
        data: data,
        resolve: resolve,
        reject: reject,
        assigned: false,
        createdAt: Date.now()
      };
      
      this.taskQueue.push(task);
      
      console.log(`📝 提交任务 ${taskId}`);
      
      // 立即尝试处理
      setImmediate(() => this.processTaskQueue());
    });
  }

  // 获取进程池状态
  getStatus() {
    return {
      workers: {
        total: this.workers.length,
        available: this.availableWorkers.length,
        busy: this.busyWorkers.size
      },
      tasks: {
        total: this.totalTasks,
        completed: this.completedTasks,
        failed: this.failedTasks,
        queued: this.taskQueue.length,
        pending: this.taskQueue.filter(t => !t.assigned).length
      },
      performance: {
        successRate: this.totalTasks > 0 
          ? (this.completedTasks / this.totalTasks * 100).toFixed(2) + '%'
          : '0%'
      }
    };
  }

  // 关闭进程池
  async shutdown() {
    console.log('🔄 关闭进程池...');
    
    // 停止接受新任务
    this.taskQueue = [];
    
    // 等待正在执行的任务完成
    while (this.busyWorkers.size > 0) {
      console.log(`⏳ 等待 ${this.busyWorkers.size} 个任务完成...`);
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
    
    // 终止所有worker
    for (const workerInfo of this.workers) {
      if (workerInfo.idleTimer) {
        clearTimeout(workerInfo.idleTimer);
      }
      workerInfo.process.kill();
    }
    
    console.log('✅ 进程池已关闭');
  }
}

// 任务调度器
class TaskScheduler extends EventEmitter {
  constructor(processPool) {
    super();
    this.processPool = processPool;
    this.scheduledTasks = new Map();
    this.cronJobs = new Map();
    this.taskId = 0;
  }

  // 调度一次性任务
  schedule(delay, taskData) {
    const taskId = `scheduled_${++this.taskId}`;
    
    const timeout = setTimeout(async () => {
      try {
        console.log(`⏰ 执行调度任务 ${taskId}`);
        const result = await this.processPool.submitTask(taskData);
        this.emit('taskComplete', { taskId, result });
      } catch (error) {
        this.emit('taskError', { taskId, error });
      } finally {
        this.scheduledTasks.delete(taskId);
      }
    }, delay);
    
    this.scheduledTasks.set(taskId, {
      id: taskId,
      timeout: timeout,
      taskData: taskData,
      scheduledAt: Date.now(),
      executeAt: Date.now() + delay
    });
    
    console.log(`📅 调度任务 ${taskId},${delay}ms后执行`);
    return taskId;
  }

  // 调度周期性任务
  scheduleCron(interval, taskData, options = {}) {
    const jobId = `cron_${++this.taskId}`;
    const { maxExecutions = Infinity, startDelay = 0 } = options;
    
    let executions = 0;
    
    const executeJob = async () => {
      if (executions >= maxExecutions) {
        this.cancelCron(jobId);
        return;
      }
      
      try {
        console.log(`🔄 执行周期任务 ${jobId} (第${executions + 1}次)`);
        const result = await this.processPool.submitTask({
          ...taskData,
          execution: executions + 1
        });
        
        executions++;
        this.emit('cronTaskComplete', { jobId, execution: executions, result });
        
      } catch (error) {
        this.emit('cronTaskError', { jobId, execution: executions + 1, error });
      }
    };
    
    // 设置初始延迟
    const startTimeout = setTimeout(() => {
      executeJob();
      
      // 设置周期执行
      const intervalId = setInterval(executeJob, interval);
      
      this.cronJobs.set(jobId, {
        id: jobId,
        intervalId: intervalId,
        interval: interval,
        taskData: taskData,
        executions: executions,
        maxExecutions: maxExecutions,
        startedAt: Date.now()
      });
      
    }, startDelay);
    
    this.cronJobs.set(jobId, {
      id: jobId,
      startTimeout: startTimeout,
      interval: interval,
      taskData: taskData,
      executions: executions,
      maxExecutions: maxExecutions,
      scheduledAt: Date.now()
    });
    
    console.log(`⏰ 调度周期任务 ${jobId},间隔 ${interval}ms`);
    return jobId;
  }

  // 取消调度任务
  cancel(taskId) {
    const task = this.scheduledTasks.get(taskId);
    if (task) {
      clearTimeout(task.timeout);
      this.scheduledTasks.delete(taskId);
      console.log(`❌ 取消调度任务 ${taskId}`);
      return true;
    }
    return false;
  }

  // 取消周期任务
  cancelCron(jobId) {
    const job = this.cronJobs.get(jobId);
    if (job) {
      if (job.intervalId) {
        clearInterval(job.intervalId);
      }
      if (job.startTimeout) {
        clearTimeout(job.startTimeout);
      }
      
      this.cronJobs.delete(jobId);
      console.log(`❌ 取消周期任务 ${jobId}`);
      return true;
    }
    return false;
  }

  // 获取调度状态
  getScheduleStatus() {
    return {
      scheduledTasks: this.scheduledTasks.size,
      cronJobs: this.cronJobs.size,
      tasks: Array.from(this.scheduledTasks.values()).map(task => ({
        id: task.id,
        executeAt: new Date(task.executeAt).toISOString(),
        remainingTime: task.executeAt - Date.now()
      })),
      jobs: Array.from(this.cronJobs.values()).map(job => ({
        id: job.id,
        interval: job.interval,
        executions: job.executions,
        maxExecutions: job.maxExecutions
      }))
    };
  }

  // 清理所有调度
  cleanup() {
    // 取消所有调度任务
    for (const taskId of this.scheduledTasks.keys()) {
      this.cancel(taskId);
    }
    
    // 取消所有周期任务
    for (const jobId of this.cronJobs.keys()) {
      this.cancelCron(jobId);
    }
    
    console.log('🧹 调度器已清理');
  }
}

module.exports = {
  ProcessPool,
  TaskScheduler
};

子进程管理是Node.js构建高性能、可扩展应用的重要技术,通过合理的进程管理策略可以充分利用系统资源!