Skip to content

响应式编程

🎯 学习目标

  • 深入理解响应式编程的核心概念和原理
  • 掌握Observable、Stream等响应式数据流
  • 学会使用RxJS进行响应式编程
  • 了解响应式架构的设计模式和最佳实践

📚 核心概念

响应式编程基础

javascript
// 响应式编程核心概念
const reactiveProgrammingConcepts = {
  paradigm: {
    description: '基于数据流和变化传播的编程范式',
    characteristics: [
      '声明式编程',
      '异步数据流',
      '事件驱动',
      '函数式编程'
    ],
    benefits: [
      '处理复杂异步逻辑',
      '统一的数据流处理',
      '强大的操作符',
      '错误处理机制'
    ]
  },
  coreElements: {
    observable: 'Observable - 可观察对象,数据流的源头',
    observer: 'Observer - 观察者,订阅和处理数据',
    subscription: 'Subscription - 订阅关系,管理生命周期',
    operators: 'Operators - 操作符,转换和处理数据流',
    scheduler: 'Scheduler - 调度器,控制执行时机'
  },
  patterns: {
    hot: 'Hot Observable - 热流,主动发射数据',
    cold: 'Cold Observable - 冷流,被动发射数据',
    subject: 'Subject - 既是Observable又是Observer',
    backpressure: 'Backpressure - 背压处理机制'
  }
};

console.log('响应式编程概念:', reactiveProgrammingConcepts);

🌊 基础响应式实现

简单Observable实现

javascript
// simple-observable.js

// 基础Observable实现
class SimpleObservable {
  constructor(subscribe) {
    this.subscribe = subscribe;
  }

  // 静态创建方法
  static create(subscribe) {
    return new SimpleObservable(subscribe);
  }

  static of(...values) {
    return new SimpleObservable(observer => {
      try {
        for (const value of values) {
          observer.next(value);
        }
        observer.complete();
      } catch (error) {
        observer.error(error);
      }
    });
  }

  static from(iterable) {
    return new SimpleObservable(observer => {
      try {
        for (const value of iterable) {
          observer.next(value);
        }
        observer.complete();
      } catch (error) {
        observer.error(error);
      }
    });
  }

  static fromEvent(target, eventName) {
    return new SimpleObservable(observer => {
      const handler = (event) => observer.next(event);
      target.addEventListener(eventName, handler);
      
      return () => {
        target.removeEventListener(eventName, handler);
      };
    });
  }

  static interval(period) {
    return new SimpleObservable(observer => {
      let count = 0;
      const intervalId = setInterval(() => {
        observer.next(count++);
      }, period);
      
      return () => clearInterval(intervalId);
    });
  }

  static timer(delay, period) {
    return new SimpleObservable(observer => {
      const timeoutId = setTimeout(() => {
        observer.next(0);
        
        if (period) {
          let count = 1;
          const intervalId = setInterval(() => {
            observer.next(count++);
          }, period);
          
          return () => clearInterval(intervalId);
        } else {
          observer.complete();
        }
      }, delay);
      
      return () => clearTimeout(timeoutId);
    });
  }

  // 基础操作符
  map(transform) {
    return new SimpleObservable(observer => {
      return this.subscribe({
        next: value => {
          try {
            observer.next(transform(value));
          } catch (error) {
            observer.error(error);
          }
        },
        error: error => observer.error(error),
        complete: () => observer.complete()
      });
    });
  }

  filter(predicate) {
    return new SimpleObservable(observer => {
      return this.subscribe({
        next: value => {
          try {
            if (predicate(value)) {
              observer.next(value);
            }
          } catch (error) {
            observer.error(error);
          }
        },
        error: error => observer.error(error),
        complete: () => observer.complete()
      });
    });
  }

  take(count) {
    return new SimpleObservable(observer => {
      let taken = 0;
      const subscription = this.subscribe({
        next: value => {
          if (taken < count) {
            observer.next(value);
            taken++;
            if (taken === count) {
              observer.complete();
              if (subscription) subscription();
            }
          }
        },
        error: error => observer.error(error),
        complete: () => observer.complete()
      });
      
      return subscription;
    });
  }

