fix: 增强console账号test端点

This commit is contained in:
shaw
2025-12-01 15:08:40 +08:00
parent d3155b82ea
commit e8e6f972b4
4 changed files with 279 additions and 461 deletions

View File

@@ -5,6 +5,7 @@ const apiKeyService = require('../services/apiKeyService')
const CostCalculator = require('../utils/costCalculator') const CostCalculator = require('../utils/costCalculator')
const claudeAccountService = require('../services/claudeAccountService') const claudeAccountService = require('../services/claudeAccountService')
const openaiAccountService = require('../services/openaiAccountService') const openaiAccountService = require('../services/openaiAccountService')
const { createClaudeTestPayload } = require('../utils/testPayloadHelper')
const router = express.Router() const router = express.Router()
@@ -792,8 +793,8 @@ router.post('/api/batch-model-stats', async (req, res) => {
// 🧪 API Key 端点测试接口 - 测试API Key是否能正常访问服务 // 🧪 API Key 端点测试接口 - 测试API Key是否能正常访问服务
router.post('/api-key/test', async (req, res) => { router.post('/api-key/test', async (req, res) => {
const axios = require('axios')
const config = require('../../config/config') const config = require('../../config/config')
const { sendStreamTestRequest } = require('../utils/testPayloadHelper')
try { try {
const { apiKey, model = 'claude-sonnet-4-5-20250929' } = req.body const { apiKey, model = 'claude-sonnet-4-5-20250929' } = req.body
@@ -805,7 +806,6 @@ router.post('/api-key/test', async (req, res) => {
}) })
} }
// 基本格式验证
if (typeof apiKey !== 'string' || apiKey.length < 10 || apiKey.length > 512) { if (typeof apiKey !== 'string' || apiKey.length < 10 || apiKey.length > 512) {
return res.status(400).json({ return res.status(400).json({
error: 'Invalid API key format', error: 'Invalid API key format',
@@ -813,7 +813,6 @@ router.post('/api-key/test', async (req, res) => {
}) })
} }
// 首先验证API Key是否有效不触发激活
const validation = await apiKeyService.validateApiKeyForStats(apiKey) const validation = await apiKeyService.validateApiKeyForStats(apiKey)
if (!validation.valid) { if (!validation.valid) {
return res.status(401).json({ return res.status(401).json({
@@ -824,244 +823,29 @@ router.post('/api-key/test', async (req, res) => {
logger.api(`🧪 API Key test started for: ${validation.keyData.name} (${validation.keyData.id})`) logger.api(`🧪 API Key test started for: ${validation.keyData.name} (${validation.keyData.id})`)
// 设置SSE响应头
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no')
// 发送测试开始事件
res.write(`data: ${JSON.stringify({ type: 'test_start', message: 'Test started' })}\n\n`)
// 构建测试请求,模拟 Claude CLI 客户端
const port = config.server.port || 3000 const port = config.server.port || 3000
const baseURL = `http://127.0.0.1:${port}` const apiUrl = `http://127.0.0.1:${port}/api/v1/messages?beta=true`
const testPayload = { await sendStreamTestRequest({
model, apiUrl,
messages: [ authorization: apiKey,
{ responseStream: res,
role: 'user', payload: createClaudeTestPayload(model, { stream: true }),
content: 'hi' timeout: 60000,
} extraHeaders: { 'x-api-key': apiKey }
],
system: [
{
type: 'text',
text: "You are Claude Code, Anthropic's official CLI for Claude."
}
],
max_tokens: 32000,
temperature: 1,
stream: true
}
const headers = {
'Content-Type': 'application/json',
'User-Agent': 'claude-cli/2.0.52 (external, cli)',
'x-api-key': apiKey,
'anthropic-version': config.claude.apiVersion || '2023-06-01'
}
// 向自身服务发起测试请求
// 使用 validateStatus 允许所有状态码通过,以便我们可以处理流式错误响应
const response = await axios.post(`${baseURL}/api/v1/messages`, testPayload, {
headers,
responseType: 'stream',
timeout: 60000, // 60秒超时
validateStatus: () => true // 接受所有状态码,自行处理错误
})
// 检查响应状态码如果不是2xx尝试读取错误信息
if (response.status >= 400) {
logger.error(
`🧪 API Key test received error status ${response.status} for: ${validation.keyData.name}`
)
// 尝试从流中读取错误信息
let errorBody = ''
for await (const chunk of response.data) {
errorBody += chunk.toString()
}
let errorMessage = `HTTP ${response.status}`
try {
// 尝试解析SSE格式的错误
const lines = errorBody.split('\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
const dataStr = line.substring(6).trim()
if (dataStr && dataStr !== '[DONE]') {
const data = JSON.parse(dataStr)
if (data.error?.message) {
errorMessage = data.error.message
break
} else if (data.message) {
errorMessage = data.message
break
} else if (typeof data.error === 'string') {
errorMessage = data.error
break
}
}
}
}
// 如果不是SSE格式尝试直接解析JSON
if (errorMessage === `HTTP ${response.status}`) {
const jsonError = JSON.parse(errorBody)
errorMessage =
jsonError.error?.message || jsonError.message || jsonError.error || errorMessage
}
} catch {
// 解析失败,使用原始错误体或默认消息
if (errorBody && errorBody.length < 500) {
errorMessage = errorBody
}
}
res.write(`data: ${JSON.stringify({ type: 'error', error: errorMessage })}\n\n`)
res.write(
`data: ${JSON.stringify({ type: 'test_complete', success: false, error: errorMessage })}\n\n`
)
res.end()
return
}
let receivedContent = ''
let testSuccess = false
let upstreamError = null
// 处理流式响应
response.data.on('data', (chunk) => {
const lines = chunk.toString().split('\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
const dataStr = line.substring(6).trim()
if (dataStr === '[DONE]') {
continue
}
try {
const data = JSON.parse(dataStr)
// 检查上游返回的错误事件
if (data.type === 'error' || data.error) {
let errorMsg = 'Unknown upstream error'
// 优先从 data.error 提取(如果是对象,获取其 message
if (typeof data.error === 'object' && data.error?.message) {
errorMsg = data.error.message
} else if (typeof data.error === 'string' && data.error !== 'Claude API error') {
// 如果 error 是字符串且不是通用错误,直接使用
errorMsg = data.error
} else if (data.details) {
// 尝试从 details 字段解析详细错误claudeRelayService 格式)
try {
const details =
typeof data.details === 'string' ? JSON.parse(data.details) : data.details
if (details.error?.message) {
errorMsg = details.error.message
} else if (details.message) {
errorMsg = details.message
}
} catch {
// details 不是有效 JSON尝试直接使用
if (typeof data.details === 'string' && data.details.length < 500) {
errorMsg = data.details
}
}
} else if (data.message) {
errorMsg = data.message
}
// 添加状态码信息(如果有)
if (data.status && errorMsg !== 'Unknown upstream error') {
errorMsg = `[${data.status}] ${errorMsg}`
}
upstreamError = errorMsg
logger.error(`🧪 Upstream error in test for: ${validation.keyData.name}:`, errorMsg)
res.write(`data: ${JSON.stringify({ type: 'error', error: errorMsg })}\n\n`)
continue
}
// 提取文本内容
if (data.type === 'content_block_delta' && data.delta?.text) {
receivedContent += data.delta.text
res.write(`data: ${JSON.stringify({ type: 'content', text: data.delta.text })}\n\n`)
}
// 消息结束
if (data.type === 'message_stop') {
testSuccess = true
res.write(`data: ${JSON.stringify({ type: 'message_stop' })}\n\n`)
}
} catch {
// 忽略解析错误
}
}
}
})
response.data.on('end', () => {
// 如果有上游错误,标记为失败
if (upstreamError) {
testSuccess = false
}
logger.api(
`🧪 API Key test completed for: ${validation.keyData.name}, success: ${testSuccess}, content length: ${receivedContent.length}${upstreamError ? `, error: ${upstreamError}` : ''}`
)
res.write(
`data: ${JSON.stringify({
type: 'test_complete',
success: testSuccess,
contentLength: receivedContent.length,
error: upstreamError || undefined
})}\n\n`
)
res.end()
})
response.data.on('error', (err) => {
logger.error(`🧪 API Key test stream error for: ${validation.keyData.name}`, err)
// 如果已经捕获了上游错误,优先使用那个
let errorMsg = upstreamError || err.message || 'Stream error'
// 如果错误消息是通用的 "Claude API error: xxx",提供更友好的提示
if (errorMsg.startsWith('Claude API error:') && upstreamError) {
errorMsg = upstreamError
}
res.write(`data: ${JSON.stringify({ type: 'error', error: errorMsg })}\n\n`)
res.write(
`data: ${JSON.stringify({ type: 'test_complete', success: false, error: errorMsg })}\n\n`
)
res.end()
})
// 处理客户端断开连接
req.on('close', () => {
if (!res.writableEnded) {
response.data.destroy()
}
}) })
} catch (error) { } catch (error) {
logger.error('❌ API Key test failed:', error) logger.error('❌ API Key test failed:', error)
// 如果还未发送响应头返回JSON错误
if (!res.headersSent) { if (!res.headersSent) {
return res.status(500).json({ return res.status(500).json({
error: 'Test failed', error: 'Test failed',
message: error.response?.data?.error?.message || error.message || 'Internal server error' message: error.message || 'Internal server error'
}) })
} }
// 如果已经是SSE流发送错误事件
res.write( res.write(
`data: ${JSON.stringify({ type: 'error', error: error.response?.data?.error?.message || error.message || 'Test failed' })}\n\n` `data: ${JSON.stringify({ type: 'error', error: error.message || 'Test failed' })}\n\n`
) )
res.end() res.end()
} }

View File

@@ -1113,55 +1113,11 @@ class ClaudeConsoleRelayService {
} }
} }
// 🧪 测试账号连接供Admin API使用独立处理以确保错误时也返回SSE格式 // 🧪 测试账号连接供Admin API使用
async testAccountConnection(accountId, responseStream) { async testAccountConnection(accountId, responseStream) {
const testRequestBody = { const { sendStreamTestRequest } = require('../utils/testPayloadHelper')
model: 'claude-sonnet-4-5-20250929',
max_tokens: 32000,
stream: true,
messages: [
{
role: 'user',
content: 'hi'
}
],
system: [
{
type: 'text',
text: "You are Claude Code, Anthropic's official CLI for Claude."
}
]
}
// 辅助函数:发送 SSE 事件
const sendSSEEvent = (type, data) => {
if (!responseStream.destroyed && !responseStream.writableEnded) {
try {
responseStream.write(`data: ${JSON.stringify({ type, ...data })}\n\n`)
} catch {
// 忽略写入错误
}
}
}
// 辅助函数:结束测试并关闭流
const endTest = (success, error = null) => {
if (!responseStream.destroyed && !responseStream.writableEnded) {
try {
if (success) {
sendSSEEvent('test_complete', { success: true })
} else {
sendSSEEvent('test_complete', { success: false, error: error || '测试失败' })
}
responseStream.end()
} catch {
// 忽略写入错误
}
}
}
try { try {
// 获取账户信息
const account = await claudeConsoleAccountService.getAccount(accountId) const account = await claudeConsoleAccountService.getAccount(accountId)
if (!account) { if (!account) {
throw new Error('Account not found') throw new Error('Account not found')
@@ -1169,178 +1125,32 @@ class ClaudeConsoleRelayService {
logger.info(`🧪 Testing Claude Console account connection: ${account.name} (${accountId})`) logger.info(`🧪 Testing Claude Console account connection: ${account.name} (${accountId})`)
// 创建代理agent
const proxyAgent = claudeConsoleAccountService._createProxyAgent(account.proxy)
// 设置响应头
if (!responseStream.headersSent) {
responseStream.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no'
})
}
// 发送测试开始事件
sendSSEEvent('test_start', {})
// 构建完整的API URL
const cleanUrl = account.apiUrl.replace(/\/$/, '') const cleanUrl = account.apiUrl.replace(/\/$/, '')
const apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages` const apiUrl = cleanUrl.endsWith('/v1/messages')
? cleanUrl
: `${cleanUrl}/v1/messages?beta=true`
// 决定使用的 User-Agent await sendStreamTestRequest({
const userAgent = account.userAgent || this.defaultUserAgent apiUrl,
authorization: `Bearer ${account.apiKey}`,
// 准备请求配置 responseStream,
const requestConfig = { proxyAgent: claudeConsoleAccountService._createProxyAgent(account.proxy),
method: 'POST', extraHeaders: account.userAgent ? { 'User-Agent': account.userAgent } : {}
url: apiEndpoint,
data: testRequestBody,
headers: {
'Content-Type': 'application/json',
'anthropic-version': '2023-06-01',
'User-Agent': userAgent
},
timeout: 30000, // 测试请求使用较短超时
responseType: 'stream',
validateStatus: () => true
}
if (proxyAgent) {
requestConfig.httpAgent = proxyAgent
requestConfig.httpsAgent = proxyAgent
requestConfig.proxy = false
}
// 设置认证方式
if (account.apiKey && account.apiKey.startsWith('sk-ant-')) {
requestConfig.headers['x-api-key'] = account.apiKey
} else {
requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}`
}
// 发送请求
const response = await axios(requestConfig)
logger.debug(`🌊 Claude Console test response status: ${response.status}`)
// 处理非200响应
if (response.status !== 200) {
logger.error(
`❌ Claude Console API returned error status: ${response.status} | Account: ${account?.name || accountId}`
)
// 收集错误响应数据
return new Promise((resolve) => {
const errorChunks = []
response.data.on('data', (chunk) => {
errorChunks.push(chunk)
})
response.data.on('end', () => {
try {
const fullErrorData = Buffer.concat(errorChunks).toString()
logger.error(
`📝 [Test] Upstream error response from ${account?.name || accountId}: ${fullErrorData.substring(0, 500)}`
)
// 尝试解析错误信息
let errorMessage = `API Error: ${response.status}`
try {
const errorJson = JSON.parse(fullErrorData)
// 直接提取所有可能的错误信息字段
errorMessage =
errorJson.message ||
errorJson.error?.message ||
errorJson.statusMessage ||
errorJson.error ||
(typeof errorJson === 'string' ? errorJson : JSON.stringify(errorJson))
} catch {
errorMessage = fullErrorData.substring(0, 200) || `API Error: ${response.status}`
}
endTest(false, errorMessage)
resolve()
} catch {
endTest(false, `API Error: ${response.status}`)
resolve()
}
})
response.data.on('error', (err) => {
endTest(false, err.message || '流读取错误')
resolve()
})
})
}
// 处理成功的流式响应
return new Promise((resolve) => {
let buffer = ''
response.data.on('data', (chunk) => {
try {
buffer += chunk.toString()
const lines = buffer.split('\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.startsWith('data: ')) {
continue
}
const jsonStr = line.substring(6).trim()
if (!jsonStr || jsonStr === '[DONE]') {
continue
}
try {
const data = JSON.parse(jsonStr)
// 转换 content_block_delta 为 content
if (data.type === 'content_block_delta' && data.delta && data.delta.text) {
sendSSEEvent('content', { text: data.delta.text })
}
// 处理消息完成
if (data.type === 'message_stop') {
endTest(true)
}
// 处理错误事件
if (data.type === 'error') {
const errorMsg = data.error?.message || data.message || '未知错误'
endTest(false, errorMsg)
}
} catch {
// 忽略解析错误
}
}
} catch {
// 忽略处理错误
}
})
response.data.on('end', () => {
logger.info(`✅ Test request completed for account: ${account.name}`)
// 如果还没结束,发送完成事件
if (!responseStream.destroyed && !responseStream.writableEnded) {
endTest(true)
}
resolve()
})
response.data.on('error', (err) => {
logger.error(`❌ Test stream error:`, err)
endTest(false, err.message || '流处理错误')
resolve()
})
}) })
} catch (error) { } catch (error) {
logger.error(`❌ Test account connection failed:`, error) logger.error(`❌ Test account connection failed:`, error)
endTest(false, error.message || '测试失败') if (!responseStream.headersSent) {
responseStream.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache'
})
}
if (!responseStream.destroyed && !responseStream.writableEnded) {
responseStream.write(
`data: ${JSON.stringify({ type: 'test_complete', success: false, error: error.message })}\n\n`
)
responseStream.end()
}
} }
} }

View File

@@ -13,6 +13,7 @@ const redis = require('../models/redis')
const ClaudeCodeValidator = require('../validators/clients/claudeCodeValidator') const ClaudeCodeValidator = require('../validators/clients/claudeCodeValidator')
const { formatDateWithTimezone } = require('../utils/dateHelper') const { formatDateWithTimezone } = require('../utils/dateHelper')
const requestIdentityService = require('./requestIdentityService') const requestIdentityService = require('./requestIdentityService')
const { createClaudeTestPayload } = require('../utils/testPayloadHelper')
class ClaudeRelayService { class ClaudeRelayService {
constructor() { constructor() {
@@ -2244,26 +2245,7 @@ class ClaudeRelayService {
// 🧪 测试账号连接供Admin API使用直接复用 _makeClaudeStreamRequestWithUsageCapture // 🧪 测试账号连接供Admin API使用直接复用 _makeClaudeStreamRequestWithUsageCapture
async testAccountConnection(accountId, responseStream) { async testAccountConnection(accountId, responseStream) {
const testRequestBody = { const testRequestBody = createClaudeTestPayload('claude-sonnet-4-5-20250929', { stream: true })
model: 'claude-sonnet-4-5-20250929',
max_tokens: 100,
stream: true,
system: [
{
type: 'text',
text: this.claudeCodeSystemPrompt,
cache_control: {
type: 'ephemeral'
}
}
],
messages: [
{
role: 'user',
content: 'hi'
}
]
}
try { try {
// 获取账户信息 // 获取账户信息

View File

@@ -0,0 +1,242 @@
const crypto = require('crypto')
/**
* 生成随机十六进制字符串
* @param {number} bytes - 字节数
* @returns {string} 十六进制字符串
*/
function randomHex(bytes = 32) {
return crypto.randomBytes(bytes).toString('hex')
}
/**
* 生成 Claude Code 风格的会话字符串
* @returns {string} 会话字符串,格式: user_{64位hex}_account__session_{uuid}
*/
function generateSessionString() {
const hex64 = randomHex(32) // 32 bytes => 64 hex characters
const uuid = crypto.randomUUID()
return `user_${hex64}_account__session_${uuid}`
}
/**
* 生成 Claude 测试请求体
* @param {string} model - 模型名称
* @param {object} options - 可选配置
* @param {boolean} options.stream - 是否流式默认false
* @returns {object} 测试请求体
*/
function createClaudeTestPayload(model = 'claude-sonnet-4-5-20250929', options = {}) {
const payload = {
model,
messages: [
{
role: 'user',
content: [
{
type: 'text',
text: 'hi',
cache_control: {
type: 'ephemeral'
}
}
]
}
],
system: [
{
type: 'text',
text: "You are Claude Code, Anthropic's official CLI for Claude.",
cache_control: {
type: 'ephemeral'
}
}
],
metadata: {
user_id: generateSessionString()
},
max_tokens: 21333,
temperature: 1
}
if (options.stream) {
payload.stream = true
}
return payload
}
/**
* 发送流式测试请求并处理SSE响应
* @param {object} options - 配置选项
* @param {string} options.apiUrl - API URL
* @param {string} options.authorization - Authorization header值
* @param {object} options.responseStream - Express响应流
* @param {object} [options.payload] - 请求体默认使用createClaudeTestPayload
* @param {object} [options.proxyAgent] - 代理agent
* @param {number} [options.timeout] - 超时时间默认30000
* @param {object} [options.extraHeaders] - 额外的请求头
* @returns {Promise<void>}
*/
async function sendStreamTestRequest(options) {
const axios = require('axios')
const logger = require('./logger')
const {
apiUrl,
authorization,
responseStream,
payload = createClaudeTestPayload('claude-sonnet-4-5-20250929', { stream: true }),
proxyAgent = null,
timeout = 30000,
extraHeaders = {}
} = options
const sendSSE = (type, data = {}) => {
if (!responseStream.destroyed && !responseStream.writableEnded) {
try {
responseStream.write(`data: ${JSON.stringify({ type, ...data })}\n\n`)
} catch {
// ignore
}
}
}
const endTest = (success, error = null) => {
if (!responseStream.destroyed && !responseStream.writableEnded) {
try {
responseStream.write(
`data: ${JSON.stringify({ type: 'test_complete', success, error: error || undefined })}\n\n`
)
responseStream.end()
} catch {
// ignore
}
}
}
// 设置响应头
if (!responseStream.headersSent) {
responseStream.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no'
})
}
sendSSE('test_start', { message: 'Test started' })
const requestConfig = {
method: 'POST',
url: apiUrl,
data: payload,
headers: {
'Content-Type': 'application/json',
'anthropic-version': '2023-06-01',
'User-Agent': 'claude-cli/2.0.52 (external, cli)',
authorization,
...extraHeaders
},
timeout,
responseType: 'stream',
validateStatus: () => true
}
if (proxyAgent) {
requestConfig.httpAgent = proxyAgent
requestConfig.httpsAgent = proxyAgent
requestConfig.proxy = false
}
try {
const response = await axios(requestConfig)
logger.debug(`🌊 Test response status: ${response.status}`)
// 处理非200响应
if (response.status !== 200) {
return new Promise((resolve) => {
const chunks = []
response.data.on('data', (chunk) => chunks.push(chunk))
response.data.on('end', () => {
const errorData = Buffer.concat(chunks).toString()
let errorMsg = `API Error: ${response.status}`
try {
const json = JSON.parse(errorData)
errorMsg = json.message || json.error?.message || json.error || errorMsg
} catch {
if (errorData.length < 200) {
errorMsg = errorData || errorMsg
}
}
endTest(false, errorMsg)
resolve()
})
response.data.on('error', (err) => {
endTest(false, err.message)
resolve()
})
})
}
// 处理成功的流式响应
return new Promise((resolve) => {
let buffer = ''
response.data.on('data', (chunk) => {
buffer += chunk.toString()
const lines = buffer.split('\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.startsWith('data:')) {
continue
}
const jsonStr = line.substring(5).trim()
if (!jsonStr || jsonStr === '[DONE]') {
continue
}
try {
const data = JSON.parse(jsonStr)
if (data.type === 'content_block_delta' && data.delta?.text) {
sendSSE('content', { text: data.delta.text })
}
if (data.type === 'message_stop') {
sendSSE('message_stop')
}
if (data.type === 'error' || data.error) {
const errMsg = data.error?.message || data.message || data.error || 'Unknown error'
sendSSE('error', { error: errMsg })
}
} catch {
// ignore parse errors
}
}
})
response.data.on('end', () => {
if (!responseStream.destroyed && !responseStream.writableEnded) {
endTest(true)
}
resolve()
})
response.data.on('error', (err) => {
endTest(false, err.message)
resolve()
})
})
} catch (error) {
logger.error('❌ Stream test request failed:', error.message)
endTest(false, error.message)
}
}
module.exports = {
randomHex,
generateSessionString,
createClaudeTestPayload,
sendStreamTestRequest
}