security(line): synthesize strict LINE auth boundary hardening

LINE auth boundary hardening synthesis for inbound webhook authn/z/authz:
- account-scoped pairing-store access
- strict DM/group allowlist boundary separation
- fail-closed webhook auth/runtime behavior
- replay and duplicate handling with in-flight continuity for concurrent redeliveries

Source PRs: #26701, #26683, #25978, #17593, #16619, #31990, #26047, #30584, #18777
Related continuity context: #21955

Co-authored-by: bmendonca3 <208517100+bmendonca3@users.noreply.github.com>
Co-authored-by: davidahmann <46606159+davidahmann@users.noreply.github.com>
Co-authored-by: harshang03 <58983401+harshang03@users.noreply.github.com>
Co-authored-by: haosenwang1018 <167664334+haosenwang1018@users.noreply.github.com>
Co-authored-by: liuxiaopai-ai <73659136+liuxiaopai-ai@users.noreply.github.com>
Co-authored-by: coygeek <65363919+coygeek@users.noreply.github.com>
Co-authored-by: lailoo <20536249+lailoo@users.noreply.github.com>
This commit is contained in:
Tak Hoffman
2026-03-03 00:21:15 -06:00
committed by GitHub
parent fe92113472
commit dbccc73d7a
10 changed files with 619 additions and 113 deletions

View File

