fix(otel): complete diagnostics-otel OpenTelemetry v2 API migration (#12897)

* fix(otel): complete diagnostics-otel OpenTelemetry v2 API migration

* chore(format): align otel files with updated oxfmt config

* chore(format): apply updated oxfmt spacing to otel diagnostics
This commit is contained in:
Vincent Koc
2026-02-19 02:36:47 -08:00
committed by GitHub
parent 1faa7a87a0
commit de656e3194
3 changed files with 219 additions and 148 deletions

View File

@@ -70,7 +70,6 @@ vi.mock("@opentelemetry/exporter-logs-otlp-http", () => ({
vi.mock("@opentelemetry/sdk-logs", () => ({ vi.mock("@opentelemetry/sdk-logs", () => ({
BatchLogRecordProcessor: class {}, BatchLogRecordProcessor: class {},
LoggerProvider: class { LoggerProvider: class {
addLogRecordProcessor = vi.fn();
getLogger = vi.fn(() => ({ getLogger = vi.fn(() => ({
emit: logEmit, emit: logEmit,
})); }));
@@ -96,9 +95,7 @@ vi.mock("@opentelemetry/resources", () => ({
})); }));
vi.mock("@opentelemetry/semantic-conventions", () => ({ vi.mock("@opentelemetry/semantic-conventions", () => ({
SemanticResourceAttributes: { ATTR_SERVICE_NAME: "service.name",
SERVICE_NAME: "service.name",
},
})); }));
vi.mock("openclaw/plugin-sdk", async () => { vi.mock("openclaw/plugin-sdk", async () => {

View File

@@ -8,7 +8,7 @@ import { BatchLogRecordProcessor, LoggerProvider } from "@opentelemetry/sdk-logs
import { PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics"; import { PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics";
import { NodeSDK } from "@opentelemetry/sdk-node"; import { NodeSDK } from "@opentelemetry/sdk-node";
import { ParentBasedSampler, TraceIdRatioBasedSampler } from "@opentelemetry/sdk-trace-base"; import { ParentBasedSampler, TraceIdRatioBasedSampler } from "@opentelemetry/sdk-trace-base";
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions"; import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";
import type { DiagnosticEventPayload, OpenClawPluginService } from "openclaw/plugin-sdk"; import type { DiagnosticEventPayload, OpenClawPluginService } from "openclaw/plugin-sdk";
import { onDiagnosticEvent, registerLogTransport } from "openclaw/plugin-sdk"; import { onDiagnosticEvent, registerLogTransport } from "openclaw/plugin-sdk";
@@ -40,6 +40,20 @@ function resolveSampleRate(value: number | undefined): number | undefined {
return value; return value;
} }
function formatError(err: unknown): string {
if (err instanceof Error) {
return err.stack ?? err.message;
}
if (typeof err === "string") {
return err;
}
try {
return JSON.stringify(err);
} catch {
return String(err);
}
}
export function createDiagnosticsOtelService(): OpenClawPluginService { export function createDiagnosticsOtelService(): OpenClawPluginService {
let sdk: NodeSDK | null = null; let sdk: NodeSDK | null = null;
let logProvider: LoggerProvider | null = null; let logProvider: LoggerProvider | null = null;
@@ -75,7 +89,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
} }
const resource = resourceFromAttributes({ const resource = resourceFromAttributes({
[SemanticResourceAttributes.SERVICE_NAME]: serviceName, [ATTR_SERVICE_NAME]: serviceName,
}); });
const traceUrl = resolveOtelUrl(endpoint, "v1/traces"); const traceUrl = resolveOtelUrl(endpoint, "v1/traces");
@@ -118,7 +132,12 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
: {}), : {}),
}); });
sdk.start(); try {
await sdk.start();
} catch (err) {
ctx.logger.error(`diagnostics-otel: failed to start SDK: ${formatError(err)}`);
throw err;
}
} }
const logSeverityMap: Record<string, SeverityNumber> = { const logSeverityMap: Record<string, SeverityNumber> = {
@@ -211,115 +230,122 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
...(logUrl ? { url: logUrl } : {}), ...(logUrl ? { url: logUrl } : {}),
...(headers ? { headers } : {}), ...(headers ? { headers } : {}),
}); });
const processor = new BatchLogRecordProcessor( const logProcessor = new BatchLogRecordProcessor(
logExporter, logExporter,
typeof otel.flushIntervalMs === "number" typeof otel.flushIntervalMs === "number"
? { scheduledDelayMillis: Math.max(1000, otel.flushIntervalMs) } ? { scheduledDelayMillis: Math.max(1000, otel.flushIntervalMs) }
: {}, : {},
); );
logProvider = new LoggerProvider({ resource, processors: [processor] }); logProvider = new LoggerProvider({
resource,
processors: [logProcessor],
});
const otelLogger = logProvider.getLogger("openclaw"); const otelLogger = logProvider.getLogger("openclaw");
stopLogTransport = registerLogTransport((logObj) => { stopLogTransport = registerLogTransport((logObj) => {
const safeStringify = (value: unknown) => { try {
try { const safeStringify = (value: unknown) => {
return JSON.stringify(value); try {
} catch { return JSON.stringify(value);
return String(value); } catch {
} return String(value);
};
const meta = (logObj as Record<string, unknown>)._meta as
| {
logLevelName?: string;
date?: Date;
name?: string;
parentNames?: string[];
path?: {
filePath?: string;
fileLine?: string;
fileColumn?: string;
filePathWithLine?: string;
method?: string;
};
} }
| undefined; };
const logLevelName = meta?.logLevelName ?? "INFO"; const meta = (logObj as Record<string, unknown>)._meta as
const severityNumber = logSeverityMap[logLevelName] ?? (9 as SeverityNumber); | {
logLevelName?: string;
date?: Date;
name?: string;
parentNames?: string[];
path?: {
filePath?: string;
fileLine?: string;
fileColumn?: string;
filePathWithLine?: string;
method?: string;
};
}
| undefined;
const logLevelName = meta?.logLevelName ?? "INFO";
const severityNumber = logSeverityMap[logLevelName] ?? (9 as SeverityNumber);
const numericArgs = Object.entries(logObj) const numericArgs = Object.entries(logObj)
.filter(([key]) => /^\d+$/.test(key)) .filter(([key]) => /^\d+$/.test(key))
.toSorted((a, b) => Number(a[0]) - Number(b[0])) .toSorted((a, b) => Number(a[0]) - Number(b[0]))
.map(([, value]) => value); .map(([, value]) => value);
let bindings: Record<string, unknown> | undefined; let bindings: Record<string, unknown> | undefined;
if (typeof numericArgs[0] === "string" && numericArgs[0].trim().startsWith("{")) { if (typeof numericArgs[0] === "string" && numericArgs[0].trim().startsWith("{")) {
try { try {
const parsed = JSON.parse(numericArgs[0]); const parsed = JSON.parse(numericArgs[0]);
if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) { if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) {
bindings = parsed as Record<string, unknown>; bindings = parsed as Record<string, unknown>;
numericArgs.shift(); numericArgs.shift();
} }
} catch { } catch {
// ignore malformed json bindings // ignore malformed json bindings
}
}
let message = "";
if (numericArgs.length > 0 && typeof numericArgs[numericArgs.length - 1] === "string") {
message = String(numericArgs.pop());
} else if (numericArgs.length === 1) {
message = safeStringify(numericArgs[0]);
numericArgs.length = 0;
}
if (!message) {
message = "log";
}
const attributes: Record<string, string | number | boolean> = {
"openclaw.log.level": logLevelName,
};
if (meta?.name) {
attributes["openclaw.logger"] = meta.name;
}
if (meta?.parentNames?.length) {
attributes["openclaw.logger.parents"] = meta.parentNames.join(".");
}
if (bindings) {
for (const [key, value] of Object.entries(bindings)) {
if (
typeof value === "string" ||
typeof value === "number" ||
typeof value === "boolean"
) {
attributes[`openclaw.${key}`] = value;
} else if (value != null) {
attributes[`openclaw.${key}`] = safeStringify(value);
} }
} }
}
if (numericArgs.length > 0) {
attributes["openclaw.log.args"] = safeStringify(numericArgs);
}
if (meta?.path?.filePath) {
attributes["code.filepath"] = meta.path.filePath;
}
if (meta?.path?.fileLine) {
attributes["code.lineno"] = Number(meta.path.fileLine);
}
if (meta?.path?.method) {
attributes["code.function"] = meta.path.method;
}
if (meta?.path?.filePathWithLine) {
attributes["openclaw.code.location"] = meta.path.filePathWithLine;
}
otelLogger.emit({ let message = "";
body: message, if (numericArgs.length > 0 && typeof numericArgs[numericArgs.length - 1] === "string") {
severityText: logLevelName, message = String(numericArgs.pop());
severityNumber, } else if (numericArgs.length === 1) {
attributes, message = safeStringify(numericArgs[0]);
timestamp: meta?.date ?? new Date(), numericArgs.length = 0;
}); }
if (!message) {
message = "log";
}
const attributes: Record<string, string | number | boolean> = {
"openclaw.log.level": logLevelName,
};
if (meta?.name) {
attributes["openclaw.logger"] = meta.name;
}
if (meta?.parentNames?.length) {
attributes["openclaw.logger.parents"] = meta.parentNames.join(".");
}
if (bindings) {
for (const [key, value] of Object.entries(bindings)) {
if (
typeof value === "string" ||
typeof value === "number" ||
typeof value === "boolean"
) {
attributes[`openclaw.${key}`] = value;
} else if (value != null) {
attributes[`openclaw.${key}`] = safeStringify(value);
}
}
}
if (numericArgs.length > 0) {
attributes["openclaw.log.args"] = safeStringify(numericArgs);
}
if (meta?.path?.filePath) {
attributes["code.filepath"] = meta.path.filePath;
}
if (meta?.path?.fileLine) {
attributes["code.lineno"] = Number(meta.path.fileLine);
}
if (meta?.path?.method) {
attributes["code.function"] = meta.path.method;
}
if (meta?.path?.filePathWithLine) {
attributes["openclaw.code.location"] = meta.path.filePathWithLine;
}
otelLogger.emit({
body: message,
severityText: logLevelName,
severityNumber,
attributes,
timestamp: meta?.date ?? new Date(),
});
} catch (err) {
ctx.logger.error(`diagnostics-otel: log transport failed: ${formatError(err)}`);
}
}); });
} }
@@ -572,43 +598,49 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
}; };
unsubscribe = onDiagnosticEvent((evt: DiagnosticEventPayload) => { unsubscribe = onDiagnosticEvent((evt: DiagnosticEventPayload) => {
switch (evt.type) { try {
case "model.usage": switch (evt.type) {
recordModelUsage(evt); case "model.usage":
return; recordModelUsage(evt);
case "webhook.received": return;
recordWebhookReceived(evt); case "webhook.received":
return; recordWebhookReceived(evt);
case "webhook.processed": return;
recordWebhookProcessed(evt); case "webhook.processed":
return; recordWebhookProcessed(evt);
case "webhook.error": return;
recordWebhookError(evt); case "webhook.error":
return; recordWebhookError(evt);
case "message.queued": return;
recordMessageQueued(evt); case "message.queued":
return; recordMessageQueued(evt);
case "message.processed": return;
recordMessageProcessed(evt); case "message.processed":
return; recordMessageProcessed(evt);
case "queue.lane.enqueue": return;
recordLaneEnqueue(evt); case "queue.lane.enqueue":
return; recordLaneEnqueue(evt);
case "queue.lane.dequeue": return;
recordLaneDequeue(evt); case "queue.lane.dequeue":
return; recordLaneDequeue(evt);
case "session.state": return;
recordSessionState(evt); case "session.state":
return; recordSessionState(evt);
case "session.stuck": return;
recordSessionStuck(evt); case "session.stuck":
return; recordSessionStuck(evt);
case "run.attempt": return;
recordRunAttempt(evt); case "run.attempt":
return; recordRunAttempt(evt);
case "diagnostic.heartbeat": return;
recordHeartbeat(evt); case "diagnostic.heartbeat":
return; recordHeartbeat(evt);
return;
}
} catch (err) {
ctx.logger.error(
`diagnostics-otel: event handler failed (${evt.type}): ${formatError(err)}`,
);
} }
}); });

