fix(telegram): cron and heartbeat messages land in wrong chat instead of target topic (#19367)

Merged via /review-pr -> /prepare-pr -> /merge-pr.

Prepared head SHA: bf02bbf9ce
Co-authored-by: Lukavyi <1013690+Lukavyi@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
Taras Lukavyi
2026-02-18 11:01:01 +01:00
committed by GitHub
parent 114736ed1a
commit d833dcd731
11 changed files with 441 additions and 15 deletions

View File

@@ -0,0 +1,131 @@
import { describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
// Mock session store so we can control what entries exist.
const mockStore: Record<string, Record<string, unknown>> = {};
vi.mock("../config/sessions.js", () => ({
loadSessionStore: vi.fn((storePath: string) => mockStore[storePath] ?? {}),
resolveAgentMainSessionKey: vi.fn(({ agentId }: { agentId: string }) => `agent:${agentId}:main`),
resolveStorePath: vi.fn((_store: unknown, _opts: unknown) => "/mock/store.json"),
}));
// Mock channel-selection to avoid real config resolution.
vi.mock("../infra/outbound/channel-selection.js", () => ({
resolveMessageChannelSelection: vi.fn(async () => ({ channel: "telegram" })),
}));
// Minimal mock for channel plugins (Telegram resolveTarget is an identity).
vi.mock("../channels/plugins/index.js", () => ({
getChannelPlugin: vi.fn(() => ({
meta: { label: "Telegram" },
config: {},
outbound: {
resolveTarget: ({ to }: { to?: string }) =>
to ? { ok: true, to } : { ok: false, error: new Error("missing") },
},
})),
normalizeChannelId: vi.fn((id: string) => id),
}));
const { resolveDeliveryTarget } = await import("./isolated-agent/delivery-target.js");
describe("resolveDeliveryTarget thread session lookup", () => {
const cfg: OpenClawConfig = {};
it("uses thread session entry when sessionKey is provided and entry exists", async () => {
mockStore["/mock/store.json"] = {
"agent:main:main": {
sessionId: "s1",
updatedAt: 1,
lastChannel: "telegram",
lastTo: "-100111",
},
"agent:main:main:thread:9999": {
sessionId: "s2",
updatedAt: 2,
lastChannel: "telegram",
lastTo: "-100111",
lastThreadId: 9999,
},
};
const result = await resolveDeliveryTarget(cfg, "main", {
channel: "last",
sessionKey: "agent:main:main:thread:9999",
});
expect(result.to).toBe("-100111");
expect(result.threadId).toBe(9999);
expect(result.channel).toBe("telegram");
});
it("falls back to main session when sessionKey entry does not exist", async () => {
mockStore["/mock/store.json"] = {
"agent:main:main": {
sessionId: "s1",
updatedAt: 1,
lastChannel: "telegram",
lastTo: "-100222",
},
};
const result = await resolveDeliveryTarget(cfg, "main", {
channel: "last",
sessionKey: "agent:main:main:thread:nonexistent",
});
expect(result.to).toBe("-100222");
expect(result.threadId).toBeUndefined();
expect(result.channel).toBe("telegram");
});
it("falls back to main session when no sessionKey is provided", async () => {
mockStore["/mock/store.json"] = {
"agent:main:main": {
sessionId: "s1",
updatedAt: 1,
lastChannel: "telegram",
lastTo: "-100333",
},
};
const result = await resolveDeliveryTarget(cfg, "main", {
channel: "last",
});
expect(result.to).toBe("-100333");
expect(result.threadId).toBeUndefined();
});
it("preserves threadId from :topic: in delivery.to on first run (no session history)", async () => {
mockStore["/mock/store.json"] = {};
const result = await resolveDeliveryTarget(cfg, "main", {
channel: "telegram",
to: "63448508:topic:1008013",
});
expect(result.to).toBe("63448508");
expect(result.threadId).toBe(1008013);
expect(result.channel).toBe("telegram");
});
it("preserves threadId from :topic: when lastTo differs", async () => {
mockStore["/mock/store.json"] = {
"agent:main:main": {
sessionId: "s1",
updatedAt: 1,
lastChannel: "telegram",
lastTo: "-100999",
},
};
const result = await resolveDeliveryTarget(cfg, "main", {
channel: "telegram",
to: "63448508:topic:1008013",
});
expect(result.to).toBe("63448508");
expect(result.threadId).toBe(1008013);
});
});

View File

@@ -21,6 +21,7 @@ export async function resolveDeliveryTarget(
jobPayload: {
channel?: "last" | ChannelId;
to?: string;
sessionKey?: string;
},
): Promise<{
channel: Exclude<OutboundChannel, "none">;
@@ -38,7 +39,12 @@ export async function resolveDeliveryTarget(
const mainSessionKey = resolveAgentMainSessionKey({ cfg, agentId });
const storePath = resolveStorePath(sessionCfg?.store, { agentId });
const store = loadSessionStore(storePath);
const main = store[mainSessionKey];
// Look up thread-specific session first (e.g. agent:main:main:thread:1234),
// then fall back to the main session entry.
const threadSessionKey = jobPayload.sessionKey?.trim();
const threadEntry = threadSessionKey ? store[threadSessionKey] : undefined;
const main = threadEntry ?? store[mainSessionKey];
const preliminary = resolveSessionDeliveryTarget({
entry: main,
@@ -86,12 +92,13 @@ export async function resolveDeliveryTarget(
}
}
// Only carry threadId when delivering to the same recipient as the session's
// last conversation. This prevents stale thread IDs (e.g. from a Telegram
// supergroup topic) from being sent to a different target (e.g. a private
// chat) where they would cause API errors.
// Carry threadId when it was explicitly set (from :topic: parsing or config)
// or when delivering to the same recipient as the session's last conversation.
// Session-derived threadIds are dropped when the target differs to prevent
// stale thread IDs from leaking to a different chat.
const threadId =
resolved.threadId && resolved.to && resolved.to === resolved.lastTo
resolved.threadId &&
(resolved.threadIdExplicit || (resolved.to && resolved.to === resolved.lastTo))
? resolved.threadId
: undefined;

View File

@@ -363,6 +363,7 @@ export async function runCronIsolatedAgentTurn(params: {
const resolvedDelivery = await resolveDeliveryTarget(cfgWithAgentDefaults, agentId, {
channel: deliveryPlan.channel ?? "last",
to: deliveryPlan.to,
sessionKey: params.job.sessionKey,
});
const { formattedTime, timeLine } = resolveCronStyleNow(params.cfg, now);