API网关
📖 概述
API网关是微服务架构中的关键组件,作为所有客户端请求的单一入口点。它负责请求路由、组合、协议转换、安全认证、限流监控等功能,为后端服务提供统一的访问接口。
🎯 学习目标
- 理解API网关的核心概念和架构模式
- 掌握请求路由和负载均衡机制
- 学习安全认证和访问控制
- 实现高性能的API网关中间件
🚪 API网关核心架构
1. 网关基础框架
javascript
// API网关核心框架
class APIGateway {
constructor(config = {}) {
this.config = config
this.routes = new Map()
this.middlewares = []
this.serviceRegistry = new ServiceRegistry()
this.loadBalancer = new LoadBalancer()
this.authenticationManager = new AuthenticationManager()
this.rateLimiter = new RateLimiter()
this.circuitBreaker = new CircuitBreaker()
this.metricsCollector = new MetricsCollector()
this.server = null
this.status = 'stopped'
}
// 请求路由管理
class RouteManager {
constructor() {
this.routes = new Map()
this.pathMatcher = new PathMatcher()
}
addRoute(routeConfig) {
const route = {
id: routeConfig.id || this.generateRouteId(),
path: routeConfig.path,
method: routeConfig.method || 'GET',
service: routeConfig.service,
backend: routeConfig.backend,
middleware: routeConfig.middleware || [],
timeout: routeConfig.timeout || 30000,
retries: routeConfig.retries || 3,
cache: routeConfig.cache || false,
rateLimit: routeConfig.rateLimit,
authentication: routeConfig.authentication || false,
authorization: routeConfig.authorization,
transformation: routeConfig.transformation,
validation: routeConfig.validation,
circuitBreaker: routeConfig.circuitBreaker || false,
enabled: routeConfig.enabled !== false,
created: new Date()
}
this.routes.set(route.id, route)
console.log(`路由添加: ${route.method} ${route.path} -> ${route.service}`)
return route
}
findRoute(method, path) {
for (const route of this.routes.values()) {
if (!route.enabled) continue
if (route.method !== method && route.method !== '*') continue
if (this.pathMatcher.match(route.path, path)) {
return {
route,
params: this.pathMatcher.extractParams(route.path, path)
}
}
}
return null
}
updateRoute(routeId, updates) {
const route = this.routes.get(routeId)
if (!route) {
throw new Error(`路由不存在: ${routeId}`)
}
Object.assign(route, updates, { updated: new Date() })
console.log(`路由更新: ${routeId}`)
return route
}
removeRoute(routeId) {
const deleted = this.routes.delete(routeId)
if (deleted) {
console.log(`路由删除: ${routeId}`)
}
return deleted
}
generateRouteId() {
return `route_${Date.now()}_${Math.random().toString(36).substr(2, 6)}`
}
getRoutes() {
return Array.from(this.routes.values())
}
}
// 路径匹配器
class PathMatcher {
match(pattern, path) {
const regex = this.patternToRegex(pattern)
return regex.test(path)
}
extractParams(pattern, path) {
const params = {}
const regex = this.patternToRegex(pattern)
const match = path.match(regex)
if (match) {
const paramNames = this.extractParamNames(pattern)
paramNames.forEach((name, index) => {
params[name] = match[index + 1]
})
}
return params
}
patternToRegex(pattern) {
// 转换路径模式为正则表达式
// /users/{id} -> /users/([^/]+)
// /users/{id}/posts/{postId} -> /users/([^/]+)/posts/([^/]+)
const regexPattern = pattern
.replace(/\{([^}]+)\}/g, '([^/]+)')
.replace(/\*/g, '.*')
return new RegExp(`^${regexPattern}$`)
}
extractParamNames(pattern) {
const matches = pattern.match(/\{([^}]+)\}/g)
if (!matches) return []
return matches.map(match => match.slice(1, -1))
}
}
// 中间件系统
use(middleware) {
this.middlewares.push(middleware)
console.log(`中间件注册: ${middleware.name || 'anonymous'}`)
}
async executeMiddleware(context) {
for (const middleware of this.middlewares) {
try {
const result = await middleware(context)
// 如果中间件返回响应,停止执行
if (result && result.response) {
return result.response
}
// 如果中间件指示停止,返回错误
if (result && result.stop) {
throw new Error(result.error || '中间件停止执行')
}
} catch (error) {
console.error(`中间件执行失败: ${middleware.name}`, error)
throw error
}
}
}
// 请求处理
async handleRequest(request) {
const requestId = this.generateRequestId()
const startTime = Date.now()
const context = {
requestId,
request,
startTime,
gateway: this,
route: null,
params: {},
user: null,
metrics: {},
response: null
}
try {
console.log(`处理请求: ${requestId} ${request.method} ${request.path}`)
// 路由匹配
const routeMatch = this.routeManager.findRoute(request.method, request.path)
if (!routeMatch) {
throw new Error('路由不存在', 404)
}
context.route = routeMatch.route
context.params = routeMatch.params
// 执行全局中间件
await this.executeMiddleware(context)
// 执行路由特定中间件
if (context.route.middleware) {
for (const middleware of context.route.middleware) {
await middleware(context)
}
}
// 认证检查
if (context.route.authentication) {
context.user = await this.authenticationManager.authenticate(context)
}
// 授权检查
if (context.route.authorization) {
await this.authenticationManager.authorize(context)
}
// 限流检查
if (context.route.rateLimit) {
await this.rateLimiter.checkRateLimit(context)
}
// 请求验证
if (context.route.validation) {
await this.validateRequest(context)
}
// 请求转换
if (context.route.transformation) {
await this.transformRequest(context)
}
// 代理到后端服务
const response = await this.proxyRequest(context)
// 响应转换
if (context.route.transformation && context.route.transformation.response) {
await this.transformResponse(context, response)
}
// 记录指标
this.metricsCollector.recordRequest(context, response, Date.now() - startTime)
console.log(`请求完成: ${requestId} (${Date.now() - startTime}ms)`)
return response
} catch (error) {
const errorResponse = await this.handleError(context, error)
// 记录错误指标
this.metricsCollector.recordError(context, error, Date.now() - startTime)
console.error(`请求失败: ${requestId}`, error)
return errorResponse
}
}
// 代理请求到后端
async proxyRequest(context) {
const route = context.route
// 服务发现
const serviceInstance = await this.serviceRegistry.discover(route.service)
if (!serviceInstance) {
throw new Error(`服务不可用: ${route.service}`)
}
// 构建后端URL
const backendUrl = this.buildBackendUrl(serviceInstance, route, context)
// 熔断器检查
if (route.circuitBreaker) {
await this.circuitBreaker.checkCircuitBreaker(route.service)
}
try {
const axios = require('axios')
const config = {
method: context.request.method,
url: backendUrl,
headers: this.buildBackendHeaders(context),
data: context.request.body,
timeout: route.timeout,
validateStatus: () => true // 不抛出状态码错误
}
const response = await axios(config)
// 熔断器记录成功
if (route.circuitBreaker) {
this.circuitBreaker.recordSuccess(route.service)
}
return {
statusCode: response.status,
headers: response.headers,
body: response.data
}
} catch (error) {
// 熔断器记录失败
if (route.circuitBreaker) {
this.circuitBreaker.recordFailure(route.service)
}
// 重试逻辑
if (route.retries > 0 && this.shouldRetry(error)) {
console.log(`重试请求: ${context.requestId} (剩余: ${route.retries})`)
route.retries--
await this.delay(1000) // 延迟1秒重试
return await this.proxyRequest(context)
}
throw error
}
}
buildBackendUrl(serviceInstance, route, context) {
let path = route.backend?.path || context.request.path
// 替换路径参数
Object.entries(context.params).forEach(([key, value]) => {
path = path.replace(`{${key}}`, value)
})
return `${serviceInstance.endpoint}${path}${context.request.queryString || ''}`
}
buildBackendHeaders(context) {
const headers = { ...context.request.headers }
// 添加网关标识
headers['X-Gateway'] = 'API-Gateway'
headers['X-Request-ID'] = context.requestId
// 添加用户信息
if (context.user) {
headers['X-User-ID'] = context.user.id
headers['X-User-Roles'] = context.user.roles.join(',')
}
// 移除不需要的头
delete headers['host']
delete headers['content-length']
return headers
}
async validateRequest(context) {
const validation = context.route.validation
if (validation.headers) {
this.validateHeaders(context.request.headers, validation.headers)
}
if (validation.query) {
this.validateQuery(context.request.query, validation.query)
}
if (validation.body) {
this.validateBody(context.request.body, validation.body)
}
}
validateHeaders(headers, schema) {
// 简化的头部验证
if (schema.required) {
schema.required.forEach(header => {
if (!headers[header.toLowerCase()]) {
throw new Error(`缺少必需头部: ${header}`)
}
})
}
}
validateBody(body, schema) {
// 可以集成JSON Schema验证
if (schema.type === 'object' && typeof body !== 'object') {
throw new Error('请求体必须是对象')
}
if (schema.required) {
schema.required.forEach(field => {
if (body[field] === undefined) {
throw new Error(`缺少必需字段: ${field}`)
}
})
}
}
async transformRequest(context) {
const transformation = context.route.transformation.request
if (!transformation) return
if (transformation.headers) {
this.applyHeaderTransformation(context.request.headers, transformation.headers)
}
if (transformation.body) {
context.request.body = this.applyBodyTransformation(context.request.body, transformation.body)
}
}
async transformResponse(context, response) {
const transformation = context.route.transformation.response
if (!transformation) return
if (transformation.headers) {
this.applyHeaderTransformation(response.headers, transformation.headers)
}
if (transformation.body) {
response.body = this.applyBodyTransformation(response.body, transformation.body)
}
}
applyHeaderTransformation(headers, transformation) {
if (transformation.add) {
Object.assign(headers, transformation.add)
}
if (transformation.remove) {
transformation.remove.forEach(header => {
delete headers[header]
})
}
if (transformation.rename) {
Object.entries(transformation.rename).forEach(([oldName, newName]) => {
if (headers[oldName]) {
headers[newName] = headers[oldName]
delete headers[oldName]
}
})
}
}
applyBodyTransformation(body, transformation) {
if (transformation.type === 'jq') {
// 可以集成jq库进行复杂转换
return body
}
if (transformation.mapping) {
const transformed = {}
Object.entries(transformation.mapping).forEach(([targetField, sourceField]) => {
const value = this.getNestedValue(body, sourceField)
this.setNestedValue(transformed, targetField, value)
})
return transformed
}
return body
}
async handleError(context, error) {
const errorResponse = {
statusCode: error.statusCode || 500,
headers: {
'Content-Type': 'application/json',
'X-Request-ID': context.requestId
},
body: {
error: {
code: error.code || 'INTERNAL_ERROR',
message: error.message || '内部服务器错误',
requestId: context.requestId,
timestamp: new Date().toISOString()
}
}
}
// 根据错误类型设置状态码
if (error.message === '路由不存在') {
errorResponse.statusCode = 404
errorResponse.body.error.code = 'ROUTE_NOT_FOUND'
} else if (error.message.includes('认证失败')) {
errorResponse.statusCode = 401
errorResponse.body.error.code = 'AUTHENTICATION_FAILED'
} else if (error.message.includes('权限不足')) {
errorResponse.statusCode = 403
errorResponse.body.error.code = 'AUTHORIZATION_FAILED'
} else if (error.message.includes('限流')) {
errorResponse.statusCode = 429
errorResponse.body.error.code = 'RATE_LIMIT_EXCEEDED'
}
return errorResponse
}
shouldRetry(error) {
// 网络错误或5xx错误可以重试
return error.code === 'ECONNRESET' ||
error.code === 'ECONNREFUSED' ||
(error.response && error.response.status >= 500)
}
delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}
generateRequestId() {
return `req_${Date.now()}_${Math.random().toString(36).substr(2, 8)}`
}
getNestedValue(obj, path) {
return path.split('.').reduce((current, key) =>
current && current[key] !== undefined ? current[key] : undefined, obj)
}
setNestedValue(obj, path, value) {
const keys = path.split('.')
const lastKey = keys.pop()
const target = keys.reduce((current, key) => {
if (!current[key]) current[key] = {}
return current[key]
}, obj)
target[lastKey] = value
}
// 启动网关
async start() {
try {
console.log('启动API网关...')
// 初始化组件
this.routeManager = new this.RouteManager()
await this.serviceRegistry.start()
await this.authenticationManager.start()
await this.rateLimiter.start()
await this.circuitBreaker.start()
await this.metricsCollector.start()
// 启动HTTP服务器
await this.startHttpServer()
this.status = 'running'
console.log('API网关启动完成')
} catch (error) {
console.error('API网关启动失败:', error)
this.status = 'error'
throw error
}
}
async startHttpServer() {
const express = require('express')
const app = express()
// 解析请求体
app.use(express.json({ limit: '10mb' }))
app.use(express.urlencoded({ extended: true, limit: '10mb' }))
// 处理所有请求
app.all('*', async (req, res) => {
try {
const request = {
method: req.method,
path: req.path,
headers: req.headers,
query: req.query,
body: req.body,
queryString: req.url.includes('?') ? req.url.substring(req.url.indexOf('?')) : ''
}
const response = await this.handleRequest(request)
// 设置响应头
Object.entries(response.headers || {}).forEach(([key, value]) => {
res.set(key, value)
})
res.status(response.statusCode || 200)
if (response.body !== undefined) {
res.json(response.body)
} else {
res.end()
}
} catch (error) {
console.error('请求处理异常:', error)
res.status(500).json({
error: {
code: 'GATEWAY_ERROR',
message: '网关内部错误'
}
})
}
})
const port = this.config.port || 8080
return new Promise((resolve, reject) => {
this.server = app.listen(port, (error) => {
if (error) {
reject(error)
} else {
console.log(`API网关服务启动在端口 ${port}`)
resolve()
}
})
})
}
async stop() {
try {
console.log('停止API网关...')
if (this.server) {
this.server.close()
}
await this.serviceRegistry.stop()
await this.authenticationManager.stop()
await this.rateLimiter.stop()
await this.circuitBreaker.stop()
await this.metricsCollector.stop()
this.status = 'stopped'
console.log('API网关停止完成')
} catch (error) {
console.error('API网关停止失败:', error)
throw error
}
}
getStatus() {
return {
status: this.status,
routes: this.routeManager ? this.routeManager.getRoutes().length : 0,
metrics: this.metricsCollector ? this.metricsCollector.getMetrics() : {},
uptime: this.startTime ? Date.now() - this.startTime : 0
}
}
}
2. 支持组件实现
javascript
// 服务注册表
class ServiceRegistry {
constructor() {
this.services = new Map()
this.healthChecks = new Map()
}
async start() {
console.log('服务注册表启动')
}
async stop() {
console.log('服务注册表停止')
}
register(serviceName, instances) {
this.services.set(serviceName, instances)
console.log(`服务注册: ${serviceName} (${instances.length} 个实例)`)
}
async discover(serviceName) {
const instances = this.services.get(serviceName)
if (!instances || instances.length === 0) {
return null
}
// 简单的轮询负载均衡
const healthyInstances = instances.filter(instance => instance.healthy !== false)
if (healthyInstances.length === 0) {
return null
}
return healthyInstances[Math.floor(Math.random() * healthyInstances.length)]
}
}
// 认证管理器
class AuthenticationManager {
constructor() {
this.providers = new Map()
this.cache = new Map()
}
async start() {
console.log('认证管理器启动')
}
async stop() {
console.log('认证管理器停止')
}
async authenticate(context) {
const authHeader = context.request.headers.authorization
if (!authHeader) {
throw new Error('认证失败: 缺少Authorization头')
}
if (authHeader.startsWith('Bearer ')) {
const token = authHeader.substring(7)
return await this.validateJWTToken(token)
}
throw new Error('认证失败: 不支持的认证方式')
}
async validateJWTToken(token) {
// 简化的JWT验证
try {
const jwt = require('jsonwebtoken')
const decoded = jwt.verify(token, process.env.JWT_SECRET || 'secret')
return {
id: decoded.sub,
username: decoded.username,
roles: decoded.roles || [],
permissions: decoded.permissions || []
}
} catch (error) {
throw new Error('认证失败: 无效的JWT令牌')
}
}
async authorize(context) {
const authorization = context.route.authorization
const user = context.user
if (!user) {
throw new Error('权限不足: 未认证用户')
}
if (authorization.roles && authorization.roles.length > 0) {
const hasRole = authorization.roles.some(role => user.roles.includes(role))
if (!hasRole) {
throw new Error('权限不足: 缺少必需角色')
}
}
if (authorization.permissions && authorization.permissions.length > 0) {
const hasPermission = authorization.permissions.some(permission =>
user.permissions.includes(permission))
if (!hasPermission) {
throw new Error('权限不足: 缺少必需权限')
}
}
}
}
// 限流器
class RateLimiter {
constructor() {
this.limiters = new Map()
}
async start() {
console.log('限流器启动')
}
async stop() {
console.log('限流器停止')
}
async checkRateLimit(context) {
const rateLimit = context.route.rateLimit
const key = this.generateLimitKey(context, rateLimit.key)
if (!this.limiters.has(key)) {
this.limiters.set(key, {
count: 0,
resetTime: Date.now() + rateLimit.window * 1000
})
}
const limiter = this.limiters.get(key)
if (Date.now() > limiter.resetTime) {
limiter.count = 0
limiter.resetTime = Date.now() + rateLimit.window * 1000
}
if (limiter.count >= rateLimit.limit) {
throw new Error(`限流: 超出速率限制 ${rateLimit.limit}/${rateLimit.window}s`)
}
limiter.count++
}
generateLimitKey(context, keyTemplate) {
if (keyTemplate === 'ip') {
return context.request.headers['x-forwarded-for'] ||
context.request.headers['x-real-ip'] ||
'unknown'
}
if (keyTemplate === 'user' && context.user) {
return `user:${context.user.id}`
}
return 'global'
}
}
// 熔断器
class CircuitBreaker {
constructor() {
this.breakers = new Map()
}
async start() {
console.log('熔断器启动')
}
async stop() {
console.log('熔断器停止')
}
async checkCircuitBreaker(serviceName) {
const breaker = this.getBreaker(serviceName)
if (breaker.state === 'open') {
if (Date.now() - breaker.lastFailureTime > breaker.timeout) {
breaker.state = 'half-open'
console.log(`熔断器半开: ${serviceName}`)
} else {
throw new Error(`熔断器开启: ${serviceName}`)
}
}
}
recordSuccess(serviceName) {
const breaker = this.getBreaker(serviceName)
breaker.failures = 0
if (breaker.state === 'half-open') {
breaker.state = 'closed'
console.log(`熔断器关闭: ${serviceName}`)
}
}
recordFailure(serviceName) {
const breaker = this.getBreaker(serviceName)
breaker.failures++
breaker.lastFailureTime = Date.now()
if (breaker.failures >= breaker.threshold) {
breaker.state = 'open'
console.log(`熔断器开启: ${serviceName}`)
}
}
getBreaker(serviceName) {
if (!this.breakers.has(serviceName)) {
this.breakers.set(serviceName, {
state: 'closed',
failures: 0,
threshold: 5,
timeout: 60000,
lastFailureTime: 0
})
}
return this.breakers.get(serviceName)
}
}
// 指标收集器
class MetricsCollector {
constructor() {
this.metrics = {
requests: 0,
errors: 0,
totalLatency: 0,
averageLatency: 0,
statusCodes: {},
routes: {}
}
}
async start() {
console.log('指标收集器启动')
}
async stop() {
console.log('指标收集器停止')
}
recordRequest(context, response, latency) {
this.metrics.requests++
this.metrics.totalLatency += latency
this.metrics.averageLatency = this.metrics.totalLatency / this.metrics.requests
const statusCode = response.statusCode
this.metrics.statusCodes[statusCode] = (this.metrics.statusCodes[statusCode] || 0) + 1
const routePath = context.route.path
if (!this.metrics.routes[routePath]) {
this.metrics.routes[routePath] = { requests: 0, errors: 0, latency: 0 }
}
this.metrics.routes[routePath].requests++
this.metrics.routes[routePath].latency += latency
}
recordError(context, error, latency) {
this.metrics.errors++
if (context.route) {
const routePath = context.route.path
if (!this.metrics.routes[routePath]) {
this.metrics.routes[routePath] = { requests: 0, errors: 0, latency: 0 }
}
this.metrics.routes[routePath].errors++
}
}
getMetrics() {
return {
...this.metrics,
errorRate: this.metrics.requests > 0 ? this.metrics.errors / this.metrics.requests : 0
}
}
}
// 使用示例
async function demonstrateAPIGateway() {
console.log('=== API网关演示 ===')
const gateway = new APIGateway({ port: 8080 })
// 注册服务
gateway.serviceRegistry.register('user-service', [
{ endpoint: 'http://user-service-1:8080', healthy: true },
{ endpoint: 'http://user-service-2:8080', healthy: true }
])
gateway.serviceRegistry.register('order-service', [
{ endpoint: 'http://order-service:8081', healthy: true }
])
// 添加路由
gateway.routeManager.addRoute({
path: '/api/users/{id}',
method: 'GET',
service: 'user-service',
backend: { path: '/users/{id}' },
authentication: true,
rateLimit: { limit: 100, window: 60, key: 'user' },
timeout: 5000
})
gateway.routeManager.addRoute({
path: '/api/orders',
method: 'POST',
service: 'order-service',
backend: { path: '/orders' },
authentication: true,
authorization: { roles: ['user', 'admin'] },
validation: {
body: {
type: 'object',
required: ['customerId', 'items']
}
}
})
// 启动网关
await gateway.start()
console.log('API网关状态:', gateway.getStatus())
return gateway
}
module.exports = {
APIGateway,
ServiceRegistry,
AuthenticationManager,
RateLimiter,
CircuitBreaker,
MetricsCollector
}
📚 最佳实践总结
- 统一入口:所有客户端请求通过API网关统一处理
- 服务发现:动态发现和路由到后端服务实例
- 负载均衡:智能分发请求到健康的服务实例
- 安全认证:统一的身份认证和权限控制
- 限流保护:防止服务过载和恶意攻击
- 熔断降级:快速失败和服务降级机制
- 监控告警:全面的请求监控和性能指标
- 配置管理:灵活的路由和策略配置
通过掌握API网关技术,您将能够构建高性能、高可用的微服务入口系统。