fix(agents): honor explicit rate-limit cooldown probes in fallback runs

This commit is contained in:
Vignesh Natarajan
2026-03-05 20:02:36 -08:00
parent ce71fac7d6
commit d45353f95b
14 changed files with 150 additions and 25 deletions

View File

@@ -42,6 +42,7 @@ Docs: https://docs.openclaw.ai
- Subagents/kill-complete announce race: when a late `subagent-complete` lifecycle event arrives after an earlier kill marker, clear stale kill suppression/cleanup flags and re-run announce cleanup so finished runs no longer get silently swallowed. (#37024) Thanks @cmfinlan.
- Agents/tool-result cleanup timeout hardening: on embedded runner teardown idle timeouts, clear pending tool-call state without persisting synthetic `missing tool result` entries, preventing timeout cleanups from poisoning follow-up turns; adds regression coverage for timeout clear-vs-flush behavior. (#37081) Thanks @Coyote-Den.
- Agents/openai-completions stream timeout hardening: ensure runtime undici global dispatchers use extended streaming body/header timeouts (including env-proxy dispatcher mode) before embedded runs, reducing forced mid-stream `terminated` failures on long generations; adds regression coverage for dispatcher selection and idempotent reconfiguration. (#9708) Thanks @scottchguard.
- Agents/fallback cooldown probe execution: thread explicit rate-limit cooldown probe intent from model fallback into embedded runner auth-profile selection so same-provider fallback attempts can actually run when all profiles are cooldowned for `rate_limit` (instead of failing pre-run as `No available auth profile`), while preserving default cooldown skip behavior and adding regression tests at both fallback and runner layers. (#13623) Thanks @asfura.
- Cron/OpenAI Codex OAuth refresh hardening: when `openai-codex` token refresh fails specifically on account-id extraction, reuse the cached access token instead of failing the run immediately, with regression coverage to keep non-Codex and unrelated refresh failures unchanged. (#36604) Thanks @laulopezreal.
- Gateway/remote WS break-glass hostname support: honor `OPENCLAW_ALLOW_INSECURE_PRIVATE_WS=1` for `ws://` hostname URLs (not only private IP literals) across onboarding validation and runtime gateway connection checks, while still rejecting public IP literals and non-unicast IPv6 endpoints. (#36930) Thanks @manju-rn.
- Routing/binding lookup scalability: pre-index route bindings by channel/account and avoid full binding-list rescans on channel-account cache rollover, preventing multi-second `resolveAgentRoute` stalls in large binding configurations. (#36915) Thanks @songchenghao.

View File

@@ -52,7 +52,9 @@ function expectPrimaryProbeSuccess(
) { ) {
expect(result.result).toBe(expectedResult); expect(result.result).toBe(expectedResult);
expect(run).toHaveBeenCalledTimes(1); expect(run).toHaveBeenCalledTimes(1);
expect(run).toHaveBeenCalledWith("openai", "gpt-4.1-mini"); expect(run).toHaveBeenCalledWith("openai", "gpt-4.1-mini", {
allowRateLimitCooldownProbe: true,
});
} }
describe("runWithModelFallback probe logic", () => { describe("runWithModelFallback probe logic", () => {
@@ -197,8 +199,12 @@ describe("runWithModelFallback probe logic", () => {
expect(result.result).toBe("fallback-ok"); expect(result.result).toBe("fallback-ok");
expect(run).toHaveBeenCalledTimes(2); expect(run).toHaveBeenCalledTimes(2);
expect(run).toHaveBeenNthCalledWith(1, "openai", "gpt-4.1-mini"); expect(run).toHaveBeenNthCalledWith(1, "openai", "gpt-4.1-mini", {
expect(run).toHaveBeenNthCalledWith(2, "anthropic", "claude-haiku-3-5"); allowRateLimitCooldownProbe: true,
});
expect(run).toHaveBeenNthCalledWith(2, "anthropic", "claude-haiku-3-5", {
allowRateLimitCooldownProbe: true,
});
}); });
it("throttles probe when called within 30s interval", async () => { it("throttles probe when called within 30s interval", async () => {
@@ -319,7 +325,11 @@ describe("runWithModelFallback probe logic", () => {
run, run,
}); });
expect(run).toHaveBeenNthCalledWith(1, "openai", "gpt-4.1-mini"); expect(run).toHaveBeenNthCalledWith(1, "openai", "gpt-4.1-mini", {
expect(run).toHaveBeenNthCalledWith(2, "openai", "gpt-4.1-mini"); allowRateLimitCooldownProbe: true,
});
expect(run).toHaveBeenNthCalledWith(2, "openai", "gpt-4.1-mini", {
allowRateLimitCooldownProbe: true,
});
}); });
}); });

View File

@@ -1116,7 +1116,9 @@ describe("runWithModelFallback", () => {
expect(result.result).toBe("sonnet success"); expect(result.result).toBe("sonnet success");
expect(run).toHaveBeenCalledTimes(1); // Primary skipped, fallback attempted expect(run).toHaveBeenCalledTimes(1); // Primary skipped, fallback attempted
expect(run).toHaveBeenNthCalledWith(1, "anthropic", "claude-sonnet-4-5"); expect(run).toHaveBeenNthCalledWith(1, "anthropic", "claude-sonnet-4-5", {
allowRateLimitCooldownProbe: true,
});
}); });
it("skips same-provider models on auth cooldown but still tries no-profile fallback providers", async () => { it("skips same-provider models on auth cooldown but still tries no-profile fallback providers", async () => {
@@ -1221,7 +1223,9 @@ describe("runWithModelFallback", () => {
expect(result.result).toBe("groq success"); expect(result.result).toBe("groq success");
expect(run).toHaveBeenCalledTimes(2); expect(run).toHaveBeenCalledTimes(2);
expect(run).toHaveBeenNthCalledWith(1, "anthropic", "claude-sonnet-4-5"); // Rate limit allows attempt expect(run).toHaveBeenNthCalledWith(1, "anthropic", "claude-sonnet-4-5", {
allowRateLimitCooldownProbe: true,
}); // Rate limit allows attempt
expect(run).toHaveBeenNthCalledWith(2, "groq", "llama-3.3-70b-versatile"); // Cross-provider works expect(run).toHaveBeenNthCalledWith(2, "groq", "llama-3.3-70b-versatile"); // Cross-provider works
}); });
}); });

