EventEmitter使用
🎯 学习目标
- 深入理解EventEmitter的工作原理
- 掌握事件驱动编程的最佳实践
- 学会自定义EventEmitter和事件管理
- 了解性能优化和内存泄漏防护
📚 核心概念
EventEmitter基础
javascript
// EventEmitter核心概念
const eventEmitterConcepts = {
events: {
description: '事件系统核心',
characteristics: [
'异步非阻塞',
'观察者模式',
'松耦合架构',
'事件驱动'
]
},
listeners: {
types: {
once: '一次性监听器',
on: '持续监听器',
prependListener: '前置监听器',
removeListener: '移除监听器'
},
lifecycle: ['注册', '触发', '执行', '移除']
},
features: {
maxListeners: '最大监听器数量限制',
errorHandling: '错误事件处理',
asyncSupport: '异步事件支持',
eventCapture: '事件捕获机制'
}
};
console.log('EventEmitter概念:', eventEmitterConcepts);
🛠️ 基础EventEmitter使用
基本事件操作
javascript
// basic-event-emitter.js
const EventEmitter = require('events');
class BasicEventDemo extends EventEmitter {
constructor() {
super();
this.setupBasicEvents();
}
setupBasicEvents() {
// 设置最大监听器数量
this.setMaxListeners(20);
// 监听错误事件
this.on('error', (error) => {
console.error('❌ 事件错误:', error.message);
});
// 监听新监听器添加
this.on('newListener', (event, listener) => {
console.log(`📝 添加监听器: ${event}`);
});
// 监听监听器移除
this.on('removeListener', (event, listener) => {
console.log(`🗑️ 移除监听器: ${event}`);
});
}
// 演示基本事件操作
demonstrateBasicEvents() {
console.log('🎭 基本事件操作演示...\n');
// 1. 基本事件监听和触发
this.on('message', (data) => {
console.log(`📨 接收消息: ${data}`);
});
this.emit('message', 'Hello EventEmitter!');
// 2. 一次性事件监听
this.once('startup', (config) => {
console.log(`🚀 应用启动: ${JSON.stringify(config)}`);
});
this.emit('startup', { port: 3000, env: 'development' });
this.emit('startup', { port: 3000, env: 'development' }); // 不会触发
// 3. 多个监听器
this.on('user:login', (user) => {
console.log(`👤 用户登录: ${user.name}`);
});
this.on('user:login', (user) => {
console.log(`📊 记录登录日志: ${user.id}`);
});
this.on('user:login', (user) => {
console.log(`📧 发送欢迎邮件给: ${user.email}`);
});
this.emit('user:login', {
id: 1,
name: 'Alice',
email: 'alice@example.com'
});
// 4. 前置监听器
this.prependListener('order:created', (order) => {
console.log(`🔍 订单验证: ${order.id}`);
});
this.on('order:created', (order) => {
console.log(`📦 订单处理: ${order.id}`);
});
this.emit('order:created', { id: 'ORDER-001', amount: 99.99 });
// 5. 带参数的复杂事件
this.on('data:processed', (result, metadata) => {
console.log(`📊 数据处理完成: ${result.count} 条记录`);
console.log(`⏱️ 处理时间: ${metadata.duration}ms`);
});
this.emit('data:processed',
{ count: 1000, success: true },
{ duration: 250, memory: '15MB' }
);
}
// 演示监听器管理
demonstrateListenerManagement() {
console.log('\n🔧 监听器管理演示...\n');
// 创建命名函数以便移除
const messageHandler = (msg) => {
console.log(`📬 处理消息: ${msg}`);
};
const logHandler = (msg) => {
console.log(`📝 记录日志: ${msg}`);
};
// 添加监听器
this.on('notification', messageHandler);
this.on('notification', logHandler);
console.log(`监听器数量: ${this.listenerCount('notification')}`);
// 触发事件
this.emit('notification', '测试通知');
// 移除特定监听器
this.removeListener('notification', messageHandler);
console.log(`移除后监听器数量: ${this.listenerCount('notification')}`);
this.emit('notification', '第二个通知');
// 移除所有监听器
this.removeAllListeners('notification');
console.log(`清空后监听器数量: ${this.listenerCount('notification')}`);
this.emit('notification', '第三个通知'); // 不会有输出
}
// 演示异步事件处理
async demonstrateAsyncEvents() {
console.log('\n⚡ 异步事件处理演示...\n');
// 异步事件处理器
this.on('async:task', async (task) => {
console.log(`🔄 开始异步任务: ${task.name}`);
// 模拟异步操作
await new Promise(resolve => setTimeout(resolve, task.duration));
console.log(`✅ 异步任务完成: ${task.name}`);
this.emit('async:task:completed', task);
});
this.on('async:task:completed', (task) => {
console.log(`📋 任务 ${task.name} 已完成,耗时 ${task.duration}ms`);
});
// 触发异步事件
this.emit('async:task', { name: '数据处理', duration: 500 });
this.emit('async:task', { name: '文件上传', duration: 300 });
// 等待异步事件完成
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
// 使用示例
async function demonstrateBasicEventEmitter() {
const demo = new BasicEventDemo();
demo.demonstrateBasicEvents();
demo.demonstrateListenerManagement();
await demo.demonstrateAsyncEvents();
}
module.exports = { BasicEventDemo, demonstrateBasicEventEmitter };
高级EventEmitter实现
javascript
// advanced-event-emitter.js
const EventEmitter = require('events');
const { promisify } = require('util');
// 增强型EventEmitter
class EnhancedEventEmitter extends EventEmitter {
constructor(options = {}) {
super();
this.options = {
enableMetrics: options.enableMetrics !== false,
enableHistory: options.enableHistory !== false,
historyLimit: options.historyLimit || 100,
enableAsync: options.enableAsync !== false,
...options
};
// 事件统计
this.eventMetrics = new Map();
// 事件历史
this.eventHistory = [];
// 异步事件队列
this.asyncQueue = [];
this.processingAsync = false;
this.setupEnhancements();
}
setupEnhancements() {
// 拦截emit方法以添加增强功能
const originalEmit = this.emit;
this.emit = (event, ...args) => {
// 记录事件统计
if (this.options.enableMetrics) {
this.recordEventMetrics(event);
}
// 记录事件历史
if (this.options.enableHistory) {
this.recordEventHistory(event, args);
}
return originalEmit.call(this, event, ...args);
};
// 拦截on方法以添加监听器统计
const originalOn = this.on;
this.on = (event, listener) => {
const result = originalOn.call(this, event, listener);
this.updateListenerMetrics(event, 'add');
return result;
};
const originalRemoveListener = this.removeListener;
this.removeListener = (event, listener) => {
const result = originalRemoveListener.call(this, event, listener);
this.updateListenerMetrics(event, 'remove');
return result;
};
}
// 记录事件统计
recordEventMetrics(event) {
if (!this.eventMetrics.has(event)) {
this.eventMetrics.set(event, {
count: 0,
firstEmitted: Date.now(),
lastEmitted: null,
listeners: 0
});
}
const metrics = this.eventMetrics.get(event);
metrics.count++;
metrics.lastEmitted = Date.now();
metrics.listeners = this.listenerCount(event);
}
// 记录事件历史
recordEventHistory(event, args) {
const historyEntry = {
event,
args: args.length > 0 ? args : undefined,
timestamp: Date.now(),
listeners: this.listenerCount(event)
};
this.eventHistory.push(historyEntry);
// 限制历史记录长度
if (this.eventHistory.length > this.options.historyLimit) {
this.eventHistory.shift();
}
}
// 更新监听器统计
updateListenerMetrics(event, action) {
if (this.eventMetrics.has(event)) {
const metrics = this.eventMetrics.get(event);
metrics.listeners = this.listenerCount(event);
}
}
// 异步事件发射
emitAsync(event, ...args) {
return new Promise((resolve, reject) => {
const listeners = this.listeners(event);
if (listeners.length === 0) {
resolve([]);
return;
}
const promises = listeners.map(listener => {
try {
const result = listener(...args);
return Promise.resolve(result);
} catch (error) {
return Promise.reject(error);
}
});
Promise.allSettled(promises)
.then(results => {
const successful = results.filter(r => r.status === 'fulfilled');
const failed = results.filter(r => r.status === 'rejected');
if (failed.length > 0) {
console.warn(`⚠️ ${failed.length} 个监听器执行失败`);
}
resolve(successful.map(r => r.value));
})
.catch(reject);
});
}
// 串行异步事件发射
async emitAsyncSerial(event, ...args) {
const listeners = this.listeners(event);
const results = [];
for (const listener of listeners) {
try {
const result = await Promise.resolve(listener(...args));
results.push(result);
} catch (error) {
console.error(`❌ 监听器执行失败:`, error);
results.push({ error: error.message });
}
}
return results;
}
// 条件事件发射
emitIf(condition, event, ...args) {
if (typeof condition === 'function') {
if (condition(...args)) {
return this.emit(event, ...args);
}
} else if (condition) {
return this.emit(event, ...args);
}
return false;
}
// 延迟事件发射
emitDelayed(delay, event, ...args) {
return new Promise((resolve) => {
setTimeout(() => {
const result = this.emit(event, ...args);
resolve(result);
}, delay);
});
}
// 节流事件发射
emitThrottled(event, delay = 1000) {
const throttleKey = `throttle_${event}`;
if (this[throttleKey]) {
return false;
}
this[throttleKey] = true;
setTimeout(() => {
delete this[throttleKey];
}, delay);
return this.emit(event, ...Array.from(arguments).slice(2));
}
// 防抖事件发射
emitDebounced(event, delay = 1000) {
const debounceKey = `debounce_${event}`;
if (this[debounceKey]) {
clearTimeout(this[debounceKey]);
}
const args = Array.from(arguments).slice(2);
this[debounceKey] = setTimeout(() => {
this.emit(event, ...args);
delete this[debounceKey];
}, delay);
}
// 获取事件统计
getEventMetrics(event) {
if (event) {
return this.eventMetrics.get(event) || null;
}
const summary = {};
for (const [eventName, metrics] of this.eventMetrics) {
summary[eventName] = { ...metrics };
}
return summary;
}
// 获取事件历史
getEventHistory(event, limit) {
let history = this.eventHistory;
if (event) {
history = history.filter(entry => entry.event === event);
}
if (limit && limit > 0) {
history = history.slice(-limit);
}
return history;
}
// 获取监听器信息
getListenerInfo() {
const info = {};
const events = this.eventNames();
for (const event of events) {
info[event] = {
listenerCount: this.listenerCount(event),
maxListeners: this.getMaxListeners(),
listeners: this.listeners(event).map(fn => ({
name: fn.name || 'anonymous',
isAsync: fn.constructor.name === 'AsyncFunction'
}))
};
}
return info;
}
// 清理统计数据
clearMetrics() {
this.eventMetrics.clear();
this.eventHistory = [];
console.log('📊 事件统计数据已清理');
}
// 生成事件报告
generateReport() {
const report = {
timestamp: new Date().toISOString(),
totalEvents: this.eventNames().length,
totalEmissions: Array.from(this.eventMetrics.values())
.reduce((sum, metrics) => sum + metrics.count, 0),
eventMetrics: this.getEventMetrics(),
listenerInfo: this.getListenerInfo(),
recentHistory: this.getEventHistory(null, 10)
};
return report;
}
}
// 事件总线实现
class EventBus extends EnhancedEventEmitter {
constructor(options = {}) {
super(options);
this.namespaces = new Map();
this.middlewares = [];
this.eventFilters = [];
}
// 命名空间支持
namespace(name) {
if (!this.namespaces.has(name)) {
const ns = new EnhancedEventEmitter(this.options);
this.namespaces.set(name, ns);
}
return this.namespaces.get(name);
}
// 添加中间件
use(middleware) {
if (typeof middleware !== 'function') {
throw new TypeError('中间件必须是函数');
}
this.middlewares.push(middleware);
}
// 添加事件过滤器
addFilter(filter) {
if (typeof filter !== 'function') {
throw new TypeError('过滤器必须是函数');
}
this.eventFilters.push(filter);
}
// 重写emit方法以支持中间件和过滤器
emit(event, ...args) {
// 应用过滤器
for (const filter of this.eventFilters) {
if (!filter(event, ...args)) {
return false;
}
}
// 创建事件上下文
const context = {
event,
args,
timestamp: Date.now(),
stopped: false
};
// 应用中间件
const runMiddleware = (index) => {
if (index >= this.middlewares.length || context.stopped) {
// 所有中间件执行完毕或被停止,发射事件
return super.emit(event, ...args);
}
const middleware = this.middlewares[index];
try {
middleware(context, () => runMiddleware(index + 1));
} catch (error) {
console.error('❌ 中间件执行错误:', error);
return false;
}
};
return runMiddleware(0);
}
// 全局事件广播
broadcast(event, ...args) {
let emitted = super.emit(event, ...args);
// 向所有命名空间广播
for (const [name, ns] of this.namespaces) {
try {
ns.emit(event, ...args);
emitted = true;
} catch (error) {
console.error(`❌ 命名空间 ${name} 广播失败:`, error);
}
}
return emitted;
}
}
// 使用示例
async function demonstrateAdvancedEventEmitter() {
console.log('🚀 高级EventEmitter演示...\n');
const enhanced = new EnhancedEventEmitter({
enableMetrics: true,
enableHistory: true,
historyLimit: 50
});
// 添加一些监听器
enhanced.on('user:action', (action, user) => {
console.log(`👤 用户操作: ${user.name} - ${action}`);
});
enhanced.on('user:action', async (action, user) => {
console.log(`📊 记录用户行为: ${action}`);
await new Promise(resolve => setTimeout(resolve, 100));
});
// 测试各种事件发射方式
console.log('1. 基本事件发射:');
enhanced.emit('user:action', 'login', { name: 'Alice', id: 1 });
console.log('\n2. 异步事件发射:');
const asyncResults = await enhanced.emitAsync('user:action', 'logout', { name: 'Bob', id: 2 });
console.log('异步结果:', asyncResults);
console.log('\n3. 条件事件发射:');
enhanced.emitIf(true, 'user:action', 'view_profile', { name: 'Charlie', id: 3 });
enhanced.emitIf(false, 'user:action', 'delete_account', { name: 'David', id: 4 }); // 不会触发
console.log('\n4. 延迟事件发射:');
await enhanced.emitDelayed(500, 'user:action', 'update_profile', { name: 'Eve', id: 5 });
console.log('\n5. 节流事件发射:');
enhanced.emitThrottled('user:action', 1000, 'rapid_click', { name: 'Frank', id: 6 });
enhanced.emitThrottled('user:action', 1000, 'rapid_click', { name: 'Grace', id: 7 }); // 被节流
// 生成报告
console.log('\n📊 事件统计报告:');
const report = enhanced.generateReport();
console.log(JSON.stringify(report, null, 2));
// 演示事件总线
console.log('\n🚌 事件总线演示:');
const eventBus = new EventBus();
// 添加中间件
eventBus.use((context, next) => {
console.log(`🔍 中间件: 处理事件 ${context.event}`);
next();
});
// 添加过滤器
eventBus.addFilter((event, ...args) => {
if (event.startsWith('private:')) {
console.log(`🚫 过滤器: 拒绝私有事件 ${event}`);
return false;
}
return true;
});
// 测试事件总线
eventBus.on('public:message', (msg) => {
console.log(`📢 公共消息: ${msg}`);
});
eventBus.emit('public:message', 'Hello World!');
eventBus.emit('private:secret', 'This should be filtered'); // 被过滤
// 命名空间测试
const userNS = eventBus.namespace('user');
userNS.on('login', (user) => {
console.log(`👤 用户命名空间 - 登录: ${user.name}`);
});
userNS.emit('login', { name: 'Alice' });
}
module.exports = {
EnhancedEventEmitter,
EventBus,
demonstrateAdvancedEventEmitter
};
🛡️ 内存泄漏防护和性能优化
监听器泄漏检测
javascript
// memory-leak-protection.js
const EventEmitter = require('events');
class LeakProtectedEventEmitter extends EventEmitter {
constructor(options = {}) {
super();
this.options = {
maxListeners: options.maxListeners || 10,
warningThreshold: options.warningThreshold || 8,
leakDetectionInterval: options.leakDetectionInterval || 30000,
enableLeakDetection: options.enableLeakDetection !== false,
...options
};
// 监听器跟踪
this.listenerTracking = new Map();
this.leakWarnings = new Map();
this.setMaxListeners(this.options.maxListeners);
this.setupLeakDetection();
}
setupLeakDetection() {
if (!this.options.enableLeakDetection) {
return;
}
// 重写监听器添加方法
const originalOn = this.on;
const originalOnce = this.once;
const originalPrependListener = this.prependListener;
this.on = (event, listener) => {
this.trackListener(event, listener, 'on');
return originalOn.call(this, event, listener);
};
this.once = (event, listener) => {
this.trackListener(event, listener, 'once');
return originalOnce.call(this, event, listener);
};
this.prependListener = (event, listener) => {
this.trackListener(event, listener, 'prepend');
return originalPrependListener.call(this, event, listener);
};
// 重写监听器移除方法
const originalRemoveListener = this.removeListener;
const originalRemoveAllListeners = this.removeAllListeners;
this.removeListener = (event, listener) => {
this.untrackListener(event, listener);
return originalRemoveListener.call(this, event, listener);
};
this.removeAllListeners = (event) => {
if (event) {
this.untrackAllListeners(event);
} else {
this.listenerTracking.clear();
}
return originalRemoveAllListeners.call(this, event);
};
// 定期检测泄漏
this.leakDetectionTimer = setInterval(() => {
this.detectLeaks();
}, this.options.leakDetectionInterval);
// 监听新监听器添加警告
this.on('maxListenersExceeded', (event) => {
console.warn(`⚠️ 事件 "${event}" 监听器数量超过限制 (${this.getMaxListeners()})`);
this.analyzeListenerLeak(event);
});
}
trackListener(event, listener, type) {
if (!this.listenerTracking.has(event)) {
this.listenerTracking.set(event, []);
}
const tracking = this.listenerTracking.get(event);
tracking.push({
listener,
type,
addedAt: Date.now(),
stackTrace: this.captureStackTrace()
});
// 检查是否接近警告阈值
if (tracking.length >= this.options.warningThreshold) {
this.warnPotentialLeak(event, tracking.length);
}
}
untrackListener(event, listener) {
if (!this.listenerTracking.has(event)) {
return;
}
const tracking = this.listenerTracking.get(event);
const index = tracking.findIndex(item => item.listener === listener);
if (index > -1) {
tracking.splice(index, 1);
}
if (tracking.length === 0) {
this.listenerTracking.delete(event);
}
}
untrackAllListeners(event) {
this.listenerTracking.delete(event);
}
captureStackTrace() {
const stack = new Error().stack;
return stack.split('\n').slice(2, 6).join('\n'); // 保留相关的调用栈
}
warnPotentialLeak(event, count) {
const lastWarning = this.leakWarnings.get(event);
const now = Date.now();
// 避免频繁警告
if (lastWarning && now - lastWarning < 60000) {
return;
}
console.warn(`⚠️ 潜在内存泄漏: 事件 "${event}" 有 ${count} 个监听器`);
this.leakWarnings.set(event, now);
}
detectLeaks() {
const suspiciousEvents = [];
for (const [event, tracking] of this.listenerTracking) {
const currentCount = this.listenerCount(event);
// 检查监听器数量是否异常增长
if (currentCount > this.options.warningThreshold) {
suspiciousEvents.push({
event,
count: currentCount,
tracking: tracking.length
});
}
}
if (suspiciousEvents.length > 0) {
console.warn('🔍 泄漏检测发现可疑事件:');
suspiciousEvents.forEach(item => {
console.warn(` - ${item.event}: ${item.count} 个监听器`);
});
}
}
analyzeListenerLeak(event) {
const tracking = this.listenerTracking.get(event) || [];
console.log(`🔬 分析事件 "${event}" 的监听器:`);
console.log(` 总数: ${tracking.length}`);
// 按类型分组
const byType = {};
tracking.forEach(item => {
byType[item.type] = (byType[item.type] || 0) + 1;
});
console.log(' 按类型分布:', byType);
// 显示最近添加的监听器
const recent = tracking
.slice(-5)
.map(item => ({
type: item.type,
addedAt: new Date(item.addedAt).toISOString(),
stackTrace: item.stackTrace.split('\n')[0]
}));
console.log(' 最近添加的监听器:');
recent.forEach((item, index) => {
console.log(` ${index + 1}. ${item.type} - ${item.addedAt}`);
console.log(` ${item.stackTrace}`);
});
}
// 获取泄漏报告
getLeakReport() {
const report = {
timestamp: new Date().toISOString(),
totalEvents: this.eventNames().length,
suspiciousEvents: [],
memoryUsage: process.memoryUsage()
};
for (const event of this.eventNames()) {
const count = this.listenerCount(event);
if (count > this.options.warningThreshold) {
const tracking = this.listenerTracking.get(event) || [];
report.suspiciousEvents.push({
event,
listenerCount: count,
trackingCount: tracking.length,
types: tracking.reduce((acc, item) => {
acc[item.type] = (acc[item.type] || 0) + 1;
return acc;
}, {})
});
}
}
return report;
}
// 清理资源
cleanup() {
if (this.leakDetectionTimer) {
clearInterval(this.leakDetectionTimer);
this.leakDetectionTimer = null;
}
this.removeAllListeners();
this.listenerTracking.clear();
this.leakWarnings.clear();
console.log('🧹 EventEmitter 资源已清理');
}
}
// 性能优化的EventEmitter
class PerformantEventEmitter extends LeakProtectedEventEmitter {
constructor(options = {}) {
super(options);
// 事件缓存
this.eventCache = new Map();
this.cacheEnabled = options.enableCache !== false;
this.cacheSize = options.cacheSize || 100;
// 批处理配置
this.batchEnabled = options.enableBatch !== false;
this.batchSize = options.batchSize || 10;
this.batchTimeout = options.batchTimeout || 10;
this.batchQueue = new Map();
// 性能统计
this.performanceStats = {
emitCount: 0,
totalEmitTime: 0,
avgEmitTime: 0,
cacheHits: 0,
cacheMisses: 0,
batchedEmits: 0
};
}
// 缓存优化的emit
emit(event, ...args) {
const startTime = process.hrtime.bigint();
try {
let result;
if (this.batchEnabled && this.shouldBatch(event)) {
result = this.addToBatch(event, args);
} else {
result = super.emit(event, ...args);
}
this.updatePerformanceStats(startTime);
return result;
} catch (error) {
console.error('❌ 事件发射错误:', error);
return false;
}
}
shouldBatch(event) {
// 某些事件适合批处理
return event.includes('log') || event.includes('metric') || event.includes('track');
}
addToBatch(event, args) {
if (!this.batchQueue.has(event)) {
this.batchQueue.set(event, []);
}
const batch = this.batchQueue.get(event);
batch.push({ args, timestamp: Date.now() });
if (batch.length >= this.batchSize) {
this.flushBatch(event);
} else {
// 设置超时刷新
setTimeout(() => {
if (this.batchQueue.has(event) && this.batchQueue.get(event).length > 0) {
this.flushBatch(event);
}
}, this.batchTimeout);
}
return true;
}
flushBatch(event) {
const batch = this.batchQueue.get(event);
if (!batch || batch.length === 0) {
return;
}
this.batchQueue.delete(event);
this.performanceStats.batchedEmits++;
// 发射批处理事件
super.emit(`${event}:batch`, batch);
console.log(`📦 批处理事件 ${event}: ${batch.length} 项`);
}
updatePerformanceStats(startTime) {
const endTime = process.hrtime.bigint();
const duration = Number(endTime - startTime) / 1000000; // 毫秒
this.performanceStats.emitCount++;
this.performanceStats.totalEmitTime += duration;
this.performanceStats.avgEmitTime =
this.performanceStats.totalEmitTime / this.performanceStats.emitCount;
}
// 获取性能统计
getPerformanceStats() {
return {
...this.performanceStats,
cacheHitRate: this.performanceStats.cacheHits /
(this.performanceStats.cacheHits + this.performanceStats.cacheMisses) * 100,
batchEfficiency: this.performanceStats.batchedEmits / this.performanceStats.emitCount * 100
};
}
// 性能基准测试
async performanceBenchmark(iterations = 10000) {
console.log(`🏃 EventEmitter性能基准测试 (${iterations} 次)...`);
// 重置统计
this.performanceStats = {
emitCount: 0,
totalEmitTime: 0,
avgEmitTime: 0,
cacheHits: 0,
cacheMisses: 0,
batchedEmits: 0
};
// 添加测试监听器
this.on('benchmark:test', (data) => {
// 简单处理
});
this.on('benchmark:batch', (batch) => {
// 批处理
});
const startTime = Date.now();
// 执行基准测试
for (let i = 0; i < iterations; i++) {
if (i % 3 === 0) {
this.emit('benchmark:batch', { id: i, data: `test_${i}` });
} else {
this.emit('benchmark:test', { id: i, data: `test_${i}` });
}
}
// 等待批处理完成
await new Promise(resolve => setTimeout(resolve, 100));
const endTime = Date.now();
const totalTime = endTime - startTime;
const stats = this.getPerformanceStats();
console.log('📊 基准测试结果:');
console.log(` 总时间: ${totalTime}ms`);
console.log(` 平均每次发射: ${stats.avgEmitTime.toFixed(4)}ms`);
console.log(` 吞吐量: ${Math.round(iterations / totalTime * 1000)} events/sec`);
console.log(` 批处理效率: ${stats.batchEfficiency.toFixed(2)}%`);
return {
totalTime,
avgEmitTime: stats.avgEmitTime,
throughput: Math.round(iterations / totalTime * 1000),
batchEfficiency: stats.batchEfficiency
};
}
}
// 使用示例
async function demonstrateMemoryLeakProtection() {
console.log('🛡️ 内存泄漏防护演示...\n');
const protectedEmitter = new LeakProtectedEventEmitter({
maxListeners: 5,
warningThreshold: 3,
leakDetectionInterval: 5000
});
// 模拟正常使用
protectedEmitter.on('normal:event', () => {
console.log('正常事件处理');
});
// 模拟潜在泄漏
for (let i = 0; i < 7; i++) {
protectedEmitter.on('potential:leak', () => {
console.log(`监听器 ${i + 1}`);
});
}
// 触发事件
protectedEmitter.emit('normal:event');
protectedEmitter.emit('potential:leak');
// 等待泄漏检测
await new Promise(resolve => setTimeout(resolve, 1000));
// 获取泄漏报告
const leakReport = protectedEmitter.getLeakReport();
console.log('\n📋 泄漏检测报告:');
console.log(JSON.stringify(leakReport, null, 2));
// 性能测试
console.log('\n🚀 性能优化测试:');
const performantEmitter = new PerformantEventEmitter({
enableBatch: true,
batchSize: 5,
batchTimeout: 50
});
const benchmarkResult = await performantEmitter.performanceBenchmark(1000);
// 清理资源
protectedEmitter.cleanup();
performantEmitter.cleanup();
}
module.exports = {
LeakProtectedEventEmitter,
PerformantEventEmitter,
demonstrateMemoryLeakProtection
};
EventEmitter是Node.js事件驱动架构的核心,掌握其高级用法和优化技巧对构建高性能应用至关重要!