Add dashboard session API improvements

This commit is contained in:
Tyler Yust
2026-03-12 11:00:17 -07:00
parent c8ae47a9fe
commit d8de86870c
22 changed files with 561 additions and 24 deletions

View File

@@ -245,7 +245,11 @@ export function installSessionToolResultGuard(
sessionManager as { getSessionFile?: () => string | null }
).getSessionFile?.();
if (sessionFile) {
emitSessionTranscriptUpdate({ sessionFile, message: finalMessage });
emitSessionTranscriptUpdate({
sessionFile,
message: finalMessage,
messageId: typeof result === "string" ? result : undefined,
});
}
if (toolCalls.length > 0) {

View File

@@ -137,7 +137,7 @@ export async function appendAssistantMessageToSessionTranscript(params: {
mediaUrls?: string[];
/** Optional override for store path (mostly for tests). */
storePath?: string;
}): Promise<{ ok: true; sessionFile: string } | { ok: false; reason: string }> {
}): Promise<{ ok: true; sessionFile: string; messageId: string } | { ok: false; reason: string }> {
const sessionKey = params.sessionKey.trim();
if (!sessionKey) {
return { ok: false, reason: "missing sessionKey" };
@@ -203,8 +203,8 @@ export async function appendAssistantMessageToSessionTranscript(params: {
timestamp: Date.now(),
} as Parameters<SessionManager["appendMessage"]>[0];
const sessionManager = SessionManager.open(sessionFile);
sessionManager.appendMessage(message);
const messageId = sessionManager.appendMessage(message);
emitSessionTranscriptUpdate({ sessionFile, sessionKey, message });
return { ok: true, sessionFile };
emitSessionTranscriptUpdate({ sessionFile, sessionKey, message, messageId });
return { ok: true, sessionFile, messageId };
}

View File

@@ -24,6 +24,12 @@ describe("method scope resolution", () => {
expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.abort")).toEqual([
"operator.write",
]);
expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.messages.subscribe")).toEqual([
"operator.read",
]);
expect(resolveLeastPrivilegeOperatorScopesForMethod("sessions.messages.unsubscribe")).toEqual([
"operator.read",
]);
expect(resolveLeastPrivilegeOperatorScopesForMethod("poll")).toEqual(["operator.write"]);
});

View File

@@ -71,6 +71,8 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
"sessions.resolve",
"sessions.subscribe",
"sessions.unsubscribe",
"sessions.messages.subscribe",
"sessions.messages.unsubscribe",
"sessions.usage",
"sessions.usage.timeseries",
"sessions.usage.logs",

View File

