Merge pull request #858 from james-6-23/fix/pool-mode-03bf3485

支持 API Key 上游池模式的同账号重试次数配置与自定义错误策略
This commit is contained in:
Wesley Liddick
2026-03-09 08:48:53 +08:00
committed by GitHub
13 changed files with 558 additions and 40 deletions

View File

@@ -30,7 +30,7 @@ const (
const ( const (
// maxSameAccountRetries 同账号重试次数上限(针对 RetryableOnSameAccount 错误) // maxSameAccountRetries 同账号重试次数上限(针对 RetryableOnSameAccount 错误)
maxSameAccountRetries = 2 maxSameAccountRetries = 3
// sameAccountRetryDelay 同账号重试间隔 // sameAccountRetryDelay 同账号重试间隔
sameAccountRetryDelay = 500 * time.Millisecond sameAccountRetryDelay = 500 * time.Millisecond
// singleAccountBackoffDelay 单账号分组 503 退避重试固定延时。 // singleAccountBackoffDelay 单账号分组 503 退避重试固定延时。

View File

@@ -291,35 +291,31 @@ func TestHandleFailoverError_SameAccountRetry(t *testing.T) {
require.Less(t, elapsed, 2*time.Second) require.Less(t, elapsed, 2*time.Second)
}) })
t.Run("第二次重试仍返回FailoverContinue", func(t *testing.T) { t.Run("达到最大重试次数前均返回FailoverContinue", func(t *testing.T) {
mock := &mockTempUnscheduler{} mock := &mockTempUnscheduler{}
fs := NewFailoverState(3, false) fs := NewFailoverState(3, false)
err := newTestFailoverErr(400, true, false) err := newTestFailoverErr(400, true, false)
// 第一次 for i := 1; i <= maxSameAccountRetries; i++ {
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverContinue, action) require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SameAccountRetryCount[100]) require.Equal(t, i, fs.SameAccountRetryCount[100])
}
// 第二次 require.Empty(t, mock.calls, "达到最大重试次数前均不应调用 TempUnschedule")
action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 2, fs.SameAccountRetryCount[100])
require.Empty(t, mock.calls, "两次重试期间均不应调用 TempUnschedule")
}) })
t.Run("第三次重试耗尽_触发TempUnschedule并切换", func(t *testing.T) { t.Run("超过最大重试次数后触发TempUnschedule并切换", func(t *testing.T) {
mock := &mockTempUnscheduler{} mock := &mockTempUnscheduler{}
fs := NewFailoverState(3, false) fs := NewFailoverState(3, false)
err := newTestFailoverErr(400, true, false) err := newTestFailoverErr(400, true, false)
// 第一次、第二次重试 for i := 0; i < maxSameAccountRetries; i++ {
fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) }
require.Equal(t, 2, fs.SameAccountRetryCount[100]) require.Equal(t, maxSameAccountRetries, fs.SameAccountRetryCount[100])
// 第三次:重试已达到 maxSameAccountRetries(2),应切换账号 // 第 maxSameAccountRetries+1 次:重试耗尽,应切换账号
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverContinue, action) require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SwitchCount) require.Equal(t, 1, fs.SwitchCount)
@@ -354,13 +350,14 @@ func TestHandleFailoverError_SameAccountRetry(t *testing.T) {
err := newTestFailoverErr(400, true, false) err := newTestFailoverErr(400, true, false)
// 耗尽账号 100 的重试 // 耗尽账号 100 的重试
fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) for i := 0; i < maxSameAccountRetries; i++ {
fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
// 第三次: 重试耗尽 → 切换 }
// 第 maxSameAccountRetries+1 次: 重试耗尽 → 切换
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverContinue, action) require.Equal(t, FailoverContinue, action)
// 再次遇到账号 100计数仍为 2,条件不满足 → 直接切换 // 再次遇到账号 100计数仍为 maxSameAccountRetries,条件不满足 → 直接切换
action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverContinue, action) require.Equal(t, FailoverContinue, action)
require.Len(t, mock.calls, 2, "第二次耗尽也应调用 TempUnschedule") require.Len(t, mock.calls, 2, "第二次耗尽也应调用 TempUnschedule")
@@ -386,9 +383,10 @@ func TestHandleFailoverError_TempUnschedule(t *testing.T) {
fs := NewFailoverState(3, false) fs := NewFailoverState(3, false)
err := newTestFailoverErr(502, true, false) err := newTestFailoverErr(502, true, false)
// 耗尽重试 for i := 0; i < maxSameAccountRetries; i++ {
fs.HandleFailoverError(context.Background(), mock, 42, "openai", err) fs.HandleFailoverError(context.Background(), mock, 42, "openai", err)
fs.HandleFailoverError(context.Background(), mock, 42, "openai", err) }
// 再次触发时才会执行 TempUnschedule + 切换
fs.HandleFailoverError(context.Background(), mock, 42, "openai", err) fs.HandleFailoverError(context.Background(), mock, 42, "openai", err)
require.Len(t, mock.calls, 1) require.Len(t, mock.calls, 1)
@@ -521,17 +519,16 @@ func TestHandleFailoverError_IntegrationScenario(t *testing.T) {
mock := &mockTempUnscheduler{} mock := &mockTempUnscheduler{}
fs := NewFailoverState(3, true) // hasBoundSession=true fs := NewFailoverState(3, true) // hasBoundSession=true
// 1. 账号 100 遇到可重试错误,同账号重试 2 // 1. 账号 100 遇到可重试错误,同账号重试 maxSameAccountRetries
retryErr := newTestFailoverErr(400, true, false) retryErr := newTestFailoverErr(400, true, false)
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", retryErr) for i := 0; i < maxSameAccountRetries; i++ {
require.Equal(t, FailoverContinue, action) action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", retryErr)
require.Equal(t, FailoverContinue, action)
}
require.True(t, fs.ForceCacheBilling, "hasBoundSession=true 应设置 ForceCacheBilling") require.True(t, fs.ForceCacheBilling, "hasBoundSession=true 应设置 ForceCacheBilling")
action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", retryErr) // 2. 账号 100 超过重试上限 → TempUnschedule + 切换
require.Equal(t, FailoverContinue, action) action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", retryErr)
// 2. 账号 100 重试耗尽 → TempUnschedule + 切换
action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", retryErr)
require.Equal(t, FailoverContinue, action) require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SwitchCount) require.Equal(t, 1, fs.SwitchCount)
require.Len(t, mock.calls, 1) require.Len(t, mock.calls, 1)

