diff --git a/src/auto-reply/reply.triggers.trigger-handling.filters-usage-summary-current-model-provider.test.ts b/src/auto-reply/reply.triggers.trigger-handling.filters-usage-summary-current-model-provider.test.ts index 05e42222434..bfd09ca4beb 100644 --- a/src/auto-reply/reply.triggers.trigger-handling.filters-usage-summary-current-model-provider.test.ts +++ b/src/auto-reply/reply.triggers.trigger-handling.filters-usage-summary-current-model-provider.test.ts @@ -382,8 +382,9 @@ describe("trigger handling", () => { ); const text = Array.isArray(res) ? res[0]?.text : res?.text; expect(text).toContain("api-key"); - expect(text).toContain("****"); - expect(text).toContain("sk-t"); + expect(text).toMatch(/\u2026|\.{3}/); + expect(text).toContain("sk-tes"); + expect(text).toContain("abcdef"); expect(text).not.toContain("1234567890abcdef"); expect(text).toContain("(anthropic:work)"); expect(text).not.toContain("mixed"); diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index fa474beddb6..85f657b4815 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -51,6 +51,76 @@ import { appendUntrustedContext } from "./untrusted-context.js"; type AgentDefaults = NonNullable["defaults"]; type ExecOverrides = Pick; +function buildResetSessionNoticeText(params: { + provider: string; + model: string; + defaultProvider: string; + defaultModel: string; +}): string { + const modelLabel = `${params.provider}/${params.model}`; + const defaultLabel = `${params.defaultProvider}/${params.defaultModel}`; + return modelLabel === defaultLabel + ? `✅ New session started · model: ${modelLabel}` + : `✅ New session started · model: ${modelLabel} (default: ${defaultLabel})`; +} + +function resolveResetSessionNoticeRoute(params: { + ctx: MsgContext; + command: ReturnType; +}): { + channel: Parameters[0]["channel"]; + to: string; +} | null { + const commandChannel = params.command.channel?.trim().toLowerCase(); + const fallbackChannel = + commandChannel && commandChannel !== "webchat" + ? (commandChannel as Parameters[0]["channel"]) + : undefined; + const channel = params.ctx.OriginatingChannel ?? fallbackChannel; + const to = params.ctx.OriginatingTo ?? params.command.from ?? params.command.to; + if (!channel || channel === "webchat" || !to) { + return null; + } + return { channel, to }; +} + +async function sendResetSessionNotice(params: { + ctx: MsgContext; + command: ReturnType; + sessionKey: string; + cfg: OpenClawConfig; + accountId: string | undefined; + threadId: string | number | undefined; + provider: string; + model: string; + defaultProvider: string; + defaultModel: string; +}): Promise { + const route = resolveResetSessionNoticeRoute({ + ctx: params.ctx, + command: params.command, + }); + if (!route) { + return; + } + await routeReply({ + payload: { + text: buildResetSessionNoticeText({ + provider: params.provider, + model: params.model, + defaultProvider: params.defaultProvider, + defaultModel: params.defaultModel, + }), + }, + channel: route.channel, + to: route.to, + sessionKey: params.sessionKey, + accountId: params.accountId, + threadId: params.threadId, + cfg: params.cfg, + }); +} + type RunPreparedReplyParams = { ctx: MsgContext; sessionCtx: TemplateContext; @@ -318,26 +388,18 @@ export async function runPreparedReply( } } if (resetTriggered && command.isAuthorizedSender) { - // oxlint-disable-next-line typescript/no-explicit-any - const channel = ctx.OriginatingChannel || (command.channel as any); - const to = ctx.OriginatingTo || command.from || command.to; - if (channel && to) { - const modelLabel = `${provider}/${model}`; - const defaultLabel = `${defaultProvider}/${defaultModel}`; - const text = - modelLabel === defaultLabel - ? `✅ New session started · model: ${modelLabel}` - : `✅ New session started · model: ${modelLabel} (default: ${defaultLabel})`; - await routeReply({ - payload: { text }, - channel, - to, - sessionKey, - accountId: ctx.AccountId, - threadId: ctx.MessageThreadId, - cfg, - }); - } + await sendResetSessionNotice({ + ctx, + command, + sessionKey, + cfg, + accountId: ctx.AccountId, + threadId: ctx.MessageThreadId, + provider, + model, + defaultProvider, + defaultModel, + }); } const sessionIdFinal = sessionId ?? crypto.randomUUID(); const sessionFile = resolveSessionFilePath( diff --git a/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts b/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts index d9b5f41b5d4..71a1df023c3 100644 --- a/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts +++ b/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts @@ -74,6 +74,48 @@ describe("runCronIsolatedAgentTurn", () => { setupIsolatedAgentTurnMocks({ fast: true }); }); + it("does not fan out telegram cron delivery across allowFrom entries", async () => { + await withTempCronHome(async (home) => { + const { storePath, deps } = await createTelegramDeliveryFixture(home); + mockEmbeddedAgentPayloads([ + { text: "HEARTBEAT_OK", mediaUrl: "https://example.com/img.png" }, + ]); + + const cfg = makeCfg(home, storePath, { + channels: { + telegram: { + botToken: "tok", + allowFrom: ["111", "222", "333"], + }, + }, + }); + + const res = await runCronIsolatedAgentTurn({ + cfg, + deps, + job: { + ...makeJob({ + kind: "agentTurn", + message: "deliver once", + }), + delivery: { mode: "announce", channel: "telegram", to: "123" }, + }, + message: "deliver once", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(res.status).toBe("ok"); + expect(res.delivered).toBe(true); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); + expect(deps.sendMessageTelegram).toHaveBeenCalledWith( + "123", + "HEARTBEAT_OK", + expect.objectContaining({ accountId: undefined }), + ); + }); + }); + it("handles media heartbeat delivery and announce cleanup modes", async () => { await withTempCronHome(async (home) => { const { storePath, deps } = await createTelegramDeliveryFixture(home); diff --git a/src/cron/isolated-agent/delivery-target.test.ts b/src/cron/isolated-agent/delivery-target.test.ts index 6cc3cd9c4e8..ad1df42bb47 100644 --- a/src/cron/isolated-agent/delivery-target.test.ts +++ b/src/cron/isolated-agent/delivery-target.test.ts @@ -230,7 +230,11 @@ describe("resolveDeliveryTarget", () => { target: { channel: "last", to: undefined }, }); expect(result.channel).toBe("telegram"); - expect(result.error).toBeUndefined(); + expect(result.ok).toBe(false); + if (result.ok) { + throw new Error("expected unresolved delivery target"); + } + expect(result.error.message).toContain('No delivery target resolved for channel "telegram"'); }); it("returns an error when channel selection is ambiguous", async () => { @@ -245,7 +249,11 @@ describe("resolveDeliveryTarget", () => { }); expect(result.channel).toBeUndefined(); expect(result.to).toBeUndefined(); - expect(result.error?.message).toContain("Channel is required"); + expect(result.ok).toBe(false); + if (result.ok) { + throw new Error("expected ambiguous channel selection error"); + } + expect(result.error.message).toContain("Channel is required"); }); it("uses sessionKey thread entry before main session entry", async () => { @@ -289,6 +297,6 @@ describe("resolveDeliveryTarget", () => { expect(result.channel).toBe("telegram"); expect(result.to).toBe("987654"); - expect(result.error).toBeUndefined(); + expect(result.ok).toBe(true); }); }); diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index a800b9ca6ed..0aa26188120 100644 --- a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -17,6 +17,25 @@ import { normalizeAgentId } from "../../routing/session-key.js"; import { resolveWhatsAppAccount } from "../../web/accounts.js"; import { normalizeWhatsAppTarget } from "../../whatsapp/normalize.js"; +export type DeliveryTargetResolution = + | { + ok: true; + channel: Exclude; + to: string; + accountId?: string; + threadId?: string | number; + mode: "explicit" | "implicit"; + } + | { + ok: false; + channel?: Exclude; + to?: string; + accountId?: string; + threadId?: string | number; + mode: "explicit" | "implicit"; + error: Error; + }; + export async function resolveDeliveryTarget( cfg: OpenClawConfig, agentId: string, @@ -25,14 +44,7 @@ export async function resolveDeliveryTarget( to?: string; sessionKey?: string; }, -): Promise<{ - channel?: Exclude; - to?: string; - accountId?: string; - threadId?: string | number; - mode: "explicit" | "implicit"; - error?: Error; -}> { +): Promise { const requestedChannel = typeof jobPayload.channel === "string" ? jobPayload.channel : "last"; const explicitTo = typeof jobPayload.to === "string" ? jobPayload.to : undefined; const allowMismatchedLastTo = requestedChannel === "last"; @@ -114,23 +126,29 @@ export async function resolveDeliveryTarget( if (!channel) { return { + ok: false, channel: undefined, to: undefined, accountId, threadId, mode, - error: channelResolutionError, + error: + channelResolutionError ?? + new Error("Channel is required when delivery.channel=last has no previous channel."), }; } if (!toCandidate) { return { + ok: false, channel, to: undefined, accountId, threadId, mode, - error: channelResolutionError, + error: + channelResolutionError ?? + new Error(`No delivery target resolved for channel "${channel}". Set delivery.to.`), }; } @@ -163,12 +181,23 @@ export async function resolveDeliveryTarget( mode, allowFrom: allowFromOverride, }); + if (!docked.ok) { + return { + ok: false, + channel, + to: undefined, + accountId, + threadId, + mode, + error: docked.error, + }; + } return { + ok: true, channel, - to: docked.ok ? docked.to : undefined, + to: docked.to, accountId, threadId, mode, - error: docked.ok ? channelResolutionError : docked.error, }; } diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index bc46fcb18f8..01dae6ce295 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -635,6 +635,10 @@ export async function runCronIsolatedAgentTurn(params: { // `true` means we confirmed at least one outbound send reached the target. // Keep this strict so timer fallback can safely decide whether to wake main. let delivered = skipMessagingToolDelivery; + type SuccessfulDeliveryTarget = Extract< + Awaited>, + { ok: true } + >; const failDeliveryTarget = (error: string) => withRunSession({ status: "error", @@ -644,28 +648,189 @@ export async function runCronIsolatedAgentTurn(params: { outputText, ...telemetry, }); + const deliverViaDirect = async ( + delivery: SuccessfulDeliveryTarget, + ): Promise => { + const identity = resolveAgentOutboundIdentity(cfgWithAgentDefaults, agentId); + try { + const payloadsForDelivery = + deliveryPayloads.length > 0 + ? deliveryPayloads + : synthesizedText + ? [{ text: synthesizedText }] + : []; + if (payloadsForDelivery.length === 0) { + return null; + } + if (isAborted()) { + return withRunSession({ status: "error", error: abortReason(), ...telemetry }); + } + const deliveryResults = await deliverOutboundPayloads({ + cfg: cfgWithAgentDefaults, + channel: delivery.channel, + to: delivery.to, + accountId: delivery.accountId, + threadId: delivery.threadId, + payloads: payloadsForDelivery, + agentId, + identity, + bestEffort: deliveryBestEffort, + deps: createOutboundSendDeps(params.deps), + abortSignal, + }); + delivered = deliveryResults.length > 0; + return null; + } catch (err) { + if (!deliveryBestEffort) { + return withRunSession({ + status: "error", + summary, + outputText, + error: String(err), + ...telemetry, + }); + } + return null; + } + }; + const deliverViaAnnounce = async ( + delivery: SuccessfulDeliveryTarget, + ): Promise => { + if (!synthesizedText) { + return null; + } + const announceMainSessionKey = resolveAgentMainSessionKey({ + cfg: params.cfg, + agentId, + }); + const announceSessionKey = await resolveCronAnnounceSessionKey({ + cfg: cfgWithAgentDefaults, + agentId, + fallbackSessionKey: announceMainSessionKey, + delivery: { + channel: delivery.channel, + to: delivery.to, + accountId: delivery.accountId, + threadId: delivery.threadId, + }, + }); + const taskLabel = + typeof params.job.name === "string" && params.job.name.trim() + ? params.job.name.trim() + : `cron:${params.job.id}`; + const initialSynthesizedText = synthesizedText.trim(); + let activeSubagentRuns = countActiveDescendantRuns(agentSessionKey); + const expectedSubagentFollowup = expectsSubagentFollowup(initialSynthesizedText); + const hadActiveDescendants = activeSubagentRuns > 0; + if (activeSubagentRuns > 0 || expectedSubagentFollowup) { + let finalReply = await waitForDescendantSubagentSummary({ + sessionKey: agentSessionKey, + initialReply: initialSynthesizedText, + timeoutMs, + observedActiveDescendants: activeSubagentRuns > 0 || expectedSubagentFollowup, + }); + activeSubagentRuns = countActiveDescendantRuns(agentSessionKey); + if ( + !finalReply && + activeSubagentRuns === 0 && + (hadActiveDescendants || expectedSubagentFollowup) + ) { + finalReply = await readDescendantSubagentFallbackReply({ + sessionKey: agentSessionKey, + runStartedAt, + }); + } + if (finalReply && activeSubagentRuns === 0) { + outputText = finalReply; + summary = pickSummaryFromOutput(finalReply) ?? summary; + synthesizedText = finalReply; + deliveryPayloads = [{ text: finalReply }]; + } + } + if (activeSubagentRuns > 0) { + // Parent orchestration is still in progress; avoid announcing a partial + // update to the main requester. + return withRunSession({ status: "ok", summary, outputText, ...telemetry }); + } + if ( + (hadActiveDescendants || expectedSubagentFollowup) && + synthesizedText.trim() === initialSynthesizedText && + isLikelyInterimCronMessage(initialSynthesizedText) && + initialSynthesizedText.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase() + ) { + // Descendants existed but no post-orchestration synthesis arrived, so + // suppress stale parent text like "on it, pulling everything together". + return withRunSession({ status: "ok", summary, outputText, ...telemetry }); + } + if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) { + return withRunSession({ status: "ok", summary, outputText, delivered: true, ...telemetry }); + } + try { + if (isAborted()) { + return withRunSession({ status: "error", error: abortReason(), ...telemetry }); + } + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: agentSessionKey, + childRunId: `${params.job.id}:${runSessionId}:${runStartedAt}`, + requesterSessionKey: announceSessionKey, + requesterOrigin: { + channel: delivery.channel, + to: delivery.to, + accountId: delivery.accountId, + threadId: delivery.threadId, + }, + requesterDisplayKey: announceSessionKey, + task: taskLabel, + timeoutMs, + cleanup: params.job.deleteAfterRun ? "delete" : "keep", + roundOneReply: synthesizedText, + // Keep delivery outcome truthful for cron state: if outbound send fails, + // announce flow must report false so caller can apply best-effort policy. + bestEffortDeliver: false, + waitForCompletion: false, + startedAt: runStartedAt, + endedAt: runEndedAt, + outcome: { status: "ok" }, + announceType: "cron job", + signal: abortSignal, + }); + if (didAnnounce) { + delivered = true; + } else { + const message = "cron announce delivery failed"; + if (!deliveryBestEffort) { + return withRunSession({ + status: "error", + summary, + outputText, + error: message, + ...telemetry, + }); + } + logWarn(`[cron:${params.job.id}] ${message}`); + } + } catch (err) { + if (!deliveryBestEffort) { + return withRunSession({ + status: "error", + summary, + outputText, + error: String(err), + ...telemetry, + }); + } + logWarn(`[cron:${params.job.id}] ${String(err)}`); + } + return null; + }; if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) { - if (resolvedDelivery.error) { + if (!resolvedDelivery.ok) { if (!deliveryBestEffort) { return failDeliveryTarget(resolvedDelivery.error.message); } logWarn(`[cron:${params.job.id}] ${resolvedDelivery.error.message}`); return withRunSession({ status: "ok", summary, outputText, ...telemetry }); } - const failOrWarnMissingDeliveryField = (message: string) => { - if (!deliveryBestEffort) { - return failDeliveryTarget(message); - } - logWarn(`[cron:${params.job.id}] ${message}`); - return withRunSession({ status: "ok", summary, outputText, ...telemetry }); - }; - if (!resolvedDelivery.channel) { - return failOrWarnMissingDeliveryField("cron delivery channel is missing"); - } - if (!resolvedDelivery.to) { - return failOrWarnMissingDeliveryField("cron delivery target is missing"); - } - const identity = resolveAgentOutboundIdentity(cfgWithAgentDefaults, agentId); // Route text-only cron announce output back through the main session so it // follows the same system-message injection path as subagent completions. @@ -678,165 +843,14 @@ export async function runCronIsolatedAgentTurn(params: { const useDirectDelivery = deliveryPayloadHasStructuredContent || resolvedDelivery.threadId != null; if (useDirectDelivery) { - try { - const payloadsForDelivery = - deliveryPayloads.length > 0 - ? deliveryPayloads - : synthesizedText - ? [{ text: synthesizedText }] - : []; - if (payloadsForDelivery.length > 0) { - if (isAborted()) { - return withRunSession({ status: "error", error: abortReason(), ...telemetry }); - } - const deliveryResults = await deliverOutboundPayloads({ - cfg: cfgWithAgentDefaults, - channel: resolvedDelivery.channel, - to: resolvedDelivery.to, - accountId: resolvedDelivery.accountId, - threadId: resolvedDelivery.threadId, - payloads: payloadsForDelivery, - agentId, - identity, - bestEffort: deliveryBestEffort, - deps: createOutboundSendDeps(params.deps), - abortSignal, - }); - delivered = deliveryResults.length > 0; - } - } catch (err) { - if (!deliveryBestEffort) { - return withRunSession({ - status: "error", - summary, - outputText, - error: String(err), - ...telemetry, - }); - } + const directResult = await deliverViaDirect(resolvedDelivery); + if (directResult) { + return directResult; } - } else if (synthesizedText) { - const announceMainSessionKey = resolveAgentMainSessionKey({ - cfg: params.cfg, - agentId, - }); - const announceSessionKey = await resolveCronAnnounceSessionKey({ - cfg: cfgWithAgentDefaults, - agentId, - fallbackSessionKey: announceMainSessionKey, - delivery: { - channel: resolvedDelivery.channel, - to: resolvedDelivery.to, - accountId: resolvedDelivery.accountId, - threadId: resolvedDelivery.threadId, - }, - }); - const taskLabel = - typeof params.job.name === "string" && params.job.name.trim() - ? params.job.name.trim() - : `cron:${params.job.id}`; - const initialSynthesizedText = synthesizedText.trim(); - let activeSubagentRuns = countActiveDescendantRuns(agentSessionKey); - const expectedSubagentFollowup = expectsSubagentFollowup(initialSynthesizedText); - const hadActiveDescendants = activeSubagentRuns > 0; - if (activeSubagentRuns > 0 || expectedSubagentFollowup) { - let finalReply = await waitForDescendantSubagentSummary({ - sessionKey: agentSessionKey, - initialReply: initialSynthesizedText, - timeoutMs, - observedActiveDescendants: activeSubagentRuns > 0 || expectedSubagentFollowup, - }); - activeSubagentRuns = countActiveDescendantRuns(agentSessionKey); - if ( - !finalReply && - activeSubagentRuns === 0 && - (hadActiveDescendants || expectedSubagentFollowup) - ) { - finalReply = await readDescendantSubagentFallbackReply({ - sessionKey: agentSessionKey, - runStartedAt, - }); - } - if (finalReply && activeSubagentRuns === 0) { - outputText = finalReply; - summary = pickSummaryFromOutput(finalReply) ?? summary; - synthesizedText = finalReply; - deliveryPayloads = [{ text: finalReply }]; - } - } - if (activeSubagentRuns > 0) { - // Parent orchestration is still in progress; avoid announcing a partial - // update to the main requester. - return withRunSession({ status: "ok", summary, outputText, ...telemetry }); - } - if ( - (hadActiveDescendants || expectedSubagentFollowup) && - synthesizedText.trim() === initialSynthesizedText && - isLikelyInterimCronMessage(initialSynthesizedText) && - initialSynthesizedText.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase() - ) { - // Descendants existed but no post-orchestration synthesis arrived, so - // suppress stale parent text like "on it, pulling everything together". - return withRunSession({ status: "ok", summary, outputText, ...telemetry }); - } - if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) { - return withRunSession({ status: "ok", summary, outputText, delivered: true, ...telemetry }); - } - try { - if (isAborted()) { - return withRunSession({ status: "error", error: abortReason(), ...telemetry }); - } - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: agentSessionKey, - childRunId: `${params.job.id}:${runSessionId}:${runStartedAt}`, - requesterSessionKey: announceSessionKey, - requesterOrigin: { - channel: resolvedDelivery.channel, - to: resolvedDelivery.to, - accountId: resolvedDelivery.accountId, - threadId: resolvedDelivery.threadId, - }, - requesterDisplayKey: announceSessionKey, - task: taskLabel, - timeoutMs, - cleanup: params.job.deleteAfterRun ? "delete" : "keep", - roundOneReply: synthesizedText, - // Keep delivery outcome truthful for cron state: if outbound send fails, - // announce flow must report false so caller can apply best-effort policy. - bestEffortDeliver: false, - waitForCompletion: false, - startedAt: runStartedAt, - endedAt: runEndedAt, - outcome: { status: "ok" }, - announceType: "cron job", - signal: abortSignal, - }); - if (didAnnounce) { - delivered = true; - } else { - const message = "cron announce delivery failed"; - if (!deliveryBestEffort) { - return withRunSession({ - status: "error", - summary, - outputText, - error: message, - ...telemetry, - }); - } - logWarn(`[cron:${params.job.id}] ${message}`); - } - } catch (err) { - if (!deliveryBestEffort) { - return withRunSession({ - status: "error", - summary, - outputText, - error: String(err), - ...telemetry, - }); - } - logWarn(`[cron:${params.job.id}] ${String(err)}`); + } else { + const announceResult = await deliverViaAnnounce(resolvedDelivery); + if (announceResult) { + return announceResult; } } }