# 数据库连接池
## 🎯 学习目标
- 理解数据库连接池的重要性和工作原理
- 掌握不同数据库连接池的配置和使用
- 学会连接池性能优化和监控
- 了解连接池的最佳实践和故障处理
## 📚 核心概念
### 连接池简介
javascript
// Core connection-pool concepts, grouped by topic.

// What a pool buys the application.
const poolBenefits = [
  '减少连接创建/销毁开销',
  '控制并发连接数量',
  '提高应用性能',
  '优化资源利用',
];

// The moving parts a pool is made of.
const poolParts = [
  '连接管理器',
  '连接队列',
  '健康检查',
  '监控统计',
];

// The stages a pooled connection goes through, in order.
const lifecycleStages = [
  '连接创建',
  '连接获取',
  '连接使用',
  '连接释放',
  '连接销毁',
];

// Assemble the three topics into the single object the tutorial prints.
const connectionPoolConcepts = {
  purpose: { description: '连接池的作用', benefits: poolBenefits },
  components: { description: '连接池组件', parts: poolParts },
  lifecycle: { description: '连接生命周期', stages: lifecycleStages },
};

console.log('连接池概念:', connectionPoolConcepts);
### 连接池工作原理
javascript
// Simulation of the connection-pool workflow
/**
 * In-memory simulation of a database connection pool.
 *
 * Connections are plain objects; opening one waits 100 ms to mimic network
 * latency. The pool tracks idle and checked-out connections, queues callers
 * once `maxConnections` is reached, and keeps counters in `stats`.
 */
class ConnectionPoolSimulator {
  /**
   * @param {object} [options]
   * @param {number} [options.minConnections=5] Floor restored by healthCheck().
   * @param {number} [options.maxConnections=20] Hard cap on open connections.
   * @param {number} [options.acquireTimeoutMillis=30000] Max wait in the queue.
   * @param {number} [options.idleTimeoutMillis=300000] Idle time before eviction.
   */
  constructor(options = {}) {
    // `??` (not `||`) so an explicit 0 is honoured instead of being replaced.
    this.minConnections = options.minConnections ?? 5;
    this.maxConnections = options.maxConnections ?? 20;
    this.acquireTimeoutMillis = options.acquireTimeoutMillis ?? 30000;
    this.idleTimeoutMillis = options.idleTimeoutMillis ?? 300000; // 5 minutes
    this.connections = new Set();
    this.availableConnections = [];
    this.busyConnections = new Set();
    this.waitingQueue = [];
    // Connections currently being opened; counted against maxConnections so
    // concurrent acquires cannot overshoot the cap during the async create.
    this.pendingCreations = 0;
    this.stats = {
      created: 0,
      destroyed: 0,
      acquired: 0,
      released: 0,
      timeouts: 0
    };
  }

