Skip to content

# 缓存一致性

## 概述

缓存一致性是分布式系统中的核心挑战之一,指的是确保缓存中的数据与数据库中的数据保持同步。不同的一致性级别和实现策略适用于不同的业务场景,需要在性能、可用性和一致性之间做出权衡。

## 一致性级别

### 1. 强一致性

```javascript
// 强一致性缓存实现
class StrongConsistencyCache {
  constructor(cache, database) {
    this.cache = cache;
    this.database = database;
    this.lockManager = new Map(); // 分布式锁管理
  }
  
  async get(key) {
    // 获取读锁
    const lock = await this.acquireReadLock(key);
    
    try {
      // 先从缓存读取
      let data = await this.cache.get(key);
      
      if (data === null) {
        // 缓存未命中,从数据库读取
        data = await this.database.get(key);
        
        if (data !== null) {
          // 写入缓存
          await this.cache.set(key, data, 3600);
        }
      }
      
      return data;
    } finally {
      await this.releaseReadLock(key, lock);
    }
  }
  
  async set(key, data) {
    // 获取写锁
    const lock = await this.acquireWriteLock(key);
    
    try {
      // 先更新数据库
      await this.database.set(key, data);
      
      // 然后更新缓存
      await this.cache.set(key, data, 3600);
      
      return data;
    } finally {
      await this.releaseWriteLock(key, lock);
    }
  }
  
  async delete(key) {
    const lock = await this.acquireWriteLock(key);
    
    try {
      // 先删除数据库记录
      await this.database.delete(key);
      
      // 然后删除缓存
      await this.cache.delete(key);
    } finally {
      await this.releaseWriteLock(key, lock);
    }
  }
  
  async acquireReadLock(key) {
    const lockKey = `read_lock:${key}`;\n    const lockValue = `${Date.now()}_${Math.random()}`;\n    \n    // 简化的读锁实现(实际应该使用Redis等分布式锁)\n    const acquired = await this.cache.set(lockKey, lockValue, 10000, 'NX');\n    \n    if (!acquired) {\n      // 等待并重试\n      await new Promise(resolve => setTimeout(resolve, 10));\n      return await this.acquireReadLock(key);\n    }\n    \n    return { lockKey, lockValue };\n  }\n  \n  async acquireWriteLock(key) {\n    const lockKey = `write_lock:${key}`;\n    const lockValue = `${Date.now()}_${Math.random()}`;\n    \n    const acquired = await this.cache.set(lockKey, lockValue, 10000, 'NX');\n    \n    if (!acquired) {\n      await new Promise(resolve => setTimeout(resolve, 10));\n      return await this.acquireWriteLock(key);\n    }\n    \n    return { lockKey, lockValue };\n  }\n  \n  async releaseReadLock(key, lock) {\n    await this.cache.delete(lock.lockKey);\n  }\n  \n  async releaseWriteLock(key, lock) {\n    await this.cache.delete(lock.lockKey);\n  }\n}\n```\n\n### 2. 
