数据同步
概述
数据同步是确保多个数据库实例之间数据一致性的过程。在分布式系统、主从复制、跨地域部署等场景中,数据同步是保证系统可靠性和一致性的关键技术。
同步模式
1. 实时同步
javascript
/**
 * Real-time data synchronizer.
 *
 * Captures row changes on a source database (MySQL binlog, PostgreSQL
 * logical replication, or polling a `change_log` table), queues them, and
 * replays each change onto every target database.
 *
 * NOTE(review): database handles are duck-typed. From the calls below,
 * `sourceDB` needs `type`, `config` and `query(sql, params)`; every target
 * needs `execute(sql, params)` (resolving to an object with `affectedRows`
 * for UPDATEs) — confirm against the real adapters.
 */
class RealTimeSynchronizer {
  /**
   * @param {object} sourceDB - Database whose changes are captured.
   * @param {object|object[]} targetDBs - One target database or an array of them.
   * @param {object} [options]
   * @param {number} [options.batchSize=100] - Changes applied per processor tick and rows fetched per poll.
   * @param {number} [options.maxRetries=3] - Maximum attempts per change before it is dropped (0 or 1 mean a single attempt).
   * @param {number} [options.retryDelay=1000] - Base retry delay in ms, multiplied by the attempt number.
   * @param {boolean} [options.enableConflictResolution=false] - Hand duplicate-key / missing-row cases to ConflictResolver.
   */
  constructor(sourceDB, targetDBs, options = {}) {
    this.sourceDB = sourceDB;
    this.targetDBs = Array.isArray(targetDBs) ? targetDBs : [targetDBs];
    this.options = {
      // A batch size of 0 would never drain the queue, so falsy values keep the default.
      batchSize: options.batchSize || 100,
      // `??` so that an explicit 0 (no retries / no delay) is honoured.
      maxRetries: options.maxRetries ?? 3,
      retryDelay: options.retryDelay ?? 1000,
      enableConflictResolution: options.enableConflictResolution || false
    };
    this.changeLog = [];
    this.isRunning = false;
    this.syncQueue = [];
    this.conflictResolver = new ConflictResolver();
    // Capture resources, kept so stopRealTimeSync() can release them.
    this.pollTimer = null;
    this.pollInFlight = false;
    this.binlogInstance = null;
    this.pgClients = [];
    // Polling cursor: `created_at` of the newest change_log row already read.
    this.lastSyncTime = null;
  }

  /** Starts change capture and the queue processor; a no-op if already running. */
  async startRealTimeSync() {
    if (this.isRunning) {
      console.log('实时同步已在运行');
      return;
    }
    this.isRunning = true;
    console.log('开始实时数据同步');
    try {
      // Listen for changes on the source database.
      await this.setupChangeCapture();
    } catch (error) {
      // Roll back so a later start is not rejected as "already running",
      // and close any capture resources that were opened before the failure.
      this.isRunning = false;
      await this.releaseCaptureResources();
      throw error;
    }
    // Start draining the queue.
    this.startSyncProcessor();
  }

  /** Stops processing and releases the polling timer and capture connections. */
  async stopRealTimeSync() {
    this.isRunning = false;
    await this.releaseCaptureResources();
    console.log('停止实时数据同步');
  }

  // Best-effort release of everything setupChangeCapture() may have opened.
  async releaseCaptureResources() {
    if (this.pollTimer !== null) {
      clearInterval(this.pollTimer);
      this.pollTimer = null;
    }
    if (this.binlogInstance) {
      const instance = this.binlogInstance;
      this.binlogInstance = null;
      try {
        await instance.stop();
      } catch (error) {
        console.error('停止 MySQL Binlog 监听失败:', error);
      }
    }
    const clients = this.pgClients.splice(0);
    for (const client of clients) {
      try {
        await client.end();
      } catch (error) {
        console.error('关闭 PostgreSQL 连接失败:', error);
      }
    }
  }

  /** Chooses a change-capture mechanism based on `sourceDB.type`. */
  async setupChangeCapture() {
    // Use the database's change data capture (CDC) where available;
    // this is a simplified implementation.
    if (this.sourceDB.type === 'mysql') {
      await this.setupMySQLBinlogCapture();
    } else if (this.sourceDB.type === 'postgresql') {
      await this.setupPostgreSQLLogicalReplication();
    } else {
      // Generic fallback: poll a change_log table.
      await this.setupPollingCapture();
    }
  }

  /** Subscribes to the MySQL binlog via @rodrigogs/mysql-events. */
  async setupMySQLBinlogCapture() {
    const MySQLEvents = require('@rodrigogs/mysql-events');
    const instance = new MySQLEvents({
      host: this.sourceDB.config.host,
      user: this.sourceDB.config.user,
      password: this.sourceDB.config.password
    }, {
      startAtEnd: true,
      excludedSchemas: {
        mysql: true,
        information_schema: true,
        performance_schema: true,
        sys: true
      }
    });
    instance.addTrigger({
      name: 'realtime_sync',
      expression: '*',
      statement: MySQLEvents.STATEMENTS.ALL,
      onEvent: (event) => {
        this.handleChangeEvent(event);
      }
    });
    await instance.start();
    // Keep the handle so stopRealTimeSync() can stop the binlog stream.
    this.binlogInstance = instance;
    console.log('MySQL Binlog 监听已启动');
  }

  /** Creates (if missing) a logical replication slot and listens for replication messages. */
  async setupPostgreSQLLogicalReplication() {
    const { Client } = require('pg');
    const client = new Client(this.sourceDB.config);
    await client.connect();
    try {
      // pg_create_logical_replication_slot raises an error when the slot
      // already exists (e.g. after a restart), so only create it when missing.
      await client.query(`
        SELECT pg_create_logical_replication_slot('sync_slot', 'pgoutput')
        WHERE NOT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'sync_slot')
      `);
    } finally {
      // This connection is only needed for slot management.
      await client.end();
    }
    const replicationClient = new Client({
      ...this.sourceDB.config,
      replication: 'database'
    });
    await replicationClient.connect();
    this.pgClients.push(replicationClient);
    // NOTE(review): decoding the replication stream into 'replicationMessage'
    // events presumably requires a streaming helper library — confirm.
    replicationClient.on('replicationMessage', (message) => {
      this.handlePostgreSQLChange(message);
    });
    console.log('PostgreSQL 逻辑复制已启动');
  }

  /** Polls the change_log table once per second. */
  async setupPollingCapture() {
    if (this.pollTimer !== null) {
      clearInterval(this.pollTimer);
    }
    this.pollTimer = setInterval(async () => {
      // Skip the tick when stopped, or when the previous poll is still running
      // (overlapping polls would read and apply the same rows twice).
      if (!this.isRunning || this.pollInFlight) return;
      this.pollInFlight = true;
      try {
        const changes = await this.pollForChanges();
        changes.forEach(change => this.handleChangeEvent(change));
      } catch (error) {
        console.error('轮询变更失败:', error);
      } finally {
        this.pollInFlight = false;
      }
    }, 1000);
  }