  /**
   * Open a connection and register it with the pool WITHOUT marking it idle.
   * @returns {Promise<object>} The new connection object.
   */
  async _openConnection() {
    this.pendingCreations++;
    try {
      const connection = {
        id: `conn_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
        createdAt: Date.now(),
        lastUsedAt: Date.now(),
        usageCount: 0,
        isValid: true
      };
      // Simulated connection-establishment latency.
      await new Promise((resolve) => setTimeout(resolve, 100));
      this.connections.add(connection);
      this.stats.created++;
      console.log(`🔗 创建连接: ${connection.id}`);
      return connection;
    } finally {
      this.pendingCreations--;
    }
  }

  /**
   * Mark a connection as checked out and update its bookkeeping.
   * @param {object} connection
   * @param {string} label Log prefix describing how the connection was obtained.
   * @returns {object} The same connection.
   */
  _checkOut(connection, label) {
    this.busyConnections.add(connection);
    connection.lastUsedAt = Date.now();
    connection.usageCount++;
    this.stats.acquired++;
    console.log(`${label}: ${connection.id}`);
    return connection;
  }

  /**
   * Create a connection and place it in the idle list.
   * @returns {Promise<object>} The new connection object.
   */
  async createConnection() {
    const connection = await this._openConnection();
    this.availableConnections.push(connection);
    return connection;
  }

  /**
   * Obtain a connection: reuse an idle one, open a new one while below the
   * cap, otherwise wait in the queue.
   * @returns {Promise<object>} A checked-out connection.
   * @throws {Error} When no connection becomes free within acquireTimeoutMillis.
   */
  async acquireConnection() {
    const idle = this.availableConnections.shift();
    if (idle !== undefined) {
      return this._checkOut(idle, '📤 获取连接');
    }

    // Include in-flight creations in the capacity check.
    if (this.connections.size + this.pendingCreations < this.maxConnections) {
      // The fresh connection never enters the idle list, so no other caller
      // can grab it between creation and check-out.
      const connection = await this._openConnection();
      return this._checkOut(connection, '📤 获取新连接');
    }

    // Pool exhausted: queue the request with a timeout.
    return new Promise((resolve, reject) => {
      const waitingRequest = {
        resolve,
        reject,
        timestamp: Date.now(),
        timer: null
      };
      waitingRequest.timer = setTimeout(() => {
        const index = this.waitingQueue.indexOf(waitingRequest);
        if (index > -1) {
          this.waitingQueue.splice(index, 1);
          this.stats.timeouts++;
          reject(new Error('连接获取超时'));
        }
      }, this.acquireTimeoutMillis);
      this.waitingQueue.push(waitingRequest);
      console.log(`⏳ 连接请求加入等待队列,当前队列长度: ${this.waitingQueue.length}`);
    });
  }

  /**
   * Return a connection to the pool. A valid connection goes straight to the
   * oldest waiter if there is one, otherwise back to the idle list; an invalid
   * connection is destroyed.
   * @param {object} connection A connection previously returned by acquireConnection().
   */
  releaseConnection(connection) {
    if (!this.busyConnections.has(connection)) {
      console.warn(`⚠️ 尝试释放未在使用的连接: ${connection?.id}`);
      return;
    }
    this.busyConnections.delete(connection);
    connection.lastUsedAt = Date.now();
    this.stats.released++;

    if (!connection.isValid) {
      this.destroyConnection(connection);
      return;
    }

    const waitingRequest = this.waitingQueue.shift();
    if (waitingRequest) {
      // Cancel the pending timeout so it does not linger until it fires.
      clearTimeout(waitingRequest.timer);
      waitingRequest.resolve(this._checkOut(connection, '📤 连接直接分配给等待请求'));
      return;
    }

    this.availableConnections.push(connection);
    console.log(`📥 释放连接: ${connection.id}`);
  }

  /**
   * Remove a connection from every pool structure.
   * @param {object} connection
   */
  destroyConnection(connection) {
    this.connections.delete(connection);
    this.availableConnections = this.availableConnections.filter((c) => c !== connection);
    this.busyConnections.delete(connection);
    this.stats.destroyed++;
    console.log(`💥 销毁连接: ${connection.id}`);
  }

  /**
   * Evict idle connections older than idleTimeoutMillis, then top the pool
   * back up to minConnections.
   * @returns {Promise<void>}
   */
  async healthCheck() {
    console.log('🔍 执行连接池健康检查...');
    const now = Date.now();
    // Collect first, destroy afterwards: destroyConnection() replaces the
    // array being scanned.
    const connectionsToDestroy = this.availableConnections.filter(
      (connection) => now - connection.lastUsedAt > this.idleTimeoutMillis
    );
    for (const connection of connectionsToDestroy) {
      this.destroyConnection(connection);
    }
    while (this.connections.size < this.minConnections) {
      await this.createConnection();
    }
    console.log(`✅ 健康检查完成,销毁 ${connectionsToDestroy.length} 个超时连接`);
  }

  /**
   * Snapshot of pool size, queue length, configuration and counters.
   * @returns {object}
   */
  getStatus() {
    return {
      total: this.connections.size,
      available: this.availableConnections.length,
      busy: this.busyConnections.size,
      waiting: this.waitingQueue.length,
      config: {
        min: this.minConnections,
        max: this.maxConnections,
        acquireTimeout: this.acquireTimeoutMillis,
        idleTimeout: this.idleTimeoutMillis
      },
      stats: { ...this.stats }
    };
  }
}
// Usage example
/**
 * Demo: warm a simulated pool up to its minimum size, run 10 concurrent tasks
 * against it, then run a health check, printing the pool status at each step.
 * @returns {Promise<void>}
 */
async function demonstrateConnectionPool() {
  console.log('🏊 连接池工作原理演示...\n');
  const poolOptions = {
    minConnections: 3,
    maxConnections: 8,
    acquireTimeoutMillis: 5000,
    idleTimeoutMillis: 10000
  };
  const pool = new ConnectionPoolSimulator(poolOptions);

  // Warm up to the configured minimum (single source of truth for the count).
  for (let i = 0; i < poolOptions.minConnections; i++) {
    await pool.createConnection();
  }
  console.log('初始状态:', pool.getStatus());

  // One simulated unit of work; failures are reported, never thrown.
  const runTask = async (taskNumber) => {
    try {
      const connection = await pool.acquireConnection();
      try {
        console.log(`🔧 使用连接 ${connection.id} 执行任务 ${taskNumber}`);
        // Simulated work duration.
        await new Promise((resolve) => setTimeout(resolve, Math.random() * 2000));
      } finally {
        // Always hand the connection back, even if the work throws.
        pool.releaseConnection(connection);
      }
      console.log(`✅ 任务 ${taskNumber} 完成`);
    } catch (error) {
      console.error(`❌ 任务 ${taskNumber} 失败:`, error.message);
    }
  };

  // Fire all requests concurrently to exercise the queueing path.
  const tasks = Array.from({ length: 10 }, (_, index) => runTask(index + 1));
  await Promise.all(tasks);
  console.log('所有任务完成后状态:', pool.getStatus());

  await pool.healthCheck();
  console.log('健康检查后状态:', pool.getStatus());
}
// demonstrateConnectionPool();
## 🔧 MySQL连接池实现
### 使用mysql2连接池
javascript
// mysql-connection-pool.js
const mysql = require('mysql2/promise');
/**
 * Wrapper around a mysql2 promise pool that adds query/connection counters,
 * a transaction helper, batch inserts and periodic health logging.
 *
 * Call initialize() before query()/transaction()/batchInsert().
 */
class MySQLConnectionPool {
  /**
   * @param {object} [config] Connection and pool settings; missing keys fall
   *   back to the defaults below.
   */
  constructor(config = {}) {
    this.config = {
      host: config.host || 'localhost',
      port: config.port || 3306,
      user: config.user || 'root',
      password: config.password || '',
      database: config.database,
      // Pool settings
      connectionLimit: config.connectionLimit || 10,
      queueLimit: config.queueLimit || 0,
      acquireTimeout: config.acquireTimeout || 60000,
      timeout: config.timeout || 60000,
      // Reconnect settings
      reconnect: config.reconnect !== false,
      idleTimeout: config.idleTimeout || 600000, // 10 minutes
      // Misc
      charset: config.charset || 'utf8mb4',
      timezone: config.timezone || '+00:00',
      // SSL
      ssl: config.ssl || false
    };
    this.pool = null;
    this.isInitialized = false;
    this.healthMonitorInterval = null;
    this.stats = {
      connections: {
        created: 0,
        destroyed: 0,
        acquired: 0,
        released: 0
      },
      queries: {
        total: 0,
        successful: 0,
        failed: 0
      },
      performance: {
        totalQueryTime: 0,
        avgQueryTime: 0
      }
    };
  }

  /**
   * Throw unless initialize() has completed.
   * @throws {Error}
   */
  _requireInitialized() {
    if (!this.isInitialized || !this.pool) {
      throw new Error('连接池未初始化');
    }
  }

  /**
   * Backtick-quote a MySQL identifier, doubling embedded backticks.
   * @param {string} identifier
   * @param {boolean} [allowQualified=false] Split on "." and quote each part
   *   (for `schema.table` names).
   * @returns {string}
   */
  _quoteIdentifier(identifier, allowQualified = false) {
    const quotePart = (part) => {
      // Tolerate a caller that already wrapped the name in backticks.
      const bare = part.replace(/^`(.*)`$/, '$1');
      return `\`${bare.replace(/`/g, '``')}\``;
    };
    const text = String(identifier);
    return allowQualified ? text.split('.').map(quotePart).join('.') : quotePart(text);
  }

  /**
   * Create the underlying pool, attach event counters and verify connectivity.
   * Idempotent once it has succeeded.
   * @returns {Promise<void>}
   * @throws Rethrows any error from pool creation or the connectivity test.
   */
  async initialize() {
    if (this.isInitialized) {
      return;
    }
    try {
      console.log('🚀 初始化MySQL连接池...');
      // NOTE(review): acquireTimeout / timeout / reconnect are not listed among
      // mysql2's connection or pool options (mysql2 warns about unknown keys),
      // so they are kept in this.config for reporting but not forwarded —
      // confirm against the installed mysql2 version.
      const { acquireTimeout, timeout, reconnect, ...poolOptions } = this.config;
      this.pool = mysql.createPool(poolOptions);

      this.pool.on('connection', (connection) => {
        this.stats.connections.created++;
        console.log(`🔗 新连接创建: ${connection.threadId}`);
      });
      this.pool.on('acquire', (connection) => {
        this.stats.connections.acquired++;
        console.log(`📤 连接获取: ${connection.threadId}`);
      });
      this.pool.on('release', (connection) => {
        this.stats.connections.released++;
        console.log(`📥 连接释放: ${connection.threadId}`);
      });
      this.pool.on('error', (error) => {
        console.error('❌ 连接池错误:', error);
      });

      await this.testConnection();
      this.isInitialized = true;
      console.log('✅ MySQL连接池初始化成功');
    } catch (error) {
      console.error('❌ MySQL连接池初始化失败:', error);
      // Do not leave a half-initialized pool (and its sockets) behind.
      const failedPool = this.pool;
      this.pool = null;
      if (failedPool) {
        await failedPool.end().catch((closeError) => {
          console.error('❌ 连接池错误:', closeError);
        });
      }
      throw error;
    }
  }

  /**
   * Borrow one connection, ping the server and release the connection.
   * @returns {Promise<void>}
   */
  async testConnection() {
    const connection = await this.pool.getConnection();
    try {
      await connection.ping();
      console.log('🏓 数据库连接测试成功');
    } finally {
      connection.release();
    }
  }

  /**
   * Run a statement through pool.execute() and record timing/outcome.
   * @param {string} sql
   * @param {Array} [params=[]] Values for the `?` placeholders.
   * @returns {Promise<{rows: *, fields: *}>}
   * @throws {Error} When the pool is not initialized or the statement fails.
   */
  async query(sql, params = []) {
    if (!this.isInitialized) {
      throw new Error('连接池未初始化');
    }
    const startTime = Date.now();
    const { queries, performance: timing } = this.stats;
    try {
      queries.total++;
      const [rows, fields] = await this.pool.execute(sql, params);
      const queryTime = Date.now() - startTime;
      queries.successful++;
      timing.totalQueryTime += queryTime;
      // Only successful queries contribute time, so average over those.
      timing.avgQueryTime = timing.totalQueryTime / queries.successful;
      console.log(`✅ 查询执行成功,耗时: ${queryTime}ms`);
      return { rows, fields };
    } catch (error) {
      queries.failed++;
      console.error('❌ 查询执行失败:', error.message);
      throw error;
    }
  }

  /**
   * Run `callback(connection)` inside a transaction: commit on success,
   * roll back on failure, always release the connection.
   * @param {function(object): Promise<*>} callback
   * @returns {Promise<*>} Whatever the callback resolves to.
   * @throws Rethrows the callback/commit error (never the rollback error).
   */
  async transaction(callback) {
    this._requireInitialized();
    const connection = await this.pool.getConnection();
    try {
      await connection.beginTransaction();
      const result = await callback(connection);
      await connection.commit();
      console.log('✅ 事务提交成功');
      return result;
    } catch (error) {
      try {
        await connection.rollback();
      } catch (rollbackError) {
        // Report, but keep the original failure as the error the caller sees.
        console.error('❌ 事务回滚失败:', rollbackError.message);
      }
      console.error('❌ 事务回滚:', error.message);
      throw error;
    } finally {
      connection.release();
    }
  }

  /**
   * Insert rows one statement at a time, one transaction per batch.
   * Column names come from the keys of the first row; a key missing from a
   * later row is bound as NULL.
   * @param {string} tableName Table name, optionally `schema.table`.
   * @param {object[]} data Non-empty array of row objects.
   * @param {number} [batchSize=1000] Rows per transaction; must be a positive integer.
   * @returns {Promise<number>} Number of rows inserted.
   * @throws {Error} On invalid arguments or any failed batch.
   */
  async batchInsert(tableName, data, batchSize = 1000) {
    if (!Array.isArray(data) || data.length === 0) {
      throw new Error('数据必须是非空数组');
    }
    if (!Number.isInteger(batchSize) || batchSize <= 0) {
      // A non-positive step would make the batching loop never advance.
      throw new Error('batchSize 必须是正整数');
    }
    const fields = Object.keys(data[0]);
    const placeholders = fields.map(() => '?').join(', ');
    // Identifiers cannot be bound as parameters, so quote them explicitly.
    const quotedTable = this._quoteIdentifier(tableName, true);
    const quotedFields = fields.map((field) => this._quoteIdentifier(field)).join(', ');
    const sql = `INSERT INTO ${quotedTable} (${quotedFields}) VALUES (${placeholders})`;

    let totalInserted = 0;
    for (let i = 0; i < data.length; i += batchSize) {
      const batch = data.slice(i, i + batchSize);
      await this.transaction(async (connection) => {
        for (const row of batch) {
          // undefined is not a bindable value; send NULL instead.
          const values = fields.map((field) => row[field] ?? null);
          await connection.execute(sql, values);
          totalInserted++;
        }
      });
      console.log(`📦 批量插入进度: ${totalInserted}/${data.length}`);
    }
    console.log(`✅ 批量插入完成,总计: ${totalInserted} 条记录`);
    return totalInserted;
  }

  /**
   * Combine server thread counters with the wrapper's own statistics.
   * @returns {Promise<object>} `{status:'not_initialized'}` without a pool,
   *   `{error, pool_stats}` when the status query fails, otherwise the full report.
   */
  async getPoolStatus() {
    if (!this.pool) {
      return { status: 'not_initialized' };
    }
    // NOTE(review): INFORMATION_SCHEMA.SESSION_STATUS appears to have been
    // removed in MySQL 8.0 (performance_schema.global_status is the
    // replacement) — confirm against the target server version.
    const statusQuery = `
      SELECT
        VARIABLE_NAME,
        VARIABLE_VALUE
      FROM
        INFORMATION_SCHEMA.SESSION_STATUS
      WHERE
        VARIABLE_NAME IN (
          'Threads_connected',
          'Threads_running',
          'Max_used_connections',
          'Aborted_connects'
        )
    `;
    try {
      const { rows } = await this.query(statusQuery);
      const status = {};
      rows.forEach((row) => {
        status[row.VARIABLE_NAME.toLowerCase()] = Number.parseInt(row.VARIABLE_VALUE, 10);
      });
      return {
        config: {
          connectionLimit: this.config.connectionLimit,
          queueLimit: this.config.queueLimit,
          acquireTimeout: this.config.acquireTimeout
        },
        mysql_status: status,
        pool_stats: this.stats,
        timestamp: new Date().toISOString()
      };
    } catch (error) {
      console.error('获取连接池状态失败:', error);
      return {
        error: error.message,
        pool_stats: this.stats
      };
    }
  }

  /**
   * Log pool health every `intervalMs` and warn on a high failure rate
   * (>10%) or slow average query time (>1s). Restarts any running monitor.
   * @param {number} [intervalMs=30000]
   */
  startHealthMonitoring(intervalMs = 30000) {
    // Avoid leaking a previous interval when called twice.
    this.stopHealthMonitoring();
    console.log(`🔍 启动连接池健康监控 (间隔: ${intervalMs}ms)`);
    this.healthMonitorInterval = setInterval(async () => {
      try {
        const status = await this.getPoolStatus();
        const poolStats = status.pool_stats;
        if (!poolStats) {
          // Pool not initialized: nothing to report this tick.
          return;
        }
        const { total, successful, failed } = poolStats.queries;
        const { avgQueryTime } = poolStats.performance;
        console.log('📊 连接池健康状态:');
        console.log(` 查询统计: 总计=${total}, 成功=${successful}, 失败=${failed}`);
        console.log(` 平均查询时间: ${avgQueryTime.toFixed(2)}ms`);
        if (status.mysql_status) {
          console.log(` MySQL连接: 当前=${status.mysql_status.threads_connected}, 运行中=${status.mysql_status.threads_running}`);
        }
        const failureRate = total > 0 ? failed / total : 0;
        if (failureRate > 0.1) {
          console.warn('⚠️ 查询失败率过高:', (failureRate * 100).toFixed(2) + '%');
        }
        if (avgQueryTime > 1000) {
          console.warn('⚠️ 平均查询时间过长:', avgQueryTime.toFixed(2) + 'ms');
        }
      } catch (error) {
        console.error('❌ 健康监控检查失败:', error.message);
      }
    }, intervalMs);
  }

  /** Stop the periodic health logging, if running. */
  stopHealthMonitoring() {
    if (this.healthMonitorInterval) {
      clearInterval(this.healthMonitorInterval);
      this.healthMonitorInterval = null;
      console.log('⏹️ 连接池健康监控已停止');
    }
  }

  /**
   * Stop monitoring and close the pool; safe to call when not initialized.
   * @returns {Promise<void>}
   */
  async close() {
    if (!this.pool) {
      return;
    }
    this.stopHealthMonitoring();
    console.log('🔄 正在关闭连接池...');
    const closingPool = this.pool;
    // Reset state first so a failing end() cannot leave a stale "initialized" flag.
    this.pool = null;
    this.isInitialized = false;
    await closingPool.end();
    console.log('✅ 连接池已关闭');
  }
}
// Usage example
/**
 * End-to-end demo against a local MySQL server: initialize the pool, start
 * health monitoring, create a table, upsert sample users, read them back, run
 * a two-statement transaction, print the pool status, then close the pool.
 * Errors are logged; the pool is closed in every case.
 * @returns {Promise<void>}
 */
