From 2fc6aaf9364f50f9af3c119a1b1abd3e43c064e6 Mon Sep 17 00:00:00 2001 From: ius Date: Wed, 11 Mar 2026 15:47:39 +0800 Subject: [PATCH] Fix Codex exhausted snapshot propagation --- backend/internal/repository/account_repo.go | 79 +++++++++++++++++ .../account_repo_integration_test.go | 27 ++++++ .../internal/service/account_usage_service.go | 87 ++++++++++++------- .../service/account_usage_service_test.go | 82 +++++++++++++++++ .../service/openai_gateway_service.go | 3 + 5 files changed, 248 insertions(+), 30 deletions(-) diff --git a/backend/internal/repository/account_repo.go b/backend/internal/repository/account_repo.go index 2aa72ebb..8083d3d1 100644 --- a/backend/internal/repository/account_repo.go +++ b/backend/internal/repository/account_repo.go @@ -1190,6 +1190,9 @@ func (r *accountRepository) UpdateExtra(ctx context.Context, id int64, updates m if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil { logger.LegacyPrintf("repository.account", "[SchedulerOutbox] enqueue extra update failed: account=%d err=%v", id, err) } + } else if shouldSyncSchedulerSnapshotForExtraUpdates(updates) { + // codex 限流快照仍需要让调度缓存尽快看见,避免 DB 抖动时丢失自愈链路。 + r.syncSchedulerAccountSnapshot(ctx, id) } return nil } @@ -1207,6 +1210,10 @@ func shouldEnqueueSchedulerOutboxForExtraUpdates(updates map[string]any) bool { return false } +func shouldSyncSchedulerSnapshotForExtraUpdates(updates map[string]any) bool { + return codexExtraIndicatesRateLimit(updates, "7d") || codexExtraIndicatesRateLimit(updates, "5h") +} + func isSchedulerNeutralAccountExtraKey(key string) bool { key = strings.TrimSpace(key) if key == "" { @@ -1218,6 +1225,78 @@ func isSchedulerNeutralAccountExtraKey(key string) bool { return strings.HasPrefix(key, "codex_") } +func codexExtraIndicatesRateLimit(updates map[string]any, window string) bool { + if len(updates) == 0 { + return false + } + usedValue, ok := updates["codex_"+window+"_used_percent"] + if !ok || !extraValueIndicatesExhausted(usedValue) { + return false + } + return extraValueHasResetMarker(updates["codex_"+window+"_reset_at"]) || + extraValueHasPositiveNumber(updates["codex_"+window+"_reset_after_seconds"]) +} + +func extraValueIndicatesExhausted(value any) bool { + number, ok := extraValueToFloat64(value) + return ok && number >= 100-1e-9 +} + +func extraValueHasPositiveNumber(value any) bool { + number, ok := extraValueToFloat64(value) + return ok && number > 0 +} + +func extraValueHasResetMarker(value any) bool { + switch v := value.(type) { + case string: + return strings.TrimSpace(v) != "" + case time.Time: + return !v.IsZero() + case *time.Time: + return v != nil && !v.IsZero() + default: + return false + } +} + +func extraValueToFloat64(value any) (float64, bool) { + switch v := value.(type) { + case float64: + return v, true + case float32: + return float64(v), true + case int: + return float64(v), true + case int8: + return float64(v), true + case int16: + return float64(v), true + case int32: + return float64(v), true + case int64: + return float64(v), true + case uint: + return float64(v), true + case uint8: + return float64(v), true + case uint16: + return float64(v), true + case uint32: + return float64(v), true + case uint64: + return float64(v), true + case json.Number: + parsed, err := v.Float64() + return parsed, err == nil + case string: + parsed, err := strconv.ParseFloat(strings.TrimSpace(v), 64) + return parsed, err == nil + default: + return 0, false + } +} + func (r *accountRepository) BulkUpdate(ctx context.Context, ids []int64, updates service.AccountBulkUpdate) (int64, error) { if len(ids) == 0 { return 0, nil diff --git a/backend/internal/repository/account_repo_integration_test.go b/backend/internal/repository/account_repo_integration_test.go index caf8d3f3..56f62491 100644 --- a/backend/internal/repository/account_repo_integration_test.go +++ b/backend/internal/repository/account_repo_integration_test.go @@ -640,6 +640,33 @@ func (s *AccountRepoSuite) TestUpdateExtra_SchedulerNeutralKeysSkipOutbox() { s.Require().Equal(0, count) } +func (s *AccountRepoSuite) TestUpdateExtra_ExhaustedCodexSnapshotSyncsSchedulerCache() { + account := mustCreateAccount(s.T(), s.client, &service.Account{ + Name: "acc-extra-codex-exhausted", + Platform: service.PlatformOpenAI, + Type: service.AccountTypeOAuth, + Extra: map[string]any{}, + }) + cacheRecorder := &schedulerCacheRecorder{} + s.repo.schedulerCache = cacheRecorder + _, err := s.repo.sql.ExecContext(s.ctx, "TRUNCATE scheduler_outbox") + s.Require().NoError(err) + + s.Require().NoError(s.repo.UpdateExtra(s.ctx, account.ID, map[string]any{ + "codex_7d_used_percent": 100.0, + "codex_7d_reset_at": "2026-03-12T13:00:00Z", + "codex_7d_reset_after_seconds": 86400, + })) + + var count int + err = scanSingleRow(s.ctx, s.repo.sql, "SELECT COUNT(*) FROM scheduler_outbox", nil, &count) + s.Require().NoError(err) + s.Require().Equal(0, count) + s.Require().Len(cacheRecorder.setAccounts, 1) + s.Require().Equal(account.ID, cacheRecorder.setAccounts[0].ID) + s.Require().Equal(100.0, cacheRecorder.setAccounts[0].Extra["codex_7d_used_percent"]) +} + func (s *AccountRepoSuite) TestUpdateExtra_CustomKeysStillEnqueueOutbox() { account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-extra-custom", Extra: map[string]any{}}) _, err := s.repo.sql.ExecContext(s.ctx, "TRUNCATE scheduler_outbox") diff --git a/backend/internal/service/account_usage_service.go b/backend/internal/service/account_usage_service.go index 7c001118..e4245133 100644 --- a/backend/internal/service/account_usage_service.go +++ b/backend/internal/service/account_usage_service.go @@ -369,8 +369,11 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou } if shouldRefreshOpenAICodexSnapshot(account, usage, now) && s.shouldProbeOpenAICodexSnapshot(account.ID, now) { - if updates, err := s.probeOpenAICodexSnapshot(ctx, account); err == nil && len(updates) > 0 { + if updates, resetAt, err := s.probeOpenAICodexSnapshot(ctx, account); err == nil && (len(updates) > 0 || resetAt != nil) { mergeAccountExtra(account, updates) + if resetAt != nil { + account.RateLimitResetAt = resetAt + } if usage.UpdatedAt == nil { usage.UpdatedAt = &now } @@ -457,26 +460,26 @@ func (s *AccountUsageService) shouldProbeOpenAICodexSnapshot(accountID int64, no return true } -func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, account *Account) (map[string]any, error) { +func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, account *Account) (map[string]any, *time.Time, error) { if account == nil || !account.IsOAuth() { - return nil, nil + return nil, nil, nil } accessToken := account.GetOpenAIAccessToken() if accessToken == "" { - return nil, fmt.Errorf("no access token available") + return nil, nil, fmt.Errorf("no access token available") } modelID := openaipkg.DefaultTestModel payload := createOpenAITestPayload(modelID, true) payloadBytes, err := json.Marshal(payload) if err != nil { - return nil, fmt.Errorf("marshal openai probe payload: %w", err) + return nil, nil, fmt.Errorf("marshal openai probe payload: %w", err) } reqCtx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, chatgptCodexURL, bytes.NewReader(payloadBytes)) if err != nil { - return nil, fmt.Errorf("create openai probe request: %w", err) + return nil, nil, fmt.Errorf("create openai probe request: %w", err) } req.Host = "chatgpt.com" req.Header.Set("Content-Type", "application/json") @@ -505,43 +508,67 @@ func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, acco ResponseHeaderTimeout: 10 * time.Second, }) if err != nil { - return nil, fmt.Errorf("build openai probe client: %w", err) + return nil, nil, fmt.Errorf("build openai probe client: %w", err) } resp, err := client.Do(req) if err != nil { - return nil, fmt.Errorf("openai codex probe request failed: %w", err) + return nil, nil, fmt.Errorf("openai codex probe request failed: %w", err) } defer func() { _ = resp.Body.Close() }() - updates, err := extractOpenAICodexProbeUpdates(resp) + updates, resetAt, err := extractOpenAICodexProbeSnapshot(resp) if err != nil { - return nil, err + return nil, nil, err } - if len(updates) > 0 { - go func(accountID int64, updates map[string]any) { - updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer updateCancel() + if len(updates) > 0 || resetAt != nil { + s.persistOpenAICodexProbeSnapshot(account.ID, updates, resetAt) + return updates, resetAt, nil + } + return nil, nil, nil +} + +func (s *AccountUsageService) persistOpenAICodexProbeSnapshot(accountID int64, updates map[string]any, resetAt *time.Time) { + if s == nil || s.accountRepo == nil || accountID <= 0 { + return + } + if len(updates) == 0 && resetAt == nil { + return + } + + go func() { + updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer updateCancel() + if len(updates) > 0 { _ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates) - }(account.ID, updates) - return updates, nil + } + if resetAt != nil { + _ = s.accountRepo.SetRateLimited(updateCtx, accountID, *resetAt) + } + }() +} + +func extractOpenAICodexProbeSnapshot(resp *http.Response) (map[string]any, *time.Time, error) { + if resp == nil { + return nil, nil, nil } - return nil, nil + if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil { + baseTime := time.Now() + updates := buildCodexUsageExtraUpdates(snapshot, baseTime) + resetAt := codexRateLimitResetAtFromSnapshot(snapshot, baseTime) + if len(updates) > 0 { + return updates, resetAt, nil + } + return nil, resetAt, nil + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode) + } + return nil, nil, nil } func extractOpenAICodexProbeUpdates(resp *http.Response) (map[string]any, error) { - if resp == nil { - return nil, nil - } - if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil { - updates := buildCodexUsageExtraUpdates(snapshot, time.Now()) - if len(updates) > 0 { - return updates, nil - } - } - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode) - } - return nil, nil + updates, _, err := extractOpenAICodexProbeSnapshot(resp) + return updates, err } func mergeAccountExtra(account *Account, updates map[string]any) { diff --git a/backend/internal/service/account_usage_service_test.go b/backend/internal/service/account_usage_service_test.go index 974d9029..a063fe26 100644 --- a/backend/internal/service/account_usage_service_test.go +++ b/backend/internal/service/account_usage_service_test.go @@ -1,11 +1,36 @@ package service import ( + "context" "net/http" "testing" "time" ) +type accountUsageCodexProbeRepo struct { + stubOpenAIAccountRepo + updateExtraCh chan map[string]any + rateLimitCh chan time.Time +} + +func (r *accountUsageCodexProbeRepo) UpdateExtra(_ context.Context, _ int64, updates map[string]any) error { + if r.updateExtraCh != nil { + copied := make(map[string]any, len(updates)) + for k, v := range updates { + copied[k] = v + } + r.updateExtraCh <- copied + } + return nil +} + +func (r *accountUsageCodexProbeRepo) SetRateLimited(_ context.Context, _ int64, resetAt time.Time) error { + if r.rateLimitCh != nil { + r.rateLimitCh <- resetAt + } + return nil +} + func TestShouldRefreshOpenAICodexSnapshot(t *testing.T) { t.Parallel() @@ -66,3 +91,60 @@ func TestExtractOpenAICodexProbeUpdatesAccepts429WithCodexHeaders(t *testing.T) t.Fatalf("codex_7d_used_percent = %v, want 100", got) } } + +func TestExtractOpenAICodexProbeSnapshotAccepts429WithResetAt(t *testing.T) { + t.Parallel() + + headers := make(http.Header) + headers.Set("x-codex-primary-used-percent", "100") + headers.Set("x-codex-primary-reset-after-seconds", "604800") + headers.Set("x-codex-primary-window-minutes", "10080") + headers.Set("x-codex-secondary-used-percent", "100") + headers.Set("x-codex-secondary-reset-after-seconds", "18000") + headers.Set("x-codex-secondary-window-minutes", "300") + + updates, resetAt, err := extractOpenAICodexProbeSnapshot(&http.Response{StatusCode: http.StatusTooManyRequests, Header: headers}) + if err != nil { + t.Fatalf("extractOpenAICodexProbeSnapshot() error = %v", err) + } + if len(updates) == 0 { + t.Fatal("expected codex probe updates from 429 headers") + } + if resetAt == nil { + t.Fatal("expected resetAt from exhausted codex headers") + } +} + +func TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit(t *testing.T) { + t.Parallel() + + repo := &accountUsageCodexProbeRepo{ + updateExtraCh: make(chan map[string]any, 1), + rateLimitCh: make(chan time.Time, 1), + } + svc := &AccountUsageService{accountRepo: repo} + resetAt := time.Now().Add(2 * time.Hour).UTC().Truncate(time.Second) + + svc.persistOpenAICodexProbeSnapshot(321, map[string]any{ + "codex_7d_used_percent": 100.0, + "codex_7d_reset_at": resetAt.Format(time.RFC3339), + }, &resetAt) + + select { + case updates := <-repo.updateExtraCh: + if got := updates["codex_7d_used_percent"]; got != 100.0 { + t.Fatalf("codex_7d_used_percent = %v, want 100", got) + } + case <-time.After(2 * time.Second): + t.Fatal("waiting for codex probe extra persistence timed out") + } + + select { + case got := <-repo.rateLimitCh: + if got.Before(resetAt.Add(-time.Second)) || got.After(resetAt.Add(time.Second)) { + t.Fatalf("rate limit resetAt = %v, want around %v", got, resetAt) + } + case <-time.After(2 * time.Second): + t.Fatal("waiting for codex probe rate limit persistence timed out") + } +} diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 233566f3..0bf924b8 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -4102,6 +4102,9 @@ func (s *OpenAIGatewayService) updateCodexUsageSnapshot(ctx context.Context, acc return } shouldPersistUpdates := len(updates) > 0 && s.getCodexSnapshotThrottle().Allow(accountID, now) + if !shouldPersistUpdates && resetAt == nil { + return + } go func() { updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)