@@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- LINE/auth boundary hardening synthesis: enforce strict LINE webhook authn/z boundary semantics across pairing-store account scoping, DM/group allowlist separation, fail-closed webhook auth/runtime behavior, and replay/duplication controls (including in-flight replay reservation and post-success dedupe marking). (from #26701, #26683, #25978, #17593, #16619, #31990, #26047, #30584, #18777) Thanks @bmendonca3, @davidahmann, @harshang03, @haosenwang1018, @liuxiaopai-ai, @coygeek, and @Takhoffman.
- LINE/media download synthesis: fix file-media download handling and M4A audio classification across overlapping LINE regressions. (from #26386, #27761, #27787, #29509, #29755, #29776, #29785, #32240) Thanks @kevinWangSheng, @loiie45e, @carrotRakko, @Sid-Qin, @codeafridi, and @bmendonca3.
- LINE/context and routing synthesis: fix group/room peer routing and command-authorization context propagation, and keep processing later events in mixed-success webhook batches. (from #21955, #24475, #27035, #28286) Thanks @lailoo, @mcaxtr, @jervyclaw, @Glucksberg, and @Takhoffman.
- LINE/status/config/webhook synthesis: fix status false positives from snapshot/config state and accept LINE webhook HEAD probes for compatibility. (from #10487, #25726, #27537, #27908, #31387) Thanks @BlueBirdBack, @stakeswky, @loiie45e, @puritysb, and @mcaxtr.

View File

@@ -1,4 +1,4 @@
import type { MessageEvent } from "@line/bot-sdk";
import type { MessageEvent, PostbackEvent } from "@line/bot-sdk";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
// Avoid pulling in globals/pairing/media dependencies; this suite only asserts
@@ -16,15 +16,10 @@ vi.mock("../pairing/pairing-messages.js", () => ({
buildPairingReply: () => "pairing-reply",
}));
const { downloadLineMediaMock } = vi.hoisted(() => ({
downloadLineMediaMock: vi.fn(async () => ({
path: "/tmp/line-media-file.pdf",
contentType: "application/pdf",
})),
}));
vi.mock("./download.js", () => ({
downloadLineMedia: downloadLineMediaMock,
downloadLineMedia: async () => {
throw new Error("downloadLineMedia should not be called from bot-handlers tests");
},
}));
vi.mock("./send.js", () => ({
@@ -44,7 +39,7 @@ const { buildLineMessageContextMock, buildLinePostbackContextMock } = vi.hoisted
isGroup: true,
accountId: "default",
})),
buildLinePostbackContextMock: vi.fn(async () => null),
buildLinePostbackContextMock: vi.fn(async () => null as unknown),
}));
vi.mock("./bot-message-context.js", () => ({
@@ -69,6 +64,7 @@ const { readAllowFromStoreMock, upsertPairingRequestMock } = vi.hoisted(() => ({
}));
let handleLineWebhookEvents: typeof import("./bot-handlers.js").handleLineWebhookEvents;
let createLineWebhookReplayCache: typeof import("./bot-handlers.js").createLineWebhookReplayCache;
const createRuntime = () => ({ log: vi.fn(), error: vi.fn(), exit: vi.fn() });
@@ -79,13 +75,12 @@ vi.mock("../pairing/pairing-store.js", () => ({
describe("handleLineWebhookEvents", () => {
beforeAll(async () => {
({ handleLineWebhookEvents } = await import("./bot-handlers.js"));
({ handleLineWebhookEvents, createLineWebhookReplayCache } = await import("./bot-handlers.js"));
});
beforeEach(() => {
buildLineMessageContextMock.mockClear();
buildLinePostbackContextMock.mockClear();
downloadLineMediaMock.mockClear();
readAllowFromStoreMock.mockClear();
upsertPairingRequestMock.mockClear();
});
@@ -188,23 +183,23 @@ describe("handleLineWebhookEvents", () => {
expect(processMessage).toHaveBeenCalledTimes(1);
});
it("blocks group sender that is only present in pairing-store allowlist", async () => {
it("blocks group sender not in groupAllowFrom even when sender is paired in DM store", async () => {
readAllowFromStoreMock.mockResolvedValueOnce(["user-store"]);
const processMessage = vi.fn();
readAllowFromStoreMock.mockResolvedValueOnce(["user-paired"]);
const event = {
type: "message",
message: { id: "m3b", type: "text", text: "hi" },
message: { id: "m5", type: "text", text: "hi" },
replyToken: "reply-token",
timestamp: Date.now(),
source: { type: "group", groupId: "group-1", userId: "user-paired" },
source: { type: "group", groupId: "group-1", userId: "user-store" },
mode: "active",
webhookEventId: "evt-3b",
webhookEventId: "evt-5",
deliveryContext: { isRedelivery: false },
} as MessageEvent;
await handleLineWebhookEvents([event], {
cfg: {
channels: { line: { groupPolicy: "allowlist", groupAllowFrom: ["user-owner"] } },
channels: { line: { groupPolicy: "allowlist", groupAllowFrom: ["user-group"] } },
},
account: {
accountId: "default",
@@ -212,15 +207,54 @@ describe("handleLineWebhookEvents", () => {
channelAccessToken: "token",
channelSecret: "secret",
tokenSource: "config",
config: { groupPolicy: "allowlist", groupAllowFrom: ["user-owner"] },
config: { groupPolicy: "allowlist", groupAllowFrom: ["user-group"] },
},
runtime: createRuntime(),
mediaMaxBytes: 1,
processMessage,
});
expect(buildLineMessageContextMock).not.toHaveBeenCalled();
expect(processMessage).not.toHaveBeenCalled();
expect(buildLineMessageContextMock).not.toHaveBeenCalled();
expect(readAllowFromStoreMock).toHaveBeenCalledWith("line", undefined, "default");
});
it("does not authorize group messages from DM pairing-store entries when group allowlist is empty", async () => {
readAllowFromStoreMock.mockResolvedValueOnce(["user-5"]);
const processMessage = vi.fn();
const event = {
type: "message",
message: { id: "m5b", type: "text", text: "hi" },
replyToken: "reply-token",
timestamp: Date.now(),
source: { type: "group", groupId: "group-1", userId: "user-5" },
mode: "active",
webhookEventId: "evt-5b",
deliveryContext: { isRedelivery: false },
} as MessageEvent;
await handleLineWebhookEvents([event], {
cfg: { channels: { line: { groupPolicy: "allowlist" } } },
account: {
accountId: "default",
enabled: true,
channelAccessToken: "token",
channelSecret: "secret",
tokenSource: "config",
config: {
dmPolicy: "pairing",
allowFrom: [],
groupPolicy: "allowlist",
groupAllowFrom: [],
},
},
runtime: createRuntime(),
mediaMaxBytes: 1,
processMessage,
});
expect(processMessage).not.toHaveBeenCalled();
expect(buildLineMessageContextMock).not.toHaveBeenCalled();
});
it("blocks group messages when wildcard group config disables groups", async () => {
@@ -255,21 +289,286 @@ describe("handleLineWebhookEvents", () => {
expect(buildLineMessageContextMock).not.toHaveBeenCalled();
});
it("downloads file attachments and forwards media refs to message context", async () => {
it("scopes DM pairing requests to accountId", async () => {
const processMessage = vi.fn();
const event = {
type: "message",
message: { id: "mf-1", type: "file", fileName: "doc.pdf", fileSize: "42" },
message: { id: "m5", type: "text", text: "hi" },
replyToken: "reply-token",
timestamp: Date.now(),
source: { type: "user", userId: "user-file" },
source: { type: "user", userId: "user-5" },
mode: "active",
webhookEventId: "evt-file-1",
webhookEventId: "evt-5",
deliveryContext: { isRedelivery: false },
} as MessageEvent;
await handleLineWebhookEvents([event], {
cfg: { channels: { line: {} } },
cfg: { channels: { line: { dmPolicy: "pairing" } } },
account: {
accountId: "default",
enabled: true,
channelAccessToken: "token",
channelSecret: "secret",
tokenSource: "config",
config: { dmPolicy: "pairing", allowFrom: ["user-owner"] },
},
runtime: createRuntime(),
mediaMaxBytes: 1,
processMessage,
});
expect(processMessage).not.toHaveBeenCalled();
expect(upsertPairingRequestMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "line",
id: "user-5",
accountId: "default",
}),
);
});
it("does not authorize DM senders from another account's pairing-store entries", async () => {
const processMessage = vi.fn();
readAllowFromStoreMock.mockImplementation(async (...args: unknown[]) => {
const accountId = args[2] as string | undefined;
if (accountId === "work") {
return [];
}
return ["cross-account-user"];
});
upsertPairingRequestMock.mockResolvedValue({ code: "CODE", created: false });
const event = {
type: "message",
message: { id: "m6", type: "text", text: "hi" },
replyToken: "reply-token",
timestamp: Date.now(),
source: { type: "user", userId: "cross-account-user" },
mode: "active",
webhookEventId: "evt-6",
deliveryContext: { isRedelivery: false },
} as MessageEvent;
await handleLineWebhookEvents([event], {
cfg: { channels: { line: { dmPolicy: "pairing" } } },
account: {
accountId: "work",
enabled: true,
channelAccessToken: "token-work",
channelSecret: "secret-work",
tokenSource: "config",
config: { dmPolicy: "pairing" },
},
runtime: createRuntime(),
mediaMaxBytes: 1,
processMessage,
});
expect(readAllowFromStoreMock).toHaveBeenCalledWith("line", undefined, "work");
expect(processMessage).not.toHaveBeenCalled();
expect(upsertPairingRequestMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "line",
id: "cross-account-user",
accountId: "work",
}),
);
});
it("deduplicates replayed webhook events by webhookEventId before processing", async () => {
const processMessage = vi.fn();
const event = {
type: "message",
message: { id: "m-replay", type: "text", text: "hello" },
replyToken: "reply-token",
timestamp: Date.now(),
source: { type: "group", groupId: "group-replay", userId: "user-replay" },
mode: "active",
webhookEventId: "evt-replay-1",
deliveryContext: { isRedelivery: true },
} as MessageEvent;
const context: Parameters<typeof handleLineWebhookEvents>[1] = {
cfg: { channels: { line: { groupPolicy: "open" } } },
account: {
accountId: "default",
enabled: true,
channelAccessToken: "token",
channelSecret: "secret",
tokenSource: "config",
config: { groupPolicy: "open" },
},
runtime: createRuntime(),
mediaMaxBytes: 1,
processMessage,
replayCache: createLineWebhookReplayCache(),
};
await handleLineWebhookEvents([event], context);
await handleLineWebhookEvents([event], context);
expect(buildLineMessageContextMock).toHaveBeenCalledTimes(1);
expect(processMessage).toHaveBeenCalledTimes(1);
});
it("skips concurrent redeliveries while the first event is still processing", async () => {
let resolveFirst: (() => void) | undefined;
const firstDone = new Promise<void>((resolve) => {
resolveFirst = resolve;
});
const processMessage = vi.fn(async () => {
await firstDone;
});
const event = {
type: "message",
message: { id: "m-inflight", type: "text", text: "hello" },
replyToken: "reply-token",
timestamp: Date.now(),
source: { type: "group", groupId: "group-inflight", userId: "user-inflight" },
mode: "active",
webhookEventId: "evt-inflight-1",
deliveryContext: { isRedelivery: true },
} as MessageEvent;
const context: Parameters<typeof handleLineWebhookEvents>[1] = {
cfg: { channels: { line: { groupPolicy: "open" } } },
account: {
accountId: "default",
enabled: true,
channelAccessToken: "token",
channelSecret: "secret",
tokenSource: "config",
config: { groupPolicy: "open" },
},
runtime: createRuntime(),
mediaMaxBytes: 1,
processMessage,
replayCache: createLineWebhookReplayCache(),
};
const firstRun = handleLineWebhookEvents([event], context);
await Promise.resolve();
const secondRun = handleLineWebhookEvents([event], context);
resolveFirst?.();
await Promise.all([firstRun, secondRun]);
expect(buildLineMessageContextMock).toHaveBeenCalledTimes(1);
expect(processMessage).toHaveBeenCalledTimes(1);
});
it("mirrors in-flight replay failures so concurrent duplicates also fail", async () => {
let rejectFirst: ((err: Error) => void) | undefined;
const firstDone = new Promise<void>((_, reject) => {
rejectFirst = reject;
});
const processMessage = vi.fn(async () => {
await firstDone;
});
const event = {
type: "message",
message: { id: "m-inflight-fail", type: "text", text: "hello" },
replyToken: "reply-token",
timestamp: Date.now(),
source: { type: "group", groupId: "group-inflight", userId: "user-inflight" },
mode: "active",
webhookEventId: "evt-inflight-fail-1",
deliveryContext: { isRedelivery: true },
} as MessageEvent;
const context: Parameters<typeof handleLineWebhookEvents>[1] = {
cfg: { channels: { line: { groupPolicy: "open" } } },
account: {
accountId: "default",
enabled: true,
channelAccessToken: "token",
channelSecret: "secret",
tokenSource: "config",
config: { groupPolicy: "open" },
},
runtime: createRuntime(),
mediaMaxBytes: 1,
processMessage,
replayCache: createLineWebhookReplayCache(),
};
const firstRun = handleLineWebhookEvents([event], context);
await Promise.resolve();
const secondRun = handleLineWebhookEvents([event], context);
rejectFirst?.(new Error("transient inflight failure"));
await expect(firstRun).rejects.toThrow("transient inflight failure");
await expect(secondRun).rejects.toThrow("transient inflight failure");
expect(processMessage).toHaveBeenCalledTimes(1);
});
it("deduplicates redeliveries by LINE message id when webhookEventId changes", async () => {
const processMessage = vi.fn();
const event = {
type: "message",
message: { id: "m-dup-1", type: "text", text: "hello" },
replyToken: "reply-token",
timestamp: Date.now(),
source: { type: "group", groupId: "group-dup", userId: "user-dup" },
mode: "active",
webhookEventId: "evt-dup-1",
deliveryContext: { isRedelivery: false },
} as MessageEvent;
const context: Parameters<typeof handleLineWebhookEvents>[1] = {
cfg: {
channels: { line: { groupPolicy: "allowlist", groupAllowFrom: ["user-dup"] } },
},
account: {
accountId: "default",
enabled: true,
channelAccessToken: "token",
channelSecret: "secret",
tokenSource: "config",
config: { groupPolicy: "allowlist", groupAllowFrom: ["user-dup"] },
},
runtime: createRuntime(),
mediaMaxBytes: 1,
processMessage,
replayCache: createLineWebhookReplayCache(),
};
await handleLineWebhookEvents([event], context);
await handleLineWebhookEvents(
[
{
...event,
webhookEventId: "evt-dup-redelivery",
deliveryContext: { isRedelivery: true },
} as MessageEvent,
],
context,
);
expect(buildLineMessageContextMock).toHaveBeenCalledTimes(1);
expect(processMessage).toHaveBeenCalledTimes(1);
});
it("deduplicates postback redeliveries by webhookEventId when replyToken changes", async () => {
const processMessage = vi.fn();
buildLinePostbackContextMock.mockResolvedValue({
ctxPayload: { From: "line:user:user-postback" },
route: { agentId: "default" },
isGroup: false,
accountId: "default",
});
const event = {
type: "postback",
postback: { data: "action=confirm" },
replyToken: "reply-token-1",
timestamp: Date.now(),
source: { type: "user", userId: "user-postback" },
mode: "active",
webhookEventId: "evt-postback-1",
deliveryContext: { isRedelivery: false },
} as PostbackEvent;
const context: Parameters<typeof handleLineWebhookEvents>[1] = {
cfg: { channels: { line: { dmPolicy: "open" } } },
account: {
accountId: "default",
enabled: true,
@@ -279,69 +578,66 @@ describe("handleLineWebhookEvents", () => {
config: { dmPolicy: "open" },
},
runtime: createRuntime(),
mediaMaxBytes: 1234,
mediaMaxBytes: 1,
processMessage,
});
replayCache: createLineWebhookReplayCache(),
};
expect(downloadLineMediaMock).toHaveBeenCalledTimes(1);
expect(downloadLineMediaMock).toHaveBeenCalledWith("mf-1", "token", 1234);
expect(buildLineMessageContextMock).toHaveBeenCalledTimes(1);
expect(buildLineMessageContextMock).toHaveBeenCalledWith(
expect.objectContaining({
commandAuthorized: false,
allMedia: [
{
path: "/tmp/line-media-file.pdf",
contentType: "application/pdf",
},
],
}),
await handleLineWebhookEvents([event], context);
await handleLineWebhookEvents(
[
{
...event,
replyToken: "reply-token-2",
deliveryContext: { isRedelivery: true },
} as PostbackEvent,
],
context,
);
expect(buildLinePostbackContextMock).toHaveBeenCalledTimes(1);
expect(processMessage).toHaveBeenCalledTimes(1);
});
it("continues processing later events when one event handler fails", async () => {
const failingEvent = {
it("does not mark replay cache when event processing fails", async () => {
const processMessage = vi
.fn()
.mockRejectedValueOnce(new Error("transient failure"))
.mockResolvedValueOnce(undefined);
const event = {
type: "message",
message: { id: "m-err", type: "text", text: "hi" },
message: { id: "m-fail-then-retry", type: "text", text: "hello" },
replyToken: "reply-token",
timestamp: Date.now(),
source: { type: "user", userId: "user-err" },
source: { type: "group", groupId: "group-retry", userId: "user-retry" },
mode: "active",
webhookEventId: "evt-err",
webhookEventId: "evt-fail-then-retry",
deliveryContext: { isRedelivery: false },
} as MessageEvent;
const laterEvent = {
...failingEvent,
message: { id: "m-later", type: "text", text: "hello" },
webhookEventId: "evt-later",
} as MessageEvent;
const runtime = createRuntime();
let invocation = 0;
const processMessage = vi.fn(async () => {
if (invocation === 0) {
invocation += 1;
throw new Error("boom");
}
invocation += 1;
});
await handleLineWebhookEvents([failingEvent, laterEvent], {
cfg: { channels: { line: {} } },
const context: Parameters<typeof handleLineWebhookEvents>[1] = {
cfg: { channels: { line: { groupPolicy: "open" } } },
account: {
accountId: "default",
enabled: true,
channelAccessToken: "token",
channelSecret: "secret",
tokenSource: "config",
config: { dmPolicy: "open" },
config: { groupPolicy: "open" },
},
runtime,
mediaMaxBytes: 1234,
runtime: createRuntime(),
mediaMaxBytes: 1,
processMessage,
});
replayCache: createLineWebhookReplayCache(),
};
await expect(handleLineWebhookEvents([event], context)).rejects.toThrow("transient failure");
await handleLineWebhookEvents([event], context);
expect(buildLineMessageContextMock).toHaveBeenCalledTimes(2);
expect(processMessage).toHaveBeenCalledTimes(2);
expect(runtime.error).toHaveBeenCalledTimes(1);
expect(context.runtime.error).toHaveBeenCalledWith(
expect.stringContaining("line: event handler failed: Error: transient failure"),
);
});
});

View File

@@ -63,6 +63,148 @@ export interface LineHandlerContext {
runtime: RuntimeEnv;
mediaMaxBytes: number;
processMessage: (ctx: LineInboundContext) => Promise<void>;
replayCache?: LineWebhookReplayCache;
}
const LINE_WEBHOOK_REPLAY_WINDOW_MS = 10 * 60 * 1000;
const LINE_WEBHOOK_REPLAY_MAX_ENTRIES = 4096;
const LINE_WEBHOOK_REPLAY_PRUNE_INTERVAL_MS = 1000;
export type LineWebhookReplayCache = {
seenEvents: Map<string, number>;
inFlightEvents: Map<string, Promise<void>>;
lastPruneAtMs: number;
};
export function createLineWebhookReplayCache(): LineWebhookReplayCache {
return {
seenEvents: new Map<string, number>(),
inFlightEvents: new Map<string, Promise<void>>(),
lastPruneAtMs: 0,
};
}
function pruneLineWebhookReplayCache(cache: LineWebhookReplayCache, nowMs: number): void {
const minSeenAt = nowMs - LINE_WEBHOOK_REPLAY_WINDOW_MS;
for (const [key, seenAt] of cache.seenEvents) {
if (seenAt < minSeenAt) {
cache.seenEvents.delete(key);
}
}
if (cache.seenEvents.size > LINE_WEBHOOK_REPLAY_MAX_ENTRIES) {
const deleteCount = cache.seenEvents.size - LINE_WEBHOOK_REPLAY_MAX_ENTRIES;
let deleted = 0;
for (const key of cache.seenEvents.keys()) {
if (deleted >= deleteCount) {
break;
}
cache.seenEvents.delete(key);
deleted += 1;
}
}
}
function buildLineWebhookReplayKey(
event: WebhookEvent,
accountId: string,
): { key: string; eventId: string } | null {
if (event.type === "message") {
const messageId = event.message?.id?.trim();
if (messageId) {
return {
key: `${accountId}|message:${messageId}`,
eventId: `message:${messageId}`,
};
}
}
const eventId = (event as { webhookEventId?: string }).webhookEventId?.trim();
if (!eventId) {
return null;
}
const source = (
event as {
source?: { type?: string; userId?: string; groupId?: string; roomId?: string };
}
).source;
const sourceId =
source?.type === "group"
? `group:${source.groupId ?? ""}`
: source?.type === "room"
? `room:${source.roomId ?? ""}`
: `user:${source?.userId ?? ""}`;
return { key: `${accountId}|${event.type}|${sourceId}|${eventId}`, eventId: `event:${eventId}` };
}
type LineReplayCandidate = {
key: string;
eventId: string;
seenAtMs: number;
cache: LineWebhookReplayCache;
};
type LineInFlightReplayResult = {
promise: Promise<void>;
resolve: () => void;
reject: (err: unknown) => void;
};
function getLineReplayCandidate(
event: WebhookEvent,
context: LineHandlerContext,
): LineReplayCandidate | null {
const replay = buildLineWebhookReplayKey(event, context.account.accountId);
const cache = context.replayCache;
if (!replay || !cache) {
return null;
}
const nowMs = Date.now();
if (
nowMs - cache.lastPruneAtMs >= LINE_WEBHOOK_REPLAY_PRUNE_INTERVAL_MS ||
cache.seenEvents.size >= LINE_WEBHOOK_REPLAY_MAX_ENTRIES
) {
pruneLineWebhookReplayCache(cache, nowMs);
cache.lastPruneAtMs = nowMs;
}
return { key: replay.key, eventId: replay.eventId, seenAtMs: nowMs, cache };
}
function shouldSkipLineReplayEvent(
candidate: LineReplayCandidate,
): { skip: true; inFlightResult?: Promise<void> } | { skip: false } {
const inFlightResult = candidate.cache.inFlightEvents.get(candidate.key);
if (inFlightResult) {
logVerbose(`line: skipped in-flight replayed webhook event ${candidate.eventId}`);
return { skip: true, inFlightResult };
}
if (candidate.cache.seenEvents.has(candidate.key)) {
logVerbose(`line: skipped replayed webhook event ${candidate.eventId}`);
return { skip: true };
}
return { skip: false };
}
function markLineReplayEventInFlight(candidate: LineReplayCandidate): LineInFlightReplayResult {
let resolve!: () => void;
let reject!: (err: unknown) => void;
const promise = new Promise<void>((resolvePromise, rejectPromise) => {
resolve = resolvePromise;
reject = rejectPromise;
});
// Prevent unhandled rejection warnings when no concurrent duplicate awaits
// this in-flight reservation.
void promise.catch(() => {});
candidate.cache.inFlightEvents.set(candidate.key, promise);
return { promise, resolve, reject };
}
function clearLineReplayEventInFlight(candidate: LineReplayCandidate): void {
candidate.cache.inFlightEvents.delete(candidate.key);
}
function rememberLineReplayEvent(candidate: LineReplayCandidate): void {
candidate.cache.seenEvents.set(candidate.key, candidate.seenAtMs);
}
function resolveLineGroupConfig(params: {
@@ -128,15 +270,11 @@ async function sendLinePairingReply(params: {
}
}
type LineAccessDecision = {
allowed: boolean;
commandAuthorized: boolean;
};
async function shouldProcessLineEvent(
event: MessageEvent | PostbackEvent,
context: LineHandlerContext,
): Promise<LineAccessDecision> {
): Promise<{ allowed: boolean; commandAuthorized: boolean }> {
const denied = { allowed: false, commandAuthorized: false };
const { cfg, account } = context;
const { userId, groupId, roomId, isGroup } = getLineSourceInfo(event.source);
const senderId = userId ?? "";
@@ -144,7 +282,7 @@ async function shouldProcessLineEvent(
const storeAllowFrom = await readChannelAllowFromStore(
"line",
process.env,
undefined,
account.accountId,
).catch(() => []);
const effectiveDmAllow = normalizeDmAllowFromWithStore({
@@ -162,8 +300,8 @@ async function shouldProcessLineEvent(
account.config.groupAllowFrom,
fallbackGroupAllowFrom,
);
// Group authorization stays explicit to group allowlists and must not
// inherit DM pairing-store identities.
// Group sender policy must be derived from explicit group config only.
// Pairing store entries are DM-oriented and must not expand group allowlists.
const effectiveGroupAllow = normalizeAllowFrom(groupAllowFrom);
const defaultGroupPolicy = resolveDefaultGroupPolicy(cfg);
const { groupPolicy, providerMissingFallbackApplied } =
@@ -179,8 +317,6 @@ async function shouldProcessLineEvent(
log: (message) => logVerbose(message),
});
const denied = { allowed: false, commandAuthorized: false };
if (isGroup) {
if (groupConfig?.enabled === false) {
logVerbose(`Blocked line group ${groupId ?? roomId ?? "unknown"} (group disabled)`);
@@ -214,8 +350,6 @@ async function shouldProcessLineEvent(
return denied;
}
}
// Resolve command authorization using the same pattern as Telegram/Discord/Slack.
const allowForCommands = effectiveGroupAllow;
const senderAllowedForCommands = isSenderAllowed({ allow: allowForCommands, senderId });
const useAccessGroups = cfg.commands?.useAccessGroups !== false;
@@ -252,7 +386,6 @@ async function shouldProcessLineEvent(
return denied;
}
// Resolve command authorization for DMs.
const allowForCommands = effectiveDmAllow;
const senderAllowedForCommands = isSenderAllowed({ allow: allowForCommands, senderId });
const useAccessGroups = cfg.commands?.useAccessGroups !== false;
@@ -266,7 +399,6 @@ async function shouldProcessLineEvent(
return { allowed: true, commandAuthorized: commandGate.commandAuthorized };
}
/** Extract raw text from a LINE message or postback event for command detection. */
function resolveEventRawText(event: MessageEvent | PostbackEvent): string {
if (event.type === "message") {
const msg = event.message;
@@ -382,7 +514,24 @@ export async function handleLineWebhookEvents(
events: WebhookEvent[],
context: LineHandlerContext,
): Promise<void> {
let firstError: unknown;
for (const event of events) {
const replayCandidate = getLineReplayCandidate(event, context);
const replaySkip = replayCandidate ? shouldSkipLineReplayEvent(replayCandidate) : null;
if (replaySkip?.skip) {
if (replaySkip.inFlightResult) {
try {
await replaySkip.inFlightResult;
} catch (err) {
context.runtime.error?.(danger(`line: replayed in-flight event failed: ${String(err)}`));
firstError ??= err;
}
}
continue;
}
const inFlightReservation = replayCandidate
? markLineReplayEventInFlight(replayCandidate)
: null;
try {
switch (event.type) {
case "message":
@@ -406,11 +555,21 @@ export async function handleLineWebhookEvents(
default:
logVerbose(`line: unhandled event type: ${(event as WebhookEvent).type}`);
}
if (replayCandidate) {
rememberLineReplayEvent(replayCandidate);
inFlightReservation?.resolve();
clearLineReplayEventInFlight(replayCandidate);
}
} catch (err) {
if (replayCandidate) {
inFlightReservation?.reject(err);
clearLineReplayEventInFlight(replayCandidate);
}
context.runtime.error?.(danger(`line: event handler failed: ${String(err)}`));
// Continue processing remaining events in this batch. Webhook ACK is sent
// before processing, so dropping later events here would make them unrecoverable.
continue;
firstError ??= err;
}
}
if (firstError) {
throw firstError;
}
}

View File

@@ -114,6 +114,26 @@ describe("buildLineMessageContext", () => {
expect(context?.ctxPayload.To).toBe("line:room:room-1");
});
it("keeps non-text message contexts fail-closed for command auth", async () => {
const event = createMessageEvent(
{ type: "user", userId: "user-audio" },
{
message: { id: "audio-1", type: "audio", duration: 1000 } as MessageEvent["message"],
},
);
const context = await buildLineMessageContext({
event,
allMedia: [],
cfg,
account,
commandAuthorized: false,
});
expect(context).not.toBeNull();
expect(context?.ctxPayload.CommandAuthorized).toBe(false);
});
it("sets CommandAuthorized=true when authorized", async () => {
const event = createMessageEvent({ type: "user", userId: "user-auth" });

View File

@@ -5,7 +5,7 @@ import { loadConfig } from "../config/config.js";
import { logVerbose } from "../globals.js";
import { createNonExitingRuntime, type RuntimeEnv } from "../runtime.js";
import { resolveLineAccount } from "./accounts.js";
import { handleLineWebhookEvents } from "./bot-handlers.js";
import { createLineWebhookReplayCache, handleLineWebhookEvents } from "./bot-handlers.js";
import type { LineInboundContext } from "./bot-message-context.js";
import type { ResolvedLineAccount } from "./types.js";
import { startLineWebhook } from "./webhook.js";
@@ -41,6 +41,7 @@ export function createLineBot(opts: LineBotOptions): LineBot {
(async () => {
logVerbose("line: no message handler configured");
});
const replayCache = createLineWebhookReplayCache();
const handleWebhook = async (body: WebhookRequestBody): Promise<void> => {
if (!body.events || body.events.length === 0) {
@@ -53,6 +54,7 @@ export function createLineBot(opts: LineBotOptions): LineBot {
runtime,
mediaMaxBytes,
processMessage,
replayCache,
});
};

View File

@@ -195,7 +195,7 @@ describe("createLineNodeWebhookHandler", () => {
);
});
it("returns 200 immediately and logs when event processing fails", async () => {
it("returns 500 when event processing fails and does not acknowledge with 200", async () => {
const rawBody = JSON.stringify({ events: [{ type: "message" }] });
const { secret } = createPostWebhookTestHarness(rawBody);
const failingBot = {
@@ -213,10 +213,9 @@ describe("createLineNodeWebhookHandler", () => {
const { res } = createRes();
await runSignedPost({ handler: failingHandler, rawBody, secret, res });
await Promise.resolve();
expect(res.statusCode).toBe(200);
expect(res.body).toBe(JSON.stringify({ status: "ok" }));
expect(res.statusCode).toBe(500);
expect(res.body).toBe(JSON.stringify({ error: "Internal server error" }));
expect(failingBot.handleWebhook).toHaveBeenCalledTimes(1);
expect(runtime.error).toHaveBeenCalledTimes(1);
});

View File

@@ -111,16 +111,14 @@ export function createLineNodeWebhookHandler(params: {
return;
}
if (body.events && body.events.length > 0) {
logVerbose(`line: received ${body.events.length} webhook events`);
await params.bot.handleWebhook(body);
}
res.statusCode = 200;
res.setHeader("Content-Type", "application/json");
res.end(JSON.stringify({ status: "ok" }));
if (body.events && body.events.length > 0) {
logVerbose(`line: received ${body.events.length} webhook events`);
void params.bot.handleWebhook(body).catch((err) => {
params.runtime.error?.(danger(`line webhook handler failed: ${String(err)}`));
});
}
} catch (err) {
if (isRequestBodyLimitError(err, "PAYLOAD_TOO_LARGE")) {
res.statusCode = 413;

View File

@@ -1,7 +1,7 @@
import crypto from "node:crypto";
import type { WebhookRequestBody } from "@line/bot-sdk";
import { describe, expect, it, vi } from "vitest";
import { createLineWebhookMiddleware } from "./webhook.js";
import { createLineWebhookMiddleware, startLineWebhook } from "./webhook.js";
const sign = (body: string, secret: string) =>
crypto.createHmac("SHA256", secret).update(body).digest("base64");
@@ -54,6 +54,15 @@ async function invokeWebhook(params: {
}
describe("createLineWebhookMiddleware", () => {
it("rejects startup when channel secret is missing", () => {
expect(() =>
startLineWebhook({
channelSecret: " ",
onEvents: async () => {},
}),
).toThrow(/requires a non-empty channel secret/i);
});
it.each([
["raw string body", JSON.stringify({ events: [{ type: "message" }] })],
["raw buffer body", Buffer.from(JSON.stringify({ events: [{ type: "follow" }] }), "utf-8")],
@@ -112,17 +121,31 @@ describe("createLineWebhookMiddleware", () => {
expect(onEvents).not.toHaveBeenCalled();
});
it("returns 200 immediately when onEvents fails", async () => {
const { res, onEvents } = await invokeWebhook({
body: JSON.stringify({ events: [{ type: "message" }] }),
onEvents: vi.fn(async () => {
throw new Error("transient failure");
}),
it("returns 500 when event processing fails and does not acknowledge with 200", async () => {
const onEvents = vi.fn(async () => {
throw new Error("boom");
});
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() };
const rawBody = JSON.stringify({ events: [{ type: "message" }] });
const middleware = createLineWebhookMiddleware({
channelSecret: SECRET,
onEvents,
runtime,
});
await Promise.resolve();
expect(onEvents).toHaveBeenCalledTimes(1);
expect(res.status).toHaveBeenCalledWith(200);
expect(res.json).toHaveBeenCalledWith({ status: "ok" });
const req = {
headers: { "x-line-signature": sign(rawBody, SECRET) },
body: rawBody,
// oxlint-disable-next-line typescript/no-explicit-any
} as any;
const res = createRes();
// oxlint-disable-next-line typescript/no-explicit-any
await middleware(req, res, {} as any);
expect(res.status).toHaveBeenCalledWith(500);
expect(res.status).not.toHaveBeenCalledWith(200);
expect(res.json).toHaveBeenCalledWith({ error: "Internal server error" });
expect(runtime.error).toHaveBeenCalled();
});
});

View File

@@ -71,14 +71,12 @@ export function createLineWebhookMiddleware(
return;
}
res.status(200).json({ status: "ok" });
if (body.events && body.events.length > 0) {
logVerbose(`line: received ${body.events.length} webhook events`);
void onEvents(body).catch((err) => {
runtime?.error?.(danger(`line webhook handler failed: ${String(err)}`));
});
await onEvents(body);
}
res.status(200).json({ status: "ok" });
} catch (err) {
runtime?.error?.(danger(`line webhook error: ${String(err)}`));
if (!res.headersSent) {
@@ -99,9 +97,17 @@ export function startLineWebhook(options: StartLineWebhookOptions): {
path: string;
handler: (req: Request, res: Response, _next: NextFunction) => Promise<void>;
} {
const channelSecret =
typeof options.channelSecret === "string" ? options.channelSecret.trim() : "";
if (!channelSecret) {
throw new Error(
"LINE webhook mode requires a non-empty channel secret. " +
"Set channels.line.channelSecret in your config.",
);
}
const path = options.path ?? "/line/webhook";
const middleware = createLineWebhookMiddleware({
channelSecret: options.channelSecret,
channelSecret,
onEvents: options.onEvents,
runtime: options.runtime,
});

View File

@@ -307,6 +307,7 @@ describe("applyMediaUnderstanding", () => {
const ctx = await createAudioCtx({
body: "<media:audio> /capture status",
});
ctx.CommandAuthorized = false;
const result = await applyMediaUnderstanding({
ctx,
cfg: createGroqAudioConfig(),
@@ -320,6 +321,7 @@ describe("applyMediaUnderstanding", () => {
body: "[Audio]\nUser text:\n/capture status\nTranscript:\ntranscribed text",
commandBody: "/capture status",
});
expect(ctx.CommandAuthorized).toBe(false);
});
it("handles URL-only attachments for audio transcription", async () => {