Skip to content

Commit

Permalink
fix(realtime): lost postgres_changes on resubscribe (#585)
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev authored Nov 5, 2024
1 parent 1176dea commit fabc07d
Show file tree
Hide file tree
Showing 12 changed files with 258 additions and 265 deletions.
6 changes: 4 additions & 2 deletions Examples/SlackClone/AppView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ final class AppViewModel {
var session: Session?
var selectedChannel: Channel?

var realtimeConnectionStatus: RealtimeClientV2.Status?
var realtimeConnectionStatus: RealtimeClientStatus?

init() {
Task {
for await (event, session) in supabase.auth.authStateChanges {
Logger.main.debug("AuthStateChange: \(event.rawValue)")
guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else { return }
guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else {
return
}
self.session = session

if session == nil {
Expand Down
5 changes: 3 additions & 2 deletions Examples/SlackClone/Supabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ let decoder: JSONDecoder = {
}()

let supabase = SupabaseClient(
supabaseURL: URL(string: "http://localhost:54321")!,
supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0",
supabaseURL: URL(string: "http://127.0.0.1:54321")!,
supabaseKey:
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0",
options: SupabaseClientOptions(
db: .init(encoder: encoder, decoder: decoder),
auth: .init(redirectToURL: URL(string: "com.supabase.slack-clone://login-callback")),
Expand Down
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ let package = Package(
name: "Realtime",
dependencies: [
.product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"),
.product(name: "IssueReporting", package: "xctest-dynamic-overlay"),
"Helpers",
]
),
Expand Down
11 changes: 4 additions & 7 deletions Sources/Realtime/V2/CallbackManager.swift
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
//
// CallbackManager.swift
//
//
// Created by Guilherme Souza on 24/12/23.
//

import ConcurrencyExtras
import Foundation
import Helpers
Expand All @@ -26,6 +19,10 @@ final class CallbackManager: Sendable {
mutableState.callbacks
}

deinit {
reset()
}

@discardableResult
func addBroadcastCallback(
event: String,
Expand Down
38 changes: 22 additions & 16 deletions Sources/Realtime/V2/PushV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,31 @@ actor PushV2 {
}

func send() async -> PushStatus {
await channel?.socket.push(message)

if channel?.config.broadcast.acknowledgeBroadcasts == true {
do {
return try await withTimeout(interval: channel?.socket.options().timeoutInterval ?? 10) {
await withCheckedContinuation {
self.receivedContinuation = $0
}
guard let channel = channel else {
return .error
}

await channel.socket.push(message)

if !channel.config.broadcast.acknowledgeBroadcasts {
// channel was configured with `ack = false`,
// don't wait for a response and return `ok`.
return .ok
}

do {
return try await withTimeout(interval: channel.socket.options().timeoutInterval) {
await withCheckedContinuation { continuation in
self.receivedContinuation = continuation
}
} catch is TimeoutError {
channel?.logger?.debug("Push timed out.")
return .timeout
} catch {
channel?.logger?.error("Error sending push: \(error)")
return .error
}
} catch is TimeoutError {
channel.logger?.debug("Push timed out.")
return .timeout
} catch {
channel.logger?.error("Error sending push: \(error.localizedDescription)")
return .error
}

return .ok
}

func didReceive(status: PushStatus) {
Expand Down
41 changes: 20 additions & 21 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
//
// RealtimeChannelV2.swift
//
//
// Created by Guilherme Souza on 26/12/23.
//

import ConcurrencyExtras
import Foundation
import HTTPTypes
import Helpers
import IssueReporting

#if canImport(FoundationNetworking)
import FoundationNetworking
Expand Down Expand Up @@ -123,18 +117,14 @@ public final class RealtimeChannelV2: Sendable {
public func subscribe() async {
if socket.status() != .connected {
if socket.options().connectOnSubscribe != true {
fatalError(
reportIssue(
"You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?"
)
return
}
await socket.connect()
}

guard status != .subscribed else {
logger?.warning("Channel \(topic) is already subscribed")
return
}

socket.addChannel(self)

status = .subscribing
Expand Down Expand Up @@ -266,15 +256,21 @@ public final class RealtimeChannelV2: Sendable {
}
}

/// Tracks the given state in the channel.
/// - Parameter state: The state to be tracked, conforming to `Codable`.
/// - Throws: An error if the tracking fails.
public func track(_ state: some Codable) async throws {
try await track(state: JSONObject(state))
}

/// Tracks the given state in the channel.
/// - Parameter state: The state to be tracked as a `JSONObject`.
public func track(state: JSONObject) async {
assert(
status == .subscribed,
"You can only track your presence after subscribing to the channel. Did you forget to call `channel.subscribe()`?"
)
if status != .subscribed {
reportIssue(
"You can only track your presence after subscribing to the channel. Did you forget to call `channel.subscribe()`?"
)
}

await push(
ChannelEvent.presence,
Expand All @@ -286,6 +282,7 @@ public final class RealtimeChannelV2: Sendable {
)
}

/// Stops tracking the current state in the channel.
public func untrack() async {
await push(
ChannelEvent.presence,
Expand Down Expand Up @@ -520,10 +517,12 @@ public final class RealtimeChannelV2: Sendable {
filter: String?,
callback: @escaping @Sendable (AnyAction) -> Void
) -> RealtimeSubscription {
precondition(
status != .subscribed,
"You cannot call postgresChange after joining the channel"
)
guard status != .subscribed else {
reportIssue(
"You cannot call postgresChange after joining the channel, this won't work as expected."
)
return RealtimeSubscription {}
}

let config = PostgresJoinConfig(
event: event,
Expand Down
42 changes: 32 additions & 10 deletions Sources/Realtime/V2/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ public final class RealtimeClientV2: Sendable {
var accessToken: String?
var ref = 0
var pendingHeartbeatRef: Int?

/// Long-running task that keeps sending heartbeat messages.
var heartbeatTask: Task<Void, Never>?

/// Long-running task for listening for incoming messages from WebSocket.
var messageTask: Task<Void, Never>?

var connectionTask: Task<Void, Never>?
var channels: [String: RealtimeChannelV2] = [:]
var sendBuffer: [@Sendable () async -> Void] = []
Expand All @@ -34,13 +39,14 @@ public final class RealtimeClientV2: Sendable {
let http: any HTTPClientType
let apikey: String?

/// All managed channels indexed by their topics.
public var channels: [String: RealtimeChannelV2] {
mutableState.channels
}

private let statusEventEmitter = EventEmitter<RealtimeClientStatus>(initialEvent: .disconnected)

/// AsyncStream that emits when connection status change.
/// Listen for connection status changes.
///
/// You can also use ``onStatusChange(_:)`` for a closure based method.
public var statusChange: AsyncStream<RealtimeClientStatus> {
Expand Down Expand Up @@ -198,6 +204,13 @@ public final class RealtimeClientV2: Sendable {
await connect(reconnect: true)
}

/// Creates a new channel and bind it to this client.
/// - Parameters:
/// - topic: Channel's topic.
/// - options: Configuration options for the channel.
/// - Returns: Channel instance.
///
/// - Note: This method doesn't subscribe to the channel, call ``RealtimeChannelV2/subscribe()`` on the returned channel instance.
public func channel(
_ topic: String,
options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in }
Expand All @@ -223,6 +236,9 @@ public final class RealtimeClientV2: Sendable {
}
}

/// Unsubscribe and removes channel.
///
/// If there is no channel left, client is disconnected.
public func removeChannel(_ channel: RealtimeChannelV2) async {
if channel.status == .subscribed {
await channel.unsubscribe()
Expand All @@ -238,6 +254,7 @@ public final class RealtimeClientV2: Sendable {
}
}

/// Unsubscribes and removes all channels.
public func removeAllChannels() async {
await withTaskGroup(of: Void.self) { group in
for channel in channels.values {
Expand Down Expand Up @@ -327,15 +344,19 @@ public final class RealtimeClientV2: Sendable {
}
}

public func disconnect() {
/// Disconnects client.
/// - Parameters:
/// - code: A numeric status code to send on disconnect.
/// - reason: A custom reason for the disconnect.
public func disconnect(code: Int? = nil, reason: String? = nil) {
options.logger?.debug("Closing WebSocket connection")
mutableState.withValue {
$0.ref = 0
$0.messageTask?.cancel()
$0.heartbeatTask?.cancel()
$0.connectionTask?.cancel()
}
ws.disconnect()
ws.disconnect(code: code, reason: reason)
status = .disconnected
}

Expand Down Expand Up @@ -388,13 +409,14 @@ public final class RealtimeClientV2: Sendable {
try Task.checkCancellation()
try await self?.ws.send(message)
} catch {
self?.options.logger?.error("""
Failed to send message:
\(message)
Error:
\(error)
""")
self?.options.logger?.error(
"""
Failed to send message:
\(message)
Error:
\(error)
""")
}
}

Expand Down
7 changes: 0 additions & 7 deletions Sources/Realtime/V2/RealtimeMessageV2.swift
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
//
// RealtimeMessageV2.swift
//
//
// Created by Guilherme Souza on 11/01/24.
//

import Foundation
import Helpers

Expand Down
Loading

0 comments on commit fabc07d

Please sign in to comment.