mirror of
https://github.com/Wei-Shaw/sub2api.git
synced 2026-03-30 04:44:49 +00:00
Merge pull request #938 from xvhuan/fix/account-extra-scheduler-pressure-20260311
精准收紧 accounts.extra 观测字段触发的调度重建
This commit is contained in:
@@ -51,6 +51,18 @@ type accountRepository struct {
|
|||||||
schedulerCache service.SchedulerCache
|
schedulerCache service.SchedulerCache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var schedulerNeutralExtraKeyPrefixes = []string{
|
||||||
|
"codex_primary_",
|
||||||
|
"codex_secondary_",
|
||||||
|
"codex_5h_",
|
||||||
|
"codex_7d_",
|
||||||
|
}
|
||||||
|
|
||||||
|
var schedulerNeutralExtraKeys = map[string]struct{}{
|
||||||
|
"codex_usage_updated_at": {},
|
||||||
|
"session_window_utilization": {},
|
||||||
|
}
|
||||||
|
|
||||||
// NewAccountRepository 创建账户仓储实例。
|
// NewAccountRepository 创建账户仓储实例。
|
||||||
// 这是对外暴露的构造函数,返回接口类型以便于依赖注入。
|
// 这是对外暴露的构造函数,返回接口类型以便于依赖注入。
|
||||||
func NewAccountRepository(client *dbent.Client, sqlDB *sql.DB, schedulerCache service.SchedulerCache) service.AccountRepository {
|
func NewAccountRepository(client *dbent.Client, sqlDB *sql.DB, schedulerCache service.SchedulerCache) service.AccountRepository {
|
||||||
@@ -1190,8 +1202,10 @@ func (r *accountRepository) UpdateExtra(ctx context.Context, id int64, updates m
|
|||||||
if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil {
|
if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil {
|
||||||
logger.LegacyPrintf("repository.account", "[SchedulerOutbox] enqueue extra update failed: account=%d err=%v", id, err)
|
logger.LegacyPrintf("repository.account", "[SchedulerOutbox] enqueue extra update failed: account=%d err=%v", id, err)
|
||||||
}
|
}
|
||||||
} else if shouldSyncSchedulerSnapshotForExtraUpdates(updates) {
|
} else {
|
||||||
// codex 限流快照仍需要让调度缓存尽快看见,避免 DB 抖动时丢失自愈链路。
|
// 观测型 extra 字段不需要触发 bucket 重建,但仍同步单账号快照,
|
||||||
|
// 让 sticky session / GetAccount 命中缓存时也能读到最新数据,
|
||||||
|
// 同时避免缓存局部 patch 覆盖掉并发写入的其它账号字段。
|
||||||
r.syncSchedulerAccountSnapshot(ctx, id)
|
r.syncSchedulerAccountSnapshot(ctx, id)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -1202,7 +1216,7 @@ func shouldEnqueueSchedulerOutboxForExtraUpdates(updates map[string]any) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
for key := range updates {
|
for key := range updates {
|
||||||
if isSchedulerNeutralAccountExtraKey(key) {
|
if isSchedulerNeutralExtraKey(key) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
@@ -1210,91 +1224,20 @@ func shouldEnqueueSchedulerOutboxForExtraUpdates(updates map[string]any) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func shouldSyncSchedulerSnapshotForExtraUpdates(updates map[string]any) bool {
|
func isSchedulerNeutralExtraKey(key string) bool {
|
||||||
return codexExtraIndicatesRateLimit(updates, "7d") || codexExtraIndicatesRateLimit(updates, "5h")
|
|
||||||
}
|
|
||||||
|
|
||||||
func isSchedulerNeutralAccountExtraKey(key string) bool {
|
|
||||||
key = strings.TrimSpace(key)
|
key = strings.TrimSpace(key)
|
||||||
if key == "" {
|
if key == "" {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if key == "session_window_utilization" {
|
if _, ok := schedulerNeutralExtraKeys[key]; ok {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return strings.HasPrefix(key, "codex_")
|
for _, prefix := range schedulerNeutralExtraKeyPrefixes {
|
||||||
}
|
if strings.HasPrefix(key, prefix) {
|
||||||
|
return true
|
||||||
func codexExtraIndicatesRateLimit(updates map[string]any, window string) bool {
|
}
|
||||||
if len(updates) == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
usedValue, ok := updates["codex_"+window+"_used_percent"]
|
|
||||||
if !ok || !extraValueIndicatesExhausted(usedValue) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return extraValueHasResetMarker(updates["codex_"+window+"_reset_at"]) ||
|
|
||||||
extraValueHasPositiveNumber(updates["codex_"+window+"_reset_after_seconds"])
|
|
||||||
}
|
|
||||||
|
|
||||||
func extraValueIndicatesExhausted(value any) bool {
|
|
||||||
number, ok := extraValueToFloat64(value)
|
|
||||||
return ok && number >= 100-1e-9
|
|
||||||
}
|
|
||||||
|
|
||||||
func extraValueHasPositiveNumber(value any) bool {
|
|
||||||
number, ok := extraValueToFloat64(value)
|
|
||||||
return ok && number > 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func extraValueHasResetMarker(value any) bool {
|
|
||||||
switch v := value.(type) {
|
|
||||||
case string:
|
|
||||||
return strings.TrimSpace(v) != ""
|
|
||||||
case time.Time:
|
|
||||||
return !v.IsZero()
|
|
||||||
case *time.Time:
|
|
||||||
return v != nil && !v.IsZero()
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func extraValueToFloat64(value any) (float64, bool) {
|
|
||||||
switch v := value.(type) {
|
|
||||||
case float64:
|
|
||||||
return v, true
|
|
||||||
case float32:
|
|
||||||
return float64(v), true
|
|
||||||
case int:
|
|
||||||
return float64(v), true
|
|
||||||
case int8:
|
|
||||||
return float64(v), true
|
|
||||||
case int16:
|
|
||||||
return float64(v), true
|
|
||||||
case int32:
|
|
||||||
return float64(v), true
|
|
||||||
case int64:
|
|
||||||
return float64(v), true
|
|
||||||
case uint:
|
|
||||||
return float64(v), true
|
|
||||||
case uint8:
|
|
||||||
return float64(v), true
|
|
||||||
case uint16:
|
|
||||||
return float64(v), true
|
|
||||||
case uint32:
|
|
||||||
return float64(v), true
|
|
||||||
case uint64:
|
|
||||||
return float64(v), true
|
|
||||||
case json.Number:
|
|
||||||
parsed, err := v.Float64()
|
|
||||||
return parsed, err == nil
|
|
||||||
case string:
|
|
||||||
parsed, err := strconv.ParseFloat(strings.TrimSpace(v), 64)
|
|
||||||
return parsed, err == nil
|
|
||||||
default:
|
|
||||||
return 0, false
|
|
||||||
}
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *accountRepository) BulkUpdate(ctx context.Context, ids []int64, updates service.AccountBulkUpdate) (int64, error) {
|
func (r *accountRepository) BulkUpdate(ctx context.Context, ids []int64, updates service.AccountBulkUpdate) (int64, error) {
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ type AccountRepoSuite struct {
|
|||||||
|
|
||||||
type schedulerCacheRecorder struct {
|
type schedulerCacheRecorder struct {
|
||||||
setAccounts []*service.Account
|
setAccounts []*service.Account
|
||||||
|
accounts map[int64]*service.Account
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *schedulerCacheRecorder) GetSnapshot(ctx context.Context, bucket service.SchedulerBucket) ([]*service.Account, bool, error) {
|
func (s *schedulerCacheRecorder) GetSnapshot(ctx context.Context, bucket service.SchedulerBucket) ([]*service.Account, bool, error) {
|
||||||
@@ -34,11 +35,20 @@ func (s *schedulerCacheRecorder) SetSnapshot(ctx context.Context, bucket service
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *schedulerCacheRecorder) GetAccount(ctx context.Context, accountID int64) (*service.Account, error) {
|
func (s *schedulerCacheRecorder) GetAccount(ctx context.Context, accountID int64) (*service.Account, error) {
|
||||||
return nil, nil
|
if s.accounts == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return s.accounts[accountID], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *schedulerCacheRecorder) SetAccount(ctx context.Context, account *service.Account) error {
|
func (s *schedulerCacheRecorder) SetAccount(ctx context.Context, account *service.Account) error {
|
||||||
s.setAccounts = append(s.setAccounts, account)
|
s.setAccounts = append(s.setAccounts, account)
|
||||||
|
if s.accounts == nil {
|
||||||
|
s.accounts = make(map[int64]*service.Account)
|
||||||
|
}
|
||||||
|
if account != nil {
|
||||||
|
s.accounts[account.ID] = account
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -623,21 +633,46 @@ func (s *AccountRepoSuite) TestUpdateExtra_NilExtra() {
|
|||||||
s.Require().Equal("val", got.Extra["key"])
|
s.Require().Equal("val", got.Extra["key"])
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AccountRepoSuite) TestUpdateExtra_SchedulerNeutralKeysSkipOutbox() {
|
func (s *AccountRepoSuite) TestUpdateExtra_SchedulerNeutralSkipsOutboxAndSyncsFreshSnapshot() {
|
||||||
account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-extra-neutral", Extra: map[string]any{}})
|
account := mustCreateAccount(s.T(), s.client, &service.Account{
|
||||||
_, err := s.repo.sql.ExecContext(s.ctx, "TRUNCATE scheduler_outbox")
|
Name: "acc-extra-neutral",
|
||||||
s.Require().NoError(err)
|
Platform: service.PlatformOpenAI,
|
||||||
|
Extra: map[string]any{"codex_usage_updated_at": "old"},
|
||||||
|
})
|
||||||
|
cacheRecorder := &schedulerCacheRecorder{
|
||||||
|
accounts: map[int64]*service.Account{
|
||||||
|
account.ID: {
|
||||||
|
ID: account.ID,
|
||||||
|
Platform: account.Platform,
|
||||||
|
Status: service.StatusDisabled,
|
||||||
|
Extra: map[string]any{
|
||||||
|
"codex_usage_updated_at": "old",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
s.repo.schedulerCache = cacheRecorder
|
||||||
|
|
||||||
s.Require().NoError(s.repo.UpdateExtra(s.ctx, account.ID, map[string]any{
|
updates := map[string]any{
|
||||||
"codex_usage_updated_at": "2026-03-11T13:00:00Z",
|
"codex_usage_updated_at": "2026-03-11T10:00:00Z",
|
||||||
"codex_5h_used_percent": 12.5,
|
"codex_5h_used_percent": 88.5,
|
||||||
"session_window_utilization": 0.42,
|
"session_window_utilization": 0.42,
|
||||||
}))
|
}
|
||||||
|
s.Require().NoError(s.repo.UpdateExtra(s.ctx, account.ID, updates))
|
||||||
|
|
||||||
var count int
|
got, err := s.repo.GetByID(s.ctx, account.ID)
|
||||||
err = scanSingleRow(s.ctx, s.repo.sql, "SELECT COUNT(*) FROM scheduler_outbox", nil, &count)
|
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
s.Require().Equal(0, count)
|
s.Require().Equal("2026-03-11T10:00:00Z", got.Extra["codex_usage_updated_at"])
|
||||||
|
s.Require().Equal(88.5, got.Extra["codex_5h_used_percent"])
|
||||||
|
s.Require().Equal(0.42, got.Extra["session_window_utilization"])
|
||||||
|
|
||||||
|
var outboxCount int
|
||||||
|
s.Require().NoError(scanSingleRow(s.ctx, s.repo.sql, "SELECT COUNT(*) FROM scheduler_outbox", nil, &outboxCount))
|
||||||
|
s.Require().Zero(outboxCount)
|
||||||
|
s.Require().Len(cacheRecorder.setAccounts, 1)
|
||||||
|
s.Require().NotNil(cacheRecorder.accounts[account.ID])
|
||||||
|
s.Require().Equal(service.StatusActive, cacheRecorder.accounts[account.ID].Status)
|
||||||
|
s.Require().Equal("2026-03-11T10:00:00Z", cacheRecorder.accounts[account.ID].Extra["codex_usage_updated_at"])
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AccountRepoSuite) TestUpdateExtra_ExhaustedCodexSnapshotSyncsSchedulerCache() {
|
func (s *AccountRepoSuite) TestUpdateExtra_ExhaustedCodexSnapshotSyncsSchedulerCache() {
|
||||||
@@ -664,16 +699,22 @@ func (s *AccountRepoSuite) TestUpdateExtra_ExhaustedCodexSnapshotSyncsSchedulerC
|
|||||||
s.Require().Equal(0, count)
|
s.Require().Equal(0, count)
|
||||||
s.Require().Len(cacheRecorder.setAccounts, 1)
|
s.Require().Len(cacheRecorder.setAccounts, 1)
|
||||||
s.Require().Equal(account.ID, cacheRecorder.setAccounts[0].ID)
|
s.Require().Equal(account.ID, cacheRecorder.setAccounts[0].ID)
|
||||||
|
s.Require().Equal(service.StatusActive, cacheRecorder.setAccounts[0].Status)
|
||||||
s.Require().Equal(100.0, cacheRecorder.setAccounts[0].Extra["codex_7d_used_percent"])
|
s.Require().Equal(100.0, cacheRecorder.setAccounts[0].Extra["codex_7d_used_percent"])
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AccountRepoSuite) TestUpdateExtra_CustomKeysStillEnqueueOutbox() {
|
func (s *AccountRepoSuite) TestUpdateExtra_SchedulerRelevantStillEnqueuesOutbox() {
|
||||||
account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-extra-custom", Extra: map[string]any{}})
|
account := mustCreateAccount(s.T(), s.client, &service.Account{
|
||||||
|
Name: "acc-extra-mixed",
|
||||||
|
Platform: service.PlatformAntigravity,
|
||||||
|
Extra: map[string]any{},
|
||||||
|
})
|
||||||
_, err := s.repo.sql.ExecContext(s.ctx, "TRUNCATE scheduler_outbox")
|
_, err := s.repo.sql.ExecContext(s.ctx, "TRUNCATE scheduler_outbox")
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
s.Require().NoError(s.repo.UpdateExtra(s.ctx, account.ID, map[string]any{
|
s.Require().NoError(s.repo.UpdateExtra(s.ctx, account.ID, map[string]any{
|
||||||
"custom_scheduler_sensitive_key": true,
|
"mixed_scheduling": true,
|
||||||
|
"codex_usage_updated_at": "2026-03-11T10:00:00Z",
|
||||||
}))
|
}))
|
||||||
|
|
||||||
var count int
|
var count int
|
||||||
|
|||||||
Reference in New Issue
Block a user