mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-10 07:42:44 +00:00
fix(stability): patch regex retries and timeout abort handling
This commit is contained in:
committed by
Peter Steinberger
parent
99a2f5379e
commit
1051f42f96
@@ -591,4 +591,52 @@ describe("sessions", () => {
|
|||||||
expect(store[mainSessionKey]?.thinkingLevel).toBe("high");
|
expect(store[mainSessionKey]?.thinkingLevel).toBe("high");
|
||||||
await expect(fs.stat(`${storePath}.lock`)).rejects.toThrow();
|
await expect(fs.stat(`${storePath}.lock`)).rejects.toThrow();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("updateSessionStoreEntry re-reads disk inside lock instead of using stale cache", async () => {
|
||||||
|
const mainSessionKey = "agent:main:main";
|
||||||
|
const dir = await createCaseDir("updateSessionStoreEntry-cache-bypass");
|
||||||
|
const storePath = path.join(dir, "sessions.json");
|
||||||
|
await fs.writeFile(
|
||||||
|
storePath,
|
||||||
|
JSON.stringify(
|
||||||
|
{
|
||||||
|
[mainSessionKey]: {
|
||||||
|
sessionId: "sess-1",
|
||||||
|
updatedAt: 123,
|
||||||
|
thinkingLevel: "low",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
null,
|
||||||
|
2,
|
||||||
|
),
|
||||||
|
"utf-8",
|
||||||
|
);
|
||||||
|
|
||||||
|
// Prime the in-process cache with the original entry.
|
||||||
|
expect(loadSessionStore(storePath)[mainSessionKey]?.thinkingLevel).toBe("low");
|
||||||
|
const originalStat = await fs.stat(storePath);
|
||||||
|
|
||||||
|
// Simulate an external writer that updates the store but preserves mtime.
|
||||||
|
const externalStore = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
|
||||||
|
string,
|
||||||
|
Record<string, unknown>
|
||||||
|
>;
|
||||||
|
externalStore[mainSessionKey] = {
|
||||||
|
...externalStore[mainSessionKey],
|
||||||
|
providerOverride: "anthropic",
|
||||||
|
updatedAt: 124,
|
||||||
|
};
|
||||||
|
await fs.writeFile(storePath, JSON.stringify(externalStore, null, 2), "utf-8");
|
||||||
|
await fs.utimes(storePath, originalStat.atime, originalStat.mtime);
|
||||||
|
|
||||||
|
await updateSessionStoreEntry({
|
||||||
|
storePath,
|
||||||
|
sessionKey: mainSessionKey,
|
||||||
|
update: async () => ({ thinkingLevel: "high" }),
|
||||||
|
});
|
||||||
|
|
||||||
|
const store = loadSessionStore(storePath);
|
||||||
|
expect(store[mainSessionKey]?.providerOverride).toBe("anthropic");
|
||||||
|
expect(store[mainSessionKey]?.thinkingLevel).toBe("high");
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -806,7 +806,7 @@ export async function updateSessionStoreEntry(params: {
|
|||||||
}): Promise<SessionEntry | null> {
|
}): Promise<SessionEntry | null> {
|
||||||
const { storePath, sessionKey, update } = params;
|
const { storePath, sessionKey, update } = params;
|
||||||
return await withSessionStoreLock(storePath, async () => {
|
return await withSessionStoreLock(storePath, async () => {
|
||||||
const store = loadSessionStore(storePath);
|
const store = loadSessionStore(storePath, { skipCache: true });
|
||||||
const existing = store[sessionKey];
|
const existing = store[sessionKey];
|
||||||
if (!existing) {
|
if (!existing) {
|
||||||
return null;
|
return null;
|
||||||
|
|||||||
@@ -154,6 +154,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
|||||||
deps: CliDeps;
|
deps: CliDeps;
|
||||||
job: CronJob;
|
job: CronJob;
|
||||||
message: string;
|
message: string;
|
||||||
|
abortSignal?: AbortSignal;
|
||||||
sessionKey: string;
|
sessionKey: string;
|
||||||
agentId?: string;
|
agentId?: string;
|
||||||
lane?: string;
|
lane?: string;
|
||||||
@@ -454,6 +455,9 @@ export async function runCronIsolatedAgentTurn(params: {
|
|||||||
agentDir,
|
agentDir,
|
||||||
fallbacksOverride: resolveAgentModelFallbacksOverride(params.cfg, agentId),
|
fallbacksOverride: resolveAgentModelFallbacksOverride(params.cfg, agentId),
|
||||||
run: (providerOverride, modelOverride) => {
|
run: (providerOverride, modelOverride) => {
|
||||||
|
if (params.abortSignal?.aborted) {
|
||||||
|
throw new Error("cron: isolated run aborted");
|
||||||
|
}
|
||||||
if (isCliProvider(providerOverride, cfgWithAgentDefaults)) {
|
if (isCliProvider(providerOverride, cfgWithAgentDefaults)) {
|
||||||
const cliSessionId = getCliSessionId(cronSession.sessionEntry, providerOverride);
|
const cliSessionId = getCliSessionId(cronSession.sessionEntry, providerOverride);
|
||||||
return runCliAgent({
|
return runCliAgent({
|
||||||
@@ -492,6 +496,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
|||||||
runId: cronSession.sessionEntry.sessionId,
|
runId: cronSession.sessionEntry.sessionId,
|
||||||
requireExplicitMessageTarget: true,
|
requireExplicitMessageTarget: true,
|
||||||
disableMessageTool: deliveryRequested,
|
disableMessageTool: deliveryRequested,
|
||||||
|
abortSignal: params.abortSignal,
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -683,6 +683,55 @@ describe("Cron issue regressions", () => {
|
|||||||
expect(job?.state.lastStatus).toBe("ok");
|
expect(job?.state.lastStatus).toBe("ok");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("aborts isolated runs when cron timeout fires", async () => {
|
||||||
|
vi.useRealTimers();
|
||||||
|
const store = await makeStorePath();
|
||||||
|
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
|
||||||
|
const cronJob = createIsolatedRegressionJob({
|
||||||
|
id: "abort-on-timeout",
|
||||||
|
name: "abort timeout",
|
||||||
|
scheduledAt,
|
||||||
|
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
|
||||||
|
payload: { kind: "agentTurn", message: "work", timeoutSeconds: 0.01 },
|
||||||
|
state: { nextRunAtMs: scheduledAt },
|
||||||
|
});
|
||||||
|
await writeCronJobs(store.storePath, [cronJob]);
|
||||||
|
|
||||||
|
let now = scheduledAt;
|
||||||
|
let observedAbortSignal: AbortSignal | undefined;
|
||||||
|
const state = createCronServiceState({
|
||||||
|
cronEnabled: true,
|
||||||
|
storePath: store.storePath,
|
||||||
|
log: noopLogger,
|
||||||
|
nowMs: () => now,
|
||||||
|
enqueueSystemEvent: vi.fn(),
|
||||||
|
requestHeartbeatNow: vi.fn(),
|
||||||
|
runIsolatedAgentJob: vi.fn(async ({ abortSignal }) => {
|
||||||
|
observedAbortSignal = abortSignal;
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
if (!abortSignal) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (abortSignal.aborted) {
|
||||||
|
resolve();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
abortSignal.addEventListener("abort", () => resolve(), { once: true });
|
||||||
|
});
|
||||||
|
now += 5;
|
||||||
|
return { status: "ok" as const, summary: "late" };
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
await onTimer(state);
|
||||||
|
|
||||||
|
expect(observedAbortSignal).toBeDefined();
|
||||||
|
expect(observedAbortSignal?.aborted).toBe(true);
|
||||||
|
const job = state.store?.jobs.find((entry) => entry.id === "abort-on-timeout");
|
||||||
|
expect(job?.state.lastStatus).toBe("error");
|
||||||
|
expect(job?.state.lastError).toContain("timed out");
|
||||||
|
});
|
||||||
|
|
||||||
it("retries cron schedule computation from the next second when the first attempt returns undefined (#17821)", () => {
|
it("retries cron schedule computation from the next second when the first attempt returns undefined (#17821)", () => {
|
||||||
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
|
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
|
||||||
const cronJob = createIsolatedRegressionJob({
|
const cronJob = createIsolatedRegressionJob({
|
||||||
|
|||||||
@@ -62,7 +62,11 @@ export type CronServiceDeps = {
|
|||||||
wakeNowHeartbeatBusyMaxWaitMs?: number;
|
wakeNowHeartbeatBusyMaxWaitMs?: number;
|
||||||
/** WakeMode=now: delay between runHeartbeatOnce retries while busy. */
|
/** WakeMode=now: delay between runHeartbeatOnce retries while busy. */
|
||||||
wakeNowHeartbeatBusyRetryDelayMs?: number;
|
wakeNowHeartbeatBusyRetryDelayMs?: number;
|
||||||
runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<
|
runIsolatedAgentJob: (params: {
|
||||||
|
job: CronJob;
|
||||||
|
message: string;
|
||||||
|
abortSignal?: AbortSignal;
|
||||||
|
}) => Promise<
|
||||||
{
|
{
|
||||||
summary?: string;
|
summary?: string;
|
||||||
/** Last non-empty agent text output (not truncated). */
|
/** Last non-empty agent text output (not truncated). */
|
||||||
|
|||||||
@@ -267,18 +267,20 @@ export async function onTimer(state: CronServiceState) {
|
|||||||
: DEFAULT_JOB_TIMEOUT_MS;
|
: DEFAULT_JOB_TIMEOUT_MS;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
const runAbortController =
|
||||||
|
typeof jobTimeoutMs === "number" ? new AbortController() : undefined;
|
||||||
const result =
|
const result =
|
||||||
typeof jobTimeoutMs === "number"
|
typeof jobTimeoutMs === "number"
|
||||||
? await (async () => {
|
? await (async () => {
|
||||||
let timeoutId: NodeJS.Timeout | undefined;
|
let timeoutId: NodeJS.Timeout | undefined;
|
||||||
try {
|
try {
|
||||||
return await Promise.race([
|
return await Promise.race([
|
||||||
executeJobCore(state, job),
|
executeJobCore(state, job, runAbortController?.signal),
|
||||||
new Promise<never>((_, reject) => {
|
new Promise<never>((_, reject) => {
|
||||||
timeoutId = setTimeout(
|
timeoutId = setTimeout(() => {
|
||||||
() => reject(new Error("cron: job execution timed out")),
|
runAbortController?.abort(new Error("cron: job execution timed out"));
|
||||||
jobTimeoutMs,
|
reject(new Error("cron: job execution timed out"));
|
||||||
);
|
}, jobTimeoutMs);
|
||||||
}),
|
}),
|
||||||
]);
|
]);
|
||||||
} finally {
|
} finally {
|
||||||
@@ -565,6 +567,7 @@ export async function runDueJobs(state: CronServiceState) {
|
|||||||
async function executeJobCore(
|
async function executeJobCore(
|
||||||
state: CronServiceState,
|
state: CronServiceState,
|
||||||
job: CronJob,
|
job: CronJob,
|
||||||
|
abortSignal?: AbortSignal,
|
||||||
): Promise<CronRunOutcome & CronRunTelemetry & { delivered?: boolean }> {
|
): Promise<CronRunOutcome & CronRunTelemetry & { delivered?: boolean }> {
|
||||||
if (job.sessionTarget === "main") {
|
if (job.sessionTarget === "main") {
|
||||||
const text = resolveJobPayloadTextForMain(job);
|
const text = resolveJobPayloadTextForMain(job);
|
||||||
@@ -634,10 +637,14 @@ async function executeJobCore(
|
|||||||
if (job.payload.kind !== "agentTurn") {
|
if (job.payload.kind !== "agentTurn") {
|
||||||
return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" };
|
return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" };
|
||||||
}
|
}
|
||||||
|
if (abortSignal?.aborted) {
|
||||||
|
return { status: "error", error: "cron: job execution aborted" };
|
||||||
|
}
|
||||||
|
|
||||||
const res = await state.deps.runIsolatedAgentJob({
|
const res = await state.deps.runIsolatedAgentJob({
|
||||||
job,
|
job,
|
||||||
message: job.payload.message,
|
message: job.payload.message,
|
||||||
|
abortSignal,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Post a short summary back to the main session — but only when the
|
// Post a short summary back to the main session — but only when the
|
||||||
|
|||||||
@@ -185,13 +185,14 @@ export function buildGatewayCronService(params: {
|
|||||||
deps: { ...params.deps, runtime: defaultRuntime },
|
deps: { ...params.deps, runtime: defaultRuntime },
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
runIsolatedAgentJob: async ({ job, message }) => {
|
runIsolatedAgentJob: async ({ job, message, abortSignal }) => {
|
||||||
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
|
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
|
||||||
return await runCronIsolatedAgentTurn({
|
return await runCronIsolatedAgentTurn({
|
||||||
cfg: runtimeConfig,
|
cfg: runtimeConfig,
|
||||||
deps: params.deps,
|
deps: params.deps,
|
||||||
job,
|
job,
|
||||||
message,
|
message,
|
||||||
|
abortSignal,
|
||||||
agentId,
|
agentId,
|
||||||
sessionKey: `cron:${job.id}`,
|
sessionKey: `cron:${job.id}`,
|
||||||
lane: "cron",
|
lane: "cron",
|
||||||
|
|||||||
@@ -107,7 +107,7 @@ describe("fetchClaudeUsage", () => {
|
|||||||
expect(result.windows).toEqual([{ label: "5h", usedPercent: 12, resetAt: undefined }]);
|
expect(result.windows).toEqual([{ label: "5h", usedPercent: 12, resetAt: undefined }]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("keeps oauth error when cookie header cannot be parsed into a session key", async () => {
|
it("parses sessionKey from CLAUDE_WEB_COOKIE for web fallback", async () => {
|
||||||
vi.stubEnv("CLAUDE_WEB_COOKIE", "sessionKey=sk-ant-cookie-session");
|
vi.stubEnv("CLAUDE_WEB_COOKIE", "sessionKey=sk-ant-cookie-session");
|
||||||
|
|
||||||
const mockFetch = createScopeFallbackFetch(async (url) => {
|
const mockFetch = createScopeFallbackFetch(async (url) => {
|
||||||
@@ -120,7 +120,10 @@ describe("fetchClaudeUsage", () => {
|
|||||||
return makeResponse(404, "not found");
|
return makeResponse(404, "not found");
|
||||||
});
|
});
|
||||||
|
|
||||||
await expectMissingScopeWithoutFallback(mockFetch);
|
const result = await fetchClaudeUsage("token", 5000, mockFetch);
|
||||||
|
expect(result.error).toBeUndefined();
|
||||||
|
expect(result.windows).toEqual([{ label: "Opus", usedPercent: 44 }]);
|
||||||
|
expect(mockFetch).toHaveBeenCalledTimes(3);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("keeps oauth error when fallback session key is unavailable", async () => {
|
it("keeps oauth error when fallback session key is unavailable", async () => {
|
||||||
|
|||||||
@@ -57,8 +57,8 @@ function resolveClaudeWebSessionKey(): string | undefined {
|
|||||||
if (!cookieHeader) {
|
if (!cookieHeader) {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
const stripped = cookieHeader.replace(/^cookie:\\s*/i, "");
|
const stripped = cookieHeader.replace(/^cookie:\s*/i, "");
|
||||||
const match = stripped.match(/(?:^|;\\s*)sessionKey=([^;\\s]+)/i);
|
const match = stripped.match(/(?:^|;\s*)sessionKey=([^;\s]+)/i);
|
||||||
const value = match?.[1]?.trim();
|
const value = match?.[1]?.trim();
|
||||||
return value?.startsWith("sk-ant-") ? value : undefined;
|
return value?.startsWith("sk-ant-") ? value : undefined;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,6 +16,8 @@ export type SignalDaemonOpts = {
|
|||||||
export type SignalDaemonHandle = {
|
export type SignalDaemonHandle = {
|
||||||
pid?: number;
|
pid?: number;
|
||||||
stop: () => void;
|
stop: () => void;
|
||||||
|
exited: Promise<{ code: number | null; signal: NodeJS.Signals | null }>;
|
||||||
|
isExited: () => boolean;
|
||||||
};
|
};
|
||||||
|
|
||||||
export function classifySignalCliLogLine(line: string): "log" | "error" | null {
|
export function classifySignalCliLogLine(line: string): "log" | "error" | null {
|
||||||
@@ -83,17 +85,51 @@ export function spawnSignalDaemon(opts: SignalDaemonOpts): SignalDaemonHandle {
|
|||||||
});
|
});
|
||||||
const log = opts.runtime?.log ?? (() => {});
|
const log = opts.runtime?.log ?? (() => {});
|
||||||
const error = opts.runtime?.error ?? (() => {});
|
const error = opts.runtime?.error ?? (() => {});
|
||||||
|
let exited = false;
|
||||||
|
let settledExit = false;
|
||||||
|
let resolveExit!: (value: { code: number | null; signal: NodeJS.Signals | null }) => void;
|
||||||
|
const exitedPromise = new Promise<{ code: number | null; signal: NodeJS.Signals | null }>(
|
||||||
|
(resolve) => {
|
||||||
|
resolveExit = resolve;
|
||||||
|
},
|
||||||
|
);
|
||||||
|
const settleExit = (value: { code: number | null; signal: NodeJS.Signals | null }) => {
|
||||||
|
if (settledExit) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
settledExit = true;
|
||||||
|
exited = true;
|
||||||
|
resolveExit(value);
|
||||||
|
};
|
||||||
|
|
||||||
bindSignalCliOutput({ stream: child.stdout, log, error });
|
bindSignalCliOutput({ stream: child.stdout, log, error });
|
||||||
bindSignalCliOutput({ stream: child.stderr, log, error });
|
bindSignalCliOutput({ stream: child.stderr, log, error });
|
||||||
|
child.once("exit", (code, signal) => {
|
||||||
|
settleExit({
|
||||||
|
code: typeof code === "number" ? code : null,
|
||||||
|
signal: signal ?? null,
|
||||||
|
});
|
||||||
|
error(
|
||||||
|
`signal-cli daemon exited (code=${String(code ?? "null")} signal=${String(signal ?? "null")})`,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
child.once("close", (code, signal) => {
|
||||||
|
settleExit({
|
||||||
|
code: typeof code === "number" ? code : null,
|
||||||
|
signal: signal ?? null,
|
||||||
|
});
|
||||||
|
});
|
||||||
child.on("error", (err) => {
|
child.on("error", (err) => {
|
||||||
error(`signal-cli spawn error: ${String(err)}`);
|
error(`signal-cli spawn error: ${String(err)}`);
|
||||||
|
settleExit({ code: null, signal: null });
|
||||||
});
|
});
|
||||||
|
|
||||||
return {
|
return {
|
||||||
pid: child.pid ?? undefined,
|
pid: child.pid ?? undefined,
|
||||||
|
exited: exitedPromise,
|
||||||
|
isExited: () => exited,
|
||||||
stop: () => {
|
stop: () => {
|
||||||
if (!child.killed) {
|
if (!child.killed && !exited) {
|
||||||
child.kill("SIGTERM");
|
child.kill("SIGTERM");
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ const {
|
|||||||
updateLastRouteMock,
|
updateLastRouteMock,
|
||||||
upsertPairingRequestMock,
|
upsertPairingRequestMock,
|
||||||
waitForTransportReadyMock,
|
waitForTransportReadyMock,
|
||||||
|
spawnSignalDaemonMock,
|
||||||
} = getSignalToolResultTestMocks();
|
} = getSignalToolResultTestMocks();
|
||||||
|
|
||||||
const SIGNAL_BASE_URL = "http://127.0.0.1:8080";
|
const SIGNAL_BASE_URL = "http://127.0.0.1:8080";
|
||||||
@@ -176,7 +177,7 @@ describe("monitorSignalProvider tool results", () => {
|
|||||||
logIntervalMs: 10_000,
|
logIntervalMs: 10_000,
|
||||||
pollIntervalMs: 150,
|
pollIntervalMs: 150,
|
||||||
runtime,
|
runtime,
|
||||||
abortSignal: abortController.signal,
|
abortSignal: expect.any(AbortSignal),
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
@@ -212,6 +213,39 @@ describe("monitorSignalProvider tool results", () => {
|
|||||||
expectWaitForTransportReadyTimeout(120_000);
|
expectWaitForTransportReadyTimeout(120_000);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("fails fast when auto-started signal daemon exits during startup", async () => {
|
||||||
|
const runtime = createMonitorRuntime();
|
||||||
|
setSignalAutoStartConfig();
|
||||||
|
spawnSignalDaemonMock.mockReturnValueOnce({
|
||||||
|
stop: vi.fn(),
|
||||||
|
exited: Promise.resolve({ code: 1, signal: null }),
|
||||||
|
isExited: () => true,
|
||||||
|
});
|
||||||
|
waitForTransportReadyMock.mockImplementationOnce(
|
||||||
|
async (params: { abortSignal?: AbortSignal | null }) => {
|
||||||
|
await new Promise<void>((_resolve, reject) => {
|
||||||
|
if (params.abortSignal?.aborted) {
|
||||||
|
reject(params.abortSignal.reason);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
params.abortSignal?.addEventListener(
|
||||||
|
"abort",
|
||||||
|
() => reject(params.abortSignal?.reason ?? new Error("aborted")),
|
||||||
|
{ once: true },
|
||||||
|
);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
runMonitorWithMocks({
|
||||||
|
autoStart: true,
|
||||||
|
baseUrl: SIGNAL_BASE_URL,
|
||||||
|
runtime,
|
||||||
|
}),
|
||||||
|
).rejects.toThrow(/signal daemon exited/i);
|
||||||
|
});
|
||||||
|
|
||||||
it("skips tool summaries with responsePrefix", async () => {
|
it("skips tool summaries with responsePrefix", async () => {
|
||||||
replyMock.mockResolvedValue({ text: "final reply" });
|
replyMock.mockResolvedValue({ text: "final reply" });
|
||||||
|
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ type SignalToolResultTestMocks = {
|
|||||||
streamMock: MockFn;
|
streamMock: MockFn;
|
||||||
signalCheckMock: MockFn;
|
signalCheckMock: MockFn;
|
||||||
signalRpcRequestMock: MockFn;
|
signalRpcRequestMock: MockFn;
|
||||||
|
spawnSignalDaemonMock: MockFn;
|
||||||
};
|
};
|
||||||
|
|
||||||
const waitForTransportReadyMock = vi.hoisted(() => vi.fn()) as unknown as MockFn;
|
const waitForTransportReadyMock = vi.hoisted(() => vi.fn()) as unknown as MockFn;
|
||||||
@@ -24,6 +25,7 @@ const upsertPairingRequestMock = vi.hoisted(() => vi.fn()) as unknown as MockFn;
|
|||||||
const streamMock = vi.hoisted(() => vi.fn()) as unknown as MockFn;
|
const streamMock = vi.hoisted(() => vi.fn()) as unknown as MockFn;
|
||||||
const signalCheckMock = vi.hoisted(() => vi.fn()) as unknown as MockFn;
|
const signalCheckMock = vi.hoisted(() => vi.fn()) as unknown as MockFn;
|
||||||
const signalRpcRequestMock = vi.hoisted(() => vi.fn()) as unknown as MockFn;
|
const signalRpcRequestMock = vi.hoisted(() => vi.fn()) as unknown as MockFn;
|
||||||
|
const spawnSignalDaemonMock = vi.hoisted(() => vi.fn()) as unknown as MockFn;
|
||||||
|
|
||||||
export function getSignalToolResultTestMocks(): SignalToolResultTestMocks {
|
export function getSignalToolResultTestMocks(): SignalToolResultTestMocks {
|
||||||
return {
|
return {
|
||||||
@@ -36,6 +38,7 @@ export function getSignalToolResultTestMocks(): SignalToolResultTestMocks {
|
|||||||
streamMock,
|
streamMock,
|
||||||
signalCheckMock,
|
signalCheckMock,
|
||||||
signalRpcRequestMock,
|
signalRpcRequestMock,
|
||||||
|
spawnSignalDaemonMock,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -84,7 +87,7 @@ vi.mock("./client.js", () => ({
|
|||||||
}));
|
}));
|
||||||
|
|
||||||
vi.mock("./daemon.js", () => ({
|
vi.mock("./daemon.js", () => ({
|
||||||
spawnSignalDaemon: vi.fn(() => ({ stop: vi.fn() })),
|
spawnSignalDaemon: (...args: unknown[]) => spawnSignalDaemonMock(...args),
|
||||||
}));
|
}));
|
||||||
|
|
||||||
vi.mock("../infra/transport-ready.js", () => ({
|
vi.mock("../infra/transport-ready.js", () => ({
|
||||||
@@ -107,6 +110,11 @@ export function installSignalToolResultTestHooks() {
|
|||||||
streamMock.mockReset();
|
streamMock.mockReset();
|
||||||
signalCheckMock.mockReset().mockResolvedValue({});
|
signalCheckMock.mockReset().mockResolvedValue({});
|
||||||
signalRpcRequestMock.mockReset().mockResolvedValue({});
|
signalRpcRequestMock.mockReset().mockResolvedValue({});
|
||||||
|
spawnSignalDaemonMock.mockReset().mockReturnValue({
|
||||||
|
stop: vi.fn(),
|
||||||
|
exited: new Promise(() => {}),
|
||||||
|
isExited: () => false,
|
||||||
|
});
|
||||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||||
upsertPairingRequestMock.mockReset().mockResolvedValue({ code: "PAIRCODE", created: true });
|
upsertPairingRequestMock.mockReset().mockResolvedValue({ code: "PAIRCODE", created: true });
|
||||||
waitForTransportReadyMock.mockReset().mockResolvedValue(undefined);
|
waitForTransportReadyMock.mockReset().mockResolvedValue(undefined);
|
||||||
|
|||||||
@@ -47,6 +47,46 @@ function resolveRuntime(opts: MonitorSignalOpts): RuntimeEnv {
|
|||||||
return opts.runtime ?? createNonExitingRuntime();
|
return opts.runtime ?? createNonExitingRuntime();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function mergeAbortSignals(
|
||||||
|
a?: AbortSignal,
|
||||||
|
b?: AbortSignal,
|
||||||
|
): { signal?: AbortSignal; dispose: () => void } {
|
||||||
|
if (!a && !b) {
|
||||||
|
return { signal: undefined, dispose: () => {} };
|
||||||
|
}
|
||||||
|
if (!a) {
|
||||||
|
return { signal: b, dispose: () => {} };
|
||||||
|
}
|
||||||
|
if (!b) {
|
||||||
|
return { signal: a, dispose: () => {} };
|
||||||
|
}
|
||||||
|
const controller = new AbortController();
|
||||||
|
const abortFrom = (source: AbortSignal) => {
|
||||||
|
if (!controller.signal.aborted) {
|
||||||
|
controller.abort(source.reason);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if (a.aborted) {
|
||||||
|
abortFrom(a);
|
||||||
|
return { signal: controller.signal, dispose: () => {} };
|
||||||
|
}
|
||||||
|
if (b.aborted) {
|
||||||
|
abortFrom(b);
|
||||||
|
return { signal: controller.signal, dispose: () => {} };
|
||||||
|
}
|
||||||
|
const onAbortA = () => abortFrom(a);
|
||||||
|
const onAbortB = () => abortFrom(b);
|
||||||
|
a.addEventListener("abort", onAbortA, { once: true });
|
||||||
|
b.addEventListener("abort", onAbortB, { once: true });
|
||||||
|
return {
|
||||||
|
signal: controller.signal,
|
||||||
|
dispose: () => {
|
||||||
|
a.removeEventListener("abort", onAbortA);
|
||||||
|
b.removeEventListener("abort", onAbortB);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
function normalizeAllowList(raw?: Array<string | number>): string[] {
|
function normalizeAllowList(raw?: Array<string | number>): string[] {
|
||||||
return normalizeStringEntries(raw);
|
return normalizeStringEntries(raw);
|
||||||
}
|
}
|
||||||
@@ -286,6 +326,9 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
|
|||||||
Math.max(1_000, opts.startupTimeoutMs ?? accountInfo.config.startupTimeoutMs ?? 30_000),
|
Math.max(1_000, opts.startupTimeoutMs ?? accountInfo.config.startupTimeoutMs ?? 30_000),
|
||||||
);
|
);
|
||||||
const readReceiptsViaDaemon = Boolean(autoStart && sendReadReceipts);
|
const readReceiptsViaDaemon = Boolean(autoStart && sendReadReceipts);
|
||||||
|
let daemonExitError: Error | undefined;
|
||||||
|
const daemonAbortController = new AbortController();
|
||||||
|
const mergedAbort = mergeAbortSignals(opts.abortSignal, daemonAbortController.signal);
|
||||||
let daemonHandle: ReturnType<typeof spawnSignalDaemon> | null = null;
|
let daemonHandle: ReturnType<typeof spawnSignalDaemon> | null = null;
|
||||||
|
|
||||||
if (autoStart) {
|
if (autoStart) {
|
||||||
@@ -303,6 +346,14 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
|
|||||||
sendReadReceipts,
|
sendReadReceipts,
|
||||||
runtime,
|
runtime,
|
||||||
});
|
});
|
||||||
|
void daemonHandle.exited.then((exit) => {
|
||||||
|
daemonExitError = new Error(
|
||||||
|
`signal daemon exited (code=${String(exit.code ?? "null")} signal=${String(exit.signal ?? "null")})`,
|
||||||
|
);
|
||||||
|
if (!daemonAbortController.signal.aborted) {
|
||||||
|
daemonAbortController.abort(daemonExitError);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
const onAbort = () => {
|
const onAbort = () => {
|
||||||
@@ -314,12 +365,15 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
|
|||||||
if (daemonHandle) {
|
if (daemonHandle) {
|
||||||
await waitForSignalDaemonReady({
|
await waitForSignalDaemonReady({
|
||||||
baseUrl,
|
baseUrl,
|
||||||
abortSignal: opts.abortSignal,
|
abortSignal: mergedAbort.signal,
|
||||||
timeoutMs: startupTimeoutMs,
|
timeoutMs: startupTimeoutMs,
|
||||||
logAfterMs: 10_000,
|
logAfterMs: 10_000,
|
||||||
logIntervalMs: 10_000,
|
logIntervalMs: 10_000,
|
||||||
runtime,
|
runtime,
|
||||||
});
|
});
|
||||||
|
if (daemonExitError) {
|
||||||
|
throw daemonExitError;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const handleEvent = createSignalEventHandler({
|
const handleEvent = createSignalEventHandler({
|
||||||
@@ -353,7 +407,7 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
|
|||||||
await runSignalSseLoop({
|
await runSignalSseLoop({
|
||||||
baseUrl,
|
baseUrl,
|
||||||
account,
|
account,
|
||||||
abortSignal: opts.abortSignal,
|
abortSignal: mergedAbort.signal,
|
||||||
runtime,
|
runtime,
|
||||||
onEvent: (event) => {
|
onEvent: (event) => {
|
||||||
void handleEvent(event).catch((err) => {
|
void handleEvent(event).catch((err) => {
|
||||||
@@ -361,12 +415,16 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
|
|||||||
});
|
});
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
if (daemonExitError) {
|
||||||
|
throw daemonExitError;
|
||||||
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (opts.abortSignal?.aborted) {
|
if (opts.abortSignal?.aborted && !daemonExitError) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
throw err;
|
throw err;
|
||||||
} finally {
|
} finally {
|
||||||
|
mergedAbort.dispose();
|
||||||
opts.abortSignal?.removeEventListener("abort", onAbort);
|
opts.abortSignal?.removeEventListener("abort", onAbort);
|
||||||
daemonHandle?.stop();
|
daemonHandle?.stop();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -98,6 +98,28 @@ describe("deliverWebReply", () => {
|
|||||||
expect(sleep).toHaveBeenCalledWith(500);
|
expect(sleep).toHaveBeenCalledWith(500);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("retries text send when error contains timed out", async () => {
|
||||||
|
const msg = makeMsg();
|
||||||
|
(msg.reply as unknown as { mockRejectedValueOnce: (v: unknown) => void }).mockRejectedValueOnce(
|
||||||
|
new Error("operation timed out"),
|
||||||
|
);
|
||||||
|
(msg.reply as unknown as { mockResolvedValueOnce: (v: unknown) => void }).mockResolvedValueOnce(
|
||||||
|
undefined,
|
||||||
|
);
|
||||||
|
|
||||||
|
await deliverWebReply({
|
||||||
|
replyResult: { text: "hi" },
|
||||||
|
msg,
|
||||||
|
maxMediaBytes: 1024 * 1024,
|
||||||
|
textLimit: 200,
|
||||||
|
replyLogger,
|
||||||
|
skipLog: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(msg.reply).toHaveBeenCalledTimes(2);
|
||||||
|
expect(sleep).toHaveBeenCalledWith(500);
|
||||||
|
});
|
||||||
|
|
||||||
it("sends image media with caption and then remaining text", async () => {
|
it("sends image media with caption and then remaining text", async () => {
|
||||||
const msg = makeMsg();
|
const msg = makeMsg();
|
||||||
const mediaLocalRoots = ["/tmp/workspace-work"];
|
const mediaLocalRoots = ["/tmp/workspace-work"];
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ export async function deliverWebReply(params: {
|
|||||||
lastErr = err;
|
lastErr = err;
|
||||||
const errText = formatError(err);
|
const errText = formatError(err);
|
||||||
const isLast = attempt === maxAttempts;
|
const isLast = attempt === maxAttempts;
|
||||||
const shouldRetry = /closed|reset|timed\\s*out|disconnect/i.test(errText);
|
const shouldRetry = /closed|reset|timed\s*out|disconnect/i.test(errText);
|
||||||
if (!shouldRetry || isLast) {
|
if (!shouldRetry || isLast) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user