  /**
   * Reads the next page of change_log rows newer than the cursor and advances
   * the cursor to the newest `created_at` read.
   * NOTE(review): with a strict `>` cursor, rows sharing the boundary
   * `created_at` across two pages can be skipped — a monotonic id would be safer.
   */
  async pollForChanges() {
    const lastSyncTime = this.getLastSyncTime();
    const changes = await this.sourceDB.query(`
      SELECT * FROM change_log
      WHERE created_at > ?
      ORDER BY created_at ASC
      LIMIT ?
    `, [lastSyncTime, this.options.batchSize]);
    if (Array.isArray(changes) && changes.length > 0) {
      const newest = changes[changes.length - 1].created_at;
      if (newest != null) {
        this.lastSyncTime = newest;
      }
    }
    return changes;
  }

  /**
   * Wraps a captured event into a change record and enqueues it.
   * Accepts binlog-style events (`type`, `before`/`after`) and normalized
   * records (`operation`, `data`/`oldData`).
   */
  handleChangeEvent(event) {
    // Row-based binlog events can describe several rows at once; fan them out
    // into one queued change per row.
    // NOTE(review): `affectedRows: [{ before, after }]` is the shape emitted by
    // @rodrigogs/mysql-events — confirm for the installed version.
    if (Array.isArray(event.affectedRows)) {
      for (const row of event.affectedRows) {
        this.handleChangeEvent({
          ...event,
          affectedRows: undefined,
          before: row?.before,
          after: row?.after
        });
      }
      return;
    }
    const changeRecord = {
      id: this.generateChangeId(),
      timestamp: new Date(),
      operation: event.type || event.operation,
      table: event.table,
      data: event.after || event.data,
      // Normalized records (e.g. parsed PostgreSQL messages) carry `oldData`.
      oldData: event.before ?? event.oldData,
      binlogPosition: event.binlogPosition,
      retries: 0
    };
    this.syncQueue.push(changeRecord);
    this.changeLog.push(changeRecord);
    // Bound the in-memory change log.
    if (this.changeLog.length > 10000) {
      this.changeLog.shift();
    }
  }

  /** Enqueues row-level PostgreSQL replication messages; other message kinds are ignored. */
  handlePostgreSQLChange(message) {
    const changeRecord = this.parsePostgreSQLMessage(message);
    // begin/commit/relation messages carry no row change to replay.
    const operation = String(changeRecord.operation ?? '').toLowerCase();
    if (!['insert', 'update', 'delete'].includes(operation)) return;
    this.handleChangeEvent(changeRecord);
  }

  /** Runs a background loop that drains the queue in batches every ~100 ms while running. */
  startSyncProcessor() {
    const processSync = async () => {
      while (this.isRunning) {
        if (this.syncQueue.length > 0) {
          const batch = this.syncQueue.splice(0, this.options.batchSize);
          await this.processSyncBatch(batch);
        }
        await new Promise(resolve => setTimeout(resolve, 100));
      }
    };
    processSync().catch(error => {
      console.error('同步处理器错误:', error);
    });
  }

  /**
   * Applies each change to the targets; failed changes are re-queued after a
   * linearly growing delay until `maxRetries` is reached.
   * @returns {Promise<object[]>} Per-change outcome records.
   */
  async processSyncBatch(batch) {
    const results = [];
    for (const change of batch) {
      try {
        const result = await this.syncChangeToTargets(change);
        results.push({ change, result, success: true });
      } catch (error) {
        console.error(`同步失败 ${change.id}:`, error);
        change.retries++;
        if (change.retries < this.options.maxRetries) {
          setTimeout(() => {
            this.syncQueue.push(change);
          }, this.options.retryDelay * change.retries);
        } else {
          console.error(`同步放弃 ${change.id}: 超过最大重试次数`);
        }
        results.push({ change, error: error.message, success: false });
      }
    }
    return results;
  }

  /**
   * Applies a change to every target that has not yet accepted it.
   * @throws {Error} When at least one target fails; `cause` is the first failure.
   */
  async syncChangeToTargets(change) {
    // On retry, only re-apply to the targets that failed last time; targets
    // that already succeeded would otherwise hit duplicate-key errors.
    const targets = change.pendingTargets ?? this.targetDBs;
    const results = await Promise.allSettled(
      targets.map(targetDB => this.syncChangeToTarget(change, targetDB))
    );
    change.pendingTargets = targets.filter((_, index) => results[index].status === 'rejected');
    const failures = results.filter(result => result.status === 'rejected');
    if (failures.length > 0) {
      throw new Error(`${failures.length}个目标数据库同步失败`, { cause: failures[0].reason });
    }
    return results;
  }

  /** Dispatches a change to the INSERT/UPDATE/DELETE handler (operation is case-insensitive). */
  async syncChangeToTarget(change, targetDB) {
    switch (String(change.operation ?? '').toLowerCase()) {
      case 'insert':
        return await this.handleInsert(change, targetDB);
      case 'update':
        return await this.handleUpdate(change, targetDB);
      case 'delete':
        return await this.handleDelete(change, targetDB);
      default:
        throw new Error(`不支持的操作类型: ${change.operation}`);
    }
  }

  /** Inserts the row; on duplicate key (MySQL ER_DUP_ENTRY / PostgreSQL 23505) optionally resolves the conflict. */
  async handleInsert(change, targetDB) {
    const columns = Object.keys(change.data);
    const values = Object.values(change.data);
    const placeholders = columns.map(() => '?').join(', ');
    const sql = `INSERT INTO ${change.table} (${columns.join(', ')}) VALUES (${placeholders})`;
    try {
      await targetDB.execute(sql, values);
    } catch (error) {
      if (error.code === 'ER_DUP_ENTRY' || error.code === '23505') {
        if (this.options.enableConflictResolution) {
          return await this.resolveInsertConflict(change, targetDB, error);
        }
      }
      throw error;
    }
  }

  /** Updates the row matched by its old image (or new data); 0 affected rows optionally triggers conflict resolution. */
  async handleUpdate(change, targetDB) {
    const setClauses = Object.keys(change.data)
      .map(key => `${key} = ?`)
      .join(', ');
    const whereClause = this.buildWhereClause(change.oldData || change.data);
    const sql = `UPDATE ${change.table} SET ${setClauses} WHERE ${whereClause.clause}`;
    const values = [...Object.values(change.data), ...whereClause.values];
    const result = await targetDB.execute(sql, values);
    if (result.affectedRows === 0) {
      if (this.options.enableConflictResolution) {
        return await this.resolveUpdateConflict(change, targetDB);
      }
    }
    return result;
  }

