From c397a02c9ad544c74b7a59630cdb9acc5a58dd59 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 26 Feb 2026 13:43:30 +0100 Subject: [PATCH] fix(queue): harden drain/abort/timeout race handling - reject new lane enqueues once gateway drain begins - always reset lane draining state and isolate onWait callback failures - persist per-session abort cutoff and skip stale queued messages - avoid false 600s agentTurn timeout in isolated cron jobs Fixes #27407 Fixes #27332 Fixes #27427 Co-authored-by: Kevin Shenghui Co-authored-by: zjmy Co-authored-by: suko --- CHANGELOG.md | 1 + src/auto-reply/reply/abort.test.ts | 122 ++++++++++++++++++ src/auto-reply/reply/abort.ts | 84 ++++++++++++ src/auto-reply/reply/commands-session.ts | 24 +++- ...ine-actions.skip-when-config-empty.test.ts | 75 +++++++++++ .../reply/get-reply-inline-actions.ts | 54 +++++++- src/cli/gateway-cli/run-loop.test.ts | 4 + src/cli/gateway-cli/run-loop.ts | 4 + src/config/sessions/types.ts | 8 ++ src/cron/service.issue-regressions.test.ts | 54 +++++++- src/cron/service/timer.ts | 3 +- src/process/command-queue.test.ts | 46 +++++++ src/process/command-queue.ts | 114 ++++++++++------ 13 files changed, 551 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 32c6fd8008e..411848441b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Cron/Hooks isolated routing: preserve canonical `agent:*` session keys in isolated runs so already-qualified keys are not double-prefixed (for example `agent:main:main` no longer becomes `agent:main:agent:main:main`). Landed from contributor PR #27333 by @MaheshBhushan. (#27289, #27282) +- Queue/Drain/Cron reliability: harden lane draining with guaranteed `draining` flag reset on synchronous pump failures, reject new queue enqueues during gateway restart drain windows (instead of silently killing accepted tasks), add `/stop` queued-backlog cutoff metadata with stale-message skipping (while avoiding cross-session native-stop cutoff bleed), and raise isolated cron `agentTurn` outer safety timeout to avoid false 10-minute timeout races against longer agent session timeouts. (#27407, #27332, #27427) - Security/Plugin channel HTTP auth: normalize protected `/api/channels` path checks against canonicalized request paths (case + percent-decoding + slash normalization), and fail closed on malformed `%`-encoded channel prefixes so alternate-path variants cannot bypass gateway auth. - Security/Exec approvals forwarding: prefer turn-source channel/account/thread metadata when resolving approval delivery targets so stale session routes do not misroute approval prompts. - Security/Sandbox path alias guard: reject broken symlink targets by resolving through existing ancestors and failing closed on out-of-root targets, preventing workspace-only `apply_patch` writes from escaping sandbox/workspace boundaries via dangling symlinks. This ships in the next npm release (`2026.2.26`). Thanks @tdjackey for reporting. diff --git a/src/auto-reply/reply/abort.test.ts b/src/auto-reply/reply/abort.test.ts index 68bb923fd16..a76eb9b1b2d 100644 --- a/src/auto-reply/reply/abort.test.ts +++ b/src/auto-reply/reply/abort.test.ts @@ -10,8 +10,10 @@ import { isAbortRequestText, isAbortTrigger, resetAbortMemoryForTest, + resolveAbortCutoffFromContext, resolveSessionEntryForKey, setAbortMemory, + shouldSkipMessageByAbortCutoff, tryFastAbortFromMessage, } from "./abort.js"; import { enqueueFollowupRun, getFollowupQueueDepth, type FollowupRun } from "./queue.js"; @@ -80,6 +82,9 @@ describe("abort detection", () => { sessionKey: string; from: string; to: string; + targetSessionKey?: string; + messageSid?: string; + timestamp?: number; }) { return tryFastAbortFromMessage({ ctx: buildTestCtx({ @@ -91,6 +96,9 @@ describe("abort detection", () => { Surface: "telegram", From: params.from, To: params.to, + ...(params.targetSessionKey ? { CommandTargetSessionKey: params.targetSessionKey } : {}), + ...(params.messageSid ? { MessageSid: params.messageSid } : {}), + ...(typeof params.timestamp === "number" ? { Timestamp: params.timestamp } : {}), }), cfg: params.cfg, }); @@ -221,6 +229,62 @@ describe("abort detection", () => { expect(getAbortMemory("session-2104")).toBe(true); }); + it("extracts abort cutoff metadata from context", () => { + expect( + resolveAbortCutoffFromContext( + buildTestCtx({ + MessageSid: "42", + Timestamp: 123, + }), + ), + ).toEqual({ + messageSid: "42", + timestamp: 123, + }); + }); + + it("treats numeric message IDs at or before cutoff as stale", () => { + expect( + shouldSkipMessageByAbortCutoff({ + cutoffMessageSid: "200", + messageSid: "199", + }), + ).toBe(true); + expect( + shouldSkipMessageByAbortCutoff({ + cutoffMessageSid: "200", + messageSid: "200", + }), + ).toBe(true); + expect( + shouldSkipMessageByAbortCutoff({ + cutoffMessageSid: "200", + messageSid: "201", + }), + ).toBe(false); + }); + + it("falls back to timestamp cutoff when message IDs are unavailable", () => { + expect( + shouldSkipMessageByAbortCutoff({ + cutoffTimestamp: 2000, + timestamp: 1999, + }), + ).toBe(true); + expect( + shouldSkipMessageByAbortCutoff({ + cutoffTimestamp: 2000, + timestamp: 2000, + }), + ).toBe(true); + expect( + shouldSkipMessageByAbortCutoff({ + cutoffTimestamp: 2000, + timestamp: 2001, + }), + ).toBe(false); + }); + it("resolves session entry when key exists in store", () => { const store = { "session-1": { sessionId: "abc", updatedAt: 0 }, @@ -291,6 +355,64 @@ describe("abort detection", () => { expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`); }); + it("persists abort cutoff metadata on /stop when command and target session match", async () => { + const sessionKey = "telegram:123"; + const sessionId = "session-123"; + const { storePath, cfg } = await createAbortConfig({ + sessionIdsByKey: { [sessionKey]: sessionId }, + }); + + const result = await runStopCommand({ + cfg, + sessionKey, + from: "telegram:123", + to: "telegram:123", + messageSid: "55", + timestamp: 1234567890000, + }); + + expect(result.handled).toBe(true); + const store = JSON.parse(await fs.readFile(storePath, "utf8")) as Record; + const entry = store[sessionKey] as { + abortedLastRun?: boolean; + abortCutoffMessageSid?: string; + abortCutoffTimestamp?: number; + }; + expect(entry.abortedLastRun).toBe(true); + expect(entry.abortCutoffMessageSid).toBe("55"); + expect(entry.abortCutoffTimestamp).toBe(1234567890000); + }); + + it("does not persist cutoff metadata when native /stop targets a different session", async () => { + const slashSessionKey = "telegram:slash:123"; + const targetSessionKey = "agent:main:telegram:group:123"; + const targetSessionId = "session-target"; + const { storePath, cfg } = await createAbortConfig({ + sessionIdsByKey: { [targetSessionKey]: targetSessionId }, + }); + + const result = await runStopCommand({ + cfg, + sessionKey: slashSessionKey, + from: "telegram:123", + to: "telegram:123", + targetSessionKey, + messageSid: "999", + timestamp: 1234567890000, + }); + + expect(result.handled).toBe(true); + const store = JSON.parse(await fs.readFile(storePath, "utf8")) as Record; + const entry = store[targetSessionKey] as { + abortedLastRun?: boolean; + abortCutoffMessageSid?: string; + abortCutoffTimestamp?: number; + }; + expect(entry.abortedLastRun).toBe(true); + expect(entry.abortCutoffMessageSid).toBeUndefined(); + expect(entry.abortCutoffTimestamp).toBeUndefined(); + }); + it("fast-abort stops active subagent runs for requester session", async () => { const sessionKey = "telegram:parent"; const childKey = "agent:main:subagent:child-1"; diff --git a/src/auto-reply/reply/abort.ts b/src/auto-reply/reply/abort.ts index 3c05fa097b1..a59ffaaee27 100644 --- a/src/auto-reply/reply/abort.ts +++ b/src/auto-reply/reply/abort.ts @@ -113,6 +113,80 @@ export function getAbortMemory(key: string): boolean | undefined { return ABORT_MEMORY.get(normalized); } +export type AbortCutoff = { + messageSid?: string; + timestamp?: number; +}; + +export function resolveAbortCutoffFromContext(ctx: MsgContext): AbortCutoff | undefined { + const messageSid = + (typeof ctx.MessageSidFull === "string" && ctx.MessageSidFull.trim()) || + (typeof ctx.MessageSid === "string" && ctx.MessageSid.trim()) || + undefined; + const timestamp = + typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined; + if (!messageSid && timestamp === undefined) { + return undefined; + } + return { messageSid, timestamp }; +} + +function toNumericMessageSid(value: string | undefined): bigint | undefined { + const trimmed = value?.trim(); + if (!trimmed || !/^\d+$/.test(trimmed)) { + return undefined; + } + try { + return BigInt(trimmed); + } catch { + return undefined; + } +} + +export function shouldSkipMessageByAbortCutoff(params: { + cutoffMessageSid?: string; + cutoffTimestamp?: number; + messageSid?: string; + timestamp?: number; +}): boolean { + const cutoffSid = params.cutoffMessageSid?.trim(); + const currentSid = params.messageSid?.trim(); + if (cutoffSid && currentSid) { + const cutoffNumeric = toNumericMessageSid(cutoffSid); + const currentNumeric = toNumericMessageSid(currentSid); + if (cutoffNumeric !== undefined && currentNumeric !== undefined) { + return currentNumeric <= cutoffNumeric; + } + if (currentSid === cutoffSid) { + return true; + } + } + if ( + typeof params.cutoffTimestamp === "number" && + Number.isFinite(params.cutoffTimestamp) && + typeof params.timestamp === "number" && + Number.isFinite(params.timestamp) + ) { + return params.timestamp <= params.cutoffTimestamp; + } + return false; +} + +function shouldPersistAbortCutoff(params: { + commandSessionKey?: string; + targetSessionKey?: string; +}): boolean { + const commandSessionKey = params.commandSessionKey?.trim(); + const targetSessionKey = params.targetSessionKey?.trim(); + if (!commandSessionKey || !targetSessionKey) { + return true; + } + // Native targeted /stop can run from a slash/session-control key while the + // actual target session uses different message id/timestamp spaces. + // Persist cutoff only when command source and target are the same session. + return commandSessionKey === targetSessionKey; +} + function pruneAbortMemory(): void { if (ABORT_MEMORY.size <= ABORT_MEMORY_MAX) { return; @@ -302,8 +376,16 @@ export async function tryFastAbortFromMessage(params: { `abort: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`, ); } + const abortCutoff = shouldPersistAbortCutoff({ + commandSessionKey: ctx.SessionKey, + targetSessionKey: key ?? targetKey, + }) + ? resolveAbortCutoffFromContext(ctx) + : undefined; if (entry && key) { entry.abortedLastRun = true; + entry.abortCutoffMessageSid = abortCutoff?.messageSid; + entry.abortCutoffTimestamp = abortCutoff?.timestamp; entry.updatedAt = Date.now(); store[key] = entry; await updateSessionStore(storePath, (nextStore) => { @@ -312,6 +394,8 @@ export async function tryFastAbortFromMessage(params: { return; } nextEntry.abortedLastRun = true; + nextEntry.abortCutoffMessageSid = abortCutoff?.messageSid; + nextEntry.abortCutoffTimestamp = abortCutoff?.timestamp; nextEntry.updatedAt = Date.now(); nextStore[key] = nextEntry; }); diff --git a/src/auto-reply/reply/commands-session.ts b/src/auto-reply/reply/commands-session.ts index ea5bd9200f6..5a752fe367c 100644 --- a/src/auto-reply/reply/commands-session.ts +++ b/src/auto-reply/reply/commands-session.ts @@ -19,6 +19,7 @@ import { normalizeUsageDisplay, resolveResponseUsageMode } from "../thinking.js" import { formatAbortReplyText, isAbortTrigger, + resolveAbortCutoffFromContext, resolveSessionEntryForKey, setAbortMemory, stopSubagentsForRequester, @@ -99,6 +100,7 @@ async function applyAbortTarget(params: { sessionStore?: Record; storePath?: string; abortKey?: string; + abortCutoff?: { messageSid?: string; timestamp?: number }; }) { const { abortTarget } = params; if (abortTarget.sessionId) { @@ -106,11 +108,19 @@ async function applyAbortTarget(params: { } if (abortTarget.entry && params.sessionStore && abortTarget.key) { abortTarget.entry.abortedLastRun = true; + abortTarget.entry.abortCutoffMessageSid = params.abortCutoff?.messageSid; + abortTarget.entry.abortCutoffTimestamp = params.abortCutoff?.timestamp; abortTarget.entry.updatedAt = Date.now(); params.sessionStore[abortTarget.key] = abortTarget.entry; if (params.storePath) { await updateSessionStore(params.storePath, (store) => { - store[abortTarget.key] = abortTarget.entry; + store[abortTarget.key] = { + ...abortTarget.entry, + abortedLastRun: true, + abortCutoffMessageSid: params.abortCutoff?.messageSid, + abortCutoffTimestamp: params.abortCutoff?.timestamp, + updatedAt: Date.now(), + }; }); } } else if (params.abortKey) { @@ -503,6 +513,12 @@ export const handleStopCommand: CommandHandler = async (params, allowTextCommand sessionStore: params.sessionStore, storePath: params.storePath, abortKey: params.command.abortKey, + abortCutoff: + params.sessionKey?.trim() && + abortTarget.key?.trim() && + params.sessionKey.trim() === abortTarget.key.trim() + ? resolveAbortCutoffFromContext(params.ctx) + : undefined, }); // Trigger internal hook for stop command @@ -545,6 +561,12 @@ export const handleAbortTrigger: CommandHandler = async (params, allowTextComman sessionStore: params.sessionStore, storePath: params.storePath, abortKey: params.command.abortKey, + abortCutoff: + params.sessionKey?.trim() && + abortTarget.key?.trim() && + params.sessionKey.trim() === abortTarget.key.trim() + ? resolveAbortCutoffFromContext(params.ctx) + : undefined, }); return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } }; }; diff --git a/src/auto-reply/reply/get-reply-inline-actions.skip-when-config-empty.test.ts b/src/auto-reply/reply/get-reply-inline-actions.skip-when-config-empty.test.ts index 68f47253aff..51351f05de8 100644 --- a/src/auto-reply/reply/get-reply-inline-actions.skip-when-config-empty.test.ts +++ b/src/auto-reply/reply/get-reply-inline-actions.skip-when-config-empty.test.ts @@ -1,4 +1,5 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { SessionEntry } from "../../config/sessions.js"; import type { TemplateContext } from "../templating.js"; import { clearInlineDirectives } from "./get-reply-directives-utils.js"; import { buildTestCtx } from "./test-ctx.js"; @@ -146,4 +147,78 @@ describe("handleInlineActions", () => { }), ); }); + + it("skips stale queued messages that are at or before the /stop cutoff", async () => { + const typing = createTypingController(); + const sessionEntry: SessionEntry = { + sessionId: "session-1", + updatedAt: Date.now(), + abortCutoffMessageSid: "42", + abortedLastRun: true, + }; + const sessionStore = { "s:main": sessionEntry }; + const ctx = buildTestCtx({ + Body: "old queued message", + CommandBody: "old queued message", + MessageSid: "41", + }); + + const result = await handleInlineActions( + createHandleInlineActionsInput({ + ctx, + typing, + cleanedBody: "old queued message", + command: { + rawBodyNormalized: "old queued message", + commandBodyNormalized: "old queued message", + }, + overrides: { + sessionEntry, + sessionStore, + }, + }), + ); + + expect(result).toEqual({ kind: "reply", reply: undefined }); + expect(typing.cleanup).toHaveBeenCalled(); + expect(handleCommandsMock).not.toHaveBeenCalled(); + }); + + it("clears /stop cutoff when a newer message arrives", async () => { + const typing = createTypingController(); + const sessionEntry: SessionEntry = { + sessionId: "session-2", + updatedAt: Date.now(), + abortCutoffMessageSid: "42", + abortedLastRun: true, + }; + const sessionStore = { "s:main": sessionEntry }; + handleCommandsMock.mockResolvedValue({ shouldContinue: false, reply: { text: "ok" } }); + const ctx = buildTestCtx({ + Body: "new message", + CommandBody: "new message", + MessageSid: "43", + }); + + const result = await handleInlineActions( + createHandleInlineActionsInput({ + ctx, + typing, + cleanedBody: "new message", + command: { + rawBodyNormalized: "new message", + commandBodyNormalized: "new message", + }, + overrides: { + sessionEntry, + sessionStore, + }, + }), + ); + + expect(result).toEqual({ kind: "reply", reply: { text: "ok" } }); + expect(sessionStore["s:main"]?.abortCutoffMessageSid).toBeUndefined(); + expect(sessionStore["s:main"]?.abortCutoffTimestamp).toBeUndefined(); + expect(handleCommandsMock).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/auto-reply/reply/get-reply-inline-actions.ts b/src/auto-reply/reply/get-reply-inline-actions.ts index 89066a47d57..2f399845172 100644 --- a/src/auto-reply/reply/get-reply-inline-actions.ts +++ b/src/auto-reply/reply/get-reply-inline-actions.ts @@ -5,6 +5,7 @@ import { applyOwnerOnlyToolPolicy } from "../../agents/tool-policy.js"; import { getChannelDock } from "../../channels/dock.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { SessionEntry } from "../../config/sessions.js"; +import { updateSessionStore } from "../../config/sessions.js"; import { logVerbose } from "../../globals.js"; import { generateSecureToken } from "../../infra/secure-random.js"; import { resolveGatewayMessageChannel } from "../../utils/message-channel.js"; @@ -16,7 +17,7 @@ import { import type { MsgContext, TemplateContext } from "../templating.js"; import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "../thinking.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; -import { getAbortMemory } from "./abort.js"; +import { getAbortMemory, isAbortRequestText, shouldSkipMessageByAbortCutoff } from "./abort.js"; import { buildStatusReply, handleCommands } from "./commands.js"; import type { InlineDirectives } from "./directive-handling.js"; import { isDirectiveOnly } from "./directive-handling.js"; @@ -252,6 +253,57 @@ export async function handleInlineActions(params: { await opts.onBlockReply(reply); }; + const clearAbortCutoff = async () => { + if (!sessionEntry || !sessionStore || !sessionKey) { + return; + } + if ( + sessionEntry.abortCutoffMessageSid === undefined && + sessionEntry.abortCutoffTimestamp === undefined + ) { + return; + } + sessionEntry.abortCutoffMessageSid = undefined; + sessionEntry.abortCutoffTimestamp = undefined; + sessionEntry.updatedAt = Date.now(); + sessionStore[sessionKey] = sessionEntry; + if (storePath) { + await updateSessionStore(storePath, (store) => { + const existing = store[sessionKey] ?? sessionEntry; + if (!existing) { + return; + } + existing.abortCutoffMessageSid = undefined; + existing.abortCutoffTimestamp = undefined; + existing.updatedAt = Date.now(); + store[sessionKey] = existing; + }); + } + }; + + const isStopLikeInbound = isAbortRequestText(command.rawBodyNormalized); + if (!isStopLikeInbound && sessionEntry) { + const shouldSkip = shouldSkipMessageByAbortCutoff({ + cutoffMessageSid: sessionEntry.abortCutoffMessageSid, + cutoffTimestamp: sessionEntry.abortCutoffTimestamp, + messageSid: + (typeof ctx.MessageSidFull === "string" && ctx.MessageSidFull.trim()) || + (typeof ctx.MessageSid === "string" && ctx.MessageSid.trim()) || + undefined, + timestamp: typeof ctx.Timestamp === "number" ? ctx.Timestamp : undefined, + }); + if (shouldSkip) { + typing.cleanup(); + return { kind: "reply", reply: undefined }; + } + if ( + sessionEntry.abortCutoffMessageSid !== undefined || + sessionEntry.abortCutoffTimestamp !== undefined + ) { + await clearAbortCutoff(); + } + } + const inlineCommand = allowTextCommands && command.isAuthorizedSender ? extractInlineSimpleCommand(cleanedBody) diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 286b1544d54..be1a6200040 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -9,6 +9,7 @@ const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true); const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false); const markGatewaySigusr1RestartHandled = vi.fn(); const getActiveTaskCount = vi.fn(() => 0); +const markGatewayDraining = vi.fn(); const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ drained: true })); const resetAllLanes = vi.fn(); const restartGatewayProcessWithFreshPid = vi.fn< @@ -37,6 +38,7 @@ vi.mock("../../infra/process-respawn.js", () => ({ vi.mock("../../process/command-queue.js", () => ({ getActiveTaskCount: () => getActiveTaskCount(), + markGatewayDraining: () => markGatewayDraining(), waitForActiveTasks: (timeoutMs: number) => waitForActiveTasks(timeoutMs), resetAllLanes: () => resetAllLanes(), })); @@ -213,6 +215,7 @@ describe("runGatewayLoop", () => { await new Promise((resolve) => setImmediate(resolve)); expect(waitForActiveTasks).toHaveBeenCalledWith(30_000); + expect(markGatewayDraining).toHaveBeenCalledTimes(1); expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG); expect(closeFirst).toHaveBeenCalledWith({ reason: "gateway restarting", @@ -229,6 +232,7 @@ describe("runGatewayLoop", () => { restartExpectedMs: 1500, }); expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(2); + expect(markGatewayDraining).toHaveBeenCalledTimes(2); expect(resetAllLanes).toHaveBeenCalledTimes(2); expect(acquireGatewayLock).toHaveBeenCalledTimes(3); }); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 0e43faed309..361817c8cb1 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -9,6 +9,7 @@ import { import { createSubsystemLogger } from "../../logging/subsystem.js"; import { getActiveTaskCount, + markGatewayDraining, resetAllLanes, waitForActiveTasks, } from "../../process/command-queue.js"; @@ -111,6 +112,9 @@ export async function runGatewayLoop(params: { // On restart, wait for in-flight agent turns to finish before // tearing down the server so buffered messages are delivered. if (isRestart) { + // Reject new enqueues immediately during the drain window so + // sessions get an explicit restart error instead of silent task loss. + markGatewayDraining(); const activeTasks = getActiveTaskCount(); if (activeTasks > 0) { gatewayLog.info( diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index dee9a2c76df..c62ab8ff966 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -84,6 +84,14 @@ export type SessionEntry = { spawnDepth?: number; systemSent?: boolean; abortedLastRun?: boolean; + /** + * Session-level stop cutoff captured when /stop is received. + * Messages at/before this boundary are skipped to avoid replaying + * queued pre-stop backlog. + */ + abortCutoffMessageSid?: string; + /** Epoch ms cutoff paired with abortCutoffMessageSid when available. */ + abortCutoffTimestamp?: number; chatType?: SessionChatType; thinkingLevel?: string; verboseLevel?: string; diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index 7515b110250..469ff498019 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -9,7 +9,7 @@ import { CronService } from "./service.js"; import { createDeferred, createRunningCronServiceState } from "./service.test-harness.js"; import { computeJobNextRunAtMs } from "./service/jobs.js"; import { createCronServiceState, type CronEvent } from "./service/state.js"; -import { executeJobCore, onTimer, runMissedJobs } from "./service/timer.js"; +import { DEFAULT_JOB_TIMEOUT_MS, executeJobCore, onTimer, runMissedJobs } from "./service/timer.js"; import type { CronJob, CronJobState } from "./types.js"; const noopLogger = { @@ -838,6 +838,58 @@ describe("Cron issue regressions", () => { expect(job?.state.lastStatus).toBe("ok"); }); + it("does not time out agentTurn jobs at the default 10-minute safety window", async () => { + const store = await makeStorePath(); + const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z"); + + const cronJob = createIsolatedRegressionJob({ + id: "agentturn-default-safety-window", + name: "agentturn default safety window", + scheduledAt, + schedule: { kind: "at", at: new Date(scheduledAt).toISOString() }, + payload: { kind: "agentTurn", message: "work" }, + state: { nextRunAtMs: scheduledAt }, + }); + await writeCronJobs(store.storePath, [cronJob]); + + let now = scheduledAt; + const deferredRun = createDeferred<{ status: "ok"; summary: string }>(); + const runIsolatedAgentJob = vi.fn(async ({ abortSignal }: { abortSignal?: AbortSignal }) => { + const result = await deferredRun.promise; + if (abortSignal?.aborted) { + return { status: "error" as const, error: String(abortSignal.reason) }; + } + now += 5; + return result; + }); + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob, + }); + + const timerPromise = onTimer(state); + let settled = false; + void timerPromise.finally(() => { + settled = true; + }); + + await vi.advanceTimersByTimeAsync(DEFAULT_JOB_TIMEOUT_MS + 1_000); + await Promise.resolve(); + expect(settled).toBe(false); + + deferredRun.resolve({ status: "ok", summary: "done" }); + await timerPromise; + + const job = state.store?.jobs.find((entry) => entry.id === "agentturn-default-safety-window"); + expect(job?.state.lastStatus).toBe("ok"); + expect(job?.state.lastError).toBeUndefined(); + }); + it("aborts isolated runs when cron timeout fires", async () => { vi.useRealTimers(); const store = await makeStorePath(); diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index acb3f3037d3..6058777cdfd 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -36,6 +36,7 @@ const MIN_REFIRE_GAP_MS = 2_000; * from wedging the entire cron lane. */ export const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes +const AGENT_TURN_SAFETY_TIMEOUT_MS = 60 * 60_000; // 60 minutes type TimedCronRunOutcome = CronRunOutcome & CronRunTelemetry & { @@ -52,7 +53,7 @@ function resolveCronJobTimeoutMs(job: CronJob): number | undefined { ? Math.floor(job.payload.timeoutSeconds * 1_000) : undefined; if (configuredTimeoutMs === undefined) { - return DEFAULT_JOB_TIMEOUT_MS; + return job.payload.kind === "agentTurn" ? AGENT_TURN_SAFETY_TIMEOUT_MS : DEFAULT_JOB_TIMEOUT_MS; } return configuredTimeoutMs <= 0 ? undefined : configuredTimeoutMs; } diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index 6c0a1f57f91..16766eabcd3 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -21,8 +21,10 @@ import { CommandLaneClearedError, enqueueCommand, enqueueCommandInLane, + GatewayDrainingError, getActiveTaskCount, getQueueSize, + markGatewayDraining, resetAllLanes, setCommandLaneConcurrency, waitForActiveTasks, @@ -52,6 +54,7 @@ function enqueueBlockedMainTask( describe("command queue", () => { beforeEach(() => { + resetAllLanes(); diagnosticMocks.logLaneEnqueue.mockClear(); diagnosticMocks.logLaneDequeue.mockClear(); diagnosticMocks.diag.debug.mockClear(); @@ -288,4 +291,47 @@ describe("command queue", () => { release(); await expect(first).resolves.toBe("first"); }); + + it("keeps draining functional after synchronous onWait failure", async () => { + const lane = `drain-sync-throw-${Date.now()}-${Math.random().toString(16).slice(2)}`; + setCommandLaneConcurrency(lane, 1); + + const deferred = createDeferred(); + const first = enqueueCommandInLane(lane, async () => { + await deferred.promise; + return "first"; + }); + const second = enqueueCommandInLane(lane, async () => "second", { + warnAfterMs: 0, + onWait: () => { + throw new Error("onWait exploded"); + }, + }); + await Promise.resolve(); + expect(getQueueSize(lane)).toBeGreaterThanOrEqual(2); + + deferred.resolve(); + await expect(first).resolves.toBe("first"); + await expect(second).resolves.toBe("second"); + }); + + it("rejects new enqueues with GatewayDrainingError after markGatewayDraining", async () => { + markGatewayDraining(); + await expect(enqueueCommand(async () => "blocked")).rejects.toBeInstanceOf( + GatewayDrainingError, + ); + }); + + it("does not affect already-active tasks after markGatewayDraining", async () => { + const { task, release } = enqueueBlockedMainTask(async () => "ok"); + markGatewayDraining(); + release(); + await expect(task).resolves.toBe("ok"); + }); + + it("resetAllLanes clears gateway draining flag and re-allows enqueue", async () => { + markGatewayDraining(); + resetAllLanes(); + await expect(enqueueCommand(async () => "ok")).resolves.toBe("ok"); + }); }); diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 9ee4c741719..7b4a386bdad 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -12,6 +12,20 @@ export class CommandLaneClearedError extends Error { } } +/** + * Dedicated error type thrown when a new command is rejected because the + * gateway is currently draining for restart. + */ +export class GatewayDrainingError extends Error { + constructor() { + super("Gateway is draining for restart; new tasks are not accepted"); + this.name = "GatewayDrainingError"; + } +} + +// Set while gateway is draining for restart; new enqueues are rejected. +let gatewayDraining = false; + // Minimal in-process queue to serialize command executions. // Default lane ("main") preserves the existing behavior. Additional lanes allow // low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for @@ -66,57 +80,77 @@ function completeTask(state: LaneState, taskId: number, taskGeneration: number): function drainLane(lane: string) { const state = getLaneState(lane); if (state.draining) { + if (state.activeTaskIds.size === 0 && state.queue.length > 0) { + diag.warn( + `drainLane blocked: lane=${lane} draining=true active=0 queue=${state.queue.length}`, + ); + } return; } state.draining = true; const pump = () => { - while (state.activeTaskIds.size < state.maxConcurrent && state.queue.length > 0) { - const entry = state.queue.shift() as QueueEntry; - const waitedMs = Date.now() - entry.enqueuedAt; - if (waitedMs >= entry.warnAfterMs) { - entry.onWait?.(waitedMs, state.queue.length); - diag.warn( - `lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${state.queue.length}`, - ); - } - logLaneDequeue(lane, waitedMs, state.queue.length); - const taskId = nextTaskId++; - const taskGeneration = state.generation; - state.activeTaskIds.add(taskId); - void (async () => { - const startTime = Date.now(); - try { - const result = await entry.task(); - const completedCurrentGeneration = completeTask(state, taskId, taskGeneration); - if (completedCurrentGeneration) { - diag.debug( - `lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.activeTaskIds.size} queued=${state.queue.length}`, - ); - pump(); + try { + while (state.activeTaskIds.size < state.maxConcurrent && state.queue.length > 0) { + const entry = state.queue.shift() as QueueEntry; + const waitedMs = Date.now() - entry.enqueuedAt; + if (waitedMs >= entry.warnAfterMs) { + try { + entry.onWait?.(waitedMs, state.queue.length); + } catch (err) { + diag.error(`lane onWait callback failed: lane=${lane} error="${String(err)}"`); } - entry.resolve(result); - } catch (err) { - const completedCurrentGeneration = completeTask(state, taskId, taskGeneration); - const isProbeLane = lane.startsWith("auth-probe:") || lane.startsWith("session:probe-"); - if (!isProbeLane) { - diag.error( - `lane task error: lane=${lane} durationMs=${Date.now() - startTime} error="${String(err)}"`, - ); - } - if (completedCurrentGeneration) { - pump(); - } - entry.reject(err); + diag.warn( + `lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${state.queue.length}`, + ); } - })(); + logLaneDequeue(lane, waitedMs, state.queue.length); + const taskId = nextTaskId++; + const taskGeneration = state.generation; + state.activeTaskIds.add(taskId); + void (async () => { + const startTime = Date.now(); + try { + const result = await entry.task(); + const completedCurrentGeneration = completeTask(state, taskId, taskGeneration); + if (completedCurrentGeneration) { + diag.debug( + `lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.activeTaskIds.size} queued=${state.queue.length}`, + ); + pump(); + } + entry.resolve(result); + } catch (err) { + const completedCurrentGeneration = completeTask(state, taskId, taskGeneration); + const isProbeLane = lane.startsWith("auth-probe:") || lane.startsWith("session:probe-"); + if (!isProbeLane) { + diag.error( + `lane task error: lane=${lane} durationMs=${Date.now() - startTime} error="${String(err)}"`, + ); + } + if (completedCurrentGeneration) { + pump(); + } + entry.reject(err); + } + })(); + } + } finally { + state.draining = false; } - state.draining = false; }; pump(); } +/** + * Mark gateway as draining for restart so new enqueues fail fast with + * `GatewayDrainingError` instead of being silently killed on shutdown. + */ +export function markGatewayDraining(): void { + gatewayDraining = true; +} + export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) { const cleaned = lane.trim() || CommandLane.Main; const state = getLaneState(cleaned); @@ -132,6 +166,9 @@ export function enqueueCommandInLane( onWait?: (waitMs: number, queuedAhead: number) => void; }, ): Promise { + if (gatewayDraining) { + return Promise.reject(new GatewayDrainingError()); + } const cleaned = lane.trim() || CommandLane.Main; const warnAfterMs = opts?.warnAfterMs ?? 2_000; const state = getLaneState(cleaned); @@ -205,6 +242,7 @@ export function clearCommandLane(lane: string = CommandLane.Main) { * `enqueueCommandInLane()` call (which may never come). */ export function resetAllLanes(): void { + gatewayDraining = false; const lanesToDrain: string[] = []; for (const state of lanes.values()) { state.generation += 1;