fix cron announce routing and timeout handling

This commit is contained in:
Tyler Yust
2026-02-17 11:40:04 -08:00
parent e1015a5197
commit 75001a0490
12 changed files with 298 additions and 43 deletions

View File

@@ -25,9 +25,9 @@ Docs: https://docs.openclaw.ai
### Fixes
- BlueBubbles: add fallback path to recover outbound `message_id` from `fromMe` webhooks when platform message IDs are missing.
- BlueBubbles: match outbound message-id fallback recovery by chat identifier as well as account context.
- BlueBubbles: include sender identifier in untrusted conversation metadata for conversation info payloads.
- BlueBubbles: add fallback path to recover outbound `message_id` from `fromMe` webhooks when platform message IDs are missing. Thanks @tyler6204.
- BlueBubbles: match outbound message-id fallback recovery by chat identifier as well as account context. Thanks @tyler6204.
- BlueBubbles: include sender identifier in untrusted conversation metadata for conversation info payloads. Thanks @tyler6204.
- macOS/Update: correct the Sparkle appcast version for 2026.2.15 so updates are offered again. (#18201)
- Gateway/Auth: clear stale device-auth tokens after device token mismatch errors so re-paired clients can re-auth. (#18201)
- Voice call/Gateway: prevent overlapping closed-loop turn races with per-call turn locking, route transcript dedupe via source-aware fingerprints with strict cache eviction bounds, and harden `voicecall latency` stats for large logs without spread-operator stack overflow. (#19140) Thanks @mbelinky.
@@ -47,6 +47,9 @@ Docs: https://docs.openclaw.ai
- Feishu: detect bot mentions in post messages with embedded docs when `message.mentions` is empty. (#18074) Thanks @popomore.
- Agents/Sessions: align session lock watchdog hold windows with run and compaction timeout budgets (plus grace), preventing valid long-running turns from being force-unlocked mid-run while still recovering hung lock owners. (#18060)
- Cron: preserve default model fallbacks for cron agent runs when only `model.primary` is overridden, so failover still follows configured fallbacks unless explicitly cleared with `fallbacks: []`. (#18210) Thanks @mahsumaktas.
- Cron: route text-only announce output through the main session announce flow via runSubagentAnnounceFlow so cron text-only output remains visible to the initiating session. Thanks @tyler6204.
- Cron: treat `timeoutSeconds: 0` as no-timeout (not clamped to 1), ensuring long-running cron runs are not prematurely terminated. Thanks @tyler6204.
- Cron announce injection now targets the session determined by delivery config (`to` + channel) instead of defaulting to the current session. Thanks @tyler6204.
- Cron/Heartbeat: canonicalize session-scoped reminder `sessionKey` routing and preserve explicit flat `sessionKey` cron tool inputs, preventing enqueue/wake namespace drift for session-targeted reminders. (#18637) Thanks @vignesh07.
- Cron/Webhooks: reuse existing session IDs for webhook/cron runs when the session key is stable and still fresh, preserving conversation history. (#18031) Thanks @Operative-001.
- Cron: prevent spin loops when cron jobs complete within the scheduled second by advancing the next run and enforcing a minimum refire gap. (#18073) Thanks @widingmarcus-cyber.
@@ -92,7 +95,7 @@ Docs: https://docs.openclaw.ai
- CLI/Message: preserve `--components` JSON payloads in `openclaw message send` so Discord component payloads are no longer dropped. (#18222) Thanks @saurabhchopade.
- Voice Call: add an optional stale call reaper (`staleCallReaperSeconds`) to end stuck calls when enabled. (#18437)
- Auto-reply/Subagents: propagate group context (`groupId`, `groupChannel`, `space`) when spawning via `/subagents spawn`, matching tool-triggered subagent spawn behavior.
- Subagents: route nested announce results back to the parent session after the parent run ends, falling back only when the parent session is deleted. (#18043)
- Subagents: route nested announce results back to the parent session after the parent run ends, falling back only when the parent session is deleted. (#18043) Thanks @tyler6204.
- Subagents: cap announce retry loops with max attempts and expiry to prevent infinite retry spam after deferred announces. (#18444)
- Agents/Tools/exec: add a preflight guard that detects likely shell env var injection (e.g. `$DM_JSON`, `$TMPDIR`) in Python/Node scripts before execution, preventing recurring cron failures and wasted tokens when models emit mixed shell+language source. (#12836)
- Agents/Tools/exec: treat normal non-zero exit codes as completed and append the exit code to tool output to avoid false tool-failure warnings. (#18425)

View File

@@ -237,7 +237,7 @@ PAYLOAD TYPES (payload.kind):
- "systemEvent": Injects text as system event into session
{ "kind": "systemEvent", "text": "<message>" }
- "agentTurn": Runs agent with message (isolated sessions only)
{ "kind": "agentTurn", "message": "<prompt>", "model": "<optional>", "thinking": "<optional>", "timeoutSeconds": <optional> }
{ "kind": "agentTurn", "message": "<prompt>", "model": "<optional>", "thinking": "<optional>", "timeoutSeconds": <optional, 0 means no timeout> }
DELIVERY (top-level):
{ "mode": "none|announce|webhook", "channel": "<optional>", "to": "<optional>", "bestEffort": <optional-bool> }

View File

@@ -93,7 +93,7 @@ async function expectBestEffortTelegramNotDelivered(
});
}
async function expectExplicitTelegramTargetDelivery(params: {
async function expectExplicitTelegramTargetAnnounce(params: {
payloads: Array<Record<string, unknown>>;
expectedText: string;
}): Promise<void> {
@@ -110,11 +110,17 @@ async function expectExplicitTelegramTargetDelivery(params: {
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
const [to, text] = vi.mocked(deps.sendMessageTelegram).mock.calls[0] ?? [];
expect(to).toBe("123");
expect(text).toBe(params.expectedText);
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as
| {
requesterOrigin?: { channel?: string; to?: string };
roundOneReply?: string;
}
| undefined;
expect(announceArgs?.requesterOrigin?.channel).toBe("telegram");
expect(announceArgs?.requesterOrigin?.to).toBe("123");
expect(announceArgs?.roundOneReply).toBe(params.expectedText);
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
});
}
@@ -123,20 +129,61 @@ describe("runCronIsolatedAgentTurn", () => {
setupIsolatedAgentTurnMocks();
});
it("delivers directly when delivery has an explicit target", async () => {
await expectExplicitTelegramTargetDelivery({
it("routes text-only explicit target delivery through announce flow", async () => {
await expectExplicitTelegramTargetAnnounce({
payloads: [{ text: "hello from cron" }],
expectedText: "hello from cron",
});
});
it("delivers the final payload text when delivery has an explicit target", async () => {
await expectExplicitTelegramTargetDelivery({
it("announces the final payload text when delivery has an explicit target", async () => {
await expectExplicitTelegramTargetAnnounce({
payloads: [{ text: "Working on it..." }, { text: "Final weather summary" }],
expectedText: "Final weather summary",
});
});
it("routes announce injection to the delivery-target session key", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps = createCliDeps();
mockAgentPayloads([{ text: "hello from cron" }]);
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
session: {
store: storePath,
mainKey: "main",
dmScope: "per-channel-peer",
},
channels: {
telegram: { botToken: "t-1" },
},
}),
deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: { mode: "announce", channel: "telegram", to: "123" },
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as
| {
requesterSessionKey?: string;
requesterOrigin?: { channel?: string; to?: string };
}
| undefined;
expect(announceArgs?.requesterSessionKey).toBe("agent:main:telegram:direct:123");
expect(announceArgs?.requesterOrigin?.channel).toBe("telegram");
expect(announceArgs?.requesterOrigin?.to).toBe("123");
});
});
it("passes resolved threadId into shared subagent announce flow", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
@@ -225,13 +272,13 @@ describe("runCronIsolatedAgentTurn", () => {
});
});
it("fails when direct delivery fails and best-effort is disabled", async () => {
it("fails when structured direct delivery fails and best-effort is disabled", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps = createCliDeps({
sendMessageTelegram: vi.fn().mockRejectedValue(new Error("boom")),
});
mockAgentPayloads([{ text: "hello from cron" }]);
mockAgentPayloads([{ text: "hello from cron", mediaUrl: "https://example.com/img.png" }]);
const res = await runTelegramAnnounceTurn({
home,
storePath,
@@ -246,7 +293,10 @@ describe("runCronIsolatedAgentTurn", () => {
});
});
it("ignores direct delivery failures when best-effort is enabled", async () => {
await expectBestEffortTelegramNotDelivered({ text: "hello from cron" });
it("ignores structured direct delivery failures when best-effort is enabled", async () => {
await expectBestEffortTelegramNotDelivered({
text: "hello from cron",
mediaUrl: "https://example.com/img.png",
});
});
});

View File

@@ -44,6 +44,7 @@ import type { AgentDefaultsConfig } from "../../config/types.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js";
import { resolveOutboundSessionRoute } from "../../infra/outbound/outbound-session.js";
import { logWarn } from "../../logger.js";
import { buildAgentMainSessionKey, normalizeAgentId } from "../../routing/session-key.js";
import {
@@ -100,6 +101,40 @@ function resolveCronDeliveryBestEffort(job: CronJob): boolean {
return false;
}
async function resolveCronAnnounceSessionKey(params: {
cfg: OpenClawConfig;
agentId: string;
fallbackSessionKey: string;
delivery: {
channel: Parameters<typeof resolveOutboundSessionRoute>[0]["channel"];
to?: string;
accountId?: string;
threadId?: string | number;
};
}): Promise<string> {
const to = params.delivery.to?.trim();
if (!to) {
return params.fallbackSessionKey;
}
try {
const route = await resolveOutboundSessionRoute({
cfg: params.cfg,
channel: params.delivery.channel,
agentId: params.agentId,
accountId: params.delivery.accountId,
target: to,
threadId: params.delivery.threadId,
});
const resolved = route?.sessionKey?.trim();
if (resolved) {
return resolved;
}
} catch {
// Fall back to main session routing if announce session resolution fails.
}
return params.fallbackSessionKey;
}
export type RunCronAgentTurnResult = {
/** Last non-empty agent text output (not truncated). */
outputText?: string;
@@ -584,15 +619,14 @@ export async function runCronIsolatedAgentTurn(params: {
}
const identity = resolveAgentOutboundIdentity(cfgWithAgentDefaults, agentId);
// Shared subagent announce flow is text-based and prompts the main agent to
// summarize. When we have an explicit delivery target (delivery.to), sender
// identity, or structured content, prefer direct outbound delivery to send
// the actual cron output without summarization.
const hasExplicitDeliveryTarget = Boolean(deliveryPlan.to);
if (deliveryPayloadHasStructuredContent || identity || hasExplicitDeliveryTarget) {
// Route text-only cron announce output back through the main session so it
// follows the same system-message injection path as subagent completions.
// Keep direct outbound delivery only for structured payloads (media/channel
// data), which cannot be represented by the shared announce flow.
if (deliveryPayloadHasStructuredContent) {
try {
const payloadsForDelivery =
deliveryPayloadHasStructuredContent && deliveryPayloads.length > 0
deliveryPayloads.length > 0
? deliveryPayloads
: synthesizedText
? [{ text: synthesizedText }]
@@ -624,10 +658,21 @@ export async function runCronIsolatedAgentTurn(params: {
}
}
} else if (synthesizedText) {
const announceSessionKey = resolveAgentMainSessionKey({
const announceMainSessionKey = resolveAgentMainSessionKey({
cfg: params.cfg,
agentId,
});
const announceSessionKey = await resolveCronAnnounceSessionKey({
cfg: cfgWithAgentDefaults,
agentId,
fallbackSessionKey: announceMainSessionKey,
delivery: {
channel: resolvedDelivery.channel,
to: resolvedDelivery.to,
accountId: resolvedDelivery.accountId,
threadId: resolvedDelivery.threadId,
},
});
const taskLabel =
typeof params.job.name === "string" && params.job.name.trim()
? params.job.name.trim()

View File

@@ -300,6 +300,18 @@ describe("normalizeCronJobCreate", () => {
expect(payload.allowUnsafeExternalContent).toBe(true);
});
it("preserves timeoutSeconds=0 for no-timeout agentTurn payloads", () => {
const normalized = normalizeCronJobCreate({
name: "legacy no-timeout",
schedule: { kind: "every", everyMs: 60_000 },
payload: { kind: "agentTurn", message: "hello" },
timeoutSeconds: 0,
}) as unknown as Record<string, unknown>;
const payload = normalized.payload as Record<string, unknown>;
expect(payload.timeoutSeconds).toBe(0);
});
it("coerces sessionTarget and wakeMode casing", () => {
const normalized = normalizeCronJobCreate({
name: "casing",

View File

@@ -131,7 +131,7 @@ function coercePayload(payload: UnknownRecord) {
}
if ("timeoutSeconds" in next) {
if (typeof next.timeoutSeconds === "number" && Number.isFinite(next.timeoutSeconds)) {
next.timeoutSeconds = Math.max(1, Math.floor(next.timeoutSeconds));
next.timeoutSeconds = Math.max(0, Math.floor(next.timeoutSeconds));
} else {
delete next.timeoutSeconds;
}

View File

@@ -642,6 +642,62 @@ describe("Cron issue regressions", () => {
expect(job!.state.nextRunAtMs).toBeGreaterThanOrEqual(minNext);
});
it("treats timeoutSeconds=0 as no timeout for isolated agentTurn jobs", async () => {
const store = await makeStorePath();
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
const cronJob: CronJob = {
id: "no-timeout-0",
name: "no-timeout",
enabled: true,
createdAtMs: scheduledAt - 86_400_000,
updatedAtMs: scheduledAt - 86_400_000,
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "work", timeoutSeconds: 0 },
delivery: { mode: "announce" },
state: { nextRunAtMs: scheduledAt },
};
await fs.writeFile(
store.storePath,
JSON.stringify({ version: 1, jobs: [cronJob] }, null, 2),
"utf-8",
);
let now = scheduledAt;
const deferredRun = createDeferred<{ status: "ok"; summary: string }>();
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => {
const result = await deferredRun.promise;
now += 5;
return result;
}),
});
const timerPromise = onTimer(state);
let settled = false;
void timerPromise.finally(() => {
settled = true;
});
await vi.advanceTimersByTimeAsync(0);
await Promise.resolve();
expect(settled).toBe(false);
deferredRun.resolve({ status: "ok", summary: "done" });
await timerPromise;
const job = state.store?.jobs.find((j) => j.id === "no-timeout-0");
expect(job?.state.lastStatus).toBe("ok");
});
it("retries cron schedule computation from the next second when the first attempt returns undefined (#17821)", () => {
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
const cronJob: CronJob = {

View File

@@ -102,4 +102,56 @@ describe("CronService store migrations", () => {
cron.stop();
await store.cleanup();
});
it("preserves legacy timeoutSeconds=0 during top-level agentTurn field migration", async () => {
const store = await makeStorePath();
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
await fs.writeFile(
store.storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
id: "legacy-agentturn-no-timeout",
name: "legacy no-timeout",
enabled: true,
createdAtMs: Date.parse("2026-02-01T12:00:00.000Z"),
updatedAtMs: Date.parse("2026-02-05T12:00:00.000Z"),
schedule: { kind: "cron", expr: "0 23 * * *", tz: "UTC" },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
timeoutSeconds: 0,
payload: { kind: "agentTurn", message: "legacy payload fields" },
},
],
},
null,
2,
),
"utf-8",
);
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const, summary: "ok" })),
});
await cron.start();
const jobs = await cron.list({ includeDisabled: true });
const job = jobs.find((entry) => entry.id === "legacy-agentturn-no-timeout");
expect(job).toBeDefined();
expect(job?.payload.kind).toBe("agentTurn");
if (job?.payload.kind === "agentTurn") {
expect(job.payload.timeoutSeconds).toBe(0);
}
cron.stop();
await store.cleanup();
});
});

View File

@@ -127,7 +127,7 @@ function copyTopLevelAgentTurnFields(
typeof raw.timeoutSeconds === "number" &&
Number.isFinite(raw.timeoutSeconds)
) {
payload.timeoutSeconds = Math.max(1, Math.floor(raw.timeoutSeconds));
payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds));
mutated = true;
}

View File

@@ -243,26 +243,43 @@ export async function onTimer(state: CronServiceState) {
job.state.runningAtMs = startedAt;
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
const jobTimeoutMs =
const configuredTimeoutMs =
job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number"
? job.payload.timeoutSeconds * 1_000
? Math.floor(job.payload.timeoutSeconds * 1_000)
: undefined;
const jobTimeoutMs =
configuredTimeoutMs !== undefined
? configuredTimeoutMs <= 0
? undefined
: configuredTimeoutMs
: DEFAULT_JOB_TIMEOUT_MS;
try {
let timeoutId: NodeJS.Timeout;
const result = await Promise.race([
executeJobCore(state, job),
new Promise<never>((_, reject) => {
timeoutId = setTimeout(
() => reject(new Error("cron: job execution timed out")),
jobTimeoutMs,
);
}),
]).finally(() => clearTimeout(timeoutId!));
const result =
typeof jobTimeoutMs === "number"
? await (async () => {
let timeoutId: NodeJS.Timeout | undefined;
try {
return await Promise.race([
executeJobCore(state, job),
new Promise<never>((_, reject) => {
timeoutId = setTimeout(
() => reject(new Error("cron: job execution timed out")),
jobTimeoutMs,
);
}),
]);
} finally {
if (timeoutId) {
clearTimeout(timeoutId);
}
}
})()
: await executeJobCore(state, job);
results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() });
} catch (err) {
state.deps.log.warn(
{ jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs },
{ jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? null },
`cron: job failed: ${String(err)}`,
);
results.push({

View File

@@ -8,7 +8,7 @@ function cronAgentTurnPayloadSchema(params: { message: TSchema }) {
message: params.message,
model: Type.Optional(Type.String()),
thinking: Type.Optional(Type.String()),
timeoutSeconds: Type.Optional(Type.Integer({ minimum: 1 })),
timeoutSeconds: Type.Optional(Type.Integer({ minimum: 0 })),
allowUnsafeExternalContent: Type.Optional(Type.Boolean()),
deliver: Type.Optional(Type.Boolean()),
channel: Type.Optional(Type.String()),

View File

@@ -180,6 +180,26 @@ describe("gateway server cron", () => {
const mergeJobId = typeof mergeJobIdValue === "string" ? mergeJobIdValue : "";
expect(mergeJobId.length > 0).toBe(true);
const noTimeoutRes = await rpcReq(ws, "cron.add", {
name: "no-timeout payload",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "hello", timeoutSeconds: 0 },
});
expect(noTimeoutRes.ok).toBe(true);
const noTimeoutPayload = noTimeoutRes.payload as
| {
payload?: {
kind?: unknown;
timeoutSeconds?: unknown;
};
}
| undefined;
expect(noTimeoutPayload?.payload?.kind).toBe("agentTurn");
expect(noTimeoutPayload?.payload?.timeoutSeconds).toBe(0);
const mergeUpdateRes = await rpcReq(ws, "cron.update", {
id: mergeJobId,
patch: {