@@ -196,6 +196,10 @@ import {
SessionsDeleteParamsSchema,
type SessionsListParams,
SessionsListParamsSchema,
type SessionsMessagesSubscribeParams,
SessionsMessagesSubscribeParamsSchema,
type SessionsMessagesUnsubscribeParams,
SessionsMessagesUnsubscribeParamsSchema,
type SessionsPatchParams,
SessionsPatchParamsSchema,
type SessionsPreviewParams,
@@ -334,6 +338,11 @@ export const validateSessionsCreateParams = ajv.compile<SessionsCreateParams>(
SessionsCreateParamsSchema,
);
export const validateSessionsSendParams = ajv.compile<SessionsSendParams>(SessionsSendParamsSchema);
export const validateSessionsMessagesSubscribeParams = ajv.compile<SessionsMessagesSubscribeParams>(
SessionsMessagesSubscribeParamsSchema,
);
export const validateSessionsMessagesUnsubscribeParams =
ajv.compile<SessionsMessagesUnsubscribeParams>(SessionsMessagesUnsubscribeParamsSchema);
export const validateSessionsAbortParams =
ajv.compile<SessionsAbortParams>(SessionsAbortParamsSchema);
export const validateSessionsPatchParams =

View File

@@ -143,6 +143,8 @@ import {
SessionsCreateParamsSchema,
SessionsDeleteParamsSchema,
SessionsListParamsSchema,
SessionsMessagesSubscribeParamsSchema,
SessionsMessagesUnsubscribeParamsSchema,
SessionsPatchParamsSchema,
SessionsPreviewParamsSchema,
SessionsResetParamsSchema,
@@ -209,6 +211,8 @@ export const ProtocolSchemas = {
SessionsResolveParams: SessionsResolveParamsSchema,
SessionsCreateParams: SessionsCreateParamsSchema,
SessionsSendParams: SessionsSendParamsSchema,
SessionsMessagesSubscribeParams: SessionsMessagesSubscribeParamsSchema,
SessionsMessagesUnsubscribeParams: SessionsMessagesUnsubscribeParamsSchema,
SessionsAbortParams: SessionsAbortParamsSchema,
SessionsPatchParams: SessionsPatchParamsSchema,
SessionsResetParams: SessionsResetParamsSchema,

View File

@@ -51,6 +51,8 @@ export const SessionsCreateParamsSchema = Type.Object(
{
agentId: Type.Optional(NonEmptyString),
label: Type.Optional(SessionLabelString),
task: Type.Optional(Type.String()),
message: Type.Optional(Type.String()),
},
{ additionalProperties: false },
);
@@ -67,6 +69,20 @@ export const SessionsSendParamsSchema = Type.Object(
{ additionalProperties: false },
);
export const SessionsMessagesSubscribeParamsSchema = Type.Object(
{
key: NonEmptyString,
},
{ additionalProperties: false },
);
export const SessionsMessagesUnsubscribeParamsSchema = Type.Object(
{
key: NonEmptyString,
},
{ additionalProperties: false },
);
export const SessionsAbortParamsSchema = Type.Object(
{
key: NonEmptyString,

View File

@@ -43,6 +43,8 @@ export type SessionsPreviewParams = SchemaType<"SessionsPreviewParams">;
export type SessionsResolveParams = SchemaType<"SessionsResolveParams">;
export type SessionsCreateParams = SchemaType<"SessionsCreateParams">;
export type SessionsSendParams = SchemaType<"SessionsSendParams">;
export type SessionsMessagesSubscribeParams = SchemaType<"SessionsMessagesSubscribeParams">;
export type SessionsMessagesUnsubscribeParams = SchemaType<"SessionsMessagesUnsubscribeParams">;
export type SessionsAbortParams = SchemaType<"SessionsAbortParams">;
export type SessionsPatchParams = SchemaType<"SessionsPatchParams">;
export type SessionsResetParams = SchemaType<"SessionsResetParams">;

View File

@@ -244,6 +244,14 @@ export type SessionEventSubscriberRegistry = {
clear: () => void;
};
export type SessionMessageSubscriberRegistry = {
subscribe: (connId: string, sessionKey: string) => void;
unsubscribe: (connId: string, sessionKey: string) => void;
unsubscribeAll: (connId: string) => void;
get: (sessionKey: string) => ReadonlySet<string>;
clear: () => void;
};
type ToolRecipientEntry = {
connIds: Set<string>;
updatedAt: number;
@@ -279,6 +287,84 @@ export function createSessionEventSubscriberRegistry(): SessionEventSubscriberRe
};
}
export function createSessionMessageSubscriberRegistry(): SessionMessageSubscriberRegistry {
const sessionToConnIds = new Map<string, Set<string>>();
const connToSessionKeys = new Map<string, Set<string>>();
const empty = new Set<string>();
const normalize = (value: string): string => value.trim();
return {
subscribe: (connId: string, sessionKey: string) => {
const normalizedConnId = normalize(connId);
const normalizedSessionKey = normalize(sessionKey);
if (!normalizedConnId || !normalizedSessionKey) {
return;
}
const connIds = sessionToConnIds.get(normalizedSessionKey) ?? new Set<string>();
connIds.add(normalizedConnId);
sessionToConnIds.set(normalizedSessionKey, connIds);
const sessionKeys = connToSessionKeys.get(normalizedConnId) ?? new Set<string>();
sessionKeys.add(normalizedSessionKey);
connToSessionKeys.set(normalizedConnId, sessionKeys);
},
unsubscribe: (connId: string, sessionKey: string) => {
const normalizedConnId = normalize(connId);
const normalizedSessionKey = normalize(sessionKey);
if (!normalizedConnId || !normalizedSessionKey) {
return;
}
const connIds = sessionToConnIds.get(normalizedSessionKey);
if (connIds) {
connIds.delete(normalizedConnId);
if (connIds.size === 0) {
sessionToConnIds.delete(normalizedSessionKey);
}
}
const sessionKeys = connToSessionKeys.get(normalizedConnId);
if (sessionKeys) {
sessionKeys.delete(normalizedSessionKey);
if (sessionKeys.size === 0) {
connToSessionKeys.delete(normalizedConnId);
}
}
},
unsubscribeAll: (connId: string) => {
const normalizedConnId = normalize(connId);
if (!normalizedConnId) {
return;
}
const sessionKeys = connToSessionKeys.get(normalizedConnId);
if (!sessionKeys) {
return;
}
for (const sessionKey of sessionKeys) {
const connIds = sessionToConnIds.get(sessionKey);
if (!connIds) {
continue;
}
connIds.delete(normalizedConnId);
if (connIds.size === 0) {
sessionToConnIds.delete(sessionKey);
}
}
connToSessionKeys.delete(normalizedConnId);
},
get: (sessionKey: string) => {
const normalizedSessionKey = normalize(sessionKey);
if (!normalizedSessionKey) {
return empty;
}
return sessionToConnIds.get(normalizedSessionKey) ?? empty;
},
clear: () => {
sessionToConnIds.clear();
connToSessionKeys.clear();
},
};
}
export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry {
const recipients = new Map<string, ToolRecipientEntry>();

View File

@@ -56,6 +56,8 @@ const BASE_METHODS = [
"sessions.list",
"sessions.subscribe",
"sessions.unsubscribe",
"sessions.messages.subscribe",
"sessions.messages.unsubscribe",
"sessions.preview",
"sessions.create",
"sessions.send",

View File

@@ -72,6 +72,7 @@ export function appendInjectedAssistantMessageToTranscript(params: {
emitSessionTranscriptUpdate({
sessionFile: params.transcriptPath,
message: messageBody,
messageId,
});
return { ok: true, messageId, message: messageBody };
} catch (err) {

View File

@@ -22,6 +22,8 @@ import {
validateSessionsCreateParams,
validateSessionsDeleteParams,
validateSessionsListParams,
validateSessionsMessagesSubscribeParams,
validateSessionsMessagesUnsubscribeParams,
validateSessionsPatchParams,
validateSessionsPreviewParams,
validateSessionsResetParams,
@@ -83,6 +85,30 @@ function resolveGatewaySessionTargetFromKey(key: string) {
return { cfg, target, storePath: target.storePath };
}
function resolveOptionalInitialSessionMessage(params: {
task?: unknown;
message?: unknown;
}): string | undefined {
if (typeof params.task === "string" && params.task.trim()) {
return params.task;
}
if (typeof params.message === "string" && params.message.trim()) {
return params.message;
}
return undefined;
}
function shouldAttachPendingMessageSeq(params: { payload: unknown; cached?: boolean }): boolean {
if (params.cached) {
return false;
}
const status =
params.payload && typeof params.payload === "object"
? (params.payload as { status?: unknown }).status
: undefined;
return status === "started";
}
function emitSessionsChanged(
context: Pick<GatewayRequestContext, "broadcastToConnIds" | "getSessionEventSubscriberConnIds">,
payload: { sessionKey?: string; reason: string; compacted?: boolean },
@@ -246,6 +272,52 @@ export const sessionsHandlers: GatewayRequestHandlers = {
}
respond(true, { subscribed: false }, undefined);
},
"sessions.messages.subscribe": ({ params, client, context, respond }) => {
if (
!assertValidParams(
params,
validateSessionsMessagesSubscribeParams,
"sessions.messages.subscribe",
respond,
)
) {
return;
}
const connId = client?.connId?.trim();
const key = requireSessionKey((params as { key?: unknown }).key, respond);
if (!key) {
return;
}
const { canonicalKey } = loadSessionEntry(key);
if (connId) {
context.subscribeSessionMessageEvents(connId, canonicalKey);
respond(true, { subscribed: true, key: canonicalKey }, undefined);
return;
}
respond(true, { subscribed: false, key: canonicalKey }, undefined);
},
"sessions.messages.unsubscribe": ({ params, client, context, respond }) => {
if (
!assertValidParams(
params,
validateSessionsMessagesUnsubscribeParams,
"sessions.messages.unsubscribe",
respond,
)
) {
return;
}
const connId = client?.connId?.trim();
const key = requireSessionKey((params as { key?: unknown }).key, respond);
if (!key) {
return;
}
const { canonicalKey } = loadSessionEntry(key);
if (connId) {
context.unsubscribeSessionMessageEvents(connId, canonicalKey);
}
respond(true, { subscribed: false, key: canonicalKey }, undefined);
},
"sessions.preview": ({ params, respond }) => {
if (!assertValidParams(params, validateSessionsPreviewParams, "sessions.preview", respond)) {
return;
@@ -322,7 +394,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
}
respond(true, { ok: true, key: resolved.key }, undefined);
},
"sessions.create": async ({ params, respond, context }) => {
"sessions.create": async ({ req, params, respond, context, client, isWebchatConnect }) => {
if (!assertValidParams(params, validateSessionsCreateParams, "sessions.create", respond)) {
return;
}
@@ -366,6 +438,45 @@ export const sessionsHandlers: GatewayRequestHandlers = {
);
return;
}
const initialMessage = resolveOptionalInitialSessionMessage(p);
let runPayload: Record<string, unknown> | undefined;
let runError: unknown;
let runMeta: Record<string, unknown> | undefined;
const messageSeq = initialMessage
? readSessionMessages(created.entry.sessionId, target.storePath, created.entry.sessionFile)
.length + 1
: undefined;
if (initialMessage) {
await chatHandlers["chat.send"]({
req,
params: {
sessionKey: target.canonicalKey,
message: initialMessage,
idempotencyKey: randomUUID(),
},
respond: (ok, payload, error, meta) => {
if (ok && payload && typeof payload === "object") {
runPayload = payload as Record<string, unknown>;
} else {
runError = error;
}
runMeta = meta;
},
context,
client,
isWebchatConnect,
});
}
const runStarted =
runPayload !== undefined &&
shouldAttachPendingMessageSeq({
payload: runPayload,
cached: runMeta?.cached === true,
});
respond(
true,
{
@@ -373,6 +484,10 @@ export const sessionsHandlers: GatewayRequestHandlers = {
key: target.canonicalKey,
sessionId: created.entry.sessionId,
entry: created.entry,
runStarted,
...(runPayload ? runPayload : {}),
...(runStarted && typeof messageSeq === "number" ? { messageSeq } : {}),
...(runError ? { runError } : {}),
},
undefined,
);
@@ -380,6 +495,12 @@ export const sessionsHandlers: GatewayRequestHandlers = {
sessionKey: target.canonicalKey,
reason: "create",
});
if (runStarted) {
emitSessionsChanged(context, {
sessionKey: target.canonicalKey,
reason: "send",
});
}
},
"sessions.send": async ({ req, params, respond, context, client, isWebchatConnect }) => {
if (!assertValidParams(params, validateSessionsSendParams, "sessions.send", respond)) {
@@ -390,7 +511,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
if (!key) {
return;
}
const { entry, canonicalKey } = loadSessionEntry(key);
const { entry, canonicalKey, storePath } = loadSessionEntry(key);
if (!entry?.sessionId) {
respond(
false,
@@ -399,6 +520,8 @@ export const sessionsHandlers: GatewayRequestHandlers = {
);
return;
}
const messageSeq =
readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length + 1;
let sendAcked = false;
await chatHandlers["chat.send"]({
req,
@@ -415,6 +538,18 @@ export const sessionsHandlers: GatewayRequestHandlers = {
},
respond: (ok, payload, error, meta) => {
sendAcked = ok;
if (ok && shouldAttachPendingMessageSeq({ payload, cached: meta?.cached === true })) {
respond(
true,
{
...(payload && typeof payload === "object" ? payload : {}),
messageSeq,
},
undefined,
meta,
);
return;
}
respond(ok, payload, error, meta);
},
context,
@@ -444,7 +579,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
canonicalKey,
runId: typeof p.runId === "string" ? p.runId : undefined,
});
let aborted = false;
let abortedRunId: string | null = null;
await chatHandlers["chat.abort"]({
req,
params: {
@@ -452,18 +587,35 @@ export const sessionsHandlers: GatewayRequestHandlers = {
runId: typeof p.runId === "string" ? p.runId : undefined,
},
respond: (ok, payload, error, meta) => {
aborted =
ok &&
Boolean(
payload && typeof payload === "object" && (payload as { aborted?: boolean }).aborted,
);
respond(ok, payload, error, meta);
if (!ok) {
respond(ok, payload, error, meta);
return;
}
const runIds =
payload &&
typeof payload === "object" &&
Array.isArray((payload as { runIds?: unknown[] }).runIds)
? (payload as { runIds: unknown[] }).runIds.filter(
(value): value is string => typeof value === "string" && value.trim().length > 0,
)
: [];
abortedRunId = runIds[0] ?? null;
respond(
true,
{
ok: true,
abortedRunId,
status: abortedRunId ? "aborted" : "no-active-run",
},
undefined,
meta,
);
},
context,
client,
isWebchatConnect,
});
if (aborted) {
if (abortedRunId) {
emitSessionsChanged(context, {
sessionKey: canonicalKey,
reason: "abort",

View File

@@ -65,6 +65,8 @@ export type GatewayRequestContext = {
) => { sessionKey: string; clientRunId: string } | undefined;
subscribeSessionEvents: (connId: string) => void;
unsubscribeSessionEvents: (connId: string) => void;
subscribeSessionMessageEvents: (connId: string, sessionKey: string) => void;
unsubscribeSessionMessageEvents: (connId: string, sessionKey: string) => void;
unsubscribeAllSessionEvents: (connId: string) => void;
getSessionEventSubscriberConnIds: () => ReadonlySet<string>;
registerToolEventRecipient: (runId: string, connId: string) => void;

View File

@@ -193,6 +193,7 @@ describe("gateway server chat", () => {
});
expect(res.ok).toBe(true);
expect(res.payload?.runId).toBe("idem-sessions-send-1");
expect(res.payload?.messageSeq).toBe(1);
await waitFor(() => spy.mock.calls.length > callsBefore, 1_000);
const ctx = spy.mock.calls.at(-1)?.[0] as { Body?: string; SessionKey?: string } | undefined;
@@ -258,8 +259,17 @@ describe("gateway server chat", () => {
runId: "idem-sessions-abort-1",
});
expect(abortRes.ok).toBe(true);
expect(abortRes.payload?.aborted).toBe(true);
expect(abortRes.payload?.abortedRunId).toBe("idem-sessions-abort-1");
expect(abortRes.payload?.status).toBe("aborted");
await waitFor(() => aborted, 1_000);
const idleAbortRes = await rpcReq(ws, "sessions.abort", {
key: "agent:main:dashboard:test-abort",
runId: "idem-sessions-abort-1",
});
expect(idleAbortRes.ok).toBe(true);
expect(idleAbortRes.payload?.abortedRunId).toBeNull();
expect(idleAbortRes.payload?.status).toBe("no-active-run");
} finally {
testState.sessionStorePath = undefined;
await fs.rm(dir, { recursive: true, force: true });

View File

@@ -77,7 +77,11 @@ import { ExecApprovalManager } from "./exec-approval-manager.js";
import { NodeRegistry } from "./node-registry.js";
import type { startBrowserControlServerIfEnabled } from "./server-browser.js";
import { createChannelManager } from "./server-channels.js";
import { createAgentEventHandler, createSessionEventSubscriberRegistry } from "./server-chat.js";
import {
createAgentEventHandler,
createSessionEventSubscriberRegistry,
createSessionMessageSubscriberRegistry,
} from "./server-chat.js";
import { createGatewayCloseHandler } from "./server-close.js";
import { buildGatewayCronService } from "./server-cron.js";
import { startGatewayDiscovery } from "./server-discovery-runtime.js";
@@ -112,6 +116,11 @@ import { resolveHookClientIpConfig } from "./server/hooks.js";
import { createReadinessChecker } from "./server/readiness.js";
import { loadGatewayTlsRuntime } from "./server/tls.js";
import { resolveSessionKeyForTranscriptFile } from "./session-transcript-key.js";
import {
attachOpenClawTranscriptMeta,
loadSessionEntry,
readSessionMessages,
} from "./session-utils.js";
import {
ensureGatewayStartupAuth,
mergeGatewayAuthConfig,
@@ -634,6 +643,7 @@ export async function startGatewayServer(
const nodePresenceTimers = new Map<string, ReturnType<typeof setInterval>>();
const nodeSubscriptions = createNodeSubscriptionManager();
const sessionEventSubscribers = createSessionEventSubscriberRegistry();
const sessionMessageSubscribers = createSessionMessageSubscriberRegistry();
const nodeSendEvent = (opts: { nodeId: string; event: string; payloadJSON?: string | null }) => {
const payload = safeParseJson(opts.payloadJSON ?? null);
nodeRegistry.sendEvent(opts.nodeId, opts.event, payload);
@@ -760,15 +770,31 @@ export async function startGatewayServer(
if (!sessionKey || update.message === undefined) {
return;
}
const connIds = sessionEventSubscribers.getAll();
const connIds = new Set<string>();
for (const connId of sessionEventSubscribers.getAll()) {
connIds.add(connId);
}
for (const connId of sessionMessageSubscribers.get(sessionKey)) {
connIds.add(connId);
}
if (connIds.size === 0) {
return;
}
const { entry, storePath } = loadSessionEntry(sessionKey);
const messageSeq = entry?.sessionId
? readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length
: undefined;
const message = attachOpenClawTranscriptMeta(update.message, {
...(typeof update.messageId === "string" ? { id: update.messageId } : {}),
...(typeof messageSeq === "number" ? { seq: messageSeq } : {}),
});
broadcastToConnIds(
"session.message",
{
sessionKey,
message: update.message,
message,
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
...(typeof messageSeq === "number" ? { messageSeq } : {}),
},
connIds,
{ dropIfSlow: true },
@@ -882,7 +908,12 @@ export async function startGatewayServer(
removeChatRun,
subscribeSessionEvents: sessionEventSubscribers.subscribe,
unsubscribeSessionEvents: sessionEventSubscribers.unsubscribe,
unsubscribeAllSessionEvents: sessionEventSubscribers.unsubscribe,
subscribeSessionMessageEvents: sessionMessageSubscribers.subscribe,
unsubscribeSessionMessageEvents: sessionMessageSubscribers.unsubscribe,
unsubscribeAllSessionEvents: (connId: string) => {
sessionEventSubscribers.unsubscribe(connId);
sessionMessageSubscribers.unsubscribeAll(connId);
},
getSessionEventSubscriberConnIds: sessionEventSubscribers.getAll,
registerToolEventRecipient: toolEventRecipients.add,
dedupe,

View File

@@ -17,6 +17,7 @@ import {
trackConnectChallengeNonce,
writeSessionStore,
} from "./test-helpers.js";
import { getReplyFromConfig } from "./test-helpers.mocks.js";
const sessionCleanupMocks = vi.hoisted(() => ({
clearSessionQueues: vi.fn(() => ({ followupCleared: 0, laneCleared: 0, keys: [] })),
@@ -274,6 +275,42 @@ describe("gateway server sessions", () => {
ws.close();
});
test("sessions.create can start the first agent turn from an initial task", async () => {
const { ws } = await openClient();
const replySpy = vi.mocked(getReplyFromConfig);
const callsBefore = replySpy.mock.calls.length;
const created = await rpcReq<{
key?: string;
sessionId?: string;
runStarted?: boolean;
runId?: string;
messageSeq?: number;
}>(ws, "sessions.create", {
agentId: "ops",
label: "Dashboard Chat",
task: "hello from create",
});
expect(created.ok).toBe(true);
expect(created.payload?.key).toMatch(/^agent:ops:dashboard:/);
expect(created.payload?.sessionId).toMatch(
/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/,
);
expect(created.payload?.runStarted).toBe(true);
expect(created.payload?.runId).toBeTruthy();
expect(created.payload?.messageSeq).toBe(1);
await vi.waitFor(() => replySpy.mock.calls.length > callsBefore);
const ctx = replySpy.mock.calls.at(-1)?.[0] as
| { Body?: string; SessionKey?: string }
| undefined;
expect(ctx?.Body).toContain("hello from create");
expect(ctx?.SessionKey).toBe(created.payload?.key);
ws.close();
});
test("lists and patches session store via sessions.* RPC", async () => {
const { dir, storePath } = await createSessionStoreDir();
const now = Date.now();

View File

@@ -136,6 +136,115 @@ describe("session.message websocket events", () => {
(event.payload as { message?: { content?: Array<{ text?: string }> } }).message
?.content?.[0]?.text,
).toBe("live websocket message");
expect((event.payload as { messageSeq?: number }).messageSeq).toBe(1);
expect(
(
event.payload as {
message?: { __openclaw?: { id?: string; seq?: number } };
}
).message?.__openclaw,
).toMatchObject({
id: appended.messageId,
seq: 1,
});
} finally {
ws.close();
}
} finally {
await harness.close();
}
});
test("sessions.messages.subscribe only delivers transcript events for the requested session", async () => {
const storePath = await createSessionStoreFile();
await writeSessionStore({
entries: {
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
},
worker: {
sessionId: "sess-worker",
updatedAt: Date.now(),
},
},
storePath,
});
const harness = await createGatewaySuiteHarness();
try {
const ws = await harness.openWs();
try {
await connectOk(ws, { scopes: ["operator.read"] });
const subscribeRes = await rpcReq(ws, "sessions.messages.subscribe", {
key: "agent:main:main",
});
expect(subscribeRes.ok).toBe(true);
expect(subscribeRes.payload?.subscribed).toBe(true);
expect(subscribeRes.payload?.key).toBe("agent:main:main");
const mainEvent = onceMessage(
ws,
(message) =>
message.type === "event" &&
message.event === "session.message" &&
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
"agent:main:main",
);
const workerEvent = Promise.race([
onceMessage(
ws,
(message) =>
message.type === "event" &&
message.event === "session.message" &&
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
"agent:main:worker",
).then(() => "received"),
new Promise((resolve) => setTimeout(() => resolve("timeout"), 300)),
]);
const [mainAppend] = await Promise.all([
appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "main only",
storePath,
}),
mainEvent,
]);
expect(mainAppend.ok).toBe(true);
const workerAppend = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:worker",
text: "worker hidden",
storePath,
});
expect(workerAppend.ok).toBe(true);
await expect(workerEvent).resolves.toBe("timeout");
const unsubscribeRes = await rpcReq(ws, "sessions.messages.unsubscribe", {
key: "agent:main:main",
});
expect(unsubscribeRes.ok).toBe(true);
expect(unsubscribeRes.payload?.subscribed).toBe(false);
const postUnsubscribeEvent = Promise.race([
onceMessage(
ws,
(message) =>
message.type === "event" &&
message.event === "session.message" &&
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
"agent:main:main",
).then(() => "received"),
new Promise((resolve) => setTimeout(() => resolve("timeout"), 300)),
]);
const hiddenAppend = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "hidden after unsubscribe",
storePath,
});
expect(hiddenAppend.ok).toBe(true);
await expect(postUnsubscribeEvent).resolves.toBe("timeout");
} finally {
ws.close();
}

View File

@@ -71,6 +71,27 @@ function setCachedSessionTitleFields(cacheKey: string, stat: fs.Stats, value: Se
}
}
export function attachOpenClawTranscriptMeta(
message: unknown,
meta: Record<string, unknown>,
): unknown {
if (!message || typeof message !== "object" || Array.isArray(message)) {
return message;
}
const record = message as Record<string, unknown>;
const existing =
record.__openclaw && typeof record.__openclaw === "object" && !Array.isArray(record.__openclaw)
? (record.__openclaw as Record<string, unknown>)
: {};
return {
...record,
__openclaw: {
...existing,
...meta,
},
};
}
export function readSessionMessages(
sessionId: string,
storePath: string | undefined,
@@ -85,6 +106,7 @@ export function readSessionMessages(
const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/);
const messages: unknown[] = [];
let messageSeq = 0;
for (const line of lines) {
if (!line.trim()) {
continue;
@@ -92,7 +114,13 @@ export function readSessionMessages(
try {
const parsed = JSON.parse(line);
if (parsed?.message) {
messages.push(parsed.message);
messageSeq += 1;
messages.push(
attachOpenClawTranscriptMeta(parsed.message, {
...(typeof parsed.id === "string" ? { id: parsed.id } : {}),
seq: messageSeq,
}),
);
continue;
}
@@ -101,6 +129,7 @@ export function readSessionMessages(
if (parsed?.type === "compaction") {
const ts = typeof parsed.timestamp === "string" ? Date.parse(parsed.timestamp) : Number.NaN;
const timestamp = Number.isFinite(ts) ? ts : Date.now();
messageSeq += 1;
messages.push({
role: "system",
content: [{ type: "text", text: "Compaction" }],
@@ -108,6 +137,7 @@ export function readSessionMessages(
__openclaw: {
kind: "compaction",
id: typeof parsed.id === "string" ? parsed.id : undefined,
seq: messageSeq,
},
});
}

View File

@@ -56,6 +56,7 @@ import type {
export {
archiveFileOnDisk,
archiveSessionTranscripts,
attachOpenClawTranscriptMeta,
capArrayByJsonBytes,
readFirstUserMessageFromTranscript,
readLastMessagePreviewFromTranscript,

View File

@@ -105,6 +105,15 @@ describe("session history HTTP endpoints", () => {
expect(body.sessionKey).toBe("agent:main:main");
expect(body.messages).toHaveLength(1);
expect(body.messages?.[0]?.content?.[0]?.text).toBe("hello from history");
expect(
(
body.messages?.[0] as {
__openclaw?: { id?: string; seq?: number };
}
)?.__openclaw,
).toMatchObject({
seq: 1,
});
} finally {
await harness.close();
}
@@ -210,6 +219,17 @@ describe("session history HTTP endpoints", () => {
(messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message
?.content?.[0]?.text,
).toBe("second message");
expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(2);
expect(
(
messageEvent.data as {
message?: { __openclaw?: { id?: string; seq?: number } };
}
).message?.__openclaw,
).toMatchObject({
id: appended.messageId,
seq: 2,
});
await reader?.cancel();
} finally {

View File

@@ -15,6 +15,7 @@ import {
} from "./http-common.js";
import { getBearerToken, getHeader } from "./http-utils.js";
import {
attachOpenClawTranscriptMeta,
readSessionMessages,
resolveGatewaySessionStoreTarget,
resolveSessionTranscriptCandidates,
@@ -171,15 +172,22 @@ export async function handleSessionHistoryHttpRequest(
return;
}
if (update.message !== undefined) {
const messageSeq = sentMessages.length + 1;
const nextMessage = attachOpenClawTranscriptMeta(update.message, {
...(typeof update.messageId === "string" ? { id: update.messageId } : {}),
seq: messageSeq,
});
if (limit === undefined) {
sentMessages = [...sentMessages, update.message];
sentMessages = [...sentMessages, nextMessage];
sseWrite(res, "message", {
sessionKey: target.canonicalKey,
message: update.message,
message: nextMessage,
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
messageSeq,
});
return;
}
sentMessages = maybeLimitMessages([...sentMessages, update.message], limit);
sentMessages = maybeLimitMessages([...sentMessages, nextMessage], limit);
sseWrite(res, "history", {
sessionKey: target.canonicalKey,
messages: sentMessages,

View File

@@ -2,6 +2,7 @@ export type SessionTranscriptUpdate = {
sessionFile: string;
sessionKey?: string;
message?: unknown;
messageId?: string;
};
type SessionTranscriptListener = (update: SessionTranscriptUpdate) => void;
@@ -23,6 +24,7 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp
sessionFile: update.sessionFile,
sessionKey: update.sessionKey,
message: update.message,
messageId: update.messageId,
};
const trimmed = normalized.sessionFile.trim();
if (!trimmed) {
@@ -34,6 +36,9 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp
? { sessionKey: normalized.sessionKey.trim() }
: {}),
...(normalized.message !== undefined ? { message: normalized.message } : {}),
...(typeof normalized.messageId === "string" && normalized.messageId.trim()
? { messageId: normalized.messageId.trim() }
: {}),
};
for (const listener of SESSION_TRANSCRIPT_LISTENERS) {
try {