async function demonstrateMySQLPool() {
  console.log('🔧 MySQL连接池演示...\n');

  const createTableSql = `
    CREATE TABLE IF NOT EXISTS test_users (
      id INT AUTO_INCREMENT PRIMARY KEY,
      name VARCHAR(100) NOT NULL,
      email VARCHAR(100) UNIQUE,
      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
  `;
  const upsertUserSql =
    'INSERT INTO test_users (name, email) VALUES (?, ?) ON DUPLICATE KEY UPDATE name = VALUES(name)';
  const sampleUsers = [
    { name: 'Alice', email: 'alice@example.com' },
    { name: 'Bob', email: 'bob@example.com' },
    { name: 'Charlie', email: 'charlie@example.com' }
  ];

  const pool = new MySQLConnectionPool({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'test_db',
    connectionLimit: 5,
    acquireTimeout: 10000
  });

  try {
    await pool.initialize();
    pool.startHealthMonitoring(10000);

    console.log('执行基础查询...');
    await pool.query(createTableSql);

    // Upsert sequentially so the log output stays in row order.
    for (const { name, email } of sampleUsers) {
      await pool.query(upsertUserSql, [name, email]);
    }

    const selected = await pool.query('SELECT * FROM test_users ORDER BY id');
    console.log('查询结果:', selected.rows);

    console.log('\n执行事务示例...');
    await pool.transaction(async (connection) => {
      await connection.execute(
        'INSERT INTO test_users (name, email) VALUES (?, ?)',
        ['David', 'david@example.com']
      );
      await connection.execute(
        'UPDATE test_users SET name = ? WHERE email = ?',
        ['David Updated', 'david@example.com']
      );
    });

    console.log('\n连接池状态:');
    const poolStatus = await pool.getPoolStatus();
    console.log(JSON.stringify(poolStatus, null, 2));

    // Give the monitor a little time to run before shutting down.
    console.log('\n等待监控数据...');
    await new Promise((done) => setTimeout(done, 5000));
  } catch (error) {
    console.error('❌ 演示过程中出现错误:', error);
  } finally {
    await pool.close();
  }
}
module.exports = MySQLConnectionPool;
## 🐘 PostgreSQL连接池实现
### 使用pg连接池
javascript
// postgresql-connection-pool.js
const { Pool, Client, Query } = require('pg');
/**
 * Wrapper around a pg `Pool` that adds query counters, a transaction helper,
 * row-streaming queries, batch operations and periodic health checks.
 *
 * Call initialize() before using the query helpers.
 */