  /** Deletes the row matched by its old image (or data). */
  async handleDelete(change, targetDB) {
    const whereClause = this.buildWhereClause(change.oldData || change.data);
    const sql = `DELETE FROM ${change.table} WHERE ${whereClause.clause}`;
    return await targetDB.execute(sql, whereClause.values);
  }

  /**
   * Builds an AND-ed equality predicate over every column of `data`;
   * null/undefined become `IS NULL` (binding them to `= ?` never matches).
   */
  buildWhereClause(data) {
    const conditions = [];
    const values = [];
    Object.entries(data).forEach(([key, value]) => {
      if (value == null) {
        conditions.push(`${key} IS NULL`);
      } else {
        conditions.push(`${key} = ?`);
        values.push(value);
      }
    });
    return {
      clause: conditions.join(' AND '),
      values: values
    };
  }

  // Invokes the optional external `onConflict` hook, if one was assigned.
  notifyConflict(change) {
    if (typeof this.onConflict === 'function') {
      this.onConflict(change);
    }
  }

  async resolveInsertConflict(change, targetDB, error) {
    this.notifyConflict(change);
    return await this.conflictResolver.resolveInsertConflict(change, targetDB, error);
  }

  async resolveUpdateConflict(change, targetDB) {
    this.notifyConflict(change);
    return await this.conflictResolver.resolveUpdateConflict(change, targetDB);
  }

