Files
new-api/service/channel_affinity.go

916 lines
25 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
"fmt"
"hash/fnv"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/QuantumNous/new-api/common"
"github.com/QuantumNous/new-api/dto"
"github.com/QuantumNous/new-api/pkg/cachex"
"github.com/QuantumNous/new-api/setting/operation_setting"
"github.com/QuantumNous/new-api/types"
"github.com/gin-gonic/gin"
"github.com/samber/hot"
"github.com/tidwall/gjson"
)
const (
ginKeyChannelAffinityCacheKey = "channel_affinity_cache_key"
ginKeyChannelAffinityTTLSeconds = "channel_affinity_ttl_seconds"
ginKeyChannelAffinityMeta = "channel_affinity_meta"
ginKeyChannelAffinityLogInfo = "channel_affinity_log_info"
ginKeyChannelAffinitySkipRetry = "channel_affinity_skip_retry_on_failure"
channelAffinityCacheNamespace = "new-api:channel_affinity:v1"
channelAffinityUsageCacheStatsNamespace = "new-api:channel_affinity_usage_cache_stats:v1"
)
var (
channelAffinityCacheOnce sync.Once
channelAffinityCache *cachex.HybridCache[int]
channelAffinityUsageCacheStatsOnce sync.Once
channelAffinityUsageCacheStatsCache *cachex.HybridCache[ChannelAffinityUsageCacheCounters]
channelAffinityRegexCache sync.Map // map[string]*regexp.Regexp
)
type channelAffinityMeta struct {
CacheKey string
TTLSeconds int
RuleName string
SkipRetry bool
ParamTemplate map[string]interface{}
KeySourceType string
KeySourceKey string
KeySourcePath string
KeyHint string
KeyFingerprint string
UsingGroup string
ModelName string
RequestPath string
}
type ChannelAffinityStatsContext struct {
RuleName string
UsingGroup string
KeyFingerprint string
TTLSeconds int64
}
const (
cacheTokenRateModeCachedOverPrompt = "cached_over_prompt"
cacheTokenRateModeCachedOverPromptPlusCached = "cached_over_prompt_plus_cached"
cacheTokenRateModeMixed = "mixed"
)
type ChannelAffinityCacheStats struct {
Enabled bool `json:"enabled"`
Total int `json:"total"`
Unknown int `json:"unknown"`
ByRuleName map[string]int `json:"by_rule_name"`
CacheCapacity int `json:"cache_capacity"`
CacheAlgo string `json:"cache_algo"`
}
func getChannelAffinityCache() *cachex.HybridCache[int] {
channelAffinityCacheOnce.Do(func() {
setting := operation_setting.GetChannelAffinitySetting()
capacity := setting.MaxEntries
if capacity <= 0 {
capacity = 100_000
}
defaultTTLSeconds := setting.DefaultTTLSeconds
if defaultTTLSeconds <= 0 {
defaultTTLSeconds = 3600
}
channelAffinityCache = cachex.NewHybridCache[int](cachex.HybridCacheConfig[int]{
Namespace: cachex.Namespace(channelAffinityCacheNamespace),
Redis: common.RDB,
RedisEnabled: func() bool {
return common.RedisEnabled && common.RDB != nil
},
RedisCodec: cachex.IntCodec{},
Memory: func() *hot.HotCache[string, int] {
return hot.NewHotCache[string, int](hot.LRU, capacity).
WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
WithJanitor().
Build()
},
})
})
return channelAffinityCache
}
func GetChannelAffinityCacheStats() ChannelAffinityCacheStats {
setting := operation_setting.GetChannelAffinitySetting()
if setting == nil {
return ChannelAffinityCacheStats{
Enabled: false,
Total: 0,
Unknown: 0,
ByRuleName: map[string]int{},
}
}
cache := getChannelAffinityCache()
mainCap, _ := cache.Capacity()
mainAlgo, _ := cache.Algorithm()
rules := setting.Rules
ruleByName := make(map[string]operation_setting.ChannelAffinityRule, len(rules))
for _, r := range rules {
name := strings.TrimSpace(r.Name)
if name == "" {
continue
}
if !r.IncludeRuleName {
continue
}
ruleByName[name] = r
}
byRuleName := make(map[string]int, len(ruleByName))
for name := range ruleByName {
byRuleName[name] = 0
}
keys, err := cache.Keys()
if err != nil {
common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
keys = nil
}
total := len(keys)
unknown := 0
for _, k := range keys {
prefix := channelAffinityCacheNamespace + ":"
if !strings.HasPrefix(k, prefix) {
unknown++
continue
}
rest := strings.TrimPrefix(k, prefix)
parts := strings.Split(rest, ":")
if len(parts) < 2 {
unknown++
continue
}
ruleName := parts[0]
rule, ok := ruleByName[ruleName]
if !ok {
unknown++
continue
}
if rule.IncludeUsingGroup {
if len(parts) < 3 {
unknown++
continue
}
}
byRuleName[ruleName]++
}
return ChannelAffinityCacheStats{
Enabled: setting.Enabled,
Total: total,
Unknown: unknown,
ByRuleName: byRuleName,
CacheCapacity: mainCap,
CacheAlgo: mainAlgo,
}
}
func ClearChannelAffinityCacheAll() int {
cache := getChannelAffinityCache()
keys, err := cache.Keys()
if err != nil {
common.SysError(fmt.Sprintf("channel affinity cache list keys failed: err=%v", err))
keys = nil
}
if len(keys) > 0 {
if _, err := cache.DeleteMany(keys); err != nil {
common.SysError(fmt.Sprintf("channel affinity cache delete many failed: err=%v", err))
}
}
return len(keys)
}
func ClearChannelAffinityCacheByRuleName(ruleName string) (int, error) {
ruleName = strings.TrimSpace(ruleName)
if ruleName == "" {
return 0, fmt.Errorf("rule_name 不能为空")
}
setting := operation_setting.GetChannelAffinitySetting()
if setting == nil {
return 0, fmt.Errorf("channel_affinity_setting 未初始化")
}
var matchedRule *operation_setting.ChannelAffinityRule
for i := range setting.Rules {
r := &setting.Rules[i]
if strings.TrimSpace(r.Name) != ruleName {
continue
}
matchedRule = r
break
}
if matchedRule == nil {
return 0, fmt.Errorf("未知规则名称")
}
if !matchedRule.IncludeRuleName {
return 0, fmt.Errorf("该规则未启用 include_rule_name无法按规则清空缓存")
}
cache := getChannelAffinityCache()
deleted, err := cache.DeleteByPrefix(ruleName)
if err != nil {
return 0, err
}
return deleted, nil
}
func matchAnyRegexCached(patterns []string, s string) bool {
if len(patterns) == 0 || s == "" {
return false
}
for _, pattern := range patterns {
if pattern == "" {
continue
}
re, ok := channelAffinityRegexCache.Load(pattern)
if !ok {
compiled, err := regexp.Compile(pattern)
if err != nil {
continue
}
re = compiled
channelAffinityRegexCache.Store(pattern, re)
}
if re.(*regexp.Regexp).MatchString(s) {
return true
}
}
return false
}
func matchAnyIncludeFold(patterns []string, s string) bool {
if len(patterns) == 0 || s == "" {
return false
}
sLower := strings.ToLower(s)
for _, p := range patterns {
p = strings.TrimSpace(p)
if p == "" {
continue
}
if strings.Contains(sLower, strings.ToLower(p)) {
return true
}
}
return false
}
func extractChannelAffinityValue(c *gin.Context, src operation_setting.ChannelAffinityKeySource) string {
switch src.Type {
case "context_int":
if src.Key == "" {
return ""
}
v := c.GetInt(src.Key)
if v <= 0 {
return ""
}
return strconv.Itoa(v)
case "context_string":
if src.Key == "" {
return ""
}
return strings.TrimSpace(c.GetString(src.Key))
case "gjson":
if src.Path == "" {
return ""
}
storage, err := common.GetBodyStorage(c)
if err != nil {
return ""
}
body, err := storage.Bytes()
if err != nil || len(body) == 0 {
return ""
}
res := gjson.GetBytes(body, src.Path)
if !res.Exists() {
return ""
}
switch res.Type {
case gjson.String, gjson.Number, gjson.True, gjson.False:
return strings.TrimSpace(res.String())
default:
return strings.TrimSpace(res.Raw)
}
default:
return ""
}
}
func buildChannelAffinityCacheKeySuffix(rule operation_setting.ChannelAffinityRule, usingGroup string, affinityValue string) string {
parts := make([]string, 0, 3)
if rule.IncludeRuleName && rule.Name != "" {
parts = append(parts, rule.Name)
}
if rule.IncludeUsingGroup && usingGroup != "" {
parts = append(parts, usingGroup)
}
parts = append(parts, affinityValue)
return strings.Join(parts, ":")
}
func setChannelAffinityContext(c *gin.Context, meta channelAffinityMeta) {
c.Set(ginKeyChannelAffinityCacheKey, meta.CacheKey)
c.Set(ginKeyChannelAffinityTTLSeconds, meta.TTLSeconds)
c.Set(ginKeyChannelAffinityMeta, meta)
}
func getChannelAffinityContext(c *gin.Context) (string, int, bool) {
keyAny, ok := c.Get(ginKeyChannelAffinityCacheKey)
if !ok {
return "", 0, false
}
key, ok := keyAny.(string)
if !ok || key == "" {
return "", 0, false
}
ttlAny, ok := c.Get(ginKeyChannelAffinityTTLSeconds)
if !ok {
return key, 0, true
}
ttlSeconds, _ := ttlAny.(int)
return key, ttlSeconds, true
}
func getChannelAffinityMeta(c *gin.Context) (channelAffinityMeta, bool) {
anyMeta, ok := c.Get(ginKeyChannelAffinityMeta)
if !ok {
return channelAffinityMeta{}, false
}
meta, ok := anyMeta.(channelAffinityMeta)
if !ok {
return channelAffinityMeta{}, false
}
return meta, true
}
func GetChannelAffinityStatsContext(c *gin.Context) (ChannelAffinityStatsContext, bool) {
if c == nil {
return ChannelAffinityStatsContext{}, false
}
meta, ok := getChannelAffinityMeta(c)
if !ok {
return ChannelAffinityStatsContext{}, false
}
ruleName := strings.TrimSpace(meta.RuleName)
keyFp := strings.TrimSpace(meta.KeyFingerprint)
usingGroup := strings.TrimSpace(meta.UsingGroup)
if ruleName == "" || keyFp == "" {
return ChannelAffinityStatsContext{}, false
}
ttlSeconds := int64(meta.TTLSeconds)
if ttlSeconds <= 0 {
return ChannelAffinityStatsContext{}, false
}
return ChannelAffinityStatsContext{
RuleName: ruleName,
UsingGroup: usingGroup,
KeyFingerprint: keyFp,
TTLSeconds: ttlSeconds,
}, true
}
func affinityFingerprint(s string) string {
if s == "" {
return ""
}
hex := common.Sha1([]byte(s))
if len(hex) >= 8 {
return hex[:8]
}
return hex
}
func buildChannelAffinityKeyHint(s string) string {
s = strings.TrimSpace(s)
if s == "" {
return ""
}
s = strings.ReplaceAll(s, "\n", " ")
s = strings.ReplaceAll(s, "\r", " ")
if len(s) <= 12 {
return s
}
return s[:4] + "..." + s[len(s)-4:]
}
func cloneStringAnyMap(src map[string]interface{}) map[string]interface{} {
if len(src) == 0 {
return map[string]interface{}{}
}
dst := make(map[string]interface{}, len(src))
for k, v := range src {
dst[k] = v
}
return dst
}
func mergeChannelOverride(base map[string]interface{}, tpl map[string]interface{}) map[string]interface{} {
if len(base) == 0 && len(tpl) == 0 {
return map[string]interface{}{}
}
if len(tpl) == 0 {
return base
}
out := cloneStringAnyMap(base)
for k, v := range tpl {
out[k] = v
}
return out
}
func appendChannelAffinityTemplateAdminInfo(c *gin.Context, meta channelAffinityMeta) {
if c == nil {
return
}
if len(meta.ParamTemplate) == 0 {
return
}
templateInfo := map[string]interface{}{
"applied": true,
"rule_name": meta.RuleName,
"param_override_keys": len(meta.ParamTemplate),
}
if anyInfo, ok := c.Get(ginKeyChannelAffinityLogInfo); ok {
if info, ok := anyInfo.(map[string]interface{}); ok {
info["override_template"] = templateInfo
c.Set(ginKeyChannelAffinityLogInfo, info)
return
}
}
c.Set(ginKeyChannelAffinityLogInfo, map[string]interface{}{
"reason": meta.RuleName,
"rule_name": meta.RuleName,
"using_group": meta.UsingGroup,
"model": meta.ModelName,
"request_path": meta.RequestPath,
"key_source": meta.KeySourceType,
"key_key": meta.KeySourceKey,
"key_path": meta.KeySourcePath,
"key_hint": meta.KeyHint,
"key_fp": meta.KeyFingerprint,
"override_template": templateInfo,
})
}
// ApplyChannelAffinityOverrideTemplate merges per-rule channel override templates onto the selected channel override config.
func ApplyChannelAffinityOverrideTemplate(c *gin.Context, paramOverride map[string]interface{}) (map[string]interface{}, bool) {
if c == nil {
return paramOverride, false
}
meta, ok := getChannelAffinityMeta(c)
if !ok {
return paramOverride, false
}
if len(meta.ParamTemplate) == 0 {
return paramOverride, false
}
mergedParam := mergeChannelOverride(paramOverride, meta.ParamTemplate)
appendChannelAffinityTemplateAdminInfo(c, meta)
return mergedParam, true
}
func GetPreferredChannelByAffinity(c *gin.Context, modelName string, usingGroup string) (int, bool) {
setting := operation_setting.GetChannelAffinitySetting()
if setting == nil || !setting.Enabled {
return 0, false
}
path := ""
if c != nil && c.Request != nil && c.Request.URL != nil {
path = c.Request.URL.Path
}
userAgent := ""
if c != nil && c.Request != nil {
userAgent = c.Request.UserAgent()
}
for _, rule := range setting.Rules {
if !matchAnyRegexCached(rule.ModelRegex, modelName) {
continue
}
if len(rule.PathRegex) > 0 && !matchAnyRegexCached(rule.PathRegex, path) {
continue
}
if len(rule.UserAgentInclude) > 0 && !matchAnyIncludeFold(rule.UserAgentInclude, userAgent) {
continue
}
var affinityValue string
var usedSource operation_setting.ChannelAffinityKeySource
for _, src := range rule.KeySources {
affinityValue = extractChannelAffinityValue(c, src)
if affinityValue != "" {
usedSource = src
break
}
}
if affinityValue == "" {
continue
}
if rule.ValueRegex != "" && !matchAnyRegexCached([]string{rule.ValueRegex}, affinityValue) {
continue
}
ttlSeconds := rule.TTLSeconds
if ttlSeconds <= 0 {
ttlSeconds = setting.DefaultTTLSeconds
}
cacheKeySuffix := buildChannelAffinityCacheKeySuffix(rule, usingGroup, affinityValue)
cacheKeyFull := channelAffinityCacheNamespace + ":" + cacheKeySuffix
setChannelAffinityContext(c, channelAffinityMeta{
CacheKey: cacheKeyFull,
TTLSeconds: ttlSeconds,
RuleName: rule.Name,
SkipRetry: rule.SkipRetryOnFailure,
ParamTemplate: cloneStringAnyMap(rule.ParamOverrideTemplate),
KeySourceType: strings.TrimSpace(usedSource.Type),
KeySourceKey: strings.TrimSpace(usedSource.Key),
KeySourcePath: strings.TrimSpace(usedSource.Path),
KeyHint: buildChannelAffinityKeyHint(affinityValue),
KeyFingerprint: affinityFingerprint(affinityValue),
UsingGroup: usingGroup,
ModelName: modelName,
RequestPath: path,
})
cache := getChannelAffinityCache()
channelID, found, err := cache.Get(cacheKeySuffix)
if err != nil {
common.SysError(fmt.Sprintf("channel affinity cache get failed: key=%s, err=%v", cacheKeyFull, err))
return 0, false
}
if found {
return channelID, true
}
return 0, false
}
return 0, false
}
func ShouldSkipRetryAfterChannelAffinityFailure(c *gin.Context) bool {
if c == nil {
return false
}
v, ok := c.Get(ginKeyChannelAffinitySkipRetry)
if !ok {
return false
}
b, ok := v.(bool)
if !ok {
return false
}
return b
}
func MarkChannelAffinityUsed(c *gin.Context, selectedGroup string, channelID int) {
if c == nil || channelID <= 0 {
return
}
meta, ok := getChannelAffinityMeta(c)
if !ok {
return
}
c.Set(ginKeyChannelAffinitySkipRetry, meta.SkipRetry)
info := map[string]interface{}{
"reason": meta.RuleName,
"rule_name": meta.RuleName,
"using_group": meta.UsingGroup,
"selected_group": selectedGroup,
"model": meta.ModelName,
"request_path": meta.RequestPath,
"channel_id": channelID,
"key_source": meta.KeySourceType,
"key_key": meta.KeySourceKey,
"key_path": meta.KeySourcePath,
"key_hint": meta.KeyHint,
"key_fp": meta.KeyFingerprint,
}
c.Set(ginKeyChannelAffinityLogInfo, info)
}
func AppendChannelAffinityAdminInfo(c *gin.Context, adminInfo map[string]interface{}) {
if c == nil || adminInfo == nil {
return
}
anyInfo, ok := c.Get(ginKeyChannelAffinityLogInfo)
if !ok || anyInfo == nil {
return
}
adminInfo["channel_affinity"] = anyInfo
}
func RecordChannelAffinity(c *gin.Context, channelID int) {
if channelID <= 0 {
return
}
setting := operation_setting.GetChannelAffinitySetting()
if setting == nil || !setting.Enabled {
return
}
if setting.SwitchOnSuccess && c != nil {
if successChannelID := c.GetInt("channel_id"); successChannelID > 0 {
channelID = successChannelID
}
}
cacheKey, ttlSeconds, ok := getChannelAffinityContext(c)
if !ok {
return
}
if ttlSeconds <= 0 {
ttlSeconds = setting.DefaultTTLSeconds
}
if ttlSeconds <= 0 {
ttlSeconds = 3600
}
cache := getChannelAffinityCache()
if err := cache.SetWithTTL(cacheKey, channelID, time.Duration(ttlSeconds)*time.Second); err != nil {
common.SysError(fmt.Sprintf("channel affinity cache set failed: key=%s, err=%v", cacheKey, err))
}
}
type ChannelAffinityUsageCacheStats struct {
RuleName string `json:"rule_name"`
UsingGroup string `json:"using_group"`
KeyFingerprint string `json:"key_fp"`
CachedTokenRateMode string `json:"cached_token_rate_mode"`
Hit int64 `json:"hit"`
Total int64 `json:"total"`
WindowSeconds int64 `json:"window_seconds"`
PromptTokens int64 `json:"prompt_tokens"`
CompletionTokens int64 `json:"completion_tokens"`
TotalTokens int64 `json:"total_tokens"`
CachedTokens int64 `json:"cached_tokens"`
PromptCacheHitTokens int64 `json:"prompt_cache_hit_tokens"`
LastSeenAt int64 `json:"last_seen_at"`
}
type ChannelAffinityUsageCacheCounters struct {
CachedTokenRateMode string `json:"cached_token_rate_mode"`
Hit int64 `json:"hit"`
Total int64 `json:"total"`
WindowSeconds int64 `json:"window_seconds"`
PromptTokens int64 `json:"prompt_tokens"`
CompletionTokens int64 `json:"completion_tokens"`
TotalTokens int64 `json:"total_tokens"`
CachedTokens int64 `json:"cached_tokens"`
PromptCacheHitTokens int64 `json:"prompt_cache_hit_tokens"`
LastSeenAt int64 `json:"last_seen_at"`
}
var channelAffinityUsageCacheStatsLocks [64]sync.Mutex
// ObserveChannelAffinityUsageCacheByRelayFormat records usage cache stats with a stable rate mode derived from relay format.
func ObserveChannelAffinityUsageCacheByRelayFormat(c *gin.Context, usage *dto.Usage, relayFormat types.RelayFormat) {
ObserveChannelAffinityUsageCacheFromContext(c, usage, cachedTokenRateModeByRelayFormat(relayFormat))
}
func ObserveChannelAffinityUsageCacheFromContext(c *gin.Context, usage *dto.Usage, cachedTokenRateMode string) {
statsCtx, ok := GetChannelAffinityStatsContext(c)
if !ok {
return
}
observeChannelAffinityUsageCache(statsCtx, usage, cachedTokenRateMode)
}
func GetChannelAffinityUsageCacheStats(ruleName, usingGroup, keyFp string) ChannelAffinityUsageCacheStats {
ruleName = strings.TrimSpace(ruleName)
usingGroup = strings.TrimSpace(usingGroup)
keyFp = strings.TrimSpace(keyFp)
entryKey := channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp)
if entryKey == "" {
return ChannelAffinityUsageCacheStats{
RuleName: ruleName,
UsingGroup: usingGroup,
KeyFingerprint: keyFp,
}
}
cache := getChannelAffinityUsageCacheStatsCache()
v, found, err := cache.Get(entryKey)
if err != nil || !found {
return ChannelAffinityUsageCacheStats{
RuleName: ruleName,
UsingGroup: usingGroup,
KeyFingerprint: keyFp,
}
}
return ChannelAffinityUsageCacheStats{
CachedTokenRateMode: v.CachedTokenRateMode,
RuleName: ruleName,
UsingGroup: usingGroup,
KeyFingerprint: keyFp,
Hit: v.Hit,
Total: v.Total,
WindowSeconds: v.WindowSeconds,
PromptTokens: v.PromptTokens,
CompletionTokens: v.CompletionTokens,
TotalTokens: v.TotalTokens,
CachedTokens: v.CachedTokens,
PromptCacheHitTokens: v.PromptCacheHitTokens,
LastSeenAt: v.LastSeenAt,
}
}
func observeChannelAffinityUsageCache(statsCtx ChannelAffinityStatsContext, usage *dto.Usage, cachedTokenRateMode string) {
entryKey := channelAffinityUsageCacheEntryKey(statsCtx.RuleName, statsCtx.UsingGroup, statsCtx.KeyFingerprint)
if entryKey == "" {
return
}
windowSeconds := statsCtx.TTLSeconds
if windowSeconds <= 0 {
return
}
cache := getChannelAffinityUsageCacheStatsCache()
ttl := time.Duration(windowSeconds) * time.Second
lock := channelAffinityUsageCacheStatsLock(entryKey)
lock.Lock()
defer lock.Unlock()
prev, found, err := cache.Get(entryKey)
if err != nil {
return
}
next := prev
if !found {
next = ChannelAffinityUsageCacheCounters{}
}
currentMode := normalizeCachedTokenRateMode(cachedTokenRateMode)
if currentMode != "" {
if next.CachedTokenRateMode == "" {
next.CachedTokenRateMode = currentMode
} else if next.CachedTokenRateMode != currentMode && next.CachedTokenRateMode != cacheTokenRateModeMixed {
next.CachedTokenRateMode = cacheTokenRateModeMixed
}
}
next.Total++
hit, cachedTokens, promptCacheHitTokens := usageCacheSignals(usage)
if hit {
next.Hit++
}
next.WindowSeconds = windowSeconds
next.LastSeenAt = time.Now().Unix()
next.CachedTokens += cachedTokens
next.PromptCacheHitTokens += promptCacheHitTokens
next.PromptTokens += int64(usagePromptTokens(usage))
next.CompletionTokens += int64(usageCompletionTokens(usage))
next.TotalTokens += int64(usageTotalTokens(usage))
_ = cache.SetWithTTL(entryKey, next, ttl)
}
func normalizeCachedTokenRateMode(mode string) string {
switch mode {
case cacheTokenRateModeCachedOverPrompt:
return cacheTokenRateModeCachedOverPrompt
case cacheTokenRateModeCachedOverPromptPlusCached:
return cacheTokenRateModeCachedOverPromptPlusCached
case cacheTokenRateModeMixed:
return cacheTokenRateModeMixed
default:
return ""
}
}
func cachedTokenRateModeByRelayFormat(relayFormat types.RelayFormat) string {
switch relayFormat {
case types.RelayFormatOpenAI, types.RelayFormatOpenAIResponses, types.RelayFormatOpenAIResponsesCompaction:
return cacheTokenRateModeCachedOverPrompt
case types.RelayFormatClaude:
return cacheTokenRateModeCachedOverPromptPlusCached
default:
return ""
}
}
func channelAffinityUsageCacheEntryKey(ruleName, usingGroup, keyFp string) string {
ruleName = strings.TrimSpace(ruleName)
usingGroup = strings.TrimSpace(usingGroup)
keyFp = strings.TrimSpace(keyFp)
if ruleName == "" || keyFp == "" {
return ""
}
return ruleName + "\n" + usingGroup + "\n" + keyFp
}
func usageCacheSignals(usage *dto.Usage) (hit bool, cachedTokens int64, promptCacheHitTokens int64) {
if usage == nil {
return false, 0, 0
}
cached := int64(0)
if usage.PromptTokensDetails.CachedTokens > 0 {
cached = int64(usage.PromptTokensDetails.CachedTokens)
} else if usage.InputTokensDetails != nil && usage.InputTokensDetails.CachedTokens > 0 {
cached = int64(usage.InputTokensDetails.CachedTokens)
}
pcht := int64(0)
if usage.PromptCacheHitTokens > 0 {
pcht = int64(usage.PromptCacheHitTokens)
}
return cached > 0 || pcht > 0, cached, pcht
}
func usagePromptTokens(usage *dto.Usage) int {
if usage == nil {
return 0
}
if usage.PromptTokens > 0 {
return usage.PromptTokens
}
return usage.InputTokens
}
func usageCompletionTokens(usage *dto.Usage) int {
if usage == nil {
return 0
}
if usage.CompletionTokens > 0 {
return usage.CompletionTokens
}
return usage.OutputTokens
}
func usageTotalTokens(usage *dto.Usage) int {
if usage == nil {
return 0
}
if usage.TotalTokens > 0 {
return usage.TotalTokens
}
pt := usagePromptTokens(usage)
ct := usageCompletionTokens(usage)
if pt > 0 || ct > 0 {
return pt + ct
}
return 0
}
func getChannelAffinityUsageCacheStatsCache() *cachex.HybridCache[ChannelAffinityUsageCacheCounters] {
channelAffinityUsageCacheStatsOnce.Do(func() {
setting := operation_setting.GetChannelAffinitySetting()
capacity := 100_000
defaultTTLSeconds := 3600
if setting != nil {
if setting.MaxEntries > 0 {
capacity = setting.MaxEntries
}
if setting.DefaultTTLSeconds > 0 {
defaultTTLSeconds = setting.DefaultTTLSeconds
}
}
channelAffinityUsageCacheStatsCache = cachex.NewHybridCache[ChannelAffinityUsageCacheCounters](cachex.HybridCacheConfig[ChannelAffinityUsageCacheCounters]{
Namespace: cachex.Namespace(channelAffinityUsageCacheStatsNamespace),
Redis: common.RDB,
RedisEnabled: func() bool {
return common.RedisEnabled && common.RDB != nil
},
RedisCodec: cachex.JSONCodec[ChannelAffinityUsageCacheCounters]{},
Memory: func() *hot.HotCache[string, ChannelAffinityUsageCacheCounters] {
return hot.NewHotCache[string, ChannelAffinityUsageCacheCounters](hot.LRU, capacity).
WithTTL(time.Duration(defaultTTLSeconds) * time.Second).
WithJanitor().
Build()
},
})
})
return channelAffinityUsageCacheStatsCache
}
func channelAffinityUsageCacheStatsLock(key string) *sync.Mutex {
h := fnv.New32a()
_, _ = h.Write([]byte(key))
idx := h.Sum32() % uint32(len(channelAffinityUsageCacheStatsLocks))
return &channelAffinityUsageCacheStatsLocks[idx]
}