  skip(count) {
    return new SimpleObservable(observer => {
      let skipped = 0;
      return this.subscribe({
        next: value => {
          if (skipped < count) {
            skipped++;
          } else {
            observer.next(value);
          }
        },
        error: error => observer.error(error),
        complete: () => observer.complete()
      });
    });
  }

  debounceTime(delay) {
    return new SimpleObservable(observer => {
      let timeoutId;
      
      return this.subscribe({
        next: value => {
          if (timeoutId) {
            clearTimeout(timeoutId);
          }
          
          timeoutId = setTimeout(() => {
            observer.next(value);
          }, delay);
        },
        error: error => observer.error(error),
        complete: () => {
          if (timeoutId) {
            clearTimeout(timeoutId);
          }
          observer.complete();
        }
      });
    });
  }

  throttleTime(delay) {
    return new SimpleObservable(observer => {
      let lastEmitTime = 0;
      
      return this.subscribe({
        next: value => {
          const now = Date.now();
          if (now - lastEmitTime >= delay) {
            observer.next(value);
            lastEmitTime = now;
          }
        },
        error: error => observer.error(error),
        complete: () => observer.complete()
      });
    });
  }

  distinctUntilChanged() {
    return new SimpleObservable(observer => {
      let hasValue = false;
      let lastValue;
      
      return this.subscribe({
        next: value => {
          if (!hasValue || value !== lastValue) {
            hasValue = true;
            lastValue = value;
            observer.next(value);
          }
        },
        error: error => observer.error(error),
        complete: () => observer.complete()
      });
    });
  }

  // 组合操作符
  merge(other) {
    return new SimpleObservable(observer => {
      let completed = 0;
      const subscription1 = this.subscribe({
        next: value => observer.next(value),
        error: error => observer.error(error),
        complete: () => {
          completed++;
          if (completed === 2) observer.complete();
        }
      });
      
      const subscription2 = other.subscribe({
        next: value => observer.next(value),
        error: error => observer.error(error),
        complete: () => {
          completed++;
          if (completed === 2) observer.complete();
        }
      });
      
      return () => {
        if (subscription1) subscription1();
        if (subscription2) subscription2();
      };
    });
  }

  concat(other) {
    return new SimpleObservable(observer => {
      const subscription1 = this.subscribe({
        next: value => observer.next(value),
        error: error => observer.error(error),
        complete: () => {
          const subscription2 = other.subscribe({
            next: value => observer.next(value),
            error: error => observer.error(error),
            complete: () => observer.complete()
          });
        }
      });
      
      return subscription1;
    });
  }

  // 错误处理
  catchError(handler) {
    return new SimpleObservable(observer => {
      return this.subscribe({
        next: value => observer.next(value),
        error: error => {
          try {
            const fallback = handler(error);
            if (fallback instanceof SimpleObservable) {
              fallback.subscribe(observer);
            } else {
              observer.next(fallback);
              observer.complete();
            }
          } catch (handlerError) {
            observer.error(handlerError);
          }
        },
        complete: () => observer.complete()
      });
    });
  }

  retry(count = 1) {
    return new SimpleObservable(observer => {
      let attempts = 0;
      
      const attempt = () => {
        this.subscribe({
          next: value => observer.next(value),
          error: error => {
            attempts++;
            if (attempts <= count) {
              console.log(`重试第 ${attempts} 次...`);
              attempt();
            } else {
              observer.error(error);
            }
          },
          complete: () => observer.complete()
        });
      };
      
      attempt();
    });
  }

  // 副作用操作符
  tap(sideEffect) {
    return new SimpleObservable(observer => {
      return this.subscribe({
        next: value => {
          try {
            sideEffect(value);
            observer.next(value);
          } catch (error) {
            observer.error(error);
          }
        },
        error: error => observer.error(error),
        complete: () => observer.complete()
      });
    });
  }

