mirror of
https://github.com/Wei-Shaw/claude-relay-service.git
synced 2026-01-22 08:32:17 +00:00
341 lines
9.1 KiB
JavaScript
Executable File
341 lines
9.1 KiB
JavaScript
Executable File
#!/usr/bin/env node
|
||
|
||
/**
 * 计费事件测试脚本
 *
 * 用于测试计费事件的发布和消费功能
 *
 * 使用方法:
 *   node scripts/test-billing-events.js [command]
 *
 * 命令:
 *   publish - 发布测试事件
 *   consume - 消费事件(测试模式)
 *   info    - 查看队列状态
 *   clear   - 清空队列(危险操作)
 */
|
||
|
||
const path = require('path')
|
||
const Redis = require('ioredis')
|
||
|
||
// 加载配置
|
||
require('dotenv').config({ path: path.join(__dirname, '../.env') })
|
||
|
||
// Redis connection settings, taken from environment variables with local
// development fallbacks. Numeric values are parsed explicitly as base-10; a
// missing or non-numeric value (NaN) falls back to the default via `||`.
const config = {
  host: process.env.REDIS_HOST || 'localhost',
  port: Number.parseInt(process.env.REDIS_PORT, 10) || 6379,
  password: process.env.REDIS_PASSWORD || '',
  db: Number.parseInt(process.env.REDIS_DB, 10) || 0
}
|
||
|
||
// Key of the Redis Stream that every command in this script operates on.
const STREAM_KEY = 'billing:events'

// Single ioredis client shared by all commands.
const redis = new Redis(config)
|
||
|
||
// ========================================
|
||
// 命令实现
|
||
// ========================================
|
||
|
||
/**
 * Publish one hard-coded sample billing event to the stream.
 *
 * Builds a `usage.recorded` event from fixed test values, appends it to
 * STREAM_KEY with XADD (approximate MAXLEN trimming at 100000 entries) under
 * the single field `data` as a JSON string, and prints the resulting ids.
 * If publishing fails, the error message is logged and the process exits
 * with status 1.
 *
 * @returns {Promise<void>}
 */
async function publishTestEvent() {
  console.log('📤 Publishing test billing event...')

  // Event id = millisecond timestamp + up to 9 random base-36 characters.
  const createdAtMs = Date.now()
  const randomSuffix = Math.random().toString(36).slice(2, 11)

  const apiKey = {
    id: 'test-key-123',
    name: 'Test API Key',
    userId: 'test-user-456'
  }

  const usage = {
    model: 'claude-sonnet-4-20250514',
    inputTokens: 1500,
    outputTokens: 800,
    cacheCreateTokens: 200,
    cacheReadTokens: 100,
    ephemeral5mTokens: 150,
    ephemeral1hTokens: 50,
    totalTokens: 2600
  }

  const cost = {
    total: 0.0156,
    currency: 'USD',
    breakdown: {
      input: 0.0045,
      output: 0.012,
      cacheCreate: 0.00075,
      cacheRead: 0.00003,
      ephemeral5m: 0.0005625,
      ephemeral1h: 0.0001875
    }
  }

  const account = {
    id: 'test-account-789',
    type: 'claude-official'
  }

  // Key order is kept stable because it determines the serialized JSON.
  const testEvent = {
    eventId: `test-${createdAtMs}-${randomSuffix}`,
    eventType: 'usage.recorded',
    timestamp: new Date().toISOString(),
    version: '1.0',
    apiKey,
    usage,
    cost,
    account,
    context: {
      isLongContext: false,
      requestTimestamp: new Date().toISOString()
    }
  }

  try {
    const payload = JSON.stringify(testEvent)
    const messageId = await redis.xadd(STREAM_KEY, 'MAXLEN', '~', 100000, '*', 'data', payload)

    const report = [
      '✅ Event published successfully!',
      ` Message ID: ${messageId}`,
      ` Event ID: ${testEvent.eventId}`,
      ` Cost: $${cost.total}`
    ]
    report.forEach((line) => console.log(line))
  } catch (error) {
    console.error('❌ Failed to publish event:', error.message)
    process.exit(1)
  }
}
|
||
|
||
/**
 * Consume events in test mode (plain XREAD, no consumer group is created).
 *
 * Starts from id '0' (the beginning of the stream) and repeatedly reads up
 * to 10 entries at a time, blocking for at most 5000 ms per read. Each
 * entry's `data` field is parsed as JSON and a short summary is printed.
 *
 * The read cursor is advanced for every entry, including entries that fail
 * to parse. Otherwise a malformed entry at the end of a batch would be
 * returned again by every subsequent XREAD and the loop would spin on it
 * forever.
 *
 * SIGINT (Ctrl+C) clears the running flag; the loop ends once the XREAD in
 * flight returns. Read errors are logged and followed by a 5 second pause
 * before retrying.
 *
 * @returns {Promise<void>} resolves after the consumer loop has stopped
 */
