fix: 增强限流跟踪逻辑,支持预计算费用的传递,修复窗口限制计费错误

This commit is contained in:
sczheng189
2026-02-24 18:17:25 +08:00
parent 915544cc73
commit 4b220263eb
8 changed files with 339 additions and 164 deletions

View File

@@ -209,7 +209,13 @@ function ensureGeminiPermissionMiddleware(req, res, next) {
/**
* 应用速率限制跟踪
*/
async function applyRateLimitTracking(req, usageSummary, model, context = '') {
async function applyRateLimitTracking(
req,
usageSummary,
model,
context = '',
preCalculatedCost = null
) {
if (!req.rateLimitInfo) {
return
}
@@ -222,7 +228,8 @@ async function applyRateLimitTracking(req, usageSummary, model, context = '') {
usageSummary,
model,
req.apiKey?.id,
'gemini'
'gemini',
preCalculatedCost
)
if (totalTokens > 0) {
@@ -1705,7 +1712,7 @@ async function handleGenerateContent(req, res) {
if (response?.response?.usageMetadata) {
try {
const usage = response.response.usageMetadata
await apiKeyService.recordUsage(
const geminiNonStreamCosts = await apiKeyService.recordUsage(
req.apiKey.id,
usage.promptTokenCount || 0,
usage.candidatesTokenCount || 0,
@@ -1728,7 +1735,8 @@ async function handleGenerateContent(req, res) {
cacheReadTokens: 0
},
model,
'gemini-non-stream'
'gemini-non-stream',
geminiNonStreamCosts
)
} catch (error) {
logger.error('Failed to record Gemini usage:', error)
@@ -2053,8 +2061,8 @@ async function handleStreamGenerateContent(req, res) {
// 异步记录使用统计
if (!usageReported && totalUsage.totalTokenCount > 0) {
Promise.all([
apiKeyService.recordUsage(
apiKeyService
.recordUsage(
req.apiKey.id,
totalUsage.promptTokenCount || 0,
totalUsage.candidatesTokenCount || 0,
@@ -2063,19 +2071,21 @@ async function handleStreamGenerateContent(req, res) {
model,
account.id,
'gemini'
),
applyRateLimitTracking(
req,
{
inputTokens: totalUsage.promptTokenCount || 0,
outputTokens: totalUsage.candidatesTokenCount || 0,
cacheCreateTokens: 0,
cacheReadTokens: 0
},
model,
'gemini-stream'
)
])
.then((costs) =>
applyRateLimitTracking(
req,
{
inputTokens: totalUsage.promptTokenCount || 0,
outputTokens: totalUsage.candidatesTokenCount || 0,
cacheCreateTokens: 0,
cacheReadTokens: 0
},
model,
'gemini-stream',
costs
)
)
.then(() => {
logger.info(
`📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}`

View File

@@ -33,7 +33,8 @@ function queueRateLimitUpdate(
model,
context = '',
keyId = null,
accountType = null
accountType = null,
preCalculatedCost = null
) {
if (!rateLimitInfo) {
return Promise.resolve({ totalTokens: 0, totalCost: 0 })
@@ -41,7 +42,14 @@ function queueRateLimitUpdate(
const label = context ? ` (${context})` : ''
return updateRateLimitCounters(rateLimitInfo, usageSummary, model, keyId, accountType)
return updateRateLimitCounters(
rateLimitInfo,
usageSummary,
model,
keyId,
accountType,
preCalculatedCost
)
.then(({ totalTokens, totalCost }) => {
if (totalTokens > 0) {
logger.api(`📊 Updated rate limit token count${label}: +${totalTokens} tokens`)
@@ -492,24 +500,40 @@ async function handleMessagesRequest(req, res) {
apiKeyService
.recordUsageWithDetails(_apiKeyId, usageObject, model, usageAccountId, accountType)
.then((costs) => {
queueRateLimitUpdate(
_rateLimitInfo,
{
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens
},
model,
'claude-stream',
_apiKeyId,
accountType,
costs
)
})
.catch((error) => {
logger.error('❌ Failed to record stream usage:', error)
// Fallback: 仍然更新限流计数(使用 legacy 计算)
queueRateLimitUpdate(
_rateLimitInfo,
{
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens
},
model,
'claude-stream',
_apiKeyId,
accountType
)
})
queueRateLimitUpdate(
_rateLimitInfo,
{
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens
},
model,
'claude-stream',
_apiKeyId,
accountType
)
usageDataCaptured = true
logger.api(
`📊 Stream usage recorded (real) - Model: ${model}, Input: ${inputTokens}, Output: ${outputTokens}, Cache Create: ${cacheCreateTokens}, Cache Read: ${cacheReadTokens}, Total: ${inputTokens + outputTokens + cacheCreateTokens + cacheReadTokens} tokens`
@@ -608,24 +632,39 @@ async function handleMessagesRequest(req, res) {
usageAccountId,
'claude-console'
)
.then((costs) => {
queueRateLimitUpdate(
_rateLimitInfoConsole,
{
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens
},
model,
'claude-console-stream',
_apiKeyIdConsole,
accountType,
costs
)
})
.catch((error) => {
logger.error('❌ Failed to record stream usage:', error)
queueRateLimitUpdate(
_rateLimitInfoConsole,
{
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens
},
model,
'claude-console-stream',
_apiKeyIdConsole,
accountType
)
})
queueRateLimitUpdate(
_rateLimitInfoConsole,
{
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens
},
model,
'claude-console-stream',
_apiKeyIdConsole,
accountType
)
usageDataCaptured = true
logger.api(
`📊 Stream usage recorded (real) - Model: ${model}, Input: ${inputTokens}, Output: ${outputTokens}, Cache Create: ${cacheCreateTokens}, Cache Read: ${cacheReadTokens}, Total: ${inputTokens + outputTokens + cacheCreateTokens + cacheReadTokens} tokens`
@@ -674,24 +713,39 @@ async function handleMessagesRequest(req, res) {
accountId,
'bedrock'
)
.then((costs) => {
queueRateLimitUpdate(
_rateLimitInfoBedrock,
{
inputTokens,
outputTokens,
cacheCreateTokens: 0,
cacheReadTokens: 0
},
result.model,
'bedrock-stream',
_apiKeyIdBedrock,
'bedrock',
costs
)
})
.catch((error) => {
logger.error('❌ Failed to record Bedrock stream usage:', error)
queueRateLimitUpdate(
_rateLimitInfoBedrock,
{
inputTokens,
outputTokens,
cacheCreateTokens: 0,
cacheReadTokens: 0
},
result.model,
'bedrock-stream',
_apiKeyIdBedrock,
'bedrock'
)
})
queueRateLimitUpdate(
_rateLimitInfoBedrock,
{
inputTokens,
outputTokens,
cacheCreateTokens: 0,
cacheReadTokens: 0
},
result.model,
'bedrock-stream',
_apiKeyIdBedrock,
'bedrock'
)
usageDataCaptured = true
logger.api(
`📊 Bedrock stream usage recorded - Model: ${result.model}, Input: ${inputTokens}, Output: ${outputTokens}, Total: ${inputTokens + outputTokens} tokens`
@@ -781,24 +835,39 @@ async function handleMessagesRequest(req, res) {
apiKeyService
.recordUsageWithDetails(_apiKeyIdCcr, usageObject, model, usageAccountId, 'ccr')
.then((costs) => {
queueRateLimitUpdate(
_rateLimitInfoCcr,
{
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens
},
model,
'ccr-stream',
_apiKeyIdCcr,
'ccr',
costs
)
})
.catch((error) => {
logger.error('❌ Failed to record CCR stream usage:', error)
queueRateLimitUpdate(
_rateLimitInfoCcr,
{
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens
},
model,
'ccr-stream',
_apiKeyIdCcr,
'ccr'
)
})
queueRateLimitUpdate(
_rateLimitInfoCcr,
{
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens
},
model,
'ccr-stream',
_apiKeyIdCcr,
'ccr'
)
usageDataCaptured = true
logger.api(
`📊 CCR stream usage recorded (real) - Model: ${model}, Input: ${inputTokens}, Output: ${outputTokens}, Cache Create: ${cacheCreateTokens}, Cache Read: ${cacheReadTokens}, Total: ${inputTokens + outputTokens + cacheCreateTokens + cacheReadTokens} tokens`
@@ -1143,7 +1212,7 @@ async function handleMessagesRequest(req, res) {
// 记录真实的token使用量包含模型信息和所有4种token以及账户ID
const { accountId: responseAccountId } = response
await apiKeyService.recordUsage(
const nonStreamCosts = await apiKeyService.recordUsage(
_apiKeyIdNonStream,
inputTokens,
outputTokens,
@@ -1165,7 +1234,8 @@ async function handleMessagesRequest(req, res) {
model,
'claude-non-stream',
_apiKeyIdNonStream,
accountType
accountType,
nonStreamCosts
)
usageRecorded = true

View File

@@ -30,7 +30,8 @@ function queueRateLimitUpdate(
model,
context = '',
keyId = null,
accountType = null
accountType = null,
preCalculatedCost = null
) {
if (!rateLimitInfo) {
return
@@ -38,7 +39,7 @@ function queueRateLimitUpdate(
const label = context ? ` (${context})` : ''
updateRateLimitCounters(rateLimitInfo, usageSummary, model, keyId, accountType)
updateRateLimitCounters(rateLimitInfo, usageSummary, model, keyId, accountType, preCalculatedCost)
.then(({ totalTokens, totalCost }) => {
if (totalTokens > 0) {
logger.api(`📊 Updated rate limit token count${label}: +${totalTokens} tokens`)
@@ -306,23 +307,38 @@ async function handleChatCompletion(req, res, apiKeyData) {
accountId,
accountType
)
.then((costs) => {
queueRateLimitUpdate(
req.rateLimitInfo,
{
inputTokens: usage.input_tokens || 0,
outputTokens: usage.output_tokens || 0,
cacheCreateTokens,
cacheReadTokens
},
model,
`openai-${accountType}-stream`,
req.apiKey?.id,
accountType,
costs
)
})
.catch((error) => {
logger.error('❌ Failed to record usage:', error)
queueRateLimitUpdate(
req.rateLimitInfo,
{
inputTokens: usage.input_tokens || 0,
outputTokens: usage.output_tokens || 0,
cacheCreateTokens,
cacheReadTokens
},
model,
`openai-${accountType}-stream`,
req.apiKey?.id,
accountType
)
})
queueRateLimitUpdate(
req.rateLimitInfo,
{
inputTokens: usage.input_tokens || 0,
outputTokens: usage.output_tokens || 0,
cacheCreateTokens,
cacheReadTokens
},
model,
`openai-${accountType}-stream`,
req.apiKey?.id,
accountType
)
}
}
@@ -444,23 +460,38 @@ async function handleChatCompletion(req, res, apiKeyData) {
accountId,
accountType
)
.then((costs) => {
queueRateLimitUpdate(
req.rateLimitInfo,
{
inputTokens: usage.input_tokens || 0,
outputTokens: usage.output_tokens || 0,
cacheCreateTokens,
cacheReadTokens
},
claudeRequest.model,
`openai-${accountType}-non-stream`,
req.apiKey?.id,
accountType,
costs
)
})
.catch((error) => {
logger.error('❌ Failed to record usage:', error)
queueRateLimitUpdate(
req.rateLimitInfo,
{
inputTokens: usage.input_tokens || 0,
outputTokens: usage.output_tokens || 0,
cacheCreateTokens,
cacheReadTokens
},
claudeRequest.model,
`openai-${accountType}-non-stream`,
req.apiKey?.id,
accountType
)
})
queueRateLimitUpdate(
req.rateLimitInfo,
{
inputTokens: usage.input_tokens || 0,
outputTokens: usage.output_tokens || 0,
cacheCreateTokens,
cacheReadTokens
},
claudeRequest.model,
`openai-${accountType}-non-stream`,
req.apiKey?.id,
accountType
)
}
// 返回 OpenAI 格式响应

View File

@@ -70,7 +70,14 @@ function extractCodexUsageHeaders(headers) {
return hasData ? snapshot : null
}
async function applyRateLimitTracking(req, usageSummary, model, context = '', accountType = null) {
async function applyRateLimitTracking(
req,
usageSummary,
model,
context = '',
accountType = null,
preCalculatedCost = null
) {
if (!req.rateLimitInfo) {
return
}
@@ -83,7 +90,8 @@ async function applyRateLimitTracking(req, usageSummary, model, context = '', ac
usageSummary,
model,
req.apiKey?.id,
accountType
accountType,
preCalculatedCost
)
if (totalTokens > 0) {
@@ -613,7 +621,7 @@ const handleResponses = async (req, res) => {
// 计算实际输入token总输入减去缓存部分
const actualInputTokens = Math.max(0, totalInputTokens - cacheReadTokens)
await apiKeyService.recordUsage(
const nonStreamCosts = await apiKeyService.recordUsage(
apiKeyData.id,
actualInputTokens, // 传递实际输入(不含缓存)
outputTokens,
@@ -638,7 +646,8 @@ const handleResponses = async (req, res) => {
},
actualModel,
'openai-non-stream',
'openai'
'openai',
nonStreamCosts
)
}
@@ -729,7 +738,7 @@ const handleResponses = async (req, res) => {
// 使用响应中的真实 model如果没有则使用请求中的 model最后回退到默认值
const modelToRecord = actualModel || requestedModel || 'gpt-4'
await apiKeyService.recordUsage(
const streamCosts = await apiKeyService.recordUsage(
apiKeyData.id,
actualInputTokens, // 传递实际输入(不含缓存)
outputTokens,
@@ -755,7 +764,8 @@ const handleResponses = async (req, res) => {
},
modelToRecord,
'openai-stream',
'openai'
'openai',
streamCosts
)
} catch (error) {
logger.error('Failed to record OpenAI usage:', error)

View File

@@ -1805,7 +1805,8 @@ async function applyRateLimitTracking(
usageSummary,
model,
context = '',
keyId = null
keyId = null,
preCalculatedCost = null
) {
if (!rateLimitInfo) {
return
@@ -1819,7 +1820,8 @@ async function applyRateLimitTracking(
usageSummary,
model,
keyId,
'gemini'
'gemini',
preCalculatedCost
)
if (totalTokens > 0) {
logger.api(`📊 Updated rate limit token count${label}: +${totalTokens} tokens`)
@@ -2135,7 +2137,7 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
: mapGeminiFinishReasonToAnthropicStopReason(finishReason)
if (req.apiKey?.id && (inputTokens > 0 || outputTokens > 0)) {
await apiKeyService.recordUsage(
const bridgeCosts = await apiKeyService.recordUsage(
req.apiKey.id,
inputTokens,
outputTokens,
@@ -2150,7 +2152,8 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
{ inputTokens, outputTokens, cacheCreateTokens: 0, cacheReadTokens: 0 },
effectiveModel,
'anthropic-messages',
req.apiKey?.id
req.apiKey?.id,
bridgeCosts
)
}
@@ -2675,7 +2678,7 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
}
if (req.apiKey?.id && (inputTokens > 0 || outputTokens > 0)) {
await apiKeyService.recordUsage(
const bridgeStreamCosts = await apiKeyService.recordUsage(
req.apiKey.id,
inputTokens,
outputTokens,
@@ -2689,7 +2692,9 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
req.rateLimitInfo,
{ inputTokens, outputTokens, cacheCreateTokens: 0, cacheReadTokens: 0 },
effectiveModel,
'anthropic-messages-stream'
'anthropic-messages-stream',
req.apiKey?.id,
bridgeStreamCosts
)
}
}

View File

@@ -1662,8 +1662,11 @@ class ApiKeyService {
logParts.push(`Total: ${totalTokens} tokens`)
logger.database(`📊 Recorded usage: ${keyId} - ${logParts.join(', ')}`)
return { realCost, ratedCost }
} catch (error) {
logger.error('❌ Failed to record usage:', error)
return { realCost: 0, ratedCost: 0 }
}
}
@@ -1958,8 +1961,11 @@ class ApiKeyService {
// 发布失败不影响主流程,只记录错误
logger.warn('⚠️ Failed to publish billing event:', err.message)
})
return { realCost: realCostWithDetails, ratedCost: ratedCostWithDetails }
} catch (error) {
logger.error('❌ Failed to record usage:', error)
return { realCost: 0, ratedCost: 0 }
}
}

View File

@@ -91,7 +91,14 @@ class DroidRelayService {
return normalizedBody
}
async _applyRateLimitTracking(rateLimitInfo, usageSummary, model, context = '', keyId = null) {
async _applyRateLimitTracking(
rateLimitInfo,
usageSummary,
model,
context = '',
keyId = null,
preCalculatedCost = null
) {
if (!rateLimitInfo) {
return
}
@@ -102,7 +109,8 @@ class DroidRelayService {
usageSummary,
model,
keyId,
'droid'
'droid',
preCalculatedCost
)
if (totalTokens > 0) {
@@ -616,7 +624,7 @@ class DroidRelayService {
// 记录 usage 数据
if (!skipUsageRecord) {
const normalizedUsage = await this._recordUsageFromStreamData(
const { normalizedUsage, costs: streamCosts } = await this._recordUsageFromStreamData(
currentUsageData,
apiKeyData,
account,
@@ -635,7 +643,8 @@ class DroidRelayService {
usageSummary,
model,
' [stream]',
keyId
keyId,
streamCosts
)
logger.success(`Droid stream completed - Account: ${account.name}`)
@@ -871,8 +880,8 @@ class DroidRelayService {
*/
async _recordUsageFromStreamData(usageData, apiKeyData, account, model) {
const normalizedUsage = this._normalizeUsageSnapshot(usageData)
await this._recordUsage(apiKeyData, account, model, normalizedUsage)
return normalizedUsage
const costs = await this._recordUsage(apiKeyData, account, model, normalizedUsage)
return { normalizedUsage, costs }
}
/**
@@ -1234,7 +1243,7 @@ class DroidRelayService {
const normalizedUsage = this._normalizeUsageSnapshot(usage)
if (!skipUsageRecord) {
await this._recordUsage(apiKeyData, account, model, normalizedUsage)
const droidCosts = await this._recordUsage(apiKeyData, account, model, normalizedUsage)
const totalTokens = this._getTotalTokens(normalizedUsage)
@@ -1256,7 +1265,8 @@ class DroidRelayService {
usageSummary,
model,
endpointLabel,
keyId
keyId,
droidCosts
)
logger.success(
@@ -1283,15 +1293,22 @@ class DroidRelayService {
if (totalTokens <= 0) {
logger.debug('🪙 Droid usage 数据为空,跳过记录')
return
return { realCost: 0, ratedCost: 0 }
}
try {
const keyId = apiKeyData?.id
const accountId = this._extractAccountId(account)
let costs = { realCost: 0, ratedCost: 0 }
if (keyId) {
await apiKeyService.recordUsageWithDetails(keyId, usageObject, model, accountId, 'droid')
costs = await apiKeyService.recordUsageWithDetails(
keyId,
usageObject,
model,
accountId,
'droid'
)
} else if (accountId) {
await redis.incrementAccountUsage(
accountId,
@@ -1307,14 +1324,17 @@ class DroidRelayService {
)
} else {
logger.warn('⚠️ 无法记录 Droid usage缺少 API Key 和账户标识')
return
return { realCost: 0, ratedCost: 0 }
}
logger.debug(
`📊 Droid usage recorded - Key: ${keyId || 'unknown'}, Account: ${accountId || 'unknown'}, Model: ${model}, Input: ${usageObject.input_tokens || 0}, Output: ${usageObject.output_tokens || 0}, Cache Create: ${usageObject.cache_creation_input_tokens || 0}, Cache Read: ${usageObject.cache_read_input_tokens || 0}, Total: ${totalTokens}`
)
return costs
} catch (error) {
logger.error('❌ Failed to record Droid usage:', error)
return { realCost: 0, ratedCost: 0 }
}
}

View File

@@ -8,12 +8,14 @@ function toNumber(value) {
}
// keyId 和 accountType 用于计算倍率成本
// preCalculatedCost: 可选的 { realCost, ratedCost },由调用方提供以避免重复计算
async function updateRateLimitCounters(
rateLimitInfo,
usageSummary,
model,
keyId = null,
accountType = null
accountType = null,
preCalculatedCost = null
) {
if (!rateLimitInfo) {
return { totalTokens: 0, totalCost: 0, ratedCost: 0 }
@@ -36,47 +38,68 @@ async function updateRateLimitCounters(
}
let totalCost = 0
const usagePayload = {
input_tokens: inputTokens,
output_tokens: outputTokens,
cache_creation_input_tokens: cacheCreateTokens,
cache_read_input_tokens: cacheReadTokens
}
let ratedCost = 0
try {
const costInfo = pricingService.calculateCost(usagePayload, model)
const { totalCost: calculatedCost } = costInfo || {}
if (typeof calculatedCost === 'number') {
totalCost = calculatedCost
if (
preCalculatedCost &&
typeof preCalculatedCost.ratedCost === 'number' &&
preCalculatedCost.ratedCost > 0
) {
// 使用调用方已计算好的费用(避免重复计算,且能正确处理 1h 缓存、Fast Mode 等特殊计费)
// eslint-disable-next-line prefer-destructuring
ratedCost = preCalculatedCost.ratedCost
totalCost = preCalculatedCost.realCost || 0
} else if (
preCalculatedCost &&
typeof preCalculatedCost.realCost === 'number' &&
preCalculatedCost.realCost > 0
) {
// 有 realCost 但 ratedCost 为 0 或缺失,使用 realCost
totalCost = preCalculatedCost.realCost
ratedCost = preCalculatedCost.realCost
} else {
// Legacy fallback调用方未提供费用时自行计算不支持 1h 缓存等特殊计费)
const usagePayload = {
input_tokens: inputTokens,
output_tokens: outputTokens,
cache_creation_input_tokens: cacheCreateTokens,
cache_read_input_tokens: cacheReadTokens
}
} catch (error) {
// 忽略此处错误,后续使用备用计算
totalCost = 0
}
if (totalCost === 0) {
try {
const fallback = CostCalculator.calculateCost(usagePayload, model)
const { costs } = fallback || {}
if (costs && typeof costs.total === 'number') {
totalCost = costs.total
const costInfo = pricingService.calculateCost(usagePayload, model)
const { totalCost: calculatedCost } = costInfo || {}
if (typeof calculatedCost === 'number') {
totalCost = calculatedCost
}
} catch (error) {
// 忽略此处错误,后续使用备用计算
totalCost = 0
}
}
// 计算倍率成本(用于限流计数)
let ratedCost = totalCost
if (totalCost > 0 && keyId) {
try {
const apiKeyService = require('../services/apiKeyService')
const serviceRatesService = require('../services/serviceRatesService')
const service = serviceRatesService.getService(accountType, model)
ratedCost = await apiKeyService.calculateRatedCost(keyId, service, totalCost)
} catch (error) {
// 倍率计算失败时使用真实成本
ratedCost = totalCost
if (totalCost === 0) {
try {
const fallback = CostCalculator.calculateCost(usagePayload, model)
const { costs } = fallback || {}
if (costs && typeof costs.total === 'number') {
totalCost = costs.total
}
} catch (error) {
totalCost = 0
}
}
// 计算倍率成本(用于限流计数)
ratedCost = totalCost
if (totalCost > 0 && keyId) {
try {
const apiKeyService = require('../services/apiKeyService')
const serviceRatesService = require('../services/serviceRatesService')
const service = serviceRatesService.getService(accountType, model)
ratedCost = await apiKeyService.calculateRatedCost(keyId, service, totalCost)
} catch (error) {
ratedCost = totalCost
}
}
}