  // 终端操作符
  forEach(callback) {
    return new Promise((resolve, reject) => {
      this.subscribe({
        next: value => {
          try {
            callback(value);
          } catch (error) {
            reject(error);
          }
        },
        error: error => reject(error),
        complete: () => resolve()
      });
    });
  }

  toArray() {
    return new Promise((resolve, reject) => {
      const values = [];
      this.subscribe({
        next: value => values.push(value),
        error: error => reject(error),
        complete: () => resolve(values)
      });
    });
  }

  reduce(accumulator, seed) {
    return new Promise((resolve, reject) => {
      let acc = seed;
      let hasValue = seed !== undefined;
      
      this.subscribe({
        next: value => {
          try {
            if (hasValue) {
              acc = accumulator(acc, value);
            } else {
              acc = value;
              hasValue = true;
            }
          } catch (error) {
            reject(error);
          }
        },
        error: error => reject(error),
        complete: () => resolve(acc)
      });
    });
  }
}

// Subject实现
class SimpleSubject extends SimpleObservable {
  constructor() {
    super(observer => {
      this.observers.push(observer);
      
      return () => {
        const index = this.observers.indexOf(observer);
        if (index > -1) {
          this.observers.splice(index, 1);
        }
      };
    });
    
    this.observers = [];
    this.isStopped = false;
    this.hasError = false;
    this.thrownError = null;
  }

  next(value) {
    if (this.isStopped) return;
    
    for (const observer of this.observers.slice()) {
      try {
        observer.next(value);
      } catch (error) {
        console.error('Observer处理错误:', error);
      }
    }
  }

  error(error) {
    if (this.isStopped) return;
    
    this.isStopped = true;
    this.hasError = true;
    this.thrownError = error;
    
    for (const observer of this.observers.slice()) {
      try {
        observer.error(error);
      } catch (e) {
        console.error('Observer错误处理失败:', e);
      }
    }
    
    this.observers = [];
  }

  complete() {
    if (this.isStopped) return;
    
    this.isStopped = true;
    
    for (const observer of this.observers.slice()) {
      try {
        observer.complete();
      } catch (error) {
        console.error('Observer完成处理错误:', error);
      }
    }
    
    this.observers = [];
  }

  asObservable() {
    return new SimpleObservable(observer => this.subscribe(observer));
  }
}

// BehaviorSubject实现
class SimpleBehaviorSubject extends SimpleSubject {
  constructor(initialValue) {
    super();
    this.value = initialValue;
    this.hasCurrentValue = true;
  }

  subscribe(observer) {
    const subscription = super.subscribe(observer);
    
    if (this.hasCurrentValue && !this.isStopped) {
      observer.next(this.value);
    }
    
    if (this.hasError) {
      observer.error(this.thrownError);
    } else if (this.isStopped) {
      observer.complete();
    }
    
    return subscription;
  }

  next(value) {
    this.value = value;
    super.next(value);
  }

  getValue() {
    if (this.hasError) {
      throw this.thrownError;
    } else if (this.isStopped) {
      throw new Error('BehaviorSubject已完成');
    }
    
    return this.value;
  }
}

module.exports = {
  SimpleObservable,
  SimpleSubject,
  SimpleBehaviorSubject
};

响应式数据流处理

javascript
// reactive-data-streams.js
const { SimpleObservable, SimpleSubject, SimpleBehaviorSubject } = require('./simple-observable');

// 响应式数据存储
class ReactiveStore {
  constructor(initialState = {}) {
    this.state = new SimpleBehaviorSubject(initialState);
    this.actions = new SimpleSubject();
    this.effects = [];
    
    this.setupActionProcessing();
  }

  // 设置动作处理
  setupActionProcessing() {
    this.actions
      .tap(action => console.log(`🎯 执行动作: ${action.type}`))
      .subscribe({
        next: action => this.processAction(action),
        error: error => console.error('动作处理错误:', error)
      });
  }