  /** Returns `<epoch ms>_<up to 9 base-36 chars>`; not cryptographically unique. */
  generateChangeId() {
    return `${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /** Polling cursor; falls back to one minute ago before the first successful poll (in-memory only). */
  getLastSyncTime() {
    return this.lastSyncTime ?? new Date(Date.now() - 60000);
  }

  /**
   * Normalizes a pgoutput-style message into { operation, table, data, oldData }.
   * NOTE(review): assumes `relation` may be an object with a `name` and that
   * deletes may carry only `key` (replica identity) — confirm.
   */
  parsePostgreSQLMessage(message) {
    return {
      operation: message.tag,
      table: message.relation?.name ?? message.relation,
      data: message.new,
      oldData: message.old ?? message.key
    };
  }

  /** Summarizes the in-memory change log and queue. */
  getSyncStats() {
    const now = Date.now();
    const oneHourAgo = now - 3600000;
    const recentChanges = this.changeLog.filter(change =>
      change.timestamp.getTime() > oneHourAgo
    );
    return {
      totalChanges: this.changeLog.length,
      recentChanges: recentChanges.length,
      queueLength: this.syncQueue.length,
      isRunning: this.isRunning,
      lastChangeTime: this.changeLog.length > 0 ?
        this.changeLog[this.changeLog.length - 1].timestamp : null
    };
  }
}
2. 批量同步
javascript
/**
 * Batch (full-table) synchronizer.
 *
 * Walks every base table of the source database page by page and makes the
 * target match it: missing rows are inserted, rows whose contents differ are
 * handed to the configured conflict-resolution policy.
 *
 * NOTE(review): the information_schema / DATABASE() queries are MySQL-specific.
 */
class BatchSynchronizer {
  /**
   * @param {object} sourceDB - Database read from; expected to expose `query(sql, params)`.
   * @param {object} targetDB - Database written to; expected to expose `query` and `execute`.
   * @param {object} [options]
   * @param {number} [options.batchSize=1000] - Rows per page.
   * @param {number} [options.parallel=4] - Stored only; tables are currently processed sequentially.
   * @param {string} [options.compareMethod='checksum'] - 'checksum' | 'field_by_field' | 'timestamp'.
   * @param {string} [options.syncDirection='source_to_target'] - Stored only; not read by this class.
   * @param {string} [options.conflictResolution='source_wins'] - 'source_wins' | 'target_wins' | 'timestamp' | 'manual'.
   */
  constructor(sourceDB, targetDB, options = {}) {
    this.sourceDB = sourceDB;
    this.targetDB = targetDB;
    this.options = {
      batchSize: options.batchSize || 1000,
      parallel: options.parallel || 4,
      compareMethod: options.compareMethod || 'checksum',
      syncDirection: options.syncDirection || 'source_to_target',
      conflictResolution: options.conflictResolution || 'source_wins'
    };
  }

  /**
   * Synchronizes every table; a failing table is recorded in `errors` and
   * does not stop the others.
   * @returns {Promise<object>} Aggregate counters, per-table errors and `duration` (ms).
   */
  async executeBatchSync() {
    console.log('开始批量数据同步');
    const startTime = Date.now();
    try {
      const tables = await this.getTableList();
      const results = {
        totalTables: tables.length,
        syncedTables: 0,
        totalRows: 0,
        syncedRows: 0,
        conflicts: 0,
        errors: []
      };
      for (const table of tables) {
        try {
          const tableResult = await this.syncTable(table);
          results.syncedTables++;
          results.totalRows += tableResult.totalRows;
          results.syncedRows += tableResult.syncedRows;
          results.conflicts += tableResult.conflicts;
          console.log(`表${table}同步完成: ${tableResult.syncedRows}/${tableResult.totalRows}行`);
        } catch (error) {
          console.error(`表${table}同步失败:`, error);
          results.errors.push({
            table: table,
            error: error.message
          });
        }
      }
      const duration = Date.now() - startTime;
      results.duration = duration;
      console.log(`批量同步完成,耗时${duration}ms`);
      return results;
    } catch (error) {
      console.error('批量同步失败:', error);
      throw error;
    }
  }

  /**
   * Synchronizes one table page by page in primary-key order.
   * @throws {Error} When the table has no primary key.
   */
  async syncTable(tableName) {
    console.log(`开始同步表: ${tableName}`);
    const result = {
      totalRows: 0,
      syncedRows: 0,
      conflicts: 0
    };
    const tableSchema = await this.getTableSchema(tableName);
    const primaryKeys = tableSchema.primaryKeys;
    if (primaryKeys.length === 0) {
      throw new Error(`表${tableName}没有主键,无法同步`);
    }
    const { batchSize } = this.options;
    let offset = 0;
    while (true) {
      // LIMIT/OFFSET without ORDER BY has no defined row order, so pages could
      // overlap or skip rows; page in primary-key order instead.
      const sourceBatch = await this.getTableBatch(this.sourceDB, tableName, offset, batchSize, primaryKeys);
      if (sourceBatch.length === 0) break;
      const batchResult = await this.syncBatch(tableName, sourceBatch, primaryKeys);
      result.totalRows += sourceBatch.length;
      result.syncedRows += batchResult.syncedRows;
      result.conflicts += batchResult.conflicts;
      // A short page means the table is exhausted; skip the extra empty query.
      if (sourceBatch.length < batchSize) break;
      offset += batchSize;
    }
    return result;
  }

  /** Inserts rows missing from the target and resolves rows that differ. */
  async syncBatch(tableName, sourceBatch, primaryKeys) {
    const result = {
      syncedRows: 0,
      conflicts: 0
    };
    const targetBatch = await this.getTargetBatch(tableName, sourceBatch, primaryKeys);
    // Index target rows by key once rather than scanning them per source row.
    const targetByKey = new Map(
      targetBatch.map(row => [this.primaryKeySignature(row, primaryKeys), row])
    );
    for (const sourceRow of sourceBatch) {
      const targetRow = targetByKey.get(this.primaryKeySignature(sourceRow, primaryKeys));
      if (!targetRow) {
        await this.insertRow(tableName, sourceRow);
        result.syncedRows++;
        continue;
      }
      const isDifferent = await this.compareRows(sourceRow, targetRow);
      if (isDifferent) {
        const conflictResult = await this.resolveConflict(tableName, sourceRow, targetRow, primaryKeys);
        if (conflictResult.resolved) {
          result.syncedRows++;
        }
        result.conflicts++;
      }
    }
    return result;
  }

  // String key for a row's primary-key values; type-tagged so 1 and '1' stay
  // distinct, with Dates compared by their time value.
  primaryKeySignature(row, primaryKeys) {
    return primaryKeys
      .map(key => {
        const value = row[key];
        const text = value instanceof Date ? String(value.getTime()) : String(value);
        return `${typeof value}:${text}`;
      })
      .join('\u0000');
  }

  /**
   * Reads one page of a table.
   * @param {string[]} [orderBy=[]] - Columns to order by (for stable paging).
   */
  async getTableBatch(database, tableName, offset, limit, orderBy = []) {
    const orderClause = orderBy.length > 0 ? ` ORDER BY ${orderBy.join(', ')}` : '';
    const sql = `SELECT * FROM ${tableName}${orderClause} LIMIT ${limit} OFFSET ${offset}`;
    return await database.query(sql);
  }

  /** Fetches the target rows whose primary keys appear in `sourceBatch`. */
  async getTargetBatch(tableName, sourceBatch, primaryKeys) {
    if (sourceBatch.length === 0) return [];
    const keyConditions = primaryKeys.map(key => `${key} = ?`).join(' AND ');
    const conditions = sourceBatch.map(() => `(${keyConditions})`).join(' OR ');
    const values = sourceBatch.flatMap(row =>
      primaryKeys.map(key => row[key])
    );
    const sql = `SELECT * FROM ${tableName} WHERE ${conditions}`;
    return await this.targetDB.query(sql, values);
  }

  /** Returns true when the rows should be treated as different under `compareMethod`. */
  async compareRows(sourceRow, targetRow) {
    switch (this.options.compareMethod) {
      case 'checksum':
        return this.compareByChecksum(sourceRow, targetRow);
      case 'field_by_field':
        return this.compareFieldByField(sourceRow, targetRow);
      case 'timestamp':
        return this.compareByTimestamp(sourceRow, targetRow);
      default:
        return this.compareFieldByField(sourceRow, targetRow);
    }
  }

  compareByChecksum(sourceRow, targetRow) {
    const sourceChecksum = this.calculateChecksum(sourceRow);
    const targetChecksum = this.calculateChecksum(targetRow);
    return sourceChecksum !== targetChecksum;
  }

  compareFieldByField(sourceRow, targetRow) {
    const sourceKeys = Object.keys(sourceRow);
    const targetKeys = Object.keys(targetRow);
    if (sourceKeys.length !== targetKeys.length) return true;
    return sourceKeys.some(key => !this.valuesEqual(sourceRow[key], targetRow[key]));
  }

  // Value equality that does not report two equal Date (or JSON/Buffer)
  // values as different just because they are distinct objects.
  valuesEqual(a, b) {
    if (a === b) return true;
    if (a instanceof Date && b instanceof Date) return a.getTime() === b.getTime();
    if (a !== null && b !== null && typeof a === 'object' && typeof b === 'object') {
      return JSON.stringify(a) === JSON.stringify(b);
    }
    return false;
  }

  /** True when the source row is newer; falls back to field comparison without timestamps. */
  compareByTimestamp(sourceRow, targetRow) {
    const sourceTimestamp = sourceRow.updated_at || sourceRow.modified_at;
    const targetTimestamp = targetRow.updated_at || targetRow.modified_at;
    if (!sourceTimestamp || !targetTimestamp) {
      return this.compareFieldByField(sourceRow, targetRow);
    }
    return new Date(sourceTimestamp) > new Date(targetTimestamp);
  }

  /** MD5 of the row serialized with keys in sorted order (column order independent). */
  calculateChecksum(row) {
    const crypto = require('crypto');
    // Serialize [key, value] pairs: a replacer array would also filter the
    // keys of nested objects (e.g. JSON columns) and hide their differences.
    const data = JSON.stringify(Object.keys(row).sort().map(key => [key, row[key]]));
    return crypto.createHash('md5').update(data).digest('hex');
  }

  /** Inserts a row into the target; errors are logged and rethrown. */
  async insertRow(tableName, row) {
    const columns = Object.keys(row);
    const values = Object.values(row);
    const placeholders = columns.map(() => '?').join(', ');
    const sql = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${placeholders})`;
    try {
      await this.targetDB.execute(sql, values);
    } catch (error) {
      console.error(`插入失败 ${tableName}:`, error);
      throw error;
    }
  }

  /** Applies the configured conflict policy; resolves to `{ resolved, ... }`. */
  async resolveConflict(tableName, sourceRow, targetRow, primaryKeys) {
    switch (this.options.conflictResolution) {
      case 'source_wins':
        return await this.updateTargetRow(tableName, sourceRow, primaryKeys);
      case 'target_wins':
        return { resolved: false, reason: 'target_wins_policy' };
      case 'timestamp':
        return await this.resolveByTimestamp(tableName, sourceRow, targetRow, primaryKeys);
      case 'manual':
        return await this.flagForManualResolution(tableName, sourceRow, targetRow);
      default:
        return await this.updateTargetRow(tableName, sourceRow, primaryKeys);
    }
  }

