mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-10 19:44:59 +00:00
Co-authored-by: Shawn <shenghuikevin@shenghuideMac-mini.local> Co-authored-by: 不做了睡大觉 <user@example.com> Co-authored-by: Marcus Widing <widing.marcus@gmail.com>
This commit is contained in:
@@ -37,12 +37,16 @@ import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
|
||||
import type { SpawnSubagentMode } from "./subagent-spawn.js";
|
||||
import { readLatestAssistantReply } from "./tools/agent-step.js";
|
||||
import { sanitizeTextContent, extractAssistantText } from "./tools/sessions-helpers.js";
|
||||
import { isAnnounceSkip } from "./tools/sessions-send-helpers.js";
|
||||
|
||||
const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1";
|
||||
const FAST_TEST_RETRY_INTERVAL_MS = 8;
|
||||
const FAST_TEST_REPLY_CHANGE_WAIT_MS = 20;
|
||||
const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 60_000;
|
||||
const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000;
|
||||
const DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS = FAST_TEST_MODE
|
||||
? ([8, 16, 32] as const)
|
||||
: ([5_000, 10_000, 20_000] as const);
|
||||
|
||||
type ToolResultMessage = {
|
||||
role?: unknown;
|
||||
@@ -72,6 +76,9 @@ function buildCompletionDeliveryMessage(params: {
|
||||
outcome?: SubagentRunOutcome;
|
||||
}): string {
|
||||
const findingsText = params.findings.trim();
|
||||
if (isAnnounceSkip(findingsText)) {
|
||||
return "";
|
||||
}
|
||||
const hasFindings = findingsText.length > 0 && findingsText !== "(no output)";
|
||||
const header = (() => {
|
||||
if (params.outcome?.status === "error") {
|
||||
@@ -111,6 +118,92 @@ function summarizeDeliveryError(error: unknown): string {
|
||||
}
|
||||
}
|
||||
|
||||
const TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
|
||||
/\berrorcode=unavailable\b/i,
|
||||
/\bstatus\s*[:=]\s*"?unavailable\b/i,
|
||||
/\bUNAVAILABLE\b/,
|
||||
/no active .* listener/i,
|
||||
/gateway not connected/i,
|
||||
/gateway closed \(1006/i,
|
||||
/gateway timeout/i,
|
||||
/\b(econnreset|econnrefused|etimedout|enotfound|ehostunreach|network error)\b/i,
|
||||
];
|
||||
|
||||
const PERMANENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
|
||||
/unsupported channel/i,
|
||||
/unknown channel/i,
|
||||
/chat not found/i,
|
||||
/user not found/i,
|
||||
/bot was blocked by the user/i,
|
||||
/forbidden: bot was kicked/i,
|
||||
/recipient is not a valid/i,
|
||||
/outbound not configured for channel/i,
|
||||
];
|
||||
|
||||
function isTransientAnnounceDeliveryError(error: unknown): boolean {
|
||||
const message = summarizeDeliveryError(error);
|
||||
if (!message) {
|
||||
return false;
|
||||
}
|
||||
if (PERMANENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message))) {
|
||||
return false;
|
||||
}
|
||||
return TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message));
|
||||
}
|
||||
|
||||
async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Promise<void> {
|
||||
if (ms <= 0) {
|
||||
return;
|
||||
}
|
||||
if (!signal) {
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, ms));
|
||||
return;
|
||||
}
|
||||
if (signal.aborted) {
|
||||
return;
|
||||
}
|
||||
await new Promise<void>((resolve) => {
|
||||
const timer = setTimeout(() => {
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
resolve();
|
||||
}, ms);
|
||||
const onAbort = () => {
|
||||
clearTimeout(timer);
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
resolve();
|
||||
};
|
||||
signal.addEventListener("abort", onAbort, { once: true });
|
||||
});
|
||||
}
|
||||
|
||||
async function runAnnounceDeliveryWithRetry<T>(params: {
|
||||
operation: string;
|
||||
signal?: AbortSignal;
|
||||
run: () => Promise<T>;
|
||||
}): Promise<T> {
|
||||
let retryIndex = 0;
|
||||
for (;;) {
|
||||
if (params.signal?.aborted) {
|
||||
throw new Error("announce delivery aborted");
|
||||
}
|
||||
try {
|
||||
return await params.run();
|
||||
} catch (err) {
|
||||
const delayMs = DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS[retryIndex];
|
||||
if (delayMs == null || !isTransientAnnounceDeliveryError(err) || params.signal?.aborted) {
|
||||
throw err;
|
||||
}
|
||||
const nextAttempt = retryIndex + 2;
|
||||
const maxAttempts = DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS.length + 1;
|
||||
defaultRuntime.log(
|
||||
`[warn] Subagent announce ${params.operation} transient failure, retrying ${nextAttempt}/${maxAttempts} in ${Math.round(delayMs / 1000)}s: ${summarizeDeliveryError(err)}`,
|
||||
);
|
||||
retryIndex += 1;
|
||||
await waitForAnnounceRetryDelay(delayMs, params.signal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function extractToolResultText(content: unknown): string {
|
||||
if (typeof content === "string") {
|
||||
return sanitizeTextContent(content);
|
||||
@@ -712,18 +805,23 @@ async function sendSubagentAnnounceDirectly(params: {
|
||||
path: "none",
|
||||
};
|
||||
}
|
||||
await callGateway({
|
||||
method: "send",
|
||||
params: {
|
||||
channel: completionChannel,
|
||||
to: completionTo,
|
||||
accountId: completionDirectOrigin?.accountId,
|
||||
threadId: completionThreadId,
|
||||
sessionKey: canonicalRequesterSessionKey,
|
||||
message: params.completionMessage,
|
||||
idempotencyKey: params.directIdempotencyKey,
|
||||
},
|
||||
timeoutMs: announceTimeoutMs,
|
||||
await runAnnounceDeliveryWithRetry({
|
||||
operation: "completion direct send",
|
||||
signal: params.signal,
|
||||
run: async () =>
|
||||
await callGateway({
|
||||
method: "send",
|
||||
params: {
|
||||
channel: completionChannel,
|
||||
to: completionTo,
|
||||
accountId: completionDirectOrigin?.accountId,
|
||||
threadId: completionThreadId,
|
||||
sessionKey: canonicalRequesterSessionKey,
|
||||
message: params.completionMessage,
|
||||
idempotencyKey: params.directIdempotencyKey,
|
||||
},
|
||||
timeoutMs: announceTimeoutMs,
|
||||
}),
|
||||
});
|
||||
|
||||
return {
|
||||
@@ -754,21 +852,26 @@ async function sendSubagentAnnounceDirectly(params: {
|
||||
path: "none",
|
||||
};
|
||||
}
|
||||
await callGateway({
|
||||
method: "agent",
|
||||
params: {
|
||||
sessionKey: canonicalRequesterSessionKey,
|
||||
message: params.triggerMessage,
|
||||
deliver: shouldDeliverExternally,
|
||||
bestEffortDeliver: params.bestEffortDeliver,
|
||||
channel: shouldDeliverExternally ? directChannel : undefined,
|
||||
accountId: shouldDeliverExternally ? directOrigin?.accountId : undefined,
|
||||
to: shouldDeliverExternally ? directTo : undefined,
|
||||
threadId: shouldDeliverExternally ? threadId : undefined,
|
||||
idempotencyKey: params.directIdempotencyKey,
|
||||
},
|
||||
expectFinal: true,
|
||||
timeoutMs: announceTimeoutMs,
|
||||
await runAnnounceDeliveryWithRetry({
|
||||
operation: "direct announce agent call",
|
||||
signal: params.signal,
|
||||
run: async () =>
|
||||
await callGateway({
|
||||
method: "agent",
|
||||
params: {
|
||||
sessionKey: canonicalRequesterSessionKey,
|
||||
message: params.triggerMessage,
|
||||
deliver: shouldDeliverExternally,
|
||||
bestEffortDeliver: params.bestEffortDeliver,
|
||||
channel: shouldDeliverExternally ? directChannel : undefined,
|
||||
accountId: shouldDeliverExternally ? directOrigin?.accountId : undefined,
|
||||
to: shouldDeliverExternally ? directTo : undefined,
|
||||
threadId: shouldDeliverExternally ? threadId : undefined,
|
||||
idempotencyKey: params.directIdempotencyKey,
|
||||
},
|
||||
expectFinal: true,
|
||||
timeoutMs: announceTimeoutMs,
|
||||
}),
|
||||
});
|
||||
|
||||
return {
|
||||
@@ -1096,6 +1199,10 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (isAnnounceSkip(reply)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!outcome) {
|
||||
outcome = { status: "unknown" };
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user