Cluster模块

🎯 学习目标

  • 深入理解Node.js Cluster模块的工作原理
  • 掌握多进程架构的设计和实现
  • 学会进程间通信和负载均衡
  • 了解集群管理和故障恢复机制

📚 核心概念

Cluster模块基础

javascript
// Quick-reference map of the cluster module's core concepts, printed once when the snippet runs.
// The two process roles are declared separately and then assembled, giving the shape
// { architecture: { master, worker }, communication, loadBalancing }.
const masterRole = {
  description: '主进程 - 管理工作进程',
  responsibilities: ['创建和管理工作进程', '负载均衡', '进程监控', '故障恢复'],
};

const workerRole = {
  description: '工作进程 - 处理实际业务',
  responsibilities: ['处理客户端请求', '执行业务逻辑', '与主进程通信', '报告状态'],
};

const clusterConcepts = {
  architecture: { master: masterRole, worker: workerRole },
  communication: {
    ipc: '进程间通信 (Inter-Process Communication)',
    events: '事件驱动通信',
    messageQueue: '消息队列机制',
  },
  // Per the Node.js docs, round-robin (cluster.SCHED_RR) is the default on every platform except
  // Windows; cluster.SCHED_NONE leaves connection distribution to the operating system.
  loadBalancing: {
    roundRobin: '轮询调度 (默认)',
    none: '操作系统调度',
    custom: '自定义调度策略',
  },
};

console.log('Cluster概念:', clusterConcepts);

🛠️ 基础Cluster实现

简单集群服务器

javascript
// basic-cluster.js
const cluster = require('cluster');
const http = require('http');
const os = require('os');

/**
 * Minimal multi-process HTTP server built on the cluster module.
 *
 * Primary: forks `options.workers` workers, respawns workers that exit unexpectedly, tallies the
 * 'request' / 'stats' messages workers send, and prints a status report every 10 s.
 * Worker: serves a small JSON response on `options.host:options.port` and reports every request,
 * plus a heap / CPU-time sample every 5 s, to the primary over IPC.
 */
class BasicClusterServer {
  /**
   * @param {object} [options]
   * @param {number} [options.workers] - Workers to fork. Defaults to the CPU count, but at least 1
   *   because os.cpus() returns an empty array when CPU information is unavailable.
   * @param {number} [options.port=3000] - Port every worker listens on.
   * @param {string} [options.host='localhost'] - Interface the workers bind to.
   * Any other keys are kept on `this.options` unchanged.
   */
  constructor(options = {}) {
    // Spread first, then apply defaults, so a key passed as undefined/null still gets its default
    // instead of the spread overwriting the default with undefined.
    this.options = {
      ...options,
      workers: options.workers ?? Math.max(1, os.cpus().length),
      port: options.port ?? 3000,
      host: options.host ?? 'localhost',
    };

    // Primary-side registry: worker id -> { worker, pid, startTime, requests, memory, cpu }.
    this.workers = new Map();
    this.stats = {
      requests: 0,
      workers: {
        created: 0,
        died: 0,
        restarted: 0,
      },
      startTime: Date.now(),
    };
    this.isShuttingDown = false;
    this.statusTimer = null;
  }

  /** Runs the primary or the worker half depending on which process this is. */
  start() {
    // `isPrimary` replaced the deprecated `isMaster` in Node.js 16; fall back on older runtimes.
    const isPrimary = cluster.isPrimary ?? cluster.isMaster;
    if (isPrimary) {
      this.setupMaster();
    } else {
      this.setupWorker();
    }
  }

  /** Primary: configures forking, starts the workers, wires events and the status report. */
  setupMaster() {
    console.log(`🚀 主进程启动 PID: ${process.pid}`);
    console.log(`📊 CPU核心数: ${os.cpus().length}`);
    console.log(`👥 启动 ${this.options.workers} 个工作进程...\n`);

    // `setupPrimary` replaced the deprecated `setupMaster` in Node.js 16.
    const setupPrimary = cluster.setupPrimary ?? cluster.setupMaster;
    setupPrimary.call(cluster, {
      exec: __filename,
      args: ['--worker'],
      silent: false,
    });

    for (let i = 0; i < this.options.workers; i++) {
      this.createWorker();
    }

    this.setupMasterEvents();
    this.displayStatus();
  }

  /**
   * Primary: forks one worker and records it in `this.workers`.
   * @returns {object} the new cluster Worker.
   */
  createWorker() {
    const worker = cluster.fork();

    this.workers.set(worker.id, {
      worker,
      pid: worker.process.pid,
      startTime: Date.now(),
      requests: 0,
      memory: 0,
      cpu: 0,
    });

    this.stats.workers.created++;

    console.log(`✅ 工作进程创建: ID=${worker.id}, PID=${worker.process.pid}`);

    return worker;
  }

  /** Primary: logs worker lifecycle events, respawns crashed workers, handles shutdown signals. */
  setupMasterEvents() {
    cluster.on('online', (worker) => {
      console.log(`🟢 工作进程上线: ID=${worker.id}, PID=${worker.process.pid}`);
    });

    cluster.on('exit', (worker, code, signal) => {
      console.log(`🔴 工作进程退出: ID=${worker.id}, PID=${worker.process.pid}, 代码=${code}, 信号=${signal}`);

      this.workers.delete(worker.id);
      this.stats.workers.died++;

      // exitedAfterDisconnect is true after disconnect()/kill(), i.e. a deliberate stop. Only
      // respawn unexpected deaths, and never while the whole cluster is shutting down.
      if (!worker.exitedAfterDisconnect && !this.isShuttingDown) {
        console.log(`🔄 重启工作进程...`);
        this.createWorker();
        this.stats.workers.restarted++;
      }
    });

    cluster.on('disconnect', (worker) => {
      console.log(`⚠️ 工作进程断开连接: ID=${worker.id}`);
    });

    cluster.on('message', (worker, message) => {
      this.handleWorkerMessage(worker, message);
    });

    process.on('SIGINT', () => {
      this.gracefulShutdown();
    });

    process.on('SIGTERM', () => {
      this.gracefulShutdown();
    });
  }

