Files
claude-relay-service/src/services/openaiAccountService.js
shaw d2f3f6866c feat: Codex账号管理优化与API Key激活机制
✨ 新功能
- 支持通过refreshToken新增Codex账号,创建时立即验证token有效性
- API Key新增首次使用自动激活机制,支持activation模式设置有效期
- 前端账号表单增加token验证功能,确保账号创建成功

🐛 修复
- 修复Codex token刷新失败问题,增加分布式锁防止并发刷新
- 优化token刷新错误处理,提供更详细的错误信息和建议
- 修复OpenAI账号token过期检测和自动刷新逻辑

📝 文档更新
- 更新README中Codex使用说明,改为config.toml配置方式
- 优化Cherry Studio等第三方工具接入文档
- 添加详细的配置示例和账号类型说明

🎨 界面优化
- 改进账号创建表单UI,支持手动和OAuth两种模式
- 优化API Key过期时间编辑弹窗,支持激活操作
- 调整教程页面布局,提升移动端响应式体验

💡 代码改进
- 重构token刷新服务,增强错误处理和重试机制
- 优化代理配置处理,确保OAuth请求正确使用代理
- 改进webhook通知,增加token刷新失败告警
2025-09-06 18:04:06 +08:00

938 lines
30 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const redisClient = require('../models/redis')
const { v4: uuidv4 } = require('uuid')
const crypto = require('crypto')
const axios = require('axios')
const ProxyHelper = require('../utils/proxyHelper')
const config = require('../../config/config')
const logger = require('../utils/logger')
// const { maskToken } = require('../utils/tokenMask')
const {
logRefreshStart,
logRefreshSuccess,
logRefreshError,
logTokenUsage,
logRefreshSkipped
} = require('../utils/tokenRefreshLogger')
const LRUCache = require('../utils/lruCache')
const tokenRefreshService = require('./tokenRefreshService')
// Encryption-related constants
const ALGORITHM = 'aes-256-cbc'
const ENCRYPTION_SALT = 'openai-account-salt'
// Passed to crypto.randomBytes() in encrypt() to size the IV (16 bytes = 32 hex chars)
const IV_LENGTH = 16
// 🚀 Performance: cache the derived encryption key so it is not recomputed on every call.
// scryptSync is CPU-intensive; per the original author's note the cache removes 95%+ of that CPU cost.
let _encryptionKeyCache = null
// 🔄 Cache of decryption results, used by decrypt() to speed up repeated calls
// (500 is presumably the maximum number of entries - confirm against utils/lruCache)
const decryptCache = new LRUCache(500)
// Returns the key used by encrypt()/decrypt(), deriving it on first use and
// serving the memoized Buffer afterwards.
// (Original note: same derivation method as claudeAccountService.)
// The key is 32 bytes, derived with scrypt from config.security.encryptionKey
// and the fixed ENCRYPTION_SALT.
function generateEncryptionKey() {
  if (_encryptionKeyCache) {
    return _encryptionKeyCache
  }
  // First call only: derive once, remember the result, and log that it happened.
  _encryptionKeyCache = crypto.scryptSync(config.security.encryptionKey, ENCRYPTION_SALT, 32)
  logger.info('🔑 OpenAI encryption key derived and cached for performance optimization')
  return _encryptionKeyCache
}
// Redis key names used by this service:
// - OPENAI_ACCOUNT_KEY_PREFIX + <accountId>: hash holding one account record
// - SHARED_OPENAI_ACCOUNTS_KEY: set of account ids whose accountType is 'shared'
// - ACCOUNT_SESSION_MAPPING_PREFIX + <sessionHash>: sticky-session -> account id mapping
const OPENAI_ACCOUNT_KEY_PREFIX = 'openai:account:'
const SHARED_OPENAI_ACCOUNTS_KEY = 'shared_openai_accounts'
const ACCOUNT_SESSION_MAPPING_PREFIX = 'openai_session_account_mapping:'
// Encrypts a string with ALGORITHM under the cached service key.
// A fresh random IV is generated per call; the result has the form
// "<iv as hex>:<ciphertext as hex>". Falsy input yields '' (nothing to store).
function encrypt(text) {
  if (!text) {
    return ''
  }
  const secretKey = generateEncryptionKey()
  const iv = crypto.randomBytes(IV_LENGTH)
  const cipher = crypto.createCipheriv(ALGORITHM, secretKey, iv)
  const ciphertext = Buffer.concat([cipher.update(text), cipher.final()])
  return [iv.toString('hex'), ciphertext.toString('hex')].join(':')
}
// Decrypts a value produced by encrypt(): "<32 hex chars of IV>:<hex ciphertext>".
//
// Returns '' for empty, non-string or malformed input, and when decryption
// itself fails (the failure is logged, never thrown). Successful results are
// memoized in decryptCache under the SHA-256 digest of the ciphertext.
function decrypt(text) {
  // Nothing to decrypt; the type check also keeps non-string values from
  // blowing up on charAt()/substring() below.
  if (!text || typeof text !== 'string') {
    return ''
  }
  // A valid value needs the 32-hex-char IV, a colon, then the ciphertext.
  if (text.length < 33 || text.charAt(32) !== ':') {
    // Log metadata only. A value that fails this check is quite possibly an
    // unencrypted token, so none of its content may reach the log.
    logger.warn('Invalid encrypted text format, returning empty string', {
      textLength: text.length
    })
    return ''
  }
  // 🎯 Cache lookup
  const cacheKey = crypto.createHash('sha256').update(text).digest('hex')
  const cached = decryptCache.get(cacheKey)
  if (cached !== undefined) {
    return cached
  }
  try {
    const key = generateEncryptionKey()
    // The IV is always 32 hex characters (16 bytes); index 32 is the colon.
    const iv = Buffer.from(text.substring(0, 32), 'hex')
    const encryptedText = Buffer.from(text.substring(33), 'hex')
    const decipher = crypto.createDecipheriv(ALGORITHM, key, iv)
    const result = Buffer.concat([decipher.update(encryptedText), decipher.final()]).toString()
    // 💾 Store in the cache (original note: expires after 5 minutes)
    decryptCache.set(cacheKey, result, 5 * 60 * 1000)
    // 📊 Print cache statistics every 1000 lookups
    if ((decryptCache.hits + decryptCache.misses) % 1000 === 0) {
      decryptCache.printStats()
    }
    return result
  } catch (error) {
    logger.error('Decryption error:', error)
    return ''
  }
}
// 🧹 Purge the decrypt cache every 10 minutes.
// The timer is unref()'d so that merely requiring this module (from a test or
// a one-off script) does not keep the Node.js process alive; in the server the
// listening sockets keep the event loop running, so the cleanup still fires.
const decryptCacheCleanupTimer = setInterval(
  () => {
    decryptCache.cleanup()
    logger.info('🧹 OpenAI decrypt cache cleanup completed', decryptCache.getStats())
  },
  10 * 60 * 1000
)
decryptCacheCleanupTimer.unref()
// Maps an HTTP error response from the OAuth token endpoint to the user-facing message.
function describeTokenRefreshHttpError(status, errorData) {
  if (status === 400) {
    if (errorData.error === 'invalid_grant') {
      return 'Refresh Token 无效或已过期,请重新授权'
    }
    if (errorData.error === 'invalid_request') {
      return `请求参数错误:${errorData.error_description || errorData.error}`
    }
    return `请求错误:${errorData.error_description || errorData.error || '未知错误'}`
  }
  if (status === 401) {
    return '认证失败Refresh Token 无效'
  }
  if (status === 403) {
    return '访问被拒绝:可能是 IP 被封或账户被禁用'
  }
  if (status === 429) {
    return '请求过于频繁,请稍后重试'
  }
  if (status >= 500) {
    return 'OpenAI 服务器内部错误,请稍后重试'
  }
  // Any other status: prefer whatever description the server supplied.
  return (
    errorData.error_description ||
    errorData.error ||
    errorData.message ||
    `OpenAI 服务器返回错误 (${status})`
  )
}

