mirror of
https://github.com/Wei-Shaw/sub2api.git
synced 2026-03-30 05:02:17 +00:00
Merge pull request #933 from xvhuan/fix/dashboard-read-pressure-20260311
降低 admin/dashboard 读路径压力,避免 snapshot-v2 并发击穿
This commit is contained in:
@@ -249,11 +249,12 @@ func (h *DashboardHandler) GetUsageTrend(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trend, err := h.dashboardService.GetUsageTrendWithFilters(c.Request.Context(), startTime, endTime, granularity, userID, apiKeyID, accountID, groupID, model, requestType, stream, billingType)
|
trend, hit, err := h.getUsageTrendCached(c.Request.Context(), startTime, endTime, granularity, userID, apiKeyID, accountID, groupID, model, requestType, stream, billingType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.Error(c, 500, "Failed to get usage trend")
|
response.Error(c, 500, "Failed to get usage trend")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
c.Header("X-Snapshot-Cache", cacheStatusValue(hit))
|
||||||
|
|
||||||
response.Success(c, gin.H{
|
response.Success(c, gin.H{
|
||||||
"trend": trend,
|
"trend": trend,
|
||||||
@@ -321,11 +322,12 @@ func (h *DashboardHandler) GetModelStats(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stats, err := h.dashboardService.GetModelStatsWithFilters(c.Request.Context(), startTime, endTime, userID, apiKeyID, accountID, groupID, requestType, stream, billingType)
|
stats, hit, err := h.getModelStatsCached(c.Request.Context(), startTime, endTime, userID, apiKeyID, accountID, groupID, requestType, stream, billingType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.Error(c, 500, "Failed to get model statistics")
|
response.Error(c, 500, "Failed to get model statistics")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
c.Header("X-Snapshot-Cache", cacheStatusValue(hit))
|
||||||
|
|
||||||
response.Success(c, gin.H{
|
response.Success(c, gin.H{
|
||||||
"models": stats,
|
"models": stats,
|
||||||
@@ -391,11 +393,12 @@ func (h *DashboardHandler) GetGroupStats(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stats, err := h.dashboardService.GetGroupStatsWithFilters(c.Request.Context(), startTime, endTime, userID, apiKeyID, accountID, groupID, requestType, stream, billingType)
|
stats, hit, err := h.getGroupStatsCached(c.Request.Context(), startTime, endTime, userID, apiKeyID, accountID, groupID, requestType, stream, billingType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.Error(c, 500, "Failed to get group statistics")
|
response.Error(c, 500, "Failed to get group statistics")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
c.Header("X-Snapshot-Cache", cacheStatusValue(hit))
|
||||||
|
|
||||||
response.Success(c, gin.H{
|
response.Success(c, gin.H{
|
||||||
"groups": stats,
|
"groups": stats,
|
||||||
@@ -416,11 +419,12 @@ func (h *DashboardHandler) GetAPIKeyUsageTrend(c *gin.Context) {
|
|||||||
limit = 5
|
limit = 5
|
||||||
}
|
}
|
||||||
|
|
||||||
trend, err := h.dashboardService.GetAPIKeyUsageTrend(c.Request.Context(), startTime, endTime, granularity, limit)
|
trend, hit, err := h.getAPIKeyUsageTrendCached(c.Request.Context(), startTime, endTime, granularity, limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.Error(c, 500, "Failed to get API key usage trend")
|
response.Error(c, 500, "Failed to get API key usage trend")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
c.Header("X-Snapshot-Cache", cacheStatusValue(hit))
|
||||||
|
|
||||||
response.Success(c, gin.H{
|
response.Success(c, gin.H{
|
||||||
"trend": trend,
|
"trend": trend,
|
||||||
@@ -442,11 +446,12 @@ func (h *DashboardHandler) GetUserUsageTrend(c *gin.Context) {
|
|||||||
limit = 12
|
limit = 12
|
||||||
}
|
}
|
||||||
|
|
||||||
trend, err := h.dashboardService.GetUserUsageTrend(c.Request.Context(), startTime, endTime, granularity, limit)
|
trend, hit, err := h.getUserUsageTrendCached(c.Request.Context(), startTime, endTime, granularity, limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.Error(c, 500, "Failed to get user usage trend")
|
response.Error(c, 500, "Failed to get user usage trend")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
c.Header("X-Snapshot-Cache", cacheStatusValue(hit))
|
||||||
|
|
||||||
response.Success(c, gin.H{
|
response.Success(c, gin.H{
|
||||||
"trend": trend,
|
"trend": trend,
|
||||||
|
|||||||
118
backend/internal/handler/admin/dashboard_handler_cache_test.go
Normal file
118
backend/internal/handler/admin/dashboard_handler_cache_test.go
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
package admin
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Wei-Shaw/sub2api/internal/pkg/usagestats"
|
||||||
|
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type dashboardUsageRepoCacheProbe struct {
|
||||||
|
service.UsageLogRepository
|
||||||
|
trendCalls atomic.Int32
|
||||||
|
usersTrendCalls atomic.Int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *dashboardUsageRepoCacheProbe) GetUsageTrendWithFilters(
|
||||||
|
ctx context.Context,
|
||||||
|
startTime, endTime time.Time,
|
||||||
|
granularity string,
|
||||||
|
userID, apiKeyID, accountID, groupID int64,
|
||||||
|
model string,
|
||||||
|
requestType *int16,
|
||||||
|
stream *bool,
|
||||||
|
billingType *int8,
|
||||||
|
) ([]usagestats.TrendDataPoint, error) {
|
||||||
|
r.trendCalls.Add(1)
|
||||||
|
return []usagestats.TrendDataPoint{{
|
||||||
|
Date: "2026-03-11",
|
||||||
|
Requests: 1,
|
||||||
|
TotalTokens: 2,
|
||||||
|
Cost: 3,
|
||||||
|
ActualCost: 4,
|
||||||
|
}}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *dashboardUsageRepoCacheProbe) GetUserUsageTrend(
|
||||||
|
ctx context.Context,
|
||||||
|
startTime, endTime time.Time,
|
||||||
|
granularity string,
|
||||||
|
limit int,
|
||||||
|
) ([]usagestats.UserUsageTrendPoint, error) {
|
||||||
|
r.usersTrendCalls.Add(1)
|
||||||
|
return []usagestats.UserUsageTrendPoint{{
|
||||||
|
Date: "2026-03-11",
|
||||||
|
UserID: 1,
|
||||||
|
Email: "cache@test.dev",
|
||||||
|
Requests: 2,
|
||||||
|
Tokens: 20,
|
||||||
|
Cost: 2,
|
||||||
|
ActualCost: 1,
|
||||||
|
}}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func resetDashboardReadCachesForTest() {
|
||||||
|
dashboardTrendCache = newSnapshotCache(30 * time.Second)
|
||||||
|
dashboardUsersTrendCache = newSnapshotCache(30 * time.Second)
|
||||||
|
dashboardAPIKeysTrendCache = newSnapshotCache(30 * time.Second)
|
||||||
|
dashboardModelStatsCache = newSnapshotCache(30 * time.Second)
|
||||||
|
dashboardGroupStatsCache = newSnapshotCache(30 * time.Second)
|
||||||
|
dashboardSnapshotV2Cache = newSnapshotCache(30 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDashboardHandler_GetUsageTrend_UsesCache(t *testing.T) {
|
||||||
|
t.Cleanup(resetDashboardReadCachesForTest)
|
||||||
|
resetDashboardReadCachesForTest()
|
||||||
|
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
repo := &dashboardUsageRepoCacheProbe{}
|
||||||
|
dashboardSvc := service.NewDashboardService(repo, nil, nil, nil)
|
||||||
|
handler := NewDashboardHandler(dashboardSvc, nil)
|
||||||
|
router := gin.New()
|
||||||
|
router.GET("/admin/dashboard/trend", handler.GetUsageTrend)
|
||||||
|
|
||||||
|
req1 := httptest.NewRequest(http.MethodGet, "/admin/dashboard/trend?start_date=2026-03-01&end_date=2026-03-07&granularity=day", nil)
|
||||||
|
rec1 := httptest.NewRecorder()
|
||||||
|
router.ServeHTTP(rec1, req1)
|
||||||
|
require.Equal(t, http.StatusOK, rec1.Code)
|
||||||
|
require.Equal(t, "miss", rec1.Header().Get("X-Snapshot-Cache"))
|
||||||
|
|
||||||
|
req2 := httptest.NewRequest(http.MethodGet, "/admin/dashboard/trend?start_date=2026-03-01&end_date=2026-03-07&granularity=day", nil)
|
||||||
|
rec2 := httptest.NewRecorder()
|
||||||
|
router.ServeHTTP(rec2, req2)
|
||||||
|
require.Equal(t, http.StatusOK, rec2.Code)
|
||||||
|
require.Equal(t, "hit", rec2.Header().Get("X-Snapshot-Cache"))
|
||||||
|
require.Equal(t, int32(1), repo.trendCalls.Load())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDashboardHandler_GetUserUsageTrend_UsesCache(t *testing.T) {
|
||||||
|
t.Cleanup(resetDashboardReadCachesForTest)
|
||||||
|
resetDashboardReadCachesForTest()
|
||||||
|
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
repo := &dashboardUsageRepoCacheProbe{}
|
||||||
|
dashboardSvc := service.NewDashboardService(repo, nil, nil, nil)
|
||||||
|
handler := NewDashboardHandler(dashboardSvc, nil)
|
||||||
|
router := gin.New()
|
||||||
|
router.GET("/admin/dashboard/users-trend", handler.GetUserUsageTrend)
|
||||||
|
|
||||||
|
req1 := httptest.NewRequest(http.MethodGet, "/admin/dashboard/users-trend?start_date=2026-03-01&end_date=2026-03-07&granularity=day&limit=8", nil)
|
||||||
|
rec1 := httptest.NewRecorder()
|
||||||
|
router.ServeHTTP(rec1, req1)
|
||||||
|
require.Equal(t, http.StatusOK, rec1.Code)
|
||||||
|
require.Equal(t, "miss", rec1.Header().Get("X-Snapshot-Cache"))
|
||||||
|
|
||||||
|
req2 := httptest.NewRequest(http.MethodGet, "/admin/dashboard/users-trend?start_date=2026-03-01&end_date=2026-03-07&granularity=day&limit=8", nil)
|
||||||
|
rec2 := httptest.NewRecorder()
|
||||||
|
router.ServeHTTP(rec2, req2)
|
||||||
|
require.Equal(t, http.StatusOK, rec2.Code)
|
||||||
|
require.Equal(t, "hit", rec2.Header().Get("X-Snapshot-Cache"))
|
||||||
|
require.Equal(t, int32(1), repo.usersTrendCalls.Load())
|
||||||
|
}
|
||||||
200
backend/internal/handler/admin/dashboard_query_cache.go
Normal file
200
backend/internal/handler/admin/dashboard_query_cache.go
Normal file
@@ -0,0 +1,200 @@
|
|||||||
|
package admin
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Wei-Shaw/sub2api/internal/pkg/usagestats"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
dashboardTrendCache = newSnapshotCache(30 * time.Second)
|
||||||
|
dashboardModelStatsCache = newSnapshotCache(30 * time.Second)
|
||||||
|
dashboardGroupStatsCache = newSnapshotCache(30 * time.Second)
|
||||||
|
dashboardUsersTrendCache = newSnapshotCache(30 * time.Second)
|
||||||
|
dashboardAPIKeysTrendCache = newSnapshotCache(30 * time.Second)
|
||||||
|
)
|
||||||
|
|
||||||
|
type dashboardTrendCacheKey struct {
|
||||||
|
StartTime string `json:"start_time"`
|
||||||
|
EndTime string `json:"end_time"`
|
||||||
|
Granularity string `json:"granularity"`
|
||||||
|
UserID int64 `json:"user_id"`
|
||||||
|
APIKeyID int64 `json:"api_key_id"`
|
||||||
|
AccountID int64 `json:"account_id"`
|
||||||
|
GroupID int64 `json:"group_id"`
|
||||||
|
Model string `json:"model"`
|
||||||
|
RequestType *int16 `json:"request_type"`
|
||||||
|
Stream *bool `json:"stream"`
|
||||||
|
BillingType *int8 `json:"billing_type"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type dashboardModelGroupCacheKey struct {
|
||||||
|
StartTime string `json:"start_time"`
|
||||||
|
EndTime string `json:"end_time"`
|
||||||
|
UserID int64 `json:"user_id"`
|
||||||
|
APIKeyID int64 `json:"api_key_id"`
|
||||||
|
AccountID int64 `json:"account_id"`
|
||||||
|
GroupID int64 `json:"group_id"`
|
||||||
|
RequestType *int16 `json:"request_type"`
|
||||||
|
Stream *bool `json:"stream"`
|
||||||
|
BillingType *int8 `json:"billing_type"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type dashboardEntityTrendCacheKey struct {
|
||||||
|
StartTime string `json:"start_time"`
|
||||||
|
EndTime string `json:"end_time"`
|
||||||
|
Granularity string `json:"granularity"`
|
||||||
|
Limit int `json:"limit"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func cacheStatusValue(hit bool) string {
|
||||||
|
if hit {
|
||||||
|
return "hit"
|
||||||
|
}
|
||||||
|
return "miss"
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustMarshalDashboardCacheKey(value any) string {
|
||||||
|
raw, err := json.Marshal(value)
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return string(raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
func snapshotPayloadAs[T any](payload any) (T, error) {
|
||||||
|
typed, ok := payload.(T)
|
||||||
|
if !ok {
|
||||||
|
var zero T
|
||||||
|
return zero, fmt.Errorf("unexpected cache payload type %T", payload)
|
||||||
|
}
|
||||||
|
return typed, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DashboardHandler) getUsageTrendCached(
|
||||||
|
ctx context.Context,
|
||||||
|
startTime, endTime time.Time,
|
||||||
|
granularity string,
|
||||||
|
userID, apiKeyID, accountID, groupID int64,
|
||||||
|
model string,
|
||||||
|
requestType *int16,
|
||||||
|
stream *bool,
|
||||||
|
billingType *int8,
|
||||||
|
) ([]usagestats.TrendDataPoint, bool, error) {
|
||||||
|
key := mustMarshalDashboardCacheKey(dashboardTrendCacheKey{
|
||||||
|
StartTime: startTime.UTC().Format(time.RFC3339),
|
||||||
|
EndTime: endTime.UTC().Format(time.RFC3339),
|
||||||
|
Granularity: granularity,
|
||||||
|
UserID: userID,
|
||||||
|
APIKeyID: apiKeyID,
|
||||||
|
AccountID: accountID,
|
||||||
|
GroupID: groupID,
|
||||||
|
Model: model,
|
||||||
|
RequestType: requestType,
|
||||||
|
Stream: stream,
|
||||||
|
BillingType: billingType,
|
||||||
|
})
|
||||||
|
entry, hit, err := dashboardTrendCache.GetOrLoad(key, func() (any, error) {
|
||||||
|
return h.dashboardService.GetUsageTrendWithFilters(ctx, startTime, endTime, granularity, userID, apiKeyID, accountID, groupID, model, requestType, stream, billingType)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, hit, err
|
||||||
|
}
|
||||||
|
trend, err := snapshotPayloadAs[[]usagestats.TrendDataPoint](entry.Payload)
|
||||||
|
return trend, hit, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DashboardHandler) getModelStatsCached(
|
||||||
|
ctx context.Context,
|
||||||
|
startTime, endTime time.Time,
|
||||||
|
userID, apiKeyID, accountID, groupID int64,
|
||||||
|
requestType *int16,
|
||||||
|
stream *bool,
|
||||||
|
billingType *int8,
|
||||||
|
) ([]usagestats.ModelStat, bool, error) {
|
||||||
|
key := mustMarshalDashboardCacheKey(dashboardModelGroupCacheKey{
|
||||||
|
StartTime: startTime.UTC().Format(time.RFC3339),
|
||||||
|
EndTime: endTime.UTC().Format(time.RFC3339),
|
||||||
|
UserID: userID,
|
||||||
|
APIKeyID: apiKeyID,
|
||||||
|
AccountID: accountID,
|
||||||
|
GroupID: groupID,
|
||||||
|
RequestType: requestType,
|
||||||
|
Stream: stream,
|
||||||
|
BillingType: billingType,
|
||||||
|
})
|
||||||
|
entry, hit, err := dashboardModelStatsCache.GetOrLoad(key, func() (any, error) {
|
||||||
|
return h.dashboardService.GetModelStatsWithFilters(ctx, startTime, endTime, userID, apiKeyID, accountID, groupID, requestType, stream, billingType)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, hit, err
|
||||||
|
}
|
||||||
|
stats, err := snapshotPayloadAs[[]usagestats.ModelStat](entry.Payload)
|
||||||
|
return stats, hit, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DashboardHandler) getGroupStatsCached(
|
||||||
|
ctx context.Context,
|
||||||
|
startTime, endTime time.Time,
|
||||||
|
userID, apiKeyID, accountID, groupID int64,
|
||||||
|
requestType *int16,
|
||||||
|
stream *bool,
|
||||||
|
billingType *int8,
|
||||||
|
) ([]usagestats.GroupStat, bool, error) {
|
||||||
|
key := mustMarshalDashboardCacheKey(dashboardModelGroupCacheKey{
|
||||||
|
StartTime: startTime.UTC().Format(time.RFC3339),
|
||||||
|
EndTime: endTime.UTC().Format(time.RFC3339),
|
||||||
|
UserID: userID,
|
||||||
|
APIKeyID: apiKeyID,
|
||||||
|
AccountID: accountID,
|
||||||
|
GroupID: groupID,
|
||||||
|
RequestType: requestType,
|
||||||
|
Stream: stream,
|
||||||
|
BillingType: billingType,
|
||||||
|
})
|
||||||
|
entry, hit, err := dashboardGroupStatsCache.GetOrLoad(key, func() (any, error) {
|
||||||
|
return h.dashboardService.GetGroupStatsWithFilters(ctx, startTime, endTime, userID, apiKeyID, accountID, groupID, requestType, stream, billingType)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, hit, err
|
||||||
|
}
|
||||||
|
stats, err := snapshotPayloadAs[[]usagestats.GroupStat](entry.Payload)
|
||||||
|
return stats, hit, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DashboardHandler) getAPIKeyUsageTrendCached(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, bool, error) {
|
||||||
|
key := mustMarshalDashboardCacheKey(dashboardEntityTrendCacheKey{
|
||||||
|
StartTime: startTime.UTC().Format(time.RFC3339),
|
||||||
|
EndTime: endTime.UTC().Format(time.RFC3339),
|
||||||
|
Granularity: granularity,
|
||||||
|
Limit: limit,
|
||||||
|
})
|
||||||
|
entry, hit, err := dashboardAPIKeysTrendCache.GetOrLoad(key, func() (any, error) {
|
||||||
|
return h.dashboardService.GetAPIKeyUsageTrend(ctx, startTime, endTime, granularity, limit)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, hit, err
|
||||||
|
}
|
||||||
|
trend, err := snapshotPayloadAs[[]usagestats.APIKeyUsageTrendPoint](entry.Payload)
|
||||||
|
return trend, hit, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DashboardHandler) getUserUsageTrendCached(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.UserUsageTrendPoint, bool, error) {
|
||||||
|
key := mustMarshalDashboardCacheKey(dashboardEntityTrendCacheKey{
|
||||||
|
StartTime: startTime.UTC().Format(time.RFC3339),
|
||||||
|
EndTime: endTime.UTC().Format(time.RFC3339),
|
||||||
|
Granularity: granularity,
|
||||||
|
Limit: limit,
|
||||||
|
})
|
||||||
|
entry, hit, err := dashboardUsersTrendCache.GetOrLoad(key, func() (any, error) {
|
||||||
|
return h.dashboardService.GetUserUsageTrend(ctx, startTime, endTime, granularity, limit)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, hit, err
|
||||||
|
}
|
||||||
|
trend, err := snapshotPayloadAs[[]usagestats.UserUsageTrendPoint](entry.Payload)
|
||||||
|
return trend, hit, err
|
||||||
|
}
|
||||||
@@ -1,7 +1,9 @@
|
|||||||
package admin
|
package admin
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -111,7 +113,25 @@ func (h *DashboardHandler) GetSnapshotV2(c *gin.Context) {
|
|||||||
})
|
})
|
||||||
cacheKey := string(keyRaw)
|
cacheKey := string(keyRaw)
|
||||||
|
|
||||||
if cached, ok := dashboardSnapshotV2Cache.Get(cacheKey); ok {
|
cached, hit, err := dashboardSnapshotV2Cache.GetOrLoad(cacheKey, func() (any, error) {
|
||||||
|
return h.buildSnapshotV2Response(
|
||||||
|
c.Request.Context(),
|
||||||
|
startTime,
|
||||||
|
endTime,
|
||||||
|
granularity,
|
||||||
|
filters,
|
||||||
|
includeStats,
|
||||||
|
includeTrend,
|
||||||
|
includeModels,
|
||||||
|
includeGroups,
|
||||||
|
includeUsersTrend,
|
||||||
|
usersTrendLimit,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
response.Error(c, 500, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
if cached.ETag != "" {
|
if cached.ETag != "" {
|
||||||
c.Header("ETag", cached.ETag)
|
c.Header("ETag", cached.ETag)
|
||||||
c.Header("Vary", "If-None-Match")
|
c.Header("Vary", "If-None-Match")
|
||||||
@@ -120,11 +140,18 @@ func (h *DashboardHandler) GetSnapshotV2(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.Header("X-Snapshot-Cache", "hit")
|
c.Header("X-Snapshot-Cache", cacheStatusValue(hit))
|
||||||
response.Success(c, cached.Payload)
|
response.Success(c, cached.Payload)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *DashboardHandler) buildSnapshotV2Response(
|
||||||
|
ctx context.Context,
|
||||||
|
startTime, endTime time.Time,
|
||||||
|
granularity string,
|
||||||
|
filters *dashboardSnapshotV2Filters,
|
||||||
|
includeStats, includeTrend, includeModels, includeGroups, includeUsersTrend bool,
|
||||||
|
usersTrendLimit int,
|
||||||
|
) (*dashboardSnapshotV2Response, error) {
|
||||||
resp := &dashboardSnapshotV2Response{
|
resp := &dashboardSnapshotV2Response{
|
||||||
GeneratedAt: time.Now().UTC().Format(time.RFC3339),
|
GeneratedAt: time.Now().UTC().Format(time.RFC3339),
|
||||||
StartDate: startTime.Format("2006-01-02"),
|
StartDate: startTime.Format("2006-01-02"),
|
||||||
@@ -133,10 +160,9 @@ func (h *DashboardHandler) GetSnapshotV2(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if includeStats {
|
if includeStats {
|
||||||
stats, err := h.dashboardService.GetDashboardStats(c.Request.Context())
|
stats, err := h.dashboardService.GetDashboardStats(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.Error(c, 500, "Failed to get dashboard statistics")
|
return nil, errors.New("failed to get dashboard statistics")
|
||||||
return
|
|
||||||
}
|
}
|
||||||
resp.Stats = &dashboardSnapshotV2Stats{
|
resp.Stats = &dashboardSnapshotV2Stats{
|
||||||
DashboardStats: *stats,
|
DashboardStats: *stats,
|
||||||
@@ -145,8 +171,8 @@ func (h *DashboardHandler) GetSnapshotV2(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if includeTrend {
|
if includeTrend {
|
||||||
trend, err := h.dashboardService.GetUsageTrendWithFilters(
|
trend, _, err := h.getUsageTrendCached(
|
||||||
c.Request.Context(),
|
ctx,
|
||||||
startTime,
|
startTime,
|
||||||
endTime,
|
endTime,
|
||||||
granularity,
|
granularity,
|
||||||
@@ -160,15 +186,14 @@ func (h *DashboardHandler) GetSnapshotV2(c *gin.Context) {
|
|||||||
filters.BillingType,
|
filters.BillingType,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.Error(c, 500, "Failed to get usage trend")
|
return nil, errors.New("failed to get usage trend")
|
||||||
return
|
|
||||||
}
|
}
|
||||||
resp.Trend = trend
|
resp.Trend = trend
|
||||||
}
|
}
|
||||||
|
|
||||||
if includeModels {
|
if includeModels {
|
||||||
models, err := h.dashboardService.GetModelStatsWithFilters(
|
models, _, err := h.getModelStatsCached(
|
||||||
c.Request.Context(),
|
ctx,
|
||||||
startTime,
|
startTime,
|
||||||
endTime,
|
endTime,
|
||||||
filters.UserID,
|
filters.UserID,
|
||||||
@@ -180,15 +205,14 @@ func (h *DashboardHandler) GetSnapshotV2(c *gin.Context) {
|
|||||||
filters.BillingType,
|
filters.BillingType,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.Error(c, 500, "Failed to get model statistics")
|
return nil, errors.New("failed to get model statistics")
|
||||||
return
|
|
||||||
}
|
}
|
||||||
resp.Models = models
|
resp.Models = models
|
||||||
}
|
}
|
||||||
|
|
||||||
if includeGroups {
|
if includeGroups {
|
||||||
groups, err := h.dashboardService.GetGroupStatsWithFilters(
|
groups, _, err := h.getGroupStatsCached(
|
||||||
c.Request.Context(),
|
ctx,
|
||||||
startTime,
|
startTime,
|
||||||
endTime,
|
endTime,
|
||||||
filters.UserID,
|
filters.UserID,
|
||||||
@@ -200,34 +224,20 @@ func (h *DashboardHandler) GetSnapshotV2(c *gin.Context) {
|
|||||||
filters.BillingType,
|
filters.BillingType,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.Error(c, 500, "Failed to get group statistics")
|
return nil, errors.New("failed to get group statistics")
|
||||||
return
|
|
||||||
}
|
}
|
||||||
resp.Groups = groups
|
resp.Groups = groups
|
||||||
}
|
}
|
||||||
|
|
||||||
if includeUsersTrend {
|
if includeUsersTrend {
|
||||||
usersTrend, err := h.dashboardService.GetUserUsageTrend(
|
usersTrend, _, err := h.getUserUsageTrendCached(ctx, startTime, endTime, granularity, usersTrendLimit)
|
||||||
c.Request.Context(),
|
|
||||||
startTime,
|
|
||||||
endTime,
|
|
||||||
granularity,
|
|
||||||
usersTrendLimit,
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.Error(c, 500, "Failed to get user usage trend")
|
return nil, errors.New("failed to get user usage trend")
|
||||||
return
|
|
||||||
}
|
}
|
||||||
resp.UsersTrend = usersTrend
|
resp.UsersTrend = usersTrend
|
||||||
}
|
}
|
||||||
|
|
||||||
cached := dashboardSnapshotV2Cache.Set(cacheKey, resp)
|
return resp, nil
|
||||||
if cached.ETag != "" {
|
|
||||||
c.Header("ETag", cached.ETag)
|
|
||||||
c.Header("Vary", "If-None-Match")
|
|
||||||
}
|
|
||||||
c.Header("X-Snapshot-Cache", "miss")
|
|
||||||
response.Success(c, resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseDashboardSnapshotV2Filters(c *gin.Context) (*dashboardSnapshotV2Filters, error) {
|
func parseDashboardSnapshotV2Filters(c *gin.Context) (*dashboardSnapshotV2Filters, error) {
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/sync/singleflight"
|
||||||
)
|
)
|
||||||
|
|
||||||
type snapshotCacheEntry struct {
|
type snapshotCacheEntry struct {
|
||||||
@@ -19,6 +21,12 @@ type snapshotCache struct {
|
|||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
ttl time.Duration
|
ttl time.Duration
|
||||||
items map[string]snapshotCacheEntry
|
items map[string]snapshotCacheEntry
|
||||||
|
sf singleflight.Group
|
||||||
|
}
|
||||||
|
|
||||||
|
type snapshotCacheLoadResult struct {
|
||||||
|
Entry snapshotCacheEntry
|
||||||
|
Hit bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSnapshotCache(ttl time.Duration) *snapshotCache {
|
func newSnapshotCache(ttl time.Duration) *snapshotCache {
|
||||||
@@ -70,6 +78,41 @@ func (c *snapshotCache) Set(key string, payload any) snapshotCacheEntry {
|
|||||||
return entry
|
return entry
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *snapshotCache) GetOrLoad(key string, load func() (any, error)) (snapshotCacheEntry, bool, error) {
|
||||||
|
if load == nil {
|
||||||
|
return snapshotCacheEntry{}, false, nil
|
||||||
|
}
|
||||||
|
if entry, ok := c.Get(key); ok {
|
||||||
|
return entry, true, nil
|
||||||
|
}
|
||||||
|
if c == nil || key == "" {
|
||||||
|
payload, err := load()
|
||||||
|
if err != nil {
|
||||||
|
return snapshotCacheEntry{}, false, err
|
||||||
|
}
|
||||||
|
return c.Set(key, payload), false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
value, err, _ := c.sf.Do(key, func() (any, error) {
|
||||||
|
if entry, ok := c.Get(key); ok {
|
||||||
|
return snapshotCacheLoadResult{Entry: entry, Hit: true}, nil
|
||||||
|
}
|
||||||
|
payload, err := load()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return snapshotCacheLoadResult{Entry: c.Set(key, payload), Hit: false}, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return snapshotCacheEntry{}, false, err
|
||||||
|
}
|
||||||
|
result, ok := value.(snapshotCacheLoadResult)
|
||||||
|
if !ok {
|
||||||
|
return snapshotCacheEntry{}, false, nil
|
||||||
|
}
|
||||||
|
return result.Entry, result.Hit, nil
|
||||||
|
}
|
||||||
|
|
||||||
func buildETagFromAny(payload any) string {
|
func buildETagFromAny(payload any) string {
|
||||||
raw, err := json.Marshal(payload)
|
raw, err := json.Marshal(payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -3,6 +3,8 @@
|
|||||||
package admin
|
package admin
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -95,6 +97,61 @@ func TestBuildETagFromAny_UnmarshalablePayload(t *testing.T) {
|
|||||||
require.Empty(t, etag)
|
require.Empty(t, etag)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSnapshotCache_GetOrLoad_MissThenHit(t *testing.T) {
|
||||||
|
c := newSnapshotCache(5 * time.Second)
|
||||||
|
var loads atomic.Int32
|
||||||
|
|
||||||
|
entry, hit, err := c.GetOrLoad("key1", func() (any, error) {
|
||||||
|
loads.Add(1)
|
||||||
|
return map[string]string{"hello": "world"}, nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, hit)
|
||||||
|
require.NotEmpty(t, entry.ETag)
|
||||||
|
require.Equal(t, int32(1), loads.Load())
|
||||||
|
|
||||||
|
entry2, hit, err := c.GetOrLoad("key1", func() (any, error) {
|
||||||
|
loads.Add(1)
|
||||||
|
return map[string]string{"unexpected": "value"}, nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, hit)
|
||||||
|
require.Equal(t, entry.ETag, entry2.ETag)
|
||||||
|
require.Equal(t, int32(1), loads.Load())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSnapshotCache_GetOrLoad_ConcurrentSingleflight(t *testing.T) {
|
||||||
|
c := newSnapshotCache(5 * time.Second)
|
||||||
|
var loads atomic.Int32
|
||||||
|
start := make(chan struct{})
|
||||||
|
const callers = 8
|
||||||
|
errCh := make(chan error, callers)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(callers)
|
||||||
|
for range callers {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
<-start
|
||||||
|
_, _, err := c.GetOrLoad("shared", func() (any, error) {
|
||||||
|
loads.Add(1)
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
return "value", nil
|
||||||
|
})
|
||||||
|
errCh <- err
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
close(start)
|
||||||
|
wg.Wait()
|
||||||
|
close(errCh)
|
||||||
|
|
||||||
|
for err := range errCh {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, int32(1), loads.Load())
|
||||||
|
}
|
||||||
|
|
||||||
func TestParseBoolQueryWithDefault(t *testing.T) {
|
func TestParseBoolQueryWithDefault(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
|||||||
Reference in New Issue
Block a user