From a1628d89ec1f1831b4a6622cf32ecbe6991e37de Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 26 Feb 2026 21:03:23 +0100 Subject: [PATCH] refactor: unify outbound session context wiring --- src/auto-reply/reply/route-reply.ts | 8 +- src/commands/agent/delivery.ts | 15 ++- src/cron/isolated-agent/delivery-dispatch.ts | 9 +- src/gateway/server-methods/send.ts | 8 +- src/gateway/server-node-events.ts | 10 +- src/gateway/server-restart-sentinel.test.ts | 94 +++++++++++++++++++ src/gateway/server-restart-sentinel.ts | 9 +- src/infra/heartbeat-runner.ts | 12 ++- src/infra/outbound/deliver.test.ts | 39 +++++++- src/infra/outbound/deliver.ts | 28 ++++-- src/infra/outbound/message.ts | 9 +- src/infra/outbound/session-context.ts | 37 ++++++++ src/infra/session-maintenance-warning.test.ts | 93 ++++++++++++++++++ src/infra/session-maintenance-warning.ts | 9 +- 14 files changed, 344 insertions(+), 36 deletions(-) create mode 100644 src/gateway/server-restart-sentinel.test.ts create mode 100644 src/infra/outbound/session-context.ts create mode 100644 src/infra/session-maintenance-warning.test.ts diff --git a/src/auto-reply/reply/route-reply.ts b/src/auto-reply/reply/route-reply.ts index 081fd58a04a..e349c31e542 100644 --- a/src/auto-reply/reply/route-reply.ts +++ b/src/auto-reply/reply/route-reply.ts @@ -11,6 +11,7 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { resolveEffectiveMessagesConfig } from "../../agents/identity.js"; import { normalizeChannelId } from "../../channels/plugins/index.js"; import type { OpenClawConfig } from "../../config/config.js"; +import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js"; import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js"; import type { OriginatingChannelType } from "../templating.js"; import type { ReplyPayload } from "../types.js"; @@ -122,6 +123,11 @@ export async function routeReply(params: RouteReplyParams): Promise logDeliveryError(err), onPayload: logPayload, deps: createOutboundSendDeps(deps), - sessionKey: opts.sessionKey, }); } } diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index c6040a61774..b071f63172d 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -8,6 +8,7 @@ import { resolveAgentMainSessionKey } from "../../config/sessions.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js"; import { resolveOutboundSessionRoute } from "../../infra/outbound/outbound-session.js"; +import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js"; import { logWarn } from "../../logger.js"; import type { CronJob, CronRunTelemetry } from "../types.js"; import type { DeliveryTargetResolution } from "./delivery-target.js"; @@ -170,6 +171,11 @@ export async function dispatchCronDelivery( }); } deliveryAttempted = true; + const deliverySession = buildOutboundSessionContext({ + cfg: params.cfgWithAgentDefaults, + agentId: params.agentId, + sessionKey: params.agentSessionKey, + }); const deliveryResults = await deliverOutboundPayloads({ cfg: params.cfgWithAgentDefaults, channel: delivery.channel, @@ -177,12 +183,11 @@ export async function dispatchCronDelivery( accountId: delivery.accountId, threadId: delivery.threadId, payloads: payloadsForDelivery, - agentId: params.agentId, + session: deliverySession, identity, bestEffort: params.deliveryBestEffort, deps: createOutboundSendDeps(params.deps), abortSignal: params.abortSignal, - sessionKey: params.agentSessionKey, }); delivered = deliveryResults.length > 0; return null; diff --git a/src/gateway/server-methods/send.ts b/src/gateway/server-methods/send.ts index f398d94aae4..8585f1c84aa 100644 --- a/src/gateway/server-methods/send.ts +++ b/src/gateway/server-methods/send.ts @@ -10,6 +10,7 @@ import { resolveOutboundSessionRoute, } from "../../infra/outbound/outbound-session.js"; import { normalizeReplyPayloadsForDelivery } from "../../infra/outbound/payloads.js"; +import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js"; import { resolveOutboundTarget } from "../../infra/outbound/targets.js"; import { normalizePollInput } from "../../polls.js"; import { @@ -237,13 +238,18 @@ export const sendHandlers: GatewayRequestHandlers = { route: derivedRoute, }); } + const outboundSession = buildOutboundSessionContext({ + cfg, + agentId: effectiveAgentId, + sessionKey: providedSessionKey ?? derivedRoute?.sessionKey, + }); const results = await deliverOutboundPayloads({ cfg, channel: outboundChannel, to: resolved.to, accountId, payloads: [{ text: message, mediaUrl, mediaUrls }], - agentId: effectiveAgentId, + session: outboundSession, gifPlayback: request.gifPlayback, threadId: threadId ?? null, deps: outboundDeps, diff --git a/src/gateway/server-node-events.ts b/src/gateway/server-node-events.ts index 7446d1e22cf..c191a836066 100644 --- a/src/gateway/server-node-events.ts +++ b/src/gateway/server-node-events.ts @@ -1,5 +1,4 @@ import { randomUUID } from "node:crypto"; -import { resolveSessionAgentId } from "../agents/agent-scope.js"; import { normalizeChannelId } from "../channels/plugins/index.js"; import { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; import { agentCommand } from "../commands/agent.js"; @@ -7,6 +6,7 @@ import { loadConfig } from "../config/config.js"; import { updateSessionStore } from "../config/sessions.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; +import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { resolveOutboundTarget } from "../infra/outbound/targets.js"; import { registerApnsToken } from "../infra/push-apns.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; @@ -232,16 +232,18 @@ async function sendReceiptAck(params: { if (!resolved.ok) { throw new Error(String(resolved.error)); } - const agentId = resolveSessionAgentId({ sessionKey: params.sessionKey, config: params.cfg }); + const session = buildOutboundSessionContext({ + cfg: params.cfg, + sessionKey: params.sessionKey, + }); await deliverOutboundPayloads({ cfg: params.cfg, channel: params.channel, to: resolved.to, payloads: [{ text: params.text }], - agentId, + session, bestEffort: true, deps: createOutboundSendDeps(params.deps), - sessionKey: params.sessionKey, }); } diff --git a/src/gateway/server-restart-sentinel.test.ts b/src/gateway/server-restart-sentinel.test.ts new file mode 100644 index 00000000000..187698b06ed --- /dev/null +++ b/src/gateway/server-restart-sentinel.test.ts @@ -0,0 +1,94 @@ +import { describe, expect, it, vi } from "vitest"; + +const mocks = vi.hoisted(() => ({ + resolveSessionAgentId: vi.fn(() => "agent-from-key"), + consumeRestartSentinel: vi.fn(async () => ({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + }, + })), + formatRestartSentinelMessage: vi.fn(() => "restart message"), + summarizeRestartSentinel: vi.fn(() => "restart summary"), + resolveMainSessionKeyFromConfig: vi.fn(() => "agent:main:main"), + parseSessionThreadInfo: vi.fn(() => ({ baseSessionKey: null, threadId: undefined })), + loadSessionEntry: vi.fn(() => ({ cfg: {}, entry: {} })), + resolveAnnounceTargetFromKey: vi.fn(() => null), + deliveryContextFromSession: vi.fn(() => undefined), + mergeDeliveryContext: vi.fn((a?: Record, b?: Record) => ({ + ...b, + ...a, + })), + normalizeChannelId: vi.fn((channel: string) => channel), + resolveOutboundTarget: vi.fn(() => ({ ok: true as const, to: "+15550002" })), + deliverOutboundPayloads: vi.fn(async () => []), + enqueueSystemEvent: vi.fn(), +})); + +vi.mock("../agents/agent-scope.js", () => ({ + resolveSessionAgentId: mocks.resolveSessionAgentId, +})); + +vi.mock("../infra/restart-sentinel.js", () => ({ + consumeRestartSentinel: mocks.consumeRestartSentinel, + formatRestartSentinelMessage: mocks.formatRestartSentinelMessage, + summarizeRestartSentinel: mocks.summarizeRestartSentinel, +})); + +vi.mock("../config/sessions.js", () => ({ + resolveMainSessionKeyFromConfig: mocks.resolveMainSessionKeyFromConfig, +})); + +vi.mock("../config/sessions/delivery-info.js", () => ({ + parseSessionThreadInfo: mocks.parseSessionThreadInfo, +})); + +vi.mock("./session-utils.js", () => ({ + loadSessionEntry: mocks.loadSessionEntry, +})); + +vi.mock("../agents/tools/sessions-send-helpers.js", () => ({ + resolveAnnounceTargetFromKey: mocks.resolveAnnounceTargetFromKey, +})); + +vi.mock("../utils/delivery-context.js", () => ({ + deliveryContextFromSession: mocks.deliveryContextFromSession, + mergeDeliveryContext: mocks.mergeDeliveryContext, +})); + +vi.mock("../channels/plugins/index.js", () => ({ + normalizeChannelId: mocks.normalizeChannelId, +})); + +vi.mock("../infra/outbound/targets.js", () => ({ + resolveOutboundTarget: mocks.resolveOutboundTarget, +})); + +vi.mock("../infra/outbound/deliver.js", () => ({ + deliverOutboundPayloads: mocks.deliverOutboundPayloads, +})); + +vi.mock("../infra/system-events.js", () => ({ + enqueueSystemEvent: mocks.enqueueSystemEvent, +})); + +const { scheduleRestartSentinelWake } = await import("./server-restart-sentinel.js"); + +describe("scheduleRestartSentinelWake", () => { + it("forwards session context to outbound delivery", async () => { + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "whatsapp", + to: "+15550002", + session: { key: "agent:main:main", agentId: "agent-from-key" }, + }), + ); + expect(mocks.enqueueSystemEvent).not.toHaveBeenCalled(); + }); +}); diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index bbbd506b772..e6191942dba 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -1,10 +1,10 @@ -import { resolveSessionAgentId } from "../agents/agent-scope.js"; import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js"; import { normalizeChannelId } from "../channels/plugins/index.js"; import type { CliDeps } from "../cli/deps.js"; import { resolveMainSessionKeyFromConfig } from "../config/sessions.js"; import { parseSessionThreadInfo } from "../config/sessions/delivery-info.js"; import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; +import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { resolveOutboundTarget } from "../infra/outbound/targets.js"; import { consumeRestartSentinel, @@ -83,6 +83,10 @@ export async function scheduleRestartSentinelWake(_params: { deps: CliDeps }) { const isSlack = channel === "slack"; const replyToId = isSlack && threadId != null && threadId !== "" ? String(threadId) : undefined; const resolvedThreadId = isSlack ? undefined : threadId; + const outboundSession = buildOutboundSessionContext({ + cfg, + sessionKey, + }); try { await deliverOutboundPayloads({ @@ -93,9 +97,8 @@ export async function scheduleRestartSentinelWake(_params: { deps: CliDeps }) { replyToId, threadId: resolvedThreadId, payloads: [{ text: message }], - agentId: resolveSessionAgentId({ sessionKey, config: cfg }), + session: outboundSession, bestEffort: true, - sessionKey, }); } catch (err) { enqueueSystemEvent(`${summary}\n${String(err)}`, { sessionKey }); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index bd43092b92a..056142c4056 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -60,6 +60,7 @@ import { } from "./heartbeat-wake.js"; import type { OutboundSendDeps } from "./outbound/deliver.js"; import { deliverOutboundPayloads } from "./outbound/deliver.js"; +import { buildOutboundSessionContext } from "./outbound/session-context.js"; import { resolveHeartbeatDeliveryTarget, resolveHeartbeatSenderContext, @@ -696,6 +697,11 @@ export async function runHeartbeatOnce(opts: { } const heartbeatOkText = responsePrefix ? `${responsePrefix} ${HEARTBEAT_TOKEN}` : HEARTBEAT_TOKEN; + const outboundSession = buildOutboundSessionContext({ + cfg, + agentId, + sessionKey, + }); const canAttemptHeartbeatOk = Boolean( visibility.showOk && delivery.channel !== "none" && delivery.to, ); @@ -721,9 +727,8 @@ export async function runHeartbeatOnce(opts: { accountId: delivery.accountId, threadId: delivery.threadId, payloads: [{ text: heartbeatOkText }], - agentId, + session: outboundSession, deps: opts.deps, - sessionKey, }); return true; }; @@ -915,7 +920,7 @@ export async function runHeartbeatOnce(opts: { channel: delivery.channel, to: delivery.to, accountId: deliveryAccountId, - agentId, + session: outboundSession, threadId: delivery.threadId, payloads: [ ...reasoningPayloads, @@ -929,7 +934,6 @@ export async function runHeartbeatOnce(opts: { ]), ], deps: opts.deps, - sessionKey, }); // Record last delivered heartbeat payload for dedupe. diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 94b5bee9891..b9c59f0e391 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -31,6 +31,9 @@ const queueMocks = vi.hoisted(() => ({ ackDelivery: vi.fn(async () => {}), failDelivery: vi.fn(async () => {}), })); +const logMocks = vi.hoisted(() => ({ + warn: vi.fn(), +})); vi.mock("../../config/sessions.js", async () => { const actual = await vi.importActual( @@ -53,6 +56,18 @@ vi.mock("./delivery-queue.js", () => ({ ackDelivery: queueMocks.ackDelivery, failDelivery: queueMocks.failDelivery, })); +vi.mock("../../logging/subsystem.js", () => ({ + createSubsystemLogger: () => { + const makeLogger = () => ({ + warn: logMocks.warn, + info: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + child: vi.fn(() => makeLogger()), + }); + return makeLogger(); + }, +})); const { deliverOutboundPayloads, normalizeOutboundPayloads } = await import("./deliver.js"); @@ -117,6 +132,7 @@ describe("deliverOutboundPayloads", () => { queueMocks.ackDelivery.mockResolvedValue(undefined); queueMocks.failDelivery.mockClear(); queueMocks.failDelivery.mockResolvedValue(undefined); + logMocks.warn.mockClear(); }); afterEach(() => { @@ -188,7 +204,7 @@ describe("deliverOutboundPayloads", () => { cfg: telegramChunkConfig, channel: "telegram", to: "123", - agentId: "work", + session: { agentId: "work" }, payloads: [{ text: "hi", mediaUrl: "file:///tmp/f.png" }], deps: { sendTelegram }, }); @@ -583,7 +599,7 @@ describe("deliverOutboundPayloads", () => { to: "+1555", payloads: [{ text: "hello" }], deps: { sendWhatsApp }, - sessionKey: "agent:main:main", + session: { key: "agent:main:main" }, }); expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledTimes(1); @@ -603,6 +619,25 @@ describe("deliverOutboundPayloads", () => { expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1); }); + it("warns when session.agentId is set without a session key", async () => { + const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" }); + hookMocks.runner.hasHooks.mockReturnValue(true); + + await deliverOutboundPayloads({ + cfg: whatsappChunkConfig, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "hello" }], + deps: { sendWhatsApp }, + session: { agentId: "agent-main" }, + }); + + expect(logMocks.warn).toHaveBeenCalledWith( + "deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped", + expect.objectContaining({ channel: "whatsapp", to: "+1555", agentId: "agent-main" }), + ); + }); + it("calls failDelivery instead of ackDelivery on bestEffort partial failure", async () => { const sendWhatsApp = vi .fn() diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index f071a25d048..76ea0e78736 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -20,6 +20,7 @@ import { import type { sendMessageDiscord } from "../../discord/send.js"; import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import type { sendMessageIMessage } from "../../imessage/send.js"; +import { createSubsystemLogger } from "../../logging/subsystem.js"; import { getAgentScopedMediaLocalRoots } from "../../media/local-roots.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { markdownToSignalTextChunks, type SignalTextStyleRange } from "../../signal/format.js"; @@ -32,11 +33,14 @@ import { ackDelivery, enqueueDelivery, failDelivery } from "./delivery-queue.js" import type { OutboundIdentity } from "./identity.js"; import type { NormalizedOutboundPayload } from "./payloads.js"; import { normalizeReplyPayloadsForDelivery } from "./payloads.js"; +import type { OutboundSessionContext } from "./session-context.js"; import type { OutboundChannel } from "./targets.js"; export type { NormalizedOutboundPayload } from "./payloads.js"; export { normalizeOutboundPayloads } from "./payloads.js"; +const log = createSubsystemLogger("outbound/deliver"); + type SendMatrixMessage = ( to: string, text: string, @@ -207,8 +211,8 @@ type DeliverOutboundPayloadsCoreParams = { bestEffort?: boolean; onError?: (err: unknown, payload: NormalizedOutboundPayload) => void; onPayload?: (payload: NormalizedOutboundPayload) => void; - /** Active agent id for media local-root scoping. */ - agentId?: string; + /** Session/agent context used for hooks and media local-root scoping. */ + session?: OutboundSessionContext; mirror?: { sessionKey: string; agentId?: string; @@ -216,8 +220,6 @@ type DeliverOutboundPayloadsCoreParams = { mediaUrls?: string[]; }; silent?: boolean; - /** Session key for internal hook dispatch (when `mirror` is not needed). */ - sessionKey?: string; }; type DeliverOutboundPayloadsParams = DeliverOutboundPayloadsCoreParams & { @@ -296,7 +298,7 @@ async function deliverOutboundPayloadsCore( const sendSignal = params.deps?.sendSignal ?? sendMessageSignal; const mediaLocalRoots = getAgentScopedMediaLocalRoots( cfg, - params.agentId ?? params.mirror?.agentId, + params.session?.agentId ?? params.mirror?.agentId, ); const results: OutboundDeliveryResult[] = []; const handler = await createChannelHandler({ @@ -446,7 +448,21 @@ async function deliverOutboundPayloadsCore( return normalized ? [normalized] : []; }); const hookRunner = getGlobalHookRunner(); - const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.sessionKey; + const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key; + if ( + hookRunner?.hasHooks("message_sent") && + params.session?.agentId && + !sessionKeyForInternalHooks + ) { + log.warn( + "deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped", + { + channel, + to, + agentId: params.session.agentId, + }, + ); + } for (const payload of normalizedPayloads) { const payloadSummary: NormalizedOutboundPayload = { text: payload.text ?? "", diff --git a/src/infra/outbound/message.ts b/src/infra/outbound/message.ts index 1ae2a9246ae..9bee14f45d0 100644 --- a/src/infra/outbound/message.ts +++ b/src/infra/outbound/message.ts @@ -20,6 +20,7 @@ import { type OutboundSendDeps, } from "./deliver.js"; import { normalizeReplyPayloadsForDelivery } from "./payloads.js"; +import { buildOutboundSessionContext } from "./session-context.js"; import { resolveOutboundTarget } from "./targets.js"; export type MessageGatewayOptions = { @@ -212,11 +213,16 @@ export async function sendMessage(params: MessageSendParams): Promise 0 ? trimmed : undefined; +} + +export function buildOutboundSessionContext(params: { + cfg: OpenClawConfig; + sessionKey?: string | null; + agentId?: string | null; +}): OutboundSessionContext | undefined { + const key = normalizeOptionalString(params.sessionKey); + const explicitAgentId = normalizeOptionalString(params.agentId); + const derivedAgentId = key + ? resolveSessionAgentId({ sessionKey: key, config: params.cfg }) + : undefined; + const agentId = explicitAgentId ?? derivedAgentId; + if (!key && !agentId) { + return undefined; + } + return { + ...(key ? { key } : {}), + ...(agentId ? { agentId } : {}), + }; +} diff --git a/src/infra/session-maintenance-warning.test.ts b/src/infra/session-maintenance-warning.test.ts new file mode 100644 index 00000000000..f0e9590c572 --- /dev/null +++ b/src/infra/session-maintenance-warning.test.ts @@ -0,0 +1,93 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +const mocks = vi.hoisted(() => ({ + resolveSessionAgentId: vi.fn(() => "agent-from-key"), + resolveSessionDeliveryTarget: vi.fn(() => ({ + channel: "whatsapp", + to: "+15550001", + accountId: "acct-1", + threadId: "thread-1", + })), + normalizeMessageChannel: vi.fn((channel: string) => channel), + isDeliverableMessageChannel: vi.fn(() => true), + deliverOutboundPayloads: vi.fn(async () => []), + enqueueSystemEvent: vi.fn(), +})); + +vi.mock("../agents/agent-scope.js", () => ({ + resolveSessionAgentId: mocks.resolveSessionAgentId, +})); + +vi.mock("../utils/message-channel.js", () => ({ + normalizeMessageChannel: mocks.normalizeMessageChannel, + isDeliverableMessageChannel: mocks.isDeliverableMessageChannel, +})); + +vi.mock("./outbound/targets.js", () => ({ + resolveSessionDeliveryTarget: mocks.resolveSessionDeliveryTarget, +})); + +vi.mock("./outbound/deliver.js", () => ({ + deliverOutboundPayloads: mocks.deliverOutboundPayloads, +})); + +vi.mock("./system-events.js", () => ({ + enqueueSystemEvent: mocks.enqueueSystemEvent, +})); + +const { deliverSessionMaintenanceWarning } = await import("./session-maintenance-warning.js"); + +describe("deliverSessionMaintenanceWarning", () => { + let prevVitest: string | undefined; + let prevNodeEnv: string | undefined; + + beforeEach(() => { + prevVitest = process.env.VITEST; + prevNodeEnv = process.env.NODE_ENV; + delete process.env.VITEST; + process.env.NODE_ENV = "development"; + mocks.resolveSessionAgentId.mockClear(); + mocks.resolveSessionDeliveryTarget.mockClear(); + mocks.normalizeMessageChannel.mockClear(); + mocks.isDeliverableMessageChannel.mockClear(); + mocks.deliverOutboundPayloads.mockClear(); + mocks.enqueueSystemEvent.mockClear(); + }); + + afterEach(() => { + if (prevVitest === undefined) { + delete process.env.VITEST; + } else { + process.env.VITEST = prevVitest; + } + if (prevNodeEnv === undefined) { + delete process.env.NODE_ENV; + } else { + process.env.NODE_ENV = prevNodeEnv; + } + }); + + it("forwards session context to outbound delivery", async () => { + await deliverSessionMaintenanceWarning({ + cfg: {}, + sessionKey: "agent:main:main", + entry: {} as never, + warning: { + activeSessionKey: "agent:main:main", + pruneAfterMs: 1_000, + maxEntries: 100, + wouldPrune: true, + wouldCap: false, + } as never, + }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "whatsapp", + to: "+15550001", + session: { key: "agent:main:main", agentId: "agent-from-key" }, + }), + ); + expect(mocks.enqueueSystemEvent).not.toHaveBeenCalled(); + }); +}); diff --git a/src/infra/session-maintenance-warning.ts b/src/infra/session-maintenance-warning.ts index 081b5c3a4fb..df803f88411 100644 --- a/src/infra/session-maintenance-warning.ts +++ b/src/infra/session-maintenance-warning.ts @@ -1,8 +1,8 @@ -import { resolveSessionAgentId } from "../agents/agent-scope.js"; import type { OpenClawConfig } from "../config/config.js"; import type { SessionEntry, SessionMaintenanceWarning } from "../config/sessions.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { isDeliverableMessageChannel, normalizeMessageChannel } from "../utils/message-channel.js"; +import { buildOutboundSessionContext } from "./outbound/session-context.js"; import { resolveSessionDeliveryTarget } from "./outbound/targets.js"; import { enqueueSystemEvent } from "./system-events.js"; @@ -96,6 +96,10 @@ export async function deliverSessionMaintenanceWarning(params: WarningParams): P try { const { deliverOutboundPayloads } = await import("./outbound/deliver.js"); + const outboundSession = buildOutboundSessionContext({ + cfg: params.cfg, + sessionKey: params.sessionKey, + }); await deliverOutboundPayloads({ cfg: params.cfg, channel, @@ -103,8 +107,7 @@ export async function deliverSessionMaintenanceWarning(params: WarningParams): P accountId: target.accountId, threadId: target.threadId, payloads: [{ text }], - agentId: resolveSessionAgentId({ sessionKey: params.sessionKey, config: params.cfg }), - sessionKey: params.sessionKey, + session: outboundSession, }); } catch (err) { log.warn(`Failed to deliver session maintenance warning: ${String(err)}`);