// Converts a failure raised while preparing or sending the refresh request into the Error
// that refreshAccessToken() throws (logging it on the way).
function toTokenRefreshError(error, proxy) {
  if (error.response) {
    // The server answered with an error status code.
    const errorData = error.response.data || {}
    logger.error('OpenAI token refresh failed:', {
      status: error.response.status,
      data: errorData,
      headers: error.response.headers
    })
    const httpError = new Error(describeTokenRefreshHttpError(error.response.status, errorData))
    httpError.status = error.response.status
    httpError.details = errorData
    return httpError
  }
  if (error.request) {
    // The request was sent but no response arrived.
    logger.error('OpenAI token refresh no response:', error.message)
    let errorMessage = '无法连接到 OpenAI 服务器'
    if (proxy) {
      errorMessage += `(代理: ${ProxyHelper.getProxyDescription(proxy)})`
    }
    if (error.code === 'ECONNREFUSED') {
      errorMessage += ' - 连接被拒绝'
    } else if (error.code === 'ETIMEDOUT') {
      errorMessage += ' - 连接超时'
    } else if (error.code === 'ENOTFOUND') {
      errorMessage += ' - 无法解析域名'
    } else if (error.code === 'EPROTO') {
      errorMessage += ' - 协议错误(可能是代理配置问题)'
    } else if (error.message) {
      errorMessage += ` - ${error.message}`
    }
    const networkError = new Error(errorMessage)
    networkError.code = error.code
    return networkError
  }
  // Something failed while the request was being set up.
  logger.error('OpenAI token refresh error:', error.message)
  const setupError = new Error(`请求设置错误: ${error.message}`)
  setupError.originalError = error
  return setupError
}

