This commit is contained in:
SunSeekerX
2026-01-19 20:24:47 +08:00
parent 12fd5e1cb4
commit 76ecbe18a5
98 changed files with 8182 additions and 1896 deletions

View File

@@ -299,6 +299,96 @@ class RedisClient {
}
}
// 🔄 自动迁移 alltime 模型统计(启动时调用)
async migrateAlltimeModelStats() {
const migrationKey = 'system:migration:alltime_model_stats_v1'
const migrated = await this.client.get(migrationKey)
if (migrated) {
logger.debug('📊 Alltime model stats migration already completed')
return
}
logger.info('📊 Starting alltime model stats migration...')
const stats = { keys: 0, models: 0 }
try {
// 扫描所有月度模型统计数据并聚合到 alltime
// 格式: usage:{keyId}:model:monthly:{model}:{month}
let cursor = '0'
const aggregatedData = new Map() // keyId:model -> {inputTokens, outputTokens, ...}
do {
const [newCursor, keys] = await this.client.scan(
cursor,
'MATCH',
'usage:*:model:monthly:*:*',
'COUNT',
500
)
cursor = newCursor
for (const key of keys) {
// usage:{keyId}:model:monthly:{model}:{month}
const match = key.match(/^usage:([^:]+):model:monthly:(.+):(\d{4}-\d{2})$/)
if (match) {
const [, keyId, model] = match
const aggregateKey = `${keyId}:${model}`
// 获取该月的数据
const data = await this.client.hgetall(key)
if (data && Object.keys(data).length > 0) {
if (!aggregatedData.has(aggregateKey)) {
aggregatedData.set(aggregateKey, {
keyId,
model,
inputTokens: 0,
outputTokens: 0,
cacheCreateTokens: 0,
cacheReadTokens: 0,
requests: 0
})
}
const agg = aggregatedData.get(aggregateKey)
agg.inputTokens += parseInt(data.inputTokens) || 0
agg.outputTokens += parseInt(data.outputTokens) || 0
agg.cacheCreateTokens += parseInt(data.cacheCreateTokens) || 0
agg.cacheReadTokens += parseInt(data.cacheReadTokens) || 0
agg.requests += parseInt(data.requests) || 0
stats.keys++
}
}
}
} while (cursor !== '0')
// 写入聚合后的 alltime 数据
const pipeline = this.client.pipeline()
for (const [, agg] of aggregatedData) {
const alltimeKey = `usage:${agg.keyId}:model:alltime:${agg.model}`
pipeline.hset(alltimeKey, {
inputTokens: agg.inputTokens.toString(),
outputTokens: agg.outputTokens.toString(),
cacheCreateTokens: agg.cacheCreateTokens.toString(),
cacheReadTokens: agg.cacheReadTokens.toString(),
requests: agg.requests.toString()
})
stats.models++
}
if (stats.models > 0) {
await pipeline.exec()
}
// 标记迁移完成
await this.client.set(migrationKey, Date.now().toString())
logger.info(
`📊 Alltime model stats migration completed: scanned ${stats.keys} monthly keys, created ${stats.models} alltime keys`
)
} catch (error) {
logger.error('📊 Alltime model stats migration failed:', error)
}
}
async disconnect() {
if (this.client) {
await this.client.quit()
@@ -996,6 +1086,14 @@ class RedisClient {
pipeline.hincrby(keyModelMonthly, 'ephemeral5mTokens', ephemeral5mTokens)
pipeline.hincrby(keyModelMonthly, 'ephemeral1hTokens', ephemeral1hTokens)
// API Key级别的模型统计 - 所有时间(无 TTL
const keyModelAlltime = `usage:${keyId}:model:alltime:${normalizedModel}`
pipeline.hincrby(keyModelAlltime, 'inputTokens', finalInputTokens)
pipeline.hincrby(keyModelAlltime, 'outputTokens', finalOutputTokens)
pipeline.hincrby(keyModelAlltime, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(keyModelAlltime, 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(keyModelAlltime, 'requests', 1)
// 小时级别统计
pipeline.hincrby(hourly, 'tokens', coreTokens)
pipeline.hincrby(hourly, 'inputTokens', finalInputTokens)
@@ -1040,9 +1138,9 @@ class RedisClient {
pipeline.expire(keyModelMonthly, 86400 * 365) // API Key模型每月统计1年过期
pipeline.expire(keyModelHourly, 86400 * 7) // API Key模型小时统计7天过期
// 系统级分钟统计的过期时间窗口时间的2倍
// 系统级分钟统计的过期时间窗口时间的2倍默认5分钟
const configLocal = require('../../config/config')
const { metricsWindow } = configLocal.system
const metricsWindow = configLocal.system?.metricsWindow || 5
pipeline.expire(systemMinuteKey, metricsWindow * 60 * 2)
// 添加索引(用于快速查询,避免 SCAN
@@ -1071,6 +1169,30 @@ class RedisClient {
pipeline.expire(`usage:keymodel:daily:index:${today}`, 86400 * 32)
pipeline.expire(`usage:keymodel:hourly:index:${currentHour}`, 86400 * 7)
// 全局预聚合统计
const globalDaily = `usage:global:daily:${today}`
const globalMonthly = `usage:global:monthly:${currentMonth}`
pipeline.hincrby('usage:global:total', 'requests', 1)
pipeline.hincrby('usage:global:total', 'inputTokens', finalInputTokens)
pipeline.hincrby('usage:global:total', 'outputTokens', finalOutputTokens)
pipeline.hincrby('usage:global:total', 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby('usage:global:total', 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby('usage:global:total', 'allTokens', totalTokens)
pipeline.hincrby(globalDaily, 'requests', 1)
pipeline.hincrby(globalDaily, 'inputTokens', finalInputTokens)
pipeline.hincrby(globalDaily, 'outputTokens', finalOutputTokens)
pipeline.hincrby(globalDaily, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(globalDaily, 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(globalDaily, 'allTokens', totalTokens)
pipeline.hincrby(globalMonthly, 'requests', 1)
pipeline.hincrby(globalMonthly, 'inputTokens', finalInputTokens)
pipeline.hincrby(globalMonthly, 'outputTokens', finalOutputTokens)
pipeline.hincrby(globalMonthly, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(globalMonthly, 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(globalMonthly, 'allTokens', totalTokens)
pipeline.expire(globalDaily, 86400 * 32)
pipeline.expire(globalMonthly, 86400 * 365)
// 执行Pipeline
await pipeline.exec()
}
@@ -4521,4 +4643,151 @@ redisClient.removeFromIndex = async function (indexKey, id) {
await client.srem(indexKey, id)
}
// ============================================
// 数据迁移相关
// ============================================
// 迁移全局统计数据(从 API Key 数据聚合)
redisClient.migrateGlobalStats = async function () {
const logger = require('../utils/logger')
logger.info('🔄 开始迁移全局统计数据...')
const keyIds = await this.scanApiKeyIds()
if (!keyIds || keyIds.length === 0) {
logger.info('📊 没有 API Key 数据需要迁移')
return { success: true, migrated: 0 }
}
const total = {
requests: 0,
inputTokens: 0,
outputTokens: 0,
cacheCreateTokens: 0,
cacheReadTokens: 0,
allTokens: 0
}
// 批量获取所有 usage 数据
const pipeline = this.client.pipeline()
keyIds.forEach((id) => pipeline.hgetall(`usage:${id}`))
const results = await pipeline.exec()
results.forEach(([err, usage]) => {
if (err || !usage) {
return
}
// 兼容新旧字段格式(带 total 前缀和不带的)
total.requests += parseInt(usage.totalRequests || usage.requests) || 0
total.inputTokens += parseInt(usage.totalInputTokens || usage.inputTokens) || 0
total.outputTokens += parseInt(usage.totalOutputTokens || usage.outputTokens) || 0
total.cacheCreateTokens +=
parseInt(usage.totalCacheCreateTokens || usage.cacheCreateTokens) || 0
total.cacheReadTokens += parseInt(usage.totalCacheReadTokens || usage.cacheReadTokens) || 0
total.allTokens += parseInt(usage.totalAllTokens || usage.allTokens || usage.totalTokens) || 0
})
// 写入全局统计
await this.client.hset('usage:global:total', total)
logger.success(
`✅ 迁移完成: ${keyIds.length} 个 API Key, ${total.requests} 请求, ${total.allTokens} tokens`
)
return { success: true, migrated: keyIds.length, total }
}
// 检查是否需要迁移
redisClient.needsGlobalStatsMigration = async function () {
const exists = await this.client.exists('usage:global:total')
return exists === 0
}
// 获取已迁移版本
redisClient.getMigratedVersion = async function () {
return (await this.client.get('system:migrated:version')) || '0.0.0'
}
// 设置已迁移版本
redisClient.setMigratedVersion = async function (version) {
await this.client.set('system:migrated:version', version)
}
// 获取全局统计(用于 dashboard 快速查询)
redisClient.getGlobalStats = async function () {
const stats = await this.client.hgetall('usage:global:total')
if (!stats || !stats.requests) {
return null
}
return {
requests: parseInt(stats.requests) || 0,
inputTokens: parseInt(stats.inputTokens) || 0,
outputTokens: parseInt(stats.outputTokens) || 0,
cacheCreateTokens: parseInt(stats.cacheCreateTokens) || 0,
cacheReadTokens: parseInt(stats.cacheReadTokens) || 0,
allTokens: parseInt(stats.allTokens) || 0
}
}
// 快速获取 API Key 计数(不拉全量数据)
redisClient.getApiKeyCount = async function () {
const keyIds = await this.scanApiKeyIds()
if (!keyIds || keyIds.length === 0) {
return { total: 0, active: 0 }
}
// 批量获取 isActive 字段
const pipeline = this.client.pipeline()
keyIds.forEach((id) => pipeline.hget(`apikey:${id}`, 'isActive'))
const results = await pipeline.exec()
let active = 0
results.forEach(([err, val]) => {
if (!err && (val === 'true' || val === true)) {
active++
}
})
return { total: keyIds.length, active }
}
// 清理过期的系统分钟统计数据(启动时调用)
redisClient.cleanupSystemMetrics = async function () {
const logger = require('../utils/logger')
logger.info('🧹 清理过期的系统分钟统计数据...')
const keys = await this.scanKeys('system:metrics:minute:*')
if (!keys || keys.length === 0) {
logger.info('📊 没有需要清理的系统分钟统计数据')
return { cleaned: 0 }
}
// 计算当前分钟时间戳和保留窗口
const config = require('../../config/config')
const metricsWindow = config.system?.metricsWindow || 5
const currentMinute = Math.floor(Date.now() / 60000)
const keepAfter = currentMinute - metricsWindow * 2 // 保留窗口的2倍
// 筛选需要删除的 key
const toDelete = keys.filter((key) => {
const match = key.match(/system:metrics:minute:(\d+)/)
if (!match) {
return false
}
const minute = parseInt(match[1])
return minute < keepAfter
})
if (toDelete.length === 0) {
logger.info('📊 没有过期的系统分钟统计数据')
return { cleaned: 0 }
}
// 分批删除
const batchSize = 1000
for (let i = 0; i < toDelete.length; i += batchSize) {
const batch = toDelete.slice(i, i + batchSize)
await this.client.del(...batch)
}
logger.success(`✅ 清理完成: 删除 ${toDelete.length} 个过期的系统分钟统计 key`)
return { cleaned: toDelete.length }
}
module.exports = redisClient