From 0956b599e1f8ae767a43e5106c5a375aa3c518dc Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 2 Mar 2026 19:18:10 +0000 Subject: [PATCH] fix: harden slack debounce key routing and ordering (#31951) (thanks @scoootscooob) --- CHANGELOG.md | 1 + .../message-handler.debounce-key.test.ts | 7 ++ src/slack/monitor/message-handler.test.ts | 37 ++++++++ src/slack/monitor/message-handler.ts | 92 ++++++++++++++++--- 4 files changed, 122 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e886e98e7a..d7ba59321d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Gateway/Heartbeat model reload: treat `models.*` and `agents.defaults.model` config updates as heartbeat hot-reload triggers so heartbeat picks up model changes without a full gateway restart. (#32046) Thanks @stakeswky. +- Slack/inbound debounce routing: isolate top-level non-DM message debounce keys by message timestamp to avoid cross-thread collisions, preserve DM batching, and flush pending top-level buffers before immediate non-debounce follow-ups to keep ordering stable. (#31951) Thanks @scoootscooob. - Memory/LanceDB embeddings: forward configured `embedding.dimensions` into OpenAI embeddings requests so vector size and API output dimensions stay aligned when dimensions are explicitly configured. (#32036) Thanks @scotthuang. - Mentions/Slack formatting hardening: add null-safe guards for runtime text normalization paths so malformed/undefined text payloads do not crash mention stripping or mrkdwn conversion. (#31865) Thanks @stone-jin. - Failover/error classification: treat HTTP `529` (provider overloaded, common with Anthropic-compatible APIs) as `rate_limit` so model failover can engage instead of misclassifying the error path. (#31854) Thanks @bugkill3r. diff --git a/src/slack/monitor/message-handler.debounce-key.test.ts b/src/slack/monitor/message-handler.debounce-key.test.ts index 5b415fd73ab..17c677b4e37 100644 --- a/src/slack/monitor/message-handler.debounce-key.test.ts +++ b/src/slack/monitor/message-handler.debounce-key.test.ts @@ -50,6 +50,13 @@ describe("buildSlackDebounceKey", () => { expect(keyB).toBe("slack:default:C123:1709000000.000200:U456"); }); + it("keeps top-level DMs channel-scoped to preserve short-message batching", () => { + const dmA = makeMessage({ channel: "D123", ts: "1709000000.000100" }); + const dmB = makeMessage({ channel: "D123", ts: "1709000000.000200" }); + expect(buildSlackDebounceKey(dmA, accountId)).toBe("slack:default:D123:U456"); + expect(buildSlackDebounceKey(dmB, accountId)).toBe("slack:default:D123:U456"); + }); + it("falls back to bare channel when no timestamp is available", () => { const msg = makeMessage({ ts: undefined, event_ts: undefined }); expect(buildSlackDebounceKey(msg, accountId)).toBe("slack:default:C123:U456"); diff --git a/src/slack/monitor/message-handler.test.ts b/src/slack/monitor/message-handler.test.ts index c40254ec93d..f19f640ed6e 100644 --- a/src/slack/monitor/message-handler.test.ts +++ b/src/slack/monitor/message-handler.test.ts @@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { createSlackMessageHandler } from "./message-handler.js"; const enqueueMock = vi.fn(async (_entry: unknown) => {}); +const flushKeyMock = vi.fn(async (_key: string) => {}); const resolveThreadTsMock = vi.fn(async ({ message }: { message: Record }) => ({ ...message, })); @@ -10,6 +11,7 @@ vi.mock("../../auto-reply/inbound-debounce.js", () => ({ resolveInboundDebounceMs: () => 10, createInboundDebouncer: () => ({ enqueue: (entry: unknown) => enqueueMock(entry), + flushKey: (key: string) => flushKeyMock(key), }), })); @@ -37,6 +39,7 @@ function createContext(overrides?: { describe("createSlackMessageHandler", () => { beforeEach(() => { enqueueMock.mockClear(); + flushKeyMock.mockClear(); resolveThreadTsMock.mockClear(); }); @@ -113,4 +116,38 @@ describe("createSlackMessageHandler", () => { expect(resolveThreadTsMock).toHaveBeenCalledTimes(1); expect(enqueueMock).toHaveBeenCalledTimes(1); }); + + it("flushes pending top-level buffered keys before immediate non-debounce follow-ups", async () => { + const handler = createSlackMessageHandler({ + ctx: createContext(), + account: { accountId: "default" } as Parameters< + typeof createSlackMessageHandler + >[0]["account"], + }); + + await handler( + { + type: "message", + channel: "C111", + user: "U111", + ts: "1709000000.000100", + text: "first buffered text", + } as never, + { source: "message" }, + ); + await handler( + { + type: "message", + subtype: "file_share", + channel: "C111", + user: "U111", + ts: "1709000000.000200", + text: "file follows", + files: [{ id: "F1" }], + } as never, + { source: "message" }, + ); + + expect(flushKeyMock).toHaveBeenCalledWith("slack:default:C111:1709000000.000100:U111"); + }); }); diff --git a/src/slack/monitor/message-handler.ts b/src/slack/monitor/message-handler.ts index 2c884803ffa..f5d47400cde 100644 --- a/src/slack/monitor/message-handler.ts +++ b/src/slack/monitor/message-handler.ts @@ -16,17 +16,57 @@ export type SlackMessageHandler = ( opts: { source: "message" | "app_mention"; wasMentioned?: boolean }, ) => Promise; +function resolveSlackSenderId(message: SlackMessageEvent): string | null { + return message.user ?? message.bot_id ?? null; +} + +function isSlackDirectMessageChannel(channelId: string): boolean { + return channelId.startsWith("D"); +} + +function isTopLevelSlackMessage(message: SlackMessageEvent): boolean { + return !message.thread_ts && !message.parent_user_id; +} + +function buildTopLevelSlackConversationKey( + message: SlackMessageEvent, + accountId: string, +): string | null { + if (!isTopLevelSlackMessage(message)) { + return null; + } + const senderId = resolveSlackSenderId(message); + if (!senderId) { + return null; + } + return `slack:${accountId}:${message.channel}:${senderId}`; +} + +function shouldDebounceSlackMessage(message: SlackMessageEvent, cfg: SlackMonitorContext["cfg"]) { + const text = message.text ?? ""; + if (!text.trim()) { + return false; + } + if (message.files && message.files.length > 0) { + return false; + } + const textForCommandDetection = stripSlackMentionsForCommandDetection(text); + return !hasControlCommand(textForCommandDetection, cfg); +} + /** * Build a debounce key that isolates messages by thread (or by message timestamp - * for top-level channel messages). Without per-message scoping, concurrent - * top-level messages from the same sender would share a key and get merged + * for top-level non-DM channel messages). Without per-message scoping, concurrent + * top-level messages from the same sender can share a key and get merged * into a single reply on the wrong thread. + * + * DMs intentionally stay channel-scoped to preserve short-message batching. */ export function buildSlackDebounceKey( message: SlackMessageEvent, accountId: string, ): string | null { - const senderId = message.user ?? message.bot_id; + const senderId = resolveSlackSenderId(message); if (!senderId) { return null; } @@ -35,7 +75,7 @@ export function buildSlackDebounceKey( ? `${message.channel}:${message.thread_ts}` : message.parent_user_id && messageTs ? `${message.channel}:maybe-thread:${messageTs}` - : messageTs + : messageTs && !isSlackDirectMessageChannel(message.channel) ? `${message.channel}:${messageTs}` : message.channel; return `slack:${accountId}:${threadKey}:${senderId}`; @@ -50,6 +90,7 @@ export function createSlackMessageHandler(params: { const { ctx, account, trackEvent } = params; const debounceMs = resolveInboundDebounceMs({ cfg: ctx.cfg, channel: "slack" }); const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client }); + const pendingTopLevelDebounceKeys = new Map>(); const debouncer = createInboundDebouncer<{ message: SlackMessageEvent; @@ -57,22 +98,26 @@ export function createSlackMessageHandler(params: { }>({ debounceMs, buildKey: (entry) => buildSlackDebounceKey(entry.message, ctx.accountId), - shouldDebounce: (entry) => { - const text = entry.message.text ?? ""; - if (!text.trim()) { - return false; - } - if (entry.message.files && entry.message.files.length > 0) { - return false; - } - const textForCommandDetection = stripSlackMentionsForCommandDetection(text); - return !hasControlCommand(textForCommandDetection, ctx.cfg); - }, + shouldDebounce: (entry) => shouldDebounceSlackMessage(entry.message, ctx.cfg), onFlush: async (entries) => { const last = entries.at(-1); if (!last) { return; } + const flushedKey = buildSlackDebounceKey(last.message, ctx.accountId); + const topLevelConversationKey = buildTopLevelSlackConversationKey( + last.message, + ctx.accountId, + ); + if (flushedKey && topLevelConversationKey) { + const pendingKeys = pendingTopLevelDebounceKeys.get(topLevelConversationKey); + if (pendingKeys) { + pendingKeys.delete(flushedKey); + if (pendingKeys.size === 0) { + pendingTopLevelDebounceKeys.delete(topLevelConversationKey); + } + } + } const combinedText = entries.length === 1 ? (last.message.text ?? "") @@ -129,6 +174,23 @@ export function createSlackMessageHandler(params: { } trackEvent?.(); const resolvedMessage = await threadTsResolver.resolve({ message, source: opts.source }); + const debounceKey = buildSlackDebounceKey(resolvedMessage, ctx.accountId); + const conversationKey = buildTopLevelSlackConversationKey(resolvedMessage, ctx.accountId); + const canDebounce = debounceMs > 0 && shouldDebounceSlackMessage(resolvedMessage, ctx.cfg); + if (!canDebounce && conversationKey) { + const pendingKeys = pendingTopLevelDebounceKeys.get(conversationKey); + if (pendingKeys && pendingKeys.size > 0) { + const keysToFlush = Array.from(pendingKeys); + for (const pendingKey of keysToFlush) { + await debouncer.flushKey(pendingKey); + } + } + } + if (canDebounce && debounceKey && conversationKey) { + const pendingKeys = pendingTopLevelDebounceKeys.get(conversationKey) ?? new Set(); + pendingKeys.add(debounceKey); + pendingTopLevelDebounceKeys.set(conversationKey, pendingKeys); + } await debouncer.enqueue({ message: resolvedMessage, opts }); }; }