refactor: unify outbound session context wiring

This commit is contained in:
Peter Steinberger
2026-02-26 21:03:23 +01:00
parent 8483e01a68
commit a1628d89ec
14 changed files with 344 additions and 36 deletions

View File

@@ -60,6 +60,7 @@ import {
} from "./heartbeat-wake.js";
import type { OutboundSendDeps } from "./outbound/deliver.js";
import { deliverOutboundPayloads } from "./outbound/deliver.js";
import { buildOutboundSessionContext } from "./outbound/session-context.js";
import {
resolveHeartbeatDeliveryTarget,
resolveHeartbeatSenderContext,
@@ -696,6 +697,11 @@ export async function runHeartbeatOnce(opts: {
}
const heartbeatOkText = responsePrefix ? `${responsePrefix} ${HEARTBEAT_TOKEN}` : HEARTBEAT_TOKEN;
const outboundSession = buildOutboundSessionContext({
cfg,
agentId,
sessionKey,
});
const canAttemptHeartbeatOk = Boolean(
visibility.showOk && delivery.channel !== "none" && delivery.to,
);
@@ -721,9 +727,8 @@ export async function runHeartbeatOnce(opts: {
accountId: delivery.accountId,
threadId: delivery.threadId,
payloads: [{ text: heartbeatOkText }],
agentId,
session: outboundSession,
deps: opts.deps,
sessionKey,
});
return true;
};
@@ -915,7 +920,7 @@ export async function runHeartbeatOnce(opts: {
channel: delivery.channel,
to: delivery.to,
accountId: deliveryAccountId,
agentId,
session: outboundSession,
threadId: delivery.threadId,
payloads: [
...reasoningPayloads,
@@ -929,7 +934,6 @@ export async function runHeartbeatOnce(opts: {
]),
],
deps: opts.deps,
sessionKey,
});
// Record last delivered heartbeat payload for dedupe.

View File

@@ -31,6 +31,9 @@ const queueMocks = vi.hoisted(() => ({
ackDelivery: vi.fn(async () => {}),
failDelivery: vi.fn(async () => {}),
}));
const logMocks = vi.hoisted(() => ({
warn: vi.fn(),
}));
vi.mock("../../config/sessions.js", async () => {
const actual = await vi.importActual<typeof import("../../config/sessions.js")>(
@@ -53,6 +56,18 @@ vi.mock("./delivery-queue.js", () => ({
ackDelivery: queueMocks.ackDelivery,
failDelivery: queueMocks.failDelivery,
}));
vi.mock("../../logging/subsystem.js", () => ({
createSubsystemLogger: () => {
const makeLogger = () => ({
warn: logMocks.warn,
info: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
child: vi.fn(() => makeLogger()),
});
return makeLogger();
},
}));
const { deliverOutboundPayloads, normalizeOutboundPayloads } = await import("./deliver.js");
@@ -117,6 +132,7 @@ describe("deliverOutboundPayloads", () => {
queueMocks.ackDelivery.mockResolvedValue(undefined);
queueMocks.failDelivery.mockClear();
queueMocks.failDelivery.mockResolvedValue(undefined);
logMocks.warn.mockClear();
});
afterEach(() => {
@@ -188,7 +204,7 @@ describe("deliverOutboundPayloads", () => {
cfg: telegramChunkConfig,
channel: "telegram",
to: "123",
agentId: "work",
session: { agentId: "work" },
payloads: [{ text: "hi", mediaUrl: "file:///tmp/f.png" }],
deps: { sendTelegram },
});
@@ -583,7 +599,7 @@ describe("deliverOutboundPayloads", () => {
to: "+1555",
payloads: [{ text: "hello" }],
deps: { sendWhatsApp },
sessionKey: "agent:main:main",
session: { key: "agent:main:main" },
});
expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledTimes(1);
@@ -603,6 +619,25 @@ describe("deliverOutboundPayloads", () => {
expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1);
});
it("warns when session.agentId is set without a session key", async () => {
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
hookMocks.runner.hasHooks.mockReturnValue(true);
await deliverOutboundPayloads({
cfg: whatsappChunkConfig,
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "hello" }],
deps: { sendWhatsApp },
session: { agentId: "agent-main" },
});
expect(logMocks.warn).toHaveBeenCalledWith(
"deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped",
expect.objectContaining({ channel: "whatsapp", to: "+1555", agentId: "agent-main" }),
);
});
it("calls failDelivery instead of ackDelivery on bestEffort partial failure", async () => {
const sendWhatsApp = vi
.fn()

View File

@@ -20,6 +20,7 @@ import {
import type { sendMessageDiscord } from "../../discord/send.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
import type { sendMessageIMessage } from "../../imessage/send.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { getAgentScopedMediaLocalRoots } from "../../media/local-roots.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { markdownToSignalTextChunks, type SignalTextStyleRange } from "../../signal/format.js";
@@ -32,11 +33,14 @@ import { ackDelivery, enqueueDelivery, failDelivery } from "./delivery-queue.js"
import type { OutboundIdentity } from "./identity.js";
import type { NormalizedOutboundPayload } from "./payloads.js";
import { normalizeReplyPayloadsForDelivery } from "./payloads.js";
import type { OutboundSessionContext } from "./session-context.js";
import type { OutboundChannel } from "./targets.js";
export type { NormalizedOutboundPayload } from "./payloads.js";
export { normalizeOutboundPayloads } from "./payloads.js";
const log = createSubsystemLogger("outbound/deliver");
type SendMatrixMessage = (
to: string,
text: string,
@@ -207,8 +211,8 @@ type DeliverOutboundPayloadsCoreParams = {
bestEffort?: boolean;
onError?: (err: unknown, payload: NormalizedOutboundPayload) => void;
onPayload?: (payload: NormalizedOutboundPayload) => void;
/** Active agent id for media local-root scoping. */
agentId?: string;
/** Session/agent context used for hooks and media local-root scoping. */
session?: OutboundSessionContext;
mirror?: {
sessionKey: string;
agentId?: string;
@@ -216,8 +220,6 @@ type DeliverOutboundPayloadsCoreParams = {
mediaUrls?: string[];
};
silent?: boolean;
/** Session key for internal hook dispatch (when `mirror` is not needed). */
sessionKey?: string;
};
type DeliverOutboundPayloadsParams = DeliverOutboundPayloadsCoreParams & {
@@ -296,7 +298,7 @@ async function deliverOutboundPayloadsCore(
const sendSignal = params.deps?.sendSignal ?? sendMessageSignal;
const mediaLocalRoots = getAgentScopedMediaLocalRoots(
cfg,
params.agentId ?? params.mirror?.agentId,
params.session?.agentId ?? params.mirror?.agentId,
);
const results: OutboundDeliveryResult[] = [];
const handler = await createChannelHandler({
@@ -446,7 +448,21 @@ async function deliverOutboundPayloadsCore(
return normalized ? [normalized] : [];
});
const hookRunner = getGlobalHookRunner();
const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.sessionKey;
const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key;
if (
hookRunner?.hasHooks("message_sent") &&
params.session?.agentId &&
!sessionKeyForInternalHooks
) {
log.warn(
"deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped",
{
channel,
to,
agentId: params.session.agentId,
},
);
}
for (const payload of normalizedPayloads) {
const payloadSummary: NormalizedOutboundPayload = {
text: payload.text ?? "",

View File

@@ -20,6 +20,7 @@ import {
type OutboundSendDeps,
} from "./deliver.js";
import { normalizeReplyPayloadsForDelivery } from "./payloads.js";
import { buildOutboundSessionContext } from "./session-context.js";
import { resolveOutboundTarget } from "./targets.js";
export type MessageGatewayOptions = {
@@ -212,11 +213,16 @@ export async function sendMessage(params: MessageSendParams): Promise<MessageSen
throw resolvedTarget.error;
}
const outboundSession = buildOutboundSessionContext({
cfg,
agentId: params.agentId,
sessionKey: params.mirror?.sessionKey,
});
const results = await deliverOutboundPayloads({
cfg,
channel: outboundChannel,
to: resolvedTarget.to,
agentId: params.agentId,
session: outboundSession,
accountId: params.accountId,
payloads: normalizedPayloads,
replyToId: params.replyToId,
@@ -233,7 +239,6 @@ export async function sendMessage(params: MessageSendParams): Promise<MessageSen
mediaUrls: mirrorMediaUrls.length ? mirrorMediaUrls : undefined,
}
: undefined,
sessionKey: params.mirror?.sessionKey,
});
return {

View File

@@ -0,0 +1,37 @@
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import type { OpenClawConfig } from "../../config/config.js";
export type OutboundSessionContext = {
/** Canonical session key used for internal hook dispatch. */
key?: string;
/** Active agent id used for workspace-scoped media roots. */
agentId?: string;
};
function normalizeOptionalString(value?: string | null): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : undefined;
}
export function buildOutboundSessionContext(params: {
cfg: OpenClawConfig;
sessionKey?: string | null;
agentId?: string | null;
}): OutboundSessionContext | undefined {
const key = normalizeOptionalString(params.sessionKey);
const explicitAgentId = normalizeOptionalString(params.agentId);
const derivedAgentId = key
? resolveSessionAgentId({ sessionKey: key, config: params.cfg })
: undefined;
const agentId = explicitAgentId ?? derivedAgentId;
if (!key && !agentId) {
return undefined;
}
return {
...(key ? { key } : {}),
...(agentId ? { agentId } : {}),
};
}

View File

@@ -0,0 +1,93 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const mocks = vi.hoisted(() => ({
resolveSessionAgentId: vi.fn(() => "agent-from-key"),
resolveSessionDeliveryTarget: vi.fn(() => ({
channel: "whatsapp",
to: "+15550001",
accountId: "acct-1",
threadId: "thread-1",
})),
normalizeMessageChannel: vi.fn((channel: string) => channel),
isDeliverableMessageChannel: vi.fn(() => true),
deliverOutboundPayloads: vi.fn(async () => []),
enqueueSystemEvent: vi.fn(),
}));
vi.mock("../agents/agent-scope.js", () => ({
resolveSessionAgentId: mocks.resolveSessionAgentId,
}));
vi.mock("../utils/message-channel.js", () => ({
normalizeMessageChannel: mocks.normalizeMessageChannel,
isDeliverableMessageChannel: mocks.isDeliverableMessageChannel,
}));
vi.mock("./outbound/targets.js", () => ({
resolveSessionDeliveryTarget: mocks.resolveSessionDeliveryTarget,
}));
vi.mock("./outbound/deliver.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
}));
vi.mock("./system-events.js", () => ({
enqueueSystemEvent: mocks.enqueueSystemEvent,
}));
const { deliverSessionMaintenanceWarning } = await import("./session-maintenance-warning.js");
describe("deliverSessionMaintenanceWarning", () => {
let prevVitest: string | undefined;
let prevNodeEnv: string | undefined;
beforeEach(() => {
prevVitest = process.env.VITEST;
prevNodeEnv = process.env.NODE_ENV;
delete process.env.VITEST;
process.env.NODE_ENV = "development";
mocks.resolveSessionAgentId.mockClear();
mocks.resolveSessionDeliveryTarget.mockClear();
mocks.normalizeMessageChannel.mockClear();
mocks.isDeliverableMessageChannel.mockClear();
mocks.deliverOutboundPayloads.mockClear();
mocks.enqueueSystemEvent.mockClear();
});
afterEach(() => {
if (prevVitest === undefined) {
delete process.env.VITEST;
} else {
process.env.VITEST = prevVitest;
}
if (prevNodeEnv === undefined) {
delete process.env.NODE_ENV;
} else {
process.env.NODE_ENV = prevNodeEnv;
}
});
it("forwards session context to outbound delivery", async () => {
await deliverSessionMaintenanceWarning({
cfg: {},
sessionKey: "agent:main:main",
entry: {} as never,
warning: {
activeSessionKey: "agent:main:main",
pruneAfterMs: 1_000,
maxEntries: 100,
wouldPrune: true,
wouldCap: false,
} as never,
});
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
channel: "whatsapp",
to: "+15550001",
session: { key: "agent:main:main", agentId: "agent-from-key" },
}),
);
expect(mocks.enqueueSystemEvent).not.toHaveBeenCalled();
});
});

View File

@@ -1,8 +1,8 @@
import { resolveSessionAgentId } from "../agents/agent-scope.js";
import type { OpenClawConfig } from "../config/config.js";
import type { SessionEntry, SessionMaintenanceWarning } from "../config/sessions.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { isDeliverableMessageChannel, normalizeMessageChannel } from "../utils/message-channel.js";
import { buildOutboundSessionContext } from "./outbound/session-context.js";
import { resolveSessionDeliveryTarget } from "./outbound/targets.js";
import { enqueueSystemEvent } from "./system-events.js";
@@ -96,6 +96,10 @@ export async function deliverSessionMaintenanceWarning(params: WarningParams): P
try {
const { deliverOutboundPayloads } = await import("./outbound/deliver.js");
const outboundSession = buildOutboundSessionContext({
cfg: params.cfg,
sessionKey: params.sessionKey,
});
await deliverOutboundPayloads({
cfg: params.cfg,
channel,
@@ -103,8 +107,7 @@ export async function deliverSessionMaintenanceWarning(params: WarningParams): P
accountId: target.accountId,
threadId: target.threadId,
payloads: [{ text }],
agentId: resolveSessionAgentId({ sessionKey: params.sessionKey, config: params.cfg }),
sessionKey: params.sessionKey,
session: outboundSession,
});
} catch (err) {
log.warn(`Failed to deliver session maintenance warning: ${String(err)}`);