  /** Overwrites the target row's non-key columns with the source values. */
  async updateTargetRow(tableName, row, primaryKeys) {
    const nonKeyColumns = Object.keys(row).filter(key => !primaryKeys.includes(key));
    if (nonKeyColumns.length === 0) {
      // Every column is part of the key: nothing to update, and an empty SET
      // clause would be invalid SQL.
      return { resolved: true, reason: 'no_non_key_columns' };
    }
    const setClauses = nonKeyColumns.map(key => `${key} = ?`).join(', ');
    const whereClause = primaryKeys.map(key => `${key} = ?`).join(' AND ');
    const sql = `UPDATE ${tableName} SET ${setClauses} WHERE ${whereClause}`;
    const values = [
      ...nonKeyColumns.map(key => row[key]),
      ...primaryKeys.map(key => row[key])
    ];
    try {
      await this.targetDB.execute(sql, values);
      return { resolved: true };
    } catch (error) {
      console.error(`更新失败 ${tableName}:`, error);
      return { resolved: false, error: error.message };
    }
  }

  /** Source wins when it is strictly newer or when either side lacks a timestamp. */
  async resolveByTimestamp(tableName, sourceRow, targetRow, primaryKeys) {
    const sourceTime = sourceRow.updated_at || sourceRow.modified_at;
    const targetTime = targetRow.updated_at || targetRow.modified_at;
    if (sourceTime && targetTime) {
      if (new Date(sourceTime) > new Date(targetTime)) {
        return await this.updateTargetRow(tableName, sourceRow, primaryKeys);
      }
      return { resolved: false, reason: 'target_is_newer' };
    }
    return await this.updateTargetRow(tableName, sourceRow, primaryKeys);
  }

  /** Records both versions in the target's `sync_conflicts` table for a human to resolve. */
  async flagForManualResolution(tableName, sourceRow, targetRow) {
    const conflictRecord = {
      table_name: tableName,
      source_data: JSON.stringify(sourceRow),
      target_data: JSON.stringify(targetRow),
      created_at: new Date(),
      status: 'pending'
    };
    await this.targetDB.execute(`
      INSERT INTO sync_conflicts (table_name, source_data, target_data, created_at, status)
      VALUES (?, ?, ?, ?, ?)
    `, Object.values(conflictRecord));
    return { resolved: false, reason: 'flagged_for_manual_resolution' };
  }

  /** Lists base tables of the source's current schema. */
  async getTableList() {
    // Alias explicitly: MySQL 8 returns information_schema column labels in
    // upper case (TABLE_NAME), which would make `t.table_name` undefined.
    const tables = await this.sourceDB.query(`
      SELECT table_name AS table_name
      FROM information_schema.tables
      WHERE table_schema = DATABASE() AND table_type = 'BASE TABLE'
    `);
    return tables.map(t => t.table_name);
  }

  /** Returns column names and primary-key columns, in column definition order. */
  async getTableSchema(tableName) {
    const columns = await this.sourceDB.query(`
      SELECT column_name AS column_name, column_key AS column_key
      FROM information_schema.columns
      WHERE table_name = ? AND table_schema = DATABASE()
      ORDER BY ordinal_position
    `, [tableName]);
    const primaryKeys = columns
      .filter(col => col.column_key === 'PRI')
      .map(col => col.column_name);
    return {
      tableName: tableName,
      columns: columns.map(col => col.column_name),
      primaryKeys: primaryKeys
    };
  }
}
3. 双向同步
javascript
/**
 * Bidirectional synchronizer between two databases (db1 and db2).
 *
 * Either runs two RealTimeSynchronizer instances (db1→db2 and db2→db1) or
 * periodically diffs every common table and copies rows across, resolving
 * rows that differ with the configured policy.
 *
 * NOTE(review): deletions are not propagated — a row removed on one side is
 * copied back from the other side on the next periodic pass. In real-time
 * mode, each side's change capture may also see rows written by the other
 * direction and echo them back — confirm before relying on it.
 */
class BidirectionalSynchronizer {
  /**
   * @param {object} database1 - First database (db1); expected to expose `query` and `execute`.
   * @param {object} database2 - Second database (db2); same shape.
   * @param {object} [options]
   * @param {string} [options.conflictResolution='timestamp'] - 'timestamp' | 'source_priority' (db1 wins) | 'manual'.
   * @param {number} [options.syncInterval=60000] - Delay in ms between periodic passes.
   * @param {boolean} [options.enableRealTime=false] - Use real-time capture instead of periodic passes.
   */
  constructor(database1, database2, options = {}) {
    this.db1 = database1;
    this.db2 = database2;
    this.options = {
      conflictResolution: options.conflictResolution || 'timestamp',
      syncInterval: options.syncInterval || 60000,
      enableRealTime: options.enableRealTime || false
    };
    this.syncHistory = [];
    this.conflicts = [];
    this.isRunning = false;
    // Real-time synchronizers started by setupRealTimeBidirectionalSync().
    this.realTimeSyncs = [];
  }

  /** Creates bookkeeping tables and starts real-time or periodic sync; a no-op if already running. */
  async startBidirectionalSync() {
    if (this.isRunning) {
      console.log('双向同步已在运行');
      return;
    }
    this.isRunning = true;
    console.log('开始双向数据同步');
    try {
      await this.initializeSyncTables();
      if (this.options.enableRealTime) {
        await this.setupRealTimeBidirectionalSync();
      } else {
        this.startPeriodicSync();
      }
    } catch (error) {
      // Roll back so a later start is not rejected as "already running".
      this.isRunning = false;
      await this.stopRealTimeSyncs();
      throw error;
    }
  }

  /** Stops the periodic loop (after its current sleep) and any real-time synchronizers. */
  async stopBidirectionalSync() {
    this.isRunning = false;
    await this.stopRealTimeSyncs();
    console.log('停止双向数据同步');
  }

  // Stops and forgets the real-time synchronizers, if any were started.
  async stopRealTimeSyncs() {
    const syncs = this.realTimeSyncs.splice(0);
    await Promise.all(syncs.map(sync => sync.stopRealTimeSync()));
  }

  /** Creates the `sync_metadata` table on both databases (MySQL DDL). */
  async initializeSyncTables() {
    const syncTableSQL = `
      CREATE TABLE IF NOT EXISTS sync_metadata (
        id VARCHAR(255) PRIMARY KEY,
        table_name VARCHAR(255) NOT NULL,
        record_id VARCHAR(255) NOT NULL,
        last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        source_db ENUM('db1', 'db2') NOT NULL,
        checksum VARCHAR(32),
        INDEX idx_table_record (table_name, record_id)
      )
    `;
    await Promise.all([
      this.db1.execute(syncTableSQL),
      this.db2.execute(syncTableSQL)
    ]);
  }

  /** Starts one real-time synchronizer per direction, with conflict resolution enabled. */
  async setupRealTimeBidirectionalSync() {
    const sync1to2 = new RealTimeSynchronizer(this.db1, [this.db2], {
      enableConflictResolution: true
    });
    const sync2to1 = new RealTimeSynchronizer(this.db2, [this.db1], {
      enableConflictResolution: true
    });
    sync1to2.onConflict = (change) => this.handleBidirectionalConflict(change, 'db1_to_db2');
    sync2to1.onConflict = (change) => this.handleBidirectionalConflict(change, 'db2_to_db1');
    // Register before starting so a partial start can still be stopped.
    this.realTimeSyncs = [sync1to2, sync2to1];
    await Promise.all([
      sync1to2.startRealTimeSync(),
      sync2to1.startRealTimeSync()
    ]);
  }