  /**
   * Primary: applies one message from a worker to the counters.
   * Messages from workers that are no longer registered are ignored.
   * @param {object} worker - cluster Worker that sent the message.
   * @param {*} message - expected shape { type: 'request' | 'stats' | 'error', ... }.
   */
  handleWorkerMessage(worker, message) {
    const workerInfo = this.workers.get(worker.id);

    if (!workerInfo) return;

    // Optional chaining: a null/undefined message falls through to the default log.
    switch (message?.type) {
      case 'request':
        workerInfo.requests++;
        this.stats.requests++;
        break;

      case 'stats':
        workerInfo.memory = message.memory;
        workerInfo.cpu = message.cpu;
        break;

      case 'error':
        console.error(`❌ 工作进程错误 ID=${worker.id}:`, message.error);
        break;

      default:
        console.log(`📨 工作进程消息 ID=${worker.id}:`, message);
    }
  }

  /** Worker: serves HTTP requests and reports requests and resource samples to the primary. */
  setupWorker() {
    // Sending after the IPC channel has closed (e.g. once the primary disconnects this worker)
    // fails with ERR_IPC_CHANNEL_CLOSED, so only report while still connected.
    const notifyPrimary = (message) => {
      if (process.connected) {
        process.send(message);
      }
    };

    const server = http.createServer((req, res) => {
      notifyPrimary({ type: 'request', url: req.url, method: req.method });

      const response = {
        message: 'Hello from Cluster!',
        worker: {
          id: cluster.worker.id,
          pid: process.pid,
        },
        timestamp: new Date().toISOString(),
        url: req.url,
        method: req.method,
      };

      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify(response, null, 2));
    });

    server.listen(this.options.port, this.options.host, () => {
      console.log(`🌐 工作进程服务器启动: ID=${cluster.worker.id}, PID=${process.pid}, 端口=${this.options.port}`);
    });

    const statsTimer = setInterval(() => {
      const memUsage = process.memoryUsage();
      const cpuUsage = process.cpuUsage();

      notifyPrimary({
        type: 'stats',
        memory: memUsage.heapUsed,
        // Cumulative user + system CPU time (microseconds) since this worker started.
        cpu: cpuUsage.user + cpuUsage.system,
      });
    }, 5000);

    // On worker.disconnect() the cluster closes this worker's servers and then the IPC channel.
    // Stop the sampler at that point so it does not keep the worker alive afterwards.
    process.once('disconnect', () => clearInterval(statsTimer));

    process.on('SIGTERM', () => {
      console.log(`🔄 工作进程 ${cluster.worker.id} 开始优雅关闭...`);
      clearInterval(statsTimer);

      server.close(() => {
        console.log(`✅ 工作进程 ${cluster.worker.id} 已关闭`);
        process.exit(0);
      });
    });
  }

  /** Primary: prints cluster and per-worker statistics every 10 s. */
  displayStatus() {
    this.statusTimer = setInterval(() => {
      console.log('\n📊 集群状态:');
      console.log(`  运行时间: ${Math.floor((Date.now() - this.stats.startTime) / 1000)}秒`);
      console.log(`  总请求数: ${this.stats.requests}`);
      console.log(`  工作进程: 创建=${this.stats.workers.created}, 死亡=${this.stats.workers.died}, 重启=${this.stats.workers.restarted}`);
      console.log(`  活跃工作进程: ${this.workers.size}`);

      console.log('\n👥 工作进程详情:');
      for (const [id, info] of this.workers) {
        const uptime = Math.floor((Date.now() - info.startTime) / 1000);
        const memoryMB = (info.memory / 1024 / 1024).toFixed(2);

        console.log(`  ID=${id}, PID=${info.pid}, 运行=${uptime}s, 请求=${info.requests}, 内存=${memoryMB}MB`);
      }
      console.log('');
    }, 10000);
  }

  /**
   * Primary: disconnects every worker and exits as soon as all have exited. After 10 s, any
   * remaining workers are SIGKILLed. Calling it again while a shutdown is in progress does nothing.
   */
  gracefulShutdown() {
    if (this.isShuttingDown) {
      return;
    }
    this.isShuttingDown = true;
    console.log('\n🔄 开始优雅关闭集群...');
    clearInterval(this.statusTimer);

    const finish = () => {
      console.log('✅ 集群已关闭');
      process.exit(0);
    };

    if (this.workers.size === 0) {
      finish();
      return;
    }

    // worker.kill() first waits for a disconnect that a hung worker never completes, so the force
    // path signals the child process directly.
    const forceTimer = setTimeout(() => {
      console.log('💀 强制终止剩余工作进程...');
      for (const { worker } of this.workers.values()) {
        worker.process.kill('SIGKILL');
      }
      finish();
    }, 10000);

    // The 'exit' listener from setupMasterEvents was registered earlier, so it runs first and
    // removes the worker from the map; an empty map here means every worker is gone.
    cluster.on('exit', () => {
      if (this.workers.size === 0) {
        clearTimeout(forceTimer);
        finish();
      }
    });

    for (const { worker } of this.workers.values()) {
      if (worker.isConnected()) {
        worker.disconnect();
      }
    }
  }
}

