mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-07 05:11:36 +00:00
fix(session): harden usage accounting and memory flush recovery
This commit is contained in:
@@ -96,6 +96,57 @@ function parseUsageFromTranscriptLine(line: string): ReturnType<typeof normalize
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function resolveSessionLogPath(
|
||||
sessionId?: string,
|
||||
sessionEntry?: SessionEntry,
|
||||
sessionKey?: string,
|
||||
opts?: { storePath?: string },
|
||||
): string | undefined {
|
||||
if (!sessionId) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
try {
|
||||
const transcriptPath = (
|
||||
sessionEntry as (SessionEntry & { transcriptPath?: string }) | undefined
|
||||
)?.transcriptPath?.trim();
|
||||
const sessionFile = sessionEntry?.sessionFile?.trim() || transcriptPath;
|
||||
const agentId = resolveAgentIdFromSessionKey(sessionKey);
|
||||
const pathOpts = resolveSessionFilePathOptions({
|
||||
agentId,
|
||||
storePath: opts?.storePath,
|
||||
});
|
||||
// Normalize sessionFile through resolveSessionFilePath so relative entries
|
||||
// are resolved against the sessions dir/store layout, not process.cwd().
|
||||
return resolveSessionFilePath(
|
||||
sessionId,
|
||||
sessionFile ? { sessionFile } : sessionEntry,
|
||||
pathOpts,
|
||||
);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
async function readSessionLogByteSize(
|
||||
sessionId?: string,
|
||||
sessionEntry?: SessionEntry,
|
||||
sessionKey?: string,
|
||||
opts?: { storePath?: string },
|
||||
): Promise<number | undefined> {
|
||||
const logPath = resolveSessionLogPath(sessionId, sessionEntry, sessionKey, opts);
|
||||
if (!logPath) {
|
||||
return undefined;
|
||||
}
|
||||
try {
|
||||
const stat = await fs.promises.stat(logPath);
|
||||
const size = Math.floor(stat.size);
|
||||
return Number.isFinite(size) && size >= 0 ? size : undefined;
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
async function readLastNonzeroUsageFromSessionLog(logPath: string) {
|
||||
const handle = await fs.promises.open(logPath, "r");
|
||||
try {
|
||||
@@ -134,28 +185,12 @@ export async function readPromptTokensFromSessionLog(
|
||||
sessionKey?: string,
|
||||
opts?: { storePath?: string },
|
||||
): Promise<SessionTranscriptUsageSnapshot | undefined> {
|
||||
if (!sessionId) {
|
||||
const logPath = resolveSessionLogPath(sessionId, sessionEntry, sessionKey, opts);
|
||||
if (!logPath) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
try {
|
||||
const transcriptPath = (
|
||||
sessionEntry as (SessionEntry & { transcriptPath?: string }) | undefined
|
||||
)?.transcriptPath?.trim();
|
||||
const sessionFile = sessionEntry?.sessionFile?.trim() || transcriptPath;
|
||||
const agentId = resolveAgentIdFromSessionKey(sessionKey);
|
||||
const pathOpts = resolveSessionFilePathOptions({
|
||||
agentId,
|
||||
storePath: opts?.storePath,
|
||||
});
|
||||
// Normalize sessionFile through resolveSessionFilePath so relative entries
|
||||
// are resolved against the sessions dir/store layout, not process.cwd().
|
||||
const logPath = resolveSessionFilePath(
|
||||
sessionId,
|
||||
sessionFile ? { sessionFile } : sessionEntry,
|
||||
pathOpts,
|
||||
);
|
||||
|
||||
const lastUsage = await readLastNonzeroUsageFromSessionLog(logPath);
|
||||
if (!lastUsage) {
|
||||
return undefined;
|
||||
@@ -262,6 +297,23 @@ export async function runMemoryFlushIfNeeded(params: {
|
||||
const shouldReadTranscript =
|
||||
canAttemptFlush && entry && (!hasFreshPersistedPromptTokens || shouldReadTranscriptForOutput);
|
||||
|
||||
const forceFlushTranscriptBytes = memoryFlushSettings.forceFlushTranscriptBytes;
|
||||
const shouldCheckTranscriptSizeForForcedFlush =
|
||||
canAttemptFlush &&
|
||||
entry &&
|
||||
Number.isFinite(forceFlushTranscriptBytes) &&
|
||||
forceFlushTranscriptBytes > 0;
|
||||
const transcriptByteSize = shouldCheckTranscriptSizeForForcedFlush
|
||||
? await readSessionLogByteSize(
|
||||
params.followupRun.run.sessionId,
|
||||
entry,
|
||||
params.sessionKey ?? params.followupRun.run.sessionKey,
|
||||
{ storePath: params.storePath },
|
||||
)
|
||||
: undefined;
|
||||
const shouldForceFlushByTranscriptSize =
|
||||
typeof transcriptByteSize === "number" && transcriptByteSize >= forceFlushTranscriptBytes;
|
||||
|
||||
const transcriptUsageSnapshot = shouldReadTranscript
|
||||
? await readPromptTokensFromSessionLog(
|
||||
params.followupRun.run.sessionId,
|
||||
@@ -341,21 +393,23 @@ export async function runMemoryFlushIfNeeded(params: {
|
||||
`compactionCount=${entry?.compactionCount ?? 0} memoryFlushCompactionCount=${entry?.memoryFlushCompactionCount ?? "undefined"} ` +
|
||||
`persistedPromptTokens=${persistedPromptTokens ?? "undefined"} persistedFresh=${entry?.totalTokensFresh === true} ` +
|
||||
`promptTokensEst=${promptTokenEstimate ?? "undefined"} transcriptPromptTokens=${transcriptPromptTokens ?? "undefined"} transcriptOutputTokens=${transcriptOutputTokens ?? "undefined"} ` +
|
||||
`projectedTokenCount=${projectedTokenCount ?? "undefined"}`,
|
||||
`projectedTokenCount=${projectedTokenCount ?? "undefined"} transcriptBytes=${transcriptByteSize ?? "undefined"} ` +
|
||||
`forceFlushTranscriptBytes=${forceFlushTranscriptBytes} forceFlushByTranscriptSize=${shouldForceFlushByTranscriptSize}`,
|
||||
);
|
||||
|
||||
const shouldFlushMemory =
|
||||
memoryFlushSettings &&
|
||||
memoryFlushWritable &&
|
||||
!params.isHeartbeat &&
|
||||
!isCli &&
|
||||
shouldRunMemoryFlush({
|
||||
entry,
|
||||
tokenCount: tokenCountForFlush,
|
||||
contextWindowTokens,
|
||||
reserveTokensFloor: memoryFlushSettings.reserveTokensFloor,
|
||||
softThresholdTokens: memoryFlushSettings.softThresholdTokens,
|
||||
});
|
||||
(memoryFlushSettings &&
|
||||
memoryFlushWritable &&
|
||||
!params.isHeartbeat &&
|
||||
!isCli &&
|
||||
shouldRunMemoryFlush({
|
||||
entry,
|
||||
tokenCount: tokenCountForFlush,
|
||||
contextWindowTokens,
|
||||
reserveTokensFloor: memoryFlushSettings.reserveTokensFloor,
|
||||
softThresholdTokens: memoryFlushSettings.softThresholdTokens,
|
||||
})) ||
|
||||
shouldForceFlushByTranscriptSize;
|
||||
|
||||
if (!shouldFlushMemory) {
|
||||
return entry ?? params.sessionEntry;
|
||||
|
||||
Reference in New Issue
Block a user