diff --git a/src/gateway/server-methods/chat.abort-persistence.test.ts b/src/gateway/server-methods/chat.abort-persistence.test.ts index df8551e6c26..b7add3740eb 100644 --- a/src/gateway/server-methods/chat.abort-persistence.test.ts +++ b/src/gateway/server-methods/chat.abort-persistence.test.ts @@ -72,25 +72,71 @@ function setMockSessionEntry(transcriptPath: string, sessionId: string) { sessionEntryState.sessionId = sessionId; } +async function createTranscriptFixture(prefix: string) { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), prefix)); + const sessionId = "sess-main"; + const transcriptPath = path.join(dir, `${sessionId}.jsonl`); + await writeTranscriptHeader(transcriptPath, sessionId); + setMockSessionEntry(transcriptPath, sessionId); + return { transcriptPath, sessionId }; +} + +function createChatAbortContext(overrides: Record = {}): { + chatAbortControllers: Map>; + chatRunBuffers: Map; + chatDeltaSentAt: Map; + chatAbortedRuns: Map; + removeChatRun: ReturnType; + agentRunSeq: Map; + broadcast: ReturnType; + nodeSendToSession: ReturnType; + logGateway: { warn: ReturnType }; + dedupe?: { get: ReturnType }; +} { + return { + chatAbortControllers: new Map(), + chatRunBuffers: new Map(), + chatDeltaSentAt: new Map(), + chatAbortedRuns: new Map(), + removeChatRun: vi + .fn() + .mockImplementation((run: string) => ({ sessionKey: "main", clientRunId: run })), + agentRunSeq: new Map(), + broadcast: vi.fn(), + nodeSendToSession: vi.fn(), + logGateway: { warn: vi.fn() }, + ...overrides, + }; +} + +async function invokeChatAbort( + context: ReturnType, + params: { sessionKey: string; runId?: string }, + respond: ReturnType, +) { + await chatHandlers["chat.abort"]({ + params, + respond: respond as never, + context: context as never, + req: {} as never, + client: null, + isWebchatConnect: () => false, + }); +} + afterEach(() => { vi.restoreAllMocks(); }); describe("chat abort transcript persistence", () => { it("persists run-scoped abort partial with rpc metadata and idempotency", async () => { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-abort-run-")); - const transcriptPath = path.join(dir, "sess-main.jsonl"); - const sessionId = "sess-main"; + const { transcriptPath, sessionId } = await createTranscriptFixture("openclaw-chat-abort-run-"); const runId = "idem-abort-run-1"; - await writeTranscriptHeader(transcriptPath, sessionId); - - setMockSessionEntry(transcriptPath, sessionId); const respond = vi.fn(); - const context = { + const context = createChatAbortContext({ chatAbortControllers: new Map([[runId, createActiveRun("main", sessionId)]]), chatRunBuffers: new Map([[runId, "Partial from run abort"]]), chatDeltaSentAt: new Map([[runId, Date.now()]]), - chatAbortedRuns: new Map(), removeChatRun: vi .fn() .mockReturnValue({ sessionKey: "main", clientRunId: "client-idem-abort-run-1" }), @@ -101,17 +147,10 @@ describe("chat abort transcript persistence", () => { broadcast: vi.fn(), nodeSendToSession: vi.fn(), logGateway: { warn: vi.fn() }, - }; - - await chatHandlers["chat.abort"]({ - params: { sessionKey: "main", runId }, - respond, - context: context as never, - req: {} as never, - client: null, - isWebchatConnect: () => false, }); + await invokeChatAbort(context, { sessionKey: "main", runId }, respond); + const [ok1, payload1] = respond.mock.calls.at(-1) ?? []; expect(ok1).toBe(true); expect(payload1).toMatchObject({ aborted: true, runIds: [runId] }); @@ -120,14 +159,7 @@ describe("chat abort transcript persistence", () => { context.chatRunBuffers.set(runId, "Partial from run abort"); context.chatDeltaSentAt.set(runId, Date.now()); - await chatHandlers["chat.abort"]({ - params: { sessionKey: "main", runId }, - respond, - context: context as never, - req: {} as never, - client: null, - isWebchatConnect: () => false, - }); + await invokeChatAbort(context, { sessionKey: "main", runId }, respond); const lines = await readTranscriptLines(transcriptPath); const persisted = lines @@ -150,14 +182,11 @@ describe("chat abort transcript persistence", () => { }); it("persists session-scoped abort partials with rpc metadata", async () => { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-abort-session-")); - const transcriptPath = path.join(dir, "sess-main.jsonl"); - const sessionId = "sess-main"; - await writeTranscriptHeader(transcriptPath, sessionId); - - setMockSessionEntry(transcriptPath, sessionId); + const { transcriptPath, sessionId } = await createTranscriptFixture( + "openclaw-chat-abort-session-", + ); const respond = vi.fn(); - const context = { + const context = createChatAbortContext({ chatAbortControllers: new Map([ ["run-a", createActiveRun("main", sessionId)], ["run-b", createActiveRun("main", sessionId)], @@ -170,25 +199,10 @@ describe("chat abort transcript persistence", () => { ["run-a", Date.now()], ["run-b", Date.now()], ]), - chatAbortedRuns: new Map(), - removeChatRun: vi - .fn() - .mockImplementation((run: string) => ({ sessionKey: "main", clientRunId: run })), - agentRunSeq: new Map(), - broadcast: vi.fn(), - nodeSendToSession: vi.fn(), - logGateway: { warn: vi.fn() }, - }; - - await chatHandlers["chat.abort"]({ - params: { sessionKey: "main" }, - respond, - context: context as never, - req: {} as never, - client: null, - isWebchatConnect: () => false, }); + await invokeChatAbort(context, { sessionKey: "main" }, respond); + const [ok, payload] = respond.mock.calls.at(-1) ?? []; expect(ok).toBe(true); expect(payload).toMatchObject({ aborted: true }); @@ -214,27 +228,18 @@ describe("chat abort transcript persistence", () => { }); it("persists /stop partials with stop-command metadata", async () => { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-stop-")); - const transcriptPath = path.join(dir, "sess-main.jsonl"); - const sessionId = "sess-main"; - await writeTranscriptHeader(transcriptPath, sessionId); - - setMockSessionEntry(transcriptPath, sessionId); + const { transcriptPath, sessionId } = await createTranscriptFixture("openclaw-chat-stop-"); const respond = vi.fn(); - const context = { + const context = createChatAbortContext({ chatAbortControllers: new Map([["run-stop-1", createActiveRun("main", sessionId)]]), chatRunBuffers: new Map([["run-stop-1", "Partial from /stop"]]), chatDeltaSentAt: new Map([["run-stop-1", Date.now()]]), - chatAbortedRuns: new Map(), removeChatRun: vi.fn().mockReturnValue({ sessionKey: "main", clientRunId: "client-stop-1" }), agentRunSeq: new Map([["run-stop-1", 1]]), - broadcast: vi.fn(), - nodeSendToSession: vi.fn(), - logGateway: { warn: vi.fn() }, dedupe: { get: vi.fn(), }, - }; + }); await chatHandlers["chat.send"]({ params: { @@ -267,4 +272,29 @@ describe("chat abort transcript persistence", () => { }, }); }); + + it("skips run-scoped transcript persistence when partial text is blank", async () => { + const { transcriptPath, sessionId } = await createTranscriptFixture( + "openclaw-chat-abort-run-blank-", + ); + const runId = "idem-abort-run-blank"; + const respond = vi.fn(); + const context = createChatAbortContext({ + chatAbortControllers: new Map([[runId, createActiveRun("main", sessionId)]]), + chatRunBuffers: new Map([[runId, " \n\t "]]), + chatDeltaSentAt: new Map([[runId, Date.now()]]), + }); + + await invokeChatAbort(context, { sessionKey: "main", runId }, respond); + + const [ok, payload] = respond.mock.calls.at(-1) ?? []; + expect(ok).toBe(true); + expect(payload).toMatchObject({ aborted: true, runIds: [runId] }); + + const lines = await readTranscriptLines(transcriptPath); + const persisted = lines + .map((line) => line.message) + .find((message) => message?.idempotencyKey === `${runId}:assistant`); + expect(persisted).toBeUndefined(); + }); }); diff --git a/src/gateway/server-methods/update.test.ts b/src/gateway/server-methods/update.test.ts index 4468ba35ccf..93dbe59342e 100644 --- a/src/gateway/server-methods/update.test.ts +++ b/src/gateway/server-methods/update.test.ts @@ -88,19 +88,26 @@ beforeEach(() => { scheduleGatewaySigusr1RestartMock.mockReturnValue({ scheduled: true }); }); +async function invokeUpdateRun( + params: Record, + respond: ((ok: boolean, response?: unknown) => void) | undefined = undefined, +) { + const { updateHandlers } = await import("./update.js"); + const onRespond = respond ?? (() => {}); + await updateHandlers["update.run"]({ + params, + respond: onRespond as never, + } as never); +} + describe("update.run sentinel deliveryContext", () => { it("includes deliveryContext in sentinel payload when sessionKey is provided", async () => { capturedPayload = undefined; - const { updateHandlers } = await import("./update.js"); - const handler = updateHandlers["update.run"]; let responded = false; - await handler({ - params: { sessionKey: "agent:main:webchat:dm:user-123" }, - respond: () => { - responded = true; - }, - } as never); + await invokeUpdateRun({ sessionKey: "agent:main:webchat:dm:user-123" }, () => { + responded = true; + }); expect(responded).toBe(true); expect(capturedPayload).toBeDefined(); @@ -113,13 +120,8 @@ describe("update.run sentinel deliveryContext", () => { it("omits deliveryContext when no sessionKey is provided", async () => { capturedPayload = undefined; - const { updateHandlers } = await import("./update.js"); - const handler = updateHandlers["update.run"]; - await handler({ - params: {}, - respond: () => {}, - } as never); + await invokeUpdateRun({}); expect(capturedPayload).toBeDefined(); expect(capturedPayload!.deliveryContext).toBeUndefined(); @@ -128,13 +130,8 @@ describe("update.run sentinel deliveryContext", () => { it("includes threadId in sentinel payload for threaded sessions", async () => { capturedPayload = undefined; - const { updateHandlers } = await import("./update.js"); - const handler = updateHandlers["update.run"]; - await handler({ - params: { sessionKey: "agent:main:slack:dm:C0123ABC:thread:1234567890.123456" }, - respond: () => {}, - } as never); + await invokeUpdateRun({ sessionKey: "agent:main:slack:dm:C0123ABC:thread:1234567890.123456" }); expect(capturedPayload).toBeDefined(); expect(capturedPayload!.deliveryContext).toEqual({ @@ -146,18 +143,26 @@ describe("update.run sentinel deliveryContext", () => { }); }); +describe("update.run timeout normalization", () => { + it("enforces a 1000ms minimum timeout for tiny values", async () => { + await invokeUpdateRun({ timeoutMs: 1 }); + + expect(runGatewayUpdateMock).toHaveBeenCalledWith( + expect.objectContaining({ + timeoutMs: 1000, + }), + ); + }); +}); + describe("update.run restart scheduling", () => { it("schedules restart when update succeeds", async () => { - const { updateHandlers } = await import("./update.js"); - const handler = updateHandlers["update.run"]; let payload: { ok: boolean; restart: unknown } | undefined; - await handler({ - params: {}, - respond: (_ok: boolean, response: { ok: boolean; restart: unknown }) => { - payload = response; - }, - } as never); + await invokeUpdateRun({}, (_ok: boolean, response: unknown) => { + const typed = response as { ok: boolean; restart: unknown }; + payload = typed; + }); expect(scheduleGatewaySigusr1RestartMock).toHaveBeenCalledTimes(1); expect(payload?.ok).toBe(true); @@ -173,16 +178,12 @@ describe("update.run restart scheduling", () => { durationMs: 100, }); - const { updateHandlers } = await import("./update.js"); - const handler = updateHandlers["update.run"]; let payload: { ok: boolean; restart: unknown } | undefined; - await handler({ - params: {}, - respond: (_ok: boolean, response: { ok: boolean; restart: unknown }) => { - payload = response; - }, - } as never); + await invokeUpdateRun({}, (_ok: boolean, response: unknown) => { + const typed = response as { ok: boolean; restart: unknown }; + payload = typed; + }); expect(scheduleGatewaySigusr1RestartMock).not.toHaveBeenCalled(); expect(payload?.ok).toBe(false);