mirror of
https://github.com/Wei-Shaw/sub2api.git
synced 2026-04-19 07:27:28 +00:00
Fix Codex exhausted snapshot propagation
This commit is contained in:
@@ -1190,6 +1190,9 @@ func (r *accountRepository) UpdateExtra(ctx context.Context, id int64, updates m
|
|||||||
if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil {
|
if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil {
|
||||||
logger.LegacyPrintf("repository.account", "[SchedulerOutbox] enqueue extra update failed: account=%d err=%v", id, err)
|
logger.LegacyPrintf("repository.account", "[SchedulerOutbox] enqueue extra update failed: account=%d err=%v", id, err)
|
||||||
}
|
}
|
||||||
|
} else if shouldSyncSchedulerSnapshotForExtraUpdates(updates) {
|
||||||
|
// codex 限流快照仍需要让调度缓存尽快看见,避免 DB 抖动时丢失自愈链路。
|
||||||
|
r.syncSchedulerAccountSnapshot(ctx, id)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -1207,6 +1210,10 @@ func shouldEnqueueSchedulerOutboxForExtraUpdates(updates map[string]any) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func shouldSyncSchedulerSnapshotForExtraUpdates(updates map[string]any) bool {
|
||||||
|
return codexExtraIndicatesRateLimit(updates, "7d") || codexExtraIndicatesRateLimit(updates, "5h")
|
||||||
|
}
|
||||||
|
|
||||||
func isSchedulerNeutralAccountExtraKey(key string) bool {
|
func isSchedulerNeutralAccountExtraKey(key string) bool {
|
||||||
key = strings.TrimSpace(key)
|
key = strings.TrimSpace(key)
|
||||||
if key == "" {
|
if key == "" {
|
||||||
@@ -1218,6 +1225,78 @@ func isSchedulerNeutralAccountExtraKey(key string) bool {
|
|||||||
return strings.HasPrefix(key, "codex_")
|
return strings.HasPrefix(key, "codex_")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func codexExtraIndicatesRateLimit(updates map[string]any, window string) bool {
|
||||||
|
if len(updates) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
usedValue, ok := updates["codex_"+window+"_used_percent"]
|
||||||
|
if !ok || !extraValueIndicatesExhausted(usedValue) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return extraValueHasResetMarker(updates["codex_"+window+"_reset_at"]) ||
|
||||||
|
extraValueHasPositiveNumber(updates["codex_"+window+"_reset_after_seconds"])
|
||||||
|
}
|
||||||
|
|
||||||
|
func extraValueIndicatesExhausted(value any) bool {
|
||||||
|
number, ok := extraValueToFloat64(value)
|
||||||
|
return ok && number >= 100-1e-9
|
||||||
|
}
|
||||||
|
|
||||||
|
func extraValueHasPositiveNumber(value any) bool {
|
||||||
|
number, ok := extraValueToFloat64(value)
|
||||||
|
return ok && number > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func extraValueHasResetMarker(value any) bool {
|
||||||
|
switch v := value.(type) {
|
||||||
|
case string:
|
||||||
|
return strings.TrimSpace(v) != ""
|
||||||
|
case time.Time:
|
||||||
|
return !v.IsZero()
|
||||||
|
case *time.Time:
|
||||||
|
return v != nil && !v.IsZero()
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func extraValueToFloat64(value any) (float64, bool) {
|
||||||
|
switch v := value.(type) {
|
||||||
|
case float64:
|
||||||
|
return v, true
|
||||||
|
case float32:
|
||||||
|
return float64(v), true
|
||||||
|
case int:
|
||||||
|
return float64(v), true
|
||||||
|
case int8:
|
||||||
|
return float64(v), true
|
||||||
|
case int16:
|
||||||
|
return float64(v), true
|
||||||
|
case int32:
|
||||||
|
return float64(v), true
|
||||||
|
case int64:
|
||||||
|
return float64(v), true
|
||||||
|
case uint:
|
||||||
|
return float64(v), true
|
||||||
|
case uint8:
|
||||||
|
return float64(v), true
|
||||||
|
case uint16:
|
||||||
|
return float64(v), true
|
||||||
|
case uint32:
|
||||||
|
return float64(v), true
|
||||||
|
case uint64:
|
||||||
|
return float64(v), true
|
||||||
|
case json.Number:
|
||||||
|
parsed, err := v.Float64()
|
||||||
|
return parsed, err == nil
|
||||||
|
case string:
|
||||||
|
parsed, err := strconv.ParseFloat(strings.TrimSpace(v), 64)
|
||||||
|
return parsed, err == nil
|
||||||
|
default:
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (r *accountRepository) BulkUpdate(ctx context.Context, ids []int64, updates service.AccountBulkUpdate) (int64, error) {
|
func (r *accountRepository) BulkUpdate(ctx context.Context, ids []int64, updates service.AccountBulkUpdate) (int64, error) {
|
||||||
if len(ids) == 0 {
|
if len(ids) == 0 {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
|
|||||||
@@ -640,6 +640,33 @@ func (s *AccountRepoSuite) TestUpdateExtra_SchedulerNeutralKeysSkipOutbox() {
|
|||||||
s.Require().Equal(0, count)
|
s.Require().Equal(0, count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *AccountRepoSuite) TestUpdateExtra_ExhaustedCodexSnapshotSyncsSchedulerCache() {
|
||||||
|
account := mustCreateAccount(s.T(), s.client, &service.Account{
|
||||||
|
Name: "acc-extra-codex-exhausted",
|
||||||
|
Platform: service.PlatformOpenAI,
|
||||||
|
Type: service.AccountTypeOAuth,
|
||||||
|
Extra: map[string]any{},
|
||||||
|
})
|
||||||
|
cacheRecorder := &schedulerCacheRecorder{}
|
||||||
|
s.repo.schedulerCache = cacheRecorder
|
||||||
|
_, err := s.repo.sql.ExecContext(s.ctx, "TRUNCATE scheduler_outbox")
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
s.Require().NoError(s.repo.UpdateExtra(s.ctx, account.ID, map[string]any{
|
||||||
|
"codex_7d_used_percent": 100.0,
|
||||||
|
"codex_7d_reset_at": "2026-03-12T13:00:00Z",
|
||||||
|
"codex_7d_reset_after_seconds": 86400,
|
||||||
|
}))
|
||||||
|
|
||||||
|
var count int
|
||||||
|
err = scanSingleRow(s.ctx, s.repo.sql, "SELECT COUNT(*) FROM scheduler_outbox", nil, &count)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
s.Require().Equal(0, count)
|
||||||
|
s.Require().Len(cacheRecorder.setAccounts, 1)
|
||||||
|
s.Require().Equal(account.ID, cacheRecorder.setAccounts[0].ID)
|
||||||
|
s.Require().Equal(100.0, cacheRecorder.setAccounts[0].Extra["codex_7d_used_percent"])
|
||||||
|
}
|
||||||
|
|
||||||
func (s *AccountRepoSuite) TestUpdateExtra_CustomKeysStillEnqueueOutbox() {
|
func (s *AccountRepoSuite) TestUpdateExtra_CustomKeysStillEnqueueOutbox() {
|
||||||
account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-extra-custom", Extra: map[string]any{}})
|
account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-extra-custom", Extra: map[string]any{}})
|
||||||
_, err := s.repo.sql.ExecContext(s.ctx, "TRUNCATE scheduler_outbox")
|
_, err := s.repo.sql.ExecContext(s.ctx, "TRUNCATE scheduler_outbox")
|
||||||
|
|||||||
@@ -369,8 +369,11 @@ func (s *AccountUsageService) getOpenAIUsage(ctx context.Context, account *Accou
|
|||||||
}
|
}
|
||||||
|
|
||||||
if shouldRefreshOpenAICodexSnapshot(account, usage, now) && s.shouldProbeOpenAICodexSnapshot(account.ID, now) {
|
if shouldRefreshOpenAICodexSnapshot(account, usage, now) && s.shouldProbeOpenAICodexSnapshot(account.ID, now) {
|
||||||
if updates, err := s.probeOpenAICodexSnapshot(ctx, account); err == nil && len(updates) > 0 {
|
if updates, resetAt, err := s.probeOpenAICodexSnapshot(ctx, account); err == nil && (len(updates) > 0 || resetAt != nil) {
|
||||||
mergeAccountExtra(account, updates)
|
mergeAccountExtra(account, updates)
|
||||||
|
if resetAt != nil {
|
||||||
|
account.RateLimitResetAt = resetAt
|
||||||
|
}
|
||||||
if usage.UpdatedAt == nil {
|
if usage.UpdatedAt == nil {
|
||||||
usage.UpdatedAt = &now
|
usage.UpdatedAt = &now
|
||||||
}
|
}
|
||||||
@@ -457,26 +460,26 @@ func (s *AccountUsageService) shouldProbeOpenAICodexSnapshot(accountID int64, no
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, account *Account) (map[string]any, error) {
|
func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, account *Account) (map[string]any, *time.Time, error) {
|
||||||
if account == nil || !account.IsOAuth() {
|
if account == nil || !account.IsOAuth() {
|
||||||
return nil, nil
|
return nil, nil, nil
|
||||||
}
|
}
|
||||||
accessToken := account.GetOpenAIAccessToken()
|
accessToken := account.GetOpenAIAccessToken()
|
||||||
if accessToken == "" {
|
if accessToken == "" {
|
||||||
return nil, fmt.Errorf("no access token available")
|
return nil, nil, fmt.Errorf("no access token available")
|
||||||
}
|
}
|
||||||
modelID := openaipkg.DefaultTestModel
|
modelID := openaipkg.DefaultTestModel
|
||||||
payload := createOpenAITestPayload(modelID, true)
|
payload := createOpenAITestPayload(modelID, true)
|
||||||
payloadBytes, err := json.Marshal(payload)
|
payloadBytes, err := json.Marshal(payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("marshal openai probe payload: %w", err)
|
return nil, nil, fmt.Errorf("marshal openai probe payload: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
reqCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
reqCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, chatgptCodexURL, bytes.NewReader(payloadBytes))
|
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, chatgptCodexURL, bytes.NewReader(payloadBytes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("create openai probe request: %w", err)
|
return nil, nil, fmt.Errorf("create openai probe request: %w", err)
|
||||||
}
|
}
|
||||||
req.Host = "chatgpt.com"
|
req.Host = "chatgpt.com"
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
@@ -505,43 +508,67 @@ func (s *AccountUsageService) probeOpenAICodexSnapshot(ctx context.Context, acco
|
|||||||
ResponseHeaderTimeout: 10 * time.Second,
|
ResponseHeaderTimeout: 10 * time.Second,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("build openai probe client: %w", err)
|
return nil, nil, fmt.Errorf("build openai probe client: %w", err)
|
||||||
}
|
}
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("openai codex probe request failed: %w", err)
|
return nil, nil, fmt.Errorf("openai codex probe request failed: %w", err)
|
||||||
}
|
}
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() { _ = resp.Body.Close() }()
|
||||||
|
|
||||||
updates, err := extractOpenAICodexProbeUpdates(resp)
|
updates, resetAt, err := extractOpenAICodexProbeSnapshot(resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
if len(updates) > 0 {
|
if len(updates) > 0 || resetAt != nil {
|
||||||
go func(accountID int64, updates map[string]any) {
|
s.persistOpenAICodexProbeSnapshot(account.ID, updates, resetAt)
|
||||||
updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
return updates, resetAt, nil
|
||||||
defer updateCancel()
|
}
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *AccountUsageService) persistOpenAICodexProbeSnapshot(accountID int64, updates map[string]any, resetAt *time.Time) {
|
||||||
|
if s == nil || s.accountRepo == nil || accountID <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(updates) == 0 && resetAt == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer updateCancel()
|
||||||
|
if len(updates) > 0 {
|
||||||
_ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates)
|
_ = s.accountRepo.UpdateExtra(updateCtx, accountID, updates)
|
||||||
}(account.ID, updates)
|
}
|
||||||
return updates, nil
|
if resetAt != nil {
|
||||||
|
_ = s.accountRepo.SetRateLimited(updateCtx, accountID, *resetAt)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractOpenAICodexProbeSnapshot(resp *http.Response) (map[string]any, *time.Time, error) {
|
||||||
|
if resp == nil {
|
||||||
|
return nil, nil, nil
|
||||||
}
|
}
|
||||||
return nil, nil
|
if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil {
|
||||||
|
baseTime := time.Now()
|
||||||
|
updates := buildCodexUsageExtraUpdates(snapshot, baseTime)
|
||||||
|
resetAt := codexRateLimitResetAtFromSnapshot(snapshot, baseTime)
|
||||||
|
if len(updates) > 0 {
|
||||||
|
return updates, resetAt, nil
|
||||||
|
}
|
||||||
|
return nil, resetAt, nil
|
||||||
|
}
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
return nil, nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
return nil, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func extractOpenAICodexProbeUpdates(resp *http.Response) (map[string]any, error) {
|
func extractOpenAICodexProbeUpdates(resp *http.Response) (map[string]any, error) {
|
||||||
if resp == nil {
|
updates, _, err := extractOpenAICodexProbeSnapshot(resp)
|
||||||
return nil, nil
|
return updates, err
|
||||||
}
|
|
||||||
if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil {
|
|
||||||
updates := buildCodexUsageExtraUpdates(snapshot, time.Now())
|
|
||||||
if len(updates) > 0 {
|
|
||||||
return updates, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
||||||
return nil, fmt.Errorf("openai codex probe returned status %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
return nil, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func mergeAccountExtra(account *Account, updates map[string]any) {
|
func mergeAccountExtra(account *Account, updates map[string]any) {
|
||||||
|
|||||||
@@ -1,11 +1,36 @@
|
|||||||
package service
|
package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type accountUsageCodexProbeRepo struct {
|
||||||
|
stubOpenAIAccountRepo
|
||||||
|
updateExtraCh chan map[string]any
|
||||||
|
rateLimitCh chan time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *accountUsageCodexProbeRepo) UpdateExtra(_ context.Context, _ int64, updates map[string]any) error {
|
||||||
|
if r.updateExtraCh != nil {
|
||||||
|
copied := make(map[string]any, len(updates))
|
||||||
|
for k, v := range updates {
|
||||||
|
copied[k] = v
|
||||||
|
}
|
||||||
|
r.updateExtraCh <- copied
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *accountUsageCodexProbeRepo) SetRateLimited(_ context.Context, _ int64, resetAt time.Time) error {
|
||||||
|
if r.rateLimitCh != nil {
|
||||||
|
r.rateLimitCh <- resetAt
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestShouldRefreshOpenAICodexSnapshot(t *testing.T) {
|
func TestShouldRefreshOpenAICodexSnapshot(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
@@ -66,3 +91,60 @@ func TestExtractOpenAICodexProbeUpdatesAccepts429WithCodexHeaders(t *testing.T)
|
|||||||
t.Fatalf("codex_7d_used_percent = %v, want 100", got)
|
t.Fatalf("codex_7d_used_percent = %v, want 100", got)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestExtractOpenAICodexProbeSnapshotAccepts429WithResetAt(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
headers := make(http.Header)
|
||||||
|
headers.Set("x-codex-primary-used-percent", "100")
|
||||||
|
headers.Set("x-codex-primary-reset-after-seconds", "604800")
|
||||||
|
headers.Set("x-codex-primary-window-minutes", "10080")
|
||||||
|
headers.Set("x-codex-secondary-used-percent", "100")
|
||||||
|
headers.Set("x-codex-secondary-reset-after-seconds", "18000")
|
||||||
|
headers.Set("x-codex-secondary-window-minutes", "300")
|
||||||
|
|
||||||
|
updates, resetAt, err := extractOpenAICodexProbeSnapshot(&http.Response{StatusCode: http.StatusTooManyRequests, Header: headers})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("extractOpenAICodexProbeSnapshot() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(updates) == 0 {
|
||||||
|
t.Fatal("expected codex probe updates from 429 headers")
|
||||||
|
}
|
||||||
|
if resetAt == nil {
|
||||||
|
t.Fatal("expected resetAt from exhausted codex headers")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccountUsageService_PersistOpenAICodexProbeSnapshotSetsRateLimit(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
repo := &accountUsageCodexProbeRepo{
|
||||||
|
updateExtraCh: make(chan map[string]any, 1),
|
||||||
|
rateLimitCh: make(chan time.Time, 1),
|
||||||
|
}
|
||||||
|
svc := &AccountUsageService{accountRepo: repo}
|
||||||
|
resetAt := time.Now().Add(2 * time.Hour).UTC().Truncate(time.Second)
|
||||||
|
|
||||||
|
svc.persistOpenAICodexProbeSnapshot(321, map[string]any{
|
||||||
|
"codex_7d_used_percent": 100.0,
|
||||||
|
"codex_7d_reset_at": resetAt.Format(time.RFC3339),
|
||||||
|
}, &resetAt)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case updates := <-repo.updateExtraCh:
|
||||||
|
if got := updates["codex_7d_used_percent"]; got != 100.0 {
|
||||||
|
t.Fatalf("codex_7d_used_percent = %v, want 100", got)
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("waiting for codex probe extra persistence timed out")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case got := <-repo.rateLimitCh:
|
||||||
|
if got.Before(resetAt.Add(-time.Second)) || got.After(resetAt.Add(time.Second)) {
|
||||||
|
t.Fatalf("rate limit resetAt = %v, want around %v", got, resetAt)
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("waiting for codex probe rate limit persistence timed out")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -4102,6 +4102,9 @@ func (s *OpenAIGatewayService) updateCodexUsageSnapshot(ctx context.Context, acc
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
shouldPersistUpdates := len(updates) > 0 && s.getCodexSnapshotThrottle().Allow(accountID, now)
|
shouldPersistUpdates := len(updates) > 0 && s.getCodexSnapshotThrottle().Allow(accountID, now)
|
||||||
|
if !shouldPersistUpdates && resetAt == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
|||||||
Reference in New Issue
Block a user