fix: 修复 OpenAI WS 用量窗口刷新与限额纠偏

This commit is contained in:
神乐
2026-03-07 20:02:58 +08:00
parent bcb6444f89
commit 0debe0a80c
12 changed files with 568 additions and 70 deletions

View File

@@ -408,6 +408,16 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
if isOAuth && s.accountRepo != nil {
if updates, err := extractOpenAICodexProbeUpdates(resp); err == nil && len(updates) > 0 {
_ = s.accountRepo.UpdateExtra(ctx, account.ID, updates)
mergeAccountExtra(account, updates)
}
if resetAt := (&RateLimitService{}).calculateOpenAI429ResetTime(resp.Header); resetAt != nil {
_ = s.accountRepo.SetRateLimited(ctx, account.ID, *resetAt)
account.RateLimitResetAt = resetAt
}
}
return s.sendErrorAndEnd(c, fmt.Sprintf("API returned %d: %s", resp.StatusCode, string(body)))
}

View File

@@ -0,0 +1,66 @@
//go:build unit
package service
import (
"context"
"net/http"
"testing"
"time"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
)
type openAIAccountTestRepo struct {
mockAccountRepoForGemini
updatedExtra map[string]any
rateLimitedID int64
rateLimitedAt *time.Time
}
func (r *openAIAccountTestRepo) UpdateExtra(_ context.Context, _ int64, updates map[string]any) error {
r.updatedExtra = updates
return nil
}
func (r *openAIAccountTestRepo) SetRateLimited(_ context.Context, id int64, resetAt time.Time) error {
r.rateLimitedID = id
r.rateLimitedAt = &resetAt
return nil
}
func TestAccountTestService_OpenAI429PersistsSnapshotAndRateLimit(t *testing.T) {
gin.SetMode(gin.TestMode)
ctx, _ := newSoraTestContext()
resp := newJSONResponse(http.StatusTooManyRequests, `{"error":{"type":"usage_limit_reached","message":"limit reached"}}`)
resp.Header.Set("x-codex-primary-used-percent", "100")
resp.Header.Set("x-codex-primary-reset-after-seconds", "604800")
resp.Header.Set("x-codex-primary-window-minutes", "10080")
resp.Header.Set("x-codex-secondary-used-percent", "100")
resp.Header.Set("x-codex-secondary-reset-after-seconds", "18000")
resp.Header.Set("x-codex-secondary-window-minutes", "300")
repo := &openAIAccountTestRepo{}
upstream := &queuedHTTPUpstream{responses: []*http.Response{resp}}
svc := &AccountTestService{accountRepo: repo, httpUpstream: upstream}
account := &Account{
ID: 88,
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Concurrency: 1,
Credentials: map[string]any{"access_token": "test-token"},
}
err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4")
require.Error(t, err)
require.NotEmpty(t, repo.updatedExtra)
require.Equal(t, 100.0, repo.updatedExtra["codex_5h_used_percent"])
require.Equal(t, int64(88), repo.rateLimitedID)
require.NotNil(t, repo.rateLimitedAt)
require.NotNil(t, account.RateLimitResetAt)
if account.RateLimitResetAt != nil && repo.rateLimitedAt != nil {
require.WithinDuration(t, *repo.rateLimitedAt, *account.RateLimitResetAt, time.Second)
}
}

View File

