Emit session.message websocket events for transcript updates

This commit is contained in:
Tyler Yust
2026-03-12 01:47:01 -07:00
parent 840ae327c1
commit ee2563a38b
10 changed files with 229 additions and 19 deletions

View File

@@ -245,7 +245,7 @@ export function installSessionToolResultGuard(
sessionManager as { getSessionFile?: () => string | null }
).getSessionFile?.();
if (sessionFile) {
emitSessionTranscriptUpdate(sessionFile);
emitSessionTranscriptUpdate({ sessionFile, message: finalMessage });
}
if (toolCalls.length > 0) {

View File

@@ -293,27 +293,37 @@ async function persistAcpTurnTranscript(params: {
});
if (promptText) {
sessionManager.appendMessage({
role: "user",
const promptMessage = {
role: "user" as const,
content: promptText,
timestamp: Date.now(),
};
sessionManager.appendMessage(promptMessage);
emitSessionTranscriptUpdate({
sessionFile,
sessionKey: params.sessionKey,
message: promptMessage,
});
}
if (replyText) {
sessionManager.appendMessage({
role: "assistant",
const replyMessage = {
role: "assistant" as const,
content: [{ type: "text", text: replyText }],
api: "openai-responses",
provider: "openclaw",
model: "acp-runtime",
usage: ACP_TRANSCRIPT_USAGE,
stopReason: "stop",
stopReason: "stop" as const,
timestamp: Date.now(),
} as Parameters<typeof sessionManager.appendMessage>[0];
sessionManager.appendMessage(replyMessage);
emitSessionTranscriptUpdate({
sessionFile,
sessionKey: params.sessionKey,
message: replyMessage,
});
}
emitSessionTranscriptUpdate(sessionFile);
return sessionEntry;
}

View File

@@ -179,9 +179,8 @@ export async function appendAssistantMessageToSessionTranscript(params: {
await ensureSessionHeader({ sessionFile, sessionId: entry.sessionId });
const sessionManager = SessionManager.open(sessionFile);
sessionManager.appendMessage({
role: "assistant",
const message = {
role: "assistant" as const,
content: [{ type: "text", text: mirrorText }],
api: "openai-responses",
provider: "openclaw",
@@ -200,10 +199,12 @@ export async function appendAssistantMessageToSessionTranscript(params: {
total: 0,
},
},
stopReason: "stop",
stopReason: "stop" as const,
timestamp: Date.now(),
});
} as Parameters<SessionManager["appendMessage"]>[0];
const sessionManager = SessionManager.open(sessionFile);
sessionManager.appendMessage(message);
emitSessionTranscriptUpdate(sessionFile);
emitSessionTranscriptUpdate({ sessionFile, sessionKey, message });
return { ok: true, sessionFile };
}

View File

@@ -24,6 +24,7 @@ export function createGatewayCloseHandler(params: {
mediaCleanup: ReturnType<typeof setInterval> | null;
agentUnsub: (() => void) | null;
heartbeatUnsub: (() => void) | null;
transcriptUnsub: (() => void) | null;
chatRunState: { clear: () => void };
clients: Set<{ socket: { close: (code: number, reason: string) => void } }>;
configReloader: { stop: () => Promise<void> };
@@ -105,6 +106,13 @@ export function createGatewayCloseHandler(params: {
/* ignore */
}
}
if (params.transcriptUnsub) {
try {
params.transcriptUnsub();
} catch {
/* ignore */
}
}
params.chatRunState.clear();
for (const c of params.clients) {
try {

View File

@@ -1,4 +1,5 @@
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
type AppendMessageArg = Parameters<SessionManager["appendMessage"]>[0];
@@ -68,6 +69,10 @@ export function appendInjectedAssistantMessageToTranscript(params: {
// Raw jsonl appends break the parent chain and can hide compaction summaries from context.
const sessionManager = SessionManager.open(params.transcriptPath);
const messageId = sessionManager.appendMessage(messageBody);
emitSessionTranscriptUpdate({
sessionFile: params.transcriptPath,
message: messageBody,
});
return { ok: true, messageId, message: messageBody };
} catch (err) {
return { ok: false, error: err instanceof Error ? err.message : String(err) };

View File

@@ -63,6 +63,7 @@ import {
prepareSecretsRuntimeSnapshot,
resolveCommandSecretsFromActiveRuntimeSnapshot,
} from "../secrets/runtime.js";
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import { runOnboardingWizard } from "../wizard/onboarding.js";
import { createAuthRateLimiter, type AuthRateLimiter } from "./auth-rate-limit.js";
import { startChannelHealthMonitor } from "./channel-health-monitor.js";
@@ -110,6 +111,7 @@ import {
import { resolveHookClientIpConfig } from "./server/hooks.js";
import { createReadinessChecker } from "./server/readiness.js";
import { loadGatewayTlsRuntime } from "./server/tls.js";
import { resolveSessionKeyForTranscriptFile } from "./session-transcript-key.js";
import {
ensureGatewayStartupAuth,
mergeGatewayAuthConfig,
@@ -748,6 +750,24 @@ export async function startGatewayServer(
broadcast("heartbeat", evt, { dropIfSlow: true });
});
const transcriptUnsub = minimalTestGateway
? null
: onSessionTranscriptUpdate((update) => {
const sessionKey =
update.sessionKey ?? resolveSessionKeyForTranscriptFile(update.sessionFile);
if (!sessionKey || update.message === undefined) {
return;
}
broadcast(
"session.message",
{
sessionKey,
message: update.message,
},
{ dropIfSlow: true },
);
});
let heartbeatRunner: HeartbeatRunner = minimalTestGateway
? {
stop: () => {},
@@ -1035,6 +1055,7 @@ export async function startGatewayServer(
mediaCleanup,
agentUnsub,
heartbeatUnsub,
transcriptUnsub,
chatRunState,
clients,
configReloader,

View File

@@ -0,0 +1,79 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, test } from "vitest";
import { appendAssistantMessageToSessionTranscript } from "../config/sessions/transcript.js";
import { testState } from "./test-helpers.mocks.js";
import {
connectOk,
createGatewaySuiteHarness,
installGatewayTestHooks,
onceMessage,
writeSessionStore,
} from "./test-helpers.server.js";
installGatewayTestHooks();
const cleanupDirs: string[] = [];
afterEach(async () => {
await Promise.all(
cleanupDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true })),
);
});
async function createSessionStoreFile(): Promise<string> {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-message-"));
cleanupDirs.push(dir);
const storePath = path.join(dir, "sessions.json");
testState.sessionStorePath = storePath;
return storePath;
}
describe("session.message websocket events", () => {
test("broadcasts appended transcript messages with the session key", async () => {
const storePath = await createSessionStoreFile();
await writeSessionStore({
entries: {
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
},
},
storePath,
});
const harness = await createGatewaySuiteHarness();
try {
const ws = await harness.openWs();
try {
await connectOk(ws);
const appendPromise = appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "live websocket message",
storePath,
});
const eventPromise = onceMessage(
ws,
(message) =>
message.type === "event" &&
message.event === "session.message" &&
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
"agent:main:main",
);
const [appended, event] = await Promise.all([appendPromise, eventPromise]);
expect(appended.ok).toBe(true);
expect(
(event.payload as { message?: { content?: Array<{ text?: string }> } }).message
?.content?.[0]?.text,
).toBe("live websocket message");
} finally {
ws.close();
}
} finally {
await harness.close();
}
});
});

View File

@@ -0,0 +1,53 @@
import fs from "node:fs";
import path from "node:path";
import { loadConfig } from "../config/config.js";
import { normalizeAgentId } from "../routing/session-key.js";
import {
loadCombinedSessionStoreForGateway,
resolveGatewaySessionStoreTarget,
resolveSessionTranscriptCandidates,
} from "./session-utils.js";
function resolveTranscriptPathForComparison(value: string | undefined): string | undefined {
const trimmed = value?.trim();
if (!trimmed) {
return undefined;
}
const resolved = path.resolve(trimmed);
try {
return fs.realpathSync(resolved);
} catch {
return resolved;
}
}
export function resolveSessionKeyForTranscriptFile(sessionFile: string): string | undefined {
const targetPath = resolveTranscriptPathForComparison(sessionFile);
if (!targetPath) {
return undefined;
}
const cfg = loadConfig();
const { store } = loadCombinedSessionStoreForGateway(cfg);
for (const [key, entry] of Object.entries(store)) {
if (!entry?.sessionId) {
continue;
}
const target = resolveGatewaySessionStoreTarget({
cfg,
key,
scanLegacyKeys: false,
store,
});
const sessionAgentId = normalizeAgentId(target.agentId);
const matches = resolveSessionTranscriptCandidates(
entry.sessionId,
target.storePath,
entry.sessionFile,
sessionAgentId,
).some((candidate) => resolveTranscriptPathForComparison(candidate) === targetPath);
if (matches) {
return key;
}
}
return undefined;
}

View File

@@ -20,6 +20,23 @@ describe("transcript events", () => {
expect(listener).toHaveBeenCalledWith({ sessionFile: "/tmp/session.jsonl" });
});
it("includes optional session metadata when provided", () => {
const listener = vi.fn();
cleanup.push(onSessionTranscriptUpdate(listener));
emitSessionTranscriptUpdate({
sessionFile: " /tmp/session.jsonl ",
sessionKey: " agent:main:main ",
message: { role: "assistant", content: "hi" },
});
expect(listener).toHaveBeenCalledWith({
sessionFile: "/tmp/session.jsonl",
sessionKey: "agent:main:main",
message: { role: "assistant", content: "hi" },
});
});
it("continues notifying other listeners when one throws", () => {
const first = vi.fn(() => {
throw new Error("boom");

View File

@@ -1,5 +1,7 @@
type SessionTranscriptUpdate = {
export type SessionTranscriptUpdate = {
sessionFile: string;
sessionKey?: string;
message?: unknown;
};
type SessionTranscriptListener = (update: SessionTranscriptUpdate) => void;
@@ -13,15 +15,29 @@ export function onSessionTranscriptUpdate(listener: SessionTranscriptListener):
};
}
export function emitSessionTranscriptUpdate(sessionFile: string): void {
const trimmed = sessionFile.trim();
export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUpdate): void {
const normalized =
typeof update === "string"
? { sessionFile: update }
: {
sessionFile: update.sessionFile,
sessionKey: update.sessionKey,
message: update.message,
};
const trimmed = normalized.sessionFile.trim();
if (!trimmed) {
return;
}
const update = { sessionFile: trimmed };
const nextUpdate: SessionTranscriptUpdate = {
sessionFile: trimmed,
...(typeof normalized.sessionKey === "string" && normalized.sessionKey.trim()
? { sessionKey: normalized.sessionKey.trim() }
: {}),
...(normalized.message !== undefined ? { message: normalized.message } : {}),
};
for (const listener of SESSION_TRANSCRIPT_LISTENERS) {
try {
listener(update);
listener(nextUpdate);
} catch {
/* ignore */
}