mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 02:31:22 +00:00
refactor(cron): extract delivery dispatch + harden reset notices
This commit is contained in:
@@ -221,4 +221,32 @@ describe("runPreparedReply media-only handling", () => {
|
|||||||
expect(resetNoticeCall?.payload?.text).not.toContain("api-key");
|
expect(resetNoticeCall?.payload?.text).not.toContain("api-key");
|
||||||
expect(resetNoticeCall?.payload?.text).not.toContain("env:");
|
expect(resetNoticeCall?.payload?.text).not.toContain("env:");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("skips reset notice when only webchat fallback routing is available", async () => {
|
||||||
|
await runPreparedReply(
|
||||||
|
baseParams({
|
||||||
|
resetTriggered: true,
|
||||||
|
ctx: {
|
||||||
|
Body: "",
|
||||||
|
RawBody: "",
|
||||||
|
CommandBody: "",
|
||||||
|
ThreadHistoryBody: "Earlier message in this thread",
|
||||||
|
OriginatingChannel: undefined,
|
||||||
|
OriginatingTo: undefined,
|
||||||
|
ChatType: "group",
|
||||||
|
},
|
||||||
|
command: {
|
||||||
|
isAuthorizedSender: true,
|
||||||
|
abortKey: "session-key",
|
||||||
|
ownerList: [],
|
||||||
|
senderIsOwner: false,
|
||||||
|
channel: "webchat",
|
||||||
|
from: undefined,
|
||||||
|
to: undefined,
|
||||||
|
} as never,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(vi.mocked(routeReply)).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
414
src/cron/isolated-agent/delivery-dispatch.ts
Normal file
414
src/cron/isolated-agent/delivery-dispatch.ts
Normal file
@@ -0,0 +1,414 @@
|
|||||||
|
import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js";
|
||||||
|
import { countActiveDescendantRuns } from "../../agents/subagent-registry.js";
|
||||||
|
import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
|
||||||
|
import type { ReplyPayload } from "../../auto-reply/types.js";
|
||||||
|
import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js";
|
||||||
|
import type { OpenClawConfig } from "../../config/config.js";
|
||||||
|
import { resolveAgentMainSessionKey } from "../../config/sessions.js";
|
||||||
|
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
|
||||||
|
import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js";
|
||||||
|
import { resolveOutboundSessionRoute } from "../../infra/outbound/outbound-session.js";
|
||||||
|
import { logWarn } from "../../logger.js";
|
||||||
|
import type { CronJob, CronRunTelemetry } from "../types.js";
|
||||||
|
import type { DeliveryTargetResolution } from "./delivery-target.js";
|
||||||
|
import { pickSummaryFromOutput } from "./helpers.js";
|
||||||
|
import type { RunCronAgentTurnResult } from "./run.js";
|
||||||
|
import {
|
||||||
|
expectsSubagentFollowup,
|
||||||
|
isLikelyInterimCronMessage,
|
||||||
|
readDescendantSubagentFallbackReply,
|
||||||
|
waitForDescendantSubagentSummary,
|
||||||
|
} from "./subagent-followup.js";
|
||||||
|
|
||||||
|
export function matchesMessagingToolDeliveryTarget(
|
||||||
|
target: { provider?: string; to?: string; accountId?: string },
|
||||||
|
delivery: { channel?: string; to?: string; accountId?: string },
|
||||||
|
): boolean {
|
||||||
|
if (!delivery.channel || !delivery.to || !target.to) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const channel = delivery.channel.trim().toLowerCase();
|
||||||
|
const provider = target.provider?.trim().toLowerCase();
|
||||||
|
if (provider && provider !== "message" && provider !== channel) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (target.accountId && delivery.accountId && target.accountId !== delivery.accountId) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return target.to === delivery.to;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function resolveCronDeliveryBestEffort(job: CronJob): boolean {
|
||||||
|
if (typeof job.delivery?.bestEffort === "boolean") {
|
||||||
|
return job.delivery.bestEffort;
|
||||||
|
}
|
||||||
|
if (job.payload.kind === "agentTurn" && typeof job.payload.bestEffortDeliver === "boolean") {
|
||||||
|
return job.payload.bestEffortDeliver;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function resolveCronAnnounceSessionKey(params: {
|
||||||
|
cfg: OpenClawConfig;
|
||||||
|
agentId: string;
|
||||||
|
fallbackSessionKey: string;
|
||||||
|
delivery: {
|
||||||
|
channel: NonNullable<DeliveryTargetResolution["channel"]>;
|
||||||
|
to?: string;
|
||||||
|
accountId?: string;
|
||||||
|
threadId?: string | number;
|
||||||
|
};
|
||||||
|
}): Promise<string> {
|
||||||
|
const to = params.delivery.to?.trim();
|
||||||
|
if (!to) {
|
||||||
|
return params.fallbackSessionKey;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
const route = await resolveOutboundSessionRoute({
|
||||||
|
cfg: params.cfg,
|
||||||
|
channel: params.delivery.channel,
|
||||||
|
agentId: params.agentId,
|
||||||
|
accountId: params.delivery.accountId,
|
||||||
|
target: to,
|
||||||
|
threadId: params.delivery.threadId,
|
||||||
|
});
|
||||||
|
const resolved = route?.sessionKey?.trim();
|
||||||
|
if (resolved) {
|
||||||
|
return resolved;
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Fall back to main session routing if announce session resolution fails.
|
||||||
|
}
|
||||||
|
return params.fallbackSessionKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type SuccessfulDeliveryTarget = Extract<DeliveryTargetResolution, { ok: true }>;
|
||||||
|
|
||||||
|
type DispatchCronDeliveryParams = {
|
||||||
|
cfg: OpenClawConfig;
|
||||||
|
cfgWithAgentDefaults: OpenClawConfig;
|
||||||
|
deps: CliDeps;
|
||||||
|
job: CronJob;
|
||||||
|
agentId: string;
|
||||||
|
agentSessionKey: string;
|
||||||
|
runSessionId: string;
|
||||||
|
runStartedAt: number;
|
||||||
|
runEndedAt: number;
|
||||||
|
timeoutMs: number;
|
||||||
|
resolvedDelivery: DeliveryTargetResolution;
|
||||||
|
deliveryRequested: boolean;
|
||||||
|
skipHeartbeatDelivery: boolean;
|
||||||
|
skipMessagingToolDelivery: boolean;
|
||||||
|
deliveryBestEffort: boolean;
|
||||||
|
deliveryPayloadHasStructuredContent: boolean;
|
||||||
|
deliveryPayloads: ReplyPayload[];
|
||||||
|
synthesizedText?: string;
|
||||||
|
summary?: string;
|
||||||
|
outputText?: string;
|
||||||
|
telemetry?: CronRunTelemetry;
|
||||||
|
abortSignal?: AbortSignal;
|
||||||
|
isAborted: () => boolean;
|
||||||
|
abortReason: () => string;
|
||||||
|
withRunSession: (
|
||||||
|
result: Omit<RunCronAgentTurnResult, "sessionId" | "sessionKey">,
|
||||||
|
) => RunCronAgentTurnResult;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type DispatchCronDeliveryState = {
|
||||||
|
result?: RunCronAgentTurnResult;
|
||||||
|
delivered: boolean;
|
||||||
|
summary?: string;
|
||||||
|
outputText?: string;
|
||||||
|
synthesizedText?: string;
|
||||||
|
deliveryPayloads: ReplyPayload[];
|
||||||
|
};
|
||||||
|
|
||||||
|
export async function dispatchCronDelivery(
|
||||||
|
params: DispatchCronDeliveryParams,
|
||||||
|
): Promise<DispatchCronDeliveryState> {
|
||||||
|
let summary = params.summary;
|
||||||
|
let outputText = params.outputText;
|
||||||
|
let synthesizedText = params.synthesizedText;
|
||||||
|
let deliveryPayloads = params.deliveryPayloads;
|
||||||
|
|
||||||
|
// `true` means we confirmed at least one outbound send reached the target.
|
||||||
|
// Keep this strict so timer fallback can safely decide whether to wake main.
|
||||||
|
let delivered = params.skipMessagingToolDelivery;
|
||||||
|
const failDeliveryTarget = (error: string) =>
|
||||||
|
params.withRunSession({
|
||||||
|
status: "error",
|
||||||
|
error,
|
||||||
|
errorKind: "delivery-target",
|
||||||
|
summary,
|
||||||
|
outputText,
|
||||||
|
...params.telemetry,
|
||||||
|
});
|
||||||
|
|
||||||
|
const deliverViaDirect = async (
|
||||||
|
delivery: SuccessfulDeliveryTarget,
|
||||||
|
): Promise<RunCronAgentTurnResult | null> => {
|
||||||
|
const identity = resolveAgentOutboundIdentity(params.cfgWithAgentDefaults, params.agentId);
|
||||||
|
try {
|
||||||
|
const payloadsForDelivery =
|
||||||
|
deliveryPayloads.length > 0
|
||||||
|
? deliveryPayloads
|
||||||
|
: synthesizedText
|
||||||
|
? [{ text: synthesizedText }]
|
||||||
|
: [];
|
||||||
|
if (payloadsForDelivery.length === 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (params.isAborted()) {
|
||||||
|
return params.withRunSession({
|
||||||
|
status: "error",
|
||||||
|
error: params.abortReason(),
|
||||||
|
...params.telemetry,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
const deliveryResults = await deliverOutboundPayloads({
|
||||||
|
cfg: params.cfgWithAgentDefaults,
|
||||||
|
channel: delivery.channel,
|
||||||
|
to: delivery.to,
|
||||||
|
accountId: delivery.accountId,
|
||||||
|
threadId: delivery.threadId,
|
||||||
|
payloads: payloadsForDelivery,
|
||||||
|
agentId: params.agentId,
|
||||||
|
identity,
|
||||||
|
bestEffort: params.deliveryBestEffort,
|
||||||
|
deps: createOutboundSendDeps(params.deps),
|
||||||
|
abortSignal: params.abortSignal,
|
||||||
|
});
|
||||||
|
delivered = deliveryResults.length > 0;
|
||||||
|
return null;
|
||||||
|
} catch (err) {
|
||||||
|
if (!params.deliveryBestEffort) {
|
||||||
|
return params.withRunSession({
|
||||||
|
status: "error",
|
||||||
|
summary,
|
||||||
|
outputText,
|
||||||
|
error: String(err),
|
||||||
|
...params.telemetry,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const deliverViaAnnounce = async (
|
||||||
|
delivery: SuccessfulDeliveryTarget,
|
||||||
|
): Promise<RunCronAgentTurnResult | null> => {
|
||||||
|
if (!synthesizedText) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
const announceMainSessionKey = resolveAgentMainSessionKey({
|
||||||
|
cfg: params.cfg,
|
||||||
|
agentId: params.agentId,
|
||||||
|
});
|
||||||
|
const announceSessionKey = await resolveCronAnnounceSessionKey({
|
||||||
|
cfg: params.cfgWithAgentDefaults,
|
||||||
|
agentId: params.agentId,
|
||||||
|
fallbackSessionKey: announceMainSessionKey,
|
||||||
|
delivery: {
|
||||||
|
channel: delivery.channel,
|
||||||
|
to: delivery.to,
|
||||||
|
accountId: delivery.accountId,
|
||||||
|
threadId: delivery.threadId,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const taskLabel =
|
||||||
|
typeof params.job.name === "string" && params.job.name.trim()
|
||||||
|
? params.job.name.trim()
|
||||||
|
: `cron:${params.job.id}`;
|
||||||
|
const initialSynthesizedText = synthesizedText.trim();
|
||||||
|
let activeSubagentRuns = countActiveDescendantRuns(params.agentSessionKey);
|
||||||
|
const expectedSubagentFollowup = expectsSubagentFollowup(initialSynthesizedText);
|
||||||
|
const hadActiveDescendants = activeSubagentRuns > 0;
|
||||||
|
if (activeSubagentRuns > 0 || expectedSubagentFollowup) {
|
||||||
|
let finalReply = await waitForDescendantSubagentSummary({
|
||||||
|
sessionKey: params.agentSessionKey,
|
||||||
|
initialReply: initialSynthesizedText,
|
||||||
|
timeoutMs: params.timeoutMs,
|
||||||
|
observedActiveDescendants: activeSubagentRuns > 0 || expectedSubagentFollowup,
|
||||||
|
});
|
||||||
|
activeSubagentRuns = countActiveDescendantRuns(params.agentSessionKey);
|
||||||
|
if (
|
||||||
|
!finalReply &&
|
||||||
|
activeSubagentRuns === 0 &&
|
||||||
|
(hadActiveDescendants || expectedSubagentFollowup)
|
||||||
|
) {
|
||||||
|
finalReply = await readDescendantSubagentFallbackReply({
|
||||||
|
sessionKey: params.agentSessionKey,
|
||||||
|
runStartedAt: params.runStartedAt,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if (finalReply && activeSubagentRuns === 0) {
|
||||||
|
outputText = finalReply;
|
||||||
|
summary = pickSummaryFromOutput(finalReply) ?? summary;
|
||||||
|
synthesizedText = finalReply;
|
||||||
|
deliveryPayloads = [{ text: finalReply }];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (activeSubagentRuns > 0) {
|
||||||
|
// Parent orchestration is still in progress; avoid announcing a partial
|
||||||
|
// update to the main requester.
|
||||||
|
return params.withRunSession({ status: "ok", summary, outputText, ...params.telemetry });
|
||||||
|
}
|
||||||
|
if (
|
||||||
|
(hadActiveDescendants || expectedSubagentFollowup) &&
|
||||||
|
synthesizedText.trim() === initialSynthesizedText &&
|
||||||
|
isLikelyInterimCronMessage(initialSynthesizedText) &&
|
||||||
|
initialSynthesizedText.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase()
|
||||||
|
) {
|
||||||
|
// Descendants existed but no post-orchestration synthesis arrived, so
|
||||||
|
// suppress stale parent text like "on it, pulling everything together".
|
||||||
|
return params.withRunSession({ status: "ok", summary, outputText, ...params.telemetry });
|
||||||
|
}
|
||||||
|
if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) {
|
||||||
|
return params.withRunSession({
|
||||||
|
status: "ok",
|
||||||
|
summary,
|
||||||
|
outputText,
|
||||||
|
delivered: true,
|
||||||
|
...params.telemetry,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
if (params.isAborted()) {
|
||||||
|
return params.withRunSession({
|
||||||
|
status: "error",
|
||||||
|
error: params.abortReason(),
|
||||||
|
...params.telemetry,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
const didAnnounce = await runSubagentAnnounceFlow({
|
||||||
|
childSessionKey: params.agentSessionKey,
|
||||||
|
childRunId: `${params.job.id}:${params.runSessionId}:${params.runStartedAt}`,
|
||||||
|
requesterSessionKey: announceSessionKey,
|
||||||
|
requesterOrigin: {
|
||||||
|
channel: delivery.channel,
|
||||||
|
to: delivery.to,
|
||||||
|
accountId: delivery.accountId,
|
||||||
|
threadId: delivery.threadId,
|
||||||
|
},
|
||||||
|
requesterDisplayKey: announceSessionKey,
|
||||||
|
task: taskLabel,
|
||||||
|
timeoutMs: params.timeoutMs,
|
||||||
|
cleanup: params.job.deleteAfterRun ? "delete" : "keep",
|
||||||
|
roundOneReply: synthesizedText,
|
||||||
|
// Keep delivery outcome truthful for cron state: if outbound send fails,
|
||||||
|
// announce flow must report false so caller can apply best-effort policy.
|
||||||
|
bestEffortDeliver: false,
|
||||||
|
waitForCompletion: false,
|
||||||
|
startedAt: params.runStartedAt,
|
||||||
|
endedAt: params.runEndedAt,
|
||||||
|
outcome: { status: "ok" },
|
||||||
|
announceType: "cron job",
|
||||||
|
signal: params.abortSignal,
|
||||||
|
});
|
||||||
|
if (didAnnounce) {
|
||||||
|
delivered = true;
|
||||||
|
} else {
|
||||||
|
const message = "cron announce delivery failed";
|
||||||
|
if (!params.deliveryBestEffort) {
|
||||||
|
return params.withRunSession({
|
||||||
|
status: "error",
|
||||||
|
summary,
|
||||||
|
outputText,
|
||||||
|
error: message,
|
||||||
|
...params.telemetry,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
logWarn(`[cron:${params.job.id}] ${message}`);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
if (!params.deliveryBestEffort) {
|
||||||
|
return params.withRunSession({
|
||||||
|
status: "error",
|
||||||
|
summary,
|
||||||
|
outputText,
|
||||||
|
error: String(err),
|
||||||
|
...params.telemetry,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
logWarn(`[cron:${params.job.id}] ${String(err)}`);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
};
|
||||||
|
|
||||||
|
if (
|
||||||
|
params.deliveryRequested &&
|
||||||
|
!params.skipHeartbeatDelivery &&
|
||||||
|
!params.skipMessagingToolDelivery
|
||||||
|
) {
|
||||||
|
if (!params.resolvedDelivery.ok) {
|
||||||
|
if (!params.deliveryBestEffort) {
|
||||||
|
return {
|
||||||
|
result: failDeliveryTarget(params.resolvedDelivery.error.message),
|
||||||
|
delivered,
|
||||||
|
summary,
|
||||||
|
outputText,
|
||||||
|
synthesizedText,
|
||||||
|
deliveryPayloads,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
logWarn(`[cron:${params.job.id}] ${params.resolvedDelivery.error.message}`);
|
||||||
|
return {
|
||||||
|
result: params.withRunSession({
|
||||||
|
status: "ok",
|
||||||
|
summary,
|
||||||
|
outputText,
|
||||||
|
...params.telemetry,
|
||||||
|
}),
|
||||||
|
delivered,
|
||||||
|
summary,
|
||||||
|
outputText,
|
||||||
|
synthesizedText,
|
||||||
|
deliveryPayloads,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Route text-only cron announce output back through the main session so it
|
||||||
|
// follows the same system-message injection path as subagent completions.
|
||||||
|
// Keep direct outbound delivery only for structured payloads (media/channel
|
||||||
|
// data), which cannot be represented by the shared announce flow.
|
||||||
|
//
|
||||||
|
// Forum/topic targets should also use direct delivery. Announce flow can
|
||||||
|
// be swallowed by ANNOUNCE_SKIP/NO_REPLY in the target agent turn, which
|
||||||
|
// silently drops cron output for topic-bound sessions.
|
||||||
|
const useDirectDelivery =
|
||||||
|
params.deliveryPayloadHasStructuredContent || params.resolvedDelivery.threadId != null;
|
||||||
|
if (useDirectDelivery) {
|
||||||
|
const directResult = await deliverViaDirect(params.resolvedDelivery);
|
||||||
|
if (directResult) {
|
||||||
|
return {
|
||||||
|
result: directResult,
|
||||||
|
delivered,
|
||||||
|
summary,
|
||||||
|
outputText,
|
||||||
|
synthesizedText,
|
||||||
|
deliveryPayloads,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
const announceResult = await deliverViaAnnounce(params.resolvedDelivery);
|
||||||
|
if (announceResult) {
|
||||||
|
return {
|
||||||
|
result: announceResult,
|
||||||
|
delivered,
|
||||||
|
summary,
|
||||||
|
outputText,
|
||||||
|
synthesizedText,
|
||||||
|
deliveryPayloads,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
delivered,
|
||||||
|
summary,
|
||||||
|
outputText,
|
||||||
|
synthesizedText,
|
||||||
|
deliveryPayloads,
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -21,10 +21,7 @@ import {
|
|||||||
resolveHooksGmailModel,
|
resolveHooksGmailModel,
|
||||||
resolveThinkingDefault,
|
resolveThinkingDefault,
|
||||||
} from "../../agents/model-selection.js";
|
} from "../../agents/model-selection.js";
|
||||||
import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.js";
|
|
||||||
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
|
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
|
||||||
import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js";
|
|
||||||
import { countActiveDescendantRuns } from "../../agents/subagent-registry.js";
|
|
||||||
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
|
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
|
||||||
import { deriveSessionTotalTokens, hasNonzeroUsage } from "../../agents/usage.js";
|
import { deriveSessionTotalTokens, hasNonzeroUsage } from "../../agents/usage.js";
|
||||||
import { ensureAgentWorkspace } from "../../agents/workspace.js";
|
import { ensureAgentWorkspace } from "../../agents/workspace.js";
|
||||||
@@ -33,19 +30,11 @@ import {
|
|||||||
normalizeVerboseLevel,
|
normalizeVerboseLevel,
|
||||||
supportsXHighThinking,
|
supportsXHighThinking,
|
||||||
} from "../../auto-reply/thinking.js";
|
} from "../../auto-reply/thinking.js";
|
||||||
import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
|
import type { CliDeps } from "../../cli/outbound-send-deps.js";
|
||||||
import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js";
|
|
||||||
import type { OpenClawConfig } from "../../config/config.js";
|
import type { OpenClawConfig } from "../../config/config.js";
|
||||||
import {
|
import { resolveSessionTranscriptPath, updateSessionStore } from "../../config/sessions.js";
|
||||||
resolveAgentMainSessionKey,
|
|
||||||
resolveSessionTranscriptPath,
|
|
||||||
updateSessionStore,
|
|
||||||
} from "../../config/sessions.js";
|
|
||||||
import type { AgentDefaultsConfig } from "../../config/types.js";
|
import type { AgentDefaultsConfig } from "../../config/types.js";
|
||||||
import { registerAgentRunContext } from "../../infra/agent-events.js";
|
import { registerAgentRunContext } from "../../infra/agent-events.js";
|
||||||
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
|
|
||||||
import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js";
|
|
||||||
import { resolveOutboundSessionRoute } from "../../infra/outbound/outbound-session.js";
|
|
||||||
import { logWarn } from "../../logger.js";
|
import { logWarn } from "../../logger.js";
|
||||||
import { buildAgentMainSessionKey, normalizeAgentId } from "../../routing/session-key.js";
|
import { buildAgentMainSessionKey, normalizeAgentId } from "../../routing/session-key.js";
|
||||||
import {
|
import {
|
||||||
@@ -56,6 +45,11 @@ import {
|
|||||||
} from "../../security/external-content.js";
|
} from "../../security/external-content.js";
|
||||||
import { resolveCronDeliveryPlan } from "../delivery.js";
|
import { resolveCronDeliveryPlan } from "../delivery.js";
|
||||||
import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js";
|
import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js";
|
||||||
|
import {
|
||||||
|
dispatchCronDelivery,
|
||||||
|
matchesMessagingToolDeliveryTarget,
|
||||||
|
resolveCronDeliveryBestEffort,
|
||||||
|
} from "./delivery-dispatch.js";
|
||||||
import { resolveDeliveryTarget } from "./delivery-target.js";
|
import { resolveDeliveryTarget } from "./delivery-target.js";
|
||||||
import {
|
import {
|
||||||
isHeartbeatOnlyResponse,
|
isHeartbeatOnlyResponse,
|
||||||
@@ -67,74 +61,6 @@ import {
|
|||||||
} from "./helpers.js";
|
} from "./helpers.js";
|
||||||
import { resolveCronSession } from "./session.js";
|
import { resolveCronSession } from "./session.js";
|
||||||
import { resolveCronSkillsSnapshot } from "./skills-snapshot.js";
|
import { resolveCronSkillsSnapshot } from "./skills-snapshot.js";
|
||||||
import {
|
|
||||||
expectsSubagentFollowup,
|
|
||||||
isLikelyInterimCronMessage,
|
|
||||||
readDescendantSubagentFallbackReply,
|
|
||||||
waitForDescendantSubagentSummary,
|
|
||||||
} from "./subagent-followup.js";
|
|
||||||
|
|
||||||
function matchesMessagingToolDeliveryTarget(
|
|
||||||
target: MessagingToolSend,
|
|
||||||
delivery: { channel?: string; to?: string; accountId?: string },
|
|
||||||
): boolean {
|
|
||||||
if (!delivery.channel || !delivery.to || !target.to) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
const channel = delivery.channel.trim().toLowerCase();
|
|
||||||
const provider = target.provider?.trim().toLowerCase();
|
|
||||||
if (provider && provider !== "message" && provider !== channel) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (target.accountId && delivery.accountId && target.accountId !== delivery.accountId) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return target.to === delivery.to;
|
|
||||||
}
|
|
||||||
|
|
||||||
function resolveCronDeliveryBestEffort(job: CronJob): boolean {
|
|
||||||
if (typeof job.delivery?.bestEffort === "boolean") {
|
|
||||||
return job.delivery.bestEffort;
|
|
||||||
}
|
|
||||||
if (job.payload.kind === "agentTurn" && typeof job.payload.bestEffortDeliver === "boolean") {
|
|
||||||
return job.payload.bestEffortDeliver;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function resolveCronAnnounceSessionKey(params: {
|
|
||||||
cfg: OpenClawConfig;
|
|
||||||
agentId: string;
|
|
||||||
fallbackSessionKey: string;
|
|
||||||
delivery: {
|
|
||||||
channel: Parameters<typeof resolveOutboundSessionRoute>[0]["channel"];
|
|
||||||
to?: string;
|
|
||||||
accountId?: string;
|
|
||||||
threadId?: string | number;
|
|
||||||
};
|
|
||||||
}): Promise<string> {
|
|
||||||
const to = params.delivery.to?.trim();
|
|
||||||
if (!to) {
|
|
||||||
return params.fallbackSessionKey;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
const route = await resolveOutboundSessionRoute({
|
|
||||||
cfg: params.cfg,
|
|
||||||
channel: params.delivery.channel,
|
|
||||||
agentId: params.agentId,
|
|
||||||
accountId: params.delivery.accountId,
|
|
||||||
target: to,
|
|
||||||
threadId: params.delivery.threadId,
|
|
||||||
});
|
|
||||||
const resolved = route?.sessionKey?.trim();
|
|
||||||
if (resolved) {
|
|
||||||
return resolved;
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
// Fall back to main session routing if announce session resolution fails.
|
|
||||||
}
|
|
||||||
return params.fallbackSessionKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
export type RunCronAgentTurnResult = {
|
export type RunCronAgentTurnResult = {
|
||||||
/** Last non-empty agent text output (not truncated). */
|
/** Last non-empty agent text output (not truncated). */
|
||||||
@@ -632,228 +558,39 @@ export async function runCronIsolatedAgentTurn(params: {
|
|||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
// `true` means we confirmed at least one outbound send reached the target.
|
const deliveryResult = await dispatchCronDelivery({
|
||||||
// Keep this strict so timer fallback can safely decide whether to wake main.
|
cfg: params.cfg,
|
||||||
let delivered = skipMessagingToolDelivery;
|
cfgWithAgentDefaults,
|
||||||
type SuccessfulDeliveryTarget = Extract<
|
deps: params.deps,
|
||||||
Awaited<ReturnType<typeof resolveDeliveryTarget>>,
|
job: params.job,
|
||||||
{ ok: true }
|
agentId,
|
||||||
>;
|
agentSessionKey,
|
||||||
const failDeliveryTarget = (error: string) =>
|
runSessionId,
|
||||||
withRunSession({
|
runStartedAt,
|
||||||
status: "error",
|
runEndedAt,
|
||||||
error,
|
timeoutMs,
|
||||||
errorKind: "delivery-target",
|
resolvedDelivery,
|
||||||
summary,
|
deliveryRequested,
|
||||||
outputText,
|
skipHeartbeatDelivery,
|
||||||
...telemetry,
|
skipMessagingToolDelivery,
|
||||||
});
|
deliveryBestEffort,
|
||||||
const deliverViaDirect = async (
|
deliveryPayloadHasStructuredContent,
|
||||||
delivery: SuccessfulDeliveryTarget,
|
deliveryPayloads,
|
||||||
): Promise<RunCronAgentTurnResult | null> => {
|
synthesizedText,
|
||||||
const identity = resolveAgentOutboundIdentity(cfgWithAgentDefaults, agentId);
|
summary,
|
||||||
try {
|
outputText,
|
||||||
const payloadsForDelivery =
|
telemetry,
|
||||||
deliveryPayloads.length > 0
|
abortSignal,
|
||||||
? deliveryPayloads
|
isAborted,
|
||||||
: synthesizedText
|
abortReason,
|
||||||
? [{ text: synthesizedText }]
|
withRunSession,
|
||||||
: [];
|
});
|
||||||
if (payloadsForDelivery.length === 0) {
|
if (deliveryResult.result) {
|
||||||
return null;
|
return deliveryResult.result;
|
||||||
}
|
|
||||||
if (isAborted()) {
|
|
||||||
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
|
|
||||||
}
|
|
||||||
const deliveryResults = await deliverOutboundPayloads({
|
|
||||||
cfg: cfgWithAgentDefaults,
|
|
||||||
channel: delivery.channel,
|
|
||||||
to: delivery.to,
|
|
||||||
accountId: delivery.accountId,
|
|
||||||
threadId: delivery.threadId,
|
|
||||||
payloads: payloadsForDelivery,
|
|
||||||
agentId,
|
|
||||||
identity,
|
|
||||||
bestEffort: deliveryBestEffort,
|
|
||||||
deps: createOutboundSendDeps(params.deps),
|
|
||||||
abortSignal,
|
|
||||||
});
|
|
||||||
delivered = deliveryResults.length > 0;
|
|
||||||
return null;
|
|
||||||
} catch (err) {
|
|
||||||
if (!deliveryBestEffort) {
|
|
||||||
return withRunSession({
|
|
||||||
status: "error",
|
|
||||||
summary,
|
|
||||||
outputText,
|
|
||||||
error: String(err),
|
|
||||||
...telemetry,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
const deliverViaAnnounce = async (
|
|
||||||
delivery: SuccessfulDeliveryTarget,
|
|
||||||
): Promise<RunCronAgentTurnResult | null> => {
|
|
||||||
if (!synthesizedText) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
const announceMainSessionKey = resolveAgentMainSessionKey({
|
|
||||||
cfg: params.cfg,
|
|
||||||
agentId,
|
|
||||||
});
|
|
||||||
const announceSessionKey = await resolveCronAnnounceSessionKey({
|
|
||||||
cfg: cfgWithAgentDefaults,
|
|
||||||
agentId,
|
|
||||||
fallbackSessionKey: announceMainSessionKey,
|
|
||||||
delivery: {
|
|
||||||
channel: delivery.channel,
|
|
||||||
to: delivery.to,
|
|
||||||
accountId: delivery.accountId,
|
|
||||||
threadId: delivery.threadId,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
const taskLabel =
|
|
||||||
typeof params.job.name === "string" && params.job.name.trim()
|
|
||||||
? params.job.name.trim()
|
|
||||||
: `cron:${params.job.id}`;
|
|
||||||
const initialSynthesizedText = synthesizedText.trim();
|
|
||||||
let activeSubagentRuns = countActiveDescendantRuns(agentSessionKey);
|
|
||||||
const expectedSubagentFollowup = expectsSubagentFollowup(initialSynthesizedText);
|
|
||||||
const hadActiveDescendants = activeSubagentRuns > 0;
|
|
||||||
if (activeSubagentRuns > 0 || expectedSubagentFollowup) {
|
|
||||||
let finalReply = await waitForDescendantSubagentSummary({
|
|
||||||
sessionKey: agentSessionKey,
|
|
||||||
initialReply: initialSynthesizedText,
|
|
||||||
timeoutMs,
|
|
||||||
observedActiveDescendants: activeSubagentRuns > 0 || expectedSubagentFollowup,
|
|
||||||
});
|
|
||||||
activeSubagentRuns = countActiveDescendantRuns(agentSessionKey);
|
|
||||||
if (
|
|
||||||
!finalReply &&
|
|
||||||
activeSubagentRuns === 0 &&
|
|
||||||
(hadActiveDescendants || expectedSubagentFollowup)
|
|
||||||
) {
|
|
||||||
finalReply = await readDescendantSubagentFallbackReply({
|
|
||||||
sessionKey: agentSessionKey,
|
|
||||||
runStartedAt,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
if (finalReply && activeSubagentRuns === 0) {
|
|
||||||
outputText = finalReply;
|
|
||||||
summary = pickSummaryFromOutput(finalReply) ?? summary;
|
|
||||||
synthesizedText = finalReply;
|
|
||||||
deliveryPayloads = [{ text: finalReply }];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (activeSubagentRuns > 0) {
|
|
||||||
// Parent orchestration is still in progress; avoid announcing a partial
|
|
||||||
// update to the main requester.
|
|
||||||
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
|
|
||||||
}
|
|
||||||
if (
|
|
||||||
(hadActiveDescendants || expectedSubagentFollowup) &&
|
|
||||||
synthesizedText.trim() === initialSynthesizedText &&
|
|
||||||
isLikelyInterimCronMessage(initialSynthesizedText) &&
|
|
||||||
initialSynthesizedText.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase()
|
|
||||||
) {
|
|
||||||
// Descendants existed but no post-orchestration synthesis arrived, so
|
|
||||||
// suppress stale parent text like "on it, pulling everything together".
|
|
||||||
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
|
|
||||||
}
|
|
||||||
if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) {
|
|
||||||
return withRunSession({ status: "ok", summary, outputText, delivered: true, ...telemetry });
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
if (isAborted()) {
|
|
||||||
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
|
|
||||||
}
|
|
||||||
const didAnnounce = await runSubagentAnnounceFlow({
|
|
||||||
childSessionKey: agentSessionKey,
|
|
||||||
childRunId: `${params.job.id}:${runSessionId}:${runStartedAt}`,
|
|
||||||
requesterSessionKey: announceSessionKey,
|
|
||||||
requesterOrigin: {
|
|
||||||
channel: delivery.channel,
|
|
||||||
to: delivery.to,
|
|
||||||
accountId: delivery.accountId,
|
|
||||||
threadId: delivery.threadId,
|
|
||||||
},
|
|
||||||
requesterDisplayKey: announceSessionKey,
|
|
||||||
task: taskLabel,
|
|
||||||
timeoutMs,
|
|
||||||
cleanup: params.job.deleteAfterRun ? "delete" : "keep",
|
|
||||||
roundOneReply: synthesizedText,
|
|
||||||
// Keep delivery outcome truthful for cron state: if outbound send fails,
|
|
||||||
// announce flow must report false so caller can apply best-effort policy.
|
|
||||||
bestEffortDeliver: false,
|
|
||||||
waitForCompletion: false,
|
|
||||||
startedAt: runStartedAt,
|
|
||||||
endedAt: runEndedAt,
|
|
||||||
outcome: { status: "ok" },
|
|
||||||
announceType: "cron job",
|
|
||||||
signal: abortSignal,
|
|
||||||
});
|
|
||||||
if (didAnnounce) {
|
|
||||||
delivered = true;
|
|
||||||
} else {
|
|
||||||
const message = "cron announce delivery failed";
|
|
||||||
if (!deliveryBestEffort) {
|
|
||||||
return withRunSession({
|
|
||||||
status: "error",
|
|
||||||
summary,
|
|
||||||
outputText,
|
|
||||||
error: message,
|
|
||||||
...telemetry,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
logWarn(`[cron:${params.job.id}] ${message}`);
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
if (!deliveryBestEffort) {
|
|
||||||
return withRunSession({
|
|
||||||
status: "error",
|
|
||||||
summary,
|
|
||||||
outputText,
|
|
||||||
error: String(err),
|
|
||||||
...telemetry,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
logWarn(`[cron:${params.job.id}] ${String(err)}`);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
};
|
|
||||||
if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) {
|
|
||||||
if (!resolvedDelivery.ok) {
|
|
||||||
if (!deliveryBestEffort) {
|
|
||||||
return failDeliveryTarget(resolvedDelivery.error.message);
|
|
||||||
}
|
|
||||||
logWarn(`[cron:${params.job.id}] ${resolvedDelivery.error.message}`);
|
|
||||||
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
|
|
||||||
}
|
|
||||||
|
|
||||||
// Route text-only cron announce output back through the main session so it
|
|
||||||
// follows the same system-message injection path as subagent completions.
|
|
||||||
// Keep direct outbound delivery only for structured payloads (media/channel
|
|
||||||
// data), which cannot be represented by the shared announce flow.
|
|
||||||
//
|
|
||||||
// Forum/topic targets should also use direct delivery. Announce flow can
|
|
||||||
// be swallowed by ANNOUNCE_SKIP/NO_REPLY in the target agent turn, which
|
|
||||||
// silently drops cron output for topic-bound sessions.
|
|
||||||
const useDirectDelivery =
|
|
||||||
deliveryPayloadHasStructuredContent || resolvedDelivery.threadId != null;
|
|
||||||
if (useDirectDelivery) {
|
|
||||||
const directResult = await deliverViaDirect(resolvedDelivery);
|
|
||||||
if (directResult) {
|
|
||||||
return directResult;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
const announceResult = await deliverViaAnnounce(resolvedDelivery);
|
|
||||||
if (announceResult) {
|
|
||||||
return announceResult;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
const delivered = deliveryResult.delivered;
|
||||||
|
summary = deliveryResult.summary;
|
||||||
|
outputText = deliveryResult.outputText;
|
||||||
|
|
||||||
return withRunSession({ status: "ok", summary, outputText, delivered, ...telemetry });
|
return withRunSession({ status: "ok", summary, outputText, delivered, ...telemetry });
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user