WebSocket应用
📖 概述
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。与传统的 HTTP 请求-响应模式不同,WebSocket 允许服务器和客户端之间进行实时双向数据传输,非常适合构建实时应用程序。
🎯 学习目标
- 掌握 WebSocket 协议原理
- 学习 Node.js 中的 WebSocket 实现
- 了解实时应用架构设计
- 掌握 WebSocket 性能优化技巧
🚀 快速开始
1. 原生 WebSocket 服务器
javascript
// 使用原生 ws 库
const WebSocket = require('ws');
const http = require('http');
// 创建 HTTP 服务器
const server = http.createServer();
// 创建 WebSocket 服务器
const wss = new WebSocket.Server({
server,
path: '/ws'
});
// 连接处理
wss.on('connection', (ws, req) => {
const clientIp = req.socket.remoteAddress;
console.log(`新客户端连接: ${clientIp}`);
// 发送欢迎消息
ws.send(JSON.stringify({
type: 'welcome',
message: '连接成功',
timestamp: new Date().toISOString()
}));
// 消息处理
ws.on('message', (data) => {
try {
const message = JSON.parse(data);
console.log('收到消息:', message);
handleMessage(ws, message);
} catch (error) {
console.error('消息解析错误:', error);
ws.send(JSON.stringify({
type: 'error',
message: '消息格式错误'
}));
}
});
// 连接关闭
ws.on('close', (code, reason) => {
console.log(`客户端断开连接: ${code} - ${reason}`);
});
// 错误处理
ws.on('error', (error) => {
console.error('WebSocket 错误:', error);
});
// 心跳检测
ws.isAlive = true;
ws.on('pong', () => {
ws.isAlive = true;
});
});
// 心跳检测定时器
const interval = setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) {
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
// 清理定时器
wss.on('close', () => {
clearInterval(interval);
});
// 消息处理函数
function handleMessage(ws, message) {
switch (message.type) {
case 'ping':
ws.send(JSON.stringify({ type: 'pong' }));
break;
case 'chat':
broadcastMessage(message);
break;
case 'join-room':
joinRoom(ws, message.room);
break;
case 'leave-room':
leaveRoom(ws, message.room);
break;
default:
ws.send(JSON.stringify({
type: 'error',
message: '未知消息类型'
}));
}
}
// 广播消息
function broadcastMessage(message) {
const broadcastData = JSON.stringify({
type: 'chat',
...message,
timestamp: new Date().toISOString()
});
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(broadcastData);
}
});
}
// 启动服务器
const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
console.log(`WebSocket 服务器运行在端口 ${PORT}`);
});
2. 客户端连接示例
javascript
// 浏览器客户端
class WebSocketClient {
constructor(url) {
this.url = url;
this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectInterval = 1000;
this.messageHandlers = new Map();
}
connect() {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(this.url);
this.ws.onopen = (event) => {
console.log('WebSocket 连接已建立');
this.reconnectAttempts = 0;
resolve(event);
};
this.ws.onmessage = (event) => {
try {
const message = JSON.parse(event.data);
this.handleMessage(message);
} catch (error) {
console.error('消息解析错误:', error);
}
};
this.ws.onclose = (event) => {
console.log('WebSocket 连接已关闭:', event.code, event.reason);
this.attemptReconnect();
};
this.ws.onerror = (error) => {
console.error('WebSocket 错误:', error);
reject(error);
};
});
}
send(message) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
} else {
console.warn('WebSocket 未连接,无法发送消息');
}
}
on(type, handler) {
if (!this.messageHandlers.has(type)) {
this.messageHandlers.set(type, []);
}
this.messageHandlers.get(type).push(handler);
}
handleMessage(message) {
const handlers = this.messageHandlers.get(message.type);
if (handlers) {
handlers.forEach(handler => handler(message));
}
}
attemptReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = this.reconnectInterval * Math.pow(2, this.reconnectAttempts - 1);
console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts}),${delay}ms 后重试`);
setTimeout(() => {
this.connect().catch(error => {
console.error('重连失败:', error);
});
}, delay);
} else {
console.error('达到最大重连次数,停止重连');
}
}
disconnect() {
if (this.ws) {
this.ws.close();
}
}
}
// 使用示例
const client = new WebSocketClient('ws://localhost:3000/ws');
client.on('welcome', (message) => {
console.log('收到欢迎消息:', message);
});
client.on('chat', (message) => {
console.log('收到聊天消息:', message);
});
client.connect().then(() => {
// 发送消息
client.send({
type: 'chat',
user: 'Alice',
message: 'Hello, World!'
});
});
🏗️ 实时应用架构
1. 房间管理系统
javascript
// 房间管理器
class RoomManager {
constructor() {
this.rooms = new Map(); // roomId -> Set<WebSocket>
this.clientRooms = new Map(); // WebSocket -> Set<roomId>
}
// 创建房间
createRoom(roomId, options = {}) {
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Set());
console.log(`房间已创建: ${roomId}`);
return true;
}
return false;
}
// 加入房间
joinRoom(ws, roomId) {
// 确保房间存在
if (!this.rooms.has(roomId)) {
this.createRoom(roomId);
}
// 添加客户端到房间
this.rooms.get(roomId).add(ws);
// 记录客户端所在房间
if (!this.clientRooms.has(ws)) {
this.clientRooms.set(ws, new Set());
}
this.clientRooms.get(ws).add(roomId);
// 通知房间内其他用户
this.broadcastToRoom(roomId, {
type: 'user-joined',
roomId,
timestamp: new Date().toISOString()
}, ws);
// 发送房间信息给新用户
ws.send(JSON.stringify({
type: 'room-joined',
roomId,
memberCount: this.rooms.get(roomId).size
}));
console.log(`用户加入房间: ${roomId}`);
}
// 离开房间
leaveRoom(ws, roomId) {
if (this.rooms.has(roomId)) {
this.rooms.get(roomId).delete(ws);
// 如果房间为空,删除房间
if (this.rooms.get(roomId).size === 0) {
this.rooms.delete(roomId);
console.log(`房间已删除: ${roomId}`);
} else {
// 通知房间内其他用户
this.broadcastToRoom(roomId, {
type: 'user-left',
roomId,
timestamp: new Date().toISOString()
});
}
}
if (this.clientRooms.has(ws)) {
this.clientRooms.get(ws).delete(roomId);
}
console.log(`用户离开房间: ${roomId}`);
}
// 离开所有房间
leaveAllRooms(ws) {
if (this.clientRooms.has(ws)) {
const rooms = Array.from(this.clientRooms.get(ws));
rooms.forEach(roomId => this.leaveRoom(ws, roomId));
this.clientRooms.delete(ws);
}
}
// 向房间广播消息
broadcastToRoom(roomId, message, excludeClient = null) {
if (this.rooms.has(roomId)) {
const messageData = JSON.stringify(message);
this.rooms.get(roomId).forEach(client => {
if (client !== excludeClient && client.readyState === WebSocket.OPEN) {
client.send(messageData);
}
});
}
}
// 获取房间信息
getRoomInfo(roomId) {
return {
id: roomId,
memberCount: this.rooms.has(roomId) ? this.rooms.get(roomId).size : 0,
exists: this.rooms.has(roomId)
};
}
// 获取所有房间
getAllRooms() {
const roomList = [];
this.rooms.forEach((clients, roomId) => {
roomList.push({
id: roomId,
memberCount: clients.size
});
});
return roomList;
}
}
// 使用房间管理器
const roomManager = new RoomManager();
// WebSocket 服务器集成
wss.on('connection', (ws) => {
ws.on('message', (data) => {
const message = JSON.parse(data);
switch (message.type) {
case 'create-room':
const created = roomManager.createRoom(message.roomId);
ws.send(JSON.stringify({
type: 'room-created',
roomId: message.roomId,
success: created
}));
break;
case 'join-room':
roomManager.joinRoom(ws, message.roomId);
break;
case 'leave-room':
roomManager.leaveRoom(ws, message.roomId);
break;
case 'room-message':
roomManager.broadcastToRoom(message.roomId, {
type: 'room-message',
roomId: message.roomId,
user: message.user,
content: message.content,
timestamp: new Date().toISOString()
});
break;
case 'get-rooms':
ws.send(JSON.stringify({
type: 'rooms-list',
rooms: roomManager.getAllRooms()
}));
break;
}
});
ws.on('close', () => {
roomManager.leaveAllRooms(ws);
});
});
2. 用户认证和授权
javascript
// 用户认证管理器
const jwt = require('jsonwebtoken');
class AuthManager {
constructor(jwtSecret) {
this.jwtSecret = jwtSecret;
this.authenticatedClients = new Map(); // WebSocket -> userInfo
}
// 验证 JWT 令牌
verifyToken(token) {
try {
return jwt.verify(token, this.jwtSecret);
} catch (error) {
throw new Error('Invalid token');
}
}
// 认证客户端
authenticateClient(ws, token) {
try {
const decoded = this.verifyToken(token);
const userInfo = {
id: decoded.userId,
username: decoded.username,
roles: decoded.roles || [],
authenticatedAt: new Date()
};
this.authenticatedClients.set(ws, userInfo);
return userInfo;
} catch (error) {
throw new Error('Authentication failed');
}
}
// 获取用户信息
getUserInfo(ws) {
return this.authenticatedClients.get(ws);
}
// 检查权限
hasPermission(ws, permission) {
const userInfo = this.getUserInfo(ws);
return userInfo && userInfo.roles.includes(permission);
}
// 移除认证信息
removeClient(ws) {
this.authenticatedClients.delete(ws);
}
// 获取在线用户列表
getOnlineUsers() {
const users = [];
this.authenticatedClients.forEach((userInfo, ws) => {
if (ws.readyState === WebSocket.OPEN) {
users.push({
id: userInfo.id,
username: userInfo.username,
authenticatedAt: userInfo.authenticatedAt
});
}
});
return users;
}
}
// 认证中间件
function requireAuth(authManager) {
return (ws, message, next) => {
const userInfo = authManager.getUserInfo(ws);
if (!userInfo) {
ws.send(JSON.stringify({
type: 'error',
message: 'Authentication required'
}));
return;
}
message.user = userInfo;
next();
};
}
// 权限检查中间件
function requirePermission(authManager, permission) {
return (ws, message, next) => {
if (!authManager.hasPermission(ws, permission)) {
ws.send(JSON.stringify({
type: 'error',
message: 'Insufficient permissions'
}));
return;
}
next();
};
}
// 使用认证
const authManager = new AuthManager(process.env.JWT_SECRET);
wss.on('connection', (ws) => {
let isAuthenticated = false;
ws.on('message', (data) => {
const message = JSON.parse(data);
if (message.type === 'auth') {
try {
const userInfo = authManager.authenticateClient(ws, message.token);
isAuthenticated = true;
ws.send(JSON.stringify({
type: 'auth-success',
user: userInfo
}));
} catch (error) {
ws.send(JSON.stringify({
type: 'auth-failed',
message: error.message
}));
ws.close();
}
return;
}
// 需要认证的操作
if (!isAuthenticated) {
ws.send(JSON.stringify({
type: 'error',
message: 'Please authenticate first'
}));
return;
}
// 处理认证后的消息
handleAuthenticatedMessage(ws, message);
});
ws.on('close', () => {
authManager.removeClient(ws);
});
});
function handleAuthenticatedMessage(ws, message) {
const userInfo = authManager.getUserInfo(ws);
switch (message.type) {
case 'admin-broadcast':
if (authManager.hasPermission(ws, 'admin')) {
broadcastToAll({
type: 'admin-message',
content: message.content,
from: userInfo.username
});
} else {
ws.send(JSON.stringify({
type: 'error',
message: 'Admin permission required'
}));
}
break;
case 'get-online-users':
ws.send(JSON.stringify({
type: 'online-users',
users: authManager.getOnlineUsers()
}));
break;
}
}
3. 消息队列和持久化
javascript
// 消息队列管理器
const Redis = require('redis');
class MessageQueue {
constructor(redisConfig) {
this.redis = Redis.createClient(redisConfig);
this.subscriber = Redis.createClient(redisConfig);
this.messageHandlers = new Map();
this.setupSubscriber();
}
setupSubscriber() {
this.subscriber.on('message', (channel, message) => {
const handlers = this.messageHandlers.get(channel);
if (handlers) {
const messageData = JSON.parse(message);
handlers.forEach(handler => handler(messageData));
}
});
}
// 发布消息
async publish(channel, message) {
await this.redis.publish(channel, JSON.stringify(message));
}
// 订阅频道
subscribe(channel, handler) {
if (!this.messageHandlers.has(channel)) {
this.messageHandlers.set(channel, []);
this.subscriber.subscribe(channel);
}
this.messageHandlers.get(channel).push(handler);
}
// 存储消息到历史记录
async storeMessage(roomId, message) {
const key = `room:${roomId}:messages`;
const messageData = {
...message,
timestamp: new Date().toISOString()
};
// 使用 Redis List 存储消息
await this.redis.lpush(key, JSON.stringify(messageData));
// 只保留最近 1000 条消息
await this.redis.ltrim(key, 0, 999);
}
// 获取历史消息
async getMessageHistory(roomId, limit = 50) {
const key = `room:${roomId}:messages`;
const messages = await this.redis.lrange(key, 0, limit - 1);
return messages.map(msg => JSON.parse(msg)).reverse();
}
// 存储用户会话
async storeUserSession(userId, sessionData) {
const key = `user:${userId}:session`;
await this.redis.setex(key, 3600, JSON.stringify(sessionData)); // 1小时过期
}
// 获取用户会话
async getUserSession(userId) {
const key = `user:${userId}:session`;
const sessionData = await this.redis.get(key);
return sessionData ? JSON.parse(sessionData) : null;
}
}
// 集成消息队列
const messageQueue = new MessageQueue({
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379
});
// 订阅跨服务器消息
messageQueue.subscribe('room-broadcast', (message) => {
roomManager.broadcastToRoom(message.roomId, message.data);
});
// 修改房间消息处理
function handleRoomMessage(ws, message) {
const userInfo = authManager.getUserInfo(ws);
const messageData = {
type: 'room-message',
roomId: message.roomId,
user: {
id: userInfo.id,
username: userInfo.username
},
content: message.content,
timestamp: new Date().toISOString()
};
// 存储消息到历史记录
messageQueue.storeMessage(message.roomId, messageData);
// 发布到其他服务器实例
messageQueue.publish('room-broadcast', {
roomId: message.roomId,
data: messageData
});
// 本地广播
roomManager.broadcastToRoom(message.roomId, messageData);
}
// 获取历史消息
wss.on('connection', (ws) => {
ws.on('message', async (data) => {
const message = JSON.parse(data);
if (message.type === 'get-history') {
const history = await messageQueue.getMessageHistory(message.roomId, message.limit);
ws.send(JSON.stringify({
type: 'message-history',
roomId: message.roomId,
messages: history
}));
}
});
});
📊 性能优化
1. 连接池管理
javascript
// 连接池管理器
class ConnectionPool {
constructor(maxConnections = 10000) {
this.maxConnections = maxConnections;
this.connections = new Set();
this.connectionStats = {
total: 0,
active: 0,
inactive: 0
};
// 定期清理无效连接
setInterval(() => {
this.cleanupConnections();
}, 30000);
}
addConnection(ws) {
if (this.connections.size >= this.maxConnections) {
throw new Error('Connection limit exceeded');
}
this.connections.add(ws);
this.updateStats();
ws.on('close', () => {
this.removeConnection(ws);
});
console.log(`连接已添加,当前连接数: ${this.connections.size}`);
}
removeConnection(ws) {
this.connections.delete(ws);
this.updateStats();
console.log(`连接已移除,当前连接数: ${this.connections.size}`);
}
cleanupConnections() {
const before = this.connections.size;
this.connections.forEach(ws => {
if (ws.readyState !== WebSocket.OPEN) {
this.connections.delete(ws);
}
});
const cleaned = before - this.connections.size;
if (cleaned > 0) {
console.log(`清理了 ${cleaned} 个无效连接`);
this.updateStats();
}
}
updateStats() {
this.connectionStats.total = this.connections.size;
this.connectionStats.active = 0;
this.connectionStats.inactive = 0;
this.connections.forEach(ws => {
if (ws.readyState === WebSocket.OPEN) {
this.connectionStats.active++;
} else {
this.connectionStats.inactive++;
}
});
}
getStats() {
this.updateStats();
return { ...this.connectionStats };
}
broadcast(message) {
const messageData = JSON.stringify(message);
let sentCount = 0;
this.connections.forEach(ws => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(messageData);
sentCount++;
}
});
return sentCount;
}
}
const connectionPool = new ConnectionPool(10000);
// 连接限制中间件
wss.on('connection', (ws, req) => {
try {
connectionPool.addConnection(ws);
} catch (error) {
ws.close(1013, 'Server overloaded');
return;
}
// 正常处理连接
handleConnection(ws, req);
});
2. 消息批处理
javascript
// 消息批处理器
class MessageBatcher {
constructor(batchSize = 100, flushInterval = 100) {
this.batchSize = batchSize;
this.flushInterval = flushInterval;
this.batches = new Map(); // roomId -> messages[]
this.timers = new Map(); // roomId -> timer
}
addMessage(roomId, message) {
if (!this.batches.has(roomId)) {
this.batches.set(roomId, []);
}
this.batches.get(roomId).push(message);
// 检查是否需要立即刷新
if (this.batches.get(roomId).length >= this.batchSize) {
this.flush(roomId);
} else {
// 设置定时器
this.scheduleFlush(roomId);
}
}
scheduleFlush(roomId) {
if (this.timers.has(roomId)) {
return; // 已经有定时器了
}
const timer = setTimeout(() => {
this.flush(roomId);
}, this.flushInterval);
this.timers.set(roomId, timer);
}
flush(roomId) {
const messages = this.batches.get(roomId);
if (!messages || messages.length === 0) {
return;
}
// 清理定时器
if (this.timers.has(roomId)) {
clearTimeout(this.timers.get(roomId));
this.timers.delete(roomId);
}
// 发送批量消息
roomManager.broadcastToRoom(roomId, {
type: 'message-batch',
messages: messages,
timestamp: new Date().toISOString()
});
// 清空批次
this.batches.set(roomId, []);
console.log(`刷新了 ${messages.length} 条消息到房间 ${roomId}`);
}
flushAll() {
this.batches.forEach((messages, roomId) => {
if (messages.length > 0) {
this.flush(roomId);
}
});
}
}
const messageBatcher = new MessageBatcher(50, 200);
// 使用批处理
function handleBatchedMessage(ws, message) {
const userInfo = authManager.getUserInfo(ws);
const messageData = {
user: userInfo.username,
content: message.content,
timestamp: new Date().toISOString()
};
messageBatcher.addMessage(message.roomId, messageData);
}
3. 压缩和优化
javascript
// 消息压缩
const zlib = require('zlib');
class MessageCompressor {
constructor() {
this.compressionThreshold = 1024; // 1KB
}
async compress(data) {
const jsonData = JSON.stringify(data);
if (jsonData.length < this.compressionThreshold) {
return {
compressed: false,
data: jsonData
};
}
try {
const compressed = await new Promise((resolve, reject) => {
zlib.gzip(jsonData, (err, result) => {
if (err) reject(err);
else resolve(result);
});
});
return {
compressed: true,
data: compressed.toString('base64')
};
} catch (error) {
console.error('压缩失败:', error);
return {
compressed: false,
data: jsonData
};
}
}
async decompress(compressedData) {
try {
const buffer = Buffer.from(compressedData, 'base64');
const decompressed = await new Promise((resolve, reject) => {
zlib.gunzip(buffer, (err, result) => {
if (err) reject(err);
else resolve(result.toString());
});
});
return JSON.parse(decompressed);
} catch (error) {
console.error('解压失败:', error);
throw error;
}
}
}
// 优化的发送函数
const compressor = new MessageCompressor();
async function sendOptimizedMessage(ws, data) {
try {
const result = await compressor.compress(data);
if (result.compressed) {
ws.send(JSON.stringify({
type: 'compressed',
data: result.data
}));
} else {
ws.send(result.data);
}
} catch (error) {
console.error('发送消息失败:', error);
}
}
// 客户端解压处理
ws.on('message', async (data) => {
try {
const message = JSON.parse(data);
if (message.type === 'compressed') {
const decompressed = await compressor.decompress(message.data);
handleMessage(ws, decompressed);
} else {
handleMessage(ws, message);
}
} catch (error) {
console.error('消息处理失败:', error);
}
});
🔧 监控和调试
1. 性能监控
javascript
// 性能监控器
class PerformanceMonitor {
constructor() {
this.metrics = {
connections: {
total: 0,
active: 0,
peak: 0
},
messages: {
sent: 0,
received: 0,
failed: 0,
rate: 0
},
rooms: {
total: 0,
active: 0
},
memory: {
heapUsed: 0,
heapTotal: 0,
external: 0
}
};
this.startTime = Date.now();
this.lastMessageCount = 0;
// 定期更新指标
setInterval(() => {
this.updateMetrics();
}, 5000);
}
updateMetrics() {
// 连接指标
this.metrics.connections.total = connectionPool.connections.size;
this.metrics.connections.active = connectionPool.getStats().active;
this.metrics.connections.peak = Math.max(
this.metrics.connections.peak,
this.metrics.connections.total
);
// 房间指标
this.metrics.rooms.total = roomManager.rooms.size;
this.metrics.rooms.active = Array.from(roomManager.rooms.values())
.filter(clients => clients.size > 0).length;
// 消息速率
const currentMessageCount = this.metrics.messages.sent + this.metrics.messages.received;
const messageRate = (currentMessageCount - this.lastMessageCount) / 5; // 每秒消息数
this.metrics.messages.rate = messageRate;
this.lastMessageCount = currentMessageCount;
// 内存使用
const memUsage = process.memoryUsage();
this.metrics.memory.heapUsed = Math.round(memUsage.heapUsed / 1024 / 1024);
this.metrics.memory.heapTotal = Math.round(memUsage.heapTotal / 1024 / 1024);
this.metrics.memory.external = Math.round(memUsage.external / 1024 / 1024);
}
incrementMessageSent() {
this.metrics.messages.sent++;
}
incrementMessageReceived() {
this.metrics.messages.received++;
}
incrementMessageFailed() {
this.metrics.messages.failed++;
}
getMetrics() {
this.updateMetrics();
return {
...this.metrics,
uptime: Math.round((Date.now() - this.startTime) / 1000)
};
}
getHealthStatus() {
const metrics = this.getMetrics();
return {
status: this.determineHealthStatus(metrics),
timestamp: new Date().toISOString(),
metrics
};
}
determineHealthStatus(metrics) {
// 简单的健康状态判断
if (metrics.memory.heapUsed > 500) { // 500MB
return 'warning';
}
if (metrics.connections.active > 8000) {
return 'warning';
}
if (metrics.messages.failed / (metrics.messages.sent + metrics.messages.received) > 0.1) {
return 'critical';
}
return 'healthy';
}
}
const performanceMonitor = new PerformanceMonitor();
// 监控端点
const express = require('express');
const app = express();
app.get('/health', (req, res) => {
const health = performanceMonitor.getHealthStatus();
const statusCode = health.status === 'healthy' ? 200 :
health.status === 'warning' ? 200 : 503;
res.status(statusCode).json(health);
});
app.get('/metrics', (req, res) => {
res.json(performanceMonitor.getMetrics());
});
app.listen(3001, () => {
console.log('监控服务器运行在端口 3001');
});
2. 日志记录
javascript
// 日志记录器
const winston = require('winston');
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.errors({ stack: true }),
winston.format.json()
),
defaultMeta: { service: 'websocket-server' },
transports: [
new winston.transports.File({ filename: 'error.log', level: 'error' }),
new winston.transports.File({ filename: 'combined.log' }),
new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
winston.format.simple()
)
})
]
});
// WebSocket 事件日志
function logWebSocketEvent(event, ws, data = {}) {
const clientInfo = {
remoteAddress: ws._socket?.remoteAddress,
userAgent: ws.upgradeReq?.headers['user-agent']
};
logger.info('WebSocket Event', {
event,
clientInfo,
data,
timestamp: new Date().toISOString()
});
}
// 集成日志记录
wss.on('connection', (ws, req) => {
logWebSocketEvent('connection', ws, {
url: req.url,
headers: req.headers
});
ws.on('message', (data) => {
try {
const message = JSON.parse(data);
performanceMonitor.incrementMessageReceived();
logWebSocketEvent('message', ws, {
type: message.type,
size: data.length
});
} catch (error) {
performanceMonitor.incrementMessageFailed();
logger.error('Message parsing failed', {
error: error.message,
data: data.toString()
});
}
});
ws.on('close', (code, reason) => {
logWebSocketEvent('close', ws, {
code,
reason: reason.toString()
});
});
ws.on('error', (error) => {
performanceMonitor.incrementMessageFailed();
logger.error('WebSocket error', {
error: error.message,
stack: error.stack
});
});
});
🚀 实际应用示例
完整的实时聊天应用
javascript
// 完整的 WebSocket 聊天服务器
const WebSocket = require('ws');
const express = require('express');
const http = require('http');
const path = require('path');
// 创建 Express 应用
const app = express();
const server = http.createServer(app);
// 静态文件服务
app.use(express.static(path.join(__dirname, 'public')));
app.use(express.json());
// 创建 WebSocket 服务器
const wss = new WebSocket.Server({
server,
path: '/chat'
});
// 初始化组件
const connectionPool = new ConnectionPool(1000);
const roomManager = new RoomManager();
const authManager = new AuthManager(process.env.JWT_SECRET || 'secret');
const performanceMonitor = new PerformanceMonitor();
const messageBatcher = new MessageBatcher(20, 100);
// WebSocket 连接处理
wss.on('connection', (ws, req) => {
try {
connectionPool.addConnection(ws);
logWebSocketEvent('connection', ws);
} catch (error) {
logger.error('Connection rejected', { error: error.message });
ws.close(1013, 'Server overloaded');
return;
}
let isAuthenticated = false;
let userInfo = null;
// 发送欢迎消息
ws.send(JSON.stringify({
type: 'welcome',
message: '欢迎连接到聊天服务器',
serverTime: new Date().toISOString()
}));
ws.on('message', async (data) => {
try {
const message = JSON.parse(data);
performanceMonitor.incrementMessageReceived();
// 认证处理
if (message.type === 'auth') {
try {
userInfo = authManager.authenticateClient(ws, message.token);
isAuthenticated = true;
ws.send(JSON.stringify({
type: 'auth-success',
user: {
id: userInfo.id,
username: userInfo.username,
roles: userInfo.roles
}
}));
logger.info('User authenticated', {
userId: userInfo.id,
username: userInfo.username
});
} catch (error) {
ws.send(JSON.stringify({
type: 'auth-failed',
message: error.message
}));
ws.close(1008, 'Authentication failed');
}
return;
}
// 需要认证的操作
if (!isAuthenticated) {
ws.send(JSON.stringify({
type: 'error',
message: '请先进行身份认证'
}));
return;
}
// 处理认证后的消息
await handleAuthenticatedMessage(ws, message, userInfo);
} catch (error) {
performanceMonitor.incrementMessageFailed();
logger.error('Message handling failed', { error: error.message });
ws.send(JSON.stringify({
type: 'error',
message: '消息处理失败'
}));
}
});
ws.on('close', (code, reason) => {
if (isAuthenticated) {
roomManager.leaveAllRooms(ws);
authManager.removeClient(ws);
}
logWebSocketEvent('close', ws, { code, reason: reason.toString() });
});
ws.on('error', (error) => {
logger.error('WebSocket error', { error: error.message });
});
});
// 认证后的消息处理
async function handleAuthenticatedMessage(ws, message, userInfo) {
switch (message.type) {
case 'join-room':
roomManager.joinRoom(ws, message.roomId);
// 发送历史消息
if (messageQueue) {
const history = await messageQueue.getMessageHistory(message.roomId, 50);
ws.send(JSON.stringify({
type: 'message-history',
roomId: message.roomId,
messages: history
}));
}
break;
case 'leave-room':
roomManager.leaveRoom(ws, message.roomId);
break;
case 'chat-message':
const chatMessage = {
type: 'chat-message',
roomId: message.roomId,
user: {
id: userInfo.id,
username: userInfo.username
},
content: message.content,
timestamp: new Date().toISOString()
};
// 存储消息
if (messageQueue) {
await messageQueue.storeMessage(message.roomId, chatMessage);
}
// 广播消息
roomManager.broadcastToRoom(message.roomId, chatMessage);
performanceMonitor.incrementMessageSent();
break;
case 'get-rooms':
ws.send(JSON.stringify({
type: 'rooms-list',
rooms: roomManager.getAllRooms()
}));
break;
case 'get-online-users':
ws.send(JSON.stringify({
type: 'online-users',
users: authManager.getOnlineUsers()
}));
break;
default:
ws.send(JSON.stringify({
type: 'error',
message: '未知消息类型'
}));
}
}
// API 端点
app.post('/api/auth', (req, res) => {
// 简单的认证逻辑(实际应用中应该连接真实的用户系统)
const { username, password } = req.body;
if (username && password) {
const token = jwt.sign(
{
userId: Date.now().toString(),
username,
roles: ['user']
},
process.env.JWT_SECRET || 'secret',
{ expiresIn: '24h' }
);
res.json({ token });
} else {
res.status(400).json({ error: '用户名和密码是必需的' });
}
});
// 启动服务器
const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
console.log(`聊天服务器运行在端口 ${PORT}`);
console.log(`WebSocket 端点: ws://localhost:${PORT}/chat`);
console.log(`监控端点: http://localhost:${PORT}/health`);
});
// 优雅关闭
process.on('SIGTERM', () => {
console.log('收到 SIGTERM,正在关闭服务器...');
wss.clients.forEach(ws => {
ws.close(1001, 'Server shutdown');
});
server.close(() => {
console.log('服务器已关闭');
process.exit(0);
});
});
📚 最佳实践总结
- 连接管理:实现连接池和限制,防止资源耗尽
- 认证授权:使用 JWT 等标准方式进行身份验证
- 错误处理:优雅处理连接错误和消息解析错误
- 性能优化:使用消息批处理、压缩等技术
- 监控日志:实现全面的性能监控和日志记录
- 扩展性:使用 Redis 等外部存储支持多实例部署
- 安全性:验证输入数据,防止 XSS 和注入攻击
通过掌握这些 WebSocket 技术,您将能够构建高性能、可扩展的实时应用程序。