mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 14:11:35 +00:00
refactor: consolidate typing lifecycle and queue policy
This commit is contained in:
@@ -51,6 +51,7 @@ import {
|
||||
readSessionMessages,
|
||||
} from "./post-compaction-audit.js";
|
||||
import { readPostCompactionContext } from "./post-compaction-context.js";
|
||||
import { resolveActiveRunQueueAction } from "./queue-policy.js";
|
||||
import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js";
|
||||
import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js";
|
||||
import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js";
|
||||
@@ -235,12 +236,19 @@ export async function runReplyAgent(params: {
|
||||
}
|
||||
}
|
||||
|
||||
if (isHeartbeat && isActive) {
|
||||
const activeRunQueueAction = resolveActiveRunQueueAction({
|
||||
isActive,
|
||||
isHeartbeat,
|
||||
shouldFollowup,
|
||||
queueMode: resolvedQueue.mode,
|
||||
});
|
||||
|
||||
if (activeRunQueueAction === "drop") {
|
||||
typing.cleanup();
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) {
|
||||
if (activeRunQueueAction === "enqueue-followup") {
|
||||
enqueueFollowupRun(queueKey, followupRun, resolvedQueue);
|
||||
await touchActiveSessionEntry();
|
||||
typing.cleanup();
|
||||
|
||||
48
src/auto-reply/reply/queue-policy.test.ts
Normal file
48
src/auto-reply/reply/queue-policy.test.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { resolveActiveRunQueueAction } from "./queue-policy.js";
|
||||
|
||||
describe("resolveActiveRunQueueAction", () => {
|
||||
it("runs immediately when there is no active run", () => {
|
||||
expect(
|
||||
resolveActiveRunQueueAction({
|
||||
isActive: false,
|
||||
isHeartbeat: false,
|
||||
shouldFollowup: true,
|
||||
queueMode: "collect",
|
||||
}),
|
||||
).toBe("run-now");
|
||||
});
|
||||
|
||||
it("drops heartbeat runs while another run is active", () => {
|
||||
expect(
|
||||
resolveActiveRunQueueAction({
|
||||
isActive: true,
|
||||
isHeartbeat: true,
|
||||
shouldFollowup: true,
|
||||
queueMode: "collect",
|
||||
}),
|
||||
).toBe("drop");
|
||||
});
|
||||
|
||||
it("enqueues followups for non-heartbeat active runs", () => {
|
||||
expect(
|
||||
resolveActiveRunQueueAction({
|
||||
isActive: true,
|
||||
isHeartbeat: false,
|
||||
shouldFollowup: true,
|
||||
queueMode: "collect",
|
||||
}),
|
||||
).toBe("enqueue-followup");
|
||||
});
|
||||
|
||||
it("enqueues steer mode runs while active", () => {
|
||||
expect(
|
||||
resolveActiveRunQueueAction({
|
||||
isActive: true,
|
||||
isHeartbeat: false,
|
||||
shouldFollowup: false,
|
||||
queueMode: "steer",
|
||||
}),
|
||||
).toBe("enqueue-followup");
|
||||
});
|
||||
});
|
||||
21
src/auto-reply/reply/queue-policy.ts
Normal file
21
src/auto-reply/reply/queue-policy.ts
Normal file
@@ -0,0 +1,21 @@
|
||||
import type { QueueSettings } from "./queue.js";
|
||||
|
||||
export type ActiveRunQueueAction = "run-now" | "enqueue-followup" | "drop";
|
||||
|
||||
export function resolveActiveRunQueueAction(params: {
|
||||
isActive: boolean;
|
||||
isHeartbeat: boolean;
|
||||
shouldFollowup: boolean;
|
||||
queueMode: QueueSettings["mode"];
|
||||
}): ActiveRunQueueAction {
|
||||
if (!params.isActive) {
|
||||
return "run-now";
|
||||
}
|
||||
if (params.isHeartbeat) {
|
||||
return "drop";
|
||||
}
|
||||
if (params.shouldFollowup || params.queueMode === "steer") {
|
||||
return "enqueue-followup";
|
||||
}
|
||||
return "run-now";
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
import type { TypingCallbacks } from "../../channels/typing.js";
|
||||
import type { HumanDelayConfig } from "../../config/types.js";
|
||||
import { sleep } from "../../utils.js";
|
||||
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||
@@ -57,6 +58,7 @@ export type ReplyDispatcherOptions = {
|
||||
};
|
||||
|
||||
export type ReplyDispatcherWithTypingOptions = Omit<ReplyDispatcherOptions, "onIdle"> & {
|
||||
typingCallbacks?: TypingCallbacks;
|
||||
onReplyStart?: () => Promise<void> | void;
|
||||
onIdle?: () => void;
|
||||
/** Called when the typing controller is cleaned up (e.g., on NO_REPLY). */
|
||||
@@ -209,28 +211,31 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis
|
||||
export function createReplyDispatcherWithTyping(
|
||||
options: ReplyDispatcherWithTypingOptions,
|
||||
): ReplyDispatcherWithTypingResult {
|
||||
const { onReplyStart, onIdle, onCleanup, ...dispatcherOptions } = options;
|
||||
const { typingCallbacks, onReplyStart, onIdle, onCleanup, ...dispatcherOptions } = options;
|
||||
const resolvedOnReplyStart = onReplyStart ?? typingCallbacks?.onReplyStart;
|
||||
const resolvedOnIdle = onIdle ?? typingCallbacks?.onIdle;
|
||||
const resolvedOnCleanup = onCleanup ?? typingCallbacks?.onCleanup;
|
||||
let typingController: TypingController | undefined;
|
||||
const dispatcher = createReplyDispatcher({
|
||||
...dispatcherOptions,
|
||||
onIdle: () => {
|
||||
typingController?.markDispatchIdle();
|
||||
onIdle?.();
|
||||
resolvedOnIdle?.();
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
dispatcher,
|
||||
replyOptions: {
|
||||
onReplyStart,
|
||||
onTypingCleanup: onCleanup,
|
||||
onReplyStart: resolvedOnReplyStart,
|
||||
onTypingCleanup: resolvedOnCleanup,
|
||||
onTypingController: (typing) => {
|
||||
typingController = typing;
|
||||
},
|
||||
},
|
||||
markDispatchIdle: () => {
|
||||
typingController?.markDispatchIdle();
|
||||
onIdle?.();
|
||||
resolvedOnIdle?.();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { createTypingKeepaliveLoop } from "../../channels/typing-lifecycle.js";
|
||||
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
|
||||
|
||||
export type TypingController = {
|
||||
@@ -35,7 +36,6 @@ export function createTypingController(params: {
|
||||
// especially when upstream event emitters don't await async listeners.
|
||||
// Once we stop typing, we "seal" the controller so late events can't restart typing forever.
|
||||
let sealed = false;
|
||||
let typingTimer: NodeJS.Timeout | undefined;
|
||||
let typingTtlTimer: NodeJS.Timeout | undefined;
|
||||
const typingIntervalMs = typingIntervalSeconds * 1000;
|
||||
|
||||
@@ -61,10 +61,7 @@ export function createTypingController(params: {
|
||||
clearTimeout(typingTtlTimer);
|
||||
typingTtlTimer = undefined;
|
||||
}
|
||||
if (typingTimer) {
|
||||
clearInterval(typingTimer);
|
||||
typingTimer = undefined;
|
||||
}
|
||||
typingLoop.stop();
|
||||
// Notify the channel to stop its typing indicator (e.g., on NO_REPLY).
|
||||
// This fires only once (sealed prevents re-entry).
|
||||
if (active) {
|
||||
@@ -88,7 +85,7 @@ export function createTypingController(params: {
|
||||
clearTimeout(typingTtlTimer);
|
||||
}
|
||||
typingTtlTimer = setTimeout(() => {
|
||||
if (!typingTimer) {
|
||||
if (!typingLoop.isRunning()) {
|
||||
return;
|
||||
}
|
||||
log?.(`typing TTL reached (${formatTypingTtl(typingTtlMs)}); stopping typing indicator`);
|
||||
@@ -105,6 +102,11 @@ export function createTypingController(params: {
|
||||
await onReplyStart?.();
|
||||
};
|
||||
|
||||
const typingLoop = createTypingKeepaliveLoop({
|
||||
intervalMs: typingIntervalMs,
|
||||
onTick: triggerTyping,
|
||||
});
|
||||
|
||||
const ensureStart = async () => {
|
||||
if (sealed) {
|
||||
return;
|
||||
@@ -146,16 +148,11 @@ export function createTypingController(params: {
|
||||
if (!onReplyStart) {
|
||||
return;
|
||||
}
|
||||
if (typingIntervalMs <= 0) {
|
||||
return;
|
||||
}
|
||||
if (typingTimer) {
|
||||
if (typingLoop.isRunning()) {
|
||||
return;
|
||||
}
|
||||
await ensureStart();
|
||||
typingTimer = setInterval(() => {
|
||||
void triggerTyping();
|
||||
}, typingIntervalMs);
|
||||
typingLoop.start();
|
||||
};
|
||||
|
||||
const startTypingOnText = async (text?: string) => {
|
||||
|
||||
Reference in New Issue
Block a user