mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 10:57:26 +00:00
fix(telegram): scope polling offsets per bot and await shared runner stop (#24549)
* Telegram: scope polling offsets and await shared runner stop * Changelog: remove unrelated session-fix entries from PR * Update CHANGELOG.md
This commit is contained in:
@@ -22,6 +22,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Security/OTEL: redact sensitive values (API keys, tokens, credential fields) from diagnostics-otel log bodies, log attributes, and error/reason span fields before OTLP export. (#12542) Thanks @brandonwise.
|
||||
- Providers/OpenRouter: remove conflicting top-level `reasoning_effort` when injecting nested `reasoning.effort`, preventing OpenRouter 400 payload-validation failures for reasoning models. (#24120) thanks @tenequm.
|
||||
- Skills/Python: add CI + pre-commit linting (`ruff`) and pytest discovery coverage for Python scripts/tests under `skills/`, including package test execution from repo root. Thanks @vincentkoc.
|
||||
- Telegram/Polling: scope persisted polling offsets to bot identity and reuse a single awaited runner-stop path on abort/retry, preventing cross-token offset bleed and overlapping pollers during restart/error recovery. (#10850, #11347) Thanks @talhaorak, @anooprdawar, and @vincentkoc.
|
||||
- Sessions/Store: canonicalize inbound mixed-case session keys for metadata and route updates, and migrate legacy case-variant entries to a single lowercase key to prevent duplicate sessions and missing TUI/WebUI history. (#9561) Thanks @hillghost86.
|
||||
- Security/CI: add pre-commit security hook coverage for private-key detection and production dependency auditing, and enforce those checks in CI alongside baseline secret scanning. Thanks @vincentkoc.
|
||||
- Skills/Python: harden skill script packaging and validation edge cases (self-including `.skill` outputs, CRLF frontmatter parsing, strict `--days` validation, and safer image file loading), with expanded Python regression coverage. Thanks @vincentkoc.
|
||||
|
||||
@@ -135,6 +135,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
|
||||
let lastUpdateId = await readTelegramUpdateOffset({
|
||||
accountId: account.accountId,
|
||||
botToken: token,
|
||||
});
|
||||
const persistUpdateId = async (updateId: number) => {
|
||||
if (lastUpdateId !== null && updateId <= lastUpdateId) {
|
||||
@@ -145,6 +146,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
await writeTelegramUpdateOffset({
|
||||
accountId: account.accountId,
|
||||
updateId,
|
||||
botToken: token,
|
||||
});
|
||||
} catch (err) {
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
@@ -257,9 +259,18 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
|
||||
const runner = run(bot, runnerOptions);
|
||||
activeRunner = runner;
|
||||
let stopPromise: Promise<void> | undefined;
|
||||
const stopRunner = () => {
|
||||
stopPromise ??= Promise.resolve(runner.stop())
|
||||
.then(() => undefined)
|
||||
.catch(() => {
|
||||
// Runner may already be stopped by abort/retry paths.
|
||||
});
|
||||
return stopPromise;
|
||||
};
|
||||
const stopOnAbort = () => {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
void runner.stop();
|
||||
void stopRunner();
|
||||
}
|
||||
};
|
||||
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
|
||||
@@ -304,11 +315,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
}
|
||||
} finally {
|
||||
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
|
||||
try {
|
||||
await runner.stop();
|
||||
} catch {
|
||||
// Runner may already be stopped by abort/retry paths.
|
||||
}
|
||||
await stopRunner();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { withStateDirEnv } from "../test-helpers/state-dir-env.js";
|
||||
import {
|
||||
@@ -34,4 +36,46 @@ describe("deleteTelegramUpdateOffset", () => {
|
||||
expect(await readTelegramUpdateOffset({ accountId: "alerts" })).toBe(200);
|
||||
});
|
||||
});
|
||||
|
||||
it("returns null when stored offset was written by a different bot token", async () => {
|
||||
await withStateDirEnv("openclaw-tg-offset-", async () => {
|
||||
await writeTelegramUpdateOffset({
|
||||
accountId: "default",
|
||||
updateId: 321,
|
||||
botToken: "111111:token-a",
|
||||
});
|
||||
|
||||
expect(
|
||||
await readTelegramUpdateOffset({
|
||||
accountId: "default",
|
||||
botToken: "222222:token-b",
|
||||
}),
|
||||
).toBeNull();
|
||||
expect(
|
||||
await readTelegramUpdateOffset({
|
||||
accountId: "default",
|
||||
botToken: "111111:token-a",
|
||||
}),
|
||||
).toBe(321);
|
||||
});
|
||||
});
|
||||
|
||||
it("treats legacy offset records without bot identity as stale when token is provided", async () => {
|
||||
await withStateDirEnv("openclaw-tg-offset-", async ({ stateDir }) => {
|
||||
const legacyPath = path.join(stateDir, "telegram", "update-offset-default.json");
|
||||
await fs.mkdir(path.dirname(legacyPath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
legacyPath,
|
||||
`${JSON.stringify({ version: 1, lastUpdateId: 777 }, null, 2)}\n`,
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
expect(
|
||||
await readTelegramUpdateOffset({
|
||||
accountId: "default",
|
||||
botToken: "333333:token-c",
|
||||
}),
|
||||
).toBeNull();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -4,11 +4,12 @@ import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
|
||||
const STORE_VERSION = 1;
|
||||
const STORE_VERSION = 2;
|
||||
|
||||
type TelegramUpdateOffsetState = {
|
||||
version: number;
|
||||
lastUpdateId: number | null;
|
||||
botId: string | null;
|
||||
};
|
||||
|
||||
function normalizeAccountId(accountId?: string) {
|
||||
@@ -28,16 +29,43 @@ function resolveTelegramUpdateOffsetPath(
|
||||
return path.join(stateDir, "telegram", `update-offset-${normalized}.json`);
|
||||
}
|
||||
|
||||
function extractBotIdFromToken(token?: string): string | null {
|
||||
const trimmed = token?.trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
const [rawBotId] = trimmed.split(":", 1);
|
||||
if (!rawBotId || !/^\d+$/.test(rawBotId)) {
|
||||
return null;
|
||||
}
|
||||
return rawBotId;
|
||||
}
|
||||
|
||||
function safeParseState(raw: string): TelegramUpdateOffsetState | null {
|
||||
try {
|
||||
const parsed = JSON.parse(raw) as TelegramUpdateOffsetState;
|
||||
if (parsed?.version !== STORE_VERSION) {
|
||||
const parsed = JSON.parse(raw) as {
|
||||
version?: number;
|
||||
lastUpdateId?: number | null;
|
||||
botId?: string | null;
|
||||
};
|
||||
if (parsed?.version !== STORE_VERSION && parsed?.version !== 1) {
|
||||
return null;
|
||||
}
|
||||
if (parsed.lastUpdateId !== null && typeof parsed.lastUpdateId !== "number") {
|
||||
return null;
|
||||
}
|
||||
return parsed;
|
||||
if (
|
||||
parsed.version === STORE_VERSION &&
|
||||
parsed.botId !== null &&
|
||||
typeof parsed.botId !== "string"
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
version: STORE_VERSION,
|
||||
lastUpdateId: parsed.lastUpdateId ?? null,
|
||||
botId: parsed.version === STORE_VERSION ? (parsed.botId ?? null) : null,
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
@@ -45,12 +73,20 @@ function safeParseState(raw: string): TelegramUpdateOffsetState | null {
|
||||
|
||||
export async function readTelegramUpdateOffset(params: {
|
||||
accountId?: string;
|
||||
botToken?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
}): Promise<number | null> {
|
||||
const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env);
|
||||
try {
|
||||
const raw = await fs.readFile(filePath, "utf-8");
|
||||
const parsed = safeParseState(raw);
|
||||
const expectedBotId = extractBotIdFromToken(params.botToken);
|
||||
if (expectedBotId && parsed?.botId && parsed.botId !== expectedBotId) {
|
||||
return null;
|
||||
}
|
||||
if (expectedBotId && parsed?.botId === null) {
|
||||
return null;
|
||||
}
|
||||
return parsed?.lastUpdateId ?? null;
|
||||
} catch (err) {
|
||||
const code = (err as { code?: string }).code;
|
||||
@@ -64,6 +100,7 @@ export async function readTelegramUpdateOffset(params: {
|
||||
export async function writeTelegramUpdateOffset(params: {
|
||||
accountId?: string;
|
||||
updateId: number;
|
||||
botToken?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
}): Promise<void> {
|
||||
const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env);
|
||||
@@ -73,6 +110,7 @@ export async function writeTelegramUpdateOffset(params: {
|
||||
const payload: TelegramUpdateOffsetState = {
|
||||
version: STORE_VERSION,
|
||||
lastUpdateId: params.updateId,
|
||||
botId: extractBotIdFromToken(params.botToken),
|
||||
};
|
||||
await fs.writeFile(tmp, `${JSON.stringify(payload, null, 2)}\n`, {
|
||||
encoding: "utf-8",
|
||||
|
||||
Reference in New Issue
Block a user