mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-10 16:04:59 +00:00
refactor(agents): split tools + PI subscribe
This commit is contained in:
273
src/agents/pi-embedded-subscribe.handlers.messages.ts
Normal file
273
src/agents/pi-embedded-subscribe.handlers.messages.ts
Normal file
@@ -0,0 +1,273 @@
|
||||
import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import type { AssistantMessage } from "@mariozechner/pi-ai";
|
||||
|
||||
import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js";
|
||||
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||
import {
|
||||
isMessagingToolDuplicateNormalized,
|
||||
normalizeTextForComparison,
|
||||
} from "./pi-embedded-helpers.js";
|
||||
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
|
||||
import { appendRawStream } from "./pi-embedded-subscribe.raw-stream.js";
|
||||
import {
|
||||
extractAssistantText,
|
||||
extractAssistantThinking,
|
||||
extractThinkingFromTaggedStream,
|
||||
extractThinkingFromTaggedText,
|
||||
formatReasoningMessage,
|
||||
promoteThinkingTagsToBlocks,
|
||||
} from "./pi-embedded-utils.js";
|
||||
|
||||
/**
 * Marks the beginning of a new assistant message.
 *
 * Messages whose role is not "assistant" are ignored. For assistant messages
 * the per-message state is reset, using the number of assistant texts
 * collected so far as the new baseline, and the optional
 * `onAssistantMessageStart` callback is fired without awaiting it.
 *
 * @param ctx - Subscription context holding stream state, params and helpers.
 * @param evt - Agent event carrying the message that is starting.
 */
export function handleMessageStart(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & { message: AgentMessage },
) {
  const isAssistant = evt.message?.role === "assistant";
  if (!isAssistant) {
    return;
  }

  // KNOWN: a reset at `text_end` is unsafe because end events can arrive late
  // or more than once.
  // ASSUME: `message_start` is the only dependable "new assistant message"
  // boundary. It is also safer than `message_end`: some providers may deliver
  // late `text_end` updates after `message_end`, which would otherwise
  // re-trigger block replies.
  const baseline = ctx.state.assistantTexts.length;
  ctx.resetAssistantMessageState(baseline);

  // The start of an assistant message is the earliest "writing" signal
  // available for typing indicators.
  void ctx.params.onAssistantMessageStart?.();
}
|
||||
|
||||
/**
 * Processes a streaming update for an assistant message.
 *
 * Only `text_delta`, `text_start` and `text_end` sub-events are handled; any
 * other update, or a message whose role is not "assistant", is ignored. Each
 * handled event is recorded through `appendRawStream`, new text is appended
 * to the delta buffer and to the block chunker (or the plain block buffer
 * when no chunker exists), and the tag-stripped running text is published as
 * an "assistant" agent event plus an optional partial reply. When block
 * replies break on `text_end`, buffered block text is drained as well.
 *
 * @param ctx - Subscription context holding stream state, params and helpers.
 * @param evt - Agent event carrying the message and the optional, untyped
 *   `assistantMessageEvent` payload describing the text sub-event.
 */
