OpenClawKit: use websocket ping keepalive

This commit is contained in:
Mariano Belinky
2026-02-19 20:08:00 +00:00
parent d5a22068c7
commit cee65abfaf

View File

@@ -7,6 +7,7 @@ public protocol WebSocketTasking: AnyObject {
func resume()
func cancel(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?)
func send(_ message: URLSessionWebSocketTask.Message) async throws
func sendPing(pongReceiveHandler: @escaping @Sendable (Error?) -> Void)
func receive() async throws -> URLSessionWebSocketTask.Message
func receive(completionHandler: @escaping @Sendable (Result<URLSessionWebSocketTask.Message, Error>) -> Void)
}
@@ -40,6 +41,18 @@ public struct WebSocketTaskBox: @unchecked Sendable {
{
self.task.receive(completionHandler: completionHandler)
}
public func sendPing() async throws {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
self.task.sendPing { error in
if let error {
continuation.resume(throwing: error)
} else {
continuation.resume(returning: ())
}
}
}
}
}
public protocol WebSocketSessioning: AnyObject {
@@ -213,7 +226,7 @@ public actor GatewayChannelActor {
private func watchdogLoop() async {
// Keep nudging reconnect in case exponential backoff stalls.
while self.shouldReconnect {
try? await Task.sleep(nanoseconds: 30 * 1_000_000_000) // 30s cadence
guard await self.sleepUnlessCancelled(nanoseconds: 30 * 1_000_000_000) else { return } // 30s cadence
guard self.shouldReconnect else { return }
if self.connected { continue }
do {
@@ -285,13 +298,15 @@ public actor GatewayChannelActor {
private func keepaliveLoop() async {
while self.shouldReconnect {
try? await Task.sleep(nanoseconds: UInt64(self.keepaliveIntervalSeconds * 1_000_000_000))
guard await self.sleepUnlessCancelled(
nanoseconds: UInt64(self.keepaliveIntervalSeconds * 1_000_000_000))
else { return }
guard self.shouldReconnect else { return }
guard self.connected else { continue }
// Best-effort outbound message to keep intermediate NAT/proxy state alive.
// We intentionally ignore the response.
guard let task = self.task else { continue }
// Best-effort ping keeps NAT/proxy state alive without generating RPC load.
do {
try await self.send(method: "health", params: nil)
try await task.sendPing()
} catch {
// Avoid spamming logs; the reconnect paths will surface meaningful errors.
}
@@ -593,7 +608,7 @@ public actor GatewayChannelActor {
private func watchTicks() async {
let tolerance = self.tickIntervalMs * 2
while self.connected {
try? await Task.sleep(nanoseconds: UInt64(tolerance * 1_000_000))
guard await self.sleepUnlessCancelled(nanoseconds: UInt64(tolerance * 1_000_000)) else { return }
guard self.connected else { return }
if let last = self.lastTick {
let delta = Date().timeIntervalSince(last) * 1000
@@ -616,7 +631,7 @@ public actor GatewayChannelActor {
guard self.shouldReconnect else { return }
let delay = self.backoffMs / 1000
self.backoffMs = min(self.backoffMs * 2, 30000)
try? await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000))
guard await self.sleepUnlessCancelled(nanoseconds: UInt64(delay * 1_000_000_000)) else { return }
guard self.shouldReconnect else { return }
do {
try await self.connect()
@@ -627,6 +642,15 @@ public actor GatewayChannelActor {
}
}
private nonisolated func sleepUnlessCancelled(nanoseconds: UInt64) async -> Bool {
do {
try await Task.sleep(nanoseconds: nanoseconds)
} catch {
return false
}
return !Task.isCancelled
}
public func request(
method: String,
params: [String: AnyCodable]?,