// Exchanges a refresh token for a new token set at the OpenAI OAuth endpoint.
//
// refreshToken - plaintext refresh token
// proxy        - optional proxy configuration, handed to ProxyHelper.createProxyAgent()
//
// Resolves to { access_token, id_token, refresh_token, expires_in, expiry_date }, where
// refresh_token falls back to the one passed in, expires_in defaults to 3600 and
// expiry_date is an epoch-milliseconds timestamp.
//
// Throws an Error carrying:
//   .status / .details  when the server answered with an error status
//   .code               when no response was received
//   .originalError      when preparing the request failed
// and a plain Error when the response is not a 200 carrying an access_token.
async function refreshAccessToken(refreshToken, proxy = null) {
  let response
  try {
    // Official CLIENT_ID of the Codex CLI
    const CLIENT_ID = 'app_EMoamEEZ73f0CkXaXp7hrann'
    const requestData = new URLSearchParams({
      grant_type: 'refresh_token',
      client_id: CLIENT_ID,
      refresh_token: refreshToken,
      scope: 'openid profile email'
    }).toString()
    const requestOptions = {
      method: 'POST',
      url: 'https://auth.openai.com/oauth/token',
      headers: {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Content-Length': Buffer.byteLength(requestData)
      },
      data: requestData,
      timeout: 30000 // 30 second timeout
    }
    // Route through the proxy when one is configured.
    const proxyAgent = ProxyHelper.createProxyAgent(proxy)
    if (proxyAgent) {
      requestOptions.httpsAgent = proxyAgent
      // Important: disable axios' own proxy handling so our httpsAgent is the one used.
      requestOptions.proxy = false
      logger.info(
        `🌐 Using proxy for OpenAI token refresh: ${ProxyHelper.getProxyDescription(proxy)}`
      )
    } else {
      logger.debug('🌐 No proxy configured for OpenAI token refresh')
    }
    logger.info('🔍 发送 token 刷新请求,使用代理:', !!requestOptions.httpsAgent)
    response = await axios(requestOptions)
  } catch (error) {
    throw toTokenRefreshError(error, proxy)
  }

  // Validate outside the try block so that a bad response is reported as such
  // instead of being re-labelled as a request setup error.
  const result = response.data
  if (response.status !== 200 || !result) {
    throw new Error(`Failed to refresh token: ${response.status} ${response.statusText}`)
  }
  if (!result.access_token) {
    // Without this check the caller would store an empty access token and
    // still record the refresh as successful.
    throw new Error('Failed to refresh token: response did not contain an access_token')
  }
  logger.info('✅ Successfully refreshed OpenAI token')
  const expiresIn = result.expires_in || 3600
  return {
    access_token: result.access_token,
    id_token: result.id_token,
    refresh_token: result.refresh_token || refreshToken, // keep the old one if none was returned
    expires_in: expiresIn,
    expiry_date: Date.now() + expiresIn * 1000
  }
}
// Tells whether the account's stored expiry time has been reached.
// An account without an expiresAt value is treated as not expired.
function isTokenExpired(account) {
  const expiry = account.expiresAt
  return expiry ? new Date(expiry) <= new Date() : false
}
// Pulls account/profile fields out of the payload of an OpenAI ID token (JWT).
// The signature is not verified; the payload is only decoded.
// Returns null when the token does not consist of three dot-separated segments and
// throws when the payload segment is not base64-encoded JSON.
// The email is returned in plaintext.
function extractAccountInfoFromIdToken(idToken) {
  const segments = idToken.split('.')
  if (segments.length !== 3) {
    return null
  }
  const payload = JSON.parse(Buffer.from(segments[1], 'base64').toString())
  const authClaims = payload['https://api.openai.com/auth'] || {}
  const primaryOrg = authClaims.organizations?.[0]
  const info = {}
  // In OpenAI ID tokens the identifiers live in chatgpt_account_id,
  // chatgpt_user_id and user_id.
  if (authClaims.chatgpt_account_id) {
    info.accountId = authClaims.chatgpt_account_id
  }
  if (authClaims.chatgpt_user_id) {
    info.chatgptUserId = authClaims.chatgpt_user_id
  } else if (authClaims.user_id) {
    // Some tokens only carry user_id.
    info.chatgptUserId = authClaims.user_id
  }
  if (primaryOrg?.id) {
    info.organizationId = primaryOrg.id
  }
  if (primaryOrg?.role) {
    info.organizationRole = primaryOrg.role
  }
  if (primaryOrg?.title) {
    info.organizationTitle = primaryOrg.title
  }
  if (payload.email) {
    info.email = payload.email
  }
  if (payload.email_verified !== undefined) {
    info.emailVerified = payload.email_verified
  }
  return info
}

