[feat/cron-test-support]done.

This commit is contained in:
guoyongchang
2025-12-19 10:25:43 +08:00
parent 53cda0fd18
commit 09cf951cdc
10 changed files with 1571 additions and 25 deletions

View File

@@ -661,6 +661,19 @@ class Application {
'🚦 Skipping concurrency queue cleanup on startup (CLEAR_CONCURRENCY_QUEUES_ON_STARTUP=false)'
)
}
// 🧪 启动账户定时测试调度器
// 根据配置定期测试账户连通性并保存测试历史
const accountTestSchedulerEnabled =
process.env.ACCOUNT_TEST_SCHEDULER_ENABLED !== 'false' &&
config.accountTestScheduler?.enabled !== false
if (accountTestSchedulerEnabled) {
const accountTestSchedulerService = require('./services/accountTestSchedulerService')
accountTestSchedulerService.start()
logger.info('🧪 Account test scheduler service started')
} else {
logger.info('🧪 Account test scheduler service disabled')
}
}
setupGracefulShutdown() {
@@ -715,6 +728,15 @@ class Application {
logger.error('❌ Error stopping cost rank service:', error)
}
// 停止账户定时测试调度器
try {
const accountTestSchedulerService = require('./services/accountTestSchedulerService')
accountTestSchedulerService.stop()
logger.info('🧪 Account test scheduler service stopped')
} catch (error) {
logger.error('❌ Error stopping account test scheduler service:', error)
}
// 🔢 清理所有并发计数Phase 1 修复:防止重启泄漏)
try {
logger.info('🔢 Cleaning up all concurrency counters...')

View File

@@ -96,7 +96,25 @@ class RedisClient {
logger.warn('⚠️ Redis connection closed')
})
await this.client.connect()
// 只有在 lazyConnect 模式下才需要手动调用 connect()
// 如果 Redis 已经连接或正在连接中,则跳过
if (
this.client.status !== 'connecting' &&
this.client.status !== 'connect' &&
this.client.status !== 'ready'
) {
await this.client.connect()
} else {
// 等待 ready 状态
await new Promise((resolve, reject) => {
if (this.client.status === 'ready') {
resolve()
} else {
this.client.once('ready', resolve)
this.client.once('error', reject)
}
})
}
return this.client
} catch (error) {
logger.error('💥 Failed to connect to Redis:', error)
@@ -3157,4 +3175,246 @@ redisClient.scanConcurrencyQueueStatsKeys = async function () {
}
}
// ============================================================================
// 账户测试历史相关操作
// ============================================================================
const ACCOUNT_TEST_HISTORY_MAX = 5 // 保留最近5次测试记录
const ACCOUNT_TEST_HISTORY_TTL = 86400 * 30 // 30天过期
/**
* 保存账户测试结果
* @param {string} accountId - 账户ID
* @param {string} platform - 平台类型 (claude/gemini/openai等)
* @param {Object} testResult - 测试结果对象
* @param {boolean} testResult.success - 是否成功
* @param {string} testResult.message - 测试消息/响应
* @param {number} testResult.latencyMs - 延迟毫秒数
* @param {string} testResult.error - 错误信息(如有)
* @param {string} testResult.timestamp - 测试时间戳
*/
redisClient.saveAccountTestResult = async function (accountId, platform, testResult) {
const key = `account:test_history:${platform}:${accountId}`
try {
const record = JSON.stringify({
...testResult,
timestamp: testResult.timestamp || new Date().toISOString()
})
// 使用 LPUSH + LTRIM 保持最近5条记录
const client = this.getClientSafe()
await client.lpush(key, record)
await client.ltrim(key, 0, ACCOUNT_TEST_HISTORY_MAX - 1)
await client.expire(key, ACCOUNT_TEST_HISTORY_TTL)
logger.debug(`📝 Saved test result for ${platform} account ${accountId}`)
} catch (error) {
logger.error(`Failed to save test result for ${accountId}:`, error)
}
}
/**
* 获取账户测试历史
* @param {string} accountId - 账户ID
* @param {string} platform - 平台类型
* @returns {Promise<Array>} 测试历史记录数组(最新在前)
*/
redisClient.getAccountTestHistory = async function (accountId, platform) {
const key = `account:test_history:${platform}:${accountId}`
try {
const client = this.getClientSafe()
const records = await client.lrange(key, 0, -1)
return records.map((r) => JSON.parse(r))
} catch (error) {
logger.error(`Failed to get test history for ${accountId}:`, error)
return []
}
}
/**
* 获取账户最新测试结果
* @param {string} accountId - 账户ID
* @param {string} platform - 平台类型
* @returns {Promise<Object|null>} 最新测试结果
*/
redisClient.getAccountLatestTestResult = async function (accountId, platform) {
const key = `account:test_history:${platform}:${accountId}`
try {
const client = this.getClientSafe()
const record = await client.lindex(key, 0)
return record ? JSON.parse(record) : null
} catch (error) {
logger.error(`Failed to get latest test result for ${accountId}:`, error)
return null
}
}
/**
* 批量获取多个账户的测试历史
* @param {Array<{accountId: string, platform: string}>} accounts - 账户列表
* @returns {Promise<Object>} 以 accountId 为 key 的测试历史映射
*/
redisClient.getAccountsTestHistory = async function (accounts) {
const result = {}
try {
const client = this.getClientSafe()
const pipeline = client.pipeline()
for (const { accountId, platform } of accounts) {
const key = `account:test_history:${platform}:${accountId}`
pipeline.lrange(key, 0, -1)
}
const responses = await pipeline.exec()
accounts.forEach(({ accountId }, index) => {
const [err, records] = responses[index]
if (!err && records) {
result[accountId] = records.map((r) => JSON.parse(r))
} else {
result[accountId] = []
}
})
} catch (error) {
logger.error('Failed to get batch test history:', error)
}
return result
}
/**
* 保存定时测试配置
* @param {string} accountId - 账户ID
* @param {string} platform - 平台类型
* @param {Object} config - 配置对象
* @param {boolean} config.enabled - 是否启用定时测试
* @param {string} config.cronExpression - Cron 表达式 (如 "0 8 * * *" 表示每天8点)
* @param {string} config.model - 测试使用的模型
*/
redisClient.saveAccountTestConfig = async function (accountId, platform, testConfig) {
const key = `account:test_config:${platform}:${accountId}`
try {
const client = this.getClientSafe()
await client.hset(key, {
enabled: testConfig.enabled ? 'true' : 'false',
cronExpression: testConfig.cronExpression || '0 8 * * *', // 默认每天早上8点
model: testConfig.model || 'claude-sonnet-4-5-20250929', // 默认模型
updatedAt: new Date().toISOString()
})
} catch (error) {
logger.error(`Failed to save test config for ${accountId}:`, error)
}
}
/**
* 获取定时测试配置
* @param {string} accountId - 账户ID
* @param {string} platform - 平台类型
* @returns {Promise<Object|null>} 配置对象
*/
redisClient.getAccountTestConfig = async function (accountId, platform) {
const key = `account:test_config:${platform}:${accountId}`
try {
const client = this.getClientSafe()
const testConfig = await client.hgetall(key)
if (!testConfig || Object.keys(testConfig).length === 0) {
return null
}
// 向后兼容:如果存在旧的 testHour 字段,转换为 cron 表达式
let { cronExpression } = testConfig
if (!cronExpression && testConfig.testHour) {
const hour = parseInt(testConfig.testHour, 10)
cronExpression = `0 ${hour} * * *`
}
return {
enabled: testConfig.enabled === 'true',
cronExpression: cronExpression || '0 8 * * *',
model: testConfig.model || 'claude-sonnet-4-5-20250929',
updatedAt: testConfig.updatedAt
}
} catch (error) {
logger.error(`Failed to get test config for ${accountId}:`, error)
return null
}
}
/**
* 获取所有启用定时测试的账户
* @param {string} platform - 平台类型
* @returns {Promise<Array>} 账户ID列表及 cron 配置
*/
redisClient.getEnabledTestAccounts = async function (platform) {
const accountIds = []
let cursor = '0'
try {
const client = this.getClientSafe()
do {
const [newCursor, keys] = await client.scan(
cursor,
'MATCH',
`account:test_config:${platform}:*`,
'COUNT',
100
)
cursor = newCursor
for (const key of keys) {
const testConfig = await client.hgetall(key)
if (testConfig && testConfig.enabled === 'true') {
const accountId = key.replace(`account:test_config:${platform}:`, '')
// 向后兼容:如果存在旧的 testHour 字段,转换为 cron 表达式
let { cronExpression } = testConfig
if (!cronExpression && testConfig.testHour) {
const hour = parseInt(testConfig.testHour, 10)
cronExpression = `0 ${hour} * * *`
}
accountIds.push({
accountId,
cronExpression: cronExpression || '0 8 * * *',
model: testConfig.model || 'claude-sonnet-4-5-20250929'
})
}
}
} while (cursor !== '0')
return accountIds
} catch (error) {
logger.error(`Failed to get enabled test accounts for ${platform}:`, error)
return []
}
}
/**
* 保存账户上次测试时间(用于调度器判断是否需要测试)
* @param {string} accountId - 账户ID
* @param {string} platform - 平台类型
*/
redisClient.setAccountLastTestTime = async function (accountId, platform) {
const key = `account:last_test:${platform}:${accountId}`
try {
const client = this.getClientSafe()
await client.set(key, Date.now().toString(), 'EX', 86400 * 7) // 7天过期
} catch (error) {
logger.error(`Failed to set last test time for ${accountId}:`, error)
}
}
/**
* 获取账户上次测试时间
* @param {string} accountId - 账户ID
* @param {string} platform - 平台类型
* @returns {Promise<number|null>} 上次测试时间戳
*/
redisClient.getAccountLastTestTime = async function (accountId, platform) {
const key = `account:last_test:${platform}:${accountId}`
try {
const client = this.getClientSafe()
const timestamp = await client.get(key)
return timestamp ? parseInt(timestamp, 10) : null
} catch (error) {
logger.error(`Failed to get last test time for ${accountId}:`, error)
return null
}
}
module.exports = redisClient