async function consumeTestEvents() {
  console.log('📬 Consuming test events...')
  console.log(' Press Ctrl+C to stop\n')

  let isRunning = true

  process.on('SIGINT', () => {
    console.log('\n⏹️ Stopping consumer...')
    isRunning = false
  })

  let lastId = '0' // start from the beginning of the stream

  while (isRunning) {
    try {
      // XREAD rather than XREADGROUP: test mode leaves no group state behind.
      const messages = await redis.xread('BLOCK', 5000, 'COUNT', 10, 'STREAMS', STREAM_KEY, lastId)

      // A null reply means the block timed out with nothing new.
      if (!messages || messages.length === 0) {
        continue
      }

      const [streamKey, entries] = messages[0]
      console.log(`📬 Received ${entries.length} messages from ${streamKey}\n`)

      for (const [messageId, fields] of entries) {
        // Move the cursor first so a bad entry is skipped, not re-read.
        lastId = messageId

        try {
          // Entry fields arrive as a flat [name, value, name, value, ...] array.
          const data = {}
          for (let i = 0; i < fields.length; i += 2) {
            data[fields[i]] = fields[i + 1]
          }

          const event = JSON.parse(data.data)

          console.log(`📊 Event: ${event.eventId}`)
          console.log(` API Key: ${event.apiKey.name} (${event.apiKey.id})`)
          console.log(` Model: ${event.usage.model}`)
          console.log(` Tokens: ${event.usage.totalTokens}`)
          console.log(` Cost: $${event.cost.total.toFixed(6)}`)
          console.log(` Timestamp: ${event.timestamp}`)
          console.log('')
        } catch (parseError) {
          console.error(`❌ Failed to parse message ${messageId}:`, parseError.message)
        }
      }
    } catch (error) {
      // Errors raised while shutting down are not worth reporting or retrying.
      if (isRunning) {
        console.error('❌ Error consuming messages:', error.message)
        await new Promise((resolve) => setTimeout(resolve, 5000))
      }
    }
  }

  console.log('👋 Consumer stopped')
}
|
||
|
||
/**
 * Print a status report for the billing event stream.
 *
 * Reports the stream length and, when the stream is non-empty, the
 * first/last entry ids, each consumer group with its consumers, and a
 * summary of the five most recent entries. If any Redis call fails, the
 * error message is logged and the process exits with status 1.
 *
 * @returns {Promise<void>}
 */