// Refreshes the access token of one OpenAI account, guarded by a distributed lock.
//
// accountId - id of the account to refresh
//
// Resolves to the token set { access_token, id_token, refresh_token, expires_in, expiry_date }.
// If another process holds the lock, this waits 2 seconds and resolves to the tokens
// that process stored, provided they are no longer expired.
//
// Throws when the account does not exist, has no refresh token, the refresh request
// fails, or a concurrent refresh has not produced a valid token in time. Every failure
// is logged and reported through the webhook notifier before being rethrown.
async function refreshAccountToken(accountId) {
  let lockAcquired = false
  let account = null
  let accountName = accountId
  try {
    account = await getAccount(accountId)
    if (!account) {
      throw new Error('Account not found')
    }
    accountName = account.name || accountId
    // getAccount() has already decrypted refreshToken, so it is used as-is.
    const refreshToken = account.refreshToken || null
    if (!refreshToken) {
      logRefreshSkipped(accountId, accountName, 'openai', 'No refresh token available')
      throw new Error('No refresh token available')
    }
    // Try to take the distributed lock.
    lockAcquired = await tokenRefreshService.acquireRefreshLock(accountId, 'openai')
    if (!lockAcquired) {
      // Someone else is refreshing this account right now.
      logger.info(
        `🔒 Token refresh already in progress for OpenAI account: ${accountName} (${accountId})`
      )
      logRefreshSkipped(accountId, accountName, 'openai', 'already_locked')
      // Give the other process a moment, then re-read what it stored.
      await new Promise((resolve) => setTimeout(resolve, 2000))
      const updatedAccount = await getAccount(accountId)
      if (updatedAccount && !isTokenExpired(updatedAccount)) {
        return {
          // getAccount() leaves accessToken encrypted, hence the decrypt here.
          access_token: decrypt(updatedAccount.accessToken),
          id_token: updatedAccount.idToken,
          refresh_token: updatedAccount.refreshToken,
          expires_in: 3600,
          expiry_date: new Date(updatedAccount.expiresAt).getTime()
        }
      }
      throw new Error('Token refresh in progress by another process')
    }
    // Lock acquired: perform the refresh.
    logRefreshStart(accountId, accountName, 'openai')
    logger.info(`🔄 Starting token refresh for OpenAI account: ${accountName} (${accountId})`)
    let proxy = null
    if (account.proxy) {
      try {
        proxy = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy
      } catch (e) {
        logger.warn(`Failed to parse proxy config for account ${accountId}:`, e)
      }
    }
    const newTokens = await refreshAccessToken(refreshToken, proxy)
    if (!newTokens) {
      throw new Error('Failed to refresh token')
    }
    // Every sensitive value below is handed over in PLAINTEXT: updateAccount()
    // encrypts accessToken, idToken, refreshToken and email itself. Encrypting
    // here as well would store double-encrypted values that can no longer be
    // used after a single decrypt.
    const updates = {
      accessToken: newTokens.access_token,
      expiresAt: new Date(newTokens.expiry_date).toISOString()
    }
    // Store a newly issued ID token too - this matters for accounts that were
    // created without one.
    if (newTokens.id_token) {
      updates.idToken = newTokens.id_token
      // No ID token on record yet: try to fill in the user info from the new one.
      if (!account.idToken) {
        try {
          const accountInfo = extractAccountInfoFromIdToken(newTokens.id_token)
          if (accountInfo) {
            Object.assign(updates, accountInfo)
            logger.info(`Updated user info from ID Token for account ${accountId}`)
          }
        } catch (e) {
          logger.warn(`Failed to parse ID Token for account ${accountId}:`, e)
        }
      }
    }
    // Persist a rotated refresh token.
    if (newTokens.refresh_token && newTokens.refresh_token !== refreshToken) {
      updates.refreshToken = newTokens.refresh_token
      logger.info(`Updated refresh token for account ${accountId}`)
    }
    await updateAccount(accountId, updates)
    logRefreshSuccess(accountId, accountName, 'openai', newTokens.expiry_date)
    return newTokens
  } catch (error) {
    logRefreshError(accountId, account?.name || accountName, 'openai', error.message)
    // Send a webhook notification; a failure to notify is logged but must not
    // mask the refresh error.
    try {
      const webhookNotifier = require('../utils/webhookNotifier')
      await webhookNotifier.sendAccountAnomalyNotification({
        accountId,
        accountName: account?.name || accountName,
        platform: 'openai',
        status: 'error',
        errorCode: 'OPENAI_TOKEN_REFRESH_FAILED',
        reason: `Token refresh failed: ${error.message}`,
        timestamp: new Date().toISOString()
      })
      logger.info(
        `📢 Webhook notification sent for OpenAI account ${account?.name || accountName} refresh failure`
      )
    } catch (webhookError) {
      logger.error('Failed to send webhook notification:', webhookError)
    }
    throw error
  } finally {
    // Always release the lock we took.
    if (lockAcquired) {
      await tokenRefreshService.releaseRefreshLock(accountId, 'openai')
      logger.debug(`🔓 Released refresh lock for OpenAI account ${accountId}`)
    }
  }
}
// Creates a new OpenAI account record in Redis and returns the stored object.
//
// accountData.openaiOauth may be an object or its JSON text; accountData.accountInfo
// carries the profile fields. Tokens, the raw OAuth payload and the email are stored
// encrypted, so the returned object holds ciphertext for those fields.
// Shared accounts are additionally registered in the shared-accounts set.
async function createAccount(accountData) {
  const accountId = uuidv4()
  const now = new Date().toISOString()

  // OAuth payload: accept either JSON text or an already-parsed object.
  const rawOauth = accountData.openaiOauth
  let oauthData = {}
  if (rawOauth) {
    oauthData = typeof rawOauth === 'string' ? JSON.parse(rawOauth) : rawOauth
  }

  const accountInfo = accountData.accountInfo || {}
  // An email that already looks like encrypt() output (32 hex chars of IV, then
  // a colon) is stored untouched to avoid encrypting it twice.
  const isEmailEncrypted =
    accountInfo.email && accountInfo.email.length >= 33 && accountInfo.email.charAt(32) === ':'

  // Blank or missing tokens are stored as '' instead of being encrypted.
  const encryptIfNotBlank = (value) => (value && value.trim() ? encrypt(value) : '')
  const ONE_YEAR_MS = 365 * 24 * 60 * 60 * 1000

  const account = {
    id: accountId,
    name: accountData.name,
    description: accountData.description || '',
    accountType: accountData.accountType || 'shared',
    groupId: accountData.groupId || null,
    priority: accountData.priority || 50,
    rateLimitDuration: accountData.rateLimitDuration ?? 60,
    // OAuth fields, stored encrypted.
    // The ID token is optional; when absent it is fetched on the first refresh.
    idToken: encryptIfNotBlank(oauthData.idToken),
    accessToken: encryptIfNotBlank(oauthData.accessToken),
    refreshToken: encryptIfNotBlank(oauthData.refreshToken),
    openaiOauth: encrypt(JSON.stringify(oauthData)),
    // Profile fields - every one is written, even when it is just ''.
    accountId: accountInfo.accountId || '',
    chatgptUserId: accountInfo.chatgptUserId || '',
    organizationId: accountInfo.organizationId || '',
    organizationRole: accountInfo.organizationRole || '',
    organizationTitle: accountInfo.organizationTitle || '',
    planType: accountInfo.planType || '',
    email: isEmailEncrypted ? accountInfo.email : encrypt(accountInfo.email || ''),
    emailVerified: String(accountInfo.emailVerified === true),
    // Expiry: from expires_in when supplied, otherwise one year from now.
    expiresAt: new Date(
      Date.now() + (oauthData.expires_in ? oauthData.expires_in * 1000 : ONE_YEAR_MS)
    ).toISOString(),
    // Status fields
    isActive: String(accountData.isActive !== false),
    status: 'active',
    schedulable: String(accountData.schedulable !== false),
    lastRefresh: now,
    createdAt: now,
    updatedAt: now
  }

  // Proxy configuration is persisted as JSON text.
  const { proxy } = accountData
  if (proxy) {
    account.proxy = typeof proxy === 'string' ? proxy : JSON.stringify(proxy)
  }

  const client = redisClient.getClientSafe()
  await client.hset(`${OPENAI_ACCOUNT_KEY_PREFIX}${accountId}`, account)
  if (account.accountType === 'shared') {
    await client.sadd(SHARED_OPENAI_ACCOUNTS_KEY, accountId)
  }
  logger.info(`Created OpenAI account: ${accountId}`)
  return account
}
// Loads one account hash from Redis for internal use; resolves to null when it
// does not exist.
// idToken, refreshToken, email and openaiOauth come back decrypted (these values
// are for internal processing and must not be sent to the frontend).
// accessToken is intentionally left encrypted - per the original note it is
// decrypted separately in openaiRoutes.js.
// proxy and openaiOauth are parsed from JSON, becoming null when parsing fails.
async function getAccount(accountId) {
  const client = redisClient.getClientSafe()
  const record = await client.hgetall(`${OPENAI_ACCOUNT_KEY_PREFIX}${accountId}`)
  if (!record || Object.keys(record).length === 0) {
    return null
  }

  for (const field of ['idToken', 'refreshToken', 'email']) {
    if (record[field]) {
      record[field] = decrypt(record[field])
    }
  }

  if (record.openaiOauth) {
    try {
      record.openaiOauth = JSON.parse(decrypt(record.openaiOauth))
    } catch (e) {
      record.openaiOauth = null
    }
  }

  if (record.proxy && typeof record.proxy === 'string') {
    try {
      record.proxy = JSON.parse(record.proxy)
    } catch (e) {
      record.proxy = null
    }
  }
  return record
}
// Applies a partial update to an account hash in Redis.
//
// accountId - id of the account to update
// updates   - fields to write. openaiOauth, idToken, accessToken, refreshToken and
//             email must be given in PLAINTEXT; they are encrypted here. proxy may be
//             an object or JSON text.
//
// The caller's `updates` object is never modified: all rewriting (timestamp,
// encryption, proxy serialisation) happens on a private copy.
//
// Resolves to the existing account merged with the stored fields. As before, the
// updated sensitive fields in that object hold the ciphertext that was written,
// while proxy is parsed back into an object.
// Throws Error('Account not found') when the account does not exist.
async function updateAccount(accountId, updates) {
  const existingAccount = await getAccount(accountId)
  if (!existingAccount) {
    throw new Error('Account not found')
  }
  const changes = { ...updates, updatedAt: new Date().toISOString() }

  // Encrypt sensitive values.
  if (changes.openaiOauth) {
    const oauthData =
      typeof changes.openaiOauth === 'string'
        ? changes.openaiOauth
        : JSON.stringify(changes.openaiOauth)
    changes.openaiOauth = encrypt(oauthData)
  }
  if (changes.idToken) {
    changes.idToken = encrypt(changes.idToken)
  }
  if (changes.accessToken) {
    changes.accessToken = encrypt(changes.accessToken)
  }
  if (changes.refreshToken && changes.refreshToken.trim()) {
    changes.refreshToken = encrypt(changes.refreshToken)
  }
  if (changes.email) {
    changes.email = encrypt(changes.email)
  }

  // Proxy configuration is persisted as JSON text.
  if (changes.proxy) {
    changes.proxy =
      typeof changes.proxy === 'string' ? changes.proxy : JSON.stringify(changes.proxy)
  }

  // Keep the shared-accounts set in step with a changed account type.
  const client = redisClient.getClientSafe()
  if (changes.accountType && changes.accountType !== existingAccount.accountType) {
    if (changes.accountType === 'shared') {
      await client.sadd(SHARED_OPENAI_ACCOUNTS_KEY, accountId)
    } else {
      await client.srem(SHARED_OPENAI_ACCOUNTS_KEY, accountId)
    }
  }
  await client.hset(`${OPENAI_ACCOUNT_KEY_PREFIX}${accountId}`, changes)
  logger.info(`Updated OpenAI account: ${accountId}`)

  const updatedAccount = { ...existingAccount, ...changes }
  // Hand the proxy back as an object, not as the stored JSON text.
  if (updatedAccount.proxy && typeof updatedAccount.proxy === 'string') {
    try {
      updatedAccount.proxy = JSON.parse(updatedAccount.proxy)
    } catch (e) {
      updatedAccount.proxy = null
    }
  }
  return updatedAccount
}
// Deletes an account: its Redis hash, its membership in the shared-accounts set
// (for shared accounts) and every sticky-session mapping that points at it.
// Resolves to true; throws Error('Account not found') for an unknown id.
async function deleteAccount(accountId) {
  const doomed = await getAccount(accountId)
  if (!doomed) {
    throw new Error('Account not found')
  }

  const client = redisClient.getClientSafe()
  await client.del(`${OPENAI_ACCOUNT_KEY_PREFIX}${accountId}`)
  if (doomed.accountType === 'shared') {
    await client.srem(SHARED_OPENAI_ACCOUNTS_KEY, accountId)
  }

  // Sweep the session mappings one at a time, dropping those bound to this account.
  const mappingKeys = await client.keys(`${ACCOUNT_SESSION_MAPPING_PREFIX}*`)
  for (const mappingKey of mappingKeys) {
    const boundTo = await client.get(mappingKey)
    if (boundTo !== accountId) {
      continue
    }
    await client.del(mappingKey)
  }

  logger.info(`Deleted OpenAI account: ${accountId}`)
  return true
}
// Lists every stored OpenAI account in a form intended for the frontend.
// Tokens and the raw OAuth payload are stripped, the proxy password is masked,
// the email is decrypted, flags are converted to booleans and the rate-limit
// state is attached in a uniform shape.
async function getAllAccounts() {
  const client = redisClient.getClientSafe()
  const accountKeys = await client.keys(`${OPENAI_ACCOUNT_KEY_PREFIX}*`)
  const listing = []

  for (const accountKey of accountKeys) {
    const accountData = await client.hgetall(accountKey)
    if (!accountData || Object.keys(accountData).length === 0) {
      continue
    }

    if (accountData.email) {
      accountData.email = decrypt(accountData.email)
    }

    // Note whether a refresh token exists before the token fields are dropped.
    const hasRefreshTokenFlag = Boolean(accountData.refreshToken)
    // Sensitive material must never reach the frontend.
    for (const secretField of ['idToken', 'accessToken', 'refreshToken', 'openaiOauth']) {
      delete accountData[secretField]
    }

    const rateLimitInfo = await getAccountRateLimitInfo(accountData.id)

    if (accountData.proxy) {
      try {
        accountData.proxy = JSON.parse(accountData.proxy)
        // Mask the proxy password.
        if (accountData.proxy && accountData.proxy.password) {
          accountData.proxy.password = '******'
        }
      } catch (e) {
        // Unparseable proxy configuration is reported as null.
        accountData.proxy = null
      }
    }

    // Rate-limit state in the uniform three-field shape.
    const rateLimitStatus = rateLimitInfo
      ? {
          isRateLimited: rateLimitInfo.isRateLimited,
          rateLimitedAt: rateLimitInfo.rateLimitedAt,
          minutesRemaining: rateLimitInfo.minutesRemaining
        }
      : {
          isRateLimited: false,
          rateLimitedAt: null,
          minutesRemaining: 0
        }
    // scopes lets the frontend tell the authentication method; blank means none.
    const hasScopes = accountData.scopes && accountData.scopes.trim()

    listing.push({
      ...accountData,
      isActive: accountData.isActive === 'true',
      schedulable: accountData.schedulable !== 'false',
      // The three fields below were deleted above, so these always evaluate to ''.
      openaiOauth: accountData.openaiOauth ? '[ENCRYPTED]' : '',
      accessToken: accountData.accessToken ? '[ENCRYPTED]' : '',
      refreshToken: accountData.refreshToken ? '[ENCRYPTED]' : '',
      scopes: hasScopes ? accountData.scopes.split(' ') : [],
      hasRefreshToken: hasRefreshTokenFlag,
      rateLimitStatus
    })
  }
  return listing
}
// Picks the account that should serve a request (dedicated and shared accounts).
//
// Order of preference:
//   1. the account a sticky session already maps to, if active and not expired;
//   2. the account bound to the API key, if active;
//   3. the least-used active, non-rate-limited account of the shared pool.
// In cases 2 and 3 an expired token is refreshed and the reloaded account is
// returned; otherwise a sticky-session mapping (1 hour) is written when a
// sessionHash was supplied.
// Throws Error('No available OpenAI accounts') when nothing qualifies.
async function selectAvailableAccount(apiKeyId, sessionHash = null) {
  const client = redisClient.getClientSafe()
  const stickyKey = `${ACCOUNT_SESSION_MAPPING_PREFIX}${sessionHash}`

  // Records the session -> account mapping; a no-op without a sessionHash.
  const rememberSession = async (chosenId) => {
    if (sessionHash) {
      await client.setex(stickyKey, 3600, chosenId) // expires after 1 hour
    }
  }
  // Refreshes an expired token and returns the freshly loaded account.
  const refreshAndReload = async (expiredId) => {
    await refreshAccountToken(expiredId)
    return await getAccount(expiredId)
  }

  // 1. Sticky session
  if (sessionHash) {
    const mappedAccountId = await client.get(stickyKey)
    if (mappedAccountId) {
      const stickyAccount = await getAccount(mappedAccountId)
      if (stickyAccount && stickyAccount.isActive === 'true' && !isTokenExpired(stickyAccount)) {
        logger.debug(`Using sticky session account: ${mappedAccountId}`)
        return stickyAccount
      }
    }
  }

  // 2. Account bound to the API key
  const apiKeyData = await client.hgetall(`api_key:${apiKeyId}`)
  if (apiKeyData.openaiAccountId) {
    const boundAccount = await getAccount(apiKeyData.openaiAccountId)
    if (boundAccount && boundAccount.isActive === 'true') {
      const isExpired = isTokenExpired(boundAccount)
      // Record the token usage.
      logTokenUsage(boundAccount.id, boundAccount.name, 'openai', boundAccount.expiresAt, isExpired)
      if (isExpired) {
        return await refreshAndReload(boundAccount.id)
      }
      await rememberSession(boundAccount.id)
      return boundAccount
    }
  }

  // 3. Shared pool
  const sharedAccountIds = await client.smembers(SHARED_OPENAI_ACCOUNTS_KEY)
  const candidates = []
  for (const sharedId of sharedAccountIds) {
    const candidate = await getAccount(sharedId)
    if (candidate && candidate.isActive === 'true' && !isRateLimited(candidate)) {
      candidates.push(candidate)
    }
  }
  if (candidates.length === 0) {
    throw new Error('No available OpenAI accounts')
  }

  // Least-used account wins; on a tie the earlier candidate is kept.
  let selectedAccount = candidates[0]
  for (const contender of candidates.slice(1)) {
    const prevUsage = parseInt(selectedAccount.totalUsage || 0)
    const currUsage = parseInt(contender.totalUsage || 0)
    selectedAccount = prevUsage <= currUsage ? selectedAccount : contender
  }

  if (isTokenExpired(selectedAccount)) {
    return await refreshAndReload(selectedAccount.id)
  }
  await rememberSession(selectedAccount.id)
  return selectedAccount
}
// Tells whether the account is still inside its rate-limit window: it must be
// marked 'limited' and less than one hour must have passed since rateLimitedAt.
function isRateLimited(account) {
  const flaggedAsLimited = account.rateLimitStatus === 'limited' && account.rateLimitedAt
  if (!flaggedAsLimited) {
    return false
  }
  const ONE_HOUR_MS = 60 * 60 * 1000
  const windowEndsAt = new Date(account.rateLimitedAt).getTime() + ONE_HOUR_MS
  return Date.now() < windowEndsAt
}
// 设置账户限流状态
async function setAccountRateLimited(accountId, isLimited) {
const updates = {
rateLimitStatus: isLimited ? 'limited' : 'normal',
rateLimitedAt: isLimited ? new Date().toISOString() : null
}
await updateAccount(accountId, updates)
logger.info(`Set rate limit status for OpenAI account ${accountId}: ${updates.rateLimitStatus}`)
// 如果被限流,发送 Webhook 通知
if (isLimited) {
try {
const account = await getAccount(accountId)
const webhookNotifier = require('../utils/webhookNotifier')
await webhookNotifier.sendAccountAnomalyNotification({
accountId,
accountName: account.name || accountId,
platform: 'openai',
status: 'blocked',
errorCode: 'OPENAI_RATE_LIMITED',
reason: 'Account rate limited (429 error). Estimated reset in 1 hour',
timestamp: new Date().toISOString()
})
logger.info(`📢 Webhook notification sent for OpenAI account ${account.name} rate limit`)
} catch (webhookError) {
logger.error('Failed to send rate limit webhook notification:', webhookError)
}
}
}
// Flips the account's schedulable flag and persists it.
// Only the stored string 'false' counts as "not schedulable"; anything else is
// treated as schedulable and therefore switched off.
// Resolves to { success: true, schedulable: <new state as boolean> };
// throws Error('Account not found') for an unknown id.
async function toggleSchedulable(accountId) {
  const account = await getAccount(accountId)
  if (!account) {
    throw new Error('Account not found')
  }

  const willBeSchedulable = account.schedulable === 'false'
  const newSchedulable = willBeSchedulable ? 'true' : 'false'
  await updateAccount(accountId, { schedulable: newSchedulable })
  logger.info(`Toggled schedulable status for OpenAI account ${accountId}: ${newSchedulable}`)

  return { success: true, schedulable: willBeSchedulable }
}
// Reports the rate-limit state of an account as
// { isRateLimited, rateLimitedAt, minutesRemaining }, using a fixed one-hour
// window that starts at rateLimitedAt. Resolves to null for an unknown account.
async function getAccountRateLimitInfo(accountId) {
  const account = await getAccount(accountId)
  if (!account) {
    return null
  }

  const markedLimited = account.rateLimitStatus === 'limited' && account.rateLimitedAt
  if (!markedLimited) {
    return { isRateLimited: false, rateLimitedAt: null, minutesRemaining: 0 }
  }

  const ONE_HOUR_MS = 60 * 60 * 1000
  const windowEndsAt = new Date(account.rateLimitedAt).getTime() + ONE_HOUR_MS
  const remainingTime = Math.max(0, windowEndsAt - Date.now())
  return {
    isRateLimited: remainingTime > 0,
    rateLimitedAt: account.rateLimitedAt,
    minutesRemaining: Math.ceil(remainingTime / (60 * 1000))
  }
}
// Records that an account was just used.
// lastUsedAt is always refreshed; totalUsage is increased only when a positive
// token count is supplied (tokens is optional and defaults to 0).
// Unknown accounts are ignored silently.
async function updateAccountUsage(accountId, tokens = 0) {
  const account = await getAccount(accountId)
  if (!account) {
    return
  }

  const usagePatch =
    tokens > 0 ? { totalUsage: (parseInt(account.totalUsage || 0) + tokens).toString() } : {}
  await updateAccount(accountId, {
    lastUsedAt: new Date().toISOString(),
    ...usagePatch
  })
}
// Kept for backward compatibility: recordUsage is an alias of updateAccountUsage
const recordUsage = updateAccountUsage
// Public API of the OpenAI account service
module.exports = {
createAccount,
getAccount,
updateAccount,
deleteAccount,
getAllAccounts,
selectAvailableAccount,
refreshAccountToken,
isTokenExpired,
setAccountRateLimited,
toggleSchedulable,
getAccountRateLimitInfo,
updateAccountUsage,
recordUsage, // alias pointing at updateAccountUsage
encrypt,
decrypt,
generateEncryptionKey,
decryptCache // cache object exposed for tests and monitoring
}