mirror of
https://github.com/Wei-Shaw/claude-relay-service.git
synced 2026-01-22 16:43:35 +00:00
fix: 修复流式响应的 Parse Error 和缓冲问题
主要修改: 1. 从 compression 中间件中排除 SSE 流式响应,避免压缩导致的缓冲 2. 移除导致 Parse Error 的 res.flushHeaders() 调用 3. 改进流式响应的错误处理,发送 SSE 错误事件而不是破坏流 4. 在写入数据前检查流状态,避免写入已销毁的流 5. 优化响应结束时的处理逻辑,确保缓冲区数据正确处理 这些修改确保了流式请求能够正常显示打字机效果,同时保留了 usage token 收集功能。 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -47,11 +47,8 @@ async function handleMessagesRequest(req, res) {
|
|||||||
res.setHeader('Access-Control-Allow-Origin', '*');
|
res.setHeader('Access-Control-Allow-Origin', '*');
|
||||||
res.setHeader('X-Accel-Buffering', 'no'); // 禁用 Nginx 缓冲
|
res.setHeader('X-Accel-Buffering', 'no'); // 禁用 Nginx 缓冲
|
||||||
|
|
||||||
// 立即发送响应头,防止缓冲
|
|
||||||
res.flushHeaders();
|
|
||||||
|
|
||||||
// 禁用 Nagle 算法,确保数据立即发送
|
// 禁用 Nagle 算法,确保数据立即发送
|
||||||
if (res.socket && res.socket.setNoDelay) {
|
if (res.socket && typeof res.socket.setNoDelay === 'function') {
|
||||||
res.socket.setNoDelay(true);
|
res.socket.setNoDelay(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -680,11 +680,34 @@ class ClaudeRelayService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const req = https.request(options, (res) => {
|
const req = https.request(options, (res) => {
|
||||||
// 设置响应头
|
logger.debug(`🌊 Claude stream response status: ${res.statusCode}`);
|
||||||
responseStream.statusCode = res.statusCode;
|
|
||||||
Object.keys(res.headers).forEach(key => {
|
// 错误响应处理
|
||||||
responseStream.setHeader(key, res.headers[key]);
|
if (res.statusCode !== 200) {
|
||||||
});
|
logger.error(`❌ Claude API returned error status: ${res.statusCode}`);
|
||||||
|
let errorData = '';
|
||||||
|
|
||||||
|
res.on('data', (chunk) => {
|
||||||
|
errorData += chunk.toString();
|
||||||
|
});
|
||||||
|
|
||||||
|
res.on('end', () => {
|
||||||
|
logger.error('❌ Claude API error response:', errorData);
|
||||||
|
if (!responseStream.destroyed) {
|
||||||
|
// 发送错误事件
|
||||||
|
responseStream.write('event: error\n');
|
||||||
|
responseStream.write(`data: ${JSON.stringify({
|
||||||
|
error: 'Claude API error',
|
||||||
|
status: res.statusCode,
|
||||||
|
details: errorData,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
})}\n\n`);
|
||||||
|
responseStream.end();
|
||||||
|
}
|
||||||
|
reject(new Error(`Claude API error: ${res.statusCode}`));
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
let buffer = '';
|
let buffer = '';
|
||||||
let finalUsageReported = false; // 防止重复统计的标志
|
let finalUsageReported = false; // 防止重复统计的标志
|
||||||
@@ -693,31 +716,28 @@ class ClaudeRelayService {
|
|||||||
|
|
||||||
// 监听数据块,解析SSE并寻找usage信息
|
// 监听数据块,解析SSE并寻找usage信息
|
||||||
res.on('data', (chunk) => {
|
res.on('data', (chunk) => {
|
||||||
const chunkStr = chunk.toString();
|
try {
|
||||||
|
const chunkStr = chunk.toString();
|
||||||
buffer += chunkStr;
|
|
||||||
|
buffer += chunkStr;
|
||||||
// 处理完整的SSE行
|
|
||||||
const lines = buffer.split('\n');
|
// 处理完整的SSE行
|
||||||
buffer = lines.pop() || ''; // 保留最后的不完整行
|
const lines = buffer.split('\n');
|
||||||
|
buffer = lines.pop() || ''; // 保留最后的不完整行
|
||||||
// 转发已处理的完整行到客户端
|
|
||||||
if (lines.length > 0) {
|
// 转发已处理的完整行到客户端
|
||||||
const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '');
|
if (lines.length > 0 && !responseStream.destroyed) {
|
||||||
// 如果有流转换器,应用转换
|
const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '');
|
||||||
if (streamTransformer) {
|
// 如果有流转换器,应用转换
|
||||||
const transformed = streamTransformer(linesToForward);
|
if (streamTransformer) {
|
||||||
if (transformed) {
|
const transformed = streamTransformer(linesToForward);
|
||||||
responseStream.write(transformed);
|
if (transformed) {
|
||||||
// 立即刷新数据,确保实时发送
|
responseStream.write(transformed);
|
||||||
if (responseStream.flush) responseStream.flush();
|
}
|
||||||
|
} else {
|
||||||
|
responseStream.write(linesToForward);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
responseStream.write(linesToForward);
|
|
||||||
// 立即刷新数据,确保实时发送
|
|
||||||
if (responseStream.flush) responseStream.flush();
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
for (const line of lines) {
|
for (const line of lines) {
|
||||||
// 解析SSE数据寻找usage信息
|
// 解析SSE数据寻找usage信息
|
||||||
@@ -764,21 +784,41 @@ class ClaudeRelayService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('❌ Error processing stream data:', error);
|
||||||
|
// 发送错误但不破坏流,让它自然结束
|
||||||
|
if (!responseStream.destroyed) {
|
||||||
|
responseStream.write('event: error\n');
|
||||||
|
responseStream.write(`data: ${JSON.stringify({
|
||||||
|
error: 'Stream processing error',
|
||||||
|
message: error.message,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
})}\n\n`);
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
res.on('end', async () => {
|
res.on('end', async () => {
|
||||||
// 处理缓冲区中剩余的数据
|
try {
|
||||||
if (buffer.trim()) {
|
// 处理缓冲区中剩余的数据
|
||||||
if (streamTransformer) {
|
if (buffer.trim() && !responseStream.destroyed) {
|
||||||
const transformed = streamTransformer(buffer);
|
if (streamTransformer) {
|
||||||
if (transformed) {
|
const transformed = streamTransformer(buffer);
|
||||||
responseStream.write(transformed);
|
if (transformed) {
|
||||||
|
responseStream.write(transformed);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
responseStream.write(buffer);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
responseStream.write(buffer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 确保流正确结束
|
||||||
|
if (!responseStream.destroyed) {
|
||||||
|
responseStream.end();
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('❌ Error processing stream end:', error);
|
||||||
}
|
}
|
||||||
responseStream.end();
|
|
||||||
|
|
||||||
// 检查是否捕获到usage数据
|
// 检查是否捕获到usage数据
|
||||||
if (!finalUsageReported) {
|
if (!finalUsageReported) {
|
||||||
|
|||||||
Reference in New Issue
Block a user