class PostgreSQLConnectionPool {
  /**
   * @param {object} [config] Connection and pool settings; missing keys fall
   *   back to the defaults below.
   */
  constructor(config = {}) {
    this.config = {
      host: config.host || 'localhost',
      port: config.port || 5432,
      user: config.user || 'postgres',
      password: config.password || '',
      database: config.database,
      // Pool settings
      // NOTE(review): min / idle / acquire / evict look like generic-pool or
      // Sequelize option names; verify that pg's Pool actually honours them.
      max: config.max || 20, // maximum connections
      min: config.min || 5, // minimum connections
      idle: config.idle || 1000, // idle connection timeout
      acquire: config.acquire || 30000, // acquire timeout
      evict: config.evict || 1000, // idle-check interval
      // Connection settings
      connectionTimeoutMillis: config.connectionTimeoutMillis || 2000,
      idleTimeoutMillis: config.idleTimeoutMillis || 300000, // 5 minutes
      // SSL
      ssl: config.ssl || false,
      // Misc
      application_name: config.application_name || 'nodejs_app'
    };
    this.pool = null;
    this.isInitialized = false;
    this.monitorInterval = null;
    this.stats = {
      connections: {
        total: 0,
        active: 0,
        idle: 0,
        waiting: 0
      },
      queries: {
        total: 0,
        successful: 0,
        failed: 0,
        totalTime: 0
      }
    };
  }

