mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 19:07:26 +00:00
feat(cron): add --account flag for multi-account delivery routing (#26284)
* feat(cron): add --account flag for multi-account delivery routing Add support for explicit delivery account routing in cron jobs across CLI, normalization, delivery planning, and isolated delivery target resolution. Highlights: - Add --account <id> to cron add and cron edit - Add optional delivery.accountId to cron types and delivery plan - Normalize and trim delivery.accountId in cron create/update normalization - Prefer explicit accountId over session lastAccountId and bindings fallback - Thread accountId through isolated cron run delivery resolution - Preserve cron edit --best-effort-deliver/--no-best-effort-deliver behavior by keeping implicit announce mode - Expand tests for account passthrough/merge/precedence and CLI account flows * cron: resolve rebase duplicate accountId fields --------- Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -40,15 +40,21 @@ const { registerCronCli } = await import("./cron-cli.js");
|
||||
type CronUpdatePatch = {
|
||||
patch?: {
|
||||
schedule?: { kind?: string; expr?: string; tz?: string; staggerMs?: number };
|
||||
payload?: { message?: string; model?: string; thinking?: string };
|
||||
delivery?: { mode?: string; channel?: string; to?: string; bestEffort?: boolean };
|
||||
payload?: { kind?: string; message?: string; model?: string; thinking?: string };
|
||||
delivery?: {
|
||||
mode?: string;
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
bestEffort?: boolean;
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
type CronAddParams = {
|
||||
schedule?: { kind?: string; staggerMs?: number };
|
||||
payload?: { model?: string; thinking?: string };
|
||||
delivery?: { mode?: string };
|
||||
delivery?: { mode?: string; accountId?: string };
|
||||
deleteAfterRun?: boolean;
|
||||
agentId?: string;
|
||||
sessionTarget?: string;
|
||||
@@ -246,6 +252,40 @@ describe("cron cli", () => {
|
||||
expect(params?.deleteAfterRun).toBe(false);
|
||||
});
|
||||
|
||||
it("includes --account on isolated cron add delivery", async () => {
|
||||
const params = await runCronAddAndGetParams([
|
||||
"--name",
|
||||
"accounted add",
|
||||
"--cron",
|
||||
"* * * * *",
|
||||
"--session",
|
||||
"isolated",
|
||||
"--message",
|
||||
"hello",
|
||||
"--account",
|
||||
" coordinator ",
|
||||
]);
|
||||
expect(params?.delivery?.mode).toBe("announce");
|
||||
expect(params?.delivery?.accountId).toBe("coordinator");
|
||||
});
|
||||
|
||||
it("rejects --account on non-isolated/systemEvent cron add", async () => {
|
||||
await expectCronCommandExit([
|
||||
"cron",
|
||||
"add",
|
||||
"--name",
|
||||
"invalid account add",
|
||||
"--cron",
|
||||
"* * * * *",
|
||||
"--session",
|
||||
"main",
|
||||
"--system-event",
|
||||
"tick",
|
||||
"--account",
|
||||
"coordinator",
|
||||
]);
|
||||
});
|
||||
|
||||
it.each([
|
||||
{ command: "enable" as const, expectedEnabled: true },
|
||||
{ command: "disable" as const, expectedEnabled: false },
|
||||
@@ -354,6 +394,13 @@ describe("cron cli", () => {
|
||||
expect(patch?.patch?.delivery?.mode).toBe("none");
|
||||
});
|
||||
|
||||
it("updates delivery account without requiring --message on cron edit", async () => {
|
||||
const patch = await runCronEditAndGetPatch(["--account", " coordinator "]);
|
||||
expect(patch?.patch?.payload?.kind).toBe("agentTurn");
|
||||
expect(patch?.patch?.delivery?.accountId).toBe("coordinator");
|
||||
expect(patch?.patch?.delivery?.mode).toBeUndefined();
|
||||
});
|
||||
|
||||
it("does not include undefined delivery fields when updating message", async () => {
|
||||
// Update message without delivery flags - should NOT include undefined delivery fields
|
||||
await runCronCommand(["cron", "edit", "job-1", "--message", "Updated message"]);
|
||||
|
||||
@@ -92,6 +92,7 @@ export function registerCronAddCommand(cron: Command) {
|
||||
"--to <dest>",
|
||||
"Delivery destination (E.164, Telegram chatId, or Discord channel/user)",
|
||||
)
|
||||
.option("--account <id>", "Channel account id for delivery (multi-account setups)")
|
||||
.option("--best-effort-deliver", "Do not fail the job if delivery fails", false)
|
||||
.option("--json", "Output JSON", false)
|
||||
.action(async (opts: GatewayRpcOpts & Record<string, unknown>, cmd?: Command) => {
|
||||
@@ -221,6 +222,15 @@ export function registerCronAddCommand(cron: Command) {
|
||||
throw new Error("--announce/--no-deliver require --session isolated.");
|
||||
}
|
||||
|
||||
const accountId =
|
||||
typeof opts.account === "string" && opts.account.trim()
|
||||
? opts.account.trim()
|
||||
: undefined;
|
||||
|
||||
if (accountId && (sessionTarget !== "isolated" || payload.kind !== "agentTurn")) {
|
||||
throw new Error("--account requires an isolated agentTurn job with delivery.");
|
||||
}
|
||||
|
||||
const deliveryMode =
|
||||
sessionTarget === "isolated" && payload.kind === "agentTurn"
|
||||
? hasAnnounce
|
||||
@@ -265,6 +275,7 @@ export function registerCronAddCommand(cron: Command) {
|
||||
? opts.channel.trim()
|
||||
: undefined,
|
||||
to: typeof opts.to === "string" && opts.to.trim() ? opts.to.trim() : undefined,
|
||||
accountId,
|
||||
bestEffort: opts.bestEffortDeliver ? true : undefined,
|
||||
}
|
||||
: undefined,
|
||||
|
||||
@@ -59,6 +59,7 @@ export function registerCronEditCommand(cron: Command) {
|
||||
"--to <dest>",
|
||||
"Delivery destination (E.164, Telegram chatId, or Discord channel/user)",
|
||||
)
|
||||
.option("--account <id>", "Channel account id for delivery (multi-account setups)")
|
||||
.option("--best-effort-deliver", "Do not fail job if delivery fails")
|
||||
.option("--no-best-effort-deliver", "Fail job when delivery fails")
|
||||
.action(async (id, opts) => {
|
||||
@@ -209,6 +210,7 @@ export function registerCronEditCommand(cron: Command) {
|
||||
const hasTimeoutSeconds = Boolean(timeoutSeconds && Number.isFinite(timeoutSeconds));
|
||||
const hasDeliveryModeFlag = opts.announce || typeof opts.deliver === "boolean";
|
||||
const hasDeliveryTarget = typeof opts.channel === "string" || typeof opts.to === "string";
|
||||
const hasDeliveryAccount = typeof opts.account === "string";
|
||||
const hasBestEffort = typeof opts.bestEffortDeliver === "boolean";
|
||||
const hasAgentTurnPatch =
|
||||
typeof opts.message === "string" ||
|
||||
@@ -217,6 +219,7 @@ export function registerCronEditCommand(cron: Command) {
|
||||
hasTimeoutSeconds ||
|
||||
hasDeliveryModeFlag ||
|
||||
hasDeliveryTarget ||
|
||||
hasDeliveryAccount ||
|
||||
hasBestEffort;
|
||||
if (hasSystemEventPatch && hasAgentTurnPatch) {
|
||||
throw new Error("Choose at most one payload change");
|
||||
@@ -235,14 +238,14 @@ export function registerCronEditCommand(cron: Command) {
|
||||
patch.payload = payload;
|
||||
}
|
||||
|
||||
if (hasDeliveryModeFlag || hasDeliveryTarget || hasBestEffort) {
|
||||
const deliveryMode =
|
||||
opts.announce || opts.deliver === true
|
||||
? "announce"
|
||||
: opts.deliver === false
|
||||
? "none"
|
||||
: "announce";
|
||||
const delivery: Record<string, unknown> = { mode: deliveryMode };
|
||||
if (hasDeliveryModeFlag || hasDeliveryTarget || hasDeliveryAccount || hasBestEffort) {
|
||||
const delivery: Record<string, unknown> = {};
|
||||
if (hasDeliveryModeFlag) {
|
||||
delivery.mode = opts.announce || opts.deliver === true ? "announce" : "none";
|
||||
} else if (hasBestEffort) {
|
||||
// Back-compat: toggling best-effort alone has historically implied announce mode.
|
||||
delivery.mode = "announce";
|
||||
}
|
||||
if (typeof opts.channel === "string") {
|
||||
const channel = opts.channel.trim();
|
||||
delivery.channel = channel ? channel : undefined;
|
||||
@@ -251,6 +254,10 @@ export function registerCronEditCommand(cron: Command) {
|
||||
const to = opts.to.trim();
|
||||
delivery.to = to ? to : undefined;
|
||||
}
|
||||
if (typeof opts.account === "string") {
|
||||
const account = opts.account.trim();
|
||||
delivery.accountId = account ? account : undefined;
|
||||
}
|
||||
if (typeof opts.bestEffortDeliver === "boolean") {
|
||||
delivery.bestEffort = opts.bestEffortDeliver;
|
||||
}
|
||||
|
||||
@@ -43,6 +43,31 @@ describe("resolveCronDeliveryPlan", () => {
|
||||
expect(plan.requested).toBe(false);
|
||||
});
|
||||
|
||||
it("passes through accountId from delivery config", () => {
|
||||
const plan = resolveCronDeliveryPlan(
|
||||
makeJob({
|
||||
delivery: {
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "-1003816714067",
|
||||
accountId: "coordinator",
|
||||
},
|
||||
}),
|
||||
);
|
||||
expect(plan.mode).toBe("announce");
|
||||
expect(plan.accountId).toBe("coordinator");
|
||||
expect(plan.to).toBe("-1003816714067");
|
||||
});
|
||||
|
||||
it("returns undefined accountId when not set", () => {
|
||||
const plan = resolveCronDeliveryPlan(
|
||||
makeJob({
|
||||
delivery: { mode: "announce", channel: "telegram", to: "123" },
|
||||
}),
|
||||
);
|
||||
expect(plan.accountId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("resolves webhook mode without channel routing", () => {
|
||||
const plan = resolveCronDeliveryPlan(
|
||||
makeJob({
|
||||
|
||||
@@ -4,6 +4,7 @@ export type CronDeliveryPlan = {
|
||||
mode: CronDeliveryMode;
|
||||
channel?: CronMessageChannel;
|
||||
to?: string;
|
||||
/** Explicit channel account id from the delivery config, if set. */
|
||||
accountId?: string;
|
||||
source: "delivery" | "payload";
|
||||
requested: boolean;
|
||||
@@ -59,12 +60,11 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
|
||||
(delivery as { channel?: unknown } | undefined)?.channel,
|
||||
);
|
||||
const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to);
|
||||
const channel = deliveryChannel ?? payloadChannel ?? "last";
|
||||
const to = deliveryTo ?? payloadTo;
|
||||
const deliveryAccountId = normalizeAccountId(
|
||||
(delivery as { accountId?: unknown } | undefined)?.accountId,
|
||||
);
|
||||
|
||||
const channel = deliveryChannel ?? payloadChannel ?? "last";
|
||||
const to = deliveryTo ?? payloadTo;
|
||||
if (hasDelivery) {
|
||||
const resolvedMode = mode ?? "announce";
|
||||
return {
|
||||
|
||||
@@ -110,6 +110,27 @@ describe("resolveDeliveryTarget thread session lookup", () => {
|
||||
expect(result.channel).toBe("telegram");
|
||||
});
|
||||
|
||||
it("explicit accountId overrides session lastAccountId", async () => {
|
||||
mockStore["/mock/store.json"] = {
|
||||
"agent:main:main": {
|
||||
sessionId: "s1",
|
||||
updatedAt: 1,
|
||||
lastChannel: "telegram",
|
||||
lastTo: "-100444",
|
||||
lastAccountId: "session-account",
|
||||
},
|
||||
};
|
||||
|
||||
const result = await resolveDeliveryTarget(cfg, "main", {
|
||||
channel: "telegram",
|
||||
to: "-100444",
|
||||
accountId: "explicit-account",
|
||||
});
|
||||
|
||||
expect(result.accountId).toBe("explicit-account");
|
||||
expect(result.to).toBe("-100444");
|
||||
});
|
||||
|
||||
it("preserves threadId from :topic: when lastTo differs", async () => {
|
||||
mockStore["/mock/store.json"] = {
|
||||
"agent:main:main": {
|
||||
|
||||
@@ -42,8 +42,8 @@ export async function resolveDeliveryTarget(
|
||||
jobPayload: {
|
||||
channel?: "last" | ChannelId;
|
||||
to?: string;
|
||||
sessionKey?: string;
|
||||
accountId?: string;
|
||||
sessionKey?: string;
|
||||
},
|
||||
): Promise<DeliveryTargetResolution> {
|
||||
const requestedChannel = typeof jobPayload.channel === "string" ? jobPayload.channel : "last";
|
||||
@@ -101,11 +101,14 @@ export async function resolveDeliveryTarget(
|
||||
const mode = resolved.mode as "explicit" | "implicit";
|
||||
let toCandidate = resolved.to;
|
||||
|
||||
// When the session has no lastAccountId (e.g. first-run isolated cron
|
||||
// session), fall back to the agent's bound account from bindings config.
|
||||
// This ensures the message tool in isolated sessions resolves the correct
|
||||
// bot token for multi-account setups.
|
||||
let accountId = resolved.accountId;
|
||||
// Prefer an explicit accountId from the job's delivery config (set via
|
||||
// --account on cron add/edit). Fall back to the session's lastAccountId,
|
||||
// then to the agent's bound account from bindings config.
|
||||
const explicitAccountId =
|
||||
typeof jobPayload.accountId === "string" && jobPayload.accountId.trim()
|
||||
? jobPayload.accountId.trim()
|
||||
: undefined;
|
||||
let accountId = explicitAccountId ?? resolved.accountId;
|
||||
if (!accountId && channel) {
|
||||
const bindings = buildChannelAccountBindings(cfg);
|
||||
const byAgent = bindings.get(channel);
|
||||
@@ -115,11 +118,6 @@ export async function resolveDeliveryTarget(
|
||||
}
|
||||
}
|
||||
|
||||
// Explicit delivery account should override inferred session/binding account.
|
||||
if (jobPayload.accountId) {
|
||||
accountId = jobPayload.accountId;
|
||||
}
|
||||
|
||||
// Carry threadId when it was explicitly set (from :topic: parsing or config)
|
||||
// or when delivering to the same recipient as the session's last conversation.
|
||||
// Session-derived threadIds are dropped when the target differs to prevent
|
||||
|
||||
@@ -317,8 +317,8 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
const resolvedDelivery = await resolveDeliveryTarget(cfgWithAgentDefaults, agentId, {
|
||||
channel: deliveryPlan.channel ?? "last",
|
||||
to: deliveryPlan.to,
|
||||
sessionKey: params.job.sessionKey,
|
||||
accountId: deliveryPlan.accountId,
|
||||
sessionKey: params.job.sessionKey,
|
||||
});
|
||||
|
||||
const { formattedTime, timeLine } = resolveCronStyleNow(params.cfg, now);
|
||||
|
||||
@@ -212,6 +212,51 @@ describe("normalizeCronJobCreate", () => {
|
||||
expect(delivery.to).toBe("7200373102");
|
||||
});
|
||||
|
||||
it("normalizes delivery accountId and strips blanks", () => {
|
||||
const normalized = normalizeCronJobCreate({
|
||||
name: "delivery account",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "* * * * *" },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "now",
|
||||
payload: {
|
||||
kind: "agentTurn",
|
||||
message: "hi",
|
||||
},
|
||||
delivery: {
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "-1003816714067",
|
||||
accountId: " coordinator ",
|
||||
},
|
||||
}) as unknown as Record<string, unknown>;
|
||||
|
||||
const delivery = normalized.delivery as Record<string, unknown>;
|
||||
expect(delivery.accountId).toBe("coordinator");
|
||||
});
|
||||
|
||||
it("strips empty accountId from delivery", () => {
|
||||
const normalized = normalizeCronJobCreate({
|
||||
name: "empty account",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "* * * * *" },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "now",
|
||||
payload: {
|
||||
kind: "agentTurn",
|
||||
message: "hi",
|
||||
},
|
||||
delivery: {
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
accountId: " ",
|
||||
},
|
||||
}) as unknown as Record<string, unknown>;
|
||||
|
||||
const delivery = normalized.delivery as Record<string, unknown>;
|
||||
expect("accountId" in delivery).toBe(false);
|
||||
});
|
||||
|
||||
it("normalizes webhook delivery mode and target URL", () => {
|
||||
const normalized = normalizeCronJobCreate({
|
||||
name: "webhook delivery",
|
||||
|
||||
@@ -183,6 +183,16 @@ function coerceDelivery(delivery: UnknownRecord) {
|
||||
delete next.to;
|
||||
}
|
||||
}
|
||||
if (typeof delivery.accountId === "string") {
|
||||
const trimmed = delivery.accountId.trim();
|
||||
if (trimmed) {
|
||||
next.accountId = trimmed;
|
||||
} else {
|
||||
delete next.accountId;
|
||||
}
|
||||
} else if ("accountId" in next && typeof next.accountId !== "string") {
|
||||
delete next.accountId;
|
||||
}
|
||||
return next;
|
||||
}
|
||||
|
||||
|
||||
@@ -115,6 +115,28 @@ describe("applyJobPatch", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("merges delivery.accountId from patch and preserves existing", () => {
|
||||
const job = createIsolatedAgentTurnJob("job-acct", {
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "-100123",
|
||||
});
|
||||
|
||||
applyJobPatch(job, { delivery: { mode: "announce", accountId: " coordinator " } });
|
||||
expect(job.delivery?.accountId).toBe("coordinator");
|
||||
expect(job.delivery?.mode).toBe("announce");
|
||||
expect(job.delivery?.to).toBe("-100123");
|
||||
|
||||
// Updating other fields preserves accountId
|
||||
applyJobPatch(job, { delivery: { mode: "announce", to: "-100999" } });
|
||||
expect(job.delivery?.accountId).toBe("coordinator");
|
||||
expect(job.delivery?.to).toBe("-100999");
|
||||
|
||||
// Clearing accountId with empty string
|
||||
applyJobPatch(job, { delivery: { mode: "announce", accountId: "" } });
|
||||
expect(job.delivery?.accountId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("rejects webhook delivery without a valid http(s) target URL", () => {
|
||||
const expectedError = "cron webhook delivery requires delivery.to to be a valid http(s) URL";
|
||||
const cases = [
|
||||
|
||||
@@ -609,6 +609,7 @@ function mergeCronDelivery(
|
||||
mode: existing?.mode ?? "none",
|
||||
channel: existing?.channel,
|
||||
to: existing?.to,
|
||||
accountId: existing?.accountId,
|
||||
bestEffort: existing?.bestEffort,
|
||||
};
|
||||
|
||||
@@ -623,6 +624,10 @@ function mergeCronDelivery(
|
||||
const to = typeof patch.to === "string" ? patch.to.trim() : "";
|
||||
next.to = to ? to : undefined;
|
||||
}
|
||||
if ("accountId" in patch) {
|
||||
const accountId = typeof patch.accountId === "string" ? patch.accountId.trim() : "";
|
||||
next.accountId = accountId ? accountId : undefined;
|
||||
}
|
||||
if (typeof patch.bestEffort === "boolean") {
|
||||
next.bestEffort = patch.bestEffort;
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ export type CronDelivery = {
|
||||
mode: CronDeliveryMode;
|
||||
channel?: CronMessageChannel;
|
||||
to?: string;
|
||||
/** Explicit channel account id for multi-account setups (e.g. multiple Telegram bots). */
|
||||
accountId?: string;
|
||||
bestEffort?: boolean;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user