View File

@@ -903,4 +903,199 @@ router.post('/claude-accounts/:accountId/test', authenticateAdmin, async (req, r
}
})
// ============================================================================
// 账户定时测试相关端点
// ============================================================================
// 获取账户测试历史
router.get('/claude-accounts/:accountId/test-history', authenticateAdmin, async (req, res) => {
const { accountId } = req.params
try {
const history = await redis.getAccountTestHistory(accountId, 'claude')
return res.json({
success: true,
data: {
accountId,
platform: 'claude',
history
}
})
} catch (error) {
logger.error(`❌ Failed to get test history for account ${accountId}:`, error)
return res.status(500).json({
error: 'Failed to get test history',
message: error.message
})
}
})
// 获取账户定时测试配置
router.get('/claude-accounts/:accountId/test-config', authenticateAdmin, async (req, res) => {
const { accountId } = req.params
try {
const testConfig = await redis.getAccountTestConfig(accountId, 'claude')
return res.json({
success: true,
data: {
accountId,
platform: 'claude',
config: testConfig || { enabled: false, cronExpression: '0 8 * * *', model: 'claude-sonnet-4-5-20250929' }
}
})
} catch (error) {
logger.error(`❌ Failed to get test config for account ${accountId}:`, error)
return res.status(500).json({
error: 'Failed to get test config',
message: error.message
})
}
})
// 设置账户定时测试配置
router.put('/claude-accounts/:accountId/test-config', authenticateAdmin, async (req, res) => {
const { accountId } = req.params
const { enabled, cronExpression, model } = req.body
try {
// 验证参数
if (typeof enabled !== 'boolean') {
return res.status(400).json({
error: 'Invalid parameter',
message: 'enabled must be a boolean'
})
}
// 验证 cron 表达式
if (!cronExpression || typeof cronExpression !== 'string') {
return res.status(400).json({
error: 'Invalid parameter',
message: 'cronExpression is required and must be a string'
})
}
// 使用 node-cron 验证表达式
const cron = require('node-cron')
if (!cron.validate(cronExpression)) {
return res.status(400).json({
error: 'Invalid parameter',
message: `Invalid cron expression: ${cronExpression}. Format: "minute hour day month weekday" (e.g., "0 8 * * *" for daily at 8:00)`
})
}
// 验证模型(可选,有默认值)
const testModel = model || 'claude-sonnet-4-5-20250929'
// 检查账户是否存在
const account = await claudeAccountService.getAccount(accountId)
if (!account) {
return res.status(404).json({
error: 'Account not found'
})
}
// 保存配置
await redis.saveAccountTestConfig(accountId, 'claude', {
enabled,
cronExpression,
model: testModel
})
logger.success(
`📝 Updated test config for Claude account ${accountId}: enabled=${enabled}, cronExpression=${cronExpression}, model=${testModel}`
)
return res.json({
success: true,
message: 'Test config updated successfully',
data: {
accountId,
platform: 'claude',
config: { enabled, cronExpression, model: testModel }
}
})
} catch (error) {
logger.error(`❌ Failed to update test config for account ${accountId}:`, error)
return res.status(500).json({
error: 'Failed to update test config',
message: error.message
})
}
})
// 手动触发账户测试非流式返回JSON结果
router.post('/claude-accounts/:accountId/test-sync', authenticateAdmin, async (req, res) => {
const { accountId } = req.params
try {
// 检查账户是否存在
const account = await claudeAccountService.getAccount(accountId)
if (!account) {
return res.status(404).json({
error: 'Account not found'
})
}
logger.info(`🧪 Manual sync test triggered for Claude account: ${accountId}`)
// 执行测试
const testResult = await claudeRelayService.testAccountConnectionSync(accountId)
// 保存测试结果到历史
await redis.saveAccountTestResult(accountId, 'claude', testResult)
await redis.setAccountLastTestTime(accountId, 'claude')
return res.json({
success: true,
data: {
accountId,
platform: 'claude',
result: testResult
}
})
} catch (error) {
logger.error(`❌ Failed to run sync test for account ${accountId}:`, error)
return res.status(500).json({
error: 'Failed to run test',
message: error.message
})
}
})
// 批量获取多个账户的测试历史
router.post('/claude-accounts/batch-test-history', authenticateAdmin, async (req, res) => {
const { accountIds } = req.body
try {
if (!Array.isArray(accountIds) || accountIds.length === 0) {
return res.status(400).json({
error: 'Invalid parameter',
message: 'accountIds must be a non-empty array'
})
}
// 限制批量查询数量
const limitedIds = accountIds.slice(0, 100)
const accounts = limitedIds.map((accountId) => ({
accountId,
platform: 'claude'
}))
const historyMap = await redis.getAccountsTestHistory(accounts)
return res.json({
success: true,
data: historyMap
})
} catch (error) {
logger.error('❌ Failed to get batch test history:', error)
return res.status(500).json({
error: 'Failed to get batch test history',
message: error.message
})
}
})
module.exports = router