// Demo entry point: when this file is executed directly (not require()d), run 4 workers on port 3000.
const runAsScript = module === require.main;
if (runAsScript) {
  new BasicClusterServer({ workers: 4, port: 3000 }).start();
}

module.exports = BasicClusterServer;

高级集群管理器

javascript
// advanced-cluster-manager.js
const cluster = require('cluster');
const EventEmitter = require('events');
const os = require('os');

/**
 * Cluster supervisor with restart limits, heartbeat-based health checks and periodic reporting.
 *
 * Primary:
 * - Forks `options.workers` workers.
 * - Respawns unexpected exits `options.restartDelay` ms later, at most `options.maxRestarts` times
 *   per worker slot.
 * - Kills workers whose last 'health' message is older than two health-check intervals.
 * - Prints cluster statistics every 30 s.
 * - Emits workerOnline / workerListening / workerExit / workerDisconnect / workerMaxRestarts /
 *   workerHighMemory / workerHighCPU / workerMetrics / workerMessage / healthCheck events.
 *
 * Worker: only sends a 'health' message every 5 s; request handling is left to the worker script.
 */
class AdvancedClusterManager extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {number} [options.workers] - Workers to fork (CPU count by default, at least 1).
   * @param {number} [options.maxRestarts=10] - Respawns allowed per worker slot.
   * @param {number} [options.restartDelay=1000] - Delay (ms) before respawning a dead worker.
   * @param {number} [options.gracefulTimeout=10000] - Time (ms) to wait for workers on shutdown.
   * @param {number} [options.healthCheckInterval=30000] - Health-check period (ms).
   * @param {number} [options.memoryThreshold] - heapUsed bytes that trigger a warning (500 MB).
   * @param {number} [options.cpuThreshold=80] - CPU percentage that triggers a warning.
   * @param {string} [options.loadBalancing] - Strategy name handed to LoadBalancer.
   * @param {number} [options.schedulingPolicy] - cluster.SCHED_RR (default) or cluster.SCHED_NONE.
   * @param {string} [options.workerScript] - Script workers run (defaults to this file).
   * @param {string[]} [options.args] - Arguments passed to the worker script.
   * @param {boolean} [options.silent] - Forwarded to cluster.setupPrimary().
   */
  constructor(options = {}) {
    super();

    // Spread first, then apply defaults, so keys passed as undefined/null still get defaults.
    this.options = {
      ...options,
      workers: options.workers ?? Math.max(1, os.cpus().length),
      maxRestarts: options.maxRestarts ?? 10,
      restartDelay: options.restartDelay ?? 1000,
      gracefulTimeout: options.gracefulTimeout ?? 10000,
      healthCheckInterval: options.healthCheckInterval ?? 30000,
      memoryThreshold: options.memoryThreshold ?? 500 * 1024 * 1024, // 500MB
      cpuThreshold: options.cpuThreshold ?? 80, // percent
    };

    // worker id -> info record (see createWorker).
    this.workers = new Map();
    // worker id -> number of times that worker's slot has been respawned. Cluster gives every new
    // worker a fresh id, so a replacement must inherit its predecessor's count or the limit never trips.
    this.restartCounts = new Map();
    this.isShuttingDown = false;
    // Interval handles from startHealthCheck/startMonitoring, cleared on shutdown.
    this.timers = [];

    this.stats = {
      startTime: Date.now(),
      totalRequests: 0,
      totalErrors: 0,
      restarts: 0,
      healthChecks: 0,
    };

    this.loadBalancer = new LoadBalancer(this.options.loadBalancing);
  }

  /** Runs the primary or the worker half depending on which process this is. */
  start() {
    // `isPrimary` replaced the deprecated `isMaster` in Node.js 16; fall back on older runtimes.
    if (cluster.isPrimary ?? cluster.isMaster) {
      this.startMaster();
    } else {
      this.startWorker();
    }
  }

  /** Primary: configures the cluster, forks workers, and starts events, health checks and reports. */
  startMaster() {
    console.log(`🚀 高级集群管理器启动 PID: ${process.pid}`);

    this.setupCluster();
    this.createWorkers();
    this.setupEvents();
    this.startHealthCheck();
    this.startMonitoring();

    console.log(`✅ 集群管理器启动完成,${this.workers.size} 个工作进程运行中`);
  }

  /** Primary: applies scheduling policy and fork settings. */
  setupCluster() {
    // The policy is frozen once the first worker is forked or setupPrimary() is called, so it
    // has to be assigned before either happens.
    cluster.schedulingPolicy = this.options.schedulingPolicy ?? cluster.SCHED_RR;

    // `setupPrimary` replaced the deprecated `setupMaster` in Node.js 16.
    const setupPrimary = cluster.setupPrimary ?? cluster.setupMaster;
    setupPrimary.call(cluster, {
      exec: this.options.workerScript || __filename,
      args: this.options.args || [],
      silent: this.options.silent || false,
    });
  }

  /** Primary: forks the configured number of workers. */
  createWorkers() {
    for (let i = 0; i < this.options.workers; i++) {
      this.createWorker();
    }
  }

  /**
   * Primary: forks one worker and registers it with the manager and the load balancer.
   * @param {number} [restartCount=0] - Respawns already used by the slot this worker fills.
   * @returns {object|null} the new cluster Worker, or null while shutting down.
   */
  createWorker(restartCount = 0) {
    if (this.isShuttingDown) {
      return null;
    }

    const worker = cluster.fork();
    this.restartCounts.set(worker.id, restartCount);

    const workerInfo = {
      id: worker.id,
      pid: worker.process.pid,
      startTime: Date.now(),
      requests: 0,
      errors: 0,
      memory: 0,
      cpu: 0,
      status: 'starting',
      lastHealthCheck: Date.now(),
      restartCount,
    };

    this.workers.set(worker.id, workerInfo);
    this.loadBalancer.addWorker(worker.id, workerInfo);

    console.log(`✨ 创建工作进程: ID=${worker.id}, PID=${worker.process.pid}`);

    return worker;
  }

  /** Primary: subscribes to cluster lifecycle/message events and shutdown signals. */
  setupEvents() {
    cluster.on('online', (worker) => {
      const workerInfo = this.workers.get(worker.id);
      if (workerInfo) {
        workerInfo.status = 'online';
        console.log(`🟢 工作进程上线: ID=${worker.id}`);
        this.emit('workerOnline', worker, workerInfo);
      }
    });

    cluster.on('listening', (worker, address) => {
      const workerInfo = this.workers.get(worker.id);
      if (workerInfo) {
        workerInfo.status = 'listening';
        workerInfo.address = address;
        console.log(`👂 工作进程监听: ID=${worker.id}, ${address.address}:${address.port}`);
        this.emit('workerListening', worker, address);
      }
    });

    cluster.on('exit', (worker, code, signal) => {
      this.handleWorkerExit(worker, code, signal);
    });

    cluster.on('disconnect', (worker) => {
      console.log(`⚠️ 工作进程断开: ID=${worker.id}`);
      this.emit('workerDisconnect', worker);
    });

    // Listen at the cluster level so workers forked later (restarts) are covered too; per-worker
    // listeners attached once here would miss every replacement.
    cluster.on('message', (worker, message) => {
      this.handleWorkerMessage(worker, message);
    });

    process.on('SIGINT', () => this.gracefulShutdown('SIGINT'));
    process.on('SIGTERM', () => this.gracefulShutdown('SIGTERM'));
  }

  /** Primary: unregisters an exited worker and schedules a respawn for unexpected exits. */
  handleWorkerExit(worker, code, signal) {
    const workerInfo = this.workers.get(worker.id);

    console.log(`🔴 工作进程退出: ID=${worker.id}, PID=${worker.process.pid}, 代码=${code}, 信号=${signal}`);

    if (workerInfo) {
      this.workers.delete(worker.id);
      this.loadBalancer.removeWorker(worker.id);
      this.emit('workerExit', worker, code, signal, workerInfo);
    }

    // exitedAfterDisconnect is true after disconnect()/kill(): a deliberate stop, not a crash.
    if (!worker.exitedAfterDisconnect && !this.isShuttingDown) {
      this.scheduleWorkerRestart(worker.id, code, signal);
    } else {
      this.restartCounts.delete(worker.id);
    }
  }

  /**
   * Primary: respawns the slot of a dead worker after `restartDelay`, unless its restart budget
   * is used up (then emits 'workerMaxRestarts' instead).
   */
  scheduleWorkerRestart(workerId, code, signal) {
    const restartCount = this.restartCounts.get(workerId) ?? 0;
    this.restartCounts.delete(workerId);

    if (restartCount >= this.options.maxRestarts) {
      console.error(`💀 工作进程 ${workerId} 重启次数超限 (${restartCount}), 停止重启`);
      this.emit('workerMaxRestarts', workerId, restartCount);
      return;
    }

    this.stats.restarts++;

    console.log(`🔄 计划重启工作进程 ${workerId} (第${restartCount + 1}次), ${this.options.restartDelay}ms后执行`);

    setTimeout(() => {
      if (this.isShuttingDown) {
        return;
      }
      // The replacement inherits the slot's count so repeated crashes hit maxRestarts.
      const replacement = this.createWorker(restartCount + 1);
      console.log(`♻️ 工作进程已重启: 新ID=${replacement?.id ?? 'unknown'}`);
    }, this.options.restartDelay);
  }

  /** Primary: dispatches one worker message by its `type`. */
  handleWorkerMessage(worker, message) {
    const workerInfo = this.workers.get(worker.id);
    if (!workerInfo) return;

    // Optional chaining: a null/undefined message is forwarded as an unknown message.
    switch (message?.type) {
      case 'request':
        workerInfo.requests++;
        this.stats.totalRequests++;
        break;

      case 'error':
        workerInfo.errors++;
        this.stats.totalErrors++;
        console.error(`❌ 工作进程错误 ID=${worker.id}:`, message.error);
        break;

      case 'health':
        this.updateWorkerHealth(worker.id, message.data);
        break;

      case 'metrics':
        this.updateWorkerMetrics(worker.id, message.data);
        break;

      default:
        this.emit('workerMessage', worker, message);
    }
  }

  /**
   * Primary: stores a health sample and warns when memory/CPU exceed their thresholds.
   * @param {number} workerId
   * @param {{memory: number, cpu: number}} healthData - heapUsed bytes and CPU percent.
   */
  updateWorkerHealth(workerId, healthData) {
    const workerInfo = this.workers.get(workerId);
    if (!workerInfo || !healthData) return;

    workerInfo.memory = healthData.memory;
    workerInfo.cpu = healthData.cpu;
    workerInfo.lastHealthCheck = Date.now();

    if (healthData.memory > this.options.memoryThreshold) {
      console.warn(`⚠️ 工作进程 ${workerId} 内存使用过高: ${(healthData.memory / 1024 / 1024).toFixed(2)}MB`);
      this.emit('workerHighMemory', workerId, healthData.memory);
    }

    if (healthData.cpu > this.options.cpuThreshold) {
      console.warn(`⚠️ 工作进程 ${workerId} CPU使用过高: ${healthData.cpu.toFixed(2)}%`);
      this.emit('workerHighCPU', workerId, healthData.cpu);
    }
  }

  /** Primary: stores arbitrary worker metrics and re-emits them. */
  updateWorkerMetrics(workerId, metrics) {
    const workerInfo = this.workers.get(workerId);
    if (!workerInfo) return;

    workerInfo.metrics = metrics;
    this.emit('workerMetrics', workerId, metrics);
  }

  /** Primary: runs performHealthCheck every `healthCheckInterval` ms. */
  startHealthCheck() {
    this.timers.push(setInterval(() => this.performHealthCheck(), this.options.healthCheckInterval));

    console.log(`🏥 健康检查已启动,间隔: ${this.options.healthCheckInterval}ms`);
  }

  /**
   * Primary: kills workers that have not sent a 'health' message for two intervals; their exit
   * then goes through the normal respawn path.
   */
  performHealthCheck() {
    this.stats.healthChecks++;
    const now = Date.now();
    const staleAfter = this.options.healthCheckInterval * 2;
    const unhealthyWorkers = [];

    for (const [workerId, workerInfo] of this.workers) {
      if (now - workerInfo.lastHealthCheck > staleAfter) {
        unhealthyWorkers.push(workerId);
      }
    }

    if (unhealthyWorkers.length > 0) {
      console.warn(`🏥 发现不健康的工作进程: ${unhealthyWorkers.join(', ')}`);

      for (const workerId of unhealthyWorkers) {
        const worker = cluster.workers?.[workerId];
        if (worker) {
          console.log(`💊 重启不健康的工作进程: ${workerId}`);
          // worker.kill() marks the exit as voluntary, so it would not be respawned. It also
          // waits for a disconnect an unresponsive worker never completes. Killing the child
          // process directly leaves exitedAfterDisconnect false, so handleWorkerExit respawns it.
          worker.process.kill('SIGKILL');
        }
      }
    }

    this.emit('healthCheck', {
      timestamp: now,
      totalWorkers: this.workers.size,
      unhealthyWorkers: unhealthyWorkers.length,
    });
  }

  /** Primary: prints cluster statistics every 30 s. */
  startMonitoring() {
    this.timers.push(setInterval(() => this.displayClusterStats(), 30000));
  }

  /** Primary: prints aggregate and per-worker statistics. */
  displayClusterStats() {
    const uptime = Math.floor((Date.now() - this.stats.startTime) / 1000);
    const workerCount = this.workers.size;
    // Guard the division: with no live workers the average would be NaN/Infinity.
    const avgRequestsPerWorker = workerCount > 0 ? this.stats.totalRequests / workerCount : 0;

    console.log('\n📊 集群统计信息:');
    console.log(`  运行时间: ${uptime}秒`);
    console.log(`  工作进程: ${this.workers.size}/${this.options.workers}`);
    console.log(`  总请求: ${this.stats.totalRequests}`);
    console.log(`  总错误: ${this.stats.totalErrors}`);
    console.log(`  重启次数: ${this.stats.restarts}`);
    console.log(`  健康检查: ${this.stats.healthChecks}`);
    console.log(`  平均请求/进程: ${avgRequestsPerWorker.toFixed(2)}`);

    console.log('\n👥 工作进程状态:');
    for (const [workerId, info] of this.workers) {
      const workerUptime = Math.floor((Date.now() - info.startTime) / 1000);
      const memoryMB = (info.memory / 1024 / 1024).toFixed(2);

      console.log(`  ID=${workerId}: 状态=${info.status}, 运行=${workerUptime}s, 请求=${info.requests}, 错误=${info.errors}, 内存=${memoryMB}MB`);
    }
    console.log('');
  }

  /**
   * Primary: replaces one worker. A replacement is forked first, because worker.kill() marks
   * the old worker's exit as voluntary and the exit handler will not respawn it.
   * @returns {boolean} false when no live worker has that id.
   */
  restartWorker(workerId) {
    const worker = cluster.workers?.[workerId];

    if (!worker) {
      console.error(`❌ 工作进程 ${workerId} 不存在`);
      return false;
    }

    console.log(`🔄 手动重启工作进程: ${workerId}`);
    // A manual restart keeps the slot's count rather than spending one of its restarts.
    this.createWorker(this.restartCounts.get(workerId) ?? 0);
    worker.kill();
    return true;
  }

  /** Primary: restarts every current worker, each after a random delay of up to 5 s. */
  restartAllWorkers() {
    console.log('🔄 重启所有工作进程...');

    const workers = Array.from(this.workers.keys());

    workers.forEach((workerId) => {
      setTimeout(() => {
        this.restartWorker(workerId);
      }, Math.random() * 5000); // random stagger so workers are not all restarted at once
    });
  }

  /**
   * Primary: stops timers, disconnects all workers and exits with 0 once every worker has exited.
   * If `gracefulTimeout` ms pass first, it SIGKILLs the rest and exits with 1.
   * Repeated signals during shutdown are ignored.
   */
  gracefulShutdown(signal) {
    if (this.isShuttingDown) {
      return;
    }
    console.log(`\n🔄 收到信号 ${signal},开始优雅关闭...`);
    this.isShuttingDown = true;

    // Stop health checks (which could kill draining workers) and the periodic report.
    for (const timer of this.timers) {
      clearInterval(timer);
    }

    const workers = Object.values(cluster.workers ?? {});

    if (workers.length === 0) {
      console.log('✅ 没有工作进程需要关闭');
      process.exit(0);
      return;
    }

    console.log(`📤 向 ${workers.length} 个工作进程发送关闭信号...`);

    const forceShutdownTimer = setTimeout(() => {
      console.log('💀 强制终止剩余工作进程...');

      for (const worker of workers) {
        if (!worker.isDead()) {
          // worker.kill() would wait for a disconnect a hung worker never completes.
          worker.process.kill('SIGKILL');
        }
      }

      process.exit(1);
    }, this.options.gracefulTimeout);

    const onWorkerExit = () => {
      if (workers.every((worker) => worker.isDead())) {
        clearTimeout(forceShutdownTimer);
        console.log('✅ 所有工作进程已优雅关闭');
        process.exit(0);
      }
    };

    for (const worker of workers) {
      worker.once('exit', onWorkerExit);
      if (worker.isConnected()) {
        worker.disconnect();
      }
    }
  }

  /**
   * Worker: reports heap usage and CPU load to the primary every 5 s.
   * Request handling is expected to come from a subclass or the worker script.
   */
  startWorker() {
    console.log(`🔧 工作进程启动: ID=${cluster.worker.id}, PID=${process.pid}`);

    let lastCpu = process.cpuUsage();
    let lastSampleAt = process.hrtime.bigint();

    const healthTimer = setInterval(() => {
      if (!process.connected) return;

      const now = process.hrtime.bigint();
      const cpu = process.cpuUsage();
      const cpuMicros = (cpu.user - lastCpu.user) + (cpu.system - lastCpu.system);
      const wallMicros = Number(now - lastSampleAt) / 1000;
      lastCpu = cpu;
      lastSampleAt = now;

      process.send({
        type: 'health',
        data: {
          memory: process.memoryUsage().heapUsed,
          // CPU time since the previous sample as a percentage of elapsed wall time, matching the
          // percentage the primary compares against options.cpuThreshold.
          cpu: wallMicros > 0 ? (cpuMicros / wallMicros) * 100 : 0,
          uptime: process.uptime(),
        },
      });
    }, 5000);

    // Once the IPC channel closes there is nobody to report to; stop so the timer does not keep
    // a disconnected worker alive.
    process.once('disconnect', () => clearInterval(healthTimer));
  }

  /** Returns a snapshot of manager state for diagnostics. */
  getClusterStatus() {
    return {
      isShuttingDown: this.isShuttingDown,
      workers: Array.from(this.workers.values()),
      stats: this.stats,
      uptime: Date.now() - this.stats.startTime,
      loadBalancer: this.loadBalancer.getStatus(),
    };
  }
}

