mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-21 03:55:02 +00:00
fix(ios): reduce talk mode first-word latency
This commit is contained in:
@@ -35,6 +35,28 @@ final class TalkModeManager: NSObject {
|
||||
case pushToTalk
|
||||
}
|
||||
|
||||
private enum TranscriptTrigger: String {
|
||||
case pushToTalk = "ptt"
|
||||
case finalRecognition = "final"
|
||||
case silence = "silence"
|
||||
}
|
||||
|
||||
private enum TalkLatencyMarker: String {
|
||||
case firstAssistantDelta = "assistant_delta_first"
|
||||
case firstTTSRequest = "tts_request_first"
|
||||
case firstTTSByte = "tts_first_byte"
|
||||
case playbackStart = "playback_start"
|
||||
}
|
||||
|
||||
private struct TalkLatencyTrace {
|
||||
let id: String
|
||||
let speechEndAt: Date
|
||||
var firstAssistantDeltaMarked = false
|
||||
var firstTTSRequestMarked = false
|
||||
var firstTTSByteMarked = false
|
||||
var playbackStartMarked = false
|
||||
}
|
||||
|
||||
private var captureMode: CaptureMode = .idle
|
||||
private var resumeContinuousAfterPTT: Bool = false
|
||||
private var activePTTCaptureId: String?
|
||||
@@ -76,7 +98,11 @@ final class TalkModeManager: NSObject {
|
||||
|
||||
private var gateway: GatewayNodeSession?
|
||||
private var gatewayConnected = false
|
||||
private let silenceWindow: TimeInterval = 0.9
|
||||
private let silenceWindow: TimeInterval = 0.6
|
||||
private let silencePollIntervalNs: UInt64 = 100_000_000
|
||||
private let configReloadCacheTTL: TimeInterval = 60
|
||||
private var lastConfigReloadAt: Date?
|
||||
private var activeLatencyTrace: TalkLatencyTrace?
|
||||
private var lastAudioActivity: Date?
|
||||
private var noiseFloorSamples: [Double] = []
|
||||
private var noiseFloor: Double?
|
||||
@@ -101,8 +127,80 @@ final class TalkModeManager: NSObject {
|
||||
super.init()
|
||||
}
|
||||
|
||||
private func beginLatencyTrace(trigger: TranscriptTrigger, transcriptChars: Int) {
|
||||
let id = String(UUID().uuidString.prefix(8))
|
||||
self.activeLatencyTrace = TalkLatencyTrace(id: id, speechEndAt: Date())
|
||||
GatewayDiagnostics.log(
|
||||
"talk latency[\(id)]: speech_end trigger=\(trigger.rawValue) chars=\(transcriptChars)")
|
||||
}
|
||||
|
||||
private func markLatencyPhase(_ phase: String, details: String? = nil) {
|
||||
guard let trace = self.activeLatencyTrace else { return }
|
||||
let elapsedMs = Int(Date().timeIntervalSince(trace.speechEndAt) * 1000)
|
||||
if let details, !details.isEmpty {
|
||||
GatewayDiagnostics.log("talk latency[\(trace.id)]: \(phase) t=\(elapsedMs)ms \(details)")
|
||||
} else {
|
||||
GatewayDiagnostics.log("talk latency[\(trace.id)]: \(phase) t=\(elapsedMs)ms")
|
||||
}
|
||||
}
|
||||
|
||||
private func markLatencyFirst(_ marker: TalkLatencyMarker, details: String? = nil) {
|
||||
guard var trace = self.activeLatencyTrace else { return }
|
||||
let shouldMark: Bool
|
||||
switch marker {
|
||||
case .firstAssistantDelta:
|
||||
shouldMark = !trace.firstAssistantDeltaMarked
|
||||
if shouldMark {
|
||||
trace.firstAssistantDeltaMarked = true
|
||||
}
|
||||
case .firstTTSRequest:
|
||||
shouldMark = !trace.firstTTSRequestMarked
|
||||
if shouldMark {
|
||||
trace.firstTTSRequestMarked = true
|
||||
}
|
||||
case .firstTTSByte:
|
||||
shouldMark = !trace.firstTTSByteMarked
|
||||
if shouldMark {
|
||||
trace.firstTTSByteMarked = true
|
||||
}
|
||||
case .playbackStart:
|
||||
shouldMark = !trace.playbackStartMarked
|
||||
if shouldMark {
|
||||
trace.playbackStartMarked = true
|
||||
}
|
||||
}
|
||||
guard shouldMark else { return }
|
||||
self.activeLatencyTrace = trace
|
||||
self.markLatencyPhase(marker.rawValue, details: details)
|
||||
}
|
||||
|
||||
private func instrumentTTSStream(
|
||||
_ stream: AsyncThrowingStream<Data, Error>,
|
||||
source: String
|
||||
) -> AsyncThrowingStream<Data, Error>
|
||||
{
|
||||
AsyncThrowingStream { continuation in
|
||||
Task { @MainActor [weak self] in
|
||||
var sawFirstChunk = false
|
||||
do {
|
||||
for try await chunk in stream {
|
||||
if !sawFirstChunk {
|
||||
sawFirstChunk = true
|
||||
self?.markLatencyFirst(.firstTTSByte, details: "source=\(source) bytes=\(chunk.count)")
|
||||
}
|
||||
continuation.yield(chunk)
|
||||
}
|
||||
continuation.finish()
|
||||
} catch {
|
||||
continuation.finish(throwing: error)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func attachGateway(_ gateway: GatewayNodeSession) {
|
||||
self.gateway = gateway
|
||||
self.lastConfigReloadAt = nil
|
||||
}
|
||||
|
||||
func updateGatewayConnected(_ connected: Bool) {
|
||||
@@ -114,6 +212,7 @@ final class TalkModeManager: NSObject {
|
||||
Task { await self.start() }
|
||||
}
|
||||
} else {
|
||||
self.lastConfigReloadAt = nil
|
||||
if self.isEnabled, !self.isSpeaking {
|
||||
self.statusText = "Offline"
|
||||
}
|
||||
@@ -390,7 +489,10 @@ final class TalkModeManager: NSObject {
|
||||
|
||||
self.statusText = "Thinking…"
|
||||
Task { @MainActor in
|
||||
await self.processTranscript(transcript, restartAfter: self.resumeContinuousAfterPTT)
|
||||
await self.processTranscript(
|
||||
transcript,
|
||||
restartAfter: self.resumeContinuousAfterPTT,
|
||||
trigger: .pushToTalk)
|
||||
}
|
||||
self.resumeContinuousAfterPTT = false
|
||||
self.activePTTCaptureId = nil
|
||||
@@ -672,7 +774,7 @@ final class TalkModeManager: NSObject {
|
||||
return
|
||||
}
|
||||
if self.captureMode == .continuous, !self.isSpeechOutputActive {
|
||||
await self.processTranscript(trimmed, restartAfter: true)
|
||||
await self.processTranscript(trimmed, restartAfter: true, trigger: .finalRecognition)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -682,7 +784,7 @@ final class TalkModeManager: NSObject {
|
||||
self.silenceTask = Task { [weak self] in
|
||||
guard let self else { return }
|
||||
while self.isEnabled || (self.isPushToTalkActive && self.pttAutoStopEnabled) {
|
||||
try? await Task.sleep(nanoseconds: 200_000_000)
|
||||
try? await Task.sleep(nanoseconds: self.silencePollIntervalNs)
|
||||
await self.checkSilence()
|
||||
}
|
||||
}
|
||||
@@ -696,7 +798,7 @@ final class TalkModeManager: NSObject {
|
||||
let lastActivity = [self.lastHeard, self.lastAudioActivity].compactMap { $0 }.max()
|
||||
guard let lastActivity else { return }
|
||||
if Date().timeIntervalSince(lastActivity) < self.silenceWindow { return }
|
||||
await self.processTranscript(transcript, restartAfter: true)
|
||||
await self.processTranscript(transcript, restartAfter: true, trigger: .silence)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -732,7 +834,13 @@ final class TalkModeManager: NSObject {
|
||||
continuation.resume(returning: payload)
|
||||
}
|
||||
|
||||
private func processTranscript(_ transcript: String, restartAfter: Bool) async {
|
||||
private func processTranscript(
|
||||
_ transcript: String,
|
||||
restartAfter: Bool,
|
||||
trigger: TranscriptTrigger
|
||||
) async
|
||||
{
|
||||
self.beginLatencyTrace(trigger: trigger, transcriptChars: transcript.count)
|
||||
self.isListening = false
|
||||
self.captureMode = .idle
|
||||
self.statusText = "Thinking…"
|
||||
@@ -740,8 +848,9 @@ final class TalkModeManager: NSObject {
|
||||
self.lastHeard = nil
|
||||
self.stopRecognition()
|
||||
|
||||
GatewayDiagnostics.log("talk: process transcript chars=\(transcript.count) restartAfter=\(restartAfter)")
|
||||
await self.reloadConfig()
|
||||
GatewayDiagnostics.log(
|
||||
"talk: process transcript chars=\(transcript.count) restartAfter=\(restartAfter) trigger=\(trigger.rawValue)")
|
||||
await self.reloadConfig(force: false)
|
||||
let prompt = self.buildPrompt(transcript: transcript)
|
||||
guard self.gatewayConnected, let gateway else {
|
||||
self.statusText = "Gateway not connected"
|
||||
@@ -757,10 +866,12 @@ final class TalkModeManager: NSObject {
|
||||
let startedAt = Date().timeIntervalSince1970
|
||||
let sessionKey = self.mainSessionKey
|
||||
await self.subscribeChatIfNeeded(sessionKey: sessionKey)
|
||||
self.markLatencyPhase("chat_send_start", details: "sessionKey=\(sessionKey) chars=\(prompt.count)")
|
||||
self.logger.info(
|
||||
"chat.send start sessionKey=\(sessionKey, privacy: .public) chars=\(prompt.count, privacy: .public)")
|
||||
GatewayDiagnostics.log("talk: chat.send start sessionKey=\(sessionKey) chars=\(prompt.count)")
|
||||
let runId = try await self.sendChat(prompt, gateway: gateway)
|
||||
self.markLatencyPhase("chat_send_ack", details: "runId=\(runId)")
|
||||
self.logger.info("chat.send ok runId=\(runId, privacy: .public)")
|
||||
GatewayDiagnostics.log("talk: chat.send ok runId=\(runId)")
|
||||
let shouldIncremental = self.shouldUseIncrementalTTS()
|
||||
@@ -937,6 +1048,7 @@ final class TalkModeManager: NSObject {
|
||||
let deadline = Date().addingTimeInterval(TimeInterval(timeoutSeconds))
|
||||
while Date() < deadline {
|
||||
if let text = try await self.fetchLatestAssistantText(gateway: gateway, since: since) {
|
||||
self.markLatencyFirst(.firstAssistantDelta, details: "source=history chars=\(text.count)")
|
||||
return text
|
||||
}
|
||||
try? await Task.sleep(nanoseconds: 300_000_000)
|
||||
@@ -1030,9 +1142,11 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
|
||||
let request = makeRequest(outputFormat: outputFormat)
|
||||
self.markLatencyFirst(.firstTTSRequest, details: "mode=full chars=\(cleaned.count)")
|
||||
|
||||
let client = ElevenLabsTTSClient(apiKey: apiKey)
|
||||
let stream = client.streamSynthesize(voiceId: voiceId, request: request)
|
||||
let rawStream = client.streamSynthesize(voiceId: voiceId, request: request)
|
||||
let stream = self.instrumentTTSStream(rawStream, source: "elevenlabs_full")
|
||||
|
||||
if self.interruptOnSpeech {
|
||||
do {
|
||||
@@ -1048,19 +1162,22 @@ final class TalkModeManager: NSObject {
|
||||
let result: StreamingPlaybackResult
|
||||
if let sampleRate {
|
||||
self.lastPlaybackWasPCM = true
|
||||
self.markLatencyFirst(.playbackStart, details: "transport=pcm sampleRate=\(Int(sampleRate))")
|
||||
var playback = await self.pcmPlayer.play(stream: stream, sampleRate: sampleRate)
|
||||
if !playback.finished, playback.interruptedAt == nil {
|
||||
let mp3Format = ElevenLabsTTSClient.validatedOutputFormat("mp3_44100")
|
||||
self.logger.warning("pcm playback failed; retrying mp3")
|
||||
self.lastPlaybackWasPCM = false
|
||||
let mp3Stream = client.streamSynthesize(
|
||||
let rawMP3Stream = client.streamSynthesize(
|
||||
voiceId: voiceId,
|
||||
request: makeRequest(outputFormat: mp3Format))
|
||||
let mp3Stream = self.instrumentTTSStream(rawMP3Stream, source: "elevenlabs_full_retry_mp3")
|
||||
playback = await self.mp3Player.play(stream: mp3Stream)
|
||||
}
|
||||
result = playback
|
||||
} else {
|
||||
self.lastPlaybackWasPCM = false
|
||||
self.markLatencyFirst(.playbackStart, details: "transport=mp3")
|
||||
result = await self.mp3Player.play(stream: stream)
|
||||
}
|
||||
let duration = Date().timeIntervalSince(started)
|
||||
@@ -1307,6 +1424,9 @@ final class TalkModeManager: NSObject {
|
||||
text: segment,
|
||||
context: context,
|
||||
outputFormat: prefetchOutputFormat)
|
||||
self.markLatencyFirst(
|
||||
.firstTTSRequest,
|
||||
details: "mode=incremental_prefetch chars=\(segment.count)")
|
||||
let id = UUID()
|
||||
let task = Task { [weak self] in
|
||||
let stream = ElevenLabsTTSClient(apiKey: apiKey).streamSynthesize(voiceId: voiceId, request: request)
|
||||
@@ -1431,6 +1551,7 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
guard agentEvent.runId == runId, agentEvent.stream == "assistant" else { continue }
|
||||
guard let text = agentEvent.data["text"]?.value as? String else { continue }
|
||||
self.markLatencyFirst(.firstAssistantDelta, details: "source=stream chars=\(text.count)")
|
||||
let segments = self.incrementalSpeechBuffer.ingest(text: text, isFinal: false)
|
||||
if let lang = self.incrementalSpeechBuffer.directive?.language {
|
||||
self.incrementalSpeechLanguage = ElevenLabsTTSClient.validatedLanguage(lang)
|
||||
@@ -1569,30 +1690,35 @@ final class TalkModeManager: NSObject {
|
||||
if let prefetchedAudio, !prefetchedAudio.chunks.isEmpty {
|
||||
stream = Self.makeBufferedAudioStream(chunks: prefetchedAudio.chunks)
|
||||
} else {
|
||||
self.markLatencyFirst(.firstTTSRequest, details: "mode=incremental chars=\(text.count)")
|
||||
stream = client.streamSynthesize(voiceId: voiceId, request: request)
|
||||
}
|
||||
let measuredStream = self.instrumentTTSStream(stream, source: "elevenlabs_incremental")
|
||||
let playbackFormat = prefetchedAudio?.outputFormat ?? context.outputFormat
|
||||
let sampleRate = TalkTTSValidation.pcmSampleRate(from: playbackFormat)
|
||||
let result: StreamingPlaybackResult
|
||||
if let sampleRate {
|
||||
self.lastPlaybackWasPCM = true
|
||||
var playback = await self.pcmPlayer.play(stream: stream, sampleRate: sampleRate)
|
||||
self.markLatencyFirst(.playbackStart, details: "transport=pcm sampleRate=\(Int(sampleRate))")
|
||||
var playback = await self.pcmPlayer.play(stream: measuredStream, sampleRate: sampleRate)
|
||||
if !playback.finished, playback.interruptedAt == nil {
|
||||
self.logger.warning("pcm playback failed; retrying mp3")
|
||||
self.lastPlaybackWasPCM = false
|
||||
let mp3Format = ElevenLabsTTSClient.validatedOutputFormat("mp3_44100")
|
||||
let mp3Stream = client.streamSynthesize(
|
||||
let rawMP3Stream = client.streamSynthesize(
|
||||
voiceId: voiceId,
|
||||
request: self.makeIncrementalTTSRequest(
|
||||
text: text,
|
||||
context: context,
|
||||
outputFormat: mp3Format))
|
||||
let mp3Stream = self.instrumentTTSStream(rawMP3Stream, source: "elevenlabs_incremental_retry_mp3")
|
||||
playback = await self.mp3Player.play(stream: mp3Stream)
|
||||
}
|
||||
result = playback
|
||||
} else {
|
||||
self.lastPlaybackWasPCM = false
|
||||
result = await self.mp3Player.play(stream: stream)
|
||||
self.markLatencyFirst(.playbackStart, details: "transport=mp3")
|
||||
result = await self.mp3Player.play(stream: measuredStream)
|
||||
}
|
||||
if !result.finished, let interruptedAt = result.interruptedAt {
|
||||
self.lastInterruptedAtSeconds = interruptedAt
|
||||
@@ -1602,22 +1728,26 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
|
||||
private struct IncrementalSpeechBuffer {
|
||||
private static let partialWordThreshold = 8
|
||||
private static let partialFlushDelay: TimeInterval = 0.3
|
||||
|
||||
private(set) var latestText: String = ""
|
||||
private(set) var directive: TalkDirective?
|
||||
private var spokenOffset: Int = 0
|
||||
private var inCodeBlock = false
|
||||
private var directiveParsed = false
|
||||
private var pendingPartialSince: Date?
|
||||
|
||||
mutating func ingest(text: String, isFinal: Bool) -> [String] {
|
||||
mutating func ingest(text: String, isFinal: Bool, now: Date = Date()) -> [String] {
|
||||
let normalized = text.replacingOccurrences(of: "\r\n", with: "\n")
|
||||
guard let usable = self.stripDirectiveIfReady(from: normalized) else { return [] }
|
||||
self.updateText(usable)
|
||||
return self.extractSegments(isFinal: isFinal)
|
||||
return self.extractSegments(isFinal: isFinal, now: now)
|
||||
}
|
||||
|
||||
mutating func flush() -> String? {
|
||||
guard !self.latestText.isEmpty else { return nil }
|
||||
let segments = self.extractSegments(isFinal: true)
|
||||
let segments = self.extractSegments(isFinal: true, now: Date())
|
||||
return segments.first
|
||||
}
|
||||
|
||||
@@ -1672,9 +1802,12 @@ private struct IncrementalSpeechBuffer {
|
||||
return idx
|
||||
}
|
||||
|
||||
private mutating func extractSegments(isFinal: Bool) -> [String] {
|
||||
private mutating func extractSegments(isFinal: Bool, now: Date) -> [String] {
|
||||
let chars = Array(self.latestText)
|
||||
guard self.spokenOffset < chars.count else { return [] }
|
||||
guard self.spokenOffset < chars.count else {
|
||||
self.pendingPartialSince = nil
|
||||
return []
|
||||
}
|
||||
var idx = self.spokenOffset
|
||||
var lastBoundary: Int?
|
||||
var inCodeBlock = self.inCodeBlock
|
||||
@@ -1708,17 +1841,41 @@ private struct IncrementalSpeechBuffer {
|
||||
if let boundary = lastBoundary {
|
||||
self.spokenOffset = boundary
|
||||
self.inCodeBlock = inCodeBlockAtBoundary
|
||||
self.pendingPartialSince = nil
|
||||
let trimmed = bufferAtBoundary.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
return trimmed.isEmpty ? [] : [trimmed]
|
||||
}
|
||||
|
||||
guard isFinal else { return [] }
|
||||
let trimmed = buffer.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
guard !trimmed.isEmpty else {
|
||||
self.pendingPartialSince = nil
|
||||
return []
|
||||
}
|
||||
|
||||
if !isFinal {
|
||||
if self.pendingPartialSince == nil {
|
||||
self.pendingPartialSince = now
|
||||
return []
|
||||
}
|
||||
let elapsed = now.timeIntervalSince(self.pendingPartialSince ?? now)
|
||||
if elapsed < Self.partialFlushDelay { return [] }
|
||||
if Self.wordCount(trimmed) < Self.partialWordThreshold { return [] }
|
||||
self.spokenOffset = chars.count
|
||||
self.inCodeBlock = inCodeBlock
|
||||
self.pendingPartialSince = nil
|
||||
return [trimmed]
|
||||
}
|
||||
|
||||
self.spokenOffset = chars.count
|
||||
self.inCodeBlock = inCodeBlock
|
||||
let trimmed = buffer.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
self.pendingPartialSince = nil
|
||||
return trimmed.isEmpty ? [] : [trimmed]
|
||||
}
|
||||
|
||||
private static func wordCount(_ text: String) -> Int {
|
||||
text.split(whereSeparator: { $0.isWhitespace || $0.isNewline }).count
|
||||
}
|
||||
|
||||
private static func isBoundary(_ ch: Character) -> Bool {
|
||||
ch == "." || ch == "!" || ch == "?" || ch == "\n"
|
||||
}
|
||||
@@ -1885,8 +2042,17 @@ extension TalkModeManager {
|
||||
return trimmed
|
||||
}
|
||||
|
||||
func reloadConfig() async {
|
||||
func reloadConfig(force: Bool = true) async {
|
||||
guard let gateway else { return }
|
||||
let now = Date()
|
||||
if !force, let lastConfigReloadAt,
|
||||
now.timeIntervalSince(lastConfigReloadAt) < self.configReloadCacheTTL
|
||||
{
|
||||
let ageMs = Int(now.timeIntervalSince(lastConfigReloadAt) * 1000)
|
||||
self.markLatencyPhase("config_reload_skipped", details: "cache_age=\(ageMs)ms")
|
||||
return
|
||||
}
|
||||
self.markLatencyPhase("config_reload_start")
|
||||
do {
|
||||
let res = try await gateway.request(method: "talk.config", paramsJSON: "{\"includeSecrets\":true}", timeoutSeconds: 8)
|
||||
guard let json = try JSONSerialization.jsonObject(with: res) as? [String: Any] else { return }
|
||||
@@ -1932,6 +2098,8 @@ extension TalkModeManager {
|
||||
if let interrupt = talk?["interruptOnSpeech"] as? Bool {
|
||||
self.interruptOnSpeech = interrupt
|
||||
}
|
||||
self.lastConfigReloadAt = Date()
|
||||
self.markLatencyPhase("config_reload_end", details: "ok=true")
|
||||
} catch {
|
||||
self.defaultModelId = Self.defaultModelIdFallback
|
||||
if !self.modelOverrideActive {
|
||||
@@ -1941,6 +2109,7 @@ extension TalkModeManager {
|
||||
self.gatewayTalkDefaultModelId = nil
|
||||
self.gatewayTalkApiKeyConfigured = false
|
||||
self.gatewayTalkConfigLoaded = false
|
||||
self.markLatencyPhase("config_reload_end", details: "ok=false")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user