View File

@@ -0,0 +1,411 @@
/**
* 账户定时测试调度服务
* 使用 node-cron 支持 crontab 表达式,为每个账户创建独立的定时任务
*/
const cron = require('node-cron')
const redis = require('../models/redis')
const logger = require('../utils/logger')
class AccountTestSchedulerService {
constructor() {
// 存储每个账户的 cron 任务: Map<string, { task: ScheduledTask, cronExpression: string }>
this.scheduledTasks = new Map()
// 定期刷新配置的间隔 (毫秒)
this.refreshIntervalMs = 60 * 1000
this.refreshInterval = null
// 测试并发限制
this.maxConcurrentTests = 3
// 当前正在测试的账户
this.testingAccounts = new Set()
// 是否已启动
this.isStarted = false
}
/**
* 验证 cron 表达式是否有效
* @param {string} cronExpression - cron 表达式
* @returns {boolean}
*/
validateCronExpression(cronExpression) {
return cron.validate(cronExpression)
}
/**
* 启动调度器
*/
async start() {
if (this.isStarted) {
logger.warn('⚠️ Account test scheduler is already running')
return
}
this.isStarted = true
logger.info('🚀 Starting account test scheduler service (node-cron mode)')
// 初始化所有已配置账户的定时任务
await this._refreshAllTasks()
// 定期刷新配置,以便动态添加/修改的配置能生效
this.refreshInterval = setInterval(() => {
this._refreshAllTasks()
}, this.refreshIntervalMs)
logger.info(
`📅 Account test scheduler started (refreshing configs every ${this.refreshIntervalMs / 1000}s)`
)
}
/**
* 停止调度器
*/
stop() {
if (this.refreshInterval) {
clearInterval(this.refreshInterval)
this.refreshInterval = null
}
// 停止所有 cron 任务
for (const [accountKey, taskInfo] of this.scheduledTasks.entries()) {
taskInfo.task.stop()
logger.debug(`🛑 Stopped cron task for ${accountKey}`)
}
this.scheduledTasks.clear()
this.isStarted = false
logger.info('🛑 Account test scheduler stopped')
}
/**
* 刷新所有账户的定时任务
* @private
*/
async _refreshAllTasks() {
try {
const platforms = ['claude', 'gemini', 'openai']
const activeAccountKeys = new Set()
for (const platform of platforms) {
const enabledAccounts = await redis.getEnabledTestAccounts(platform)
for (const { accountId, cronExpression, model } of enabledAccounts) {
if (!cronExpression) {
logger.warn(
`⚠️ Account ${accountId} (${platform}) has no valid cron expression, skipping`
)
continue
}
const accountKey = `${platform}:${accountId}`
activeAccountKeys.add(accountKey)
// 检查是否需要更新任务
const existingTask = this.scheduledTasks.get(accountKey)
if (existingTask) {
// 如果 cron 表达式和模型都没变,不需要更新
if (existingTask.cronExpression === cronExpression && existingTask.model === model) {
continue
}
// 配置变了,停止旧任务
existingTask.task.stop()
logger.info(
`🔄 Updating cron task for ${accountKey}: ${cronExpression}, model: ${model}`
)
} else {
logger.info(
` Creating cron task for ${accountKey}: ${cronExpression}, model: ${model}`
)
}
// 创建新的 cron 任务
this._createCronTask(accountId, platform, cronExpression, model)
}
}
// 清理已删除或禁用的账户任务
for (const [accountKey, taskInfo] of this.scheduledTasks.entries()) {
if (!activeAccountKeys.has(accountKey)) {
taskInfo.task.stop()
this.scheduledTasks.delete(accountKey)
logger.info(` Removed cron task for ${accountKey} (disabled or deleted)`)
}
}
} catch (error) {
logger.error('❌ Error refreshing account test tasks:', error)
}
}
/**
* 为单个账户创建 cron 任务
* @param {string} accountId
* @param {string} platform
* @param {string} cronExpression
* @param {string} model - 测试使用的模型
* @private
*/
_createCronTask(accountId, platform, cronExpression, model) {
const accountKey = `${platform}:${accountId}`
// 验证 cron 表达式
if (!this.validateCronExpression(cronExpression)) {
logger.error(`❌ Invalid cron expression for ${accountKey}: ${cronExpression}`)
return
}
const task = cron.schedule(
cronExpression,
async () => {
await this._runAccountTest(accountId, platform, model)
},
{
scheduled: true,
timezone: process.env.TZ || 'Asia/Shanghai'
}
)
this.scheduledTasks.set(accountKey, {
task,
cronExpression,
model,
accountId,
platform
})
}
/**
* 执行单个账户测试
* @param {string} accountId - 账户ID
* @param {string} platform - 平台类型
* @param {string} model - 测试使用的模型
* @private
*/
async _runAccountTest(accountId, platform, model) {
const accountKey = `${platform}:${accountId}`
// 避免重复测试
if (this.testingAccounts.has(accountKey)) {
logger.debug(`⏳ Account ${accountKey} is already being tested, skipping`)
return
}
this.testingAccounts.add(accountKey)
try {
logger.info(
`🧪 Running scheduled test for ${platform} account: ${accountId} (model: ${model})`
)
let testResult
// 根据平台调用对应的测试方法
switch (platform) {
case 'claude':
testResult = await this._testClaudeAccount(accountId, model)
break
case 'gemini':
testResult = await this._testGeminiAccount(accountId, model)
break
case 'openai':
testResult = await this._testOpenAIAccount(accountId, model)
break
default:
testResult = {
success: false,
error: `Unsupported platform: ${platform}`,
timestamp: new Date().toISOString()
}
}
// 保存测试结果
await redis.saveAccountTestResult(accountId, platform, testResult)
// 更新最后测试时间
await redis.setAccountLastTestTime(accountId, platform)
// 记录日志
if (testResult.success) {
logger.info(
`✅ Scheduled test passed for ${platform} account ${accountId} (${testResult.latencyMs}ms)`
)
} else {
logger.warn(
`❌ Scheduled test failed for ${platform} account ${accountId}: ${testResult.error}`
)
}
return testResult
} catch (error) {
logger.error(`❌ Error testing ${platform} account ${accountId}:`, error)
const errorResult = {
success: false,
error: error.message,
timestamp: new Date().toISOString()
}
await redis.saveAccountTestResult(accountId, platform, errorResult)
await redis.setAccountLastTestTime(accountId, platform)
return errorResult
} finally {
this.testingAccounts.delete(accountKey)
}
}
/**
* 测试 Claude 账户
* @param {string} accountId
* @param {string} model - 测试使用的模型
* @private
*/
async _testClaudeAccount(accountId, model) {
const claudeRelayService = require('./claudeRelayService')
return await claudeRelayService.testAccountConnectionSync(accountId, model)
}
/**
* 测试 Gemini 账户
* @param {string} _accountId
* @param {string} _model
* @private
*/
async _testGeminiAccount(_accountId, _model) {
// Gemini 测试暂时返回未实现
return {
success: false,
error: 'Gemini scheduled test not implemented yet',
timestamp: new Date().toISOString()
}
}
/**
* 测试 OpenAI 账户
* @param {string} _accountId
* @param {string} _model
* @private
*/
async _testOpenAIAccount(_accountId, _model) {
// OpenAI 测试暂时返回未实现
return {
success: false,
error: 'OpenAI scheduled test not implemented yet',
timestamp: new Date().toISOString()
}
}
/**
* 手动触发账户测试
* @param {string} accountId - 账户ID
* @param {string} platform - 平台类型
* @param {string} model - 测试使用的模型
* @returns {Promise<Object>} 测试结果
*/
async triggerTest(accountId, platform, model = 'claude-sonnet-4-5-20250929') {
logger.info(`🎯 Manual test triggered for ${platform} account: ${accountId} (model: ${model})`)
return await this._runAccountTest(accountId, platform, model)
}
/**
* 获取账户测试历史
* @param {string} accountId - 账户ID
* @param {string} platform - 平台类型
* @returns {Promise<Array>} 测试历史
*/
async getTestHistory(accountId, platform) {
return await redis.getAccountTestHistory(accountId, platform)
}
/**
* 获取账户测试配置
* @param {string} accountId - 账户ID
* @param {string} platform - 平台类型
* @returns {Promise<Object|null>}
*/
async getTestConfig(accountId, platform) {
return await redis.getAccountTestConfig(accountId, platform)
}
/**
* 设置账户测试配置
* @param {string} accountId - 账户ID
* @param {string} platform - 平台类型
* @param {Object} testConfig - 测试配置 { enabled: boolean, cronExpression: string, model: string }
* @returns {Promise<void>}
*/
async setTestConfig(accountId, platform, testConfig) {
// 验证 cron 表达式
if (testConfig.cronExpression && !this.validateCronExpression(testConfig.cronExpression)) {
throw new Error(`Invalid cron expression: ${testConfig.cronExpression}`)
}
await redis.saveAccountTestConfig(accountId, platform, testConfig)
logger.info(
`📝 Test config updated for ${platform} account ${accountId}: enabled=${testConfig.enabled}, cronExpression=${testConfig.cronExpression}, model=${testConfig.model}`
)
// 立即刷新任务,使配置立即生效
if (this.isStarted) {
await this._refreshAllTasks()
}
}
/**
* 更新单个账户的定时任务(配置变更时调用)
* @param {string} accountId
* @param {string} platform
*/
async refreshAccountTask(accountId, platform) {
if (!this.isStarted) {
return
}
const accountKey = `${platform}:${accountId}`
const testConfig = await redis.getAccountTestConfig(accountId, platform)
// 停止现有任务
const existingTask = this.scheduledTasks.get(accountKey)
if (existingTask) {
existingTask.task.stop()
this.scheduledTasks.delete(accountKey)
}
// 如果启用且有有效的 cron 表达式,创建新任务
if (testConfig?.enabled && testConfig?.cronExpression) {
this._createCronTask(accountId, platform, testConfig.cronExpression, testConfig.model)
logger.info(
`🔄 Refreshed cron task for ${accountKey}: ${testConfig.cronExpression}, model: ${testConfig.model}`
)
}
}
/**
* 获取调度器状态
* @returns {Object}
*/
getStatus() {
const tasks = []
for (const [accountKey, taskInfo] of this.scheduledTasks.entries()) {
tasks.push({
accountKey,
accountId: taskInfo.accountId,
platform: taskInfo.platform,
cronExpression: taskInfo.cronExpression,
model: taskInfo.model
})
}
return {
running: this.isStarted,
refreshIntervalMs: this.refreshIntervalMs,
maxConcurrentTests: this.maxConcurrentTests,
scheduledTasksCount: this.scheduledTasks.size,
scheduledTasks: tasks,
currentlyTesting: Array.from(this.testingAccounts)
}
}
}
// 单例模式
const accountTestSchedulerService = new AccountTestSchedulerService()
module.exports = accountTestSchedulerService

