mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-07 19:51:22 +00:00
fix: harden isolated cron announce delivery fallback (#15739) (thanks @widingmarcus-cyber)
This commit is contained in:
@@ -105,6 +105,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- Cron: pass `agentId` to `runHeartbeatOnce` for main-session jobs. (#14140) Thanks @ishikawa-pro.
|
- Cron: pass `agentId` to `runHeartbeatOnce` for main-session jobs. (#14140) Thanks @ishikawa-pro.
|
||||||
- Cron: re-arm timers when `onTimer` fires while a job is still executing. (#14233) Thanks @tomron87.
|
- Cron: re-arm timers when `onTimer` fires while a job is still executing. (#14233) Thanks @tomron87.
|
||||||
- Cron: prevent duplicate fires when multiple jobs trigger simultaneously. (#14256) Thanks @xinhuagu.
|
- Cron: prevent duplicate fires when multiple jobs trigger simultaneously. (#14256) Thanks @xinhuagu.
|
||||||
|
- Cron: prevent duplicate announce-mode isolated cron deliveries, and keep main-session fallback active when best-effort structured delivery attempts fail to send any message. (#15739) Thanks @widingmarcus-cyber.
|
||||||
- Cron: isolate scheduler errors so one bad job does not break all jobs. (#14385) Thanks @MarvinDontPanic.
|
- Cron: isolate scheduler errors so one bad job does not break all jobs. (#14385) Thanks @MarvinDontPanic.
|
||||||
- Cron: prevent one-shot `at` jobs from re-firing on restart after skipped/errored runs. (#13878) Thanks @lailoo.
|
- Cron: prevent one-shot `at` jobs from re-firing on restart after skipped/errored runs. (#13878) Thanks @lailoo.
|
||||||
- Heartbeat: prevent scheduler stalls on unexpected run errors and avoid immediate rerun loops after `requests-in-flight` skips. (#14901) Thanks @joeykrug.
|
- Heartbeat: prevent scheduler stalls on unexpected run errors and avoid immediate rerun loops after `requests-in-flight` skips. (#14901) Thanks @joeykrug.
|
||||||
|
|||||||
@@ -135,6 +135,7 @@ describe("runCronIsolatedAgentTurn", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
expect(res.status).toBe("ok");
|
expect(res.status).toBe("ok");
|
||||||
|
expect(res.delivered).toBe(true);
|
||||||
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
|
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
|
||||||
const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as
|
const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as
|
||||||
| { announceType?: string }
|
| { announceType?: string }
|
||||||
@@ -280,11 +281,56 @@ describe("runCronIsolatedAgentTurn", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
expect(res.status).toBe("ok");
|
expect(res.status).toBe("ok");
|
||||||
|
expect(res.delivered).toBe(true);
|
||||||
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
|
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
|
||||||
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
|
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("reports not-delivered when best-effort structured outbound sends all fail", async () => {
|
||||||
|
await withTempHome(async (home) => {
|
||||||
|
const storePath = await writeSessionStore(home);
|
||||||
|
const deps: CliDeps = {
|
||||||
|
sendMessageWhatsApp: vi.fn(),
|
||||||
|
sendMessageTelegram: vi.fn().mockRejectedValue(new Error("boom")),
|
||||||
|
sendMessageDiscord: vi.fn(),
|
||||||
|
sendMessageSignal: vi.fn(),
|
||||||
|
sendMessageIMessage: vi.fn(),
|
||||||
|
};
|
||||||
|
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
|
||||||
|
payloads: [{ text: "caption", mediaUrl: "https://example.com/img.png" }],
|
||||||
|
meta: {
|
||||||
|
durationMs: 5,
|
||||||
|
agentMeta: { sessionId: "s", provider: "p", model: "m" },
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const res = await runCronIsolatedAgentTurn({
|
||||||
|
cfg: makeCfg(home, storePath, {
|
||||||
|
channels: { telegram: { botToken: "t-1" } },
|
||||||
|
}),
|
||||||
|
deps,
|
||||||
|
job: {
|
||||||
|
...makeJob({ kind: "agentTurn", message: "do it" }),
|
||||||
|
delivery: {
|
||||||
|
mode: "announce",
|
||||||
|
channel: "telegram",
|
||||||
|
to: "123",
|
||||||
|
bestEffort: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
message: "do it",
|
||||||
|
sessionKey: "cron:job-1",
|
||||||
|
lane: "cron",
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(res.status).toBe("ok");
|
||||||
|
expect(res.delivered).toBe(false);
|
||||||
|
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
|
||||||
|
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it("skips announce for heartbeat-only output", async () => {
|
it("skips announce for heartbeat-only output", async () => {
|
||||||
await withTempHome(async (home) => {
|
await withTempHome(async (home) => {
|
||||||
const storePath = await writeSessionStore(home);
|
const storePath = await writeSessionStore(home);
|
||||||
|
|||||||
@@ -103,8 +103,9 @@ export type RunCronAgentTurnResult = {
|
|||||||
sessionKey?: string;
|
sessionKey?: string;
|
||||||
/**
|
/**
|
||||||
* `true` when the isolated run already delivered its output to the target
|
* `true` when the isolated run already delivered its output to the target
|
||||||
* channel (via outbound payloads or the subagent announce flow). Callers
|
* channel (via outbound payloads, the subagent announce flow, or a matching
|
||||||
* should skip posting a summary to the main session to avoid duplicate
|
* messaging-tool send). Callers should skip posting a summary to the main
|
||||||
|
* session to avoid duplicate
|
||||||
* messages. See: https://github.com/openclaw/openclaw/issues/15692
|
* messages. See: https://github.com/openclaw/openclaw/issues/15692
|
||||||
*/
|
*/
|
||||||
delivered?: boolean;
|
delivered?: boolean;
|
||||||
@@ -525,7 +526,9 @@ export async function runCronIsolatedAgentTurn(params: {
|
|||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
let delivered = false;
|
// `true` means we confirmed at least one outbound send reached the target.
|
||||||
|
// Keep this strict so timer fallback can safely decide whether to wake main.
|
||||||
|
let delivered = skipMessagingToolDelivery;
|
||||||
if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) {
|
if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) {
|
||||||
if (resolvedDelivery.error) {
|
if (resolvedDelivery.error) {
|
||||||
if (!deliveryBestEffort) {
|
if (!deliveryBestEffort) {
|
||||||
@@ -556,7 +559,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
|||||||
// for media/channel payloads so structured content is preserved.
|
// for media/channel payloads so structured content is preserved.
|
||||||
if (deliveryPayloadHasStructuredContent) {
|
if (deliveryPayloadHasStructuredContent) {
|
||||||
try {
|
try {
|
||||||
await deliverOutboundPayloads({
|
const deliveryResults = await deliverOutboundPayloads({
|
||||||
cfg: cfgWithAgentDefaults,
|
cfg: cfgWithAgentDefaults,
|
||||||
channel: resolvedDelivery.channel,
|
channel: resolvedDelivery.channel,
|
||||||
to: resolvedDelivery.to,
|
to: resolvedDelivery.to,
|
||||||
@@ -566,7 +569,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
|||||||
bestEffort: deliveryBestEffort,
|
bestEffort: deliveryBestEffort,
|
||||||
deps: createOutboundSendDeps(params.deps),
|
deps: createOutboundSendDeps(params.deps),
|
||||||
});
|
});
|
||||||
delivered = true;
|
delivered = deliveryResults.length > 0;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (!deliveryBestEffort) {
|
if (!deliveryBestEffort) {
|
||||||
return withRunSession({ status: "error", summary, outputText, error: String(err) });
|
return withRunSession({ status: "error", summary, outputText, error: String(err) });
|
||||||
|
|||||||
@@ -329,6 +329,48 @@ describe("CronService", () => {
|
|||||||
await store.cleanup();
|
await store.cleanup();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("does not post isolated summary to main when run already delivered output", async () => {
|
||||||
|
const store = await makeStorePath();
|
||||||
|
const enqueueSystemEvent = vi.fn();
|
||||||
|
const requestHeartbeatNow = vi.fn();
|
||||||
|
const runIsolatedAgentJob = vi.fn(async () => ({
|
||||||
|
status: "ok" as const,
|
||||||
|
summary: "done",
|
||||||
|
delivered: true,
|
||||||
|
}));
|
||||||
|
|
||||||
|
const cron = new CronService({
|
||||||
|
storePath: store.storePath,
|
||||||
|
cronEnabled: true,
|
||||||
|
log: noopLogger,
|
||||||
|
enqueueSystemEvent,
|
||||||
|
requestHeartbeatNow,
|
||||||
|
runIsolatedAgentJob,
|
||||||
|
});
|
||||||
|
|
||||||
|
await cron.start();
|
||||||
|
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
|
||||||
|
await cron.add({
|
||||||
|
enabled: true,
|
||||||
|
name: "weekly delivered",
|
||||||
|
schedule: { kind: "at", at: new Date(atMs).toISOString() },
|
||||||
|
sessionTarget: "isolated",
|
||||||
|
wakeMode: "now",
|
||||||
|
payload: { kind: "agentTurn", message: "do it" },
|
||||||
|
delivery: { mode: "announce" },
|
||||||
|
});
|
||||||
|
|
||||||
|
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
|
||||||
|
await vi.runOnlyPendingTimersAsync();
|
||||||
|
|
||||||
|
await waitForJobs(cron, (items) => items.some((item) => item.state.lastStatus === "ok"));
|
||||||
|
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
|
||||||
|
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||||
|
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||||
|
cron.stop();
|
||||||
|
await store.cleanup();
|
||||||
|
});
|
||||||
|
|
||||||
it("migrates legacy payload.provider to payload.channel on load", async () => {
|
it("migrates legacy payload.provider to payload.channel on load", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
const enqueueSystemEvent = vi.fn();
|
const enqueueSystemEvent = vi.fn();
|
||||||
|
|||||||
@@ -48,7 +48,8 @@ export type CronServiceDeps = {
|
|||||||
sessionKey?: string;
|
sessionKey?: string;
|
||||||
/**
|
/**
|
||||||
* `true` when the isolated run already delivered its output to the target
|
* `true` when the isolated run already delivered its output to the target
|
||||||
* channel. See: https://github.com/openclaw/openclaw/issues/15692
|
* channel (including matching messaging-tool sends). See:
|
||||||
|
* https://github.com/openclaw/openclaw/issues/15692
|
||||||
*/
|
*/
|
||||||
delivered?: boolean;
|
delivered?: boolean;
|
||||||
}>;
|
}>;
|
||||||
|
|||||||
Reference in New Issue
Block a user