Merge branch 'main' into dev

This commit is contained in:
shaw
2025-12-21 21:35:16 +08:00
40 changed files with 7073 additions and 349 deletions

View File

@@ -945,6 +945,30 @@ async function calculateKeyStats(keyId, timeRange, startDate, endDate) {
allTimeCost = parseFloat((await client.get(totalCostKey)) || '0')
}
// 🔧 FIX: 对于 "全部时间" 时间范围,直接使用 allTimeCost
// 因为 usage:*:model:daily:* 键有 30 天 TTL旧数据已经过期
if (timeRange === 'all' && allTimeCost > 0) {
logger.debug(`📊 使用 allTimeCost 计算 timeRange='all': ${allTimeCost}`)
return {
requests: 0, // 旧数据详情不可用
tokens: 0,
inputTokens: 0,
outputTokens: 0,
cacheCreateTokens: 0,
cacheReadTokens: 0,
cost: allTimeCost,
formattedCost: CostCalculator.formatCost(allTimeCost),
// 实时限制数据(始终返回,不受时间范围影响)
dailyCost,
currentWindowCost,
windowRemainingSeconds,
windowStartTime,
windowEndTime,
allTimeCost
}
}
// 只在启用了窗口限制时查询窗口数据
if (rateLimitWindow > 0) {
const costCountKey = `rate_limit:cost:${keyId}`
@@ -1006,12 +1030,10 @@ async function calculateKeyStats(keyId, timeRange, startDate, endDate) {
const modelStatsMap = new Map()
let totalRequests = 0
// 用于去重:统计数据,避免与数据重复
// 用于去重:统计数据,避免与数据重复
const dailyKeyPattern = /usage:.+:model:daily:(.+):\d{4}-\d{2}-\d{2}$/
const monthlyKeyPattern = /usage:.+:model:monthly:(.+):\d{4}-\d{2}$/
// 检查是否有日数据
const hasDailyData = uniqueKeys.some((key) => dailyKeyPattern.test(key))
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart(2, '0')}`
for (let i = 0; i < results.length; i++) {
const [err, data] = results[i]
@@ -1038,8 +1060,12 @@ async function calculateKeyStats(keyId, timeRange, startDate, endDate) {
continue
}
// 如果有日数据,则跳过月数据以避免重复
if (hasDailyData && isMonthly) {
// 跳过当前月的月数据
if (isMonthly && key.includes(`:${currentMonth}`)) {
continue
}
// 跳过非当前月的日数据
if (!isMonthly && !key.includes(`:${currentMonth}-`)) {
continue
}

View File

@@ -9,6 +9,7 @@ const router = express.Router()
const claudeAccountService = require('../../services/claudeAccountService')
const claudeRelayService = require('../../services/claudeRelayService')
const accountGroupService = require('../../services/accountGroupService')
const accountTestSchedulerService = require('../../services/accountTestSchedulerService')
const apiKeyService = require('../../services/apiKeyService')
const redis = require('../../models/redis')
const { authenticateAdmin } = require('../../middleware/auth')
@@ -277,7 +278,7 @@ router.post('/claude-accounts/oauth-with-cookie', authenticateAdmin, async (req,
logger.info('🍪 Starting Cookie-based OAuth authorization', {
sessionKeyLength: trimmedSessionKey.length,
sessionKeyPrefix: trimmedSessionKey.substring(0, 10) + '...',
sessionKeyPrefix: `${trimmedSessionKey.substring(0, 10)}...`,
hasProxy: !!proxy
})
@@ -326,7 +327,7 @@ router.post('/claude-accounts/setup-token-with-cookie', authenticateAdmin, async
logger.info('🍪 Starting Cookie-based Setup Token authorization', {
sessionKeyLength: trimmedSessionKey.length,
sessionKeyPrefix: trimmedSessionKey.substring(0, 10) + '...',
sessionKeyPrefix: `${trimmedSessionKey.substring(0, 10)}...`,
hasProxy: !!proxy
})
@@ -583,7 +584,8 @@ router.post('/claude-accounts', authenticateAdmin, async (req, res) => {
useUnifiedClientId,
unifiedClientId,
expiresAt,
extInfo
extInfo,
maxConcurrency
} = req.body
if (!name) {
@@ -628,7 +630,8 @@ router.post('/claude-accounts', authenticateAdmin, async (req, res) => {
useUnifiedClientId: useUnifiedClientId === true, // 默认为false
unifiedClientId: unifiedClientId || '', // 统一的客户端标识
expiresAt: expiresAt || null, // 账户订阅到期时间
extInfo: extInfo || null
extInfo: extInfo || null,
maxConcurrency: maxConcurrency || 0 // 账户级串行队列0=使用全局配置,>0=强制启用
})
// 如果是分组类型,将账户添加到分组
@@ -903,4 +906,219 @@ router.post('/claude-accounts/:accountId/test', authenticateAdmin, async (req, r
}
})
// ============================================================================
// 账户定时测试相关端点
// ============================================================================
// 获取账户测试历史
router.get('/claude-accounts/:accountId/test-history', authenticateAdmin, async (req, res) => {
const { accountId } = req.params
try {
const history = await redis.getAccountTestHistory(accountId, 'claude')
return res.json({
success: true,
data: {
accountId,
platform: 'claude',
history
}
})
} catch (error) {
logger.error(`❌ Failed to get test history for account ${accountId}:`, error)
return res.status(500).json({
error: 'Failed to get test history',
message: error.message
})
}
})
// 获取账户定时测试配置
router.get('/claude-accounts/:accountId/test-config', authenticateAdmin, async (req, res) => {
const { accountId } = req.params
try {
const testConfig = await redis.getAccountTestConfig(accountId, 'claude')
return res.json({
success: true,
data: {
accountId,
platform: 'claude',
config: testConfig || {
enabled: false,
cronExpression: '0 8 * * *',
model: 'claude-sonnet-4-5-20250929'
}
}
})
} catch (error) {
logger.error(`❌ Failed to get test config for account ${accountId}:`, error)
return res.status(500).json({
error: 'Failed to get test config',
message: error.message
})
}
})
// 设置账户定时测试配置
router.put('/claude-accounts/:accountId/test-config', authenticateAdmin, async (req, res) => {
const { accountId } = req.params
const { enabled, cronExpression, model } = req.body
try {
// 验证 enabled 参数
if (typeof enabled !== 'boolean') {
return res.status(400).json({
error: 'Invalid parameter',
message: 'enabled must be a boolean'
})
}
// 验证 cronExpression 参数
if (!cronExpression || typeof cronExpression !== 'string') {
return res.status(400).json({
error: 'Invalid parameter',
message: 'cronExpression is required and must be a string'
})
}
// 限制 cronExpression 长度防止 DoS
const MAX_CRON_LENGTH = 100
if (cronExpression.length > MAX_CRON_LENGTH) {
return res.status(400).json({
error: 'Invalid parameter',
message: `cronExpression too long (max ${MAX_CRON_LENGTH} characters)`
})
}
// 使用 service 的方法验证 cron 表达式
if (!accountTestSchedulerService.validateCronExpression(cronExpression)) {
return res.status(400).json({
error: 'Invalid parameter',
message: `Invalid cron expression: ${cronExpression}. Format: "minute hour day month weekday" (e.g., "0 8 * * *" for daily at 8:00)`
})
}
// 验证模型参数
const testModel = model || 'claude-sonnet-4-5-20250929'
if (typeof testModel !== 'string' || testModel.length > 256) {
return res.status(400).json({
error: 'Invalid parameter',
message: 'model must be a valid string (max 256 characters)'
})
}
// 检查账户是否存在
const account = await claudeAccountService.getAccount(accountId)
if (!account) {
return res.status(404).json({
error: 'Account not found',
message: `Claude account ${accountId} not found`
})
}
// 保存配置
await redis.saveAccountTestConfig(accountId, 'claude', {
enabled,
cronExpression,
model: testModel
})
logger.success(
`📝 Updated test config for Claude account ${accountId}: enabled=${enabled}, cronExpression=${cronExpression}, model=${testModel}`
)
return res.json({
success: true,
message: 'Test config updated successfully',
data: {
accountId,
platform: 'claude',
config: { enabled, cronExpression, model: testModel }
}
})
} catch (error) {
logger.error(`❌ Failed to update test config for account ${accountId}:`, error)
return res.status(500).json({
error: 'Failed to update test config',
message: error.message
})
}
})
// 手动触发账户测试非流式返回JSON结果
router.post('/claude-accounts/:accountId/test-sync', authenticateAdmin, async (req, res) => {
const { accountId } = req.params
try {
// 检查账户是否存在
const account = await claudeAccountService.getAccount(accountId)
if (!account) {
return res.status(404).json({
error: 'Account not found',
message: `Claude account ${accountId} not found`
})
}
logger.info(`🧪 Manual sync test triggered for Claude account: ${accountId}`)
// 执行测试
const testResult = await claudeRelayService.testAccountConnectionSync(accountId)
// 保存测试结果到历史
await redis.saveAccountTestResult(accountId, 'claude', testResult)
await redis.setAccountLastTestTime(accountId, 'claude')
return res.json({
success: true,
data: {
accountId,
platform: 'claude',
result: testResult
}
})
} catch (error) {
logger.error(`❌ Failed to run sync test for account ${accountId}:`, error)
return res.status(500).json({
error: 'Failed to run test',
message: error.message
})
}
})
// 批量获取多个账户的测试历史
router.post('/claude-accounts/batch-test-history', authenticateAdmin, async (req, res) => {
const { accountIds } = req.body
try {
if (!Array.isArray(accountIds) || accountIds.length === 0) {
return res.status(400).json({
error: 'Invalid parameter',
message: 'accountIds must be a non-empty array'
})
}
// 限制批量查询数量
const limitedIds = accountIds.slice(0, 100)
const accounts = limitedIds.map((accountId) => ({
accountId,
platform: 'claude'
}))
const historyMap = await redis.getAccountsTestHistory(accounts)
return res.json({
success: true,
data: historyMap
})
} catch (error) {
logger.error('❌ Failed to get batch test history:', error)
return res.status(500).json({
error: 'Failed to get batch test history',
message: error.message
})
}
})
module.exports = router

View File

@@ -40,7 +40,14 @@ router.put('/claude-relay-config', authenticateAdmin, async (req, res) => {
claudeCodeOnlyEnabled,
globalSessionBindingEnabled,
sessionBindingErrorMessage,
sessionBindingTtlDays
sessionBindingTtlDays,
userMessageQueueEnabled,
userMessageQueueDelayMs,
userMessageQueueTimeoutMs,
concurrentRequestQueueEnabled,
concurrentRequestQueueMaxSize,
concurrentRequestQueueMaxSizeMultiplier,
concurrentRequestQueueTimeoutMs
} = req.body
// 验证输入
@@ -78,15 +85,117 @@ router.put('/claude-relay-config', authenticateAdmin, async (req, res) => {
}
}
// 验证用户消息队列配置
if (userMessageQueueEnabled !== undefined && typeof userMessageQueueEnabled !== 'boolean') {
return res.status(400).json({ error: 'userMessageQueueEnabled must be a boolean' })
}
if (userMessageQueueDelayMs !== undefined) {
if (
typeof userMessageQueueDelayMs !== 'number' ||
userMessageQueueDelayMs < 0 ||
userMessageQueueDelayMs > 10000
) {
return res
.status(400)
.json({ error: 'userMessageQueueDelayMs must be a number between 0 and 10000' })
}
}
if (userMessageQueueTimeoutMs !== undefined) {
if (
typeof userMessageQueueTimeoutMs !== 'number' ||
userMessageQueueTimeoutMs < 1000 ||
userMessageQueueTimeoutMs > 300000
) {
return res
.status(400)
.json({ error: 'userMessageQueueTimeoutMs must be a number between 1000 and 300000' })
}
}
// 验证并发请求排队配置
if (
concurrentRequestQueueEnabled !== undefined &&
typeof concurrentRequestQueueEnabled !== 'boolean'
) {
return res.status(400).json({ error: 'concurrentRequestQueueEnabled must be a boolean' })
}
if (concurrentRequestQueueMaxSize !== undefined) {
if (
typeof concurrentRequestQueueMaxSize !== 'number' ||
!Number.isInteger(concurrentRequestQueueMaxSize) ||
concurrentRequestQueueMaxSize < 1 ||
concurrentRequestQueueMaxSize > 100
) {
return res
.status(400)
.json({ error: 'concurrentRequestQueueMaxSize must be an integer between 1 and 100' })
}
}
if (concurrentRequestQueueMaxSizeMultiplier !== undefined) {
// 使用 Number.isFinite() 同时排除 NaN、Infinity、-Infinity 和非数字类型
if (
!Number.isFinite(concurrentRequestQueueMaxSizeMultiplier) ||
concurrentRequestQueueMaxSizeMultiplier < 0 ||
concurrentRequestQueueMaxSizeMultiplier > 10
) {
return res.status(400).json({
error: 'concurrentRequestQueueMaxSizeMultiplier must be a finite number between 0 and 10'
})
}
}
if (concurrentRequestQueueTimeoutMs !== undefined) {
if (
typeof concurrentRequestQueueTimeoutMs !== 'number' ||
!Number.isInteger(concurrentRequestQueueTimeoutMs) ||
concurrentRequestQueueTimeoutMs < 5000 ||
concurrentRequestQueueTimeoutMs > 300000
) {
return res.status(400).json({
error:
'concurrentRequestQueueTimeoutMs must be an integer between 5000 and 300000 (5 seconds to 5 minutes)'
})
}
}
const updateData = {}
if (claudeCodeOnlyEnabled !== undefined)
if (claudeCodeOnlyEnabled !== undefined) {
updateData.claudeCodeOnlyEnabled = claudeCodeOnlyEnabled
if (globalSessionBindingEnabled !== undefined)
}
if (globalSessionBindingEnabled !== undefined) {
updateData.globalSessionBindingEnabled = globalSessionBindingEnabled
if (sessionBindingErrorMessage !== undefined)
}
if (sessionBindingErrorMessage !== undefined) {
updateData.sessionBindingErrorMessage = sessionBindingErrorMessage
if (sessionBindingTtlDays !== undefined)
}
if (sessionBindingTtlDays !== undefined) {
updateData.sessionBindingTtlDays = sessionBindingTtlDays
}
if (userMessageQueueEnabled !== undefined) {
updateData.userMessageQueueEnabled = userMessageQueueEnabled
}
if (userMessageQueueDelayMs !== undefined) {
updateData.userMessageQueueDelayMs = userMessageQueueDelayMs
}
if (userMessageQueueTimeoutMs !== undefined) {
updateData.userMessageQueueTimeoutMs = userMessageQueueTimeoutMs
}
if (concurrentRequestQueueEnabled !== undefined) {
updateData.concurrentRequestQueueEnabled = concurrentRequestQueueEnabled
}
if (concurrentRequestQueueMaxSize !== undefined) {
updateData.concurrentRequestQueueMaxSize = concurrentRequestQueueMaxSize
}
if (concurrentRequestQueueMaxSizeMultiplier !== undefined) {
updateData.concurrentRequestQueueMaxSizeMultiplier = concurrentRequestQueueMaxSizeMultiplier
}
if (concurrentRequestQueueTimeoutMs !== undefined) {
updateData.concurrentRequestQueueTimeoutMs = concurrentRequestQueueTimeoutMs
}
const updatedConfig = await claudeRelayConfigService.updateConfig(
updateData,

View File

@@ -7,26 +7,40 @@ const express = require('express')
const router = express.Router()
const redis = require('../../models/redis')
const logger = require('../../utils/logger')
const { authenticateAdmin } = require('../../middleware/auth')
const { calculateWaitTimeStats } = require('../../utils/statsHelper')
/**
* GET /admin/concurrency
* 获取所有并发状态
*/
router.get('/concurrency', async (req, res) => {
router.get('/concurrency', authenticateAdmin, async (req, res) => {
try {
const status = await redis.getAllConcurrencyStatus()
// 为每个 API Key 获取排队计数
const statusWithQueue = await Promise.all(
status.map(async (s) => {
const queueCount = await redis.getConcurrencyQueueCount(s.apiKeyId)
return {
...s,
queueCount
}
})
)
// 计算汇总统计
const summary = {
totalKeys: status.length,
totalActiveRequests: status.reduce((sum, s) => sum + s.activeCount, 0),
totalExpiredRequests: status.reduce((sum, s) => sum + s.expiredCount, 0)
totalKeys: statusWithQueue.length,
totalActiveRequests: statusWithQueue.reduce((sum, s) => sum + s.activeCount, 0),
totalExpiredRequests: statusWithQueue.reduce((sum, s) => sum + s.expiredCount, 0),
totalQueuedRequests: statusWithQueue.reduce((sum, s) => sum + s.queueCount, 0)
}
res.json({
success: true,
summary,
concurrencyStatus: status
concurrencyStatus: statusWithQueue
})
} catch (error) {
logger.error('❌ Failed to get concurrency status:', error)
@@ -39,17 +53,171 @@ router.get('/concurrency', async (req, res) => {
})
/**
* GET /admin/concurrency/:apiKeyId
* 获取特定 API Key 的并发状态详情
* GET /admin/concurrency-queue/stats
* 获取排队统计信息
*/
router.get('/concurrency/:apiKeyId', async (req, res) => {
router.get('/concurrency-queue/stats', authenticateAdmin, async (req, res) => {
try {
const { apiKeyId } = req.params
const status = await redis.getConcurrencyStatus(apiKeyId)
// 获取所有有统计数据的 API Key
const statsKeys = await redis.scanConcurrencyQueueStatsKeys()
const queueKeys = await redis.scanConcurrencyQueueKeys()
// 合并所有相关的 API Key
const allApiKeyIds = [...new Set([...statsKeys, ...queueKeys])]
// 获取各 API Key 的详细统计
const perKeyStats = await Promise.all(
allApiKeyIds.map(async (apiKeyId) => {
const [queueCount, stats, waitTimes] = await Promise.all([
redis.getConcurrencyQueueCount(apiKeyId),
redis.getConcurrencyQueueStats(apiKeyId),
redis.getQueueWaitTimes(apiKeyId)
])
return {
apiKeyId,
currentQueueCount: queueCount,
stats,
waitTimeStats: calculateWaitTimeStats(waitTimes)
}
})
)
// 获取全局等待时间统计
const globalWaitTimes = await redis.getGlobalQueueWaitTimes()
const globalWaitTimeStats = calculateWaitTimeStats(globalWaitTimes)
// 计算全局汇总
const globalStats = {
totalEntered: perKeyStats.reduce((sum, s) => sum + s.stats.entered, 0),
totalSuccess: perKeyStats.reduce((sum, s) => sum + s.stats.success, 0),
totalTimeout: perKeyStats.reduce((sum, s) => sum + s.stats.timeout, 0),
totalCancelled: perKeyStats.reduce((sum, s) => sum + s.stats.cancelled, 0),
totalSocketChanged: perKeyStats.reduce((sum, s) => sum + (s.stats.socket_changed || 0), 0),
totalRejectedOverload: perKeyStats.reduce(
(sum, s) => sum + (s.stats.rejected_overload || 0),
0
),
currentTotalQueued: perKeyStats.reduce((sum, s) => sum + s.currentQueueCount, 0),
// 队列资源利用率指标
peakQueueSize:
perKeyStats.length > 0 ? Math.max(...perKeyStats.map((s) => s.currentQueueCount)) : 0,
avgQueueSize:
perKeyStats.length > 0
? Math.round(
perKeyStats.reduce((sum, s) => sum + s.currentQueueCount, 0) / perKeyStats.length
)
: 0,
activeApiKeys: perKeyStats.filter((s) => s.currentQueueCount > 0).length
}
// 计算成功率
if (globalStats.totalEntered > 0) {
globalStats.successRate = Math.round(
(globalStats.totalSuccess / globalStats.totalEntered) * 100
)
globalStats.timeoutRate = Math.round(
(globalStats.totalTimeout / globalStats.totalEntered) * 100
)
globalStats.cancelledRate = Math.round(
(globalStats.totalCancelled / globalStats.totalEntered) * 100
)
}
// 从全局等待时间统计中提取关键指标
if (globalWaitTimeStats) {
globalStats.avgWaitTimeMs = globalWaitTimeStats.avg
globalStats.p50WaitTimeMs = globalWaitTimeStats.p50
globalStats.p90WaitTimeMs = globalWaitTimeStats.p90
globalStats.p99WaitTimeMs = globalWaitTimeStats.p99
// 多实例采样策略标记(详见 design.md Decision 9
// 全局 P90 仅用于可视化和监控,不用于系统决策
// 健康检查使用 API Key 级别的 P90每 Key 独立采样)
globalWaitTimeStats.globalP90ForVisualizationOnly = true
}
res.json({
success: true,
concurrencyStatus: status
globalStats,
globalWaitTimeStats,
perKeyStats
})
} catch (error) {
logger.error('❌ Failed to get queue stats:', error)
res.status(500).json({
success: false,
error: 'Failed to get queue stats',
message: error.message
})
}
})
/**
* DELETE /admin/concurrency-queue/:apiKeyId
* 清理特定 API Key 的排队计数
*/
router.delete('/concurrency-queue/:apiKeyId', authenticateAdmin, async (req, res) => {
try {
const { apiKeyId } = req.params
await redis.clearConcurrencyQueue(apiKeyId)
logger.warn(`🧹 Admin ${req.admin?.username || 'unknown'} cleared queue for key ${apiKeyId}`)
res.json({
success: true,
message: `Successfully cleared queue for API key ${apiKeyId}`
})
} catch (error) {
logger.error(`❌ Failed to clear queue for ${req.params.apiKeyId}:`, error)
res.status(500).json({
success: false,
error: 'Failed to clear queue',
message: error.message
})
}
})
/**
* DELETE /admin/concurrency-queue
* 清理所有排队计数
*/
router.delete('/concurrency-queue', authenticateAdmin, async (req, res) => {
try {
const cleared = await redis.clearAllConcurrencyQueues()
logger.warn(`🧹 Admin ${req.admin?.username || 'unknown'} cleared ALL queues`)
res.json({
success: true,
message: 'Successfully cleared all queues',
cleared
})
} catch (error) {
logger.error('❌ Failed to clear all queues:', error)
res.status(500).json({
success: false,
error: 'Failed to clear all queues',
message: error.message
})
}
})
/**
* GET /admin/concurrency/:apiKeyId
* 获取特定 API Key 的并发状态详情
*/
router.get('/concurrency/:apiKeyId', authenticateAdmin, async (req, res) => {
try {
const { apiKeyId } = req.params
const status = await redis.getConcurrencyStatus(apiKeyId)
const queueCount = await redis.getConcurrencyQueueCount(apiKeyId)
res.json({
success: true,
concurrencyStatus: {
...status,
queueCount
}
})
} catch (error) {
logger.error(`❌ Failed to get concurrency status for ${req.params.apiKeyId}:`, error)
@@ -65,7 +233,7 @@ router.get('/concurrency/:apiKeyId', async (req, res) => {
* DELETE /admin/concurrency/:apiKeyId
* 强制清理特定 API Key 的并发计数
*/
router.delete('/concurrency/:apiKeyId', async (req, res) => {
router.delete('/concurrency/:apiKeyId', authenticateAdmin, async (req, res) => {
try {
const { apiKeyId } = req.params
const result = await redis.forceClearConcurrency(apiKeyId)
@@ -93,7 +261,7 @@ router.delete('/concurrency/:apiKeyId', async (req, res) => {
* DELETE /admin/concurrency
* 强制清理所有并发计数
*/
router.delete('/concurrency', async (req, res) => {
router.delete('/concurrency', authenticateAdmin, async (req, res) => {
try {
const result = await redis.forceClearAllConcurrency()
@@ -118,7 +286,7 @@ router.delete('/concurrency', async (req, res) => {
* POST /admin/concurrency/cleanup
* 清理过期的并发条目(不影响活跃请求)
*/
router.post('/concurrency/cleanup', async (req, res) => {
router.post('/concurrency/cleanup', authenticateAdmin, async (req, res) => {
try {
const { apiKeyId } = req.body
const result = await redis.cleanupExpiredConcurrency(apiKeyId || null)

View File

@@ -38,6 +38,73 @@ function queueRateLimitUpdate(rateLimitInfo, usageSummary, model, context = '')
})
}
/**
* 判断是否为旧会话(污染的会话)
* Claude Code 发送的请求特点:
* - messages 数组通常只有 1 个元素
* - 历史对话记录嵌套在单个 message 的 content 数组中
* - content 数组中包含 <system-reminder> 开头的系统注入内容
*
* 污染会话的特征:
* 1. messages.length > 1
* 2. messages.length === 1 但 content 中有多个用户输入
* 3. "warmup" 请求:单条简单消息 + 无 tools真正新会话会带 tools
*
* @param {Object} body - 请求体
* @returns {boolean} 是否为旧会话
*/
function isOldSession(body) {
const messages = body?.messages
const tools = body?.tools
if (!messages || messages.length === 0) {
return false
}
// 1. 多条消息 = 旧会话
if (messages.length > 1) {
return true
}
// 2. 单条消息,分析 content
const firstMessage = messages[0]
const content = firstMessage?.content
if (!content) {
return false
}
// 如果 content 是字符串,只有一条输入,需要检查 tools
if (typeof content === 'string') {
// 有 tools = 正常新会话,无 tools = 可疑
return !tools || tools.length === 0
}
// 如果 content 是数组,统计非 system-reminder 的元素
if (Array.isArray(content)) {
const userInputs = content.filter((item) => {
if (item.type !== 'text') {
return false
}
const text = item.text || ''
// 剔除以 <system-reminder> 开头的
return !text.trimStart().startsWith('<system-reminder>')
})
// 多个用户输入 = 旧会话
if (userInputs.length > 1) {
return true
}
// Warmup 检测:单个消息 + 无 tools = 旧会话
if (userInputs.length === 1 && (!tools || tools.length === 0)) {
return true
}
}
return false
}
// 🔧 共享的消息处理函数
async function handleMessagesRequest(req, res) {
try {
@@ -123,12 +190,42 @@ async function handleMessagesRequest(req, res) {
)
if (isStream) {
// 🔍 检查客户端连接是否仍然有效(可能在并发排队等待期间断开)
if (res.destroyed || res.socket?.destroyed || res.writableEnded) {
logger.warn(
`⚠️ Client disconnected before stream response could start for key: ${req.apiKey?.name || 'unknown'}`
)
return undefined
}
// 流式响应 - 只使用官方真实usage数据
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('X-Accel-Buffering', 'no') // 禁用 Nginx 缓冲
// ⚠️ 检查 headers 是否已发送(可能在排队心跳时已设置)
if (!res.headersSent) {
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
// ⚠️ 关键修复:尊重 auth.js 提前设置的 Connection: close
// 当并发队列功能启用时auth.js 会设置 Connection: close 来禁用 Keep-Alive
// 这里只在没有设置过 Connection 头时才设置 keep-alive
const existingConnection = res.getHeader('Connection')
if (!existingConnection) {
res.setHeader('Connection', 'keep-alive')
} else {
logger.api(
`🔌 [STREAM] Preserving existing Connection header: ${existingConnection} for key: ${req.apiKey?.name || 'unknown'}`
)
}
res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('X-Accel-Buffering', 'no') // 禁用 Nginx 缓冲
} else {
logger.debug(
`📤 [STREAM] Headers already sent, skipping setHeader for key: ${req.apiKey?.name || 'unknown'}`
)
}
// 禁用 Nagle 算法,确保数据立即发送
if (res.socket && typeof res.socket.setNoDelay === 'function') {
@@ -233,19 +330,18 @@ async function handleMessagesRequest(req, res) {
}
// 🔗 在成功调度后建立会话绑定(仅 claude-official 类型)
// claude-official 只接受1) 新会话(messages.length=1) 2) 已绑定的会话
// claude-official 只接受1) 新会话 2) 已绑定的会话
if (
needSessionBinding &&
originalSessionIdForBinding &&
accountId &&
accountType === 'claude-official'
) {
// 🚫 新会话必须 messages.length === 1
const messages = req.body?.messages
if (messages && messages.length > 1) {
// 🚫 检测旧会话(污染的会话)
if (isOldSession(req.body)) {
const cfg = await claudeRelayConfigService.getConfig()
logger.warn(
`🚫 New session with messages.length > 1 rejected: sessionId=${originalSessionIdForBinding}, messages.length=${messages.length}`
`🚫 Old session rejected: sessionId=${originalSessionIdForBinding}, messages.length=${req.body?.messages?.length}, tools.length=${req.body?.tools?.length || 0}, isOldSession=true`
)
return res.status(400).json({
error: {
@@ -591,12 +687,61 @@ async function handleMessagesRequest(req, res) {
}
}, 1000) // 1秒后检查
} else {
// 🔍 检查客户端连接是否仍然有效(可能在并发排队等待期间断开)
if (res.destroyed || res.socket?.destroyed || res.writableEnded) {
logger.warn(
`⚠️ Client disconnected before non-stream request could start for key: ${req.apiKey?.name || 'unknown'}`
)
return undefined
}
// 非流式响应 - 只使用官方真实usage数据
logger.info('📄 Starting non-streaming request', {
apiKeyId: req.apiKey.id,
apiKeyName: req.apiKey.name
})
// 📊 监听 socket 事件以追踪连接状态变化
const nonStreamSocket = res.socket
let _clientClosedConnection = false
let _socketCloseTime = null
if (nonStreamSocket) {
const onSocketEnd = () => {
_clientClosedConnection = true
_socketCloseTime = Date.now()
logger.warn(
`⚠️ [NON-STREAM] Socket 'end' event - client sent FIN | key: ${req.apiKey?.name}, ` +
`requestId: ${req.requestId}, elapsed: ${Date.now() - startTime}ms`
)
}
const onSocketClose = () => {
_clientClosedConnection = true
logger.warn(
`⚠️ [NON-STREAM] Socket 'close' event | key: ${req.apiKey?.name}, ` +
`requestId: ${req.requestId}, elapsed: ${Date.now() - startTime}ms, ` +
`hadError: ${nonStreamSocket.destroyed}`
)
}
const onSocketError = (err) => {
logger.error(
`❌ [NON-STREAM] Socket error | key: ${req.apiKey?.name}, ` +
`requestId: ${req.requestId}, error: ${err.message}`
)
}
nonStreamSocket.once('end', onSocketEnd)
nonStreamSocket.once('close', onSocketClose)
nonStreamSocket.once('error', onSocketError)
// 清理监听器(在响应结束后)
res.once('finish', () => {
nonStreamSocket.removeListener('end', onSocketEnd)
nonStreamSocket.removeListener('close', onSocketClose)
nonStreamSocket.removeListener('error', onSocketError)
})
}
// 生成会话哈希用于sticky会话
const sessionHash = sessionHelper.generateSessionHash(req.body)
@@ -684,19 +829,18 @@ async function handleMessagesRequest(req, res) {
}
// 🔗 在成功调度后建立会话绑定(非流式,仅 claude-official 类型)
// claude-official 只接受1) 新会话(messages.length=1) 2) 已绑定的会话
// claude-official 只接受1) 新会话 2) 已绑定的会话
if (
needSessionBindingNonStream &&
originalSessionIdForBindingNonStream &&
accountId &&
accountType === 'claude-official'
) {
// 🚫 新会话必须 messages.length === 1
const messages = req.body?.messages
if (messages && messages.length > 1) {
// 🚫 检测旧会话(污染的会话)
if (isOldSession(req.body)) {
const cfg = await claudeRelayConfigService.getConfig()
logger.warn(
`🚫 New session with messages.length > 1 rejected (non-stream): sessionId=${originalSessionIdForBindingNonStream}, messages.length=${messages.length}`
`🚫 Old session rejected (non-stream): sessionId=${originalSessionIdForBindingNonStream}, messages.length=${req.body?.messages?.length}, tools.length=${req.body?.tools?.length || 0}, isOldSession=true`
)
return res.status(400).json({
error: {
@@ -802,6 +946,15 @@ async function handleMessagesRequest(req, res) {
bodyLength: response.body ? response.body.length : 0
})
// 🔍 检查客户端连接是否仍然有效
// 在长时间请求过程中,客户端可能已经断开连接(超时、用户取消等)
if (res.destroyed || res.socket?.destroyed || res.writableEnded) {
logger.warn(
`⚠️ Client disconnected before non-stream response could be sent for key: ${req.apiKey?.name || 'unknown'}`
)
return undefined
}
res.status(response.statusCode)
// 设置响应头,避免 Content-Length 和 Transfer-Encoding 冲突
@@ -867,10 +1020,12 @@ async function handleMessagesRequest(req, res) {
logger.warn('⚠️ No usage data found in Claude API JSON response')
}
// 使用 Express 内建的 res.json() 发送响应(简单可靠)
res.json(jsonData)
} catch (parseError) {
logger.warn('⚠️ Failed to parse Claude API response as JSON:', parseError.message)
logger.info('📄 Raw response body:', response.body)
// 使用 Express 内建的 res.send() 发送响应(简单可靠)
res.send(response.body)
}
@@ -1157,6 +1312,41 @@ router.post('/v1/messages/count_tokens', authenticateApiKey, async (req, res) =>
})
}
// 🔗 会话绑定验证(与 messages 端点保持一致)
const originalSessionId = claudeRelayConfigService.extractOriginalSessionId(req.body)
const sessionValidation = await claudeRelayConfigService.validateNewSession(
req.body,
originalSessionId
)
if (!sessionValidation.valid) {
logger.warn(
`🚫 Session binding validation failed (count_tokens): ${sessionValidation.code} for session ${originalSessionId}`
)
return res.status(400).json({
error: {
type: 'session_binding_error',
message: sessionValidation.error
}
})
}
// 🔗 检测旧会话(污染的会话)- 仅对需要绑定的新会话检查
if (sessionValidation.isNewSession && originalSessionId) {
if (isOldSession(req.body)) {
const cfg = await claudeRelayConfigService.getConfig()
logger.warn(
`🚫 Old session rejected (count_tokens): sessionId=${originalSessionId}, messages.length=${req.body?.messages?.length}, tools.length=${req.body?.tools?.length || 0}, isOldSession=true`
)
return res.status(400).json({
error: {
type: 'session_binding_error',
message: cfg.sessionBindingErrorMessage || '你的本地session已污染请清理后使用。'
}
})
}
}
logger.info(`🔢 Processing token count request for key: ${req.apiKey.name}`)
const sessionHash = sessionHelper.generateSessionHash(req.body)

View File

@@ -206,74 +206,85 @@ router.post('/api/user-stats', async (req, res) => {
// 获取验证结果中的完整keyData包含isActive状态和cost信息
const fullKeyData = keyData
// 计算总费用 - 使用与模型统计相同的逻辑(按模型分别计算)
// 🔧 FIX: 使用 allTimeCost 而不是扫描月度键
// 计算总费用 - 优先使用持久化的总费用计数器
let totalCost = 0
let formattedCost = '$0.000000'
try {
const client = redis.getClientSafe()
// 获取所有月度模型统计与model-stats接口相同的逻辑
const allModelKeys = await client.keys(`usage:${keyId}:model:monthly:*:*`)
const modelUsageMap = new Map()
// 读取累积的总费用(没有 TTL 的持久键
const totalCostKey = `usage:cost:total:${keyId}`
const allTimeCost = parseFloat((await client.get(totalCostKey)) || '0')
for (const key of allModelKeys) {
const modelMatch = key.match(/usage:.+:model:monthly:(.+):(\d{4}-\d{2})$/)
if (!modelMatch) {
continue
}
if (allTimeCost > 0) {
totalCost = allTimeCost
formattedCost = CostCalculator.formatCost(allTimeCost)
logger.debug(`📊 使用 allTimeCost 计算用户统计: ${allTimeCost}`)
} else {
// Fallback: 如果 allTimeCost 为空(旧键),尝试月度键
const allModelKeys = await client.keys(`usage:${keyId}:model:monthly:*:*`)
const modelUsageMap = new Map()
const model = modelMatch[1]
const data = await client.hgetall(key)
if (data && Object.keys(data).length > 0) {
if (!modelUsageMap.has(model)) {
modelUsageMap.set(model, {
inputTokens: 0,
outputTokens: 0,
cacheCreateTokens: 0,
cacheReadTokens: 0
})
for (const key of allModelKeys) {
const modelMatch = key.match(/usage:.+:model:monthly:(.+):(\d{4}-\d{2})$/)
if (!modelMatch) {
continue
}
const modelUsage = modelUsageMap.get(model)
modelUsage.inputTokens += parseInt(data.inputTokens) || 0
modelUsage.outputTokens += parseInt(data.outputTokens) || 0
modelUsage.cacheCreateTokens += parseInt(data.cacheCreateTokens) || 0
modelUsage.cacheReadTokens += parseInt(data.cacheReadTokens) || 0
}
}
const model = modelMatch[1]
const data = await client.hgetall(key)
// 按模型计算费用并汇总
for (const [model, usage] of modelUsageMap) {
const usageData = {
input_tokens: usage.inputTokens,
output_tokens: usage.outputTokens,
cache_creation_input_tokens: usage.cacheCreateTokens,
cache_read_input_tokens: usage.cacheReadTokens
if (data && Object.keys(data).length > 0) {
if (!modelUsageMap.has(model)) {
modelUsageMap.set(model, {
inputTokens: 0,
outputTokens: 0,
cacheCreateTokens: 0,
cacheReadTokens: 0
})
}
const modelUsage = modelUsageMap.get(model)
modelUsage.inputTokens += parseInt(data.inputTokens) || 0
modelUsage.outputTokens += parseInt(data.outputTokens) || 0
modelUsage.cacheCreateTokens += parseInt(data.cacheCreateTokens) || 0
modelUsage.cacheReadTokens += parseInt(data.cacheReadTokens) || 0
}
}
const costResult = CostCalculator.calculateCost(usageData, model)
totalCost += costResult.costs.total
}
// 按模型计算费用并汇总
for (const [model, usage] of modelUsageMap) {
const usageData = {
input_tokens: usage.inputTokens,
output_tokens: usage.outputTokens,
cache_creation_input_tokens: usage.cacheCreateTokens,
cache_read_input_tokens: usage.cacheReadTokens
}
// 如果没有模型级别的详细数据,回退到总体数据计算
if (modelUsageMap.size === 0 && fullKeyData.usage?.total?.allTokens > 0) {
const usage = fullKeyData.usage.total
const costUsage = {
input_tokens: usage.inputTokens || 0,
output_tokens: usage.outputTokens || 0,
cache_creation_input_tokens: usage.cacheCreateTokens || 0,
cache_read_input_tokens: usage.cacheReadTokens || 0
const costResult = CostCalculator.calculateCost(usageData, model)
totalCost += costResult.costs.total
}
const costResult = CostCalculator.calculateCost(costUsage, 'claude-3-5-sonnet-20241022')
totalCost = costResult.costs.total
}
// 如果没有模型级别的详细数据,回退到总体数据计算
if (modelUsageMap.size === 0 && fullKeyData.usage?.total?.allTokens > 0) {
const usage = fullKeyData.usage.total
const costUsage = {
input_tokens: usage.inputTokens || 0,
output_tokens: usage.outputTokens || 0,
cache_creation_input_tokens: usage.cacheCreateTokens || 0,
cache_read_input_tokens: usage.cacheReadTokens || 0
}
formattedCost = CostCalculator.formatCost(totalCost)
const costResult = CostCalculator.calculateCost(costUsage, 'claude-3-5-sonnet-20241022')
totalCost = costResult.costs.total
}
formattedCost = CostCalculator.formatCost(totalCost)
}
} catch (error) {
logger.warn(`Failed to calculate detailed cost for key ${keyId}:`, error)
logger.warn(`Failed to calculate cost for key ${keyId}:`, error)
// 回退到简单计算
if (fullKeyData.usage?.total?.allTokens > 0) {
const usage = fullKeyData.usage.total