  /** Runs performBidirectionalSync() every `syncInterval` ms while running. */
  startPeriodicSync() {
    const syncProcess = async () => {
      while (this.isRunning) {
        try {
          await this.performBidirectionalSync();
        } catch (error) {
          console.error('双向同步错误:', error);
        }
        await new Promise(resolve => setTimeout(resolve, this.options.syncInterval));
      }
    };
    syncProcess().catch(error => {
      console.error('同步进程错误:', error);
    });
  }

  /** One full pass over all common tables; the result is appended to `syncHistory`. */
  async performBidirectionalSync() {
    console.log('执行双向同步...');
    const tables = await this.getCommonTables();
    const syncResults = {
      tablesProcessed: 0,
      conflictsResolved: 0,
      errors: []
    };
    for (const table of tables) {
      try {
        const result = await this.syncTableBidirectionally(table);
        syncResults.tablesProcessed++;
        syncResults.conflictsResolved += result.conflictsResolved;
      } catch (error) {
        console.error(`表${table}双向同步失败:`, error);
        syncResults.errors.push({
          table: table,
          error: error.message
        });
      }
    }
    this.syncHistory.push({
      timestamp: new Date(),
      results: syncResults
    });
    return syncResults;
  }

  /** Loads both copies of a table plus metadata and reconciles them. */
  async syncTableBidirectionally(tableName) {
    const result = {
      conflictsResolved: 0
    };
    const [data1, data2] = await Promise.all([
      this.getTableData(this.db1, tableName),
      this.getTableData(this.db2, tableName)
    ]);
    const [metadata1, metadata2] = await Promise.all([
      this.getSyncMetadata(this.db1, tableName),
      this.getSyncMetadata(this.db2, tableName)
    ]);
    const conflicts = await this.compareAndSync(tableName, data1, data2, metadata1, metadata2);
    result.conflictsResolved = conflicts.length;
    this.conflicts.push(...conflicts);
    return result;
  }

  /** Pairs rows by record key and reconciles each pair; returns the conflicts encountered. */
  async compareAndSync(tableName, data1, data2, metadata1, metadata2) {
    const conflicts = [];
    const allRecords = new Map();
    data1.forEach(record => {
      const key = this.getRecordKey(record);
      allRecords.set(key, { db1: record, db2: null });
    });
    data2.forEach(record => {
      const key = this.getRecordKey(record);
      if (allRecords.has(key)) {
        allRecords.get(key).db2 = record;
      } else {
        allRecords.set(key, { db1: null, db2: record });
      }
    });
    for (const [key, records] of allRecords) {
      const conflict = await this.resolveRecordConflict(
        tableName, key, records, metadata1, metadata2
      );
      if (conflict) {
        conflicts.push(conflict);
      }
    }
    return conflicts;
  }

  /** Copies one-sided rows across; returns a conflict entry when both sides differ, else null. */
  async resolveRecordConflict(tableName, recordKey, records, metadata1, metadata2) {
    const { db1: record1, db2: record2 } = records;
    if (!record1 && !record2) return null;
    if (!record1) {
      await this.copyRecord(record2, this.db2, this.db1, tableName);
      return null;
    }
    if (!record2) {
      await this.copyRecord(record1, this.db1, this.db2, tableName);
      return null;
    }
    if (this.recordsEqual(record1, record2)) {
      return null;
    }
    const resolution = await this.resolveBidirectionalConflict(
      tableName, recordKey, record1, record2, metadata1, metadata2
    );
    return {
      table: tableName,
      recordKey: recordKey,
      resolution: resolution,
      timestamp: new Date()
    };
  }

  /** Dispatches to the configured policy; resolves to a resolution label string. */
  async resolveBidirectionalConflict(tableName, recordKey, record1, record2, metadata1, metadata2) {
    const meta1 = metadata1.get(recordKey);
    const meta2 = metadata2.get(recordKey);
    switch (this.options.conflictResolution) {
      case 'timestamp':
        return await this.resolveByTimestamp(tableName, record1, record2, meta1, meta2);
      case 'source_priority':
        return await this.resolveBySourcePriority(tableName, record1, record2, meta1, meta2);
      case 'manual':
        return await this.flagForManualResolution(tableName, recordKey, record1, record2);
      default:
        return await this.resolveByTimestamp(tableName, record1, record2, meta1, meta2);
    }
  }

  /**
   * Newer side wins (metadata `last_modified`, then `updated_at`, then
   * `created_at`). A side with a timestamp beats one without; with none at
   * all db1 wins; on a tie db2 wins.
   */
  async resolveByTimestamp(tableName, record1, record2, meta1, meta2) {
    const time1 = meta1?.last_modified || record1.updated_at || record1.created_at;
    const time2 = meta2?.last_modified || record2.updated_at || record2.created_at;
    if (!time1 && !time2) {
      await this.copyRecord(record1, this.db1, this.db2, tableName);
      return 'db1_wins_no_timestamp';
    }
    // Comparing against an Invalid Date is always false, so a missing
    // timestamp must be handled explicitly rather than via `>`.
    const db1Wins = !time2 || (Boolean(time1) && new Date(time1) > new Date(time2));
    if (db1Wins) {
      await this.copyRecord(record1, this.db1, this.db2, tableName);
      return 'db1_wins_newer';
    }
    await this.copyRecord(record2, this.db2, this.db1, tableName);
    return 'db2_wins_newer';
  }

  /** Treats db1 as authoritative: its version overwrites db2's. */
  async resolveBySourcePriority(tableName, record1) {
    await this.copyRecord(record1, this.db1, this.db2, tableName);
    return 'db1_wins_priority';
  }

  /** Leaves both sides untouched; the caller records the conflict in `this.conflicts`. */
  async flagForManualResolution(tableName, recordKey) {
    console.log(`需要人工处理的冲突: ${tableName} ${recordKey}`);
    return 'flagged_for_manual_resolution';
  }

