fix: 修复流式响应缓冲问题,实现真正的实时流传输

- 配置 compression 中间件排除 SSE 流式响应,避免压缩导致的缓冲
- 添加 X-Accel-Buffering: no 响应头,禁用 Nginx 等代理的缓冲
- 使用 res.flushHeaders() 立即发送响应头
- 禁用 Nagle 算法确保数据立即发送
- 在每次写入流数据后调用 flush() 确保实时传输

这些修复确保了流式请求能够正常显示打字机效果,数据从上游 Claude API 接收后能够立即转发给客户端。

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
shaw
2025-07-23 15:56:27 +08:00
parent 2f4730baba
commit 1e372dd365
3 changed files with 79 additions and 39 deletions

View File

@@ -64,8 +64,17 @@ class Application {
this.app.use(corsMiddleware); this.app.use(corsMiddleware);
} }
// 📦 压缩 // 📦 压缩 - 排除流式响应SSE
this.app.use(compression()); this.app.use(compression({
filter: (req, res) => {
// 不压缩 Server-Sent Events
if (res.getHeader('Content-Type') === 'text/event-stream') {
return false;
}
// 使用默认的压缩判断
return compression.filter(req, res);
}
}));
// 🚦 全局速率限制(仅在生产环境启用) // 🚦 全局速率限制(仅在生产环境启用)
if (process.env.NODE_ENV === 'production') { if (process.env.NODE_ENV === 'production') {

View File

@@ -45,6 +45,15 @@ async function handleMessagesRequest(req, res) {
res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive'); res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('X-Accel-Buffering', 'no'); // 禁用 Nginx 缓冲
// 立即发送响应头,防止缓冲
res.flushHeaders();
// 禁用 Nagle 算法,确保数据立即发送
if (res.socket && res.socket.setNoDelay) {
res.socket.setNoDelay(true);
}
// 流式响应不需要额外处理,中间件已经设置了监听器 // 流式响应不需要额外处理,中间件已经设置了监听器

View File

@@ -36,17 +36,22 @@ class ClaudeRelayService {
_hasClaudeCodeSystemPrompt(requestBody) { _hasClaudeCodeSystemPrompt(requestBody) {
if (!requestBody || !requestBody.system) return false; if (!requestBody || !requestBody.system) return false;
let systemText = ''; // 如果是字符串格式,一定不是真实的 Claude Code 请求
if (typeof requestBody.system === 'string') { if (typeof requestBody.system === 'string') {
systemText = requestBody.system; return false;
} else if (Array.isArray(requestBody.system)) { }
systemText = requestBody.system
.filter(item => item && item.type === 'text' && item.text) // 处理数组格式
.map(item => item.text) if (Array.isArray(requestBody.system) && requestBody.system.length > 0) {
.join(' '); const firstItem = requestBody.system[0];
// 检查第一个元素是否包含 Claude Code 提示词
return firstItem &&
firstItem.type === 'text' &&
firstItem.text &&
firstItem.text === this.claudeCodeSystemPrompt;
} }
return systemText.includes(this.claudeCodeSystemPrompt); return false;
} }
// 🚀 转发请求到Claude API // 🚀 转发请求到Claude API
@@ -203,24 +208,47 @@ class ClaudeRelayService {
if (!isRealClaudeCode) { if (!isRealClaudeCode) {
const claudeCodePrompt = { const claudeCodePrompt = {
type: 'text', type: 'text',
text: this.claudeCodeSystemPrompt text: this.claudeCodeSystemPrompt,
cache_control: {
type: 'ephemeral'
}
}; };
if (processedBody.system) { if (processedBody.system) {
if (Array.isArray(processedBody.system)) { if (typeof processedBody.system === 'string') {
// 检查是否已经有 Claude Code 系统提示词 // 字符串格式:转换为数组,Claude Code 提示词在第一位
const hasClaudeCodePrompt = processedBody.system.some(item => const userSystemPrompt = {
item && item.text && item.text.includes(this.claudeCodeSystemPrompt) type: 'text',
); text: processedBody.system
};
// 如果用户的提示词与 Claude Code 提示词相同,只保留一个
if (processedBody.system.trim() === this.claudeCodeSystemPrompt) {
processedBody.system = [claudeCodePrompt];
} else {
processedBody.system = [claudeCodePrompt, userSystemPrompt];
}
} else if (Array.isArray(processedBody.system)) {
// 检查第一个元素是否是 Claude Code 系统提示词
const firstItem = processedBody.system[0];
const isFirstItemClaudeCode = firstItem &&
firstItem.type === 'text' &&
firstItem.text === this.claudeCodeSystemPrompt;
if (!hasClaudeCodePrompt) { if (!isFirstItemClaudeCode) {
// 添加 Claude Code 系统提示词到开头 // 如果第一个不是 Claude Code 提示词,需要在开头插入
processedBody.system.unshift(claudeCodePrompt); // 同时检查数组中是否有其他位置包含 Claude Code 提示词,如果有则移除
const filteredSystem = processedBody.system.filter(item =>
!(item && item.type === 'text' && item.text === this.claudeCodeSystemPrompt)
);
processedBody.system = [claudeCodePrompt, ...filteredSystem];
} }
} else { } else {
throw new Error('system field must be an array'); // 其他格式,记录警告但不抛出错误,尝试处理
logger.warn('⚠️ Unexpected system field type:', typeof processedBody.system);
processedBody.system = [claudeCodePrompt];
} }
} else { } else {
// 用户没有传递 system需要添加 Claude Code 提示词
processedBody.system = [claudeCodePrompt]; processedBody.system = [claudeCodePrompt];
} }
} }
@@ -232,27 +260,17 @@ class ClaudeRelayService {
text: this.systemPrompt text: this.systemPrompt
}; };
if (processedBody.system) { // 经过上面的处理system 现在应该总是数组格式
if (Array.isArray(processedBody.system)) { if (processedBody.system && Array.isArray(processedBody.system)) {
// 如果system数组存在但为空或者没有有效内容则添加系统提示 // 不要重复添加相同的系统提示
const hasValidContent = processedBody.system.some(item => const hasSystemPrompt = processedBody.system.some(item =>
item && item.text && item.text.trim() item && item.text && item.text === this.systemPrompt
); );
if (!hasValidContent) { if (!hasSystemPrompt) {
processedBody.system = [systemPrompt]; processedBody.system.push(systemPrompt);
} else {
// 不要重复添加相同的系统提示
const hasSystemPrompt = processedBody.system.some(item =>
item && item.text && item.text === this.systemPrompt
);
if (!hasSystemPrompt) {
processedBody.system.push(systemPrompt);
}
}
} else {
throw new Error('system field must be an array');
} }
} else { } else {
// 理论上不应该走到这里,但为了安全起见
processedBody.system = [systemPrompt]; processedBody.system = [systemPrompt];
} }
} else { } else {
@@ -691,9 +709,13 @@ class ClaudeRelayService {
const transformed = streamTransformer(linesToForward); const transformed = streamTransformer(linesToForward);
if (transformed) { if (transformed) {
responseStream.write(transformed); responseStream.write(transformed);
// 立即刷新数据,确保实时发送
if (responseStream.flush) responseStream.flush();
} }
} else { } else {
responseStream.write(linesToForward); responseStream.write(linesToForward);
// 立即刷新数据,确保实时发送
if (responseStream.flush) responseStream.flush();
} }
} }