From 0debe0a80c9ae53d848bc04c0b46e76473fbe11b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E4=B9=90?= <6682635@qq.com> Date: Sat, 7 Mar 2026 20:02:58 +0800 Subject: [PATCH 1/6] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20OpenAI=20WS=20?= =?UTF-8?q?=E7=94=A8=E9=87=8F=E7=AA=97=E5=8F=A3=E5=88=B7=E6=96=B0=E4=B8=8E?= =?UTF-8?q?=E9=99=90=E9=A2=9D=E7=BA=A0=E5=81=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../internal/service/account_test_service.go | 10 ++ .../account_test_service_openai_test.go | 66 +++++++ .../internal/service/account_usage_service.go | 64 ++++++- .../service/account_usage_service_test.go | 68 +++++++ backend/internal/service/ratelimit_service.go | 18 ++ .../service/ratelimit_service_openai_test.go | 46 +++++ .../components/account/AccountUsageCell.vue | 68 ++++++- .../components/account/UsageProgressBar.vue | 60 +------ .../__tests__/AccountUsageCell.spec.ts | 167 ++++++++++++++++++ .../__tests__/accountUsageRefresh.spec.ts | 39 ++++ frontend/src/utils/accountUsageRefresh.ts | 28 +++ frontend/src/views/admin/AccountsView.vue | 4 +- 12 files changed, 568 insertions(+), 70 deletions(-) create mode 100644 backend/internal/service/account_test_service_openai_test.go create mode 100644 backend/internal/service/account_usage_service_test.go create mode 100644 frontend/src/utils/__tests__/accountUsageRefresh.spec.ts create mode 100644 frontend/src/utils/accountUsageRefresh.ts diff --git a/backend/internal/service/account_test_service.go b/backend/internal/service/account_test_service.go index 9557e175..5b22c645 100644 --- a/backend/internal/service/account_test_service.go +++ b/backend/internal/service/account_test_service.go @@ -408,6 +408,16 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) + if isOAuth && s.accountRepo != nil { + if updates, err := extractOpenAICodexProbeUpdates(resp); err == nil && len(updates) > 0 { + _ = s.accountRepo.UpdateExtra(ctx, account.ID, updates) + mergeAccountExtra(account, updates) + } + if resetAt := (&RateLimitService{}).calculateOpenAI429ResetTime(resp.Header); resetAt != nil { + _ = s.accountRepo.SetRateLimited(ctx, account.ID, *resetAt) + account.RateLimitResetAt = resetAt + } + } return s.sendErrorAndEnd(c, fmt.Sprintf("API returned %d: %s", resp.StatusCode, string(body))) } diff --git a/backend/internal/service/account_test_service_openai_test.go b/backend/internal/service/account_test_service_openai_test.go new file mode 100644 index 00000000..61a755a7 --- /dev/null +++ b/backend/internal/service/account_test_service_openai_test.go @@ -0,0 +1,66 @@ +//go:build unit + +package service + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/require" +) + +type openAIAccountTestRepo struct { + mockAccountRepoForGemini + updatedExtra map[string]any + rateLimitedID int64 + rateLimitedAt *time.Time +} + +func (r *openAIAccountTestRepo) UpdateExtra(_ context.Context, _ int64, updates map[string]any) error { + r.updatedExtra = updates + return nil +} + +func (r *openAIAccountTestRepo) SetRateLimited(_ context.Context, id int64, resetAt time.Time) error { + r.rateLimitedID = id + r.rateLimitedAt = &resetAt + return nil +} + +func TestAccountTestService_OpenAI429PersistsSnapshotAndRateLimit(t *testing.T) { + gin.SetMode(gin.TestMode) + ctx, _ := newSoraTestContext() + + resp := newJSONResponse(http.StatusTooManyRequests, `{"error":{"type":"usage_limit_reached","message":"limit reached"}}`) + resp.Header.Set("x-codex-primary-used-percent", "100") + resp.Header.Set("x-codex-primary-reset-after-seconds", "604800") + resp.Header.Set("x-codex-primary-window-minutes", "10080") + resp.Header.Set("x-codex-secondary-used-percent", "100") + resp.Header.Set("x-codex-secondary-reset-after-seconds", "18000") + resp.Header.Set("x-codex-secondary-window-minutes", "300") + + repo := &openAIAccountTestRepo{} + upstream := &queuedHTTPUpstream{responses: []*http.Response{resp}} + svc := &AccountTestService{accountRepo: repo, httpUpstream: upstream} + account := &Account{ + ID: 88, + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{"access_token": "test-token"}, + } + + err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4") + require.Error(t, err) + require.NotEmpty(t, repo.updatedExtra) + require.Equal(t, 100.0, repo.updatedExtra["codex_5h_used_percent"]) + require.Equal(t, int64(88), repo.rateLimitedID) + require.NotNil(t, repo.rateLimitedAt) + require.NotNil(t, account.RateLimitResetAt) + if account.RateLimitResetAt != nil && repo.rateLimitedAt != nil { + require.WithinDuration(t, *repo.rateLimitedAt, *account.RateLimitResetAt, time.Second) + } +} diff --git a/backend/internal/service/account_usage_service.go b/backend/internal/service/account_usage_service.go index b0a4900d..9bb3aa0b 100644 --- a/backend/internal/service/account_usage_service.go +++ b/backend/internal/service/account_usage_service.go @@ -367,7 +367,7 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou usage.SevenDay = progress } - if (usage.FiveHour == nil || usage.SevenDay == nil) && s.shouldProbeOpenAICodexSnapshot(account.ID, now) { + if shouldRefreshOpenAICodexSnapshot(account, usage, now) && s.shouldProbeOpenAICodexSnapshot(account.ID, now) { if updates, err := s.probeOpenAICodexSnapshot(ctx, account); err == nil && len(updates) > 0 { mergeAccountExtra(account, updates) if usage.UpdatedAt == nil { @@ -409,6 +409,40 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou return usage, nil } +func shouldRefreshOpenAICodexSnapshot(account *Account, usage *UsageInfo, now time.Time) bool { + if account == nil { + return false + } + if usage == nil { + return true + } + if usage.FiveHour == nil || usage.SevenDay == nil { + return true + } + if account.IsRateLimited() { + return true + } + return isOpenAICodexSnapshotStale(account, now) +} + +func isOpenAICodexSnapshotStale(account *Account, now time.Time) bool { + if account == nil || !account.IsOpenAIOAuth() || !account.IsOpenAIResponsesWebSocketV2Enabled() { + return false + } + if account.Extra == nil { + return true + } + raw, ok := account.Extra["codex_usage_updated_at"] + if !ok { + return true + } + ts, err := parseTime(fmt.Sprint(raw)) + if err != nil { + return true + } + return now.Sub(ts) >= openAIProbeCacheTTL +} + func (s *AccountUsageService) shouldProbeOpenAICodexSnapshot(accountID int64, now time.Time) bool { if s == nil || s.cache == nil || accountID <= 0 { return true @@ -478,20 +512,34 @@ func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, acco } defer func() { _ = resp.Body.Close() }() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode) + updates, err := extractOpenAICodexProbeUpdates(resp) + if err != nil { + return nil, err + } + if len(updates) > 0 { + go func(accountID int64, updates map[string]any) { + updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer updateCancel() + _ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates) + }(account.ID, updates) + return updates, nil + } + return nil, nil +} + +func extractOpenAICodexProbeUpdates(resp *http.Response) (map[string]any, error) { + if resp == nil { + return nil, nil } if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil { updates := buildCodexUsageExtraUpdates(snapshot, time.Now()) if len(updates) > 0 { - go func(accountID int64, updates map[string]any) { - updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer updateCancel() - _ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates) - }(account.ID, updates) return updates, nil } } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode) + } return nil, nil } diff --git a/backend/internal/service/account_usage_service_test.go b/backend/internal/service/account_usage_service_test.go new file mode 100644 index 00000000..974d9029 --- /dev/null +++ b/backend/internal/service/account_usage_service_test.go @@ -0,0 +1,68 @@ +package service + +import ( + "net/http" + "testing" + "time" +) + +func TestShouldRefreshOpenAICodexSnapshot(t *testing.T) { + t.Parallel() + + rateLimitedUntil := time.Now().Add(5 * time.Minute) + now := time.Now() + usage := &UsageInfo{ + FiveHour: &UsageProgress{Utilization: 0}, + SevenDay: &UsageProgress{Utilization: 0}, + } + + if !shouldRefreshOpenAICodexSnapshot(&Account{RateLimitResetAt: &rateLimitedUntil}, usage, now) { + t.Fatal("expected rate-limited account to force codex snapshot refresh") + } + + if shouldRefreshOpenAICodexSnapshot(&Account{}, usage, now) { + t.Fatal("expected complete non-rate-limited usage to skip codex snapshot refresh") + } + + if !shouldRefreshOpenAICodexSnapshot(&Account{}, &UsageInfo{FiveHour: nil, SevenDay: &UsageProgress{}}, now) { + t.Fatal("expected missing 5h snapshot to require refresh") + } + + staleAt := now.Add(-(openAIProbeCacheTTL + time.Minute)).Format(time.RFC3339) + if !shouldRefreshOpenAICodexSnapshot(&Account{ + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Extra: map[string]any{ + "openai_oauth_responses_websockets_v2_enabled": true, + "codex_usage_updated_at": staleAt, + }, + }, usage, now) { + t.Fatal("expected stale ws snapshot to trigger refresh") + } +} + +func TestExtractOpenAICodexProbeUpdatesAccepts429WithCodexHeaders(t *testing.T) { + t.Parallel() + + headers := make(http.Header) + headers.Set("x-codex-primary-used-percent", "100") + headers.Set("x-codex-primary-reset-after-seconds", "604800") + headers.Set("x-codex-primary-window-minutes", "10080") + headers.Set("x-codex-secondary-used-percent", "100") + headers.Set("x-codex-secondary-reset-after-seconds", "18000") + headers.Set("x-codex-secondary-window-minutes", "300") + + updates, err := extractOpenAICodexProbeUpdates(&http.Response{StatusCode: http.StatusTooManyRequests, Header: headers}) + if err != nil { + t.Fatalf("extractOpenAICodexProbeUpdates() error = %v", err) + } + if len(updates) == 0 { + t.Fatal("expected codex probe updates from 429 headers") + } + if got := updates["codex_5h_used_percent"]; got != 100.0 { + t.Fatalf("codex_5h_used_percent = %v, want 100", got) + } + if got := updates["codex_7d_used_percent"]; got != 100.0 { + t.Fatalf("codex_7d_used_percent = %v, want 100", got) + } +} diff --git a/backend/internal/service/ratelimit_service.go b/backend/internal/service/ratelimit_service.go index f8f3154b..60ad99d0 100644 --- a/backend/internal/service/ratelimit_service.go +++ b/backend/internal/service/ratelimit_service.go @@ -615,6 +615,7 @@ func (s *RateLimitService) handleCustomErrorCode(ctx context.Context, account *A func (s *RateLimitService) handle429(ctx context.Context, account *Account, headers http.Header, responseBody []byte) { // 1. OpenAI 平台:优先尝试解析 x-codex-* 响应头(用于 rate_limit_exceeded) if account.Platform == PlatformOpenAI { + s.persistOpenAICodexSnapshot(ctx, account, headers) if resetAt := s.calculateOpenAI429ResetTime(headers); resetAt != nil { if err := s.accountRepo.SetRateLimited(ctx, account.ID, *resetAt); err != nil { slog.Warn("rate_limit_set_failed", "account_id", account.ID, "error", err) @@ -878,6 +879,23 @@ func pickSooner(a, b *time.Time) *time.Time { } } +func (s *RateLimitService) persistOpenAICodexSnapshot(ctx context.Context, account *Account, headers http.Header) { + if s == nil || s.accountRepo == nil || account == nil || headers == nil { + return + } + snapshot := ParseCodexRateLimitHeaders(headers) + if snapshot == nil { + return + } + updates := buildCodexUsageExtraUpdates(snapshot, time.Now()) + if len(updates) == 0 { + return + } + if err := s.accountRepo.UpdateExtra(ctx, account.ID, updates); err != nil { + slog.Warn("openai_codex_snapshot_persist_failed", "account_id", account.ID, "error", err) + } +} + // parseOpenAIRateLimitResetTime 解析 OpenAI 格式的 429 响应,返回重置时间的 Unix 时间戳 // OpenAI 的 usage_limit_reached 错误格式: // diff --git a/backend/internal/service/ratelimit_service_openai_test.go b/backend/internal/service/ratelimit_service_openai_test.go index 00902068..94b9a170 100644 --- a/backend/internal/service/ratelimit_service_openai_test.go +++ b/backend/internal/service/ratelimit_service_openai_test.go @@ -1,6 +1,7 @@ package service import ( + "context" "net/http" "testing" "time" @@ -141,6 +142,51 @@ func TestCalculateOpenAI429ResetTime_ReversedWindowOrder(t *testing.T) { } } +type openAI429SnapshotRepo struct { + mockAccountRepoForGemini + rateLimitedID int64 + updatedExtra map[string]any +} + +func (r *openAI429SnapshotRepo) SetRateLimited(_ context.Context, id int64, _ time.Time) error { + r.rateLimitedID = id + return nil +} + +func (r *openAI429SnapshotRepo) UpdateExtra(_ context.Context, _ int64, updates map[string]any) error { + r.updatedExtra = updates + return nil +} + +func TestHandle429_OpenAIPersistsCodexSnapshotImmediately(t *testing.T) { + repo := &openAI429SnapshotRepo{} + svc := NewRateLimitService(repo, nil, nil, nil, nil) + account := &Account{ID: 123, Platform: PlatformOpenAI, Type: AccountTypeOAuth} + + headers := http.Header{} + headers.Set("x-codex-primary-used-percent", "100") + headers.Set("x-codex-primary-reset-after-seconds", "604800") + headers.Set("x-codex-primary-window-minutes", "10080") + headers.Set("x-codex-secondary-used-percent", "100") + headers.Set("x-codex-secondary-reset-after-seconds", "18000") + headers.Set("x-codex-secondary-window-minutes", "300") + + svc.handle429(context.Background(), account, headers, nil) + + if repo.rateLimitedID != account.ID { + t.Fatalf("rateLimitedID = %d, want %d", repo.rateLimitedID, account.ID) + } + if len(repo.updatedExtra) == 0 { + t.Fatal("expected codex snapshot to be persisted on 429") + } + if got := repo.updatedExtra["codex_5h_used_percent"]; got != 100.0 { + t.Fatalf("codex_5h_used_percent = %v, want 100", got) + } + if got := repo.updatedExtra["codex_7d_used_percent"]; got != 100.0 { + t.Fatalf("codex_7d_used_percent = %v, want 100", got) + } +} + func TestNormalizedCodexLimits(t *testing.T) { // Test the Normalize() method directly pUsed := 100.0 diff --git a/frontend/src/components/account/AccountUsageCell.vue b/frontend/src/components/account/AccountUsageCell.vue index 20b4b629..44c8e209 100644 --- a/frontend/src/components/account/AccountUsageCell.vue +++ b/frontend/src/components/account/AccountUsageCell.vue @@ -69,9 +69,39 @@