mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 18:04:32 +00:00
fix: harden session lock contention and cleanup
This commit is contained in:
@@ -77,6 +77,39 @@ describe("acquireSessionWriteLock", () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// A lock file whose payload carries no pid/createdAt but which was only just
// written must survive a contended acquire: the acquire attempt is expected to
// time out and the lock file must still exist afterwards.
it("does not reclaim fresh malformed lock files during contention", async () => {
  const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
  try {
    const sessionFile = path.join(tempDir, "sessions.json");
    const lockFile = `${sessionFile}.lock`;
    // "{}" parses as JSON but has neither a pid nor a createdAt field.
    await fs.writeFile(lockFile, "{}", "utf8");

    const pendingAcquire = acquireSessionWriteLock({
      sessionFile,
      timeoutMs: 50,
      staleMs: 60_000,
    });
    await expect(pendingAcquire).rejects.toThrow(/session file locked/);

    // fs.access resolves with undefined only when the path still exists.
    await expect(fs.access(lockFile)).resolves.toBeUndefined();
  } finally {
    await fs.rm(tempDir, { recursive: true, force: true });
  }
});
|
||||||
|
|
||||||
|
// A lock file with an empty payload whose timestamps are older than staleMs
// must be reclaimed: acquire succeeds, and after release the lock file is gone.
it("reclaims malformed lock files once they are old enough", async () => {
  const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
  try {
    const sessionFile = path.join(tempDir, "sessions.json");
    const lockFile = `${sessionFile}.lock`;
    await fs.writeFile(lockFile, "{}", "utf8");

    // Backdate atime/mtime by two minutes, well past the 10s staleMs below.
    const backdated = new Date(Date.now() - 2 * 60_000);
    await fs.utimes(lockFile, backdated, backdated);

    const acquired = await acquireSessionWriteLock({
      sessionFile,
      timeoutMs: 500,
      staleMs: 10_000,
    });
    await acquired.release();

    // fs.access rejects once the lock file no longer exists.
    await expect(fs.access(lockFile)).rejects.toThrow();
  } finally {
    await fs.rm(tempDir, { recursive: true, force: true });
  }
});
|
||||||
|
|
||||||
it("watchdog releases stale in-process locks", async () => {
|
it("watchdog releases stale in-process locks", async () => {
|
||||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
|
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
|
||||||
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
|
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
|
||||||
|
|||||||
@@ -52,6 +52,11 @@ type WatchdogState = {
|
|||||||
timer?: NodeJS.Timeout;
|
timer?: NodeJS.Timeout;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
 * The subset of `SessionLockInspection` fields describing a lock file's owner
 * (pid, liveness), its age, and its staleness verdict with reasons.
 */
type LockInspectionDetails = Pick<
  SessionLockInspection,
  "pid" | "pidAlive" | "createdAt" | "ageMs" | "stale" | "staleReasons"
>;
|
||||||
|
|
||||||
const HELD_LOCKS = resolveProcessScopedMap<HeldLock>(HELD_LOCKS_KEY);
|
const HELD_LOCKS = resolveProcessScopedMap<HeldLock>(HELD_LOCKS_KEY);
|
||||||
|
|
||||||
function resolveCleanupState(): CleanupState {
|
function resolveCleanupState(): CleanupState {
|
||||||
@@ -281,10 +286,7 @@ function inspectLockPayload(
|
|||||||
payload: LockFilePayload | null,
|
payload: LockFilePayload | null,
|
||||||
staleMs: number,
|
staleMs: number,
|
||||||
nowMs: number,
|
nowMs: number,
|
||||||
): Pick<
|
): LockInspectionDetails {
|
||||||
SessionLockInspection,
|
|
||||||
"pid" | "pidAlive" | "createdAt" | "ageMs" | "stale" | "staleReasons"
|
|
||||||
> {
|
|
||||||
const pid = typeof payload?.pid === "number" ? payload.pid : null;
|
const pid = typeof payload?.pid === "number" ? payload.pid : null;
|
||||||
const pidAlive = pid !== null ? isPidAlive(pid) : false;
|
const pidAlive = pid !== null ? isPidAlive(pid) : false;
|
||||||
const createdAt = typeof payload?.createdAt === "string" ? payload.createdAt : null;
|
const createdAt = typeof payload?.createdAt === "string" ? payload.createdAt : null;
|
||||||
@@ -313,6 +315,37 @@ function inspectLockPayload(
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Reports whether a stale verdict rests only on payload defects.
 *
 * @param details - Inspection result for the lock payload.
 * @returns `true` when the lock is marked stale and every stale reason is
 *   either "missing-pid" or "invalid-createdAt" (this includes an empty reason
 *   list); `false` when the lock is not stale or any other reason is present.
 */
function lockInspectionNeedsMtimeStaleFallback(details: LockInspectionDetails): boolean {
  if (!details.stale) {
    return false;
  }
  for (const reason of details.staleReasons) {
    const isPayloadDefect = reason === "missing-pid" || reason === "invalid-createdAt";
    if (!isPayloadDefect) {
      return false;
    }
  }
  return true;
}
|
||||||
|
|
||||||
|
/**
 * Decides whether a lock file that blocked an acquire attempt may be removed.
 *
 * @param lockPath - Path of the contended lock file.
 * @param details - Inspection result derived from the lock payload.
 * @param staleMs - Age threshold in milliseconds.
 * @param nowMs - Timestamp (ms) to measure the file's age against.
 * @returns `false` when the lock is not stale; `true` when it is stale for a
 *   reason other than a missing pid / invalid createdAt; otherwise `true` only
 *   if the file's mtime is more than `staleMs` before `nowMs`. If `stat` fails,
 *   the result is `false` for ENOENT and `true` for any other error.
 */
async function shouldReclaimContendedLockFile(
  lockPath: string,
  details: LockInspectionDetails,
  staleMs: number,
  nowMs: number,
): Promise<boolean> {
  if (details.stale) {
    if (lockInspectionNeedsMtimeStaleFallback(details)) {
      // The payload alone cannot date the lock, so fall back to the file's mtime.
      let modifiedAtMs: number;
      try {
        modifiedAtMs = (await fs.stat(lockPath)).mtimeMs;
      } catch (error) {
        const errorCode = (error as { code?: string } | null)?.code;
        return errorCode !== "ENOENT";
      }
      // Clamp so an mtime in the future counts as age zero.
      const fileAgeMs = Math.max(0, nowMs - modifiedAtMs);
      return fileAgeMs > staleMs;
    }
    return true;
  }
  return false;
}
|
||||||
|
|
||||||
export async function cleanStaleLockFiles(params: {
|
export async function cleanStaleLockFiles(params: {
|
||||||
sessionsDir: string;
|
sessionsDir: string;
|
||||||
staleMs?: number;
|
staleMs?: number;
|
||||||
@@ -410,8 +443,9 @@ export async function acquireSessionWriteLock(params: {
|
|||||||
let attempt = 0;
|
let attempt = 0;
|
||||||
while (Date.now() - startedAt < timeoutMs) {
|
while (Date.now() - startedAt < timeoutMs) {
|
||||||
attempt += 1;
|
attempt += 1;
|
||||||
|
let handle: fs.FileHandle | null = null;
|
||||||
try {
|
try {
|
||||||
const handle = await fs.open(lockPath, "wx");
|
handle = await fs.open(lockPath, "wx");
|
||||||
const createdAt = new Date().toISOString();
|
const createdAt = new Date().toISOString();
|
||||||
await handle.writeFile(JSON.stringify({ pid: process.pid, createdAt }, null, 2), "utf8");
|
await handle.writeFile(JSON.stringify({ pid: process.pid, createdAt }, null, 2), "utf8");
|
||||||
const createdHeld: HeldLock = {
|
const createdHeld: HeldLock = {
|
||||||
@@ -428,13 +462,26 @@ export async function acquireSessionWriteLock(params: {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
if (handle) {
|
||||||
|
try {
|
||||||
|
await handle.close();
|
||||||
|
} catch {
|
||||||
|
// Ignore cleanup errors on failed lock initialization.
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
await fs.rm(lockPath, { force: true });
|
||||||
|
} catch {
|
||||||
|
// Ignore cleanup errors on failed lock initialization.
|
||||||
|
}
|
||||||
|
}
|
||||||
const code = (err as { code?: unknown }).code;
|
const code = (err as { code?: unknown }).code;
|
||||||
if (code !== "EEXIST") {
|
if (code !== "EEXIST") {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
const payload = await readLockPayload(lockPath);
|
const payload = await readLockPayload(lockPath);
|
||||||
const inspected = inspectLockPayload(payload, staleMs, Date.now());
|
const nowMs = Date.now();
|
||||||
if (inspected.stale) {
|
const inspected = inspectLockPayload(payload, staleMs, nowMs);
|
||||||
|
if (await shouldReclaimContendedLockFile(lockPath, inspected, staleMs, nowMs)) {
|
||||||
await fs.rm(lockPath, { force: true });
|
await fs.rm(lockPath, { force: true });
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user