mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 13:41:37 +00:00
refactor(session): consolidate transcript snapshot reads
This commit is contained in:
@@ -128,23 +128,72 @@ function resolveSessionLogPath(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function readSessionLogByteSize(
|
function deriveTranscriptUsageSnapshot(
|
||||||
sessionId?: string,
|
usage: ReturnType<typeof normalizeUsage> | undefined,
|
||||||
sessionEntry?: SessionEntry,
|
): SessionTranscriptUsageSnapshot | undefined {
|
||||||
sessionKey?: string,
|
if (!usage) {
|
||||||
opts?: { storePath?: string },
|
return undefined;
|
||||||
): Promise<number | undefined> {
|
}
|
||||||
const logPath = resolveSessionLogPath(sessionId, sessionEntry, sessionKey, opts);
|
const promptTokens = derivePromptTokens(usage);
|
||||||
|
const outputRaw = usage.output;
|
||||||
|
const outputTokens =
|
||||||
|
typeof outputRaw === "number" && Number.isFinite(outputRaw) && outputRaw > 0
|
||||||
|
? outputRaw
|
||||||
|
: undefined;
|
||||||
|
if (!(typeof promptTokens === "number") && !(typeof outputTokens === "number")) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
promptTokens,
|
||||||
|
outputTokens,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
type SessionLogSnapshot = {
|
||||||
|
byteSize?: number;
|
||||||
|
usage?: SessionTranscriptUsageSnapshot;
|
||||||
|
};
|
||||||
|
|
||||||
|
async function readSessionLogSnapshot(params: {
|
||||||
|
sessionId?: string;
|
||||||
|
sessionEntry?: SessionEntry;
|
||||||
|
sessionKey?: string;
|
||||||
|
opts?: { storePath?: string };
|
||||||
|
includeByteSize: boolean;
|
||||||
|
includeUsage: boolean;
|
||||||
|
}): Promise<SessionLogSnapshot> {
|
||||||
|
const logPath = resolveSessionLogPath(
|
||||||
|
params.sessionId,
|
||||||
|
params.sessionEntry,
|
||||||
|
params.sessionKey,
|
||||||
|
params.opts,
|
||||||
|
);
|
||||||
if (!logPath) {
|
if (!logPath) {
|
||||||
return undefined;
|
return {};
|
||||||
}
|
}
|
||||||
try {
|
|
||||||
const stat = await fs.promises.stat(logPath);
|
const snapshot: SessionLogSnapshot = {};
|
||||||
const size = Math.floor(stat.size);
|
|
||||||
return Number.isFinite(size) && size >= 0 ? size : undefined;
|
if (params.includeByteSize) {
|
||||||
} catch {
|
try {
|
||||||
return undefined;
|
const stat = await fs.promises.stat(logPath);
|
||||||
|
const size = Math.floor(stat.size);
|
||||||
|
snapshot.byteSize = Number.isFinite(size) && size >= 0 ? size : undefined;
|
||||||
|
} catch {
|
||||||
|
snapshot.byteSize = undefined;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (params.includeUsage) {
|
||||||
|
try {
|
||||||
|
const lastUsage = await readLastNonzeroUsageFromSessionLog(logPath);
|
||||||
|
snapshot.usage = deriveTranscriptUsageSnapshot(lastUsage);
|
||||||
|
} catch {
|
||||||
|
snapshot.usage = undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return snapshot;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function readLastNonzeroUsageFromSessionLog(logPath: string) {
|
async function readLastNonzeroUsageFromSessionLog(logPath: string) {
|
||||||
@@ -185,35 +234,15 @@ export async function readPromptTokensFromSessionLog(
|
|||||||
sessionKey?: string,
|
sessionKey?: string,
|
||||||
opts?: { storePath?: string },
|
opts?: { storePath?: string },
|
||||||
): Promise<SessionTranscriptUsageSnapshot | undefined> {
|
): Promise<SessionTranscriptUsageSnapshot | undefined> {
|
||||||
const logPath = resolveSessionLogPath(sessionId, sessionEntry, sessionKey, opts);
|
const snapshot = await readSessionLogSnapshot({
|
||||||
if (!logPath) {
|
sessionId,
|
||||||
return undefined;
|
sessionEntry,
|
||||||
}
|
sessionKey,
|
||||||
|
opts,
|
||||||
try {
|
includeByteSize: false,
|
||||||
const lastUsage = await readLastNonzeroUsageFromSessionLog(logPath);
|
includeUsage: true,
|
||||||
if (!lastUsage) {
|
});
|
||||||
return undefined;
|
return snapshot.usage;
|
||||||
}
|
|
||||||
|
|
||||||
const promptTokens = derivePromptTokens(lastUsage);
|
|
||||||
const outputRaw = lastUsage.output;
|
|
||||||
const outputTokens =
|
|
||||||
typeof outputRaw === "number" && Number.isFinite(outputRaw) && outputRaw > 0
|
|
||||||
? outputRaw
|
|
||||||
: undefined;
|
|
||||||
|
|
||||||
if (!(typeof promptTokens === "number") && !(typeof outputTokens === "number")) {
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
promptTokens,
|
|
||||||
outputTokens,
|
|
||||||
};
|
|
||||||
} catch {
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function runMemoryFlushIfNeeded(params: {
|
export async function runMemoryFlushIfNeeded(params: {
|
||||||
@@ -294,34 +323,33 @@ export async function runMemoryFlushIfNeeded(params: {
|
|||||||
(persistedPromptTokens ?? 0) + promptTokenEstimate >=
|
(persistedPromptTokens ?? 0) + promptTokenEstimate >=
|
||||||
flushThreshold - TRANSCRIPT_OUTPUT_READ_BUFFER_TOKENS;
|
flushThreshold - TRANSCRIPT_OUTPUT_READ_BUFFER_TOKENS;
|
||||||
|
|
||||||
const shouldReadTranscript =
|
const shouldReadTranscript = Boolean(
|
||||||
canAttemptFlush && entry && (!hasFreshPersistedPromptTokens || shouldReadTranscriptForOutput);
|
canAttemptFlush && entry && (!hasFreshPersistedPromptTokens || shouldReadTranscriptForOutput),
|
||||||
|
);
|
||||||
|
|
||||||
const forceFlushTranscriptBytes = memoryFlushSettings.forceFlushTranscriptBytes;
|
const forceFlushTranscriptBytes = memoryFlushSettings.forceFlushTranscriptBytes;
|
||||||
const shouldCheckTranscriptSizeForForcedFlush =
|
const shouldCheckTranscriptSizeForForcedFlush = Boolean(
|
||||||
canAttemptFlush &&
|
canAttemptFlush &&
|
||||||
entry &&
|
entry &&
|
||||||
Number.isFinite(forceFlushTranscriptBytes) &&
|
Number.isFinite(forceFlushTranscriptBytes) &&
|
||||||
forceFlushTranscriptBytes > 0;
|
forceFlushTranscriptBytes > 0,
|
||||||
const transcriptByteSize = shouldCheckTranscriptSizeForForcedFlush
|
);
|
||||||
? await readSessionLogByteSize(
|
const shouldReadSessionLog = shouldReadTranscript || shouldCheckTranscriptSizeForForcedFlush;
|
||||||
params.followupRun.run.sessionId,
|
const sessionLogSnapshot = shouldReadSessionLog
|
||||||
entry,
|
? await readSessionLogSnapshot({
|
||||||
params.sessionKey ?? params.followupRun.run.sessionKey,
|
sessionId: params.followupRun.run.sessionId,
|
||||||
{ storePath: params.storePath },
|
sessionEntry: entry,
|
||||||
)
|
sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey,
|
||||||
|
opts: { storePath: params.storePath },
|
||||||
|
includeByteSize: shouldCheckTranscriptSizeForForcedFlush,
|
||||||
|
includeUsage: shouldReadTranscript,
|
||||||
|
})
|
||||||
: undefined;
|
: undefined;
|
||||||
|
const transcriptByteSize = sessionLogSnapshot?.byteSize;
|
||||||
const shouldForceFlushByTranscriptSize =
|
const shouldForceFlushByTranscriptSize =
|
||||||
typeof transcriptByteSize === "number" && transcriptByteSize >= forceFlushTranscriptBytes;
|
typeof transcriptByteSize === "number" && transcriptByteSize >= forceFlushTranscriptBytes;
|
||||||
|
|
||||||
const transcriptUsageSnapshot = shouldReadTranscript
|
const transcriptUsageSnapshot = sessionLogSnapshot?.usage;
|
||||||
? await readPromptTokensFromSessionLog(
|
|
||||||
params.followupRun.run.sessionId,
|
|
||||||
entry,
|
|
||||||
params.sessionKey ?? params.followupRun.run.sessionKey,
|
|
||||||
{ storePath: params.storePath },
|
|
||||||
)
|
|
||||||
: undefined;
|
|
||||||
const transcriptPromptTokens = transcriptUsageSnapshot?.promptTokens;
|
const transcriptPromptTokens = transcriptUsageSnapshot?.promptTokens;
|
||||||
const transcriptOutputTokens = transcriptUsageSnapshot?.outputTokens;
|
const transcriptOutputTokens = transcriptUsageSnapshot?.outputTokens;
|
||||||
const hasReliableTranscriptPromptTokens =
|
const hasReliableTranscriptPromptTokens =
|
||||||
|
|||||||
Reference in New Issue
Block a user