服务集成测试
📋 概述
服务集成测试专注于验证微服务架构中不同服务之间的交互和协作。在Node.js微服务生态中,服务集成测试确保服务间API调用、消息传递、数据一致性、事务处理等关键集成点正常工作,是构建可靠分布式系统的重要保障。
🎯 学习目标
- 理解微服务集成测试的核心概念和挑战
- 掌握服务间通信测试的设计和实施
- 学会契约测试、消息队列测试等关键技术
- 了解分布式系统测试的最佳实践和工具
🏗️ 微服务集成测试架构
服务集成测试分类
mermaid
graph TB
A[服务集成测试类型] --> B[同步通信测试<br/>Synchronous Communication]
A --> C[异步通信测试<br/>Asynchronous Communication]
A --> D[数据一致性测试<br/>Data Consistency]
A --> E[服务发现测试<br/>Service Discovery]
B --> B1[REST API调用<br/>GraphQL查询<br/>gRPC通信<br/>服务间认证]
C --> C1[消息队列<br/>事件驱动<br/>发布订阅<br/>流处理]
D --> D1[分布式事务<br/>最终一致性<br/>数据同步<br/>补偿机制]
E --> E1[服务注册<br/>负载均衡<br/>健康检查<br/>配置管理]
style B fill:#e1f5fe
style C fill:#f3e5f5
style D fill:#e8f5e8
style E fill:#fff3e0
微服务测试策略
javascript
/**
 * Reference table for planning microservice tests.
 *
 * ISOLATION_LEVELS describes four test scopes (unit → system) together with
 * their dependency handling and relative speed / confidence / maintenance.
 * TESTING_PATTERNS lists three integration-testing techniques with example
 * tools and benefits.
 */
const MicroserviceTestStrategy = (() => {
  // Builds one isolation-level record; property order matches the table columns.
  const level = (scope, dependencies, speed, confidence, maintenance) => ({
    scope,
    dependencies,
    speed,
    confidence,
    maintenance,
  });

  // Builds one testing-pattern record.
  const pattern = (purpose, tools, benefits) => ({ purpose, tools, benefits });

  return {
    ISOLATION_LEVELS: {
      unitLevel: level('单个服务内部逻辑', '全部Mock', 'fastest', 'low', 'low'),
      serviceLevel: level('单个服务完整功能', '外部依赖Mock,内部真实', 'fast', 'medium', 'medium'),
      integrationLevel: level('服务间交互', '关键服务真实,其他Mock', 'medium', 'high', 'high'),
      systemLevel: level('完整系统', '所有服务真实', 'slow', 'highest', 'highest'),
    },
    TESTING_PATTERNS: {
      contractTesting: pattern(
        '验证API契约一致性',
        ['Pact', 'Spring Cloud Contract', 'OpenAPI'],
        ['早期发现接口变更', '独立开发验证', '向后兼容性保证'],
      ),
      stubTesting: pattern(
        '模拟外部服务行为',
        ['WireMock', 'MockServer', 'JSON Server'],
        ['快速反馈', '控制测试场景', '避免外部依赖'],
      ),
      testContainers: pattern(
        '轻量级集成环境',
        ['Testcontainers', 'Docker Compose', 'Kind'],
        ['真实环境', '隔离性好', '可重复性强'],
      ),
    },
  };
})();
🤝 契约测试实现
Pact契约测试
javascript
// pact-setup.js
const { Pact } = require('@pact-foundation/pact');
const path = require('path');
/**
 * Keeps track of the Pact instances created for contract tests.
 *
 * Consumer pacts are stored in `consumers` under the key
 * "<consumerName>-<providerName>". `providers` is initialised and cleared
 * here but not otherwise used by this class.
 */
class PactTestSetup {
  constructor() {
    this.providers = new Map();
    this.consumers = new Map();
  }

  /**
   * Creates a consumer-side Pact and registers it.
   *
   * @param {string} consumerName - Name of the consuming service.
   * @param {string} providerName - Name of the provider being mocked.
   * @param {object} [options] - Pact options; these override the defaults below.
   * @returns {Pact} The newly created Pact instance.
   */
  setupConsumer(consumerName, providerName, options = {}) {
    const workingDir = process.cwd();
    // Defaults first, caller options last, so any option the caller supplies wins.
    const pactConfig = Object.assign(
      {
        consumer: consumerName,
        provider: providerName,
        port: options.port || 1234,
        log: path.resolve(workingDir, 'logs', 'pact.log'),
        dir: path.resolve(workingDir, 'pacts'),
        logLevel: 'INFO',
        spec: 2,
      },
      options,
    );

    const instance = new Pact(pactConfig);
    this.consumers.set(`${consumerName}-${providerName}`, instance);
    return instance;
  }

  /**
   * Looks up a previously registered consumer Pact.
   *
   * @param {string} consumerName
   * @param {string} providerName
   * @returns {Pact|undefined} The instance, or undefined when none was registered.
   */
  getConsumerPact(consumerName, providerName) {
    const pairKey = `${consumerName}-${providerName}`;
    return this.consumers.get(pairKey);
  }

