Files
claude-relay-service/src/services/ccrRelayService.js
QTom f5d1c25295 feat: 添加用户消息串行队列功能,防止同账户并发请求触发限流
- 新增 userMessageQueueService.js 实现基于 Redis 的队列锁机制
- 在 claudeRelayService、claudeConsoleRelayService、bedrockRelayService、ccrRelayService 中集成队列锁
- 添加 Redis 原子性 Lua 脚本:acquireUserMessageLock、releaseUserMessageLock、refreshUserMessageLock
- 支持锁续租机制,防止长时间请求锁过期
- 添加可配置参数:USER_MESSAGE_QUEUE_ENABLED、USER_MESSAGE_QUEUE_DELAY_MS、USER_MESSAGE_QUEUE_TIMEOUT_MS
- 添加 Web 管理界面配置入口
- 添加 logger.performance 方法用于结构化性能日志
- 添加完整单元测试 (tests/userMessageQueue.test.js)
2025-12-09 17:04:01 +08:00

813 lines
29 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const axios = require('axios')
const ccrAccountService = require('./ccrAccountService')
const logger = require('../utils/logger')
const config = require('../../config/config')
const { parseVendorPrefixedModel } = require('../utils/modelHelper')
const userMessageQueueService = require('./userMessageQueueService')
class CcrRelayService {
constructor() {
this.defaultUserAgent = 'claude-relay-service/1.0.0'
}
// 🚀 转发请求到CCR API
async relayRequest(
requestBody,
apiKeyData,
clientRequest,
clientResponse,
clientHeaders,
accountId,
options = {}
) {
let abortController = null
let account = null
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
try {
// 📬 用户消息队列处理
if (userMessageQueueService.isUserMessageRequest(requestBody)) {
// 校验 accountId 非空,避免空值污染队列锁键
if (!accountId || accountId === '') {
logger.error('❌ accountId missing for queue lock in CCR relayRequest')
throw new Error('accountId missing for queue lock')
}
const queueResult = await userMessageQueueService.acquireQueueLock(accountId)
if (!queueResult.acquired && !queueResult.skipped) {
// 区分 Redis 后端错误和队列超时
const isBackendError = queueResult.error === 'queue_backend_error'
const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT'
const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout'
const errorMessage = isBackendError
? 'Queue service temporarily unavailable, please retry later'
: 'User message queue wait timeout, please retry later'
const statusCode = isBackendError ? 500 : 503
// 结构化性能日志,用于后续统计
logger.performance('user_message_queue_error', {
errorType,
errorCode,
accountId,
statusCode,
backendError: isBackendError ? queueResult.errorMessage : undefined
})
logger.warn(
`📬 User message queue ${errorType} for CCR account ${accountId}`,
isBackendError ? { backendError: queueResult.errorMessage } : {}
)
return {
statusCode,
headers: {
'Content-Type': 'application/json',
'x-user-message-queue-error': errorType
},
body: JSON.stringify({
type: 'error',
error: {
type: errorType,
code: errorCode,
message: errorMessage
}
}),
accountId
}
}
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
)
}
}
// 获取账户信息
account = await ccrAccountService.getAccount(accountId)
if (!account) {
throw new Error('CCR account not found')
}
logger.info(
`📤 Processing CCR API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId})`
)
logger.debug(`🌐 Account API URL: ${account.apiUrl}`)
logger.debug(`🔍 Account supportedModels: ${JSON.stringify(account.supportedModels)}`)
logger.debug(`🔑 Account has apiKey: ${!!account.apiKey}`)
logger.debug(`📝 Request model: ${requestBody.model}`)
// 处理模型前缀解析和映射
const { baseModel } = parseVendorPrefixedModel(requestBody.model)
logger.debug(`🔄 Parsed base model: ${baseModel} from original: ${requestBody.model}`)
let mappedModel = baseModel
if (
account.supportedModels &&
typeof account.supportedModels === 'object' &&
!Array.isArray(account.supportedModels)
) {
const newModel = ccrAccountService.getMappedModel(account.supportedModels, baseModel)
if (newModel !== baseModel) {
logger.info(`🔄 Mapping model from ${baseModel} to ${newModel}`)
mappedModel = newModel
}
}
// 创建修改后的请求体,使用去前缀后的模型名
const modifiedRequestBody = {
...requestBody,
model: mappedModel
}
// 创建代理agent
const proxyAgent = ccrAccountService._createProxyAgent(account.proxy)
// 创建AbortController用于取消请求
abortController = new AbortController()
// 设置客户端断开监听器
const handleClientDisconnect = () => {
logger.info('🔌 Client disconnected, aborting CCR request')
if (abortController && !abortController.signal.aborted) {
abortController.abort()
}
}
// 监听客户端断开事件
if (clientRequest) {
clientRequest.once('close', handleClientDisconnect)
}
if (clientResponse) {
clientResponse.once('close', handleClientDisconnect)
}
// 构建完整的API URL
const cleanUrl = account.apiUrl.replace(/\/$/, '') // 移除末尾斜杠
let apiEndpoint
if (options.customPath) {
// 如果指定了自定义路径(如 count_tokens使用它
const baseUrl = cleanUrl.replace(/\/v1\/messages$/, '') // 移除已有的 /v1/messages
apiEndpoint = `${baseUrl}${options.customPath}`
} else {
// 默认使用 messages 端点
apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages`
}
logger.debug(`🎯 Final API endpoint: ${apiEndpoint}`)
logger.debug(`[DEBUG] Options passed to relayRequest: ${JSON.stringify(options)}`)
logger.debug(`[DEBUG] Client headers received: ${JSON.stringify(clientHeaders)}`)
// 过滤客户端请求头
const filteredHeaders = this._filterClientHeaders(clientHeaders)
logger.debug(`[DEBUG] Filtered client headers: ${JSON.stringify(filteredHeaders)}`)
// 决定使用的 User-Agent优先使用账户自定义的否则透传客户端的最后才使用默认值
const userAgent =
account.userAgent ||
clientHeaders?.['user-agent'] ||
clientHeaders?.['User-Agent'] ||
this.defaultUserAgent
// 准备请求配置
const requestConfig = {
method: 'POST',
url: apiEndpoint,
data: modifiedRequestBody,
headers: {
'Content-Type': 'application/json',
'anthropic-version': '2023-06-01',
'User-Agent': userAgent,
...filteredHeaders
},
timeout: config.requestTimeout || 600000,
signal: abortController.signal,
validateStatus: () => true // 接受所有状态码
}
if (proxyAgent) {
requestConfig.httpAgent = proxyAgent
requestConfig.httpsAgent = proxyAgent
requestConfig.proxy = false
}
// 根据 API Key 格式选择认证方式
if (account.apiKey && account.apiKey.startsWith('sk-ant-')) {
// Anthropic 官方 API Key 使用 x-api-key
requestConfig.headers['x-api-key'] = account.apiKey
logger.debug('[DEBUG] Using x-api-key authentication for sk-ant-* API key')
} else {
// 其他 API Key (包括CCR API Key) 使用 Authorization Bearer
requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}`
logger.debug('[DEBUG] Using Authorization Bearer authentication')
}
logger.debug(
`[DEBUG] Initial headers before beta: ${JSON.stringify(requestConfig.headers, null, 2)}`
)
// 添加beta header如果需要
if (options.betaHeader) {
logger.debug(`[DEBUG] Adding beta header: ${options.betaHeader}`)
requestConfig.headers['anthropic-beta'] = options.betaHeader
} else {
logger.debug('[DEBUG] No beta header to add')
}
// 发送请求
logger.debug(
'📤 Sending request to CCR API with headers:',
JSON.stringify(requestConfig.headers, null, 2)
)
const response = await axios(requestConfig)
// 移除监听器(请求成功完成)
if (clientRequest) {
clientRequest.removeListener('close', handleClientDisconnect)
}
if (clientResponse) {
clientResponse.removeListener('close', handleClientDisconnect)
}
logger.debug(`🔗 CCR API response: ${response.status}`)
logger.debug(`[DEBUG] Response headers: ${JSON.stringify(response.headers)}`)
logger.debug(`[DEBUG] Response data type: ${typeof response.data}`)
logger.debug(
`[DEBUG] Response data length: ${response.data ? (typeof response.data === 'string' ? response.data.length : JSON.stringify(response.data).length) : 0}`
)
logger.debug(
`[DEBUG] Response data preview: ${typeof response.data === 'string' ? response.data.substring(0, 200) : JSON.stringify(response.data).substring(0, 200)}`
)
// 检查错误状态并相应处理
if (response.status === 401) {
logger.warn(`🚫 Unauthorized error detected for CCR account ${accountId}`)
await ccrAccountService.markAccountUnauthorized(accountId)
} else if (response.status === 429) {
logger.warn(`🚫 Rate limit detected for CCR account ${accountId}`)
// 收到429先检查是否因为超过了手动配置的每日额度
await ccrAccountService.checkQuotaUsage(accountId).catch((err) => {
logger.error('❌ Failed to check quota after 429 error:', err)
})
await ccrAccountService.markAccountRateLimited(accountId)
} else if (response.status === 529) {
logger.warn(`🚫 Overload error detected for CCR account ${accountId}`)
await ccrAccountService.markAccountOverloaded(accountId)
} else if (response.status === 200 || response.status === 201) {
// 如果请求成功,检查并移除错误状态
const isRateLimited = await ccrAccountService.isAccountRateLimited(accountId)
if (isRateLimited) {
await ccrAccountService.removeAccountRateLimit(accountId)
}
const isOverloaded = await ccrAccountService.isAccountOverloaded(accountId)
if (isOverloaded) {
await ccrAccountService.removeAccountOverload(accountId)
}
}
// 更新最后使用时间
await this._updateLastUsedTime(accountId)
const responseBody =
typeof response.data === 'string' ? response.data : JSON.stringify(response.data)
logger.debug(`[DEBUG] Final response body to return: ${responseBody}`)
return {
statusCode: response.status,
headers: response.headers,
body: responseBody,
accountId
}
} catch (error) {
// 处理特定错误
if (error.name === 'AbortError' || error.code === 'ECONNABORTED') {
logger.info('Request aborted due to client disconnect')
throw new Error('Client disconnected')
}
logger.error(
`❌ CCR relay request failed (Account: ${account?.name || accountId}):`,
error.message
)
throw error
} finally {
// 📬 释放用户消息队列锁
if (queueLockAcquired && queueRequestId && accountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for CCR account ${accountId}:`,
releaseError.message
)
}
}
}
}
// 🌊 处理流式响应
async relayStreamRequestWithUsageCapture(
requestBody,
apiKeyData,
responseStream,
clientHeaders,
usageCallback,
accountId,
streamTransformer = null,
options = {}
) {
let account = null
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
try {
// 📬 用户消息队列处理
if (userMessageQueueService.isUserMessageRequest(requestBody)) {
// 校验 accountId 非空,避免空值污染队列锁键
if (!accountId || accountId === '') {
logger.error(
'❌ accountId missing for queue lock in CCR relayStreamRequestWithUsageCapture'
)
throw new Error('accountId missing for queue lock')
}
const queueResult = await userMessageQueueService.acquireQueueLock(accountId)
if (!queueResult.acquired && !queueResult.skipped) {
// 区分 Redis 后端错误和队列超时
const isBackendError = queueResult.error === 'queue_backend_error'
const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT'
const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout'
const errorMessage = isBackendError
? 'Queue service temporarily unavailable, please retry later'
: 'User message queue wait timeout, please retry later'
const statusCode = isBackendError ? 500 : 503
// 结构化性能日志用于后续<E5908E><E7BBAD>
logger.performance('user_message_queue_error', {
errorType,
errorCode,
accountId,
statusCode,
stream: true,
backendError: isBackendError ? queueResult.errorMessage : undefined
})
logger.warn(
`📬 User message queue ${errorType} for CCR account ${accountId} (stream)`,
isBackendError ? { backendError: queueResult.errorMessage } : {}
)
if (!responseStream.headersSent) {
responseStream.writeHead(statusCode, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'x-user-message-queue-error': errorType
})
}
const errorEvent = `event: error\ndata: ${JSON.stringify({
type: 'error',
error: {
type: errorType,
code: errorCode,
message: errorMessage
}
})}\n\n`
responseStream.write(errorEvent)
responseStream.write('data: [DONE]\n\n')
responseStream.end()
return
}
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
)
}
}
// 获取账户信息
account = await ccrAccountService.getAccount(accountId)
if (!account) {
throw new Error('CCR account not found')
}
logger.info(
`📡 Processing streaming CCR API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId})`
)
logger.debug(`🌐 Account API URL: ${account.apiUrl}`)
// 处理模型前缀解析和映射
const { baseModel } = parseVendorPrefixedModel(requestBody.model)
logger.debug(`🔄 Parsed base model: ${baseModel} from original: ${requestBody.model}`)
let mappedModel = baseModel
if (
account.supportedModels &&
typeof account.supportedModels === 'object' &&
!Array.isArray(account.supportedModels)
) {
const newModel = ccrAccountService.getMappedModel(account.supportedModels, baseModel)
if (newModel !== baseModel) {
logger.info(`🔄 [Stream] Mapping model from ${baseModel} to ${newModel}`)
mappedModel = newModel
}
}
// 创建修改后的请求体,使用去前缀后的模型名
const modifiedRequestBody = {
...requestBody,
model: mappedModel
}
// 创建代理agent
const proxyAgent = ccrAccountService._createProxyAgent(account.proxy)
// 发送流式请求
await this._makeCcrStreamRequest(
modifiedRequestBody,
account,
proxyAgent,
clientHeaders,
responseStream,
accountId,
usageCallback,
streamTransformer,
options
)
// 更新最后使用时间
await this._updateLastUsedTime(accountId)
} catch (error) {
logger.error(`❌ CCR stream relay failed (Account: ${account?.name || accountId}):`, error)
throw error
} finally {
// 📬 释放用户消息队列锁
if (queueLockAcquired && queueRequestId && accountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for CCR stream account ${accountId}:`,
releaseError.message
)
}
}
}
}
// 🌊 发送流式请求到CCR API
async _makeCcrStreamRequest(
body,
account,
proxyAgent,
clientHeaders,
responseStream,
accountId,
usageCallback,
streamTransformer = null,
requestOptions = {}
) {
return new Promise((resolve, reject) => {
let aborted = false
// 构建完整的API URL
const cleanUrl = account.apiUrl.replace(/\/$/, '') // 移除末尾斜杠
const apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages`
logger.debug(`🎯 Final API endpoint for stream: ${apiEndpoint}`)
// 过滤客户端请求头
const filteredHeaders = this._filterClientHeaders(clientHeaders)
logger.debug(`[DEBUG] Filtered client headers: ${JSON.stringify(filteredHeaders)}`)
// 决定使用的 User-Agent优先使用账户自定义的否则透传客户端的最后才使用默认值
const userAgent =
account.userAgent ||
clientHeaders?.['user-agent'] ||
clientHeaders?.['User-Agent'] ||
this.defaultUserAgent
// 准备请求配置
const requestConfig = {
method: 'POST',
url: apiEndpoint,
data: body,
headers: {
'Content-Type': 'application/json',
'anthropic-version': '2023-06-01',
'User-Agent': userAgent,
...filteredHeaders
},
timeout: config.requestTimeout || 600000,
responseType: 'stream',
validateStatus: () => true // 接受所有状态码
}
if (proxyAgent) {
requestConfig.httpAgent = proxyAgent
requestConfig.httpsAgent = proxyAgent
requestConfig.proxy = false
}
// 根据 API Key 格式选择认证方式
if (account.apiKey && account.apiKey.startsWith('sk-ant-')) {
// Anthropic 官方 API Key 使用 x-api-key
requestConfig.headers['x-api-key'] = account.apiKey
logger.debug('[DEBUG] Using x-api-key authentication for sk-ant-* API key')
} else {
// 其他 API Key (包括CCR API Key) 使用 Authorization Bearer
requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}`
logger.debug('[DEBUG] Using Authorization Bearer authentication')
}
// 添加beta header如果需要
if (requestOptions.betaHeader) {
requestConfig.headers['anthropic-beta'] = requestOptions.betaHeader
}
// 发送请求
const request = axios(requestConfig)
request
.then((response) => {
logger.debug(`🌊 CCR stream response status: ${response.status}`)
// 错误响应处理
if (response.status !== 200) {
logger.error(
`❌ CCR API returned error status: ${response.status} | Account: ${account?.name || accountId}`
)
if (response.status === 401) {
ccrAccountService.markAccountUnauthorized(accountId)
} else if (response.status === 429) {
ccrAccountService.markAccountRateLimited(accountId)
// 检查是否因为超过每日额度
ccrAccountService.checkQuotaUsage(accountId).catch((err) => {
logger.error('❌ Failed to check quota after 429 error:', err)
})
} else if (response.status === 529) {
ccrAccountService.markAccountOverloaded(accountId)
}
// 设置错误响应的状态码和响应头
if (!responseStream.headersSent) {
const errorHeaders = {
'Content-Type': response.headers['content-type'] || 'application/json',
'Cache-Control': 'no-cache',
Connection: 'keep-alive'
}
// 避免 Transfer-Encoding 冲突,让 Express 自动处理
delete errorHeaders['Transfer-Encoding']
delete errorHeaders['Content-Length']
responseStream.writeHead(response.status, errorHeaders)
}
// 直接透传错误数据,不进行包装
response.data.on('data', (chunk) => {
if (!responseStream.destroyed) {
responseStream.write(chunk)
}
})
response.data.on('end', () => {
if (!responseStream.destroyed) {
responseStream.end()
}
resolve() // 不抛出异常,正常完成流处理
})
return
}
// 成功响应,检查并移除错误状态
ccrAccountService.isAccountRateLimited(accountId).then((isRateLimited) => {
if (isRateLimited) {
ccrAccountService.removeAccountRateLimit(accountId)
}
})
ccrAccountService.isAccountOverloaded(accountId).then((isOverloaded) => {
if (isOverloaded) {
ccrAccountService.removeAccountOverload(accountId)
}
})
// 设置响应头
if (!responseStream.headersSent) {
const headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control'
}
responseStream.writeHead(200, headers)
}
// 处理流数据和使用统计收集
let rawBuffer = ''
const collectedUsage = {}
response.data.on('data', (chunk) => {
if (aborted || responseStream.destroyed) {
return
}
try {
const chunkStr = chunk.toString('utf8')
rawBuffer += chunkStr
// 按行分割处理 SSE 数据
const lines = rawBuffer.split('\n')
rawBuffer = lines.pop() // 保留最后一个可能不完整的行
for (const line of lines) {
if (line.trim()) {
// 解析 SSE 数据并收集使用统计
const usageData = this._parseSSELineForUsage(line)
if (usageData) {
Object.assign(collectedUsage, usageData)
}
// 应用流转换器(如果提供)
let outputLine = line
if (streamTransformer && typeof streamTransformer === 'function') {
outputLine = streamTransformer(line)
}
// 写入到响应流
if (outputLine && !responseStream.destroyed) {
responseStream.write(`${outputLine}\n`)
}
} else {
// 空行也需要传递
if (!responseStream.destroyed) {
responseStream.write('\n')
}
}
}
} catch (err) {
logger.error('❌ Error processing SSE chunk:', err)
}
})
response.data.on('end', () => {
if (!responseStream.destroyed) {
responseStream.end()
}
// 如果收集到使用统计数据,调用回调
if (usageCallback && Object.keys(collectedUsage).length > 0) {
try {
logger.debug(`📊 Collected usage data: ${JSON.stringify(collectedUsage)}`)
// 在 usage 回调中包含模型信息
usageCallback({ ...collectedUsage, accountId, model: body.model })
} catch (err) {
logger.error('❌ Error in usage callback:', err)
}
}
resolve()
})
response.data.on('error', (err) => {
logger.error('❌ Stream data error:', err)
if (!responseStream.destroyed) {
responseStream.end()
}
reject(err)
})
// 客户端断开处理
responseStream.on('close', () => {
logger.info('🔌 Client disconnected from CCR stream')
aborted = true
if (response.data && typeof response.data.destroy === 'function') {
response.data.destroy()
}
})
responseStream.on('error', (err) => {
logger.error('❌ Response stream error:', err)
aborted = true
})
})
.catch((error) => {
if (!responseStream.headersSent) {
responseStream.writeHead(500, { 'Content-Type': 'application/json' })
}
const errorResponse = {
error: {
type: 'internal_error',
message: 'CCR API request failed'
}
}
if (!responseStream.destroyed) {
responseStream.write(`data: ${JSON.stringify(errorResponse)}\n\n`)
responseStream.end()
}
reject(error)
})
})
}
// 📊 解析SSE行以提取使用统计信息
_parseSSELineForUsage(line) {
try {
if (line.startsWith('data: ')) {
const data = line.substring(6).trim()
if (data === '[DONE]') {
return null
}
const jsonData = JSON.parse(data)
// 检查是否包含使用统计信息
if (jsonData.usage) {
return {
input_tokens: jsonData.usage.input_tokens || 0,
output_tokens: jsonData.usage.output_tokens || 0,
cache_creation_input_tokens: jsonData.usage.cache_creation_input_tokens || 0,
cache_read_input_tokens: jsonData.usage.cache_read_input_tokens || 0,
// 支持 ephemeral cache 字段
cache_creation_input_tokens_ephemeral_5m:
jsonData.usage.cache_creation_input_tokens_ephemeral_5m || 0,
cache_creation_input_tokens_ephemeral_1h:
jsonData.usage.cache_creation_input_tokens_ephemeral_1h || 0
}
}
// 检查 message_delta 事件中的使用统计
if (jsonData.type === 'message_delta' && jsonData.delta && jsonData.delta.usage) {
return {
input_tokens: jsonData.delta.usage.input_tokens || 0,
output_tokens: jsonData.delta.usage.output_tokens || 0,
cache_creation_input_tokens: jsonData.delta.usage.cache_creation_input_tokens || 0,
cache_read_input_tokens: jsonData.delta.usage.cache_read_input_tokens || 0,
cache_creation_input_tokens_ephemeral_5m:
jsonData.delta.usage.cache_creation_input_tokens_ephemeral_5m || 0,
cache_creation_input_tokens_ephemeral_1h:
jsonData.delta.usage.cache_creation_input_tokens_ephemeral_1h || 0
}
}
}
} catch (err) {
// 忽略解析错误,不是所有行都包含 JSON
}
return null
}
// 🔍 过滤客户端请求头
_filterClientHeaders(clientHeaders) {
if (!clientHeaders) {
return {}
}
const filteredHeaders = {}
const allowedHeaders = [
'accept-language',
'anthropic-beta',
'anthropic-dangerous-direct-browser-access'
]
// 只保留允许的头部信息
for (const [key, value] of Object.entries(clientHeaders)) {
const lowerKey = key.toLowerCase()
if (allowedHeaders.includes(lowerKey)) {
filteredHeaders[key] = value
}
}
return filteredHeaders
}
// ⏰ 更新账户最后使用时间
async _updateLastUsedTime(accountId) {
try {
const redis = require('../models/redis')
const client = redis.getClientSafe()
await client.hset(`ccr_account:${accountId}`, 'lastUsedAt', new Date().toISOString())
} catch (error) {
logger.error(`❌ Failed to update last used time for CCR account ${accountId}:`, error)
}
}
}
module.exports = new CcrRelayService()