mirror of
https://github.com/QuantumNous/new-api.git
synced 2026-03-30 01:45:41 +00:00
94 lines
2.2 KiB
Go
94 lines
2.2 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/QuantumNous/new-api/common"
|
|
"github.com/QuantumNous/new-api/logger"
|
|
"github.com/QuantumNous/new-api/model"
|
|
|
|
"github.com/bytedance/gopkg/util/gopool"
|
|
)
|
|
|
|
// Tuning knobs for the subscription maintenance task.
const (
	// subscriptionResetBatchSize is the limit passed to each batched
	// expire/reset call; a call that reports fewer rows than this ends
	// the batch loop for that pass.
	subscriptionResetBatchSize = 300

	// subscriptionResetTickInterval is the period of the maintenance ticker.
	subscriptionResetTickInterval = time.Minute

	// subscriptionCleanupInterval is the minimum time between two successful
	// pre-consume record cleanups.
	subscriptionCleanupInterval = 30 * time.Minute
)
|
|
|
|
// Package-level state shared by the subscription maintenance task.
var (
	// subscriptionCleanupLast holds the Unix timestamp (seconds) of the last
	// successful pre-consume record cleanup; zero means none has succeeded yet.
	subscriptionCleanupLast atomic.Int64

	// subscriptionResetRunning is true while a maintenance pass is in flight,
	// so overlapping passes can be skipped.
	subscriptionResetRunning atomic.Bool

	// subscriptionResetOnce ensures the background task is started at most once.
	subscriptionResetOnce sync.Once
)
|
|
|
|
func StartSubscriptionQuotaResetTask() {
|
|
subscriptionResetOnce.Do(func() {
|
|
if !common.IsMasterNode {
|
|
return
|
|
}
|
|
gopool.Go(func() {
|
|
logger.LogInfo(context.Background(), fmt.Sprintf("subscription quota reset task started: tick=%s", subscriptionResetTickInterval))
|
|
ticker := time.NewTicker(subscriptionResetTickInterval)
|
|
defer ticker.Stop()
|
|
|
|
runSubscriptionQuotaResetOnce()
|
|
for range ticker.C {
|
|
runSubscriptionQuotaResetOnce()
|
|
}
|
|
})
|
|
})
|
|
}
|
|
|
|
func runSubscriptionQuotaResetOnce() {
|
|
if !subscriptionResetRunning.CompareAndSwap(false, true) {
|
|
return
|
|
}
|
|
defer subscriptionResetRunning.Store(false)
|
|
|
|
ctx := context.Background()
|
|
totalReset := 0
|
|
totalExpired := 0
|
|
for {
|
|
n, err := model.ExpireDueSubscriptions(subscriptionResetBatchSize)
|
|
if err != nil {
|
|
logger.LogWarn(ctx, fmt.Sprintf("subscription expire task failed: %v", err))
|
|
return
|
|
}
|
|
if n == 0 {
|
|
break
|
|
}
|
|
totalExpired += n
|
|
if n < subscriptionResetBatchSize {
|
|
break
|
|
}
|
|
}
|
|
for {
|
|
n, err := model.ResetDueSubscriptions(subscriptionResetBatchSize)
|
|
if err != nil {
|
|
logger.LogWarn(ctx, fmt.Sprintf("subscription quota reset task failed: %v", err))
|
|
return
|
|
}
|
|
if n == 0 {
|
|
break
|
|
}
|
|
totalReset += n
|
|
if n < subscriptionResetBatchSize {
|
|
break
|
|
}
|
|
}
|
|
lastCleanup := time.Unix(subscriptionCleanupLast.Load(), 0)
|
|
if time.Since(lastCleanup) >= subscriptionCleanupInterval {
|
|
if _, err := model.CleanupSubscriptionPreConsumeRecords(7 * 24 * 3600); err == nil {
|
|
subscriptionCleanupLast.Store(time.Now().Unix())
|
|
}
|
|
}
|
|
if common.DebugEnabled && (totalReset > 0 || totalExpired > 0) {
|
|
logger.LogDebug(ctx, "subscription maintenance: reset_count=%d, expired_count=%d", totalReset, totalExpired)
|
|
}
|
|
}
|