mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-19 05:27:26 +00:00
fix: preserve assistant partial stream during reasoning
This commit is contained in:
@@ -82,6 +82,30 @@ export function handleMessageUpdate(
|
||||
: undefined;
|
||||
const evtType = typeof assistantRecord?.type === "string" ? assistantRecord.type : "";
|
||||
|
||||
if (evtType === "thinking_start" || evtType === "thinking_delta" || evtType === "thinking_end") {
|
||||
const thinkingDelta = typeof assistantRecord?.delta === "string" ? assistantRecord.delta : "";
|
||||
const thinkingContent =
|
||||
typeof assistantRecord?.content === "string" ? assistantRecord.content : "";
|
||||
appendRawStream({
|
||||
ts: Date.now(),
|
||||
event: "assistant_thinking_stream",
|
||||
runId: ctx.params.runId,
|
||||
sessionId: (ctx.params.session as { id?: string }).id,
|
||||
evtType,
|
||||
delta: thinkingDelta,
|
||||
content: thinkingContent,
|
||||
});
|
||||
if (ctx.state.streamReasoning) {
|
||||
// Prefer full partial-message thinking when available; fall back to event payloads.
|
||||
const partialThinking = extractAssistantThinking(msg);
|
||||
ctx.emitReasoningStream(partialThinking || thinkingContent || thinkingDelta);
|
||||
}
|
||||
if (evtType === "thinking_end") {
|
||||
void ctx.params.onReasoningEnd?.();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (evtType !== "text_delta" && evtType !== "text_start" && evtType !== "text_end") {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -201,6 +201,56 @@ describe("subscribeEmbeddedPiSession", () => {
|
||||
},
|
||||
);
|
||||
|
||||
it("streams native thinking_delta events and signals reasoning end", () => {
|
||||
let handler: ((evt: unknown) => void) | undefined;
|
||||
const session: StubSession = {
|
||||
subscribe: (fn) => {
|
||||
handler = fn;
|
||||
return () => {};
|
||||
},
|
||||
};
|
||||
|
||||
const onReasoningStream = vi.fn();
|
||||
const onReasoningEnd = vi.fn();
|
||||
|
||||
subscribeEmbeddedPiSession({
|
||||
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
|
||||
runId: "run",
|
||||
reasoningMode: "stream",
|
||||
onReasoningStream,
|
||||
onReasoningEnd,
|
||||
});
|
||||
|
||||
handler?.({
|
||||
type: "message_update",
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "thinking", thinking: "Checking files" }],
|
||||
},
|
||||
assistantMessageEvent: {
|
||||
type: "thinking_delta",
|
||||
delta: "Checking files",
|
||||
},
|
||||
});
|
||||
|
||||
handler?.({
|
||||
type: "message_update",
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "thinking", thinking: "Checking files done" }],
|
||||
},
|
||||
assistantMessageEvent: {
|
||||
type: "thinking_end",
|
||||
},
|
||||
});
|
||||
|
||||
const streamTexts = onReasoningStream.mock.calls
|
||||
.map((call) => call[0]?.text)
|
||||
.filter((value): value is string => typeof value === "string");
|
||||
expect(streamTexts.at(-1)).toBe("Reasoning:\n_Checking files done_");
|
||||
expect(onReasoningEnd).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("emits delta chunks in agent events for streaming assistant text", () => {
|
||||
const { emit, onAgentEvent } = createAgentEventHarness();
|
||||
|
||||
|
||||
@@ -104,13 +104,7 @@ export async function runAgentTurnWithFallback(params: {
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
const allowPartialStream = !(
|
||||
params.followupRun.run.reasoningLevel === "stream" && params.opts?.onReasoningStream
|
||||
);
|
||||
const normalizeStreamingText = (payload: ReplyPayload): { text?: string; skip: boolean } => {
|
||||
if (!allowPartialStream) {
|
||||
return { skip: true };
|
||||
}
|
||||
let text = payload.text;
|
||||
if (!params.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
const stripped = stripHeartbeatToken(text, {
|
||||
@@ -290,18 +284,16 @@ export async function runAgentTurnWithFallback(params: {
|
||||
abortSignal: params.opts?.abortSignal,
|
||||
blockReplyBreak: params.resolvedBlockStreamingBreak,
|
||||
blockReplyChunking: params.blockReplyChunking,
|
||||
onPartialReply: allowPartialStream
|
||||
? async (payload) => {
|
||||
const textForTyping = await handlePartialForTyping(payload);
|
||||
if (!params.opts?.onPartialReply || textForTyping === undefined) {
|
||||
return;
|
||||
}
|
||||
await params.opts.onPartialReply({
|
||||
text: textForTyping,
|
||||
mediaUrls: payload.mediaUrls,
|
||||
});
|
||||
}
|
||||
: undefined,
|
||||
onPartialReply: async (payload) => {
|
||||
const textForTyping = await handlePartialForTyping(payload);
|
||||
if (!params.opts?.onPartialReply || textForTyping === undefined) {
|
||||
return;
|
||||
}
|
||||
await params.opts.onPartialReply({
|
||||
text: textForTyping,
|
||||
mediaUrls: payload.mediaUrls,
|
||||
});
|
||||
},
|
||||
onAssistantMessageStart: async () => {
|
||||
await params.typingSignals.signalMessageStart();
|
||||
await params.opts?.onAssistantMessageStart?.();
|
||||
|
||||
@@ -91,6 +91,7 @@ function createMinimalRun(params?: {
|
||||
storePath?: string;
|
||||
typingMode?: TypingMode;
|
||||
blockStreamingEnabled?: boolean;
|
||||
runOverrides?: Partial<FollowupRun["run"]>;
|
||||
}) {
|
||||
const typing = createMockTypingController();
|
||||
const opts = params?.opts;
|
||||
@@ -124,6 +125,7 @@ function createMinimalRun(params?: {
|
||||
},
|
||||
timeoutMs: 1_000,
|
||||
blockReplyBreak: "message_end",
|
||||
...params?.runOverrides,
|
||||
},
|
||||
} as unknown as FollowupRun;
|
||||
|
||||
@@ -411,6 +413,25 @@ describe("runReplyAgent typing (heartbeat)", () => {
|
||||
expect(typing.startTypingOnText).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("keeps assistant partial streaming enabled when reasoning mode is stream", async () => {
|
||||
const onPartialReply = vi.fn();
|
||||
const onReasoningStream = vi.fn();
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
|
||||
await params.onReasoningStream?.({ text: "Reasoning:\n_step_" });
|
||||
await params.onPartialReply?.({ text: "answer chunk" });
|
||||
return { payloads: [{ text: "final" }], meta: {} };
|
||||
});
|
||||
|
||||
const { run } = createMinimalRun({
|
||||
opts: { onPartialReply, onReasoningStream },
|
||||
runOverrides: { reasoningLevel: "stream" },
|
||||
});
|
||||
await run();
|
||||
|
||||
expect(onReasoningStream).toHaveBeenCalled();
|
||||
expect(onPartialReply).toHaveBeenCalledWith({ text: "answer chunk", mediaUrls: undefined });
|
||||
});
|
||||
|
||||
it("suppresses typing in never mode", async () => {
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
|
||||
await params.onPartialReply?.({ text: "hi" });
|
||||
|
||||
Reference in New Issue
Block a user