fix: patch message_delta usage via gjson/sjson and skip on passthrough

This commit is contained in:
Seefs
2026-02-07 19:13:58 +08:00
parent bbad917101
commit 0b3a0b38d6
3 changed files with 177 additions and 147 deletions

View File

@@ -0,0 +1,111 @@
package claude
import (
"testing"
"github.com/QuantumNous/new-api/dto"
relaycommon "github.com/QuantumNous/new-api/relay/common"
"github.com/QuantumNous/new-api/setting/model_setting"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
)
func TestPatchClaudeMessageDeltaUsageDataPreserveUnknownFields(t *testing.T) {
originalData := `{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":53},"vendor_meta":{"trace_id":"trace_001"}}`
usage := &dto.ClaudeUsage{
InputTokens: 100,
CacheReadInputTokens: 30,
CacheCreationInputTokens: 50,
}
patchedData := patchClaudeMessageDeltaUsageData(originalData, usage)
require.Equal(t, "message_delta", gjson.Get(patchedData, "type").String())
require.Equal(t, "end_turn", gjson.Get(patchedData, "delta.stop_reason").String())
require.Equal(t, "trace_001", gjson.Get(patchedData, "vendor_meta.trace_id").String())
require.EqualValues(t, 53, gjson.Get(patchedData, "usage.output_tokens").Int())
require.EqualValues(t, 100, gjson.Get(patchedData, "usage.input_tokens").Int())
require.EqualValues(t, 30, gjson.Get(patchedData, "usage.cache_read_input_tokens").Int())
require.EqualValues(t, 50, gjson.Get(patchedData, "usage.cache_creation_input_tokens").Int())
}
func TestPatchClaudeMessageDeltaUsageDataZeroValueChecks(t *testing.T) {
originalData := `{"type":"message_delta","usage":{"output_tokens":53,"input_tokens":9,"cache_read_input_tokens":0}}`
usage := &dto.ClaudeUsage{
InputTokens: 100,
CacheReadInputTokens: 30,
CacheCreationInputTokens: 0,
}
patchedData := patchClaudeMessageDeltaUsageData(originalData, usage)
require.EqualValues(t, 9, gjson.Get(patchedData, "usage.input_tokens").Int())
require.EqualValues(t, 30, gjson.Get(patchedData, "usage.cache_read_input_tokens").Int())
assert.False(t, gjson.Get(patchedData, "usage.cache_creation_input_tokens").Exists())
}
func TestShouldSkipClaudeMessageDeltaUsagePatch(t *testing.T) {
originGlobalPassThrough := model_setting.GetGlobalSettings().PassThroughRequestEnabled
t.Cleanup(func() {
model_setting.GetGlobalSettings().PassThroughRequestEnabled = originGlobalPassThrough
})
model_setting.GetGlobalSettings().PassThroughRequestEnabled = true
assert.True(t, shouldSkipClaudeMessageDeltaUsagePatch(&relaycommon.RelayInfo{}))
model_setting.GetGlobalSettings().PassThroughRequestEnabled = false
assert.True(t, shouldSkipClaudeMessageDeltaUsagePatch(&relaycommon.RelayInfo{
ChannelMeta: &relaycommon.ChannelMeta{ChannelSetting: dto.ChannelSettings{PassThroughBodyEnabled: true}},
}))
assert.False(t, shouldSkipClaudeMessageDeltaUsagePatch(&relaycommon.RelayInfo{
ChannelMeta: &relaycommon.ChannelMeta{ChannelSetting: dto.ChannelSettings{PassThroughBodyEnabled: false}},
}))
}
func TestBuildMessageDeltaPatchUsage(t *testing.T) {
t.Run("merge missing fields from claudeInfo", func(t *testing.T) {
claudeResponse := &dto.ClaudeResponse{Usage: &dto.ClaudeUsage{OutputTokens: 53}}
claudeInfo := &ClaudeResponseInfo{
Usage: &dto.Usage{
PromptTokens: 100,
PromptTokensDetails: dto.InputTokenDetails{
CachedTokens: 30,
CachedCreationTokens: 50,
},
ClaudeCacheCreation5mTokens: 10,
ClaudeCacheCreation1hTokens: 20,
},
}
usage := buildMessageDeltaPatchUsage(claudeResponse, claudeInfo)
require.NotNil(t, usage)
require.EqualValues(t, 100, usage.InputTokens)
require.EqualValues(t, 30, usage.CacheReadInputTokens)
require.EqualValues(t, 50, usage.CacheCreationInputTokens)
require.EqualValues(t, 53, usage.OutputTokens)
require.NotNil(t, usage.CacheCreation)
require.EqualValues(t, 10, usage.CacheCreation.Ephemeral5mInputTokens)
require.EqualValues(t, 20, usage.CacheCreation.Ephemeral1hInputTokens)
})
t.Run("keep upstream non-zero values", func(t *testing.T) {
claudeResponse := &dto.ClaudeResponse{Usage: &dto.ClaudeUsage{
InputTokens: 9,
CacheReadInputTokens: 7,
CacheCreationInputTokens: 6,
}}
claudeInfo := &ClaudeResponseInfo{Usage: &dto.Usage{
PromptTokens: 100,
PromptTokensDetails: dto.InputTokenDetails{
CachedTokens: 30,
CachedCreationTokens: 50,
},
}}
usage := buildMessageDeltaPatchUsage(claudeResponse, claudeInfo)
require.EqualValues(t, 9, usage.InputTokens)
require.EqualValues(t, 7, usage.CacheReadInputTokens)
require.EqualValues(t, 6, usage.CacheCreationInputTokens)
})
}

