diff --git a/src/agents/session-tool-result-guard.ts b/src/agents/session-tool-result-guard.ts index cb5d465754e..b00416fa8e1 100644 --- a/src/agents/session-tool-result-guard.ts +++ b/src/agents/session-tool-result-guard.ts @@ -245,7 +245,7 @@ export function installSessionToolResultGuard( sessionManager as { getSessionFile?: () => string | null } ).getSessionFile?.(); if (sessionFile) { - emitSessionTranscriptUpdate(sessionFile); + emitSessionTranscriptUpdate({ sessionFile, message: finalMessage }); } if (toolCalls.length > 0) { diff --git a/src/commands/agent.ts b/src/commands/agent.ts index ab690b37666..d4a63a4a2b8 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -293,27 +293,37 @@ async function persistAcpTurnTranscript(params: { }); if (promptText) { - sessionManager.appendMessage({ - role: "user", + const promptMessage = { + role: "user" as const, content: promptText, timestamp: Date.now(), + }; + sessionManager.appendMessage(promptMessage); + emitSessionTranscriptUpdate({ + sessionFile, + sessionKey: params.sessionKey, + message: promptMessage, }); } if (replyText) { - sessionManager.appendMessage({ - role: "assistant", + const replyMessage = { + role: "assistant" as const, content: [{ type: "text", text: replyText }], api: "openai-responses", provider: "openclaw", model: "acp-runtime", usage: ACP_TRANSCRIPT_USAGE, - stopReason: "stop", + stopReason: "stop" as const, timestamp: Date.now(), + } as Parameters[0]; + sessionManager.appendMessage(replyMessage); + emitSessionTranscriptUpdate({ + sessionFile, + sessionKey: params.sessionKey, + message: replyMessage, }); } - - emitSessionTranscriptUpdate(sessionFile); return sessionEntry; } diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index e6a8044f5c6..f65eebb1a07 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -179,9 +179,8 @@ export async function appendAssistantMessageToSessionTranscript(params: { await ensureSessionHeader({ sessionFile, sessionId: entry.sessionId }); - const sessionManager = SessionManager.open(sessionFile); - sessionManager.appendMessage({ - role: "assistant", + const message = { + role: "assistant" as const, content: [{ type: "text", text: mirrorText }], api: "openai-responses", provider: "openclaw", @@ -200,10 +199,12 @@ export async function appendAssistantMessageToSessionTranscript(params: { total: 0, }, }, - stopReason: "stop", + stopReason: "stop" as const, timestamp: Date.now(), - }); + } as Parameters[0]; + const sessionManager = SessionManager.open(sessionFile); + sessionManager.appendMessage(message); - emitSessionTranscriptUpdate(sessionFile); + emitSessionTranscriptUpdate({ sessionFile, sessionKey, message }); return { ok: true, sessionFile }; } diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts index 1d941c0e206..cdc6946688d 100644 --- a/src/gateway/server-close.ts +++ b/src/gateway/server-close.ts @@ -24,6 +24,7 @@ export function createGatewayCloseHandler(params: { mediaCleanup: ReturnType | null; agentUnsub: (() => void) | null; heartbeatUnsub: (() => void) | null; + transcriptUnsub: (() => void) | null; chatRunState: { clear: () => void }; clients: Set<{ socket: { close: (code: number, reason: string) => void } }>; configReloader: { stop: () => Promise }; @@ -105,6 +106,13 @@ export function createGatewayCloseHandler(params: { /* ignore */ } } + if (params.transcriptUnsub) { + try { + params.transcriptUnsub(); + } catch { + /* ignore */ + } + } params.chatRunState.clear(); for (const c of params.clients) { try { diff --git a/src/gateway/server-methods/chat-transcript-inject.ts b/src/gateway/server-methods/chat-transcript-inject.ts index f8c6bfd39f4..be64a5a60c1 100644 --- a/src/gateway/server-methods/chat-transcript-inject.ts +++ b/src/gateway/server-methods/chat-transcript-inject.ts @@ -1,4 +1,5 @@ import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; type AppendMessageArg = Parameters[0]; @@ -68,6 +69,10 @@ export function appendInjectedAssistantMessageToTranscript(params: { // Raw jsonl appends break the parent chain and can hide compaction summaries from context. const sessionManager = SessionManager.open(params.transcriptPath); const messageId = sessionManager.appendMessage(messageBody); + emitSessionTranscriptUpdate({ + sessionFile: params.transcriptPath, + message: messageBody, + }); return { ok: true, messageId, message: messageBody }; } catch (err) { return { ok: false, error: err instanceof Error ? err.message : String(err) }; diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 9b3941d1432..a910d64e0f6 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -63,6 +63,7 @@ import { prepareSecretsRuntimeSnapshot, resolveCommandSecretsFromActiveRuntimeSnapshot, } from "../secrets/runtime.js"; +import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import { runOnboardingWizard } from "../wizard/onboarding.js"; import { createAuthRateLimiter, type AuthRateLimiter } from "./auth-rate-limit.js"; import { startChannelHealthMonitor } from "./channel-health-monitor.js"; @@ -110,6 +111,7 @@ import { import { resolveHookClientIpConfig } from "./server/hooks.js"; import { createReadinessChecker } from "./server/readiness.js"; import { loadGatewayTlsRuntime } from "./server/tls.js"; +import { resolveSessionKeyForTranscriptFile } from "./session-transcript-key.js"; import { ensureGatewayStartupAuth, mergeGatewayAuthConfig, @@ -748,6 +750,24 @@ export async function startGatewayServer( broadcast("heartbeat", evt, { dropIfSlow: true }); }); + const transcriptUnsub = minimalTestGateway + ? null + : onSessionTranscriptUpdate((update) => { + const sessionKey = + update.sessionKey ?? resolveSessionKeyForTranscriptFile(update.sessionFile); + if (!sessionKey || update.message === undefined) { + return; + } + broadcast( + "session.message", + { + sessionKey, + message: update.message, + }, + { dropIfSlow: true }, + ); + }); + let heartbeatRunner: HeartbeatRunner = minimalTestGateway ? { stop: () => {}, @@ -1035,6 +1055,7 @@ export async function startGatewayServer( mediaCleanup, agentUnsub, heartbeatUnsub, + transcriptUnsub, chatRunState, clients, configReloader, diff --git a/src/gateway/session-message-events.test.ts b/src/gateway/session-message-events.test.ts new file mode 100644 index 00000000000..9eef254d8ba --- /dev/null +++ b/src/gateway/session-message-events.test.ts @@ -0,0 +1,79 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, test } from "vitest"; +import { appendAssistantMessageToSessionTranscript } from "../config/sessions/transcript.js"; +import { testState } from "./test-helpers.mocks.js"; +import { + connectOk, + createGatewaySuiteHarness, + installGatewayTestHooks, + onceMessage, + writeSessionStore, +} from "./test-helpers.server.js"; + +installGatewayTestHooks(); + +const cleanupDirs: string[] = []; + +afterEach(async () => { + await Promise.all( + cleanupDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true })), + ); +}); + +async function createSessionStoreFile(): Promise { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-message-")); + cleanupDirs.push(dir); + const storePath = path.join(dir, "sessions.json"); + testState.sessionStorePath = storePath; + return storePath; +} + +describe("session.message websocket events", () => { + test("broadcasts appended transcript messages with the session key", async () => { + const storePath = await createSessionStoreFile(); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + storePath, + }); + + const harness = await createGatewaySuiteHarness(); + try { + const ws = await harness.openWs(); + try { + await connectOk(ws); + + const appendPromise = appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:main", + text: "live websocket message", + storePath, + }); + const eventPromise = onceMessage( + ws, + (message) => + message.type === "event" && + message.event === "session.message" && + (message.payload as { sessionKey?: string } | undefined)?.sessionKey === + "agent:main:main", + ); + + const [appended, event] = await Promise.all([appendPromise, eventPromise]); + expect(appended.ok).toBe(true); + expect( + (event.payload as { message?: { content?: Array<{ text?: string }> } }).message + ?.content?.[0]?.text, + ).toBe("live websocket message"); + } finally { + ws.close(); + } + } finally { + await harness.close(); + } + }); +}); diff --git a/src/gateway/session-transcript-key.ts b/src/gateway/session-transcript-key.ts new file mode 100644 index 00000000000..fea2c3b949b --- /dev/null +++ b/src/gateway/session-transcript-key.ts @@ -0,0 +1,53 @@ +import fs from "node:fs"; +import path from "node:path"; +import { loadConfig } from "../config/config.js"; +import { normalizeAgentId } from "../routing/session-key.js"; +import { + loadCombinedSessionStoreForGateway, + resolveGatewaySessionStoreTarget, + resolveSessionTranscriptCandidates, +} from "./session-utils.js"; + +function resolveTranscriptPathForComparison(value: string | undefined): string | undefined { + const trimmed = value?.trim(); + if (!trimmed) { + return undefined; + } + const resolved = path.resolve(trimmed); + try { + return fs.realpathSync(resolved); + } catch { + return resolved; + } +} + +export function resolveSessionKeyForTranscriptFile(sessionFile: string): string | undefined { + const targetPath = resolveTranscriptPathForComparison(sessionFile); + if (!targetPath) { + return undefined; + } + const cfg = loadConfig(); + const { store } = loadCombinedSessionStoreForGateway(cfg); + for (const [key, entry] of Object.entries(store)) { + if (!entry?.sessionId) { + continue; + } + const target = resolveGatewaySessionStoreTarget({ + cfg, + key, + scanLegacyKeys: false, + store, + }); + const sessionAgentId = normalizeAgentId(target.agentId); + const matches = resolveSessionTranscriptCandidates( + entry.sessionId, + target.storePath, + entry.sessionFile, + sessionAgentId, + ).some((candidate) => resolveTranscriptPathForComparison(candidate) === targetPath); + if (matches) { + return key; + } + } + return undefined; +} diff --git a/src/sessions/transcript-events.test.ts b/src/sessions/transcript-events.test.ts index f9d8c7f3a99..bb7a366f80e 100644 --- a/src/sessions/transcript-events.test.ts +++ b/src/sessions/transcript-events.test.ts @@ -20,6 +20,23 @@ describe("transcript events", () => { expect(listener).toHaveBeenCalledWith({ sessionFile: "/tmp/session.jsonl" }); }); + it("includes optional session metadata when provided", () => { + const listener = vi.fn(); + cleanup.push(onSessionTranscriptUpdate(listener)); + + emitSessionTranscriptUpdate({ + sessionFile: " /tmp/session.jsonl ", + sessionKey: " agent:main:main ", + message: { role: "assistant", content: "hi" }, + }); + + expect(listener).toHaveBeenCalledWith({ + sessionFile: "/tmp/session.jsonl", + sessionKey: "agent:main:main", + message: { role: "assistant", content: "hi" }, + }); + }); + it("continues notifying other listeners when one throws", () => { const first = vi.fn(() => { throw new Error("boom"); diff --git a/src/sessions/transcript-events.ts b/src/sessions/transcript-events.ts index 9179713581f..964072967af 100644 --- a/src/sessions/transcript-events.ts +++ b/src/sessions/transcript-events.ts @@ -1,5 +1,7 @@ -type SessionTranscriptUpdate = { +export type SessionTranscriptUpdate = { sessionFile: string; + sessionKey?: string; + message?: unknown; }; type SessionTranscriptListener = (update: SessionTranscriptUpdate) => void; @@ -13,15 +15,29 @@ export function onSessionTranscriptUpdate(listener: SessionTranscriptListener): }; } -export function emitSessionTranscriptUpdate(sessionFile: string): void { - const trimmed = sessionFile.trim(); +export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUpdate): void { + const normalized = + typeof update === "string" + ? { sessionFile: update } + : { + sessionFile: update.sessionFile, + sessionKey: update.sessionKey, + message: update.message, + }; + const trimmed = normalized.sessionFile.trim(); if (!trimmed) { return; } - const update = { sessionFile: trimmed }; + const nextUpdate: SessionTranscriptUpdate = { + sessionFile: trimmed, + ...(typeof normalized.sessionKey === "string" && normalized.sessionKey.trim() + ? { sessionKey: normalized.sessionKey.trim() } + : {}), + ...(normalized.message !== undefined ? { message: normalized.message } : {}), + }; for (const listener of SESSION_TRANSCRIPT_LISTENERS) { try { - listener(update); + listener(nextUpdate); } catch { /* ignore */ }