缓存一致性
概述
缓存一致性是分布式系统中的核心挑战之一,指的是确保缓存中的数据与数据库中的数据保持同步。不同的一致性级别和实现策略适用于不同的业务场景,需要在性能、可用性和一致性之间做出权衡。
一致性级别
1. 强一致性
javascript
// 强一致性缓存实现
class StrongConsistencyCache {
constructor(cache, database) {
this.cache = cache;
this.database = database;
this.lockManager = new Map(); // 分布式锁管理
}
async get(key) {
// 获取读锁
const lock = await this.acquireReadLock(key);
try {
// 先从缓存读取
let data = await this.cache.get(key);
if (data === null) {
// 缓存未命中,从数据库读取
data = await this.database.get(key);
if (data !== null) {
// 写入缓存
await this.cache.set(key, data, 3600);
}
}
return data;
} finally {
await this.releaseReadLock(key, lock);
}
}
async set(key, data) {
// 获取写锁
const lock = await this.acquireWriteLock(key);
try {
// 先更新数据库
await this.database.set(key, data);
// 然后更新缓存
await this.cache.set(key, data, 3600);
return data;
} finally {
await this.releaseWriteLock(key, lock);
}
}
async delete(key) {
const lock = await this.acquireWriteLock(key);
try {
// 先删除数据库记录
await this.database.delete(key);
// 然后删除缓存
await this.cache.delete(key);
} finally {
await this.releaseWriteLock(key, lock);
}
}
async acquireReadLock(key) {
const lockKey = `read_lock:${key}`;\n const lockValue = `${Date.now()}_${Math.random()}`;\n \n // 简化的读锁实现(实际应该使用Redis等分布式锁)\n const acquired = await this.cache.set(lockKey, lockValue, 10000, 'NX');\n \n if (!acquired) {\n // 等待并重试\n await new Promise(resolve => setTimeout(resolve, 10));\n return await this.acquireReadLock(key);\n }\n \n return { lockKey, lockValue };\n }\n \n async acquireWriteLock(key) {\n const lockKey = `write_lock:${key}`;\n const lockValue = `${Date.now()}_${Math.random()}`;\n \n const acquired = await this.cache.set(lockKey, lockValue, 10000, 'NX');\n \n if (!acquired) {\n await new Promise(resolve => setTimeout(resolve, 10));\n return await this.acquireWriteLock(key);\n }\n \n return { lockKey, lockValue };\n }\n \n async releaseReadLock(key, lock) {\n await this.cache.delete(lock.lockKey);\n }\n \n async releaseWriteLock(key, lock) {\n await this.cache.delete(lock.lockKey);\n }\n}\n```\n\n### 2. 最终一致性\n```javascript\n// 最终一致性缓存实现\nclass EventualConsistencyCache {\n constructor(cache, database, eventBus) {\n this.cache = cache;\n this.database = database;\n this.eventBus = eventBus;\n this.inconsistencyWindow = 5000; // 5秒不一致窗口\n \n this.setupEventHandlers();\n }\n \n setupEventHandlers() {\n this.eventBus.on('data.changed', async (event) => {\n await this.handleDataChange(event);\n });\n \n this.eventBus.on('cache.invalidate', async (event) => {\n await this.handleCacheInvalidation(event);\n });\n }\n \n async get(key) {\n // 直接从缓存读取(可能不是最新数据)\n let data = await this.cache.get(key);\n \n if (data === null) {\n // 缓存未命中,从数据库读取\n data = await this.database.get(key);\n \n if (data !== null) {\n // 异步写入缓存\n this.cache.set(key, data, 3600).catch(error => {\n console.error('缓存写入失败:', error);\n });\n }\n }\n \n return data;\n }\n \n async set(key, data) {\n // 立即更新数据库\n await this.database.set(key, data);\n \n // 发布数据变更事件\n this.eventBus.emit('data.changed', {\n key,\n data,\n timestamp: Date.now(),\n operation: 'set'\n });\n \n return data;\n }\n \n async handleDataChange(event) {\n // 延迟更新缓存,允许短暂的不一致\n setTimeout(async () => {\n try {\n if (event.operation === 'set') {\n await this.cache.set(event.key, event.data, 3600);\n } else if (event.operation === 'delete') {\n await this.cache.delete(event.key);\n }\n \n console.log(`缓存已同步: ${event.key}`);\n } catch (error) {\n console.error('缓存同步失败:', error);\n // 可以实现重试机制\n }\n }, Math.random() * this.inconsistencyWindow);\n }\n \n async handleCacheInvalidation(event) {\n // 处理缓存失效\n if (event.pattern) {\n // 模式匹配失效\n const keys = await this.cache.keys(event.pattern);\n for (const key of keys) {\n await this.cache.delete(key);\n }\n } else if (event.keys) {\n // 批量失效\n for (const key of event.keys) {\n await this.cache.delete(key);\n }\n }\n }\n}\n```\n\n## 一致性实现策略\n\n### 1. 版本号机制\n```javascript\n// 基于版本号的一致性控制\nclass VersionBasedConsistency {\n constructor(cache, database) {\n this.cache = cache;\n this.database = database;\n }\n \n async get(key) {\n // 获取缓存数据和版本\n const [cachedData, cachedVersion] = await Promise.all([\n this.cache.get(`data:${key}`),\n this.cache.get(`version:${key}`)\n ]);\n \n if (cachedData && cachedVersion) {\n // 验证版本是否最新\n const currentVersion = await this.database.getVersion(key);\n \n if (cachedVersion === currentVersion) {\n return cachedData;\n }\n \n // 版本不匹配,清理旧缓存\n await this.invalidateCache(key);\n }\n \n // 从数据库加载最新数据\n const [data, version] = await Promise.all([\n this.database.get(key),\n this.database.getVersion(key)\n ]);\n \n if (data) {\n // 缓存数据和版本\n await Promise.all([\n this.cache.set(`data:${key}`, data, 3600),\n this.cache.set(`version:${key}`, version, 3600)\n ]);\n }\n \n return data;\n }\n \n async set(key, data) {\n // 生成新版本号\n const newVersion = this.generateVersion();\n \n // 使用事务更新数据库\n await this.database.transaction(async (tx) => {\n await tx.set(key, data);\n await tx.setVersion(key, newVersion);\n });\n \n // 更新缓存\n await Promise.all([\n this.cache.set(`data:${key}`, data, 3600),\n this.cache.set(`version:${key}`, newVersion, 3600)\n ]);\n \n return data;\n }\n \n generateVersion() {\n return `${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;\n }\n \n async invalidateCache(key) {\n await Promise.all([\n this.cache.delete(`data:${key}`),\n this.cache.delete(`version:${key}`)\n ]);\n }\n}\n```\n\n### 2. 时间戳机制\n```javascript\n// 基于时间戳的一致性控制\nclass TimestampBasedConsistency {\n constructor(cache, database, options = {}) {\n this.cache = cache;\n this.database = database;\n this.maxAge = options.maxAge || 300000; // 5分钟最大缓存时间\n this.staleWhileRevalidate = options.staleWhileRevalidate || 60000; // 1分钟过期后仍可使用\n }\n \n async get(key) {\n const cached = await this.cache.get(key);\n \n if (cached) {\n const age = Date.now() - cached.timestamp;\n \n if (age < this.maxAge) {\n // 数据新鲜,直接返回\n return cached.data;\n } else if (age < this.maxAge + this.staleWhileRevalidate) {\n // 数据过期但在容忍范围内,异步更新\n this.revalidateInBackground(key);\n return cached.data;\n }\n }\n \n // 数据过期或不存在,同步加载\n return await this.loadAndCache(key);\n }\n \n async set(key, data) {\n const timestamp = Date.now();\n \n // 更新数据库\n await this.database.set(key, data, timestamp);\n \n // 更新缓存\n await this.cache.set(key, {\n data,\n timestamp\n }, this.maxAge + this.staleWhileRevalidate);\n \n return data;\n }\n \n async loadAndCache(key) {\n const data = await this.database.get(key);\n \n if (data) {\n const cached = {\n data: data.value,\n timestamp: data.timestamp || Date.now()\n };\n \n await this.cache.set(key, cached, this.maxAge + this.staleWhileRevalidate);\n return cached.data;\n }\n \n return null;\n }\n \n async revalidateInBackground(key) {\n try {\n await this.loadAndCache(key);\n console.log(`后台重新验证完成: ${key}`);\n } catch (error) {\n console.error(`后台重新验证失败: ${key}`, error);\n }\n }\n}\n```\n\n### 3. 事件驱动同步\n```javascript\n// 事件驱动的缓存同步\nclass EventDrivenSync {\n constructor(cache, database, eventBus) {\n this.cache = cache;\n this.database = database;\n this.eventBus = eventBus;\n this.syncQueue = [];\n this.isProcessing = false;\n \n this.setupEventHandlers();\n this.startSyncProcessor();\n }\n \n setupEventHandlers() {\n // 监听数据变更事件\n this.eventBus.on('db.insert', (event) => this.queueSync(event));\n this.eventBus.on('db.update', (event) => this.queueSync(event));\n this.eventBus.on('db.delete', (event) => this.queueSync(event));\n \n // 监听缓存失效事件\n this.eventBus.on('cache.invalidate', (event) => this.handleInvalidation(event));\n }\n \n async get(key) {\n return await this.cache.get(key);\n }\n \n async set(key, data) {\n // 更新数据库\n await this.database.set(key, data);\n \n // 发布变更事件\n this.eventBus.emit('db.update', {\n key,\n data,\n timestamp: Date.now(),\n source: 'api'\n });\n \n return data;\n }\n \n queueSync(event) {\n this.syncQueue.push({\n ...event,\n queuedAt: Date.now()\n });\n }\n \n startSyncProcessor() {\n setInterval(async () => {\n if (!this.isProcessing && this.syncQueue.length > 0) {\n await this.processSyncQueue();\n }\n }, 100); // 每100ms处理一次\n }\n \n async processSyncQueue() {\n this.isProcessing = true;\n \n try {\n const batch = this.syncQueue.splice(0, 10); // 批量处理\n \n for (const event of batch) {\n await this.syncCacheFromEvent(event);\n }\n } catch (error) {\n console.error('同步处理失败:', error);\n } finally {\n this.isProcessing = false;\n }\n }\n \n async syncCacheFromEvent(event) {\n try {\n switch (event.type) {\n case 'db.insert':\n case 'db.update':\n await this.cache.set(event.key, event.data, 3600);\n break;\n case 'db.delete':\n await this.cache.delete(event.key);\n break;\n }\n \n console.log(`缓存同步完成: ${event.key}`);\n } catch (error) {\n console.error(`缓存同步失败: ${event.key}`, error);\n \n // 重试机制\n if (event.retries < 3) {\n this.syncQueue.push({\n ...event,\n retries: (event.retries || 0) + 1\n });\n }\n }\n }\n \n async handleInvalidation(event) {\n if (event.keys) {\n for (const key of event.keys) {\n await this.cache.delete(key);\n }\n } else if (event.pattern) {\n const keys = await this.cache.keys(event.pattern);\n for (const key of keys) {\n await this.cache.delete(key);\n }\n }\n }\n}\n```\n\n## 一致性检测和修复\n\n### 1. 一致性检测\n```javascript\n// 缓存一致性检测器\nclass ConsistencyChecker {\n constructor(cache, database) {\n this.cache = cache;\n this.database = database;\n this.inconsistencies = [];\n }\n \n // 检查单个键的一致性\n async checkKey(key) {\n const [cachedData, dbData] = await Promise.all([\n this.cache.get(key),\n this.database.get(key)\n ]);\n \n const isConsistent = this.compareData(cachedData, dbData);\n \n if (!isConsistent) {\n const inconsistency = {\n key,\n cachedData,\n dbData,\n detectedAt: new Date(),\n type: this.getInconsistencyType(cachedData, dbData)\n };\n \n this.inconsistencies.push(inconsistency);\n return inconsistency;\n }\n \n return null;\n }\n \n // 批量检查一致性\n async checkKeys(keys) {\n const results = [];\n const batchSize = 50;\n \n for (let i = 0; i < keys.length; i += batchSize) {\n const batch = keys.slice(i, i + batchSize);\n const batchResults = await Promise.all(\n batch.map(key => this.checkKey(key))\n );\n \n results.push(...batchResults.filter(Boolean));\n }\n \n return results;\n }\n \n // 全量一致性检查\n async fullConsistencyCheck() {\n console.log('开始全量一致性检查...');\n const startTime = Date.now();\n \n // 获取所有缓存键\n const cacheKeys = await this.cache.keys('*');\n \n // 检查缓存中的键\n const cacheInconsistencies = await this.checkKeys(cacheKeys);\n \n // 检查数据库中存在但缓存中不存在的数据\n const dbKeys = await this.database.getAllKeys();\n const missingInCache = dbKeys.filter(key => !cacheKeys.includes(key));\n \n const missingInconsistencies = missingInCache.map(key => ({\n key,\n cachedData: null,\n dbData: 'exists',\n detectedAt: new Date(),\n type: 'missing_in_cache'\n }));\n \n const allInconsistencies = [...cacheInconsistencies, ...missingInconsistencies];\n \n const duration = Date.now() - startTime;\n console.log(`一致性检查完成,耗时${duration}ms,发现${allInconsistencies.length}个不一致`);\n \n return {\n totalChecked: cacheKeys.length + missingInCache.length,\n inconsistencies: allInconsistencies,\n duration,\n checkTime: new Date()\n };\n }\n \n compareData(cachedData, dbData) {\n if (cachedData === null && dbData === null) return true;\n if (cachedData === null || dbData === null) return false;\n \n // 深度比较(简化实现)\n return JSON.stringify(cachedData) === JSON.stringify(dbData);\n }\n \n getInconsistencyType(cachedData, dbData) {\n if (cachedData === null && dbData !== null) return 'missing_in_cache';\n if (cachedData !== null && dbData === null) return 'extra_in_cache';\n return 'data_mismatch';\n }\n \n // 获取一致性报告\n getConsistencyReport() {\n const typeCount = {};\n this.inconsistencies.forEach(inc => {\n typeCount[inc.type] = (typeCount[inc.type] || 0) + 1;\n });\n \n return {\n totalInconsistencies: this.inconsistencies.length,\n byType: typeCount,\n recent: this.inconsistencies.slice(-10),\n oldestInconsistency: this.inconsistencies[0]?.detectedAt,\n newestInconsistency: this.inconsistencies[this.inconsistencies.length - 1]?.detectedAt\n };\n }\n}\n```\n\n### 2. 自动修复\n```javascript\n// 缓存一致性自动修复器\nclass ConsistencyRepairer {\n constructor(cache, database, checker) {\n this.cache = cache;\n this.database = database;\n this.checker = checker;\n this.repairStrategies = new Map();\n \n this.initializeRepairStrategies();\n }\n \n initializeRepairStrategies() {\n this.repairStrategies.set('missing_in_cache', this.repairMissingInCache.bind(this));\n this.repairStrategies.set('extra_in_cache', this.repairExtraInCache.bind(this));\n this.repairStrategies.set('data_mismatch', this.repairDataMismatch.bind(this));\n }\n \n // 修复单个不一致\n async repairInconsistency(inconsistency) {\n const strategy = this.repairStrategies.get(inconsistency.type);\n \n if (!strategy) {\n throw new Error(`未知的不一致类型: ${inconsistency.type}`);\n }\n \n try {\n await strategy(inconsistency);\n console.log(`修复完成: ${inconsistency.key} (${inconsistency.type})`);\n return true;\n } catch (error) {\n console.error(`修复失败: ${inconsistency.key}`, error);\n return false;\n }\n }\n \n // 批量修复\n async repairInconsistencies(inconsistencies) {\n const results = {\n total: inconsistencies.length,\n repaired: 0,\n failed: 0,\n errors: []\n };\n \n for (const inconsistency of inconsistencies) {\n try {\n const success = await this.repairInconsistency(inconsistency);\n if (success) {\n results.repaired++;\n } else {\n results.failed++;\n }\n } catch (error) {\n results.failed++;\n results.errors.push({\n key: inconsistency.key,\n error: error.message\n });\n }\n }\n \n return results;\n }\n \n // 修复缓存中缺失的数据\n async repairMissingInCache(inconsistency) {\n const dbData = await this.database.get(inconsistency.key);\n if (dbData !== null) {\n await this.cache.set(inconsistency.key, dbData, 3600);\n }\n }\n \n // 修复缓存中多余的数据\n async repairExtraInCache(inconsistency) {\n await this.cache.delete(inconsistency.key);\n }\n \n // 修复数据不匹配\n async repairDataMismatch(inconsistency) {\n // 默认以数据库为准\n const dbData = await this.database.get(inconsistency.key);\n \n if (dbData !== null) {\n await this.cache.set(inconsistency.key, dbData, 3600);\n } else {\n await this.cache.delete(inconsistency.key);\n }\n }\n \n // 自动修复流程\n async autoRepair(options = {}) {\n console.log('开始自动一致性修复...');\n \n // 1. 执行一致性检查\n const checkResult = await this.checker.fullConsistencyCheck();\n \n if (checkResult.inconsistencies.length === 0) {\n console.log('未发现一致性问题');\n return { repaired: 0, failed: 0 };\n }\n \n // 2. 过滤需要修复的不一致\n let toRepair = checkResult.inconsistencies;\n \n if (options.maxRepairs) {\n toRepair = toRepair.slice(0, options.maxRepairs);\n }\n \n if (options.repairTypes) {\n toRepair = toRepair.filter(inc => options.repairTypes.includes(inc.type));\n }\n \n // 3. 执行修复\n const repairResult = await this.repairInconsistencies(toRepair);\n \n console.log(`自动修复完成: 修复${repairResult.repaired}个,失败${repairResult.failed}个`);\n \n return repairResult;\n }\n}\n```\n\n## 监控和告警\n\n### 1. 一致性监控\n```javascript\n// 一致性监控系统\nclass ConsistencyMonitor {\n constructor(checker, repairer, options = {}) {\n this.checker = checker;\n this.repairer = repairer;\n \n this.options = {\n checkInterval: options.checkInterval || 300000, // 5分钟\n alertThreshold: options.alertThreshold || 10,\n autoRepair: options.autoRepair || false,\n maxAutoRepairs: options.maxAutoRepairs || 100\n };\n \n this.metrics = {\n checksPerformed: 0,\n inconsistenciesFound: 0,\n repairsAttempted: 0,\n repairsSucceeded: 0,\n lastCheckTime: null,\n lastInconsistencyCount: 0\n };\n \n this.alerts = [];\n }\n \n // 启动监控\n startMonitoring() {\n console.log('启动一致性监控...');\n \n this.monitorInterval = setInterval(async () => {\n await this.performCheck();\n }, this.options.checkInterval);\n }\n \n // 停止监控\n stopMonitoring() {\n if (this.monitorInterval) {\n clearInterval(this.monitorInterval);\n console.log('一致性监控已停止');\n }\n }\n \n // 执行检查\n async performCheck() {\n try {\n const checkResult = await this.checker.fullConsistencyCheck();\n \n this.metrics.checksPerformed++;\n this.metrics.inconsistenciesFound += checkResult.inconsistencies.length;\n this.metrics.lastCheckTime = new Date();\n this.metrics.lastInconsistencyCount = checkResult.inconsistencies.length;\n \n // 检查是否需要告警\n if (checkResult.inconsistencies.length > this.options.alertThreshold) {\n this.triggerAlert('HIGH_INCONSISTENCY', {\n count: checkResult.inconsistencies.length,\n threshold: this.options.alertThreshold\n });\n }\n \n // 自动修复\n if (this.options.autoRepair && checkResult.inconsistencies.length > 0) {\n const repairResult = await this.repairer.repairInconsistencies(\n checkResult.inconsistencies.slice(0, this.options.maxAutoRepairs)\n );\n \n this.metrics.repairsAttempted += repairResult.total;\n this.metrics.repairsSucceeded += repairResult.repaired;\n \n if (repairResult.failed > 0) {\n this.triggerAlert('REPAIR_FAILURES', {\n failed: repairResult.failed,\n errors: repairResult.errors\n });\n }\n }\n \n } catch (error) {\n console.error('一致性检查失败:', error);\n this.triggerAlert('CHECK_FAILURE', { error: error.message });\n }\n }\n \n // 触发告警\n triggerAlert(type, data) {\n const alert = {\n type,\n data,\n timestamp: new Date(),\n severity: this.getAlertSeverity(type)\n };\n \n this.alerts.push(alert);\n console.warn('一致性告警:', alert);\n \n // 保持最近50条告警\n if (this.alerts.length > 50) {\n this.alerts.shift();\n }\n \n // 发送通知(邮件、Slack等)\n this.sendNotification(alert);\n }\n \n getAlertSeverity(type) {\n const severityMap = {\n 'HIGH_INCONSISTENCY': 'warning',\n 'REPAIR_FAILURES': 'error',\n 'CHECK_FAILURE': 'critical'\n };\n return severityMap[type] || 'info';\n }\n \n async sendNotification(alert) {\n // 实现通知发送逻辑\n console.log('发送一致性告警通知:', alert.type);\n }\n \n // 获取监控报告\n getMonitoringReport() {\n const consistencyRate = this.metrics.checksPerformed > 0 ? \n ((this.metrics.checksPerformed * 100 - this.metrics.inconsistenciesFound) / (this.metrics.checksPerformed * 100)) * 100 : 100;\n \n const repairSuccessRate = this.metrics.repairsAttempted > 0 ?\n (this.metrics.repairsSucceeded / this.metrics.repairsAttempted) * 100 : 0;\n \n return {\n metrics: this.metrics,\n consistencyRate: consistencyRate.toFixed(2) + '%',\n repairSuccessRate: repairSuccessRate.toFixed(2) + '%',\n recentAlerts: this.alerts.slice(-10),\n status: this.getOverallStatus()\n };\n }\n \n getOverallStatus() {\n if (this.metrics.lastInconsistencyCount > this.options.alertThreshold) {\n return 'critical';\n }\n \n if (this.metrics.lastInconsistencyCount > 0) {\n return 'warning';\n }\n \n return 'healthy';\n }\n}\n```\n\n## 总结\n\n缓存一致性管理的关键要点:\n\n1. **一致性级别选择**:根据业务需求选择强一致性或最终一致性\n2. **实现策略**:版本号、时间戳、事件驱动等机制\n3. **检测机制**:定期检查缓存与数据库的一致性\n4. **自动修复**:建立自动化的不一致修复流程\n5. **监控告警**:实时监控一致性状态并及时告警\n6. **性能平衡**:在一致性和性能之间找到合适的平衡点\n\n选择合适的一致性策略需要考虑业务特性、性能要求、数据重要性等因素。