Runtime: stabilize tool/run state transitions under compaction and backpressure

Synthesize runtime state transition fixes for compaction tool-use integrity and long-running handler backpressure.

Sources: #33630, #33583

Co-authored-by: Kevin Shenghui <shenghuikevin@gmail.com>
Co-authored-by: Theo Tarr <theodore@tarr.com>
This commit is contained in:
Tak Hoffman
2026-03-03 21:25:32 -06:00
committed by GitHub
parent 575bd77196
commit 9889c6da53
15 changed files with 1090 additions and 21 deletions

View File

@@ -58,6 +58,8 @@ function sleep(ms: number): Promise<void> {
});
}
const DISCORD_TYPING_MAX_DURATION_MS = 20 * 60_000;
export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) {
const {
cfg,
@@ -430,6 +432,8 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
error: err,
});
},
// Long tool-heavy runs are expected on Discord; keep heartbeats alive.
maxDurationMs: DISCORD_TYPING_MAX_DURATION_MS,
});
// --- Discord draft stream (edit-based preview streaming) ---

View File

@@ -0,0 +1,411 @@
import { describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/types.js";
import { createNoopThreadBindingManager } from "./thread-bindings.js";
const preflightDiscordMessageMock = vi.hoisted(() => vi.fn());
const processDiscordMessageMock = vi.hoisted(() => vi.fn());
vi.mock("./message-handler.preflight.js", () => ({
preflightDiscordMessage: preflightDiscordMessageMock,
}));
vi.mock("./message-handler.process.js", () => ({
processDiscordMessage: processDiscordMessageMock,
}));
const { createDiscordMessageHandler } = await import("./message-handler.js");
function createDeferred<T = void>() {
let resolve: (value: T | PromiseLike<T>) => void = () => {};
const promise = new Promise<T>((innerResolve) => {
resolve = innerResolve;
});
return { promise, resolve };
}
function createHandlerParams(overrides?: {
setStatus?: (patch: Record<string, unknown>) => void;
abortSignal?: AbortSignal;
}) {
const cfg: OpenClawConfig = {
channels: {
discord: {
enabled: true,
token: "test-token",
groupPolicy: "allowlist",
},
},
messages: {
inbound: {
debounceMs: 0,
},
},
};
return {
cfg,
discordConfig: cfg.channels?.discord,
accountId: "default",
token: "test-token",
runtime: {
log: vi.fn(),
error: vi.fn(),
exit: (code: number): never => {
throw new Error(`exit ${code}`);
},
},
botUserId: "bot-123",
guildHistories: new Map(),
historyLimit: 0,
mediaMaxBytes: 10_000,
textLimit: 2_000,
replyToMode: "off" as const,
dmEnabled: true,
groupDmEnabled: false,
threadBindings: createNoopThreadBindingManager("default"),
setStatus: overrides?.setStatus,
abortSignal: overrides?.abortSignal,
};
}
function createMessageData(messageId: string, channelId = "ch-1") {
return {
channel_id: channelId,
author: { id: "user-1" },
message: {
id: messageId,
author: { id: "user-1", bot: false },
content: "hello",
channel_id: channelId,
attachments: [{ id: `att-${messageId}` }],
},
};
}
function createPreflightContext(channelId = "ch-1") {
return {
route: {
sessionKey: `agent:main:discord:channel:${channelId}`,
},
baseSessionKey: `agent:main:discord:channel:${channelId}`,
messageChannelId: channelId,
};
}
describe("createDiscordMessageHandler queue behavior", () => {
it("resets busy counters when the handler is created", () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const setStatus = vi.fn();
createDiscordMessageHandler(createHandlerParams({ setStatus }));
expect(setStatus).toHaveBeenCalledWith(
expect.objectContaining({
activeRuns: 0,
busy: false,
}),
);
});
it("returns immediately and tracks busy status while queued runs execute", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const firstRun = createDeferred();
const secondRun = createDeferred();
processDiscordMessageMock
.mockImplementationOnce(async () => {
await firstRun.promise;
})
.mockImplementationOnce(async () => {
await secondRun.promise;
});
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const setStatus = vi.fn();
const handler = createDiscordMessageHandler(createHandlerParams({ setStatus }));
await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
expect(setStatus).toHaveBeenCalledWith(
expect.objectContaining({
activeRuns: 1,
busy: true,
}),
);
await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(2);
});
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
firstRun.resolve();
await firstRun.promise;
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
});
secondRun.resolve();
await secondRun.promise;
await vi.waitFor(() => {
expect(setStatus).toHaveBeenLastCalledWith(
expect.objectContaining({
activeRuns: 0,
busy: false,
}),
);
});
});
it("refreshes run activity while active runs are in progress", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const runInFlight = createDeferred();
processDiscordMessageMock.mockImplementation(async () => {
await runInFlight.promise;
});
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
let heartbeatTick: () => void = () => {};
let capturedHeartbeat = false;
const setIntervalSpy = vi
.spyOn(globalThis, "setInterval")
.mockImplementation((callback: TimerHandler) => {
if (typeof callback === "function") {
heartbeatTick = () => {
callback();
};
capturedHeartbeat = true;
}
return 1 as unknown as ReturnType<typeof setInterval>;
});
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval");
try {
const setStatus = vi.fn();
const handler = createDiscordMessageHandler(createHandlerParams({ setStatus }));
await expect(
handler(createMessageData("m-1") as never, {} as never),
).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
expect(capturedHeartbeat).toBe(true);
const busyCallsBefore = setStatus.mock.calls.filter(
([patch]) => (patch as { busy?: boolean }).busy === true,
).length;
heartbeatTick();
const busyCallsAfter = setStatus.mock.calls.filter(
([patch]) => (patch as { busy?: boolean }).busy === true,
).length;
expect(busyCallsAfter).toBeGreaterThan(busyCallsBefore);
runInFlight.resolve();
await runInFlight.promise;
await vi.waitFor(() => {
expect(clearIntervalSpy).toHaveBeenCalled();
});
} finally {
setIntervalSpy.mockRestore();
clearIntervalSpy.mockRestore();
}
});
it("stops status publishing after lifecycle abort", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const runInFlight = createDeferred();
processDiscordMessageMock.mockImplementation(async () => {
await runInFlight.promise;
});
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const setStatus = vi.fn();
const abortController = new AbortController();
const handler = createDiscordMessageHandler(
createHandlerParams({ setStatus, abortSignal: abortController.signal }),
);
await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
const callsBeforeAbort = setStatus.mock.calls.length;
abortController.abort();
runInFlight.resolve();
await runInFlight.promise;
await Promise.resolve();
expect(setStatus.mock.calls.length).toBe(callsBeforeAbort);
});
it("stops status publishing after handler deactivation", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const runInFlight = createDeferred();
processDiscordMessageMock.mockImplementation(async () => {
await runInFlight.promise;
});
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const setStatus = vi.fn();
const handler = createDiscordMessageHandler(createHandlerParams({ setStatus }));
await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
const callsBeforeDeactivate = setStatus.mock.calls.length;
handler.deactivate();
runInFlight.resolve();
await runInFlight.promise;
await Promise.resolve();
expect(setStatus.mock.calls.length).toBe(callsBeforeDeactivate);
});
it("skips queued runs that have not started yet after deactivation", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const firstRun = createDeferred();
processDiscordMessageMock
.mockImplementationOnce(async () => {
await firstRun.promise;
})
.mockImplementationOnce(async () => undefined);
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const handler = createDiscordMessageHandler(createHandlerParams());
await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined();
handler.deactivate();
firstRun.resolve();
await firstRun.promise;
await Promise.resolve();
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
it("preserves non-debounced message ordering by awaiting debouncer enqueue", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const firstPreflight = createDeferred();
const processedMessageIds: string[] = [];
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string; message?: { id?: string } } }) => {
const messageId = params.data.message?.id ?? "unknown";
if (messageId === "m-1") {
await firstPreflight.promise;
}
return {
...createPreflightContext(params.data.channel_id),
messageId,
};
},
);
processDiscordMessageMock.mockImplementation(async (ctx: { messageId?: string }) => {
processedMessageIds.push(ctx.messageId ?? "unknown");
});
const handler = createDiscordMessageHandler(createHandlerParams());
const sequentialDispatch = (async () => {
await handler(createMessageData("m-1") as never, {} as never);
await handler(createMessageData("m-2") as never, {} as never);
})();
await vi.waitFor(() => {
expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1);
});
await Promise.resolve();
expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1);
firstPreflight.resolve();
await sequentialDispatch;
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
});
expect(processedMessageIds).toEqual(["m-1", "m-2"]);
});
it("recovers queue progress after a run failure without leaving busy state stuck", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const firstRun = createDeferred();
processDiscordMessageMock
.mockImplementationOnce(async () => {
await firstRun.promise;
throw new Error("simulated run failure");
})
.mockImplementationOnce(async () => undefined);
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const setStatus = vi.fn();
const handler = createDiscordMessageHandler(createHandlerParams({ setStatus }));
await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined();
await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined();
firstRun.resolve();
await firstRun.promise.catch(() => undefined);
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
});
await vi.waitFor(() => {
expect(setStatus).toHaveBeenCalledWith(
expect.objectContaining({ activeRuns: 0, busy: false }),
);
});
});
});

