mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-30 08:12:21 +00:00
fix(heartbeat): scope exec wake dispatch to session key (#32724)
Merged via squash.
Prepared head SHA: 563fee0e65
Co-authored-by: altaywtf <9790196+altaywtf@users.noreply.github.com>
Co-authored-by: altaywtf <9790196+altaywtf@users.noreply.github.com>
Reviewed-by: @altaywtf
This commit is contained in:
@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Exec heartbeat routing: scope exec-triggered heartbeat wakes to agent session keys so unrelated agents are no longer awakened by exec events, while preserving legacy unscoped behavior for non-canonical session keys. (#32724) thanks @altaywtf
|
||||
- macOS/Tailscale remote gateway discovery: add a Tailscale Serve fallback peer probe path (`wss://<peer>.ts.net`) when Bonjour and wide-area DNS-SD discovery return no gateways, and refresh both discovery paths from macOS onboarding. (#32860) Thanks @ngutman.
|
||||
- Telegram/multi-account default routing clarity: warn only for ambiguous (2+) account setups without an explicit default, add `openclaw doctor` warnings for missing/invalid multi-account defaults across channels, and document explicit-default guidance for channel routing and Telegram config. (#32544) thanks @Sid-Qin.
|
||||
- Telegram/plugin outbound hook parity: run `message_sending` + `message_sent` in Telegram reply delivery, include reply-path hook metadata (`mediaUrls`, `threadId`), and report `message_sent.success=false` when hooks blank text and no outbound message is delivered. (#32649) Thanks @KimGLee.
|
||||
|
||||
64
src/agents/bash-tools.exec-runtime.test.ts
Normal file
64
src/agents/bash-tools.exec-runtime.test.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
vi.mock("../infra/heartbeat-wake.js", () => ({
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../infra/system-events.js", () => ({
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
}));
|
||||
|
||||
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import { emitExecSystemEvent } from "./bash-tools.exec-runtime.js";
|
||||
|
||||
const requestHeartbeatNowMock = vi.mocked(requestHeartbeatNow);
|
||||
const enqueueSystemEventMock = vi.mocked(enqueueSystemEvent);
|
||||
|
||||
describe("emitExecSystemEvent", () => {
|
||||
beforeEach(() => {
|
||||
requestHeartbeatNowMock.mockClear();
|
||||
enqueueSystemEventMock.mockClear();
|
||||
});
|
||||
|
||||
it("scopes heartbeat wake to the event session key", () => {
|
||||
emitExecSystemEvent("Exec finished", {
|
||||
sessionKey: "agent:ops:main",
|
||||
contextKey: "exec:run-1",
|
||||
});
|
||||
|
||||
expect(enqueueSystemEventMock).toHaveBeenCalledWith("Exec finished", {
|
||||
sessionKey: "agent:ops:main",
|
||||
contextKey: "exec:run-1",
|
||||
});
|
||||
expect(requestHeartbeatNowMock).toHaveBeenCalledWith({
|
||||
reason: "exec-event",
|
||||
sessionKey: "agent:ops:main",
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps wake unscoped for non-agent session keys", () => {
|
||||
emitExecSystemEvent("Exec finished", {
|
||||
sessionKey: "global",
|
||||
contextKey: "exec:run-global",
|
||||
});
|
||||
|
||||
expect(enqueueSystemEventMock).toHaveBeenCalledWith("Exec finished", {
|
||||
sessionKey: "global",
|
||||
contextKey: "exec:run-global",
|
||||
});
|
||||
expect(requestHeartbeatNowMock).toHaveBeenCalledWith({
|
||||
reason: "exec-event",
|
||||
});
|
||||
});
|
||||
|
||||
it("ignores events without a session key", () => {
|
||||
emitExecSystemEvent("Exec finished", {
|
||||
sessionKey: " ",
|
||||
contextKey: "exec:run-2",
|
||||
});
|
||||
|
||||
expect(enqueueSystemEventMock).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNowMock).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -6,6 +6,7 @@ import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
||||
import { isDangerousHostEnvVarName } from "../infra/host-env-security.js";
|
||||
import { findPathKey, mergePathPrepend } from "../infra/path-prepend.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import { scopedHeartbeatWakeOptions } from "../routing/session-key.js";
|
||||
import type { ProcessSession } from "./bash-process-registry.js";
|
||||
import type { ExecToolDetails } from "./bash-tools.exec-types.js";
|
||||
import type { BashSandboxConfig } from "./bash-tools.shared.js";
|
||||
@@ -239,7 +240,9 @@ function maybeNotifyOnExit(session: ProcessSession, status: "completed" | "faile
|
||||
? `Exec ${status} (${session.id.slice(0, 8)}, ${exitLabel}) :: ${output}`
|
||||
: `Exec ${status} (${session.id.slice(0, 8)}, ${exitLabel})`;
|
||||
enqueueSystemEvent(summary, { sessionKey });
|
||||
requestHeartbeatNow({ reason: `exec:${session.id}:exit` });
|
||||
requestHeartbeatNow(
|
||||
scopedHeartbeatWakeOptions(sessionKey, { reason: `exec:${session.id}:exit` }),
|
||||
);
|
||||
}
|
||||
|
||||
export function createApprovalSlug(id: string) {
|
||||
@@ -265,7 +268,7 @@ export function emitExecSystemEvent(
|
||||
return;
|
||||
}
|
||||
enqueueSystemEvent(text, { sessionKey, contextKey: opts.contextKey });
|
||||
requestHeartbeatNow({ reason: "exec-event" });
|
||||
requestHeartbeatNow(scopedHeartbeatWakeOptions(sessionKey, { reason: "exec-event" }));
|
||||
}
|
||||
|
||||
export async function runExecProcess(opts: {
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
resetHeartbeatWakeStateForTests,
|
||||
setHeartbeatWakeHandler,
|
||||
} from "../infra/heartbeat-wake.js";
|
||||
import { applyPathPrepend, findPathKey } from "../infra/path-prepend.js";
|
||||
import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js";
|
||||
import { captureEnv } from "../test-utils/env.js";
|
||||
@@ -510,6 +514,14 @@ describe("exec exit codes", () => {
|
||||
});
|
||||
|
||||
describe("exec notifyOnExit", () => {
|
||||
beforeEach(() => {
|
||||
resetHeartbeatWakeStateForTests();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
resetHeartbeatWakeStateForTests();
|
||||
});
|
||||
|
||||
it("enqueues a system event when a backgrounded exec exits", async () => {
|
||||
const tool = createNotifyOnExitExecTool();
|
||||
|
||||
@@ -521,6 +533,45 @@ describe("exec notifyOnExit", () => {
|
||||
expect(hasEvent).toBe(true);
|
||||
});
|
||||
|
||||
it("scopes notifyOnExit heartbeat wake to the exec session key", async () => {
|
||||
const tool = createNotifyOnExitExecTool();
|
||||
const wakeHandler = vi.fn().mockResolvedValue({ status: "skipped", reason: "disabled" });
|
||||
const dispose = setHeartbeatWakeHandler(
|
||||
wakeHandler as unknown as Parameters<typeof setHeartbeatWakeHandler>[0],
|
||||
);
|
||||
try {
|
||||
const sessionId = await startBackgroundCommand(tool, echoAfterDelay("notify"));
|
||||
|
||||
await expect
|
||||
.poll(() => wakeHandler.mock.calls[0]?.[0], NOTIFY_POLL_OPTIONS)
|
||||
.toMatchObject({
|
||||
reason: `exec:${sessionId}:exit`,
|
||||
sessionKey: DEFAULT_NOTIFY_SESSION_KEY,
|
||||
});
|
||||
} finally {
|
||||
dispose();
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps notifyOnExit heartbeat wake unscoped for non-agent session keys", async () => {
|
||||
const tool = createNotifyOnExitExecTool({ sessionKey: "global" });
|
||||
const wakeHandler = vi.fn().mockResolvedValue({ status: "skipped", reason: "disabled" });
|
||||
const dispose = setHeartbeatWakeHandler(
|
||||
wakeHandler as unknown as Parameters<typeof setHeartbeatWakeHandler>[0],
|
||||
);
|
||||
try {
|
||||
const sessionId = await startBackgroundCommand(tool, echoAfterDelay("notify"));
|
||||
|
||||
await expect
|
||||
.poll(() => wakeHandler.mock.calls[0]?.[0], NOTIFY_POLL_OPTIONS)
|
||||
.toEqual({
|
||||
reason: `exec:${sessionId}:exit`,
|
||||
});
|
||||
} finally {
|
||||
dispose();
|
||||
}
|
||||
});
|
||||
|
||||
it.each<NotifyNoopCase>(NOOP_NOTIFY_CASES)("$label", runNotifyNoopCase);
|
||||
});
|
||||
|
||||
|
||||
@@ -111,7 +111,10 @@ describe("node exec events", () => {
|
||||
"Exec started (node=node-1 id=run-1): ls -la",
|
||||
{ sessionKey: "agent:main:main", contextKey: "exec:run-1" },
|
||||
);
|
||||
expect(requestHeartbeatNowMock).toHaveBeenCalledWith({ reason: "exec-event" });
|
||||
expect(requestHeartbeatNowMock).toHaveBeenCalledWith({
|
||||
reason: "exec-event",
|
||||
sessionKey: "agent:main:main",
|
||||
});
|
||||
});
|
||||
|
||||
it("enqueues exec.finished events with output", async () => {
|
||||
@@ -185,7 +188,10 @@ describe("node exec events", () => {
|
||||
"Exec denied (node=node-3 id=run-3, allowlist-miss): rm -rf /",
|
||||
{ sessionKey: "agent:demo:main", contextKey: "exec:run-3" },
|
||||
);
|
||||
expect(requestHeartbeatNowMock).toHaveBeenCalledWith({ reason: "exec-event" });
|
||||
expect(requestHeartbeatNowMock).toHaveBeenCalledWith({
|
||||
reason: "exec-event",
|
||||
sessionKey: "agent:demo:main",
|
||||
});
|
||||
});
|
||||
|
||||
it("suppresses exec.started when notifyOnExit is false", async () => {
|
||||
|
||||
@@ -10,7 +10,7 @@ import { buildOutboundSessionContext } from "../infra/outbound/session-context.j
|
||||
import { resolveOutboundTarget } from "../infra/outbound/targets.js";
|
||||
import { registerApnsToken } from "../infra/push-apns.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import { normalizeMainKey } from "../routing/session-key.js";
|
||||
import { normalizeMainKey, scopedHeartbeatWakeOptions } from "../routing/session-key.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import { parseMessageWithAttachments } from "./chat-attachments.js";
|
||||
import { normalizeRpcAttachmentsToChatAttachments } from "./server-methods/attachment-normalize.js";
|
||||
@@ -574,7 +574,10 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
|
||||
}
|
||||
|
||||
enqueueSystemEvent(text, { sessionKey, contextKey: runId ? `exec:${runId}` : "exec" });
|
||||
requestHeartbeatNow({ reason: "exec-event" });
|
||||
// Scope wakes only for canonical agent sessions. Synthetic node-* fallback
|
||||
// keys should keep legacy unscoped behavior so enabled non-main heartbeat
|
||||
// agents still run when no explicit agent session is provided.
|
||||
requestHeartbeatNow(scopedHeartbeatWakeOptions(sessionKey, { reason: "exec-event" }));
|
||||
return;
|
||||
}
|
||||
case "push.apns.register": {
|
||||
|
||||
@@ -202,4 +202,42 @@ describe("startHeartbeatRunner", () => {
|
||||
|
||||
runner.stop();
|
||||
});
|
||||
|
||||
it("does not fan out to unrelated agents for session-scoped exec wakes", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date(0));
|
||||
|
||||
const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
|
||||
const runner = startHeartbeatRunner({
|
||||
cfg: {
|
||||
agents: {
|
||||
defaults: { heartbeat: { every: "30m" } },
|
||||
list: [
|
||||
{ id: "main", heartbeat: { every: "30m" } },
|
||||
{ id: "finance", heartbeat: { every: "30m" } },
|
||||
],
|
||||
},
|
||||
} as OpenClawConfig,
|
||||
runOnce: runSpy,
|
||||
});
|
||||
|
||||
requestHeartbeatNow({
|
||||
reason: "exec-event",
|
||||
sessionKey: "agent:main:main",
|
||||
coalesceMs: 0,
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
|
||||
expect(runSpy).toHaveBeenCalledTimes(1);
|
||||
expect(runSpy).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
agentId: "main",
|
||||
reason: "exec-event",
|
||||
sessionKey: "agent:main:main",
|
||||
}),
|
||||
);
|
||||
expect(runSpy.mock.calls.some((call) => call[0]?.agentId === "finance")).toBe(false);
|
||||
|
||||
runner.stop();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -30,6 +30,13 @@ function normalizeToken(value: string | undefined | null): string {
|
||||
return (value ?? "").trim().toLowerCase();
|
||||
}
|
||||
|
||||
export function scopedHeartbeatWakeOptions<T extends object>(
|
||||
sessionKey: string,
|
||||
wakeOptions: T,
|
||||
): T | (T & { sessionKey: string }) {
|
||||
return parseAgentSessionKey(sessionKey) ? { ...wakeOptions, sessionKey } : wakeOptions;
|
||||
}
|
||||
|
||||
export function normalizeMainKey(value: string | undefined | null): string {
|
||||
const trimmed = (value ?? "").trim();
|
||||
return trimmed ? trimmed.toLowerCase() : DEFAULT_MAIN_KEY;
|
||||
|
||||
Reference in New Issue
Block a user