diff --git a/src/agents/session-tool-result-guard.ts b/src/agents/session-tool-result-guard.ts index b00416fa8e1..b65cff7bea0 100644 --- a/src/agents/session-tool-result-guard.ts +++ b/src/agents/session-tool-result-guard.ts @@ -245,7 +245,11 @@ export function installSessionToolResultGuard( sessionManager as { getSessionFile?: () => string | null } ).getSessionFile?.(); if (sessionFile) { - emitSessionTranscriptUpdate({ sessionFile, message: finalMessage }); + emitSessionTranscriptUpdate({ + sessionFile, + message: finalMessage, + messageId: typeof result === "string" ? result : undefined, + }); } if (toolCalls.length > 0) { diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index f65eebb1a07..848a8fe89ae 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -137,7 +137,7 @@ export async function appendAssistantMessageToSessionTranscript(params: { mediaUrls?: string[]; /** Optional override for store path (mostly for tests). */ storePath?: string; -}): Promise<{ ok: true; sessionFile: string } | { ok: false; reason: string }> { +}): Promise<{ ok: true; sessionFile: string; messageId: string } | { ok: false; reason: string }> { const sessionKey = params.sessionKey.trim(); if (!sessionKey) { return { ok: false, reason: "missing sessionKey" }; @@ -203,8 +203,8 @@ export async function appendAssistantMessageToSessionTranscript(params: { timestamp: Date.now(), } as Parameters[0]; const sessionManager = SessionManager.open(sessionFile); - sessionManager.appendMessage(message); + const messageId = sessionManager.appendMessage(message); - emitSessionTranscriptUpdate({ sessionFile, sessionKey, message }); - return { ok: true, sessionFile }; + emitSessionTranscriptUpdate({ sessionFile, sessionKey, message, messageId }); + return { ok: true, sessionFile, messageId }; } diff --git a/src/gateway/method-scopes.test.ts b/src/gateway/method-scopes.test.ts index ba47b3469b4..bca993e9271 100644 --- a/src/gateway/method-scopes.test.ts +++ b/src/gateway/method-scopes.test.ts @@ -24,6 +24,12 @@ describe("method scope resolution", () => { expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.abort")).toEqual([ "operator.write", ]); + expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.messages.subscribe")).toEqual([ + "operator.read", + ]); + expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.messages.unsubscribe")).toEqual([ + "operator.read", + ]); expect(resolveLeastPrivilegeOperatorScopesForMethod("poll")).toEqual(["operator.write"]); }); diff --git a/src/gateway/method-scopes.ts b/src/gateway/method-scopes.ts index 830ed9f2f16..c31ff30db7b 100644 --- a/src/gateway/method-scopes.ts +++ b/src/gateway/method-scopes.ts @@ -71,6 +71,8 @@ const METHOD_SCOPE_GROUPS: Record = { "sessions.resolve", "sessions.subscribe", "sessions.unsubscribe", + "sessions.messages.subscribe", + "sessions.messages.unsubscribe", "sessions.usage", "sessions.usage.timeseries", "sessions.usage.logs", diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index ad483853b03..408e3239cc1 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -196,6 +196,10 @@ import { SessionsDeleteParamsSchema, type SessionsListParams, SessionsListParamsSchema, + type SessionsMessagesSubscribeParams, + SessionsMessagesSubscribeParamsSchema, + type SessionsMessagesUnsubscribeParams, + SessionsMessagesUnsubscribeParamsSchema, type SessionsPatchParams, SessionsPatchParamsSchema, type SessionsPreviewParams, @@ -334,6 +338,11 @@ export const validateSessionsCreateParams = ajv.compile( SessionsCreateParamsSchema, ); export const validateSessionsSendParams = ajv.compile(SessionsSendParamsSchema); +export const validateSessionsMessagesSubscribeParams = ajv.compile( + SessionsMessagesSubscribeParamsSchema, +); +export const validateSessionsMessagesUnsubscribeParams = + ajv.compile(SessionsMessagesUnsubscribeParamsSchema); export const validateSessionsAbortParams = ajv.compile(SessionsAbortParamsSchema); export const validateSessionsPatchParams = diff --git a/src/gateway/protocol/schema/protocol-schemas.ts b/src/gateway/protocol/schema/protocol-schemas.ts index d3d1103f848..60636e3eb5f 100644 --- a/src/gateway/protocol/schema/protocol-schemas.ts +++ b/src/gateway/protocol/schema/protocol-schemas.ts @@ -143,6 +143,8 @@ import { SessionsCreateParamsSchema, SessionsDeleteParamsSchema, SessionsListParamsSchema, + SessionsMessagesSubscribeParamsSchema, + SessionsMessagesUnsubscribeParamsSchema, SessionsPatchParamsSchema, SessionsPreviewParamsSchema, SessionsResetParamsSchema, @@ -209,6 +211,8 @@ export const ProtocolSchemas = { SessionsResolveParams: SessionsResolveParamsSchema, SessionsCreateParams: SessionsCreateParamsSchema, SessionsSendParams: SessionsSendParamsSchema, + SessionsMessagesSubscribeParams: SessionsMessagesSubscribeParamsSchema, + SessionsMessagesUnsubscribeParams: SessionsMessagesUnsubscribeParamsSchema, SessionsAbortParams: SessionsAbortParamsSchema, SessionsPatchParams: SessionsPatchParamsSchema, SessionsResetParams: SessionsResetParamsSchema, diff --git a/src/gateway/protocol/schema/sessions.ts b/src/gateway/protocol/schema/sessions.ts index 1851782b40a..62bc895cf73 100644 --- a/src/gateway/protocol/schema/sessions.ts +++ b/src/gateway/protocol/schema/sessions.ts @@ -51,6 +51,8 @@ export const SessionsCreateParamsSchema = Type.Object( { agentId: Type.Optional(NonEmptyString), label: Type.Optional(SessionLabelString), + task: Type.Optional(Type.String()), + message: Type.Optional(Type.String()), }, { additionalProperties: false }, ); @@ -67,6 +69,20 @@ export const SessionsSendParamsSchema = Type.Object( { additionalProperties: false }, ); +export const SessionsMessagesSubscribeParamsSchema = Type.Object( + { + key: NonEmptyString, + }, + { additionalProperties: false }, +); + +export const SessionsMessagesUnsubscribeParamsSchema = Type.Object( + { + key: NonEmptyString, + }, + { additionalProperties: false }, +); + export const SessionsAbortParamsSchema = Type.Object( { key: NonEmptyString, diff --git a/src/gateway/protocol/schema/types.ts b/src/gateway/protocol/schema/types.ts index f13aafe6d69..58ddb142cd5 100644 --- a/src/gateway/protocol/schema/types.ts +++ b/src/gateway/protocol/schema/types.ts @@ -43,6 +43,8 @@ export type SessionsPreviewParams = SchemaType<"SessionsPreviewParams">; export type SessionsResolveParams = SchemaType<"SessionsResolveParams">; export type SessionsCreateParams = SchemaType<"SessionsCreateParams">; export type SessionsSendParams = SchemaType<"SessionsSendParams">; +export type SessionsMessagesSubscribeParams = SchemaType<"SessionsMessagesSubscribeParams">; +export type SessionsMessagesUnsubscribeParams = SchemaType<"SessionsMessagesUnsubscribeParams">; export type SessionsAbortParams = SchemaType<"SessionsAbortParams">; export type SessionsPatchParams = SchemaType<"SessionsPatchParams">; export type SessionsResetParams = SchemaType<"SessionsResetParams">; diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 46c61ebe37a..1b4a06b174b 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -244,6 +244,14 @@ export type SessionEventSubscriberRegistry = { clear: () => void; }; +export type SessionMessageSubscriberRegistry = { + subscribe: (connId: string, sessionKey: string) => void; + unsubscribe: (connId: string, sessionKey: string) => void; + unsubscribeAll: (connId: string) => void; + get: (sessionKey: string) => ReadonlySet; + clear: () => void; +}; + type ToolRecipientEntry = { connIds: Set; updatedAt: number; @@ -279,6 +287,84 @@ export function createSessionEventSubscriberRegistry(): SessionEventSubscriberRe }; } +export function createSessionMessageSubscriberRegistry(): SessionMessageSubscriberRegistry { + const sessionToConnIds = new Map>(); + const connToSessionKeys = new Map>(); + const empty = new Set(); + + const normalize = (value: string): string => value.trim(); + + return { + subscribe: (connId: string, sessionKey: string) => { + const normalizedConnId = normalize(connId); + const normalizedSessionKey = normalize(sessionKey); + if (!normalizedConnId || !normalizedSessionKey) { + return; + } + const connIds = sessionToConnIds.get(normalizedSessionKey) ?? new Set(); + connIds.add(normalizedConnId); + sessionToConnIds.set(normalizedSessionKey, connIds); + + const sessionKeys = connToSessionKeys.get(normalizedConnId) ?? new Set(); + sessionKeys.add(normalizedSessionKey); + connToSessionKeys.set(normalizedConnId, sessionKeys); + }, + unsubscribe: (connId: string, sessionKey: string) => { + const normalizedConnId = normalize(connId); + const normalizedSessionKey = normalize(sessionKey); + if (!normalizedConnId || !normalizedSessionKey) { + return; + } + const connIds = sessionToConnIds.get(normalizedSessionKey); + if (connIds) { + connIds.delete(normalizedConnId); + if (connIds.size === 0) { + sessionToConnIds.delete(normalizedSessionKey); + } + } + const sessionKeys = connToSessionKeys.get(normalizedConnId); + if (sessionKeys) { + sessionKeys.delete(normalizedSessionKey); + if (sessionKeys.size === 0) { + connToSessionKeys.delete(normalizedConnId); + } + } + }, + unsubscribeAll: (connId: string) => { + const normalizedConnId = normalize(connId); + if (!normalizedConnId) { + return; + } + const sessionKeys = connToSessionKeys.get(normalizedConnId); + if (!sessionKeys) { + return; + } + for (const sessionKey of sessionKeys) { + const connIds = sessionToConnIds.get(sessionKey); + if (!connIds) { + continue; + } + connIds.delete(normalizedConnId); + if (connIds.size === 0) { + sessionToConnIds.delete(sessionKey); + } + } + connToSessionKeys.delete(normalizedConnId); + }, + get: (sessionKey: string) => { + const normalizedSessionKey = normalize(sessionKey); + if (!normalizedSessionKey) { + return empty; + } + return sessionToConnIds.get(normalizedSessionKey) ?? empty; + }, + clear: () => { + sessionToConnIds.clear(); + connToSessionKeys.clear(); + }, + }; +} + export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry { const recipients = new Map(); diff --git a/src/gateway/server-methods-list.ts b/src/gateway/server-methods-list.ts index e70243d9e90..2ccd32c92f9 100644 --- a/src/gateway/server-methods-list.ts +++ b/src/gateway/server-methods-list.ts @@ -56,6 +56,8 @@ const BASE_METHODS = [ "sessions.list", "sessions.subscribe", "sessions.unsubscribe", + "sessions.messages.subscribe", + "sessions.messages.unsubscribe", "sessions.preview", "sessions.create", "sessions.send", diff --git a/src/gateway/server-methods/chat-transcript-inject.ts b/src/gateway/server-methods/chat-transcript-inject.ts index be64a5a60c1..1b03fbccfdd 100644 --- a/src/gateway/server-methods/chat-transcript-inject.ts +++ b/src/gateway/server-methods/chat-transcript-inject.ts @@ -72,6 +72,7 @@ export function appendInjectedAssistantMessageToTranscript(params: { emitSessionTranscriptUpdate({ sessionFile: params.transcriptPath, message: messageBody, + messageId, }); return { ok: true, messageId, message: messageBody }; } catch (err) { diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 826601e7475..df4b31ec9b6 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -22,6 +22,8 @@ import { validateSessionsCreateParams, validateSessionsDeleteParams, validateSessionsListParams, + validateSessionsMessagesSubscribeParams, + validateSessionsMessagesUnsubscribeParams, validateSessionsPatchParams, validateSessionsPreviewParams, validateSessionsResetParams, @@ -83,6 +85,30 @@ function resolveGatewaySessionTargetFromKey(key: string) { return { cfg, target, storePath: target.storePath }; } +function resolveOptionalInitialSessionMessage(params: { + task?: unknown; + message?: unknown; +}): string | undefined { + if (typeof params.task === "string" && params.task.trim()) { + return params.task; + } + if (typeof params.message === "string" && params.message.trim()) { + return params.message; + } + return undefined; +} + +function shouldAttachPendingMessageSeq(params: { payload: unknown; cached?: boolean }): boolean { + if (params.cached) { + return false; + } + const status = + params.payload && typeof params.payload === "object" + ? (params.payload as { status?: unknown }).status + : undefined; + return status === "started"; +} + function emitSessionsChanged( context: Pick, payload: { sessionKey?: string; reason: string; compacted?: boolean }, @@ -246,6 +272,52 @@ export const sessionsHandlers: GatewayRequestHandlers = { } respond(true, { subscribed: false }, undefined); }, + "sessions.messages.subscribe": ({ params, client, context, respond }) => { + if ( + !assertValidParams( + params, + validateSessionsMessagesSubscribeParams, + "sessions.messages.subscribe", + respond, + ) + ) { + return; + } + const connId = client?.connId?.trim(); + const key = requireSessionKey((params as { key?: unknown }).key, respond); + if (!key) { + return; + } + const { canonicalKey } = loadSessionEntry(key); + if (connId) { + context.subscribeSessionMessageEvents(connId, canonicalKey); + respond(true, { subscribed: true, key: canonicalKey }, undefined); + return; + } + respond(true, { subscribed: false, key: canonicalKey }, undefined); + }, + "sessions.messages.unsubscribe": ({ params, client, context, respond }) => { + if ( + !assertValidParams( + params, + validateSessionsMessagesUnsubscribeParams, + "sessions.messages.unsubscribe", + respond, + ) + ) { + return; + } + const connId = client?.connId?.trim(); + const key = requireSessionKey((params as { key?: unknown }).key, respond); + if (!key) { + return; + } + const { canonicalKey } = loadSessionEntry(key); + if (connId) { + context.unsubscribeSessionMessageEvents(connId, canonicalKey); + } + respond(true, { subscribed: false, key: canonicalKey }, undefined); + }, "sessions.preview": ({ params, respond }) => { if (!assertValidParams(params, validateSessionsPreviewParams, "sessions.preview", respond)) { return; @@ -322,7 +394,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { } respond(true, { ok: true, key: resolved.key }, undefined); }, - "sessions.create": async ({ params, respond, context }) => { + "sessions.create": async ({ req, params, respond, context, client, isWebchatConnect }) => { if (!assertValidParams(params, validateSessionsCreateParams, "sessions.create", respond)) { return; } @@ -366,6 +438,45 @@ export const sessionsHandlers: GatewayRequestHandlers = { ); return; } + + const initialMessage = resolveOptionalInitialSessionMessage(p); + let runPayload: Record | undefined; + let runError: unknown; + let runMeta: Record | undefined; + const messageSeq = initialMessage + ? readSessionMessages(created.entry.sessionId, target.storePath, created.entry.sessionFile) + .length + 1 + : undefined; + + if (initialMessage) { + await chatHandlers["chat.send"]({ + req, + params: { + sessionKey: target.canonicalKey, + message: initialMessage, + idempotencyKey: randomUUID(), + }, + respond: (ok, payload, error, meta) => { + if (ok && payload && typeof payload === "object") { + runPayload = payload as Record; + } else { + runError = error; + } + runMeta = meta; + }, + context, + client, + isWebchatConnect, + }); + } + + const runStarted = + runPayload !== undefined && + shouldAttachPendingMessageSeq({ + payload: runPayload, + cached: runMeta?.cached === true, + }); + respond( true, { @@ -373,6 +484,10 @@ export const sessionsHandlers: GatewayRequestHandlers = { key: target.canonicalKey, sessionId: created.entry.sessionId, entry: created.entry, + runStarted, + ...(runPayload ? runPayload : {}), + ...(runStarted && typeof messageSeq === "number" ? { messageSeq } : {}), + ...(runError ? { runError } : {}), }, undefined, ); @@ -380,6 +495,12 @@ export const sessionsHandlers: GatewayRequestHandlers = { sessionKey: target.canonicalKey, reason: "create", }); + if (runStarted) { + emitSessionsChanged(context, { + sessionKey: target.canonicalKey, + reason: "send", + }); + } }, "sessions.send": async ({ req, params, respond, context, client, isWebchatConnect }) => { if (!assertValidParams(params, validateSessionsSendParams, "sessions.send", respond)) { @@ -390,7 +511,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { if (!key) { return; } - const { entry, canonicalKey } = loadSessionEntry(key); + const { entry, canonicalKey, storePath } = loadSessionEntry(key); if (!entry?.sessionId) { respond( false, @@ -399,6 +520,8 @@ export const sessionsHandlers: GatewayRequestHandlers = { ); return; } + const messageSeq = + readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length + 1; let sendAcked = false; await chatHandlers["chat.send"]({ req, @@ -415,6 +538,18 @@ export const sessionsHandlers: GatewayRequestHandlers = { }, respond: (ok, payload, error, meta) => { sendAcked = ok; + if (ok && shouldAttachPendingMessageSeq({ payload, cached: meta?.cached === true })) { + respond( + true, + { + ...(payload && typeof payload === "object" ? payload : {}), + messageSeq, + }, + undefined, + meta, + ); + return; + } respond(ok, payload, error, meta); }, context, @@ -444,7 +579,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { canonicalKey, runId: typeof p.runId === "string" ? p.runId : undefined, }); - let aborted = false; + let abortedRunId: string | null = null; await chatHandlers["chat.abort"]({ req, params: { @@ -452,18 +587,35 @@ export const sessionsHandlers: GatewayRequestHandlers = { runId: typeof p.runId === "string" ? p.runId : undefined, }, respond: (ok, payload, error, meta) => { - aborted = - ok && - Boolean( - payload && typeof payload === "object" && (payload as { aborted?: boolean }).aborted, - ); - respond(ok, payload, error, meta); + if (!ok) { + respond(ok, payload, error, meta); + return; + } + const runIds = + payload && + typeof payload === "object" && + Array.isArray((payload as { runIds?: unknown[] }).runIds) + ? (payload as { runIds: unknown[] }).runIds.filter( + (value): value is string => typeof value === "string" && value.trim().length > 0, + ) + : []; + abortedRunId = runIds[0] ?? null; + respond( + true, + { + ok: true, + abortedRunId, + status: abortedRunId ? "aborted" : "no-active-run", + }, + undefined, + meta, + ); }, context, client, isWebchatConnect, }); - if (aborted) { + if (abortedRunId) { emitSessionsChanged(context, { sessionKey: canonicalKey, reason: "abort", diff --git a/src/gateway/server-methods/types.ts b/src/gateway/server-methods/types.ts index dfff3d02ed2..c4aff56cb84 100644 --- a/src/gateway/server-methods/types.ts +++ b/src/gateway/server-methods/types.ts @@ -65,6 +65,8 @@ export type GatewayRequestContext = { ) => { sessionKey: string; clientRunId: string } | undefined; subscribeSessionEvents: (connId: string) => void; unsubscribeSessionEvents: (connId: string) => void; + subscribeSessionMessageEvents: (connId: string, sessionKey: string) => void; + unsubscribeSessionMessageEvents: (connId: string, sessionKey: string) => void; unsubscribeAllSessionEvents: (connId: string) => void; getSessionEventSubscriberConnIds: () => ReadonlySet; registerToolEventRecipient: (runId: string, connId: string) => void; diff --git a/src/gateway/server.chat.gateway-server-chat.test.ts b/src/gateway/server.chat.gateway-server-chat.test.ts index 4e1c69872b2..79907340b1d 100644 --- a/src/gateway/server.chat.gateway-server-chat.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.test.ts @@ -193,6 +193,7 @@ describe("gateway server chat", () => { }); expect(res.ok).toBe(true); expect(res.payload?.runId).toBe("idem-sessions-send-1"); + expect(res.payload?.messageSeq).toBe(1); await waitFor(() => spy.mock.calls.length > callsBefore, 1_000); const ctx = spy.mock.calls.at(-1)?.[0] as { Body?: string; SessionKey?: string } | undefined; @@ -258,8 +259,17 @@ describe("gateway server chat", () => { runId: "idem-sessions-abort-1", }); expect(abortRes.ok).toBe(true); - expect(abortRes.payload?.aborted).toBe(true); + expect(abortRes.payload?.abortedRunId).toBe("idem-sessions-abort-1"); + expect(abortRes.payload?.status).toBe("aborted"); await waitFor(() => aborted, 1_000); + + const idleAbortRes = await rpcReq(ws, "sessions.abort", { + key: "agent:main:dashboard:test-abort", + runId: "idem-sessions-abort-1", + }); + expect(idleAbortRes.ok).toBe(true); + expect(idleAbortRes.payload?.abortedRunId).toBeNull(); + expect(idleAbortRes.payload?.status).toBe("no-active-run"); } finally { testState.sessionStorePath = undefined; await fs.rm(dir, { recursive: true, force: true }); diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 1c2ab1b958b..fed40bdbaba 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -77,7 +77,11 @@ import { ExecApprovalManager } from "./exec-approval-manager.js"; import { NodeRegistry } from "./node-registry.js"; import type { startBrowserControlServerIfEnabled } from "./server-browser.js"; import { createChannelManager } from "./server-channels.js"; -import { createAgentEventHandler, createSessionEventSubscriberRegistry } from "./server-chat.js"; +import { + createAgentEventHandler, + createSessionEventSubscriberRegistry, + createSessionMessageSubscriberRegistry, +} from "./server-chat.js"; import { createGatewayCloseHandler } from "./server-close.js"; import { buildGatewayCronService } from "./server-cron.js"; import { startGatewayDiscovery } from "./server-discovery-runtime.js"; @@ -112,6 +116,11 @@ import { resolveHookClientIpConfig } from "./server/hooks.js"; import { createReadinessChecker } from "./server/readiness.js"; import { loadGatewayTlsRuntime } from "./server/tls.js"; import { resolveSessionKeyForTranscriptFile } from "./session-transcript-key.js"; +import { + attachOpenClawTranscriptMeta, + loadSessionEntry, + readSessionMessages, +} from "./session-utils.js"; import { ensureGatewayStartupAuth, mergeGatewayAuthConfig, @@ -634,6 +643,7 @@ export async function startGatewayServer( const nodePresenceTimers = new Map>(); const nodeSubscriptions = createNodeSubscriptionManager(); const sessionEventSubscribers = createSessionEventSubscriberRegistry(); + const sessionMessageSubscribers = createSessionMessageSubscriberRegistry(); const nodeSendEvent = (opts: { nodeId: string; event: string; payloadJSON?: string | null }) => { const payload = safeParseJson(opts.payloadJSON ?? null); nodeRegistry.sendEvent(opts.nodeId, opts.event, payload); @@ -760,15 +770,31 @@ export async function startGatewayServer( if (!sessionKey || update.message === undefined) { return; } - const connIds = sessionEventSubscribers.getAll(); + const connIds = new Set(); + for (const connId of sessionEventSubscribers.getAll()) { + connIds.add(connId); + } + for (const connId of sessionMessageSubscribers.get(sessionKey)) { + connIds.add(connId); + } if (connIds.size === 0) { return; } + const { entry, storePath } = loadSessionEntry(sessionKey); + const messageSeq = entry?.sessionId + ? readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length + : undefined; + const message = attachOpenClawTranscriptMeta(update.message, { + ...(typeof update.messageId === "string" ? { id: update.messageId } : {}), + ...(typeof messageSeq === "number" ? { seq: messageSeq } : {}), + }); broadcastToConnIds( "session.message", { sessionKey, - message: update.message, + message, + ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), + ...(typeof messageSeq === "number" ? { messageSeq } : {}), }, connIds, { dropIfSlow: true }, @@ -882,7 +908,12 @@ export async function startGatewayServer( removeChatRun, subscribeSessionEvents: sessionEventSubscribers.subscribe, unsubscribeSessionEvents: sessionEventSubscribers.unsubscribe, - unsubscribeAllSessionEvents: sessionEventSubscribers.unsubscribe, + subscribeSessionMessageEvents: sessionMessageSubscribers.subscribe, + unsubscribeSessionMessageEvents: sessionMessageSubscribers.unsubscribe, + unsubscribeAllSessionEvents: (connId: string) => { + sessionEventSubscribers.unsubscribe(connId); + sessionMessageSubscribers.unsubscribeAll(connId); + }, getSessionEventSubscriberConnIds: sessionEventSubscribers.getAll, registerToolEventRecipient: toolEventRecipients.add, dedupe, diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts index 70d6821e44b..300abffe1ff 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts @@ -17,6 +17,7 @@ import { trackConnectChallengeNonce, writeSessionStore, } from "./test-helpers.js"; +import { getReplyFromConfig } from "./test-helpers.mocks.js"; const sessionCleanupMocks = vi.hoisted(() => ({ clearSessionQueues: vi.fn(() => ({ followupCleared: 0, laneCleared: 0, keys: [] })), @@ -274,6 +275,42 @@ describe("gateway server sessions", () => { ws.close(); }); + test("sessions.create can start the first agent turn from an initial task", async () => { + const { ws } = await openClient(); + const replySpy = vi.mocked(getReplyFromConfig); + const callsBefore = replySpy.mock.calls.length; + + const created = await rpcReq<{ + key?: string; + sessionId?: string; + runStarted?: boolean; + runId?: string; + messageSeq?: number; + }>(ws, "sessions.create", { + agentId: "ops", + label: "Dashboard Chat", + task: "hello from create", + }); + + expect(created.ok).toBe(true); + expect(created.payload?.key).toMatch(/^agent:ops:dashboard:/); + expect(created.payload?.sessionId).toMatch( + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/, + ); + expect(created.payload?.runStarted).toBe(true); + expect(created.payload?.runId).toBeTruthy(); + expect(created.payload?.messageSeq).toBe(1); + + await vi.waitFor(() => replySpy.mock.calls.length > callsBefore); + const ctx = replySpy.mock.calls.at(-1)?.[0] as + | { Body?: string; SessionKey?: string } + | undefined; + expect(ctx?.Body).toContain("hello from create"); + expect(ctx?.SessionKey).toBe(created.payload?.key); + + ws.close(); + }); + test("lists and patches session store via sessions.* RPC", async () => { const { dir, storePath } = await createSessionStoreDir(); const now = Date.now(); diff --git a/src/gateway/session-message-events.test.ts b/src/gateway/session-message-events.test.ts index f1556c82c71..08469910890 100644 --- a/src/gateway/session-message-events.test.ts +++ b/src/gateway/session-message-events.test.ts @@ -136,6 +136,115 @@ describe("session.message websocket events", () => { (event.payload as { message?: { content?: Array<{ text?: string }> } }).message ?.content?.[0]?.text, ).toBe("live websocket message"); + expect((event.payload as { messageSeq?: number }).messageSeq).toBe(1); + expect( + ( + event.payload as { + message?: { __openclaw?: { id?: string; seq?: number } }; + } + ).message?.__openclaw, + ).toMatchObject({ + id: appended.messageId, + seq: 1, + }); + } finally { + ws.close(); + } + } finally { + await harness.close(); + } + }); + + test("sessions.messages.subscribe only delivers transcript events for the requested session", async () => { + const storePath = await createSessionStoreFile(); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + worker: { + sessionId: "sess-worker", + updatedAt: Date.now(), + }, + }, + storePath, + }); + + const harness = await createGatewaySuiteHarness(); + try { + const ws = await harness.openWs(); + try { + await connectOk(ws, { scopes: ["operator.read"] }); + const subscribeRes = await rpcReq(ws, "sessions.messages.subscribe", { + key: "agent:main:main", + }); + expect(subscribeRes.ok).toBe(true); + expect(subscribeRes.payload?.subscribed).toBe(true); + expect(subscribeRes.payload?.key).toBe("agent:main:main"); + + const mainEvent = onceMessage( + ws, + (message) => + message.type === "event" && + message.event === "session.message" && + (message.payload as { sessionKey?: string } | undefined)?.sessionKey === + "agent:main:main", + ); + const workerEvent = Promise.race([ + onceMessage( + ws, + (message) => + message.type === "event" && + message.event === "session.message" && + (message.payload as { sessionKey?: string } | undefined)?.sessionKey === + "agent:main:worker", + ).then(() => "received"), + new Promise((resolve) => setTimeout(() => resolve("timeout"), 300)), + ]); + + const [mainAppend] = await Promise.all([ + appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:main", + text: "main only", + storePath, + }), + mainEvent, + ]); + expect(mainAppend.ok).toBe(true); + + const workerAppend = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:worker", + text: "worker hidden", + storePath, + }); + expect(workerAppend.ok).toBe(true); + await expect(workerEvent).resolves.toBe("timeout"); + + const unsubscribeRes = await rpcReq(ws, "sessions.messages.unsubscribe", { + key: "agent:main:main", + }); + expect(unsubscribeRes.ok).toBe(true); + expect(unsubscribeRes.payload?.subscribed).toBe(false); + + const postUnsubscribeEvent = Promise.race([ + onceMessage( + ws, + (message) => + message.type === "event" && + message.event === "session.message" && + (message.payload as { sessionKey?: string } | undefined)?.sessionKey === + "agent:main:main", + ).then(() => "received"), + new Promise((resolve) => setTimeout(() => resolve("timeout"), 300)), + ]); + const hiddenAppend = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:main", + text: "hidden after unsubscribe", + storePath, + }); + expect(hiddenAppend.ok).toBe(true); + await expect(postUnsubscribeEvent).resolves.toBe("timeout"); } finally { ws.close(); } diff --git a/src/gateway/session-utils.fs.ts b/src/gateway/session-utils.fs.ts index 3712c8c8272..54f0843924a 100644 --- a/src/gateway/session-utils.fs.ts +++ b/src/gateway/session-utils.fs.ts @@ -71,6 +71,27 @@ function setCachedSessionTitleFields(cacheKey: string, stat: fs.Stats, value: Se } } +export function attachOpenClawTranscriptMeta( + message: unknown, + meta: Record, +): unknown { + if (!message || typeof message !== "object" || Array.isArray(message)) { + return message; + } + const record = message as Record; + const existing = + record.__openclaw && typeof record.__openclaw === "object" && !Array.isArray(record.__openclaw) + ? (record.__openclaw as Record) + : {}; + return { + ...record, + __openclaw: { + ...existing, + ...meta, + }, + }; +} + export function readSessionMessages( sessionId: string, storePath: string | undefined, @@ -85,6 +106,7 @@ export function readSessionMessages( const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/); const messages: unknown[] = []; + let messageSeq = 0; for (const line of lines) { if (!line.trim()) { continue; @@ -92,7 +114,13 @@ export function readSessionMessages( try { const parsed = JSON.parse(line); if (parsed?.message) { - messages.push(parsed.message); + messageSeq += 1; + messages.push( + attachOpenClawTranscriptMeta(parsed.message, { + ...(typeof parsed.id === "string" ? { id: parsed.id } : {}), + seq: messageSeq, + }), + ); continue; } @@ -101,6 +129,7 @@ export function readSessionMessages( if (parsed?.type === "compaction") { const ts = typeof parsed.timestamp === "string" ? Date.parse(parsed.timestamp) : Number.NaN; const timestamp = Number.isFinite(ts) ? ts : Date.now(); + messageSeq += 1; messages.push({ role: "system", content: [{ type: "text", text: "Compaction" }], @@ -108,6 +137,7 @@ export function readSessionMessages( __openclaw: { kind: "compaction", id: typeof parsed.id === "string" ? parsed.id : undefined, + seq: messageSeq, }, }); } diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 79237504197..cad8f3c8268 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -56,6 +56,7 @@ import type { export { archiveFileOnDisk, archiveSessionTranscripts, + attachOpenClawTranscriptMeta, capArrayByJsonBytes, readFirstUserMessageFromTranscript, readLastMessagePreviewFromTranscript, diff --git a/src/gateway/sessions-history-http.test.ts b/src/gateway/sessions-history-http.test.ts index b431bc7643f..dec57a75448 100644 --- a/src/gateway/sessions-history-http.test.ts +++ b/src/gateway/sessions-history-http.test.ts @@ -105,6 +105,15 @@ describe("session history HTTP endpoints", () => { expect(body.sessionKey).toBe("agent:main:main"); expect(body.messages).toHaveLength(1); expect(body.messages?.[0]?.content?.[0]?.text).toBe("hello from history"); + expect( + ( + body.messages?.[0] as { + __openclaw?: { id?: string; seq?: number }; + } + )?.__openclaw, + ).toMatchObject({ + seq: 1, + }); } finally { await harness.close(); } @@ -210,6 +219,17 @@ describe("session history HTTP endpoints", () => { (messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message ?.content?.[0]?.text, ).toBe("second message"); + expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(2); + expect( + ( + messageEvent.data as { + message?: { __openclaw?: { id?: string; seq?: number } }; + } + ).message?.__openclaw, + ).toMatchObject({ + id: appended.messageId, + seq: 2, + }); await reader?.cancel(); } finally { diff --git a/src/gateway/sessions-history-http.ts b/src/gateway/sessions-history-http.ts index 2a0831be525..a61d85c46ba 100644 --- a/src/gateway/sessions-history-http.ts +++ b/src/gateway/sessions-history-http.ts @@ -15,6 +15,7 @@ import { } from "./http-common.js"; import { getBearerToken, getHeader } from "./http-utils.js"; import { + attachOpenClawTranscriptMeta, readSessionMessages, resolveGatewaySessionStoreTarget, resolveSessionTranscriptCandidates, @@ -171,15 +172,22 @@ export async function handleSessionHistoryHttpRequest( return; } if (update.message !== undefined) { + const messageSeq = sentMessages.length + 1; + const nextMessage = attachOpenClawTranscriptMeta(update.message, { + ...(typeof update.messageId === "string" ? { id: update.messageId } : {}), + seq: messageSeq, + }); if (limit === undefined) { - sentMessages = [...sentMessages, update.message]; + sentMessages = [...sentMessages, nextMessage]; sseWrite(res, "message", { sessionKey: target.canonicalKey, - message: update.message, + message: nextMessage, + ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), + messageSeq, }); return; } - sentMessages = maybeLimitMessages([...sentMessages, update.message], limit); + sentMessages = maybeLimitMessages([...sentMessages, nextMessage], limit); sseWrite(res, "history", { sessionKey: target.canonicalKey, messages: sentMessages, diff --git a/src/sessions/transcript-events.ts b/src/sessions/transcript-events.ts index 964072967af..c870b9407f0 100644 --- a/src/sessions/transcript-events.ts +++ b/src/sessions/transcript-events.ts @@ -2,6 +2,7 @@ export type SessionTranscriptUpdate = { sessionFile: string; sessionKey?: string; message?: unknown; + messageId?: string; }; type SessionTranscriptListener = (update: SessionTranscriptUpdate) => void; @@ -23,6 +24,7 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp sessionFile: update.sessionFile, sessionKey: update.sessionKey, message: update.message, + messageId: update.messageId, }; const trimmed = normalized.sessionFile.trim(); if (!trimmed) { @@ -34,6 +36,9 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp ? { sessionKey: normalized.sessionKey.trim() } : {}), ...(normalized.message !== undefined ? { message: normalized.message } : {}), + ...(typeof normalized.messageId === "string" && normalized.messageId.trim() + ? { messageId: normalized.messageId.trim() } + : {}), }; for (const listener of SESSION_TRANSCRIPT_LISTENERS) { try {