ACPX: ignore replayed updates outside active prompt

This commit is contained in:
Onur
2026-02-28 22:46:52 +01:00
committed by Onur Solmaz
parent d669b27a45
commit f88bc09f85
2 changed files with 86 additions and 36 deletions

View File

@@ -5,9 +5,24 @@ function jsonLine(payload: unknown): string {
return JSON.stringify(payload); return JSON.stringify(payload);
} }
function beginPrompt(projector: PromptStreamProjector, id = "req-1") {
projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
id,
method: "session/prompt",
params: {
sessionId: "session-1",
prompt: [{ type: "text", text: "hello" }],
},
}),
);
}
describe("PromptStreamProjector", () => { describe("PromptStreamProjector", () => {
it("maps agent message chunks to output deltas", () => { it("maps agent message chunks to output deltas", () => {
const projector = new PromptStreamProjector(); const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine( const event = projector.ingestLine(
jsonLine({ jsonLine({
jsonrpc: "2.0", jsonrpc: "2.0",
@@ -34,6 +49,7 @@ describe("PromptStreamProjector", () => {
it("preserves leading spaces in streamed output chunks", () => { it("preserves leading spaces in streamed output chunks", () => {
const projector = new PromptStreamProjector(); const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine( const event = projector.ingestLine(
jsonLine({ jsonLine({
jsonrpc: "2.0", jsonrpc: "2.0",
@@ -60,6 +76,7 @@ describe("PromptStreamProjector", () => {
it("maps agent thought chunks to thought deltas", () => { it("maps agent thought chunks to thought deltas", () => {
const projector = new PromptStreamProjector(); const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine( const event = projector.ingestLine(
jsonLine({ jsonLine({
jsonrpc: "2.0", jsonrpc: "2.0",
@@ -86,6 +103,7 @@ describe("PromptStreamProjector", () => {
it("maps tool call updates to tool_call events", () => { it("maps tool call updates to tool_call events", () => {
const projector = new PromptStreamProjector(); const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine( const event = projector.ingestLine(
jsonLine({ jsonLine({
jsonrpc: "2.0", jsonrpc: "2.0",
@@ -108,19 +126,53 @@ describe("PromptStreamProjector", () => {
}); });
}); });
it("maps prompt response stop reasons to done events", () => { it("ignores replayed updates before current prompt starts", () => {
const projector = new PromptStreamProjector(); const projector = new PromptStreamProjector();
projector.ingestLine( const replayed = projector.ingestLine(
jsonLine({ jsonLine({
jsonrpc: "2.0", jsonrpc: "2.0",
id: "req-1", method: "session/update",
method: "session/prompt",
params: { params: {
sessionId: "session-1", sessionId: "session-1",
prompt: [{ type: "text", text: "hello" }], update: {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: "old turn",
},
},
}, },
}), }),
); );
beginPrompt(projector, "req-2");
const current = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "session-1",
update: {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: "new turn",
},
},
},
}),
);
expect(replayed).toBeNull();
expect(current).toEqual({
type: "text_delta",
text: "new turn",
stream: "output",
});
});
it("maps prompt response stop reasons to done events", () => {
const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine( const event = projector.ingestLine(
jsonLine({ jsonLine({
jsonrpc: "2.0", jsonrpc: "2.0",
@@ -139,17 +191,7 @@ describe("PromptStreamProjector", () => {
it("maps json-rpc errors to runtime errors", () => { it("maps json-rpc errors to runtime errors", () => {
const projector = new PromptStreamProjector(); const projector = new PromptStreamProjector();
projector.ingestLine( beginPrompt(projector);
jsonLine({
jsonrpc: "2.0",
id: "req-1",
method: "session/prompt",
params: {
sessionId: "session-1",
prompt: [{ type: "text", text: "hello" }],
},
}),
);
const event = projector.ingestLine( const event = projector.ingestLine(
jsonLine({ jsonLine({
jsonrpc: "2.0", jsonrpc: "2.0",
@@ -170,17 +212,7 @@ describe("PromptStreamProjector", () => {
it("ignores non-prompt response errors", () => { it("ignores non-prompt response errors", () => {
const projector = new PromptStreamProjector(); const projector = new PromptStreamProjector();
projector.ingestLine( beginPrompt(projector, "3");
jsonLine({
jsonrpc: "2.0",
id: 3,
method: "session/prompt",
params: {
sessionId: "session-1",
prompt: [{ type: "text", text: "hello" }],
},
}),
);
const loadError = projector.ingestLine( const loadError = projector.ingestLine(
jsonLine({ jsonLine({
jsonrpc: "2.0", jsonrpc: "2.0",
@@ -200,11 +232,25 @@ describe("PromptStreamProjector", () => {
}, },
}), }),
); );
const trailingReplay = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "session-1",
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "should be ignored" },
},
},
}),
);
expect(loadError).toBeNull(); expect(loadError).toBeNull();
expect(promptDone).toEqual({ expect(promptDone).toEqual({
type: "done", type: "done",
stopReason: "end_turn", stopReason: "end_turn",
}); });
expect(trailingReplay).toBeNull();
}); });
}); });

View File

@@ -180,11 +180,6 @@ export class PromptStreamProjector {
return null; return null;
} }
const updateEvent = parseSessionUpdateEvent(parsed);
if (updateEvent) {
return updateEvent;
}
if (asTrimmedString(parsed.method) === "session/prompt") { if (asTrimmedString(parsed.method) === "session/prompt") {
const id = normalizeJsonRpcId(parsed.id); const id = normalizeJsonRpcId(parsed.id);
if (id) { if (id) {
@@ -193,8 +188,13 @@ export class PromptStreamProjector {
return null; return null;
} }
const updateEvent = parseSessionUpdateEvent(parsed);
if (updateEvent) {
return this.promptRequestIds.size > 0 ? updateEvent : null;
}
if (Object.hasOwn(parsed, "error")) { if (Object.hasOwn(parsed, "error")) {
if (!this.shouldHandlePromptResponse(parsed)) { if (!this.consumePromptResponse(parsed)) {
return null; return null;
} }
const error = isRecord(parsed.error) ? parsed.error : null; const error = isRecord(parsed.error) ? parsed.error : null;
@@ -211,7 +211,7 @@ export class PromptStreamProjector {
} }
const stopReason = parsePromptStopReason(parsed); const stopReason = parsePromptStopReason(parsed);
if (!stopReason || !this.shouldHandlePromptResponse(parsed)) { if (!stopReason || !this.consumePromptResponse(parsed)) {
return null; return null;
} }
@@ -221,11 +221,15 @@ export class PromptStreamProjector {
}; };
} }
private shouldHandlePromptResponse(message: Record<string, unknown>): boolean { private consumePromptResponse(message: Record<string, unknown>): boolean {
const id = normalizeJsonRpcId(message.id); const id = normalizeJsonRpcId(message.id);
if (!id) { if (!id) {
return false; return false;
} }
return this.promptRequestIds.has(id); if (!this.promptRequestIds.has(id)) {
return false;
}
this.promptRequestIds.delete(id);
return true;
} }
} }