Files
new-api/pkg/cachex/hybrid_cache.go
Seefs d7d3a2f763 feat: channel affinity (#2669)
* feat: channel affinity

* feat: channel affinity -> model setting

* fix: channel affinity

* feat: channel affinity op

* feat: channel_type setting

* feat: clean

* feat: cache supports both memory and Redis.

* feat: Optimise ui/ux

* feat: Optimise ui/ux

* feat: Optimise codex usage ui/ux

* feat: Optimise ui/ux

* feat: Optimise ui/ux

* feat: Optimise ui/ux

* feat: If the affinitized channel fails and a retry succeeds on another channel, update the affinity to the successful channel
2026-01-26 19:57:41 +08:00

286 lines
6.0 KiB
Go

package cachex
import (
"context"
"errors"
"strings"
"sync"
"time"
"github.com/go-redis/redis/v8"
"github.com/samber/hot"
)
const (
defaultRedisOpTimeout = 2 * time.Second
defaultRedisScanTimeout = 30 * time.Second
defaultRedisDelTimeout = 10 * time.Second
)
type HybridCacheConfig[V any] struct {
Namespace Namespace
// Redis is used when RedisEnabled returns true (or RedisEnabled is nil) and Redis is not nil.
Redis *redis.Client
RedisCodec ValueCodec[V]
RedisEnabled func() bool
// Memory builds a hot cache used when Redis is disabled. Keys stored in memory are fully namespaced.
Memory func() *hot.HotCache[string, V]
}
// HybridCache is a small helper that uses Redis when enabled, otherwise falls back to in-memory hot cache.
type HybridCache[V any] struct {
ns Namespace
redis *redis.Client
redisCodec ValueCodec[V]
redisEnabled func() bool
memOnce sync.Once
memInit func() *hot.HotCache[string, V]
mem *hot.HotCache[string, V]
}
func NewHybridCache[V any](cfg HybridCacheConfig[V]) *HybridCache[V] {
return &HybridCache[V]{
ns: cfg.Namespace,
redis: cfg.Redis,
redisCodec: cfg.RedisCodec,
redisEnabled: cfg.RedisEnabled,
memInit: cfg.Memory,
}
}
func (c *HybridCache[V]) FullKey(key string) string {
return c.ns.FullKey(key)
}
func (c *HybridCache[V]) redisOn() bool {
if c.redis == nil || c.redisCodec == nil {
return false
}
if c.redisEnabled == nil {
return true
}
return c.redisEnabled()
}
func (c *HybridCache[V]) memCache() *hot.HotCache[string, V] {
c.memOnce.Do(func() {
if c.memInit == nil {
c.mem = hot.NewHotCache[string, V](hot.LRU, 1).Build()
return
}
c.mem = c.memInit()
})
return c.mem
}
func (c *HybridCache[V]) Get(key string) (value V, found bool, err error) {
full := c.ns.FullKey(key)
if full == "" {
var zero V
return zero, false, nil
}
if c.redisOn() {
ctx, cancel := context.WithTimeout(context.Background(), defaultRedisOpTimeout)
defer cancel()
raw, e := c.redis.Get(ctx, full).Result()
if e == nil {
v, decErr := c.redisCodec.Decode(raw)
if decErr != nil {
var zero V
return zero, false, decErr
}
return v, true, nil
}
if errors.Is(e, redis.Nil) {
var zero V
return zero, false, nil
}
var zero V
return zero, false, e
}
return c.memCache().Get(full)
}
func (c *HybridCache[V]) SetWithTTL(key string, v V, ttl time.Duration) error {
full := c.ns.FullKey(key)
if full == "" {
return nil
}
if c.redisOn() {
raw, err := c.redisCodec.Encode(v)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), defaultRedisOpTimeout)
defer cancel()
return c.redis.Set(ctx, full, raw, ttl).Err()
}
c.memCache().SetWithTTL(full, v, ttl)
return nil
}
// Keys returns keys with valid values. In Redis, it returns all matching keys.
func (c *HybridCache[V]) Keys() ([]string, error) {
if c.redisOn() {
return c.scanKeys(c.ns.MatchPattern())
}
return c.memCache().Keys(), nil
}
func (c *HybridCache[V]) scanKeys(match string) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultRedisScanTimeout)
defer cancel()
var cursor uint64
keys := make([]string, 0, 1024)
for {
k, next, err := c.redis.Scan(ctx, cursor, match, 1000).Result()
if err != nil {
return keys, err
}
keys = append(keys, k...)
cursor = next
if cursor == 0 {
break
}
}
return keys, nil
}
func (c *HybridCache[V]) Purge() error {
if c.redisOn() {
keys, err := c.scanKeys(c.ns.MatchPattern())
if err != nil {
return err
}
if len(keys) == 0 {
return nil
}
_, err = c.DeleteMany(keys)
return err
}
c.memCache().Purge()
return nil
}
func (c *HybridCache[V]) DeleteByPrefix(prefix string) (int, error) {
fullPrefix := c.ns.FullKey(prefix)
if fullPrefix == "" {
return 0, nil
}
if !strings.HasSuffix(fullPrefix, ":") {
fullPrefix += ":"
}
if c.redisOn() {
match := fullPrefix + "*"
keys, err := c.scanKeys(match)
if err != nil {
return 0, err
}
if len(keys) == 0 {
return 0, nil
}
res, err := c.DeleteMany(keys)
if err != nil {
return 0, err
}
deleted := 0
for _, ok := range res {
if ok {
deleted++
}
}
return deleted, nil
}
// In memory, we filter keys and bulk delete.
allKeys := c.memCache().Keys()
keys := make([]string, 0, 128)
for _, k := range allKeys {
if strings.HasPrefix(k, fullPrefix) {
keys = append(keys, k)
}
}
if len(keys) == 0 {
return 0, nil
}
res, _ := c.DeleteMany(keys)
deleted := 0
for _, ok := range res {
if ok {
deleted++
}
}
return deleted, nil
}
// DeleteMany accepts either fully namespaced keys or raw keys and deletes them.
// It returns a map keyed by fully namespaced keys.
func (c *HybridCache[V]) DeleteMany(keys []string) (map[string]bool, error) {
res := make(map[string]bool, len(keys))
if len(keys) == 0 {
return res, nil
}
fullKeys := make([]string, 0, len(keys))
for _, k := range keys {
k = c.ns.FullKey(k)
if k == "" {
continue
}
fullKeys = append(fullKeys, k)
}
if len(fullKeys) == 0 {
return res, nil
}
if c.redisOn() {
ctx, cancel := context.WithTimeout(context.Background(), defaultRedisDelTimeout)
defer cancel()
pipe := c.redis.Pipeline()
cmds := make([]*redis.IntCmd, 0, len(fullKeys))
for _, k := range fullKeys {
// UNLINK is non-blocking vs DEL for large key batches.
cmds = append(cmds, pipe.Unlink(ctx, k))
}
_, err := pipe.Exec(ctx)
if err != nil && !errors.Is(err, redis.Nil) {
return res, err
}
for i, cmd := range cmds {
deleted := cmd != nil && cmd.Err() == nil && cmd.Val() > 0
res[fullKeys[i]] = deleted
}
return res, nil
}
return c.memCache().DeleteMany(fullKeys), nil
}
func (c *HybridCache[V]) Capacity() (mainCacheCapacity int, missingCacheCapacity int) {
if c.redisOn() {
return 0, 0
}
return c.memCache().Capacity()
}
func (c *HybridCache[V]) Algorithm() (mainCacheAlgorithm string, missingCacheAlgorithm string) {
if c.redisOn() {
return "redis", ""
}
return c.memCache().Algorithm()
}