异步模式
🎯 学习目标
- 深入理解Node.js中的各种异步编程模式
- 掌握Promise、async/await的高级用法
- 学会处理复杂的异步流程控制
- 了解异步编程的最佳实践和性能优化
📚 核心概念
异步编程模式概览
javascript
// 异步编程模式概念
const asyncPatterns = {
evolution: {
callbacks: '回调函数 - 最基础的异步模式',
promises: 'Promise - 解决回调地狱问题',
asyncAwait: 'async/await - 同步风格的异步编程',
generators: '生成器 - 可暂停的函数执行',
observables: '观察者模式 - 响应式编程'
},
patterns: {
sequential: '顺序执行 - 一个接一个执行',
parallel: '并行执行 - 同时执行多个任务',
concurrent: '并发执行 - 控制并发数量',
pipeline: '管道执行 - 流水线处理',
conditional: '条件执行 - 基于条件的异步流程',
retry: '重试模式 - 失败重试机制',
timeout: '超时模式 - 设置执行超时',
circuit: '断路器模式 - 故障隔离'
},
challenges: {
errorHandling: '错误处理 - 异步错误的捕获和处理',
memoryLeaks: '内存泄漏 - 未清理的异步操作',
raceConditions: '竞态条件 - 并发访问共享资源',
backpressure: '背压处理 - 生产消费速度不匹配'
}
};
console.log('异步编程模式:', asyncPatterns);
🔄 基础异步模式
Promise高级用法
javascript
// advanced-promise-patterns.js
// Promise工具类
class PromiseUtils {
// 带超时的Promise
static timeout(promise, ms, timeoutError = new Error('Operation timeout')) {
return Promise.race([
promise,
new Promise((_, reject) =>
setTimeout(() => reject(timeoutError), ms)
)
]);
}
// 重试Promise
static async retry(fn, options = {}) {
const {
retries = 3,
delay = 1000,
backoff = 2,
condition = () => true
} = options;
let lastError;
for (let attempt = 0; attempt <= retries; attempt++) {
try {
const result = await fn(attempt);
return result;
} catch (error) {
lastError = error;
if (attempt === retries || !condition(error)) {
throw error;
}
const waitTime = delay * Math.pow(backoff, attempt);
console.log(`重试第 ${attempt + 1} 次,等待 ${waitTime}ms...`);
await new Promise(resolve => setTimeout(resolve, waitTime));
}
}
throw lastError;
}
// 限制并发数的Promise.all
static async concurrentLimit(tasks, limit = 5) {
const results = new Array(tasks.length);
const executing = [];
for (let i = 0; i < tasks.length; i++) {
const task = Promise.resolve(tasks[i]).then(result => {
results[i] = result;
});
executing.push(task);
if (executing.length >= limit) {
await Promise.race(executing);
executing.splice(executing.findIndex(t => t === task), 1);
}
}
await Promise.all(executing);
return results;
}
// Promise队列 - 顺序执行
static async sequence(tasks) {
const results = [];
for (const task of tasks) {
const result = await Promise.resolve(task);
results.push(result);
}
return results;
}
// Promise管道
static pipeline(...functions) {
return (input) => {
return functions.reduce(
(promise, fn) => promise.then(fn),
Promise.resolve(input)
);
};
}
// 条件Promise
static conditional(condition, thenPromise, elsePromise = Promise.resolve()) {
return Promise.resolve(condition).then(result => {
return result ? thenPromise : elsePromise;
});
}
// Promise缓存
static memoize(fn, keyGenerator = (...args) => JSON.stringify(args)) {
const cache = new Map();
return (...args) => {
const key = keyGenerator(...args);
if (cache.has(key)) {
return cache.get(key);
}
const promise = Promise.resolve(fn(...args));
cache.set(key, promise);
// 清理失败的Promise
promise.catch(() => cache.delete(key));
return promise;
};
}
// Promise池
static createPool(factory, options = {}) {
const {
max = 10,
min = 2,
acquireTimeout = 30000,
idleTimeout = 300000
} = options;
const pool = {
available: [],
busy: new Set(),
pending: [],
async acquire() {
if (this.available.length > 0) {
const resource = this.available.pop();
this.busy.add(resource);
return resource;
}
if (this.busy.size < max) {
const resource = await factory();
this.busy.add(resource);
return resource;
}
// 等待资源释放
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
const index = this.pending.findIndex(p => p.resolve === resolve);
if (index > -1) {
this.pending.splice(index, 1);
}
reject(new Error('Acquire timeout'));
}, acquireTimeout);
this.pending.push({ resolve, reject, timeout });
});
},
release(resource) {
this.busy.delete(resource);
if (this.pending.length > 0) {
const { resolve, timeout } = this.pending.shift();
clearTimeout(timeout);
this.busy.add(resource);
resolve(resource);
} else {
this.available.push(resource);
// 设置空闲超时
setTimeout(() => {
const index = this.available.indexOf(resource);
if (index > -1 && this.available.length > min) {
this.available.splice(index, 1);
// 销毁资源的逻辑可以在这里添加
}
}, idleTimeout);
}
},
async destroy() {
// 清理所有资源
this.available.length = 0;
this.busy.clear();
// 拒绝所有等待的请求
this.pending.forEach(({ reject, timeout }) => {
clearTimeout(timeout);
reject(new Error('Pool destroyed'));
});
this.pending.length = 0;
}
};
return pool;
}
}
// 异步迭代器模式
class AsyncIteratorPatterns {
// 异步生成器 - 分页数据
static async* paginatedData(fetchPage, pageSize = 10) {
let page = 1;
let hasMore = true;
while (hasMore) {
try {
const data = await fetchPage(page, pageSize);
if (!data || data.length === 0) {
hasMore = false;
break;
}
for (const item of data) {
yield item;
}
hasMore = data.length === pageSize;
page++;
} catch (error) {
console.error(`获取第 ${page} 页数据失败:`, error);
throw error;
}
}
}
// 异步生成器 - 流式处理
static async* streamProcessor(source, processor) {
for await (const item of source) {
try {
const processed = await processor(item);
if (processed !== undefined) {
yield processed;
}
} catch (error) {
console.error('处理项目失败:', error);
// 可以选择跳过错误项或重新抛出错误
}
}
}
// 异步生成器 - 批量处理
static async* batchProcessor(source, batchSize = 10, processor) {
let batch = [];
for await (const item of source) {
batch.push(item);
if (batch.length >= batchSize) {
try {
const results = await processor(batch);
for (const result of results) {
yield result;
}
} catch (error) {
console.error('批量处理失败:', error);
throw error;
}
batch = [];
}
}
// 处理剩余的批次
if (batch.length > 0) {
try {
const results = await processor(batch);
for (const result of results) {
yield result;
}
} catch (error) {
console.error('最后批次处理失败:', error);
throw error;
}
}
}
}
// 使用示例
async function demonstratePromisePatterns() {
console.log('🔄 Promise高级模式演示...\n');
// 1. 超时Promise
console.log('1. 超时Promise:');
try {
const slowOperation = new Promise(resolve =>
setTimeout(() => resolve('完成'), 2000)
);
const result = await PromiseUtils.timeout(slowOperation, 1000);
console.log(' 结果:', result);
} catch (error) {
console.log(' 超时错误:', error.message);
}
// 2. 重试Promise
console.log('\n2. 重试Promise:');
let attempt = 0;
const unreliableOperation = () => {
attempt++;
if (attempt < 3) {
throw new Error(`尝试 ${attempt} 失败`);
}
return `成功在第 ${attempt} 次尝试`;
};
try {
const result = await PromiseUtils.retry(unreliableOperation, {
retries: 3,
delay: 100
});
console.log(' 重试结果:', result);
} catch (error) {
console.log(' 重试失败:', error.message);
}
// 3. 并发限制
console.log('\n3. 并发限制:');
const tasks = Array.from({ length: 10 }, (_, i) =>
new Promise(resolve =>
setTimeout(() => resolve(`任务 ${i + 1} 完成`), Math.random() * 1000)
)
);
const startTime = Date.now();
const results = await PromiseUtils.concurrentLimit(tasks, 3);
const duration = Date.now() - startTime;
console.log(` 并发执行完成,耗时: ${duration}ms`);
console.log(` 结果数量: ${results.length}`);
// 4. Promise管道
console.log('\n4. Promise管道:');
const pipeline = PromiseUtils.pipeline(
x => x * 2,
async x => {
await new Promise(resolve => setTimeout(resolve, 100));
return x + 10;
},
x => x.toString()
);
const pipelineResult = await pipeline(5);
console.log(' 管道结果:', pipelineResult);
// 5. 异步迭代器
console.log('\n5. 异步迭代器:');
// 模拟分页数据获取
const fetchPage = async (page, pageSize) => {
await new Promise(resolve => setTimeout(resolve, 100));
if (page > 3) return []; // 模拟没有更多数据
return Array.from({ length: pageSize }, (_, i) =>
`页面${page}-项目${i + 1}`
);
};
let count = 0;
for await (const item of AsyncIteratorPatterns.paginatedData(fetchPage, 3)) {
count++;
if (count <= 5) {
console.log(` 分页数据: ${item}`);
}
}
console.log(` 总共处理了 ${count} 项数据`);
}
module.exports = {
PromiseUtils,
AsyncIteratorPatterns,
demonstratePromisePatterns
};
异步流程控制
javascript
// async-flow-control.js
// 异步流程控制器
class AsyncFlowController {
constructor() {
this.tasks = new Map();
this.results = new Map();
this.dependencies = new Map();
}
// 添加任务
addTask(name, fn, dependencies = []) {
this.tasks.set(name, fn);
this.dependencies.set(name, dependencies);
return this;
}
// 执行所有任务(拓扑排序)
async executeAll() {
const completed = new Set();
const executing = new Map();
const results = new Map();
const canExecute = (taskName) => {
const deps = this.dependencies.get(taskName) || [];
return deps.every(dep => completed.has(dep));
};
const executeTask = async (taskName) => {
if (completed.has(taskName) || executing.has(taskName)) {
return;
}
// 等待依赖完成
const deps = this.dependencies.get(taskName) || [];
for (const dep of deps) {
if (!completed.has(dep)) {
if (executing.has(dep)) {
await executing.get(dep);
} else {
await executeTask(dep);
}
}
}
// 执行任务
const taskFn = this.tasks.get(taskName);
const dependencyResults = {};
for (const dep of deps) {
dependencyResults[dep] = results.get(dep);
}
console.log(`🚀 开始执行任务: ${taskName}`);
const startTime = Date.now();
const promise = Promise.resolve(taskFn(dependencyResults));
executing.set(taskName, promise);
try {
const result = await promise;
const duration = Date.now() - startTime;
results.set(taskName, result);
completed.add(taskName);
executing.delete(taskName);
console.log(`✅ 任务完成: ${taskName} (${duration}ms)`);
return result;
} catch (error) {
executing.delete(taskName);
console.error(`❌ 任务失败: ${taskName}`, error);
throw error;
}
};
// 启动所有可执行的任务
const promises = [];
for (const taskName of this.tasks.keys()) {
promises.push(executeTask(taskName));
}
await Promise.all(promises);
return Object.fromEntries(results);
}
// 执行特定任务及其依赖
async execute(taskName) {
if (!this.tasks.has(taskName)) {
throw new Error(`任务不存在: ${taskName}`);
}
const completed = new Set();
const results = new Map();
const executeRecursive = async (name) => {
if (completed.has(name)) {
return results.get(name);
}
const deps = this.dependencies.get(name) || [];
const dependencyResults = {};
// 先执行依赖
for (const dep of deps) {
dependencyResults[dep] = await executeRecursive(dep);
}
// 执行当前任务
const taskFn = this.tasks.get(name);
console.log(`🚀 执行任务: ${name}`);
const result = await taskFn(dependencyResults);
results.set(name, result);
completed.add(name);
console.log(`✅ 任务完成: ${name}`);
return result;
};
return await executeRecursive(taskName);
}
}
// 异步状态机
class AsyncStateMachine {
constructor(initialState, transitions) {
this.currentState = initialState;
this.transitions = transitions;
this.history = [initialState];
this.listeners = new Map();
}
// 添加状态监听器
on(state, listener) {
if (!this.listeners.has(state)) {
this.listeners.set(state, []);
}
this.listeners.get(state).push(listener);
}
// 触发状态转换
async trigger(event, data = null) {
const currentTransitions = this.transitions[this.currentState];
if (!currentTransitions || !currentTransitions[event]) {
throw new Error(`无效的状态转换: ${this.currentState} -> ${event}`);
}
const transition = currentTransitions[event];
const { target, action } = transition;
console.log(`🔄 状态转换: ${this.currentState} --[${event}]--> ${target}`);
try {
// 执行转换动作
if (action) {
await action(this.currentState, target, data);
}
const previousState = this.currentState;
this.currentState = target;
this.history.push(target);
// 触发状态监听器
const listeners = this.listeners.get(target) || [];
for (const listener of listeners) {
try {
await listener(previousState, target, data);
} catch (error) {
console.error('状态监听器执行失败:', error);
}
}
console.log(`✅ 状态转换完成: ${target}`);
return target;
} catch (error) {
console.error(`❌ 状态转换失败: ${this.currentState} -> ${target}`, error);
throw error;
}
}
// 获取当前状态
getState() {
return this.currentState;
}
// 获取状态历史
getHistory() {
return [...this.history];
}
// 检查是否可以执行某个事件
canTrigger(event) {
const currentTransitions = this.transitions[this.currentState];
return currentTransitions && currentTransitions[event];
}
}
// 异步工作流引擎
class AsyncWorkflowEngine {
constructor() {
this.workflows = new Map();
this.executions = new Map();
}
// 定义工作流
defineWorkflow(name, definition) {
this.workflows.set(name, definition);
console.log(`📋 工作流已定义: ${name}`);
}
// 执行工作流
async executeWorkflow(workflowName, input = {}) {
const definition = this.workflows.get(workflowName);
if (!definition) {
throw new Error(`工作流不存在: ${workflowName}`);
}
const executionId = `${workflowName}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
const execution = {
id: executionId,
workflowName: workflowName,
status: 'running',
startTime: Date.now(),
input: input,
context: { ...input },
steps: [],
currentStep: 0
};
this.executions.set(executionId, execution);
console.log(`🚀 开始执行工作流: ${workflowName} (${executionId})`);
try {
const result = await this.executeSteps(definition.steps, execution);
execution.status = 'completed';
execution.endTime = Date.now();
execution.result = result;
console.log(`✅ 工作流执行完成: ${workflowName} (${Date.now() - execution.startTime}ms)`);
return result;
} catch (error) {
execution.status = 'failed';
execution.endTime = Date.now();
execution.error = error.message;
console.error(`❌ 工作流执行失败: ${workflowName}`, error);
throw error;
}
}
// 执行工作流步骤
async executeSteps(steps, execution) {
for (let i = 0; i < steps.length; i++) {
const step = steps[i];
execution.currentStep = i;
console.log(`📍 执行步骤 ${i + 1}/${steps.length}: ${step.name}`);
const stepExecution = {
name: step.name,
type: step.type,
startTime: Date.now(),
input: execution.context
};
try {
let result;
switch (step.type) {
case 'task':
result = await this.executeTask(step, execution.context);
break;
case 'parallel':
result = await this.executeParallel(step, execution.context);
break;
case 'condition':
result = await this.executeCondition(step, execution.context);
break;
case 'loop':
result = await this.executeLoop(step, execution.context);
break;
default:
throw new Error(`未知步骤类型: ${step.type}`);
}
stepExecution.status = 'completed';
stepExecution.endTime = Date.now();
stepExecution.result = result;
// 更新上下文
if (step.output && result !== undefined) {
execution.context[step.output] = result;
}
} catch (error) {
stepExecution.status = 'failed';
stepExecution.endTime = Date.now();
stepExecution.error = error.message;
execution.steps.push(stepExecution);
throw error;
}
execution.steps.push(stepExecution);
}
return execution.context;
}
// 执行任务步骤
async executeTask(step, context) {
if (typeof step.action === 'function') {
return await step.action(context);
} else {
throw new Error('任务步骤必须包含action函数');
}
}
// 执行并行步骤
async executeParallel(step, context) {
const promises = step.tasks.map(task => this.executeTask(task, context));
return await Promise.all(promises);
}
// 执行条件步骤
async executeCondition(step, context) {
const condition = typeof step.condition === 'function'
? await step.condition(context)
: step.condition;
if (condition && step.then) {
return await this.executeSteps([step.then], { context });
} else if (!condition && step.else) {
return await this.executeSteps([step.else], { context });
}
return null;
}
// 执行循环步骤
async executeLoop(step, context) {
const results = [];
let iteration = 0;
while (true) {
const shouldContinue = typeof step.condition === 'function'
? await step.condition(context, iteration)
: iteration < (step.iterations || 1);
if (!shouldContinue) {
break;
}
const iterationContext = { ...context, iteration };
const result = await this.executeSteps(step.steps, { context: iterationContext });
results.push(result);
iteration++;
if (step.maxIterations && iteration >= step.maxIterations) {
break;
}
}
return results;
}
// 获取执行状态
getExecution(executionId) {
return this.executions.get(executionId);
}
// 获取所有执行
getAllExecutions() {
return Array.from(this.executions.values());
}
}
// 使用示例
async function demonstrateAsyncFlowControl() {
console.log('🔄 异步流程控制演示...\n');
// 1. 任务依赖执行
console.log('1. 任务依赖执行:');
const controller = new AsyncFlowController();
controller
.addTask('fetchUser', async () => {
await new Promise(resolve => setTimeout(resolve, 100));
return { id: 1, name: 'Alice' };
})
.addTask('fetchProfile', async (deps) => {
await new Promise(resolve => setTimeout(resolve, 150));
return { userId: deps.fetchUser.id, bio: 'Software Developer' };
}, ['fetchUser'])
.addTask('fetchPosts', async (deps) => {
await new Promise(resolve => setTimeout(resolve, 200));
return [
{ id: 1, title: 'Post 1', authorId: deps.fetchUser.id },
{ id: 2, title: 'Post 2', authorId: deps.fetchUser.id }
];
}, ['fetchUser'])
.addTask('buildUserData', async (deps) => {
return {
user: deps.fetchUser,
profile: deps.fetchProfile,
posts: deps.fetchPosts
};
}, ['fetchUser', 'fetchProfile', 'fetchPosts']);
const results = await controller.executeAll();
console.log(' 执行结果:', JSON.stringify(results.buildUserData, null, 2));
// 2. 异步状态机
console.log('\n2. 异步状态机:');
const orderStateMachine = new AsyncStateMachine('pending', {
pending: {
pay: {
target: 'paid',
action: async (from, to, data) => {
console.log(` 处理支付: ${JSON.stringify(data)}`);
await new Promise(resolve => setTimeout(resolve, 100));
}
},
cancel: { target: 'cancelled' }
},
paid: {
ship: {
target: 'shipped',
action: async () => {
console.log(' 准备发货...');
await new Promise(resolve => setTimeout(resolve, 100));
}
},
refund: { target: 'refunded' }
},
shipped: {
deliver: { target: 'delivered' }
},
cancelled: {},
refunded: {},
delivered: {}
});
await orderStateMachine.trigger('pay', { amount: 100, method: 'credit_card' });
await orderStateMachine.trigger('ship');
await orderStateMachine.trigger('deliver');
console.log(' 最终状态:', orderStateMachine.getState());
console.log(' 状态历史:', orderStateMachine.getHistory());
// 3. 工作流引擎
console.log('\n3. 工作流引擎:');
const workflowEngine = new AsyncWorkflowEngine();
workflowEngine.defineWorkflow('userRegistration', {
steps: [
{
name: 'validateInput',
type: 'task',
action: async (context) => {
console.log(' 验证输入数据...');
if (!context.email || !context.password) {
throw new Error('邮箱和密码是必需的');
}
return { valid: true };
},
output: 'validation'
},
{
name: 'createUser',
type: 'task',
action: async (context) => {
console.log(' 创建用户账户...');
await new Promise(resolve => setTimeout(resolve, 100));
return {
id: Math.floor(Math.random() * 1000),
email: context.email,
createdAt: new Date().toISOString()
};
},
output: 'user'
},
{
name: 'sendWelcomeEmail',
type: 'task',
action: async (context) => {
console.log(' 发送欢迎邮件...');
await new Promise(resolve => setTimeout(resolve, 150));
return { sent: true, to: context.user.email };
},
output: 'emailResult'
}
]
});
const workflowResult = await workflowEngine.executeWorkflow('userRegistration', {
email: 'user@example.com',
password: 'securePassword'
});
console.log(' 工作流结果:', JSON.stringify(workflowResult.user, null, 2));
}
module.exports = {
AsyncFlowController,
AsyncStateMachine,
AsyncWorkflowEngine,
demonstrateAsyncFlowControl
};
异步模式是Node.js编程的核心技能,掌握各种异步模式和流程控制技术对构建复杂的异步应用至关重要!