From cee65abfafac4256e44a233a0572392de7d793aa Mon Sep 17 00:00:00 2001 From: Mariano Belinky Date: Thu, 19 Feb 2026 20:08:00 +0000 Subject: [PATCH] OpenClawKit: use websocket ping keepalive --- .../Sources/OpenClawKit/GatewayChannel.swift | 38 +++++++++++++++---- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/apps/shared/OpenClawKit/Sources/OpenClawKit/GatewayChannel.swift b/apps/shared/OpenClawKit/Sources/OpenClawKit/GatewayChannel.swift index 9682a31aa46..fc0be4a94a3 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawKit/GatewayChannel.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawKit/GatewayChannel.swift @@ -7,6 +7,7 @@ public protocol WebSocketTasking: AnyObject { func resume() func cancel(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) func send(_ message: URLSessionWebSocketTask.Message) async throws + func sendPing(pongReceiveHandler: @escaping @Sendable (Error?) -> Void) func receive() async throws -> URLSessionWebSocketTask.Message func receive(completionHandler: @escaping @Sendable (Result) -> Void) } @@ -40,6 +41,18 @@ public struct WebSocketTaskBox: @unchecked Sendable { { self.task.receive(completionHandler: completionHandler) } + + public func sendPing() async throws { + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + self.task.sendPing { error in + if let error { + continuation.resume(throwing: error) + } else { + continuation.resume(returning: ()) + } + } + } + } } public protocol WebSocketSessioning: AnyObject { @@ -213,7 +226,7 @@ public actor GatewayChannelActor { private func watchdogLoop() async { // Keep nudging reconnect in case exponential backoff stalls. while self.shouldReconnect { - try? await Task.sleep(nanoseconds: 30 * 1_000_000_000) // 30s cadence + guard await self.sleepUnlessCancelled(nanoseconds: 30 * 1_000_000_000) else { return } // 30s cadence guard self.shouldReconnect else { return } if self.connected { continue } do { @@ -285,13 +298,15 @@ public actor GatewayChannelActor { private func keepaliveLoop() async { while self.shouldReconnect { - try? await Task.sleep(nanoseconds: UInt64(self.keepaliveIntervalSeconds * 1_000_000_000)) + guard await self.sleepUnlessCancelled( + nanoseconds: UInt64(self.keepaliveIntervalSeconds * 1_000_000_000)) + else { return } guard self.shouldReconnect else { return } guard self.connected else { continue } - // Best-effort outbound message to keep intermediate NAT/proxy state alive. - // We intentionally ignore the response. + guard let task = self.task else { continue } + // Best-effort ping keeps NAT/proxy state alive without generating RPC load. do { - try await self.send(method: "health", params: nil) + try await task.sendPing() } catch { // Avoid spamming logs; the reconnect paths will surface meaningful errors. } @@ -593,7 +608,7 @@ public actor GatewayChannelActor { private func watchTicks() async { let tolerance = self.tickIntervalMs * 2 while self.connected { - try? await Task.sleep(nanoseconds: UInt64(tolerance * 1_000_000)) + guard await self.sleepUnlessCancelled(nanoseconds: UInt64(tolerance * 1_000_000)) else { return } guard self.connected else { return } if let last = self.lastTick { let delta = Date().timeIntervalSince(last) * 1000 @@ -616,7 +631,7 @@ public actor GatewayChannelActor { guard self.shouldReconnect else { return } let delay = self.backoffMs / 1000 self.backoffMs = min(self.backoffMs * 2, 30000) - try? await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000)) + guard await self.sleepUnlessCancelled(nanoseconds: UInt64(delay * 1_000_000_000)) else { return } guard self.shouldReconnect else { return } do { try await self.connect() @@ -627,6 +642,15 @@ public actor GatewayChannelActor { } } + private nonisolated func sleepUnlessCancelled(nanoseconds: UInt64) async -> Bool { + do { + try await Task.sleep(nanoseconds: nanoseconds) + } catch { + return false + } + return !Task.isCancelled + } + public func request( method: String, params: [String: AnyCodable]?,