feat: 添加 CCR (Claude Code Router) 账户类型支持

实现通过供应商前缀语法进行 CCR 后端路由的完整支持。
用户现在可以在 Claude Code 中使用 `/model ccr,model_name` 将请求路由到 CCR 后端。
暂时没有实现`/v1/messages/count_tokens`,因为这需要在CCR后端支持。
CCR类型的账户也暂时没有考虑模型的支持情况

## 核心实现

### 供应商前缀路由

- 添加 modelHelper 工具用于解析模型名称中的 `ccr,` 供应商前缀
- 检测到前缀时自动路由到 CCR 账户池
- 转发到 CCR 后端前移除供应商前缀

### 账户管理

- 创建 ccrAccountService 实现 CCR 账户的完整 CRUD 操作
- 支持账户属性:名称、API URL、API Key、代理、优先级、配额
- 实现账户状态:active、rate_limited、unauthorized、overloaded
- 支持模型映射和支持模型配置

### 请求转发

- 实现 ccrRelayService 处理 CCR 后端通信
- 支持流式和非流式请求
- 从 SSE 流中解析和捕获使用数据
- 支持 Bearer 和 x-api-key 两种认证格式

### 统一调度

- 将 CCR 账户集成到 unifiedClaudeScheduler
- 添加 \_selectCcrAccount 方法用于 CCR 特定账户选择
- 支持 CCR 账户的会话粘性
- 防止跨类型会话映射(CCR 会话仅用于 CCR 请求)

### 错误处理

- 实现全面的错误状态管理
- 处理 401(未授权)、429(速率限制)、529(过载)错误
- 成功请求后自动从错误状态恢复
- 支持可配置的速率限制持续时间

### Web 管理界面

- 添加 CcrAccountForm 组件用于创建/编辑 CCR 账户
- 将 CCR 账户集成到 AccountsView 中,提供完整管理功能
- 支持账户切换、重置和使用统计
- 在界面中显示账户状态和错误信息

### API 端点

- POST /admin/ccr-accounts - 创建 CCR 账户
- GET /admin/ccr-accounts - 列出所有 CCR 账户
- PUT /admin/ccr-accounts/:id - 更新 CCR 账户
- DELETE /admin/ccr-accounts/:id - 删除 CCR 账户
- PUT /admin/ccr-accounts/:id/toggle - 切换账户启用状态
- PUT /admin/ccr-accounts/:id/toggle-schedulable - 切换可调度状态
- POST /admin/ccr-accounts/:id/reset-usage - 重置每日使用量
- POST /admin/ccr-accounts/:id/reset-status - 重置错误状态

## 技术细节

- CCR 账户使用 'ccr' 作为 accountType 标识符
- 带有 `ccr,` 前缀的请求绕过普通账户池
- 转发到 CCR 后端前清理模型名称内的`ccr,`
- 从流式和非流式响应中捕获使用数据
- 支持缓存令牌跟踪(创建和读取)
This commit is contained in:
sususu98
2025-09-10 14:21:15 +08:00
parent 1c3b74f45b
commit 7f9869ae20
11 changed files with 3117 additions and 52 deletions

View File