// Load balancer
/**
 * Picks a worker id from a registry of worker-info records.
 *
 * Strategies:
 * - 'round-robin' (default): cycles through registered ids in insertion order.
 * - 'least-connections': lowest `requests` value. When fed by AdvancedClusterManager, that field
 *   is a cumulative request count, not a count of live connections.
 * - 'least-memory': lowest `memory` value.
 * - anything else: the first registered id.
 * Records missing the compared field count as 0, so freshly added workers remain selectable.
 * NOTE(review): nothing in this file routes connections through selectWorker(); the cluster
 * module distributes connections itself — confirm how callers use this class.
 */
class LoadBalancer {
  /** @param {string} [strategy='round-robin'] */
  constructor(strategy = 'round-robin') {
    this.strategy = strategy;
    this.workers = new Map();
    // Monotonic counter; the chosen slot is currentIndex modulo the current worker count.
    this.currentIndex = 0;
  }

  addWorker(workerId, workerInfo) {
    this.workers.set(workerId, workerInfo);
  }

  removeWorker(workerId) {
    this.workers.delete(workerId);
  }

  /** @returns {*} the selected worker id, or null when no worker is registered. */
  selectWorker() {
    const workerIds = Array.from(this.workers.keys());

    if (workerIds.length === 0) {
      return null;
    }

    switch (this.strategy) {
      case 'round-robin': {
        // Braces scope the declaration to this case (a bare `const` in a case clause is shared
        // across the whole switch).
        const workerId = workerIds[this.currentIndex % workerIds.length];
        this.currentIndex++;
        return workerId;
      }

      case 'least-connections':
        return this.selectLeastConnections();

      case 'least-memory':
        return this.selectLeastMemory();

      default:
        return workerIds[0];
    }
  }

