diff --git a/Package.swift b/Package.swift index 6105e5a6..e01c8df0 100644 --- a/Package.swift +++ b/Package.swift @@ -119,6 +119,7 @@ let package = Package( name: "RealtimeTests", dependencies: [ .product(name: "CustomDump", package: "swift-custom-dump"), + .product(name: "InlineSnapshotTesting", package: "swift-snapshot-testing"), "PostgREST", "Realtime", "TestHelpers", diff --git a/Sources/Realtime/Push.swift b/Sources/Realtime/Push.swift index 76c1a00e..70518976 100644 --- a/Sources/Realtime/Push.swift +++ b/Sources/Realtime/Push.swift @@ -31,7 +31,7 @@ actor Push { if channel?.config.broadcast.acknowledgeBroadcasts == true { do { - return try await withTimeout(interval: channel?.socket.options().timeoutInterval ?? 10) { + return try await withTimeout(interval: channel?.socket.options.timeoutInterval ?? 10) { await withCheckedContinuation { self.receivedContinuation = $0 } diff --git a/Sources/Realtime/RealtimeChannel.swift b/Sources/Realtime/RealtimeChannel.swift index 0f9fed7b..a9ca1741 100644 --- a/Sources/Realtime/RealtimeChannel.swift +++ b/Sources/Realtime/RealtimeChannel.swift @@ -1,10 +1,3 @@ -// -// RealtimeChannel.swift -// -// -// Created by Guilherme Souza on 26/12/23. -// - import ConcurrencyExtras import Foundation import Helpers @@ -33,40 +26,7 @@ public struct RealtimeChannelConfig: Sendable { public var isPrivate: Bool } -struct Socket: Sendable { - var broadcastURL: @Sendable () -> URL - var status: @Sendable () -> RealtimeClient.Status - var options: @Sendable () -> RealtimeClientOptions - var accessToken: @Sendable () -> String? - var apiKey: @Sendable () -> String? - var makeRef: @Sendable () -> Int - - var connect: @Sendable () async -> Void - var addChannel: @Sendable (_ channel: RealtimeChannel) -> Void - var removeChannel: @Sendable (_ channel: RealtimeChannel) async -> Void - var push: @Sendable (_ message: RealtimeMessage) async -> Void - var httpSend: @Sendable (_ request: HTTPRequest) async throws -> HTTPResponse -} - -extension Socket { - init(client: RealtimeClient) { - self.init( - broadcastURL: { [weak client] in client?.broadcastURL ?? URL(string: "http://localhost")! }, - status: { [weak client] in client?.status ?? .disconnected }, - options: { [weak client] in client?.options ?? .init() }, - accessToken: { [weak client] in client?.mutableState.accessToken }, - apiKey: { [weak client] in client?.apikey }, - makeRef: { [weak client] in client?.makeRef() ?? 0 }, - connect: { [weak client] in await client?.connect() }, - addChannel: { [weak client] in client?.addChannel($0) }, - removeChannel: { [weak client] in await client?.removeChannel($0) }, - push: { [weak client] in await client?.push($0) }, - httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) } - ) - } -} - -public final class RealtimeChannel: Sendable { +public actor RealtimeChannel { public typealias Subscription = ObservationToken public enum Status: Sendable { @@ -76,21 +36,23 @@ public final class RealtimeChannel: Sendable { case unsubscribing } - struct MutableState { - var clientChanges: [PostgresJoinConfig] = [] - var joinRef: String? - var pushes: [String: Push] = [:] - } - - private let mutableState = LockIsolated(MutableState()) - let topic: String let config: RealtimeChannelConfig let logger: (any SupabaseLogger)? - let socket: Socket + private weak var _socket: RealtimeClient? + + var socket: RealtimeClient { + guard let _socket else { + fatalError("Expected a RealtimeClient instance to be associated with this channel.") + } + return _socket + } private let callbackManager = CallbackManager() private let statusEventEmitter = EventEmitter(initialEvent: .unsubscribed) + private(set) var clientChanges: [PostgresJoinConfig] = [] + private(set) var joinRef: String? + private(set) var pushes: [String: Push] = [:] public private(set) var status: Status { get { statusEventEmitter.lastEvent } @@ -115,13 +77,13 @@ public final class RealtimeChannel: Sendable { init( topic: String, config: RealtimeChannelConfig, - socket: Socket, + socket: RealtimeClient, logger: (any SupabaseLogger)? ) { self.topic = topic self.config = config self.logger = logger - self.socket = socket + _socket = socket } deinit { @@ -130,8 +92,8 @@ public final class RealtimeChannel: Sendable { /// Subscribes to the channel public func subscribe() async { - if socket.status() != .connected { - if socket.options().connectOnSubscribe != true { + if await socket.status != .connected { + if socket.options.connectOnSubscribe != true { fatalError( "You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?" ) @@ -139,7 +101,7 @@ public final class RealtimeChannel: Sendable { await socket.connect() } - socket.addChannel(self) + await socket.addChannel(self) status = .subscribing logger?.debug("subscribing to channel \(topic)") @@ -147,17 +109,16 @@ public final class RealtimeChannel: Sendable { let joinConfig = RealtimeJoinConfig( broadcast: config.broadcast, presence: config.presence, - postgresChanges: mutableState.clientChanges, + postgresChanges: clientChanges, isPrivate: config.isPrivate ) - let payload = RealtimeJoinPayload( + let payload = await RealtimeJoinPayload( config: joinConfig, - accessToken: socket.accessToken() + accessToken: socket.accessToken ) - let joinRef = socket.makeRef().description - mutableState.withValue { $0.joinRef = joinRef } + joinRef = await socket.makeRef().description logger?.debug("subscribing to channel with body: \(joinConfig)") @@ -172,7 +133,7 @@ public final class RealtimeChannel: Sendable { ) do { - try await withTimeout(interval: socket.options().timeoutInterval) { [self] in + try await withTimeout(interval: socket.options.timeoutInterval) { [self] in _ = await statusChange.first { @Sendable in $0 == .subscribed } } } catch { @@ -191,7 +152,7 @@ public final class RealtimeChannel: Sendable { await push( RealtimeMessage( - joinRef: mutableState.joinRef, + joinRef: joinRef, ref: socket.makeRef().description, topic: topic, event: ChannelEvent.leave, @@ -204,7 +165,7 @@ public final class RealtimeChannel: Sendable { logger?.debug("Updating auth token for channel \(topic)") await push( RealtimeMessage( - joinRef: mutableState.joinRef, + joinRef: joinRef, ref: socket.makeRef().description, topic: topic, event: ChannelEvent.accessToken, @@ -235,17 +196,18 @@ public final class RealtimeChannel: Sendable { } var headers = HTTPHeaders(["content-type": "application/json"]) - if let apiKey = socket.apiKey() { + if let apiKey = socket.apikey { headers["apikey"] = apiKey } - if let accessToken = socket.accessToken() { + + if let accessToken = await socket.accessToken { headers["authorization"] = "Bearer \(accessToken)" } let task = Task { [headers] in - _ = try? await socket.httpSend( + _ = try? await socket.http.send( HTTPRequest( - url: socket.broadcastURL(), + url: socket.broadcastURL, method: .post, headers: headers, body: JSONEncoder().encode( @@ -265,14 +227,14 @@ public final class RealtimeChannel: Sendable { } if config.broadcast.acknowledgeBroadcasts { - try? await withTimeout(interval: socket.options().timeoutInterval) { + try? await withTimeout(interval: socket.options.timeoutInterval) { await task.value } } } else { await push( RealtimeMessage( - joinRef: mutableState.joinRef, + joinRef: joinRef, ref: socket.makeRef().description, topic: topic, event: ChannelEvent.broadcast, @@ -298,7 +260,7 @@ public final class RealtimeChannel: Sendable { await push( RealtimeMessage( - joinRef: mutableState.joinRef, + joinRef: joinRef, ref: socket.makeRef().description, topic: topic, event: ChannelEvent.presence, @@ -314,7 +276,7 @@ public final class RealtimeChannel: Sendable { public func untrack() async { await push( RealtimeMessage( - joinRef: mutableState.joinRef, + joinRef: joinRef, ref: socket.makeRef().description, topic: topic, event: ChannelEvent.presence, @@ -326,7 +288,7 @@ public final class RealtimeChannel: Sendable { ) } - func onMessage(_ message: RealtimeMessage) { + func onMessage(_ message: RealtimeMessage) async { do { guard let eventType = message.eventType else { logger?.debug("Received message without event type: \(message)") @@ -437,13 +399,9 @@ public final class RealtimeChannel: Sendable { callbackManager.triggerBroadcast(event: event, json: payload) case .close: - Task { [weak self] in - guard let self else { return } - - await socket.removeChannel(self) - logger?.debug("Unsubscribed from channel \(message.topic)") - status = .unsubscribed - } + await socket.removeChannel(self) + logger?.debug("Unsubscribed from channel \(message.topic)") + status = .unsubscribed case .error: logger?.debug( @@ -551,9 +509,7 @@ public final class RealtimeChannel: Sendable { filter: filter ) - mutableState.withValue { - $0.clientChanges.append(config) - } + clientChanges.append(config) let id = callbackManager.addPostgresCallback(filter: config, callback: callback) return Subscription { [weak callbackManager, logger] in @@ -578,18 +534,14 @@ public final class RealtimeChannel: Sendable { private func push(_ message: RealtimeMessage) async -> PushStatus { let push = Push(channel: self, message: message) if let ref = message.ref { - mutableState.withValue { - $0.pushes[ref] = push - } + pushes[ref] = push } return await push.send() } private func didReceiveReply(ref: String, status: String) { Task { - let push = mutableState.withValue { - $0.pushes.removeValue(forKey: ref) - } + let push = pushes.removeValue(forKey: ref) await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok) } } diff --git a/Sources/Realtime/RealtimeClient.swift b/Sources/Realtime/RealtimeClient.swift index bb04ebe3..a096ca4b 100644 --- a/Sources/Realtime/RealtimeClient.swift +++ b/Sources/Realtime/RealtimeClient.swift @@ -18,7 +18,7 @@ public typealias JSONObject = Helpers.JSONObject @available(*, deprecated, renamed: "RealtimeClient") public typealias RealtimeClientV2 = RealtimeClient -public final class RealtimeClient: Sendable { +public actor RealtimeClient { @available(*, deprecated, renamed: "RealtimeClientOptions") public struct Configuration: Sendable { var url: URL @@ -68,28 +68,19 @@ public final class RealtimeClient: Sendable { } } - struct MutableState { - var accessToken: String? - var ref = 0 - var pendingHeartbeatRef: Int? - var heartbeatTask: Task? - var messageTask: Task? - var connectionTask: Task? - var subscriptions: [String: RealtimeChannel] = [:] - } - let url: URL let options: RealtimeClientOptions let ws: any WebSocketClient - let mutableState = LockIsolated(MutableState()) let http: any HTTPClientType let apikey: String? - public var subscriptions: [String: RealtimeChannel] { - mutableState.subscriptions - } - private let statusEventEmitter = EventEmitter(initialEvent: .disconnected) + private(set) var accessToken: String? + private(set) var ref = 0 + private(set) var pendingHeartbeatRef: Int? + private(set) var heartbeatTask: Task? + private(set) var messageTask: Task? + private(set) var connectionTask: Task? /// AsyncStream that emits when connection status change. /// @@ -104,6 +95,8 @@ public final class RealtimeClient: Sendable { set { statusEventEmitter.emit(newValue) } } + public private(set) var subscriptions: [String: RealtimeChannel] = [:] + /// Listen for connection status changes. /// - Parameter listener: Closure that will be called when connection status changes. /// - Returns: An observation handle that can be used to stop listening. @@ -116,7 +109,7 @@ public final class RealtimeClient: Sendable { } @available(*, deprecated, renamed: "RealtimeClient.init(url:options:)") - public convenience init(config: Configuration) { + public init(config: Configuration) { self.init( url: config.url, options: RealtimeClientOptions( @@ -131,7 +124,7 @@ public final class RealtimeClient: Sendable { ) } - public convenience init(url: URL, options: RealtimeClientOptions) { + public init(url: URL, options: RealtimeClientOptions) { var interceptors: [any HTTPClientInterceptor] = [] if let logger = options.logger { @@ -166,18 +159,13 @@ public final class RealtimeClient: Sendable { self.ws = ws self.http = http apikey = options.apikey - - mutableState.withValue { - $0.accessToken = options.accessToken ?? options.apikey - } + accessToken = options.accessToken ?? options.apikey } deinit { - mutableState.withValue { - $0.heartbeatTask?.cancel() - $0.messageTask?.cancel() - $0.subscriptions = [:] - } + heartbeatTask?.cancel() + messageTask?.cancel() + subscriptions = [:] } /// Connects the socket. @@ -189,7 +177,7 @@ public final class RealtimeClient: Sendable { func connect(reconnect: Bool) async { if status == .disconnected { - let connectionTask = Task { + connectionTask = Task { if reconnect { try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(options.reconnectDelay)) @@ -223,10 +211,6 @@ public final class RealtimeClient: Sendable { } } } - - mutableState.withValue { - $0.connectionTask = connectionTask - } } _ = await statusChange.first { @Sendable in $0 == .connected } @@ -277,25 +261,21 @@ public final class RealtimeClient: Sendable { return RealtimeChannel( topic: "realtime:\(topic)", config: config, - socket: Socket(client: self), + socket: self, logger: self.options.logger ) } public func addChannel(_ channel: RealtimeChannel) { - mutableState.withValue { - $0.subscriptions[channel.topic] = channel - } + subscriptions[channel.topic] = channel } public func removeChannel(_ channel: RealtimeChannel) async { - if channel.status == .subscribed { + if await channel.status == .subscribed { await channel.unsubscribe() } - mutableState.withValue { - $0.subscriptions[channel.topic] = nil - } + subscriptions[channel.topic] = nil if subscriptions.isEmpty { options.logger?.debug("No more subscribed channel in socket") @@ -316,7 +296,7 @@ public final class RealtimeClient: Sendable { } private func listenForMessages() { - let messageTask = Task { [weak self] in + messageTask = Task { [weak self] in guard let self else { return } do { @@ -334,13 +314,10 @@ public final class RealtimeClient: Sendable { await reconnect() } } - mutableState.withValue { - $0.messageTask = messageTask - } } private func startHeartbeating() { - let heartbeatTask = Task { [weak self, options] in + heartbeatTask = Task { [weak self, options] in while !Task.isCancelled { try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(options.heartbeatInterval)) if Task.isCancelled { @@ -349,47 +326,34 @@ public final class RealtimeClient: Sendable { await self?.sendHeartbeat() } } - mutableState.withValue { - $0.heartbeatTask = heartbeatTask - } } private func sendHeartbeat() async { - let pendingHeartbeatRef: Int? = mutableState.withValue { - if $0.pendingHeartbeatRef != nil { - $0.pendingHeartbeatRef = nil - return nil - } - + if pendingHeartbeatRef != nil { + pendingHeartbeatRef = nil + options.logger?.debug("Heartbeat timeout") + await reconnect() + } else { let ref = makeRef() - $0.pendingHeartbeatRef = ref - return ref - } - - if let pendingHeartbeatRef { + pendingHeartbeatRef = ref await push( RealtimeMessage( joinRef: nil, - ref: pendingHeartbeatRef.description, + ref: pendingHeartbeatRef!.description, topic: "phoenix", event: "heartbeat", payload: [:] ) ) - } else { - options.logger?.debug("Heartbeat timeout") - await reconnect() } } public func disconnect() { options.logger?.debug("Closing WebSocket connection") - mutableState.withValue { - $0.ref = 0 - $0.messageTask?.cancel() - $0.heartbeatTask?.cancel() - $0.connectionTask?.cancel() - } + ref = 0 + messageTask?.cancel() + heartbeatTask?.cancel() + connectionTask?.cancel() ws.disconnect() status = .disconnected } @@ -397,29 +361,25 @@ public final class RealtimeClient: Sendable { /// Sets the JWT access token used for channel subscription authorization and Realtime RLS. /// - Parameter token: A JWT string. public func setAuth(_ token: String?) async { - mutableState.withValue { - $0.accessToken = token - } + accessToken = token for channel in subscriptions.values { - if let token, channel.status == .subscribed { + if let token, await channel.status == .subscribed { await channel.updateAuth(jwt: token) } } } private func onMessage(_ message: RealtimeMessage) async { - mutableState.withValue { - let channel = $0.subscriptions[message.topic] - - if let ref = message.ref, Int(ref) == $0.pendingHeartbeatRef { - $0.pendingHeartbeatRef = nil - options.logger?.debug("heartbeat received") - } else { - options.logger? - .debug("Received event \(message.event) for channel \(channel?.topic ?? "null")") - channel?.onMessage(message) - } + let channel = subscriptions[message.topic] + + if let ref = message.ref, Int(ref) == pendingHeartbeatRef { + pendingHeartbeatRef = nil + options.logger?.debug("heartbeat received") + } else { + options.logger? + .debug("Received event \(message.event) for channel \(channel?.topic ?? "null")") + await channel?.onMessage(message) } } @@ -445,10 +405,8 @@ public final class RealtimeClient: Sendable { } func makeRef() -> Int { - mutableState.withValue { - $0.ref += 1 - return $0.ref - } + ref += 1 + return ref } static func realtimeBaseURL(url: URL) -> URL { diff --git a/Sources/Supabase/SupabaseClient.swift b/Sources/Supabase/SupabaseClient.swift index 5659b323..8456cf25 100644 --- a/Sources/Supabase/SupabaseClient.swift +++ b/Sources/Supabase/SupabaseClient.swift @@ -245,7 +245,7 @@ public final class SupabaseClient: Sendable { /// Returns all Realtime channels. public var channels: [RealtimeChannel] { - Array(realtime.subscriptions.values) + get async { await Array(realtime.subscriptions.values) } } /// Creates a Realtime channel with Broadcast, Presence, and Postgres Changes. @@ -255,8 +255,8 @@ public final class SupabaseClient: Sendable { public func channel( _ name: String, options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in } - ) -> RealtimeChannel { - realtime.channel(name, options: options) + ) async -> RealtimeChannel { + await realtime.channel(name, options: options) } /// Unsubscribes and removes Realtime channel from Realtime client. diff --git a/Tests/IntegrationTests/RealtimeIntegrationTests.swift b/Tests/IntegrationTests/RealtimeIntegrationTests.swift index 07b49824..64d8b9e9 100644 --- a/Tests/IntegrationTests/RealtimeIntegrationTests.swift +++ b/Tests/IntegrationTests/RealtimeIntegrationTests.swift @@ -20,7 +20,7 @@ struct Logger: SupabaseLogger { } final class RealtimeIntegrationTests: XCTestCase { - let realtime = RealtimeClientV2( + let realtime = RealtimeClient( url: URL(string: "\(DotEnv.SUPABASE_URL)/realtime/v1")!, options: RealtimeClientOptions( headers: ["apikey": DotEnv.SUPABASE_ANON_KEY], @@ -46,14 +46,14 @@ final class RealtimeIntegrationTests: XCTestCase { let expectation = expectation(description: "receivedBroadcastMessages") expectation.expectedFulfillmentCount = 3 - let channel = realtime.channel("integration") { + let channel = await realtime.channel("integration") { $0.broadcast.receiveOwnBroadcasts = true } let receivedMessages = LockIsolated<[JSONObject]>([]) Task { - for await message in channel.broadcastStream(event: "test") { + for await message in await channel.broadcastStream(event: "test") { receivedMessages.withValue { $0.append(message) } @@ -107,7 +107,7 @@ final class RealtimeIntegrationTests: XCTestCase { } func testBroadcastWithUnsubscribedChannel() async throws { - let channel = realtime.channel("integration") { + let channel = await realtime.channel("integration") { $0.broadcast.acknowledgeBroadcasts = true } @@ -121,7 +121,7 @@ final class RealtimeIntegrationTests: XCTestCase { } func testPresence() async throws { - let channel = realtime.channel("integration") { + let channel = await realtime.channel("integration") { $0.broadcast.receiveOwnBroadcasts = true } @@ -131,7 +131,7 @@ final class RealtimeIntegrationTests: XCTestCase { let receivedPresenceChanges = LockIsolated<[any PresenceAction]>([]) Task { - for await presence in channel.presenceChange() { + for await presence in await channel.presenceChange() { receivedPresenceChanges.withValue { $0.append(presence) } diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 6e509196..e27fc077 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -1,6 +1,7 @@ import ConcurrencyExtras import CustomDump import Helpers +import InlineSnapshotTesting @testable import Realtime import TestHelpers import XCTest @@ -42,31 +43,30 @@ final class RealtimeTests: XCTestCase { ) } - override func tearDown() { - sut.disconnect() - - super.tearDown() + override func tearDown() async throws { + await sut.disconnect() + try await super.tearDown() } func testBehavior() async throws { - let channel = sut.channel("public:messages") + let channel = await sut.channel("public:messages") var subscriptions: Set = [] - channel.onPostgresChange(InsertAction.self, table: "messages") { _ in + await channel.onPostgresChange(InsertAction.self, table: "messages") { _ in } .store(in: &subscriptions) - channel.onPostgresChange(UpdateAction.self, table: "messages") { _ in + await channel.onPostgresChange(UpdateAction.self, table: "messages") { _ in } .store(in: &subscriptions) - channel.onPostgresChange(DeleteAction.self, table: "messages") { _ in + await channel.onPostgresChange(DeleteAction.self, table: "messages") { _ in } .store(in: &subscriptions) let socketStatuses = LockIsolated([RealtimeClient.Status]()) - sut.onStatusChange { status in + await sut.onStatusChange { status in socketStatuses.withValue { $0.append(status) } } .store(in: &subscriptions) @@ -75,14 +75,14 @@ final class RealtimeTests: XCTestCase { XCTAssertEqual(socketStatuses.value, [.disconnected, .connecting, .connected]) - let messageTask = sut.mutableState.messageTask + let messageTask = await sut.messageTask XCTAssertNotNil(messageTask) - let heartbeatTask = sut.mutableState.heartbeatTask + let heartbeatTask = await sut.heartbeatTask XCTAssertNotNil(heartbeatTask) let channelStatuses = LockIsolated([RealtimeChannel.Status]()) - channel.onStatusChange { status in + await channel.onStatusChange { status in channelStatuses.withValue { $0.append(status) } @@ -92,14 +92,52 @@ final class RealtimeTests: XCTestCase { ws.mockReceive(.messagesSubscribed) await channel.subscribe() - expectNoDifference( - ws.sentMessages, - [.subscribeToMessages(ref: "1", joinRef: "1")] - ) + assertInlineSnapshot(of: ws.sentMessages, as: .json) { + """ + [ + { + "event" : "phx_join", + "join_ref" : "1", + "payload" : { + "access_token" : "anon.api.key", + "config" : { + "broadcast" : { + "ack" : false, + "self" : false + }, + "postgres_changes" : [ + { + "event" : "INSERT", + "schema" : "public", + "table" : "messages" + }, + { + "event" : "UPDATE", + "schema" : "public", + "table" : "messages" + }, + { + "event" : "DELETE", + "schema" : "public", + "table" : "messages" + } + ], + "presence" : { + "key" : "" + }, + "private" : false + } + }, + "ref" : "1", + "topic" : "realtime:public:messages" + } + ] + """ + } } func testSubscribeTimeout() async throws { - let channel = sut.channel("public:messages") + let channel = await sut.channel("public:messages") let joinEventCount = LockIsolated(0) ws.on { message in @@ -135,37 +173,56 @@ final class RealtimeTests: XCTestCase { let joinSentMessages = ws.sentMessages.filter { $0.event == "phx_join" } - let expectedMessages = try [ - RealtimeMessage( - joinRef: "1", - ref: "1", - topic: "realtime:public:messages", - event: "phx_join", - payload: JSONObject( - RealtimeJoinPayload( - config: RealtimeJoinConfig(), - accessToken: apiKey - ) - ) - ), - RealtimeMessage( - joinRef: "2", - ref: "2", - topic: "realtime:public:messages", - event: "phx_join", - payload: JSONObject( - RealtimeJoinPayload( - config: RealtimeJoinConfig(), - accessToken: apiKey - ) - ) - ), - ] - - expectNoDifference( - joinSentMessages, - expectedMessages - ) + assertInlineSnapshot(of: joinSentMessages, as: .json) { + """ + [ + { + "event" : "phx_join", + "join_ref" : "1", + "payload" : { + "access_token" : "anon.api.key", + "config" : { + "broadcast" : { + "ack" : false, + "self" : false + }, + "postgres_changes" : [ + + ], + "presence" : { + "key" : "" + }, + "private" : false + } + }, + "ref" : "1", + "topic" : "realtime:public:messages" + }, + { + "event" : "phx_join", + "join_ref" : "3", + "payload" : { + "access_token" : "anon.api.key", + "config" : { + "broadcast" : { + "ack" : false, + "self" : false + }, + "postgres_changes" : [ + + ], + "presence" : { + "key" : "" + }, + "private" : false + } + }, + "ref" : "3", + "topic" : "realtime:public:messages" + } + ] + """ + } } func testHeartbeat() async throws { @@ -209,7 +266,7 @@ final class RealtimeTests: XCTestCase { let statuses = LockIsolated<[RealtimeClient.Status]>([]) Task { - for await status in sut.statusChange { + for await status in await sut.statusChange { statuses.withValue { $0.append(status) } @@ -220,7 +277,7 @@ final class RealtimeTests: XCTestCase { await fulfillment(of: [sentHeartbeatExpectation], timeout: 2) - let pendingHeartbeatRef = sut.mutableState.pendingHeartbeatRef + let pendingHeartbeatRef = await sut.pendingHeartbeatRef XCTAssertNotNil(pendingHeartbeatRef) // Wait until next heartbeat @@ -245,7 +302,7 @@ final class RealtimeTests: XCTestCase { await http.when { $0.url.path.hasSuffix("broadcast") } return: { _ in - HTTPResponse( + await HTTPResponse( data: "{}".data(using: .utf8)!, response: HTTPURLResponse( url: self.sut.broadcastURL, @@ -256,7 +313,7 @@ final class RealtimeTests: XCTestCase { ) } - let channel = sut.channel("public:messages") { + let channel = await sut.channel("public:messages") { $0.broadcast.acknowledgeBroadcasts = true } @@ -274,19 +331,23 @@ final class RealtimeTests: XCTestCase { let body = try XCTUnwrap(request?.body) let json = try JSONDecoder().decode(JSONObject.self, from: body) - expectNoDifference( - json, - [ - "messages": [ - [ - "topic": "realtime:public:messages", - "event": "test", - "payload": ["value": 42], - "private": false, - ], - ], - ] - ) + + assertInlineSnapshot(of: json, as: .json) { + """ + { + "messages" : [ + { + "event" : "test", + "payload" : { + "value" : 42 + }, + "private" : false, + "topic" : "realtime:public:messages" + } + ] + } + """ + } } private func connectSocketAndWait() async { @@ -296,31 +357,6 @@ final class RealtimeTests: XCTestCase { } extension RealtimeMessage { - static func subscribeToMessages(ref: String?, joinRef: String?) -> RealtimeMessage { - Self( - joinRef: joinRef, - ref: ref, - topic: "realtime:public:messages", - event: "phx_join", - payload: [ - "access_token": "anon.api.key", - "config": [ - "broadcast": [ - "self": false, - "ack": false, - ], - "postgres_changes": [ - ["table": "messages", "event": "INSERT", "schema": "public"], - ["table": "messages", "schema": "public", "event": "UPDATE"], - ["schema": "public", "table": "messages", "event": "DELETE"], - ], - "presence": ["key": ""], - "private": false, - ], - ] - ) - } - static let messagesSubscribed = Self( joinRef: nil, ref: "2", @@ -337,17 +373,6 @@ extension RealtimeMessage { "status": "ok", ] ) - - static let heartbeatResponse = Self( - joinRef: nil, - ref: "1", - topic: "phoenix", - event: "phx_reply", - payload: [ - "response": [:], - "status": "ok", - ] - ) } struct TestLogger: SupabaseLogger { diff --git a/Tests/RealtimeTests/_PushTests.swift b/Tests/RealtimeTests/_PushTests.swift index d7e3c9e4..ebebd0af 100644 --- a/Tests/RealtimeTests/_PushTests.swift +++ b/Tests/RealtimeTests/_PushTests.swift @@ -42,7 +42,7 @@ final class _PushTests: XCTestCase { presence: .init(), isPrivate: false ), - socket: Socket(client: socket), + socket: socket, logger: nil ) let push = Push( diff --git a/Tests/SupabaseTests/SupabaseClientTests.swift b/Tests/SupabaseTests/SupabaseClientTests.swift index 73072d66..f03b2fd1 100644 --- a/Tests/SupabaseTests/SupabaseClientTests.swift +++ b/Tests/SupabaseTests/SupabaseClientTests.swift @@ -74,10 +74,10 @@ final class SupabaseClientTests: XCTestCase { XCTAssertEqual(client.functions.region, "ap-northeast-1") - let realtimeURL = client.realtime.url + let realtimeURL = await client.realtime.url XCTAssertEqual(realtimeURL.absoluteString, "https://project-ref.supabase.co/realtime/v1") - let realtimeOptions = client.realtime.options + let realtimeOptions = await client.realtime.options let expectedRealtimeHeader = client._headers.merged(with: ["custom_realtime_header_key": "custom_realtime_header_value"]) expectNoDifference(realtimeOptions.headers, expectedRealtimeHeader) XCTAssertIdentical(realtimeOptions.logger as? Logger, logger)