mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 20:37:27 +00:00
feat: detect stale Slack sockets and auto-restart (#30153)
* feat: detect stale Slack sockets and auto-restart
Slack Socket Mode connections can silently stop delivering events while
still appearing connected (health checks pass, WebSocket stays open).
This "half-dead socket" problem causes messages to go unanswered.
This commit adds two layers of protection:
1. **Event liveness tracking**: Every inbound Slack event (messages,
reactions, member joins/leaves, channel events, pins) now calls
`setStatus({ lastEventAt, lastInboundAt })` to update the channel
account snapshot with the timestamp of the last received event.
2. **Health monitor stale socket detection**: The channel health monitor
now checks `lastEventAt` against a configurable threshold (default
30 minutes). If a channel has been running longer than the threshold
and hasn't received any events in that window, it is flagged as
unhealthy and automatically restarted — the same way disconnected
or crashed channels are already handled.
The restart reason is logged as "stale-socket" for observability, and
the existing cooldown/rate-limit logic (3 restarts/hour max) prevents
restart storms.
* Slack: gate liveness tracking to accepted events
---------
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -427,6 +427,8 @@ export const slackPlugin: ChannelPlugin<ResolvedSlackAccount> = {
|
||||
abortSignal: ctx.abortSignal,
|
||||
mediaMaxMb: account.config.mediaMaxMb,
|
||||
slashCommand: account.config.slashCommand,
|
||||
setStatus: ctx.setStatus as (next: Record<string, unknown>) => void,
|
||||
getStatus: ctx.getStatus as () => Record<string, unknown>,
|
||||
});
|
||||
},
|
||||
},
|
||||
|
||||
@@ -306,4 +306,110 @@ describe("channel-health-monitor", () => {
|
||||
expect(manager.stopChannel).not.toHaveBeenCalled();
|
||||
monitor.stop();
|
||||
});
|
||||
|
||||
describe("stale socket detection", () => {
|
||||
const STALE_THRESHOLD = 30 * 60_000;
|
||||
|
||||
it("restarts a channel with no events past the stale threshold", async () => {
|
||||
const now = Date.now();
|
||||
const manager = createSnapshotManager({
|
||||
slack: {
|
||||
default: {
|
||||
running: true,
|
||||
connected: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: now - STALE_THRESHOLD - 60_000,
|
||||
lastEventAt: now - STALE_THRESHOLD - 30_000,
|
||||
},
|
||||
},
|
||||
});
|
||||
const monitor = await startAndRunCheck(manager);
|
||||
expect(manager.stopChannel).toHaveBeenCalledWith("slack", "default");
|
||||
expect(manager.startChannel).toHaveBeenCalledWith("slack", "default");
|
||||
monitor.stop();
|
||||
});
|
||||
|
||||
it("skips channels with recent events", async () => {
|
||||
const now = Date.now();
|
||||
const manager = createSnapshotManager({
|
||||
slack: {
|
||||
default: {
|
||||
running: true,
|
||||
connected: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: now - STALE_THRESHOLD - 60_000,
|
||||
lastEventAt: now - 5_000,
|
||||
},
|
||||
},
|
||||
});
|
||||
const monitor = await startAndRunCheck(manager);
|
||||
expect(manager.stopChannel).not.toHaveBeenCalled();
|
||||
expect(manager.startChannel).not.toHaveBeenCalled();
|
||||
monitor.stop();
|
||||
});
|
||||
|
||||
it("skips channels still within the startup grace window for stale detection", async () => {
|
||||
const now = Date.now();
|
||||
const manager = createSnapshotManager({
|
||||
slack: {
|
||||
default: {
|
||||
running: true,
|
||||
connected: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: now - 5_000,
|
||||
lastEventAt: null,
|
||||
},
|
||||
},
|
||||
});
|
||||
const monitor = await startAndRunCheck(manager);
|
||||
expect(manager.stopChannel).not.toHaveBeenCalled();
|
||||
expect(manager.startChannel).not.toHaveBeenCalled();
|
||||
monitor.stop();
|
||||
});
|
||||
|
||||
it("restarts a channel that never received any event past the stale threshold", async () => {
|
||||
const now = Date.now();
|
||||
const manager = createSnapshotManager({
|
||||
slack: {
|
||||
default: {
|
||||
running: true,
|
||||
connected: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: now - STALE_THRESHOLD - 60_000,
|
||||
},
|
||||
},
|
||||
});
|
||||
const monitor = await startAndRunCheck(manager);
|
||||
expect(manager.stopChannel).toHaveBeenCalledWith("slack", "default");
|
||||
expect(manager.startChannel).toHaveBeenCalledWith("slack", "default");
|
||||
monitor.stop();
|
||||
});
|
||||
|
||||
it("respects custom staleEventThresholdMs", async () => {
|
||||
const customThreshold = 10 * 60_000;
|
||||
const now = Date.now();
|
||||
const manager = createSnapshotManager({
|
||||
slack: {
|
||||
default: {
|
||||
running: true,
|
||||
connected: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: now - customThreshold - 60_000,
|
||||
lastEventAt: now - customThreshold - 30_000,
|
||||
},
|
||||
},
|
||||
});
|
||||
const monitor = await startAndRunCheck(manager, {
|
||||
staleEventThresholdMs: customThreshold,
|
||||
});
|
||||
expect(manager.stopChannel).toHaveBeenCalledWith("slack", "default");
|
||||
expect(manager.startChannel).toHaveBeenCalledWith("slack", "default");
|
||||
monitor.stop();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -10,12 +10,21 @@ const DEFAULT_COOLDOWN_CYCLES = 2;
|
||||
const DEFAULT_MAX_RESTARTS_PER_HOUR = 3;
|
||||
const ONE_HOUR_MS = 60 * 60_000;
|
||||
|
||||
/**
|
||||
* How long a connected channel can go without receiving any event before
|
||||
* the health monitor treats it as a "stale socket" and triggers a restart.
|
||||
* This catches the half-dead WebSocket scenario where the connection appears
|
||||
* alive (health checks pass) but Slack silently stops delivering events.
|
||||
*/
|
||||
const DEFAULT_STALE_EVENT_THRESHOLD_MS = 30 * 60_000;
|
||||
|
||||
export type ChannelHealthMonitorDeps = {
|
||||
channelManager: ChannelManager;
|
||||
checkIntervalMs?: number;
|
||||
startupGraceMs?: number;
|
||||
cooldownCycles?: number;
|
||||
maxRestartsPerHour?: number;
|
||||
staleEventThresholdMs?: number;
|
||||
abortSignal?: AbortSignal;
|
||||
};
|
||||
|
||||
@@ -32,12 +41,17 @@ function isManagedAccount(snapshot: { enabled?: boolean; configured?: boolean })
|
||||
return snapshot.enabled !== false && snapshot.configured !== false;
|
||||
}
|
||||
|
||||
function isChannelHealthy(snapshot: {
|
||||
running?: boolean;
|
||||
connected?: boolean;
|
||||
enabled?: boolean;
|
||||
configured?: boolean;
|
||||
}): boolean {
|
||||
function isChannelHealthy(
|
||||
snapshot: {
|
||||
running?: boolean;
|
||||
connected?: boolean;
|
||||
enabled?: boolean;
|
||||
configured?: boolean;
|
||||
lastEventAt?: number | null;
|
||||
lastStartAt?: number | null;
|
||||
},
|
||||
opts: { now: number; staleEventThresholdMs: number },
|
||||
): boolean {
|
||||
if (!isManagedAccount(snapshot)) {
|
||||
return true;
|
||||
}
|
||||
@@ -47,6 +61,22 @@ function isChannelHealthy(snapshot: {
|
||||
if (snapshot.connected === false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Stale socket detection: if the channel has been running long enough
|
||||
// (past the stale threshold) and we have never received an event, or the
|
||||
// last event was received longer ago than the threshold, treat as unhealthy.
|
||||
if (snapshot.lastEventAt != null || snapshot.lastStartAt != null) {
|
||||
const upSince = snapshot.lastStartAt ?? 0;
|
||||
const upDuration = opts.now - upSince;
|
||||
if (upDuration > opts.staleEventThresholdMs) {
|
||||
const lastEvent = snapshot.lastEventAt ?? 0;
|
||||
const eventAge = opts.now - lastEvent;
|
||||
if (eventAge > opts.staleEventThresholdMs) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -57,6 +87,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
|
||||
startupGraceMs = DEFAULT_STARTUP_GRACE_MS,
|
||||
cooldownCycles = DEFAULT_COOLDOWN_CYCLES,
|
||||
maxRestartsPerHour = DEFAULT_MAX_RESTARTS_PER_HOUR,
|
||||
staleEventThresholdMs = DEFAULT_STALE_EVENT_THRESHOLD_MS,
|
||||
abortSignal,
|
||||
} = deps;
|
||||
|
||||
@@ -101,7 +132,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
|
||||
if (channelManager.isManuallyStopped(channelId as ChannelId, accountId)) {
|
||||
continue;
|
||||
}
|
||||
if (isChannelHealthy(status)) {
|
||||
if (isChannelHealthy(status, { now, staleEventThresholdMs })) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -123,11 +154,19 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann
|
||||
continue;
|
||||
}
|
||||
|
||||
const isStaleSocket =
|
||||
status.running &&
|
||||
status.connected !== false &&
|
||||
status.lastEventAt != null &&
|
||||
now - (status.lastEventAt ?? 0) > staleEventThresholdMs;
|
||||
|
||||
const reason = !status.running
|
||||
? status.reconnectAttempts && status.reconnectAttempts >= 10
|
||||
? "gave-up"
|
||||
: "stopped"
|
||||
: "stuck";
|
||||
: isStaleSocket
|
||||
? "stale-socket"
|
||||
: "stuck";
|
||||
|
||||
log.info?.(`[${channelId}:${accountId}] health-monitor: restarting (reason: ${reason})`);
|
||||
|
||||
|
||||
@@ -12,14 +12,16 @@ export function registerSlackMonitorEvents(params: {
|
||||
ctx: SlackMonitorContext;
|
||||
account: ResolvedSlackAccount;
|
||||
handleSlackMessage: SlackMessageHandler;
|
||||
/** Called on each inbound event to update liveness tracking. */
|
||||
trackEvent?: () => void;
|
||||
}) {
|
||||
registerSlackMessageEvents({
|
||||
ctx: params.ctx,
|
||||
handleSlackMessage: params.handleSlackMessage,
|
||||
});
|
||||
registerSlackReactionEvents({ ctx: params.ctx });
|
||||
registerSlackMemberEvents({ ctx: params.ctx });
|
||||
registerSlackChannelEvents({ ctx: params.ctx });
|
||||
registerSlackPinEvents({ ctx: params.ctx });
|
||||
registerSlackReactionEvents({ ctx: params.ctx, trackEvent: params.trackEvent });
|
||||
registerSlackMemberEvents({ ctx: params.ctx, trackEvent: params.trackEvent });
|
||||
registerSlackChannelEvents({ ctx: params.ctx, trackEvent: params.trackEvent });
|
||||
registerSlackPinEvents({ ctx: params.ctx, trackEvent: params.trackEvent });
|
||||
registerSlackInteractionEvents({ ctx: params.ctx });
|
||||
}
|
||||
|
||||
67
src/slack/monitor/events/channels.test.ts
Normal file
67
src/slack/monitor/events/channels.test.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { registerSlackChannelEvents } from "./channels.js";
|
||||
import { createSlackSystemEventTestHarness } from "./system-event-test-harness.js";
|
||||
|
||||
const enqueueSystemEventMock = vi.fn();
|
||||
|
||||
vi.mock("../../../infra/system-events.js", () => ({
|
||||
enqueueSystemEvent: (...args: unknown[]) => enqueueSystemEventMock(...args),
|
||||
}));
|
||||
|
||||
type SlackChannelHandler = (args: {
|
||||
event: Record<string, unknown>;
|
||||
body: unknown;
|
||||
}) => Promise<void>;
|
||||
|
||||
function createChannelContext(params?: {
|
||||
trackEvent?: () => void;
|
||||
shouldDropMismatchedSlackEvent?: (body: unknown) => boolean;
|
||||
}) {
|
||||
const harness = createSlackSystemEventTestHarness();
|
||||
if (params?.shouldDropMismatchedSlackEvent) {
|
||||
harness.ctx.shouldDropMismatchedSlackEvent = params.shouldDropMismatchedSlackEvent;
|
||||
}
|
||||
registerSlackChannelEvents({ ctx: harness.ctx, trackEvent: params?.trackEvent });
|
||||
return {
|
||||
getCreatedHandler: () => harness.getHandler("channel_created") as SlackChannelHandler | null,
|
||||
};
|
||||
}
|
||||
|
||||
describe("registerSlackChannelEvents", () => {
|
||||
it("does not track mismatched events", async () => {
|
||||
const trackEvent = vi.fn();
|
||||
const { getCreatedHandler } = createChannelContext({
|
||||
trackEvent,
|
||||
shouldDropMismatchedSlackEvent: () => true,
|
||||
});
|
||||
const createdHandler = getCreatedHandler();
|
||||
expect(createdHandler).toBeTruthy();
|
||||
|
||||
await createdHandler!({
|
||||
event: {
|
||||
channel: { id: "C1", name: "general" },
|
||||
},
|
||||
body: { api_app_id: "A_OTHER" },
|
||||
});
|
||||
|
||||
expect(trackEvent).not.toHaveBeenCalled();
|
||||
expect(enqueueSystemEventMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("tracks accepted events", async () => {
|
||||
const trackEvent = vi.fn();
|
||||
const { getCreatedHandler } = createChannelContext({ trackEvent });
|
||||
const createdHandler = getCreatedHandler();
|
||||
expect(createdHandler).toBeTruthy();
|
||||
|
||||
await createdHandler!({
|
||||
event: {
|
||||
channel: { id: "C1", name: "general" },
|
||||
},
|
||||
body: {},
|
||||
});
|
||||
|
||||
expect(trackEvent).toHaveBeenCalledTimes(1);
|
||||
expect(enqueueSystemEventMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
@@ -12,8 +12,11 @@ import type {
|
||||
SlackChannelRenamedEvent,
|
||||
} from "../types.js";
|
||||
|
||||
export function registerSlackChannelEvents(params: { ctx: SlackMonitorContext }) {
|
||||
const { ctx } = params;
|
||||
export function registerSlackChannelEvents(params: {
|
||||
ctx: SlackMonitorContext;
|
||||
trackEvent?: () => void;
|
||||
}) {
|
||||
const { ctx, trackEvent } = params;
|
||||
|
||||
const enqueueChannelSystemEvent = (params: {
|
||||
kind: "created" | "renamed";
|
||||
@@ -51,6 +54,7 @@ export function registerSlackChannelEvents(params: { ctx: SlackMonitorContext })
|
||||
if (ctx.shouldDropMismatchedSlackEvent(body)) {
|
||||
return;
|
||||
}
|
||||
trackEvent?.();
|
||||
|
||||
const payload = event as SlackChannelCreatedEvent;
|
||||
const channelId = payload.channel?.id;
|
||||
@@ -69,6 +73,7 @@ export function registerSlackChannelEvents(params: { ctx: SlackMonitorContext })
|
||||
if (ctx.shouldDropMismatchedSlackEvent(body)) {
|
||||
return;
|
||||
}
|
||||
trackEvent?.();
|
||||
|
||||
const payload = event as SlackChannelRenamedEvent;
|
||||
const channelId = payload.channel?.id;
|
||||
@@ -87,6 +92,7 @@ export function registerSlackChannelEvents(params: { ctx: SlackMonitorContext })
|
||||
if (ctx.shouldDropMismatchedSlackEvent(body)) {
|
||||
return;
|
||||
}
|
||||
trackEvent?.();
|
||||
|
||||
const payload = event as SlackChannelIdChangedEvent;
|
||||
const oldChannelId = payload.old_channel_id;
|
||||
|
||||
@@ -21,9 +21,16 @@ type SlackMemberHandler = (args: {
|
||||
body: unknown;
|
||||
}) => Promise<void>;
|
||||
|
||||
function createMembersContext(overrides?: SlackSystemEventTestOverrides) {
|
||||
const harness = createSlackSystemEventTestHarness(overrides);
|
||||
registerSlackMemberEvents({ ctx: harness.ctx });
|
||||
function createMembersContext(params?: {
|
||||
overrides?: SlackSystemEventTestOverrides;
|
||||
trackEvent?: () => void;
|
||||
shouldDropMismatchedSlackEvent?: (body: unknown) => boolean;
|
||||
}) {
|
||||
const harness = createSlackSystemEventTestHarness(params?.overrides);
|
||||
if (params?.shouldDropMismatchedSlackEvent) {
|
||||
harness.ctx.shouldDropMismatchedSlackEvent = params.shouldDropMismatchedSlackEvent;
|
||||
}
|
||||
registerSlackMemberEvents({ ctx: harness.ctx, trackEvent: params?.trackEvent });
|
||||
return {
|
||||
getJoinedHandler: () =>
|
||||
harness.getHandler("member_joined_channel") as SlackMemberHandler | null,
|
||||
@@ -44,7 +51,7 @@ describe("registerSlackMemberEvents", () => {
|
||||
it("enqueues DM member events when dmPolicy is open", async () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getJoinedHandler } = createMembersContext({ dmPolicy: "open" });
|
||||
const { getJoinedHandler } = createMembersContext({ overrides: { dmPolicy: "open" } });
|
||||
const joinedHandler = getJoinedHandler();
|
||||
expect(joinedHandler).toBeTruthy();
|
||||
|
||||
@@ -59,7 +66,7 @@ describe("registerSlackMemberEvents", () => {
|
||||
it("blocks DM member events when dmPolicy is disabled", async () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getJoinedHandler } = createMembersContext({ dmPolicy: "disabled" });
|
||||
const { getJoinedHandler } = createMembersContext({ overrides: { dmPolicy: "disabled" } });
|
||||
const joinedHandler = getJoinedHandler();
|
||||
expect(joinedHandler).toBeTruthy();
|
||||
|
||||
@@ -75,8 +82,7 @@ describe("registerSlackMemberEvents", () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getJoinedHandler } = createMembersContext({
|
||||
dmPolicy: "allowlist",
|
||||
allowFrom: ["U2"],
|
||||
overrides: { dmPolicy: "allowlist", allowFrom: ["U2"] },
|
||||
});
|
||||
const joinedHandler = getJoinedHandler();
|
||||
expect(joinedHandler).toBeTruthy();
|
||||
@@ -93,8 +99,7 @@ describe("registerSlackMemberEvents", () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getLeftHandler } = createMembersContext({
|
||||
dmPolicy: "allowlist",
|
||||
allowFrom: ["U1"],
|
||||
overrides: { dmPolicy: "allowlist", allowFrom: ["U1"] },
|
||||
});
|
||||
const leftHandler = getLeftHandler();
|
||||
expect(leftHandler).toBeTruthy();
|
||||
@@ -114,9 +119,11 @@ describe("registerSlackMemberEvents", () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getJoinedHandler } = createMembersContext({
|
||||
dmPolicy: "open",
|
||||
channelType: "channel",
|
||||
channelUsers: ["U_OWNER"],
|
||||
overrides: {
|
||||
dmPolicy: "open",
|
||||
channelType: "channel",
|
||||
channelUsers: ["U_OWNER"],
|
||||
},
|
||||
});
|
||||
const joinedHandler = getJoinedHandler();
|
||||
expect(joinedHandler).toBeTruthy();
|
||||
@@ -128,4 +135,35 @@ describe("registerSlackMemberEvents", () => {
|
||||
|
||||
expect(enqueueSystemEventMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not track mismatched events", async () => {
|
||||
const trackEvent = vi.fn();
|
||||
const { getJoinedHandler } = createMembersContext({
|
||||
trackEvent,
|
||||
shouldDropMismatchedSlackEvent: () => true,
|
||||
});
|
||||
const joinedHandler = getJoinedHandler();
|
||||
expect(joinedHandler).toBeTruthy();
|
||||
|
||||
await joinedHandler!({
|
||||
event: makeMemberEvent(),
|
||||
body: { api_app_id: "A_OTHER" },
|
||||
});
|
||||
|
||||
expect(trackEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("tracks accepted member events", async () => {
|
||||
const trackEvent = vi.fn();
|
||||
const { getJoinedHandler } = createMembersContext({ trackEvent });
|
||||
const joinedHandler = getJoinedHandler();
|
||||
expect(joinedHandler).toBeTruthy();
|
||||
|
||||
await joinedHandler!({
|
||||
event: makeMemberEvent(),
|
||||
body: {},
|
||||
});
|
||||
|
||||
expect(trackEvent).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -5,8 +5,11 @@ import type { SlackMonitorContext } from "../context.js";
|
||||
import type { SlackMemberChannelEvent } from "../types.js";
|
||||
import { authorizeAndResolveSlackSystemEventContext } from "./system-event-context.js";
|
||||
|
||||
export function registerSlackMemberEvents(params: { ctx: SlackMonitorContext }) {
|
||||
const { ctx } = params;
|
||||
export function registerSlackMemberEvents(params: {
|
||||
ctx: SlackMonitorContext;
|
||||
trackEvent?: () => void;
|
||||
}) {
|
||||
const { ctx, trackEvent } = params;
|
||||
|
||||
const handleMemberChannelEvent = async (params: {
|
||||
verb: "joined" | "left";
|
||||
@@ -17,6 +20,7 @@ export function registerSlackMemberEvents(params: { ctx: SlackMonitorContext })
|
||||
if (ctx.shouldDropMismatchedSlackEvent(params.body)) {
|
||||
return;
|
||||
}
|
||||
trackEvent?.();
|
||||
const payload = params.event;
|
||||
const channelId = payload.channel;
|
||||
const channelInfo = channelId ? await ctx.resolveChannelName(channelId) : {};
|
||||
|
||||
@@ -18,9 +18,16 @@ vi.mock("../../../pairing/pairing-store.js", () => ({
|
||||
|
||||
type SlackPinHandler = (args: { event: Record<string, unknown>; body: unknown }) => Promise<void>;
|
||||
|
||||
function createPinContext(overrides?: SlackSystemEventTestOverrides) {
|
||||
const harness = createSlackSystemEventTestHarness(overrides);
|
||||
registerSlackPinEvents({ ctx: harness.ctx });
|
||||
function createPinContext(params?: {
|
||||
overrides?: SlackSystemEventTestOverrides;
|
||||
trackEvent?: () => void;
|
||||
shouldDropMismatchedSlackEvent?: (body: unknown) => boolean;
|
||||
}) {
|
||||
const harness = createSlackSystemEventTestHarness(params?.overrides);
|
||||
if (params?.shouldDropMismatchedSlackEvent) {
|
||||
harness.ctx.shouldDropMismatchedSlackEvent = params.shouldDropMismatchedSlackEvent;
|
||||
}
|
||||
registerSlackPinEvents({ ctx: harness.ctx, trackEvent: params?.trackEvent });
|
||||
return {
|
||||
getAddedHandler: () => harness.getHandler("pin_added") as SlackPinHandler | null,
|
||||
getRemovedHandler: () => harness.getHandler("pin_removed") as SlackPinHandler | null,
|
||||
@@ -46,7 +53,7 @@ describe("registerSlackPinEvents", () => {
|
||||
it("enqueues DM pin system events when dmPolicy is open", async () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getAddedHandler } = createPinContext({ dmPolicy: "open" });
|
||||
const { getAddedHandler } = createPinContext({ overrides: { dmPolicy: "open" } });
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
|
||||
@@ -61,7 +68,7 @@ describe("registerSlackPinEvents", () => {
|
||||
it("blocks DM pin system events when dmPolicy is disabled", async () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getAddedHandler } = createPinContext({ dmPolicy: "disabled" });
|
||||
const { getAddedHandler } = createPinContext({ overrides: { dmPolicy: "disabled" } });
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
|
||||
@@ -77,8 +84,7 @@ describe("registerSlackPinEvents", () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getAddedHandler } = createPinContext({
|
||||
dmPolicy: "allowlist",
|
||||
allowFrom: ["U2"],
|
||||
overrides: { dmPolicy: "allowlist", allowFrom: ["U2"] },
|
||||
});
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
@@ -95,8 +101,7 @@ describe("registerSlackPinEvents", () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getAddedHandler } = createPinContext({
|
||||
dmPolicy: "allowlist",
|
||||
allowFrom: ["U1"],
|
||||
overrides: { dmPolicy: "allowlist", allowFrom: ["U1"] },
|
||||
});
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
@@ -113,9 +118,11 @@ describe("registerSlackPinEvents", () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getAddedHandler } = createPinContext({
|
||||
dmPolicy: "open",
|
||||
channelType: "channel",
|
||||
channelUsers: ["U_OWNER"],
|
||||
overrides: {
|
||||
dmPolicy: "open",
|
||||
channelType: "channel",
|
||||
channelUsers: ["U_OWNER"],
|
||||
},
|
||||
});
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
@@ -127,4 +134,35 @@ describe("registerSlackPinEvents", () => {
|
||||
|
||||
expect(enqueueSystemEventMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not track mismatched events", async () => {
|
||||
const trackEvent = vi.fn();
|
||||
const { getAddedHandler } = createPinContext({
|
||||
trackEvent,
|
||||
shouldDropMismatchedSlackEvent: () => true,
|
||||
});
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
|
||||
await addedHandler!({
|
||||
event: makePinEvent(),
|
||||
body: { api_app_id: "A_OTHER" },
|
||||
});
|
||||
|
||||
expect(trackEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("tracks accepted pin events", async () => {
|
||||
const trackEvent = vi.fn();
|
||||
const { getAddedHandler } = createPinContext({ trackEvent });
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
|
||||
await addedHandler!({
|
||||
event: makePinEvent(),
|
||||
body: {},
|
||||
});
|
||||
|
||||
expect(trackEvent).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -7,18 +7,20 @@ import { authorizeAndResolveSlackSystemEventContext } from "./system-event-conte
|
||||
|
||||
async function handleSlackPinEvent(params: {
|
||||
ctx: SlackMonitorContext;
|
||||
trackEvent?: () => void;
|
||||
body: unknown;
|
||||
event: unknown;
|
||||
action: "pinned" | "unpinned";
|
||||
contextKeySuffix: "added" | "removed";
|
||||
errorLabel: string;
|
||||
}): Promise<void> {
|
||||
const { ctx, body, event, action, contextKeySuffix, errorLabel } = params;
|
||||
const { ctx, trackEvent, body, event, action, contextKeySuffix, errorLabel } = params;
|
||||
|
||||
try {
|
||||
if (ctx.shouldDropMismatchedSlackEvent(body)) {
|
||||
return;
|
||||
}
|
||||
trackEvent?.();
|
||||
|
||||
const payload = event as SlackPinEvent;
|
||||
const channelId = payload.channel_id;
|
||||
@@ -47,12 +49,16 @@ async function handleSlackPinEvent(params: {
|
||||
}
|
||||
}
|
||||
|
||||
export function registerSlackPinEvents(params: { ctx: SlackMonitorContext }) {
|
||||
const { ctx } = params;
|
||||
export function registerSlackPinEvents(params: {
|
||||
ctx: SlackMonitorContext;
|
||||
trackEvent?: () => void;
|
||||
}) {
|
||||
const { ctx, trackEvent } = params;
|
||||
|
||||
ctx.app.event("pin_added", async ({ event, body }: SlackEventMiddlewareArgs<"pin_added">) => {
|
||||
await handleSlackPinEvent({
|
||||
ctx,
|
||||
trackEvent,
|
||||
body,
|
||||
event,
|
||||
action: "pinned",
|
||||
@@ -64,6 +70,7 @@ export function registerSlackPinEvents(params: { ctx: SlackMonitorContext }) {
|
||||
ctx.app.event("pin_removed", async ({ event, body }: SlackEventMiddlewareArgs<"pin_removed">) => {
|
||||
await handleSlackPinEvent({
|
||||
ctx,
|
||||
trackEvent,
|
||||
body,
|
||||
event,
|
||||
action: "unpinned",
|
||||
|
||||
@@ -21,9 +21,16 @@ type SlackReactionHandler = (args: {
|
||||
body: unknown;
|
||||
}) => Promise<void>;
|
||||
|
||||
function createReactionContext(overrides?: SlackSystemEventTestOverrides) {
|
||||
const harness = createSlackSystemEventTestHarness(overrides);
|
||||
registerSlackReactionEvents({ ctx: harness.ctx });
|
||||
function createReactionContext(params?: {
|
||||
overrides?: SlackSystemEventTestOverrides;
|
||||
trackEvent?: () => void;
|
||||
shouldDropMismatchedSlackEvent?: (body: unknown) => boolean;
|
||||
}) {
|
||||
const harness = createSlackSystemEventTestHarness(params?.overrides);
|
||||
if (params?.shouldDropMismatchedSlackEvent) {
|
||||
harness.ctx.shouldDropMismatchedSlackEvent = params.shouldDropMismatchedSlackEvent;
|
||||
}
|
||||
registerSlackReactionEvents({ ctx: harness.ctx, trackEvent: params?.trackEvent });
|
||||
return {
|
||||
getAddedHandler: () => harness.getHandler("reaction_added") as SlackReactionHandler | null,
|
||||
getRemovedHandler: () => harness.getHandler("reaction_removed") as SlackReactionHandler | null,
|
||||
@@ -48,7 +55,7 @@ describe("registerSlackReactionEvents", () => {
|
||||
it("enqueues DM reaction system events when dmPolicy is open", async () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getAddedHandler } = createReactionContext({ dmPolicy: "open" });
|
||||
const { getAddedHandler } = createReactionContext({ overrides: { dmPolicy: "open" } });
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
|
||||
@@ -63,7 +70,7 @@ describe("registerSlackReactionEvents", () => {
|
||||
it("blocks DM reaction system events when dmPolicy is disabled", async () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getAddedHandler } = createReactionContext({ dmPolicy: "disabled" });
|
||||
const { getAddedHandler } = createReactionContext({ overrides: { dmPolicy: "disabled" } });
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
|
||||
@@ -79,8 +86,7 @@ describe("registerSlackReactionEvents", () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getAddedHandler } = createReactionContext({
|
||||
dmPolicy: "allowlist",
|
||||
allowFrom: ["U2"],
|
||||
overrides: { dmPolicy: "allowlist", allowFrom: ["U2"] },
|
||||
});
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
@@ -97,8 +103,7 @@ describe("registerSlackReactionEvents", () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getAddedHandler } = createReactionContext({
|
||||
dmPolicy: "allowlist",
|
||||
allowFrom: ["U1"],
|
||||
overrides: { dmPolicy: "allowlist", allowFrom: ["U1"] },
|
||||
});
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
@@ -115,8 +120,7 @@ describe("registerSlackReactionEvents", () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getRemovedHandler } = createReactionContext({
|
||||
dmPolicy: "disabled",
|
||||
channelType: "channel",
|
||||
overrides: { dmPolicy: "disabled", channelType: "channel" },
|
||||
});
|
||||
const removedHandler = getRemovedHandler();
|
||||
expect(removedHandler).toBeTruthy();
|
||||
@@ -136,9 +140,11 @@ describe("registerSlackReactionEvents", () => {
|
||||
enqueueSystemEventMock.mockClear();
|
||||
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
|
||||
const { getAddedHandler } = createReactionContext({
|
||||
dmPolicy: "open",
|
||||
channelType: "channel",
|
||||
channelUsers: ["U_OWNER"],
|
||||
overrides: {
|
||||
dmPolicy: "open",
|
||||
channelType: "channel",
|
||||
channelUsers: ["U_OWNER"],
|
||||
},
|
||||
});
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
@@ -150,4 +156,35 @@ describe("registerSlackReactionEvents", () => {
|
||||
|
||||
expect(enqueueSystemEventMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not track mismatched events", async () => {
|
||||
const trackEvent = vi.fn();
|
||||
const { getAddedHandler } = createReactionContext({
|
||||
trackEvent,
|
||||
shouldDropMismatchedSlackEvent: () => true,
|
||||
});
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
|
||||
await addedHandler!({
|
||||
event: makeReactionEvent(),
|
||||
body: { api_app_id: "A_OTHER" },
|
||||
});
|
||||
|
||||
expect(trackEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("tracks accepted message reactions", async () => {
|
||||
const trackEvent = vi.fn();
|
||||
const { getAddedHandler } = createReactionContext({ trackEvent });
|
||||
const addedHandler = getAddedHandler();
|
||||
expect(addedHandler).toBeTruthy();
|
||||
|
||||
await addedHandler!({
|
||||
event: makeReactionEvent(),
|
||||
body: {},
|
||||
});
|
||||
|
||||
expect(trackEvent).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -5,8 +5,11 @@ import type { SlackMonitorContext } from "../context.js";
|
||||
import type { SlackReactionEvent } from "../types.js";
|
||||
import { authorizeAndResolveSlackSystemEventContext } from "./system-event-context.js";
|
||||
|
||||
export function registerSlackReactionEvents(params: { ctx: SlackMonitorContext }) {
|
||||
const { ctx } = params;
|
||||
export function registerSlackReactionEvents(params: {
|
||||
ctx: SlackMonitorContext;
|
||||
trackEvent?: () => void;
|
||||
}) {
|
||||
const { ctx, trackEvent } = params;
|
||||
|
||||
const handleReactionEvent = async (event: SlackReactionEvent, action: string) => {
|
||||
try {
|
||||
@@ -14,6 +17,7 @@ export function registerSlackReactionEvents(params: { ctx: SlackMonitorContext }
|
||||
if (!item || item.type !== "message") {
|
||||
return;
|
||||
}
|
||||
trackEvent?.();
|
||||
|
||||
const ingressContext = await authorizeAndResolveSlackSystemEventContext({
|
||||
ctx,
|
||||
|
||||
116
src/slack/monitor/message-handler.test.ts
Normal file
116
src/slack/monitor/message-handler.test.ts
Normal file
@@ -0,0 +1,116 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { createSlackMessageHandler } from "./message-handler.js";
|
||||
|
||||
const enqueueMock = vi.fn(async (_entry: unknown) => {});
|
||||
const resolveThreadTsMock = vi.fn(async ({ message }: { message: Record<string, unknown> }) => ({
|
||||
...message,
|
||||
}));
|
||||
|
||||
vi.mock("../../auto-reply/inbound-debounce.js", () => ({
|
||||
resolveInboundDebounceMs: () => 10,
|
||||
createInboundDebouncer: () => ({
|
||||
enqueue: (entry: unknown) => enqueueMock(entry),
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("./thread-resolution.js", () => ({
|
||||
createSlackThreadTsResolver: () => ({
|
||||
resolve: (entry: { message: Record<string, unknown> }) => resolveThreadTsMock(entry),
|
||||
}),
|
||||
}));
|
||||
|
||||
function createContext(overrides?: {
|
||||
markMessageSeen?: (channel: string | undefined, ts: string | undefined) => boolean;
|
||||
}) {
|
||||
return {
|
||||
cfg: {},
|
||||
accountId: "default",
|
||||
app: {
|
||||
client: {},
|
||||
},
|
||||
runtime: {},
|
||||
markMessageSeen: (channel: string | undefined, ts: string | undefined) =>
|
||||
overrides?.markMessageSeen?.(channel, ts) ?? false,
|
||||
} as Parameters<typeof createSlackMessageHandler>[0]["ctx"];
|
||||
}
|
||||
|
||||
describe("createSlackMessageHandler", () => {
|
||||
beforeEach(() => {
|
||||
enqueueMock.mockClear();
|
||||
resolveThreadTsMock.mockClear();
|
||||
});
|
||||
|
||||
it("does not track invalid non-message events from the message stream", async () => {
|
||||
const trackEvent = vi.fn();
|
||||
const handler = createSlackMessageHandler({
|
||||
ctx: createContext(),
|
||||
account: { accountId: "default" } as Parameters<
|
||||
typeof createSlackMessageHandler
|
||||
>[0]["account"],
|
||||
trackEvent,
|
||||
});
|
||||
|
||||
await handler(
|
||||
{
|
||||
type: "reaction_added",
|
||||
channel: "D1",
|
||||
ts: "123.456",
|
||||
} as never,
|
||||
{ source: "message" },
|
||||
);
|
||||
|
||||
expect(trackEvent).not.toHaveBeenCalled();
|
||||
expect(resolveThreadTsMock).not.toHaveBeenCalled();
|
||||
expect(enqueueMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not track duplicate messages that are already seen", async () => {
|
||||
const trackEvent = vi.fn();
|
||||
const handler = createSlackMessageHandler({
|
||||
ctx: createContext({ markMessageSeen: () => true }),
|
||||
account: { accountId: "default" } as Parameters<
|
||||
typeof createSlackMessageHandler
|
||||
>[0]["account"],
|
||||
trackEvent,
|
||||
});
|
||||
|
||||
await handler(
|
||||
{
|
||||
type: "message",
|
||||
channel: "D1",
|
||||
ts: "123.456",
|
||||
text: "hello",
|
||||
} as never,
|
||||
{ source: "message" },
|
||||
);
|
||||
|
||||
expect(trackEvent).not.toHaveBeenCalled();
|
||||
expect(resolveThreadTsMock).not.toHaveBeenCalled();
|
||||
expect(enqueueMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("tracks accepted non-duplicate messages", async () => {
|
||||
const trackEvent = vi.fn();
|
||||
const handler = createSlackMessageHandler({
|
||||
ctx: createContext(),
|
||||
account: { accountId: "default" } as Parameters<
|
||||
typeof createSlackMessageHandler
|
||||
>[0]["account"],
|
||||
trackEvent,
|
||||
});
|
||||
|
||||
await handler(
|
||||
{
|
||||
type: "message",
|
||||
channel: "D1",
|
||||
ts: "123.456",
|
||||
text: "hello",
|
||||
} as never,
|
||||
{ source: "message" },
|
||||
);
|
||||
|
||||
expect(trackEvent).toHaveBeenCalledTimes(1);
|
||||
expect(resolveThreadTsMock).toHaveBeenCalledTimes(1);
|
||||
expect(enqueueMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
@@ -19,8 +19,10 @@ export type SlackMessageHandler = (
|
||||
export function createSlackMessageHandler(params: {
|
||||
ctx: SlackMonitorContext;
|
||||
account: ResolvedSlackAccount;
|
||||
/** Called on each inbound event to update liveness tracking. */
|
||||
trackEvent?: () => void;
|
||||
}): SlackMessageHandler {
|
||||
const { ctx, account } = params;
|
||||
const { ctx, account, trackEvent } = params;
|
||||
const debounceMs = resolveInboundDebounceMs({ cfg: ctx.cfg, channel: "slack" });
|
||||
const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client });
|
||||
|
||||
@@ -113,6 +115,7 @@ export function createSlackMessageHandler(params: {
|
||||
if (ctx.markMessageSeen(message.channel, message.ts)) {
|
||||
return;
|
||||
}
|
||||
trackEvent?.();
|
||||
const resolvedMessage = await threadTsResolver.resolve({ message, source: opts.source });
|
||||
await debouncer.enqueue({ message: resolvedMessage, opts });
|
||||
};
|
||||
|
||||
@@ -337,9 +337,18 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
||||
removeAckAfterReply,
|
||||
});
|
||||
|
||||
const handleSlackMessage = createSlackMessageHandler({ ctx, account });
|
||||
// Wire up event liveness tracking: update lastEventAt on every inbound event
|
||||
// so the health monitor can detect "half-dead" sockets that pass health checks
|
||||
// but silently stop delivering events.
|
||||
const trackEvent = opts.setStatus
|
||||
? () => {
|
||||
opts.setStatus!({ lastEventAt: Date.now(), lastInboundAt: Date.now() });
|
||||
}
|
||||
: undefined;
|
||||
|
||||
registerSlackMonitorEvents({ ctx, account, handleSlackMessage });
|
||||
const handleSlackMessage = createSlackMessageHandler({ ctx, account, trackEvent });
|
||||
|
||||
registerSlackMonitorEvents({ ctx, account, handleSlackMessage, trackEvent });
|
||||
await registerSlackMonitorSlashCommands({ ctx, account });
|
||||
if (slackMode === "http" && slackHttpHandler) {
|
||||
unregisterHttpHandler = registerSlackHttpHandler({
|
||||
|
||||
@@ -12,6 +12,10 @@ export type MonitorSlackOpts = {
|
||||
abortSignal?: AbortSignal;
|
||||
mediaMaxMb?: number;
|
||||
slashCommand?: SlackSlashCommandConfig;
|
||||
/** Callback to update the channel account status snapshot (e.g. lastEventAt). */
|
||||
setStatus?: (next: Record<string, unknown>) => void;
|
||||
/** Callback to read the current channel account status snapshot. */
|
||||
getStatus?: () => Record<string, unknown>;
|
||||
};
|
||||
|
||||
export type SlackReactionEvent = {
|
||||
|
||||
Reference in New Issue
Block a user