Skip to content

Commit

Permalink
Started removing async where is isn't needed
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Nov 29, 2023
1 parent e7b44f5 commit ff47ca0
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 56 deletions.
6 changes: 3 additions & 3 deletions Sources/Realtime/HeartbeatTimer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import ConcurrencyExtras
import Foundation

protocol HeartbeatTimerProtocol: Sendable {
func start(_ handler: @escaping @Sendable () async -> Void) async
func start(_ handler: @escaping @Sendable () -> Void) async
func stop() async
}

Expand All @@ -15,13 +15,13 @@ actor HeartbeatTimer: HeartbeatTimerProtocol, @unchecked Sendable {

private var task: Task<Void, Never>?

func start(_ handler: @escaping @Sendable () async -> Void) {
func start(_ handler: @escaping @Sendable () -> Void) {
task?.cancel()
task = Task {
while !Task.isCancelled {
let seconds = UInt64(timeInterval)
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * seconds)
await handler()
handler()
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions Sources/Realtime/PhoenixTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public protocol PhoenixTransport {

- Parameter data: Data to send.
*/
func send(data: Data) async
func send(data: Data)
}

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -220,8 +220,8 @@ open class URLSessionTransport: NSObject, PhoenixTransport, URLSessionWebSocketD
session?.finishTasksAndInvalidate()
}

open func send(data: Data) async {
try? await stream?.task.send(.string(String(data: data, encoding: .utf8)!))
open func send(data: Data) {
stream?.task.send(.string(String(data: data, encoding: .utf8)!)) { _ in }
}

// MARK: - URLSessionWebSocketDelegate
Expand Down
30 changes: 14 additions & 16 deletions Sources/Realtime/Push.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ public final class Push: @unchecked Sendable {

/// The event that is associated with the reference ID of the Push
var refEvent: String?

/// Reverses the result on channel.on(ChannelEvent, callback) that spawned the Push
mutating func cancelRefEvent() {
guard let refEvent else { return }
channel?.off(refEvent)
}
}

private let mutableState = LockIsolated(MutableState())
Expand Down Expand Up @@ -85,17 +91,17 @@ public final class Push: @unchecked Sendable {

/// Resets and sends the Push
/// - parameter timeout: Optional. The push timeout. Default is 10.0s
public func resend(_ timeout: TimeInterval = Defaults.timeoutInterval) async {
public func resend(_ timeout: TimeInterval = Defaults.timeoutInterval) {
mutableState.withValue {
$0.timeout = timeout
}
await reset()
await send()
reset()
send()
}

/// Sends the Push. If it has already timed out, then the call will
/// be ignored and return early. Use `resend` in this case.
public func send() async {
public func send() {
guard !hasReceived(status: .timeout) else { return }

startTimeout()
Expand All @@ -105,7 +111,7 @@ public final class Push: @unchecked Sendable {

let channel = mutableState.channel

await channel?.socket?.push(
channel?.socket?.push(
message: Message(
ref: mutableState.ref ?? "",
topic: channel?.topic ?? "",
Expand Down Expand Up @@ -156,11 +162,9 @@ public final class Push: @unchecked Sendable {
}

/// Resets the Push as it was after it was first initialized.
func reset() async {
// TODO: move cancelRefEvent to MutableState
cancelRefEvent()

func reset() {
mutableState.withValue {
$0.cancelRefEvent()
$0.refEvent = nil
$0.ref = nil
$0.receivedMessage = nil
Expand All @@ -178,12 +182,6 @@ public final class Push: @unchecked Sendable {
}
}

/// Reverses the result on channel.on(ChannelEvent, callback) that spawned the Push
private func cancelRefEvent() {
guard let refEvent = mutableState.refEvent else { return }
mutableState.channel?.off(refEvent)
}

/// Cancel any ongoing Timeout Timer
func cancelTimeout() {
mutableState.withValue {
Expand Down Expand Up @@ -214,9 +212,9 @@ public final class Push: @unchecked Sendable {
/// If a response is received before the Timer triggers, cancel timer
/// and match the received event to it's corresponding hook
channel.on(refEvent, filter: ChannelFilter()) { [weak self] message in
self?.cancelRefEvent()
self?.cancelTimeout()
self?.mutableState.withValue {
$0.cancelRefEvent()
$0.receivedMessage = message
}

Expand Down
24 changes: 12 additions & 12 deletions Sources/Realtime/RealtimeChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public final class RealtimeChannel: @unchecked Sendable {

// Send and buffered messages and clear the buffer
for push in pushBuffer {
await push.send()
push.send()
}

mutableState.withValue {
Expand Down Expand Up @@ -337,13 +337,13 @@ public final class RealtimeChannel: @unchecked Sendable {
event: ChannelEvent.leave,
timeout: mutableState.timeout
)
await leavePush.send()
leavePush.send()

// Mark the RealtimeChannel as in an error and attempt to rejoin if socket is connected
mutableState.withValue {
$0.state = .errored
}
await joinPush.reset()
joinPush.reset()

if self.socket?.isConnected == true {
await rejoinTimer.scheduleTimeout()
Expand Down Expand Up @@ -387,7 +387,7 @@ public final class RealtimeChannel: @unchecked Sendable {
}

// Reset the push to be used again later
await self.joinPush.reset()
self.joinPush.reset()
}

// Mark the channel as errored and attempt to rejoin if socket is currently connected
Expand Down Expand Up @@ -489,7 +489,7 @@ public final class RealtimeChannel: @unchecked Sendable {
}

if self.socket?.accessToken != nil {
await self.socket?.setAuth(self.socket?.accessToken)
self.socket?.setAuth(self.socket?.accessToken)
}

guard let serverPostgresFilters = message.payload["postgres_changes"]?.arrayValue?
Expand Down Expand Up @@ -694,7 +694,7 @@ public final class RealtimeChannel: @unchecked Sendable {
_ event: String,
payload: Payload,
timeout: TimeInterval = Defaults.timeoutInterval
) async -> Push {
) -> Push {
guard joinedOnce else {
fatalError(
"Tried to push \(event) to \(topic) before joining. Use channel.join() before pushing events"
Expand All @@ -708,7 +708,7 @@ public final class RealtimeChannel: @unchecked Sendable {
timeout: timeout
)
if canPush {
await pushEvent.send()
pushEvent.send()
} else {
pushEvent.startTimeout()
mutableState.withValue {
Expand Down Expand Up @@ -763,7 +763,7 @@ public final class RealtimeChannel: @unchecked Sendable {
} else {
let continuation = LockIsolated(CheckedContinuation<ChannelResponse, Never>?.none)

let push = await push(
let push = push(
type.rawValue, payload: payload,
timeout: opts["timeout"]?.numberValue ?? mutableState.timeout
)
Expand Down Expand Up @@ -846,7 +846,7 @@ public final class RealtimeChannel: @unchecked Sendable {
await leavePush
.receive(.ok, callback: onCloseCallback)
.receive(.timeout, callback: onCloseCallback)
await leavePush.send()
leavePush.send()

// If the RealtimeChannel cannot send push events, trigger a success locally
if !canPush {
Expand Down Expand Up @@ -892,11 +892,11 @@ public final class RealtimeChannel: @unchecked Sendable {
}

/// Sends the payload to join the RealtimeChannel
func sendJoin(_ timeout: TimeInterval) async {
func sendJoin(_ timeout: TimeInterval) {
mutableState.withValue {
$0.state = .joining
}
await joinPush.resend(timeout)
joinPush.resend(timeout)
}

/// Rejoins the channel
Expand All @@ -908,7 +908,7 @@ public final class RealtimeChannel: @unchecked Sendable {
await socket?.leaveOpenTopic(topic: topic)

// Send the joinPush
await sendJoin(timeout ?? mutableState.timeout)
sendJoin(timeout ?? mutableState.timeout)
}

/// Triggers an event to the correct event bindings created by
Expand Down
33 changes: 14 additions & 19 deletions Sources/Realtime/RealtimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate

/// Buffers messages that need to be sent once the socket has connected. It is an array
/// of tuples, with the ref of the message to send and the callback that will send the message.
var sendBuffer: [(ref: String?, callback: () async throws -> Void)] = []
var sendBuffer: [(ref: String?, callback: () -> Void)] = []

/// Timer that triggers sending new Heartbeat messages
var heartbeatTimer: HeartbeatTimerProtocol?
Expand Down Expand Up @@ -313,7 +313,7 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate

/// Sets the JWT access token used for channel subscription authorization and Realtime RLS.
/// - Parameter token: A JWT string.
public func setAuth(_ token: String?) async {
public func setAuth(_ token: String?) {
mutableState.withValue {
$0.accessToken = token
}
Expand All @@ -322,7 +322,7 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate
channel.params["user_token"] = token.map(AnyJSON.string) ?? .null

if channel.joinedOnce, channel.isJoined {
await channel.push(
channel.push(
ChannelEvent.accessToken,
payload: ["access_token": token.map(AnyJSON.string) ?? .null]
)
Expand Down Expand Up @@ -571,8 +571,8 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate
/// - parameter payload:
/// - parameter ref: Optional. Defaults to nil
/// - parameter joinRef: Optional. Defaults to nil
func push(message: Message) async {
let callback: (() async throws -> Void) = { [weak self] in
func push(message: Message) {
let callback: (() -> Void) = { [weak self] in
guard let self else { return }
do {
let data = try JSONEncoder().encode(message)
Expand All @@ -581,15 +581,15 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate
"push",
"Sending \(String(data: data, encoding: String.Encoding.utf8) ?? "")"
)
await self.mutableState.connection?.send(data: data)
self.mutableState.connection?.send(data: data)
} catch {
// TODO: handle error
}
}

/// If the socket is connected, then execute the callback immediately.
if isConnected {
try? await callback()
callback()
} else {
/// If the socket is not connected, add the push to a buffer which will
/// be sent immediately upon connection.
Expand Down Expand Up @@ -622,7 +622,7 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate
}

// Send any messages that were waiting for a connection
await flushSendBuffer()
flushSendBuffer()

// Reset how the socket tried to reconnect
await reconnectTimer.reset()
Expand Down Expand Up @@ -712,15 +712,10 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate
}

/// Send all messages that were buffered before the socket opened
func flushSendBuffer() async {
let sendBuffer = mutableState.sendBuffer

guard isConnected, sendBuffer.count > 0 else { return }
for (_, callback) in sendBuffer {
try? await callback()
}

func flushSendBuffer() {
mutableState.withValue {
guard isConnected, $0.sendBuffer.count > 0 else { return }
$0.sendBuffer.forEach { $0.callback() }
$0.sendBuffer = []
}
}
Expand Down Expand Up @@ -799,12 +794,12 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate
mutableState.withValue { $0.heartbeatTimer = heartbeatTimer }

await heartbeatTimer.start { [weak self] in
await self?.sendHeartbeat()
self?.sendHeartbeat()
}
}

/// Sends a heartbeat payload to the phoenix servers
func sendHeartbeat() async {
func sendHeartbeat() {
// Do not send if the connection is closed
guard isConnected else { return }

Expand Down Expand Up @@ -833,7 +828,7 @@ public final class RealtimeClient: @unchecked Sendable, PhoenixTransportDelegate
}

if let pendingHeartbeatRef {
await push(
push(
message: Message(
ref: pendingHeartbeatRef,
topic: "phoenix",
Expand Down
6 changes: 3 additions & 3 deletions Sources/Supabase/SupabaseClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,16 @@ public final class SupabaseClient: @unchecked Sendable {
listenForAuthEventsTask.setValue(
Task {
for await (event, session) in await auth.authStateChanges {
await handleTokenChanged(event: event, session: session)
handleTokenChanged(event: event, session: session)
}
}
)
}

private func handleTokenChanged(event: AuthChangeEvent, session: Session?) async {
private func handleTokenChanged(event: AuthChangeEvent, session: Session?) {
let supportedEvents: [AuthChangeEvent] = [.initialSession, .signedIn, .tokenRefreshed]
guard supportedEvents.contains(event) else { return }

await realtime.setAuth(session?.accessToken)
realtime.setAuth(session?.accessToken)
}
}

0 comments on commit ff47ca0

Please sign in to comment.