  /** @returns {*} id with the lowest `requests` value, or null when empty. */
  selectLeastConnections() {
    return this.selectLowest('requests');
  }

  /** @returns {*} id with the lowest `memory` value, or null when empty. */
  selectLeastMemory() {
    return this.selectLowest('memory');
  }

  /**
   * Returns the id whose record has the smallest `field` value (missing counts as 0); ties keep
   * the earliest registered id. Returns null when no worker is registered.
   */
  selectLowest(field) {
    let selectedWorkerId = null;
    let lowest = Infinity;

    for (const [workerId, workerInfo] of this.workers) {
      const value = workerInfo?.[field] ?? 0;
      if (value < lowest) {
        lowest = value;
        selectedWorkerId = workerId;
      }
    }

    return selectedWorkerId;
  }

  /** @returns {{strategy: string, workers: number, currentIndex: number}} */
  getStatus() {
    return {
      strategy: this.strategy,
      workers: this.workers.size,
      currentIndex: this.currentIndex,
    };
  }
}

// Public API of this snippet: the cluster supervisor and the worker-selection helper it uses.
module.exports = { AdvancedClusterManager, LoadBalancer };

🔄 进程间通信和状态管理

进程通信管理器

javascript
// ipc-manager.js
const cluster = require('cluster');
const EventEmitter = require('events');