export function handleMessageUpdate(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & { message: AgentMessage; assistantMessageEvent?: unknown },
) {
  if (evt.message?.role !== "assistant") return;

  // The payload is untyped: only treat it as a record when it is an object.
  const rawEvent = evt.assistantMessageEvent;
  const fields =
    rawEvent && typeof rawEvent === "object"
      ? (rawEvent as Record<string, unknown>)
      : undefined;
  // Reads a payload field, falling back to "" when it is missing or not a string.
  const stringField = (key: string): string => {
    const value = fields?.[key];
    return typeof value === "string" ? value : "";
  };

  const kind = stringField("type");
  switch (kind) {
    case "text_delta":
    case "text_start":
    case "text_end":
      break;
    default:
      // Not a text sub-event: nothing to stream.
      return;
  }

  const deltaText = stringField("delta");
  const fullText = stringField("content");

  appendRawStream({
    ts: Date.now(),
    event: "assistant_text_stream",
    runId: ctx.params.runId,
    sessionId: (ctx.params.session as { id?: string }).id,
    evtType: kind,
    delta: deltaText,
    content: fullText,
  });

  // A delta always wins. Only `text_start` / `text_end` may fall back to the
  // `content` field when no delta is present.
  let appended = deltaText;
  if (!appended && kind !== "text_delta" && fullText) {
    // KNOWN: some providers resend the full content on `text_end`. Append
    // only the unseen suffix (or nothing) so the output stays monotonic.
    const seen = ctx.state.deltaBuffer;
    if (fullText.startsWith(seen)) {
      appended = fullText.slice(seen.length);
    } else if (!seen.startsWith(fullText) && !seen.includes(fullText)) {
      appended = fullText;
    }
  }

  if (appended) {
    ctx.state.deltaBuffer += appended;
    if (ctx.blockChunker) {
      ctx.blockChunker.append(appended);
    } else {
      ctx.state.blockBuffer += appended;
    }
  }

  if (ctx.state.streamReasoning) {
    // Partial <think> tags are tolerated: stream whatever reasoning is
    // visible in the buffer so far.
    const visibleReasoning = extractThinkingFromTaggedStream(
      ctx.state.deltaBuffer,
    );
    ctx.emitReasoningStream(visibleReasoning);
  }

  const stripped = ctx.stripBlockTags(ctx.state.deltaBuffer, {
    thinking: false,
    final: false,
  });
  const visible = stripped.trim();

  // Publish only when the visible text is non-empty and has changed.
  if (visible && visible !== ctx.state.lastStreamedAssistant) {
    ctx.state.lastStreamedAssistant = visible;

    const directives = parseReplyDirectives(visible);
    const replyText = directives.text;
    const media = directives.mediaUrls?.length
      ? directives.mediaUrls
      : undefined;
    // Each consumer receives its own payload object.
    const buildPayload = () => ({ text: replyText, mediaUrls: media });

    emitAgentEvent({
      runId: ctx.params.runId,
      stream: "assistant",
      data: buildPayload(),
    });
    ctx.params.onAgentEvent?.({
      stream: "assistant",
      data: buildPayload(),
    });
    if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) {
      void ctx.params.onPartialReply(buildPayload());
    }
  }

  // Opportunistic (non-forced) drain while text is still streaming.
  if (
    ctx.params.onBlockReply &&
    ctx.blockChunking &&
    ctx.state.blockReplyBreak === "text_end"
  ) {
    ctx.blockChunker?.drain({ force: false, emit: ctx.emitBlockChunk });
  }

  // At `text_end`, flush whatever is still buffered for the block reply.
  if (kind === "text_end" && ctx.state.blockReplyBreak === "text_end") {
    if (ctx.blockChunker?.hasBuffered()) {
      ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk });
      ctx.blockChunker.reset();
    } else if (ctx.state.blockBuffer.length > 0) {
      ctx.emitBlockChunk(ctx.state.blockBuffer);
      ctx.state.blockBuffer = "";
    }
  }
}
|
||||
|
||||
/**
 * Finalizes an assistant message once it has ended.
 *
 * Messages whose role is not "assistant" are ignored. Otherwise thinking tags
 * are promoted to blocks, the raw text and thinking are recorded through
 * `appendRawStream`, the assistant texts are finalized, and any pending block
 * reply and formatted reasoning are emitted through `onBlockReply`. Reasoning
 * goes out before the answer only when blocks break on `message_end` and no
 * assistant text was added during this message; otherwise it follows the
 * answer. All per-message streaming state is cleared at the end.
 *
 * @param ctx - Subscription context holding stream state, params and helpers.
 * @param evt - Agent event carrying the message that has ended.
 */
