Merge branch 'main' into um-5

This commit is contained in:
Feng Yue
2025-08-25 17:19:24 +08:00
74 changed files with 9466 additions and 1901 deletions

View File

@@ -20,6 +20,7 @@ class ApiKeyService {
claudeConsoleAccountId = null,
geminiAccountId = null,
openaiAccountId = null,
azureOpenaiAccountId = null,
bedrockAccountId = null, // 添加 Bedrock 账号ID支持
permissions = 'all', // 'claude', 'gemini', 'openai', 'all'
isActive = true,
@@ -53,6 +54,7 @@ class ApiKeyService {
claudeConsoleAccountId: claudeConsoleAccountId || '',
geminiAccountId: geminiAccountId || '',
openaiAccountId: openaiAccountId || '',
azureOpenaiAccountId: azureOpenaiAccountId || '',
bedrockAccountId: bedrockAccountId || '', // 添加 Bedrock 账号ID
permissions: permissions || 'all',
enableModelRestriction: String(enableModelRestriction),
@@ -88,6 +90,7 @@ class ApiKeyService {
claudeConsoleAccountId: keyData.claudeConsoleAccountId,
geminiAccountId: keyData.geminiAccountId,
openaiAccountId: keyData.openaiAccountId,
azureOpenaiAccountId: keyData.azureOpenaiAccountId,
bedrockAccountId: keyData.bedrockAccountId, // 添加 Bedrock 账号ID
permissions: keyData.permissions,
enableModelRestriction: keyData.enableModelRestriction === 'true',
@@ -190,6 +193,7 @@ class ApiKeyService {
claudeConsoleAccountId: keyData.claudeConsoleAccountId,
geminiAccountId: keyData.geminiAccountId,
openaiAccountId: keyData.openaiAccountId,
azureOpenaiAccountId: keyData.azureOpenaiAccountId,
bedrockAccountId: keyData.bedrockAccountId, // 添加 Bedrock 账号ID
permissions: keyData.permissions || 'all',
tokenLimit: parseInt(keyData.tokenLimit),
@@ -337,6 +341,7 @@ class ApiKeyService {
'claudeConsoleAccountId',
'geminiAccountId',
'openaiAccountId',
'azureOpenaiAccountId',
'bedrockAccountId', // 添加 Bedrock 账号ID
'permissions',
'expiresAt',

View File

@@ -0,0 +1,479 @@
const redisClient = require('../models/redis')
const { v4: uuidv4 } = require('uuid')
const crypto = require('crypto')
const config = require('../../config/config')
const logger = require('../utils/logger')
// 加密相关常量
const ALGORITHM = 'aes-256-cbc'
const IV_LENGTH = 16
// 🚀 安全的加密密钥生成支持动态salt
const ENCRYPTION_SALT = config.security?.azureOpenaiSalt || 'azure-openai-account-default-salt'
class EncryptionKeyManager {
constructor() {
this.keyCache = new Map()
this.keyRotationInterval = 24 * 60 * 60 * 1000 // 24小时
}
getKey(version = 'current') {
const cached = this.keyCache.get(version)
if (cached && Date.now() - cached.timestamp < this.keyRotationInterval) {
return cached.key
}
// 生成新密钥
const key = crypto.scryptSync(config.security.encryptionKey, ENCRYPTION_SALT, 32)
this.keyCache.set(version, {
key,
timestamp: Date.now()
})
logger.debug('🔑 Azure OpenAI encryption key generated/refreshed')
return key
}
// 清理过期密钥
cleanup() {
const now = Date.now()
for (const [version, cached] of this.keyCache.entries()) {
if (now - cached.timestamp > this.keyRotationInterval) {
this.keyCache.delete(version)
}
}
}
}
const encryptionKeyManager = new EncryptionKeyManager()
// 定期清理过期密钥
setInterval(
() => {
encryptionKeyManager.cleanup()
},
60 * 60 * 1000
) // 每小时清理一次
// 生成加密密钥 - 使用安全的密钥管理器
function generateEncryptionKey() {
return encryptionKeyManager.getKey()
}
// Azure OpenAI 账户键前缀
const AZURE_OPENAI_ACCOUNT_KEY_PREFIX = 'azure_openai:account:'
const SHARED_AZURE_OPENAI_ACCOUNTS_KEY = 'shared_azure_openai_accounts'
const ACCOUNT_SESSION_MAPPING_PREFIX = 'azure_openai_session_account_mapping:'
// 加密函数
function encrypt(text) {
if (!text) {
return ''
}
const key = generateEncryptionKey()
const iv = crypto.randomBytes(IV_LENGTH)
const cipher = crypto.createCipheriv(ALGORITHM, key, iv)
let encrypted = cipher.update(text)
encrypted = Buffer.concat([encrypted, cipher.final()])
return `${iv.toString('hex')}:${encrypted.toString('hex')}`
}
// 解密函数 - 移除缓存以提高安全性
function decrypt(text) {
if (!text) {
return ''
}
try {
const key = generateEncryptionKey()
// IV 是固定长度的 32 个十六进制字符16 字节)
const ivHex = text.substring(0, 32)
const encryptedHex = text.substring(33) // 跳过冒号
if (ivHex.length !== 32 || !encryptedHex) {
throw new Error('Invalid encrypted text format')
}
const iv = Buffer.from(ivHex, 'hex')
const encryptedText = Buffer.from(encryptedHex, 'hex')
const decipher = crypto.createDecipheriv(ALGORITHM, key, iv)
let decrypted = decipher.update(encryptedText)
decrypted = Buffer.concat([decrypted, decipher.final()])
const result = decrypted.toString()
return result
} catch (error) {
logger.error('Azure OpenAI decryption error:', error.message)
return ''
}
}
// 创建账户
async function createAccount(accountData) {
const accountId = uuidv4()
const now = new Date().toISOString()
const account = {
id: accountId,
name: accountData.name,
description: accountData.description || '',
accountType: accountData.accountType || 'shared',
groupId: accountData.groupId || null,
priority: accountData.priority || 50,
// Azure OpenAI 特有字段
azureEndpoint: accountData.azureEndpoint || '',
apiVersion: accountData.apiVersion || '2024-02-01', // 使用稳定版本
deploymentName: accountData.deploymentName || 'gpt-4', // 使用默认部署名称
apiKey: encrypt(accountData.apiKey || ''),
// 支持的模型
supportedModels: JSON.stringify(
accountData.supportedModels || ['gpt-4', 'gpt-4-turbo', 'gpt-35-turbo', 'gpt-35-turbo-16k']
),
// 状态字段
isActive: accountData.isActive !== false ? 'true' : 'false',
status: 'active',
schedulable: accountData.schedulable !== false ? 'true' : 'false',
createdAt: now,
updatedAt: now
}
// 代理配置
if (accountData.proxy) {
account.proxy =
typeof accountData.proxy === 'string' ? accountData.proxy : JSON.stringify(accountData.proxy)
}
const client = redisClient.getClientSafe()
await client.hset(`${AZURE_OPENAI_ACCOUNT_KEY_PREFIX}${accountId}`, account)
// 如果是共享账户,添加到共享账户集合
if (account.accountType === 'shared') {
await client.sadd(SHARED_AZURE_OPENAI_ACCOUNTS_KEY, accountId)
}
logger.info(`Created Azure OpenAI account: ${accountId}`)
return account
}
// 获取账户
async function getAccount(accountId) {
const client = redisClient.getClientSafe()
const accountData = await client.hgetall(`${AZURE_OPENAI_ACCOUNT_KEY_PREFIX}${accountId}`)
if (!accountData || Object.keys(accountData).length === 0) {
return null
}
// 解密敏感数据(仅用于内部处理,不返回给前端)
if (accountData.apiKey) {
accountData.apiKey = decrypt(accountData.apiKey)
}
// 解析代理配置
if (accountData.proxy && typeof accountData.proxy === 'string') {
try {
accountData.proxy = JSON.parse(accountData.proxy)
} catch (e) {
accountData.proxy = null
}
}
// 解析支持的模型
if (accountData.supportedModels && typeof accountData.supportedModels === 'string') {
try {
accountData.supportedModels = JSON.parse(accountData.supportedModels)
} catch (e) {
accountData.supportedModels = ['gpt-4', 'gpt-35-turbo']
}
}
return accountData
}
// 更新账户
async function updateAccount(accountId, updates) {
const existingAccount = await getAccount(accountId)
if (!existingAccount) {
throw new Error('Account not found')
}
updates.updatedAt = new Date().toISOString()
// 加密敏感数据
if (updates.apiKey) {
updates.apiKey = encrypt(updates.apiKey)
}
// 处理代理配置
if (updates.proxy) {
updates.proxy =
typeof updates.proxy === 'string' ? updates.proxy : JSON.stringify(updates.proxy)
}
// 处理支持的模型
if (updates.supportedModels) {
updates.supportedModels =
typeof updates.supportedModels === 'string'
? updates.supportedModels
: JSON.stringify(updates.supportedModels)
}
// 更新账户类型时处理共享账户集合
const client = redisClient.getClientSafe()
if (updates.accountType && updates.accountType !== existingAccount.accountType) {
if (updates.accountType === 'shared') {
await client.sadd(SHARED_AZURE_OPENAI_ACCOUNTS_KEY, accountId)
} else {
await client.srem(SHARED_AZURE_OPENAI_ACCOUNTS_KEY, accountId)
}
}
await client.hset(`${AZURE_OPENAI_ACCOUNT_KEY_PREFIX}${accountId}`, updates)
logger.info(`Updated Azure OpenAI account: ${accountId}`)
// 合并更新后的账户数据
const updatedAccount = { ...existingAccount, ...updates }
// 返回时解析代理配置
if (updatedAccount.proxy && typeof updatedAccount.proxy === 'string') {
try {
updatedAccount.proxy = JSON.parse(updatedAccount.proxy)
} catch (e) {
updatedAccount.proxy = null
}
}
return updatedAccount
}
// 删除账户
async function deleteAccount(accountId) {
const client = redisClient.getClientSafe()
const accountKey = `${AZURE_OPENAI_ACCOUNT_KEY_PREFIX}${accountId}`
// 从Redis中删除账户数据
await client.del(accountKey)
// 从共享账户集合中移除
await client.srem(SHARED_AZURE_OPENAI_ACCOUNTS_KEY, accountId)
logger.info(`Deleted Azure OpenAI account: ${accountId}`)
return true
}
// 获取所有账户
async function getAllAccounts() {
const client = redisClient.getClientSafe()
const keys = await client.keys(`${AZURE_OPENAI_ACCOUNT_KEY_PREFIX}*`)
if (!keys || keys.length === 0) {
return []
}
const accounts = []
for (const key of keys) {
const accountData = await client.hgetall(key)
if (accountData && Object.keys(accountData).length > 0) {
// 不返回敏感数据给前端
delete accountData.apiKey
// 解析代理配置
if (accountData.proxy && typeof accountData.proxy === 'string') {
try {
accountData.proxy = JSON.parse(accountData.proxy)
} catch (e) {
accountData.proxy = null
}
}
// 解析支持的模型
if (accountData.supportedModels && typeof accountData.supportedModels === 'string') {
try {
accountData.supportedModels = JSON.parse(accountData.supportedModels)
} catch (e) {
accountData.supportedModels = ['gpt-4', 'gpt-35-turbo']
}
}
accounts.push(accountData)
}
}
return accounts
}
// 获取共享账户
async function getSharedAccounts() {
const client = redisClient.getClientSafe()
const accountIds = await client.smembers(SHARED_AZURE_OPENAI_ACCOUNTS_KEY)
if (!accountIds || accountIds.length === 0) {
return []
}
const accounts = []
for (const accountId of accountIds) {
const account = await getAccount(accountId)
if (account && account.isActive === 'true') {
accounts.push(account)
}
}
return accounts
}
// 选择可用账户
async function selectAvailableAccount(sessionId = null) {
// 如果有会话ID尝试获取之前分配的账户
if (sessionId) {
const client = redisClient.getClientSafe()
const mappingKey = `${ACCOUNT_SESSION_MAPPING_PREFIX}${sessionId}`
const accountId = await client.get(mappingKey)
if (accountId) {
const account = await getAccount(accountId)
if (account && account.isActive === 'true' && account.schedulable === 'true') {
logger.debug(`Reusing Azure OpenAI account ${accountId} for session ${sessionId}`)
return account
}
}
}
// 获取所有共享账户
const sharedAccounts = await getSharedAccounts()
// 过滤出可用的账户
const availableAccounts = sharedAccounts.filter(
(acc) => acc.isActive === 'true' && acc.schedulable === 'true'
)
if (availableAccounts.length === 0) {
throw new Error('No available Azure OpenAI accounts')
}
// 按优先级排序并选择
availableAccounts.sort((a, b) => (b.priority || 50) - (a.priority || 50))
const selectedAccount = availableAccounts[0]
// 如果有会话ID保存映射关系
if (sessionId && selectedAccount) {
const client = redisClient.getClientSafe()
const mappingKey = `${ACCOUNT_SESSION_MAPPING_PREFIX}${sessionId}`
await client.setex(mappingKey, 3600, selectedAccount.id) // 1小时过期
}
logger.debug(`Selected Azure OpenAI account: ${selectedAccount.id}`)
return selectedAccount
}
// 更新账户使用量
async function updateAccountUsage(accountId, tokens) {
const client = redisClient.getClientSafe()
const now = new Date().toISOString()
// 使用 HINCRBY 原子操作更新使用量
await client.hincrby(`${AZURE_OPENAI_ACCOUNT_KEY_PREFIX}${accountId}`, 'totalTokensUsed', tokens)
await client.hset(`${AZURE_OPENAI_ACCOUNT_KEY_PREFIX}${accountId}`, 'lastUsedAt', now)
logger.debug(`Updated Azure OpenAI account ${accountId} usage: ${tokens} tokens`)
}
// 健康检查单个账户
async function healthCheckAccount(accountId) {
try {
const account = await getAccount(accountId)
if (!account) {
return { id: accountId, status: 'error', message: 'Account not found' }
}
// 简单检查配置是否完整
if (!account.azureEndpoint || !account.apiKey || !account.deploymentName) {
return {
id: accountId,
status: 'error',
message: 'Incomplete configuration'
}
}
// 可以在这里添加实际的API调用测试
// 暂时返回成功状态
return {
id: accountId,
status: 'healthy',
message: 'Account is configured correctly'
}
} catch (error) {
logger.error(`Health check failed for Azure OpenAI account ${accountId}:`, error)
return {
id: accountId,
status: 'error',
message: error.message
}
}
}
// 批量健康检查
async function performHealthChecks() {
const accounts = await getAllAccounts()
const results = []
for (const account of accounts) {
const result = await healthCheckAccount(account.id)
results.push(result)
}
return results
}
// 切换账户的可调度状态
async function toggleSchedulable(accountId) {
const account = await getAccount(accountId)
if (!account) {
throw new Error('Account not found')
}
const newSchedulable = account.schedulable === 'true' ? 'false' : 'true'
await updateAccount(accountId, { schedulable: newSchedulable })
return {
id: accountId,
schedulable: newSchedulable === 'true'
}
}
// 迁移 API Keys 以支持 Azure OpenAI
async function migrateApiKeysForAzureSupport() {
const client = redisClient.getClientSafe()
const apiKeyIds = await client.smembers('api_keys')
let migratedCount = 0
for (const keyId of apiKeyIds) {
const keyData = await client.hgetall(`api_key:${keyId}`)
if (keyData && !keyData.azureOpenaiAccountId) {
// 添加 Azure OpenAI 账户ID字段初始为空
await client.hset(`api_key:${keyId}`, 'azureOpenaiAccountId', '')
migratedCount++
}
}
logger.info(`Migrated ${migratedCount} API keys for Azure OpenAI support`)
return migratedCount
}
module.exports = {
createAccount,
getAccount,
updateAccount,
deleteAccount,
getAllAccounts,
getSharedAccounts,
selectAvailableAccount,
updateAccountUsage,
healthCheckAccount,
performHealthChecks,
toggleSchedulable,
migrateApiKeysForAzureSupport,
encrypt,
decrypt
}

View File

@@ -0,0 +1,529 @@
const axios = require('axios')
const ProxyHelper = require('../utils/proxyHelper')
const logger = require('../utils/logger')
// 转换模型名称(去掉 azure/ 前缀)
function normalizeModelName(model) {
if (model && model.startsWith('azure/')) {
return model.replace('azure/', '')
}
return model
}
// 处理 Azure OpenAI 请求
async function handleAzureOpenAIRequest({
account,
requestBody,
headers: _headers = {}, // 前缀下划线表示未使用
isStream = false,
endpoint = 'chat/completions'
}) {
// 声明变量在函数顶部,确保在 catch 块中也能访问
let requestUrl = ''
let proxyAgent = null
let deploymentName = ''
try {
// 构建 Azure OpenAI 请求 URL
const baseUrl = account.azureEndpoint
deploymentName = account.deploymentName || 'default'
// Azure Responses API requires preview versions; fall back appropriately
const apiVersion =
account.apiVersion || (endpoint === 'responses' ? '2024-10-01-preview' : '2024-02-01')
if (endpoint === 'chat/completions') {
requestUrl = `${baseUrl}/openai/deployments/${deploymentName}/chat/completions?api-version=${apiVersion}`
} else if (endpoint === 'responses') {
requestUrl = `${baseUrl}/openai/responses?api-version=${apiVersion}`
} else {
requestUrl = `${baseUrl}/openai/deployments/${deploymentName}/${endpoint}?api-version=${apiVersion}`
}
// 准备请求头
const requestHeaders = {
'Content-Type': 'application/json',
'api-key': account.apiKey
}
// 移除不需要的头部
delete requestHeaders['anthropic-version']
delete requestHeaders['x-api-key']
delete requestHeaders['host']
// 处理请求体
const processedBody = { ...requestBody }
// 标准化模型名称
if (processedBody.model) {
processedBody.model = normalizeModelName(processedBody.model)
} else {
processedBody.model = 'gpt-4'
}
// 使用统一的代理创建工具
proxyAgent = ProxyHelper.createProxyAgent(account.proxy)
// 配置请求选项
const axiosConfig = {
method: 'POST',
url: requestUrl,
headers: requestHeaders,
data: processedBody,
timeout: 600000, // 10 minutes for Azure OpenAI
validateStatus: () => true,
// 添加连接保活选项
keepAlive: true,
maxRedirects: 5,
// 防止socket hang up
socketKeepAlive: true
}
// 如果有代理,添加代理配置
if (proxyAgent) {
axiosConfig.httpsAgent = proxyAgent
// 为代理添加额外的keep-alive设置
if (proxyAgent.options) {
proxyAgent.options.keepAlive = true
proxyAgent.options.keepAliveMsecs = 1000
}
logger.debug(
`Using proxy for Azure OpenAI request: ${ProxyHelper.getProxyDescription(account.proxy)}`
)
}
// 流式请求特殊处理
if (isStream) {
axiosConfig.responseType = 'stream'
requestHeaders.accept = 'text/event-stream'
} else {
requestHeaders.accept = 'application/json'
}
logger.debug(`Making Azure OpenAI request`, {
requestUrl,
method: 'POST',
endpoint,
deploymentName,
apiVersion,
hasProxy: !!proxyAgent,
proxyInfo: ProxyHelper.maskProxyInfo(account.proxy),
isStream,
requestBodySize: JSON.stringify(processedBody).length
})
logger.debug('Azure OpenAI request headers', {
'content-type': requestHeaders['Content-Type'],
'user-agent': requestHeaders['user-agent'] || 'not-set',
customHeaders: Object.keys(requestHeaders).filter(
(key) => !['Content-Type', 'user-agent'].includes(key)
)
})
logger.debug('Azure OpenAI request body', {
model: processedBody.model,
messages: processedBody.messages?.length || 0,
otherParams: Object.keys(processedBody).filter((key) => !['model', 'messages'].includes(key))
})
const requestStartTime = Date.now()
logger.debug(`🔄 Starting Azure OpenAI HTTP request at ${new Date().toISOString()}`)
// 发送请求
const response = await axios(axiosConfig)
const requestDuration = Date.now() - requestStartTime
logger.debug(`✅ Azure OpenAI HTTP request completed at ${new Date().toISOString()}`)
logger.debug(`Azure OpenAI response received`, {
status: response.status,
statusText: response.statusText,
duration: `${requestDuration}ms`,
responseHeaders: Object.keys(response.headers || {}),
hasData: !!response.data,
contentType: response.headers?.['content-type'] || 'unknown'
})
return response
} catch (error) {
const errorDetails = {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
responseData: error.response?.data,
requestUrl: requestUrl || 'unknown',
endpoint,
deploymentName: deploymentName || account?.deploymentName || 'unknown',
hasProxy: !!proxyAgent,
proxyType: account?.proxy?.type || 'none',
isTimeout: error.code === 'ECONNABORTED',
isNetworkError: !error.response,
stack: error.stack
}
// 特殊错误类型的详细日志
if (error.code === 'ENOTFOUND') {
logger.error('DNS Resolution Failed for Azure OpenAI', {
...errorDetails,
hostname: requestUrl && requestUrl !== 'unknown' ? new URL(requestUrl).hostname : 'unknown',
suggestion: 'Check if Azure endpoint URL is correct and accessible'
})
} else if (error.code === 'ECONNREFUSED') {
logger.error('Connection Refused by Azure OpenAI', {
...errorDetails,
suggestion: 'Check if proxy settings are correct or Azure service is accessible'
})
} else if (error.code === 'ECONNRESET' || error.message.includes('socket hang up')) {
logger.error('🚨 Azure OpenAI Connection Reset / Socket Hang Up', {
...errorDetails,
suggestion:
'Connection was dropped by Azure OpenAI or proxy. This might be due to long request processing time, proxy timeout, or network instability. Try reducing request complexity or check proxy settings.'
})
} else if (error.code === 'ECONNABORTED' || error.code === 'ETIMEDOUT') {
logger.error('🚨 Azure OpenAI Request Timeout', {
...errorDetails,
timeoutMs: 600000,
suggestion:
'Request exceeded 10-minute timeout. Consider reducing model complexity or check if Azure service is responding slowly.'
})
} else if (
error.code === 'CERT_AUTHORITY_INVALID' ||
error.code === 'UNABLE_TO_VERIFY_LEAF_SIGNATURE'
) {
logger.error('SSL Certificate Error for Azure OpenAI', {
...errorDetails,
suggestion: 'SSL certificate validation failed - check proxy SSL settings'
})
} else if (error.response?.status === 401) {
logger.error('Azure OpenAI Authentication Failed', {
...errorDetails,
suggestion: 'Check if Azure OpenAI API key is valid and not expired'
})
} else if (error.response?.status === 404) {
logger.error('Azure OpenAI Deployment Not Found', {
...errorDetails,
suggestion: 'Check if deployment name and Azure endpoint are correct'
})
} else {
logger.error('Azure OpenAI Request Failed', errorDetails)
}
throw error
}
}
// 安全的流管理器
class StreamManager {
constructor() {
this.activeStreams = new Set()
this.cleanupCallbacks = new Map()
}
registerStream(streamId, cleanup) {
this.activeStreams.add(streamId)
this.cleanupCallbacks.set(streamId, cleanup)
}
cleanup(streamId) {
if (this.activeStreams.has(streamId)) {
try {
const cleanup = this.cleanupCallbacks.get(streamId)
if (cleanup) {
cleanup()
}
} catch (error) {
logger.warn(`Stream cleanup error for ${streamId}:`, error.message)
} finally {
this.activeStreams.delete(streamId)
this.cleanupCallbacks.delete(streamId)
}
}
}
getActiveStreamCount() {
return this.activeStreams.size
}
}
const streamManager = new StreamManager()
// SSE 缓冲区大小限制
const MAX_BUFFER_SIZE = 64 * 1024 // 64KB
const MAX_EVENT_SIZE = 16 * 1024 // 16KB 单个事件最大大小
// 处理流式响应
function handleStreamResponse(upstreamResponse, clientResponse, options = {}) {
const { onData, onEnd, onError } = options
const streamId = `stream_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
logger.info(`Starting Azure OpenAI stream handling`, {
streamId,
upstreamStatus: upstreamResponse.status,
upstreamHeaders: Object.keys(upstreamResponse.headers || {}),
clientRemoteAddress: clientResponse.req?.connection?.remoteAddress,
hasOnData: !!onData,
hasOnEnd: !!onEnd,
hasOnError: !!onError
})
return new Promise((resolve, reject) => {
let buffer = ''
let usageData = null
let actualModel = null
let hasEnded = false
let eventCount = 0
const maxEvents = 10000 // 最大事件数量限制
// 设置响应头
clientResponse.setHeader('Content-Type', 'text/event-stream')
clientResponse.setHeader('Cache-Control', 'no-cache')
clientResponse.setHeader('Connection', 'keep-alive')
clientResponse.setHeader('X-Accel-Buffering', 'no')
// 透传某些头部
const passThroughHeaders = [
'x-request-id',
'x-ratelimit-remaining-requests',
'x-ratelimit-remaining-tokens'
]
passThroughHeaders.forEach((header) => {
const value = upstreamResponse.headers[header]
if (value) {
clientResponse.setHeader(header, value)
}
})
// 立即刷新响应头
if (typeof clientResponse.flushHeaders === 'function') {
clientResponse.flushHeaders()
}
// 解析 SSE 事件以捕获 usage 数据
const parseSSEForUsage = (data) => {
const lines = data.split('\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const jsonStr = line.slice(6) // 移除 'data: ' 前缀
if (jsonStr.trim() === '[DONE]') {
continue
}
const eventData = JSON.parse(jsonStr)
// 获取模型信息
if (eventData.model) {
actualModel = eventData.model
}
// 获取使用统计Responses API: response.completed -> response.usage
if (eventData.type === 'response.completed' && eventData.response) {
if (eventData.response.model) {
actualModel = eventData.response.model
}
if (eventData.response.usage) {
usageData = eventData.response.usage
logger.debug('Captured Azure OpenAI nested usage (response.usage):', usageData)
}
}
// 兼容 Chat Completions 风格(顶层 usage
if (!usageData && eventData.usage) {
usageData = eventData.usage
logger.debug('Captured Azure OpenAI usage (top-level):', usageData)
}
// 检查是否是完成事件
if (eventData.choices && eventData.choices[0] && eventData.choices[0].finish_reason) {
// 这是最后一个 chunk
}
} catch (e) {
// 忽略解析错误
}
}
}
}
// 注册流清理
const cleanup = () => {
if (!hasEnded) {
hasEnded = true
try {
upstreamResponse.data?.removeAllListeners?.()
upstreamResponse.data?.destroy?.()
if (!clientResponse.headersSent) {
clientResponse.status(502).end()
} else if (!clientResponse.destroyed) {
clientResponse.end()
}
} catch (error) {
logger.warn('Stream cleanup error:', error.message)
}
}
}
streamManager.registerStream(streamId, cleanup)
upstreamResponse.data.on('data', (chunk) => {
try {
if (hasEnded || clientResponse.destroyed) {
return
}
eventCount++
if (eventCount > maxEvents) {
logger.warn(`Stream ${streamId} exceeded max events limit`)
cleanup()
return
}
const chunkStr = chunk.toString()
// 转发数据给客户端
if (!clientResponse.destroyed) {
clientResponse.write(chunk)
}
// 同时解析数据以捕获 usage 信息,带缓冲区大小限制
buffer += chunkStr
// 防止缓冲区过大
if (buffer.length > MAX_BUFFER_SIZE) {
logger.warn(`Stream ${streamId} buffer exceeded limit, truncating`)
buffer = buffer.slice(-MAX_BUFFER_SIZE / 2) // 保留后一半
}
// 处理完整的 SSE 事件
if (buffer.includes('\n\n')) {
const events = buffer.split('\n\n')
buffer = events.pop() || '' // 保留最后一个可能不完整的事件
for (const event of events) {
if (event.trim() && event.length <= MAX_EVENT_SIZE) {
parseSSEForUsage(event)
}
}
}
if (onData) {
onData(chunk, { usageData, actualModel })
}
} catch (error) {
logger.error('Error processing Azure OpenAI stream chunk:', error)
if (!hasEnded) {
cleanup()
reject(error)
}
}
})
upstreamResponse.data.on('end', () => {
if (hasEnded) {
return
}
streamManager.cleanup(streamId)
hasEnded = true
try {
// 处理剩余的 buffer
if (buffer.trim() && buffer.length <= MAX_EVENT_SIZE) {
parseSSEForUsage(buffer)
}
if (onEnd) {
onEnd({ usageData, actualModel })
}
if (!clientResponse.destroyed) {
clientResponse.end()
}
resolve({ usageData, actualModel })
} catch (error) {
logger.error('Stream end handling error:', error)
reject(error)
}
})
upstreamResponse.data.on('error', (error) => {
if (hasEnded) {
return
}
streamManager.cleanup(streamId)
hasEnded = true
logger.error('Upstream stream error:', error)
try {
if (onError) {
onError(error)
}
if (!clientResponse.headersSent) {
clientResponse.status(502).json({ error: { message: 'Upstream stream error' } })
} else if (!clientResponse.destroyed) {
clientResponse.end()
}
} catch (cleanupError) {
logger.warn('Error during stream error cleanup:', cleanupError.message)
}
reject(error)
})
// 客户端断开时清理
const clientCleanup = () => {
streamManager.cleanup(streamId)
}
clientResponse.on('close', clientCleanup)
clientResponse.on('aborted', clientCleanup)
clientResponse.on('error', clientCleanup)
})
}
// 处理非流式响应
function handleNonStreamResponse(upstreamResponse, clientResponse) {
try {
// 设置状态码
clientResponse.status(upstreamResponse.status)
// 设置响应头
clientResponse.setHeader('Content-Type', 'application/json')
// 透传某些头部
const passThroughHeaders = [
'x-request-id',
'x-ratelimit-remaining-requests',
'x-ratelimit-remaining-tokens'
]
passThroughHeaders.forEach((header) => {
const value = upstreamResponse.headers[header]
if (value) {
clientResponse.setHeader(header, value)
}
})
// 返回响应数据
const responseData = upstreamResponse.data
clientResponse.json(responseData)
// 提取 usage 数据
const usageData = responseData.usage
const actualModel = responseData.model
return { usageData, actualModel, responseData }
} catch (error) {
logger.error('Error handling Azure OpenAI non-stream response:', error)
throw error
}
}
module.exports = {
handleAzureOpenAIRequest,
handleStreamResponse,
handleNonStreamResponse,
normalizeModelName
}

View File

@@ -1,7 +1,6 @@
const { v4: uuidv4 } = require('uuid')
const crypto = require('crypto')
const { SocksProxyAgent } = require('socks-proxy-agent')
const { HttpsProxyAgent } = require('https-proxy-agent')
const ProxyHelper = require('../utils/proxyHelper')
const axios = require('axios')
const redis = require('../models/redis')
const logger = require('../utils/logger')
@@ -55,6 +54,7 @@ class ClaudeAccountService {
proxy = null, // { type: 'socks5', host: 'localhost', port: 1080, username: '', password: '' }
isActive = true,
accountType = 'shared', // 'dedicated' or 'shared'
platform = 'claude',
priority = 50, // 调度优先级 (1-100数字越小优先级越高)
schedulable = true, // 是否可被调度
subscriptionInfo = null // 手动设置的订阅信息
@@ -79,7 +79,8 @@ class ClaudeAccountService {
scopes: claudeAiOauth.scopes.join(' '),
proxy: proxy ? JSON.stringify(proxy) : '',
isActive: isActive.toString(),
accountType, // 账号类型:'dedicated' 或 'shared'
accountType, // 账号类型:'dedicated' 或 'shared' 或 'group'
platform,
priority: priority.toString(), // 调度优先级
createdAt: new Date().toISOString(),
lastUsedAt: '',
@@ -108,7 +109,8 @@ class ClaudeAccountService {
scopes: '',
proxy: proxy ? JSON.stringify(proxy) : '',
isActive: isActive.toString(),
accountType, // 账号类型:'dedicated' 或 'shared'
accountType, // 账号类型:'dedicated' 或 'shared' 或 'group'
platform,
priority: priority.toString(), // 调度优先级
createdAt: new Date().toISOString(),
lastUsedAt: '',
@@ -151,6 +153,7 @@ class ClaudeAccountService {
isActive,
proxy,
accountType,
platform,
priority,
status: accountData.status,
createdAt: accountData.createdAt,
@@ -444,7 +447,7 @@ class ClaudeAccountService {
errorMessage: account.errorMessage,
accountType: account.accountType || 'shared', // 兼容旧数据,默认为共享
priority: parseInt(account.priority) || 50, // 兼容旧数据默认优先级50
platform: 'claude-oauth', // 添加平台标识,用于前端区分
platform: account.platform || 'claude', // 添加平台标识,用于前端区分
createdAt: account.createdAt,
lastUsedAt: account.lastUsedAt,
lastRefreshAt: account.lastRefreshAt,
@@ -857,29 +860,19 @@ class ClaudeAccountService {
}
}
// 🌐 创建代理agent
// 🌐 创建代理agent(使用统一的代理工具)
_createProxyAgent(proxyConfig) {
if (!proxyConfig) {
return null
const proxyAgent = ProxyHelper.createProxyAgent(proxyConfig)
if (proxyAgent) {
logger.info(
`🌐 Using proxy for Claude request: ${ProxyHelper.getProxyDescription(proxyConfig)}`
)
} else if (proxyConfig) {
logger.debug('🌐 Failed to create proxy agent for Claude')
} else {
logger.debug('🌐 No proxy configured for Claude request')
}
try {
const proxy = JSON.parse(proxyConfig)
if (proxy.type === 'socks5') {
const auth = proxy.username && proxy.password ? `${proxy.username}:${proxy.password}@` : ''
const socksUrl = `socks5://${auth}${proxy.host}:${proxy.port}`
return new SocksProxyAgent(socksUrl)
} else if (proxy.type === 'http' || proxy.type === 'https') {
const auth = proxy.username && proxy.password ? `${proxy.username}:${proxy.password}@` : ''
const httpUrl = `${proxy.type}://${auth}${proxy.host}:${proxy.port}`
return new HttpsProxyAgent(httpUrl)
}
} catch (error) {
logger.warn('⚠️ Invalid proxy configuration:', error)
}
return null
return proxyAgent
}
// 🔐 加密敏感数据
@@ -1094,6 +1087,22 @@ class ClaudeAccountService {
logger.info(`🗑️ Deleted sticky session mapping for rate limited account: ${accountId}`)
}
// 发送Webhook通知
try {
const webhookNotifier = require('../utils/webhookNotifier')
await webhookNotifier.sendAccountAnomalyNotification({
accountId,
accountName: accountData.name || 'Claude Account',
platform: 'claude-oauth',
status: 'error',
errorCode: 'CLAUDE_OAUTH_RATE_LIMITED',
reason: `Account rate limited (429 error). ${rateLimitResetTimestamp ? `Reset at: ${new Date(rateLimitResetTimestamp * 1000).toISOString()}` : 'Estimated reset in 1-5 hours'}`,
timestamp: new Date().toISOString()
})
} catch (webhookError) {
logger.error('Failed to send rate limit webhook notification:', webhookError)
}
return { success: true }
} catch (error) {
logger.error(`❌ Failed to mark account as rate limited: ${accountId}`, error)

View File

@@ -1,7 +1,6 @@
const { v4: uuidv4 } = require('uuid')
const crypto = require('crypto')
const { SocksProxyAgent } = require('socks-proxy-agent')
const { HttpsProxyAgent } = require('https-proxy-agent')
const ProxyHelper = require('../utils/proxyHelper')
const redis = require('../models/redis')
const logger = require('../utils/logger')
const config = require('../../config/config')
@@ -367,6 +366,22 @@ class ClaudeConsoleAccountService {
await client.hset(`${this.ACCOUNT_KEY_PREFIX}${accountId}`, updates)
// 发送Webhook通知
try {
const webhookNotifier = require('../utils/webhookNotifier')
await webhookNotifier.sendAccountAnomalyNotification({
accountId,
accountName: account.name || 'Claude Console Account',
platform: 'claude-console',
status: 'error',
errorCode: 'CLAUDE_CONSOLE_RATE_LIMITED',
reason: `Account rate limited (429 error). ${account.rateLimitDuration ? `Will be blocked for ${account.rateLimitDuration} hours` : 'Temporary rate limit'}`,
timestamp: new Date().toISOString()
})
} catch (webhookError) {
logger.error('Failed to send rate limit webhook notification:', webhookError)
}
logger.warn(
`🚫 Claude Console account marked as rate limited: ${account.name} (${accountId})`
)
@@ -480,29 +495,19 @@ class ClaudeConsoleAccountService {
}
}
// 🌐 创建代理agent
// 🌐 创建代理agent(使用统一的代理工具)
_createProxyAgent(proxyConfig) {
if (!proxyConfig) {
return null
const proxyAgent = ProxyHelper.createProxyAgent(proxyConfig)
if (proxyAgent) {
logger.info(
`🌐 Using proxy for Claude Console request: ${ProxyHelper.getProxyDescription(proxyConfig)}`
)
} else if (proxyConfig) {
logger.debug('🌐 Failed to create proxy agent for Claude Console')
} else {
logger.debug('🌐 No proxy configured for Claude Console request')
}
try {
const proxy = typeof proxyConfig === 'string' ? JSON.parse(proxyConfig) : proxyConfig
if (proxy.type === 'socks5') {
const auth = proxy.username && proxy.password ? `${proxy.username}:${proxy.password}@` : ''
const socksUrl = `socks5://${auth}${proxy.host}:${proxy.port}`
return new SocksProxyAgent(socksUrl)
} else if (proxy.type === 'http' || proxy.type === 'https') {
const auth = proxy.username && proxy.password ? `${proxy.username}:${proxy.password}@` : ''
const httpUrl = `${proxy.type}://${auth}${proxy.host}:${proxy.port}`
return new HttpsProxyAgent(httpUrl)
}
} catch (error) {
logger.warn('⚠️ Invalid proxy configuration:', error)
}
return null
return proxyAgent
}
// 🔐 加密敏感数据

View File

@@ -84,7 +84,16 @@ class ClaudeConsoleRelayService {
// 构建完整的API URL
const cleanUrl = account.apiUrl.replace(/\/$/, '') // 移除末尾斜杠
const apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages`
let apiEndpoint
if (options.customPath) {
// 如果指定了自定义路径(如 count_tokens使用它
const baseUrl = cleanUrl.replace(/\/v1\/messages$/, '') // 移除已有的 /v1/messages
apiEndpoint = `${baseUrl}${options.customPath}`
} else {
// 默认使用 messages 端点
apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages`
}
logger.debug(`🎯 Final API endpoint: ${apiEndpoint}`)
logger.debug(`[DEBUG] Options passed to relayRequest: ${JSON.stringify(options)}`)

View File

@@ -2,8 +2,7 @@ const https = require('https')
const zlib = require('zlib')
const fs = require('fs')
const path = require('path')
const { SocksProxyAgent } = require('socks-proxy-agent')
const { HttpsProxyAgent } = require('https-proxy-agent')
const ProxyHelper = require('../utils/proxyHelper')
const claudeAccountService = require('./claudeAccountService')
const unifiedClaudeScheduler = require('./unifiedClaudeScheduler')
const sessionHelper = require('../utils/sessionHelper')
@@ -496,32 +495,28 @@ class ClaudeRelayService {
}
}
// 🌐 获取代理Agent
// 🌐 获取代理Agent(使用统一的代理工具)
async _getProxyAgent(accountId) {
try {
const accountData = await claudeAccountService.getAllAccounts()
const account = accountData.find((acc) => acc.id === accountId)
if (!account || !account.proxy) {
logger.debug('🌐 No proxy configured for Claude account')
return null
}
const { proxy } = account
if (proxy.type === 'socks5') {
const auth = proxy.username && proxy.password ? `${proxy.username}:${proxy.password}@` : ''
const socksUrl = `socks5://${auth}${proxy.host}:${proxy.port}`
return new SocksProxyAgent(socksUrl)
} else if (proxy.type === 'http' || proxy.type === 'https') {
const auth = proxy.username && proxy.password ? `${proxy.username}:${proxy.password}@` : ''
const httpUrl = `${proxy.type}://${auth}${proxy.host}:${proxy.port}`
return new HttpsProxyAgent(httpUrl)
const proxyAgent = ProxyHelper.createProxyAgent(account.proxy)
if (proxyAgent) {
logger.info(
`🌐 Using proxy for Claude request: ${ProxyHelper.getProxyDescription(account.proxy)}`
)
}
return proxyAgent
} catch (error) {
logger.warn('⚠️ Failed to create proxy agent:', error)
return null
}
return null
}
// 🔧 过滤客户端请求头
@@ -596,10 +591,18 @@ class ClaudeRelayService {
}
return new Promise((resolve, reject) => {
// 支持自定义路径(如 count_tokens
let requestPath = url.pathname
if (requestOptions.customPath) {
const baseUrl = new URL('https://api.anthropic.com')
const customUrl = new URL(requestOptions.customPath, baseUrl)
requestPath = customUrl.pathname
}
const options = {
hostname: url.hostname,
port: url.port || 443,
path: url.pathname,
path: requestPath,
method: 'POST',
headers: {
'Content-Type': 'application/json',

View File

@@ -5,6 +5,7 @@ const config = require('../../config/config')
const logger = require('../utils/logger')
const { OAuth2Client } = require('google-auth-library')
const { maskToken } = require('../utils/tokenMask')
const ProxyHelper = require('../utils/proxyHelper')
const {
logRefreshStart,
logRefreshSuccess,
@@ -109,11 +110,32 @@ setInterval(
10 * 60 * 1000
)
// 创建 OAuth2 客户端
function createOAuth2Client(redirectUri = null) {
// 创建 OAuth2 客户端(支持代理配置)
function createOAuth2Client(redirectUri = null, proxyConfig = null) {
// 如果没有提供 redirectUri使用默认值
const uri = redirectUri || 'http://localhost:45462'
return new OAuth2Client(OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET, uri)
// 准备客户端选项
const clientOptions = {
clientId: OAUTH_CLIENT_ID,
clientSecret: OAUTH_CLIENT_SECRET,
redirectUri: uri
}
// 如果有代理配置,设置 transporterOptions
if (proxyConfig) {
const proxyAgent = ProxyHelper.createProxyAgent(proxyConfig)
if (proxyAgent) {
// 通过 transporterOptions 传递代理配置给底层的 Gaxios
clientOptions.transporterOptions = {
agent: proxyAgent,
httpsAgent: proxyAgent
}
logger.debug('Created OAuth2Client with proxy configuration')
}
}
return new OAuth2Client(clientOptions)
}
// 生成授权 URL (支持 PKCE)
@@ -196,11 +218,25 @@ async function pollAuthorizationStatus(sessionId, maxAttempts = 60, interval = 2
}
}
// 交换授权码获取 tokens (支持 PKCE)
async function exchangeCodeForTokens(code, redirectUri = null, codeVerifier = null) {
const oAuth2Client = createOAuth2Client(redirectUri)
// 交换授权码获取 tokens (支持 PKCE 和代理)
async function exchangeCodeForTokens(
code,
redirectUri = null,
codeVerifier = null,
proxyConfig = null
) {
try {
// 创建带代理配置的 OAuth2Client
const oAuth2Client = createOAuth2Client(redirectUri, proxyConfig)
if (proxyConfig) {
logger.info(
`🌐 Using proxy for Gemini token exchange: ${ProxyHelper.getProxyDescription(proxyConfig)}`
)
} else {
logger.debug('🌐 No proxy configured for Gemini token exchange')
}
const tokenParams = {
code,
redirect_uri: redirectUri
@@ -228,8 +264,9 @@ async function exchangeCodeForTokens(code, redirectUri = null, codeVerifier = nu
}
// 刷新访问令牌
async function refreshAccessToken(refreshToken) {
const oAuth2Client = createOAuth2Client()
async function refreshAccessToken(refreshToken, proxyConfig = null) {
// 创建带代理配置的 OAuth2Client
const oAuth2Client = createOAuth2Client(null, proxyConfig)
try {
// 设置 refresh_token
@@ -237,6 +274,14 @@ async function refreshAccessToken(refreshToken) {
refresh_token: refreshToken
})
if (proxyConfig) {
logger.info(
`🔄 Using proxy for Gemini token refresh: ${ProxyHelper.maskProxyInfo(proxyConfig)}`
)
} else {
logger.debug('🔄 No proxy configured for Gemini token refresh')
}
// 调用 refreshAccessToken 获取新的 tokens
const response = await oAuth2Client.refreshAccessToken()
const { credentials } = response
@@ -261,7 +306,9 @@ async function refreshAccessToken(refreshToken) {
logger.error('Error refreshing access token:', {
message: error.message,
code: error.code,
response: error.response?.data
response: error.response?.data,
hasProxy: !!proxyConfig,
proxy: proxyConfig ? ProxyHelper.maskProxyInfo(proxyConfig) : 'No proxy'
})
throw new Error(`Failed to refresh access token: ${error.message}`)
}
@@ -786,7 +833,8 @@ async function refreshAccountToken(accountId) {
logger.info(`🔄 Starting token refresh for Gemini account: ${account.name} (${accountId})`)
// account.refreshToken 已经是解密后的值(从 getAccount 返回)
const newTokens = await refreshAccessToken(account.refreshToken)
// 传入账户的代理配置
const newTokens = await refreshAccessToken(account.refreshToken, account.proxy)
// 更新账户信息
const updates = {
@@ -1169,7 +1217,8 @@ async function generateContent(
requestData,
userPromptId,
projectId = null,
sessionId = null
sessionId = null,
proxyConfig = null
) {
const axios = require('axios')
const CODE_ASSIST_ENDPOINT = 'https://cloudcode-pa.googleapis.com'
@@ -1206,6 +1255,17 @@ async function generateContent(
timeout: 60000 // 生成内容可能需要更长时间
}
// 添加代理配置
const proxyAgent = ProxyHelper.createProxyAgent(proxyConfig)
if (proxyAgent) {
axiosConfig.httpsAgent = proxyAgent
logger.info(
`🌐 Using proxy for Gemini generateContent: ${ProxyHelper.getProxyDescription(proxyConfig)}`
)
} else {
logger.debug('🌐 No proxy configured for Gemini generateContent')
}
const response = await axios(axiosConfig)
logger.info('✅ generateContent API调用成功')
@@ -1219,7 +1279,8 @@ async function generateContentStream(
userPromptId,
projectId = null,
sessionId = null,
signal = null
signal = null,
proxyConfig = null
) {
const axios = require('axios')
const CODE_ASSIST_ENDPOINT = 'https://cloudcode-pa.googleapis.com'
@@ -1260,6 +1321,17 @@ async function generateContentStream(
timeout: 60000
}
// 添加代理配置
const proxyAgent = ProxyHelper.createProxyAgent(proxyConfig)
if (proxyAgent) {
axiosConfig.httpsAgent = proxyAgent
logger.info(
`🌐 Using proxy for Gemini streamGenerateContent: ${ProxyHelper.getProxyDescription(proxyConfig)}`
)
} else {
logger.debug('🌐 No proxy configured for Gemini streamGenerateContent')
}
// 如果提供了中止信号,添加到配置中
if (signal) {
axiosConfig.signal = signal

View File

@@ -1,6 +1,5 @@
const axios = require('axios')
const { HttpsProxyAgent } = require('https-proxy-agent')
const { SocksProxyAgent } = require('socks-proxy-agent')
const ProxyHelper = require('../utils/proxyHelper')
const logger = require('../utils/logger')
const config = require('../../config/config')
const apiKeyService = require('./apiKeyService')
@@ -9,34 +8,9 @@ const apiKeyService = require('./apiKeyService')
const GEMINI_API_BASE = 'https://cloudcode.googleapis.com/v1'
const DEFAULT_MODEL = 'models/gemini-2.0-flash-exp'
// 创建代理 agent
// 创建代理 agent(使用统一的代理工具)
function createProxyAgent(proxyConfig) {
if (!proxyConfig) {
return null
}
try {
const proxy = typeof proxyConfig === 'string' ? JSON.parse(proxyConfig) : proxyConfig
if (!proxy.type || !proxy.host || !proxy.port) {
return null
}
const proxyUrl =
proxy.username && proxy.password
? `${proxy.type}://${proxy.username}:${proxy.password}@${proxy.host}:${proxy.port}`
: `${proxy.type}://${proxy.host}:${proxy.port}`
if (proxy.type === 'socks5') {
return new SocksProxyAgent(proxyUrl)
} else if (proxy.type === 'http' || proxy.type === 'https') {
return new HttpsProxyAgent(proxyUrl)
}
} catch (error) {
logger.error('Error creating proxy agent:', error)
}
return null
return ProxyHelper.createProxyAgent(proxyConfig)
}
// 转换 OpenAI 消息格式到 Gemini 格式
@@ -306,7 +280,9 @@ async function sendGeminiRequest({
const proxyAgent = createProxyAgent(proxy)
if (proxyAgent) {
axiosConfig.httpsAgent = proxyAgent
logger.debug('Using proxy for Gemini request')
logger.info(`🌐 Using proxy for Gemini API request: ${ProxyHelper.getProxyDescription(proxy)}`)
} else {
logger.debug('🌐 No proxy configured for Gemini API request')
}
// 添加 AbortController 信号支持
@@ -412,6 +388,11 @@ async function getAvailableModels(accessToken, proxy, projectId, location = 'us-
const proxyAgent = createProxyAgent(proxy)
if (proxyAgent) {
axiosConfig.httpsAgent = proxyAgent
logger.info(
`🌐 Using proxy for Gemini models request: ${ProxyHelper.getProxyDescription(proxy)}`
)
} else {
logger.debug('🌐 No proxy configured for Gemini models request')
}
try {
@@ -508,7 +489,11 @@ async function countTokens({
const proxyAgent = createProxyAgent(proxy)
if (proxyAgent) {
axiosConfig.httpsAgent = proxyAgent
logger.debug('Using proxy for Gemini countTokens request')
logger.info(
`🌐 Using proxy for Gemini countTokens request: ${ProxyHelper.getProxyDescription(proxy)}`
)
} else {
logger.debug('🌐 No proxy configured for Gemini countTokens request')
}
try {

View File

@@ -2,8 +2,7 @@ const redisClient = require('../models/redis')
const { v4: uuidv4 } = require('uuid')
const crypto = require('crypto')
const axios = require('axios')
const { SocksProxyAgent } = require('socks-proxy-agent')
const { HttpsProxyAgent } = require('https-proxy-agent')
const ProxyHelper = require('../utils/proxyHelper')
const config = require('../../config/config')
const logger = require('../utils/logger')
// const { maskToken } = require('../utils/tokenMask')
@@ -133,18 +132,14 @@ async function refreshAccessToken(refreshToken, proxy = null) {
}
// 配置代理(如果有)
if (proxy && proxy.host && proxy.port) {
if (proxy.type === 'socks5') {
const proxyAuth =
proxy.username && proxy.password ? `${proxy.username}:${proxy.password}@` : ''
const socksProxy = `socks5://${proxyAuth}${proxy.host}:${proxy.port}`
requestOptions.httpsAgent = new SocksProxyAgent(socksProxy)
} else if (proxy.type === 'http' || proxy.type === 'https') {
const proxyAuth =
proxy.username && proxy.password ? `${proxy.username}:${proxy.password}@` : ''
const httpProxy = `http://${proxyAuth}${proxy.host}:${proxy.port}`
requestOptions.httpsAgent = new HttpsProxyAgent(httpProxy)
}
const proxyAgent = ProxyHelper.createProxyAgent(proxy)
if (proxyAgent) {
requestOptions.httpsAgent = proxyAgent
logger.info(
`🌐 Using proxy for OpenAI token refresh: ${ProxyHelper.getProxyDescription(proxy)}`
)
} else {
logger.debug('🌐 No proxy configured for OpenAI token refresh')
}
// 发送请求

View File

@@ -0,0 +1,272 @@
const redis = require('../models/redis')
const logger = require('../utils/logger')
const { v4: uuidv4 } = require('uuid')
class WebhookConfigService {
constructor() {
this.KEY_PREFIX = 'webhook_config'
this.DEFAULT_CONFIG_KEY = `${this.KEY_PREFIX}:default`
}
/**
* 获取webhook配置
*/
async getConfig() {
try {
const configStr = await redis.client.get(this.DEFAULT_CONFIG_KEY)
if (!configStr) {
// 返回默认配置
return this.getDefaultConfig()
}
return JSON.parse(configStr)
} catch (error) {
logger.error('获取webhook配置失败:', error)
return this.getDefaultConfig()
}
}
/**
* 保存webhook配置
*/
async saveConfig(config) {
try {
// 验证配置
this.validateConfig(config)
// 添加更新时间
config.updatedAt = new Date().toISOString()
await redis.client.set(this.DEFAULT_CONFIG_KEY, JSON.stringify(config))
logger.info('✅ Webhook配置已保存')
return config
} catch (error) {
logger.error('保存webhook配置失败:', error)
throw error
}
}
/**
* 验证配置
*/
validateConfig(config) {
if (!config || typeof config !== 'object') {
throw new Error('无效的配置格式')
}
// 验证平台配置
if (config.platforms) {
const validPlatforms = ['wechat_work', 'dingtalk', 'feishu', 'slack', 'discord', 'custom']
for (const platform of config.platforms) {
if (!validPlatforms.includes(platform.type)) {
throw new Error(`不支持的平台类型: ${platform.type}`)
}
if (!platform.url || !this.isValidUrl(platform.url)) {
throw new Error(`无效的webhook URL: ${platform.url}`)
}
// 验证平台特定的配置
this.validatePlatformConfig(platform)
}
}
}
/**
* 验证平台特定配置
*/
validatePlatformConfig(platform) {
switch (platform.type) {
case 'wechat_work':
// 企业微信不需要额外配置
break
case 'dingtalk':
// 钉钉可能需要secret用于签名
if (platform.enableSign && !platform.secret) {
throw new Error('钉钉启用签名时必须提供secret')
}
break
case 'feishu':
// 飞书可能需要签名
if (platform.enableSign && !platform.secret) {
throw new Error('飞书启用签名时必须提供secret')
}
break
case 'slack':
// Slack webhook URL通常包含token
if (!platform.url.includes('hooks.slack.com')) {
logger.warn('⚠️ Slack webhook URL格式可能不正确')
}
break
case 'discord':
// Discord webhook URL格式检查
if (!platform.url.includes('discord.com/api/webhooks')) {
logger.warn('⚠️ Discord webhook URL格式可能不正确')
}
break
case 'custom':
// 自定义webhook用户自行负责格式
break
}
}
/**
* 验证URL格式
*/
isValidUrl(url) {
try {
new URL(url)
return true
} catch {
return false
}
}
/**
* 获取默认配置
*/
getDefaultConfig() {
return {
enabled: false,
platforms: [],
notificationTypes: {
accountAnomaly: true, // 账号异常
quotaWarning: true, // 配额警告
systemError: true, // 系统错误
securityAlert: true, // 安全警报
test: true // 测试通知
},
retrySettings: {
maxRetries: 3,
retryDelay: 1000, // 毫秒
timeout: 10000 // 毫秒
},
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString()
}
}
/**
* 添加webhook平台
*/
async addPlatform(platform) {
try {
const config = await this.getConfig()
// 生成唯一ID
platform.id = platform.id || uuidv4()
platform.enabled = platform.enabled !== false
platform.createdAt = new Date().toISOString()
// 验证平台配置
this.validatePlatformConfig(platform)
// 添加到配置
config.platforms = config.platforms || []
config.platforms.push(platform)
await this.saveConfig(config)
return platform
} catch (error) {
logger.error('添加webhook平台失败:', error)
throw error
}
}
/**
* 更新webhook平台
*/
async updatePlatform(platformId, updates) {
try {
const config = await this.getConfig()
const index = config.platforms.findIndex((p) => p.id === platformId)
if (index === -1) {
throw new Error('找不到指定的webhook平台')
}
// 合并更新
config.platforms[index] = {
...config.platforms[index],
...updates,
updatedAt: new Date().toISOString()
}
// 验证更新后的配置
this.validatePlatformConfig(config.platforms[index])
await this.saveConfig(config)
return config.platforms[index]
} catch (error) {
logger.error('更新webhook平台失败:', error)
throw error
}
}
/**
* 删除webhook平台
*/
async deletePlatform(platformId) {
try {
const config = await this.getConfig()
config.platforms = config.platforms.filter((p) => p.id !== platformId)
await this.saveConfig(config)
logger.info(`✅ 已删除webhook平台: ${platformId}`)
return true
} catch (error) {
logger.error('删除webhook平台失败:', error)
throw error
}
}
/**
* 切换webhook平台启用状态
*/
async togglePlatform(platformId) {
try {
const config = await this.getConfig()
const platform = config.platforms.find((p) => p.id === platformId)
if (!platform) {
throw new Error('找不到指定的webhook平台')
}
platform.enabled = !platform.enabled
platform.updatedAt = new Date().toISOString()
await this.saveConfig(config)
logger.info(`✅ Webhook平台 ${platformId}${platform.enabled ? '启用' : '禁用'}`)
return platform
} catch (error) {
logger.error('切换webhook平台状态失败:', error)
throw error
}
}
/**
* 获取启用的平台列表
*/
async getEnabledPlatforms() {
try {
const config = await this.getConfig()
if (!config.enabled || !config.platforms) {
return []
}
return config.platforms.filter((p) => p.enabled)
} catch (error) {
logger.error('获取启用的webhook平台失败:', error)
return []
}
}
}
module.exports = new WebhookConfigService()

View File

@@ -0,0 +1,495 @@
const axios = require('axios')
const crypto = require('crypto')
const logger = require('../utils/logger')
const webhookConfigService = require('./webhookConfigService')
class WebhookService {
constructor() {
this.platformHandlers = {
wechat_work: this.sendToWechatWork.bind(this),
dingtalk: this.sendToDingTalk.bind(this),
feishu: this.sendToFeishu.bind(this),
slack: this.sendToSlack.bind(this),
discord: this.sendToDiscord.bind(this),
custom: this.sendToCustom.bind(this)
}
}
/**
* 发送通知到所有启用的平台
*/
async sendNotification(type, data) {
try {
const config = await webhookConfigService.getConfig()
// 检查是否启用webhook
if (!config.enabled) {
logger.debug('Webhook通知已禁用')
return
}
// 检查通知类型是否启用test类型始终允许发送
if (type !== 'test' && config.notificationTypes && !config.notificationTypes[type]) {
logger.debug(`通知类型 ${type} 已禁用`)
return
}
// 获取启用的平台
const enabledPlatforms = await webhookConfigService.getEnabledPlatforms()
if (enabledPlatforms.length === 0) {
logger.debug('没有启用的webhook平台')
return
}
logger.info(`📢 发送 ${type} 通知到 ${enabledPlatforms.length} 个平台`)
// 并发发送到所有平台
const promises = enabledPlatforms.map((platform) =>
this.sendToPlatform(platform, type, data, config.retrySettings)
)
const results = await Promise.allSettled(promises)
// 记录结果
const succeeded = results.filter((r) => r.status === 'fulfilled').length
const failed = results.filter((r) => r.status === 'rejected').length
if (failed > 0) {
logger.warn(`⚠️ Webhook通知: ${succeeded}成功, ${failed}失败`)
} else {
logger.info(`✅ 所有webhook通知发送成功`)
}
return { succeeded, failed }
} catch (error) {
logger.error('发送webhook通知失败:', error)
throw error
}
}
/**
* 发送到特定平台
*/
async sendToPlatform(platform, type, data, retrySettings) {
try {
const handler = this.platformHandlers[platform.type]
if (!handler) {
throw new Error(`不支持的平台类型: ${platform.type}`)
}
// 使用平台特定的处理器
await this.retryWithBackoff(
() => handler(platform, type, data),
retrySettings?.maxRetries || 3,
retrySettings?.retryDelay || 1000
)
logger.info(`✅ 成功发送到 ${platform.name || platform.type}`)
} catch (error) {
logger.error(`❌ 发送到 ${platform.name || platform.type} 失败:`, error.message)
throw error
}
}
/**
* 企业微信webhook
*/
async sendToWechatWork(platform, type, data) {
const content = this.formatMessageForWechatWork(type, data)
const payload = {
msgtype: 'markdown',
markdown: {
content
}
}
await this.sendHttpRequest(platform.url, payload, platform.timeout || 10000)
}
/**
* 钉钉webhook
*/
async sendToDingTalk(platform, type, data) {
const content = this.formatMessageForDingTalk(type, data)
let { url } = platform
const payload = {
msgtype: 'markdown',
markdown: {
title: this.getNotificationTitle(type),
text: content
}
}
// 如果启用签名
if (platform.enableSign && platform.secret) {
const timestamp = Date.now()
const sign = this.generateDingTalkSign(platform.secret, timestamp)
url = `${url}&timestamp=${timestamp}&sign=${encodeURIComponent(sign)}`
}
await this.sendHttpRequest(url, payload, platform.timeout || 10000)
}
/**
* 飞书webhook
*/
async sendToFeishu(platform, type, data) {
const content = this.formatMessageForFeishu(type, data)
const payload = {
msg_type: 'interactive',
card: {
elements: [
{
tag: 'markdown',
content
}
],
header: {
title: {
tag: 'plain_text',
content: this.getNotificationTitle(type)
},
template: this.getFeishuCardColor(type)
}
}
}
// 如果启用签名
if (platform.enableSign && platform.secret) {
const timestamp = Math.floor(Date.now() / 1000)
const sign = this.generateFeishuSign(platform.secret, timestamp)
payload.timestamp = timestamp.toString()
payload.sign = sign
}
await this.sendHttpRequest(platform.url, payload, platform.timeout || 10000)
}
/**
* Slack webhook
*/
async sendToSlack(platform, type, data) {
const text = this.formatMessageForSlack(type, data)
const payload = {
text,
username: 'Claude Relay Service',
icon_emoji: this.getSlackEmoji(type)
}
await this.sendHttpRequest(platform.url, payload, platform.timeout || 10000)
}
/**
* Discord webhook
*/
async sendToDiscord(platform, type, data) {
const embed = this.formatMessageForDiscord(type, data)
const payload = {
username: 'Claude Relay Service',
embeds: [embed]
}
await this.sendHttpRequest(platform.url, payload, platform.timeout || 10000)
}
/**
* 自定义webhook
*/
async sendToCustom(platform, type, data) {
// 使用通用格式
const payload = {
type,
service: 'claude-relay-service',
timestamp: new Date().toISOString(),
data
}
await this.sendHttpRequest(platform.url, payload, platform.timeout || 10000)
}
/**
* 发送HTTP请求
*/
async sendHttpRequest(url, payload, timeout) {
const response = await axios.post(url, payload, {
timeout,
headers: {
'Content-Type': 'application/json',
'User-Agent': 'claude-relay-service/2.0'
}
})
if (response.status < 200 || response.status >= 300) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`)
}
return response.data
}
/**
* 重试机制
*/
async retryWithBackoff(fn, maxRetries, baseDelay) {
let lastError
for (let i = 0; i < maxRetries; i++) {
try {
return await fn()
} catch (error) {
lastError = error
if (i < maxRetries - 1) {
const delay = baseDelay * Math.pow(2, i) // 指数退避
logger.debug(`🔄 重试 ${i + 1}/${maxRetries},等待 ${delay}ms`)
await new Promise((resolve) => setTimeout(resolve, delay))
}
}
}
throw lastError
}
/**
* 生成钉钉签名
*/
generateDingTalkSign(secret, timestamp) {
const stringToSign = `${timestamp}\n${secret}`
const hmac = crypto.createHmac('sha256', secret)
hmac.update(stringToSign)
return hmac.digest('base64')
}
/**
* 生成飞书签名
*/
generateFeishuSign(secret, timestamp) {
const stringToSign = `${timestamp}\n${secret}`
const hmac = crypto.createHmac('sha256', stringToSign)
hmac.update('')
return hmac.digest('base64')
}
/**
* 格式化企业微信消息
*/
formatMessageForWechatWork(type, data) {
const title = this.getNotificationTitle(type)
const details = this.formatNotificationDetails(data)
return (
`## ${title}\n\n` +
`> **服务**: Claude Relay Service\n` +
`> **时间**: ${new Date().toLocaleString('zh-CN')}\n\n${details}`
)
}
/**
* 格式化钉钉消息
*/
formatMessageForDingTalk(type, data) {
const details = this.formatNotificationDetails(data)
return (
`#### 服务: Claude Relay Service\n` +
`#### 时间: ${new Date().toLocaleString('zh-CN')}\n\n${details}`
)
}
/**
* 格式化飞书消息
*/
formatMessageForFeishu(type, data) {
return this.formatNotificationDetails(data)
}
/**
* 格式化Slack消息
*/
formatMessageForSlack(type, data) {
const title = this.getNotificationTitle(type)
const details = this.formatNotificationDetails(data)
return `*${title}*\n${details}`
}
/**
* 格式化Discord消息
*/
formatMessageForDiscord(type, data) {
const title = this.getNotificationTitle(type)
const color = this.getDiscordColor(type)
const fields = this.formatNotificationFields(data)
return {
title,
color,
fields,
timestamp: new Date().toISOString(),
footer: {
text: 'Claude Relay Service'
}
}
}
/**
* 获取通知标题
*/
getNotificationTitle(type) {
const titles = {
accountAnomaly: '⚠️ 账号异常通知',
quotaWarning: '📊 配额警告',
systemError: '❌ 系统错误',
securityAlert: '🔒 安全警报',
test: '🧪 测试通知'
}
return titles[type] || '📢 系统通知'
}
/**
* 格式化通知详情
*/
formatNotificationDetails(data) {
const lines = []
if (data.accountName) {
lines.push(`**账号**: ${data.accountName}`)
}
if (data.platform) {
lines.push(`**平台**: ${data.platform}`)
}
if (data.status) {
lines.push(`**状态**: ${data.status}`)
}
if (data.errorCode) {
lines.push(`**错误代码**: ${data.errorCode}`)
}
if (data.reason) {
lines.push(`**原因**: ${data.reason}`)
}
if (data.message) {
lines.push(`**消息**: ${data.message}`)
}
if (data.quota) {
lines.push(`**剩余配额**: ${data.quota.remaining}/${data.quota.total}`)
}
if (data.usage) {
lines.push(`**使用率**: ${data.usage}%`)
}
return lines.join('\n')
}
/**
* 格式化Discord字段
*/
formatNotificationFields(data) {
const fields = []
if (data.accountName) {
fields.push({ name: '账号', value: data.accountName, inline: true })
}
if (data.platform) {
fields.push({ name: '平台', value: data.platform, inline: true })
}
if (data.status) {
fields.push({ name: '状态', value: data.status, inline: true })
}
if (data.errorCode) {
fields.push({ name: '错误代码', value: data.errorCode, inline: false })
}
if (data.reason) {
fields.push({ name: '原因', value: data.reason, inline: false })
}
if (data.message) {
fields.push({ name: '消息', value: data.message, inline: false })
}
return fields
}
/**
* 获取飞书卡片颜色
*/
getFeishuCardColor(type) {
const colors = {
accountAnomaly: 'orange',
quotaWarning: 'yellow',
systemError: 'red',
securityAlert: 'red',
test: 'blue'
}
return colors[type] || 'blue'
}
/**
* 获取Slack emoji
*/
getSlackEmoji(type) {
const emojis = {
accountAnomaly: ':warning:',
quotaWarning: ':chart_with_downwards_trend:',
systemError: ':x:',
securityAlert: ':lock:',
test: ':test_tube:'
}
return emojis[type] || ':bell:'
}
/**
* 获取Discord颜色
*/
getDiscordColor(type) {
const colors = {
accountAnomaly: 0xff9800, // 橙色
quotaWarning: 0xffeb3b, // 黄色
systemError: 0xf44336, // 红色
securityAlert: 0xf44336, // 红色
test: 0x2196f3 // 蓝色
}
return colors[type] || 0x9e9e9e // 灰色
}
/**
* 测试webhook连接
*/
async testWebhook(platform) {
try {
const testData = {
message: 'Claude Relay Service webhook测试',
timestamp: new Date().toISOString()
}
await this.sendToPlatform(platform, 'test', testData, { maxRetries: 1, retryDelay: 1000 })
return { success: true }
} catch (error) {
return {
success: false,
error: error.message
}
}
}
}
module.exports = new WebhookService()