View File

@@ -21,6 +21,8 @@ import (
"github.com/QuantumNous/new-api/types"
"github.com/gin-gonic/gin"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
const (
@@ -544,28 +546,76 @@ type ClaudeResponseInfo struct {
Done bool
}
// enrichMessageDeltaUsage 补全 message_delta 事件中缺失的 input_tokens 和 cache 相关字段
// 当上游(如 AWS Bedrock的 message_delta 不包含这些字段时,从 claudeInfo 中积累的数据补全
func enrichMessageDeltaUsage(claudeResponse *dto.ClaudeResponse, claudeInfo *ClaudeResponseInfo) {
if claudeResponse.Usage == nil {
claudeResponse.Usage = &dto.ClaudeUsage{}
func buildMessageDeltaPatchUsage(claudeResponse *dto.ClaudeResponse, claudeInfo *ClaudeResponseInfo) *dto.ClaudeUsage {
usage := &dto.ClaudeUsage{}
if claudeResponse != nil && claudeResponse.Usage != nil {
*usage = *claudeResponse.Usage
}
if claudeResponse.Usage.InputTokens == 0 && claudeInfo.Usage.PromptTokens > 0 {
claudeResponse.Usage.InputTokens = claudeInfo.Usage.PromptTokens
if claudeInfo == nil || claudeInfo.Usage == nil {
return usage
}
if claudeResponse.Usage.CacheReadInputTokens == 0 && claudeInfo.Usage.PromptTokensDetails.CachedTokens > 0 {
claudeResponse.Usage.CacheReadInputTokens = claudeInfo.Usage.PromptTokensDetails.CachedTokens
if usage.InputTokens == 0 && claudeInfo.Usage.PromptTokens > 0 {
usage.InputTokens = claudeInfo.Usage.PromptTokens
}
if claudeResponse.Usage.CacheCreationInputTokens == 0 && claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens > 0 {
claudeResponse.Usage.CacheCreationInputTokens = claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens
if usage.CacheReadInputTokens == 0 && claudeInfo.Usage.PromptTokensDetails.CachedTokens > 0 {
usage.CacheReadInputTokens = claudeInfo.Usage.PromptTokensDetails.CachedTokens
}
if claudeResponse.Usage.CacheCreation == nil &&
(claudeInfo.Usage.ClaudeCacheCreation5mTokens > 0 || claudeInfo.Usage.ClaudeCacheCreation1hTokens > 0) {
claudeResponse.Usage.CacheCreation = &dto.ClaudeCacheCreationUsage{
if usage.CacheCreationInputTokens == 0 && claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens > 0 {
usage.CacheCreationInputTokens = claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens
}
if usage.CacheCreation == nil && (claudeInfo.Usage.ClaudeCacheCreation5mTokens > 0 || claudeInfo.Usage.ClaudeCacheCreation1hTokens > 0) {
usage.CacheCreation = &dto.ClaudeCacheCreationUsage{
Ephemeral5mInputTokens: claudeInfo.Usage.ClaudeCacheCreation5mTokens,
Ephemeral1hInputTokens: claudeInfo.Usage.ClaudeCacheCreation1hTokens,
}
}
return usage
}
func shouldSkipClaudeMessageDeltaUsagePatch(info *relaycommon.RelayInfo) bool {
if model_setting.GetGlobalSettings().PassThroughRequestEnabled {
return true
}
if info == nil {
return false
}
return info.ChannelSetting.PassThroughBodyEnabled
}
func patchClaudeMessageDeltaUsageData(data string, usage *dto.ClaudeUsage) string {
if data == "" || usage == nil {
return data
}
data = setMessageDeltaUsageInt(data, "usage.input_tokens", usage.InputTokens)
data = setMessageDeltaUsageInt(data, "usage.cache_read_input_tokens", usage.CacheReadInputTokens)
data = setMessageDeltaUsageInt(data, "usage.cache_creation_input_tokens", usage.CacheCreationInputTokens)
if usage.CacheCreation != nil {
data = setMessageDeltaUsageInt(data, "usage.cache_creation.ephemeral_5m_input_tokens", usage.CacheCreation.Ephemeral5mInputTokens)
data = setMessageDeltaUsageInt(data, "usage.cache_creation.ephemeral_1h_input_tokens", usage.CacheCreation.Ephemeral1hInputTokens)
}
return data
}
func setMessageDeltaUsageInt(data string, path string, localValue int) string {
if localValue <= 0 {
return data
}
upstreamValue := gjson.Get(data, path)
if upstreamValue.Exists() && upstreamValue.Int() > 0 {
return data
}
patchedData, err := sjson.Set(data, path, localValue)
if err != nil {
return data
}
return patchedData
}
func FormatClaudeResponseInfo(claudeResponse *dto.ClaudeResponse, oaiResponse *dto.ChatCompletionsStreamResponse, claudeInfo *ClaudeResponseInfo) bool {
@@ -665,9 +715,8 @@ func HandleStreamResponseData(c *gin.Context, info *relaycommon.RelayInfo, claud
} else if claudeResponse.Type == "message_delta" {
// 确保 message_delta 的 usage 包含完整的 input_tokens 和 cache 相关字段
// 解决 AWS Bedrock 等上游返回的 message_delta 缺少这些字段的问题
enrichMessageDeltaUsage(&claudeResponse, claudeInfo)
if newData, err := json.Marshal(claudeResponse); err == nil {
data = string(newData)
if !shouldSkipClaudeMessageDeltaUsagePatch(info) {
data = patchClaudeMessageDeltaUsageData(data, buildMessageDeltaPatchUsage(&claudeResponse, claudeInfo))
}
}
helper.ClaudeChunkData(c, claudeResponse, data)

View File

@@ -173,133 +173,3 @@ func TestFormatClaudeResponseInfo_ContentBlockDelta(t *testing.T) {
t.Errorf("ResponseText = %q, want %q", claudeInfo.ResponseText.String(), "hello")
}
}
// TestEnrichMessageDeltaUsage 测试 message_delta 事件的 usage 补全逻辑
// 这是修复 issue #2881 的核心逻辑:当上游(如 Bedrock的 message_delta 缺少
// input_tokens 和 cache 相关字段时,用 claudeInfo 中积累的数据补全
func TestEnrichMessageDeltaUsage(t *testing.T) {
tests := []struct {
name string
claudeInfo *ClaudeResponseInfo
deltaUsage *dto.ClaudeUsage
wantInput int
wantCacheRead int
wantCacheCreate int
wantOutput int
want5m int
want1h int
}{
{
name: "Bedrock: delta 只有 output_tokens从 claudeInfo 补全其他字段",
claudeInfo: &ClaudeResponseInfo{
Usage: &dto.Usage{
PromptTokens: 100,
PromptTokensDetails: dto.InputTokenDetails{
CachedTokens: 30,
CachedCreationTokens: 50,
},
ClaudeCacheCreation5mTokens: 10,
ClaudeCacheCreation1hTokens: 20,
},
},
deltaUsage: &dto.ClaudeUsage{OutputTokens: 200},
wantInput: 100,
wantCacheRead: 30,
wantCacheCreate: 50,
wantOutput: 200,
want5m: 10,
want1h: 20,
},
{
name: "原生 Anthropic: delta 已包含所有字段,不覆盖",
claudeInfo: &ClaudeResponseInfo{
Usage: &dto.Usage{
PromptTokens: 100,
PromptTokensDetails: dto.InputTokenDetails{
CachedTokens: 30,
CachedCreationTokens: 50,
},
},
},
deltaUsage: &dto.ClaudeUsage{
InputTokens: 100,
OutputTokens: 200,
CacheReadInputTokens: 30,
CacheCreationInputTokens: 50,
},
wantInput: 100,
wantCacheRead: 30,
wantCacheCreate: 50,
wantOutput: 200,
},
{
name: "delta usage 为 nil创建并补全",
claudeInfo: &ClaudeResponseInfo{
Usage: &dto.Usage{
PromptTokens: 80,
PromptTokensDetails: dto.InputTokenDetails{
CachedTokens: 20,
CachedCreationTokens: 40,
},
},
},
deltaUsage: nil,
wantInput: 80,
wantCacheRead: 20,
wantCacheCreate: 40,
wantOutput: 0,
},
{
name: "没有 cache 数据,不补全",
claudeInfo: &ClaudeResponseInfo{
Usage: &dto.Usage{
PromptTokens: 100,
},
},
deltaUsage: &dto.ClaudeUsage{OutputTokens: 50},
wantInput: 100,
wantCacheRead: 0,
wantCacheCreate: 0,
wantOutput: 50,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
claudeResponse := &dto.ClaudeResponse{
Type: "message_delta",
Usage: tt.deltaUsage,
}
// 模拟 HandleStreamResponseData 中 Claude 格式的补全逻辑
enrichMessageDeltaUsage(claudeResponse, tt.claudeInfo)
if claudeResponse.Usage == nil {
t.Fatal("Usage should not be nil after enrichment")
}
if claudeResponse.Usage.InputTokens != tt.wantInput {
t.Errorf("InputTokens = %d, want %d", claudeResponse.Usage.InputTokens, tt.wantInput)
}
if claudeResponse.Usage.CacheReadInputTokens != tt.wantCacheRead {
t.Errorf("CacheReadInputTokens = %d, want %d", claudeResponse.Usage.CacheReadInputTokens, tt.wantCacheRead)
}
if claudeResponse.Usage.CacheCreationInputTokens != tt.wantCacheCreate {
t.Errorf("CacheCreationInputTokens = %d, want %d", claudeResponse.Usage.CacheCreationInputTokens, tt.wantCacheCreate)
}
if claudeResponse.Usage.OutputTokens != tt.wantOutput {
t.Errorf("OutputTokens = %d, want %d", claudeResponse.Usage.OutputTokens, tt.wantOutput)
}
if tt.want5m > 0 || tt.want1h > 0 {
if claudeResponse.Usage.CacheCreation == nil {
t.Fatal("CacheCreation should not be nil")
}
if claudeResponse.Usage.CacheCreation.Ephemeral5mInputTokens != tt.want5m {
t.Errorf("Ephemeral5mInputTokens = %d, want %d", claudeResponse.Usage.CacheCreation.Ephemeral5mInputTokens, tt.want5m)
}
if claudeResponse.Usage.CacheCreation.Ephemeral1hInputTokens != tt.want1h {
t.Errorf("Ephemeral1hInputTokens = %d, want %d", claudeResponse.Usage.CacheCreation.Ephemeral1hInputTokens, tt.want1h)
}
}
})
}
}