Skip to content

Commit

Permalink
Make Realtime and Channel Actors
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Jan 4, 2024
1 parent e455d01 commit c5c1c60
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 129 deletions.
4 changes: 2 additions & 2 deletions Sources/Realtime/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public actor RealtimeChannelV2 {
await socket?.connect()
}

socket?.addChannel(self)
await socket?.addChannel(self)

_status.value = .subscribing
debug("subscribing to channel \(topic)")
Expand All @@ -88,7 +88,7 @@ public actor RealtimeChannelV2 {
accessToken: currentJwt
)

joinRef = socket?.makeRef().description
joinRef = await socket?.makeRef().description

debug("subscribing to channel with body: \(joinConfig)")

Expand Down
221 changes: 95 additions & 126 deletions Sources/Realtime/Realtime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public protocol AuthTokenProvider: Sendable {
func authToken() async -> String?
}

public final class Realtime: @unchecked Sendable {
public actor Realtime {
public struct Configuration: Sendable {
var url: URL
var apiKey: String
Expand Down Expand Up @@ -52,45 +52,34 @@ public final class Realtime: @unchecked Sendable {
case connected
}

struct MutableState {
var ref = 0
var heartbeatRef: Int?
var heartbeatTask: Task<Void, Never>?
var messageTask: Task<Void, Never>?
var subscriptions: [String: RealtimeChannelV2] = [:]
var ws: WebSocketClientProtocol?

mutating func makeRef() -> Int {
ref += 1
return ref
}
}
var ref = 0
var pendingHeartbeatRef: Int?
var heartbeatTask: Task<Void, Never>?
var messageTask: Task<Void, Never>?
var inFlightConnectionTask: Task<Void, Never>?

public private(set) var subscriptions: [String: RealtimeChannelV2] = [:]
var ws: WebSocketClientProtocol?

let config: Configuration
let makeWebSocketClient: (URL) -> WebSocketClientProtocol
let mutableState = LockIsolated(MutableState())

let _status: CurrentValueSubject<Status, Never> = CurrentValueSubject(.disconnected)
public var status: Status { _status.value }

public var subscriptions: [String: RealtimeChannelV2] {
mutableState.subscriptions
}

init(config: Configuration, makeWebSocketClient: @escaping (URL) -> WebSocketClientProtocol) {
self.config = config
self.makeWebSocketClient = makeWebSocketClient
}

deinit {
mutableState.withValue {
$0.heartbeatTask?.cancel()
$0.messageTask?.cancel()
$0.subscriptions = [:]
$0.ws?.cancel()
}
heartbeatTask?.cancel()
messageTask?.cancel()
subscriptions = [:]
ws?.cancel()
}

public convenience init(config: Configuration) {
public init(config: Configuration) {
self.init(
config: config,
makeWebSocketClient: { WebSocketClient(realtimeURL: $0, configuration: .default) }
Expand All @@ -102,46 +91,52 @@ public final class Realtime: @unchecked Sendable {
}

func connect(reconnect: Bool) async {
if reconnect {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.reconnectDelay))
if let inFlightConnectionTask {
return await inFlightConnectionTask.value
}

if Task.isCancelled {
debug("reconnect cancelled, returning")
return
inFlightConnectionTask = Task {
if reconnect {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.reconnectDelay))

if Task.isCancelled {
debug("reconnect cancelled, returning")
return
}
}
}

if _status.value == .connected {
debug("Websocket already connected")
return
}
if _status.value == .connected {
debug("Websocket already connected")
return
}

_status.value = .connecting
_status.value = .connecting

let realtimeURL = realtimeWebSocketURL
let realtimeURL = realtimeWebSocketURL

let ws = mutableState.withValue {
$0.ws = makeWebSocketClient(realtimeURL)
return $0.ws!
}
let ws = makeWebSocketClient(realtimeURL)
self.ws = ws

let connectionStatus = try? await ws.connect().first { _ in true }
let connectionStatus = try? await ws.connect().first { _ in true }

if connectionStatus == .open {
_status.value = .connected
debug("Connected to realtime websocket")
listenForMessages()
startHeartbeating()
if reconnect {
await rejoinChannels()
if connectionStatus == .open {
_status.value = .connected
debug("Connected to realtime websocket")
listenForMessages()
startHeartbeating()
if reconnect {
await rejoinChannels()
}
} else {
debug(
"Error while trying to connect to realtime websocket. Trying again in \(config.reconnectDelay) seconds."
)
disconnect()
await connect(reconnect: true)
}
} else {
debug(
"Error while trying to connect to realtime websocket. Trying again in \(config.reconnectDelay) seconds."
)
disconnect()
await connect(reconnect: true)
}

await inFlightConnectionTask?.value
}

public func channel(
Expand All @@ -162,21 +157,19 @@ public final class Realtime: @unchecked Sendable {
}

public func addChannel(_ channel: RealtimeChannelV2) {
mutableState.withValue { $0.subscriptions[channel.topic] = channel }
subscriptions[channel.topic] = channel
}

public func removeChannel(_ channel: RealtimeChannelV2) async {
if channel._status.value == .subscribed {
await channel.unsubscribe()
}

mutableState.withValue {
$0.subscriptions[channel.topic] = nil
subscriptions[channel.topic] = nil

if $0.subscriptions.isEmpty {
debug("No more subscribed channel in socket")
disconnect()
}
if subscriptions.isEmpty {
debug("No more subscribed channel in socket")
disconnect()
}
}

Expand All @@ -188,68 +181,52 @@ public final class Realtime: @unchecked Sendable {
}

private func listenForMessages() {
mutableState.withValue {
let ws = $0.ws

$0.messageTask = Task { [weak self] in
guard let self, let ws else { return }

do {
for try await message in ws.receive() {
await onMessage(message)
}
} catch {
debug(
"Error while listening for messages. Trying again in \(config.reconnectDelay) \(error)"
)
disconnect()
await connect(reconnect: true)
messageTask = Task { [weak self] in
guard let self, let ws = await ws else { return }

do {
for try await message in ws.receive() {
await onMessage(message)
}
} catch {
debug(
"Error while listening for messages. Trying again in \(config.reconnectDelay) \(error)"
)
await disconnect()
await connect(reconnect: true)
}
}
}

private func startHeartbeating() {
mutableState.withValue {
$0.heartbeatTask = Task { [weak self] in
guard let self else { return }

while !Task.isCancelled {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.heartbeatInterval))
if Task.isCancelled {
break
}
await sendHeartbeat()
heartbeatTask = Task { [weak self] in
guard let self else { return }

while !Task.isCancelled {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.heartbeatInterval))
if Task.isCancelled {
break
}
await sendHeartbeat()
}
}
}

private func sendHeartbeat() async {
let timedOut = mutableState.withValue {
if $0.heartbeatRef != nil {
$0.heartbeatRef = nil
return true
}
return false
}

if timedOut {
if pendingHeartbeatRef != nil {
pendingHeartbeatRef = nil
debug("Heartbeat timeout. Trying to reconnect in \(config.reconnectDelay)")
disconnect()
await connect(reconnect: true)
return
}

let heartbeatRef = mutableState.withValue {
$0.heartbeatRef = $0.makeRef()
return $0.heartbeatRef
}
pendingHeartbeatRef = makeRef()

await send(
RealtimeMessageV2(
joinRef: nil,
ref: heartbeatRef?.description,
ref: pendingHeartbeatRef?.description,
topic: "phoenix",
event: "heartbeat",
payload: [:]
Expand All @@ -259,38 +236,29 @@ public final class Realtime: @unchecked Sendable {

public func disconnect() {
debug("Closing websocket connection")
mutableState.withValue {
$0.ref = 0
$0.messageTask?.cancel()
$0.heartbeatTask?.cancel()
$0.ws?.cancel()
$0.ws = nil
}
ref = 0
messageTask?.cancel()
heartbeatTask?.cancel()
ws?.cancel()
ws = nil
_status.value = .disconnected
}

private func onMessage(_ message: RealtimeMessageV2) async {
let forward: () async -> Void = mutableState.withValue {
let channel = $0.subscriptions[message.topic]
let channel = subscriptions[message.topic]

if Int(message.ref ?? "") == $0.heartbeatRef {
$0.heartbeatRef = 0
debug("heartbeat received")
return {}
} else {
debug("Received event \(message.event) for channel \(channel?.topic ?? "null")")
return {
await channel?.onMessage(message)
}
}
if Int(message.ref ?? "") == pendingHeartbeatRef {
pendingHeartbeatRef = nil
debug("heartbeat received")
} else {
debug("Received event \(message.event) for channel \(channel?.topic ?? "null")")
await channel?.onMessage(message)
}

await forward()
}

func send(_ message: RealtimeMessageV2) async {
do {
try await mutableState.ws?.send(message)
try await ws?.send(message)
} catch {
debug("""
Failed to send message:
Expand All @@ -303,7 +271,8 @@ public final class Realtime: @unchecked Sendable {
}

func makeRef() -> Int {
mutableState.withValue { $0.makeRef() }
ref += 1
return ref
}

private var realtimeBaseURL: URL {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Realtime/_Push.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ actor _Push {

func send() async -> PushStatus {
do {
try await channel?.socket?.mutableState.ws?.send(message)
try await channel?.socket?.ws?.send(message)

if channel?.config.broadcast.acknowledgeBroadcasts == true {
return await withCheckedContinuation {
Expand Down

0 comments on commit c5c1c60

Please sign in to comment.