feat: channel affinity (#2669)

* feat: channel affinity

* feat: channel affinity -> model setting

* fix: channel affinity

* feat: channel affinity op

* feat: channel_type setting

* feat: clean

* feat: cache supports both memory and Redis.

* feat: Optimise ui/ux

* feat: Optimise ui/ux

* feat: Optimise codex usage ui/ux

* feat: Optimise ui/ux

* feat: Optimise ui/ux

* feat: Optimise ui/ux

* feat: If the affinitized channel fails and a retry succeeds on another channel, update the affinity to the successful channel
This commit is contained in:
Seefs
2026-01-26 19:57:41 +08:00
committed by GitHub
parent 7da04be52b
commit d7d3a2f763
24 changed files with 2542 additions and 125 deletions

53
pkg/cachex/codec.go Normal file
View File

@@ -0,0 +1,53 @@
package cachex
import (
"encoding/json"
"fmt"
"strconv"
"strings"
)
type ValueCodec[V any] interface {
Encode(v V) (string, error)
Decode(s string) (V, error)
}
type IntCodec struct{}
func (c IntCodec) Encode(v int) (string, error) {
return strconv.Itoa(v), nil
}
func (c IntCodec) Decode(s string) (int, error) {
s = strings.TrimSpace(s)
if s == "" {
return 0, fmt.Errorf("empty int value")
}
return strconv.Atoi(s)
}
type StringCodec struct{}
func (c StringCodec) Encode(v string) (string, error) { return v, nil }
func (c StringCodec) Decode(s string) (string, error) { return s, nil }
type JSONCodec[V any] struct{}
func (c JSONCodec[V]) Encode(v V) (string, error) {
b, err := json.Marshal(v)
if err != nil {
return "", err
}
return string(b), nil
}
func (c JSONCodec[V]) Decode(s string) (V, error) {
var v V
if strings.TrimSpace(s) == "" {
return v, fmt.Errorf("empty json value")
}
if err := json.Unmarshal([]byte(s), &v); err != nil {
return v, err
}
return v, nil
}

285
pkg/cachex/hybrid_cache.go Normal file
View File

@@ -0,0 +1,285 @@
package cachex
import (
"context"
"errors"
"strings"
"sync"
"time"
"github.com/go-redis/redis/v8"
"github.com/samber/hot"
)
const (
defaultRedisOpTimeout = 2 * time.Second
defaultRedisScanTimeout = 30 * time.Second
defaultRedisDelTimeout = 10 * time.Second
)
type HybridCacheConfig[V any] struct {
Namespace Namespace
// Redis is used when RedisEnabled returns true (or RedisEnabled is nil) and Redis is not nil.
Redis *redis.Client
RedisCodec ValueCodec[V]
RedisEnabled func() bool
// Memory builds a hot cache used when Redis is disabled. Keys stored in memory are fully namespaced.
Memory func() *hot.HotCache[string, V]
}
// HybridCache is a small helper that uses Redis when enabled, otherwise falls back to in-memory hot cache.
type HybridCache[V any] struct {
ns Namespace
redis *redis.Client
redisCodec ValueCodec[V]
redisEnabled func() bool
memOnce sync.Once
memInit func() *hot.HotCache[string, V]
mem *hot.HotCache[string, V]
}
func NewHybridCache[V any](cfg HybridCacheConfig[V]) *HybridCache[V] {
return &HybridCache[V]{
ns: cfg.Namespace,
redis: cfg.Redis,
redisCodec: cfg.RedisCodec,
redisEnabled: cfg.RedisEnabled,
memInit: cfg.Memory,
}
}
func (c *HybridCache[V]) FullKey(key string) string {
return c.ns.FullKey(key)
}
func (c *HybridCache[V]) redisOn() bool {
if c.redis == nil || c.redisCodec == nil {
return false
}
if c.redisEnabled == nil {
return true
}
return c.redisEnabled()
}
func (c *HybridCache[V]) memCache() *hot.HotCache[string, V] {
c.memOnce.Do(func() {
if c.memInit == nil {
c.mem = hot.NewHotCache[string, V](hot.LRU, 1).Build()
return
}
c.mem = c.memInit()
})
return c.mem
}
func (c *HybridCache[V]) Get(key string) (value V, found bool, err error) {
full := c.ns.FullKey(key)
if full == "" {
var zero V
return zero, false, nil
}
if c.redisOn() {
ctx, cancel := context.WithTimeout(context.Background(), defaultRedisOpTimeout)
defer cancel()
raw, e := c.redis.Get(ctx, full).Result()
if e == nil {
v, decErr := c.redisCodec.Decode(raw)
if decErr != nil {
var zero V
return zero, false, decErr
}
return v, true, nil
}
if errors.Is(e, redis.Nil) {
var zero V
return zero, false, nil
}
var zero V
return zero, false, e
}
return c.memCache().Get(full)
}
func (c *HybridCache[V]) SetWithTTL(key string, v V, ttl time.Duration) error {
full := c.ns.FullKey(key)
if full == "" {
return nil
}
if c.redisOn() {
raw, err := c.redisCodec.Encode(v)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), defaultRedisOpTimeout)
defer cancel()
return c.redis.Set(ctx, full, raw, ttl).Err()
}
c.memCache().SetWithTTL(full, v, ttl)
return nil
}
// Keys returns keys with valid values. In Redis, it returns all matching keys.
func (c *HybridCache[V]) Keys() ([]string, error) {
if c.redisOn() {
return c.scanKeys(c.ns.MatchPattern())
}
return c.memCache().Keys(), nil
}
func (c *HybridCache[V]) scanKeys(match string) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultRedisScanTimeout)
defer cancel()
var cursor uint64
keys := make([]string, 0, 1024)
for {
k, next, err := c.redis.Scan(ctx, cursor, match, 1000).Result()
if err != nil {
return keys, err
}
keys = append(keys, k...)
cursor = next
if cursor == 0 {
break
}
}
return keys, nil
}
func (c *HybridCache[V]) Purge() error {
if c.redisOn() {
keys, err := c.scanKeys(c.ns.MatchPattern())
if err != nil {
return err
}
if len(keys) == 0 {
return nil
}
_, err = c.DeleteMany(keys)
return err
}
c.memCache().Purge()
return nil
}
func (c *HybridCache[V]) DeleteByPrefix(prefix string) (int, error) {
fullPrefix := c.ns.FullKey(prefix)
if fullPrefix == "" {
return 0, nil
}
if !strings.HasSuffix(fullPrefix, ":") {
fullPrefix += ":"
}
if c.redisOn() {
match := fullPrefix + "*"
keys, err := c.scanKeys(match)
if err != nil {
return 0, err
}
if len(keys) == 0 {
return 0, nil
}
res, err := c.DeleteMany(keys)
if err != nil {
return 0, err
}
deleted := 0
for _, ok := range res {
if ok {
deleted++
}
}
return deleted, nil
}
// In memory, we filter keys and bulk delete.
allKeys := c.memCache().Keys()
keys := make([]string, 0, 128)
for _, k := range allKeys {
if strings.HasPrefix(k, fullPrefix) {
keys = append(keys, k)
}
}
if len(keys) == 0 {
return 0, nil
}
res, _ := c.DeleteMany(keys)
deleted := 0
for _, ok := range res {
if ok {
deleted++
}
}
return deleted, nil
}
// DeleteMany accepts either fully namespaced keys or raw keys and deletes them.
// It returns a map keyed by fully namespaced keys.
func (c *HybridCache[V]) DeleteMany(keys []string) (map[string]bool, error) {
res := make(map[string]bool, len(keys))
if len(keys) == 0 {
return res, nil
}
fullKeys := make([]string, 0, len(keys))
for _, k := range keys {
k = c.ns.FullKey(k)
if k == "" {
continue
}
fullKeys = append(fullKeys, k)
}
if len(fullKeys) == 0 {
return res, nil
}
if c.redisOn() {
ctx, cancel := context.WithTimeout(context.Background(), defaultRedisDelTimeout)
defer cancel()
pipe := c.redis.Pipeline()
cmds := make([]*redis.IntCmd, 0, len(fullKeys))
for _, k := range fullKeys {
// UNLINK is non-blocking vs DEL for large key batches.
cmds = append(cmds, pipe.Unlink(ctx, k))
}
_, err := pipe.Exec(ctx)
if err != nil && !errors.Is(err, redis.Nil) {
return res, err
}
for i, cmd := range cmds {
deleted := cmd != nil && cmd.Err() == nil && cmd.Val() > 0
res[fullKeys[i]] = deleted
}
return res, nil
}
return c.memCache().DeleteMany(fullKeys), nil
}
func (c *HybridCache[V]) Capacity() (mainCacheCapacity int, missingCacheCapacity int) {
if c.redisOn() {
return 0, 0
}
return c.memCache().Capacity()
}
func (c *HybridCache[V]) Algorithm() (mainCacheAlgorithm string, missingCacheAlgorithm string) {
if c.redisOn() {
return "redis", ""
}
return c.memCache().Algorithm()
}

38
pkg/cachex/namespace.go Normal file
View File

@@ -0,0 +1,38 @@
package cachex
import "strings"
// Namespace isolates keys between different cache use-cases. (e.g. "channel_affinity:v1").
type Namespace string
func (n Namespace) prefix() string {
ns := strings.TrimSpace(string(n))
ns = strings.TrimRight(ns, ":")
if ns == "" {
return ""
}
return ns + ":"
}
func (n Namespace) FullKey(key string) string {
key = strings.TrimSpace(key)
if key == "" {
return ""
}
p := n.prefix()
if p == "" {
return strings.TrimLeft(key, ":")
}
if strings.HasPrefix(key, p) {
return key
}
return p + strings.TrimLeft(key, ":")
}
func (n Namespace) MatchPattern() string {
p := n.prefix()
if p == "" {
return "*"
}
return p + "*"
}