mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-10 08:12:43 +00:00
fix: refactor TUI stream assembly (#1202, thanks @aaronveklabs)
Co-authored-by: Aaron <aaron@vektor-labs.com>
This commit is contained in:
@@ -1,12 +1,7 @@
|
||||
import type { TUI } from "@mariozechner/pi-tui";
|
||||
import type { ChatLog } from "./components/chat-log.js";
|
||||
import {
|
||||
asString,
|
||||
extractTextFromMessage,
|
||||
extractThinkingFromMessage,
|
||||
extractContentFromMessage,
|
||||
resolveFinalAssistantText,
|
||||
} from "./tui-formatters.js";
|
||||
import { asString } from "./tui-formatters.js";
|
||||
import { TuiStreamAssembler } from "./tui-stream-assembler.js";
|
||||
import type { AgentEvent, ChatEvent, TuiStateAccess } from "./tui-types.js";
|
||||
|
||||
type EventHandlerContext = {
|
||||
@@ -17,25 +12,14 @@ type EventHandlerContext = {
|
||||
refreshSessionInfo?: () => Promise<void>;
|
||||
};
|
||||
|
||||
/**
|
||||
* Per-run stream buffer for tracking thinking/content separately.
|
||||
* Enables proper sequencing regardless of network arrival order.
|
||||
*/
|
||||
interface RunStreamBuffer {
|
||||
thinkingText: string;
|
||||
contentText: string;
|
||||
lastUpdateMs: number;
|
||||
}
|
||||
|
||||
export function createEventHandlers(context: EventHandlerContext) {
|
||||
const { chatLog, tui, state, setActivityStatus, refreshSessionInfo } = context;
|
||||
const finalizedRuns = new Map<string, number>();
|
||||
// FIXED: Per-run stream buffers for proper isolation
|
||||
const runBuffers = new Map<string, RunStreamBuffer>();
|
||||
const streamAssembler = new TuiStreamAssembler();
|
||||
|
||||
const noteFinalizedRun = (runId: string) => {
|
||||
finalizedRuns.set(runId, Date.now());
|
||||
runBuffers.delete(runId); // Clean up buffer
|
||||
streamAssembler.drop(runId);
|
||||
if (finalizedRuns.size <= 200) return;
|
||||
const keepUntil = Date.now() - 10 * 60 * 1000;
|
||||
for (const [key, ts] of finalizedRuns) {
|
||||
@@ -50,22 +34,6 @@ export function createEventHandlers(context: EventHandlerContext) {
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Get or create a stream buffer for a specific runId.
|
||||
*/
|
||||
const getOrCreateBuffer = (runId: string): RunStreamBuffer => {
|
||||
let buffer = runBuffers.get(runId);
|
||||
if (!buffer) {
|
||||
buffer = {
|
||||
thinkingText: "",
|
||||
contentText: "",
|
||||
lastUpdateMs: Date.now(),
|
||||
};
|
||||
runBuffers.set(runId, buffer);
|
||||
}
|
||||
return buffer;
|
||||
};
|
||||
|
||||
const handleChatEvent = (payload: unknown) => {
|
||||
if (!payload || typeof payload !== "object") return;
|
||||
const evt = payload as ChatEvent;
|
||||
@@ -75,33 +43,9 @@ export function createEventHandlers(context: EventHandlerContext) {
|
||||
if (evt.state === "final") return;
|
||||
}
|
||||
if (evt.state === "delta") {
|
||||
const buffer = getOrCreateBuffer(evt.runId);
|
||||
|
||||
// FIXED: Extract thinking and content SEPARATELY for proper sequencing
|
||||
// This is model-agnostic: models without thinking blocks just return empty string
|
||||
const thinkingText = extractThinkingFromMessage(evt.message);
|
||||
const contentText = extractContentFromMessage(evt.message);
|
||||
|
||||
// Update buffer with new content
|
||||
// In streaming, we typically receive the full accumulated text each time
|
||||
if (thinkingText) {
|
||||
buffer.thinkingText = thinkingText;
|
||||
}
|
||||
if (contentText) {
|
||||
buffer.contentText = contentText;
|
||||
}
|
||||
buffer.lastUpdateMs = Date.now();
|
||||
|
||||
// Skip render if both are empty
|
||||
if (!buffer.thinkingText && !buffer.contentText) return;
|
||||
|
||||
// FIXED: Pass separated streams to ChatLog for proper sequencing
|
||||
chatLog.updateAssistant("", evt.runId, {
|
||||
thinkingText: buffer.thinkingText,
|
||||
contentText: buffer.contentText,
|
||||
showThinking: state.showThinking,
|
||||
});
|
||||
|
||||
const displayText = streamAssembler.ingestDelta(evt.runId, evt.message, state.showThinking);
|
||||
if (!displayText) return;
|
||||
chatLog.updateAssistant(displayText, evt.runId);
|
||||
setActivityStatus("streaming");
|
||||
}
|
||||
if (evt.state === "final") {
|
||||
@@ -112,24 +56,7 @@ export function createEventHandlers(context: EventHandlerContext) {
|
||||
: ""
|
||||
: "";
|
||||
|
||||
// FIXED: Extract final content with proper thinking handling
|
||||
const thinkingText = extractThinkingFromMessage(evt.message);
|
||||
const contentText = extractContentFromMessage(evt.message);
|
||||
|
||||
// Compose final text with proper ordering (thinking before content)
|
||||
const parts: string[] = [];
|
||||
if (state.showThinking && thinkingText.trim()) {
|
||||
parts.push(`[thinking]\n${thinkingText}`);
|
||||
}
|
||||
if (contentText.trim()) {
|
||||
parts.push(contentText);
|
||||
}
|
||||
const finalComposed = parts.join("\n\n").trim();
|
||||
|
||||
const finalText = resolveFinalAssistantText({
|
||||
finalText: finalComposed,
|
||||
streamedText: chatLog.getStreamingText(evt.runId),
|
||||
});
|
||||
const finalText = streamAssembler.finalize(evt.runId, evt.message, state.showThinking);
|
||||
chatLog.finalizeAssistant(finalText, evt.runId);
|
||||
noteFinalizedRun(evt.runId);
|
||||
state.activeChatRunId = null;
|
||||
@@ -139,14 +66,14 @@ export function createEventHandlers(context: EventHandlerContext) {
|
||||
}
|
||||
if (evt.state === "aborted") {
|
||||
chatLog.addSystem("run aborted");
|
||||
runBuffers.delete(evt.runId);
|
||||
streamAssembler.drop(evt.runId);
|
||||
state.activeChatRunId = null;
|
||||
setActivityStatus("aborted");
|
||||
void refreshSessionInfo?.();
|
||||
}
|
||||
if (evt.state === "error") {
|
||||
chatLog.addSystem(`run error: ${evt.errorMessage ?? "unknown"}`);
|
||||
runBuffers.delete(evt.runId);
|
||||
streamAssembler.drop(evt.runId);
|
||||
state.activeChatRunId = null;
|
||||
setActivityStatus("error");
|
||||
void refreshSessionInfo?.();
|
||||
|
||||
Reference in New Issue
Block a user