mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-22 07:48:10 +00:00
* fix(acp): implicit streamToParent for mode=run without thread When spawning ACP sessions with mode=run and no thread binding, automatically route output to parent session instead of Discord. This enables agent-to-agent supervision patterns where the spawning agent wants results returned programmatically, not posted as chat. The change makes sessions_spawn with runtime=acp and thread=false behave like direct acpx invocation - output goes to the spawning session, not to Discord. Fixes the issue where mode=run without thread still posted to Discord because hasDeliveryTarget was true when called from a Discord context. * fix: use resolved spawnMode instead of params.mode Move implicit streamToParent check to after resolveSpawnMode so that both explicit mode="run" and omitted mode (which defaults to "run" when thread is false) correctly trigger parent routing. This fixes the issue where callers that rely on default mode selection would not get the intended parent streaming behavior. * fix: tighten implicit ACP parent relay gating (#42404) (thanks @davidguttman) --------- Co-authored-by: Onur Solmaz <2453968+osolmaz@users.noreply.github.com>
1247 lines
40 KiB
TypeScript
1247 lines
40 KiB
TypeScript
import fs from "node:fs/promises";
|
|
import path from "node:path";
|
|
import {
|
|
resolveAgentConfig,
|
|
resolveAgentWorkspaceDir,
|
|
resolveDefaultAgentId,
|
|
} from "../agents/agent-scope.js";
|
|
import { appendCronStyleCurrentTimeLine } from "../agents/current-time.js";
|
|
import { resolveEffectiveMessagesConfig } from "../agents/identity.js";
|
|
import { DEFAULT_HEARTBEAT_FILENAME } from "../agents/workspace.js";
|
|
import { resolveHeartbeatReplyPayload } from "../auto-reply/heartbeat-reply-payload.js";
|
|
import {
|
|
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
|
|
DEFAULT_HEARTBEAT_EVERY,
|
|
isHeartbeatContentEffectivelyEmpty,
|
|
resolveHeartbeatPrompt as resolveHeartbeatPromptText,
|
|
stripHeartbeatToken,
|
|
} from "../auto-reply/heartbeat.js";
|
|
import { getReplyFromConfig } from "../auto-reply/reply.js";
|
|
import { HEARTBEAT_TOKEN } from "../auto-reply/tokens.js";
|
|
import type { ReplyPayload } from "../auto-reply/types.js";
|
|
import { getChannelPlugin } from "../channels/plugins/index.js";
|
|
import type { ChannelHeartbeatDeps } from "../channels/plugins/types.js";
|
|
import { parseDurationMs } from "../cli/parse-duration.js";
|
|
import type { OpenClawConfig } from "../config/config.js";
|
|
import { loadConfig } from "../config/config.js";
|
|
import {
|
|
canonicalizeMainSessionAlias,
|
|
loadSessionStore,
|
|
resolveAgentIdFromSessionKey,
|
|
resolveAgentMainSessionKey,
|
|
resolveSessionFilePath,
|
|
resolveStorePath,
|
|
saveSessionStore,
|
|
updateSessionStore,
|
|
} from "../config/sessions.js";
|
|
import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js";
|
|
import { createSubsystemLogger } from "../logging/subsystem.js";
|
|
import { getQueueSize } from "../process/command-queue.js";
|
|
import { CommandLane } from "../process/lanes.js";
|
|
import {
|
|
normalizeAgentId,
|
|
parseAgentSessionKey,
|
|
toAgentStoreSessionKey,
|
|
} from "../routing/session-key.js";
|
|
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
|
|
import { escapeRegExp } from "../utils.js";
|
|
import { formatErrorMessage, hasErrnoCode } from "./errors.js";
|
|
import { isWithinActiveHours } from "./heartbeat-active-hours.js";
|
|
import {
|
|
buildExecEventPrompt,
|
|
buildCronEventPrompt,
|
|
isCronSystemEvent,
|
|
isExecCompletionEvent,
|
|
} from "./heartbeat-events-filter.js";
|
|
import { emitHeartbeatEvent, resolveIndicatorType } from "./heartbeat-events.js";
|
|
import { resolveHeartbeatReasonKind } from "./heartbeat-reason.js";
|
|
import { resolveHeartbeatVisibility } from "./heartbeat-visibility.js";
|
|
import {
|
|
areHeartbeatsEnabled,
|
|
type HeartbeatRunResult,
|
|
type HeartbeatWakeHandler,
|
|
requestHeartbeatNow,
|
|
setHeartbeatsEnabled,
|
|
setHeartbeatWakeHandler,
|
|
} from "./heartbeat-wake.js";
|
|
import type { OutboundSendDeps } from "./outbound/deliver.js";
|
|
import { deliverOutboundPayloads } from "./outbound/deliver.js";
|
|
import { buildOutboundSessionContext } from "./outbound/session-context.js";
|
|
import {
|
|
resolveHeartbeatDeliveryTarget,
|
|
resolveHeartbeatSenderContext,
|
|
} from "./outbound/targets.js";
|
|
import { peekSystemEventEntries } from "./system-events.js";
|
|
|
|
export type HeartbeatDeps = OutboundSendDeps &
|
|
ChannelHeartbeatDeps & {
|
|
runtime?: RuntimeEnv;
|
|
getQueueSize?: (lane?: string) => number;
|
|
nowMs?: () => number;
|
|
};
|
|
|
|
const log = createSubsystemLogger("gateway/heartbeat");
|
|
|
|
export { areHeartbeatsEnabled, setHeartbeatsEnabled };
|
|
|
|
type HeartbeatConfig = AgentDefaultsConfig["heartbeat"];
|
|
type HeartbeatAgent = {
|
|
agentId: string;
|
|
heartbeat?: HeartbeatConfig;
|
|
};
|
|
|
|
export type HeartbeatSummary = {
|
|
enabled: boolean;
|
|
every: string;
|
|
everyMs: number | null;
|
|
prompt: string;
|
|
target: string;
|
|
model?: string;
|
|
ackMaxChars: number;
|
|
};
|
|
|
|
const DEFAULT_HEARTBEAT_TARGET = "none";
|
|
export { isCronSystemEvent };
|
|
|
|
type HeartbeatAgentState = {
|
|
agentId: string;
|
|
heartbeat?: HeartbeatConfig;
|
|
intervalMs: number;
|
|
lastRunMs?: number;
|
|
nextDueMs: number;
|
|
};
|
|
|
|
export type HeartbeatRunner = {
|
|
stop: () => void;
|
|
updateConfig: (cfg: OpenClawConfig) => void;
|
|
};
|
|
|
|
function hasExplicitHeartbeatAgents(cfg: OpenClawConfig) {
|
|
const list = cfg.agents?.list ?? [];
|
|
return list.some((entry) => Boolean(entry?.heartbeat));
|
|
}
|
|
|
|
export function isHeartbeatEnabledForAgent(cfg: OpenClawConfig, agentId?: string): boolean {
|
|
const resolvedAgentId = normalizeAgentId(agentId ?? resolveDefaultAgentId(cfg));
|
|
const list = cfg.agents?.list ?? [];
|
|
const hasExplicit = hasExplicitHeartbeatAgents(cfg);
|
|
if (hasExplicit) {
|
|
return list.some(
|
|
(entry) => Boolean(entry?.heartbeat) && normalizeAgentId(entry?.id) === resolvedAgentId,
|
|
);
|
|
}
|
|
return resolvedAgentId === resolveDefaultAgentId(cfg);
|
|
}
|
|
|
|
function resolveHeartbeatConfig(
|
|
cfg: OpenClawConfig,
|
|
agentId?: string,
|
|
): HeartbeatConfig | undefined {
|
|
const defaults = cfg.agents?.defaults?.heartbeat;
|
|
if (!agentId) {
|
|
return defaults;
|
|
}
|
|
const overrides = resolveAgentConfig(cfg, agentId)?.heartbeat;
|
|
if (!defaults && !overrides) {
|
|
return overrides;
|
|
}
|
|
return { ...defaults, ...overrides };
|
|
}
|
|
|
|
export function resolveHeartbeatSummaryForAgent(
|
|
cfg: OpenClawConfig,
|
|
agentId?: string,
|
|
): HeartbeatSummary {
|
|
const defaults = cfg.agents?.defaults?.heartbeat;
|
|
const overrides = agentId ? resolveAgentConfig(cfg, agentId)?.heartbeat : undefined;
|
|
const enabled = isHeartbeatEnabledForAgent(cfg, agentId);
|
|
|
|
if (!enabled) {
|
|
return {
|
|
enabled: false,
|
|
every: "disabled",
|
|
everyMs: null,
|
|
prompt: resolveHeartbeatPromptText(defaults?.prompt),
|
|
target: defaults?.target ?? DEFAULT_HEARTBEAT_TARGET,
|
|
model: defaults?.model,
|
|
ackMaxChars: Math.max(0, defaults?.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS),
|
|
};
|
|
}
|
|
|
|
const merged = defaults || overrides ? { ...defaults, ...overrides } : undefined;
|
|
const every = merged?.every ?? defaults?.every ?? overrides?.every ?? DEFAULT_HEARTBEAT_EVERY;
|
|
const everyMs = resolveHeartbeatIntervalMs(cfg, undefined, merged);
|
|
const prompt = resolveHeartbeatPromptText(
|
|
merged?.prompt ?? defaults?.prompt ?? overrides?.prompt,
|
|
);
|
|
const target =
|
|
merged?.target ?? defaults?.target ?? overrides?.target ?? DEFAULT_HEARTBEAT_TARGET;
|
|
const model = merged?.model ?? defaults?.model ?? overrides?.model;
|
|
const ackMaxChars = Math.max(
|
|
0,
|
|
merged?.ackMaxChars ??
|
|
defaults?.ackMaxChars ??
|
|
overrides?.ackMaxChars ??
|
|
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
|
|
);
|
|
|
|
return {
|
|
enabled: true,
|
|
every,
|
|
everyMs,
|
|
prompt,
|
|
target,
|
|
model,
|
|
ackMaxChars,
|
|
};
|
|
}
|
|
|
|
function resolveHeartbeatAgents(cfg: OpenClawConfig): HeartbeatAgent[] {
|
|
const list = cfg.agents?.list ?? [];
|
|
if (hasExplicitHeartbeatAgents(cfg)) {
|
|
return list
|
|
.filter((entry) => entry?.heartbeat)
|
|
.map((entry) => {
|
|
const id = normalizeAgentId(entry.id);
|
|
return { agentId: id, heartbeat: resolveHeartbeatConfig(cfg, id) };
|
|
})
|
|
.filter((entry) => entry.agentId);
|
|
}
|
|
const fallbackId = resolveDefaultAgentId(cfg);
|
|
return [{ agentId: fallbackId, heartbeat: resolveHeartbeatConfig(cfg, fallbackId) }];
|
|
}
|
|
|
|
export function resolveHeartbeatIntervalMs(
|
|
cfg: OpenClawConfig,
|
|
overrideEvery?: string,
|
|
heartbeat?: HeartbeatConfig,
|
|
) {
|
|
const raw =
|
|
overrideEvery ??
|
|
heartbeat?.every ??
|
|
cfg.agents?.defaults?.heartbeat?.every ??
|
|
DEFAULT_HEARTBEAT_EVERY;
|
|
if (!raw) {
|
|
return null;
|
|
}
|
|
const trimmed = String(raw).trim();
|
|
if (!trimmed) {
|
|
return null;
|
|
}
|
|
let ms: number;
|
|
try {
|
|
ms = parseDurationMs(trimmed, { defaultUnit: "m" });
|
|
} catch {
|
|
return null;
|
|
}
|
|
if (ms <= 0) {
|
|
return null;
|
|
}
|
|
return ms;
|
|
}
|
|
|
|
export function resolveHeartbeatPrompt(cfg: OpenClawConfig, heartbeat?: HeartbeatConfig) {
|
|
return resolveHeartbeatPromptText(heartbeat?.prompt ?? cfg.agents?.defaults?.heartbeat?.prompt);
|
|
}
|
|
|
|
function resolveHeartbeatAckMaxChars(cfg: OpenClawConfig, heartbeat?: HeartbeatConfig) {
|
|
return Math.max(
|
|
0,
|
|
heartbeat?.ackMaxChars ??
|
|
cfg.agents?.defaults?.heartbeat?.ackMaxChars ??
|
|
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
|
|
);
|
|
}
|
|
|
|
function resolveHeartbeatSession(
|
|
cfg: OpenClawConfig,
|
|
agentId?: string,
|
|
heartbeat?: HeartbeatConfig,
|
|
forcedSessionKey?: string,
|
|
) {
|
|
const sessionCfg = cfg.session;
|
|
const scope = sessionCfg?.scope ?? "per-sender";
|
|
const resolvedAgentId = normalizeAgentId(agentId ?? resolveDefaultAgentId(cfg));
|
|
const mainSessionKey =
|
|
scope === "global" ? "global" : resolveAgentMainSessionKey({ cfg, agentId: resolvedAgentId });
|
|
const storeAgentId = scope === "global" ? resolveDefaultAgentId(cfg) : resolvedAgentId;
|
|
const storePath = resolveStorePath(sessionCfg?.store, {
|
|
agentId: storeAgentId,
|
|
});
|
|
const store = loadSessionStore(storePath);
|
|
const mainEntry = store[mainSessionKey];
|
|
|
|
if (scope === "global") {
|
|
return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry };
|
|
}
|
|
|
|
const forced = forcedSessionKey?.trim();
|
|
if (forced) {
|
|
const forcedCandidate = toAgentStoreSessionKey({
|
|
agentId: resolvedAgentId,
|
|
requestKey: forced,
|
|
mainKey: cfg.session?.mainKey,
|
|
});
|
|
const forcedCanonical = canonicalizeMainSessionAlias({
|
|
cfg,
|
|
agentId: resolvedAgentId,
|
|
sessionKey: forcedCandidate,
|
|
});
|
|
if (forcedCanonical !== "global") {
|
|
const sessionAgentId = resolveAgentIdFromSessionKey(forcedCanonical);
|
|
if (sessionAgentId === normalizeAgentId(resolvedAgentId)) {
|
|
return {
|
|
sessionKey: forcedCanonical,
|
|
storePath,
|
|
store,
|
|
entry: store[forcedCanonical],
|
|
};
|
|
}
|
|
}
|
|
}
|
|
|
|
const trimmed = heartbeat?.session?.trim() ?? "";
|
|
if (!trimmed) {
|
|
return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry };
|
|
}
|
|
|
|
const normalized = trimmed.toLowerCase();
|
|
if (normalized === "main" || normalized === "global") {
|
|
return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry };
|
|
}
|
|
|
|
const candidate = toAgentStoreSessionKey({
|
|
agentId: resolvedAgentId,
|
|
requestKey: trimmed,
|
|
mainKey: cfg.session?.mainKey,
|
|
});
|
|
const canonical = canonicalizeMainSessionAlias({
|
|
cfg,
|
|
agentId: resolvedAgentId,
|
|
sessionKey: candidate,
|
|
});
|
|
if (canonical !== "global") {
|
|
const sessionAgentId = resolveAgentIdFromSessionKey(canonical);
|
|
if (sessionAgentId === normalizeAgentId(resolvedAgentId)) {
|
|
return {
|
|
sessionKey: canonical,
|
|
storePath,
|
|
store,
|
|
entry: store[canonical],
|
|
};
|
|
}
|
|
}
|
|
|
|
return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry };
|
|
}
|
|
|
|
function resolveHeartbeatReasoningPayloads(
|
|
replyResult: ReplyPayload | ReplyPayload[] | undefined,
|
|
): ReplyPayload[] {
|
|
const payloads = Array.isArray(replyResult) ? replyResult : replyResult ? [replyResult] : [];
|
|
return payloads.filter((payload) => {
|
|
const text = typeof payload.text === "string" ? payload.text : "";
|
|
return text.trimStart().startsWith("Reasoning:");
|
|
});
|
|
}
|
|
|
|
async function restoreHeartbeatUpdatedAt(params: {
|
|
storePath: string;
|
|
sessionKey: string;
|
|
updatedAt?: number;
|
|
}) {
|
|
const { storePath, sessionKey, updatedAt } = params;
|
|
if (typeof updatedAt !== "number") {
|
|
return;
|
|
}
|
|
const store = loadSessionStore(storePath);
|
|
const entry = store[sessionKey];
|
|
if (!entry) {
|
|
return;
|
|
}
|
|
const nextUpdatedAt = Math.max(entry.updatedAt ?? 0, updatedAt);
|
|
if (entry.updatedAt === nextUpdatedAt) {
|
|
return;
|
|
}
|
|
await updateSessionStore(storePath, (nextStore) => {
|
|
const nextEntry = nextStore[sessionKey] ?? entry;
|
|
if (!nextEntry) {
|
|
return;
|
|
}
|
|
const resolvedUpdatedAt = Math.max(nextEntry.updatedAt ?? 0, updatedAt);
|
|
if (nextEntry.updatedAt === resolvedUpdatedAt) {
|
|
return;
|
|
}
|
|
nextStore[sessionKey] = { ...nextEntry, updatedAt: resolvedUpdatedAt };
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Prune heartbeat transcript entries by truncating the file back to a previous size.
|
|
* This removes the user+assistant turns that were written during a HEARTBEAT_OK run,
|
|
* preventing context pollution from zero-information exchanges.
|
|
*/
|
|
async function pruneHeartbeatTranscript(params: {
|
|
transcriptPath?: string;
|
|
preHeartbeatSize?: number;
|
|
}) {
|
|
const { transcriptPath, preHeartbeatSize } = params;
|
|
if (!transcriptPath || typeof preHeartbeatSize !== "number" || preHeartbeatSize < 0) {
|
|
return;
|
|
}
|
|
try {
|
|
const stat = await fs.stat(transcriptPath);
|
|
// Only truncate if the file has grown during the heartbeat run
|
|
if (stat.size > preHeartbeatSize) {
|
|
await fs.truncate(transcriptPath, preHeartbeatSize);
|
|
}
|
|
} catch {
|
|
// File may not exist or may have been removed - ignore errors
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get the transcript file path and its current size before a heartbeat run.
|
|
* Returns undefined values if the session or transcript doesn't exist yet.
|
|
*/
|
|
async function captureTranscriptState(params: {
|
|
storePath: string;
|
|
sessionKey: string;
|
|
agentId?: string;
|
|
}): Promise<{ transcriptPath?: string; preHeartbeatSize?: number }> {
|
|
const { storePath, sessionKey, agentId } = params;
|
|
try {
|
|
const store = loadSessionStore(storePath);
|
|
const entry = store[sessionKey];
|
|
if (!entry?.sessionId) {
|
|
return {};
|
|
}
|
|
const transcriptPath = resolveSessionFilePath(entry.sessionId, entry, {
|
|
agentId,
|
|
sessionsDir: path.dirname(storePath),
|
|
});
|
|
const stat = await fs.stat(transcriptPath);
|
|
return { transcriptPath, preHeartbeatSize: stat.size };
|
|
} catch {
|
|
// Session or transcript doesn't exist yet - nothing to prune
|
|
return {};
|
|
}
|
|
}
|
|
|
|
function stripLeadingHeartbeatResponsePrefix(
|
|
text: string,
|
|
responsePrefix: string | undefined,
|
|
): string {
|
|
const normalizedPrefix = responsePrefix?.trim();
|
|
if (!normalizedPrefix) {
|
|
return text;
|
|
}
|
|
|
|
// Require a boundary after the configured prefix so short prefixes like "Hi"
|
|
// do not strip the beginning of normal words like "History".
|
|
const prefixPattern = new RegExp(
|
|
`^${escapeRegExp(normalizedPrefix)}(?=$|\\s|[\\p{P}\\p{S}])\\s*`,
|
|
"iu",
|
|
);
|
|
return text.replace(prefixPattern, "");
|
|
}
|
|
|
|
function normalizeHeartbeatReply(
|
|
payload: ReplyPayload,
|
|
responsePrefix: string | undefined,
|
|
ackMaxChars: number,
|
|
) {
|
|
const rawText = typeof payload.text === "string" ? payload.text : "";
|
|
const textForStrip = stripLeadingHeartbeatResponsePrefix(rawText, responsePrefix);
|
|
const stripped = stripHeartbeatToken(textForStrip, {
|
|
mode: "heartbeat",
|
|
maxAckChars: ackMaxChars,
|
|
});
|
|
const hasMedia = Boolean(payload.mediaUrl || (payload.mediaUrls?.length ?? 0) > 0);
|
|
if (stripped.shouldSkip && !hasMedia) {
|
|
return {
|
|
shouldSkip: true,
|
|
text: "",
|
|
hasMedia,
|
|
};
|
|
}
|
|
let finalText = stripped.text;
|
|
if (responsePrefix && finalText && !finalText.startsWith(responsePrefix)) {
|
|
finalText = `${responsePrefix} ${finalText}`;
|
|
}
|
|
return { shouldSkip: false, text: finalText, hasMedia };
|
|
}
|
|
|
|
type HeartbeatReasonFlags = {
|
|
isExecEventReason: boolean;
|
|
isCronEventReason: boolean;
|
|
isWakeReason: boolean;
|
|
};
|
|
|
|
type HeartbeatSkipReason = "empty-heartbeat-file";
|
|
|
|
type HeartbeatPreflight = HeartbeatReasonFlags & {
|
|
session: ReturnType<typeof resolveHeartbeatSession>;
|
|
pendingEventEntries: ReturnType<typeof peekSystemEventEntries>;
|
|
hasTaggedCronEvents: boolean;
|
|
shouldInspectPendingEvents: boolean;
|
|
skipReason?: HeartbeatSkipReason;
|
|
};
|
|
|
|
function resolveHeartbeatReasonFlags(reason?: string): HeartbeatReasonFlags {
|
|
const reasonKind = resolveHeartbeatReasonKind(reason);
|
|
return {
|
|
isExecEventReason: reasonKind === "exec-event",
|
|
isCronEventReason: reasonKind === "cron",
|
|
isWakeReason: reasonKind === "wake" || reasonKind === "hook",
|
|
};
|
|
}
|
|
|
|
async function resolveHeartbeatPreflight(params: {
|
|
cfg: OpenClawConfig;
|
|
agentId: string;
|
|
heartbeat?: HeartbeatConfig;
|
|
forcedSessionKey?: string;
|
|
reason?: string;
|
|
}): Promise<HeartbeatPreflight> {
|
|
const reasonFlags = resolveHeartbeatReasonFlags(params.reason);
|
|
const session = resolveHeartbeatSession(
|
|
params.cfg,
|
|
params.agentId,
|
|
params.heartbeat,
|
|
params.forcedSessionKey,
|
|
);
|
|
const pendingEventEntries = peekSystemEventEntries(session.sessionKey);
|
|
const hasTaggedCronEvents = pendingEventEntries.some((event) =>
|
|
event.contextKey?.startsWith("cron:"),
|
|
);
|
|
const shouldInspectPendingEvents =
|
|
reasonFlags.isExecEventReason || reasonFlags.isCronEventReason || hasTaggedCronEvents;
|
|
const shouldBypassFileGates =
|
|
reasonFlags.isExecEventReason ||
|
|
reasonFlags.isCronEventReason ||
|
|
reasonFlags.isWakeReason ||
|
|
hasTaggedCronEvents;
|
|
const basePreflight = {
|
|
...reasonFlags,
|
|
session,
|
|
pendingEventEntries,
|
|
hasTaggedCronEvents,
|
|
shouldInspectPendingEvents,
|
|
} satisfies Omit<HeartbeatPreflight, "skipReason">;
|
|
|
|
if (shouldBypassFileGates) {
|
|
return basePreflight;
|
|
}
|
|
|
|
const workspaceDir = resolveAgentWorkspaceDir(params.cfg, params.agentId);
|
|
const heartbeatFilePath = path.join(workspaceDir, DEFAULT_HEARTBEAT_FILENAME);
|
|
try {
|
|
const heartbeatFileContent = await fs.readFile(heartbeatFilePath, "utf-8");
|
|
if (isHeartbeatContentEffectivelyEmpty(heartbeatFileContent)) {
|
|
return {
|
|
...basePreflight,
|
|
skipReason: "empty-heartbeat-file",
|
|
};
|
|
}
|
|
} catch (err: unknown) {
|
|
if (hasErrnoCode(err, "ENOENT")) {
|
|
// Missing HEARTBEAT.md is intentional in some setups (for example, when
|
|
// heartbeat instructions live outside the file), so keep the run active.
|
|
// The heartbeat prompt already says "if it exists".
|
|
return basePreflight;
|
|
}
|
|
// For other read errors, proceed with heartbeat as before.
|
|
}
|
|
|
|
return basePreflight;
|
|
}
|
|
|
|
type HeartbeatPromptResolution = {
|
|
prompt: string;
|
|
hasExecCompletion: boolean;
|
|
hasCronEvents: boolean;
|
|
};
|
|
|
|
function appendHeartbeatWorkspacePathHint(prompt: string, workspaceDir: string): string {
|
|
if (!/heartbeat\.md/i.test(prompt)) {
|
|
return prompt;
|
|
}
|
|
const heartbeatFilePath = path.join(workspaceDir, DEFAULT_HEARTBEAT_FILENAME).replace(/\\/g, "/");
|
|
const hint = `When reading HEARTBEAT.md, use workspace file ${heartbeatFilePath} (exact case). Do not read docs/heartbeat.md.`;
|
|
if (prompt.includes(hint)) {
|
|
return prompt;
|
|
}
|
|
return `${prompt}\n${hint}`;
|
|
}
|
|
|
|
function resolveHeartbeatRunPrompt(params: {
|
|
cfg: OpenClawConfig;
|
|
heartbeat?: HeartbeatConfig;
|
|
preflight: HeartbeatPreflight;
|
|
canRelayToUser: boolean;
|
|
workspaceDir: string;
|
|
}): HeartbeatPromptResolution {
|
|
const pendingEventEntries = params.preflight.pendingEventEntries;
|
|
const pendingEvents = params.preflight.shouldInspectPendingEvents
|
|
? pendingEventEntries.map((event) => event.text)
|
|
: [];
|
|
const cronEvents = pendingEventEntries
|
|
.filter(
|
|
(event) =>
|
|
(params.preflight.isCronEventReason || event.contextKey?.startsWith("cron:")) &&
|
|
isCronSystemEvent(event.text),
|
|
)
|
|
.map((event) => event.text);
|
|
const hasExecCompletion = pendingEvents.some(isExecCompletionEvent);
|
|
const hasCronEvents = cronEvents.length > 0;
|
|
const basePrompt = hasExecCompletion
|
|
? buildExecEventPrompt({ deliverToUser: params.canRelayToUser })
|
|
: hasCronEvents
|
|
? buildCronEventPrompt(cronEvents, { deliverToUser: params.canRelayToUser })
|
|
: resolveHeartbeatPrompt(params.cfg, params.heartbeat);
|
|
const prompt = appendHeartbeatWorkspacePathHint(basePrompt, params.workspaceDir);
|
|
|
|
return { prompt, hasExecCompletion, hasCronEvents };
|
|
}
|
|
|
|
export async function runHeartbeatOnce(opts: {
|
|
cfg?: OpenClawConfig;
|
|
agentId?: string;
|
|
sessionKey?: string;
|
|
heartbeat?: HeartbeatConfig;
|
|
reason?: string;
|
|
deps?: HeartbeatDeps;
|
|
}): Promise<HeartbeatRunResult> {
|
|
const cfg = opts.cfg ?? loadConfig();
|
|
const explicitAgentId = typeof opts.agentId === "string" ? opts.agentId.trim() : "";
|
|
const forcedSessionAgentId =
|
|
explicitAgentId.length > 0 ? undefined : parseAgentSessionKey(opts.sessionKey)?.agentId;
|
|
const agentId = normalizeAgentId(
|
|
explicitAgentId || forcedSessionAgentId || resolveDefaultAgentId(cfg),
|
|
);
|
|
const heartbeat = opts.heartbeat ?? resolveHeartbeatConfig(cfg, agentId);
|
|
if (!areHeartbeatsEnabled()) {
|
|
return { status: "skipped", reason: "disabled" };
|
|
}
|
|
if (!isHeartbeatEnabledForAgent(cfg, agentId)) {
|
|
return { status: "skipped", reason: "disabled" };
|
|
}
|
|
if (!resolveHeartbeatIntervalMs(cfg, undefined, heartbeat)) {
|
|
return { status: "skipped", reason: "disabled" };
|
|
}
|
|
|
|
const startedAt = opts.deps?.nowMs?.() ?? Date.now();
|
|
if (!isWithinActiveHours(cfg, heartbeat, startedAt)) {
|
|
return { status: "skipped", reason: "quiet-hours" };
|
|
}
|
|
|
|
const queueSize = (opts.deps?.getQueueSize ?? getQueueSize)(CommandLane.Main);
|
|
if (queueSize > 0) {
|
|
return { status: "skipped", reason: "requests-in-flight" };
|
|
}
|
|
|
|
// Preflight centralizes trigger classification, event inspection, and HEARTBEAT.md gating.
|
|
const preflight = await resolveHeartbeatPreflight({
|
|
cfg,
|
|
agentId,
|
|
heartbeat,
|
|
forcedSessionKey: opts.sessionKey,
|
|
reason: opts.reason,
|
|
});
|
|
if (preflight.skipReason) {
|
|
emitHeartbeatEvent({
|
|
status: "skipped",
|
|
reason: preflight.skipReason,
|
|
durationMs: Date.now() - startedAt,
|
|
});
|
|
return { status: "skipped", reason: preflight.skipReason };
|
|
}
|
|
const { entry, sessionKey, storePath } = preflight.session;
|
|
const previousUpdatedAt = entry?.updatedAt;
|
|
const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry, heartbeat });
|
|
const heartbeatAccountId = heartbeat?.accountId?.trim();
|
|
if (delivery.reason === "unknown-account") {
|
|
log.warn("heartbeat: unknown accountId", {
|
|
accountId: delivery.accountId ?? heartbeatAccountId ?? null,
|
|
target: heartbeat?.target ?? "none",
|
|
});
|
|
} else if (heartbeatAccountId) {
|
|
log.info("heartbeat: using explicit accountId", {
|
|
accountId: delivery.accountId ?? heartbeatAccountId,
|
|
target: heartbeat?.target ?? "none",
|
|
channel: delivery.channel,
|
|
});
|
|
}
|
|
const visibility =
|
|
delivery.channel !== "none"
|
|
? resolveHeartbeatVisibility({
|
|
cfg,
|
|
channel: delivery.channel,
|
|
accountId: delivery.accountId,
|
|
})
|
|
: { showOk: false, showAlerts: true, useIndicator: true };
|
|
const { sender } = resolveHeartbeatSenderContext({ cfg, entry, delivery });
|
|
const responsePrefix = resolveEffectiveMessagesConfig(cfg, agentId, {
|
|
channel: delivery.channel !== "none" ? delivery.channel : undefined,
|
|
accountId: delivery.accountId,
|
|
}).responsePrefix;
|
|
|
|
const canRelayToUser = Boolean(
|
|
delivery.channel !== "none" && delivery.to && visibility.showAlerts,
|
|
);
|
|
const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId);
|
|
const { prompt, hasExecCompletion, hasCronEvents } = resolveHeartbeatRunPrompt({
|
|
cfg,
|
|
heartbeat,
|
|
preflight,
|
|
canRelayToUser,
|
|
workspaceDir,
|
|
});
|
|
const ctx = {
|
|
Body: appendCronStyleCurrentTimeLine(prompt, cfg, startedAt),
|
|
From: sender,
|
|
To: sender,
|
|
OriginatingChannel: delivery.channel !== "none" ? delivery.channel : undefined,
|
|
OriginatingTo: delivery.to,
|
|
AccountId: delivery.accountId,
|
|
MessageThreadId: delivery.threadId,
|
|
Provider: hasExecCompletion ? "exec-event" : hasCronEvents ? "cron-event" : "heartbeat",
|
|
SessionKey: sessionKey,
|
|
};
|
|
if (!visibility.showAlerts && !visibility.showOk && !visibility.useIndicator) {
|
|
emitHeartbeatEvent({
|
|
status: "skipped",
|
|
reason: "alerts-disabled",
|
|
durationMs: Date.now() - startedAt,
|
|
channel: delivery.channel !== "none" ? delivery.channel : undefined,
|
|
accountId: delivery.accountId,
|
|
});
|
|
return { status: "skipped", reason: "alerts-disabled" };
|
|
}
|
|
|
|
const heartbeatOkText = responsePrefix ? `${responsePrefix} ${HEARTBEAT_TOKEN}` : HEARTBEAT_TOKEN;
|
|
const outboundSession = buildOutboundSessionContext({
|
|
cfg,
|
|
agentId,
|
|
sessionKey,
|
|
});
|
|
const canAttemptHeartbeatOk = Boolean(
|
|
visibility.showOk && delivery.channel !== "none" && delivery.to,
|
|
);
|
|
const maybeSendHeartbeatOk = async () => {
|
|
if (!canAttemptHeartbeatOk || delivery.channel === "none" || !delivery.to) {
|
|
return false;
|
|
}
|
|
const heartbeatPlugin = getChannelPlugin(delivery.channel);
|
|
if (heartbeatPlugin?.heartbeat?.checkReady) {
|
|
const readiness = await heartbeatPlugin.heartbeat.checkReady({
|
|
cfg,
|
|
accountId: delivery.accountId,
|
|
deps: opts.deps,
|
|
});
|
|
if (!readiness.ok) {
|
|
return false;
|
|
}
|
|
}
|
|
await deliverOutboundPayloads({
|
|
cfg,
|
|
channel: delivery.channel,
|
|
to: delivery.to,
|
|
accountId: delivery.accountId,
|
|
threadId: delivery.threadId,
|
|
payloads: [{ text: heartbeatOkText }],
|
|
session: outboundSession,
|
|
deps: opts.deps,
|
|
});
|
|
return true;
|
|
};
|
|
|
|
try {
|
|
// Capture transcript state before the heartbeat run so we can prune if HEARTBEAT_OK
|
|
const transcriptState = await captureTranscriptState({
|
|
storePath,
|
|
sessionKey,
|
|
agentId,
|
|
});
|
|
|
|
const heartbeatModelOverride = heartbeat?.model?.trim() || undefined;
|
|
const suppressToolErrorWarnings = heartbeat?.suppressToolErrorWarnings === true;
|
|
const bootstrapContextMode: "lightweight" | undefined =
|
|
heartbeat?.lightContext === true ? "lightweight" : undefined;
|
|
const replyOpts = heartbeatModelOverride
|
|
? {
|
|
isHeartbeat: true,
|
|
heartbeatModelOverride,
|
|
suppressToolErrorWarnings,
|
|
bootstrapContextMode,
|
|
}
|
|
: { isHeartbeat: true, suppressToolErrorWarnings, bootstrapContextMode };
|
|
const replyResult = await getReplyFromConfig(ctx, replyOpts, cfg);
|
|
const replyPayload = resolveHeartbeatReplyPayload(replyResult);
|
|
const includeReasoning = heartbeat?.includeReasoning === true;
|
|
const reasoningPayloads = includeReasoning
|
|
? resolveHeartbeatReasoningPayloads(replyResult).filter((payload) => payload !== replyPayload)
|
|
: [];
|
|
|
|
if (
|
|
!replyPayload ||
|
|
(!replyPayload.text && !replyPayload.mediaUrl && !replyPayload.mediaUrls?.length)
|
|
) {
|
|
await restoreHeartbeatUpdatedAt({
|
|
storePath,
|
|
sessionKey,
|
|
updatedAt: previousUpdatedAt,
|
|
});
|
|
// Prune the transcript to remove HEARTBEAT_OK turns
|
|
await pruneHeartbeatTranscript(transcriptState);
|
|
const okSent = await maybeSendHeartbeatOk();
|
|
emitHeartbeatEvent({
|
|
status: "ok-empty",
|
|
reason: opts.reason,
|
|
durationMs: Date.now() - startedAt,
|
|
channel: delivery.channel !== "none" ? delivery.channel : undefined,
|
|
accountId: delivery.accountId,
|
|
silent: !okSent,
|
|
indicatorType: visibility.useIndicator ? resolveIndicatorType("ok-empty") : undefined,
|
|
});
|
|
return { status: "ran", durationMs: Date.now() - startedAt };
|
|
}
|
|
|
|
const ackMaxChars = resolveHeartbeatAckMaxChars(cfg, heartbeat);
|
|
const normalized = normalizeHeartbeatReply(replyPayload, responsePrefix, ackMaxChars);
|
|
// For exec completion events, don't skip even if the response looks like HEARTBEAT_OK.
|
|
// The model should be responding with exec results, not ack tokens.
|
|
// Also, if normalized.text is empty due to token stripping but we have exec completion,
|
|
// fall back to the original reply text.
|
|
const execFallbackText =
|
|
hasExecCompletion && !normalized.text.trim() && replyPayload.text?.trim()
|
|
? replyPayload.text.trim()
|
|
: null;
|
|
if (execFallbackText) {
|
|
normalized.text = execFallbackText;
|
|
normalized.shouldSkip = false;
|
|
}
|
|
const shouldSkipMain = normalized.shouldSkip && !normalized.hasMedia && !hasExecCompletion;
|
|
if (shouldSkipMain && reasoningPayloads.length === 0) {
|
|
await restoreHeartbeatUpdatedAt({
|
|
storePath,
|
|
sessionKey,
|
|
updatedAt: previousUpdatedAt,
|
|
});
|
|
// Prune the transcript to remove HEARTBEAT_OK turns
|
|
await pruneHeartbeatTranscript(transcriptState);
|
|
const okSent = await maybeSendHeartbeatOk();
|
|
emitHeartbeatEvent({
|
|
status: "ok-token",
|
|
reason: opts.reason,
|
|
durationMs: Date.now() - startedAt,
|
|
channel: delivery.channel !== "none" ? delivery.channel : undefined,
|
|
accountId: delivery.accountId,
|
|
silent: !okSent,
|
|
indicatorType: visibility.useIndicator ? resolveIndicatorType("ok-token") : undefined,
|
|
});
|
|
return { status: "ran", durationMs: Date.now() - startedAt };
|
|
}
|
|
|
|
const mediaUrls =
|
|
replyPayload.mediaUrls ?? (replyPayload.mediaUrl ? [replyPayload.mediaUrl] : []);
|
|
|
|
// Suppress duplicate heartbeats (same payload) within a short window.
|
|
// This prevents "nagging" when nothing changed but the model repeats the same items.
|
|
const prevHeartbeatText =
|
|
typeof entry?.lastHeartbeatText === "string" ? entry.lastHeartbeatText : "";
|
|
const prevHeartbeatAt =
|
|
typeof entry?.lastHeartbeatSentAt === "number" ? entry.lastHeartbeatSentAt : undefined;
|
|
const isDuplicateMain =
|
|
!shouldSkipMain &&
|
|
!mediaUrls.length &&
|
|
Boolean(prevHeartbeatText.trim()) &&
|
|
normalized.text.trim() === prevHeartbeatText.trim() &&
|
|
typeof prevHeartbeatAt === "number" &&
|
|
startedAt - prevHeartbeatAt < 24 * 60 * 60 * 1000;
|
|
|
|
if (isDuplicateMain) {
|
|
await restoreHeartbeatUpdatedAt({
|
|
storePath,
|
|
sessionKey,
|
|
updatedAt: previousUpdatedAt,
|
|
});
|
|
// Prune the transcript to remove duplicate heartbeat turns
|
|
await pruneHeartbeatTranscript(transcriptState);
|
|
emitHeartbeatEvent({
|
|
status: "skipped",
|
|
reason: "duplicate",
|
|
preview: normalized.text.slice(0, 200),
|
|
durationMs: Date.now() - startedAt,
|
|
hasMedia: false,
|
|
channel: delivery.channel !== "none" ? delivery.channel : undefined,
|
|
accountId: delivery.accountId,
|
|
});
|
|
return { status: "ran", durationMs: Date.now() - startedAt };
|
|
}
|
|
|
|
// Reasoning payloads are text-only; any attachments stay on the main reply.
|
|
const previewText = shouldSkipMain
|
|
? reasoningPayloads
|
|
.map((payload) => payload.text)
|
|
.filter((text): text is string => Boolean(text?.trim()))
|
|
.join("\n")
|
|
: normalized.text;
|
|
|
|
if (delivery.channel === "none" || !delivery.to) {
|
|
emitHeartbeatEvent({
|
|
status: "skipped",
|
|
reason: delivery.reason ?? "no-target",
|
|
preview: previewText?.slice(0, 200),
|
|
durationMs: Date.now() - startedAt,
|
|
hasMedia: mediaUrls.length > 0,
|
|
accountId: delivery.accountId,
|
|
});
|
|
return { status: "ran", durationMs: Date.now() - startedAt };
|
|
}
|
|
|
|
if (!visibility.showAlerts) {
|
|
await restoreHeartbeatUpdatedAt({
|
|
storePath,
|
|
sessionKey,
|
|
updatedAt: previousUpdatedAt,
|
|
});
|
|
emitHeartbeatEvent({
|
|
status: "skipped",
|
|
reason: "alerts-disabled",
|
|
preview: previewText?.slice(0, 200),
|
|
durationMs: Date.now() - startedAt,
|
|
channel: delivery.channel,
|
|
hasMedia: mediaUrls.length > 0,
|
|
accountId: delivery.accountId,
|
|
indicatorType: visibility.useIndicator ? resolveIndicatorType("sent") : undefined,
|
|
});
|
|
return { status: "ran", durationMs: Date.now() - startedAt };
|
|
}
|
|
|
|
const deliveryAccountId = delivery.accountId;
|
|
const heartbeatPlugin = getChannelPlugin(delivery.channel);
|
|
if (heartbeatPlugin?.heartbeat?.checkReady) {
|
|
const readiness = await heartbeatPlugin.heartbeat.checkReady({
|
|
cfg,
|
|
accountId: deliveryAccountId,
|
|
deps: opts.deps,
|
|
});
|
|
if (!readiness.ok) {
|
|
emitHeartbeatEvent({
|
|
status: "skipped",
|
|
reason: readiness.reason,
|
|
preview: previewText?.slice(0, 200),
|
|
durationMs: Date.now() - startedAt,
|
|
hasMedia: mediaUrls.length > 0,
|
|
channel: delivery.channel,
|
|
accountId: delivery.accountId,
|
|
});
|
|
log.info("heartbeat: channel not ready", {
|
|
channel: delivery.channel,
|
|
reason: readiness.reason,
|
|
});
|
|
return { status: "skipped", reason: readiness.reason };
|
|
}
|
|
}
|
|
|
|
await deliverOutboundPayloads({
|
|
cfg,
|
|
channel: delivery.channel,
|
|
to: delivery.to,
|
|
accountId: deliveryAccountId,
|
|
session: outboundSession,
|
|
threadId: delivery.threadId,
|
|
payloads: [
|
|
...reasoningPayloads,
|
|
...(shouldSkipMain
|
|
? []
|
|
: [
|
|
{
|
|
text: normalized.text,
|
|
mediaUrls,
|
|
},
|
|
]),
|
|
],
|
|
deps: opts.deps,
|
|
});
|
|
|
|
// Record last delivered heartbeat payload for dedupe.
|
|
if (!shouldSkipMain && normalized.text.trim()) {
|
|
const store = loadSessionStore(storePath);
|
|
const current = store[sessionKey];
|
|
if (current) {
|
|
store[sessionKey] = {
|
|
...current,
|
|
lastHeartbeatText: normalized.text,
|
|
lastHeartbeatSentAt: startedAt,
|
|
};
|
|
await saveSessionStore(storePath, store);
|
|
}
|
|
}
|
|
|
|
emitHeartbeatEvent({
|
|
status: "sent",
|
|
to: delivery.to,
|
|
preview: previewText?.slice(0, 200),
|
|
durationMs: Date.now() - startedAt,
|
|
hasMedia: mediaUrls.length > 0,
|
|
channel: delivery.channel,
|
|
accountId: delivery.accountId,
|
|
indicatorType: visibility.useIndicator ? resolveIndicatorType("sent") : undefined,
|
|
});
|
|
return { status: "ran", durationMs: Date.now() - startedAt };
|
|
} catch (err) {
|
|
const reason = formatErrorMessage(err);
|
|
emitHeartbeatEvent({
|
|
status: "failed",
|
|
reason,
|
|
durationMs: Date.now() - startedAt,
|
|
channel: delivery.channel !== "none" ? delivery.channel : undefined,
|
|
accountId: delivery.accountId,
|
|
indicatorType: visibility.useIndicator ? resolveIndicatorType("failed") : undefined,
|
|
});
|
|
log.error(`heartbeat failed: ${reason}`, { error: reason });
|
|
return { status: "failed", reason };
|
|
}
|
|
}
|
|
|
|
export function startHeartbeatRunner(opts: {
|
|
cfg?: OpenClawConfig;
|
|
runtime?: RuntimeEnv;
|
|
abortSignal?: AbortSignal;
|
|
runOnce?: typeof runHeartbeatOnce;
|
|
}): HeartbeatRunner {
|
|
const runtime = opts.runtime ?? defaultRuntime;
|
|
const runOnce = opts.runOnce ?? runHeartbeatOnce;
|
|
const state = {
|
|
cfg: opts.cfg ?? loadConfig(),
|
|
runtime,
|
|
agents: new Map<string, HeartbeatAgentState>(),
|
|
timer: null as NodeJS.Timeout | null,
|
|
stopped: false,
|
|
};
|
|
let initialized = false;
|
|
|
|
const resolveNextDue = (now: number, intervalMs: number, prevState?: HeartbeatAgentState) => {
|
|
if (typeof prevState?.lastRunMs === "number") {
|
|
return prevState.lastRunMs + intervalMs;
|
|
}
|
|
if (prevState && prevState.intervalMs === intervalMs && prevState.nextDueMs > now) {
|
|
return prevState.nextDueMs;
|
|
}
|
|
return now + intervalMs;
|
|
};
|
|
|
|
const advanceAgentSchedule = (agent: HeartbeatAgentState, now: number) => {
|
|
agent.lastRunMs = now;
|
|
agent.nextDueMs = now + agent.intervalMs;
|
|
};
|
|
|
|
const scheduleNext = () => {
|
|
if (state.stopped) {
|
|
return;
|
|
}
|
|
if (state.timer) {
|
|
clearTimeout(state.timer);
|
|
state.timer = null;
|
|
}
|
|
if (state.agents.size === 0) {
|
|
return;
|
|
}
|
|
const now = Date.now();
|
|
let nextDue = Number.POSITIVE_INFINITY;
|
|
for (const agent of state.agents.values()) {
|
|
if (agent.nextDueMs < nextDue) {
|
|
nextDue = agent.nextDueMs;
|
|
}
|
|
}
|
|
if (!Number.isFinite(nextDue)) {
|
|
return;
|
|
}
|
|
const delay = Math.max(0, nextDue - now);
|
|
state.timer = setTimeout(() => {
|
|
state.timer = null;
|
|
requestHeartbeatNow({ reason: "interval", coalesceMs: 0 });
|
|
}, delay);
|
|
state.timer.unref?.();
|
|
};
|
|
|
|
const updateConfig = (cfg: OpenClawConfig) => {
|
|
if (state.stopped) {
|
|
return;
|
|
}
|
|
const now = Date.now();
|
|
const prevAgents = state.agents;
|
|
const prevEnabled = prevAgents.size > 0;
|
|
const nextAgents = new Map<string, HeartbeatAgentState>();
|
|
const intervals: number[] = [];
|
|
for (const agent of resolveHeartbeatAgents(cfg)) {
|
|
const intervalMs = resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat);
|
|
if (!intervalMs) {
|
|
continue;
|
|
}
|
|
intervals.push(intervalMs);
|
|
const prevState = prevAgents.get(agent.agentId);
|
|
const nextDueMs = resolveNextDue(now, intervalMs, prevState);
|
|
nextAgents.set(agent.agentId, {
|
|
agentId: agent.agentId,
|
|
heartbeat: agent.heartbeat,
|
|
intervalMs,
|
|
lastRunMs: prevState?.lastRunMs,
|
|
nextDueMs,
|
|
});
|
|
}
|
|
|
|
state.cfg = cfg;
|
|
state.agents = nextAgents;
|
|
const nextEnabled = nextAgents.size > 0;
|
|
if (!initialized) {
|
|
if (!nextEnabled) {
|
|
log.info("heartbeat: disabled", { enabled: false });
|
|
} else {
|
|
log.info("heartbeat: started", { intervalMs: Math.min(...intervals) });
|
|
}
|
|
initialized = true;
|
|
} else if (prevEnabled !== nextEnabled) {
|
|
if (!nextEnabled) {
|
|
log.info("heartbeat: disabled", { enabled: false });
|
|
} else {
|
|
log.info("heartbeat: started", { intervalMs: Math.min(...intervals) });
|
|
}
|
|
}
|
|
|
|
scheduleNext();
|
|
};
|
|
|
|
const run: HeartbeatWakeHandler = async (params) => {
|
|
if (state.stopped) {
|
|
return {
|
|
status: "skipped",
|
|
reason: "disabled",
|
|
} satisfies HeartbeatRunResult;
|
|
}
|
|
if (!areHeartbeatsEnabled()) {
|
|
return {
|
|
status: "skipped",
|
|
reason: "disabled",
|
|
} satisfies HeartbeatRunResult;
|
|
}
|
|
if (state.agents.size === 0) {
|
|
return {
|
|
status: "skipped",
|
|
reason: "disabled",
|
|
} satisfies HeartbeatRunResult;
|
|
}
|
|
|
|
const reason = params?.reason;
|
|
const requestedAgentId = params?.agentId ? normalizeAgentId(params.agentId) : undefined;
|
|
const requestedSessionKey = params?.sessionKey?.trim() || undefined;
|
|
const isInterval = reason === "interval";
|
|
const startedAt = Date.now();
|
|
const now = startedAt;
|
|
let ran = false;
|
|
|
|
if (requestedSessionKey || requestedAgentId) {
|
|
const targetAgentId = requestedAgentId ?? resolveAgentIdFromSessionKey(requestedSessionKey);
|
|
const targetAgent = state.agents.get(targetAgentId);
|
|
if (!targetAgent) {
|
|
scheduleNext();
|
|
return { status: "skipped", reason: "disabled" };
|
|
}
|
|
try {
|
|
const res = await runOnce({
|
|
cfg: state.cfg,
|
|
agentId: targetAgent.agentId,
|
|
heartbeat: targetAgent.heartbeat,
|
|
reason,
|
|
sessionKey: requestedSessionKey,
|
|
deps: { runtime: state.runtime },
|
|
});
|
|
if (res.status !== "skipped" || res.reason !== "disabled") {
|
|
advanceAgentSchedule(targetAgent, now);
|
|
}
|
|
scheduleNext();
|
|
return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res;
|
|
} catch (err) {
|
|
const errMsg = formatErrorMessage(err);
|
|
log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, {
|
|
error: errMsg,
|
|
});
|
|
advanceAgentSchedule(targetAgent, now);
|
|
scheduleNext();
|
|
return { status: "failed", reason: errMsg };
|
|
}
|
|
}
|
|
|
|
for (const agent of state.agents.values()) {
|
|
if (isInterval && now < agent.nextDueMs) {
|
|
continue;
|
|
}
|
|
|
|
let res: HeartbeatRunResult;
|
|
try {
|
|
res = await runOnce({
|
|
cfg: state.cfg,
|
|
agentId: agent.agentId,
|
|
heartbeat: agent.heartbeat,
|
|
reason,
|
|
deps: { runtime: state.runtime },
|
|
});
|
|
} catch (err) {
|
|
// If runOnce throws (e.g. during session compaction), we must still
|
|
// advance the timer and call scheduleNext so heartbeats keep firing.
|
|
const errMsg = formatErrorMessage(err);
|
|
log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg });
|
|
advanceAgentSchedule(agent, now);
|
|
continue;
|
|
}
|
|
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
|
// Do not advance the schedule — the main lane is busy and the wake
|
|
// layer will retry shortly (DEFAULT_RETRY_MS = 1 s). Calling
|
|
// scheduleNext() here would register a 0 ms timer that races with
|
|
// the wake layer's 1 s retry and wins, bypassing the cooldown.
|
|
return res;
|
|
}
|
|
if (res.status !== "skipped" || res.reason !== "disabled") {
|
|
advanceAgentSchedule(agent, now);
|
|
}
|
|
if (res.status === "ran") {
|
|
ran = true;
|
|
}
|
|
}
|
|
|
|
scheduleNext();
|
|
if (ran) {
|
|
return { status: "ran", durationMs: Date.now() - startedAt };
|
|
}
|
|
return { status: "skipped", reason: isInterval ? "not-due" : "disabled" };
|
|
};
|
|
|
|
const wakeHandler: HeartbeatWakeHandler = async (params) =>
|
|
run({
|
|
reason: params.reason,
|
|
agentId: params.agentId,
|
|
sessionKey: params.sessionKey,
|
|
});
|
|
const disposeWakeHandler = setHeartbeatWakeHandler(wakeHandler);
|
|
updateConfig(state.cfg);
|
|
|
|
const cleanup = () => {
|
|
if (state.stopped) {
|
|
return;
|
|
}
|
|
state.stopped = true;
|
|
disposeWakeHandler();
|
|
if (state.timer) {
|
|
clearTimeout(state.timer);
|
|
}
|
|
state.timer = null;
|
|
};
|
|
|
|
opts.abortSignal?.addEventListener("abort", cleanup, { once: true });
|
|
|
|
return { stop: cleanup, updateConfig };
|
|
}
|