iOS: fix node notify and identity

This commit is contained in:
Mariano Belinky
2026-01-31 20:02:49 +01:00
committed by Mariano Belinky
parent d9cadf9737
commit 761188cd1d
11 changed files with 263 additions and 38 deletions

View File

@@ -16,6 +16,7 @@ public actor GatewayNodeSession {
private let logger = Logger(subsystem: "ai.openclaw", category: "node.gateway")
private let decoder = JSONDecoder()
private let encoder = JSONEncoder()
private static let defaultInvokeTimeoutMs = 30_000
private var channel: GatewayChannelActor?
private var activeURL: URL?
private var activeToken: String?
@@ -33,28 +34,66 @@ public actor GatewayNodeSession {
timeoutMs: Int?,
onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse
) async -> BridgeInvokeResponse {
let timeout = max(0, timeoutMs ?? 0)
let timeoutLogger = Logger(subsystem: "ai.openclaw", category: "node.gateway")
let timeout: Int = {
if let timeoutMs { return max(0, timeoutMs) }
return Self.defaultInvokeTimeoutMs
}()
guard timeout > 0 else {
return await onInvoke(request)
}
return await withTaskGroup(of: BridgeInvokeResponse.self) { group in
group.addTask { await onInvoke(request) }
group.addTask {
final class InvokeLatch: @unchecked Sendable {
private let lock = NSLock()
private var continuation: CheckedContinuation<BridgeInvokeResponse, Never>?
private var resumed = false
func setContinuation(_ continuation: CheckedContinuation<BridgeInvokeResponse, Never>) {
self.lock.lock()
defer { self.lock.unlock() }
self.continuation = continuation
}
func resume(_ response: BridgeInvokeResponse) {
let cont: CheckedContinuation<BridgeInvokeResponse, Never>?
self.lock.lock()
if self.resumed {
self.lock.unlock()
return
}
self.resumed = true
cont = self.continuation
self.continuation = nil
self.lock.unlock()
cont?.resume(returning: response)
}
}
let latch = InvokeLatch()
var onInvokeTask: Task<Void, Never>?
var timeoutTask: Task<Void, Never>?
let response = await withCheckedContinuation { (cont: CheckedContinuation<BridgeInvokeResponse, Never>) in
latch.setContinuation(cont)
onInvokeTask = Task.detached {
let result = await onInvoke(request)
latch.resume(result)
}
timeoutTask = Task.detached {
try? await Task.sleep(nanoseconds: UInt64(timeout) * 1_000_000)
return BridgeInvokeResponse(
timeoutLogger.info("node invoke timeout fired id=\(request.id, privacy: .public)")
latch.resume(BridgeInvokeResponse(
id: request.id,
ok: false,
error: OpenClawNodeError(
code: .unavailable,
message: "node invoke timed out")
)
))
}
let first = await group.next()!
group.cancelAll()
return first
}
onInvokeTask?.cancel()
timeoutTask?.cancel()
timeoutLogger.info("node invoke race resolved id=\(request.id, privacy: .public) ok=\(response.ok, privacy: .public)")
return response
}
private var serverEventSubscribers: [UUID: AsyncStream<EventFrame>.Continuation] = [:]
private var canvasHostUrl: String?
@@ -255,14 +294,17 @@ public actor GatewayNodeSession {
guard let payload = evt.payload else { return }
do {
let request = try self.decodeInvokeRequest(from: payload)
self.logger.info("node invoke request decoded id=\(request.id, privacy: .public) command=\(request.command, privacy: .public)")
let timeoutLabel = request.timeoutMs.map(String.init) ?? "none"
self.logger.info("node invoke request decoded id=\(request.id, privacy: .public) command=\(request.command, privacy: .public) timeoutMs=\(timeoutLabel, privacy: .public)")
guard let onInvoke else { return }
let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON)
self.logger.info("node invoke executing id=\(request.id, privacy: .public)")
let response = await Self.invokeWithTimeout(
request: req,
timeoutMs: request.timeoutMs,
onInvoke: onInvoke
)
self.logger.info("node invoke completed id=\(request.id, privacy: .public) ok=\(response.ok, privacy: .public)")
await self.sendInvokeResult(request: request, response: response)
} catch {
self.logger.error("node invoke decode failed: \(error.localizedDescription, privacy: .public)")

View File

@@ -123,6 +123,10 @@
"screen_record": {
"label": "screen record",
"detailKeys": ["node", "nodeId", "duration", "durationMs", "fps", "screenIndex"]
},
"invoke": {
"label": "invoke",
"detailKeys": ["node", "nodeId", "invokeCommand"]
}
}
},