refactor(sessions): share session thread/topic parsing

This commit is contained in:
Peter Steinberger
2026-02-18 17:55:01 +00:00
parent 1aa4d3a6f0
commit 0dc004fd21
3 changed files with 38 additions and 17 deletions

View File

@@ -17,7 +17,7 @@ vi.mock("./store.js", () => ({
loadSessionStore: () => storeState.store, loadSessionStore: () => storeState.store,
})); }));
import { extractDeliveryInfo } from "./delivery-info.js"; import { extractDeliveryInfo, parseSessionThreadInfo } from "./delivery-info.js";
const buildEntry = (deliveryContext: SessionEntry["deliveryContext"]): SessionEntry => ({ const buildEntry = (deliveryContext: SessionEntry["deliveryContext"]): SessionEntry => ({
sessionId: "session-1", sessionId: "session-1",
@@ -30,6 +30,25 @@ beforeEach(() => {
}); });
describe("extractDeliveryInfo", () => { describe("extractDeliveryInfo", () => {
it("parses base session and thread/topic ids", () => {
expect(parseSessionThreadInfo("agent:main:telegram:group:1:topic:55")).toEqual({
baseSessionKey: "agent:main:telegram:group:1",
threadId: "55",
});
expect(parseSessionThreadInfo("agent:main:slack:channel:C1:thread:123.456")).toEqual({
baseSessionKey: "agent:main:slack:channel:C1",
threadId: "123.456",
});
expect(parseSessionThreadInfo("agent:main:telegram:dm:user-1")).toEqual({
baseSessionKey: "agent:main:telegram:dm:user-1",
threadId: undefined,
});
expect(parseSessionThreadInfo(undefined)).toEqual({
baseSessionKey: undefined,
threadId: undefined,
});
});
it("returns deliveryContext for direct session keys", () => { it("returns deliveryContext for direct session keys", () => {
const sessionKey = "agent:main:webchat:dm:user-123"; const sessionKey = "agent:main:webchat:dm:user-123";
storeState.store[sessionKey] = buildEntry({ storeState.store[sessionKey] = buildEntry({

View File

@@ -6,12 +6,12 @@ import { loadSessionStore } from "./store.js";
* Extract deliveryContext and threadId from a sessionKey. * Extract deliveryContext and threadId from a sessionKey.
* Supports both :thread: (most channels) and :topic: (Telegram). * Supports both :thread: (most channels) and :topic: (Telegram).
*/ */
export function extractDeliveryInfo(sessionKey: string | undefined): { export function parseSessionThreadInfo(sessionKey: string | undefined): {
deliveryContext: { channel?: string; to?: string; accountId?: string } | undefined; baseSessionKey: string | undefined;
threadId: string | undefined; threadId: string | undefined;
} { } {
if (!sessionKey) { if (!sessionKey) {
return { deliveryContext: undefined, threadId: undefined }; return { baseSessionKey: undefined, threadId: undefined };
} }
const topicIndex = sessionKey.lastIndexOf(":topic:"); const topicIndex = sessionKey.lastIndexOf(":topic:");
const threadIndex = sessionKey.lastIndexOf(":thread:"); const threadIndex = sessionKey.lastIndexOf(":thread:");
@@ -22,6 +22,17 @@ export function extractDeliveryInfo(sessionKey: string | undefined): {
const threadIdRaw = const threadIdRaw =
markerIndex === -1 ? undefined : sessionKey.slice(markerIndex + marker.length); markerIndex === -1 ? undefined : sessionKey.slice(markerIndex + marker.length);
const threadId = threadIdRaw?.trim() || undefined; const threadId = threadIdRaw?.trim() || undefined;
return { baseSessionKey, threadId };
}
export function extractDeliveryInfo(sessionKey: string | undefined): {
deliveryContext: { channel?: string; to?: string; accountId?: string } | undefined;
threadId: string | undefined;
} {
const { baseSessionKey, threadId } = parseSessionThreadInfo(sessionKey);
if (!sessionKey || !baseSessionKey) {
return { deliveryContext: undefined, threadId };
}
let deliveryContext: { channel?: string; to?: string; accountId?: string } | undefined; let deliveryContext: { channel?: string; to?: string; accountId?: string } | undefined;
try { try {
@@ -29,7 +40,7 @@ export function extractDeliveryInfo(sessionKey: string | undefined): {
const storePath = resolveStorePath(cfg.session?.store); const storePath = resolveStorePath(cfg.session?.store);
const store = loadSessionStore(storePath); const store = loadSessionStore(storePath);
let entry = store[sessionKey]; let entry = store[sessionKey];
if (!entry?.deliveryContext && markerIndex !== -1 && baseSessionKey) { if (!entry?.deliveryContext && baseSessionKey !== sessionKey) {
entry = store[baseSessionKey]; entry = store[baseSessionKey];
} }
if (entry?.deliveryContext) { if (entry?.deliveryContext) {

View File

@@ -3,6 +3,7 @@ import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-help
import { normalizeChannelId } from "../channels/plugins/index.js"; import { normalizeChannelId } from "../channels/plugins/index.js";
import type { CliDeps } from "../cli/deps.js"; import type { CliDeps } from "../cli/deps.js";
import { resolveMainSessionKeyFromConfig } from "../config/sessions.js"; import { resolveMainSessionKeyFromConfig } from "../config/sessions.js";
import { parseSessionThreadInfo } from "../config/sessions/delivery-info.js";
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
import { resolveOutboundTarget } from "../infra/outbound/targets.js"; import { resolveOutboundTarget } from "../infra/outbound/targets.js";
import { import {
@@ -30,17 +31,7 @@ export async function scheduleRestartSentinelWake(_params: { deps: CliDeps }) {
return; return;
} }
// Extract topic/thread ID from sessionKey (supports both :topic: and :thread:) const { baseSessionKey, threadId: sessionThreadId } = parseSessionThreadInfo(sessionKey);
// Telegram uses :topic:, other platforms use :thread:
const topicIndex = sessionKey.lastIndexOf(":topic:");
const threadIndex = sessionKey.lastIndexOf(":thread:");
const markerIndex = Math.max(topicIndex, threadIndex);
const marker = topicIndex > threadIndex ? ":topic:" : ":thread:";
const baseSessionKey = markerIndex === -1 ? sessionKey : sessionKey.slice(0, markerIndex);
const threadIdRaw =
markerIndex === -1 ? undefined : sessionKey.slice(markerIndex + marker.length);
const sessionThreadId = threadIdRaw?.trim() || undefined;
const { cfg, entry } = loadSessionEntry(sessionKey); const { cfg, entry } = loadSessionEntry(sessionKey);
const parsedTarget = resolveAnnounceTargetFromKey(baseSessionKey); const parsedTarget = resolveAnnounceTargetFromKey(baseSessionKey);
@@ -49,7 +40,7 @@ export async function scheduleRestartSentinelWake(_params: { deps: CliDeps }) {
// Handles race condition where store wasn't flushed before restart // Handles race condition where store wasn't flushed before restart
const sentinelContext = payload.deliveryContext; const sentinelContext = payload.deliveryContext;
let sessionDeliveryContext = deliveryContextFromSession(entry); let sessionDeliveryContext = deliveryContextFromSession(entry);
if (!sessionDeliveryContext && markerIndex !== -1 && baseSessionKey) { if (!sessionDeliveryContext && baseSessionKey && baseSessionKey !== sessionKey) {
const { entry: baseEntry } = loadSessionEntry(baseSessionKey); const { entry: baseEntry } = loadSessionEntry(baseSessionKey);
sessionDeliveryContext = deliveryContextFromSession(baseEntry); sessionDeliveryContext = deliveryContextFromSession(baseEntry);
} }