From d4e3549ed2415e6fd5f0911e1577c7c2f0898c24 Mon Sep 17 00:00:00 2001 From: Tarun Sukhani Date: Tue, 10 Feb 2026 04:05:22 +0800 Subject: [PATCH] audit: fix 18 defects across gateway SSE streaming, voice-call security, and telephony Gateway (pipecat compatibility): - openai-http: add finish_reason:"stop" on final SSE chunk, fix ID format (chatcmpl- not chatcmpl_), capture timestamp once, use delta only, add writable checks and flush after writes - http-common: add TCP_NODELAY, X-Accel-Buffering:no, flush after writes, writable checks on writeDone - agent-events: fix seqByRun memory leak in clearAgentRunContext Voice-call security: - manager.ts, twiml.ts, twilio.ts: escape voice/language XML attributes to prevent XML injection - voice-mapping: strip control characters in escapeXml Voice-call bugs: - tts-openai: fix broken resample24kTo8k (interpolation frac always 0) - stt-openai-realtime: close zombie WebSocket on connection timeout - telnyx: extract direction/from/to for inbound calls (were silently dropped) - plivo: clean up 5 internal maps on terminal call states (memory leak) - twilio: clean up callWebhookUrls on terminal call states Co-Authored-By: Claude Opus 4.6 --- extensions/voice-call/src/manager.ts | 127 ++++++++++++++++++ extensions/voice-call/src/manager/twiml.ts | 2 +- extensions/voice-call/src/providers/plivo.ts | 17 +++ .../src/providers/stt-openai-realtime.ts | 1 + extensions/voice-call/src/providers/telnyx.ts | 12 ++ .../voice-call/src/providers/tts-openai.ts | 19 +-- extensions/voice-call/src/providers/twilio.ts | 4 +- extensions/voice-call/src/voice-mapping.ts | 1 + src/gateway/http-common.ts | 6 + src/gateway/openai-http.ts | 39 ++++-- src/infra/agent-events.ts | 1 + 11 files changed, 203 insertions(+), 26 deletions(-) diff --git a/extensions/voice-call/src/manager.ts b/extensions/voice-call/src/manager.ts index 3b3a5b7c061..44f21c3e005 100644 --- a/extensions/voice-call/src/manager.ts +++ b/extensions/voice-call/src/manager.ts @@ -204,4 +204,131 @@ export class CallManager { async getCallHistory(limit = 50): Promise { return getCallHistoryFromStore(this.storePath, limit); } + + // States that can cycle during multi-turn conversations + private static readonly ConversationStates = new Set(["speaking", "listening"]); + + // Non-terminal state order for monotonic transitions + private static readonly StateOrder: readonly CallState[] = [ + "initiated", + "ringing", + "answered", + "active", + "speaking", + "listening", + ]; + + /** + * Transition call state with monotonic enforcement. + */ + private transitionState(call: CallRecord, newState: CallState): void { + // No-op for same state or already terminal + if (call.state === newState || TerminalStates.has(call.state)) { + return; + } + + // Terminal states can always be reached from non-terminal + if (TerminalStates.has(newState)) { + call.state = newState; + return; + } + + // Allow cycling between speaking and listening (multi-turn conversations) + if ( + CallManager.ConversationStates.has(call.state) && + CallManager.ConversationStates.has(newState) + ) { + call.state = newState; + return; + } + + // Only allow forward transitions in state order + const currentIndex = CallManager.StateOrder.indexOf(call.state); + const newIndex = CallManager.StateOrder.indexOf(newState); + + if (newIndex > currentIndex) { + call.state = newState; + } + } + + /** + * Add an entry to the call transcript. + */ + private addTranscriptEntry(call: CallRecord, speaker: "bot" | "user", text: string): void { + const entry: TranscriptEntry = { + timestamp: Date.now(), + speaker, + text, + isFinal: true, + }; + call.transcript.push(entry); + } + + /** + * Persist a call record to disk (fire-and-forget async). + */ + private persistCallRecord(call: CallRecord): void { + const logPath = path.join(this.storePath, "calls.jsonl"); + const line = `${JSON.stringify(call)}\n`; + // Fire-and-forget async write to avoid blocking event loop + fsp.appendFile(logPath, line).catch((err) => { + console.error("[voice-call] Failed to persist call record:", err); + }); + } + + /** + * Load active calls from persistence (for crash recovery). + * Uses streaming to handle large log files efficiently. + */ + private loadActiveCalls(): void { + const logPath = path.join(this.storePath, "calls.jsonl"); + if (!fs.existsSync(logPath)) { + return; + } + + // Read file synchronously and parse lines + const content = fs.readFileSync(logPath, "utf-8"); + const lines = content.split("\n"); + + // Build map of latest state per call + const callMap = new Map(); + + for (const line of lines) { + if (!line.trim()) { + continue; + } + try { + const call = CallRecordSchema.parse(JSON.parse(line)); + callMap.set(call.callId, call); + } catch { + // Skip invalid lines + } + } + + // Only keep non-terminal calls + for (const [callId, call] of callMap) { + if (!TerminalStates.has(call.state)) { + this.activeCalls.set(callId, call); + // Populate providerCallId mapping for lookups + if (call.providerCallId) { + this.providerCallIdMap.set(call.providerCallId, callId); + } + // Populate processed event IDs + for (const eventId of call.processedEventIds) { + this.processedEventIds.add(eventId); + } + } + } + } + + /** + * Generate TwiML for notify mode (speak message and hang up). + */ + private generateNotifyTwiml(message: string, voice: string): string { + return ` + + ${escapeXml(message)} + +`; + } } diff --git a/extensions/voice-call/src/manager/twiml.ts b/extensions/voice-call/src/manager/twiml.ts index 588df559057..7bf864a556d 100644 --- a/extensions/voice-call/src/manager/twiml.ts +++ b/extensions/voice-call/src/manager/twiml.ts @@ -3,7 +3,7 @@ import { escapeXml } from "../voice-mapping.js"; export function generateNotifyTwiml(message: string, voice: string): string { return ` - ${escapeXml(message)} + ${escapeXml(message)} `; } diff --git a/extensions/voice-call/src/providers/plivo.ts b/extensions/voice-call/src/providers/plivo.ts index 44f03c755f0..31f9ff08fb1 100644 --- a/extensions/voice-call/src/providers/plivo.ts +++ b/extensions/voice-call/src/providers/plivo.ts @@ -244,6 +244,23 @@ export class PlivoProvider implements VoiceCallProvider { callStatus === "no-answer" || callStatus === "failed" ) { + // Clean up internal maps on terminal state + if (callUuid) { + this.callUuidToWebhookUrl.delete(callUuid); + // Also clean up the reverse mapping + for (const [reqId, cUuid] of this.requestUuidToCallUuid) { + if (cUuid === callUuid) { + this.requestUuidToCallUuid.delete(reqId); + break; + } + } + } + if (callIdOverride) { + this.callIdToWebhookUrl.delete(callIdOverride); + this.pendingSpeakByCallId.delete(callIdOverride); + this.pendingListenByCallId.delete(callIdOverride); + } + return { ...baseEvent, type: "call.ended", diff --git a/extensions/voice-call/src/providers/stt-openai-realtime.ts b/extensions/voice-call/src/providers/stt-openai-realtime.ts index 2ae83cc0f35..8a256c9aa05 100644 --- a/extensions/voice-call/src/providers/stt-openai-realtime.ts +++ b/extensions/voice-call/src/providers/stt-openai-realtime.ts @@ -174,6 +174,7 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession { setTimeout(() => { if (!this.connected) { + this.ws?.close(); reject(new Error("Realtime STT connection timeout")); } }, 10000); diff --git a/extensions/voice-call/src/providers/telnyx.ts b/extensions/voice-call/src/providers/telnyx.ts index a0b7655fdb8..b001bca154f 100644 --- a/extensions/voice-call/src/providers/telnyx.ts +++ b/extensions/voice-call/src/providers/telnyx.ts @@ -130,11 +130,23 @@ export class TelnyxProvider implements VoiceCallProvider { callId = data.payload?.call_control_id || ""; } + const direction = + data.payload?.direction === "incoming" + ? ("inbound" as const) + : data.payload?.direction === "outgoing" + ? ("outbound" as const) + : undefined; + const from = typeof data.payload?.from === "string" ? data.payload.from : undefined; + const to = typeof data.payload?.to === "string" ? data.payload.to : undefined; + const baseEvent = { id: data.id || crypto.randomUUID(), callId, providerCallId: data.payload?.call_control_id, timestamp: Date.now(), + ...(direction && { direction }), + ...(from && { from }), + ...(to && { to }), }; switch (data.event_type) { diff --git a/extensions/voice-call/src/providers/tts-openai.ts b/extensions/voice-call/src/providers/tts-openai.ts index c483d681990..c607e7665c7 100644 --- a/extensions/voice-call/src/providers/tts-openai.ts +++ b/extensions/voice-call/src/providers/tts-openai.ts @@ -143,7 +143,7 @@ export class OpenAITTSProvider { } /** - * Resample 24kHz PCM to 8kHz using linear interpolation. + * Resample 24kHz PCM to 8kHz by picking every 3rd sample. * Input/output: 16-bit signed little-endian mono. */ function resample24kTo8k(input: Buffer): Buffer { @@ -152,20 +152,11 @@ function resample24kTo8k(input: Buffer): Buffer { const output = Buffer.alloc(outputSamples * 2); for (let i = 0; i < outputSamples; i++) { - // Calculate position in input (3:1 ratio) - const srcPos = i * 3; - const srcIdx = srcPos * 2; + // Pick every 3rd sample (3:1 ratio for 24kHz -> 8kHz) + const srcByteOffset = i * 3 * 2; - if (srcIdx + 3 < input.length) { - // Linear interpolation between samples - const s0 = input.readInt16LE(srcIdx); - const s1 = input.readInt16LE(srcIdx + 2); - const frac = srcPos % 1 || 0; - const sample = Math.round(s0 + frac * (s1 - s0)); - output.writeInt16LE(clamp16(sample), i * 2); - } else { - // Last sample - output.writeInt16LE(input.readInt16LE(srcIdx), i * 2); + if (srcByteOffset + 1 < input.length) { + output.writeInt16LE(input.readInt16LE(srcByteOffset), i * 2); } } diff --git a/extensions/voice-call/src/providers/twilio.ts b/extensions/voice-call/src/providers/twilio.ts index 245c5e2bc3b..20b45d70707 100644 --- a/extensions/voice-call/src/providers/twilio.ts +++ b/extensions/voice-call/src/providers/twilio.ts @@ -290,12 +290,14 @@ export class TwilioProvider implements VoiceCallProvider { case "no-answer": case "failed": this.streamAuthTokens.delete(callSid); + this.callWebhookUrls.delete(callSid); if (callIdOverride) { this.deleteStoredTwiml(callIdOverride); } return { ...baseEvent, type: "call.ended", reason: callStatus }; case "canceled": this.streamAuthTokens.delete(callSid); + this.callWebhookUrls.delete(callSid); if (callIdOverride) { this.deleteStoredTwiml(callIdOverride); } @@ -544,7 +546,7 @@ export class TwilioProvider implements VoiceCallProvider { const pollyVoice = mapVoiceToPolly(input.voice); const twiml = ` - ${escapeXml(input.text)} + ${escapeXml(input.text)} . diff --git a/extensions/voice-call/src/voice-mapping.ts b/extensions/voice-call/src/voice-mapping.ts index 340a3bf2e4e..d0710a33556 100644 --- a/extensions/voice-call/src/voice-mapping.ts +++ b/extensions/voice-call/src/voice-mapping.ts @@ -7,6 +7,7 @@ */ export function escapeXml(text: string): string { return text + .replace(/[\x00-\x08\x0B\x0C\x0E-\x1F]/g, "") .replace(/&/g, "&") .replace(//g, ">") diff --git a/src/gateway/http-common.ts b/src/gateway/http-common.ts index 22e09254fdc..f52316c1967 100644 --- a/src/gateway/http-common.ts +++ b/src/gateway/http-common.ts @@ -77,7 +77,11 @@ export async function readJsonBodyOrError( } export function writeDone(res: ServerResponse) { + if (res.writableEnded || res.destroyed) { + return; + } res.write("data: [DONE]\n\n"); + (res as unknown as { flush?: () => void }).flush?.(); } export function setSseHeaders(res: ServerResponse) { @@ -85,5 +89,7 @@ export function setSseHeaders(res: ServerResponse) { res.setHeader("Content-Type", "text/event-stream; charset=utf-8"); res.setHeader("Cache-Control", "no-cache"); res.setHeader("Connection", "keep-alive"); + res.setHeader("X-Accel-Buffering", "no"); + res.socket?.setNoDelay?.(true); res.flushHeaders?.(); } diff --git a/src/gateway/openai-http.ts b/src/gateway/openai-http.ts index 12d80cdfac8..5859b7b7dad 100644 --- a/src/gateway/openai-http.ts +++ b/src/gateway/openai-http.ts @@ -37,7 +37,11 @@ type OpenAiChatCompletionRequest = { }; function writeSse(res: ServerResponse, data: unknown) { + if (res.writableEnded || res.destroyed) { + return; + } res.write(`data: ${JSON.stringify(data)}\n\n`); + (res as unknown as { flush?: () => void }).flush?.(); } function asMessages(val: unknown): OpenAiChatMessage[] { @@ -178,7 +182,7 @@ export async function handleOpenAiHttpRequest( return true; } - const runId = `chatcmpl_${randomUUID()}`; + const runId = `chatcmpl-${randomUUID()}`; const deps = createDefaultDeps(); if (!stream) { @@ -231,10 +235,27 @@ export async function handleOpenAiHttpRequest( setSseHeaders(res); + const created = Math.floor(Date.now() / 1000); let wroteRole = false; let sawAssistantDelta = false; let closed = false; + /** Send a final chunk with finish_reason and then [DONE]. */ + function finishStream(finishReason: string = "stop") { + if (res.writableEnded || res.destroyed) { + return; + } + writeSse(res, { + id: runId, + object: "chat.completion.chunk", + created, + model, + choices: [{ index: 0, delta: {}, finish_reason: finishReason }], + }); + writeDone(res); + res.end(); + } + const unsubscribe = onAgentEvent((evt) => { if (evt.runId !== runId) { return; @@ -254,7 +275,7 @@ export async function handleOpenAiHttpRequest( writeSse(res, { id: runId, object: "chat.completion.chunk", - created: Math.floor(Date.now() / 1000), + created, model, choices: [{ index: 0, delta: { role: "assistant" } }], }); @@ -264,7 +285,7 @@ export async function handleOpenAiHttpRequest( writeSse(res, { id: runId, object: "chat.completion.chunk", - created: Math.floor(Date.now() / 1000), + created, model, choices: [ { @@ -282,8 +303,7 @@ export async function handleOpenAiHttpRequest( if (phase === "end" || phase === "error") { closed = true; unsubscribe(); - writeDone(res); - res.end(); + finishStream(phase === "error" ? "stop" : "stop"); } } }); @@ -319,7 +339,7 @@ export async function handleOpenAiHttpRequest( writeSse(res, { id: runId, object: "chat.completion.chunk", - created: Math.floor(Date.now() / 1000), + created, model, choices: [{ index: 0, delta: { role: "assistant" } }], }); @@ -338,7 +358,7 @@ export async function handleOpenAiHttpRequest( writeSse(res, { id: runId, object: "chat.completion.chunk", - created: Math.floor(Date.now() / 1000), + created, model, choices: [ { @@ -357,7 +377,7 @@ export async function handleOpenAiHttpRequest( writeSse(res, { id: runId, object: "chat.completion.chunk", - created: Math.floor(Date.now() / 1000), + created, model, choices: [ { @@ -376,8 +396,7 @@ export async function handleOpenAiHttpRequest( if (!closed) { closed = true; unsubscribe(); - writeDone(res); - res.end(); + finishStream(); } } })(); diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts index 23557cdda61..31aecce3de4 100644 --- a/src/infra/agent-events.ts +++ b/src/infra/agent-events.ts @@ -48,6 +48,7 @@ export function getAgentRunContext(runId: string) { export function clearAgentRunContext(runId: string) { runContextById.delete(runId); + seqByRun.delete(runId); } export function resetAgentRunContextForTest() {