Cron: route reminders by session namespace

This commit is contained in:
Vignesh Natarajan
2026-02-16 14:29:21 -08:00
committed by Peter Steinberger
parent f452a7a60b
commit f988abf202
19 changed files with 530 additions and 32 deletions

View File

@@ -144,6 +144,46 @@ describe("cron tool", () => {
expect(call?.params?.agentId).toBeNull();
});
it("stamps cron.add with caller sessionKey when missing", async () => {
callGatewayMock.mockResolvedValueOnce({ ok: true });
const callerSessionKey = "agent:main:discord:channel:ops";
const tool = createCronTool({ agentSessionKey: callerSessionKey });
await tool.execute("call-session-key", {
action: "add",
job: {
name: "wake-up",
schedule: { at: new Date(123).toISOString() },
payload: { kind: "systemEvent", text: "hello" },
},
});
const call = callGatewayMock.mock.calls[0]?.[0] as {
params?: { sessionKey?: string };
};
expect(call?.params?.sessionKey).toBe(callerSessionKey);
});
it("preserves explicit job.sessionKey on add", async () => {
callGatewayMock.mockResolvedValueOnce({ ok: true });
const tool = createCronTool({ agentSessionKey: "agent:main:discord:channel:ops" });
await tool.execute("call-explicit-session-key", {
action: "add",
job: {
name: "wake-up",
schedule: { at: new Date(123).toISOString() },
sessionKey: "agent:main:telegram:group:-100123:topic:99",
payload: { kind: "systemEvent", text: "hello" },
},
});
const call = callGatewayMock.mock.calls[0]?.[0] as {
params?: { sessionKey?: string };
};
expect(call?.params?.sessionKey).toBe("agent:main:telegram:group:-100123:topic:99");
});
it("adds recent context for systemEvent reminders when contextMessages > 0", async () => {
callGatewayMock
.mockResolvedValueOnce({

View File

@@ -332,13 +332,22 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con
throw new Error("job required");
}
const job = normalizeCronJobCreate(params.job) ?? params.job;
if (job && typeof job === "object" && !("agentId" in job)) {
if (job && typeof job === "object") {
const cfg = loadConfig();
const agentId = opts?.agentSessionKey
? resolveSessionAgentId({ sessionKey: opts.agentSessionKey, config: cfg })
const { mainKey, alias } = resolveMainSessionAlias(cfg);
const resolvedSessionKey = opts?.agentSessionKey
? resolveInternalSessionKey({ key: opts.agentSessionKey, alias, mainKey })
: undefined;
if (agentId) {
(job as { agentId?: string }).agentId = agentId;
if (!("agentId" in job)) {
const agentId = opts?.agentSessionKey
? resolveSessionAgentId({ sessionKey: opts.agentSessionKey, config: cfg })
: undefined;
if (agentId) {
(job as { agentId?: string }).agentId = agentId;
}
}
if (!("sessionKey" in job) && resolvedSessionKey) {
(job as { sessionKey?: string }).sessionKey = resolvedSessionKey;
}
}

View File

@@ -79,6 +79,30 @@ describe("normalizeCronJobCreate", () => {
expect(cleared.agentId).toBeNull();
});
it("trims sessionKey and drops blanks", () => {
const normalized = normalizeCronJobCreate({
name: "session-key",
enabled: true,
schedule: { kind: "cron", expr: "* * * * *" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
sessionKey: " agent:main:discord:channel:ops ",
payload: { kind: "systemEvent", text: "hi" },
}) as unknown as Record<string, unknown>;
expect(normalized.sessionKey).toBe("agent:main:discord:channel:ops");
const cleared = normalizeCronJobCreate({
name: "session-key-clear",
enabled: true,
schedule: { kind: "cron", expr: "* * * * *" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
sessionKey: " ",
payload: { kind: "systemEvent", text: "hi" },
}) as unknown as Record<string, unknown>;
expect("sessionKey" in cleared).toBe(false);
});
it("canonicalizes payload.channel casing", () => {
const normalized = normalizeCronJobCreate({
name: "legacy provider",
@@ -329,4 +353,16 @@ describe("normalizeCronJobPatch", () => {
expect(payload.channel).toBe("telegram");
expect(payload.to).toBe("+15550001111");
});
it("preserves null sessionKey patches and trims string values", () => {
const trimmed = normalizeCronJobPatch({
sessionKey: " agent:main:telegram:group:-100123 ",
}) as unknown as Record<string, unknown>;
expect(trimmed.sessionKey).toBe("agent:main:telegram:group:-100123");
const cleared = normalizeCronJobPatch({
sessionKey: null,
}) as unknown as Record<string, unknown>;
expect(cleared.sessionKey).toBeNull();
});
});

View File

@@ -301,6 +301,20 @@ export function normalizeCronJobInput(
}
}
if ("sessionKey" in base) {
const sessionKey = base.sessionKey;
if (sessionKey === null) {
next.sessionKey = null;
} else if (typeof sessionKey === "string") {
const trimmed = sessionKey.trim();
if (trimmed) {
next.sessionKey = trimmed;
} else {
delete next.sessionKey;
}
}
}
if ("enabled" in base) {
const enabled = base.enabled;
if (typeof enabled === "boolean") {

View File

@@ -339,11 +339,12 @@ async function runIsolatedAnnounceJobAndWait(params: {
async function addWakeModeNowMainSystemEventJob(
cron: CronService,
options?: { name?: string; agentId?: string },
options?: { name?: string; agentId?: string; sessionKey?: string },
) {
return cron.add({
name: options?.name ?? "wakeMode now",
...(options?.agentId ? { agentId: options.agentId } : {}),
...(options?.sessionKey ? { sessionKey: options.sessionKey } : {}),
enabled: true,
schedule: { kind: "at", at: new Date(1).toISOString() },
sessionTarget: "main",
@@ -508,7 +509,7 @@ describe("CronService", () => {
await store.cleanup();
});
it("passes agentId to runHeartbeatOnce for main-session wakeMode now jobs", async () => {
it("passes agentId + sessionKey to runHeartbeatOnce for main-session wakeMode now jobs", async () => {
const runHeartbeatOnce = vi.fn(async () => ({ status: "ran" as const, durationMs: 1 }));
const { store, cron, enqueueSystemEvent, requestHeartbeatNow } =
@@ -519,9 +520,11 @@ describe("CronService", () => {
wakeNowHeartbeatBusyRetryDelayMs: 2,
});
const sessionKey = "agent:ops:discord:channel:alerts";
const job = await addWakeModeNowMainSystemEventJob(cron, {
name: "wakeMode now with agent",
agentId: "ops",
sessionKey,
});
await cron.run(job.id, "force");
@@ -531,12 +534,13 @@ describe("CronService", () => {
expect.objectContaining({
reason: `cron:${job.id}`,
agentId: "ops",
sessionKey,
}),
);
expect(requestHeartbeatNow).not.toHaveBeenCalled();
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"hello",
expect.objectContaining({ agentId: "ops" }),
expect.objectContaining({ agentId: "ops", sessionKey }),
);
cron.stop();
@@ -562,12 +566,21 @@ describe("CronService", () => {
wakeNowHeartbeatBusyRetryDelayMs: 2,
});
const job = await addWakeModeNowMainSystemEventJob(cron, { name: "wakeMode now fallback" });
const sessionKey = "agent:main:discord:channel:ops";
const job = await addWakeModeNowMainSystemEventJob(cron, {
name: "wakeMode now fallback",
sessionKey,
});
await cron.run(job.id, "force");
expect(runHeartbeatOnce).toHaveBeenCalled();
expect(requestHeartbeatNow).toHaveBeenCalled();
expect(requestHeartbeatNow).toHaveBeenCalledWith(
expect.objectContaining({
reason: `cron:${job.id}`,
sessionKey,
}),
);
expect(job.state.lastStatus).toBe("ok");
expect(job.state.lastError).toBeUndefined();

View File

@@ -58,6 +58,7 @@ describe("cron store migration", () => {
const legacyJob = {
id: "job-1",
agentId: undefined,
sessionKey: " agent:main:discord:channel:ops ",
name: "Legacy job",
description: null,
enabled: true,
@@ -82,6 +83,7 @@ describe("cron store migration", () => {
await fs.writeFile(store.storePath, JSON.stringify({ version: 1, jobs: [legacyJob] }, null, 2));
const migrated = await migrateAndLoadFirstJob(store.storePath);
expect(migrated.sessionKey).toBe("agent:main:discord:channel:ops");
expect(migrated.delivery).toEqual({
mode: "announce",
channel: "telegram",

View File

@@ -13,6 +13,7 @@ import type {
import { normalizeHttpWebhookUrl } from "../webhook-url.js";
import {
normalizeOptionalAgentId,
normalizeOptionalSessionKey,
normalizeOptionalText,
normalizePayloadToSystemText,
normalizeRequiredName,
@@ -298,6 +299,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo
const job: CronJob = {
id,
agentId: normalizeOptionalAgentId(input.agentId),
sessionKey: normalizeOptionalSessionKey((input as { sessionKey?: unknown }).sessionKey),
name: normalizeRequiredName(input.name),
description: normalizeOptionalText(input.description),
enabled,
@@ -367,6 +369,9 @@ export function applyJobPatch(job: CronJob, patch: CronJobPatch) {
if ("agentId" in patch) {
job.agentId = normalizeOptionalAgentId((patch as { agentId?: unknown }).agentId);
}
if ("sessionKey" in patch) {
job.sessionKey = normalizeOptionalSessionKey((patch as { sessionKey?: unknown }).sessionKey);
}
assertSupportedJobSpec(job);
assertDeliverySupport(job);
}

View File

@@ -39,6 +39,14 @@ export function normalizeOptionalAgentId(raw: unknown) {
return normalizeAgentId(trimmed);
}
export function normalizeOptionalSessionKey(raw: unknown) {
if (typeof raw !== "string") {
return undefined;
}
const trimmed = raw.trim();
return trimmed || undefined;
}
export function inferLegacyName(job: {
schedule?: { kind?: unknown; everyMs?: unknown; expr?: unknown };
payload?: { kind?: unknown; text?: unknown; message?: unknown };

View File

@@ -43,9 +43,16 @@ export type CronServiceDeps = {
resolveSessionStorePath?: (agentId?: string) => string;
/** Path to the session store (sessions.json) for reaper use. */
sessionStorePath?: string;
enqueueSystemEvent: (text: string, opts?: { agentId?: string; contextKey?: string }) => void;
requestHeartbeatNow: (opts?: { reason?: string }) => void;
runHeartbeatOnce?: (opts?: { reason?: string; agentId?: string }) => Promise<HeartbeatRunResult>;
enqueueSystemEvent: (
text: string,
opts?: { agentId?: string; sessionKey?: string; contextKey?: string },
) => void;
requestHeartbeatNow: (opts?: { reason?: string; agentId?: string; sessionKey?: string }) => void;
runHeartbeatOnce?: (opts?: {
reason?: string;
agentId?: string;
sessionKey?: string;
}) => Promise<HeartbeatRunResult>;
/**
* WakeMode=now: max time to wait for runHeartbeatOnce to stop returning
* { status:"skipped", reason:"requests-in-flight" } before falling back to

View File

@@ -264,6 +264,15 @@ export async function ensureLoaded(
mutated = true;
}
if ("sessionKey" in raw) {
const sessionKey =
typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined;
if (raw.sessionKey !== sessionKey) {
raw.sessionKey = sessionKey;
mutated = true;
}
}
if (typeof raw.enabled !== "boolean") {
raw.enabled = true;
mutated = true;

View File

@@ -453,6 +453,7 @@ async function executeJobCore(
}
state.deps.enqueueSystemEvent(text, {
agentId: job.agentId,
sessionKey: job.sessionKey,
contextKey: `cron:${job.id}`,
});
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
@@ -464,7 +465,11 @@ async function executeJobCore(
let heartbeatResult: HeartbeatRunResult;
for (;;) {
heartbeatResult = await state.deps.runHeartbeatOnce({ reason, agentId: job.agentId });
heartbeatResult = await state.deps.runHeartbeatOnce({
reason,
agentId: job.agentId,
sessionKey: job.sessionKey,
});
if (
heartbeatResult.status !== "skipped" ||
heartbeatResult.reason !== "requests-in-flight"
@@ -472,7 +477,11 @@ async function executeJobCore(
break;
}
if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
state.deps.requestHeartbeatNow({ reason });
state.deps.requestHeartbeatNow({
reason,
agentId: job.agentId,
sessionKey: job.sessionKey,
});
return { status: "ok", summary: text };
}
await delay(retryDelayMs);
@@ -486,7 +495,11 @@ async function executeJobCore(
return { status: "error", error: heartbeatResult.reason, summary: text };
}
} else {
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
state.deps.requestHeartbeatNow({
reason: `cron:${job.id}`,
agentId: job.agentId,
sessionKey: job.sessionKey,
});
return { status: "ok", summary: text };
}
}
@@ -514,10 +527,15 @@ async function executeJobCore(
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
state.deps.enqueueSystemEvent(label, {
agentId: job.agentId,
sessionKey: job.sessionKey,
contextKey: `cron:${job.id}`,
});
if (job.wakeMode === "now") {
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
state.deps.requestHeartbeatNow({
reason: `cron:${job.id}`,
agentId: job.agentId,
sessionKey: job.sessionKey,
});
}
}

View File

@@ -92,6 +92,8 @@ export type CronJobState = {
export type CronJob = {
id: string;
agentId?: string;
/** Origin session namespace for reminder delivery and wake routing. */
sessionKey?: string;
name: string;
description?: string;
enabled: boolean;

View File

@@ -135,6 +135,7 @@ export const CronJobSchema = Type.Object(
{
id: NonEmptyString,
agentId: Type.Optional(NonEmptyString),
sessionKey: Type.Optional(NonEmptyString),
name: NonEmptyString,
description: Type.Optional(Type.String()),
enabled: Type.Boolean(),
@@ -164,6 +165,7 @@ export const CronAddParamsSchema = Type.Object(
{
name: NonEmptyString,
agentId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
sessionKey: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
description: Type.Optional(Type.String()),
enabled: Type.Optional(Type.Boolean()),
deleteAfterRun: Type.Optional(Type.Boolean()),
@@ -180,6 +182,7 @@ export const CronJobPatchSchema = Type.Object(
{
name: Type.Optional(NonEmptyString),
agentId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
sessionKey: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
description: Type.Optional(Type.String()),
enabled: Type.Optional(Type.Boolean()),
deleteAfterRun: Type.Optional(Type.Boolean()),

View File

@@ -1,7 +1,11 @@
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
import type { CliDeps } from "../cli/deps.js";
import { loadConfig } from "../config/config.js";
import { resolveAgentMainSessionKey } from "../config/sessions.js";
import {
canonicalizeMainSessionAlias,
resolveAgentIdFromSessionKey,
resolveAgentMainSessionKey,
} from "../config/sessions.js";
import { resolveStorePath } from "../config/sessions/paths.js";
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js";
@@ -82,6 +86,35 @@ export function buildGatewayCronService(params: {
return { agentId, cfg: runtimeConfig };
};
const resolveCronSessionKey = (params: {
runtimeConfig: ReturnType<typeof loadConfig>;
agentId: string;
requestedSessionKey?: string | null;
}) => {
const requested = params.requestedSessionKey?.trim();
if (!requested) {
return resolveAgentMainSessionKey({
cfg: params.runtimeConfig,
agentId: params.agentId,
});
}
const canonical = canonicalizeMainSessionAlias({
cfg: params.runtimeConfig,
agentId: params.agentId,
sessionKey: requested,
});
if (canonical !== "global") {
const sessionAgentId = resolveAgentIdFromSessionKey(canonical);
if (normalizeAgentId(sessionAgentId) !== normalizeAgentId(params.agentId)) {
return resolveAgentMainSessionKey({
cfg: params.runtimeConfig,
agentId: params.agentId,
});
}
}
return canonical;
};
const defaultAgentId = resolveDefaultAgentId(params.cfg);
const resolveSessionStorePath = (agentId?: string) =>
resolveStorePath(params.cfg.session?.store, {
@@ -99,20 +132,58 @@ export function buildGatewayCronService(params: {
sessionStorePath,
enqueueSystemEvent: (text, opts) => {
const { agentId, cfg: runtimeConfig } = resolveCronAgent(opts?.agentId);
const sessionKey = resolveAgentMainSessionKey({
cfg: runtimeConfig,
const sessionKey = resolveCronSessionKey({
runtimeConfig,
agentId,
requestedSessionKey: opts?.sessionKey,
});
enqueueSystemEvent(text, { sessionKey, contextKey: opts?.contextKey });
},
requestHeartbeatNow,
requestHeartbeatNow: (opts) => {
const runtimeConfig = loadConfig();
const requestedAgentId = opts?.agentId ? resolveCronAgent(opts.agentId).agentId : undefined;
const derivedAgentId =
requestedAgentId ??
(opts?.sessionKey
? normalizeAgentId(resolveAgentIdFromSessionKey(opts.sessionKey))
: undefined);
const agentId = derivedAgentId || undefined;
const sessionKey =
opts?.sessionKey && agentId
? resolveCronSessionKey({
runtimeConfig,
agentId,
requestedSessionKey: opts.sessionKey,
})
: undefined;
requestHeartbeatNow({
reason: opts?.reason,
agentId,
sessionKey,
});
},
runHeartbeatOnce: async (opts) => {
const runtimeConfig = loadConfig();
const agentId = opts?.agentId ? resolveCronAgent(opts.agentId).agentId : undefined;
const requestedAgentId = opts?.agentId ? resolveCronAgent(opts.agentId).agentId : undefined;
const derivedAgentId =
requestedAgentId ??
(opts?.sessionKey
? normalizeAgentId(resolveAgentIdFromSessionKey(opts.sessionKey))
: undefined);
const agentId = derivedAgentId || undefined;
const sessionKey =
opts?.sessionKey && agentId
? resolveCronSessionKey({
runtimeConfig,
agentId,
requestedSessionKey: opts.sessionKey,
})
: undefined;
return await runHeartbeatOnce({
cfg: runtimeConfig,
reason: opts?.reason,
agentId,
sessionKey,
deps: { ...params.deps, runtime: defaultRuntime },
});
},

View File

@@ -721,6 +721,81 @@ describe("runHeartbeatOnce", () => {
}
});
it("runs heartbeats in forced session key overrides passed at call time", async () => {
const tmpDir = await createCaseDir("hb-forced-session-override");
const storePath = path.join(tmpDir, "sessions.json");
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
try {
const cfg: OpenClawConfig = {
agents: {
defaults: {
workspace: tmpDir,
heartbeat: {
every: "5m",
target: "last",
},
},
},
channels: { whatsapp: { allowFrom: ["*"] } },
session: { store: storePath },
};
const mainSessionKey = resolveMainSessionKey(cfg);
const agentId = resolveAgentIdFromSessionKey(mainSessionKey);
const forcedSessionKey = buildAgentPeerSessionKey({
agentId,
channel: "whatsapp",
peerKind: "dm",
peerId: "+15559990000",
});
await fs.writeFile(
storePath,
JSON.stringify({
[mainSessionKey]: {
sessionId: "sid-main",
updatedAt: Date.now(),
lastChannel: "whatsapp",
lastTo: "+1555",
},
[forcedSessionKey]: {
sessionId: "sid-forced",
updatedAt: Date.now() + 10_000,
lastChannel: "whatsapp",
lastTo: "+15559990000",
},
}),
);
replySpy.mockResolvedValue([{ text: "Forced alert" }]);
const sendWhatsApp = vi.fn().mockResolvedValue({
messageId: "m1",
toJid: "jid",
});
await runHeartbeatOnce({
cfg,
sessionKey: forcedSessionKey,
deps: {
sendWhatsApp,
getQueueSize: () => 0,
nowMs: () => 0,
webAuthExists: async () => true,
hasActiveWebListener: () => true,
},
});
expect(sendWhatsApp).toHaveBeenCalledTimes(1);
expect(sendWhatsApp).toHaveBeenCalledWith("+15559990000", "Forced alert", expect.any(Object));
expect(replySpy).toHaveBeenCalledWith(
expect.objectContaining({ SessionKey: forcedSessionKey }),
expect.objectContaining({ isHeartbeat: true }),
cfg,
);
} finally {
replySpy.mockRestore();
}
});
it("suppresses duplicate heartbeat payloads within 24h", async () => {
const tmpDir = await createCaseDir("hb-dup-suppress");
const storePath = path.join(tmpDir, "sessions.json");

View File

@@ -1,6 +1,7 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import { startHeartbeatRunner } from "./heartbeat-runner.js";
import { requestHeartbeatNow, resetHeartbeatWakeStateForTests } from "./heartbeat-wake.js";
describe("startHeartbeatRunner", () => {
function startDefaultRunner(runOnce: (typeof startHeartbeatRunner)[0]["runOnce"]) {
@@ -13,6 +14,7 @@ describe("startHeartbeatRunner", () => {
}
afterEach(() => {
resetHeartbeatWakeStateForTests();
vi.useRealTimers();
vi.restoreAllMocks();
});
@@ -162,4 +164,42 @@ describe("startHeartbeatRunner", () => {
runner.stop();
});
it("routes targeted wake requests to the requested agent/session", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date(0));
const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
const runner = startHeartbeatRunner({
cfg: {
agents: {
defaults: { heartbeat: { every: "30m" } },
list: [
{ id: "main", heartbeat: { every: "30m" } },
{ id: "ops", heartbeat: { every: "15m" } },
],
},
} as OpenClawConfig,
runOnce: runSpy,
});
requestHeartbeatNow({
reason: "cron:job-123",
agentId: "ops",
sessionKey: "agent:ops:discord:channel:alerts",
coalesceMs: 0,
});
await vi.advanceTimersByTimeAsync(1);
expect(runSpy).toHaveBeenCalledTimes(1);
expect(runSpy).toHaveBeenCalledWith(
expect.objectContaining({
agentId: "ops",
reason: "cron:job-123",
sessionKey: "agent:ops:discord:channel:alerts",
}),
);
runner.stop();
});
});

View File

@@ -259,6 +259,7 @@ function resolveHeartbeatSession(
cfg: OpenClawConfig,
agentId?: string,
heartbeat?: HeartbeatConfig,
forcedSessionKey?: string,
) {
const sessionCfg = cfg.session;
const scope = sessionCfg?.scope ?? "per-sender";
@@ -276,6 +277,31 @@ function resolveHeartbeatSession(
return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry };
}
const forced = forcedSessionKey?.trim();
if (forced) {
const forcedCandidate = toAgentStoreSessionKey({
agentId: resolvedAgentId,
requestKey: forced,
mainKey: cfg.session?.mainKey,
});
const forcedCanonical = canonicalizeMainSessionAlias({
cfg,
agentId: resolvedAgentId,
sessionKey: forcedCandidate,
});
if (forcedCanonical !== "global") {
const sessionAgentId = resolveAgentIdFromSessionKey(forcedCanonical);
if (sessionAgentId === normalizeAgentId(resolvedAgentId)) {
return {
sessionKey: forcedCanonical,
storePath,
store,
entry: store[forcedCanonical],
};
}
}
}
const trimmed = heartbeat?.session?.trim() ?? "";
if (!trimmed) {
return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry };
@@ -437,6 +463,7 @@ function normalizeHeartbeatReply(
export async function runHeartbeatOnce(opts: {
cfg?: OpenClawConfig;
agentId?: string;
sessionKey?: string;
heartbeat?: HeartbeatConfig;
reason?: string;
deps?: HeartbeatDeps;
@@ -493,7 +520,12 @@ export async function runHeartbeatOnce(opts: {
// The LLM prompt says "if it exists" so this is expected behavior.
}
const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg, agentId, heartbeat);
const { entry, sessionKey, storePath } = resolveHeartbeatSession(
cfg,
agentId,
heartbeat,
opts.sessionKey,
);
const previousUpdatedAt = entry?.updatedAt;
const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry, heartbeat });
const heartbeatAccountId = heartbeat?.accountId?.trim();
@@ -969,11 +1001,45 @@ export function startHeartbeatRunner(opts: {
}
const reason = params?.reason;
const requestedAgentId = params?.agentId ? normalizeAgentId(params.agentId) : undefined;
const requestedSessionKey = params?.sessionKey?.trim() || undefined;
const isInterval = reason === "interval";
const startedAt = Date.now();
const now = startedAt;
let ran = false;
if (requestedSessionKey || requestedAgentId) {
const targetAgentId = requestedAgentId ?? resolveAgentIdFromSessionKey(requestedSessionKey);
const targetAgent = state.agents.get(targetAgentId);
if (!targetAgent) {
scheduleNext();
return { status: "skipped", reason: "disabled" };
}
try {
const res = await runOnce({
cfg: state.cfg,
agentId: targetAgent.agentId,
heartbeat: targetAgent.heartbeat,
reason,
sessionKey: requestedSessionKey,
deps: { runtime: state.runtime },
});
if (res.status !== "skipped" || res.reason !== "disabled") {
advanceAgentSchedule(targetAgent, now);
}
scheduleNext();
return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res;
} catch (err) {
const errMsg = formatErrorMessage(err);
log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, {
error: errMsg,
});
advanceAgentSchedule(targetAgent, now);
scheduleNext();
return { status: "failed", reason: errMsg };
}
}
for (const agent of state.agents.values()) {
if (isInterval && now < agent.nextDueMs) {
continue;
@@ -1016,7 +1082,12 @@ export function startHeartbeatRunner(opts: {
return { status: "skipped", reason: isInterval ? "not-due" : "disabled" };
};
const wakeHandler: HeartbeatWakeHandler = async (params) => run({ reason: params.reason });
const wakeHandler: HeartbeatWakeHandler = async (params) =>
run({
reason: params.reason,
agentId: params.agentId,
sessionKey: params.sessionKey,
});
const disposeWakeHandler = setHeartbeatWakeHandler(wakeHandler);
updateConfig(state.cfg);

View File

@@ -247,4 +247,36 @@ describe("heartbeat-wake", () => {
expect(handler).toHaveBeenCalledWith({ reason: "manual" });
expect(hasPendingHeartbeatWake()).toBe(false);
});
it("forwards wake target fields and preserves them across retries", async () => {
vi.useFakeTimers();
const handler = vi
.fn()
.mockResolvedValueOnce({ status: "skipped", reason: "requests-in-flight" })
.mockResolvedValueOnce({ status: "ran", durationMs: 1 });
setHeartbeatWakeHandler(handler);
requestHeartbeatNow({
reason: "cron:job-1",
agentId: "ops",
sessionKey: "agent:ops:discord:channel:alerts",
coalesceMs: 0,
});
await vi.advanceTimersByTimeAsync(1);
expect(handler).toHaveBeenCalledTimes(1);
expect(handler.mock.calls[0]?.[0]).toEqual({
reason: "cron:job-1",
agentId: "ops",
sessionKey: "agent:ops:discord:channel:alerts",
});
await vi.advanceTimersByTimeAsync(1000);
expect(handler).toHaveBeenCalledTimes(2);
expect(handler.mock.calls[1]?.[0]).toEqual({
reason: "cron:job-1",
agentId: "ops",
sessionKey: "agent:ops:discord:channel:alerts",
});
});
});

View File

@@ -3,13 +3,19 @@ export type HeartbeatRunResult =
| { status: "skipped"; reason: string }
| { status: "failed"; reason: string };
export type HeartbeatWakeHandler = (opts: { reason?: string }) => Promise<HeartbeatRunResult>;
export type HeartbeatWakeHandler = (opts: {
reason?: string;
agentId?: string;
sessionKey?: string;
}) => Promise<HeartbeatRunResult>;
type WakeTimerKind = "normal" | "retry";
type PendingWakeReason = {
reason: string;
priority: number;
requestedAt: number;
agentId?: string;
sessionKey?: string;
};
let handler: HeartbeatWakeHandler | null = null;
@@ -56,12 +62,25 @@ function normalizeWakeReason(reason?: string): string {
return trimmed.length > 0 ? trimmed : "requested";
}
function queuePendingWakeReason(reason?: string, requestedAt = Date.now()) {
const normalizedReason = normalizeWakeReason(reason);
function normalizeWakeTarget(value?: string): string | undefined {
const trimmed = typeof value === "string" ? value.trim() : "";
return trimmed || undefined;
}
function queuePendingWakeReason(params?: {
reason?: string;
requestedAt?: number;
agentId?: string;
sessionKey?: string;
}) {
const requestedAt = params?.requestedAt ?? Date.now();
const normalizedReason = normalizeWakeReason(params?.reason);
const next: PendingWakeReason = {
reason: normalizedReason,
priority: resolveReasonPriority(normalizedReason),
requestedAt,
agentId: normalizeWakeTarget(params?.agentId),
sessionKey: normalizeWakeTarget(params?.sessionKey),
};
if (!pendingWake) {
pendingWake = next;
@@ -113,18 +132,33 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") {
}
const reason = pendingWake?.reason;
const agentId = pendingWake?.agentId;
const sessionKey = pendingWake?.sessionKey;
pendingWake = null;
running = true;
try {
const res = await active({ reason: reason ?? undefined });
const wakeOpts = {
reason: reason ?? undefined,
...(agentId ? { agentId } : {}),
...(sessionKey ? { sessionKey } : {}),
};
const res = await active(wakeOpts);
if (res.status === "skipped" && res.reason === "requests-in-flight") {
// The main lane is busy; retry soon.
queuePendingWakeReason(reason ?? "retry");
queuePendingWakeReason({
reason: reason ?? "retry",
agentId,
sessionKey,
});
schedule(DEFAULT_RETRY_MS, "retry");
}
} catch {
// Error is already logged by the heartbeat runner; schedule a retry.
queuePendingWakeReason(reason ?? "retry");
queuePendingWakeReason({
reason: reason ?? "retry",
agentId,
sessionKey,
});
schedule(DEFAULT_RETRY_MS, "retry");
} finally {
running = false;
@@ -178,8 +212,17 @@ export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null): () =
};
}
export function requestHeartbeatNow(opts?: { reason?: string; coalesceMs?: number }) {
queuePendingWakeReason(opts?.reason);
export function requestHeartbeatNow(opts?: {
reason?: string;
coalesceMs?: number;
agentId?: string;
sessionKey?: string;
}) {
queuePendingWakeReason({
reason: opts?.reason,
agentId: opts?.agentId,
sessionKey: opts?.sessionKey,
});
schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS, "normal");
}