  /**
   * Throw unless initialize() has completed.
   * @throws {Error}
   */
  _requireInitialized() {
    if (!this.isInitialized || !this.pool) {
      throw new Error('连接池未初始化');
    }
  }

  /**
   * Create the pg pool, attach event listeners and verify connectivity.
   * Idempotent once it has succeeded.
   * @returns {Promise<void>}
   * @throws Rethrows any error from pool creation or the connectivity test.
   */
  async initialize() {
    if (this.isInitialized) {
      return;
    }
    try {
      console.log('🚀 初始化PostgreSQL连接池...');
      this.pool = new Pool(this.config);

      this.pool.on('connect', () => {
        console.log('🔗 新连接建立');
        this.updateConnectionStats();
      });
      this.pool.on('acquire', () => {
        console.log('📤 连接获取');
        this.updateConnectionStats();
      });
      this.pool.on('remove', () => {
        console.log('🗑️ 连接移除');
        this.updateConnectionStats();
      });
      this.pool.on('error', (error) => {
        console.error('❌ 连接池错误:', error);
      });

      await this.testConnection();
      this.isInitialized = true;
      console.log('✅ PostgreSQL连接池初始化成功');
    } catch (error) {
      console.error('❌ PostgreSQL连接池初始化失败:', error);
      // Do not leave a half-initialized pool (and its sockets) behind.
      const failedPool = this.pool;
      this.pool = null;
      if (failedPool) {
        await failedPool.end().catch((closeError) => {
          console.error('❌ 连接池错误:', closeError);
        });
      }
      throw error;
    }
  }

  /** Copy the pool's live counters into `stats.connections`. */
  updateConnectionStats() {
    if (this.pool) {
      const { totalCount, idleCount, waitingCount } = this.pool;
      this.stats.connections.total = totalCount;
      this.stats.connections.active = totalCount - idleCount;
      this.stats.connections.idle = idleCount;
      this.stats.connections.waiting = waitingCount;
    }
  }

  /**
   * Borrow one client, run `SELECT NOW()` and release the client.
   * @returns {Promise<void>}
   */
  async testConnection() {
    const client = await this.pool.connect();
    try {
      const result = await client.query('SELECT NOW() as current_time');
      console.log('🏓 数据库连接测试成功:', result.rows[0].current_time);
    } finally {
      client.release();
    }
  }

  /**
   * Run a statement through pool.query() and record timing/outcome.
   * @param {string} text
   * @param {Array} [params=[]] Values for the `$n` placeholders.
   * @returns {Promise<object>} The pg result object.
   * @throws {Error} When the pool is not initialized or the statement fails.
   */
  async query(text, params = []) {
    if (!this.isInitialized) {
      throw new Error('连接池未初始化');
    }
    const startTime = Date.now();
    try {
      this.stats.queries.total++;
      const result = await this.pool.query(text, params);
      const queryTime = Date.now() - startTime;
      this.stats.queries.totalTime += queryTime;
      this.stats.queries.successful++;
      console.log(`✅ 查询执行成功,耗时: ${queryTime}ms,返回 ${result.rows.length} 行`);
      return result;
    } catch (error) {
      this.stats.queries.failed++;
      console.error('❌ 查询执行失败:', error.message);
      throw error;
    }
  }