/**
 * Request/response and broadcast messaging over the cluster IPC channels.
 *
 * In a worker (process.send exists), messages go to and come from the primary. In the primary,
 * messages are exchanged with individual workers through the `cluster` module. Replies to a
 * worker's request are routed back to that worker. Broadcasts sent by a worker are relayed by the
 * primary's IPCManager (if one exists) to every other worker.
 *
 * Events: 'message' for every non-response message, plus one event per `message.type`, called
 * with (data, message, worker); `worker` is only set in the primary. A typed 'error' event is
 * only emitted when an 'error' listener exists, because an unhandled 'error' event would throw.
 */
class IPCManager extends EventEmitter {
  constructor() {
    super();
    this.messageQueue = [];
    // message id -> { resolve, reject } for requests awaiting a response.
    this.responseCallbacks = new Map();
    this.messageId = 0;
    // Primary only: received message object -> worker that sent it, so respond() can reply.
    this.senders = new WeakMap();
    this.setupIPC();
  }

  /** Installs the inbound listener for whichever side of the channel this process is on. */
  setupIPC() {
    if (typeof process.send === 'function') {
      // Worker: everything arrives from the primary over the process channel.
      process.on('message', (message) => {
        this.handleMessage(message);
      });
    } else if (cluster.isPrimary ?? cluster.isMaster) {
      // Primary: one listener for all workers; the sender is kept for reply routing.
      cluster.on('message', (worker, message) => {
        this.handleMessage(message, worker);
      });
    }
  }

