const axios = require('axios') const { v4: uuidv4 } = require('uuid') const claudeConsoleAccountService = require('./claudeConsoleAccountService') const redis = require('../models/redis') const logger = require('../utils/logger') const config = require('../../config/config') const { sanitizeUpstreamError, sanitizeErrorMessage, isAccountDisabledError } = require('../utils/errorSanitizer') const userMessageQueueService = require('./userMessageQueueService') const { isStreamWritable } = require('../utils/streamHelper') const { filterForClaude } = require('../utils/headerFilter') class ClaudeConsoleRelayService { constructor() { this.defaultUserAgent = 'claude-cli/2.0.52 (external, cli)' } // 🚀 转发请求到Claude Console API async relayRequest( requestBody, apiKeyData, clientRequest, clientResponse, clientHeaders, accountId, options = {} ) { let abortController = null let account = null const requestId = uuidv4() // 用于并发追踪 let concurrencyAcquired = false let queueLockAcquired = false let queueRequestId = null try { // 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁 if (userMessageQueueService.isUserMessageRequest(requestBody)) { // 校验 accountId 非空,避免空值污染队列锁键 if (!accountId || accountId === '') { logger.error('❌ accountId missing for queue lock in console relayRequest') throw new Error('accountId missing for queue lock') } const queueResult = await userMessageQueueService.acquireQueueLock(accountId) if (!queueResult.acquired && !queueResult.skipped) { // 区分 Redis 后端错误和队列超时 const isBackendError = queueResult.error === 'queue_backend_error' const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT' const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout' const errorMessage = isBackendError ? 'Queue service temporarily unavailable, please retry later' : 'User message queue wait timeout, please retry later' const statusCode = isBackendError ? 500 : 503 // 结构化性能日志,用于后续统计 logger.performance('user_message_queue_error', { errorType, errorCode, accountId, statusCode, apiKeyName: apiKeyData.name, backendError: isBackendError ? queueResult.errorMessage : undefined }) logger.warn( `📬 User message queue ${errorType} for console account ${accountId}, key: ${apiKeyData.name}`, isBackendError ? { backendError: queueResult.errorMessage } : {} ) return { statusCode, headers: { 'Content-Type': 'application/json', 'x-user-message-queue-error': errorType }, body: JSON.stringify({ type: 'error', error: { type: errorType, code: errorCode, message: errorMessage } }), accountId } } if (queueResult.acquired && !queueResult.skipped) { queueLockAcquired = true queueRequestId = queueResult.requestId logger.debug( `📬 User message queue lock acquired for console account ${accountId}, requestId: ${queueRequestId}` ) } } // 获取账户信息 account = await claudeConsoleAccountService.getAccount(accountId) if (!account) { throw new Error('Claude Console Claude account not found') } const autoProtectionDisabled = account.disableAutoProtection === true logger.info( `📤 Processing Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId}), request: ${requestId}` ) // 🔒 并发控制:原子性抢占槽位 if (account.maxConcurrentTasks > 0) { // 先抢占,再检查 - 避免竞态条件 const newConcurrency = Number( await redis.incrConsoleAccountConcurrency(accountId, requestId, 600) ) concurrencyAcquired = true // 检查是否超过限制 if (newConcurrency > account.maxConcurrentTasks) { // 超限,立即回滚 await redis.decrConsoleAccountConcurrency(accountId, requestId) concurrencyAcquired = false logger.warn( `⚠️ Console account ${account.name} (${accountId}) concurrency limit exceeded: ${newConcurrency}/${account.maxConcurrentTasks} (request: ${requestId}, rolled back)` ) const error = new Error('Console account concurrency limit reached') error.code = 'CONSOLE_ACCOUNT_CONCURRENCY_FULL' error.accountId = accountId throw error } logger.debug( `🔓 Acquired concurrency slot for account ${account.name} (${accountId}), current: ${newConcurrency}/${account.maxConcurrentTasks}, request: ${requestId}` ) } logger.debug(`🌐 Account API URL: ${account.apiUrl}`) logger.debug(`🔍 Account supportedModels: ${JSON.stringify(account.supportedModels)}`) logger.debug(`🔑 Account has apiKey: ${!!account.apiKey}`) logger.debug(`📝 Request model: ${requestBody.model}`) // 处理模型映射 let mappedModel = requestBody.model if ( account.supportedModels && typeof account.supportedModels === 'object' && !Array.isArray(account.supportedModels) ) { const newModel = claudeConsoleAccountService.getMappedModel( account.supportedModels, requestBody.model ) if (newModel !== requestBody.model) { logger.info(`🔄 Mapping model from ${requestBody.model} to ${newModel}`) mappedModel = newModel } } // 创建修改后的请求体 const modifiedRequestBody = { ...requestBody, model: mappedModel } // 模型兼容性检查已经在调度器中完成,这里不需要再检查 // 创建代理agent const proxyAgent = claudeConsoleAccountService._createProxyAgent(account.proxy) // 创建AbortController用于取消请求 abortController = new AbortController() // 设置客户端断开监听器 const handleClientDisconnect = () => { logger.info('🔌 Client disconnected, aborting Claude Console Claude request') if (abortController && !abortController.signal.aborted) { abortController.abort() } } // 监听客户端断开事件 if (clientRequest) { clientRequest.once('close', handleClientDisconnect) } if (clientResponse) { clientResponse.once('close', handleClientDisconnect) } // 构建完整的API URL const cleanUrl = account.apiUrl.replace(/\/$/, '') // 移除末尾斜杠 let apiEndpoint if (options.customPath) { // 如果指定了自定义路径(如 count_tokens),使用它 const baseUrl = cleanUrl.replace(/\/v1\/messages$/, '') // 移除已有的 /v1/messages apiEndpoint = `${baseUrl}${options.customPath}` } else { // 默认使用 messages 端点 apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages` } logger.debug(`🎯 Final API endpoint: ${apiEndpoint}`) logger.debug(`[DEBUG] Options passed to relayRequest: ${JSON.stringify(options)}`) logger.debug(`[DEBUG] Client headers received: ${JSON.stringify(clientHeaders)}`) // 过滤客户端请求头 const filteredHeaders = this._filterClientHeaders(clientHeaders) logger.debug(`[DEBUG] Filtered client headers: ${JSON.stringify(filteredHeaders)}`) // 决定使用的 User-Agent:优先使用账户自定义的,否则透传客户端的,最后才使用默认值 const userAgent = account.userAgent || clientHeaders?.['user-agent'] || clientHeaders?.['User-Agent'] || this.defaultUserAgent // 准备请求配置 const requestConfig = { method: 'POST', url: apiEndpoint, data: modifiedRequestBody, headers: { 'Content-Type': 'application/json', 'anthropic-version': '2023-06-01', 'User-Agent': userAgent, ...filteredHeaders }, timeout: config.requestTimeout || 600000, signal: abortController.signal, validateStatus: () => true // 接受所有状态码 } if (proxyAgent) { requestConfig.httpAgent = proxyAgent requestConfig.httpsAgent = proxyAgent requestConfig.proxy = false } // 根据 API Key 格式选择认证方式 if (account.apiKey && account.apiKey.startsWith('sk-ant-')) { // Anthropic 官方 API Key 使用 x-api-key requestConfig.headers['x-api-key'] = account.apiKey logger.debug('[DEBUG] Using x-api-key authentication for sk-ant-* API key') } else { // 其他 API Key 使用 Authorization Bearer requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}` logger.debug('[DEBUG] Using Authorization Bearer authentication') } logger.debug( `[DEBUG] Initial headers before beta: ${JSON.stringify(requestConfig.headers, null, 2)}` ) // 添加beta header如果需要 if (options.betaHeader) { logger.debug(`[DEBUG] Adding beta header: ${options.betaHeader}`) requestConfig.headers['anthropic-beta'] = options.betaHeader } else { logger.debug('[DEBUG] No beta header to add') } // 发送请求 logger.debug( '📤 Sending request to Claude Console API with headers:', JSON.stringify(requestConfig.headers, null, 2) ) const response = await axios(requestConfig) // 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成) // 因为 Claude API 限流基于请求发送时刻计算(RPM),不是请求完成时刻 if (queueLockAcquired && queueRequestId && accountId) { try { await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) queueLockAcquired = false // 标记已释放,防止 finally 重复释放 logger.debug( `📬 User message queue lock released early for console account ${accountId}, requestId: ${queueRequestId}` ) } catch (releaseError) { logger.error( `❌ Failed to release user message queue lock early for console account ${accountId}:`, releaseError.message ) } } // 移除监听器(请求成功完成) if (clientRequest) { clientRequest.removeListener('close', handleClientDisconnect) } if (clientResponse) { clientResponse.removeListener('close', handleClientDisconnect) } logger.debug(`🔗 Claude Console API response: ${response.status}`) logger.debug(`[DEBUG] Response headers: ${JSON.stringify(response.headers)}`) logger.debug(`[DEBUG] Response data type: ${typeof response.data}`) logger.debug( `[DEBUG] Response data length: ${response.data ? (typeof response.data === 'string' ? response.data.length : JSON.stringify(response.data).length) : 0}` ) // 对于错误响应,记录原始错误和清理后的预览 if (response.status < 200 || response.status >= 300) { // 记录原始错误响应(包含供应商信息,用于调试) const rawData = typeof response.data === 'string' ? response.data : JSON.stringify(response.data) logger.error( `📝 Upstream error response from ${account?.name || accountId}: ${rawData.substring(0, 500)}` ) // 记录清理后的数据到error try { const responseData = typeof response.data === 'string' ? JSON.parse(response.data) : response.data const sanitizedData = sanitizeUpstreamError(responseData) logger.error(`🧹 [SANITIZED] Error response to client: ${JSON.stringify(sanitizedData)}`) } catch (e) { const rawText = typeof response.data === 'string' ? response.data : JSON.stringify(response.data) const sanitizedText = sanitizeErrorMessage(rawText) logger.error(`🧹 [SANITIZED] Error response to client: ${sanitizedText}`) } } else { logger.debug( `[DEBUG] Response data preview: ${typeof response.data === 'string' ? response.data.substring(0, 200) : JSON.stringify(response.data).substring(0, 200)}` ) } // 检查是否为账户禁用/不可用的 400 错误 const accountDisabledError = isAccountDisabledError(response.status, response.data) // 检查错误状态并相应处理 if (response.status === 401) { logger.warn( `🚫 Unauthorized error detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}` ) if (!autoProtectionDisabled) { await claudeConsoleAccountService.markAccountUnauthorized(accountId) } } else if (accountDisabledError) { logger.error( `🚫 Account disabled error (400) detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}` ) // 传入完整的错误详情到 webhook const errorDetails = typeof response.data === 'string' ? response.data : JSON.stringify(response.data) if (!autoProtectionDisabled) { await claudeConsoleAccountService.markConsoleAccountBlocked(accountId, errorDetails) } } else if (response.status === 429) { logger.warn( `🚫 Rate limit detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}` ) // 收到429先检查是否因为超过了手动配置的每日额度 await claudeConsoleAccountService.checkQuotaUsage(accountId).catch((err) => { logger.error('❌ Failed to check quota after 429 error:', err) }) if (!autoProtectionDisabled) { await claudeConsoleAccountService.markAccountRateLimited(accountId) } } else if (response.status === 529) { logger.warn( `🚫 Overload error detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}` ) if (!autoProtectionDisabled) { await claudeConsoleAccountService.markAccountOverloaded(accountId) } } else if (response.status === 200 || response.status === 201) { // 如果请求成功,检查并移除错误状态 const isRateLimited = await claudeConsoleAccountService.isAccountRateLimited(accountId) if (isRateLimited) { await claudeConsoleAccountService.removeAccountRateLimit(accountId) } const isOverloaded = await claudeConsoleAccountService.isAccountOverloaded(accountId) if (isOverloaded) { await claudeConsoleAccountService.removeAccountOverload(accountId) } } // 更新最后使用时间 await this._updateLastUsedTime(accountId) // 准备响应体并清理错误信息(如果是错误响应) let responseBody if (response.status < 200 || response.status >= 300) { // 错误响应,清理供应商信息 try { const responseData = typeof response.data === 'string' ? JSON.parse(response.data) : response.data const sanitizedData = sanitizeUpstreamError(responseData) responseBody = JSON.stringify(sanitizedData) logger.debug(`🧹 Sanitized error response`) } catch (parseError) { // 如果无法解析为JSON,尝试清理文本 const rawText = typeof response.data === 'string' ? response.data : JSON.stringify(response.data) responseBody = sanitizeErrorMessage(rawText) logger.debug(`🧹 Sanitized error text`) } } else { // 成功响应,不需要清理 responseBody = typeof response.data === 'string' ? response.data : JSON.stringify(response.data) } logger.debug(`[DEBUG] Final response body to return: ${responseBody.substring(0, 200)}...`) return { statusCode: response.status, headers: response.headers, body: responseBody, accountId } } catch (error) { // 处理特定错误 if ( error.name === 'AbortError' || error.name === 'CanceledError' || error.code === 'ECONNABORTED' || error.code === 'ERR_CANCELED' ) { logger.info('Request aborted due to client disconnect') throw new Error('Client disconnected') } logger.error( `❌ Claude Console relay request failed (Account: ${account?.name || accountId}):`, error.message ) // 不再因为模型不支持而block账号 throw error } finally { // 🔓 并发控制:释放并发槽位 if (concurrencyAcquired) { try { await redis.decrConsoleAccountConcurrency(accountId, requestId) logger.debug( `🔓 Released concurrency slot for account ${account?.name || accountId}, request: ${requestId}` ) } catch (releaseError) { logger.error( `❌ Failed to release concurrency slot for account ${accountId}, request: ${requestId}:`, releaseError.message ) } } // 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放) if (queueLockAcquired && queueRequestId && accountId) { try { await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) logger.debug( `📬 User message queue lock released in finally for console account ${accountId}, requestId: ${queueRequestId}` ) } catch (releaseError) { logger.error( `❌ Failed to release user message queue lock for account ${accountId}:`, releaseError.message ) } } } } // 🌊 处理流式响应 async relayStreamRequestWithUsageCapture( requestBody, apiKeyData, responseStream, clientHeaders, usageCallback, accountId, streamTransformer = null, options = {} ) { let account = null const requestId = uuidv4() // 用于并发追踪 let concurrencyAcquired = false let leaseRefreshInterval = null // 租约刷新定时器 let queueLockAcquired = false let queueRequestId = null try { // 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁 if (userMessageQueueService.isUserMessageRequest(requestBody)) { // 校验 accountId 非空,避免空值污染队列锁键 if (!accountId || accountId === '') { logger.error( '❌ accountId missing for queue lock in console relayStreamRequestWithUsageCapture' ) throw new Error('accountId missing for queue lock') } const queueResult = await userMessageQueueService.acquireQueueLock(accountId) if (!queueResult.acquired && !queueResult.skipped) { // 区分 Redis 后端错误和队列超时 const isBackendError = queueResult.error === 'queue_backend_error' const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT' const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout' const errorMessage = isBackendError ? 'Queue service temporarily unavailable, please retry later' : 'User message queue wait timeout, please retry later' const statusCode = isBackendError ? 500 : 503 // 结构化性能日志,用于后续统计 logger.performance('user_message_queue_error', { errorType, errorCode, accountId, statusCode, stream: true, apiKeyName: apiKeyData.name, backendError: isBackendError ? queueResult.errorMessage : undefined }) logger.warn( `📬 User message queue ${errorType} for console account ${accountId} (stream), key: ${apiKeyData.name}`, isBackendError ? { backendError: queueResult.errorMessage } : {} ) if (!responseStream.headersSent) { const existingConnection = responseStream.getHeader ? responseStream.getHeader('Connection') : null responseStream.writeHead(statusCode, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: existingConnection || 'keep-alive', 'x-user-message-queue-error': errorType }) } const errorEvent = `event: error\ndata: ${JSON.stringify({ type: 'error', error: { type: errorType, code: errorCode, message: errorMessage } })}\n\n` responseStream.write(errorEvent) responseStream.write('data: [DONE]\n\n') responseStream.end() return } if (queueResult.acquired && !queueResult.skipped) { queueLockAcquired = true queueRequestId = queueResult.requestId logger.debug( `📬 User message queue lock acquired for console account ${accountId} (stream), requestId: ${queueRequestId}` ) } } // 获取账户信息 account = await claudeConsoleAccountService.getAccount(accountId) if (!account) { throw new Error('Claude Console Claude account not found') } logger.info( `📡 Processing streaming Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId}), request: ${requestId}` ) // 🔒 并发控制:原子性抢占槽位 if (account.maxConcurrentTasks > 0) { // 先抢占,再检查 - 避免竞态条件 const newConcurrency = Number( await redis.incrConsoleAccountConcurrency(accountId, requestId, 600) ) concurrencyAcquired = true // 检查是否超过限制 if (newConcurrency > account.maxConcurrentTasks) { // 超限,立即回滚 await redis.decrConsoleAccountConcurrency(accountId, requestId) concurrencyAcquired = false logger.warn( `⚠️ Console account ${account.name} (${accountId}) concurrency limit exceeded: ${newConcurrency}/${account.maxConcurrentTasks} (stream request: ${requestId}, rolled back)` ) const error = new Error('Console account concurrency limit reached') error.code = 'CONSOLE_ACCOUNT_CONCURRENCY_FULL' error.accountId = accountId throw error } logger.debug( `🔓 Acquired concurrency slot for stream account ${account.name} (${accountId}), current: ${newConcurrency}/${account.maxConcurrentTasks}, request: ${requestId}` ) // 🔄 启动租约刷新定时器(每5分钟刷新一次,防止长连接租约过期) leaseRefreshInterval = setInterval( async () => { try { await redis.refreshConsoleAccountConcurrencyLease(accountId, requestId, 600) logger.debug( `🔄 Refreshed concurrency lease for stream account ${account.name} (${accountId}), request: ${requestId}` ) } catch (refreshError) { logger.error( `❌ Failed to refresh concurrency lease for account ${accountId}, request: ${requestId}:`, refreshError.message ) } }, 5 * 60 * 1000 ) // 5分钟刷新一次 } logger.debug(`🌐 Account API URL: ${account.apiUrl}`) // 处理模型映射 let mappedModel = requestBody.model if ( account.supportedModels && typeof account.supportedModels === 'object' && !Array.isArray(account.supportedModels) ) { const newModel = claudeConsoleAccountService.getMappedModel( account.supportedModels, requestBody.model ) if (newModel !== requestBody.model) { logger.info(`🔄 [Stream] Mapping model from ${requestBody.model} to ${newModel}`) mappedModel = newModel } } // 创建修改后的请求体 const modifiedRequestBody = { ...requestBody, model: mappedModel } // 模型兼容性检查已经在调度器中完成,这里不需要再检查 // 创建代理agent const proxyAgent = claudeConsoleAccountService._createProxyAgent(account.proxy) // 发送流式请求 await this._makeClaudeConsoleStreamRequest( modifiedRequestBody, account, proxyAgent, clientHeaders, responseStream, accountId, usageCallback, streamTransformer, options, // 📬 回调:在收到响应头时释放队列锁 async () => { if (queueLockAcquired && queueRequestId && accountId) { try { await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) queueLockAcquired = false // 标记已释放,防止 finally 重复释放 logger.debug( `📬 User message queue lock released early for console stream account ${accountId}, requestId: ${queueRequestId}` ) } catch (releaseError) { logger.error( `❌ Failed to release user message queue lock early for console stream account ${accountId}:`, releaseError.message ) } } } ) // 更新最后使用时间 await this._updateLastUsedTime(accountId) } catch (error) { // 客户端主动断开连接是正常情况,使用 INFO 级别 if (error.message === 'Client disconnected') { logger.info( `🔌 Claude Console stream relay ended: Client disconnected (Account: ${account?.name || accountId})` ) } else { logger.error( `❌ Claude Console stream relay failed (Account: ${account?.name || accountId}):`, error ) } throw error } finally { // 🛑 清理租约刷新定时器 if (leaseRefreshInterval) { clearInterval(leaseRefreshInterval) logger.debug( `🛑 Cleared lease refresh interval for stream account ${account?.name || accountId}, request: ${requestId}` ) } // 🔓 并发控制:释放并发槽位 if (concurrencyAcquired) { try { await redis.decrConsoleAccountConcurrency(accountId, requestId) logger.debug( `🔓 Released concurrency slot for stream account ${account?.name || accountId}, request: ${requestId}` ) } catch (releaseError) { logger.error( `❌ Failed to release concurrency slot for stream account ${accountId}, request: ${requestId}:`, releaseError.message ) } } // 📬 释放用户消息队列锁(兜底,正常情况下已在收到响应头后提前释放) if (queueLockAcquired && queueRequestId && accountId) { try { await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) logger.debug( `📬 User message queue lock released in finally for console stream account ${accountId}, requestId: ${queueRequestId}` ) } catch (releaseError) { logger.error( `❌ Failed to release user message queue lock for stream account ${accountId}:`, releaseError.message ) } } } } // 🌊 发送流式请求到Claude Console API async _makeClaudeConsoleStreamRequest( body, account, proxyAgent, clientHeaders, responseStream, accountId, usageCallback, streamTransformer = null, requestOptions = {}, onResponseHeaderReceived = null ) { return new Promise((resolve, reject) => { let aborted = false // 构建完整的API URL const cleanUrl = account.apiUrl.replace(/\/$/, '') // 移除末尾斜杠 const apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages` logger.debug(`🎯 Final API endpoint for stream: ${apiEndpoint}`) // 过滤客户端请求头 const filteredHeaders = this._filterClientHeaders(clientHeaders) logger.debug(`[DEBUG] Filtered client headers: ${JSON.stringify(filteredHeaders)}`) // 决定使用的 User-Agent:优先使用账户自定义的,否则透传客户端的,最后才使用默认值 const userAgent = account.userAgent || clientHeaders?.['user-agent'] || clientHeaders?.['User-Agent'] || this.defaultUserAgent // 准备请求配置 const requestConfig = { method: 'POST', url: apiEndpoint, data: body, headers: { 'Content-Type': 'application/json', 'anthropic-version': '2023-06-01', 'User-Agent': userAgent, ...filteredHeaders }, timeout: config.requestTimeout || 600000, responseType: 'stream', validateStatus: () => true // 接受所有状态码 } if (proxyAgent) { requestConfig.httpAgent = proxyAgent requestConfig.httpsAgent = proxyAgent requestConfig.proxy = false } // 根据 API Key 格式选择认证方式 if (account.apiKey && account.apiKey.startsWith('sk-ant-')) { // Anthropic 官方 API Key 使用 x-api-key requestConfig.headers['x-api-key'] = account.apiKey logger.debug('[DEBUG] Using x-api-key authentication for sk-ant-* API key') } else { // 其他 API Key 使用 Authorization Bearer requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}` logger.debug('[DEBUG] Using Authorization Bearer authentication') } // 添加beta header如果需要 if (requestOptions.betaHeader) { requestConfig.headers['anthropic-beta'] = requestOptions.betaHeader } // 发送请求 const request = axios(requestConfig) // 注意:使用 .then(async ...) 模式处理响应 // - 内部的 releaseQueueLock 有独立的 try-catch,不会导致未捕获异常 // - queueLockAcquired = false 的赋值会在 finally 执行前完成(JS 单线程保证) request .then(async (response) => { logger.debug(`🌊 Claude Console Claude stream response status: ${response.status}`) // 错误响应处理 if (response.status !== 200) { logger.error( `❌ Claude Console API returned error status: ${response.status} | Account: ${account?.name || accountId}` ) // 收集错误数据用于检测 let errorDataForCheck = '' const errorChunks = [] response.data.on('data', (chunk) => { errorChunks.push(chunk) errorDataForCheck += chunk.toString() }) response.data.on('end', async () => { const autoProtectionDisabled = account.disableAutoProtection === true // 记录原始错误消息到日志(方便调试,包含供应商信息) logger.error( `📝 [Stream] Upstream error response from ${account?.name || accountId}: ${errorDataForCheck.substring(0, 500)}` ) // 检查是否为账户禁用错误 const accountDisabledError = isAccountDisabledError( response.status, errorDataForCheck ) if (response.status === 401) { logger.warn( `🚫 [Stream] Unauthorized error detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}` ) if (!autoProtectionDisabled) { await claudeConsoleAccountService.markAccountUnauthorized(accountId) } } else if (accountDisabledError) { logger.error( `🚫 [Stream] Account disabled error (400) detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}` ) // 传入完整的错误详情到 webhook if (!autoProtectionDisabled) { await claudeConsoleAccountService.markConsoleAccountBlocked( accountId, errorDataForCheck ) } } else if (response.status === 429) { logger.warn( `🚫 [Stream] Rate limit detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}` ) // 检查是否因为超过每日额度 claudeConsoleAccountService.checkQuotaUsage(accountId).catch((err) => { logger.error('❌ Failed to check quota after 429 error:', err) }) if (!autoProtectionDisabled) { await claudeConsoleAccountService.markAccountRateLimited(accountId) } } else if (response.status === 529) { logger.warn( `🚫 [Stream] Overload error detected for Claude Console account ${accountId}${autoProtectionDisabled ? ' (auto-protection disabled, skipping status change)' : ''}` ) if (!autoProtectionDisabled) { await claudeConsoleAccountService.markAccountOverloaded(accountId) } } // 设置响应头 if (!responseStream.headersSent) { responseStream.writeHead(response.status, { 'Content-Type': 'application/json', 'Cache-Control': 'no-cache' }) } // 清理并发送错误响应 try { const fullErrorData = Buffer.concat(errorChunks).toString() const errorJson = JSON.parse(fullErrorData) const sanitizedError = sanitizeUpstreamError(errorJson) // 记录清理后的错误消息(发送给客户端的,完整记录) logger.error( `🧹 [Stream] [SANITIZED] Error response to client: ${JSON.stringify(sanitizedError)}` ) if (isStreamWritable(responseStream)) { responseStream.write(JSON.stringify(sanitizedError)) responseStream.end() } } catch (parseError) { const sanitizedText = sanitizeErrorMessage(errorDataForCheck) logger.error(`🧹 [Stream] [SANITIZED] Error response to client: ${sanitizedText}`) if (isStreamWritable(responseStream)) { responseStream.write(sanitizedText) responseStream.end() } } resolve() // 不抛出异常,正常完成流处理 }) return } // 📬 收到成功响应头(HTTP 200),调用回调释放队列锁 // 此时请求已被 Claude API 接受并计入 RPM 配额,无需等待响应完成 if (onResponseHeaderReceived && typeof onResponseHeaderReceived === 'function') { try { await onResponseHeaderReceived() } catch (callbackError) { logger.error( `❌ Failed to execute onResponseHeaderReceived callback for console stream account ${accountId}:`, callbackError.message ) } } // 成功响应,检查并移除错误状态 claudeConsoleAccountService.isAccountRateLimited(accountId).then((isRateLimited) => { if (isRateLimited) { claudeConsoleAccountService.removeAccountRateLimit(accountId) } }) claudeConsoleAccountService.isAccountOverloaded(accountId).then((isOverloaded) => { if (isOverloaded) { claudeConsoleAccountService.removeAccountOverload(accountId) } }) // 设置响应头 // ⚠️ 关键修复:尊重 auth.js 提前设置的 Connection: close // 当并发队列功能启用时,auth.js 会设置 Connection: close 来禁用 Keep-Alive if (!responseStream.headersSent) { const existingConnection = responseStream.getHeader ? responseStream.getHeader('Connection') : null const connectionHeader = existingConnection || 'keep-alive' if (existingConnection) { logger.debug( `🔌 [Console Stream] Preserving existing Connection header: ${existingConnection}` ) } responseStream.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: connectionHeader, 'X-Accel-Buffering': 'no' }) } let buffer = '' let finalUsageReported = false const collectedUsageData = { model: body.model || account?.defaultModel || null } // 处理流数据 response.data.on('data', (chunk) => { try { if (aborted) { return } const chunkStr = chunk.toString() buffer += chunkStr // 处理完整的SSE行 const lines = buffer.split('\n') buffer = lines.pop() || '' // 转发数据并解析usage if (lines.length > 0) { // 检查流是否可写(客户端连接是否有效) if (isStreamWritable(responseStream)) { const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '') // 应用流转换器如果有 let dataToWrite = linesToForward if (streamTransformer) { const transformed = streamTransformer(linesToForward) if (transformed) { dataToWrite = transformed } else { dataToWrite = null } } if (dataToWrite) { responseStream.write(dataToWrite) } } else { // 客户端连接已断开,记录警告(但仍继续解析usage) logger.warn( `⚠️ [Console] Client disconnected during stream, skipping ${lines.length} lines for account: ${account?.name || accountId}` ) } // 解析SSE数据寻找usage信息(无论连接状态如何) for (const line of lines) { if (line.startsWith('data:')) { const jsonStr = line.slice(5).trimStart() if (!jsonStr || jsonStr === '[DONE]') { continue } try { const data = JSON.parse(jsonStr) // 收集usage数据 if (data.type === 'message_start' && data.message && data.message.usage) { collectedUsageData.input_tokens = data.message.usage.input_tokens || 0 collectedUsageData.cache_creation_input_tokens = data.message.usage.cache_creation_input_tokens || 0 collectedUsageData.cache_read_input_tokens = data.message.usage.cache_read_input_tokens || 0 collectedUsageData.model = data.message.model // 检查是否有详细的 cache_creation 对象 if ( data.message.usage.cache_creation && typeof data.message.usage.cache_creation === 'object' ) { collectedUsageData.cache_creation = { ephemeral_5m_input_tokens: data.message.usage.cache_creation.ephemeral_5m_input_tokens || 0, ephemeral_1h_input_tokens: data.message.usage.cache_creation.ephemeral_1h_input_tokens || 0 } logger.info( '📊 Collected detailed cache creation data:', JSON.stringify(collectedUsageData.cache_creation) ) } } if (data.type === 'message_delta' && data.usage) { // 提取所有usage字段,message_delta可能包含完整的usage信息 if (data.usage.output_tokens !== undefined) { collectedUsageData.output_tokens = data.usage.output_tokens || 0 } // 提取input_tokens(如果存在) if (data.usage.input_tokens !== undefined) { collectedUsageData.input_tokens = data.usage.input_tokens || 0 } // 提取cache相关的tokens if (data.usage.cache_creation_input_tokens !== undefined) { collectedUsageData.cache_creation_input_tokens = data.usage.cache_creation_input_tokens || 0 } if (data.usage.cache_read_input_tokens !== undefined) { collectedUsageData.cache_read_input_tokens = data.usage.cache_read_input_tokens || 0 } // 检查是否有详细的 cache_creation 对象 if ( data.usage.cache_creation && typeof data.usage.cache_creation === 'object' ) { collectedUsageData.cache_creation = { ephemeral_5m_input_tokens: data.usage.cache_creation.ephemeral_5m_input_tokens || 0, ephemeral_1h_input_tokens: data.usage.cache_creation.ephemeral_1h_input_tokens || 0 } } logger.info( '📊 [Console] Collected usage data from message_delta:', JSON.stringify(collectedUsageData) ) // 如果已经收集到了完整数据,触发回调 if ( collectedUsageData.input_tokens !== undefined && collectedUsageData.output_tokens !== undefined && !finalUsageReported ) { if (!collectedUsageData.model) { collectedUsageData.model = body.model || account?.defaultModel || null } logger.info( '🎯 [Console] Complete usage data collected:', JSON.stringify(collectedUsageData) ) if (usageCallback && typeof usageCallback === 'function') { usageCallback({ ...collectedUsageData, accountId }) } finalUsageReported = true } } // 不再因为模型不支持而block账号 } catch (e) { // 忽略解析错误 } } } } } catch (error) { logger.error( `❌ Error processing Claude Console stream data (Account: ${account?.name || accountId}):`, error ) if (isStreamWritable(responseStream)) { // 如果有 streamTransformer(如测试请求),使用前端期望的格式 if (streamTransformer) { responseStream.write( `data: ${JSON.stringify({ type: 'error', error: error.message })}\n\n` ) } else { responseStream.write('event: error\n') responseStream.write( `data: ${JSON.stringify({ error: 'Stream processing error', message: error.message, timestamp: new Date().toISOString() })}\n\n` ) } } } }) response.data.on('end', () => { try { // 处理缓冲区中剩余的数据 if (buffer.trim() && isStreamWritable(responseStream)) { if (streamTransformer) { const transformed = streamTransformer(buffer) if (transformed) { responseStream.write(transformed) } } else { responseStream.write(buffer) } } // 🔧 兜底逻辑:确保所有未保存的usage数据都不会丢失 if (!finalUsageReported) { if ( collectedUsageData.input_tokens !== undefined || collectedUsageData.output_tokens !== undefined ) { // 补全缺失的字段 if (collectedUsageData.input_tokens === undefined) { collectedUsageData.input_tokens = 0 logger.warn( '⚠️ [Console] message_delta missing input_tokens, setting to 0. This may indicate incomplete usage data.' ) } if (collectedUsageData.output_tokens === undefined) { collectedUsageData.output_tokens = 0 logger.warn( '⚠️ [Console] message_delta missing output_tokens, setting to 0. This may indicate incomplete usage data.' ) } // 确保有 model 字段 if (!collectedUsageData.model) { collectedUsageData.model = body.model || account?.defaultModel || null } logger.info( `📊 [Console] Saving incomplete usage data via fallback: ${JSON.stringify(collectedUsageData)}` ) if (usageCallback && typeof usageCallback === 'function') { usageCallback({ ...collectedUsageData, accountId }) } finalUsageReported = true } else { logger.warn( '⚠️ [Console] Stream completed but no usage data was captured! This indicates a problem with SSE parsing or API response format.' ) } } // 确保流正确结束 if (isStreamWritable(responseStream)) { // 📊 诊断日志:流结束前状态 logger.info( `📤 [STREAM] Ending response | destroyed: ${responseStream.destroyed}, ` + `socketDestroyed: ${responseStream.socket?.destroyed}, ` + `socketBytesWritten: ${responseStream.socket?.bytesWritten || 0}` ) // 禁用 Nagle 算法确保数据立即发送 if (responseStream.socket && !responseStream.socket.destroyed) { responseStream.socket.setNoDelay(true) } // 等待数据完全 flush 到客户端后再 resolve responseStream.end(() => { logger.info( `✅ [STREAM] Response ended and flushed | socketBytesWritten: ${responseStream.socket?.bytesWritten || 'unknown'}` ) resolve() }) } else { // 连接已断开,记录警告 logger.warn( `⚠️ [Console] Client disconnected before stream end, data may not have been received | account: ${account?.name || accountId}` ) resolve() } } catch (error) { logger.error('❌ Error processing stream end:', error) reject(error) } }) response.data.on('error', (error) => { logger.error( `❌ Claude Console stream error (Account: ${account?.name || accountId}):`, error ) if (isStreamWritable(responseStream)) { // 如果有 streamTransformer(如测试请求),使用前端期望的格式 if (streamTransformer) { responseStream.write( `data: ${JSON.stringify({ type: 'error', error: error.message })}\n\n` ) } else { responseStream.write('event: error\n') responseStream.write( `data: ${JSON.stringify({ error: 'Stream error', message: error.message, timestamp: new Date().toISOString() })}\n\n` ) } responseStream.end() } reject(error) }) }) .catch((error) => { if (aborted) { return } logger.error( `❌ Claude Console stream request error (Account: ${account?.name || accountId}):`, error.message ) // 检查错误状态 if (error.response) { if (error.response.status === 401) { claudeConsoleAccountService.markAccountUnauthorized(accountId) } else if (error.response.status === 429) { claudeConsoleAccountService.markAccountRateLimited(accountId) // 检查是否因为超过每日额度 claudeConsoleAccountService.checkQuotaUsage(accountId).catch((err) => { logger.error('❌ Failed to check quota after 429 error:', err) }) } else if (error.response.status === 529) { claudeConsoleAccountService.markAccountOverloaded(accountId) } } // 发送错误响应 if (!responseStream.headersSent) { const existingConnection = responseStream.getHeader ? responseStream.getHeader('Connection') : null responseStream.writeHead(error.response?.status || 500, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: existingConnection || 'keep-alive' }) } if (isStreamWritable(responseStream)) { // 如果有 streamTransformer(如测试请求),使用前端期望的格式 if (streamTransformer) { responseStream.write( `data: ${JSON.stringify({ type: 'error', error: error.message })}\n\n` ) } else { responseStream.write('event: error\n') responseStream.write( `data: ${JSON.stringify({ error: error.message, code: error.code, timestamp: new Date().toISOString() })}\n\n` ) } responseStream.end() } reject(error) }) // 处理客户端断开连接 responseStream.on('close', () => { logger.debug('🔌 Client disconnected, cleaning up Claude Console stream') aborted = true }) }) } // 🔧 过滤客户端请求头 _filterClientHeaders(clientHeaders) { // 使用统一的 headerFilter 工具类(白名单模式) // 与 claudeRelayService 保持一致,避免透传 CDN headers 触发上游 API 安全检查 return filterForClaude(clientHeaders) } // 🕐 更新最后使用时间 async _updateLastUsedTime(accountId) { try { const client = require('../models/redis').getClientSafe() const accountKey = `claude_console_account:${accountId}` const exists = await client.exists(accountKey) if (!exists) { logger.debug(`🔎 跳过更新已删除的Claude Console账号最近使用时间: ${accountId}`) return } await client.hset(accountKey, 'lastUsedAt', new Date().toISOString()) } catch (error) { logger.warn( `⚠️ Failed to update last used time for Claude Console account ${accountId}:`, error.message ) } } // 🧪 创建测试用的流转换器,将 Claude API SSE 格式转换为前端期望的格式 _createTestStreamTransformer() { let testStartSent = false return (rawData) => { const lines = rawData.split('\n') const outputLines = [] for (const line of lines) { if (!line.startsWith('data: ')) { // 保留空行用于 SSE 分隔 if (line.trim() === '') { outputLines.push('') } continue } const jsonStr = line.substring(6).trim() if (!jsonStr || jsonStr === '[DONE]') { continue } try { const data = JSON.parse(jsonStr) // 发送 test_start 事件(只在第一次 message_start 时发送) if (data.type === 'message_start' && !testStartSent) { testStartSent = true outputLines.push(`data: ${JSON.stringify({ type: 'test_start' })}`) outputLines.push('') } // 转换 content_block_delta 为 content if (data.type === 'content_block_delta' && data.delta && data.delta.text) { outputLines.push(`data: ${JSON.stringify({ type: 'content', text: data.delta.text })}`) outputLines.push('') } // 转换 message_stop 为 test_complete if (data.type === 'message_stop') { outputLines.push(`data: ${JSON.stringify({ type: 'test_complete', success: true })}`) outputLines.push('') } // 处理错误事件 if (data.type === 'error') { const errorMsg = data.error?.message || data.message || '未知错误' outputLines.push(`data: ${JSON.stringify({ type: 'error', error: errorMsg })}`) outputLines.push('') } } catch { // 忽略解析错误 } } return outputLines.length > 0 ? outputLines.join('\n') : null } } // 🧪 测试账号连接(供Admin API使用) async testAccountConnection(accountId, responseStream) { const { sendStreamTestRequest } = require('../utils/testPayloadHelper') try { const account = await claudeConsoleAccountService.getAccount(accountId) if (!account) { throw new Error('Account not found') } logger.info(`🧪 Testing Claude Console account connection: ${account.name} (${accountId})`) const cleanUrl = account.apiUrl.replace(/\/$/, '') const apiUrl = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages?beta=true` await sendStreamTestRequest({ apiUrl, authorization: `Bearer ${account.apiKey}`, responseStream, proxyAgent: claudeConsoleAccountService._createProxyAgent(account.proxy), extraHeaders: account.userAgent ? { 'User-Agent': account.userAgent } : {} }) } catch (error) { logger.error(`❌ Test account connection failed:`, error) if (!responseStream.headersSent) { responseStream.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache' }) } if (isStreamWritable(responseStream)) { responseStream.write( `data: ${JSON.stringify({ type: 'test_complete', success: false, error: error.message })}\n\n` ) responseStream.end() } } } // 🎯 健康检查 async healthCheck() { try { const accounts = await claudeConsoleAccountService.getAllAccounts() const activeAccounts = accounts.filter((acc) => acc.isActive && acc.status === 'active') return { healthy: activeAccounts.length > 0, activeAccounts: activeAccounts.length, totalAccounts: accounts.length, timestamp: new Date().toISOString() } } catch (error) { logger.error('❌ Claude Console Claude health check failed:', error) return { healthy: false, error: error.message, timestamp: new Date().toISOString() } } } } module.exports = new ClaudeConsoleRelayService()