@@ -483,6 +483,10 @@ class ApiKeyService {
} catch (e) {
key.tags = []
}
// 不暴露已弃用字段
if (Object.prototype.hasOwnProperty.call(key, 'ccrAccountId')) {
delete key.ccrAccountId
}
delete key.apiKey // 不返回哈希后的key
}
@@ -846,8 +850,11 @@ class ApiKeyService {
return // 不是 Opus 模型,直接返回
}
// 判断是否为 claudeclaude-console 账户
if (!accountType || (accountType !== 'claude' && accountType !== 'claude-console')) {
// 判断是否为 claudeclaude-console 或 ccr 账户
if (
!accountType ||
(accountType !== 'claude' && accountType !== 'claude-console' && accountType !== 'ccr')
) {
logger.debug(`⚠️ Skipping Opus cost recording for non-Claude account type: ${accountType}`)
return // 不是 claude 账户,直接返回
}

View File

@@ -0,0 +1,903 @@
const { v4: uuidv4 } = require('uuid')
const crypto = require('crypto')
const ProxyHelper = require('../utils/proxyHelper')
const redis = require('../models/redis')
const logger = require('../utils/logger')
const config = require('../../config/config')
const LRUCache = require('../utils/lruCache')
class CcrAccountService {
constructor() {
// 加密相关常量
this.ENCRYPTION_ALGORITHM = 'aes-256-cbc'
this.ENCRYPTION_SALT = 'ccr-account-salt'
// Redis键前缀
this.ACCOUNT_KEY_PREFIX = 'ccr_account:'
this.SHARED_ACCOUNTS_KEY = 'shared_ccr_accounts'
// 🚀 性能优化:缓存派生的加密密钥,避免每次重复计算
// scryptSync 是 CPU 密集型操作,缓存可以减少 95%+ 的 CPU 密集型操作
this._encryptionKeyCache = null
// 🔄 解密结果缓存,提高解密性能
this._decryptCache = new LRUCache(500)
// 🧹 定期清理缓存每10分钟
setInterval(
() => {
this._decryptCache.cleanup()
logger.info('🧹 CCR account decrypt cache cleanup completed', this._decryptCache.getStats())
},
10 * 60 * 1000
)
}
// 🏢 创建CCR账户
async createAccount(options = {}) {
const {
name = 'CCR Account',
description = '',
apiUrl = '',
apiKey = '',
priority = 50, // 默认优先级501-100
supportedModels = [], // 支持的模型列表或映射表,空数组/对象表示支持所有
userAgent = 'claude-relay-service/1.0.0',
rateLimitDuration = 60, // 限流时间(分钟)
proxy = null,
isActive = true,
accountType = 'shared', // 'dedicated' or 'shared'
schedulable = true, // 是否可被调度
dailyQuota = 0, // 每日额度限制美元0表示不限制
quotaResetTime = '00:00' // 额度重置时间HH:mm格式
} = options
// 验证必填字段
if (!apiUrl || !apiKey) {
throw new Error('API URL and API Key are required for CCR account')
}
const accountId = uuidv4()
// 处理 supportedModels确保向后兼容
const processedModels = this._processModelMapping(supportedModels)
const accountData = {
id: accountId,
platform: 'ccr',
name,
description,
apiUrl,
apiKey: this._encryptSensitiveData(apiKey),
priority: priority.toString(),
supportedModels: JSON.stringify(processedModels),
userAgent,
rateLimitDuration: rateLimitDuration.toString(),
proxy: proxy ? JSON.stringify(proxy) : '',
isActive: isActive.toString(),
accountType,
createdAt: new Date().toISOString(),
lastUsedAt: '',
status: 'active',
errorMessage: '',
// 限流相关
rateLimitedAt: '',
rateLimitStatus: '',
// 调度控制
schedulable: schedulable.toString(),
// 额度管理相关
dailyQuota: dailyQuota.toString(), // 每日额度限制(美元)
dailyUsage: '0', // 当日使用金额(美元)
// 使用与统计一致的时区日期,避免边界问题
lastResetDate: redis.getDateStringInTimezone(), // 最后重置日期(按配置时区)
quotaResetTime, // 额度重置时间
quotaStoppedAt: '' // 因额度停用的时间
}
const client = redis.getClientSafe()
logger.debug(
`[DEBUG] Saving CCR account data to Redis with key: ${this.ACCOUNT_KEY_PREFIX}${accountId}`
)
logger.debug(`[DEBUG] CCR Account data to save: ${JSON.stringify(accountData, null, 2)}`)
await client.hset(`${this.ACCOUNT_KEY_PREFIX}${accountId}`, accountData)
// 如果是共享账户,添加到共享账户集合
if (accountType === 'shared') {
await client.sadd(this.SHARED_ACCOUNTS_KEY, accountId)
}
logger.success(`🏢 Created CCR account: ${name} (${accountId})`)
return {
id: accountId,
name,
description,
apiUrl,
priority,
supportedModels,
userAgent,
rateLimitDuration,
isActive,
proxy,
accountType,
status: 'active',
createdAt: accountData.createdAt,
dailyQuota,
dailyUsage: 0,
lastResetDate: accountData.lastResetDate,
quotaResetTime,
quotaStoppedAt: null
}
}
// 📋 获取所有CCR账户
async getAllAccounts() {
try {
const client = redis.getClientSafe()
const keys = await client.keys(`${this.ACCOUNT_KEY_PREFIX}*`)
const accounts = []
for (const key of keys) {
const accountData = await client.hgetall(key)
if (accountData && Object.keys(accountData).length > 0) {
// 获取限流状态信息
const rateLimitInfo = this._getRateLimitInfo(accountData)
accounts.push({
id: accountData.id,
platform: accountData.platform,
name: accountData.name,
description: accountData.description,
apiUrl: accountData.apiUrl,
priority: parseInt(accountData.priority) || 50,
supportedModels: JSON.parse(accountData.supportedModels || '[]'),
userAgent: accountData.userAgent,
rateLimitDuration: Number.isNaN(parseInt(accountData.rateLimitDuration))
? 60
: parseInt(accountData.rateLimitDuration),
isActive: accountData.isActive === 'true',
proxy: accountData.proxy ? JSON.parse(accountData.proxy) : null,
accountType: accountData.accountType || 'shared',
createdAt: accountData.createdAt,
lastUsedAt: accountData.lastUsedAt,
status: accountData.status || 'active',
errorMessage: accountData.errorMessage,
rateLimitInfo,
schedulable: accountData.schedulable !== 'false', // 默认为true只有明确设置为false才不可调度
// 额度管理相关
dailyQuota: parseFloat(accountData.dailyQuota || '0'),
dailyUsage: parseFloat(accountData.dailyUsage || '0'),
lastResetDate: accountData.lastResetDate || '',
quotaResetTime: accountData.quotaResetTime || '00:00',
quotaStoppedAt: accountData.quotaStoppedAt || null
})
}
}
return accounts
} catch (error) {
logger.error('❌ Failed to get CCR accounts:', error)
throw error
}
}
// 🔍 获取单个账户(内部使用,包含敏感信息)
async getAccount(accountId) {
const client = redis.getClientSafe()
logger.debug(`[DEBUG] Getting CCR account data for ID: ${accountId}`)
const accountData = await client.hgetall(`${this.ACCOUNT_KEY_PREFIX}${accountId}`)
if (!accountData || Object.keys(accountData).length === 0) {
logger.debug(`[DEBUG] No CCR account data found for ID: ${accountId}`)
return null
}
logger.debug(`[DEBUG] Raw CCR account data keys: ${Object.keys(accountData).join(', ')}`)
logger.debug(`[DEBUG] Raw supportedModels value: ${accountData.supportedModels}`)
// 解密敏感字段只解密apiKeyapiUrl不加密
const decryptedKey = this._decryptSensitiveData(accountData.apiKey)
logger.debug(
`[DEBUG] URL exists: ${!!accountData.apiUrl}, Decrypted key exists: ${!!decryptedKey}`
)
accountData.apiKey = decryptedKey
// 解析JSON字段
const parsedModels = JSON.parse(accountData.supportedModels || '[]')
logger.debug(`[DEBUG] Parsed supportedModels: ${JSON.stringify(parsedModels)}`)
accountData.supportedModels = parsedModels
accountData.priority = parseInt(accountData.priority) || 50
{
const _parsedDuration = parseInt(accountData.rateLimitDuration)
accountData.rateLimitDuration = Number.isNaN(_parsedDuration) ? 60 : _parsedDuration
}
accountData.isActive = accountData.isActive === 'true'
accountData.schedulable = accountData.schedulable !== 'false' // 默认为true
if (accountData.proxy) {
accountData.proxy = JSON.parse(accountData.proxy)
}
logger.debug(
`[DEBUG] Final CCR account data - name: ${accountData.name}, hasApiUrl: ${!!accountData.apiUrl}, hasApiKey: ${!!accountData.apiKey}, supportedModels: ${JSON.stringify(accountData.supportedModels)}`
)
return accountData
}
// 📝 更新账户
async updateAccount(accountId, updates) {
try {
const existingAccount = await this.getAccount(accountId)
if (!existingAccount) {
throw new Error('CCR Account not found')
}
const client = redis.getClientSafe()
const updatedData = {}
// 处理各个字段的更新
logger.debug(
`[DEBUG] CCR update request received with fields: ${Object.keys(updates).join(', ')}`
)
logger.debug(`[DEBUG] CCR Updates content: ${JSON.stringify(updates, null, 2)}`)
if (updates.name !== undefined) {
updatedData.name = updates.name
}
if (updates.description !== undefined) {
updatedData.description = updates.description
}
if (updates.apiUrl !== undefined) {
updatedData.apiUrl = updates.apiUrl
}
if (updates.apiKey !== undefined) {
updatedData.apiKey = this._encryptSensitiveData(updates.apiKey)
}
if (updates.priority !== undefined) {
updatedData.priority = updates.priority.toString()
}
if (updates.supportedModels !== undefined) {
logger.debug(`[DEBUG] Updating supportedModels: ${JSON.stringify(updates.supportedModels)}`)
// 处理 supportedModels确保向后兼容
const processedModels = this._processModelMapping(updates.supportedModels)
updatedData.supportedModels = JSON.stringify(processedModels)
}
if (updates.userAgent !== undefined) {
updatedData.userAgent = updates.userAgent
}
if (updates.rateLimitDuration !== undefined) {
updatedData.rateLimitDuration = updates.rateLimitDuration.toString()
}
if (updates.proxy !== undefined) {
updatedData.proxy = updates.proxy ? JSON.stringify(updates.proxy) : ''
}
if (updates.isActive !== undefined) {
updatedData.isActive = updates.isActive.toString()
}
if (updates.schedulable !== undefined) {
updatedData.schedulable = updates.schedulable.toString()
}
if (updates.dailyQuota !== undefined) {
updatedData.dailyQuota = updates.dailyQuota.toString()
}
if (updates.quotaResetTime !== undefined) {
updatedData.quotaResetTime = updates.quotaResetTime
}
await client.hset(`${this.ACCOUNT_KEY_PREFIX}${accountId}`, updatedData)
// 处理共享账户集合变更
if (updates.accountType !== undefined) {
updatedData.accountType = updates.accountType
if (updates.accountType === 'shared') {
await client.sadd(this.SHARED_ACCOUNTS_KEY, accountId)
} else {
await client.srem(this.SHARED_ACCOUNTS_KEY, accountId)
}
}
logger.success(`📝 Updated CCR account: ${accountId}`)
return await this.getAccount(accountId)
} catch (error) {
logger.error(`❌ Failed to update CCR account ${accountId}:`, error)
throw error
}
}
// 🗑️ 删除账户
async deleteAccount(accountId) {
try {
const client = redis.getClientSafe()
// 从共享账户集合中移除
await client.srem(this.SHARED_ACCOUNTS_KEY, accountId)
// 删除账户数据
const result = await client.del(`${this.ACCOUNT_KEY_PREFIX}${accountId}`)
if (result === 0) {
throw new Error('CCR Account not found or already deleted')
}
logger.success(`🗑️ Deleted CCR account: ${accountId}`)
return { success: true }
} catch (error) {
logger.error(`❌ Failed to delete CCR account ${accountId}:`, error)
throw error
}
}
// 🚫 标记账户为限流状态
async markAccountRateLimited(accountId) {
try {
const client = redis.getClientSafe()
const account = await this.getAccount(accountId)
if (!account) {
throw new Error('CCR Account not found')
}
// 如果限流时间设置为 0表示不启用限流机制直接返回
if (account.rateLimitDuration === 0) {
logger.info(
` CCR account ${account.name} (${accountId}) has rate limiting disabled, skipping rate limit`
)
return { success: true, skipped: true }
}
const now = new Date().toISOString()
await client.hmset(`${this.ACCOUNT_KEY_PREFIX}${accountId}`, {
status: 'rate_limited',
rateLimitedAt: now,
rateLimitStatus: 'active',
errorMessage: 'Rate limited by upstream service'
})
logger.warn(`⏱️ Marked CCR account as rate limited: ${account.name} (${accountId})`)
return { success: true, rateLimitedAt: now }
} catch (error) {
logger.error(`❌ Failed to mark CCR account as rate limited: ${accountId}`, error)
throw error
}
}
// ✅ 移除账户限流状态
async removeAccountRateLimit(accountId) {
try {
const client = redis.getClientSafe()
const accountKey = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
// 获取账户当前状态和额度信息
const [, quotaStoppedAt] = await client.hmget(accountKey, 'status', 'quotaStoppedAt')
// 删除限流相关字段
await client.hdel(accountKey, 'rateLimitedAt', 'rateLimitStatus')
// 根据不同情况决定是否恢复账户
let newStatus = 'active'
let errorMessage = ''
// 如果因额度问题停用,不要自动激活
if (quotaStoppedAt) {
newStatus = 'quota_exceeded'
errorMessage = 'Account stopped due to quota exceeded'
logger.info(
` CCR account ${accountId} rate limit removed but remains stopped due to quota exceeded`
)
} else {
logger.success(`✅ Removed rate limit for CCR account: ${accountId}`)
}
await client.hmset(accountKey, {
status: newStatus,
errorMessage
})
return { success: true, newStatus }
} catch (error) {
logger.error(`❌ Failed to remove rate limit for CCR account: ${accountId}`, error)
throw error
}
}
// 🔍 检查账户是否被限流
async isAccountRateLimited(accountId) {
try {
const client = redis.getClientSafe()
const accountKey = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
const [rateLimitedAt, rateLimitDuration] = await client.hmget(
accountKey,
'rateLimitedAt',
'rateLimitDuration'
)
if (rateLimitedAt) {
const limitTime = new Date(rateLimitedAt)
const duration = parseInt(rateLimitDuration) || 60
const now = new Date()
const expireTime = new Date(limitTime.getTime() + duration * 60 * 1000)
if (now < expireTime) {
return true
} else {
// 限流时间已过,自动移除限流状态
await this.removeAccountRateLimit(accountId)
return false
}
}
return false
} catch (error) {
logger.error(`❌ Failed to check rate limit status for CCR account: ${accountId}`, error)
return false
}
}
// 🔥 标记账户为过载状态
async markAccountOverloaded(accountId) {
try {
const client = redis.getClientSafe()
const account = await this.getAccount(accountId)
if (!account) {
throw new Error('CCR Account not found')
}
const now = new Date().toISOString()
await client.hmset(`${this.ACCOUNT_KEY_PREFIX}${accountId}`, {
status: 'overloaded',
overloadedAt: now,
errorMessage: 'Account overloaded'
})
logger.warn(`🔥 Marked CCR account as overloaded: ${account.name} (${accountId})`)
return { success: true, overloadedAt: now }
} catch (error) {
logger.error(`❌ Failed to mark CCR account as overloaded: ${accountId}`, error)
throw error
}
}
// ✅ 移除账户过载状态
async removeAccountOverload(accountId) {
try {
const client = redis.getClientSafe()
const accountKey = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
// 删除过载相关字段
await client.hdel(accountKey, 'overloadedAt')
await client.hmset(accountKey, {
status: 'active',
errorMessage: ''
})
logger.success(`✅ Removed overload status for CCR account: ${accountId}`)
return { success: true }
} catch (error) {
logger.error(`❌ Failed to remove overload status for CCR account: ${accountId}`, error)
throw error
}
}
// 🔍 检查账户是否过载
async isAccountOverloaded(accountId) {
try {
const client = redis.getClientSafe()
const accountKey = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
const status = await client.hget(accountKey, 'status')
return status === 'overloaded'
} catch (error) {
logger.error(`❌ Failed to check overload status for CCR account: ${accountId}`, error)
return false
}
}
// 🚫 标记账户为未授权状态
async markAccountUnauthorized(accountId) {
try {
const client = redis.getClientSafe()
const account = await this.getAccount(accountId)
if (!account) {
throw new Error('CCR Account not found')
}
await client.hmset(`${this.ACCOUNT_KEY_PREFIX}${accountId}`, {
status: 'unauthorized',
errorMessage: 'API key invalid or unauthorized'
})
logger.warn(`🚫 Marked CCR account as unauthorized: ${account.name} (${accountId})`)
return { success: true }
} catch (error) {
logger.error(`❌ Failed to mark CCR account as unauthorized: ${accountId}`, error)
throw error
}
}
// 🔄 处理模型映射
_processModelMapping(supportedModels) {
// 如果是空值,返回空对象(支持所有模型)
if (!supportedModels || (Array.isArray(supportedModels) && supportedModels.length === 0)) {
return {}
}
// 如果已经是对象格式(新的映射表格式),直接返回
if (typeof supportedModels === 'object' && !Array.isArray(supportedModels)) {
return supportedModels
}
// 如果是数组格式(旧格式),转换为映射表
if (Array.isArray(supportedModels)) {
const mapping = {}
supportedModels.forEach((model) => {
if (model && typeof model === 'string') {
mapping[model] = model // 默认映射:原模型名 -> 原模型名
}
})
return mapping
}
return {}
}
// 🔍 检查模型是否被支持
isModelSupported(modelMapping, requestedModel) {
// 如果映射表为空,支持所有模型
if (!modelMapping || Object.keys(modelMapping).length === 0) {
return true
}
// 检查请求的模型是否在映射表的键中
return Object.prototype.hasOwnProperty.call(modelMapping, requestedModel)
}
// 🔄 获取映射后的模型名称
getMappedModel(modelMapping, requestedModel) {
// 如果映射表为空,返回原模型
if (!modelMapping || Object.keys(modelMapping).length === 0) {
return requestedModel
}
// 返回映射后的模型名,如果不存在映射则返回原模型名
return modelMapping[requestedModel] || requestedModel
}
// 🔐 加密敏感数据
_encryptSensitiveData(data) {
if (!data) {
return ''
}
try {
const key = this._generateEncryptionKey()
const iv = crypto.randomBytes(16)
const cipher = crypto.createCipheriv(this.ENCRYPTION_ALGORITHM, key, iv)
let encrypted = cipher.update(data, 'utf8', 'hex')
encrypted += cipher.final('hex')
return `${iv.toString('hex')}:${encrypted}`
} catch (error) {
logger.error('❌ CCR encryption error:', error)
return data
}
}
// 🔓 解密敏感数据
_decryptSensitiveData(encryptedData) {
if (!encryptedData) {
return ''
}
// 🎯 检查缓存
const cacheKey = crypto.createHash('sha256').update(encryptedData).digest('hex')
const cached = this._decryptCache.get(cacheKey)
if (cached !== undefined) {
return cached
}
try {
const parts = encryptedData.split(':')
if (parts.length === 2) {
const key = this._generateEncryptionKey()
const iv = Buffer.from(parts[0], 'hex')
const encrypted = parts[1]
const decipher = crypto.createDecipheriv(this.ENCRYPTION_ALGORITHM, key, iv)
let decrypted = decipher.update(encrypted, 'hex', 'utf8')
decrypted += decipher.final('utf8')
// 💾 存入缓存5分钟过期
this._decryptCache.set(cacheKey, decrypted, 5 * 60 * 1000)
return decrypted
} else {
logger.error('❌ Invalid CCR encrypted data format')
return encryptedData
}
} catch (error) {
logger.error('❌ CCR decryption error:', error)
return encryptedData
}
}
// 🔑 生成加密密钥
_generateEncryptionKey() {
// 性能优化:缓存密钥派生结果,避免重复的 CPU 密集计算
if (!this._encryptionKeyCache) {
this._encryptionKeyCache = crypto.scryptSync(
config.security.encryptionKey,
this.ENCRYPTION_SALT,
32
)
}
return this._encryptionKeyCache
}
// 🔍 获取限流状态信息
_getRateLimitInfo(accountData) {
const { rateLimitedAt } = accountData
const rateLimitDuration = parseInt(accountData.rateLimitDuration) || 60
if (rateLimitedAt) {
const limitTime = new Date(rateLimitedAt)
const now = new Date()
const expireTime = new Date(limitTime.getTime() + rateLimitDuration * 60 * 1000)
const remainingMs = expireTime.getTime() - now.getTime()
return {
isRateLimited: remainingMs > 0,
rateLimitedAt,
rateLimitExpireAt: expireTime.toISOString(),
remainingTimeMs: Math.max(0, remainingMs),
remainingTimeMinutes: Math.max(0, Math.ceil(remainingMs / (60 * 1000)))
}
}
return {
isRateLimited: false,
rateLimitedAt: null,
rateLimitExpireAt: null,
remainingTimeMs: 0,
remainingTimeMinutes: 0
}
}
// 🔧 创建代理客户端
_createProxyAgent(proxy) {
return ProxyHelper.createProxyAgent(proxy)
}
// 💰 检查配额使用情况(可选实现)
async checkQuotaUsage(accountId) {
try {
const account = await this.getAccount(accountId)
if (!account) {
return false
}
const dailyQuota = parseFloat(account.dailyQuota || '0')
// 如果未设置额度限制,则不限制
if (dailyQuota <= 0) {
return false
}
// 检查是否需要重置每日使用量
const today = redis.getDateStringInTimezone()
if (account.lastResetDate !== today) {
await this.resetDailyUsage(accountId)
return false // 刚重置,不会超额
}
// 获取当日使用统计
const usageStats = await this.getAccountUsageStats(accountId)
if (!usageStats) {
return false
}
const dailyUsage = usageStats.dailyUsage || 0
const isExceeded = dailyUsage >= dailyQuota
if (isExceeded) {
// 标记账户因额度停用
const client = redis.getClientSafe()
await client.hmset(`${this.ACCOUNT_KEY_PREFIX}${accountId}`, {
status: 'quota_exceeded',
errorMessage: `Daily quota exceeded: $${dailyUsage.toFixed(2)} / $${dailyQuota.toFixed(2)}`,
quotaStoppedAt: new Date().toISOString()
})
logger.warn(
`💰 CCR account ${account.name} (${accountId}) quota exceeded: $${dailyUsage.toFixed(2)} / $${dailyQuota.toFixed(2)}`
)
// 发送 Webhook 通知
try {
const webhookNotifier = require('../utils/webhookNotifier')
await webhookNotifier.sendAccountAnomalyNotification({
accountId,
accountName: account.name || accountId,
platform: 'ccr',
status: 'quota_exceeded',
errorCode: 'QUOTA_EXCEEDED',
reason: `Daily quota exceeded: $${dailyUsage.toFixed(2)} / $${dailyQuota.toFixed(2)}`,
timestamp: new Date().toISOString()
})
} catch (webhookError) {
logger.warn('Failed to send webhook notification for CCR quota exceeded:', webhookError)
}
}
return isExceeded
} catch (error) {
logger.error(`❌ Failed to check quota usage for CCR account ${accountId}:`, error)
return false
}
}
// 🔄 重置每日使用量(可选实现)
async resetDailyUsage(accountId) {
try {
const client = redis.getClientSafe()
await client.hmset(`${this.ACCOUNT_KEY_PREFIX}${accountId}`, {
dailyUsage: '0',
lastResetDate: redis.getDateStringInTimezone(),
quotaStoppedAt: ''
})
return { success: true }
} catch (error) {
logger.error(`❌ Failed to reset daily usage for CCR account: ${accountId}`, error)
throw error
}
}
// 🚫 检查账户是否超额
async isAccountQuotaExceeded(accountId) {
try {
const account = await this.getAccount(accountId)
if (!account) {
return false
}
const dailyQuota = parseFloat(account.dailyQuota || '0')
// 如果未设置额度限制,则不限制
if (dailyQuota <= 0) {
return false
}
// 获取当日使用统计
const usageStats = await this.getAccountUsageStats(accountId)
if (!usageStats) {
return false
}
const dailyUsage = usageStats.dailyUsage || 0
const isExceeded = dailyUsage >= dailyQuota
if (isExceeded && !account.quotaStoppedAt) {
// 标记账户因额度停用
const client = redis.getClientSafe()
await client.hmset(`${this.ACCOUNT_KEY_PREFIX}${accountId}`, {
status: 'quota_exceeded',
errorMessage: `Daily quota exceeded: $${dailyUsage.toFixed(2)} / $${dailyQuota.toFixed(2)}`,
quotaStoppedAt: new Date().toISOString()
})
logger.warn(`💰 CCR account ${account.name} (${accountId}) quota exceeded`)
}
return isExceeded
} catch (error) {
logger.error(`❌ Failed to check quota for CCR account ${accountId}:`, error)
return false
}
}
// 🔄 重置所有CCR账户的每日使用量
async resetAllDailyUsage() {
try {
const accounts = await this.getAllAccounts()
const today = redis.getDateStringInTimezone()
let resetCount = 0
for (const account of accounts) {
if (account.lastResetDate !== today) {
await this.resetDailyUsage(account.id)
resetCount += 1
}
}
logger.success(`✅ Reset daily usage for ${resetCount} CCR accounts`)
return { success: true, resetCount }
} catch (error) {
logger.error('❌ Failed to reset all CCR daily usage:', error)
throw error
}
}
// 📊 获取CCR账户使用统计含每日费用
async getAccountUsageStats(accountId) {
try {
// 使用统一的 Redis 统计
const usageStats = await redis.getAccountUsageStats(accountId)
// 叠加账户自身的额度配置
const accountData = await this.getAccount(accountId)
if (!accountData) {
return null
}
const dailyQuota = parseFloat(accountData.dailyQuota || '0')
const currentDailyCost = usageStats?.daily?.cost || 0
return {
dailyQuota,
dailyUsage: currentDailyCost,
remainingQuota: dailyQuota > 0 ? Math.max(0, dailyQuota - currentDailyCost) : null,
usagePercentage: dailyQuota > 0 ? (currentDailyCost / dailyQuota) * 100 : 0,
lastResetDate: accountData.lastResetDate,
quotaResetTime: accountData.quotaResetTime,
quotaStoppedAt: accountData.quotaStoppedAt,
isQuotaExceeded: dailyQuota > 0 && currentDailyCost >= dailyQuota,
fullUsageStats: usageStats
}
} catch (error) {
logger.error('❌ Failed to get CCR account usage stats:', error)
return null
}
}
// 🔄 重置CCR账户所有异常状态
async resetAccountStatus(accountId) {
try {
const accountData = await this.getAccount(accountId)
if (!accountData) {
throw new Error('Account not found')
}
const client = redis.getClientSafe()
const accountKey = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
const updates = {
status: 'active',
errorMessage: '',
schedulable: 'true',
isActive: 'true'
}
const fieldsToDelete = [
'rateLimitedAt',
'rateLimitStatus',
'unauthorizedAt',
'unauthorizedCount',
'overloadedAt',
'overloadStatus',
'blockedAt',
'quotaStoppedAt'
]
await client.hset(accountKey, updates)
await client.hdel(accountKey, ...fieldsToDelete)
logger.success(`✅ Reset all error status for CCR account ${accountId}`)
// 异步发送 Webhook 通知(忽略错误)
try {
const webhookNotifier = require('../utils/webhookNotifier')
await webhookNotifier.sendAccountAnomalyNotification({
accountId,
accountName: accountData.name || accountId,
platform: 'ccr',
status: 'recovered',
errorCode: 'STATUS_RESET',
reason: 'Account status manually reset',
timestamp: new Date().toISOString()
})
} catch (webhookError) {
logger.warn('Failed to send webhook notification for CCR status reset:', webhookError)
}
return { success: true, accountId }
} catch (error) {
logger.error(`❌ Failed to reset CCR account status: ${accountId}`, error)
throw error
}
}
}
module.exports = new CcrAccountService()

View File

@@ -0,0 +1,641 @@
const axios = require('axios')
const ccrAccountService = require('./ccrAccountService')
const logger = require('../utils/logger')
const config = require('../../config/config')
const { parseVendorPrefixedModel } = require('../utils/modelHelper')
class CcrRelayService {
constructor() {
this.defaultUserAgent = 'claude-relay-service/1.0.0'
}
// 🚀 转发请求到CCR API
async relayRequest(
requestBody,
apiKeyData,
clientRequest,
clientResponse,
clientHeaders,
accountId,
options = {}
) {
let abortController = null
let account = null
try {
// 获取账户信息
account = await ccrAccountService.getAccount(accountId)
if (!account) {
throw new Error('CCR account not found')
}
logger.info(
`📤 Processing CCR API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId})`
)
logger.debug(`🌐 Account API URL: ${account.apiUrl}`)
logger.debug(`🔍 Account supportedModels: ${JSON.stringify(account.supportedModels)}`)
logger.debug(`🔑 Account has apiKey: ${!!account.apiKey}`)
logger.debug(`📝 Request model: ${requestBody.model}`)
// 处理模型前缀解析和映射
const { baseModel } = parseVendorPrefixedModel(requestBody.model)
logger.debug(`🔄 Parsed base model: ${baseModel} from original: ${requestBody.model}`)
let mappedModel = baseModel
if (
account.supportedModels &&
typeof account.supportedModels === 'object' &&
!Array.isArray(account.supportedModels)
) {
const newModel = ccrAccountService.getMappedModel(account.supportedModels, baseModel)
if (newModel !== baseModel) {
logger.info(`🔄 Mapping model from ${baseModel} to ${newModel}`)
mappedModel = newModel
}
}
// 创建修改后的请求体,使用去前缀后的模型名
const modifiedRequestBody = {
...requestBody,
model: mappedModel
}
// 创建代理agent
const proxyAgent = ccrAccountService._createProxyAgent(account.proxy)
// 创建AbortController用于取消请求
abortController = new AbortController()
// 设置客户端断开监听器
const handleClientDisconnect = () => {
logger.info('🔌 Client disconnected, aborting CCR request')
if (abortController && !abortController.signal.aborted) {
abortController.abort()
}
}
// 监听客户端断开事件
if (clientRequest) {
clientRequest.once('close', handleClientDisconnect)
}
if (clientResponse) {
clientResponse.once('close', handleClientDisconnect)
}
// 构建完整的API URL
const cleanUrl = account.apiUrl.replace(/\/$/, '') // 移除末尾斜杠
let apiEndpoint
if (options.customPath) {
// 如果指定了自定义路径(如 count_tokens使用它
const baseUrl = cleanUrl.replace(/\/v1\/messages$/, '') // 移除已有的 /v1/messages
apiEndpoint = `${baseUrl}${options.customPath}`
} else {
// 默认使用 messages 端点
apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages`
}
logger.debug(`🎯 Final API endpoint: ${apiEndpoint}`)
logger.debug(`[DEBUG] Options passed to relayRequest: ${JSON.stringify(options)}`)
logger.debug(`[DEBUG] Client headers received: ${JSON.stringify(clientHeaders)}`)
// 过滤客户端请求头
const filteredHeaders = this._filterClientHeaders(clientHeaders)
logger.debug(`[DEBUG] Filtered client headers: ${JSON.stringify(filteredHeaders)}`)
// 决定使用的 User-Agent优先使用账户自定义的否则透传客户端的最后才使用默认值
const userAgent =
account.userAgent ||
clientHeaders?.['user-agent'] ||
clientHeaders?.['User-Agent'] ||
this.defaultUserAgent
// 准备请求配置
const requestConfig = {
method: 'POST',
url: apiEndpoint,
data: modifiedRequestBody,
headers: {
'Content-Type': 'application/json',
'anthropic-version': '2023-06-01',
'User-Agent': userAgent,
...filteredHeaders
},
httpsAgent: proxyAgent,
timeout: config.requestTimeout || 600000,
signal: abortController.signal,
validateStatus: () => true // 接受所有状态码
}
// 根据 API Key 格式选择认证方式
if (account.apiKey && account.apiKey.startsWith('sk-ant-')) {
// Anthropic 官方 API Key 使用 x-api-key
requestConfig.headers['x-api-key'] = account.apiKey
logger.debug('[DEBUG] Using x-api-key authentication for sk-ant-* API key')
} else {
// 其他 API Key (包括CCR API Key) 使用 Authorization Bearer
requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}`
logger.debug('[DEBUG] Using Authorization Bearer authentication')
}
logger.debug(
`[DEBUG] Initial headers before beta: ${JSON.stringify(requestConfig.headers, null, 2)}`
)
// 添加beta header如果需要
if (options.betaHeader) {
logger.debug(`[DEBUG] Adding beta header: ${options.betaHeader}`)
requestConfig.headers['anthropic-beta'] = options.betaHeader
} else {
logger.debug('[DEBUG] No beta header to add')
}
// 发送请求
logger.debug(
'📤 Sending request to CCR API with headers:',
JSON.stringify(requestConfig.headers, null, 2)
)
const response = await axios(requestConfig)
// 移除监听器(请求成功完成)
if (clientRequest) {
clientRequest.removeListener('close', handleClientDisconnect)
}
if (clientResponse) {
clientResponse.removeListener('close', handleClientDisconnect)
}
logger.debug(`🔗 CCR API response: ${response.status}`)
logger.debug(`[DEBUG] Response headers: ${JSON.stringify(response.headers)}`)
logger.debug(`[DEBUG] Response data type: ${typeof response.data}`)
logger.debug(
`[DEBUG] Response data length: ${response.data ? (typeof response.data === 'string' ? response.data.length : JSON.stringify(response.data).length) : 0}`
)
logger.debug(
`[DEBUG] Response data preview: ${typeof response.data === 'string' ? response.data.substring(0, 200) : JSON.stringify(response.data).substring(0, 200)}`
)
// 检查错误状态并相应处理
if (response.status === 401) {
logger.warn(`🚫 Unauthorized error detected for CCR account ${accountId}`)
await ccrAccountService.markAccountUnauthorized(accountId)
} else if (response.status === 429) {
logger.warn(`🚫 Rate limit detected for CCR account ${accountId}`)
// 收到429先检查是否因为超过了手动配置的每日额度
await ccrAccountService.checkQuotaUsage(accountId).catch((err) => {
logger.error('❌ Failed to check quota after 429 error:', err)
})
await ccrAccountService.markAccountRateLimited(accountId)
} else if (response.status === 529) {
logger.warn(`🚫 Overload error detected for CCR account ${accountId}`)
await ccrAccountService.markAccountOverloaded(accountId)
} else if (response.status === 200 || response.status === 201) {
// 如果请求成功,检查并移除错误状态
const isRateLimited = await ccrAccountService.isAccountRateLimited(accountId)
if (isRateLimited) {
await ccrAccountService.removeAccountRateLimit(accountId)
}
const isOverloaded = await ccrAccountService.isAccountOverloaded(accountId)
if (isOverloaded) {
await ccrAccountService.removeAccountOverload(accountId)
}
}
// 更新最后使用时间
await this._updateLastUsedTime(accountId)
const responseBody =
typeof response.data === 'string' ? response.data : JSON.stringify(response.data)
logger.debug(`[DEBUG] Final response body to return: ${responseBody}`)
return {
statusCode: response.status,
headers: response.headers,
body: responseBody,
accountId
}
} catch (error) {
// 处理特定错误
if (error.name === 'AbortError' || error.code === 'ECONNABORTED') {
logger.info('Request aborted due to client disconnect')
throw new Error('Client disconnected')
}
logger.error(
`❌ CCR relay request failed (Account: ${account?.name || accountId}):`,
error.message
)
throw error
}
}
// 🌊 处理流式响应
async relayStreamRequestWithUsageCapture(
requestBody,
apiKeyData,
responseStream,
clientHeaders,
usageCallback,
accountId,
streamTransformer = null,
options = {}
) {
let account = null
try {
// 获取账户信息
account = await ccrAccountService.getAccount(accountId)
if (!account) {
throw new Error('CCR account not found')
}
logger.info(
`📡 Processing streaming CCR API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId})`
)
logger.debug(`🌐 Account API URL: ${account.apiUrl}`)
// 处理模型前缀解析和映射
const { baseModel } = parseVendorPrefixedModel(requestBody.model)
logger.debug(`🔄 Parsed base model: ${baseModel} from original: ${requestBody.model}`)
let mappedModel = baseModel
if (
account.supportedModels &&
typeof account.supportedModels === 'object' &&
!Array.isArray(account.supportedModels)
) {
const newModel = ccrAccountService.getMappedModel(account.supportedModels, baseModel)
if (newModel !== baseModel) {
logger.info(`🔄 [Stream] Mapping model from ${baseModel} to ${newModel}`)
mappedModel = newModel
}
}
// 创建修改后的请求体,使用去前缀后的模型名
const modifiedRequestBody = {
...requestBody,
model: mappedModel
}
// 创建代理agent
const proxyAgent = ccrAccountService._createProxyAgent(account.proxy)
// 发送流式请求
await this._makeCcrStreamRequest(
modifiedRequestBody,
account,
proxyAgent,
clientHeaders,
responseStream,
accountId,
usageCallback,
streamTransformer,
options
)
// 更新最后使用时间
await this._updateLastUsedTime(accountId)
} catch (error) {
logger.error(`❌ CCR stream relay failed (Account: ${account?.name || accountId}):`, error)
throw error
}
}
// 🌊 发送流式请求到CCR API
async _makeCcrStreamRequest(
body,
account,
proxyAgent,
clientHeaders,
responseStream,
accountId,
usageCallback,
streamTransformer = null,
requestOptions = {}
) {
return new Promise((resolve, reject) => {
let aborted = false
// 构建完整的API URL
const cleanUrl = account.apiUrl.replace(/\/$/, '') // 移除末尾斜杠
const apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages`
logger.debug(`🎯 Final API endpoint for stream: ${apiEndpoint}`)
// 过滤客户端请求头
const filteredHeaders = this._filterClientHeaders(clientHeaders)
logger.debug(`[DEBUG] Filtered client headers: ${JSON.stringify(filteredHeaders)}`)
// 决定使用的 User-Agent优先使用账户自定义的否则透传客户端的最后才使用默认值
const userAgent =
account.userAgent ||
clientHeaders?.['user-agent'] ||
clientHeaders?.['User-Agent'] ||
this.defaultUserAgent
// 准备请求配置
const requestConfig = {
method: 'POST',
url: apiEndpoint,
data: body,
headers: {
'Content-Type': 'application/json',
'anthropic-version': '2023-06-01',
'User-Agent': userAgent,
...filteredHeaders
},
httpsAgent: proxyAgent,
timeout: config.requestTimeout || 600000,
responseType: 'stream',
validateStatus: () => true // 接受所有状态码
}
// 根据 API Key 格式选择认证方式
if (account.apiKey && account.apiKey.startsWith('sk-ant-')) {
// Anthropic 官方 API Key 使用 x-api-key
requestConfig.headers['x-api-key'] = account.apiKey
logger.debug('[DEBUG] Using x-api-key authentication for sk-ant-* API key')
} else {
// 其他 API Key (包括CCR API Key) 使用 Authorization Bearer
requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}`
logger.debug('[DEBUG] Using Authorization Bearer authentication')
}
// 添加beta header如果需要
if (requestOptions.betaHeader) {
requestConfig.headers['anthropic-beta'] = requestOptions.betaHeader
}
// 发送请求
const request = axios(requestConfig)
request
.then((response) => {
logger.debug(`🌊 CCR stream response status: ${response.status}`)
// 错误响应处理
if (response.status !== 200) {
logger.error(
`❌ CCR API returned error status: ${response.status} | Account: ${account?.name || accountId}`
)
if (response.status === 401) {
ccrAccountService.markAccountUnauthorized(accountId)
} else if (response.status === 429) {
ccrAccountService.markAccountRateLimited(accountId)
// 检查是否因为超过每日额度
ccrAccountService.checkQuotaUsage(accountId).catch((err) => {
logger.error('❌ Failed to check quota after 429 error:', err)
})
} else if (response.status === 529) {
ccrAccountService.markAccountOverloaded(accountId)
}
// 设置错误响应的状态码和响应头
if (!responseStream.headersSent) {
const errorHeaders = {
'Content-Type': response.headers['content-type'] || 'application/json',
'Cache-Control': 'no-cache',
Connection: 'keep-alive'
}
// 避免 Transfer-Encoding 冲突,让 Express 自动处理
delete errorHeaders['Transfer-Encoding']
delete errorHeaders['Content-Length']
responseStream.writeHead(response.status, errorHeaders)
}
// 直接透传错误数据,不进行包装
response.data.on('data', (chunk) => {
if (!responseStream.destroyed) {
responseStream.write(chunk)
}
})
response.data.on('end', () => {
if (!responseStream.destroyed) {
responseStream.end()
}
resolve() // 不抛出异常,正常完成流处理
})
return
}
// 成功响应,检查并移除错误状态
ccrAccountService.isAccountRateLimited(accountId).then((isRateLimited) => {
if (isRateLimited) {
ccrAccountService.removeAccountRateLimit(accountId)
}
})
ccrAccountService.isAccountOverloaded(accountId).then((isOverloaded) => {
if (isOverloaded) {
ccrAccountService.removeAccountOverload(accountId)
}
})
// 设置响应头
if (!responseStream.headersSent) {
const headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control'
}
responseStream.writeHead(200, headers)
}
// 处理流数据和使用统计收集
let rawBuffer = ''
const collectedUsage = {}
response.data.on('data', (chunk) => {
if (aborted || responseStream.destroyed) {
return
}
try {
const chunkStr = chunk.toString('utf8')
rawBuffer += chunkStr
// 按行分割处理 SSE 数据
const lines = rawBuffer.split('\n')
rawBuffer = lines.pop() // 保留最后一个可能不完整的行
for (const line of lines) {
if (line.trim()) {
// 解析 SSE 数据并收集使用统计
const usageData = this._parseSSELineForUsage(line)
if (usageData) {
Object.assign(collectedUsage, usageData)
}
// 应用流转换器(如果提供)
let outputLine = line
if (streamTransformer && typeof streamTransformer === 'function') {
outputLine = streamTransformer(line)
}
// 写入到响应流
if (outputLine && !responseStream.destroyed) {
responseStream.write(`${outputLine}\n`)
}
} else {
// 空行也需要传递
if (!responseStream.destroyed) {
responseStream.write('\n')
}
}
}
} catch (err) {
logger.error('❌ Error processing SSE chunk:', err)
}
})
response.data.on('end', () => {
if (!responseStream.destroyed) {
responseStream.end()
}
// 如果收集到使用统计数据,调用回调
if (usageCallback && Object.keys(collectedUsage).length > 0) {
try {
logger.debug(`📊 Collected usage data: ${JSON.stringify(collectedUsage)}`)
// 在 usage 回调中包含模型信息
usageCallback({ ...collectedUsage, accountId, model: body.model })
} catch (err) {
logger.error('❌ Error in usage callback:', err)
}
}
resolve()
})
response.data.on('error', (err) => {
logger.error('❌ Stream data error:', err)
if (!responseStream.destroyed) {
responseStream.end()
}
reject(err)
})
// 客户端断开处理
responseStream.on('close', () => {
logger.info('🔌 Client disconnected from CCR stream')
aborted = true
if (response.data && typeof response.data.destroy === 'function') {
response.data.destroy()
}
})
responseStream.on('error', (err) => {
logger.error('❌ Response stream error:', err)
aborted = true
})
})
.catch((error) => {
if (!responseStream.headersSent) {
responseStream.writeHead(500, { 'Content-Type': 'application/json' })
}
const errorResponse = {
error: {
type: 'internal_error',
message: 'CCR API request failed'
}
}
if (!responseStream.destroyed) {
responseStream.write(`data: ${JSON.stringify(errorResponse)}\n\n`)
responseStream.end()
}
reject(error)
})
})
}
// 📊 解析SSE行以提取使用统计信息
_parseSSELineForUsage(line) {
try {
if (line.startsWith('data: ')) {
const data = line.substring(6).trim()
if (data === '[DONE]') {
return null
}
const jsonData = JSON.parse(data)
// 检查是否包含使用统计信息
if (jsonData.usage) {
return {
input_tokens: jsonData.usage.input_tokens || 0,
output_tokens: jsonData.usage.output_tokens || 0,
cache_creation_input_tokens: jsonData.usage.cache_creation_input_tokens || 0,
cache_read_input_tokens: jsonData.usage.cache_read_input_tokens || 0,
// 支持 ephemeral cache 字段
cache_creation_input_tokens_ephemeral_5m:
jsonData.usage.cache_creation_input_tokens_ephemeral_5m || 0,
cache_creation_input_tokens_ephemeral_1h:
jsonData.usage.cache_creation_input_tokens_ephemeral_1h || 0
}
}
// 检查 message_delta 事件中的使用统计
if (jsonData.type === 'message_delta' && jsonData.delta && jsonData.delta.usage) {
return {
input_tokens: jsonData.delta.usage.input_tokens || 0,
output_tokens: jsonData.delta.usage.output_tokens || 0,
cache_creation_input_tokens: jsonData.delta.usage.cache_creation_input_tokens || 0,
cache_read_input_tokens: jsonData.delta.usage.cache_read_input_tokens || 0,
cache_creation_input_tokens_ephemeral_5m:
jsonData.delta.usage.cache_creation_input_tokens_ephemeral_5m || 0,
cache_creation_input_tokens_ephemeral_1h:
jsonData.delta.usage.cache_creation_input_tokens_ephemeral_1h || 0
}
}
}
} catch (err) {
// 忽略解析错误,不是所有行都包含 JSON
}
return null
}
// 🔍 过滤客户端请求头
_filterClientHeaders(clientHeaders) {
if (!clientHeaders) {
return {}
}
const filteredHeaders = {}
const allowedHeaders = [
'accept-language',
'anthropic-beta',
'anthropic-dangerous-direct-browser-access'
]
// 只保留允许的头部信息
for (const [key, value] of Object.entries(clientHeaders)) {
const lowerKey = key.toLowerCase()
if (allowedHeaders.includes(lowerKey)) {
filteredHeaders[key] = value
}
}
return filteredHeaders
}
// ⏰ 更新账户最后使用时间
async _updateLastUsedTime(accountId) {
try {
const redis = require('../models/redis')
const client = redis.getClientSafe()
await client.hset(`ccr_account:${accountId}`, 'lastUsedAt', new Date().toISOString())
} catch (error) {
logger.error(`❌ Failed to update last used time for CCR account ${accountId}:`, error)
}
}
}
module.exports = new CcrRelayService()

