mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-02 13:47:15 +00:00
fix(slack-stream): re-deliver full accumulated text on mid-stream failure
When appendSlackStream throws for a later payload, the fallback was calling
deliverNormally(payload, ...) with only the current chunk — dropping all text
from earlier payloads that was already live in the stream message.
dispatchReplyFromConfig can emit multiple final payloads per turn (it
iterates the replies array), so a mid-stream Slack API error could silently
truncate the visible answer.
Fix: accumulate all successfully-streamed text in streamedText (updated after
each successful startSlackStream / appendSlackStream). On failure, re-deliver
{ ...payload, text: streamedText + current chunk } so the user always gets
the complete content. The finalizer fallback (stopSlackStream failure) also
uses streamedText for the same reason.
This commit is contained in:
@@ -222,6 +222,11 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
|||||||
});
|
});
|
||||||
let streamSession: SlackStreamSession | null = null;
|
let streamSession: SlackStreamSession | null = null;
|
||||||
let streamFailed = false;
|
let streamFailed = false;
|
||||||
|
// Accumulates all text that has been successfully flushed into the Slack
|
||||||
|
// stream message. Used to reconstruct a complete fallback reply if the
|
||||||
|
// stream fails mid-turn or stopSlackStream fails at finalization — so the
|
||||||
|
// user always receives the full answer, not just the most recent chunk.
|
||||||
|
let streamedText = "";
|
||||||
let lastStreamPayload: ReplyPayload | null = null;
|
let lastStreamPayload: ReplyPayload | null = null;
|
||||||
let usedReplyThreadTs: string | undefined;
|
let usedReplyThreadTs: string | undefined;
|
||||||
|
|
||||||
@@ -250,8 +255,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
|||||||
await deliverNormally(payload, streamSession?.threadTs);
|
await deliverNormally(payload, streamSession?.threadTs);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Track the last payload so the stream finalizer can fall back to normal
|
// Track the last payload for metadata (thread ts, media, etc.) and
|
||||||
// delivery if stopSlackStream fails after all content has been streamed.
|
// accumulate its text so a mid-stream failure can re-deliver the complete
|
||||||
|
// answer rather than only the failing chunk.
|
||||||
lastStreamPayload = payload;
|
lastStreamPayload = payload;
|
||||||
|
|
||||||
const text = payload.text.trim();
|
const text = payload.text.trim();
|
||||||
@@ -277,6 +283,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
|||||||
teamId: ctx.teamId,
|
teamId: ctx.teamId,
|
||||||
userId: message.user,
|
userId: message.user,
|
||||||
});
|
});
|
||||||
|
// Record text that is now live in the Slack stream message.
|
||||||
|
streamedText = text;
|
||||||
usedReplyThreadTs ??= streamThreadTs;
|
usedReplyThreadTs ??= streamThreadTs;
|
||||||
replyPlan.markSent();
|
replyPlan.markSent();
|
||||||
return;
|
return;
|
||||||
@@ -286,6 +294,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
|||||||
session: streamSession,
|
session: streamSession,
|
||||||
text: "\n" + text,
|
text: "\n" + text,
|
||||||
});
|
});
|
||||||
|
// Record text that was successfully appended to the stream message.
|
||||||
|
streamedText += "\n" + text;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
runtime.error?.(
|
runtime.error?.(
|
||||||
danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`),
|
danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`),
|
||||||
@@ -317,7 +327,14 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await deliverNormally(payload, streamSession?.threadTs ?? plannedThreadTs);
|
// Re-deliver the full content: everything already in the stream message
|
||||||
|
// plus the current payload that failed to append. Using only `payload`
|
||||||
|
// here would drop all previously-streamed text.
|
||||||
|
const fallbackText = streamedText ? `${streamedText}\n${text}` : text;
|
||||||
|
await deliverNormally(
|
||||||
|
{ ...payload, text: fallbackText },
|
||||||
|
streamSession?.threadTs ?? plannedThreadTs,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -514,10 +531,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Fall back to normal delivery so the user gets a response even when
|
// Fall back to normal delivery with the full accumulated streamed text
|
||||||
// the stream could not be finalized.
|
// so the user receives the complete answer even when stop() fails.
|
||||||
if (lastStreamPayload) {
|
if (lastStreamPayload && streamedText) {
|
||||||
await deliverNormally(lastStreamPayload, finalStream.threadTs);
|
await deliverNormally(
|
||||||
|
{ ...lastStreamPayload, text: streamedText },
|
||||||
|
finalStream.threadTs,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user