From 2b30e3b6d78c778a930374f9d8fc778b0698f5e3 Mon Sep 17 00:00:00 2001 From: ius Date: Wed, 11 Mar 2026 19:16:19 +0800 Subject: [PATCH 1/2] Reduce scheduler rebuilds on neutral extra updates --- backend/internal/repository/account_repo.go | 75 ++++++++++++++++++- .../account_repo_integration_test.go | 70 ++++++++++++++++- 2 files changed, 142 insertions(+), 3 deletions(-) diff --git a/backend/internal/repository/account_repo.go b/backend/internal/repository/account_repo.go index c7642152..daa3a878 100644 --- a/backend/internal/repository/account_repo.go +++ b/backend/internal/repository/account_repo.go @@ -16,6 +16,7 @@ import ( "encoding/json" "errors" "strconv" + "strings" "time" dbent "github.com/Wei-Shaw/sub2api/ent" @@ -50,6 +51,18 @@ type accountRepository struct { schedulerCache service.SchedulerCache } +var schedulerNeutralExtraKeyPrefixes = []string{ + "codex_primary_", + "codex_secondary_", + "codex_5h_", + "codex_7d_", +} + +var schedulerNeutralExtraKeys = map[string]struct{}{ + "codex_usage_updated_at": {}, + "session_window_utilization": {}, +} + // NewAccountRepository 创建账户仓储实例。 // 这是对外暴露的构造函数,返回接口类型以便于依赖注入。 func NewAccountRepository(client *dbent.Client, sqlDB *sql.DB, schedulerCache service.SchedulerCache) service.AccountRepository { @@ -613,6 +626,29 @@ func (r *accountRepository) syncSchedulerAccountSnapshot(ctx context.Context, ac } } +func (r *accountRepository) patchSchedulerAccountExtra(ctx context.Context, accountID int64, updates map[string]any) { + if r == nil || r.schedulerCache == nil || accountID <= 0 || len(updates) == 0 { + return + } + account, err := r.schedulerCache.GetAccount(ctx, accountID) + if err != nil { + logger.LegacyPrintf("repository.account", "[Scheduler] patch account extra read failed: id=%d err=%v", accountID, err) + return + } + if account == nil { + return + } + if account.Extra == nil { + account.Extra = make(map[string]any, len(updates)) + } + for key, value := range updates { + account.Extra[key] = value + } + if err := r.schedulerCache.SetAccount(ctx, account); err != nil { + logger.LegacyPrintf("repository.account", "[Scheduler] patch account extra write failed: id=%d err=%v", accountID, err) + } +} + func (r *accountRepository) syncSchedulerAccountSnapshots(ctx context.Context, accountIDs []int64) { if r == nil || r.schedulerCache == nil || len(accountIDs) == 0 { return @@ -1185,12 +1221,47 @@ func (r *accountRepository) UpdateExtra(ctx context.Context, id int64, updates m if affected == 0 { return service.ErrAccountNotFound } - if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil { - logger.LegacyPrintf("repository.account", "[SchedulerOutbox] enqueue extra update failed: account=%d err=%v", id, err) + + if shouldEnqueueSchedulerOutboxForExtraUpdates(updates) { + if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil { + logger.LegacyPrintf("repository.account", "[SchedulerOutbox] enqueue extra update failed: account=%d err=%v", id, err) + } + } else { + // 观测型 extra 字段不需要触发 bucket 重建,但尽量把单账号缓存补到最新, + // 让 sticky session / GetAccount 命中缓存时也能读到最新快照。 + r.patchSchedulerAccountExtra(ctx, id, updates) } return nil } +func shouldEnqueueSchedulerOutboxForExtraUpdates(updates map[string]any) bool { + if len(updates) == 0 { + return false + } + for key := range updates { + if !isSchedulerNeutralExtraKey(key) { + return true + } + } + return false +} + +func isSchedulerNeutralExtraKey(key string) bool { + key = strings.TrimSpace(key) + if key == "" { + return false + } + if _, ok := schedulerNeutralExtraKeys[key]; ok { + return true + } + for _, prefix := range schedulerNeutralExtraKeyPrefixes { + if strings.HasPrefix(key, prefix) { + return true + } + } + return false +} + func (r *accountRepository) BulkUpdate(ctx context.Context, ids []int64, updates service.AccountBulkUpdate) (int64, error) { if len(ids) == 0 { return 0, nil diff --git a/backend/internal/repository/account_repo_integration_test.go b/backend/internal/repository/account_repo_integration_test.go index 58971933..1381fd11 100644 --- a/backend/internal/repository/account_repo_integration_test.go +++ b/backend/internal/repository/account_repo_integration_test.go @@ -23,6 +23,7 @@ type AccountRepoSuite struct { type schedulerCacheRecorder struct { setAccounts []*service.Account + accounts map[int64]*service.Account } func (s *schedulerCacheRecorder) GetSnapshot(ctx context.Context, bucket service.SchedulerBucket) ([]*service.Account, bool, error) { @@ -34,11 +35,20 @@ func (s *schedulerCacheRecorder) SetSnapshot(ctx context.Context, bucket service } func (s *schedulerCacheRecorder) GetAccount(ctx context.Context, accountID int64) (*service.Account, error) { - return nil, nil + if s.accounts == nil { + return nil, nil + } + return s.accounts[accountID], nil } func (s *schedulerCacheRecorder) SetAccount(ctx context.Context, account *service.Account) error { s.setAccounts = append(s.setAccounts, account) + if s.accounts == nil { + s.accounts = make(map[int64]*service.Account) + } + if account != nil { + s.accounts[account.ID] = account + } return nil } @@ -623,6 +633,64 @@ func (s *AccountRepoSuite) TestUpdateExtra_NilExtra() { s.Require().Equal("val", got.Extra["key"]) } +func (s *AccountRepoSuite) TestUpdateExtra_SchedulerNeutralSkipsOutboxAndPatchesCache() { + account := mustCreateAccount(s.T(), s.client, &service.Account{ + Name: "acc-extra-neutral", + Platform: service.PlatformOpenAI, + Extra: map[string]any{"codex_usage_updated_at": "old"}, + }) + cacheRecorder := &schedulerCacheRecorder{ + accounts: map[int64]*service.Account{ + account.ID: { + ID: account.ID, + Platform: account.Platform, + Extra: map[string]any{ + "codex_usage_updated_at": "old", + }, + }, + }, + } + s.repo.schedulerCache = cacheRecorder + + updates := map[string]any{ + "codex_usage_updated_at": "2026-03-11T10:00:00Z", + "codex_5h_used_percent": 88.5, + "session_window_utilization": 0.42, + } + s.Require().NoError(s.repo.UpdateExtra(s.ctx, account.ID, updates)) + + got, err := s.repo.GetByID(s.ctx, account.ID) + s.Require().NoError(err) + s.Require().Equal("2026-03-11T10:00:00Z", got.Extra["codex_usage_updated_at"]) + s.Require().Equal(88.5, got.Extra["codex_5h_used_percent"]) + s.Require().Equal(0.42, got.Extra["session_window_utilization"]) + + var outboxCount int + s.Require().NoError(scanSingleRow(s.ctx, s.repo.sql, "SELECT COUNT(*) FROM scheduler_outbox", nil, &outboxCount)) + s.Require().Zero(outboxCount) + s.Require().Len(cacheRecorder.setAccounts, 1) + s.Require().NotNil(cacheRecorder.accounts[account.ID]) + s.Require().Equal("2026-03-11T10:00:00Z", cacheRecorder.accounts[account.ID].Extra["codex_usage_updated_at"]) +} + +func (s *AccountRepoSuite) TestUpdateExtra_SchedulerRelevantStillEnqueuesOutbox() { + account := mustCreateAccount(s.T(), s.client, &service.Account{ + Name: "acc-extra-mixed", + Platform: service.PlatformAntigravity, + Extra: map[string]any{}, + }) + + updates := map[string]any{ + "mixed_scheduling": true, + "codex_usage_updated_at": "2026-03-11T10:00:00Z", + } + s.Require().NoError(s.repo.UpdateExtra(s.ctx, account.ID, updates)) + + var outboxCount int + s.Require().NoError(scanSingleRow(s.ctx, s.repo.sql, "SELECT COUNT(*) FROM scheduler_outbox", nil, &outboxCount)) + s.Require().Equal(1, outboxCount) +} + // --- GetByCRSAccountID --- func (s *AccountRepoSuite) TestGetByCRSAccountID() { From 5c13ec312186e750dbddd5b03d5909c514d12346 Mon Sep 17 00:00:00 2001 From: ius Date: Thu, 12 Mar 2026 11:20:59 +0800 Subject: [PATCH 2/2] Fix lint after rebasing PR #938 branch --- backend/internal/repository/account_repo.go | 76 --------------------- 1 file changed, 76 deletions(-) diff --git a/backend/internal/repository/account_repo.go b/backend/internal/repository/account_repo.go index 4e7c418c..a9cb2cba 100644 --- a/backend/internal/repository/account_repo.go +++ b/backend/internal/repository/account_repo.go @@ -1240,82 +1240,6 @@ func isSchedulerNeutralExtraKey(key string) bool { return false } -func shouldSyncSchedulerSnapshotForExtraUpdates(updates map[string]any) bool { - return codexExtraIndicatesRateLimit(updates, "7d") || codexExtraIndicatesRateLimit(updates, "5h") -} - -func codexExtraIndicatesRateLimit(updates map[string]any, window string) bool { - if len(updates) == 0 { - return false - } - usedValue, ok := updates["codex_"+window+"_used_percent"] - if !ok || !extraValueIndicatesExhausted(usedValue) { - return false - } - return extraValueHasResetMarker(updates["codex_"+window+"_reset_at"]) || - extraValueHasPositiveNumber(updates["codex_"+window+"_reset_after_seconds"]) -} - -func extraValueIndicatesExhausted(value any) bool { - number, ok := extraValueToFloat64(value) - return ok && number >= 100-1e-9 -} - -func extraValueHasPositiveNumber(value any) bool { - number, ok := extraValueToFloat64(value) - return ok && number > 0 -} - -func extraValueHasResetMarker(value any) bool { - switch v := value.(type) { - case string: - return strings.TrimSpace(v) != "" - case time.Time: - return !v.IsZero() - case *time.Time: - return v != nil && !v.IsZero() - default: - return false - } -} - -func extraValueToFloat64(value any) (float64, bool) { - switch v := value.(type) { - case float64: - return v, true - case float32: - return float64(v), true - case int: - return float64(v), true - case int8: - return float64(v), true - case int16: - return float64(v), true - case int32: - return float64(v), true - case int64: - return float64(v), true - case uint: - return float64(v), true - case uint8: - return float64(v), true - case uint16: - return float64(v), true - case uint32: - return float64(v), true - case uint64: - return float64(v), true - case json.Number: - parsed, err := v.Float64() - return parsed, err == nil - case string: - parsed, err := strconv.ParseFloat(strings.TrimSpace(v), 64) - return parsed, err == nil - default: - return 0, false - } -} - func (r *accountRepository) BulkUpdate(ctx context.Context, ids []int64, updates service.AccountBulkUpdate) (int64, error) { if len(ids) == 0 { return 0, nil