最终一致性\n```javascript\n// 最终一致性缓存实现\nclass EventualConsistencyCache {\n  constructor(cache, database, eventBus) {\n    this.cache = cache;\n    this.database = database;\n    this.eventBus = eventBus;\n    this.inconsistencyWindow = 5000; // 5秒不一致窗口\n    \n    this.setupEventHandlers();\n  }\n  \n  setupEventHandlers() {\n    this.eventBus.on('data.changed', async (event) => {\n      await this.handleDataChange(event);\n    });\n    \n    this.eventBus.on('cache.invalidate', async (event) => {\n      await this.handleCacheInvalidation(event);\n    });\n  }\n  \n  async get(key) {\n    // 直接从缓存读取(可能不是最新数据)\n    let data = await this.cache.get(key);\n    \n    if (data === null) {\n      // 缓存未命中,从数据库读取\n      data = await this.database.get(key);\n      \n      if (data !== null) {\n        // 异步写入缓存\n        this.cache.set(key, data, 3600).catch(error => {\n          console.error('缓存写入失败:', error);\n        });\n      }\n    }\n    \n    return data;\n  }\n  \n  async set(key, data) {\n    // 立即更新数据库\n    await this.database.set(key, data);\n    \n    // 发布数据变更事件\n    this.eventBus.emit('data.changed', {\n      key,\n      data,\n      timestamp: Date.now(),\n      operation: 'set'\n    });\n    \n    return data;\n  }\n  \n  async handleDataChange(event) {\n    // 延迟更新缓存,允许短暂的不一致\n    setTimeout(async () => {\n      try {\n        if (event.operation === 'set') {\n          await this.cache.set(event.key, event.data, 3600);\n        } else if (event.operation === 'delete') {\n          await this.cache.delete(event.key);\n        }\n        \n        console.log(`缓存已同步: ${event.key}`);\n      } catch (error) {\n        console.error('缓存同步失败:', error);\n        // 可以实现重试机制\n      }\n    }, Math.random() * this.inconsistencyWindow);\n  }\n  \n  async handleCacheInvalidation(event) {\n    // 处理缓存失效\n    if (event.pattern) {\n      // 模式匹配失效\n      const keys = await this.cache.keys(event.pattern);\n      for (const key of keys) {\n        await 
this.cache.delete(key);\n      }\n    } else if (event.keys) {\n      // 批量失效\n      for (const key of event.keys) {\n        await this.cache.delete(key);\n      }\n    }\n  }\n}\n```\n\n## 一致性实现策略\n\n### 1. 版本号机制\n```javascript\n// 基于版本号的一致性控制\nclass VersionBasedConsistency {\n  constructor(cache, database) {\n    this.cache = cache;\n    this.database = database;\n  }\n  \n  async get(key) {\n    // 获取缓存数据和版本\n    const [cachedData, cachedVersion] = await Promise.all([\n      this.cache.get(`data:${key}`),\n      this.cache.get(`version:${key}`)\n    ]);\n    \n    if (cachedData && cachedVersion) {\n      // 验证版本是否最新\n      const currentVersion = await this.database.getVersion(key);\n      \n      if (cachedVersion === currentVersion) {\n        return cachedData;\n      }\n      \n      // 版本不匹配,清理旧缓存\n      await this.invalidateCache(key);\n    }\n    \n    // 从数据库加载最新数据\n    const [data, version] = await Promise.all([\n      this.database.get(key),\n      this.database.getVersion(key)\n    ]);\n    \n    if (data) {\n      // 缓存数据和版本\n      await Promise.all([\n        this.cache.set(`data:${key}`, data, 3600),\n        this.cache.set(`version:${key}`, version, 3600)\n      ]);\n    }\n    \n    return data;\n  }\n  \n  async set(key, data) {\n    // 生成新版本号\n    const newVersion = this.generateVersion();\n    \n    // 使用事务更新数据库\n    await this.database.transaction(async (tx) => {\n      await tx.set(key, data);\n      await tx.setVersion(key, newVersion);\n    });\n    \n    // 更新缓存\n    await Promise.all([\n      this.cache.set(`data:${key}`, data, 3600),\n      this.cache.set(`version:${key}`, newVersion, 3600)\n    ]);\n    \n    return data;\n  }\n  \n  generateVersion() {\n    return `${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;\n  }\n  \n  async invalidateCache(key) {\n    await Promise.all([\n      this.cache.delete(`data:${key}`),\n      this.cache.delete(`version:${key}`)\n    ]);\n  }\n}\n```\n\n### 2. 
时间戳机制\n```javascript\n// 基于时间戳的一致性控制\nclass TimestampBasedConsistency {\n  constructor(cache, database, options = {}) {\n    this.cache = cache;\n    this.database = database;\n    this.maxAge = options.maxAge || 300000; // 5分钟最大缓存时间\n    this.staleWhileRevalidate = options.staleWhileRevalidate || 60000; // 1分钟过期后仍可使用\n  }\n  \n  async get(key) {\n    const cached = await this.cache.get(key);\n    \n    if (cached) {\n      const age = Date.now() - cached.timestamp;\n      \n      if (age < this.maxAge) {\n        // 数据新鲜,直接返回\n        return cached.data;\n      } else if (age < this.maxAge + this.staleWhileRevalidate) {\n        // 数据过期但在容忍范围内,异步更新\n        this.revalidateInBackground(key);\n        return cached.data;\n      }\n    }\n    \n    // 数据过期或不存在,同步加载\n    return await this.loadAndCache(key);\n  }\n  \n  async set(key, data) {\n    const timestamp = Date.now();\n    \n    // 更新数据库\n    await this.database.set(key, data, timestamp);\n    \n    // 更新缓存\n    await this.cache.set(key, {\n      data,\n      timestamp\n    }, this.maxAge + this.staleWhileRevalidate);\n    \n    return data;\n  }\n  \n  async loadAndCache(key) {\n    const data = await this.database.get(key);\n    \n    if (data) {\n      const cached = {\n        data: data.value,\n        timestamp: data.timestamp || Date.now()\n      };\n      \n      await this.cache.set(key, cached, this.maxAge + this.staleWhileRevalidate);\n      return cached.data;\n    }\n    \n    return null;\n  }\n  \n  async revalidateInBackground(key) {\n    try {\n      await this.loadAndCache(key);\n      console.log(`后台重新验证完成: ${key}`);\n    } catch (error) {\n      console.error(`后台重新验证失败: ${key}`, error);\n    }\n  }\n}\n```\n\n### 3. 
事件驱动同步\n```javascript\n// 事件驱动的缓存同步\nclass EventDrivenSync {\n  constructor(cache, database, eventBus) {\n    this.cache = cache;\n    this.database = database;\n    this.eventBus = eventBus;\n    this.syncQueue = [];\n    this.isProcessing = false;\n    \n    this.setupEventHandlers();\n    this.startSyncProcessor();\n  }\n  \n  setupEventHandlers() {\n    // 监听数据变更事件\n    this.eventBus.on('db.insert', (event) => this.queueSync(event));\n    this.eventBus.on('db.update', (event) => this.queueSync(event));\n    this.eventBus.on('db.delete', (event) => this.queueSync(event));\n    \n    // 监听缓存失效事件\n    this.eventBus.on('cache.invalidate', (event) => this.handleInvalidation(event));\n  }\n  \n  async get(key) {\n    return await this.cache.get(key);\n  }\n  \n  async set(key, data) {\n    // 更新数据库\n    await this.database.set(key, data);\n    \n    // 发布变更事件\n    this.eventBus.emit('db.update', {\n      key,\n      data,\n      timestamp: Date.now(),\n      source: 'api'\n    });\n    \n    return data;\n  }\n  \n  queueSync(event) {\n    this.syncQueue.push({\n      ...event,\n      queuedAt: Date.now()\n    });\n  }\n  \n  startSyncProcessor() {\n    setInterval(async () => {\n      if (!this.isProcessing && this.syncQueue.length > 0) {\n        await this.processSyncQueue();\n      }\n    }, 100); // 每100ms处理一次\n  }\n  \n  async processSyncQueue() {\n    this.isProcessing = true;\n    \n    try {\n      const batch = this.syncQueue.splice(0, 10); // 批量处理\n      \n      for (const event of batch) {\n        await this.syncCacheFromEvent(event);\n      }\n    } catch (error) {\n      console.error('同步处理失败:', error);\n    } finally {\n      this.isProcessing = false;\n    }\n  }\n  \n  async syncCacheFromEvent(event) {\n    try {\n      switch (event.type) {\n        case 'db.insert':\n        case 'db.update':\n          await this.cache.set(event.key, event.data, 3600);\n          break;\n        case 'db.delete':\n          await this.cache.delete(event.key);\n   
       break;\n      }\n      \n      console.log(`缓存同步完成: ${event.key}`);\n    } catch (error) {\n      console.error(`缓存同步失败: ${event.key}`, error);\n      \n      // 重试机制\n      if (event.retries < 3) {\n        this.syncQueue.push({\n          ...event,\n          retries: (event.retries || 0) + 1\n        });\n      }\n    }\n  }\n  \n  async handleInvalidation(event) {\n    if (event.keys) {\n      for (const key of event.keys) {\n        await this.cache.delete(key);\n      }\n    } else if (event.pattern) {\n      const keys = await this.cache.keys(event.pattern);\n      for (const key of keys) {\n        await this.cache.delete(key);\n      }\n    }\n  }\n}\n```\n\n## 一致性检测和修复\n\n### 1. 一致性检测\n```javascript\n// 缓存一致性检测器\nclass ConsistencyChecker {\n  constructor(cache, database) {\n    this.cache = cache;\n    this.database = database;\n    this.inconsistencies = [];\n  }\n  \n  // 检查单个键的一致性\n  async checkKey(key) {\n    const [cachedData, dbData] = await Promise.all([\n      this.cache.get(key),\n      this.database.get(key)\n    ]);\n    \n    const isConsistent = this.compareData(cachedData, dbData);\n    \n    if (!isConsistent) {\n      const inconsistency = {\n        key,\n        cachedData,\n        dbData,\n        detectedAt: new Date(),\n        type: this.getInconsistencyType(cachedData, dbData)\n      };\n      \n      this.inconsistencies.push(inconsistency);\n      return inconsistency;\n    }\n    \n    return null;\n  }\n  \n  // 批量检查一致性\n  async checkKeys(keys) {\n    const results = [];\n    const batchSize = 50;\n    \n    for (let i = 0; i < keys.length; i += batchSize) {\n      const batch = keys.slice(i, i + batchSize);\n      const batchResults = await Promise.all(\n        batch.map(key => this.checkKey(key))\n      );\n      \n      results.push(...batchResults.filter(Boolean));\n    }\n    \n    return results;\n  }\n  \n  // 全量一致性检查\n  async fullConsistencyCheck() {\n    console.log('开始全量一致性检查...');\n    const startTime = 