View File

@@ -2456,28 +2456,35 @@ class ClaudeRelayService {
}
}
// 🔧 准备测试请求的公共逻辑(供 testAccountConnection 和 testAccountConnectionSync 共用)
async _prepareAccountForTest(accountId) {
// 获取账户信息
const account = await claudeAccountService.getAccount(accountId)
if (!account) {
throw new Error('Account not found')
}
// 获取有效的访问token
const accessToken = await claudeAccountService.getValidAccessToken(accountId)
if (!accessToken) {
throw new Error('Failed to get valid access token')
}
// 获取代理配置
const proxyAgent = await this._getProxyAgent(accountId)
return { account, accessToken, proxyAgent }
}
// 🧪 测试账号连接供Admin API使用直接复用 _makeClaudeStreamRequestWithUsageCapture
async testAccountConnection(accountId, responseStream) {
const testRequestBody = createClaudeTestPayload('claude-sonnet-4-5-20250929', { stream: true })
async testAccountConnection(accountId, responseStream, model = 'claude-sonnet-4-5-20250929') {
const testRequestBody = createClaudeTestPayload(model, { stream: true })
try {
// 获取账户信息
const account = await claudeAccountService.getAccount(accountId)
if (!account) {
throw new Error('Account not found')
}
const { account, accessToken, proxyAgent } = await this._prepareAccountForTest(accountId)
logger.info(`🧪 Testing Claude account connection: ${account.name} (${accountId})`)
// 获取有效的访问token
const accessToken = await claudeAccountService.getValidAccessToken(accountId)
if (!accessToken) {
throw new Error('Failed to get valid access token')
}
// 获取代理配置
const proxyAgent = await this._getProxyAgent(accountId)
// 设置响应头
if (!responseStream.headersSent) {
const existingConnection = responseStream.getHeader
@@ -2526,6 +2533,125 @@ class ClaudeRelayService {
}
}
// 🧪 非流式测试账号连接(供定时任务使用)
// 复用流式请求方法,收集结果后返回
async testAccountConnectionSync(accountId, model = 'claude-sonnet-4-5-20250929') {
const testRequestBody = createClaudeTestPayload(model, { stream: true })
const startTime = Date.now()
try {
// 使用公共方法准备测试所需的账户信息、token 和代理
const { account, accessToken, proxyAgent } = await this._prepareAccountForTest(accountId)
logger.info(`🧪 Testing Claude account connection (sync): ${account.name} (${accountId})`)
// 创建一个收集器来捕获流式响应
let responseText = ''
let capturedUsage = null
let capturedModel = model
let hasError = false
let errorMessage = ''
// 创建模拟的响应流对象
const mockResponseStream = {
headersSent: true, // 跳过设置响应头
write: (data) => {
// 解析 SSE 数据
if (typeof data === 'string' && data.startsWith('data: ')) {
try {
const jsonStr = data.replace('data: ', '').trim()
if (jsonStr && jsonStr !== '[DONE]') {
const parsed = JSON.parse(jsonStr)
// 提取文本内容
if (parsed.type === 'content_block_delta' && parsed.delta?.text) {
responseText += parsed.delta.text
}
// 提取 usage 信息
if (parsed.type === 'message_delta' && parsed.usage) {
capturedUsage = parsed.usage
}
// 提取模型信息
if (parsed.type === 'message_start' && parsed.message?.model) {
capturedModel = parsed.message.model
}
// 检测错误
if (parsed.type === 'error') {
hasError = true
errorMessage = parsed.error?.message || 'Unknown error'
}
}
} catch {
// 忽略解析错误
}
}
return true
},
end: () => {},
on: () => {},
once: () => {},
emit: () => {},
writable: true
}
// 复用流式请求方法
await this._makeClaudeStreamRequestWithUsageCapture(
testRequestBody,
accessToken,
proxyAgent,
{}, // clientHeaders - 测试不需要客户端headers
mockResponseStream,
null, // usageCallback - 测试不需要统计
accountId,
'claude-official', // accountType
null, // sessionHash - 测试不需要会话
null, // streamTransformer - 不需要转换,直接解析原始格式
{}, // requestOptions
false // isDedicatedOfficialAccount
)
const latencyMs = Date.now() - startTime
if (hasError) {
logger.warn(`⚠️ Test completed with error for account: ${account.name} - ${errorMessage}`)
return {
success: false,
error: errorMessage,
latencyMs,
timestamp: new Date().toISOString()
}
}
logger.info(`✅ Test completed for account: ${account.name} (${latencyMs}ms)`)
return {
success: true,
message: responseText.substring(0, 200), // 截取前200字符
latencyMs,
model: capturedModel,
usage: capturedUsage,
timestamp: new Date().toISOString()
}
} catch (error) {
const latencyMs = Date.now() - startTime
logger.error(`❌ Test account connection (sync) failed:`, error.message)
// 提取错误详情
let errorMessage = error.message
if (error.response) {
errorMessage =
error.response.data?.error?.message || error.response.statusText || error.message
}
return {
success: false,
error: errorMessage,
statusCode: error.response?.status,
latencyMs,
timestamp: new Date().toISOString()
}
}
}
// 🎯 健康检查
async healthCheck() {
try {