Files
claude-relay-service/src/services/claudeConsoleRelayService.js

1456 lines
57 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const axios = require('axios')
const { v4: uuidv4 } = require('uuid')
const claudeConsoleAccountService = require('./claudeConsoleAccountService')
const redis = require('../models/redis')
const logger = require('../utils/logger')
const config = require('../../config/config')
const {
sanitizeUpstreamError,
sanitizeErrorMessage,
isAccountDisabledError
} = require('../utils/errorSanitizer')
const userMessageQueueService = require('./userMessageQueueService')
const { isStreamWritable } = require('../utils/streamHelper')
const { filterForClaude } = require('../utils/headerFilter')
class ClaudeConsoleRelayService {
constructor() {
this.defaultUserAgent = 'claude-cli/2.0.52 (external, cli)'
}
// 🚀 转发请求到Claude Console API
async relayRequest(
requestBody,
apiKeyData,
clientRequest,
clientResponse,
clientHeaders,
accountId,
options = {}
) {
let abortController = null
let account = null
const requestId = uuidv4() // 用于并发追踪
let concurrencyAcquired = false
let queueLockAcquired = false
let queueRequestId = null
try {
// 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁
if (userMessageQueueService.isUserMessageRequest(requestBody)) {
// 校验 accountId 非空,避免空值污染队列锁键
if (!accountId || accountId === '') {
logger.error('❌ accountId missing for queue lock in console relayRequest')
throw new Error('accountId missing for queue lock')
}
const queueResult = await userMessageQueueService.acquireQueueLock(accountId)
if (!queueResult.acquired && !queueResult.skipped) {
// 区分 Redis 后端错误和队列超时
const isBackendError = queueResult.error === 'queue_backend_error'
const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT'
const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout'
const errorMessage = isBackendError
? 'Queue service temporarily unavailable, please retry later'
: 'User message queue wait timeout, please retry later'
const statusCode = isBackendError ? 500 : 503
// 结构化性能日志,用于后续统计
logger.performance('user_message_queue_error', {
errorType,
errorCode,
accountId,
statusCode,
apiKeyName: apiKeyData.name,
backendError: isBackendError ? queueResult.errorMessage : undefined
})
logger.warn(
`📬 User message queue ${errorType} for console account ${accountId}, key: ${apiKeyData.name}`,
isBackendError ? { backendError: queueResult.errorMessage } : {}
)
return {
statusCode,
headers: {
'Content-Type': 'application/json',
'x-user-message-queue-error': errorType
},
body: JSON.stringify({
type: 'error',
error: {
type: errorType,
code: errorCode,
message: errorMessage
}
}),
accountId
}
}
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
logger.debug(
`📬 User message queue lock acquired for console account ${accountId}, requestId: ${queueRequestId}`
)
}
}
// 获取账户信息
account = await claudeConsoleAccountService.getAccount(accountId)
if (!account) {
throw new Error('Claude Console Claude account not found')
}
const autoProtectionDisabled = account.disableAutoProtection === true
logger.info(
`📤 Processing Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId}), request: ${requestId}`
)
// 🔒 并发控制:原子性抢占槽位
if (account.maxConcurrentTasks > 0) {
// 先抢占,再检查 - 避免竞态条件
const newConcurrency = Number(
await redis.incrConsoleAccountConcurrency(accountId, requestId, 600)
)
concurrencyAcquired = true
// 检查是否超过限制
if (newConcurrency > account.maxConcurrentTasks) {
// 超限,立即回滚
await redis.decrConsoleAccountConcurrency(accountId, requestId)
concurrencyAcquired = false
logger.warn(
`⚠️ Console account ${account.name} (${accountId}) concurrency limit exceeded: ${newConcurrency}/${account.maxConcurrentTasks} (request: ${requestId}, rolled back)`
)
const error = new Error('Console account concurrency limit reached')
error.code = 'CONSOLE_ACCOUNT_CONCURRENCY_FULL'
error.accountId = accountId
throw error
}
logger.debug(
`🔓 Acquired concurrency slot for account ${account.name} (${accountId}), current: ${newConcurrency}/${account.maxConcurrentTasks}, request: ${requestId}`
)
}
logger.debug(`🌐 Account API URL: ${account.apiUrl}`)
logger.debug(`🔍 Account supportedModels: ${JSON.stringify(account.supportedModels)}`)
logger.debug(`🔑 Account has apiKey: ${!!account.apiKey}`)
logger.debug(`📝 Request model: ${requestBody.model}`)
// 处理模型映射
let mappedModel = requestBody.model
if (
account.supportedModels &&
typeof account.supportedModels === 'object' &&
!Array.isArray(account.supportedModels)
) {
const newModel = claudeConsoleAccountService.getMappedModel(
account.supportedModels,
requestBody.model
)
if (newModel !== requestBody.model) {
logger.info(`🔄 Mapping model from ${requestBody.model} to ${newModel}`)
mappedModel = newModel
}
}
// 创建修改后的请求体
const modifiedRequestBody = {
...requestBody,
model: mappedModel
}
// 模型兼容性检查已经在调度器中完成,这里不需要再检查
// 创建代理agent
const proxyAgent = claudeConsoleAccountService._createProxyAgent(account.proxy)
// 创建AbortController用于取消请求
abortController = new AbortController()
// 设置客户端断开监听器
const handleClientDisconnect = () => {
logger.info('🔌 Client disconnected, aborting Claude Console Claude request')
if (abortController && !abortController.signal.aborted) {
abortController.abort()
}
}
// 监听客户端断开事件
if (clientRequest) {
clientRequest.once('close', handleClientDisconnect)
}
if (clientResponse) {
clientResponse.once('close', handleClientDisconnect)
}
// 构建完整的API URL
const cleanUrl = account.apiUrl.replace(/\/$/, '') // 移除末尾斜杠
let apiEndpoint
if (options.customPath) {
// 如果指定了自定义路径(如 count_tokens使用它
const baseUrl = cleanUrl.replace(/\/v1\/messages$/, '') // 移除已有的 /v1/messages
apiEndpoint = `${baseUrl}${options.customPath}`
} else {
// 默认使用 messages 端点
apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages`
}
logger.debug(`🎯 Final API endpoint: ${apiEndpoint}`)
logger.debug(`[DEBUG] Options passed to relayRequest: ${JSON.stringify(options)}`)
logger.debug(`[DEBUG] Client headers received: ${JSON.stringify(clientHeaders)}`)
// 过滤客户端请求头
const filteredHeaders = this._filterClientHeaders(clientHeaders)
logger.debug(`[DEBUG] Filtered client headers: ${JSON.stringify(filteredHeaders)}`)
// 决定使用的 User-Agent优先使用账户自定义的否则透传客户端的最后才使用默认值
const userAgent =
account.userAgent ||
clientHeaders?.['user-agent'] ||
clientHeaders?.['User-Agent'] ||
this.defaultUserAgent
// 准备请求配置
const requestConfig = {
method: 'POST',
url: apiEndpoint,
data: modifiedRequestBody,
headers: {
'Content-Type': 'application/json',
'anthropic-version': '2023-06-01',
'User-Agent': userAgent,
...filteredHeaders
},
timeout: config.requestTimeout || 600000,
signal: abortController.signal,
validateStatus: () => true // 接受所有状态码
}
if (proxyAgent) {
requestConfig.httpAgent = proxyAgent
requestConfig.httpsAgent = proxyAgent
requestConfig.proxy = false
}
// 根据 API Key 格式选择认证方式
if (account.apiKey && account.apiKey.startsWith('sk-ant-')) {
// Anthropic 官方 API Key 使用 x-api-key
requestConfig.headers['x-api-key'] = account.apiKey
logger.debug('[DEBUG] Using x-api-key authentication for sk-ant-* API key')
} else {
// 其他 API Key 使用 Authorization Bearer
requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}`
logger.debug('[DEBUG] Using Authorization Bearer authentication')
}
logger.debug(
`[DEBUG] Initial headers before beta: ${JSON.stringify(requestConfig.headers, null, 2)}`
)
// 添加beta header如果需要
if (options.betaHeader) {
logger.debug(`[DEBUG] Adding beta header: ${options.betaHeader}`)
requestConfig.headers['anthropic-beta'] = options.betaHeader
} else {
logger.debug('[DEBUG] No beta header to add')
}
// 发送请求
logger.debug(
'📤 Sending request to Claude Console API with headers:',
JSON.stringify(requestConfig.headers, null, 2)
)
const response = await axios(requestConfig)
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
// 因为 Claude API 限流基于请求发送时刻计算RPM不是请求完成时刻
if (queueLockAcquired && queueRequestId && accountId) {
try {
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
logger.debug(
`📬 User message queue lock released early for console account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock early for console account ${accountId}:`,
releaseError.message
)
}
}
// 移除监听器(请求成功完成)
if (clientRequest) {
clientRequest.removeListener('close', handleClientDisconnect)
}
if (clientResponse) {
clientResponse.removeListener('close', handleClientDisconnect)
}
logger.debug(`🔗 Claude Console API response: ${response.status}`)
logger.debug(`[DEBUG] Response headers: ${JSON.stringify(response.headers)}`)
logger.debug(`[DEBUG] Response data type: ${typeof response.data}`)
logger.debug(
`[DEBUG] Response data length: ${response.data ? (typeof response.data === 'string' ? response.data.length : JSON.stringify(response.data).length) : 0}`
)
// 对于错误响应,记录原始错误和清理后的预览
if (response.status < 200 || response.status >= 300) {
// 记录原始错误响应(包含供应商信息,用于调试)
const rawData =
typeof response.data === 'string' ? response.data : JSON.stringify(response.data)
logger.error(
`📝 Upstream error response from ${account?.name || accountId}: ${rawData.substring(0, 500)}`
)
// 记录清理后的数据到error
try {
const responseData =
typeof response.data === 'string' ? JSON.parse(response.data) : response.data
const sanitizedData = sanitizeUpstreamError(responseData)
logger.error(`🧹 [SANITIZED] Error response to client: ${JSON.stringify(sanitizedData)}`)
} catch (e) {
const rawText =
typeof response.data === 'string' ? response.data : JSON.stringify(response.data)
const sanitizedText = sanitizeErrorMessage(rawText)
logger.error(`🧹 [SANITIZED] Error response to client: ${sanitizedText}`)
}
} else {
logger.debug(
`[DEBUG] Response data preview: ${typeof response.data === 'string' ? response.data.substring(0, 200) : JSON.stringify(response.data).substring(0, 200)}`
)
}
// 检查是否为账户禁用/不可用的 400 错误
const accountDisabledError = isAccountDisabledError(response.status, response.data)
// 检查错误状态并相应处理
if (response.status === 401) {
logger.warn(
`🚫 Unauthorized error detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}`
)
if (!autoProtectionDisabled) {
await claudeConsoleAccountService.markAccountUnauthorized(accountId)
}
} else if (accountDisabledError) {
logger.error(
`🚫 Account disabled error (400) detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}`
)
// 传入完整的错误详情到 webhook
const errorDetails =
typeof response.data === 'string' ? response.data : JSON.stringify(response.data)
if (!autoProtectionDisabled) {
await claudeConsoleAccountService.markConsoleAccountBlocked(accountId, errorDetails)
}
} else if (response.status === 429) {
logger.warn(
`🚫 Rate limit detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}`
)
// 收到429先检查是否因为超过了手动配置的每日额度
await claudeConsoleAccountService.checkQuotaUsage(accountId).catch((err) => {
logger.error('❌ Failed to check quota after 429 error:', err)
})
if (!autoProtectionDisabled) {
await claudeConsoleAccountService.markAccountRateLimited(accountId)
}
} else if (response.status === 529) {
logger.warn(
`🚫 Overload error detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}`
)
if (!autoProtectionDisabled) {
await claudeConsoleAccountService.markAccountOverloaded(accountId)
}
} else if (response.status === 200 || response.status === 201) {
// 如果请求成功,检查并移除错误状态
const isRateLimited = await claudeConsoleAccountService.isAccountRateLimited(accountId)
if (isRateLimited) {
await claudeConsoleAccountService.removeAccountRateLimit(accountId)
}
const isOverloaded = await claudeConsoleAccountService.isAccountOverloaded(accountId)
if (isOverloaded) {
await claudeConsoleAccountService.removeAccountOverload(accountId)
}
}
// 更新最后使用时间
await this._updateLastUsedTime(accountId)
// 准备响应体并清理错误信息(如果是错误响应)
let responseBody
if (response.status < 200 || response.status >= 300) {
// 错误响应,清理供应商信息
try {
const responseData =
typeof response.data === 'string' ? JSON.parse(response.data) : response.data
const sanitizedData = sanitizeUpstreamError(responseData)
responseBody = JSON.stringify(sanitizedData)
logger.debug(`🧹 Sanitized error response`)
} catch (parseError) {
// 如果无法解析为JSON尝试清理文本
const rawText =
typeof response.data === 'string' ? response.data : JSON.stringify(response.data)
responseBody = sanitizeErrorMessage(rawText)
logger.debug(`🧹 Sanitized error text`)
}
} else {
// 成功响应,不需要清理
responseBody =
typeof response.data === 'string' ? response.data : JSON.stringify(response.data)
}
logger.debug(`[DEBUG] Final response body to return: ${responseBody.substring(0, 200)}...`)
return {
statusCode: response.status,
headers: response.headers,
body: responseBody,
accountId
}
} catch (error) {
// 处理特定错误
if (
error.name === 'AbortError' ||
error.name === 'CanceledError' ||
error.code === 'ECONNABORTED' ||
error.code === 'ERR_CANCELED'
) {
logger.info('Request aborted due to client disconnect')
throw new Error('Client disconnected')
}
logger.error(
`❌ Claude Console relay request failed (Account: ${account?.name || accountId}):`,
error.message
)
// 不再因为模型不支持而block账号
throw error
} finally {
// 🔓 并发控制:释放并发槽位
if (concurrencyAcquired) {
try {
await redis.decrConsoleAccountConcurrency(accountId, requestId)
logger.debug(
`🔓 Released concurrency slot for account ${account?.name || accountId}, request: ${requestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release concurrency slot for account ${accountId}, request: ${requestId}:`,
releaseError.message
)
}
}
// 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放)
if (queueLockAcquired && queueRequestId && accountId) {
try {
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
logger.debug(
`📬 User message queue lock released in finally for console account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for account ${accountId}:`,
releaseError.message
)
}
}
}
}
// 🌊 处理流式响应
async relayStreamRequestWithUsageCapture(
requestBody,
apiKeyData,
responseStream,
clientHeaders,
usageCallback,
accountId,
streamTransformer = null,
options = {}
) {
let account = null
const requestId = uuidv4() // 用于并发追踪
let concurrencyAcquired = false
let leaseRefreshInterval = null // 租约刷新定时器
let queueLockAcquired = false
let queueRequestId = null
try {
// 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁
if (userMessageQueueService.isUserMessageRequest(requestBody)) {
// 校验 accountId 非空,避免空值污染队列锁键
if (!accountId || accountId === '') {
logger.error(
'❌ accountId missing for queue lock in console relayStreamRequestWithUsageCapture'
)
throw new Error('accountId missing for queue lock')
}
const queueResult = await userMessageQueueService.acquireQueueLock(accountId)
if (!queueResult.acquired && !queueResult.skipped) {
// 区分 Redis 后端错误和队列超时
const isBackendError = queueResult.error === 'queue_backend_error'
const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT'
const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout'
const errorMessage = isBackendError
? 'Queue service temporarily unavailable, please retry later'
: 'User message queue wait timeout, please retry later'
const statusCode = isBackendError ? 500 : 503
// 结构化性能日志,用于后续统计
logger.performance('user_message_queue_error', {
errorType,
errorCode,
accountId,
statusCode,
stream: true,
apiKeyName: apiKeyData.name,
backendError: isBackendError ? queueResult.errorMessage : undefined
})
logger.warn(
`📬 User message queue ${errorType} for console account ${accountId} (stream), key: ${apiKeyData.name}`,
isBackendError ? { backendError: queueResult.errorMessage } : {}
)
if (!responseStream.headersSent) {
const existingConnection = responseStream.getHeader
? responseStream.getHeader('Connection')
: null
responseStream.writeHead(statusCode, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: existingConnection || 'keep-alive',
'x-user-message-queue-error': errorType
})
}
const errorEvent = `event: error\ndata: ${JSON.stringify({ type: 'error', error: { type: errorType, code: errorCode, message: errorMessage } })}\n\n`
responseStream.write(errorEvent)
responseStream.write('data: [DONE]\n\n')
responseStream.end()
return
}
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
logger.debug(
`📬 User message queue lock acquired for console account ${accountId} (stream), requestId: ${queueRequestId}`
)
}
}
// 获取账户信息
account = await claudeConsoleAccountService.getAccount(accountId)
if (!account) {
throw new Error('Claude Console Claude account not found')
}
logger.info(
`📡 Processing streaming Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId}), request: ${requestId}`
)
// 🔒 并发控制:原子性抢占槽位
if (account.maxConcurrentTasks > 0) {
// 先抢占,再检查 - 避免竞态条件
const newConcurrency = Number(
await redis.incrConsoleAccountConcurrency(accountId, requestId, 600)
)
concurrencyAcquired = true
// 检查是否超过限制
if (newConcurrency > account.maxConcurrentTasks) {
// 超限,立即回滚
await redis.decrConsoleAccountConcurrency(accountId, requestId)
concurrencyAcquired = false
logger.warn(
`⚠️ Console account ${account.name} (${accountId}) concurrency limit exceeded: ${newConcurrency}/${account.maxConcurrentTasks} (stream request: ${requestId}, rolled back)`
)
const error = new Error('Console account concurrency limit reached')
error.code = 'CONSOLE_ACCOUNT_CONCURRENCY_FULL'
error.accountId = accountId
throw error
}
logger.debug(
`🔓 Acquired concurrency slot for stream account ${account.name} (${accountId}), current: ${newConcurrency}/${account.maxConcurrentTasks}, request: ${requestId}`
)
// 🔄 启动租约刷新定时器每5分钟刷新一次防止长连接租约过期
leaseRefreshInterval = setInterval(
async () => {
try {
await redis.refreshConsoleAccountConcurrencyLease(accountId, requestId, 600)
logger.debug(
`🔄 Refreshed concurrency lease for stream account ${account.name} (${accountId}), request: ${requestId}`
)
} catch (refreshError) {
logger.error(
`❌ Failed to refresh concurrency lease for account ${accountId}, request: ${requestId}:`,
refreshError.message
)
}
},
5 * 60 * 1000
) // 5分钟刷新一次
}
logger.debug(`🌐 Account API URL: ${account.apiUrl}`)
// 处理模型映射
let mappedModel = requestBody.model
if (
account.supportedModels &&
typeof account.supportedModels === 'object' &&
!Array.isArray(account.supportedModels)
) {
const newModel = claudeConsoleAccountService.getMappedModel(
account.supportedModels,
requestBody.model
)
if (newModel !== requestBody.model) {
logger.info(`🔄 [Stream] Mapping model from ${requestBody.model} to ${newModel}`)
mappedModel = newModel
}
}
// 创建修改后的请求体
const modifiedRequestBody = {
...requestBody,
model: mappedModel
}
// 模型兼容性检查已经在调度器中完成,这里不需要再检查
// 创建代理agent
const proxyAgent = claudeConsoleAccountService._createProxyAgent(account.proxy)
// 发送流式请求
await this._makeClaudeConsoleStreamRequest(
modifiedRequestBody,
account,
proxyAgent,
clientHeaders,
responseStream,
accountId,
usageCallback,
streamTransformer,
options,
// 📬 回调:在收到响应头时释放队列锁
async () => {
if (queueLockAcquired && queueRequestId && accountId) {
try {
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
logger.debug(
`📬 User message queue lock released early for console stream account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock early for console stream account ${accountId}:`,
releaseError.message
)
}
}
}
)
// 更新最后使用时间
await this._updateLastUsedTime(accountId)
} catch (error) {
// 客户端主动断开连接是正常情况,使用 INFO 级别
if (error.message === 'Client disconnected') {
logger.info(
`🔌 Claude Console stream relay ended: Client disconnected (Account: ${account?.name || accountId})`
)
} else {
logger.error(
`❌ Claude Console stream relay failed (Account: ${account?.name || accountId}):`,
error
)
}
throw error
} finally {
// 🛑 清理租约刷新定时器
if (leaseRefreshInterval) {
clearInterval(leaseRefreshInterval)
logger.debug(
`🛑 Cleared lease refresh interval for stream account ${account?.name || accountId}, request: ${requestId}`
)
}
// 🔓 并发控制:释放并发槽位
if (concurrencyAcquired) {
try {
await redis.decrConsoleAccountConcurrency(accountId, requestId)
logger.debug(
`🔓 Released concurrency slot for stream account ${account?.name || accountId}, request: ${requestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release concurrency slot for stream account ${accountId}, request: ${requestId}:`,
releaseError.message
)
}
}
// 📬 释放用户消息队列锁(兜底,正常情况下已在收到响应头后提前释放)
if (queueLockAcquired && queueRequestId && accountId) {
try {
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
logger.debug(
`📬 User message queue lock released in finally for console stream account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for stream account ${accountId}:`,
releaseError.message
)
}
}
}
}
// 🌊 发送流式请求到Claude Console API
async _makeClaudeConsoleStreamRequest(
body,
account,
proxyAgent,
clientHeaders,
responseStream,
accountId,
usageCallback,
streamTransformer = null,
requestOptions = {},
onResponseHeaderReceived = null
) {
return new Promise((resolve, reject) => {
let aborted = false
// 构建完整的API URL
const cleanUrl = account.apiUrl.replace(/\/$/, '') // 移除末尾斜杠
const apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages`
logger.debug(`🎯 Final API endpoint for stream: ${apiEndpoint}`)
// 过滤客户端请求头
const filteredHeaders = this._filterClientHeaders(clientHeaders)
logger.debug(`[DEBUG] Filtered client headers: ${JSON.stringify(filteredHeaders)}`)
// 决定使用的 User-Agent优先使用账户自定义的否则透传客户端的最后才使用默认值
const userAgent =
account.userAgent ||
clientHeaders?.['user-agent'] ||
clientHeaders?.['User-Agent'] ||
this.defaultUserAgent
// 准备请求配置
const requestConfig = {
method: 'POST',
url: apiEndpoint,
data: body,
headers: {
'Content-Type': 'application/json',
'anthropic-version': '2023-06-01',
'User-Agent': userAgent,
...filteredHeaders
},
timeout: config.requestTimeout || 600000,
responseType: 'stream',
validateStatus: () => true // 接受所有状态码
}
if (proxyAgent) {
requestConfig.httpAgent = proxyAgent
requestConfig.httpsAgent = proxyAgent
requestConfig.proxy = false
}
// 根据 API Key 格式选择认证方式
if (account.apiKey && account.apiKey.startsWith('sk-ant-')) {
// Anthropic 官方 API Key 使用 x-api-key
requestConfig.headers['x-api-key'] = account.apiKey
logger.debug('[DEBUG] Using x-api-key authentication for sk-ant-* API key')
} else {
// 其他 API Key 使用 Authorization Bearer
requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}`
logger.debug('[DEBUG] Using Authorization Bearer authentication')
}
// 添加beta header如果需要
if (requestOptions.betaHeader) {
requestConfig.headers['anthropic-beta'] = requestOptions.betaHeader
}
// 发送请求
const request = axios(requestConfig)
// 注意:使用 .then(async ...) 模式处理响应
// - 内部的 releaseQueueLock 有独立的 try-catch不会导致未捕获异常
// - queueLockAcquired = false 的赋值会在 finally 执行前完成JS 单线程保证)
request
.then(async (response) => {
logger.debug(`🌊 Claude Console Claude stream response status: ${response.status}`)
// 错误响应处理
if (response.status !== 200) {
logger.error(
`❌ Claude Console API returned error status: ${response.status} | Account: ${account?.name || accountId}`
)
// 收集错误数据用于检测
let errorDataForCheck = ''
const errorChunks = []
response.data.on('data', (chunk) => {
errorChunks.push(chunk)
errorDataForCheck += chunk.toString()
})
response.data.on('end', async () => {
const autoProtectionDisabled = account.disableAutoProtection === true
// 记录原始错误消息到日志(方便调试,包含供应商信息)
logger.error(
`📝 [Stream] Upstream error response from ${account?.name || accountId}: ${errorDataForCheck.substring(0, 500)}`
)
// 检查是否为账户禁用错误
const accountDisabledError = isAccountDisabledError(
response.status,
errorDataForCheck
)
if (response.status === 401) {
logger.warn(
`🚫 [Stream] Unauthorized error detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}`
)
if (!autoProtectionDisabled) {
await claudeConsoleAccountService.markAccountUnauthorized(accountId)
}
} else if (accountDisabledError) {
logger.error(
`🚫 [Stream] Account disabled error (400) detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}`
)
// 传入完整的错误详情到 webhook
if (!autoProtectionDisabled) {
await claudeConsoleAccountService.markConsoleAccountBlocked(
accountId,
errorDataForCheck
)
}
} else if (response.status === 429) {
logger.warn(
`🚫 [Stream] Rate limit detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}`
)
// 检查是否因为超过每日额度
claudeConsoleAccountService.checkQuotaUsage(accountId).catch((err) => {
logger.error('❌ Failed to check quota after 429 error:', err)
})
if (!autoProtectionDisabled) {
await claudeConsoleAccountService.markAccountRateLimited(accountId)
}
} else if (response.status === 529) {
logger.warn(
`🚫 [Stream] Overload error detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}`
)
if (!autoProtectionDisabled) {
await claudeConsoleAccountService.markAccountOverloaded(accountId)
}
}
// 设置响应头
if (!responseStream.headersSent) {
responseStream.writeHead(response.status, {
'Content-Type': 'application/json',
'Cache-Control': 'no-cache'
})
}
// 清理并发送错误响应
try {
const fullErrorData = Buffer.concat(errorChunks).toString()
const errorJson = JSON.parse(fullErrorData)
const sanitizedError = sanitizeUpstreamError(errorJson)
// 记录清理后的错误消息(发送给客户端的,完整记录)
logger.error(
`🧹 [Stream] [SANITIZED] Error response to client: ${JSON.stringify(sanitizedError)}`
)
if (isStreamWritable(responseStream)) {
responseStream.write(JSON.stringify(sanitizedError))
responseStream.end()
}
} catch (parseError) {
const sanitizedText = sanitizeErrorMessage(errorDataForCheck)
logger.error(`🧹 [Stream] [SANITIZED] Error response to client: ${sanitizedText}`)
if (isStreamWritable(responseStream)) {
responseStream.write(sanitizedText)
responseStream.end()
}
}
resolve() // 不抛出异常,正常完成流处理
})
return
}
// 📬 收到成功响应头HTTP 200调用回调释放队列锁
// 此时请求已被 Claude API 接受并计入 RPM 配额,无需等待响应完成
if (onResponseHeaderReceived && typeof onResponseHeaderReceived === 'function') {
try {
await onResponseHeaderReceived()
} catch (callbackError) {
logger.error(
`❌ Failed to execute onResponseHeaderReceived callback for console stream account ${accountId}:`,
callbackError.message
)
}
}
// 成功响应,检查并移除错误状态
claudeConsoleAccountService.isAccountRateLimited(accountId).then((isRateLimited) => {
if (isRateLimited) {
claudeConsoleAccountService.removeAccountRateLimit(accountId)
}
})
claudeConsoleAccountService.isAccountOverloaded(accountId).then((isOverloaded) => {
if (isOverloaded) {
claudeConsoleAccountService.removeAccountOverload(accountId)
}
})
// 设置响应头
// ⚠️ 关键修复:尊重 auth.js 提前设置的 Connection: close
// 当并发队列功能启用时auth.js 会设置 Connection: close 来禁用 Keep-Alive
if (!responseStream.headersSent) {
const existingConnection = responseStream.getHeader
? responseStream.getHeader('Connection')
: null
const connectionHeader = existingConnection || 'keep-alive'
if (existingConnection) {
logger.debug(
`🔌 [Console Stream] Preserving existing Connection header: ${existingConnection}`
)
}
responseStream.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: connectionHeader,
'X-Accel-Buffering': 'no'
})
}
let buffer = ''
let finalUsageReported = false
const collectedUsageData = {
model: body.model || account?.defaultModel || null
}
// 处理流数据
response.data.on('data', (chunk) => {
try {
if (aborted) {
return
}
const chunkStr = chunk.toString()
buffer += chunkStr
// 处理完整的SSE行
const lines = buffer.split('\n')
buffer = lines.pop() || ''
// 转发数据并解析usage
if (lines.length > 0) {
// 检查流是否可写(客户端连接是否有效)
if (isStreamWritable(responseStream)) {
const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '')
// 应用流转换器如果有
let dataToWrite = linesToForward
if (streamTransformer) {
const transformed = streamTransformer(linesToForward)
if (transformed) {
dataToWrite = transformed
} else {
dataToWrite = null
}
}
if (dataToWrite) {
responseStream.write(dataToWrite)
}
} else {
// 客户端连接已断开记录警告但仍继续解析usage
logger.warn(
`⚠️ [Console] Client disconnected during stream, skipping ${lines.length} lines for account: ${account?.name || accountId}`
)
}
// 解析SSE数据寻找usage信息无论连接状态如何
for (const line of lines) {
if (line.startsWith('data:')) {
const jsonStr = line.slice(5).trimStart()
if (!jsonStr || jsonStr === '[DONE]') {
continue
}
try {
const data = JSON.parse(jsonStr)
// 收集usage数据
if (data.type === 'message_start' && data.message && data.message.usage) {
collectedUsageData.input_tokens = data.message.usage.input_tokens || 0
collectedUsageData.cache_creation_input_tokens =
data.message.usage.cache_creation_input_tokens || 0
collectedUsageData.cache_read_input_tokens =
data.message.usage.cache_read_input_tokens || 0
collectedUsageData.model = data.message.model
// 检查是否有详细的 cache_creation 对象
if (
data.message.usage.cache_creation &&
typeof data.message.usage.cache_creation === 'object'
) {
collectedUsageData.cache_creation = {
ephemeral_5m_input_tokens:
data.message.usage.cache_creation.ephemeral_5m_input_tokens || 0,
ephemeral_1h_input_tokens:
data.message.usage.cache_creation.ephemeral_1h_input_tokens || 0
}
logger.info(
'📊 Collected detailed cache creation data:',
JSON.stringify(collectedUsageData.cache_creation)
)
}
}
if (data.type === 'message_delta' && data.usage) {
// 提取所有usage字段message_delta可能包含完整的usage信息
if (data.usage.output_tokens !== undefined) {
collectedUsageData.output_tokens = data.usage.output_tokens || 0
}
// 提取input_tokens如果存在
if (data.usage.input_tokens !== undefined) {
collectedUsageData.input_tokens = data.usage.input_tokens || 0
}
// 提取cache相关的tokens
if (data.usage.cache_creation_input_tokens !== undefined) {
collectedUsageData.cache_creation_input_tokens =
data.usage.cache_creation_input_tokens || 0
}
if (data.usage.cache_read_input_tokens !== undefined) {
collectedUsageData.cache_read_input_tokens =
data.usage.cache_read_input_tokens || 0
}
// 检查是否有详细的 cache_creation 对象
if (
data.usage.cache_creation &&
typeof data.usage.cache_creation === 'object'
) {
collectedUsageData.cache_creation = {
ephemeral_5m_input_tokens:
data.usage.cache_creation.ephemeral_5m_input_tokens || 0,
ephemeral_1h_input_tokens:
data.usage.cache_creation.ephemeral_1h_input_tokens || 0
}
}
logger.info(
'📊 [Console] Collected usage data from message_delta:',
JSON.stringify(collectedUsageData)
)
// 如果已经收集到了完整数据,触发回调
if (
collectedUsageData.input_tokens !== undefined &&
collectedUsageData.output_tokens !== undefined &&
!finalUsageReported
) {
if (!collectedUsageData.model) {
collectedUsageData.model = body.model || account?.defaultModel || null
}
logger.info(
'🎯 [Console] Complete usage data collected:',
JSON.stringify(collectedUsageData)
)
if (usageCallback && typeof usageCallback === 'function') {
usageCallback({ ...collectedUsageData, accountId })
}
finalUsageReported = true
}
}
// 不再因为模型不支持而block账号
} catch (e) {
// 忽略解析错误
}
}
}
}
} catch (error) {
logger.error(
`❌ Error processing Claude Console stream data (Account: ${account?.name || accountId}):`,
error
)
if (isStreamWritable(responseStream)) {
// 如果有 streamTransformer如测试请求使用前端期望的格式
if (streamTransformer) {
responseStream.write(
`data: ${JSON.stringify({ type: 'error', error: error.message })}\n\n`
)
} else {
responseStream.write('event: error\n')
responseStream.write(
`data: ${JSON.stringify({
error: 'Stream processing error',
message: error.message,
timestamp: new Date().toISOString()
})}\n\n`
)
}
}
}
})
response.data.on('end', () => {
try {
// 处理缓冲区中剩余的数据
if (buffer.trim() && isStreamWritable(responseStream)) {
if (streamTransformer) {
const transformed = streamTransformer(buffer)
if (transformed) {
responseStream.write(transformed)
}
} else {
responseStream.write(buffer)
}
}
// 🔧 兜底逻辑确保所有未保存的usage数据都不会丢失
if (!finalUsageReported) {
if (
collectedUsageData.input_tokens !== undefined ||
collectedUsageData.output_tokens !== undefined
) {
// 补全缺失的字段
if (collectedUsageData.input_tokens === undefined) {
collectedUsageData.input_tokens = 0
logger.warn(
'⚠️ [Console] message_delta missing input_tokens, setting to 0. This may indicate incomplete usage data.'
)
}
if (collectedUsageData.output_tokens === undefined) {
collectedUsageData.output_tokens = 0
logger.warn(
'⚠️ [Console] message_delta missing output_tokens, setting to 0. This may indicate incomplete usage data.'
)
}
// 确保有 model 字段
if (!collectedUsageData.model) {
collectedUsageData.model = body.model || account?.defaultModel || null
}
logger.info(
`📊 [Console] Saving incomplete usage data via fallback: ${JSON.stringify(collectedUsageData)}`
)
if (usageCallback && typeof usageCallback === 'function') {
usageCallback({ ...collectedUsageData, accountId })
}
finalUsageReported = true
} else {
logger.warn(
'⚠️ [Console] Stream completed but no usage data was captured! This indicates a problem with SSE parsing or API response format.'
)
}
}
// 确保流正确结束
if (isStreamWritable(responseStream)) {
// 📊 诊断日志:流结束前状态
logger.info(
`📤 [STREAM] Ending response | destroyed: ${responseStream.destroyed}, ` +
`socketDestroyed: ${responseStream.socket?.destroyed}, ` +
`socketBytesWritten: ${responseStream.socket?.bytesWritten || 0}`
)
// 禁用 Nagle 算法确保数据立即发送
if (responseStream.socket && !responseStream.socket.destroyed) {
responseStream.socket.setNoDelay(true)
}
// 等待数据完全 flush 到客户端后再 resolve
responseStream.end(() => {
logger.info(
`✅ [STREAM] Response ended and flushed | socketBytesWritten: ${responseStream.socket?.bytesWritten || 'unknown'}`
)
resolve()
})
} else {
// 连接已断开,记录警告
logger.warn(
`⚠️ [Console] Client disconnected before stream end, data may not have been received | account: ${account?.name || accountId}`
)
resolve()
}
} catch (error) {
logger.error('❌ Error processing stream end:', error)
reject(error)
}
})
response.data.on('error', (error) => {
logger.error(
`❌ Claude Console stream error (Account: ${account?.name || accountId}):`,
error
)
if (isStreamWritable(responseStream)) {
// 如果有 streamTransformer如测试请求使用前端期望的格式
if (streamTransformer) {
responseStream.write(
`data: ${JSON.stringify({ type: 'error', error: error.message })}\n\n`
)
} else {
responseStream.write('event: error\n')
responseStream.write(
`data: ${JSON.stringify({
error: 'Stream error',
message: error.message,
timestamp: new Date().toISOString()
})}\n\n`
)
}
responseStream.end()
}
reject(error)
})
})
.catch((error) => {
if (aborted) {
return
}
logger.error(
`❌ Claude Console stream request error (Account: ${account?.name || accountId}):`,
error.message
)
// 检查错误状态
if (error.response) {
if (error.response.status === 401) {
claudeConsoleAccountService.markAccountUnauthorized(accountId)
} else if (error.response.status === 429) {
claudeConsoleAccountService.markAccountRateLimited(accountId)
// 检查是否因为超过每日额度
claudeConsoleAccountService.checkQuotaUsage(accountId).catch((err) => {
logger.error('❌ Failed to check quota after 429 error:', err)
})
} else if (error.response.status === 529) {
claudeConsoleAccountService.markAccountOverloaded(accountId)
}
}
// 发送错误响应
if (!responseStream.headersSent) {
const existingConnection = responseStream.getHeader
? responseStream.getHeader('Connection')
: null
responseStream.writeHead(error.response?.status || 500, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: existingConnection || 'keep-alive'
})
}
if (isStreamWritable(responseStream)) {
// 如果有 streamTransformer如测试请求使用前端期望的格式
if (streamTransformer) {
responseStream.write(
`data: ${JSON.stringify({ type: 'error', error: error.message })}\n\n`
)
} else {
responseStream.write('event: error\n')
responseStream.write(
`data: ${JSON.stringify({
error: error.message,
code: error.code,
timestamp: new Date().toISOString()
})}\n\n`
)
}
responseStream.end()
}
reject(error)
})
// 处理客户端断开连接
responseStream.on('close', () => {
logger.debug('🔌 Client disconnected, cleaning up Claude Console stream')
aborted = true
})
})
}
// 🔧 过滤客户端请求头
_filterClientHeaders(clientHeaders) {
// 使用统一的 headerFilter 工具类(白名单模式)
// 与 claudeRelayService 保持一致,避免透传 CDN headers 触发上游 API 安全检查
return filterForClaude(clientHeaders)
}
// 🕐 更新最后使用时间
async _updateLastUsedTime(accountId) {
try {
const client = require('../models/redis').getClientSafe()
const accountKey = `claude_console_account:${accountId}`
const exists = await client.exists(accountKey)
if (!exists) {
logger.debug(`🔎 跳过更新已删除的Claude Console账号最近使用时间: ${accountId}`)
return
}
await client.hset(accountKey, 'lastUsedAt', new Date().toISOString())
} catch (error) {
logger.warn(
`⚠️ Failed to update last used time for Claude Console account ${accountId}:`,
error.message
)
}
}
// 🧪 创建测试用的流转换器,将 Claude API SSE 格式转换为前端期望的格式
_createTestStreamTransformer() {
let testStartSent = false
return (rawData) => {
const lines = rawData.split('\n')
const outputLines = []
for (const line of lines) {
if (!line.startsWith('data: ')) {
// 保留空行用于 SSE 分隔
if (line.trim() === '') {
outputLines.push('')
}
continue
}
const jsonStr = line.substring(6).trim()
if (!jsonStr || jsonStr === '[DONE]') {
continue
}
try {
const data = JSON.parse(jsonStr)
// 发送 test_start 事件(只在第一次 message_start 时发送)
if (data.type === 'message_start' && !testStartSent) {
testStartSent = true
outputLines.push(`data: ${JSON.stringify({ type: 'test_start' })}`)
outputLines.push('')
}
// 转换 content_block_delta 为 content
if (data.type === 'content_block_delta' && data.delta && data.delta.text) {
outputLines.push(`data: ${JSON.stringify({ type: 'content', text: data.delta.text })}`)
outputLines.push('')
}
// 转换 message_stop 为 test_complete
if (data.type === 'message_stop') {
outputLines.push(`data: ${JSON.stringify({ type: 'test_complete', success: true })}`)
outputLines.push('')
}
// 处理错误事件
if (data.type === 'error') {
const errorMsg = data.error?.message || data.message || '未知错误'
outputLines.push(`data: ${JSON.stringify({ type: 'error', error: errorMsg })}`)
outputLines.push('')
}
} catch {
// 忽略解析错误
}
}
return outputLines.length > 0 ? outputLines.join('\n') : null
}
}
// 🧪 测试账号连接供Admin API使用
async testAccountConnection(accountId, responseStream) {
const { sendStreamTestRequest } = require('../utils/testPayloadHelper')
try {
const account = await claudeConsoleAccountService.getAccount(accountId)
if (!account) {
throw new Error('Account not found')
}
logger.info(`🧪 Testing Claude Console account connection: ${account.name} (${accountId})`)
const cleanUrl = account.apiUrl.replace(/\/$/, '')
const apiUrl = cleanUrl.endsWith('/v1/messages')
? cleanUrl
: `${cleanUrl}/v1/messages?beta=true`
await sendStreamTestRequest({
apiUrl,
authorization: `Bearer ${account.apiKey}`,
responseStream,
proxyAgent: claudeConsoleAccountService._createProxyAgent(account.proxy),
extraHeaders: account.userAgent ? { 'User-Agent': account.userAgent } : {}
})
} catch (error) {
logger.error(`❌ Test account connection failed:`, error)
if (!responseStream.headersSent) {
responseStream.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache'
})
}
if (isStreamWritable(responseStream)) {
responseStream.write(
`data: ${JSON.stringify({ type: 'test_complete', success: false, error: error.message })}\n\n`
)
responseStream.end()
}
}
}
// 🎯 健康检查
async healthCheck() {
try {
const accounts = await claudeConsoleAccountService.getAllAccounts()
const activeAccounts = accounts.filter((acc) => acc.isActive && acc.status === 'active')
return {
healthy: activeAccounts.length > 0,
activeAccounts: activeAccounts.length,
totalAccounts: accounts.length,
timestamp: new Date().toISOString()
}
} catch (error) {
logger.error('❌ Claude Console Claude health check failed:', error)
return {
healthy: false,
error: error.message,
timestamp: new Date().toISOString()
}
}
}
}
module.exports = new ClaudeConsoleRelayService()