mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-19 04:37:27 +00:00
fix: harden delivery recovery backoff eligibility and tests (#27710) (thanks @Jimmy-xuzimo)
This commit is contained in:
@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
|
|
||||||
|
- Delivery queue/recovery backoff: prevent retry starvation by persisting `lastAttemptAt` on failed sends and deferring each entry's recovery retry until its `lastAttemptAt + backoff` time has passed, while still recovering ready entries queued behind deferred ones. Landed from contributor PR #27710 by @Jimmy-xuzimo. Thanks @Jimmy-xuzimo.
|
||||||
- Microsoft Teams/File uploads: acknowledge `fileConsent/invoke` immediately (`invokeResponse` before upload + file card send) so Teams no longer shows false "Something went wrong" timeout banners while upload completion continues asynchronously; includes updated async regression coverage. Landed from contributor PR #27641 by @scz2011.
|
- Microsoft Teams/File uploads: acknowledge `fileConsent/invoke` immediately (`invokeResponse` before upload + file card send) so Teams no longer shows false "Something went wrong" timeout banners while upload completion continues asynchronously; includes updated async regression coverage. Landed from contributor PR #27641 by @scz2011.
|
||||||
- Queue/Drain/Cron reliability: harden lane draining with guaranteed `draining` flag reset on synchronous pump failures, reject new queue enqueues during gateway restart drain windows (instead of silently killing accepted tasks), add `/stop` queued-backlog cutoff metadata with stale-message skipping (while avoiding cross-session native-stop cutoff bleed), and raise isolated cron `agentTurn` outer safety timeout to avoid false 10-minute timeout races against longer agent session timeouts. (#27407, #27332, #27427)
|
- Queue/Drain/Cron reliability: harden lane draining with guaranteed `draining` flag reset on synchronous pump failures, reject new queue enqueues during gateway restart drain windows (instead of silently killing accepted tasks), add `/stop` queued-backlog cutoff metadata with stale-message skipping (while avoiding cross-session native-stop cutoff bleed), and raise isolated cron `agentTurn` outer safety timeout to avoid false 10-minute timeout races against longer agent session timeouts. (#27407, #27332, #27427)
|
||||||
- Typing/Main reply pipeline: always mark dispatch idle in `agent-runner` finalization so typing cleanup runs even when dispatcher `onIdle` does not fire, preventing stuck typing indicators after run completion. (#27250) Thanks @Sid-Qin.
|
- Typing/Main reply pipeline: always mark dispatch idle in `agent-runner` finalization so typing cleanup runs even when dispatcher `onIdle` does not fire, preventing stuck typing indicators after run completion. (#27250) Thanks @Sid-Qin.
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ export interface QueuedDelivery extends QueuedDeliveryPayload {
|
|||||||
id: string;
|
id: string;
|
||||||
enqueuedAt: number;
|
enqueuedAt: number;
|
||||||
retryCount: number;
|
retryCount: number;
|
||||||
|
lastAttemptAt?: number;
|
||||||
lastError?: string;
|
lastError?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -122,6 +123,7 @@ export async function failDelivery(id: string, error: string, stateDir?: string)
|
|||||||
const raw = await fs.promises.readFile(filePath, "utf-8");
|
const raw = await fs.promises.readFile(filePath, "utf-8");
|
||||||
const entry: QueuedDelivery = JSON.parse(raw);
|
const entry: QueuedDelivery = JSON.parse(raw);
|
||||||
entry.retryCount += 1;
|
entry.retryCount += 1;
|
||||||
|
entry.lastAttemptAt = Date.now();
|
||||||
entry.lastError = error;
|
entry.lastError = error;
|
||||||
const tmp = `${filePath}.${process.pid}.tmp`;
|
const tmp = `${filePath}.${process.pid}.tmp`;
|
||||||
await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), {
|
await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), {
|
||||||
@@ -208,8 +210,6 @@ export async function recoverPendingDeliveries(opts: {
|
|||||||
log: RecoveryLogger;
|
log: RecoveryLogger;
|
||||||
cfg: OpenClawConfig;
|
cfg: OpenClawConfig;
|
||||||
stateDir?: string;
|
stateDir?: string;
|
||||||
/** Override for testing — resolves instead of using real setTimeout. */
|
|
||||||
delay?: (ms: number) => Promise<void>;
|
|
||||||
/** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next restart. Default: 60 000. */
|
/** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next restart. Default: 60 000. */
|
||||||
maxRecoveryMs?: number;
|
maxRecoveryMs?: number;
|
||||||
}): Promise<{ recovered: number; failed: number; skipped: number }> {
|
}): Promise<{ recovered: number; failed: number; skipped: number }> {
|
||||||
@@ -223,12 +223,12 @@ export async function recoverPendingDeliveries(opts: {
|
|||||||
|
|
||||||
opts.log.info(`Found ${pending.length} pending delivery entries — starting recovery`);
|
opts.log.info(`Found ${pending.length} pending delivery entries — starting recovery`);
|
||||||
|
|
||||||
const delayFn = opts.delay ?? ((ms: number) => new Promise<void>((r) => setTimeout(r, ms)));
|
|
||||||
const deadline = Date.now() + (opts.maxRecoveryMs ?? 60_000);
|
const deadline = Date.now() + (opts.maxRecoveryMs ?? 60_000);
|
||||||
|
|
||||||
let recovered = 0;
|
let recovered = 0;
|
||||||
let failed = 0;
|
let failed = 0;
|
||||||
let skipped = 0;
|
let skipped = 0;
|
||||||
|
let deferred = 0;
|
||||||
|
|
||||||
for (const entry of pending) {
|
for (const entry of pending) {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
@@ -252,15 +252,18 @@ export async function recoverPendingDeliveries(opts: {
|
|||||||
|
|
||||||
const backoff = computeBackoffMs(entry.retryCount + 1);
|
const backoff = computeBackoffMs(entry.retryCount + 1);
|
||||||
if (backoff > 0) {
|
if (backoff > 0) {
|
||||||
if (now + backoff >= deadline) {
|
const firstReplayAfterCrash = entry.retryCount === 0 && entry.lastAttemptAt === undefined;
|
||||||
opts.log.info(
|
if (!firstReplayAfterCrash) {
|
||||||
`Backoff ${backoff}ms exceeds budget for ${entry.id} — skipping to next entry`,
|
const baseAttemptAt = entry.lastAttemptAt ?? entry.enqueuedAt;
|
||||||
);
|
const nextEligibleAt = baseAttemptAt + backoff;
|
||||||
skipped += 1;
|
if (now < nextEligibleAt) {
|
||||||
continue;
|
deferred += 1;
|
||||||
|
opts.log.info(
|
||||||
|
`Delivery ${entry.id} not ready for retry yet — backoff ${nextEligibleAt - now}ms remaining`,
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
opts.log.info(`Waiting ${backoff}ms before retrying delivery ${entry.id}`);
|
|
||||||
await delayFn(backoff);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -304,7 +307,7 @@ export async function recoverPendingDeliveries(opts: {
|
|||||||
}
|
}
|
||||||
|
|
||||||
opts.log.info(
|
opts.log.info(
|
||||||
`Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skipped} skipped (max retries)`,
|
`Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skipped} skipped (max retries), ${deferred} deferred (backoff)`,
|
||||||
);
|
);
|
||||||
return { recovered, failed, skipped };
|
return { recovered, failed, skipped };
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -104,7 +104,7 @@ describe("delivery-queue", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe("failDelivery", () => {
|
describe("failDelivery", () => {
|
||||||
it("increments retryCount and sets lastError", async () => {
|
it("increments retryCount, records attempt time, and sets lastError", async () => {
|
||||||
const id = await enqueueDelivery(
|
const id = await enqueueDelivery(
|
||||||
{
|
{
|
||||||
channel: "telegram",
|
channel: "telegram",
|
||||||
@@ -119,6 +119,8 @@ describe("delivery-queue", () => {
|
|||||||
const queueDir = path.join(tmpDir, "delivery-queue");
|
const queueDir = path.join(tmpDir, "delivery-queue");
|
||||||
const entry = JSON.parse(fs.readFileSync(path.join(queueDir, `${id}.json`), "utf-8"));
|
const entry = JSON.parse(fs.readFileSync(path.join(queueDir, `${id}.json`), "utf-8"));
|
||||||
expect(entry.retryCount).toBe(1);
|
expect(entry.retryCount).toBe(1);
|
||||||
|
expect(typeof entry.lastAttemptAt).toBe("number");
|
||||||
|
expect(entry.lastAttemptAt).toBeGreaterThan(0);
|
||||||
expect(entry.lastError).toBe("connection refused");
|
expect(entry.lastError).toBe("connection refused");
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@@ -204,28 +206,36 @@ describe("delivery-queue", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe("recoverPendingDeliveries", () => {
|
describe("recoverPendingDeliveries", () => {
|
||||||
const noopDelay = async () => {};
|
|
||||||
const baseCfg = {};
|
const baseCfg = {};
|
||||||
const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() });
|
const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() });
|
||||||
const enqueueCrashRecoveryEntries = async () => {
|
const enqueueCrashRecoveryEntries = async () => {
|
||||||
await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
|
await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
|
||||||
await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
|
await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
|
||||||
};
|
};
|
||||||
const setEntryRetryCount = (id: string, retryCount: number) => {
|
const setEntryState = (
|
||||||
|
id: string,
|
||||||
|
state: { retryCount: number; lastAttemptAt?: number; enqueuedAt?: number },
|
||||||
|
) => {
|
||||||
const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
|
const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
|
||||||
const entry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
|
const entry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
|
||||||
entry.retryCount = retryCount;
|
entry.retryCount = state.retryCount;
|
||||||
|
if (state.lastAttemptAt === undefined) {
|
||||||
|
delete entry.lastAttemptAt;
|
||||||
|
} else {
|
||||||
|
entry.lastAttemptAt = state.lastAttemptAt;
|
||||||
|
}
|
||||||
|
if (state.enqueuedAt !== undefined) {
|
||||||
|
entry.enqueuedAt = state.enqueuedAt;
|
||||||
|
}
|
||||||
fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8");
|
fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8");
|
||||||
};
|
};
|
||||||
const runRecovery = async ({
|
const runRecovery = async ({
|
||||||
deliver,
|
deliver,
|
||||||
log = createLog(),
|
log = createLog(),
|
||||||
delay = noopDelay,
|
|
||||||
maxRecoveryMs,
|
maxRecoveryMs,
|
||||||
}: {
|
}: {
|
||||||
deliver: ReturnType<typeof vi.fn>;
|
deliver: ReturnType<typeof vi.fn>;
|
||||||
log?: ReturnType<typeof createLog>;
|
log?: ReturnType<typeof createLog>;
|
||||||
delay?: (ms: number) => Promise<void>;
|
|
||||||
maxRecoveryMs?: number;
|
maxRecoveryMs?: number;
|
||||||
}) => {
|
}) => {
|
||||||
const result = await recoverPendingDeliveries({
|
const result = await recoverPendingDeliveries({
|
||||||
@@ -233,7 +243,6 @@ describe("delivery-queue", () => {
|
|||||||
log,
|
log,
|
||||||
cfg: baseCfg,
|
cfg: baseCfg,
|
||||||
stateDir: tmpDir,
|
stateDir: tmpDir,
|
||||||
delay,
|
|
||||||
...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }),
|
...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }),
|
||||||
});
|
});
|
||||||
return { result, log };
|
return { result, log };
|
||||||
@@ -261,7 +270,7 @@ describe("delivery-queue", () => {
|
|||||||
{ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
|
{ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
|
||||||
tmpDir,
|
tmpDir,
|
||||||
);
|
);
|
||||||
setEntryRetryCount(id, MAX_RETRIES);
|
setEntryState(id, { retryCount: MAX_RETRIES });
|
||||||
|
|
||||||
const deliver = vi.fn();
|
const deliver = vi.fn();
|
||||||
const { result } = await runRecovery({ deliver });
|
const { result } = await runRecovery({ deliver });
|
||||||
@@ -377,29 +386,82 @@ describe("delivery-queue", () => {
|
|||||||
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next restart"));
|
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next restart"));
|
||||||
});
|
});
|
||||||
|
|
||||||
it("defers entries when backoff exceeds the recovery budget", async () => {
|
it("defers entries until backoff becomes eligible", async () => {
|
||||||
const id = await enqueueDelivery(
|
const id = await enqueueDelivery(
|
||||||
{ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
|
{ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
|
||||||
tmpDir,
|
tmpDir,
|
||||||
);
|
);
|
||||||
setEntryRetryCount(id, 3);
|
setEntryState(id, { retryCount: 3, lastAttemptAt: Date.now() });
|
||||||
|
|
||||||
const deliver = vi.fn().mockResolvedValue([]);
|
const deliver = vi.fn().mockResolvedValue([]);
|
||||||
const delay = vi.fn(async () => {});
|
|
||||||
const { result, log } = await runRecovery({
|
const { result, log } = await runRecovery({
|
||||||
deliver,
|
deliver,
|
||||||
delay,
|
maxRecoveryMs: 60_000,
|
||||||
maxRecoveryMs: 1000,
|
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(deliver).not.toHaveBeenCalled();
|
expect(deliver).not.toHaveBeenCalled();
|
||||||
expect(delay).not.toHaveBeenCalled();
|
expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
|
||||||
expect(result).toEqual({ recovered: 0, failed: 0, skipped: 1 });
|
|
||||||
|
|
||||||
const remaining = await loadPendingDeliveries(tmpDir);
|
const remaining = await loadPendingDeliveries(tmpDir);
|
||||||
expect(remaining).toHaveLength(1);
|
expect(remaining).toHaveLength(1);
|
||||||
|
|
||||||
expect(log.info).toHaveBeenCalledWith(expect.stringContaining("Backoff"));
|
expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet"));
|
||||||
|
});
|
||||||
|
|
||||||
|
it("continues past high-backoff entries and recovers ready entries behind them", async () => {
|
||||||
|
const now = Date.now();
|
||||||
|
const blockedId = await enqueueDelivery(
|
||||||
|
{ channel: "whatsapp", to: "+1", payloads: [{ text: "blocked" }] },
|
||||||
|
tmpDir,
|
||||||
|
);
|
||||||
|
const readyId = await enqueueDelivery(
|
||||||
|
{ channel: "telegram", to: "2", payloads: [{ text: "ready" }] },
|
||||||
|
tmpDir,
|
||||||
|
);
|
||||||
|
|
||||||
|
setEntryState(blockedId, { retryCount: 3, lastAttemptAt: now, enqueuedAt: now - 30_000 });
|
||||||
|
setEntryState(readyId, { retryCount: 0, enqueuedAt: now - 10_000 });
|
||||||
|
|
||||||
|
const deliver = vi.fn().mockResolvedValue([]);
|
||||||
|
const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 });
|
||||||
|
|
||||||
|
expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
|
||||||
|
expect(deliver).toHaveBeenCalledTimes(1);
|
||||||
|
expect(deliver).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }),
|
||||||
|
);
|
||||||
|
|
||||||
|
const remaining = await loadPendingDeliveries(tmpDir);
|
||||||
|
expect(remaining).toHaveLength(1);
|
||||||
|
expect(remaining[0]?.id).toBe(blockedId);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("recovers deferred entries on a later restart once backoff elapsed", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const start = new Date("2026-01-01T00:00:00.000Z");
|
||||||
|
vi.setSystemTime(start);
|
||||||
|
|
||||||
|
const id = await enqueueDelivery(
|
||||||
|
{ channel: "whatsapp", to: "+1", payloads: [{ text: "later" }] },
|
||||||
|
tmpDir,
|
||||||
|
);
|
||||||
|
setEntryState(id, { retryCount: 3, lastAttemptAt: start.getTime() });
|
||||||
|
|
||||||
|
const firstDeliver = vi.fn().mockResolvedValue([]);
|
||||||
|
const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 });
|
||||||
|
expect(firstRun.result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
|
||||||
|
expect(firstDeliver).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
vi.setSystemTime(new Date(start.getTime() + 600_000 + 1));
|
||||||
|
const secondDeliver = vi.fn().mockResolvedValue([]);
|
||||||
|
const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 });
|
||||||
|
expect(secondRun.result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
|
||||||
|
expect(secondDeliver).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
const remaining = await loadPendingDeliveries(tmpDir);
|
||||||
|
expect(remaining).toHaveLength(0);
|
||||||
|
|
||||||
|
vi.useRealTimers();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("returns zeros when queue is empty", async () => {
|
it("returns zeros when queue is empty", async () => {
|
||||||
|
|||||||
Reference in New Issue
Block a user