async function showQueueInfo() {
  // Redis replies here are flat [key, value, key, value, ...] arrays;
  // fold one into a plain object.
  const toObject = (flat) => {
    const result = {}
    for (let idx = 0; idx < flat.length; idx += 2) {
      result[flat[idx]] = flat[idx + 1]
    }
    return result
  }

  console.log('📊 Queue Information\n')

  try {
    // Stream length
    const messageCount = await redis.xlen(STREAM_KEY)
    console.log(`Stream: ${STREAM_KEY}`)
    console.log(`Length: ${messageCount} messages\n`)

    if (messageCount === 0) {
      console.log('ℹ️ Queue is empty')
      return
    }

    // Stream details
    const streamInfo = toObject(await redis.xinfo('STREAM', STREAM_KEY))
    const firstEntry = streamInfo['first-entry']
    const lastEntry = streamInfo['last-entry']

    console.log('Stream Details:')
    console.log(` First Entry ID: ${firstEntry ? firstEntry[0] : 'N/A'}`)
    console.log(` Last Entry ID: ${lastEntry ? lastEntry[0] : 'N/A'}`)
    console.log(` Consumer Groups: ${streamInfo.groups || 0}\n`)

    // Consumer group details
    if (streamInfo.groups > 0) {
      console.log('Consumer Groups:')
      const groupRows = await redis.xinfo('GROUPS', STREAM_KEY)

      for (const groupRow of groupRows) {
        const group = toObject(groupRow)

        console.log(`\n Group: ${group.name}`)
        console.log(` Consumers: ${group.consumers}`)
        console.log(` Pending: ${group.pending}`)
        console.log(` Last Delivered ID: ${group['last-delivered-id']}`)

        // Per-consumer details
        if (group.consumers > 0) {
          const consumerRows = await redis.xinfo('CONSUMERS', STREAM_KEY, group.name)
          console.log(' Consumer Details:')

          for (const consumerRow of consumerRows) {
            const consumer = toObject(consumerRow)

            console.log(` - ${consumer.name}`)
            console.log(` Pending: ${consumer.pending}`)
            // idle is divided by 1000 and shown with an "s" suffix
            console.log(` Idle: ${Math.round(consumer.idle / 1000)}s`)
          }
        }
      }
    }

    // Latest 5 messages, newest first
    console.log('\n📬 Latest 5 Messages:')
    const newest = await redis.xrevrange(STREAM_KEY, '+', '-', 'COUNT', 5)

    if (newest.length === 0) {
      console.log(' No messages')
      return
    }

    for (const [entryId, rawFields] of newest) {
      const record = toObject(rawFields)

      try {
        const event = JSON.parse(record.data)
        console.log(`\n ${entryId}`)
        console.log(` Event ID: ${event.eventId}`)
        console.log(` Model: ${event.usage.model}`)
        console.log(` Cost: $${event.cost.total.toFixed(6)}`)
        console.log(` Time: ${event.timestamp}`)
      } catch (_parseError) {
        // Unparseable or incomplete entries are listed by id only.
        console.log(`\n ${entryId} (Parse Error)`)
      }
    }
  } catch (error) {
    console.error('❌ Failed to get queue info:', error.message)
    process.exit(1)
  }
}
|
||
|
||
/**
 * Delete the whole stream after an interactive confirmation (dangerous).
 *
 * Prompts on stdin and deletes STREAM_KEY only when the answer is "yes"
 * (case-insensitive). Any other answer cancels the operation, and so does
 * stdin closing before an answer is given (EOF, or Ctrl+C at the prompt).
 *
 * The returned promise settles only after the prompt has been answered and
 * the Redis connection has been closed, so callers can await the whole
 * operation and observe its errors. This function closes the Redis
 * connection itself; callers must not expect it to remain open. A failed
 * delete is logged and reported through a non-zero process exit code.
 *
 * @returns {Promise<void>}
 */
async function clearQueue() {
  console.log('⚠️ WARNING: This will delete all messages in the queue!')
  console.log(` Stream: ${STREAM_KEY}`)

  // Simple confirmation prompt
  const readline = require('readline')
  const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  })

  try {
    const answer = await new Promise((resolve) => {
      // If the interface closes without an answer, resolve with an empty
      // string so the operation is cancelled instead of waiting forever.
      rl.once('close', () => resolve(''))
      rl.question('Type "yes" to confirm: ', resolve)
    })

    if (answer.toLowerCase() !== 'yes') {
      console.log('❌ Operation cancelled')
      return
    }

    try {
      await redis.del(STREAM_KEY)
      console.log('✅ Queue cleared successfully')
    } catch (error) {
      console.error('❌ Failed to clear queue:', error.message)
      // Report the failure to the shell, like the other commands do.
      process.exitCode = 1
    }
  } finally {
    // Always release the terminal and the connection, whatever happened above.
    rl.close()
    await redis.quit()
  }
}
|
||
|
||
// ========================================
|
||
// CLI 处理
|
||
// ========================================
|
||
|
||
/**
 * CLI entry point: run the command named by the first argument.
 *
 * Defaults to `info` when no command is given. For `publish`, `consume`
 * and `info` the Redis connection is closed here once the command has
 * finished; `clear` is expected to close it on its own. An unknown command
 * prints the usage text and exits with status 1. Any error thrown by a
 * command is logged, the connection is closed, and the process exits with
 * status 1.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const command = process.argv[2] || 'info'

  console.log('🔧 Billing Events Test Tool\n')

  // Commands after which main() is responsible for closing the connection.
  // Handlers are wrapped so each function is only looked up when it is run.
  const handlers = new Map([
    ['publish', () => publishTestEvent()],
    ['consume', () => consumeTestEvents()],
    ['info', () => showQueueInfo()]
  ])

  try {
    if (command === 'clear') {
      await clearQueue()
      return // clearQueue closes the connection itself
    }

    const runCommand = handlers.get(command)

    if (runCommand) {
      await runCommand()
    } else {
      console.error(`❌ Unknown command: ${command}`)
      const usage = [
        '\nAvailable commands:',
        ' publish - Publish a test event',
        ' consume - Consume events (test mode)',
        ' info - Show queue status',
        ' clear - Clear the queue (dangerous)'
      ]
      for (const line of usage) {
        console.log(line)
      }
      process.exit(1)
    }

    await redis.quit()
  } catch (error) {
    console.error('💥 Fatal error:', error)
    await redis.quit()
    process.exit(1)
  }
}

main()
|