diff --git a/CHANGELOG.md b/CHANGELOG.md index 15c50c110f6..84f88c9522a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,9 +25,9 @@ Docs: https://docs.openclaw.ai ### Fixes -- BlueBubbles: add fallback path to recover outbound `message_id` from `fromMe` webhooks when platform message IDs are missing. -- BlueBubbles: match outbound message-id fallback recovery by chat identifier as well as account context. -- BlueBubbles: include sender identifier in untrusted conversation metadata for conversation info payloads. +- BlueBubbles: add fallback path to recover outbound `message_id` from `fromMe` webhooks when platform message IDs are missing. Thanks @tyler6204. +- BlueBubbles: match outbound message-id fallback recovery by chat identifier as well as account context. Thanks @tyler6204. +- BlueBubbles: include sender identifier in untrusted conversation metadata for conversation info payloads. Thanks @tyler6204. - macOS/Update: correct the Sparkle appcast version for 2026.2.15 so updates are offered again. (#18201) - Gateway/Auth: clear stale device-auth tokens after device token mismatch errors so re-paired clients can re-auth. (#18201) - Voice call/Gateway: prevent overlapping closed-loop turn races with per-call turn locking, route transcript dedupe via source-aware fingerprints with strict cache eviction bounds, and harden `voicecall latency` stats for large logs without spread-operator stack overflow. (#19140) Thanks @mbelinky. @@ -47,6 +47,9 @@ Docs: https://docs.openclaw.ai - Feishu: detect bot mentions in post messages with embedded docs when `message.mentions` is empty. (#18074) Thanks @popomore. - Agents/Sessions: align session lock watchdog hold windows with run and compaction timeout budgets (plus grace), preventing valid long-running turns from being force-unlocked mid-run while still recovering hung lock owners. (#18060) - Cron: preserve default model fallbacks for cron agent runs when only `model.primary` is overridden, so failover still follows configured fallbacks unless explicitly cleared with `fallbacks: []`. (#18210) Thanks @mahsumaktas. +- Cron: route text-only announce output through the main session announce flow via runSubagentAnnounceFlow so cron text-only output remains visible to the initiating session. Thanks @tyler6204. +- Cron: treat `timeoutSeconds: 0` as no-timeout (not clamped to 1), ensuring long-running cron runs are not prematurely terminated. Thanks @tyler6204. +- Cron announce injection now targets the session determined by delivery config (`to` + channel) instead of defaulting to the current session. Thanks @tyler6204. - Cron/Heartbeat: canonicalize session-scoped reminder `sessionKey` routing and preserve explicit flat `sessionKey` cron tool inputs, preventing enqueue/wake namespace drift for session-targeted reminders. (#18637) Thanks @vignesh07. - Cron/Webhooks: reuse existing session IDs for webhook/cron runs when the session key is stable and still fresh, preserving conversation history. (#18031) Thanks @Operative-001. - Cron: prevent spin loops when cron jobs complete within the scheduled second by advancing the next run and enforcing a minimum refire gap. (#18073) Thanks @widingmarcus-cyber. @@ -92,7 +95,7 @@ Docs: https://docs.openclaw.ai - CLI/Message: preserve `--components` JSON payloads in `openclaw message send` so Discord component payloads are no longer dropped. (#18222) Thanks @saurabhchopade. - Voice Call: add an optional stale call reaper (`staleCallReaperSeconds`) to end stuck calls when enabled. (#18437) - Auto-reply/Subagents: propagate group context (`groupId`, `groupChannel`, `space`) when spawning via `/subagents spawn`, matching tool-triggered subagent spawn behavior. -- Subagents: route nested announce results back to the parent session after the parent run ends, falling back only when the parent session is deleted. (#18043) +- Subagents: route nested announce results back to the parent session after the parent run ends, falling back only when the parent session is deleted. (#18043) Thanks @tyler6204. - Subagents: cap announce retry loops with max attempts and expiry to prevent infinite retry spam after deferred announces. (#18444) - Agents/Tools/exec: add a preflight guard that detects likely shell env var injection (e.g. `$DM_JSON`, `$TMPDIR`) in Python/Node scripts before execution, preventing recurring cron failures and wasted tokens when models emit mixed shell+language source. (#12836) - Agents/Tools/exec: treat normal non-zero exit codes as completed and append the exit code to tool output to avoid false tool-failure warnings. (#18425) diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index e977ed8302b..63ac6f83fd0 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -237,7 +237,7 @@ PAYLOAD TYPES (payload.kind): - "systemEvent": Injects text as system event into session { "kind": "systemEvent", "text": "" } - "agentTurn": Runs agent with message (isolated sessions only) - { "kind": "agentTurn", "message": "", "model": "", "thinking": "", "timeoutSeconds": } + { "kind": "agentTurn", "message": "", "model": "", "thinking": "", "timeoutSeconds": } DELIVERY (top-level): { "mode": "none|announce|webhook", "channel": "", "to": "", "bestEffort": } diff --git a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.e2e.test.ts b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.e2e.test.ts index 17bcfc11ec2..edb1599e494 100644 --- a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.e2e.test.ts +++ b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.e2e.test.ts @@ -93,7 +93,7 @@ async function expectBestEffortTelegramNotDelivered( }); } -async function expectExplicitTelegramTargetDelivery(params: { +async function expectExplicitTelegramTargetAnnounce(params: { payloads: Array>; expectedText: string; }): Promise { @@ -110,11 +110,17 @@ async function expectExplicitTelegramTargetDelivery(params: { expect(res.status).toBe("ok"); expect(res.delivered).toBe(true); - expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); - expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); - const [to, text] = vi.mocked(deps.sendMessageTelegram).mock.calls[0] ?? []; - expect(to).toBe("123"); - expect(text).toBe(params.expectedText); + expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); + const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as + | { + requesterOrigin?: { channel?: string; to?: string }; + roundOneReply?: string; + } + | undefined; + expect(announceArgs?.requesterOrigin?.channel).toBe("telegram"); + expect(announceArgs?.requesterOrigin?.to).toBe("123"); + expect(announceArgs?.roundOneReply).toBe(params.expectedText); + expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); }); } @@ -123,20 +129,61 @@ describe("runCronIsolatedAgentTurn", () => { setupIsolatedAgentTurnMocks(); }); - it("delivers directly when delivery has an explicit target", async () => { - await expectExplicitTelegramTargetDelivery({ + it("routes text-only explicit target delivery through announce flow", async () => { + await expectExplicitTelegramTargetAnnounce({ payloads: [{ text: "hello from cron" }], expectedText: "hello from cron", }); }); - it("delivers the final payload text when delivery has an explicit target", async () => { - await expectExplicitTelegramTargetDelivery({ + it("announces the final payload text when delivery has an explicit target", async () => { + await expectExplicitTelegramTargetAnnounce({ payloads: [{ text: "Working on it..." }, { text: "Final weather summary" }], expectedText: "Final weather summary", }); }); + it("routes announce injection to the delivery-target session key", async () => { + await withTempCronHome(async (home) => { + const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); + const deps = createCliDeps(); + mockAgentPayloads([{ text: "hello from cron" }]); + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath, { + session: { + store: storePath, + mainKey: "main", + dmScope: "per-channel-peer", + }, + channels: { + telegram: { botToken: "t-1" }, + }, + }), + deps, + job: { + ...makeJob({ kind: "agentTurn", message: "do it" }), + delivery: { mode: "announce", channel: "telegram", to: "123" }, + }, + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(res.status).toBe("ok"); + expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); + const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as + | { + requesterSessionKey?: string; + requesterOrigin?: { channel?: string; to?: string }; + } + | undefined; + expect(announceArgs?.requesterSessionKey).toBe("agent:main:telegram:direct:123"); + expect(announceArgs?.requesterOrigin?.channel).toBe("telegram"); + expect(announceArgs?.requesterOrigin?.to).toBe("123"); + }); + }); + it("passes resolved threadId into shared subagent announce flow", async () => { await withTempCronHome(async (home) => { const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); @@ -225,13 +272,13 @@ describe("runCronIsolatedAgentTurn", () => { }); }); - it("fails when direct delivery fails and best-effort is disabled", async () => { + it("fails when structured direct delivery fails and best-effort is disabled", async () => { await withTempCronHome(async (home) => { const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); const deps = createCliDeps({ sendMessageTelegram: vi.fn().mockRejectedValue(new Error("boom")), }); - mockAgentPayloads([{ text: "hello from cron" }]); + mockAgentPayloads([{ text: "hello from cron", mediaUrl: "https://example.com/img.png" }]); const res = await runTelegramAnnounceTurn({ home, storePath, @@ -246,7 +293,10 @@ describe("runCronIsolatedAgentTurn", () => { }); }); - it("ignores direct delivery failures when best-effort is enabled", async () => { - await expectBestEffortTelegramNotDelivered({ text: "hello from cron" }); + it("ignores structured direct delivery failures when best-effort is enabled", async () => { + await expectBestEffortTelegramNotDelivered({ + text: "hello from cron", + mediaUrl: "https://example.com/img.png", + }); }); }); diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index e775ce3df96..fdcdc43564f 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -44,6 +44,7 @@ import type { AgentDefaultsConfig } from "../../config/types.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js"; +import { resolveOutboundSessionRoute } from "../../infra/outbound/outbound-session.js"; import { logWarn } from "../../logger.js"; import { buildAgentMainSessionKey, normalizeAgentId } from "../../routing/session-key.js"; import { @@ -100,6 +101,40 @@ function resolveCronDeliveryBestEffort(job: CronJob): boolean { return false; } +async function resolveCronAnnounceSessionKey(params: { + cfg: OpenClawConfig; + agentId: string; + fallbackSessionKey: string; + delivery: { + channel: Parameters[0]["channel"]; + to?: string; + accountId?: string; + threadId?: string | number; + }; +}): Promise { + const to = params.delivery.to?.trim(); + if (!to) { + return params.fallbackSessionKey; + } + try { + const route = await resolveOutboundSessionRoute({ + cfg: params.cfg, + channel: params.delivery.channel, + agentId: params.agentId, + accountId: params.delivery.accountId, + target: to, + threadId: params.delivery.threadId, + }); + const resolved = route?.sessionKey?.trim(); + if (resolved) { + return resolved; + } + } catch { + // Fall back to main session routing if announce session resolution fails. + } + return params.fallbackSessionKey; +} + export type RunCronAgentTurnResult = { /** Last non-empty agent text output (not truncated). */ outputText?: string; @@ -584,15 +619,14 @@ export async function runCronIsolatedAgentTurn(params: { } const identity = resolveAgentOutboundIdentity(cfgWithAgentDefaults, agentId); - // Shared subagent announce flow is text-based and prompts the main agent to - // summarize. When we have an explicit delivery target (delivery.to), sender - // identity, or structured content, prefer direct outbound delivery to send - // the actual cron output without summarization. - const hasExplicitDeliveryTarget = Boolean(deliveryPlan.to); - if (deliveryPayloadHasStructuredContent || identity || hasExplicitDeliveryTarget) { + // Route text-only cron announce output back through the main session so it + // follows the same system-message injection path as subagent completions. + // Keep direct outbound delivery only for structured payloads (media/channel + // data), which cannot be represented by the shared announce flow. + if (deliveryPayloadHasStructuredContent) { try { const payloadsForDelivery = - deliveryPayloadHasStructuredContent && deliveryPayloads.length > 0 + deliveryPayloads.length > 0 ? deliveryPayloads : synthesizedText ? [{ text: synthesizedText }] @@ -624,10 +658,21 @@ export async function runCronIsolatedAgentTurn(params: { } } } else if (synthesizedText) { - const announceSessionKey = resolveAgentMainSessionKey({ + const announceMainSessionKey = resolveAgentMainSessionKey({ cfg: params.cfg, agentId, }); + const announceSessionKey = await resolveCronAnnounceSessionKey({ + cfg: cfgWithAgentDefaults, + agentId, + fallbackSessionKey: announceMainSessionKey, + delivery: { + channel: resolvedDelivery.channel, + to: resolvedDelivery.to, + accountId: resolvedDelivery.accountId, + threadId: resolvedDelivery.threadId, + }, + }); const taskLabel = typeof params.job.name === "string" && params.job.name.trim() ? params.job.name.trim() diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index c07066fd828..c381faddee9 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -300,6 +300,18 @@ describe("normalizeCronJobCreate", () => { expect(payload.allowUnsafeExternalContent).toBe(true); }); + it("preserves timeoutSeconds=0 for no-timeout agentTurn payloads", () => { + const normalized = normalizeCronJobCreate({ + name: "legacy no-timeout", + schedule: { kind: "every", everyMs: 60_000 }, + payload: { kind: "agentTurn", message: "hello" }, + timeoutSeconds: 0, + }) as unknown as Record; + + const payload = normalized.payload as Record; + expect(payload.timeoutSeconds).toBe(0); + }); + it("coerces sessionTarget and wakeMode casing", () => { const normalized = normalizeCronJobCreate({ name: "casing", diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index 31cce630ef7..3d4c533efa7 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -131,7 +131,7 @@ function coercePayload(payload: UnknownRecord) { } if ("timeoutSeconds" in next) { if (typeof next.timeoutSeconds === "number" && Number.isFinite(next.timeoutSeconds)) { - next.timeoutSeconds = Math.max(1, Math.floor(next.timeoutSeconds)); + next.timeoutSeconds = Math.max(0, Math.floor(next.timeoutSeconds)); } else { delete next.timeoutSeconds; } diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index bbd64f4c728..00af22e9751 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -642,6 +642,62 @@ describe("Cron issue regressions", () => { expect(job!.state.nextRunAtMs).toBeGreaterThanOrEqual(minNext); }); + it("treats timeoutSeconds=0 as no timeout for isolated agentTurn jobs", async () => { + const store = await makeStorePath(); + const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z"); + + const cronJob: CronJob = { + id: "no-timeout-0", + name: "no-timeout", + enabled: true, + createdAtMs: scheduledAt - 86_400_000, + updatedAtMs: scheduledAt - 86_400_000, + schedule: { kind: "at", at: new Date(scheduledAt).toISOString() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "work", timeoutSeconds: 0 }, + delivery: { mode: "announce" }, + state: { nextRunAtMs: scheduledAt }, + }; + await fs.writeFile( + store.storePath, + JSON.stringify({ version: 1, jobs: [cronJob] }, null, 2), + "utf-8", + ); + + let now = scheduledAt; + const deferredRun = createDeferred<{ status: "ok"; summary: string }>(); + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => { + const result = await deferredRun.promise; + now += 5; + return result; + }), + }); + + const timerPromise = onTimer(state); + let settled = false; + void timerPromise.finally(() => { + settled = true; + }); + + await vi.advanceTimersByTimeAsync(0); + await Promise.resolve(); + expect(settled).toBe(false); + + deferredRun.resolve({ status: "ok", summary: "done" }); + await timerPromise; + + const job = state.store?.jobs.find((j) => j.id === "no-timeout-0"); + expect(job?.state.lastStatus).toBe("ok"); + }); + it("retries cron schedule computation from the next second when the first attempt returns undefined (#17821)", () => { const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z"); const cronJob: CronJob = { diff --git a/src/cron/service.store-migration.test.ts b/src/cron/service.store-migration.test.ts index 4edbd0f01b2..6c7170060ef 100644 --- a/src/cron/service.store-migration.test.ts +++ b/src/cron/service.store-migration.test.ts @@ -102,4 +102,56 @@ describe("CronService store migrations", () => { cron.stop(); await store.cleanup(); }); + + it("preserves legacy timeoutSeconds=0 during top-level agentTurn field migration", async () => { + const store = await makeStorePath(); + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile( + store.storePath, + JSON.stringify( + { + version: 1, + jobs: [ + { + id: "legacy-agentturn-no-timeout", + name: "legacy no-timeout", + enabled: true, + createdAtMs: Date.parse("2026-02-01T12:00:00.000Z"), + updatedAtMs: Date.parse("2026-02-05T12:00:00.000Z"), + schedule: { kind: "cron", expr: "0 23 * * *", tz: "UTC" }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + timeoutSeconds: 0, + payload: { kind: "agentTurn", message: "legacy payload fields" }, + }, + ], + }, + null, + 2, + ), + "utf-8", + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const, summary: "ok" })), + }); + + await cron.start(); + + const jobs = await cron.list({ includeDisabled: true }); + const job = jobs.find((entry) => entry.id === "legacy-agentturn-no-timeout"); + expect(job).toBeDefined(); + expect(job?.payload.kind).toBe("agentTurn"); + if (job?.payload.kind === "agentTurn") { + expect(job.payload.timeoutSeconds).toBe(0); + } + + cron.stop(); + await store.cleanup(); + }); }); diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index 5335d86f595..b29dd517197 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -127,7 +127,7 @@ function copyTopLevelAgentTurnFields( typeof raw.timeoutSeconds === "number" && Number.isFinite(raw.timeoutSeconds) ) { - payload.timeoutSeconds = Math.max(1, Math.floor(raw.timeoutSeconds)); + payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds)); mutated = true; } diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 9240735d238..18fda9aa78e 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -243,26 +243,43 @@ export async function onTimer(state: CronServiceState) { job.state.runningAtMs = startedAt; emit(state, { jobId: job.id, action: "started", runAtMs: startedAt }); - const jobTimeoutMs = + const configuredTimeoutMs = job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number" - ? job.payload.timeoutSeconds * 1_000 + ? Math.floor(job.payload.timeoutSeconds * 1_000) + : undefined; + const jobTimeoutMs = + configuredTimeoutMs !== undefined + ? configuredTimeoutMs <= 0 + ? undefined + : configuredTimeoutMs : DEFAULT_JOB_TIMEOUT_MS; try { - let timeoutId: NodeJS.Timeout; - const result = await Promise.race([ - executeJobCore(state, job), - new Promise((_, reject) => { - timeoutId = setTimeout( - () => reject(new Error("cron: job execution timed out")), - jobTimeoutMs, - ); - }), - ]).finally(() => clearTimeout(timeoutId!)); + const result = + typeof jobTimeoutMs === "number" + ? await (async () => { + let timeoutId: NodeJS.Timeout | undefined; + try { + return await Promise.race([ + executeJobCore(state, job), + new Promise((_, reject) => { + timeoutId = setTimeout( + () => reject(new Error("cron: job execution timed out")), + jobTimeoutMs, + ); + }), + ]); + } finally { + if (timeoutId) { + clearTimeout(timeoutId); + } + } + })() + : await executeJobCore(state, job); results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() }); } catch (err) { state.deps.log.warn( - { jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs }, + { jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? null }, `cron: job failed: ${String(err)}`, ); results.push({ diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index 0ed3d3de230..a45e3403a47 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -8,7 +8,7 @@ function cronAgentTurnPayloadSchema(params: { message: TSchema }) { message: params.message, model: Type.Optional(Type.String()), thinking: Type.Optional(Type.String()), - timeoutSeconds: Type.Optional(Type.Integer({ minimum: 1 })), + timeoutSeconds: Type.Optional(Type.Integer({ minimum: 0 })), allowUnsafeExternalContent: Type.Optional(Type.Boolean()), deliver: Type.Optional(Type.Boolean()), channel: Type.Optional(Type.String()), diff --git a/src/gateway/server.cron.e2e.test.ts b/src/gateway/server.cron.e2e.test.ts index 49af77648c7..a307dd2d470 100644 --- a/src/gateway/server.cron.e2e.test.ts +++ b/src/gateway/server.cron.e2e.test.ts @@ -180,6 +180,26 @@ describe("gateway server cron", () => { const mergeJobId = typeof mergeJobIdValue === "string" ? mergeJobIdValue : ""; expect(mergeJobId.length > 0).toBe(true); + const noTimeoutRes = await rpcReq(ws, "cron.add", { + name: "no-timeout payload", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "hello", timeoutSeconds: 0 }, + }); + expect(noTimeoutRes.ok).toBe(true); + const noTimeoutPayload = noTimeoutRes.payload as + | { + payload?: { + kind?: unknown; + timeoutSeconds?: unknown; + }; + } + | undefined; + expect(noTimeoutPayload?.payload?.kind).toBe("agentTurn"); + expect(noTimeoutPayload?.payload?.timeoutSeconds).toBe(0); + const mergeUpdateRes = await rpcReq(ws, "cron.update", { id: mergeJobId, patch: {