mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 20:04:32 +00:00
Cron: keep list/status responsive during startup catch-up
This commit is contained in:
@@ -18,6 +18,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
### Fixes
|
### Fixes
|
||||||
|
|
||||||
- Agents/Ollama: preserve unsafe integer tool-call arguments as exact strings during NDJSON parsing, preventing large numeric IDs from being rounded before tool execution. (#23170) Thanks @BestJoester.
|
- Agents/Ollama: preserve unsafe integer tool-call arguments as exact strings during NDJSON parsing, preventing large numeric IDs from being rounded before tool execution. (#23170) Thanks @BestJoester.
|
||||||
|
- Cron/Gateway: keep `cron.list` and `cron.status` responsive during startup catch-up by avoiding a long-held cron lock while missed jobs execute. (#23106) Thanks @jayleekr.
|
||||||
- Gateway/Pairing: treat operator.admin pairing tokens as satisfying operator.write requests so legacy devices stop looping through scope-upgrade prompts introduced in 2026.2.19. (#23125, #23006) Thanks @vignesh07.
|
- Gateway/Pairing: treat operator.admin pairing tokens as satisfying operator.write requests so legacy devices stop looping through scope-upgrade prompts introduced in 2026.2.19. (#23125, #23006) Thanks @vignesh07.
|
||||||
- Memory/QMD: add optional `memory.qmd.mcporter` search routing so QMD `query/search/vsearch` can run through mcporter keep-alive flows (including multi-collection paths) to reduce cold starts, while keeping searches on agent-scoped QMD state for consistent recall. (#19617) Thanks @nicole-luxe and @vignesh07.
|
- Memory/QMD: add optional `memory.qmd.mcporter` search routing so QMD `query/search/vsearch` can run through mcporter keep-alive flows (including multi-collection paths) to reduce cold starts, while keeping searches on agent-scoped QMD state for consistent recall. (#19617) Thanks @nicole-luxe and @vignesh07.
|
||||||
- Chat/UI: strip inline reply/audio directive tags (`[[reply_to_current]]`, `[[reply_to:<id>]]`, `[[audio_as_voice]]`) from displayed chat history, live chat event output, and session preview snippets so control tags no longer leak into user-visible surfaces.
|
- Chat/UI: strip inline reply/audio directive tags (`[[reply_to_current]]`, `[[reply_to:<id>]]`, `[[audio_as_voice]]`) from displayed chat history, live chat event output, and session preview snippets so control tags no longer leak into user-visible surfaces.
|
||||||
|
|||||||
@@ -11,6 +11,22 @@ const noopLogger = {
|
|||||||
error: vi.fn(),
|
error: vi.fn(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
async function withTimeout<T>(promise: Promise<T>, timeoutMs: number, label: string): Promise<T> {
|
||||||
|
let timeout: NodeJS.Timeout | undefined;
|
||||||
|
try {
|
||||||
|
return await Promise.race([
|
||||||
|
promise,
|
||||||
|
new Promise<T>((_resolve, reject) => {
|
||||||
|
timeout = setTimeout(() => reject(new Error(`${label} timed out`)), timeoutMs);
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
} finally {
|
||||||
|
if (timeout) {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async function makeStorePath() {
|
async function makeStorePath() {
|
||||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-"));
|
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-"));
|
||||||
return {
|
return {
|
||||||
@@ -135,4 +151,86 @@ describe("CronService read ops while job is running", () => {
|
|||||||
await store.cleanup();
|
await store.cleanup();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("keeps list and status responsive during startup catch-up runs", async () => {
|
||||||
|
const store = await makeStorePath();
|
||||||
|
const enqueueSystemEvent = vi.fn();
|
||||||
|
const requestHeartbeatNow = vi.fn();
|
||||||
|
const nowMs = Date.parse("2025-12-13T00:00:00.000Z");
|
||||||
|
|
||||||
|
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||||
|
await fs.writeFile(
|
||||||
|
store.storePath,
|
||||||
|
JSON.stringify({
|
||||||
|
version: 1,
|
||||||
|
jobs: [
|
||||||
|
{
|
||||||
|
id: "startup-catchup",
|
||||||
|
name: "startup catch-up",
|
||||||
|
enabled: true,
|
||||||
|
createdAtMs: nowMs - 86_400_000,
|
||||||
|
updatedAtMs: nowMs - 86_400_000,
|
||||||
|
schedule: { kind: "at", at: new Date(nowMs - 60_000).toISOString() },
|
||||||
|
sessionTarget: "isolated",
|
||||||
|
wakeMode: "next-heartbeat",
|
||||||
|
payload: { kind: "agentTurn", message: "startup replay" },
|
||||||
|
delivery: { mode: "none" },
|
||||||
|
state: { nextRunAtMs: nowMs - 60_000 },
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}),
|
||||||
|
"utf-8",
|
||||||
|
);
|
||||||
|
|
||||||
|
let resolveRun:
|
||||||
|
| ((value: { status: "ok" | "error" | "skipped"; summary?: string; error?: string }) => void)
|
||||||
|
| undefined;
|
||||||
|
let resolveRunStarted: (() => void) | undefined;
|
||||||
|
const runStarted = new Promise<void>((resolve) => {
|
||||||
|
resolveRunStarted = resolve;
|
||||||
|
});
|
||||||
|
|
||||||
|
const runIsolatedAgentJob = vi.fn(async () => {
|
||||||
|
resolveRunStarted?.();
|
||||||
|
return await new Promise<{
|
||||||
|
status: "ok" | "error" | "skipped";
|
||||||
|
summary?: string;
|
||||||
|
error?: string;
|
||||||
|
}>((resolve) => {
|
||||||
|
resolveRun = resolve;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
const cron = new CronService({
|
||||||
|
storePath: store.storePath,
|
||||||
|
cronEnabled: true,
|
||||||
|
log: noopLogger,
|
||||||
|
nowMs: () => nowMs,
|
||||||
|
enqueueSystemEvent,
|
||||||
|
requestHeartbeatNow,
|
||||||
|
runIsolatedAgentJob,
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
const startPromise = cron.start();
|
||||||
|
await runStarted;
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
withTimeout(cron.list({ includeDisabled: true }), 300, "cron.list during startup"),
|
||||||
|
).resolves.toBeTypeOf("object");
|
||||||
|
await expect(withTimeout(cron.status(), 300, "cron.status during startup")).resolves.toEqual(
|
||||||
|
expect.objectContaining({ enabled: true, storePath: store.storePath }),
|
||||||
|
);
|
||||||
|
|
||||||
|
resolveRun?.({ status: "ok", summary: "done" });
|
||||||
|
await startPromise;
|
||||||
|
|
||||||
|
const jobs = await cron.list({ includeDisabled: true });
|
||||||
|
expect(jobs[0]?.state.lastStatus).toBe("ok");
|
||||||
|
expect(jobs[0]?.state.runningAtMs).toBeUndefined();
|
||||||
|
} finally {
|
||||||
|
cron.stop();
|
||||||
|
await store.cleanup();
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -28,14 +28,15 @@ async function ensureLoadedForRead(state: CronServiceState) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export async function start(state: CronServiceState) {
|
export async function start(state: CronServiceState) {
|
||||||
|
if (!state.deps.cronEnabled) {
|
||||||
|
state.deps.log.info({ enabled: false }, "cron: disabled");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const startupInterruptedJobIds = new Set<string>();
|
||||||
await locked(state, async () => {
|
await locked(state, async () => {
|
||||||
if (!state.deps.cronEnabled) {
|
|
||||||
state.deps.log.info({ enabled: false }, "cron: disabled");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
await ensureLoaded(state, { skipRecompute: true });
|
await ensureLoaded(state, { skipRecompute: true });
|
||||||
const jobs = state.store?.jobs ?? [];
|
const jobs = state.store?.jobs ?? [];
|
||||||
const startupInterruptedJobIds = new Set<string>();
|
|
||||||
for (const job of jobs) {
|
for (const job of jobs) {
|
||||||
if (typeof job.state.runningAtMs === "number") {
|
if (typeof job.state.runningAtMs === "number") {
|
||||||
state.deps.log.warn(
|
state.deps.log.warn(
|
||||||
@@ -46,7 +47,13 @@ export async function start(state: CronServiceState) {
|
|||||||
startupInterruptedJobIds.add(job.id);
|
startupInterruptedJobIds.add(job.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
await runMissedJobs(state, { skipJobIds: startupInterruptedJobIds });
|
await persist(state);
|
||||||
|
});
|
||||||
|
|
||||||
|
await runMissedJobs(state, { skipJobIds: startupInterruptedJobIds });
|
||||||
|
|
||||||
|
await locked(state, async () => {
|
||||||
|
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||||
recomputeNextRuns(state);
|
recomputeNextRuns(state);
|
||||||
await persist(state);
|
await persist(state);
|
||||||
armTimer(state);
|
armTimer(state);
|
||||||
|
|||||||
@@ -458,22 +458,97 @@ export async function runMissedJobs(
|
|||||||
state: CronServiceState,
|
state: CronServiceState,
|
||||||
opts?: { skipJobIds?: ReadonlySet<string> },
|
opts?: { skipJobIds?: ReadonlySet<string> },
|
||||||
) {
|
) {
|
||||||
if (!state.store) {
|
const startupCandidates = await locked(state, async () => {
|
||||||
return;
|
await ensureLoaded(state, { skipRecompute: true });
|
||||||
}
|
if (!state.store) {
|
||||||
const now = state.deps.nowMs();
|
return [] as Array<{ jobId: string; job: CronJob }>;
|
||||||
const skipJobIds = opts?.skipJobIds;
|
}
|
||||||
const missed = collectRunnableJobs(state, now, { skipJobIds, skipAtIfAlreadyRan: true });
|
const now = state.deps.nowMs();
|
||||||
|
const skipJobIds = opts?.skipJobIds;
|
||||||
if (missed.length > 0) {
|
const missed = collectRunnableJobs(state, now, { skipJobIds, skipAtIfAlreadyRan: true });
|
||||||
|
if (missed.length === 0) {
|
||||||
|
return [] as Array<{ jobId: string; job: CronJob }>;
|
||||||
|
}
|
||||||
state.deps.log.info(
|
state.deps.log.info(
|
||||||
{ count: missed.length, jobIds: missed.map((j) => j.id) },
|
{ count: missed.length, jobIds: missed.map((j) => j.id) },
|
||||||
"cron: running missed jobs after restart",
|
"cron: running missed jobs after restart",
|
||||||
);
|
);
|
||||||
for (const job of missed) {
|
for (const job of missed) {
|
||||||
await executeJob(state, job, now, { forced: false });
|
job.state.runningAtMs = now;
|
||||||
|
job.state.lastError = undefined;
|
||||||
|
}
|
||||||
|
await persist(state);
|
||||||
|
return missed.map((job) => ({ jobId: job.id, job }));
|
||||||
|
});
|
||||||
|
|
||||||
|
if (startupCandidates.length === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const outcomes: Array<TimedCronRunOutcome> = [];
|
||||||
|
for (const candidate of startupCandidates) {
|
||||||
|
const startedAt = state.deps.nowMs();
|
||||||
|
emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt });
|
||||||
|
try {
|
||||||
|
const result = await executeJobCore(state, candidate.job);
|
||||||
|
outcomes.push({
|
||||||
|
jobId: candidate.jobId,
|
||||||
|
status: result.status,
|
||||||
|
error: result.error,
|
||||||
|
summary: result.summary,
|
||||||
|
delivered: result.delivered,
|
||||||
|
sessionId: result.sessionId,
|
||||||
|
sessionKey: result.sessionKey,
|
||||||
|
model: result.model,
|
||||||
|
provider: result.provider,
|
||||||
|
usage: result.usage,
|
||||||
|
startedAt,
|
||||||
|
endedAt: state.deps.nowMs(),
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
outcomes.push({
|
||||||
|
jobId: candidate.jobId,
|
||||||
|
status: "error",
|
||||||
|
error: String(err),
|
||||||
|
startedAt,
|
||||||
|
endedAt: state.deps.nowMs(),
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await locked(state, async () => {
|
||||||
|
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||||
|
if (!state.store) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const result of outcomes) {
|
||||||
|
const job = state.store.jobs.find((entry) => entry.id === result.jobId);
|
||||||
|
if (!job) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const shouldDelete = applyJobResult(state, job, {
|
||||||
|
status: result.status,
|
||||||
|
error: result.error,
|
||||||
|
delivered: result.delivered,
|
||||||
|
startedAt: result.startedAt,
|
||||||
|
endedAt: result.endedAt,
|
||||||
|
});
|
||||||
|
|
||||||
|
emitJobFinished(state, job, result, result.startedAt);
|
||||||
|
|
||||||
|
if (shouldDelete) {
|
||||||
|
state.store.jobs = state.store.jobs.filter((entry) => entry.id !== job.id);
|
||||||
|
emit(state, { jobId: job.id, action: "removed" });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Preserve any new past-due nextRunAtMs values that became due while
|
||||||
|
// startup catch-up was running. They should execute on a future tick
|
||||||
|
// instead of being silently advanced.
|
||||||
|
recomputeNextRunsForMaintenance(state);
|
||||||
|
await persist(state);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function runDueJobs(state: CronServiceState) {
|
export async function runDueJobs(state: CronServiceState) {
|
||||||
|
|||||||
Reference in New Issue
Block a user