mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-30 13:05:04 +00:00
fix(cron): honor maxConcurrentRuns in timer loop (openclaw#22413) thanks @Takhoffman
Verified: - pnpm install --frozen-lockfile - pnpm build - pnpm check - pnpm test:macmini (failed on unrelated baseline test: src/memory/qmd-manager.test.ts > throws when sqlite index is busy) Co-authored-by: Takhoffman <781889+Takhoffman@users.noreply.github.com> Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Cron: honor `cron.maxConcurrentRuns` in the timer loop so due jobs can execute up to the configured parallelism instead of always running serially. (#11595) Thanks @Takhoffman.
|
||||
- Agents/Compaction: restore embedded compaction safeguard/context-pruning extension loading in production by wiring bundled extension factories into the resource loader instead of runtime file-path resolution. (#22349) Thanks @Glucksberg.
|
||||
- Auto-reply/Tools: forward `senderIsOwner` through embedded queued/followup runner params so owner-only tools remain available for authorized senders. (#22296) thanks @hcoj.
|
||||
- Agents/Subagents: restore announce-chain delivery to agent injection, defer nested announce output until descendant follow-up content is ready, and prevent descendant deferrals from consuming announce retry budget so deep chains do not drop final completions. (#22223) Thanks @tyler6204.
|
||||
|
||||
@@ -755,4 +755,61 @@ describe("Cron issue regressions", () => {
|
||||
expect(secondDone?.state.lastDurationMs).toBe(20);
|
||||
expect(startedAtEvents).toEqual([dueAt, dueAt + 50]);
|
||||
});
|
||||
|
||||
it("honors cron maxConcurrentRuns for due jobs", async () => {
|
||||
vi.useRealTimers();
|
||||
const store = await makeStorePath();
|
||||
const dueAt = Date.parse("2026-02-06T10:05:01.000Z");
|
||||
const first = createDueIsolatedJob({ id: "parallel-first", nowMs: dueAt, nextRunAtMs: dueAt });
|
||||
const second = createDueIsolatedJob({
|
||||
id: "parallel-second",
|
||||
nowMs: dueAt,
|
||||
nextRunAtMs: dueAt,
|
||||
});
|
||||
await fs.writeFile(
|
||||
store.storePath,
|
||||
JSON.stringify({ version: 1, jobs: [first, second] }, null, 2),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
let now = dueAt;
|
||||
let activeRuns = 0;
|
||||
let peakActiveRuns = 0;
|
||||
const firstRun = createDeferred<{ status: "ok"; summary: string }>();
|
||||
const secondRun = createDeferred<{ status: "ok"; summary: string }>();
|
||||
const state = createCronServiceState({
|
||||
cronEnabled: true,
|
||||
storePath: store.storePath,
|
||||
cronConfig: { maxConcurrentRuns: 2 },
|
||||
log: noopLogger,
|
||||
nowMs: () => now,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(async (params: { job: { id: string } }) => {
|
||||
activeRuns += 1;
|
||||
peakActiveRuns = Math.max(peakActiveRuns, activeRuns);
|
||||
try {
|
||||
const result =
|
||||
params.job.id === first.id ? await firstRun.promise : await secondRun.promise;
|
||||
now += 10;
|
||||
return result;
|
||||
} finally {
|
||||
activeRuns -= 1;
|
||||
}
|
||||
}),
|
||||
});
|
||||
|
||||
const timerPromise = onTimer(state);
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
|
||||
expect(peakActiveRuns).toBe(2);
|
||||
|
||||
firstRun.resolve({ status: "ok", summary: "first done" });
|
||||
secondRun.resolve({ status: "ok", summary: "second done" });
|
||||
await timerPromise;
|
||||
|
||||
const jobs = state.store?.jobs ?? [];
|
||||
expect(jobs.find((job) => job.id === first.id)?.state.lastStatus).toBe("ok");
|
||||
expect(jobs.find((job) => job.id === second.id)?.state.lastStatus).toBe("ok");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -38,6 +38,13 @@ type TimedCronRunOutcome = CronRunOutcome &
|
||||
endedAt: number;
|
||||
};
|
||||
|
||||
function resolveRunConcurrency(state: CronServiceState): number {
|
||||
const raw = state.deps.cronConfig?.maxConcurrentRuns;
|
||||
if (typeof raw !== "number" || !Number.isFinite(raw)) {
|
||||
return 1;
|
||||
}
|
||||
return Math.max(1, Math.floor(raw));
|
||||
}
|
||||
/**
|
||||
* Exponential backoff delays (in ms) indexed by consecutive error count.
|
||||
* After the last entry the delay stays constant.
|
||||
@@ -236,9 +243,11 @@ export async function onTimer(state: CronServiceState) {
|
||||
}));
|
||||
});
|
||||
|
||||
const results: TimedCronRunOutcome[] = [];
|
||||
|
||||
for (const { id, job } of dueJobs) {
|
||||
const runDueJob = async (params: {
|
||||
id: string;
|
||||
job: CronJob;
|
||||
}): Promise<TimedCronRunOutcome> => {
|
||||
const { id, job } = params;
|
||||
const startedAt = state.deps.nowMs();
|
||||
job.state.runningAtMs = startedAt;
|
||||
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
|
||||
@@ -276,27 +285,49 @@ export async function onTimer(state: CronServiceState) {
|
||||
}
|
||||
})()
|
||||
: await executeJobCore(state, job);
|
||||
results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() });
|
||||
return { jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() };
|
||||
} catch (err) {
|
||||
state.deps.log.warn(
|
||||
{ jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? null },
|
||||
`cron: job failed: ${String(err)}`,
|
||||
);
|
||||
results.push({
|
||||
return {
|
||||
jobId: id,
|
||||
status: "error",
|
||||
error: String(err),
|
||||
startedAt,
|
||||
endedAt: state.deps.nowMs(),
|
||||
});
|
||||
};
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if (results.length > 0) {
|
||||
const concurrency = Math.min(resolveRunConcurrency(state), Math.max(1, dueJobs.length));
|
||||
const results: (TimedCronRunOutcome | undefined)[] = Array.from({ length: dueJobs.length });
|
||||
let cursor = 0;
|
||||
const workers = Array.from({ length: concurrency }, async () => {
|
||||
for (;;) {
|
||||
const index = cursor++;
|
||||
if (index >= dueJobs.length) {
|
||||
return;
|
||||
}
|
||||
const due = dueJobs[index];
|
||||
if (!due) {
|
||||
return;
|
||||
}
|
||||
results[index] = await runDueJob(due);
|
||||
}
|
||||
});
|
||||
await Promise.all(workers);
|
||||
|
||||
const completedResults: TimedCronRunOutcome[] = results.filter(
|
||||
(entry): entry is TimedCronRunOutcome => entry !== undefined,
|
||||
);
|
||||
|
||||
if (completedResults.length > 0) {
|
||||
await locked(state, async () => {
|
||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||
|
||||
for (const result of results) {
|
||||
for (const result of completedResults) {
|
||||
const job = state.store?.jobs.find((j) => j.id === result.jobId);
|
||||
if (!job) {
|
||||
continue;
|
||||
|
||||
Reference in New Issue
Block a user