  /** Builds the common message envelope with a fresh id. */
  createMessage(type, data, extra = {}) {
    return {
      id: ++this.messageId,
      type,
      data,
      timestamp: Date.now(),
      from: process.pid,
      ...extra,
    };
  }

  /**
   * Sends a prepared message. From a worker it always goes to the primary; from the primary it
   * goes to the worker whose cluster id is `targetWorker`.
   * @returns {boolean} false when there is no open channel to send on.
   */
  deliver(message, targetWorker = null) {
    if (typeof process.send === 'function') {
      if (!process.connected) return false;
      process.send(message);
      return true;
    }

    const worker = targetWorker ? cluster.workers?.[targetWorker] : undefined;
    if (!worker || !worker.isConnected()) return false;
    worker.send(message);
    return true;
  }

  /**
   * Fire-and-forget send (to the primary from a worker, to `targetWorker` from the primary).
   * @returns {number} the id of the sent message.
   */
  send(type, data, targetWorker = null) {
    const message = this.createMessage(type, data, targetWorker ? { target: targetWorker } : {});
    this.deliver(message, targetWorker);
    return message.id;
  }

  /**
   * Sends a message and resolves with the peer's response data.
   * @param {string} type
   * @param {*} data
   * @param {number} [timeout=5000] - ms before rejecting with a timeout error.
   * @param {number|null} [targetWorker=null] - worker id; required when called in the primary.
   * @returns {Promise<*>} rejects on timeout, on an error response, or immediately when there is
   *   no channel to send on.
   */
  request(type, data, timeout = 5000, targetWorker = null) {
    return new Promise((resolve, reject) => {
      const message = this.createMessage(type, data, targetWorker ? { target: targetWorker } : {});

      const timeoutId = setTimeout(() => {
        this.responseCallbacks.delete(message.id);
        reject(new Error(`请求超时: ${type}`));
      }, timeout);

      // Register before the message leaves this process.
      this.responseCallbacks.set(message.id, {
        resolve: (response) => {
          clearTimeout(timeoutId);
          resolve(response);
        },
        reject: (error) => {
          clearTimeout(timeoutId);
          reject(error);
        },
      });

      if (!this.deliver(message, targetWorker)) {
        // No parent channel or no such connected worker: fail now rather than at the timeout.
        const pending = this.responseCallbacks.get(message.id);
        this.responseCallbacks.delete(message.id);
        pending.reject(new Error(`无法发送请求: ${type}`));
      }
    });
  }

  /**
   * Replies to a request received through handleMessage. In the primary the reply goes back to
   * the worker that sent `originalMessage`.
   */
  respond(originalMessage, data, error = null) {
    const response = this.createMessage('response', data, {
      originalId: originalMessage.id,
      // The default JSON IPC serialization turns an Error into {}; send its text instead.
      error: error instanceof Error ? error.message : error,
    });

    this.deliver(response, this.senders.get(originalMessage)?.id);
  }

  /**
   * Routes one inbound message: settles the pending request for a 'response', otherwise emits
   * events (and, in the primary, relays worker broadcasts).
   * @param {*} message
   * @param {object|null} [worker=null] - sending cluster Worker (primary only).
   */
  handleMessage(message, worker = null) {
    // Other code may share the channel; ignore anything that is not an envelope object.
    if (message === null || typeof message !== 'object') return;

    if (worker) {
      this.senders.set(message, worker);
    }

    if (message.type === 'response') {
      const callback = this.responseCallbacks.get(message.originalId);
      if (callback) {
        this.responseCallbacks.delete(message.originalId);

        if (message.error) {
          callback.reject(new Error(message.error));
        } else {
          callback.resolve(message.data);
        }
      }
      return;
    }

    // A worker's broadcast only reaches the primary; fan it out to the other workers.
    if (worker && message.broadcast) {
      this.relay(message, worker);
    }

    this.emit('message', message);
    if (message.type !== 'error' || this.listenerCount('error') > 0) {
      this.emit(message.type, message.data, message, worker);
    }
  }

