Reduce ops and scheduler write amplification

ius
2026-03-11 17:32:00 +08:00
parent 7455476c60
commit f740d2c291
8 changed files with 499 additions and 106 deletions

View File

@@ -7,6 +7,7 @@ import (
type OpsRepository interface {
	InsertErrorLog(ctx context.Context, input *OpsInsertErrorLogInput) (int64, error)
	BatchInsertErrorLogs(ctx context.Context, inputs []*OpsInsertErrorLogInput) (int64, error)
	ListErrorLogs(ctx context.Context, filter *OpsErrorLogFilter) (*OpsErrorLogList, error)
	GetErrorLogByID(ctx context.Context, id int64) (*OpsErrorLogDetail, error)
	ListRequestDetails(ctx context.Context, filter *OpsRequestDetailFilter) ([]*OpsRequestDetail, int64, error)
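
The repository implementation behind BatchInsertErrorLogs is in one of the files not expanded on this page. As a point of reference, here is a minimal sketch of how it could be satisfied with a single multi-row INSERT; the opsRepo struct, column list, and `?` placeholder style are assumptions for illustration (only the ops_error_logs table name appears elsewhere in this diff), while the ErrorPhase/ErrorType/CreatedAt fields are taken from the tests below.

import (
	"context"
	"database/sql"
	"strings"
)

// opsRepo is a hypothetical implementation; this commit page only shows the interface.
type opsRepo struct {
	db *sql.DB
}

// BatchInsertErrorLogs writes N error logs in one round-trip instead of N,
// which is the write-amplification reduction the commit title refers to.
func (r *opsRepo) BatchInsertErrorLogs(ctx context.Context, inputs []*OpsInsertErrorLogInput) (int64, error) {
	if len(inputs) == 0 {
		return 0, nil
	}
	placeholders := make([]string, 0, len(inputs))
	args := make([]any, 0, len(inputs)*3)
	for _, in := range inputs {
		placeholders = append(placeholders, "(?, ?, ?)")
		args = append(args, in.ErrorPhase, in.ErrorType, in.CreatedAt)
	}
	// Assumed column names; the real schema is not shown in this diff.
	query := "INSERT INTO ops_error_logs (error_phase, error_type, created_at) VALUES " +
		strings.Join(placeholders, ", ")
	res, err := r.db.ExecContext(ctx, query, args...)
	if err != nil {
		return 0, err
	}
	return res.RowsAffected()
}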

View File

@@ -7,6 +7,8 @@ import (
// opsRepoMock is a test-only OpsRepository implementation with optional function hooks.
type opsRepoMock struct {
	InsertErrorLogFn        func(ctx context.Context, input *OpsInsertErrorLogInput) (int64, error)
	BatchInsertErrorLogsFn  func(ctx context.Context, inputs []*OpsInsertErrorLogInput) (int64, error)
	BatchInsertSystemLogsFn func(ctx context.Context, inputs []*OpsInsertSystemLogInput) (int64, error)
	ListSystemLogsFn        func(ctx context.Context, filter *OpsSystemLogFilter) (*OpsSystemLogList, error)
	DeleteSystemLogsFn      func(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error)
@@ -14,9 +16,19 @@ type opsRepoMock struct {
}

func (m *opsRepoMock) InsertErrorLog(ctx context.Context, input *OpsInsertErrorLogInput) (int64, error) {
	if m.InsertErrorLogFn != nil {
		return m.InsertErrorLogFn(ctx, input)
	}
	return 0, nil
}

func (m *opsRepoMock) BatchInsertErrorLogs(ctx context.Context, inputs []*OpsInsertErrorLogInput) (int64, error) {
	if m.BatchInsertErrorLogsFn != nil {
		return m.BatchInsertErrorLogsFn(ctx, inputs)
	}
	return int64(len(inputs)), nil
}

func (m *opsRepoMock) ListErrorLogs(ctx context.Context, filter *OpsErrorLogFilter) (*OpsErrorLogList, error) {
	return &OpsErrorLogList{Errors: []*OpsErrorLog{}, Page: 1, PageSize: 20}, nil
}

View File

@@ -121,14 +121,74 @@ func (s *OpsService) IsMonitoringEnabled(ctx context.Context) bool {
}
func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogInput, rawRequestBody []byte) error {
	prepared, ok, err := s.prepareErrorLogInput(ctx, entry, rawRequestBody)
	if err != nil {
		log.Printf("[Ops] RecordError prepare failed: %v", err)
		return err
	}
	if !ok {
		return nil
	}
	if _, err := s.opsRepo.InsertErrorLog(ctx, prepared); err != nil {
		// Never bubble up to gateway; best-effort logging.
		log.Printf("[Ops] RecordError failed: %v", err)
		return err
	}
	return nil
}

func (s *OpsService) RecordErrorBatch(ctx context.Context, entries []*OpsInsertErrorLogInput) error {
	if len(entries) == 0 {
		return nil
	}
	prepared := make([]*OpsInsertErrorLogInput, 0, len(entries))
	for _, entry := range entries {
		item, ok, err := s.prepareErrorLogInput(ctx, entry, nil)
		if err != nil {
			log.Printf("[Ops] RecordErrorBatch prepare failed: %v", err)
			continue
		}
		if ok {
			prepared = append(prepared, item)
		}
	}
	if len(prepared) == 0 {
		return nil
	}
	if len(prepared) == 1 {
		_, err := s.opsRepo.InsertErrorLog(ctx, prepared[0])
		if err != nil {
			log.Printf("[Ops] RecordErrorBatch single insert failed: %v", err)
		}
		return err
	}
	if _, err := s.opsRepo.BatchInsertErrorLogs(ctx, prepared); err != nil {
		log.Printf("[Ops] RecordErrorBatch failed, fallback to single inserts: %v", err)
		var firstErr error
		for _, entry := range prepared {
			if _, insertErr := s.opsRepo.InsertErrorLog(ctx, entry); insertErr != nil {
				log.Printf("[Ops] RecordErrorBatch fallback insert failed: %v", insertErr)
				if firstErr == nil {
					firstErr = insertErr
				}
			}
		}
		return firstErr
	}
	return nil
}

func (s *OpsService) prepareErrorLogInput(ctx context.Context, entry *OpsInsertErrorLogInput, rawRequestBody []byte) (*OpsInsertErrorLogInput, bool, error) {
	if entry == nil {
		return nil, false, nil
	}
	if !s.IsMonitoringEnabled(ctx) {
		return nil, false, nil
	}
	if s.opsRepo == nil {
		return nil, false, nil
	}
	// Ensure timestamps are always populated.
@@ -185,85 +245,88 @@ func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogIn
}
}
	// Sanitize + serialize upstream error events list.
	if err := sanitizeOpsUpstreamErrors(entry); err != nil {
		return nil, false, err
	}
	return entry, true, nil
}

func sanitizeOpsUpstreamErrors(entry *OpsInsertErrorLogInput) error {
	if entry == nil || len(entry.UpstreamErrors) == 0 {
		return nil
	}
	const maxEvents = 32
	events := entry.UpstreamErrors
	if len(events) > maxEvents {
		events = events[len(events)-maxEvents:]
	}
	sanitized := make([]*OpsUpstreamErrorEvent, 0, len(events))
	for _, ev := range events {
		if ev == nil {
			continue
		}
		out := *ev
		out.Platform = strings.TrimSpace(out.Platform)
		out.UpstreamRequestID = truncateString(strings.TrimSpace(out.UpstreamRequestID), 128)
		out.Kind = truncateString(strings.TrimSpace(out.Kind), 64)
		if out.AccountID < 0 {
			out.AccountID = 0
		}
		if out.UpstreamStatusCode < 0 {
			out.UpstreamStatusCode = 0
		}
		if out.AtUnixMs < 0 {
			out.AtUnixMs = 0
		}
		msg := sanitizeUpstreamErrorMessage(strings.TrimSpace(out.Message))
		msg = truncateString(msg, 2048)
		out.Message = msg
		detail := strings.TrimSpace(out.Detail)
		if detail != "" {
			// Keep upstream detail small; request bodies are not stored here, only upstream error payloads.
			sanitizedDetail, _ := sanitizeErrorBodyForStorage(detail, opsMaxStoredErrorBodyBytes)
			out.Detail = sanitizedDetail
		} else {
			out.Detail = ""
		}
		out.UpstreamRequestBody = strings.TrimSpace(out.UpstreamRequestBody)
		if out.UpstreamRequestBody != "" {
			// Reuse the same sanitization/trimming strategy as request body storage.
			// Keep it small so it is safe to persist in ops_error_logs JSON.
			sanitizedBody, truncated, _ := sanitizeAndTrimRequestBody([]byte(out.UpstreamRequestBody), 10*1024)
			if sanitizedBody != "" {
				out.UpstreamRequestBody = sanitizedBody
				if truncated {
					out.Kind = strings.TrimSpace(out.Kind)
					if out.Kind == "" {
						out.Kind = "upstream"
					}
					out.Kind = out.Kind + ":request_body_truncated"
				}
			} else {
				out.UpstreamRequestBody = ""
			}
		}
		// Drop fully-empty events (can happen if only status code was known).
		if out.UpstreamStatusCode == 0 && out.Message == "" && out.Detail == "" {
			continue
		}
		evCopy := out
		sanitized = append(sanitized, &evCopy)
	}
	entry.UpstreamErrorsJSON = marshalOpsUpstreamErrors(sanitized)
	entry.UpstreamErrors = nil
	return nil
}
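
The scheduler-side callers mentioned in the commit title are in files not expanded on this page. To make the write-amplification angle concrete, here is a minimal sketch of the buffering pattern RecordErrorBatch enables; the channel feed, batch size, and flush interval are illustrative assumptions, not code from this commit.

import (
	"context"
	"time"
)

// flushErrorLogs is a hypothetical caller: it accumulates error entries and
// flushes them as one batched write instead of one INSERT per error.
func flushErrorLogs(ctx context.Context, svc *OpsService, in <-chan *OpsInsertErrorLogInput) {
	const (
		maxBatch      = 100             // assumed size threshold
		flushInterval = 2 * time.Second // assumed flush cadence
	)
	buf := make([]*OpsInsertErrorLogInput, 0, maxBatch)
	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	flush := func() {
		if len(buf) == 0 {
			return
		}
		// RecordErrorBatch consumes the entries synchronously (errors are
		// logged internally), so the buffer can be reused afterwards.
		_ = svc.RecordErrorBatch(ctx, buf)
		buf = buf[:0]
	}
	for {
		select {
		case entry, ok := <-in:
			if !ok {
				flush()
				return
			}
			buf = append(buf, entry)
			if len(buf) >= maxBatch {
				flush()
			}
		case <-ticker.C:
			flush()
		case <-ctx.Done():
			flush()
			return
		}
	}
}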

View File

@@ -0,0 +1,103 @@
package service

import (
	"context"
	"errors"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestOpsServiceRecordErrorBatch_SanitizesAndBatches(t *testing.T) {
	t.Parallel()
	var captured []*OpsInsertErrorLogInput
	repo := &opsRepoMock{
		BatchInsertErrorLogsFn: func(ctx context.Context, inputs []*OpsInsertErrorLogInput) (int64, error) {
			captured = append(captured, inputs...)
			return int64(len(inputs)), nil
		},
	}
	svc := NewOpsService(repo, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
	msg := " upstream failed: https://example.com?access_token=secret-value "
	detail := `{"authorization":"Bearer secret-token"}`
	entries := []*OpsInsertErrorLogInput{
		{
			ErrorBody:            `{"error":"bad","access_token":"secret"}`,
			UpstreamStatusCode:   intPtr(-10),
			UpstreamErrorMessage: strPtr(msg),
			UpstreamErrorDetail:  strPtr(detail),
			UpstreamErrors: []*OpsUpstreamErrorEvent{
				{
					AccountID:           -2,
					UpstreamStatusCode:  429,
					Message:             " token leaked ",
					Detail:              `{"refresh_token":"secret"}`,
					UpstreamRequestBody: `{"api_key":"secret","messages":[{"role":"user","content":"hello"}]}`,
				},
			},
		},
		{
			ErrorPhase: "upstream",
			ErrorType:  "upstream_error",
			CreatedAt:  time.Now().UTC(),
		},
	}
	require.NoError(t, svc.RecordErrorBatch(context.Background(), entries))
	require.Len(t, captured, 2)
	first := captured[0]
	require.Equal(t, "internal", first.ErrorPhase)
	require.Equal(t, "api_error", first.ErrorType)
	require.Nil(t, first.UpstreamStatusCode)
	require.NotNil(t, first.UpstreamErrorMessage)
	require.NotContains(t, *first.UpstreamErrorMessage, "secret-value")
	require.Contains(t, *first.UpstreamErrorMessage, "access_token=***")
	require.NotNil(t, first.UpstreamErrorDetail)
	require.NotContains(t, *first.UpstreamErrorDetail, "secret-token")
	require.NotContains(t, first.ErrorBody, "secret")
	require.Nil(t, first.UpstreamErrors)
	require.NotNil(t, first.UpstreamErrorsJSON)
	require.NotContains(t, *first.UpstreamErrorsJSON, "secret")
	require.Contains(t, *first.UpstreamErrorsJSON, "[REDACTED]")
	second := captured[1]
	require.Equal(t, "upstream", second.ErrorPhase)
	require.Equal(t, "upstream_error", second.ErrorType)
	require.False(t, second.CreatedAt.IsZero())
}

func TestOpsServiceRecordErrorBatch_FallsBackToSingleInsert(t *testing.T) {
	t.Parallel()
	var (
		batchCalls  int
		singleCalls int
	)
	repo := &opsRepoMock{
		BatchInsertErrorLogsFn: func(ctx context.Context, inputs []*OpsInsertErrorLogInput) (int64, error) {
			batchCalls++
			return 0, errors.New("batch failed")
		},
		InsertErrorLogFn: func(ctx context.Context, input *OpsInsertErrorLogInput) (int64, error) {
			singleCalls++
			return int64(singleCalls), nil
		},
	}
	svc := NewOpsService(repo, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
	err := svc.RecordErrorBatch(context.Background(), []*OpsInsertErrorLogInput{
		{ErrorMessage: "first"},
		{ErrorMessage: "second"},
	})
	require.NoError(t, err)
	require.Equal(t, 1, batchCalls)
	require.Equal(t, 2, singleCalls)
}

func strPtr(v string) *string {
	return &v
}