diff --git a/Sources/Realtime/Channel.swift b/Sources/Realtime/Channel.swift index 3459e0a7..6102eaf1 100644 --- a/Sources/Realtime/Channel.swift +++ b/Sources/Realtime/Channel.swift @@ -71,7 +71,7 @@ public actor RealtimeChannelV2 { await socket?.connect() } - socket?.addChannel(self) + await socket?.addChannel(self) _status.value = .subscribing debug("subscribing to channel \(topic)") @@ -88,7 +88,7 @@ public actor RealtimeChannelV2 { accessToken: currentJwt ) - joinRef = socket?.makeRef().description + joinRef = await socket?.makeRef().description debug("subscribing to channel with body: \(joinConfig)") diff --git a/Sources/Realtime/Realtime.swift b/Sources/Realtime/Realtime.swift index 3796b5a6..6f6a080a 100644 --- a/Sources/Realtime/Realtime.swift +++ b/Sources/Realtime/Realtime.swift @@ -14,7 +14,7 @@ public protocol AuthTokenProvider: Sendable { func authToken() async -> String? } -public final class Realtime: @unchecked Sendable { +public actor Realtime { public struct Configuration: Sendable { var url: URL var apiKey: String @@ -52,45 +52,34 @@ public final class Realtime: @unchecked Sendable { case connected } - struct MutableState { - var ref = 0 - var heartbeatRef: Int? - var heartbeatTask: Task? - var messageTask: Task? - var subscriptions: [String: RealtimeChannelV2] = [:] - var ws: WebSocketClientProtocol? - - mutating func makeRef() -> Int { - ref += 1 - return ref - } - } + var ref = 0 + var pendingHeartbeatRef: Int? + var heartbeatTask: Task? + var messageTask: Task? + var inFlightConnectionTask: Task? + + public private(set) var subscriptions: [String: RealtimeChannelV2] = [:] + var ws: WebSocketClientProtocol? let config: Configuration let makeWebSocketClient: (URL) -> WebSocketClientProtocol - let mutableState = LockIsolated(MutableState()) + let _status: CurrentValueSubject = CurrentValueSubject(.disconnected) public var status: Status { _status.value } - public var subscriptions: [String: RealtimeChannelV2] { - mutableState.subscriptions - } - init(config: Configuration, makeWebSocketClient: @escaping (URL) -> WebSocketClientProtocol) { self.config = config self.makeWebSocketClient = makeWebSocketClient } deinit { - mutableState.withValue { - $0.heartbeatTask?.cancel() - $0.messageTask?.cancel() - $0.subscriptions = [:] - $0.ws?.cancel() - } + heartbeatTask?.cancel() + messageTask?.cancel() + subscriptions = [:] + ws?.cancel() } - public convenience init(config: Configuration) { + public init(config: Configuration) { self.init( config: config, makeWebSocketClient: { WebSocketClient(realtimeURL: $0, configuration: .default) } @@ -102,46 +91,52 @@ public final class Realtime: @unchecked Sendable { } func connect(reconnect: Bool) async { - if reconnect { - try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.reconnectDelay)) + if let inFlightConnectionTask { + return await inFlightConnectionTask.value + } - if Task.isCancelled { - debug("reconnect cancelled, returning") - return + inFlightConnectionTask = Task { + if reconnect { + try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.reconnectDelay)) + + if Task.isCancelled { + debug("reconnect cancelled, returning") + return + } } - } - if _status.value == .connected { - debug("Websocket already connected") - return - } + if _status.value == .connected { + debug("Websocket already connected") + return + } - _status.value = .connecting + _status.value = .connecting - let realtimeURL = realtimeWebSocketURL + let realtimeURL = realtimeWebSocketURL - let ws = mutableState.withValue { - $0.ws = makeWebSocketClient(realtimeURL) - return $0.ws! - } + let ws = makeWebSocketClient(realtimeURL) + self.ws = ws - let connectionStatus = try? await ws.connect().first { _ in true } + let connectionStatus = try? await ws.connect().first { _ in true } - if connectionStatus == .open { - _status.value = .connected - debug("Connected to realtime websocket") - listenForMessages() - startHeartbeating() - if reconnect { - await rejoinChannels() + if connectionStatus == .open { + _status.value = .connected + debug("Connected to realtime websocket") + listenForMessages() + startHeartbeating() + if reconnect { + await rejoinChannels() + } + } else { + debug( + "Error while trying to connect to realtime websocket. Trying again in \(config.reconnectDelay) seconds." + ) + disconnect() + await connect(reconnect: true) } - } else { - debug( - "Error while trying to connect to realtime websocket. Trying again in \(config.reconnectDelay) seconds." - ) - disconnect() - await connect(reconnect: true) } + + await inFlightConnectionTask?.value } public func channel( @@ -162,7 +157,7 @@ public final class Realtime: @unchecked Sendable { } public func addChannel(_ channel: RealtimeChannelV2) { - mutableState.withValue { $0.subscriptions[channel.topic] = channel } + subscriptions[channel.topic] = channel } public func removeChannel(_ channel: RealtimeChannelV2) async { @@ -170,13 +165,11 @@ public final class Realtime: @unchecked Sendable { await channel.unsubscribe() } - mutableState.withValue { - $0.subscriptions[channel.topic] = nil + subscriptions[channel.topic] = nil - if $0.subscriptions.isEmpty { - debug("No more subscribed channel in socket") - disconnect() - } + if subscriptions.isEmpty { + debug("No more subscribed channel in socket") + disconnect() } } @@ -188,68 +181,52 @@ public final class Realtime: @unchecked Sendable { } private func listenForMessages() { - mutableState.withValue { - let ws = $0.ws - - $0.messageTask = Task { [weak self] in - guard let self, let ws else { return } - - do { - for try await message in ws.receive() { - await onMessage(message) - } - } catch { - debug( - "Error while listening for messages. Trying again in \(config.reconnectDelay) \(error)" - ) - disconnect() - await connect(reconnect: true) + messageTask = Task { [weak self] in + guard let self, let ws = await ws else { return } + + do { + for try await message in ws.receive() { + await onMessage(message) } + } catch { + debug( + "Error while listening for messages. Trying again in \(config.reconnectDelay) \(error)" + ) + await disconnect() + await connect(reconnect: true) } } } private func startHeartbeating() { - mutableState.withValue { - $0.heartbeatTask = Task { [weak self] in - guard let self else { return } - - while !Task.isCancelled { - try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.heartbeatInterval)) - if Task.isCancelled { - break - } - await sendHeartbeat() + heartbeatTask = Task { [weak self] in + guard let self else { return } + + while !Task.isCancelled { + try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.heartbeatInterval)) + if Task.isCancelled { + break } + await sendHeartbeat() } } } private func sendHeartbeat() async { - let timedOut = mutableState.withValue { - if $0.heartbeatRef != nil { - $0.heartbeatRef = nil - return true - } - return false - } - - if timedOut { + if pendingHeartbeatRef != nil { + pendingHeartbeatRef = nil debug("Heartbeat timeout. Trying to reconnect in \(config.reconnectDelay)") disconnect() await connect(reconnect: true) return } - let heartbeatRef = mutableState.withValue { - $0.heartbeatRef = $0.makeRef() - return $0.heartbeatRef - } + pendingHeartbeatRef = makeRef() await send( RealtimeMessageV2( joinRef: nil, - ref: heartbeatRef?.description, + ref: pendingHeartbeatRef?.description, topic: "phoenix", event: "heartbeat", payload: [:] @@ -259,38 +236,29 @@ public final class Realtime: @unchecked Sendable { public func disconnect() { debug("Closing websocket connection") - mutableState.withValue { - $0.ref = 0 - $0.messageTask?.cancel() - $0.heartbeatTask?.cancel() - $0.ws?.cancel() - $0.ws = nil - } + ref = 0 + messageTask?.cancel() + heartbeatTask?.cancel() + ws?.cancel() + ws = nil _status.value = .disconnected } private func onMessage(_ message: RealtimeMessageV2) async { - let forward: () async -> Void = mutableState.withValue { - let channel = $0.subscriptions[message.topic] + let channel = subscriptions[message.topic] - if Int(message.ref ?? "") == $0.heartbeatRef { - $0.heartbeatRef = 0 - debug("heartbeat received") - return {} - } else { - debug("Received event \(message.event) for channel \(channel?.topic ?? "null")") - return { - await channel?.onMessage(message) - } - } + if Int(message.ref ?? "") == pendingHeartbeatRef { + pendingHeartbeatRef = nil + debug("heartbeat received") + } else { + debug("Received event \(message.event) for channel \(channel?.topic ?? "null")") + await channel?.onMessage(message) } - - await forward() } func send(_ message: RealtimeMessageV2) async { do { - try await mutableState.ws?.send(message) + try await ws?.send(message) } catch { debug(""" Failed to send message: @@ -303,7 +271,8 @@ public final class Realtime: @unchecked Sendable { } func makeRef() -> Int { - mutableState.withValue { $0.makeRef() } + ref += 1 + return ref } private var realtimeBaseURL: URL { diff --git a/Sources/Realtime/_Push.swift b/Sources/Realtime/_Push.swift index 3e2f1b0b..128629c6 100644 --- a/Sources/Realtime/_Push.swift +++ b/Sources/Realtime/_Push.swift @@ -21,7 +21,7 @@ actor _Push { func send() async -> PushStatus { do { - try await channel?.socket?.mutableState.ws?.send(message) + try await channel?.socket?.ws?.send(message) if channel?.config.broadcast.acknowledgeBroadcasts == true { return await withCheckedContinuation {