  /**
   * Run `callback(client)` between BEGIN and COMMIT; ROLLBACK on failure and
   * always release the client.
   * @param {function(object): Promise<*>} callback
   * @returns {Promise<*>} Whatever the callback resolves to.
   * @throws Rethrows the callback/commit error (never the rollback error).
   */
  async transaction(callback) {
    this._requireInitialized();
    const client = await this.pool.connect();
    // Set when ROLLBACK itself fails: the session state is then unknown, so
    // the client is discarded instead of being returned to the pool.
    let discardClient = false;
    try {
      await client.query('BEGIN');
      const result = await callback(client);
      await client.query('COMMIT');
      console.log('✅ 事务提交成功');
      return result;
    } catch (error) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackError) {
        discardClient = true;
        console.error('❌ 事务回滚失败:', rollbackError.message);
      }
      console.error('❌ 事务回滚:', error.message);
      throw error;
    } finally {
      client.release(discardClient);
    }
  }

  /**
   * Run a query and receive rows one at a time.
   * Rows go to `options.onRow` when given; otherwise they are collected and
   * returned.
   * @param {string} text
   * @param {Array} [params=[]]
   * @param {{onRow?: function(object): void}} [options={}]
   * @returns {Promise<object[]>} Collected rows (empty when onRow is used).
   * @throws Rethrows query errors and errors thrown by onRow.
   */
  async streamQuery(text, params = [], options = {}) {
    this._requireInitialized();
    const client = await this.pool.connect();
    try {
      const results = [];
      // client.query(text, params) returns a Promise with no row events; a
      // Query object must be submitted to receive 'row' / 'end' / 'error'.
      const query = client.query(new Query(text, params));
      await new Promise((resolve, reject) => {
        query.on('row', (row) => {
          if (!options.onRow) {
            results.push(row);
            return;
          }
          try {
            options.onRow(row);
          } catch (handlerError) {
            // Surface handler failures through the returned promise instead
            // of throwing inside the emitter.
            reject(handlerError);
          }
        });
        query.once('end', resolve);
        query.once('error', reject);
      });
      console.log('✅ 流式查询完成');
      return results;
    } catch (error) {
      console.error('❌ 流式查询错误:', error);
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Execute `{text, params}` operations sequentially, one transaction per batch.
   * @param {Array<{text: string, params?: Array}>} operations
   * @param {number} [batchSize=100] Operations per transaction; must be a positive integer.
   * @returns {Promise<object[]>} The pg result of every operation, in order.
   * @throws {Error} On an invalid batchSize or any failed batch.
   */
  async batchOperation(operations, batchSize = 100) {
    if (!Number.isInteger(batchSize) || batchSize <= 0) {
      // A non-positive step would make the batching loop never advance.
      throw new Error('batchSize 必须是正整数');
    }
    let completed = 0;
    const results = [];
    for (let i = 0; i < operations.length; i += batchSize) {
      const batch = operations.slice(i, i + batchSize);
      await this.transaction(async (client) => {
        for (const operation of batch) {
          results.push(await client.query(operation.text, operation.params));
          completed++;
        }
      });
      console.log(`📦 批量操作进度: ${completed}/${operations.length}`);
    }
    console.log(`✅ 批量操作完成,总计: ${completed} 个操作`);
    return results;
  }

  /**
   * Combine pool counters, query statistics and the database's row from
   * pg_stat_database into one report. A failing statistics query is logged and
   * yields an empty `database` object.
   * @returns {Promise<object>}
   */
  async getPoolStatus() {
    this.updateConnectionStats();
    let dbStats = {};
    try {
      const result = await this.query(`
        SELECT
          numbackends as active_connections,
          xact_commit as transactions_committed,
          xact_rollback as transactions_rolled_back,
          blks_read as blocks_read,
          blks_hit as blocks_hit,
          tup_returned as tuples_returned,
          tup_fetched as tuples_fetched,
          tup_inserted as tuples_inserted,
          tup_updated as tuples_updated,
          tup_deleted as tuples_deleted
        FROM pg_stat_database
        WHERE datname = $1
      `, [this.config.database]);
      if (result.rows.length > 0) {
        dbStats = result.rows[0];
      }
    } catch (error) {
      console.warn('获取数据库统计信息失败:', error.message);
    }
    const { connections, queries } = this.stats;
    return {
      pool: {
        total: connections.total,
        active: connections.active,
        idle: connections.idle,
        waiting: connections.waiting,
        config: {
          max: this.config.max,
          min: this.config.min,
          idleTimeoutMillis: this.config.idleTimeoutMillis
        }
      },
      queries: {
        ...queries,
        // totalTime only accumulates successful queries, so average over those.
        avgTime: queries.successful > 0 ?
          (queries.totalTime / queries.successful).toFixed(2) : 0
      },
      database: dbStats,
      timestamp: new Date().toISOString()
    };
  }

  /**
   * Evaluate utilization (>80%), waiting requests, failure rate (>5%) and
   * average query time (>1s).
   * @returns {Promise<{healthy: boolean, issues: string[], status?: object, error?: string}>}
   */
  async healthCheck() {
    try {
      console.log('🔍 执行连接池健康检查...');
      const status = await this.getPoolStatus();
      const issues = [];

      // Guard the divisions so an empty pool / no queries yields 0, not NaN.
      const utilization = status.pool.total > 0 ? status.pool.active / status.pool.total : 0;
      if (utilization > 0.8) {
        issues.push('连接池使用率过高 (>80%)');
      }
      if (status.pool.waiting > 0) {
        issues.push(`有 ${status.pool.waiting} 个请求在等待连接`);
      }
      const failureRate = status.queries.total > 0 ?
        status.queries.failed / status.queries.total : 0;
      if (failureRate > 0.05) { // 5%
        issues.push(`查询失败率过高: ${(failureRate * 100).toFixed(2)}%`);
      }
      if (parseFloat(status.queries.avgTime) > 1000) {
        issues.push(`平均查询时间过长: ${status.queries.avgTime}ms`);
      }

      if (issues.length > 0) {
        console.warn('⚠️ 健康检查发现问题:');
        issues.forEach((issue) => console.warn(` - ${issue}`));
      } else {
        console.log('✅ 连接池健康状态良好');
      }
      return {
        healthy: issues.length === 0,
        issues,
        status
      };
    } catch (error) {
      console.error('❌ 健康检查失败:', error);
      return {
        healthy: false,
        issues: ['健康检查执行失败'],
        error: error.message
      };
    }
  }

  /**
   * Run healthCheck() every `intervalMs` and log a summary. Restarts any
   * running monitor.
   * @param {number} [intervalMs=30000]
   */
  startMonitoring(intervalMs = 30000) {
    // Avoid leaking a previous interval when called twice.
    this.stopMonitoring();
    console.log(`🔍 启动连接池监控 (间隔: ${intervalMs}ms)`);
    this.monitorInterval = setInterval(async () => {
      try {
        const healthResult = await this.healthCheck();
        if (!healthResult.healthy) {
          console.warn('⚠️ 连接池状态异常,建议检查');
        }
        const { status } = healthResult;
        if (!status) {
          // healthCheck() failed and returned no status; nothing to summarize.
          return;
        }
        console.log('📊 连接池状态摘要:');
        console.log(` 连接: ${status.pool.active}/${status.pool.total} (等待: ${status.pool.waiting})`);
        console.log(` 查询: 成功=${status.queries.successful}, 失败=${status.queries.failed}, 平均时间=${status.queries.avgTime}ms`);
      } catch (error) {
        // An async interval callback must never reject unhandled.
        console.error('❌ 健康检查失败:', error);
      }
    }, intervalMs);
  }

  /** Stop the periodic monitor, if running. */
  stopMonitoring() {
    if (this.monitorInterval) {
      clearInterval(this.monitorInterval);
      this.monitorInterval = null;
      console.log('⏹️ 连接池监控已停止');
    }
  }

  /**
   * Stop monitoring and close the pool; safe to call when not initialized.
   * @returns {Promise<void>}
   */
  async close() {
    if (!this.pool) {
      return;
    }
    this.stopMonitoring();
    console.log('🔄 正在关闭连接池...');
    const closingPool = this.pool;
    // Reset state first so a failing end() cannot leave a stale "initialized" flag.
    this.pool = null;
    this.isInitialized = false;
    await closingPool.end();
    console.log('✅ 连接池已关闭');
  }
}
module.exports = PostgreSQLConnectionPool;
## 🎯 最佳实践
### 连接池优化策略
javascript
// connection-pool-optimizer.js
/**
 * Helpers for tuning a connection pool: configuration review per workload
 * type, a simple concurrency benchmark, leak detection over recorded metric
 * samples, and an optimization report.
 */
class ConnectionPoolOptimizer {
  constructor() {
    this.recommendations = [];
    // timestamp (ms) -> stats sample, filled by recordMetrics().
    this.metrics = new Map();
  }

  /**
   * Review a pool configuration for a workload type.
   * @param {{min: number, max: number, acquireTimeout?: number}} config
   * @param {{type: string}} workloadPattern `type` is one of
   *   'high_concurrency' | 'batch_processing' | 'low_latency'; other values
   *   only get the generic checks.
   * @returns {object[]} Recommendations, each with type/priority/message/suggestion.
   */
  analyzeConfiguration(config, workloadPattern) {
    console.log('🔍 分析连接池配置...');
    const recommendations = [];

    switch (workloadPattern?.type) {
      case 'high_concurrency':
        if (config.max < 50) {
          recommendations.push({
            type: 'configuration',
            priority: 'high',
            message: '高并发场景建议增加最大连接数到50+',
            suggestion: { max: Math.max(50, config.max * 2) }
          });
        }
        break;
      case 'batch_processing':
        if (config.min > 5) {
          recommendations.push({
            type: 'configuration',
            priority: 'medium',
            message: '批处理场景可以减少最小连接数以节省资源',
            suggestion: { min: Math.min(5, config.min) }
          });
        }
        break;
      case 'low_latency':
        if (config.acquireTimeout > 5000) {
          recommendations.push({
            type: 'configuration',
            priority: 'high',
            message: '低延迟场景建议减少连接获取超时时间',
            suggestion: { acquireTimeout: 5000 }
          });
        }
        break;
    }

    // Generic check: a very wide min/max spread lets the pool size swing.
    if (config.max / config.min > 10) {
      recommendations.push({
        type: 'configuration',
        priority: 'medium',
        message: '最大最小连接数比例过大,可能导致连接数波动',
        suggestion: { min: Math.ceil(config.max / 5) }
      });
    }
    return recommendations;
  }

  /**
   * For each test case, fire `concurrency` copies of its query in parallel
   * and record wall-clock totals. A failing case is recorded with `error`.
   * @param {{query: function}} pool Object exposing `query(sql, params)`.
   * @param {Array<{name: string, concurrency: number, query: {sql: string, params?: Array}}>} testCases
   * @returns {Promise<object[]>}
   */
  async performanceBenchmark(pool, testCases) {
    console.log('🏃 执行性能基准测试...');
    const results = [];
    for (const testCase of testCases) {
      console.log(`测试案例: ${testCase.name}`);
      const startTime = Date.now();
      const promises = Array.from(
        { length: testCase.concurrency },
        () => this.executeTestQuery(pool, testCase.query)
      );
      try {
        await Promise.all(promises);
        const totalTime = Date.now() - startTime;
        const result = {
          name: testCase.name,
          concurrency: testCase.concurrency,
          totalTime,
          avgTime: totalTime / testCase.concurrency,
          throughput: (testCase.concurrency / totalTime * 1000).toFixed(2)
        };
        results.push(result);
        console.log(` 结果: 总时间=${totalTime}ms, 平均=${result.avgTime.toFixed(2)}ms, 吞吐量=${result.throughput} ops/sec`);
      } catch (error) {
        console.error(` 测试失败: ${error.message}`);
        results.push({
          name: testCase.name,
          error: error.message
        });
      }
    }
    return results;
  }

  /**
   * Run one benchmark query.
   * @returns {Promise<number>} Elapsed milliseconds.
   * @throws Whatever pool.query() rejects with.
   */
  async executeTestQuery(pool, query) {
    const startTime = Date.now();
    await pool.query(query.sql, query.params);
    return Date.now() - startTime;
  }

  /**
   * Look for leak-like patterns in the samples recorded within `timeWindow`.
   * @param {object} [poolStats] Unused; kept for interface compatibility.
   * @param {number} [timeWindow=300000] Look-back window in ms (5 minutes).
   * @returns {{detected: boolean, message?: string, issues?: string[], stats?: object}}
   */
  detectConnectionLeaks(poolStats, timeWindow = 300000) {
    console.log('🔍 检测连接泄漏...');
    const windowStart = Date.now() - timeWindow;
    // Sort explicitly: samples recorded with explicit timestamps may have been
    // inserted out of chronological order.
    const recentStats = Array.from(this.metrics.entries())
      .filter(([timestamp]) => timestamp >= windowStart)
      .sort(([a], [b]) => a - b)
      .map(([timestamp, stats]) => ({ timestamp, ...stats }));
    if (recentStats.length < 2) {
      return { detected: false, message: '数据不足,无法检测' };
    }

    const connectionTrends = recentStats.map((stat) => stat.connections?.active || 0);
    const avgConnections = connectionTrends.reduce((a, b) => a + b, 0) / connectionTrends.length;
    // reduce instead of Math.max(...arr): no argument-count limit on big windows.
    const maxConnections = connectionTrends.reduce((a, b) => Math.max(a, b), -Infinity);
    const minConnections = connectionTrends.reduce((a, b) => Math.min(a, b), Infinity);

    const issues = [];
    if (maxConnections > avgConnections * 1.5) {
      issues.push('连接数出现异常峰值');
    }

    // "Keeps growing" = last 5 samples never decrease AND end higher than they
    // started; a perfectly flat series is a stable pool, not a leak.
    const recentConnections = connectionTrends.slice(-5);
    const neverDecreases = recentConnections.every((val, i) =>
      i === 0 || val >= recentConnections[i - 1]
    );
    const grewOverall = recentConnections[recentConnections.length - 1] > recentConnections[0];
    if (recentConnections.length >= 5 && neverDecreases && grewOverall) {
      issues.push('连接数持续增长,可能存在连接泄漏');
    }

    const latest = recentStats[recentStats.length - 1];
    const idleConnections = latest.connections?.idle || 0;
    const activeConnections = latest.connections?.active || 0;
    if (idleConnections > activeConnections * 2) {
      issues.push('空闲连接数过多,建议调整连接池配置');
    }

    return {
      detected: issues.length > 0,
      issues,
      stats: {
        avgConnections: avgConnections.toFixed(2),
        maxConnections,
        minConnections,
        currentActive: activeConnections,
        currentIdle: idleConnections
      }
    };
  }

  /**
   * Store a stats sample and drop samples more than one hour older than it.
   * @param {object} stats Sample to store (read back via `connections.active` / `.idle`).
   * @param {number} [timestamp=Date.now()] Sample time in ms since the epoch.
   */
  recordMetrics(stats, timestamp = Date.now()) {
    // Samples taken within the same millisecond would overwrite each other in
    // the Map; nudge the key forward until it is unique.
    let key = timestamp;
    while (this.metrics.has(key)) {
      key += 1;
    }
    this.metrics.set(key, stats);

    // Keep the most recent hour of data. Scan every entry: insertion order is
    // not guaranteed to be chronological once explicit timestamps are used.
    const oneHourAgo = key - 3600000;
    for (const time of this.metrics.keys()) {
      if (time < oneHourAgo) {
        this.metrics.delete(time);
      }
    }
  }

  /**
   * Build a report with utilization / performance / error-rate metrics and
   * matching recommendations.
   * @param {{pool: {active: number, total: number}, queries: {avgTime: *, failed: number, total: number}}} poolStats
   * @param {object} workloadPattern Echoed into the report.
   * @returns {object}
   */
  generateOptimizationReport(poolStats, workloadPattern) {
    // Ratios are 0 (not NaN) when nothing has been measured yet.
    const ratio = (part, whole) => (whole > 0 ? part / whole : 0);
    const report = {
      timestamp: new Date().toISOString(),
      poolStatus: poolStats,
      workloadPattern,
      recommendations: [],
      metrics: {
        connectionUtilization: ratio(poolStats.pool.active, poolStats.pool.total),
        queryPerformance: poolStats.queries.avgTime,
        errorRate: ratio(poolStats.queries.failed, poolStats.queries.total)
      }
    };

    if (report.metrics.connectionUtilization > 0.9) {
      report.recommendations.push({
        category: 'capacity',
        priority: 'high',
        message: '连接利用率过高,建议增加最大连接数',
        action: 'increase_max_connections'
      });
    } else if (poolStats.pool.total > 0 && report.metrics.connectionUtilization < 0.3) {
      // Skipped for an empty pool: there is nothing to shrink.
      report.recommendations.push({
        category: 'efficiency',
        priority: 'medium',
        message: '连接利用率较低,可以减少连接数以节省资源',
        action: 'decrease_connections'
      });
    }

    if (report.metrics.queryPerformance > 1000) {
      report.recommendations.push({
        category: 'performance',
        priority: 'high',
        message: '平均查询时间过长,检查查询优化和索引',
        action: 'optimize_queries'
      });
    }

    if (report.metrics.errorRate > 0.05) {
      report.recommendations.push({
        category: 'reliability',
        priority: 'high',
        message: '查询错误率过高,检查数据库连接和查询逻辑',
        action: 'investigate_errors'
      });
    }
    return report;
  }
}
// Usage example
/**
 * Demo: review one base configuration against three workload patterns, then
 * record ten samples with a growing active-connection count and print the
 * leak-detection result.
 * @returns {Promise<void>}
 */
async function demonstrateOptimization() {
  console.log('🎯 连接池优化演示...\n');
  const optimizer = new ConnectionPoolOptimizer();

  // Workload patterns to evaluate the configuration against.
  const workloadPatterns = [
    {
      type: 'high_concurrency',
      description: '高并发Web应用',
      characteristics: {
        peakConcurrency: 200,
        avgQueryTime: 50,
        readWriteRatio: '80:20'
      }
    },
    {
      type: 'batch_processing',
      description: '批处理任务',
      characteristics: {
        batchSize: 1000,
        avgQueryTime: 200,
        readWriteRatio: '20:80'
      }
    },
    {
      type: 'low_latency',
      description: '低延迟实时系统',
      characteristics: {
        maxLatency: 10,
        avgQueryTime: 5,
        readWriteRatio: '90:10'
      }
    }
  ];
  const baseConfig = {
    min: 5,
    max: 20,
    acquireTimeout: 30000,
    idleTimeout: 300000
  };

  for (const pattern of workloadPatterns) {
    console.log(`分析工作负载: ${pattern.description}`);
    const recommendations = optimizer.analyzeConfiguration(baseConfig, pattern);
    if (recommendations.length > 0) {
      console.log(' 优化建议:');
      recommendations.forEach((rec) => {
        console.log(` - [${rec.priority.toUpperCase()}] ${rec.message}`);
        if (rec.suggestion) {
          console.log(` 建议配置: ${JSON.stringify(rec.suggestion)}`);
        }
      });
    } else {
      console.log(' ✅ 当前配置适合此工作负载');
    }
    console.log('');
  }

  // Simulated leak detection: feed samples whose active count keeps rising.
  // (The previously computed-but-unused `baseTime` local was removed.)
  console.log('模拟连接泄漏检测...');
  for (let i = 0; i < 10; i++) {
    optimizer.recordMetrics({
      connections: {
        active: 5 + i * 2, // simulated growth in active connections
        idle: 3,
        total: 20
      },
      queries: {
        total: 100 + i * 50,
        successful: 95 + i * 48,
        failed: 5 + i * 2
      }
    });
  }
  const leakDetection = optimizer.detectConnectionLeaks();
  console.log('连接泄漏检测结果:', leakDetection);
  console.log('\n✅ 连接池优化演示完成');
}
module.exports = { ConnectionPoolOptimizer, demonstrateOptimization };
数据库连接池是高性能Node.js应用的基础设施,合理的配置和监控能显著提升应用性能和稳定性!