Cron: drain pending writes before reading run log (#25416)

* Cron: drain pending writes before reading run log

* Retrigger CI
This commit is contained in:
Aleksandrs Tihenko
2026-03-01 15:04:04 +02:00
committed by GitHub
parent 29a55948d6
commit 0cc46589ac
2 changed files with 37 additions and 0 deletions

View File

@@ -103,6 +103,14 @@ export function getPendingCronRunLogWriteCountForTests() {
return writesByPath.size;
}
async function drainPendingWrite(filePath: string): Promise<void> {
const resolved = path.resolve(filePath);
const pending = writesByPath.get(resolved);
if (pending) {
await pending.catch(() => undefined);
}
}
async function pruneIfNeeded(filePath: string, opts: { maxBytes: number; keepLines: number }) {
const stat = await fs.stat(filePath).catch(() => null);
if (!stat || stat.size <= opts.maxBytes) {
@@ -152,6 +160,7 @@ export async function readCronRunLogEntries(
filePath: string,
opts?: { limit?: number; jobId?: string },
): Promise<CronRunLogEntry[]> {
await drainPendingWrite(filePath);
const limit = Math.max(1, Math.min(5000, Math.floor(opts?.limit ?? 200)));
const page = await readCronRunLogEntriesPage(filePath, {
jobId: opts?.jobId,
@@ -334,6 +343,7 @@ export async function readCronRunLogEntriesPage(
filePath: string,
opts?: ReadCronRunLogPageOptions,
): Promise<CronRunLogPageResult> {
await drainPendingWrite(filePath);
const limit = Math.max(1, Math.min(200, Math.floor(opts?.limit ?? 50)));
const raw = await fs.readFile(path.resolve(filePath), "utf-8").catch(() => "");
const statuses = normalizeRunStatuses(opts);
@@ -388,6 +398,7 @@ export async function readCronRunLogEntriesPageAll(
nextOffset: null,
};
}
await Promise.all(jsonlFiles.map((f) => drainPendingWrite(f)));
const chunks = await Promise.all(
jsonlFiles.map(async (filePath) => {
const raw = await fs.readFile(filePath, "utf-8").catch(() => "");