View File

@@ -167,34 +167,76 @@ export type DiagnosticEventInput = DiagnosticEventPayload extends infer Event
? Omit<Event, "seq" | "ts"> ? Omit<Event, "seq" | "ts">
: never : never
: never; : never;
let seq = 0;
const listeners = new Set<(evt: DiagnosticEventPayload) => void>(); type DiagnosticEventsGlobalState = {
seq: number;
listeners: Set<(evt: DiagnosticEventPayload) => void>;
dispatchDepth: number;
};
function getDiagnosticEventsState(): DiagnosticEventsGlobalState {
const globalStore = globalThis as typeof globalThis & {
__openclawDiagnosticEventsState?: DiagnosticEventsGlobalState;
};
if (!globalStore.__openclawDiagnosticEventsState) {
globalStore.__openclawDiagnosticEventsState = {
seq: 0,
listeners: new Set<(evt: DiagnosticEventPayload) => void>(),
dispatchDepth: 0,
};
}
return globalStore.__openclawDiagnosticEventsState;
}
export function isDiagnosticsEnabled(config?: OpenClawConfig): boolean { export function isDiagnosticsEnabled(config?: OpenClawConfig): boolean {
return config?.diagnostics?.enabled === true; return config?.diagnostics?.enabled === true;
} }
export function emitDiagnosticEvent(event: DiagnosticEventInput) { export function emitDiagnosticEvent(event: DiagnosticEventInput) {
const state = getDiagnosticEventsState();
if (state.dispatchDepth > 100) {
console.error(
`[diagnostic-events] recursion guard tripped at depth=${state.dispatchDepth}, dropping type=${event.type}`,
);
return;
}
const enriched = { const enriched = {
...event, ...event,
seq: (seq += 1), seq: (state.seq += 1),
ts: Date.now(), ts: Date.now(),
} satisfies DiagnosticEventPayload; } satisfies DiagnosticEventPayload;
for (const listener of listeners) { state.dispatchDepth += 1;
for (const listener of state.listeners) {
try { try {
listener(enriched); listener(enriched);
} catch { } catch (err) {
const errorMessage =
err instanceof Error
? (err.stack ?? err.message)
: typeof err === "string"
? err
: String(err);
console.error(
`[diagnostic-events] listener error type=${enriched.type} seq=${enriched.seq}: ${errorMessage}`,
);
// Ignore listener failures. // Ignore listener failures.
} }
} }
state.dispatchDepth -= 1;
} }
export function onDiagnosticEvent(listener: (evt: DiagnosticEventPayload) => void): () => void { export function onDiagnosticEvent(listener: (evt: DiagnosticEventPayload) => void): () => void {
listeners.add(listener); const state = getDiagnosticEventsState();
return () => listeners.delete(listener); state.listeners.add(listener);
return () => {
state.listeners.delete(listener);
};
} }
export function resetDiagnosticEventsForTest(): void { export function resetDiagnosticEventsForTest(): void {
seq = 0; const state = getDiagnosticEventsState();
listeners.clear(); state.seq = 0;
state.listeners.clear();
state.dispatchDepth = 0;
} }