Session/Cron maintenance hardening and cleanup UX (#24753)

Merged via /review-pr -> /prepare-pr -> /merge-pr.

Prepared head SHA: 7533b85156
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Co-authored-by: shakkernerd <165377636+shakkernerd@users.noreply.github.com>
Reviewed-by: @shakkernerd
This commit is contained in:
Gustavo Madeira Santana
2026-02-23 17:39:48 -05:00
committed by GitHub
parent 29b19455e3
commit eff3c5c707
49 changed files with 3180 additions and 235 deletions

View File

@@ -4,12 +4,40 @@ import path from "node:path";
import { describe, expect, it } from "vitest";
import {
appendCronRunLog,
DEFAULT_CRON_RUN_LOG_KEEP_LINES,
DEFAULT_CRON_RUN_LOG_MAX_BYTES,
getPendingCronRunLogWriteCountForTests,
readCronRunLogEntries,
resolveCronRunLogPruneOptions,
resolveCronRunLogPath,
} from "./run-log.js";
describe("cron run log", () => {
it("resolves prune options from config with defaults", () => {
expect(resolveCronRunLogPruneOptions()).toEqual({
maxBytes: DEFAULT_CRON_RUN_LOG_MAX_BYTES,
keepLines: DEFAULT_CRON_RUN_LOG_KEEP_LINES,
});
expect(
resolveCronRunLogPruneOptions({
maxBytes: "5mb",
keepLines: 123,
}),
).toEqual({
maxBytes: 5 * 1024 * 1024,
keepLines: 123,
});
expect(
resolveCronRunLogPruneOptions({
maxBytes: "invalid",
keepLines: -1,
}),
).toEqual({
maxBytes: DEFAULT_CRON_RUN_LOG_MAX_BYTES,
keepLines: DEFAULT_CRON_RUN_LOG_KEEP_LINES,
});
});
async function withRunLogDir(prefix: string, run: (dir: string) => Promise<void>) {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), prefix));
try {

View File

@@ -1,5 +1,7 @@
import fs from "node:fs/promises";
import path from "node:path";
import { parseByteSize } from "../cli/parse-bytes.js";
import type { CronConfig } from "../config/types.cron.js";
import type { CronDeliveryStatus, CronRunStatus, CronRunTelemetry } from "./types.js";
export type CronRunLogEntry = {
@@ -73,6 +75,30 @@ export function resolveCronRunLogPath(params: { storePath: string; jobId: string
const writesByPath = new Map<string, Promise<void>>();
export const DEFAULT_CRON_RUN_LOG_MAX_BYTES = 2_000_000;
export const DEFAULT_CRON_RUN_LOG_KEEP_LINES = 2_000;
/**
 * Resolve the prune thresholds for a cron run log from optional config,
 * falling back to the module defaults for anything absent or invalid.
 *
 * @param cfg Optional `runLog` section of the cron config; `maxBytes` may be
 *   a human-readable size string (e.g. "5mb") or number-like value.
 * @returns Always-valid `{ maxBytes, keepLines }` — both finite and positive.
 */
export function resolveCronRunLogPruneOptions(cfg?: CronConfig["runLog"]): {
  maxBytes: number;
  keepLines: number;
} {
  let maxBytes = DEFAULT_CRON_RUN_LOG_MAX_BYTES;
  if (cfg?.maxBytes !== undefined) {
    try {
      const parsed = parseByteSize(String(cfg.maxBytes).trim(), { defaultUnit: "b" });
      // Mirror the keepLines guard below: only accept finite, positive sizes
      // so a config value that parses to 0 or a negative number cannot force
      // the log to be truncated to nothing on every append.
      if (Number.isFinite(parsed) && parsed > 0) {
        maxBytes = parsed;
      }
    } catch {
      // Unparseable sizes (e.g. "invalid") keep the default.
      maxBytes = DEFAULT_CRON_RUN_LOG_MAX_BYTES;
    }
  }
  let keepLines = DEFAULT_CRON_RUN_LOG_KEEP_LINES;
  if (typeof cfg?.keepLines === "number" && Number.isFinite(cfg.keepLines) && cfg.keepLines > 0) {
    keepLines = Math.floor(cfg.keepLines);
  }
  return { maxBytes, keepLines };
}
/** Test-only hook: number of run-log paths with an in-flight write chain. */
export function getPendingCronRunLogWriteCountForTests() {
  const { size } = writesByPath;
  return size;
}
@@ -108,8 +134,8 @@ export async function appendCronRunLog(
await fs.mkdir(path.dirname(resolved), { recursive: true });
await fs.appendFile(resolved, `${JSON.stringify(entry)}\n`, "utf-8");
await pruneIfNeeded(resolved, {
maxBytes: opts?.maxBytes ?? 2_000_000,
keepLines: opts?.keepLines ?? 2_000,
maxBytes: opts?.maxBytes ?? DEFAULT_CRON_RUN_LOG_MAX_BYTES,
keepLines: opts?.keepLines ?? DEFAULT_CRON_RUN_LOG_KEEP_LINES,
});
});
writesByPath.set(resolved, next);

View File

@@ -109,6 +109,61 @@ describe("sweepCronRunSessions", () => {
expect(updated["agent:main:telegram:dm:123"]).toBeDefined();
});
it("archives transcript files for pruned run sessions that are no longer referenced", async () => {
const now = Date.now();
const runSessionId = "old-run";
const runTranscript = path.join(tmpDir, `${runSessionId}.jsonl`);
fs.writeFileSync(runTranscript, '{"type":"session"}\n');
const store: Record<string, { sessionId: string; updatedAt: number }> = {
"agent:main:cron:job1:run:old-run": {
sessionId: runSessionId,
updatedAt: now - 25 * 3_600_000,
},
};
fs.writeFileSync(storePath, JSON.stringify(store));
const result = await sweepCronRunSessions({
sessionStorePath: storePath,
nowMs: now,
log,
force: true,
});
expect(result.pruned).toBe(1);
expect(fs.existsSync(runTranscript)).toBe(false);
const files = fs.readdirSync(tmpDir);
expect(files.some((name) => name.startsWith(`${runSessionId}.jsonl.deleted.`))).toBe(true);
});
it("does not archive external transcript paths for pruned runs", async () => {
const now = Date.now();
const externalDir = fs.mkdtempSync(path.join(os.tmpdir(), "cron-reaper-external-"));
const externalTranscript = path.join(externalDir, "outside.jsonl");
fs.writeFileSync(externalTranscript, '{"type":"session"}\n');
const store: Record<string, { sessionId: string; sessionFile?: string; updatedAt: number }> = {
"agent:main:cron:job1:run:old-run": {
sessionId: "old-run",
sessionFile: externalTranscript,
updatedAt: now - 25 * 3_600_000,
},
};
fs.writeFileSync(storePath, JSON.stringify(store));
try {
const result = await sweepCronRunSessions({
sessionStorePath: storePath,
nowMs: now,
log,
force: true,
});
expect(result.pruned).toBe(1);
expect(fs.existsSync(externalTranscript)).toBe(true);
} finally {
fs.rmSync(externalDir, { recursive: true, force: true });
}
});
it("respects custom retention", async () => {
const now = Date.now();
const store: Record<string, { sessionId: string; updatedAt: number }> = {

View File

@@ -6,9 +6,14 @@
* run records. The base session (`...:cron:<jobId>`) is kept as-is.
*/
import path from "node:path";
import { parseDurationMs } from "../cli/parse-duration.js";
import { updateSessionStore } from "../config/sessions.js";
import { loadSessionStore, updateSessionStore } from "../config/sessions.js";
import type { CronConfig } from "../config/types.cron.js";
import {
archiveSessionTranscripts,
cleanupArchivedSessionTranscripts,
} from "../gateway/session-utils.fs.js";
import { isCronRunSessionKey } from "../sessions/session-key-utils.js";
import type { Logger } from "./service/state.js";
@@ -74,6 +79,7 @@ export async function sweepCronRunSessions(params: {
}
let pruned = 0;
const prunedSessions = new Map<string, string | undefined>();
try {
await updateSessionStore(storePath, (store) => {
const cutoff = now - retentionMs;
@@ -87,6 +93,9 @@ export async function sweepCronRunSessions(params: {
}
const updatedAt = entry.updatedAt ?? 0;
if (updatedAt < cutoff) {
if (!prunedSessions.has(entry.sessionId) || entry.sessionFile) {
prunedSessions.set(entry.sessionId, entry.sessionFile);
}
delete store[key];
pruned++;
}
@@ -99,6 +108,43 @@ export async function sweepCronRunSessions(params: {
lastSweepAtMsByStore.set(storePath, now);
if (prunedSessions.size > 0) {
try {
const store = loadSessionStore(storePath, { skipCache: true });
const referencedSessionIds = new Set(
Object.values(store)
.map((entry) => entry?.sessionId)
.filter((id): id is string => Boolean(id)),
);
const archivedDirs = new Set<string>();
for (const [sessionId, sessionFile] of prunedSessions) {
if (referencedSessionIds.has(sessionId)) {
continue;
}
const archived = archiveSessionTranscripts({
sessionId,
storePath,
sessionFile,
reason: "deleted",
restrictToStoreDir: true,
});
for (const archivedPath of archived) {
archivedDirs.add(path.dirname(archivedPath));
}
}
if (archivedDirs.size > 0) {
await cleanupArchivedSessionTranscripts({
directories: [...archivedDirs],
olderThanMs: retentionMs,
reason: "deleted",
nowMs: now,
});
}
} catch (err) {
params.log.warn({ err: String(err) }, "cron-reaper: transcript cleanup failed");
}
}
if (pruned > 0) {
params.log.info(
{ pruned, retentionMs },