mirror of
https://github.com/Wei-Shaw/claude-relay-service.git
synced 2026-01-22 16:43:35 +00:00
feat: 合并 PR #578 并接入统一定价服务
This commit is contained in:
340
scripts/test-billing-events.js
Executable file
340
scripts/test-billing-events.js
Executable file
@@ -0,0 +1,340 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
/**
|
||||
* 计费事件测试脚本
|
||||
*
|
||||
* 用于测试计费事件的发布和消费功能
|
||||
*
|
||||
* 使用方法:
|
||||
* node scripts/test-billing-events.js [command]
|
||||
*
|
||||
* 命令:
|
||||
* publish - 发布测试事件
|
||||
* consume - 消费事件(测试模式)
|
||||
* info - 查看队列状态
|
||||
* clear - 清空队列(危险操作)
|
||||
*/
|
||||
|
||||
const path = require('path')
|
||||
const Redis = require('ioredis')
|
||||
|
||||
// 加载配置
|
||||
require('dotenv').config({ path: path.join(__dirname, '../.env') })
|
||||
|
||||
const config = {
|
||||
host: process.env.REDIS_HOST || 'localhost',
|
||||
port: parseInt(process.env.REDIS_PORT) || 6379,
|
||||
password: process.env.REDIS_PASSWORD || '',
|
||||
db: parseInt(process.env.REDIS_DB) || 0
|
||||
}
|
||||
|
||||
const redis = new Redis(config)
|
||||
const STREAM_KEY = 'billing:events'
|
||||
|
||||
// ========================================
|
||||
// 命令实现
|
||||
// ========================================
|
||||
|
||||
/**
|
||||
* 发布测试事件
|
||||
*/
|
||||
async function publishTestEvent() {
|
||||
console.log('📤 Publishing test billing event...')
|
||||
|
||||
const testEvent = {
|
||||
eventId: `test-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
|
||||
eventType: 'usage.recorded',
|
||||
timestamp: new Date().toISOString(),
|
||||
version: '1.0',
|
||||
apiKey: {
|
||||
id: 'test-key-123',
|
||||
name: 'Test API Key',
|
||||
userId: 'test-user-456'
|
||||
},
|
||||
usage: {
|
||||
model: 'claude-sonnet-4-20250514',
|
||||
inputTokens: 1500,
|
||||
outputTokens: 800,
|
||||
cacheCreateTokens: 200,
|
||||
cacheReadTokens: 100,
|
||||
ephemeral5mTokens: 150,
|
||||
ephemeral1hTokens: 50,
|
||||
totalTokens: 2600
|
||||
},
|
||||
cost: {
|
||||
total: 0.0156,
|
||||
currency: 'USD',
|
||||
breakdown: {
|
||||
input: 0.0045,
|
||||
output: 0.012,
|
||||
cacheCreate: 0.00075,
|
||||
cacheRead: 0.00003,
|
||||
ephemeral5m: 0.0005625,
|
||||
ephemeral1h: 0.0001875
|
||||
}
|
||||
},
|
||||
account: {
|
||||
id: 'test-account-789',
|
||||
type: 'claude-official'
|
||||
},
|
||||
context: {
|
||||
isLongContext: false,
|
||||
requestTimestamp: new Date().toISOString()
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const messageId = await redis.xadd(
|
||||
STREAM_KEY,
|
||||
'MAXLEN',
|
||||
'~',
|
||||
100000,
|
||||
'*',
|
||||
'data',
|
||||
JSON.stringify(testEvent)
|
||||
)
|
||||
|
||||
console.log('✅ Event published successfully!')
|
||||
console.log(` Message ID: ${messageId}`)
|
||||
console.log(` Event ID: ${testEvent.eventId}`)
|
||||
console.log(` Cost: $${testEvent.cost.total}`)
|
||||
} catch (error) {
|
||||
console.error('❌ Failed to publish event:', error.message)
|
||||
process.exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 消费事件(测试模式,不创建消费者组)
|
||||
*/
|
||||
async function consumeTestEvents() {
|
||||
console.log('📬 Consuming test events...')
|
||||
console.log(' Press Ctrl+C to stop\n')
|
||||
|
||||
let isRunning = true
|
||||
|
||||
process.on('SIGINT', () => {
|
||||
console.log('\n⏹️ Stopping consumer...')
|
||||
isRunning = false
|
||||
})
|
||||
|
||||
let lastId = '0' // 从头开始
|
||||
|
||||
while (isRunning) {
|
||||
try {
|
||||
// 使用 XREAD 而不是 XREADGROUP(测试模式)
|
||||
const messages = await redis.xread('BLOCK', 5000, 'COUNT', 10, 'STREAMS', STREAM_KEY, lastId)
|
||||
|
||||
if (!messages || messages.length === 0) {
|
||||
continue
|
||||
}
|
||||
|
||||
const [streamKey, entries] = messages[0]
|
||||
console.log(`📬 Received ${entries.length} messages from ${streamKey}\n`)
|
||||
|
||||
for (const [messageId, fields] of entries) {
|
||||
try {
|
||||
const data = {}
|
||||
for (let i = 0; i < fields.length; i += 2) {
|
||||
data[fields[i]] = fields[i + 1]
|
||||
}
|
||||
|
||||
const event = JSON.parse(data.data)
|
||||
|
||||
console.log(`📊 Event: ${event.eventId}`)
|
||||
console.log(` API Key: ${event.apiKey.name} (${event.apiKey.id})`)
|
||||
console.log(` Model: ${event.usage.model}`)
|
||||
console.log(` Tokens: ${event.usage.totalTokens}`)
|
||||
console.log(` Cost: $${event.cost.total.toFixed(6)}`)
|
||||
console.log(` Timestamp: ${event.timestamp}`)
|
||||
console.log('')
|
||||
|
||||
lastId = messageId // 更新位置
|
||||
} catch (parseError) {
|
||||
console.error(`❌ Failed to parse message ${messageId}:`, parseError.message)
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
if (isRunning) {
|
||||
console.error('❌ Error consuming messages:', error.message)
|
||||
await new Promise((resolve) => setTimeout(resolve, 5000))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
console.log('👋 Consumer stopped')
|
||||
}
|
||||
|
||||
/**
|
||||
* 查看队列状态
|
||||
*/
|
||||
async function showQueueInfo() {
|
||||
console.log('📊 Queue Information\n')
|
||||
|
||||
try {
|
||||
// Stream 长度
|
||||
const length = await redis.xlen(STREAM_KEY)
|
||||
console.log(`Stream: ${STREAM_KEY}`)
|
||||
console.log(`Length: ${length} messages\n`)
|
||||
|
||||
if (length === 0) {
|
||||
console.log('ℹ️ Queue is empty')
|
||||
return
|
||||
}
|
||||
|
||||
// Stream 详细信息
|
||||
const info = await redis.xinfo('STREAM', STREAM_KEY)
|
||||
const infoObj = {}
|
||||
for (let i = 0; i < info.length; i += 2) {
|
||||
infoObj[info[i]] = info[i + 1]
|
||||
}
|
||||
|
||||
console.log('Stream Details:')
|
||||
console.log(` First Entry ID: ${infoObj['first-entry'] ? infoObj['first-entry'][0] : 'N/A'}`)
|
||||
console.log(` Last Entry ID: ${infoObj['last-entry'] ? infoObj['last-entry'][0] : 'N/A'}`)
|
||||
console.log(` Consumer Groups: ${infoObj.groups || 0}\n`)
|
||||
|
||||
// 消费者组信息
|
||||
if (infoObj.groups > 0) {
|
||||
console.log('Consumer Groups:')
|
||||
const groups = await redis.xinfo('GROUPS', STREAM_KEY)
|
||||
|
||||
for (let i = 0; i < groups.length; i++) {
|
||||
const group = groups[i]
|
||||
const groupObj = {}
|
||||
for (let j = 0; j < group.length; j += 2) {
|
||||
groupObj[group[j]] = group[j + 1]
|
||||
}
|
||||
|
||||
console.log(`\n Group: ${groupObj.name}`)
|
||||
console.log(` Consumers: ${groupObj.consumers}`)
|
||||
console.log(` Pending: ${groupObj.pending}`)
|
||||
console.log(` Last Delivered ID: ${groupObj['last-delivered-id']}`)
|
||||
|
||||
// 消费者详情
|
||||
if (groupObj.consumers > 0) {
|
||||
const consumers = await redis.xinfo('CONSUMERS', STREAM_KEY, groupObj.name)
|
||||
console.log(' Consumer Details:')
|
||||
|
||||
for (let k = 0; k < consumers.length; k++) {
|
||||
const consumer = consumers[k]
|
||||
const consumerObj = {}
|
||||
for (let l = 0; l < consumer.length; l += 2) {
|
||||
consumerObj[consumer[l]] = consumer[l + 1]
|
||||
}
|
||||
|
||||
console.log(` - ${consumerObj.name}`)
|
||||
console.log(` Pending: ${consumerObj.pending}`)
|
||||
console.log(` Idle: ${Math.round(consumerObj.idle / 1000)}s`)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 最新 5 条消息
|
||||
console.log('\n📬 Latest 5 Messages:')
|
||||
const latest = await redis.xrevrange(STREAM_KEY, '+', '-', 'COUNT', 5)
|
||||
|
||||
if (latest.length === 0) {
|
||||
console.log(' No messages')
|
||||
} else {
|
||||
for (const [messageId, fields] of latest) {
|
||||
const data = {}
|
||||
for (let i = 0; i < fields.length; i += 2) {
|
||||
data[fields[i]] = fields[i + 1]
|
||||
}
|
||||
|
||||
try {
|
||||
const event = JSON.parse(data.data)
|
||||
console.log(`\n ${messageId}`)
|
||||
console.log(` Event ID: ${event.eventId}`)
|
||||
console.log(` Model: ${event.usage.model}`)
|
||||
console.log(` Cost: $${event.cost.total.toFixed(6)}`)
|
||||
console.log(` Time: ${event.timestamp}`)
|
||||
} catch (e) {
|
||||
console.log(`\n ${messageId} (Parse Error)`)
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('❌ Failed to get queue info:', error.message)
|
||||
process.exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清空队列(危险操作)
|
||||
*/
|
||||
async function clearQueue() {
|
||||
console.log('⚠️ WARNING: This will delete all messages in the queue!')
|
||||
console.log(` Stream: ${STREAM_KEY}`)
|
||||
|
||||
// 简单的确认机制
|
||||
const readline = require('readline')
|
||||
const rl = readline.createInterface({
|
||||
input: process.stdin,
|
||||
output: process.stdout
|
||||
})
|
||||
|
||||
rl.question('Type "yes" to confirm: ', async (answer) => {
|
||||
if (answer.toLowerCase() === 'yes') {
|
||||
try {
|
||||
await redis.del(STREAM_KEY)
|
||||
console.log('✅ Queue cleared successfully')
|
||||
} catch (error) {
|
||||
console.error('❌ Failed to clear queue:', error.message)
|
||||
}
|
||||
} else {
|
||||
console.log('❌ Operation cancelled')
|
||||
}
|
||||
rl.close()
|
||||
redis.quit()
|
||||
})
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// CLI 处理
|
||||
// ========================================
|
||||
|
||||
async function main() {
|
||||
const command = process.argv[2] || 'info'
|
||||
|
||||
console.log('🔧 Billing Events Test Tool\n')
|
||||
|
||||
try {
|
||||
switch (command) {
|
||||
case 'publish':
|
||||
await publishTestEvent()
|
||||
break
|
||||
|
||||
case 'consume':
|
||||
await consumeTestEvents()
|
||||
break
|
||||
|
||||
case 'info':
|
||||
await showQueueInfo()
|
||||
break
|
||||
|
||||
case 'clear':
|
||||
await clearQueue()
|
||||
return // clearQueue 会自己关闭连接
|
||||
|
||||
default:
|
||||
console.error(`❌ Unknown command: ${command}`)
|
||||
console.log('\nAvailable commands:')
|
||||
console.log(' publish - Publish a test event')
|
||||
console.log(' consume - Consume events (test mode)')
|
||||
console.log(' info - Show queue status')
|
||||
console.log(' clear - Clear the queue (dangerous)')
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
await redis.quit()
|
||||
} catch (error) {
|
||||
console.error('💥 Fatal error:', error)
|
||||
await redis.quit()
|
||||
process.exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
main()
|
||||
Reference in New Issue
Block a user