diff --git a/scripts/test-billing-events.js b/scripts/test-billing-events.js new file mode 100755 index 00000000..68edc3f6 --- /dev/null +++ b/scripts/test-billing-events.js @@ -0,0 +1,340 @@ +#!/usr/bin/env node + +/** + * 计费事件测试脚本 + * + * 用于测试计费事件的发布和消费功能 + * + * 使用方法: + * node scripts/test-billing-events.js [command] + * + * 命令: + * publish - 发布测试事件 + * consume - 消费事件(测试模式) + * info - 查看队列状态 + * clear - 清空队列(危险操作) + */ + +const path = require('path') +const Redis = require('ioredis') + +// 加载配置 +require('dotenv').config({ path: path.join(__dirname, '../.env') }) + +const config = { + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT) || 6379, + password: process.env.REDIS_PASSWORD || '', + db: parseInt(process.env.REDIS_DB) || 0 +} + +const redis = new Redis(config) +const STREAM_KEY = 'billing:events' + +// ======================================== +// 命令实现 +// ======================================== + +/** + * 发布测试事件 + */ +async function publishTestEvent() { + console.log('📤 Publishing test billing event...') + + const testEvent = { + eventId: `test-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, + eventType: 'usage.recorded', + timestamp: new Date().toISOString(), + version: '1.0', + apiKey: { + id: 'test-key-123', + name: 'Test API Key', + userId: 'test-user-456' + }, + usage: { + model: 'claude-sonnet-4-20250514', + inputTokens: 1500, + outputTokens: 800, + cacheCreateTokens: 200, + cacheReadTokens: 100, + ephemeral5mTokens: 150, + ephemeral1hTokens: 50, + totalTokens: 2600 + }, + cost: { + total: 0.0156, + currency: 'USD', + breakdown: { + input: 0.0045, + output: 0.012, + cacheCreate: 0.00075, + cacheRead: 0.00003, + ephemeral5m: 0.0005625, + ephemeral1h: 0.0001875 + } + }, + account: { + id: 'test-account-789', + type: 'claude-official' + }, + context: { + isLongContext: false, + requestTimestamp: new Date().toISOString() + } + } + + try { + const messageId = await redis.xadd( + STREAM_KEY, + 'MAXLEN', + '~', + 100000, + '*', + 'data', + JSON.stringify(testEvent) + ) + + console.log('✅ Event published successfully!') + console.log(` Message ID: ${messageId}`) + console.log(` Event ID: ${testEvent.eventId}`) + console.log(` Cost: $${testEvent.cost.total}`) + } catch (error) { + console.error('❌ Failed to publish event:', error.message) + process.exit(1) + } +} + +/** + * 消费事件(测试模式,不创建消费者组) + */ +async function consumeTestEvents() { + console.log('📬 Consuming test events...') + console.log(' Press Ctrl+C to stop\n') + + let isRunning = true + + process.on('SIGINT', () => { + console.log('\n⏹️ Stopping consumer...') + isRunning = false + }) + + let lastId = '0' // 从头开始 + + while (isRunning) { + try { + // 使用 XREAD 而不是 XREADGROUP(测试模式) + const messages = await redis.xread('BLOCK', 5000, 'COUNT', 10, 'STREAMS', STREAM_KEY, lastId) + + if (!messages || messages.length === 0) { + continue + } + + const [streamKey, entries] = messages[0] + console.log(`📬 Received ${entries.length} messages from ${streamKey}\n`) + + for (const [messageId, fields] of entries) { + try { + const data = {} + for (let i = 0; i < fields.length; i += 2) { + data[fields[i]] = fields[i + 1] + } + + const event = JSON.parse(data.data) + + console.log(`📊 Event: ${event.eventId}`) + console.log(` API Key: ${event.apiKey.name} (${event.apiKey.id})`) + console.log(` Model: ${event.usage.model}`) + console.log(` Tokens: ${event.usage.totalTokens}`) + console.log(` Cost: $${event.cost.total.toFixed(6)}`) + console.log(` Timestamp: ${event.timestamp}`) + console.log('') + + lastId = messageId // 更新位置 + } catch (parseError) { + console.error(`❌ Failed to parse message ${messageId}:`, parseError.message) + } + } + } catch (error) { + if (isRunning) { + console.error('❌ Error consuming messages:', error.message) + await new Promise((resolve) => setTimeout(resolve, 5000)) + } + } + } + + console.log('👋 Consumer stopped') +} + +/** + * 查看队列状态 + */ +async function showQueueInfo() { + console.log('📊 Queue Information\n') + + try { + // Stream 长度 + const length = await redis.xlen(STREAM_KEY) + console.log(`Stream: ${STREAM_KEY}`) + console.log(`Length: ${length} messages\n`) + + if (length === 0) { + console.log('ℹ️ Queue is empty') + return + } + + // Stream 详细信息 + const info = await redis.xinfo('STREAM', STREAM_KEY) + const infoObj = {} + for (let i = 0; i < info.length; i += 2) { + infoObj[info[i]] = info[i + 1] + } + + console.log('Stream Details:') + console.log(` First Entry ID: ${infoObj['first-entry'] ? infoObj['first-entry'][0] : 'N/A'}`) + console.log(` Last Entry ID: ${infoObj['last-entry'] ? infoObj['last-entry'][0] : 'N/A'}`) + console.log(` Consumer Groups: ${infoObj.groups || 0}\n`) + + // 消费者组信息 + if (infoObj.groups > 0) { + console.log('Consumer Groups:') + const groups = await redis.xinfo('GROUPS', STREAM_KEY) + + for (let i = 0; i < groups.length; i++) { + const group = groups[i] + const groupObj = {} + for (let j = 0; j < group.length; j += 2) { + groupObj[group[j]] = group[j + 1] + } + + console.log(`\n Group: ${groupObj.name}`) + console.log(` Consumers: ${groupObj.consumers}`) + console.log(` Pending: ${groupObj.pending}`) + console.log(` Last Delivered ID: ${groupObj['last-delivered-id']}`) + + // 消费者详情 + if (groupObj.consumers > 0) { + const consumers = await redis.xinfo('CONSUMERS', STREAM_KEY, groupObj.name) + console.log(' Consumer Details:') + + for (let k = 0; k < consumers.length; k++) { + const consumer = consumers[k] + const consumerObj = {} + for (let l = 0; l < consumer.length; l += 2) { + consumerObj[consumer[l]] = consumer[l + 1] + } + + console.log(` - ${consumerObj.name}`) + console.log(` Pending: ${consumerObj.pending}`) + console.log(` Idle: ${Math.round(consumerObj.idle / 1000)}s`) + } + } + } + } + + // 最新 5 条消息 + console.log('\n📬 Latest 5 Messages:') + const latest = await redis.xrevrange(STREAM_KEY, '+', '-', 'COUNT', 5) + + if (latest.length === 0) { + console.log(' No messages') + } else { + for (const [messageId, fields] of latest) { + const data = {} + for (let i = 0; i < fields.length; i += 2) { + data[fields[i]] = fields[i + 1] + } + + try { + const event = JSON.parse(data.data) + console.log(`\n ${messageId}`) + console.log(` Event ID: ${event.eventId}`) + console.log(` Model: ${event.usage.model}`) + console.log(` Cost: $${event.cost.total.toFixed(6)}`) + console.log(` Time: ${event.timestamp}`) + } catch (e) { + console.log(`\n ${messageId} (Parse Error)`) + } + } + } + } catch (error) { + console.error('❌ Failed to get queue info:', error.message) + process.exit(1) + } +} + +/** + * 清空队列(危险操作) + */ +async function clearQueue() { + console.log('⚠️ WARNING: This will delete all messages in the queue!') + console.log(` Stream: ${STREAM_KEY}`) + + // 简单的确认机制 + const readline = require('readline') + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout + }) + + rl.question('Type "yes" to confirm: ', async (answer) => { + if (answer.toLowerCase() === 'yes') { + try { + await redis.del(STREAM_KEY) + console.log('✅ Queue cleared successfully') + } catch (error) { + console.error('❌ Failed to clear queue:', error.message) + } + } else { + console.log('❌ Operation cancelled') + } + rl.close() + redis.quit() + }) +} + +// ======================================== +// CLI 处理 +// ======================================== + +async function main() { + const command = process.argv[2] || 'info' + + console.log('🔧 Billing Events Test Tool\n') + + try { + switch (command) { + case 'publish': + await publishTestEvent() + break + + case 'consume': + await consumeTestEvents() + break + + case 'info': + await showQueueInfo() + break + + case 'clear': + await clearQueue() + return // clearQueue 会自己关闭连接 + + default: + console.error(`❌ Unknown command: ${command}`) + console.log('\nAvailable commands:') + console.log(' publish - Publish a test event') + console.log(' consume - Consume events (test mode)') + console.log(' info - Show queue status') + console.log(' clear - Clear the queue (dangerous)') + process.exit(1) + } + + await redis.quit() + } catch (error) { + console.error('💥 Fatal error:', error) + await redis.quit() + process.exit(1) + } +} + +main() diff --git a/src/routes/openaiClaudeRoutes.js b/src/routes/openaiClaudeRoutes.js index 3492168c..032c242f 100644 --- a/src/routes/openaiClaudeRoutes.js +++ b/src/routes/openaiClaudeRoutes.js @@ -5,8 +5,6 @@ const express = require('express') const router = express.Router() -const fs = require('fs') -const path = require('path') const logger = require('../utils/logger') const { authenticateApiKey } = require('../middleware/auth') const claudeRelayService = require('../services/claudeRelayService') @@ -16,32 +14,7 @@ const unifiedClaudeScheduler = require('../services/unifiedClaudeScheduler') const claudeCodeHeadersService = require('../services/claudeCodeHeadersService') const sessionHelper = require('../utils/sessionHelper') const { updateRateLimitCounters } = require('../utils/rateLimitHelper') - -const dataDir = path.join(__dirname, '../../data') -const localPricingPath = path.join(dataDir, 'model_pricing.json') -const fallbackPricingPath = path.join( - __dirname, - '../../resources/model-pricing/model_prices_and_context_window.json' -) - -// 加载模型定价数据 -let modelPricingData = {} -try { - if (!fs.existsSync(dataDir)) { - fs.mkdirSync(dataDir, { recursive: true }) - } - - if (!fs.existsSync(localPricingPath) && fs.existsSync(fallbackPricingPath)) { - fs.copyFileSync(fallbackPricingPath, localPricingPath) - logger.warn('⚠️ 未找到 data/model_pricing.json,已使用备用价格文件初始化') - } - - const pricingContent = fs.readFileSync(localPricingPath, 'utf8') - modelPricingData = JSON.parse(pricingContent) - logger.info('✅ Model pricing data loaded successfully') -} catch (error) { - logger.error('❌ Failed to load model pricing data:', error) -} +const pricingService = require('../services/pricingService') // 🔧 辅助函数:检查 API Key 权限 function checkPermissions(apiKeyData, requiredPermission = 'claude') { @@ -155,7 +128,7 @@ router.get('/v1/models/:model', authenticateApiKey, async (req, res) => { } // 从 model_pricing.json 获取模型信息 - const modelData = modelPricingData[modelId] + const modelData = pricingService.getModelPricing(modelId) // 构建标准 OpenAI 格式的模型响应 let modelInfo diff --git a/src/services/apiKeyService.js b/src/services/apiKeyService.js index d187d54c..962d14ee 100644 --- a/src/services/apiKeyService.js +++ b/src/services/apiKeyService.js @@ -1125,11 +1125,53 @@ class ApiKeyService { logParts.push(`Total: ${totalTokens} tokens`) logger.database(`📊 Recorded usage: ${keyId} - ${logParts.join(', ')}`) + + // 🔔 发布计费事件到消息队列(异步非阻塞) + this._publishBillingEvent({ + keyId, + keyName: keyData?.name, + userId: keyData?.userId, + model, + inputTokens, + outputTokens, + cacheCreateTokens, + cacheReadTokens, + ephemeral5mTokens, + ephemeral1hTokens, + totalTokens, + cost: costInfo.totalCost || 0, + costBreakdown: { + input: costInfo.inputCost || 0, + output: costInfo.outputCost || 0, + cacheCreate: costInfo.cacheCreateCost || 0, + cacheRead: costInfo.cacheReadCost || 0, + ephemeral5m: costInfo.ephemeral5mCost || 0, + ephemeral1h: costInfo.ephemeral1hCost || 0 + }, + accountId, + accountType, + isLongContext: costInfo.isLongContextRequest || false, + requestTimestamp: usageRecord.timestamp + }).catch((err) => { + // 发布失败不影响主流程,只记录错误 + logger.warn('⚠️ Failed to publish billing event:', err.message) + }) } catch (error) { logger.error('❌ Failed to record usage:', error) } } + // 🔔 发布计费事件(内部方法) + async _publishBillingEvent(eventData) { + try { + const billingEventPublisher = require('./billingEventPublisher') + await billingEventPublisher.publishBillingEvent(eventData) + } catch (error) { + // 静默失败,不影响主流程 + logger.debug('Failed to publish billing event:', error.message) + } + } + // 🔐 生成密钥 _generateSecretKey() { return crypto.randomBytes(32).toString('hex') diff --git a/src/services/billingEventPublisher.js b/src/services/billingEventPublisher.js new file mode 100644 index 00000000..5be0a6b4 --- /dev/null +++ b/src/services/billingEventPublisher.js @@ -0,0 +1,224 @@ +const redis = require('../models/redis') +const logger = require('../utils/logger') + +/** + * 计费事件发布器 - 使用 Redis Stream 解耦计费系统 + * + * 设计原则: + * 1. 异步非阻塞: 发布失败不影响主流程 + * 2. 结构化数据: 使用标准化的事件格式 + * 3. 可追溯性: 每个事件包含完整上下文 + */ +class BillingEventPublisher { + constructor() { + this.streamKey = 'billing:events' + this.maxLength = 100000 // 保留最近 10 万条事件 + this.enabled = process.env.BILLING_EVENTS_ENABLED !== 'false' // 默认开启 + } + + /** + * 发布计费事件 + * @param {Object} eventData - 事件数据 + * @returns {Promise} - 事件ID 或 null + */ + async publishBillingEvent(eventData) { + if (!this.enabled) { + logger.debug('📭 Billing events disabled, skipping publish') + return null + } + + try { + const client = redis.getClientSafe() + + // 构建标准化事件 + const event = { + // 事件元数据 + eventId: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, + eventType: 'usage.recorded', + timestamp: new Date().toISOString(), + version: '1.0', + + // 核心计费数据 + apiKey: { + id: eventData.keyId, + name: eventData.keyName || null, + userId: eventData.userId || null + }, + + // 使用量详情 + usage: { + model: eventData.model, + inputTokens: eventData.inputTokens || 0, + outputTokens: eventData.outputTokens || 0, + cacheCreateTokens: eventData.cacheCreateTokens || 0, + cacheReadTokens: eventData.cacheReadTokens || 0, + ephemeral5mTokens: eventData.ephemeral5mTokens || 0, + ephemeral1hTokens: eventData.ephemeral1hTokens || 0, + totalTokens: eventData.totalTokens || 0 + }, + + // 费用详情 + cost: { + total: eventData.cost || 0, + currency: 'USD', + breakdown: { + input: eventData.costBreakdown?.input || 0, + output: eventData.costBreakdown?.output || 0, + cacheCreate: eventData.costBreakdown?.cacheCreate || 0, + cacheRead: eventData.costBreakdown?.cacheRead || 0, + ephemeral5m: eventData.costBreakdown?.ephemeral5m || 0, + ephemeral1h: eventData.costBreakdown?.ephemeral1h || 0 + } + }, + + // 账户信息 + account: { + id: eventData.accountId || null, + type: eventData.accountType || null + }, + + // 请求上下文 + context: { + isLongContext: eventData.isLongContext || false, + requestTimestamp: eventData.requestTimestamp || new Date().toISOString() + } + } + + // 使用 XADD 发布事件到 Stream + // MAXLEN ~ 10000: 近似截断,保持性能 + const messageId = await client.xadd( + this.streamKey, + 'MAXLEN', + '~', + this.maxLength, + '*', // 自动生成消息ID + 'data', + JSON.stringify(event) + ) + + logger.debug( + `📤 Published billing event: ${messageId} | Key: ${eventData.keyId} | Cost: $${event.cost.total.toFixed(6)}` + ) + + return messageId + } catch (error) { + // ⚠️ 发布失败不影响主流程,只记录错误 + logger.error('❌ Failed to publish billing event:', error) + return null + } + } + + /** + * 批量发布计费事件(优化性能) + * @param {Array} events - 事件数组 + * @returns {Promise} - 成功发布的事件数 + */ + async publishBatchBillingEvents(events) { + if (!this.enabled || !events || events.length === 0) { + return 0 + } + + try { + const client = redis.getClientSafe() + const pipeline = client.pipeline() + + events.forEach((eventData) => { + const event = { + eventId: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, + eventType: 'usage.recorded', + timestamp: new Date().toISOString(), + version: '1.0', + apiKey: { + id: eventData.keyId, + name: eventData.keyName || null + }, + usage: { + model: eventData.model, + inputTokens: eventData.inputTokens || 0, + outputTokens: eventData.outputTokens || 0, + totalTokens: eventData.totalTokens || 0 + }, + cost: { + total: eventData.cost || 0, + currency: 'USD' + } + } + + pipeline.xadd( + this.streamKey, + 'MAXLEN', + '~', + this.maxLength, + '*', + 'data', + JSON.stringify(event) + ) + }) + + const results = await pipeline.exec() + const successCount = results.filter((r) => r[0] === null).length + + logger.info(`📤 Batch published ${successCount}/${events.length} billing events`) + return successCount + } catch (error) { + logger.error('❌ Failed to batch publish billing events:', error) + return 0 + } + } + + /** + * 获取 Stream 信息(用于监控) + * @returns {Promise} + */ + async getStreamInfo() { + try { + const client = redis.getClientSafe() + const info = await client.xinfo('STREAM', this.streamKey) + + // 解析 Redis XINFO 返回的数组格式 + const result = {} + for (let i = 0; i < info.length; i += 2) { + result[info[i]] = info[i + 1] + } + + return { + length: result.length || 0, + firstEntry: result['first-entry'] || null, + lastEntry: result['last-entry'] || null, + groups: result.groups || 0 + } + } catch (error) { + if (error.message.includes('no such key')) { + return { length: 0, groups: 0 } + } + logger.error('❌ Failed to get stream info:', error) + return null + } + } + + /** + * 创建消费者组(供外部计费系统使用) + * @param {string} groupName - 消费者组名称 + * @returns {Promise} + */ + async createConsumerGroup(groupName = 'billing-system') { + try { + const client = redis.getClientSafe() + + // MKSTREAM: 如果 stream 不存在则创建 + await client.xgroup('CREATE', this.streamKey, groupName, '0', 'MKSTREAM') + + logger.success(`✅ Created consumer group: ${groupName}`) + return true + } catch (error) { + if (error.message.includes('BUSYGROUP')) { + logger.debug(`Consumer group ${groupName} already exists`) + return true + } + logger.error(`❌ Failed to create consumer group ${groupName}:`, error) + return false + } + } +} + +module.exports = new BillingEventPublisher() diff --git a/src/services/ccrAccountService.js b/src/services/ccrAccountService.js index fa967d58..eeb119c2 100644 --- a/src/services/ccrAccountService.js +++ b/src/services/ccrAccountService.js @@ -563,8 +563,21 @@ class CcrAccountService { if (!modelMapping || Object.keys(modelMapping).length === 0) { return true } - // 检查请求的模型是否在映射表的键中 - return Object.prototype.hasOwnProperty.call(modelMapping, requestedModel) + + // 检查请求的模型是否在映射表的键中(精确匹配) + if (Object.prototype.hasOwnProperty.call(modelMapping, requestedModel)) { + return true + } + + // 尝试大小写不敏感匹配 + const requestedModelLower = requestedModel.toLowerCase() + for (const key of Object.keys(modelMapping)) { + if (key.toLowerCase() === requestedModelLower) { + return true + } + } + + return false } // 🔄 获取映射后的模型名称 @@ -574,8 +587,21 @@ class CcrAccountService { return requestedModel } - // 返回映射后的模型名,如果不存在映射则返回原模型名 - return modelMapping[requestedModel] || requestedModel + // 精确匹配 + if (modelMapping[requestedModel]) { + return modelMapping[requestedModel] + } + + // 大小写不敏感匹配 + const requestedModelLower = requestedModel.toLowerCase() + for (const [key, value] of Object.entries(modelMapping)) { + if (key.toLowerCase() === requestedModelLower) { + return value + } + } + + // 如果不存在映射则返回原模型名 + return requestedModel } // 🔐 加密敏感数据 diff --git a/src/services/claudeConsoleAccountService.js b/src/services/claudeConsoleAccountService.js index dbdee522..61237e06 100644 --- a/src/services/claudeConsoleAccountService.js +++ b/src/services/claudeConsoleAccountService.js @@ -990,8 +990,20 @@ class ClaudeConsoleAccountService { return true } - // 检查请求的模型是否在映射表的键中 - return Object.prototype.hasOwnProperty.call(modelMapping, requestedModel) + // 检查请求的模型是否在映射表的键中(精确匹配) + if (Object.prototype.hasOwnProperty.call(modelMapping, requestedModel)) { + return true + } + + // 尝试大小写不敏感匹配 + const requestedModelLower = requestedModel.toLowerCase() + for (const key of Object.keys(modelMapping)) { + if (key.toLowerCase() === requestedModelLower) { + return true + } + } + + return false } // 🔄 获取映射后的模型名称 @@ -1001,8 +1013,21 @@ class ClaudeConsoleAccountService { return requestedModel } - // 返回映射后的模型,如果不存在则返回原模型 - return modelMapping[requestedModel] || requestedModel + // 精确匹配 + if (modelMapping[requestedModel]) { + return modelMapping[requestedModel] + } + + // 大小写不敏感匹配 + const requestedModelLower = requestedModel.toLowerCase() + for (const [key, value] of Object.entries(modelMapping)) { + if (key.toLowerCase() === requestedModelLower) { + return value + } + } + + // 如果不存在则返回原模型 + return requestedModel } // 💰 检查账户使用额度(基于实时统计数据) diff --git a/web/admin-spa/src/components/accounts/AccountForm.vue b/web/admin-spa/src/components/accounts/AccountForm.vue index da6a169e..b2d4e572 100644 --- a/web/admin-spa/src/components/accounts/AccountForm.vue +++ b/web/admin-spa/src/components/accounts/AccountForm.vue @@ -1276,6 +1276,15 @@ > + Sonnet 4 + + + + + + + + + + + +