diff --git a/src/routes/apiStats.js b/src/routes/apiStats.js index 808d48e5..308b18c6 100644 --- a/src/routes/apiStats.js +++ b/src/routes/apiStats.js @@ -5,6 +5,7 @@ const apiKeyService = require('../services/apiKeyService') const CostCalculator = require('../utils/costCalculator') const claudeAccountService = require('../services/claudeAccountService') const openaiAccountService = require('../services/openaiAccountService') +const { createClaudeTestPayload } = require('../utils/testPayloadHelper') const router = express.Router() @@ -792,8 +793,8 @@ router.post('/api/batch-model-stats', async (req, res) => { // 🧪 API Key 端点测试接口 - 测试API Key是否能正常访问服务 router.post('/api-key/test', async (req, res) => { - const axios = require('axios') const config = require('../../config/config') + const { sendStreamTestRequest } = require('../utils/testPayloadHelper') try { const { apiKey, model = 'claude-sonnet-4-5-20250929' } = req.body @@ -805,7 +806,6 @@ router.post('/api-key/test', async (req, res) => { }) } - // 基本格式验证 if (typeof apiKey !== 'string' || apiKey.length < 10 || apiKey.length > 512) { return res.status(400).json({ error: 'Invalid API key format', @@ -813,7 +813,6 @@ router.post('/api-key/test', async (req, res) => { }) } - // 首先验证API Key是否有效(不触发激活) const validation = await apiKeyService.validateApiKeyForStats(apiKey) if (!validation.valid) { return res.status(401).json({ @@ -824,244 +823,29 @@ router.post('/api-key/test', async (req, res) => { logger.api(`🧪 API Key test started for: ${validation.keyData.name} (${validation.keyData.id})`) - // 设置SSE响应头 - res.setHeader('Content-Type', 'text/event-stream') - res.setHeader('Cache-Control', 'no-cache') - res.setHeader('Connection', 'keep-alive') - res.setHeader('X-Accel-Buffering', 'no') - - // 发送测试开始事件 - res.write(`data: ${JSON.stringify({ type: 'test_start', message: 'Test started' })}\n\n`) - - // 构建测试请求,模拟 Claude CLI 客户端 const port = config.server.port || 3000 - const baseURL = `http://127.0.0.1:${port}` + const apiUrl = `http://127.0.0.1:${port}/api/v1/messages?beta=true` - const testPayload = { - model, - messages: [ - { - role: 'user', - content: 'hi' - } - ], - system: [ - { - type: 'text', - text: "You are Claude Code, Anthropic's official CLI for Claude." - } - ], - max_tokens: 32000, - temperature: 1, - stream: true - } - - const headers = { - 'Content-Type': 'application/json', - 'User-Agent': 'claude-cli/2.0.52 (external, cli)', - 'x-api-key': apiKey, - 'anthropic-version': config.claude.apiVersion || '2023-06-01' - } - - // 向自身服务发起测试请求 - // 使用 validateStatus 允许所有状态码通过,以便我们可以处理流式错误响应 - const response = await axios.post(`${baseURL}/api/v1/messages`, testPayload, { - headers, - responseType: 'stream', - timeout: 60000, // 60秒超时 - validateStatus: () => true // 接受所有状态码,自行处理错误 - }) - - // 检查响应状态码,如果不是2xx,尝试读取错误信息 - if (response.status >= 400) { - logger.error( - `🧪 API Key test received error status ${response.status} for: ${validation.keyData.name}` - ) - - // 尝试从流中读取错误信息 - let errorBody = '' - for await (const chunk of response.data) { - errorBody += chunk.toString() - } - - let errorMessage = `HTTP ${response.status}` - try { - // 尝试解析SSE格式的错误 - const lines = errorBody.split('\n') - for (const line of lines) { - if (line.startsWith('data: ')) { - const dataStr = line.substring(6).trim() - if (dataStr && dataStr !== '[DONE]') { - const data = JSON.parse(dataStr) - if (data.error?.message) { - errorMessage = data.error.message - break - } else if (data.message) { - errorMessage = data.message - break - } else if (typeof data.error === 'string') { - errorMessage = data.error - break - } - } - } - } - // 如果不是SSE格式,尝试直接解析JSON - if (errorMessage === `HTTP ${response.status}`) { - const jsonError = JSON.parse(errorBody) - errorMessage = - jsonError.error?.message || jsonError.message || jsonError.error || errorMessage - } - } catch { - // 解析失败,使用原始错误体或默认消息 - if (errorBody && errorBody.length < 500) { - errorMessage = errorBody - } - } - - res.write(`data: ${JSON.stringify({ type: 'error', error: errorMessage })}\n\n`) - res.write( - `data: ${JSON.stringify({ type: 'test_complete', success: false, error: errorMessage })}\n\n` - ) - res.end() - return - } - - let receivedContent = '' - let testSuccess = false - let upstreamError = null - - // 处理流式响应 - response.data.on('data', (chunk) => { - const lines = chunk.toString().split('\n') - - for (const line of lines) { - if (line.startsWith('data: ')) { - const dataStr = line.substring(6).trim() - if (dataStr === '[DONE]') { - continue - } - - try { - const data = JSON.parse(dataStr) - - // 检查上游返回的错误事件 - if (data.type === 'error' || data.error) { - let errorMsg = 'Unknown upstream error' - - // 优先从 data.error 提取(如果是对象,获取其 message) - if (typeof data.error === 'object' && data.error?.message) { - errorMsg = data.error.message - } else if (typeof data.error === 'string' && data.error !== 'Claude API error') { - // 如果 error 是字符串且不是通用错误,直接使用 - errorMsg = data.error - } else if (data.details) { - // 尝试从 details 字段解析详细错误(claudeRelayService 格式) - try { - const details = - typeof data.details === 'string' ? JSON.parse(data.details) : data.details - if (details.error?.message) { - errorMsg = details.error.message - } else if (details.message) { - errorMsg = details.message - } - } catch { - // details 不是有效 JSON,尝试直接使用 - if (typeof data.details === 'string' && data.details.length < 500) { - errorMsg = data.details - } - } - } else if (data.message) { - errorMsg = data.message - } - - // 添加状态码信息(如果有) - if (data.status && errorMsg !== 'Unknown upstream error') { - errorMsg = `[${data.status}] ${errorMsg}` - } - - upstreamError = errorMsg - logger.error(`🧪 Upstream error in test for: ${validation.keyData.name}:`, errorMsg) - res.write(`data: ${JSON.stringify({ type: 'error', error: errorMsg })}\n\n`) - continue - } - - // 提取文本内容 - if (data.type === 'content_block_delta' && data.delta?.text) { - receivedContent += data.delta.text - res.write(`data: ${JSON.stringify({ type: 'content', text: data.delta.text })}\n\n`) - } - - // 消息结束 - if (data.type === 'message_stop') { - testSuccess = true - res.write(`data: ${JSON.stringify({ type: 'message_stop' })}\n\n`) - } - } catch { - // 忽略解析错误 - } - } - } - }) - - response.data.on('end', () => { - // 如果有上游错误,标记为失败 - if (upstreamError) { - testSuccess = false - } - - logger.api( - `🧪 API Key test completed for: ${validation.keyData.name}, success: ${testSuccess}, content length: ${receivedContent.length}${upstreamError ? `, error: ${upstreamError}` : ''}` - ) - res.write( - `data: ${JSON.stringify({ - type: 'test_complete', - success: testSuccess, - contentLength: receivedContent.length, - error: upstreamError || undefined - })}\n\n` - ) - res.end() - }) - - response.data.on('error', (err) => { - logger.error(`🧪 API Key test stream error for: ${validation.keyData.name}`, err) - - // 如果已经捕获了上游错误,优先使用那个 - let errorMsg = upstreamError || err.message || 'Stream error' - - // 如果错误消息是通用的 "Claude API error: xxx",提供更友好的提示 - if (errorMsg.startsWith('Claude API error:') && upstreamError) { - errorMsg = upstreamError - } - - res.write(`data: ${JSON.stringify({ type: 'error', error: errorMsg })}\n\n`) - res.write( - `data: ${JSON.stringify({ type: 'test_complete', success: false, error: errorMsg })}\n\n` - ) - res.end() - }) - - // 处理客户端断开连接 - req.on('close', () => { - if (!res.writableEnded) { - response.data.destroy() - } + await sendStreamTestRequest({ + apiUrl, + authorization: apiKey, + responseStream: res, + payload: createClaudeTestPayload(model, { stream: true }), + timeout: 60000, + extraHeaders: { 'x-api-key': apiKey } }) } catch (error) { logger.error('❌ API Key test failed:', error) - // 如果还未发送响应头,返回JSON错误 if (!res.headersSent) { return res.status(500).json({ error: 'Test failed', - message: error.response?.data?.error?.message || error.message || 'Internal server error' + message: error.message || 'Internal server error' }) } - // 如果已经是SSE流,发送错误事件 res.write( - `data: ${JSON.stringify({ type: 'error', error: error.response?.data?.error?.message || error.message || 'Test failed' })}\n\n` + `data: ${JSON.stringify({ type: 'error', error: error.message || 'Test failed' })}\n\n` ) res.end() } diff --git a/src/services/claudeConsoleRelayService.js b/src/services/claudeConsoleRelayService.js index 5f01cfc2..436adf1c 100644 --- a/src/services/claudeConsoleRelayService.js +++ b/src/services/claudeConsoleRelayService.js @@ -1113,55 +1113,11 @@ class ClaudeConsoleRelayService { } } - // 🧪 测试账号连接(供Admin API使用,独立处理以确保错误时也返回SSE格式) + // 🧪 测试账号连接(供Admin API使用) async testAccountConnection(accountId, responseStream) { - const testRequestBody = { - model: 'claude-sonnet-4-5-20250929', - max_tokens: 32000, - stream: true, - messages: [ - { - role: 'user', - content: 'hi' - } - ], - system: [ - { - type: 'text', - text: "You are Claude Code, Anthropic's official CLI for Claude." - } - ] - } - - // 辅助函数:发送 SSE 事件 - const sendSSEEvent = (type, data) => { - if (!responseStream.destroyed && !responseStream.writableEnded) { - try { - responseStream.write(`data: ${JSON.stringify({ type, ...data })}\n\n`) - } catch { - // 忽略写入错误 - } - } - } - - // 辅助函数:结束测试并关闭流 - const endTest = (success, error = null) => { - if (!responseStream.destroyed && !responseStream.writableEnded) { - try { - if (success) { - sendSSEEvent('test_complete', { success: true }) - } else { - sendSSEEvent('test_complete', { success: false, error: error || '测试失败' }) - } - responseStream.end() - } catch { - // 忽略写入错误 - } - } - } + const { sendStreamTestRequest } = require('../utils/testPayloadHelper') try { - // 获取账户信息 const account = await claudeConsoleAccountService.getAccount(accountId) if (!account) { throw new Error('Account not found') @@ -1169,178 +1125,32 @@ class ClaudeConsoleRelayService { logger.info(`🧪 Testing Claude Console account connection: ${account.name} (${accountId})`) - // 创建代理agent - const proxyAgent = claudeConsoleAccountService._createProxyAgent(account.proxy) - - // 设置响应头 - if (!responseStream.headersSent) { - responseStream.writeHead(200, { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive', - 'X-Accel-Buffering': 'no' - }) - } - - // 发送测试开始事件 - sendSSEEvent('test_start', {}) - - // 构建完整的API URL const cleanUrl = account.apiUrl.replace(/\/$/, '') - const apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages` + const apiUrl = cleanUrl.endsWith('/v1/messages') + ? cleanUrl + : `${cleanUrl}/v1/messages?beta=true` - // 决定使用的 User-Agent - const userAgent = account.userAgent || this.defaultUserAgent - - // 准备请求配置 - const requestConfig = { - method: 'POST', - url: apiEndpoint, - data: testRequestBody, - headers: { - 'Content-Type': 'application/json', - 'anthropic-version': '2023-06-01', - 'User-Agent': userAgent - }, - timeout: 30000, // 测试请求使用较短超时 - responseType: 'stream', - validateStatus: () => true - } - - if (proxyAgent) { - requestConfig.httpAgent = proxyAgent - requestConfig.httpsAgent = proxyAgent - requestConfig.proxy = false - } - - // 设置认证方式 - if (account.apiKey && account.apiKey.startsWith('sk-ant-')) { - requestConfig.headers['x-api-key'] = account.apiKey - } else { - requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}` - } - - // 发送请求 - const response = await axios(requestConfig) - - logger.debug(`🌊 Claude Console test response status: ${response.status}`) - - // 处理非200响应 - if (response.status !== 200) { - logger.error( - `❌ Claude Console API returned error status: ${response.status} | Account: ${account?.name || accountId}` - ) - - // 收集错误响应数据 - return new Promise((resolve) => { - const errorChunks = [] - - response.data.on('data', (chunk) => { - errorChunks.push(chunk) - }) - - response.data.on('end', () => { - try { - const fullErrorData = Buffer.concat(errorChunks).toString() - logger.error( - `📝 [Test] Upstream error response from ${account?.name || accountId}: ${fullErrorData.substring(0, 500)}` - ) - - // 尝试解析错误信息 - let errorMessage = `API Error: ${response.status}` - try { - const errorJson = JSON.parse(fullErrorData) - // 直接提取所有可能的错误信息字段 - errorMessage = - errorJson.message || - errorJson.error?.message || - errorJson.statusMessage || - errorJson.error || - (typeof errorJson === 'string' ? errorJson : JSON.stringify(errorJson)) - } catch { - errorMessage = fullErrorData.substring(0, 200) || `API Error: ${response.status}` - } - - endTest(false, errorMessage) - resolve() - } catch { - endTest(false, `API Error: ${response.status}`) - resolve() - } - }) - - response.data.on('error', (err) => { - endTest(false, err.message || '流读取错误') - resolve() - }) - }) - } - - // 处理成功的流式响应 - return new Promise((resolve) => { - let buffer = '' - - response.data.on('data', (chunk) => { - try { - buffer += chunk.toString() - const lines = buffer.split('\n') - buffer = lines.pop() || '' - - for (const line of lines) { - if (!line.startsWith('data: ')) { - continue - } - - const jsonStr = line.substring(6).trim() - if (!jsonStr || jsonStr === '[DONE]') { - continue - } - - try { - const data = JSON.parse(jsonStr) - - // 转换 content_block_delta 为 content - if (data.type === 'content_block_delta' && data.delta && data.delta.text) { - sendSSEEvent('content', { text: data.delta.text }) - } - - // 处理消息完成 - if (data.type === 'message_stop') { - endTest(true) - } - - // 处理错误事件 - if (data.type === 'error') { - const errorMsg = data.error?.message || data.message || '未知错误' - endTest(false, errorMsg) - } - } catch { - // 忽略解析错误 - } - } - } catch { - // 忽略处理错误 - } - }) - - response.data.on('end', () => { - logger.info(`✅ Test request completed for account: ${account.name}`) - // 如果还没结束,发送完成事件 - if (!responseStream.destroyed && !responseStream.writableEnded) { - endTest(true) - } - resolve() - }) - - response.data.on('error', (err) => { - logger.error(`❌ Test stream error:`, err) - endTest(false, err.message || '流处理错误') - resolve() - }) + await sendStreamTestRequest({ + apiUrl, + authorization: `Bearer ${account.apiKey}`, + responseStream, + proxyAgent: claudeConsoleAccountService._createProxyAgent(account.proxy), + extraHeaders: account.userAgent ? { 'User-Agent': account.userAgent } : {} }) } catch (error) { logger.error(`❌ Test account connection failed:`, error) - endTest(false, error.message || '测试失败') + if (!responseStream.headersSent) { + responseStream.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache' + }) + } + if (!responseStream.destroyed && !responseStream.writableEnded) { + responseStream.write( + `data: ${JSON.stringify({ type: 'test_complete', success: false, error: error.message })}\n\n` + ) + responseStream.end() + } } } diff --git a/src/services/claudeRelayService.js b/src/services/claudeRelayService.js index ddf75f4e..2fd853a7 100644 --- a/src/services/claudeRelayService.js +++ b/src/services/claudeRelayService.js @@ -13,6 +13,7 @@ const redis = require('../models/redis') const ClaudeCodeValidator = require('../validators/clients/claudeCodeValidator') const { formatDateWithTimezone } = require('../utils/dateHelper') const requestIdentityService = require('./requestIdentityService') +const { createClaudeTestPayload } = require('../utils/testPayloadHelper') class ClaudeRelayService { constructor() { @@ -2244,26 +2245,7 @@ class ClaudeRelayService { // 🧪 测试账号连接(供Admin API使用,直接复用 _makeClaudeStreamRequestWithUsageCapture) async testAccountConnection(accountId, responseStream) { - const testRequestBody = { - model: 'claude-sonnet-4-5-20250929', - max_tokens: 100, - stream: true, - system: [ - { - type: 'text', - text: this.claudeCodeSystemPrompt, - cache_control: { - type: 'ephemeral' - } - } - ], - messages: [ - { - role: 'user', - content: 'hi' - } - ] - } + const testRequestBody = createClaudeTestPayload('claude-sonnet-4-5-20250929', { stream: true }) try { // 获取账户信息 diff --git a/src/utils/testPayloadHelper.js b/src/utils/testPayloadHelper.js new file mode 100644 index 00000000..c5e2ed41 --- /dev/null +++ b/src/utils/testPayloadHelper.js @@ -0,0 +1,242 @@ +const crypto = require('crypto') + +/** + * 生成随机十六进制字符串 + * @param {number} bytes - 字节数 + * @returns {string} 十六进制字符串 + */ +function randomHex(bytes = 32) { + return crypto.randomBytes(bytes).toString('hex') +} + +/** + * 生成 Claude Code 风格的会话字符串 + * @returns {string} 会话字符串,格式: user_{64位hex}_account__session_{uuid} + */ +function generateSessionString() { + const hex64 = randomHex(32) // 32 bytes => 64 hex characters + const uuid = crypto.randomUUID() + return `user_${hex64}_account__session_${uuid}` +} + +/** + * 生成 Claude 测试请求体 + * @param {string} model - 模型名称 + * @param {object} options - 可选配置 + * @param {boolean} options.stream - 是否流式(默认false) + * @returns {object} 测试请求体 + */ +function createClaudeTestPayload(model = 'claude-sonnet-4-5-20250929', options = {}) { + const payload = { + model, + messages: [ + { + role: 'user', + content: [ + { + type: 'text', + text: 'hi', + cache_control: { + type: 'ephemeral' + } + } + ] + } + ], + system: [ + { + type: 'text', + text: "You are Claude Code, Anthropic's official CLI for Claude.", + cache_control: { + type: 'ephemeral' + } + } + ], + metadata: { + user_id: generateSessionString() + }, + max_tokens: 21333, + temperature: 1 + } + + if (options.stream) { + payload.stream = true + } + + return payload +} + +/** + * 发送流式测试请求并处理SSE响应 + * @param {object} options - 配置选项 + * @param {string} options.apiUrl - API URL + * @param {string} options.authorization - Authorization header值 + * @param {object} options.responseStream - Express响应流 + * @param {object} [options.payload] - 请求体(默认使用createClaudeTestPayload) + * @param {object} [options.proxyAgent] - 代理agent + * @param {number} [options.timeout] - 超时时间(默认30000) + * @param {object} [options.extraHeaders] - 额外的请求头 + * @returns {Promise} + */ +async function sendStreamTestRequest(options) { + const axios = require('axios') + const logger = require('./logger') + + const { + apiUrl, + authorization, + responseStream, + payload = createClaudeTestPayload('claude-sonnet-4-5-20250929', { stream: true }), + proxyAgent = null, + timeout = 30000, + extraHeaders = {} + } = options + + const sendSSE = (type, data = {}) => { + if (!responseStream.destroyed && !responseStream.writableEnded) { + try { + responseStream.write(`data: ${JSON.stringify({ type, ...data })}\n\n`) + } catch { + // ignore + } + } + } + + const endTest = (success, error = null) => { + if (!responseStream.destroyed && !responseStream.writableEnded) { + try { + responseStream.write( + `data: ${JSON.stringify({ type: 'test_complete', success, error: error || undefined })}\n\n` + ) + responseStream.end() + } catch { + // ignore + } + } + } + + // 设置响应头 + if (!responseStream.headersSent) { + responseStream.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no' + }) + } + + sendSSE('test_start', { message: 'Test started' }) + + const requestConfig = { + method: 'POST', + url: apiUrl, + data: payload, + headers: { + 'Content-Type': 'application/json', + 'anthropic-version': '2023-06-01', + 'User-Agent': 'claude-cli/2.0.52 (external, cli)', + authorization, + ...extraHeaders + }, + timeout, + responseType: 'stream', + validateStatus: () => true + } + + if (proxyAgent) { + requestConfig.httpAgent = proxyAgent + requestConfig.httpsAgent = proxyAgent + requestConfig.proxy = false + } + + try { + const response = await axios(requestConfig) + logger.debug(`🌊 Test response status: ${response.status}`) + + // 处理非200响应 + if (response.status !== 200) { + return new Promise((resolve) => { + const chunks = [] + response.data.on('data', (chunk) => chunks.push(chunk)) + response.data.on('end', () => { + const errorData = Buffer.concat(chunks).toString() + let errorMsg = `API Error: ${response.status}` + try { + const json = JSON.parse(errorData) + errorMsg = json.message || json.error?.message || json.error || errorMsg + } catch { + if (errorData.length < 200) { + errorMsg = errorData || errorMsg + } + } + endTest(false, errorMsg) + resolve() + }) + response.data.on('error', (err) => { + endTest(false, err.message) + resolve() + }) + }) + } + + // 处理成功的流式响应 + return new Promise((resolve) => { + let buffer = '' + + response.data.on('data', (chunk) => { + buffer += chunk.toString() + const lines = buffer.split('\n') + buffer = lines.pop() || '' + + for (const line of lines) { + if (!line.startsWith('data:')) { + continue + } + const jsonStr = line.substring(5).trim() + if (!jsonStr || jsonStr === '[DONE]') { + continue + } + + try { + const data = JSON.parse(jsonStr) + + if (data.type === 'content_block_delta' && data.delta?.text) { + sendSSE('content', { text: data.delta.text }) + } + if (data.type === 'message_stop') { + sendSSE('message_stop') + } + if (data.type === 'error' || data.error) { + const errMsg = data.error?.message || data.message || data.error || 'Unknown error' + sendSSE('error', { error: errMsg }) + } + } catch { + // ignore parse errors + } + } + }) + + response.data.on('end', () => { + if (!responseStream.destroyed && !responseStream.writableEnded) { + endTest(true) + } + resolve() + }) + + response.data.on('error', (err) => { + endTest(false, err.message) + resolve() + }) + }) + } catch (error) { + logger.error('❌ Stream test request failed:', error.message) + endTest(false, error.message) + } +} + +module.exports = { + randomHex, + generateSessionString, + createClaudeTestPayload, + sendStreamTestRequest +}