  /** Upserts `record` into `targetDB`: UPDATE by primary key, INSERT if nothing matched. */
  async copyRecord(record, sourceDB, targetDB, tableName) {
    const columns = Object.keys(record);
    const values = Object.values(record);
    const placeholders = columns.map(() => '?').join(', ');
    const updateClauses = columns.map(col => `${col} = ?`).join(', ');
    const primaryKey = this.getPrimaryKeyColumn(tableName);
    const updateSQL = `UPDATE ${tableName} SET ${updateClauses} WHERE ${primaryKey} = ?`;
    try {
      const result = await targetDB.execute(updateSQL, [...values, record[primaryKey]]);
      if (result.affectedRows === 0) {
        const insertSQL = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${placeholders})`;
        await targetDB.execute(insertSQL, values);
      }
    } catch (error) {
      console.error(`复制记录失败 ${tableName}:`, error);
      throw error;
    }
  }

  /** Loads the whole table into memory. */
  async getTableData(database, tableName) {
    return await database.query(`SELECT * FROM ${tableName}`);
  }

  /** Returns a Map of record_id → metadata row for the table. */
  async getSyncMetadata(database, tableName) {
    const metadata = await database.query(
      `SELECT record_id, last_modified, checksum FROM sync_metadata WHERE table_name = ?`,
      [tableName]
    );
    const metadataMap = new Map();
    metadata.forEach(meta => {
      metadataMap.set(meta.record_id, meta);
    });
    return metadataMap;
  }

  /** Tables present in both databases, excluding this class's bookkeeping table. */
  async getCommonTables() {
    const [tables1, tables2] = await Promise.all([
      this.db1.query(`SHOW TABLES`),
      this.db2.query(`SHOW TABLES`)
    ]);
    // SHOW TABLES rows have a single, database-specific column, hence Object.values.
    const tableNames1 = new Set(tables1.map(t => Object.values(t)[0]));
    const tableNames2 = new Set(tables2.map(t => Object.values(t)[0]));
    // sync_metadata is created on both sides by initializeSyncTables() and
    // must not itself be synced as user data.
    const internalTables = new Set(['sync_metadata']);
    return [...tableNames1].filter(table => tableNames2.has(table) && !internalTables.has(table));
  }

  /** Record key: `id` as a string, or the serialized row when there is no id. */
  getRecordKey(record) {
    return record.id?.toString() || JSON.stringify(record);
  }

  /** Compares rows independent of column order (each DB may return columns differently ordered). */
  recordsEqual(record1, record2) {
    const canonical = (record) => JSON.stringify(
      Object.keys(record).sort().map(key => [key, record[key]])
    );
    return canonical(record1) === canonical(record2);
  }

  /** Simplified: every table is assumed to use `id` as its primary key. */
  getPrimaryKeyColumn(tableName) {
    return 'id';
  }

  /** Hook invoked by the real-time synchronizers on conflicts; currently only logs. */
  handleBidirectionalConflict(change, direction) {
    console.log(`双向同步冲突: ${direction}`, change);
  }
}
冲突解决
1. 冲突解决器
javascript
/**
 * Resolves conflicts raised while replaying a change onto a target database:
 * duplicate keys on INSERT, and UPDATEs that matched no row.
 *
 * A change is expected to look like `{ table, data, oldData? }` where `data`
 * / `oldData` are column → value maps (as produced by RealTimeSynchronizer).
 * NOTE(review): primary-key detection is simplified to an `id` column.
 */
class ConflictResolver {
  /**
   * @param {object} [options]
   * @param {string} [options.defaultStrategy='timestamp'] - 'source_wins' | 'target_wins' | 'timestamp' | 'merge' | 'custom'.
   * @param {object} [options.customResolvers={}] - Per-table resolver functions
   *   `(change, targetRecord, targetDB) => result`. A resolver's optional
   *   `strategy` property overrides the strategy used for that table; a bare
   *   function selects the 'custom' strategy.
   * @param {boolean} [options.logConflicts=true] - Keep conflicts in `conflictLog` and print them.
   */
  constructor(options = {}) {
    this.options = {
      defaultStrategy: options.defaultStrategy || 'timestamp',
      customResolvers: options.customResolvers || {},
      logConflicts: options.logConflicts !== false
    };
    this.conflictLog = [];
  }

  /** Duplicate key on INSERT: retries as an UPDATE by primary key when one is present. */
  async resolveInsertConflict(change, targetDB, error) {
    const conflict = {
      type: 'insert_conflict',
      table: change.table,
      data: change.data,
      error: error.message,
      timestamp: new Date()
    };
    this.logConflict(conflict);
    try {
      const primaryKey = this.extractPrimaryKey(change.data);
      if (primaryKey) {
        return await this.convertInsertToUpdate(change, targetDB, primaryKey);
      }
    } catch (updateError) {
      console.error('转为更新操作失败:', updateError);
    }
    return await this.logUnresolvedConflict(conflict);
  }

  /** UPDATE matched nothing: inserts when the row is missing, otherwise applies the table's strategy. */
  async resolveUpdateConflict(change, targetDB) {
    const conflict = {
      type: 'update_conflict',
      table: change.table,
      data: change.data,
      oldData: change.oldData,
      timestamp: new Date()
    };
    this.logConflict(conflict);
    const targetRecord = await this.findTargetRecord(change, targetDB);
    if (!targetRecord) {
      return await this.convertUpdateToInsert(change, targetDB);
    }
    return await this.resolveDataConflict(change, targetRecord, targetDB);
  }

  /** Dispatches to the strategy configured for `change.table`. */
  async resolveDataConflict(change, targetRecord, targetDB) {
    const strategy = this.getResolutionStrategy(change.table);
    switch (strategy) {
      case 'source_wins':
        return await this.applySourceWins(change, targetDB);
      case 'target_wins':
        return { resolved: false, reason: 'target_wins_policy' };
      case 'timestamp':
        return await this.resolveByTimestamp(change, targetRecord, targetDB);
      case 'merge':
        return await this.mergeRecords(change, targetRecord, targetDB);
      case 'custom':
        return await this.applyCustomResolution(change, targetRecord, targetDB);
      default:
        return await this.resolveByTimestamp(change, targetRecord, targetDB);
    }
  }

  /** Overwrites the target row with `change.data`. */
  async applySourceWins(change, targetDB) {
    const columns = Object.keys(change.data);
    const values = Object.values(change.data);
    const setClauses = columns.map(col => `${col} = ?`).join(', ');
    const whereClause = this.buildRecordWhere(change);
    const sql = `UPDATE ${change.table} SET ${setClauses} WHERE ${whereClause.clause}`;
    const allValues = [...values, ...whereClause.values];
    try {
      await targetDB.execute(sql, allValues);
      return { resolved: true, strategy: 'source_wins' };
    } catch (error) {
      return { resolved: false, error: error.message };
    }
  }

  /** Source wins when strictly newer, or when either side lacks a timestamp. */
  async resolveByTimestamp(change, targetRecord, targetDB) {
    const sourceTime = this.extractTimestamp(change.data);
    const targetTime = this.extractTimestamp(targetRecord);
    if (!sourceTime || !targetTime) {
      return await this.applySourceWins(change, targetDB);
    }
    if (new Date(sourceTime) > new Date(targetTime)) {
      return await this.applySourceWins(change, targetDB);
    }
    return { resolved: false, reason: 'target_is_newer' };
  }

  /** Writes target values overlaid with the source values (source wins per column). */
  async mergeRecords(change, targetRecord, targetDB) {
    const mergedData = { ...targetRecord, ...change.data };
    // Only bump updated_at when the row actually has that column; otherwise
    // the UPDATE would reference a column the table may not have.
    if ('updated_at' in mergedData) {
      mergedData.updated_at = new Date();
    }
    const columns = Object.keys(mergedData);
    const values = Object.values(mergedData);
    const setClauses = columns.map(col => `${col} = ?`).join(', ');
    const whereClause = this.buildRecordWhere(change);
    const sql = `UPDATE ${change.table} SET ${setClauses} WHERE ${whereClause.clause}`;
    const allValues = [...values, ...whereClause.values];
    try {
      await targetDB.execute(sql, allValues);
      return { resolved: true, strategy: 'merge', mergedData };
    } catch (error) {
      return { resolved: false, error: error.message };
    }
  }

  /** Runs the table's custom resolver, falling back to timestamp resolution when none is registered. */
  async applyCustomResolution(change, targetRecord, targetDB) {
    const customResolver = this.getCustomResolver(change.table);
    if (typeof customResolver !== 'function') {
      return await this.resolveByTimestamp(change, targetRecord, targetDB);
    }
    try {
      return await customResolver(change, targetRecord, targetDB);
    } catch (error) {
      console.error('自定义解决器失败:', error);
      return { resolved: false, error: error.message };
    }
  }

  /** Rewrites a conflicting INSERT as an UPDATE of the non-key columns. */
  async convertInsertToUpdate(change, targetDB, primaryKey) {
    const columns = Object.keys(change.data).filter(col => col !== primaryKey);
    if (columns.length === 0) {
      // The row exists and only the key was supplied: nothing to update
      // (and an empty SET clause would be invalid SQL).
      return { resolved: true, strategy: 'convert_to_update' };
    }
    const values = columns.map(col => change.data[col]);
    const setClauses = columns.map(col => `${col} = ?`).join(', ');
    const sql = `UPDATE ${change.table} SET ${setClauses} WHERE ${primaryKey} = ?`;
    const allValues = [...values, change.data[primaryKey]];
    await targetDB.execute(sql, allValues);
    return { resolved: true, strategy: 'convert_to_update' };
  }

  /** Inserts `change.data` as a new row. */
  async convertUpdateToInsert(change, targetDB) {
    const columns = Object.keys(change.data);
    const values = Object.values(change.data);
    const placeholders = columns.map(() => '?').join(', ');
    const sql = `INSERT INTO ${change.table} (${columns.join(', ')}) VALUES (${placeholders})`;
    await targetDB.execute(sql, values);
    return { resolved: true, strategy: 'convert_to_insert' };
  }

  /** Loads the target row for a change, or null when it does not exist. */
  async findTargetRecord(change, targetDB) {
    const whereClause = this.buildRecordWhere(change);
    const sql = `SELECT * FROM ${change.table} WHERE ${whereClause.clause} LIMIT 1`;
    const result = await targetDB.query(sql, whereClause.values);
    return result.length > 0 ? result[0] : null;
  }

  /**
   * WHERE clause identifying the row a change refers to. Uses the primary
   * key when available: these paths run after a full-row match has already
   * failed, so repeating that match would never find the row.
   */
  buildRecordWhere(change) {
    const keySource = change.oldData || change.data;
    const primaryKey = this.extractPrimaryKey(keySource);
    if (primaryKey) {
      return { clause: `${primaryKey} = ?`, values: [keySource[primaryKey]] };
    }
    return this.buildWhereClause(keySource);
  }

  /** Simplified: returns 'id' when `data.id` is set (including 0), else null. */
  extractPrimaryKey(data) {
    return data != null && data.id != null ? 'id' : null;
  }

  extractTimestamp(data) {
    return data.updated_at || data.modified_at || data.created_at;
  }

  /** AND-ed equality over every column; null/undefined become `IS NULL`. */
  buildWhereClause(data) {
    const conditions = [];
    const values = [];
    Object.entries(data).forEach(([key, value]) => {
      if (value !== null && value !== undefined) {
        conditions.push(`${key} = ?`);
        values.push(value);
      } else {
        conditions.push(`${key} IS NULL`);
      }
    });
    return {
      clause: conditions.join(' AND '),
      values: values
    };
  }

  // Own-property lookup so table names like "constructor" do not pick up
  // inherited Object.prototype members.
  getCustomResolver(tableName) {
    const resolvers = this.options.customResolvers;
    return Object.prototype.hasOwnProperty.call(resolvers, tableName) ? resolvers[tableName] : undefined;
  }

  /** Strategy for a table: resolver's `strategy`, 'custom' for a bare resolver function, else the default. */
  getResolutionStrategy(tableName) {
    const resolver = this.getCustomResolver(tableName);
    if (resolver?.strategy) {
      return resolver.strategy;
    }
    if (typeof resolver === 'function') {
      return 'custom';
    }
    return this.options.defaultStrategy;
  }

  /** Appends to the bounded in-memory log (last 1000) and prints, when logging is enabled. */
  logConflict(conflict) {
    if (this.options.logConflicts) {
      this.conflictLog.push(conflict);
      console.log('数据同步冲突:', conflict);
      if (this.conflictLog.length > 1000) {
        this.conflictLog.shift();
      }
    }
  }

  /** Reports an unresolved conflict. Currently only writes to the console; nothing is persisted. */
  async logUnresolvedConflict(conflict) {
    console.error('未解决的数据同步冲突:', conflict);
    return { resolved: false, logged: true };
  }

  /** Counts logged conflicts by type and table, plus the 10 most recent. */
  getConflictStats() {
    const stats = {
      total: this.conflictLog.length,
      byType: {},
      byTable: {},
      recent: this.conflictLog.slice(-10)
    };
    this.conflictLog.forEach(conflict => {
      stats.byType[conflict.type] = (stats.byType[conflict.type] || 0) + 1;
      stats.byTable[conflict.table] = (stats.byTable[conflict.table] || 0) + 1;
    });
    return stats;
  }
}
总结
数据同步的关键要点:
- 同步模式选择:实时、批量、双向同步适用不同场景
- 变更捕获:CDC、binlog、逻辑复制等技术
- 冲突解决:时间戳、优先级、合并等策略
- 数据一致性:通过校验和(checksum)比对来验证数据完整性
- 性能优化:批量处理、并行同步、网络优化
- 监控告警:实时监控同步状态和冲突情况
选择合适的数据同步策略需要考虑数据一致性要求、网络延迟、系统负载等因素。