mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 19:48:27 +00:00
fix: unify session maintenance and cron run pruning (#13083)
* fix: prune stale session entries, cap entry count, and rotate sessions.json
The sessions.json file grows unbounded over time. Every heartbeat tick (default: 30m)
triggers multiple full rewrites, and session keys from groups, threads, and DMs
accumulate indefinitely with large embedded objects (skillsSnapshot,
systemPromptReport). At >50MB the synchronous JSON parse blocks the event loop,
causing Telegram webhook timeouts and effectively taking the bot down.
Three mitigations, all running inside saveSessionStoreUnlocked() on every write:
1. Prune stale entries: remove entries with updatedAt older than 30 days
(configurable via session.maintenance.pruneDays in openclaw.json)
2. Cap entry count: keep only the 500 most recently updated entries
(configurable via session.maintenance.maxEntries). Entries without updatedAt
are evicted first.
3. File rotation: if the existing sessions.json exceeds 10MB before a write
(threshold configurable via session.maintenance.rotateBytes), rename it to
sessions.json.bak.{timestamp} and keep only the 3 most recent backups.
All three thresholds are configurable under session.maintenance in openclaw.json
with Zod validation. No env vars.
Existing tests updated to use Date.now() instead of epoch-relative timestamps
(1, 2, 3) that would be incorrectly pruned as stale.
27 new tests covering pruning, capping, rotation, and integration scenarios.
* feat: auto-prune expired cron run sessions (#12289)
Add TTL-based reaper for isolated cron run sessions that accumulate
indefinitely in sessions.json.
New config option:
cron.sessionRetention: string | false (default: '24h')
The reaper runs piggy-backed on the cron timer tick, self-throttled
to sweep at most every 5 minutes. It removes session entries matching
the pattern cron:<jobId>:run:<uuid> whose updatedAt + retention < now.
Design follows the Kubernetes ttlSecondsAfterFinished pattern:
- Sessions are persisted normally (observability/debugging)
- A periodic reaper prunes expired entries
- Configurable retention with sensible default
- Set to false to disable pruning entirely
Files changed:
- src/config/types.cron.ts: Add sessionRetention to CronConfig
- src/config/zod-schema.ts: Add Zod validation for sessionRetention
- src/cron/session-reaper.ts: New reaper module (sweepCronRunSessions)
- src/cron/session-reaper.test.ts: 12 tests covering all paths
- src/cron/service/state.ts: Add cronConfig/sessionStorePath to deps
- src/cron/service/timer.ts: Wire reaper into onTimer tick
- src/gateway/server-cron.ts: Pass config and session store path to deps
Closes #12289
* fix: sweep cron session stores per agent
* docs: add changelog for session maintenance (#13083) (thanks @skyfallsin, @Glucksberg)
* fix: add warn-only session maintenance mode
* fix: warn-only maintenance defaults to active session
* fix: deliver maintenance warnings to active session
* docs: add session maintenance examples
* fix: accept duration and size maintenance thresholds
* refactor: share cron run session key check
* fix: format issues and replace defaultRuntime.warn with console.warn
---------
Co-authored-by: Pradeep Elankumaran <pradeepe@gmail.com>
Co-authored-by: Glucksberg <markuscontasul@gmail.com>
Co-authored-by: max <40643627+quotentiroler@users.noreply.github.com>
Co-authored-by: quotentiroler <max.nussbaumer@maxhealth.tech>
This commit is contained in:
committed by
GitHub
parent
0657d7c772
commit
e19a23520c
@@ -1,3 +1,4 @@
|
||||
import type { CronConfig } from "../../config/types.cron.js";
|
||||
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
||||
import type { CronJob, CronJobCreate, CronJobPatch, CronStoreFile } from "../types.js";
|
||||
|
||||
@@ -26,6 +27,14 @@ export type CronServiceDeps = {
|
||||
log: Logger;
|
||||
storePath: string;
|
||||
cronEnabled: boolean;
|
||||
/** CronConfig for session retention settings. */
|
||||
cronConfig?: CronConfig;
|
||||
/** Default agent id for jobs without an agent id. */
|
||||
defaultAgentId?: string;
|
||||
/** Resolve session store path for a given agent id. */
|
||||
resolveSessionStorePath?: (agentId?: string) => string;
|
||||
/** Path to the session store (sessions.json) for reaper use. */
|
||||
sessionStorePath?: string;
|
||||
enqueueSystemEvent: (text: string, opts?: { agentId?: string }) => void;
|
||||
requestHeartbeatNow: (opts?: { reason?: string }) => void;
|
||||
runHeartbeatOnce?: (opts?: { reason?: string }) => Promise<HeartbeatRunResult>;
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import type { CronEvent, CronServiceState } from "./state.js";
|
||||
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
|
||||
import { resolveCronDeliveryPlan } from "../delivery.js";
|
||||
import { sweepCronRunSessions } from "../session-reaper.js";
|
||||
import {
|
||||
computeJobNextRunAtMs,
|
||||
nextWakeAtMs,
|
||||
@@ -273,6 +275,38 @@ export async function onTimer(state: CronServiceState) {
|
||||
await persist(state);
|
||||
});
|
||||
}
|
||||
// Piggyback session reaper on timer tick (self-throttled to every 5 min).
|
||||
const storePaths = new Set<string>();
|
||||
if (state.deps.resolveSessionStorePath) {
|
||||
const defaultAgentId = state.deps.defaultAgentId ?? DEFAULT_AGENT_ID;
|
||||
if (state.store?.jobs?.length) {
|
||||
for (const job of state.store.jobs) {
|
||||
const agentId =
|
||||
typeof job.agentId === "string" && job.agentId.trim() ? job.agentId : defaultAgentId;
|
||||
storePaths.add(state.deps.resolveSessionStorePath(agentId));
|
||||
}
|
||||
} else {
|
||||
storePaths.add(state.deps.resolveSessionStorePath(defaultAgentId));
|
||||
}
|
||||
} else if (state.deps.sessionStorePath) {
|
||||
storePaths.add(state.deps.sessionStorePath);
|
||||
}
|
||||
|
||||
if (storePaths.size > 0) {
|
||||
const nowMs = state.deps.nowMs();
|
||||
for (const storePath of storePaths) {
|
||||
try {
|
||||
await sweepCronRunSessions({
|
||||
cronConfig: state.deps.cronConfig,
|
||||
sessionStorePath: storePath,
|
||||
nowMs,
|
||||
log: state.deps.log,
|
||||
});
|
||||
} catch (err) {
|
||||
state.deps.log.warn({ err: String(err), storePath }, "cron: session reaper sweep failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
state.running = false;
|
||||
armTimer(state);
|
||||
|
||||
203
src/cron/session-reaper.test.ts
Normal file
203
src/cron/session-reaper.test.ts
Normal file
@@ -0,0 +1,203 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { describe, it, expect, beforeEach } from "vitest";
|
||||
import type { Logger } from "./service/state.js";
|
||||
import { isCronRunSessionKey } from "../sessions/session-key-utils.js";
|
||||
import { sweepCronRunSessions, resolveRetentionMs, resetReaperThrottle } from "./session-reaper.js";
|
||||
|
||||
/** Builds a Logger whose debug/info/warn/error methods all do nothing, keeping test output quiet. */
function createTestLogger(): Logger {
  const noop = (): void => {};
  return { debug: noop, info: noop, warn: noop, error: noop };
}
|
||||
|
||||
// resolveRetentionMs: turns the cron `sessionRetention` setting into milliseconds (or null).
describe("resolveRetentionMs", () => {
  const MINUTE_MS = 60_000;
  const HOUR_MS = 3_600_000;
  const DAY_MS = 86_400_000;
  const DEFAULT_MS = 24 * HOUR_MS;

  it("returns 24h default when no config", () => {
    const resolved = resolveRetentionMs();
    expect(resolved).toBe(DEFAULT_MS);
  });

  it("returns 24h default when config is empty", () => {
    const resolved = resolveRetentionMs({});
    expect(resolved).toBe(DEFAULT_MS);
  });

  it("parses duration string", () => {
    const cases: Array<[string, number]> = [
      ["1h", HOUR_MS],
      ["7d", 7 * DAY_MS],
      ["30m", 30 * MINUTE_MS],
    ];
    for (const [sessionRetention, expectedMs] of cases) {
      expect(resolveRetentionMs({ sessionRetention })).toBe(expectedMs);
    }
  });

  it("returns null when disabled", () => {
    const resolved = resolveRetentionMs({ sessionRetention: false });
    expect(resolved).toBeNull();
  });

  it("falls back to default on invalid string", () => {
    const resolved = resolveRetentionMs({ sessionRetention: "abc" });
    expect(resolved).toBe(DEFAULT_MS);
  });
});
|
||||
|
||||
// isCronRunSessionKey: only keys shaped like `agent:<agent>:cron:<job>:run:<id>` should match.
describe("isCronRunSessionKey", () => {
  /** Asserts that every key in `keys` yields `expected` from the predicate. */
  const expectAll = (keys: string[], expected: boolean): void => {
    for (const key of keys) {
      expect(isCronRunSessionKey(key)).toBe(expected);
    }
  };

  it("matches cron run session keys", () => {
    expectAll(
      ["agent:main:cron:abc-123:run:def-456", "agent:debugger:cron:249ecf82:run:1102aabb"],
      true,
    );
  });

  it("does not match base cron session keys", () => {
    expectAll(["agent:main:cron:abc-123"], false);
  });

  it("does not match regular session keys", () => {
    expectAll(["agent:main:telegram:dm:123"], false);
  });

  it("does not match non-canonical cron-like keys", () => {
    expectAll(["agent:main:slack:cron:job:run:uuid", "cron:job:run:uuid"], false);
  });
});
|
||||
|
||||
// sweepCronRunSessions: end-to-end checks against a real sessions.json in a temp directory.
describe("sweepCronRunSessions", () => {
  type StoredEntry = { sessionId: string; updatedAt: number };

  const HOUR_MS = 3_600_000;
  const log = createTestLogger();
  let tmpDir: string;
  let storePath: string;

  /** Serializes `store` as JSON into `file`. */
  const writeStore = (file: string, store: Record<string, StoredEntry>): void => {
    fs.writeFileSync(file, JSON.stringify(store));
  };

  beforeEach(() => {
    // The reaper keeps its throttle timestamps in module-level state; clear them per test.
    resetReaperThrottle();
    tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "cron-reaper-"));
    storePath = path.join(tmpDir, "sessions.json");

    // Vitest runs a function returned from beforeEach as teardown for that test.
    // Without it, every test run would leave a cron-reaper-* directory behind.
    const dirToRemove = tmpDir;
    return () => {
      fs.rmSync(dirToRemove, { recursive: true, force: true });
    };
  });

  it("prunes expired cron run sessions", async () => {
    const now = Date.now();
    writeStore(storePath, {
      "agent:main:cron:job1": {
        sessionId: "base-session",
        updatedAt: now,
      },
      // 25h old: past the default 24h retention, so it is expired.
      "agent:main:cron:job1:run:old-run": {
        sessionId: "old-run",
        updatedAt: now - 25 * HOUR_MS,
      },
      // 1h old: still inside the retention window.
      "agent:main:cron:job1:run:recent-run": {
        sessionId: "recent-run",
        updatedAt: now - 1 * HOUR_MS,
      },
      // Old, but not a cron run key, so the reaper must leave it alone.
      "agent:main:telegram:dm:123": {
        sessionId: "regular-session",
        updatedAt: now - 100 * HOUR_MS,
      },
    });

    const result = await sweepCronRunSessions({
      sessionStorePath: storePath,
      nowMs: now,
      log,
      force: true,
    });

    expect(result.swept).toBe(true);
    expect(result.pruned).toBe(1);

    const updated = JSON.parse(fs.readFileSync(storePath, "utf-8"));
    expect(updated["agent:main:cron:job1"]).toBeDefined();
    expect(updated["agent:main:cron:job1:run:old-run"]).toBeUndefined();
    expect(updated["agent:main:cron:job1:run:recent-run"]).toBeDefined();
    expect(updated["agent:main:telegram:dm:123"]).toBeDefined();
  });

  it("respects custom retention", async () => {
    const now = Date.now();
    // 2h old: expired under the 1h retention configured below.
    writeStore(storePath, {
      "agent:main:cron:job1:run:run1": {
        sessionId: "run1",
        updatedAt: now - 2 * HOUR_MS,
      },
    });

    const result = await sweepCronRunSessions({
      cronConfig: { sessionRetention: "1h" },
      sessionStorePath: storePath,
      nowMs: now,
      log,
      force: true,
    });

    expect(result.pruned).toBe(1);
  });

  it("does nothing when pruning is disabled", async () => {
    const now = Date.now();
    writeStore(storePath, {
      "agent:main:cron:job1:run:run1": {
        sessionId: "run1",
        updatedAt: now - 100 * HOUR_MS,
      },
    });

    const result = await sweepCronRunSessions({
      cronConfig: { sessionRetention: false },
      sessionStorePath: storePath,
      nowMs: now,
      log,
      force: true,
    });

    expect(result.swept).toBe(false);
    expect(result.pruned).toBe(0);
  });

  it("throttles sweeps without force", async () => {
    const now = Date.now();
    writeStore(storePath, {});

    // First sweep runs
    const r1 = await sweepCronRunSessions({
      sessionStorePath: storePath,
      nowMs: now,
      log,
    });
    expect(r1.swept).toBe(true);

    // Second sweep (1 second later) is throttled
    const r2 = await sweepCronRunSessions({
      sessionStorePath: storePath,
      nowMs: now + 1000,
      log,
    });
    expect(r2.swept).toBe(false);
  });

  it("throttles per store path", async () => {
    const now = Date.now();
    const otherPath = path.join(tmpDir, "sessions-other.json");
    writeStore(storePath, {});
    writeStore(otherPath, {});

    const r1 = await sweepCronRunSessions({
      sessionStorePath: storePath,
      nowMs: now,
      log,
    });
    expect(r1.swept).toBe(true);

    // A different store has its own throttle slot, so it still sweeps.
    const r2 = await sweepCronRunSessions({
      sessionStorePath: otherPath,
      nowMs: now + 1000,
      log,
    });
    expect(r2.swept).toBe(true);

    // The first store was swept 1 second ago and is throttled.
    const r3 = await sweepCronRunSessions({
      sessionStorePath: storePath,
      nowMs: now + 1000,
      log,
    });
    expect(r3.swept).toBe(false);
  });
});
|
||||
115
src/cron/session-reaper.ts
Normal file
115
src/cron/session-reaper.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
/**
|
||||
* Cron session reaper — prunes completed isolated cron run sessions
|
||||
* from the session store after a configurable retention period.
|
||||
*
|
||||
* Pattern: sessions keyed as `...:cron:<jobId>:run:<uuid>` are ephemeral
|
||||
* run records. The base session (`...:cron:<jobId>`) is kept as-is.
|
||||
*/
|
||||
|
||||
import type { CronConfig } from "../config/types.cron.js";
|
||||
import type { Logger } from "./service/state.js";
|
||||
import { parseDurationMs } from "../cli/parse-duration.js";
|
||||
import { updateSessionStore } from "../config/sessions.js";
|
||||
import { isCronRunSessionKey } from "../sessions/session-key-utils.js";
|
||||
|
||||
/** Retention used when `sessionRetention` is unset, blank, or fails to parse: 24 hours. */
const DEFAULT_RETENTION_MS = 24 * 60 * 60_000;

/** Smallest allowed gap between two sweeps of the same store (5 minutes), so not every timer tick sweeps. */
const MIN_SWEEP_INTERVAL_MS = 300_000;

/** Timestamp (ms) recorded for the latest sweep of each store, keyed by session store path. */
const lastSweepAtMsByStore: Map<string, number> = new Map();
|
||||
|
||||
/**
 * Resolves how long cron run sessions are retained, in milliseconds.
 *
 * @param cronConfig - Cron configuration; only `sessionRetention` is read.
 * @returns `null` when `sessionRetention` is `false` (pruning disabled). For a
 *   non-blank string, the result of `parseDurationMs` on the trimmed value with
 *   `defaultUnit: "h"`. In every other case, including when parsing throws,
 *   `DEFAULT_RETENTION_MS`.
 */
export function resolveRetentionMs(cronConfig?: CronConfig): number | null {
  const configured = cronConfig?.sessionRetention;
  if (configured === false) {
    // Explicit opt-out: callers treat null as "do not prune".
    return null;
  }
  if (typeof configured !== "string") {
    return DEFAULT_RETENTION_MS;
  }
  const trimmed = configured.trim();
  if (!trimmed) {
    return DEFAULT_RETENTION_MS;
  }
  try {
    return parseDurationMs(trimmed, { defaultUnit: "h" });
  } catch {
    // An unparsable value falls back to the default rather than failing the sweep.
    return DEFAULT_RETENTION_MS;
  }
}
|
||||
|
||||
/** Outcome of a single `sweepCronRunSessions` call. */
export type ReaperResult = {
  /** True only when the store update ran to completion; false when throttled, disabled, or failed. */
  swept: boolean;
  /** Number of cron run session entries deleted by this call. */
  pruned: number;
};
|
||||
|
||||
/**
 * Sweep the session store and prune expired cron run sessions.
 * Designed to be called from the cron timer tick — self-throttles via
 * MIN_SWEEP_INTERVAL_MS (tracked per store path) to avoid excessive I/O.
 *
 * Lock ordering: this function acquires the session-store file lock via
 * `updateSessionStore`. It must be called OUTSIDE of the cron service's
 * own `locked()` section to avoid lock-order inversions. The cron timer
 * calls this after all `locked()` sections have been released.
 *
 * Behavior by case:
 * - Throttled (and `force` not set): returns `{ swept: false, pruned: 0 }`.
 * - Retention disabled: records the sweep time and returns `{ swept: false, pruned: 0 }`.
 * - Store update throws: logs a warning and returns `{ swept: false, pruned: 0 }`
 *   without recording the sweep time, so the next call is not throttled.
 * - Otherwise: deletes every cron run session entry whose `updatedAt` is older
 *   than `now - retention` and returns `{ swept: true, pruned }`. Entries
 *   lacking `updatedAt` are treated as having timestamp 0.
 *
 * @param params.cronConfig - Source of the `sessionRetention` setting.
 * @param params.sessionStorePath - Resolved path to sessions.json (required).
 * @param params.nowMs - Current time in ms; defaults to `Date.now()`.
 * @param params.log - Logger for the failure warning and the prune summary.
 * @param params.force - Testing override that skips the min-interval throttle.
 * @returns Whether a sweep ran and how many entries it removed. Never rejects
 *   because of a store update failure.
 */
export async function sweepCronRunSessions(params: {
  cronConfig?: CronConfig;
  /** Resolved path to sessions.json — required. */
  sessionStorePath: string;
  nowMs?: number;
  log: Logger;
  /** Override for testing — skips the min-interval throttle. */
  force?: boolean;
}): Promise<ReaperResult> {
  const now = params.nowMs ?? Date.now();
  const storePath = params.sessionStorePath;
  const lastSweepAtMs = lastSweepAtMsByStore.get(storePath) ?? 0;

  // Throttle: don't sweep the same store more often than every 5 minutes.
  if (!params.force && now - lastSweepAtMs < MIN_SWEEP_INTERVAL_MS) {
    return { swept: false, pruned: 0 };
  }

  const retentionMs = resolveRetentionMs(params.cronConfig);
  if (retentionMs === null) {
    // Pruning disabled: still record the time so the throttle applies to later ticks.
    lastSweepAtMsByStore.set(storePath, now);
    return { swept: false, pruned: 0 };
  }

  const cutoff = now - retentionMs;
  let pruned = 0;
  try {
    await updateSessionStore(storePath, (store) => {
      // Object.keys returns a snapshot, so deleting while iterating is safe.
      for (const key of Object.keys(store)) {
        if (!isCronRunSessionKey(key)) {
          continue;
        }
        const entry = store[key];
        if (!entry) {
          continue;
        }
        const updatedAt = entry.updatedAt ?? 0;
        if (updatedAt < cutoff) {
          delete store[key];
          pruned++;
        }
      }
    });
  } catch (err) {
    // Include the store path: the caller sweeps one store per agent, so the
    // message alone does not say which store failed.
    params.log.warn(
      { err: String(err), storePath },
      "cron-reaper: failed to sweep session store",
    );
    return { swept: false, pruned: 0 };
  }

  lastSweepAtMsByStore.set(storePath, now);

  if (pruned > 0) {
    params.log.info(
      { pruned, retentionMs, storePath },
      `cron-reaper: pruned ${pruned} expired cron run session(s)`,
    );
  }

  return { swept: true, pruned };
}
|
||||
|
||||
/**
 * Forgets every recorded sweep timestamp, so the next sweep of any store is
 * not throttled. Intended for tests.
 */
export function resetReaperThrottle(): void {
  lastSweepAtMsByStore.clear();
}
|
||||
Reference in New Issue
Block a user