refactor: centralize followup origin routing helpers

This commit is contained in:
Peter Steinberger
2026-02-24 23:28:26 +00:00
parent 9b53102100
commit 54648a9cf1
9 changed files with 183 additions and 79 deletions

View File

@@ -6,6 +6,11 @@ import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { ReplyPayload } from "../types.js";
import { formatBunFetchSocketError, isBunFetchSocketError } from "./agent-runner-utils.js";
import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js";
import {
resolveOriginAccountId,
resolveOriginMessageProvider,
resolveOriginMessageTo,
} from "./origin-routing.js";
import { normalizeReplyPayloadDirectives } from "./reply-delivery.js";
import {
applyReplyThreading,
@@ -87,10 +92,17 @@ export function buildReplyPayloads(params: {
const messagingToolSentTexts = params.messagingToolSentTexts ?? [];
const messagingToolSentTargets = params.messagingToolSentTargets ?? [];
const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({
messageProvider: params.originatingChannel ?? params.messageProvider,
messageProvider: resolveOriginMessageProvider({
originatingChannel: params.originatingChannel,
provider: params.messageProvider,
}),
messagingToolSentTargets,
originatingTo: params.originatingTo,
accountId: params.accountId,
originatingTo: resolveOriginMessageTo({
originatingTo: params.originatingTo,
}),
accountId: resolveOriginAccountId({
originatingAccountId: params.accountId,
}),
});
// Only dedupe against messaging tool sends for the same origin target.
// Cross-target sends (for example posting to another channel) must not

View File

@@ -9,6 +9,7 @@ import { isReasoningTagProvider } from "../../utils/provider-utils.js";
import { estimateUsageCost, formatTokenCount, formatUsd } from "../../utils/usage-format.js";
import type { TemplateContext } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-routing.js";
import type { FollowupRun } from "./queue.js";
const BUN_FETCH_SOCKET_ERROR_RE = /socket connection was closed unexpectedly/i;
@@ -196,12 +197,15 @@ export function buildEmbeddedContextFromTemplate(params: {
sessionId: params.run.sessionId,
sessionKey: params.run.sessionKey,
agentId: params.run.agentId,
messageProvider:
params.sessionCtx.OriginatingChannel?.trim().toLowerCase() ||
params.sessionCtx.Provider?.trim().toLowerCase() ||
undefined,
messageProvider: resolveOriginMessageProvider({
originatingChannel: params.sessionCtx.OriginatingChannel,
provider: params.sessionCtx.Provider,
}),
agentAccountId: params.sessionCtx.AccountId,
messageTo: params.sessionCtx.OriginatingTo ?? params.sessionCtx.To,
messageTo: resolveOriginMessageTo({
originatingTo: params.sessionCtx.OriginatingTo,
to: params.sessionCtx.To,
}),
messageThreadId: params.sessionCtx.MessageThreadId ?? undefined,
// Provider threading context for tool auto-injection
...buildThreadingToolContext({

View File

@@ -43,6 +43,7 @@ import { appendUsageLine, formatResponseUsageLine } from "./agent-runner-utils.j
import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js";
import { resolveBlockStreamingCoalescing } from "./block-streaming.js";
import { createFollowupRunner } from "./followup-runner.js";
import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-routing.js";
import {
auditPostCompactionReads,
extractReadPaths,
@@ -179,11 +180,10 @@ export async function runReplyAgent(params: {
const pendingToolTasks = new Set<Promise<void>>();
const blockReplyTimeoutMs = opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;
const replyToChannel =
sessionCtx.OriginatingChannel ??
((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as
| OriginatingChannelType
| undefined);
const replyToChannel = resolveOriginMessageProvider({
originatingChannel: sessionCtx.OriginatingChannel,
provider: sessionCtx.Surface ?? sessionCtx.Provider,
}) as OriginatingChannelType | undefined;
const replyToMode = resolveReplyToMode(
followupRun.run.config,
replyToChannel,
@@ -515,7 +515,10 @@ export async function runReplyAgent(params: {
messagingToolSentMediaUrls: runResult.messagingToolSentMediaUrls,
messagingToolSentTargets: runResult.messagingToolSentTargets,
originatingChannel: sessionCtx.OriginatingChannel,
originatingTo: sessionCtx.OriginatingTo ?? sessionCtx.To,
originatingTo: resolveOriginMessageTo({
originatingTo: sessionCtx.OriginatingTo,
to: sessionCtx.To,
}),
accountId: sessionCtx.AccountId,
});
const { replyPayloads } = payloadResult;

View File

@@ -63,6 +63,20 @@ const baseQueuedRun = (messageProvider = "whatsapp"): FollowupRun =>
},
}) as FollowupRun;
function createQueuedRun(
overrides: Partial<FollowupRun> & { run?: Partial<FollowupRun["run"]> } = {},
): FollowupRun {
const base = baseQueuedRun();
return {
...base,
...overrides,
run: {
...base.run,
...overrides.run,
},
};
}
function mockCompactionRun(params: {
willRetry: boolean;
result: {
@@ -114,32 +128,11 @@ describe("createFollowupRunner compaction", () => {
defaultModel: "anthropic/claude-opus-4-5",
});
const queued = {
prompt: "hello",
summaryLine: "hello",
enqueuedAt: Date.now(),
const queued = createQueuedRun({
run: {
sessionId: "session",
sessionKey: "main",
messageProvider: "whatsapp",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
config: {},
skillsSnapshot: {},
provider: "anthropic",
model: "claude",
thinkLevel: "low",
verboseLevel: "on",
elevatedLevel: "off",
bashElevated: {
enabled: false,
allowed: false,
defaultLevel: "off",
},
timeoutMs: 1_000,
blockReplyBreak: "message_end",
},
} as FollowupRun;
});
await runner(queued);
@@ -411,7 +404,7 @@ describe("createFollowupRunner agentDir forwarding", () => {
defaultModel: "anthropic/claude-opus-4-5",
});
const agentDir = path.join("/tmp", "agent-dir");
const queued = baseQueuedRun();
const queued = createQueuedRun();
await runner({
...queued,
run: {

View File

@@ -14,6 +14,11 @@ import type { OriginatingChannelType } from "../templating.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { resolveRunAuthProfile } from "./agent-runner-utils.js";
import {
resolveOriginAccountId,
resolveOriginMessageProvider,
resolveOriginMessageTo,
} from "./origin-routing.js";
import type { FollowupRun } from "./queue.js";
import {
applyReplyThreading,
@@ -231,9 +236,10 @@ export function createFollowupRunner(params: {
}
return [{ ...payload, text: stripped.text }];
});
const replyToChannel =
queued.originatingChannel ??
(queued.run.messageProvider?.toLowerCase() as OriginatingChannelType | undefined);
const replyToChannel = resolveOriginMessageProvider({
originatingChannel: queued.originatingChannel,
provider: queued.run.messageProvider,
}) as OriginatingChannelType | undefined;
const replyToMode = resolveReplyToMode(
queued.run.config,
replyToChannel,
@@ -256,10 +262,18 @@ export function createFollowupRunner(params: {
sentMediaUrls: runResult.messagingToolSentMediaUrls ?? [],
});
const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({
messageProvider: queued.originatingChannel ?? queued.run.messageProvider,
messageProvider: resolveOriginMessageProvider({
originatingChannel: queued.originatingChannel,
provider: queued.run.messageProvider,
}),
messagingToolSentTargets: runResult.messagingToolSentTargets,
originatingTo: queued.originatingTo,
accountId: queued.originatingAccountId ?? queued.run.agentAccountId,
originatingTo: resolveOriginMessageTo({
originatingTo: queued.originatingTo,
}),
accountId: resolveOriginAccountId({
originatingAccountId: queued.originatingAccountId,
accountId: queued.run.agentAccountId,
}),
});
const finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads;

View File

@@ -40,6 +40,7 @@ import type { InlineDirectives } from "./directive-handling.js";
import { buildGroupChatContext, buildGroupIntro } from "./groups.js";
import { buildInboundMetaSystemPrompt, buildInboundUserContextPrefix } from "./inbound-meta.js";
import type { createModelSelectionState } from "./model-selection.js";
import { resolveOriginMessageProvider } from "./origin-routing.js";
import { resolveQueueSettings } from "./queue.js";
import { routeReply } from "./route-reply.js";
import { BARE_SESSION_RESET_PROMPT } from "./session-reset-prompt.js";
@@ -460,10 +461,10 @@ export async function runPreparedReply(
agentDir,
sessionId: sessionIdFinal,
sessionKey,
messageProvider:
sessionCtx.OriginatingChannel?.trim().toLowerCase() ||
sessionCtx.Provider?.trim().toLowerCase() ||
undefined,
messageProvider: resolveOriginMessageProvider({
originatingChannel: sessionCtx.OriginatingChannel,
provider: sessionCtx.Provider,
}),
agentAccountId: sessionCtx.AccountId,
groupId: resolveGroupSessionKey(sessionCtx)?.id ?? undefined,
groupChannel: sessionCtx.GroupChannel?.trim() ?? sessionCtx.GroupSubject?.trim(),

View File

@@ -0,0 +1,43 @@
import { describe, expect, it } from "vitest";
import {
resolveOriginAccountId,
resolveOriginMessageProvider,
resolveOriginMessageTo,
} from "./origin-routing.js";
describe("origin-routing helpers", () => {
it("prefers originating channel over provider for message provider", () => {
const provider = resolveOriginMessageProvider({
originatingChannel: "Telegram",
provider: "heartbeat",
});
expect(provider).toBe("telegram");
});
it("falls back to provider when originating channel is missing", () => {
const provider = resolveOriginMessageProvider({
provider: " Slack ",
});
expect(provider).toBe("slack");
});
it("prefers originating destination over fallback destination", () => {
const to = resolveOriginMessageTo({
originatingTo: "channel:C1",
to: "channel:C2",
});
expect(to).toBe("channel:C1");
});
it("prefers originating account over fallback account", () => {
const accountId = resolveOriginAccountId({
originatingAccountId: "work",
accountId: "personal",
});
expect(accountId).toBe("work");
});
});

View File

@@ -0,0 +1,29 @@
import type { OriginatingChannelType } from "../templating.js";
function normalizeProviderValue(value?: string): string | undefined {
const normalized = value?.trim().toLowerCase();
return normalized || undefined;
}
export function resolveOriginMessageProvider(params: {
originatingChannel?: OriginatingChannelType;
provider?: string;
}): string | undefined {
return (
normalizeProviderValue(params.originatingChannel) ?? normalizeProviderValue(params.provider)
);
}
export function resolveOriginMessageTo(params: {
originatingTo?: string;
to?: string;
}): string | undefined {
return params.originatingTo ?? params.to;
}
export function resolveOriginAccountId(params: {
originatingAccountId?: string;
accountId?: string;
}): string | undefined {
return params.originatingAccountId ?? params.accountId;
}

View File

@@ -13,6 +13,39 @@ import { isRoutableChannel } from "../route-reply.js";
import { FOLLOWUP_QUEUES } from "./state.js";
import type { FollowupRun } from "./types.js";
type OriginRoutingMetadata = Pick<
FollowupRun,
"originatingChannel" | "originatingTo" | "originatingAccountId" | "originatingThreadId"
>;
function resolveOriginRoutingMetadata(items: FollowupRun[]): OriginRoutingMetadata {
return {
originatingChannel: items.find((item) => item.originatingChannel)?.originatingChannel,
originatingTo: items.find((item) => item.originatingTo)?.originatingTo,
originatingAccountId: items.find((item) => item.originatingAccountId)?.originatingAccountId,
// Support both number (Telegram topic) and string (Slack thread_ts) thread IDs.
originatingThreadId: items.find(
(item) => item.originatingThreadId != null && item.originatingThreadId !== "",
)?.originatingThreadId,
};
}
function resolveCrossChannelKey(item: FollowupRun): { cross?: true; key?: string } {
const { originatingChannel: channel, originatingTo: to, originatingAccountId: accountId } = item;
const threadId = item.originatingThreadId;
if (!channel && !to && !accountId && (threadId == null || threadId === "")) {
return {};
}
if (!isRoutableChannel(channel) || !to) {
return { cross: true };
}
// Support both number (Telegram topic IDs) and string (Slack thread_ts) thread IDs.
const threadKey = threadId != null && threadId !== "" ? String(threadId) : "";
return {
key: [channel, to, accountId || "", threadKey].join("|"),
};
}
export function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
@@ -33,23 +66,7 @@ export function scheduleFollowupDrain(
// Debug: `pnpm test src/auto-reply/reply/reply-flow.test.ts`
// Check if messages span multiple channels.
// If so, process individually to preserve per-message routing.
const isCrossChannel = hasCrossChannelItems(queue.items, (item) => {
const channel = item.originatingChannel;
const to = item.originatingTo;
const accountId = item.originatingAccountId;
const threadId = item.originatingThreadId;
if (!channel && !to && !accountId && (threadId == null || threadId === "")) {
return {};
}
if (!isRoutableChannel(channel) || !to) {
return { cross: true };
}
// Support both number (Telegram topic IDs) and string (Slack thread_ts) thread IDs.
const threadKey = threadId != null && threadId !== "" ? String(threadId) : "";
return {
key: [channel, to, accountId || "", threadKey].join("|"),
};
});
const isCrossChannel = hasCrossChannelItems(queue.items, resolveCrossChannelKey);
const collectDrainResult = await drainCollectQueueStep({
collectState,
@@ -71,16 +88,7 @@ export function scheduleFollowupDrain(
break;
}
// Preserve originating channel from items when collecting same-channel.
const originatingChannel = items.find((i) => i.originatingChannel)?.originatingChannel;
const originatingTo = items.find((i) => i.originatingTo)?.originatingTo;
const originatingAccountId = items.find(
(i) => i.originatingAccountId,
)?.originatingAccountId;
// Support both number (Telegram topic) and string (Slack thread_ts) thread IDs.
const originatingThreadId = items.find(
(i) => i.originatingThreadId != null && i.originatingThreadId !== "",
)?.originatingThreadId;
const routing = resolveOriginRoutingMetadata(items);
const prompt = buildCollectPrompt({
title: "[Queued messages while agent was busy]",
@@ -92,10 +100,7 @@ export function scheduleFollowupDrain(
prompt,
run,
enqueuedAt: Date.now(),
originatingChannel,
originatingTo,
originatingAccountId,
originatingThreadId,
...routing,
});
queue.items.splice(0, items.length);
if (summary) {