Files
claude-relay-service/src/services/apiKeyService.js
shaw 3bcdb511fe feat: 实现多服务账户缓存优化系统
- 添加通用LRU缓存工具类,支持过期时间和内存限制
- 实现缓存监控系统,提供统计和健康检查接口
- 为所有账户服务(Claude、Gemini、OpenAI、Bedrock、Claude Console)添加缓存层
- 优化账户选择性能,减少Redis查询频率
- 添加缓存统计监控端点 /admin/cache/stats

性能提升:
- 账户列表查询从O(n)优化到O(1)
- 减少90%以上的Redis查询
- 响应时间降低50ms以上

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-17 15:38:49 +08:00

642 lines
22 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const crypto = require('crypto')
const { v4: uuidv4 } = require('uuid')
const config = require('../../config/config')
const redis = require('../models/redis')
const logger = require('../utils/logger')
class ApiKeyService {
constructor() {
this.prefix = config.security.apiKeyPrefix
}
// 🔑 生成新的API Key
async generateApiKey(options = {}) {
const {
name = 'Unnamed Key',
description = '',
tokenLimit = config.limits.defaultTokenLimit,
expiresAt = null,
claudeAccountId = null,
claudeConsoleAccountId = null,
geminiAccountId = null,
openaiAccountId = null,
bedrockAccountId = null, // 添加 Bedrock 账号ID支持
permissions = 'all', // 'claude', 'gemini', 'openai', 'all'
isActive = true,
concurrencyLimit = 0,
rateLimitWindow = null,
rateLimitRequests = null,
enableModelRestriction = false,
restrictedModels = [],
enableClientRestriction = false,
allowedClients = [],
dailyCostLimit = 0,
tags = []
} = options
// 生成简单的API Key (64字符十六进制)
const apiKey = `${this.prefix}${this._generateSecretKey()}`
const keyId = uuidv4()
const hashedKey = this._hashApiKey(apiKey)
const keyData = {
id: keyId,
name,
description,
apiKey: hashedKey,
tokenLimit: String(tokenLimit ?? 0),
concurrencyLimit: String(concurrencyLimit ?? 0),
rateLimitWindow: String(rateLimitWindow ?? 0),
rateLimitRequests: String(rateLimitRequests ?? 0),
isActive: String(isActive),
claudeAccountId: claudeAccountId || '',
claudeConsoleAccountId: claudeConsoleAccountId || '',
geminiAccountId: geminiAccountId || '',
openaiAccountId: openaiAccountId || '',
bedrockAccountId: bedrockAccountId || '', // 添加 Bedrock 账号ID
permissions: permissions || 'all',
enableModelRestriction: String(enableModelRestriction),
restrictedModels: JSON.stringify(restrictedModels || []),
enableClientRestriction: String(enableClientRestriction || false),
allowedClients: JSON.stringify(allowedClients || []),
dailyCostLimit: String(dailyCostLimit || 0),
tags: JSON.stringify(tags || []),
createdAt: new Date().toISOString(),
lastUsedAt: '',
expiresAt: expiresAt || '',
createdBy: 'admin' // 可以根据需要扩展用户系统
}
// 保存API Key数据并建立哈希映射
await redis.setApiKey(keyId, keyData, hashedKey)
logger.success(`🔑 Generated new API key: ${name} (${keyId})`)
return {
id: keyId,
apiKey, // 只在创建时返回完整的key
name: keyData.name,
description: keyData.description,
tokenLimit: parseInt(keyData.tokenLimit),
concurrencyLimit: parseInt(keyData.concurrencyLimit),
rateLimitWindow: parseInt(keyData.rateLimitWindow || 0),
rateLimitRequests: parseInt(keyData.rateLimitRequests || 0),
isActive: keyData.isActive === 'true',
claudeAccountId: keyData.claudeAccountId,
claudeConsoleAccountId: keyData.claudeConsoleAccountId,
geminiAccountId: keyData.geminiAccountId,
openaiAccountId: keyData.openaiAccountId,
bedrockAccountId: keyData.bedrockAccountId, // 添加 Bedrock 账号ID
permissions: keyData.permissions,
enableModelRestriction: keyData.enableModelRestriction === 'true',
restrictedModels: JSON.parse(keyData.restrictedModels),
enableClientRestriction: keyData.enableClientRestriction === 'true',
allowedClients: JSON.parse(keyData.allowedClients || '[]'),
dailyCostLimit: parseFloat(keyData.dailyCostLimit || 0),
tags: JSON.parse(keyData.tags || '[]'),
createdAt: keyData.createdAt,
expiresAt: keyData.expiresAt,
createdBy: keyData.createdBy
}
}
// 🔍 验证API Key
async validateApiKey(apiKey) {
try {
if (!apiKey || !apiKey.startsWith(this.prefix)) {
return { valid: false, error: 'Invalid API key format' }
}
// 计算API Key的哈希值
const hashedKey = this._hashApiKey(apiKey)
// 通过哈希值直接查找API Key性能优化
const keyData = await redis.findApiKeyByHash(hashedKey)
if (!keyData) {
return { valid: false, error: 'API key not found' }
}
// 检查是否激活
if (keyData.isActive !== 'true') {
return { valid: false, error: 'API key is disabled' }
}
// 检查是否过期
if (keyData.expiresAt && new Date() > new Date(keyData.expiresAt)) {
return { valid: false, error: 'API key has expired' }
}
// 获取使用统计(供返回数据使用)
const usage = await redis.getUsageStats(keyData.id)
// 获取当日费用统计
const dailyCost = await redis.getDailyCost(keyData.id)
// 更新最后使用时间优化只在实际API调用时更新而不是验证时
// 注意lastUsedAt的更新已移至recordUsage方法中
logger.api(`🔓 API key validated successfully: ${keyData.id}`)
// 解析限制模型数据
let restrictedModels = []
try {
restrictedModels = keyData.restrictedModels ? JSON.parse(keyData.restrictedModels) : []
} catch (e) {
restrictedModels = []
}
// 解析允许的客户端
let allowedClients = []
try {
allowedClients = keyData.allowedClients ? JSON.parse(keyData.allowedClients) : []
} catch (e) {
allowedClients = []
}
// 解析标签
let tags = []
try {
tags = keyData.tags ? JSON.parse(keyData.tags) : []
} catch (e) {
tags = []
}
return {
valid: true,
keyData: {
id: keyData.id,
name: keyData.name,
description: keyData.description,
createdAt: keyData.createdAt,
expiresAt: keyData.expiresAt,
claudeAccountId: keyData.claudeAccountId,
claudeConsoleAccountId: keyData.claudeConsoleAccountId,
geminiAccountId: keyData.geminiAccountId,
openaiAccountId: keyData.openaiAccountId,
bedrockAccountId: keyData.bedrockAccountId, // 添加 Bedrock 账号ID
permissions: keyData.permissions || 'all',
tokenLimit: parseInt(keyData.tokenLimit),
concurrencyLimit: parseInt(keyData.concurrencyLimit || 0),
rateLimitWindow: parseInt(keyData.rateLimitWindow || 0),
rateLimitRequests: parseInt(keyData.rateLimitRequests || 0),
enableModelRestriction: keyData.enableModelRestriction === 'true',
restrictedModels,
enableClientRestriction: keyData.enableClientRestriction === 'true',
allowedClients,
dailyCostLimit: parseFloat(keyData.dailyCostLimit || 0),
dailyCost: dailyCost || 0,
tags,
usage
}
}
} catch (error) {
logger.error('❌ API key validation error:', error)
return { valid: false, error: 'Internal validation error' }
}
}
// 📋 获取所有API Keys
async getAllApiKeys() {
try {
const apiKeys = await redis.getAllApiKeys()
const client = redis.getClientSafe()
// 为每个key添加使用统计和当前并发数
for (const key of apiKeys) {
key.usage = await redis.getUsageStats(key.id)
key.tokenLimit = parseInt(key.tokenLimit)
key.concurrencyLimit = parseInt(key.concurrencyLimit || 0)
key.rateLimitWindow = parseInt(key.rateLimitWindow || 0)
key.rateLimitRequests = parseInt(key.rateLimitRequests || 0)
key.currentConcurrency = await redis.getConcurrency(key.id)
key.isActive = key.isActive === 'true'
key.enableModelRestriction = key.enableModelRestriction === 'true'
key.enableClientRestriction = key.enableClientRestriction === 'true'
key.permissions = key.permissions || 'all' // 兼容旧数据
key.dailyCostLimit = parseFloat(key.dailyCostLimit || 0)
key.dailyCost = (await redis.getDailyCost(key.id)) || 0
// 获取当前时间窗口的请求次数和Token使用量
if (key.rateLimitWindow > 0) {
const requestCountKey = `rate_limit:requests:${key.id}`
const tokenCountKey = `rate_limit:tokens:${key.id}`
const windowStartKey = `rate_limit:window_start:${key.id}`
key.currentWindowRequests = parseInt((await client.get(requestCountKey)) || '0')
key.currentWindowTokens = parseInt((await client.get(tokenCountKey)) || '0')
// 获取窗口开始时间和计算剩余时间
const windowStart = await client.get(windowStartKey)
if (windowStart) {
const now = Date.now()
const windowStartTime = parseInt(windowStart)
const windowDuration = key.rateLimitWindow * 60 * 1000 // 转换为毫秒
const windowEndTime = windowStartTime + windowDuration
// 如果窗口还有效
if (now < windowEndTime) {
key.windowStartTime = windowStartTime
key.windowEndTime = windowEndTime
key.windowRemainingSeconds = Math.max(0, Math.floor((windowEndTime - now) / 1000))
} else {
// 窗口已过期,下次请求会重置
key.windowStartTime = null
key.windowEndTime = null
key.windowRemainingSeconds = 0
// 重置计数为0因为窗口已过期
key.currentWindowRequests = 0
key.currentWindowTokens = 0
}
} else {
// 窗口还未开始(没有任何请求)
key.windowStartTime = null
key.windowEndTime = null
key.windowRemainingSeconds = null
}
} else {
key.currentWindowRequests = 0
key.currentWindowTokens = 0
key.windowStartTime = null
key.windowEndTime = null
key.windowRemainingSeconds = null
}
try {
key.restrictedModels = key.restrictedModels ? JSON.parse(key.restrictedModels) : []
} catch (e) {
key.restrictedModels = []
}
try {
key.allowedClients = key.allowedClients ? JSON.parse(key.allowedClients) : []
} catch (e) {
key.allowedClients = []
}
try {
key.tags = key.tags ? JSON.parse(key.tags) : []
} catch (e) {
key.tags = []
}
delete key.apiKey // 不返回哈希后的key
}
return apiKeys
} catch (error) {
logger.error('❌ Failed to get API keys:', error)
throw error
}
}
// 📝 更新API Key
async updateApiKey(keyId, updates) {
try {
const keyData = await redis.getApiKey(keyId)
if (!keyData || Object.keys(keyData).length === 0) {
throw new Error('API key not found')
}
// 允许更新的字段
const allowedUpdates = [
'name',
'description',
'tokenLimit',
'concurrencyLimit',
'rateLimitWindow',
'rateLimitRequests',
'isActive',
'claudeAccountId',
'claudeConsoleAccountId',
'geminiAccountId',
'openaiAccountId',
'bedrockAccountId', // 添加 Bedrock 账号ID
'permissions',
'expiresAt',
'enableModelRestriction',
'restrictedModels',
'enableClientRestriction',
'allowedClients',
'dailyCostLimit',
'tags'
]
const updatedData = { ...keyData }
for (const [field, value] of Object.entries(updates)) {
if (allowedUpdates.includes(field)) {
if (field === 'restrictedModels' || field === 'allowedClients' || field === 'tags') {
// 特殊处理数组字段
updatedData[field] = JSON.stringify(value || [])
} else if (field === 'enableModelRestriction' || field === 'enableClientRestriction') {
// 布尔值转字符串
updatedData[field] = String(value)
} else {
updatedData[field] = (value !== null && value !== undefined ? value : '').toString()
}
}
}
updatedData.updatedAt = new Date().toISOString()
// 更新时不需要重新建立哈希映射因为API Key本身没有变化
await redis.setApiKey(keyId, updatedData)
logger.success(`📝 Updated API key: ${keyId}`)
return { success: true }
} catch (error) {
logger.error('❌ Failed to update API key:', error)
throw error
}
}
// 🗑️ 删除API Key
async deleteApiKey(keyId) {
try {
const result = await redis.deleteApiKey(keyId)
if (result === 0) {
throw new Error('API key not found')
}
logger.success(`🗑️ Deleted API key: ${keyId}`)
return { success: true }
} catch (error) {
logger.error('❌ Failed to delete API key:', error)
throw error
}
}
// 📊 记录使用情况支持缓存token和账户级别统计
async recordUsage(
keyId,
inputTokens = 0,
outputTokens = 0,
cacheCreateTokens = 0,
cacheReadTokens = 0,
model = 'unknown',
accountId = null
) {
try {
const totalTokens = inputTokens + outputTokens + cacheCreateTokens + cacheReadTokens
// 计算费用
const CostCalculator = require('../utils/costCalculator')
const costInfo = CostCalculator.calculateCost(
{
input_tokens: inputTokens,
output_tokens: outputTokens,
cache_creation_input_tokens: cacheCreateTokens,
cache_read_input_tokens: cacheReadTokens
},
model
)
// 记录API Key级别的使用统计
await redis.incrementTokenUsage(
keyId,
totalTokens,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens,
model
)
// 记录费用统计
if (costInfo.costs.total > 0) {
await redis.incrementDailyCost(keyId, costInfo.costs.total)
logger.database(
`💰 Recorded cost for ${keyId}: $${costInfo.costs.total.toFixed(6)}, model: ${model}`
)
} else {
logger.debug(`💰 No cost recorded for ${keyId} - zero cost for model: ${model}`)
}
// 获取API Key数据以确定关联的账户
const keyData = await redis.getApiKey(keyId)
if (keyData && Object.keys(keyData).length > 0) {
// 更新最后使用时间
keyData.lastUsedAt = new Date().toISOString()
await redis.setApiKey(keyId, keyData)
// 记录账户级别的使用统计(只统计实际处理请求的账户)
if (accountId) {
await redis.incrementAccountUsage(
accountId,
totalTokens,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens,
model
)
logger.database(
`📊 Recorded account usage: ${accountId} - ${totalTokens} tokens (API Key: ${keyId})`
)
} else {
logger.debug(
'⚠️ No accountId provided for usage recording, skipping account-level statistics'
)
}
}
const logParts = [`Model: ${model}`, `Input: ${inputTokens}`, `Output: ${outputTokens}`]
if (cacheCreateTokens > 0) {
logParts.push(`Cache Create: ${cacheCreateTokens}`)
}
if (cacheReadTokens > 0) {
logParts.push(`Cache Read: ${cacheReadTokens}`)
}
logParts.push(`Total: ${totalTokens} tokens`)
logger.database(`📊 Recorded usage: ${keyId} - ${logParts.join(', ')}`)
} catch (error) {
logger.error('❌ Failed to record usage:', error)
}
}
// 📊 记录使用情况(新版本,支持详细的缓存类型)
async recordUsageWithDetails(keyId, usageObject, model = 'unknown', accountId = null) {
try {
// 提取 token 数量
const inputTokens = usageObject.input_tokens || 0
const outputTokens = usageObject.output_tokens || 0
const cacheCreateTokens = usageObject.cache_creation_input_tokens || 0
const cacheReadTokens = usageObject.cache_read_input_tokens || 0
const totalTokens = inputTokens + outputTokens + cacheCreateTokens + cacheReadTokens
// 计算费用(支持详细的缓存类型)- 添加错误处理
let costInfo = { totalCost: 0, ephemeral5mCost: 0, ephemeral1hCost: 0 }
try {
const pricingService = require('./pricingService')
// 确保 pricingService 已初始化
if (!pricingService.pricingData) {
logger.warn('⚠️ PricingService not initialized, initializing now...')
await pricingService.initialize()
}
costInfo = pricingService.calculateCost(usageObject, model)
} catch (pricingError) {
logger.error('❌ Failed to calculate cost:', pricingError)
// 继续执行,不要因为费用计算失败而跳过统计记录
}
// 提取详细的缓存创建数据
let ephemeral5mTokens = 0
let ephemeral1hTokens = 0
if (usageObject.cache_creation && typeof usageObject.cache_creation === 'object') {
ephemeral5mTokens = usageObject.cache_creation.ephemeral_5m_input_tokens || 0
ephemeral1hTokens = usageObject.cache_creation.ephemeral_1h_input_tokens || 0
}
// 记录API Key级别的使用统计 - 这个必须执行
await redis.incrementTokenUsage(
keyId,
totalTokens,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens,
model,
ephemeral5mTokens, // 传递5分钟缓存 tokens
ephemeral1hTokens // 传递1小时缓存 tokens
)
// 记录费用统计
if (costInfo.totalCost > 0) {
await redis.incrementDailyCost(keyId, costInfo.totalCost)
logger.database(
`💰 Recorded cost for ${keyId}: $${costInfo.totalCost.toFixed(6)}, model: ${model}`
)
// 记录详细的缓存费用(如果有)
if (costInfo.ephemeral5mCost > 0 || costInfo.ephemeral1hCost > 0) {
logger.database(
`💰 Cache costs - 5m: $${costInfo.ephemeral5mCost.toFixed(6)}, 1h: $${costInfo.ephemeral1hCost.toFixed(6)}`
)
}
} else {
logger.debug(`💰 No cost recorded for ${keyId} - zero cost for model: ${model}`)
}
// 获取API Key数据以确定关联的账户
const keyData = await redis.getApiKey(keyId)
if (keyData && Object.keys(keyData).length > 0) {
// 更新最后使用时间
keyData.lastUsedAt = new Date().toISOString()
await redis.setApiKey(keyId, keyData)
// 记录账户级别的使用统计(只统计实际处理请求的账户)
if (accountId) {
await redis.incrementAccountUsage(
accountId,
totalTokens,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens,
model
)
logger.database(
`📊 Recorded account usage: ${accountId} - ${totalTokens} tokens (API Key: ${keyId})`
)
} else {
logger.debug(
'⚠️ No accountId provided for usage recording, skipping account-level statistics'
)
}
}
const logParts = [`Model: ${model}`, `Input: ${inputTokens}`, `Output: ${outputTokens}`]
if (cacheCreateTokens > 0) {
logParts.push(`Cache Create: ${cacheCreateTokens}`)
// 如果有详细的缓存创建数据,也记录它们
if (usageObject.cache_creation) {
const { ephemeral_5m_input_tokens, ephemeral_1h_input_tokens } =
usageObject.cache_creation
if (ephemeral_5m_input_tokens > 0) {
logParts.push(`5m: ${ephemeral_5m_input_tokens}`)
}
if (ephemeral_1h_input_tokens > 0) {
logParts.push(`1h: ${ephemeral_1h_input_tokens}`)
}
}
}
if (cacheReadTokens > 0) {
logParts.push(`Cache Read: ${cacheReadTokens}`)
}
logParts.push(`Total: ${totalTokens} tokens`)
logger.database(`📊 Recorded usage: ${keyId} - ${logParts.join(', ')}`)
} catch (error) {
logger.error('❌ Failed to record usage:', error)
}
}
// 🔐 生成密钥
_generateSecretKey() {
return crypto.randomBytes(32).toString('hex')
}
// 🔒 哈希API Key
_hashApiKey(apiKey) {
return crypto
.createHash('sha256')
.update(apiKey + config.security.encryptionKey)
.digest('hex')
}
// 📈 获取使用统计
async getUsageStats(keyId) {
return await redis.getUsageStats(keyId)
}
// 📊 获取账户使用统计
async getAccountUsageStats(accountId) {
return await redis.getAccountUsageStats(accountId)
}
// 📈 获取所有账户使用统计
async getAllAccountsUsageStats() {
return await redis.getAllAccountsUsageStats()
}
// 🧹 清理过期的API Keys
async cleanupExpiredKeys() {
try {
const apiKeys = await redis.getAllApiKeys()
const now = new Date()
let cleanedCount = 0
for (const key of apiKeys) {
// 检查是否已过期且仍处于激活状态
if (key.expiresAt && new Date(key.expiresAt) < now && key.isActive === 'true') {
// 将过期的 API Key 标记为禁用状态,而不是直接删除
await this.updateApiKey(key.id, { isActive: false })
logger.info(`🔒 API Key ${key.id} (${key.name}) has expired and been disabled`)
cleanedCount++
}
}
if (cleanedCount > 0) {
logger.success(`🧹 Disabled ${cleanedCount} expired API keys`)
}
return cleanedCount
} catch (error) {
logger.error('❌ Failed to cleanup expired keys:', error)
return 0
}
}
}
// 导出实例和单独的方法
const apiKeyService = new ApiKeyService()
// 为了方便其他服务调用,导出 recordUsage 方法
apiKeyService.recordUsageMetrics = apiKeyService.recordUsage.bind(apiKeyService)
module.exports = apiKeyService