@@ -367,7 +367,7 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou
usage.SevenDay = progress
}
if (usage.FiveHour == nil || usage.SevenDay == nil) && s.shouldProbeOpenAICodexSnapshot(account.ID, now) {
if shouldRefreshOpenAICodexSnapshot(account, usage, now) && s.shouldProbeOpenAICodexSnapshot(account.ID, now) {
if updates, err := s.probeOpenAICodexSnapshot(ctx, account); err == nil && len(updates) > 0 {
mergeAccountExtra(account, updates)
if usage.UpdatedAt == nil {
@@ -409,6 +409,40 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou
return usage, nil
}
func shouldRefreshOpenAICodexSnapshot(account *Account, usage *UsageInfo, now time.Time) bool {
if account == nil {
return false
}
if usage == nil {
return true
}
if usage.FiveHour == nil || usage.SevenDay == nil {
return true
}
if account.IsRateLimited() {
return true
}
return isOpenAICodexSnapshotStale(account, now)
}
func isOpenAICodexSnapshotStale(account *Account, now time.Time) bool {
if account == nil || !account.IsOpenAIOAuth() || !account.IsOpenAIResponsesWebSocketV2Enabled() {
return false
}
if account.Extra == nil {
return true
}
raw, ok := account.Extra["codex_usage_updated_at"]
if !ok {
return true
}
ts, err := parseTime(fmt.Sprint(raw))
if err != nil {
return true
}
return now.Sub(ts) >= openAIProbeCacheTTL
}
func (s *AccountUsageService) shouldProbeOpenAICodexSnapshot(accountID int64, now time.Time) bool {
if s == nil || s.cache == nil || accountID <= 0 {
return true
@@ -478,20 +512,34 @@ func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, acco
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode)
updates, err := extractOpenAICodexProbeUpdates(resp)
if err != nil {
return nil, err
}
if len(updates) > 0 {
go func(accountID int64, updates map[string]any) {
updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer updateCancel()
_ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates)
}(account.ID, updates)
return updates, nil
}
return nil, nil
}
func extractOpenAICodexProbeUpdates(resp *http.Response) (map[string]any, error) {
if resp == nil {
return nil, nil
}
if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil {
updates := buildCodexUsageExtraUpdates(snapshot, time.Now())
if len(updates) > 0 {
go func(accountID int64, updates map[string]any) {
updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer updateCancel()
_ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates)
}(account.ID, updates)
return updates, nil
}
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode)
}
return nil, nil
}

View File

@@ -0,0 +1,68 @@
package service
import (
"net/http"
"testing"
"time"
)
func TestShouldRefreshOpenAICodexSnapshot(t *testing.T) {
t.Parallel()
rateLimitedUntil := time.Now().Add(5 * time.Minute)
now := time.Now()
usage := &UsageInfo{
FiveHour: &UsageProgress{Utilization: 0},
SevenDay: &UsageProgress{Utilization: 0},
}
if !shouldRefreshOpenAICodexSnapshot(&Account{RateLimitResetAt: &rateLimitedUntil}, usage, now) {
t.Fatal("expected rate-limited account to force codex snapshot refresh")
}
if shouldRefreshOpenAICodexSnapshot(&Account{}, usage, now) {
t.Fatal("expected complete non-rate-limited usage to skip codex snapshot refresh")
}
if !shouldRefreshOpenAICodexSnapshot(&Account{}, &UsageInfo{FiveHour: nil, SevenDay: &UsageProgress{}}, now) {
t.Fatal("expected missing 5h snapshot to require refresh")
}
staleAt := now.Add(-(openAIProbeCacheTTL + time.Minute)).Format(time.RFC3339)
if !shouldRefreshOpenAICodexSnapshot(&Account{
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Extra: map[string]any{
"openai_oauth_responses_websockets_v2_enabled": true,
"codex_usage_updated_at": staleAt,
},
}, usage, now) {
t.Fatal("expected stale ws snapshot to trigger refresh")
}
}
func TestExtractOpenAICodexProbeUpdatesAccepts429WithCodexHeaders(t *testing.T) {
t.Parallel()
headers := make(http.Header)
headers.Set("x-codex-primary-used-percent", "100")
headers.Set("x-codex-primary-reset-after-seconds", "604800")
headers.Set("x-codex-primary-window-minutes", "10080")
headers.Set("x-codex-secondary-used-percent", "100")
headers.Set("x-codex-secondary-reset-after-seconds", "18000")
headers.Set("x-codex-secondary-window-minutes", "300")
updates, err := extractOpenAICodexProbeUpdates(&http.Response{StatusCode: http.StatusTooManyRequests, Header: headers})
if err != nil {
t.Fatalf("extractOpenAICodexProbeUpdates() error = %v", err)
}
if len(updates) == 0 {
t.Fatal("expected codex probe updates from 429 headers")
}
if got := updates["codex_5h_used_percent"]; got != 100.0 {
t.Fatalf("codex_5h_used_percent = %v, want 100", got)
}
if got := updates["codex_7d_used_percent"]; got != 100.0 {
t.Fatalf("codex_7d_used_percent = %v, want 100", got)
}
}