  /**
   * Finalizes every registered consumer Pact, one at a time, then empties both
   * registries. A failing finalize is logged as a warning and does not stop
   * the remaining instances from being finalized.
   *
   * @returns {Promise<void>}
   */
  async cleanup() {
    for (const [pairKey, instance] of this.consumers.entries()) {
      try {
        await instance.finalize();
        console.log(`✅ Pact finalized: ${pairKey}`);
      } catch (err) {
        console.warn(`⚠️ Failed to finalize pact ${pairKey}:`, err.message);
      }
    }

    this.consumers.clear();
    this.providers.clear();
  }
}
module.exports = new PactTestSetup();
消费者契约测试
javascript
// tests/contract/user-service-consumer.test.js
const { Matchers } = require('@pact-foundation/pact');
const pactSetup = require('./pact-setup');
const UserApiClient = require('../../src/clients/user-api-client');
describe('User Service Consumer Contract', () => {
// Shared fixtures: the Pact instance returned by pactSetup and the API client under test.
let pact;
let userApiClient;

// Create the order-service → user-service pact on port 1234, run its setup,
// and point the client at that same port.
beforeAll(async () => {
  pact = pactSetup.setupConsumer('order-service', 'user-service', { port: 1234 });
  await pact.setup();
  userApiClient = new UserApiClient('http://localhost:1234');
});

// After each test, ask Pact to verify the interactions registered for it.
afterEach(() => pact.verify());

// Once the suite is done, finalize the pact.
afterAll(() => pact.finalize());
describe('获取用户信息', () => {
  // Expected GET /api/users/:id request carrying the given Authorization
  // header (either a literal string or a Pact matcher).
  const getUserRequest = (userId, authorization) => ({
    method: 'GET',
    path: `/api/users/${userId}`,
    headers: {
      'Accept': 'application/json',
      'Authorization': authorization,
    },
  });

  // Expected JSON response with the given status code and body.
  const jsonResponse = (status, body) => ({
    status,
    headers: {
      'Content-Type': 'application/json',
    },
    body,
  });

  it('应该返回有效用户数据', async () => {
    // Register the interaction the mock provider should expect.
    await pact.addInteraction({
      state: 'user with ID 123 exists',
      uponReceiving: 'a request for user 123',
      withRequest: getUserRequest(123, Matchers.like('Bearer token123')),
      willRespondWith: jsonResponse(200, {
        id: Matchers.like(123),
        name: Matchers.like('John Doe'),
        email: Matchers.like('john@example.com'),
        createdAt: Matchers.iso8601DateTime(),
        profile: {
          avatar: Matchers.like('https://example.com/avatar.jpg'),
          bio: Matchers.like('Software developer'),
        },
      }),
    });

    // Perform the real client call against the mock provider.
    const user = await userApiClient.getUser(123, 'Bearer token123');

    // Check the shape first, then the concrete example values.
    expect(user).toMatchObject({
      id: expect.any(Number),
      name: expect.any(String),
      email: expect.any(String),
      createdAt: expect.any(String),
      profile: {
        avatar: expect.any(String),
        bio: expect.any(String),
      },
    });
    expect(user.id).toBe(123);
    expect(user.name).toBe('John Doe');
  });

  it('应该处理用户不存在的情况', async () => {
    await pact.addInteraction({
      state: 'user with ID 999 does not exist',
      uponReceiving: 'a request for non-existent user 999',
      withRequest: getUserRequest(999, Matchers.like('Bearer token123')),
      willRespondWith: jsonResponse(404, {
        error: 'User not found',
        code: 'USER_NOT_FOUND',
      }),
    });

    // The client is expected to turn the 404 into a rejection.
    const lookup = userApiClient.getUser(999, 'Bearer token123');
    await expect(lookup).rejects.toThrow('User not found');
  });

  it('应该处理认证失败', async () => {
    await pact.addInteraction({
      state: 'invalid authentication token',
      uponReceiving: 'a request with invalid token',
      // Literal header value (no matcher): the exact invalid token must be sent.
      withRequest: getUserRequest(123, 'Bearer invalid-token'),
      willRespondWith: jsonResponse(401, {
        error: 'Unauthorized',
        code: 'INVALID_TOKEN',
      }),
    });

    const lookup = userApiClient.getUser(123, 'Bearer invalid-token');
    await expect(lookup).rejects.toThrow('Unauthorized');
  });
});
describe('创建用户', () => {
  it('应该成功创建新用户', async () => {
    // Payload the client sends; the same values are echoed in the expected response.
    const newUserData = {
      name: 'Jane Smith',
      email: 'jane@example.com',
      password: 'securepassword',
    };
    const { name, email, password } = newUserData;

    await pact.addInteraction({
      state: 'create user is available',
      uponReceiving: 'a request to create a new user',
      withRequest: {
        method: 'POST',
        path: '/api/users',
        headers: {
          'Content-Type': 'application/json',
          'Accept': 'application/json',
          'Authorization': Matchers.like('Bearer token123'),
        },
        // Request fields are matched by type, not by exact value.
        body: {
          name: Matchers.like(name),
          email: Matchers.like(email),
          password: Matchers.like(password),
        },
      },
      willRespondWith: {
        status: 201,
        headers: {
          'Content-Type': 'application/json',
        },
        body: {
          id: Matchers.like(456),
          name,
          email,
          createdAt: Matchers.iso8601DateTime(),
        },
      },
    });

    const createdUser = await userApiClient.createUser(newUserData, 'Bearer token123');

    expect(createdUser).toMatchObject({
      id: expect.any(Number),
      name,
      email,
      createdAt: expect.any(String),
    });
  });
});
});
提供者契约验证
javascript
// tests/contract/user-service-provider.test.js
const { Verifier } = require('@pact-foundation/pact');
const path = require('path');
const app = require('../../src/app');
const DatabaseSetup = require('../helpers/database-setup');
describe('User Service Provider Contract Verification', () => {
  // Port the provider under test listens on; also used to build providerBaseUrl.
  const PROVIDER_PORT = 3000;
  // Pact verification replays every interaction over HTTP, so it needs more
  // than Jest's default 5 s per-test timeout.
  const VERIFICATION_TIMEOUT_MS = 30000;

  let server;
  let dbSetup;

  beforeAll(async () => {
    // Prepare the test database.
    dbSetup = new DatabaseSetup();
    await dbSetup.setup();

    // Start the application server and wait until it is actually listening,
    // so verification never races against a port that is not bound yet.
    server = await new Promise((resolve, reject) => {
      const listening = app.listen(PROVIDER_PORT, () => resolve(listening));
      listening.once('error', reject);
    });
    console.log(`Provider server started on port ${PROVIDER_PORT}`);
  });

  afterAll(async () => {
    try {
      if (server) {
        // Wait for the server to finish closing so Jest is not left with an open handle.
        await new Promise((resolve, reject) => {
          server.close((err) => (err ? reject(err) : resolve()));
        });
      }
    } finally {
      // dbSetup is undefined if construction failed in beforeAll; do not mask that error.
      if (dbSetup) {
        await dbSetup.cleanup();
      }
    }
  });

  it('验证与order-service的契约', async () => {
    const opts = {
      provider: 'user-service',
      providerBaseUrl: `http://localhost:${PROVIDER_PORT}`,

      // Location of the pact file produced by the consumer tests.
      pactUrls: [
        path.resolve(__dirname, '../../pacts/order-service-user-service.json')
      ],

      // Provider-state handlers: one per `state` string used in the consumer tests.
      stateHandlers: {
        'user with ID 123 exists': async () => {
          // Seed the user the consumer expects to find.
          await dbSetup.createUser({
            id: 123,
            name: 'John Doe',
            email: 'john@example.com',
            profile: {
              avatar: 'https://example.com/avatar.jpg',
              bio: 'Software developer'
            }
          });
        },
        'user with ID 999 does not exist': async () => {
          // Make sure the user is absent.
          await dbSetup.deleteUser(999);
        },
        'invalid authentication token': async () => {
          // Nothing to prepare; the application validates the token itself.
        },
        'create user is available': async () => {
          // Start from an empty user table so creation can succeed.
          await dbSetup.clearUsers();
        }
      },

      // Request filter applied to requests whose Authorization header contains 'token123'.
      // NOTE(review): Pact's requestFilter runs in the verifier's proxy, so a
      // property set on `req` here may never reach the provider app; rewriting
      // the Authorization header is the usual approach — confirm.
      requestFilter: (req, res, next) => {
        if (req.headers.authorization && req.headers.authorization.includes('token123')) {
          req.user = { id: 1, role: 'user' };
        }
        next();
      },

      // Logging.
      logLevel: 'INFO',

      // Publish verification results to the Pact Broker only on CI.
      publishVerificationResult: process.env.CI === 'true',
      providerVersion: process.env.GIT_COMMIT || '1.0.0'
    };

    const verifier = new Verifier(opts);
    return verifier.verifyProvider();
  }, VERIFICATION_TIMEOUT_MS);
});
📨 消息队列集成测试
RabbitMQ集成测试
javascript
// tests/integration/message-queue.test.js
const amqp = require('amqplib');
const { v4: uuidv4 } = require('uuid');
const OrderService = require('../../src/services/order-service');
const PaymentService = require('../../src/services/payment-service');
describe('Message Queue Integration Tests', () => {
// Shared fixtures for the whole suite.
let connection;
let channel;
let orderService;
let paymentService;

beforeAll(async () => {
  // Connect to the test RabbitMQ instance (RABBITMQ_URL, or localhost by default).
  const brokerUrl = process.env.RABBITMQ_URL || 'amqp://localhost';
  connection = await amqp.connect(brokerUrl);
  channel = await connection.createChannel();

  // Declare the exchanges and queues the tests rely on.
  await setupTestQueues();

  orderService = new OrderService(channel);
  paymentService = new PaymentService(channel);
});

afterAll(async () => {
  if (!connection) {
    return;
  }
  await connection.close();
});

beforeEach(async () => {
  // Empty every test queue.
  for (const queue of ['order.created', 'payment.processed', 'order.updated']) {
    await channel.purgeQueue(queue);
  }
});

/**
 * Declares the non-durable topic exchanges and queues used by the tests and
 * binds each queue to its exchange, using the queue name as the routing key.
 */
async function setupTestQueues() {
  // [queue, exchange] pairs, in declaration order.
  const bindings = [
    ['order.created', 'order-events'],
    ['payment.processed', 'payment-events'],
    ['order.updated', 'order-events'],
  ];

  // Exchanges first.
  for (const exchange of ['order-events', 'payment-events']) {
    await channel.assertExchange(exchange, 'topic', { durable: false });
  }

  // Then the queues.
  for (const [queue] of bindings) {
    await channel.assertQueue(queue, { durable: false });
  }

  // Finally bind each queue to its exchange.
  for (const [queue, exchange] of bindings) {
    await channel.bindQueue(queue, exchange, queue);
  }
}
describe('订单创建流程', () => {
  /**
   * Starts a consumer on `queue` and exposes the first message it receives.
   *
   * The consumer must be stopped by the caller: a consumer left running on the
   * shared channel keeps receiving (and acking) messages published by later
   * tests, which makes those tests flaky.
   *
   * @param {string} queue - Queue to consume from.
   * @returns {Promise<{ firstMessage: Promise<object>, stop: () => Promise<*> }>}
   *   `firstMessage` resolves with the JSON-parsed body of the first delivery;
   *   `stop` cancels the consumer.
   */
  async function listenOnce(queue) {
    let deliver;
    const firstMessage = new Promise((resolve) => {
      deliver = resolve;
    });

    const { consumerTag } = await channel.consume(queue, (msg) => {
      // msg is null when the broker cancels the consumer; nothing to do then.
      if (msg) {
        const content = JSON.parse(msg.content.toString());
        channel.ack(msg);
        deliver(content);
      }
    });

    return { firstMessage, stop: () => channel.cancel(consumerTag) };
  }

  it('应该在订单创建后发送消息', async () => {
    const orderData = {
      userId: 123,
      items: [
        { productId: 'prod-1', quantity: 2, price: 29.99 },
        { productId: 'prod-2', quantity: 1, price: 49.99 }
      ],
      totalAmount: 109.97
    };

    // Listen for the order-created event before creating the order.
    const listener = await listenOnce('order.created');
    try {
      // Create the order.
      const order = await orderService.createOrder(orderData);

      // The event must have been published.
      const receivedMessage = await listener.firstMessage;

      expect(receivedMessage).toMatchObject({
        orderId: order.id,
        userId: orderData.userId,
        totalAmount: orderData.totalAmount,
        status: 'pending'
      });
      expect(receivedMessage.createdAt).toBeDefined();
      expect(receivedMessage.eventType).toBe('ORDER_CREATED');
    } finally {
      // Always cancel the consumer, even when an assertion fails.
      await listener.stop();
    }
  });

  it('应该处理支付完成事件', async () => {
    // Create an order first.
    const order = await orderService.createOrder({
      userId: 456,
      items: [{ productId: 'prod-3', quantity: 1, price: 99.99 }],
      totalAmount: 99.99
    });

    // Listen for the order-updated event.
    const listener = await listenOnce('order.updated');
    try {
      // Simulated payment-completed event.
      const paymentEvent = {
        orderId: order.id,
        paymentId: uuidv4(),
        amount: 99.99,
        status: 'completed',
        processedAt: new Date().toISOString()
      };

      // Publish it on the payment exchange.
      await channel.publish(
        'payment-events',
        'payment.processed',
        Buffer.from(JSON.stringify(paymentEvent)),
        { messageId: uuidv4() }
      );

      // Wait for the resulting order-updated event.
      const orderUpdateEvent = await listener.firstMessage;

      expect(orderUpdateEvent).toMatchObject({
        orderId: order.id,
        status: 'paid',
        paymentId: paymentEvent.paymentId
      });
    } finally {
      await listener.stop();
    }
  });
});
describe('消息处理可靠性', () => {
  // How long the retry test waits for redelivery, and the per-test timeout it
  // therefore needs. Jest's default timeout is 5 s, which the 5 s wait alone
  // would already exhaust.
  const RETRY_WAIT_MS = 5000;
  const RETRY_TEST_TIMEOUT_MS = 10000;

  // Resolves after `ms` milliseconds.
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  it('应该处理重复消息', async () => {
    const messageId = uuidv4();
    const orderData = {
      userId: 789,
      items: [{ productId: 'prod-4', quantity: 1, price: 19.99 }],
      totalAmount: 19.99
    };

    // The same message (same messageId) is sent twice.
    const message = {
      messageId,
      eventType: 'ORDER_CREATION_REQUEST',
      data: orderData
    };
    const payload = JSON.stringify(message);

    for (let attempt = 0; attempt < 2; attempt++) {
      await channel.sendToQueue(
        'order.created',
        Buffer.from(payload),
        { messageId }
      );
    }

    // Give the consumer time to process both deliveries.
    await pause(1000);

    // Only one order may exist (idempotent handling).
    const orders = await orderService.getOrdersByUser(789);
    expect(orders).toHaveLength(1);
  });

  it('应该处理消息处理失败并重试', async () => {
    let attemptCount = 0;

    // Make createOrder fail on its first two calls, then delegate to the real method.
    const originalCreateOrder = orderService.createOrder;
    orderService.createOrder = jest.fn().mockImplementation(async (data) => {
      attemptCount++;
      if (attemptCount < 3) {
        throw new Error('Temporary failure');
      }
      return originalCreateOrder.call(orderService, data);
    });

    try {
      const orderData = {
        userId: 999,
        items: [{ productId: 'prod-5', quantity: 1, price: 39.99 }],
        totalAmount: 39.99
      };

      // Message carrying retry bookkeeping.
      const message = {
        messageId: uuidv4(),
        eventType: 'ORDER_CREATION_REQUEST',
        data: orderData,
        retryCount: 0
      };

      await channel.sendToQueue(
        'order.created',
        Buffer.from(JSON.stringify(message)),
        {
          messageId: message.messageId,
          headers: { 'x-retry-count': '0' }
        }
      );

      // Wait for the retries to finish.
      await pause(RETRY_WAIT_MS);

      // The order must exist after exactly three attempts.
      const orders = await orderService.getOrdersByUser(999);
      expect(orders).toHaveLength(1);
      expect(attemptCount).toBe(3);
    } finally {
      // Restore the real method even when an assertion above fails, so the
      // mock cannot leak into later tests that share orderService.
      orderService.createOrder = originalCreateOrder;
    }
  }, RETRY_TEST_TIMEOUT_MS);
});
describe('消息路由测试', () => {
  it('应该正确路由不同类型的消息', async () => {
    const messages = [];
    // Tags of the consumers started by this test, so they can be cancelled afterwards.
    const consumerTags = [];

    // Starts a consumer on `queue` that records every delivery in `messages`,
    // acks it, and then invokes `handler` with the parsed content.
    const setupListener = async (queue, handler) => {
      const { consumerTag } = await channel.consume(queue, (msg) => {
        if (msg) {
          const content = JSON.parse(msg.content.toString());
          messages.push({ queue, content });
          channel.ack(msg);
          handler(content);
        }
      });
      consumerTags.push(consumerTag);
    };

    try {
      // Consumers are registered (and confirmed by the broker) before publishing.
      await setupListener('order.created', () => {});
      await setupListener('payment.processed', () => {});
      await setupListener('order.updated', () => {});

      // One event per routing key.
      const events = [
        {
          exchange: 'order-events',
          routingKey: 'order.created',
          data: { orderId: '001', type: 'order_created' }
        },
        {
          exchange: 'payment-events',
          routingKey: 'payment.processed',
          data: { paymentId: '002', type: 'payment_processed' }
        },
        {
          exchange: 'order-events',
          routingKey: 'order.updated',
          data: { orderId: '003', type: 'order_updated' }
        }
      ];

      // Publish all events.
      for (const event of events) {
        await channel.publish(
          event.exchange,
          event.routingKey,
          Buffer.from(JSON.stringify(event.data)),
          { messageId: uuidv4() }
        );
      }

      // Give the broker time to deliver.
      await new Promise(resolve => setTimeout(resolve, 1000));

      // Each queue must have received a message.
      expect(messages).toHaveLength(3);
      expect(messages.find(m => m.queue === 'order.created')).toBeDefined();
      expect(messages.find(m => m.queue === 'payment.processed')).toBeDefined();
      expect(messages.find(m => m.queue === 'order.updated')).toBeDefined();
    } finally {
      // Cancel the consumers so they do not keep draining these queues in
      // tests that run after this one on the same channel.
      await Promise.all(consumerTags.map((tag) => channel.cancel(tag)));
    }
  });
});
});
Redis Pub/Sub集成测试
javascript
// tests/integration/redis-pubsub.test.js
const redis = require('redis');
const NotificationService = require('../../src/services/notification-service');
const ChatService = require('../../src/services/chat-service');
describe('Redis Pub/Sub Integration Tests', () => {
// Shared fixtures: one client for publishing, one dedicated to subscriptions.
let publisher;
let subscriber;
let notificationService;
let chatService;

beforeAll(async () => {
  // Both clients use REDIS_URL, or a local Redis by default.
  const url = process.env.REDIS_URL || 'redis://localhost:6379';
  publisher = redis.createClient({ url });
  subscriber = redis.createClient({ url });

  await publisher.connect();
  await subscriber.connect();

  notificationService = new NotificationService(publisher);
  chatService = new ChatService(publisher, subscriber);
});

afterAll(async () => {
  await publisher.quit();
  await subscriber.quit();
});

// Drop channel and pattern subscriptions before every test.
beforeEach(async () => {
  await subscriber.unsubscribe();
  await subscriber.pUnsubscribe();
});
describe('实时通知系统', () => {
  // Resolves after `ms` milliseconds; used to let published messages arrive.
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  it('应该发布和接收用户通知', async () => {
    const inbox = [];

    // Subscribe to the user's notification channel.
    await subscriber.subscribe('user:123:notifications', (raw) => {
      inbox.push(JSON.parse(raw));
    });

    // Send one notification through the service.
    const notification = {
      id: 'notif-001',
      userId: 123,
      type: 'order_update',
      title: '订单状态更新',
      message: '您的订单已发货',
      timestamp: new Date().toISOString(),
    };
    await notificationService.sendUserNotification(123, notification);

    await pause(100);

    expect(inbox).toHaveLength(1);
    expect(inbox[0]).toMatchObject(notification);
  });

  it('应该支持频道模式匹配', async () => {
    const received = [];

    // Pattern subscription covering every order's update channel.
    await subscriber.pSubscribe('order:*:updates', (raw, channel) => {
      received.push({
        channel,
        message: JSON.parse(raw),
      });
    });

    // Publish one update per order, in order.
    const orders = ['order-001', 'order-002', 'order-003'];
    for (const orderId of orders) {
      const update = {
        orderId,
        status: 'shipped',
        updatedAt: new Date().toISOString(),
      };
      await publisher.publish(`order:${orderId}:updates`, JSON.stringify(update));
    }

    await pause(200);

    expect(received).toHaveLength(3);
    // Messages are expected in publication order.
    for (const [index, orderId] of orders.entries()) {
      const entry = received[index];
      expect(entry.channel).toBe(`order:${orderId}:updates`);
      expect(entry.message.orderId).toBe(orderId);
    }
  });
});
describe('实时聊天系统', () => {
  // Resolves after `ms` milliseconds; used to let published messages arrive.
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  it('应该处理聊天室消息', async () => {
    const chatMessages = [];
    const roomId = 'room-123';

    // Builds one chat message for this room, timestamped at creation.
    const chatMessage = (id, userId, username, content) => ({
      id,
      roomId,
      userId,
      username,
      content,
      timestamp: new Date().toISOString(),
    });

    // Join the room by subscribing to its channel.
    await subscriber.subscribe(`chat:${roomId}`, (raw) => {
      chatMessages.push(JSON.parse(raw));
    });

    // Send two messages through the chat service.
    const outgoing = [
      chatMessage('msg-001', 'user-1', 'Alice', 'Hello everyone!'),
      chatMessage('msg-002', 'user-2', 'Bob', 'Hi Alice!'),
    ];
    for (const entry of outgoing) {
      await chatService.sendMessage(roomId, entry);
    }

    await pause(100);

    expect(chatMessages).toHaveLength(2);
    expect(chatMessages[0].content).toBe('Hello everyone!');
    expect(chatMessages[1].content).toBe('Hi Alice!');
  });

  it('应该处理用户状态更新', async () => {
    const statusUpdates = [];
    const roomId = 'room-456';

    // Subscribe to the room's status channel.
    await subscriber.subscribe(`chat:${roomId}:status`, (raw) => {
      statusUpdates.push(JSON.parse(raw));
    });

    // Simulated status changes, in the order they are sent.
    const statusEvents = [
      ['user-1', 'online'],
      ['user-2', 'typing'],
      ['user-1', 'idle'],
    ].map(([userId, status]) => ({
      userId,
      status,
      timestamp: new Date().toISOString(),
    }));

    for (const statusEvent of statusEvents) {
      await chatService.updateUserStatus(roomId, statusEvent);
    }

    await pause(100);

    expect(statusUpdates).toHaveLength(3);
    expect(statusUpdates[0].status).toBe('online');
    expect(statusUpdates[1].status).toBe('typing');
    expect(statusUpdates[2].status).toBe('idle');
  });
});
describe('分布式缓存失效', () => {
  it('应该通知所有实例缓存失效', async () => {
    const received = [];

    // Stand-in for a service instance listening for invalidation events.
    await subscriber.subscribe('cache:invalidate', (raw) => {
      received.push(JSON.parse(raw));
    });

    // Publish one invalidation event.
    const invalidationEvent = {
      key: 'user:123:profile',
      reason: 'user_profile_updated',
      timestamp: new Date().toISOString(),
    };
    const payload = JSON.stringify(invalidationEvent);
    await publisher.publish('cache:invalidate', payload);

    // Short wait for delivery.
    await new Promise((resolve) => {
      setTimeout(resolve, 50);
    });

    expect(received).toHaveLength(1);
    expect(received[0]).toMatchObject(invalidationEvent);
  });
});
});
🔄 分布式事务测试
Saga模式测试
javascript
// tests/integration/saga-pattern.test.js
const SagaOrchestrator = require('../../src/saga/saga-orchestrator');
const OrderService = require('../../src/services/order-service');
const PaymentService = require('../../src/services/payment-service');
const InventoryService = require('../../src/services/inventory-service');
const ShippingService = require('../../src/services/shipping-service');
describe('Saga Pattern Integration Tests', () => {
// Fresh service instances and orchestrator are created for every test.
let sagaOrchestrator;
let orderService;
let paymentService;
let inventoryService;
let shippingService;

beforeEach(async () => {
  // Instantiate the participating services (real implementations or test doubles).
  orderService = new OrderService();
  paymentService = new PaymentService();
  inventoryService = new InventoryService();
  shippingService = new ShippingService();

  // Hand all four participants to the orchestrator.
  const participants = {
    orderService,
    paymentService,
    inventoryService,
    shippingService,
  };
  sagaOrchestrator = new SagaOrchestrator(participants);

  // Reset test data.
  await orderService.clearAll();
  await inventoryService.resetStock();
});
describe('成功的订单处理流程', () => {
  it('应该完成完整的订单处理Saga', async () => {
    // Order with two line items and a shipping address.
    const orderData = {
      userId: 123,
      items: [
        { productId: 'prod-1', quantity: 2, price: 50.00 },
        { productId: 'prod-2', quantity: 1, price: 30.00 },
      ],
      totalAmount: 130.00,
      shippingAddress: {
        street: '123 Main St',
        city: 'Springfield',
        zipCode: '12345',
      },
    };

    // Seed enough stock for both products.
    await inventoryService.setStock('prod-1', 10);
    await inventoryService.setStock('prod-2', 5);

    // Run the order-processing saga.
    const sagaResult = await sagaOrchestrator.processOrder(orderData);
    const { orderId } = sagaResult;

    expect(sagaResult.status).toBe('completed');
    expect(orderId).toBeDefined();

    // Order step.
    const order = await orderService.getOrder(orderId);
    expect(order.status).toBe('confirmed');

    // Inventory step: stock reduced by the ordered quantities.
    const remainingProd1 = await inventoryService.getStock('prod-1');
    const remainingProd2 = await inventoryService.getStock('prod-2');
    expect(remainingProd1).toBe(8); // 10 - 2
    expect(remainingProd2).toBe(4); // 5 - 1

    // Payment step.
    const payment = await paymentService.getPaymentByOrderId(orderId);
    expect(payment.status).toBe('completed');

    // Shipping step.
    const shipping = await shippingService.getShippingByOrderId(orderId);
    expect(shipping.status).toBe('scheduled');
  });
});
describe('Saga补偿机制', () => {
  it('应该在支付失败时执行补偿操作', async () => {
    const orderData = {
      userId: 456,
      items: [{ productId: 'prod-3', quantity: 1, price: 100.00 }],
      totalAmount: 100.00,
    };

    // Seed stock.
    await inventoryService.setStock('prod-3', 5);

    // Force the payment step to fail.
    const paymentFailure = new Error('Payment gateway timeout');
    jest.spyOn(paymentService, 'processPayment').mockRejectedValue(paymentFailure);

    // Run the saga.
    const sagaResult = await sagaOrchestrator.processOrder(orderData);
    const { orderId } = sagaResult;

    expect(sagaResult.status).toBe('failed');
    expect(sagaResult.error).toContain('Payment gateway timeout');

    // Compensation: the order is cancelled.
    const order = await orderService.getOrder(orderId);
    expect(order.status).toBe('cancelled');

    // Compensation: stock is back at its seeded level.
    const stock = await inventoryService.getStock('prod-3');
    expect(stock).toBe(5);

    // No payment record is expected for the order.
    const payment = await paymentService.getPaymentByOrderId(orderId);
    expect(payment).toBeNull();
  });

  it('应该在库存不足时快速失败', async () => {
    const orderData = {
      userId: 789,
      items: [{ productId: 'prod-4', quantity: 10, price: 25.00 }],
      totalAmount: 250.00,
    };

    // Seed less stock than the order requests.
    await inventoryService.setStock('prod-4', 5);

    // Run the saga.
    const sagaResult = await sagaOrchestrator.processOrder(orderData);
    const { orderId } = sagaResult;

    expect(sagaResult.status).toBe('failed');
    expect(sagaResult.error).toContain('Insufficient stock');

    // The order is marked as failed.
    const order = await orderService.getOrder(orderId);
    expect(order.status).toBe('failed');

    // Payment was never attempted.
    const payment = await paymentService.getPaymentByOrderId(orderId);
    expect(payment).toBeNull();
  });

  it('应该处理部分补偿失败的情况', async () => {
    const orderData = {
      userId: 999,
      items: [{ productId: 'prod-5', quantity: 3, price: 40.00 }],
      totalAmount: 120.00,
    };

    await inventoryService.setStock('prod-5', 10);

    // The shipping step fails...
    const shippingFailure = new Error('Shipping service unavailable');
    jest.spyOn(shippingService, 'scheduleShipping').mockRejectedValue(shippingFailure);

    // ...and so does the stock-restoring compensation.
    const restoreFailure = new Error('Inventory service error');
    jest.spyOn(inventoryService, 'restoreStock').mockRejectedValue(restoreFailure);

    const sagaResult = await sagaOrchestrator.processOrder(orderData);

    expect(sagaResult.status).toBe('failed');

    // The saga must report which compensations failed.
    expect(sagaResult.compensationFailures).toBeDefined();
    expect(sagaResult.compensationFailures.length).toBeGreaterThan(0);

    // And flag the case for manual intervention.
    expect(sagaResult.requiresManualIntervention).toBe(true);
  });
});
describe('Saga状态恢复', () => {
  it('应该能够从中断点恢复Saga执行', async () => {
    const orderData = {
      userId: 111,
      items: [{ productId: 'prod-6', quantity: 1, price: 75.00 }],
      totalAmount: 75.00,
    };

    await inventoryService.setStock('prod-6', 3);

    // Payment fails on its first call only, simulating an interruption at that step.
    let paymentCalls = 0;
    const flakyPayment = async () => {
      paymentCalls += 1;
      if (paymentCalls === 1) {
        throw new Error('Network timeout');
      }
      return { paymentId: 'pay-123', status: 'completed' };
    };
    jest.spyOn(paymentService, 'processPayment').mockImplementation(flakyPayment);

    // First run: expected to fail.
    const firstResult = await sagaOrchestrator.processOrder(orderData);
    expect(firstResult.status).toBe('failed');

    // The saga state must have been persisted at the payment step.
    const { sagaId } = firstResult;
    const sagaState = await sagaOrchestrator.getSagaState(sagaId);
    expect(sagaState).toBeDefined();
    expect(sagaState.currentStep).toBe('payment');

    // Resume from the saved state.
    const resumeResult = await sagaOrchestrator.resumeSaga(sagaId);
    expect(resumeResult.status).toBe('completed');

    // Final order state.
    const order = await orderService.getOrder(resumeResult.orderId);
    expect(order.status).toBe('confirmed');
  });
});
});
🔍 服务发现和负载均衡测试
服务注册发现测试
javascript
// tests/integration/service-discovery.test.js
const ConsulClient = require('consul');
const ServiceRegistry = require('../../src/services/service-registry');
const ApiGateway = require('../../src/gateway/api-gateway');
describe('Service Discovery Integration Tests', () => {
// Shared fixtures: Consul client, the registry built on it, and the gateway built on the registry.
let consul;
let serviceRegistry;
let apiGateway;

beforeAll(async () => {
  // Consul location comes from CONSUL_HOST / CONSUL_PORT, defaulting to localhost:8500.
  const consulOptions = {
    host: process.env.CONSUL_HOST || 'localhost',
    port: process.env.CONSUL_PORT || 8500,
  };
  consul = new ConsulClient(consulOptions);

  serviceRegistry = new ServiceRegistry(consul);
  apiGateway = new ApiGateway(serviceRegistry);
});

// Deregister everything a test registered.
afterEach(() => serviceRegistry.deregisterAll());
describe('服务注册和发现', () => {
  // The health-check test waits 6 s for Consul's 5 s check interval, which is
  // longer than Jest's default 5 s per-test timeout; it needs its own limit.
  const HEALTH_CHECK_WAIT_MS = 6000;
  const HEALTH_CHECK_TEST_TIMEOUT_MS = 15000;

  it('应该成功注册和发现服务', async () => {
    const serviceInfo = {
      name: 'user-service',
      id: 'user-service-1',
      address: '192.168.1.100',
      port: 3001,
      tags: ['api', 'user', 'v1'],
      check: {
        http: 'http://192.168.1.100:3001/health',
        interval: '10s'
      }
    };

    // Register the service.
    await serviceRegistry.register(serviceInfo);

    // Discover it by name.
    const services = await serviceRegistry.discover('user-service');

    expect(services).toHaveLength(1);
    expect(services[0]).toMatchObject({
      id: serviceInfo.id,
      address: serviceInfo.address,
      port: serviceInfo.port,
      tags: serviceInfo.tags
    });
  });

  it('应该支持多实例服务注册', async () => {
    const services = [
      {
        name: 'order-service',
        id: 'order-service-1',
        address: '192.168.1.101',
        port: 3002
      },
      {
        name: 'order-service',
        id: 'order-service-2',
        address: '192.168.1.102',
        port: 3002
      },
      {
        name: 'order-service',
        id: 'order-service-3',
        address: '192.168.1.103',
        port: 3002
      }
    ];

    // Register every instance.
    for (const service of services) {
      await serviceRegistry.register(service);
    }

    // All instances must be discoverable under the shared name.
    const discoveredServices = await serviceRegistry.discover('order-service');

    expect(discoveredServices).toHaveLength(3);

    const addresses = discoveredServices.map(s => s.address);
    expect(addresses).toContain('192.168.1.101');
    expect(addresses).toContain('192.168.1.102');
    expect(addresses).toContain('192.168.1.103');
  });

  it('应该检测服务健康状态', async () => {
    const healthyService = {
      name: 'payment-service',
      id: 'payment-service-1',
      address: '192.168.1.104',
      port: 3003,
      check: {
        http: 'http://192.168.1.104:3003/health',
        interval: '5s'
      }
    };

    await serviceRegistry.register(healthyService);

    // Wait for at least one health-check interval to elapse.
    await new Promise(resolve => setTimeout(resolve, HEALTH_CHECK_WAIT_MS));

    // Fetch the instances currently reported as healthy.
    const healthyServices = await serviceRegistry.getHealthyServices('payment-service');

    // In a real test the health-check endpoint has to be simulated.
    // NOTE(review): this assertion holds for any array, so it only proves the
    // call succeeded; tighten it once a stub health endpoint exists.
    expect(healthyServices.length).toBeGreaterThanOrEqual(0);
  }, HEALTH_CHECK_TEST_TIMEOUT_MS);
});
describe('负载均衡策略', () => {
  // Sample sizes for the statistical tests. With only 60 draws at weights
  // 1:2:3 a strict ordering of the counts fails by chance fairly often; 600
  // draws keep the expected gaps (100 vs 200 vs 300) several standard
  // deviations apart. 100 draws make "every instance chosen at least once"
  // practically certain for three instances.
  const RANDOM_SAMPLE_SIZE = 100;
  const WEIGHTED_SAMPLE_SIZE = 600;
  // Extra headroom over Jest's 5 s default for the larger weighted sample.
  const WEIGHTED_TEST_TIMEOUT_MS = 15000;

  beforeEach(async () => {
    // Register three instances of the same service.
    const services = [
      { name: 'api-service', id: 'api-1', address: '10.0.1.1', port: 3000 },
      { name: 'api-service', id: 'api-2', address: '10.0.1.2', port: 3000 },
      { name: 'api-service', id: 'api-3', address: '10.0.1.3', port: 3000 }
    ];

    for (const service of services) {
      await serviceRegistry.register(service);
    }
  });

  it('应该实现轮询负载均衡', async () => {
    const selectedServices = [];

    // Select six times in a row.
    for (let i = 0; i < 6; i++) {
      const service = await apiGateway.selectService('api-service', 'round-robin');
      selectedServices.push(service.id);
    }

    // Two full round-robin cycles are expected.
    expect(selectedServices).toEqual([
      'api-1', 'api-2', 'api-3',
      'api-1', 'api-2', 'api-3'
    ]);
  });

  it('应该实现随机负载均衡', async () => {
    const selectedServices = [];

    // Draw repeatedly.
    for (let i = 0; i < RANDOM_SAMPLE_SIZE; i++) {
      const service = await apiGateway.selectService('api-service', 'random');
      selectedServices.push(service.id);
    }

    // Every instance must have been picked at least once.
    const uniqueServices = [...new Set(selectedServices)];
    expect(uniqueServices).toContain('api-1');
    expect(uniqueServices).toContain('api-2');
    expect(uniqueServices).toContain('api-3');
  });

  it('应该实现加权负载均衡', async () => {
    // Assign weights 1 : 2 : 3.
    await serviceRegistry.updateServiceWeight('api-1', 1);
    await serviceRegistry.updateServiceWeight('api-2', 2);
    await serviceRegistry.updateServiceWeight('api-3', 3);

    // Start every known instance at zero so a never-selected instance compares
    // as 0 instead of undefined.
    const counts = { 'api-1': 0, 'api-2': 0, 'api-3': 0 };

    // Draw enough times for the weight distribution to show.
    for (let i = 0; i < WEIGHTED_SAMPLE_SIZE; i++) {
      const service = await apiGateway.selectService('api-service', 'weighted');
      counts[service.id] = (counts[service.id] || 0) + 1;
    }

    // Heavier instances must be selected more often.
    expect(counts['api-3']).toBeGreaterThan(counts['api-2']);
    expect(counts['api-2']).toBeGreaterThan(counts['api-1']);
  }, WEIGHTED_TEST_TIMEOUT_MS);
});
describe('服务故障处理', () => {
  it('应该自动移除不健康的服务', async () => {
    // Two instances of the same service.
    const instances = [
      { name: 'data-service', id: 'data-1', address: '10.0.2.1', port: 4000 },
      { name: 'data-service', id: 'data-2', address: '10.0.2.2', port: 4000 },
    ];
    for (const instance of instances) {
      await serviceRegistry.register(instance);
    }

    // Mark the first one as unhealthy.
    await serviceRegistry.markUnhealthy('data-1');

    // Only the second one should remain in the healthy list.
    const healthyServices = await serviceRegistry.getHealthyServices('data-service');
    expect(healthyServices).toHaveLength(1);
    expect(healthyServices[0].id).toBe('data-2');
  });

  it('应该实现断路器模式', async () => {
    await serviceRegistry.register({
      name: 'external-service',
      id: 'external-1',
      address: '10.0.3.1',
      port: 5000,
    });

    const circuitBreaker = apiGateway.getCircuitBreaker('external-service');

    // Operation that always fails, used to trip the breaker.
    const alwaysFails = async () => {
      throw new Error('Service unavailable');
    };

    // Five consecutive failures.
    let remaining = 5;
    while (remaining > 0) {
      try {
        await circuitBreaker.call(alwaysFails);
      } catch (error) {
        // Expected: each call is supposed to fail.
      }
      remaining -= 1;
    }

    // The breaker must now be open.
    expect(circuitBreaker.isOpen()).toBe(true);

    // A further call must be rejected immediately.
    const blockedCall = circuitBreaker.call(async () => {
      return 'success';
    });
    await expect(blockedCall).rejects.toThrow('Circuit breaker is open');
  });
});
});
📝 服务集成测试最佳实践
测试环境管理
javascript
/**
 * Checklist of service-integration testing practices, grouped into three
 * areas: test environment strategy, contract testing, and monitoring.
 * Every leaf is a list of four short recommendations.
 */
