feat: 新增 OpenAI-Responses 账户管理功能和独立自动停止标记机制

## 功能新增
- 实现 OpenAI-Responses 账户服务(openaiResponsesAccountService.js)
  - 支持使用账户内置 API Key 进行请求转发
  - 实现每日额度管理和重置机制
  - 支持代理配置和优先级设置
- 实现 OpenAI-Responses 中继服务(openaiResponsesRelayService.js)
  - 处理请求转发和响应流处理
  - 自动记录使用统计信息
  - 支持流式和非流式响应
- 新增管理界面的 OpenAI-Responses 账户管理功能
  - 完整的 CRUD 操作支持
  - 实时额度监控和状态管理
  - 支持手动重置限流和每日额度

## 架构改进
- 引入独立的自动停止标记机制,区分不同原因的自动停止
  - rateLimitAutoStopped: 限流自动停止
  - fiveHourAutoStopped: 5小时限制自动停止
  - tempErrorAutoStopped: 临时错误自动停止
  - quotaAutoStopped: 额度耗尽自动停止
- 修复手动修改调度状态时自动恢复的问题
- 统一清理逻辑,防止状态冲突

## 其他优化
- getAccountUsageStats 支持不同账户类型参数
- 统一调度器支持 OpenAI-Responses 账户类型
- WebHook 通知增强,支持新账户类型的事件

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
shaw
2025-09-10 15:41:52 +08:00
parent 1c3b74f45b
commit 08946c67ea
17 changed files with 3061 additions and 238 deletions

View File