  // 处理动作
  processAction(action) {
    try {
      const currentState = this.state.getValue();
      const newState = this.reducer(currentState, action);
      
      if (newState !== currentState) {
        this.state.next(newState);
      }
      
      // 触发副作用
      this.triggerEffects(action, newState);
      
    } catch (error) {
      console.error('状态更新失败:', error);
      this.state.error(error);
    }
  }

  // 状态归约器
  reducer(state, action) {
    switch (action.type) {
      case 'SET':
        return { ...state, [action.key]: action.value };
      
      case 'UPDATE':
        return { ...state, ...action.payload };
      
      case 'DELETE':
        const newState = { ...state };
        delete newState[action.key];
        return newState;
      
      case 'RESET':
        return action.payload || {};
      
      default:
        return state;
    }
  }

  // 触发副作用
  triggerEffects(action, state) {
    for (const effect of this.effects) {
      try {
        effect(action, state);
      } catch (error) {
        console.error('副作用执行失败:', error);
      }
    }
  }

  // 派发动作
  dispatch(action) {
    this.actions.next(action);
  }

  // 获取状态流
  getState$() {
    return this.state.asObservable();
  }

  // 选择状态片段
  select(selector) {
    return this.state
      .asObservable()
      .map(state => selector(state))
      .distinctUntilChanged();
  }

  // 添加副作用
  addEffect(effect) {
    this.effects.push(effect);
    
    return () => {
      const index = this.effects.indexOf(effect);
      if (index > -1) {
        this.effects.splice(index, 1);
      }
    };
  }

  // 获取当前状态
  getCurrentState() {
    return this.state.getValue();
  }
}

// 响应式HTTP客户端
class ReactiveHTTPClient {
  constructor() {
    this.requestSubject = new SimpleSubject();
    this.responseSubject = new SimpleSubject();
    this.errorSubject = new SimpleSubject();
    
    this.setupRequestProcessing();
  }

  setupRequestProcessing() {
    this.requestSubject
      .tap(request => console.log(`🌐 HTTP请求: ${request.method} ${request.url}`))
      .subscribe({
        next: request => this.processRequest(request),
        error: error => console.error('请求处理错误:', error)
      });
  }

  async processRequest(request) {
    const startTime = Date.now();
    
    try {
      // 模拟HTTP请求
      const response = await this.simulateHTTPRequest(request);
      const duration = Date.now() - startTime;
      
      const responseData = {
        ...response,
        request: request,
        duration: duration,
        timestamp: Date.now()
      };
      
      this.responseSubject.next(responseData);
      
    } catch (error) {
      const errorData = {
        error: error,
        request: request,
        timestamp: Date.now()
      };
      
      this.errorSubject.next(errorData);
    }
  }

  async simulateHTTPRequest(request) {
    // 模拟网络延迟
    const delay = Math.random() * 1000 + 200;
    await new Promise(resolve => setTimeout(resolve, delay));
    
    // 模拟错误
    if (Math.random() < 0.1) {
      throw new Error('网络错误');
    }
    
    return {
      status: 200,
      statusText: 'OK',
      data: { 
        message: 'Success',
        timestamp: Date.now(),
        url: request.url
      }
    };
  }

  // 发送GET请求
  get(url, options = {}) {
    const request = { method: 'GET', url, ...options };
    this.requestSubject.next(request);
    
    return this.responseSubject
      .filter(response => 
        response.request.method === request.method && 
        response.request.url === request.url
      )
      .take(1);
  }

  // 发送POST请求
  post(url, data, options = {}) {
    const request = { method: 'POST', url, data, ...options };
    this.requestSubject.next(request);
    
    return this.responseSubject
      .filter(response => 
        response.request.method === request.method && 
        response.request.url === request.url
      )
      .take(1);
  }

  // 获取响应流
  getResponses$() {
    return this.responseSubject.asObservable();
  }

  // 获取错误流
  getErrors$() {
    return this.errorSubject.asObservable();
  }
}

// 响应式表单验证
class ReactiveFormValidator {
  constructor() {
    this.fields = new Map();
    this.validationResults = new SimpleSubject();
  }