Date.now();\n    \n    // 获取所有缓存键\n    const cacheKeys = await this.cache.keys('*');\n    \n    // 检查缓存中的键\n    const cacheInconsistencies = await this.checkKeys(cacheKeys);\n    \n    // 检查数据库中存在但缓存中不存在的数据\n    const dbKeys = await this.database.getAllKeys();\n    const missingInCache = dbKeys.filter(key => !cacheKeys.includes(key));\n    \n    const missingInconsistencies = missingInCache.map(key => ({\n      key,\n      cachedData: null,\n      dbData: 'exists',\n      detectedAt: new Date(),\n      type: 'missing_in_cache'\n    }));\n    \n    const allInconsistencies = [...cacheInconsistencies, ...missingInconsistencies];\n    \n    const duration = Date.now() - startTime;\n    console.log(`一致性检查完成,耗时${duration}ms,发现${allInconsistencies.length}个不一致`);\n    \n    return {\n      totalChecked: cacheKeys.length + missingInCache.length,\n      inconsistencies: allInconsistencies,\n      duration,\n      checkTime: new Date()\n    };\n  }\n  \n  compareData(cachedData, dbData) {\n    if (cachedData === null && dbData === null) return true;\n    if (cachedData === null || dbData === null) return false;\n    \n    // 深度比较(简化实现)\n    return JSON.stringify(cachedData) === JSON.stringify(dbData);\n  }\n  \n  getInconsistencyType(cachedData, dbData) {\n    if (cachedData === null && dbData !== null) return 'missing_in_cache';\n    if (cachedData !== null && dbData === null) return 'extra_in_cache';\n    return 'data_mismatch';\n  }\n  \n  // 获取一致性报告\n  getConsistencyReport() {\n    const typeCount = {};\n    this.inconsistencies.forEach(inc => {\n      typeCount[inc.type] = (typeCount[inc.type] || 0) + 1;\n    });\n    \n    return {\n      totalInconsistencies: this.inconsistencies.length,\n      byType: typeCount,\n      recent: this.inconsistencies.slice(-10),\n      oldestInconsistency: this.inconsistencies[0]?.detectedAt,\n      newestInconsistency: this.inconsistencies[this.inconsistencies.length - 1]?.detectedAt\n    };\n  }\n}\n```\n\n### 2. 
自动修复\n```javascript\n// 缓存一致性自动修复器\nclass ConsistencyRepairer {\n  constructor(cache, database, checker) {\n    this.cache = cache;\n    this.database = database;\n    this.checker = checker;\n    this.repairStrategies = new Map();\n    \n    this.initializeRepairStrategies();\n  }\n  \n  initializeRepairStrategies() {\n    this.repairStrategies.set('missing_in_cache', this.repairMissingInCache.bind(this));\n    this.repairStrategies.set('extra_in_cache', this.repairExtraInCache.bind(this));\n    this.repairStrategies.set('data_mismatch', this.repairDataMismatch.bind(this));\n  }\n  \n  // 修复单个不一致\n  async repairInconsistency(inconsistency) {\n    const strategy = this.repairStrategies.get(inconsistency.type);\n    \n    if (!strategy) {\n      throw new Error(`未知的不一致类型: ${inconsistency.type}`);\n    }\n    \n    try {\n      await strategy(inconsistency);\n      console.log(`修复完成: ${inconsistency.key} (${inconsistency.type})`);\n      return true;\n    } catch (error) {\n      console.error(`修复失败: ${inconsistency.key}`, error);\n      return false;\n    }\n  }\n  \n  // 批量修复\n  async repairInconsistencies(inconsistencies) {\n    const results = {\n      total: inconsistencies.length,\n      repaired: 0,\n      failed: 0,\n      errors: []\n    };\n    \n    for (const inconsistency of inconsistencies) {\n      try {\n        const success = await this.repairInconsistency(inconsistency);\n        if (success) {\n          results.repaired++;\n        } else {\n          results.failed++;\n        }\n      } catch (error) {\n        results.failed++;\n        results.errors.push({\n          key: inconsistency.key,\n          error: error.message\n        });\n      }\n    }\n    \n    return results;\n  }\n  \n  // 修复缓存中缺失的数据\n  async repairMissingInCache(inconsistency) {\n    const dbData = await this.database.get(inconsistency.key);\n    if (dbData !== null) {\n      await this.cache.set(inconsistency.key, dbData, 3600);\n    }\n  }\n  \n  // 修复缓存中多余的数据\n  async 
repairExtraInCache(inconsistency) {\n    await this.cache.delete(inconsistency.key);\n  }\n  \n  // 修复数据不匹配\n  async repairDataMismatch(inconsistency) {\n    // 默认以数据库为准\n    const dbData = await this.database.get(inconsistency.key);\n    \n    if (dbData !== null) {\n      await this.cache.set(inconsistency.key, dbData, 3600);\n    } else {\n      await this.cache.delete(inconsistency.key);\n    }\n  }\n  \n  // 自动修复流程\n  async autoRepair(options = {}) {\n    console.log('开始自动一致性修复...');\n    \n    // 1. 执行一致性检查\n    const checkResult = await this.checker.fullConsistencyCheck();\n    \n    if (checkResult.inconsistencies.length === 0) {\n      console.log('未发现一致性问题');\n      return { repaired: 0, failed: 0 };\n    }\n    \n    // 2. 过滤需要修复的不一致\n    let toRepair = checkResult.inconsistencies;\n    \n    if (options.maxRepairs) {\n      toRepair = toRepair.slice(0, options.maxRepairs);\n    }\n    \n    if (options.repairTypes) {\n      toRepair = toRepair.filter(inc => options.repairTypes.includes(inc.type));\n    }\n    \n    // 3. 执行修复\n    const repairResult = await this.repairInconsistencies(toRepair);\n    \n    console.log(`自动修复完成: 修复${repairResult.repaired}个,失败${repairResult.failed}个`);\n    \n    return repairResult;\n  }\n}\n```\n\n## 监控和告警\n\n### 1. 
一致性监控\n```javascript\n// 一致性监控系统\nclass ConsistencyMonitor {\n  constructor(checker, repairer, options = {}) {\n    this.checker = checker;\n    this.repairer = repairer;\n    \n    this.options = {\n      checkInterval: options.checkInterval || 300000, // 5分钟\n      alertThreshold: options.alertThreshold || 10,\n      autoRepair: options.autoRepair || false,\n      maxAutoRepairs: options.maxAutoRepairs || 100\n    };\n    \n    this.metrics = {\n      checksPerformed: 0,\n      inconsistenciesFound: 0,\n      repairsAttempted: 0,\n      repairsSucceeded: 0,\n      lastCheckTime: null,\n      lastInconsistencyCount: 0\n    };\n    \n    this.alerts = [];\n  }\n  \n  // 启动监控\n  startMonitoring() {\n    console.log('启动一致性监控...');\n    \n    this.monitorInterval = setInterval(async () => {\n      await this.performCheck();\n    }, this.options.checkInterval);\n  }\n  \n  // 停止监控\n  stopMonitoring() {\n    if (this.monitorInterval) {\n      clearInterval(this.monitorInterval);\n      console.log('一致性监控已停止');\n    }\n  }\n  \n  // 执行检查\n  async performCheck() {\n    try {\n      const checkResult = await this.checker.fullConsistencyCheck();\n      \n      this.metrics.checksPerformed++;\n      this.metrics.inconsistenciesFound += checkResult.inconsistencies.length;\n      this.metrics.lastCheckTime = new Date();\n      this.metrics.lastInconsistencyCount = checkResult.inconsistencies.length;\n      \n      // 检查是否需要告警\n      if (checkResult.inconsistencies.length > this.options.alertThreshold) {\n        this.triggerAlert('HIGH_INCONSISTENCY', {\n          count: checkResult.inconsistencies.length,\n          threshold: this.options.alertThreshold\n        });\n      }\n      \n      // 自动修复\n      if (this.options.autoRepair && checkResult.inconsistencies.length > 0) {\n        const repairResult = await this.repairer.repairInconsistencies(\n          checkResult.inconsistencies.slice(0, this.options.maxAutoRepairs)\n        );\n        \n        
this.metrics.repairsAttempted += repairResult.total;\n        this.metrics.repairsSucceeded += repairResult.repaired;\n        \n        if (repairResult.failed > 0) {\n          this.triggerAlert('REPAIR_FAILURES', {\n            failed: repairResult.failed,\n            errors: repairResult.errors\n          });\n        }\n      }\n      \n    } catch (error) {\n      console.error('一致性检查失败:', error);\n      this.triggerAlert('CHECK_FAILURE', { error: error.message });\n    }\n  }\n  \n  // 触发告警\n  triggerAlert(type, data) {\n    const alert = {\n      type,\n      data,\n      timestamp: new Date(),\n      severity: this.getAlertSeverity(type)\n    };\n    \n    this.alerts.push(alert);\n    console.warn('一致性告警:', alert);\n    \n    // 保持最近50条告警\n    if (this.alerts.length > 50) {\n      this.alerts.shift();\n    }\n    \n    // 发送通知(邮件、Slack等)\n    this.sendNotification(alert);\n  }\n  \n  getAlertSeverity(type) {\n    const severityMap = {\n      'HIGH_INCONSISTENCY': 'warning',\n      'REPAIR_FAILURES': 'error',\n      'CHECK_FAILURE': 'critical'\n    };\n    return severityMap[type] || 'info';\n  }\n  \n  async sendNotification(alert) {\n    // 实现通知发送逻辑\n    console.log('发送一致性告警通知:', alert.type);\n  }\n  \n  // 获取监控报告\n  getMonitoringReport() {\n    const consistencyRate = this.metrics.checksPerformed > 0 ? 
\n      ((this.metrics.checksPerformed * 100 - this.metrics.inconsistenciesFound) / (this.metrics.checksPerformed * 100)) * 100 : 100;\n    \n    const repairSuccessRate = this.metrics.repairsAttempted > 0 ?\n      (this.metrics.repairsSucceeded / this.metrics.repairsAttempted) * 100 : 0;\n    \n    return {\n      metrics: this.metrics,\n      consistencyRate: consistencyRate.toFixed(2) + '%',\n      repairSuccessRate: repairSuccessRate.toFixed(2) + '%',\n      recentAlerts: this.alerts.slice(-10),\n      status: this.getOverallStatus()\n    };\n  }\n  \n  getOverallStatus() {\n    if (this.metrics.lastInconsistencyCount > this.options.alertThreshold) {\n      return 'critical';\n    }\n    \n    if (this.metrics.lastInconsistencyCount > 0) {\n      return 'warning';\n    }\n    \n    return 'healthy';\n  }\n}\n```\n\n## 总结\n\n缓存一致性管理的关键要点:\n\n1. **一致性级别选择**:根据业务需求选择强一致性或最终一致性\n2. **实现策略**:版本号、时间戳、事件驱动等机制\n3. **检测机制**:定期检查缓存与数据库的一致性\n4. **自动修复**:建立自动化的不一致修复流程\n5. **监控告警**:实时监控一致性状态并及时告警\n6. **性能平衡**:在一致性和性能之间找到合适的平衡点\n\n选择合适的一致性策略需要考虑业务特性、性能要求、数据重要性等因素。