  /** Primary only: forwards a worker's broadcast to every other connected worker. */
  relay(message, sender) {
    for (const worker of Object.values(cluster.workers ?? {})) {
      if (worker !== sender && worker.isConnected()) {
        worker.send(message);
      }
    }
  }

  /**
   * Sends a message to every worker. From the primary it goes to each connected worker directly.
   * From a worker it goes to the primary flagged `broadcast`, and the primary relays it.
   */
  broadcast(type, data) {
    if (typeof process.send === 'function') {
      this.deliver(this.createMessage(type, data, { broadcast: true }));
      return;
    }

    if (!(cluster.isPrimary ?? cluster.isMaster)) return;

    for (const worker of Object.values(cluster.workers ?? {})) {
      // Sending to a disconnected worker would raise an IPC channel error.
      if (worker.isConnected()) {
        worker.send(this.createMessage(type, data, { broadcast: true }));
      }
    }
  }
}

// State synchronisation manager
/**
 * Key/value store held in a Map and kept in sync across processes through IPCManager.
 *
 * Local writes and deletes emit 'stateChange' ({ key, value, oldValue }) and broadcast a
 * 'state:sync' message. Incoming 'state:sync' messages are applied locally without
 * re-broadcasting, so updates do not bounce back and forth. Peers can also read ('state:get'),
 * write ('state:set') or register ('state:subscribe') through request/response messages.
 */
class StateManager extends EventEmitter {
  constructor() {
    super();
    this.state = new Map();
    // Worker ids registered via 'state:subscribe'; only recorded in this class.
    this.subscribers = new Set();
    this.ipc = new IPCManager();

    this.setupStateSync();
  }

  /** Answers remote state requests and applies incoming sync messages. */
  setupStateSync() {
    this.ipc.on('state:get', (data, message) => {
      const value = this.state.get(data.key);
      this.ipc.respond(message, { key: data.key, value });
    });

    this.ipc.on('state:set', (data, message) => {
      this.setState(data.key, data.value, false); // no broadcast, to avoid echo loops
      this.ipc.respond(message, { success: true });
    });

    this.ipc.on('state:sync', (data) => {
      // Deletions carry an explicit flag: JSON IPC serialization drops a `value: undefined`
      // field, and applying it as a set would leave the key behind holding undefined.
      if (data.deleted) {
        this.deleteState(data.key, false);
      } else {
        this.setState(data.key, data.value, false);
      }
    });

    this.ipc.on('state:subscribe', (data, message) => {
      this.subscribers.add(data.workerId);
      this.ipc.respond(message, { success: true });
    });
  }

  /**
   * Sets a key locally and, unless `broadcast` is false, propagates it to the other processes.
   * @param {string} key
   * @param {*} value
   * @param {boolean} [broadcast=true]
   */
  setState(key, value, broadcast = true) {
    const oldValue = this.state.get(key);
    this.state.set(key, value);

    this.emit('stateChange', { key, value, oldValue });

    if (broadcast) {
      this.ipc.broadcast('state:sync', { key, value });
    }
  }

  /** @returns {*} the local value for `key`. */
  getState(key) {
    return this.state.get(key);
  }

  /** @returns {object} a plain-object copy of all local state. */
  getAllState() {
    return Object.fromEntries(this.state);
  }

  /**
   * Removes a key locally and, unless `broadcast` is false, tells the other processes to
   * remove it too.
   * @param {string} key
   * @param {boolean} [broadcast=true]
   */
  deleteState(key, broadcast = true) {
    const oldValue = this.state.get(key);
    this.state.delete(key);

    this.emit('stateChange', { key, value: undefined, oldValue });

    if (broadcast) {
      this.ipc.broadcast('state:sync', { key, value: undefined, deleted: true });
    }
  }

  /** Registers `callback` for every 'stateChange' event. */
  subscribe(callback) {
    this.on('stateChange', callback);
  }

  /**
   * Reads `key` from another process.
   * @param {string} key
   * @param {number} [targetWorker] - worker id to ask (needed when called in the primary).
   * @returns {Promise<*>} the remote value, or undefined on failure (the error is logged).
   */
  async getRemoteState(key, targetWorker) {
    try {
      const response = await this.ipc.request('state:get', { key }, 5000, targetWorker);
      return response?.value;
    } catch (error) {
      console.error(`获取远程状态失败: ${key}`, error);
      return undefined;
    }
  }

  /**
   * Writes `key` in another process.
   * @param {string} key
   * @param {*} value
   * @param {number} [targetWorker] - worker id to write to (needed when called in the primary).
   * @returns {Promise<boolean>} true on success, false on failure (the error is logged).
   */
  async setRemoteState(key, value, targetWorker) {
    try {
      await this.ipc.request('state:set', { key, value }, 5000, targetWorker);
      return true;
    } catch (error) {
      console.error(`设置远程状态失败: ${key}`, error);
      return false;
    }
  }
}

// Public API of this snippet: the IPC messaging layer and the cross-process state store built on it.
module.exports = { IPCManager, StateManager };

Cluster模块为Node.js提供了强大的多进程能力，通过合理的架构设计，可以充分利用多核CPU资源，提升应用的性能和可靠性！