  // 添加字段
  addField(name, initialValue = '', validators = []) {
    const field = {
      name: name,
      value: new SimpleBehaviorSubject(initialValue),
      validators: validators,
      errors: new SimpleBehaviorSubject([]),
      isValid: new SimpleBehaviorSubject(true)
    };

    // 设置验证
    field.value
      .debounceTime(300)
      .subscribe({
        next: value => this.validateField(name, value)
      });

    this.fields.set(name, field);
    return field;
  }

  // 验证字段
  async validateField(name, value) {
    const field = this.fields.get(name);
    if (!field) return;

    const errors = [];
    
    for (const validator of field.validators) {
      try {
        const result = await validator(value);
        if (result !== true) {
          errors.push(result);
        }
      } catch (error) {
        errors.push(error.message);
      }
    }

    field.errors.next(errors);
    field.isValid.next(errors.length === 0);

    // 发出整体验证结果
    this.emitValidationResult();
  }

  // 发出验证结果
  emitValidationResult() {
    const result = {};
    let isFormValid = true;

    for (const [name, field] of this.fields) {
      const fieldResult = {
        value: field.value.getValue(),
        errors: field.errors.getValue(),
        isValid: field.isValid.getValue()
      };
      
      result[name] = fieldResult;
      
      if (!fieldResult.isValid) {
        isFormValid = false;
      }
    }

    this.validationResults.next({
      fields: result,
      isValid: isFormValid,
      timestamp: Date.now()
    });
  }

  // 设置字段值
  setValue(name, value) {
    const field = this.fields.get(name);
    if (field) {
      field.value.next(value);
    }
  }

  // 获取字段值流
  getFieldValue$(name) {
    const field = this.fields.get(name);
    return field ? field.value.asObservable() : null;
  }

  // 获取字段错误流
  getFieldErrors$(name) {
    const field = this.fields.get(name);
    return field ? field.errors.asObservable() : null;
  }

  // 获取验证结果流
  getValidationResults$() {
    return this.validationResults.asObservable();
  }

  // 重置表单
  reset() {
    for (const field of this.fields.values()) {
      field.value.next('');
      field.errors.next([]);
      field.isValid.next(true);
    }
  }
}

// 常用验证器
const validators = {
  required: (message = '此字段是必需的') => {
    return (value) => {
      return value && value.trim() !== '' ? true : message;
    };
  },

  minLength: (min, message) => {
    return (value) => {
      const msg = message || `最少需要 ${min} 个字符`;
      return value && value.length >= min ? true : msg;
    };
  },

  maxLength: (max, message) => {
    return (value) => {
      const msg = message || `最多允许 ${max} 个字符`;
      return !value || value.length <= max ? true : msg;
    };
  },

  email: (message = '请输入有效的邮箱地址') => {
    const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
    return (value) => {
      return !value || emailRegex.test(value) ? true : message;
    };
  },

  pattern: (regex, message = '格式不正确') => {
    return (value) => {
      return !value || regex.test(value) ? true : message;
    };
  },

  async: (asyncValidator, message = '验证失败') => {
    return async (value) => {
      try {
        const result = await asyncValidator(value);
        return result ? true : message;
      } catch (error) {
        return error.message || message;
      }
    };
  }
};

module.exports = {
  ReactiveStore,
  ReactiveHTTPClient,
  ReactiveFormValidator,
  validators
};

🚀 实际应用示例

实时数据监控系统

javascript
// real-time-monitoring.js
const { 
  SimpleObservable, 
  SimpleSubject, 
  SimpleBehaviorSubject 
} = require('./simple-observable');

// 实时监控系统
class RealTimeMonitoringSystem {
  constructor() {
    this.metrics = new Map();
    this.alerts = new SimpleSubject();
    this.systemStatus = new SimpleBehaviorSubject('healthy');
    this.thresholds = new Map();
    
    this.setupMonitoring();
  }