View File

@@ -20,6 +20,7 @@ import (
coderws "github.com/coder/websocket" coderws "github.com/coder/websocket"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -212,6 +213,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
maxAccountSwitches := h.maxAccountSwitches maxAccountSwitches := h.maxAccountSwitches
switchCount := 0 switchCount := 0
failedAccountIDs := make(map[int64]struct{}) failedAccountIDs := make(map[int64]struct{})
sameAccountRetryCount := make(map[int64]int)
var lastFailoverErr *service.UpstreamFailoverError var lastFailoverErr *service.UpstreamFailoverError
for { for {
@@ -259,6 +261,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
zap.Float64("load_skew", scheduleDecision.LoadSkew), zap.Float64("load_skew", scheduleDecision.LoadSkew),
) )
account := selection.Account account := selection.Account
sessionHash = ensureOpenAIPoolModeSessionHash(sessionHash, account)
reqLog.Debug("openai.account_selected", zap.Int64("account_id", account.ID), zap.String("account_name", account.Name)) reqLog.Debug("openai.account_selected", zap.Int64("account_id", account.ID), zap.String("account_name", account.Name))
setOpsSelectedAccount(c, account.ID, account.Platform) setOpsSelectedAccount(c, account.ID, account.Platform)
@@ -288,6 +291,25 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
var failoverErr *service.UpstreamFailoverError var failoverErr *service.UpstreamFailoverError
if errors.As(err, &failoverErr) { if errors.As(err, &failoverErr) {
h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, false, nil) h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, false, nil)
// 池模式:同账号重试
if failoverErr.RetryableOnSameAccount {
retryLimit := account.GetPoolModeRetryCount()
if sameAccountRetryCount[account.ID] < retryLimit {
sameAccountRetryCount[account.ID]++
reqLog.Warn("openai.pool_mode_same_account_retry",
zap.Int64("account_id", account.ID),
zap.Int("upstream_status", failoverErr.StatusCode),
zap.Int("retry_limit", retryLimit),
zap.Int("retry_count", sameAccountRetryCount[account.ID]),
)
select {
case <-c.Request.Context().Done():
return
case <-time.After(sameAccountRetryDelay):
}
continue
}
}
h.gatewayService.RecordOpenAIAccountSwitch() h.gatewayService.RecordOpenAIAccountSwitch()
failedAccountIDs[account.ID] = struct{}{} failedAccountIDs[account.ID] = struct{}{}
lastFailoverErr = failoverErr lastFailoverErr = failoverErr
@@ -541,6 +563,7 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) {
maxAccountSwitches := h.maxAccountSwitches maxAccountSwitches := h.maxAccountSwitches
switchCount := 0 switchCount := 0
failedAccountIDs := make(map[int64]struct{}) failedAccountIDs := make(map[int64]struct{})
sameAccountRetryCount := make(map[int64]int)
var lastFailoverErr *service.UpstreamFailoverError var lastFailoverErr *service.UpstreamFailoverError
for { for {
@@ -602,6 +625,7 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) {
return return
} }
account := selection.Account account := selection.Account
sessionHash = ensureOpenAIPoolModeSessionHash(sessionHash, account)
reqLog.Debug("openai_messages.account_selected", zap.Int64("account_id", account.ID), zap.String("account_name", account.Name)) reqLog.Debug("openai_messages.account_selected", zap.Int64("account_id", account.ID), zap.String("account_name", account.Name))
_ = scheduleDecision _ = scheduleDecision
setOpsSelectedAccount(c, account.ID, account.Platform) setOpsSelectedAccount(c, account.ID, account.Platform)
@@ -641,6 +665,25 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) {
var failoverErr *service.UpstreamFailoverError var failoverErr *service.UpstreamFailoverError
if errors.As(err, &failoverErr) { if errors.As(err, &failoverErr) {
h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, false, nil) h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, false, nil)
// 池模式:同账号重试
if failoverErr.RetryableOnSameAccount {
retryLimit := account.GetPoolModeRetryCount()
if sameAccountRetryCount[account.ID] < retryLimit {
sameAccountRetryCount[account.ID]++
reqLog.Warn("openai_messages.pool_mode_same_account_retry",
zap.Int64("account_id", account.ID),
zap.Int("upstream_status", failoverErr.StatusCode),
zap.Int("retry_limit", retryLimit),
zap.Int("retry_count", sameAccountRetryCount[account.ID]),
)
select {
case <-c.Request.Context().Done():
return
case <-time.After(sameAccountRetryDelay):
}
continue
}
}
h.gatewayService.RecordOpenAIAccountSwitch() h.gatewayService.RecordOpenAIAccountSwitch()
failedAccountIDs[account.ID] = struct{}{} failedAccountIDs[account.ID] = struct{}{}
lastFailoverErr = failoverErr lastFailoverErr = failoverErr
@@ -1456,6 +1499,14 @@ func setOpenAIClientTransportWS(c *gin.Context) {
service.SetOpenAIClientTransport(c, service.OpenAIClientTransportWS) service.SetOpenAIClientTransport(c, service.OpenAIClientTransportWS)
} }
// ensureOpenAIPoolModeSessionHash returns sessionHash unchanged unless it is
// empty and account is a non-nil pool-mode account. In that case it returns a
// freshly generated key of the form "openai-pool-retry-<uuid>".
//
// The generated key is intended as a one-off sticky-session key for the
// current request, so that a same-account retry is not load-balanced onto a
// different account.
func ensureOpenAIPoolModeSessionHash(sessionHash string, account *service.Account) string {
	// Evaluation order matters: IsPoolMode is only consulted once we know the
	// caller supplied no hash and the account pointer is usable.
	needsSyntheticKey := sessionHash == "" && account != nil && account.IsPoolMode()
	if !needsSyntheticKey {
		return sessionHash
	}
	return "openai-pool-retry-" + uuid.NewString()
}
func openAIWSIngressFallbackSessionSeed(userID, apiKeyID int64, groupID *int64) string { func openAIWSIngressFallbackSessionSeed(userID, apiKeyID int64, groupID *int64) string {
gid := int64(0) gid := int64(0)
if groupID != nil { if groupID != nil {

View File

@@ -647,6 +647,75 @@ func (a *Account) IsCustomErrorCodesEnabled() bool {
return false return false
} }
// IsPoolMode reports whether this API-key account has pool mode enabled.
//
// It returns true only when the account type is AccountTypeAPIKey and the
// "pool_mode" credential is present as a boolean true. Any other type, a nil
// credentials map, a missing key, or a non-boolean value yields false.
//
// In pool mode, upstream errors are not meant to mark the local account's
// state; the request is retried on the same account instead.
func (a *Account) IsPoolMode() bool {
	if a.Type != AccountTypeAPIKey || a.Credentials == nil {
		return false
	}
	// The comma-ok assertion covers both "key absent" (nil interface) and
	// "value is not a bool" without panicking.
	enabled, isBool := a.Credentials["pool_mode"].(bool)
	return isBool && enabled
}
// Bounds for the pool-mode same-account retry setting.
const (
// defaultPoolModeRetryCount is what GetPoolModeRetryCount returns when the
// account is not in pool mode, or when "pool_mode_retry_count" is missing,
// nil, or cannot be parsed.
defaultPoolModeRetryCount = 3
// maxPoolModeRetryCount is the ceiling that GetPoolModeRetryCount clamps
// larger configured values to.
maxPoolModeRetryCount = 10
)
// GetPoolModeRetryCount returns how many times a request may be retried on
// this same account in pool mode.
//
// The value is read from the "pool_mode_retry_count" credential. A nil
// receiver, an account that is not in pool mode, or a missing/nil setting
// yields defaultPoolModeRetryCount; values that cannot be parsed also fall
// back to that default via parsePoolModeRetryCount. Negative values are
// raised to 0 and values above maxPoolModeRetryCount are lowered to it.
func (a *Account) GetPoolModeRetryCount() int {
	if a == nil || !a.IsPoolMode() {
		return defaultPoolModeRetryCount
	}

	// Indexing a nil map is safe in Go and simply reports the key as absent,
	// so no separate nil check on Credentials is required here.
	configured, found := a.Credentials["pool_mode_retry_count"]
	if !found || configured == nil {
		return defaultPoolModeRetryCount
	}

	switch retries := parsePoolModeRetryCount(configured); {
	case retries < 0:
		return 0
	case retries > maxPoolModeRetryCount:
		return maxPoolModeRetryCount
	default:
		return retries
	}
}
// parsePoolModeRetryCount converts a loosely typed credential value into an
// int retry count. Supported inputs are int, int64, float64, json.Number and
// numeric strings (surrounding whitespace is ignored).
//
// Anything that cannot be read as an integer — an unsupported type, NaN, a
// json.Number or string that is not a base-10 integer — yields
// defaultPoolModeRetryCount.
//
// Numbers outside the range of int saturate to the nearest int bound instead
// of going through a raw conversion. Go leaves the result of converting an
// out-of-range float to an integer implementation-dependent (on amd64 a huge
// positive float becomes the most negative int), which would make an
// oversized setting look negative to the caller. Saturating preserves the
// sign so the caller's range clamp behaves as intended.
func parsePoolModeRetryCount(value any) int {
	// Platform int bounds expressed as int64 so they can be compared against
	// both int64 and float64 inputs without importing math.
	const (
		maxInt = int64(^uint(0) >> 1)
		minInt = -maxInt - 1
	)

	// fromInt64 narrows n to int, saturating when int is 32 bits wide and n
	// does not survive the round trip.
	fromInt64 := func(n int64) int {
		if narrowed := int(n); int64(narrowed) == n {
			return narrowed
		}
		if n < 0 {
			return int(minInt)
		}
		return int(maxInt)
	}

	switch v := value.(type) {
	case int:
		return v
	case int64:
		return fromInt64(v)
	case float64:
		switch {
		case v != v: // NaN is the only value not equal to itself.
			return defaultPoolModeRetryCount
		case v >= float64(maxInt): // also catches +Inf
			return int(maxInt)
		case v <= float64(minInt): // also catches -Inf
			return int(minInt)
		}
		// In range: truncates toward zero, e.g. 2.9 -> 2.
		return int(v)
	case json.Number:
		if i, err := v.Int64(); err == nil {
			return fromInt64(i)
		}
	case string:
		if i, err := strconv.Atoi(strings.TrimSpace(v)); err == nil {
			return i
		}
	}
	return defaultPoolModeRetryCount
}
// isPoolModeRetryableStatus reports whether statusCode is one of the upstream
// HTTP statuses that should trigger a same-account retry in pool mode:
// 401, 403 or 429.
func isPoolModeRetryableStatus(statusCode int) bool {
	return statusCode == 401 || statusCode == 403 || statusCode == 429
}
func (a *Account) GetCustomErrorCodes() []int { func (a *Account) GetCustomErrorCodes() []int {
if a.Credentials == nil { if a.Credentials == nil {
return nil return nil

View File

@@ -0,0 +1,117 @@
//go:build unit
package service
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/require"
)
// TestGetPoolModeRetryCount checks how Account.GetPoolModeRetryCount resolves
// the "pool_mode_retry_count" credential: defaults, the accepted value
// encodings, clamping at both ends, and fallback on unparsable input.
func TestGetPoolModeRetryCount(t *testing.T) {
	// apiKeyAccount wraps the given credentials in an OpenAI API-key account,
	// which is the only fixture shape these cases vary on.
	apiKeyAccount := func(creds map[string]any) *Account {
		return &Account{
			Type:        AccountTypeAPIKey,
			Platform:    PlatformOpenAI,
			Credentials: creds,
		}
	}

	cases := []struct {
		name        string
		credentials map[string]any
		want        int
	}{
		{
			name:        "default_when_not_pool_mode",
			credentials: map[string]any{},
			want:        defaultPoolModeRetryCount,
		},
		{
			name:        "default_when_missing_retry_count",
			credentials: map[string]any{"pool_mode": true},
			want:        defaultPoolModeRetryCount,
		},
		{
			name:        "supports_float64_from_json_credentials",
			credentials: map[string]any{"pool_mode": true, "pool_mode_retry_count": float64(5)},
			want:        5,
		},
		{
			name:        "supports_json_number",
			credentials: map[string]any{"pool_mode": true, "pool_mode_retry_count": json.Number("4")},
			want:        4,
		},
		{
			name:        "supports_string_value",
			credentials: map[string]any{"pool_mode": true, "pool_mode_retry_count": "2"},
			want:        2,
		},
		{
			name:        "negative_value_is_clamped_to_zero",
			credentials: map[string]any{"pool_mode": true, "pool_mode_retry_count": -1},
			want:        0,
		},
		{
			name:        "oversized_value_is_clamped_to_max",
			credentials: map[string]any{"pool_mode": true, "pool_mode_retry_count": 99},
			want:        maxPoolModeRetryCount,
		},
		{
			name:        "invalid_value_falls_back_to_default",
			credentials: map[string]any{"pool_mode": true, "pool_mode_retry_count": "oops"},
			want:        defaultPoolModeRetryCount,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := apiKeyAccount(tc.credentials).GetPoolModeRetryCount()
			require.Equal(t, tc.want, got)
		})
	}
}

View File

@@ -177,6 +177,36 @@ func TestCheckErrorPolicy(t *testing.T) {
body: []byte(`overloaded`), body: []byte(`overloaded`),
expected: ErrorPolicyMatched, // custom codes take precedence expected: ErrorPolicyMatched, // custom codes take precedence
}, },
{
name: "pool_mode_custom_error_codes_hit_returns_matched",
account: &Account{
ID: 7,
Type: AccountTypeAPIKey,
Platform: PlatformOpenAI,
Credentials: map[string]any{
"pool_mode": true,
"custom_error_codes_enabled": true,
"custom_error_codes": []any{float64(401), float64(403)},
},
},
statusCode: 401,
body: []byte(`unauthorized`),
expected: ErrorPolicyMatched,
},
{
name: "pool_mode_without_custom_error_codes_returns_skipped",
account: &Account{
ID: 8,
Type: AccountTypeAPIKey,
Platform: PlatformOpenAI,
Credentials: map[string]any{
"pool_mode": true,
},
},
statusCode: 401,
body: []byte(`unauthorized`),
expected: ErrorPolicySkipped,
},
} }
for _, tt := range tests { for _, tt := range tests {
@@ -190,6 +220,48 @@ func TestCheckErrorPolicy(t *testing.T) {
} }
} }
func TestHandleUpstreamError_PoolModeCustomErrorCodesOverride(t *testing.T) {
t.Run("pool_mode_without_custom_error_codes_still_skips", func(t *testing.T) {
repo := &errorPolicyRepoStub{}
svc := NewRateLimitService(repo, nil, &config.Config{}, nil, nil)
account := &Account{
ID: 30,
Type: AccountTypeAPIKey,
Platform: PlatformOpenAI,
Credentials: map[string]any{
"pool_mode": true,
},
}
shouldDisable := svc.HandleUpstreamError(context.Background(), account, 401, http.Header{}, []byte("unauthorized"))
require.False(t, shouldDisable)
require.Equal(t, 0, repo.setErrCalls)
require.Equal(t, 0, repo.tempCalls)
})
t.Run("pool_mode_with_custom_error_codes_uses_local_error_policy", func(t *testing.T) {
repo := &errorPolicyRepoStub{}
svc := NewRateLimitService(repo, nil, &config.Config{}, nil, nil)
account := &Account{
ID: 31,
Type: AccountTypeAPIKey,
Platform: PlatformOpenAI,
Credentials: map[string]any{
"pool_mode": true,
"custom_error_codes_enabled": true,
"custom_error_codes": []any{float64(401)},
},
}
shouldDisable := svc.HandleUpstreamError(context.Background(), account, 401, http.Header{}, []byte("unauthorized"))
require.True(t, shouldDisable)
require.Equal(t, 1, repo.setErrCalls)
require.Equal(t, 0, repo.tempCalls)
})
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// TestApplyErrorPolicy — 4 table-driven cases for the wrapper method // TestApplyErrorPolicy — 4 table-driven cases for the wrapper method
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@@ -4319,7 +4319,11 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
return "" return ""
}(), }(),
}) })
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody} return nil, &UpstreamFailoverError{
StatusCode: resp.StatusCode,
ResponseBody: respBody,
RetryableOnSameAccount: account.IsPoolMode() && isPoolModeRetryableStatus(resp.StatusCode),
}
} }
return s.handleRetryExhaustedError(ctx, resp, c, account) return s.handleRetryExhaustedError(ctx, resp, c, account)
} }
@@ -4349,7 +4353,11 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
return "" return ""
}(), }(),
}) })
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody} return nil, &UpstreamFailoverError{
StatusCode: resp.StatusCode,
ResponseBody: respBody,
RetryableOnSameAccount: account.IsPoolMode() && isPoolModeRetryableStatus(resp.StatusCode),
}
} }
if resp.StatusCode >= 400 { if resp.StatusCode >= 400 {
// 可选:对部分 400 触发 failover默认关闭以保持语义 // 可选:对部分 400 触发 failover默认关闭以保持语义
@@ -4584,7 +4592,11 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthrough(
return "" return ""
}(), }(),
}) })
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody} return nil, &UpstreamFailoverError{
StatusCode: resp.StatusCode,
ResponseBody: respBody,
RetryableOnSameAccount: account.IsPoolMode() && isPoolModeRetryableStatus(resp.StatusCode),
}
} }
return s.handleRetryExhaustedError(ctx, resp, c, account) return s.handleRetryExhaustedError(ctx, resp, c, account)
} }
@@ -4614,7 +4626,11 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthrough(
return "" return ""
}(), }(),
}) })
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody} return nil, &UpstreamFailoverError{
StatusCode: resp.StatusCode,
ResponseBody: respBody,
RetryableOnSameAccount: account.IsPoolMode() && isPoolModeRetryableStatus(resp.StatusCode),
}
} }
if resp.StatusCode >= 400 { if resp.StatusCode >= 400 {

View File

@@ -2040,7 +2040,11 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
}) })
s.handleFailoverSideEffects(ctx, resp, account) s.handleFailoverSideEffects(ctx, resp, account)
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody} return nil, &UpstreamFailoverError{
StatusCode: resp.StatusCode,
ResponseBody: respBody,
RetryableOnSameAccount: account.IsPoolMode() && isPoolModeRetryableStatus(resp.StatusCode),
}
} }
return s.handleErrorResponse(ctx, resp, c, account, body) return s.handleErrorResponse(ctx, resp, c, account, body)
} }
@@ -2853,7 +2857,11 @@ func (s *OpenAIGatewayService) handleErrorResponse(
Detail: upstreamDetail, Detail: upstreamDetail,
}) })
if shouldDisable { if shouldDisable {
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: body} return nil, &UpstreamFailoverError{
StatusCode: resp.StatusCode,
ResponseBody: body,
RetryableOnSameAccount: account.IsPoolMode() && isPoolModeRetryableStatus(resp.StatusCode),
}
} }
// Return appropriate error response // Return appropriate error response

