diff --git a/backend/internal/repository/account_repo.go b/backend/internal/repository/account_repo.go index 8083d3d1..a9cb2cba 100644 --- a/backend/internal/repository/account_repo.go +++ b/backend/internal/repository/account_repo.go @@ -51,6 +51,18 @@ type accountRepository struct { schedulerCache service.SchedulerCache } +var schedulerNeutralExtraKeyPrefixes = []string{ + "codex_primary_", + "codex_secondary_", + "codex_5h_", + "codex_7d_", +} + +var schedulerNeutralExtraKeys = map[string]struct{}{ + "codex_usage_updated_at": {}, + "session_window_utilization": {}, +} + // NewAccountRepository 创建账户仓储实例。 // 这是对外暴露的构造函数,返回接口类型以便于依赖注入。 func NewAccountRepository(client *dbent.Client, sqlDB *sql.DB, schedulerCache service.SchedulerCache) service.AccountRepository { @@ -1190,8 +1202,10 @@ func (r *accountRepository) UpdateExtra(ctx context.Context, id int64, updates m if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil { logger.LegacyPrintf("repository.account", "[SchedulerOutbox] enqueue extra update failed: account=%d err=%v", id, err) } - } else if shouldSyncSchedulerSnapshotForExtraUpdates(updates) { - // codex 限流快照仍需要让调度缓存尽快看见,避免 DB 抖动时丢失自愈链路。 + } else { + // 观测型 extra 字段不需要触发 bucket 重建,但仍同步单账号快照, + // 让 sticky session / GetAccount 命中缓存时也能读到最新数据, + // 同时避免缓存局部 patch 覆盖掉并发写入的其它账号字段。 r.syncSchedulerAccountSnapshot(ctx, id) } return nil @@ -1202,7 +1216,7 @@ func shouldEnqueueSchedulerOutboxForExtraUpdates(updates map[string]any) bool { return false } for key := range updates { - if isSchedulerNeutralAccountExtraKey(key) { + if isSchedulerNeutralExtraKey(key) { continue } return true @@ -1210,91 +1224,20 @@ func shouldEnqueueSchedulerOutboxForExtraUpdates(updates map[string]any) bool { return false } -func shouldSyncSchedulerSnapshotForExtraUpdates(updates map[string]any) bool { - return codexExtraIndicatesRateLimit(updates, "7d") || codexExtraIndicatesRateLimit(updates, "5h") -} - -func isSchedulerNeutralAccountExtraKey(key string) bool { +func isSchedulerNeutralExtraKey(key string) bool { key = strings.TrimSpace(key) if key == "" { return false } - if key == "session_window_utilization" { + if _, ok := schedulerNeutralExtraKeys[key]; ok { return true } - return strings.HasPrefix(key, "codex_") -} - -func codexExtraIndicatesRateLimit(updates map[string]any, window string) bool { - if len(updates) == 0 { - return false - } - usedValue, ok := updates["codex_"+window+"_used_percent"] - if !ok || !extraValueIndicatesExhausted(usedValue) { - return false - } - return extraValueHasResetMarker(updates["codex_"+window+"_reset_at"]) || - extraValueHasPositiveNumber(updates["codex_"+window+"_reset_after_seconds"]) -} - -func extraValueIndicatesExhausted(value any) bool { - number, ok := extraValueToFloat64(value) - return ok && number >= 100-1e-9 -} - -func extraValueHasPositiveNumber(value any) bool { - number, ok := extraValueToFloat64(value) - return ok && number > 0 -} - -func extraValueHasResetMarker(value any) bool { - switch v := value.(type) { - case string: - return strings.TrimSpace(v) != "" - case time.Time: - return !v.IsZero() - case *time.Time: - return v != nil && !v.IsZero() - default: - return false - } -} - -func extraValueToFloat64(value any) (float64, bool) { - switch v := value.(type) { - case float64: - return v, true - case float32: - return float64(v), true - case int: - return float64(v), true - case int8: - return float64(v), true - case int16: - return float64(v), true - case int32: - return float64(v), true - case int64: - return float64(v), true - case uint: - return float64(v), true - case uint8: - return float64(v), true - case uint16: - return float64(v), true - case uint32: - return float64(v), true - case uint64: - return float64(v), true - case json.Number: - parsed, err := v.Float64() - return parsed, err == nil - case string: - parsed, err := strconv.ParseFloat(strings.TrimSpace(v), 64) - return parsed, err == nil - default: - return 0, false + for _, prefix := range schedulerNeutralExtraKeyPrefixes { + if strings.HasPrefix(key, prefix) { + return true + } } + return false } func (r *accountRepository) BulkUpdate(ctx context.Context, ids []int64, updates service.AccountBulkUpdate) (int64, error) { diff --git a/backend/internal/repository/account_repo_integration_test.go b/backend/internal/repository/account_repo_integration_test.go index 56f62491..29b699e6 100644 --- a/backend/internal/repository/account_repo_integration_test.go +++ b/backend/internal/repository/account_repo_integration_test.go @@ -23,6 +23,7 @@ type AccountRepoSuite struct { type schedulerCacheRecorder struct { setAccounts []*service.Account + accounts map[int64]*service.Account } func (s *schedulerCacheRecorder) GetSnapshot(ctx context.Context, bucket service.SchedulerBucket) ([]*service.Account, bool, error) { @@ -34,11 +35,20 @@ func (s *schedulerCacheRecorder) SetSnapshot(ctx context.Context, bucket service } func (s *schedulerCacheRecorder) GetAccount(ctx context.Context, accountID int64) (*service.Account, error) { - return nil, nil + if s.accounts == nil { + return nil, nil + } + return s.accounts[accountID], nil } func (s *schedulerCacheRecorder) SetAccount(ctx context.Context, account *service.Account) error { s.setAccounts = append(s.setAccounts, account) + if s.accounts == nil { + s.accounts = make(map[int64]*service.Account) + } + if account != nil { + s.accounts[account.ID] = account + } return nil } @@ -623,21 +633,46 @@ func (s *AccountRepoSuite) TestUpdateExtra_NilExtra() { s.Require().Equal("val", got.Extra["key"]) } -func (s *AccountRepoSuite) TestUpdateExtra_SchedulerNeutralKeysSkipOutbox() { - account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-extra-neutral", Extra: map[string]any{}}) - _, err := s.repo.sql.ExecContext(s.ctx, "TRUNCATE scheduler_outbox") - s.Require().NoError(err) +func (s *AccountRepoSuite) TestUpdateExtra_SchedulerNeutralSkipsOutboxAndSyncsFreshSnapshot() { + account := mustCreateAccount(s.T(), s.client, &service.Account{ + Name: "acc-extra-neutral", + Platform: service.PlatformOpenAI, + Extra: map[string]any{"codex_usage_updated_at": "old"}, + }) + cacheRecorder := &schedulerCacheRecorder{ + accounts: map[int64]*service.Account{ + account.ID: { + ID: account.ID, + Platform: account.Platform, + Status: service.StatusDisabled, + Extra: map[string]any{ + "codex_usage_updated_at": "old", + }, + }, + }, + } + s.repo.schedulerCache = cacheRecorder - s.Require().NoError(s.repo.UpdateExtra(s.ctx, account.ID, map[string]any{ - "codex_usage_updated_at": "2026-03-11T13:00:00Z", - "codex_5h_used_percent": 12.5, + updates := map[string]any{ + "codex_usage_updated_at": "2026-03-11T10:00:00Z", + "codex_5h_used_percent": 88.5, "session_window_utilization": 0.42, - })) + } + s.Require().NoError(s.repo.UpdateExtra(s.ctx, account.ID, updates)) - var count int - err = scanSingleRow(s.ctx, s.repo.sql, "SELECT COUNT(*) FROM scheduler_outbox", nil, &count) + got, err := s.repo.GetByID(s.ctx, account.ID) s.Require().NoError(err) - s.Require().Equal(0, count) + s.Require().Equal("2026-03-11T10:00:00Z", got.Extra["codex_usage_updated_at"]) + s.Require().Equal(88.5, got.Extra["codex_5h_used_percent"]) + s.Require().Equal(0.42, got.Extra["session_window_utilization"]) + + var outboxCount int + s.Require().NoError(scanSingleRow(s.ctx, s.repo.sql, "SELECT COUNT(*) FROM scheduler_outbox", nil, &outboxCount)) + s.Require().Zero(outboxCount) + s.Require().Len(cacheRecorder.setAccounts, 1) + s.Require().NotNil(cacheRecorder.accounts[account.ID]) + s.Require().Equal(service.StatusActive, cacheRecorder.accounts[account.ID].Status) + s.Require().Equal("2026-03-11T10:00:00Z", cacheRecorder.accounts[account.ID].Extra["codex_usage_updated_at"]) } func (s *AccountRepoSuite) TestUpdateExtra_ExhaustedCodexSnapshotSyncsSchedulerCache() { @@ -664,16 +699,22 @@ func (s *AccountRepoSuite) TestUpdateExtra_ExhaustedCodexSnapshotSyncsSchedulerC s.Require().Equal(0, count) s.Require().Len(cacheRecorder.setAccounts, 1) s.Require().Equal(account.ID, cacheRecorder.setAccounts[0].ID) + s.Require().Equal(service.StatusActive, cacheRecorder.setAccounts[0].Status) s.Require().Equal(100.0, cacheRecorder.setAccounts[0].Extra["codex_7d_used_percent"]) } -func (s *AccountRepoSuite) TestUpdateExtra_CustomKeysStillEnqueueOutbox() { - account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-extra-custom", Extra: map[string]any{}}) +func (s *AccountRepoSuite) TestUpdateExtra_SchedulerRelevantStillEnqueuesOutbox() { + account := mustCreateAccount(s.T(), s.client, &service.Account{ + Name: "acc-extra-mixed", + Platform: service.PlatformAntigravity, + Extra: map[string]any{}, + }) _, err := s.repo.sql.ExecContext(s.ctx, "TRUNCATE scheduler_outbox") s.Require().NoError(err) s.Require().NoError(s.repo.UpdateExtra(s.ctx, account.ID, map[string]any{ - "custom_scheduler_sensitive_key": true, + "mixed_scheduling": true, + "codex_usage_updated_at": "2026-03-11T10:00:00Z", })) var count int