diff --git a/CHANGELOG.md b/CHANGELOG.md index 97dd23edba8..bd611f32f44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Cron: honor `cron.maxConcurrentRuns` in the timer loop so due jobs can execute up to the configured parallelism instead of always running serially. (#11595) Thanks @Takhoffman. - Agents/Compaction: restore embedded compaction safeguard/context-pruning extension loading in production by wiring bundled extension factories into the resource loader instead of runtime file-path resolution. (#22349) Thanks @Glucksberg. - Auto-reply/Tools: forward `senderIsOwner` through embedded queued/followup runner params so owner-only tools remain available for authorized senders. (#22296) thanks @hcoj. - Agents/Subagents: restore announce-chain delivery to agent injection, defer nested announce output until descendant follow-up content is ready, and prevent descendant deferrals from consuming announce retry budget so deep chains do not drop final completions. (#22223) Thanks @tyler6204. diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index 1899f54fc8f..ac122840750 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -755,4 +755,61 @@ describe("Cron issue regressions", () => { expect(secondDone?.state.lastDurationMs).toBe(20); expect(startedAtEvents).toEqual([dueAt, dueAt + 50]); }); + + it("honors cron maxConcurrentRuns for due jobs", async () => { + vi.useRealTimers(); + const store = await makeStorePath(); + const dueAt = Date.parse("2026-02-06T10:05:01.000Z"); + const first = createDueIsolatedJob({ id: "parallel-first", nowMs: dueAt, nextRunAtMs: dueAt }); + const second = createDueIsolatedJob({ + id: "parallel-second", + nowMs: dueAt, + nextRunAtMs: dueAt, + }); + await fs.writeFile( + store.storePath, + JSON.stringify({ version: 1, jobs: [first, second] }, null, 2), + "utf-8", + ); + + let now = dueAt; + let activeRuns = 0; + let peakActiveRuns = 0; + const firstRun = createDeferred<{ status: "ok"; summary: string }>(); + const secondRun = createDeferred<{ status: "ok"; summary: string }>(); + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + cronConfig: { maxConcurrentRuns: 2 }, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async (params: { job: { id: string } }) => { + activeRuns += 1; + peakActiveRuns = Math.max(peakActiveRuns, activeRuns); + try { + const result = + params.job.id === first.id ? await firstRun.promise : await secondRun.promise; + now += 10; + return result; + } finally { + activeRuns -= 1; + } + }), + }); + + const timerPromise = onTimer(state); + await new Promise((resolve) => setTimeout(resolve, 20)); + + expect(peakActiveRuns).toBe(2); + + firstRun.resolve({ status: "ok", summary: "first done" }); + secondRun.resolve({ status: "ok", summary: "second done" }); + await timerPromise; + + const jobs = state.store?.jobs ?? []; + expect(jobs.find((job) => job.id === first.id)?.state.lastStatus).toBe("ok"); + expect(jobs.find((job) => job.id === second.id)?.state.lastStatus).toBe("ok"); + }); }); diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 18fda9aa78e..a51813bbc6c 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -38,6 +38,13 @@ type TimedCronRunOutcome = CronRunOutcome & endedAt: number; }; +function resolveRunConcurrency(state: CronServiceState): number { + const raw = state.deps.cronConfig?.maxConcurrentRuns; + if (typeof raw !== "number" || !Number.isFinite(raw)) { + return 1; + } + return Math.max(1, Math.floor(raw)); +} /** * Exponential backoff delays (in ms) indexed by consecutive error count. * After the last entry the delay stays constant. @@ -236,9 +243,11 @@ export async function onTimer(state: CronServiceState) { })); }); - const results: TimedCronRunOutcome[] = []; - - for (const { id, job } of dueJobs) { + const runDueJob = async (params: { + id: string; + job: CronJob; + }): Promise => { + const { id, job } = params; const startedAt = state.deps.nowMs(); job.state.runningAtMs = startedAt; emit(state, { jobId: job.id, action: "started", runAtMs: startedAt }); @@ -276,27 +285,49 @@ export async function onTimer(state: CronServiceState) { } })() : await executeJobCore(state, job); - results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() }); + return { jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() }; } catch (err) { state.deps.log.warn( { jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? null }, `cron: job failed: ${String(err)}`, ); - results.push({ + return { jobId: id, status: "error", error: String(err), startedAt, endedAt: state.deps.nowMs(), - }); + }; } - } + }; - if (results.length > 0) { + const concurrency = Math.min(resolveRunConcurrency(state), Math.max(1, dueJobs.length)); + const results: (TimedCronRunOutcome | undefined)[] = Array.from({ length: dueJobs.length }); + let cursor = 0; + const workers = Array.from({ length: concurrency }, async () => { + for (;;) { + const index = cursor++; + if (index >= dueJobs.length) { + return; + } + const due = dueJobs[index]; + if (!due) { + return; + } + results[index] = await runDueJob(due); + } + }); + await Promise.all(workers); + + const completedResults: TimedCronRunOutcome[] = results.filter( + (entry): entry is TimedCronRunOutcome => entry !== undefined, + ); + + if (completedResults.length > 0) { await locked(state, async () => { await ensureLoaded(state, { forceReload: true, skipRecompute: true }); - for (const result of results) { + for (const result of completedResults) { const job = state.store?.jobs.find((j) => j.id === result.jobId); if (!job) { continue;