diff --git a/src/routes/api.js b/src/routes/api.js index ddf92714..8f38347b 100644 --- a/src/routes/api.js +++ b/src/routes/api.js @@ -47,11 +47,8 @@ async function handleMessagesRequest(req, res) { res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('X-Accel-Buffering', 'no'); // 禁用 Nginx 缓冲 - // 立即发送响应头,防止缓冲 - res.flushHeaders(); - // 禁用 Nagle 算法,确保数据立即发送 - if (res.socket && res.socket.setNoDelay) { + if (res.socket && typeof res.socket.setNoDelay === 'function') { res.socket.setNoDelay(true); } diff --git a/src/services/claudeRelayService.js b/src/services/claudeRelayService.js index fbf15247..76585547 100644 --- a/src/services/claudeRelayService.js +++ b/src/services/claudeRelayService.js @@ -680,11 +680,34 @@ class ClaudeRelayService { } const req = https.request(options, (res) => { - // 设置响应头 - responseStream.statusCode = res.statusCode; - Object.keys(res.headers).forEach(key => { - responseStream.setHeader(key, res.headers[key]); - }); + logger.debug(`🌊 Claude stream response status: ${res.statusCode}`); + + // 错误响应处理 + if (res.statusCode !== 200) { + logger.error(`❌ Claude API returned error status: ${res.statusCode}`); + let errorData = ''; + + res.on('data', (chunk) => { + errorData += chunk.toString(); + }); + + res.on('end', () => { + logger.error('❌ Claude API error response:', errorData); + if (!responseStream.destroyed) { + // 发送错误事件 + responseStream.write('event: error\n'); + responseStream.write(`data: ${JSON.stringify({ + error: 'Claude API error', + status: res.statusCode, + details: errorData, + timestamp: new Date().toISOString() + })}\n\n`); + responseStream.end(); + } + reject(new Error(`Claude API error: ${res.statusCode}`)); + }); + return; + } let buffer = ''; let finalUsageReported = false; // 防止重复统计的标志 @@ -693,31 +716,28 @@ class ClaudeRelayService { // 监听数据块,解析SSE并寻找usage信息 res.on('data', (chunk) => { - const chunkStr = chunk.toString(); - - buffer += chunkStr; - - // 处理完整的SSE行 - const lines = buffer.split('\n'); - buffer = lines.pop() || ''; // 保留最后的不完整行 - - // 转发已处理的完整行到客户端 - if (lines.length > 0) { - const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : ''); - // 如果有流转换器,应用转换 - if (streamTransformer) { - const transformed = streamTransformer(linesToForward); - if (transformed) { - responseStream.write(transformed); - // 立即刷新数据,确保实时发送 - if (responseStream.flush) responseStream.flush(); + try { + const chunkStr = chunk.toString(); + + buffer += chunkStr; + + // 处理完整的SSE行 + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; // 保留最后的不完整行 + + // 转发已处理的完整行到客户端 + if (lines.length > 0 && !responseStream.destroyed) { + const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : ''); + // 如果有流转换器,应用转换 + if (streamTransformer) { + const transformed = streamTransformer(linesToForward); + if (transformed) { + responseStream.write(transformed); + } + } else { + responseStream.write(linesToForward); } - } else { - responseStream.write(linesToForward); - // 立即刷新数据,确保实时发送 - if (responseStream.flush) responseStream.flush(); } - } for (const line of lines) { // 解析SSE数据寻找usage信息 @@ -764,21 +784,41 @@ class ClaudeRelayService { } } } + } catch (error) { + logger.error('❌ Error processing stream data:', error); + // 发送错误但不破坏流,让它自然结束 + if (!responseStream.destroyed) { + responseStream.write('event: error\n'); + responseStream.write(`data: ${JSON.stringify({ + error: 'Stream processing error', + message: error.message, + timestamp: new Date().toISOString() + })}\n\n`); + } + } }); res.on('end', async () => { - // 处理缓冲区中剩余的数据 - if (buffer.trim()) { - if (streamTransformer) { - const transformed = streamTransformer(buffer); - if (transformed) { - responseStream.write(transformed); + try { + // 处理缓冲区中剩余的数据 + if (buffer.trim() && !responseStream.destroyed) { + if (streamTransformer) { + const transformed = streamTransformer(buffer); + if (transformed) { + responseStream.write(transformed); + } + } else { + responseStream.write(buffer); } - } else { - responseStream.write(buffer); } + + // 确保流正确结束 + if (!responseStream.destroyed) { + responseStream.end(); + } + } catch (error) { + logger.error('❌ Error processing stream end:', error); } - responseStream.end(); // 检查是否捕获到usage数据 if (!finalUsageReported) {