mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-19 01:47:27 +00:00
refactor(signal): extract daemon lifecycle and typed exit handling
This commit is contained in:
@@ -16,10 +16,20 @@ export type SignalDaemonOpts = {
|
||||
export type SignalDaemonHandle = {
|
||||
pid?: number;
|
||||
stop: () => void;
|
||||
exited: Promise<{ code: number | null; signal: NodeJS.Signals | null }>;
|
||||
exited: Promise<SignalDaemonExitEvent>;
|
||||
isExited: () => boolean;
|
||||
};
|
||||
|
||||
export type SignalDaemonExitEvent = {
|
||||
source: "process" | "spawn-error";
|
||||
code: number | null;
|
||||
signal: NodeJS.Signals | null;
|
||||
};
|
||||
|
||||
export function formatSignalDaemonExit(exit: SignalDaemonExitEvent): string {
|
||||
return `signal daemon exited (source=${exit.source} code=${String(exit.code ?? "null")} signal=${String(exit.signal ?? "null")})`;
|
||||
}
|
||||
|
||||
export function classifySignalCliLogLine(line: string): "log" | "error" | null {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) {
|
||||
@@ -87,13 +97,11 @@ export function spawnSignalDaemon(opts: SignalDaemonOpts): SignalDaemonHandle {
|
||||
const error = opts.runtime?.error ?? (() => {});
|
||||
let exited = false;
|
||||
let settledExit = false;
|
||||
let resolveExit!: (value: { code: number | null; signal: NodeJS.Signals | null }) => void;
|
||||
const exitedPromise = new Promise<{ code: number | null; signal: NodeJS.Signals | null }>(
|
||||
(resolve) => {
|
||||
resolveExit = resolve;
|
||||
},
|
||||
);
|
||||
const settleExit = (value: { code: number | null; signal: NodeJS.Signals | null }) => {
|
||||
let resolveExit!: (value: SignalDaemonExitEvent) => void;
|
||||
const exitedPromise = new Promise<SignalDaemonExitEvent>((resolve) => {
|
||||
resolveExit = resolve;
|
||||
});
|
||||
const settleExit = (value: SignalDaemonExitEvent) => {
|
||||
if (settledExit) {
|
||||
return;
|
||||
}
|
||||
@@ -106,22 +114,24 @@ export function spawnSignalDaemon(opts: SignalDaemonOpts): SignalDaemonHandle {
|
||||
bindSignalCliOutput({ stream: child.stderr, log, error });
|
||||
child.once("exit", (code, signal) => {
|
||||
settleExit({
|
||||
source: "process",
|
||||
code: typeof code === "number" ? code : null,
|
||||
signal: signal ?? null,
|
||||
});
|
||||
error(
|
||||
`signal-cli daemon exited (code=${String(code ?? "null")} signal=${String(signal ?? "null")})`,
|
||||
formatSignalDaemonExit({ source: "process", code: code ?? null, signal: signal ?? null }),
|
||||
);
|
||||
});
|
||||
child.once("close", (code, signal) => {
|
||||
settleExit({
|
||||
source: "process",
|
||||
code: typeof code === "number" ? code : null,
|
||||
signal: signal ?? null,
|
||||
});
|
||||
});
|
||||
child.on("error", (err) => {
|
||||
error(`signal-cli spawn error: ${String(err)}`);
|
||||
settleExit({ code: null, signal: null });
|
||||
settleExit({ source: "spawn-error", code: null, signal: null });
|
||||
});
|
||||
|
||||
return {
|
||||
|
||||
@@ -3,7 +3,9 @@ import type { OpenClawConfig } from "../config/config.js";
|
||||
import { peekSystemEvents } from "../infra/system-events.js";
|
||||
import { resolveAgentRoute } from "../routing/resolve-route.js";
|
||||
import { normalizeE164 } from "../utils.js";
|
||||
import type { SignalDaemonExitEvent } from "./daemon.js";
|
||||
import {
|
||||
createMockSignalDaemonHandle,
|
||||
config,
|
||||
flush,
|
||||
getSignalToolResultTestMocks,
|
||||
@@ -216,11 +218,12 @@ describe("monitorSignalProvider tool results", () => {
|
||||
it("fails fast when auto-started signal daemon exits during startup", async () => {
|
||||
const runtime = createMonitorRuntime();
|
||||
setSignalAutoStartConfig();
|
||||
spawnSignalDaemonMock.mockReturnValueOnce({
|
||||
stop: vi.fn(),
|
||||
exited: Promise.resolve({ code: 1, signal: null }),
|
||||
isExited: () => true,
|
||||
});
|
||||
spawnSignalDaemonMock.mockReturnValueOnce(
|
||||
createMockSignalDaemonHandle({
|
||||
exited: Promise.resolve({ source: "process", code: 1, signal: null }),
|
||||
isExited: () => true,
|
||||
}),
|
||||
);
|
||||
waitForTransportReadyMock.mockImplementationOnce(
|
||||
async (params: { abortSignal?: AbortSignal | null }) => {
|
||||
await new Promise<void>((_resolve, reject) => {
|
||||
@@ -251,24 +254,24 @@ describe("monitorSignalProvider tool results", () => {
|
||||
setSignalAutoStartConfig();
|
||||
const abortController = new AbortController();
|
||||
let exited = false;
|
||||
let resolveExit!: (value: { code: number | null; signal: NodeJS.Signals | null }) => void;
|
||||
const exitedPromise = new Promise<{ code: number | null; signal: NodeJS.Signals | null }>(
|
||||
(resolve) => {
|
||||
resolveExit = resolve;
|
||||
},
|
||||
);
|
||||
let resolveExit!: (value: SignalDaemonExitEvent) => void;
|
||||
const exitedPromise = new Promise<SignalDaemonExitEvent>((resolve) => {
|
||||
resolveExit = resolve;
|
||||
});
|
||||
const stop = vi.fn(() => {
|
||||
if (exited) {
|
||||
return;
|
||||
}
|
||||
exited = true;
|
||||
resolveExit({ code: null, signal: "SIGTERM" });
|
||||
});
|
||||
spawnSignalDaemonMock.mockReturnValueOnce({
|
||||
stop,
|
||||
exited: exitedPromise,
|
||||
isExited: () => exited,
|
||||
resolveExit({ source: "process", code: null, signal: "SIGTERM" });
|
||||
});
|
||||
spawnSignalDaemonMock.mockReturnValueOnce(
|
||||
createMockSignalDaemonHandle({
|
||||
stop,
|
||||
exited: exitedPromise,
|
||||
isExited: () => exited,
|
||||
}),
|
||||
);
|
||||
streamMock.mockImplementationOnce(async () => {
|
||||
abortController.abort(new Error("stop"));
|
||||
});
|
||||
|
||||
@@ -2,6 +2,7 @@ import { beforeEach, vi } from "vitest";
|
||||
import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js";
|
||||
import { resetSystemEventsForTest } from "../infra/system-events.js";
|
||||
import type { MockFn } from "../test-utils/vitest-mock-fn.js";
|
||||
import type { SignalDaemonExitEvent, SignalDaemonHandle } from "./daemon.js";
|
||||
|
||||
type SignalToolResultTestMocks = {
|
||||
waitForTransportReadyMock: MockFn;
|
||||
@@ -50,6 +51,23 @@ export function setSignalToolResultTestConfig(next: Record<string, unknown>) {
|
||||
|
||||
export const flush = () => new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
export function createMockSignalDaemonHandle(
|
||||
overrides: {
|
||||
stop?: MockFn;
|
||||
exited?: Promise<SignalDaemonExitEvent>;
|
||||
isExited?: () => boolean;
|
||||
} = {},
|
||||
): SignalDaemonHandle {
|
||||
const stop = overrides.stop ?? (vi.fn() as unknown as MockFn);
|
||||
const exited = overrides.exited ?? new Promise<SignalDaemonExitEvent>(() => {});
|
||||
const isExited = overrides.isExited ?? (() => false);
|
||||
return {
|
||||
stop: stop as unknown as () => void,
|
||||
exited,
|
||||
isExited,
|
||||
};
|
||||
}
|
||||
|
||||
vi.mock("../config/config.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("../config/config.js")>();
|
||||
return {
|
||||
@@ -86,9 +104,13 @@ vi.mock("./client.js", () => ({
|
||||
signalRpcRequest: (...args: unknown[]) => signalRpcRequestMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("./daemon.js", () => ({
|
||||
spawnSignalDaemon: (...args: unknown[]) => spawnSignalDaemonMock(...args),
|
||||
}));
|
||||
vi.mock("./daemon.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("./daemon.js")>();
|
||||
return {
|
||||
...actual,
|
||||
spawnSignalDaemon: (...args: unknown[]) => spawnSignalDaemonMock(...args),
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../infra/transport-ready.js", () => ({
|
||||
waitForTransportReady: (...args: unknown[]) => waitForTransportReadyMock(...args),
|
||||
@@ -110,11 +132,7 @@ export function installSignalToolResultTestHooks() {
|
||||
streamMock.mockReset();
|
||||
signalCheckMock.mockReset().mockResolvedValue({});
|
||||
signalRpcRequestMock.mockReset().mockResolvedValue({});
|
||||
spawnSignalDaemonMock.mockReset().mockReturnValue({
|
||||
stop: vi.fn(),
|
||||
exited: new Promise(() => {}),
|
||||
isExited: () => false,
|
||||
});
|
||||
spawnSignalDaemonMock.mockReset().mockReturnValue(createMockSignalDaemonHandle());
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
upsertPairingRequestMock.mockReset().mockResolvedValue({ code: "PAIRCODE", created: true });
|
||||
waitForTransportReadyMock.mockReset().mockResolvedValue(undefined);
|
||||
|
||||
@@ -11,7 +11,7 @@ import { normalizeStringEntries } from "../shared/string-normalization.js";
|
||||
import { normalizeE164 } from "../utils.js";
|
||||
import { resolveSignalAccount } from "./accounts.js";
|
||||
import { signalCheck, signalRpcRequest } from "./client.js";
|
||||
import { spawnSignalDaemon } from "./daemon.js";
|
||||
import { formatSignalDaemonExit, spawnSignalDaemon, type SignalDaemonHandle } from "./daemon.js";
|
||||
import { isSignalSenderAllowed, type resolveSignalSender } from "./identity.js";
|
||||
import { createSignalEventHandler } from "./monitor/event-handler.js";
|
||||
import type {
|
||||
@@ -87,6 +87,38 @@ function mergeAbortSignals(
|
||||
};
|
||||
}
|
||||
|
||||
function createSignalDaemonLifecycle(params: { abortSignal?: AbortSignal }) {
|
||||
let daemonHandle: SignalDaemonHandle | null = null;
|
||||
let daemonStopRequested = false;
|
||||
let daemonExitError: Error | undefined;
|
||||
const daemonAbortController = new AbortController();
|
||||
const mergedAbort = mergeAbortSignals(params.abortSignal, daemonAbortController.signal);
|
||||
const stop = () => {
|
||||
daemonStopRequested = true;
|
||||
daemonHandle?.stop();
|
||||
};
|
||||
const attach = (handle: SignalDaemonHandle) => {
|
||||
daemonHandle = handle;
|
||||
void handle.exited.then((exit) => {
|
||||
if (daemonStopRequested || params.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
daemonExitError = new Error(formatSignalDaemonExit(exit));
|
||||
if (!daemonAbortController.signal.aborted) {
|
||||
daemonAbortController.abort(daemonExitError);
|
||||
}
|
||||
});
|
||||
};
|
||||
const getExitError = () => daemonExitError;
|
||||
return {
|
||||
attach,
|
||||
stop,
|
||||
getExitError,
|
||||
abortSignal: mergedAbort.signal,
|
||||
dispose: mergedAbort.dispose,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeAllowList(raw?: Array<string | number>): string[] {
|
||||
return normalizeStringEntries(raw);
|
||||
}
|
||||
@@ -326,15 +358,8 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
|
||||
Math.max(1_000, opts.startupTimeoutMs ?? accountInfo.config.startupTimeoutMs ?? 30_000),
|
||||
);
|
||||
const readReceiptsViaDaemon = Boolean(autoStart && sendReadReceipts);
|
||||
let daemonExitError: Error | undefined;
|
||||
const daemonAbortController = new AbortController();
|
||||
const mergedAbort = mergeAbortSignals(opts.abortSignal, daemonAbortController.signal);
|
||||
let daemonHandle: ReturnType<typeof spawnSignalDaemon> | null = null;
|
||||
let daemonStopRequested = false;
|
||||
const stopDaemon = () => {
|
||||
daemonStopRequested = true;
|
||||
daemonHandle?.stop();
|
||||
};
|
||||
const daemonLifecycle = createSignalDaemonLifecycle({ abortSignal: opts.abortSignal });
|
||||
let daemonHandle: SignalDaemonHandle | null = null;
|
||||
|
||||
if (autoStart) {
|
||||
const cliPath = opts.cliPath ?? accountInfo.config.cliPath ?? "signal-cli";
|
||||
@@ -351,21 +376,11 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
|
||||
sendReadReceipts,
|
||||
runtime,
|
||||
});
|
||||
void daemonHandle.exited.then((exit) => {
|
||||
if (daemonStopRequested || opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
daemonExitError = new Error(
|
||||
`signal daemon exited (code=${String(exit.code ?? "null")} signal=${String(exit.signal ?? "null")})`,
|
||||
);
|
||||
if (!daemonAbortController.signal.aborted) {
|
||||
daemonAbortController.abort(daemonExitError);
|
||||
}
|
||||
});
|
||||
daemonLifecycle.attach(daemonHandle);
|
||||
}
|
||||
|
||||
const onAbort = () => {
|
||||
stopDaemon();
|
||||
daemonLifecycle.stop();
|
||||
};
|
||||
opts.abortSignal?.addEventListener("abort", onAbort, { once: true });
|
||||
|
||||
@@ -373,12 +388,13 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
|
||||
if (daemonHandle) {
|
||||
await waitForSignalDaemonReady({
|
||||
baseUrl,
|
||||
abortSignal: mergedAbort.signal,
|
||||
abortSignal: daemonLifecycle.abortSignal,
|
||||
timeoutMs: startupTimeoutMs,
|
||||
logAfterMs: 10_000,
|
||||
logIntervalMs: 10_000,
|
||||
runtime,
|
||||
});
|
||||
const daemonExitError = daemonLifecycle.getExitError();
|
||||
if (daemonExitError) {
|
||||
throw daemonExitError;
|
||||
}
|
||||
@@ -415,7 +431,7 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
|
||||
await runSignalSseLoop({
|
||||
baseUrl,
|
||||
account,
|
||||
abortSignal: mergedAbort.signal,
|
||||
abortSignal: daemonLifecycle.abortSignal,
|
||||
runtime,
|
||||
onEvent: (event) => {
|
||||
void handleEvent(event).catch((err) => {
|
||||
@@ -423,17 +439,19 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
|
||||
});
|
||||
},
|
||||
});
|
||||
const daemonExitError = daemonLifecycle.getExitError();
|
||||
if (daemonExitError) {
|
||||
throw daemonExitError;
|
||||
}
|
||||
} catch (err) {
|
||||
const daemonExitError = daemonLifecycle.getExitError();
|
||||
if (opts.abortSignal?.aborted && !daemonExitError) {
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
mergedAbort.dispose();
|
||||
daemonLifecycle.dispose();
|
||||
opts.abortSignal?.removeEventListener("abort", onAbort);
|
||||
stopDaemon();
|
||||
daemonLifecycle.stop();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user