集成基础
📖 概述
企业应用集成(EAI)是连接不同系统、应用程序和数据源的过程。通过有效的集成策略,企业可以实现数据共享、流程自动化和系统互操作,从而提高业务效率和响应能力。
🎯 学习目标
- 理解企业集成的核心概念和模式
- 掌握同步和异步集成方式
- 学习消息传递和数据转换技术
- 实现可靠的集成中间件系统
🏗️ 集成架构基础
1. 集成模式框架
javascript
// 集成模式基础框架
class IntegrationFramework {
constructor() {
this.endpoints = new Map()
this.transformers = new Map()
this.routers = new Map()
this.filters = new Map()
this.channels = new Map()
this.messageStore = new MessageStore()
this.errorHandler = new ErrorHandler()
}
// 端点管理
class Endpoint {
constructor(config) {
this.id = config.id
this.name = config.name
this.type = config.type // 'inbound', 'outbound', 'bidirectional'
this.protocol = config.protocol // 'http', 'tcp', 'file', 'database'
this.address = config.address
this.configuration = config.configuration || {}
this.status = 'stopped'
this.statistics = {
messagesReceived: 0,
messagesSent: 0,
errors: 0,
lastActivity: null
}
}
async start() {
try {
console.log(`启动端点: ${this.name} (${this.protocol})`)
await this.initialize()
this.status = 'running'
console.log(`端点启动成功: ${this.name}`)
} catch (error) {
console.error(`端点启动失败: ${this.name}`, error)
this.status = 'error'
throw error
}
}
async stop() {
try {
console.log(`停止端点: ${this.name}`)
await this.cleanup()
this.status = 'stopped'
console.log(`端点停止成功: ${this.name}`)
} catch (error) {
console.error(`端点停止失败: ${this.name}`, error)
throw error
}
}
async send(message) {
if (this.status !== 'running') {
throw new Error(`端点未运行: ${this.name}`)
}
try {
console.log(`发送消息: ${this.name}`)
const result = await this.sendMessage(message)
this.statistics.messagesSent++
this.statistics.lastActivity = new Date()
return result
} catch (error) {
this.statistics.errors++
console.error(`消息发送失败: ${this.name}`, error)
throw error
}
}
async receive(callback) {
if (this.status !== 'running') {
throw new Error(`端点未运行: ${this.name}`)
}
try {
await this.receiveMessage(callback)
} catch (error) {
this.statistics.errors++
console.error(`消息接收失败: ${this.name}`, error)
throw error
}
}
// 抽象方法 - 子类实现
async initialize() {
throw new Error('initialize方法必须被实现')
}
async cleanup() {
throw new Error('cleanup方法必须被实现')
}
async sendMessage(message) {
throw new Error('sendMessage方法必须被实现')
}
async receiveMessage(callback) {
throw new Error('receiveMessage方法必须被实现')
}
getStatistics() {
return { ...this.statistics }
}
}
// HTTP端点实现
class HttpEndpoint extends Endpoint {
constructor(config) {
super(config)
this.server = null
this.client = null
}
async initialize() {
if (this.type === 'inbound' || this.type === 'bidirectional') {
await this.startServer()
}
if (this.type === 'outbound' || this.type === 'bidirectional') {
await this.initializeClient()
}
}
async startServer() {
const express = require('express')
this.server = express()
this.server.use(express.json())
this.server.use(express.urlencoded({ extended: true }))
// 设置路由
this.server.all('*', async (req, res) => {
try {
const message = this.createMessageFromRequest(req)
if (this.messageCallback) {
const response = await this.messageCallback(message)
res.json(response || { success: true })
} else {
res.json({ success: true, message: 'Message received' })
}
this.statistics.messagesReceived++
this.statistics.lastActivity = new Date()
} catch (error) {
this.statistics.errors++
res.status(500).json({ error: error.message })
}
})
const port = this.configuration.port || 3000
return new Promise((resolve, reject) => {
this.server.listen(port, (error) => {
if (error) {
reject(error)
} else {
console.log(`HTTP服务器启动在端口 ${port}`)
resolve()
}
})
})
}
async initializeClient() {
// HTTP客户端初始化
this.client = {
baseURL: this.address,
timeout: this.configuration.timeout || 30000,
headers: this.configuration.headers || {}
}
}
async sendMessage(message) {
if (!this.client) {
throw new Error('HTTP客户端未初始化')
}
const axios = require('axios')
const config = {
method: message.method || 'POST',
url: this.client.baseURL,
data: message.body,
headers: {
...this.client.headers,
...message.headers
},
timeout: this.client.timeout
}
const response = await axios(config)
return {
statusCode: response.status,
headers: response.headers,
body: response.data,
timestamp: new Date()
}
}
async receiveMessage(callback) {
this.messageCallback = callback
}
createMessageFromRequest(req) {
return {
id: this.generateMessageId(),
method: req.method,
path: req.path,
headers: req.headers,
query: req.query,
body: req.body,
timestamp: new Date(),
source: this.name
}
}
generateMessageId() {
return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 8)}`
}
async cleanup() {
if (this.server) {
this.server.close()
this.server = null
}
}
}
// 消息转换器
class MessageTransformer {
constructor(config) {
this.id = config.id
this.name = config.name
this.sourceFormat = config.sourceFormat
this.targetFormat = config.targetFormat
this.transformationRules = config.transformationRules || []
this.statistics = {
transformationsCount: 0,
errors: 0
}
}
async transform(message) {
try {
console.log(`转换消息: ${this.name} (${this.sourceFormat} -> ${this.targetFormat})`)
let transformedMessage = { ...message }
// 应用转换规则
for (const rule of this.transformationRules) {
transformedMessage = await this.applyRule(transformedMessage, rule)
}
// 格式转换
transformedMessage = await this.convertFormat(transformedMessage)
this.statistics.transformationsCount++
return transformedMessage
} catch (error) {
this.statistics.errors++
console.error(`消息转换失败: ${this.name}`, error)
throw error
}
}
async applyRule(message, rule) {
switch (rule.type) {
case 'map':
return this.mapFields(message, rule)
case 'filter':
return this.filterFields(message, rule)
case 'enrich':
return await this.enrichMessage(message, rule)
case 'validate':
this.validateMessage(message, rule)
return message
default:
console.warn(`未知转换规则类型: ${rule.type}`)
return message
}
}
mapFields(message, rule) {
const mapped = { ...message }
rule.mappings.forEach(mapping => {
const sourceValue = this.getNestedValue(message, mapping.source)
this.setNestedValue(mapped, mapping.target, sourceValue)
// 如果需要删除源字段
if (mapping.deleteSource) {
this.deleteNestedValue(mapped, mapping.source)
}
})
return mapped
}
filterFields(message, rule) {
const filtered = {}
rule.allowedFields.forEach(field => {
const value = this.getNestedValue(message, field)
if (value !== undefined) {
this.setNestedValue(filtered, field, value)
}
})
return filtered
}
async enrichMessage(message, rule) {
const enriched = { ...message }
for (const enrichment of rule.enrichments) {
switch (enrichment.type) {
case 'timestamp':
this.setNestedValue(enriched, enrichment.field, new Date())
break
case 'uuid':
this.setNestedValue(enriched, enrichment.field, this.generateUUID())
break
case 'lookup':
const lookupValue = await this.performLookup(enrichment.lookup, message)
this.setNestedValue(enriched, enrichment.field, lookupValue)
break
case 'constant':
this.setNestedValue(enriched, enrichment.field, enrichment.value)
break
}
}
return enriched
}
validateMessage(message, rule) {
rule.validations.forEach(validation => {
const value = this.getNestedValue(message, validation.field)
switch (validation.type) {
case 'required':
if (value === undefined || value === null) {
throw new Error(`必填字段缺失: ${validation.field}`)
}
break
case 'type':
if (typeof value !== validation.expectedType) {
throw new Error(`字段类型错误: ${validation.field}`)
}
break
case 'regex':
if (!validation.pattern.test(value)) {
throw new Error(`字段格式错误: ${validation.field}`)
}
break
case 'range':
if (value < validation.min || value > validation.max) {
throw new Error(`字段值超出范围: ${validation.field}`)
}
break
}
})
}
async convertFormat(message) {
if (this.sourceFormat === this.targetFormat) {
return message
}
switch (this.targetFormat) {
case 'json':
return this.toJSON(message)
case 'xml':
return this.toXML(message)
case 'csv':
return this.toCSV(message)
case 'edi':
return this.toEDI(message)
default:
throw new Error(`不支持的目标格式: ${this.targetFormat}`)
}
}
toJSON(message) {
return {
...message,
body: typeof message.body === 'string' ?
JSON.parse(message.body) :
message.body
}
}
toXML(message) {
// 简化的XML转换
const xmlBuilder = require('xmlbuilder')
const root = xmlBuilder.create('message')
this.objectToXML(root, message.body)
return {
...message,
body: root.end({ pretty: true }),
headers: {
...message.headers,
'Content-Type': 'application/xml'
}
}
}
objectToXML(parent, obj) {
for (const [key, value] of Object.entries(obj)) {
if (typeof value === 'object' && value !== null) {
const child = parent.ele(key)
this.objectToXML(child, value)
} else {
parent.ele(key, value)
}
}
}
// 工具方法
getNestedValue(obj, path) {
return path.split('.').reduce((current, key) =>
current && current[key] !== undefined ? current[key] : undefined, obj)
}
setNestedValue(obj, path, value) {
const keys = path.split('.')
const lastKey = keys.pop()
const target = keys.reduce((current, key) => {
if (!current[key]) current[key] = {}
return current[key]
}, obj)
target[lastKey] = value
}
deleteNestedValue(obj, path) {
const keys = path.split('.')
const lastKey = keys.pop()
const target = keys.reduce((current, key) => current && current[key], obj)
if (target) delete target[lastKey]
}
generateUUID() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
const r = Math.random() * 16 | 0
const v = c == 'x' ? r : (r & 0x3 | 0x8)
return v.toString(16)
})
}
async performLookup(lookup, message) {
// 简化的查找实现
console.log(`执行查找: ${lookup.table}`)
return `lookup_result_${Date.now()}`
}
getStatistics() {
return { ...this.statistics }
}
}
// 消息路由器
class MessageRouter {
constructor(config) {
this.id = config.id
this.name = config.name
this.routes = config.routes || []
this.defaultRoute = config.defaultRoute
this.statistics = {
messagesRouted: 0,
routeStatistics: {}
}
}
async route(message) {
try {
console.log(`路由消息: ${this.name}`)
// 查找匹配的路由
const matchedRoutes = []
for (const route of this.routes) {
if (await this.evaluateCondition(message, route.condition)) {
matchedRoutes.push(route)
}
}
// 如果没有匹配的路由,使用默认路由
if (matchedRoutes.length === 0 && this.defaultRoute) {
matchedRoutes.push(this.defaultRoute)
}
if (matchedRoutes.length === 0) {
throw new Error('没有匹配的路由规则')
}
// 路由消息
const routingResults = []
for (const route of matchedRoutes) {
try {
const result = await this.routeToDestination(message, route)
routingResults.push(result)
// 更新统计
this.updateRouteStatistics(route.id, true)
} catch (error) {
console.error(`路由失败: ${route.id}`, error)
this.updateRouteStatistics(route.id, false)
routingResults.push({
routeId: route.id,
success: false,
error: error.message
})
}
}
this.statistics.messagesRouted++
return routingResults
} catch (error) {
console.error(`消息路由失败: ${this.name}`, error)
throw error
}
}
async evaluateCondition(message, condition) {
if (!condition) {
return true // 无条件则匹配
}
switch (condition.type) {
case 'header':
return this.evaluateHeaderCondition(message, condition)
case 'body':
return this.evaluateBodyCondition(message, condition)
case 'javascript':
return this.evaluateJavaScriptCondition(message, condition)
case 'xpath':
return this.evaluateXPathCondition(message, condition)
default:
console.warn(`未知条件类型: ${condition.type}`)
return false
}
}
evaluateHeaderCondition(message, condition) {
const headerValue = message.headers[condition.header]
switch (condition.operator) {
case 'equals':
return headerValue === condition.value
case 'contains':
return headerValue && headerValue.includes(condition.value)
case 'regex':
return headerValue && new RegExp(condition.value).test(headerValue)
default:
return false
}
}
evaluateBodyCondition(message, condition) {
const bodyValue = this.getNestedValue(message.body, condition.field)
switch (condition.operator) {
case 'equals':
return bodyValue === condition.value
case 'greater':
return bodyValue > condition.value
case 'less':
return bodyValue < condition.value
case 'contains':
return bodyValue && bodyValue.includes && bodyValue.includes(condition.value)
case 'exists':
return bodyValue !== undefined
default:
return false
}
}
evaluateJavaScriptCondition(message, condition) {
try {
// 安全的JavaScript条件评估
const context = {
message,
headers: message.headers,
body: message.body
}
const func = new Function('context', `return ${condition.expression}`)
return func(context)
} catch (error) {
console.error('JavaScript条件评估失败:', error)
return false
}
}
async routeToDestination(message, route) {
console.log(`路由到目标: ${route.destination}`)
// 应用路由级别的转换
let routedMessage = message
if (route.transformer) {
routedMessage = await route.transformer.transform(message)
}
// 发送到目标端点
const result = await route.destination.send(routedMessage)
return {
routeId: route.id,
destinationId: route.destination.id,
success: true,
result: result
}
}
updateRouteStatistics(routeId, success) {
if (!this.statistics.routeStatistics[routeId]) {
this.statistics.routeStatistics[routeId] = {
attempts: 0,
successes: 0,
failures: 0
}
}
const stats = this.statistics.routeStatistics[routeId]
stats.attempts++
if (success) {
stats.successes++
} else {
stats.failures++
}
}
getNestedValue(obj, path) {
return path.split('.').reduce((current, key) =>
current && current[key] !== undefined ? current[key] : undefined, obj)
}
getStatistics() {
return { ...this.statistics }
}
}
}
// 消息存储
class MessageStore {
constructor() {
this.messages = new Map()
this.config = {
maxMessages: 10000,
retentionPeriod: 7 * 24 * 60 * 60 * 1000 // 7天
}
}
async store(message) {
const messageId = message.id || this.generateMessageId()
const storedMessage = {
id: messageId,
originalMessage: { ...message },
storedAt: new Date(),
status: 'stored',
attempts: 0,
lastAttempt: null,
errors: []
}
this.messages.set(messageId, storedMessage)
// 检查存储限制
await this.enforceStorageLimits()
console.log(`消息存储: ${messageId}`)
return messageId
}
async retrieve(messageId) {
const message = this.messages.get(messageId)
if (!message) {
return null
}
console.log(`消息检索: ${messageId}`)
return message
}
async updateStatus(messageId, status, error = null) {
const message = this.messages.get(messageId)
if (!message) {
throw new Error(`消息不存在: ${messageId}`)
}
message.status = status
message.lastAttempt = new Date()
message.attempts++
if (error) {
message.errors.push({
timestamp: new Date(),
error: error.message || error
})
}
console.log(`消息状态更新: ${messageId} -> ${status}`)
}
async search(criteria) {
const results = []
this.messages.forEach((message, id) => {
if (this.matchesCriteria(message, criteria)) {
results.push(message)
}
})
return results
}
matchesCriteria(message, criteria) {
if (criteria.status && message.status !== criteria.status) {
return false
}
if (criteria.fromDate && message.storedAt < criteria.fromDate) {
return false
}
if (criteria.toDate && message.storedAt > criteria.toDate) {
return false
}
if (criteria.source && message.originalMessage.source !== criteria.source) {
return false
}
return true
}
async enforceStorageLimits() {
const now = new Date()
// 删除过期消息
const expiredMessages = []
this.messages.forEach((message, id) => {
if (now - message.storedAt > this.config.retentionPeriod) {
expiredMessages.push(id)
}
})
expiredMessages.forEach(id => {
this.messages.delete(id)
console.log(`删除过期消息: ${id}`)
})
// 检查消息数量限制
if (this.messages.size > this.config.maxMessages) {
const sortedMessages = Array.from(this.messages.entries())
.sort(([, a], [, b]) => a.storedAt - b.storedAt)
const toDelete = sortedMessages.slice(0, this.messages.size - this.config.maxMessages)
toDelete.forEach(([id]) => {
this.messages.delete(id)
console.log(`删除旧消息: ${id}`)
})
}
}
generateMessageId() {
return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 8)}`
}
getStatistics() {
const statusCounts = {}
let totalSize = 0
this.messages.forEach(message => {
statusCounts[message.status] = (statusCounts[message.status] || 0) + 1
totalSize += JSON.stringify(message).length
})
return {
totalMessages: this.messages.size,
statusCounts,
totalSize,
retentionPeriod: this.config.retentionPeriod,
maxMessages: this.config.maxMessages
}
}
}
// 错误处理器
class ErrorHandler {
constructor() {
this.errorStrategies = new Map()
this.defaultStrategy = 'log'
this.errorLog = []
}
registerStrategy(name, strategy) {
this.errorStrategies.set(name, strategy)
console.log(`注册错误处理策略: ${name}`)
}
async handleError(error, context, strategyName = null) {
const strategy = strategyName || this.defaultStrategy
const errorStrategy = this.errorStrategies.get(strategy)
if (!errorStrategy) {
console.error(`错误处理策略不存在: ${strategy}`)
return this.logError(error, context)
}
try {
return await errorStrategy.handle(error, context)
} catch (strategyError) {
console.error('错误处理策略执行失败:', strategyError)
return this.logError(error, context)
}
}
logError(error, context) {
const errorRecord = {
id: this.generateErrorId(),
timestamp: new Date(),
error: {
message: error.message,
stack: error.stack,
name: error.name
},
context: context || {},
severity: this.determineSeverity(error)
}
this.errorLog.push(errorRecord)
console.error(`记录错误: ${errorRecord.id}`, errorRecord)
// 保持错误日志大小限制
if (this.errorLog.length > 1000) {
this.errorLog = this.errorLog.slice(-1000)
}
return errorRecord
}
determineSeverity(error) {
if (error.name === 'ValidationError') {
return 'warning'
} else if (error.name === 'ConnectionError') {
return 'error'
} else if (error.name === 'SecurityError') {
return 'critical'
} else {
return 'error'
}
}
getErrorStatistics() {
const severityCounts = {}
const errorTypes = {}
this.errorLog.forEach(errorRecord => {
const severity = errorRecord.severity
severityCounts[severity] = (severityCounts[severity] || 0) + 1
const errorType = errorRecord.error.name
errorTypes[errorType] = (errorTypes[errorType] || 0) + 1
})
return {
totalErrors: this.errorLog.length,
severityCounts,
errorTypes,
recentErrors: this.errorLog.slice(-10)
}
}
generateErrorId() {
return `err_${Date.now()}_${Math.random().toString(36).substr(2, 6)}`
}
}
2. 集成流程实现
javascript
// 集成流程引擎
class IntegrationFlow {
constructor(config) {
this.id = config.id
this.name = config.name
this.description = config.description
this.steps = config.steps || []
this.status = 'stopped'
this.statistics = {
executions: 0,
successes: 0,
failures: 0,
averageExecutionTime: 0
}
}
async execute(inputMessage, context = {}) {
const executionId = this.generateExecutionId()
const executionContext = {
...context,
executionId,
flowId: this.id,
startTime: new Date()
}
console.log(`执行集成流程: ${this.name} (${executionId})`)
try {
let currentMessage = inputMessage
const stepResults = []
for (const [index, step] of this.steps.entries()) {
console.log(`执行步骤 ${index + 1}: ${step.name}`)
const stepStartTime = Date.now()
try {
const stepResult = await this.executeStep(step, currentMessage, executionContext)
stepResults.push({
stepIndex: index,
stepName: step.name,
success: true,
executionTime: Date.now() - stepStartTime,
result: stepResult
})
// 更新当前消息为步骤输出
if (stepResult && stepResult.outputMessage) {
currentMessage = stepResult.outputMessage
}
} catch (stepError) {
console.error(`步骤执行失败: ${step.name}`, stepError)
stepResults.push({
stepIndex: index,
stepName: step.name,
success: false,
executionTime: Date.now() - stepStartTime,
error: stepError.message
})
// 检查错误处理策略
if (step.onError === 'continue') {
console.log('忽略错误,继续执行')
continue
} else if (step.onError === 'stop') {
console.log('停止流程执行')
break
} else {
throw stepError
}
}
}
const executionTime = Date.now() - executionContext.startTime.getTime()
// 更新统计信息
this.updateStatistics(true, executionTime)
const result = {
executionId,
success: true,
inputMessage,
outputMessage: currentMessage,
stepResults,
executionTime,
completedAt: new Date()
}
console.log(`流程执行完成: ${this.name} (${executionTime}ms)`)
return result
} catch (error) {
const executionTime = Date.now() - executionContext.startTime.getTime()
// 更新统计信息
this.updateStatistics(false, executionTime)
console.error(`流程执行失败: ${this.name}`, error)
throw {
executionId,
success: false,
error: error.message,
executionTime,
failedAt: new Date()
}
}
}
async executeStep(step, message, context) {
switch (step.type) {
case 'transform':
return await this.executeTransformStep(step, message, context)
case 'route':
return await this.executeRouteStep(step, message, context)
case 'filter':
return await this.executeFilterStep(step, message, context)
case 'enrich':
return await this.executeEnrichStep(step, message, context)
case 'validate':
return await this.executeValidateStep(step, message, context)
case 'custom':
return await this.executeCustomStep(step, message, context)
default:
throw new Error(`未知步骤类型: ${step.type}`)
}
}
async executeTransformStep(step, message, context) {
const transformer = step.transformer
if (!transformer) {
throw new Error('转换步骤缺少转换器')
}
const transformedMessage = await transformer.transform(message)
return {
outputMessage: transformedMessage,
transformations: transformer.transformationRules.length
}
}
async executeRouteStep(step, message, context) {
const router = step.router
if (!router) {
throw new Error('路由步骤缺少路由器')
}
const routingResults = await router.route(message)
return {
routingResults,
routedDestinations: routingResults.length
}
}
async executeFilterStep(step, message, context) {
const condition = step.condition
if (!condition) {
throw new Error('过滤步骤缺少条件')
}
const passed = await this.evaluateCondition(message, condition, context)
if (!passed) {
throw new Error('消息被过滤器阻止')
}
return {
outputMessage: message,
filterPassed: true
}
}
async executeEnrichStep(step, message, context) {
const enrichments = step.enrichments || []
let enrichedMessage = { ...message }
for (const enrichment of enrichments) {
enrichedMessage = await this.applyEnrichment(enrichedMessage, enrichment, context)
}
return {
outputMessage: enrichedMessage,
enrichmentsApplied: enrichments.length
}
}
async executeValidateStep(step, message, context) {
const validations = step.validations || []
const errors = []
for (const validation of validations) {
try {
await this.performValidation(message, validation, context)
} catch (validationError) {
errors.push({
validation: validation.name,
error: validationError.message
})
}
}
if (errors.length > 0) {
throw new Error(`验证失败: ${errors.map(e => e.error).join(', ')}`)
}
return {
outputMessage: message,
validationsPassed: validations.length
}
}
async executeCustomStep(step, message, context) {
const customHandler = step.handler
if (!customHandler) {
throw new Error('自定义步骤缺少处理器')
}
return await customHandler(message, context, step.config)
}
updateStatistics(success, executionTime) {
this.statistics.executions++
if (success) {
this.statistics.successes++
} else {
this.statistics.failures++
}
// 更新平均执行时间
const totalTime = this.statistics.averageExecutionTime * (this.statistics.executions - 1) + executionTime
this.statistics.averageExecutionTime = totalTime / this.statistics.executions
}
generateExecutionId() {
return `exec_${Date.now()}_${Math.random().toString(36).substr(2, 8)}`
}
start() {
this.status = 'running'
console.log(`集成流程启动: ${this.name}`)
}
stop() {
this.status = 'stopped'
console.log(`集成流程停止: ${this.name}`)
}
getStatistics() {
return {
...this.statistics,
successRate: this.statistics.executions > 0 ?
this.statistics.successes / this.statistics.executions : 0,
status: this.status
}
}
}
// 集成管理器
class IntegrationManager {
constructor() {
this.endpoints = new Map()
this.transformers = new Map()
this.routers = new Map()
this.flows = new Map()
this.framework = new IntegrationFramework()
this.status = 'stopped'
}
// 注册组件
registerEndpoint(endpoint) {
this.endpoints.set(endpoint.id, endpoint)
console.log(`注册端点: ${endpoint.name}`)
}
registerTransformer(transformer) {
this.transformers.set(transformer.id, transformer)
console.log(`注册转换器: ${transformer.name}`)
}
registerRouter(router) {
this.routers.set(router.id, router)
console.log(`注册路由器: ${router.name}`)
}
registerFlow(flow) {
this.flows.set(flow.id, flow)
console.log(`注册集成流程: ${flow.name}`)
}
// 启动集成管理器
async start() {
try {
console.log('启动集成管理器...')
// 启动所有端点
for (const endpoint of this.endpoints.values()) {
await endpoint.start()
}
// 启动所有流程
for (const flow of this.flows.values()) {
flow.start()
}
this.status = 'running'
console.log('集成管理器启动完成')
} catch (error) {
console.error('集成管理器启动失败:', error)
this.status = 'error'
throw error
}
}
// 停止集成管理器
async stop() {
try {
console.log('停止集成管理器...')
// 停止所有流程
for (const flow of this.flows.values()) {
flow.stop()
}
// 停止所有端点
for (const endpoint of this.endpoints.values()) {
await endpoint.stop()
}
this.status = 'stopped'
console.log('集成管理器停止完成')
} catch (error) {
console.error('集成管理器停止失败:', error)
throw error
}
}
// 执行流程
async executeFlow(flowId, inputMessage, context = {}) {
const flow = this.flows.get(flowId)
if (!flow) {
throw new Error(`流程不存在: ${flowId}`)
}
if (flow.status !== 'running') {
throw new Error(`流程未运行: ${flowId}`)
}
return await flow.execute(inputMessage, context)
}
// 获取系统状态
getSystemStatus() {
const endpointStatuses = {}
this.endpoints.forEach((endpoint, id) => {
endpointStatuses[id] = {
name: endpoint.name,
status: endpoint.status,
statistics: endpoint.getStatistics()
}
})
const flowStatuses = {}
this.flows.forEach((flow, id) => {
flowStatuses[id] = {
name: flow.name,
status: flow.status,
statistics: flow.getStatistics()
}
})
return {
systemStatus: this.status,
endpoints: endpointStatuses,
flows: flowStatuses,
timestamp: new Date()
}
}
// 健康检查
async healthCheck() {
const health = {
status: 'healthy',
timestamp: new Date(),
components: {}
}
try {
// 检查端点健康状态
for (const [id, endpoint] of this.endpoints) {
health.components[`endpoint_${id}`] = {
status: endpoint.status,
lastActivity: endpoint.statistics.lastActivity
}
if (endpoint.status === 'error') {
health.status = 'unhealthy'
}
}
// 检查流程健康状态
for (const [id, flow] of this.flows) {
health.components[`flow_${id}`] = {
status: flow.status,
successRate: flow.getStatistics().successRate
}
if (flow.getStatistics().successRate < 0.9) {
health.status = 'degraded'
}
}
} catch (error) {
health.status = 'unhealthy'
health.error = error.message
}
return health
}
}
📚 最佳实践总结
- 松耦合设计:使用消息传递实现系统间的松耦合
- 标准化接口:定义清晰的集成接口和数据格式
- 错误处理:实现完善的错误处理和重试机制
- 监控告警:全面监控集成流程的性能和健康状况
- 数据转换:处理不同系统间的数据格式差异
- 安全保护:确保集成过程中的数据安全和访问控制
- 性能优化:优化消息传递和处理的性能
- 版本管理:管理接口和消息格式的版本兼容性
通过掌握集成基础技术,您将能够构建稳定可靠的企业应用集成解决方案。