mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-10 22:44:36 +00:00
fix(slack): reconnect socket mode after disconnect (#27232)
* fix(slack): reconnect socket mode after disconnect * fix(slack): avoid orphaned disconnect waiters on start failure * docs(changelog): record slack socket reconnect reliability fix --------- Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
a54b85822c
commit
949faff5ce
@@ -98,6 +98,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- Slack/Usage footer formatting: wrap session keys in inline code in full response-usage footers so Slack does not parse colon-delimited session segments as emoji shortcodes. (#30258) Thanks @pushkarsingh32.
|
- Slack/Usage footer formatting: wrap session keys in inline code in full response-usage footers so Slack does not parse colon-delimited session segments as emoji shortcodes. (#30258) Thanks @pushkarsingh32.
|
||||||
- Slack/Socket Mode slash startup: treat `app.options()` registration as best-effort and fall back to static arg menus when listener registration fails, preventing Slack monitor startup crash loops on receiver init edge cases. (#21715)
|
- Slack/Socket Mode slash startup: treat `app.options()` registration as best-effort and fall back to static arg menus when listener registration fails, preventing Slack monitor startup crash loops on receiver init edge cases. (#21715)
|
||||||
- Slack/Legacy streaming config: map boolean `channels.slack.streaming=false` to unified streaming mode `off` (with `nativeStreaming=false`) so legacy configs correctly disable draft preview/native streaming instead of defaulting to `partial`. (#25990) Thanks @chilu18.
|
- Slack/Legacy streaming config: map boolean `channels.slack.streaming=false` to unified streaming mode `off` (with `nativeStreaming=false`) so legacy configs correctly disable draft preview/native streaming instead of defaulting to `partial`. (#25990) Thanks @chilu18.
|
||||||
|
- Slack/Socket reconnect reliability: reconnect Socket Mode after disconnect/start failures using bounded exponential backoff with abort-aware waits, while preserving clean shutdown behavior and adding disconnect/error helper tests. (#27232) Thanks @pandego.
|
||||||
- Onboarding/Custom providers: raise default custom-provider model context window to the runtime hard minimum (16k) and auto-heal existing custom model entries below that threshold during reconfiguration, preventing immediate `Model context window too small (4096 tokens)` failures. (#21653) Thanks @r4jiv007.
|
- Onboarding/Custom providers: raise default custom-provider model context window to the runtime hard minimum (16k) and auto-heal existing custom model entries below that threshold during reconfiguration, preventing immediate `Model context window too small (4096 tokens)` failures. (#21653) Thanks @r4jiv007.
|
||||||
- Web UI/Assistant text: strip internal `<relevant-memories>...</relevant-memories>` scaffolding from rendered assistant messages (while preserving code-fence literals), preventing memory-context leakage in chat output for models that echo internal blocks. (#29851) Thanks @Valkster70.
|
- Web UI/Assistant text: strip internal `<relevant-memories>...</relevant-memories>` scaffolding from rendered assistant messages (while preserving code-fence literals), preventing memory-context leakage in chat output for models that echo internal blocks. (#29851) Thanks @Valkster70.
|
||||||
- Dashboard/Sessions: allow authenticated Control UI clients to delete and patch sessions while still blocking regular webchat clients from session mutation RPCs, fixing Dashboard session delete failures. (#21264) Thanks @jskoiz.
|
- Dashboard/Sessions: allow authenticated Control UI clients to delete and patch sessions while still blocking regular webchat clients from session mutation RPCs, fixing Dashboard session delete failures. (#21264) Thanks @jskoiz.
|
||||||
|
|||||||
45
src/slack/monitor/provider.reconnect.test.ts
Normal file
45
src/slack/monitor/provider.reconnect.test.ts
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
import { __testing } from "./provider.js";
|
||||||
|
|
||||||
|
class FakeEmitter {
|
||||||
|
private listeners = new Map<string, Set<(...args: unknown[]) => void>>();
|
||||||
|
|
||||||
|
on(event: string, listener: (...args: unknown[]) => void) {
|
||||||
|
const bucket = this.listeners.get(event) ?? new Set<(...args: unknown[]) => void>();
|
||||||
|
bucket.add(listener);
|
||||||
|
this.listeners.set(event, bucket);
|
||||||
|
}
|
||||||
|
|
||||||
|
off(event: string, listener: (...args: unknown[]) => void) {
|
||||||
|
this.listeners.get(event)?.delete(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
emit(event: string, ...args: unknown[]) {
|
||||||
|
for (const listener of this.listeners.get(event) ?? []) {
|
||||||
|
listener(...args);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("slack socket reconnect helpers", () => {
|
||||||
|
it("resolves disconnect waiter on socket disconnect event", async () => {
|
||||||
|
const client = new FakeEmitter();
|
||||||
|
const app = { receiver: { client } };
|
||||||
|
|
||||||
|
const waiter = __testing.waitForSlackSocketDisconnect(app as never);
|
||||||
|
client.emit("disconnected");
|
||||||
|
|
||||||
|
await expect(waiter).resolves.toEqual({ event: "disconnect" });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("resolves disconnect waiter on socket error event", async () => {
|
||||||
|
const client = new FakeEmitter();
|
||||||
|
const app = { receiver: { client } };
|
||||||
|
const err = new Error("dns down");
|
||||||
|
|
||||||
|
const waiter = __testing.waitForSlackSocketDisconnect(app as never);
|
||||||
|
client.emit("error", err);
|
||||||
|
|
||||||
|
await expect(waiter).resolves.toEqual({ event: "error", error: err });
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -18,6 +18,7 @@ import {
|
|||||||
} from "../../config/runtime-group-policy.js";
|
} from "../../config/runtime-group-policy.js";
|
||||||
import type { SessionScope } from "../../config/sessions.js";
|
import type { SessionScope } from "../../config/sessions.js";
|
||||||
import { warn } from "../../globals.js";
|
import { warn } from "../../globals.js";
|
||||||
|
import { computeBackoff, sleepWithAbort } from "../../infra/backoff.js";
|
||||||
import { installRequestBodyLimitGuard } from "../../infra/http-body.js";
|
import { installRequestBodyLimitGuard } from "../../infra/http-body.js";
|
||||||
import { normalizeMainKey } from "../../routing/session-key.js";
|
import { normalizeMainKey } from "../../routing/session-key.js";
|
||||||
import { createNonExitingRuntime, type RuntimeEnv } from "../../runtime.js";
|
import { createNonExitingRuntime, type RuntimeEnv } from "../../runtime.js";
|
||||||
@@ -46,6 +47,100 @@ const { App, HTTPReceiver } = slackBolt;
|
|||||||
|
|
||||||
const SLACK_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024;
|
const SLACK_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024;
|
||||||
const SLACK_WEBHOOK_BODY_TIMEOUT_MS = 30_000;
|
const SLACK_WEBHOOK_BODY_TIMEOUT_MS = 30_000;
|
||||||
|
const SLACK_SOCKET_RECONNECT_POLICY = {
|
||||||
|
initialMs: 2_000,
|
||||||
|
maxMs: 30_000,
|
||||||
|
factor: 1.8,
|
||||||
|
jitter: 0.25,
|
||||||
|
maxAttempts: 12,
|
||||||
|
} as const;
|
||||||
|
|
||||||
|
type SlackSocketDisconnectEvent = "disconnect" | "unable_to_socket_mode_start" | "error";
|
||||||
|
|
||||||
|
type EmitterLike = {
|
||||||
|
on: (event: string, listener: (...args: unknown[]) => void) => unknown;
|
||||||
|
off: (event: string, listener: (...args: unknown[]) => void) => unknown;
|
||||||
|
};
|
||||||
|
|
||||||
|
function getSocketEmitter(app: unknown): EmitterLike | null {
|
||||||
|
const receiver = (app as { receiver?: unknown }).receiver;
|
||||||
|
const client =
|
||||||
|
receiver && typeof receiver === "object"
|
||||||
|
? (receiver as { client?: unknown }).client
|
||||||
|
: undefined;
|
||||||
|
if (!client || typeof client !== "object") {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
const on = (client as { on?: unknown }).on;
|
||||||
|
const off = (client as { off?: unknown }).off;
|
||||||
|
if (typeof on !== "function" || typeof off !== "function") {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
on: (event, listener) =>
|
||||||
|
(
|
||||||
|
on as (this: unknown, event: string, listener: (...args: unknown[]) => void) => unknown
|
||||||
|
).call(client, event, listener),
|
||||||
|
off: (event, listener) =>
|
||||||
|
(
|
||||||
|
off as (this: unknown, event: string, listener: (...args: unknown[]) => void) => unknown
|
||||||
|
).call(client, event, listener),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function waitForSlackSocketDisconnect(
|
||||||
|
app: unknown,
|
||||||
|
abortSignal?: AbortSignal,
|
||||||
|
): Promise<{
|
||||||
|
event: SlackSocketDisconnectEvent;
|
||||||
|
error?: unknown;
|
||||||
|
}> {
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
const emitter = getSocketEmitter(app);
|
||||||
|
if (!emitter) {
|
||||||
|
abortSignal?.addEventListener("abort", () => resolve({ event: "disconnect" }), {
|
||||||
|
once: true,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const disconnectListener = () => resolveOnce({ event: "disconnect" });
|
||||||
|
const startFailListener = () => resolveOnce({ event: "unable_to_socket_mode_start" });
|
||||||
|
const errorListener = (error: unknown) => resolveOnce({ event: "error", error });
|
||||||
|
const abortListener = () => resolveOnce({ event: "disconnect" });
|
||||||
|
|
||||||
|
const cleanup = () => {
|
||||||
|
emitter.off("disconnected", disconnectListener);
|
||||||
|
emitter.off("unable_to_socket_mode_start", startFailListener);
|
||||||
|
emitter.off("error", errorListener);
|
||||||
|
abortSignal?.removeEventListener("abort", abortListener);
|
||||||
|
};
|
||||||
|
|
||||||
|
const resolveOnce = (value: { event: SlackSocketDisconnectEvent; error?: unknown }) => {
|
||||||
|
cleanup();
|
||||||
|
resolve(value);
|
||||||
|
};
|
||||||
|
|
||||||
|
emitter.on("disconnected", disconnectListener);
|
||||||
|
emitter.on("unable_to_socket_mode_start", startFailListener);
|
||||||
|
emitter.on("error", errorListener);
|
||||||
|
abortSignal?.addEventListener("abort", abortListener, { once: true });
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function formatUnknownError(error: unknown): string {
|
||||||
|
if (error instanceof Error) {
|
||||||
|
return error.message;
|
||||||
|
}
|
||||||
|
if (typeof error === "string") {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return JSON.stringify(error);
|
||||||
|
} catch {
|
||||||
|
return "unknown error";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function parseApiAppIdFromAppToken(raw?: string) {
|
function parseApiAppIdFromAppToken(raw?: string) {
|
||||||
const token = raw?.trim();
|
const token = raw?.trim();
|
||||||
@@ -362,19 +457,74 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
if (slackMode === "socket") {
|
if (slackMode === "socket") {
|
||||||
await app.start();
|
let reconnectAttempts = 0;
|
||||||
runtime.log?.("slack socket mode connected");
|
while (!opts.abortSignal?.aborted) {
|
||||||
|
try {
|
||||||
|
await app.start();
|
||||||
|
reconnectAttempts = 0;
|
||||||
|
runtime.log?.("slack socket mode connected");
|
||||||
|
} catch (err) {
|
||||||
|
reconnectAttempts += 1;
|
||||||
|
if (
|
||||||
|
SLACK_SOCKET_RECONNECT_POLICY.maxAttempts > 0 &&
|
||||||
|
reconnectAttempts >= SLACK_SOCKET_RECONNECT_POLICY.maxAttempts
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
const delayMs = computeBackoff(SLACK_SOCKET_RECONNECT_POLICY, reconnectAttempts);
|
||||||
|
runtime.error?.(
|
||||||
|
`slack socket mode failed to start. retry ${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts || "∞"} in ${Math.round(delayMs / 1000)}s (${formatUnknownError(err)})`,
|
||||||
|
);
|
||||||
|
try {
|
||||||
|
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||||
|
} catch {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (opts.abortSignal?.aborted) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
const disconnect = await waitForSlackSocketDisconnect(app, opts.abortSignal);
|
||||||
|
if (opts.abortSignal?.aborted) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
reconnectAttempts += 1;
|
||||||
|
if (
|
||||||
|
SLACK_SOCKET_RECONNECT_POLICY.maxAttempts > 0 &&
|
||||||
|
reconnectAttempts >= SLACK_SOCKET_RECONNECT_POLICY.maxAttempts
|
||||||
|
) {
|
||||||
|
throw new Error(
|
||||||
|
`Slack socket mode reconnect max attempts reached (${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts}) after ${disconnect.event}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const delayMs = computeBackoff(SLACK_SOCKET_RECONNECT_POLICY, reconnectAttempts);
|
||||||
|
runtime.error?.(
|
||||||
|
`slack socket disconnected (${disconnect.event}). retry ${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts || "∞"} in ${Math.round(delayMs / 1000)}s${
|
||||||
|
disconnect.error ? ` (${formatUnknownError(disconnect.error)})` : ""
|
||||||
|
}`,
|
||||||
|
);
|
||||||
|
await app.stop().catch(() => undefined);
|
||||||
|
try {
|
||||||
|
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||||
|
} catch {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
runtime.log?.(`slack http mode listening at ${slackWebhookPath}`);
|
runtime.log?.(`slack http mode listening at ${slackWebhookPath}`);
|
||||||
|
if (!opts.abortSignal?.aborted) {
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
opts.abortSignal?.addEventListener("abort", () => resolve(), {
|
||||||
|
once: true,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (opts.abortSignal?.aborted) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
await new Promise<void>((resolve) => {
|
|
||||||
opts.abortSignal?.addEventListener("abort", () => resolve(), {
|
|
||||||
once: true,
|
|
||||||
});
|
|
||||||
});
|
|
||||||
} finally {
|
} finally {
|
||||||
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
|
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
|
||||||
unregisterHttpHandler?.();
|
unregisterHttpHandler?.();
|
||||||
@@ -385,4 +535,6 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
|||||||
export const __testing = {
|
export const __testing = {
|
||||||
resolveSlackRuntimeGroupPolicy: resolveOpenProviderRuntimeGroupPolicy,
|
resolveSlackRuntimeGroupPolicy: resolveOpenProviderRuntimeGroupPolicy,
|
||||||
resolveDefaultGroupPolicy,
|
resolveDefaultGroupPolicy,
|
||||||
|
getSocketEmitter,
|
||||||
|
waitForSlackSocketDisconnect,
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user