const ServiceIntegrationBestPractices = (() => {
  // Test environment: isolation, stubs for external services, data consistency.
  const environmentStrategy = {
    isolation: [
      '使用Docker Compose编排测试环境',
      '每个测试套件使用独立的网络',
      '服务实例使用随机端口避免冲突',
      '测试数据完全隔离',
    ],
    serviceStubs: [
      '对外部依赖使用服务桩',
      '模拟各种响应场景',
      '支持故障注入测试',
      '记录和验证交互',
    ],
    dataConsistency: [
      '确保测试数据的一致性',
      '支持事务性数据清理',
      '处理异步数据同步',
      '验证最终一致性',
    ],
  };

  // Contract testing: how to implement it and how to keep it maintained.
  const contractTesting = {
    implementation: [
      '使用Pact进行消费者驱动的契约测试',
      '在CI/CD中集成契约验证',
      '版本化API契约',
      '向后兼容性保证',
    ],
    maintenance: [
      '定期更新契约测试',
      '监控契约变更影响',
      '自动化契约发布',
      '团队间协作机制',
    ],
  };

  // Monitoring: observability and alerting.
  const monitoring = {
    observability: [
      '集成分布式追踪',
      '监控服务间调用链',
      '记录详细的测试日志',
      '性能指标收集',
    ],
    alerting: [
      '测试失败即时通知',
      '服务健康状态监控',
      '性能回归检测',
      '依赖关系变更提醒',
    ],
  };

  return {
    ENVIRONMENT_STRATEGY: environmentStrategy,
    CONTRACT_TESTING: contractTesting,
    MONITORING: monitoring,
  };
})();
📝 总结
服务集成测试是微服务架构质量保障的核心:
- 契约驱动:通过契约测试确保API一致性
- 异步通信:验证消息队列、事件驱动架构
- 分布式事务:测试Saga模式、补偿机制
- 服务治理:验证服务发现、负载均衡、熔断机制
通过系统化的服务集成测试,可以确保微服务系统的可靠性和稳定性。