mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 03:21:23 +00:00
fix(delivery): quarantine permanent recovery failures
Co-authored-by: Aldo <17973757+aldoeliacim@users.noreply.github.com>
This commit is contained in:
@@ -61,6 +61,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- Cron/Service: execute manual `cron.run` jobs outside the cron lock (while still persisting started/finished state atomically) so `cron.list` and `cron.status` remain responsive during long forced runs. (#23628) Thanks @dsgraves.
|
- Cron/Service: execute manual `cron.run` jobs outside the cron lock (while still persisting started/finished state atomically) so `cron.list` and `cron.status` remain responsive during long forced runs. (#23628) Thanks @dsgraves.
|
||||||
- Cron/Timer: keep a watchdog recheck timer armed while `onTimer` is actively executing so the scheduler continues polling even if a due-run tick stalls for an extended period. (#23628) Thanks @dsgraves.
|
- Cron/Timer: keep a watchdog recheck timer armed while `onTimer` is actively executing so the scheduler continues polling even if a due-run tick stalls for an extended period. (#23628) Thanks @dsgraves.
|
||||||
- Cron/Run: enforce the same per-job timeout guard for manual `cron.run` executions as timer-driven runs, including abort propagation for isolated agent jobs, so forced runs cannot wedge indefinitely. (#23704) Thanks @tkuehnl.
|
- Cron/Run: enforce the same per-job timeout guard for manual `cron.run` executions as timer-driven runs, including abort propagation for isolated agent jobs, so forced runs cannot wedge indefinitely. (#23704) Thanks @tkuehnl.
|
||||||
|
- Delivery/Queue: quarantine queue entries immediately on known permanent delivery errors (for example invalid recipients or missing conversation references) by moving them to `failed/` instead of retrying on every restart. (#23794) Thanks @aldoeliacim.
|
||||||
- Cron/Status: split execution outcome (`lastRunStatus`) from delivery outcome (`lastDeliveryStatus`) in persisted cron state, finished events, and run history so failed/unknown announcement delivery is visible without conflating it with run errors.
|
- Cron/Status: split execution outcome (`lastRunStatus`) from delivery outcome (`lastDeliveryStatus`) in persisted cron state, finished events, and run history so failed/unknown announcement delivery is visible without conflating it with run errors.
|
||||||
- Cron/Schedule: for `every` jobs, prefer `lastRunAtMs + everyMs` when still in the future after restarts, then fall back to anchor scheduling for catch-up windows, so NEXT timing matches the last successful cadence. (#22895) Thanks @SidQin-cyber.
|
- Cron/Schedule: for `every` jobs, prefer `lastRunAtMs + everyMs` when still in the future after restarts, then fall back to anchor scheduling for catch-up windows, so NEXT timing matches the last successful cadence. (#22895) Thanks @SidQin-cyber.
|
||||||
- Agents/Compaction: restore embedded compaction safeguard/context-pruning extension loading in production by wiring bundled extension factories into the resource loader instead of runtime file-path resolution. (#22349) Thanks @Glucksberg.
|
- Agents/Compaction: restore embedded compaction safeguard/context-pruning extension loading in production by wiring bundled extension factories into the resource loader instead of runtime file-path resolution. (#22349) Thanks @Glucksberg.
|
||||||
|
|||||||
@@ -282,19 +282,24 @@ export async function recoverPendingDeliveries(opts: {
|
|||||||
recovered += 1;
|
recovered += 1;
|
||||||
opts.log.info(`Recovered delivery ${entry.id} to ${entry.channel}:${entry.to}`);
|
opts.log.info(`Recovered delivery ${entry.id} to ${entry.channel}:${entry.to}`);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
const errMsg = err instanceof Error ? err.message : String(err);
|
||||||
|
if (isPermanentDeliveryError(errMsg)) {
|
||||||
|
opts.log.warn(`Delivery ${entry.id} hit permanent error — moving to failed/: ${errMsg}`);
|
||||||
|
try {
|
||||||
|
await moveToFailed(entry.id, opts.stateDir);
|
||||||
|
} catch (moveErr) {
|
||||||
|
opts.log.error(`Failed to move entry ${entry.id} to failed/: ${String(moveErr)}`);
|
||||||
|
}
|
||||||
|
failed += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
await failDelivery(
|
await failDelivery(entry.id, errMsg, opts.stateDir);
|
||||||
entry.id,
|
|
||||||
err instanceof Error ? err.message : String(err),
|
|
||||||
opts.stateDir,
|
|
||||||
);
|
|
||||||
} catch {
|
} catch {
|
||||||
// Best-effort update.
|
// Best-effort update.
|
||||||
}
|
}
|
||||||
failed += 1;
|
failed += 1;
|
||||||
opts.log.warn(
|
opts.log.warn(`Retry failed for delivery ${entry.id}: ${errMsg}`);
|
||||||
`Retry failed for delivery ${entry.id}: ${err instanceof Error ? err.message : String(err)}`,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -305,3 +310,18 @@ export async function recoverPendingDeliveries(opts: {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export { MAX_RETRIES };
|
export { MAX_RETRIES };
|
||||||
|
|
||||||
|
const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [
|
||||||
|
/no conversation reference found/i,
|
||||||
|
/chat not found/i,
|
||||||
|
/user not found/i,
|
||||||
|
/bot was blocked by the user/i,
|
||||||
|
/forbidden: bot was kicked/i,
|
||||||
|
/chat_id is empty/i,
|
||||||
|
/recipient is not a valid/i,
|
||||||
|
/outbound not configured for channel/i,
|
||||||
|
];
|
||||||
|
|
||||||
|
export function isPermanentDeliveryError(error: string): boolean {
|
||||||
|
return PERMANENT_ERROR_PATTERNS.some((re) => re.test(error));
|
||||||
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import {
|
|||||||
type DeliverFn,
|
type DeliverFn,
|
||||||
enqueueDelivery,
|
enqueueDelivery,
|
||||||
failDelivery,
|
failDelivery,
|
||||||
|
isPermanentDeliveryError,
|
||||||
loadPendingDeliveries,
|
loadPendingDeliveries,
|
||||||
MAX_RETRIES,
|
MAX_RETRIES,
|
||||||
moveToFailed,
|
moveToFailed,
|
||||||
@@ -142,6 +143,30 @@ describe("delivery-queue", () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("isPermanentDeliveryError", () => {
|
||||||
|
it.each([
|
||||||
|
"No conversation reference found for user:abc",
|
||||||
|
"Telegram send failed: chat not found (chat_id=user:123)",
|
||||||
|
"user not found",
|
||||||
|
"Bot was blocked by the user",
|
||||||
|
"Forbidden: bot was kicked from the group chat",
|
||||||
|
"chat_id is empty",
|
||||||
|
"Outbound not configured for channel: msteams",
|
||||||
|
])("returns true for permanent error: %s", (msg) => {
|
||||||
|
expect(isPermanentDeliveryError(msg)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it.each([
|
||||||
|
"network down",
|
||||||
|
"ETIMEDOUT",
|
||||||
|
"socket hang up",
|
||||||
|
"rate limited",
|
||||||
|
"500 Internal Server Error",
|
||||||
|
])("returns false for transient error: %s", (msg) => {
|
||||||
|
expect(isPermanentDeliveryError(msg)).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe("loadPendingDeliveries", () => {
|
describe("loadPendingDeliveries", () => {
|
||||||
it("returns empty array when queue directory does not exist", async () => {
|
it("returns empty array when queue directory does not exist", async () => {
|
||||||
const nonexistent = path.join(tmpDir, "no-such-dir");
|
const nonexistent = path.join(tmpDir, "no-such-dir");
|
||||||
@@ -265,6 +290,26 @@ describe("delivery-queue", () => {
|
|||||||
expect(entries[0].lastError).toBe("network down");
|
expect(entries[0].lastError).toBe("network down");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("moves entries to failed/ immediately on permanent delivery errors", async () => {
|
||||||
|
const id = await enqueueDelivery(
|
||||||
|
{ channel: "msteams", to: "user:abc", payloads: [{ text: "hi" }] },
|
||||||
|
tmpDir,
|
||||||
|
);
|
||||||
|
const deliver = vi
|
||||||
|
.fn()
|
||||||
|
.mockRejectedValue(new Error("No conversation reference found for user:abc"));
|
||||||
|
const log = createLog();
|
||||||
|
const { result } = await runRecovery({ deliver, log });
|
||||||
|
|
||||||
|
expect(result.failed).toBe(1);
|
||||||
|
expect(result.recovered).toBe(0);
|
||||||
|
const remaining = await loadPendingDeliveries(tmpDir);
|
||||||
|
expect(remaining).toHaveLength(0);
|
||||||
|
const failedDir = path.join(tmpDir, "delivery-queue", "failed");
|
||||||
|
expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true);
|
||||||
|
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("permanent error"));
|
||||||
|
});
|
||||||
|
|
||||||
it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => {
|
it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => {
|
||||||
await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
|
await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user