View File

@@ -3,26 +3,51 @@ import {
createChannelInboundDebouncer,
shouldDebounceTextInbound,
} from "../../channels/inbound-debounce-policy.js";
import { createRunStateMachine } from "../../channels/run-state-machine.js";
import { resolveOpenProviderRuntimeGroupPolicy } from "../../config/runtime-group-policy.js";
import { danger } from "../../globals.js";
import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js";
import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js";
import { preflightDiscordMessage } from "./message-handler.preflight.js";
import type { DiscordMessagePreflightParams } from "./message-handler.preflight.types.js";
import type {
DiscordMessagePreflightContext,
DiscordMessagePreflightParams,
} from "./message-handler.preflight.types.js";
import { processDiscordMessage } from "./message-handler.process.js";
import {
hasDiscordMessageStickers,
resolveDiscordMessageChannelId,
resolveDiscordMessageText,
} from "./message-utils.js";
import type { DiscordMonitorStatusSink } from "./status.js";
type DiscordMessageHandlerParams = Omit<
DiscordMessagePreflightParams,
"ackReactionScope" | "groupPolicy" | "data" | "client"
>;
> & {
setStatus?: DiscordMonitorStatusSink;
abortSignal?: AbortSignal;
};
export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & {
deactivate: () => void;
};
function resolveDiscordRunQueueKey(ctx: DiscordMessagePreflightContext): string {
const sessionKey = ctx.route.sessionKey?.trim();
if (sessionKey) {
return sessionKey;
}
const baseSessionKey = ctx.baseSessionKey?.trim();
if (baseSessionKey) {
return baseSessionKey;
}
return ctx.messageChannelId;
}
export function createDiscordMessageHandler(
params: DiscordMessageHandlerParams,
): DiscordMessageHandler {
): DiscordMessageHandlerWithLifecycle {
const { groupPolicy } = resolveOpenProviderRuntimeGroupPolicy({
providerConfigPresent: params.cfg.channels?.discord !== undefined,
groupPolicy: params.discordConfig?.groupPolicy,
@@ -32,6 +57,34 @@ export function createDiscordMessageHandler(
params.discordConfig?.ackReactionScope ??
params.cfg.messages?.ackReactionScope ??
"group-mentions";
const runQueue = new KeyedAsyncQueue();
const runState = createRunStateMachine({
setStatus: params.setStatus,
abortSignal: params.abortSignal,
});
const enqueueDiscordRun = (ctx: DiscordMessagePreflightContext) => {
const queueKey = resolveDiscordRunQueueKey(ctx);
void runQueue
.enqueue(queueKey, async () => {
if (!runState.isActive()) {
return;
}
runState.onRunStart();
try {
if (!runState.isActive()) {
return;
}
await processDiscordMessage(ctx);
} finally {
runState.onRunEnd();
}
})
.catch((err) => {
params.runtime.error?.(danger(`discord process failed: ${String(err)}`));
});
};
const { debouncer } = createChannelInboundDebouncer<{
data: DiscordMessageEvent;
client: Client;
@@ -84,9 +137,7 @@ export function createDiscordMessageHandler(
if (!ctx) {
return;
}
void processDiscordMessage(ctx).catch((err) => {
params.runtime.error?.(danger(`discord process failed: ${String(err)}`));
});
enqueueDiscordRun(ctx);
return;
}
const combinedBaseText = entries
@@ -130,30 +181,32 @@ export function createDiscordMessageHandler(
ctxBatch.MessageSidLast = ids[ids.length - 1];
}
}
void processDiscordMessage(ctx).catch((err) => {
params.runtime.error?.(danger(`discord process failed: ${String(err)}`));
});
enqueueDiscordRun(ctx);
},
onError: (err) => {
params.runtime.error?.(danger(`discord debounce flush failed: ${String(err)}`));
},
});
return async (data, client) => {
try {
// Filter bot-own messages before they enter the debounce queue.
// The same check exists in preflightDiscordMessage(), but by that point
// the message has already consumed debounce capacity and blocked
// legitimate user messages. On active servers this causes cumulative
// slowdown (see #15874).
const msgAuthorId = data.message?.author?.id ?? data.author?.id;
if (params.botUserId && msgAuthorId === params.botUserId) {
return;
}
const handler: DiscordMessageHandlerWithLifecycle = async (data, client) => {
// Filter bot-own messages before they enter the debounce queue.
// The same check exists in preflightDiscordMessage(), but by that point
// the message has already consumed debounce capacity and blocked
// legitimate user messages. On active servers this causes cumulative
// slowdown (see #15874).
const msgAuthorId = data.message?.author?.id ?? data.author?.id;
if (params.botUserId && msgAuthorId === params.botUserId) {
return;
}
try {
await debouncer.enqueue({ data, client });
} catch (err) {
params.runtime.error?.(danger(`handler failed: ${String(err)}`));
}
};
handler.deactivate = runState.deactivate;
return handler;
}

View File

@@ -395,6 +395,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
}
let lifecycleStarted = false;
let releaseEarlyGatewayErrorGuard = () => {};
let deactivateMessageHandler: (() => void) | undefined;
let autoPresenceController: ReturnType<typeof createDiscordAutoPresenceController> | null = null;
try {
const commands: BaseCommand[] = commandSpecs.map((spec) =>
@@ -596,6 +597,8 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
accountId: account.accountId,
token,
runtime,
setStatus: opts.setStatus,
abortSignal: opts.abortSignal,
botUserId,
guildHistories,
historyLimit,
@@ -610,6 +613,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
threadBindings,
discordRestFetch,
});
deactivateMessageHandler = messageHandler.deactivate;
const trackInboundEvent = opts.setStatus
? () => {
const at = Date.now();
@@ -679,6 +683,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
releaseEarlyGatewayErrorGuard,
});
} finally {
deactivateMessageHandler?.();
autoPresenceController?.stop();
opts.setStatus?.({ connected: false });
releaseEarlyGatewayErrorGuard();

View File

@@ -13,6 +13,9 @@ export type DiscordMonitorStatusPatch = {
| null;
lastInboundAt?: number | null;
lastError?: string | null;
busy?: boolean;
activeRuns?: number;
lastRunActivityAt?: number | null;
};
export type DiscordMonitorStatusSink = (patch: DiscordMonitorStatusPatch) => void;