mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 09:11:26 +00:00
chore: Enable "curly" rule to avoid single-statement if confusion/errors.
This commit is contained in:
@@ -12,7 +12,9 @@ type DeliveryPayload = {
|
||||
|
||||
export function pickSummaryFromOutput(text: string | undefined) {
|
||||
const clean = (text ?? "").trim();
|
||||
if (!clean) return undefined;
|
||||
if (!clean) {
|
||||
return undefined;
|
||||
}
|
||||
const limit = 2000;
|
||||
return clean.length > limit ? `${truncateUtf16Safe(clean, limit)}…` : clean;
|
||||
}
|
||||
@@ -20,7 +22,9 @@ export function pickSummaryFromOutput(text: string | undefined) {
|
||||
export function pickSummaryFromPayloads(payloads: Array<{ text?: string | undefined }>) {
|
||||
for (let i = payloads.length - 1; i >= 0; i--) {
|
||||
const summary = pickSummaryFromOutput(payloads[i]?.text);
|
||||
if (summary) return summary;
|
||||
if (summary) {
|
||||
return summary;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
@@ -28,7 +32,9 @@ export function pickSummaryFromPayloads(payloads: Array<{ text?: string | undefi
|
||||
export function pickLastNonEmptyTextFromPayloads(payloads: Array<{ text?: string | undefined }>) {
|
||||
for (let i = payloads.length - 1; i >= 0; i--) {
|
||||
const clean = (payloads[i]?.text ?? "").trim();
|
||||
if (clean) return clean;
|
||||
if (clean) {
|
||||
return clean;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
@@ -38,11 +44,15 @@ export function pickLastNonEmptyTextFromPayloads(payloads: Array<{ text?: string
|
||||
* Returns true if delivery should be skipped because there's no real content.
|
||||
*/
|
||||
export function isHeartbeatOnlyResponse(payloads: DeliveryPayload[], ackMaxChars: number) {
|
||||
if (payloads.length === 0) return true;
|
||||
if (payloads.length === 0) {
|
||||
return true;
|
||||
}
|
||||
return payloads.every((payload) => {
|
||||
// If there's media, we should deliver regardless of text content.
|
||||
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0 || Boolean(payload.mediaUrl);
|
||||
if (hasMedia) return false;
|
||||
if (hasMedia) {
|
||||
return false;
|
||||
}
|
||||
// Use heartbeat mode to check if text is just HEARTBEAT_OK or short ack.
|
||||
const result = stripHeartbeatToken(payload.text, {
|
||||
mode: "heartbeat",
|
||||
|
||||
@@ -67,10 +67,14 @@ function matchesMessagingToolDeliveryTarget(
|
||||
target: MessagingToolSend,
|
||||
delivery: { channel: string; to?: string; accountId?: string },
|
||||
): boolean {
|
||||
if (!delivery.to || !target.to) return false;
|
||||
if (!delivery.to || !target.to) {
|
||||
return false;
|
||||
}
|
||||
const channel = delivery.channel.trim().toLowerCase();
|
||||
const provider = target.provider?.trim().toLowerCase();
|
||||
if (provider && provider !== "message" && provider !== channel) return false;
|
||||
if (provider && provider !== "message" && provider !== channel) {
|
||||
return false;
|
||||
}
|
||||
if (target.accountId && delivery.accountId && target.accountId !== delivery.accountId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -34,17 +34,22 @@ function coerceSchedule(schedule: UnknownRecord) {
|
||||
typeof schedule.atMs === "number" ||
|
||||
typeof schedule.at === "string" ||
|
||||
typeof schedule.atMs === "string"
|
||||
)
|
||||
) {
|
||||
next.kind = "at";
|
||||
else if (typeof schedule.everyMs === "number") next.kind = "every";
|
||||
else if (typeof schedule.expr === "string") next.kind = "cron";
|
||||
} else if (typeof schedule.everyMs === "number") {
|
||||
next.kind = "every";
|
||||
} else if (typeof schedule.expr === "string") {
|
||||
next.kind = "cron";
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof schedule.atMs !== "number" && parsedAtMs !== null) {
|
||||
next.atMs = parsedAtMs;
|
||||
}
|
||||
|
||||
if ("at" in next) delete next.at;
|
||||
if ("at" in next) {
|
||||
delete next.at;
|
||||
}
|
||||
|
||||
return next;
|
||||
}
|
||||
@@ -57,8 +62,12 @@ function coercePayload(payload: UnknownRecord) {
|
||||
}
|
||||
|
||||
function unwrapJob(raw: UnknownRecord) {
|
||||
if (isRecord(raw.data)) return raw.data;
|
||||
if (isRecord(raw.job)) return raw.job;
|
||||
if (isRecord(raw.data)) {
|
||||
return raw.data;
|
||||
}
|
||||
if (isRecord(raw.job)) {
|
||||
return raw.job;
|
||||
}
|
||||
return raw;
|
||||
}
|
||||
|
||||
@@ -66,7 +75,9 @@ export function normalizeCronJobInput(
|
||||
raw: unknown,
|
||||
options: NormalizeOptions = DEFAULT_OPTIONS,
|
||||
): UnknownRecord | null {
|
||||
if (!isRecord(raw)) return null;
|
||||
if (!isRecord(raw)) {
|
||||
return null;
|
||||
}
|
||||
const base = unwrapJob(raw);
|
||||
const next: UnknownRecord = { ...base };
|
||||
|
||||
@@ -76,8 +87,11 @@ export function normalizeCronJobInput(
|
||||
next.agentId = null;
|
||||
} else if (typeof agentId === "string") {
|
||||
const trimmed = agentId.trim();
|
||||
if (trimmed) next.agentId = sanitizeAgentId(trimmed);
|
||||
else delete next.agentId;
|
||||
if (trimmed) {
|
||||
next.agentId = sanitizeAgentId(trimmed);
|
||||
} else {
|
||||
delete next.agentId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,8 +101,12 @@ export function normalizeCronJobInput(
|
||||
next.enabled = enabled;
|
||||
} else if (typeof enabled === "string") {
|
||||
const trimmed = enabled.trim().toLowerCase();
|
||||
if (trimmed === "true") next.enabled = true;
|
||||
if (trimmed === "false") next.enabled = false;
|
||||
if (trimmed === "true") {
|
||||
next.enabled = true;
|
||||
}
|
||||
if (trimmed === "false") {
|
||||
next.enabled = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,11 +119,17 @@ export function normalizeCronJobInput(
|
||||
}
|
||||
|
||||
if (options.applyDefaults) {
|
||||
if (!next.wakeMode) next.wakeMode = "next-heartbeat";
|
||||
if (!next.wakeMode) {
|
||||
next.wakeMode = "next-heartbeat";
|
||||
}
|
||||
if (!next.sessionTarget && isRecord(next.payload)) {
|
||||
const kind = typeof next.payload.kind === "string" ? next.payload.kind : "";
|
||||
if (kind === "systemEvent") next.sessionTarget = "main";
|
||||
if (kind === "agentTurn") next.sessionTarget = "isolated";
|
||||
if (kind === "systemEvent") {
|
||||
next.sessionTarget = "main";
|
||||
}
|
||||
if (kind === "agentTurn") {
|
||||
next.sessionTarget = "isolated";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,18 +3,28 @@ const ISO_DATE_RE = /^\d{4}-\d{2}-\d{2}$/;
|
||||
const ISO_DATE_TIME_RE = /^\d{4}-\d{2}-\d{2}T/;
|
||||
|
||||
function normalizeUtcIso(raw: string) {
|
||||
if (ISO_TZ_RE.test(raw)) return raw;
|
||||
if (ISO_DATE_RE.test(raw)) return `${raw}T00:00:00Z`;
|
||||
if (ISO_DATE_TIME_RE.test(raw)) return `${raw}Z`;
|
||||
if (ISO_TZ_RE.test(raw)) {
|
||||
return raw;
|
||||
}
|
||||
if (ISO_DATE_RE.test(raw)) {
|
||||
return `${raw}T00:00:00Z`;
|
||||
}
|
||||
if (ISO_DATE_TIME_RE.test(raw)) {
|
||||
return `${raw}Z`;
|
||||
}
|
||||
return raw;
|
||||
}
|
||||
|
||||
export function parseAbsoluteTimeMs(input: string): number | null {
|
||||
const raw = input.trim();
|
||||
if (!raw) return null;
|
||||
if (!raw) {
|
||||
return null;
|
||||
}
|
||||
if (/^\d+$/.test(raw)) {
|
||||
const n = Number(raw);
|
||||
if (Number.isFinite(n) && n > 0) return Math.floor(n);
|
||||
if (Number.isFinite(n) && n > 0) {
|
||||
return Math.floor(n);
|
||||
}
|
||||
}
|
||||
const parsed = Date.parse(normalizeUtcIso(raw));
|
||||
return Number.isFinite(parsed) ? parsed : null;
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
type UnknownRecord = Record<string, unknown>;
|
||||
|
||||
function readString(value: unknown): string | undefined {
|
||||
if (typeof value !== "string") return undefined;
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,9 @@ const writesByPath = new Map<string, Promise<void>>();
|
||||
|
||||
async function pruneIfNeeded(filePath: string, opts: { maxBytes: number; keepLines: number }) {
|
||||
const stat = await fs.stat(filePath).catch(() => null);
|
||||
if (!stat || stat.size <= opts.maxBytes) return;
|
||||
if (!stat || stat.size <= opts.maxBytes) {
|
||||
return;
|
||||
}
|
||||
|
||||
const raw = await fs.readFile(filePath, "utf-8").catch(() => "");
|
||||
const lines = raw
|
||||
@@ -64,19 +66,33 @@ export async function readCronRunLogEntries(
|
||||
const limit = Math.max(1, Math.min(5000, Math.floor(opts?.limit ?? 200)));
|
||||
const jobId = opts?.jobId?.trim() || undefined;
|
||||
const raw = await fs.readFile(path.resolve(filePath), "utf-8").catch(() => "");
|
||||
if (!raw.trim()) return [];
|
||||
if (!raw.trim()) {
|
||||
return [];
|
||||
}
|
||||
const parsed: CronRunLogEntry[] = [];
|
||||
const lines = raw.split("\n");
|
||||
for (let i = lines.length - 1; i >= 0 && parsed.length < limit; i--) {
|
||||
const line = lines[i]?.trim();
|
||||
if (!line) continue;
|
||||
if (!line) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const obj = JSON.parse(line) as Partial<CronRunLogEntry> | null;
|
||||
if (!obj || typeof obj !== "object") continue;
|
||||
if (obj.action !== "finished") continue;
|
||||
if (typeof obj.jobId !== "string" || obj.jobId.trim().length === 0) continue;
|
||||
if (typeof obj.ts !== "number" || !Number.isFinite(obj.ts)) continue;
|
||||
if (jobId && obj.jobId !== jobId) continue;
|
||||
if (!obj || typeof obj !== "object") {
|
||||
continue;
|
||||
}
|
||||
if (obj.action !== "finished") {
|
||||
continue;
|
||||
}
|
||||
if (typeof obj.jobId !== "string" || obj.jobId.trim().length === 0) {
|
||||
continue;
|
||||
}
|
||||
if (typeof obj.ts !== "number" || !Number.isFinite(obj.ts)) {
|
||||
continue;
|
||||
}
|
||||
if (jobId && obj.jobId !== jobId) {
|
||||
continue;
|
||||
}
|
||||
parsed.push(obj as CronRunLogEntry);
|
||||
} catch {
|
||||
// ignore invalid lines
|
||||
|
||||
@@ -9,14 +9,18 @@ export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): numbe
|
||||
if (schedule.kind === "every") {
|
||||
const everyMs = Math.max(1, Math.floor(schedule.everyMs));
|
||||
const anchor = Math.max(0, Math.floor(schedule.anchorMs ?? nowMs));
|
||||
if (nowMs < anchor) return anchor;
|
||||
if (nowMs < anchor) {
|
||||
return anchor;
|
||||
}
|
||||
const elapsed = nowMs - anchor;
|
||||
const steps = Math.max(1, Math.floor((elapsed + everyMs - 1) / everyMs));
|
||||
return anchor + steps * everyMs;
|
||||
}
|
||||
|
||||
const expr = schedule.expr.trim();
|
||||
if (!expr) return undefined;
|
||||
if (!expr) {
|
||||
return undefined;
|
||||
}
|
||||
const cron = new Cron(expr, {
|
||||
timezone: schedule.tz?.trim() || undefined,
|
||||
catch: false,
|
||||
|
||||
@@ -163,7 +163,9 @@ describe("CronService", () => {
|
||||
|
||||
const runPromise = cron.run(job.id, "force");
|
||||
for (let i = 0; i < 10; i++) {
|
||||
if (runHeartbeatOnce.mock.calls.length > 0) break;
|
||||
if (runHeartbeatOnce.mock.calls.length > 0) {
|
||||
break;
|
||||
}
|
||||
// Let the locked() chain progress.
|
||||
await Promise.resolve();
|
||||
}
|
||||
|
||||
@@ -29,25 +29,35 @@ export function assertSupportedJobSpec(job: Pick<CronJob, "sessionTarget" | "pay
|
||||
|
||||
export function findJobOrThrow(state: CronServiceState, id: string) {
|
||||
const job = state.store?.jobs.find((j) => j.id === id);
|
||||
if (!job) throw new Error(`unknown cron job id: ${id}`);
|
||||
if (!job) {
|
||||
throw new Error(`unknown cron job id: ${id}`);
|
||||
}
|
||||
return job;
|
||||
}
|
||||
|
||||
export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | undefined {
|
||||
if (!job.enabled) return undefined;
|
||||
if (!job.enabled) {
|
||||
return undefined;
|
||||
}
|
||||
if (job.schedule.kind === "at") {
|
||||
// One-shot jobs stay due until they successfully finish.
|
||||
if (job.state.lastStatus === "ok" && job.state.lastRunAtMs) return undefined;
|
||||
if (job.state.lastStatus === "ok" && job.state.lastRunAtMs) {
|
||||
return undefined;
|
||||
}
|
||||
return job.schedule.atMs;
|
||||
}
|
||||
return computeNextRunAtMs(job.schedule, nowMs);
|
||||
}
|
||||
|
||||
export function recomputeNextRuns(state: CronServiceState) {
|
||||
if (!state.store) return;
|
||||
if (!state.store) {
|
||||
return;
|
||||
}
|
||||
const now = state.deps.nowMs();
|
||||
for (const job of state.store.jobs) {
|
||||
if (!job.state) job.state = {};
|
||||
if (!job.state) {
|
||||
job.state = {};
|
||||
}
|
||||
if (!job.enabled) {
|
||||
job.state.nextRunAtMs = undefined;
|
||||
job.state.runningAtMs = undefined;
|
||||
@@ -68,7 +78,9 @@ export function recomputeNextRuns(state: CronServiceState) {
|
||||
export function nextWakeAtMs(state: CronServiceState) {
|
||||
const jobs = state.store?.jobs ?? [];
|
||||
const enabled = jobs.filter((j) => j.enabled && typeof j.state.nextRunAtMs === "number");
|
||||
if (enabled.length === 0) return undefined;
|
||||
if (enabled.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
return enabled.reduce(
|
||||
(min, j) => Math.min(min, j.state.nextRunAtMs as number),
|
||||
enabled[0].state.nextRunAtMs as number,
|
||||
@@ -102,16 +114,36 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo
|
||||
}
|
||||
|
||||
export function applyJobPatch(job: CronJob, patch: CronJobPatch) {
|
||||
if ("name" in patch) job.name = normalizeRequiredName(patch.name);
|
||||
if ("description" in patch) job.description = normalizeOptionalText(patch.description);
|
||||
if (typeof patch.enabled === "boolean") job.enabled = patch.enabled;
|
||||
if (typeof patch.deleteAfterRun === "boolean") job.deleteAfterRun = patch.deleteAfterRun;
|
||||
if (patch.schedule) job.schedule = patch.schedule;
|
||||
if (patch.sessionTarget) job.sessionTarget = patch.sessionTarget;
|
||||
if (patch.wakeMode) job.wakeMode = patch.wakeMode;
|
||||
if (patch.payload) job.payload = mergeCronPayload(job.payload, patch.payload);
|
||||
if (patch.isolation) job.isolation = patch.isolation;
|
||||
if (patch.state) job.state = { ...job.state, ...patch.state };
|
||||
if ("name" in patch) {
|
||||
job.name = normalizeRequiredName(patch.name);
|
||||
}
|
||||
if ("description" in patch) {
|
||||
job.description = normalizeOptionalText(patch.description);
|
||||
}
|
||||
if (typeof patch.enabled === "boolean") {
|
||||
job.enabled = patch.enabled;
|
||||
}
|
||||
if (typeof patch.deleteAfterRun === "boolean") {
|
||||
job.deleteAfterRun = patch.deleteAfterRun;
|
||||
}
|
||||
if (patch.schedule) {
|
||||
job.schedule = patch.schedule;
|
||||
}
|
||||
if (patch.sessionTarget) {
|
||||
job.sessionTarget = patch.sessionTarget;
|
||||
}
|
||||
if (patch.wakeMode) {
|
||||
job.wakeMode = patch.wakeMode;
|
||||
}
|
||||
if (patch.payload) {
|
||||
job.payload = mergeCronPayload(job.payload, patch.payload);
|
||||
}
|
||||
if (patch.isolation) {
|
||||
job.isolation = patch.isolation;
|
||||
}
|
||||
if (patch.state) {
|
||||
job.state = { ...job.state, ...patch.state };
|
||||
}
|
||||
if ("agentId" in patch) {
|
||||
job.agentId = normalizeOptionalAgentId((patch as { agentId?: unknown }).agentId);
|
||||
}
|
||||
@@ -136,13 +168,27 @@ function mergeCronPayload(existing: CronPayload, patch: CronPayloadPatch): CronP
|
||||
}
|
||||
|
||||
const next: Extract<CronPayload, { kind: "agentTurn" }> = { ...existing };
|
||||
if (typeof patch.message === "string") next.message = patch.message;
|
||||
if (typeof patch.model === "string") next.model = patch.model;
|
||||
if (typeof patch.thinking === "string") next.thinking = patch.thinking;
|
||||
if (typeof patch.timeoutSeconds === "number") next.timeoutSeconds = patch.timeoutSeconds;
|
||||
if (typeof patch.deliver === "boolean") next.deliver = patch.deliver;
|
||||
if (typeof patch.channel === "string") next.channel = patch.channel;
|
||||
if (typeof patch.to === "string") next.to = patch.to;
|
||||
if (typeof patch.message === "string") {
|
||||
next.message = patch.message;
|
||||
}
|
||||
if (typeof patch.model === "string") {
|
||||
next.model = patch.model;
|
||||
}
|
||||
if (typeof patch.thinking === "string") {
|
||||
next.thinking = patch.thinking;
|
||||
}
|
||||
if (typeof patch.timeoutSeconds === "number") {
|
||||
next.timeoutSeconds = patch.timeoutSeconds;
|
||||
}
|
||||
if (typeof patch.deliver === "boolean") {
|
||||
next.deliver = patch.deliver;
|
||||
}
|
||||
if (typeof patch.channel === "string") {
|
||||
next.channel = patch.channel;
|
||||
}
|
||||
if (typeof patch.to === "string") {
|
||||
next.to = patch.to;
|
||||
}
|
||||
if (typeof patch.bestEffortDeliver === "boolean") {
|
||||
next.bestEffortDeliver = patch.bestEffortDeliver;
|
||||
}
|
||||
@@ -175,12 +221,16 @@ function buildPayloadFromPatch(patch: CronPayloadPatch): CronPayload {
|
||||
}
|
||||
|
||||
export function isJobDue(job: CronJob, nowMs: number, opts: { forced: boolean }) {
|
||||
if (opts.forced) return true;
|
||||
if (opts.forced) {
|
||||
return true;
|
||||
}
|
||||
return job.enabled && typeof job.state.nextRunAtMs === "number" && nowMs >= job.state.nextRunAtMs;
|
||||
}
|
||||
|
||||
export function resolveJobPayloadTextForMain(job: CronJob): string | undefined {
|
||||
if (job.payload.kind !== "systemEvent") return undefined;
|
||||
if (job.payload.kind !== "systemEvent") {
|
||||
return undefined;
|
||||
}
|
||||
const text = normalizePayloadToSystemText(job.payload);
|
||||
return text.trim() ? text : undefined;
|
||||
}
|
||||
|
||||
@@ -3,27 +3,39 @@ import { truncateUtf16Safe } from "../../utils.js";
|
||||
import type { CronPayload } from "../types.js";
|
||||
|
||||
export function normalizeRequiredName(raw: unknown) {
|
||||
if (typeof raw !== "string") throw new Error("cron job name is required");
|
||||
if (typeof raw !== "string") {
|
||||
throw new Error("cron job name is required");
|
||||
}
|
||||
const name = raw.trim();
|
||||
if (!name) throw new Error("cron job name is required");
|
||||
if (!name) {
|
||||
throw new Error("cron job name is required");
|
||||
}
|
||||
return name;
|
||||
}
|
||||
|
||||
export function normalizeOptionalText(raw: unknown) {
|
||||
if (typeof raw !== "string") return undefined;
|
||||
if (typeof raw !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const trimmed = raw.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
|
||||
function truncateText(input: string, maxLen: number) {
|
||||
if (input.length <= maxLen) return input;
|
||||
if (input.length <= maxLen) {
|
||||
return input;
|
||||
}
|
||||
return `${truncateUtf16Safe(input, Math.max(0, maxLen - 1)).trimEnd()}…`;
|
||||
}
|
||||
|
||||
export function normalizeOptionalAgentId(raw: unknown) {
|
||||
if (typeof raw !== "string") return undefined;
|
||||
if (typeof raw !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const trimmed = raw.trim();
|
||||
if (!trimmed) return undefined;
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
return normalizeAgentId(trimmed);
|
||||
}
|
||||
|
||||
@@ -42,18 +54,26 @@ export function inferLegacyName(job: {
|
||||
.split("\n")
|
||||
.map((l) => l.trim())
|
||||
.find(Boolean) ?? "";
|
||||
if (firstLine) return truncateText(firstLine, 60);
|
||||
if (firstLine) {
|
||||
return truncateText(firstLine, 60);
|
||||
}
|
||||
|
||||
const kind = typeof job?.schedule?.kind === "string" ? job.schedule.kind : "";
|
||||
if (kind === "cron" && typeof job?.schedule?.expr === "string")
|
||||
if (kind === "cron" && typeof job?.schedule?.expr === "string") {
|
||||
return `Cron: ${truncateText(job.schedule.expr, 52)}`;
|
||||
if (kind === "every" && typeof job?.schedule?.everyMs === "number")
|
||||
}
|
||||
if (kind === "every" && typeof job?.schedule?.everyMs === "number") {
|
||||
return `Every: ${job.schedule.everyMs}ms`;
|
||||
if (kind === "at") return "One-shot";
|
||||
}
|
||||
if (kind === "at") {
|
||||
return "One-shot";
|
||||
}
|
||||
return "Cron job";
|
||||
}
|
||||
|
||||
export function normalizePayloadToSystemText(payload: CronPayload) {
|
||||
if (payload.kind === "systemEvent") return payload.text.trim();
|
||||
if (payload.kind === "systemEvent") {
|
||||
return payload.text.trim();
|
||||
}
|
||||
return payload.message.trim();
|
||||
}
|
||||
|
||||
@@ -107,12 +107,16 @@ export async function remove(state: CronServiceState, id: string) {
|
||||
warnIfDisabled(state, "remove");
|
||||
await ensureLoaded(state);
|
||||
const before = state.store?.jobs.length ?? 0;
|
||||
if (!state.store) return { ok: false, removed: false } as const;
|
||||
if (!state.store) {
|
||||
return { ok: false, removed: false } as const;
|
||||
}
|
||||
state.store.jobs = state.store.jobs.filter((j) => j.id !== id);
|
||||
const removed = (state.store.jobs.length ?? 0) !== before;
|
||||
await persist(state);
|
||||
armTimer(state);
|
||||
if (removed) emit(state, { jobId: id, action: "removed" });
|
||||
if (removed) {
|
||||
emit(state, { jobId: id, action: "removed" });
|
||||
}
|
||||
return { ok: true, removed } as const;
|
||||
});
|
||||
}
|
||||
@@ -124,7 +128,9 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
|
||||
const job = findJobOrThrow(state, id);
|
||||
const now = state.deps.nowMs();
|
||||
const due = isJobDue(job, now, { forced: mode === "force" });
|
||||
if (!due) return { ok: true, ran: false, reason: "not-due" as const };
|
||||
if (!due) {
|
||||
return { ok: true, ran: false, reason: "not-due" as const };
|
||||
}
|
||||
await executeJob(state, job, now, { forced: mode === "force" });
|
||||
await persist(state);
|
||||
armTimer(state);
|
||||
|
||||
@@ -7,7 +7,9 @@ import type { CronServiceState } from "./state.js";
|
||||
const storeCache = new Map<string, { version: 1; jobs: CronJob[] }>();
|
||||
|
||||
export async function ensureLoaded(state: CronServiceState) {
|
||||
if (state.store) return;
|
||||
if (state.store) {
|
||||
return;
|
||||
}
|
||||
const cached = storeCache.get(state.deps.storePath);
|
||||
if (cached) {
|
||||
state.store = cached;
|
||||
@@ -43,12 +45,18 @@ export async function ensureLoaded(state: CronServiceState) {
|
||||
}
|
||||
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
|
||||
storeCache.set(state.deps.storePath, state.store);
|
||||
if (mutated) await persist(state);
|
||||
if (mutated) {
|
||||
await persist(state);
|
||||
}
|
||||
}
|
||||
|
||||
export function warnIfDisabled(state: CronServiceState, action: string) {
|
||||
if (state.deps.cronEnabled) return;
|
||||
if (state.warnedDisabled) return;
|
||||
if (state.deps.cronEnabled) {
|
||||
return;
|
||||
}
|
||||
if (state.warnedDisabled) {
|
||||
return;
|
||||
}
|
||||
state.warnedDisabled = true;
|
||||
state.deps.log.warn(
|
||||
{ enabled: false, action, storePath: state.deps.storePath },
|
||||
@@ -57,6 +65,8 @@ export function warnIfDisabled(state: CronServiceState, action: string) {
|
||||
}
|
||||
|
||||
export async function persist(state: CronServiceState) {
|
||||
if (!state.store) return;
|
||||
if (!state.store) {
|
||||
return;
|
||||
}
|
||||
await saveCronStore(state.deps.storePath, state.store);
|
||||
}
|
||||
|
||||
@@ -8,11 +8,17 @@ import { ensureLoaded, persist } from "./store.js";
|
||||
const MAX_TIMEOUT_MS = 2 ** 31 - 1;
|
||||
|
||||
export function armTimer(state: CronServiceState) {
|
||||
if (state.timer) clearTimeout(state.timer);
|
||||
if (state.timer) {
|
||||
clearTimeout(state.timer);
|
||||
}
|
||||
state.timer = null;
|
||||
if (!state.deps.cronEnabled) return;
|
||||
if (!state.deps.cronEnabled) {
|
||||
return;
|
||||
}
|
||||
const nextAt = nextWakeAtMs(state);
|
||||
if (!nextAt) return;
|
||||
if (!nextAt) {
|
||||
return;
|
||||
}
|
||||
const delay = Math.max(nextAt - state.deps.nowMs(), 0);
|
||||
// Avoid TimeoutOverflowWarning when a job is far in the future.
|
||||
const clampedDelay = Math.min(delay, MAX_TIMEOUT_MS);
|
||||
@@ -25,7 +31,9 @@ export function armTimer(state: CronServiceState) {
|
||||
}
|
||||
|
||||
export async function onTimer(state: CronServiceState) {
|
||||
if (state.running) return;
|
||||
if (state.running) {
|
||||
return;
|
||||
}
|
||||
state.running = true;
|
||||
try {
|
||||
await locked(state, async () => {
|
||||
@@ -40,11 +48,17 @@ export async function onTimer(state: CronServiceState) {
|
||||
}
|
||||
|
||||
export async function runDueJobs(state: CronServiceState) {
|
||||
if (!state.store) return;
|
||||
if (!state.store) {
|
||||
return;
|
||||
}
|
||||
const now = state.deps.nowMs();
|
||||
const due = state.store.jobs.filter((j) => {
|
||||
if (!j.enabled) return false;
|
||||
if (typeof j.state.runningAtMs === "number") return false;
|
||||
if (!j.enabled) {
|
||||
return false;
|
||||
}
|
||||
if (typeof j.state.runningAtMs === "number") {
|
||||
return false;
|
||||
}
|
||||
const next = j.state.nextRunAtMs;
|
||||
return typeof next === "number" && now >= next;
|
||||
});
|
||||
@@ -199,10 +213,13 @@ export async function executeJob(
|
||||
job,
|
||||
message: job.payload.message,
|
||||
});
|
||||
if (res.status === "ok") await finish("ok", undefined, res.summary, res.outputText);
|
||||
else if (res.status === "skipped")
|
||||
if (res.status === "ok") {
|
||||
await finish("ok", undefined, res.summary, res.outputText);
|
||||
} else if (res.status === "skipped") {
|
||||
await finish("skipped", undefined, res.summary, res.outputText);
|
||||
else await finish("error", res.error ?? "cron job failed", res.summary, res.outputText);
|
||||
} else {
|
||||
await finish("error", res.error ?? "cron job failed", res.summary, res.outputText);
|
||||
}
|
||||
} catch (err) {
|
||||
await finish("error", String(err));
|
||||
} finally {
|
||||
@@ -219,7 +236,9 @@ export function wake(
|
||||
opts: { mode: "now" | "next-heartbeat"; text: string },
|
||||
) {
|
||||
const text = opts.text.trim();
|
||||
if (!text) return { ok: false } as const;
|
||||
if (!text) {
|
||||
return { ok: false } as const;
|
||||
}
|
||||
state.deps.enqueueSystemEvent(text);
|
||||
if (opts.mode === "now") {
|
||||
state.deps.requestHeartbeatNow({ reason: "wake" });
|
||||
@@ -228,7 +247,9 @@ export function wake(
|
||||
}
|
||||
|
||||
export function stopTimer(state: CronServiceState) {
|
||||
if (state.timer) clearTimeout(state.timer);
|
||||
if (state.timer) {
|
||||
clearTimeout(state.timer);
|
||||
}
|
||||
state.timer = null;
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,9 @@ export const DEFAULT_CRON_STORE_PATH = path.join(DEFAULT_CRON_DIR, "jobs.json");
|
||||
export function resolveCronStorePath(storePath?: string) {
|
||||
if (storePath?.trim()) {
|
||||
const raw = storePath.trim();
|
||||
if (raw.startsWith("~")) return path.resolve(raw.replace("~", os.homedir()));
|
||||
if (raw.startsWith("~")) {
|
||||
return path.resolve(raw.replace("~", os.homedir()));
|
||||
}
|
||||
return path.resolve(raw);
|
||||
}
|
||||
return DEFAULT_CRON_STORE_PATH;
|
||||
|
||||
Reference in New Issue
Block a user