feat: claude账户支持使用统一的客户端标识

This commit is contained in:
shaw
2025-09-08 11:35:44 +08:00
parent a9a560da67
commit e824858d60
13 changed files with 1033 additions and 41 deletions

View File

@@ -87,7 +87,8 @@ async function getOpenAIAuthToken(apiKeyData, sessionId = null, requestedModel =
}
}
router.post('/responses', authenticateApiKey, async (req, res) => {
// 主处理函数,供两个路由共享
const handleResponses = async (req, res) => {
let upstream = null
try {
// 从中间件获取 API Key 数据
@@ -205,6 +206,96 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
axiosConfig
)
}
// 处理 429 限流错误
if (upstream.status === 429) {
logger.warn(`🚫 Rate limit detected for OpenAI account ${accountId} (Codex API)`)
// 解析响应体中的限流信息
let resetsInSeconds = null
let errorData = null
try {
// 对于429错误无论是否是流式请求响应都会是完整的JSON错误对象
if (isStream && upstream.data) {
// 流式响应需要先收集数据
const chunks = []
await new Promise((resolve, reject) => {
upstream.data.on('data', (chunk) => chunks.push(chunk))
upstream.data.on('end', resolve)
upstream.data.on('error', reject)
// 设置超时防止无限等待
setTimeout(resolve, 5000)
})
const fullResponse = Buffer.concat(chunks).toString()
try {
errorData = JSON.parse(fullResponse)
} catch (e) {
logger.error('Failed to parse 429 error response:', e)
logger.debug('Raw response:', fullResponse)
}
} else {
// 非流式响应直接使用data
errorData = upstream.data
}
// 提取重置时间
if (errorData && errorData.error && errorData.error.resets_in_seconds) {
resetsInSeconds = errorData.error.resets_in_seconds
logger.info(
`🕐 Codex rate limit will reset in ${resetsInSeconds} seconds (${Math.ceil(resetsInSeconds / 60)} minutes / ${Math.ceil(resetsInSeconds / 3600)} hours)`
)
} else {
logger.warn(
'⚠️ Could not extract resets_in_seconds from 429 response, using default 60 minutes'
)
}
} catch (e) {
logger.error('⚠️ Failed to parse rate limit error:', e)
}
// 标记账户为限流状态
await unifiedOpenAIScheduler.markAccountRateLimited(
accountId,
'openai',
sessionId ? crypto.createHash('sha256').update(sessionId).digest('hex') : null,
resetsInSeconds
)
// 返回错误响应给客户端
const errorResponse = errorData || {
error: {
type: 'usage_limit_reached',
message: 'The usage limit has been reached',
resets_in_seconds: resetsInSeconds
}
}
if (isStream) {
// 流式响应也需要设置正确的状态码
res.status(429)
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.write(`data: ${JSON.stringify(errorResponse)}\n\n`)
res.end()
} else {
res.status(429).json(errorResponse)
}
return
} else if (upstream.status === 200 || upstream.status === 201) {
// 请求成功,检查并移除限流状态
const isRateLimited = await unifiedOpenAIScheduler.isAccountRateLimited(accountId)
if (isRateLimited) {
logger.info(
`✅ Removing rate limit for OpenAI account ${accountId} after successful request`
)
await unifiedOpenAIScheduler.removeAccountRateLimit(accountId, 'openai')
}
}
res.status(upstream.status)
if (isStream) {
@@ -239,6 +330,8 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
let usageData = null
let actualModel = null
let usageReported = false
let rateLimitDetected = false
let rateLimitResetsInSeconds = null
if (!isStream) {
// 非流式响应处理
@@ -317,6 +410,17 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
logger.debug('📊 Captured OpenAI usage data:', usageData)
}
}
// 检查是否有限流错误
if (eventData.error && eventData.error.type === 'usage_limit_reached') {
rateLimitDetected = true
if (eventData.error.resets_in_seconds) {
rateLimitResetsInSeconds = eventData.error.resets_in_seconds
logger.warn(
`🚫 Rate limit detected in stream, resets in ${rateLimitResetsInSeconds} seconds`
)
}
}
} catch (e) {
// 忽略解析错误
}
@@ -388,6 +492,26 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
}
}
// 如果在流式响应中检测到限流
if (rateLimitDetected) {
logger.warn(`🚫 Processing rate limit for OpenAI account ${accountId} from stream`)
await unifiedOpenAIScheduler.markAccountRateLimited(
accountId,
'openai',
sessionId ? crypto.createHash('sha256').update(sessionId).digest('hex') : null,
rateLimitResetsInSeconds
)
} else if (upstream.status === 200) {
// 流式请求成功,检查并移除限流状态
const isRateLimited = await unifiedOpenAIScheduler.isAccountRateLimited(accountId)
if (isRateLimited) {
logger.info(
`✅ Removing rate limit for OpenAI account ${accountId} after successful stream`
)
await unifiedOpenAIScheduler.removeAccountRateLimit(accountId, 'openai')
}
}
res.end()
})
@@ -419,7 +543,11 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
res.status(status).json({ error: { message } })
}
}
})
}
// 注册两个路由路径,都使用相同的处理函数
router.post('/responses', authenticateApiKey, handleResponses)
router.post('/v1/responses', authenticateApiKey, handleResponses)
// 使用情况统计端点
router.get('/usage', authenticateApiKey, async (req, res) => {