fix: drop active heartbeat followups from queue (#25610, thanks @mcaxtr)

Co-authored-by: Marcus Castro <mcaxtr@gmail.com>
This commit is contained in:
Peter Steinberger
2026-02-25 01:58:13 +00:00
parent 6fa7226a67
commit c736778b3f
3 changed files with 49 additions and 4 deletions

View File

@@ -8,7 +8,7 @@ import type { TypingMode } from "../../config/types.js";
import { withStateDirEnv } from "../../test-helpers/state-dir-env.js";
import type { TemplateContext } from "../templating.js";
import type { GetReplyOptions } from "../types.js";
import type { FollowupRun, QueueSettings } from "./queue.js";
import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js";
import { createMockTypingController } from "./test-helpers.js";
type AgentRunParams = {
@@ -86,6 +86,7 @@ beforeAll(async () => {
beforeEach(() => {
state.runEmbeddedPiAgentMock.mockClear();
state.runCliAgentMock.mockClear();
vi.mocked(enqueueFollowupRun).mockClear();
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
});
@@ -98,6 +99,9 @@ function createMinimalRun(params?: {
storePath?: string;
typingMode?: TypingMode;
blockStreamingEnabled?: boolean;
isActive?: boolean;
shouldFollowup?: boolean;
resolvedQueueMode?: string;
runOverrides?: Partial<FollowupRun["run"]>;
}) {
const typing = createMockTypingController();
@@ -106,7 +110,9 @@ function createMinimalRun(params?: {
Provider: "whatsapp",
MessageSid: "msg",
} as unknown as TemplateContext;
const resolvedQueue = { mode: "interrupt" } as unknown as QueueSettings;
const resolvedQueue = {
mode: params?.resolvedQueueMode ?? "interrupt",
} as unknown as QueueSettings;
const sessionKey = params?.sessionKey ?? "main";
const followupRun = {
prompt: "hello",
@@ -147,8 +153,8 @@ function createMinimalRun(params?: {
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
shouldFollowup: params?.shouldFollowup ?? false,
isActive: params?.isActive ?? false,
isStreaming: false,
opts,
typing,
@@ -274,6 +280,39 @@ async function runReplyAgentWithBase(params: {
});
}
describe("runReplyAgent heartbeat followup guard", () => {
it("drops heartbeat runs when another run is active", async () => {
const { run, typing } = createMinimalRun({
opts: { isHeartbeat: true },
isActive: true,
shouldFollowup: true,
resolvedQueueMode: "collect",
});
const result = await run();
expect(result).toBeUndefined();
expect(vi.mocked(enqueueFollowupRun)).not.toHaveBeenCalled();
expect(state.runEmbeddedPiAgentMock).not.toHaveBeenCalled();
expect(typing.cleanup).toHaveBeenCalledTimes(1);
});
it("still enqueues non-heartbeat runs when another run is active", async () => {
const { run } = createMinimalRun({
opts: { isHeartbeat: false },
isActive: true,
shouldFollowup: true,
resolvedQueueMode: "collect",
});
const result = await run();
expect(result).toBeUndefined();
expect(vi.mocked(enqueueFollowupRun)).toHaveBeenCalledTimes(1);
expect(state.runEmbeddedPiAgentMock).not.toHaveBeenCalled();
});
});
describe("runReplyAgent typing (heartbeat)", () => {
async function withTempStateDir<T>(fn: (stateDir: string) => Promise<T>): Promise<T> {
return await withStateDirEnv(

View File

@@ -235,6 +235,11 @@ export async function runReplyAgent(params: {
}
}
if (isHeartbeat && isActive) {
typing.cleanup();
return undefined;
}
if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) {
enqueueFollowupRun(queueKey, followupRun, resolvedQueue);
await touchActiveSessionEntry();