View File

@@ -98,6 +98,9 @@ func (s *RateLimitService) CheckErrorPolicy(ctx context.Context, account *Accoun
slog.Info("account_error_code_skipped", "account_id", account.ID, "status_code", statusCode) slog.Info("account_error_code_skipped", "account_id", account.ID, "status_code", statusCode)
return ErrorPolicySkipped return ErrorPolicySkipped
} }
if account.IsPoolMode() {
return ErrorPolicySkipped
}
if s.tryTempUnschedulable(ctx, account, statusCode, responseBody) { if s.tryTempUnschedulable(ctx, account, statusCode, responseBody) {
return ErrorPolicyTempUnscheduled return ErrorPolicyTempUnscheduled
} }
@@ -107,9 +110,16 @@ func (s *RateLimitService) CheckErrorPolicy(ctx context.Context, account *Accoun
// HandleUpstreamError 处理上游错误响应,标记账号状态 // HandleUpstreamError 处理上游错误响应,标记账号状态
// 返回是否应该停止该账号的调度 // 返回是否应该停止该账号的调度
func (s *RateLimitService) HandleUpstreamError(ctx context.Context, account *Account, statusCode int, headers http.Header, responseBody []byte) (shouldDisable bool) { func (s *RateLimitService) HandleUpstreamError(ctx context.Context, account *Account, statusCode int, headers http.Header, responseBody []byte) (shouldDisable bool) {
customErrorCodesEnabled := account.IsCustomErrorCodesEnabled()
// 池模式默认不标记本地账号状态;仅当用户显式配置自定义错误码时按本地策略处理。
if account.IsPoolMode() && !customErrorCodesEnabled {
slog.Info("pool_mode_error_skipped", "account_id", account.ID, "status_code", statusCode)
return false
}
// apikey 类型账号:检查自定义错误码配置 // apikey 类型账号:检查自定义错误码配置
// 如果启用且错误码不在列表中,则不处理(不停止调度、不标记限流/过载) // 如果启用且错误码不在列表中,则不处理(不停止调度、不标记限流/过载)
customErrorCodesEnabled := account.IsCustomErrorCodesEnabled()
if !account.ShouldHandleErrorCode(statusCode) { if !account.ShouldHandleErrorCode(statusCode) {
slog.Info("account_error_code_skipped", "account_id", account.ID, "status_code", statusCode) slog.Info("account_error_code_skipped", "account_id", account.ID, "status_code", statusCode)
return false return false

View File

@@ -1127,6 +1127,58 @@
</template> </template>
</div> </div>
<!-- Pool Mode Section -->
<div class="border-t border-gray-200 pt-4 dark:border-dark-600">
<div class="mb-3 flex items-center justify-between">
<div>
<label class="input-label mb-0">{{ t('admin.accounts.poolMode') }}</label>
<p class="mt-1 text-xs text-gray-500 dark:text-gray-400">
{{ t('admin.accounts.poolModeHint') }}
</p>
</div>
<button
type="button"
@click="poolModeEnabled = !poolModeEnabled"
:class="[
'relative inline-flex h-6 w-11 flex-shrink-0 cursor-pointer rounded-full border-2 border-transparent transition-colors duration-200 ease-in-out focus:outline-none focus:ring-2 focus:ring-primary-500 focus:ring-offset-2',
poolModeEnabled ? 'bg-primary-600' : 'bg-gray-200 dark:bg-dark-600'
]"
>
<span
:class="[
'pointer-events-none inline-block h-5 w-5 transform rounded-full bg-white shadow ring-0 transition duration-200 ease-in-out',
poolModeEnabled ? 'translate-x-5' : 'translate-x-0'
]"
/>
</button>
</div>
<div v-if="poolModeEnabled" class="rounded-lg bg-blue-50 p-3 dark:bg-blue-900/20">
<p class="text-xs text-blue-700 dark:text-blue-400">
<Icon name="exclamationCircle" size="sm" class="mr-1 inline" :stroke-width="2" />
{{ t('admin.accounts.poolModeInfo') }}
</p>
</div>
<div v-if="poolModeEnabled" class="mt-3">
<label class="input-label">{{ t('admin.accounts.poolModeRetryCount') }}</label>
<input
v-model.number="poolModeRetryCount"
type="number"
min="0"
:max="MAX_POOL_MODE_RETRY_COUNT"
step="1"
class="input"
/>
<p class="mt-1 text-xs text-gray-500 dark:text-gray-400">
{{
t('admin.accounts.poolModeRetryCountHint', {
default: DEFAULT_POOL_MODE_RETRY_COUNT,
max: MAX_POOL_MODE_RETRY_COUNT
})
}}
</p>
</div>
</div>
<!-- Custom Error Codes Section --> <!-- Custom Error Codes Section -->
<div class="border-t border-gray-200 pt-4 dark:border-dark-600"> <div class="border-t border-gray-200 pt-4 dark:border-dark-600">
<div class="mb-3 flex items-center justify-between"> <div class="mb-3 flex items-center justify-between">
@@ -2629,6 +2681,10 @@ const editQuotaWeeklyLimit = ref<number | null>(null)
const modelMappings = ref<ModelMapping[]>([]) const modelMappings = ref<ModelMapping[]>([])
const modelRestrictionMode = ref<'whitelist' | 'mapping'>('whitelist') const modelRestrictionMode = ref<'whitelist' | 'mapping'>('whitelist')
const allowedModels = ref<string[]>([]) const allowedModels = ref<string[]>([])
const DEFAULT_POOL_MODE_RETRY_COUNT = 3
const MAX_POOL_MODE_RETRY_COUNT = 10
const poolModeEnabled = ref(false)
const poolModeRetryCount = ref(DEFAULT_POOL_MODE_RETRY_COUNT)
const customErrorCodesEnabled = ref(false) const customErrorCodesEnabled = ref(false)
const selectedErrorCodes = ref<number[]>([]) const selectedErrorCodes = ref<number[]>([])
const customErrorCodeInput = ref<number | null>(null) const customErrorCodeInput = ref<number | null>(null)
@@ -3300,6 +3356,8 @@ const resetForm = () => {
fetchAntigravityDefaultMappings().then(mappings => { fetchAntigravityDefaultMappings().then(mappings => {
antigravityModelMappings.value = [...mappings] antigravityModelMappings.value = [...mappings]
}) })
poolModeEnabled.value = false
poolModeRetryCount.value = DEFAULT_POOL_MODE_RETRY_COUNT
customErrorCodesEnabled.value = false customErrorCodesEnabled.value = false
selectedErrorCodes.value = [] selectedErrorCodes.value = []
customErrorCodeInput.value = null customErrorCodeInput.value = null
@@ -3452,6 +3510,20 @@ const handleMixedChannelCancel = () => {
clearMixedChannelDialog() clearMixedChannelDialog()
} }
// Coerce the retry-count input into an integer within
// [0, MAX_POOL_MODE_RETRY_COUNT]. Non-finite input (NaN, ±Infinity, or a
// non-number such as an empty field) becomes DEFAULT_POOL_MODE_RETRY_COUNT;
// fractional input is truncated toward zero before clamping.
const normalizePoolModeRetryCount = (value: number) => {
  if (Number.isFinite(value)) {
    const truncated = Math.trunc(value)
    if (truncated < 0) {
      return 0
    }
    return truncated > MAX_POOL_MODE_RETRY_COUNT ? MAX_POOL_MODE_RETRY_COUNT : truncated
  }
  return DEFAULT_POOL_MODE_RETRY_COUNT
}
const handleSubmit = async () => { const handleSubmit = async () => {
// For OAuth-based type, handle OAuth flow (goes to step 2) // For OAuth-based type, handle OAuth flow (goes to step 2)
if (isOAuthFlow.value) { if (isOAuthFlow.value) {
@@ -3551,6 +3623,12 @@ const handleSubmit = async () => {
} }
} }
// Add pool mode if enabled
if (poolModeEnabled.value) {
credentials.pool_mode = true
credentials.pool_mode_retry_count = normalizePoolModeRetryCount(poolModeRetryCount.value)
}
// Add custom error codes if enabled // Add custom error codes if enabled
if (customErrorCodesEnabled.value) { if (customErrorCodesEnabled.value) {
credentials.custom_error_codes_enabled = true credentials.custom_error_codes_enabled = true

View File

@@ -251,6 +251,58 @@
</template> </template>
</div> </div>
<!-- Pool Mode Section -->
<div class="border-t border-gray-200 pt-4 dark:border-dark-600">
<div class="mb-3 flex items-center justify-between">
<div>
<label class="input-label mb-0">{{ t('admin.accounts.poolMode') }}</label>
<p class="mt-1 text-xs text-gray-500 dark:text-gray-400">
{{ t('admin.accounts.poolModeHint') }}
</p>
</div>
<button
type="button"
@click="poolModeEnabled = !poolModeEnabled"
:class="[
'relative inline-flex h-6 w-11 flex-shrink-0 cursor-pointer rounded-full border-2 border-transparent transition-colors duration-200 ease-in-out focus:outline-none focus:ring-2 focus:ring-primary-500 focus:ring-offset-2',
poolModeEnabled ? 'bg-primary-600' : 'bg-gray-200 dark:bg-dark-600'
]"
>
<span
:class="[
'pointer-events-none inline-block h-5 w-5 transform rounded-full bg-white shadow ring-0 transition duration-200 ease-in-out',
poolModeEnabled ? 'translate-x-5' : 'translate-x-0'
]"
/>
</button>
</div>
<div v-if="poolModeEnabled" class="rounded-lg bg-blue-50 p-3 dark:bg-blue-900/20">
<p class="text-xs text-blue-700 dark:text-blue-400">
<Icon name="exclamationCircle" size="sm" class="mr-1 inline" :stroke-width="2" />
{{ t('admin.accounts.poolModeInfo') }}
</p>
</div>
<div v-if="poolModeEnabled" class="mt-3">
<label class="input-label">{{ t('admin.accounts.poolModeRetryCount') }}</label>
<input
v-model.number="poolModeRetryCount"
type="number"
min="0"
:max="MAX_POOL_MODE_RETRY_COUNT"
step="1"
class="input"
/>
<p class="mt-1 text-xs text-gray-500 dark:text-gray-400">
{{
t('admin.accounts.poolModeRetryCountHint', {
default: DEFAULT_POOL_MODE_RETRY_COUNT,
max: MAX_POOL_MODE_RETRY_COUNT
})
}}
</p>
</div>
</div>
<!-- Custom Error Codes Section --> <!-- Custom Error Codes Section -->
<div class="border-t border-gray-200 pt-4 dark:border-dark-600"> <div class="border-t border-gray-200 pt-4 dark:border-dark-600">
<div class="mb-3 flex items-center justify-between"> <div class="mb-3 flex items-center justify-between">
@@ -1498,6 +1550,10 @@ const editApiKey = ref('')
const modelMappings = ref<ModelMapping[]>([]) const modelMappings = ref<ModelMapping[]>([])
const modelRestrictionMode = ref<'whitelist' | 'mapping'>('whitelist') const modelRestrictionMode = ref<'whitelist' | 'mapping'>('whitelist')
const allowedModels = ref<string[]>([]) const allowedModels = ref<string[]>([])
const DEFAULT_POOL_MODE_RETRY_COUNT = 3
const MAX_POOL_MODE_RETRY_COUNT = 10
const poolModeEnabled = ref(false)
const poolModeRetryCount = ref(DEFAULT_POOL_MODE_RETRY_COUNT)
const customErrorCodesEnabled = ref(false) const customErrorCodesEnabled = ref(false)
const selectedErrorCodes = ref<number[]>([]) const selectedErrorCodes = ref<number[]>([])
const customErrorCodeInput = ref<number | null>(null) const customErrorCodeInput = ref<number | null>(null)
@@ -1658,6 +1714,20 @@ const expiresAtInput = computed({
}) })
// Watchers // Watchers
// Coerce a stored or user-entered retry count into an integer within
// [0, MAX_POOL_MODE_RETRY_COUNT]. Non-finite input (NaN, ±Infinity, or a
// non-number) becomes DEFAULT_POOL_MODE_RETRY_COUNT; fractional input is
// truncated toward zero before clamping.
const normalizePoolModeRetryCount = (value: number) => {
  if (Number.isFinite(value)) {
    const truncated = Math.trunc(value)
    if (truncated < 0) {
      return 0
    }
    return truncated > MAX_POOL_MODE_RETRY_COUNT ? MAX_POOL_MODE_RETRY_COUNT : truncated
  }
  return DEFAULT_POOL_MODE_RETRY_COUNT
}
watch( watch(
() => props.account, () => props.account,
(newAccount) => { (newAccount) => {
@@ -1805,6 +1875,12 @@ watch(
allowedModels.value = [] allowedModels.value = []
} }
// Load pool mode
poolModeEnabled.value = credentials.pool_mode === true
poolModeRetryCount.value = normalizePoolModeRetryCount(
Number(credentials.pool_mode_retry_count ?? DEFAULT_POOL_MODE_RETRY_COUNT)
)
// Load custom error codes // Load custom error codes
customErrorCodesEnabled.value = credentials.custom_error_codes_enabled === true customErrorCodesEnabled.value = credentials.custom_error_codes_enabled === true
const existingErrorCodes = credentials.custom_error_codes as number[] | undefined const existingErrorCodes = credentials.custom_error_codes as number[] | undefined
@@ -1851,6 +1927,8 @@ watch(
modelMappings.value = [] modelMappings.value = []
allowedModels.value = [] allowedModels.value = []
} }
poolModeEnabled.value = false
poolModeRetryCount.value = DEFAULT_POOL_MODE_RETRY_COUNT
customErrorCodesEnabled.value = false customErrorCodesEnabled.value = false
selectedErrorCodes.value = [] selectedErrorCodes.value = []
} }
@@ -2311,6 +2389,15 @@ const handleSubmit = async () => {
newCredentials.model_mapping = currentCredentials.model_mapping newCredentials.model_mapping = currentCredentials.model_mapping
} }
// Add pool mode if enabled
if (poolModeEnabled.value) {
newCredentials.pool_mode = true
newCredentials.pool_mode_retry_count = normalizePoolModeRetryCount(poolModeRetryCount.value)
} else {
delete newCredentials.pool_mode
delete newCredentials.pool_mode_retry_count
}
// Add custom error codes if enabled // Add custom error codes if enabled
if (customErrorCodesEnabled.value) { if (customErrorCodesEnabled.value) {
newCredentials.custom_error_codes_enabled = true newCredentials.custom_error_codes_enabled = true

View File

@@ -1942,6 +1942,13 @@ export default {
addModel: 'Add', addModel: 'Add',
modelExists: 'Model already exists', modelExists: 'Model already exists',
modelCount: '{count} models', modelCount: '{count} models',
poolMode: 'Pool Mode',
poolModeHint: 'Enable when upstream is an account pool; errors won\'t mark local account status',
poolModeInfo:
'When enabled, upstream 429/403/401 errors will auto-retry without marking the account as rate-limited or errored. Suitable for upstream pointing to another sub2api instance.',
poolModeRetryCount: 'Same-Account Retries',
poolModeRetryCountHint:
'Only applies in pool mode. Use 0 to disable in-place retry. Default {default}, maximum {max}.',
customErrorCodes: 'Custom Error Codes', customErrorCodes: 'Custom Error Codes',
customErrorCodesHint: 'Only stop scheduling for selected error codes', customErrorCodesHint: 'Only stop scheduling for selected error codes',
customErrorCodesWarning: customErrorCodesWarning:

View File

@@ -2086,6 +2086,12 @@ export default {
addModel: '填入', addModel: '填入',
modelExists: '该模型已存在', modelExists: '该模型已存在',
modelCount: '{count} 个模型', modelCount: '{count} 个模型',
poolMode: '池模式',
poolModeHint: '上游为账号池时启用,错误不标记本地账号状态',
poolModeInfo:
'启用后,上游 429/403/401 错误将自动重试而不标记账号限流或错误,适用于上游指向另一个 sub2api 实例的场景。',
poolModeRetryCount: '同账号重试次数',
poolModeRetryCountHint: '仅在池模式下生效。0 表示不原地重试;默认 {default},最大 {max}。',
customErrorCodes: '自定义错误码', customErrorCodes: '自定义错误码',
customErrorCodesHint: '仅对选中的错误码停止调度', customErrorCodesHint: '仅对选中的错误码停止调度',
customErrorCodesWarning: '仅选中的错误码会停止调度,其他错误将返回 500。', customErrorCodesWarning: '仅选中的错误码会停止调度,其他错误将返回 500。',