mirror of
https://github.com/Wei-Shaw/claude-relay-service.git
synced 2026-01-22 16:43:35 +00:00
When the Cloudflare orange cloud (CDN proxy mode) is enabled, Cloudflare automatically adds CDN-related headers (cf-*, x-forwarded-*, cdn-loop, etc.), which trips the security checks of upstream API providers:

1. Confirmed problem: the 88code API returns 403 Forbidden once it detects the CDN headers, so Codex CLI cannot be used.
2. Potential risk: other API providers (OpenAI, Anthropic) may also restrict requests that carry proxy/CDN fingerprints.

Solution: create a unified headerFilter utility that strips the Cloudflare CDN headers in every relay service, so forwarded requests look like ordinary direct client requests.

Changes:

1. Add src/utils/headerFilter.js (sketched below)
   - A unified CDN header filter list (13 Cloudflare headers)
   - Exposes filterForOpenAI() and filterForClaude()
   - Adds CDN header filtering on top of the existing filtering logic
2. Update src/services/openaiResponsesRelayService.js
   - Use filterForOpenAI() instead of the inline _filterRequestHeaders()
   - Keep backward compatibility
3. Update src/services/claudeRelayService.js
   - Use filterForClaude() instead of the _filterClientHeaders() implementation
   - Simplify the code and drop the duplicated header list
4. Fix src/routes/openaiRoutes.js
   - Add a type check on the input field (it may be an array or a string)
   - Prevents the "startsWith is not a function" error

Filtered headers: x-real-ip, x-forwarded-for, x-forwarded-proto, x-forwarded-host, x-forwarded-port, x-accel-buffering, cf-ray, cf-connecting-ip, cf-ipcountry, cf-visitor, cf-request-id, cdn-loop, true-client-ip

Verification:

- ✅ Codex CLI successfully calls the 88code API through the relay service (previously 403)
- ✅ All business-critical headers are preserved (conversation_id, session_id, etc.)
- ✅ All Cloudflare CDN traces are removed
- ✅ The orange cloud's DDoS protection and CDN acceleration are kept
- ✅ Docker build succeeds

Benefits:

1. Fixes the 88code 403 problem; Codex CLI works again
2. Lowers the risk of upstream APIs flagging CDN/proxy fingerprints
3. Improves compatibility with the various API providers
4. Centralizes the CDN header filtering logic, making it easier to maintain
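For reference, here is a minimal sketch of what the new src/utils/headerFilter.js could look like, reconstructed only from the description above: the 13 header names are the ones listed in this commit, while the shared stripHeaders() helper and the extra hop-by-hop/credential names it removes (host, content-length, connection, authorization, x-api-key) are illustrative assumptions rather than the actual implementation.

// Hypothetical sketch of src/utils/headerFilter.js - the real module may differ.

// The 13 CDN/proxy headers named in this commit.
const CDN_HEADERS = [
  'x-real-ip',
  'x-forwarded-for',
  'x-forwarded-proto',
  'x-forwarded-host',
  'x-forwarded-port',
  'x-accel-buffering',
  'cf-ray',
  'cf-connecting-ip',
  'cf-ipcountry',
  'cf-visitor',
  'cf-request-id',
  'cdn-loop',
  'true-client-ip'
]

// Assumed extra strip list: hop-by-hop and credential headers that the relay
// replaces with its own values before forwarding.
const COMMON_STRIP = ['host', 'content-length', 'connection', 'authorization', 'x-api-key']

// Copy every header that is not on the block lists (case-insensitive).
function stripHeaders(headers, extra = []) {
  const blocked = new Set([...CDN_HEADERS, ...COMMON_STRIP, ...extra])
  const filtered = {}
  for (const [key, value] of Object.entries(headers || {})) {
    if (!blocked.has(key.toLowerCase())) {
      filtered[key] = value
    }
  }
  return filtered
}

// Used by openaiResponsesRelayService.js in place of the old inline _filterRequestHeaders().
function filterForOpenAI(headers) {
  return stripHeaders(headers)
}

// Used by claudeRelayService.js in place of _filterClientHeaders().
function filterForClaude(headers) {
  return stripHeaders(headers)
}

module.exports = { filterForOpenAI, filterForClaude }

In openaiResponsesRelayService.js below, the relay spreads filterForOpenAI(req.headers) into its outgoing header object and then sets its own Authorization and Content-Type, so the upstream request carries none of the Cloudflare fingerprints while business headers such as conversation_id and session_id pass through untouched.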
848 lines
29 KiB
JavaScript
const axios = require('axios')
const ProxyHelper = require('../utils/proxyHelper')
const logger = require('../utils/logger')
const { filterForOpenAI } = require('../utils/headerFilter')
const openaiResponsesAccountService = require('./openaiResponsesAccountService')
const apiKeyService = require('./apiKeyService')
const unifiedOpenAIScheduler = require('./unifiedOpenAIScheduler')
const config = require('../../config/config')
const crypto = require('crypto')

// Extract cache-creation tokens, tolerating several field naming conventions
function extractCacheCreationTokens(usageData) {
  if (!usageData || typeof usageData !== 'object') {
    return 0
  }

  const details = usageData.input_tokens_details || usageData.prompt_tokens_details || {}
  const candidates = [
    details.cache_creation_input_tokens,
    details.cache_creation_tokens,
    usageData.cache_creation_input_tokens,
    usageData.cache_creation_tokens
  ]

  for (const value of candidates) {
    if (value !== undefined && value !== null && value !== '') {
      const parsed = Number(value)
      if (!Number.isNaN(parsed)) {
        return parsed
      }
    }
  }

  return 0
}

class OpenAIResponsesRelayService {
  constructor() {
    this.defaultTimeout = config.requestTimeout || 600000
  }

  // Handle request forwarding
  async handleRequest(req, res, account, apiKeyData) {
    let abortController = null
    // Compute the session hash (if a session id was provided)
    const sessionId = req.headers['session_id'] || req.body?.session_id
    const sessionHash = sessionId
      ? crypto.createHash('sha256').update(sessionId).digest('hex')
      : null

    try {
      // Load the full account record (including the decrypted API key)
      const fullAccount = await openaiResponsesAccountService.getAccount(account.id)
      if (!fullAccount) {
        throw new Error('Account not found')
      }

      // Create an AbortController so the upstream request can be cancelled
      abortController = new AbortController()

      // Set up the client-disconnect handler
      const handleClientDisconnect = () => {
        logger.info('🔌 Client disconnected, aborting OpenAI-Responses request')
        if (abortController && !abortController.signal.aborted) {
          abortController.abort()
        }
      }

      // Listen for client disconnect events
      req.once('close', handleClientDisconnect)
      res.once('close', handleClientDisconnect)

      // Build the target URL
      const targetUrl = `${fullAccount.baseApi}${req.path}`
      logger.info(`🎯 Forwarding to: ${targetUrl}`)

      // Build request headers - use the shared headerFilter to strip CDN headers
      const headers = {
        ...filterForOpenAI(req.headers),
        Authorization: `Bearer ${fullAccount.apiKey}`,
        'Content-Type': 'application/json'
      }

      // Handle the User-Agent
      if (fullAccount.userAgent) {
        // Use the custom User-Agent
        headers['User-Agent'] = fullAccount.userAgent
        logger.debug(`📱 Using custom User-Agent: ${fullAccount.userAgent}`)
      } else if (req.headers['user-agent']) {
        // Pass through the original User-Agent
        headers['User-Agent'] = req.headers['user-agent']
        logger.debug(`📱 Forwarding original User-Agent: ${req.headers['user-agent']}`)
      }

      // Configure the request options
      const requestOptions = {
        method: req.method,
        url: targetUrl,
        headers,
        data: req.body,
        timeout: this.defaultTimeout,
        responseType: req.body?.stream ? 'stream' : 'json',
        validateStatus: () => true, // handle every status code ourselves
        signal: abortController.signal
      }

      // Configure the proxy (if one is set)
      if (fullAccount.proxy) {
        const proxyAgent = ProxyHelper.createProxyAgent(fullAccount.proxy)
        if (proxyAgent) {
          requestOptions.httpAgent = proxyAgent
          requestOptions.httpsAgent = proxyAgent
          requestOptions.proxy = false
          logger.info(
            `🌐 Using proxy for OpenAI-Responses: ${ProxyHelper.getProxyDescription(fullAccount.proxy)}`
          )
        }
      }

      // Log the outgoing request
      logger.info('📤 OpenAI-Responses relay request', {
        accountId: account.id,
        accountName: account.name,
        targetUrl,
        method: req.method,
        stream: req.body?.stream || false,
        model: req.body?.model || 'unknown',
        userAgent: headers['User-Agent'] || 'not set'
      })

      // Send the request
      const response = await axios(requestOptions)

      // Handle 429 rate-limit errors
      if (response.status === 429) {
        const { resetsInSeconds, errorData } = await this._handle429Error(
          account,
          response,
          req.body?.stream,
          sessionHash
        )

        // Return the error response (use the processed data to avoid circular references)
        const errorResponse = errorData || {
          error: {
            message: 'Rate limit exceeded',
            type: 'rate_limit_error',
            code: 'rate_limit_exceeded',
            resets_in_seconds: resetsInSeconds
          }
        }
        return res.status(429).json(errorResponse)
      }

      // Handle other error status codes
      if (response.status >= 400) {
        // Handle a streamed error response
        let errorData = response.data
        if (response.data && typeof response.data.pipe === 'function') {
          // A streamed response has to be read first
          const chunks = []
          await new Promise((resolve) => {
            response.data.on('data', (chunk) => chunks.push(chunk))
            response.data.on('end', resolve)
            response.data.on('error', resolve)
            setTimeout(resolve, 5000) // timeout guard
          })
          const fullResponse = Buffer.concat(chunks).toString()

          // Try to parse the error response
          try {
            if (fullResponse.includes('data: ')) {
              // SSE format
              const lines = fullResponse.split('\n')
              for (const line of lines) {
                if (line.startsWith('data: ')) {
                  const jsonStr = line.slice(6).trim()
                  if (jsonStr && jsonStr !== '[DONE]') {
                    errorData = JSON.parse(jsonStr)
                    break
                  }
                }
              }
            } else {
              // Plain JSON
              errorData = JSON.parse(fullResponse)
            }
          } catch (e) {
            logger.error('Failed to parse error response:', e)
            errorData = { error: { message: fullResponse || 'Unknown error' } }
          }
        }

        logger.error('OpenAI-Responses API error', {
          status: response.status,
          statusText: response.statusText,
          errorData
        })

        if (response.status === 401) {
          let reason = 'OpenAI Responses账号认证失败(401错误)'
          if (errorData) {
            if (typeof errorData === 'string' && errorData.trim()) {
              reason = `OpenAI Responses账号认证失败(401错误):${errorData.trim()}`
            } else if (
              errorData.error &&
              typeof errorData.error.message === 'string' &&
              errorData.error.message.trim()
            ) {
              reason = `OpenAI Responses账号认证失败(401错误):${errorData.error.message.trim()}`
            } else if (typeof errorData.message === 'string' && errorData.message.trim()) {
              reason = `OpenAI Responses账号认证失败(401错误):${errorData.message.trim()}`
            }
          }

          try {
            await unifiedOpenAIScheduler.markAccountUnauthorized(
              account.id,
              'openai-responses',
              sessionHash,
              reason
            )
          } catch (markError) {
            logger.error(
              '❌ Failed to mark OpenAI-Responses account unauthorized after 401:',
              markError
            )
          }

          let unauthorizedResponse = errorData
          if (
            !unauthorizedResponse ||
            typeof unauthorizedResponse !== 'object' ||
            unauthorizedResponse.pipe ||
            Buffer.isBuffer(unauthorizedResponse)
          ) {
            const fallbackMessage =
              typeof errorData === 'string' && errorData.trim() ? errorData.trim() : 'Unauthorized'
            unauthorizedResponse = {
              error: {
                message: fallbackMessage,
                type: 'unauthorized',
                code: 'unauthorized'
              }
            }
          }

          // Clean up listeners
          req.removeListener('close', handleClientDisconnect)
          res.removeListener('close', handleClientDisconnect)

          return res.status(401).json(unauthorizedResponse)
        }

        // Clean up listeners
        req.removeListener('close', handleClientDisconnect)
        res.removeListener('close', handleClientDisconnect)

        return res.status(response.status).json(errorData)
      }

      // Update the last-used timestamp
      await openaiResponsesAccountService.updateAccount(account.id, {
        lastUsedAt: new Date().toISOString()
      })

      // Handle streaming responses
      if (req.body?.stream && response.data && typeof response.data.pipe === 'function') {
        return this._handleStreamResponse(
          response,
          res,
          account,
          apiKeyData,
          req.body?.model,
          handleClientDisconnect,
          req
        )
      }

      // Handle non-streaming responses
      return this._handleNormalResponse(response, res, account, apiKeyData, req.body?.model)
    } catch (error) {
      // Clean up the AbortController
      if (abortController && !abortController.signal.aborted) {
        abortController.abort()
      }

      // Log the error safely, avoiding circular references
      const errorInfo = {
        message: error.message,
        code: error.code,
        status: error.response?.status,
        statusText: error.response?.statusText
      }
      logger.error('OpenAI-Responses relay error:', errorInfo)

      // Check for network errors
      if (error.code === 'ECONNREFUSED' || error.code === 'ETIMEDOUT') {
        await openaiResponsesAccountService.updateAccount(account.id, {
          status: 'error',
          errorMessage: `Connection error: ${error.code}`
        })
      }

      // If response headers were already sent, just end the response
      if (res.headersSent) {
        return res.end()
      }

      // Check whether this is an axios error that carries a response
      if (error.response) {
        // Handle the axios error response
        const status = error.response.status || 500
        let errorData = {
          error: {
            message: error.response.statusText || 'Request failed',
            type: 'api_error',
            code: error.code || 'unknown'
          }
        }

        // If the response carries data, try to use it
        if (error.response.data) {
          // Check whether it is a stream
          if (typeof error.response.data === 'object' && !error.response.data.pipe) {
            errorData = error.response.data
          } else if (typeof error.response.data === 'string') {
            try {
              errorData = JSON.parse(error.response.data)
            } catch (e) {
              errorData.error.message = error.response.data
            }
          }
        }

        if (status === 401) {
          let reason = 'OpenAI Responses账号认证失败(401错误)'
          if (errorData) {
            if (typeof errorData === 'string' && errorData.trim()) {
              reason = `OpenAI Responses账号认证失败(401错误):${errorData.trim()}`
            } else if (
              errorData.error &&
              typeof errorData.error.message === 'string' &&
              errorData.error.message.trim()
            ) {
              reason = `OpenAI Responses账号认证失败(401错误):${errorData.error.message.trim()}`
            } else if (typeof errorData.message === 'string' && errorData.message.trim()) {
              reason = `OpenAI Responses账号认证失败(401错误):${errorData.message.trim()}`
            }
          }

          try {
            await unifiedOpenAIScheduler.markAccountUnauthorized(
              account.id,
              'openai-responses',
              sessionHash,
              reason
            )
          } catch (markError) {
            logger.error(
              '❌ Failed to mark OpenAI-Responses account unauthorized in catch handler:',
              markError
            )
          }

          let unauthorizedResponse = errorData
          if (
            !unauthorizedResponse ||
            typeof unauthorizedResponse !== 'object' ||
            unauthorizedResponse.pipe ||
            Buffer.isBuffer(unauthorizedResponse)
          ) {
            const fallbackMessage =
              typeof errorData === 'string' && errorData.trim() ? errorData.trim() : 'Unauthorized'
            unauthorizedResponse = {
              error: {
                message: fallbackMessage,
                type: 'unauthorized',
                code: 'unauthorized'
              }
            }
          }

          return res.status(401).json(unauthorizedResponse)
        }

        return res.status(status).json(errorData)
      }

      // Any other error
      return res.status(500).json({
        error: {
          message: 'Internal server error',
          type: 'internal_error',
          details: error.message
        }
      })
    }
  }

  // Handle streaming responses
  async _handleStreamResponse(
    response,
    res,
    account,
    apiKeyData,
    requestedModel,
    handleClientDisconnect,
    req
  ) {
    // Set SSE response headers
    res.setHeader('Content-Type', 'text/event-stream')
    res.setHeader('Cache-Control', 'no-cache')
    res.setHeader('Connection', 'keep-alive')
    res.setHeader('X-Accel-Buffering', 'no')

    let usageData = null
    let actualModel = null
    let buffer = ''
    let rateLimitDetected = false
    let rateLimitResetsInSeconds = null
    let streamEnded = false

    // Parse SSE events to capture usage data and the model name
    const parseSSEForUsage = (data) => {
      const lines = data.split('\n')

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          try {
            const jsonStr = line.slice(6)
            if (jsonStr === '[DONE]') {
              continue
            }

            const eventData = JSON.parse(jsonStr)

            // Check for the response.completed event (OpenAI-Responses format)
            if (eventData.type === 'response.completed' && eventData.response) {
              // Read the real model name from the response
              if (eventData.response.model) {
                actualModel = eventData.response.model
                logger.debug(`📊 Captured actual model from response.completed: ${actualModel}`)
              }

              // Read the usage data - the OpenAI-Responses format nests it under response.usage
              if (eventData.response.usage) {
                usageData = eventData.response.usage
                logger.info('📊 Successfully captured usage data from OpenAI-Responses:', {
                  input_tokens: usageData.input_tokens,
                  output_tokens: usageData.output_tokens,
                  total_tokens: usageData.total_tokens
                })
              }
            }

            // Check for rate-limit errors
            if (eventData.error) {
              // Several error types can indicate rate limiting
              if (
                eventData.error.type === 'rate_limit_error' ||
                eventData.error.type === 'usage_limit_reached' ||
                eventData.error.type === 'rate_limit_exceeded'
              ) {
                rateLimitDetected = true
                if (eventData.error.resets_in_seconds) {
                  rateLimitResetsInSeconds = eventData.error.resets_in_seconds
                  logger.warn(
                    `🚫 Rate limit detected in stream, resets in ${rateLimitResetsInSeconds} seconds (${Math.ceil(rateLimitResetsInSeconds / 60)} minutes)`
                  )
                }
              }
            }
          } catch (e) {
            // Ignore parse errors
          }
        }
      }
    }

    // Listen to the data stream
    response.data.on('data', (chunk) => {
      try {
        const chunkStr = chunk.toString()

        // Forward the data to the client
        if (!res.destroyed && !streamEnded) {
          res.write(chunk)
        }

        // Also parse the data to capture usage information
        buffer += chunkStr

        // Process complete SSE events
        if (buffer.includes('\n\n')) {
          const events = buffer.split('\n\n')
          buffer = events.pop() || ''

          for (const event of events) {
            if (event.trim()) {
              parseSSEForUsage(event)
            }
          }
        }
      } catch (error) {
        logger.error('Error processing stream chunk:', error)
      }
    })

    response.data.on('end', async () => {
      streamEnded = true

      // Process whatever is left in the buffer
      if (buffer.trim()) {
        parseSSEForUsage(buffer)
      }

      // Record usage statistics
      if (usageData) {
        try {
          // OpenAI-Responses uses input_tokens/output_tokens; standard OpenAI uses prompt_tokens/completion_tokens
          const totalInputTokens = usageData.input_tokens || usageData.prompt_tokens || 0
          const outputTokens = usageData.output_tokens || usageData.completion_tokens || 0

          // Extract cache-related tokens (if present)
          const cacheReadTokens = usageData.input_tokens_details?.cached_tokens || 0
          const cacheCreateTokens = extractCacheCreationTokens(usageData)
          // Compute the actual input tokens (total input minus the cached portion)
          const actualInputTokens = Math.max(0, totalInputTokens - cacheReadTokens)

          const totalTokens =
            usageData.total_tokens || totalInputTokens + outputTokens + cacheCreateTokens
          const modelToRecord = actualModel || requestedModel || 'gpt-4'

          await apiKeyService.recordUsage(
            apiKeyData.id,
            actualInputTokens, // actual input (excluding cached tokens)
            outputTokens,
            cacheCreateTokens,
            cacheReadTokens,
            modelToRecord,
            account.id
          )

          logger.info(
            `📊 Recorded usage - Input: ${totalInputTokens}(actual:${actualInputTokens}+cached:${cacheReadTokens}), CacheCreate: ${cacheCreateTokens}, Output: ${outputTokens}, Total: ${totalTokens}, Model: ${modelToRecord}`
          )

          // Update the account's token usage statistics
          await openaiResponsesAccountService.updateAccountUsage(account.id, totalTokens)

          // Update the account's usage quota (if a quota limit is configured)
          if (parseFloat(account.dailyQuota) > 0) {
            // Use CostCalculator so cache tokens are priced correctly
            const CostCalculator = require('../utils/costCalculator')
            const costInfo = CostCalculator.calculateCost(
              {
                input_tokens: actualInputTokens, // actual input (excluding cache)
                output_tokens: outputTokens,
                cache_creation_input_tokens: cacheCreateTokens,
                cache_read_input_tokens: cacheReadTokens
              },
              modelToRecord
            )
            await openaiResponsesAccountService.updateUsageQuota(account.id, costInfo.costs.total)
          }
        } catch (error) {
          logger.error('Failed to record usage:', error)
        }
      }

      // If a rate limit was detected in the stream
      if (rateLimitDetected) {
        // Handle it through the unified scheduler (consistent with non-streaming responses)
        const sessionId = req.headers['session_id'] || req.body?.session_id
        const sessionHash = sessionId
          ? crypto.createHash('sha256').update(sessionId).digest('hex')
          : null

        await unifiedOpenAIScheduler.markAccountRateLimited(
          account.id,
          'openai-responses',
          sessionHash,
          rateLimitResetsInSeconds
        )

        logger.warn(
          `🚫 Processing rate limit for OpenAI-Responses account ${account.id} from stream`
        )
      }

      // Clean up listeners
      req.removeListener('close', handleClientDisconnect)
      res.removeListener('close', handleClientDisconnect)

      if (!res.destroyed) {
        res.end()
      }

      logger.info('Stream response completed', {
        accountId: account.id,
        hasUsage: !!usageData,
        actualModel: actualModel || 'unknown'
      })
    })

    response.data.on('error', (error) => {
      streamEnded = true
      logger.error('Stream error:', error)

      // Clean up listeners
      req.removeListener('close', handleClientDisconnect)
      res.removeListener('close', handleClientDisconnect)

      if (!res.headersSent) {
        res.status(502).json({ error: { message: 'Upstream stream error' } })
      } else if (!res.destroyed) {
        res.end()
      }
    })

    // Handle client disconnects
    const cleanup = () => {
      streamEnded = true
      try {
        response.data?.unpipe?.(res)
        response.data?.destroy?.()
      } catch (_) {
        // Ignore cleanup errors
      }
    }

    req.on('close', cleanup)
    req.on('aborted', cleanup)
  }

  // Handle non-streaming responses
  async _handleNormalResponse(response, res, account, apiKeyData, requestedModel) {
    const responseData = response.data

    // Extract the usage data and the actual model
    // Supports both formats: a top-level usage object or one nested under response
    const usageData = responseData?.usage || responseData?.response?.usage
    const actualModel =
      responseData?.model || responseData?.response?.model || requestedModel || 'gpt-4'

    // Record usage statistics
    if (usageData) {
      try {
        // OpenAI-Responses uses input_tokens/output_tokens; standard OpenAI uses prompt_tokens/completion_tokens
        const totalInputTokens = usageData.input_tokens || usageData.prompt_tokens || 0
        const outputTokens = usageData.output_tokens || usageData.completion_tokens || 0

        // Extract cache-related tokens (if present)
        const cacheReadTokens = usageData.input_tokens_details?.cached_tokens || 0
        const cacheCreateTokens = extractCacheCreationTokens(usageData)
        // Compute the actual input tokens (total input minus the cached portion)
        const actualInputTokens = Math.max(0, totalInputTokens - cacheReadTokens)

        const totalTokens =
          usageData.total_tokens || totalInputTokens + outputTokens + cacheCreateTokens

        await apiKeyService.recordUsage(
          apiKeyData.id,
          actualInputTokens, // actual input (excluding cached tokens)
          outputTokens,
          cacheCreateTokens,
          cacheReadTokens,
          actualModel,
          account.id
        )

        logger.info(
          `📊 Recorded non-stream usage - Input: ${totalInputTokens}(actual:${actualInputTokens}+cached:${cacheReadTokens}), CacheCreate: ${cacheCreateTokens}, Output: ${outputTokens}, Total: ${totalTokens}, Model: ${actualModel}`
        )

        // Update the account's token usage statistics
        await openaiResponsesAccountService.updateAccountUsage(account.id, totalTokens)

        // Update the account's usage quota (if a quota limit is configured)
        if (parseFloat(account.dailyQuota) > 0) {
          // Use CostCalculator so cache tokens are priced correctly
          const CostCalculator = require('../utils/costCalculator')
          const costInfo = CostCalculator.calculateCost(
            {
              input_tokens: actualInputTokens, // actual input (excluding cache)
              output_tokens: outputTokens,
              cache_creation_input_tokens: cacheCreateTokens,
              cache_read_input_tokens: cacheReadTokens
            },
            actualModel
          )
          await openaiResponsesAccountService.updateUsageQuota(account.id, costInfo.costs.total)
        }
      } catch (error) {
        logger.error('Failed to record usage:', error)
      }
    }

    // Send the response
    res.status(response.status).json(responseData)

    logger.info('Normal response completed', {
      accountId: account.id,
      status: response.status,
      hasUsage: !!usageData,
      model: actualModel
    })
  }

  // Handle 429 rate-limit errors
  async _handle429Error(account, response, isStream = false, sessionHash = null) {
    let resetsInSeconds = null
    let errorData = null

    try {
      // A 429 response body may be JSON or SSE
      if (isStream && response.data && typeof response.data.pipe === 'function') {
        // A streamed response has to be collected first
        const chunks = []
        await new Promise((resolve, reject) => {
          response.data.on('data', (chunk) => chunks.push(chunk))
          response.data.on('end', resolve)
          response.data.on('error', reject)
          // Timeout guard against waiting forever
          setTimeout(resolve, 5000)
        })

        const fullResponse = Buffer.concat(chunks).toString()

        // Try to parse an SSE-formatted error response
        if (fullResponse.includes('data: ')) {
          const lines = fullResponse.split('\n')
          for (const line of lines) {
            if (line.startsWith('data: ')) {
              try {
                const jsonStr = line.slice(6).trim()
                if (jsonStr && jsonStr !== '[DONE]') {
                  errorData = JSON.parse(jsonStr)
                  break
                }
              } catch (e) {
                // Try the next line
              }
            }
          }
        }

        // If SSE parsing failed, try parsing the whole body as JSON
        if (!errorData) {
          try {
            errorData = JSON.parse(fullResponse)
          } catch (e) {
            logger.error('Failed to parse 429 error response:', e)
            logger.debug('Raw response:', fullResponse)
          }
        }
      } else if (response.data && typeof response.data !== 'object') {
        // If response.data is a string, try to parse it as JSON
        try {
          errorData = JSON.parse(response.data)
        } catch (e) {
          logger.error('Failed to parse 429 error response as JSON:', e)
          errorData = { error: { message: response.data } }
        }
      } else if (response.data && typeof response.data === 'object' && !response.data.pipe) {
        // Non-streaming object response - use it as-is
        errorData = response.data
      }

      // Extract the reset time from the response body (OpenAI standard format)
      if (errorData && errorData.error) {
        if (errorData.error.resets_in_seconds) {
          resetsInSeconds = errorData.error.resets_in_seconds
          logger.info(
            `🕐 Rate limit will reset in ${resetsInSeconds} seconds (${Math.ceil(resetsInSeconds / 60)} minutes / ${Math.ceil(resetsInSeconds / 3600)} hours)`
          )
        } else if (errorData.error.resets_in) {
          // Some APIs may use a different field name
          resetsInSeconds = parseInt(errorData.error.resets_in)
          logger.info(
            `🕐 Rate limit will reset in ${resetsInSeconds} seconds (${Math.ceil(resetsInSeconds / 60)} minutes / ${Math.ceil(resetsInSeconds / 3600)} hours)`
          )
        }
      }

      if (!resetsInSeconds) {
        logger.warn('⚠️ Could not extract reset time from 429 response, using default 60 minutes')
      }
    } catch (e) {
      logger.error('⚠️ Failed to parse rate limit error:', e)
    }

    // Mark the account rate-limited through the unified scheduler (consistent with regular OpenAI accounts)
    await unifiedOpenAIScheduler.markAccountRateLimited(
      account.id,
      'openai-responses',
      sessionHash,
      resetsInSeconds
    )

    logger.warn('OpenAI-Responses account rate limited', {
      accountId: account.id,
      accountName: account.name,
      resetsInSeconds: resetsInSeconds || 'unknown',
      resetInMinutes: resetsInSeconds ? Math.ceil(resetsInSeconds / 60) : 60,
      resetInHours: resetsInSeconds ? Math.ceil(resetsInSeconds / 3600) : 1
    })

    // Return the processed data to avoid circular references
    return { resetsInSeconds, errorData }
  }

  // Request-header filtering has been migrated to the headerFilter utility.
  // This method is kept for backward compatibility; filterForOpenAI() is used directly now.
  _filterRequestHeaders(headers) {
    return filterForOpenAI(headers)
  }

  // Estimate cost (simplified; real pricing should follow each provider's model-specific rates)
  _estimateCost(model, inputTokens, outputTokens) {
    // This is a simplified estimate; actual cost depends on the API provider's and model's pricing
    const rates = {
      'gpt-4': { input: 0.03, output: 0.06 }, // per 1K tokens
      'gpt-4-turbo': { input: 0.01, output: 0.03 },
      'gpt-3.5-turbo': { input: 0.0005, output: 0.0015 },
      'claude-3-opus': { input: 0.015, output: 0.075 },
      'claude-3-sonnet': { input: 0.003, output: 0.015 },
      'claude-3-haiku': { input: 0.00025, output: 0.00125 }
    }

    // Find the matching model pricing
    let rate = rates['gpt-3.5-turbo'] // default to GPT-3.5 pricing
    for (const [modelKey, modelRate] of Object.entries(rates)) {
      if (model.toLowerCase().includes(modelKey.toLowerCase())) {
        rate = modelRate
        break
      }
    }

    const inputCost = (inputTokens / 1000) * rate.input
    const outputCost = (outputTokens / 1000) * rate.output
    return inputCost + outputCost
  }
}

module.exports = new OpenAIResponsesRelayService()