export function handleMessageEnd(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & { message: AgentMessage },
) {
  const ended = evt.message;
  if (ended?.role !== "assistant") return;

  const assistantMessage = ended as AssistantMessage;
  promoteThinkingTagsToBlocks(assistantMessage);

  const rawText = extractAssistantText(assistantMessage);
  appendRawStream({
    ts: Date.now(),
    event: "assistant_message_end",
    runId: ctx.params.runId,
    sessionId: (ctx.params.session as { id?: string }).id,
    rawText,
    rawThinking: extractAssistantThinking(assistantMessage),
  });

  const text = ctx.stripBlockTags(rawText, { thinking: false, final: false });

  // Reasoning is extracted only when it will be included or streamed;
  // tagged text is the fallback when the message has no thinking of its own.
  const wantsReasoning =
    ctx.state.includeReasoning || ctx.state.streamReasoning;
  const rawThinking = wantsReasoning
    ? extractAssistantThinking(assistantMessage) ||
      extractThinkingFromTaggedText(rawText)
    : "";
  const formattedReasoning = rawThinking
    ? formatReasoningMessage(rawThinking)
    : "";

  const addedDuringMessage =
    ctx.state.assistantTexts.length > ctx.state.assistantTextBaseline;
  const chunkerHasBuffered = ctx.blockChunker?.hasBuffered() ?? false;
  ctx.finalizeAssistantTexts({ text, addedDuringMessage, chunkerHasBuffered });

  const blockReply = ctx.params.onBlockReply;
  // Reasoning is sent at most once per distinct formatted value, and only
  // when a block-reply sink exists.
  const shouldEmitReasoning = Boolean(
    ctx.state.includeReasoning &&
      formattedReasoning &&
      blockReply &&
      formattedReasoning !== ctx.state.lastReasoningSent,
  );
  const reasoningFirst =
    shouldEmitReasoning &&
    ctx.state.blockReplyBreak === "message_end" &&
    !addedDuringMessage;
  const sendReasoning = () => {
    // `shouldEmitReasoning` already implies a non-empty formatted reasoning.
    if (!shouldEmitReasoning) return;
    ctx.state.lastReasoningSent = formattedReasoning;
    void blockReply?.({ text: formattedReasoning });
  };

  if (reasoningFirst) {
    sendReasoning();
  }

  // A block reply is pending when blocks break on `message_end`, or when
  // text is still buffered in the chunker / plain block buffer.
  const blockPending =
    ctx.state.blockReplyBreak === "message_end" ||
    (ctx.blockChunker
      ? ctx.blockChunker.hasBuffered()
      : ctx.state.blockBuffer.length > 0);

  if (blockPending && text && blockReply) {
    if (ctx.blockChunker?.hasBuffered()) {
      ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk });
      ctx.blockChunker.reset();
    } else if (text !== ctx.state.lastBlockReplyText) {
      // Duplicate check before emitting (same logic as emitBlockChunk).
      const normalized = normalizeTextForComparison(text);
      const alreadySentByTool = isMessagingToolDuplicateNormalized(
        normalized,
        ctx.state.messagingToolSentTextsNormalized,
      );
      if (alreadySentByTool) {
        ctx.log.debug(
          `Skipping message_end block reply - already sent via messaging tool: ${text.slice(0, 50)}...`,
        );
      } else {
        ctx.state.lastBlockReplyText = text;
        const {
          text: replyText,
          mediaUrls: replyMedia,
          audioAsVoice,
        } = parseReplyDirectives(text);
        const hasMedia = Boolean(replyMedia && replyMedia.length > 0);
        // Emit when there is content, or when only the audioAsVoice flag
        // needs to be propagated.
        if (replyText || hasMedia || audioAsVoice) {
          void blockReply({
            text: replyText,
            mediaUrls: replyMedia?.length ? replyMedia : undefined,
            audioAsVoice,
          });
        }
      }
    }
  }

  if (!reasoningFirst) {
    sendReasoning();
  }
  if (ctx.state.streamReasoning && rawThinking) {
    ctx.emitReasoningStream(rawThinking);
  }

  // Clear all per-message streaming state.
  ctx.state.deltaBuffer = "";
  ctx.state.blockBuffer = "";
  ctx.blockChunker?.reset();
  ctx.state.blockState.thinking = false;
  ctx.state.blockState.final = false;
  ctx.state.lastStreamedAssistant = undefined;
}
|
||||
Reference in New Issue
Block a user