  // 设置监控
  setupMonitoring() {
    // 监控所有指标变化
    this.getAllMetrics$()
      .subscribe({
        next: metrics => this.analyzeMetrics(metrics),
        error: error => console.error('指标分析错误:', error)
      });

    // 监控告警
    this.alerts
      .tap(alert => console.log(`🚨 告警: ${alert.type} - ${alert.message}`))
      .subscribe();
  }

  // 添加指标
  addMetric(name, initialValue = 0) {
    const metric = {
      name: name,
      value: new SimpleBehaviorSubject(initialValue),
      history: [],
      lastUpdated: Date.now()
    };

    // 记录历史数据
    metric.value.subscribe({
      next: value => {
        metric.history.push({
          value: value,
          timestamp: Date.now()
        });
        
        // 限制历史记录长度
        if (metric.history.length > 100) {
          metric.history.shift();
        }
        
        metric.lastUpdated = Date.now();
      }
    });

    this.metrics.set(name, metric);
    return metric;
  }

  // 更新指标值
  updateMetric(name, value) {
    const metric = this.metrics.get(name);
    if (metric) {
      metric.value.next(value);
    }
  }

  // 设置阈值
  setThreshold(metricName, config) {
    this.thresholds.set(metricName, config);
  }

  // 获取指标流
  getMetric$(name) {
    const metric = this.metrics.get(name);
    return metric ? metric.value.asObservable() : null;
  }

  // 获取所有指标流
  getAllMetrics$() {
    return SimpleObservable.interval(1000)
      .map(() => {
        const metrics = {};
        for (const [name, metric] of this.metrics) {
          metrics[name] = {
            name: name,
            value: metric.value.getValue(),
            lastUpdated: metric.lastUpdated,
            history: metric.history.slice(-10) // 最近10个值
          };
        }
        return metrics;
      });
  }

  // 分析指标
  analyzeMetrics(metrics) {
    for (const [name, metric] of Object.entries(metrics)) {
      const threshold = this.thresholds.get(name);
      if (!threshold) continue;

      this.checkThresholds(name, metric, threshold);
    }

    // 更新系统状态
    this.updateSystemStatus(metrics);
  }

  // 检查阈值
  checkThresholds(name, metric, threshold) {
    const value = metric.value;

    if (threshold.critical && value >= threshold.critical) {
      this.alerts.next({
        type: 'critical',
        metric: name,
        value: value,
        threshold: threshold.critical,
        message: `${name} 达到临界值: ${value}`,
        timestamp: Date.now()
      });
    } else if (threshold.warning && value >= threshold.warning) {
      this.alerts.next({
        type: 'warning',
        metric: name,
        value: value,
        threshold: threshold.warning,
        message: `${name} 达到警告值: ${value}`,
        timestamp: Date.now()
      });
    }
  }

  // 更新系统状态
  updateSystemStatus(metrics) {
    let status = 'healthy';
    
    for (const [name, metric] of Object.entries(metrics)) {
      const threshold = this.thresholds.get(name);
      if (!threshold) continue;

      if (threshold.critical && metric.value >= threshold.critical) {
        status = 'critical';
        break;
      } else if (threshold.warning && metric.value >= threshold.warning) {
        status = 'warning';
      }
    }

    if (status !== this.systemStatus.getValue()) {
      this.systemStatus.next(status);
      console.log(`📊 系统状态更新: ${status}`);
    }
  }

  // 获取告警流
  getAlerts$() {
    return this.alerts.asObservable();
  }

  // 获取系统状态流
  getSystemStatus$() {
    return this.systemStatus.asObservable();
  }

  // 获取统计信息
  getStatistics() {
    const stats = {};
    
    for (const [name, metric] of this.metrics) {
      const history = metric.history;
      if (history.length === 0) continue;

      const values = history.map(h => h.value);
      stats[name] = {
        current: metric.value.getValue(),
        min: Math.min(...values),
        max: Math.max(...values),
        avg: values.reduce((a, b) => a + b, 0) / values.length,
        trend: this.calculateTrend(values)
      };
    }

    return stats;
  }

