mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-11 08:01:40 +00:00
refactor(outbound,agents): extract shared payload and queue helpers
This commit is contained in:
@@ -297,6 +297,42 @@ function shouldEnableOpenAIResponsesServerCompaction(
|
|||||||
return model.provider === "openai";
|
return model.provider === "openai";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function shouldStripResponsesStore(
|
||||||
|
model: { api?: unknown; compat?: { supportsStore?: boolean } },
|
||||||
|
forceStore: boolean,
|
||||||
|
): boolean {
|
||||||
|
if (forceStore) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (typeof model.api !== "string") {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return OPENAI_RESPONSES_APIS.has(model.api) && model.compat?.supportsStore === false;
|
||||||
|
}
|
||||||
|
|
||||||
|
function applyOpenAIResponsesPayloadOverrides(params: {
|
||||||
|
payloadObj: Record<string, unknown>;
|
||||||
|
forceStore: boolean;
|
||||||
|
stripStore: boolean;
|
||||||
|
useServerCompaction: boolean;
|
||||||
|
compactThreshold: number;
|
||||||
|
}): void {
|
||||||
|
if (params.forceStore) {
|
||||||
|
params.payloadObj.store = true;
|
||||||
|
}
|
||||||
|
if (params.stripStore) {
|
||||||
|
delete params.payloadObj.store;
|
||||||
|
}
|
||||||
|
if (params.useServerCompaction && params.payloadObj.context_management === undefined) {
|
||||||
|
params.payloadObj.context_management = [
|
||||||
|
{
|
||||||
|
type: "compaction",
|
||||||
|
compact_threshold: params.compactThreshold,
|
||||||
|
},
|
||||||
|
];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function createOpenAIResponsesContextManagementWrapper(
|
function createOpenAIResponsesContextManagementWrapper(
|
||||||
baseStreamFn: StreamFn | undefined,
|
baseStreamFn: StreamFn | undefined,
|
||||||
extraParams: Record<string, unknown> | undefined,
|
extraParams: Record<string, unknown> | undefined,
|
||||||
@@ -308,10 +344,7 @@ function createOpenAIResponsesContextManagementWrapper(
|
|||||||
// Strip `store` from the payload when the model declares supportsStore=false.
|
// Strip `store` from the payload when the model declares supportsStore=false.
|
||||||
// pi-ai upstream hardcodes `store: false` for Responses API; strict
|
// pi-ai upstream hardcodes `store: false` for Responses API; strict
|
||||||
// OpenAI-compatible endpoints (e.g. Gemini via Cloudflare) reject it.
|
// OpenAI-compatible endpoints (e.g. Gemini via Cloudflare) reject it.
|
||||||
const stripStore =
|
const stripStore = shouldStripResponsesStore(model, forceStore);
|
||||||
!forceStore &&
|
|
||||||
OPENAI_RESPONSES_APIS.has(String(model.api ?? "")) &&
|
|
||||||
(model as { compat?: { supportsStore?: boolean } }).compat?.supportsStore === false;
|
|
||||||
if (!forceStore && !useServerCompaction && !stripStore) {
|
if (!forceStore && !useServerCompaction && !stripStore) {
|
||||||
return underlying(model, context, options);
|
return underlying(model, context, options);
|
||||||
}
|
}
|
||||||
@@ -324,21 +357,13 @@ function createOpenAIResponsesContextManagementWrapper(
|
|||||||
...options,
|
...options,
|
||||||
onPayload: (payload) => {
|
onPayload: (payload) => {
|
||||||
if (payload && typeof payload === "object") {
|
if (payload && typeof payload === "object") {
|
||||||
const payloadObj = payload as Record<string, unknown>;
|
applyOpenAIResponsesPayloadOverrides({
|
||||||
if (forceStore) {
|
payloadObj: payload as Record<string, unknown>,
|
||||||
payloadObj.store = true;
|
forceStore,
|
||||||
}
|
stripStore,
|
||||||
if (stripStore) {
|
useServerCompaction,
|
||||||
delete payloadObj.store;
|
compactThreshold,
|
||||||
}
|
});
|
||||||
if (useServerCompaction && payloadObj.context_management === undefined) {
|
|
||||||
payloadObj.context_management = [
|
|
||||||
{
|
|
||||||
type: "compaction",
|
|
||||||
compact_threshold: compactThreshold,
|
|
||||||
},
|
|
||||||
];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
originalOnPayload?.(payload);
|
originalOnPayload?.(payload);
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -67,6 +67,34 @@ function resolveFailedDir(stateDir?: string): string {
|
|||||||
return path.join(resolveQueueDir(stateDir), FAILED_DIRNAME);
|
return path.join(resolveQueueDir(stateDir), FAILED_DIRNAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function resolveQueueEntryPaths(
|
||||||
|
id: string,
|
||||||
|
stateDir?: string,
|
||||||
|
): {
|
||||||
|
jsonPath: string;
|
||||||
|
deliveredPath: string;
|
||||||
|
} {
|
||||||
|
const queueDir = resolveQueueDir(stateDir);
|
||||||
|
return {
|
||||||
|
jsonPath: path.join(queueDir, `${id}.json`),
|
||||||
|
deliveredPath: path.join(queueDir, `${id}.delivered`),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function getErrnoCode(err: unknown): string | null {
|
||||||
|
return err && typeof err === "object" && "code" in err
|
||||||
|
? String((err as { code?: unknown }).code)
|
||||||
|
: null;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function unlinkBestEffort(filePath: string): Promise<void> {
|
||||||
|
try {
|
||||||
|
await fs.promises.unlink(filePath);
|
||||||
|
} catch {
|
||||||
|
// Best-effort cleanup.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Ensure the queue directory (and failed/ subdirectory) exist. */
|
/** Ensure the queue directory (and failed/ subdirectory) exist. */
|
||||||
export async function ensureQueueDir(stateDir?: string): Promise<string> {
|
export async function ensureQueueDir(stateDir?: string): Promise<string> {
|
||||||
const queueDir = resolveQueueDir(stateDir);
|
const queueDir = resolveQueueDir(stateDir);
|
||||||
@@ -117,35 +145,22 @@ export async function enqueueDelivery(
|
|||||||
* by {@link loadPendingDeliveries} on the next startup without re-sending.
|
* by {@link loadPendingDeliveries} on the next startup without re-sending.
|
||||||
*/
|
*/
|
||||||
export async function ackDelivery(id: string, stateDir?: string): Promise<void> {
|
export async function ackDelivery(id: string, stateDir?: string): Promise<void> {
|
||||||
const queueDir = resolveQueueDir(stateDir);
|
const { jsonPath, deliveredPath } = resolveQueueEntryPaths(id, stateDir);
|
||||||
const jsonPath = path.join(queueDir, `${id}.json`);
|
|
||||||
const deliveredPath = path.join(queueDir, `${id}.delivered`);
|
|
||||||
try {
|
try {
|
||||||
// Phase 1: atomic rename marks the delivery as complete.
|
// Phase 1: atomic rename marks the delivery as complete.
|
||||||
await fs.promises.rename(jsonPath, deliveredPath);
|
await fs.promises.rename(jsonPath, deliveredPath);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
const code =
|
const code = getErrnoCode(err);
|
||||||
err && typeof err === "object" && "code" in err
|
|
||||||
? String((err as { code?: unknown }).code)
|
|
||||||
: null;
|
|
||||||
if (code === "ENOENT") {
|
if (code === "ENOENT") {
|
||||||
// .json already gone — may have been renamed by a previous ack attempt.
|
// .json already gone — may have been renamed by a previous ack attempt.
|
||||||
// Try to clean up a leftover .delivered marker if present.
|
// Try to clean up a leftover .delivered marker if present.
|
||||||
try {
|
await unlinkBestEffort(deliveredPath);
|
||||||
await fs.promises.unlink(deliveredPath);
|
|
||||||
} catch {
|
|
||||||
// marker already gone — no-op.
|
|
||||||
}
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
// Phase 2: remove the marker file.
|
// Phase 2: remove the marker file.
|
||||||
try {
|
await unlinkBestEffort(deliveredPath);
|
||||||
await fs.promises.unlink(deliveredPath);
|
|
||||||
} catch {
|
|
||||||
// Best-effort; loadPendingDeliveries will clean it up on next startup.
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Update a queue entry after a failed delivery attempt. */
|
/** Update a queue entry after a failed delivery attempt. */
|
||||||
@@ -171,10 +186,7 @@ export async function loadPendingDeliveries(stateDir?: string): Promise<QueuedDe
|
|||||||
try {
|
try {
|
||||||
files = await fs.promises.readdir(queueDir);
|
files = await fs.promises.readdir(queueDir);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
const code =
|
const code = getErrnoCode(err);
|
||||||
err && typeof err === "object" && "code" in err
|
|
||||||
? String((err as { code?: unknown }).code)
|
|
||||||
: null;
|
|
||||||
if (code === "ENOENT") {
|
if (code === "ENOENT") {
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
@@ -186,11 +198,7 @@ export async function loadPendingDeliveries(stateDir?: string): Promise<QueuedDe
|
|||||||
if (!file.endsWith(".delivered")) {
|
if (!file.endsWith(".delivered")) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
try {
|
await unlinkBestEffort(path.join(queueDir, file));
|
||||||
await fs.promises.unlink(path.join(queueDir, file));
|
|
||||||
} catch {
|
|
||||||
// Best-effort cleanup.
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const entries: QueuedDelivery[] = [];
|
const entries: QueuedDelivery[] = [];
|
||||||
|
|||||||
Reference in New Issue
Block a user