refactor: scope skipQueue to retryTransient path only

Non-retrying direct delivery (structured content / thread) keeps the
write-ahead queue so recoverPendingDeliveries can replay after a crash.

Addresses review feedback from codex-connector.
This commit is contained in:
openperf
2026-03-09 14:07:34 +08:00
parent 157769f83f
commit ea5ae5c5da
2 changed files with 34 additions and 10 deletions

View File

@@ -212,6 +212,9 @@ describe("dispatchCronDelivery — double-announce guard", () => {
channel: "telegram", channel: "telegram",
to: "123456", to: "123456",
payloads: [{ text: "Detailed child result, everything finished successfully." }], payloads: [{ text: "Detailed child result, everything finished successfully." }],
// Text delivery goes through finalizeTextDelivery which uses
// retryTransient: true, so skipQueue must be set.
skipQueue: true,
}), }),
); );
}); });
@@ -266,7 +269,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
expect(state.deliveryAttempted).toBe(false); expect(state.deliveryAttempted).toBe(false);
}); });
it("direct delivery passes skipQueue=true to avoid duplicate queue entries on transient retry", async () => { it("text delivery (retryTransient path) passes skipQueue=true to avoid duplicate queue entries", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0); vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]); vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
@@ -278,7 +281,8 @@ describe("dispatchCronDelivery — double-announce guard", () => {
expect(state.deliveryAttempted).toBe(true); expect(state.deliveryAttempted).toBe(true);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
// The call must include skipQueue: true so transient retries inside // Text delivery goes through finalizeTextDelivery → deliverViaDirect with
// retryTransient: true. skipQueue must be set so transient retries inside
// retryTransientDirectCronDelivery do not enqueue additional write-ahead // retryTransientDirectCronDelivery do not enqueue additional write-ahead
// entries that would cause duplicate sends. // entries that would cause duplicate sends.
// See: https://github.com/openclaw/openclaw/issues/40545 // See: https://github.com/openclaw/openclaw/issues/40545
@@ -292,6 +296,24 @@ describe("dispatchCronDelivery — double-announce guard", () => {
); );
}); });
it("structured/thread delivery (non-retryTransient path) keeps write-ahead queue", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
const params = makeBaseParams({ synthesizedText: "Report attached." });
// Simulate structured content so useDirectDelivery path is taken (no retryTransient)
(params as Record<string, unknown>).deliveryPayloadHasStructuredContent = true;
await dispatchCronDelivery(params);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
// Non-retrying path should NOT set skipQueue, preserving crash-recovery
// via recoverPendingDeliveries.
expect(deliverOutboundPayloads).toHaveBeenCalledWith(
expect.not.objectContaining({ skipQueue: true }),
);
});
it("transient retry delivers exactly once after initial transient failure", async () => { it("transient retry delivers exactly once after initial transient failure", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0); vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);

View File

@@ -241,7 +241,7 @@ export async function dispatchCronDelivery(
agentId: params.agentId, agentId: params.agentId,
sessionKey: params.agentSessionKey, sessionKey: params.agentSessionKey,
}); });
const runDelivery = async () => const runDelivery = async (skipQueue?: boolean) =>
await deliverOutboundPayloads({ await deliverOutboundPayloads({
cfg: params.cfgWithAgentDefaults, cfg: params.cfgWithAgentDefaults,
channel: delivery.channel, channel: delivery.channel,
@@ -254,19 +254,21 @@ export async function dispatchCronDelivery(
bestEffort: params.deliveryBestEffort, bestEffort: params.deliveryBestEffort,
deps: createOutboundSendDeps(params.deps), deps: createOutboundSendDeps(params.deps),
abortSignal: params.abortSignal, abortSignal: params.abortSignal,
// Skip the write-ahead delivery queue for cron direct delivery. // Skip the write-ahead delivery queue only when retrying transient
// retryTransientDirectCronDelivery already handles transient retries; // errors. retryTransientDirectCronDelivery already provides
// without skipQueue each retry attempt enqueues a *new* queue entry, // resilience; without skipQueue each retry attempt enqueues a *new*
// causing duplicate sends when the first attempt actually succeeded // queue entry, causing duplicate sends when the first attempt
// but threw a transient error (e.g. gateway timeout / econnreset). // actually succeeded but threw a transient error (e.g. gateway
// timeout / econnreset). Non-retrying callers keep the queue so
// recoverPendingDeliveries can replay after a crash.
// See: https://github.com/openclaw/openclaw/issues/40545 // See: https://github.com/openclaw/openclaw/issues/40545
skipQueue: true, ...(skipQueue ? { skipQueue: true } : {}),
}); });
const deliveryResults = options?.retryTransient const deliveryResults = options?.retryTransient
? await retryTransientDirectCronDelivery({ ? await retryTransientDirectCronDelivery({
jobId: params.job.id, jobId: params.job.id,
signal: params.abortSignal, signal: params.abortSignal,
run: runDelivery, run: () => runDelivery(true),
}) })
: await runDelivery(); : await runDelivery();
delivered = deliveryResults.length > 0; delivered = deliveryResults.length > 0;