  // 计算趋势
  calculateTrend(values) {
    if (values.length < 2) return 'stable';
    
    const recent = values.slice(-5);
    const older = values.slice(-10, -5);
    
    if (recent.length === 0 || older.length === 0) return 'stable';
    
    const recentAvg = recent.reduce((a, b) => a + b, 0) / recent.length;
    const olderAvg = older.reduce((a, b) => a + b, 0) / older.length;
    
    const change = (recentAvg - olderAvg) / olderAvg;
    
    if (change > 0.1) return 'increasing';
    if (change < -0.1) return 'decreasing';
    return 'stable';
  }
}

// 使用示例
async function demonstrateReactiveProgramming() {
  console.log('🌊 响应式编程演示...\n');

  // 1. 基础Observable操作
  console.log('1. 基础Observable操作:');
  
  const numbers$ = SimpleObservable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  
  const result = await numbers$
    .filter(x => x % 2 === 0)
    .map(x => x * x)
    .take(3)
    .toArray();
  
  console.log('  过滤偶数并平方,取前3个:', result);

  // 2. 实时监控系统
  console.log('\n2. 实时监控系统:');
  
  const monitoring = new RealTimeMonitoringSystem();
  
  // 添加指标
  monitoring.addMetric('cpu_usage', 0);
  monitoring.addMetric('memory_usage', 0);
  monitoring.addMetric('response_time', 0);
  
  // 设置阈值
  monitoring.setThreshold('cpu_usage', { warning: 70, critical: 90 });
  monitoring.setThreshold('memory_usage', { warning: 80, critical: 95 });
  monitoring.setThreshold('response_time', { warning: 500, critical: 1000 });

  // 订阅告警
  monitoring.getAlerts$()
    .take(3)
    .subscribe({
      next: alert => console.log(`  🚨 ${alert.type}: ${alert.message}`)
    });

  // 订阅系统状态
  monitoring.getSystemStatus$()
    .distinctUntilChanged()
    .take(3)
    .subscribe({
      next: status => console.log(`  📊 系统状态: ${status}`)
    });

  // 模拟指标更新
  setTimeout(() => monitoring.updateMetric('cpu_usage', 75), 100);
  setTimeout(() => monitoring.updateMetric('memory_usage', 85), 200);
  setTimeout(() => monitoring.updateMetric('response_time', 1200), 300);

  // 等待一段时间观察结果
  await new Promise(resolve => setTimeout(resolve, 1000));

  console.log('\n  📈 统计信息:');
  const stats = monitoring.getStatistics();
  for (const [name, stat] of Object.entries(stats)) {
    console.log(`    ${name}: 当前=${stat.current}, 平均=${stat.avg.toFixed(2)}, 趋势=${stat.trend}`);
  }

  // 3. 数据流合并
  console.log('\n3. 数据流合并:');
  
  const stream1$ = SimpleObservable.interval(300).map(x => `A${x}`).take(3);
  const stream2$ = SimpleObservable.interval(500).map(x => `B${x}`).take(3);
  
  const merged = await stream1$
    .merge(stream2$)
    .toArray();
  
  console.log('  合并流结果:', merged);

  // 4. 错误处理和重试
  console.log('\n4. 错误处理和重试:');
  
  let attempts = 0;
  const unreliable$ = SimpleObservable.create(observer => {
    attempts++;
    if (attempts < 3) {
      observer.error(new Error(`尝试 ${attempts} 失败`));
    } else {
      observer.next(`成功在第 ${attempts} 次尝试`);
      observer.complete();
    }
  });

  try {
    const retryResult = await unreliable$
      .retry(3)
      .toArray();
    
    console.log('  重试结果:', retryResult);
  } catch (error) {
    console.log('  最终失败:', error.message);
  }
}

// 如果直接运行此文件
if (require.main === module) {
  demonstrateReactiveProgramming().catch(console.error);
}

module.exports = {
  RealTimeMonitoringSystem,
  demonstrateReactiveProgramming
};

响应式编程为处理复杂的异步数据流提供了强大而优雅的解决方案,是现代JavaScript应用开发的重要技术!