test(gateway): dedupe update and chat abort persistence fixtures

This commit is contained in:
Peter Steinberger
2026-02-18 12:43:54 +00:00
parent bb84452c62
commit 2aec380fb3
2 changed files with 128 additions and 97 deletions

View File

@@ -72,25 +72,71 @@ function setMockSessionEntry(transcriptPath: string, sessionId: string) {
sessionEntryState.sessionId = sessionId; sessionEntryState.sessionId = sessionId;
} }
async function createTranscriptFixture(prefix: string) {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), prefix));
const sessionId = "sess-main";
const transcriptPath = path.join(dir, `${sessionId}.jsonl`);
await writeTranscriptHeader(transcriptPath, sessionId);
setMockSessionEntry(transcriptPath, sessionId);
return { transcriptPath, sessionId };
}
function createChatAbortContext(overrides: Record<string, unknown> = {}): {
chatAbortControllers: Map<string, ReturnType<typeof createActiveRun>>;
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
chatAbortedRuns: Map<string, number>;
removeChatRun: ReturnType<typeof vi.fn>;
agentRunSeq: Map<string, number>;
broadcast: ReturnType<typeof vi.fn>;
nodeSendToSession: ReturnType<typeof vi.fn>;
logGateway: { warn: ReturnType<typeof vi.fn> };
dedupe?: { get: ReturnType<typeof vi.fn> };
} {
return {
chatAbortControllers: new Map(),
chatRunBuffers: new Map(),
chatDeltaSentAt: new Map(),
chatAbortedRuns: new Map<string, number>(),
removeChatRun: vi
.fn()
.mockImplementation((run: string) => ({ sessionKey: "main", clientRunId: run })),
agentRunSeq: new Map<string, number>(),
broadcast: vi.fn(),
nodeSendToSession: vi.fn(),
logGateway: { warn: vi.fn() },
...overrides,
};
}
async function invokeChatAbort(
context: ReturnType<typeof createChatAbortContext>,
params: { sessionKey: string; runId?: string },
respond: ReturnType<typeof vi.fn>,
) {
await chatHandlers["chat.abort"]({
params,
respond: respond as never,
context: context as never,
req: {} as never,
client: null,
isWebchatConnect: () => false,
});
}
afterEach(() => { afterEach(() => {
vi.restoreAllMocks(); vi.restoreAllMocks();
}); });
describe("chat abort transcript persistence", () => { describe("chat abort transcript persistence", () => {
it("persists run-scoped abort partial with rpc metadata and idempotency", async () => { it("persists run-scoped abort partial with rpc metadata and idempotency", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-abort-run-")); const { transcriptPath, sessionId } = await createTranscriptFixture("openclaw-chat-abort-run-");
const transcriptPath = path.join(dir, "sess-main.jsonl");
const sessionId = "sess-main";
const runId = "idem-abort-run-1"; const runId = "idem-abort-run-1";
await writeTranscriptHeader(transcriptPath, sessionId);
setMockSessionEntry(transcriptPath, sessionId);
const respond = vi.fn(); const respond = vi.fn();
const context = { const context = createChatAbortContext({
chatAbortControllers: new Map([[runId, createActiveRun("main", sessionId)]]), chatAbortControllers: new Map([[runId, createActiveRun("main", sessionId)]]),
chatRunBuffers: new Map([[runId, "Partial from run abort"]]), chatRunBuffers: new Map([[runId, "Partial from run abort"]]),
chatDeltaSentAt: new Map([[runId, Date.now()]]), chatDeltaSentAt: new Map([[runId, Date.now()]]),
chatAbortedRuns: new Map<string, number>(),
removeChatRun: vi removeChatRun: vi
.fn() .fn()
.mockReturnValue({ sessionKey: "main", clientRunId: "client-idem-abort-run-1" }), .mockReturnValue({ sessionKey: "main", clientRunId: "client-idem-abort-run-1" }),
@@ -101,17 +147,10 @@ describe("chat abort transcript persistence", () => {
broadcast: vi.fn(), broadcast: vi.fn(),
nodeSendToSession: vi.fn(), nodeSendToSession: vi.fn(),
logGateway: { warn: vi.fn() }, logGateway: { warn: vi.fn() },
};
await chatHandlers["chat.abort"]({
params: { sessionKey: "main", runId },
respond,
context: context as never,
req: {} as never,
client: null,
isWebchatConnect: () => false,
}); });
await invokeChatAbort(context, { sessionKey: "main", runId }, respond);
const [ok1, payload1] = respond.mock.calls.at(-1) ?? []; const [ok1, payload1] = respond.mock.calls.at(-1) ?? [];
expect(ok1).toBe(true); expect(ok1).toBe(true);
expect(payload1).toMatchObject({ aborted: true, runIds: [runId] }); expect(payload1).toMatchObject({ aborted: true, runIds: [runId] });
@@ -120,14 +159,7 @@ describe("chat abort transcript persistence", () => {
context.chatRunBuffers.set(runId, "Partial from run abort"); context.chatRunBuffers.set(runId, "Partial from run abort");
context.chatDeltaSentAt.set(runId, Date.now()); context.chatDeltaSentAt.set(runId, Date.now());
await chatHandlers["chat.abort"]({ await invokeChatAbort(context, { sessionKey: "main", runId }, respond);
params: { sessionKey: "main", runId },
respond,
context: context as never,
req: {} as never,
client: null,
isWebchatConnect: () => false,
});
const lines = await readTranscriptLines(transcriptPath); const lines = await readTranscriptLines(transcriptPath);
const persisted = lines const persisted = lines
@@ -150,14 +182,11 @@ describe("chat abort transcript persistence", () => {
}); });
it("persists session-scoped abort partials with rpc metadata", async () => { it("persists session-scoped abort partials with rpc metadata", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-abort-session-")); const { transcriptPath, sessionId } = await createTranscriptFixture(
const transcriptPath = path.join(dir, "sess-main.jsonl"); "openclaw-chat-abort-session-",
const sessionId = "sess-main"; );
await writeTranscriptHeader(transcriptPath, sessionId);
setMockSessionEntry(transcriptPath, sessionId);
const respond = vi.fn(); const respond = vi.fn();
const context = { const context = createChatAbortContext({
chatAbortControllers: new Map([ chatAbortControllers: new Map([
["run-a", createActiveRun("main", sessionId)], ["run-a", createActiveRun("main", sessionId)],
["run-b", createActiveRun("main", sessionId)], ["run-b", createActiveRun("main", sessionId)],
@@ -170,25 +199,10 @@ describe("chat abort transcript persistence", () => {
["run-a", Date.now()], ["run-a", Date.now()],
["run-b", Date.now()], ["run-b", Date.now()],
]), ]),
chatAbortedRuns: new Map<string, number>(),
removeChatRun: vi
.fn()
.mockImplementation((run: string) => ({ sessionKey: "main", clientRunId: run })),
agentRunSeq: new Map<string, number>(),
broadcast: vi.fn(),
nodeSendToSession: vi.fn(),
logGateway: { warn: vi.fn() },
};
await chatHandlers["chat.abort"]({
params: { sessionKey: "main" },
respond,
context: context as never,
req: {} as never,
client: null,
isWebchatConnect: () => false,
}); });
await invokeChatAbort(context, { sessionKey: "main" }, respond);
const [ok, payload] = respond.mock.calls.at(-1) ?? []; const [ok, payload] = respond.mock.calls.at(-1) ?? [];
expect(ok).toBe(true); expect(ok).toBe(true);
expect(payload).toMatchObject({ aborted: true }); expect(payload).toMatchObject({ aborted: true });
@@ -214,27 +228,18 @@ describe("chat abort transcript persistence", () => {
}); });
it("persists /stop partials with stop-command metadata", async () => { it("persists /stop partials with stop-command metadata", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-stop-")); const { transcriptPath, sessionId } = await createTranscriptFixture("openclaw-chat-stop-");
const transcriptPath = path.join(dir, "sess-main.jsonl");
const sessionId = "sess-main";
await writeTranscriptHeader(transcriptPath, sessionId);
setMockSessionEntry(transcriptPath, sessionId);
const respond = vi.fn(); const respond = vi.fn();
const context = { const context = createChatAbortContext({
chatAbortControllers: new Map([["run-stop-1", createActiveRun("main", sessionId)]]), chatAbortControllers: new Map([["run-stop-1", createActiveRun("main", sessionId)]]),
chatRunBuffers: new Map([["run-stop-1", "Partial from /stop"]]), chatRunBuffers: new Map([["run-stop-1", "Partial from /stop"]]),
chatDeltaSentAt: new Map([["run-stop-1", Date.now()]]), chatDeltaSentAt: new Map([["run-stop-1", Date.now()]]),
chatAbortedRuns: new Map<string, number>(),
removeChatRun: vi.fn().mockReturnValue({ sessionKey: "main", clientRunId: "client-stop-1" }), removeChatRun: vi.fn().mockReturnValue({ sessionKey: "main", clientRunId: "client-stop-1" }),
agentRunSeq: new Map<string, number>([["run-stop-1", 1]]), agentRunSeq: new Map<string, number>([["run-stop-1", 1]]),
broadcast: vi.fn(),
nodeSendToSession: vi.fn(),
logGateway: { warn: vi.fn() },
dedupe: { dedupe: {
get: vi.fn(), get: vi.fn(),
}, },
}; });
await chatHandlers["chat.send"]({ await chatHandlers["chat.send"]({
params: { params: {
@@ -267,4 +272,29 @@ describe("chat abort transcript persistence", () => {
}, },
}); });
}); });
it("skips run-scoped transcript persistence when partial text is blank", async () => {
const { transcriptPath, sessionId } = await createTranscriptFixture(
"openclaw-chat-abort-run-blank-",
);
const runId = "idem-abort-run-blank";
const respond = vi.fn();
const context = createChatAbortContext({
chatAbortControllers: new Map([[runId, createActiveRun("main", sessionId)]]),
chatRunBuffers: new Map([[runId, " \n\t "]]),
chatDeltaSentAt: new Map([[runId, Date.now()]]),
});
await invokeChatAbort(context, { sessionKey: "main", runId }, respond);
const [ok, payload] = respond.mock.calls.at(-1) ?? [];
expect(ok).toBe(true);
expect(payload).toMatchObject({ aborted: true, runIds: [runId] });
const lines = await readTranscriptLines(transcriptPath);
const persisted = lines
.map((line) => line.message)
.find((message) => message?.idempotencyKey === `${runId}:assistant`);
expect(persisted).toBeUndefined();
});
}); });

View File

@@ -88,19 +88,26 @@ beforeEach(() => {
scheduleGatewaySigusr1RestartMock.mockReturnValue({ scheduled: true }); scheduleGatewaySigusr1RestartMock.mockReturnValue({ scheduled: true });
}); });
async function invokeUpdateRun(
params: Record<string, unknown>,
respond: ((ok: boolean, response?: unknown) => void) | undefined = undefined,
) {
const { updateHandlers } = await import("./update.js");
const onRespond = respond ?? (() => {});
await updateHandlers["update.run"]({
params,
respond: onRespond as never,
} as never);
}
describe("update.run sentinel deliveryContext", () => { describe("update.run sentinel deliveryContext", () => {
it("includes deliveryContext in sentinel payload when sessionKey is provided", async () => { it("includes deliveryContext in sentinel payload when sessionKey is provided", async () => {
capturedPayload = undefined; capturedPayload = undefined;
const { updateHandlers } = await import("./update.js");
const handler = updateHandlers["update.run"];
let responded = false; let responded = false;
await handler({ await invokeUpdateRun({ sessionKey: "agent:main:webchat:dm:user-123" }, () => {
params: { sessionKey: "agent:main:webchat:dm:user-123" }, responded = true;
respond: () => { });
responded = true;
},
} as never);
expect(responded).toBe(true); expect(responded).toBe(true);
expect(capturedPayload).toBeDefined(); expect(capturedPayload).toBeDefined();
@@ -113,13 +120,8 @@ describe("update.run sentinel deliveryContext", () => {
it("omits deliveryContext when no sessionKey is provided", async () => { it("omits deliveryContext when no sessionKey is provided", async () => {
capturedPayload = undefined; capturedPayload = undefined;
const { updateHandlers } = await import("./update.js");
const handler = updateHandlers["update.run"];
await handler({ await invokeUpdateRun({});
params: {},
respond: () => {},
} as never);
expect(capturedPayload).toBeDefined(); expect(capturedPayload).toBeDefined();
expect(capturedPayload!.deliveryContext).toBeUndefined(); expect(capturedPayload!.deliveryContext).toBeUndefined();
@@ -128,13 +130,8 @@ describe("update.run sentinel deliveryContext", () => {
it("includes threadId in sentinel payload for threaded sessions", async () => { it("includes threadId in sentinel payload for threaded sessions", async () => {
capturedPayload = undefined; capturedPayload = undefined;
const { updateHandlers } = await import("./update.js");
const handler = updateHandlers["update.run"];
await handler({ await invokeUpdateRun({ sessionKey: "agent:main:slack:dm:C0123ABC:thread:1234567890.123456" });
params: { sessionKey: "agent:main:slack:dm:C0123ABC:thread:1234567890.123456" },
respond: () => {},
} as never);
expect(capturedPayload).toBeDefined(); expect(capturedPayload).toBeDefined();
expect(capturedPayload!.deliveryContext).toEqual({ expect(capturedPayload!.deliveryContext).toEqual({
@@ -146,18 +143,26 @@ describe("update.run sentinel deliveryContext", () => {
}); });
}); });
describe("update.run timeout normalization", () => {
it("enforces a 1000ms minimum timeout for tiny values", async () => {
await invokeUpdateRun({ timeoutMs: 1 });
expect(runGatewayUpdateMock).toHaveBeenCalledWith(
expect.objectContaining({
timeoutMs: 1000,
}),
);
});
});
describe("update.run restart scheduling", () => { describe("update.run restart scheduling", () => {
it("schedules restart when update succeeds", async () => { it("schedules restart when update succeeds", async () => {
const { updateHandlers } = await import("./update.js");
const handler = updateHandlers["update.run"];
let payload: { ok: boolean; restart: unknown } | undefined; let payload: { ok: boolean; restart: unknown } | undefined;
await handler({ await invokeUpdateRun({}, (_ok: boolean, response: unknown) => {
params: {}, const typed = response as { ok: boolean; restart: unknown };
respond: (_ok: boolean, response: { ok: boolean; restart: unknown }) => { payload = typed;
payload = response; });
},
} as never);
expect(scheduleGatewaySigusr1RestartMock).toHaveBeenCalledTimes(1); expect(scheduleGatewaySigusr1RestartMock).toHaveBeenCalledTimes(1);
expect(payload?.ok).toBe(true); expect(payload?.ok).toBe(true);
@@ -173,16 +178,12 @@ describe("update.run restart scheduling", () => {
durationMs: 100, durationMs: 100,
}); });
const { updateHandlers } = await import("./update.js");
const handler = updateHandlers["update.run"];
let payload: { ok: boolean; restart: unknown } | undefined; let payload: { ok: boolean; restart: unknown } | undefined;
await handler({ await invokeUpdateRun({}, (_ok: boolean, response: unknown) => {
params: {}, const typed = response as { ok: boolean; restart: unknown };
respond: (_ok: boolean, response: { ok: boolean; restart: unknown }) => { payload = typed;
payload = response; });
},
} as never);
expect(scheduleGatewaySigusr1RestartMock).not.toHaveBeenCalled(); expect(scheduleGatewaySigusr1RestartMock).not.toHaveBeenCalled();
expect(payload?.ok).toBe(false); expect(payload?.ok).toBe(false);