From ff47ca02f4b6b1d2c0307e4a1f0bda0e9029fdf7 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 28 Nov 2023 10:05:05 -0300 Subject: [PATCH] Started removing async where is isn't needed --- Sources/Realtime/HeartbeatTimer.swift | 6 ++--- Sources/Realtime/PhoenixTransport.swift | 6 ++--- Sources/Realtime/Push.swift | 30 +++++++++++----------- Sources/Realtime/RealtimeChannel.swift | 24 +++++++++--------- Sources/Realtime/RealtimeClient.swift | 33 +++++++++++-------------- Sources/Supabase/SupabaseClient.swift | 6 ++--- 6 files changed, 49 insertions(+), 56 deletions(-) diff --git a/Sources/Realtime/HeartbeatTimer.swift b/Sources/Realtime/HeartbeatTimer.swift index ed676979..51db3919 100644 --- a/Sources/Realtime/HeartbeatTimer.swift +++ b/Sources/Realtime/HeartbeatTimer.swift @@ -2,7 +2,7 @@ import ConcurrencyExtras import Foundation protocol HeartbeatTimerProtocol: Sendable { - func start(_ handler: @escaping @Sendable () async -> Void) async + func start(_ handler: @escaping @Sendable () -> Void) async func stop() async } @@ -15,13 +15,13 @@ actor HeartbeatTimer: HeartbeatTimerProtocol, @unchecked Sendable { private var task: Task? - func start(_ handler: @escaping @Sendable () async -> Void) { + func start(_ handler: @escaping @Sendable () -> Void) { task?.cancel() task = Task { while !Task.isCancelled { let seconds = UInt64(timeInterval) try? await Task.sleep(nanoseconds: NSEC_PER_SEC * seconds) - await handler() + handler() } } } diff --git a/Sources/Realtime/PhoenixTransport.swift b/Sources/Realtime/PhoenixTransport.swift index fc7abdd2..84b86206 100644 --- a/Sources/Realtime/PhoenixTransport.swift +++ b/Sources/Realtime/PhoenixTransport.swift @@ -57,7 +57,7 @@ public protocol PhoenixTransport { - Parameter data: Data to send. */ - func send(data: Data) async + func send(data: Data) } // ---------------------------------------------------------------------- @@ -220,8 +220,8 @@ open class URLSessionTransport: NSObject, PhoenixTransport, URLSessionWebSocketD session?.finishTasksAndInvalidate() } - open func send(data: Data) async { - try? await stream?.task.send(.string(String(data: data, encoding: .utf8)!)) + open func send(data: Data) { + stream?.task.send(.string(String(data: data, encoding: .utf8)!)) { _ in } } // MARK: - URLSessionWebSocketDelegate diff --git a/Sources/Realtime/Push.swift b/Sources/Realtime/Push.swift index 67caabf1..25f8db8c 100644 --- a/Sources/Realtime/Push.swift +++ b/Sources/Realtime/Push.swift @@ -45,6 +45,12 @@ public final class Push: @unchecked Sendable { /// The event that is associated with the reference ID of the Push var refEvent: String? + + /// Reverses the result on channel.on(ChannelEvent, callback) that spawned the Push + mutating func cancelRefEvent() { + guard let refEvent else { return } + channel?.off(refEvent) + } } private let mutableState = LockIsolated(MutableState()) @@ -85,17 +91,17 @@ public final class Push: @unchecked Sendable { /// Resets and sends the Push /// - parameter timeout: Optional. The push timeout. Default is 10.0s - public func resend(_ timeout: TimeInterval = Defaults.timeoutInterval) async { + public func resend(_ timeout: TimeInterval = Defaults.timeoutInterval) { mutableState.withValue { $0.timeout = timeout } - await reset() - await send() + reset() + send() } /// Sends the Push. If it has already timed out, then the call will /// be ignored and return early. Use `resend` in this case. - public func send() async { + public func send() { guard !hasReceived(status: .timeout) else { return } startTimeout() @@ -105,7 +111,7 @@ public final class Push: @unchecked Sendable { let channel = mutableState.channel - await channel?.socket?.push( + channel?.socket?.push( message: Message( ref: mutableState.ref ?? "", topic: channel?.topic ?? "", @@ -156,11 +162,9 @@ public final class Push: @unchecked Sendable { } /// Resets the Push as it was after it was first initialized. - func reset() async { - // TODO: move cancelRefEvent to MutableState - cancelRefEvent() - + func reset() { mutableState.withValue { + $0.cancelRefEvent() $0.refEvent = nil $0.ref = nil $0.receivedMessage = nil @@ -178,12 +182,6 @@ public final class Push: @unchecked Sendable { } } - /// Reverses the result on channel.on(ChannelEvent, callback) that spawned the Push - private func cancelRefEvent() { - guard let refEvent = mutableState.refEvent else { return } - mutableState.channel?.off(refEvent) - } - /// Cancel any ongoing Timeout Timer func cancelTimeout() { mutableState.withValue { @@ -214,9 +212,9 @@ public final class Push: @unchecked Sendable { /// If a response is received before the Timer triggers, cancel timer /// and match the received event to it's corresponding hook channel.on(refEvent, filter: ChannelFilter()) { [weak self] message in - self?.cancelRefEvent() self?.cancelTimeout() self?.mutableState.withValue { + $0.cancelRefEvent() $0.receivedMessage = message } diff --git a/Sources/Realtime/RealtimeChannel.swift b/Sources/Realtime/RealtimeChannel.swift index cbf2d90f..2a5e0b05 100644 --- a/Sources/Realtime/RealtimeChannel.swift +++ b/Sources/Realtime/RealtimeChannel.swift @@ -301,7 +301,7 @@ public final class RealtimeChannel: @unchecked Sendable { // Send and buffered messages and clear the buffer for push in pushBuffer { - await push.send() + push.send() } mutableState.withValue { @@ -337,13 +337,13 @@ public final class RealtimeChannel: @unchecked Sendable { event: ChannelEvent.leave, timeout: mutableState.timeout ) - await leavePush.send() + leavePush.send() // Mark the RealtimeChannel as in an error and attempt to rejoin if socket is connected mutableState.withValue { $0.state = .errored } - await joinPush.reset() + joinPush.reset() if self.socket?.isConnected == true { await rejoinTimer.scheduleTimeout() @@ -387,7 +387,7 @@ public final class RealtimeChannel: @unchecked Sendable { } // Reset the push to be used again later - await self.joinPush.reset() + self.joinPush.reset() } // Mark the channel as errored and attempt to rejoin if socket is currently connected @@ -489,7 +489,7 @@ public final class RealtimeChannel: @unchecked Sendable { } if self.socket?.accessToken != nil { - await self.socket?.setAuth(self.socket?.accessToken) + self.socket?.setAuth(self.socket?.accessToken) } guard let serverPostgresFilters = message.payload["postgres_changes"]?.arrayValue? @@ -694,7 +694,7 @@ public final class RealtimeChannel: @unchecked Sendable { _ event: String, payload: Payload, timeout: TimeInterval = Defaults.timeoutInterval - ) async -> Push { + ) -> Push { guard joinedOnce else { fatalError( "Tried to push \(event) to \(topic) before joining. Use channel.join() before pushing events" @@ -708,7 +708,7 @@ public final class RealtimeChannel: @unchecked Sendable { timeout: timeout ) if canPush { - await pushEvent.send() + pushEvent.send() } else { pushEvent.startTimeout() mutableState.withValue { @@ -763,7 +763,7 @@ public final class RealtimeChannel: @unchecked Sendable { } else { let continuation = LockIsolated(CheckedContinuation?.none) - let push = await push( + let push = push( type.rawValue, payload: payload, timeout: opts["timeout"]?.numberValue ?? mutableState.timeout ) @@ -846,7 +846,7 @@ public final class RealtimeChannel: @unchecked Sendable { await leavePush .receive(.ok, callback: onCloseCallback) .receive(.timeout, callback: onCloseCallback) - await leavePush.send() + leavePush.send() // If the RealtimeChannel cannot send push events, trigger a success locally if !canPush { @@ -892,11 +892,11 @@ public final class RealtimeChannel: @unchecked Sendable { } /// Sends the payload to join the RealtimeChannel - func sendJoin(_ timeout: TimeInterval) async { + func sendJoin(_ timeout: TimeInterval) { mutableState.withValue { $0.state = .joining } - await joinPush.resend(timeout) + joinPush.resend(timeout) } /// Rejoins the channel @@ -908,7 +908,7 @@ public final class RealtimeChannel: @unchecked Sendable { await socket?.leaveOpenTopic(topic: topic) // Send the joinPush - await sendJoin(timeout ?? mutableState.timeout) + sendJoin(timeout ?? mutableState.timeout) } /// Triggers an event to the correct event bindings created by diff --git a/Sources/Realtime/RealtimeClient.swift b/Sources/Realtime/RealtimeClient.swift index 900b536f..d517ae05 100644 --- a/Sources/Realtime/RealtimeClient.swift +++ b/Sources/Realtime/RealtimeClient.swift @@ -70,7 +70,7 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate /// Buffers messages that need to be sent once the socket has connected. It is an array /// of tuples, with the ref of the message to send and the callback that will send the message. - var sendBuffer: [(ref: String?, callback: () async throws -> Void)] = [] + var sendBuffer: [(ref: String?, callback: () -> Void)] = [] /// Timer that triggers sending new Heartbeat messages var heartbeatTimer: HeartbeatTimerProtocol? @@ -313,7 +313,7 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate /// Sets the JWT access token used for channel subscription authorization and Realtime RLS. /// - Parameter token: A JWT string. - public func setAuth(_ token: String?) async { + public func setAuth(_ token: String?) { mutableState.withValue { $0.accessToken = token } @@ -322,7 +322,7 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate channel.params["user_token"] = token.map(AnyJSON.string) ?? .null if channel.joinedOnce, channel.isJoined { - await channel.push( + channel.push( ChannelEvent.accessToken, payload: ["access_token": token.map(AnyJSON.string) ?? .null] ) @@ -571,8 +571,8 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate /// - parameter payload: /// - parameter ref: Optional. Defaults to nil /// - parameter joinRef: Optional. Defaults to nil - func push(message: Message) async { - let callback: (() async throws -> Void) = { [weak self] in + func push(message: Message) { + let callback: (() -> Void) = { [weak self] in guard let self else { return } do { let data = try JSONEncoder().encode(message) @@ -581,7 +581,7 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate "push", "Sending \(String(data: data, encoding: String.Encoding.utf8) ?? "")" ) - await self.mutableState.connection?.send(data: data) + self.mutableState.connection?.send(data: data) } catch { // TODO: handle error } @@ -589,7 +589,7 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate /// If the socket is connected, then execute the callback immediately. if isConnected { - try? await callback() + callback() } else { /// If the socket is not connected, add the push to a buffer which will /// be sent immediately upon connection. @@ -622,7 +622,7 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate } // Send any messages that were waiting for a connection - await flushSendBuffer() + flushSendBuffer() // Reset how the socket tried to reconnect await reconnectTimer.reset() @@ -712,15 +712,10 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate } /// Send all messages that were buffered before the socket opened - func flushSendBuffer() async { - let sendBuffer = mutableState.sendBuffer - - guard isConnected, sendBuffer.count > 0 else { return } - for (_, callback) in sendBuffer { - try? await callback() - } - + func flushSendBuffer() { mutableState.withValue { + guard isConnected, $0.sendBuffer.count > 0 else { return } + $0.sendBuffer.forEach { $0.callback() } $0.sendBuffer = [] } } @@ -799,12 +794,12 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate mutableState.withValue { $0.heartbeatTimer = heartbeatTimer } await heartbeatTimer.start { [weak self] in - await self?.sendHeartbeat() + self?.sendHeartbeat() } } /// Sends a heartbeat payload to the phoenix servers - func sendHeartbeat() async { + func sendHeartbeat() { // Do not send if the connection is closed guard isConnected else { return } @@ -833,7 +828,7 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate } if let pendingHeartbeatRef { - await push( + push( message: Message( ref: pendingHeartbeatRef, topic: "phoenix", diff --git a/Sources/Supabase/SupabaseClient.swift b/Sources/Supabase/SupabaseClient.swift index dc34a181..be5c930c 100644 --- a/Sources/Supabase/SupabaseClient.swift +++ b/Sources/Supabase/SupabaseClient.swift @@ -150,16 +150,16 @@ public final class SupabaseClient: @unchecked Sendable { listenForAuthEventsTask.setValue( Task { for await (event, session) in await auth.authStateChanges { - await handleTokenChanged(event: event, session: session) + handleTokenChanged(event: event, session: session) } } ) } - private func handleTokenChanged(event: AuthChangeEvent, session: Session?) async { + private func handleTokenChanged(event: AuthChangeEvent, session: Session?) { let supportedEvents: [AuthChangeEvent] = [.initialSession, .signedIn, .tokenRefreshed] guard supportedEvents.contains(event) else { return } - await realtime.setAuth(session?.accessToken) + realtime.setAuth(session?.accessToken) } }