mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 22:24:31 +00:00
voice-call: hang up rejected inbounds, idempotency and logging (#15892)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 36f826ea23
Co-authored-by: dcantu96 <32658690+dcantu96@users.noreply.github.com>
Co-authored-by: steipete <58493+steipete@users.noreply.github.com>
Reviewed-by: @steipete
This commit is contained in:
committed by
GitHub
parent
13aface863
commit
9443c638f4
@@ -4,13 +4,13 @@ import fsp from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { CallMode, VoiceCallConfig } from "./config.js";
|
||||
import type { CallManagerContext } from "./manager/context.js";
|
||||
import type { VoiceCallProvider } from "./providers/base.js";
|
||||
import { isAllowlistedCaller, normalizePhoneNumber } from "./allowlist.js";
|
||||
import { processEvent as processManagerEvent } from "./manager/events.js";
|
||||
import {
|
||||
type CallId,
|
||||
type CallRecord,
|
||||
CallRecordSchema,
|
||||
type CallState,
|
||||
type NormalizedEvent,
|
||||
type OutboundCallOptions,
|
||||
TerminalStates,
|
||||
@@ -44,6 +44,7 @@ export class CallManager {
|
||||
private activeCalls = new Map<CallId, CallRecord>();
|
||||
private providerCallIdMap = new Map<string, CallId>(); // providerCallId -> internal callId
|
||||
private processedEventIds = new Set<string>();
|
||||
private rejectedProviderCallIds = new Set<string>();
|
||||
private provider: VoiceCallProvider | null = null;
|
||||
private config: VoiceCallConfig;
|
||||
private storePath: string;
|
||||
@@ -282,35 +283,6 @@ export class CallManager {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start max duration timer for a call.
|
||||
* Auto-hangup when maxDurationSeconds is reached.
|
||||
*/
|
||||
private startMaxDurationTimer(callId: CallId): void {
|
||||
// Clear any existing timer
|
||||
this.clearMaxDurationTimer(callId);
|
||||
|
||||
const maxDurationMs = this.config.maxDurationSeconds * 1000;
|
||||
console.log(
|
||||
`[voice-call] Starting max duration timer (${this.config.maxDurationSeconds}s) for call ${callId}`,
|
||||
);
|
||||
|
||||
const timer = setTimeout(async () => {
|
||||
this.maxDurationTimers.delete(callId);
|
||||
const call = this.getCall(callId);
|
||||
if (call && !TerminalStates.has(call.state)) {
|
||||
console.log(
|
||||
`[voice-call] Max duration reached (${this.config.maxDurationSeconds}s), ending call ${callId}`,
|
||||
);
|
||||
call.endReason = "timeout";
|
||||
this.persistCallRecord(call);
|
||||
await this.endCall(callId);
|
||||
}
|
||||
}, maxDurationMs);
|
||||
|
||||
this.maxDurationTimers.set(callId, timer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear max duration timer for a call.
|
||||
*/
|
||||
@@ -340,15 +312,6 @@ export class CallManager {
|
||||
waiter.reject(new Error(reason));
|
||||
}
|
||||
|
||||
private resolveTranscriptWaiter(callId: CallId, transcript: string): void {
|
||||
const waiter = this.transcriptWaiters.get(callId);
|
||||
if (!waiter) {
|
||||
return;
|
||||
}
|
||||
this.clearTranscriptWaiter(callId);
|
||||
waiter.resolve(transcript);
|
||||
}
|
||||
|
||||
private waitForFinalTranscript(callId: CallId): Promise<string> {
|
||||
// Only allow one in-flight waiter per call.
|
||||
this.rejectTranscriptWaiter(callId, "Transcript waiter replaced");
|
||||
@@ -458,220 +421,29 @@ export class CallManager {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if an inbound call should be accepted based on policy.
|
||||
*/
|
||||
private shouldAcceptInbound(from: string | undefined): boolean {
|
||||
const { inboundPolicy: policy, allowFrom } = this.config;
|
||||
|
||||
switch (policy) {
|
||||
case "disabled":
|
||||
console.log("[voice-call] Inbound call rejected: policy is disabled");
|
||||
return false;
|
||||
|
||||
case "open":
|
||||
console.log("[voice-call] Inbound call accepted: policy is open");
|
||||
return true;
|
||||
|
||||
case "allowlist":
|
||||
case "pairing": {
|
||||
const normalized = normalizePhoneNumber(from);
|
||||
if (!normalized) {
|
||||
console.log("[voice-call] Inbound call rejected: missing caller ID");
|
||||
return false;
|
||||
}
|
||||
const allowed = isAllowlistedCaller(normalized, allowFrom);
|
||||
const status = allowed ? "accepted" : "rejected";
|
||||
console.log(
|
||||
`[voice-call] Inbound call ${status}: ${from} ${allowed ? "is in" : "not in"} allowlist`,
|
||||
);
|
||||
return allowed;
|
||||
}
|
||||
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a call record for an inbound call.
|
||||
*/
|
||||
private createInboundCall(providerCallId: string, from: string, to: string): CallRecord {
|
||||
const callId = crypto.randomUUID();
|
||||
|
||||
const callRecord: CallRecord = {
|
||||
callId,
|
||||
providerCallId,
|
||||
provider: this.provider?.name || "twilio",
|
||||
direction: "inbound",
|
||||
state: "ringing",
|
||||
from,
|
||||
to,
|
||||
startedAt: Date.now(),
|
||||
transcript: [],
|
||||
processedEventIds: [],
|
||||
metadata: {
|
||||
initialMessage: this.config.inboundGreeting || "Hello! How can I help you today?",
|
||||
private getContext(): CallManagerContext {
|
||||
return {
|
||||
activeCalls: this.activeCalls,
|
||||
providerCallIdMap: this.providerCallIdMap,
|
||||
processedEventIds: this.processedEventIds,
|
||||
rejectedProviderCallIds: this.rejectedProviderCallIds,
|
||||
onCallAnswered: (call) => {
|
||||
this.maybeSpeakInitialMessageOnAnswered(call);
|
||||
},
|
||||
provider: this.provider,
|
||||
config: this.config,
|
||||
storePath: this.storePath,
|
||||
webhookUrl: this.webhookUrl,
|
||||
transcriptWaiters: this.transcriptWaiters,
|
||||
maxDurationTimers: this.maxDurationTimers,
|
||||
};
|
||||
|
||||
this.activeCalls.set(callId, callRecord);
|
||||
this.providerCallIdMap.set(providerCallId, callId); // Map providerCallId to internal callId
|
||||
this.persistCallRecord(callRecord);
|
||||
|
||||
console.log(`[voice-call] Created inbound call record: ${callId} from ${from}`);
|
||||
return callRecord;
|
||||
}
|
||||
|
||||
/**
|
||||
* Look up a call by either internal callId or providerCallId.
|
||||
*/
|
||||
private findCall(callIdOrProviderCallId: string): CallRecord | undefined {
|
||||
// Try direct lookup by internal callId
|
||||
const directCall = this.activeCalls.get(callIdOrProviderCallId);
|
||||
if (directCall) {
|
||||
return directCall;
|
||||
}
|
||||
|
||||
// Try lookup by providerCallId
|
||||
return this.getCallByProviderCallId(callIdOrProviderCallId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a webhook event.
|
||||
*/
|
||||
processEvent(event: NormalizedEvent): void {
|
||||
// Idempotency check
|
||||
if (this.processedEventIds.has(event.id)) {
|
||||
return;
|
||||
}
|
||||
this.processedEventIds.add(event.id);
|
||||
|
||||
let call = this.findCall(event.callId);
|
||||
|
||||
// Handle inbound calls - create record if it doesn't exist
|
||||
if (!call && event.direction === "inbound" && event.providerCallId) {
|
||||
// Check if we should accept this inbound call
|
||||
if (!this.shouldAcceptInbound(event.from)) {
|
||||
void this.rejectInboundCall(event);
|
||||
return;
|
||||
}
|
||||
|
||||
// Create a new call record for this inbound call
|
||||
call = this.createInboundCall(
|
||||
event.providerCallId,
|
||||
event.from || "unknown",
|
||||
event.to || this.config.fromNumber || "unknown",
|
||||
);
|
||||
|
||||
// Update the event's callId to use our internal ID
|
||||
event.callId = call.callId;
|
||||
}
|
||||
|
||||
if (!call) {
|
||||
// Still no call record - ignore event
|
||||
return;
|
||||
}
|
||||
|
||||
// Update provider call ID if we got it
|
||||
if (event.providerCallId && event.providerCallId !== call.providerCallId) {
|
||||
const previousProviderCallId = call.providerCallId;
|
||||
call.providerCallId = event.providerCallId;
|
||||
this.providerCallIdMap.set(event.providerCallId, call.callId);
|
||||
if (previousProviderCallId) {
|
||||
const mapped = this.providerCallIdMap.get(previousProviderCallId);
|
||||
if (mapped === call.callId) {
|
||||
this.providerCallIdMap.delete(previousProviderCallId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Track processed event
|
||||
call.processedEventIds.push(event.id);
|
||||
|
||||
// Process event based on type
|
||||
switch (event.type) {
|
||||
case "call.initiated":
|
||||
this.transitionState(call, "initiated");
|
||||
break;
|
||||
|
||||
case "call.ringing":
|
||||
this.transitionState(call, "ringing");
|
||||
break;
|
||||
|
||||
case "call.answered":
|
||||
call.answeredAt = event.timestamp;
|
||||
this.transitionState(call, "answered");
|
||||
// Start max duration timer when call is answered
|
||||
this.startMaxDurationTimer(call.callId);
|
||||
// Best-effort: speak initial message (for inbound greetings and outbound
|
||||
// conversation mode) once the call is answered.
|
||||
this.maybeSpeakInitialMessageOnAnswered(call);
|
||||
break;
|
||||
|
||||
case "call.active":
|
||||
this.transitionState(call, "active");
|
||||
break;
|
||||
|
||||
case "call.speaking":
|
||||
this.transitionState(call, "speaking");
|
||||
break;
|
||||
|
||||
case "call.speech":
|
||||
if (event.isFinal) {
|
||||
this.addTranscriptEntry(call, "user", event.transcript);
|
||||
this.resolveTranscriptWaiter(call.callId, event.transcript);
|
||||
}
|
||||
this.transitionState(call, "listening");
|
||||
break;
|
||||
|
||||
case "call.ended":
|
||||
call.endedAt = event.timestamp;
|
||||
call.endReason = event.reason;
|
||||
this.transitionState(call, event.reason as CallState);
|
||||
this.clearMaxDurationTimer(call.callId);
|
||||
this.rejectTranscriptWaiter(call.callId, `Call ended: ${event.reason}`);
|
||||
this.activeCalls.delete(call.callId);
|
||||
if (call.providerCallId) {
|
||||
this.providerCallIdMap.delete(call.providerCallId);
|
||||
}
|
||||
break;
|
||||
|
||||
case "call.error":
|
||||
if (!event.retryable) {
|
||||
call.endedAt = event.timestamp;
|
||||
call.endReason = "error";
|
||||
this.transitionState(call, "error");
|
||||
this.clearMaxDurationTimer(call.callId);
|
||||
this.rejectTranscriptWaiter(call.callId, `Call error: ${event.error}`);
|
||||
this.activeCalls.delete(call.callId);
|
||||
if (call.providerCallId) {
|
||||
this.providerCallIdMap.delete(call.providerCallId);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
this.persistCallRecord(call);
|
||||
}
|
||||
|
||||
private async rejectInboundCall(event: NormalizedEvent): Promise<void> {
|
||||
if (!this.provider || !event.providerCallId) {
|
||||
return;
|
||||
}
|
||||
const callId = event.callId || event.providerCallId;
|
||||
try {
|
||||
await this.provider.hangupCall({
|
||||
callId,
|
||||
providerCallId: event.providerCallId,
|
||||
reason: "hangup-bot",
|
||||
});
|
||||
} catch (err) {
|
||||
console.warn(
|
||||
`[voice-call] Failed to reject inbound call ${event.providerCallId}:`,
|
||||
err instanceof Error ? err.message : err,
|
||||
);
|
||||
}
|
||||
processManagerEvent(this.getContext(), event);
|
||||
}
|
||||
|
||||
private maybeSpeakInitialMessageOnAnswered(call: CallRecord): void {
|
||||
@@ -758,52 +530,6 @@ export class CallManager {
|
||||
return calls;
|
||||
}
|
||||
|
||||
// States that can cycle during multi-turn conversations
|
||||
private static readonly ConversationStates = new Set<CallState>(["speaking", "listening"]);
|
||||
|
||||
// Non-terminal state order for monotonic transitions
|
||||
private static readonly StateOrder: readonly CallState[] = [
|
||||
"initiated",
|
||||
"ringing",
|
||||
"answered",
|
||||
"active",
|
||||
"speaking",
|
||||
"listening",
|
||||
];
|
||||
|
||||
/**
|
||||
* Transition call state with monotonic enforcement.
|
||||
*/
|
||||
private transitionState(call: CallRecord, newState: CallState): void {
|
||||
// No-op for same state or already terminal
|
||||
if (call.state === newState || TerminalStates.has(call.state)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Terminal states can always be reached from non-terminal
|
||||
if (TerminalStates.has(newState)) {
|
||||
call.state = newState;
|
||||
return;
|
||||
}
|
||||
|
||||
// Allow cycling between speaking and listening (multi-turn conversations)
|
||||
if (
|
||||
CallManager.ConversationStates.has(call.state) &&
|
||||
CallManager.ConversationStates.has(newState)
|
||||
) {
|
||||
call.state = newState;
|
||||
return;
|
||||
}
|
||||
|
||||
// Only allow forward transitions in state order
|
||||
const currentIndex = CallManager.StateOrder.indexOf(call.state);
|
||||
const newIndex = CallManager.StateOrder.indexOf(newState);
|
||||
|
||||
if (newIndex > currentIndex) {
|
||||
call.state = newState;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an entry to the call transcript.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user