@@ -780,7 +780,7 @@ class RedisClient {
}
// 📊 获取账户使用统计
async getAccountUsageStats(accountId) {
async getAccountUsageStats(accountId, accountType = null) {
const accountKey = `account_usage:${accountId}`
const today = getDateStringInTimezone()
const accountDailyKey = `account_usage:daily:${accountId}:${today}`
@@ -794,8 +794,25 @@ class RedisClient {
this.client.hgetall(accountMonthlyKey)
])
// 获取账户创建时间来计算平均值
const accountData = await this.client.hgetall(`claude_account:${accountId}`)
// 获取账户创建时间来计算平均值 - 支持不同类型的账号
let accountData = {}
if (accountType === 'openai') {
accountData = await this.client.hgetall(`openai:account:${accountId}`)
} else if (accountType === 'openai-responses') {
accountData = await this.client.hgetall(`openai_responses_account:${accountId}`)
} else {
// 尝试多个前缀
accountData = await this.client.hgetall(`claude_account:${accountId}`)
if (!accountData.createdAt) {
accountData = await this.client.hgetall(`openai:account:${accountId}`)
}
if (!accountData.createdAt) {
accountData = await this.client.hgetall(`openai_responses_account:${accountId}`)
}
if (!accountData.createdAt) {
accountData = await this.client.hgetall(`openai_account:${accountId}`)
}
}
const createdAt = accountData.createdAt ? new Date(accountData.createdAt) : new Date()
const now = new Date()
const daysSinceCreated = Math.max(1, Math.ceil((now - createdAt) / (1000 * 60 * 60 * 24)))

View File

@@ -5,6 +5,7 @@ const claudeConsoleAccountService = require('../services/claudeConsoleAccountSer
const bedrockAccountService = require('../services/bedrockAccountService')
const geminiAccountService = require('../services/geminiAccountService')
const openaiAccountService = require('../services/openaiAccountService')
const openaiResponsesAccountService = require('../services/openaiResponsesAccountService')
const azureOpenaiAccountService = require('../services/azureOpenaiAccountService')
const accountGroupService = require('../services/accountGroupService')
const redis = require('../models/redis')
@@ -1946,7 +1947,7 @@ router.get('/claude-accounts', authenticateAdmin, async (req, res) => {
const accountsWithStats = await Promise.all(
accounts.map(async (account) => {
try {
const usageStats = await redis.getAccountUsageStats(account.id)
const usageStats = await redis.getAccountUsageStats(account.id, 'openai')
const groupInfos = await accountGroupService.getAccountGroups(account.id)
// 获取会话窗口使用统计(仅对有活跃窗口的账户)
@@ -2381,7 +2382,7 @@ router.get('/claude-console-accounts', authenticateAdmin, async (req, res) => {
const accountsWithStats = await Promise.all(
accounts.map(async (account) => {
try {
const usageStats = await redis.getAccountUsageStats(account.id)
const usageStats = await redis.getAccountUsageStats(account.id, 'openai')
const groupInfos = await accountGroupService.getAccountGroups(account.id)
return {
@@ -2784,7 +2785,7 @@ router.get('/bedrock-accounts', authenticateAdmin, async (req, res) => {
const accountsWithStats = await Promise.all(
accounts.map(async (account) => {
try {
const usageStats = await redis.getAccountUsageStats(account.id)
const usageStats = await redis.getAccountUsageStats(account.id, 'openai')
const groupInfos = await accountGroupService.getAccountGroups(account.id)
return {
@@ -3234,7 +3235,7 @@ router.get('/gemini-accounts', authenticateAdmin, async (req, res) => {
const accountsWithStats = await Promise.all(
accounts.map(async (account) => {
try {
const usageStats = await redis.getAccountUsageStats(account.id)
const usageStats = await redis.getAccountUsageStats(account.id, 'openai')
const groupInfos = await accountGroupService.getAccountGroups(account.id)
return {
@@ -5762,7 +5763,7 @@ router.get('/openai-accounts', authenticateAdmin, async (req, res) => {
const accountsWithStats = await Promise.all(
accounts.map(async (account) => {
try {
const usageStats = await redis.getAccountUsageStats(account.id)
const usageStats = await redis.getAccountUsageStats(account.id, 'openai')
return {
...account,
usage: {
@@ -6309,7 +6310,7 @@ router.get('/azure-openai-accounts', authenticateAdmin, async (req, res) => {
const accountsWithStats = await Promise.all(
accounts.map(async (account) => {
try {
const usageStats = await redis.getAccountUsageStats(account.id)
const usageStats = await redis.getAccountUsageStats(account.id, 'openai')
const groupInfos = await accountGroupService.getAccountGroups(account.id)
return {
...account,
@@ -6709,4 +6710,334 @@ router.post('/claude-code-version/clear', authenticateAdmin, async (req, res) =>
}
})
// ==================== OpenAI-Responses 账户管理 API ====================
// 获取所有 OpenAI-Responses 账户
router.get('/openai-responses-accounts', authenticateAdmin, async (req, res) => {
try {
const { platform, groupId } = req.query
let accounts = await openaiResponsesAccountService.getAllAccounts(true)
// 根据查询参数进行筛选
if (platform && platform !== 'openai-responses') {
accounts = []
}
// 根据分组ID筛选
if (groupId) {
const group = await accountGroupService.getGroup(groupId)
if (group && group.platform === 'openai' && group.memberIds && group.memberIds.length > 0) {
accounts = accounts.filter((account) => group.memberIds.includes(account.id))
} else {
accounts = []
}
}
// 处理额度信息、使用统计和绑定的 API Key 数量
const accountsWithStats = await Promise.all(
accounts.map(async (account) => {
try {
// 检查是否需要重置额度
const today = redis.getDateStringInTimezone()
if (account.lastResetDate !== today) {
// 今天还没重置过,需要重置
await openaiResponsesAccountService.updateAccount(account.id, {
dailyUsage: '0',
lastResetDate: today,
quotaStoppedAt: ''
})
account.dailyUsage = '0'
account.lastResetDate = today
account.quotaStoppedAt = ''
}
// 检查并清除过期的限流状态
await openaiResponsesAccountService.checkAndClearRateLimit(account.id)
// 获取使用统计信息
let usageStats
try {
usageStats = await redis.getAccountUsageStats(account.id, 'openai-responses')
} catch (error) {
logger.debug(
`Failed to get usage stats for OpenAI-Responses account ${account.id}:`,
error
)
usageStats = {
daily: { requests: 0, tokens: 0, allTokens: 0 },
total: { requests: 0, tokens: 0, allTokens: 0 },
monthly: { requests: 0, tokens: 0, allTokens: 0 }
}
}
// 计算绑定的API Key数量支持 responses: 前缀)
const allKeys = await redis.getAllApiKeys()
let boundCount = 0
for (const key of allKeys) {
// 检查是否绑定了该账户(包括 responses: 前缀)
if (
key.openaiAccountId === account.id ||
key.openaiAccountId === `responses:${account.id}`
) {
boundCount++
}
}
// 调试日志:检查绑定计数
if (boundCount > 0) {
logger.info(`OpenAI-Responses account ${account.id} has ${boundCount} bound API keys`)
}
return {
...account,
boundApiKeysCount: boundCount,
usage: {
daily: usageStats.daily,
total: usageStats.total,
monthly: usageStats.monthly
}
}
} catch (error) {
logger.error(`Failed to process OpenAI-Responses account ${account.id}:`, error)
return {
...account,
boundApiKeysCount: 0,
usage: {
daily: { requests: 0, tokens: 0, allTokens: 0 },
total: { requests: 0, tokens: 0, allTokens: 0 },
monthly: { requests: 0, tokens: 0, allTokens: 0 }
}
}
}
})
)
res.json({ success: true, data: accountsWithStats })
} catch (error) {
logger.error('Failed to get OpenAI-Responses accounts:', error)
res.status(500).json({ success: false, message: error.message })
}
})
// 创建 OpenAI-Responses 账户
router.post('/openai-responses-accounts', authenticateAdmin, async (req, res) => {
try {
const account = await openaiResponsesAccountService.createAccount(req.body)
res.json({ success: true, account })
} catch (error) {
logger.error('Failed to create OpenAI-Responses account:', error)
res.status(500).json({
success: false,
error: error.message
})
}
})
// 更新 OpenAI-Responses 账户
router.put('/openai-responses-accounts/:id', authenticateAdmin, async (req, res) => {
try {
const { id } = req.params
const updates = req.body
// 验证priority的有效性1-100
if (updates.priority !== undefined) {
const priority = parseInt(updates.priority)
if (isNaN(priority) || priority < 1 || priority > 100) {
return res.status(400).json({
success: false,
message: 'Priority must be a number between 1 and 100'
})
}
updates.priority = priority.toString()
}
const result = await openaiResponsesAccountService.updateAccount(id, updates)
if (!result.success) {
return res.status(400).json(result)
}
res.json({ success: true, ...result })
} catch (error) {
logger.error('Failed to update OpenAI-Responses account:', error)
res.status(500).json({
success: false,
error: error.message
})
}
})
// 删除 OpenAI-Responses 账户
router.delete('/openai-responses-accounts/:id', authenticateAdmin, async (req, res) => {
try {
const { id } = req.params
const account = await openaiResponsesAccountService.getAccount(id)
if (!account) {
return res.status(404).json({
success: false,
message: 'Account not found'
})
}
// 检查是否在分组中
const groups = await accountGroupService.getAllGroups()
for (const group of groups) {
if (group.platform === 'openai' && group.memberIds && group.memberIds.includes(id)) {
await accountGroupService.removeMemberFromGroup(group.id, id)
logger.info(`Removed OpenAI-Responses account ${id} from group ${group.id}`)
}
}
const result = await openaiResponsesAccountService.deleteAccount(id)
res.json({ success: true, ...result })
} catch (error) {
logger.error('Failed to delete OpenAI-Responses account:', error)
res.status(500).json({
success: false,
error: error.message
})
}
})
// 切换 OpenAI-Responses 账户调度状态
router.put(
'/openai-responses-accounts/:id/toggle-schedulable',
authenticateAdmin,
async (req, res) => {
try {
const { id } = req.params
const result = await openaiResponsesAccountService.toggleSchedulable(id)
if (!result.success) {
return res.status(400).json(result)
}
// 仅在停止调度时发送通知
if (!result.schedulable) {
await webhookNotifier.sendAccountEvent('account.status_changed', {
accountId: id,
platform: 'openai-responses',
schedulable: result.schedulable,
changedBy: 'admin',
action: 'stopped_scheduling'
})
}
res.json(result)
} catch (error) {
logger.error('Failed to toggle OpenAI-Responses account schedulable status:', error)
res.status(500).json({
success: false,
error: error.message
})
}
}
)
// 切换 OpenAI-Responses 账户激活状态
router.put('/openai-responses-accounts/:id/toggle', authenticateAdmin, async (req, res) => {
try {
const { id } = req.params
const account = await openaiResponsesAccountService.getAccount(id)
if (!account) {
return res.status(404).json({
success: false,
message: 'Account not found'
})
}
const newActiveStatus = account.isActive === 'true' ? 'false' : 'true'
await openaiResponsesAccountService.updateAccount(id, {
isActive: newActiveStatus
})
res.json({
success: true,
isActive: newActiveStatus === 'true'
})
} catch (error) {
logger.error('Failed to toggle OpenAI-Responses account status:', error)
res.status(500).json({
success: false,
error: error.message
})
}
})
// 重置 OpenAI-Responses 账户限流状态
router.post(
'/openai-responses-accounts/:id/reset-rate-limit',
authenticateAdmin,
async (req, res) => {
try {
const { id } = req.params
await openaiResponsesAccountService.updateAccount(id, {
rateLimitedAt: '',
rateLimitStatus: '',
status: 'active',
errorMessage: ''
})
logger.info(`🔄 Admin manually reset rate limit for OpenAI-Responses account ${id}`)
res.json({
success: true,
message: 'Rate limit reset successfully'
})
} catch (error) {
logger.error('Failed to reset OpenAI-Responses account rate limit:', error)
res.status(500).json({
success: false,
error: error.message
})
}
}
)
// 重置 OpenAI-Responses 账户状态(清除所有异常状态)
router.post('/openai-responses-accounts/:id/reset-status', authenticateAdmin, async (req, res) => {
try {
const { id } = req.params
const result = await openaiResponsesAccountService.resetAccountStatus(id)
logger.success(`✅ Admin reset status for OpenAI-Responses account: ${id}`)
return res.json({ success: true, data: result })
} catch (error) {
logger.error('❌ Failed to reset OpenAI-Responses account status:', error)
return res.status(500).json({ error: 'Failed to reset status', message: error.message })
}
})
// 手动重置 OpenAI-Responses 账户的每日使用量
router.post('/openai-responses-accounts/:id/reset-usage', authenticateAdmin, async (req, res) => {
try {
const { id } = req.params
await openaiResponsesAccountService.updateAccount(id, {
dailyUsage: '0',
lastResetDate: redis.getDateStringInTimezone(),
quotaStoppedAt: ''
})
logger.success(`✅ Admin manually reset daily usage for OpenAI-Responses account ${id}`)
res.json({
success: true,
message: 'Daily usage reset successfully'
})
} catch (error) {
logger.error('Failed to reset OpenAI-Responses account usage:', error)
res.status(500).json({
success: false,
error: error.message
})
}
})
module.exports = router

View File

@@ -6,6 +6,8 @@ const config = require('../../config/config')
const { authenticateApiKey } = require('../middleware/auth')
const unifiedOpenAIScheduler = require('../services/unifiedOpenAIScheduler')
const openaiAccountService = require('../services/openaiAccountService')
const openaiResponsesAccountService = require('../services/openaiResponsesAccountService')
const openaiResponsesRelayService = require('../services/openaiResponsesRelayService')
const apiKeyService = require('../services/apiKeyService')
const crypto = require('crypto')
const ProxyHelper = require('../utils/proxyHelper')
@@ -34,51 +36,81 @@ async function getOpenAIAuthToken(apiKeyData, sessionId = null, requestedModel =
throw new Error('No available OpenAI account found')
}
// 获取账户详情
let account = await openaiAccountService.getAccount(result.accountId)
if (!account || !account.accessToken) {
throw new Error(`OpenAI account ${result.accountId} has no valid accessToken`)
}
// 根据账户类型获取账户详情
let account,
accessToken,
proxy = null
// 检查 token 是否过期并自动刷新(双重保护)
if (openaiAccountService.isTokenExpired(account)) {
if (account.refreshToken) {
logger.info(`🔄 Token expired, auto-refreshing for account ${account.name} (fallback)`)
if (result.accountType === 'openai-responses') {
// 处理 OpenAI-Responses 账户
account = await openaiResponsesAccountService.getAccount(result.accountId)
if (!account || !account.apiKey) {
throw new Error(`OpenAI-Responses account ${result.accountId} has no valid apiKey`)
}
// OpenAI-Responses 账户不需要 accessToken直接返回账户信息
accessToken = null // OpenAI-Responses 使用账户内的 apiKey
// 解析代理配置
if (account.proxy) {
try {
await openaiAccountService.refreshAccountToken(result.accountId)
// 重新获取更新后的账户
account = await openaiAccountService.getAccount(result.accountId)
logger.info(`✅ Token refreshed successfully in route handler`)
} catch (refreshError) {
logger.error(`Failed to refresh token for ${account.name}:`, refreshError)
throw new Error(`Token expired and refresh failed: ${refreshError.message}`)
proxy = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy
} catch (e) {
logger.warn('Failed to parse proxy configuration:', e)
}
} else {
throw new Error(`Token expired and no refresh token available for account ${account.name}`)
}
}
// 解密 accessTokenaccount.accessToken 是加密的)
const accessToken = openaiAccountService.decrypt(account.accessToken)
if (!accessToken) {
throw new Error('Failed to decrypt OpenAI accessToken')
}
// 解析代理配置
let proxy = null
if (account.proxy) {
try {
proxy = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy
} catch (e) {
logger.warn('Failed to parse proxy configuration:', e)
logger.info(`Selected OpenAI-Responses account: ${account.name} (${result.accountId})`)
} else {
// 处理普通 OpenAI 账户
account = await openaiAccountService.getAccount(result.accountId)
if (!account || !account.accessToken) {
throw new Error(`OpenAI account ${result.accountId} has no valid accessToken`)
}
// 检查 token 是否过期并自动刷新(双重保护)
if (openaiAccountService.isTokenExpired(account)) {
if (account.refreshToken) {
logger.info(`🔄 Token expired, auto-refreshing for account ${account.name} (fallback)`)
try {
await openaiAccountService.refreshAccountToken(result.accountId)
// 重新获取更新后的账户
account = await openaiAccountService.getAccount(result.accountId)
logger.info(`✅ Token refreshed successfully in route handler`)
} catch (refreshError) {
logger.error(`Failed to refresh token for ${account.name}:`, refreshError)
throw new Error(`Token expired and refresh failed: ${refreshError.message}`)
}
} else {
throw new Error(
`Token expired and no refresh token available for account ${account.name}`
)
}
}
// 解密 accessTokenaccount.accessToken 是加密的)
accessToken = openaiAccountService.decrypt(account.accessToken)
if (!accessToken) {
throw new Error('Failed to decrypt OpenAI accessToken')
}
// 解析代理配置
if (account.proxy) {
try {
proxy = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy
} catch (e) {
logger.warn('Failed to parse proxy configuration:', e)
}
}
logger.info(`Selected OpenAI account: ${account.name} (${result.accountId})`)
}
logger.info(`Selected OpenAI account: ${account.name} (${result.accountId})`)
return {
accessToken,
accountId: result.accountId,
accountName: account.name,
accountType: result.accountType,
proxy,
account
}
@@ -151,9 +183,16 @@ const handleResponses = async (req, res) => {
accessToken,
accountId,
accountName: _accountName,
accountType,
proxy,
account
} = await getOpenAIAuthToken(apiKeyData, sessionId, requestedModel)
// 如果是 OpenAI-Responses 账户,使用专门的中继服务处理
if (accountType === 'openai-responses') {
logger.info(`🔀 Using OpenAI-Responses relay service for account: ${account.name}`)
return await openaiResponsesRelayService.handleRequest(req, res, account, apiKeyData)
}
// 基于白名单构造上游所需的请求头,确保键为小写且值受控
const incoming = req.headers || {}

View File

@@ -603,6 +603,25 @@ class ClaudeAccountService {
updatedData.updatedAt = new Date().toISOString()
// 如果是手动修改调度状态,清除所有自动停止相关的字段
if (Object.prototype.hasOwnProperty.call(updates, 'schedulable')) {
// 清除所有自动停止的标记,防止自动恢复
delete updatedData.rateLimitAutoStopped
delete updatedData.fiveHourAutoStopped
delete updatedData.fiveHourStoppedAt
delete updatedData.tempErrorAutoStopped
// 兼容旧的标记(逐步迁移)
delete updatedData.autoStoppedAt
delete updatedData.stoppedReason
// 如果是手动启用调度,记录日志
if (updates.schedulable === true || updates.schedulable === 'true') {
logger.info(`✅ Manually enabled scheduling for account ${accountId}`)
} else {
logger.info(`⛔ Manually disabled scheduling for account ${accountId}`)
}
}
// 检查是否手动禁用了账号如果是则发送webhook通知
if (updates.isActive === 'false' && accountData.isActive === 'true') {
try {
@@ -1088,7 +1107,9 @@ class ClaudeAccountService {
updatedAccountData.rateLimitedAt = new Date().toISOString()
updatedAccountData.rateLimitStatus = 'limited'
// 限流时停止调度,与 OpenAI 账号保持一致
updatedAccountData.schedulable = false
updatedAccountData.schedulable = 'false'
// 使用独立的限流自动停止标记,避免与其他自动停止冲突
updatedAccountData.rateLimitAutoStopped = 'true'
// 如果提供了准确的限流重置时间戳来自API响应头
if (rateLimitResetTimestamp) {
@@ -1173,13 +1194,16 @@ class ClaudeAccountService {
delete accountData.rateLimitedAt
delete accountData.rateLimitStatus
delete accountData.rateLimitEndAt // 清除限流结束时间
// 恢复可调度状态,与 OpenAI 账号保持一致
accountData.schedulable = true
// 只恢复因限流而自动停止的账户
if (accountData.rateLimitAutoStopped === 'true' && accountData.schedulable === 'false') {
accountData.schedulable = 'true'
delete accountData.rateLimitAutoStopped
logger.info(`✅ Auto-resuming scheduling for account ${accountId} after rate limit cleared`)
}
await redis.setClaudeAccount(accountId, accountData)
logger.success(
`✅ Rate limit removed for account: ${accountData.name} (${accountId}), schedulable restored`
)
logger.success(`✅ Rate limit removed for account: ${accountData.name} (${accountId})`)
return { success: true }
} catch (error) {
@@ -1331,17 +1355,13 @@ class ClaudeAccountService {
}
// 如果账户因为5小时限制被自动停止现在恢复调度
if (
accountData.autoStoppedAt &&
accountData.schedulable === 'false' &&
accountData.stoppedReason === '5小时使用量接近限制自动停止调度'
) {
if (accountData.fiveHourAutoStopped === 'true' && accountData.schedulable === 'false') {
logger.info(
`✅ Auto-resuming scheduling for account ${accountData.name} (${accountId}) - new session window started`
)
accountData.schedulable = 'true'
delete accountData.stoppedReason
delete accountData.autoStoppedAt
delete accountData.fiveHourAutoStopped
delete accountData.fiveHourStoppedAt
// 发送Webhook通知
try {
@@ -1823,8 +1843,16 @@ class ClaudeAccountService {
updatedAccountData.status = 'created'
}
// 恢复可调度状态
// 恢复可调度状态(管理员手动重置时恢复调度是合理的)
updatedAccountData.schedulable = 'true'
// 清除所有自动停止相关的标记
delete updatedAccountData.rateLimitAutoStopped
delete updatedAccountData.fiveHourAutoStopped
delete updatedAccountData.fiveHourStoppedAt
delete updatedAccountData.tempErrorAutoStopped
// 兼容旧的标记
delete updatedAccountData.autoStoppedAt
delete updatedAccountData.stoppedReason
// 清除错误相关字段
delete updatedAccountData.errorMessage
@@ -1850,7 +1878,15 @@ class ClaudeAccountService {
'rateLimitEndAt',
'tempErrorAt',
'sessionWindowStart',
'sessionWindowEnd'
'sessionWindowEnd',
// 新的独立标记
'rateLimitAutoStopped',
'fiveHourAutoStopped',
'fiveHourStoppedAt',
'tempErrorAutoStopped',
// 兼容旧的标记
'autoStoppedAt',
'stoppedReason'
]
await redis.client.hdel(`claude:account:${accountId}`, ...fieldsToDelete)
@@ -1901,13 +1937,22 @@ class ClaudeAccountService {
// 如果临时错误状态超过指定时间,尝试重新激活
if (minutesSinceTempError > TEMP_ERROR_RECOVERY_MINUTES) {
account.status = 'active' // 恢复为 active 状态
account.schedulable = 'true' // 恢复为可调度
// 恢复因临时错误而自动停止的账户
if (account.tempErrorAutoStopped === 'true') {
account.schedulable = 'true' // 恢复为可调度
delete account.tempErrorAutoStopped
}
delete account.errorMessage
delete account.tempErrorAt
await redis.setClaudeAccount(account.id, account)
// 显式从 Redis 中删除这些字段(因为 HSET 不会删除现有字段)
await redis.client.hdel(`claude:account:${account.id}`, 'errorMessage', 'tempErrorAt')
await redis.client.hdel(
`claude:account:${account.id}`,
'errorMessage',
'tempErrorAt',
'tempErrorAutoStopped'
)
// 同时清除500错误计数
await this.clearInternalErrors(account.id)
@@ -1992,6 +2037,8 @@ class ClaudeAccountService {
updatedAccountData.schedulable = 'false' // 设置为不可调度
updatedAccountData.errorMessage = 'Account temporarily disabled due to consecutive 500 errors'
updatedAccountData.tempErrorAt = new Date().toISOString()
// 使用独立的临时错误自动停止标记
updatedAccountData.tempErrorAutoStopped = 'true'
// 保存更新后的账户数据
await redis.setClaudeAccount(accountId, updatedAccountData)
@@ -2010,7 +2057,11 @@ class ClaudeAccountService {
if (minutesSince >= 5) {
// 恢复账户
account.status = 'active'
account.schedulable = 'true'
// 只恢复因临时错误而自动停止的账户
if (account.tempErrorAutoStopped === 'true') {
account.schedulable = 'true'
delete account.tempErrorAutoStopped
}
delete account.errorMessage
delete account.tempErrorAt
@@ -2020,7 +2071,8 @@ class ClaudeAccountService {
await redis.client.hdel(
`claude:account:${accountId}`,
'errorMessage',
'tempErrorAt'
'tempErrorAt',
'tempErrorAutoStopped'
)
// 清除 500 错误计数
@@ -2108,8 +2160,9 @@ class ClaudeAccountService {
`⚠️ Account ${accountData.name} (${accountId}) approaching 5h limit, auto-stopping scheduling`
)
accountData.schedulable = 'false'
accountData.stoppedReason = '5小时使用量接近限制自动停止调度'
accountData.autoStoppedAt = new Date().toISOString()
// 使用独立的5小时限制自动停止标记
accountData.fiveHourAutoStopped = 'true'
accountData.fiveHourStoppedAt = new Date().toISOString()
// 发送Webhook通知
try {

View File

@@ -285,6 +285,20 @@ class ClaudeConsoleAccountService {
}
if (updates.schedulable !== undefined) {
updatedData.schedulable = updates.schedulable.toString()
// 如果是手动修改调度状态,清除所有自动停止相关的字段
// 防止自动恢复
updatedData.rateLimitAutoStopped = ''
updatedData.quotaAutoStopped = ''
// 兼容旧的标记
updatedData.autoStoppedAt = ''
updatedData.stoppedReason = ''
// 记录日志
if (updates.schedulable === true || updates.schedulable === 'true') {
logger.info(`✅ Manually enabled scheduling for Claude Console account ${accountId}`)
} else {
logger.info(`⛔ Manually disabled scheduling for Claude Console account ${accountId}`)
}
}
// 额度管理相关字段
@@ -401,7 +415,9 @@ class ClaudeConsoleAccountService {
rateLimitStatus: 'limited',
isActive: 'false', // 禁用账户
schedulable: 'false', // 停止调度,与其他平台保持一致
errorMessage: `Rate limited at ${new Date().toISOString()}`
errorMessage: `Rate limited at ${new Date().toISOString()}`,
// 使用独立的限流自动停止标记
rateLimitAutoStopped: 'true'
}
// 只有当前状态不是quota_exceeded时才设置为rate_limited
@@ -467,12 +483,24 @@ class ClaudeConsoleAccountService {
logger.info(`⚠️ Rate limit removed but quota exceeded remains for account: ${accountId}`)
} else {
// 没有额度限制,完全恢复
await client.hset(accountKey, {
const accountData = await client.hgetall(accountKey)
const updateData = {
isActive: 'true',
schedulable: 'true', // 恢复调度,与其他平台保持一致
status: 'active',
errorMessage: ''
})
}
// 只恢复因限流而自动停止的账户
if (accountData.rateLimitAutoStopped === 'true' && accountData.schedulable === 'false') {
updateData.schedulable = 'true' // 恢复调度
// 删除限流自动停止标记
await client.hdel(accountKey, 'rateLimitAutoStopped')
logger.info(
`✅ Auto-resuming scheduling for Claude Console account ${accountId} after rate limit cleared`
)
}
await client.hset(accountKey, updateData)
logger.success(`✅ Rate limit removed and account re-enabled: ${accountId}`)
}
} else {
@@ -995,7 +1023,10 @@ class ClaudeConsoleAccountService {
const updates = {
isActive: false,
quotaStoppedAt: new Date().toISOString(),
errorMessage: `Daily quota exceeded: $${currentDailyCost.toFixed(2)} / $${dailyQuota.toFixed(2)}`
errorMessage: `Daily quota exceeded: $${currentDailyCost.toFixed(2)} / $${dailyQuota.toFixed(2)}`,
schedulable: false, // 停止调度
// 使用独立的额度超限自动停止标记
quotaAutoStopped: 'true'
}
// 只有当前状态是active时才改为quota_exceeded
@@ -1060,11 +1091,17 @@ class ClaudeConsoleAccountService {
updates.errorMessage = ''
updates.quotaStoppedAt = ''
// 只恢复因额度超限而自动停止的账户
if (accountData.quotaAutoStopped === 'true') {
updates.schedulable = true
updates.quotaAutoStopped = ''
}
// 如果是rate_limited状态也清除限流相关字段
if (accountData.status === 'rate_limited') {
const client = redis.getClientSafe()
const accountKey = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
await client.hdel(accountKey, 'rateLimitedAt', 'rateLimitStatus')
await client.hdel(accountKey, 'rateLimitedAt', 'rateLimitStatus', 'rateLimitAutoStopped')
}
logger.info(

View File

@@ -0,0 +1,574 @@
const { v4: uuidv4 } = require('uuid')
const crypto = require('crypto')
const redis = require('../models/redis')
const logger = require('../utils/logger')
const config = require('../../config/config')
const LRUCache = require('../utils/lruCache')
class OpenAIResponsesAccountService {
constructor() {
// 加密相关常量
this.ENCRYPTION_ALGORITHM = 'aes-256-cbc'
this.ENCRYPTION_SALT = 'openai-responses-salt'
// Redis 键前缀
this.ACCOUNT_KEY_PREFIX = 'openai_responses_account:'
this.SHARED_ACCOUNTS_KEY = 'shared_openai_responses_accounts'
// 🚀 性能优化:缓存派生的加密密钥,避免每次重复计算
this._encryptionKeyCache = null
// 🔄 解密结果缓存,提高解密性能
this._decryptCache = new LRUCache(500)
// 🧹 定期清理缓存每10分钟
setInterval(
() => {
this._decryptCache.cleanup()
logger.info(
'🧹 OpenAI-Responses decrypt cache cleanup completed',
this._decryptCache.getStats()
)
},
10 * 60 * 1000
)
}
// 创建账户
async createAccount(options = {}) {
const {
name = 'OpenAI Responses Account',
description = '',
baseApi = '', // 必填API 基础地址
apiKey = '', // 必填API 密钥
userAgent = '', // 可选:自定义 User-Agent空则透传原始请求
priority = 50, // 调度优先级 (1-100)
proxy = null,
isActive = true,
accountType = 'shared', // 'dedicated' or 'shared'
schedulable = true, // 是否可被调度
dailyQuota = 0, // 每日额度限制美元0表示不限制
quotaResetTime = '00:00', // 额度重置时间HH:mm格式
rateLimitDuration = 60 // 限流时间(分钟)
} = options
// 验证必填字段
if (!baseApi || !apiKey) {
throw new Error('Base API URL and API Key are required for OpenAI-Responses account')
}
// 规范化 baseApi确保不以 / 结尾)
const normalizedBaseApi = baseApi.endsWith('/') ? baseApi.slice(0, -1) : baseApi
const accountId = uuidv4()
const accountData = {
id: accountId,
platform: 'openai-responses',
name,
description,
baseApi: normalizedBaseApi,
apiKey: this._encryptSensitiveData(apiKey),
userAgent,
priority: priority.toString(),
proxy: proxy ? JSON.stringify(proxy) : '',
isActive: isActive.toString(),
accountType,
schedulable: schedulable.toString(),
createdAt: new Date().toISOString(),
lastUsedAt: '',
status: 'active',
errorMessage: '',
// 限流相关
rateLimitedAt: '',
rateLimitStatus: '',
rateLimitDuration: rateLimitDuration.toString(),
// 额度管理
dailyQuota: dailyQuota.toString(),
dailyUsage: '0',
lastResetDate: redis.getDateStringInTimezone(),
quotaResetTime,
quotaStoppedAt: ''
}
// 保存到 Redis
await this._saveAccount(accountId, accountData)
logger.success(`🚀 Created OpenAI-Responses account: ${name} (${accountId})`)
return {
...accountData,
apiKey: '***' // 返回时隐藏敏感信息
}
}
// 获取账户
async getAccount(accountId) {
const client = redis.getClientSafe()
const key = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
const accountData = await client.hgetall(key)
if (!accountData || !accountData.id) {
return null
}
// 解密敏感数据
accountData.apiKey = this._decryptSensitiveData(accountData.apiKey)
// 解析 JSON 字段
if (accountData.proxy) {
try {
accountData.proxy = JSON.parse(accountData.proxy)
} catch (e) {
accountData.proxy = null
}
}
return accountData
}
// 更新账户
async updateAccount(accountId, updates) {
const account = await this.getAccount(accountId)
if (!account) {
throw new Error('Account not found')
}
// 处理敏感字段加密
if (updates.apiKey) {
updates.apiKey = this._encryptSensitiveData(updates.apiKey)
}
// 处理 JSON 字段
if (updates.proxy !== undefined) {
updates.proxy = updates.proxy ? JSON.stringify(updates.proxy) : ''
}
// 规范化 baseApi
if (updates.baseApi) {
updates.baseApi = updates.baseApi.endsWith('/')
? updates.baseApi.slice(0, -1)
: updates.baseApi
}
// 更新 Redis
const client = redis.getClientSafe()
const key = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
await client.hset(key, updates)
logger.info(`📝 Updated OpenAI-Responses account: ${account.name}`)
return { success: true }
}
// 删除账户
async deleteAccount(accountId) {
const client = redis.getClientSafe()
const key = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
// 从共享账户列表中移除
await client.srem(this.SHARED_ACCOUNTS_KEY, accountId)
// 删除账户数据
await client.del(key)
logger.info(`🗑️ Deleted OpenAI-Responses account: ${accountId}`)
return { success: true }
}
// 获取所有账户
async getAllAccounts(includeInactive = false) {
const client = redis.getClientSafe()
const accountIds = await client.smembers(this.SHARED_ACCOUNTS_KEY)
const accounts = []
for (const accountId of accountIds) {
const account = await this.getAccount(accountId)
if (account) {
// 过滤非活跃账户
if (includeInactive || account.isActive === 'true') {
// 隐藏敏感信息
account.apiKey = '***'
// 获取限流状态信息与普通OpenAI账号保持一致的格式
const rateLimitInfo = this._getRateLimitInfo(account)
// 格式化 rateLimitStatus 为对象(与普通 OpenAI 账号一致)
account.rateLimitStatus = rateLimitInfo.isRateLimited
? {
isRateLimited: true,
rateLimitedAt: account.rateLimitedAt || null,
minutesRemaining: rateLimitInfo.remainingMinutes || 0
}
: {
isRateLimited: false,
rateLimitedAt: null,
minutesRemaining: 0
}
// 转换 schedulable 字段为布尔值(前端需要布尔值来判断)
account.schedulable = account.schedulable !== 'false'
// 转换 isActive 字段为布尔值
account.isActive = account.isActive === 'true'
accounts.push(account)
}
}
}
// 直接从 Redis 获取所有账户(包括非共享账户)
const keys = await client.keys(`${this.ACCOUNT_KEY_PREFIX}*`)
for (const key of keys) {
const accountId = key.replace(this.ACCOUNT_KEY_PREFIX, '')
if (!accountIds.includes(accountId)) {
const accountData = await client.hgetall(key)
if (accountData && accountData.id) {
// 过滤非活跃账户
if (includeInactive || accountData.isActive === 'true') {
// 隐藏敏感信息
accountData.apiKey = '***'
// 解析 JSON 字段
if (accountData.proxy) {
try {
accountData.proxy = JSON.parse(accountData.proxy)
} catch (e) {
accountData.proxy = null
}
}
// 获取限流状态信息与普通OpenAI账号保持一致的格式
const rateLimitInfo = this._getRateLimitInfo(accountData)
// 格式化 rateLimitStatus 为对象(与普通 OpenAI 账号一致)
accountData.rateLimitStatus = rateLimitInfo.isRateLimited
? {
isRateLimited: true,
rateLimitedAt: accountData.rateLimitedAt || null,
minutesRemaining: rateLimitInfo.remainingMinutes || 0
}
: {
isRateLimited: false,
rateLimitedAt: null,
minutesRemaining: 0
}
// 转换 schedulable 字段为布尔值(前端需要布尔值来判断)
accountData.schedulable = accountData.schedulable !== 'false'
// 转换 isActive 字段为布尔值
accountData.isActive = accountData.isActive === 'true'
accounts.push(accountData)
}
}
}
}
return accounts
}
// 标记账户限流
async markAccountRateLimited(accountId, duration = null) {
const account = await this.getAccount(accountId)
if (!account) {
return
}
const rateLimitDuration = duration || parseInt(account.rateLimitDuration) || 60
const now = new Date()
const resetAt = new Date(now.getTime() + rateLimitDuration * 60000)
await this.updateAccount(accountId, {
rateLimitedAt: now.toISOString(),
rateLimitStatus: 'limited',
rateLimitResetAt: resetAt.toISOString(),
rateLimitDuration: rateLimitDuration.toString(),
status: 'rateLimited',
schedulable: 'false', // 防止被调度
errorMessage: `Rate limited until ${resetAt.toISOString()}`
})
logger.warn(
`⏳ Account ${account.name} marked as rate limited for ${rateLimitDuration} minutes (until ${resetAt.toISOString()})`
)
}
// 检查并清除过期的限流状态
async checkAndClearRateLimit(accountId) {
const account = await this.getAccount(accountId)
if (!account || account.rateLimitStatus !== 'limited') {
return false
}
const now = new Date()
let shouldClear = false
// 优先使用 rateLimitResetAt 字段
if (account.rateLimitResetAt) {
const resetAt = new Date(account.rateLimitResetAt)
shouldClear = now >= resetAt
} else {
// 如果没有 rateLimitResetAt使用旧的逻辑
const rateLimitedAt = new Date(account.rateLimitedAt)
const rateLimitDuration = parseInt(account.rateLimitDuration) || 60
shouldClear = now - rateLimitedAt > rateLimitDuration * 60000
}
if (shouldClear) {
// 限流已过期,清除状态
await this.updateAccount(accountId, {
rateLimitedAt: '',
rateLimitStatus: '',
rateLimitResetAt: '',
status: 'active',
schedulable: 'true', // 恢复调度
errorMessage: ''
})
logger.info(`✅ Rate limit cleared for account ${account.name}`)
return true
}
return false
}
// 切换调度状态
async toggleSchedulable(accountId) {
const account = await this.getAccount(accountId)
if (!account) {
throw new Error('Account not found')
}
const newSchedulableStatus = account.schedulable === 'true' ? 'false' : 'true'
await this.updateAccount(accountId, {
schedulable: newSchedulableStatus
})
logger.info(
`🔄 Toggled schedulable status for account ${account.name}: ${newSchedulableStatus}`
)
return {
success: true,
schedulable: newSchedulableStatus === 'true'
}
}
// 更新使用额度
async updateUsageQuota(accountId, amount) {
const account = await this.getAccount(accountId)
if (!account) {
return
}
// 检查是否需要重置额度
const today = redis.getDateStringInTimezone()
if (account.lastResetDate !== today) {
// 重置额度
await this.updateAccount(accountId, {
dailyUsage: amount.toString(),
lastResetDate: today,
quotaStoppedAt: ''
})
} else {
// 累加使用额度
const currentUsage = parseFloat(account.dailyUsage) || 0
const newUsage = currentUsage + amount
const dailyQuota = parseFloat(account.dailyQuota) || 0
const updates = {
dailyUsage: newUsage.toString()
}
// 检查是否超出额度
if (dailyQuota > 0 && newUsage >= dailyQuota) {
updates.status = 'quotaExceeded'
updates.quotaStoppedAt = new Date().toISOString()
updates.errorMessage = `Daily quota exceeded: $${newUsage.toFixed(2)} / $${dailyQuota.toFixed(2)}`
logger.warn(`💸 Account ${account.name} exceeded daily quota`)
}
await this.updateAccount(accountId, updates)
}
}
// 更新账户使用统计(记录 token 使用量)
async updateAccountUsage(accountId, tokens = 0) {
const account = await this.getAccount(accountId)
if (!account) {
return
}
const updates = {
lastUsedAt: new Date().toISOString()
}
// 如果有 tokens 参数且大于0同时更新使用统计
if (tokens > 0) {
const currentTokens = parseInt(account.totalUsedTokens) || 0
updates.totalUsedTokens = (currentTokens + tokens).toString()
}
await this.updateAccount(accountId, updates)
}
// 记录使用量(为了兼容性的别名)
async recordUsage(accountId, tokens = 0) {
return this.updateAccountUsage(accountId, tokens)
}
// 重置账户状态(清除所有异常状态)
async resetAccountStatus(accountId) {
const account = await this.getAccount(accountId)
if (!account) {
throw new Error('Account not found')
}
const updates = {
// 根据是否有有效的 apiKey 来设置 status
status: account.apiKey ? 'active' : 'created',
// 恢复可调度状态
schedulable: 'true',
// 清除错误相关字段
errorMessage: '',
rateLimitedAt: '',
rateLimitStatus: '',
rateLimitResetAt: '',
rateLimitDuration: ''
}
await this.updateAccount(accountId, updates)
logger.info(`✅ Reset all error status for OpenAI-Responses account ${accountId}`)
// 发送 Webhook 通知
try {
const webhookNotifier = require('../utils/webhookNotifier')
await webhookNotifier.sendAccountAnomalyNotification({
accountId,
accountName: account.name || accountId,
platform: 'openai-responses',
status: 'recovered',
errorCode: 'STATUS_RESET',
reason: 'Account status manually reset',
timestamp: new Date().toISOString()
})
logger.info(
`📢 Webhook notification sent for OpenAI-Responses account ${account.name} status reset`
)
} catch (webhookError) {
logger.error('Failed to send status reset webhook notification:', webhookError)
}
return { success: true, message: 'Account status reset successfully' }
}
// 获取限流信息
_getRateLimitInfo(accountData) {
if (accountData.rateLimitStatus !== 'limited') {
return { isRateLimited: false }
}
const now = new Date()
let willBeAvailableAt
let remainingMinutes
// 优先使用 rateLimitResetAt 字段
if (accountData.rateLimitResetAt) {
willBeAvailableAt = new Date(accountData.rateLimitResetAt)
remainingMinutes = Math.max(0, Math.ceil((willBeAvailableAt - now) / 60000))
} else {
// 如果没有 rateLimitResetAt使用旧的逻辑
const rateLimitedAt = new Date(accountData.rateLimitedAt)
const rateLimitDuration = parseInt(accountData.rateLimitDuration) || 60
const elapsedMinutes = Math.floor((now - rateLimitedAt) / 60000)
remainingMinutes = Math.max(0, rateLimitDuration - elapsedMinutes)
willBeAvailableAt = new Date(rateLimitedAt.getTime() + rateLimitDuration * 60000)
}
return {
isRateLimited: remainingMinutes > 0,
remainingMinutes,
willBeAvailableAt
}
}
// 加密敏感数据
_encryptSensitiveData(text) {
if (!text) {
return ''
}
const key = this._getEncryptionKey()
const iv = crypto.randomBytes(16)
const cipher = crypto.createCipheriv(this.ENCRYPTION_ALGORITHM, key, iv)
let encrypted = cipher.update(text)
encrypted = Buffer.concat([encrypted, cipher.final()])
return `${iv.toString('hex')}:${encrypted.toString('hex')}`
}
// 解密敏感数据
_decryptSensitiveData(text) {
if (!text || text === '') {
return ''
}
// 检查缓存
const cacheKey = crypto.createHash('sha256').update(text).digest('hex')
const cached = this._decryptCache.get(cacheKey)
if (cached !== undefined) {
return cached
}
try {
const key = this._getEncryptionKey()
const [ivHex, encryptedHex] = text.split(':')
const iv = Buffer.from(ivHex, 'hex')
const encryptedText = Buffer.from(encryptedHex, 'hex')
const decipher = crypto.createDecipheriv(this.ENCRYPTION_ALGORITHM, key, iv)
let decrypted = decipher.update(encryptedText)
decrypted = Buffer.concat([decrypted, decipher.final()])
const result = decrypted.toString()
// 存入缓存5分钟过期
this._decryptCache.set(cacheKey, result, 5 * 60 * 1000)
return result
} catch (error) {
logger.error('Decryption error:', error)
return ''
}
}
// 获取加密密钥
_getEncryptionKey() {
if (!this._encryptionKeyCache) {
this._encryptionKeyCache = crypto.scryptSync(
config.security.encryptionKey,
this.ENCRYPTION_SALT,
32
)
}
return this._encryptionKeyCache
}
// 保存账户到 Redis
async _saveAccount(accountId, accountData) {
const client = redis.getClientSafe()
const key = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
// 保存账户数据
await client.hset(key, accountData)
// 添加到共享账户列表
if (accountData.accountType === 'shared') {
await client.sadd(this.SHARED_ACCOUNTS_KEY, accountId)
}
}
}
module.exports = new OpenAIResponsesAccountService()

View File

@@ -0,0 +1,708 @@
const axios = require('axios')
const ProxyHelper = require('../utils/proxyHelper')
const logger = require('../utils/logger')
const openaiResponsesAccountService = require('./openaiResponsesAccountService')
const apiKeyService = require('./apiKeyService')
const unifiedOpenAIScheduler = require('./unifiedOpenAIScheduler')
const config = require('../../config/config')
const crypto = require('crypto')
class OpenAIResponsesRelayService {
constructor() {
this.defaultTimeout = config.requestTimeout || 600000
}
// 处理请求转发
async handleRequest(req, res, account, apiKeyData) {
let abortController = null
// 获取会话哈希(如果有的话)
const sessionId = req.headers['session_id'] || req.body?.session_id
const sessionHash = sessionId
? crypto.createHash('sha256').update(sessionId).digest('hex')
: null
try {
// 获取完整的账户信息(包含解密的 API Key
const fullAccount = await openaiResponsesAccountService.getAccount(account.id)
if (!fullAccount) {
throw new Error('Account not found')
}
// 创建 AbortController 用于取消请求
abortController = new AbortController()
// 设置客户端断开监听器
const handleClientDisconnect = () => {
logger.info('🔌 Client disconnected, aborting OpenAI-Responses request')
if (abortController && !abortController.signal.aborted) {
abortController.abort()
}
}
// 监听客户端断开事件
req.once('close', handleClientDisconnect)
res.once('close', handleClientDisconnect)
// 构建目标 URL
const targetUrl = `${fullAccount.baseApi}${req.path}`
logger.info(`🎯 Forwarding to: ${targetUrl}`)
// 构建请求头
const headers = {
...this._filterRequestHeaders(req.headers),
Authorization: `Bearer ${fullAccount.apiKey}`,
'Content-Type': 'application/json'
}
// 处理 User-Agent
if (fullAccount.userAgent) {
// 使用自定义 User-Agent
headers['User-Agent'] = fullAccount.userAgent
logger.debug(`📱 Using custom User-Agent: ${fullAccount.userAgent}`)
} else if (req.headers['user-agent']) {
// 透传原始 User-Agent
headers['User-Agent'] = req.headers['user-agent']
logger.debug(`📱 Forwarding original User-Agent: ${req.headers['user-agent']}`)
}
// 配置请求选项
const requestOptions = {
method: req.method,
url: targetUrl,
headers,
data: req.body,
timeout: this.defaultTimeout,
responseType: req.body?.stream ? 'stream' : 'json',
validateStatus: () => true, // 允许处理所有状态码
signal: abortController.signal
}
// 配置代理(如果有)
if (fullAccount.proxy) {
const proxyAgent = ProxyHelper.createProxyAgent(fullAccount.proxy)
if (proxyAgent) {
requestOptions.httpsAgent = proxyAgent
requestOptions.proxy = false
logger.info(
`🌐 Using proxy for OpenAI-Responses: ${ProxyHelper.getProxyDescription(fullAccount.proxy)}`
)
}
}
// 记录请求信息
logger.info('📤 OpenAI-Responses relay request', {
accountId: account.id,
accountName: account.name,
targetUrl,
method: req.method,
stream: req.body?.stream || false,
model: req.body?.model || 'unknown',
userAgent: headers['User-Agent'] || 'not set'
})
// 发送请求
const response = await axios(requestOptions)
// 处理 429 限流错误
if (response.status === 429) {
const { resetsInSeconds, errorData } = await this._handle429Error(
account,
response,
req.body?.stream,
sessionHash
)
// 返回错误响应(使用处理后的数据,避免循环引用)
const errorResponse = errorData || {
error: {
message: 'Rate limit exceeded',
type: 'rate_limit_error',
code: 'rate_limit_exceeded',
resets_in_seconds: resetsInSeconds
}
}
return res.status(429).json(errorResponse)
}
// 处理其他错误状态码
if (response.status >= 400) {
// 处理流式错误响应
let errorData = response.data
if (response.data && typeof response.data.pipe === 'function') {
// 流式响应需要先读取内容
const chunks = []
await new Promise((resolve) => {
response.data.on('data', (chunk) => chunks.push(chunk))
response.data.on('end', resolve)
response.data.on('error', resolve)
setTimeout(resolve, 5000) // 超时保护
})
const fullResponse = Buffer.concat(chunks).toString()
// 尝试解析错误响应
try {
if (fullResponse.includes('data: ')) {
// SSE格式
const lines = fullResponse.split('\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
const jsonStr = line.slice(6).trim()
if (jsonStr && jsonStr !== '[DONE]') {
errorData = JSON.parse(jsonStr)
break
}
}
}
} else {
// 普通JSON
errorData = JSON.parse(fullResponse)
}
} catch (e) {
logger.error('Failed to parse error response:', e)
errorData = { error: { message: fullResponse || 'Unknown error' } }
}
}
logger.error('OpenAI-Responses API error', {
status: response.status,
statusText: response.statusText,
errorData
})
// 清理监听器
req.removeListener('close', handleClientDisconnect)
res.removeListener('close', handleClientDisconnect)
return res.status(response.status).json(errorData)
}
// 更新最后使用时间
await openaiResponsesAccountService.updateAccount(account.id, {
lastUsedAt: new Date().toISOString()
})
// 处理流式响应
if (req.body?.stream && response.data && typeof response.data.pipe === 'function') {
return this._handleStreamResponse(
response,
res,
account,
apiKeyData,
req.body?.model,
handleClientDisconnect,
req
)
}
// 处理非流式响应
return this._handleNormalResponse(response, res, account, apiKeyData, req.body?.model)
} catch (error) {
// 清理 AbortController
if (abortController && !abortController.signal.aborted) {
abortController.abort()
}
// 安全地记录错误,避免循环引用
const errorInfo = {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText
}
logger.error('OpenAI-Responses relay error:', errorInfo)
// 检查是否是网络错误
if (error.code === 'ECONNREFUSED' || error.code === 'ETIMEDOUT') {
await openaiResponsesAccountService.updateAccount(account.id, {
status: 'error',
errorMessage: `Connection error: ${error.code}`
})
}
// 如果已经发送了响应头,直接结束
if (res.headersSent) {
return res.end()
}
// 检查是否是axios错误并包含响应
if (error.response) {
// 处理axios错误响应
const status = error.response.status || 500
let errorData = {
error: {
message: error.response.statusText || 'Request failed',
type: 'api_error',
code: error.code || 'unknown'
}
}
// 如果响应包含数据,尝试使用它
if (error.response.data) {
// 检查是否是流
if (typeof error.response.data === 'object' && !error.response.data.pipe) {
errorData = error.response.data
} else if (typeof error.response.data === 'string') {
try {
errorData = JSON.parse(error.response.data)
} catch (e) {
errorData.error.message = error.response.data
}
}
}
return res.status(status).json(errorData)
}
// 其他错误
return res.status(500).json({
error: {
message: 'Internal server error',
type: 'internal_error',
details: error.message
}
})
}
}
// 处理流式响应
async _handleStreamResponse(
response,
res,
account,
apiKeyData,
requestedModel,
handleClientDisconnect,
req
) {
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no')
let usageData = null
let actualModel = null
let buffer = ''
let rateLimitDetected = false
let rateLimitResetsInSeconds = null
let streamEnded = false
// 解析 SSE 事件以捕获 usage 数据和 model
const parseSSEForUsage = (data) => {
const lines = data.split('\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const jsonStr = line.slice(6)
if (jsonStr === '[DONE]') {
continue
}
const eventData = JSON.parse(jsonStr)
// 检查是否是 response.completed 事件OpenAI-Responses 格式)
if (eventData.type === 'response.completed' && eventData.response) {
// 从响应中获取真实的 model
if (eventData.response.model) {
actualModel = eventData.response.model
logger.debug(`📊 Captured actual model from response.completed: ${actualModel}`)
}
// 获取 usage 数据 - OpenAI-Responses 格式在 response.usage 下
if (eventData.response.usage) {
usageData = eventData.response.usage
logger.info('📊 Successfully captured usage data from OpenAI-Responses:', {
input_tokens: usageData.input_tokens,
output_tokens: usageData.output_tokens,
total_tokens: usageData.total_tokens
})
}
}
// 检查是否有限流错误
if (eventData.error) {
// 检查多种可能的限流错误类型
if (
eventData.error.type === 'rate_limit_error' ||
eventData.error.type === 'usage_limit_reached' ||
eventData.error.type === 'rate_limit_exceeded'
) {
rateLimitDetected = true
if (eventData.error.resets_in_seconds) {
rateLimitResetsInSeconds = eventData.error.resets_in_seconds
logger.warn(
`🚫 Rate limit detected in stream, resets in ${rateLimitResetsInSeconds} seconds (${Math.ceil(rateLimitResetsInSeconds / 60)} minutes)`
)
}
}
}
} catch (e) {
// 忽略解析错误
}
}
}
}
// 监听数据流
response.data.on('data', (chunk) => {
try {
const chunkStr = chunk.toString()
// 转发数据给客户端
if (!res.destroyed && !streamEnded) {
res.write(chunk)
}
// 同时解析数据以捕获 usage 信息
buffer += chunkStr
// 处理完整的 SSE 事件
if (buffer.includes('\n\n')) {
const events = buffer.split('\n\n')
buffer = events.pop() || ''
for (const event of events) {
if (event.trim()) {
parseSSEForUsage(event)
}
}
}
} catch (error) {
logger.error('Error processing stream chunk:', error)
}
})
response.data.on('end', async () => {
streamEnded = true
// 处理剩余的 buffer
if (buffer.trim()) {
parseSSEForUsage(buffer)
}
// 记录使用统计
if (usageData) {
try {
// OpenAI-Responses 使用 input_tokens/output_tokens标准 OpenAI 使用 prompt_tokens/completion_tokens
const inputTokens = usageData.input_tokens || usageData.prompt_tokens || 0
const outputTokens = usageData.output_tokens || usageData.completion_tokens || 0
// 提取缓存相关的 tokens如果存在
const cacheCreateTokens = usageData.input_tokens_details?.cache_creation_tokens || 0
const cacheReadTokens = usageData.input_tokens_details?.cached_tokens || 0
const totalTokens = usageData.total_tokens || inputTokens + outputTokens
const modelToRecord = actualModel || requestedModel || 'gpt-4'
await apiKeyService.recordUsage(
apiKeyData.id,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens,
modelToRecord,
account.id
)
logger.info(
`📊 Recorded usage - Input: ${inputTokens}, Output: ${outputTokens}, CacheRead: ${cacheReadTokens}, CacheCreate: ${cacheCreateTokens}, Total: ${totalTokens}, Model: ${modelToRecord}`
)
// 更新账户的 token 使用统计
await openaiResponsesAccountService.updateAccountUsage(account.id, totalTokens)
// 更新账户使用额度(如果设置了额度限制)
if (parseFloat(account.dailyQuota) > 0) {
// 估算费用根据模型和token数量
const estimatedCost = this._estimateCost(modelToRecord, inputTokens, outputTokens)
await openaiResponsesAccountService.updateUsageQuota(account.id, estimatedCost)
}
} catch (error) {
logger.error('Failed to record usage:', error)
}
}
// 如果在流式响应中检测到限流
if (rateLimitDetected) {
// 使用统一调度器处理限流(与非流式响应保持一致)
const sessionId = req.headers['session_id'] || req.body?.session_id
const sessionHash = sessionId
? crypto.createHash('sha256').update(sessionId).digest('hex')
: null
await unifiedOpenAIScheduler.markAccountRateLimited(
account.id,
'openai-responses',
sessionHash,
rateLimitResetsInSeconds
)
logger.warn(
`🚫 Processing rate limit for OpenAI-Responses account ${account.id} from stream`
)
}
// 清理监听器
req.removeListener('close', handleClientDisconnect)
res.removeListener('close', handleClientDisconnect)
if (!res.destroyed) {
res.end()
}
logger.info('Stream response completed', {
accountId: account.id,
hasUsage: !!usageData,
actualModel: actualModel || 'unknown'
})
})
response.data.on('error', (error) => {
streamEnded = true
logger.error('Stream error:', error)
// 清理监听器
req.removeListener('close', handleClientDisconnect)
res.removeListener('close', handleClientDisconnect)
if (!res.headersSent) {
res.status(502).json({ error: { message: 'Upstream stream error' } })
} else if (!res.destroyed) {
res.end()
}
})
// 处理客户端断开连接
const cleanup = () => {
streamEnded = true
try {
response.data?.unpipe?.(res)
response.data?.destroy?.()
} catch (_) {
// 忽略清理错误
}
}
req.on('close', cleanup)
req.on('aborted', cleanup)
}
// 处理非流式响应
async _handleNormalResponse(response, res, account, apiKeyData, requestedModel) {
const responseData = response.data
// 提取 usage 数据和实际 model
// 支持两种格式:直接的 usage 或嵌套在 response 中的 usage
const usageData = responseData?.usage || responseData?.response?.usage
const actualModel =
responseData?.model || responseData?.response?.model || requestedModel || 'gpt-4'
// 记录使用统计
if (usageData) {
try {
// OpenAI-Responses 使用 input_tokens/output_tokens标准 OpenAI 使用 prompt_tokens/completion_tokens
const inputTokens = usageData.input_tokens || usageData.prompt_tokens || 0
const outputTokens = usageData.output_tokens || usageData.completion_tokens || 0
// 提取缓存相关的 tokens如果存在
const cacheCreateTokens = usageData.input_tokens_details?.cache_creation_tokens || 0
const cacheReadTokens = usageData.input_tokens_details?.cached_tokens || 0
const totalTokens = usageData.total_tokens || inputTokens + outputTokens
await apiKeyService.recordUsage(
apiKeyData.id,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens,
actualModel,
account.id
)
logger.info(
`📊 Recorded non-stream usage - Input: ${inputTokens}, Output: ${outputTokens}, CacheRead: ${cacheReadTokens}, CacheCreate: ${cacheCreateTokens}, Total: ${totalTokens}, Model: ${actualModel}`
)
// 更新账户的 token 使用统计
await openaiResponsesAccountService.updateAccountUsage(account.id, totalTokens)
// 更新账户使用额度(如果设置了额度限制)
if (parseFloat(account.dailyQuota) > 0) {
// 估算费用根据模型和token数量
const estimatedCost = this._estimateCost(actualModel, inputTokens, outputTokens)
await openaiResponsesAccountService.updateUsageQuota(account.id, estimatedCost)
}
} catch (error) {
logger.error('Failed to record usage:', error)
}
}
// 返回响应
res.status(response.status).json(responseData)
logger.info('Normal response completed', {
accountId: account.id,
status: response.status,
hasUsage: !!usageData,
model: actualModel
})
}
// 处理 429 限流错误
async _handle429Error(account, response, isStream = false, sessionHash = null) {
let resetsInSeconds = null
let errorData = null
try {
// 对于429错误响应可能是JSON或SSE格式
if (isStream && response.data && typeof response.data.pipe === 'function') {
// 流式响应需要先收集数据
const chunks = []
await new Promise((resolve, reject) => {
response.data.on('data', (chunk) => chunks.push(chunk))
response.data.on('end', resolve)
response.data.on('error', reject)
// 设置超时防止无限等待
setTimeout(resolve, 5000)
})
const fullResponse = Buffer.concat(chunks).toString()
// 尝试解析SSE格式的错误响应
if (fullResponse.includes('data: ')) {
const lines = fullResponse.split('\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const jsonStr = line.slice(6).trim()
if (jsonStr && jsonStr !== '[DONE]') {
errorData = JSON.parse(jsonStr)
break
}
} catch (e) {
// 继续尝试下一行
}
}
}
}
// 如果SSE解析失败尝试直接解析为JSON
if (!errorData) {
try {
errorData = JSON.parse(fullResponse)
} catch (e) {
logger.error('Failed to parse 429 error response:', e)
logger.debug('Raw response:', fullResponse)
}
}
} else if (response.data && typeof response.data !== 'object') {
// 如果response.data是字符串尝试解析为JSON
try {
errorData = JSON.parse(response.data)
} catch (e) {
logger.error('Failed to parse 429 error response as JSON:', e)
errorData = { error: { message: response.data } }
}
} else if (response.data && typeof response.data === 'object' && !response.data.pipe) {
// 非流式响应,且是对象,直接使用
errorData = response.data
}
// 从响应体中提取重置时间OpenAI 标准格式)
if (errorData && errorData.error) {
if (errorData.error.resets_in_seconds) {
resetsInSeconds = errorData.error.resets_in_seconds
logger.info(
`🕐 Rate limit will reset in ${resetsInSeconds} seconds (${Math.ceil(resetsInSeconds / 60)} minutes / ${Math.ceil(resetsInSeconds / 3600)} hours)`
)
} else if (errorData.error.resets_in) {
// 某些 API 可能使用不同的字段名
resetsInSeconds = parseInt(errorData.error.resets_in)
logger.info(
`🕐 Rate limit will reset in ${resetsInSeconds} seconds (${Math.ceil(resetsInSeconds / 60)} minutes / ${Math.ceil(resetsInSeconds / 3600)} hours)`
)
}
}
if (!resetsInSeconds) {
logger.warn('⚠️ Could not extract reset time from 429 response, using default 60 minutes')
}
} catch (e) {
logger.error('⚠️ Failed to parse rate limit error:', e)
}
// 使用统一调度器标记账户为限流状态与普通OpenAI账号保持一致
await unifiedOpenAIScheduler.markAccountRateLimited(
account.id,
'openai-responses',
sessionHash,
resetsInSeconds
)
logger.warn('OpenAI-Responses account rate limited', {
accountId: account.id,
accountName: account.name,
resetsInSeconds: resetsInSeconds || 'unknown',
resetInMinutes: resetsInSeconds ? Math.ceil(resetsInSeconds / 60) : 60,
resetInHours: resetsInSeconds ? Math.ceil(resetsInSeconds / 3600) : 1
})
// 返回处理后的数据,避免循环引用
return { resetsInSeconds, errorData }
}
// 过滤请求头
_filterRequestHeaders(headers) {
const filtered = {}
const skipHeaders = [
'host',
'content-length',
'authorization',
'x-api-key',
'x-cr-api-key',
'connection',
'upgrade',
'sec-websocket-key',
'sec-websocket-version',
'sec-websocket-extensions'
]
for (const [key, value] of Object.entries(headers)) {
if (!skipHeaders.includes(key.toLowerCase())) {
filtered[key] = value
}
}
return filtered
}
// 估算费用(简化版本,实际应该根据不同的定价模型)
_estimateCost(model, inputTokens, outputTokens) {
// 这是一个简化的费用估算,实际应该根据不同的 API 提供商和模型定价
const rates = {
'gpt-4': { input: 0.03, output: 0.06 }, // per 1K tokens
'gpt-4-turbo': { input: 0.01, output: 0.03 },
'gpt-3.5-turbo': { input: 0.0005, output: 0.0015 },
'claude-3-opus': { input: 0.015, output: 0.075 },
'claude-3-sonnet': { input: 0.003, output: 0.015 },
'claude-3-haiku': { input: 0.00025, output: 0.00125 }
}
// 查找匹配的模型定价
let rate = rates['gpt-3.5-turbo'] // 默认使用 GPT-3.5 的价格
for (const [modelKey, modelRate] of Object.entries(rates)) {
if (model.toLowerCase().includes(modelKey.toLowerCase())) {
rate = modelRate
break
}
}
const inputCost = (inputTokens / 1000) * rate.input
const outputCost = (outputTokens / 1000) * rate.output
return inputCost + outputCost
}
}
module.exports = new OpenAIResponsesRelayService()

View File

@@ -1,4 +1,5 @@
const openaiAccountService = require('./openaiAccountService')
const openaiResponsesAccountService = require('./openaiResponsesAccountService')
const accountGroupService = require('./accountGroupService')
const redis = require('../models/redis')
const logger = require('../utils/logger')
@@ -32,23 +33,53 @@ class UnifiedOpenAIScheduler {
return await this.selectAccountFromGroup(groupId, sessionHash, requestedModel, apiKeyData)
}
// 普通专属账户
const boundAccount = await openaiAccountService.getAccount(apiKeyData.openaiAccountId)
// 普通专属账户 - 根据前缀判断是 OpenAI 还是 OpenAI-Responses 类型
let boundAccount = null
let accountType = 'openai'
// 检查是否有 responses: 前缀(用于区分 OpenAI-Responses 账户)
if (apiKeyData.openaiAccountId.startsWith('responses:')) {
const accountId = apiKeyData.openaiAccountId.replace('responses:', '')
boundAccount = await openaiResponsesAccountService.getAccount(accountId)
accountType = 'openai-responses'
} else {
// 普通 OpenAI 账户
boundAccount = await openaiAccountService.getAccount(apiKeyData.openaiAccountId)
accountType = 'openai'
}
if (
boundAccount &&
(boundAccount.isActive === true || boundAccount.isActive === 'true') &&
boundAccount.status !== 'error'
) {
// 检查是否被限流
const isRateLimited = await this.isAccountRateLimited(boundAccount.id)
if (isRateLimited) {
const errorMsg = `Dedicated account ${boundAccount.name} is currently rate limited`
logger.warn(`⚠️ ${errorMsg}`)
throw new Error(errorMsg)
if (accountType === 'openai') {
const isRateLimited = await this.isAccountRateLimited(boundAccount.id)
if (isRateLimited) {
const errorMsg = `Dedicated account ${boundAccount.name} is currently rate limited`
logger.warn(`⚠️ ${errorMsg}`)
throw new Error(errorMsg)
}
} else if (
accountType === 'openai-responses' &&
boundAccount.rateLimitStatus === 'limited'
) {
// OpenAI-Responses 账户的限流检查
const isRateLimitCleared = await openaiResponsesAccountService.checkAndClearRateLimit(
boundAccount.id
)
if (!isRateLimitCleared) {
const errorMsg = `Dedicated account ${boundAccount.name} is currently rate limited`
logger.warn(`⚠️ ${errorMsg}`)
throw new Error(errorMsg)
}
}
// 专属账户可选的模型检查只有明确配置了supportedModels且不为空才检查
// OpenAI-Responses 账户默认支持所有模型
if (
accountType === 'openai' &&
requestedModel &&
boundAccount.supportedModels &&
boundAccount.supportedModels.length > 0
@@ -62,13 +93,19 @@ class UnifiedOpenAIScheduler {
}
logger.info(
`🎯 Using bound dedicated OpenAI account: ${boundAccount.name} (${apiKeyData.openaiAccountId}) for API key ${apiKeyData.name}`
`🎯 Using bound dedicated ${accountType} account: ${boundAccount.name} (${boundAccount.id}) for API key ${apiKeyData.name}`
)
// 更新账户的最后使用时间
await openaiAccountService.recordUsage(apiKeyData.openaiAccountId, 0)
if (accountType === 'openai') {
await openaiAccountService.recordUsage(boundAccount.id, 0)
} else {
await openaiResponsesAccountService.updateAccount(boundAccount.id, {
lastUsedAt: new Date().toISOString()
})
}
return {
accountId: apiKeyData.openaiAccountId,
accountType: 'openai'
accountId: boundAccount.id,
accountType
}
} else {
// 专属账户不可用时直接报错,不降级到共享池
@@ -230,6 +267,40 @@ class UnifiedOpenAIScheduler {
}
}
// 获取所有 OpenAI-Responses 账户(共享池)
const openaiResponsesAccounts = await openaiResponsesAccountService.getAllAccounts()
for (const account of openaiResponsesAccounts) {
if (
(account.isActive === true || account.isActive === 'true') &&
account.status !== 'error' &&
account.status !== 'rateLimited' &&
(account.accountType === 'shared' || !account.accountType) && // 兼容旧数据
this._isSchedulable(account.schedulable)
) {
// 检查并清除过期的限流状态
const isRateLimitCleared = await openaiResponsesAccountService.checkAndClearRateLimit(
account.id
)
// 如果仍然处于限流状态,跳过
if (account.rateLimitStatus === 'limited' && !isRateLimitCleared) {
logger.debug(`⏭️ Skipping OpenAI-Responses account ${account.name} - rate limited`)
continue
}
// OpenAI-Responses 账户默认支持所有模型
// 因为它们是第三方兼容 API模型支持由第三方决定
availableAccounts.push({
...account,
accountId: account.id,
accountType: 'openai-responses',
priority: parseInt(account.priority) || 50,
lastUsedAt: account.lastUsedAt || '0'
})
}
}
return availableAccounts
}
@@ -262,6 +333,24 @@ class UnifiedOpenAIScheduler {
return false
}
return !(await this.isAccountRateLimited(accountId))
} else if (accountType === 'openai-responses') {
const account = await openaiResponsesAccountService.getAccount(accountId)
if (
!account ||
(account.isActive !== true && account.isActive !== 'true') ||
account.status === 'error'
) {
return false
}
// 检查是否可调度
if (!this._isSchedulable(account.schedulable)) {
logger.info(`🚫 OpenAI-Responses account ${accountId} is not schedulable`)
return false
}
// 检查并清除过期的限流状态
const isRateLimitCleared =
await openaiResponsesAccountService.checkAndClearRateLimit(accountId)
return account.rateLimitStatus !== 'limited' || isRateLimitCleared
}
return false
} catch (error) {
@@ -307,6 +396,18 @@ class UnifiedOpenAIScheduler {
try {
if (accountType === 'openai') {
await openaiAccountService.setAccountRateLimited(accountId, true, resetsInSeconds)
} else if (accountType === 'openai-responses') {
// 对于 OpenAI-Responses 账户,使用与普通 OpenAI 账户类似的处理方式
const duration = resetsInSeconds ? Math.ceil(resetsInSeconds / 60) : null
await openaiResponsesAccountService.markAccountRateLimited(accountId, duration)
// 同时更新调度状态,避免继续被调度
await openaiResponsesAccountService.updateAccount(accountId, {
schedulable: 'false',
rateLimitResetAt: resetsInSeconds
? new Date(Date.now() + resetsInSeconds * 1000).toISOString()
: new Date(Date.now() + 3600000).toISOString() // 默认1小时
})
}
// 删除会话映射
@@ -329,6 +430,17 @@ class UnifiedOpenAIScheduler {
try {
if (accountType === 'openai') {
await openaiAccountService.setAccountRateLimited(accountId, false)
} else if (accountType === 'openai-responses') {
// 清除 OpenAI-Responses 账户的限流状态
await openaiResponsesAccountService.updateAccount(accountId, {
rateLimitedAt: '',
rateLimitStatus: '',
rateLimitResetAt: '',
status: 'active',
errorMessage: '',
schedulable: 'true'
})
logger.info(`✅ Rate limit cleared for OpenAI-Responses account ${accountId}`)
}
return { success: true }

View File

@@ -58,6 +58,24 @@ class WebhookNotifier {
}
}
/**
* 发送账号事件通知
* @param {string} eventType - 事件类型 (account.created, account.updated, account.deleted, account.status_changed)
* @param {Object} data - 事件数据
*/
async sendAccountEvent(eventType, data) {
try {
// 使用webhookService发送通知
await webhookService.sendNotification('accountEvent', {
eventType,
...data,
timestamp: data.timestamp || getISOStringWithTimezone(new Date())
})
} catch (error) {
logger.error(`Failed to send account event (${eventType}):`, error)
}
}
/**
* 获取错误代码映射
* @param {string} platform - 平台类型