From 101ef0cf6213ebf693677d677c2a2ef2f3f917f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E4=B9=90?= <6682635@qq.com> Date: Sat, 7 Mar 2026 20:59:17 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E9=99=90=E6=B5=81=E8=B4=A6=E5=8F=B7?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E9=80=80=E5=87=BA=E8=B0=83=E5=BA=A6=E5=B9=B6?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8F=90=E7=A4=BA=E6=96=87=E6=A1=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/internal/repository/account_repo.go | 2 + .../service/openai_account_scheduler.go | 39 ++++++---- .../service/openai_account_scheduler_test.go | 72 ++++++++++++++++++ .../service/openai_gateway_service.go | 74 +++++++++++++------ .../service/openai_ws_account_sticky_test.go | 37 ++++++++++ .../internal/service/openai_ws_forwarder.go | 2 +- .../account/AccountStatusIndicator.vue | 19 +++-- frontend/src/i18n/locales/en.ts | 3 +- frontend/src/i18n/locales/zh.ts | 3 +- 9 files changed, 204 insertions(+), 47 deletions(-) diff --git a/backend/internal/repository/account_repo.go b/backend/internal/repository/account_repo.go index ffbfd466..0e56ebd1 100644 --- a/backend/internal/repository/account_repo.go +++ b/backend/internal/repository/account_repo.go @@ -925,6 +925,7 @@ func (r *accountRepository) SetRateLimited(ctx context.Context, id int64, resetA if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil { logger.LegacyPrintf("repository.account", "[SchedulerOutbox] enqueue rate limit failed: account=%d err=%v", id, err) } + r.syncSchedulerAccountSnapshot(ctx, id) return nil } @@ -1040,6 +1041,7 @@ func (r *accountRepository) ClearRateLimit(ctx context.Context, id int64) error if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil { logger.LegacyPrintf("repository.account", "[SchedulerOutbox] enqueue clear rate limit failed: account=%d err=%v", id, err) } + r.syncSchedulerAccountSnapshot(ctx, id) return nil } diff --git a/backend/internal/service/openai_account_scheduler.go b/backend/internal/service/openai_account_scheduler.go index cf4bc26e..0fcf450b 100644 --- a/backend/internal/service/openai_account_scheduler.go +++ b/backend/internal/service/openai_account_scheduler.go @@ -319,7 +319,7 @@ func (s *defaultOpenAIAccountScheduler) selectBySessionHash( _ = s.service.deleteStickySessionAccountID(ctx, req.GroupID, sessionHash) return nil, nil } - if shouldClearStickySession(account, req.RequestedModel) || !account.IsOpenAI() { + if shouldClearStickySession(account, req.RequestedModel) || !account.IsOpenAI() || !account.IsSchedulable() { _ = s.service.deleteStickySessionAccountID(ctx, req.GroupID, sessionHash) return nil, nil } @@ -687,16 +687,20 @@ func (s *defaultOpenAIAccountScheduler) selectByLoadBalance( for i := 0; i < len(selectionOrder); i++ { candidate := selectionOrder[i] - result, acquireErr := s.service.tryAcquireAccountSlot(ctx, candidate.account.ID, candidate.account.Concurrency) + fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, candidate.account, req.RequestedModel) + if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) { + continue + } + result, acquireErr := s.service.tryAcquireAccountSlot(ctx, fresh.ID, fresh.Concurrency) if acquireErr != nil { return nil, len(candidates), topK, loadSkew, acquireErr } if result != nil && result.Acquired { if req.SessionHash != "" { - _ = s.service.BindStickySession(ctx, req.GroupID, req.SessionHash, candidate.account.ID) + _ = s.service.BindStickySession(ctx, req.GroupID, req.SessionHash, fresh.ID) } return &AccountSelectionResult{ - Account: candidate.account, + Account: fresh, Acquired: true, ReleaseFunc: result.ReleaseFunc, }, len(candidates), topK, loadSkew, nil @@ -705,16 +709,23 @@ func (s *defaultOpenAIAccountScheduler) selectByLoadBalance( cfg := s.service.schedulingConfig() // WaitPlan.MaxConcurrency 使用 Concurrency(非 EffectiveLoadFactor),因为 WaitPlan 控制的是 Redis 实际并发槽位等待。 - candidate := selectionOrder[0] - return &AccountSelectionResult{ - Account: candidate.account, - WaitPlan: &AccountWaitPlan{ - AccountID: candidate.account.ID, - MaxConcurrency: candidate.account.Concurrency, - Timeout: cfg.FallbackWaitTimeout, - MaxWaiting: cfg.FallbackMaxWaiting, - }, - }, len(candidates), topK, loadSkew, nil + for _, candidate := range selectionOrder { + fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, candidate.account, req.RequestedModel) + if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) { + continue + } + return &AccountSelectionResult{ + Account: fresh, + WaitPlan: &AccountWaitPlan{ + AccountID: fresh.ID, + MaxConcurrency: fresh.Concurrency, + Timeout: cfg.FallbackWaitTimeout, + MaxWaiting: cfg.FallbackMaxWaiting, + }, + }, len(candidates), topK, loadSkew, nil + } + + return nil, len(candidates), topK, loadSkew, errors.New("no available accounts") } func (s *defaultOpenAIAccountScheduler) isAccountTransportCompatible(account *Account, requiredTransport OpenAIUpstreamTransport) bool { diff --git a/backend/internal/service/openai_account_scheduler_test.go b/backend/internal/service/openai_account_scheduler_test.go index 7f6f1b66..977c4ee8 100644 --- a/backend/internal/service/openai_account_scheduler_test.go +++ b/backend/internal/service/openai_account_scheduler_test.go @@ -12,6 +12,78 @@ import ( "github.com/stretchr/testify/require" ) +type openAISnapshotCacheStub struct { + SchedulerCache + snapshotAccounts []*Account + accountsByID map[int64]*Account +} + +func (s *openAISnapshotCacheStub) GetSnapshot(ctx context.Context, bucket SchedulerBucket) ([]*Account, bool, error) { + if len(s.snapshotAccounts) == 0 { + return nil, false, nil + } + out := make([]*Account, 0, len(s.snapshotAccounts)) + for _, account := range s.snapshotAccounts { + if account == nil { + continue + } + cloned := *account + out = append(out, &cloned) + } + return out, true, nil +} + +func (s *openAISnapshotCacheStub) GetAccount(ctx context.Context, accountID int64) (*Account, error) { + if s.accountsByID == nil { + return nil, nil + } + account := s.accountsByID[accountID] + if account == nil { + return nil, nil + } + cloned := *account + return &cloned, nil +} + +func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionStickyRateLimitedAccountFallsBackToFreshCandidate(t *testing.T) { + ctx := context.Background() + groupID := int64(10101) + rateLimitedUntil := time.Now().Add(30 * time.Minute) + staleSticky := &Account{ID: 31001, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 0} + staleBackup := &Account{ID: 31002, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + freshSticky := &Account{ID: 31001, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 0, RateLimitResetAt: &rateLimitedUntil} + freshBackup := &Account{ID: 31002, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + cache := &stubGatewayCache{sessionBindings: map[string]int64{"openai:session_hash_rate_limited": 31001}} + snapshotCache := &openAISnapshotCacheStub{snapshotAccounts: []*Account{staleSticky, staleBackup}, accountsByID: map[int64]*Account{31001: freshSticky, 31002: freshBackup}} + snapshotService := &SchedulerSnapshotService{cache: snapshotCache} + svc := &OpenAIGatewayService{accountRepo: stubOpenAIAccountRepo{accounts: []Account{*freshSticky, *freshBackup}}, cache: cache, cfg: &config.Config{}, schedulerSnapshot: snapshotService, concurrencyService: NewConcurrencyService(stubConcurrencyCache{})} + + selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_rate_limited", "gpt-5.1", nil, OpenAIUpstreamTransportAny) + require.NoError(t, err) + require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, int64(31002), selection.Account.ID) + require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer) +} + +func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_SkipsFreshlyRateLimitedSnapshotCandidate(t *testing.T) { + ctx := context.Background() + groupID := int64(10102) + rateLimitedUntil := time.Now().Add(30 * time.Minute) + stalePrimary := &Account{ID: 32001, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 0} + staleSecondary := &Account{ID: 32002, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + freshPrimary := &Account{ID: 32001, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 0, RateLimitResetAt: &rateLimitedUntil} + freshSecondary := &Account{ID: 32002, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + snapshotCache := &openAISnapshotCacheStub{snapshotAccounts: []*Account{stalePrimary, staleSecondary}, accountsByID: map[int64]*Account{32001: freshPrimary, 32002: freshSecondary}} + snapshotService := &SchedulerSnapshotService{cache: snapshotCache} + svc := &OpenAIGatewayService{accountRepo: stubOpenAIAccountRepo{accounts: []Account{*freshPrimary, *freshSecondary}}, cfg: &config.Config{}, schedulerSnapshot: snapshotService} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, &groupID, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(32002), account.ID) +} + func TestOpenAIGatewayService_SelectAccountWithScheduler_PreviousResponseSticky(t *testing.T) { ctx := context.Background() groupID := int64(9) diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 5c8c2710..e642ff60 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -1026,7 +1026,7 @@ func (s *OpenAIGatewayService) selectAccountForModelWithExclusions(ctx context.C // 3. 按优先级 + LRU 选择最佳账号 // Select by priority + LRU - selected := s.selectBestAccount(accounts, requestedModel, excludedIDs) + selected := s.selectBestAccount(ctx, accounts, requestedModel, excludedIDs) if selected == nil { if requestedModel != "" { @@ -1099,7 +1099,7 @@ func (s *OpenAIGatewayService) tryStickySessionHit(ctx context.Context, groupID // // selectBestAccount selects the best account from candidates (priority + LRU). // Returns nil if no available account. -func (s *OpenAIGatewayService) selectBestAccount(accounts []Account, requestedModel string, excludedIDs map[int64]struct{}) *Account { +func (s *OpenAIGatewayService) selectBestAccount(ctx context.Context, accounts []Account, requestedModel string, excludedIDs map[int64]struct{}) *Account { var selected *Account for i := range accounts { @@ -1111,27 +1111,20 @@ func (s *OpenAIGatewayService) selectBestAccount(accounts []Account, requestedMo continue } - // 调度器快照可能暂时过时,这里重新检查可调度性和平台 - // Scheduler snapshots can be temporarily stale; re-check schedulability and platform - if !acc.IsSchedulable() || !acc.IsOpenAI() { - continue - } - - // 检查模型支持 - // Check model support - if requestedModel != "" && !acc.IsModelSupported(requestedModel) { + fresh := s.resolveFreshSchedulableOpenAIAccount(ctx, acc, requestedModel) + if fresh == nil { continue } // 选择优先级最高且最久未使用的账号 // Select highest priority and least recently used if selected == nil { - selected = acc + selected = fresh continue } - if s.isBetterAccount(acc, selected) { - selected = acc + if s.isBetterAccount(fresh, selected) { + selected = fresh } } @@ -1309,13 +1302,17 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex ordered := append([]*Account(nil), candidates...) sortAccountsByPriorityAndLastUsed(ordered, false) for _, acc := range ordered { - result, err := s.tryAcquireAccountSlot(ctx, acc.ID, acc.Concurrency) + fresh := s.resolveFreshSchedulableOpenAIAccount(ctx, acc, requestedModel) + if fresh == nil { + continue + } + result, err := s.tryAcquireAccountSlot(ctx, fresh.ID, fresh.Concurrency) if err == nil && result.Acquired { if sessionHash != "" { - _ = s.setStickySessionAccountID(ctx, groupID, sessionHash, acc.ID, openaiStickySessionTTL) + _ = s.setStickySessionAccountID(ctx, groupID, sessionHash, fresh.ID, openaiStickySessionTTL) } return &AccountSelectionResult{ - Account: acc, + Account: fresh, Acquired: true, ReleaseFunc: result.ReleaseFunc, }, nil @@ -1359,13 +1356,17 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex shuffleWithinSortGroups(available) for _, item := range available { - result, err := s.tryAcquireAccountSlot(ctx, item.account.ID, item.account.Concurrency) + fresh := s.resolveFreshSchedulableOpenAIAccount(ctx, item.account, requestedModel) + if fresh == nil { + continue + } + result, err := s.tryAcquireAccountSlot(ctx, fresh.ID, fresh.Concurrency) if err == nil && result.Acquired { if sessionHash != "" { - _ = s.setStickySessionAccountID(ctx, groupID, sessionHash, item.account.ID, openaiStickySessionTTL) + _ = s.setStickySessionAccountID(ctx, groupID, sessionHash, fresh.ID, openaiStickySessionTTL) } return &AccountSelectionResult{ - Account: item.account, + Account: fresh, Acquired: true, ReleaseFunc: result.ReleaseFunc, }, nil @@ -1377,11 +1378,15 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex // ============ Layer 3: Fallback wait ============ sortAccountsByPriorityAndLastUsed(candidates, false) for _, acc := range candidates { + fresh := s.resolveFreshSchedulableOpenAIAccount(ctx, acc, requestedModel) + if fresh == nil { + continue + } return &AccountSelectionResult{ - Account: acc, + Account: fresh, WaitPlan: &AccountWaitPlan{ - AccountID: acc.ID, - MaxConcurrency: acc.Concurrency, + AccountID: fresh.ID, + MaxConcurrency: fresh.Concurrency, Timeout: cfg.FallbackWaitTimeout, MaxWaiting: cfg.FallbackMaxWaiting, }, @@ -1418,6 +1423,29 @@ func (s *OpenAIGatewayService) tryAcquireAccountSlot(ctx context.Context, accoun return s.concurrencyService.AcquireAccountSlot(ctx, accountID, maxConcurrency) } +func (s *OpenAIGatewayService) resolveFreshSchedulableOpenAIAccount(ctx context.Context, account *Account, requestedModel string) *Account { + if account == nil { + return nil + } + + fresh := account + if s.schedulerSnapshot != nil { + current, err := s.getSchedulableAccount(ctx, account.ID) + if err != nil || current == nil { + return nil + } + fresh = current + } + + if !fresh.IsSchedulable() || !fresh.IsOpenAI() { + return nil + } + if requestedModel != "" && !fresh.IsModelSupported(requestedModel) { + return nil + } + return fresh +} + func (s *OpenAIGatewayService) getSchedulableAccount(ctx context.Context, accountID int64) (*Account, error) { if s.schedulerSnapshot != nil { return s.schedulerSnapshot.GetAccount(ctx, accountID) diff --git a/backend/internal/service/openai_ws_account_sticky_test.go b/backend/internal/service/openai_ws_account_sticky_test.go index 3fe08179..9a8803d3 100644 --- a/backend/internal/service/openai_ws_account_sticky_test.go +++ b/backend/internal/service/openai_ws_account_sticky_test.go @@ -48,6 +48,43 @@ func TestOpenAIGatewayService_SelectAccountByPreviousResponseID_Hit(t *testing.T } } +func TestOpenAIGatewayService_SelectAccountByPreviousResponseID_RateLimitedMiss(t *testing.T) { + ctx := context.Background() + groupID := int64(23) + rateLimitedUntil := time.Now().Add(30 * time.Minute) + account := Account{ + ID: 12, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + RateLimitResetAt: &rateLimitedUntil, + Extra: map[string]any{ + "openai_apikey_responses_websockets_v2_enabled": true, + }, + } + cache := &stubGatewayCache{} + store := NewOpenAIWSStateStore(cache) + cfg := newOpenAIWSV2TestConfig() + svc := &OpenAIGatewayService{ + accountRepo: stubOpenAIAccountRepo{accounts: []Account{account}}, + cache: cache, + cfg: cfg, + concurrencyService: NewConcurrencyService(stubConcurrencyCache{}), + openaiWSStateStore: store, + } + + require.NoError(t, store.BindResponseAccount(ctx, groupID, "resp_prev_rl", account.ID, time.Hour)) + + selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_rl", "gpt-5.1", nil) + require.NoError(t, err) + require.Nil(t, selection, "限额中的账号不应继续命中 previous_response_id 粘连") + boundAccountID, getErr := store.GetResponseAccount(ctx, groupID, "resp_prev_rl") + require.NoError(t, getErr) + require.Zero(t, boundAccountID) +} + func TestOpenAIGatewayService_SelectAccountByPreviousResponseID_Excluded(t *testing.T) { ctx := context.Background() groupID := int64(23) diff --git a/backend/internal/service/openai_ws_forwarder.go b/backend/internal/service/openai_ws_forwarder.go index f9e93f85..36bf8ff8 100644 --- a/backend/internal/service/openai_ws_forwarder.go +++ b/backend/internal/service/openai_ws_forwarder.go @@ -3798,7 +3798,7 @@ func (s *OpenAIGatewayService) SelectAccountByPreviousResponseID( if s.getOpenAIWSProtocolResolver().Resolve(account).Transport != OpenAIUpstreamTransportResponsesWebsocketV2 { return nil, nil } - if shouldClearStickySession(account, requestedModel) || !account.IsOpenAI() { + if shouldClearStickySession(account, requestedModel) || !account.IsOpenAI() || !account.IsSchedulable() { _ = store.DeleteResponseAccount(ctx, derefGroupID(groupID), responseID) return nil, nil } diff --git a/frontend/src/components/account/AccountStatusIndicator.vue b/frontend/src/components/account/AccountStatusIndicator.vue index e8331c25..1dc4f287 100644 --- a/frontend/src/components/account/AccountStatusIndicator.vue +++ b/frontend/src/components/account/AccountStatusIndicator.vue @@ -3,7 +3,7 @@