feat: droid平台账户数据统计及调度能力

This commit is contained in:
shaw
2025-10-10 15:13:45 +08:00
parent 2fc84a6aca
commit 42db271848
21 changed files with 1424 additions and 212 deletions

View File

@@ -1,8 +1,11 @@
const https = require('https')
const axios = require('axios')
const ProxyHelper = require('../utils/proxyHelper')
const droidScheduler = require('./droidScheduler')
const droidAccountService = require('./droidAccountService')
const apiKeyService = require('./apiKeyService')
const redis = require('../models/redis')
const { updateRateLimitCounters } = require('../utils/rateLimitHelper')
const logger = require('../utils/logger')
const SYSTEM_PROMPT =
@@ -28,8 +31,7 @@ class DroidRelayService {
this.endpoints = {
anthropic: '/a/v1/messages',
openai: '/o/v1/responses',
common: '/o/v1/chat/completions'
openai: '/o/v1/responses'
}
this.userAgent = 'factory-cli/0.19.4'
@@ -45,6 +47,46 @@ class DroidRelayService {
})
}
_normalizeEndpointType(endpointType) {
if (!endpointType) {
return 'anthropic'
}
const normalized = String(endpointType).toLowerCase()
if (normalized === 'openai' || normalized === 'common') {
return 'openai'
}
if (normalized === 'anthropic') {
return 'anthropic'
}
return 'anthropic'
}
async _applyRateLimitTracking(rateLimitInfo, usageSummary, model, context = '') {
if (!rateLimitInfo) {
return
}
try {
const { totalTokens, totalCost } = await updateRateLimitCounters(
rateLimitInfo,
usageSummary,
model
)
if (totalTokens > 0) {
logger.api(`📊 Updated rate limit token count${context}: +${totalTokens}`)
}
if (typeof totalCost === 'number' && totalCost > 0) {
logger.api(`💰 Updated rate limit cost count${context}: +$${totalCost.toFixed(6)}`)
}
} catch (error) {
logger.error(`❌ Failed to update rate limit counters${context}:`, error)
}
}
async relayRequest(
requestBody,
apiKeyData,
@@ -53,26 +95,29 @@ class DroidRelayService {
clientHeaders,
options = {}
) {
const { endpointType = 'anthropic' } = options
const { endpointType = 'anthropic', sessionHash = null } = options
const keyInfo = apiKeyData || {}
const normalizedEndpoint = this._normalizeEndpointType(endpointType)
try {
logger.info(
`📤 Processing Droid API request for key: ${keyInfo.name || keyInfo.id || 'unknown'}, endpoint: ${endpointType}`
`📤 Processing Droid API request for key: ${
keyInfo.name || keyInfo.id || 'unknown'
}, endpoint: ${normalizedEndpoint}${sessionHash ? `, session: ${sessionHash}` : ''}`
)
// 选择一个可用的 Droid 账户
const account = await droidAccountService.selectAccount(endpointType)
// 选择一个可用的 Droid 账户(支持粘性会话和分组调度)
const account = await droidScheduler.selectAccount(keyInfo, normalizedEndpoint, sessionHash)
if (!account) {
throw new Error(`No available Droid account for endpoint type: ${endpointType}`)
throw new Error(`No available Droid account for endpoint type: ${normalizedEndpoint}`)
}
// 获取有效的 access token自动刷新
const accessToken = await droidAccountService.getValidAccessToken(account.id)
// 获取 Factory.ai API URL
const endpoint = this.endpoints[endpointType]
const endpoint = this.endpoints[normalizedEndpoint]
const apiUrl = `${this.factoryApiBaseUrl}${endpoint}`
logger.info(`🌐 Forwarding to Factory.ai: ${apiUrl}`)
@@ -86,10 +131,15 @@ class DroidRelayService {
}
// 构建请求头
const headers = this._buildHeaders(accessToken, requestBody, endpointType, clientHeaders)
const headers = this._buildHeaders(
accessToken,
requestBody,
normalizedEndpoint,
clientHeaders
)
// 处理请求体(注入 system prompt 等)
const processedBody = this._processRequestBody(requestBody, endpointType)
const processedBody = this._processRequestBody(requestBody, normalizedEndpoint)
// 发送请求
const isStreaming = processedBody.stream !== false
@@ -102,11 +152,12 @@ class DroidRelayService {
headers,
processedBody,
proxyAgent,
clientRequest,
clientResponse,
account,
keyInfo,
requestBody,
endpointType
normalizedEndpoint
)
} else {
// 非流式响应:使用 axios
@@ -128,7 +179,14 @@ class DroidRelayService {
logger.info(`✅ Factory.ai response status: ${response.status}`)
// 处理非流式响应
return this._handleNonStreamResponse(response, account, keyInfo, requestBody)
return this._handleNonStreamResponse(
response,
account,
keyInfo,
requestBody,
clientRequest,
normalizedEndpoint
)
}
} catch (error) {
logger.error(`❌ Droid relay error: ${error.message}`, error)
@@ -167,6 +225,7 @@ class DroidRelayService {
headers,
processedBody,
proxyAgent,
clientRequest,
clientResponse,
account,
apiKeyData,
@@ -181,6 +240,7 @@ class DroidRelayService {
...headers,
'content-length': contentLength.toString()
}
let responseStarted = false
let responseCompleted = false
let settled = false
@@ -298,12 +358,13 @@ class DroidRelayService {
// 转发数据到客户端
clientResponse.write(chunk)
hasForwardedData = true
// 解析 usage 数据(根据端点类型)
if (endpointType === 'anthropic') {
// Anthropic Messages API 格式
this._parseAnthropicUsageFromSSE(chunkStr, buffer, currentUsageData)
} else if (endpointType === 'openai' || endpointType === 'common') {
} else if (endpointType === 'openai') {
// OpenAI Chat Completions 格式
this._parseOpenAIUsageFromSSE(chunkStr, buffer, currentUsageData)
}
@@ -320,7 +381,26 @@ class DroidRelayService {
clientResponse.end()
// 记录 usage 数据
await this._recordUsageFromStreamData(currentUsageData, apiKeyData, account, model)
const normalizedUsage = await this._recordUsageFromStreamData(
currentUsageData,
apiKeyData,
account,
model
)
const usageSummary = {
inputTokens: normalizedUsage.input_tokens || 0,
outputTokens: normalizedUsage.output_tokens || 0,
cacheCreateTokens: normalizedUsage.cache_creation_input_tokens || 0,
cacheReadTokens: normalizedUsage.cache_read_input_tokens || 0
}
await this._applyRateLimitTracking(
clientRequest?.rateLimitInfo,
usageSummary,
model,
' [stream]'
)
logger.success(`✅ Droid stream completed - Account: ${account.name}`)
resolveOnce({ statusCode: 200, streaming: true })
@@ -432,7 +512,7 @@ class DroidRelayService {
const data = JSON.parse(jsonStr)
// OpenAI 格式在流结束时可能包含 usage
// 兼容传统 Chat Completions usage 字段
if (data.usage) {
currentUsageData.input_tokens = data.usage.prompt_tokens || 0
currentUsageData.output_tokens = data.usage.completion_tokens || 0
@@ -440,6 +520,17 @@ class DroidRelayService {
logger.debug('📊 Droid OpenAI usage:', currentUsageData)
}
// 新 Response API 在 response.usage 中返回统计
if (data.response && data.response.usage) {
const { usage } = data.response
currentUsageData.input_tokens =
usage.input_tokens || usage.prompt_tokens || usage.total_tokens || 0
currentUsageData.output_tokens = usage.output_tokens || usage.completion_tokens || 0
currentUsageData.total_tokens = usage.total_tokens || 0
logger.debug('📊 Droid OpenAI response usage:', currentUsageData)
}
} catch (parseError) {
// 忽略解析错误
}
@@ -471,7 +562,7 @@ class DroidRelayService {
return false
}
if (endpointType === 'openai' || endpointType === 'common') {
if (endpointType === 'openai') {
if (lower.includes('data: [done]')) {
return true
}
@@ -479,6 +570,17 @@ class DroidRelayService {
if (compact.includes('"finish_reason"')) {
return true
}
if (lower.includes('event: response.done') || lower.includes('event: response.completed')) {
return true
}
if (
compact.includes('"type":"response.done"') ||
compact.includes('"type":"response.completed"')
) {
return true
}
}
return false
@@ -488,23 +590,107 @@ class DroidRelayService {
* 记录从流中解析的 usage 数据
*/
async _recordUsageFromStreamData(usageData, apiKeyData, account, model) {
const inputTokens = usageData.input_tokens || 0
const outputTokens = usageData.output_tokens || 0
const cacheCreateTokens = usageData.cache_creation_input_tokens || 0
const cacheReadTokens = usageData.cache_read_input_tokens || 0
const totalTokens = inputTokens + outputTokens
const normalizedUsage = this._normalizeUsageSnapshot(usageData)
await this._recordUsage(apiKeyData, account, model, normalizedUsage)
return normalizedUsage
}
if (totalTokens > 0) {
await this._recordUsage(
apiKeyData,
account,
model,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens
)
/**
* 标准化 usage 数据,确保字段完整且为数字
*/
_normalizeUsageSnapshot(usageData = {}) {
const toNumber = (value) => {
if (value === undefined || value === null || value === '') {
return 0
}
const num = Number(value)
if (!Number.isFinite(num)) {
return 0
}
return Math.max(0, num)
}
const inputTokens = toNumber(
usageData.input_tokens ??
usageData.prompt_tokens ??
usageData.inputTokens ??
usageData.total_input_tokens
)
const outputTokens = toNumber(
usageData.output_tokens ?? usageData.completion_tokens ?? usageData.outputTokens
)
const cacheReadTokens = toNumber(
usageData.cache_read_input_tokens ??
usageData.cacheReadTokens ??
usageData.input_tokens_details?.cached_tokens
)
const rawCacheCreateTokens =
usageData.cache_creation_input_tokens ??
usageData.cacheCreateTokens ??
usageData.cache_tokens ??
0
let cacheCreateTokens = toNumber(rawCacheCreateTokens)
const ephemeral5m = toNumber(
usageData.cache_creation?.ephemeral_5m_input_tokens ?? usageData.ephemeral_5m_input_tokens
)
const ephemeral1h = toNumber(
usageData.cache_creation?.ephemeral_1h_input_tokens ?? usageData.ephemeral_1h_input_tokens
)
if (cacheCreateTokens === 0 && (ephemeral5m > 0 || ephemeral1h > 0)) {
cacheCreateTokens = ephemeral5m + ephemeral1h
}
const normalized = {
input_tokens: inputTokens,
output_tokens: outputTokens,
cache_creation_input_tokens: cacheCreateTokens,
cache_read_input_tokens: cacheReadTokens
}
if (ephemeral5m > 0 || ephemeral1h > 0) {
normalized.cache_creation = {
ephemeral_5m_input_tokens: ephemeral5m,
ephemeral_1h_input_tokens: ephemeral1h
}
}
return normalized
}
/**
* 计算 usage 对象的总 token 数
*/
_getTotalTokens(usageObject = {}) {
const toNumber = (value) => {
if (value === undefined || value === null || value === '') {
return 0
}
const num = Number(value)
if (!Number.isFinite(num)) {
return 0
}
return Math.max(0, num)
}
return (
toNumber(usageObject.input_tokens) +
toNumber(usageObject.output_tokens) +
toNumber(usageObject.cache_creation_input_tokens) +
toNumber(usageObject.cache_read_input_tokens)
)
}
/**
* 提取账户 ID
*/
_extractAccountId(account) {
if (!account || typeof account !== 'object') {
return null
}
return account.id || account.accountId || account.account_id || null
}
/**
@@ -534,7 +720,7 @@ class DroidRelayService {
}
// OpenAI 特定头
if (endpointType === 'openai' || endpointType === 'common') {
if (endpointType === 'openai') {
headers['x-api-provider'] = 'azure_openai'
}
@@ -636,34 +822,40 @@ class DroidRelayService {
/**
* 处理非流式响应
*/
async _handleNonStreamResponse(response, account, apiKeyData, requestBody) {
async _handleNonStreamResponse(
response,
account,
apiKeyData,
requestBody,
clientRequest,
endpointType
) {
const { data } = response
// 从响应中提取 usage 数据
const usage = data.usage || {}
// Anthropic 格式
const inputTokens = usage.input_tokens || 0
const outputTokens = usage.output_tokens || 0
const cacheCreateTokens = usage.cache_creation_input_tokens || 0
const cacheReadTokens = usage.cache_read_input_tokens || 0
const totalTokens = inputTokens + outputTokens
const model = requestBody.model || 'unknown'
// 记录使用统计
if (totalTokens > 0) {
await this._recordUsage(
apiKeyData,
account,
model,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens
)
const normalizedUsage = this._normalizeUsageSnapshot(usage)
await this._recordUsage(apiKeyData, account, model, normalizedUsage)
const totalTokens = this._getTotalTokens(normalizedUsage)
const usageSummary = {
inputTokens: normalizedUsage.input_tokens || 0,
outputTokens: normalizedUsage.output_tokens || 0,
cacheCreateTokens: normalizedUsage.cache_creation_input_tokens || 0,
cacheReadTokens: normalizedUsage.cache_read_input_tokens || 0
}
await this._applyRateLimitTracking(
clientRequest?.rateLimitInfo,
usageSummary,
model,
endpointType === 'anthropic' ? ' [anthropic]' : ' [openai]'
)
logger.success(`✅ Droid request completed - Account: ${account.name}, Tokens: ${totalTokens}`)
return {
@@ -676,51 +868,38 @@ class DroidRelayService {
/**
* 记录使用统计
*/
async _recordUsage(
apiKeyData,
account,
model,
inputTokens,
outputTokens,
cacheCreateTokens = 0,
cacheReadTokens = 0
) {
const totalTokens = inputTokens + outputTokens
async _recordUsage(apiKeyData, account, model, usageObject = {}) {
const totalTokens = this._getTotalTokens(usageObject)
if (totalTokens <= 0) {
logger.debug('🪙 Droid usage 数据为空,跳过记录')
return
}
try {
const keyId = apiKeyData?.id
// 记录 API Key 级别的使用统计
const accountId = this._extractAccountId(account)
if (keyId) {
await redis.incrementTokenUsage(
keyId,
await apiKeyService.recordUsageWithDetails(keyId, usageObject, model, accountId, 'droid')
} else if (accountId) {
await redis.incrementAccountUsage(
accountId,
totalTokens,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens,
usageObject.input_tokens || 0,
usageObject.output_tokens || 0,
usageObject.cache_creation_input_tokens || 0,
usageObject.cache_read_input_tokens || 0,
model,
0, // ephemeral5mTokens
0, // ephemeral1hTokens
false // isLongContextRequest
false
)
} else {
logger.warn('⚠️ Skipping API Key usage recording: missing apiKeyData.id')
logger.warn('⚠️ 无法记录 Droid usage缺少 API Key 和账户标识')
return
}
// 记录账户级别的使用统计
await redis.incrementAccountUsage(
account.id,
totalTokens,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens,
model,
false // isLongContextRequest
)
logger.debug(
`📊 Droid usage recorded - Key: ${keyId || 'unknown'}, Account: ${account.id}, Model: ${model}, Input: ${inputTokens}, Output: ${outputTokens}, Total: ${totalTokens}`
`📊 Droid usage recorded - Key: ${keyId || 'unknown'}, Account: ${accountId || 'unknown'}, Model: ${model}, Input: ${usageObject.input_tokens || 0}, Output: ${usageObject.output_tokens || 0}, Cache Create: ${usageObject.cache_creation_input_tokens || 0}, Cache Read: ${usageObject.cache_read_input_tokens || 0}, Total: ${totalTokens}`
)
} catch (error) {
logger.error('❌ Failed to record Droid usage:', error)