mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-21 12:04:59 +00:00
fix(ios): handle tts no_reply fallback in talk mode
This commit is contained in:
@@ -773,7 +773,15 @@ final class TalkModeManager: NSObject {
|
||||
await self.streamAssistant(runId: runId, gateway: gateway)
|
||||
}
|
||||
}
|
||||
let completion = await self.waitForChatCompletion(runId: runId, gateway: gateway, timeoutSeconds: 120)
|
||||
let completionStartedAt = Date()
|
||||
let completionResult = await self.waitForChatCompletion(
|
||||
runId: runId,
|
||||
gateway: gateway,
|
||||
timeoutSeconds: 120)
|
||||
let completion = completionResult.state
|
||||
let completionWaitMs = Int(Date().timeIntervalSince(completionStartedAt) * 1000)
|
||||
GatewayDiagnostics.log(
|
||||
"talk: chat completion state runId=\(runId) state=\(completion) waitMs=\(completionWaitMs)")
|
||||
if completion == .timeout {
|
||||
self.logger.warning(
|
||||
"chat completion timeout runId=\(runId, privacy: .public); attempting history fallback")
|
||||
@@ -796,13 +804,30 @@ final class TalkModeManager: NSObject {
|
||||
return
|
||||
}
|
||||
|
||||
var assistantText = try await self.waitForAssistantText(
|
||||
gateway: gateway,
|
||||
since: startedAt,
|
||||
timeoutSeconds: completion == .final ? 12 : 25)
|
||||
if assistantText == nil, shouldIncremental {
|
||||
let eventAssistantText = completionResult.finalAssistantText?
|
||||
.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
var assistantText: String?
|
||||
if let eventAssistantText, !eventAssistantText.isEmpty {
|
||||
GatewayDiagnostics.log(
|
||||
"talk: assistant text from chat event runId=\(runId) chars=\(eventAssistantText.count)")
|
||||
assistantText = eventAssistantText
|
||||
}
|
||||
|
||||
if assistantText == nil {
|
||||
// History can surface transient assistant/tool scaffolding before the final
|
||||
// assistant message is committed. Require a short stability window.
|
||||
let requireStableAssistantText = true
|
||||
assistantText = try await self.waitForAssistantText(
|
||||
gateway: gateway,
|
||||
since: startedAt,
|
||||
timeoutSeconds: completion == .final ? 12 : 25,
|
||||
runId: runId,
|
||||
requireStability: requireStableAssistantText)
|
||||
}
|
||||
if shouldIncremental {
|
||||
let fallback = self.incrementalSpeechBuffer.latestText
|
||||
if !fallback.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty {
|
||||
.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
if assistantText == nil, !fallback.isEmpty {
|
||||
assistantText = fallback
|
||||
}
|
||||
}
|
||||
@@ -873,6 +898,11 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
}
|
||||
|
||||
private struct ChatCompletionResult {
|
||||
let state: ChatCompletionState
|
||||
let finalAssistantText: String?
|
||||
}
|
||||
|
||||
private func sendChat(_ message: String, gateway: GatewayNodeSession) async throws -> String {
|
||||
struct SendResponse: Decodable { let runId: String }
|
||||
let payload: [String: Any] = [
|
||||
@@ -897,13 +927,13 @@ final class TalkModeManager: NSObject {
|
||||
private func waitForChatCompletion(
|
||||
runId: String,
|
||||
gateway: GatewayNodeSession,
|
||||
timeoutSeconds: Int = 120) async -> ChatCompletionState
|
||||
timeoutSeconds: Int = 120) async -> ChatCompletionResult
|
||||
{
|
||||
let stream = await gateway.subscribeServerEvents(bufferingNewest: 200)
|
||||
return await withTaskGroup(of: ChatCompletionState.self) { group in
|
||||
return await withTaskGroup(of: ChatCompletionResult.self) { group in
|
||||
group.addTask { [runId] in
|
||||
for await evt in stream {
|
||||
if Task.isCancelled { return .timeout }
|
||||
if Task.isCancelled { return ChatCompletionResult(state: .timeout, finalAssistantText: nil) }
|
||||
guard evt.event == "chat", let payload = evt.payload else { continue }
|
||||
guard let chatEvent = try? GatewayPayloadDecoding.decode(payload, as: ChatEvent.self) else {
|
||||
continue
|
||||
@@ -911,41 +941,88 @@ final class TalkModeManager: NSObject {
|
||||
guard chatEvent.runid == runId else { continue }
|
||||
if let state = chatEvent.state.value as? String {
|
||||
switch state {
|
||||
case "final": return .final
|
||||
case "aborted": return .aborted
|
||||
case "error": return .error
|
||||
case "final":
|
||||
let eventText = Self.extractAssistantText(fromChatEventMessage: chatEvent.message)
|
||||
return ChatCompletionResult(state: .final, finalAssistantText: eventText)
|
||||
case "aborted":
|
||||
return ChatCompletionResult(state: .aborted, finalAssistantText: nil)
|
||||
case "error":
|
||||
return ChatCompletionResult(state: .error, finalAssistantText: nil)
|
||||
default: break
|
||||
}
|
||||
}
|
||||
}
|
||||
return .timeout
|
||||
GatewayDiagnostics.log("talk: chat event stream ended runId=\(runId)")
|
||||
return ChatCompletionResult(state: .timeout, finalAssistantText: nil)
|
||||
}
|
||||
group.addTask {
|
||||
try? await Task.sleep(nanoseconds: UInt64(timeoutSeconds) * 1_000_000_000)
|
||||
return .timeout
|
||||
return ChatCompletionResult(state: .timeout, finalAssistantText: nil)
|
||||
}
|
||||
let result = await group.next() ?? .timeout
|
||||
let result = await group.next() ?? ChatCompletionResult(state: .timeout, finalAssistantText: nil)
|
||||
group.cancelAll()
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
private struct AssistantHistoryMessage {
|
||||
let text: String
|
||||
let timestamp: Double?
|
||||
let source: String
|
||||
}
|
||||
|
||||
private func waitForAssistantText(
|
||||
gateway: GatewayNodeSession,
|
||||
since: Double,
|
||||
timeoutSeconds: Int) async throws -> String?
|
||||
timeoutSeconds: Int,
|
||||
runId: String,
|
||||
requireStability: Bool = false) async throws -> String?
|
||||
{
|
||||
let deadline = Date().addingTimeInterval(TimeInterval(timeoutSeconds))
|
||||
var candidate: String?
|
||||
var candidateSince: Date?
|
||||
let stableWindow: TimeInterval = 0.9
|
||||
|
||||
while Date() < deadline {
|
||||
if let text = try await self.fetchLatestAssistantText(gateway: gateway, since: since) {
|
||||
return text
|
||||
if let message = try await self.fetchLatestAssistantText(
|
||||
gateway: gateway,
|
||||
since: since,
|
||||
runId: runId)
|
||||
{
|
||||
let trimmed = message.text.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
guard !trimmed.isEmpty else {
|
||||
try? await Task.sleep(nanoseconds: 300_000_000)
|
||||
continue
|
||||
}
|
||||
if !requireStability {
|
||||
GatewayDiagnostics.log(
|
||||
"talk: assistant wait return runId=\(runId) chars=\(trimmed.count) source=\(message.source)")
|
||||
return trimmed
|
||||
}
|
||||
if candidate != trimmed {
|
||||
candidate = trimmed
|
||||
candidateSince = Date()
|
||||
} else if let candidateSince,
|
||||
Date().timeIntervalSince(candidateSince) >= stableWindow
|
||||
{
|
||||
GatewayDiagnostics.log(
|
||||
"talk: assistant wait stable runId=\(runId) chars=\(trimmed.count)")
|
||||
return trimmed
|
||||
}
|
||||
}
|
||||
try? await Task.sleep(nanoseconds: 300_000_000)
|
||||
}
|
||||
return nil
|
||||
let candidateChars = candidate?.count ?? 0
|
||||
GatewayDiagnostics.log("talk: assistant wait timeout runId=\(runId) candidateChars=\(candidateChars)")
|
||||
return candidate
|
||||
}
|
||||
|
||||
private func fetchLatestAssistantText(gateway: GatewayNodeSession, since: Double? = nil) async throws -> String? {
|
||||
private func fetchLatestAssistantText(
|
||||
gateway: GatewayNodeSession,
|
||||
since: Double? = nil,
|
||||
runId: String? = nil
|
||||
) async throws -> AssistantHistoryMessage?
|
||||
{
|
||||
let res = try await gateway.request(
|
||||
method: "chat.history",
|
||||
paramsJSON: "{\"sessionKey\":\"\(self.mainSessionKey)\"}",
|
||||
@@ -954,15 +1031,158 @@ final class TalkModeManager: NSObject {
|
||||
guard let messages = json["messages"] as? [[String: Any]] else { return nil }
|
||||
for msg in messages.reversed() {
|
||||
guard (msg["role"] as? String) == "assistant" else { continue }
|
||||
if let since, let timestamp = msg["timestamp"] as? Double,
|
||||
TalkHistoryTimestamp.isAfter(timestamp, sinceSeconds: since) == false
|
||||
{
|
||||
let timestamp = Self.parseHistoryTimestamp(msg["timestamp"])
|
||||
if Self.isToolCarrierAssistantMessage(msg) {
|
||||
if let ttsText = Self.extractAssistantTTSToolCallText(fromHistoryAssistantMessage: msg) {
|
||||
return AssistantHistoryMessage(text: ttsText, timestamp: timestamp, source: "history:tts-tool-call")
|
||||
}
|
||||
continue
|
||||
}
|
||||
guard let content = msg["content"] as? [[String: Any]] else { continue }
|
||||
let text = content.compactMap { $0["text"] as? String }.joined(separator: "\n")
|
||||
let trimmed = text.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
if !trimmed.isEmpty { return trimmed }
|
||||
if let runId {
|
||||
let messageRunId = Self.extractHistoryRunId(msg)
|
||||
if let messageRunId, !messageRunId.isEmpty, messageRunId != runId {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if let since {
|
||||
guard let timestamp else { continue }
|
||||
if TalkHistoryTimestamp.isAfter(timestamp, sinceSeconds: since) == false {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if let assistantText = Self.extractAssistantSpokenText(fromHistoryAssistantMessage: msg) {
|
||||
return AssistantHistoryMessage(text: assistantText, timestamp: timestamp, source: "history:assistant-text")
|
||||
}
|
||||
if let ttsText = Self.extractAssistantTTSToolCallText(fromHistoryAssistantMessage: msg) {
|
||||
return AssistantHistoryMessage(text: ttsText, timestamp: timestamp, source: "history:tts-tool-call")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
private nonisolated static func extractAssistantText(fromChatEventMessage payload: AnyCodable?) -> String? {
|
||||
guard let payload else { return nil }
|
||||
|
||||
if let decoded = try? GatewayPayloadDecoding.decode(payload, as: OpenClawChatMessage.self),
|
||||
decoded.role == "assistant"
|
||||
{
|
||||
let text = decoded.content.compactMap(\.text).joined(separator: "\n")
|
||||
if let normalized = Self.normalizedSpokenAssistantText(text) {
|
||||
return normalized
|
||||
}
|
||||
}
|
||||
|
||||
guard let raw = payload.value as? [String: Any] else { return nil }
|
||||
guard (raw["role"] as? String) == "assistant" else { return nil }
|
||||
|
||||
if let normalized = Self.extractAssistantSpokenText(fromHistoryAssistantMessage: raw) {
|
||||
return normalized
|
||||
}
|
||||
|
||||
if let ttsText = Self.extractAssistantTTSToolCallText(fromHistoryAssistantMessage: raw) {
|
||||
return ttsText
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
private static func isToolCarrierAssistantMessage(_ msg: [String: Any]) -> Bool {
|
||||
if msg["toolCallId"] != nil || msg["tool_call_id"] != nil {
|
||||
return true
|
||||
}
|
||||
if msg["toolName"] != nil || msg["tool_name"] != nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
private static func extractHistoryRunId(_ msg: [String: Any]) -> String? {
|
||||
if let runId = msg["runId"] as? String {
|
||||
return runId
|
||||
}
|
||||
if let runId = msg["runid"] as? String {
|
||||
return runId
|
||||
}
|
||||
if let abort = msg["openclawAbort"] as? [String: Any], let runId = abort["runId"] as? String {
|
||||
return runId
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
private static func parseHistoryTimestamp(_ raw: Any?) -> Double? {
|
||||
if let value = raw as? Double { return value }
|
||||
if let value = raw as? Int { return Double(value) }
|
||||
if let value = raw as? Int64 { return Double(value) }
|
||||
if let value = raw as? NSNumber { return value.doubleValue }
|
||||
if let value = raw as? String, let parsed = Double(value) { return parsed }
|
||||
return nil
|
||||
}
|
||||
|
||||
private nonisolated static func normalizedSpokenAssistantText(_ raw: String?) -> String? {
|
||||
guard let raw else { return nil }
|
||||
let trimmed = raw.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
guard !trimmed.isEmpty else { return nil }
|
||||
if trimmed.uppercased() == "NO_REPLY" {
|
||||
return nil
|
||||
}
|
||||
return trimmed
|
||||
}
|
||||
|
||||
private nonisolated static func extractAssistantSpokenText(fromHistoryAssistantMessage msg: [String: Any]) -> String? {
|
||||
if let content = msg["content"] as? [[String: Any]] {
|
||||
let text = content.compactMap { block -> String? in
|
||||
if let type = (block["type"] as? String)?.lowercased(),
|
||||
type != "text",
|
||||
type != "output_text",
|
||||
type != "input_text"
|
||||
{
|
||||
return nil
|
||||
}
|
||||
return block["text"] as? String
|
||||
}.joined(separator: "\n")
|
||||
if let normalized = Self.normalizedSpokenAssistantText(text) {
|
||||
return normalized
|
||||
}
|
||||
}
|
||||
|
||||
if let content = msg["content"] as? String,
|
||||
let normalized = Self.normalizedSpokenAssistantText(content)
|
||||
{
|
||||
return normalized
|
||||
}
|
||||
|
||||
if let text = msg["text"] as? String,
|
||||
let normalized = Self.normalizedSpokenAssistantText(text)
|
||||
{
|
||||
return normalized
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
private nonisolated static func extractAssistantTTSToolCallText(fromHistoryAssistantMessage msg: [String: Any]) -> String? {
|
||||
guard let content = msg["content"] as? [[String: Any]] else { return nil }
|
||||
for block in content {
|
||||
let type = (block["type"] as? String)?.lowercased() ?? ""
|
||||
guard type == "toolcall" || type == "tool_call" else { continue }
|
||||
let name = (block["name"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines).lowercased() ?? ""
|
||||
guard name == "tts" else { continue }
|
||||
|
||||
if let args = block["arguments"] as? [String: Any],
|
||||
let text = args["text"] as? String,
|
||||
let normalized = Self.normalizedSpokenAssistantText(text)
|
||||
{
|
||||
return normalized
|
||||
}
|
||||
|
||||
if let args = block["arguments"] as? String,
|
||||
let argsData = args.data(using: .utf8),
|
||||
let json = (try? JSONSerialization.jsonObject(with: argsData)) as? [String: Any],
|
||||
let text = json["text"] as? String,
|
||||
let normalized = Self.normalizedSpokenAssistantText(text)
|
||||
{
|
||||
return normalized
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -973,6 +1193,7 @@ final class TalkModeManager: NSObject {
|
||||
let cleaned = parsed.stripped.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
guard !cleaned.isEmpty else { return }
|
||||
self.applyDirective(directive)
|
||||
GatewayDiagnostics.log("talk tts: request chars=\(cleaned.count) mode=single")
|
||||
|
||||
self.statusText = "Generating voice…"
|
||||
self.isSpeaking = true
|
||||
@@ -1066,6 +1287,10 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
let duration = Date().timeIntervalSince(started)
|
||||
self.logger.info("elevenlabs stream finished=\(result.finished, privacy: .public) dur=\(duration, privacy: .public)s")
|
||||
let interruptedAt = result.interruptedAt.map { String(format: "%.3f", $0) } ?? "none"
|
||||
let durationMs = Int(duration * 1000)
|
||||
GatewayDiagnostics.log(
|
||||
"talk tts: playback finished=\(result.finished) interruptedAt=\(interruptedAt) durMs=\(durationMs) pcm=\(self.lastPlaybackWasPCM)")
|
||||
if !result.finished, let interruptedAt = result.interruptedAt {
|
||||
self.lastInterruptedAtSeconds = interruptedAt
|
||||
}
|
||||
@@ -1081,6 +1306,7 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
}
|
||||
self.statusText = "Speaking (System)…"
|
||||
GatewayDiagnostics.log("talk tts: system playback start chars=\(cleaned.count)")
|
||||
try await TalkSystemSpeechSynthesizer.shared.speak(text: cleaned, language: language)
|
||||
}
|
||||
} catch {
|
||||
@@ -1098,6 +1324,7 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
self.statusText = "Speaking (System)…"
|
||||
let language = ElevenLabsTTSClient.validatedLanguage(directive?.language)
|
||||
GatewayDiagnostics.log("talk tts: system playback fallback chars=\(cleaned.count)")
|
||||
try await TalkSystemSpeechSynthesizer.shared.speak(text: cleaned, language: language)
|
||||
} catch {
|
||||
self.statusText = "Speak failed: \(error.localizedDescription)"
|
||||
@@ -1123,6 +1350,9 @@ final class TalkModeManager: NSObject {
|
||||
_ = self.lastPlaybackWasPCM
|
||||
? self.mp3Player.stop()
|
||||
: self.pcmPlayer.stop()
|
||||
let interruptedSummary = interruptedAt.map { String(format: "%.3f", $0) } ?? "none"
|
||||
GatewayDiagnostics.log(
|
||||
"talk tts: stop interruptedAt=\(interruptedSummary) storeInterruption=\(storeInterruption) incrementalActive=\(hasIncremental)")
|
||||
} else if !hasIncremental {
|
||||
return
|
||||
}
|
||||
@@ -1546,6 +1776,7 @@ final class TalkModeManager: NSObject {
|
||||
} else {
|
||||
await self.updateIncrementalContextIfNeeded()
|
||||
guard let resolvedContext = self.incrementalSpeechContext else {
|
||||
GatewayDiagnostics.log("talk tts: incremental provider=system reason=context-missing chars=\(text.count)")
|
||||
try? await TalkSystemSpeechSynthesizer.shared.speak(
|
||||
text: text,
|
||||
language: self.incrementalSpeechLanguage)
|
||||
@@ -1555,6 +1786,7 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
|
||||
guard context.canUseElevenLabs, let apiKey = context.apiKey, let voiceId = context.voiceId else {
|
||||
GatewayDiagnostics.log("talk tts: incremental provider=system reason=credentials-missing chars=\(text.count)")
|
||||
try? await TalkSystemSpeechSynthesizer.shared.speak(
|
||||
text: text,
|
||||
language: self.incrementalSpeechLanguage)
|
||||
@@ -2016,40 +2248,24 @@ extension TalkModeManager {
|
||||
}
|
||||
|
||||
private final class AudioTapDiagnostics: @unchecked Sendable {
|
||||
private let label: String
|
||||
private let onLevel: (@Sendable (Float) -> Void)?
|
||||
private let lock = NSLock()
|
||||
private var bufferCount: Int = 0
|
||||
private var lastLoggedAt = Date.distantPast
|
||||
private var lastLevelEmitAt = Date.distantPast
|
||||
private var maxRmsWindow: Float = 0
|
||||
private var lastRms: Float = 0
|
||||
|
||||
init(label: String, onLevel: (@Sendable (Float) -> Void)? = nil) {
|
||||
self.label = label
|
||||
_ = label
|
||||
self.onLevel = onLevel
|
||||
}
|
||||
|
||||
func onBuffer(_ buffer: AVAudioPCMBuffer) {
|
||||
var shouldLog = false
|
||||
var shouldEmitLevel = false
|
||||
var count = 0
|
||||
lock.lock()
|
||||
bufferCount += 1
|
||||
count = bufferCount
|
||||
let now = Date()
|
||||
if now.timeIntervalSince(lastLoggedAt) >= 1.0 {
|
||||
lastLoggedAt = now
|
||||
shouldLog = true
|
||||
}
|
||||
if now.timeIntervalSince(lastLevelEmitAt) >= 0.12 {
|
||||
lastLevelEmitAt = now
|
||||
shouldEmitLevel = true
|
||||
}
|
||||
lock.unlock()
|
||||
|
||||
let rate = buffer.format.sampleRate
|
||||
let ch = buffer.format.channelCount
|
||||
let frames = buffer.frameLength
|
||||
|
||||
var rms: Float?
|
||||
@@ -2066,20 +2282,9 @@ private final class AudioTapDiagnostics: @unchecked Sendable {
|
||||
}
|
||||
|
||||
let resolvedRms = rms ?? 0
|
||||
lock.lock()
|
||||
lastRms = resolvedRms
|
||||
if resolvedRms > maxRmsWindow { maxRmsWindow = resolvedRms }
|
||||
let maxRms = maxRmsWindow
|
||||
if shouldLog { maxRmsWindow = 0 }
|
||||
lock.unlock()
|
||||
|
||||
if shouldEmitLevel, let onLevel {
|
||||
onLevel(resolvedRms)
|
||||
}
|
||||
|
||||
guard shouldLog else { return }
|
||||
GatewayDiagnostics.log(
|
||||
"\(label) mic: buffers=\(count) frames=\(frames) rate=\(Int(rate))Hz ch=\(ch) rms=\(String(format: "%.4f", resolvedRms)) max=\(String(format: "%.4f", maxRms))")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user