View File

@@ -33,6 +33,16 @@ type ModelCandidate = {
model: string; model: string;
}; };
export type ModelFallbackRunOptions = {
allowRateLimitCooldownProbe?: boolean;
};
type ModelFallbackRunFn<T> = (
provider: string,
model: string,
options?: ModelFallbackRunOptions,
) => Promise<T>;
type FallbackAttempt = { type FallbackAttempt = {
provider: string; provider: string;
model: string; model: string;
@@ -124,14 +134,18 @@ function buildFallbackSuccess<T>(params: {
} }
async function runFallbackCandidate<T>(params: { async function runFallbackCandidate<T>(params: {
run: (provider: string, model: string) => Promise<T>; run: ModelFallbackRunFn<T>;
provider: string; provider: string;
model: string; model: string;
options?: ModelFallbackRunOptions;
}): Promise<{ ok: true; result: T } | { ok: false; error: unknown }> { }): Promise<{ ok: true; result: T } | { ok: false; error: unknown }> {
try { try {
const result = params.options
? await params.run(params.provider, params.model, params.options)
: await params.run(params.provider, params.model);
return { return {
ok: true, ok: true,
result: await params.run(params.provider, params.model), result,
}; };
} catch (err) { } catch (err) {
if (shouldRethrowAbort(err)) { if (shouldRethrowAbort(err)) {
@@ -142,15 +156,17 @@ async function runFallbackCandidate<T>(params: {
} }
async function runFallbackAttempt<T>(params: { async function runFallbackAttempt<T>(params: {
run: (provider: string, model: string) => Promise<T>; run: ModelFallbackRunFn<T>;
provider: string; provider: string;
model: string; model: string;
attempts: FallbackAttempt[]; attempts: FallbackAttempt[];
options?: ModelFallbackRunOptions;
}): Promise<{ success: ModelFallbackRunResult<T> } | { error: unknown }> { }): Promise<{ success: ModelFallbackRunResult<T> } | { error: unknown }> {
const runResult = await runFallbackCandidate({ const runResult = await runFallbackCandidate({
run: params.run, run: params.run,
provider: params.provider, provider: params.provider,
model: params.model, model: params.model,
options: params.options,
}); });
if (runResult.ok) { if (runResult.ok) {
return { return {
@@ -439,7 +455,7 @@ export async function runWithModelFallback<T>(params: {
agentDir?: string; agentDir?: string;
/** Optional explicit fallbacks list; when provided (even empty), replaces agents.defaults.model.fallbacks. */ /** Optional explicit fallbacks list; when provided (even empty), replaces agents.defaults.model.fallbacks. */
fallbacksOverride?: string[]; fallbacksOverride?: string[];
run: (provider: string, model: string) => Promise<T>; run: ModelFallbackRunFn<T>;
onError?: ModelFallbackErrorHandler; onError?: ModelFallbackErrorHandler;
}): Promise<ModelFallbackRunResult<T>> { }): Promise<ModelFallbackRunResult<T>> {
const candidates = resolveFallbackCandidates({ const candidates = resolveFallbackCandidates({
@@ -458,6 +474,7 @@ export async function runWithModelFallback<T>(params: {
for (let i = 0; i < candidates.length; i += 1) { for (let i = 0; i < candidates.length; i += 1) {
const candidate = candidates[i]; const candidate = candidates[i];
let runOptions: ModelFallbackRunOptions | undefined;
if (authStore) { if (authStore) {
const profileIds = resolveAuthProfileOrder({ const profileIds = resolveAuthProfileOrder({
cfg: params.cfg, cfg: params.cfg,
@@ -497,10 +514,18 @@ export async function runWithModelFallback<T>(params: {
if (decision.markProbe) { if (decision.markProbe) {
lastProbeAttempt.set(probeThrottleKey, now); lastProbeAttempt.set(probeThrottleKey, now);
} }
if (decision.reason === "rate_limit") {
runOptions = { allowRateLimitCooldownProbe: true };
}
} }
} }
const attemptRun = await runFallbackAttempt({ run: params.run, ...candidate, attempts }); const attemptRun = await runFallbackAttempt({
run: params.run,
...candidate,
attempts,
options: runOptions,
});
if ("success" in attemptRun) { if ("success" in attemptRun) {
return attemptRun.success; return attemptRun.success;
} }

View File

@@ -829,6 +829,46 @@ describe("runEmbeddedPiAgent auth profile rotation", () => {
}); });
}); });
it("can probe one cooldowned profile when rate-limit cooldown probe is explicitly allowed", async () => {
await withTimedAgentWorkspace(async ({ agentDir, workspaceDir, now }) => {
await writeAuthStore(agentDir, {
usageStats: {
"openai:p1": { lastUsed: 1, cooldownUntil: now + 60 * 60 * 1000 },
"openai:p2": { lastUsed: 2, cooldownUntil: now + 60 * 60 * 1000 },
},
});
runEmbeddedAttemptMock.mockResolvedValueOnce(
makeAttempt({
assistantTexts: ["ok"],
lastAssistant: buildAssistant({
stopReason: "stop",
content: [{ type: "text", text: "ok" }],
}),
}),
);
const result = await runEmbeddedPiAgent({
sessionId: "session:test",
sessionKey: "agent:test:cooldown-probe",
sessionFile: path.join(workspaceDir, "session.jsonl"),
workspaceDir,
agentDir,
config: makeConfig({ fallbacks: ["openai/mock-2"] }),
prompt: "hello",
provider: "openai",
model: "mock-1",
authProfileIdSource: "auto",
allowRateLimitCooldownProbe: true,
timeoutMs: 5_000,
runId: "run:cooldown-probe",
});
expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(1);
expect(result.payloads?.[0]?.text ?? "").toContain("ok");
});
});
it("treats agent-level fallbacks as configured when defaults have none", async () => { it("treats agent-level fallbacks as configured when defaults have none", async () => {
await withTimedAgentWorkspace(async ({ agentDir, workspaceDir, now }) => { await withTimedAgentWorkspace(async ({ agentDir, workspaceDir, now }) => {
await writeAuthStore(agentDir, { await writeAuthStore(agentDir, {

View File

@@ -633,15 +633,39 @@ export async function runEmbeddedPiAgent(
}; };
try { try {
const autoProfileCandidates = profileCandidates.filter(
(candidate): candidate is string =>
typeof candidate === "string" && candidate.length > 0 && candidate !== lockedProfileId,
);
const allAutoProfilesInCooldown =
autoProfileCandidates.length > 0 &&
autoProfileCandidates.every((candidate) => isProfileInCooldown(authStore, candidate));
const unavailableReason = allAutoProfilesInCooldown
? (resolveProfilesUnavailableReason({
store: authStore,
profileIds: autoProfileCandidates,
}) ?? "rate_limit")
: null;
const allowRateLimitCooldownProbe =
params.allowRateLimitCooldownProbe === true &&
allAutoProfilesInCooldown &&
unavailableReason === "rate_limit";
let didRateLimitCooldownProbe = false;
while (profileIndex < profileCandidates.length) { while (profileIndex < profileCandidates.length) {
const candidate = profileCandidates[profileIndex]; const candidate = profileCandidates[profileIndex];
if ( const inCooldown =
candidate && candidate && candidate !== lockedProfileId && isProfileInCooldown(authStore, candidate);
candidate !== lockedProfileId && if (inCooldown) {
isProfileInCooldown(authStore, candidate) if (allowRateLimitCooldownProbe && !didRateLimitCooldownProbe) {
) { didRateLimitCooldownProbe = true;
profileIndex += 1; log.warn(
continue; `probing cooldowned auth profile for ${provider}/${modelId} due to rate_limit unavailability`,
);
} else {
profileIndex += 1;
continue;
}
} }
await applyApiKeyInfo(profileCandidates[profileIndex]); await applyApiKeyInfo(profileCandidates[profileIndex]);
break; break;

View File

@@ -113,4 +113,12 @@ export type RunEmbeddedPiAgentParams = {
streamParams?: AgentStreamParams; streamParams?: AgentStreamParams;
ownerNumbers?: string[]; ownerNumbers?: string[];
enforceFinalTag?: boolean; enforceFinalTag?: boolean;
/**
* Allow a single run attempt even when all auth profiles are in cooldown,
* but only for inferred `rate_limit` cooldowns.
*
* This is used by model fallback when trying sibling models on providers
* where rate limits are often model-scoped.
*/
allowRateLimitCooldownProbe?: boolean;
}; };

View File

@@ -186,7 +186,7 @@ export async function runAgentTurnWithFallback(params: {
const onToolResult = params.opts?.onToolResult; const onToolResult = params.opts?.onToolResult;
const fallbackResult = await runWithModelFallback({ const fallbackResult = await runWithModelFallback({
...resolveModelFallbackOptions(params.followupRun.run), ...resolveModelFallbackOptions(params.followupRun.run),
run: (provider, model) => { run: (provider, model, runOptions) => {
// Notify that model selection is complete (including after fallback). // Notify that model selection is complete (including after fallback).
// This allows responsePrefix template interpolation with the actual model. // This allows responsePrefix template interpolation with the actual model.
params.opts?.onModelSelected?.({ params.opts?.onModelSelected?.({
@@ -304,6 +304,7 @@ export async function runAgentTurnWithFallback(params: {
model, model,
runId, runId,
authProfile, authProfile,
allowRateLimitCooldownProbe: runOptions?.allowRateLimitCooldownProbe,
}); });
return (async () => { return (async () => {
const result = await runEmbeddedPiAgent({ const result = await runEmbeddedPiAgent({

View File

@@ -474,7 +474,7 @@ export async function runMemoryFlushIfNeeded(params: {
try { try {
await runWithModelFallback({ await runWithModelFallback({
...resolveModelFallbackOptions(params.followupRun.run), ...resolveModelFallbackOptions(params.followupRun.run),
run: async (provider, model) => { run: async (provider, model, runOptions) => {
const { authProfile, embeddedContext, senderContext } = buildEmbeddedRunContexts({ const { authProfile, embeddedContext, senderContext } = buildEmbeddedRunContexts({
run: params.followupRun.run, run: params.followupRun.run,
sessionCtx: params.sessionCtx, sessionCtx: params.sessionCtx,
@@ -487,6 +487,7 @@ export async function runMemoryFlushIfNeeded(params: {
model, model,
runId: flushRunId, runId: flushRunId,
authProfile, authProfile,
allowRateLimitCooldownProbe: runOptions?.allowRateLimitCooldownProbe,
}); });
const result = await runEmbeddedPiAgent({ const result = await runEmbeddedPiAgent({
...embeddedContext, ...embeddedContext,

View File

@@ -166,6 +166,7 @@ export function buildEmbeddedRunBaseParams(params: {
model: string; model: string;
runId: string; runId: string;
authProfile: ReturnType<typeof resolveProviderScopedAuthProfile>; authProfile: ReturnType<typeof resolveProviderScopedAuthProfile>;
allowRateLimitCooldownProbe?: boolean;
}) { }) {
return { return {
sessionFile: params.run.sessionFile, sessionFile: params.run.sessionFile,
@@ -186,6 +187,7 @@ export function buildEmbeddedRunBaseParams(params: {
bashElevated: params.run.bashElevated, bashElevated: params.run.bashElevated,
timeoutMs: params.run.timeoutMs, timeoutMs: params.run.timeoutMs,
runId: params.runId, runId: params.runId,
allowRateLimitCooldownProbe: params.allowRateLimitCooldownProbe,
}; };
} }

View File

@@ -157,7 +157,7 @@ export function createFollowupRunner(params: {
agentId: queued.run.agentId, agentId: queued.run.agentId,
sessionKey: queued.run.sessionKey, sessionKey: queued.run.sessionKey,
}), }),
run: async (provider, model) => { run: async (provider, model, runOptions) => {
const authProfile = resolveRunAuthProfile(queued.run, provider); const authProfile = resolveRunAuthProfile(queued.run, provider);
const result = await runEmbeddedPiAgent({ const result = await runEmbeddedPiAgent({
sessionId: queued.run.sessionId, sessionId: queued.run.sessionId,
@@ -200,6 +200,7 @@ export function createFollowupRunner(params: {
bashElevated: queued.run.bashElevated, bashElevated: queued.run.bashElevated,
timeoutMs: queued.run.timeoutMs, timeoutMs: queued.run.timeoutMs,
runId, runId,
allowRateLimitCooldownProbe: runOptions?.allowRateLimitCooldownProbe,
blockReplyBreak: queued.run.blockReplyBreak, blockReplyBreak: queued.run.blockReplyBreak,
bootstrapPromptWarningSignaturesSeen, bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature: bootstrapPromptWarningSignature:

View File

@@ -174,6 +174,7 @@ function runAgentAttempt(params: {
primaryProvider: string; primaryProvider: string;
sessionStore?: Record<string, SessionEntry>; sessionStore?: Record<string, SessionEntry>;
storePath?: string; storePath?: string;
allowRateLimitCooldownProbe?: boolean;
}) { }) {
const effectivePrompt = resolveFallbackRetryPrompt({ const effectivePrompt = resolveFallbackRetryPrompt({
body: params.body, body: params.body,
@@ -324,6 +325,7 @@ function runAgentAttempt(params: {
inputProvenance: params.opts.inputProvenance, inputProvenance: params.opts.inputProvenance,
streamParams: params.opts.streamParams, streamParams: params.opts.streamParams,
agentDir: params.agentDir, agentDir: params.agentDir,
allowRateLimitCooldownProbe: params.allowRateLimitCooldownProbe,
onAgentEvent: params.onAgentEvent, onAgentEvent: params.onAgentEvent,
bootstrapPromptWarningSignaturesSeen, bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature, bootstrapPromptWarningSignature,
@@ -838,7 +840,7 @@ async function agentCommandInternal(
model, model,
agentDir, agentDir,
fallbacksOverride: effectiveFallbacksOverride, fallbacksOverride: effectiveFallbacksOverride,
run: (providerOverride, modelOverride) => { run: (providerOverride, modelOverride, runOptions) => {
const isFallbackRetry = fallbackAttemptIndex > 0; const isFallbackRetry = fallbackAttemptIndex > 0;
fallbackAttemptIndex += 1; fallbackAttemptIndex += 1;
return runAgentAttempt({ return runAgentAttempt({
@@ -866,6 +868,7 @@ async function agentCommandInternal(
primaryProvider: provider, primaryProvider: provider,
sessionStore, sessionStore,
storePath, storePath,
allowRateLimitCooldownProbe: runOptions?.allowRateLimitCooldownProbe,
onAgentEvent: (evt) => { onAgentEvent: (evt) => {
// Track lifecycle end for fallback emission below. // Track lifecycle end for fallback emission below.
if ( if (

View File

@@ -468,7 +468,7 @@ export async function runCronIsolatedAgentTurn(params: {
agentDir, agentDir,
fallbacksOverride: fallbacksOverride:
payloadFallbacks ?? resolveAgentModelFallbacksOverride(params.cfg, agentId), payloadFallbacks ?? resolveAgentModelFallbacksOverride(params.cfg, agentId),
run: async (providerOverride, modelOverride) => { run: async (providerOverride, modelOverride, runOptions) => {
if (abortSignal?.aborted) { if (abortSignal?.aborted) {
throw new Error(abortReason()); throw new Error(abortReason());
} }
@@ -534,6 +534,7 @@ export async function runCronIsolatedAgentTurn(params: {
// be blocked by a target it cannot satisfy (#27898). // be blocked by a target it cannot satisfy (#27898).
requireExplicitMessageTarget: deliveryRequested && resolvedDelivery.ok, requireExplicitMessageTarget: deliveryRequested && resolvedDelivery.ok,
disableMessageTool: deliveryRequested || deliveryPlan.mode === "none", disableMessageTool: deliveryRequested || deliveryPlan.mode === "none",
allowRateLimitCooldownProbe: runOptions?.allowRateLimitCooldownProbe,
abortSignal, abortSignal,
bootstrapPromptWarningSignaturesSeen, bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature, bootstrapPromptWarningSignature,

View File

@@ -1,7 +1,11 @@
export async function runWithModelFallback(params: { export async function runWithModelFallback(params: {
provider: string; provider: string;
model: string; model: string;
run: (provider: string, model: string) => Promise<unknown>; run: (
provider: string,
model: string,
options?: { allowRateLimitCooldownProbe?: boolean },
) => Promise<unknown>;
}) { }) {
return { return {
result: await params.run(params.provider, params.model), result: await params.run(params.provider, params.model),