Files
new-api/service/subscription_reset_task.go

94 lines
2.2 KiB
Go

package service
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/QuantumNous/new-api/common"
"github.com/QuantumNous/new-api/logger"
"github.com/QuantumNous/new-api/model"
"github.com/bytedance/gopkg/util/gopool"
)
// Tuning parameters for the subscription maintenance task.
const (
	// subscriptionResetTickInterval is the period of the ticker that drives
	// each maintenance pass.
	subscriptionResetTickInterval = time.Minute
	// subscriptionResetBatchSize is the value passed to the expire and reset
	// model calls; a pass keeps calling them while they report full batches.
	subscriptionResetBatchSize = 300
	// subscriptionCleanupInterval is the minimum time between two successful
	// pre-consume record cleanups.
	subscriptionCleanupInterval = 30 * time.Minute
)
// Package-level state shared by the subscription maintenance task.
var (
	// subscriptionResetOnce makes StartSubscriptionQuotaResetTask a one-shot:
	// only the first call has any effect.
	subscriptionResetOnce sync.Once
	// subscriptionResetRunning is true while a maintenance pass is executing;
	// a pass that finds it already set returns immediately.
	subscriptionResetRunning atomic.Bool
	// subscriptionCleanupLast holds the Unix time (seconds) of the last
	// successful pre-consume record cleanup; zero means none has succeeded yet.
	subscriptionCleanupLast atomic.Int64
)
// StartSubscriptionQuotaResetTask launches the background subscription
// maintenance loop. Only the first call does anything, and on a node that is
// not the master that first call is a no-op (later calls still do nothing).
//
// The loop runs one maintenance pass immediately and then one per
// subscriptionResetTickInterval; it never terminates on its own.
func StartSubscriptionQuotaResetTask() {
	subscriptionResetOnce.Do(func() {
		if !common.IsMasterNode {
			return
		}

		loop := func() {
			startMsg := fmt.Sprintf("subscription quota reset task started: tick=%s", subscriptionResetTickInterval)
			logger.LogInfo(context.Background(), startMsg)

			tick := time.NewTicker(subscriptionResetTickInterval)
			defer tick.Stop()

			// Run a pass right away, then wait for each tick before the next.
			for {
				runSubscriptionQuotaResetOnce()
				<-tick.C
			}
		}
		gopool.Go(loop)
	})
}
// runSubscriptionQuotaResetOnce performs a single maintenance pass: it expires
// due subscriptions, resets due subscription quotas and, at most once per
// subscriptionCleanupInterval, cleans up pre-consume records.
//
// The pass is skipped entirely when another pass is still in progress. A
// failure in the expire or reset phase is logged and aborts the rest of the
// pass. A cleanup failure is logged and leaves the last-cleanup timestamp
// untouched, so the cleanup is attempted again on the next pass.
func runSubscriptionQuotaResetOnce() {
	// Guard against overlapping passes.
	if !subscriptionResetRunning.CompareAndSwap(false, true) {
		return
	}
	defer subscriptionResetRunning.Store(false)

	ctx := context.Background()

	totalExpired, err := drainSubscriptionBatches(func() (int, error) {
		return model.ExpireDueSubscriptions(subscriptionResetBatchSize)
	})
	if err != nil {
		logger.LogWarn(ctx, fmt.Sprintf("subscription expire task failed: %v", err))
		return
	}

	totalReset, err := drainSubscriptionBatches(func() (int, error) {
		return model.ResetDueSubscriptions(subscriptionResetBatchSize)
	})
	if err != nil {
		logger.LogWarn(ctx, fmt.Sprintf("subscription quota reset task failed: %v", err))
		return
	}

	// NOTE(review): presumably a retention window in seconds (7 days) —
	// confirm against model.CleanupSubscriptionPreConsumeRecords.
	const preConsumeCleanupArg = 7 * 24 * 3600

	lastCleanup := time.Unix(subscriptionCleanupLast.Load(), 0)
	if time.Since(lastCleanup) >= subscriptionCleanupInterval {
		if _, err := model.CleanupSubscriptionPreConsumeRecords(preConsumeCleanupArg); err != nil {
			// Surface the failure instead of silently retrying every tick.
			logger.LogWarn(ctx, fmt.Sprintf("subscription pre-consume cleanup failed: %v", err))
		} else {
			subscriptionCleanupLast.Store(time.Now().Unix())
		}
	}

	if common.DebugEnabled && (totalReset > 0 || totalExpired > 0) {
		logger.LogDebug(ctx, "subscription maintenance: reset_count=%d, expired_count=%d", totalReset, totalExpired)
	}
}

// drainSubscriptionBatches calls step repeatedly and sums the counts it
// returns, stopping as soon as step fails or reports fewer than
// subscriptionResetBatchSize items (which includes zero). On failure the
// returned count is zero and the error is passed through unchanged.
func drainSubscriptionBatches(step func() (int, error)) (int, error) {
	total := 0
	for {
		n, err := step()
		if err != nil {
			return 0, err
		}
		total += n
		// A short batch means nothing more is due right now.
		if n < subscriptionResetBatchSize {
			return total, nil
		}
	}
}