feat: 实现OpenAI账户管理和统一调度系统

- 新增 OpenAI 账户管理服务,支持多账户轮询和负载均衡
- 实现统一的 OpenAI API 调度器,智能选择最优账户
- 优化成本计算器,支持更精确的 token 计算
- 更新模型定价数据,包含最新的 OpenAI 模型价格
- 增强 API Key 管理,支持更灵活的配额控制
- 改进管理界面,添加教程视图和账户分组管理
- 优化限流配置组件,提供更直观的用户体验

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
shaw
2025-08-11 13:58:43 +08:00
parent f22a38d24a
commit f462684f97
22 changed files with 6163 additions and 3134 deletions

View File

@@ -4,6 +4,7 @@ const claudeAccountService = require('../services/claudeAccountService')
const claudeConsoleAccountService = require('../services/claudeConsoleAccountService')
const bedrockAccountService = require('../services/bedrockAccountService')
const geminiAccountService = require('../services/geminiAccountService')
const openaiAccountService = require('../services/openaiAccountService')
const accountGroupService = require('../services/accountGroupService')
const redis = require('../models/redis')
const { authenticateAdmin } = require('../middleware/auth')
@@ -388,6 +389,7 @@ router.post('/api-keys', authenticateAdmin, async (req, res) => {
claudeAccountId,
claudeConsoleAccountId,
geminiAccountId,
openaiAccountId,
permissions,
concurrencyLimit,
rateLimitWindow,
@@ -483,6 +485,7 @@ router.post('/api-keys', authenticateAdmin, async (req, res) => {
claudeAccountId,
claudeConsoleAccountId,
geminiAccountId,
openaiAccountId,
permissions,
concurrencyLimit,
rateLimitWindow,
@@ -515,6 +518,7 @@ router.post('/api-keys/batch', authenticateAdmin, async (req, res) => {
claudeAccountId,
claudeConsoleAccountId,
geminiAccountId,
openaiAccountId,
permissions,
concurrencyLimit,
rateLimitWindow,
@@ -557,6 +561,7 @@ router.post('/api-keys/batch', authenticateAdmin, async (req, res) => {
claudeAccountId,
claudeConsoleAccountId,
geminiAccountId,
openaiAccountId,
permissions,
concurrencyLimit,
rateLimitWindow,
@@ -626,6 +631,7 @@ router.put('/api-keys/:keyId', authenticateAdmin, async (req, res) => {
claudeAccountId,
claudeConsoleAccountId,
geminiAccountId,
openaiAccountId,
permissions,
enableModelRestriction,
restrictedModels,
@@ -684,12 +690,17 @@ router.put('/api-keys/:keyId', authenticateAdmin, async (req, res) => {
updates.geminiAccountId = geminiAccountId || ''
}
if (openaiAccountId !== undefined) {
// 空字符串表示解绑null或空字符串都设置为空字符串
updates.openaiAccountId = openaiAccountId || ''
}
if (permissions !== undefined) {
// 验证权限值
if (!['claude', 'gemini', 'all'].includes(permissions)) {
if (!['claude', 'gemini', 'openai', 'all'].includes(permissions)) {
return res
.status(400)
.json({ error: 'Invalid permissions value. Must be claude, gemini, or all' })
.json({ error: 'Invalid permissions value. Must be claude, gemini, openai, or all' })
}
updates.permissions = permissions
}
@@ -894,6 +905,11 @@ router.get('/account-groups/:groupId/members', authenticateAdmin, async (req, re
account = await geminiAccountService.getAccount(memberId)
}
// 如果还找不到尝试OpenAI账户
if (!account) {
account = await openaiAccountService.getAccount(memberId)
}
if (account) {
members.push(account)
}
@@ -2396,6 +2412,7 @@ router.get('/dashboard', authenticateAdmin, async (req, res) => {
claudeConsoleAccounts,
geminiAccounts,
bedrockAccountsResult,
openaiAccounts,
todayStats,
systemAverages,
realtimeMetrics
@@ -2406,6 +2423,7 @@ router.get('/dashboard', authenticateAdmin, async (req, res) => {
claudeConsoleAccountService.getAllAccounts(),
geminiAccountService.getAllAccounts(),
bedrockAccountService.getAllAccounts(),
redis.getAllOpenAIAccounts(),
redis.getTodayStats(),
redis.getSystemAverages(),
redis.getRealtimeSystemMetrics()
@@ -2543,6 +2561,39 @@ router.get('/dashboard', authenticateAdmin, async (req, res) => {
(acc) => acc.rateLimitStatus && acc.rateLimitStatus.isRateLimited
).length
// OpenAI账户统计
// 注意OpenAI账户的isActive和schedulable是字符串类型默认值为'true'
const normalOpenAIAccounts = openaiAccounts.filter(
(acc) =>
(acc.isActive === 'true' ||
acc.isActive === true ||
(!acc.isActive && acc.isActive !== 'false' && acc.isActive !== false)) &&
acc.status !== 'blocked' &&
acc.status !== 'unauthorized' &&
acc.schedulable !== 'false' &&
acc.schedulable !== false && // 包括'true'、true和undefined
!(acc.rateLimitStatus && acc.rateLimitStatus.isRateLimited)
).length
const abnormalOpenAIAccounts = openaiAccounts.filter(
(acc) =>
acc.isActive === 'false' ||
acc.isActive === false ||
acc.status === 'blocked' ||
acc.status === 'unauthorized'
).length
const pausedOpenAIAccounts = openaiAccounts.filter(
(acc) =>
(acc.schedulable === 'false' || acc.schedulable === false) &&
(acc.isActive === 'true' ||
acc.isActive === true ||
(!acc.isActive && acc.isActive !== 'false' && acc.isActive !== false)) &&
acc.status !== 'blocked' &&
acc.status !== 'unauthorized'
).length
const rateLimitedOpenAIAccounts = openaiAccounts.filter(
(acc) => acc.rateLimitStatus && acc.rateLimitStatus.isRateLimited
).length
const dashboard = {
overview: {
totalApiKeys: apiKeys.length,
@@ -2552,27 +2603,32 @@ router.get('/dashboard', authenticateAdmin, async (req, res) => {
claudeAccounts.length +
claudeConsoleAccounts.length +
geminiAccounts.length +
bedrockAccounts.length,
bedrockAccounts.length +
openaiAccounts.length,
normalAccounts:
normalClaudeAccounts +
normalClaudeConsoleAccounts +
normalGeminiAccounts +
normalBedrockAccounts,
normalBedrockAccounts +
normalOpenAIAccounts,
abnormalAccounts:
abnormalClaudeAccounts +
abnormalClaudeConsoleAccounts +
abnormalGeminiAccounts +
abnormalBedrockAccounts,
abnormalBedrockAccounts +
abnormalOpenAIAccounts,
pausedAccounts:
pausedClaudeAccounts +
pausedClaudeConsoleAccounts +
pausedGeminiAccounts +
pausedBedrockAccounts,
pausedBedrockAccounts +
pausedOpenAIAccounts,
rateLimitedAccounts:
rateLimitedClaudeAccounts +
rateLimitedClaudeConsoleAccounts +
rateLimitedGeminiAccounts +
rateLimitedBedrockAccounts,
rateLimitedBedrockAccounts +
rateLimitedOpenAIAccounts,
// 各平台详细统计
accountsByPlatform: {
claude: {
@@ -2602,6 +2658,13 @@ router.get('/dashboard', authenticateAdmin, async (req, res) => {
abnormal: abnormalBedrockAccounts,
paused: pausedBedrockAccounts,
rateLimited: rateLimitedBedrockAccounts
},
openai: {
total: openaiAccounts.length,
normal: normalOpenAIAccounts,
abnormal: abnormalOpenAIAccounts,
paused: pausedOpenAIAccounts,
rateLimited: rateLimitedOpenAIAccounts
}
},
// 保留旧字段以兼容
@@ -2609,7 +2672,8 @@ router.get('/dashboard', authenticateAdmin, async (req, res) => {
normalClaudeAccounts +
normalClaudeConsoleAccounts +
normalGeminiAccounts +
normalBedrockAccounts,
normalBedrockAccounts +
normalOpenAIAccounts,
totalClaudeAccounts: claudeAccounts.length + claudeConsoleAccounts.length,
activeClaudeAccounts: normalClaudeAccounts + normalClaudeConsoleAccounts,
rateLimitedClaudeAccounts: rateLimitedClaudeAccounts + rateLimitedClaudeConsoleAccounts,
@@ -4513,7 +4577,7 @@ router.post('/openai-accounts/exchange-code', authenticateAdmin, async (req, res
// 获取所有 OpenAI 账户
router.get('/openai-accounts', authenticateAdmin, async (req, res) => {
try {
const accounts = await redis.getAllOpenAIAccounts()
const accounts = await openaiAccountService.getAllAccounts()
logger.info(`获取 OpenAI 账户列表: ${accounts.length} 个账户`)
@@ -4553,60 +4617,41 @@ router.post('/openai-accounts', authenticateAdmin, async (req, res) => {
message: '账户名称不能为空'
})
}
const id = uuidv4()
// 创建账户数据
const accountData = {
id,
name,
description: description || '',
platform: 'openai',
accountType: accountType || 'shared',
groupId: groupId || null,
dedicatedApiKeys: dedicatedApiKeys || [],
priority: priority || 50,
rateLimitDuration: rateLimitDuration || 60,
enabled: true,
idToken: claudeAccountService._encryptSensitiveData(openaiOauth.idToken),
accessToken: claudeAccountService._encryptSensitiveData(openaiOauth.accessToken),
refreshToken: claudeAccountService._encryptSensitiveData(openaiOauth.refreshToken),
accountId: accountInfo?.accountId || '',
expiresAt: (Math.floor(Date.now() / 1000) + openaiOauth.expires_in) * 1000,
chatgptUserId: accountInfo?.chatgptUserId || '',
organizationId: accountInfo?.organizationId || '',
organizationRole: accountInfo?.organizationRole || '',
organizationTitle: accountInfo?.organizationTitle || '',
planType: accountInfo?.planType || '',
email: claudeAccountService._encryptSensitiveData(accountInfo?.email || ''),
emailVerified: accountInfo?.emailVerified || false,
openaiOauth: openaiOauth || {},
accountInfo: accountInfo || {},
proxy: proxy?.enabled
? {
type: proxy.type,
host: proxy.host,
port: proxy.port,
username: proxy.username || null,
password: proxy.password || null
}
: null,
isActive: true,
status: 'active',
lastRefresh: new Date().toISOString(),
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString()
schedulable: true
}
// 存储代理配置(如果提供)
if (proxy?.enabled) {
accountData.proxy = {
type: proxy.type,
host: proxy.host,
port: proxy.port,
username: proxy.username || null,
password: proxy.password || null
}
// 创建账户
const createdAccount = await openaiAccountService.createAccount(accountData)
// 如果是分组类型,添加到分组
if (accountType === 'group' && groupId) {
await accountGroupService.addAccountToGroup(createdAccount.id, groupId, 'openai')
}
// 保存到 Redis
const accountId = await redis.setOpenAiAccount(id, accountData)
logger.success(`✅ 创建 OpenAI 账户成功: ${name} (ID: ${accountId})`)
logger.success(`✅ 创建 OpenAI 账户成功: ${name} (ID: ${createdAccount.id})`)
return res.json({
success: true,
data: {
id: accountId,
...accountData
}
data: createdAccount
})
} catch (error) {
logger.error('创建 OpenAI 账户失败:', error)
@@ -4619,19 +4664,100 @@ router.post('/openai-accounts', authenticateAdmin, async (req, res) => {
})
// 更新 OpenAI 账户
router.put('/openai-accounts/:id', authenticateAdmin, async (req, res) =>
//TODO:
res.json({
success: true
})
)
router.put('/openai-accounts/:id', authenticateAdmin, async (req, res) => {
try {
const { id } = req.params
const updates = req.body
// 验证accountType的有效性
if (updates.accountType && !['shared', 'dedicated', 'group'].includes(updates.accountType)) {
return res
.status(400)
.json({ error: 'Invalid account type. Must be "shared", "dedicated" or "group"' })
}
// 如果更新为分组类型验证groupId
if (updates.accountType === 'group' && !updates.groupId) {
return res.status(400).json({ error: 'Group ID is required for group type accounts' })
}
// 获取账户当前信息以处理分组变更
const currentAccount = await openaiAccountService.getAccount(id)
if (!currentAccount) {
return res.status(404).json({ error: 'Account not found' })
}
// 处理分组的变更
if (updates.accountType !== undefined) {
// 如果之前是分组类型,需要从原分组中移除
if (currentAccount.accountType === 'group') {
const oldGroup = await accountGroupService.getAccountGroup(id)
if (oldGroup) {
await accountGroupService.removeAccountFromGroup(id, oldGroup.id)
}
}
// 如果新类型是分组,添加到新分组
if (updates.accountType === 'group' && updates.groupId) {
await accountGroupService.addAccountToGroup(id, updates.groupId, 'openai')
}
}
// 准备更新数据
const updateData = { ...updates }
// 处理敏感数据加密
if (updates.openaiOauth) {
updateData.openaiOauth = updates.openaiOauth
if (updates.openaiOauth.idToken) {
updateData.idToken = updates.openaiOauth.idToken
}
if (updates.openaiOauth.accessToken) {
updateData.accessToken = updates.openaiOauth.accessToken
}
if (updates.openaiOauth.refreshToken) {
updateData.refreshToken = updates.openaiOauth.refreshToken
}
if (updates.openaiOauth.expires_in) {
updateData.expiresAt = new Date(
Date.now() + updates.openaiOauth.expires_in * 1000
).toISOString()
}
}
// 更新账户信息
if (updates.accountInfo) {
updateData.accountId = updates.accountInfo.accountId || currentAccount.accountId
updateData.chatgptUserId = updates.accountInfo.chatgptUserId || currentAccount.chatgptUserId
updateData.organizationId =
updates.accountInfo.organizationId || currentAccount.organizationId
updateData.organizationRole =
updates.accountInfo.organizationRole || currentAccount.organizationRole
updateData.organizationTitle =
updates.accountInfo.organizationTitle || currentAccount.organizationTitle
updateData.planType = updates.accountInfo.planType || currentAccount.planType
updateData.email = updates.accountInfo.email || currentAccount.email
updateData.emailVerified =
updates.accountInfo.emailVerified !== undefined
? updates.accountInfo.emailVerified
: currentAccount.emailVerified
}
const updatedAccount = await openaiAccountService.updateAccount(id, updateData)
logger.success(`📝 Admin updated OpenAI account: ${id}`)
return res.json({ success: true, data: updatedAccount })
} catch (error) {
logger.error('❌ Failed to update OpenAI account:', error)
return res.status(500).json({ error: 'Failed to update account', message: error.message })
}
})
// 删除 OpenAI 账户
router.delete('/openai-accounts/:id', authenticateAdmin, async (req, res) => {
try {
const { id } = req.params
const account = await redis.getOpenAiAccount(id)
const account = await openaiAccountService.getAccount(id)
if (!account) {
return res.status(404).json({
success: false,
@@ -4639,7 +4765,15 @@ router.delete('/openai-accounts/:id', authenticateAdmin, async (req, res) => {
})
}
await redis.deleteOpenAiAccount(id)
// 如果账户在分组中,从分组中移除
if (account.accountType === 'group') {
const group = await accountGroupService.getAccountGroup(id)
if (group) {
await accountGroupService.removeAccountFromGroup(id, group.id)
}
}
await openaiAccountService.deleteAccount(id)
logger.success(`✅ 删除 OpenAI 账户成功: ${account.name} (ID: ${id})`)
@@ -4695,4 +4829,30 @@ router.put('/openai-accounts/:id/toggle', authenticateAdmin, async (req, res) =>
}
})
// 切换 OpenAI 账户调度状态
router.put(
'/openai-accounts/:accountId/toggle-schedulable',
authenticateAdmin,
async (req, res) => {
try {
const { accountId } = req.params
const result = await openaiAccountService.toggleSchedulable(accountId)
return res.json({
success: result.success,
schedulable: result.schedulable,
message: result.schedulable ? '已启用调度' : '已禁用调度'
})
} catch (error) {
logger.error('切换 OpenAI 账户调度状态失败:', error)
return res.status(500).json({
success: false,
message: '切换调度状态失败',
error: error.message
})
}
}
)
module.exports = router

View File

@@ -279,6 +279,9 @@ router.post('/api/user-stats', async (req, res) => {
let currentWindowRequests = 0
let currentWindowTokens = 0
let currentDailyCost = 0
let windowStartTime = null
let windowEndTime = null
let windowRemainingSeconds = null
try {
// 获取当前时间窗口的请求次数和Token使用量
@@ -286,9 +289,32 @@ router.post('/api/user-stats', async (req, res) => {
const client = redis.getClientSafe()
const requestCountKey = `rate_limit:requests:${keyId}`
const tokenCountKey = `rate_limit:tokens:${keyId}`
const windowStartKey = `rate_limit:window_start:${keyId}`
currentWindowRequests = parseInt((await client.get(requestCountKey)) || '0')
currentWindowTokens = parseInt((await client.get(tokenCountKey)) || '0')
// 获取窗口开始时间和计算剩余时间
const windowStart = await client.get(windowStartKey)
if (windowStart) {
const now = Date.now()
windowStartTime = parseInt(windowStart)
const windowDuration = fullKeyData.rateLimitWindow * 60 * 1000 // 转换为毫秒
windowEndTime = windowStartTime + windowDuration
// 如果窗口还有效
if (now < windowEndTime) {
windowRemainingSeconds = Math.max(0, Math.floor((windowEndTime - now) / 1000))
} else {
// 窗口已过期,下次请求会重置
windowStartTime = null
windowEndTime = null
windowRemainingSeconds = 0
// 重置计数为0因为窗口已过期
currentWindowRequests = 0
currentWindowTokens = 0
}
}
}
// 获取当日费用
@@ -334,7 +360,11 @@ router.post('/api/user-stats', async (req, res) => {
// 当前使用量
currentWindowRequests,
currentWindowTokens,
currentDailyCost
currentDailyCost,
// 时间窗口信息
windowStartTime,
windowEndTime,
windowRemainingSeconds
},
// 绑定的账户信息只显示ID不显示敏感信息

View File

@@ -3,33 +3,51 @@ const axios = require('axios')
const router = express.Router()
const logger = require('../utils/logger')
const { authenticateApiKey } = require('../middleware/auth')
const redis = require('../models/redis')
const claudeAccountService = require('../services/claudeAccountService')
const unifiedOpenAIScheduler = require('../services/unifiedOpenAIScheduler')
const openaiAccountService = require('../services/openaiAccountService')
const apiKeyService = require('../services/apiKeyService')
const crypto = require('crypto')
// 选择一个可用的 OpenAI 账户,并返回解密后的 accessToken
async function getOpenAIAuthToken() {
// 使用统一调度器选择 OpenAI 账户
async function getOpenAIAuthToken(apiKeyData, sessionId = null, requestedModel = null) {
try {
const accounts = await redis.getAllOpenAIAccounts()
if (!accounts || accounts.length === 0) {
throw new Error('No OpenAI accounts found in Redis')
// 生成会话哈希如果有会话ID
const sessionHash = sessionId
? crypto.createHash('sha256').update(sessionId).digest('hex')
: null
// 使用统一调度器选择账户
const result = await unifiedOpenAIScheduler.selectAccountForApiKey(
apiKeyData,
sessionHash,
requestedModel
)
if (!result || !result.accountId) {
throw new Error('No available OpenAI account found')
}
// 简单选择策略:选择第一个启用并活跃的账户
const candidate =
accounts.find((a) => String(a.enabled) === 'true' && String(a.isActive) === 'true') ||
accounts[0]
if (!candidate || !candidate.accessToken) {
throw new Error('No valid OpenAI account with accessToken')
// 获取账户详情
const account = await openaiAccountService.getAccount(result.accountId)
if (!account || !account.accessToken) {
throw new Error(`OpenAI account ${result.accountId} has no valid accessToken`)
}
const accessToken = claudeAccountService._decryptSensitiveData(candidate.accessToken)
// 解密 accessToken
const accessToken = claudeAccountService._decryptSensitiveData(account.accessToken)
if (!accessToken) {
throw new Error('Failed to decrypt OpenAI accessToken')
}
return { accessToken, accountId: candidate.accountId || 'unknown' }
logger.info(`Selected OpenAI account: ${account.name} (${result.accountId})`)
return {
accessToken,
accountId: result.accountId,
accountName: account.name
}
} catch (error) {
logger.error('Failed to get OpenAI auth token from Redis:', error)
logger.error('Failed to get OpenAI auth token:', error)
throw error
}
}
@@ -37,7 +55,27 @@ async function getOpenAIAuthToken() {
router.post('/responses', authenticateApiKey, async (req, res) => {
let upstream = null
try {
const { accessToken, accountId } = await getOpenAIAuthToken()
// 从中间件获取 API Key 数据
const apiKeyData = req.apiKeyData || {}
// 从请求头或请求体中提取会话 ID
const sessionId =
req.headers['session_id'] ||
req.headers['x-session-id'] ||
req.body?.session_id ||
req.body?.conversation_id ||
null
// 从请求体中提取模型和流式标志
const requestedModel = req.body?.model || null
const isStream = req.body?.stream !== false // 默认为流式(兼容现有行为)
// 使用调度器选择账户
const { accessToken, accountId } = await getOpenAIAuthToken(
apiKeyData,
sessionId,
requestedModel
)
// 基于白名单构造上游所需的请求头,确保键为小写且值受控
const incoming = req.headers || {}
@@ -54,21 +92,39 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
headers['authorization'] = `Bearer ${accessToken}`
headers['chatgpt-account-id'] = accountId
headers['host'] = 'chatgpt.com'
headers['accept'] = 'text/event-stream'
headers['accept'] = isStream ? 'text/event-stream' : 'application/json'
headers['content-type'] = 'application/json'
req.body['store'] = false
// 使用流式转发,保持与上游一致
upstream = await axios.post('https://chatgpt.com/backend-api/codex/responses', req.body, {
headers,
responseType: 'stream',
timeout: 60000,
validateStatus: () => true
})
// 根据 stream 参数决定请求类型
if (isStream) {
// 流式请求
upstream = await axios.post('https://chatgpt.com/backend-api/codex/responses', req.body, {
headers,
responseType: 'stream',
timeout: 60000,
validateStatus: () => true
})
} else {
// 非流式请求
upstream = await axios.post('https://chatgpt.com/backend-api/codex/responses', req.body, {
headers,
timeout: 60000,
validateStatus: () => true
})
}
res.status(upstream.status)
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no')
if (isStream) {
// 流式响应头
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no')
} else {
// 非流式响应头
res.setHeader('Content-Type', 'application/json')
}
// 透传关键诊断头,避免传递不安全或与传输相关的头
const passThroughHeaderKeys = ['openai-version', 'x-request-id', 'openai-processing-ms']
@@ -79,11 +135,170 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
}
}
// 立即刷新响应头,开始 SSE
if (typeof res.flushHeaders === 'function') {
res.flushHeaders()
if (isStream) {
// 立即刷新响应头,开始 SSE
if (typeof res.flushHeaders === 'function') {
res.flushHeaders()
}
}
// 处理响应并捕获 usage 数据和真实的 model
let buffer = ''
let usageData = null
let actualModel = null
let usageReported = false
if (!isStream) {
// 非流式响应处理
try {
logger.info(`📄 Processing OpenAI non-stream response for model: ${requestedModel}`)
// 直接获取完整响应
const responseData = upstream.data
// 从响应中获取实际的 model 和 usage
actualModel = responseData.model || requestedModel || 'gpt-4'
usageData = responseData.usage
logger.debug(`📊 Non-stream response - Model: ${actualModel}, Usage:`, usageData)
// 记录使用统计
if (usageData) {
const inputTokens = usageData.input_tokens || usageData.prompt_tokens || 0
const outputTokens = usageData.output_tokens || usageData.completion_tokens || 0
const cacheCreateTokens = usageData.input_tokens_details?.cache_creation_tokens || 0
const cacheReadTokens = usageData.input_tokens_details?.cached_tokens || 0
await apiKeyService.recordUsage(
apiKeyData.id,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens,
actualModel,
accountId
)
logger.info(
`📊 Recorded OpenAI non-stream usage - Input: ${inputTokens}, Output: ${outputTokens}, Total: ${usageData.total_tokens || inputTokens + outputTokens}, Model: ${actualModel}`
)
}
// 返回响应
res.json(responseData)
return
} catch (error) {
logger.error('Failed to process non-stream response:', error)
if (!res.headersSent) {
res.status(500).json({ error: { message: 'Failed to process response' } })
}
return
}
}
// 解析 SSE 事件以捕获 usage 数据和 model
const parseSSEForUsage = (data) => {
const lines = data.split('\n')
for (const line of lines) {
if (line.startsWith('event: response.completed')) {
// 下一行应该是数据
continue
}
if (line.startsWith('data: ')) {
try {
const jsonStr = line.slice(6) // 移除 'data: ' 前缀
const eventData = JSON.parse(jsonStr)
// 检查是否是 response.completed 事件
if (eventData.type === 'response.completed' && eventData.response) {
// 从响应中获取真实的 model
if (eventData.response.model) {
actualModel = eventData.response.model
logger.debug(`📊 Captured actual model: ${actualModel}`)
}
// 获取 usage 数据
if (eventData.response.usage) {
usageData = eventData.response.usage
logger.debug('📊 Captured OpenAI usage data:', usageData)
}
}
} catch (e) {
// 忽略解析错误
}
}
}
}
upstream.data.on('data', (chunk) => {
try {
const chunkStr = chunk.toString()
// 转发数据给客户端
if (!res.headersSent) {
res.write(chunk)
}
// 同时解析数据以捕获 usage 信息
buffer += chunkStr
// 处理完整的 SSE 事件
if (buffer.includes('\n\n')) {
const events = buffer.split('\n\n')
buffer = events.pop() || '' // 保留最后一个可能不完整的事件
for (const event of events) {
if (event.trim()) {
parseSSEForUsage(event)
}
}
}
} catch (error) {
logger.error('Error processing OpenAI stream chunk:', error)
}
})
upstream.data.on('end', async () => {
// 处理剩余的 buffer
if (buffer.trim()) {
parseSSEForUsage(buffer)
}
// 记录使用统计
if (!usageReported && usageData) {
try {
const inputTokens = usageData.input_tokens || 0
const outputTokens = usageData.output_tokens || 0
const cacheCreateTokens = usageData.input_tokens_details?.cache_creation_tokens || 0
const cacheReadTokens = usageData.input_tokens_details?.cached_tokens || 0
// 使用响应中的真实 model如果没有则使用请求中的 model最后回退到默认值
const modelToRecord = actualModel || requestedModel || 'gpt-4'
await apiKeyService.recordUsage(
apiKeyData.id,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens,
modelToRecord,
accountId
)
logger.info(
`📊 Recorded OpenAI usage - Input: ${inputTokens}, Output: ${outputTokens}, Total: ${usageData.total_tokens || inputTokens + outputTokens}, Model: ${modelToRecord} (actual: ${actualModel}, requested: ${requestedModel})`
)
usageReported = true
} catch (error) {
logger.error('Failed to record OpenAI usage:', error)
}
}
res.end()
})
upstream.data.on('error', (err) => {
logger.error('Upstream stream error:', err)
if (!res.headersSent) {
@@ -93,8 +308,6 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
}
})
upstream.data.pipe(res)
// 客户端断开时清理上游流
const cleanup = () => {
try {
@@ -116,4 +329,65 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
}
})
// 使用情况统计端点
router.get('/usage', authenticateApiKey, async (req, res) => {
try {
const { usage } = req.apiKey
res.json({
object: 'usage',
total_tokens: usage.total.tokens,
total_requests: usage.total.requests,
daily_tokens: usage.daily.tokens,
daily_requests: usage.daily.requests,
monthly_tokens: usage.monthly.tokens,
monthly_requests: usage.monthly.requests
})
} catch (error) {
logger.error('Failed to get usage stats:', error)
res.status(500).json({
error: {
message: 'Failed to retrieve usage statistics',
type: 'api_error'
}
})
}
})
// API Key 信息端点
router.get('/key-info', authenticateApiKey, async (req, res) => {
try {
const keyData = req.apiKey
res.json({
id: keyData.id,
name: keyData.name,
description: keyData.description,
permissions: keyData.permissions || 'all',
token_limit: keyData.tokenLimit,
tokens_used: keyData.usage.total.tokens,
tokens_remaining:
keyData.tokenLimit > 0
? Math.max(0, keyData.tokenLimit - keyData.usage.total.tokens)
: null,
rate_limit: {
window: keyData.rateLimitWindow,
requests: keyData.rateLimitRequests
},
usage: {
total: keyData.usage.total,
daily: keyData.usage.daily,
monthly: keyData.usage.monthly
}
})
} catch (error) {
logger.error('Failed to get key info:', error)
res.status(500).json({
error: {
message: 'Failed to retrieve API key information',
type: 'api_error'
}
})
}
})
module.exports = router