fix: harden slack debounce key routing and ordering (#31951) (thanks @scoootscooob)

This commit is contained in:
Peter Steinberger
2026-03-02 19:18:10 +00:00
parent d4b20f5295
commit 0956b599e1
4 changed files with 122 additions and 15 deletions

View File

@@ -48,6 +48,7 @@ Docs: https://docs.openclaw.ai
### Fixes ### Fixes
- Gateway/Heartbeat model reload: treat `models.*` and `agents.defaults.model` config updates as heartbeat hot-reload triggers so heartbeat picks up model changes without a full gateway restart. (#32046) Thanks @stakeswky. - Gateway/Heartbeat model reload: treat `models.*` and `agents.defaults.model` config updates as heartbeat hot-reload triggers so heartbeat picks up model changes without a full gateway restart. (#32046) Thanks @stakeswky.
- Slack/inbound debounce routing: isolate top-level non-DM message debounce keys by message timestamp to avoid cross-thread collisions, preserve DM batching, and flush pending top-level buffers before immediate non-debounce follow-ups to keep ordering stable. (#31951) Thanks @scoootscooob.
- Memory/LanceDB embeddings: forward configured `embedding.dimensions` into OpenAI embeddings requests so vector size and API output dimensions stay aligned when dimensions are explicitly configured. (#32036) Thanks @scotthuang. - Memory/LanceDB embeddings: forward configured `embedding.dimensions` into OpenAI embeddings requests so vector size and API output dimensions stay aligned when dimensions are explicitly configured. (#32036) Thanks @scotthuang.
- Mentions/Slack formatting hardening: add null-safe guards for runtime text normalization paths so malformed/undefined text payloads do not crash mention stripping or mrkdwn conversion. (#31865) Thanks @stone-jin. - Mentions/Slack formatting hardening: add null-safe guards for runtime text normalization paths so malformed/undefined text payloads do not crash mention stripping or mrkdwn conversion. (#31865) Thanks @stone-jin.
- Failover/error classification: treat HTTP `529` (provider overloaded, common with Anthropic-compatible APIs) as `rate_limit` so model failover can engage instead of misclassifying the error path. (#31854) Thanks @bugkill3r. - Failover/error classification: treat HTTP `529` (provider overloaded, common with Anthropic-compatible APIs) as `rate_limit` so model failover can engage instead of misclassifying the error path. (#31854) Thanks @bugkill3r.

View File

@@ -50,6 +50,13 @@ describe("buildSlackDebounceKey", () => {
expect(keyB).toBe("slack:default:C123:1709000000.000200:U456"); expect(keyB).toBe("slack:default:C123:1709000000.000200:U456");
}); });
it("keeps top-level DMs channel-scoped to preserve short-message batching", () => {
const dmA = makeMessage({ channel: "D123", ts: "1709000000.000100" });
const dmB = makeMessage({ channel: "D123", ts: "1709000000.000200" });
expect(buildSlackDebounceKey(dmA, accountId)).toBe("slack:default:D123:U456");
expect(buildSlackDebounceKey(dmB, accountId)).toBe("slack:default:D123:U456");
});
it("falls back to bare channel when no timestamp is available", () => { it("falls back to bare channel when no timestamp is available", () => {
const msg = makeMessage({ ts: undefined, event_ts: undefined }); const msg = makeMessage({ ts: undefined, event_ts: undefined });
expect(buildSlackDebounceKey(msg, accountId)).toBe("slack:default:C123:U456"); expect(buildSlackDebounceKey(msg, accountId)).toBe("slack:default:C123:U456");

View File

@@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
import { createSlackMessageHandler } from "./message-handler.js"; import { createSlackMessageHandler } from "./message-handler.js";
const enqueueMock = vi.fn(async (_entry: unknown) => {}); const enqueueMock = vi.fn(async (_entry: unknown) => {});
const flushKeyMock = vi.fn(async (_key: string) => {});
const resolveThreadTsMock = vi.fn(async ({ message }: { message: Record<string, unknown> }) => ({ const resolveThreadTsMock = vi.fn(async ({ message }: { message: Record<string, unknown> }) => ({
...message, ...message,
})); }));
@@ -10,6 +11,7 @@ vi.mock("../../auto-reply/inbound-debounce.js", () => ({
resolveInboundDebounceMs: () => 10, resolveInboundDebounceMs: () => 10,
createInboundDebouncer: () => ({ createInboundDebouncer: () => ({
enqueue: (entry: unknown) => enqueueMock(entry), enqueue: (entry: unknown) => enqueueMock(entry),
flushKey: (key: string) => flushKeyMock(key),
}), }),
})); }));
@@ -37,6 +39,7 @@ function createContext(overrides?: {
describe("createSlackMessageHandler", () => { describe("createSlackMessageHandler", () => {
beforeEach(() => { beforeEach(() => {
enqueueMock.mockClear(); enqueueMock.mockClear();
flushKeyMock.mockClear();
resolveThreadTsMock.mockClear(); resolveThreadTsMock.mockClear();
}); });
@@ -113,4 +116,38 @@ describe("createSlackMessageHandler", () => {
expect(resolveThreadTsMock).toHaveBeenCalledTimes(1); expect(resolveThreadTsMock).toHaveBeenCalledTimes(1);
expect(enqueueMock).toHaveBeenCalledTimes(1); expect(enqueueMock).toHaveBeenCalledTimes(1);
}); });
it("flushes pending top-level buffered keys before immediate non-debounce follow-ups", async () => {
const handler = createSlackMessageHandler({
ctx: createContext(),
account: { accountId: "default" } as Parameters<
typeof createSlackMessageHandler
>[0]["account"],
});
await handler(
{
type: "message",
channel: "C111",
user: "U111",
ts: "1709000000.000100",
text: "first buffered text",
} as never,
{ source: "message" },
);
await handler(
{
type: "message",
subtype: "file_share",
channel: "C111",
user: "U111",
ts: "1709000000.000200",
text: "file follows",
files: [{ id: "F1" }],
} as never,
{ source: "message" },
);
expect(flushKeyMock).toHaveBeenCalledWith("slack:default:C111:1709000000.000100:U111");
});
}); });

View File

@@ -16,17 +16,57 @@ export type SlackMessageHandler = (
opts: { source: "message" | "app_mention"; wasMentioned?: boolean }, opts: { source: "message" | "app_mention"; wasMentioned?: boolean },
) => Promise<void>; ) => Promise<void>;
function resolveSlackSenderId(message: SlackMessageEvent): string | null {
return message.user ?? message.bot_id ?? null;
}
function isSlackDirectMessageChannel(channelId: string): boolean {
return channelId.startsWith("D");
}
function isTopLevelSlackMessage(message: SlackMessageEvent): boolean {
return !message.thread_ts && !message.parent_user_id;
}
function buildTopLevelSlackConversationKey(
message: SlackMessageEvent,
accountId: string,
): string | null {
if (!isTopLevelSlackMessage(message)) {
return null;
}
const senderId = resolveSlackSenderId(message);
if (!senderId) {
return null;
}
return `slack:${accountId}:${message.channel}:${senderId}`;
}
function shouldDebounceSlackMessage(message: SlackMessageEvent, cfg: SlackMonitorContext["cfg"]) {
const text = message.text ?? "";
if (!text.trim()) {
return false;
}
if (message.files && message.files.length > 0) {
return false;
}
const textForCommandDetection = stripSlackMentionsForCommandDetection(text);
return !hasControlCommand(textForCommandDetection, cfg);
}
/** /**
* Build a debounce key that isolates messages by thread (or by message timestamp * Build a debounce key that isolates messages by thread (or by message timestamp
* for top-level channel messages). Without per-message scoping, concurrent * for top-level non-DM channel messages). Without per-message scoping, concurrent
* top-level messages from the same sender would share a key and get merged * top-level messages from the same sender can share a key and get merged
* into a single reply on the wrong thread. * into a single reply on the wrong thread.
*
* DMs intentionally stay channel-scoped to preserve short-message batching.
*/ */
export function buildSlackDebounceKey( export function buildSlackDebounceKey(
message: SlackMessageEvent, message: SlackMessageEvent,
accountId: string, accountId: string,
): string | null { ): string | null {
const senderId = message.user ?? message.bot_id; const senderId = resolveSlackSenderId(message);
if (!senderId) { if (!senderId) {
return null; return null;
} }
@@ -35,7 +75,7 @@ export function buildSlackDebounceKey(
? `${message.channel}:${message.thread_ts}` ? `${message.channel}:${message.thread_ts}`
: message.parent_user_id && messageTs : message.parent_user_id && messageTs
? `${message.channel}:maybe-thread:${messageTs}` ? `${message.channel}:maybe-thread:${messageTs}`
: messageTs : messageTs && !isSlackDirectMessageChannel(message.channel)
? `${message.channel}:${messageTs}` ? `${message.channel}:${messageTs}`
: message.channel; : message.channel;
return `slack:${accountId}:${threadKey}:${senderId}`; return `slack:${accountId}:${threadKey}:${senderId}`;
@@ -50,6 +90,7 @@ export function createSlackMessageHandler(params: {
const { ctx, account, trackEvent } = params; const { ctx, account, trackEvent } = params;
const debounceMs = resolveInboundDebounceMs({ cfg: ctx.cfg, channel: "slack" }); const debounceMs = resolveInboundDebounceMs({ cfg: ctx.cfg, channel: "slack" });
const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client }); const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client });
const pendingTopLevelDebounceKeys = new Map<string, Set<string>>();
const debouncer = createInboundDebouncer<{ const debouncer = createInboundDebouncer<{
message: SlackMessageEvent; message: SlackMessageEvent;
@@ -57,22 +98,26 @@ export function createSlackMessageHandler(params: {
}>({ }>({
debounceMs, debounceMs,
buildKey: (entry) => buildSlackDebounceKey(entry.message, ctx.accountId), buildKey: (entry) => buildSlackDebounceKey(entry.message, ctx.accountId),
shouldDebounce: (entry) => { shouldDebounce: (entry) => shouldDebounceSlackMessage(entry.message, ctx.cfg),
const text = entry.message.text ?? "";
if (!text.trim()) {
return false;
}
if (entry.message.files && entry.message.files.length > 0) {
return false;
}
const textForCommandDetection = stripSlackMentionsForCommandDetection(text);
return !hasControlCommand(textForCommandDetection, ctx.cfg);
},
onFlush: async (entries) => { onFlush: async (entries) => {
const last = entries.at(-1); const last = entries.at(-1);
if (!last) { if (!last) {
return; return;
} }
const flushedKey = buildSlackDebounceKey(last.message, ctx.accountId);
const topLevelConversationKey = buildTopLevelSlackConversationKey(
last.message,
ctx.accountId,
);
if (flushedKey && topLevelConversationKey) {
const pendingKeys = pendingTopLevelDebounceKeys.get(topLevelConversationKey);
if (pendingKeys) {
pendingKeys.delete(flushedKey);
if (pendingKeys.size === 0) {
pendingTopLevelDebounceKeys.delete(topLevelConversationKey);
}
}
}
const combinedText = const combinedText =
entries.length === 1 entries.length === 1
? (last.message.text ?? "") ? (last.message.text ?? "")
@@ -129,6 +174,23 @@ export function createSlackMessageHandler(params: {
} }
trackEvent?.(); trackEvent?.();
const resolvedMessage = await threadTsResolver.resolve({ message, source: opts.source }); const resolvedMessage = await threadTsResolver.resolve({ message, source: opts.source });
const debounceKey = buildSlackDebounceKey(resolvedMessage, ctx.accountId);
const conversationKey = buildTopLevelSlackConversationKey(resolvedMessage, ctx.accountId);
const canDebounce = debounceMs > 0 && shouldDebounceSlackMessage(resolvedMessage, ctx.cfg);
if (!canDebounce && conversationKey) {
const pendingKeys = pendingTopLevelDebounceKeys.get(conversationKey);
if (pendingKeys && pendingKeys.size > 0) {
const keysToFlush = Array.from(pendingKeys);
for (const pendingKey of keysToFlush) {
await debouncer.flushKey(pendingKey);
}
}
}
if (canDebounce && debounceKey && conversationKey) {
const pendingKeys = pendingTopLevelDebounceKeys.get(conversationKey) ?? new Set<string>();
pendingKeys.add(debounceKey);
pendingTopLevelDebounceKeys.set(conversationKey, pendingKeys);
}
await debouncer.enqueue({ message: resolvedMessage, opts }); await debouncer.enqueue({ message: resolvedMessage, opts });
}; };
} }