mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-30 04:16:25 +00:00
fix(queue): harden drain/abort/timeout race handling
- reject new lane enqueues once gateway drain begins
- always reset lane draining state and isolate onWait callback failures
- persist per-session abort cutoff and skip stale queued messages
- avoid false 600s agentTurn timeout in isolated cron jobs

Fixes #27407
Fixes #27332
Fixes #27427

Co-authored-by: Kevin Shenghui <shenghuikevin@github.com>
Co-authored-by: zjmy <zhangjunmengyang@gmail.com>
Co-authored-by: suko <miha.sukic@gmail.com>
This commit is contained in:
@@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai
|
||||
### Fixes
|
||||
|
||||
- Cron/Hooks isolated routing: preserve canonical `agent:*` session keys in isolated runs so already-qualified keys are not double-prefixed (for example `agent:main:main` no longer becomes `agent:main:agent:main:main`). Landed from contributor PR #27333 by @MaheshBhushan. (#27289, #27282)
|
||||
- Queue/Drain/Cron reliability: harden lane draining with guaranteed `draining` flag reset on synchronous pump failures, reject new queue enqueues during gateway restart drain windows (instead of silently killing accepted tasks), add `/stop` queued-backlog cutoff metadata with stale-message skipping (while avoiding cross-session native-stop cutoff bleed), and raise isolated cron `agentTurn` outer safety timeout to avoid false 10-minute timeout races against longer agent session timeouts. (#27407, #27332, #27427)
|
||||
- Security/Plugin channel HTTP auth: normalize protected `/api/channels` path checks against canonicalized request paths (case + percent-decoding + slash normalization), and fail closed on malformed `%`-encoded channel prefixes so alternate-path variants cannot bypass gateway auth.
|
||||
- Security/Exec approvals forwarding: prefer turn-source channel/account/thread metadata when resolving approval delivery targets so stale session routes do not misroute approval prompts.
|
||||
- Security/Sandbox path alias guard: reject broken symlink targets by resolving through existing ancestors and failing closed on out-of-root targets, preventing workspace-only `apply_patch` writes from escaping sandbox/workspace boundaries via dangling symlinks. This ships in the next npm release (`2026.2.26`). Thanks @tdjackey for reporting.
|
||||
|
||||
@@ -10,8 +10,10 @@ import {
|
||||
isAbortRequestText,
|
||||
isAbortTrigger,
|
||||
resetAbortMemoryForTest,
|
||||
resolveAbortCutoffFromContext,
|
||||
resolveSessionEntryForKey,
|
||||
setAbortMemory,
|
||||
shouldSkipMessageByAbortCutoff,
|
||||
tryFastAbortFromMessage,
|
||||
} from "./abort.js";
|
||||
import { enqueueFollowupRun, getFollowupQueueDepth, type FollowupRun } from "./queue.js";
|
||||
@@ -80,6 +82,9 @@ describe("abort detection", () => {
|
||||
sessionKey: string;
|
||||
from: string;
|
||||
to: string;
|
||||
targetSessionKey?: string;
|
||||
messageSid?: string;
|
||||
timestamp?: number;
|
||||
}) {
|
||||
return tryFastAbortFromMessage({
|
||||
ctx: buildTestCtx({
|
||||
@@ -91,6 +96,9 @@ describe("abort detection", () => {
|
||||
Surface: "telegram",
|
||||
From: params.from,
|
||||
To: params.to,
|
||||
...(params.targetSessionKey ? { CommandTargetSessionKey: params.targetSessionKey } : {}),
|
||||
...(params.messageSid ? { MessageSid: params.messageSid } : {}),
|
||||
...(typeof params.timestamp === "number" ? { Timestamp: params.timestamp } : {}),
|
||||
}),
|
||||
cfg: params.cfg,
|
||||
});
|
||||
@@ -221,6 +229,62 @@ describe("abort detection", () => {
|
||||
expect(getAbortMemory("session-2104")).toBe(true);
|
||||
});
|
||||
|
||||
it("extracts abort cutoff metadata from context", () => {
|
||||
expect(
|
||||
resolveAbortCutoffFromContext(
|
||||
buildTestCtx({
|
||||
MessageSid: "42",
|
||||
Timestamp: 123,
|
||||
}),
|
||||
),
|
||||
).toEqual({
|
||||
messageSid: "42",
|
||||
timestamp: 123,
|
||||
});
|
||||
});
|
||||
|
||||
it("treats numeric message IDs at or before cutoff as stale", () => {
|
||||
expect(
|
||||
shouldSkipMessageByAbortCutoff({
|
||||
cutoffMessageSid: "200",
|
||||
messageSid: "199",
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldSkipMessageByAbortCutoff({
|
||||
cutoffMessageSid: "200",
|
||||
messageSid: "200",
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldSkipMessageByAbortCutoff({
|
||||
cutoffMessageSid: "200",
|
||||
messageSid: "201",
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("falls back to timestamp cutoff when message IDs are unavailable", () => {
|
||||
expect(
|
||||
shouldSkipMessageByAbortCutoff({
|
||||
cutoffTimestamp: 2000,
|
||||
timestamp: 1999,
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldSkipMessageByAbortCutoff({
|
||||
cutoffTimestamp: 2000,
|
||||
timestamp: 2000,
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldSkipMessageByAbortCutoff({
|
||||
cutoffTimestamp: 2000,
|
||||
timestamp: 2001,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("resolves session entry when key exists in store", () => {
|
||||
const store = {
|
||||
"session-1": { sessionId: "abc", updatedAt: 0 },
|
||||
@@ -291,6 +355,64 @@ describe("abort detection", () => {
|
||||
expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`);
|
||||
});
|
||||
|
||||
it("persists abort cutoff metadata on /stop when command and target session match", async () => {
|
||||
const sessionKey = "telegram:123";
|
||||
const sessionId = "session-123";
|
||||
const { storePath, cfg } = await createAbortConfig({
|
||||
sessionIdsByKey: { [sessionKey]: sessionId },
|
||||
});
|
||||
|
||||
const result = await runStopCommand({
|
||||
cfg,
|
||||
sessionKey,
|
||||
from: "telegram:123",
|
||||
to: "telegram:123",
|
||||
messageSid: "55",
|
||||
timestamp: 1234567890000,
|
||||
});
|
||||
|
||||
expect(result.handled).toBe(true);
|
||||
const store = JSON.parse(await fs.readFile(storePath, "utf8")) as Record<string, unknown>;
|
||||
const entry = store[sessionKey] as {
|
||||
abortedLastRun?: boolean;
|
||||
abortCutoffMessageSid?: string;
|
||||
abortCutoffTimestamp?: number;
|
||||
};
|
||||
expect(entry.abortedLastRun).toBe(true);
|
||||
expect(entry.abortCutoffMessageSid).toBe("55");
|
||||
expect(entry.abortCutoffTimestamp).toBe(1234567890000);
|
||||
});
|
||||
|
||||
it("does not persist cutoff metadata when native /stop targets a different session", async () => {
|
||||
const slashSessionKey = "telegram:slash:123";
|
||||
const targetSessionKey = "agent:main:telegram:group:123";
|
||||
const targetSessionId = "session-target";
|
||||
const { storePath, cfg } = await createAbortConfig({
|
||||
sessionIdsByKey: { [targetSessionKey]: targetSessionId },
|
||||
});
|
||||
|
||||
const result = await runStopCommand({
|
||||
cfg,
|
||||
sessionKey: slashSessionKey,
|
||||
from: "telegram:123",
|
||||
to: "telegram:123",
|
||||
targetSessionKey,
|
||||
messageSid: "999",
|
||||
timestamp: 1234567890000,
|
||||
});
|
||||
|
||||
expect(result.handled).toBe(true);
|
||||
const store = JSON.parse(await fs.readFile(storePath, "utf8")) as Record<string, unknown>;
|
||||
const entry = store[targetSessionKey] as {
|
||||
abortedLastRun?: boolean;
|
||||
abortCutoffMessageSid?: string;
|
||||
abortCutoffTimestamp?: number;
|
||||
};
|
||||
expect(entry.abortedLastRun).toBe(true);
|
||||
expect(entry.abortCutoffMessageSid).toBeUndefined();
|
||||
expect(entry.abortCutoffTimestamp).toBeUndefined();
|
||||
});
|
||||
|
||||
it("fast-abort stops active subagent runs for requester session", async () => {
|
||||
const sessionKey = "telegram:parent";
|
||||
const childKey = "agent:main:subagent:child-1";
|
||||
|
||||
@@ -113,6 +113,80 @@ export function getAbortMemory(key: string): boolean | undefined {
|
||||
return ABORT_MEMORY.get(normalized);
|
||||
}
|
||||
|
||||
/** Boundary metadata taken from the inbound message that carried a stop request. */
export type AbortCutoff = {
  /** Trimmed message identifier; absent when the context carried no usable id. */
  messageSid?: string;
  /** Finite numeric timestamp copied from the context; absent otherwise. */
  timestamp?: number;
};

/**
 * Extracts stop-cutoff metadata from an inbound message context.
 *
 * The message id comes from `ctx.MessageSidFull` when that is a non-blank
 * string, otherwise from `ctx.MessageSid`; the chosen value is trimmed.
 * The timestamp is kept only when `ctx.Timestamp` is a finite number.
 *
 * @param ctx - Inbound message context to read the id/timestamp from.
 * @returns The cutoff, or `undefined` when neither an id nor a timestamp is usable.
 */
export function resolveAbortCutoffFromContext(ctx: MsgContext): AbortCutoff | undefined {
  // Preference order matters: the "full" id wins over the short one.
  const sidCandidates: unknown[] = [ctx.MessageSidFull, ctx.MessageSid];
  let messageSid: string | undefined;
  for (const candidate of sidCandidates) {
    const trimmed = typeof candidate === "string" ? candidate.trim() : "";
    if (trimmed) {
      messageSid = trimmed;
      break;
    }
  }

  const timestamp =
    typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp)
      ? ctx.Timestamp
      : undefined;

  if (messageSid === undefined && timestamp === undefined) {
    return undefined;
  }
  return { messageSid, timestamp };
}
|
||||
|
||||
/**
 * Parses a message id consisting solely of ASCII digits into a `bigint`.
 *
 * `bigint` is used so ids larger than `Number.MAX_SAFE_INTEGER` still compare
 * exactly.
 *
 * @returns The numeric id, or `undefined` when the value is missing, blank,
 *   or contains anything other than digits.
 */
function toNumericMessageSid(value: string | undefined): bigint | undefined {
  const trimmed = value?.trim();
  if (!trimmed || !/^\d+$/.test(trimmed)) {
    return undefined;
  }
  try {
    return BigInt(trimmed);
  } catch {
    // Defensive: treat any conversion failure as "not numeric".
    return undefined;
  }
}

/**
 * Decides whether a message falls at or before a stored stop cutoff.
 *
 * Resolution order:
 * 1. When both ids are present and both are purely numeric, compare them
 *    numerically (`messageSid <= cutoffMessageSid` means skip).
 * 2. When both ids are present and textually identical, skip.
 * 3. Otherwise, when both timestamps are finite numbers, compare them
 *    (`timestamp <= cutoffTimestamp` means skip).
 * 4. With nothing comparable, do not skip.
 *
 * @param params.cutoffMessageSid - Id recorded when the stop was received.
 * @param params.cutoffTimestamp - Timestamp recorded when the stop was received.
 * @param params.messageSid - Id of the message being evaluated.
 * @param params.timestamp - Timestamp of the message being evaluated.
 * @returns `true` when the message should be skipped.
 */
export function shouldSkipMessageByAbortCutoff(params: {
  cutoffMessageSid?: string;
  cutoffTimestamp?: number;
  messageSid?: string;
  timestamp?: number;
}): boolean {
  const cutoffSid = params.cutoffMessageSid?.trim();
  const currentSid = params.messageSid?.trim();

  if (cutoffSid && currentSid) {
    const cutoffNumeric = toNumericMessageSid(cutoffSid);
    const currentNumeric = toNumericMessageSid(currentSid);
    if (cutoffNumeric !== undefined && currentNumeric !== undefined) {
      return currentNumeric <= cutoffNumeric;
    }
    if (currentSid === cutoffSid) {
      return true;
    }
    // Non-numeric, non-identical ids carry no ordering; fall through to timestamps.
  }

  const { cutoffTimestamp, timestamp } = params;
  if (
    typeof cutoffTimestamp === "number" &&
    Number.isFinite(cutoffTimestamp) &&
    typeof timestamp === "number" &&
    Number.isFinite(timestamp)
  ) {
    return timestamp <= cutoffTimestamp;
  }

  return false;
}
|
||||
|
||||
/**
 * Decides whether stop-cutoff metadata from the command's message may be
 * stored on the target session.
 *
 * Native targeted /stop can run from a slash/session-control key while the
 * actual target session uses different message id/timestamp spaces, so the
 * cutoff is persisted only when command source and target are the same
 * session. When either key is missing or blank there is nothing to compare
 * and persistence is allowed.
 *
 * @param params.commandSessionKey - Session key the command arrived on.
 * @param params.targetSessionKey - Session key being aborted.
 * @returns `true` when the cutoff should be persisted.
 */
function shouldPersistAbortCutoff(params: {
  commandSessionKey?: string;
  targetSessionKey?: string;
}): boolean {
  const commandSessionKey = params.commandSessionKey?.trim();
  const targetSessionKey = params.targetSessionKey?.trim();
  if (!commandSessionKey || !targetSessionKey) {
    return true;
  }
  return commandSessionKey === targetSessionKey;
}
|
||||
|
||||
function pruneAbortMemory(): void {
|
||||
if (ABORT_MEMORY.size <= ABORT_MEMORY_MAX) {
|
||||
return;
|
||||
@@ -302,8 +376,16 @@ export async function tryFastAbortFromMessage(params: {
|
||||
`abort: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`,
|
||||
);
|
||||
}
|
||||
const abortCutoff = shouldPersistAbortCutoff({
|
||||
commandSessionKey: ctx.SessionKey,
|
||||
targetSessionKey: key ?? targetKey,
|
||||
})
|
||||
? resolveAbortCutoffFromContext(ctx)
|
||||
: undefined;
|
||||
if (entry && key) {
|
||||
entry.abortedLastRun = true;
|
||||
entry.abortCutoffMessageSid = abortCutoff?.messageSid;
|
||||
entry.abortCutoffTimestamp = abortCutoff?.timestamp;
|
||||
entry.updatedAt = Date.now();
|
||||
store[key] = entry;
|
||||
await updateSessionStore(storePath, (nextStore) => {
|
||||
@@ -312,6 +394,8 @@ export async function tryFastAbortFromMessage(params: {
|
||||
return;
|
||||
}
|
||||
nextEntry.abortedLastRun = true;
|
||||
nextEntry.abortCutoffMessageSid = abortCutoff?.messageSid;
|
||||
nextEntry.abortCutoffTimestamp = abortCutoff?.timestamp;
|
||||
nextEntry.updatedAt = Date.now();
|
||||
nextStore[key] = nextEntry;
|
||||
});
|
||||
|
||||
@@ -19,6 +19,7 @@ import { normalizeUsageDisplay, resolveResponseUsageMode } from "../thinking.js"
|
||||
import {
|
||||
formatAbortReplyText,
|
||||
isAbortTrigger,
|
||||
resolveAbortCutoffFromContext,
|
||||
resolveSessionEntryForKey,
|
||||
setAbortMemory,
|
||||
stopSubagentsForRequester,
|
||||
@@ -99,6 +100,7 @@ async function applyAbortTarget(params: {
|
||||
sessionStore?: Record<string, SessionEntry>;
|
||||
storePath?: string;
|
||||
abortKey?: string;
|
||||
abortCutoff?: { messageSid?: string; timestamp?: number };
|
||||
}) {
|
||||
const { abortTarget } = params;
|
||||
if (abortTarget.sessionId) {
|
||||
@@ -106,11 +108,19 @@ async function applyAbortTarget(params: {
|
||||
}
|
||||
if (abortTarget.entry && params.sessionStore && abortTarget.key) {
|
||||
abortTarget.entry.abortedLastRun = true;
|
||||
abortTarget.entry.abortCutoffMessageSid = params.abortCutoff?.messageSid;
|
||||
abortTarget.entry.abortCutoffTimestamp = params.abortCutoff?.timestamp;
|
||||
abortTarget.entry.updatedAt = Date.now();
|
||||
params.sessionStore[abortTarget.key] = abortTarget.entry;
|
||||
if (params.storePath) {
|
||||
await updateSessionStore(params.storePath, (store) => {
|
||||
store[abortTarget.key] = abortTarget.entry;
|
||||
store[abortTarget.key] = {
|
||||
...abortTarget.entry,
|
||||
abortedLastRun: true,
|
||||
abortCutoffMessageSid: params.abortCutoff?.messageSid,
|
||||
abortCutoffTimestamp: params.abortCutoff?.timestamp,
|
||||
updatedAt: Date.now(),
|
||||
};
|
||||
});
|
||||
}
|
||||
} else if (params.abortKey) {
|
||||
@@ -503,6 +513,12 @@ export const handleStopCommand: CommandHandler = async (params, allowTextCommand
|
||||
sessionStore: params.sessionStore,
|
||||
storePath: params.storePath,
|
||||
abortKey: params.command.abortKey,
|
||||
abortCutoff:
|
||||
params.sessionKey?.trim() &&
|
||||
abortTarget.key?.trim() &&
|
||||
params.sessionKey.trim() === abortTarget.key.trim()
|
||||
? resolveAbortCutoffFromContext(params.ctx)
|
||||
: undefined,
|
||||
});
|
||||
|
||||
// Trigger internal hook for stop command
|
||||
@@ -545,6 +561,12 @@ export const handleAbortTrigger: CommandHandler = async (params, allowTextComman
|
||||
sessionStore: params.sessionStore,
|
||||
storePath: params.storePath,
|
||||
abortKey: params.command.abortKey,
|
||||
abortCutoff:
|
||||
params.sessionKey?.trim() &&
|
||||
abortTarget.key?.trim() &&
|
||||
params.sessionKey.trim() === abortTarget.key.trim()
|
||||
? resolveAbortCutoffFromContext(params.ctx)
|
||||
: undefined,
|
||||
});
|
||||
return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } };
|
||||
};
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { SessionEntry } from "../../config/sessions.js";
|
||||
import type { TemplateContext } from "../templating.js";
|
||||
import { clearInlineDirectives } from "./get-reply-directives-utils.js";
|
||||
import { buildTestCtx } from "./test-ctx.js";
|
||||
@@ -146,4 +147,78 @@ describe("handleInlineActions", () => {
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("skips stale queued messages that are at or before the /stop cutoff", async () => {
|
||||
const typing = createTypingController();
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session-1",
|
||||
updatedAt: Date.now(),
|
||||
abortCutoffMessageSid: "42",
|
||||
abortedLastRun: true,
|
||||
};
|
||||
const sessionStore = { "s:main": sessionEntry };
|
||||
const ctx = buildTestCtx({
|
||||
Body: "old queued message",
|
||||
CommandBody: "old queued message",
|
||||
MessageSid: "41",
|
||||
});
|
||||
|
||||
const result = await handleInlineActions(
|
||||
createHandleInlineActionsInput({
|
||||
ctx,
|
||||
typing,
|
||||
cleanedBody: "old queued message",
|
||||
command: {
|
||||
rawBodyNormalized: "old queued message",
|
||||
commandBodyNormalized: "old queued message",
|
||||
},
|
||||
overrides: {
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
expect(result).toEqual({ kind: "reply", reply: undefined });
|
||||
expect(typing.cleanup).toHaveBeenCalled();
|
||||
expect(handleCommandsMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("clears /stop cutoff when a newer message arrives", async () => {
|
||||
const typing = createTypingController();
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session-2",
|
||||
updatedAt: Date.now(),
|
||||
abortCutoffMessageSid: "42",
|
||||
abortedLastRun: true,
|
||||
};
|
||||
const sessionStore = { "s:main": sessionEntry };
|
||||
handleCommandsMock.mockResolvedValue({ shouldContinue: false, reply: { text: "ok" } });
|
||||
const ctx = buildTestCtx({
|
||||
Body: "new message",
|
||||
CommandBody: "new message",
|
||||
MessageSid: "43",
|
||||
});
|
||||
|
||||
const result = await handleInlineActions(
|
||||
createHandleInlineActionsInput({
|
||||
ctx,
|
||||
typing,
|
||||
cleanedBody: "new message",
|
||||
command: {
|
||||
rawBodyNormalized: "new message",
|
||||
commandBodyNormalized: "new message",
|
||||
},
|
||||
overrides: {
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
expect(result).toEqual({ kind: "reply", reply: { text: "ok" } });
|
||||
expect(sessionStore["s:main"]?.abortCutoffMessageSid).toBeUndefined();
|
||||
expect(sessionStore["s:main"]?.abortCutoffTimestamp).toBeUndefined();
|
||||
expect(handleCommandsMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -5,6 +5,7 @@ import { applyOwnerOnlyToolPolicy } from "../../agents/tool-policy.js";
|
||||
import { getChannelDock } from "../../channels/dock.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import type { SessionEntry } from "../../config/sessions.js";
|
||||
import { updateSessionStore } from "../../config/sessions.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { generateSecureToken } from "../../infra/secure-random.js";
|
||||
import { resolveGatewayMessageChannel } from "../../utils/message-channel.js";
|
||||
@@ -16,7 +17,7 @@ import {
|
||||
import type { MsgContext, TemplateContext } from "../templating.js";
|
||||
import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "../thinking.js";
|
||||
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||
import { getAbortMemory } from "./abort.js";
|
||||
import { getAbortMemory, isAbortRequestText, shouldSkipMessageByAbortCutoff } from "./abort.js";
|
||||
import { buildStatusReply, handleCommands } from "./commands.js";
|
||||
import type { InlineDirectives } from "./directive-handling.js";
|
||||
import { isDirectiveOnly } from "./directive-handling.js";
|
||||
@@ -252,6 +253,57 @@ export async function handleInlineActions(params: {
|
||||
await opts.onBlockReply(reply);
|
||||
};
|
||||
|
||||
const clearAbortCutoff = async () => {
|
||||
if (!sessionEntry || !sessionStore || !sessionKey) {
|
||||
return;
|
||||
}
|
||||
if (
|
||||
sessionEntry.abortCutoffMessageSid === undefined &&
|
||||
sessionEntry.abortCutoffTimestamp === undefined
|
||||
) {
|
||||
return;
|
||||
}
|
||||
sessionEntry.abortCutoffMessageSid = undefined;
|
||||
sessionEntry.abortCutoffTimestamp = undefined;
|
||||
sessionEntry.updatedAt = Date.now();
|
||||
sessionStore[sessionKey] = sessionEntry;
|
||||
if (storePath) {
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
const existing = store[sessionKey] ?? sessionEntry;
|
||||
if (!existing) {
|
||||
return;
|
||||
}
|
||||
existing.abortCutoffMessageSid = undefined;
|
||||
existing.abortCutoffTimestamp = undefined;
|
||||
existing.updatedAt = Date.now();
|
||||
store[sessionKey] = existing;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const isStopLikeInbound = isAbortRequestText(command.rawBodyNormalized);
|
||||
if (!isStopLikeInbound && sessionEntry) {
|
||||
const shouldSkip = shouldSkipMessageByAbortCutoff({
|
||||
cutoffMessageSid: sessionEntry.abortCutoffMessageSid,
|
||||
cutoffTimestamp: sessionEntry.abortCutoffTimestamp,
|
||||
messageSid:
|
||||
(typeof ctx.MessageSidFull === "string" && ctx.MessageSidFull.trim()) ||
|
||||
(typeof ctx.MessageSid === "string" && ctx.MessageSid.trim()) ||
|
||||
undefined,
|
||||
timestamp: typeof ctx.Timestamp === "number" ? ctx.Timestamp : undefined,
|
||||
});
|
||||
if (shouldSkip) {
|
||||
typing.cleanup();
|
||||
return { kind: "reply", reply: undefined };
|
||||
}
|
||||
if (
|
||||
sessionEntry.abortCutoffMessageSid !== undefined ||
|
||||
sessionEntry.abortCutoffTimestamp !== undefined
|
||||
) {
|
||||
await clearAbortCutoff();
|
||||
}
|
||||
}
|
||||
|
||||
const inlineCommand =
|
||||
allowTextCommands && command.isAuthorizedSender
|
||||
? extractInlineSimpleCommand(cleanedBody)
|
||||
|
||||
@@ -9,6 +9,7 @@ const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true);
|
||||
const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false);
|
||||
const markGatewaySigusr1RestartHandled = vi.fn();
|
||||
const getActiveTaskCount = vi.fn(() => 0);
|
||||
const markGatewayDraining = vi.fn();
|
||||
const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ drained: true }));
|
||||
const resetAllLanes = vi.fn();
|
||||
const restartGatewayProcessWithFreshPid = vi.fn<
|
||||
@@ -37,6 +38,7 @@ vi.mock("../../infra/process-respawn.js", () => ({
|
||||
|
||||
vi.mock("../../process/command-queue.js", () => ({
|
||||
getActiveTaskCount: () => getActiveTaskCount(),
|
||||
markGatewayDraining: () => markGatewayDraining(),
|
||||
waitForActiveTasks: (timeoutMs: number) => waitForActiveTasks(timeoutMs),
|
||||
resetAllLanes: () => resetAllLanes(),
|
||||
}));
|
||||
@@ -213,6 +215,7 @@ describe("runGatewayLoop", () => {
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
expect(waitForActiveTasks).toHaveBeenCalledWith(30_000);
|
||||
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
|
||||
expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG);
|
||||
expect(closeFirst).toHaveBeenCalledWith({
|
||||
reason: "gateway restarting",
|
||||
@@ -229,6 +232,7 @@ describe("runGatewayLoop", () => {
|
||||
restartExpectedMs: 1500,
|
||||
});
|
||||
expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(2);
|
||||
expect(markGatewayDraining).toHaveBeenCalledTimes(2);
|
||||
expect(resetAllLanes).toHaveBeenCalledTimes(2);
|
||||
expect(acquireGatewayLock).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import {
|
||||
getActiveTaskCount,
|
||||
markGatewayDraining,
|
||||
resetAllLanes,
|
||||
waitForActiveTasks,
|
||||
} from "../../process/command-queue.js";
|
||||
@@ -111,6 +112,9 @@ export async function runGatewayLoop(params: {
|
||||
// On restart, wait for in-flight agent turns to finish before
|
||||
// tearing down the server so buffered messages are delivered.
|
||||
if (isRestart) {
|
||||
// Reject new enqueues immediately during the drain window so
|
||||
// sessions get an explicit restart error instead of silent task loss.
|
||||
markGatewayDraining();
|
||||
const activeTasks = getActiveTaskCount();
|
||||
if (activeTasks > 0) {
|
||||
gatewayLog.info(
|
||||
|
||||
@@ -84,6 +84,14 @@ export type SessionEntry = {
|
||||
spawnDepth?: number;
|
||||
systemSent?: boolean;
|
||||
abortedLastRun?: boolean;
|
||||
/**
|
||||
* Session-level stop cutoff captured when /stop is received.
|
||||
* Messages at/before this boundary are skipped to avoid replaying
|
||||
* queued pre-stop backlog.
|
||||
*/
|
||||
abortCutoffMessageSid?: string;
|
||||
/** Epoch ms cutoff paired with abortCutoffMessageSid when available. */
|
||||
abortCutoffTimestamp?: number;
|
||||
chatType?: SessionChatType;
|
||||
thinkingLevel?: string;
|
||||
verboseLevel?: string;
|
||||
|
||||
@@ -9,7 +9,7 @@ import { CronService } from "./service.js";
|
||||
import { createDeferred, createRunningCronServiceState } from "./service.test-harness.js";
|
||||
import { computeJobNextRunAtMs } from "./service/jobs.js";
|
||||
import { createCronServiceState, type CronEvent } from "./service/state.js";
|
||||
import { executeJobCore, onTimer, runMissedJobs } from "./service/timer.js";
|
||||
import { DEFAULT_JOB_TIMEOUT_MS, executeJobCore, onTimer, runMissedJobs } from "./service/timer.js";
|
||||
import type { CronJob, CronJobState } from "./types.js";
|
||||
|
||||
const noopLogger = {
|
||||
@@ -838,6 +838,58 @@ describe("Cron issue regressions", () => {
|
||||
expect(job?.state.lastStatus).toBe("ok");
|
||||
});
|
||||
|
||||
it("does not time out agentTurn jobs at the default 10-minute safety window", async () => {
|
||||
const store = await makeStorePath();
|
||||
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
|
||||
|
||||
const cronJob = createIsolatedRegressionJob({
|
||||
id: "agentturn-default-safety-window",
|
||||
name: "agentturn default safety window",
|
||||
scheduledAt,
|
||||
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
|
||||
payload: { kind: "agentTurn", message: "work" },
|
||||
state: { nextRunAtMs: scheduledAt },
|
||||
});
|
||||
await writeCronJobs(store.storePath, [cronJob]);
|
||||
|
||||
let now = scheduledAt;
|
||||
const deferredRun = createDeferred<{ status: "ok"; summary: string }>();
|
||||
const runIsolatedAgentJob = vi.fn(async ({ abortSignal }: { abortSignal?: AbortSignal }) => {
|
||||
const result = await deferredRun.promise;
|
||||
if (abortSignal?.aborted) {
|
||||
return { status: "error" as const, error: String(abortSignal.reason) };
|
||||
}
|
||||
now += 5;
|
||||
return result;
|
||||
});
|
||||
const state = createCronServiceState({
|
||||
cronEnabled: true,
|
||||
storePath: store.storePath,
|
||||
log: noopLogger,
|
||||
nowMs: () => now,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob,
|
||||
});
|
||||
|
||||
const timerPromise = onTimer(state);
|
||||
let settled = false;
|
||||
void timerPromise.finally(() => {
|
||||
settled = true;
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(DEFAULT_JOB_TIMEOUT_MS + 1_000);
|
||||
await Promise.resolve();
|
||||
expect(settled).toBe(false);
|
||||
|
||||
deferredRun.resolve({ status: "ok", summary: "done" });
|
||||
await timerPromise;
|
||||
|
||||
const job = state.store?.jobs.find((entry) => entry.id === "agentturn-default-safety-window");
|
||||
expect(job?.state.lastStatus).toBe("ok");
|
||||
expect(job?.state.lastError).toBeUndefined();
|
||||
});
|
||||
|
||||
it("aborts isolated runs when cron timeout fires", async () => {
|
||||
vi.useRealTimers();
|
||||
const store = await makeStorePath();
|
||||
|
||||
@@ -36,6 +36,7 @@ const MIN_REFIRE_GAP_MS = 2_000;
|
||||
* from wedging the entire cron lane.
|
||||
*/
|
||||
export const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes
|
||||
const AGENT_TURN_SAFETY_TIMEOUT_MS = 60 * 60_000; // 60 minutes
|
||||
|
||||
type TimedCronRunOutcome = CronRunOutcome &
|
||||
CronRunTelemetry & {
|
||||
@@ -52,7 +53,7 @@ function resolveCronJobTimeoutMs(job: CronJob): number | undefined {
|
||||
? Math.floor(job.payload.timeoutSeconds * 1_000)
|
||||
: undefined;
|
||||
if (configuredTimeoutMs === undefined) {
|
||||
return DEFAULT_JOB_TIMEOUT_MS;
|
||||
return job.payload.kind === "agentTurn" ? AGENT_TURN_SAFETY_TIMEOUT_MS : DEFAULT_JOB_TIMEOUT_MS;
|
||||
}
|
||||
return configuredTimeoutMs <= 0 ? undefined : configuredTimeoutMs;
|
||||
}
|
||||
|
||||
@@ -21,8 +21,10 @@ import {
|
||||
CommandLaneClearedError,
|
||||
enqueueCommand,
|
||||
enqueueCommandInLane,
|
||||
GatewayDrainingError,
|
||||
getActiveTaskCount,
|
||||
getQueueSize,
|
||||
markGatewayDraining,
|
||||
resetAllLanes,
|
||||
setCommandLaneConcurrency,
|
||||
waitForActiveTasks,
|
||||
@@ -52,6 +54,7 @@ function enqueueBlockedMainTask<T = void>(
|
||||
|
||||
describe("command queue", () => {
|
||||
beforeEach(() => {
|
||||
resetAllLanes();
|
||||
diagnosticMocks.logLaneEnqueue.mockClear();
|
||||
diagnosticMocks.logLaneDequeue.mockClear();
|
||||
diagnosticMocks.diag.debug.mockClear();
|
||||
@@ -288,4 +291,47 @@ describe("command queue", () => {
|
||||
release();
|
||||
await expect(first).resolves.toBe("first");
|
||||
});
|
||||
|
||||
it("keeps draining functional after synchronous onWait failure", async () => {
|
||||
const lane = `drain-sync-throw-${Date.now()}-${Math.random().toString(16).slice(2)}`;
|
||||
setCommandLaneConcurrency(lane, 1);
|
||||
|
||||
const deferred = createDeferred();
|
||||
const first = enqueueCommandInLane(lane, async () => {
|
||||
await deferred.promise;
|
||||
return "first";
|
||||
});
|
||||
const second = enqueueCommandInLane(lane, async () => "second", {
|
||||
warnAfterMs: 0,
|
||||
onWait: () => {
|
||||
throw new Error("onWait exploded");
|
||||
},
|
||||
});
|
||||
await Promise.resolve();
|
||||
expect(getQueueSize(lane)).toBeGreaterThanOrEqual(2);
|
||||
|
||||
deferred.resolve();
|
||||
await expect(first).resolves.toBe("first");
|
||||
await expect(second).resolves.toBe("second");
|
||||
});
|
||||
|
||||
it("rejects new enqueues with GatewayDrainingError after markGatewayDraining", async () => {
|
||||
markGatewayDraining();
|
||||
await expect(enqueueCommand(async () => "blocked")).rejects.toBeInstanceOf(
|
||||
GatewayDrainingError,
|
||||
);
|
||||
});
|
||||
|
||||
it("does not affect already-active tasks after markGatewayDraining", async () => {
|
||||
const { task, release } = enqueueBlockedMainTask(async () => "ok");
|
||||
markGatewayDraining();
|
||||
release();
|
||||
await expect(task).resolves.toBe("ok");
|
||||
});
|
||||
|
||||
it("resetAllLanes clears gateway draining flag and re-allows enqueue", async () => {
|
||||
markGatewayDraining();
|
||||
resetAllLanes();
|
||||
await expect(enqueueCommand(async () => "ok")).resolves.toBe("ok");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -12,6 +12,20 @@ export class CommandLaneClearedError extends Error {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Dedicated error type thrown when a new command is rejected because the
 * gateway is currently draining for restart.
 *
 * The message is fixed; `name` is set explicitly so the error can be
 * identified by name as well as by `instanceof`.
 */
export class GatewayDrainingError extends Error {
  constructor() {
    super("Gateway is draining for restart; new tasks are not accepted");
    this.name = "GatewayDrainingError";
  }
}
|
||||
|
||||
// Set while gateway is draining for restart; new enqueues are rejected.
|
||||
let gatewayDraining = false;
|
||||
|
||||
// Minimal in-process queue to serialize command executions.
|
||||
// Default lane ("main") preserves the existing behavior. Additional lanes allow
|
||||
// low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for
|
||||
@@ -66,57 +80,77 @@ function completeTask(state: LaneState, taskId: number, taskGeneration: number):
|
||||
/**
 * Starts as many queued tasks on `lane` as its concurrency limit allows.
 *
 * Re-entrancy is guarded by `state.draining`: when the flag is already set the
 * call returns immediately, emitting a warning if the lane looks stuck (no
 * active tasks but a non-empty queue). The flag is always cleared when a pump
 * pass ends, including when the pass throws synchronously.
 */
function drainLane(lane: string) {
  const state = getLaneState(lane);
  if (state.draining) {
    if (state.activeTaskIds.size === 0 && state.queue.length > 0) {
      diag.warn(
        `drainLane blocked: lane=${lane} draining=true active=0 queue=${state.queue.length}`,
      );
    }
    return;
  }
  state.draining = true;

  const pump = () => {
    try {
      while (state.activeTaskIds.size < state.maxConcurrent && state.queue.length > 0) {
        const entry = state.queue.shift() as QueueEntry;
        const waitedMs = Date.now() - entry.enqueuedAt;
        if (waitedMs >= entry.warnAfterMs) {
          // A throwing onWait callback must not abort the pump pass.
          try {
            entry.onWait?.(waitedMs, state.queue.length);
          } catch (err) {
            diag.error(`lane onWait callback failed: lane=${lane} error="${String(err)}"`);
          }
          diag.warn(
            `lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${state.queue.length}`,
          );
        }
        logLaneDequeue(lane, waitedMs, state.queue.length);
        const taskId = nextTaskId++;
        // Captured so completion can tell whether the lane generation changed meanwhile.
        const taskGeneration = state.generation;
        state.activeTaskIds.add(taskId);
        void (async () => {
          const startTime = Date.now();
          try {
            const result = await entry.task();
            const completedCurrentGeneration = completeTask(state, taskId, taskGeneration);
            if (completedCurrentGeneration) {
              diag.debug(
                `lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.activeTaskIds.size} queued=${state.queue.length}`,
              );
              pump();
            }
            entry.resolve(result);
          } catch (err) {
            const completedCurrentGeneration = completeTask(state, taskId, taskGeneration);
            // Probe lanes are excluded from task-error logging.
            const isProbeLane = lane.startsWith("auth-probe:") || lane.startsWith("session:probe-");
            if (!isProbeLane) {
              diag.error(
                `lane task error: lane=${lane} durationMs=${Date.now() - startTime} error="${String(err)}"`,
              );
            }
            if (completedCurrentGeneration) {
              pump();
            }
            entry.reject(err);
          }
        })();
      }
    } finally {
      // Always release the guard so a synchronous failure cannot wedge the lane.
      state.draining = false;
    }
  };

  pump();
}
|
||||
|
||||
/**
 * Mark gateway as draining for restart so new enqueues fail fast with
 * `GatewayDrainingError` instead of being silently killed on shutdown.
 *
 * Only sets the module-level `gatewayDraining` flag; it does not touch
 * lane state or tasks that are already queued or running.
 */
export function markGatewayDraining(): void {
  gatewayDraining = true;
}
|
||||
|
||||
export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) {
|
||||
const cleaned = lane.trim() || CommandLane.Main;
|
||||
const state = getLaneState(cleaned);
|
||||
@@ -132,6 +166,9 @@ export function enqueueCommandInLane<T>(
|
||||
onWait?: (waitMs: number, queuedAhead: number) => void;
|
||||
},
|
||||
): Promise<T> {
|
||||
if (gatewayDraining) {
|
||||
return Promise.reject(new GatewayDrainingError());
|
||||
}
|
||||
const cleaned = lane.trim() || CommandLane.Main;
|
||||
const warnAfterMs = opts?.warnAfterMs ?? 2_000;
|
||||
const state = getLaneState(cleaned);
|
||||
@@ -205,6 +242,7 @@ export function clearCommandLane(lane: string = CommandLane.Main) {
|
||||
* `enqueueCommandInLane()` call (which may never come).
|
||||
*/
|
||||
export function resetAllLanes(): void {
|
||||
gatewayDraining = false;
|
||||
const lanesToDrain: string[] = [];
|
||||
for (const state of lanes.values()) {
|
||||
state.generation += 1;
|
||||
|
||||
Reference in New Issue
Block a user