Skip to content

Commit

Permalink
Remove async
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Nov 29, 2023
1 parent ff47ca0 commit b5cf418
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 255 deletions.
87 changes: 48 additions & 39 deletions Examples/RealtimeSample/ContentView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,74 +18,85 @@ final class ViewModel: ObservableObject {
@Published var channelStatus: String?

@Published var publicSchema: RealtimeChannel?
@Published var isJoined: Bool = false

func createSubscription() async {
await supabase.realtime.connect()
func createSubscription() {
supabase.realtime.connect()

publicSchema = await supabase.realtime.channel("public")
publicSchema = supabase.realtime.channel("public")
.on(
"postgres_changes",
filter: ChannelFilter(event: "INSERT", schema: "public")
) { [weak self] message in
await MainActor.run { [weak self] in
Task { @MainActor [weak self] in
self?.inserts.append(message)
}
}
.on(
"postgres_changes",
filter: ChannelFilter(event: "UPDATE", schema: "public")
) { [weak self] message in
await MainActor.run { [weak self] in
Task { @MainActor [weak self] in
self?.updates.append(message)
}
}
.on(
"postgres_changes",
filter: ChannelFilter(event: "DELETE", schema: "public")
) { [weak self] message in
await MainActor.run { [weak self] in
Task { @MainActor [weak self] in
self?.deletes.append(message)
}
}

await publicSchema?.onError { @MainActor [weak self] _ in self?.channelStatus = "ERROR" }
await publicSchema?
.onClose { @MainActor [weak self] _ in self?.channelStatus = "Closed gracefully" }
await publicSchema?
.subscribe { @MainActor [weak self] state, _ in
self?.isJoined = await self?.publicSchema?.isJoined == true
switch state {
case .subscribed:
self?.channelStatus = "OK"
case .closed:
self?.channelStatus = "CLOSED"
case .timedOut:
self?.channelStatus = "Timed out"
case .channelError:
self?.channelStatus = "ERROR"
publicSchema?.onError { [weak self] _ in
Task { @MainActor [weak self] in
self?.channelStatus = "ERROR"
}
}
publicSchema?.onClose { [weak self] _ in
Task { @MainActor [weak self] in
self?.channelStatus = "Closed gracefully"
}
}
publicSchema?
.subscribe { [weak self] state, _ in
Task { @MainActor [weak self] in
switch state {
case .subscribed:
self?.channelStatus = "OK"
case .closed:
self?.channelStatus = "CLOSED"
case .timedOut:
self?.channelStatus = "Timed out"
case .channelError:
self?.channelStatus = "ERROR"
}
}
}

await supabase.realtime.connect()
await supabase.realtime.onOpen { @MainActor [weak self] in
self?.socketStatus = "OPEN"
supabase.realtime.connect()
supabase.realtime.onOpen { [weak self] in
Task { @MainActor [weak self] in
self?.socketStatus = "OPEN"
}
}
await supabase.realtime.onClose { [weak self] _, _ in
await MainActor.run { [weak self] in
supabase.realtime.onClose { [weak self] _, _ in
Task { @MainActor [weak self] in
self?.socketStatus = "CLOSE"
}
}
await supabase.realtime.onError { @MainActor [weak self] error, _ in
self?.socketStatus = "ERROR: \(error.localizedDescription)"
supabase.realtime.onError { [weak self] error, _ in
Task { @MainActor [weak self] in
self?.socketStatus = "ERROR: \(error.localizedDescription)"
}
}
}

func toggleSubscription() async {
if await publicSchema?.isJoined == true {
await publicSchema?.unsubscribe()
func toggleSubscription() {
if publicSchema?.isJoined == true {
publicSchema?.unsubscribe()
} else {
await createSubscription()
createSubscription()
}
}
}
Expand Down Expand Up @@ -118,11 +129,9 @@ struct ContentView: View {
Toggle(
"Toggle Subscription",
isOn: Binding(
get: { model.isJoined },
get: { model.publicSchema?.isJoined == true },
set: { _ in
Task {
await model.toggleSubscription()
}
model.toggleSubscription()
}
)
)
Expand All @@ -133,8 +142,8 @@ struct ContentView: View {
.background(.regularMaterial)
.padding()
}
.task {
await model.createSubscription()
.onAppear {
model.createSubscription()
}
}
}
Expand Down
34 changes: 15 additions & 19 deletions Sources/Realtime/PhoenixTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public protocol PhoenixTransportDelegate: AnyObject {

- Parameter response: Response from the server indicating that the WebSocket handshake was successful and the connection has been upgraded to webSockets
*/
func onOpen(response: URLResponse?) async
func onOpen(response: URLResponse?)

/**
Notified when the `Transport` receives an error.
Expand All @@ -81,22 +81,22 @@ public protocol PhoenixTransportDelegate: AnyObject {
- Parameter response: Response from the server, if any, that occurred with the Error

*/
func onError(error: Error, response: URLResponse?) async
func onError(error: Error, response: URLResponse?)

/**
Notified when the `Transport` receives a message from the server.

- Parameter message: Message received from the server
*/
func onMessage(message: Data) async
func onMessage(message: Data)

/**
Notified when the `Transport` closes.

- Parameter code: Code that was sent when the `Transport` closed
- Parameter reason: A concise human-readable prose explanation for the closure
*/
func onClose(code: Int, reason: String?) async
func onClose(code: Int, reason: String?)
}

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -233,9 +233,9 @@ open class URLSessionTransport: NSObject, PhoenixTransport, URLSessionWebSocketD
) {
// The Websocket is connected. Set Transport state to open and inform delegate
readyState = .open
delegate?.onOpen(response: webSocketTask.response)

Task {
await delegate?.onOpen(response: webSocketTask.response)
await receive()
}
}
Expand All @@ -248,11 +248,9 @@ open class URLSessionTransport: NSObject, PhoenixTransport, URLSessionWebSocketD
) {
// A close frame was received from the server.
readyState = .closed
Task {
await delegate?.onClose(
code: closeCode.rawValue, reason: reason.flatMap { String(data: $0, encoding: .utf8) }
)
}
delegate?.onClose(
code: closeCode.rawValue, reason: reason.flatMap { String(data: $0, encoding: .utf8) }
)
}

open func urlSession(
Expand All @@ -264,9 +262,7 @@ open class URLSessionTransport: NSObject, PhoenixTransport, URLSessionWebSocketD
// if this was caused by an error.
guard let error else { return }

Task {
await abnormalErrorReceived(error, response: task.response)
}
abnormalErrorReceived(error, response: task.response)
}

// MARK: - Private
Expand All @@ -280,31 +276,31 @@ open class URLSessionTransport: NSObject, PhoenixTransport, URLSessionWebSocketD
for try await message in stream {
switch message {
case let .data(data):
await delegate?.onMessage(message: data)
delegate?.onMessage(message: data)
case let .string(text):
let data = Data(text.utf8)
await delegate?.onMessage(message: data)
delegate?.onMessage(message: data)
@unknown default:
print("unkown message received")
}
}
} catch {
print("Error when receiving \(error)")
await abnormalErrorReceived(error, response: nil)
abnormalErrorReceived(error, response: nil)
}
}

private func abnormalErrorReceived(_ error: Error, response: URLResponse?) async {
private func abnormalErrorReceived(_ error: Error, response: URLResponse?) {
// Set the state of the Transport to closed
readyState = .closed

// Inform the Transport's delegate that an error occurred.
await delegate?.onError(error: error, response: response)
delegate?.onError(error: error, response: response)

// An abnormal error is results in an abnormal closure, such as internet getting dropped
// so inform the delegate that the Transport has closed abnormally. This will kick off
// the reconnect logic.
await delegate?.onClose(
delegate?.onClose(
code: RealtimeClient.CloseCode.abnormal.rawValue, reason: error.localizedDescription
)
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/Realtime/Presence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public final class Presence: @unchecked Sendable {
let newState = message.rawPayload as? State
else { return }

await onStateEvent(newState)
onStateEvent(newState)
}

channel.on(diffEvent, filter: ChannelFilter()) { [weak self] message in
Expand All @@ -256,7 +256,7 @@ public final class Presence: @unchecked Sendable {
}
}

private func onStateEvent(_ newState: State) async {
private func onStateEvent(_ newState: State) {
mutableState.withValue { mutableState in
mutableState.joinRef = mutableState.channel?.joinRef

Expand Down
22 changes: 11 additions & 11 deletions Sources/Realtime/Push.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public final class Push: @unchecked Sendable {
var timeoutTask: Task<Void, Never>?

/// Hooks into a Push. Where .receive("ok", callback(Payload)) are stored
var receiveHooks: [PushStatus: [@Sendable (Message) async -> Void]] = [:]
var receiveHooks: [PushStatus: [@Sendable (Message) -> Void]] = [:]

/// True if the Push has been sent
var sent: Bool = false
Expand Down Expand Up @@ -141,11 +141,11 @@ public final class Push: @unchecked Sendable {
@discardableResult
public func receive(
_ status: PushStatus,
callback: @escaping @Sendable (Message) async -> Void
) async -> Push {
callback: @escaping @Sendable (Message) -> Void
) -> Push {
// If the message has already been received, pass it to the callback immediately
if hasReceived(status: status), let receivedMessage = mutableState.receivedMessage {
await callback(receivedMessage)
callback(receivedMessage)
}

mutableState.withValue {
Expand Down Expand Up @@ -176,9 +176,9 @@ public final class Push: @unchecked Sendable {
///
/// - parameter status: Status which was received, e.g. "ok", "error", "timeout"
/// - parameter response: Response that was received
private func matchReceive(_ status: PushStatus, message: Message) async {
for hook in mutableState.receiveHooks[status] ?? [] {
await hook(message)
private func matchReceive(_ status: PushStatus, message: Message) {
mutableState.receiveHooks[status, default: []].forEach {
$0(message)
}
}

Expand Down Expand Up @@ -220,14 +220,14 @@ public final class Push: @unchecked Sendable {

/// Check if there is event a status available
guard let status = message.status else { return }
await self?.matchReceive(status, message: message)
self?.matchReceive(status, message: message)
}

let timeout = mutableState.timeout

let timeoutTask = Task {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(timeout))
await self.trigger(.timeout, payload: [:])
self.trigger(.timeout, payload: [:])
}

mutableState.withValue {
Expand All @@ -244,13 +244,13 @@ public final class Push: @unchecked Sendable {
}

/// Triggers an event to be sent though the Channel
func trigger(_ status: PushStatus, payload: Payload) async {
func trigger(_ status: PushStatus, payload: Payload) {
/// If there is no ref event, then there is nothing to trigger on the channel
guard let refEvent = mutableState.refEvent else { return }

var mutPayload = payload
mutPayload["status"] = .string(status.rawValue)

await mutableState.channel?.trigger(event: refEvent, payload: mutPayload)
mutableState.channel?.trigger(event: refEvent, payload: mutPayload)
}
}
Loading

0 comments on commit b5cf418

Please sign in to comment.