mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-19 01:57:27 +00:00
iOS: auto-resync chat after reconnect gaps (#21135)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 1beca3a76d
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
@@ -435,8 +435,12 @@ public final class OpenClawChatViewModel {
|
||||
case let .agent(agent):
|
||||
self.handleAgentEvent(agent)
|
||||
case .seqGap:
|
||||
self.errorText = "Event stream interrupted; try refreshing."
|
||||
self.errorText = nil
|
||||
self.clearPendingRuns(reason: nil)
|
||||
Task {
|
||||
await self.refreshHistoryAfterRun()
|
||||
await self.pollHealthIfNeeded(force: true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ public actor GatewayNodeSession {
|
||||
private var onConnected: (@Sendable () async -> Void)?
|
||||
private var onDisconnected: (@Sendable (String) async -> Void)?
|
||||
private var onInvoke: (@Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)?
|
||||
private var hasEverConnected = false
|
||||
private var hasNotifiedConnected = false
|
||||
private var snapshotReceived = false
|
||||
private var snapshotWaiters: [CheckedContinuation<Bool, Never>] = []
|
||||
@@ -214,6 +215,7 @@ public actor GatewayNodeSession {
|
||||
self.activeToken = nil
|
||||
self.activePassword = nil
|
||||
self.activeConnectOptionsKey = nil
|
||||
self.hasEverConnected = false
|
||||
self.resetConnectionState()
|
||||
}
|
||||
|
||||
@@ -274,6 +276,11 @@ public actor GatewayNodeSession {
|
||||
case let .snapshot(ok):
|
||||
let raw = ok.canvashosturl?.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
self.canvasHostUrl = (raw?.isEmpty == false) ? raw : nil
|
||||
if self.hasEverConnected {
|
||||
self.broadcastServerEvent(
|
||||
EventFrame(type: "event", event: "seqGap", payload: nil, seq: nil, stateversion: nil))
|
||||
}
|
||||
self.hasEverConnected = true
|
||||
self.markSnapshotReceived()
|
||||
await self.notifyConnectedIfNeeded()
|
||||
case let .event(evt):
|
||||
|
||||
@@ -416,6 +416,48 @@ extension TestChatTransportState {
|
||||
#expect(await MainActor.run { vm.pendingToolCalls.isEmpty })
|
||||
}
|
||||
|
||||
@Test func seqGapClearsPendingRunsAndAutoRefreshesHistory() async throws {
|
||||
let now = Date().timeIntervalSince1970 * 1000
|
||||
let history1 = OpenClawChatHistoryPayload(
|
||||
sessionKey: "main",
|
||||
sessionId: "sess-main",
|
||||
messages: [],
|
||||
thinkingLevel: "off")
|
||||
let history2 = OpenClawChatHistoryPayload(
|
||||
sessionKey: "main",
|
||||
sessionId: "sess-main",
|
||||
messages: [
|
||||
AnyCodable([
|
||||
"role": "assistant",
|
||||
"content": [["type": "text", "text": "resynced after gap"]],
|
||||
"timestamp": now,
|
||||
]),
|
||||
],
|
||||
thinkingLevel: "off")
|
||||
|
||||
let transport = TestChatTransport(historyResponses: [history1, history2])
|
||||
let vm = await MainActor.run { OpenClawChatViewModel(sessionKey: "main", transport: transport) }
|
||||
|
||||
await MainActor.run { vm.load() }
|
||||
try await waitUntil("bootstrap") { await MainActor.run { vm.healthOK } }
|
||||
|
||||
await MainActor.run {
|
||||
vm.input = "hello"
|
||||
vm.send()
|
||||
}
|
||||
try await waitUntil("pending run starts") { await MainActor.run { vm.pendingRunCount == 1 } }
|
||||
|
||||
transport.emit(.seqGap)
|
||||
|
||||
try await waitUntil("pending run clears on seqGap") {
|
||||
await MainActor.run { vm.pendingRunCount == 0 }
|
||||
}
|
||||
try await waitUntil("history refreshes on seqGap") {
|
||||
await MainActor.run { vm.messages.contains(where: { $0.role == "assistant" }) }
|
||||
}
|
||||
#expect(await MainActor.run { vm.errorText == nil })
|
||||
}
|
||||
|
||||
@Test func sessionChoicesPreferMainAndRecent() async throws {
|
||||
let now = Date().timeIntervalSince1970 * 1000
|
||||
let recent = now - (2 * 60 * 60 * 1000)
|
||||
|
||||
@@ -3,6 +3,178 @@ import Testing
|
||||
@testable import OpenClawKit
|
||||
import OpenClawProtocol
|
||||
|
||||
private struct TimeoutError: Error, CustomStringConvertible {
|
||||
let label: String
|
||||
var description: String { "Timeout waiting for: \(self.label)" }
|
||||
}
|
||||
|
||||
private func waitUntil(
|
||||
_ label: String,
|
||||
timeoutSeconds: Double = 3.0,
|
||||
pollMs: UInt64 = 10,
|
||||
_ condition: @escaping @Sendable () async -> Bool) async throws
|
||||
{
|
||||
let deadline = Date().addingTimeInterval(timeoutSeconds)
|
||||
while Date() < deadline {
|
||||
if await condition() {
|
||||
return
|
||||
}
|
||||
try await Task.sleep(nanoseconds: pollMs * 1_000_000)
|
||||
}
|
||||
throw TimeoutError(label: label)
|
||||
}
|
||||
|
||||
private extension NSLock {
|
||||
func withLock<T>(_ body: () -> T) -> T {
|
||||
self.lock()
|
||||
defer { self.unlock() }
|
||||
return body()
|
||||
}
|
||||
}
|
||||
|
||||
private final class FakeGatewayWebSocketTask: WebSocketTasking, @unchecked Sendable {
|
||||
private let lock = NSLock()
|
||||
private var _state: URLSessionTask.State = .suspended
|
||||
private var connectRequestId: String?
|
||||
private var receivePhase = 0
|
||||
private var pendingReceiveHandler:
|
||||
(@Sendable (Result<URLSessionWebSocketTask.Message, Error>) -> Void)?
|
||||
|
||||
var state: URLSessionTask.State {
|
||||
get { self.lock.withLock { self._state } }
|
||||
set { self.lock.withLock { self._state = newValue } }
|
||||
}
|
||||
|
||||
func resume() {
|
||||
self.state = .running
|
||||
}
|
||||
|
||||
func cancel(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
|
||||
_ = (closeCode, reason)
|
||||
self.state = .canceling
|
||||
let handler = self.lock.withLock { () -> (@Sendable (Result<URLSessionWebSocketTask.Message, Error>) -> Void)? in
|
||||
defer { self.pendingReceiveHandler = nil }
|
||||
return self.pendingReceiveHandler
|
||||
}
|
||||
handler?(Result<URLSessionWebSocketTask.Message, Error>.failure(URLError(.cancelled)))
|
||||
}
|
||||
|
||||
func send(_ message: URLSessionWebSocketTask.Message) async throws {
|
||||
let data: Data? = switch message {
|
||||
case let .data(d): d
|
||||
case let .string(s): s.data(using: .utf8)
|
||||
@unknown default: nil
|
||||
}
|
||||
guard let data else { return }
|
||||
if let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
|
||||
obj["type"] as? String == "req",
|
||||
obj["method"] as? String == "connect",
|
||||
let id = obj["id"] as? String
|
||||
{
|
||||
self.lock.withLock { self.connectRequestId = id }
|
||||
}
|
||||
}
|
||||
|
||||
func receive() async throws -> URLSessionWebSocketTask.Message {
|
||||
let phase = self.lock.withLock { () -> Int in
|
||||
let current = self.receivePhase
|
||||
self.receivePhase += 1
|
||||
return current
|
||||
}
|
||||
if phase == 0 {
|
||||
return .data(Self.connectChallengeData(nonce: "nonce-1"))
|
||||
}
|
||||
for _ in 0..<50 {
|
||||
let id = self.lock.withLock { self.connectRequestId }
|
||||
if let id {
|
||||
return .data(Self.connectOkData(id: id))
|
||||
}
|
||||
try await Task.sleep(nanoseconds: 1_000_000)
|
||||
}
|
||||
return .data(Self.connectOkData(id: "connect"))
|
||||
}
|
||||
|
||||
func receive(
|
||||
completionHandler: @escaping @Sendable (Result<URLSessionWebSocketTask.Message, Error>) -> Void)
|
||||
{
|
||||
self.lock.withLock { self.pendingReceiveHandler = completionHandler }
|
||||
}
|
||||
|
||||
func emitReceiveFailure() {
|
||||
let handler = self.lock.withLock { () -> (@Sendable (Result<URLSessionWebSocketTask.Message, Error>) -> Void)? in
|
||||
self._state = .canceling
|
||||
defer { self.pendingReceiveHandler = nil }
|
||||
return self.pendingReceiveHandler
|
||||
}
|
||||
handler?(Result<URLSessionWebSocketTask.Message, Error>.failure(URLError(.networkConnectionLost)))
|
||||
}
|
||||
|
||||
private static func connectChallengeData(nonce: String) -> Data {
|
||||
let json = """
|
||||
{
|
||||
"type": "event",
|
||||
"event": "connect.challenge",
|
||||
"payload": { "nonce": "\(nonce)" }
|
||||
}
|
||||
"""
|
||||
return Data(json.utf8)
|
||||
}
|
||||
|
||||
private static func connectOkData(id: String) -> Data {
|
||||
let json = """
|
||||
{
|
||||
"type": "res",
|
||||
"id": "\(id)",
|
||||
"ok": true,
|
||||
"payload": {
|
||||
"type": "hello-ok",
|
||||
"protocol": 2,
|
||||
"server": { "version": "test", "connId": "test" },
|
||||
"features": { "methods": [], "events": [] },
|
||||
"snapshot": {
|
||||
"presence": [ { "ts": 1 } ],
|
||||
"health": {},
|
||||
"stateVersion": { "presence": 0, "health": 0 },
|
||||
"uptimeMs": 0
|
||||
},
|
||||
"policy": { "maxPayload": 1, "maxBufferedBytes": 1, "tickIntervalMs": 30000 }
|
||||
}
|
||||
}
|
||||
"""
|
||||
return Data(json.utf8)
|
||||
}
|
||||
}
|
||||
|
||||
private final class FakeGatewayWebSocketSession: WebSocketSessioning, @unchecked Sendable {
|
||||
private let lock = NSLock()
|
||||
private var tasks: [FakeGatewayWebSocketTask] = []
|
||||
private var makeCount = 0
|
||||
|
||||
func snapshotMakeCount() -> Int {
|
||||
self.lock.withLock { self.makeCount }
|
||||
}
|
||||
|
||||
func latestTask() -> FakeGatewayWebSocketTask? {
|
||||
self.lock.withLock { self.tasks.last }
|
||||
}
|
||||
|
||||
func makeWebSocketTask(url: URL) -> WebSocketTaskBox {
|
||||
_ = url
|
||||
return self.lock.withLock {
|
||||
self.makeCount += 1
|
||||
let task = FakeGatewayWebSocketTask()
|
||||
self.tasks.append(task)
|
||||
return WebSocketTaskBox(task: task)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private actor SeqGapProbe {
|
||||
private var saw = false
|
||||
func mark() { self.saw = true }
|
||||
func value() -> Bool { self.saw }
|
||||
}
|
||||
|
||||
struct GatewayNodeSessionTests {
|
||||
@Test
|
||||
func invokeWithTimeoutReturnsUnderlyingResponseBeforeTimeout() async {
|
||||
@@ -53,4 +225,56 @@ struct GatewayNodeSessionTests {
|
||||
#expect(response.ok == true)
|
||||
#expect(response.error == nil)
|
||||
}
|
||||
|
||||
@Test
|
||||
func emitsSyntheticSeqGapAfterReconnectSnapshot() async throws {
|
||||
let session = FakeGatewayWebSocketSession()
|
||||
let gateway = GatewayNodeSession()
|
||||
let options = GatewayConnectOptions(
|
||||
role: "operator",
|
||||
scopes: ["operator.read"],
|
||||
caps: [],
|
||||
commands: [],
|
||||
permissions: [:],
|
||||
clientId: "openclaw-ios-test",
|
||||
clientMode: "ui",
|
||||
clientDisplayName: "iOS Test",
|
||||
includeDeviceIdentity: false)
|
||||
|
||||
let stream = await gateway.subscribeServerEvents(bufferingNewest: 32)
|
||||
let probe = SeqGapProbe()
|
||||
let listenTask = Task {
|
||||
for await evt in stream {
|
||||
if evt.event == "seqGap" {
|
||||
await probe.mark()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try await gateway.connect(
|
||||
url: URL(string: "ws://example.invalid")!,
|
||||
token: nil,
|
||||
password: nil,
|
||||
connectOptions: options,
|
||||
sessionBox: WebSocketSessionBox(session: session),
|
||||
onConnected: {},
|
||||
onDisconnected: { _ in },
|
||||
onInvoke: { req in
|
||||
BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: nil, error: nil)
|
||||
})
|
||||
|
||||
let firstTask = try #require(session.latestTask())
|
||||
firstTask.emitReceiveFailure()
|
||||
|
||||
try await waitUntil("reconnect socket created") {
|
||||
session.snapshotMakeCount() >= 2
|
||||
}
|
||||
try await waitUntil("synthetic seqGap broadcast") {
|
||||
await probe.value()
|
||||
}
|
||||
|
||||
listenTask.cancel()
|
||||
await gateway.disconnect()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user