View File

@@ -79,7 +79,7 @@ class ClaudeRelayService {
requestedModel: requestBody.model
})
// 检查模型限制
// 检查模型限制restrictedModels 作为允许列表)
if (
apiKeyData.enableModelRestriction &&
apiKeyData.restrictedModels &&
@@ -87,12 +87,12 @@ class ClaudeRelayService {
) {
const requestedModel = requestBody.model
logger.info(
`🔒 Model restriction check - Requested model: ${requestedModel}, Restricted models: ${JSON.stringify(apiKeyData.restrictedModels)}`
`🔒 Model restriction check - Requested model: ${requestedModel}, Allowed models: ${JSON.stringify(apiKeyData.restrictedModels)}`
)
if (requestedModel && apiKeyData.restrictedModels.includes(requestedModel)) {
if (requestedModel && !apiKeyData.restrictedModels.includes(requestedModel)) {
logger.warn(
`🚫 Model restriction violation for key ${apiKeyData.name}: Attempted to use restricted model ${requestedModel}`
`🚫 Model restriction violation for key ${apiKeyData.name}: Attempted model ${requestedModel} not in allowed list`
)
return {
statusCode: 403,
@@ -844,7 +844,7 @@ class ClaudeRelayService {
requestedModel: requestBody.model
})
// 检查模型限制
// 检查模型限制restrictedModels 作为允许列表)
if (
apiKeyData.enableModelRestriction &&
apiKeyData.restrictedModels &&
@@ -852,12 +852,12 @@ class ClaudeRelayService {
) {
const requestedModel = requestBody.model
logger.info(
`🔒 [Stream] Model restriction check - Requested model: ${requestedModel}, Restricted models: ${JSON.stringify(apiKeyData.restrictedModels)}`
`🔒 [Stream] Model restriction check - Requested model: ${requestedModel}, Allowed models: ${JSON.stringify(apiKeyData.restrictedModels)}`
)
if (requestedModel && apiKeyData.restrictedModels.includes(requestedModel)) {
if (requestedModel && !apiKeyData.restrictedModels.includes(requestedModel)) {
logger.warn(
`🚫 Model restriction violation for key ${apiKeyData.name}: Attempted to use restricted model ${requestedModel}`
`🚫 Model restriction violation for key ${apiKeyData.name}: Attempted model ${requestedModel} not in allowed list`
)
// 对于流式响应,需要写入错误并结束流

View File

@@ -1,9 +1,11 @@
const claudeAccountService = require('./claudeAccountService')
const claudeConsoleAccountService = require('./claudeConsoleAccountService')
const bedrockAccountService = require('./bedrockAccountService')
const ccrAccountService = require('./ccrAccountService')
const accountGroupService = require('./accountGroupService')
const redis = require('../models/redis')
const logger = require('../utils/logger')
const { parseVendorPrefixedModel } = require('../utils/modelHelper')
class UnifiedClaudeScheduler {
constructor() {
@@ -88,12 +90,53 @@ class UnifiedClaudeScheduler {
}
}
// CCR 账户的模型支持检查
if (accountType === 'ccr' && account.supportedModels) {
// 兼容旧格式(数组)和新格式(对象)
if (Array.isArray(account.supportedModels)) {
// 旧格式:数组
if (
account.supportedModels.length > 0 &&
!account.supportedModels.includes(requestedModel)
) {
logger.info(
`🚫 CCR account ${account.name} does not support model ${requestedModel}${context ? ` ${context}` : ''}`
)
return false
}
} else if (typeof account.supportedModels === 'object') {
// 新格式:映射表
if (
Object.keys(account.supportedModels).length > 0 &&
!ccrAccountService.isModelSupported(account.supportedModels, requestedModel)
) {
logger.info(
`🚫 CCR account ${account.name} does not support model ${requestedModel}${context ? ` ${context}` : ''}`
)
return false
}
}
}
return true
}
// 🎯 统一调度Claude账号官方和Console
async selectAccountForApiKey(apiKeyData, sessionHash = null, requestedModel = null) {
try {
// 解析供应商前缀
const { vendor, baseModel } = parseVendorPrefixedModel(requestedModel)
const effectiveModel = vendor === 'ccr' ? baseModel : requestedModel
logger.debug(
`🔍 Model parsing - Original: ${requestedModel}, Vendor: ${vendor}, Effective: ${effectiveModel}`
)
// 如果是 CCR 前缀,只在 CCR 账户池中选择
if (vendor === 'ccr') {
logger.info(`🎯 CCR vendor prefix detected, routing to CCR accounts only`)
return await this._selectCcrAccount(apiKeyData, sessionHash, effectiveModel)
}
// 如果API Key绑定了专属账户或分组优先使用
if (apiKeyData.claudeAccountId) {
// 检查是否是分组
@@ -102,7 +145,12 @@ class UnifiedClaudeScheduler {
logger.info(
`🎯 API key ${apiKeyData.name} is bound to group ${groupId}, selecting from group`
)
return await this.selectAccountFromGroup(groupId, sessionHash, requestedModel)
return await this.selectAccountFromGroup(
groupId,
sessionHash,
effectiveModel,
vendor === 'ccr'
)
}
// 普通专属账户
@@ -176,15 +224,24 @@ class UnifiedClaudeScheduler {
}
}
// CCR 账户不支持绑定(仅通过 ccr, 前缀进行 CCR 路由)
// 如果有会话哈希,检查是否有已映射的账户
if (sessionHash) {
const mappedAccount = await this._getSessionMapping(sessionHash)
if (mappedAccount) {
// 当本次请求不是 CCR 前缀时,不允许使用指向 CCR 的粘性会话映射
if (vendor !== 'ccr' && mappedAccount.accountType === 'ccr') {
logger.info(
` Skipping CCR sticky session mapping for non-CCR request; removing mapping for session ${sessionHash}`
)
await this._deleteSessionMapping(sessionHash)
} else {
// 验证映射的账户是否仍然可用
const isAvailable = await this._isAccountAvailable(
mappedAccount.accountId,
mappedAccount.accountType,
requestedModel
effectiveModel
)
if (isAvailable) {
// 🚀 智能会话续期剩余时间少于14天时自动续期到15天
@@ -199,17 +256,22 @@ class UnifiedClaudeScheduler {
)
await this._deleteSessionMapping(sessionHash)
}
}
}
}
// 获取所有可用账户(传递请求的模型进行过滤)
const availableAccounts = await this._getAllAvailableAccounts(apiKeyData, requestedModel)
const availableAccounts = await this._getAllAvailableAccounts(
apiKeyData,
effectiveModel,
false // 仅前缀才走 CCR默认池不包含 CCR 账户
)
if (availableAccounts.length === 0) {
// 提供更详细的错误信息
if (requestedModel) {
if (effectiveModel) {
throw new Error(
`No available Claude accounts support the requested model: ${requestedModel}`
`No available Claude accounts support the requested model: ${effectiveModel}`
)
} else {
throw new Error('No available Claude accounts (neither official nor console)')
@@ -249,7 +311,7 @@ class UnifiedClaudeScheduler {
}
// 📋 获取所有可用账户合并官方和Console
async _getAllAvailableAccounts(apiKeyData, requestedModel = null) {
async _getAllAvailableAccounts(apiKeyData, requestedModel = null, includeCcr = false) {
const availableAccounts = []
// 如果API Key绑定了专属账户优先返回
@@ -496,8 +558,60 @@ class UnifiedClaudeScheduler {
}
}
// 获取CCR账户共享池- 仅当明确要求包含时
if (includeCcr) {
const ccrAccounts = await ccrAccountService.getAllAccounts()
logger.info(`📋 Found ${ccrAccounts.length} total CCR accounts`)
for (const account of ccrAccounts) {
logger.info(
`🔍 Checking CCR account: ${account.name} - isActive: ${account.isActive}, status: ${account.status}, accountType: ${account.accountType}, schedulable: ${account.schedulable}`
)
if (
account.isActive === true &&
account.status === 'active' &&
account.accountType === 'shared' &&
this._isSchedulable(account.schedulable)
) {
// 检查模型支持
if (!this._isModelSupportedByAccount(account, 'ccr', requestedModel)) {
continue
}
// 检查是否被限流
const isRateLimited = await ccrAccountService.isAccountRateLimited(account.id)
const isQuotaExceeded = await ccrAccountService.isAccountQuotaExceeded(account.id)
if (!isRateLimited && !isQuotaExceeded) {
availableAccounts.push({
...account,
accountId: account.id,
accountType: 'ccr',
priority: parseInt(account.priority) || 50,
lastUsedAt: account.lastUsedAt || '0'
})
logger.info(
`✅ Added CCR account to available pool: ${account.name} (priority: ${account.priority})`
)
} else {
if (isRateLimited) {
logger.warn(`⚠️ CCR account ${account.name} is rate limited`)
}
if (isQuotaExceeded) {
logger.warn(`💰 CCR account ${account.name} quota exceeded`)
}
}
} else {
logger.info(
`❌ CCR account ${account.name} not eligible - isActive: ${account.isActive}, status: ${account.status}, accountType: ${account.accountType}, schedulable: ${account.schedulable}`
)
}
}
}
logger.info(
`📊 Total available accounts: ${availableAccounts.length} (Claude: ${availableAccounts.filter((a) => a.accountType === 'claude-official').length}, Console: ${availableAccounts.filter((a) => a.accountType === 'claude-console').length}, Bedrock: ${availableAccounts.filter((a) => a.accountType === 'bedrock').length})`
`📊 Total available accounts: ${availableAccounts.length} (Claude: ${availableAccounts.filter((a) => a.accountType === 'claude-official').length}, Console: ${availableAccounts.filter((a) => a.accountType === 'claude-console').length}, Bedrock: ${availableAccounts.filter((a) => a.accountType === 'bedrock').length}, CCR: ${availableAccounts.filter((a) => a.accountType === 'ccr').length})`
)
return availableAccounts
}
@@ -617,6 +731,52 @@ class UnifiedClaudeScheduler {
}
// Bedrock账户暂不需要限流检查因为AWS管理限流
return true
} else if (accountType === 'ccr') {
const account = await ccrAccountService.getAccount(accountId)
if (!account || !account.isActive) {
return false
}
// 检查账户状态
if (
account.status !== 'active' &&
account.status !== 'unauthorized' &&
account.status !== 'overloaded'
) {
return false
}
// 检查是否可调度
if (!this._isSchedulable(account.schedulable)) {
logger.info(`🚫 CCR account ${accountId} is not schedulable`)
return false
}
// 检查模型支持
if (!this._isModelSupportedByAccount(account, 'ccr', requestedModel, 'in session check')) {
return false
}
// 检查是否超额
try {
await ccrAccountService.checkQuotaUsage(accountId)
} catch (e) {
logger.warn(`Failed to check quota for CCR account ${accountId}: ${e.message}`)
// 继续处理
}
// 检查是否被限流
if (await ccrAccountService.isAccountRateLimited(accountId)) {
return false
}
if (await ccrAccountService.isAccountQuotaExceeded(accountId)) {
return false
}
// 检查是否未授权401错误
if (account.status === 'unauthorized') {
return false
}
// 检查是否过载529错误
if (await ccrAccountService.isAccountOverloaded(accountId)) {
return false
}
return true
}
return false
} catch (error) {
@@ -673,6 +833,8 @@ class UnifiedClaudeScheduler {
)
} else if (accountType === 'claude-console') {
await claudeConsoleAccountService.markAccountRateLimited(accountId)
} else if (accountType === 'ccr') {
await ccrAccountService.markAccountRateLimited(accountId)
}
// 删除会话映射
@@ -697,6 +859,8 @@ class UnifiedClaudeScheduler {
await claudeAccountService.removeAccountRateLimit(accountId)
} else if (accountType === 'claude-console') {
await claudeConsoleAccountService.removeAccountRateLimit(accountId)
} else if (accountType === 'ccr') {
await ccrAccountService.removeAccountRateLimit(accountId)
}
return { success: true }
@@ -716,6 +880,8 @@ class UnifiedClaudeScheduler {
return await claudeAccountService.isAccountRateLimited(accountId)
} else if (accountType === 'claude-console') {
return await claudeConsoleAccountService.isAccountRateLimited(accountId)
} else if (accountType === 'ccr') {
return await ccrAccountService.isAccountRateLimited(accountId)
}
return false
} catch (error) {
@@ -791,7 +957,12 @@ class UnifiedClaudeScheduler {
}
// 👥 从分组中选择账户
async selectAccountFromGroup(groupId, sessionHash = null, requestedModel = null) {
async selectAccountFromGroup(
groupId,
sessionHash = null,
requestedModel = null,
allowCcr = false
) {
try {
// 获取分组信息
const group = await accountGroupService.getGroup(groupId)
@@ -808,18 +979,23 @@ class UnifiedClaudeScheduler {
// 验证映射的账户是否属于这个分组
const memberIds = await accountGroupService.getGroupMembers(groupId)
if (memberIds.includes(mappedAccount.accountId)) {
const isAvailable = await this._isAccountAvailable(
mappedAccount.accountId,
mappedAccount.accountType,
requestedModel
)
if (isAvailable) {
// 🚀 智能会话续期剩余时间少于14天时自动续期到15天
await redis.extendSessionAccountMappingTTL(sessionHash)
logger.info(
`🎯 Using sticky session account from group: ${mappedAccount.accountId} (${mappedAccount.accountType}) for session ${sessionHash}`
// 非 CCR 请求时不允许 CCR 粘性映射
if (!allowCcr && mappedAccount.accountType === 'ccr') {
await this._deleteSessionMapping(sessionHash)
} else {
const isAvailable = await this._isAccountAvailable(
mappedAccount.accountId,
mappedAccount.accountType,
requestedModel
)
return mappedAccount
if (isAvailable) {
// 🚀 智能会话续期剩余时间少于14天时自动续期到15天
await redis.extendSessionAccountMappingTTL(sessionHash)
logger.info(
`🎯 Using sticky session account from group: ${mappedAccount.accountId} (${mappedAccount.accountType}) for session ${sessionHash}`
)
return mappedAccount
}
}
}
// 如果映射的账户不可用或不在分组中,删除映射
@@ -851,6 +1027,14 @@ class UnifiedClaudeScheduler {
account = await claudeConsoleAccountService.getAccount(memberId)
if (account) {
accountType = 'claude-console'
} else {
// 尝试CCR账户仅允许在 allowCcr 为 true 时)
if (allowCcr) {
account = await ccrAccountService.getAccount(memberId)
if (account) {
accountType = 'ccr'
}
}
}
}
} else if (group.platform === 'gemini') {
@@ -873,7 +1057,9 @@ class UnifiedClaudeScheduler {
const status =
accountType === 'claude-official'
? account.status !== 'error' && account.status !== 'blocked'
: account.status === 'active'
: accountType === 'ccr'
? account.status === 'active'
: account.status === 'active'
if (isActive && status && this._isSchedulable(account.schedulable)) {
// 检查模型支持
@@ -930,6 +1116,133 @@ class UnifiedClaudeScheduler {
throw error
}
}
// 🎯 专门选择CCR账户仅限CCR前缀路由使用
async _selectCcrAccount(apiKeyData, sessionHash = null, effectiveModel = null) {
try {
// 1. 检查会话粘性
if (sessionHash) {
const mappedAccount = await this._getSessionMapping(sessionHash)
if (mappedAccount && mappedAccount.accountType === 'ccr') {
// 验证映射的CCR账户是否仍然可用
const isAvailable = await this._isAccountAvailable(
mappedAccount.accountId,
mappedAccount.accountType,
effectiveModel
)
if (isAvailable) {
// 🚀 智能会话续期剩余时间少于14天时自动续期到15天
await redis.extendSessionAccountMappingTTL(sessionHash)
logger.info(
`🎯 Using sticky CCR session account: ${mappedAccount.accountId} for session ${sessionHash}`
)
return mappedAccount
} else {
logger.warn(
`⚠️ Mapped CCR account ${mappedAccount.accountId} is no longer available, selecting new account`
)
await this._deleteSessionMapping(sessionHash)
}
}
}
// 2. 获取所有可用的CCR账户
const availableCcrAccounts = await this._getAvailableCcrAccounts(effectiveModel)
if (availableCcrAccounts.length === 0) {
throw new Error(
`No available CCR accounts support the requested model: ${effectiveModel || 'unspecified'}`
)
}
// 3. 按优先级和最后使用时间排序
const sortedAccounts = this._sortAccountsByPriority(availableCcrAccounts)
const selectedAccount = sortedAccounts[0]
// 4. 建立会话映射
if (sessionHash) {
await this._setSessionMapping(
sessionHash,
selectedAccount.accountId,
selectedAccount.accountType
)
logger.info(
`🎯 Created new sticky CCR session mapping: ${selectedAccount.name} (${selectedAccount.accountId}) for session ${sessionHash}`
)
}
logger.info(
`🎯 Selected CCR account: ${selectedAccount.name} (${selectedAccount.accountId}) with priority ${selectedAccount.priority} for API key ${apiKeyData.name}`
)
return {
accountId: selectedAccount.accountId,
accountType: selectedAccount.accountType
}
} catch (error) {
logger.error('❌ Failed to select CCR account:', error)
throw error
}
}
// 📋 获取所有可用的CCR账户
async _getAvailableCcrAccounts(requestedModel = null) {
const availableAccounts = []
try {
const ccrAccounts = await ccrAccountService.getAllAccounts()
logger.info(`📋 Found ${ccrAccounts.length} total CCR accounts for CCR-only selection`)
for (const account of ccrAccounts) {
logger.debug(
`🔍 Checking CCR account: ${account.name} - isActive: ${account.isActive}, status: ${account.status}, accountType: ${account.accountType}, schedulable: ${account.schedulable}`
)
if (
account.isActive === true &&
account.status === 'active' &&
account.accountType === 'shared' &&
this._isSchedulable(account.schedulable)
) {
// 检查模型支持
if (!this._isModelSupportedByAccount(account, 'ccr', requestedModel)) {
logger.debug(`CCR account ${account.name} does not support model ${requestedModel}`)
continue
}
// 检查是否被限流或超额
const isRateLimited = await ccrAccountService.isAccountRateLimited(account.id)
const isQuotaExceeded = await ccrAccountService.isAccountQuotaExceeded(account.id)
const isOverloaded = await ccrAccountService.isAccountOverloaded(account.id)
if (!isRateLimited && !isQuotaExceeded && !isOverloaded) {
availableAccounts.push({
...account,
accountId: account.id,
accountType: 'ccr',
priority: parseInt(account.priority) || 50,
lastUsedAt: account.lastUsedAt || '0'
})
logger.debug(`✅ Added CCR account to available pool: ${account.name}`)
} else {
logger.debug(
`❌ CCR account ${account.name} not available - rateLimited: ${isRateLimited}, quotaExceeded: ${isQuotaExceeded}, overloaded: ${isOverloaded}`
)
}
} else {
logger.debug(
`❌ CCR account ${account.name} not eligible - isActive: ${account.isActive}, status: ${account.status}, accountType: ${account.accountType}, schedulable: ${account.schedulable}`
)
}
}
logger.info(`📊 Total available CCR accounts: ${availableAccounts.length}`)
return availableAccounts
} catch (error) {
logger.error('❌ Failed to get available CCR accounts:', error)
return []
}
}
}
module.exports = new UnifiedClaudeScheduler()