mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-19 05:57:28 +00:00
feat(agent): add /btw side questions
This commit is contained in:
428
src/agents/btw.test.ts
Normal file
428
src/agents/btw.test.ts
Normal file
@@ -0,0 +1,428 @@
|
|||||||
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import type { SessionEntry } from "../config/sessions.js";
|
||||||
|
|
||||||
|
const streamSimpleMock = vi.fn();
|
||||||
|
const appendCustomEntryMock = vi.fn();
|
||||||
|
const buildSessionContextMock = vi.fn();
|
||||||
|
const getLeafEntryMock = vi.fn();
|
||||||
|
const branchMock = vi.fn();
|
||||||
|
const resetLeafMock = vi.fn();
|
||||||
|
const ensureOpenClawModelsJsonMock = vi.fn();
|
||||||
|
const discoverAuthStorageMock = vi.fn();
|
||||||
|
const discoverModelsMock = vi.fn();
|
||||||
|
const resolveModelWithRegistryMock = vi.fn();
|
||||||
|
const getApiKeyForModelMock = vi.fn();
|
||||||
|
const requireApiKeyMock = vi.fn();
|
||||||
|
const acquireSessionWriteLockMock = vi.fn();
|
||||||
|
const resolveSessionAuthProfileOverrideMock = vi.fn();
|
||||||
|
const getActiveEmbeddedRunSnapshotMock = vi.fn();
|
||||||
|
const waitForEmbeddedPiRunEndMock = vi.fn();
|
||||||
|
|
||||||
|
vi.mock("@mariozechner/pi-ai", () => ({
|
||||||
|
streamSimple: (...args: unknown[]) => streamSimpleMock(...args),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@mariozechner/pi-coding-agent", () => ({
|
||||||
|
SessionManager: {
|
||||||
|
open: () => ({
|
||||||
|
getLeafEntry: getLeafEntryMock,
|
||||||
|
branch: branchMock,
|
||||||
|
resetLeaf: resetLeafMock,
|
||||||
|
buildSessionContext: buildSessionContextMock,
|
||||||
|
appendCustomEntry: appendCustomEntryMock,
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("./models-config.js", () => ({
|
||||||
|
ensureOpenClawModelsJson: (...args: unknown[]) => ensureOpenClawModelsJsonMock(...args),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("./pi-model-discovery.js", () => ({
|
||||||
|
discoverAuthStorage: (...args: unknown[]) => discoverAuthStorageMock(...args),
|
||||||
|
discoverModels: (...args: unknown[]) => discoverModelsMock(...args),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("./pi-embedded-runner/model.js", () => ({
|
||||||
|
resolveModelWithRegistry: (...args: unknown[]) => resolveModelWithRegistryMock(...args),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("./model-auth.js", () => ({
|
||||||
|
getApiKeyForModel: (...args: unknown[]) => getApiKeyForModelMock(...args),
|
||||||
|
requireApiKey: (...args: unknown[]) => requireApiKeyMock(...args),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("./session-write-lock.js", () => ({
|
||||||
|
acquireSessionWriteLock: (...args: unknown[]) => acquireSessionWriteLockMock(...args),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("./pi-embedded-runner/runs.js", () => ({
|
||||||
|
getActiveEmbeddedRunSnapshot: (...args: unknown[]) => getActiveEmbeddedRunSnapshotMock(...args),
|
||||||
|
waitForEmbeddedPiRunEnd: (...args: unknown[]) => waitForEmbeddedPiRunEndMock(...args),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("./auth-profiles/session-override.js", () => ({
|
||||||
|
resolveSessionAuthProfileOverride: (...args: unknown[]) =>
|
||||||
|
resolveSessionAuthProfileOverrideMock(...args),
|
||||||
|
}));
|
||||||
|
|
||||||
|
const { BTW_CUSTOM_TYPE, runBtwSideQuestion } = await import("./btw.js");
|
||||||
|
|
||||||
|
function makeAsyncEvents(events: unknown[]) {
|
||||||
|
return {
|
||||||
|
async *[Symbol.asyncIterator]() {
|
||||||
|
for (const event of events) {
|
||||||
|
yield event;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function createSessionEntry(overrides: Partial<SessionEntry> = {}): SessionEntry {
|
||||||
|
return {
|
||||||
|
sessionId: "session-1",
|
||||||
|
sessionFile: "session-1.jsonl",
|
||||||
|
updatedAt: Date.now(),
|
||||||
|
...overrides,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("runBtwSideQuestion", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
streamSimpleMock.mockReset();
|
||||||
|
appendCustomEntryMock.mockReset();
|
||||||
|
buildSessionContextMock.mockReset();
|
||||||
|
getLeafEntryMock.mockReset();
|
||||||
|
branchMock.mockReset();
|
||||||
|
resetLeafMock.mockReset();
|
||||||
|
ensureOpenClawModelsJsonMock.mockReset();
|
||||||
|
discoverAuthStorageMock.mockReset();
|
||||||
|
discoverModelsMock.mockReset();
|
||||||
|
resolveModelWithRegistryMock.mockReset();
|
||||||
|
getApiKeyForModelMock.mockReset();
|
||||||
|
requireApiKeyMock.mockReset();
|
||||||
|
acquireSessionWriteLockMock.mockReset();
|
||||||
|
resolveSessionAuthProfileOverrideMock.mockReset();
|
||||||
|
getActiveEmbeddedRunSnapshotMock.mockReset();
|
||||||
|
waitForEmbeddedPiRunEndMock.mockReset();
|
||||||
|
|
||||||
|
buildSessionContextMock.mockReturnValue({
|
||||||
|
messages: [{ role: "user", content: [{ type: "text", text: "hi" }], timestamp: 1 }],
|
||||||
|
});
|
||||||
|
getLeafEntryMock.mockReturnValue(null);
|
||||||
|
resolveModelWithRegistryMock.mockReturnValue({
|
||||||
|
provider: "anthropic",
|
||||||
|
id: "claude-sonnet-4-5",
|
||||||
|
api: "anthropic-messages",
|
||||||
|
});
|
||||||
|
getApiKeyForModelMock.mockResolvedValue({ apiKey: "secret", mode: "api-key", source: "test" });
|
||||||
|
requireApiKeyMock.mockReturnValue("secret");
|
||||||
|
acquireSessionWriteLockMock.mockResolvedValue({
|
||||||
|
release: vi.fn().mockResolvedValue(undefined),
|
||||||
|
});
|
||||||
|
resolveSessionAuthProfileOverrideMock.mockResolvedValue("profile-1");
|
||||||
|
getActiveEmbeddedRunSnapshotMock.mockReturnValue(undefined);
|
||||||
|
waitForEmbeddedPiRunEndMock.mockResolvedValue(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("streams blocks and persists a non-context custom entry", async () => {
|
||||||
|
const onBlockReply = vi.fn().mockResolvedValue(undefined);
|
||||||
|
streamSimpleMock.mockReturnValue(
|
||||||
|
makeAsyncEvents([
|
||||||
|
{
|
||||||
|
type: "text_delta",
|
||||||
|
delta: "Side answer.",
|
||||||
|
partial: {
|
||||||
|
role: "assistant",
|
||||||
|
content: [],
|
||||||
|
provider: "anthropic",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
type: "text_end",
|
||||||
|
content: "Side answer.",
|
||||||
|
contentIndex: 0,
|
||||||
|
partial: {
|
||||||
|
role: "assistant",
|
||||||
|
content: [],
|
||||||
|
provider: "anthropic",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
type: "done",
|
||||||
|
reason: "stop",
|
||||||
|
message: {
|
||||||
|
role: "assistant",
|
||||||
|
content: [{ type: "text", text: "Side answer." }],
|
||||||
|
provider: "anthropic",
|
||||||
|
api: "anthropic-messages",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
stopReason: "stop",
|
||||||
|
usage: {
|
||||||
|
input: 1,
|
||||||
|
output: 2,
|
||||||
|
cacheRead: 0,
|
||||||
|
cacheWrite: 0,
|
||||||
|
totalTokens: 3,
|
||||||
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||||
|
},
|
||||||
|
timestamp: Date.now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
);
|
||||||
|
|
||||||
|
const result = await runBtwSideQuestion({
|
||||||
|
cfg: {} as never,
|
||||||
|
agentDir: "/tmp/agent",
|
||||||
|
provider: "anthropic",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
question: "What changed?",
|
||||||
|
sessionEntry: createSessionEntry(),
|
||||||
|
sessionStore: {},
|
||||||
|
sessionKey: "agent:main:main",
|
||||||
|
storePath: "/tmp/sessions.json",
|
||||||
|
resolvedThinkLevel: "low",
|
||||||
|
resolvedReasoningLevel: "off",
|
||||||
|
blockReplyChunking: {
|
||||||
|
minChars: 1,
|
||||||
|
maxChars: 200,
|
||||||
|
breakPreference: "paragraph",
|
||||||
|
},
|
||||||
|
resolvedBlockStreamingBreak: "text_end",
|
||||||
|
opts: { onBlockReply },
|
||||||
|
isNewSession: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(result).toBeUndefined();
|
||||||
|
expect(onBlockReply).toHaveBeenCalledWith({ text: "Side answer." });
|
||||||
|
expect(appendCustomEntryMock).toHaveBeenCalledWith(
|
||||||
|
BTW_CUSTOM_TYPE,
|
||||||
|
expect.objectContaining({
|
||||||
|
question: "What changed?",
|
||||||
|
answer: "Side answer.",
|
||||||
|
provider: "anthropic",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns a final payload when block streaming is unavailable", async () => {
|
||||||
|
streamSimpleMock.mockReturnValue(
|
||||||
|
makeAsyncEvents([
|
||||||
|
{
|
||||||
|
type: "done",
|
||||||
|
reason: "stop",
|
||||||
|
message: {
|
||||||
|
role: "assistant",
|
||||||
|
content: [{ type: "text", text: "Final answer." }],
|
||||||
|
provider: "anthropic",
|
||||||
|
api: "anthropic-messages",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
stopReason: "stop",
|
||||||
|
usage: {
|
||||||
|
input: 1,
|
||||||
|
output: 2,
|
||||||
|
cacheRead: 0,
|
||||||
|
cacheWrite: 0,
|
||||||
|
totalTokens: 3,
|
||||||
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||||
|
},
|
||||||
|
timestamp: Date.now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
);
|
||||||
|
|
||||||
|
const result = await runBtwSideQuestion({
|
||||||
|
cfg: {} as never,
|
||||||
|
agentDir: "/tmp/agent",
|
||||||
|
provider: "anthropic",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
question: "What changed?",
|
||||||
|
sessionEntry: createSessionEntry(),
|
||||||
|
resolvedReasoningLevel: "off",
|
||||||
|
opts: {},
|
||||||
|
isNewSession: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(result).toEqual({ text: "Final answer." });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("fails when the current branch has no messages", async () => {
|
||||||
|
buildSessionContextMock.mockReturnValue({ messages: [] });
|
||||||
|
streamSimpleMock.mockReturnValue(makeAsyncEvents([]));
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
runBtwSideQuestion({
|
||||||
|
cfg: {} as never,
|
||||||
|
agentDir: "/tmp/agent",
|
||||||
|
provider: "anthropic",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
question: "What changed?",
|
||||||
|
sessionEntry: createSessionEntry(),
|
||||||
|
resolvedReasoningLevel: "off",
|
||||||
|
opts: {},
|
||||||
|
isNewSession: false,
|
||||||
|
}),
|
||||||
|
).rejects.toThrow("No active session context.");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("branches away from an unresolved trailing user turn before building BTW context", async () => {
|
||||||
|
getLeafEntryMock.mockReturnValue({
|
||||||
|
type: "message",
|
||||||
|
parentId: "assistant-1",
|
||||||
|
message: { role: "user" },
|
||||||
|
});
|
||||||
|
streamSimpleMock.mockReturnValue(
|
||||||
|
makeAsyncEvents([
|
||||||
|
{
|
||||||
|
type: "done",
|
||||||
|
reason: "stop",
|
||||||
|
message: {
|
||||||
|
role: "assistant",
|
||||||
|
content: [{ type: "text", text: "323" }],
|
||||||
|
provider: "anthropic",
|
||||||
|
api: "anthropic-messages",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
stopReason: "stop",
|
||||||
|
usage: {
|
||||||
|
input: 1,
|
||||||
|
output: 2,
|
||||||
|
cacheRead: 0,
|
||||||
|
cacheWrite: 0,
|
||||||
|
totalTokens: 3,
|
||||||
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||||
|
},
|
||||||
|
timestamp: Date.now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
);
|
||||||
|
|
||||||
|
const result = await runBtwSideQuestion({
|
||||||
|
cfg: {} as never,
|
||||||
|
agentDir: "/tmp/agent",
|
||||||
|
provider: "anthropic",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
question: "What is 17 * 19?",
|
||||||
|
sessionEntry: createSessionEntry(),
|
||||||
|
resolvedReasoningLevel: "off",
|
||||||
|
opts: {},
|
||||||
|
isNewSession: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(branchMock).toHaveBeenCalledWith("assistant-1");
|
||||||
|
expect(resetLeafMock).not.toHaveBeenCalled();
|
||||||
|
expect(buildSessionContextMock).toHaveBeenCalledTimes(1);
|
||||||
|
expect(result).toEqual({ text: "323" });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("branches to the active run snapshot leaf when the session is busy", async () => {
|
||||||
|
getActiveEmbeddedRunSnapshotMock.mockReturnValue({
|
||||||
|
transcriptLeafId: "assistant-seed",
|
||||||
|
});
|
||||||
|
streamSimpleMock.mockReturnValue(
|
||||||
|
makeAsyncEvents([
|
||||||
|
{
|
||||||
|
type: "done",
|
||||||
|
reason: "stop",
|
||||||
|
message: {
|
||||||
|
role: "assistant",
|
||||||
|
content: [{ type: "text", text: "323" }],
|
||||||
|
provider: "anthropic",
|
||||||
|
api: "anthropic-messages",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
stopReason: "stop",
|
||||||
|
usage: {
|
||||||
|
input: 1,
|
||||||
|
output: 2,
|
||||||
|
cacheRead: 0,
|
||||||
|
cacheWrite: 0,
|
||||||
|
totalTokens: 3,
|
||||||
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||||
|
},
|
||||||
|
timestamp: Date.now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
);
|
||||||
|
|
||||||
|
const result = await runBtwSideQuestion({
|
||||||
|
cfg: {} as never,
|
||||||
|
agentDir: "/tmp/agent",
|
||||||
|
provider: "anthropic",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
question: "What is 17 * 19?",
|
||||||
|
sessionEntry: createSessionEntry(),
|
||||||
|
resolvedReasoningLevel: "off",
|
||||||
|
opts: {},
|
||||||
|
isNewSession: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(branchMock).toHaveBeenCalledWith("assistant-seed");
|
||||||
|
expect(getLeafEntryMock).not.toHaveBeenCalled();
|
||||||
|
expect(result).toEqual({ text: "323" });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns the BTW answer and retries transcript persistence after a session lock", async () => {
|
||||||
|
acquireSessionWriteLockMock
|
||||||
|
.mockRejectedValueOnce(
|
||||||
|
new Error("session file locked (timeout 250ms): pid=123 /tmp/session.lock"),
|
||||||
|
)
|
||||||
|
.mockResolvedValueOnce({
|
||||||
|
release: vi.fn().mockResolvedValue(undefined),
|
||||||
|
});
|
||||||
|
streamSimpleMock.mockReturnValue(
|
||||||
|
makeAsyncEvents([
|
||||||
|
{
|
||||||
|
type: "done",
|
||||||
|
reason: "stop",
|
||||||
|
message: {
|
||||||
|
role: "assistant",
|
||||||
|
content: [{ type: "text", text: "323" }],
|
||||||
|
provider: "anthropic",
|
||||||
|
api: "anthropic-messages",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
stopReason: "stop",
|
||||||
|
usage: {
|
||||||
|
input: 1,
|
||||||
|
output: 2,
|
||||||
|
cacheRead: 0,
|
||||||
|
cacheWrite: 0,
|
||||||
|
totalTokens: 3,
|
||||||
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||||
|
},
|
||||||
|
timestamp: Date.now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
);
|
||||||
|
|
||||||
|
const result = await runBtwSideQuestion({
|
||||||
|
cfg: {} as never,
|
||||||
|
agentDir: "/tmp/agent",
|
||||||
|
provider: "anthropic",
|
||||||
|
model: "claude-sonnet-4-5",
|
||||||
|
question: "What is 17 * 19?",
|
||||||
|
sessionEntry: createSessionEntry(),
|
||||||
|
resolvedReasoningLevel: "off",
|
||||||
|
opts: {},
|
||||||
|
isNewSession: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(result).toEqual({ text: "323" });
|
||||||
|
expect(waitForEmbeddedPiRunEndMock).toHaveBeenCalledWith("session-1", 30000);
|
||||||
|
await vi.waitFor(() => {
|
||||||
|
expect(appendCustomEntryMock).toHaveBeenCalledWith(
|
||||||
|
BTW_CUSTOM_TYPE,
|
||||||
|
expect.objectContaining({
|
||||||
|
question: "What is 17 * 19?",
|
||||||
|
answer: "323",
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
426
src/agents/btw.ts
Normal file
426
src/agents/btw.ts
Normal file
@@ -0,0 +1,426 @@
|
|||||||
|
import {
|
||||||
|
streamSimple,
|
||||||
|
type Api,
|
||||||
|
type AssistantMessageEvent,
|
||||||
|
type ThinkingLevel as SimpleThinkingLevel,
|
||||||
|
type Message,
|
||||||
|
type Model,
|
||||||
|
} from "@mariozechner/pi-ai";
|
||||||
|
import { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||||
|
import type { ReasoningLevel, ThinkLevel } from "../auto-reply/thinking.js";
|
||||||
|
import type { GetReplyOptions, ReplyPayload } from "../auto-reply/types.js";
|
||||||
|
import type { OpenClawConfig } from "../config/config.js";
|
||||||
|
import {
|
||||||
|
resolveSessionFilePath,
|
||||||
|
resolveSessionFilePathOptions,
|
||||||
|
type SessionEntry,
|
||||||
|
} from "../config/sessions.js";
|
||||||
|
import { resolveSessionAuthProfileOverride } from "./auth-profiles/session-override.js";
|
||||||
|
import { getApiKeyForModel, requireApiKey } from "./model-auth.js";
|
||||||
|
import { ensureOpenClawModelsJson } from "./models-config.js";
|
||||||
|
import { EmbeddedBlockChunker, type BlockReplyChunking } from "./pi-embedded-block-chunker.js";
|
||||||
|
import { resolveModelWithRegistry } from "./pi-embedded-runner/model.js";
|
||||||
|
import {
|
||||||
|
getActiveEmbeddedRunSnapshot,
|
||||||
|
waitForEmbeddedPiRunEnd,
|
||||||
|
} from "./pi-embedded-runner/runs.js";
|
||||||
|
import { mapThinkingLevel } from "./pi-embedded-runner/utils.js";
|
||||||
|
import { discoverAuthStorage, discoverModels } from "./pi-model-discovery.js";
|
||||||
|
import { acquireSessionWriteLock } from "./session-write-lock.js";
|
||||||
|
|
||||||
|
const BTW_CUSTOM_TYPE = "openclaw:btw";
|
||||||
|
const BTW_PERSIST_TIMEOUT_MS = 250;
|
||||||
|
const BTW_PERSIST_RETRY_WAIT_MS = 30_000;
|
||||||
|
const BTW_PERSIST_RETRY_LOCK_MS = 10_000;
|
||||||
|
|
||||||
|
type SessionManagerLike = {
|
||||||
|
getLeafEntry?: () => {
|
||||||
|
id?: string;
|
||||||
|
type?: string;
|
||||||
|
parentId?: string | null;
|
||||||
|
message?: { role?: string };
|
||||||
|
} | null;
|
||||||
|
branch?: (parentId: string) => void;
|
||||||
|
resetLeaf?: () => void;
|
||||||
|
buildSessionContext: () => { messages?: unknown[] };
|
||||||
|
};
|
||||||
|
|
||||||
|
type BtwCustomEntryData = {
|
||||||
|
timestamp: number;
|
||||||
|
question: string;
|
||||||
|
answer: string;
|
||||||
|
provider: string;
|
||||||
|
model: string;
|
||||||
|
thinkingLevel: ThinkLevel | "off";
|
||||||
|
reasoningLevel: ReasoningLevel;
|
||||||
|
sessionKey?: string;
|
||||||
|
authProfileId?: string;
|
||||||
|
authProfileIdSource?: "auto" | "user";
|
||||||
|
usage?: unknown;
|
||||||
|
};
|
||||||
|
|
||||||
|
async function appendBtwCustomEntry(params: {
|
||||||
|
sessionFile: string;
|
||||||
|
timeoutMs: number;
|
||||||
|
entry: BtwCustomEntryData;
|
||||||
|
}) {
|
||||||
|
const lock = await acquireSessionWriteLock({
|
||||||
|
sessionFile: params.sessionFile,
|
||||||
|
timeoutMs: params.timeoutMs,
|
||||||
|
allowReentrant: false,
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
const persisted = SessionManager.open(params.sessionFile);
|
||||||
|
persisted.appendCustomEntry(BTW_CUSTOM_TYPE, params.entry);
|
||||||
|
} finally {
|
||||||
|
await lock.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function isSessionLockError(error: unknown): boolean {
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
return message.includes("session file locked");
|
||||||
|
}
|
||||||
|
|
||||||
|
function deferBtwCustomEntryPersist(params: {
|
||||||
|
sessionId: string;
|
||||||
|
sessionFile: string;
|
||||||
|
entry: BtwCustomEntryData;
|
||||||
|
}) {
|
||||||
|
void (async () => {
|
||||||
|
try {
|
||||||
|
await waitForEmbeddedPiRunEnd(params.sessionId, BTW_PERSIST_RETRY_WAIT_MS);
|
||||||
|
await appendBtwCustomEntry({
|
||||||
|
sessionFile: params.sessionFile,
|
||||||
|
timeoutMs: BTW_PERSIST_RETRY_LOCK_MS,
|
||||||
|
entry: params.entry,
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
console.warn(`[btw] skipped transcript persistence: ${message}`);
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
}
|
||||||
|
|
||||||
|
function collectTextContent(content: Array<{ type?: string; text?: string }>): string {
|
||||||
|
return content
|
||||||
|
.filter((part): part is { type: "text"; text: string } => part.type === "text")
|
||||||
|
.map((part) => part.text)
|
||||||
|
.join("");
|
||||||
|
}
|
||||||
|
|
||||||
|
function collectThinkingContent(content: Array<{ type?: string; thinking?: string }>): string {
|
||||||
|
return content
|
||||||
|
.filter((part): part is { type: "thinking"; thinking: string } => part.type === "thinking")
|
||||||
|
.map((part) => part.thinking)
|
||||||
|
.join("");
|
||||||
|
}
|
||||||
|
|
||||||
|
function toSimpleContextMessages(messages: unknown[]): Message[] {
|
||||||
|
return messages.filter((message): message is Message => {
|
||||||
|
if (!message || typeof message !== "object") {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const role = (message as { role?: unknown }).role;
|
||||||
|
return role === "user" || role === "assistant" || role === "toolResult";
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function resolveSimpleThinkingLevel(level?: ThinkLevel): SimpleThinkingLevel | undefined {
|
||||||
|
if (!level || level === "off") {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
return mapThinkingLevel(level) as SimpleThinkingLevel;
|
||||||
|
}
|
||||||
|
|
||||||
|
function resolveSessionTranscriptPath(params: {
|
||||||
|
sessionId: string;
|
||||||
|
sessionEntry?: SessionEntry;
|
||||||
|
sessionKey?: string;
|
||||||
|
storePath?: string;
|
||||||
|
}): string | undefined {
|
||||||
|
try {
|
||||||
|
const agentId = params.sessionKey?.split(":")[1];
|
||||||
|
const pathOpts = resolveSessionFilePathOptions({
|
||||||
|
agentId,
|
||||||
|
storePath: params.storePath,
|
||||||
|
});
|
||||||
|
return resolveSessionFilePath(params.sessionId, params.sessionEntry, pathOpts);
|
||||||
|
} catch {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function resolveRuntimeModel(params: {
|
||||||
|
cfg: OpenClawConfig;
|
||||||
|
provider: string;
|
||||||
|
model: string;
|
||||||
|
agentDir: string;
|
||||||
|
sessionEntry?: SessionEntry;
|
||||||
|
sessionStore?: Record<string, SessionEntry>;
|
||||||
|
sessionKey?: string;
|
||||||
|
storePath?: string;
|
||||||
|
isNewSession: boolean;
|
||||||
|
}): Promise<{
|
||||||
|
model: Model<Api>;
|
||||||
|
authProfileId?: string;
|
||||||
|
authProfileIdSource?: "auto" | "user";
|
||||||
|
}> {
|
||||||
|
await ensureOpenClawModelsJson(params.cfg, params.agentDir);
|
||||||
|
const authStorage = discoverAuthStorage(params.agentDir);
|
||||||
|
const modelRegistry = discoverModels(authStorage, params.agentDir);
|
||||||
|
const model = resolveModelWithRegistry({
|
||||||
|
provider: params.provider,
|
||||||
|
modelId: params.model,
|
||||||
|
modelRegistry,
|
||||||
|
cfg: params.cfg,
|
||||||
|
});
|
||||||
|
if (!model) {
|
||||||
|
throw new Error(`Unknown model: ${params.provider}/${params.model}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const authProfileId = await resolveSessionAuthProfileOverride({
|
||||||
|
cfg: params.cfg,
|
||||||
|
provider: params.provider,
|
||||||
|
agentDir: params.agentDir,
|
||||||
|
sessionEntry: params.sessionEntry,
|
||||||
|
sessionStore: params.sessionStore,
|
||||||
|
sessionKey: params.sessionKey,
|
||||||
|
storePath: params.storePath,
|
||||||
|
isNewSession: params.isNewSession,
|
||||||
|
});
|
||||||
|
return {
|
||||||
|
model,
|
||||||
|
authProfileId,
|
||||||
|
authProfileIdSource: params.sessionEntry?.authProfileOverrideSource,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
type RunBtwSideQuestionParams = {
|
||||||
|
cfg: OpenClawConfig;
|
||||||
|
agentDir: string;
|
||||||
|
provider: string;
|
||||||
|
model: string;
|
||||||
|
question: string;
|
||||||
|
sessionEntry: SessionEntry;
|
||||||
|
sessionStore?: Record<string, SessionEntry>;
|
||||||
|
sessionKey?: string;
|
||||||
|
storePath?: string;
|
||||||
|
resolvedThinkLevel?: ThinkLevel;
|
||||||
|
resolvedReasoningLevel: ReasoningLevel;
|
||||||
|
blockReplyChunking?: BlockReplyChunking;
|
||||||
|
resolvedBlockStreamingBreak?: "text_end" | "message_end";
|
||||||
|
opts?: GetReplyOptions;
|
||||||
|
isNewSession: boolean;
|
||||||
|
};
|
||||||
|
|
||||||
|
export async function runBtwSideQuestion(
|
||||||
|
params: RunBtwSideQuestionParams,
|
||||||
|
): Promise<ReplyPayload | undefined> {
|
||||||
|
const sessionId = params.sessionEntry.sessionId?.trim();
|
||||||
|
if (!sessionId) {
|
||||||
|
throw new Error("No active session context.");
|
||||||
|
}
|
||||||
|
|
||||||
|
const sessionFile = resolveSessionTranscriptPath({
|
||||||
|
sessionId,
|
||||||
|
sessionEntry: params.sessionEntry,
|
||||||
|
sessionKey: params.sessionKey,
|
||||||
|
storePath: params.storePath,
|
||||||
|
});
|
||||||
|
if (!sessionFile) {
|
||||||
|
throw new Error("No active session transcript.");
|
||||||
|
}
|
||||||
|
|
||||||
|
const sessionManager = SessionManager.open(sessionFile) as SessionManagerLike;
|
||||||
|
const activeRunSnapshot = getActiveEmbeddedRunSnapshot(sessionId);
|
||||||
|
if (activeRunSnapshot) {
|
||||||
|
if (activeRunSnapshot.transcriptLeafId && sessionManager.branch) {
|
||||||
|
sessionManager.branch(activeRunSnapshot.transcriptLeafId);
|
||||||
|
} else {
|
||||||
|
sessionManager.resetLeaf?.();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
const leafEntry = sessionManager.getLeafEntry?.();
|
||||||
|
if (leafEntry?.type === "message" && leafEntry.message?.role === "user") {
|
||||||
|
if (leafEntry.parentId && sessionManager.branch) {
|
||||||
|
sessionManager.branch(leafEntry.parentId);
|
||||||
|
} else {
|
||||||
|
sessionManager.resetLeaf?.();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const sessionContext = sessionManager.buildSessionContext();
|
||||||
|
const messages = toSimpleContextMessages(
|
||||||
|
Array.isArray(sessionContext.messages) ? sessionContext.messages : [],
|
||||||
|
);
|
||||||
|
if (messages.length === 0) {
|
||||||
|
throw new Error("No active session context.");
|
||||||
|
}
|
||||||
|
|
||||||
|
const { model, authProfileId, authProfileIdSource } = await resolveRuntimeModel({
|
||||||
|
cfg: params.cfg,
|
||||||
|
provider: params.provider,
|
||||||
|
model: params.model,
|
||||||
|
agentDir: params.agentDir,
|
||||||
|
sessionEntry: params.sessionEntry,
|
||||||
|
sessionStore: params.sessionStore,
|
||||||
|
sessionKey: params.sessionKey,
|
||||||
|
storePath: params.storePath,
|
||||||
|
isNewSession: params.isNewSession,
|
||||||
|
});
|
||||||
|
const apiKeyInfo = await getApiKeyForModel({
|
||||||
|
model,
|
||||||
|
cfg: params.cfg,
|
||||||
|
profileId: authProfileId,
|
||||||
|
agentDir: params.agentDir,
|
||||||
|
});
|
||||||
|
const apiKey = requireApiKey(apiKeyInfo, model.provider);
|
||||||
|
|
||||||
|
const chunker =
|
||||||
|
params.opts?.onBlockReply && params.blockReplyChunking
|
||||||
|
? new EmbeddedBlockChunker(params.blockReplyChunking)
|
||||||
|
: undefined;
|
||||||
|
let emittedBlocks = 0;
|
||||||
|
let blockEmitChain: Promise<void> = Promise.resolve();
|
||||||
|
let answerText = "";
|
||||||
|
let reasoningText = "";
|
||||||
|
let assistantStarted = false;
|
||||||
|
let sawTextEvent = false;
|
||||||
|
|
||||||
|
const emitBlockChunk = async (text: string) => {
|
||||||
|
const trimmed = text.trim();
|
||||||
|
if (!trimmed || !params.opts?.onBlockReply) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
emittedBlocks += 1;
|
||||||
|
blockEmitChain = blockEmitChain.then(async () => {
|
||||||
|
await params.opts?.onBlockReply?.({ text });
|
||||||
|
});
|
||||||
|
await blockEmitChain;
|
||||||
|
};
|
||||||
|
|
||||||
|
const stream = streamSimple(
|
||||||
|
model,
|
||||||
|
{
|
||||||
|
messages: [
|
||||||
|
...messages,
|
||||||
|
{
|
||||||
|
role: "user",
|
||||||
|
content: [{ type: "text", text: params.question }],
|
||||||
|
timestamp: Date.now(),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
apiKey,
|
||||||
|
reasoning: resolveSimpleThinkingLevel(params.resolvedThinkLevel),
|
||||||
|
signal: params.opts?.abortSignal,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let finalEvent:
|
||||||
|
| Extract<AssistantMessageEvent, { type: "done" }>
|
||||||
|
| Extract<AssistantMessageEvent, { type: "error" }>
|
||||||
|
| undefined;
|
||||||
|
|
||||||
|
for await (const event of stream) {
|
||||||
|
finalEvent = event.type === "done" || event.type === "error" ? event : finalEvent;
|
||||||
|
|
||||||
|
if (!assistantStarted && (event.type === "text_start" || event.type === "start")) {
|
||||||
|
assistantStarted = true;
|
||||||
|
await params.opts?.onAssistantMessageStart?.();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.type === "text_delta") {
|
||||||
|
sawTextEvent = true;
|
||||||
|
answerText += event.delta;
|
||||||
|
chunker?.append(event.delta);
|
||||||
|
if (chunker && params.resolvedBlockStreamingBreak === "text_end") {
|
||||||
|
chunker.drain({ force: false, emit: (chunk) => void emitBlockChunk(chunk) });
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.type === "text_end" && chunker && params.resolvedBlockStreamingBreak === "text_end") {
|
||||||
|
chunker.drain({ force: true, emit: (chunk) => void emitBlockChunk(chunk) });
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.type === "thinking_delta") {
|
||||||
|
reasoningText += event.delta;
|
||||||
|
if (params.resolvedReasoningLevel !== "off") {
|
||||||
|
await params.opts?.onReasoningStream?.({ text: reasoningText, isReasoning: true });
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.type === "thinking_end" && params.resolvedReasoningLevel !== "off") {
|
||||||
|
await params.opts?.onReasoningEnd?.();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (chunker && params.resolvedBlockStreamingBreak !== "text_end" && chunker.hasBuffered()) {
|
||||||
|
chunker.drain({ force: true, emit: (chunk) => void emitBlockChunk(chunk) });
|
||||||
|
}
|
||||||
|
await blockEmitChain;
|
||||||
|
|
||||||
|
if (finalEvent?.type === "error") {
|
||||||
|
const message = collectTextContent(finalEvent.error.content);
|
||||||
|
throw new Error(message || finalEvent.error.errorMessage || "BTW failed.");
|
||||||
|
}
|
||||||
|
|
||||||
|
const finalMessage = finalEvent?.type === "done" ? finalEvent.message : undefined;
|
||||||
|
if (finalMessage) {
|
||||||
|
if (!sawTextEvent) {
|
||||||
|
answerText = collectTextContent(finalMessage.content);
|
||||||
|
}
|
||||||
|
if (!reasoningText) {
|
||||||
|
reasoningText = collectThinkingContent(finalMessage.content);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const answer = answerText.trim();
|
||||||
|
if (!answer) {
|
||||||
|
throw new Error("No BTW response generated.");
|
||||||
|
}
|
||||||
|
|
||||||
|
const customEntry = {
|
||||||
|
timestamp: Date.now(),
|
||||||
|
question: params.question,
|
||||||
|
answer,
|
||||||
|
provider: model.provider,
|
||||||
|
model: model.id,
|
||||||
|
thinkingLevel: params.resolvedThinkLevel ?? "off",
|
||||||
|
reasoningLevel: params.resolvedReasoningLevel,
|
||||||
|
sessionKey: params.sessionKey,
|
||||||
|
authProfileId,
|
||||||
|
authProfileIdSource,
|
||||||
|
usage: finalMessage?.usage,
|
||||||
|
} satisfies BtwCustomEntryData;
|
||||||
|
|
||||||
|
try {
|
||||||
|
await appendBtwCustomEntry({
|
||||||
|
sessionFile,
|
||||||
|
timeoutMs: BTW_PERSIST_TIMEOUT_MS,
|
||||||
|
entry: customEntry,
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
if (!isSessionLockError(error)) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
deferBtwCustomEntryPersist({
|
||||||
|
sessionId,
|
||||||
|
sessionFile,
|
||||||
|
entry: customEntry,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (emittedBlocks > 0) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
return { text: answer };
|
||||||
|
}
|
||||||
|
|
||||||
|
export { BTW_CUSTOM_TYPE };
|
||||||
@@ -111,6 +111,7 @@ import {
|
|||||||
clearActiveEmbeddedRun,
|
clearActiveEmbeddedRun,
|
||||||
type EmbeddedPiQueueHandle,
|
type EmbeddedPiQueueHandle,
|
||||||
setActiveEmbeddedRun,
|
setActiveEmbeddedRun,
|
||||||
|
updateActiveEmbeddedRunSnapshot,
|
||||||
} from "../runs.js";
|
} from "../runs.js";
|
||||||
import { buildEmbeddedSandboxInfo } from "../sandbox-info.js";
|
import { buildEmbeddedSandboxInfo } from "../sandbox-info.js";
|
||||||
import { prewarmSessionFile, trackSessionManagerAccess } from "../session-manager-cache.js";
|
import { prewarmSessionFile, trackSessionManagerAccess } from "../session-manager-cache.js";
|
||||||
@@ -2376,6 +2377,10 @@ export async function runEmbeddedAttempt(
|
|||||||
`runId=${params.runId} sessionId=${params.sessionId}`,
|
`runId=${params.runId} sessionId=${params.sessionId}`,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
updateActiveEmbeddedRunSnapshot(params.sessionId, {
|
||||||
|
transcriptLeafId:
|
||||||
|
(sessionManager.getLeafEntry() as { id?: string } | null | undefined)?.id ?? null,
|
||||||
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Idempotent cleanup for legacy sessions with persisted image payloads.
|
// Idempotent cleanup for legacy sessions with persisted image payloads.
|
||||||
|
|||||||
@@ -4,7 +4,9 @@ import {
|
|||||||
__testing,
|
__testing,
|
||||||
abortEmbeddedPiRun,
|
abortEmbeddedPiRun,
|
||||||
clearActiveEmbeddedRun,
|
clearActiveEmbeddedRun,
|
||||||
|
getActiveEmbeddedRunSnapshot,
|
||||||
setActiveEmbeddedRun,
|
setActiveEmbeddedRun,
|
||||||
|
updateActiveEmbeddedRunSnapshot,
|
||||||
waitForActiveEmbeddedRuns,
|
waitForActiveEmbeddedRuns,
|
||||||
} from "./runs.js";
|
} from "./runs.js";
|
||||||
|
|
||||||
@@ -137,4 +139,24 @@ describe("pi-embedded runner run registry", () => {
|
|||||||
runsB.__testing.resetActiveEmbeddedRuns();
|
runsB.__testing.resetActiveEmbeddedRuns();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("tracks and clears per-session transcript snapshots for active runs", () => {
|
||||||
|
const handle = {
|
||||||
|
queueMessage: async () => {},
|
||||||
|
isStreaming: () => true,
|
||||||
|
isCompacting: () => false,
|
||||||
|
abort: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
setActiveEmbeddedRun("session-snapshot", handle);
|
||||||
|
updateActiveEmbeddedRunSnapshot("session-snapshot", {
|
||||||
|
transcriptLeafId: "assistant-1",
|
||||||
|
});
|
||||||
|
expect(getActiveEmbeddedRunSnapshot("session-snapshot")).toEqual({
|
||||||
|
transcriptLeafId: "assistant-1",
|
||||||
|
});
|
||||||
|
|
||||||
|
clearActiveEmbeddedRun("session-snapshot", handle);
|
||||||
|
expect(getActiveEmbeddedRunSnapshot("session-snapshot")).toBeUndefined();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -12,6 +12,10 @@ type EmbeddedPiQueueHandle = {
|
|||||||
abort: () => void;
|
abort: () => void;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export type ActiveEmbeddedRunSnapshot = {
|
||||||
|
transcriptLeafId: string | null;
|
||||||
|
};
|
||||||
|
|
||||||
type EmbeddedRunWaiter = {
|
type EmbeddedRunWaiter = {
|
||||||
resolve: (ended: boolean) => void;
|
resolve: (ended: boolean) => void;
|
||||||
timer: NodeJS.Timeout;
|
timer: NodeJS.Timeout;
|
||||||
@@ -25,9 +29,11 @@ const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState");
|
|||||||
|
|
||||||
const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({
|
const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({
|
||||||
activeRuns: new Map<string, EmbeddedPiQueueHandle>(),
|
activeRuns: new Map<string, EmbeddedPiQueueHandle>(),
|
||||||
|
snapshots: new Map<string, ActiveEmbeddedRunSnapshot>(),
|
||||||
waiters: new Map<string, Set<EmbeddedRunWaiter>>(),
|
waiters: new Map<string, Set<EmbeddedRunWaiter>>(),
|
||||||
}));
|
}));
|
||||||
const ACTIVE_EMBEDDED_RUNS = embeddedRunState.activeRuns;
|
const ACTIVE_EMBEDDED_RUNS = embeddedRunState.activeRuns;
|
||||||
|
const ACTIVE_EMBEDDED_RUN_SNAPSHOTS = embeddedRunState.snapshots;
|
||||||
const EMBEDDED_RUN_WAITERS = embeddedRunState.waiters;
|
const EMBEDDED_RUN_WAITERS = embeddedRunState.waiters;
|
||||||
|
|
||||||
export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean {
|
export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean {
|
||||||
@@ -135,6 +141,12 @@ export function getActiveEmbeddedRunCount(): number {
|
|||||||
return ACTIVE_EMBEDDED_RUNS.size;
|
return ACTIVE_EMBEDDED_RUNS.size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function getActiveEmbeddedRunSnapshot(
|
||||||
|
sessionId: string,
|
||||||
|
): ActiveEmbeddedRunSnapshot | undefined {
|
||||||
|
return ACTIVE_EMBEDDED_RUN_SNAPSHOTS.get(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wait for active embedded runs to drain.
|
* Wait for active embedded runs to drain.
|
||||||
*
|
*
|
||||||
@@ -230,6 +242,16 @@ export function setActiveEmbeddedRun(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function updateActiveEmbeddedRunSnapshot(
|
||||||
|
sessionId: string,
|
||||||
|
snapshot: ActiveEmbeddedRunSnapshot,
|
||||||
|
) {
|
||||||
|
if (!ACTIVE_EMBEDDED_RUNS.has(sessionId)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.set(sessionId, snapshot);
|
||||||
|
}
|
||||||
|
|
||||||
export function clearActiveEmbeddedRun(
|
export function clearActiveEmbeddedRun(
|
||||||
sessionId: string,
|
sessionId: string,
|
||||||
handle: EmbeddedPiQueueHandle,
|
handle: EmbeddedPiQueueHandle,
|
||||||
@@ -237,6 +259,7 @@ export function clearActiveEmbeddedRun(
|
|||||||
) {
|
) {
|
||||||
if (ACTIVE_EMBEDDED_RUNS.get(sessionId) === handle) {
|
if (ACTIVE_EMBEDDED_RUNS.get(sessionId) === handle) {
|
||||||
ACTIVE_EMBEDDED_RUNS.delete(sessionId);
|
ACTIVE_EMBEDDED_RUNS.delete(sessionId);
|
||||||
|
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.delete(sessionId);
|
||||||
logSessionStateChange({ sessionId, sessionKey, state: "idle", reason: "run_completed" });
|
logSessionStateChange({ sessionId, sessionKey, state: "idle", reason: "run_completed" });
|
||||||
if (!sessionId.startsWith("probe-")) {
|
if (!sessionId.startsWith("probe-")) {
|
||||||
diag.debug(`run cleared: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`);
|
diag.debug(`run cleared: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`);
|
||||||
@@ -257,6 +280,7 @@ export const __testing = {
|
|||||||
}
|
}
|
||||||
EMBEDDED_RUN_WAITERS.clear();
|
EMBEDDED_RUN_WAITERS.clear();
|
||||||
ACTIVE_EMBEDDED_RUNS.clear();
|
ACTIVE_EMBEDDED_RUNS.clear();
|
||||||
|
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.clear();
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -196,6 +196,14 @@ function buildChatCommands(): ChatCommandDefinition[] {
|
|||||||
acceptsArgs: true,
|
acceptsArgs: true,
|
||||||
category: "status",
|
category: "status",
|
||||||
}),
|
}),
|
||||||
|
defineChatCommand({
|
||||||
|
key: "btw",
|
||||||
|
nativeName: "btw",
|
||||||
|
description: "Ask a side question without changing future session context.",
|
||||||
|
textAlias: "/btw",
|
||||||
|
acceptsArgs: true,
|
||||||
|
category: "tools",
|
||||||
|
}),
|
||||||
defineChatCommand({
|
defineChatCommand({
|
||||||
key: "export-session",
|
key: "export-session",
|
||||||
nativeName: "export-session",
|
nativeName: "export-session",
|
||||||
|
|||||||
93
src/auto-reply/reply/commands-btw.test.ts
Normal file
93
src/auto-reply/reply/commands-btw.test.ts
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
import { describe, expect, it, vi, beforeEach } from "vitest";
|
||||||
|
import type { OpenClawConfig } from "../../config/config.js";
|
||||||
|
import { buildCommandTestParams } from "./commands.test-harness.js";
|
||||||
|
|
||||||
|
const runBtwSideQuestionMock = vi.fn();
|
||||||
|
|
||||||
|
vi.mock("../../agents/btw.js", () => ({
|
||||||
|
runBtwSideQuestion: (...args: unknown[]) => runBtwSideQuestionMock(...args),
|
||||||
|
}));
|
||||||
|
|
||||||
|
const { handleBtwCommand } = await import("./commands-btw.js");
|
||||||
|
|
||||||
|
function buildParams(commandBody: string) {
|
||||||
|
const cfg = {
|
||||||
|
commands: { text: true },
|
||||||
|
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||||
|
} as OpenClawConfig;
|
||||||
|
return buildCommandTestParams(commandBody, cfg, undefined, { workspaceDir: "/tmp/workspace" });
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("handleBtwCommand", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
runBtwSideQuestionMock.mockReset();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns usage when the side question is missing", async () => {
|
||||||
|
const result = await handleBtwCommand(buildParams("/btw"), true);
|
||||||
|
|
||||||
|
expect(result).toEqual({
|
||||||
|
shouldContinue: false,
|
||||||
|
reply: { text: "Usage: /btw <side question>" },
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("requires an active session context", async () => {
|
||||||
|
const params = buildParams("/btw what changed?");
|
||||||
|
params.sessionEntry = undefined;
|
||||||
|
|
||||||
|
const result = await handleBtwCommand(params, true);
|
||||||
|
|
||||||
|
expect(result).toEqual({
|
||||||
|
shouldContinue: false,
|
||||||
|
reply: { text: "⚠️ /btw requires an active session with existing context." },
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("still delegates while the session is actively running", async () => {
|
||||||
|
const params = buildParams("/btw what changed?");
|
||||||
|
params.agentDir = "/tmp/agent";
|
||||||
|
params.sessionEntry = {
|
||||||
|
sessionId: "session-1",
|
||||||
|
updatedAt: Date.now(),
|
||||||
|
};
|
||||||
|
runBtwSideQuestionMock.mockResolvedValue({ text: "snapshot answer" });
|
||||||
|
|
||||||
|
const result = await handleBtwCommand(params, true);
|
||||||
|
|
||||||
|
expect(runBtwSideQuestionMock).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
question: "what changed?",
|
||||||
|
sessionEntry: params.sessionEntry,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
expect(result).toEqual({
|
||||||
|
shouldContinue: false,
|
||||||
|
reply: { text: "snapshot answer" },
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("delegates to the side-question runner", async () => {
|
||||||
|
const params = buildParams("/btw what changed?");
|
||||||
|
params.agentDir = "/tmp/agent";
|
||||||
|
params.sessionEntry = {
|
||||||
|
sessionId: "session-1",
|
||||||
|
updatedAt: Date.now(),
|
||||||
|
};
|
||||||
|
runBtwSideQuestionMock.mockResolvedValue({ text: "nothing important" });
|
||||||
|
|
||||||
|
const result = await handleBtwCommand(params, true);
|
||||||
|
|
||||||
|
expect(runBtwSideQuestionMock).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
question: "what changed?",
|
||||||
|
agentDir: "/tmp/agent",
|
||||||
|
sessionEntry: params.sessionEntry,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
expect(result).toEqual({
|
||||||
|
shouldContinue: false,
|
||||||
|
reply: { text: "nothing important" },
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
67
src/auto-reply/reply/commands-btw.ts
Normal file
67
src/auto-reply/reply/commands-btw.ts
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
import { runBtwSideQuestion } from "../../agents/btw.js";
|
||||||
|
import type { CommandHandler } from "./commands-types.js";
|
||||||
|
|
||||||
|
const BTW_USAGE = "Usage: /btw <side question>";
|
||||||
|
|
||||||
|
export const handleBtwCommand: CommandHandler = async (params) => {
|
||||||
|
const match = params.command.commandBodyNormalized.match(/^\/btw(?:\s+(.*))?$/i);
|
||||||
|
if (!match) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const question = match[1]?.trim() ?? "";
|
||||||
|
if (!question) {
|
||||||
|
return {
|
||||||
|
shouldContinue: false,
|
||||||
|
reply: { text: BTW_USAGE },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!params.sessionEntry?.sessionId) {
|
||||||
|
return {
|
||||||
|
shouldContinue: false,
|
||||||
|
reply: { text: "⚠️ /btw requires an active session with existing context." },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!params.agentDir) {
|
||||||
|
return {
|
||||||
|
shouldContinue: false,
|
||||||
|
reply: {
|
||||||
|
text: "⚠️ /btw is unavailable because the active agent directory could not be resolved.",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const reply = await runBtwSideQuestion({
|
||||||
|
cfg: params.cfg,
|
||||||
|
agentDir: params.agentDir,
|
||||||
|
provider: params.provider,
|
||||||
|
model: params.model,
|
||||||
|
question,
|
||||||
|
sessionEntry: params.sessionEntry,
|
||||||
|
sessionStore: params.sessionStore,
|
||||||
|
sessionKey: params.sessionKey,
|
||||||
|
storePath: params.storePath,
|
||||||
|
resolvedThinkLevel: params.resolvedThinkLevel,
|
||||||
|
resolvedReasoningLevel: params.resolvedReasoningLevel,
|
||||||
|
blockReplyChunking: params.blockReplyChunking,
|
||||||
|
resolvedBlockStreamingBreak: params.resolvedBlockStreamingBreak,
|
||||||
|
opts: params.opts,
|
||||||
|
isNewSession: false,
|
||||||
|
});
|
||||||
|
return {
|
||||||
|
shouldContinue: false,
|
||||||
|
reply,
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
const message = error instanceof Error ? error.message.trim() : "";
|
||||||
|
return {
|
||||||
|
shouldContinue: false,
|
||||||
|
reply: {
|
||||||
|
text: `⚠️ /btw failed${message ? `: ${message}` : "."}`,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
@@ -11,6 +11,7 @@ import { resolveBoundAcpThreadSessionKey } from "./commands-acp/targets.js";
|
|||||||
import { handleAllowlistCommand } from "./commands-allowlist.js";
|
import { handleAllowlistCommand } from "./commands-allowlist.js";
|
||||||
import { handleApproveCommand } from "./commands-approve.js";
|
import { handleApproveCommand } from "./commands-approve.js";
|
||||||
import { handleBashCommand } from "./commands-bash.js";
|
import { handleBashCommand } from "./commands-bash.js";
|
||||||
|
import { handleBtwCommand } from "./commands-btw.js";
|
||||||
import { handleCompactCommand } from "./commands-compact.js";
|
import { handleCompactCommand } from "./commands-compact.js";
|
||||||
import { handleConfigCommand, handleDebugCommand } from "./commands-config.js";
|
import { handleConfigCommand, handleDebugCommand } from "./commands-config.js";
|
||||||
import {
|
import {
|
||||||
@@ -174,6 +175,7 @@ export async function handleCommands(params: HandleCommandsParams): Promise<Comm
|
|||||||
HANDLERS = [
|
HANDLERS = [
|
||||||
// Plugin commands are processed first, before built-in commands
|
// Plugin commands are processed first, before built-in commands
|
||||||
handlePluginCommand,
|
handlePluginCommand,
|
||||||
|
handleBtwCommand,
|
||||||
handleBashCommand,
|
handleBashCommand,
|
||||||
handleActivationCommand,
|
handleActivationCommand,
|
||||||
handleSendPolicyCommand,
|
handleSendPolicyCommand,
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import type { OpenClawConfig } from "../../config/config.js";
|
|||||||
import type { SessionEntry, SessionScope } from "../../config/sessions.js";
|
import type { SessionEntry, SessionScope } from "../../config/sessions.js";
|
||||||
import type { MsgContext } from "../templating.js";
|
import type { MsgContext } from "../templating.js";
|
||||||
import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "../thinking.js";
|
import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "../thinking.js";
|
||||||
import type { ReplyPayload } from "../types.js";
|
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||||
import type { InlineDirectives } from "./directive-handling.js";
|
import type { InlineDirectives } from "./directive-handling.js";
|
||||||
|
|
||||||
export type CommandContext = {
|
export type CommandContext = {
|
||||||
@@ -44,11 +44,19 @@ export type HandleCommandsParams = {
|
|||||||
storePath?: string;
|
storePath?: string;
|
||||||
sessionScope?: SessionScope;
|
sessionScope?: SessionScope;
|
||||||
workspaceDir: string;
|
workspaceDir: string;
|
||||||
|
opts?: GetReplyOptions;
|
||||||
defaultGroupActivation: () => "always" | "mention";
|
defaultGroupActivation: () => "always" | "mention";
|
||||||
resolvedThinkLevel?: ThinkLevel;
|
resolvedThinkLevel?: ThinkLevel;
|
||||||
resolvedVerboseLevel: VerboseLevel;
|
resolvedVerboseLevel: VerboseLevel;
|
||||||
resolvedReasoningLevel: ReasoningLevel;
|
resolvedReasoningLevel: ReasoningLevel;
|
||||||
resolvedElevatedLevel?: ElevatedLevel;
|
resolvedElevatedLevel?: ElevatedLevel;
|
||||||
|
blockReplyChunking?: {
|
||||||
|
minChars: number;
|
||||||
|
maxChars: number;
|
||||||
|
breakPreference: "paragraph" | "newline" | "sentence";
|
||||||
|
flushOnParagraph?: boolean;
|
||||||
|
};
|
||||||
|
resolvedBlockStreamingBreak?: "text_end" | "message_end";
|
||||||
resolveDefaultThinkingLevel: () => Promise<ThinkLevel | undefined>;
|
resolveDefaultThinkingLevel: () => Promise<ThinkLevel | undefined>;
|
||||||
provider: string;
|
provider: string;
|
||||||
model: string;
|
model: string;
|
||||||
|
|||||||
@@ -113,6 +113,13 @@ export async function handleInlineActions(params: {
|
|||||||
resolvedVerboseLevel: VerboseLevel | undefined;
|
resolvedVerboseLevel: VerboseLevel | undefined;
|
||||||
resolvedReasoningLevel: ReasoningLevel;
|
resolvedReasoningLevel: ReasoningLevel;
|
||||||
resolvedElevatedLevel: ElevatedLevel;
|
resolvedElevatedLevel: ElevatedLevel;
|
||||||
|
blockReplyChunking?: {
|
||||||
|
minChars: number;
|
||||||
|
maxChars: number;
|
||||||
|
breakPreference: "paragraph" | "newline" | "sentence";
|
||||||
|
flushOnParagraph?: boolean;
|
||||||
|
};
|
||||||
|
resolvedBlockStreamingBreak?: "text_end" | "message_end";
|
||||||
resolveDefaultThinkingLevel: Awaited<
|
resolveDefaultThinkingLevel: Awaited<
|
||||||
ReturnType<typeof createModelSelectionState>
|
ReturnType<typeof createModelSelectionState>
|
||||||
>["resolveDefaultThinkingLevel"];
|
>["resolveDefaultThinkingLevel"];
|
||||||
@@ -152,6 +159,8 @@ export async function handleInlineActions(params: {
|
|||||||
resolvedVerboseLevel,
|
resolvedVerboseLevel,
|
||||||
resolvedReasoningLevel,
|
resolvedReasoningLevel,
|
||||||
resolvedElevatedLevel,
|
resolvedElevatedLevel,
|
||||||
|
blockReplyChunking,
|
||||||
|
resolvedBlockStreamingBreak,
|
||||||
resolveDefaultThinkingLevel,
|
resolveDefaultThinkingLevel,
|
||||||
provider,
|
provider,
|
||||||
model,
|
model,
|
||||||
@@ -357,11 +366,14 @@ export async function handleInlineActions(params: {
|
|||||||
storePath,
|
storePath,
|
||||||
sessionScope,
|
sessionScope,
|
||||||
workspaceDir,
|
workspaceDir,
|
||||||
|
opts,
|
||||||
defaultGroupActivation: defaultActivation,
|
defaultGroupActivation: defaultActivation,
|
||||||
resolvedThinkLevel,
|
resolvedThinkLevel,
|
||||||
resolvedVerboseLevel: resolvedVerboseLevel ?? "off",
|
resolvedVerboseLevel: resolvedVerboseLevel ?? "off",
|
||||||
resolvedReasoningLevel,
|
resolvedReasoningLevel,
|
||||||
resolvedElevatedLevel,
|
resolvedElevatedLevel,
|
||||||
|
blockReplyChunking,
|
||||||
|
resolvedBlockStreamingBreak,
|
||||||
resolveDefaultThinkingLevel,
|
resolveDefaultThinkingLevel,
|
||||||
provider,
|
provider,
|
||||||
model,
|
model,
|
||||||
|
|||||||
@@ -332,6 +332,8 @@ export async function getReplyFromConfig(
|
|||||||
resolvedVerboseLevel,
|
resolvedVerboseLevel,
|
||||||
resolvedReasoningLevel,
|
resolvedReasoningLevel,
|
||||||
resolvedElevatedLevel,
|
resolvedElevatedLevel,
|
||||||
|
blockReplyChunking,
|
||||||
|
resolvedBlockStreamingBreak,
|
||||||
resolveDefaultThinkingLevel: modelState.resolveDefaultThinkingLevel,
|
resolveDefaultThinkingLevel: modelState.resolveDefaultThinkingLevel,
|
||||||
provider,
|
provider,
|
||||||
model,
|
model,
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ const RESERVED_COMMANDS = new Set([
|
|||||||
"status",
|
"status",
|
||||||
"whoami",
|
"whoami",
|
||||||
"context",
|
"context",
|
||||||
|
"btw",
|
||||||
// Session management
|
// Session management
|
||||||
"stop",
|
"stop",
|
||||||
"restart",
|
"restart",
|
||||||
|
|||||||
Reference in New Issue
Block a user