Files
openclaw/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts
Sid 7a22b3fa0b feat(agents): flush reply pipeline before compaction wait (#35489)
Merged via squash.

Prepared head SHA: 7dbbcc510b
Co-authored-by: Sid-Qin <201593046+Sid-Qin@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
2026-03-05 18:22:19 -08:00

92 lines
2.8 KiB
TypeScript

import { emitAgentEvent } from "../infra/agent-events.js";
import { createInlineCodeState } from "../markdown/code-spans.js";
import { formatAssistantErrorText } from "./pi-embedded-helpers.js";
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
import { isAssistantMessage } from "./pi-embedded-utils.js";
export {
handleAutoCompactionEnd,
handleAutoCompactionStart,
} from "./pi-embedded-subscribe.handlers.compaction.js";
export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) {
ctx.log.debug(`embedded run agent start: runId=${ctx.params.runId}`);
emitAgentEvent({
runId: ctx.params.runId,
stream: "lifecycle",
data: {
phase: "start",
startedAt: Date.now(),
},
});
void ctx.params.onAgentEvent?.({
stream: "lifecycle",
data: { phase: "start" },
});
}
export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
const lastAssistant = ctx.state.lastAssistant;
const isError = isAssistantMessage(lastAssistant) && lastAssistant.stopReason === "error";
if (isError && lastAssistant) {
const friendlyError = formatAssistantErrorText(lastAssistant, {
cfg: ctx.params.config,
sessionKey: ctx.params.sessionKey,
provider: lastAssistant.provider,
model: lastAssistant.model,
});
const errorText = (friendlyError || lastAssistant.errorMessage || "LLM request failed.").trim();
ctx.log.warn(
`embedded run agent end: runId=${ctx.params.runId} isError=true error=${errorText}`,
);
emitAgentEvent({
runId: ctx.params.runId,
stream: "lifecycle",
data: {
phase: "error",
error: errorText,
endedAt: Date.now(),
},
});
void ctx.params.onAgentEvent?.({
stream: "lifecycle",
data: {
phase: "error",
error: errorText,
},
});
} else {
ctx.log.debug(`embedded run agent end: runId=${ctx.params.runId} isError=${isError}`);
emitAgentEvent({
runId: ctx.params.runId,
stream: "lifecycle",
data: {
phase: "end",
endedAt: Date.now(),
},
});
void ctx.params.onAgentEvent?.({
stream: "lifecycle",
data: { phase: "end" },
});
}
ctx.flushBlockReplyBuffer();
// Flush the reply pipeline so the response reaches the channel before
// compaction wait blocks the run. This mirrors the pattern used by
// handleToolExecutionStart and ensures delivery is not held hostage to
// long-running compaction (#35074).
void ctx.params.onBlockReplyFlush?.();
ctx.state.blockState.thinking = false;
ctx.state.blockState.final = false;
ctx.state.blockState.inlineCode = createInlineCodeState();
if (ctx.state.pendingCompactionRetry > 0) {
ctx.resolveCompactionRetry();
} else {
ctx.maybeResolveCompactionWait();
}
}