mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-04 08:41:27 +00:00
refactor: scope skipQueue to retryTransient path only
Non-retrying direct delivery (structured content / thread) keeps the write-ahead queue so recoverPendingDeliveries can replay after a crash. Addresses review feedback from codex-connector.
This commit is contained in:
@@ -212,6 +212,9 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
|||||||
channel: "telegram",
|
channel: "telegram",
|
||||||
to: "123456",
|
to: "123456",
|
||||||
payloads: [{ text: "Detailed child result, everything finished successfully." }],
|
payloads: [{ text: "Detailed child result, everything finished successfully." }],
|
||||||
|
// Text delivery goes through finalizeTextDelivery which uses
|
||||||
|
// retryTransient: true, so skipQueue must be set.
|
||||||
|
skipQueue: true,
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
@@ -266,7 +269,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
|||||||
expect(state.deliveryAttempted).toBe(false);
|
expect(state.deliveryAttempted).toBe(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("direct delivery passes skipQueue=true to avoid duplicate queue entries on transient retry", async () => {
|
it("text delivery (retryTransient path) passes skipQueue=true to avoid duplicate queue entries", async () => {
|
||||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||||
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
|
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
|
||||||
@@ -278,7 +281,8 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
|||||||
expect(state.deliveryAttempted).toBe(true);
|
expect(state.deliveryAttempted).toBe(true);
|
||||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
// The call must include skipQueue: true so transient retries inside
|
// Text delivery goes through finalizeTextDelivery → deliverViaDirect with
|
||||||
|
// retryTransient: true. skipQueue must be set so transient retries inside
|
||||||
// retryTransientDirectCronDelivery do not enqueue additional write-ahead
|
// retryTransientDirectCronDelivery do not enqueue additional write-ahead
|
||||||
// entries that would cause duplicate sends.
|
// entries that would cause duplicate sends.
|
||||||
// See: https://github.com/openclaw/openclaw/issues/40545
|
// See: https://github.com/openclaw/openclaw/issues/40545
|
||||||
@@ -292,6 +296,24 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("structured/thread delivery (non-retryTransient path) keeps write-ahead queue", async () => {
|
||||||
|
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||||
|
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||||
|
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
|
||||||
|
|
||||||
|
const params = makeBaseParams({ synthesizedText: "Report attached." });
|
||||||
|
// Simulate structured content so useDirectDelivery path is taken (no retryTransient)
|
||||||
|
(params as Record<string, unknown>).deliveryPayloadHasStructuredContent = true;
|
||||||
|
await dispatchCronDelivery(params);
|
||||||
|
|
||||||
|
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
||||||
|
// Non-retrying path should NOT set skipQueue, preserving crash-recovery
|
||||||
|
// via recoverPendingDeliveries.
|
||||||
|
expect(deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||||
|
expect.not.objectContaining({ skipQueue: true }),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
it("transient retry delivers exactly once after initial transient failure", async () => {
|
it("transient retry delivers exactly once after initial transient failure", async () => {
|
||||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||||
|
|||||||
@@ -241,7 +241,7 @@ export async function dispatchCronDelivery(
|
|||||||
agentId: params.agentId,
|
agentId: params.agentId,
|
||||||
sessionKey: params.agentSessionKey,
|
sessionKey: params.agentSessionKey,
|
||||||
});
|
});
|
||||||
const runDelivery = async () =>
|
const runDelivery = async (skipQueue?: boolean) =>
|
||||||
await deliverOutboundPayloads({
|
await deliverOutboundPayloads({
|
||||||
cfg: params.cfgWithAgentDefaults,
|
cfg: params.cfgWithAgentDefaults,
|
||||||
channel: delivery.channel,
|
channel: delivery.channel,
|
||||||
@@ -254,19 +254,21 @@ export async function dispatchCronDelivery(
|
|||||||
bestEffort: params.deliveryBestEffort,
|
bestEffort: params.deliveryBestEffort,
|
||||||
deps: createOutboundSendDeps(params.deps),
|
deps: createOutboundSendDeps(params.deps),
|
||||||
abortSignal: params.abortSignal,
|
abortSignal: params.abortSignal,
|
||||||
// Skip the write-ahead delivery queue for cron direct delivery.
|
// Skip the write-ahead delivery queue only when retrying transient
|
||||||
// retryTransientDirectCronDelivery already handles transient retries;
|
// errors. retryTransientDirectCronDelivery already provides
|
||||||
// without skipQueue each retry attempt enqueues a *new* queue entry,
|
// resilience; without skipQueue each retry attempt enqueues a *new*
|
||||||
// causing duplicate sends when the first attempt actually succeeded
|
// queue entry, causing duplicate sends when the first attempt
|
||||||
// but threw a transient error (e.g. gateway timeout / econnreset).
|
// actually succeeded but threw a transient error (e.g. gateway
|
||||||
|
// timeout / econnreset). Non-retrying callers keep the queue so
|
||||||
|
// recoverPendingDeliveries can replay after a crash.
|
||||||
// See: https://github.com/openclaw/openclaw/issues/40545
|
// See: https://github.com/openclaw/openclaw/issues/40545
|
||||||
skipQueue: true,
|
...(skipQueue ? { skipQueue: true } : {}),
|
||||||
});
|
});
|
||||||
const deliveryResults = options?.retryTransient
|
const deliveryResults = options?.retryTransient
|
||||||
? await retryTransientDirectCronDelivery({
|
? await retryTransientDirectCronDelivery({
|
||||||
jobId: params.job.id,
|
jobId: params.job.id,
|
||||||
signal: params.abortSignal,
|
signal: params.abortSignal,
|
||||||
run: runDelivery,
|
run: () => runDelivery(true),
|
||||||
})
|
})
|
||||||
: await runDelivery();
|
: await runDelivery();
|
||||||
delivered = deliveryResults.length > 0;
|
delivered = deliveryResults.length > 0;
|
||||||
|
|||||||
Reference in New Issue
Block a user