feat: 大规模性能优化 - Redis Pipeline 批量操作、索引系统、连接池优化

This commit is contained in:
SunSeekerX
2025-12-31 02:08:47 +08:00
parent a345812cd7
commit 584fa8c9c1
68 changed files with 6541 additions and 4536 deletions

View File

@@ -1,9 +1,65 @@
const redis = require('../models/redis')
const apiKeyService = require('./apiKeyService')
const CostCalculator = require('../utils/costCalculator')
const logger = require('../utils/logger')
// HMGET 需要的字段
const USAGE_FIELDS = [
'totalInputTokens',
'inputTokens',
'totalOutputTokens',
'outputTokens',
'totalCacheCreateTokens',
'cacheCreateTokens',
'totalCacheReadTokens',
'cacheReadTokens'
]
class CostInitService {
/**
* 带并发限制的并行执行
*/
async parallelLimit(items, fn, concurrency = 20) {
let index = 0
const results = []
async function worker() {
while (index < items.length) {
const currentIndex = index++
try {
results[currentIndex] = await fn(items[currentIndex], currentIndex)
} catch (error) {
results[currentIndex] = { error }
}
}
}
await Promise.all(Array(Math.min(concurrency, items.length)).fill().map(worker))
return results
}
/**
* 使用 SCAN 获取匹配的 keys带去重
*/
async scanKeysWithDedup(client, pattern, count = 500) {
const seen = new Set()
const allKeys = []
let cursor = '0'
do {
const [newCursor, keys] = await client.scan(cursor, 'MATCH', pattern, 'COUNT', count)
cursor = newCursor
for (const key of keys) {
if (!seen.has(key)) {
seen.add(key)
allKeys.push(key)
}
}
} while (cursor !== '0')
return allKeys
}
/**
* 初始化所有API Key的费用数据
* 扫描历史使用记录并计算费用
@@ -12,25 +68,55 @@ class CostInitService {
try {
logger.info('💰 Starting cost initialization for all API Keys...')
const apiKeys = await apiKeyService.getAllApiKeys()
// 用 scanApiKeyIds 获取 ID然后过滤已删除的
const allKeyIds = await redis.scanApiKeyIds()
const client = redis.getClientSafe()
// 批量检查 isDeleted 状态,过滤已删除的 key
const FILTER_BATCH = 100
const apiKeyIds = []
for (let i = 0; i < allKeyIds.length; i += FILTER_BATCH) {
const batch = allKeyIds.slice(i, i + FILTER_BATCH)
const pipeline = client.pipeline()
for (const keyId of batch) {
pipeline.hget(`apikey:${keyId}`, 'isDeleted')
}
const results = await pipeline.exec()
for (let j = 0; j < results.length; j++) {
const [err, isDeleted] = results[j]
if (!err && isDeleted !== 'true') {
apiKeyIds.push(batch[j])
}
}
}
logger.info(`💰 Found ${apiKeyIds.length} active API Keys to process (filtered ${allKeyIds.length - apiKeyIds.length} deleted)`)
let processedCount = 0
let errorCount = 0
for (const apiKey of apiKeys) {
try {
await this.initializeApiKeyCosts(apiKey.id, client)
processedCount++
// 优化6: 并行处理 + 并发限制
await this.parallelLimit(
apiKeyIds,
async (apiKeyId) => {
try {
await this.initializeApiKeyCosts(apiKeyId, client)
processedCount++
if (processedCount % 10 === 0) {
logger.info(`💰 Processed ${processedCount} API Keys...`)
if (processedCount % 100 === 0) {
logger.info(`💰 Processed ${processedCount}/${apiKeyIds.length} API Keys...`)
}
} catch (error) {
errorCount++
logger.error(`❌ Failed to initialize costs for API Key ${apiKeyId}:`, error)
}
} catch (error) {
errorCount++
logger.error(`❌ Failed to initialize costs for API Key ${apiKey.id}:`, error)
}
}
},
20 // 并发数
)
logger.success(
`💰 Cost initialization completed! Processed: ${processedCount}, Errors: ${errorCount}`
@@ -46,32 +132,60 @@ class CostInitService {
* 初始化单个API Key的费用数据
*/
async initializeApiKeyCosts(apiKeyId, client) {
// 获取所有时间的模型使用统计
const modelKeys = await client.keys(`usage:${apiKeyId}:model:*:*:*`)
// 优化4: 使用 SCAN 获取 keys带去重
const modelKeys = await this.scanKeysWithDedup(client, `usage:${apiKeyId}:model:*:*:*`)
if (modelKeys.length === 0) {
return
}
// 优化5: 使用 Pipeline + HMGET 批量获取数据
const BATCH_SIZE = 100
const allData = []
for (let i = 0; i < modelKeys.length; i += BATCH_SIZE) {
const batch = modelKeys.slice(i, i + BATCH_SIZE)
const pipeline = client.pipeline()
for (const key of batch) {
pipeline.hmget(key, ...USAGE_FIELDS)
}
const results = await pipeline.exec()
for (let j = 0; j < results.length; j++) {
const [err, values] = results[j]
if (err) continue
// 将数组转换为对象
const data = {}
let hasData = false
for (let k = 0; k < USAGE_FIELDS.length; k++) {
if (values[k] !== null) {
data[USAGE_FIELDS[k]] = values[k]
hasData = true
}
}
if (hasData) {
allData.push({ key: batch[j], data })
}
}
}
// 按日期分组统计
const dailyCosts = new Map() // date -> cost
const monthlyCosts = new Map() // month -> cost
const hourlyCosts = new Map() // hour -> cost
const dailyCosts = new Map()
const monthlyCosts = new Map()
const hourlyCosts = new Map()
for (const key of modelKeys) {
// 解析key格式: usage:{keyId}:model:{period}:{model}:{date}
for (const { key, data } of allData) {
const match = key.match(
/usage:(.+):model:(daily|monthly|hourly):(.+):(\d{4}-\d{2}(?:-\d{2})?(?::\d{2})?)$/
)
if (!match) {
continue
}
if (!match) continue
const [, , period, model, dateStr] = match
// 获取使用数据
const data = await client.hgetall(key)
if (!data || Object.keys(data).length === 0) {
continue
}
// 计算费用
const usage = {
input_tokens: parseInt(data.totalInputTokens) || parseInt(data.inputTokens) || 0,
output_tokens: parseInt(data.totalOutputTokens) || parseInt(data.outputTokens) || 0,
@@ -84,47 +198,34 @@ class CostInitService {
const costResult = CostCalculator.calculateCost(usage, model)
const cost = costResult.costs.total
// 根据period分组累加费用
if (period === 'daily') {
const currentCost = dailyCosts.get(dateStr) || 0
dailyCosts.set(dateStr, currentCost + cost)
dailyCosts.set(dateStr, (dailyCosts.get(dateStr) || 0) + cost)
} else if (period === 'monthly') {
const currentCost = monthlyCosts.get(dateStr) || 0
monthlyCosts.set(dateStr, currentCost + cost)
monthlyCosts.set(dateStr, (monthlyCosts.get(dateStr) || 0) + cost)
} else if (period === 'hourly') {
const currentCost = hourlyCosts.get(dateStr) || 0
hourlyCosts.set(dateStr, currentCost + cost)
hourlyCosts.set(dateStr, (hourlyCosts.get(dateStr) || 0) + cost)
}
}
// 将计算出的费用写入Redis
const promises = []
// 使用 SET NX EX 只补缺失的键,不覆盖已存在的
const pipeline = client.pipeline()
// 写入每日费用
// 写入每日费用(只补缺失)
for (const [date, cost] of dailyCosts) {
const key = `usage:cost:daily:${apiKeyId}:${date}`
promises.push(
client.set(key, cost.toString()),
client.expire(key, 86400 * 30) // 30天过期
)
pipeline.set(key, cost.toString(), 'EX', 86400 * 30, 'NX')
}
// 写入每月费用
// 写入每月费用(只补缺失)
for (const [month, cost] of monthlyCosts) {
const key = `usage:cost:monthly:${apiKeyId}:${month}`
promises.push(
client.set(key, cost.toString()),
client.expire(key, 86400 * 90) // 90天过期
)
pipeline.set(key, cost.toString(), 'EX', 86400 * 90, 'NX')
}
// 写入每小时费用
// 写入每小时费用(只补缺失)
for (const [hour, cost] of hourlyCosts) {
const key = `usage:cost:hourly:${apiKeyId}:${hour}`
promises.push(
client.set(key, cost.toString()),
client.expire(key, 86400 * 7) // 7天过期
)
pipeline.set(key, cost.toString(), 'EX', 86400 * 7, 'NX')
}
// 计算总费用
@@ -133,37 +234,25 @@ class CostInitService {
totalCost += cost
}
// 写入总费用 - 修复:只在总费用不存在时初始化,避免覆盖现有累计值
// 写入总费用(只补缺失)
if (totalCost > 0) {
const totalKey = `usage:cost:total:${apiKeyId}`
// 先检查总费用是否已存在
const existingTotal = await client.get(totalKey)
if (!existingTotal || parseFloat(existingTotal) === 0) {
// 仅在总费用不存在或为0时才初始化
promises.push(client.set(totalKey, totalCost.toString()))
pipeline.set(totalKey, totalCost.toString())
logger.info(`💰 Initialized total cost for API Key ${apiKeyId}: $${totalCost.toFixed(6)}`)
} else {
// 如果总费用已存在,保持不变,避免覆盖累计值
// 注意这个逻辑防止因每日费用键过期30天导致的错误覆盖
// 如果需要强制重新计算,请先手动删除 usage:cost:total:{keyId} 键
const existing = parseFloat(existingTotal)
const calculated = totalCost
if (calculated > existing * 1.1) {
// 如果计算值比现有值大 10% 以上,记录警告(可能是数据不一致)
if (totalCost > existing * 1.1) {
logger.warn(
`💰 Total cost mismatch for API Key ${apiKeyId}: existing=$${existing.toFixed(6)}, calculated=$${calculated.toFixed(6)} (from last 30 days). Keeping existing value to prevent data loss.`
)
} else {
logger.debug(
`💰 Skipping total cost initialization for API Key ${apiKeyId} - existing: $${existing.toFixed(6)}, calculated: $${calculated.toFixed(6)}`
`💰 Total cost mismatch for API Key ${apiKeyId}: existing=$${existing.toFixed(6)}, calculated=$${totalCost.toFixed(6)} (from last 30 days). Keeping existing value.`
)
}
}
}
await Promise.all(promises)
await pipeline.exec()
logger.debug(
`💰 Initialized costs for API Key ${apiKeyId}: Daily entries: ${dailyCosts.size}, Total cost: $${totalCost.toFixed(2)}`
@@ -172,41 +261,66 @@ class CostInitService {
/**
* 检查是否需要初始化费用数据
* 使用 SCAN 代替 KEYS正确处理 cursor
*/
async needsInitialization() {
try {
const client = redis.getClientSafe()
// 检查是否有任何费用数据
const costKeys = await client.keys('usage:cost:*')
// 正确循环 SCAN 检查是否有任何费用数据
let cursor = '0'
let hasCostData = false
// 如果没有费用数据,需要初始化
if (costKeys.length === 0) {
do {
const [newCursor, keys] = await client.scan(cursor, 'MATCH', 'usage:cost:*', 'COUNT', 100)
cursor = newCursor
if (keys.length > 0) {
hasCostData = true
break
}
} while (cursor !== '0')
if (!hasCostData) {
logger.info('💰 No cost data found, initialization needed')
return true
}
// 检查是否有使用数据但没有对应的费用数据
const sampleKeys = await client.keys('usage:*:model:daily:*:*')
if (sampleKeys.length > 10) {
// 抽样检查
const sampleSize = Math.min(10, sampleKeys.length)
for (let i = 0; i < sampleSize; i++) {
const usageKey = sampleKeys[Math.floor(Math.random() * sampleKeys.length)]
// 抽样检查使用数据是否有对应的费用数据
cursor = '0'
let samplesChecked = 0
const maxSamples = 10
do {
const [newCursor, usageKeys] = await client.scan(
cursor,
'MATCH',
'usage:*:model:daily:*:*',
'COUNT',
100
)
cursor = newCursor
for (const usageKey of usageKeys) {
if (samplesChecked >= maxSamples) break
const match = usageKey.match(/usage:(.+):model:daily:(.+):(\d{4}-\d{2}-\d{2})$/)
if (match) {
const [, keyId, , date] = match
const costKey = `usage:cost:daily:${keyId}:${date}`
const hasCost = await client.exists(costKey)
if (!hasCost) {
logger.info(
`💰 Found usage without cost data for key ${keyId} on ${date}, initialization needed`
)
return true
}
samplesChecked++
}
}
}
if (samplesChecked >= maxSamples) break
} while (cursor !== '0')
logger.info('💰 Cost data appears to be up to date')
return false