mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-30 05:54:43 +00:00
fix(gateway): flush only pending delta tail before emitChatFinal
- Send text.slice(lastBroadcastLen) instead of full accumulated text - Prevents duplication for naive concat clients - Incorporates silent lead fragment + heartbeat streaming guards - Update test to assert on content correctness, not emission count
This commit is contained in:
@@ -548,7 +548,16 @@ describe("agent event handler", () => {
|
||||
|
||||
emitLifecycleEnd(handler, "run-heartbeat-alert");
|
||||
|
||||
const payload = expectSingleFinalChatPayload(broadcast) as {
|
||||
// The flush-before-final path may emit a delta with the pending tail
|
||||
// followed by the authoritative final. Assert on content, not count.
|
||||
const chatCalls = chatBroadcastCalls(broadcast);
|
||||
expect(chatCalls.length).toBeGreaterThanOrEqual(1);
|
||||
|
||||
const finalCall = chatCalls.find(
|
||||
(c) => (c[1] as { state?: string }).state === "final",
|
||||
);
|
||||
expect(finalCall).toBeDefined();
|
||||
const payload = finalCall![1] as {
|
||||
message?: { content?: Array<{ text?: string }> };
|
||||
};
|
||||
expect(payload.message?.content?.[0]?.text).toBe(
|
||||
|
||||
@@ -358,13 +358,24 @@ export function createAgentEventHandler({
|
||||
const text = normalizedHeartbeatText.text.trim();
|
||||
const shouldSuppressSilent =
|
||||
normalizedHeartbeatText.suppress || isSilentReplyText(text, SILENT_REPLY_TOKEN);
|
||||
const shouldSuppressSilentLeadFragment = isSilentReplyLeadFragment(text);
|
||||
const shouldSuppressHeartbeatStreaming = shouldHideHeartbeatChatOutput(
|
||||
clientRunId,
|
||||
sourceRunId,
|
||||
);
|
||||
// Flush any throttled delta so streaming clients receive the complete text
|
||||
// before the final event. The 150 ms throttle in emitChatDelta may have
|
||||
// suppressed the most recent chunk, leaving the client with stale text.
|
||||
// Only flush if the buffer has grown since the last broadcast to avoid duplicates.
|
||||
if (text && !shouldSuppressSilent) {
|
||||
if (
|
||||
text &&
|
||||
!shouldSuppressSilent &&
|
||||
!shouldSuppressSilentLeadFragment &&
|
||||
!shouldSuppressHeartbeatStreaming
|
||||
) {
|
||||
const lastBroadcastLen = chatRunState.deltaLastBroadcastLen.get(clientRunId) ?? 0;
|
||||
if (text.length > lastBroadcastLen) {
|
||||
const pendingTail = text.slice(lastBroadcastLen);
|
||||
const flushPayload = {
|
||||
runId: clientRunId,
|
||||
sessionKey,
|
||||
@@ -372,12 +383,13 @@ export function createAgentEventHandler({
|
||||
state: "delta" as const,
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text }],
|
||||
content: [{ type: "text", text: pendingTail }],
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
};
|
||||
broadcast("chat", flushPayload, { dropIfSlow: true });
|
||||
nodeSendToSession(sessionKey, "chat", flushPayload);
|
||||
chatRunState.deltaLastBroadcastLen.set(clientRunId, text.length);
|
||||
}
|
||||
}
|
||||
chatRunState.deltaLastBroadcastLen.delete(clientRunId);
|
||||
|
||||
Reference in New Issue
Block a user