View File

@@ -615,6 +615,7 @@ func (s *RateLimitService) handleCustomErrorCode(ctx context.Context, account *A
func (s *RateLimitService) handle429(ctx context.Context, account *Account, headers http.Header, responseBody []byte) {
// 1. OpenAI 平台:优先尝试解析 x-codex-* 响应头(用于 rate_limit_exceeded
if account.Platform == PlatformOpenAI {
s.persistOpenAICodexSnapshot(ctx, account, headers)
if resetAt := s.calculateOpenAI429ResetTime(headers); resetAt != nil {
if err := s.accountRepo.SetRateLimited(ctx, account.ID, *resetAt); err != nil {
slog.Warn("rate_limit_set_failed", "account_id", account.ID, "error", err)
@@ -878,6 +879,23 @@ func pickSooner(a, b *time.Time) *time.Time {
}
}
func (s *RateLimitService) persistOpenAICodexSnapshot(ctx context.Context, account *Account, headers http.Header) {
if s == nil || s.accountRepo == nil || account == nil || headers == nil {
return
}
snapshot := ParseCodexRateLimitHeaders(headers)
if snapshot == nil {
return
}
updates := buildCodexUsageExtraUpdates(snapshot, time.Now())
if len(updates) == 0 {
return
}
if err := s.accountRepo.UpdateExtra(ctx, account.ID, updates); err != nil {
slog.Warn("openai_codex_snapshot_persist_failed", "account_id", account.ID, "error", err)
}
}
// parseOpenAIRateLimitResetTime 解析 OpenAI 格式的 429 响应,返回重置时间的 Unix 时间戳
// OpenAI 的 usage_limit_reached 错误格式:
//

View File

@@ -1,6 +1,7 @@
package service
import (
"context"
"net/http"
"testing"
"time"
@@ -141,6 +142,51 @@ func TestCalculateOpenAI429ResetTime_ReversedWindowOrder(t *testing.T) {
}
}
type openAI429SnapshotRepo struct {
mockAccountRepoForGemini
rateLimitedID int64
updatedExtra map[string]any
}
func (r *openAI429SnapshotRepo) SetRateLimited(_ context.Context, id int64, _ time.Time) error {
r.rateLimitedID = id
return nil
}
func (r *openAI429SnapshotRepo) UpdateExtra(_ context.Context, _ int64, updates map[string]any) error {
r.updatedExtra = updates
return nil
}
func TestHandle429_OpenAIPersistsCodexSnapshotImmediately(t *testing.T) {
repo := &openAI429SnapshotRepo{}
svc := NewRateLimitService(repo, nil, nil, nil, nil)
account := &Account{ID: 123, Platform: PlatformOpenAI, Type: AccountTypeOAuth}
headers := http.Header{}
headers.Set("x-codex-primary-used-percent", "100")
headers.Set("x-codex-primary-reset-after-seconds", "604800")
headers.Set("x-codex-primary-window-minutes", "10080")
headers.Set("x-codex-secondary-used-percent", "100")
headers.Set("x-codex-secondary-reset-after-seconds", "18000")
headers.Set("x-codex-secondary-window-minutes", "300")
svc.handle429(context.Background(), account, headers, nil)
if repo.rateLimitedID != account.ID {
t.Fatalf("rateLimitedID = %d, want %d", repo.rateLimitedID, account.ID)
}
if len(repo.updatedExtra) == 0 {
t.Fatal("expected codex snapshot to be persisted on 429")
}
if got := repo.updatedExtra["codex_5h_used_percent"]; got != 100.0 {
t.Fatalf("codex_5h_used_percent = %v, want 100", got)
}
if got := repo.updatedExtra["codex_7d_used_percent"]; got != 100.0 {
t.Fatalf("codex_7d_used_percent = %v, want 100", got)
}
}
func TestNormalizedCodexLimits(t *testing.T) {
// Test the Normalize() method directly
pUsed := 100.0